Skip to main content

brows3r_lib/locks/
mod.rs

1//! Resource lock registry with full lifecycle.
2//!
3//! Provides acquire/release/heartbeat/TTL-expire/startup-cleanup for any
4//! scoped resource operation, wired to the typed event system from task 9.
5//!
6//! # OCP contract
7//!
8//! - Adding a new scope dimension: add one optional field to `LockScope` and
9//!   one intersection arm in `LockScope::intersects`.  No existing arms change.
10//! - Adding a new release reason: add one `ReleaseReason` variant.  Serializes
11//!   automatically via `rename_all = "snake_case"`.
12//! - Heartbeat loop works for any scope — it is a generic background task that
13//!   calls `release_stale` on any `LockRegistry`.
14//! - Event emission uses `events::emit` — no string literals scattered at call
15//!   sites.
16
17pub mod lifecycle;
18
19use std::{
20    collections::HashMap,
21    sync::{Arc, Mutex},
22};
23
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::{
28    error::AppError,
29    events::{EventEmitter, EventKind},
30    ids::{BucketId, ObjectKey, ProfileId},
31};
32
33// ---------------------------------------------------------------------------
34// LockId
35// ---------------------------------------------------------------------------
36
37/// Opaque lock identifier — UUID v4 minted on `acquire`.
38#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
39#[serde(transparent)]
40pub struct LockId(pub String);
41
42impl LockId {
43    /// Mint a new `LockId` backed by a UUID v4.
44    pub fn new_v4() -> Self {
45        Self(Uuid::new_v4().to_string())
46    }
47
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51}
52
53impl std::fmt::Display for LockId {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.write_str(&self.0)
56    }
57}
58
59impl From<&str> for LockId {
60    fn from(s: &str) -> Self {
61        Self(s.to_owned())
62    }
63}
64
65impl From<String> for LockId {
66    fn from(s: String) -> Self {
67        Self(s)
68    }
69}
70
71// ---------------------------------------------------------------------------
72// LockScope
73// ---------------------------------------------------------------------------
74
75/// Hierarchical scope key for a resource lock.
76///
77/// Conflict detection uses longest-prefix matching: two scopes conflict when
78/// one is an ancestor of the other in the hierarchy
79/// `profile → bucket → prefix → key`.
80///
81/// Adding a new scope dimension requires only: a new optional field here and
82/// one additional arm in `intersects`.
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "camelCase")]
85pub struct LockScope {
86    pub profile: ProfileId,
87    pub bucket: Option<BucketId>,
88    /// S3 key prefix (folder path), e.g. `"images/"`.
89    pub prefix: Option<String>,
90    pub key: Option<ObjectKey>,
91}
92
93impl LockScope {
94    /// Returns `true` when `self` and `other` share a resource and a lock on
95    /// one would conflict with a lock on the other.
96    ///
97    /// Two scopes intersect when one is an ancestor (or equal to) the other in
98    /// the `profile → bucket → prefix → key` hierarchy — longest-prefix rule.
99    pub fn intersects(&self, other: &LockScope) -> bool {
100        // Different profiles never conflict.
101        if self.profile != other.profile {
102            return false;
103        }
104
105        // Different buckets never conflict (when both specified).
106        if let (Some(a), Some(b)) = (&self.bucket, &other.bucket) {
107            if a != b {
108                return false;
109            }
110        }
111        // If either bucket is None the scope is profile-wide → overlaps any
112        // same-profile scope.
113        if self.bucket.is_none() || other.bucket.is_none() {
114            return true;
115        }
116
117        // Same bucket. Key-level checks take precedence so two concurrent
118        // single-object operations (e.g. download a.pdf and download
119        // b.html) do not falsely conflict via a None prefix on both sides.
120        // The previous implementation early-returned `true` when either
121        // prefix was None — making every `prefix=None, key=Some` scope
122        // collide with every other one in the same bucket.
123        match (&self.key, &other.key) {
124            (Some(a), Some(b)) => return a == b,
125            (Some(k), None) => {
126                // self is key-specific; other is broader (prefix or bucket).
127                // Overlap iff k is under other's prefix (or other is
128                // bucket-wide, i.e. other.prefix == None).
129                return match &other.prefix {
130                    Some(p) => k.as_str().starts_with(p.as_str()),
131                    None => true,
132                };
133            }
134            (None, Some(k)) => {
135                return match &self.prefix {
136                    Some(p) => k.as_str().starts_with(p.as_str()),
137                    None => true,
138                };
139            }
140            (None, None) => {}
141        }
142
143        // Neither side carries a specific key. Compare prefixes.
144        match (&self.prefix, &other.prefix) {
145            (None, _) | (_, None) => true,
146            (Some(a), Some(b)) => a.starts_with(b.as_str()) || b.starts_with(a.as_str()),
147        }
148    }
149}
150
151// ---------------------------------------------------------------------------
152// ReleaseReason
153// ---------------------------------------------------------------------------
154
155/// Why a lock was released.
156///
157/// Serialized as snake_case strings to match the `lock:released { reason }`
158/// event payload described in the design (line 399–400).
159#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
160#[serde(rename_all = "snake_case")]
161pub enum ReleaseReason {
162    Success,
163    Failure,
164    Cancel,
165    Ttl,
166    StartupCleanup,
167}
168
169// ---------------------------------------------------------------------------
170// ResourceLock
171// ---------------------------------------------------------------------------
172
173/// An active resource lock held by an operation.
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "camelCase")]
176pub struct ResourceLock {
177    pub id: LockId,
178    pub scope: LockScope,
179    pub op_name: String,
180    /// Unix timestamp (seconds) when the lock was acquired.
181    pub acquired_at: i64,
182    /// Unix timestamp (seconds) of the last heartbeat (or initial acquisition).
183    pub last_heartbeat_at: i64,
184    /// How many seconds of inactivity before the lock is considered stale.
185    pub ttl_secs: u64,
186}
187
188// ---------------------------------------------------------------------------
189// Event payloads
190// ---------------------------------------------------------------------------
191
192#[derive(Debug, Clone, Serialize)]
193#[serde(rename_all = "camelCase")]
194pub struct LockAcquiredPayload {
195    pub lock_id: LockId,
196    pub scope: LockScope,
197    pub op_name: String,
198}
199
200#[derive(Debug, Clone, Serialize)]
201#[serde(rename_all = "camelCase")]
202pub struct LockReleasedPayload {
203    pub lock_id: LockId,
204    pub scope: LockScope,
205    pub reason: ReleaseReason,
206}
207
208// ---------------------------------------------------------------------------
209// LockRegistry
210// ---------------------------------------------------------------------------
211
212/// Inner registry state — separated so tests can poke at it directly.
213struct RegistryInner {
214    locks: HashMap<LockId, ResourceLock>,
215}
216
217impl RegistryInner {
218    fn new() -> Self {
219        Self {
220            locks: HashMap::new(),
221        }
222    }
223}
224
225/// Thread-safe in-memory registry of active resource locks.
226///
227/// Wrap in `Arc<LockRegistry>` and manage via Tauri's state system.
228pub struct LockRegistry {
229    inner: Mutex<RegistryInner>,
230}
231
232impl LockRegistry {
233    pub fn new() -> Self {
234        Self {
235            inner: Mutex::new(RegistryInner::new()),
236        }
237    }
238
239    // -----------------------------------------------------------------------
240    // acquire
241    // -----------------------------------------------------------------------
242
243    /// Acquire a lock for `scope` / `op_name` with `ttl_secs` TTL.
244    ///
245    /// Returns the new `LockId` on success.  Returns `AppError::Locked` when
246    /// any existing lock has an intersecting scope.
247    ///
248    /// The caller is responsible for emitting `lock:acquired` via
249    /// `lifecycle::emit_acquired`.
250    pub fn acquire(
251        &self,
252        scope: LockScope,
253        op_name: impl Into<String>,
254        ttl_secs: u64,
255        now: i64,
256    ) -> Result<LockId, AppError> {
257        let op_name = op_name.into();
258        let mut inner = self.inner.lock().expect("lock poisoned");
259
260        // Check for conflicts.
261        for existing in inner.locks.values() {
262            if existing.scope.intersects(&scope) {
263                return Err(AppError::Locked {
264                    lock_id: existing.id.as_str().to_owned(),
265                    op_name: existing.op_name.clone(),
266                });
267            }
268        }
269
270        let id = LockId::new_v4();
271        let lock = ResourceLock {
272            id: id.clone(),
273            scope,
274            op_name,
275            acquired_at: now,
276            last_heartbeat_at: now,
277            ttl_secs,
278        };
279        inner.locks.insert(id.clone(), lock);
280        Ok(id)
281    }
282
283    // -----------------------------------------------------------------------
284    // heartbeat
285    // -----------------------------------------------------------------------
286
287    /// Extend the heartbeat for `lock_id`.
288    ///
289    /// Returns `AppError::NotFound` if the lock does not exist.
290    pub fn heartbeat(&self, lock_id: &LockId, now: i64) -> Result<(), AppError> {
291        let mut inner = self.inner.lock().expect("lock poisoned");
292        match inner.locks.get_mut(lock_id) {
293            Some(lock) => {
294                lock.last_heartbeat_at = now;
295                Ok(())
296            }
297            None => Err(AppError::NotFound {
298                resource: format!("lock:{}", lock_id.as_str()),
299            }),
300        }
301    }
302
303    // -----------------------------------------------------------------------
304    // release
305    // -----------------------------------------------------------------------
306
307    /// Release a lock explicitly.
308    ///
309    /// Returns `AppError::NotFound` if the lock does not exist.
310    /// The returned `ResourceLock` carries the scope needed for the event.
311    pub fn release(&self, lock_id: &LockId) -> Result<ResourceLock, AppError> {
312        let mut inner = self.inner.lock().expect("lock poisoned");
313        inner
314            .locks
315            .remove(lock_id)
316            .ok_or_else(|| AppError::NotFound {
317                resource: format!("lock:{}", lock_id.as_str()),
318            })
319    }
320
321    // -----------------------------------------------------------------------
322    // list
323    // -----------------------------------------------------------------------
324
325    /// Return all active locks, optionally filtered to those whose scope
326    /// intersects `scope_filter`.
327    pub fn list(&self, scope_filter: Option<&LockScope>) -> Vec<ResourceLock> {
328        let inner = self.inner.lock().expect("lock poisoned");
329        inner
330            .locks
331            .values()
332            .filter(|l| scope_filter.map(|f| l.scope.intersects(f)).unwrap_or(true))
333            .cloned()
334            .collect()
335    }
336
337    // -----------------------------------------------------------------------
338    // release_stale
339    // -----------------------------------------------------------------------
340
341    /// Release all locks whose TTL has expired relative to `now`.
342    ///
343    /// Returns the released locks so the caller can emit events.
344    pub fn release_stale(&self, now: i64) -> Vec<ResourceLock> {
345        let mut inner = self.inner.lock().expect("lock poisoned");
346        let stale_ids: Vec<LockId> = inner
347            .locks
348            .values()
349            .filter(|l| (l.last_heartbeat_at + l.ttl_secs as i64) < now)
350            .map(|l| l.id.clone())
351            .collect();
352
353        let mut released = Vec::with_capacity(stale_ids.len());
354        for id in stale_ids {
355            if let Some(lock) = inner.locks.remove(&id) {
356                released.push(lock);
357            }
358        }
359        released
360    }
361
362    // -----------------------------------------------------------------------
363    // startup_cleanup
364    // -----------------------------------------------------------------------
365
366    /// Remove *all* locks regardless of TTL.
367    ///
368    /// Called once at app start to clear any locks left over from a prior crash
369    /// or abnormal exit.  Returns the removed locks so the caller can emit
370    /// `lock:released { reason: StartupCleanup }` for each.
371    pub fn startup_cleanup(&self) -> Vec<ResourceLock> {
372        let mut inner = self.inner.lock().expect("lock poisoned");
373        inner.locks.drain().map(|(_, v)| v).collect()
374    }
375}
376
377impl Default for LockRegistry {
378    fn default() -> Self {
379        Self::new()
380    }
381}
382
383// ---------------------------------------------------------------------------
384// LockRegistryHandle — Arc wrapper for Tauri state
385// ---------------------------------------------------------------------------
386
387/// `Arc<LockRegistry>` wrapped for Tauri's `State` system.
388#[derive(Clone)]
389pub struct LockRegistryHandle(pub Arc<LockRegistry>);
390
391impl LockRegistryHandle {
392    pub fn new(registry: LockRegistry) -> Self {
393        Self(Arc::new(registry))
394    }
395
396    pub fn inner(&self) -> &LockRegistry {
397        &self.0
398    }
399}
400
401impl std::ops::Deref for LockRegistryHandle {
402    type Target = LockRegistry;
403    fn deref(&self) -> &LockRegistry {
404        &self.0
405    }
406}
407
408impl Default for LockRegistryHandle {
409    fn default() -> Self {
410        Self::new(LockRegistry::new())
411    }
412}
413
414// ---------------------------------------------------------------------------
415// Convenience: emit helpers (used by commands and lifecycle)
416// ---------------------------------------------------------------------------
417
418/// Emit `lock:acquired` for `lock`.
419pub fn emit_acquired<E: EventEmitter>(channel: &E, lock: &ResourceLock) -> Result<(), AppError> {
420    crate::events::emit(
421        channel,
422        EventKind::LockAcquired,
423        LockAcquiredPayload {
424            lock_id: lock.id.clone(),
425            scope: lock.scope.clone(),
426            op_name: lock.op_name.clone(),
427        },
428    )
429}
430
431/// Emit `lock:released` for a lock that was removed with `reason`.
432pub fn emit_released<E: EventEmitter>(
433    channel: &E,
434    lock: &ResourceLock,
435    reason: ReleaseReason,
436) -> Result<(), AppError> {
437    crate::events::emit(
438        channel,
439        EventKind::LockReleased,
440        LockReleasedPayload {
441            lock_id: lock.id.clone(),
442            scope: lock.scope.clone(),
443            reason,
444        },
445    )
446}
447
448// ---------------------------------------------------------------------------
449// Tests
450// ---------------------------------------------------------------------------
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455    use crate::events::{EventKind, MockChannel};
456
457    fn profile() -> ProfileId {
458        ProfileId::new("p1")
459    }
460
461    fn bucket() -> BucketId {
462        BucketId::new("my-bucket")
463    }
464
465    fn scope_profile() -> LockScope {
466        LockScope {
467            profile: profile(),
468            bucket: None,
469            prefix: None,
470            key: None,
471        }
472    }
473
474    fn scope_bucket() -> LockScope {
475        LockScope {
476            profile: profile(),
477            bucket: Some(bucket()),
478            prefix: None,
479            key: None,
480        }
481    }
482
483    fn scope_prefix(prefix: &str) -> LockScope {
484        LockScope {
485            profile: profile(),
486            bucket: Some(bucket()),
487            prefix: Some(prefix.to_owned()),
488            key: None,
489        }
490    }
491
492    fn scope_key(prefix: &str, key: &str) -> LockScope {
493        LockScope {
494            profile: profile(),
495            bucket: Some(bucket()),
496            prefix: Some(prefix.to_owned()),
497            key: Some(ObjectKey::new(key)),
498        }
499    }
500
501    const NOW: i64 = 1_000_000;
502
503    // -----------------------------------------------------------------------
504    // LockScope::intersects
505    // -----------------------------------------------------------------------
506
507    #[test]
508    fn different_profiles_do_not_intersect() {
509        let a = LockScope {
510            profile: ProfileId::new("p1"),
511            bucket: None,
512            prefix: None,
513            key: None,
514        };
515        let b = LockScope {
516            profile: ProfileId::new("p2"),
517            bucket: None,
518            prefix: None,
519            key: None,
520        };
521        assert!(!a.intersects(&b));
522    }
523
524    #[test]
525    fn profile_level_intersects_everything_same_profile() {
526        assert!(scope_profile().intersects(&scope_bucket()));
527        assert!(scope_bucket().intersects(&scope_profile()));
528    }
529
530    #[test]
531    fn bucket_level_intersects_prefix_under_same_bucket() {
532        assert!(scope_bucket().intersects(&scope_prefix("images/")));
533        assert!(scope_prefix("images/").intersects(&scope_bucket()));
534    }
535
536    #[test]
537    fn different_buckets_do_not_intersect() {
538        let a = LockScope {
539            profile: profile(),
540            bucket: Some(BucketId::new("bucket-a")),
541            prefix: None,
542            key: None,
543        };
544        let b = LockScope {
545            profile: profile(),
546            bucket: Some(BucketId::new("bucket-b")),
547            prefix: None,
548            key: None,
549        };
550        assert!(!a.intersects(&b));
551    }
552
553    #[test]
554    fn prefix_conflict_by_longest_prefix() {
555        // "images/" is a prefix of "images/cats/" — conflict.
556        assert!(scope_prefix("images/").intersects(&scope_prefix("images/cats/")));
557        assert!(scope_prefix("images/cats/").intersects(&scope_prefix("images/")));
558    }
559
560    #[test]
561    fn disjoint_prefixes_do_not_intersect() {
562        assert!(!scope_prefix("images/").intersects(&scope_prefix("videos/")));
563    }
564
565    #[test]
566    fn same_prefix_different_keys_do_not_intersect() {
567        let a = scope_key("images/", "images/cat.png");
568        let b = scope_key("images/", "images/dog.png");
569        assert!(!a.intersects(&b));
570    }
571
572    #[test]
573    fn same_key_intersects() {
574        let a = scope_key("images/", "images/cat.png");
575        let b = scope_key("images/", "images/cat.png");
576        assert!(a.intersects(&b));
577    }
578
579    /// Regression: two concurrent single-object downloads acquire scopes
580    /// with `prefix: None, key: Some(...)`. The previous intersect logic
581    /// early-returned `true` whenever either prefix was None, so the
582    /// second download was always rejected with AppError::Locked — which
583    /// surfaced as a silently-failing PDF (or HTML, depending on race
584    /// order) when the user downloaded a folder containing both.
585    #[test]
586    fn prefix_none_key_some_different_keys_do_not_intersect() {
587        let a = LockScope {
588            profile: profile(),
589            bucket: Some(bucket()),
590            prefix: None,
591            key: Some(ObjectKey::new("folder/cat.pdf")),
592        };
593        let b = LockScope {
594            profile: profile(),
595            bucket: Some(bucket()),
596            prefix: None,
597            key: Some(ObjectKey::new("folder/dog.html")),
598        };
599        assert!(!a.intersects(&b));
600    }
601
602    #[test]
603    fn prefix_none_key_some_same_key_still_intersects() {
604        let a = LockScope {
605            profile: profile(),
606            bucket: Some(bucket()),
607            prefix: None,
608            key: Some(ObjectKey::new("folder/cat.pdf")),
609        };
610        let b = LockScope {
611            profile: profile(),
612            bucket: Some(bucket()),
613            prefix: None,
614            key: Some(ObjectKey::new("folder/cat.pdf")),
615        };
616        assert!(a.intersects(&b));
617    }
618
619    /// Key-specific scope conflicts with a prefix-wide scope that
620    /// covers it (mixed-mode operations e.g. download single object
621    /// during a bulk upload to its parent prefix).
622    #[test]
623    fn key_under_prefix_intersects() {
624        let key = LockScope {
625            profile: profile(),
626            bucket: Some(bucket()),
627            prefix: None,
628            key: Some(ObjectKey::new("foo/bar.txt")),
629        };
630        let prefix = scope_prefix("foo/");
631        assert!(key.intersects(&prefix));
632        assert!(prefix.intersects(&key));
633    }
634
635    /// Key-specific scope does NOT conflict with a sibling prefix that
636    /// doesn't cover it.
637    #[test]
638    fn key_under_unrelated_prefix_does_not_intersect() {
639        let key = LockScope {
640            profile: profile(),
641            bucket: Some(bucket()),
642            prefix: None,
643            key: Some(ObjectKey::new("foo/bar.txt")),
644        };
645        let prefix = scope_prefix("baz/");
646        assert!(!key.intersects(&prefix));
647        assert!(!prefix.intersects(&key));
648    }
649
650    // -----------------------------------------------------------------------
651    // Test 1: Acquire happy path
652    // -----------------------------------------------------------------------
653
654    #[test]
655    fn acquire_happy_path() {
656        let registry = LockRegistry::new();
657        let channel = MockChannel::default();
658
659        let scope = scope_bucket();
660        let lock_id = registry
661            .acquire(scope.clone(), "DeleteObject", 300, NOW)
662            .expect("acquire should succeed");
663
664        // Emit acquired event.
665        let locks = registry.list(None);
666        assert_eq!(locks.len(), 1);
667        let lock = &locks[0];
668        assert_eq!(lock.id, lock_id);
669        assert_eq!(lock.op_name, "DeleteObject");
670        assert_eq!(lock.ttl_secs, 300);
671
672        emit_acquired(&channel, lock).expect("emit should succeed");
673
674        let emitted = channel.emitted();
675        assert_eq!(emitted.len(), 1);
676        assert_eq!(emitted[0].0, EventKind::LockAcquired);
677        assert_eq!(emitted[0].1["lockId"], lock_id.as_str());
678        assert_eq!(emitted[0].1["opName"], "DeleteObject");
679    }
680
681    // -----------------------------------------------------------------------
682    // Test 2: Double-acquire conflict
683    // -----------------------------------------------------------------------
684
685    #[test]
686    fn double_acquire_conflict() {
687        let registry = LockRegistry::new();
688        let scope = scope_bucket();
689
690        registry
691            .acquire(scope.clone(), "Op1", 300, NOW)
692            .expect("first acquire must succeed");
693
694        let err = registry
695            .acquire(scope.clone(), "Op2", 300, NOW)
696            .expect_err("second acquire on overlapping scope must fail");
697
698        match err {
699            AppError::Locked { op_name, .. } => {
700                assert_eq!(op_name, "Op1");
701            }
702            other => panic!("expected Locked, got {:?}", other),
703        }
704    }
705
706    // -----------------------------------------------------------------------
707    // Test 3: Heartbeat extension
708    // -----------------------------------------------------------------------
709
710    #[test]
711    fn heartbeat_extends_lock_survives_release_stale() {
712        let registry = LockRegistry::new();
713        let scope = scope_bucket();
714        let ttl: u64 = 300;
715
716        let lock_id = registry
717            .acquire(scope.clone(), "Op1", ttl, NOW)
718            .expect("acquire");
719
720        // Advance time past the original TTL.
721        let after_ttl = NOW + ttl as i64 + 1;
722
723        // Heartbeat at `after_ttl` — resets last_heartbeat_at.
724        registry.heartbeat(&lock_id, after_ttl).expect("heartbeat");
725
726        // release_stale at `after_ttl + 1` — lock's new deadline is
727        // after_ttl + ttl = after_ttl + 300; still alive.
728        let stale = registry.release_stale(after_ttl + 1);
729        assert!(
730            stale.is_empty(),
731            "lock should survive after heartbeat extension"
732        );
733
734        // Lock still in registry.
735        let locks = registry.list(None);
736        assert_eq!(locks.len(), 1, "lock should still be present");
737    }
738
739    // -----------------------------------------------------------------------
740    // Test 4: TTL expiry
741    // -----------------------------------------------------------------------
742
743    #[test]
744    fn ttl_expiry_releases_lock_and_emits_event() {
745        let registry = LockRegistry::new();
746        let channel = MockChannel::default();
747        let ttl: u64 = 300;
748
749        let _lock_id = registry
750            .acquire(scope_bucket(), "Op1", ttl, NOW)
751            .expect("acquire");
752
753        // Advance past TTL without heartbeating.
754        let expired_at = NOW + ttl as i64 + 1;
755        let stale = registry.release_stale(expired_at);
756        assert_eq!(stale.len(), 1, "one stale lock should be released");
757
758        // Emit event for each stale lock.
759        for lock in &stale {
760            emit_released(&channel, lock, ReleaseReason::Ttl).expect("emit");
761        }
762
763        let emitted = channel.emitted();
764        assert_eq!(emitted.len(), 1);
765        assert_eq!(emitted[0].0, EventKind::LockReleased);
766        assert_eq!(emitted[0].1["reason"], "ttl");
767
768        // Registry is now empty.
769        assert!(registry.list(None).is_empty());
770    }
771
772    // -----------------------------------------------------------------------
773    // Test 5: Startup cleanup
774    // -----------------------------------------------------------------------
775
776    #[test]
777    fn startup_cleanup_removes_all_locks_and_emits_events() {
778        let registry = LockRegistry::new();
779        let channel = MockChannel::default();
780
781        // Acquire two locks with different scopes (different profiles to avoid
782        // conflict detection).
783        let scope_a = LockScope {
784            profile: ProfileId::new("p-a"),
785            bucket: None,
786            prefix: None,
787            key: None,
788        };
789        let scope_b = LockScope {
790            profile: ProfileId::new("p-b"),
791            bucket: None,
792            prefix: None,
793            key: None,
794        };
795
796        registry
797            .acquire(scope_a, "OpA", 300, NOW)
798            .expect("acquire A");
799        registry
800            .acquire(scope_b, "OpB", 300, NOW)
801            .expect("acquire B");
802
803        assert_eq!(registry.list(None).len(), 2);
804
805        let removed = registry.startup_cleanup();
806        assert_eq!(removed.len(), 2, "all locks should be removed");
807
808        for lock in &removed {
809            emit_released(&channel, lock, ReleaseReason::StartupCleanup).expect("emit");
810        }
811
812        let emitted = channel.emitted();
813        assert_eq!(emitted.len(), 2);
814        for e in &emitted {
815            assert_eq!(e.0, EventKind::LockReleased);
816            assert_eq!(e.1["reason"], "startup_cleanup");
817        }
818
819        assert!(registry.list(None).is_empty());
820    }
821
822    // -----------------------------------------------------------------------
823    // Test 6: Release reasons
824    // -----------------------------------------------------------------------
825
826    #[test]
827    fn release_reasons_emit_correct_events() {
828        let cases = [
829            ReleaseReason::Success,
830            ReleaseReason::Failure,
831            ReleaseReason::Cancel,
832        ];
833        let expected_strings = ["success", "failure", "cancel"];
834
835        for (reason, expected) in cases.iter().zip(expected_strings.iter()) {
836            let registry = LockRegistry::new();
837            let channel = MockChannel::default();
838
839            let lock_id = registry
840                .acquire(scope_bucket(), "Op1", 300, NOW)
841                .expect("acquire");
842
843            let lock = registry.release(&lock_id).expect("release");
844            emit_released(&channel, &lock, reason.clone()).expect("emit");
845
846            let emitted = channel.emitted();
847            assert_eq!(emitted.len(), 1);
848            assert_eq!(emitted[0].0, EventKind::LockReleased);
849            assert_eq!(
850                emitted[0].1["reason"], *expected,
851                "reason should be {:?}",
852                reason
853            );
854            assert_eq!(emitted[0].1["lockId"], lock_id.as_str());
855        }
856    }
857
858    // -----------------------------------------------------------------------
859    // Heartbeat on non-existent lock returns NotFound
860    // -----------------------------------------------------------------------
861
862    #[test]
863    fn heartbeat_nonexistent_lock_returns_not_found() {
864        let registry = LockRegistry::new();
865        let fake_id = LockId::from("00000000-0000-0000-0000-000000000000");
866        let err = registry
867            .heartbeat(&fake_id, NOW)
868            .expect_err("heartbeat on missing lock must fail");
869        match err {
870            AppError::NotFound { .. } => {}
871            other => panic!("expected NotFound, got {:?}", other),
872        }
873    }
874
875    // -----------------------------------------------------------------------
876    // List with scope filter
877    // -----------------------------------------------------------------------
878
879    #[test]
880    fn list_with_scope_filter() {
881        let registry = LockRegistry::new();
882
883        let scope_a = LockScope {
884            profile: ProfileId::new("p-a"),
885            bucket: None,
886            prefix: None,
887            key: None,
888        };
889        let scope_b = LockScope {
890            profile: ProfileId::new("p-b"),
891            bucket: None,
892            prefix: None,
893            key: None,
894        };
895
896        registry
897            .acquire(scope_a.clone(), "OpA", 300, NOW)
898            .expect("acquire A");
899        registry
900            .acquire(scope_b, "OpB", 300, NOW)
901            .expect("acquire B");
902
903        // Filter by profile p-a: should return only OpA.
904        let filtered = registry.list(Some(&scope_a));
905        assert_eq!(filtered.len(), 1);
906        assert_eq!(filtered[0].op_name, "OpA");
907    }
908}