Skip to main content

max / makenotwork

26.3 KB · 631 lines History Blame Raw
1 //! Scheduled cleanup jobs: sandbox expiry, terminated accounts, content
2 //! removal, IP scrubbing, stale pending transactions, orphaned uploads, cart
3 //! items, soft-deleted item purges, and pending S3 deletion retries.
4
5 use crate::constants;
6 use crate::db;
7 use crate::AppState;
8
9 /// Hourly: purge `scan_jobs` rows in a terminal state older than the
10 /// retention window. Queued/running rows are not touched.
11 pub(super) async fn purge_old_scan_jobs(state: &AppState) {
12 let window = chrono::Duration::days(constants::SCAN_JOB_RETENTION_DAYS as i64);
13 match db::scan_jobs::purge_old_terminal(&state.db, window).await {
14 Ok(n) => {
15 if n > 0 {
16 tracing::info!(purged = n, "scan_jobs retention sweep");
17 }
18 let _ = db::scheduler_jobs::record_job_run(
19 &state.db, "scan_jobs_retention", n as i64,
20 ).await;
21 }
22 Err(e) => tracing::error!(error = ?e, "scan_jobs retention sweep failed"),
23 }
24 }
25
26 /// Drain `ids` through `cleanup_user_s3_and_delete` with bounded concurrency
27 /// (`CLEANUP_PARALLELISM` in flight). Returns the count of successful deletes.
28 ///
29 /// The single bounded-drain primitive shared by every per-user cleanup sweep —
30 /// each cleanup holds a few pool conns + an S3 delete-prefix, so the cap (4)
31 /// leaves the bulk of the ~25-conn pool for request handlers even on a
32 /// mass-expiry tick. Hoisting this means the cap can't be forgotten or drift
33 /// between sweeps (Run #12: the two former copies were the predicted recurrence
34 /// of the "guard-by-convention across siblings" meta-pattern).
35 async fn drain_cleanup_bounded(
36 state: &AppState,
37 ids: Vec<db::UserId>,
38 event: &'static str,
39 label: &'static str,
40 ) -> i64 {
41 const CLEANUP_PARALLELISM: usize = 4;
42 let mut set = tokio::task::JoinSet::new();
43 let mut iter = ids.into_iter();
44 let mut deleted = 0i64;
45 loop {
46 while set.len() < CLEANUP_PARALLELISM {
47 match iter.next() {
48 Some(uid) => {
49 let state = state.clone();
50 set.spawn(async move {
51 cleanup_user_s3_and_delete(&state, uid, event, label).await
52 });
53 }
54 None => break,
55 }
56 }
57 match set.join_next().await {
58 Some(Ok(true)) => deleted += 1,
59 Some(Ok(false)) => {}
60 Some(Err(e)) => tracing::warn!(error = ?e, %label, "cleanup task panicked"),
61 None => break,
62 }
63 }
64 deleted
65 }
66
67 /// Delete expired sandbox accounts and their S3 objects.
68 pub(super) async fn cleanup_sandbox_accounts(state: &AppState) {
69 let expired_ids = match db::users::get_expired_sandbox_ids(&state.db).await {
70 Ok(ids) => ids,
71 Err(e) => {
72 tracing::error!(error = ?e, "failed to query expired sandbox accounts");
73 return;
74 }
75 };
76
77 // Drained inline on the tick (sandbox sweep is cheap + frequent); bounded at
78 // 4 by the shared helper.
79 let deleted = drain_cleanup_bounded(state, expired_ids, "sandbox_expired", "sandbox").await;
80 let _ = db::scheduler_jobs::record_job_run(&state.db, "sandbox_cleanup", deleted).await;
81 }
82
83 /// Clean up a user's S3 objects, git repos, and CASCADE-delete the user row.
84 ///
85 /// Shared between sandbox, terminated, and content-removal account cleanup.
86 /// S3 objects are deleted first (before CASCADE removes the DB rows that reference them).
87 async fn cleanup_user_s3_and_delete(state: &AppState, user_id: db::UserId, event: &str, label: &str) -> bool {
88 // Resolve everything we need from the DB once — the enqueue list and the
89 // delete list must come from the same snapshot, or a project/app created
90 // between calls will be enqueued but not deleted (or vice versa).
91 let project_ids = db::projects::get_project_ids_for_user(&state.db, user_id)
92 .await
93 .unwrap_or_default();
94 let sync_apps = db::synckit::get_sync_apps_by_creator(&state.db, user_id)
95 .await
96 .unwrap_or_default();
97
98 let user_prefix = format!("{user_id}/");
99 let mut keys: Vec<(String, String)> = Vec::new();
100 keys.push((user_prefix.clone(), "main".to_string()));
101 for pid in &project_ids {
102 keys.push((format!("projects/{pid}/"), "main".to_string()));
103 }
104 for app in &sync_apps {
105 keys.push((format!("{}/", app.id), "synckit".to_string()));
106 keys.push((format!("ota/{}/", app.id), "synckit".to_string()));
107 }
108
109 // Enqueue all keys before any destructive work
110 if let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &keys, label).await {
111 tracing::error!(error = ?e, %user_id, "{label}: failed to enqueue S3 deletions, aborting cleanup");
112 return false;
113 }
114
115 if let Some(ref s3) = state.s3 {
116 if let Err(e) = s3.delete_prefix(&user_prefix).await {
117 tracing::warn!(error = ?e, %user_id, "{label}: failed to delete user S3 objects");
118 }
119 for pid in &project_ids {
120 let proj_prefix = format!("projects/{pid}/");
121 if let Err(e) = s3.delete_prefix(&proj_prefix).await {
122 tracing::warn!(error = ?e, %user_id, %pid, "{label}: failed to delete project S3 objects");
123 }
124 }
125 }
126
127 if let Some(ref synckit_s3) = state.synckit_s3 {
128 for app in &sync_apps {
129 let blob_prefix = format!("{}/", app.id);
130 if let Err(e) = synckit_s3.delete_prefix(&blob_prefix).await {
131 tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete SyncKit blobs");
132 }
133 let ota_prefix = format!("ota/{}/", app.id);
134 if let Err(e) = synckit_s3.delete_prefix(&ota_prefix).await {
135 tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete OTA artifacts");
136 }
137 }
138 }
139
140 // Git repos on disk
141 if let Some(ref git_root) = state.config.git_repos_path
142 && let Ok(Some(user)) = db::users::get_user_by_id(&state.db, user_id).await
143 {
144 cleanup_git_repos_on_disk(git_root, &user.username, user_id).await;
145 }
146
147 // CASCADE delete user row
148 if let Err(e) = db::users::delete_user(&state.db, user_id).await {
149 tracing::error!(error = ?e, %user_id, "{label}: failed to delete account");
150 false
151 } else {
152 tracing::info!(%user_id, event, "{label}: account cleaned up");
153 true
154 }
155 }
156
157 /// Remove a user's bare git repositories from disk.
158 ///
159 /// Must be called before `delete_user` (which CASCADE-deletes the git_repos rows).
160 /// Best-effort: logs warnings on failure but does not block account deletion.
161 /// Runs blocking I/O on a dedicated thread to avoid stalling the Tokio runtime.
162 pub(super) async fn cleanup_git_repos_on_disk(git_repos_path: &str, username: &str, user_id: db::UserId) {
163 let user_git_dir = std::path::Path::new(git_repos_path).join(username);
164 if user_git_dir.exists() {
165 let path = user_git_dir.clone();
166 match tokio::task::spawn_blocking(move || std::fs::remove_dir_all(&path)).await {
167 Ok(Ok(())) => tracing::info!(%user_id, path = %user_git_dir.display(), "deleted git repos from disk"),
168 Ok(Err(e)) => tracing::warn!(error = ?e, %user_id, path = %user_git_dir.display(), "failed to delete git repos from disk"),
169 Err(e) => tracing::warn!(error = ?e, %user_id, path = %user_git_dir.display(), "git repo cleanup task panicked"),
170 }
171 }
172 }
173
174 /// Delete accounts that were terminated >30 days ago.
175 ///
176 /// Per-user S3 sweeps + git-repo removal + CASCADE delete are spawned off the
177 /// scheduler's advisory-lock-held tick so a backlog of expired accounts
178 /// (5 creators × 20 projects of multi-page `delete_prefix` calls) doesn't
179 /// extend the lock hold past `TICK_DURATION_ALERT_SECS`. Each step in
180 /// `cleanup_user_s3_and_delete` is idempotent: `delete_prefix` on a missing
181 /// prefix is a no-op, the CASCADE `delete_user` on a non-existent row affects
182 /// zero rows. If a second tick fires before all spawned cleanups complete
183 /// (extremely large accounts), the next `get_expired_terminated_ids` query
184 /// excludes already-deleted rows; the worst-case race is one extra harmless
185 /// no-op call against an in-flight target.
186 pub(super) async fn delete_expired_terminated_accounts(state: &AppState) {
187 spawn_expired_account_cleanups(
188 state,
189 db::users::get_expired_terminated_ids(&state.db).await,
190 "termination_expired",
191 "terminated account",
192 "terminated_account_cleanup",
193 )
194 .await;
195 }
196
197 /// Delete creator accounts whose 90-day content removal grace period has expired.
198 /// Off-lock spawn pattern — see `delete_expired_terminated_accounts`.
199 pub(super) async fn delete_expired_content_removal_accounts(state: &AppState) {
200 spawn_expired_account_cleanups(
201 state,
202 db::users::get_expired_content_removal_ids(&state.db).await,
203 "content_removal_expired",
204 "content removal",
205 "content_removal_cleanup",
206 )
207 .await;
208 }
209
210 async fn spawn_expired_account_cleanups(
211 state: &AppState,
212 fetched: Result<Vec<db::UserId>, crate::error::AppError>,
213 event: &'static str,
214 label: &'static str,
215 job_name: &'static str,
216 ) {
217 let expired_ids = match fetched {
218 Ok(ids) if ids.is_empty() => {
219 let _ = db::scheduler_jobs::record_job_run(&state.db, job_name, 0).await;
220 return;
221 }
222 Ok(ids) => ids,
223 Err(e) => {
224 tracing::error!(error = ?e, %job_name, "failed to query expired accounts");
225 return;
226 }
227 };
228
229 let scheduled = expired_ids.len() as i64;
230
231 // Run the per-user sweeps in a SINGLE supervisor task spawned off the tick,
232 // draining through the shared bounded-concurrency helper. This keeps the
233 // off-lock property the doc comment relies on (the scheduler tick returns
234 // immediately, never holding the advisory lock across the sweeps) AND caps
235 // how many sweeps run at once. A bare `tokio::spawn` per expired account had
236 // no cap: a mass expiry (terminated-account or 90-day content-removal purge)
237 // could fan out N concurrent tasks, each holding several pool connections
238 // plus an S3 delete-prefix, and drain the connection pool out from under
239 // request handlers (Run #11 SERIOUS).
240 let supervisor_state = state.clone();
241 tokio::spawn(async move {
242 drain_cleanup_bounded(&supervisor_state, expired_ids, event, label).await;
243 });
244
245 // Record the count *scheduled*, not *deleted*: the deletes finish out of band.
246 // Operators read this metric to confirm the scheduler saw the work, not to
247 // count completed sweeps. cleanup_user_s3_and_delete logs its own success/
248 // failure per user.
249 let _ = db::scheduler_jobs::record_job_run(&state.db, job_name, scheduled).await;
250 }
251
252 /// Delete pending transactions older than 25 hours and release promo code reservations.
253 pub(super) async fn cleanup_stale_pending_transactions(state: &AppState) {
254 let promo_ids = match db::transactions::cleanup_stale_pending(
255 &state.db,
256 chrono::Duration::hours(25),
257 )
258 .await
259 {
260 Ok(ids) => ids,
261 Err(e) => {
262 tracing::error!(error = ?e, "failed to clean up stale pending transactions");
263 return;
264 }
265 };
266
267 // Cart checkouts produce N pending-tx rows that share a promo_code_id;
268 // release once per reservation, not once per row.
269 let unique_promo_ids: std::collections::HashSet<_> = promo_ids.into_iter().flatten().collect();
270
271 let mut released = 0i64;
272 for pc_id in unique_promo_ids {
273 if let Err(e) = db::promo_codes::release_use_count(&state.db, pc_id).await {
274 tracing::warn!(promo_code_id = %pc_id, error = ?e, "failed to release promo code use count");
275 } else {
276 released += 1;
277 }
278 }
279
280 if released > 0 {
281 tracing::info!(released, "released promo code reservations from stale pending transactions");
282 }
283 let _ = db::scheduler_jobs::record_job_run(&state.db, "stale_pending_cleanup", released).await;
284 }
285
286 /// NULL out IP addresses older than 30 days in user_sessions.
287 pub(super) async fn scrub_stale_ip_addresses(state: &AppState) {
288 let cutoff = chrono::Utc::now() - chrono::Duration::days(30);
289
290 match sqlx::query(
291 "UPDATE user_sessions SET ip_address = NULL WHERE ip_address IS NOT NULL AND created_at < $1",
292 )
293 .bind(cutoff)
294 .execute(&state.db)
295 .await
296 {
297 Ok(r) => {
298 if r.rows_affected() > 0 {
299 tracing::info!(scrubbed = r.rows_affected(), "scrubbed stale IP addresses (30-day retention)");
300 }
301 let _ = db::scheduler_jobs::record_job_run(&state.db, "ip_scrub", r.rows_affected() as i64).await;
302 }
303 Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from user_sessions"),
304 }
305 }
306
307 /// Permanently delete items that were soft-deleted more than 7 days ago.
308 /// Cleans up S3 objects (item files + version files) and decrements storage
309 /// before DB deletion to prevent orphaned storage and accounting drift.
310 pub(super) async fn purge_expired_deleted_items(state: &AppState) {
311 // Collect S3 keys from items AND their versions before CASCADE delete destroys the data
312 let mut all_s3_keys: Vec<(String, String)> = Vec::new();
313
314 match db::items::get_expired_deleted_item_s3_keys(&state.db).await {
315 Ok(keys) => {
316 for key in &keys {
317 all_s3_keys.push((key.clone(), "main".to_string()));
318 }
319 }
320 Err(e) => {
321 tracing::error!(error = ?e, "failed to query item S3 keys for items pending purge");
322 }
323 }
324
325 match db::items::get_expired_deleted_item_version_s3_keys(&state.db).await {
326 Ok(keys) => {
327 for key in &keys {
328 all_s3_keys.push((key.clone(), "main".to_string()));
329 }
330 }
331 Err(e) => {
332 tracing::error!(error = ?e, "failed to query version S3 keys for items pending purge");
333 }
334 }
335
336 // Enqueue all keys as a durable safety net before any destructive work
337 if !all_s3_keys.is_empty()
338 && let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &all_s3_keys, "purge_deleted_items").await
339 {
340 tracing::error!(error = ?e, "failed to enqueue S3 deletions for purged items, aborting purge");
341 return;
342 }
343
344 if let Some(ref s3) = state.s3
345 && !all_s3_keys.is_empty()
346 {
347 let keys_only: Vec<String> = all_s3_keys.iter().map(|(k, _)| k.clone()).collect();
348 if let Err(e) = s3.delete_objects(&keys_only).await {
349 tracing::warn!(error = ?e, "batch S3 delete failed for purged items; pending_s3_deletions queue will retry");
350 }
351 tracing::info!(count = all_s3_keys.len(), "deleted S3 objects for purged items");
352 }
353
354 // Decrement storage for each affected user before CASCADE delete
355 match db::items::get_expired_deleted_item_storage_by_user(&state.db).await {
356 Ok(user_sizes) => {
357 for (user_id, total_bytes) in &user_sizes {
358 if *total_bytes > 0
359 && let Err(e) = db::creator_tiers::decrement_storage_used(&state.db, *user_id, *total_bytes).await
360 {
361 tracing::warn!(user_id = %user_id, bytes = total_bytes, error = ?e,
362 "failed to decrement storage for purged items");
363 }
364 }
365 }
366 Err(e) => {
367 tracing::error!(error = ?e, "failed to query storage sizes for items pending purge");
368 }
369 }
370
371 match db::items::purge_expired_deleted_items(&state.db).await {
372 Ok(0) => {
373 let _ = db::scheduler_jobs::record_job_run(&state.db, "soft_delete_purge", 0).await;
374 }
375 Ok(n) => {
376 tracing::info!(deleted = n, "purged expired soft-deleted items");
377 let _ = db::scheduler_jobs::record_job_run(&state.db, "soft_delete_purge", n as i64).await;
378 }
379 Err(e) => {
380 tracing::error!(error = ?e, "failed to purge expired soft-deleted items");
381 }
382 }
383 }
384
/// Outcome of a guarded single-key orphan S3 delete.
///
/// Fieldless, so it is `Copy`. `Debug`/`PartialEq` let callers log or compare
/// outcomes directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GuardedDelete {
    /// Object deleted (the storage backend returned `Ok`).
    Deleted,
    /// Skipped: a live DB row still references the key (delete-then-reupload
    /// race). The caller should clear its queue/record without deleting.
    SkippedLive,
    /// The delete did not complete because a step failed (already logged).
    /// The caller decides retry vs. clear.
    Failed,
}
395
396 /// The single funnel for deleting one non-prefix orphan S3 object.
397 ///
398 /// Performs the `is_s3_key_live` check FIRST, so an object that a fresh upload
399 /// reclaimed under the same key is never torpedoed. Both the pending-deletions
400 /// retry worker and the orphaned-upload reaper route every single-key delete
401 /// through here — neither can delete a key without the live-check. (Storage S2
402 /// / CHRONIC 2: the worker had this guard inline; its sibling reaper did not,
403 /// and could delete a now-live deterministic-key object.)
404 async fn delete_orphan_key_guarded(
405 db: &sqlx::PgPool,
406 s3: &dyn crate::storage::StorageBackend,
407 bucket: &str,
408 s3_key: &str,
409 ) -> GuardedDelete {
410 match db::pending_s3_deletions::is_s3_key_live(db, bucket, s3_key).await {
411 Ok(true) => {
412 tracing::info!(s3_key = %s3_key, bucket = %bucket,
413 "orphan S3 delete skipped — key reclaimed by a live row (delete-then-reupload)");
414 return GuardedDelete::SkippedLive;
415 }
416 Ok(false) => {}
417 Err(e) => {
418 tracing::warn!(s3_key = %s3_key, error = ?e,
419 "live-key check failed; proceeding with delete attempt");
420 }
421 }
422 match s3.delete_object(s3_key).await {
423 Ok(()) => GuardedDelete::Deleted,
424 Err(e) => {
425 tracing::warn!(s3_key = %s3_key, bucket = %bucket, error = ?e, "orphan S3 delete failed");
426 GuardedDelete::Failed
427 }
428 }
429 }
430
431 /// Delete S3 objects from presigned uploads that were never confirmed (>24h old).
432 pub(super) async fn cleanup_orphaned_uploads(state: &AppState) {
433 let stale = match db::pending_uploads::get_stale_pending_uploads(
434 &state.db,
435 chrono::Duration::hours(24),
436 )
437 .await
438 {
439 Ok(rows) if rows.is_empty() => {
440 let _ = db::scheduler_jobs::record_job_run(&state.db, "orphaned_upload_cleanup", 0).await;
441 return;
442 }
443 Ok(rows) => rows,
444 Err(e) => {
445 tracing::error!(error = ?e, "failed to query stale pending uploads");
446 return;
447 }
448 };
449
450 let mut cleaned = 0i64;
451 let mut keys_to_delete: Vec<String> = Vec::with_capacity(stale.len());
452
453 for (s3_key, bucket) in &stale {
454 let s3_client = match bucket.as_str() {
455 "synckit" => state.synckit_s3.as_ref(),
456 _ => state.s3.as_ref(),
457 };
458 if let Some(s3) = s3_client {
459 // Route through the guarded funnel: if a confirm already reclaimed
460 // this deterministic key, the live row owns the object and we must
461 // NOT delete it — just clear the stale pending_uploads record.
462 match delete_orphan_key_guarded(&state.db, s3.as_ref(), bucket, s3_key).await {
463 GuardedDelete::Deleted => {
464 cleaned += 1;
465 keys_to_delete.push(s3_key.clone());
466 }
467 // Live row owns the key now: clear the stale record, keep object.
468 GuardedDelete::SkippedLive => keys_to_delete.push(s3_key.clone()),
469 // Delete failed: still clear the record so we don't retry forever.
470 GuardedDelete::Failed => keys_to_delete.push(s3_key.clone()),
471 }
472 } else {
473 // S3 not configured for this bucket; remove the DB record anyway
474 keys_to_delete.push(s3_key.clone());
475 }
476 }
477
478 if !keys_to_delete.is_empty()
479 && let Err(e) = db::pending_uploads::delete_pending_uploads(&state.db, &keys_to_delete).await
480 {
481 tracing::error!(error = ?e, "failed to delete pending upload records");
482 }
483
484 if cleaned > 0 {
485 tracing::info!(cleaned, "cleaned up orphaned presigned uploads");
486 }
487 let _ = db::scheduler_jobs::record_job_run(&state.db, "orphaned_upload_cleanup", cleaned).await;
488 }
489
490 /// Remove stale cart items (>30 days old) and items that became unavailable.
491 pub(super) async fn cleanup_cart_items(state: &AppState) {
492 match db::cart::cleanup_stale_cart_items(&state.db, chrono::Duration::days(30)).await {
493 Ok(n) if n > 0 => tracing::info!(removed = n, "cleaned up stale cart items"),
494 Err(e) => tracing::error!(error = ?e, "failed to clean up stale cart items"),
495 _ => {}
496 }
497 match db::cart::cleanup_unavailable_cart_items(&state.db).await {
498 Ok(n) if n > 0 => tracing::info!(removed = n, "cleaned up unavailable cart items"),
499 Err(e) => tracing::error!(error = ?e, "failed to clean up unavailable cart items"),
500 _ => {}
501 }
502 }
503
504 /// Retry stale pending S3 deletions (older than 10 minutes, batch of 100).
505 pub(super) async fn retry_pending_s3_deletions(state: &AppState) {
506 let stale = match db::pending_s3_deletions::get_stale_pending(
507 &state.db,
508 chrono::Duration::minutes(10),
509 100,
510 ).await {
511 Ok(rows) => rows,
512 Err(e) => {
513 tracing::error!(error = ?e, "failed to fetch stale pending S3 deletions");
514 return;
515 }
516 };
517
518 if stale.is_empty() {
519 let _ = db::scheduler_jobs::record_job_run(&state.db, "s3_deletion_retry", 0).await;
520 return;
521 }
522
523 let mut completed_ids = Vec::new();
524 let mut dead_letter_ids = Vec::new();
525 for row in &stale {
526 if row.attempts >= 10 {
527 tracing::error!(s3_key = %row.s3_key, bucket = %row.bucket, source = %row.source, attempts = row.attempts,
528 "S3 deletion dead-lettered after 10 attempts — moving to dead-letter table for manual triage");
529 dead_letter_ids.push(row.id);
530 continue;
531 } else if row.attempts >= 5 {
532 tracing::warn!(s3_key = %row.s3_key, bucket = %row.bucket, source = %row.source, attempts = row.attempts,
533 "S3 deletion stuck after 5+ attempts");
534 }
535
536 let s3 = if row.bucket == "synckit" {
537 state.synckit_s3.as_ref()
538 } else {
539 state.s3.as_ref()
540 };
541
542 if let Some(s3) = s3 {
543 if row.s3_key.ends_with('/') {
544 // Prefix delete (user-scoped cascade cleanup). Bypasses the
545 // per-key live-check by design, but carries its own guard: a bare
546 // `{user_id}/` prefix wipes a creator's ENTIRE storage and is only
547 // ever enqueued by account cleanup (which CASCADE-deletes the user
548 // row in the same pass). A STILL-LIVE user here means something
549 // wrongly enqueued a prefix delete against a live account — refuse
550 // and let the row climb toward dead-letter triage rather than
551 // silently nuking live files.
552 if row.bucket == "main"
553 && let Some(uid) = row.s3_key.strip_suffix('/').and_then(|s| s.parse::<db::UserId>().ok())
554 && matches!(db::users::get_user_by_id(&state.db, uid).await, Ok(Some(_)))
555 {
556 tracing::error!(s3_key = %row.s3_key, user_id = %uid,
557 "refusing user-prefix S3 delete: user still exists — parking for dead-letter triage instead of wiping live storage");
558 continue;
559 }
560 match s3.delete_prefix(&row.s3_key).await {
561 Ok(()) => completed_ids.push(row.id),
562 Err(e) => {
563 tracing::warn!(s3_key = %row.s3_key, error = ?e, "retry S3 prefix deletion failed");
564 }
565 }
566 } else {
567 // Single-key delete — routed through the one guarded funnel so the
568 // delete-then-reupload live-check can never be skipped. The same
569 // funnel backs the orphaned-upload reaper.
570 match delete_orphan_key_guarded(&state.db, s3.as_ref(), &row.bucket, &row.s3_key).await {
571 GuardedDelete::Deleted | GuardedDelete::SkippedLive => completed_ids.push(row.id),
572 GuardedDelete::Failed => {} // leave queued; climbs toward dead-letter
573 }
574 }
575 } else {
576 // S3 not configured — remove from queue (can't delete what doesn't exist)
577 completed_ids.push(row.id);
578 }
579 }
580
581 // Move permanently-failing rows to the dead-letter table (durable, operator-
582 // visible) rather than silently DELETEing them and orphaning the S3 object.
583 if !dead_letter_ids.is_empty() {
584 match db::pending_s3_deletions::move_to_dead_letter(&state.db, &dead_letter_ids).await {
585 Ok(moved) => tracing::warn!(
586 moved, "moved permanently-failing S3 deletions to dead-letter table — manual triage required"
587 ),
588 Err(e) => tracing::error!(error = ?e, "failed to move S3 deletions to dead-letter table"),
589 }
590 }
591
592 if !completed_ids.is_empty() {
593 if let Err(e) = db::pending_s3_deletions::remove_completed(&state.db, &completed_ids).await {
594 tracing::error!(error = ?e, "failed to dequeue completed S3 deletions");
595 } else {
596 tracing::info!(completed = completed_ids.len(), total = stale.len(), "retried pending S3 deletions");
597 }
598 }
599
600 if !completed_ids.is_empty() || !dead_letter_ids.is_empty() {
601 let processed = (completed_ids.len() + dead_letter_ids.len()) as i64;
602 let _ = db::scheduler_jobs::record_job_run(&state.db, "s3_deletion_retry", processed).await;
603 }
604 }
605
#[cfg(test)]
mod tests {
    use super::*;

    /// Removing one user's repos deletes exactly that user's directory: sibling
    /// users' repos and the git root itself must survive.
    #[tokio::test]
    async fn cleanup_git_repos_removes_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let git_root = tmp.path();
        let user_dir = git_root.join("testuser");
        std::fs::create_dir_all(user_dir.join("repo.git")).unwrap();
        std::fs::write(user_dir.join("repo.git/HEAD"), "ref: refs/heads/main\n").unwrap();
        let other_repo = git_root.join("otheruser").join("repo.git");
        std::fs::create_dir_all(&other_repo).unwrap();

        let user_id = db::UserId::nil();
        cleanup_git_repos_on_disk(git_root.to_str().unwrap(), "testuser", user_id).await;

        assert!(!user_dir.exists(), "user git directory should be deleted");
        assert!(other_repo.exists(), "other users' git repos must be untouched");
        assert!(git_root.exists(), "the git root itself must be untouched");
    }

    /// A user with no directory on disk is a no-op that removes nothing else.
    #[tokio::test]
    async fn cleanup_git_repos_noop_if_missing() {
        let tmp = tempfile::tempdir().unwrap();
        let sibling = tmp.path().join("someoneelse");
        std::fs::create_dir_all(&sibling).unwrap();

        let user_id = db::UserId::nil();
        cleanup_git_repos_on_disk(tmp.path().to_str().unwrap(), "nonexistent", user_id).await;

        assert!(tmp.path().exists(), "git root must survive cleanup of a missing user dir");
        assert!(sibling.exists(), "other users' directories must be untouched");
    }
}
631