1use std::{
24 path::PathBuf,
25 sync::{atomic::AtomicBool, Arc},
26};
27
28use tauri::{AppHandle, State};
29
30use crate::{
31 error::AppError,
32 ids::{BucketId, ProfileId},
33 locks::LockRegistryHandle,
34 notifications::{
35 os::{AppHandleChannel, OsNotifier},
36 NotificationLogHandle,
37 },
38 profiles::ProfileStoreHandle,
39 s3::{
40 multipart::{
41 abort_multipart_upload, scan_multipart_uploads, MultipartSource, MultipartTableHandle,
42 MultipartUpload,
43 },
44 S3ClientPoolHandle,
45 },
46 settings::SettingsHandle,
47 transfers::{
48 download::download_object, new_transfer_id, upload::upload_object, Transfer,
49 TransferFilter, TransferKind, TransferQueueHandle, TransferState,
50 },
51};
52
/// One entry of a batch upload request, as consumed by `transfer_upload_many`.
///
/// Deserialized with camelCase field names (for example `profileId` and `sourcePath`).
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransferUploadSpec {
    /// Profile whose S3 client is resolved for this upload.
    pub profile_id: ProfileId,
    /// Bucket forwarded to the upload.
    pub bucket: BucketId,
    /// Object key forwarded to the upload.
    pub key: String,
    /// Local path of the file to upload; converted to a `PathBuf` by the caller.
    pub source_path: String,
}
66
/// One entry of a batch download request, as consumed by `transfer_download_many`.
///
/// Deserialized with camelCase field names (for example `profileId` and `destPath`).
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransferDownloadSpec {
    /// Profile whose S3 client is resolved for this download.
    pub profile_id: ProfileId,
    /// Bucket forwarded to the download.
    pub bucket: BucketId,
    /// Object key forwarded to the download.
    pub key: String,
    /// Local destination path; converted to a `PathBuf` by the caller.
    pub dest_path: String,
}
76
77async fn enqueue_download(
83 profile_id: ProfileId,
84 bucket: BucketId,
85 key: String,
86 dest_path: PathBuf,
87 queue: &TransferQueueHandle,
88 client: Arc<aws_sdk_s3::Client>,
89 lock_registry: Arc<crate::locks::LockRegistry>,
90 log: NotificationLogHandle,
91 settings: SettingsHandle,
92 channel: AppHandle,
93) -> Result<String, AppError> {
94 let request_id = new_transfer_id();
95 let now_ms = now_ms();
96
97 let transfer = Transfer {
98 id: request_id.clone(),
99 kind: TransferKind::Download,
100 profile_id: profile_id.clone(),
101 bucket: bucket.clone(),
102 key: key.clone(),
103 source_path: None,
104 dest_path: Some(dest_path.clone()),
105 total_bytes: None,
106 transferred_bytes: 0,
107 parts_done: 0,
108 parts_total: 0,
109 state: TransferState::Queued,
110 started_at: now_ms,
111 finished_at: None,
112 error: None,
113 };
114
115 let cancel_rx = {
116 let mut reg = queue.0.registry().write().await;
117 let (id, rx) = reg.register(transfer);
118 assert_eq!(id, request_id);
119 rx
120 };
121
122 let registry_handle = queue.0.registry_handle();
123 let request_id_clone = request_id.clone();
124 let sem = queue.0.current_semaphore();
125
126 const LOCK_TTL_SECS: u64 = 300;
127
128 tokio::spawn(async move {
129 let _permit = sem
130 .acquire_owned()
131 .await
132 .expect("semaphore must not be closed");
133
134 let os_notifier = OsNotifier::new(
135 AppHandleChannel {
136 app: channel.clone(),
137 },
138 settings,
139 );
140
141 let id_for_err = request_id_clone.clone();
142 let result = download_object(
143 client,
144 bucket,
145 key,
146 dest_path,
147 request_id_clone,
148 &channel,
149 registry_handle.clone(),
150 lock_registry,
151 cancel_rx,
152 profile_id,
153 LOCK_TTL_SECS,
154 log,
155 &os_notifier,
156 )
157 .await;
158
159 if let Err(err) = result {
165 eprintln!("download task {id_for_err} failed: {err:?}");
166 let mut reg = registry_handle.0.write().await;
167 let _ = reg.update(&id_for_err, |t| {
168 t.error = Some(err);
169 if t.state != TransferState::Failed {
170 t.state = TransferState::Failed;
171 }
172 });
173 }
174 });
175
176 Ok(request_id)
177}
178
179#[allow(clippy::too_many_arguments)]
181async fn enqueue_upload(
182 profile_id: ProfileId,
183 bucket: BucketId,
184 key: String,
185 source_path: PathBuf,
186 queue: &TransferQueueHandle,
187 client: Arc<aws_sdk_s3::Client>,
188 lock_registry: Arc<crate::locks::LockRegistry>,
189 multipart_table: Arc<crate::s3::multipart::MultipartTable>,
190 log: NotificationLogHandle,
191 settings: SettingsHandle,
192 channel: AppHandle,
193) -> Result<String, AppError> {
194 let request_id = new_transfer_id();
195 let now_ms = now_ms();
196
197 let transfer = Transfer {
198 id: request_id.clone(),
199 kind: TransferKind::Upload,
200 profile_id: profile_id.clone(),
201 bucket: bucket.clone(),
202 key: key.clone(),
203 source_path: Some(source_path.clone()),
204 dest_path: None,
205 total_bytes: None,
206 transferred_bytes: 0,
207 parts_done: 0,
208 parts_total: 0,
209 state: TransferState::Queued,
210 started_at: now_ms,
211 finished_at: None,
212 error: None,
213 };
214
215 let cancel_flag = Arc::new(AtomicBool::new(false));
216
217 {
218 let mut reg = queue.0.registry().write().await;
219 let (id, _rx) = reg.register(transfer);
220 assert_eq!(id, request_id);
221 }
222
223 let registry_handle = queue.0.registry_handle();
224 let request_id_clone = request_id.clone();
225 let sem = queue.0.current_semaphore();
226
227 const LOCK_TTL_SECS: u64 = 300;
228 const PARTS_CONCURRENCY: u32 = 4;
229
230 tokio::spawn(async move {
231 let _permit = sem
232 .acquire_owned()
233 .await
234 .expect("semaphore must not be closed");
235
236 let os_notifier = OsNotifier::new(
237 AppHandleChannel {
238 app: channel.clone(),
239 },
240 settings,
241 );
242
243 let id_for_err = request_id_clone.clone();
244 let result = upload_object(
245 client,
246 bucket,
247 key,
248 source_path,
249 request_id_clone,
250 &channel,
251 registry_handle.clone(),
252 lock_registry,
253 multipart_table,
254 PARTS_CONCURRENCY,
255 profile_id,
256 LOCK_TTL_SECS,
257 cancel_flag,
258 log,
259 &os_notifier,
260 )
261 .await;
262
263 if let Err(err) = result {
267 eprintln!("upload task {id_for_err} failed: {err:?}");
268 let mut reg = registry_handle.0.write().await;
269 let _ = reg.update(&id_for_err, |t| {
270 t.error = Some(err);
271 if t.state != TransferState::Failed {
272 t.state = TransferState::Failed;
273 }
274 });
275 }
276 });
277
278 Ok(request_id)
279}
280
281#[tauri::command]
289pub async fn transfer_download(
290 profile_id: ProfileId,
291 bucket: BucketId,
292 key: String,
293 dest_path: String,
294 queue: State<'_, TransferQueueHandle>,
295 pool: State<'_, S3ClientPoolHandle>,
296 locks: State<'_, LockRegistryHandle>,
297 store: State<'_, ProfileStoreHandle>,
298 log: State<'_, NotificationLogHandle>,
299 settings: State<'_, SettingsHandle>,
300 channel: AppHandle,
301) -> Result<String, AppError> {
302 let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
303 let lock_registry = Arc::clone(&locks.0);
304 enqueue_download(
305 profile_id_resolved,
306 bucket,
307 key,
308 PathBuf::from(dest_path),
309 &queue,
310 client,
311 lock_registry,
312 (*log).clone(),
313 (*settings).clone(),
314 channel,
315 )
316 .await
317}
318
319#[tauri::command]
327pub async fn transfer_upload(
328 profile_id: ProfileId,
329 bucket: BucketId,
330 key: String,
331 source_path: String,
332 queue: State<'_, TransferQueueHandle>,
333 pool: State<'_, S3ClientPoolHandle>,
334 locks: State<'_, LockRegistryHandle>,
335 store: State<'_, ProfileStoreHandle>,
336 multipart_table: State<'_, MultipartTableHandle>,
337 log: State<'_, NotificationLogHandle>,
338 settings: State<'_, SettingsHandle>,
339 channel: AppHandle,
340) -> Result<String, AppError> {
341 let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
342 let lock_registry = Arc::clone(&locks.0);
343 let multipart_arc = Arc::new(multipart_table.0.clone());
344 enqueue_upload(
345 profile_id_resolved,
346 bucket,
347 key,
348 PathBuf::from(source_path),
349 &queue,
350 client,
351 lock_registry,
352 multipart_arc,
353 (*log).clone(),
354 (*settings).clone(),
355 channel,
356 )
357 .await
358}
359
360#[tauri::command]
368pub async fn transfer_list(
369 filter: Option<TransferFilter>,
370 queue: State<'_, TransferQueueHandle>,
371) -> Result<Vec<Transfer>, AppError> {
372 Ok(queue.0.list(filter).await)
373}
374
375#[tauri::command]
383pub async fn transfer_cancel(
384 request_id: String,
385 queue: State<'_, TransferQueueHandle>,
386) -> Result<(), AppError> {
387 queue.0.cancel(&request_id).await
388}
389
390#[tauri::command]
400pub async fn transfer_retry(
401 request_id: String,
402 queue: State<'_, TransferQueueHandle>,
403 pool: State<'_, S3ClientPoolHandle>,
404 locks: State<'_, LockRegistryHandle>,
405 store: State<'_, ProfileStoreHandle>,
406 multipart_table: State<'_, MultipartTableHandle>,
407 log: State<'_, NotificationLogHandle>,
408 settings: State<'_, SettingsHandle>,
409 channel: AppHandle,
410) -> Result<String, AppError> {
411 let original = {
413 let reg = queue.0.registry().read().await;
414 reg.get(&request_id)
415 .cloned()
416 .ok_or_else(|| AppError::NotFound {
417 resource: format!("transfer:{request_id}"),
418 })?
419 };
420
421 match original.state {
423 TransferState::Failed | TransferState::Canceled => {}
424 _ => {
425 return Err(AppError::Validation {
426 field: "state".to_string(),
427 hint: "retry is only valid for failed or canceled transfers".to_string(),
428 });
429 }
430 }
431
432 let (client, profile_id_resolved) = resolve_client(&original.profile_id, &store, &pool).await?;
433 let lock_registry = Arc::clone(&locks.0);
434
435 match original.kind {
436 TransferKind::Download => {
437 let dest_path = original
438 .dest_path
439 .clone()
440 .ok_or_else(|| AppError::Internal {
441 trace_id: format!("retry:{request_id}:missing_dest_path"),
442 })?;
443 enqueue_download(
444 profile_id_resolved,
445 original.bucket,
446 original.key,
447 dest_path,
448 &queue,
449 client,
450 lock_registry,
451 (*log).clone(),
452 (*settings).clone(),
453 channel,
454 )
455 .await
456 }
457 TransferKind::Upload => {
458 let source_path = original
459 .source_path
460 .clone()
461 .ok_or_else(|| AppError::Internal {
462 trace_id: format!("retry:{request_id}:missing_source_path"),
463 })?;
464 let multipart_arc = Arc::new(multipart_table.0.clone());
465 enqueue_upload(
466 profile_id_resolved,
467 original.bucket,
468 original.key,
469 source_path,
470 &queue,
471 client,
472 lock_registry,
473 multipart_arc,
474 (*log).clone(),
475 (*settings).clone(),
476 channel,
477 )
478 .await
479 }
480 }
481}
482
483#[tauri::command]
491pub async fn transfer_upload_many(
492 specs: Vec<TransferUploadSpec>,
493 queue: State<'_, TransferQueueHandle>,
494 pool: State<'_, S3ClientPoolHandle>,
495 locks: State<'_, LockRegistryHandle>,
496 store: State<'_, ProfileStoreHandle>,
497 multipart_table: State<'_, MultipartTableHandle>,
498 log: State<'_, NotificationLogHandle>,
499 settings: State<'_, SettingsHandle>,
500 channel: AppHandle,
501) -> Result<Vec<String>, AppError> {
502 let mut ids = Vec::with_capacity(specs.len());
510 for spec in specs {
511 let (client, profile_id_resolved) = resolve_client(&spec.profile_id, &store, &pool).await?;
512 let lock_registry = Arc::clone(&locks.0);
513 let multipart_arc = Arc::new(multipart_table.0.clone());
514 let id = enqueue_upload(
515 profile_id_resolved,
516 spec.bucket,
517 spec.key,
518 PathBuf::from(spec.source_path),
519 &queue,
520 client,
521 lock_registry,
522 multipart_arc,
523 (*log).clone(),
524 (*settings).clone(),
525 channel.clone(),
526 )
527 .await?;
528 ids.push(id);
529 }
530 Ok(ids)
531}
532
533#[tauri::command]
541pub async fn transfer_download_many(
542 specs: Vec<TransferDownloadSpec>,
543 queue: State<'_, TransferQueueHandle>,
544 pool: State<'_, S3ClientPoolHandle>,
545 locks: State<'_, LockRegistryHandle>,
546 store: State<'_, ProfileStoreHandle>,
547 log: State<'_, NotificationLogHandle>,
548 settings: State<'_, SettingsHandle>,
549 channel: AppHandle,
550) -> Result<Vec<String>, AppError> {
551 let mut ids = Vec::with_capacity(specs.len());
554 for spec in specs {
555 let (client, profile_id_resolved) = resolve_client(&spec.profile_id, &store, &pool).await?;
556 let lock_registry = Arc::clone(&locks.0);
557 let id = enqueue_download(
558 profile_id_resolved,
559 spec.bucket,
560 spec.key,
561 PathBuf::from(spec.dest_path),
562 &queue,
563 client,
564 lock_registry,
565 (*log).clone(),
566 (*settings).clone(),
567 channel.clone(),
568 )
569 .await?;
570 ids.push(id);
571 }
572 Ok(ids)
573}
574
575#[tauri::command]
586pub async fn multipart_scan(
587 profile_id: ProfileId,
588 bucket: BucketId,
589 older_than_secs: Option<u64>,
590 store: State<'_, ProfileStoreHandle>,
591 pool: State<'_, S3ClientPoolHandle>,
592 multipart_table: State<'_, MultipartTableHandle>,
593) -> Result<Vec<MultipartUpload>, AppError> {
594 let (client, _) = resolve_client(&profile_id, &store, &pool).await?;
595 scan_multipart_uploads(&client, &bucket, &multipart_table.0, older_than_secs).await
596}
597
598#[tauri::command]
611pub async fn multipart_abort(
612 profile_id: ProfileId,
613 bucket: BucketId,
614 upload_id: String,
615 key: String,
616 source: MultipartSource,
617 confirmed_unknown: Option<bool>,
618 store: State<'_, ProfileStoreHandle>,
619 pool: State<'_, S3ClientPoolHandle>,
620 multipart_table: State<'_, MultipartTableHandle>,
621) -> Result<(), AppError> {
622 let (client, profile_id_resolved) = resolve_client(&profile_id, &store, &pool).await?;
623 abort_multipart_upload(
624 &client,
625 &bucket,
626 &key,
627 &upload_id,
628 source,
629 &multipart_table.0,
630 &profile_id_resolved,
631 confirmed_unknown.unwrap_or(false),
632 )
633 .await
634}
635
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The millisecond
/// count is converted with a checked conversion and saturates at `i64::MAX`, instead of
/// silently wrapping the way an `as` cast from `u128` would.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
646
647async fn resolve_client(
649 profile_id: &ProfileId,
650 store: &State<'_, ProfileStoreHandle>,
651 pool: &State<'_, S3ClientPoolHandle>,
652) -> Result<(Arc<aws_sdk_s3::Client>, ProfileId), AppError> {
653 let profile = {
654 let store_guard = store.inner.lock().await;
655 store_guard
656 .get(profile_id)
657 .ok_or_else(|| AppError::NotFound {
658 resource: format!("profile:{}", profile_id.as_str()),
659 })?
660 };
661
662 if profile.validated_at.is_none() {
663 return Err(AppError::Auth {
664 reason: "profile_not_validated_in_session".to_string(),
665 });
666 }
667
668 let default_region = profile
669 .default_region
670 .clone()
671 .unwrap_or_else(|| "us-east-1".to_string());
672
673 let client = pool
674 .inner
675 .get_or_build(profile_id, &default_region)
676 .await
677 .ok_or_else(|| AppError::Internal {
678 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
679 })?;
680
681 Ok((client, profile_id.clone()))
682}
683
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transfers::TransferSpec;

    /// A default-constructed queue handle lists no transfers.
    #[test]
    fn transfer_queue_handle_default_is_empty() {
        let handle = TransferQueueHandle::default();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let listed = runtime.block_on(async { handle.0.list(None).await });
        assert_eq!(listed.len(), 0, "fresh queue must be empty");
    }

    /// Every `TransferFilter` variant serializes to its lowercase name.
    #[test]
    fn transfer_filter_serializes_snake_case() {
        let cases = [
            (TransferFilter::Active, "active"),
            (TransferFilter::Completed, "completed"),
            (TransferFilter::Failed, "failed"),
            (TransferFilter::All, "all"),
        ];
        cases.iter().for_each(|(filter, expected)| {
            let json = serde_json::to_value(filter).expect("must serialize");
            assert_eq!(json.as_str().unwrap(), *expected, "filter {:?}", filter);
        });
    }

    /// An upload spec serializes with `kind == "upload"` and keeps its key.
    #[test]
    fn transfer_spec_upload_serializes() {
        let json = serde_json::to_value(&TransferSpec::Upload {
            profile: ProfileId::new("my-profile"),
            bucket: BucketId::new("my-bucket"),
            key: "data/file.txt".to_string(),
            source_path: PathBuf::from("/local/file.txt"),
        })
        .expect("must serialize");
        assert_eq!(json["kind"], "upload");
        assert_eq!(json["key"], "data/file.txt");
    }

    /// A download spec serializes with `kind == "download"`.
    #[test]
    fn transfer_spec_download_serializes() {
        let json = serde_json::to_value(&TransferSpec::Download {
            profile: ProfileId::new("my-profile"),
            bucket: BucketId::new("my-bucket"),
            key: "data/file.txt".to_string(),
            dest_path: PathBuf::from("/tmp/file.txt"),
        })
        .expect("must serialize");
        assert_eq!(json["kind"], "download");
    }

    /// A registered `Running` transfer does not satisfy the retry precondition.
    ///
    /// This repeats the state check rather than calling `transfer_retry`, so it guards
    /// the predicate only, not the command.
    #[tokio::test]
    async fn transfer_retry_rejects_non_terminal_state() {
        let queue = TransferQueueHandle::default();
        let request_id = new_transfer_id();

        let running = Transfer {
            id: request_id.clone(),
            kind: TransferKind::Download,
            profile_id: ProfileId::new("p1"),
            bucket: BucketId::new("bucket"),
            key: "key".to_string(),
            source_path: None,
            dest_path: Some(PathBuf::from("/tmp/out")),
            total_bytes: None,
            transferred_bytes: 0,
            parts_done: 0,
            parts_total: 0,
            state: TransferState::Running,
            started_at: 0,
            finished_at: None,
            error: None,
        };

        // Release the write guard before taking the read guard below.
        {
            let mut writer = queue.0.registry().write().await;
            writer.register(running);
        }

        let reader = queue.0.registry().read().await;
        let stored = reader.get(&request_id).unwrap();
        let is_retriable =
            stored.state == TransferState::Failed || stored.state == TransferState::Canceled;
        assert!(!is_retriable, "Running state must not be retriable");
    }
}