//! Background scheduler — runs periodic jobs on a fixed interval. //! //! Every tick: publish scheduled items/posts, send onboarding emails, dispatch builds. //! Every 5 ticks: sandbox cleanup. Every tick: webhook retry, refund escalation, transaction cleanup. //! Daily: subscription checks, bounce monitoring, session pruning, IP scrubbing, account deletion. //! Weekly: storage drift correction, sales count integrity. mod announcements; mod cleanup; mod integrity; mod mt_threads; mod synckit_warnings; mod webhooks; use tokio::sync::watch; use tokio::task::JoinHandle; use crate::constants; use crate::db; use crate::AppState; // Re-export public API used by route handlers pub use announcements::{send_blog_post_announcements, send_release_announcements}; pub use mt_threads::{spawn_mt_thread_for_blog_post, spawn_mt_thread_for_item}; /// Advisory lock ID for single-instance scheduler coordination. /// Prevents duplicate job execution during rolling deploys. const SCHEDULER_ADVISORY_LOCK_ID: i64 = 0x4D4E575F5343484; // "MNW_SCH" truncated /// Weekly drift correction interval in scheduler ticks (10,080 = 7 days at 60s). const DRIFT_CORRECTION_INTERVAL: u64 = 10_080; /// Daily interval in scheduler ticks (1,440 = 24h at 60s). const DAILY_INTERVAL: u64 = 1440; /// Sandbox cleanup interval in scheduler ticks (5 = 5min at 60s). const SANDBOX_CLEANUP_INTERVAL: u64 = 5; /// Hourly interval in scheduler ticks (60 = 1h at 60s). Used by the SyncKit /// usage-warning job. const HOURLY_INTERVAL: u64 = 60; /// Soft tick-duration ceiling. A tick longer than this logs WARN; longer /// than `TICK_DURATION_ALERT_SECS` also opens a WAM ticket (rate-limited). /// Tuned conservatively — at 60s interval, anything over 30s means we're /// burning more than half the budget and the next tick will skip. const TICK_DURATION_WARN_SECS: u64 = 30; const TICK_DURATION_ALERT_SECS: u64 = 50; /// Determine which scheduled job groups should run for a given tick. /// /// Returns `(sandbox_cleanup, hourly_jobs, daily_jobs, weekly_jobs)`. fn jobs_for_tick(tick: u64) -> (bool, bool, bool, bool) { let sandbox = tick.is_multiple_of(SANDBOX_CLEANUP_INTERVAL); let hourly = tick.is_multiple_of(HOURLY_INTERVAL); let daily = tick == 1 || tick.is_multiple_of(DAILY_INTERVAL); let weekly = tick.is_multiple_of(DRIFT_CORRECTION_INTERVAL); (sandbox, hourly, daily, weekly) } /// Spawn the background scheduler loop. Drop `shutdown_tx` to stop it. pub fn spawn_scheduler( state: AppState, mut shutdown_rx: watch::Receiver<()>, ) -> JoinHandle<()> { tokio::spawn(async move { tracing::info!("Scheduler started (interval={}s)", constants::SCHEDULER_INTERVAL_SECS); let mut interval = tokio::time::interval( std::time::Duration::from_secs(constants::SCHEDULER_INTERVAL_SECS), ); interval.tick().await; // consume immediate first tick let mut tick_count: u64 = 0; let mut last_overrun_alert: Option = None; loop { tokio::select! { _ = interval.tick() => {} _ = shutdown_rx.changed() => { tracing::info!("Scheduler shutting down"); return; } } tick_count += 1; let tick_started = std::time::Instant::now(); // Pin advisory lock to a dedicated connection held for the entire tick. // pg_try_advisory_lock is session-scoped — holding the connection prevents // another instance from acquiring the lock until this tick completes. let mut lock_conn = match state.db.acquire().await { Ok(conn) => conn, Err(e) => { tracing::warn!(error = ?e, "scheduler: failed to acquire connection for advisory lock, skipping tick"); continue; } }; let locked: bool = match sqlx::query_scalar("SELECT pg_try_advisory_lock($1)") .bind(SCHEDULER_ADVISORY_LOCK_ID) .fetch_one(&mut *lock_conn) .await { Ok(v) => v, Err(e) => { tracing::warn!(error = ?e, "scheduler: failed to acquire advisory lock, skipping tick"); continue; } }; if !locked { tracing::debug!("scheduler: advisory lock held by another instance, skipping tick"); continue; } // Publish scheduled items. The announcement fan-out (4 DB // roundtrips per item) gets spawned off the lock-held tick so a // burst of releases doesn't extend the advisory-lock hold time. match db::items::publish_scheduled_items(&state.db).await { Ok(items) => { for item in &items { tracing::info!( item_id = %item.id, title = %item.title, "scheduler published item" ); let state_for_announce = state.clone(); let item_for_announce = item.clone(); tokio::spawn(async move { announcements::send_release_announcements( &state_for_announce, &item_for_announce, ).await; }); if item.mt_thread_id.is_none() { mt_threads::spawn_mt_thread_for_item_by_lookup(&state, item); } } } Err(e) => { tracing::error!(error = ?e, "scheduler failed to publish items"); } } // Send onboarding drip emails announcements::send_onboarding_emails(&state).await; // Dispatch pending builds crate::build_runner::dispatch_pending_build(&state).await; // Publish scheduled blog posts — same off-lock pattern as items. match db::blog_posts::publish_scheduled_blog_posts(&state.db).await { Ok(posts) => { for post in &posts { tracing::info!( post_id = %post.id, title = %post.title, "scheduler published blog post" ); let state_for_announce = state.clone(); let post_for_announce = post.clone(); tokio::spawn(async move { announcements::send_blog_post_announcements( &state_for_announce, &post_for_announce, ).await; }); if post.mt_thread_id.is_none() { mt_threads::spawn_mt_thread_for_blog_post_by_lookup(&state, post); } } } Err(e) => { tracing::error!(error = ?e, "scheduler failed to publish blog posts"); } } // Enforce post-grace item hiding (canceled 30+ days ago) integrity::enforce_post_grace_hiding(&state).await; // Clean up expired idempotency keys (every tick is fine — cheap DELETE) if let Err(e) = db::idempotency::cleanup_expired(&state.db).await { tracing::error!(error = ?e, "failed to clean up expired idempotency keys"); } let (run_sandbox, run_hourly, run_daily, run_weekly) = jobs_for_tick(tick_count); // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval) if run_sandbox { cleanup::cleanup_sandbox_accounts(&state).await; cleanup::retry_pending_s3_deletions(&state).await; let report = crate::scanning::spool::reap_orphans( std::path::Path::new(constants::SCAN_SPOOL_DIR), ); if report.deleted > 0 || report.errors > 0 { tracing::info!( deleted = report.deleted, errors = report.errors, "scan spool reaper swept orphans" ); } } // Retry failed webhook events webhooks::retry_failed_webhooks(&state).await; // Escalate stale pending refunds (unmatched for >24 hours) webhooks::escalate_stale_refunds(&state).await; // Clean up stale pending transactions (>24h) and release promo code reservations cleanup::cleanup_stale_pending_transactions(&state).await; // Delete S3 objects from presigned uploads that were never confirmed (>24h) cleanup::cleanup_orphaned_uploads(&state).await; // Hourly: scan SyncKit apps for 75/90/100% cap breaches and email // the app owner. Cheap query — single JOIN on a small table. if run_hourly { synckit_warnings::check_and_send_warnings(&state).await; cleanup::purge_old_scan_jobs(&state).await; } // Weekly storage drift correction + integrity checks if run_weekly { integrity::recalculate_all_storage_used(&state).await; integrity::check_sales_count_drift(&state).await; match db::synckit_billing::recalculate_synckit_app_storage(&state.db).await { Ok(n) => { if n > 0 { tracing::info!(corrected = n, "synckit app storage drift corrected"); } } Err(e) => tracing::error!(error = ?e, "synckit storage drift correction failed"), } } // Daily checks (every 1440 ticks at 60s interval, plus first tick after startup) if run_daily { integrity::check_stale_subscriptions(&state).await; integrity::check_email_bounce_spike(&state).await; // Prune session records inactive for 90+ days let session_threshold = chrono::Utc::now() - chrono::Duration::days(90); match db::sessions::prune_expired_sessions(&state.db, session_threshold).await { Ok(n) => { if n > 0 { tracing::info!(pruned = n, "pruned expired session records"); } let _ = db::scheduler_jobs::record_job_run(&state.db, "session_prune", n as i64).await; } Err(e) => tracing::error!(error = ?e, "failed to prune expired sessions"), } // Scrub IP addresses older than 30 days (privacy policy commitment) cleanup::scrub_stale_ip_addresses(&state).await; // Delete terminated accounts whose 30-day export window has expired cleanup::delete_expired_terminated_accounts(&state).await; // Delete self-deleted creator accounts whose 90-day content grace period has expired cleanup::delete_expired_content_removal_accounts(&state).await; // Permanently delete soft-deleted items older than 7 days cleanup::purge_expired_deleted_items(&state).await; // Clean up stale and unavailable cart items cleanup::cleanup_cart_items(&state).await; // Prune page view aggregates older than 2 years match db::page_views::prune_old_views(&state.db, 730).await { Ok(n) => { if n > 0 { tracing::info!(pruned = n, "pruned old page view records"); } } Err(e) => tracing::error!(error = ?e, "failed to prune page views"), } } // Explicitly release the advisory lock (defense-in-depth: also released // when lock_conn is dropped, but explicit unlock survives refactors that // might move lock_conn into a shorter-lived scope). let _ = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(SCHEDULER_ADVISORY_LOCK_ID) .execute(&mut *lock_conn) .await; // Tick-duration accounting. WARN at TICK_DURATION_WARN_SECS so the // log surfaces it; raise a WAM ticket past TICK_DURATION_ALERT_SECS // with a 1-hour cooldown so a chronic overrun doesn't flood tickets. let tick_duration = tick_started.elapsed(); let tick_secs = tick_duration.as_secs(); if tick_secs >= TICK_DURATION_WARN_SECS { tracing::warn!( tick = tick_count, duration_secs = tick_secs, "scheduler tick exceeded soft duration ceiling" ); } if tick_secs >= TICK_DURATION_ALERT_SECS && let Some(ref wam) = state.wam { let cooldown_ok = last_overrun_alert .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); if cooldown_ok { let title = format!("Scheduler tick overran: {tick_secs}s"); let body = format!( "tick #{tick_count} took {tick_secs}s (interval is {}s).", constants::SCHEDULER_INTERVAL_SECS ); wam.create_ticket(&title, Some(&body), "high", "scheduler-tick-overrun", None).await; last_overrun_alert = Some(std::time::Instant::now()); } } } }) } #[cfg(test)] mod tests { use super::*; // ── Tick cadence tests ── #[test] fn tick_1_runs_daily_not_weekly() { let (sandbox, _hourly, daily, weekly) = jobs_for_tick(1); assert!(!sandbox, "tick 1 is not a multiple of 5"); assert!(daily, "tick 1 should trigger daily jobs (first-tick rule)"); assert!(!weekly, "tick 1 should not trigger weekly jobs"); } #[test] fn tick_5_runs_sandbox_cleanup() { let (sandbox, _hourly, daily, weekly) = jobs_for_tick(5); assert!(sandbox); assert!(!daily); assert!(!weekly); } #[test] fn tick_1440_runs_daily_and_sandbox() { let (sandbox, _hourly, daily, weekly) = jobs_for_tick(1440); assert!(sandbox, "1440 is divisible by 5"); assert!(daily, "1440 is the daily interval"); assert!(!weekly); } #[test] fn tick_10080_runs_all_three() { let (sandbox, _hourly, daily, weekly) = jobs_for_tick(10_080); assert!(sandbox, "10080 is divisible by 5"); assert!(daily, "10080 is divisible by 1440"); assert!(weekly, "10080 is the weekly interval"); } #[test] fn normal_tick_runs_nothing_special() { let (sandbox, _hourly, daily, weekly) = jobs_for_tick(7); assert!(!sandbox); assert!(!daily); assert!(!weekly); } #[test] fn second_daily_tick() { let (_, _, daily, _) = jobs_for_tick(2880); assert!(daily, "2880 = 2 * 1440"); } #[test] fn second_weekly_tick() { let (_, _, _, weekly) = jobs_for_tick(20_160); assert!(weekly, "20160 = 2 * 10080"); } // ── Interval constant sanity checks ── #[test] fn drift_correction_interval_is_7_days() { assert_eq!(DRIFT_CORRECTION_INTERVAL, 7 * 24 * 60); } #[test] fn daily_interval_is_24_hours() { assert_eq!(DAILY_INTERVAL, 24 * 60); } #[test] fn sandbox_cleanup_interval_is_5_minutes() { assert_eq!(SANDBOX_CLEANUP_INTERVAL, 5); } // ── Adversarial tests (test-fuzz) ── #[test] fn tick_0_runs_nothing() { // Tick 0 should not run anything: 0 % N == 0 for all N, // but tick 0 never happens in practice (counter starts at 0, increments before use). // Test what would happen if it did. let (sandbox, _hourly, daily, weekly) = jobs_for_tick(0); // 0.is_multiple_of(N) is true for all N, so these all fire assert!(sandbox, "0 is a multiple of 5"); assert!(daily, "0 is a multiple of 1440"); assert!(weekly, "0 is a multiple of 10080"); } #[test] fn large_tick_values() { // 52 weeks of ticks — exact multiple of weekly interval let fifty_two_weeks = 52 * DRIFT_CORRECTION_INTERVAL; let (sandbox, _hourly, daily, weekly) = jobs_for_tick(fifty_two_weeks); assert!(sandbox); assert!(daily, "{} should be divisible by 1440", fifty_two_weeks); assert!(weekly, "{} should be divisible by 10080", fifty_two_weeks); // Large non-aligned tick let (_, _, daily, weekly) = jobs_for_tick(999_999); assert!(!daily, "999999 is not divisible by 1440"); assert!(!weekly, "999999 is not divisible by 10080"); } #[test] fn daily_not_on_partial_day() { // Tick 720 = half a day, should not trigger daily let (_, _, daily, _) = jobs_for_tick(720); assert!(!daily, "720 ticks is only half a day"); } #[test] fn weekly_not_on_partial_week() { // 5040 = 3.5 days, should not trigger weekly let (_, _, _, weekly) = jobs_for_tick(5040); assert!(!weekly, "5040 is only 3.5 days"); } #[test] fn sandbox_every_5_ticks_consecutively() { for tick in 1..=25 { let (sandbox, _, _, _) = jobs_for_tick(tick); if tick % 5 == 0 { assert!(sandbox, "tick {} should run sandbox cleanup", tick); } else { assert!(!sandbox, "tick {} should NOT run sandbox cleanup", tick); } } } #[test] fn intervals_are_coprime_aware() { // Verify weekly is an exact multiple of daily assert_eq!(DRIFT_CORRECTION_INTERVAL % DAILY_INTERVAL, 0, "weekly interval must be exact multiple of daily"); // Verify sandbox fits evenly into daily assert_eq!(DAILY_INTERVAL % SANDBOX_CLEANUP_INTERVAL, 0, "sandbox interval must divide evenly into daily"); } }