//! Scheduled cleanup jobs: sandbox expiry, terminated accounts, content //! removal, IP scrubbing, stale pending transactions, orphaned uploads, cart //! items, soft-deleted item purges, and pending S3 deletion retries. use crate::constants; use crate::db; use crate::AppState; /// Hourly: purge `scan_jobs` rows in a terminal state older than the /// retention window. Queued/running rows are not touched. pub(super) async fn purge_old_scan_jobs(state: &AppState) { let window = chrono::Duration::days(constants::SCAN_JOB_RETENTION_DAYS as i64); match db::scan_jobs::purge_old_terminal(&state.db, window).await { Ok(n) => { if n > 0 { tracing::info!(purged = n, "scan_jobs retention sweep"); } let _ = db::scheduler_jobs::record_job_run( &state.db, "scan_jobs_retention", n as i64, ).await; } Err(e) => tracing::error!(error = ?e, "scan_jobs retention sweep failed"), } } /// Drain `ids` through `cleanup_user_s3_and_delete` with bounded concurrency /// (`CLEANUP_PARALLELISM` in flight). Returns the count of successful deletes. /// /// The single bounded-drain primitive shared by every per-user cleanup sweep — /// each cleanup holds a few pool conns + an S3 delete-prefix, so the cap (4) /// leaves the bulk of the ~25-conn pool for request handlers even on a /// mass-expiry tick. Hoisting this means the cap can't be forgotten or drift /// between sweeps (Run #12: the two former copies were the predicted recurrence /// of the "guard-by-convention across siblings" meta-pattern). 
async fn drain_cleanup_bounded( state: &AppState, ids: Vec, event: &'static str, label: &'static str, ) -> i64 { const CLEANUP_PARALLELISM: usize = 4; let mut set = tokio::task::JoinSet::new(); let mut iter = ids.into_iter(); let mut deleted = 0i64; loop { while set.len() < CLEANUP_PARALLELISM { match iter.next() { Some(uid) => { let state = state.clone(); set.spawn(async move { cleanup_user_s3_and_delete(&state, uid, event, label).await }); } None => break, } } match set.join_next().await { Some(Ok(true)) => deleted += 1, Some(Ok(false)) => {} Some(Err(e)) => tracing::warn!(error = ?e, %label, "cleanup task panicked"), None => break, } } deleted } /// Delete expired sandbox accounts and their S3 objects. pub(super) async fn cleanup_sandbox_accounts(state: &AppState) { let expired_ids = match db::users::get_expired_sandbox_ids(&state.db).await { Ok(ids) => ids, Err(e) => { tracing::error!(error = ?e, "failed to query expired sandbox accounts"); return; } }; // Drained inline on the tick (sandbox sweep is cheap + frequent); bounded at // 4 by the shared helper. let deleted = drain_cleanup_bounded(state, expired_ids, "sandbox_expired", "sandbox").await; let _ = db::scheduler_jobs::record_job_run(&state.db, "sandbox_cleanup", deleted).await; } /// Clean up a user's S3 objects, git repos, and CASCADE-delete the user row. /// /// Shared between sandbox, terminated, and content-removal account cleanup. /// S3 objects are deleted first (before CASCADE removes the DB rows that reference them). async fn cleanup_user_s3_and_delete(state: &AppState, user_id: db::UserId, event: &str, label: &str) -> bool { // Resolve everything we need from the DB once — the enqueue list and the // delete list must come from the same snapshot, or a project/app created // between calls will be enqueued but not deleted (or vice versa). 
let project_ids = db::projects::get_project_ids_for_user(&state.db, user_id) .await .unwrap_or_default(); let sync_apps = db::synckit::get_sync_apps_by_creator(&state.db, user_id) .await .unwrap_or_default(); let user_prefix = format!("{user_id}/"); let mut keys: Vec<(String, String)> = Vec::new(); keys.push((user_prefix.clone(), "main".to_string())); for pid in &project_ids { keys.push((format!("projects/{pid}/"), "main".to_string())); } for app in &sync_apps { keys.push((format!("{}/", app.id), "synckit".to_string())); keys.push((format!("ota/{}/", app.id), "synckit".to_string())); } // Enqueue all keys before any destructive work if let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &keys, label).await { tracing::error!(error = ?e, %user_id, "{label}: failed to enqueue S3 deletions, aborting cleanup"); return false; } if let Some(ref s3) = state.s3 { if let Err(e) = s3.delete_prefix(&user_prefix).await { tracing::warn!(error = ?e, %user_id, "{label}: failed to delete user S3 objects"); } for pid in &project_ids { let proj_prefix = format!("projects/{pid}/"); if let Err(e) = s3.delete_prefix(&proj_prefix).await { tracing::warn!(error = ?e, %user_id, %pid, "{label}: failed to delete project S3 objects"); } } } if let Some(ref synckit_s3) = state.synckit_s3 { for app in &sync_apps { let blob_prefix = format!("{}/", app.id); if let Err(e) = synckit_s3.delete_prefix(&blob_prefix).await { tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete SyncKit blobs"); } let ota_prefix = format!("ota/{}/", app.id); if let Err(e) = synckit_s3.delete_prefix(&ota_prefix).await { tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete OTA artifacts"); } } } // Git repos on disk if let Some(ref git_root) = state.config.git_repos_path && let Ok(Some(user)) = db::users::get_user_by_id(&state.db, user_id).await { cleanup_git_repos_on_disk(git_root, &user.username, user_id).await; } // CASCADE delete user row if let 
Err(e) = db::users::delete_user(&state.db, user_id).await { tracing::error!(error = ?e, %user_id, "{label}: failed to delete account"); false } else { tracing::info!(%user_id, event, "{label}: account cleaned up"); true } } /// Remove a user's bare git repositories from disk. /// /// Must be called before `delete_user` (which CASCADE-deletes the git_repos rows). /// Best-effort: logs warnings on failure but does not block account deletion. /// Runs blocking I/O on a dedicated thread to avoid stalling the Tokio runtime. pub(super) async fn cleanup_git_repos_on_disk(git_repos_path: &str, username: &str, user_id: db::UserId) { let user_git_dir = std::path::Path::new(git_repos_path).join(username); if user_git_dir.exists() { let path = user_git_dir.clone(); match tokio::task::spawn_blocking(move || std::fs::remove_dir_all(&path)).await { Ok(Ok(())) => tracing::info!(%user_id, path = %user_git_dir.display(), "deleted git repos from disk"), Ok(Err(e)) => tracing::warn!(error = ?e, %user_id, path = %user_git_dir.display(), "failed to delete git repos from disk"), Err(e) => tracing::warn!(error = ?e, %user_id, path = %user_git_dir.display(), "git repo cleanup task panicked"), } } } /// Delete accounts that were terminated >30 days ago. /// /// Per-user S3 sweeps + git-repo removal + CASCADE delete are spawned off the /// scheduler's advisory-lock-held tick so a backlog of expired accounts /// (5 creators × 20 projects of multi-page `delete_prefix` calls) doesn't /// extend the lock hold past `TICK_DURATION_ALERT_SECS`. Each step in /// `cleanup_user_s3_and_delete` is idempotent: `delete_prefix` on a missing /// prefix is a no-op, the CASCADE `delete_user` on a non-existent row affects /// zero rows. If a second tick fires before all spawned cleanups complete /// (extremely large accounts), the next `get_expired_terminated_ids` query /// excludes already-deleted rows; the worst-case race is one extra harmless /// no-op call against an in-flight target. 
pub(super) async fn delete_expired_terminated_accounts(state: &AppState) {
    spawn_expired_account_cleanups(
        state,
        db::users::get_expired_terminated_ids(&state.db).await,
        "termination_expired",
        "terminated account",
        "terminated_account_cleanup",
    )
    .await;
}

/// Delete creator accounts whose 90-day content removal grace period has expired.
/// Off-lock spawn pattern — see `delete_expired_terminated_accounts`.
pub(super) async fn delete_expired_content_removal_accounts(state: &AppState) {
    spawn_expired_account_cleanups(
        state,
        db::users::get_expired_content_removal_ids(&state.db).await,
        "content_removal_expired",
        "content removal",
        "content_removal_cleanup",
    )
    .await;
}

/// Job names whose off-tick expired-account supervisor is still running in this
/// process. Used by `AccountSweepGuard` to stop supervisors stacking across ticks.
static IN_FLIGHT_ACCOUNT_SWEEPS: std::sync::Mutex<Vec<&'static str>> =
    std::sync::Mutex::new(Vec::new());

/// RAII marker that a supervisor for the wrapped job name is running.
///
/// The entry is removed on drop: on normal completion, on task cancellation, and
/// when the task panics under the default unwind strategy. The next tick can then
/// schedule again.
struct AccountSweepGuard(&'static str);

impl AccountSweepGuard {
    /// Claim the in-flight slot for `job_name`, or `None` if a supervisor for the
    /// same job is still running.
    fn try_acquire(job_name: &'static str) -> Option<Self> {
        let mut running = IN_FLIGHT_ACCOUNT_SWEEPS
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if running.contains(&job_name) {
            return None;
        }
        running.push(job_name);
        Some(Self(job_name))
    }
}

impl Drop for AccountSweepGuard {
    fn drop(&mut self) {
        let mut running = IN_FLIGHT_ACCOUNT_SWEEPS
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        running.retain(|name| *name != self.0);
    }
}

/// Shared body of the expired-account sweeps. It hands the fetched ids to a
/// single off-tick supervisor task and records how many were scheduled.
///
/// - `fetched` is the result of the job's expiry query.
/// - `event` and `label` are passed through to `cleanup_user_s3_and_delete` for
///   logging.
/// - `job_name` keys both the `scheduler_jobs` record and the in-flight guard.
///
/// At most one supervisor per `job_name` runs at a time in this process. Users
/// whose sweep has not finished are still returned by the expiry query. Without
/// the guard, every tick during a long sweep would spawn another supervisor over
/// the same users, multiplying the concurrency cap and duplicating S3 work.
///
/// A tick that finds its supervisor still running does not schedule anything. It
/// records a run with a count of 0 and leaves the backlog to the running
/// supervisor. Users that supervisor fails on are picked up by a later tick.
async fn spawn_expired_account_cleanups(
    state: &AppState,
    fetched: Result<Vec<db::UserId>, crate::error::AppError>,
    event: &'static str,
    label: &'static str,
    job_name: &'static str,
) {
    let expired_ids = match fetched {
        Ok(ids) if ids.is_empty() => {
            let _ = db::scheduler_jobs::record_job_run(&state.db, job_name, 0).await;
            return;
        }
        Ok(ids) => ids,
        Err(e) => {
            tracing::error!(error = ?e, %job_name, "failed to query expired accounts");
            return;
        }
    };

    let Some(guard) = AccountSweepGuard::try_acquire(job_name) else {
        tracing::info!(
            %job_name,
            pending = expired_ids.len(),
            "previous expired-account sweep still running; not scheduling another"
        );
        let _ = db::scheduler_jobs::record_job_run(&state.db, job_name, 0).await;
        return;
    };

    let scheduled = expired_ids.len() as i64;

    // Run the per-user sweeps in a SINGLE supervisor task spawned off the tick,
    // draining through the shared bounded-concurrency helper. This keeps the
    // off-lock property: the scheduler tick returns immediately and never holds
    // the advisory lock across the sweeps. It also caps how many sweeps run at
    // once.
    //
    // A bare `tokio::spawn` per expired account had no cap. A mass expiry
    // (terminated-account or 90-day content-removal purge) could fan out N
    // concurrent tasks, each holding several pool connections plus an S3
    // delete-prefix, and drain the connection pool out from under request
    // handlers (Run #11 SERIOUS).
    let supervisor_state = state.clone();
    tokio::spawn(async move {
        // Held for the supervisor's whole lifetime; released when it ends.
        let _guard = guard;
        drain_cleanup_bounded(&supervisor_state, expired_ids, event, label).await;
    });

    // Record the count *scheduled*, not *deleted*: the deletes finish out of band.
    // Operators read this metric to confirm the scheduler saw the work, not to
    // count completed sweeps. cleanup_user_s3_and_delete logs its own success/
    // failure per user.
    let _ = db::scheduler_jobs::record_job_run(&state.db, job_name, scheduled).await;
}

/// Delete pending transactions older than 25 hours and release promo code reservations.
///
/// The job-run count is the number of promo reservations released, not the
/// number of transactions deleted.
pub(super) async fn cleanup_stale_pending_transactions(state: &AppState) {
    let promo_ids = match db::transactions::cleanup_stale_pending(
        &state.db,
        chrono::Duration::hours(25),
    )
    .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::error!(error = ?e, "failed to clean up stale pending transactions");
            return;
        }
    };

    // Cart checkouts produce N pending-tx rows that share a promo_code_id;
    // release once per reservation, not once per row.
    // NOTE(review): deduping on promo_code_id alone also collapses reservations
    // from *different* stale checkouts that used the same code. Only one of them
    // gets released, leaving use_count inflated. Deduping per checkout would need
    // `cleanup_stale_pending` to also return a checkout/session key — confirm.
    let unique_promo_ids: std::collections::HashSet<_> =
        promo_ids.into_iter().flatten().collect();

    let mut released = 0i64;
    for pc_id in unique_promo_ids {
        if let Err(e) = db::promo_codes::release_use_count(&state.db, pc_id).await {
            tracing::warn!(promo_code_id = %pc_id, error = ?e, "failed to release promo code use count");
        } else {
            released += 1;
        }
    }
    if released > 0 {
        tracing::info!(released, "released promo code reservations from stale pending transactions");
    }
    let _ = db::scheduler_jobs::record_job_run(&state.db, "stale_pending_cleanup", released).await;
}

/// Scrub IP addresses (set to NULL) on `user_sessions` rows created more than
/// 30 days ago.
pub(super) async fn scrub_stale_ip_addresses(state: &AppState) { let cutoff = chrono::Utc::now() - chrono::Duration::days(30); match sqlx::query( "UPDATE user_sessions SET ip_address = NULL WHERE ip_address IS NOT NULL AND created_at < $1", ) .bind(cutoff) .execute(&state.db) .await { Ok(r) => { if r.rows_affected() > 0 { tracing::info!(scrubbed = r.rows_affected(), "scrubbed stale IP addresses (30-day retention)"); } let _ = db::scheduler_jobs::record_job_run(&state.db, "ip_scrub", r.rows_affected() as i64).await; } Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from user_sessions"), } } /// Permanently delete items that were soft-deleted more than 7 days ago. /// Cleans up S3 objects (item files + version files) and decrements storage /// before DB deletion to prevent orphaned storage and accounting drift. pub(super) async fn purge_expired_deleted_items(state: &AppState) { // Collect S3 keys from items AND their versions before CASCADE delete destroys the data let mut all_s3_keys: Vec<(String, String)> = Vec::new(); match db::items::get_expired_deleted_item_s3_keys(&state.db).await { Ok(keys) => { for key in &keys { all_s3_keys.push((key.clone(), "main".to_string())); } } Err(e) => { tracing::error!(error = ?e, "failed to query item S3 keys for items pending purge"); } } match db::items::get_expired_deleted_item_version_s3_keys(&state.db).await { Ok(keys) => { for key in &keys { all_s3_keys.push((key.clone(), "main".to_string())); } } Err(e) => { tracing::error!(error = ?e, "failed to query version S3 keys for items pending purge"); } } // Enqueue all keys as a durable safety net before any destructive work if !all_s3_keys.is_empty() && let Err(e) = db::pending_s3_deletions::enqueue_deletions(&state.db, &all_s3_keys, "purge_deleted_items").await { tracing::error!(error = ?e, "failed to enqueue S3 deletions for purged items, aborting purge"); return; } if let Some(ref s3) = state.s3 && !all_s3_keys.is_empty() { let keys_only: Vec = 
all_s3_keys.iter().map(|(k, _)| k.clone()).collect(); if let Err(e) = s3.delete_objects(&keys_only).await { tracing::warn!(error = ?e, "batch S3 delete failed for purged items; pending_s3_deletions queue will retry"); } tracing::info!(count = all_s3_keys.len(), "deleted S3 objects for purged items"); } // Decrement storage for each affected user before CASCADE delete match db::items::get_expired_deleted_item_storage_by_user(&state.db).await { Ok(user_sizes) => { for (user_id, total_bytes) in &user_sizes { if *total_bytes > 0 && let Err(e) = db::creator_tiers::decrement_storage_used(&state.db, *user_id, *total_bytes).await { tracing::warn!(user_id = %user_id, bytes = total_bytes, error = ?e, "failed to decrement storage for purged items"); } } } Err(e) => { tracing::error!(error = ?e, "failed to query storage sizes for items pending purge"); } } match db::items::purge_expired_deleted_items(&state.db).await { Ok(0) => { let _ = db::scheduler_jobs::record_job_run(&state.db, "soft_delete_purge", 0).await; } Ok(n) => { tracing::info!(deleted = n, "purged expired soft-deleted items"); let _ = db::scheduler_jobs::record_job_run(&state.db, "soft_delete_purge", n as i64).await; } Err(e) => { tracing::error!(error = ?e, "failed to purge expired soft-deleted items"); } } } /// Outcome of a guarded single-key orphan S3 delete. enum GuardedDelete { /// Object deleted (S3 returned Ok). Deleted, /// Skipped: a live DB row still references the key (delete-then-reupload /// race). The caller should clear its queue/record without deleting. SkippedLive, /// S3 delete failed (already logged). The caller decides retry vs. clear. Failed, } /// The single funnel for deleting one non-prefix orphan S3 object. /// /// Performs the `is_s3_key_live` check FIRST, so an object that a fresh upload /// reclaimed under the same key is never torpedoed. 
Both the pending-deletions /// retry worker and the orphaned-upload reaper route every single-key delete /// through here — neither can delete a key without the live-check. (Storage S2 /// / CHRONIC 2: the worker had this guard inline; its sibling reaper did not, /// and could delete a now-live deterministic-key object.) async fn delete_orphan_key_guarded( db: &sqlx::PgPool, s3: &dyn crate::storage::StorageBackend, bucket: &str, s3_key: &str, ) -> GuardedDelete { match db::pending_s3_deletions::is_s3_key_live(db, bucket, s3_key).await { Ok(true) => { tracing::info!(s3_key = %s3_key, bucket = %bucket, "orphan S3 delete skipped — key reclaimed by a live row (delete-then-reupload)"); return GuardedDelete::SkippedLive; } Ok(false) => {} Err(e) => { tracing::warn!(s3_key = %s3_key, error = ?e, "live-key check failed; proceeding with delete attempt"); } } match s3.delete_object(s3_key).await { Ok(()) => GuardedDelete::Deleted, Err(e) => { tracing::warn!(s3_key = %s3_key, bucket = %bucket, error = ?e, "orphan S3 delete failed"); GuardedDelete::Failed } } } /// Delete S3 objects from presigned uploads that were never confirmed (>24h old). 
pub(super) async fn cleanup_orphaned_uploads(state: &AppState) { let stale = match db::pending_uploads::get_stale_pending_uploads( &state.db, chrono::Duration::hours(24), ) .await { Ok(rows) if rows.is_empty() => { let _ = db::scheduler_jobs::record_job_run(&state.db, "orphaned_upload_cleanup", 0).await; return; } Ok(rows) => rows, Err(e) => { tracing::error!(error = ?e, "failed to query stale pending uploads"); return; } }; let mut cleaned = 0i64; let mut keys_to_delete: Vec = Vec::with_capacity(stale.len()); for (s3_key, bucket) in &stale { let s3_client = match bucket.as_str() { "synckit" => state.synckit_s3.as_ref(), _ => state.s3.as_ref(), }; if let Some(s3) = s3_client { // Route through the guarded funnel: if a confirm already reclaimed // this deterministic key, the live row owns the object and we must // NOT delete it — just clear the stale pending_uploads record. match delete_orphan_key_guarded(&state.db, s3.as_ref(), bucket, s3_key).await { GuardedDelete::Deleted => { cleaned += 1; keys_to_delete.push(s3_key.clone()); } // Live row owns the key now: clear the stale record, keep object. GuardedDelete::SkippedLive => keys_to_delete.push(s3_key.clone()), // Delete failed: still clear the record so we don't retry forever. GuardedDelete::Failed => keys_to_delete.push(s3_key.clone()), } } else { // S3 not configured for this bucket; remove the DB record anyway keys_to_delete.push(s3_key.clone()); } } if !keys_to_delete.is_empty() && let Err(e) = db::pending_uploads::delete_pending_uploads(&state.db, &keys_to_delete).await { tracing::error!(error = ?e, "failed to delete pending upload records"); } if cleaned > 0 { tracing::info!(cleaned, "cleaned up orphaned presigned uploads"); } let _ = db::scheduler_jobs::record_job_run(&state.db, "orphaned_upload_cleanup", cleaned).await; } /// Remove stale cart items (>30 days old) and items that became unavailable. 
pub(super) async fn cleanup_cart_items(state: &AppState) { match db::cart::cleanup_stale_cart_items(&state.db, chrono::Duration::days(30)).await { Ok(n) if n > 0 => tracing::info!(removed = n, "cleaned up stale cart items"), Err(e) => tracing::error!(error = ?e, "failed to clean up stale cart items"), _ => {} } match db::cart::cleanup_unavailable_cart_items(&state.db).await { Ok(n) if n > 0 => tracing::info!(removed = n, "cleaned up unavailable cart items"), Err(e) => tracing::error!(error = ?e, "failed to clean up unavailable cart items"), _ => {} } } /// Retry stale pending S3 deletions (older than 10 minutes, batch of 100). pub(super) async fn retry_pending_s3_deletions(state: &AppState) { let stale = match db::pending_s3_deletions::get_stale_pending( &state.db, chrono::Duration::minutes(10), 100, ).await { Ok(rows) => rows, Err(e) => { tracing::error!(error = ?e, "failed to fetch stale pending S3 deletions"); return; } }; if stale.is_empty() { let _ = db::scheduler_jobs::record_job_run(&state.db, "s3_deletion_retry", 0).await; return; } let mut completed_ids = Vec::new(); let mut dead_letter_ids = Vec::new(); for row in &stale { if row.attempts >= 10 { tracing::error!(s3_key = %row.s3_key, bucket = %row.bucket, source = %row.source, attempts = row.attempts, "S3 deletion dead-lettered after 10 attempts — moving to dead-letter table for manual triage"); dead_letter_ids.push(row.id); continue; } else if row.attempts >= 5 { tracing::warn!(s3_key = %row.s3_key, bucket = %row.bucket, source = %row.source, attempts = row.attempts, "S3 deletion stuck after 5+ attempts"); } let s3 = if row.bucket == "synckit" { state.synckit_s3.as_ref() } else { state.s3.as_ref() }; if let Some(s3) = s3 { if row.s3_key.ends_with('/') { // Prefix delete (user-scoped cascade cleanup). 
Bypasses the // per-key live-check by design, but carries its own guard: a bare // `{user_id}/` prefix wipes a creator's ENTIRE storage and is only // ever enqueued by account cleanup (which CASCADE-deletes the user // row in the same pass). A STILL-LIVE user here means something // wrongly enqueued a prefix delete against a live account — refuse // and let the row climb toward dead-letter triage rather than // silently nuking live files. if row.bucket == "main" && let Some(uid) = row.s3_key.strip_suffix('/').and_then(|s| s.parse::().ok()) && matches!(db::users::get_user_by_id(&state.db, uid).await, Ok(Some(_))) { tracing::error!(s3_key = %row.s3_key, user_id = %uid, "refusing user-prefix S3 delete: user still exists — parking for dead-letter triage instead of wiping live storage"); continue; } match s3.delete_prefix(&row.s3_key).await { Ok(()) => completed_ids.push(row.id), Err(e) => { tracing::warn!(s3_key = %row.s3_key, error = ?e, "retry S3 prefix deletion failed"); } } } else { // Single-key delete — routed through the one guarded funnel so the // delete-then-reupload live-check can never be skipped. The same // funnel backs the orphaned-upload reaper. match delete_orphan_key_guarded(&state.db, s3.as_ref(), &row.bucket, &row.s3_key).await { GuardedDelete::Deleted | GuardedDelete::SkippedLive => completed_ids.push(row.id), GuardedDelete::Failed => {} // leave queued; climbs toward dead-letter } } } else { // S3 not configured — remove from queue (can't delete what doesn't exist) completed_ids.push(row.id); } } // Move permanently-failing rows to the dead-letter table (durable, operator- // visible) rather than silently DELETEing them and orphaning the S3 object. 
if !dead_letter_ids.is_empty() { match db::pending_s3_deletions::move_to_dead_letter(&state.db, &dead_letter_ids).await { Ok(moved) => tracing::warn!( moved, "moved permanently-failing S3 deletions to dead-letter table — manual triage required" ), Err(e) => tracing::error!(error = ?e, "failed to move S3 deletions to dead-letter table"), } } if !completed_ids.is_empty() { if let Err(e) = db::pending_s3_deletions::remove_completed(&state.db, &completed_ids).await { tracing::error!(error = ?e, "failed to dequeue completed S3 deletions"); } else { tracing::info!(completed = completed_ids.len(), total = stale.len(), "retried pending S3 deletions"); } } if !completed_ids.is_empty() || !dead_letter_ids.is_empty() { let processed = (completed_ids.len() + dead_letter_ids.len()) as i64; let _ = db::scheduler_jobs::record_job_run(&state.db, "s3_deletion_retry", processed).await; } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn cleanup_git_repos_removes_directory() { let tmp = tempfile::tempdir().unwrap(); let git_root = tmp.path(); let user_dir = git_root.join("testuser"); std::fs::create_dir_all(user_dir.join("repo.git")).unwrap(); std::fs::write(user_dir.join("repo.git/HEAD"), "ref: refs/heads/main\n").unwrap(); let user_id = db::UserId::nil(); cleanup_git_repos_on_disk(git_root.to_str().unwrap(), "testuser", user_id).await; assert!(!user_dir.exists(), "user git directory should be deleted"); } #[tokio::test] async fn cleanup_git_repos_noop_if_missing() { let tmp = tempfile::tempdir().unwrap(); let user_id = db::UserId::nil(); cleanup_git_repos_on_disk(tmp.path().to_str().unwrap(), "nonexistent", user_id).await; } }