1use std::sync::Arc;
30
31use tauri::{AppHandle, State};
32use tokio::task;
33
34use crate::{
35 cache::{store::CacheHandle, CacheKey, Freshness},
36 error::AppError,
37 events::{self, EventKind},
38 ids::{BucketId, ProfileId},
39 notifications::{Notification, NotificationCategory, NotificationLogHandle, Severity},
40 profiles::{Profile, ProfileStoreHandle},
41 s3::{
42 list::{discover_bucket_region, list_buckets, BucketSummary},
43 ClientPool, S3ClientPoolHandle,
44 },
45};
46
47fn region_cache_key(profile_id: &ProfileId, bucket: &str) -> CacheKey {
56 CacheKey::ObjectHead {
57 profile: profile_id.clone(),
58 bucket: BucketId::new(bucket),
59 key: crate::ids::ObjectKey::new("__region__"),
60 }
61}
62
63#[tauri::command]
72pub async fn buckets_list(
73 profile_id: ProfileId,
74 force: Option<bool>,
75 store: State<'_, ProfileStoreHandle>,
76 pool: State<'_, S3ClientPoolHandle>,
77 cache: State<'_, CacheHandle>,
78 notification_log: State<'_, NotificationLogHandle>,
79 channel: AppHandle,
80) -> Result<Vec<BucketSummary>, AppError> {
81 let profile = {
85 let store_guard = store.inner.lock().await;
86 store_guard
87 .get(&profile_id)
88 .ok_or_else(|| AppError::NotFound {
89 resource: format!("profile:{}", profile_id.as_str()),
90 })?
91 };
92
93 if profile.validated_at.is_none() {
95 return Err(AppError::Auth {
96 reason: "profile_not_validated_in_session".to_string(),
97 });
98 }
99
100 let default_region = profile
101 .default_region
102 .clone()
103 .unwrap_or_else(|| "us-east-1".to_string());
104
105 let pool_arc: Arc<ClientPool> = pool.inner.clone();
107 let cache_arc: CacheHandle = (*cache).clone();
108 let log_arc = notification_log.0.clone();
109
110 if force != Some(true) {
114 let cache_key = CacheKey::Buckets(profile_id.clone());
115 let cached = cache_arc.get::<Vec<BucketSummary>>(&cache_key, profile.validated_at)?;
116
117 if let Some(read) = cached {
118 match read.freshness {
119 Freshness::Fresh => return Ok(read.value),
120 Freshness::Stale => {
121 let stale_value = read.value.clone();
123 let profile_id_bg = profile_id.clone();
124 let profile_bg = profile.clone();
125 let default_region_bg = default_region.clone();
126 let pool_bg = pool_arc.clone();
127 let cache_bg = cache_arc.clone();
128 let log_bg = log_arc.clone();
129 let channel_bg = channel.clone();
130
131 task::spawn(async move {
132 revalidate_buckets(
133 profile_id_bg,
134 profile_bg,
135 default_region_bg,
136 pool_bg,
137 cache_bg,
138 log_bg,
139 channel_bg,
140 )
141 .await;
142 });
143
144 return Ok(stale_value);
145 }
146 Freshness::Missing => {
147 }
149 }
150 }
151 }
152
153 let client = pool_arc
157 .get_or_build(&profile_id, &default_region)
158 .await
159 .ok_or_else(|| AppError::Internal {
160 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
161 })?;
162
163 let buckets = list_buckets(&client, &profile_id).await?;
164
165 cache_arc.put(&CacheKey::Buckets(profile_id.clone()), &buckets, None)?;
167
168 let _ = events::emit(
170 &channel,
171 EventKind::BucketsUpdated,
172 serde_json::json!({ "profileId": profile_id.as_str() }),
173 );
174
175 spawn_region_discovery(
177 profile_id.clone(),
178 buckets.clone(),
179 pool_arc,
180 cache_arc,
181 log_arc,
182 channel,
183 default_region,
184 );
185
186 Ok(buckets)
187}
188
189#[tauri::command]
198pub async fn bucket_region_get(
199 profile_id: ProfileId,
200 bucket: BucketId,
201 store: State<'_, ProfileStoreHandle>,
202 pool: State<'_, S3ClientPoolHandle>,
203 cache: State<'_, CacheHandle>,
204) -> Result<String, AppError> {
205 let profile = {
207 let store_guard = store.inner.lock().await;
208 store_guard
209 .get(&profile_id)
210 .ok_or_else(|| AppError::NotFound {
211 resource: format!("profile:{}", profile_id.as_str()),
212 })?
213 };
214
215 if profile.validated_at.is_none() {
216 return Err(AppError::Auth {
217 reason: "profile_not_validated_in_session".to_string(),
218 });
219 }
220
221 let cache_arc: CacheHandle = (*cache).clone();
222 let bucket_str = bucket.as_str().to_string();
223 let key = region_cache_key(&profile_id, &bucket_str);
224
225 if let Some(read) = cache_arc.get::<String>(&key, profile.validated_at)? {
227 if read.freshness != Freshness::Missing {
228 return Ok(read.value);
229 }
230 }
231
232 let default_region = profile
234 .default_region
235 .as_deref()
236 .unwrap_or("us-east-1")
237 .to_string();
238 let client = pool
239 .inner
240 .get_or_build(&profile_id, &default_region)
241 .await
242 .ok_or_else(|| AppError::Internal {
243 trace_id: format!("pool_miss:profile:{}", profile_id.as_str()),
244 })?;
245
246 let region = discover_bucket_region(&client, &bucket_str)
247 .await?
248 .unwrap_or(default_region);
249
250 cache_arc.put(&key, ®ion, None)?;
251 Ok(region)
252}
253
254async fn revalidate_buckets(
261 profile_id: ProfileId,
262 _profile: Profile,
263 default_region: String,
264 pool: Arc<ClientPool>,
265 cache: CacheHandle,
266 log: Arc<tokio::sync::RwLock<crate::notifications::NotificationLog>>,
267 channel: AppHandle,
268) {
269 let client = match pool.get_or_build(&profile_id, &default_region).await {
270 Some(c) => c,
271 None => return,
272 };
273
274 match list_buckets(&client, &profile_id).await {
275 Ok(buckets) => {
276 let _ = cache.put(&CacheKey::Buckets(profile_id.clone()), &buckets, None);
277 let _ = events::emit(
278 &channel,
279 EventKind::BucketsUpdated,
280 serde_json::json!({ "profileId": profile_id.as_str() }),
281 );
282 spawn_region_discovery(
284 profile_id,
285 buckets,
286 pool,
287 cache,
288 log,
289 channel,
290 default_region,
291 );
292 }
293 Err(e) => {
294 push_bg_warning(
295 &log,
296 &format!("background bucket revalidation failed: {e}"),
297 &profile_id,
298 )
299 .await;
300 }
301 }
302}
303
304fn spawn_region_discovery(
308 profile_id: ProfileId,
309 buckets: Vec<BucketSummary>,
310 pool: Arc<ClientPool>,
311 cache: CacheHandle,
312 log: Arc<tokio::sync::RwLock<crate::notifications::NotificationLog>>,
313 channel: AppHandle,
314 default_region: String,
315) {
316 let names: Vec<String> = buckets.into_iter().map(|b| b.name).collect();
317
318 task::spawn(async move {
319 let client = match pool.get_or_build(&profile_id, &default_region).await {
320 Some(c) => c,
321 None => return,
322 };
323
324 for bucket_name in &names {
325 match discover_bucket_region(&client, bucket_name).await {
326 Ok(Some(region)) => {
327 let key = region_cache_key(&profile_id, bucket_name);
328 let _ = cache.put(&key, ®ion, None);
329 let _ = events::emit(
330 &channel,
331 EventKind::BucketsUpdated,
332 serde_json::json!({ "profileId": profile_id.as_str() }),
333 );
334 }
335 Ok(None) => {
336 }
338 Err(e) => {
339 push_bg_warning(
340 &log,
341 &format!("region discovery for {bucket_name}: {e}"),
342 &profile_id,
343 )
344 .await;
345 }
346 }
347 }
348 });
349}
350
351async fn push_bg_warning(
353 log: &tokio::sync::RwLock<crate::notifications::NotificationLog>,
354 message: &str,
355 profile_id: &ProfileId,
356) {
357 let notification = Notification {
358 id: uuid::Uuid::new_v4().to_string(),
359 severity: Severity::Warning,
360 category: NotificationCategory::Background,
361 title: "S3 background operation".to_string(),
362 message: message.to_string(),
363 resource: Some(format!("profile:{}", profile_id.as_str())),
364 operation: Some("buckets_list".to_string()),
365 timestamp: std::time::SystemTime::now()
366 .duration_since(std::time::UNIX_EPOCH)
367 .map(|d| d.as_millis() as i64)
368 .unwrap_or(0),
369 details: None,
370 };
371 log.write().await.push(notification);
372}
373
374#[cfg(test)]
379mod tests {
380 use super::*;
381 use crate::{
382 cache::{store::CacheStore, CacheConfig},
383 events::MockChannel,
384 };
385
386 fn validated_profile_id() -> ProfileId {
387 ProfileId::new("p-validated")
388 }
389
390 fn validated_at() -> Option<i64> {
391 Some(1_700_000_000_000)
392 }
393
394 #[test]
399 fn unvalidated_profile_gate_returns_auth_error() {
400 let validated_at: Option<i64> = None;
402 let result: Result<(), AppError> = if validated_at.is_none() {
403 Err(AppError::Auth {
404 reason: "profile_not_validated_in_session".to_string(),
405 })
406 } else {
407 Ok(())
408 };
409
410 match result {
411 Err(AppError::Auth { reason }) => {
412 assert_eq!(reason, "profile_not_validated_in_session");
413 }
414 _ => panic!("expected Auth error for unvalidated profile"),
415 }
416 }
417
418 #[test]
423 fn cache_fresh_hit_returns_without_sdk_call() {
424 let pid = validated_profile_id();
425 let cache = CacheStore::in_memory(CacheConfig::default());
426
427 let expected = vec![BucketSummary {
428 name: "my-bucket".to_string(),
429 creation_date: Some(1_700_000_000_000),
430 region: Some("us-east-1".to_string()),
431 profile_id: pid.clone(),
432 }];
433
434 cache
435 .put(&CacheKey::Buckets(pid.clone()), &expected, None)
436 .expect("put must succeed");
437
438 let read = cache
439 .get::<Vec<BucketSummary>>(&CacheKey::Buckets(pid.clone()), validated_at())
440 .expect("get must not error")
441 .expect("entry must exist");
442
443 assert_eq!(read.freshness, Freshness::Fresh);
444 assert_eq!(read.value.len(), 1);
445 assert_eq!(read.value[0].name, "my-bucket");
446 }
447
448 #[test]
453 fn unvalidated_profile_cannot_read_from_cache() {
454 let pid = ProfileId::new("p-unval");
455 let cache = CacheStore::in_memory(CacheConfig::default());
456
457 let buckets = vec![BucketSummary {
458 name: "secret-bucket".to_string(),
459 creation_date: None,
460 region: None,
461 profile_id: pid.clone(),
462 }];
463 cache
464 .put(&CacheKey::Buckets(pid.clone()), &buckets, None)
465 .unwrap();
466
467 let result = cache
469 .get::<Vec<BucketSummary>>(&CacheKey::Buckets(pid.clone()), None)
470 .expect("get must not error");
471
472 assert!(
473 result.is_none(),
474 "unvalidated profile must not read from cache"
475 );
476 }
477
478 #[test]
483 fn buckets_updated_event_carries_profile_id() {
484 let channel = MockChannel::default();
485 let pid = ProfileId::new("evt-profile");
486
487 events::emit(
488 &channel,
489 EventKind::BucketsUpdated,
490 serde_json::json!({ "profileId": pid.as_str() }),
491 )
492 .expect("emit must succeed");
493
494 let emitted = channel.emitted();
495 assert_eq!(emitted.len(), 1);
496 assert_eq!(emitted[0].0, EventKind::BucketsUpdated);
497 assert_eq!(emitted[0].1["profileId"], pid.as_str());
498 }
499
500 #[test]
505 fn region_cache_keys_are_bucket_specific() {
506 let pid = ProfileId::new("p1");
507 let k1 = region_cache_key(&pid, "bucket-a");
508 let k2 = region_cache_key(&pid, "bucket-b");
509 assert_ne!(k1.serialize_key(), k2.serialize_key());
510 }
511
512 #[test]
517 fn region_cache_keys_are_profile_specific() {
518 let k1 = region_cache_key(&ProfileId::new("p1"), "same-bucket");
519 let k2 = region_cache_key(&ProfileId::new("p2"), "same-bucket");
520 assert_ne!(k1.serialize_key(), k2.serialize_key());
521 }
522
523 #[test]
528 fn region_cache_round_trip() {
529 let pid = ProfileId::new("p-region");
530 let cache = CacheStore::in_memory(CacheConfig::default());
531 let key = region_cache_key(&pid, "bucket-eu");
532
533 cache.put(&key, "eu-west-1".to_string(), None).unwrap();
534
535 let read = cache
536 .get::<String>(&key, Some(1_700_000_000_000))
537 .unwrap()
538 .unwrap();
539 assert_eq!(read.value, "eu-west-1");
540 assert_eq!(read.freshness, Freshness::Fresh);
541 }
542}