Skip to main content

brows3r_lib/transfers/
mod.rs

1//! Transfer registry and domain types for file downloads and uploads.
2//!
3//! # Architecture
4//!
5//! - [`Transfer`]              — per-transfer state record.
6//! - [`TransferKind`]          — discriminates Download vs Upload.
7//! - [`TransferState`]         — lifecycle state machine (Queued → Running → terminal).
8//! - [`TransferRegistry`]      — in-memory registry with cancellation tokens.
9//! - [`TransferRegistryHandle`]— `Arc<RwLock<TransferRegistry>>` for Tauri state.
10//! - [`TransferSpec`]          — discriminated union describing what to enqueue.
11//! - [`TransferFilter`]        — filter enum for `transfer_list`.
12//! - [`TransferQueue`]         — concurrency-capped scheduling layer over `TransferRegistry`.
13//! - [`TransferQueueHandle`]   — `Arc<TransferQueue>` for Tauri state.
14//!
15//! # OCP contract
16//!
17//! - `Transfer` gains `checksum`, `priority`, `retries` as optional fields later
18//!   with no breaking change to existing call sites.
19//! - `TransferRegistry` is decoupled from transfer logic — upload (task 32) reuses
20//!   the same registry without modification.
21//! - `TransferState` uses 5 variants as per design.md events line 397.
22//! - `TransferSpec` is an open enum — new kinds (`Move`, `Copy`) are additive.
23//! - `TransferFilter` is extensible without breaking existing call sites.
24//! - The concurrency cap is a single `Arc<Semaphore>` — rebuilding on settings
25//!   changes is one `rebuild_semaphore` call.
26
27pub mod download;
28pub mod notify;
29pub mod progress;
30pub mod upload;
31
32use std::{collections::HashMap, path::PathBuf, sync::Arc};
33
34use serde::{Deserialize, Serialize};
35use tokio::sync::{oneshot, RwLock, Semaphore};
36use uuid::Uuid;
37
38use crate::{
39    error::AppError,
40    ids::{BucketId, ProfileId},
41};
42
43// ---------------------------------------------------------------------------
44// TransferKind
45// ---------------------------------------------------------------------------
46
47/// Discriminates between download and upload transfers.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum TransferKind {
51    Download,
52    Upload,
53}
54
55// ---------------------------------------------------------------------------
56// TransferState
57// ---------------------------------------------------------------------------
58
59/// Lifecycle state of a transfer.
60///
61/// Serialized as snake_case to match design.md events line 397:
62/// `queued | running | done | failed | canceled`
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub enum TransferState {
66    Queued,
67    Running,
68    Done,
69    Failed,
70    Canceled,
71}
72
73// ---------------------------------------------------------------------------
74// Transfer
75// ---------------------------------------------------------------------------
76
77/// Full state record for one download or upload transfer.
78///
79/// OCP: adding `checksum`, `priority`, or `retries` is a non-breaking additive
80/// change — existing call sites are unaffected.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(rename_all = "camelCase")]
83pub struct Transfer {
84    /// UUID v4 request identifier returned to the frontend.
85    pub id: String,
86    pub kind: TransferKind,
87    pub profile_id: ProfileId,
88    pub bucket: BucketId,
89    /// S3 object key.
90    pub key: String,
91    /// Source local path for uploads; `None` for downloads.
92    pub source_path: Option<PathBuf>,
93    /// Destination local path for downloads; `None` for uploads.
94    pub dest_path: Option<PathBuf>,
95    /// Total bytes, if known before the transfer starts.
96    pub total_bytes: Option<u64>,
97    /// Bytes transferred so far.
98    pub transferred_bytes: u64,
99    /// Multipart parts completed so far.
100    pub parts_done: u32,
101    /// Total multipart parts, if applicable.
102    pub parts_total: u32,
103    pub state: TransferState,
104    /// Unix timestamp (milliseconds) when the transfer was registered.
105    pub started_at: i64,
106    /// Unix timestamp (milliseconds) when the transfer reached a terminal state.
107    pub finished_at: Option<i64>,
108    /// Error details when `state` is `Failed`. AppError serializes one-way
109    /// (backend → frontend) via its custom `Serialize` impl; we skip on
110    /// deserialize so the Transfer struct can still be deserialized when it
111    /// crosses IPC the other direction (e.g. test fixtures).
112    #[serde(skip_deserializing, default)]
113    pub error: Option<AppError>,
114}
115
116// ---------------------------------------------------------------------------
117// CancelToken — oneshot-based cancellation
118// ---------------------------------------------------------------------------
119
120/// Sender side of a cancellation signal for one transfer.
121///
122/// Held by `TransferRegistry`; the download/upload task holds the receiver.
123pub struct CancelToken(pub oneshot::Sender<()>);
124
125// ---------------------------------------------------------------------------
126// TransferRegistry
127// ---------------------------------------------------------------------------
128
129/// In-memory registry of active and recently-completed transfers.
130///
131/// Thread-safe via `Arc<RwLock<TransferRegistry>>`.  Commands register a
132/// transfer, spawn the work, and the work task calls back via `update`.
133pub struct TransferRegistry {
134    transfers: HashMap<String, Transfer>,
135    cancel_tokens: HashMap<String, CancelToken>,
136}
137
138impl TransferRegistry {
139    pub fn new() -> Self {
140        Self {
141            transfers: HashMap::new(),
142            cancel_tokens: HashMap::new(),
143        }
144    }
145
146    /// Register a new transfer and return its `id` (UUID v4).
147    ///
148    /// Returns `(id, cancel_receiver)` — the caller spawns the task with
149    /// the receiver so it can detect a cancel signal.
150    pub fn register(&mut self, transfer: Transfer) -> (String, oneshot::Receiver<()>) {
151        let id = transfer.id.clone();
152        let (tx, rx) = oneshot::channel::<()>();
153        self.transfers.insert(id.clone(), transfer);
154        self.cancel_tokens.insert(id.clone(), CancelToken(tx));
155        (id, rx)
156    }
157
158    /// Apply a mutator closure to the transfer with `id`.
159    ///
160    /// Returns `AppError::NotFound` when the id is unknown.
161    pub fn update<F>(&mut self, id: &str, mutator: F) -> Result<(), AppError>
162    where
163        F: FnOnce(&mut Transfer),
164    {
165        match self.transfers.get_mut(id) {
166            Some(t) => {
167                mutator(t);
168                Ok(())
169            }
170            None => Err(AppError::NotFound {
171                resource: format!("transfer:{id}"),
172            }),
173        }
174    }
175
176    /// Send the cancellation signal to the transfer with `id`.
177    ///
178    /// The cancel token is consumed on first call; subsequent calls on the
179    /// same id return `Ok(())` (idempotent).
180    pub fn cancel(&mut self, id: &str) -> Result<(), AppError> {
181        match self.cancel_tokens.remove(id) {
182            Some(token) => {
183                // Receiver may already be dropped (completed transfer).
184                let _ = token.0.send(());
185                Ok(())
186            }
187            None => {
188                // Token already consumed (cancel already sent) or id unknown.
189                if self.transfers.contains_key(id) {
190                    Ok(()) // idempotent
191                } else {
192                    Err(AppError::NotFound {
193                        resource: format!("transfer:{id}"),
194                    })
195                }
196            }
197        }
198    }
199
200    /// Return transfers, optionally filtered by `profile_id`.
201    ///
202    /// `None` → all transfers.
203    pub fn list(&self, profile_filter: Option<&ProfileId>) -> Vec<Transfer> {
204        self.transfers
205            .values()
206            .filter(|t| profile_filter.map(|pf| &t.profile_id == pf).unwrap_or(true))
207            .cloned()
208            .collect()
209    }
210
211    /// Retrieve a single transfer by id.
212    pub fn get(&self, id: &str) -> Option<&Transfer> {
213        self.transfers.get(id)
214    }
215}
216
217impl Default for TransferRegistry {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223// ---------------------------------------------------------------------------
224// TransferRegistryHandle — Arc<RwLock<...>> for Tauri state
225// ---------------------------------------------------------------------------
226
227/// Tauri managed state handle for the transfer registry.
228///
229/// Wraps `Arc<RwLock<TransferRegistry>>` so commands can `.write().await` to
230/// register/update/cancel, or `.read().await` to list/get.
231#[derive(Clone)]
232pub struct TransferRegistryHandle(pub Arc<RwLock<TransferRegistry>>);
233
234impl TransferRegistryHandle {
235    pub fn new(registry: TransferRegistry) -> Self {
236        Self(Arc::new(RwLock::new(registry)))
237    }
238
239    /// Borrow the inner `RwLock<TransferRegistry>` for read/write access.
240    pub fn inner(&self) -> &RwLock<TransferRegistry> {
241        &self.0
242    }
243}
244
245impl Default for TransferRegistryHandle {
246    fn default() -> Self {
247        Self::new(TransferRegistry::new())
248    }
249}
250
251// ---------------------------------------------------------------------------
252// TransferSpec — open enum describing a transfer to enqueue
253// ---------------------------------------------------------------------------
254
255/// Discriminated union describing a transfer that can be enqueued into
256/// [`TransferQueue`].
257///
258/// OCP: new kinds (`Move`, `Copy`) are added as additional variants here
259/// without touching existing call sites that match on `Upload`/`Download`.
260#[derive(Debug, Clone, Serialize, Deserialize)]
261#[serde(tag = "kind", rename_all = "snake_case")]
262pub enum TransferSpec {
263    Upload {
264        profile: ProfileId,
265        bucket: BucketId,
266        key: String,
267        source_path: PathBuf,
268    },
269    Download {
270        profile: ProfileId,
271        bucket: BucketId,
272        key: String,
273        dest_path: PathBuf,
274    },
275}
276
277// ---------------------------------------------------------------------------
278// TransferFilter — extensible filter for transfer_list
279// ---------------------------------------------------------------------------
280
281/// Filter applied by `transfer_list` to narrow the returned set.
282///
283/// OCP: new filter variants (e.g. `ByBucket(BucketId)`) are additive.
284#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
285#[serde(rename_all = "snake_case")]
286pub enum TransferFilter {
287    /// Running or Queued transfers.
288    Active,
289    /// Transfers in terminal state Done.
290    Completed,
291    /// Transfers in terminal state Failed.
292    Failed,
293    /// All transfers regardless of state.
294    All,
295}
296
297impl TransferFilter {
298    fn matches(&self, t: &Transfer) -> bool {
299        match self {
300            Self::Active => t.state == TransferState::Queued || t.state == TransferState::Running,
301            Self::Completed => t.state == TransferState::Done,
302            Self::Failed => t.state == TransferState::Failed,
303            Self::All => true,
304        }
305    }
306}
307
308// ---------------------------------------------------------------------------
309// TransferQueue — concurrency-capped scheduling layer
310// ---------------------------------------------------------------------------
311
312/// Concurrency-capped transfer queue wrapping [`TransferRegistry`].
313///
314/// The [`Semaphore`] limits how many transfers run simultaneously.  The
315/// semaphore is acquired **inside** the spawned task, so `enqueue` always
316/// returns immediately with a `request_id`.
317///
318/// When `transfer_concurrency` changes in settings, call `rebuild_semaphore`
319/// to replace the semaphore with a new one at the new width.
320pub struct TransferQueue {
321    registry: Arc<RwLock<TransferRegistry>>,
322    semaphore: std::sync::RwLock<Arc<Semaphore>>,
323}
324
325impl TransferQueue {
326    /// Create a new queue with the given concurrency cap.
327    pub fn new(concurrency: u32) -> Self {
328        let cap = (concurrency as usize).max(1);
329        Self {
330            registry: Arc::new(RwLock::new(TransferRegistry::new())),
331            semaphore: std::sync::RwLock::new(Arc::new(Semaphore::new(cap))),
332        }
333    }
334
335    /// Shared reference to the inner `RwLock<TransferRegistry>`.
336    pub fn registry(&self) -> &RwLock<TransferRegistry> {
337        &self.registry
338    }
339
340    /// Replace the semaphore with a new one at `new_concurrency`.
341    ///
342    /// In-flight transfers hold their old permits until they finish; the new
343    /// semaphore only affects newly spawned transfers.
344    pub fn rebuild_semaphore(&self, new_concurrency: u32) {
345        let cap = (new_concurrency as usize).max(1);
346        let mut guard = self.semaphore.write().expect("semaphore lock poisoned");
347        *guard = Arc::new(Semaphore::new(cap));
348    }
349
350    /// Acquire a clone of the current semaphore.
351    ///
352    /// The returned `Arc<Semaphore>` is the one that was active at the moment
353    /// of the call.  Spawned tasks hold this arc; a `rebuild_semaphore` call
354    /// does not invalidate existing permit holders.
355    pub fn current_semaphore(&self) -> Arc<Semaphore> {
356        Arc::clone(&*self.semaphore.read().expect("semaphore lock poisoned"))
357    }
358
359    /// Return a [`TransferRegistryHandle`] that shares the same underlying
360    /// registry, suitable for passing to `download_object` / `upload_object`.
361    pub fn registry_handle(&self) -> TransferRegistryHandle {
362        TransferRegistryHandle(Arc::clone(&self.registry))
363    }
364
365    /// List transfers, optionally filtering by state.
366    ///
367    /// `filter` = `None` is equivalent to `TransferFilter::All`.
368    pub async fn list(&self, filter: Option<TransferFilter>) -> Vec<Transfer> {
369        let reg = self.registry.read().await;
370        let f = filter.unwrap_or(TransferFilter::All);
371        reg.transfers
372            .values()
373            .filter(|t| f.matches(t))
374            .cloned()
375            .collect()
376    }
377
378    /// Cancel the transfer with `id`.
379    pub async fn cancel(&self, id: &str) -> Result<(), AppError> {
380        let mut reg = self.registry.write().await;
381        reg.cancel(id)
382    }
383}
384
385impl Default for TransferQueue {
386    fn default() -> Self {
387        Self::new(4)
388    }
389}
390
391// ---------------------------------------------------------------------------
392// TransferQueueHandle — Arc<TransferQueue> for Tauri state
393// ---------------------------------------------------------------------------
394
395/// Tauri managed-state handle for the transfer queue.
396///
397/// Cloning is cheap — all clones share the same underlying `TransferQueue`.
398#[derive(Clone)]
399pub struct TransferQueueHandle(pub Arc<TransferQueue>);
400
401impl TransferQueueHandle {
402    pub fn new(queue: TransferQueue) -> Self {
403        Self(Arc::new(queue))
404    }
405}
406
407impl Default for TransferQueueHandle {
408    fn default() -> Self {
409        Self::new(TransferQueue::default())
410    }
411}
412
413// ---------------------------------------------------------------------------
414// Helper: mint a new Transfer id
415// ---------------------------------------------------------------------------
416
417pub fn new_transfer_id() -> String {
418    Uuid::new_v4().to_string()
419}
420
421// ---------------------------------------------------------------------------
422// Tests
423// ---------------------------------------------------------------------------
424
425#[cfg(test)]
426mod tests {
427    use super::*;
428
429    fn profile() -> ProfileId {
430        ProfileId::new("p1")
431    }
432
433    fn bucket() -> BucketId {
434        BucketId::new("my-bucket")
435    }
436
437    fn now_ms() -> i64 {
438        1_700_000_000_000
439    }
440
441    fn make_transfer(id: &str) -> Transfer {
442        Transfer {
443            id: id.to_owned(),
444            kind: TransferKind::Download,
445            profile_id: profile(),
446            bucket: bucket(),
447            key: "test/file.bin".to_string(),
448            source_path: None,
449            dest_path: Some(PathBuf::from("/tmp/file.bin")),
450            total_bytes: Some(1_048_576),
451            transferred_bytes: 0,
452            parts_done: 0,
453            parts_total: 0,
454            state: TransferState::Queued,
455            started_at: now_ms(),
456            finished_at: None,
457            error: None,
458        }
459    }
460
461    // -----------------------------------------------------------------------
462    // Register
463    // -----------------------------------------------------------------------
464
465    #[test]
466    fn register_returns_id_and_cancel_receiver() {
467        let mut registry = TransferRegistry::new();
468        let t = make_transfer("xfer-001");
469        let (id, _rx) = registry.register(t);
470        assert_eq!(id, "xfer-001");
471        assert!(registry.get("xfer-001").is_some());
472    }
473
474    #[test]
475    fn register_multiple_transfers() {
476        let mut registry = TransferRegistry::new();
477        registry.register(make_transfer("t1"));
478        registry.register(make_transfer("t2"));
479        assert_eq!(registry.list(None).len(), 2);
480    }
481
482    // -----------------------------------------------------------------------
483    // Update
484    // -----------------------------------------------------------------------
485
486    #[test]
487    fn update_mutates_transfer_state() {
488        let mut registry = TransferRegistry::new();
489        registry.register(make_transfer("t-upd"));
490
491        registry
492            .update("t-upd", |t| {
493                t.transferred_bytes = 256_000;
494                t.state = TransferState::Running;
495            })
496            .expect("update must succeed");
497
498        let t = registry.get("t-upd").unwrap();
499        assert_eq!(t.transferred_bytes, 256_000);
500        assert_eq!(t.state, TransferState::Running);
501    }
502
503    #[test]
504    fn update_unknown_id_returns_not_found() {
505        let mut registry = TransferRegistry::new();
506        let err = registry
507            .update("nonexistent", |_| {})
508            .expect_err("update on missing id must fail");
509        match err {
510            AppError::NotFound { resource } => {
511                assert!(resource.contains("nonexistent"));
512            }
513            other => panic!("expected NotFound, got {:?}", other),
514        }
515    }
516
517    // -----------------------------------------------------------------------
518    // Cancel
519    // -----------------------------------------------------------------------
520
521    #[tokio::test]
522    async fn cancel_sends_signal_to_receiver() {
523        let mut registry = TransferRegistry::new();
524        let (_, rx) = registry.register(make_transfer("t-cancel"));
525
526        registry.cancel("t-cancel").expect("cancel must succeed");
527
528        // The receiver should immediately resolve.
529        let result = rx.await;
530        assert!(result.is_ok(), "cancel receiver must fire");
531    }
532
533    #[test]
534    fn cancel_twice_is_idempotent() {
535        let mut registry = TransferRegistry::new();
536        registry.register(make_transfer("t-idem"));
537
538        registry
539            .cancel("t-idem")
540            .expect("first cancel must succeed");
541        // Second cancel: token already consumed but transfer still exists.
542        registry
543            .cancel("t-idem")
544            .expect("second cancel must be idempotent");
545    }
546
547    #[test]
548    fn cancel_unknown_id_returns_not_found() {
549        let mut registry = TransferRegistry::new();
550        let err = registry
551            .cancel("ghost-id")
552            .expect_err("cancel on missing id must fail");
553        match err {
554            AppError::NotFound { .. } => {}
555            other => panic!("expected NotFound, got {:?}", other),
556        }
557    }
558
559    // -----------------------------------------------------------------------
560    // List with profile filter
561    // -----------------------------------------------------------------------
562
563    #[test]
564    fn list_filters_by_profile() {
565        let mut registry = TransferRegistry::new();
566
567        let mut t2 = make_transfer("t-other");
568        t2.profile_id = ProfileId::new("other-profile");
569        registry.register(make_transfer("t-p1"));
570        registry.register(t2);
571
572        let filtered = registry.list(Some(&profile()));
573        assert_eq!(filtered.len(), 1);
574        assert_eq!(filtered[0].id, "t-p1");
575    }
576
577    #[test]
578    fn list_none_filter_returns_all() {
579        let mut registry = TransferRegistry::new();
580        registry.register(make_transfer("a"));
581        registry.register(make_transfer("b"));
582        assert_eq!(registry.list(None).len(), 2);
583    }
584
585    // -----------------------------------------------------------------------
586    // TransferState serialization
587    // -----------------------------------------------------------------------
588
589    #[test]
590    fn transfer_state_serializes_as_snake_case() {
591        let cases = [
592            (TransferState::Queued, "queued"),
593            (TransferState::Running, "running"),
594            (TransferState::Done, "done"),
595            (TransferState::Failed, "failed"),
596            (TransferState::Canceled, "canceled"),
597        ];
598        for (state, expected) in &cases {
599            let v = serde_json::to_value(state).expect("must serialize");
600            assert_eq!(v.as_str().unwrap(), *expected, "state {:?}", state);
601        }
602    }
603
604    // -----------------------------------------------------------------------
605    // TransferKind serialization
606    // -----------------------------------------------------------------------
607
608    #[test]
609    fn transfer_kind_serializes_as_snake_case() {
610        assert_eq!(
611            serde_json::to_value(TransferKind::Download)
612                .unwrap()
613                .as_str()
614                .unwrap(),
615            "download"
616        );
617        assert_eq!(
618            serde_json::to_value(TransferKind::Upload)
619                .unwrap()
620                .as_str()
621                .unwrap(),
622            "upload"
623        );
624    }
625
626    // -----------------------------------------------------------------------
627    // TransferFilter
628    // -----------------------------------------------------------------------
629
630    #[test]
631    fn filter_active_matches_queued_and_running() {
632        let mut t_q = make_transfer("q");
633        t_q.state = TransferState::Queued;
634        let mut t_r = make_transfer("r");
635        t_r.state = TransferState::Running;
636        let mut t_d = make_transfer("d");
637        t_d.state = TransferState::Done;
638
639        assert!(TransferFilter::Active.matches(&t_q));
640        assert!(TransferFilter::Active.matches(&t_r));
641        assert!(!TransferFilter::Active.matches(&t_d));
642    }
643
644    #[test]
645    fn filter_completed_matches_done_only() {
646        let mut t_d = make_transfer("d");
647        t_d.state = TransferState::Done;
648        let mut t_f = make_transfer("f");
649        t_f.state = TransferState::Failed;
650
651        assert!(TransferFilter::Completed.matches(&t_d));
652        assert!(!TransferFilter::Completed.matches(&t_f));
653    }
654
655    #[test]
656    fn filter_failed_matches_failed_only() {
657        let mut t_f = make_transfer("f");
658        t_f.state = TransferState::Failed;
659        let mut t_d = make_transfer("d");
660        t_d.state = TransferState::Done;
661
662        assert!(TransferFilter::Failed.matches(&t_f));
663        assert!(!TransferFilter::Failed.matches(&t_d));
664    }
665
666    #[test]
667    fn filter_all_matches_every_state() {
668        for state in [
669            TransferState::Queued,
670            TransferState::Running,
671            TransferState::Done,
672            TransferState::Failed,
673            TransferState::Canceled,
674        ] {
675            let mut t = make_transfer("any");
676            t.state = state;
677            assert!(TransferFilter::All.matches(&t));
678        }
679    }
680
681    // -----------------------------------------------------------------------
682    // TransferQueue — concurrency cap
683    // -----------------------------------------------------------------------
684
685    /// Assert that at most `cap` transfers can be Running at the same time.
686    ///
687    /// We enqueue 6 long-running tasks with cap=2.  Each task sleeps briefly
688    /// then marks itself Done.  We verify that just before the first batch
689    /// finishes, the running count does not exceed 2.
690    #[tokio::test]
691    async fn concurrency_cap_limits_running_transfers() {
692        use std::sync::atomic::{AtomicUsize, Ordering};
693
694        const CAP: usize = 2;
695        const TOTAL: usize = 6;
696
697        let queue = Arc::new(TransferQueue::new(CAP as u32));
698        let active_count = Arc::new(AtomicUsize::new(0));
699        let peak_concurrent = Arc::new(AtomicUsize::new(0));
700        let sem = queue.current_semaphore();
701
702        let mut handles = Vec::new();
703
704        for i in 0..TOTAL {
705            let sem_clone = Arc::clone(&sem);
706            let active_clone = Arc::clone(&active_count);
707            let peak_clone = Arc::clone(&peak_concurrent);
708            let id = format!("t{i}");
709
710            // Register into the registry so we can track state.
711            {
712                let t = Transfer {
713                    id: id.clone(),
714                    kind: TransferKind::Download,
715                    profile_id: profile(),
716                    bucket: bucket(),
717                    key: format!("key/{i}"),
718                    source_path: None,
719                    dest_path: Some(PathBuf::from(format!("/tmp/{id}"))),
720                    total_bytes: None,
721                    transferred_bytes: 0,
722                    parts_done: 0,
723                    parts_total: 0,
724                    state: TransferState::Queued,
725                    started_at: now_ms(),
726                    finished_at: None,
727                    error: None,
728                };
729                let mut reg = queue.registry.write().await;
730                reg.register(t);
731            }
732
733            let handle = tokio::spawn(async move {
734                // Semaphore acquisition is inside the task — simulates the queue.
735                let _permit = sem_clone
736                    .acquire()
737                    .await
738                    .expect("semaphore must not be closed");
739
740                let prev = active_clone.fetch_add(1, Ordering::SeqCst);
741                let current = prev + 1;
742                // Atomically track peak concurrent.
743                peak_clone.fetch_max(current, Ordering::SeqCst);
744
745                // Simulate brief work.
746                tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
747
748                active_clone.fetch_sub(1, Ordering::SeqCst);
749            });
750
751            handles.push(handle);
752        }
753
754        for h in handles {
755            h.await.expect("task must not panic");
756        }
757
758        let peak = peak_concurrent.load(Ordering::SeqCst);
759        assert!(
760            peak <= CAP,
761            "peak concurrent ({peak}) must not exceed cap ({CAP})"
762        );
763    }
764
765    // -----------------------------------------------------------------------
766    // TransferQueue::list with filter
767    // -----------------------------------------------------------------------
768
769    #[tokio::test]
770    async fn queue_list_filter_active() {
771        let queue = TransferQueue::new(4);
772        let mut t_active = make_transfer("a");
773        t_active.state = TransferState::Running;
774        let mut t_done = make_transfer("b");
775        t_done.state = TransferState::Done;
776
777        {
778            let mut reg = queue.registry.write().await;
779            reg.register(t_active);
780            reg.register(t_done);
781        }
782
783        let active = queue.list(Some(TransferFilter::Active)).await;
784        assert_eq!(active.len(), 1);
785        assert_eq!(active[0].id, "a");
786    }
787}