Skip to main content

max / makenotwork

18.1 KB · 451 lines History Blame Raw
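//! Background scheduler — runs periodic jobs on a fixed interval.
//!
//! Every tick: publish scheduled items/posts, send onboarding emails, dispatch builds,
//! post-grace item hiding, idempotency-key cleanup, webhook retry, refund escalation,
//! stale-transaction cleanup, orphaned-upload cleanup.
//! Every 5 ticks: sandbox cleanup, pending S3 deletion retry, scan spool orphan reaping.
//! Hourly: SyncKit usage warnings, old scan-job purge.
//! Daily (and on the first tick after startup): subscription checks, bounce monitoring,
//! session pruning, IP scrubbing, account deletion, soft-deleted item purge, cart cleanup,
//! page-view pruning.
//! Weekly: storage drift correction, sales count integrity, SyncKit app storage recalculation.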
7
8 mod announcements;
9 mod cleanup;
10 mod integrity;
11 mod mt_threads;
12 mod synckit_warnings;
13 mod webhooks;
14
15 use tokio::sync::watch;
16 use tokio::task::JoinHandle;
17
18 use crate::constants;
19 use crate::db;
20 use crate::AppState;
21
22 // Re-export public API used by route handlers
23 pub use announcements::{send_blog_post_announcements, send_release_announcements};
24 pub use mt_threads::{spawn_mt_thread_for_blog_post, spawn_mt_thread_for_item};
25
/// Postgres advisory-lock key used to elect a single active scheduler.
///
/// The scheduler takes this lock at the start of each tick so overlapping
/// instances (e.g. during a rolling deploy) do not run the same jobs twice.
/// The value is the ASCII bytes of "MNW_SCH" followed by a trailing `0x4` nibble.
const SCHEDULER_ADVISORY_LOCK_ID: i64 = 0x4D4E575F5343484;

/// Ticks between sandbox cleanup passes (5 ticks = 5 min at a 60s tick).
const SANDBOX_CLEANUP_INTERVAL: u64 = 5;

/// Ticks between hourly jobs (60 ticks = 1h at a 60s tick). Drives the
/// SyncKit usage-warning job.
const HOURLY_INTERVAL: u64 = 60;

/// Ticks between daily jobs (1,440 ticks = 24h at a 60s tick).
const DAILY_INTERVAL: u64 = 24 * HOURLY_INTERVAL;

/// Ticks between weekly drift-correction runs (10,080 ticks = 7 days at a 60s tick).
const DRIFT_CORRECTION_INTERVAL: u64 = 7 * DAILY_INTERVAL;

/// Soft ceiling on how long one tick may take, in seconds. A tick at or above
/// this is logged at WARN. Deliberately conservative: with a 60s interval,
/// 30s means more than half of the tick budget has been used.
const TICK_DURATION_WARN_SECS: u64 = 30;

/// Hard ceiling on tick duration, in seconds. A tick at or above this also
/// opens a WAM ticket (rate-limited by a cooldown).
const TICK_DURATION_ALERT_SECS: u64 = 50;
49
50 /// Determine which scheduled job groups should run for a given tick.
51 ///
52 /// Returns `(sandbox_cleanup, hourly_jobs, daily_jobs, weekly_jobs)`.
53 fn jobs_for_tick(tick: u64) -> (bool, bool, bool, bool) {
54 let sandbox = tick.is_multiple_of(SANDBOX_CLEANUP_INTERVAL);
55 let hourly = tick.is_multiple_of(HOURLY_INTERVAL);
56 let daily = tick == 1 || tick.is_multiple_of(DAILY_INTERVAL);
57 let weekly = tick.is_multiple_of(DRIFT_CORRECTION_INTERVAL);
58 (sandbox, hourly, daily, weekly)
59 }
60
61 /// Spawn the background scheduler loop. Drop `shutdown_tx` to stop it.
62 pub fn spawn_scheduler(
63 state: AppState,
64 mut shutdown_rx: watch::Receiver<()>,
65 ) -> JoinHandle<()> {
66 tokio::spawn(async move {
67 tracing::info!("Scheduler started (interval={}s)", constants::SCHEDULER_INTERVAL_SECS);
68
69 let mut interval = tokio::time::interval(
70 std::time::Duration::from_secs(constants::SCHEDULER_INTERVAL_SECS),
71 );
72 interval.tick().await; // consume immediate first tick
73
74 let mut tick_count: u64 = 0;
75 let mut last_overrun_alert: Option<std::time::Instant> = None;
76
77 loop {
78 tokio::select! {
79 _ = interval.tick() => {}
80 _ = shutdown_rx.changed() => {
81 tracing::info!("Scheduler shutting down");
82 return;
83 }
84 }
85
86 tick_count += 1;
87 let tick_started = std::time::Instant::now();
88
89 // Pin advisory lock to a dedicated connection held for the entire tick.
90 // pg_try_advisory_lock is session-scoped — holding the connection prevents
91 // another instance from acquiring the lock until this tick completes.
92 let mut lock_conn = match state.db.acquire().await {
93 Ok(conn) => conn,
94 Err(e) => {
95 tracing::warn!(error = ?e, "scheduler: failed to acquire connection for advisory lock, skipping tick");
96 continue;
97 }
98 };
99 let locked: bool = match sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
100 .bind(SCHEDULER_ADVISORY_LOCK_ID)
101 .fetch_one(&mut *lock_conn)
102 .await
103 {
104 Ok(v) => v,
105 Err(e) => {
106 tracing::warn!(error = ?e, "scheduler: failed to acquire advisory lock, skipping tick");
107 continue;
108 }
109 };
110 if !locked {
111 tracing::debug!("scheduler: advisory lock held by another instance, skipping tick");
112 continue;
113 }
114
115 // Publish scheduled items. The announcement fan-out (4 DB
116 // roundtrips per item) gets spawned off the lock-held tick so a
117 // burst of releases doesn't extend the advisory-lock hold time.
118 match db::items::publish_scheduled_items(&state.db).await {
119 Ok(items) => {
120 for item in &items {
121 tracing::info!(
122 item_id = %item.id,
123 title = %item.title,
124 "scheduler published item"
125 );
126 let state_for_announce = state.clone();
127 let item_for_announce = item.clone();
128 tokio::spawn(async move {
129 announcements::send_release_announcements(
130 &state_for_announce, &item_for_announce,
131 ).await;
132 });
133 if item.mt_thread_id.is_none() {
134 mt_threads::spawn_mt_thread_for_item_by_lookup(&state, item);
135 }
136 }
137 }
138 Err(e) => {
139 tracing::error!(error = ?e, "scheduler failed to publish items");
140 }
141 }
142
143 // Send onboarding drip emails
144 announcements::send_onboarding_emails(&state).await;
145
146 // Dispatch pending builds
147 crate::build_runner::dispatch_pending_build(&state).await;
148
149 // Publish scheduled blog posts — same off-lock pattern as items.
150 match db::blog_posts::publish_scheduled_blog_posts(&state.db).await {
151 Ok(posts) => {
152 for post in &posts {
153 tracing::info!(
154 post_id = %post.id,
155 title = %post.title,
156 "scheduler published blog post"
157 );
158 let state_for_announce = state.clone();
159 let post_for_announce = post.clone();
160 tokio::spawn(async move {
161 announcements::send_blog_post_announcements(
162 &state_for_announce, &post_for_announce,
163 ).await;
164 });
165 if post.mt_thread_id.is_none() {
166 mt_threads::spawn_mt_thread_for_blog_post_by_lookup(&state, post);
167 }
168 }
169 }
170 Err(e) => {
171 tracing::error!(error = ?e, "scheduler failed to publish blog posts");
172 }
173 }
174
175 // Enforce post-grace item hiding (canceled 30+ days ago)
176 integrity::enforce_post_grace_hiding(&state).await;
177
178 // Clean up expired idempotency keys (every tick is fine — cheap DELETE)
179 if let Err(e) = db::idempotency::cleanup_expired(&state.db).await {
180 tracing::error!(error = ?e, "failed to clean up expired idempotency keys");
181 }
182
183 let (run_sandbox, run_hourly, run_daily, run_weekly) = jobs_for_tick(tick_count);
184
185 // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
186 if run_sandbox {
187 cleanup::cleanup_sandbox_accounts(&state).await;
188 cleanup::retry_pending_s3_deletions(&state).await;
189 let report = crate::scanning::spool::reap_orphans(
190 std::path::Path::new(constants::SCAN_SPOOL_DIR),
191 );
192 if report.deleted > 0 || report.errors > 0 {
193 tracing::info!(
194 deleted = report.deleted,
195 errors = report.errors,
196 "scan spool reaper swept orphans"
197 );
198 }
199 }
200
201 // Retry failed webhook events
202 webhooks::retry_failed_webhooks(&state).await;
203
204 // Escalate stale pending refunds (unmatched for >24 hours)
205 webhooks::escalate_stale_refunds(&state).await;
206
207 // Clean up stale pending transactions (>24h) and release promo code reservations
208 cleanup::cleanup_stale_pending_transactions(&state).await;
209
210 // Delete S3 objects from presigned uploads that were never confirmed (>24h)
211 cleanup::cleanup_orphaned_uploads(&state).await;
212
213 // Hourly: scan SyncKit apps for 75/90/100% cap breaches and email
214 // the app owner. Cheap query — single JOIN on a small table.
215 if run_hourly {
216 synckit_warnings::check_and_send_warnings(&state).await;
217 cleanup::purge_old_scan_jobs(&state).await;
218 }
219
220 // Weekly storage drift correction + integrity checks
221 if run_weekly {
222 integrity::recalculate_all_storage_used(&state).await;
223 integrity::check_sales_count_drift(&state).await;
224 match db::synckit_billing::recalculate_synckit_app_storage(&state.db).await {
225 Ok(n) => {
226 if n > 0 {
227 tracing::info!(corrected = n, "synckit app storage drift corrected");
228 }
229 }
230 Err(e) => tracing::error!(error = ?e, "synckit storage drift correction failed"),
231 }
232 }
233
234 // Daily checks (every 1440 ticks at 60s interval, plus first tick after startup)
235 if run_daily {
236 integrity::check_stale_subscriptions(&state).await;
237 integrity::check_email_bounce_spike(&state).await;
238
239 // Prune session records inactive for 90+ days
240 let session_threshold = chrono::Utc::now() - chrono::Duration::days(90);
241 match db::sessions::prune_expired_sessions(&state.db, session_threshold).await {
242 Ok(n) => {
243 if n > 0 { tracing::info!(pruned = n, "pruned expired session records"); }
244 let _ = db::scheduler_jobs::record_job_run(&state.db, "session_prune", n as i64).await;
245 }
246 Err(e) => tracing::error!(error = ?e, "failed to prune expired sessions"),
247 }
248
249 // Scrub IP addresses older than 30 days (privacy policy commitment)
250 cleanup::scrub_stale_ip_addresses(&state).await;
251
252 // Delete terminated accounts whose 30-day export window has expired
253 cleanup::delete_expired_terminated_accounts(&state).await;
254
255 // Delete self-deleted creator accounts whose 90-day content grace period has expired
256 cleanup::delete_expired_content_removal_accounts(&state).await;
257
258 // Permanently delete soft-deleted items older than 7 days
259 cleanup::purge_expired_deleted_items(&state).await;
260
261 // Clean up stale and unavailable cart items
262 cleanup::cleanup_cart_items(&state).await;
263
264 // Prune page view aggregates older than 2 years
265 match db::page_views::prune_old_views(&state.db, 730).await {
266 Ok(n) => {
267 if n > 0 { tracing::info!(pruned = n, "pruned old page view records"); }
268 }
269 Err(e) => tracing::error!(error = ?e, "failed to prune page views"),
270 }
271 }
272
273 // Explicitly release the advisory lock (defense-in-depth: also released
274 // when lock_conn is dropped, but explicit unlock survives refactors that
275 // might move lock_conn into a shorter-lived scope).
276 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
277 .bind(SCHEDULER_ADVISORY_LOCK_ID)
278 .execute(&mut *lock_conn)
279 .await;
280
281 // Tick-duration accounting. WARN at TICK_DURATION_WARN_SECS so the
282 // log surfaces it; raise a WAM ticket past TICK_DURATION_ALERT_SECS
283 // with a 1-hour cooldown so a chronic overrun doesn't flood tickets.
284 let tick_duration = tick_started.elapsed();
285 let tick_secs = tick_duration.as_secs();
286 if tick_secs >= TICK_DURATION_WARN_SECS {
287 tracing::warn!(
288 tick = tick_count, duration_secs = tick_secs,
289 "scheduler tick exceeded soft duration ceiling"
290 );
291 }
292 if tick_secs >= TICK_DURATION_ALERT_SECS
293 && let Some(ref wam) = state.wam
294 {
295 let cooldown_ok = last_overrun_alert
296 .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
297 if cooldown_ok {
298 let title = format!("Scheduler tick overran: {tick_secs}s");
299 let body = format!(
300 "tick #{tick_count} took {tick_secs}s (interval is {}s).",
301 constants::SCHEDULER_INTERVAL_SECS
302 );
303 wam.create_ticket(&title, Some(&body), "high", "scheduler-tick-overrun", None).await;
304 last_overrun_alert = Some(std::time::Instant::now());
305 }
306 }
307 }
308 })
309 }
310
#[cfg(test)]
mod tests {
    //! Unit tests for the tick-cadence logic in `jobs_for_tick` and for the
    //! interval constants it depends on. All four job groups (sandbox, hourly,
    //! daily, weekly) are asserted.

    use super::*;

    // ── Tick cadence tests ──

    #[test]
    fn tick_1_runs_daily_not_weekly() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(1);
        assert!(!sandbox, "tick 1 is not a multiple of 5");
        assert!(!hourly, "tick 1 is not a multiple of 60");
        assert!(daily, "tick 1 should trigger daily jobs (first-tick rule)");
        assert!(!weekly, "tick 1 should not trigger weekly jobs");
    }

    #[test]
    fn tick_5_runs_sandbox_cleanup() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(5);
        assert!(sandbox);
        assert!(!hourly);
        assert!(!daily);
        assert!(!weekly);
    }

    #[test]
    fn tick_60_runs_hourly_and_sandbox() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(60);
        assert!(sandbox, "60 is divisible by 5");
        assert!(hourly, "60 is the hourly interval");
        assert!(!daily);
        assert!(!weekly);
    }

    #[test]
    fn tick_1440_runs_daily_and_sandbox() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(1440);
        assert!(sandbox, "1440 is divisible by 5");
        assert!(hourly, "1440 is divisible by 60");
        assert!(daily, "1440 is the daily interval");
        assert!(!weekly);
    }

    #[test]
    fn tick_10080_runs_every_group() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(10_080);
        assert!(sandbox, "10080 is divisible by 5");
        assert!(hourly, "10080 is divisible by 60");
        assert!(daily, "10080 is divisible by 1440");
        assert!(weekly, "10080 is the weekly interval");
    }

    #[test]
    fn normal_tick_runs_nothing_special() {
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(7);
        assert!(!sandbox);
        assert!(!hourly);
        assert!(!daily);
        assert!(!weekly);
    }

    #[test]
    fn second_daily_tick() {
        let (_, _, daily, _) = jobs_for_tick(2880);
        assert!(daily, "2880 = 2 * 1440");
    }

    #[test]
    fn second_weekly_tick() {
        let (_, _, _, weekly) = jobs_for_tick(20_160);
        assert!(weekly, "20160 = 2 * 10080");
    }

    // ── Interval constant sanity checks ──

    #[test]
    fn drift_correction_interval_is_7_days() {
        assert_eq!(DRIFT_CORRECTION_INTERVAL, 7 * 24 * 60);
    }

    #[test]
    fn daily_interval_is_24_hours() {
        assert_eq!(DAILY_INTERVAL, 24 * 60);
    }

    #[test]
    fn hourly_interval_is_60_minutes() {
        assert_eq!(HOURLY_INTERVAL, 60);
    }

    #[test]
    fn sandbox_cleanup_interval_is_5_minutes() {
        assert_eq!(SANDBOX_CLEANUP_INTERVAL, 5);
    }

    // ── Adversarial tests (test-fuzz) ──

    #[test]
    fn tick_0_would_run_everything() {
        // Tick 0 never happens in practice: the counter starts at 0 and is
        // incremented before use. This pins what would happen if it did —
        // 0 is a multiple of every interval, so every group fires.
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(0);
        assert!(sandbox, "0 is a multiple of 5");
        assert!(hourly, "0 is a multiple of 60");
        assert!(daily, "0 is a multiple of 1440");
        assert!(weekly, "0 is a multiple of 10080");
    }

    #[test]
    fn large_tick_values() {
        // 52 weeks of ticks — exact multiple of weekly interval
        let fifty_two_weeks = 52 * DRIFT_CORRECTION_INTERVAL;
        let (sandbox, hourly, daily, weekly) = jobs_for_tick(fifty_two_weeks);
        assert!(sandbox);
        assert!(hourly, "{} should be divisible by 60", fifty_two_weeks);
        assert!(daily, "{} should be divisible by 1440", fifty_two_weeks);
        assert!(weekly, "{} should be divisible by 10080", fifty_two_weeks);

        // Large non-aligned tick
        let (_, hourly, daily, weekly) = jobs_for_tick(999_999);
        assert!(!hourly, "999999 is not divisible by 60");
        assert!(!daily, "999999 is not divisible by 1440");
        assert!(!weekly, "999999 is not divisible by 10080");
    }

    #[test]
    fn hourly_not_on_partial_hour() {
        // Tick 30 = half an hour, should not trigger hourly
        let (_, hourly, _, _) = jobs_for_tick(30);
        assert!(!hourly, "30 ticks is only half an hour");
    }

    #[test]
    fn daily_not_on_partial_day() {
        // Tick 720 = half a day, should not trigger daily
        let (_, _, daily, _) = jobs_for_tick(720);
        assert!(!daily, "720 ticks is only half a day");
    }

    #[test]
    fn weekly_not_on_partial_week() {
        // 5040 = 3.5 days, should not trigger weekly
        let (_, _, _, weekly) = jobs_for_tick(5040);
        assert!(!weekly, "5040 is only 3.5 days");
    }

    #[test]
    fn sandbox_every_5_ticks_consecutively() {
        for tick in 1..=25 {
            let (sandbox, _, _, _) = jobs_for_tick(tick);
            if tick % 5 == 0 {
                assert!(sandbox, "tick {} should run sandbox cleanup", tick);
            } else {
                assert!(!sandbox, "tick {} should NOT run sandbox cleanup", tick);
            }
        }
    }

    #[test]
    fn intervals_nest_evenly() {
        // Verify weekly is an exact multiple of daily
        assert_eq!(DRIFT_CORRECTION_INTERVAL % DAILY_INTERVAL, 0,
            "weekly interval must be exact multiple of daily");
        // Verify hourly fits evenly into daily
        assert_eq!(DAILY_INTERVAL % HOURLY_INTERVAL, 0,
            "hourly interval must divide evenly into daily");
        // Verify sandbox fits evenly into daily
        assert_eq!(DAILY_INTERVAL % SANDBOX_CLEANUP_INTERVAL, 0,
            "sandbox interval must divide evenly into daily");
    }
}
451