brows3r_lib/transfers/mod.rs
1//! Transfer registry and domain types for file downloads and uploads.
2//!
3//! # Architecture
4//!
5//! - [`Transfer`] — per-transfer state record.
6//! - [`TransferKind`] — discriminates Download vs Upload.
7//! - [`TransferState`] — lifecycle state machine (Queued → Running → terminal).
8//! - [`TransferRegistry`] — in-memory registry with cancellation tokens.
//! - [`TransferRegistryHandle`] — `Arc<RwLock<TransferRegistry>>` for Tauri state.
10//! - [`TransferSpec`] — discriminated union describing what to enqueue.
11//! - [`TransferFilter`] — filter enum for `transfer_list`.
12//! - [`TransferQueue`] — concurrency-capped scheduling layer over `TransferRegistry`.
13//! - [`TransferQueueHandle`] — `Arc<TransferQueue>` for Tauri state.
14//!
15//! # OCP contract
16//!
17//! - `Transfer` gains `checksum`, `priority`, `retries` as optional fields later
18//! with no breaking change to existing call sites.
19//! - `TransferRegistry` is decoupled from transfer logic — upload (task 32) reuses
20//! the same registry without modification.
21//! - `TransferState` uses 5 variants as per design.md events line 397.
22//! - `TransferSpec` is an open enum — new kinds (`Move`, `Copy`) are additive.
23//! - `TransferFilter` is extensible without breaking existing call sites.
24//! - The concurrency cap is a single `Arc<Semaphore>` — rebuilding on settings
25//! changes is one `rebuild_semaphore` call.
26
27pub mod download;
28pub mod notify;
29pub mod progress;
30pub mod upload;
31
32use std::{collections::HashMap, path::PathBuf, sync::Arc};
33
34use serde::{Deserialize, Serialize};
35use tokio::sync::{oneshot, RwLock, Semaphore};
36use uuid::Uuid;
37
38use crate::{
39 error::AppError,
40 ids::{BucketId, ProfileId},
41};
42
43// ---------------------------------------------------------------------------
44// TransferKind
45// ---------------------------------------------------------------------------
46
/// Discriminates between download and upload transfers.
///
/// Serialized by serde with `snake_case` variant names, i.e. `"download"`
/// and `"upload"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferKind {
    /// Transfer whose local side is a destination path (see `Transfer::dest_path`).
    Download,
    /// Transfer whose local side is a source path (see `Transfer::source_path`).
    Upload,
}
54
55// ---------------------------------------------------------------------------
56// TransferState
57// ---------------------------------------------------------------------------
58
/// Lifecycle state of a transfer.
///
/// Serialized as snake_case to match design.md events line 397:
/// `queued | running | done | failed | canceled`
///
/// The module docs describe the lifecycle as `Queued → Running → terminal`;
/// this type itself does not enforce any transition order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferState {
    /// First state in the documented lifecycle.
    Queued,
    /// Second state in the documented lifecycle.
    Running,
    /// Terminal state; selected by `TransferFilter::Completed`.
    Done,
    /// Terminal state; selected by `TransferFilter::Failed`.
    Failed,
    /// Serialized as `canceled`. NOTE(review): presumably set by the worker
    /// task after it observes the cancel signal — nothing in this file
    /// assigns it; confirm against the download/upload tasks.
    Canceled,
}
72
73// ---------------------------------------------------------------------------
74// Transfer
75// ---------------------------------------------------------------------------
76
/// Full state record for one download or upload transfer.
///
/// Field names are serialized in `camelCase` (e.g. `profileId`,
/// `transferredBytes`).
///
/// OCP: adding `checksum`, `priority`, or `retries` is a non-breaking additive
/// change — existing call sites are unaffected.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Transfer {
    /// UUID v4 request identifier returned to the frontend.
    pub id: String,
    /// Whether this record describes a download or an upload.
    pub kind: TransferKind,
    /// Profile the transfer belongs to; `TransferRegistry::list` filters on it.
    pub profile_id: ProfileId,
    /// Bucket that holds (or will hold) the object.
    pub bucket: BucketId,
    /// S3 object key.
    pub key: String,
    /// Source local path for uploads; `None` for downloads.
    pub source_path: Option<PathBuf>,
    /// Destination local path for downloads; `None` for uploads.
    pub dest_path: Option<PathBuf>,
    /// Total bytes, if known before the transfer starts.
    pub total_bytes: Option<u64>,
    /// Bytes transferred so far.
    pub transferred_bytes: u64,
    /// Multipart parts completed so far.
    pub parts_done: u32,
    /// Total multipart parts, if applicable.
    ///
    /// Not an `Option`: the fixtures in this module use `0` for transfers
    /// without multipart information.
    pub parts_total: u32,
    /// Current lifecycle state.
    pub state: TransferState,
    /// Unix timestamp (milliseconds) when the transfer was registered.
    pub started_at: i64,
    /// Unix timestamp (milliseconds) when the transfer reached a terminal state.
    pub finished_at: Option<i64>,
    /// Error details when `state` is `Failed`. AppError serializes one-way
    /// (backend → frontend) via its custom `Serialize` impl; we skip on
    /// deserialize so the Transfer struct can still be deserialized when it
    /// crosses IPC the other direction (e.g. test fixtures).
    ///
    /// Because of `skip_deserializing, default`, a deserialized `Transfer`
    /// always has `error == None`.
    #[serde(skip_deserializing, default)]
    pub error: Option<AppError>,
}
115
116// ---------------------------------------------------------------------------
117// CancelToken — oneshot-based cancellation
118// ---------------------------------------------------------------------------
119
/// Sender side of a cancellation signal for one transfer.
///
/// Held by `TransferRegistry`; the download/upload task holds the receiver.
///
/// Wraps a `tokio::sync::oneshot::Sender<()>`, so the signal can be sent at
/// most once; `TransferRegistry::cancel` consumes the token when it fires.
pub struct CancelToken(pub oneshot::Sender<()>);
124
125// ---------------------------------------------------------------------------
126// TransferRegistry
127// ---------------------------------------------------------------------------
128
/// In-memory registry of active and recently-completed transfers.
///
/// Thread-safe via `Arc<RwLock<TransferRegistry>>`. Commands register a
/// transfer, spawn the work, and the work task calls back via `update`.
///
/// Nothing in this type removes entries from `transfers`; records stay in
/// the map for the lifetime of the registry.
pub struct TransferRegistry {
    /// Transfer records keyed by `Transfer::id`.
    transfers: HashMap<String, Transfer>,
    /// Pending cancel senders keyed by transfer id. An entry is removed the
    /// first time `cancel` is called for that id.
    cancel_tokens: HashMap<String, CancelToken>,
}
137
138impl TransferRegistry {
139 pub fn new() -> Self {
140 Self {
141 transfers: HashMap::new(),
142 cancel_tokens: HashMap::new(),
143 }
144 }
145
146 /// Register a new transfer and return its `id` (UUID v4).
147 ///
148 /// Returns `(id, cancel_receiver)` — the caller spawns the task with
149 /// the receiver so it can detect a cancel signal.
150 pub fn register(&mut self, transfer: Transfer) -> (String, oneshot::Receiver<()>) {
151 let id = transfer.id.clone();
152 let (tx, rx) = oneshot::channel::<()>();
153 self.transfers.insert(id.clone(), transfer);
154 self.cancel_tokens.insert(id.clone(), CancelToken(tx));
155 (id, rx)
156 }
157
158 /// Apply a mutator closure to the transfer with `id`.
159 ///
160 /// Returns `AppError::NotFound` when the id is unknown.
161 pub fn update<F>(&mut self, id: &str, mutator: F) -> Result<(), AppError>
162 where
163 F: FnOnce(&mut Transfer),
164 {
165 match self.transfers.get_mut(id) {
166 Some(t) => {
167 mutator(t);
168 Ok(())
169 }
170 None => Err(AppError::NotFound {
171 resource: format!("transfer:{id}"),
172 }),
173 }
174 }
175
176 /// Send the cancellation signal to the transfer with `id`.
177 ///
178 /// The cancel token is consumed on first call; subsequent calls on the
179 /// same id return `Ok(())` (idempotent).
180 pub fn cancel(&mut self, id: &str) -> Result<(), AppError> {
181 match self.cancel_tokens.remove(id) {
182 Some(token) => {
183 // Receiver may already be dropped (completed transfer).
184 let _ = token.0.send(());
185 Ok(())
186 }
187 None => {
188 // Token already consumed (cancel already sent) or id unknown.
189 if self.transfers.contains_key(id) {
190 Ok(()) // idempotent
191 } else {
192 Err(AppError::NotFound {
193 resource: format!("transfer:{id}"),
194 })
195 }
196 }
197 }
198 }
199
200 /// Return transfers, optionally filtered by `profile_id`.
201 ///
202 /// `None` → all transfers.
203 pub fn list(&self, profile_filter: Option<&ProfileId>) -> Vec<Transfer> {
204 self.transfers
205 .values()
206 .filter(|t| profile_filter.map(|pf| &t.profile_id == pf).unwrap_or(true))
207 .cloned()
208 .collect()
209 }
210
211 /// Retrieve a single transfer by id.
212 pub fn get(&self, id: &str) -> Option<&Transfer> {
213 self.transfers.get(id)
214 }
215}
216
217impl Default for TransferRegistry {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223// ---------------------------------------------------------------------------
224// TransferRegistryHandle — Arc<RwLock<...>> for Tauri state
225// ---------------------------------------------------------------------------
226
/// Tauri managed state handle for the transfer registry.
///
/// Wraps `Arc<RwLock<TransferRegistry>>` so commands can `.write().await` to
/// register/update/cancel, or `.read().await` to list/get.
///
/// `Clone` is derived, so cloning the handle clones the `Arc` and every clone
/// refers to the same registry.
#[derive(Clone)]
pub struct TransferRegistryHandle(pub Arc<RwLock<TransferRegistry>>);
233
234impl TransferRegistryHandle {
235 pub fn new(registry: TransferRegistry) -> Self {
236 Self(Arc::new(RwLock::new(registry)))
237 }
238
239 /// Borrow the inner `RwLock<TransferRegistry>` for read/write access.
240 pub fn inner(&self) -> &RwLock<TransferRegistry> {
241 &self.0
242 }
243}
244
245impl Default for TransferRegistryHandle {
246 fn default() -> Self {
247 Self::new(TransferRegistry::new())
248 }
249}
250
251// ---------------------------------------------------------------------------
252// TransferSpec — open enum describing a transfer to enqueue
253// ---------------------------------------------------------------------------
254
/// Discriminated union describing a transfer that can be enqueued into
/// [`TransferQueue`].
///
/// Serde representation: internally tagged with a `kind` field whose value is
/// the snake_case variant name (`"upload"` / `"download"`). `rename_all` on
/// the enum renames the variants only, so field names such as `source_path`
/// are serialized as written.
///
/// OCP: new kinds (`Move`, `Copy`) are added as additional variants here
/// without touching existing call sites that match on `Upload`/`Download`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TransferSpec {
    /// Upload the local file at `source_path` to `bucket`/`key`.
    Upload {
        /// Profile to use for the transfer.
        profile: ProfileId,
        /// Target bucket.
        bucket: BucketId,
        /// Target object key.
        key: String,
        /// Local file to read from.
        source_path: PathBuf,
    },
    /// Download `bucket`/`key` to the local path `dest_path`.
    Download {
        /// Profile to use for the transfer.
        profile: ProfileId,
        /// Source bucket.
        bucket: BucketId,
        /// Source object key.
        key: String,
        /// Local path to write to.
        dest_path: PathBuf,
    },
}
276
277// ---------------------------------------------------------------------------
278// TransferFilter — extensible filter for transfer_list
279// ---------------------------------------------------------------------------
280
/// Filter applied by `transfer_list` to narrow the returned set.
///
/// Serialized with snake_case variant names (`active`, `completed`,
/// `failed`, `all`).
///
/// Note that no dedicated variant selects `TransferState::Canceled`; such
/// transfers are only returned by [`TransferFilter::All`].
///
/// OCP: new filter variants (e.g. `ByBucket(BucketId)`) are additive.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferFilter {
    /// Running or Queued transfers.
    Active,
    /// Transfers in terminal state Done.
    Completed,
    /// Transfers in terminal state Failed.
    Failed,
    /// All transfers regardless of state.
    All,
}
296
297impl TransferFilter {
298 fn matches(&self, t: &Transfer) -> bool {
299 match self {
300 Self::Active => t.state == TransferState::Queued || t.state == TransferState::Running,
301 Self::Completed => t.state == TransferState::Done,
302 Self::Failed => t.state == TransferState::Failed,
303 Self::All => true,
304 }
305 }
306}
307
308// ---------------------------------------------------------------------------
309// TransferQueue — concurrency-capped scheduling layer
310// ---------------------------------------------------------------------------
311
/// Concurrency-capped transfer queue wrapping [`TransferRegistry`].
///
/// The [`Semaphore`] limits how many transfers run simultaneously. The
/// semaphore is acquired **inside** the spawned task, so `enqueue` always
/// returns immediately with a `request_id`.
///
/// When `transfer_concurrency` changes in settings, call `rebuild_semaphore`
/// to replace the semaphore with a new one at the new width.
pub struct TransferQueue {
    /// Registry shared with every `TransferRegistryHandle` returned by
    /// `registry_handle`.
    registry: Arc<RwLock<TransferRegistry>>,
    /// Currently active semaphore. It sits behind a synchronous
    /// `std::sync::RwLock` so `rebuild_semaphore` can swap the `Arc` while
    /// `current_semaphore` hands out clones of it.
    semaphore: std::sync::RwLock<Arc<Semaphore>>,
}
324
325impl TransferQueue {
326 /// Create a new queue with the given concurrency cap.
327 pub fn new(concurrency: u32) -> Self {
328 let cap = (concurrency as usize).max(1);
329 Self {
330 registry: Arc::new(RwLock::new(TransferRegistry::new())),
331 semaphore: std::sync::RwLock::new(Arc::new(Semaphore::new(cap))),
332 }
333 }
334
335 /// Shared reference to the inner `RwLock<TransferRegistry>`.
336 pub fn registry(&self) -> &RwLock<TransferRegistry> {
337 &self.registry
338 }
339
340 /// Replace the semaphore with a new one at `new_concurrency`.
341 ///
342 /// In-flight transfers hold their old permits until they finish; the new
343 /// semaphore only affects newly spawned transfers.
344 pub fn rebuild_semaphore(&self, new_concurrency: u32) {
345 let cap = (new_concurrency as usize).max(1);
346 let mut guard = self.semaphore.write().expect("semaphore lock poisoned");
347 *guard = Arc::new(Semaphore::new(cap));
348 }
349
350 /// Acquire a clone of the current semaphore.
351 ///
352 /// The returned `Arc<Semaphore>` is the one that was active at the moment
353 /// of the call. Spawned tasks hold this arc; a `rebuild_semaphore` call
354 /// does not invalidate existing permit holders.
355 pub fn current_semaphore(&self) -> Arc<Semaphore> {
356 Arc::clone(&*self.semaphore.read().expect("semaphore lock poisoned"))
357 }
358
359 /// Return a [`TransferRegistryHandle`] that shares the same underlying
360 /// registry, suitable for passing to `download_object` / `upload_object`.
361 pub fn registry_handle(&self) -> TransferRegistryHandle {
362 TransferRegistryHandle(Arc::clone(&self.registry))
363 }
364
365 /// List transfers, optionally filtering by state.
366 ///
367 /// `filter` = `None` is equivalent to `TransferFilter::All`.
368 pub async fn list(&self, filter: Option<TransferFilter>) -> Vec<Transfer> {
369 let reg = self.registry.read().await;
370 let f = filter.unwrap_or(TransferFilter::All);
371 reg.transfers
372 .values()
373 .filter(|t| f.matches(t))
374 .cloned()
375 .collect()
376 }
377
378 /// Cancel the transfer with `id`.
379 pub async fn cancel(&self, id: &str) -> Result<(), AppError> {
380 let mut reg = self.registry.write().await;
381 reg.cancel(id)
382 }
383}
384
385impl Default for TransferQueue {
386 fn default() -> Self {
387 Self::new(4)
388 }
389}
390
391// ---------------------------------------------------------------------------
392// TransferQueueHandle — Arc<TransferQueue> for Tauri state
393// ---------------------------------------------------------------------------
394
/// Tauri managed-state handle for the transfer queue.
///
/// Cloning is cheap — all clones share the same underlying `TransferQueue`
/// because only the inner `Arc` is cloned.
#[derive(Clone)]
pub struct TransferQueueHandle(pub Arc<TransferQueue>);
400
401impl TransferQueueHandle {
402 pub fn new(queue: TransferQueue) -> Self {
403 Self(Arc::new(queue))
404 }
405}
406
407impl Default for TransferQueueHandle {
408 fn default() -> Self {
409 Self::new(TransferQueue::default())
410 }
411}
412
413// ---------------------------------------------------------------------------
414// Helper: mint a new Transfer id
415// ---------------------------------------------------------------------------
416
417pub fn new_transfer_id() -> String {
418 Uuid::new_v4().to_string()
419}
420
421// ---------------------------------------------------------------------------
422// Tests
423// ---------------------------------------------------------------------------
424
#[cfg(test)]
mod tests {
    use super::*;

    /// Profile id shared by all fixtures.
    fn profile() -> ProfileId {
        ProfileId::new("p1")
    }

    /// Bucket id shared by all fixtures.
    fn bucket() -> BucketId {
        BucketId::new("my-bucket")
    }

    /// Fixed timestamp (milliseconds) so fixtures are deterministic.
    fn now_ms() -> i64 {
        1_700_000_000_000
    }

    /// Build a `Queued` download fixture with the given `id`.
    fn make_transfer(id: &str) -> Transfer {
        Transfer {
            id: id.to_owned(),
            kind: TransferKind::Download,
            profile_id: profile(),
            bucket: bucket(),
            key: "test/file.bin".to_string(),
            source_path: None,
            dest_path: Some(PathBuf::from("/tmp/file.bin")),
            total_bytes: Some(1_048_576),
            transferred_bytes: 0,
            parts_done: 0,
            parts_total: 0,
            state: TransferState::Queued,
            started_at: now_ms(),
            finished_at: None,
            error: None,
        }
    }

    // -----------------------------------------------------------------------
    // Register
    // -----------------------------------------------------------------------

    /// `register` echoes the transfer's own id and stores the record.
    #[test]
    fn register_returns_id_and_cancel_receiver() {
        let mut registry = TransferRegistry::new();
        let t = make_transfer("xfer-001");
        let (id, _rx) = registry.register(t);
        assert_eq!(id, "xfer-001");
        assert!(registry.get("xfer-001").is_some());
    }

    /// Two distinct ids yield two stored records.
    #[test]
    fn register_multiple_transfers() {
        let mut registry = TransferRegistry::new();
        registry.register(make_transfer("t1"));
        registry.register(make_transfer("t2"));
        assert_eq!(registry.list(None).len(), 2);
    }

    // -----------------------------------------------------------------------
    // Update
    // -----------------------------------------------------------------------

    /// The mutator closure's changes are visible through `get`.
    #[test]
    fn update_mutates_transfer_state() {
        let mut registry = TransferRegistry::new();
        registry.register(make_transfer("t-upd"));

        registry
            .update("t-upd", |t| {
                t.transferred_bytes = 256_000;
                t.state = TransferState::Running;
            })
            .expect("update must succeed");

        let t = registry.get("t-upd").unwrap();
        assert_eq!(t.transferred_bytes, 256_000);
        assert_eq!(t.state, TransferState::Running);
    }

    /// Updating an unregistered id yields `NotFound` naming that id.
    #[test]
    fn update_unknown_id_returns_not_found() {
        let mut registry = TransferRegistry::new();
        let err = registry
            .update("nonexistent", |_| {})
            .expect_err("update on missing id must fail");
        match err {
            AppError::NotFound { resource } => {
                assert!(resource.contains("nonexistent"));
            }
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    // -----------------------------------------------------------------------
    // Cancel
    // -----------------------------------------------------------------------

    /// `cancel` delivers `()` to the receiver returned by `register`.
    #[tokio::test]
    async fn cancel_sends_signal_to_receiver() {
        let mut registry = TransferRegistry::new();
        let (_, rx) = registry.register(make_transfer("t-cancel"));

        registry.cancel("t-cancel").expect("cancel must succeed");

        // The receiver should immediately resolve.
        let result = rx.await;
        assert!(result.is_ok(), "cancel receiver must fire");
    }

    /// A second `cancel` on the same id still returns `Ok`.
    #[test]
    fn cancel_twice_is_idempotent() {
        let mut registry = TransferRegistry::new();
        registry.register(make_transfer("t-idem"));

        registry
            .cancel("t-idem")
            .expect("first cancel must succeed");
        // Second cancel: token already consumed but transfer still exists.
        registry
            .cancel("t-idem")
            .expect("second cancel must be idempotent");
    }

    /// Cancelling an id that was never registered yields `NotFound`.
    #[test]
    fn cancel_unknown_id_returns_not_found() {
        let mut registry = TransferRegistry::new();
        let err = registry
            .cancel("ghost-id")
            .expect_err("cancel on missing id must fail");
        match err {
            AppError::NotFound { .. } => {}
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    // -----------------------------------------------------------------------
    // List with profile filter
    // -----------------------------------------------------------------------

    /// Only transfers whose `profile_id` equals the filter are returned.
    #[test]
    fn list_filters_by_profile() {
        let mut registry = TransferRegistry::new();

        let mut t2 = make_transfer("t-other");
        t2.profile_id = ProfileId::new("other-profile");
        registry.register(make_transfer("t-p1"));
        registry.register(t2);

        let filtered = registry.list(Some(&profile()));
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].id, "t-p1");
    }

    /// A `None` profile filter returns every stored transfer.
    #[test]
    fn list_none_filter_returns_all() {
        let mut registry = TransferRegistry::new();
        registry.register(make_transfer("a"));
        registry.register(make_transfer("b"));
        assert_eq!(registry.list(None).len(), 2);
    }

    // -----------------------------------------------------------------------
    // TransferState serialization
    // -----------------------------------------------------------------------

    /// Every `TransferState` variant serializes to its snake_case name.
    #[test]
    fn transfer_state_serializes_as_snake_case() {
        let cases = [
            (TransferState::Queued, "queued"),
            (TransferState::Running, "running"),
            (TransferState::Done, "done"),
            (TransferState::Failed, "failed"),
            (TransferState::Canceled, "canceled"),
        ];
        for (state, expected) in &cases {
            let v = serde_json::to_value(state).expect("must serialize");
            assert_eq!(v.as_str().unwrap(), *expected, "state {:?}", state);
        }
    }

    // -----------------------------------------------------------------------
    // TransferKind serialization
    // -----------------------------------------------------------------------

    /// Both `TransferKind` variants serialize to their snake_case names.
    #[test]
    fn transfer_kind_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_value(TransferKind::Download)
                .unwrap()
                .as_str()
                .unwrap(),
            "download"
        );
        assert_eq!(
            serde_json::to_value(TransferKind::Upload)
                .unwrap()
                .as_str()
                .unwrap(),
            "upload"
        );
    }

    // -----------------------------------------------------------------------
    // TransferFilter
    // -----------------------------------------------------------------------

    /// `Active` accepts Queued and Running but rejects Done.
    #[test]
    fn filter_active_matches_queued_and_running() {
        let mut t_q = make_transfer("q");
        t_q.state = TransferState::Queued;
        let mut t_r = make_transfer("r");
        t_r.state = TransferState::Running;
        let mut t_d = make_transfer("d");
        t_d.state = TransferState::Done;

        assert!(TransferFilter::Active.matches(&t_q));
        assert!(TransferFilter::Active.matches(&t_r));
        assert!(!TransferFilter::Active.matches(&t_d));
    }

    /// `Completed` accepts Done and rejects Failed.
    #[test]
    fn filter_completed_matches_done_only() {
        let mut t_d = make_transfer("d");
        t_d.state = TransferState::Done;
        let mut t_f = make_transfer("f");
        t_f.state = TransferState::Failed;

        assert!(TransferFilter::Completed.matches(&t_d));
        assert!(!TransferFilter::Completed.matches(&t_f));
    }

    /// `Failed` accepts Failed and rejects Done.
    #[test]
    fn filter_failed_matches_failed_only() {
        let mut t_f = make_transfer("f");
        t_f.state = TransferState::Failed;
        let mut t_d = make_transfer("d");
        t_d.state = TransferState::Done;

        assert!(TransferFilter::Failed.matches(&t_f));
        assert!(!TransferFilter::Failed.matches(&t_d));
    }

    /// `All` accepts each of the five states.
    #[test]
    fn filter_all_matches_every_state() {
        for state in [
            TransferState::Queued,
            TransferState::Running,
            TransferState::Done,
            TransferState::Failed,
            TransferState::Canceled,
        ] {
            let mut t = make_transfer("any");
            t.state = state;
            assert!(TransferFilter::All.matches(&t));
        }
    }

    // -----------------------------------------------------------------------
    // TransferQueue — concurrency cap
    // -----------------------------------------------------------------------

    /// Assert that at most `CAP` tasks hold a semaphore permit at the same
    /// time.
    ///
    /// We spawn `TOTAL` (6) tasks against the queue's semaphore, created with
    /// width `CAP` (2). Each task acquires a permit, increments a shared
    /// counter, records the highest counter value seen, sleeps 20 ms, and
    /// decrements the counter. After every task has finished we verify that
    /// the recorded peak does not exceed `CAP`.
    ///
    /// The tasks do not change any `TransferState`; the registry entries are
    /// created but never read back by this test.
    #[tokio::test]
    async fn concurrency_cap_limits_running_transfers() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        const CAP: usize = 2;
        const TOTAL: usize = 6;

        let queue = Arc::new(TransferQueue::new(CAP as u32));
        let active_count = Arc::new(AtomicUsize::new(0));
        let peak_concurrent = Arc::new(AtomicUsize::new(0));
        // All tasks share the one semaphore that was current at this point.
        let sem = queue.current_semaphore();

        let mut handles = Vec::new();

        for i in 0..TOTAL {
            let sem_clone = Arc::clone(&sem);
            let active_clone = Arc::clone(&active_count);
            let peak_clone = Arc::clone(&peak_concurrent);
            let id = format!("t{i}");

            // Register into the registry so we can track state.
            {
                let t = Transfer {
                    id: id.clone(),
                    kind: TransferKind::Download,
                    profile_id: profile(),
                    bucket: bucket(),
                    key: format!("key/{i}"),
                    source_path: None,
                    dest_path: Some(PathBuf::from(format!("/tmp/{id}"))),
                    total_bytes: None,
                    transferred_bytes: 0,
                    parts_done: 0,
                    parts_total: 0,
                    state: TransferState::Queued,
                    started_at: now_ms(),
                    finished_at: None,
                    error: None,
                };
                let mut reg = queue.registry.write().await;
                reg.register(t);
            }

            let handle = tokio::spawn(async move {
                // Semaphore acquisition is inside the task — simulates the queue.
                let _permit = sem_clone
                    .acquire()
                    .await
                    .expect("semaphore must not be closed");

                let prev = active_clone.fetch_add(1, Ordering::SeqCst);
                let current = prev + 1;
                // Atomically track peak concurrent.
                peak_clone.fetch_max(current, Ordering::SeqCst);

                // Simulate brief work.
                tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

                active_clone.fetch_sub(1, Ordering::SeqCst);
            });

            handles.push(handle);
        }

        // Wait for every task so the peak reflects the whole run.
        for h in handles {
            h.await.expect("task must not panic");
        }

        let peak = peak_concurrent.load(Ordering::SeqCst);
        assert!(
            peak <= CAP,
            "peak concurrent ({peak}) must not exceed cap ({CAP})"
        );
    }

    // -----------------------------------------------------------------------
    // TransferQueue::list with filter
    // -----------------------------------------------------------------------

    /// `TransferQueue::list` with `Active` returns the Running transfer and
    /// omits the Done one.
    #[tokio::test]
    async fn queue_list_filter_active() {
        let queue = TransferQueue::new(4);
        let mut t_active = make_transfer("a");
        t_active.state = TransferState::Running;
        let mut t_done = make_transfer("b");
        t_done.state = TransferState::Done;

        {
            let mut reg = queue.registry.write().await;
            reg.register(t_active);
            reg.register(t_done);
        }

        let active = queue.list(Some(TransferFilter::Active)).await;
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, "a");
    }
}