//! Background maintenance tasks. use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use sqlx::PgPool; use crate::storage::S3Storage; /// One batch of removed-image objects to delete per round. S3's batch-delete /// caps at 1000; staying well under keeps each request small. const PURGE_BATCH: i64 = 500; /// Periodically delete the S3 objects backing removed images whose object has /// not been purged yet. /// /// Closes two gaps left by the inline best-effort delete in /// `remove_image_handler`: images removed before purge-tracking existed (the /// backlog) and removals whose inline delete failed transiently. The sweep is /// convergent — every successfully deleted object's image is marked /// `s3_purged_at` and never revisited, so steady-state work is zero. /// /// Runs once at startup, then every `interval`. Cancel by aborting the task. pub async fn continuously_purge_removed_images(db: PgPool, s3: Arc, interval: Duration) { loop { match purge_removed_image_objects(&db, &s3).await { Ok(0) => {} Ok(n) => tracing::info!(purged = n, "reconcile: purged orphaned image objects"), Err(e) => tracing::error!(error = %e, "reconcile: image purge sweep failed"), } tokio::time::sleep(interval).await; } } /// Delete the S3 objects for all removed-but-unpurged images, in batches. /// Returns the number of image objects purged. A batch that fails to make any /// progress (every key errored) stops the sweep so it retries next interval /// rather than spinning on the same failures. async fn purge_removed_image_objects(db: &PgPool, s3: &S3Storage) -> Result { let mut purged = 0usize; loop { let pending = mt_db::queries::list_images_pending_s3_purge(db, PURGE_BATCH) .await .map_err(|e| format!("db error listing images to purge: {e}"))?; if pending.is_empty() { break; } let keys: Vec = pending.iter().map(|p| p.s3_key.clone()).collect(); let failures = s3.delete_objects(&keys).await?; let failed: HashSet<&str> = failures.iter().map(|(k, _)| k.as_str()).collect(); for (key, msg) in &failures { tracing::warn!(s3_key = %key, error = %msg, "reconcile: failed to delete image object"); } let purged_ids: Vec = pending .iter() .filter(|p| !failed.contains(p.s3_key.as_str())) .map(|p| p.id) .collect(); if purged_ids.is_empty() { // No progress this round — leave the rest for the next interval. break; } purged += purged_ids.len(); mt_db::mutations::mark_images_s3_purged(db, &purged_ids) .await .map_err(|e| format!("db error marking images purged: {e}"))?; if pending.len() < PURGE_BATCH as usize { break; } } Ok(purged) }