1use tauri::{AppHandle, State};
41
42use std::collections::HashMap;
43
44use crate::{
45 cache::{invalidation::on_object_mutation, store::CacheHandle, CacheKey, Freshness},
46 error::AppError,
47 events::{self, EventKind},
48 ids::{BucketId, ObjectKey, ProfileId},
49 locks::{emit_acquired, emit_released, LockRegistryHandle, LockScope, ReleaseReason},
50 profiles::ProfileStoreHandle,
51 s3::{
52 cross_account::{ConfirmScope, ConfirmationCacheHandle},
53 list::{list_objects, list_objects_flat, ListPage},
54 metadata::{set_object_metadata as s3_set_metadata, PutResult},
55 object::{
56 copy_object_with_fallback as s3_copy_object_with_fallback,
57 create_folder as s3_create_folder, delete_objects_batch as s3_delete_objects_batch,
58 get_object_bytes as s3_get_object_bytes, get_object_text as s3_get_object_text,
59 move_object as s3_move_object, parent_prefix, put_object_text as s3_put_object_text,
60 BytesPayload, CopyOptions, CopyOutcome, DeleteReport, MoveResult, TextPayload,
61 DEFAULT_BYTES_MAX_BYTES, DEFAULT_TEXT_MAX_BYTES,
62 },
63 presign::{presign_get_object, PresignedUrl},
64 tags::set_object_tags as s3_set_tags,
65 S3ClientPoolHandle,
66 },
67};
68use serde::{Deserialize, Serialize};
69
70#[tauri::command]
80pub async fn objects_list(
81 profile_id: ProfileId,
82 bucket: BucketId,
83 prefix: String,
84 continuation_token: Option<String>,
85 force: Option<bool>,
86 store: State<'_, ProfileStoreHandle>,
87 pool: State<'_, S3ClientPoolHandle>,
88 cache: State<'_, CacheHandle>,
89 _channel: AppHandle,
90) -> Result<ListPage, AppError> {
91 let profile = {
95 let store_guard = store.inner.lock().await;
96 store_guard
97 .get(&profile_id)
98 .ok_or_else(|| AppError::NotFound {
99 resource: format!("profile:{}", profile_id.as_str()),
100 })?
101 };
102
103 if profile.validated_at.is_none() {
105 return Err(AppError::Auth {
106 reason: "profile_not_validated_in_session".to_string(),
107 });
108 }
109
110 let default_region = profile
111 .default_region
112 .clone()
113 .unwrap_or_else(|| "us-east-1".to_string());
114
115 let cache_arc: CacheHandle = (*cache).clone();
116
117 let is_first_page = continuation_token.is_none();
121
122 if is_first_page && force != Some(true) {
123 let cache_key = CacheKey::Objects {
124 profile: profile_id.clone(),
125 bucket: bucket.clone(),
126 prefix: prefix.clone(),
127 };
128
129 let cached = cache_arc.get::<ListPage>(&cache_key, profile.validated_at)?;
130
131 if let Some(read) = cached {
132 if read.freshness == Freshness::Fresh || read.freshness == Freshness::Stale {
133 return Ok(read.value);
134 }
135 }
136 }
137
138 let client = pool
142 .inner
143 .get_or_build(&profile_id, &default_region)
144 .await
145 .ok_or_else(|| AppError::Internal {
146 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
147 })?;
148
149 let page = list_objects(
150 &client,
151 bucket.as_str(),
152 &prefix,
153 Some("/"),
154 continuation_token.as_deref(),
155 None,
156 )
157 .await?;
158
159 if is_first_page {
163 let cache_key = CacheKey::Objects {
164 profile: profile_id.clone(),
165 bucket: bucket.clone(),
166 prefix: prefix.clone(),
167 };
168 let _ = cache_arc.put(&cache_key, &page, None);
169 }
170
171 Ok(page)
172}
173
174#[tauri::command]
183pub async fn objects_list_flat(
184 profile_id: ProfileId,
185 bucket: BucketId,
186 prefix: String,
187 continuation_token: Option<String>,
188 force: Option<bool>,
189 store: State<'_, ProfileStoreHandle>,
190 pool: State<'_, S3ClientPoolHandle>,
191 cache: State<'_, CacheHandle>,
192 _channel: AppHandle,
193) -> Result<ListPage, AppError> {
194 let profile = {
198 let store_guard = store.inner.lock().await;
199 store_guard
200 .get(&profile_id)
201 .ok_or_else(|| AppError::NotFound {
202 resource: format!("profile:{}", profile_id.as_str()),
203 })?
204 };
205
206 if profile.validated_at.is_none() {
207 return Err(AppError::Auth {
208 reason: "profile_not_validated_in_session".to_string(),
209 });
210 }
211
212 let default_region = profile
213 .default_region
214 .clone()
215 .unwrap_or_else(|| "us-east-1".to_string());
216
217 let cache_arc: CacheHandle = (*cache).clone();
218
219 let is_first_page = continuation_token.is_none();
226 let flat_prefix = format!("{}__FLAT__", prefix);
227
228 if is_first_page && force != Some(true) {
229 let cache_key = CacheKey::Objects {
230 profile: profile_id.clone(),
231 bucket: bucket.clone(),
232 prefix: flat_prefix.clone(),
233 };
234
235 let cached = cache_arc.get::<ListPage>(&cache_key, profile.validated_at)?;
236
237 if let Some(read) = cached {
238 if read.freshness == Freshness::Fresh || read.freshness == Freshness::Stale {
239 return Ok(read.value);
240 }
241 }
242 }
243
244 let client = pool
248 .inner
249 .get_or_build(&profile_id, &default_region)
250 .await
251 .ok_or_else(|| AppError::Internal {
252 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
253 })?;
254
255 let page = list_objects_flat(
256 &client,
257 bucket.as_str(),
258 &prefix,
259 continuation_token.as_deref(),
260 None,
261 )
262 .await?;
263
264 if is_first_page {
268 let cache_key = CacheKey::Objects {
269 profile: profile_id.clone(),
270 bucket: bucket.clone(),
271 prefix: flat_prefix,
272 };
273 let _ = cache_arc.put(&cache_key, &page, None);
274 }
275
276 Ok(page)
277}
278
279#[derive(Debug, Clone, Serialize, Deserialize)]
290#[serde(rename_all = "camelCase")]
291pub struct ObjectRef {
292 pub bucket: BucketId,
293 pub key: String,
294}
295
296#[derive(Debug, Clone, Serialize)]
298#[serde(rename_all = "camelCase")]
299struct ObjectsUpdatedPayload {
300 profile_id: ProfileId,
301 bucket: BucketId,
302 prefix: String,
303}
304
305fn emit_objects_updated<E: events::EventEmitter>(
307 channel: &E,
308 profile_id: &ProfileId,
309 bucket: &BucketId,
310 prefix: &str,
311) -> Result<(), AppError> {
312 events::emit(
313 channel,
314 EventKind::ObjectsUpdated,
315 ObjectsUpdatedPayload {
316 profile_id: profile_id.clone(),
317 bucket: bucket.clone(),
318 prefix: prefix.to_string(),
319 },
320 )
321}
322
323#[tauri::command]
343pub async fn object_copy(
344 profile_id: ProfileId,
345 source: ObjectRef,
346 destination: ObjectRef,
347 options: CopyOptions,
348 confirmed_token: Option<String>,
349 store: State<'_, ProfileStoreHandle>,
350 pool: State<'_, S3ClientPoolHandle>,
351 locks: State<'_, LockRegistryHandle>,
352 cache: State<'_, CacheHandle>,
353 confirm_cache: State<'_, ConfirmationCacheHandle>,
354 channel: AppHandle,
355) -> Result<CopyOutcome, AppError> {
356 let profile = {
360 let store_guard = store.inner.lock().await;
361 store_guard
362 .get(&profile_id)
363 .ok_or_else(|| AppError::NotFound {
364 resource: format!("profile:{}", profile_id.as_str()),
365 })?
366 };
367
368 if profile.validated_at.is_none() {
369 return Err(AppError::Auth {
370 reason: "profile_not_validated_in_session".to_string(),
371 });
372 }
373
374 let default_region = profile
375 .default_region
376 .clone()
377 .unwrap_or_else(|| "us-east-1".to_string());
378
379 let src_prefix = parent_prefix(&source.key);
383 let dest_prefix = parent_prefix(&destination.key);
384
385 let now = std::time::SystemTime::now()
386 .duration_since(std::time::UNIX_EPOCH)
387 .unwrap_or_default()
388 .as_secs() as i64;
389
390 let src_scope = LockScope {
391 profile: profile_id.clone(),
392 bucket: Some(source.bucket.clone()),
393 prefix: Some(src_prefix.clone()),
394 key: None,
395 };
396 let dest_scope = LockScope {
397 profile: profile_id.clone(),
398 bucket: Some(destination.bucket.clone()),
399 prefix: Some(dest_prefix.clone()),
400 key: None,
401 };
402
403 let registry = locks.inner();
404 let src_lock_id = registry.acquire(src_scope, "object_copy:source", 300, now)?;
405 let src_lock = registry
406 .list(None)
407 .into_iter()
408 .find(|l| l.id == src_lock_id)
409 .expect("lock must exist right after acquire");
410 let _ = emit_acquired(&channel, &src_lock);
411
412 let dest_lock_id = match registry.acquire(dest_scope, "object_copy:dest", 300, now) {
413 Ok(id) => id,
414 Err(e) => {
415 if let Ok(lock) = registry.release(&src_lock_id) {
417 let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
418 }
419 return Err(e);
420 }
421 };
422 let dest_lock = registry
423 .list(None)
424 .into_iter()
425 .find(|l| l.id == dest_lock_id)
426 .expect("lock must exist right after acquire");
427 let _ = emit_acquired(&channel, &dest_lock);
428
429 let client = pool
433 .inner
434 .get_or_build(&profile_id, &default_region)
435 .await
436 .ok_or_else(|| AppError::Internal {
437 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
438 })?;
439
440 const DEFAULT_FALLBACK_THRESHOLD: u64 = 100 * 1024 * 1024;
442
443 let result = s3_copy_object_with_fallback(
444 &client,
445 source.bucket.as_str(),
446 &source.key,
447 destination.bucket.as_str(),
448 &destination.key,
449 &options,
450 DEFAULT_FALLBACK_THRESHOLD,
451 confirmed_token,
452 &confirm_cache.inner,
453 profile_id.as_str(),
454 )
455 .await;
456
457 let release_reason = if result.is_ok() {
461 ReleaseReason::Success
462 } else {
463 ReleaseReason::Failure
464 };
465
466 if let Ok(lock) = registry.release(&dest_lock_id) {
467 let _ = emit_released(&channel, &lock, release_reason.clone());
468 }
469 if let Ok(lock) = registry.release(&src_lock_id) {
470 let _ = emit_released(&channel, &lock, release_reason);
471 }
472
473 let copy_outcome = result?;
474
475 let cache_arc: CacheHandle = (*cache).clone();
479 on_object_mutation(
480 &profile_id,
481 &destination.bucket,
482 &dest_prefix,
483 None,
484 &cache_arc,
485 );
486 let _ = emit_objects_updated(&channel, &profile_id, &destination.bucket, &dest_prefix);
487
488 Ok(copy_outcome)
489}
490
491#[tauri::command]
512pub async fn cross_account_confirm(
513 profile_id: ProfileId,
514 source: ObjectRef,
515 destination: ObjectRef,
516 store: State<'_, ProfileStoreHandle>,
517 confirm_cache: State<'_, ConfirmationCacheHandle>,
518) -> Result<String, AppError> {
519 let profile = {
521 let store_guard = store.inner.lock().await;
522 store_guard
523 .get(&profile_id)
524 .ok_or_else(|| AppError::NotFound {
525 resource: format!("profile:{}", profile_id.as_str()),
526 })?
527 };
528
529 if profile.validated_at.is_none() {
530 return Err(AppError::Auth {
531 reason: "profile_not_validated_in_session".to_string(),
532 });
533 }
534
535 let scope = ConfirmScope {
536 profile: profile_id.as_str().to_string(),
537 source_bucket: source.bucket.as_str().to_string(),
538 source_key: source.key.clone(),
539 dest_bucket: destination.bucket.as_str().to_string(),
540 dest_key: destination.key.clone(),
541 };
542
543 let token = confirm_cache.inner.mint(scope);
544 Ok(token)
545}
546
547#[tauri::command]
555pub async fn object_move(
556 profile_id: ProfileId,
557 source: ObjectRef,
558 destination: ObjectRef,
559 options: CopyOptions,
560 store: State<'_, ProfileStoreHandle>,
561 pool: State<'_, S3ClientPoolHandle>,
562 locks: State<'_, LockRegistryHandle>,
563 cache: State<'_, CacheHandle>,
564 channel: AppHandle,
565) -> Result<MoveResult, AppError> {
566 let profile = {
570 let store_guard = store.inner.lock().await;
571 store_guard
572 .get(&profile_id)
573 .ok_or_else(|| AppError::NotFound {
574 resource: format!("profile:{}", profile_id.as_str()),
575 })?
576 };
577
578 if profile.validated_at.is_none() {
579 return Err(AppError::Auth {
580 reason: "profile_not_validated_in_session".to_string(),
581 });
582 }
583
584 let default_region = profile
585 .default_region
586 .clone()
587 .unwrap_or_else(|| "us-east-1".to_string());
588
589 let src_prefix = parent_prefix(&source.key);
593 let dest_prefix = parent_prefix(&destination.key);
594
595 let now = std::time::SystemTime::now()
596 .duration_since(std::time::UNIX_EPOCH)
597 .unwrap_or_default()
598 .as_secs() as i64;
599
600 let src_scope = LockScope {
601 profile: profile_id.clone(),
602 bucket: Some(source.bucket.clone()),
603 prefix: Some(src_prefix.clone()),
604 key: None,
605 };
606 let dest_scope = LockScope {
607 profile: profile_id.clone(),
608 bucket: Some(destination.bucket.clone()),
609 prefix: Some(dest_prefix.clone()),
610 key: None,
611 };
612
613 let registry = locks.inner();
614 let src_lock_id = registry.acquire(src_scope, "object_move:source", 300, now)?;
615 let src_lock = registry
616 .list(None)
617 .into_iter()
618 .find(|l| l.id == src_lock_id)
619 .expect("lock must exist right after acquire");
620 let _ = emit_acquired(&channel, &src_lock);
621
622 let dest_lock_id = match registry.acquire(dest_scope, "object_move:dest", 300, now) {
623 Ok(id) => id,
624 Err(e) => {
625 if let Ok(lock) = registry.release(&src_lock_id) {
626 let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
627 }
628 return Err(e);
629 }
630 };
631 let dest_lock = registry
632 .list(None)
633 .into_iter()
634 .find(|l| l.id == dest_lock_id)
635 .expect("lock must exist right after acquire");
636 let _ = emit_acquired(&channel, &dest_lock);
637
638 let client = pool
642 .inner
643 .get_or_build(&profile_id, &default_region)
644 .await
645 .ok_or_else(|| AppError::Internal {
646 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
647 })?;
648
649 let result = s3_move_object(
650 &client,
651 source.bucket.as_str(),
652 &source.key,
653 destination.bucket.as_str(),
654 &destination.key,
655 &options,
656 )
657 .await;
658
659 let release_reason = if result.is_ok() {
663 ReleaseReason::Success
664 } else {
665 ReleaseReason::Failure
666 };
667
668 if let Ok(lock) = registry.release(&dest_lock_id) {
669 let _ = emit_released(&channel, &lock, release_reason.clone());
670 }
671 if let Ok(lock) = registry.release(&src_lock_id) {
672 let _ = emit_released(&channel, &lock, release_reason);
673 }
674
675 let move_result = result?;
676
677 let cache_arc: CacheHandle = (*cache).clone();
681 on_object_mutation(&profile_id, &source.bucket, &src_prefix, None, &cache_arc);
682 on_object_mutation(
683 &profile_id,
684 &destination.bucket,
685 &dest_prefix,
686 None,
687 &cache_arc,
688 );
689 let _ = emit_objects_updated(&channel, &profile_id, &source.bucket, &src_prefix);
690 let _ = emit_objects_updated(&channel, &profile_id, &destination.bucket, &dest_prefix);
691
692 Ok(move_result)
693}
694
695#[tauri::command]
704pub async fn object_create_folder(
705 profile_id: ProfileId,
706 bucket: BucketId,
707 prefix: String,
708 store: State<'_, ProfileStoreHandle>,
709 pool: State<'_, S3ClientPoolHandle>,
710 locks: State<'_, LockRegistryHandle>,
711 cache: State<'_, CacheHandle>,
712 channel: AppHandle,
713) -> Result<(), AppError> {
714 let profile = {
718 let store_guard = store.inner.lock().await;
719 store_guard
720 .get(&profile_id)
721 .ok_or_else(|| AppError::NotFound {
722 resource: format!("profile:{}", profile_id.as_str()),
723 })?
724 };
725
726 if profile.validated_at.is_none() {
727 return Err(AppError::Auth {
728 reason: "profile_not_validated_in_session".to_string(),
729 });
730 }
731
732 let default_region = profile
733 .default_region
734 .clone()
735 .unwrap_or_else(|| "us-east-1".to_string());
736
737 let now = std::time::SystemTime::now()
741 .duration_since(std::time::UNIX_EPOCH)
742 .unwrap_or_default()
743 .as_secs() as i64;
744
745 let scope = LockScope {
746 profile: profile_id.clone(),
747 bucket: Some(bucket.clone()),
748 prefix: Some(prefix.clone()),
749 key: None,
750 };
751
752 let registry = locks.inner();
753 let lock_id = registry.acquire(scope, "object_create_folder", 300, now)?;
754 let lock = registry
755 .list(None)
756 .into_iter()
757 .find(|l| l.id == lock_id)
758 .expect("lock must exist right after acquire");
759 let _ = emit_acquired(&channel, &lock);
760
761 let client = pool
765 .inner
766 .get_or_build(&profile_id, &default_region)
767 .await
768 .ok_or_else(|| AppError::Internal {
769 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
770 })?;
771
772 let result = s3_create_folder(&client, bucket.as_str(), &prefix).await;
773
774 let release_reason = if result.is_ok() {
778 ReleaseReason::Success
779 } else {
780 ReleaseReason::Failure
781 };
782 if let Ok(lock) = registry.release(&lock_id) {
783 let _ = emit_released(&channel, &lock, release_reason);
784 }
785
786 result?;
787
788 let parent = parent_prefix(&prefix);
792 let cache_arc: CacheHandle = (*cache).clone();
793 on_object_mutation(&profile_id, &bucket, &parent, None, &cache_arc);
794 let _ = emit_objects_updated(&channel, &profile_id, &bucket, &parent);
795
796 Ok(())
797}
798
799#[derive(Debug, Clone, Serialize, Deserialize)]
808#[serde(rename_all = "camelCase")]
809pub struct DeleteKey {
810 pub key: String,
811 #[serde(skip_serializing_if = "Option::is_none")]
812 pub version_id: Option<String>,
813}
814
815#[tauri::command]
826pub async fn object_delete_batch(
827 profile_id: ProfileId,
828 bucket: BucketId,
829 keys: Vec<DeleteKey>,
830 store: State<'_, ProfileStoreHandle>,
831 pool: State<'_, S3ClientPoolHandle>,
832 locks: State<'_, LockRegistryHandle>,
833 cache: State<'_, CacheHandle>,
834 channel: AppHandle,
835) -> Result<DeleteReport, AppError> {
836 let profile = {
840 let store_guard = store.inner.lock().await;
841 store_guard
842 .get(&profile_id)
843 .ok_or_else(|| AppError::NotFound {
844 resource: format!("profile:{}", profile_id.as_str()),
845 })?
846 };
847
848 if profile.validated_at.is_none() {
849 return Err(AppError::Auth {
850 reason: "profile_not_validated_in_session".to_string(),
851 });
852 }
853
854 let default_region = profile
855 .default_region
856 .clone()
857 .unwrap_or_else(|| "us-east-1".to_string());
858
859 let unique_prefixes: Vec<String> = keys
866 .iter()
867 .map(|dk| parent_prefix(&dk.key))
868 .collect::<std::collections::BTreeSet<_>>()
869 .into_iter()
870 .collect();
871
872 let now = std::time::SystemTime::now()
873 .duration_since(std::time::UNIX_EPOCH)
874 .unwrap_or_default()
875 .as_secs() as i64;
876
877 let registry = locks.inner();
878 let mut acquired_lock_ids: Vec<crate::locks::LockId> =
879 Vec::with_capacity(unique_prefixes.len());
880
881 for prefix in &unique_prefixes {
882 let scope = LockScope {
883 profile: profile_id.clone(),
884 bucket: Some(bucket.clone()),
885 prefix: Some(prefix.clone()),
886 key: None,
887 };
888 let lock_id = match registry.acquire(scope, "object_delete_batch", 300, now) {
889 Ok(id) => id,
890 Err(e) => {
891 for held_id in &acquired_lock_ids {
893 if let Ok(lock) = registry.release(held_id) {
894 let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
895 }
896 }
897 return Err(e);
898 }
899 };
900 let lock = registry
901 .list(None)
902 .into_iter()
903 .find(|l| l.id == lock_id)
904 .expect("lock must exist right after acquire");
905 let _ = emit_acquired(&channel, &lock);
906 acquired_lock_ids.push(lock_id);
907 }
908
909 let client = pool
913 .inner
914 .get_or_build(&profile_id, &default_region)
915 .await
916 .ok_or_else(|| AppError::Internal {
917 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
918 })?;
919
920 let key_pairs: Vec<(ObjectKey, Option<String>)> = keys
922 .iter()
923 .map(|dk| (ObjectKey::new(dk.key.clone()), dk.version_id.clone()))
924 .collect();
925
926 let result = s3_delete_objects_batch(&client, bucket.as_str(), key_pairs).await;
927
928 let release_reason = if result.is_ok() {
932 ReleaseReason::Success
933 } else {
934 ReleaseReason::Failure
935 };
936 for held_id in &acquired_lock_ids {
937 if let Ok(lock) = registry.release(held_id) {
938 let _ = emit_released(&channel, &lock, release_reason.clone());
939 }
940 }
941
942 let report = result?;
943
944 let affected_prefixes: std::collections::BTreeSet<String> = report
953 .deleted
954 .iter()
955 .map(|d| parent_prefix(&d.key))
956 .collect();
957
958 let cache_arc: CacheHandle = (*cache).clone();
959 for prefix in &affected_prefixes {
960 on_object_mutation(&profile_id, &bucket, prefix, None, &cache_arc);
961 let _ = emit_objects_updated(&channel, &profile_id, &bucket, prefix);
962 }
963
964 Ok(report)
965}
966
967#[tauri::command]
979pub async fn object_set_metadata(
980 profile_id: ProfileId,
981 bucket: BucketId,
982 key: String,
983 metadata: HashMap<String, String>,
984 if_match_etag: Option<String>,
985 store: State<'_, ProfileStoreHandle>,
986 pool: State<'_, S3ClientPoolHandle>,
987 locks: State<'_, LockRegistryHandle>,
988 cache: State<'_, CacheHandle>,
989 channel: AppHandle,
990) -> Result<PutResult, AppError> {
991 let profile = {
995 let store_guard = store.inner.lock().await;
996 store_guard
997 .get(&profile_id)
998 .ok_or_else(|| AppError::NotFound {
999 resource: format!("profile:{}", profile_id.as_str()),
1000 })?
1001 };
1002
1003 if profile.validated_at.is_none() {
1004 return Err(AppError::Auth {
1005 reason: "profile_not_validated_in_session".to_string(),
1006 });
1007 }
1008
1009 let default_region = profile
1010 .default_region
1011 .clone()
1012 .unwrap_or_else(|| "us-east-1".to_string());
1013
1014 let prefix = parent_prefix(&key);
1018
1019 let now = std::time::SystemTime::now()
1020 .duration_since(std::time::UNIX_EPOCH)
1021 .unwrap_or_default()
1022 .as_secs() as i64;
1023
1024 let scope = LockScope {
1025 profile: profile_id.clone(),
1026 bucket: Some(bucket.clone()),
1027 prefix: Some(prefix.clone()),
1028 key: Some(ObjectKey::new(key.clone())),
1029 };
1030
1031 let registry = locks.inner();
1032 let lock_id = registry.acquire(scope, "object_set_metadata", 300, now)?;
1033 let lock = registry
1034 .list(None)
1035 .into_iter()
1036 .find(|l| l.id == lock_id)
1037 .expect("lock must exist right after acquire");
1038 let _ = emit_acquired(&channel, &lock);
1039
1040 let client = pool
1044 .inner
1045 .get_or_build(&profile_id, &default_region)
1046 .await
1047 .ok_or_else(|| AppError::Internal {
1048 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1049 })?;
1050
1051 let result = s3_set_metadata(&client, bucket.as_str(), &key, metadata, if_match_etag).await;
1052
1053 let release_reason = if result.is_ok() {
1057 ReleaseReason::Success
1058 } else {
1059 ReleaseReason::Failure
1060 };
1061 if let Ok(lock) = registry.release(&lock_id) {
1062 let _ = emit_released(&channel, &lock, release_reason);
1063 }
1064
1065 let put_result = result?;
1066
1067 let cache_arc: CacheHandle = (*cache).clone();
1071 on_object_mutation(&profile_id, &bucket, &prefix, None, &cache_arc);
1072 let _ = emit_objects_updated(&channel, &profile_id, &bucket, &prefix);
1073
1074 Ok(put_result)
1075}
1076
1077#[tauri::command]
1089pub async fn object_set_tags(
1090 profile_id: ProfileId,
1091 bucket: BucketId,
1092 key: String,
1093 tags: HashMap<String, String>,
1094 if_match_etag: Option<String>,
1095 store: State<'_, ProfileStoreHandle>,
1096 pool: State<'_, S3ClientPoolHandle>,
1097 locks: State<'_, LockRegistryHandle>,
1098 cache: State<'_, CacheHandle>,
1099 channel: AppHandle,
1100) -> Result<PutResult, AppError> {
1101 let profile = {
1105 let store_guard = store.inner.lock().await;
1106 store_guard
1107 .get(&profile_id)
1108 .ok_or_else(|| AppError::NotFound {
1109 resource: format!("profile:{}", profile_id.as_str()),
1110 })?
1111 };
1112
1113 if profile.validated_at.is_none() {
1114 return Err(AppError::Auth {
1115 reason: "profile_not_validated_in_session".to_string(),
1116 });
1117 }
1118
1119 let default_region = profile
1120 .default_region
1121 .clone()
1122 .unwrap_or_else(|| "us-east-1".to_string());
1123
1124 let prefix = parent_prefix(&key);
1128
1129 let now = std::time::SystemTime::now()
1130 .duration_since(std::time::UNIX_EPOCH)
1131 .unwrap_or_default()
1132 .as_secs() as i64;
1133
1134 let scope = LockScope {
1135 profile: profile_id.clone(),
1136 bucket: Some(bucket.clone()),
1137 prefix: Some(prefix.clone()),
1138 key: Some(ObjectKey::new(key.clone())),
1139 };
1140
1141 let registry = locks.inner();
1142 let lock_id = registry.acquire(scope, "object_set_tags", 300, now)?;
1143 let lock = registry
1144 .list(None)
1145 .into_iter()
1146 .find(|l| l.id == lock_id)
1147 .expect("lock must exist right after acquire");
1148 let _ = emit_acquired(&channel, &lock);
1149
1150 let client = pool
1154 .inner
1155 .get_or_build(&profile_id, &default_region)
1156 .await
1157 .ok_or_else(|| AppError::Internal {
1158 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1159 })?;
1160
1161 let result = s3_set_tags(&client, bucket.as_str(), &key, tags, if_match_etag).await;
1162
1163 let release_reason = if result.is_ok() {
1167 ReleaseReason::Success
1168 } else {
1169 ReleaseReason::Failure
1170 };
1171 if let Ok(lock) = registry.release(&lock_id) {
1172 let _ = emit_released(&channel, &lock, release_reason);
1173 }
1174
1175 let put_result = result?;
1176
1177 let cache_arc: CacheHandle = (*cache).clone();
1181 on_object_mutation(&profile_id, &bucket, &prefix, None, &cache_arc);
1182 let _ = emit_objects_updated(&channel, &profile_id, &bucket, &prefix);
1183
1184 Ok(put_result)
1185}
1186
1187#[tauri::command]
1209pub async fn object_presign(
1210 profile_id: ProfileId,
1211 bucket: BucketId,
1212 key: String,
1213 expires_sec: Option<u64>,
1214 store: State<'_, ProfileStoreHandle>,
1215 pool: State<'_, S3ClientPoolHandle>,
1216) -> Result<PresignedUrl, AppError> {
1217 const DEFAULT_EXPIRES_SECS: u64 = 3_600;
1221 let expires_secs = expires_sec.unwrap_or(DEFAULT_EXPIRES_SECS);
1222
1223 let profile = {
1227 let store_guard = store.inner.lock().await;
1228 store_guard
1229 .get(&profile_id)
1230 .ok_or_else(|| AppError::NotFound {
1231 resource: format!("profile:{}", profile_id.as_str()),
1232 })?
1233 };
1234
1235 if profile.validated_at.is_none() {
1236 return Err(AppError::Auth {
1237 reason: "profile_not_validated_in_session".to_string(),
1238 });
1239 }
1240
1241 let default_region = profile
1242 .default_region
1243 .clone()
1244 .unwrap_or_else(|| "us-east-1".to_string());
1245
1246 let client = pool
1250 .inner
1251 .get_or_build(&profile_id, &default_region)
1252 .await
1253 .ok_or_else(|| AppError::Internal {
1254 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1255 })?;
1256
1257 presign_get_object(&client, bucket.as_str(), &key, expires_secs).await
1261}
1262
1263#[tauri::command]
1296pub async fn object_set_storage_class(
1297 profile_id: ProfileId,
1298 targets: Vec<ObjectRef>,
1299 new_storage_class: String,
1300 confirmed_diff_id: crate::diff::DiffId,
1301 store: State<'_, ProfileStoreHandle>,
1302 pool: State<'_, S3ClientPoolHandle>,
1303 locks: State<'_, LockRegistryHandle>,
1304 cache: State<'_, CacheHandle>,
1305 channel: AppHandle,
1306 diff_store: State<'_, crate::diff::DiffStoreHandle>,
1307) -> Result<Vec<crate::s3::metadata::PutResult>, AppError> {
1308 let consumed_payload = diff_store
1312 .inner
1313 .consume(&confirmed_diff_id)
1314 .ok_or_else(|| {
1315 let record = diff_store.inner.get(&confirmed_diff_id);
1316 match record {
1317 Some(r) => match r.status {
1318 crate::diff::DiffStatus::Cancelled | crate::diff::DiffStatus::Expired => {
1319 AppError::Validation {
1320 field: "confirmed_diff_id".to_string(),
1321 hint: "Diff was cancelled or expired".to_string(),
1322 }
1323 }
1324 crate::diff::DiffStatus::Confirmed => AppError::Validation {
1325 field: "confirmed_diff_id".to_string(),
1326 hint: "Diff already consumed (double-confirm rejection)".to_string(),
1327 },
1328 crate::diff::DiffStatus::Pending => AppError::Internal {
1329 trace_id: "diff_consume_failed_unexpectedly".to_string(),
1330 },
1331 },
1332 None => AppError::Validation {
1333 field: "confirmed_diff_id".to_string(),
1334 hint: "Diff not found".to_string(),
1335 },
1336 }
1337 })?;
1338
1339 let crate::diff::DiffPayload::StorageClass {
1345 targets: diff_targets,
1346 new_class: diff_new_class,
1347 ..
1348 } = &consumed_payload;
1349 if *diff_new_class != new_storage_class {
1351 return Err(AppError::Validation {
1352 field: "new_storage_class".to_string(),
1353 hint: format!(
1354 "Requested class '{}' does not match diff's class '{}'",
1355 new_storage_class, diff_new_class
1356 ),
1357 });
1358 }
1359 if diff_targets.len() != targets.len()
1361 || !targets.iter().all(|t| {
1362 diff_targets
1363 .iter()
1364 .any(|dt| dt.bucket.as_str() == t.bucket.as_str() && dt.key == t.key)
1365 })
1366 {
1367 return Err(AppError::Validation {
1368 field: "targets".to_string(),
1369 hint: "Requested targets do not match the diff's targets".to_string(),
1370 });
1371 }
1372
1373 let profile = {
1377 let store_guard = store.inner.lock().await;
1378 store_guard
1379 .get(&profile_id)
1380 .ok_or_else(|| AppError::NotFound {
1381 resource: format!("profile:{}", profile_id.as_str()),
1382 })?
1383 };
1384
1385 if profile.validated_at.is_none() {
1386 return Err(AppError::Auth {
1387 reason: "profile_not_validated_in_session".to_string(),
1388 });
1389 }
1390
1391 let default_region = profile
1392 .default_region
1393 .clone()
1394 .unwrap_or_else(|| "us-east-1".to_string());
1395
1396 let client = pool
1397 .inner
1398 .get_or_build(&profile_id, &default_region)
1399 .await
1400 .ok_or_else(|| AppError::Internal {
1401 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1402 })?;
1403
1404 let now = std::time::SystemTime::now()
1408 .duration_since(std::time::UNIX_EPOCH)
1409 .unwrap_or_default()
1410 .as_secs() as i64;
1411
1412 let registry = locks.inner();
1413 let cache_arc: CacheHandle = (*cache).clone();
1414 let mut results: Vec<crate::s3::metadata::PutResult> = Vec::with_capacity(targets.len());
1415
1416 for target in &targets {
1417 let prefix = parent_prefix(&target.key);
1418
1419 let scope = LockScope {
1420 profile: profile_id.clone(),
1421 bucket: Some(target.bucket.clone()),
1422 prefix: Some(prefix.clone()),
1423 key: Some(ObjectKey::new(target.key.clone())),
1424 };
1425
1426 let lock_id = registry.acquire(scope, "object_set_storage_class", 300, now)?;
1427 let lock = registry
1428 .list(None)
1429 .into_iter()
1430 .find(|l| l.id == lock_id)
1431 .expect("lock must exist right after acquire");
1432 let _ = emit_acquired(&channel, &lock);
1433
1434 let result = crate::s3::object::set_object_storage_class(
1435 &client,
1436 target.bucket.as_str(),
1437 &target.key,
1438 new_storage_class.clone(),
1439 )
1440 .await;
1441
1442 let release_reason = if result.is_ok() {
1443 ReleaseReason::Success
1444 } else {
1445 ReleaseReason::Failure
1446 };
1447 if let Ok(lock) = registry.release(&lock_id) {
1448 let _ = emit_released(&channel, &lock, release_reason);
1449 }
1450
1451 let put_result = result?;
1452
1453 on_object_mutation(&profile_id, &target.bucket, &prefix, None, &cache_arc);
1455 let _ = emit_objects_updated(&channel, &profile_id, &target.bucket, &prefix);
1456
1457 results.push(put_result);
1458 }
1459
1460 Ok(results)
1461}
1462
1463#[tauri::command]
1485pub async fn object_get_text(
1486 profile_id: ProfileId,
1487 bucket: String,
1488 key: String,
1489 max_bytes: Option<u64>,
1490 store: State<'_, ProfileStoreHandle>,
1491 pool: State<'_, S3ClientPoolHandle>,
1492) -> Result<TextPayload, AppError> {
1493 let (validated, region_override, default_region) = {
1497 let store_guard = store.inner.lock().await;
1498 let profile = store_guard
1499 .get(&profile_id)
1500 .ok_or_else(|| AppError::NotFound {
1501 resource: format!("profile:{}", profile_id.as_str()),
1502 })?;
1503 (
1504 profile.validated_at.is_some(),
1505 profile.compat_flags.region_override.clone(),
1506 profile.default_region.clone(),
1507 )
1508 };
1509
1510 if !validated {
1511 return Err(AppError::Auth {
1512 reason: "profile_not_validated_in_session".to_string(),
1513 });
1514 }
1515
1516 let region = region_override
1517 .or(default_region)
1518 .unwrap_or_else(|| "us-east-1".to_string());
1519
1520 let client = pool
1524 .inner
1525 .get_or_build(&profile_id, ®ion)
1526 .await
1527 .ok_or_else(|| AppError::Internal {
1528 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1529 })?;
1530
1531 let limit = max_bytes.unwrap_or(DEFAULT_TEXT_MAX_BYTES);
1535 s3_get_object_text(&client, &bucket, &key, limit).await
1536}
1537
1538#[tauri::command]
1556pub async fn object_get_bytes(
1557 profile_id: ProfileId,
1558 bucket: String,
1559 key: String,
1560 max_bytes: Option<u64>,
1561 store: State<'_, ProfileStoreHandle>,
1562 pool: State<'_, S3ClientPoolHandle>,
1563) -> Result<BytesPayload, AppError> {
1564 let (validated, region_override, default_region) = {
1568 let store_guard = store.inner.lock().await;
1569 let profile = store_guard
1570 .get(&profile_id)
1571 .ok_or_else(|| AppError::NotFound {
1572 resource: format!("profile:{}", profile_id.as_str()),
1573 })?;
1574 (
1575 profile.validated_at.is_some(),
1576 profile.compat_flags.region_override.clone(),
1577 profile.default_region.clone(),
1578 )
1579 };
1580
1581 if !validated {
1582 return Err(AppError::Auth {
1583 reason: "profile_not_validated_in_session".to_string(),
1584 });
1585 }
1586
1587 let region = region_override
1588 .or(default_region)
1589 .unwrap_or_else(|| "us-east-1".to_string());
1590
1591 let client = pool
1595 .inner
1596 .get_or_build(&profile_id, ®ion)
1597 .await
1598 .ok_or_else(|| AppError::Internal {
1599 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1600 })?;
1601
1602 let limit = max_bytes.unwrap_or(DEFAULT_BYTES_MAX_BYTES);
1606 s3_get_object_bytes(&client, &bucket, &key, limit).await
1607}
1608
1609#[tauri::command]
1631pub async fn object_put_text(
1632 profile_id: ProfileId,
1633 bucket: String,
1634 key: String,
1635 body: String,
1636 if_match_etag: Option<String>,
1637 store: State<'_, ProfileStoreHandle>,
1638 pool: State<'_, S3ClientPoolHandle>,
1639 locks: State<'_, LockRegistryHandle>,
1640 cache: State<'_, CacheHandle>,
1641 channel: AppHandle,
1642) -> Result<PutResult, AppError> {
1643 let (validated, region_override, default_region) = {
1647 let store_guard = store.inner.lock().await;
1648 let profile = store_guard
1649 .get(&profile_id)
1650 .ok_or_else(|| AppError::NotFound {
1651 resource: format!("profile:{}", profile_id.as_str()),
1652 })?;
1653 (
1654 profile.validated_at.is_some(),
1655 profile.compat_flags.region_override.clone(),
1656 profile.default_region.clone(),
1657 )
1658 };
1659
1660 if !validated {
1661 return Err(AppError::Auth {
1662 reason: "profile_not_validated_in_session".to_string(),
1663 });
1664 }
1665
1666 let region = region_override
1667 .or(default_region)
1668 .unwrap_or_else(|| "us-east-1".to_string());
1669
1670 let prefix = parent_prefix(&key);
1674
1675 let now = std::time::SystemTime::now()
1676 .duration_since(std::time::UNIX_EPOCH)
1677 .unwrap_or_default()
1678 .as_secs() as i64;
1679
1680 let bucket_id = BucketId::new(bucket.clone());
1681
1682 let scope = LockScope {
1683 profile: profile_id.clone(),
1684 bucket: Some(bucket_id.clone()),
1685 prefix: Some(prefix.clone()),
1686 key: Some(ObjectKey::new(key.clone())),
1687 };
1688
1689 let registry = locks.inner();
1690 let lock_id = registry.acquire(scope, "object_put_text", 300, now)?;
1691 let lock = registry
1692 .list(None)
1693 .into_iter()
1694 .find(|l| l.id == lock_id)
1695 .expect("lock must exist right after acquire");
1696 let _ = emit_acquired(&channel, &lock);
1697
1698 let client = pool
1702 .inner
1703 .get_or_build(&profile_id, ®ion)
1704 .await
1705 .ok_or_else(|| AppError::Internal {
1706 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1707 })?;
1708
1709 let result = s3_put_object_text(&client, &bucket, &key, body, if_match_etag).await;
1710
1711 let release_reason = if result.is_ok() {
1715 ReleaseReason::Success
1716 } else {
1717 ReleaseReason::Failure
1718 };
1719 if let Ok(lock) = registry.release(&lock_id) {
1720 let _ = emit_released(&channel, &lock, release_reason);
1721 }
1722
1723 let put_result = result?;
1724
1725 let cache_arc: CacheHandle = (*cache).clone();
1729 on_object_mutation(&profile_id, &bucket_id, &prefix, None, &cache_arc);
1730 let _ = emit_objects_updated(&channel, &profile_id, &bucket_id, &prefix);
1731
1732 Ok(put_result)
1733}
1734
1735#[cfg(test)]
1740mod tests {
1741 use super::*;
1742 use crate::{
1743 cache::{store::CacheStore, CacheConfig},
1744 ids::BucketId,
1745 s3::list::{ListPage, ObjectEntry},
1746 };
1747
1748 fn validated_at() -> Option<i64> {
1749 Some(1_700_000_000_000)
1750 }
1751
1752 fn make_page(prefix: &str, flat: bool) -> ListPage {
1753 ListPage {
1754 entries: vec![ObjectEntry {
1755 key: format!("{prefix}file.txt"),
1756 size: 42,
1757 last_modified: None,
1758 etag: None,
1759 storage_class: None,
1760 is_prefix: false,
1761 }],
1762 common_prefixes: if flat {
1763 vec![]
1764 } else {
1765 vec![format!("{prefix}subdir/")]
1766 },
1767 next_continuation_token: None,
1768 is_truncated: false,
1769 prefix: prefix.to_string(),
1770 delimiter: if flat { None } else { Some("/".to_string()) },
1771 }
1772 }
1773
1774 #[test]
1779 fn unvalidated_profile_gate_returns_auth_error() {
1780 let validated_at: Option<i64> = None;
1781 let result: Result<(), AppError> = if validated_at.is_none() {
1782 Err(AppError::Auth {
1783 reason: "profile_not_validated_in_session".to_string(),
1784 })
1785 } else {
1786 Ok(())
1787 };
1788
1789 match result {
1790 Err(AppError::Auth { reason }) => {
1791 assert_eq!(reason, "profile_not_validated_in_session");
1792 }
1793 _ => panic!("expected Auth error for unvalidated profile"),
1794 }
1795 }
1796
1797 #[test]
1802 fn cache_hit_on_first_page_returns_cached_value() {
1803 let pid = ProfileId::new("p-objects");
1804 let bid = BucketId::new("my-bucket");
1805 let prefix = "photos/".to_string();
1806 let cache = CacheStore::in_memory(CacheConfig::default());
1807
1808 let expected = make_page(&prefix, false);
1809 let cache_key = CacheKey::Objects {
1810 profile: pid.clone(),
1811 bucket: bid.clone(),
1812 prefix: prefix.clone(),
1813 };
1814 cache
1815 .put(&cache_key, &expected, None)
1816 .expect("put must succeed");
1817
1818 let read = cache
1819 .get::<ListPage>(&cache_key, validated_at())
1820 .expect("get must not error")
1821 .expect("entry must exist");
1822
1823 assert_eq!(read.freshness, Freshness::Fresh);
1824 assert_eq!(read.value.prefix, prefix);
1825 assert_eq!(read.value.entries.len(), 1);
1826 }
1827
1828 #[test]
1833 fn cache_miss_on_continuation_token_bypasses_cache() {
1834 let pid = ProfileId::new("p-paged");
1841 let bid = BucketId::new("paged-bucket");
1842 let prefix = "data/".to_string();
1843 let cache = CacheStore::in_memory(CacheConfig::default());
1844
1845 let page = make_page(&prefix, false);
1846 let cache_key = CacheKey::Objects {
1847 profile: pid.clone(),
1848 bucket: bid.clone(),
1849 prefix: prefix.clone(),
1850 };
1851 cache.put(&cache_key, &page, None).unwrap();
1852
1853 let has_token = true; let is_first_page = !has_token;
1858
1859 let result = if is_first_page {
1861 cache.get::<ListPage>(&cache_key, validated_at()).unwrap()
1862 } else {
1863 None };
1865
1866 assert!(
1867 result.is_none(),
1868 "cache must be bypassed when continuation_token is present"
1869 );
1870 }
1871
1872 #[test]
1877 fn flat_and_hierarchical_cache_keys_do_not_collide() {
1878 let pid = ProfileId::new("p-flat");
1879 let bid = BucketId::new("flat-bucket");
1880 let prefix = "logs/".to_string();
1881 let flat_prefix = format!("{}__FLAT__", prefix);
1882
1883 let key_hier = CacheKey::Objects {
1884 profile: pid.clone(),
1885 bucket: bid.clone(),
1886 prefix: prefix.clone(),
1887 };
1888 let key_flat = CacheKey::Objects {
1889 profile: pid.clone(),
1890 bucket: bid.clone(),
1891 prefix: flat_prefix,
1892 };
1893
1894 assert_ne!(
1895 key_hier.serialize_key(),
1896 key_flat.serialize_key(),
1897 "flat and hierarchical cache keys must differ"
1898 );
1899 }
1900
1901 #[test]
1906 fn unvalidated_profile_cannot_read_objects_from_cache() {
1907 let pid = ProfileId::new("p-unval-objects");
1908 let bid = BucketId::new("secret-bucket");
1909 let cache = CacheStore::in_memory(CacheConfig::default());
1910
1911 let page = make_page("secret/", false);
1912 let cache_key = CacheKey::Objects {
1913 profile: pid.clone(),
1914 bucket: bid.clone(),
1915 prefix: "secret/".to_string(),
1916 };
1917 cache.put(&cache_key, &page, None).unwrap();
1918
1919 let result = cache.get::<ListPage>(&cache_key, None).unwrap();
1921 assert!(
1922 result.is_none(),
1923 "unvalidated profile must not read objects from cache"
1924 );
1925 }
1926
1927 #[test]
1932 fn object_ref_serialises_camel_case() {
1933 let r = ObjectRef {
1934 bucket: BucketId::new("my-bucket"),
1935 key: "folder/file.txt".to_string(),
1936 };
1937 let v = serde_json::to_value(&r).unwrap();
1938 assert_eq!(v["bucket"], "my-bucket");
1939 assert_eq!(v["key"], "folder/file.txt");
1940 }
1941
1942 #[test]
1948 fn emit_objects_updated_emits_correct_payload() {
1949 use crate::events::{EventKind, MockChannel};
1950
1951 let channel = MockChannel::default();
1952 let pid = ProfileId::new("p1");
1953 let bid = BucketId::new("my-bucket");
1954
1955 emit_objects_updated(&channel, &pid, &bid, "photos/2024/").unwrap();
1956
1957 let emitted = channel.emitted();
1958 assert_eq!(emitted.len(), 1);
1959 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1960 assert_eq!(emitted[0].1["profileId"], "p1");
1961 assert_eq!(emitted[0].1["bucket"], "my-bucket");
1962 assert_eq!(emitted[0].1["prefix"], "photos/2024/");
1963 }
1964
1965 #[test]
1966 fn emit_objects_updated_root_prefix() {
1967 use crate::events::{EventKind, MockChannel};
1968
1969 let channel = MockChannel::default();
1970 let pid = ProfileId::new("p2");
1971 let bid = BucketId::new("root-bucket");
1972
1973 emit_objects_updated(&channel, &pid, &bid, "").unwrap();
1974
1975 let emitted = channel.emitted();
1976 assert_eq!(emitted.len(), 1);
1977 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1978 assert_eq!(emitted[0].1["prefix"], "");
1979 }
1980
1981 #[test]
1982 fn emit_objects_updated_two_different_prefixes() {
1983 use crate::events::{EventKind, MockChannel};
1984
1985 let channel = MockChannel::default();
1986 let pid = ProfileId::new("p3");
1987 let src = BucketId::new("src-bucket");
1988 let dst = BucketId::new("dst-bucket");
1989
1990 emit_objects_updated(&channel, &pid, &src, "old/path/").unwrap();
1992 emit_objects_updated(&channel, &pid, &dst, "new/path/").unwrap();
1993
1994 let emitted = channel.emitted();
1995 assert_eq!(emitted.len(), 2);
1996 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1997 assert_eq!(emitted[0].1["bucket"], "src-bucket");
1998 assert_eq!(emitted[0].1["prefix"], "old/path/");
1999 assert_eq!(emitted[1].1["bucket"], "dst-bucket");
2000 assert_eq!(emitted[1].1["prefix"], "new/path/");
2001 }
2002
2003 #[test]
2008 fn objects_updated_payload_serialises_camel_case() {
2009 let payload = ObjectsUpdatedPayload {
2010 profile_id: ProfileId::new("prof"),
2011 bucket: BucketId::new("bkt"),
2012 prefix: "a/b/".to_string(),
2013 };
2014 let v = serde_json::to_value(&payload).unwrap();
2015 assert_eq!(v["profileId"], "prof");
2016 assert_eq!(v["bucket"], "bkt");
2017 assert_eq!(v["prefix"], "a/b/");
2018 }
2019
2020 #[test]
2025 fn delete_key_with_version_id_serialises_camel_case() {
2026 let dk = DeleteKey {
2027 key: "photos/img.jpg".to_string(),
2028 version_id: Some("vid-abc".to_string()),
2029 };
2030 let v = serde_json::to_value(&dk).unwrap();
2031 assert_eq!(v["key"], "photos/img.jpg");
2032 assert_eq!(v["versionId"], "vid-abc");
2033 }
2034
2035 #[test]
2036 fn delete_key_without_version_id_skips_field() {
2037 let dk = DeleteKey {
2038 key: "file.txt".to_string(),
2039 version_id: None,
2040 };
2041 let v = serde_json::to_value(&dk).unwrap();
2042 assert_eq!(v["key"], "file.txt");
2043 assert!(!v.as_object().unwrap().contains_key("versionId"));
2044 }
2045
2046 #[test]
2047 fn delete_key_deserialises_from_camel_case() {
2048 let json = r#"{"key":"a/b.txt","versionId":"v123"}"#;
2049 let dk: DeleteKey = serde_json::from_str(json).unwrap();
2050 assert_eq!(dk.key, "a/b.txt");
2051 assert_eq!(dk.version_id.as_deref(), Some("v123"));
2052 }
2053
2054 #[test]
2055 fn delete_key_deserialises_without_version_id() {
2056 let json = r#"{"key":"a/b.txt"}"#;
2057 let dk: DeleteKey = serde_json::from_str(json).unwrap();
2058 assert_eq!(dk.key, "a/b.txt");
2059 assert!(dk.version_id.is_none());
2060 }
2061
2062 #[test]
2067 fn emit_objects_updated_multi_prefix_fires_once_per_prefix() {
2068 use crate::events::{EventKind, MockChannel};
2069 use std::collections::BTreeSet;
2070
2071 let channel = MockChannel::default();
2072 let pid = ProfileId::new("p-delete-batch");
2073 let bid = BucketId::new("my-bucket");
2074
2075 let deleted_keys = [
2078 "photos/img1.jpg",
2079 "photos/img2.jpg",
2080 "docs/report.pdf",
2081 "docs/slides.pptx",
2082 ];
2083
2084 let affected_prefixes: BTreeSet<String> = deleted_keys
2085 .iter()
2086 .map(|k| crate::s3::object::parent_prefix(k))
2087 .collect();
2088
2089 for prefix in &affected_prefixes {
2090 emit_objects_updated(&channel, &pid, &bid, prefix).unwrap();
2091 }
2092
2093 let emitted = channel.emitted();
2094 assert_eq!(
2096 emitted.len(),
2097 2,
2098 "must emit once per unique affected prefix"
2099 );
2100
2101 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2103 assert_eq!(emitted[0].1["prefix"], "docs/");
2104 assert_eq!(emitted[1].1["prefix"], "photos/");
2105 }
2106
2107 #[test]
2108 fn emit_objects_updated_root_prefix_only_once_for_root_keys() {
2109 use crate::events::{EventKind, MockChannel};
2110 use std::collections::BTreeSet;
2111
2112 let channel = MockChannel::default();
2113 let pid = ProfileId::new("p-delete-root");
2114 let bid = BucketId::new("root-bucket");
2115
2116 let deleted_keys = ["file1.txt", "file2.txt"];
2118 let affected_prefixes: BTreeSet<String> = deleted_keys
2119 .iter()
2120 .map(|k| crate::s3::object::parent_prefix(k))
2121 .collect();
2122
2123 for prefix in &affected_prefixes {
2124 emit_objects_updated(&channel, &pid, &bid, prefix).unwrap();
2125 }
2126
2127 let emitted = channel.emitted();
2128 assert_eq!(
2129 emitted.len(),
2130 1,
2131 "two root-level keys share the same prefix"
2132 );
2133 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2134 assert_eq!(emitted[0].1["prefix"], "");
2135 }
2136
2137 #[test]
2142 fn put_result_serialises_camel_case() {
2143 let r = crate::s3::metadata::PutResult {
2144 etag: Some("abc123".to_string()),
2145 last_modified: Some(1_700_000_000_000),
2146 version_id: Some("v1".to_string()),
2147 };
2148 let v = serde_json::to_value(&r).unwrap();
2149 assert_eq!(v["etag"], "abc123");
2150 assert_eq!(v["lastModified"], 1_700_000_000_000_i64);
2151 assert_eq!(v["versionId"], "v1");
2152 }
2153
2154 #[test]
2155 fn put_result_skips_none_fields() {
2156 let r = crate::s3::metadata::PutResult {
2157 etag: None,
2158 last_modified: None,
2159 version_id: None,
2160 };
2161 let v = serde_json::to_value(&r).unwrap();
2162 assert!(!v.as_object().unwrap().contains_key("etag"));
2163 assert!(!v.as_object().unwrap().contains_key("lastModified"));
2164 assert!(!v.as_object().unwrap().contains_key("versionId"));
2165 }
2166
2167 #[test]
2172 fn set_metadata_emit_objects_updated_correct_prefix() {
2173 use crate::events::{EventKind, MockChannel};
2174
2175 let channel = MockChannel::default();
2176 let pid = ProfileId::new("p-meta");
2177 let bid = BucketId::new("my-bucket");
2178
2179 let key = "reports/2024/annual.pdf";
2181 let prefix = crate::s3::object::parent_prefix(key);
2182
2183 emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2184
2185 let emitted = channel.emitted();
2186 assert_eq!(emitted.len(), 1);
2187 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2188 assert_eq!(emitted[0].1["prefix"], "reports/2024/");
2189 }
2190
2191 #[test]
2196 fn set_tags_emit_objects_updated_correct_prefix() {
2197 use crate::events::{EventKind, MockChannel};
2198
2199 let channel = MockChannel::default();
2200 let pid = ProfileId::new("p-tags");
2201 let bid = BucketId::new("tagged-bucket");
2202
2203 let key = "logs/app/server.log";
2205 let prefix = crate::s3::object::parent_prefix(key);
2206
2207 emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2208
2209 let emitted = channel.emitted();
2210 assert_eq!(emitted.len(), 1);
2211 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2212 assert_eq!(emitted[0].1["prefix"], "logs/app/");
2213 }
2214
2215 #[test]
2220 fn conflict_error_serialises_expected_etag() {
2221 let err = AppError::Conflict {
2222 etag_expected: "\"etag-original\"".to_string(),
2223 etag_actual: Some("\"etag-modified\"".to_string()),
2224 };
2225 let v = serde_json::to_value(&err).unwrap();
2226 assert_eq!(v["kind"], "Conflict");
2227 assert_eq!(v["details"]["etagExpected"], "\"etag-original\"");
2228 assert_eq!(v["details"]["etagActual"], "\"etag-modified\"");
2229 }
2230
2231 #[test]
2232 fn conflict_error_without_actual_etag() {
2233 let err = AppError::Conflict {
2234 etag_expected: "\"abc\"".to_string(),
2235 etag_actual: None,
2236 };
2237 let v = serde_json::to_value(&err).unwrap();
2238 assert_eq!(v["kind"], "Conflict");
2239 assert!(v["details"]["etagActual"].is_null());
2240 }
2241
2242 #[test]
2247 fn presign_expires_below_minimum_returns_validation_error() {
2248 let result = crate::s3::presign::presign_get_object_validate_only(1);
2250 match result {
2251 Err(AppError::Validation { field, hint }) => {
2252 assert_eq!(field, "expires_secs");
2253 assert!(
2254 hint.contains("60"),
2255 "hint should mention the 60-second minimum: {hint}"
2256 );
2257 }
2258 other => panic!("expected Validation error, got {:?}", other),
2259 }
2260 }
2261
2262 #[test]
2263 fn presign_expires_at_minimum_is_ok() {
2264 assert!(crate::s3::presign::presign_get_object_validate_only(
2265 crate::s3::presign::MIN_EXPIRES_SECS
2266 )
2267 .is_ok());
2268 }
2269
2270 #[test]
2271 fn presign_expires_one_hour_is_ok() {
2272 assert!(crate::s3::presign::presign_get_object_validate_only(3_600).is_ok());
2273 }
2274
2275 #[test]
2276 fn presign_expires_at_maximum_is_ok() {
2277 assert!(crate::s3::presign::presign_get_object_validate_only(
2278 crate::s3::presign::MAX_EXPIRES_SECS
2279 )
2280 .is_ok());
2281 }
2282
2283 #[test]
2284 fn presign_expires_above_maximum_returns_validation_error() {
2285 let result = crate::s3::presign::presign_get_object_validate_only(
2286 crate::s3::presign::MAX_EXPIRES_SECS + 1,
2287 );
2288 match result {
2289 Err(AppError::Validation { field, hint }) => {
2290 assert_eq!(field, "expires_secs");
2291 assert!(
2292 hint.contains("604800") || hint.contains("7 days"),
2293 "hint should mention the 7-day maximum: {hint}"
2294 );
2295 }
2296 other => panic!("expected Validation error, got {:?}", other),
2297 }
2298 }
2299
2300 #[test]
2305 fn presigned_url_ipc_shape_is_camel_case() {
2306 let p = crate::s3::presign::PresignedUrl {
2307 url: "https://example.com/bucket/key?X-Amz-Signature=sig".to_string(),
2308 expires_at: 1_700_000_000_000,
2309 };
2310 let v = serde_json::to_value(&p).unwrap();
2311 assert!(v.as_object().unwrap().contains_key("url"));
2312 assert!(v.as_object().unwrap().contains_key("expiresAt"));
2313 assert_eq!(
2314 v["url"],
2315 "https://example.com/bucket/key?X-Amz-Signature=sig"
2316 );
2317 assert_eq!(v["expiresAt"], 1_700_000_000_000_i64);
2318 }
2319
2320 #[test]
2328 fn diff_consume_cancelled_diff_returns_validation_hint() {
2329 use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2330 use std::collections::HashMap;
2331
2332 let diff_store = DiffStoreHandle::new(DiffStore::new());
2333 let p = DiffPayload::StorageClass {
2334 targets: vec![],
2335 current: HashMap::new(),
2336 new_class: "GLACIER".to_string(),
2337 };
2338 let id = diff_store.inner.create(p);
2339 diff_store.inner.cancel(&id).unwrap();
2340
2341 let consumed = diff_store.inner.consume(&id);
2343 assert!(consumed.is_none(), "cancelled diff must not be consumable");
2344
2345 let record = diff_store.inner.get(&id).unwrap();
2346 let err = match record.status {
2347 crate::diff::DiffStatus::Cancelled | crate::diff::DiffStatus::Expired => {
2348 AppError::Validation {
2349 field: "confirmed_diff_id".to_string(),
2350 hint: "Diff was cancelled or expired".to_string(),
2351 }
2352 }
2353 crate::diff::DiffStatus::Confirmed => AppError::Validation {
2354 field: "confirmed_diff_id".to_string(),
2355 hint: "Diff already consumed (double-confirm rejection)".to_string(),
2356 },
2357 _ => AppError::Internal {
2358 trace_id: "unexpected".to_string(),
2359 },
2360 };
2361
2362 match err {
2363 AppError::Validation { hint, .. } => {
2364 assert!(hint.contains("cancelled or expired"));
2365 }
2366 other => panic!("expected Validation, got {:?}", other),
2367 }
2368 }
2369
2370 #[test]
2371 fn diff_consume_expired_diff_returns_validation_hint() {
2372 use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2373 use std::collections::HashMap;
2374
2375 let diff_store = DiffStoreHandle::new(DiffStore::with_ttl(10));
2376 let now = 1_000_000_i64;
2377 let p = DiffPayload::StorageClass {
2378 targets: vec![],
2379 current: HashMap::new(),
2380 new_class: "GLACIER".to_string(),
2381 };
2382 let id = diff_store.inner.create_at(p, now);
2383
2384 let consumed = diff_store.inner.consume_at(&id, now + 11);
2386 assert!(consumed.is_none(), "expired diff must not be consumable");
2387 }
2388
2389 #[test]
2390 fn diff_double_confirm_returns_already_consumed_hint() {
2391 use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2392 use std::collections::HashMap;
2393
2394 let diff_store = DiffStoreHandle::new(DiffStore::new());
2395 let p = DiffPayload::StorageClass {
2396 targets: vec![],
2397 current: HashMap::new(),
2398 new_class: "GLACIER".to_string(),
2399 };
2400 let id = diff_store.inner.create(p);
2401
2402 let first = diff_store.inner.consume(&id);
2403 assert!(first.is_some(), "first consume must succeed");
2404
2405 let second = diff_store.inner.consume(&id);
2406 assert!(second.is_none(), "double consume must fail");
2407
2408 let record = diff_store.inner.get(&id).unwrap();
2409 match record.status {
2410 crate::diff::DiffStatus::Confirmed => {
2411 }
2413 other => panic!("expected Confirmed status, got {:?}", other),
2414 }
2415 }
2416
2417 #[test]
2423 fn storage_class_change_emit_objects_updated_uses_parent_prefix() {
2424 use crate::events::{EventKind, MockChannel};
2425
2426 let channel = MockChannel::default();
2427 let pid = ProfileId::new("p-sc");
2428 let bid = BucketId::new("sc-bucket");
2429
2430 let key = "archive/2024/data.csv";
2432 let prefix = crate::s3::object::parent_prefix(key);
2433 emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2434
2435 let emitted = channel.emitted();
2436 assert_eq!(emitted.len(), 1);
2437 assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2438 assert_eq!(emitted[0].1["bucket"], "sc-bucket");
2439 assert_eq!(emitted[0].1["prefix"], "archive/2024/");
2440 }
2441}