//! Durable queue for S3 object deletions that must survive server crashes. use sqlx::PgPool; use uuid::Uuid; use crate::error::Result; /// A pending S3 deletion record. #[derive(Debug, sqlx::FromRow)] pub struct PendingS3Deletion { pub id: Uuid, pub s3_key: String, pub bucket: String, pub source: String, pub attempts: i32, } /// Enqueue S3 keys for deletion. Each key is (s3_key, bucket). #[tracing::instrument(skip_all)] pub async fn enqueue_deletions( pool: &PgPool, keys: &[(String, String)], source: &str, ) -> Result<()> { if keys.is_empty() { return Ok(()); } let s3_keys: Vec<&str> = keys.iter().map(|(k, _)| k.as_str()).collect(); let buckets: Vec<&str> = keys.iter().map(|(_, b)| b.as_str()).collect(); sqlx::query( "INSERT INTO pending_s3_deletions (s3_key, bucket, source) SELECT * FROM unnest($1::text[], $2::text[], $3::text[]) ON CONFLICT DO NOTHING", ) .bind(&s3_keys) .bind(&buckets) .bind(vec![source; keys.len()]) .execute(pool) .await?; Ok(()) } /// Remove completed deletions by ID. #[tracing::instrument(skip_all)] pub async fn remove_completed(pool: &PgPool, ids: &[Uuid]) -> Result<()> { if ids.is_empty() { return Ok(()); } sqlx::query("DELETE FROM pending_s3_deletions WHERE id = ANY($1)") .bind(ids) .execute(pool) .await?; Ok(()) } /// Move permanently-failing deletions off the hot queue into the operator-visible /// dead-letter table (see migration 129). Atomic per the single-statement CTE: /// the rows are deleted from `pending_s3_deletions` and inserted into /// `pending_s3_deletions_dead_letter` in one operation, preserving their /// `created_at`/`attempts`/`last_attempted_at`. Returns the number moved. /// /// Without this the worker only logged and DELETEd such rows, so a malformed /// key / gone bucket / ACL gap left an S3 object orphaned with no durable /// record — the exact leak the dead-letter table was added to prevent. 
#[tracing::instrument(skip_all)]
pub async fn move_to_dead_letter(pool: &PgPool, ids: &[Uuid]) -> Result<u64> {
    // Nothing to move: skip the round trip and report zero rows.
    if ids.is_empty() {
        return Ok(0);
    }

    // One statement does both halves of the move: the CTE deletes the rows and
    // hands them to the INSERT, so a row can never be dropped from the queue
    // without the dead-letter insert being attempted in the same statement.
    let result = sqlx::query(
        " WITH moved AS ( \
              DELETE FROM pending_s3_deletions \
              WHERE id = ANY($1) \
              RETURNING id, s3_key, bucket, source, created_at, attempts, last_attempted_at \
          ) \
          INSERT INTO pending_s3_deletions_dead_letter \
              (id, s3_key, bucket, source, created_at, attempts, last_attempted_at) \
          SELECT id, s3_key, bucket, source, created_at, attempts, last_attempted_at \
          FROM moved \
          ON CONFLICT (id) DO NOTHING ",
    )
    .bind(ids)
    .execute(pool)
    .await?;

    // The count is that of the outer INSERT, so a row skipped by
    // `ON CONFLICT (id) DO NOTHING` is not included.
    Ok(result.rows_affected())
}

/// How a table column references an S3 object key.
#[derive(Clone, Copy)]
enum S3KeyMatch {
    /// `column = $1` — the column stores the bare s3_key.
    Exact,
    /// `column LIKE $2 ESCAPE '\'` — the column stores a full URL ending in the
    /// s3_key (e.g. a CDN cover-image URL). The bound value is
    /// `%/{escaped_key}`: the key with LIKE wildcards escaped, anchored on the
    /// `/` that precedes it in the stored URL.
    UrlSuffix,
}

/// One place a live S3 object key can still be referenced. [`is_s3_key_live`] is
/// generated from this registry, so a new s3_key-bearing table is added in
/// exactly one place AND declares its own bucket — it can't be checked against
/// the wrong bucket (the OTA-key-checked-in-the-main-branch bug this replaced)
/// or silently forgotten by one branch of a hand-written query.
struct S3KeyRef {
    /// Bucket this reference belongs to; compared against the bucket being checked.
    bucket: &'static str,
    /// Table name, interpolated into the generated SQL.
    table: &'static str,
    /// Column name, interpolated into the generated SQL.
    column: &'static str,
    /// Whether the column holds the bare key or a URL ending in it.
    matching: S3KeyMatch,
}

/// The registry. To add a table that stores an S3 key, add a row here.
const S3_KEY_REFS: &[S3KeyRef] = &[ // ── main bucket: creator content ── S3KeyRef { bucket: "main", table: "media_files", column: "s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "versions", column: "s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "items", column: "audio_s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "items", column: "cover_s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "items", column: "video_s3_key", matching: S3KeyMatch::Exact }, // cover_image_url stores a full CDN URL, not a bare key; image keys live // under their own `.../image/` prefix so a suffix match is unambiguous. S3KeyRef { bucket: "main", table: "items", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix }, S3KeyRef { bucket: "main", table: "projects", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix }, // gallery images store the bare s3_key directly (not a URL) → exact match. S3KeyRef { bucket: "main", table: "item_images", column: "s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "project_images", column: "s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "main", table: "content_insertions", column: "storage_key", matching: S3KeyMatch::Exact }, // ── synckit bucket: SyncKit blobs + OTA artifacts (both deterministic keys) ── S3KeyRef { bucket: "synckit", table: "sync_blobs", column: "s3_key", matching: S3KeyMatch::Exact }, S3KeyRef { bucket: "synckit", table: "ota_artifacts", column: "s3_key", matching: S3KeyMatch::Exact }, ]; /// Returns true if any live row in `bucket` still references `s3_key`. Used by /// the deletion worker to detect the delete-then-reupload race: a freshly /// uploaded object reusing a queued key must not be torpedoed by the worker /// draining the queue. The tables checked are [`S3_KEY_REFS`] filtered to /// `bucket` — add a new key-bearing table there. 
/// /// Note: this deliberately does NOT consult `pending_uploads`. An in-flight /// upload whose S3 PUT finished but whose durable row hasn't committed is /// invisible here. That window is safe in practice because the durable row is /// written early for every key class (OTA writes `ota_artifacts` at presign /// time; same-name item/version/media replaces are caught by their confirm /// handlers' idempotency guards before any delete is queued), and a queued /// delete only fires after a >=10-minute staleness check. Adding /// `pending_uploads` would broaden the live-set to ephemeral rows the reaper is /// meant to clean, so it is intentionally excluded. #[tracing::instrument(skip_all)] pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result { let refs: Vec<&S3KeyRef> = S3_KEY_REFS.iter().filter(|r| r.bucket == bucket).collect(); if refs.is_empty() { // Only "main"/"synckit" exist, so an unregistered bucket is unexpected. // Refuse to declare the key dead — don't let the worker delete an object // we have no way to verify. tracing::warn!(bucket, "is_s3_key_live: no registered tables for bucket; treating key as live"); return Ok(true); } // Table/column names are compile-time constants from S3_KEY_REFS (never user // input); the key value is always a bound parameter ($1 exact, $2 url-suffix). let needs_url_suffix = refs.iter().any(|r| matches!(r.matching, S3KeyMatch::UrlSuffix)); let clauses: Vec = refs .iter() .map(|r| match r.matching { S3KeyMatch::Exact => format!("EXISTS(SELECT 1 FROM {} WHERE {} = $1)", r.table, r.column), S3KeyMatch::UrlSuffix => { format!("EXISTS(SELECT 1 FROM {} WHERE {} LIKE $2 ESCAPE '\\')", r.table, r.column) } }) .collect(); let sql = format!("SELECT {}", clauses.join(" OR ")); let mut query = sqlx::query_scalar::<_, bool>(&sql).bind(s3_key); if needs_url_suffix { // Escape LIKE wildcards so a key with a literal `_`/`%` (from a user // folder name) can't false-positive against neighbouring rows. 
let escaped = s3_key.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_"); // Anchor on the leading path separator. Stored values are always // `{cdn_base}/{s3_key}`, so the key is preceded by `/` in every live // URL. Without the anchor, `LIKE '%u/1/cover.png'` also matches an // unrelated row ending `.../OTHERu/1/cover.png`, falsely marking a dead // object live and suppressing its deletion (orphan leak). Requiring the // `/` boundary makes the substring match a true path-suffix match. query = query.bind(format!("%/{escaped}")); } Ok(query.fetch_one(pool).await?) } /// Fetch stale pending deletions (older than min_age, up to limit). /// Atomically increments attempt count. #[tracing::instrument(skip_all)] pub async fn get_stale_pending( pool: &PgPool, min_age: chrono::Duration, limit: i64, ) -> Result> { let cutoff = chrono::Utc::now() - min_age; let rows = sqlx::query_as::<_, PendingS3Deletion>( r#" UPDATE pending_s3_deletions SET attempts = attempts + 1, last_attempted_at = NOW() WHERE id IN ( SELECT id FROM pending_s3_deletions WHERE created_at < $1 ORDER BY created_at LIMIT $2 FOR UPDATE SKIP LOCKED ) RETURNING id, s3_key, bucket, source, attempts "#, ) .bind(cutoff) .bind(limit) .fetch_all(pool) .await?; Ok(rows) }