Skip to main content

brows3r_lib/cache/
store.rs

1//! `CacheStore` — in-memory LRU + `redb` disk-backed cache.
2//!
3//! # Architecture
4//!
5//! ```text
6//!   get(key)
7//!     │
8//!     ├─ validation gate: profile_validation_ts == None? → Ok(None)
9//!     │
10//!     ├─ in-memory map hit? → deserialise + classify freshness → return
11//!     │
12//!     └─ redb table hit?   → populate in-memory + classify freshness → return
13//! ```
14//!
15//! `put` writes to both layers.  `invalidate` removes from both layers.
16//! `invalidate_profile` removes every key whose profile prefix matches.
17//!
18//! # Disk schema
19//!
20//! One `redb` table named `"cache"`.
21//! - Key  : serialized `CacheKey` bytes (`CacheKey::serialize_key()`).
22//! - Value: `serde_json::to_vec(CacheEntry<serde_json::Value>)`.
23//!
24//! # Clock injection
25//!
26//! `CacheStore::new_with_clock` accepts a `Clock` impl so tests can control
27//! time without sleeping.  Production code uses `SystemClock`.
28
29use std::{
30    collections::HashMap,
31    sync::{Arc, Mutex},
32    time::Duration,
33};
34
35use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
36use serde::{de::DeserializeOwned, Serialize};
37use serde_json::Value;
38
39use crate::error::AppError;
40
41use super::{CacheConfig, CacheEntry, CacheKey, CacheRead, Freshness};
42
43// ---------------------------------------------------------------------------
44// redb table definition
45// ---------------------------------------------------------------------------
46
47/// Single table: serialized CacheKey → JSON-encoded `CacheEntry<Value>`.
48const CACHE_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("cache");
49
50// ---------------------------------------------------------------------------
51// Clock trait — allows test-time injection
52// ---------------------------------------------------------------------------
53
54/// Source of the current Unix timestamp in seconds.
55pub trait Clock: Send + Sync {
56    fn now_secs(&self) -> i64;
57}
58
59/// Real wall-clock implementation.
60#[derive(Debug, Clone, Default)]
61pub struct SystemClock;
62
63impl Clock for SystemClock {
64    fn now_secs(&self) -> i64 {
65        std::time::SystemTime::now()
66            .duration_since(std::time::UNIX_EPOCH)
67            .unwrap_or_default()
68            .as_secs() as i64
69    }
70}
71
72/// Controllable test clock.
73#[derive(Debug, Default)]
74pub struct MockClock {
75    inner: Mutex<i64>,
76}
77
78impl MockClock {
79    pub fn new(secs: i64) -> Arc<Self> {
80        Arc::new(Self {
81            inner: Mutex::new(secs),
82        })
83    }
84
85    /// Advance the clock by `delta` seconds.
86    pub fn advance(&self, delta: i64) {
87        *self.inner.lock().unwrap() += delta;
88    }
89}
90
91impl Clock for MockClock {
92    fn now_secs(&self) -> i64 {
93        *self.inner.lock().unwrap()
94    }
95}
96
97// ---------------------------------------------------------------------------
98// InMemoryEntry
99// ---------------------------------------------------------------------------
100
101/// An in-memory slot holds the raw JSON bytes so the type parameter can vary
102/// per call site without making `CacheStore` itself generic.
103#[derive(Debug, Clone)]
104struct InMemoryEntry {
105    raw: Vec<u8>, // serde_json bytes of CacheEntry<Value>
106    expires_at: i64,
107    swr_deadline: i64, // expires_at + swr_window_secs
108}
109
110// ---------------------------------------------------------------------------
111// open_or_recreate_redb — wipe + retry on stale-schema files
112// ---------------------------------------------------------------------------
113
114/// Open a `redb` `Database` at `path`, recreating the file when it has a
115/// stale on-disk schema.
116///
117/// redb's file format changes between major versions (we recently bumped
118/// 2.x → 4.x). Opening a file written by the previous major returns
119/// `DatabaseError::UpgradeRequired(_)` and there is no automatic in-place
120/// migration. Every redb file the app maintains is derivative state —
121/// the SWR cache and the multipart-upload bookkeeping table — that can be
122/// safely wiped and rebuilt at runtime; the alternative (panicking and
123/// refusing to launch) is far worse for the user than losing a cache and
124/// orphaning a handful of multipart uploads (which the MultipartPanel can
125/// still clean up by scanning S3 directly).
126///
127/// Retry strategy:
128///   1. `Database::create(path)` (opens existing or creates new).
129///   2. If `UpgradeRequired` → remove the file and call `create` again.
130///   3. Any other error propagates unchanged.
131pub fn open_or_recreate_redb(path: &std::path::Path) -> Result<Database, redb::DatabaseError> {
132    use redb::DatabaseError;
133    match Database::create(path) {
134        Ok(db) => Ok(db),
135        Err(DatabaseError::UpgradeRequired(_)) => {
136            // Best-effort: ignore the remove error so a missing/locked file
137            // still falls through to the retry path with a meaningful error.
138            let _ = std::fs::remove_file(path);
139            Database::create(path)
140        }
141        Err(e) => Err(e),
142    }
143}
144
145/// Open a redb `Database`, returning an in-memory backend as the final
146/// fallback so callers never have to panic during app startup.
147///
148/// First tries [`open_or_recreate_redb`]. On any error that
149/// `open_or_recreate_redb` cannot itself recover from (permissions, full
150/// disk, locked file, corrupt header that isn't a stale-schema marker),
151/// switches to redb's `InMemoryBackend`. The app keeps running with a
152/// non-persistent state for this session.
153///
154/// Returns `(db, was_in_memory)` so callers can surface a one-shot
155/// startup notification explaining that, e.g., the multipart upload
156/// bookkeeping is not persisting this session.
157pub fn open_redb_or_in_memory(path: &std::path::Path) -> (Database, bool) {
158    match open_or_recreate_redb(path) {
159        Ok(db) => (db, false),
160        Err(_) => {
161            // `InMemoryBackend` is purely in-process state with no IO of its
162            // own to fail; the only way `create_with_backend` errors here is
163            // if redb changes the invariant — be loud so we catch it.
164            let db = Database::builder()
165                .create_with_backend(redb::backends::InMemoryBackend::new())
166                .expect(
167                    "redb InMemoryBackend create cannot fail in current crate \
168                     version; audit this call if it ever does",
169                );
170            (db, true)
171        }
172    }
173}
174
175// ---------------------------------------------------------------------------
176// CacheStore
177// ---------------------------------------------------------------------------
178
179/// Shared cache handle.  Clone to share across threads.
180pub type CacheHandle = Arc<CacheStore>;
181
182/// In-memory + disk-backed authoritative cache.
183///
184/// All methods are synchronous (redb is synchronous).  Callers that need
185/// async can wrap calls in `tokio::task::spawn_blocking`.
186pub struct CacheStore {
187    config: CacheConfig,
188    clock: Arc<dyn Clock>,
189    /// In-memory LRU map.  `HashMap` with a simple insertion-order eviction
190    /// is sufficient for the v1 `max_in_memory_entries` budget; a proper LRU
191    /// is a future optimisation.
192    mem: Mutex<HashMap<Vec<u8>, InMemoryEntry>>,
193    /// `redb` database handle.  `None` in unit tests that skip the disk layer.
194    db: Option<Arc<Database>>,
195}
196
197impl CacheStore {
198    /// Open a `redb` database at `path` and return a shared `CacheHandle`.
199    pub fn open(path: &std::path::Path, config: CacheConfig) -> Result<CacheHandle, AppError> {
200        let db = open_or_recreate_redb(path).map_err(|e| AppError::Internal {
201            trace_id: format!("redb open failed: {e}"),
202        })?;
203        // Ensure the table exists.
204        let write_txn = db.begin_write().map_err(|e| AppError::Internal {
205            trace_id: format!("redb begin_write failed: {e}"),
206        })?;
207        {
208            write_txn
209                .open_table(CACHE_TABLE)
210                .map_err(|e| AppError::Internal {
211                    trace_id: format!("redb open_table failed: {e}"),
212                })?;
213        }
214        write_txn.commit().map_err(|e| AppError::Internal {
215            trace_id: format!("redb commit failed: {e}"),
216        })?;
217
218        Ok(Arc::new(Self {
219            config,
220            clock: Arc::new(SystemClock),
221            mem: Mutex::new(HashMap::new()),
222            db: Some(Arc::new(db)),
223        }))
224    }
225
226    /// In-memory only store — used in tests that do not exercise the disk layer.
227    pub fn in_memory(config: CacheConfig) -> CacheHandle {
228        Arc::new(Self {
229            config,
230            clock: Arc::new(SystemClock),
231            mem: Mutex::new(HashMap::new()),
232            db: None,
233        })
234    }
235
236    /// Constructor with an injected clock for tests.
237    pub fn new_with_clock(
238        config: CacheConfig,
239        clock: Arc<dyn Clock>,
240        db: Option<Arc<Database>>,
241    ) -> CacheHandle {
242        Arc::new(Self {
243            config,
244            clock,
245            mem: Mutex::new(HashMap::new()),
246            db,
247        })
248    }
249
250    // -----------------------------------------------------------------------
251    // db
252    // -----------------------------------------------------------------------
253
254    /// Return the underlying `redb::Database` handle, if one was opened.
255    ///
256    /// Used by `MultipartTable` (task 32) to share the same file handle
257    /// rather than opening `cache.redb` a second time.
258    pub fn db(&self) -> Option<Arc<Database>> {
259        self.db.clone()
260    }
261
262    // -----------------------------------------------------------------------
263    // get
264    // -----------------------------------------------------------------------
265
266    /// Read a cached value.
267    ///
268    /// # Validation gate (AC-8)
269    ///
270    /// If `profile_validation_ts` is `None`, the profile has not been
271    /// validated in the current session and cached data MUST NOT be surfaced.
272    /// The gate returns `Ok(None)` without reading from disk.
273    pub fn get<T: DeserializeOwned>(
274        &self,
275        key: &CacheKey,
276        profile_validation_ts: Option<i64>,
277    ) -> Result<Option<CacheRead<T>>, AppError> {
278        // --- AC-8 validation gate -------------------------------------------
279        if profile_validation_ts.is_none() {
280            return Ok(None);
281        }
282
283        let raw_key = key.serialize_key();
284        let now = self.clock.now_secs();
285
286        // --- in-memory lookup -----------------------------------------------
287        if let Some(entry) = self.mem.lock().unwrap().get(&raw_key).cloned() {
288            return Self::classify_entry_bytes::<T>(
289                &entry.raw,
290                now,
291                entry.expires_at,
292                entry.swr_deadline,
293            );
294        }
295
296        // --- disk lookup ----------------------------------------------------
297        if let Some(db) = &self.db {
298            let read_txn = db.begin_read().map_err(|e| AppError::Internal {
299                trace_id: format!("redb begin_read failed: {e}"),
300            })?;
301            let table = read_txn
302                .open_table(CACHE_TABLE)
303                .map_err(|e| AppError::Internal {
304                    trace_id: format!("redb open_table failed: {e}"),
305                })?;
306            if let Some(bytes_guard) =
307                table
308                    .get(raw_key.as_slice())
309                    .map_err(|e| AppError::Internal {
310                        trace_id: format!("redb get failed: {e}"),
311                    })?
312            {
313                let bytes: Vec<u8> = bytes_guard.value().to_vec();
314                // Parse the stored CacheEntry<Value> to extract timing metadata.
315                let entry: CacheEntry<Value> =
316                    serde_json::from_slice(&bytes).map_err(|e| AppError::Internal {
317                        trace_id: format!("cache deserialise failed: {e}"),
318                    })?;
319                let swr_deadline = entry.expires_at + self.config.swr_window_secs as i64;
320
321                // Populate in-memory cache.
322                self.mem.lock().unwrap().insert(
323                    raw_key,
324                    InMemoryEntry {
325                        raw: bytes.clone(),
326                        expires_at: entry.expires_at,
327                        swr_deadline,
328                    },
329                );
330
331                return Self::classify_entry_bytes::<T>(
332                    &bytes,
333                    now,
334                    entry.expires_at,
335                    swr_deadline,
336                );
337            }
338        }
339
340        Ok(None)
341    }
342
343    // -----------------------------------------------------------------------
344    // put
345    // -----------------------------------------------------------------------
346
347    /// Write a value into both the in-memory and disk layers.
348    pub fn put<T: Serialize>(
349        &self,
350        key: &CacheKey,
351        value: T,
352        ttl: Option<Duration>,
353    ) -> Result<(), AppError> {
354        let now = self.clock.now_secs();
355        let ttl_secs = ttl
356            .map(|d| d.as_secs())
357            .unwrap_or(self.config.default_ttl_secs) as i64;
358        let expires_at = now + ttl_secs;
359        let swr_deadline = expires_at + self.config.swr_window_secs as i64;
360
361        // Serialise value to JSON Value first so we can store it generically.
362        let json_value: Value = serde_json::to_value(&value).map_err(|e| AppError::Internal {
363            trace_id: format!("cache serialise failed: {e}"),
364        })?;
365
366        let entry: CacheEntry<Value> = CacheEntry {
367            value: json_value,
368            fetched_at: now,
369            expires_at,
370            etag: None,
371        };
372
373        let bytes = serde_json::to_vec(&entry).map_err(|e| AppError::Internal {
374            trace_id: format!("cache serialise failed: {e}"),
375        })?;
376
377        let raw_key = key.serialize_key();
378
379        // In-memory with naïve eviction: drop oldest when at capacity.
380        {
381            let mut mem = self.mem.lock().unwrap();
382            if mem.len() >= self.config.max_in_memory_entries && !mem.contains_key(&raw_key) {
383                // Remove an arbitrary entry (simplest eviction for v1).
384                if let Some(oldest_key) = mem.keys().next().cloned() {
385                    mem.remove(&oldest_key);
386                }
387            }
388            mem.insert(
389                raw_key.clone(),
390                InMemoryEntry {
391                    raw: bytes.clone(),
392                    expires_at,
393                    swr_deadline,
394                },
395            );
396        }
397
398        // Disk.
399        if let Some(db) = &self.db {
400            let write_txn = db.begin_write().map_err(|e| AppError::Internal {
401                trace_id: format!("redb begin_write failed: {e}"),
402            })?;
403            {
404                let mut table =
405                    write_txn
406                        .open_table(CACHE_TABLE)
407                        .map_err(|e| AppError::Internal {
408                            trace_id: format!("redb open_table failed: {e}"),
409                        })?;
410                table
411                    .insert(raw_key.as_slice(), bytes.as_slice())
412                    .map_err(|e| AppError::Internal {
413                        trace_id: format!("redb insert failed: {e}"),
414                    })?;
415            }
416            write_txn.commit().map_err(|e| AppError::Internal {
417                trace_id: format!("redb commit failed: {e}"),
418            })?;
419        }
420
421        Ok(())
422    }
423
424    // -----------------------------------------------------------------------
425    // invalidate
426    // -----------------------------------------------------------------------
427
428    /// Remove a single entry from both memory and disk.
429    pub fn invalidate(&self, key: &CacheKey) {
430        let raw_key = key.serialize_key();
431        self.mem.lock().unwrap().remove(&raw_key);
432
433        if let Some(db) = &self.db {
434            if let Ok(txn) = db.begin_write() {
435                if let Ok(mut table) = txn.open_table(CACHE_TABLE) {
436                    let _ = table.remove(raw_key.as_slice());
437                }
438                let _ = txn.commit();
439            }
440        }
441    }
442
443    // -----------------------------------------------------------------------
444    // invalidate_profile
445    // -----------------------------------------------------------------------
446
447    /// Remove all entries scoped to `profile` from both layers.
448    ///
449    /// In-memory: O(n) linear scan — acceptable for `max_in_memory_entries`.
450    /// Disk: full table scan, removing matching keys in one transaction.
451    pub fn invalidate_profile(&self, profile: &crate::ids::ProfileId) {
452        // `prefix` matches keys like `objects\x00<profile>\x00...`.
453        // `profile_tag` matches the `Buckets` key which ends with `\x00<profile>`.
454        let prefix = format!("\x00{}\x00", profile.as_str());
455        let profile_tag = format!("\x00{}", profile.as_str());
456
457        // Memory: O(n) linear scan.
458        {
459            let mut mem = self.mem.lock().unwrap();
460            mem.retain(|k, _| {
461                let s = String::from_utf8_lossy(k);
462                !s.contains(&prefix) && !s.ends_with(profile_tag.as_str())
463            });
464        }
465
466        // Disk.
467        if let Some(db) = &self.db {
468            if let Ok(txn) = db.begin_write() {
469                if let Ok(mut table) = txn.open_table(CACHE_TABLE) {
470                    // Collect keys to remove (we cannot remove while iterating
471                    // in some redb versions, so collect first).
472                    let to_remove: Vec<Vec<u8>> = table
473                        .iter()
474                        .ok()
475                        .map(|iter| {
476                            iter.filter_map(|r| {
477                                let (k, _) = r.ok()?;
478                                let bytes: &[u8] = k.value();
479                                let s = String::from_utf8_lossy(bytes);
480                                if s.contains(&prefix) || s.ends_with(profile_tag.as_str()) {
481                                    Some(bytes.to_vec())
482                                } else {
483                                    None
484                                }
485                            })
486                            .collect()
487                        })
488                        .unwrap_or_default();
489
490                    for k in to_remove {
491                        let _ = table.remove(k.as_slice());
492                    }
493                }
494                let _ = txn.commit();
495            }
496        }
497    }
498
499    // -----------------------------------------------------------------------
500    // clear_all
501    // -----------------------------------------------------------------------
502
503    /// Wipe every cached entry from both layers.
504    pub fn clear_all(&self) {
505        self.mem.lock().unwrap().clear();
506
507        if let Some(db) = &self.db {
508            if let Ok(txn) = db.begin_write() {
509                if let Ok(mut table) = txn.open_table(CACHE_TABLE) {
510                    // Drain by iterating the full table.
511                    let keys: Vec<Vec<u8>> = table
512                        .iter()
513                        .ok()
514                        .map(|iter| {
515                            iter.filter_map(|r| {
516                                let (k, _) = r.ok()?;
517                                Some(k.value().to_vec())
518                            })
519                            .collect()
520                        })
521                        .unwrap_or_default();
522                    for k in keys {
523                        let _ = table.remove(k.as_slice());
524                    }
525                }
526                let _ = txn.commit();
527            }
528        }
529    }
530
531    // -----------------------------------------------------------------------
532    // Internal helpers
533    // -----------------------------------------------------------------------
534
535    /// Deserialise raw bytes and classify freshness based on the current time.
536    fn classify_entry_bytes<T: DeserializeOwned>(
537        raw: &[u8],
538        now: i64,
539        expires_at: i64,
540        swr_deadline: i64,
541    ) -> Result<Option<CacheRead<T>>, AppError> {
542        let entry: CacheEntry<Value> =
543            serde_json::from_slice(raw).map_err(|e| AppError::Internal {
544                trace_id: format!("cache deserialise failed: {e}"),
545            })?;
546
547        let freshness = if now < expires_at {
548            Freshness::Fresh
549        } else if now < swr_deadline {
550            Freshness::Stale
551        } else {
552            // Beyond the SWR window — entry is logically gone.
553            return Ok(None);
554        };
555
556        let value: T = serde_json::from_value(entry.value).map_err(|e| AppError::Internal {
557            trace_id: format!("cache value deserialise failed: {e}"),
558        })?;
559
560        Ok(Some(CacheRead { value, freshness }))
561    }
562}
563
564// ---------------------------------------------------------------------------
565// Tests
566// ---------------------------------------------------------------------------
567
568#[cfg(test)]
569mod tests {
570    use super::*;
571    use crate::ids::{BucketId, ObjectKey, ProfileId};
572
573    use std::sync::Arc;
574
575    fn test_store(clock: Arc<dyn Clock>) -> CacheHandle {
576        let config = CacheConfig {
577            default_ttl_secs: 10,
578            swr_window_secs: 20,
579            max_in_memory_entries: 1024,
580        };
581        CacheStore::new_with_clock(config, clock, None)
582    }
583
584    fn profile_a() -> ProfileId {
585        ProfileId::new("profile-a")
586    }
587    fn profile_b() -> ProfileId {
588        ProfileId::new("profile-b")
589    }
590
591    // -----------------------------------------------------------------------
592    // AC-8 verbatim test
593    // -----------------------------------------------------------------------
594
595    #[test]
596    fn validation_gate_blocks_read_through_for_unvalidated_profile_in_current_session() {
597        let clock = MockClock::new(1_000_000);
598        let store = test_store(clock.clone());
599
600        let key = CacheKey::Buckets(profile_a());
601
602        // Put an entry that would normally be fresh.
603        store
604            .put(&key, serde_json::json!(["bucket-1", "bucket-2"]), None)
605            .unwrap();
606
607        // Reading with profile_validation_ts = None must return Ok(None).
608        let result: Option<CacheRead<serde_json::Value>> = store.get(&key, None).unwrap();
609
610        assert!(
611            result.is_none(),
612            "cache must not serve data for an unvalidated profile (AC-8)"
613        );
614    }
615
616    // -----------------------------------------------------------------------
617    // TTL expiry
618    // -----------------------------------------------------------------------
619
620    #[test]
621    fn ttl_expiry_returns_stale_then_missing() {
622        let clock = MockClock::new(1_000_000);
623        let store = test_store(clock.clone());
624
625        let key = CacheKey::Buckets(profile_a());
626        let validated = Some(999_999_i64);
627
628        store.put(&key, serde_json::json!(42u32), None).unwrap();
629
630        // Within TTL (10 s) → Fresh.
631        let read: CacheRead<serde_json::Value> = store
632            .get(&key, validated)
633            .unwrap()
634            .expect("should be present");
635        assert_eq!(read.freshness, Freshness::Fresh);
636
637        // Advance past TTL but within SWR window (10 < 12 < 30).
638        clock.advance(12);
639        let read: CacheRead<serde_json::Value> = store
640            .get(&key, validated)
641            .unwrap()
642            .expect("should still be present as stale");
643        assert_eq!(read.freshness, Freshness::Stale);
644
645        // Advance past SWR window (TTL=10, SWR=20, total=30 from write).
646        // We've advanced 12 already; need 18 more to clear 30 total.
647        clock.advance(19);
648        let missing: Option<CacheRead<serde_json::Value>> = store.get(&key, validated).unwrap();
649        assert!(
650            missing.is_none(),
651            "entry must be missing after SWR window expires"
652        );
653    }
654
655    // -----------------------------------------------------------------------
656    // SWR stale read
657    // -----------------------------------------------------------------------
658
659    #[test]
660    fn swr_stale_read_returns_value_between_ttl_and_swr_window() {
661        let clock = MockClock::new(2_000_000);
662        let store = test_store(clock.clone());
663
664        let key = CacheKey::Objects {
665            profile: profile_a(),
666            bucket: BucketId::new("my-bucket"),
667            prefix: "photos/".to_string(),
668        };
669        let validated = Some(1_999_999_i64);
670        let payload = serde_json::json!({ "items": ["a.jpg", "b.jpg"] });
671
672        store.put(&key, payload.clone(), None).unwrap();
673
674        // Advance into the SWR window (TTL=10, advance 15 → stale).
675        clock.advance(15);
676
677        let read: CacheRead<serde_json::Value> = store
678            .get(&key, validated)
679            .unwrap()
680            .expect("stale entry must still be returned");
681
682        assert_eq!(
683            read.freshness,
684            Freshness::Stale,
685            "should be Stale in SWR window"
686        );
687        assert_eq!(
688            read.value, payload,
689            "stale value must equal the written payload"
690        );
691    }
692
693    // -----------------------------------------------------------------------
694    // Per-profile invalidation
695    // -----------------------------------------------------------------------
696
697    #[test]
698    fn per_profile_invalidation_removes_profile_a_leaves_profile_b() {
699        let clock = MockClock::new(3_000_000);
700        let store = test_store(clock.clone());
701        let validated_a = Some(2_999_999_i64);
702        let validated_b = Some(2_999_999_i64);
703
704        // 3 entries for profile A.
705        let key_a1 = CacheKey::Buckets(profile_a());
706        let key_a2 = CacheKey::Objects {
707            profile: profile_a(),
708            bucket: BucketId::new("bkt-a"),
709            prefix: String::new(),
710        };
711        let key_a3 = CacheKey::ObjectHead {
712            profile: profile_a(),
713            bucket: BucketId::new("bkt-a"),
714            key: ObjectKey::new("file.txt"),
715        };
716        // 1 entry for profile B.
717        let key_b1 = CacheKey::Buckets(profile_b());
718
719        store.put(&key_a1, serde_json::json!("a1"), None).unwrap();
720        store.put(&key_a2, serde_json::json!("a2"), None).unwrap();
721        store.put(&key_a3, serde_json::json!("a3"), None).unwrap();
722        store.put(&key_b1, serde_json::json!("b1"), None).unwrap();
723
724        // Invalidate all of profile A.
725        store.invalidate_profile(&profile_a());
726
727        // Profile A entries must be gone.
728        assert!(
729            store
730                .get::<serde_json::Value>(&key_a1, validated_a)
731                .unwrap()
732                .is_none(),
733            "key_a1 must be removed"
734        );
735        assert!(
736            store
737                .get::<serde_json::Value>(&key_a2, validated_a)
738                .unwrap()
739                .is_none(),
740            "key_a2 must be removed"
741        );
742        assert!(
743            store
744                .get::<serde_json::Value>(&key_a3, validated_a)
745                .unwrap()
746                .is_none(),
747            "key_a3 must be removed"
748        );
749
750        // Profile B entry must survive.
751        let b1 = store
752            .get::<serde_json::Value>(&key_b1, validated_b)
753            .unwrap();
754        assert!(b1.is_some(), "key_b1 must survive profile A invalidation");
755    }
756
757    // -----------------------------------------------------------------------
758    // redb round-trip
759    // -----------------------------------------------------------------------
760
761    #[test]
762    fn redb_round_trip_survives_store_reopen() {
763        let dir = tempfile::tempdir().expect("tempdir");
764        let db_path = dir.path().join("cache.redb");
765        let config = CacheConfig {
766            default_ttl_secs: 300,
767            swr_window_secs: 600,
768            max_in_memory_entries: 1024,
769        };
770        let key = CacheKey::Buckets(ProfileId::new("redb-profile"));
771        let payload = serde_json::json!(["bucket-x", "bucket-y"]);
772        let validated = Some(0_i64);
773
774        // Write with the first store instance.
775        {
776            let store = CacheStore::open(&db_path, config.clone()).expect("open store");
777            store.put(&key, payload.clone(), None).unwrap();
778        }
779
780        // Reopen — in-memory is cold; must fall through to redb.
781        {
782            let store = CacheStore::open(&db_path, config).expect("reopen store");
783            let read: CacheRead<serde_json::Value> = store
784                .get(&key, validated)
785                .unwrap()
786                .expect("entry must survive reopen");
787            assert_eq!(read.freshness, Freshness::Fresh);
788            assert_eq!(read.value, payload);
789        }
790    }
791}