Skip to main content

max / makenotwork

Add pending_s3_deletions durable queue for crash-gap safety

All 5 S3 deletion paths now enqueue keys before doing destructive work:

1. User CASCADE cleanup (main + synckit buckets, prefixes)
2. Soft-delete item purge (item + version S3 keys)
3. OTA release delete (synckit artifacts)
4. Media file delete (main bucket)
5. Content insertion delete (main bucket)

If the server crashes between S3 delete and DB delete, the retry job picks up orphaned keys every 5 minutes (10-min age, batch 100). S3 deletes are idempotent so duplicate retries are safe.

Enqueue failure in scheduler paths bails before any destructive work; in route handlers it logs and continues to avoid blocking user requests.

Migration 105: pending_s3_deletions table with attempts tracking.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-09 03:28 UTC
Commit: bac01d6f98775b59d7c6b4c7734b051bdde21420
Parent: ba3bbac
9 files changed, +252 insertions, -43 deletions
@@ -161,7 +161,7 @@ Two-pass fuzz: initial scan + deep verification. Items marked REFUTED were dispr
161 161 - [x] MODERATE: Dead webhook retry storm — added `AND attempts < 5` guard to `get_retryable_events` query (`db/webhook_events.rs`)
162 162 - [x] MINOR: WAM client double-slash URL — `trim_end_matches('/')` on base_url (`wam_client.rs`)
163 163 - [x] NOTE: `COUNT(*) LIMIT 1` no-op in monitor — changed to `SELECT EXISTS(...)` (`monitor.rs`)
164 - - [ ] MODERATE: S3 delete then DB delete crash gap — partial S3 prefix deletion + DB delete on retry = permanent orphans (`scheduler/cleanup.rs:26-74`) — see Backlog: `pending_s3_deletions` plan
164 + - [x] MODERATE: S3 delete then DB delete crash gap — implemented `pending_s3_deletions` durable queue across all 5 deletion paths (migration 105)
165 165 - [x] MINOR: Stale refund escalation sends duplicate alerts — `mark_escalated` now runs before alerts, skips on failure (`scheduler/webhooks.rs`)
166 166 - [ ] NOTE: Announcement emails lost on server restart — no delivery persistence (`scheduler/announcements.rs:59-85`)
167 167 - [ ] NOTE: Duplicate onboarding emails on step-advance DB failure (`scheduler/announcements.rs:210-222`)
@@ -217,15 +217,15 @@ the DB references deleted S3 objects (or S3 objects are orphaned with no DB refe
217 217
218 218 **Implementation:**
219 219
220 - - [ ] Migration (`102_pending_s3_deletions.sql`): `pending_s3_deletions` table with `id UUID PK`, `s3_key TEXT NOT NULL`, `bucket TEXT NOT NULL DEFAULT 'main'`, `source TEXT NOT NULL`, `created_at TIMESTAMPTZ`, `attempts INT DEFAULT 0`, `last_attempted_at TIMESTAMPTZ`. Index on `created_at`. No FK to users (must survive CASCADE).
221 - - [ ] DB module (`db/pending_s3_deletions.rs`): `enqueue_deletions(pool, keys: &[(String, String)], source)` bulk INSERT via unnest; `remove_completed(pool, ids: &[Uuid])` DELETE by id; `get_stale_pending(pool, min_age, limit) -> Vec<PendingS3Deletion>` SELECT + UPDATE attempts atomically.
222 - - [ ] Update `cleanup_user_s3_and_delete`: collect all S3 prefixes (user, projects, synckit apps) -> enqueue -> S3 delete (best-effort) -> CASCADE delete user -> dequeue completed. If enqueue fails, bail before any S3 work.
223 - - [ ] Update `purge_expired_deleted_items`: query S3 keys -> enqueue -> S3 delete -> DB purge -> dequeue. If enqueue fails, skip entirely (items stay soft-deleted, retry next tick).
224 - - [ ] Update `delete_release` (ota.rs): enqueue artifact keys -> S3 delete -> DB delete -> dequeue.
225 - - [ ] Update media file delete: enqueue -> S3 delete -> storage decrement -> DB delete -> dequeue.
226 - - [ ] Update insertion delete: enqueue -> S3 delete -> DB delete -> dequeue.
227 - - [ ] Scheduler retry job (`retry_pending_s3_deletions`): fetch rows older than 10 min (batch 100), attempt S3 delete (prefix if trailing `/`, else single object), dequeue on success. Run in sandbox cadence (every 5 ticks). Log warning at attempts > 5.
228 - - [ ] Wire into `scheduler/mod.rs` in sandbox cadence block. Record job run.
220 + - [x] Migration 105: `pending_s3_deletions` table
221 + - [x] DB module `db/pending_s3_deletions.rs`: enqueue, remove, get_stale
222 + - [x] `cleanup_user_s3_and_delete`: enqueue before S3/CASCADE, bail on failure
223 + - [x] `purge_expired_deleted_items`: enqueue before S3/purge, bail on failure
224 + - [x] `delete_release` (ota.rs): enqueue before S3/DB delete
225 + - [x] media file delete: enqueue before S3/DB delete
226 + - [x] insertion delete: enqueue before S3/DB delete
227 + - [x] Scheduler retry job `retry_pending_s3_deletions` (10min, batch 100, sandbox cadence)
228 + - [x] Wired into `scheduler/mod.rs` sandbox block
229 229
230 230 **Edge cases:** S3 deletes are idempotent (404 = success). Duplicate rows are harmless. Enqueue failure in scheduler paths = bail before any destructive work; in route handlers it is logged as a warning and the delete proceeds, so the user request is not blocked. Crash between enqueue and S3 = retry job picks up. Crash after S3 but before DB = next scheduler tick re-processes (re-enqueue is harmless). Advisory lock prevents concurrent schedulers.
231 231
@@ -0,0 +1,11 @@
-- Durable queue of S3 keys (or key prefixes) that still need to be deleted.
-- The table deliberately declares no foreign keys: rows must outlive a
-- CASCADE delete of the user that owned the objects.
CREATE TABLE IF NOT EXISTS pending_s3_deletions (
    id                UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Object key, or a prefix when it ends with '/'.
    s3_key            TEXT        NOT NULL,
    -- Logical bucket label ('main' or 'synckit'), not the physical bucket name.
    bucket            TEXT        NOT NULL DEFAULT 'main',
    -- Label of the code path that enqueued the row.
    source            TEXT        NOT NULL,
    created_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Number of times the retry job has picked this row up.
    attempts          INT         NOT NULL DEFAULT 0,
    -- NULL until the retry job first picks the row up.
    last_attempted_at TIMESTAMPTZ
);

-- The retry job filters on created_at when looking for stale rows.
CREATE INDEX IF NOT EXISTS idx_pending_s3_deletions_created_at
    ON pending_s3_deletions (created_at);
@@ -65,6 +65,7 @@ pub(crate) mod moderation;
65 65 pub(crate) mod wishlists;
66 66 pub(crate) mod cart;
67 67 pub(crate) mod page_views;
68 + pub(crate) mod pending_s3_deletions;
68 69 pub(crate) mod pending_uploads;
69 70 pub(crate) mod app_sync;
70 71
@@ -0,0 +1,81 @@
1 + //! Durable queue for S3 object deletions that must survive server crashes.
2 +
3 + use sqlx::PgPool;
4 + use uuid::Uuid;
5 + use crate::error::Result;
6 +
7 + /// A pending S3 deletion record.
8 + #[derive(Debug, sqlx::FromRow)]
9 + pub struct PendingS3Deletion {
10 + pub id: Uuid,
11 + pub s3_key: String,
12 + pub bucket: String,
13 + pub source: String,
14 + pub attempts: i32,
15 + }
16 +
17 + /// Enqueue S3 keys for deletion. Each key is (s3_key, bucket).
18 + #[tracing::instrument(skip_all)]
19 + pub async fn enqueue_deletions(
20 + pool: &PgPool,
21 + keys: &[(String, String)],
22 + source: &str,
23 + ) -> Result<()> {
24 + if keys.is_empty() {
25 + return Ok(());
26 + }
27 + let s3_keys: Vec<&str> = keys.iter().map(|(k, _)| k.as_str()).collect();
28 + let buckets: Vec<&str> = keys.iter().map(|(_, b)| b.as_str()).collect();
29 + sqlx::query(
30 + "INSERT INTO pending_s3_deletions (s3_key, bucket, source) SELECT * FROM unnest($1::text[], $2::text[], $3::text[]) ON CONFLICT DO NOTHING",
31 + )
32 + .bind(&s3_keys)
33 + .bind(&buckets)
34 + .bind(&vec![source; keys.len()])
35 + .execute(pool)
36 + .await?;
37 + Ok(())
38 + }
39 +
40 + /// Remove completed deletions by ID.
41 + #[tracing::instrument(skip_all)]
42 + pub async fn remove_completed(pool: &PgPool, ids: &[Uuid]) -> Result<()> {
43 + if ids.is_empty() {
44 + return Ok(());
45 + }
46 + sqlx::query("DELETE FROM pending_s3_deletions WHERE id = ANY($1)")
47 + .bind(ids)
48 + .execute(pool)
49 + .await?;
50 + Ok(())
51 + }
52 +
53 + /// Fetch stale pending deletions (older than min_age, up to limit).
54 + /// Atomically increments attempt count.
55 + #[tracing::instrument(skip_all)]
56 + pub async fn get_stale_pending(
57 + pool: &PgPool,
58 + min_age: chrono::Duration,
59 + limit: i64,
60 + ) -> Result<Vec<PendingS3Deletion>> {
61 + let cutoff = chrono::Utc::now() - min_age;
62 + let rows = sqlx::query_as::<_, PendingS3Deletion>(
63 + r#"
64 + UPDATE pending_s3_deletions
65 + SET attempts = attempts + 1, last_attempted_at = NOW()
66 + WHERE id IN (
67 + SELECT id FROM pending_s3_deletions
68 + WHERE created_at < $1
69 + ORDER BY created_at
70 + LIMIT $2
71 + FOR UPDATE SKIP LOCKED
72 + )
73 + RETURNING id, s3_key, bucket, source, attempts
74 + "#,
75 + )
76 + .bind(cutoff)
77 + .bind(limit)
78 + .fetch_all(pool)
79 + .await?;
80 + Ok(rows)
81 + }
@@ -280,7 +280,18 @@ pub(super) async fn delete_insertion(
280 280 let insertion = db::content_insertions::get_insertion(&state.db, id, user.id).await?;
281 281 let file_size = insertion.as_ref().map(|i| i.file_size).unwrap_or(0);
282 282
283 - // Optionally delete the S3 object
283 + // Enqueue as a durable safety net
284 + if let Some(ref ins) = insertion {
285 + if let Err(e) = db::pending_s3_deletions::enqueue_deletions(
286 + &state.db,
287 + &[(ins.storage_key.clone(), "main".to_string())],
288 + "insertion_delete",
289 + ).await {
290 + tracing::warn!(error = ?e, "failed to enqueue S3 deletion for insertion");
291 + }
292 + }
293 +
294 + // Optionally delete the S3 object (best-effort)
284 295 if let Some(ref s3) = state.s3
285 296 && let Some(ref ins) = insertion
286 297 {
@@ -247,7 +247,13 @@ async fn delete_release_handler(
247 247 .await?
248 248 .ok_or(AppError::NotFound)?;
249 249
250 - // Clean up S3 artifacts before deleting the DB records
250 + // Enqueue keys as a durable safety net
251 + let enqueue_keys: Vec<(String, String)> = s3_keys.iter().map(|k| (k.clone(), "synckit".to_string())).collect();
252 + if let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &enqueue_keys, "delete_release").await {
253 + tracing::warn!(error = ?e, "failed to enqueue S3 deletions for release artifacts");
254 + }
255 +
256 + // Clean up S3 artifacts before deleting the DB records (best-effort)
251 257 if let Some(synckit_s3) = state.synckit_s3.as_ref() {
252 258 for key in &s3_keys {
253 259 let _ = synckit_s3.delete_object(key).await;
@@ -371,6 +371,15 @@ pub(super) async fn media_delete(
371 371 return Err(AppError::Forbidden);
372 372 }
373 373
374 + // Enqueue as a durable safety net
375 + if let Err(e) = db::pending_s3_deletions::enqueue_deletions(
376 + &state.db,
377 + &[(file.s3_key.clone(), "main".to_string())],
378 + "media_delete",
379 + ).await {
380 + tracing::warn!(error = ?e, "failed to enqueue S3 deletion for media file");
381 + }
382 +
374 383 // Delete from S3 (log errors but don't block — DB record is the source of truth)
375 384 if let Err(e) = s3.delete_object(&file.s3_key).await {
376 385 tracing::warn!(s3_key = %file.s3_key, error = ?e, "S3 delete failed for media file, proceeding with DB cleanup");
@@ -24,9 +24,34 @@ pub(super) async fn cleanup_sandbox_accounts(state: &AppState) {
24 24 /// Shared between sandbox, terminated, and content-removal account cleanup.
25 25 /// S3 objects are deleted first (before CASCADE removes the DB rows that reference them).
26 26 async fn cleanup_user_s3_and_delete(state: &AppState, user_id: db::UserId, event: &str, label: &str) -> bool {
27 - // Main S3 bucket: user-level and project-level objects
27 + // Collect all S3 prefixes to enqueue as a durable safety net
28 + let mut keys: Vec<(String, String)> = Vec::new();
29 +
30 + // Main bucket prefixes
31 + let user_prefix = format!("{user_id}/");
32 + keys.push((user_prefix.clone(), "main".to_string()));
33 + if let Ok(project_ids) = db::projects::get_project_ids_for_user(&state.db, user_id).await {
34 + for pid in &project_ids {
35 + keys.push((format!("projects/{pid}/"), "main".to_string()));
36 + }
37 + }
38 +
39 + // SyncKit bucket prefixes
40 + if let Ok(apps) = db::synckit::get_sync_apps_by_creator(&state.db, user_id).await {
41 + for app in &apps {
42 + keys.push((format!("{}/", app.id), "synckit".to_string()));
43 + keys.push((format!("ota/{}/", app.id), "synckit".to_string()));
44 + }
45 + }
46 +
47 + // Enqueue all keys before any destructive work
48 + if let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &keys, label).await {
49 + tracing::error!(error = ?e, %user_id, "{label}: failed to enqueue S3 deletions, aborting cleanup");
50 + return false;
51 + }
52 +
53 + // Best-effort S3 deletes (existing logic)
28 54 if let Some(ref s3) = state.s3 {
29 - let user_prefix = format!("{user_id}/");
30 55 if let Err(e) = s3.delete_prefix(&user_prefix).await {
31 56 tracing::warn!(error = ?e, %user_id, "{label}: failed to delete user S3 objects");
32 57 }
@@ -41,7 +66,6 @@ async fn cleanup_user_s3_and_delete(state: &AppState, user_id: db::UserId, event
41 66 }
42 67 }
43 68
44 - // SyncKit S3 bucket: blobs and OTA artifacts
45 69 if let Some(ref synckit_s3) = state.synckit_s3 {
46 70 if let Ok(apps) = db::synckit::get_sync_apps_by_creator(&state.db, user_id).await {
47 71 for app in &apps {
@@ -190,40 +214,49 @@ pub(super) async fn scrub_stale_ip_addresses(state: &AppState) {
190 214 /// before DB deletion to prevent orphaned storage and accounting drift.
191 215 pub(super) async fn purge_expired_deleted_items(state: &AppState) {
192 216 // Collect S3 keys from items AND their versions before CASCADE delete destroys the data
193 - if let Some(ref s3) = state.s3 {
194 - // Item-level S3 keys (audio, cover, video)
195 - match db::items::get_expired_deleted_item_s3_keys(&state.db).await {
196 - Ok(keys) => {
197 - for key in &keys {
198 - if let Err(e) = s3.delete_object(key).await {
199 - tracing::warn!(key = %key, error = ?e, "failed to delete S3 object for purged item");
200 - }
201 - }
202 - if !keys.is_empty() {
203 - tracing::info!(count = keys.len(), "deleted item S3 objects for purged items");
204 - }
205 - }
206 - Err(e) => {
207 - tracing::error!(error = ?e, "failed to query item S3 keys for items pending purge");
217 + let mut all_s3_keys: Vec<(String, String)> = Vec::new();
218 +
219 + match db::items::get_expired_deleted_item_s3_keys(&state.db).await {
220 + Ok(keys) => {
221 + for key in &keys {
222 + all_s3_keys.push((key.clone(), "main".to_string()));
208 223 }
209 224 }
225 + Err(e) => {
226 + tracing::error!(error = ?e, "failed to query item S3 keys for items pending purge");
227 + }
228 + }
210 229
211 - // Version-level S3 keys (download files attached to items being purged)
212 - match db::items::get_expired_deleted_item_version_s3_keys(&state.db).await {
213 - Ok(keys) => {
214 - for key in &keys {
215 - if let Err(e) = s3.delete_object(key).await {
216 - tracing::warn!(key = %key, error = ?e, "failed to delete version S3 object for purged item");
217 - }
218 - }
219 - if !keys.is_empty() {
220 - tracing::info!(count = keys.len(), "deleted version S3 objects for purged items");
221 - }
230 + match db::items::get_expired_deleted_item_version_s3_keys(&state.db).await {
231 + Ok(keys) => {
232 + for key in &keys {
233 + all_s3_keys.push((key.clone(), "main".to_string()));
222 234 }
223 - Err(e) => {
224 - tracing::error!(error = ?e, "failed to query version S3 keys for items pending purge");
235 + }
236 + Err(e) => {
237 + tracing::error!(error = ?e, "failed to query version S3 keys for items pending purge");
238 + }
239 + }
240 +
241 + // Enqueue all keys as a durable safety net before any destructive work
242 + if !all_s3_keys.is_empty() {
243 + if let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &all_s3_keys, "purge_deleted_items").await {
244 + tracing::error!(error = ?e, "failed to enqueue S3 deletions for purged items, aborting purge");
245 + return;
246 + }
247 + }
248 +
249 + // Best-effort S3 deletes (existing logic)
250 + if let Some(ref s3) = state.s3 {
251 + let item_keys: Vec<&str> = all_s3_keys.iter().map(|(k, _)| k.as_str()).collect();
252 + for key in &item_keys {
253 + if let Err(e) = s3.delete_object(key).await {
254 + tracing::warn!(key = %key, error = ?e, "failed to delete S3 object for purged item");
225 255 }
226 256 }
257 + if !item_keys.is_empty() {
258 + tracing::info!(count = item_keys.len(), "deleted S3 objects for purged items");
259 + }
227 260 }
228 261
229 262 // Decrement storage for each affected user before CASCADE delete
@@ -328,6 +361,62 @@ pub(super) async fn cleanup_cart_items(state: &AppState) {
328 361 }
329 362 }
330 363
364 + /// Retry stale pending S3 deletions (older than 10 minutes, batch of 100).
365 + pub(super) async fn retry_pending_s3_deletions(state: &AppState) {
366 + let stale = match db::pending_s3_deletions::get_stale_pending(
367 + &state.db,
368 + chrono::Duration::minutes(10),
369 + 100,
370 + ).await {
371 + Ok(rows) if rows.is_empty() => return,
372 + Ok(rows) => rows,
373 + Err(e) => {
374 + tracing::error!(error = ?e, "failed to fetch stale pending S3 deletions");
375 + return;
376 + }
377 + };
378 +
379 + let mut completed_ids = Vec::new();
380 + for row in &stale {
381 + if row.attempts > 5 {
382 + tracing::warn!(s3_key = %row.s3_key, bucket = %row.bucket, source = %row.source, attempts = row.attempts,
383 + "S3 deletion stuck after 5+ attempts");
384 + }
385 +
386 + let s3 = if row.bucket == "synckit" {
387 + state.synckit_s3.as_ref()
388 + } else {
389 + state.s3.as_ref()
390 + };
391 +
392 + if let Some(s3) = s3 {
393 + let result = if row.s3_key.ends_with('/') {
394 + s3.delete_prefix(&row.s3_key).await
395 + } else {
396 + s3.delete_object(&row.s3_key).await
397 + };
398 + match result {
399 + Ok(()) => completed_ids.push(row.id),
400 + Err(e) => {
401 + tracing::warn!(s3_key = %row.s3_key, error = ?e, "retry S3 deletion failed");
402 + }
403 + }
404 + } else {
405 + // S3 not configured — remove from queue (can't delete what doesn't exist)
406 + completed_ids.push(row.id);
407 + }
408 + }
409 +
410 + if !completed_ids.is_empty() {
411 + if let Err(e) = db::pending_s3_deletions::remove_completed(&state.db, &completed_ids).await {
412 + tracing::error!(error = ?e, "failed to dequeue completed S3 deletions");
413 + } else {
414 + tracing::info!(completed = completed_ids.len(), total = stale.len(), "retried pending S3 deletions");
415 + }
416 + let _ = db::scheduler_jobs::record_job_run(&state.db, "s3_deletion_retry", completed_ids.len() as i64).await;
417 + }
418 + }
419 +
331 420 #[cfg(test)]
332 421 mod tests {
333 422 use super::*;
@@ -156,6 +156,7 @@ pub fn spawn_scheduler(
156 156 // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
157 157 if run_sandbox {
158 158 cleanup::cleanup_sandbox_accounts(&state).await;
159 + cleanup::retry_pending_s3_deletions(&state).await;
159 160 }
160 161
161 162 // Retry failed webhook events