Skip to main content

max / makenotwork

9.2 KB · 215 lines History Blame Raw
1 //! Durable queue for S3 object deletions that must survive server crashes.
2
3 use sqlx::PgPool;
4 use uuid::Uuid;
5 use crate::error::Result;
6
7 /// A pending S3 deletion record.
8 #[derive(Debug, sqlx::FromRow)]
9 pub struct PendingS3Deletion {
10 pub id: Uuid,
11 pub s3_key: String,
12 pub bucket: String,
13 pub source: String,
14 pub attempts: i32,
15 }
16
17 /// Enqueue S3 keys for deletion. Each key is (s3_key, bucket).
18 #[tracing::instrument(skip_all)]
19 pub async fn enqueue_deletions(
20 pool: &PgPool,
21 keys: &[(String, String)],
22 source: &str,
23 ) -> Result<()> {
24 if keys.is_empty() {
25 return Ok(());
26 }
27 let s3_keys: Vec<&str> = keys.iter().map(|(k, _)| k.as_str()).collect();
28 let buckets: Vec<&str> = keys.iter().map(|(_, b)| b.as_str()).collect();
29 sqlx::query(
30 "INSERT INTO pending_s3_deletions (s3_key, bucket, source) SELECT * FROM unnest($1::text[], $2::text[], $3::text[]) ON CONFLICT DO NOTHING",
31 )
32 .bind(&s3_keys)
33 .bind(&buckets)
34 .bind(vec![source; keys.len()])
35 .execute(pool)
36 .await?;
37 Ok(())
38 }
39
40 /// Remove completed deletions by ID.
41 #[tracing::instrument(skip_all)]
42 pub async fn remove_completed(pool: &PgPool, ids: &[Uuid]) -> Result<()> {
43 if ids.is_empty() {
44 return Ok(());
45 }
46 sqlx::query("DELETE FROM pending_s3_deletions WHERE id = ANY($1)")
47 .bind(ids)
48 .execute(pool)
49 .await?;
50 Ok(())
51 }
52
53 /// Move permanently-failing deletions off the hot queue into the operator-visible
54 /// dead-letter table (see migration 129). Atomic per the single-statement CTE:
55 /// the rows are deleted from `pending_s3_deletions` and inserted into
56 /// `pending_s3_deletions_dead_letter` in one operation, preserving their
57 /// `created_at`/`attempts`/`last_attempted_at`. Returns the number moved.
58 ///
59 /// Without this the worker only logged and DELETEd such rows, so a malformed
60 /// key / gone bucket / ACL gap left an S3 object orphaned with no durable
61 /// record — the exact leak the dead-letter table was added to prevent.
62 #[tracing::instrument(skip_all)]
63 pub async fn move_to_dead_letter(pool: &PgPool, ids: &[Uuid]) -> Result<u64> {
64 if ids.is_empty() {
65 return Ok(0);
66 }
67 let result = sqlx::query(
68 r#"
69 WITH moved AS (
70 DELETE FROM pending_s3_deletions
71 WHERE id = ANY($1)
72 RETURNING id, s3_key, bucket, source, created_at, attempts, last_attempted_at
73 )
74 INSERT INTO pending_s3_deletions_dead_letter
75 (id, s3_key, bucket, source, created_at, attempts, last_attempted_at)
76 SELECT id, s3_key, bucket, source, created_at, attempts, last_attempted_at
77 FROM moved
78 ON CONFLICT (id) DO NOTHING
79 "#,
80 )
81 .bind(ids)
82 .execute(pool)
83 .await?;
84 Ok(result.rows_affected())
85 }
86
/// How a table column references an S3 object key.
#[derive(Clone, Copy)]
enum S3KeyMatch {
    /// `column = $1` — the column stores the bare s3_key.
    Exact,
    /// `column LIKE $2 ESCAPE '\'` — the column stores a full URL ending in the
    /// s3_key (e.g. a CDN cover-image URL). The bound value is
    /// `%/{escaped_key}`: the key with its LIKE metacharacters (`\`, `%`, `_`)
    /// backslash-escaped, anchored on the `/` that precedes it in the URL so
    /// that only a whole-path suffix matches.
    UrlSuffix,
}
96
97 /// One place a live S3 object key can still be referenced. [`is_s3_key_live`] is
98 /// generated from this registry, so a new s3_key-bearing table is added in
99 /// exactly one place AND declares its own bucket — it can't be checked against
100 /// the wrong bucket (the OTA-key-checked-in-the-main-branch bug this replaced)
101 /// or silently forgotten by one branch of a hand-written query.
102 struct S3KeyRef {
103 bucket: &'static str,
104 table: &'static str,
105 column: &'static str,
106 matching: S3KeyMatch,
107 }
108
109 /// The registry. To add a table that stores an S3 key, add a row here.
110 const S3_KEY_REFS: &[S3KeyRef] = &[
111 // ── main bucket: creator content ──
112 S3KeyRef { bucket: "main", table: "media_files", column: "s3_key", matching: S3KeyMatch::Exact },
113 S3KeyRef { bucket: "main", table: "versions", column: "s3_key", matching: S3KeyMatch::Exact },
114 S3KeyRef { bucket: "main", table: "items", column: "audio_s3_key", matching: S3KeyMatch::Exact },
115 S3KeyRef { bucket: "main", table: "items", column: "cover_s3_key", matching: S3KeyMatch::Exact },
116 S3KeyRef { bucket: "main", table: "items", column: "video_s3_key", matching: S3KeyMatch::Exact },
117 // cover_image_url stores a full CDN URL, not a bare key; image keys live
118 // under their own `.../image/` prefix so a suffix match is unambiguous.
119 S3KeyRef { bucket: "main", table: "items", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix },
120 S3KeyRef { bucket: "main", table: "projects", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix },
121 // gallery images store the bare s3_key directly (not a URL) → exact match.
122 S3KeyRef { bucket: "main", table: "item_images", column: "s3_key", matching: S3KeyMatch::Exact },
123 S3KeyRef { bucket: "main", table: "project_images", column: "s3_key", matching: S3KeyMatch::Exact },
124 S3KeyRef { bucket: "main", table: "content_insertions", column: "storage_key", matching: S3KeyMatch::Exact },
125 // ── synckit bucket: SyncKit blobs + OTA artifacts (both deterministic keys) ──
126 S3KeyRef { bucket: "synckit", table: "sync_blobs", column: "s3_key", matching: S3KeyMatch::Exact },
127 S3KeyRef { bucket: "synckit", table: "ota_artifacts", column: "s3_key", matching: S3KeyMatch::Exact },
128 ];
129
130 /// Returns true if any live row in `bucket` still references `s3_key`. Used by
131 /// the deletion worker to detect the delete-then-reupload race: a freshly
132 /// uploaded object reusing a queued key must not be torpedoed by the worker
133 /// draining the queue. The tables checked are [`S3_KEY_REFS`] filtered to
134 /// `bucket` — add a new key-bearing table there.
135 ///
136 /// Note: this deliberately does NOT consult `pending_uploads`. An in-flight
137 /// upload whose S3 PUT finished but whose durable row hasn't committed is
138 /// invisible here. That window is safe in practice because the durable row is
139 /// written early for every key class (OTA writes `ota_artifacts` at presign
140 /// time; same-name item/version/media replaces are caught by their confirm
141 /// handlers' idempotency guards before any delete is queued), and a queued
142 /// delete only fires after a >=10-minute staleness check. Adding
143 /// `pending_uploads` would broaden the live-set to ephemeral rows the reaper is
144 /// meant to clean, so it is intentionally excluded.
145 #[tracing::instrument(skip_all)]
146 pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result<bool> {
147 let refs: Vec<&S3KeyRef> = S3_KEY_REFS.iter().filter(|r| r.bucket == bucket).collect();
148 if refs.is_empty() {
149 // Only "main"/"synckit" exist, so an unregistered bucket is unexpected.
150 // Refuse to declare the key dead — don't let the worker delete an object
151 // we have no way to verify.
152 tracing::warn!(bucket, "is_s3_key_live: no registered tables for bucket; treating key as live");
153 return Ok(true);
154 }
155
156 // Table/column names are compile-time constants from S3_KEY_REFS (never user
157 // input); the key value is always a bound parameter ($1 exact, $2 url-suffix).
158 let needs_url_suffix = refs.iter().any(|r| matches!(r.matching, S3KeyMatch::UrlSuffix));
159 let clauses: Vec<String> = refs
160 .iter()
161 .map(|r| match r.matching {
162 S3KeyMatch::Exact => format!("EXISTS(SELECT 1 FROM {} WHERE {} = $1)", r.table, r.column),
163 S3KeyMatch::UrlSuffix => {
164 format!("EXISTS(SELECT 1 FROM {} WHERE {} LIKE $2 ESCAPE '\\')", r.table, r.column)
165 }
166 })
167 .collect();
168 let sql = format!("SELECT {}", clauses.join(" OR "));
169
170 let mut query = sqlx::query_scalar::<_, bool>(&sql).bind(s3_key);
171 if needs_url_suffix {
172 // Escape LIKE wildcards so a key with a literal `_`/`%` (from a user
173 // folder name) can't false-positive against neighbouring rows.
174 let escaped = s3_key.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_");
175 // Anchor on the leading path separator. Stored values are always
176 // `{cdn_base}/{s3_key}`, so the key is preceded by `/` in every live
177 // URL. Without the anchor, `LIKE '%u/1/cover.png'` also matches an
178 // unrelated row ending `.../OTHERu/1/cover.png`, falsely marking a dead
179 // object live and suppressing its deletion (orphan leak). Requiring the
180 // `/` boundary makes the substring match a true path-suffix match.
181 query = query.bind(format!("%/{escaped}"));
182 }
183 Ok(query.fetch_one(pool).await?)
184 }
185
186 /// Fetch stale pending deletions (older than min_age, up to limit).
187 /// Atomically increments attempt count.
188 #[tracing::instrument(skip_all)]
189 pub async fn get_stale_pending(
190 pool: &PgPool,
191 min_age: chrono::Duration,
192 limit: i64,
193 ) -> Result<Vec<PendingS3Deletion>> {
194 let cutoff = chrono::Utc::now() - min_age;
195 let rows = sqlx::query_as::<_, PendingS3Deletion>(
196 r#"
197 UPDATE pending_s3_deletions
198 SET attempts = attempts + 1, last_attempted_at = NOW()
199 WHERE id IN (
200 SELECT id FROM pending_s3_deletions
201 WHERE created_at < $1
202 ORDER BY created_at
203 LIMIT $2
204 FOR UPDATE SKIP LOCKED
205 )
206 RETURNING id, s3_key, bucket, source, attempts
207 "#,
208 )
209 .bind(cutoff)
210 .bind(limit)
211 .fetch_all(pool)
212 .await?;
213 Ok(rows)
214 }
215