Skip to main content

brows3r_lib/transfers/
progress.rs

1//! Progress event emission helpers for transfers.
2//!
3//! # Design
4//!
5//! `emit_progress` throttles emissions to at most one per 250 ms **or** per
6//! 256 KB transferred, whichever condition is met first.  This avoids flooding
7//! the IPC channel while still giving the frontend a responsive progress bar.
8//!
9//! `emit_state` is unthrottled — state transitions (Queued → Running → Done /
10//! Failed / Canceled) must always reach the frontend.
11//!
12//! # OCP contract
13//!
14//! The same helpers are reused for uploads (task 32) — the throttle logic lives
15//! in one place and is called identically from both `download.rs` and
16//! `upload.rs`.
17
18use serde::Serialize;
19
20use crate::{
21    error::AppError,
22    events::{EventEmitter, EventKind},
23    transfers::TransferState,
24};
25
26// ---------------------------------------------------------------------------
27// Throttle constants
28// ---------------------------------------------------------------------------
29
30/// Minimum milliseconds between progress events for one transfer.
31pub const PROGRESS_THROTTLE_MS: i64 = 250;
32
33/// Minimum bytes transferred between progress events.
34pub const PROGRESS_THROTTLE_BYTES: u64 = 262_144; // 256 KB
35
36// ---------------------------------------------------------------------------
37// Event payloads
38// ---------------------------------------------------------------------------
39
40/// Payload for `transfer:progress`.
41#[derive(Debug, Clone, Serialize)]
42#[serde(rename_all = "camelCase")]
43pub struct TransferProgressPayload {
44    pub request_id: String,
45    pub bytes_done: u64,
46    pub bytes_total: Option<u64>,
47    pub parts_done: u32,
48    pub parts_total: u32,
49}
50
51/// Payload for `transfer:state`.
52///
53/// Carries the AppError when the transition is to `Failed`. The frontend
54/// stores this on the transfer record so TransferRow can render the
55/// failure reason inline instead of just a red badge.
56#[derive(Debug, Clone, Serialize)]
57#[serde(rename_all = "camelCase")]
58pub struct TransferStatePayload {
59    pub request_id: String,
60    pub state: TransferState,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub error: Option<AppError>,
63}
64
65// ---------------------------------------------------------------------------
66// ProgressThrottle — per-transfer state tracker
67// ---------------------------------------------------------------------------
68
69/// Tracks when the last progress event was emitted so `emit_progress` can
70/// apply the 250 ms / 256 KB throttle.
71///
72/// Instantiate one `ProgressThrottle` per transfer at the start of the stream
73/// loop and pass it mutably to each `emit_progress` call.
74#[derive(Debug)]
75pub struct ProgressThrottle {
76    /// Millisecond timestamp of the last emitted event (0 = never).
77    pub last_emitted_at_ms: i64,
78    /// `transferred_bytes` value at the time of the last emitted event.
79    pub last_emitted_bytes: u64,
80}
81
82impl ProgressThrottle {
83    /// Create a new throttle state.  `now_ms` is the current time in ms.
84    pub fn new() -> Self {
85        Self {
86            last_emitted_at_ms: 0,
87            last_emitted_bytes: 0,
88        }
89    }
90
91    /// Returns `true` when a progress event should be emitted now.
92    ///
93    /// The gate opens when:
94    /// - no event has been emitted yet (`last_emitted_at_ms == 0`), OR
95    /// - at least 250 ms have elapsed since the last emission, OR
96    /// - at least 256 KB more have been transferred since the last emission.
97    pub fn should_emit(&self, now_ms: i64, bytes_done: u64) -> bool {
98        if self.last_emitted_at_ms == 0 {
99            return true;
100        }
101        let elapsed_ms = now_ms - self.last_emitted_at_ms;
102        let delta_bytes = bytes_done.saturating_sub(self.last_emitted_bytes);
103        elapsed_ms >= PROGRESS_THROTTLE_MS || delta_bytes >= PROGRESS_THROTTLE_BYTES
104    }
105
106    /// Record that an event was just emitted.
107    pub fn record_emission(&mut self, now_ms: i64, bytes_done: u64) {
108        self.last_emitted_at_ms = now_ms;
109        self.last_emitted_bytes = bytes_done;
110    }
111}
112
113impl Default for ProgressThrottle {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119// ---------------------------------------------------------------------------
120// emit_progress
121// ---------------------------------------------------------------------------
122
123/// Emit a `transfer:progress` event if the throttle gate is open.
124///
125/// Updates `throttle` on emission.  `now_ms` is the current wall-clock time
126/// in milliseconds (callers obtain this via
127/// `std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64`).
128///
129/// Returns `Ok(true)` when an event was emitted, `Ok(false)` when throttled.
130pub fn emit_progress<E: EventEmitter>(
131    channel: &E,
132    request_id: &str,
133    bytes_done: u64,
134    bytes_total: Option<u64>,
135    parts_done: u32,
136    parts_total: u32,
137    throttle: &mut ProgressThrottle,
138    now_ms: i64,
139) -> Result<bool, AppError> {
140    if !throttle.should_emit(now_ms, bytes_done) {
141        return Ok(false);
142    }
143
144    crate::events::emit(
145        channel,
146        EventKind::TransferProgress,
147        TransferProgressPayload {
148            request_id: request_id.to_owned(),
149            bytes_done,
150            bytes_total,
151            parts_done,
152            parts_total,
153        },
154    )?;
155
156    throttle.record_emission(now_ms, bytes_done);
157    Ok(true)
158}
159
160// ---------------------------------------------------------------------------
161// emit_state
162// ---------------------------------------------------------------------------
163
164/// Emit a `transfer:state` event (unthrottled). Use
165/// [`emit_state_with_error`] on `Failed` transitions to attach the
166/// underlying `AppError` to the payload so the frontend can render the
167/// failure reason.
168pub fn emit_state<E: EventEmitter>(
169    channel: &E,
170    request_id: &str,
171    state: TransferState,
172) -> Result<(), AppError> {
173    emit_state_with_error(channel, request_id, state, None)
174}
175
176/// Emit a `transfer:state` event with an optional `AppError` attached.
177///
178/// Intended for `Failed` transitions: pass the underlying error so the
179/// frontend's `applyStateEvent` can hydrate the failure reason onto the
180/// transfer record. Non-Failed callers should keep using `emit_state`.
181pub fn emit_state_with_error<E: EventEmitter>(
182    channel: &E,
183    request_id: &str,
184    state: TransferState,
185    error: Option<AppError>,
186) -> Result<(), AppError> {
187    crate::events::emit(
188        channel,
189        EventKind::TransferState,
190        TransferStatePayload {
191            request_id: request_id.to_owned(),
192            state,
193            error,
194        },
195    )
196}
197
198// ---------------------------------------------------------------------------
199// Tests
200// ---------------------------------------------------------------------------
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::events::{EventKind, MockChannel};
206
207    fn now() -> i64 {
208        1_700_000_000_000_i64
209    }
210
211    // -----------------------------------------------------------------------
212    // ProgressThrottle::should_emit
213    // -----------------------------------------------------------------------
214
215    #[test]
216    fn throttle_always_emits_first_event() {
217        let throttle = ProgressThrottle::new();
218        assert!(throttle.should_emit(now(), 0));
219    }
220
221    #[test]
222    fn throttle_blocks_within_window_and_threshold() {
223        let mut throttle = ProgressThrottle::new();
224        throttle.record_emission(now(), 0);
225
226        // 100 ms later, only 1 KB transferred — both conditions unmet.
227        let blocked = !throttle.should_emit(now() + 100, 1024);
228        assert!(blocked, "should NOT emit within throttle window");
229    }
230
231    #[test]
232    fn throttle_opens_after_250ms() {
233        let mut throttle = ProgressThrottle::new();
234        throttle.record_emission(now(), 0);
235
236        // Exactly 250 ms later.
237        assert!(
238            throttle.should_emit(now() + 250, 1024),
239            "should emit after 250 ms"
240        );
241    }
242
243    #[test]
244    fn throttle_opens_after_256kb() {
245        let mut throttle = ProgressThrottle::new();
246        throttle.record_emission(now(), 0);
247
248        // Only 100 ms elapsed, but 256 KB transferred.
249        assert!(
250            throttle.should_emit(now() + 100, 262_144),
251            "should emit after 256 KB"
252        );
253    }
254
255    // -----------------------------------------------------------------------
256    // emit_progress collapses multiple rapid calls into one
257    // -----------------------------------------------------------------------
258
259    #[test]
260    fn rapid_progress_calls_collapse_into_one_emission() {
261        let channel = MockChannel::default();
262        let mut throttle = ProgressThrottle::new();
263        let t0 = now();
264
265        // First call — always emitted.
266        let emitted = emit_progress(
267            &channel,
268            "req-1",
269            0,
270            Some(1_000_000),
271            0,
272            0,
273            &mut throttle,
274            t0,
275        )
276        .expect("emit must not error");
277        assert!(emitted, "first call must emit");
278
279        // Five calls within 100 ms and < 256 KB each — all throttled.
280        for i in 1_u64..=5 {
281            let emitted = emit_progress(
282                &channel,
283                "req-1",
284                i * 10_000, // only 10 KB per step
285                Some(1_000_000),
286                0,
287                0,
288                &mut throttle,
289                t0 + 20 * i as i64, // 20 ms apart
290            )
291            .expect("emit must not error");
292            assert!(!emitted, "call {i} must be throttled");
293        }
294
295        // Only one event should have been emitted.
296        let emitted_events = channel.emitted();
297        assert_eq!(
298            emitted_events.len(),
299            1,
300            "rapid calls must collapse into one emission"
301        );
302        assert_eq!(emitted_events[0].0, EventKind::TransferProgress);
303        assert_eq!(emitted_events[0].1["requestId"], "req-1");
304        assert_eq!(emitted_events[0].1["bytesDone"], 0_u64);
305    }
306
307    #[test]
308    fn progress_emits_again_after_250ms() {
309        let channel = MockChannel::default();
310        let mut throttle = ProgressThrottle::new();
311        let t0 = now();
312
313        emit_progress(
314            &channel,
315            "req-2",
316            0,
317            Some(1_000_000),
318            0,
319            0,
320            &mut throttle,
321            t0,
322        )
323        .unwrap();
324        // Advance 250 ms.
325        let emitted = emit_progress(
326            &channel,
327            "req-2",
328            256_000,
329            Some(1_000_000),
330            0,
331            0,
332            &mut throttle,
333            t0 + 250,
334        )
335        .unwrap();
336
337        assert!(emitted, "must emit again after 250 ms");
338        assert_eq!(channel.emitted().len(), 2);
339    }
340
341    // -----------------------------------------------------------------------
342    // emit_state — always fires
343    // -----------------------------------------------------------------------
344
345    #[test]
346    fn emit_state_always_fires() {
347        let channel = MockChannel::default();
348        emit_state(&channel, "req-3", TransferState::Running).expect("must succeed");
349        emit_state(&channel, "req-3", TransferState::Done).expect("must succeed");
350
351        let events = channel.emitted();
352        assert_eq!(events.len(), 2);
353        assert_eq!(events[0].0, EventKind::TransferState);
354        assert_eq!(events[0].1["state"], "running");
355        assert_eq!(events[1].1["state"], "done");
356    }
357
358    // -----------------------------------------------------------------------
359    // TransferProgressPayload serializes to camelCase
360    // -----------------------------------------------------------------------
361
362    #[test]
363    fn progress_payload_serializes_camel_case() {
364        let channel = MockChannel::default();
365        let mut throttle = ProgressThrottle::new();
366        emit_progress(
367            &channel,
368            "req-4",
369            512,
370            Some(1024),
371            1,
372            4,
373            &mut throttle,
374            now(),
375        )
376        .unwrap();
377        let events = channel.emitted();
378        let payload = &events[0].1;
379        assert!(
380            payload.get("requestId").is_some(),
381            "requestId must be present"
382        );
383        assert!(
384            payload.get("bytesDone").is_some(),
385            "bytesDone must be present"
386        );
387        assert!(
388            payload.get("bytesTotal").is_some(),
389            "bytesTotal must be present"
390        );
391        assert!(
392            payload.get("partsDone").is_some(),
393            "partsDone must be present"
394        );
395        assert!(
396            payload.get("partsTotal").is_some(),
397            "partsTotal must be present"
398        );
399    }
400}