1pub mod lifecycle;
18
19use std::{
20 collections::HashMap,
21 sync::{Arc, Mutex},
22};
23
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::{
28 error::AppError,
29 events::{EventEmitter, EventKind},
30 ids::{BucketId, ObjectKey, ProfileId},
31};
32
/// Identifier of a single lock held in the registry.
///
/// A thin newtype around the identifier's string form. Because of
/// `#[serde(transparent)]` it is (de)serialized as a bare string rather than
/// as a wrapper object.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct LockId(pub String);
41
42impl LockId {
43 pub fn new_v4() -> Self {
45 Self(Uuid::new_v4().to_string())
46 }
47
48 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51}
52
53impl std::fmt::Display for LockId {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.write_str(&self.0)
56 }
57}
58
59impl From<&str> for LockId {
60 fn from(s: &str) -> Self {
61 Self(s.to_owned())
62 }
63}
64
65impl From<String> for LockId {
66 fn from(s: String) -> Self {
67 Self(s)
68 }
69}
70
/// Describes the set of resources a lock covers.
///
/// `profile` is always present; `bucket`, `prefix` and `key` progressively
/// narrow the scope and are interpreted by [`LockScope::intersects`].
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LockScope {
    /// Profile the lock belongs to; scopes of different profiles never intersect.
    pub profile: ProfileId,
    /// Bucket the lock is limited to. When `None`, `intersects` treats the
    /// scope as overlapping every scope of the same profile.
    pub bucket: Option<BucketId>,
    /// Key prefix the lock is limited to, compared with plain `starts_with`.
    pub prefix: Option<String>,
    /// Single object key the lock is limited to.
    pub key: Option<ObjectKey>,
}
92
93impl LockScope {
94 pub fn intersects(&self, other: &LockScope) -> bool {
100 if self.profile != other.profile {
102 return false;
103 }
104
105 if let (Some(a), Some(b)) = (&self.bucket, &other.bucket) {
107 if a != b {
108 return false;
109 }
110 }
111 if self.bucket.is_none() || other.bucket.is_none() {
114 return true;
115 }
116
117 match (&self.key, &other.key) {
124 (Some(a), Some(b)) => return a == b,
125 (Some(k), None) => {
126 return match &other.prefix {
130 Some(p) => k.as_str().starts_with(p.as_str()),
131 None => true,
132 };
133 }
134 (None, Some(k)) => {
135 return match &self.prefix {
136 Some(p) => k.as_str().starts_with(p.as_str()),
137 None => true,
138 };
139 }
140 (None, None) => {}
141 }
142
143 match (&self.prefix, &other.prefix) {
145 (None, _) | (_, None) => true,
146 (Some(a), Some(b)) => a.starts_with(b.as_str()) || b.starts_with(a.as_str()),
147 }
148 }
149}
150
/// Why a lock was released; carried in [`LockReleasedPayload`].
///
/// Serialized in snake_case (e.g. `StartupCleanup` becomes `"startup_cleanup"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReleaseReason {
    /// Presumably: the operation holding the lock finished successfully.
    Success,
    /// Presumably: the operation holding the lock failed.
    Failure,
    /// Presumably: the operation holding the lock was cancelled.
    Cancel,
    /// The lock expired; intended for locks returned by `LockRegistry::release_stale`.
    Ttl,
    /// The lock was discarded in bulk; intended for locks returned by
    /// `LockRegistry::startup_cleanup`.
    StartupCleanup,
}
168
/// A lock currently (or formerly) held in the [`LockRegistry`].
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourceLock {
    /// Identifier under which the lock is stored in the registry.
    pub id: LockId,
    /// Resources covered by the lock.
    pub scope: LockScope,
    /// Name of the operation that took the lock; reported back in
    /// `AppError::Locked` when a later acquire conflicts with it.
    pub op_name: String,
    /// The `now` value supplied to `LockRegistry::acquire`.
    /// NOTE(review): presumably Unix seconds — confirm against callers.
    pub acquired_at: i64,
    /// Starts equal to `acquired_at` and is overwritten by every
    /// `LockRegistry::heartbeat` call. Uses the same time base as `ttl_secs`,
    /// since the two are added together to compute expiry.
    pub last_heartbeat_at: i64,
    /// Time after the last heartbeat during which the lock is still
    /// considered live by `LockRegistry::release_stale`.
    pub ttl_secs: u64,
}
187
/// Event payload sent by [`emit_acquired`]; serialized with camelCase keys
/// (`lockId`, `scope`, `opName`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LockAcquiredPayload {
    /// Identifier of the lock that was acquired.
    pub lock_id: LockId,
    /// Resources covered by the lock.
    pub scope: LockScope,
    /// Name of the operation that took the lock.
    pub op_name: String,
}
199
/// Event payload sent by [`emit_released`]; serialized with camelCase keys
/// (`lockId`, `scope`, `reason`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct LockReleasedPayload {
    /// Identifier of the lock that was released.
    pub lock_id: LockId,
    /// Resources the lock covered.
    pub scope: LockScope,
    /// Why the lock was released.
    pub reason: ReleaseReason,
}
207
/// Mutable state of a [`LockRegistry`], kept behind the registry's mutex.
struct RegistryInner {
    /// Every lock currently held, keyed by its own identifier.
    locks: HashMap<LockId, ResourceLock>,
}
216
217impl RegistryInner {
218 fn new() -> Self {
219 Self {
220 locks: HashMap::new(),
221 }
222 }
223}
224
/// In-memory registry of resource locks.
///
/// All state lives behind a single `Mutex`, so every method takes `&self`.
pub struct LockRegistry {
    /// Lock table; the mutex is taken once per registry operation.
    inner: Mutex<RegistryInner>,
}
231
232impl LockRegistry {
233 pub fn new() -> Self {
234 Self {
235 inner: Mutex::new(RegistryInner::new()),
236 }
237 }
238
239 pub fn acquire(
251 &self,
252 scope: LockScope,
253 op_name: impl Into<String>,
254 ttl_secs: u64,
255 now: i64,
256 ) -> Result<LockId, AppError> {
257 let op_name = op_name.into();
258 let mut inner = self.inner.lock().expect("lock poisoned");
259
260 for existing in inner.locks.values() {
262 if existing.scope.intersects(&scope) {
263 return Err(AppError::Locked {
264 lock_id: existing.id.as_str().to_owned(),
265 op_name: existing.op_name.clone(),
266 });
267 }
268 }
269
270 let id = LockId::new_v4();
271 let lock = ResourceLock {
272 id: id.clone(),
273 scope,
274 op_name,
275 acquired_at: now,
276 last_heartbeat_at: now,
277 ttl_secs,
278 };
279 inner.locks.insert(id.clone(), lock);
280 Ok(id)
281 }
282
283 pub fn heartbeat(&self, lock_id: &LockId, now: i64) -> Result<(), AppError> {
291 let mut inner = self.inner.lock().expect("lock poisoned");
292 match inner.locks.get_mut(lock_id) {
293 Some(lock) => {
294 lock.last_heartbeat_at = now;
295 Ok(())
296 }
297 None => Err(AppError::NotFound {
298 resource: format!("lock:{}", lock_id.as_str()),
299 }),
300 }
301 }
302
303 pub fn release(&self, lock_id: &LockId) -> Result<ResourceLock, AppError> {
312 let mut inner = self.inner.lock().expect("lock poisoned");
313 inner
314 .locks
315 .remove(lock_id)
316 .ok_or_else(|| AppError::NotFound {
317 resource: format!("lock:{}", lock_id.as_str()),
318 })
319 }
320
321 pub fn list(&self, scope_filter: Option<&LockScope>) -> Vec<ResourceLock> {
328 let inner = self.inner.lock().expect("lock poisoned");
329 inner
330 .locks
331 .values()
332 .filter(|l| scope_filter.map(|f| l.scope.intersects(f)).unwrap_or(true))
333 .cloned()
334 .collect()
335 }
336
337 pub fn release_stale(&self, now: i64) -> Vec<ResourceLock> {
345 let mut inner = self.inner.lock().expect("lock poisoned");
346 let stale_ids: Vec<LockId> = inner
347 .locks
348 .values()
349 .filter(|l| (l.last_heartbeat_at + l.ttl_secs as i64) < now)
350 .map(|l| l.id.clone())
351 .collect();
352
353 let mut released = Vec::with_capacity(stale_ids.len());
354 for id in stale_ids {
355 if let Some(lock) = inner.locks.remove(&id) {
356 released.push(lock);
357 }
358 }
359 released
360 }
361
362 pub fn startup_cleanup(&self) -> Vec<ResourceLock> {
372 let mut inner = self.inner.lock().expect("lock poisoned");
373 inner.locks.drain().map(|(_, v)| v).collect()
374 }
375}
376
377impl Default for LockRegistry {
378 fn default() -> Self {
379 Self::new()
380 }
381}
382
/// Shared handle to a [`LockRegistry`].
///
/// Cloning the handle clones the inner `Arc`, so all clones refer to the
/// same registry.
#[derive(Clone)]
pub struct LockRegistryHandle(pub Arc<LockRegistry>);
390
391impl LockRegistryHandle {
392 pub fn new(registry: LockRegistry) -> Self {
393 Self(Arc::new(registry))
394 }
395
396 pub fn inner(&self) -> &LockRegistry {
397 &self.0
398 }
399}
400
401impl std::ops::Deref for LockRegistryHandle {
402 type Target = LockRegistry;
403 fn deref(&self) -> &LockRegistry {
404 &self.0
405 }
406}
407
408impl Default for LockRegistryHandle {
409 fn default() -> Self {
410 Self::new(LockRegistry::new())
411 }
412}
413
414pub fn emit_acquired<E: EventEmitter>(channel: &E, lock: &ResourceLock) -> Result<(), AppError> {
420 crate::events::emit(
421 channel,
422 EventKind::LockAcquired,
423 LockAcquiredPayload {
424 lock_id: lock.id.clone(),
425 scope: lock.scope.clone(),
426 op_name: lock.op_name.clone(),
427 },
428 )
429}
430
431pub fn emit_released<E: EventEmitter>(
433 channel: &E,
434 lock: &ResourceLock,
435 reason: ReleaseReason,
436) -> Result<(), AppError> {
437 crate::events::emit(
438 channel,
439 EventKind::LockReleased,
440 LockReleasedPayload {
441 lock_id: lock.id.clone(),
442 scope: lock.scope.clone(),
443 reason,
444 },
445 )
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventKind, MockChannel};

    /// Fixed "current time" shared by the registry tests.
    const NOW: i64 = 1_000_000;

    fn profile() -> ProfileId {
        ProfileId::new("p1")
    }

    fn bucket() -> BucketId {
        BucketId::new("my-bucket")
    }

    /// Scope covering the whole default profile.
    fn scope_profile() -> LockScope {
        LockScope {
            profile: profile(),
            bucket: None,
            prefix: None,
            key: None,
        }
    }

    /// Scope covering the whole default bucket.
    fn scope_bucket() -> LockScope {
        LockScope {
            bucket: Some(bucket()),
            ..scope_profile()
        }
    }

    /// Scope covering one prefix of the default bucket.
    fn scope_prefix(prefix: &str) -> LockScope {
        LockScope {
            prefix: Some(prefix.to_owned()),
            ..scope_bucket()
        }
    }

    /// Scope covering one key, with its containing prefix recorded as well.
    fn scope_key(prefix: &str, key: &str) -> LockScope {
        LockScope {
            key: Some(ObjectKey::new(key)),
            ..scope_prefix(prefix)
        }
    }

    /// Scope covering one key of the default bucket, with no prefix recorded.
    fn scope_key_only(key: &str) -> LockScope {
        LockScope {
            key: Some(ObjectKey::new(key)),
            ..scope_bucket()
        }
    }

    // ---- LockScope::intersects ------------------------------------------

    #[test]
    fn different_profiles_do_not_intersect() {
        let a = scope_profile();
        let b = LockScope {
            profile: ProfileId::new("p2"),
            ..scope_profile()
        };
        assert!(!a.intersects(&b));
    }

    #[test]
    fn profile_level_intersects_everything_same_profile() {
        assert!(scope_profile().intersects(&scope_bucket()));
        assert!(scope_bucket().intersects(&scope_profile()));
    }

    #[test]
    fn bucket_level_intersects_prefix_under_same_bucket() {
        assert!(scope_bucket().intersects(&scope_prefix("images/")));
        assert!(scope_prefix("images/").intersects(&scope_bucket()));
    }

    #[test]
    fn different_buckets_do_not_intersect() {
        let a = LockScope {
            bucket: Some(BucketId::new("bucket-a")),
            ..scope_profile()
        };
        let b = LockScope {
            bucket: Some(BucketId::new("bucket-b")),
            ..scope_profile()
        };
        assert!(!a.intersects(&b));
    }

    #[test]
    fn prefix_conflict_by_longest_prefix() {
        assert!(scope_prefix("images/").intersects(&scope_prefix("images/cats/")));
        assert!(scope_prefix("images/cats/").intersects(&scope_prefix("images/")));
    }

    #[test]
    fn disjoint_prefixes_do_not_intersect() {
        assert!(!scope_prefix("images/").intersects(&scope_prefix("videos/")));
    }

    #[test]
    fn same_prefix_different_keys_do_not_intersect() {
        let a = scope_key("images/", "images/cat.png");
        let b = scope_key("images/", "images/dog.png");
        assert!(!a.intersects(&b));
    }

    #[test]
    fn same_key_intersects() {
        let a = scope_key("images/", "images/cat.png");
        let b = scope_key("images/", "images/cat.png");
        assert!(a.intersects(&b));
    }

    /// Two key-only scopes must be compared by key, not treated as
    /// bucket-wide just because neither records a prefix.
    #[test]
    fn prefix_none_key_some_different_keys_do_not_intersect() {
        let a = scope_key_only("folder/cat.pdf");
        let b = scope_key_only("folder/dog.html");
        assert!(!a.intersects(&b));
    }

    #[test]
    fn prefix_none_key_some_same_key_still_intersects() {
        let a = scope_key_only("folder/cat.pdf");
        let b = scope_key_only("folder/cat.pdf");
        assert!(a.intersects(&b));
    }

    #[test]
    fn key_under_prefix_intersects() {
        let key = scope_key_only("foo/bar.txt");
        let prefix = scope_prefix("foo/");
        assert!(key.intersects(&prefix));
        assert!(prefix.intersects(&key));
    }

    #[test]
    fn key_under_unrelated_prefix_does_not_intersect() {
        let key = scope_key_only("foo/bar.txt");
        let prefix = scope_prefix("baz/");
        assert!(!key.intersects(&prefix));
        assert!(!prefix.intersects(&key));
    }

    // ---- LockRegistry ----------------------------------------------------

    #[test]
    fn acquire_happy_path() {
        let registry = LockRegistry::new();
        let channel = MockChannel::default();

        let lock_id = registry
            .acquire(scope_bucket(), "DeleteObject", 300, NOW)
            .expect("acquire should succeed");

        let locks = registry.list(None);
        assert_eq!(locks.len(), 1);
        let lock = &locks[0];
        assert_eq!(lock.id, lock_id);
        assert_eq!(lock.op_name, "DeleteObject");
        assert_eq!(lock.ttl_secs, 300);

        emit_acquired(&channel, lock).expect("emit should succeed");

        let emitted = channel.emitted();
        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].0, EventKind::LockAcquired);
        assert_eq!(emitted[0].1["lockId"], lock_id.as_str());
        assert_eq!(emitted[0].1["opName"], "DeleteObject");
    }

    #[test]
    fn double_acquire_conflict() {
        let registry = LockRegistry::new();
        let scope = scope_bucket();

        registry
            .acquire(scope.clone(), "Op1", 300, NOW)
            .expect("first acquire must succeed");

        let err = registry
            .acquire(scope, "Op2", 300, NOW)
            .expect_err("second acquire on overlapping scope must fail");

        match err {
            // The error names the operation that already holds the lock.
            AppError::Locked { op_name, .. } => {
                assert_eq!(op_name, "Op1");
            }
            other => panic!("expected Locked, got {:?}", other),
        }
    }

    #[test]
    fn heartbeat_extends_lock_survives_release_stale() {
        let registry = LockRegistry::new();
        let ttl: u64 = 300;

        let lock_id = registry
            .acquire(scope_bucket(), "Op1", ttl, NOW)
            .expect("acquire");

        // Without the heartbeat the lock would already be expired here.
        let after_ttl = NOW + ttl as i64 + 1;

        registry.heartbeat(&lock_id, after_ttl).expect("heartbeat");

        let stale = registry.release_stale(after_ttl + 1);
        assert!(
            stale.is_empty(),
            "lock should survive after heartbeat extension"
        );

        let locks = registry.list(None);
        assert_eq!(locks.len(), 1, "lock should still be present");
    }

    #[test]
    fn ttl_expiry_releases_lock_and_emits_event() {
        let registry = LockRegistry::new();
        let channel = MockChannel::default();
        let ttl: u64 = 300;

        let _lock_id = registry
            .acquire(scope_bucket(), "Op1", ttl, NOW)
            .expect("acquire");

        let expired_at = NOW + ttl as i64 + 1;
        let stale = registry.release_stale(expired_at);
        assert_eq!(stale.len(), 1, "one stale lock should be released");

        for lock in &stale {
            emit_released(&channel, lock, ReleaseReason::Ttl).expect("emit");
        }

        let emitted = channel.emitted();
        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].0, EventKind::LockReleased);
        assert_eq!(emitted[0].1["reason"], "ttl");

        assert!(registry.list(None).is_empty());
    }

    #[test]
    fn startup_cleanup_removes_all_locks_and_emits_events() {
        let registry = LockRegistry::new();
        let channel = MockChannel::default();

        // Distinct profiles so the two locks do not conflict with each other.
        let scope_a = LockScope {
            profile: ProfileId::new("p-a"),
            ..scope_profile()
        };
        let scope_b = LockScope {
            profile: ProfileId::new("p-b"),
            ..scope_profile()
        };

        registry
            .acquire(scope_a, "OpA", 300, NOW)
            .expect("acquire A");
        registry
            .acquire(scope_b, "OpB", 300, NOW)
            .expect("acquire B");

        assert_eq!(registry.list(None).len(), 2);

        let removed = registry.startup_cleanup();
        assert_eq!(removed.len(), 2, "all locks should be removed");

        for lock in &removed {
            emit_released(&channel, lock, ReleaseReason::StartupCleanup).expect("emit");
        }

        let emitted = channel.emitted();
        assert_eq!(emitted.len(), 2);
        for e in &emitted {
            assert_eq!(e.0, EventKind::LockReleased);
            assert_eq!(e.1["reason"], "startup_cleanup");
        }

        assert!(registry.list(None).is_empty());
    }

    #[test]
    fn release_reasons_emit_correct_events() {
        // Each reason paired with its expected serialized form.
        let cases = [
            (ReleaseReason::Success, "success"),
            (ReleaseReason::Failure, "failure"),
            (ReleaseReason::Cancel, "cancel"),
        ];

        for (reason, expected) in &cases {
            let registry = LockRegistry::new();
            let channel = MockChannel::default();

            let lock_id = registry
                .acquire(scope_bucket(), "Op1", 300, NOW)
                .expect("acquire");

            let lock = registry.release(&lock_id).expect("release");
            emit_released(&channel, &lock, reason.clone()).expect("emit");

            let emitted = channel.emitted();
            assert_eq!(emitted.len(), 1);
            assert_eq!(emitted[0].0, EventKind::LockReleased);
            assert_eq!(
                emitted[0].1["reason"], *expected,
                "reason should be {:?}",
                reason
            );
            assert_eq!(emitted[0].1["lockId"], lock_id.as_str());
        }
    }

    #[test]
    fn heartbeat_nonexistent_lock_returns_not_found() {
        let registry = LockRegistry::new();
        let fake_id = LockId::from("00000000-0000-0000-0000-000000000000");
        let err = registry
            .heartbeat(&fake_id, NOW)
            .expect_err("heartbeat on missing lock must fail");
        match err {
            AppError::NotFound { .. } => {}
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    #[test]
    fn release_nonexistent_lock_returns_not_found() {
        let registry = LockRegistry::new();
        let fake_id = LockId::from("00000000-0000-0000-0000-000000000000");
        let err = registry
            .release(&fake_id)
            .expect_err("release of missing lock must fail");
        match err {
            AppError::NotFound { .. } => {}
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    #[test]
    fn list_with_scope_filter() {
        let registry = LockRegistry::new();

        let scope_a = LockScope {
            profile: ProfileId::new("p-a"),
            ..scope_profile()
        };
        let scope_b = LockScope {
            profile: ProfileId::new("p-b"),
            ..scope_profile()
        };

        registry
            .acquire(scope_a.clone(), "OpA", 300, NOW)
            .expect("acquire A");
        registry
            .acquire(scope_b, "OpB", 300, NOW)
            .expect("acquire B");

        let filtered = registry.list(Some(&scope_a));
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].op_name, "OpA");
    }
}