Skip to main content

brows3r_lib/commands/
transfers_cmd.rs

//! Tauri commands for transfer operations.
//!
//! # Commands
//!
//! - [`transfer_download`]       — register a download, spawn the stream task, return
//!                                 the `request_id` immediately so the frontend can
//!                                 subscribe to `transfer:progress` and `transfer:state`.
//! - [`transfer_upload`]         — register an upload, spawn the upload task, return
//!                                 the `request_id` immediately.
//! - [`transfer_list`]           — list transfers filtered by state.
//! - [`transfer_cancel`]         — cancel an in-flight transfer.
//! - [`transfer_retry`]          — re-enqueue a failed/canceled transfer from start.
//! - [`transfer_upload_many`]    — bulk-enqueue multiple uploads.
//! - [`transfer_download_many`]  — bulk-enqueue multiple downloads.
//! - [`multipart_scan`]          — list in-progress multipart uploads for a bucket.
//! - [`multipart_abort`]         — abort a single in-progress multipart upload.
//!
//! # OCP contract
//!
//! Upload/download are independent command paths.  `transfer_list` and
//! `transfer_cancel` compose over `TransferQueueHandle` without touching either.
//! Adding a new transfer kind (`Move`, `Copy`) = new `TransferSpec` variant +
//! one new command here.
22
23use std::{
24    path::PathBuf,
25    sync::{atomic::AtomicBool, Arc},
26};
27
28use tauri::{AppHandle, State};
29
30use crate::{
31    error::AppError,
32    ids::{BucketId, ProfileId},
33    locks::LockRegistryHandle,
34    notifications::{
35        os::{AppHandleChannel, OsNotifier},
36        NotificationLogHandle,
37    },
38    profiles::ProfileStoreHandle,
39    s3::{
40        multipart::{
41            abort_multipart_upload, scan_multipart_uploads, MultipartSource, MultipartTableHandle,
42            MultipartUpload,
43        },
44        S3ClientPoolHandle,
45    },
46    settings::SettingsHandle,
47    transfers::{
48        download::download_object, new_transfer_id, upload::upload_object, Transfer,
49        TransferFilter, TransferKind, TransferQueueHandle, TransferState,
50    },
51};
52
53// ---------------------------------------------------------------------------
54// DTO types for bulk-enqueue commands
55// ---------------------------------------------------------------------------
56
/// Input for a single upload spec (used by `transfer_upload_many`).
///
/// Deserialized with camelCase field names, so the JSON keys are
/// `profileId`, `bucket`, `key` and `sourcePath`.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransferUploadSpec {
    /// Profile whose S3 client is resolved for this upload.
    pub profile_id: ProfileId,
    /// Bucket the object is uploaded to.
    pub bucket: BucketId,
    /// Object key inside `bucket`.
    pub key: String,
    /// Local file to upload; converted to a `PathBuf` by the command.
    pub source_path: String,
}
66
/// Input for a single download spec (used by `transfer_download_many`).
///
/// Deserialized with camelCase field names, so the JSON keys are
/// `profileId`, `bucket`, `key` and `destPath`.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransferDownloadSpec {
    /// Profile whose S3 client is resolved for this download.
    pub profile_id: ProfileId,
    /// Bucket the object is read from.
    pub bucket: BucketId,
    /// Object key inside `bucket`.
    pub key: String,
    /// Local destination path; converted to a `PathBuf` by the command.
    pub dest_path: String,
}
76
77// ---------------------------------------------------------------------------
78// Internal enqueue helpers (no State<> — safe to call from retry)
79// ---------------------------------------------------------------------------
80
81/// Register and spawn a download task.  Returns the new `request_id`.
82async fn enqueue_download(
83    profile_id: ProfileId,
84    bucket: BucketId,
85    key: String,
86    dest_path: PathBuf,
87    queue: &TransferQueueHandle,
88    client: Arc<aws_sdk_s3::Client>,
89    lock_registry: Arc<crate::locks::LockRegistry>,
90    log: NotificationLogHandle,
91    settings: SettingsHandle,
92    channel: AppHandle,
93) -> Result<String, AppError> {
94    let request_id = new_transfer_id();
95    let now_ms = now_ms();
96
97    let transfer = Transfer {
98        id: request_id.clone(),
99        kind: TransferKind::Download,
100        profile_id: profile_id.clone(),
101        bucket: bucket.clone(),
102        key: key.clone(),
103        source_path: None,
104        dest_path: Some(dest_path.clone()),
105        total_bytes: None,
106        transferred_bytes: 0,
107        parts_done: 0,
108        parts_total: 0,
109        state: TransferState::Queued,
110        started_at: now_ms,
111        finished_at: None,
112        error: None,
113    };
114
115    let cancel_rx = {
116        let mut reg = queue.0.registry().write().await;
117        let (id, rx) = reg.register(transfer);
118        assert_eq!(id, request_id);
119        rx
120    };
121
122    let registry_handle = queue.0.registry_handle();
123    let request_id_clone = request_id.clone();
124    let sem = queue.0.current_semaphore();
125
126    const LOCK_TTL_SECS: u64 = 300;
127
128    tokio::spawn(async move {
129        let _permit = sem
130            .acquire_owned()
131            .await
132            .expect("semaphore must not be closed");
133
134        let os_notifier = OsNotifier::new(
135            AppHandleChannel {
136                app: channel.clone(),
137            },
138            settings,
139        );
140
141        let id_for_err = request_id_clone.clone();
142        let result = download_object(
143            client,
144            bucket,
145            key,
146            dest_path,
147            request_id_clone,
148            &channel,
149            registry_handle.clone(),
150            lock_registry,
151            cancel_rx,
152            profile_id,
153            LOCK_TTL_SECS,
154            log,
155            &os_notifier,
156        )
157        .await;
158
159        // download_object emits its own Failed state events on internal
160        // errors, but a non-emitting failure (lock acquisition, early
161        // setup, panic-equivalent) used to vanish silently. Persist the
162        // AppError on the Transfer record so TransferRow can render the
163        // reason instead of just a red badge.
164        if let Err(err) = result {
165            eprintln!("download task {id_for_err} failed: {err:?}");
166            let mut reg = registry_handle.0.write().await;
167            let _ = reg.update(&id_for_err, |t| {
168                t.error = Some(err);
169                if t.state != TransferState::Failed {
170                    t.state = TransferState::Failed;
171                }
172            });
173        }
174    });
175
176    Ok(request_id)
177}
178
179/// Register and spawn an upload task.  Returns the new `request_id`.
180#[allow(clippy::too_many_arguments)]
181async fn enqueue_upload(
182    profile_id: ProfileId,
183    bucket: BucketId,
184    key: String,
185    source_path: PathBuf,
186    queue: &TransferQueueHandle,
187    client: Arc<aws_sdk_s3::Client>,
188    lock_registry: Arc<crate::locks::LockRegistry>,
189    multipart_table: Arc<crate::s3::multipart::MultipartTable>,
190    log: NotificationLogHandle,
191    settings: SettingsHandle,
192    channel: AppHandle,
193) -> Result<String, AppError> {
194    let request_id = new_transfer_id();
195    let now_ms = now_ms();
196
197    let transfer = Transfer {
198        id: request_id.clone(),
199        kind: TransferKind::Upload,
200        profile_id: profile_id.clone(),
201        bucket: bucket.clone(),
202        key: key.clone(),
203        source_path: Some(source_path.clone()),
204        dest_path: None,
205        total_bytes: None,
206        transferred_bytes: 0,
207        parts_done: 0,
208        parts_total: 0,
209        state: TransferState::Queued,
210        started_at: now_ms,
211        finished_at: None,
212        error: None,
213    };
214
215    let cancel_flag = Arc::new(AtomicBool::new(false));
216
217    {
218        let mut reg = queue.0.registry().write().await;
219        let (id, _rx) = reg.register(transfer);
220        assert_eq!(id, request_id);
221    }
222
223    let registry_handle = queue.0.registry_handle();
224    let request_id_clone = request_id.clone();
225    let sem = queue.0.current_semaphore();
226
227    const LOCK_TTL_SECS: u64 = 300;
228    const PARTS_CONCURRENCY: u32 = 4;
229
230    tokio::spawn(async move {
231        let _permit = sem
232            .acquire_owned()
233            .await
234            .expect("semaphore must not be closed");
235
236        let os_notifier = OsNotifier::new(
237            AppHandleChannel {
238                app: channel.clone(),
239            },
240            settings,
241        );
242
243        let id_for_err = request_id_clone.clone();
244        let result = upload_object(
245            client,
246            bucket,
247            key,
248            source_path,
249            request_id_clone,
250            &channel,
251            registry_handle.clone(),
252            lock_registry,
253            multipart_table,
254            PARTS_CONCURRENCY,
255            profile_id,
256            LOCK_TTL_SECS,
257            cancel_flag,
258            log,
259            &os_notifier,
260        )
261        .await;
262
263        // upload_object emits its own Failed state events on internal
264        // errors, but non-emitting failures used to vanish. Persist the
265        // AppError on the Transfer record so TransferRow can render it.
266        if let Err(err) = result {
267            eprintln!("upload task {id_for_err} failed: {err:?}");
268            let mut reg = registry_handle.0.write().await;
269            let _ = reg.update(&id_for_err, |t| {
270                t.error = Some(err);
271                if t.state != TransferState::Failed {
272                    t.state = TransferState::Failed;
273                }
274            });
275        }
276    });
277
278    Ok(request_id)
279}
280
281// ---------------------------------------------------------------------------
282// transfer_download
283// ---------------------------------------------------------------------------
284
285/// Initiate a streaming download of `key` from `bucket` to `dest_path`.
286///
287/// Returns the `request_id` (UUID v4) immediately.
288#[tauri::command]
289pub async fn transfer_download(
290    profile_id: ProfileId,
291    bucket: BucketId,
292    key: String,
293    dest_path: String,
294    queue: State<'_, TransferQueueHandle>,
295    pool: State<'_, S3ClientPoolHandle>,
296    locks: State<'_, LockRegistryHandle>,
297    store: State<'_, ProfileStoreHandle>,
298    log: State<'_, NotificationLogHandle>,
299    settings: State<'_, SettingsHandle>,
300    channel: AppHandle,
301) -> Result<String, AppError> {
302    let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
303    let lock_registry = Arc::clone(&locks.0);
304    enqueue_download(
305        profile_id_resolved,
306        bucket,
307        key,
308        PathBuf::from(dest_path),
309        &queue,
310        client,
311        lock_registry,
312        (*log).clone(),
313        (*settings).clone(),
314        channel,
315    )
316    .await
317}
318
319// ---------------------------------------------------------------------------
320// transfer_upload
321// ---------------------------------------------------------------------------
322
323/// Initiate an upload of `source_path` to `bucket/key`.
324///
325/// Returns the `request_id` (UUID v4) immediately.
326#[tauri::command]
327pub async fn transfer_upload(
328    profile_id: ProfileId,
329    bucket: BucketId,
330    key: String,
331    source_path: String,
332    queue: State<'_, TransferQueueHandle>,
333    pool: State<'_, S3ClientPoolHandle>,
334    locks: State<'_, LockRegistryHandle>,
335    store: State<'_, ProfileStoreHandle>,
336    multipart_table: State<'_, MultipartTableHandle>,
337    log: State<'_, NotificationLogHandle>,
338    settings: State<'_, SettingsHandle>,
339    channel: AppHandle,
340) -> Result<String, AppError> {
341    let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
342    let lock_registry = Arc::clone(&locks.0);
343    let multipart_arc = Arc::new(multipart_table.0.clone());
344    enqueue_upload(
345        profile_id_resolved,
346        bucket,
347        key,
348        PathBuf::from(source_path),
349        &queue,
350        client,
351        lock_registry,
352        multipart_arc,
353        (*log).clone(),
354        (*settings).clone(),
355        channel,
356    )
357    .await
358}
359
360// ---------------------------------------------------------------------------
361// transfer_list
362// ---------------------------------------------------------------------------
363
364/// List transfers, optionally filtered by state.
365///
366/// `filter` accepts `null` (→ All), `"active"`, `"completed"`, `"failed"`.
367#[tauri::command]
368pub async fn transfer_list(
369    filter: Option<TransferFilter>,
370    queue: State<'_, TransferQueueHandle>,
371) -> Result<Vec<Transfer>, AppError> {
372    Ok(queue.0.list(filter).await)
373}
374
375// ---------------------------------------------------------------------------
376// transfer_cancel
377// ---------------------------------------------------------------------------
378
379/// Cancel the in-flight transfer with `request_id`.
380///
381/// Idempotent: canceling an already-terminal transfer returns `Ok(())`.
382#[tauri::command]
383pub async fn transfer_cancel(
384    request_id: String,
385    queue: State<'_, TransferQueueHandle>,
386) -> Result<(), AppError> {
387    queue.0.cancel(&request_id).await
388}
389
390// ---------------------------------------------------------------------------
391// transfer_retry
392// ---------------------------------------------------------------------------
393
394/// Re-enqueue a failed or canceled transfer from the beginning.
395///
396/// The original transfer record is not mutated.  A fresh request_id is
397/// returned.  This satisfies AC-14: "the transfer restarts from the beginning
398/// (not resumable in v1)".
399#[tauri::command]
400pub async fn transfer_retry(
401    request_id: String,
402    queue: State<'_, TransferQueueHandle>,
403    pool: State<'_, S3ClientPoolHandle>,
404    locks: State<'_, LockRegistryHandle>,
405    store: State<'_, ProfileStoreHandle>,
406    multipart_table: State<'_, MultipartTableHandle>,
407    log: State<'_, NotificationLogHandle>,
408    settings: State<'_, SettingsHandle>,
409    channel: AppHandle,
410) -> Result<String, AppError> {
411    // Read the original transfer.
412    let original = {
413        let reg = queue.0.registry().read().await;
414        reg.get(&request_id)
415            .cloned()
416            .ok_or_else(|| AppError::NotFound {
417                resource: format!("transfer:{request_id}"),
418            })?
419    };
420
421    // Only allow retry of terminal states.
422    match original.state {
423        TransferState::Failed | TransferState::Canceled => {}
424        _ => {
425            return Err(AppError::Validation {
426                field: "state".to_string(),
427                hint: "retry is only valid for failed or canceled transfers".to_string(),
428            });
429        }
430    }
431
432    let (client, profile_id_resolved) = resolve_client(&original.profile_id, &store, &pool).await?;
433    let lock_registry = Arc::clone(&locks.0);
434
435    match original.kind {
436        TransferKind::Download => {
437            let dest_path = original
438                .dest_path
439                .clone()
440                .ok_or_else(|| AppError::Internal {
441                    trace_id: format!("retry:{request_id}:missing_dest_path"),
442                })?;
443            enqueue_download(
444                profile_id_resolved,
445                original.bucket,
446                original.key,
447                dest_path,
448                &queue,
449                client,
450                lock_registry,
451                (*log).clone(),
452                (*settings).clone(),
453                channel,
454            )
455            .await
456        }
457        TransferKind::Upload => {
458            let source_path = original
459                .source_path
460                .clone()
461                .ok_or_else(|| AppError::Internal {
462                    trace_id: format!("retry:{request_id}:missing_source_path"),
463                })?;
464            let multipart_arc = Arc::new(multipart_table.0.clone());
465            enqueue_upload(
466                profile_id_resolved,
467                original.bucket,
468                original.key,
469                source_path,
470                &queue,
471                client,
472                lock_registry,
473                multipart_arc,
474                (*log).clone(),
475                (*settings).clone(),
476                channel,
477            )
478            .await
479        }
480    }
481}
482
483// ---------------------------------------------------------------------------
484// transfer_upload_many
485// ---------------------------------------------------------------------------
486
487/// Bulk-enqueue multiple uploads.
488///
489/// Returns a `Vec<String>` of request IDs in the same order as `specs`.
490#[tauri::command]
491pub async fn transfer_upload_many(
492    specs: Vec<TransferUploadSpec>,
493    queue: State<'_, TransferQueueHandle>,
494    pool: State<'_, S3ClientPoolHandle>,
495    locks: State<'_, LockRegistryHandle>,
496    store: State<'_, ProfileStoreHandle>,
497    multipart_table: State<'_, MultipartTableHandle>,
498    log: State<'_, NotificationLogHandle>,
499    settings: State<'_, SettingsHandle>,
500    channel: AppHandle,
501) -> Result<Vec<String>, AppError> {
502    // Fail-fast on the first spec that cannot be enqueued. Previously this
503    // used `.unwrap_or_default()` which silently pushed an empty string for
504    // each failed spec — the frontend has no caller that distinguishes an
505    // empty id from a real one, so per-item failures were invisible. The
506    // user-visible payoff: a bad spec now surfaces in the toolbar's
507    // `surfaceUnknownError` instead of looking like "upload started" with
508    // nothing actually happening.
509    let mut ids = Vec::with_capacity(specs.len());
510    for spec in specs {
511        let (client, profile_id_resolved) = resolve_client(&spec.profile_id, &store, &pool).await?;
512        let lock_registry = Arc::clone(&locks.0);
513        let multipart_arc = Arc::new(multipart_table.0.clone());
514        let id = enqueue_upload(
515            profile_id_resolved,
516            spec.bucket,
517            spec.key,
518            PathBuf::from(spec.source_path),
519            &queue,
520            client,
521            lock_registry,
522            multipart_arc,
523            (*log).clone(),
524            (*settings).clone(),
525            channel.clone(),
526        )
527        .await?;
528        ids.push(id);
529    }
530    Ok(ids)
531}
532
533// ---------------------------------------------------------------------------
534// transfer_download_many
535// ---------------------------------------------------------------------------
536
537/// Bulk-enqueue multiple downloads.
538///
539/// Returns a `Vec<String>` of request IDs in the same order as `specs`.
540#[tauri::command]
541pub async fn transfer_download_many(
542    specs: Vec<TransferDownloadSpec>,
543    queue: State<'_, TransferQueueHandle>,
544    pool: State<'_, S3ClientPoolHandle>,
545    locks: State<'_, LockRegistryHandle>,
546    store: State<'_, ProfileStoreHandle>,
547    log: State<'_, NotificationLogHandle>,
548    settings: State<'_, SettingsHandle>,
549    channel: AppHandle,
550) -> Result<Vec<String>, AppError> {
551    // Fail-fast on the first spec that cannot be enqueued. See
552    // `transfer_upload_many` for the full rationale — same change.
553    let mut ids = Vec::with_capacity(specs.len());
554    for spec in specs {
555        let (client, profile_id_resolved) = resolve_client(&spec.profile_id, &store, &pool).await?;
556        let lock_registry = Arc::clone(&locks.0);
557        let id = enqueue_download(
558            profile_id_resolved,
559            spec.bucket,
560            spec.key,
561            PathBuf::from(spec.dest_path),
562            &queue,
563            client,
564            lock_registry,
565            (*log).clone(),
566            (*settings).clone(),
567            channel.clone(),
568        )
569        .await?;
570        ids.push(id);
571    }
572    Ok(ids)
573}
574
575// ---------------------------------------------------------------------------
576// multipart_scan
577// ---------------------------------------------------------------------------
578
579/// List all in-progress multipart uploads for `bucket`, classifying each as
580/// `Brows3r` (in our `multipart_active` table) or `Unknown` (foreign).
581///
582/// Optionally filter out uploads younger than `older_than_secs` seconds.
583///
584/// Satisfies AC-4 cleanup scanner requirement.
585#[tauri::command]
586pub async fn multipart_scan(
587    profile_id: ProfileId,
588    bucket: BucketId,
589    older_than_secs: Option<u64>,
590    store: State<'_, ProfileStoreHandle>,
591    pool: State<'_, S3ClientPoolHandle>,
592    multipart_table: State<'_, MultipartTableHandle>,
593) -> Result<Vec<MultipartUpload>, AppError> {
594    let (client, _) = resolve_client(&profile_id, &store, &pool).await?;
595    scan_multipart_uploads(&client, &bucket, &multipart_table.0, older_than_secs).await
596}
597
598// ---------------------------------------------------------------------------
599// multipart_abort
600// ---------------------------------------------------------------------------
601
602/// Abort a single in-progress multipart upload.
603///
604/// If `source == Unknown` and `confirmed_unknown` is not `true`, returns a
605/// `Validation` error so the frontend must obtain explicit user consent before
606/// aborting a foreign upload.
607///
608/// On success with a `Brows3r`-sourced upload the record is removed from the
609/// `multipart_active` table.
610#[tauri::command]
611pub async fn multipart_abort(
612    profile_id: ProfileId,
613    bucket: BucketId,
614    upload_id: String,
615    key: String,
616    source: MultipartSource,
617    confirmed_unknown: Option<bool>,
618    store: State<'_, ProfileStoreHandle>,
619    pool: State<'_, S3ClientPoolHandle>,
620    multipart_table: State<'_, MultipartTableHandle>,
621) -> Result<(), AppError> {
622    let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
623    abort_multipart_upload(
624        &client,
625        &bucket,
626        &key,
627        &upload_id,
628        source,
629        &multipart_table.0,
630        &profile_id_resolved,
631        confirmed_unknown.unwrap_or(false),
632    )
633    .await
634}
635
636// ---------------------------------------------------------------------------
637// Helpers
638// ---------------------------------------------------------------------------
639
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of silently truncating the `u128`
/// millisecond count.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
646
647/// Resolve the S3 client for a profile, enforcing the validation gate.
648async fn resolve_client(
649    profile_id: &ProfileId,
650    store: &State<'_, ProfileStoreHandle>,
651    pool: &State<'_, S3ClientPoolHandle>,
652) -> Result<(Arc<aws_sdk_s3::Client>, ProfileId), AppError> {
653    let profile = {
654        let store_guard = store.inner.lock().await;
655        store_guard
656            .get(profile_id)
657            .ok_or_else(|| AppError::NotFound {
658                resource: format!("profile:{}", profile_id.as_str()),
659            })?
660    };
661
662    if profile.validated_at.is_none() {
663        return Err(AppError::Auth {
664            reason: "profile_not_validated_in_session".to_string(),
665        });
666    }
667
668    let default_region = profile
669        .default_region
670        .clone()
671        .unwrap_or_else(|| "us-east-1".to_string());
672
673    let client = pool
674        .inner
675        .get_or_build(profile_id, &default_region)
676        .await
677        .ok_or_else(|| AppError::Internal {
678            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
679        })?;
680
681    Ok((client, profile_id.clone()))
682}
683
684// ---------------------------------------------------------------------------
685// Tests
686// ---------------------------------------------------------------------------
687
#[cfg(test)]
mod tests {
    use super::*;
    // Only the tests need `TransferSpec`; the commands above build `Transfer`
    // records through the private `enqueue_*` helpers instead.
    use crate::transfers::TransferSpec;

    // The Tauri commands need a running app (`State<>`), so they are covered
    // by the integration tests.  The unit tests here check the pieces that
    // can be exercised without one.

    /// A default-constructed queue handle lists no transfers.
    #[test]
    fn transfer_queue_handle_default_is_empty() {
        let handle = TransferQueueHandle::default();
        let rt = tokio::runtime::Runtime::new().unwrap();
        let listed = rt.block_on(async { handle.0.list(None).await });
        assert_eq!(listed.len(), 0, "fresh queue must be empty");
    }

    // -----------------------------------------------------------------------
    // TransferFilter serialization
    // -----------------------------------------------------------------------

    /// Every filter variant serializes to its lowercase name.
    #[test]
    fn transfer_filter_serializes_snake_case() {
        let cases = [
            (TransferFilter::Active, "active"),
            (TransferFilter::Completed, "completed"),
            (TransferFilter::Failed, "failed"),
            (TransferFilter::All, "all"),
        ];
        cases.iter().for_each(|(filter, expected)| {
            let v = serde_json::to_value(filter).expect("must serialize");
            assert_eq!(v.as_str().unwrap(), *expected, "filter {:?}", filter);
        });
    }

    // -----------------------------------------------------------------------
    // TransferSpec serialization
    // -----------------------------------------------------------------------

    /// An upload spec carries `kind = "upload"` and its key.
    #[test]
    fn transfer_spec_upload_serializes() {
        let json = serde_json::to_value(&TransferSpec::Upload {
            profile: ProfileId::new("my-profile"),
            bucket: BucketId::new("my-bucket"),
            key: "data/file.txt".to_string(),
            source_path: PathBuf::from("/local/file.txt"),
        })
        .expect("must serialize");
        assert_eq!(json["kind"], "upload");
        assert_eq!(json["key"], "data/file.txt");
    }

    /// A download spec carries `kind = "download"`.
    #[test]
    fn transfer_spec_download_serializes() {
        let json = serde_json::to_value(&TransferSpec::Download {
            profile: ProfileId::new("my-profile"),
            bucket: BucketId::new("my-bucket"),
            key: "data/file.txt".to_string(),
            dest_path: PathBuf::from("/tmp/file.txt"),
        })
        .expect("must serialize");
        assert_eq!(json["kind"], "download");
    }

    // -----------------------------------------------------------------------
    // Retry: requires Failed or Canceled state
    // -----------------------------------------------------------------------

    /// A transfer that is still `Running` does not pass the retry state guard.
    #[tokio::test]
    async fn transfer_retry_rejects_non_terminal_state() {
        let queue = TransferQueueHandle::default();
        let request_id = new_transfer_id();
        let running = Transfer {
            id: request_id.clone(),
            kind: TransferKind::Download,
            profile_id: ProfileId::new("p1"),
            bucket: BucketId::new("bucket"),
            key: "key".to_string(),
            source_path: None,
            dest_path: Some(PathBuf::from("/tmp/out")),
            total_bytes: None,
            transferred_bytes: 0,
            parts_done: 0,
            parts_total: 0,
            state: TransferState::Running,
            started_at: 0,
            finished_at: None,
            error: None,
        };
        {
            let mut reg = queue.0.registry().write().await;
            reg.register(running);
        }

        // `transfer_retry` itself needs `State<>`, so the state guard it
        // applies is reproduced here and checked directly.
        let reg = queue.0.registry().read().await;
        let stored = reg.get(&request_id).unwrap();
        let is_retriable = matches!(
            stored.state,
            TransferState::Failed | TransferState::Canceled
        );
        assert!(!is_retriable, "Running state must not be retriable");
    }
}
793}