Skip to main content

brows3r_lib/transfers/
download.rs

1//! Streaming download implementation.
2//!
3//! # Protocol
4//!
5//! 1. Acquire a scoped resource lock via `LockRegistry`.
6//! 2. Emit `transfer:state { state: running }`.
7//! 3. Issue `GetObject` — obtain `content_length` and the body `ByteStream`.
8//! 4. Open `<dest>.partial` for writing (atomic rename on success).
9//! 5. Stream 256 KB chunks; for each chunk: write to file, increment
10//!    `transferred_bytes`, call `emit_progress` (throttled).
11//! 6. Poll the cancel receiver between chunks.
12//! 7. On cancel: delete `.partial`, emit `Canceled`, release lock with `Cancel`.
13//! 8. On success: rename `.partial` → final, emit `Done`, release lock.
14//! 9. On I/O or S3 error: delete `.partial`, emit `Failed`, release lock.
15//!
16//! # OCP contract
17//!
18//! The atomic `.partial` rename pattern and lock acquire/release sequence are
19//! reusable for any downloaded asset.  Upload (task 32) acquires the same lock
20//! registry with a different `op_name`.
21
22use std::{
23    path::PathBuf,
24    sync::Arc,
25    time::{SystemTime, UNIX_EPOCH},
26};
27
28use aws_sdk_s3::Client;
29use tokio::{fs, io::AsyncWriteExt, sync::oneshot};
30
31use crate::{
32    error::AppError,
33    events::EventEmitter,
34    ids::{BucketId, ObjectKey, ProfileId},
35    locks::{LockId, LockRegistry, LockScope, ReleaseReason},
36    notifications::{os::OsNotifyChannel, NotificationLogHandle},
37    transfers::{
38        notify::notify_terminal,
39        progress::{emit_progress, emit_state, emit_state_with_error, ProgressThrottle},
40        TransferRegistryHandle, TransferState,
41    },
42};
43
44// ---------------------------------------------------------------------------
45// Chunk size constant
46// ---------------------------------------------------------------------------
47
/// Upper bound, in bytes, for a single write to the `.partial` file:
/// 256 KiB (262 144 bytes).
const CHUNK_SIZE: usize = 256 * 1024;
50
51// ---------------------------------------------------------------------------
52// download_object
53// ---------------------------------------------------------------------------
54
55/// Stream-download `key` from `bucket` and write it atomically to `dest_path`.
56///
57/// # Parameters
58///
59/// - `client`          — authenticated S3 client for the profile's region.
60/// - `bucket`          — target bucket name.
61/// - `key`             — S3 object key.
62/// - `dest_path`       — local destination path (final, not `.partial`).
63/// - `request_id`      — UUID v4 string from `TransferRegistry::register`.
64/// - `channel`         — Tauri `AppHandle` or `MockChannel` for event emission.
65/// - `registry`        — shared `TransferRegistryHandle` for state updates.
66/// - `lock_registry`   — resource lock registry; this function acquires and
67///                       releases the lock.
68/// - `cancel_rx`       — oneshot receiver; resolved when the caller calls
69///                       `TransferRegistry::cancel`.
70/// - `profile_id`      — used to build the `LockScope`.
71/// - `lock_ttl_secs`   — TTL for the acquired lock.
72/// - `log`             — in-app notification log for terminal-state notifications.
73/// - `os_notifier`     — OS notification bridge (settings-gated internally).
74pub async fn download_object<E, C>(
75    client: Arc<Client>,
76    bucket: BucketId,
77    key: String,
78    dest_path: PathBuf,
79    request_id: String,
80    channel: &E,
81    registry: TransferRegistryHandle,
82    lock_registry: Arc<LockRegistry>,
83    mut cancel_rx: oneshot::Receiver<()>,
84    profile_id: ProfileId,
85    lock_ttl_secs: u64,
86    log: NotificationLogHandle,
87    os_notifier: &crate::notifications::os::OsNotifier<C>,
88) -> Result<(), AppError>
89where
90    E: EventEmitter,
91    C: OsNotifyChannel,
92{
93    // -----------------------------------------------------------------------
94    // 1. Acquire resource lock
95    // -----------------------------------------------------------------------
96    let now = now_secs();
97    let scope = LockScope {
98        profile: profile_id.clone(),
99        bucket: Some(bucket.clone()),
100        prefix: None,
101        key: Some(ObjectKey::new(key.clone())),
102    };
103
104    let lock_id = lock_registry.acquire(scope, "download", lock_ttl_secs, now)?;
105
106    // -----------------------------------------------------------------------
107    // 2. Mark transfer Running
108    // -----------------------------------------------------------------------
109    {
110        let mut reg = registry.0.write().await;
111        let _ = reg.update(&request_id, |t| {
112            t.state = TransferState::Running;
113        });
114    }
115    let _ = emit_state(channel, &request_id, TransferState::Running);
116
117    // -----------------------------------------------------------------------
118    // 3. GetObject — obtain body stream and content length
119    // -----------------------------------------------------------------------
120    let resp = client
121        .get_object()
122        .bucket(bucket.as_str())
123        .key(&key)
124        .send()
125        .await
126        .map_err(|e| AppError::Network {
127            source: format!("get_object failed: {e}"),
128        });
129
130    let resp = match resp {
131        Ok(r) => r,
132        Err(e) => {
133            cleanup_on_error(
134                None,
135                &request_id,
136                &registry,
137                &lock_registry,
138                &lock_id,
139                channel,
140                e.clone(),
141                &log,
142                os_notifier,
143            )
144            .await;
145            return Err(e);
146        }
147    };
148
149    let content_length = resp.content_length().map(|l| l as u64);
150
151    // Update total_bytes now that we know it.
152    {
153        let mut reg = registry.0.write().await;
154        let _ = reg.update(&request_id, |t| {
155            t.total_bytes = content_length;
156        });
157    }
158
159    // -----------------------------------------------------------------------
160    // 4. Open <dest>.partial for writing
161    // -----------------------------------------------------------------------
162    let partial_path = {
163        let mut p = dest_path.clone();
164        let name = p
165            .file_name()
166            .map(|n| format!("{}.partial", n.to_string_lossy()))
167            .unwrap_or_else(|| "download.partial".to_string());
168        p.set_file_name(name);
169        p
170    };
171
172    // Ensure parent directory exists.
173    if let Some(parent) = partial_path.parent() {
174        if let Err(e) = fs::create_dir_all(parent).await {
175            let err = AppError::Network {
176                source: format!("create_dir_all failed: {e}"),
177            };
178            cleanup_on_error(
179                None,
180                &request_id,
181                &registry,
182                &lock_registry,
183                &lock_id,
184                channel,
185                err.clone(),
186                &log,
187                os_notifier,
188            )
189            .await;
190            return Err(err);
191        }
192    }
193
194    let mut file = match fs::File::create(&partial_path).await {
195        Ok(f) => f,
196        Err(e) => {
197            let err = AppError::Network {
198                source: format!("create partial file failed: {e}"),
199            };
200            cleanup_on_error(
201                None,
202                &request_id,
203                &registry,
204                &lock_registry,
205                &lock_id,
206                channel,
207                err.clone(),
208                &log,
209                os_notifier,
210            )
211            .await;
212            return Err(err);
213        }
214    };
215
216    // -----------------------------------------------------------------------
217    // 5. Stream body in 256 KB chunks
218    // -----------------------------------------------------------------------
219    let mut throttle = ProgressThrottle::new();
220    let mut transferred_bytes: u64 = 0;
221
222    // Collect the full body; AWS SDK v1 ByteStream collects via `.collect()`.
223    // We still emit progress events by slicing the collected bytes into chunks.
224    let body_result = resp.body.collect().await.map_err(|e| AppError::Network {
225        source: format!("body collect failed: {e}"),
226    });
227
228    let body = match body_result {
229        Ok(b) => b,
230        Err(e) => {
231            cleanup_on_error(
232                Some(&partial_path),
233                &request_id,
234                &registry,
235                &lock_registry,
236                &lock_id,
237                channel,
238                e.clone(),
239                &log,
240                os_notifier,
241            )
242            .await;
243            return Err(e);
244        }
245    };
246
247    let bytes = body.into_bytes();
248
249    for chunk in bytes.chunks(CHUNK_SIZE) {
250        // Poll cancel signal between chunks (non-blocking try_recv).
251        if cancel_rx.try_recv().is_ok() {
252            return handle_cancel(
253                &partial_path,
254                &request_id,
255                &registry,
256                &lock_registry,
257                &lock_id,
258                channel,
259                &log,
260                os_notifier,
261            )
262            .await;
263        }
264
265        if let Err(e) = file.write_all(chunk).await {
266            let err = AppError::Network {
267                source: format!("write chunk failed: {e}"),
268            };
269            cleanup_on_error(
270                Some(&partial_path),
271                &request_id,
272                &registry,
273                &lock_registry,
274                &lock_id,
275                channel,
276                err.clone(),
277                &log,
278                os_notifier,
279            )
280            .await;
281            return Err(err);
282        }
283
284        transferred_bytes += chunk.len() as u64;
285
286        // Update registry.
287        {
288            let mut reg = registry.0.write().await;
289            let _ = reg.update(&request_id, |t| {
290                t.transferred_bytes = transferred_bytes;
291            });
292        }
293
294        // Throttled progress event.
295        let now_ms = now_ms();
296        let _ = emit_progress(
297            channel,
298            &request_id,
299            transferred_bytes,
300            content_length,
301            0,
302            0,
303            &mut throttle,
304            now_ms,
305        );
306    }
307
308    // Final check for cancel signal before rename.
309    if cancel_rx.try_recv().is_ok() {
310        return handle_cancel(
311            &partial_path,
312            &request_id,
313            &registry,
314            &lock_registry,
315            &lock_id,
316            channel,
317            &log,
318            os_notifier,
319        )
320        .await;
321    }
322
323    // Flush and close.
324    if let Err(e) = file.flush().await {
325        let err = AppError::Network {
326            source: format!("flush failed: {e}"),
327        };
328        cleanup_on_error(
329            Some(&partial_path),
330            &request_id,
331            &registry,
332            &lock_registry,
333            &lock_id,
334            channel,
335            err.clone(),
336            &log,
337            os_notifier,
338        )
339        .await;
340        return Err(err);
341    }
342    drop(file);
343
344    // -----------------------------------------------------------------------
345    // 6. Atomic rename: .partial → final
346    // -----------------------------------------------------------------------
347    if let Err(e) = fs::rename(&partial_path, &dest_path).await {
348        let err = AppError::Network {
349            source: format!("rename partial failed: {e}"),
350        };
351        cleanup_on_error(
352            Some(&partial_path),
353            &request_id,
354            &registry,
355            &lock_registry,
356            &lock_id,
357            channel,
358            err.clone(),
359            &log,
360            os_notifier,
361        )
362        .await;
363        return Err(err);
364    }
365
366    // -----------------------------------------------------------------------
367    // 7. Success path: emit Done, release lock
368    // -----------------------------------------------------------------------
369    let finished_at = now_ms();
370    {
371        let mut reg = registry.0.write().await;
372        let _ = reg.update(&request_id, |t| {
373            t.state = TransferState::Done;
374            t.transferred_bytes = transferred_bytes;
375            t.finished_at = Some(finished_at);
376        });
377    }
378
379    let _ = emit_state(channel, &request_id, TransferState::Done);
380
381    if let Ok(lock) = lock_registry.release(&lock_id) {
382        let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Success);
383    }
384
385    // Fire in-app and optional OS notification for terminal Done state.
386    if let Some(transfer) = registry.0.read().await.get(&request_id).cloned() {
387        let _ = notify_terminal(&transfer, channel, &log, os_notifier).await;
388    }
389
390    Ok(())
391}
392
393// ---------------------------------------------------------------------------
394// Helpers
395// ---------------------------------------------------------------------------
396
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of wrapping if the value does not fit.
fn now_secs() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
403
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
/// `Duration::as_millis` yields a `u128`; the conversion saturates at
/// `i64::MAX` rather than silently truncating.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
410
411/// Cancel path: delete `.partial`, emit Canceled, release lock.
412async fn handle_cancel<E, C>(
413    partial_path: &PathBuf,
414    request_id: &str,
415    registry: &TransferRegistryHandle,
416    lock_registry: &LockRegistry,
417    lock_id: &LockId,
418    channel: &E,
419    log: &NotificationLogHandle,
420    os_notifier: &crate::notifications::os::OsNotifier<C>,
421) -> Result<(), AppError>
422where
423    E: EventEmitter,
424    C: OsNotifyChannel,
425{
426    let _ = fs::remove_file(partial_path).await;
427
428    let finished_at = now_ms();
429    {
430        let mut reg = registry.0.write().await;
431        let _ = reg.update(request_id, |t| {
432            t.state = TransferState::Canceled;
433            t.finished_at = Some(finished_at);
434        });
435    }
436
437    let _ = emit_state(channel, request_id, TransferState::Canceled);
438
439    if let Ok(lock) = lock_registry.release(lock_id) {
440        let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Cancel);
441    }
442
443    // Fire in-app notification for Canceled (OS notification is silenced by
444    // notify_terminal because Canceled maps to Info severity).
445    if let Some(transfer) = registry.0.read().await.get(request_id).cloned() {
446        let _ = notify_terminal(&transfer, channel, log, os_notifier).await;
447    }
448
449    Err(AppError::Cancelled)
450}
451
452/// Error path: delete `.partial` (if any), emit Failed, release lock.
453async fn cleanup_on_error<E, C>(
454    partial_path: Option<&PathBuf>,
455    request_id: &str,
456    registry: &TransferRegistryHandle,
457    lock_registry: &LockRegistry,
458    lock_id: &LockId,
459    channel: &E,
460    error: AppError,
461    log: &NotificationLogHandle,
462    os_notifier: &crate::notifications::os::OsNotifier<C>,
463) where
464    E: EventEmitter,
465    C: OsNotifyChannel,
466{
467    if let Some(p) = partial_path {
468        let _ = fs::remove_file(p).await;
469    }
470
471    let finished_at = now_ms();
472    {
473        let mut reg = registry.0.write().await;
474        let _ = reg.update(request_id, |t| {
475            t.state = TransferState::Failed;
476            t.finished_at = Some(finished_at);
477            t.error = Some(error.clone());
478        });
479    }
480
481    let _ = emit_state_with_error(
482        channel,
483        request_id,
484        TransferState::Failed,
485        Some(error.clone()),
486    );
487
488    if let Ok(lock) = lock_registry.release(lock_id) {
489        let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Failure);
490    }
491
492    // Fire in-app + OS notification for Failed terminal state.
493    if let Some(transfer) = registry.0.read().await.get(request_id).cloned() {
494        let _ = notify_terminal(&transfer, channel, log, os_notifier).await;
495    }
496}
497
498// ---------------------------------------------------------------------------
499// Tests
500// ---------------------------------------------------------------------------
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ids::{BucketId, ObjectKey, ProfileId},
        locks::{LockRegistry, LockScope},
        transfers::{Transfer, TransferKind, TransferState},
    };
    use tempfile::tempdir;

    // -----------------------------------------------------------------------
    // Fixtures
    // -----------------------------------------------------------------------

    /// Fresh, empty transfer registry handle.
    fn make_registry_handle() -> TransferRegistryHandle {
        TransferRegistryHandle::default()
    }

    /// Fresh, empty lock registry behind an `Arc`.
    fn make_lock_registry() -> Arc<LockRegistry> {
        Arc::new(LockRegistry::new())
    }

    /// Profile id shared by all fixtures.
    fn profile() -> ProfileId {
        ProfileId::new("p1")
    }

    /// Bucket id shared by all fixtures.
    fn bucket() -> BucketId {
        BucketId::new("test-bucket")
    }

    /// Key-level lock scope for `file.bin` in the fixture profile and bucket.
    fn file_bin_scope() -> LockScope {
        LockScope {
            profile: profile(),
            bucket: Some(bucket()),
            prefix: None,
            key: Some(ObjectKey::new("file.bin")),
        }
    }

    /// Queued download of `file.bin` into `dest`, registered under `id`.
    fn make_transfer(id: &str, dest: &std::path::Path) -> Transfer {
        Transfer {
            id: id.to_owned(),
            kind: TransferKind::Download,
            profile_id: profile(),
            bucket: bucket(),
            key: "file.bin".to_string(),
            source_path: None,
            dest_path: Some(dest.to_path_buf()),
            total_bytes: None,
            transferred_bytes: 0,
            parts_done: 0,
            parts_total: 0,
            state: TransferState::Queued,
            started_at: 1_700_000_000_000,
            finished_at: None,
            error: None,
        }
    }

    // -----------------------------------------------------------------------
    // Lock acquisition conflict
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn conflicting_download_returns_locked_error() {
        let locks = make_lock_registry();

        // Take the lock on the key first, under a different operation name.
        let _ = locks
            .acquire(file_bin_scope(), "existing-op", 300, now_secs())
            .expect("pre-acquire must succeed");

        // A second acquire on an identical scope has to be rejected.
        let err = locks
            .acquire(file_bin_scope(), "download", 300, now_secs())
            .expect_err("second acquire must fail");

        assert!(
            matches!(err, AppError::Locked { .. }),
            "expected Locked, got {:?}",
            err
        );
    }

    // -----------------------------------------------------------------------
    // Cancel receiver fires correctly
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn cancel_via_registry_sends_signal() {
        let dir = tempdir().unwrap();
        let dest = dir.path().join("output.bin");

        let registry_handle = make_registry_handle();
        let transfer = make_transfer("t-cancel-test", &dest);

        // Register the transfer; the write guard is released at the end of
        // the statement.
        let (_id, rx) = registry_handle.0.write().await.register(transfer);

        // Cancel via the registry.
        registry_handle
            .0
            .write()
            .await
            .cancel("t-cancel-test")
            .expect("cancel must succeed");

        // Receiver should immediately be ready.
        assert!(rx.await.is_ok(), "cancel signal must fire on receiver");
    }
}