Skip to main content

max / makenotwork

2.9 KB · 79 lines History Blame Raw
1 //! Background maintenance tasks.
2
3 use std::collections::HashSet;
4 use std::sync::Arc;
5 use std::time::Duration;
6
7 use sqlx::PgPool;
8
9 use crate::storage::S3Storage;
10
/// Maximum number of removed images handled per purge round.
///
/// It is also the number of keys handed to one S3 batch-delete call. S3's
/// `DeleteObjects` accepts at most 1000 keys per request; staying well under that
/// keeps each request small. The value is `i64` because it is passed directly as
/// the row limit of the pending-images listing query.
const PURGE_BATCH: i64 = 500;

// Reject out-of-range values at compile time. Above 1000, a batch-delete would
// exceed S3's per-request key limit (assuming `S3Storage::delete_objects` sends a
// batch as one request). A non-positive limit would list no rows (or be rejected
// by the database), so no removed image would ever be purged.
const _: () = assert!(
    PURGE_BATCH > 0 && PURGE_BATCH <= 1000,
    "PURGE_BATCH must be in 1..=1000 (S3 DeleteObjects key limit)"
);
14
15 /// Periodically delete the S3 objects backing removed images whose object has
16 /// not been purged yet.
17 ///
18 /// Closes two gaps left by the inline best-effort delete in
19 /// `remove_image_handler`: images removed before purge-tracking existed (the
20 /// backlog) and removals whose inline delete failed transiently. The sweep is
21 /// convergent — every successfully deleted object's image is marked
22 /// `s3_purged_at` and never revisited, so steady-state work is zero.
23 ///
24 /// Runs once at startup, then every `interval`. Cancel by aborting the task.
25 pub async fn continuously_purge_removed_images(db: PgPool, s3: Arc<S3Storage>, interval: Duration) {
26 loop {
27 match purge_removed_image_objects(&db, &s3).await {
28 Ok(0) => {}
29 Ok(n) => tracing::info!(purged = n, "reconcile: purged orphaned image objects"),
30 Err(e) => tracing::error!(error = %e, "reconcile: image purge sweep failed"),
31 }
32 tokio::time::sleep(interval).await;
33 }
34 }
35
36 /// Delete the S3 objects for all removed-but-unpurged images, in batches.
37 /// Returns the number of image objects purged. A batch that fails to make any
38 /// progress (every key errored) stops the sweep so it retries next interval
39 /// rather than spinning on the same failures.
40 async fn purge_removed_image_objects(db: &PgPool, s3: &S3Storage) -> Result<usize, String> {
41 let mut purged = 0usize;
42 loop {
43 let pending = mt_db::queries::list_images_pending_s3_purge(db, PURGE_BATCH)
44 .await
45 .map_err(|e| format!("db error listing images to purge: {e}"))?;
46 if pending.is_empty() {
47 break;
48 }
49
50 let keys: Vec<String> = pending.iter().map(|p| p.s3_key.clone()).collect();
51 let failures = s3.delete_objects(&keys).await?;
52 let failed: HashSet<&str> = failures.iter().map(|(k, _)| k.as_str()).collect();
53 for (key, msg) in &failures {
54 tracing::warn!(s3_key = %key, error = %msg, "reconcile: failed to delete image object");
55 }
56
57 let purged_ids: Vec<uuid::Uuid> = pending
58 .iter()
59 .filter(|p| !failed.contains(p.s3_key.as_str()))
60 .map(|p| p.id)
61 .collect();
62
63 if purged_ids.is_empty() {
64 // No progress this round — leave the rest for the next interval.
65 break;
66 }
67
68 purged += purged_ids.len();
69 mt_db::mutations::mark_images_s3_purged(db, &purged_ids)
70 .await
71 .map_err(|e| format!("db error marking images purged: {e}"))?;
72
73 if pending.len() < PURGE_BATCH as usize {
74 break;
75 }
76 }
77 Ok(purged)
78 }
79