Skip to main content

brows3r_lib/commands/
objects_cmd.rs

//! Tauri commands for object listing and mutations.
//!
//! # Commands
//!
//! - [`objects_list`]          — hierarchical listing (`delimiter="/"`) with SWR
//!                               cache and validation gate.
//! - [`objects_list_flat`]     — flat listing (no delimiter) with validation gate.
//! - [`object_copy`]           — server-side copy with cross-account fallback.
//! - [`object_move`]           — server-side copy + delete source.
//! - [`object_create_folder`]  — PUT zero-byte `prefix/` placeholder.
//! - [`object_delete_batch`]   — batched delete with partial-failure reporting (AC-4).
//! - [`object_set_metadata`]   — replace user-defined metadata via self-overwrite CopyObject.
//! - [`object_set_tags`]       — set or clear object tags via PutObjectTagging / DeleteObjectTagging.
//! - [`object_presign`]        — generate a presigned GetObject URL with configurable expiry.
//! - [`cross_account_confirm`] — mint a one-time confirmation token for a large cross-account copy.
//!
//! # Validation gate (AC-8 / round-1 finding #9)
//!
//! All commands refuse to serve any data when `profile.validated_at` is `None`.
//! The cache itself also enforces this, but the command boundary check is
//! defence-in-depth: a future refactor of the cache must not silently lift the gate.
//!
//! # Caching
//!
//! Only first-page requests (no `continuation_token`) are cached, because the
//! token is derived from the previous page and is therefore not stable across
//! sessions.  Subsequent pages bypass the cache and always hit S3.
//!
//! The flat variant uses a separate cache key — the prefix is suffixed with
//! `"__FLAT__"` — so hierarchical and flat listings for the same prefix do
//! not collide in the cache.
//!
//! # Mutation → cache invalidation → event
//!
//! Every mutating command follows this sequence after a successful S3 call:
//! 1. Invalidate the affected `CacheKey::Objects` prefix(es).
//! 2. Emit `objects:updated { profileId, bucket, prefix }` for each affected
//!    prefix so the frontend's TanStack Query adapter invalidates its local cache.
39
40use tauri::{AppHandle, State};
41
42use std::collections::HashMap;
43
44use crate::{
45    cache::{invalidation::on_object_mutation, store::CacheHandle, CacheKey, Freshness},
46    error::AppError,
47    events::{self, EventKind},
48    ids::{BucketId, ObjectKey, ProfileId},
49    locks::{emit_acquired, emit_released, LockRegistryHandle, LockScope, ReleaseReason},
50    profiles::ProfileStoreHandle,
51    s3::{
52        cross_account::{ConfirmScope, ConfirmationCacheHandle},
53        list::{list_objects, list_objects_flat, ListPage},
54        metadata::{set_object_metadata as s3_set_metadata, PutResult},
55        object::{
56            copy_object_with_fallback as s3_copy_object_with_fallback,
57            create_folder as s3_create_folder, delete_objects_batch as s3_delete_objects_batch,
58            get_object_bytes as s3_get_object_bytes, get_object_text as s3_get_object_text,
59            move_object as s3_move_object, parent_prefix, put_object_text as s3_put_object_text,
60            BytesPayload, CopyOptions, CopyOutcome, DeleteReport, MoveResult, TextPayload,
61            DEFAULT_BYTES_MAX_BYTES, DEFAULT_TEXT_MAX_BYTES,
62        },
63        presign::{presign_get_object, PresignedUrl},
64        tags::set_object_tags as s3_set_tags,
65        S3ClientPoolHandle,
66    },
67};
68use serde::{Deserialize, Serialize};
69
70// ---------------------------------------------------------------------------
71// objects_list — hierarchical listing
72// ---------------------------------------------------------------------------
73
74/// List objects under `prefix` using `delimiter="/"`.
75///
76/// - Only the first page (`continuation_token = None`) is cached.
77/// - Subsequent pages always call S3 directly.
78/// - Refuses to serve data when the profile has not been validated.
79#[tauri::command]
80pub async fn objects_list(
81    profile_id: ProfileId,
82    bucket: BucketId,
83    prefix: String,
84    continuation_token: Option<String>,
85    force: Option<bool>,
86    store: State<'_, ProfileStoreHandle>,
87    pool: State<'_, S3ClientPoolHandle>,
88    cache: State<'_, CacheHandle>,
89    _channel: AppHandle,
90) -> Result<ListPage, AppError> {
91    // ------------------------------------------------------------------
92    // 1. Resolve profile + validation gate
93    // ------------------------------------------------------------------
94    let profile = {
95        let store_guard = store.inner.lock().await;
96        store_guard
97            .get(&profile_id)
98            .ok_or_else(|| AppError::NotFound {
99                resource: format!("profile:{}", profile_id.as_str()),
100            })?
101    };
102
103    // Command-boundary validation gate (defence-in-depth; cache also enforces).
104    if profile.validated_at.is_none() {
105        return Err(AppError::Auth {
106            reason: "profile_not_validated_in_session".to_string(),
107        });
108    }
109
110    let default_region = profile
111        .default_region
112        .clone()
113        .unwrap_or_else(|| "us-east-1".to_string());
114
115    let cache_arc: CacheHandle = (*cache).clone();
116
117    // ------------------------------------------------------------------
118    // 2. Cache check — only for first-page requests
119    // ------------------------------------------------------------------
120    let is_first_page = continuation_token.is_none();
121
122    if is_first_page && force != Some(true) {
123        let cache_key = CacheKey::Objects {
124            profile: profile_id.clone(),
125            bucket: bucket.clone(),
126            prefix: prefix.clone(),
127        };
128
129        let cached = cache_arc.get::<ListPage>(&cache_key, profile.validated_at)?;
130
131        if let Some(read) = cached {
132            if read.freshness == Freshness::Fresh || read.freshness == Freshness::Stale {
133                return Ok(read.value);
134            }
135        }
136    }
137
138    // ------------------------------------------------------------------
139    // 3. S3 fetch
140    // ------------------------------------------------------------------
141    let client = pool
142        .inner
143        .get_or_build(&profile_id, &default_region)
144        .await
145        .ok_or_else(|| AppError::Internal {
146            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
147        })?;
148
149    let page = list_objects(
150        &client,
151        bucket.as_str(),
152        &prefix,
153        Some("/"),
154        continuation_token.as_deref(),
155        None,
156    )
157    .await?;
158
159    // ------------------------------------------------------------------
160    // 4. Cache the result if this was the first page
161    // ------------------------------------------------------------------
162    if is_first_page {
163        let cache_key = CacheKey::Objects {
164            profile: profile_id.clone(),
165            bucket: bucket.clone(),
166            prefix: prefix.clone(),
167        };
168        let _ = cache_arc.put(&cache_key, &page, None);
169    }
170
171    Ok(page)
172}
173
174// ---------------------------------------------------------------------------
175// objects_list_flat — flat listing (no delimiter)
176// ---------------------------------------------------------------------------
177
178/// List all objects under `prefix` without a delimiter (flat key tree).
179///
180/// Uses the same validation gate and first-page caching as `objects_list`,
181/// but with a distinct cache-key suffix (`"__FLAT__"`) to avoid collision.
182#[tauri::command]
183pub async fn objects_list_flat(
184    profile_id: ProfileId,
185    bucket: BucketId,
186    prefix: String,
187    continuation_token: Option<String>,
188    force: Option<bool>,
189    store: State<'_, ProfileStoreHandle>,
190    pool: State<'_, S3ClientPoolHandle>,
191    cache: State<'_, CacheHandle>,
192    _channel: AppHandle,
193) -> Result<ListPage, AppError> {
194    // ------------------------------------------------------------------
195    // 1. Resolve profile + validation gate
196    // ------------------------------------------------------------------
197    let profile = {
198        let store_guard = store.inner.lock().await;
199        store_guard
200            .get(&profile_id)
201            .ok_or_else(|| AppError::NotFound {
202                resource: format!("profile:{}", profile_id.as_str()),
203            })?
204    };
205
206    if profile.validated_at.is_none() {
207        return Err(AppError::Auth {
208            reason: "profile_not_validated_in_session".to_string(),
209        });
210    }
211
212    let default_region = profile
213        .default_region
214        .clone()
215        .unwrap_or_else(|| "us-east-1".to_string());
216
217    let cache_arc: CacheHandle = (*cache).clone();
218
219    // ------------------------------------------------------------------
220    // 2. Cache check — only for first-page requests
221    //
222    // The flat cache key appends "__FLAT__" to the prefix so it does not
223    // collide with the hierarchical listing for the same prefix.
224    // ------------------------------------------------------------------
225    let is_first_page = continuation_token.is_none();
226    let flat_prefix = format!("{}__FLAT__", prefix);
227
228    if is_first_page && force != Some(true) {
229        let cache_key = CacheKey::Objects {
230            profile: profile_id.clone(),
231            bucket: bucket.clone(),
232            prefix: flat_prefix.clone(),
233        };
234
235        let cached = cache_arc.get::<ListPage>(&cache_key, profile.validated_at)?;
236
237        if let Some(read) = cached {
238            if read.freshness == Freshness::Fresh || read.freshness == Freshness::Stale {
239                return Ok(read.value);
240            }
241        }
242    }
243
244    // ------------------------------------------------------------------
245    // 3. S3 fetch
246    // ------------------------------------------------------------------
247    let client = pool
248        .inner
249        .get_or_build(&profile_id, &default_region)
250        .await
251        .ok_or_else(|| AppError::Internal {
252            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
253        })?;
254
255    let page = list_objects_flat(
256        &client,
257        bucket.as_str(),
258        &prefix,
259        continuation_token.as_deref(),
260        None,
261    )
262    .await?;
263
264    // ------------------------------------------------------------------
265    // 4. Cache the first page
266    // ------------------------------------------------------------------
267    if is_first_page {
268        let cache_key = CacheKey::Objects {
269            profile: profile_id.clone(),
270            bucket: bucket.clone(),
271            prefix: flat_prefix,
272        };
273        let _ = cache_arc.put(&cache_key, &page, None);
274    }
275
276    Ok(page)
277}
278
279// ---------------------------------------------------------------------------
280// Shared IPC types for mutation commands
281// ---------------------------------------------------------------------------
282
283/// Reference to a single S3 object: bucket + full key.
284///
285/// Used as both source and destination for copy/move.
286///
287/// OCP: `version_id` can be added later as `Option<String>` with `#[serde(default)]`
288/// without breaking existing call sites.
289#[derive(Debug, Clone, Serialize, Deserialize)]
290#[serde(rename_all = "camelCase")]
291pub struct ObjectRef {
292    pub bucket: BucketId,
293    pub key: String,
294}
295
296/// Payload emitted as `objects:updated` after every mutation.
297#[derive(Debug, Clone, Serialize)]
298#[serde(rename_all = "camelCase")]
299struct ObjectsUpdatedPayload {
300    profile_id: ProfileId,
301    bucket: BucketId,
302    prefix: String,
303}
304
305/// Emit `objects:updated` for one (profile, bucket, prefix) triple.
306fn emit_objects_updated<E: events::EventEmitter>(
307    channel: &E,
308    profile_id: &ProfileId,
309    bucket: &BucketId,
310    prefix: &str,
311) -> Result<(), AppError> {
312    events::emit(
313        channel,
314        EventKind::ObjectsUpdated,
315        ObjectsUpdatedPayload {
316            profile_id: profile_id.clone(),
317            bucket: bucket.clone(),
318            prefix: prefix.to_string(),
319        },
320    )
321}
322
323// ---------------------------------------------------------------------------
324// object_copy
325// ---------------------------------------------------------------------------
326
327/// Copy `source` to `destination` with automatic cross-account fallback.
328///
329/// - Attempts server-side `CopyObject` first.
330/// - On `AccessDenied` (cross-account scenario):
331///   - If the source size is ≤ `fallback_threshold_bytes` (default 100 MiB),
332///     falls back to download+upload automatically.
333///   - If the source size is above the threshold **and** `confirmed_token` is a
334///     valid one-time token minted by `cross_account_confirm`, falls back.
335///   - Otherwise returns `AppError::Validation` asking the frontend to call
336///     `cross_account_confirm` first.
337/// - Returns `CopyOutcome` instead of `CopyResult` so the frontend can show
338///   a "Used fallback" indicator when the fallback path was taken.
339/// - Acquires scoped locks on source and destination prefixes before calling S3.
340/// - Invalidates destination prefix cache and emits `objects:updated` on success.
341/// - Both locks are released (success or failure) before returning.
342#[tauri::command]
343pub async fn object_copy(
344    profile_id: ProfileId,
345    source: ObjectRef,
346    destination: ObjectRef,
347    options: CopyOptions,
348    confirmed_token: Option<String>,
349    store: State<'_, ProfileStoreHandle>,
350    pool: State<'_, S3ClientPoolHandle>,
351    locks: State<'_, LockRegistryHandle>,
352    cache: State<'_, CacheHandle>,
353    confirm_cache: State<'_, ConfirmationCacheHandle>,
354    channel: AppHandle,
355) -> Result<CopyOutcome, AppError> {
356    // ------------------------------------------------------------------
357    // 1. Resolve profile + validation gate
358    // ------------------------------------------------------------------
359    let profile = {
360        let store_guard = store.inner.lock().await;
361        store_guard
362            .get(&profile_id)
363            .ok_or_else(|| AppError::NotFound {
364                resource: format!("profile:{}", profile_id.as_str()),
365            })?
366    };
367
368    if profile.validated_at.is_none() {
369        return Err(AppError::Auth {
370            reason: "profile_not_validated_in_session".to_string(),
371        });
372    }
373
374    let default_region = profile
375        .default_region
376        .clone()
377        .unwrap_or_else(|| "us-east-1".to_string());
378
379    // ------------------------------------------------------------------
380    // 2. Acquire locks on source and destination prefixes
381    // ------------------------------------------------------------------
382    let src_prefix = parent_prefix(&source.key);
383    let dest_prefix = parent_prefix(&destination.key);
384
385    let now = std::time::SystemTime::now()
386        .duration_since(std::time::UNIX_EPOCH)
387        .unwrap_or_default()
388        .as_secs() as i64;
389
390    let src_scope = LockScope {
391        profile: profile_id.clone(),
392        bucket: Some(source.bucket.clone()),
393        prefix: Some(src_prefix.clone()),
394        key: None,
395    };
396    let dest_scope = LockScope {
397        profile: profile_id.clone(),
398        bucket: Some(destination.bucket.clone()),
399        prefix: Some(dest_prefix.clone()),
400        key: None,
401    };
402
403    let registry = locks.inner();
404    let src_lock_id = registry.acquire(src_scope, "object_copy:source", 300, now)?;
405    let src_lock = registry
406        .list(None)
407        .into_iter()
408        .find(|l| l.id == src_lock_id)
409        .expect("lock must exist right after acquire");
410    let _ = emit_acquired(&channel, &src_lock);
411
412    let dest_lock_id = match registry.acquire(dest_scope, "object_copy:dest", 300, now) {
413        Ok(id) => id,
414        Err(e) => {
415            // Release source lock before propagating.
416            if let Ok(lock) = registry.release(&src_lock_id) {
417                let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
418            }
419            return Err(e);
420        }
421    };
422    let dest_lock = registry
423        .list(None)
424        .into_iter()
425        .find(|l| l.id == dest_lock_id)
426        .expect("lock must exist right after acquire");
427    let _ = emit_acquired(&channel, &dest_lock);
428
429    // ------------------------------------------------------------------
430    // 3. S3 copy (with cross-account fallback)
431    // ------------------------------------------------------------------
432    let client = pool
433        .inner
434        .get_or_build(&profile_id, &default_region)
435        .await
436        .ok_or_else(|| AppError::Internal {
437            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
438        })?;
439
440    // Default threshold: 100 MiB.  Future tasks will thread settings through here.
441    const DEFAULT_FALLBACK_THRESHOLD: u64 = 100 * 1024 * 1024;
442
443    let result = s3_copy_object_with_fallback(
444        &client,
445        source.bucket.as_str(),
446        &source.key,
447        destination.bucket.as_str(),
448        &destination.key,
449        &options,
450        DEFAULT_FALLBACK_THRESHOLD,
451        confirmed_token,
452        &confirm_cache.inner,
453        profile_id.as_str(),
454    )
455    .await;
456
457    // ------------------------------------------------------------------
458    // 4. Release locks
459    // ------------------------------------------------------------------
460    let release_reason = if result.is_ok() {
461        ReleaseReason::Success
462    } else {
463        ReleaseReason::Failure
464    };
465
466    if let Ok(lock) = registry.release(&dest_lock_id) {
467        let _ = emit_released(&channel, &lock, release_reason.clone());
468    }
469    if let Ok(lock) = registry.release(&src_lock_id) {
470        let _ = emit_released(&channel, &lock, release_reason);
471    }
472
473    let copy_outcome = result?;
474
475    // ------------------------------------------------------------------
476    // 5. Cache invalidation + event
477    // ------------------------------------------------------------------
478    let cache_arc: CacheHandle = (*cache).clone();
479    on_object_mutation(
480        &profile_id,
481        &destination.bucket,
482        &dest_prefix,
483        None,
484        &cache_arc,
485    );
486    let _ = emit_objects_updated(&channel, &profile_id, &destination.bucket, &dest_prefix);
487
488    Ok(copy_outcome)
489}
490
491// ---------------------------------------------------------------------------
492// cross_account_confirm
493// ---------------------------------------------------------------------------
494
495/// Mint a one-time confirmation token for a large cross-account copy.
496///
497/// The frontend calls this command when `object_copy` returns a
498/// `Validation { field: "confirmed_token" }` error.  The returned token must
499/// be passed back to `object_copy` as `confirmed_token` in the next call.
500///
501/// Tokens are single-use, scoped to exactly the (profile, source, destination)
502/// triple, and expire after 5 minutes.
503///
504/// # Note on "heuristic check"
505///
506/// This command does not re-verify that a cross-account error actually
507/// occurred — the frontend is trusted to call it only after receiving the
508/// `Validation` error from `object_copy`.  The token is harmless if the next
509/// `object_copy` succeeds via the server-side path (which returns
510/// `ServerSideCopy` without consulting the token).
511#[tauri::command]
512pub async fn cross_account_confirm(
513    profile_id: ProfileId,
514    source: ObjectRef,
515    destination: ObjectRef,
516    store: State<'_, ProfileStoreHandle>,
517    confirm_cache: State<'_, ConfirmationCacheHandle>,
518) -> Result<String, AppError> {
519    // Validation gate: profile must exist and be validated.
520    let profile = {
521        let store_guard = store.inner.lock().await;
522        store_guard
523            .get(&profile_id)
524            .ok_or_else(|| AppError::NotFound {
525                resource: format!("profile:{}", profile_id.as_str()),
526            })?
527    };
528
529    if profile.validated_at.is_none() {
530        return Err(AppError::Auth {
531            reason: "profile_not_validated_in_session".to_string(),
532        });
533    }
534
535    let scope = ConfirmScope {
536        profile: profile_id.as_str().to_string(),
537        source_bucket: source.bucket.as_str().to_string(),
538        source_key: source.key.clone(),
539        dest_bucket: destination.bucket.as_str().to_string(),
540        dest_key: destination.key.clone(),
541    };
542
543    let token = confirm_cache.inner.mint(scope);
544    Ok(token)
545}
546
547// ---------------------------------------------------------------------------
548// object_move
549// ---------------------------------------------------------------------------
550
551/// Move `source` to `destination`: server-side copy then delete source.
552///
553/// On success emits `objects:updated` for both source and destination prefixes.
554#[tauri::command]
555pub async fn object_move(
556    profile_id: ProfileId,
557    source: ObjectRef,
558    destination: ObjectRef,
559    options: CopyOptions,
560    store: State<'_, ProfileStoreHandle>,
561    pool: State<'_, S3ClientPoolHandle>,
562    locks: State<'_, LockRegistryHandle>,
563    cache: State<'_, CacheHandle>,
564    channel: AppHandle,
565) -> Result<MoveResult, AppError> {
566    // ------------------------------------------------------------------
567    // 1. Resolve profile + validation gate
568    // ------------------------------------------------------------------
569    let profile = {
570        let store_guard = store.inner.lock().await;
571        store_guard
572            .get(&profile_id)
573            .ok_or_else(|| AppError::NotFound {
574                resource: format!("profile:{}", profile_id.as_str()),
575            })?
576    };
577
578    if profile.validated_at.is_none() {
579        return Err(AppError::Auth {
580            reason: "profile_not_validated_in_session".to_string(),
581        });
582    }
583
584    let default_region = profile
585        .default_region
586        .clone()
587        .unwrap_or_else(|| "us-east-1".to_string());
588
589    // ------------------------------------------------------------------
590    // 2. Acquire locks on source and destination prefixes
591    // ------------------------------------------------------------------
592    let src_prefix = parent_prefix(&source.key);
593    let dest_prefix = parent_prefix(&destination.key);
594
595    let now = std::time::SystemTime::now()
596        .duration_since(std::time::UNIX_EPOCH)
597        .unwrap_or_default()
598        .as_secs() as i64;
599
600    let src_scope = LockScope {
601        profile: profile_id.clone(),
602        bucket: Some(source.bucket.clone()),
603        prefix: Some(src_prefix.clone()),
604        key: None,
605    };
606    let dest_scope = LockScope {
607        profile: profile_id.clone(),
608        bucket: Some(destination.bucket.clone()),
609        prefix: Some(dest_prefix.clone()),
610        key: None,
611    };
612
613    let registry = locks.inner();
614    let src_lock_id = registry.acquire(src_scope, "object_move:source", 300, now)?;
615    let src_lock = registry
616        .list(None)
617        .into_iter()
618        .find(|l| l.id == src_lock_id)
619        .expect("lock must exist right after acquire");
620    let _ = emit_acquired(&channel, &src_lock);
621
622    let dest_lock_id = match registry.acquire(dest_scope, "object_move:dest", 300, now) {
623        Ok(id) => id,
624        Err(e) => {
625            if let Ok(lock) = registry.release(&src_lock_id) {
626                let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
627            }
628            return Err(e);
629        }
630    };
631    let dest_lock = registry
632        .list(None)
633        .into_iter()
634        .find(|l| l.id == dest_lock_id)
635        .expect("lock must exist right after acquire");
636    let _ = emit_acquired(&channel, &dest_lock);
637
638    // ------------------------------------------------------------------
639    // 3. S3 move (copy + delete)
640    // ------------------------------------------------------------------
641    let client = pool
642        .inner
643        .get_or_build(&profile_id, &default_region)
644        .await
645        .ok_or_else(|| AppError::Internal {
646            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
647        })?;
648
649    let result = s3_move_object(
650        &client,
651        source.bucket.as_str(),
652        &source.key,
653        destination.bucket.as_str(),
654        &destination.key,
655        &options,
656    )
657    .await;
658
659    // ------------------------------------------------------------------
660    // 4. Release locks
661    // ------------------------------------------------------------------
662    let release_reason = if result.is_ok() {
663        ReleaseReason::Success
664    } else {
665        ReleaseReason::Failure
666    };
667
668    if let Ok(lock) = registry.release(&dest_lock_id) {
669        let _ = emit_released(&channel, &lock, release_reason.clone());
670    }
671    if let Ok(lock) = registry.release(&src_lock_id) {
672        let _ = emit_released(&channel, &lock, release_reason);
673    }
674
675    let move_result = result?;
676
677    // ------------------------------------------------------------------
678    // 5. Cache invalidation + events (source AND destination)
679    // ------------------------------------------------------------------
680    let cache_arc: CacheHandle = (*cache).clone();
681    on_object_mutation(&profile_id, &source.bucket, &src_prefix, None, &cache_arc);
682    on_object_mutation(
683        &profile_id,
684        &destination.bucket,
685        &dest_prefix,
686        None,
687        &cache_arc,
688    );
689    let _ = emit_objects_updated(&channel, &profile_id, &source.bucket, &src_prefix);
690    let _ = emit_objects_updated(&channel, &profile_id, &destination.bucket, &dest_prefix);
691
692    Ok(move_result)
693}
694
695// ---------------------------------------------------------------------------
696// object_create_folder
697// ---------------------------------------------------------------------------
698
699/// Create a virtual folder placeholder at `bucket/prefix/`.
700///
701/// Acquires a lock on the prefix, calls `create_folder`, then emits
702/// `objects:updated` for the parent prefix.
703#[tauri::command]
704pub async fn object_create_folder(
705    profile_id: ProfileId,
706    bucket: BucketId,
707    prefix: String,
708    store: State<'_, ProfileStoreHandle>,
709    pool: State<'_, S3ClientPoolHandle>,
710    locks: State<'_, LockRegistryHandle>,
711    cache: State<'_, CacheHandle>,
712    channel: AppHandle,
713) -> Result<(), AppError> {
714    // ------------------------------------------------------------------
715    // 1. Resolve profile + validation gate
716    // ------------------------------------------------------------------
717    let profile = {
718        let store_guard = store.inner.lock().await;
719        store_guard
720            .get(&profile_id)
721            .ok_or_else(|| AppError::NotFound {
722                resource: format!("profile:{}", profile_id.as_str()),
723            })?
724    };
725
726    if profile.validated_at.is_none() {
727        return Err(AppError::Auth {
728            reason: "profile_not_validated_in_session".to_string(),
729        });
730    }
731
732    let default_region = profile
733        .default_region
734        .clone()
735        .unwrap_or_else(|| "us-east-1".to_string());
736
737    // ------------------------------------------------------------------
738    // 2. Acquire lock on prefix
739    // ------------------------------------------------------------------
740    let now = std::time::SystemTime::now()
741        .duration_since(std::time::UNIX_EPOCH)
742        .unwrap_or_default()
743        .as_secs() as i64;
744
745    let scope = LockScope {
746        profile: profile_id.clone(),
747        bucket: Some(bucket.clone()),
748        prefix: Some(prefix.clone()),
749        key: None,
750    };
751
752    let registry = locks.inner();
753    let lock_id = registry.acquire(scope, "object_create_folder", 300, now)?;
754    let lock = registry
755        .list(None)
756        .into_iter()
757        .find(|l| l.id == lock_id)
758        .expect("lock must exist right after acquire");
759    let _ = emit_acquired(&channel, &lock);
760
761    // ------------------------------------------------------------------
762    // 3. S3 create folder
763    // ------------------------------------------------------------------
764    let client = pool
765        .inner
766        .get_or_build(&profile_id, &default_region)
767        .await
768        .ok_or_else(|| AppError::Internal {
769            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
770        })?;
771
772    let result = s3_create_folder(&client, bucket.as_str(), &prefix).await;
773
774    // ------------------------------------------------------------------
775    // 4. Release lock
776    // ------------------------------------------------------------------
777    let release_reason = if result.is_ok() {
778        ReleaseReason::Success
779    } else {
780        ReleaseReason::Failure
781    };
782    if let Ok(lock) = registry.release(&lock_id) {
783        let _ = emit_released(&channel, &lock, release_reason);
784    }
785
786    result?;
787
788    // ------------------------------------------------------------------
789    // 5. Cache invalidation + event (parent prefix)
790    // ------------------------------------------------------------------
791    let parent = parent_prefix(&prefix);
792    let cache_arc: CacheHandle = (*cache).clone();
793    on_object_mutation(&profile_id, &bucket, &parent, None, &cache_arc);
794    let _ = emit_objects_updated(&channel, &profile_id, &bucket, &parent);
795
796    Ok(())
797}
798
799// ---------------------------------------------------------------------------
800// object_delete_batch
801// ---------------------------------------------------------------------------
802
803/// One key (with optional version ID) in a batch delete request.
804///
805/// OCP: `bypass_governance_retention: bool` can be added later for object-lock
806/// support without breaking existing callers (`#[serde(default)]`).
807#[derive(Debug, Clone, Serialize, Deserialize)]
808#[serde(rename_all = "camelCase")]
809pub struct DeleteKey {
810    pub key: String,
811    #[serde(skip_serializing_if = "Option::is_none")]
812    pub version_id: Option<String>,
813}
814
815/// Delete `keys` from `bucket` using the S3 batched `DeleteObjects` API.
816///
817/// - Groups keys by parent prefix and acquires one lock per unique prefix.
818/// - Issues the batch delete (chunked at 1 000 keys per AWS limit).
819/// - Invalidates cache and emits `objects:updated` once per unique affected
820///   prefix (only for prefixes that had at least one successfully deleted key).
821/// - All locks are released before returning.
822///
823/// Partial per-key failures (AC-4) are returned in `DeleteReport.failed`.
824/// A non-empty `failed` list is NOT an `Err` — the caller decides UI handling.
825#[tauri::command]
826pub async fn object_delete_batch(
827    profile_id: ProfileId,
828    bucket: BucketId,
829    keys: Vec<DeleteKey>,
830    store: State<'_, ProfileStoreHandle>,
831    pool: State<'_, S3ClientPoolHandle>,
832    locks: State<'_, LockRegistryHandle>,
833    cache: State<'_, CacheHandle>,
834    channel: AppHandle,
835) -> Result<DeleteReport, AppError> {
836    // ------------------------------------------------------------------
837    // 1. Resolve profile + validation gate
838    // ------------------------------------------------------------------
839    let profile = {
840        let store_guard = store.inner.lock().await;
841        store_guard
842            .get(&profile_id)
843            .ok_or_else(|| AppError::NotFound {
844                resource: format!("profile:{}", profile_id.as_str()),
845            })?
846    };
847
848    if profile.validated_at.is_none() {
849        return Err(AppError::Auth {
850            reason: "profile_not_validated_in_session".to_string(),
851        });
852    }
853
854    let default_region = profile
855        .default_region
856        .clone()
857        .unwrap_or_else(|| "us-east-1".to_string());
858
859    // ------------------------------------------------------------------
860    // 2. Collect unique affected prefixes and acquire one lock per prefix
861    // ------------------------------------------------------------------
862    // Collect unique prefixes from all requested keys.
863    // BTreeSet gives sorted, deduplicated order — deterministic acquisition prevents
864    // cross-task deadlocks if two concurrent batches touch overlapping prefixes.
865    let unique_prefixes: Vec<String> = keys
866        .iter()
867        .map(|dk| parent_prefix(&dk.key))
868        .collect::<std::collections::BTreeSet<_>>()
869        .into_iter()
870        .collect();
871
872    let now = std::time::SystemTime::now()
873        .duration_since(std::time::UNIX_EPOCH)
874        .unwrap_or_default()
875        .as_secs() as i64;
876
877    let registry = locks.inner();
878    let mut acquired_lock_ids: Vec<crate::locks::LockId> =
879        Vec::with_capacity(unique_prefixes.len());
880
881    for prefix in &unique_prefixes {
882        let scope = LockScope {
883            profile: profile_id.clone(),
884            bucket: Some(bucket.clone()),
885            prefix: Some(prefix.clone()),
886            key: None,
887        };
888        let lock_id = match registry.acquire(scope, "object_delete_batch", 300, now) {
889            Ok(id) => id,
890            Err(e) => {
891                // Release all already-acquired locks before propagating.
892                for held_id in &acquired_lock_ids {
893                    if let Ok(lock) = registry.release(held_id) {
894                        let _ = emit_released(&channel, &lock, ReleaseReason::Failure);
895                    }
896                }
897                return Err(e);
898            }
899        };
900        let lock = registry
901            .list(None)
902            .into_iter()
903            .find(|l| l.id == lock_id)
904            .expect("lock must exist right after acquire");
905        let _ = emit_acquired(&channel, &lock);
906        acquired_lock_ids.push(lock_id);
907    }
908
909    // ------------------------------------------------------------------
910    // 3. S3 batch delete
911    // ------------------------------------------------------------------
912    let client = pool
913        .inner
914        .get_or_build(&profile_id, &default_region)
915        .await
916        .ok_or_else(|| AppError::Internal {
917            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
918        })?;
919
920    // Convert DeleteKey vec to the format expected by delete_objects_batch.
921    let key_pairs: Vec<(ObjectKey, Option<String>)> = keys
922        .iter()
923        .map(|dk| (ObjectKey::new(dk.key.clone()), dk.version_id.clone()))
924        .collect();
925
926    let result = s3_delete_objects_batch(&client, bucket.as_str(), key_pairs).await;
927
928    // ------------------------------------------------------------------
929    // 4. Release all locks
930    // ------------------------------------------------------------------
931    let release_reason = if result.is_ok() {
932        ReleaseReason::Success
933    } else {
934        ReleaseReason::Failure
935    };
936    for held_id in &acquired_lock_ids {
937        if let Ok(lock) = registry.release(held_id) {
938            let _ = emit_released(&channel, &lock, release_reason.clone());
939        }
940    }
941
942    let report = result?;
943
944    // ------------------------------------------------------------------
945    // 5. Cache invalidation + events for each affected prefix
946    //
947    // Emit one `objects:updated` per unique prefix that had at least one
948    // successfully deleted key. Consistent with the single-prefix event
949    // shape from copy/move/create_folder.
950    // ------------------------------------------------------------------
951    // Collect the parent prefixes of all successfully deleted keys.
952    let affected_prefixes: std::collections::BTreeSet<String> = report
953        .deleted
954        .iter()
955        .map(|d| parent_prefix(&d.key))
956        .collect();
957
958    let cache_arc: CacheHandle = (*cache).clone();
959    for prefix in &affected_prefixes {
960        on_object_mutation(&profile_id, &bucket, prefix, None, &cache_arc);
961        let _ = emit_objects_updated(&channel, &profile_id, &bucket, prefix);
962    }
963
964    Ok(report)
965}
966
967// ---------------------------------------------------------------------------
968// object_set_metadata
969// ---------------------------------------------------------------------------
970
971/// Replace user-defined metadata on `bucket/key`.
972///
973/// Uses a server-side `CopyObject` self-overwrite with `MetadataDirective::Replace`
974/// so the object body is preserved without re-uploading.
975///
976/// When `if_match_etag` is supplied the backend enforces an ETag precondition.
977/// A mismatch returns `AppError::Conflict`.
978#[tauri::command]
979pub async fn object_set_metadata(
980    profile_id: ProfileId,
981    bucket: BucketId,
982    key: String,
983    metadata: HashMap<String, String>,
984    if_match_etag: Option<String>,
985    store: State<'_, ProfileStoreHandle>,
986    pool: State<'_, S3ClientPoolHandle>,
987    locks: State<'_, LockRegistryHandle>,
988    cache: State<'_, CacheHandle>,
989    channel: AppHandle,
990) -> Result<PutResult, AppError> {
991    // ------------------------------------------------------------------
992    // 1. Resolve profile + validation gate
993    // ------------------------------------------------------------------
994    let profile = {
995        let store_guard = store.inner.lock().await;
996        store_guard
997            .get(&profile_id)
998            .ok_or_else(|| AppError::NotFound {
999                resource: format!("profile:{}", profile_id.as_str()),
1000            })?
1001    };
1002
1003    if profile.validated_at.is_none() {
1004        return Err(AppError::Auth {
1005            reason: "profile_not_validated_in_session".to_string(),
1006        });
1007    }
1008
1009    let default_region = profile
1010        .default_region
1011        .clone()
1012        .unwrap_or_else(|| "us-east-1".to_string());
1013
1014    // ------------------------------------------------------------------
1015    // 2. Acquire lock on object key
1016    // ------------------------------------------------------------------
1017    let prefix = parent_prefix(&key);
1018
1019    let now = std::time::SystemTime::now()
1020        .duration_since(std::time::UNIX_EPOCH)
1021        .unwrap_or_default()
1022        .as_secs() as i64;
1023
1024    let scope = LockScope {
1025        profile: profile_id.clone(),
1026        bucket: Some(bucket.clone()),
1027        prefix: Some(prefix.clone()),
1028        key: Some(ObjectKey::new(key.clone())),
1029    };
1030
1031    let registry = locks.inner();
1032    let lock_id = registry.acquire(scope, "object_set_metadata", 300, now)?;
1033    let lock = registry
1034        .list(None)
1035        .into_iter()
1036        .find(|l| l.id == lock_id)
1037        .expect("lock must exist right after acquire");
1038    let _ = emit_acquired(&channel, &lock);
1039
1040    // ------------------------------------------------------------------
1041    // 3. S3 metadata update
1042    // ------------------------------------------------------------------
1043    let client = pool
1044        .inner
1045        .get_or_build(&profile_id, &default_region)
1046        .await
1047        .ok_or_else(|| AppError::Internal {
1048            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1049        })?;
1050
1051    let result = s3_set_metadata(&client, bucket.as_str(), &key, metadata, if_match_etag).await;
1052
1053    // ------------------------------------------------------------------
1054    // 4. Release lock
1055    // ------------------------------------------------------------------
1056    let release_reason = if result.is_ok() {
1057        ReleaseReason::Success
1058    } else {
1059        ReleaseReason::Failure
1060    };
1061    if let Ok(lock) = registry.release(&lock_id) {
1062        let _ = emit_released(&channel, &lock, release_reason);
1063    }
1064
1065    let put_result = result?;
1066
1067    // ------------------------------------------------------------------
1068    // 5. Cache invalidation + event
1069    // ------------------------------------------------------------------
1070    let cache_arc: CacheHandle = (*cache).clone();
1071    on_object_mutation(&profile_id, &bucket, &prefix, None, &cache_arc);
1072    let _ = emit_objects_updated(&channel, &profile_id, &bucket, &prefix);
1073
1074    Ok(put_result)
1075}
1076
1077// ---------------------------------------------------------------------------
1078// object_set_tags
1079// ---------------------------------------------------------------------------
1080
1081/// Set (or clear) the tags on `bucket/key`.
1082///
1083/// An empty `tags` map removes all tags via `DeleteObjectTagging`.
1084///
1085/// When `if_match_etag` is supplied an explicit `HeadObject` precondition check
1086/// is performed before `PutObjectTagging` (race-prone but the best AWS allows).
1087/// A mismatch returns `AppError::Conflict`.
1088#[tauri::command]
1089pub async fn object_set_tags(
1090    profile_id: ProfileId,
1091    bucket: BucketId,
1092    key: String,
1093    tags: HashMap<String, String>,
1094    if_match_etag: Option<String>,
1095    store: State<'_, ProfileStoreHandle>,
1096    pool: State<'_, S3ClientPoolHandle>,
1097    locks: State<'_, LockRegistryHandle>,
1098    cache: State<'_, CacheHandle>,
1099    channel: AppHandle,
1100) -> Result<PutResult, AppError> {
1101    // ------------------------------------------------------------------
1102    // 1. Resolve profile + validation gate
1103    // ------------------------------------------------------------------
1104    let profile = {
1105        let store_guard = store.inner.lock().await;
1106        store_guard
1107            .get(&profile_id)
1108            .ok_or_else(|| AppError::NotFound {
1109                resource: format!("profile:{}", profile_id.as_str()),
1110            })?
1111    };
1112
1113    if profile.validated_at.is_none() {
1114        return Err(AppError::Auth {
1115            reason: "profile_not_validated_in_session".to_string(),
1116        });
1117    }
1118
1119    let default_region = profile
1120        .default_region
1121        .clone()
1122        .unwrap_or_else(|| "us-east-1".to_string());
1123
1124    // ------------------------------------------------------------------
1125    // 2. Acquire lock on object key
1126    // ------------------------------------------------------------------
1127    let prefix = parent_prefix(&key);
1128
1129    let now = std::time::SystemTime::now()
1130        .duration_since(std::time::UNIX_EPOCH)
1131        .unwrap_or_default()
1132        .as_secs() as i64;
1133
1134    let scope = LockScope {
1135        profile: profile_id.clone(),
1136        bucket: Some(bucket.clone()),
1137        prefix: Some(prefix.clone()),
1138        key: Some(ObjectKey::new(key.clone())),
1139    };
1140
1141    let registry = locks.inner();
1142    let lock_id = registry.acquire(scope, "object_set_tags", 300, now)?;
1143    let lock = registry
1144        .list(None)
1145        .into_iter()
1146        .find(|l| l.id == lock_id)
1147        .expect("lock must exist right after acquire");
1148    let _ = emit_acquired(&channel, &lock);
1149
1150    // ------------------------------------------------------------------
1151    // 3. S3 tags update
1152    // ------------------------------------------------------------------
1153    let client = pool
1154        .inner
1155        .get_or_build(&profile_id, &default_region)
1156        .await
1157        .ok_or_else(|| AppError::Internal {
1158            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1159        })?;
1160
1161    let result = s3_set_tags(&client, bucket.as_str(), &key, tags, if_match_etag).await;
1162
1163    // ------------------------------------------------------------------
1164    // 4. Release lock
1165    // ------------------------------------------------------------------
1166    let release_reason = if result.is_ok() {
1167        ReleaseReason::Success
1168    } else {
1169        ReleaseReason::Failure
1170    };
1171    if let Ok(lock) = registry.release(&lock_id) {
1172        let _ = emit_released(&channel, &lock, release_reason);
1173    }
1174
1175    let put_result = result?;
1176
1177    // ------------------------------------------------------------------
1178    // 5. Cache invalidation + event
1179    // ------------------------------------------------------------------
1180    let cache_arc: CacheHandle = (*cache).clone();
1181    on_object_mutation(&profile_id, &bucket, &prefix, None, &cache_arc);
1182    let _ = emit_objects_updated(&channel, &profile_id, &bucket, &prefix);
1183
1184    Ok(put_result)
1185}
1186
1187// ---------------------------------------------------------------------------
1188// object_presign
1189// ---------------------------------------------------------------------------
1190
1191/// Generate a presigned `GetObject` URL for `bucket/key`.
1192///
1193/// The URL embeds the credentials in the query string (SigV4) and is valid
1194/// for `expires_sec` seconds.  When `expires_sec` is omitted the default is
1195/// 3 600 s (1 hour).
1196///
1197/// # Validation
1198///
1199/// Returns `AppError::Validation { field: "expires_secs", … }` when the
1200/// supplied expiry is outside `[60, 604_800]` (60 s – 7 days).
1201///
1202/// # Security
1203///
1204/// The URL is generated in Rust — AWS credentials never cross the IPC
1205/// boundary.  The frontend receives an opaque `PresignedUrl` struct and
1206/// writes the URL to the clipboard.  The URL itself carries no ongoing auth
1207/// state; once generated it may be shared freely within its expiry window.
1208#[tauri::command]
1209pub async fn object_presign(
1210    profile_id: ProfileId,
1211    bucket: BucketId,
1212    key: String,
1213    expires_sec: Option<u64>,
1214    store: State<'_, ProfileStoreHandle>,
1215    pool: State<'_, S3ClientPoolHandle>,
1216) -> Result<PresignedUrl, AppError> {
1217    // ------------------------------------------------------------------
1218    // 1. Resolve expiry with default
1219    // ------------------------------------------------------------------
1220    const DEFAULT_EXPIRES_SECS: u64 = 3_600;
1221    let expires_secs = expires_sec.unwrap_or(DEFAULT_EXPIRES_SECS);
1222
1223    // ------------------------------------------------------------------
1224    // 2. Resolve profile + validation gate
1225    // ------------------------------------------------------------------
1226    let profile = {
1227        let store_guard = store.inner.lock().await;
1228        store_guard
1229            .get(&profile_id)
1230            .ok_or_else(|| AppError::NotFound {
1231                resource: format!("profile:{}", profile_id.as_str()),
1232            })?
1233    };
1234
1235    if profile.validated_at.is_none() {
1236        return Err(AppError::Auth {
1237            reason: "profile_not_validated_in_session".to_string(),
1238        });
1239    }
1240
1241    let default_region = profile
1242        .default_region
1243        .clone()
1244        .unwrap_or_else(|| "us-east-1".to_string());
1245
1246    // ------------------------------------------------------------------
1247    // 3. Build S3 client
1248    // ------------------------------------------------------------------
1249    let client = pool
1250        .inner
1251        .get_or_build(&profile_id, &default_region)
1252        .await
1253        .ok_or_else(|| AppError::Internal {
1254            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1255        })?;
1256
1257    // ------------------------------------------------------------------
1258    // 4. Generate presigned URL (validation happens inside this helper)
1259    // ------------------------------------------------------------------
1260    presign_get_object(&client, bucket.as_str(), &key, expires_secs).await
1261}
1262
1263// ---------------------------------------------------------------------------
1264// object_set_storage_class
1265// ---------------------------------------------------------------------------
1266
1267/// Change the storage class of one or more objects.
1268///
1269/// # Safety gate (diff framework)
1270///
1271/// This command requires a `confirmed_diff_id` that was previously created via
1272/// `diff_preview_create`.  The id is consumed atomically on entry:
1273///
1274/// - If the diff does not exist, was cancelled, or expired → `Validation` error.
1275/// - If the diff was already consumed (double-confirm rejection) → `Validation`
1276///   error.
1277/// - If the diff payload does not match the requested targets/class →
1278///   `Validation` error.
1279///
1280/// This is the single authoritative enforce point for the "no blind storage
1281/// class change" invariant from Decision D2.
1282///
1283/// # Decision D2 (optimistic boundary)
1284///
1285/// Storage class change is explicitly NOT subject to optimistic updates.
1286/// The command emits `objects:updated` per target on success, which allows the
1287/// frontend's event-driven path to refresh the listing.  No `optimistic.ts`
1288/// helper exists for this operation (asserted in
1289/// `storage_class_change_does_not_use_optimistic_path`).
1290///
1291/// # Event emission (round-1 finding #14)
1292///
1293/// `objects:updated { profileId, bucket, prefix }` is emitted for each
1294/// successfully changed object's parent prefix.
1295#[tauri::command]
1296pub async fn object_set_storage_class(
1297    profile_id: ProfileId,
1298    targets: Vec<ObjectRef>,
1299    new_storage_class: String,
1300    confirmed_diff_id: crate::diff::DiffId,
1301    store: State<'_, ProfileStoreHandle>,
1302    pool: State<'_, S3ClientPoolHandle>,
1303    locks: State<'_, LockRegistryHandle>,
1304    cache: State<'_, CacheHandle>,
1305    channel: AppHandle,
1306    diff_store: State<'_, crate::diff::DiffStoreHandle>,
1307) -> Result<Vec<crate::s3::metadata::PutResult>, AppError> {
1308    // ------------------------------------------------------------------
1309    // 1. Consume the diff — single authoritative safety gate
1310    // ------------------------------------------------------------------
1311    let consumed_payload = diff_store
1312        .inner
1313        .consume(&confirmed_diff_id)
1314        .ok_or_else(|| {
1315            let record = diff_store.inner.get(&confirmed_diff_id);
1316            match record {
1317                Some(r) => match r.status {
1318                    crate::diff::DiffStatus::Cancelled | crate::diff::DiffStatus::Expired => {
1319                        AppError::Validation {
1320                            field: "confirmed_diff_id".to_string(),
1321                            hint: "Diff was cancelled or expired".to_string(),
1322                        }
1323                    }
1324                    crate::diff::DiffStatus::Confirmed => AppError::Validation {
1325                        field: "confirmed_diff_id".to_string(),
1326                        hint: "Diff already consumed (double-confirm rejection)".to_string(),
1327                    },
1328                    crate::diff::DiffStatus::Pending => AppError::Internal {
1329                        trace_id: "diff_consume_failed_unexpectedly".to_string(),
1330                    },
1331                },
1332                None => AppError::Validation {
1333                    field: "confirmed_diff_id".to_string(),
1334                    hint: "Diff not found".to_string(),
1335                },
1336            }
1337        })?;
1338
1339    // ------------------------------------------------------------------
1340    // 2. Validate that the consumed diff payload matches the request
1341    // ------------------------------------------------------------------
1342    // DiffPayload currently has a single variant — destructure with `let`.
1343    // When new variants are added, switch back to a `match`.
1344    let crate::diff::DiffPayload::StorageClass {
1345        targets: diff_targets,
1346        new_class: diff_new_class,
1347        ..
1348    } = &consumed_payload;
1349    // Check new_class matches.
1350    if *diff_new_class != new_storage_class {
1351        return Err(AppError::Validation {
1352            field: "new_storage_class".to_string(),
1353            hint: format!(
1354                "Requested class '{}' does not match diff's class '{}'",
1355                new_storage_class, diff_new_class
1356            ),
1357        });
1358    }
1359    // Check targets match (same set, order-insensitive).
1360    if diff_targets.len() != targets.len()
1361        || !targets.iter().all(|t| {
1362            diff_targets
1363                .iter()
1364                .any(|dt| dt.bucket.as_str() == t.bucket.as_str() && dt.key == t.key)
1365        })
1366    {
1367        return Err(AppError::Validation {
1368            field: "targets".to_string(),
1369            hint: "Requested targets do not match the diff's targets".to_string(),
1370        });
1371    }
1372
1373    // ------------------------------------------------------------------
1374    // 3. Resolve profile + validation gate
1375    // ------------------------------------------------------------------
1376    let profile = {
1377        let store_guard = store.inner.lock().await;
1378        store_guard
1379            .get(&profile_id)
1380            .ok_or_else(|| AppError::NotFound {
1381                resource: format!("profile:{}", profile_id.as_str()),
1382            })?
1383    };
1384
1385    if profile.validated_at.is_none() {
1386        return Err(AppError::Auth {
1387            reason: "profile_not_validated_in_session".to_string(),
1388        });
1389    }
1390
1391    let default_region = profile
1392        .default_region
1393        .clone()
1394        .unwrap_or_else(|| "us-east-1".to_string());
1395
1396    let client = pool
1397        .inner
1398        .get_or_build(&profile_id, &default_region)
1399        .await
1400        .ok_or_else(|| AppError::Internal {
1401            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1402        })?;
1403
1404    // ------------------------------------------------------------------
1405    // 4. Process each target: acquire lock → set storage class → release
1406    // ------------------------------------------------------------------
1407    let now = std::time::SystemTime::now()
1408        .duration_since(std::time::UNIX_EPOCH)
1409        .unwrap_or_default()
1410        .as_secs() as i64;
1411
1412    let registry = locks.inner();
1413    let cache_arc: CacheHandle = (*cache).clone();
1414    let mut results: Vec<crate::s3::metadata::PutResult> = Vec::with_capacity(targets.len());
1415
1416    for target in &targets {
1417        let prefix = parent_prefix(&target.key);
1418
1419        let scope = LockScope {
1420            profile: profile_id.clone(),
1421            bucket: Some(target.bucket.clone()),
1422            prefix: Some(prefix.clone()),
1423            key: Some(ObjectKey::new(target.key.clone())),
1424        };
1425
1426        let lock_id = registry.acquire(scope, "object_set_storage_class", 300, now)?;
1427        let lock = registry
1428            .list(None)
1429            .into_iter()
1430            .find(|l| l.id == lock_id)
1431            .expect("lock must exist right after acquire");
1432        let _ = emit_acquired(&channel, &lock);
1433
1434        let result = crate::s3::object::set_object_storage_class(
1435            &client,
1436            target.bucket.as_str(),
1437            &target.key,
1438            new_storage_class.clone(),
1439        )
1440        .await;
1441
1442        let release_reason = if result.is_ok() {
1443            ReleaseReason::Success
1444        } else {
1445            ReleaseReason::Failure
1446        };
1447        if let Ok(lock) = registry.release(&lock_id) {
1448            let _ = emit_released(&channel, &lock, release_reason);
1449        }
1450
1451        let put_result = result?;
1452
1453        // Cache invalidation + event (round-1 finding #14).
1454        on_object_mutation(&profile_id, &target.bucket, &prefix, None, &cache_arc);
1455        let _ = emit_objects_updated(&channel, &profile_id, &target.bucket, &prefix);
1456
1457        results.push(put_result);
1458    }
1459
1460    Ok(results)
1461}
1462
1463// ---------------------------------------------------------------------------
1464// object_get_text — text content fetch for preview
1465// ---------------------------------------------------------------------------
1466
1467/// Fetch the first `max_bytes` bytes of an S3 object as UTF-8 text.
1468///
1469/// Uses a `Range: bytes=0-<max_bytes-1>` request so large objects are not
1470/// fully downloaded.  Invalid UTF-8 bytes are replaced with U+FFFD.
1471///
1472/// Returns a `TextPayload` with the decoded body, total content length, ETag,
1473/// and a `truncated` flag.  The default limit is 1 MiB when `max_bytes` is
1474/// omitted.
1475///
1476/// This command is intentionally read-only and does not emit any events or
1477/// touch the mutation cache.  It is reusable by the Monaco editor (task 50)
1478/// for its initial content load.
1479///
1480/// # Validation gate
1481///
1482/// Refuses to serve data when the profile has not been validated (AC-8 /
1483/// round-1 finding #9).
1484#[tauri::command]
1485pub async fn object_get_text(
1486    profile_id: ProfileId,
1487    bucket: String,
1488    key: String,
1489    max_bytes: Option<u64>,
1490    store: State<'_, ProfileStoreHandle>,
1491    pool: State<'_, S3ClientPoolHandle>,
1492) -> Result<TextPayload, AppError> {
1493    // ------------------------------------------------------------------
1494    // 1. Resolve profile + validation gate
1495    // ------------------------------------------------------------------
1496    let (validated, region_override, default_region) = {
1497        let store_guard = store.inner.lock().await;
1498        let profile = store_guard
1499            .get(&profile_id)
1500            .ok_or_else(|| AppError::NotFound {
1501                resource: format!("profile:{}", profile_id.as_str()),
1502            })?;
1503        (
1504            profile.validated_at.is_some(),
1505            profile.compat_flags.region_override.clone(),
1506            profile.default_region.clone(),
1507        )
1508    };
1509
1510    if !validated {
1511        return Err(AppError::Auth {
1512            reason: "profile_not_validated_in_session".to_string(),
1513        });
1514    }
1515
1516    let region = region_override
1517        .or(default_region)
1518        .unwrap_or_else(|| "us-east-1".to_string());
1519
1520    // ------------------------------------------------------------------
1521    // 2. Build client
1522    // ------------------------------------------------------------------
1523    let client = pool
1524        .inner
1525        .get_or_build(&profile_id, &region)
1526        .await
1527        .ok_or_else(|| AppError::Internal {
1528            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1529        })?;
1530
1531    // ------------------------------------------------------------------
1532    // 3. Fetch text
1533    // ------------------------------------------------------------------
1534    let limit = max_bytes.unwrap_or(DEFAULT_TEXT_MAX_BYTES);
1535    s3_get_object_text(&client, &bucket, &key, limit).await
1536}
1537
1538// ---------------------------------------------------------------------------
1539// object_get_bytes — raw binary fetch for hex/archive preview
1540// ---------------------------------------------------------------------------
1541
1542/// Fetch the first `max_bytes` bytes of an S3 object as a base64-encoded string.
1543///
1544/// Uses a `Range: bytes=0-<max_bytes-1>` request so large objects are not
1545/// fully downloaded.  The frontend decodes with `atob` or equivalent.
1546///
1547/// Returns a `BytesPayload` with the base64 body, total content length, ETag,
1548/// and a `truncated` flag.  The default limit is 1 MiB when `max_bytes` is
1549/// omitted.
1550///
1551/// # Validation gate
1552///
1553/// Refuses to serve data when the profile has not been validated (AC-8 /
1554/// round-1 finding #9).
1555#[tauri::command]
1556pub async fn object_get_bytes(
1557    profile_id: ProfileId,
1558    bucket: String,
1559    key: String,
1560    max_bytes: Option<u64>,
1561    store: State<'_, ProfileStoreHandle>,
1562    pool: State<'_, S3ClientPoolHandle>,
1563) -> Result<BytesPayload, AppError> {
1564    // ------------------------------------------------------------------
1565    // 1. Resolve profile + validation gate
1566    // ------------------------------------------------------------------
1567    let (validated, region_override, default_region) = {
1568        let store_guard = store.inner.lock().await;
1569        let profile = store_guard
1570            .get(&profile_id)
1571            .ok_or_else(|| AppError::NotFound {
1572                resource: format!("profile:{}", profile_id.as_str()),
1573            })?;
1574        (
1575            profile.validated_at.is_some(),
1576            profile.compat_flags.region_override.clone(),
1577            profile.default_region.clone(),
1578        )
1579    };
1580
1581    if !validated {
1582        return Err(AppError::Auth {
1583            reason: "profile_not_validated_in_session".to_string(),
1584        });
1585    }
1586
1587    let region = region_override
1588        .or(default_region)
1589        .unwrap_or_else(|| "us-east-1".to_string());
1590
1591    // ------------------------------------------------------------------
1592    // 2. Build client
1593    // ------------------------------------------------------------------
1594    let client = pool
1595        .inner
1596        .get_or_build(&profile_id, &region)
1597        .await
1598        .ok_or_else(|| AppError::Internal {
1599            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1600        })?;
1601
1602    // ------------------------------------------------------------------
1603    // 3. Fetch bytes
1604    // ------------------------------------------------------------------
1605    let limit = max_bytes.unwrap_or(DEFAULT_BYTES_MAX_BYTES);
1606    s3_get_object_bytes(&client, &bucket, &key, limit).await
1607}
1608
1609// ---------------------------------------------------------------------------
1610// object_put_text — write text body with optional ETag precondition
1611// ---------------------------------------------------------------------------
1612
1613/// Write a UTF-8 text body to `bucket/key`.
1614///
1615/// When `if_match_etag` is supplied the backend sets the `If-Match` header so
1616/// S3 rejects the write with 412 if the object was modified since the editor
1617/// loaded it.  That 412 maps to `AppError::Conflict { etag_expected, etag_actual: None }`.
1618///
1619/// The frontend should:
1620/// - Supply `if_match_etag` from the ETag returned by `object_get_text` for
1621///   conflict-safe saves.
1622/// - Omit `if_match_etag` (pass `null`) for "save anyway" after a conflict.
1623///
1624/// On success the backend emits `objects:updated { profileId, bucket, prefix }`
1625/// for the parent prefix of `key` so the listing refreshes.
1626///
1627/// # Validation gate
1628///
1629/// Refuses to write when the profile has not been validated (AC-8).
1630#[tauri::command]
1631pub async fn object_put_text(
1632    profile_id: ProfileId,
1633    bucket: String,
1634    key: String,
1635    body: String,
1636    if_match_etag: Option<String>,
1637    store: State<'_, ProfileStoreHandle>,
1638    pool: State<'_, S3ClientPoolHandle>,
1639    locks: State<'_, LockRegistryHandle>,
1640    cache: State<'_, CacheHandle>,
1641    channel: AppHandle,
1642) -> Result<PutResult, AppError> {
1643    // ------------------------------------------------------------------
1644    // 1. Resolve profile + validation gate
1645    // ------------------------------------------------------------------
1646    let (validated, region_override, default_region) = {
1647        let store_guard = store.inner.lock().await;
1648        let profile = store_guard
1649            .get(&profile_id)
1650            .ok_or_else(|| AppError::NotFound {
1651                resource: format!("profile:{}", profile_id.as_str()),
1652            })?;
1653        (
1654            profile.validated_at.is_some(),
1655            profile.compat_flags.region_override.clone(),
1656            profile.default_region.clone(),
1657        )
1658    };
1659
1660    if !validated {
1661        return Err(AppError::Auth {
1662            reason: "profile_not_validated_in_session".to_string(),
1663        });
1664    }
1665
1666    let region = region_override
1667        .or(default_region)
1668        .unwrap_or_else(|| "us-east-1".to_string());
1669
1670    // ------------------------------------------------------------------
1671    // 2. Acquire lock on the object key
1672    // ------------------------------------------------------------------
1673    let prefix = parent_prefix(&key);
1674
1675    let now = std::time::SystemTime::now()
1676        .duration_since(std::time::UNIX_EPOCH)
1677        .unwrap_or_default()
1678        .as_secs() as i64;
1679
1680    let bucket_id = BucketId::new(bucket.clone());
1681
1682    let scope = LockScope {
1683        profile: profile_id.clone(),
1684        bucket: Some(bucket_id.clone()),
1685        prefix: Some(prefix.clone()),
1686        key: Some(ObjectKey::new(key.clone())),
1687    };
1688
1689    let registry = locks.inner();
1690    let lock_id = registry.acquire(scope, "object_put_text", 300, now)?;
1691    let lock = registry
1692        .list(None)
1693        .into_iter()
1694        .find(|l| l.id == lock_id)
1695        .expect("lock must exist right after acquire");
1696    let _ = emit_acquired(&channel, &lock);
1697
1698    // ------------------------------------------------------------------
1699    // 3. Build client + call S3
1700    // ------------------------------------------------------------------
1701    let client = pool
1702        .inner
1703        .get_or_build(&profile_id, &region)
1704        .await
1705        .ok_or_else(|| AppError::Internal {
1706            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
1707        })?;
1708
1709    let result = s3_put_object_text(&client, &bucket, &key, body, if_match_etag).await;
1710
1711    // ------------------------------------------------------------------
1712    // 4. Release lock
1713    // ------------------------------------------------------------------
1714    let release_reason = if result.is_ok() {
1715        ReleaseReason::Success
1716    } else {
1717        ReleaseReason::Failure
1718    };
1719    if let Ok(lock) = registry.release(&lock_id) {
1720        let _ = emit_released(&channel, &lock, release_reason);
1721    }
1722
1723    let put_result = result?;
1724
1725    // ------------------------------------------------------------------
1726    // 5. Cache invalidation + event
1727    // ------------------------------------------------------------------
1728    let cache_arc: CacheHandle = (*cache).clone();
1729    on_object_mutation(&profile_id, &bucket_id, &prefix, None, &cache_arc);
1730    let _ = emit_objects_updated(&channel, &profile_id, &bucket_id, &prefix);
1731
1732    Ok(put_result)
1733}
1734
1735// ---------------------------------------------------------------------------
1736// Tests
1737// ---------------------------------------------------------------------------
1738
1739#[cfg(test)]
1740mod tests {
1741    use super::*;
1742    use crate::{
1743        cache::{store::CacheStore, CacheConfig},
1744        ids::BucketId,
1745        s3::list::{ListPage, ObjectEntry},
1746    };
1747
1748    fn validated_at() -> Option<i64> {
1749        Some(1_700_000_000_000)
1750    }
1751
1752    fn make_page(prefix: &str, flat: bool) -> ListPage {
1753        ListPage {
1754            entries: vec![ObjectEntry {
1755                key: format!("{prefix}file.txt"),
1756                size: 42,
1757                last_modified: None,
1758                etag: None,
1759                storage_class: None,
1760                is_prefix: false,
1761            }],
1762            common_prefixes: if flat {
1763                vec![]
1764            } else {
1765                vec![format!("{prefix}subdir/")]
1766            },
1767            next_continuation_token: None,
1768            is_truncated: false,
1769            prefix: prefix.to_string(),
1770            delimiter: if flat { None } else { Some("/".to_string()) },
1771        }
1772    }
1773
1774    // ------------------------------------------------------------------
1775    // Validation gate: unvalidated profile returns Auth error
1776    // ------------------------------------------------------------------
1777
1778    #[test]
1779    fn unvalidated_profile_gate_returns_auth_error() {
1780        let validated_at: Option<i64> = None;
1781        let result: Result<(), AppError> = if validated_at.is_none() {
1782            Err(AppError::Auth {
1783                reason: "profile_not_validated_in_session".to_string(),
1784            })
1785        } else {
1786            Ok(())
1787        };
1788
1789        match result {
1790            Err(AppError::Auth { reason }) => {
1791                assert_eq!(reason, "profile_not_validated_in_session");
1792            }
1793            _ => panic!("expected Auth error for unvalidated profile"),
1794        }
1795    }
1796
1797    // ------------------------------------------------------------------
1798    // Cache hit on first page (no continuation token)
1799    // ------------------------------------------------------------------
1800
1801    #[test]
1802    fn cache_hit_on_first_page_returns_cached_value() {
1803        let pid = ProfileId::new("p-objects");
1804        let bid = BucketId::new("my-bucket");
1805        let prefix = "photos/".to_string();
1806        let cache = CacheStore::in_memory(CacheConfig::default());
1807
1808        let expected = make_page(&prefix, false);
1809        let cache_key = CacheKey::Objects {
1810            profile: pid.clone(),
1811            bucket: bid.clone(),
1812            prefix: prefix.clone(),
1813        };
1814        cache
1815            .put(&cache_key, &expected, None)
1816            .expect("put must succeed");
1817
1818        let read = cache
1819            .get::<ListPage>(&cache_key, validated_at())
1820            .expect("get must not error")
1821            .expect("entry must exist");
1822
1823        assert_eq!(read.freshness, Freshness::Fresh);
1824        assert_eq!(read.value.prefix, prefix);
1825        assert_eq!(read.value.entries.len(), 1);
1826    }
1827
1828    // ------------------------------------------------------------------
1829    // Cache miss on continuation token: key includes no token in the cache
1830    // ------------------------------------------------------------------
1831
1832    #[test]
1833    fn cache_miss_on_continuation_token_bypasses_cache() {
1834        // The cache is keyed on (profile, bucket, prefix) — no token.
1835        // A call with a token is always a "page 2+" and must not hit cache.
1836        // We verify this by putting a page into the cache and asserting
1837        // that looking up with the same key but different scenario works:
1838        // the command code simply skips the cache lookup when token is Some.
1839        // Here we test the cache key logic directly.
1840        let pid = ProfileId::new("p-paged");
1841        let bid = BucketId::new("paged-bucket");
1842        let prefix = "data/".to_string();
1843        let cache = CacheStore::in_memory(CacheConfig::default());
1844
1845        let page = make_page(&prefix, false);
1846        let cache_key = CacheKey::Objects {
1847            profile: pid.clone(),
1848            bucket: bid.clone(),
1849            prefix: prefix.clone(),
1850        };
1851        cache.put(&cache_key, &page, None).unwrap();
1852
1853        // A call with continuation_token = Some(_) must NOT look up the cache
1854        // (enforced in objects_list: `is_first_page` is false when token is Some).
1855        // The cache itself has no knowledge of tokens; verify the bypass gate.
1856        let has_token = true; // simulates continuation_token = Some("...")
1857        let is_first_page = !has_token;
1858
1859        // The lookup should be skipped.
1860        let result = if is_first_page {
1861            cache.get::<ListPage>(&cache_key, validated_at()).unwrap()
1862        } else {
1863            None // bypass — command does not read cache for page 2+
1864        };
1865
1866        assert!(
1867            result.is_none(),
1868            "cache must be bypassed when continuation_token is present"
1869        );
1870    }
1871
1872    // ------------------------------------------------------------------
1873    // Flat cache key differs from hierarchical key for the same prefix
1874    // ------------------------------------------------------------------
1875
1876    #[test]
1877    fn flat_and_hierarchical_cache_keys_do_not_collide() {
1878        let pid = ProfileId::new("p-flat");
1879        let bid = BucketId::new("flat-bucket");
1880        let prefix = "logs/".to_string();
1881        let flat_prefix = format!("{}__FLAT__", prefix);
1882
1883        let key_hier = CacheKey::Objects {
1884            profile: pid.clone(),
1885            bucket: bid.clone(),
1886            prefix: prefix.clone(),
1887        };
1888        let key_flat = CacheKey::Objects {
1889            profile: pid.clone(),
1890            bucket: bid.clone(),
1891            prefix: flat_prefix,
1892        };
1893
1894        assert_ne!(
1895            key_hier.serialize_key(),
1896            key_flat.serialize_key(),
1897            "flat and hierarchical cache keys must differ"
1898        );
1899    }
1900
1901    // ------------------------------------------------------------------
1902    // Cache gate: unvalidated profile cannot read from cache
1903    // ------------------------------------------------------------------
1904
1905    #[test]
1906    fn unvalidated_profile_cannot_read_objects_from_cache() {
1907        let pid = ProfileId::new("p-unval-objects");
1908        let bid = BucketId::new("secret-bucket");
1909        let cache = CacheStore::in_memory(CacheConfig::default());
1910
1911        let page = make_page("secret/", false);
1912        let cache_key = CacheKey::Objects {
1913            profile: pid.clone(),
1914            bucket: bid.clone(),
1915            prefix: "secret/".to_string(),
1916        };
1917        cache.put(&cache_key, &page, None).unwrap();
1918
1919        // validated_at = None → gate refuses read.
1920        let result = cache.get::<ListPage>(&cache_key, None).unwrap();
1921        assert!(
1922            result.is_none(),
1923            "unvalidated profile must not read objects from cache"
1924        );
1925    }
1926
1927    // ------------------------------------------------------------------
1928    // ObjectRef serialisation
1929    // ------------------------------------------------------------------
1930
1931    #[test]
1932    fn object_ref_serialises_camel_case() {
1933        let r = ObjectRef {
1934            bucket: BucketId::new("my-bucket"),
1935            key: "folder/file.txt".to_string(),
1936        };
1937        let v = serde_json::to_value(&r).unwrap();
1938        assert_eq!(v["bucket"], "my-bucket");
1939        assert_eq!(v["key"], "folder/file.txt");
1940    }
1941
1942    // ------------------------------------------------------------------
1943    // emit_objects_updated — event emission test via MockChannel
1944    // (round-1 finding #14)
1945    // ------------------------------------------------------------------
1946
1947    #[test]
1948    fn emit_objects_updated_emits_correct_payload() {
1949        use crate::events::{EventKind, MockChannel};
1950
1951        let channel = MockChannel::default();
1952        let pid = ProfileId::new("p1");
1953        let bid = BucketId::new("my-bucket");
1954
1955        emit_objects_updated(&channel, &pid, &bid, "photos/2024/").unwrap();
1956
1957        let emitted = channel.emitted();
1958        assert_eq!(emitted.len(), 1);
1959        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1960        assert_eq!(emitted[0].1["profileId"], "p1");
1961        assert_eq!(emitted[0].1["bucket"], "my-bucket");
1962        assert_eq!(emitted[0].1["prefix"], "photos/2024/");
1963    }
1964
1965    #[test]
1966    fn emit_objects_updated_root_prefix() {
1967        use crate::events::{EventKind, MockChannel};
1968
1969        let channel = MockChannel::default();
1970        let pid = ProfileId::new("p2");
1971        let bid = BucketId::new("root-bucket");
1972
1973        emit_objects_updated(&channel, &pid, &bid, "").unwrap();
1974
1975        let emitted = channel.emitted();
1976        assert_eq!(emitted.len(), 1);
1977        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1978        assert_eq!(emitted[0].1["prefix"], "");
1979    }
1980
1981    #[test]
1982    fn emit_objects_updated_two_different_prefixes() {
1983        use crate::events::{EventKind, MockChannel};
1984
1985        let channel = MockChannel::default();
1986        let pid = ProfileId::new("p3");
1987        let src = BucketId::new("src-bucket");
1988        let dst = BucketId::new("dst-bucket");
1989
1990        // Simulate what object_move does: emit for both source and dest.
1991        emit_objects_updated(&channel, &pid, &src, "old/path/").unwrap();
1992        emit_objects_updated(&channel, &pid, &dst, "new/path/").unwrap();
1993
1994        let emitted = channel.emitted();
1995        assert_eq!(emitted.len(), 2);
1996        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
1997        assert_eq!(emitted[0].1["bucket"], "src-bucket");
1998        assert_eq!(emitted[0].1["prefix"], "old/path/");
1999        assert_eq!(emitted[1].1["bucket"], "dst-bucket");
2000        assert_eq!(emitted[1].1["prefix"], "new/path/");
2001    }
2002
2003    // ------------------------------------------------------------------
2004    // ObjectsUpdatedPayload camelCase serialisation
2005    // ------------------------------------------------------------------
2006
2007    #[test]
2008    fn objects_updated_payload_serialises_camel_case() {
2009        let payload = ObjectsUpdatedPayload {
2010            profile_id: ProfileId::new("prof"),
2011            bucket: BucketId::new("bkt"),
2012            prefix: "a/b/".to_string(),
2013        };
2014        let v = serde_json::to_value(&payload).unwrap();
2015        assert_eq!(v["profileId"], "prof");
2016        assert_eq!(v["bucket"], "bkt");
2017        assert_eq!(v["prefix"], "a/b/");
2018    }
2019
2020    // ------------------------------------------------------------------
2021    // DeleteKey serialisation
2022    // ------------------------------------------------------------------
2023
2024    #[test]
2025    fn delete_key_with_version_id_serialises_camel_case() {
2026        let dk = DeleteKey {
2027            key: "photos/img.jpg".to_string(),
2028            version_id: Some("vid-abc".to_string()),
2029        };
2030        let v = serde_json::to_value(&dk).unwrap();
2031        assert_eq!(v["key"], "photos/img.jpg");
2032        assert_eq!(v["versionId"], "vid-abc");
2033    }
2034
2035    #[test]
2036    fn delete_key_without_version_id_skips_field() {
2037        let dk = DeleteKey {
2038            key: "file.txt".to_string(),
2039            version_id: None,
2040        };
2041        let v = serde_json::to_value(&dk).unwrap();
2042        assert_eq!(v["key"], "file.txt");
2043        assert!(!v.as_object().unwrap().contains_key("versionId"));
2044    }
2045
2046    #[test]
2047    fn delete_key_deserialises_from_camel_case() {
2048        let json = r#"{"key":"a/b.txt","versionId":"v123"}"#;
2049        let dk: DeleteKey = serde_json::from_str(json).unwrap();
2050        assert_eq!(dk.key, "a/b.txt");
2051        assert_eq!(dk.version_id.as_deref(), Some("v123"));
2052    }
2053
2054    #[test]
2055    fn delete_key_deserialises_without_version_id() {
2056        let json = r#"{"key":"a/b.txt"}"#;
2057        let dk: DeleteKey = serde_json::from_str(json).unwrap();
2058        assert_eq!(dk.key, "a/b.txt");
2059        assert!(dk.version_id.is_none());
2060    }
2061
2062    // ------------------------------------------------------------------
2063    // Event emission: objects:updated fires once per affected prefix
2064    // ------------------------------------------------------------------
2065
2066    #[test]
2067    fn emit_objects_updated_multi_prefix_fires_once_per_prefix() {
2068        use crate::events::{EventKind, MockChannel};
2069        use std::collections::BTreeSet;
2070
2071        let channel = MockChannel::default();
2072        let pid = ProfileId::new("p-delete-batch");
2073        let bid = BucketId::new("my-bucket");
2074
2075        // Simulate what object_delete_batch does: collect unique parent prefixes
2076        // from successfully deleted keys and emit one event per prefix.
2077        let deleted_keys = [
2078            "photos/img1.jpg",
2079            "photos/img2.jpg",
2080            "docs/report.pdf",
2081            "docs/slides.pptx",
2082        ];
2083
2084        let affected_prefixes: BTreeSet<String> = deleted_keys
2085            .iter()
2086            .map(|k| crate::s3::object::parent_prefix(k))
2087            .collect();
2088
2089        for prefix in &affected_prefixes {
2090            emit_objects_updated(&channel, &pid, &bid, prefix).unwrap();
2091        }
2092
2093        let emitted = channel.emitted();
2094        // Two unique prefixes: "photos/" and "docs/"
2095        assert_eq!(
2096            emitted.len(),
2097            2,
2098            "must emit once per unique affected prefix"
2099        );
2100
2101        // BTreeSet guarantees alphabetical order: "docs/" < "photos/"
2102        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2103        assert_eq!(emitted[0].1["prefix"], "docs/");
2104        assert_eq!(emitted[1].1["prefix"], "photos/");
2105    }
2106
2107    #[test]
2108    fn emit_objects_updated_root_prefix_only_once_for_root_keys() {
2109        use crate::events::{EventKind, MockChannel};
2110        use std::collections::BTreeSet;
2111
2112        let channel = MockChannel::default();
2113        let pid = ProfileId::new("p-delete-root");
2114        let bid = BucketId::new("root-bucket");
2115
2116        // Root-level keys: parent_prefix("file.txt") == ""
2117        let deleted_keys = ["file1.txt", "file2.txt"];
2118        let affected_prefixes: BTreeSet<String> = deleted_keys
2119            .iter()
2120            .map(|k| crate::s3::object::parent_prefix(k))
2121            .collect();
2122
2123        for prefix in &affected_prefixes {
2124            emit_objects_updated(&channel, &pid, &bid, prefix).unwrap();
2125        }
2126
2127        let emitted = channel.emitted();
2128        assert_eq!(
2129            emitted.len(),
2130            1,
2131            "two root-level keys share the same prefix"
2132        );
2133        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2134        assert_eq!(emitted[0].1["prefix"], "");
2135    }
2136
2137    // ------------------------------------------------------------------
2138    // PutResult serialisation (from metadata module)
2139    // ------------------------------------------------------------------
2140
2141    #[test]
2142    fn put_result_serialises_camel_case() {
2143        let r = crate::s3::metadata::PutResult {
2144            etag: Some("abc123".to_string()),
2145            last_modified: Some(1_700_000_000_000),
2146            version_id: Some("v1".to_string()),
2147        };
2148        let v = serde_json::to_value(&r).unwrap();
2149        assert_eq!(v["etag"], "abc123");
2150        assert_eq!(v["lastModified"], 1_700_000_000_000_i64);
2151        assert_eq!(v["versionId"], "v1");
2152    }
2153
2154    #[test]
2155    fn put_result_skips_none_fields() {
2156        let r = crate::s3::metadata::PutResult {
2157            etag: None,
2158            last_modified: None,
2159            version_id: None,
2160        };
2161        let v = serde_json::to_value(&r).unwrap();
2162        assert!(!v.as_object().unwrap().contains_key("etag"));
2163        assert!(!v.as_object().unwrap().contains_key("lastModified"));
2164        assert!(!v.as_object().unwrap().contains_key("versionId"));
2165    }
2166
2167    // ------------------------------------------------------------------
2168    // objects:updated is emitted with correct prefix for set_metadata key
2169    // ------------------------------------------------------------------
2170
2171    #[test]
2172    fn set_metadata_emit_objects_updated_correct_prefix() {
2173        use crate::events::{EventKind, MockChannel};
2174
2175        let channel = MockChannel::default();
2176        let pid = ProfileId::new("p-meta");
2177        let bid = BucketId::new("my-bucket");
2178
2179        // key = "reports/2024/annual.pdf" → parent = "reports/2024/"
2180        let key = "reports/2024/annual.pdf";
2181        let prefix = crate::s3::object::parent_prefix(key);
2182
2183        emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2184
2185        let emitted = channel.emitted();
2186        assert_eq!(emitted.len(), 1);
2187        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2188        assert_eq!(emitted[0].1["prefix"], "reports/2024/");
2189    }
2190
2191    // ------------------------------------------------------------------
2192    // objects:updated is emitted with correct prefix for set_tags key
2193    // ------------------------------------------------------------------
2194
2195    #[test]
2196    fn set_tags_emit_objects_updated_correct_prefix() {
2197        use crate::events::{EventKind, MockChannel};
2198
2199        let channel = MockChannel::default();
2200        let pid = ProfileId::new("p-tags");
2201        let bid = BucketId::new("tagged-bucket");
2202
2203        // key = "logs/app/server.log" → parent = "logs/app/"
2204        let key = "logs/app/server.log";
2205        let prefix = crate::s3::object::parent_prefix(key);
2206
2207        emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2208
2209        let emitted = channel.emitted();
2210        assert_eq!(emitted.len(), 1);
2211        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2212        assert_eq!(emitted[0].1["prefix"], "logs/app/");
2213    }
2214
2215    // ------------------------------------------------------------------
2216    // ETag conflict error serialisation (used by both set_metadata + set_tags)
2217    // ------------------------------------------------------------------
2218
2219    #[test]
2220    fn conflict_error_serialises_expected_etag() {
2221        let err = AppError::Conflict {
2222            etag_expected: "\"etag-original\"".to_string(),
2223            etag_actual: Some("\"etag-modified\"".to_string()),
2224        };
2225        let v = serde_json::to_value(&err).unwrap();
2226        assert_eq!(v["kind"], "Conflict");
2227        assert_eq!(v["details"]["etagExpected"], "\"etag-original\"");
2228        assert_eq!(v["details"]["etagActual"], "\"etag-modified\"");
2229    }
2230
2231    #[test]
2232    fn conflict_error_without_actual_etag() {
2233        let err = AppError::Conflict {
2234            etag_expected: "\"abc\"".to_string(),
2235            etag_actual: None,
2236        };
2237        let v = serde_json::to_value(&err).unwrap();
2238        assert_eq!(v["kind"], "Conflict");
2239        assert!(v["details"]["etagActual"].is_null());
2240    }
2241
2242    // ------------------------------------------------------------------
2243    // object_presign: expiry range validation
2244    // ------------------------------------------------------------------
2245
2246    #[test]
2247    fn presign_expires_below_minimum_returns_validation_error() {
2248        // 1 second is below the 60-second minimum.
2249        let result = crate::s3::presign::presign_get_object_validate_only(1);
2250        match result {
2251            Err(AppError::Validation { field, hint }) => {
2252                assert_eq!(field, "expires_secs");
2253                assert!(
2254                    hint.contains("60"),
2255                    "hint should mention the 60-second minimum: {hint}"
2256                );
2257            }
2258            other => panic!("expected Validation error, got {:?}", other),
2259        }
2260    }
2261
2262    #[test]
2263    fn presign_expires_at_minimum_is_ok() {
2264        assert!(crate::s3::presign::presign_get_object_validate_only(
2265            crate::s3::presign::MIN_EXPIRES_SECS
2266        )
2267        .is_ok());
2268    }
2269
2270    #[test]
2271    fn presign_expires_one_hour_is_ok() {
2272        assert!(crate::s3::presign::presign_get_object_validate_only(3_600).is_ok());
2273    }
2274
2275    #[test]
2276    fn presign_expires_at_maximum_is_ok() {
2277        assert!(crate::s3::presign::presign_get_object_validate_only(
2278            crate::s3::presign::MAX_EXPIRES_SECS
2279        )
2280        .is_ok());
2281    }
2282
2283    #[test]
2284    fn presign_expires_above_maximum_returns_validation_error() {
2285        let result = crate::s3::presign::presign_get_object_validate_only(
2286            crate::s3::presign::MAX_EXPIRES_SECS + 1,
2287        );
2288        match result {
2289            Err(AppError::Validation { field, hint }) => {
2290                assert_eq!(field, "expires_secs");
2291                assert!(
2292                    hint.contains("604800") || hint.contains("7 days"),
2293                    "hint should mention the 7-day maximum: {hint}"
2294                );
2295            }
2296            other => panic!("expected Validation error, got {:?}", other),
2297        }
2298    }
2299
2300    // ------------------------------------------------------------------
2301    // PresignedUrl IPC shape
2302    // ------------------------------------------------------------------
2303
2304    #[test]
2305    fn presigned_url_ipc_shape_is_camel_case() {
2306        let p = crate::s3::presign::PresignedUrl {
2307            url: "https://example.com/bucket/key?X-Amz-Signature=sig".to_string(),
2308            expires_at: 1_700_000_000_000,
2309        };
2310        let v = serde_json::to_value(&p).unwrap();
2311        assert!(v.as_object().unwrap().contains_key("url"));
2312        assert!(v.as_object().unwrap().contains_key("expiresAt"));
2313        assert_eq!(
2314            v["url"],
2315            "https://example.com/bucket/key?X-Amz-Signature=sig"
2316        );
2317        assert_eq!(v["expiresAt"], 1_700_000_000_000_i64);
2318    }
2319
2320    // ------------------------------------------------------------------
2321    // object_set_storage_class: diff consumption safety gate tests
2322    //
2323    // These tests exercise the diff framework safety gate without calling
2324    // the S3 API (no mock client needed — the error path fires before S3).
2325    // ------------------------------------------------------------------
2326
2327    #[test]
2328    fn diff_consume_cancelled_diff_returns_validation_hint() {
2329        use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2330        use std::collections::HashMap;
2331
2332        let diff_store = DiffStoreHandle::new(DiffStore::new());
2333        let p = DiffPayload::StorageClass {
2334            targets: vec![],
2335            current: HashMap::new(),
2336            new_class: "GLACIER".to_string(),
2337        };
2338        let id = diff_store.inner.create(p);
2339        diff_store.inner.cancel(&id).unwrap();
2340
2341        // Simulate what object_set_storage_class does on consume failure.
2342        let consumed = diff_store.inner.consume(&id);
2343        assert!(consumed.is_none(), "cancelled diff must not be consumable");
2344
2345        let record = diff_store.inner.get(&id).unwrap();
2346        let err = match record.status {
2347            crate::diff::DiffStatus::Cancelled | crate::diff::DiffStatus::Expired => {
2348                AppError::Validation {
2349                    field: "confirmed_diff_id".to_string(),
2350                    hint: "Diff was cancelled or expired".to_string(),
2351                }
2352            }
2353            crate::diff::DiffStatus::Confirmed => AppError::Validation {
2354                field: "confirmed_diff_id".to_string(),
2355                hint: "Diff already consumed (double-confirm rejection)".to_string(),
2356            },
2357            _ => AppError::Internal {
2358                trace_id: "unexpected".to_string(),
2359            },
2360        };
2361
2362        match err {
2363            AppError::Validation { hint, .. } => {
2364                assert!(hint.contains("cancelled or expired"));
2365            }
2366            other => panic!("expected Validation, got {:?}", other),
2367        }
2368    }
2369
2370    #[test]
2371    fn diff_consume_expired_diff_returns_validation_hint() {
2372        use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2373        use std::collections::HashMap;
2374
2375        let diff_store = DiffStoreHandle::new(DiffStore::with_ttl(10));
2376        let now = 1_000_000_i64;
2377        let p = DiffPayload::StorageClass {
2378            targets: vec![],
2379            current: HashMap::new(),
2380            new_class: "GLACIER".to_string(),
2381        };
2382        let id = diff_store.inner.create_at(p, now);
2383
2384        // Consume past TTL.
2385        let consumed = diff_store.inner.consume_at(&id, now + 11);
2386        assert!(consumed.is_none(), "expired diff must not be consumable");
2387    }
2388
2389    #[test]
2390    fn diff_double_confirm_returns_already_consumed_hint() {
2391        use crate::diff::{DiffPayload, DiffStore, DiffStoreHandle};
2392        use std::collections::HashMap;
2393
2394        let diff_store = DiffStoreHandle::new(DiffStore::new());
2395        let p = DiffPayload::StorageClass {
2396            targets: vec![],
2397            current: HashMap::new(),
2398            new_class: "GLACIER".to_string(),
2399        };
2400        let id = diff_store.inner.create(p);
2401
2402        let first = diff_store.inner.consume(&id);
2403        assert!(first.is_some(), "first consume must succeed");
2404
2405        let second = diff_store.inner.consume(&id);
2406        assert!(second.is_none(), "double consume must fail");
2407
2408        let record = diff_store.inner.get(&id).unwrap();
2409        match record.status {
2410            crate::diff::DiffStatus::Confirmed => {
2411                // Expected: already consumed
2412            }
2413            other => panic!("expected Confirmed status, got {:?}", other),
2414        }
2415    }
2416
2417    // ------------------------------------------------------------------
2418    // Decision D2: objects:updated is emitted on storage class change
2419    // (round-1 finding #14 — event emission test for storage class path)
2420    // ------------------------------------------------------------------
2421
2422    #[test]
2423    fn storage_class_change_emit_objects_updated_uses_parent_prefix() {
2424        use crate::events::{EventKind, MockChannel};
2425
2426        let channel = MockChannel::default();
2427        let pid = ProfileId::new("p-sc");
2428        let bid = BucketId::new("sc-bucket");
2429
2430        // Simulate what object_set_storage_class does after a successful change.
2431        let key = "archive/2024/data.csv";
2432        let prefix = crate::s3::object::parent_prefix(key);
2433        emit_objects_updated(&channel, &pid, &bid, &prefix).unwrap();
2434
2435        let emitted = channel.emitted();
2436        assert_eq!(emitted.len(), 1);
2437        assert_eq!(emitted[0].0, EventKind::ObjectsUpdated);
2438        assert_eq!(emitted[0].1["bucket"], "sc-bucket");
2439        assert_eq!(emitted[0].1["prefix"], "archive/2024/");
2440    }
2441}