1use std::sync::Arc;
24
25use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
26use serde::{Deserialize, Serialize};
27
28use crate::error::AppError;
29use crate::ids::{BucketId, ProfileId};
30
/// redb table definition for the `multipart_active` table.
///
/// Keys are the composite strings built by `MultipartTable::make_key`
/// (`profile \0 bucket \0 key`); values are JSON-encoded [`MultipartRecord`] bytes.
const MULTIPART_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("multipart_active");
37
/// One locally tracked multipart upload, persisted as JSON in the `multipart_active` table.
///
/// The table key is derived from `profile_id`, `bucket` and `key`, so at most one record
/// exists per (profile, bucket, object key) combination.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MultipartRecord {
    /// Identifier of the upload; `MultipartTable::find_by_upload_id` matches on this field.
    pub upload_id: String,
    /// Timestamp at which the upload was started.
    // NOTE(review): the unit is not established by the code visible here; the test fixture
    // value (1_700_000_000_000) suggests epoch milliseconds — confirm.
    pub started_at: i64,
    /// Free-form label describing where the upload originated (the tests use `"brows3r"`).
    // NOTE(review): not read by any code visible in this file — confirm intended use.
    pub source: String,
    /// Profile the upload belongs to; first component of the composite table key.
    pub profile_id: ProfileId,
    /// Bucket the upload targets; second component of the composite table key.
    pub bucket: BucketId,
    /// Object key being uploaded; third component of the composite table key.
    pub key: String,
}
63
/// Accessor for the `multipart_active` redb table.
///
/// Holds a shared handle to the database; cloning this value only clones the [`Arc`].
#[derive(Clone)]
pub struct MultipartTable {
    /// Shared redb database in which the table lives.
    db: Arc<Database>,
}
75
76impl MultipartTable {
77 pub fn new(db: Arc<Database>) -> Result<Self, AppError> {
83 let txn = db.begin_write().map_err(|e| AppError::Internal {
85 trace_id: format!("multipart redb begin_write failed: {e}"),
86 })?;
87 {
88 txn.open_table(MULTIPART_TABLE)
89 .map_err(|e| AppError::Internal {
90 trace_id: format!("multipart redb open_table failed: {e}"),
91 })?;
92 }
93 txn.commit().map_err(|e| AppError::Internal {
94 trace_id: format!("multipart redb commit failed: {e}"),
95 })?;
96
97 Ok(Self { db })
98 }
99
100 fn make_key(profile: &ProfileId, bucket: &BucketId, key: &str) -> String {
105 format!("{}\x00{}\x00{}", profile.as_str(), bucket.as_str(), key)
106 }
107
108 pub fn record(&self, rec: &MultipartRecord) -> Result<(), AppError> {
116 let composite = Self::make_key(&rec.profile_id, &rec.bucket, &rec.key);
117 let bytes = serde_json::to_vec(rec).map_err(|e| AppError::Internal {
118 trace_id: format!("multipart serialize failed: {e}"),
119 })?;
120
121 let txn = self.db.begin_write().map_err(|e| AppError::Internal {
122 trace_id: format!("multipart redb begin_write failed: {e}"),
123 })?;
124 {
125 let mut table = txn
126 .open_table(MULTIPART_TABLE)
127 .map_err(|e| AppError::Internal {
128 trace_id: format!("multipart redb open_table failed: {e}"),
129 })?;
130 table
131 .insert(composite.as_str(), bytes.as_slice())
132 .map_err(|e| AppError::Internal {
133 trace_id: format!("multipart redb insert failed: {e}"),
134 })?;
135 }
136 txn.commit().map_err(|e| AppError::Internal {
137 trace_id: format!("multipart redb commit failed: {e}"),
138 })
139 }
140
141 pub fn remove(
149 &self,
150 profile: &ProfileId,
151 bucket: &BucketId,
152 key: &str,
153 ) -> Result<(), AppError> {
154 let composite = Self::make_key(profile, bucket, key);
155
156 let txn = self.db.begin_write().map_err(|e| AppError::Internal {
157 trace_id: format!("multipart redb begin_write failed: {e}"),
158 })?;
159 {
160 let mut table = txn
161 .open_table(MULTIPART_TABLE)
162 .map_err(|e| AppError::Internal {
163 trace_id: format!("multipart redb open_table failed: {e}"),
164 })?;
165 let _ = table
166 .remove(composite.as_str())
167 .map_err(|e| AppError::Internal {
168 trace_id: format!("multipart redb remove failed: {e}"),
169 })?;
170 }
171 txn.commit().map_err(|e| AppError::Internal {
172 trace_id: format!("multipart redb commit failed: {e}"),
173 })
174 }
175
176 pub fn list_all(&self) -> Result<Vec<MultipartRecord>, AppError> {
182 let txn = self.db.begin_read().map_err(|e| AppError::Internal {
183 trace_id: format!("multipart redb begin_read failed: {e}"),
184 })?;
185 let table = txn
186 .open_table(MULTIPART_TABLE)
187 .map_err(|e| AppError::Internal {
188 trace_id: format!("multipart redb open_table failed: {e}"),
189 })?;
190
191 let mut records = Vec::new();
192 for entry in table.iter().map_err(|e| AppError::Internal {
193 trace_id: format!("multipart redb iter failed: {e}"),
194 })? {
195 let (_, v) = entry.map_err(|e| AppError::Internal {
196 trace_id: format!("multipart redb iter entry failed: {e}"),
197 })?;
198 let rec: MultipartRecord =
199 serde_json::from_slice(v.value()).map_err(|e| AppError::Internal {
200 trace_id: format!("multipart deserialize failed: {e}"),
201 })?;
202 records.push(rec);
203 }
204 Ok(records)
205 }
206
207 pub fn list_for_profile(&self, profile: &ProfileId) -> Result<Vec<MultipartRecord>, AppError> {
213 let all = self.list_all()?;
214 Ok(all
215 .into_iter()
216 .filter(|r| &r.profile_id == profile)
217 .collect())
218 }
219
220 pub fn find_by_upload_id(&self, upload_id: &str) -> Result<Option<MultipartRecord>, AppError> {
229 let all = self.list_all()?;
230 Ok(all.into_iter().find(|r| r.upload_id == upload_id))
231 }
232}
233
234#[derive(Clone)]
242pub struct MultipartTableHandle(pub MultipartTable);
243
244impl MultipartTableHandle {
245 pub fn new(table: MultipartTable) -> Self {
246 Self(table)
247 }
248}
249
/// Origin classification of a multipart upload found in a bucket.
///
/// Serialized with camelCase variant names (`"brows3r"`, `"unknown"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum MultipartSource {
    /// The upload id has a matching record in the local multipart table.
    Brows3r,
    /// No local record matches the upload id; aborting such an upload requires explicit
    /// confirmation (see `abort_multipart_upload`).
    Unknown,
}
270
/// An in-progress multipart upload as returned by `scan_multipart_uploads`.
///
/// Serialized with camelCase field names (e.g. `uploadId`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MultipartUpload {
    /// Upload id reported by the listing.
    pub upload_id: String,
    /// Object key the upload targets.
    pub key: String,
    /// Initiation time in whole seconds since the Unix epoch, or `None` when the listing
    /// carried no timestamp or it could not be converted.
    pub initiated: Option<i64>,
    /// Whether the upload is tracked locally or of unknown origin.
    pub source: MultipartSource,
    /// Bucket that was scanned.
    pub bucket: BucketId,
}
292
293pub async fn scan_multipart_uploads(
310 client: &aws_sdk_s3::Client,
311 bucket: &BucketId,
312 multipart_table: &MultipartTable,
313 older_than_secs: Option<u64>,
314) -> Result<Vec<MultipartUpload>, AppError> {
315 let resp = client
316 .list_multipart_uploads()
317 .bucket(bucket.as_str())
318 .send()
319 .await
320 .map_err(|e| AppError::Network {
321 source: format!("list_multipart_uploads failed: {e}"),
322 })?;
323
324 let now_secs = std::time::SystemTime::now()
325 .duration_since(std::time::UNIX_EPOCH)
326 .map(|d| d.as_secs())
327 .unwrap_or(0);
328
329 let mut result = Vec::new();
330
331 for upload in resp.uploads() {
332 let upload_id = match upload.upload_id() {
333 Some(id) => id.to_owned(),
334 None => continue,
335 };
336 let key = match upload.key() {
337 Some(k) => k.to_owned(),
338 None => continue,
339 };
340
341 let initiated_secs: Option<i64> = upload
343 .initiated()
344 .and_then(|dt| dt.to_millis().ok())
345 .map(|ms| ms / 1000);
346
347 if let (Some(threshold), Some(init_secs)) = (older_than_secs, initiated_secs) {
349 let age_secs = now_secs.saturating_sub(init_secs as u64);
350 if age_secs < threshold {
351 continue;
352 }
353 }
354
355 let source = if multipart_table.find_by_upload_id(&upload_id)?.is_some() {
356 MultipartSource::Brows3r
357 } else {
358 MultipartSource::Unknown
359 };
360
361 result.push(MultipartUpload {
362 upload_id,
363 key,
364 initiated: initiated_secs,
365 source,
366 bucket: bucket.clone(),
367 });
368 }
369
370 Ok(result)
371}
372
373pub async fn abort_multipart_upload(
392 client: &aws_sdk_s3::Client,
393 bucket: &BucketId,
394 key: &str,
395 upload_id: &str,
396 source: MultipartSource,
397 multipart_table: &MultipartTable,
398 profile_id: &ProfileId,
399 confirmed_unknown: bool,
400) -> Result<(), AppError> {
401 if source == MultipartSource::Unknown && !confirmed_unknown {
402 return Err(AppError::Validation {
403 field: "confirmedUnknown".to_string(),
404 hint: "Aborting an unknown multipart upload requires explicit confirmation".to_string(),
405 });
406 }
407
408 client
409 .abort_multipart_upload()
410 .bucket(bucket.as_str())
411 .key(key)
412 .upload_id(upload_id)
413 .send()
414 .await
415 .map_err(|e| AppError::Network {
416 source: format!("abort_multipart_upload failed: {e}"),
417 })?;
418
419 if source == MultipartSource::Brows3r {
421 multipart_table.remove(profile_id, bucket, key)?;
422 }
423
424 Ok(())
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{BucketId, ProfileId};
    use std::sync::Arc;
    use tempfile::tempdir;

    /// Creates the redb database file at `path`, panicking if that fails.
    fn open_db(path: &std::path::Path) -> Arc<Database> {
        let db = Database::create(path).expect("test db must open");
        Arc::new(db)
    }

    /// Builds a [`MultipartTable`] backed by `file_name` inside a new temporary directory.
    ///
    /// The directory guard is returned so the caller can keep it alive for the whole test.
    fn fresh_table(file_name: &str) -> (tempfile::TempDir, MultipartTable) {
        let dir = tempdir().unwrap();
        let db = open_db(&dir.path().join(file_name));
        let table = MultipartTable::new(Arc::clone(&db)).expect("table must open");
        (dir, table)
    }

    /// Profile id used by the fixtures.
    fn profile() -> ProfileId {
        ProfileId::new("p1")
    }

    /// Bucket id used by the fixtures.
    fn bucket() -> BucketId {
        BucketId::new("my-bucket")
    }

    /// Fixture record for `upload_id` / `key` under the default profile and bucket.
    fn sample_record(upload_id: &str, key: &str) -> MultipartRecord {
        MultipartRecord {
            upload_id: upload_id.to_owned(),
            started_at: 1_700_000_000_000,
            source: "brows3r".to_owned(),
            profile_id: profile(),
            bucket: bucket(),
            key: key.to_owned(),
        }
    }

    /// Classifies `upload_id` the same way the scan does: known to the table or not.
    fn classify(table: &MultipartTable, upload_id: &str) -> MultipartSource {
        let found = table
            .find_by_upload_id(upload_id)
            .expect("find must not error");
        match found {
            Some(_) => MultipartSource::Brows3r,
            None => MultipartSource::Unknown,
        }
    }

    /// Local copy of the confirmation guard applied before aborting an upload.
    fn abort_guard(source: MultipartSource, confirmed_unknown: bool) -> Result<(), AppError> {
        if source == MultipartSource::Unknown && !confirmed_unknown {
            return Err(AppError::Validation {
                field: "confirmedUnknown".to_string(),
                hint: "Aborting an unknown multipart upload requires explicit confirmation"
                    .to_string(),
            });
        }
        Ok(())
    }

    /// Current time in whole seconds since the Unix epoch (0 if the clock is before it).
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }

    #[test]
    fn redb_round_trip_record_list_remove() {
        let (_dir, table) = fresh_table("test.redb");

        let rec = sample_record("upload-abc", "data/file.bin");
        table.record(&rec).expect("record must succeed");

        let stored = table.list_all().expect("list_all must succeed");
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0], rec);

        table
            .remove(&profile(), &bucket(), "data/file.bin")
            .expect("remove must succeed");

        let remaining = table
            .list_all()
            .expect("list_all after remove must succeed");
        assert!(remaining.is_empty(), "table must be empty after remove");
    }

    #[test]
    fn record_overwrites_same_key() {
        let (_dir, table) = fresh_table("test2.redb");

        let first = sample_record("upload-1", "obj.bin");
        let second = MultipartRecord {
            started_at: 1_700_000_001_000,
            ..sample_record("upload-2", "obj.bin")
        };

        table.record(&first).expect("first record must succeed");
        table.record(&second).expect("overwrite must succeed");

        let stored = table.list_all().expect("list_all must succeed");
        assert_eq!(stored.len(), 1, "overwrite must not duplicate");
        assert_eq!(stored[0].upload_id, "upload-2", "second record must win");
    }

    #[test]
    fn list_for_profile_filters_by_profile() {
        let (_dir, table) = fresh_table("test3.redb");

        let other_profile = ProfileId::new("other");
        let rec_p1 = sample_record("up-p1", "a.bin");
        let rec_other = MultipartRecord {
            profile_id: other_profile.clone(),
            ..sample_record("up-other", "b.bin")
        };

        table.record(&rec_p1).expect("record p1 must succeed");
        table.record(&rec_other).expect("record other must succeed");

        let for_p1 = table
            .list_for_profile(&profile())
            .expect("list_for_profile must succeed");
        assert_eq!(for_p1.len(), 1);
        assert_eq!(for_p1[0].upload_id, "up-p1");

        let for_other = table
            .list_for_profile(&other_profile)
            .expect("list_for_profile must succeed");
        assert_eq!(for_other.len(), 1);
        assert_eq!(for_other[0].upload_id, "up-other");
    }

    #[test]
    fn find_by_upload_id_returns_matching_record() {
        let (_dir, table) = fresh_table("test4.redb");

        let rec = sample_record("uid-xyz", "prefix/obj.bin");
        table.record(&rec).expect("record must succeed");

        let hit = table
            .find_by_upload_id("uid-xyz")
            .expect("find must not error");
        assert!(hit.is_some(), "record must be found by upload_id");
        assert_eq!(hit.unwrap().key, "prefix/obj.bin");

        let miss = table
            .find_by_upload_id("nonexistent")
            .expect("find must not error");
        assert!(miss.is_none(), "missing upload_id must return None");
    }

    #[test]
    fn remove_missing_key_is_noop() {
        let (_dir, table) = fresh_table("test5.redb");

        table
            .remove(&profile(), &bucket(), "nonexistent.bin")
            .expect("remove must be no-op on missing key");
    }

    #[test]
    fn classify_brows3r_upload_by_upload_id() {
        let (_dir, table) = fresh_table("test_class.redb");

        let rec = sample_record("brows3r-upload-id", "obj.bin");
        table.record(&rec).expect("record must succeed");

        assert_eq!(
            classify(&table, "brows3r-upload-id"),
            MultipartSource::Brows3r
        );
    }

    #[test]
    fn classify_unknown_upload_when_not_in_table() {
        let (_dir, table) = fresh_table("test_unknown.redb");

        assert_eq!(
            classify(&table, "foreign-upload-id"),
            MultipartSource::Unknown
        );
    }

    #[test]
    fn age_filter_excludes_young_uploads() {
        let now_secs: u64 = unix_now_secs();
        let threshold: u64 = 3600;

        // Started 30 minutes ago: younger than the one-hour threshold.
        let young_init_secs = (now_secs - 1800) as i64;
        let age_young = now_secs.saturating_sub(young_init_secs as u64);
        assert!(
            age_young < threshold,
            "young upload must be below threshold"
        );

        // Started two hours ago: at or beyond the threshold.
        let old_init_secs = (now_secs.saturating_sub(7200)) as i64;
        let age_old = now_secs.saturating_sub(old_init_secs as u64);
        assert!(
            age_old >= threshold,
            "old upload must meet or exceed threshold"
        );
    }

    #[test]
    fn abort_guard_rejects_unknown_without_confirmation() {
        let result = abort_guard(MultipartSource::Unknown, false);

        assert!(
            result.is_err(),
            "guard must reject Unknown without confirmation"
        );
        match result.unwrap_err() {
            AppError::Validation { field, .. } => {
                assert_eq!(field, "confirmedUnknown");
            }
            _ => panic!("expected Validation error"),
        }
    }

    #[test]
    fn abort_guard_allows_unknown_with_confirmation() {
        let result = abort_guard(MultipartSource::Unknown, true);

        assert!(result.is_ok(), "guard must allow Unknown with confirmation");
    }

    #[test]
    fn abort_guard_allows_brows3r_without_confirmation() {
        let result = abort_guard(MultipartSource::Brows3r, false);

        assert!(
            result.is_ok(),
            "Brows3r uploads must not require confirmation"
        );
    }
}