//! Background health monitor — lightweight periodic checks with status-transition alerts. //! //! Periodically probes database connectivity and S3 availability, stores //! timestamped health snapshots, and tracks overall service status. On status //! transitions (Operational/Degraded/Error) sends email alerts to the admin //! address. Exposes the current status for the `/health` endpoint. //! //! See also: `/docs/tech/monitoring` use std::time::Instant; use tokio::sync::watch; use tokio::task::JoinHandle; use crate::constants; use crate::db; use crate::AppState; /// Overall service status. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MonitorStatus { Operational, Degraded, Error, } impl MonitorStatus { pub fn as_str(&self) -> &'static str { match self { MonitorStatus::Operational => "operational", MonitorStatus::Degraded => "degraded", MonitorStatus::Error => "error", } } } /// Result of a single health check cycle. pub struct HealthSnapshot { pub status: MonitorStatus, pub db_ok: bool, pub s3_ok: bool, pub sessions_ok: bool, pub check_duration_ms: i32, } /// Run a lightweight health probe (no HTTP self-call, no table counts). pub async fn run_health_check(state: &AppState) -> HealthSnapshot { let start = Instant::now(); // 1. Database: SELECT 1 let db_ok = sqlx::query_scalar::<_, i32>("SELECT 1") .fetch_one(&state.db) .await .is_ok(); // 2. S3 connectivity (skip if not configured) let s3_ok = match &state.s3 { Some(s3) => match s3.check_connectivity().await { Ok(()) => true, Err(e) => { tracing::warn!(error = %e, "S3 connectivity check failed"); false } }, None => true, // not configured counts as OK }; // 3. Sessions: probe the session table let sessions_ok = sqlx::query_scalar::<_, bool>( "SELECT EXISTS(SELECT 1 FROM tower_sessions.session)", ) .fetch_one(&state.db) .await .is_ok(); let elapsed = start.elapsed(); let check_duration_ms = elapsed.as_millis().min(i32::MAX as u128) as i32; let status = if db_ok && s3_ok && sessions_ok { MonitorStatus::Operational } else if db_ok { MonitorStatus::Degraded } else { MonitorStatus::Error }; HealthSnapshot { status, db_ok, s3_ok, sessions_ok, check_duration_ms, } } /// Spawn the background monitor loop. Drop `shutdown_tx` to stop it. pub fn spawn_monitor( state: AppState, mut shutdown_rx: watch::Receiver<()>, ) -> JoinHandle<()> { tokio::spawn(async move { let alert_email = std::env::var("ALERT_EMAIL").ok(); match &alert_email { Some(email) => tracing::info!(alert_email = %email, "health monitor started"), None => tracing::info!("Health monitor started (ALERT_EMAIL not set — alerts disabled)"), } let interval_secs = std::env::var("HEALTH_CHECK_INTERVAL_SECS") .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(constants::HEALTH_CHECK_INTERVAL_SECS); let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); interval.tick().await; // consume immediate first tick let mut previous_status: Option = None; let mut last_alert_at: Option = None; let mut last_pool_alert_at: Option = None; // Hysteresis arming flag for DB pool pressure. True = next crossing // above the high threshold is allowed to fire a ticket. Resets to // true whenever pressure drops below the low threshold. Starts // true so the first overrun after boot can alert. let mut pool_pressure_armed: bool = true; let mut last_pg_activity_alert_at: Option = None; let mut prune_counter: u64 = 0; loop { tokio::select! { _ = interval.tick() => {} _ = shutdown_rx.changed() => { tracing::info!("Health monitor shutting down"); return; } } let snap = run_health_check(&state).await; // Update Prometheus gauges on every tick. DB pool + domain cache // are local and cheap; the pg_stat_activity probe is one fast // query against the shared Postgres. Storage fill aggregates // across all paying creators via a JOIN of users + creator_subs + // tier caps — sub-second today but grows linearly with paying // creator count, so we cache the result for 5 minutes rather // than burning a multi-second query 2880×/day at 10k+ creators. crate::metrics::record_db_pool_stats(&state.db); crate::metrics::record_domain_cache_size(state.domain_cache.len()); static STORAGE_FILL_LAST: std::sync::OnceLock< std::sync::Mutex, > = std::sync::OnceLock::new(); const STORAGE_FILL_TTL: std::time::Duration = std::time::Duration::from_secs(300); let last_lock = STORAGE_FILL_LAST.get_or_init(|| { // Initialize to "long ago" so the first tick refreshes. std::sync::Mutex::new( std::time::Instant::now() - STORAGE_FILL_TTL, ) }); let should_refresh = { let mut last = last_lock.lock().expect("storage-fill mutex"); if last.elapsed() >= STORAGE_FILL_TTL { *last = std::time::Instant::now(); true } else { false } }; if should_refresh { crate::metrics::record_storage_fill_stats(&state.db).await; } // Log status changes. Skip the bootstrap None->Operational transition // so clean restarts don't fire a spurious "recovered" alert. let status_changed = previous_status != Some(snap.status); let is_bootstrap_ok = previous_status.is_none() && snap.status == MonitorStatus::Operational; if status_changed && !is_bootstrap_ok { match snap.status { MonitorStatus::Operational => { if previous_status.is_some() { tracing::info!(duration_ms = snap.check_duration_ms, "health recovered, operational"); } } MonitorStatus::Degraded => { tracing::warn!( db = snap.db_ok, s3 = snap.s3_ok, sessions = snap.sessions_ok, duration_ms = snap.check_duration_ms, "health degraded" ); } MonitorStatus::Error => { tracing::error!( db = snap.db_ok, s3 = snap.s3_ok, sessions = snap.sessions_ok, duration_ms = snap.check_duration_ms, "health error" ); } } // Status-change notifications (admin alert + user notifications) share // a single cooldown so a flapping monitor cannot spam either audience. let cooldown_elapsed = last_alert_at .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); if cooldown_elapsed { if let Some(ref to) = alert_email { let (subject, body) = build_alert(previous_status, &snap); match state.email.send_alert(to, &subject, &body).await { Ok(()) => tracing::info!(recipient = %to, "alert email sent"), Err(e) => tracing::error!(error = ?e, "failed to send alert email"), } } // Notify opted-in users of status changes (fire-and-forget). // Paced at ~10/sec to stay under Postmark's default send rate. { let pool = state.db.clone(); let email_client = state.email.clone(); let host_url = state.config.host_url.clone(); let signing_secret = state.config.signing_secret.clone(); let current_status = snap.status.as_str().to_string(); let prev_status = previous_status.map_or("unknown", |s| s.as_str()).to_string(); tokio::spawn(async move { match db::users::get_status_alert_subscribers(&pool).await { Ok(subscribers) if !subscribers.is_empty() => { tracing::info!(count = subscribers.len(), "sending status notifications to opted-in users"); for sub in &subscribers { let unsub_url = crate::email::generate_unsubscribe_url( &host_url, sub.id, crate::email::UnsubscribeAction::Status, &sub.id.to_string(), &signing_secret, ); let _ = email_client.send_status_notification( &sub.email, sub.display_name.as_deref(), ¤t_status, &prev_status, &unsub_url, ).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } Err(e) => { tracing::error!(error = ?e, "failed to query status alert subscribers"); } _ => {} } }); } // Only record the cooldown timestamp once we've issued at least // one notification path (admin or subscribers); admin block above // is gated on `alert_email` being set, but subscriber fan-out is // always spawned. Always-set is fine since the goal is "don't // re-notify within ALERT_COOLDOWN_SECS regardless of audience." last_alert_at = Some(Instant::now()); } // Create WAM ticket on degradation/error transitions if snap.status != MonitorStatus::Operational && let Some(ref wam) = state.wam { let priority = match snap.status { MonitorStatus::Error => "critical", MonitorStatus::Degraded => "high", MonitorStatus::Operational => unreachable!(), }; let title = format!("Health status: {}", snap.status.as_str()); let body = format!( "db: {}\ns3: {}\nsessions: {}\ncheck_ms: {}", snap.db_ok, snap.s3_ok, snap.sessions_ok, snap.check_duration_ms, ); wam.create_ticket(&title, Some(&body), priority, "health-status-change", None).await; } } previous_status = Some(snap.status); // DB pool pressure check with hysteresis: open at 80%, only // re-alert after pressure drops below 60% and climbs back over // 80%. Without hysteresis, a load that oscillates around the // threshold (e.g. each scheduler tick briefly maxes the pool) // spams a ticket every ALERT_COOLDOWN_SECS window. { let pool_size = state.db.size(); let pool_idle = state.db.num_idle() as u32; let active = pool_size.saturating_sub(pool_idle); let pct = (active * 100).checked_div(pool_size).unwrap_or(0); let high = 80u32; let low = 60u32; if pct > high { tracing::warn!(pool_size, active, idle = pool_idle, pct, "DB pool pressure >80%"); // Only fire a ticket on the rising edge (was below the low // threshold last we recovered). The cooldown is still here // as a backstop in case the hysteresis state machine ever // gets confused. let cooldown_ok = last_pool_alert_at .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); if cooldown_ok && pool_pressure_armed && let Some(ref wam) = state.wam { let title = format!("DB pool pressure: {active}/{pool_size} active"); wam.create_ticket(&title, None, "high", "db-pool-pressure", None).await; last_pool_alert_at = Some(Instant::now()); pool_pressure_armed = false; // wait for drop below `low` before re-arming } } else if pct < low { pool_pressure_armed = true; } // Between `low` and `high` we hold the current armed state — // that's the dead-band where neither edge triggers. } // Postgres server-wide saturation check via pg_stat_activity. This // catches cases where the *shared* Postgres (MNW + MT pools + ad hoc // clients) is approaching `max_connections`, which the local pool // pressure check above cannot see. The probe also emits Prometheus // gauges via `record_pg_stat_activity` so dashboards can graph the // ratio over time, not just react to the alert threshold. // // Cadence note: this runs on EVERY monitor tick (default 30s) // because the gauges are load-bearing for the operator dashboard // — a 30s refresh on connection-utilization is the right tradeoff. // The probe itself is a single-row query against a system view; // its cost is on the order of microseconds. if let Some((active, max_conn)) = crate::metrics::record_pg_stat_activity(&state.db).await { let pct = active * 100 / max_conn; if pct > 80 { tracing::warn!( active, max_conn, pct, "Postgres pg_stat_activity saturation >80%" ); let cooldown_ok = last_pg_activity_alert_at .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); if cooldown_ok && let Some(ref wam) = state.wam { let title = format!( "Postgres saturation: {active}/{max_conn} client backends ({pct}%)" ); let body = format!( "pg_stat_activity client-backend count is at {pct}% of \ max_connections ({active}/{max_conn}). Shared Postgres serves \ MNW + MT + ad hoc clients; exhaustion will fail new connections \ for all of them. Investigate which role/application is holding \ connections via:\n\n \ SELECT usename, application_name, state, count(*) \ FROM pg_stat_activity GROUP BY 1,2,3 ORDER BY 4 DESC;" ); wam.create_ticket( &title, Some(&body), "high", "pg-stat-activity-saturation", None, ) .await; last_pg_activity_alert_at = Some(Instant::now()); } } } // Persist snapshot (best-effort) if let Err(e) = db::monitor::insert_health_history( &state.db, snap.status.as_str(), snap.db_ok, snap.s3_ok, snap.sessions_ok, snap.check_duration_ms, None, ) .await { tracing::warn!(error = ?e, "failed to insert health history"); } // Prune expired session cache entries every cycle let cache_ttl = std::time::Duration::from_secs(constants::SESSION_TOUCH_CACHE_SECS); state.session_cache.retain(|_, validated_at| validated_at.elapsed() < cache_ttl); // Prune old records once per day (~1440 checks at 60s interval) prune_counter += 1; if prune_counter.is_multiple_of(1440) { match db::monitor::prune_health_history(&state.db, constants::HEALTH_HISTORY_RETAIN_DAYS).await { Ok(deleted) if deleted > 0 => { tracing::info!(deleted = deleted, "pruned old health history records"); } Err(e) => { tracing::warn!(error = ?e, "failed to prune health history"); } _ => {} } // Prune old sync log entries (age-based: 90 days) match db::synckit::prune_sync_log(&state.db, constants::SYNC_LOG_RETAIN_DAYS).await { Ok(deleted) if deleted > 0 => { tracing::info!(deleted = deleted, "pruned old sync log records"); } Err(e) => { tracing::warn!(error = ?e, "failed to prune sync log"); } _ => {} } // Cursor-based compaction: remove entries all devices have pulled (7-day safety margin) match db::synckit::compact_all_sync_logs(&state.db, constants::SYNC_LOG_COMPACT_MIN_AGE_DAYS).await { Ok(deleted) if deleted > 0 => { tracing::info!(deleted = deleted, "compacted sync log (cursor-based)"); } Err(e) => { tracing::warn!(error = ?e, "failed to compact sync log"); } _ => {} } // Clean up expired/used OAuth authorization codes match db::oauth::cleanup_expired_oauth_codes(&state.db).await { Ok(deleted) if deleted > 0 => { tracing::info!(deleted = deleted, "cleaned up expired OAuth codes"); } Err(e) => { tracing::warn!(error = ?e, "failed to clean up OAuth codes"); } _ => {} } } } }) } /// Build alert email subject + body for a status transition. fn build_alert( previous: Option, snap: &HealthSnapshot, ) -> (String, String) { let subject = match snap.status { MonitorStatus::Operational => "MNW recovered — all services operational".to_string(), MonitorStatus::Degraded => "MNW degraded — partial service failure".to_string(), MonitorStatus::Error => "MNW down — critical service failure".to_string(), }; let body = format!( "Status: {} (was: {})\n\n\ DB: {}\n\ S3: {}\n\ Sessions: {}\n\ Check duration: {}ms", snap.status.as_str(), previous.map_or("unknown", |s| s.as_str()), if snap.db_ok { "OK" } else { "FAIL" }, if snap.s3_ok { "OK" } else { "FAIL" }, if snap.sessions_ok { "OK" } else { "FAIL" }, snap.check_duration_ms, ); (subject, body) } #[cfg(test)] mod tests { use super::*; fn snapshot(status: MonitorStatus, db: bool, s3: bool, sessions: bool) -> HealthSnapshot { HealthSnapshot { status, db_ok: db, s3_ok: s3, sessions_ok: sessions, check_duration_ms: 42, } } #[test] fn alert_recovery() { let snap = snapshot(MonitorStatus::Operational, true, true, true); let (subject, body) = build_alert(Some(MonitorStatus::Error), &snap); assert!(subject.contains("recovered")); assert!(body.contains("operational")); assert!(body.contains("was: error")); } #[test] fn alert_degraded() { let snap = snapshot(MonitorStatus::Degraded, true, false, true); let (subject, body) = build_alert(Some(MonitorStatus::Operational), &snap); assert!(subject.contains("degraded")); assert!(body.contains("S3: FAIL")); assert!(body.contains("DB: OK")); } #[test] fn alert_error() { let snap = snapshot(MonitorStatus::Error, false, false, false); let (subject, body) = build_alert(None, &snap); assert!(subject.contains("down")); assert!(body.contains("was: unknown")); assert!(body.contains("DB: FAIL")); assert!(body.contains("42ms")); } #[test] fn status_as_str() { assert_eq!(MonitorStatus::Operational.as_str(), "operational"); assert_eq!(MonitorStatus::Degraded.as_str(), "degraded"); assert_eq!(MonitorStatus::Error.as_str(), "error"); } // -- Status determination logic -- // These test the status-derivation rules from run_health_check: // all OK -> Operational, db OK but others fail -> Degraded, db fail -> Error #[test] fn status_all_ok_is_operational() { let snap = snapshot(MonitorStatus::Operational, true, true, true); assert_eq!(snap.status, MonitorStatus::Operational); } #[test] fn status_s3_fail_is_degraded() { // db OK, s3 fail, sessions OK -> Degraded let snap = snapshot(MonitorStatus::Degraded, true, false, true); assert_eq!(snap.status, MonitorStatus::Degraded); } #[test] fn status_sessions_fail_is_degraded() { // db OK, s3 OK, sessions fail -> Degraded let snap = snapshot(MonitorStatus::Degraded, true, true, false); assert_eq!(snap.status, MonitorStatus::Degraded); } #[test] fn status_s3_and_sessions_fail_is_degraded() { // db OK, both s3 and sessions fail -> still Degraded (db is up) let snap = snapshot(MonitorStatus::Degraded, true, false, false); assert_eq!(snap.status, MonitorStatus::Degraded); } #[test] fn status_db_fail_is_error() { // db fail -> Error regardless of other checks let snap = snapshot(MonitorStatus::Error, false, true, true); assert_eq!(snap.status, MonitorStatus::Error); } #[test] fn status_all_fail_is_error() { let snap = snapshot(MonitorStatus::Error, false, false, false); assert_eq!(snap.status, MonitorStatus::Error); } // -- build_alert coverage -- #[test] fn alert_from_unknown_to_operational() { let snap = snapshot(MonitorStatus::Operational, true, true, true); let (subject, body) = build_alert(None, &snap); assert!(subject.contains("recovered")); assert!(body.contains("was: unknown")); assert!(body.contains("DB: OK")); assert!(body.contains("S3: OK")); assert!(body.contains("Sessions: OK")); } #[test] fn alert_from_degraded_to_error() { let snap = snapshot(MonitorStatus::Error, false, false, true); let (subject, body) = build_alert(Some(MonitorStatus::Degraded), &snap); assert!(subject.contains("down")); assert!(body.contains("was: degraded")); assert!(body.contains("DB: FAIL")); assert!(body.contains("S3: FAIL")); assert!(body.contains("Sessions: OK")); } #[test] fn alert_from_error_to_degraded() { let snap = snapshot(MonitorStatus::Degraded, true, false, true); let (subject, body) = build_alert(Some(MonitorStatus::Error), &snap); assert!(subject.contains("degraded")); assert!(body.contains("was: error")); } #[test] fn alert_body_includes_check_duration() { let snap = HealthSnapshot { status: MonitorStatus::Operational, db_ok: true, s3_ok: true, sessions_ok: true, check_duration_ms: 9999, }; let (_subject, body) = build_alert(None, &snap); assert!(body.contains("9999ms")); } // -- MonitorStatus equality -- #[test] fn status_equality() { assert_eq!(MonitorStatus::Operational, MonitorStatus::Operational); assert_ne!(MonitorStatus::Operational, MonitorStatus::Degraded); assert_ne!(MonitorStatus::Degraded, MonitorStatus::Error); } }