Skip to main content

max / pom

6.6 KB · 142 lines History Blame Raw
1 use tokio::task::JoinHandle;
2 use tracing::info;
3
4 use pom::alerts::Alerter;
5 use pom::checks::http;
6 use pom::config::Config;
7 use pom::db;
8 use pom::types::{HealthStatus, LatencyStats};
9
10 use super::super::incident::{incident_action, IncidentAction};
11
12 pub(crate) fn spawn_health_tasks(
13 config: &Config,
14 pool: &sqlx::SqlitePool,
15 cancel: &tokio_util::sync::CancellationToken,
16 alerter: &Option<Alerter>,
17 ) -> Vec<JoinHandle<()>> {
18 let default_interval = config.serve.interval_secs;
19 let mut handles = Vec::new();
20
21 for name in config.target_names() {
22 let target_config = config.get_target(&name).unwrap().clone();
23 if let Some(health_config) = target_config.health {
24 let interval_secs = health_config.interval_secs.unwrap_or(default_interval);
25 let pool = pool.clone();
26 let name = name.clone();
27 let label = target_config.label.clone();
28 let alerter = alerter.clone();
29 let cancel = cancel.clone();
30 let trending_config = health_config.trending.clone();
31
32 info!("{name}: health check every {interval_secs}s");
33
34 handles.push(tokio::spawn(async move {
35 let mut interval = tokio::time::interval(
36 std::time::Duration::from_secs(interval_secs),
37 );
38 let expect = health_config.expect.as_ref();
39 let mut in_drift = false;
40 interval.tick().await; // consume immediate first tick
41 loop {
42 tokio::select! {
43 _ = cancel.cancelled() => break,
44 _ = interval.tick() => {}
45 }
46 let previous = db::get_latest_health(&pool, &name).await.ok().flatten();
47 let snapshot = http::check_health(&name, &health_config, expect).await;
48 info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms);
49 if let Err(e) = db::insert_health_check(&pool, &snapshot).await {
50 tracing::error!("{name}: failed to store health check: {e}");
51 }
52
53 // Fire alerts on status transitions
54 if let Some(ref alerter) = alerter
55 && let Some(ref prev) = previous
56 && prev.status != snapshot.status
57 {
58 let from = prev.status.to_string();
59 let to = snapshot.status.to_string();
60 if snapshot.status == HealthStatus::Operational {
61 alerter.send_health_recovery(&name, &label, &from).await;
62 } else {
63 alerter.send_health_alert(
64 &name,
65 &label,
66 &from,
67 &to,
68 snapshot.error.as_deref(),
69 ).await;
70 }
71 }
72
73 // Track incidents on status transitions
74 if let Some(ref prev) = previous {
75 match incident_action(prev.status, snapshot.status) {
76 IncidentAction::None => {}
77 IncidentAction::Open => {
78 if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
79 tracing::error!("{name}: failed to open incident: {e}");
80 }
81 }
82 IncidentAction::Close => {
83 if let Err(e) = db::close_open_incidents(&pool, &name).await {
84 tracing::error!("{name}: failed to close incidents: {e}");
85 }
86 }
87 IncidentAction::CloseAndOpen => {
88 if let Err(e) = db::close_open_incidents(&pool, &name).await {
89 tracing::error!("{name}: failed to close incidents: {e}");
90 }
91 if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
92 tracing::error!("{name}: failed to open incident: {e}");
93 }
94 }
95 }
96 }
97
98 // Latency drift detection
99 if let Some(ref trending) = trending_config
100 && snapshot.status == HealthStatus::Operational
101 {
102 let baseline_cutoff = (chrono::Utc::now()
103 - chrono::Duration::hours(trending.baseline_window_hours as i64))
104 .to_rfc3339();
105 let baseline_data = db::get_response_times(&pool, &name, &baseline_cutoff)
106 .await
107 .unwrap_or_default();
108 let operational_times: Vec<i64> = baseline_data.iter()
109 .filter(|(_, ms)| *ms > 0)
110 .map(|(_, ms)| *ms)
111 .collect();
112 let baseline = LatencyStats::from_times(&operational_times);
113 let recent = db::get_recent_response_times(&pool, &name, 3)
114 .await
115 .unwrap_or_default();
116
117 if let Some(ref bl) = baseline {
118 if let Some(msg) = http::detect_latency_drift(&recent, bl, trending.spike_threshold) {
119 if !in_drift {
120 info!("{name}: {msg}");
121 if let Some(ref alerter) = alerter {
122 alerter.send_latency_drift_alert(&name, &label, &msg).await;
123 }
124 in_drift = true;
125 }
126 } else if in_drift {
127 info!("{name}: latency drift recovered");
128 if let Some(ref alerter) = alerter {
129 alerter.send_latency_recovery(&name, &label).await;
130 }
131 in_drift = false;
132 }
133 }
134 }
135 }
136 }));
137 }
138 }
139
140 handles
141 }
142