Skip to main content

brows3r_lib/commands/
buckets_cmd.rs

1//! Tauri commands for bucket listing and region discovery.
2//!
3//! # Commands
4//!
5//! - [`buckets_list`]      — list buckets for a profile; SWR cache + validation gate.
6//! - [`bucket_region_get`] — get cached region for one bucket; lazy-resolves on miss.
7//!
8//! # Validation gate (AC-8 / round-1 finding #9)
9//!
10//! Both commands refuse to serve any data when `profile.validated_at` is `None`.
11//! The cache itself also enforces this, but the command boundary check is
12//! defence-in-depth: a future refactor of the cache must not silently lift the gate.
13//!
14//! # SWR behaviour
15//!
16//! - Fresh (within TTL): return cache directly.
17//! - Stale (past TTL, within SWR window): return stale value immediately and
18//!   spawn a background task that re-fetches and updates the cache.
19//! - Missing: fetch synchronously, store, return.
20//! - `force = Some(true)`: bypass cache; fetch synchronously.
21//!
22//! # Region discovery
23//!
24//! After a successful bucket list, a background task fires `GetBucketLocation`
25//! for every bucket. Failures are logged as `Severity::Warning` notifications
26//! and do not fail the command. Each successful discovery updates the cache
27//! and emits a `buckets:updated` event.
28
29use std::sync::Arc;
30
31use tauri::{AppHandle, State};
32use tokio::task;
33
34use crate::{
35    cache::{store::CacheHandle, CacheKey, Freshness},
36    error::AppError,
37    events::{self, EventKind},
38    ids::{BucketId, ProfileId},
39    notifications::{Notification, NotificationCategory, NotificationLogHandle, Severity},
40    profiles::{Profile, ProfileStoreHandle},
41    s3::{
42        list::{discover_bucket_region, list_buckets, BucketSummary},
43        ClientPool, S3ClientPoolHandle,
44    },
45};
46
47// ---------------------------------------------------------------------------
48// Region cache key helper
49// ---------------------------------------------------------------------------
50
51// Per-bucket regions are stored using a reuse of the ObjectHead key with
52// the sentinel ObjectKey `__region__`. This avoids adding a new CacheKey
53// variant inside task-23 scope while still giving per-bucket region
54// persistence through the existing cache infrastructure.
55fn region_cache_key(profile_id: &ProfileId, bucket: &str) -> CacheKey {
56    CacheKey::ObjectHead {
57        profile: profile_id.clone(),
58        bucket: BucketId::new(bucket),
59        key: crate::ids::ObjectKey::new("__region__"),
60    }
61}
62
63// ---------------------------------------------------------------------------
64// buckets_list — main command
65// ---------------------------------------------------------------------------
66
67/// List all buckets for the given profile.
68///
69/// Applies the validation gate, SWR cache logic, and background region
70/// discovery. Emits `buckets:updated { profileId }` after every revalidation.
71#[tauri::command]
72pub async fn buckets_list(
73    profile_id: ProfileId,
74    force: Option<bool>,
75    store: State<'_, ProfileStoreHandle>,
76    pool: State<'_, S3ClientPoolHandle>,
77    cache: State<'_, CacheHandle>,
78    notification_log: State<'_, NotificationLogHandle>,
79    channel: AppHandle,
80) -> Result<Vec<BucketSummary>, AppError> {
81    // ------------------------------------------------------------------
82    // 1. Resolve profile + validation gate
83    // ------------------------------------------------------------------
84    let profile = {
85        let store_guard = store.inner.lock().await;
86        store_guard
87            .get(&profile_id)
88            .ok_or_else(|| AppError::NotFound {
89                resource: format!("profile:{}", profile_id.as_str()),
90            })?
91    };
92
93    // Command-boundary validation gate (defence-in-depth; cache also enforces).
94    if profile.validated_at.is_none() {
95        return Err(AppError::Auth {
96            reason: "profile_not_validated_in_session".to_string(),
97        });
98    }
99
100    let default_region = profile
101        .default_region
102        .clone()
103        .unwrap_or_else(|| "us-east-1".to_string());
104
105    // Clone the Arc handles we need to pass to background tasks.
106    let pool_arc: Arc<ClientPool> = pool.inner.clone();
107    let cache_arc: CacheHandle = (*cache).clone();
108    let log_arc = notification_log.0.clone();
109
110    // ------------------------------------------------------------------
111    // 2. SWR cache check (skip if force = true)
112    // ------------------------------------------------------------------
113    if force != Some(true) {
114        let cache_key = CacheKey::Buckets(profile_id.clone());
115        let cached = cache_arc.get::<Vec<BucketSummary>>(&cache_key, profile.validated_at)?;
116
117        if let Some(read) = cached {
118            match read.freshness {
119                Freshness::Fresh => return Ok(read.value),
120                Freshness::Stale => {
121                    // Return stale value immediately, revalidate in background.
122                    let stale_value = read.value.clone();
123                    let profile_id_bg = profile_id.clone();
124                    let profile_bg = profile.clone();
125                    let default_region_bg = default_region.clone();
126                    let pool_bg = pool_arc.clone();
127                    let cache_bg = cache_arc.clone();
128                    let log_bg = log_arc.clone();
129                    let channel_bg = channel.clone();
130
131                    task::spawn(async move {
132                        revalidate_buckets(
133                            profile_id_bg,
134                            profile_bg,
135                            default_region_bg,
136                            pool_bg,
137                            cache_bg,
138                            log_bg,
139                            channel_bg,
140                        )
141                        .await;
142                    });
143
144                    return Ok(stale_value);
145                }
146                Freshness::Missing => {
147                    // Past SWR window — fall through to fresh fetch below.
148                }
149            }
150        }
151    }
152
153    // ------------------------------------------------------------------
154    // 3. Fresh fetch (cache miss, past SWR window, or force=true)
155    // ------------------------------------------------------------------
156    let client = pool_arc
157        .get_or_build(&profile_id, &default_region)
158        .await
159        .ok_or_else(|| AppError::Internal {
160            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
161        })?;
162
163    let buckets = list_buckets(&client, &profile_id).await?;
164
165    // Store in cache.
166    cache_arc.put(&CacheKey::Buckets(profile_id.clone()), &buckets, None)?;
167
168    // Emit buckets:updated.
169    let _ = events::emit(
170        &channel,
171        EventKind::BucketsUpdated,
172        serde_json::json!({ "profileId": profile_id.as_str() }),
173    );
174
175    // Spawn background region discovery.
176    spawn_region_discovery(
177        profile_id.clone(),
178        buckets.clone(),
179        pool_arc,
180        cache_arc,
181        log_arc,
182        channel,
183        default_region,
184    );
185
186    Ok(buckets)
187}
188
189// ---------------------------------------------------------------------------
190// bucket_region_get — per-bucket region lookup
191// ---------------------------------------------------------------------------
192
193/// Return the cached region for `bucket`, resolving it lazily on cache miss.
194///
195/// If the region is not yet known (background discovery has not finished),
196/// this command resolves it synchronously and caches the result.
197#[tauri::command]
198pub async fn bucket_region_get(
199    profile_id: ProfileId,
200    bucket: BucketId,
201    store: State<'_, ProfileStoreHandle>,
202    pool: State<'_, S3ClientPoolHandle>,
203    cache: State<'_, CacheHandle>,
204) -> Result<String, AppError> {
205    // Resolve profile + validation gate.
206    let profile = {
207        let store_guard = store.inner.lock().await;
208        store_guard
209            .get(&profile_id)
210            .ok_or_else(|| AppError::NotFound {
211                resource: format!("profile:{}", profile_id.as_str()),
212            })?
213    };
214
215    if profile.validated_at.is_none() {
216        return Err(AppError::Auth {
217            reason: "profile_not_validated_in_session".to_string(),
218        });
219    }
220
221    let cache_arc: CacheHandle = (*cache).clone();
222    let bucket_str = bucket.as_str().to_string();
223    let key = region_cache_key(&profile_id, &bucket_str);
224
225    // Check cache first.
226    if let Some(read) = cache_arc.get::<String>(&key, profile.validated_at)? {
227        if read.freshness != Freshness::Missing {
228            return Ok(read.value);
229        }
230    }
231
232    // Lazy resolution: build a client and call GetBucketLocation.
233    let default_region = profile
234        .default_region
235        .as_deref()
236        .unwrap_or("us-east-1")
237        .to_string();
238    let client = pool
239        .inner
240        .get_or_build(&profile_id, &default_region)
241        .await
242        .ok_or_else(|| AppError::Internal {
243            trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
244        })?;
245
246    let region = discover_bucket_region(&client, &bucket_str)
247        .await?
248        .unwrap_or(default_region);
249
250    cache_arc.put(&key, &region, None)?;
251    Ok(region)
252}
253
254// ---------------------------------------------------------------------------
255// Internal helpers
256// ---------------------------------------------------------------------------
257
258/// Re-fetch the bucket list and update the cache + emit events.
259/// Called from the background task spawned on a stale cache hit.
260async fn revalidate_buckets(
261    profile_id: ProfileId,
262    _profile: Profile,
263    default_region: String,
264    pool: Arc<ClientPool>,
265    cache: CacheHandle,
266    log: Arc<tokio::sync::RwLock<crate::notifications::NotificationLog>>,
267    channel: AppHandle,
268) {
269    let client = match pool.get_or_build(&profile_id, &default_region).await {
270        Some(c) => c,
271        None => return,
272    };
273
274    match list_buckets(&client, &profile_id).await {
275        Ok(buckets) => {
276            let _ = cache.put(&CacheKey::Buckets(profile_id.clone()), &buckets, None);
277            let _ = events::emit(
278                &channel,
279                EventKind::BucketsUpdated,
280                serde_json::json!({ "profileId": profile_id.as_str() }),
281            );
282            // Kick off region discovery for the refreshed list.
283            spawn_region_discovery(
284                profile_id,
285                buckets,
286                pool,
287                cache,
288                log,
289                channel,
290                default_region,
291            );
292        }
293        Err(e) => {
294            push_bg_warning(
295                &log,
296                &format!("background bucket revalidation failed: {e}"),
297                &profile_id,
298            )
299            .await;
300        }
301    }
302}
303
304/// Spawn a background task that calls `GetBucketLocation` for every bucket
305/// in the list, updating the per-bucket region cache on success and emitting
306/// `buckets:updated` per discovery.
307fn spawn_region_discovery(
308    profile_id: ProfileId,
309    buckets: Vec<BucketSummary>,
310    pool: Arc<ClientPool>,
311    cache: CacheHandle,
312    log: Arc<tokio::sync::RwLock<crate::notifications::NotificationLog>>,
313    channel: AppHandle,
314    default_region: String,
315) {
316    let names: Vec<String> = buckets.into_iter().map(|b| b.name).collect();
317
318    task::spawn(async move {
319        let client = match pool.get_or_build(&profile_id, &default_region).await {
320            Some(c) => c,
321            None => return,
322        };
323
324        for bucket_name in &names {
325            match discover_bucket_region(&client, bucket_name).await {
326                Ok(Some(region)) => {
327                    let key = region_cache_key(&profile_id, bucket_name);
328                    let _ = cache.put(&key, &region, None);
329                    let _ = events::emit(
330                        &channel,
331                        EventKind::BucketsUpdated,
332                        serde_json::json!({ "profileId": profile_id.as_str() }),
333                    );
334                }
335                Ok(None) => {
336                    // AccessDenied or NoSuchBucket — silently skip.
337                }
338                Err(e) => {
339                    push_bg_warning(
340                        &log,
341                        &format!("region discovery for {bucket_name}: {e}"),
342                        &profile_id,
343                    )
344                    .await;
345                }
346            }
347        }
348    });
349}
350
351/// Push a `Severity::Warning` notification into the log — best-effort.
352async fn push_bg_warning(
353    log: &tokio::sync::RwLock<crate::notifications::NotificationLog>,
354    message: &str,
355    profile_id: &ProfileId,
356) {
357    let notification = Notification {
358        id: uuid::Uuid::new_v4().to_string(),
359        severity: Severity::Warning,
360        category: NotificationCategory::Background,
361        title: "S3 background operation".to_string(),
362        message: message.to_string(),
363        resource: Some(format!("profile:{}", profile_id.as_str())),
364        operation: Some("buckets_list".to_string()),
365        timestamp: std::time::SystemTime::now()
366            .duration_since(std::time::UNIX_EPOCH)
367            .map(|d| d.as_millis() as i64)
368            .unwrap_or(0),
369        details: None,
370    };
371    log.write().await.push(notification);
372}
373
374// ---------------------------------------------------------------------------
375// Tests
376// ---------------------------------------------------------------------------
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::{
382        cache::{store::CacheStore, CacheConfig},
383        events::MockChannel,
384    };
385
386    fn validated_profile_id() -> ProfileId {
387        ProfileId::new("p-validated")
388    }
389
390    fn validated_at() -> Option<i64> {
391        Some(1_700_000_000_000)
392    }
393
394    // ------------------------------------------------------------------
395    // Validation gate: unvalidated profile returns Auth error
396    // ------------------------------------------------------------------
397
398    #[test]
399    fn unvalidated_profile_gate_returns_auth_error() {
400        // Simulate the gate check directly (no live Tauri command context needed).
401        let validated_at: Option<i64> = None;
402        let result: Result<(), AppError> = if validated_at.is_none() {
403            Err(AppError::Auth {
404                reason: "profile_not_validated_in_session".to_string(),
405            })
406        } else {
407            Ok(())
408        };
409
410        match result {
411            Err(AppError::Auth { reason }) => {
412                assert_eq!(reason, "profile_not_validated_in_session");
413            }
414            _ => panic!("expected Auth error for unvalidated profile"),
415        }
416    }
417
418    // ------------------------------------------------------------------
419    // Cache fresh hit: cached value returned without an SDK call
420    // ------------------------------------------------------------------
421
422    #[test]
423    fn cache_fresh_hit_returns_without_sdk_call() {
424        let pid = validated_profile_id();
425        let cache = CacheStore::in_memory(CacheConfig::default());
426
427        let expected = vec![BucketSummary {
428            name: "my-bucket".to_string(),
429            creation_date: Some(1_700_000_000_000),
430            region: Some("us-east-1".to_string()),
431            profile_id: pid.clone(),
432        }];
433
434        cache
435            .put(&CacheKey::Buckets(pid.clone()), &expected, None)
436            .expect("put must succeed");
437
438        let read = cache
439            .get::<Vec<BucketSummary>>(&CacheKey::Buckets(pid.clone()), validated_at())
440            .expect("get must not error")
441            .expect("entry must exist");
442
443        assert_eq!(read.freshness, Freshness::Fresh);
444        assert_eq!(read.value.len(), 1);
445        assert_eq!(read.value[0].name, "my-bucket");
446    }
447
448    // ------------------------------------------------------------------
449    // Cache gate: unvalidated profile cannot read from cache
450    // ------------------------------------------------------------------
451
452    #[test]
453    fn unvalidated_profile_cannot_read_from_cache() {
454        let pid = ProfileId::new("p-unval");
455        let cache = CacheStore::in_memory(CacheConfig::default());
456
457        let buckets = vec![BucketSummary {
458            name: "secret-bucket".to_string(),
459            creation_date: None,
460            region: None,
461            profile_id: pid.clone(),
462        }];
463        cache
464            .put(&CacheKey::Buckets(pid.clone()), &buckets, None)
465            .unwrap();
466
467        // validated_at = None → gate refuses read.
468        let result = cache
469            .get::<Vec<BucketSummary>>(&CacheKey::Buckets(pid.clone()), None)
470            .expect("get must not error");
471
472        assert!(
473            result.is_none(),
474            "unvalidated profile must not read from cache"
475        );
476    }
477
478    // ------------------------------------------------------------------
479    // Event emission: buckets:updated carries profileId
480    // ------------------------------------------------------------------
481
482    #[test]
483    fn buckets_updated_event_carries_profile_id() {
484        let channel = MockChannel::default();
485        let pid = ProfileId::new("evt-profile");
486
487        events::emit(
488            &channel,
489            EventKind::BucketsUpdated,
490            serde_json::json!({ "profileId": pid.as_str() }),
491        )
492        .expect("emit must succeed");
493
494        let emitted = channel.emitted();
495        assert_eq!(emitted.len(), 1);
496        assert_eq!(emitted[0].0, EventKind::BucketsUpdated);
497        assert_eq!(emitted[0].1["profileId"], pid.as_str());
498    }
499
500    // ------------------------------------------------------------------
501    // region_cache_key: different buckets produce different keys
502    // ------------------------------------------------------------------
503
504    #[test]
505    fn region_cache_keys_are_bucket_specific() {
506        let pid = ProfileId::new("p1");
507        let k1 = region_cache_key(&pid, "bucket-a");
508        let k2 = region_cache_key(&pid, "bucket-b");
509        assert_ne!(k1.serialize_key(), k2.serialize_key());
510    }
511
512    // ------------------------------------------------------------------
513    // region_cache_key: different profiles produce different keys
514    // ------------------------------------------------------------------
515
516    #[test]
517    fn region_cache_keys_are_profile_specific() {
518        let k1 = region_cache_key(&ProfileId::new("p1"), "same-bucket");
519        let k2 = region_cache_key(&ProfileId::new("p2"), "same-bucket");
520        assert_ne!(k1.serialize_key(), k2.serialize_key());
521    }
522
523    // ------------------------------------------------------------------
524    // Region cache put+get round-trip
525    // ------------------------------------------------------------------
526
527    #[test]
528    fn region_cache_round_trip() {
529        let pid = ProfileId::new("p-region");
530        let cache = CacheStore::in_memory(CacheConfig::default());
531        let key = region_cache_key(&pid, "bucket-eu");
532
533        cache.put(&key, "eu-west-1".to_string(), None).unwrap();
534
535        let read = cache
536            .get::<String>(&key, Some(1_700_000_000_000))
537            .unwrap()
538            .unwrap();
539        assert_eq!(read.value, "eu-west-1");
540        assert_eq!(read.freshness, Freshness::Fresh);
541    }
542}