1use std::{
23 path::PathBuf,
24 sync::Arc,
25 time::{SystemTime, UNIX_EPOCH},
26};
27
28use aws_sdk_s3::Client;
29use tokio::{fs, io::AsyncWriteExt, sync::oneshot};
30
31use crate::{
32 error::AppError,
33 events::EventEmitter,
34 ids::{BucketId, ObjectKey, ProfileId},
35 locks::{LockId, LockRegistry, LockScope, ReleaseReason},
36 notifications::{os::OsNotifyChannel, NotificationLogHandle},
37 transfers::{
38 notify::notify_terminal,
39 progress::{emit_progress, emit_state, emit_state_with_error, ProgressThrottle},
40 TransferRegistryHandle, TransferState,
41 },
42};
43
/// Maximum number of bytes (256 KiB) handled per step while `download_object`
/// works through the object body.
const CHUNK_SIZE: usize = 256 * 1024;
50
51pub async fn download_object<E, C>(
75 client: Arc<Client>,
76 bucket: BucketId,
77 key: String,
78 dest_path: PathBuf,
79 request_id: String,
80 channel: &E,
81 registry: TransferRegistryHandle,
82 lock_registry: Arc<LockRegistry>,
83 mut cancel_rx: oneshot::Receiver<()>,
84 profile_id: ProfileId,
85 lock_ttl_secs: u64,
86 log: NotificationLogHandle,
87 os_notifier: &crate::notifications::os::OsNotifier<C>,
88) -> Result<(), AppError>
89where
90 E: EventEmitter,
91 C: OsNotifyChannel,
92{
93 let now = now_secs();
97 let scope = LockScope {
98 profile: profile_id.clone(),
99 bucket: Some(bucket.clone()),
100 prefix: None,
101 key: Some(ObjectKey::new(key.clone())),
102 };
103
104 let lock_id = lock_registry.acquire(scope, "download", lock_ttl_secs, now)?;
105
106 {
110 let mut reg = registry.0.write().await;
111 let _ = reg.update(&request_id, |t| {
112 t.state = TransferState::Running;
113 });
114 }
115 let _ = emit_state(channel, &request_id, TransferState::Running);
116
117 let resp = client
121 .get_object()
122 .bucket(bucket.as_str())
123 .key(&key)
124 .send()
125 .await
126 .map_err(|e| AppError::Network {
127 source: format!("get_object failed: {e}"),
128 });
129
130 let resp = match resp {
131 Ok(r) => r,
132 Err(e) => {
133 cleanup_on_error(
134 None,
135 &request_id,
136 ®istry,
137 &lock_registry,
138 &lock_id,
139 channel,
140 e.clone(),
141 &log,
142 os_notifier,
143 )
144 .await;
145 return Err(e);
146 }
147 };
148
149 let content_length = resp.content_length().map(|l| l as u64);
150
151 {
153 let mut reg = registry.0.write().await;
154 let _ = reg.update(&request_id, |t| {
155 t.total_bytes = content_length;
156 });
157 }
158
159 let partial_path = {
163 let mut p = dest_path.clone();
164 let name = p
165 .file_name()
166 .map(|n| format!("{}.partial", n.to_string_lossy()))
167 .unwrap_or_else(|| "download.partial".to_string());
168 p.set_file_name(name);
169 p
170 };
171
172 if let Some(parent) = partial_path.parent() {
174 if let Err(e) = fs::create_dir_all(parent).await {
175 let err = AppError::Network {
176 source: format!("create_dir_all failed: {e}"),
177 };
178 cleanup_on_error(
179 None,
180 &request_id,
181 ®istry,
182 &lock_registry,
183 &lock_id,
184 channel,
185 err.clone(),
186 &log,
187 os_notifier,
188 )
189 .await;
190 return Err(err);
191 }
192 }
193
194 let mut file = match fs::File::create(&partial_path).await {
195 Ok(f) => f,
196 Err(e) => {
197 let err = AppError::Network {
198 source: format!("create partial file failed: {e}"),
199 };
200 cleanup_on_error(
201 None,
202 &request_id,
203 ®istry,
204 &lock_registry,
205 &lock_id,
206 channel,
207 err.clone(),
208 &log,
209 os_notifier,
210 )
211 .await;
212 return Err(err);
213 }
214 };
215
216 let mut throttle = ProgressThrottle::new();
220 let mut transferred_bytes: u64 = 0;
221
222 let body_result = resp.body.collect().await.map_err(|e| AppError::Network {
225 source: format!("body collect failed: {e}"),
226 });
227
228 let body = match body_result {
229 Ok(b) => b,
230 Err(e) => {
231 cleanup_on_error(
232 Some(&partial_path),
233 &request_id,
234 ®istry,
235 &lock_registry,
236 &lock_id,
237 channel,
238 e.clone(),
239 &log,
240 os_notifier,
241 )
242 .await;
243 return Err(e);
244 }
245 };
246
247 let bytes = body.into_bytes();
248
249 for chunk in bytes.chunks(CHUNK_SIZE) {
250 if cancel_rx.try_recv().is_ok() {
252 return handle_cancel(
253 &partial_path,
254 &request_id,
255 ®istry,
256 &lock_registry,
257 &lock_id,
258 channel,
259 &log,
260 os_notifier,
261 )
262 .await;
263 }
264
265 if let Err(e) = file.write_all(chunk).await {
266 let err = AppError::Network {
267 source: format!("write chunk failed: {e}"),
268 };
269 cleanup_on_error(
270 Some(&partial_path),
271 &request_id,
272 ®istry,
273 &lock_registry,
274 &lock_id,
275 channel,
276 err.clone(),
277 &log,
278 os_notifier,
279 )
280 .await;
281 return Err(err);
282 }
283
284 transferred_bytes += chunk.len() as u64;
285
286 {
288 let mut reg = registry.0.write().await;
289 let _ = reg.update(&request_id, |t| {
290 t.transferred_bytes = transferred_bytes;
291 });
292 }
293
294 let now_ms = now_ms();
296 let _ = emit_progress(
297 channel,
298 &request_id,
299 transferred_bytes,
300 content_length,
301 0,
302 0,
303 &mut throttle,
304 now_ms,
305 );
306 }
307
308 if cancel_rx.try_recv().is_ok() {
310 return handle_cancel(
311 &partial_path,
312 &request_id,
313 ®istry,
314 &lock_registry,
315 &lock_id,
316 channel,
317 &log,
318 os_notifier,
319 )
320 .await;
321 }
322
323 if let Err(e) = file.flush().await {
325 let err = AppError::Network {
326 source: format!("flush failed: {e}"),
327 };
328 cleanup_on_error(
329 Some(&partial_path),
330 &request_id,
331 ®istry,
332 &lock_registry,
333 &lock_id,
334 channel,
335 err.clone(),
336 &log,
337 os_notifier,
338 )
339 .await;
340 return Err(err);
341 }
342 drop(file);
343
344 if let Err(e) = fs::rename(&partial_path, &dest_path).await {
348 let err = AppError::Network {
349 source: format!("rename partial failed: {e}"),
350 };
351 cleanup_on_error(
352 Some(&partial_path),
353 &request_id,
354 ®istry,
355 &lock_registry,
356 &lock_id,
357 channel,
358 err.clone(),
359 &log,
360 os_notifier,
361 )
362 .await;
363 return Err(err);
364 }
365
366 let finished_at = now_ms();
370 {
371 let mut reg = registry.0.write().await;
372 let _ = reg.update(&request_id, |t| {
373 t.state = TransferState::Done;
374 t.transferred_bytes = transferred_bytes;
375 t.finished_at = Some(finished_at);
376 });
377 }
378
379 let _ = emit_state(channel, &request_id, TransferState::Done);
380
381 if let Ok(lock) = lock_registry.release(&lock_id) {
382 let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Success);
383 }
384
385 if let Some(transfer) = registry.0.read().await.get(&request_id).cloned() {
387 let _ = notify_terminal(&transfer, channel, &log, os_notifier).await;
388 }
389
390 Ok(())
391}
392
/// Returns the current wall-clock time in whole seconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn now_secs() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
403
/// Returns the current wall-clock time in milliseconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
410
411async fn handle_cancel<E, C>(
413 partial_path: &PathBuf,
414 request_id: &str,
415 registry: &TransferRegistryHandle,
416 lock_registry: &LockRegistry,
417 lock_id: &LockId,
418 channel: &E,
419 log: &NotificationLogHandle,
420 os_notifier: &crate::notifications::os::OsNotifier<C>,
421) -> Result<(), AppError>
422where
423 E: EventEmitter,
424 C: OsNotifyChannel,
425{
426 let _ = fs::remove_file(partial_path).await;
427
428 let finished_at = now_ms();
429 {
430 let mut reg = registry.0.write().await;
431 let _ = reg.update(request_id, |t| {
432 t.state = TransferState::Canceled;
433 t.finished_at = Some(finished_at);
434 });
435 }
436
437 let _ = emit_state(channel, request_id, TransferState::Canceled);
438
439 if let Ok(lock) = lock_registry.release(lock_id) {
440 let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Cancel);
441 }
442
443 if let Some(transfer) = registry.0.read().await.get(request_id).cloned() {
446 let _ = notify_terminal(&transfer, channel, log, os_notifier).await;
447 }
448
449 Err(AppError::Cancelled)
450}
451
452async fn cleanup_on_error<E, C>(
454 partial_path: Option<&PathBuf>,
455 request_id: &str,
456 registry: &TransferRegistryHandle,
457 lock_registry: &LockRegistry,
458 lock_id: &LockId,
459 channel: &E,
460 error: AppError,
461 log: &NotificationLogHandle,
462 os_notifier: &crate::notifications::os::OsNotifier<C>,
463) where
464 E: EventEmitter,
465 C: OsNotifyChannel,
466{
467 if let Some(p) = partial_path {
468 let _ = fs::remove_file(p).await;
469 }
470
471 let finished_at = now_ms();
472 {
473 let mut reg = registry.0.write().await;
474 let _ = reg.update(request_id, |t| {
475 t.state = TransferState::Failed;
476 t.finished_at = Some(finished_at);
477 t.error = Some(error.clone());
478 });
479 }
480
481 let _ = emit_state_with_error(
482 channel,
483 request_id,
484 TransferState::Failed,
485 Some(error.clone()),
486 );
487
488 if let Ok(lock) = lock_registry.release(lock_id) {
489 let _ = crate::locks::emit_released(channel, &lock, ReleaseReason::Failure);
490 }
491
492 if let Some(transfer) = registry.0.read().await.get(request_id).cloned() {
494 let _ = notify_terminal(&transfer, channel, log, os_notifier).await;
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ids::{BucketId, ObjectKey, ProfileId},
        locks::{LockRegistry, LockScope},
        transfers::{Transfer, TransferKind, TransferState},
    };
    use tempfile::tempdir;

    /// Fresh, empty transfer registry handle.
    fn make_registry_handle() -> TransferRegistryHandle {
        TransferRegistryHandle::default()
    }

    /// Fresh, empty lock registry behind an `Arc`.
    fn make_lock_registry() -> Arc<LockRegistry> {
        Arc::new(LockRegistry::new())
    }

    /// Profile id shared by all fixtures.
    fn profile() -> ProfileId {
        ProfileId::new("p1")
    }

    /// Bucket id shared by all fixtures.
    fn bucket() -> BucketId {
        BucketId::new("test-bucket")
    }

    /// Lock scope for the object `file.bin` in the fixture profile and bucket.
    fn file_bin_scope() -> LockScope {
        LockScope {
            profile: profile(),
            bucket: Some(bucket()),
            prefix: None,
            key: Some(ObjectKey::new("file.bin")),
        }
    }

    /// Queued download of `file.bin` targeting `dest`, with nothing transferred.
    fn make_transfer(id: &str, dest: &std::path::Path) -> Transfer {
        Transfer {
            id: id.to_owned(),
            kind: TransferKind::Download,
            profile_id: profile(),
            bucket: bucket(),
            key: "file.bin".to_string(),
            source_path: None,
            dest_path: Some(dest.to_path_buf()),
            total_bytes: None,
            transferred_bytes: 0,
            parts_done: 0,
            parts_total: 0,
            state: TransferState::Queued,
            started_at: 1_700_000_000_000,
            finished_at: None,
            error: None,
        }
    }

    /// A second acquire on the same object scope must be rejected with `Locked`.
    #[tokio::test]
    async fn conflicting_download_returns_locked_error() {
        let locks = make_lock_registry();

        // First holder of the scope.
        let _ = locks
            .acquire(file_bin_scope(), "existing-op", 300, now_secs())
            .expect("pre-acquire must succeed");

        // Same scope again, as a download would request it.
        let err = locks
            .acquire(file_bin_scope(), "download", 300, now_secs())
            .expect_err("second acquire must fail");

        assert!(
            matches!(err, AppError::Locked { .. }),
            "expected Locked, got {:?}",
            err
        );
    }

    /// Cancelling through the registry must resolve the receiver handed out at
    /// registration.
    #[tokio::test]
    async fn cancel_via_registry_sends_signal() {
        let dir = tempdir().unwrap();
        let dest = dir.path().join("output.bin");

        let registry_handle = make_registry_handle();
        let transfer = make_transfer("t-cancel-test", &dest);

        let (_id, rx) = registry_handle.0.write().await.register(transfer);

        registry_handle
            .0
            .write()
            .await
            .cancel("t-cancel-test")
            .expect("cancel must succeed");

        let result = rx.await;
        assert!(result.is_ok(), "cancel signal must fire on receiver");
    }
}