| 1 |
|
| 2 |
|
| 3 |
use std::collections::HashSet; |
| 4 |
use std::sync::Arc; |
| 5 |
use std::time::Duration; |
| 6 |
|
| 7 |
use sqlx::PgPool; |
| 8 |
|
| 9 |
use crate::storage::S3Storage; |
| 10 |
|
| 11 |
|
| 12 |
|
| 13 |
/// Row limit handed to `list_images_pending_s3_purge` on each pass of the
/// purge loop. A pass that returns fewer rows than this ends the sweep.
const PURGE_BATCH: i64 = 500;
| 14 |
|
| 15 |
|
| 16 |
|
| 17 |
|
| 18 |
|
| 19 |
|
| 20 |
|
| 21 |
|
| 22 |
|
| 23 |
|
| 24 |
|
| 25 |
pub async fn continuously_purge_removed_images(db: PgPool, s3: Arc<S3Storage>, interval: Duration) { |
| 26 |
loop { |
| 27 |
match purge_removed_image_objects(&db, &s3).await { |
| 28 |
Ok(0) => {} |
| 29 |
Ok(n) => tracing::info!(purged = n, "reconcile: purged orphaned image objects"), |
| 30 |
Err(e) => tracing::error!(error = %e, "reconcile: image purge sweep failed"), |
| 31 |
} |
| 32 |
tokio::time::sleep(interval).await; |
| 33 |
} |
| 34 |
} |
| 35 |
|
| 36 |
|
| 37 |
|
| 38 |
|
| 39 |
|
| 40 |
async fn purge_removed_image_objects(db: &PgPool, s3: &S3Storage) -> Result<usize, String> { |
| 41 |
let mut purged = 0usize; |
| 42 |
loop { |
| 43 |
let pending = mt_db::queries::list_images_pending_s3_purge(db, PURGE_BATCH) |
| 44 |
.await |
| 45 |
.map_err(|e| format!("db error listing images to purge: {e}"))?; |
| 46 |
if pending.is_empty() { |
| 47 |
break; |
| 48 |
} |
| 49 |
|
| 50 |
let keys: Vec<String> = pending.iter().map(|p| p.s3_key.clone()).collect(); |
| 51 |
let failures = s3.delete_objects(&keys).await?; |
| 52 |
let failed: HashSet<&str> = failures.iter().map(|(k, _)| k.as_str()).collect(); |
| 53 |
for (key, msg) in &failures { |
| 54 |
tracing::warn!(s3_key = %key, error = %msg, "reconcile: failed to delete image object"); |
| 55 |
} |
| 56 |
|
| 57 |
let purged_ids: Vec<uuid::Uuid> = pending |
| 58 |
.iter() |
| 59 |
.filter(|p| !failed.contains(p.s3_key.as_str())) |
| 60 |
.map(|p| p.id) |
| 61 |
.collect(); |
| 62 |
|
| 63 |
if purged_ids.is_empty() { |
| 64 |
|
| 65 |
break; |
| 66 |
} |
| 67 |
|
| 68 |
purged += purged_ids.len(); |
| 69 |
mt_db::mutations::mark_images_s3_purged(db, &purged_ids) |
| 70 |
.await |
| 71 |
.map_err(|e| format!("db error marking images purged: {e}"))?; |
| 72 |
|
| 73 |
if pending.len() < PURGE_BATCH as usize { |
| 74 |
break; |
| 75 |
} |
| 76 |
} |
| 77 |
Ok(purged) |
| 78 |
} |
| 79 |
|