brows3r_lib/transfers/progress.rs
1//! Progress event emission helpers for transfers.
2//!
3//! # Design
4//!
5//! `emit_progress` throttles emissions to at most one per 250 ms **or** per
6//! 256 KB transferred, whichever condition is met first. This avoids flooding
7//! the IPC channel while still giving the frontend a responsive progress bar.
8//!
9//! `emit_state` is unthrottled — state transitions (Queued → Running → Done /
10//! Failed / Canceled) must always reach the frontend.
11//!
12//! # OCP contract
13//!
14//! The same helpers are reused for uploads (task 32) — the throttle logic lives
15//! in one place and is called identically from both `download.rs` and
16//! `upload.rs`.
17
18use serde::Serialize;
19
20use crate::{
21 error::AppError,
22 events::{EventEmitter, EventKind},
23 transfers::TransferState,
24};
25
26// ---------------------------------------------------------------------------
27// Throttle constants
28// ---------------------------------------------------------------------------
29
30/// Minimum milliseconds between progress events for one transfer.
31pub const PROGRESS_THROTTLE_MS: i64 = 250;
32
33/// Minimum bytes transferred between progress events.
34pub const PROGRESS_THROTTLE_BYTES: u64 = 262_144; // 256 KB
35
36// ---------------------------------------------------------------------------
37// Event payloads
38// ---------------------------------------------------------------------------
39
40/// Payload for `transfer:progress`.
41#[derive(Debug, Clone, Serialize)]
42#[serde(rename_all = "camelCase")]
43pub struct TransferProgressPayload {
44 pub request_id: String,
45 pub bytes_done: u64,
46 pub bytes_total: Option<u64>,
47 pub parts_done: u32,
48 pub parts_total: u32,
49}
50
51/// Payload for `transfer:state`.
52///
53/// Carries the AppError when the transition is to `Failed`. The frontend
54/// stores this on the transfer record so TransferRow can render the
55/// failure reason inline instead of just a red badge.
56#[derive(Debug, Clone, Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct TransferStatePayload {
59 pub request_id: String,
60 pub state: TransferState,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub error: Option<AppError>,
63}
64
65// ---------------------------------------------------------------------------
66// ProgressThrottle — per-transfer state tracker
67// ---------------------------------------------------------------------------
68
69/// Tracks when the last progress event was emitted so `emit_progress` can
70/// apply the 250 ms / 256 KB throttle.
71///
72/// Instantiate one `ProgressThrottle` per transfer at the start of the stream
73/// loop and pass it mutably to each `emit_progress` call.
74#[derive(Debug)]
75pub struct ProgressThrottle {
76 /// Millisecond timestamp of the last emitted event (0 = never).
77 pub last_emitted_at_ms: i64,
78 /// `transferred_bytes` value at the time of the last emitted event.
79 pub last_emitted_bytes: u64,
80}
81
82impl ProgressThrottle {
83 /// Create a new throttle state. `now_ms` is the current time in ms.
84 pub fn new() -> Self {
85 Self {
86 last_emitted_at_ms: 0,
87 last_emitted_bytes: 0,
88 }
89 }
90
91 /// Returns `true` when a progress event should be emitted now.
92 ///
93 /// The gate opens when:
94 /// - no event has been emitted yet (`last_emitted_at_ms == 0`), OR
95 /// - at least 250 ms have elapsed since the last emission, OR
96 /// - at least 256 KB more have been transferred since the last emission.
97 pub fn should_emit(&self, now_ms: i64, bytes_done: u64) -> bool {
98 if self.last_emitted_at_ms == 0 {
99 return true;
100 }
101 let elapsed_ms = now_ms - self.last_emitted_at_ms;
102 let delta_bytes = bytes_done.saturating_sub(self.last_emitted_bytes);
103 elapsed_ms >= PROGRESS_THROTTLE_MS || delta_bytes >= PROGRESS_THROTTLE_BYTES
104 }
105
106 /// Record that an event was just emitted.
107 pub fn record_emission(&mut self, now_ms: i64, bytes_done: u64) {
108 self.last_emitted_at_ms = now_ms;
109 self.last_emitted_bytes = bytes_done;
110 }
111}
112
113impl Default for ProgressThrottle {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119// ---------------------------------------------------------------------------
120// emit_progress
121// ---------------------------------------------------------------------------
122
123/// Emit a `transfer:progress` event if the throttle gate is open.
124///
125/// Updates `throttle` on emission. `now_ms` is the current wall-clock time
126/// in milliseconds (callers obtain this via
127/// `std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64`).
128///
129/// Returns `Ok(true)` when an event was emitted, `Ok(false)` when throttled.
130pub fn emit_progress<E: EventEmitter>(
131 channel: &E,
132 request_id: &str,
133 bytes_done: u64,
134 bytes_total: Option<u64>,
135 parts_done: u32,
136 parts_total: u32,
137 throttle: &mut ProgressThrottle,
138 now_ms: i64,
139) -> Result<bool, AppError> {
140 if !throttle.should_emit(now_ms, bytes_done) {
141 return Ok(false);
142 }
143
144 crate::events::emit(
145 channel,
146 EventKind::TransferProgress,
147 TransferProgressPayload {
148 request_id: request_id.to_owned(),
149 bytes_done,
150 bytes_total,
151 parts_done,
152 parts_total,
153 },
154 )?;
155
156 throttle.record_emission(now_ms, bytes_done);
157 Ok(true)
158}
159
160// ---------------------------------------------------------------------------
161// emit_state
162// ---------------------------------------------------------------------------
163
164/// Emit a `transfer:state` event (unthrottled). Use
165/// [`emit_state_with_error`] on `Failed` transitions to attach the
166/// underlying `AppError` to the payload so the frontend can render the
167/// failure reason.
168pub fn emit_state<E: EventEmitter>(
169 channel: &E,
170 request_id: &str,
171 state: TransferState,
172) -> Result<(), AppError> {
173 emit_state_with_error(channel, request_id, state, None)
174}
175
176/// Emit a `transfer:state` event with an optional `AppError` attached.
177///
178/// Intended for `Failed` transitions: pass the underlying error so the
179/// frontend's `applyStateEvent` can hydrate the failure reason onto the
180/// transfer record. Non-Failed callers should keep using `emit_state`.
181pub fn emit_state_with_error<E: EventEmitter>(
182 channel: &E,
183 request_id: &str,
184 state: TransferState,
185 error: Option<AppError>,
186) -> Result<(), AppError> {
187 crate::events::emit(
188 channel,
189 EventKind::TransferState,
190 TransferStatePayload {
191 request_id: request_id.to_owned(),
192 state,
193 error,
194 },
195 )
196}
197
198// ---------------------------------------------------------------------------
199// Tests
200// ---------------------------------------------------------------------------
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use crate::events::{EventKind, MockChannel};
206
207 fn now() -> i64 {
208 1_700_000_000_000_i64
209 }
210
211 // -----------------------------------------------------------------------
212 // ProgressThrottle::should_emit
213 // -----------------------------------------------------------------------
214
215 #[test]
216 fn throttle_always_emits_first_event() {
217 let throttle = ProgressThrottle::new();
218 assert!(throttle.should_emit(now(), 0));
219 }
220
221 #[test]
222 fn throttle_blocks_within_window_and_threshold() {
223 let mut throttle = ProgressThrottle::new();
224 throttle.record_emission(now(), 0);
225
226 // 100 ms later, only 1 KB transferred — both conditions unmet.
227 let blocked = !throttle.should_emit(now() + 100, 1024);
228 assert!(blocked, "should NOT emit within throttle window");
229 }
230
231 #[test]
232 fn throttle_opens_after_250ms() {
233 let mut throttle = ProgressThrottle::new();
234 throttle.record_emission(now(), 0);
235
236 // Exactly 250 ms later.
237 assert!(
238 throttle.should_emit(now() + 250, 1024),
239 "should emit after 250 ms"
240 );
241 }
242
243 #[test]
244 fn throttle_opens_after_256kb() {
245 let mut throttle = ProgressThrottle::new();
246 throttle.record_emission(now(), 0);
247
248 // Only 100 ms elapsed, but 256 KB transferred.
249 assert!(
250 throttle.should_emit(now() + 100, 262_144),
251 "should emit after 256 KB"
252 );
253 }
254
255 // -----------------------------------------------------------------------
256 // emit_progress collapses multiple rapid calls into one
257 // -----------------------------------------------------------------------
258
259 #[test]
260 fn rapid_progress_calls_collapse_into_one_emission() {
261 let channel = MockChannel::default();
262 let mut throttle = ProgressThrottle::new();
263 let t0 = now();
264
265 // First call — always emitted.
266 let emitted = emit_progress(
267 &channel,
268 "req-1",
269 0,
270 Some(1_000_000),
271 0,
272 0,
273 &mut throttle,
274 t0,
275 )
276 .expect("emit must not error");
277 assert!(emitted, "first call must emit");
278
279 // Five calls within 100 ms and < 256 KB each — all throttled.
280 for i in 1_u64..=5 {
281 let emitted = emit_progress(
282 &channel,
283 "req-1",
284 i * 10_000, // only 10 KB per step
285 Some(1_000_000),
286 0,
287 0,
288 &mut throttle,
289 t0 + 20 * i as i64, // 20 ms apart
290 )
291 .expect("emit must not error");
292 assert!(!emitted, "call {i} must be throttled");
293 }
294
295 // Only one event should have been emitted.
296 let emitted_events = channel.emitted();
297 assert_eq!(
298 emitted_events.len(),
299 1,
300 "rapid calls must collapse into one emission"
301 );
302 assert_eq!(emitted_events[0].0, EventKind::TransferProgress);
303 assert_eq!(emitted_events[0].1["requestId"], "req-1");
304 assert_eq!(emitted_events[0].1["bytesDone"], 0_u64);
305 }
306
307 #[test]
308 fn progress_emits_again_after_250ms() {
309 let channel = MockChannel::default();
310 let mut throttle = ProgressThrottle::new();
311 let t0 = now();
312
313 emit_progress(
314 &channel,
315 "req-2",
316 0,
317 Some(1_000_000),
318 0,
319 0,
320 &mut throttle,
321 t0,
322 )
323 .unwrap();
324 // Advance 250 ms.
325 let emitted = emit_progress(
326 &channel,
327 "req-2",
328 256_000,
329 Some(1_000_000),
330 0,
331 0,
332 &mut throttle,
333 t0 + 250,
334 )
335 .unwrap();
336
337 assert!(emitted, "must emit again after 250 ms");
338 assert_eq!(channel.emitted().len(), 2);
339 }
340
341 // -----------------------------------------------------------------------
342 // emit_state — always fires
343 // -----------------------------------------------------------------------
344
345 #[test]
346 fn emit_state_always_fires() {
347 let channel = MockChannel::default();
348 emit_state(&channel, "req-3", TransferState::Running).expect("must succeed");
349 emit_state(&channel, "req-3", TransferState::Done).expect("must succeed");
350
351 let events = channel.emitted();
352 assert_eq!(events.len(), 2);
353 assert_eq!(events[0].0, EventKind::TransferState);
354 assert_eq!(events[0].1["state"], "running");
355 assert_eq!(events[1].1["state"], "done");
356 }
357
358 // -----------------------------------------------------------------------
359 // TransferProgressPayload serializes to camelCase
360 // -----------------------------------------------------------------------
361
362 #[test]
363 fn progress_payload_serializes_camel_case() {
364 let channel = MockChannel::default();
365 let mut throttle = ProgressThrottle::new();
366 emit_progress(
367 &channel,
368 "req-4",
369 512,
370 Some(1024),
371 1,
372 4,
373 &mut throttle,
374 now(),
375 )
376 .unwrap();
377 let events = channel.emitted();
378 let payload = &events[0].1;
379 assert!(
380 payload.get("requestId").is_some(),
381 "requestId must be present"
382 );
383 assert!(
384 payload.get("bytesDone").is_some(),
385 "bytesDone must be present"
386 );
387 assert!(
388 payload.get("bytesTotal").is_some(),
389 "bytesTotal must be present"
390 );
391 assert!(
392 payload.get("partsDone").is_some(),
393 "partsDone must be present"
394 );
395 assert!(
396 payload.get("partsTotal").is_some(),
397 "partsTotal must be present"
398 );
399 }
400}