Skip to main content

max / makenotwork

Add scheduler jobs: sandbox cleanup, stale tx cleanup, IP scrubbing, termination Sandbox cleanup: delete expired sandbox accounts and their S3 objects (user prefix + project prefixes) every 5 minutes. Stale pending transactions: delete pending txns older than 25h, release reserved promo code use_count slots. IP scrubbing: NULL out IP addresses older than 30 days in user_sessions, download_fingerprints, and streaming_sessions (privacy policy commitment). Terminated accounts: delete accounts 30 days after termination, with full S3 cleanup before CASCADE delete. S3: add delete_prefix to StorageBackend trait and S3Client, with paginated list+delete implementation in shared/s3-storage. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-04-26 19:45 UTC
Commit: be9c4b0dcecbadf70e14edf23124efaf13d61af6
Parent: 34f43e1
3 files changed, +239 insertions, -0 deletions
@@ -389,12 +389,20 @@ pub fn spawn_scheduler(
389 389 tracing::error!(error = ?e, "failed to clean up expired idempotency keys");
390 390 }
391 391
392 + // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
393 + if tick_count.is_multiple_of(5) {
394 + cleanup_sandbox_accounts(&state).await;
395 + }
396 +
392 397 // Retry failed webhook events
393 398 retry_failed_webhooks(&state).await;
394 399
395 400 // Escalate stale pending refunds (unmatched for >24 hours)
396 401 escalate_stale_refunds(&state).await;
397 402
403 + // Clean up stale pending transactions (>24h) and release promo code reservations
404 + cleanup_stale_pending_transactions(&state).await;
405 +
398 406 // Weekly storage drift correction + integrity checks
399 407 if tick_count.is_multiple_of(DRIFT_CORRECTION_INTERVAL) {
400 408 recalculate_all_storage_used(&state).await;
@@ -413,12 +421,68 @@ pub fn spawn_scheduler(
413 421 Ok(n) => tracing::info!(pruned = n, "pruned expired session records"),
414 422 Err(e) => tracing::error!(error = ?e, "failed to prune expired sessions"),
415 423 }
424 +
425 + // Scrub IP addresses older than 30 days (privacy policy commitment)
426 + scrub_stale_ip_addresses(&state).await;
427 +
428 + // Delete terminated accounts whose 30-day export window has expired
429 + delete_expired_terminated_accounts(&state).await;
416 430 }
417 431 }
418 432 })
419 433 }
420 434
421 435 // ============================================================================
436 + // Sandbox cleanup
437 + // ============================================================================
438 +
439 + /// Delete expired sandbox accounts and their S3 objects.
440 + ///
441 + /// S3 cleanup uses two prefix patterns:
442 + /// - `{user_id}/` — covers item audio, downloads, covers, versions, media
443 + /// - `projects/{project_id}/` — covers project images
444 + ///
445 + /// Both are deleted before the user row CASCADE wipes DB records.
446 + async fn cleanup_sandbox_accounts(state: &AppState) {
447 + let expired_ids = match db::users::get_expired_sandbox_ids(&state.db).await {
448 + Ok(ids) if ids.is_empty() => return,
449 + Ok(ids) => ids,
450 + Err(e) => {
451 + tracing::error!(error = ?e, "failed to query expired sandbox accounts");
452 + return;
453 + }
454 + };
455 +
456 + for user_id in &expired_ids {
457 + if let Some(ref s3) = state.s3 {
458 + // Delete item-level S3 objects (audio, downloads, covers, versions, media)
459 + let user_prefix = format!("{user_id}/");
460 + if let Err(e) = s3.delete_prefix(&user_prefix).await {
461 + tracing::warn!(error = ?e, %user_id, "failed to delete sandbox user S3 objects");
462 + }
463 +
464 + // Delete project-level S3 objects (project images)
465 + // Must query project IDs before the CASCADE delete removes them
466 + if let Ok(project_ids) = db::projects::get_project_ids_for_user(&state.db, *user_id).await {
467 + for pid in project_ids {
468 + let proj_prefix = format!("projects/{pid}/");
469 + if let Err(e) = s3.delete_prefix(&proj_prefix).await {
470 + tracing::warn!(error = ?e, %user_id, %pid, "failed to delete sandbox project S3 objects");
471 + }
472 + }
473 + }
474 + }
475 +
476 + // delete_user cascades to all child rows (projects, items, sessions, etc.)
477 + if let Err(e) = db::users::delete_user(&state.db, *user_id).await {
478 + tracing::error!(error = ?e, %user_id, "failed to delete expired sandbox account");
479 + } else {
480 + tracing::info!(%user_id, event = "sandbox_expired", "sandbox account cleaned up");
481 + }
482 + }
483 + }
484 +
485 + // ============================================================================
422 486 // Periodic integrity checks
423 487 // ============================================================================
424 488
@@ -703,6 +767,136 @@ async fn escalate_stale_refunds(state: &AppState) {
703 767 }
704 768
705 769 // ============================================================================
770 + // Stale pending transaction cleanup
771 + // ============================================================================
772 +
773 + /// Delete pending transactions older than 25 hours (Stripe sessions expire at 24h)
774 + /// and release any reserved promo code use_count slots.
775 + async fn cleanup_stale_pending_transactions(state: &AppState) {
776 + let promo_ids = match db::transactions::cleanup_stale_pending(
777 + &state.db,
778 + chrono::Duration::hours(25),
779 + )
780 + .await
781 + {
782 + Ok(ids) if ids.is_empty() => return,
783 + Ok(ids) => ids,
784 + Err(e) => {
785 + tracing::error!(error = ?e, "failed to clean up stale pending transactions");
786 + return;
787 + }
788 + };
789 +
790 + let mut released = 0u64;
791 + for pc_id in promo_ids.into_iter().flatten() {
792 + if let Err(e) = db::promo_codes::release_use_count(&state.db, pc_id).await {
793 + tracing::warn!(promo_code_id = %pc_id, error = ?e, "failed to release promo code use count");
794 + } else {
795 + released += 1;
796 + }
797 + }
798 +
799 + if released > 0 {
800 + tracing::info!(released, "released promo code reservations from stale pending transactions");
801 + }
802 + }
803 +
804 + // ============================================================================
805 + // Terminated account cleanup (30-day export window expired)
806 + // ============================================================================
807 +
808 + /// Delete accounts that were terminated >30 days ago.
809 + /// Same cleanup pattern as sandbox accounts: S3 objects first, then CASCADE delete.
810 + async fn delete_expired_terminated_accounts(state: &AppState) {
811 + let expired_ids = match db::users::get_expired_terminated_ids(&state.db).await {
812 + Ok(ids) if ids.is_empty() => return,
813 + Ok(ids) => ids,
814 + Err(e) => {
815 + tracing::error!(error = ?e, "failed to query expired terminated accounts");
816 + return;
817 + }
818 + };
819 +
820 + for user_id in &expired_ids {
821 + // Clean up S3 objects before DB cascade
822 + if let Some(ref s3) = state.s3 {
823 + let user_prefix = format!("{user_id}/");
824 + if let Err(e) = s3.delete_prefix(&user_prefix).await {
825 + tracing::warn!(error = ?e, %user_id, "failed to delete terminated user S3 objects");
826 + }
827 +
828 + if let Ok(project_ids) = db::projects::get_project_ids_for_user(&state.db, *user_id).await {
829 + for pid in project_ids {
830 + let proj_prefix = format!("projects/{pid}/");
831 + if let Err(e) = s3.delete_prefix(&proj_prefix).await {
832 + tracing::warn!(error = ?e, %user_id, %pid, "failed to delete terminated user project S3 objects");
833 + }
834 + }
835 + }
836 + }
837 +
838 + if let Err(e) = db::users::delete_user(&state.db, *user_id).await {
839 + tracing::error!(error = ?e, %user_id, "failed to delete expired terminated account");
840 + } else {
841 + tracing::info!(%user_id, event = "termination_expired", "terminated account deleted after 30-day export window");
842 + }
843 + }
844 + }
845 +
846 + // ============================================================================
847 + // IP address scrubbing (privacy policy: 30-day retention)
848 + // ============================================================================
849 +
850 + /// NULL out IP addresses older than 30 days in user_sessions,
851 + /// download_fingerprints, and streaming_sessions.
852 + ///
853 + /// Keeps the rows intact for history/analytics; only the IP field is cleared.
854 + async fn scrub_stale_ip_addresses(state: &AppState) {
855 + let cutoff = chrono::Utc::now() - chrono::Duration::days(30);
856 + let mut total = 0u64;
857 +
858 + // user_sessions: ip_address TEXT, keyed on created_at
859 + match sqlx::query(
860 + "UPDATE user_sessions SET ip_address = NULL WHERE ip_address IS NOT NULL AND created_at < $1",
861 + )
862 + .bind(cutoff)
863 + .execute(&state.db)
864 + .await
865 + {
866 + Ok(r) => total += r.rows_affected(),
867 + Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from user_sessions"),
868 + }
869 +
870 + // download_fingerprints: ip_address INET, keyed on downloaded_at
871 + match sqlx::query(
872 + "UPDATE download_fingerprints SET ip_address = NULL WHERE ip_address IS NOT NULL AND downloaded_at < $1",
873 + )
874 + .bind(cutoff)
875 + .execute(&state.db)
876 + .await
877 + {
878 + Ok(r) => total += r.rows_affected(),
879 + Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from download_fingerprints"),
880 + }
881 +
882 + // streaming_sessions: ip_address INET NOT NULL — set to 0.0.0.0 since column is NOT NULL
883 + match sqlx::query(
884 + "UPDATE streaming_sessions SET ip_address = '0.0.0.0' WHERE ip_address != '0.0.0.0' AND started_at < $1",
885 + )
886 + .bind(cutoff)
887 + .execute(&state.db)
888 + .await
889 + {
890 + Ok(r) => total += r.rows_affected(),
891 + Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from streaming_sessions"),
892 + }
893 +
894 + if total > 0 {
895 + tracing::info!(scrubbed = total, "scrubbed stale IP addresses (30-day retention)");
896 + }
897 + }
898 +
899 + // ============================================================================
706 900 // MT thread provisioning helpers
707 901 // ============================================================================
708 902
@@ -175,6 +175,8 @@ pub trait StorageBackend: Send + Sync {
175 175 async fn download_object(&self, s3_key: &str) -> Result<Vec<u8>>;
176 176 async fn upload_object(&self, s3_key: &str, content_type: &str, data: Vec<u8>, cache_control: Option<&str>) -> Result<()>;
177 177 async fn delete_object(&self, s3_key: &str) -> Result<()>;
178 + /// Delete all objects under a key prefix. Default is a no-op.
179 + async fn delete_prefix(&self, _prefix: &str) -> Result<()> { Ok(()) }
178 180 async fn check_connectivity(&self) -> std::result::Result<(), String>;
179 181 fn bucket(&self) -> &str;
180 182 }
@@ -457,6 +459,11 @@ impl StorageBackend for S3Client {
457 459 self.delete_object(s3_key).await
458 460 }
459 461
462 + async fn delete_prefix(&self, prefix: &str) -> Result<()> {
463 + self.inner.delete_prefix(prefix).await
464 + .map_err(|e| AppError::Storage(e))
465 + }
466 +
460 467 async fn check_connectivity(&self) -> std::result::Result<(), String> {
461 468 self.check_connectivity().await
462 469 }
@@ -130,6 +130,44 @@ impl S3Client {
130 130 Ok(())
131 131 }
132 132
133 + /// Delete all objects under a given key prefix.
134 + ///
135 + /// Lists objects with the prefix and deletes them one at a time.
136 + /// Suitable for small numbers of objects (sandbox cleanup, etc.).
137 + pub async fn delete_prefix(&self, prefix: &str) -> Result<(), String> {
138 + let mut continuation_token: Option<String> = None;
139 + loop {
140 + let mut req = self
141 + .client
142 + .list_objects_v2()
143 + .bucket(&self.bucket)
144 + .prefix(prefix)
145 + .max_keys(1000);
146 + if let Some(ref token) = continuation_token {
147 + req = req.continuation_token(token);
148 + }
149 + let resp = req
150 + .send()
151 + .await
152 + .map_err(|e| format!("S3 list objects failed: {e}"))?;
153 +
154 + if let Some(contents) = resp.contents {
155 + for obj in contents {
156 + if let Some(key) = obj.key {
157 + self.delete(&key).await?;
158 + }
159 + }
160 + }
161 +
162 + if resp.is_truncated.unwrap_or(false) {
163 + continuation_token = resp.next_continuation_token;
164 + } else {
165 + break;
166 + }
167 + }
168 + Ok(())
169 + }
170 +
133 171 /// Check if an object exists in S3.
134 172 pub async fn object_exists(&self, key: &str) -> Result<bool, String> {
135 173 match self