Skip to main content

max / makenotwork

25.0 KB · 602 lines History Blame Raw
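//! Background health monitor — lightweight periodic checks with status-transition alerts.
//!
//! Periodically probes database connectivity, S3 availability, and the session
//! table, stores timestamped health snapshots, and tracks overall service status.
//! On status transitions (Operational/Degraded/Error) it emails the admin
//! address (when `ALERT_EMAIL` is set), notifies opted-in users, and opens a WAM
//! ticket for non-operational states. Each cycle also refreshes Prometheus
//! gauges, watches DB pool and Postgres connection pressure, and periodically
//! prunes old records. Exposes the current status for the `/health` endpoint.
//!
//! See also: `/docs/tech/monitoring`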
9
10 use std::time::Instant;
11 use tokio::sync::watch;
12 use tokio::task::JoinHandle;
13
14 use crate::constants;
15 use crate::db;
16 use crate::AppState;
17
/// Overall service status, as derived from the probes in `run_health_check`.
///
/// The database is the primary dependency: losing it is an `Error`, while
/// losing only S3 or the session store leaves the service `Degraded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonitorStatus {
    /// Database, S3, and session probes all succeeded.
    Operational,
    /// Database reachable, but the S3 or session probe failed.
    Degraded,
    /// Database probe failed.
    Error,
}

impl MonitorStatus {
    /// Lowercase label used in logs, persisted health history, and alert text.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Operational => "operational",
            Self::Degraded => "degraded",
            Self::Error => "error",
        }
    }
}
35
36 /// Result of a single health check cycle.
37 pub struct HealthSnapshot {
38 pub status: MonitorStatus,
39 pub db_ok: bool,
40 pub s3_ok: bool,
41 pub sessions_ok: bool,
42 pub check_duration_ms: i32,
43 }
44
45 /// Run a lightweight health probe (no HTTP self-call, no table counts).
46 pub async fn run_health_check(state: &AppState) -> HealthSnapshot {
47 let start = Instant::now();
48
49 // 1. Database: SELECT 1
50 let db_ok = sqlx::query_scalar::<_, i32>("SELECT 1")
51 .fetch_one(&state.db)
52 .await
53 .is_ok();
54
55 // 2. S3 connectivity (skip if not configured)
56 let s3_ok = match &state.s3 {
57 Some(s3) => match s3.check_connectivity().await {
58 Ok(()) => true,
59 Err(e) => {
60 tracing::warn!(error = %e, "S3 connectivity check failed");
61 false
62 }
63 },
64 None => true, // not configured counts as OK
65 };
66
67 // 3. Sessions: probe the session table
68 let sessions_ok = sqlx::query_scalar::<_, bool>(
69 "SELECT EXISTS(SELECT 1 FROM tower_sessions.session)",
70 )
71 .fetch_one(&state.db)
72 .await
73 .is_ok();
74
75 let elapsed = start.elapsed();
76 let check_duration_ms = elapsed.as_millis().min(i32::MAX as u128) as i32;
77
78 let status = if db_ok && s3_ok && sessions_ok {
79 MonitorStatus::Operational
80 } else if db_ok {
81 MonitorStatus::Degraded
82 } else {
83 MonitorStatus::Error
84 };
85
86 HealthSnapshot {
87 status,
88 db_ok,
89 s3_ok,
90 sessions_ok,
91 check_duration_ms,
92 }
93 }
94
95 /// Spawn the background monitor loop. Drop `shutdown_tx` to stop it.
96 pub fn spawn_monitor(
97 state: AppState,
98 mut shutdown_rx: watch::Receiver<()>,
99 ) -> JoinHandle<()> {
100 tokio::spawn(async move {
101 let alert_email = std::env::var("ALERT_EMAIL").ok();
102 match &alert_email {
103 Some(email) => tracing::info!(alert_email = %email, "health monitor started"),
104 None => tracing::info!("Health monitor started (ALERT_EMAIL not set — alerts disabled)"),
105 }
106
107 let interval_secs = std::env::var("HEALTH_CHECK_INTERVAL_SECS")
108 .ok()
109 .and_then(|v| v.parse::<u64>().ok())
110 .unwrap_or(constants::HEALTH_CHECK_INTERVAL_SECS);
111
112 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
113 interval.tick().await; // consume immediate first tick
114
115 let mut previous_status: Option<MonitorStatus> = None;
116 let mut last_alert_at: Option<Instant> = None;
117 let mut last_pool_alert_at: Option<Instant> = None;
118 // Hysteresis arming flag for DB pool pressure. True = next crossing
119 // above the high threshold is allowed to fire a ticket. Resets to
120 // true whenever pressure drops below the low threshold. Starts
121 // true so the first overrun after boot can alert.
122 let mut pool_pressure_armed: bool = true;
123 let mut last_pg_activity_alert_at: Option<Instant> = None;
124 let mut prune_counter: u64 = 0;
125
126 loop {
127 tokio::select! {
128 _ = interval.tick() => {}
129 _ = shutdown_rx.changed() => {
130 tracing::info!("Health monitor shutting down");
131 return;
132 }
133 }
134
135 let snap = run_health_check(&state).await;
136
137 // Update Prometheus gauges on every tick. DB pool + domain cache
138 // are local and cheap; the pg_stat_activity probe is one fast
139 // query against the shared Postgres. Storage fill aggregates
140 // across all paying creators via a JOIN of users + creator_subs +
141 // tier caps — sub-second today but grows linearly with paying
142 // creator count, so we cache the result for 5 minutes rather
143 // than burning a multi-second query 2880×/day at 10k+ creators.
144 crate::metrics::record_db_pool_stats(&state.db);
145 crate::metrics::record_domain_cache_size(state.domain_cache.len());
146 static STORAGE_FILL_LAST: std::sync::OnceLock<
147 std::sync::Mutex<std::time::Instant>,
148 > = std::sync::OnceLock::new();
149 const STORAGE_FILL_TTL: std::time::Duration =
150 std::time::Duration::from_secs(300);
151 let last_lock = STORAGE_FILL_LAST.get_or_init(|| {
152 // Initialize to "long ago" so the first tick refreshes.
153 std::sync::Mutex::new(
154 std::time::Instant::now() - STORAGE_FILL_TTL,
155 )
156 });
157 let should_refresh = {
158 let mut last = last_lock.lock().expect("storage-fill mutex");
159 if last.elapsed() >= STORAGE_FILL_TTL {
160 *last = std::time::Instant::now();
161 true
162 } else {
163 false
164 }
165 };
166 if should_refresh {
167 crate::metrics::record_storage_fill_stats(&state.db).await;
168 }
169
170 // Log status changes. Skip the bootstrap None->Operational transition
171 // so clean restarts don't fire a spurious "recovered" alert.
172 let status_changed = previous_status != Some(snap.status);
173 let is_bootstrap_ok =
174 previous_status.is_none() && snap.status == MonitorStatus::Operational;
175 if status_changed && !is_bootstrap_ok {
176 match snap.status {
177 MonitorStatus::Operational => {
178 if previous_status.is_some() {
179 tracing::info!(duration_ms = snap.check_duration_ms, "health recovered, operational");
180 }
181 }
182 MonitorStatus::Degraded => {
183 tracing::warn!(
184 db = snap.db_ok, s3 = snap.s3_ok, sessions = snap.sessions_ok,
185 duration_ms = snap.check_duration_ms, "health degraded"
186 );
187 }
188 MonitorStatus::Error => {
189 tracing::error!(
190 db = snap.db_ok, s3 = snap.s3_ok, sessions = snap.sessions_ok,
191 duration_ms = snap.check_duration_ms, "health error"
192 );
193 }
194 }
195
196 // Status-change notifications (admin alert + user notifications) share
197 // a single cooldown so a flapping monitor cannot spam either audience.
198 let cooldown_elapsed = last_alert_at
199 .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
200
201 if cooldown_elapsed {
202 if let Some(ref to) = alert_email {
203 let (subject, body) = build_alert(previous_status, &snap);
204 match state.email.send_alert(to, &subject, &body).await {
205 Ok(()) => tracing::info!(recipient = %to, "alert email sent"),
206 Err(e) => tracing::error!(error = ?e, "failed to send alert email"),
207 }
208 }
209
210 // Notify opted-in users of status changes (fire-and-forget).
211 // Paced at ~10/sec to stay under Postmark's default send rate.
212 {
213 let pool = state.db.clone();
214 let email_client = state.email.clone();
215 let host_url = state.config.host_url.clone();
216 let signing_secret = state.config.signing_secret.clone();
217 let current_status = snap.status.as_str().to_string();
218 let prev_status = previous_status.map_or("unknown", |s| s.as_str()).to_string();
219 tokio::spawn(async move {
220 match db::users::get_status_alert_subscribers(&pool).await {
221 Ok(subscribers) if !subscribers.is_empty() => {
222 tracing::info!(count = subscribers.len(), "sending status notifications to opted-in users");
223 for sub in &subscribers {
224 let unsub_url = crate::email::generate_unsubscribe_url(
225 &host_url, sub.id, crate::email::UnsubscribeAction::Status, &sub.id.to_string(), &signing_secret,
226 );
227 let _ = email_client.send_status_notification(
228 &sub.email,
229 sub.display_name.as_deref(),
230 &current_status,
231 &prev_status,
232 &unsub_url,
233 ).await;
234 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
235 }
236 }
237 Err(e) => {
238 tracing::error!(error = ?e, "failed to query status alert subscribers");
239 }
240 _ => {}
241 }
242 });
243 }
244
245 // Only record the cooldown timestamp once we've issued at least
246 // one notification path (admin or subscribers); admin block above
247 // is gated on `alert_email` being set, but subscriber fan-out is
248 // always spawned. Always-set is fine since the goal is "don't
249 // re-notify within ALERT_COOLDOWN_SECS regardless of audience."
250 last_alert_at = Some(Instant::now());
251 }
252
253 // Create WAM ticket on degradation/error transitions
254 if snap.status != MonitorStatus::Operational
255 && let Some(ref wam) = state.wam
256 {
257 let priority = match snap.status {
258 MonitorStatus::Error => "critical",
259 MonitorStatus::Degraded => "high",
260 MonitorStatus::Operational => unreachable!(),
261 };
262 let title = format!("Health status: {}", snap.status.as_str());
263 let body = format!(
264 "db: {}\ns3: {}\nsessions: {}\ncheck_ms: {}",
265 snap.db_ok, snap.s3_ok, snap.sessions_ok, snap.check_duration_ms,
266 );
267 wam.create_ticket(&title, Some(&body), priority, "health-status-change", None).await;
268 }
269 }
270
271 previous_status = Some(snap.status);
272
273 // DB pool pressure check with hysteresis: open at 80%, only
274 // re-alert after pressure drops below 60% and climbs back over
275 // 80%. Without hysteresis, a load that oscillates around the
276 // threshold (e.g. each scheduler tick briefly maxes the pool)
277 // spams a ticket every ALERT_COOLDOWN_SECS window.
278 {
279 let pool_size = state.db.size();
280 let pool_idle = state.db.num_idle() as u32;
281 let active = pool_size.saturating_sub(pool_idle);
282 let pct = (active * 100).checked_div(pool_size).unwrap_or(0);
283 let high = 80u32;
284 let low = 60u32;
285
286 if pct > high {
287 tracing::warn!(pool_size, active, idle = pool_idle, pct, "DB pool pressure >80%");
288 // Only fire a ticket on the rising edge (was below the low
289 // threshold last we recovered). The cooldown is still here
290 // as a backstop in case the hysteresis state machine ever
291 // gets confused.
292 let cooldown_ok = last_pool_alert_at
293 .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
294 if cooldown_ok && pool_pressure_armed && let Some(ref wam) = state.wam {
295 let title = format!("DB pool pressure: {active}/{pool_size} active");
296 wam.create_ticket(&title, None, "high", "db-pool-pressure", None).await;
297 last_pool_alert_at = Some(Instant::now());
298 pool_pressure_armed = false; // wait for drop below `low` before re-arming
299 }
300 } else if pct < low {
301 pool_pressure_armed = true;
302 }
303 // Between `low` and `high` we hold the current armed state —
304 // that's the dead-band where neither edge triggers.
305 }
306
307 // Postgres server-wide saturation check via pg_stat_activity. This
308 // catches cases where the *shared* Postgres (MNW + MT pools + ad hoc
309 // clients) is approaching `max_connections`, which the local pool
310 // pressure check above cannot see. The probe also emits Prometheus
311 // gauges via `record_pg_stat_activity` so dashboards can graph the
312 // ratio over time, not just react to the alert threshold.
313 //
314 // Cadence note: this runs on EVERY monitor tick (default 30s)
315 // because the gauges are load-bearing for the operator dashboard
316 // — a 30s refresh on connection-utilization is the right tradeoff.
317 // The probe itself is a single-row query against a system view;
318 // its cost is on the order of microseconds.
319 if let Some((active, max_conn)) = crate::metrics::record_pg_stat_activity(&state.db).await {
320 let pct = active * 100 / max_conn;
321 if pct > 80 {
322 tracing::warn!(
323 active,
324 max_conn,
325 pct,
326 "Postgres pg_stat_activity saturation >80%"
327 );
328 let cooldown_ok = last_pg_activity_alert_at
329 .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
330 if cooldown_ok
331 && let Some(ref wam) = state.wam
332 {
333 let title = format!(
334 "Postgres saturation: {active}/{max_conn} client backends ({pct}%)"
335 );
336 let body = format!(
337 "pg_stat_activity client-backend count is at {pct}% of \
338 max_connections ({active}/{max_conn}). Shared Postgres serves \
339 MNW + MT + ad hoc clients; exhaustion will fail new connections \
340 for all of them. Investigate which role/application is holding \
341 connections via:\n\n \
342 SELECT usename, application_name, state, count(*) \
343 FROM pg_stat_activity GROUP BY 1,2,3 ORDER BY 4 DESC;"
344 );
345 wam.create_ticket(
346 &title,
347 Some(&body),
348 "high",
349 "pg-stat-activity-saturation",
350 None,
351 )
352 .await;
353 last_pg_activity_alert_at = Some(Instant::now());
354 }
355 }
356 }
357
358 // Persist snapshot (best-effort)
359 if let Err(e) = db::monitor::insert_health_history(
360 &state.db,
361 snap.status.as_str(),
362 snap.db_ok,
363 snap.s3_ok,
364 snap.sessions_ok,
365 snap.check_duration_ms,
366 None,
367 )
368 .await
369 {
370 tracing::warn!(error = ?e, "failed to insert health history");
371 }
372
373 // Prune expired session cache entries every cycle
374 let cache_ttl = std::time::Duration::from_secs(constants::SESSION_TOUCH_CACHE_SECS);
375 state.session_cache.retain(|_, validated_at| validated_at.elapsed() < cache_ttl);
376
377 // Prune old records once per day (~1440 checks at 60s interval)
378 prune_counter += 1;
379 if prune_counter.is_multiple_of(1440) {
380 match db::monitor::prune_health_history(&state.db, constants::HEALTH_HISTORY_RETAIN_DAYS).await {
381 Ok(deleted) if deleted > 0 => {
382 tracing::info!(deleted = deleted, "pruned old health history records");
383 }
384 Err(e) => {
385 tracing::warn!(error = ?e, "failed to prune health history");
386 }
387 _ => {}
388 }
389
390 // Prune old sync log entries (age-based: 90 days)
391 match db::synckit::prune_sync_log(&state.db, constants::SYNC_LOG_RETAIN_DAYS).await {
392 Ok(deleted) if deleted > 0 => {
393 tracing::info!(deleted = deleted, "pruned old sync log records");
394 }
395 Err(e) => {
396 tracing::warn!(error = ?e, "failed to prune sync log");
397 }
398 _ => {}
399 }
400
401 // Cursor-based compaction: remove entries all devices have pulled (7-day safety margin)
402 match db::synckit::compact_all_sync_logs(&state.db, constants::SYNC_LOG_COMPACT_MIN_AGE_DAYS).await {
403 Ok(deleted) if deleted > 0 => {
404 tracing::info!(deleted = deleted, "compacted sync log (cursor-based)");
405 }
406 Err(e) => {
407 tracing::warn!(error = ?e, "failed to compact sync log");
408 }
409 _ => {}
410 }
411
412 // Clean up expired/used OAuth authorization codes
413 match db::oauth::cleanup_expired_oauth_codes(&state.db).await {
414 Ok(deleted) if deleted > 0 => {
415 tracing::info!(deleted = deleted, "cleaned up expired OAuth codes");
416 }
417 Err(e) => {
418 tracing::warn!(error = ?e, "failed to clean up OAuth codes");
419 }
420 _ => {}
421 }
422 }
423 }
424 })
425 }
426
427 /// Build alert email subject + body for a status transition.
428 fn build_alert(
429 previous: Option<MonitorStatus>,
430 snap: &HealthSnapshot,
431 ) -> (String, String) {
432 let subject = match snap.status {
433 MonitorStatus::Operational => "MNW recovered — all services operational".to_string(),
434 MonitorStatus::Degraded => "MNW degraded — partial service failure".to_string(),
435 MonitorStatus::Error => "MNW down — critical service failure".to_string(),
436 };
437
438 let body = format!(
439 "Status: {} (was: {})\n\n\
440 DB: {}\n\
441 S3: {}\n\
442 Sessions: {}\n\
443 Check duration: {}ms",
444 snap.status.as_str(),
445 previous.map_or("unknown", |s| s.as_str()),
446 if snap.db_ok { "OK" } else { "FAIL" },
447 if snap.s3_ok { "OK" } else { "FAIL" },
448 if snap.sessions_ok { "OK" } else { "FAIL" },
449 snap.check_duration_ms,
450 );
451
452 (subject, body)
453 }
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot with the given status and probe flags and a fixed 42 ms duration.
    fn fake_snapshot(
        status: MonitorStatus,
        db_ok: bool,
        s3_ok: bool,
        sessions_ok: bool,
    ) -> HealthSnapshot {
        HealthSnapshot {
            status,
            db_ok,
            s3_ok,
            sessions_ok,
            check_duration_ms: 42,
        }
    }

    /// Assert that `haystack` contains every string in `needles`.
    fn assert_contains_all(haystack: &str, needles: &[&str]) {
        for needle in needles {
            assert!(haystack.contains(*needle));
        }
    }

    #[test]
    fn alert_recovery() {
        let (subject, body) = build_alert(
            Some(MonitorStatus::Error),
            &fake_snapshot(MonitorStatus::Operational, true, true, true),
        );
        assert!(subject.contains("recovered"));
        assert_contains_all(&body, &["operational", "was: error"]);
    }

    #[test]
    fn alert_degraded() {
        let (subject, body) = build_alert(
            Some(MonitorStatus::Operational),
            &fake_snapshot(MonitorStatus::Degraded, true, false, true),
        );
        assert!(subject.contains("degraded"));
        assert_contains_all(&body, &["S3: FAIL", "DB: OK"]);
    }

    #[test]
    fn alert_error() {
        let (subject, body) =
            build_alert(None, &fake_snapshot(MonitorStatus::Error, false, false, false));
        assert!(subject.contains("down"));
        assert_contains_all(&body, &["was: unknown", "DB: FAIL", "42ms"]);
    }

    #[test]
    fn status_as_str() {
        let expected = [
            (MonitorStatus::Operational, "operational"),
            (MonitorStatus::Degraded, "degraded"),
            (MonitorStatus::Error, "error"),
        ];
        for (status, label) in expected {
            assert_eq!(status.as_str(), label);
        }
    }

    // -- Status determination logic --
    // Intended rules from run_health_check:
    // all OK -> Operational, db OK but others fail -> Degraded, db fail -> Error.
    // NOTE(review): these tests pass the expected status into `fake_snapshot`
    // themselves, so they only confirm the value round-trips through the struct.
    // They do not exercise the derivation in `run_health_check`; extracting that
    // rule into a pure helper would make it directly testable.

    #[test]
    fn status_all_ok_is_operational() {
        let snap = fake_snapshot(MonitorStatus::Operational, true, true, true);
        assert_eq!(snap.status, MonitorStatus::Operational);
    }

    #[test]
    fn status_s3_fail_is_degraded() {
        // db OK, s3 fail, sessions OK -> Degraded
        let snap = fake_snapshot(MonitorStatus::Degraded, true, false, true);
        assert_eq!(snap.status, MonitorStatus::Degraded);
    }

    #[test]
    fn status_sessions_fail_is_degraded() {
        // db OK, s3 OK, sessions fail -> Degraded
        let snap = fake_snapshot(MonitorStatus::Degraded, true, true, false);
        assert_eq!(snap.status, MonitorStatus::Degraded);
    }

    #[test]
    fn status_s3_and_sessions_fail_is_degraded() {
        // db OK, both s3 and sessions fail -> still Degraded (db is up)
        let snap = fake_snapshot(MonitorStatus::Degraded, true, false, false);
        assert_eq!(snap.status, MonitorStatus::Degraded);
    }

    #[test]
    fn status_db_fail_is_error() {
        // db fail -> Error regardless of other checks
        let snap = fake_snapshot(MonitorStatus::Error, false, true, true);
        assert_eq!(snap.status, MonitorStatus::Error);
    }

    #[test]
    fn status_all_fail_is_error() {
        let snap = fake_snapshot(MonitorStatus::Error, false, false, false);
        assert_eq!(snap.status, MonitorStatus::Error);
    }

    // -- build_alert coverage --

    #[test]
    fn alert_from_unknown_to_operational() {
        let (subject, body) =
            build_alert(None, &fake_snapshot(MonitorStatus::Operational, true, true, true));
        assert!(subject.contains("recovered"));
        assert_contains_all(&body, &["was: unknown", "DB: OK", "S3: OK", "Sessions: OK"]);
    }

    #[test]
    fn alert_from_degraded_to_error() {
        let (subject, body) = build_alert(
            Some(MonitorStatus::Degraded),
            &fake_snapshot(MonitorStatus::Error, false, false, true),
        );
        assert!(subject.contains("down"));
        assert_contains_all(
            &body,
            &["was: degraded", "DB: FAIL", "S3: FAIL", "Sessions: OK"],
        );
    }

    #[test]
    fn alert_from_error_to_degraded() {
        let (subject, body) = build_alert(
            Some(MonitorStatus::Error),
            &fake_snapshot(MonitorStatus::Degraded, true, false, true),
        );
        assert!(subject.contains("degraded"));
        assert!(body.contains("was: error"));
    }

    #[test]
    fn alert_body_includes_check_duration() {
        let snap = HealthSnapshot {
            check_duration_ms: 9999,
            ..fake_snapshot(MonitorStatus::Operational, true, true, true)
        };
        let (_, body) = build_alert(None, &snap);
        assert!(body.contains("9999ms"));
    }

    // -- MonitorStatus equality --

    #[test]
    fn status_equality() {
        assert_eq!(MonitorStatus::Operational, MonitorStatus::Operational);
        assert_ne!(MonitorStatus::Operational, MonitorStatus::Degraded);
        assert_ne!(MonitorStatus::Degraded, MonitorStatus::Error);
    }
}
602