| 1 |
use tokio::task::JoinHandle; |
| 2 |
use tracing::info; |
| 3 |
|
| 4 |
use pom::alerts::Alerter; |
| 5 |
use pom::checks::{http, ssh_banner}; |
| 6 |
use pom::config::Config; |
| 7 |
use pom::db; |
| 8 |
use pom::types::{HealthStatus, LatencyStats}; |
| 9 |
|
| 10 |
use super::super::incident::{incident_action, IncidentAction}; |
| 11 |
|
| 12 |
pub(crate) fn spawn_health_tasks( |
| 13 |
config: &Config, |
| 14 |
pool: &sqlx::SqlitePool, |
| 15 |
cancel: &tokio_util::sync::CancellationToken, |
| 16 |
alerter: &Option<Alerter>, |
| 17 |
) -> Vec<JoinHandle<()>> { |
| 18 |
let default_interval = config.serve.interval_secs; |
| 19 |
let mut handles = Vec::new(); |
| 20 |
|
| 21 |
for name in config.target_names() { |
| 22 |
let target_config = config.get_target(&name).unwrap().clone(); |
| 23 |
if let Some(health_config) = target_config.health { |
| 24 |
let interval_secs = health_config.interval_secs.unwrap_or(default_interval); |
| 25 |
let pool = pool.clone(); |
| 26 |
let name = name.clone(); |
| 27 |
let label = target_config.label.clone(); |
| 28 |
let alerter = alerter.clone(); |
| 29 |
let cancel = cancel.clone(); |
| 30 |
let trending_config = health_config.trending.clone(); |
| 31 |
|
| 32 |
info!("{name}: health check every {interval_secs}s"); |
| 33 |
|
| 34 |
handles.push(tokio::spawn(async move { |
| 35 |
let mut interval = tokio::time::interval( |
| 36 |
std::time::Duration::from_secs(interval_secs), |
| 37 |
); |
| 38 |
let expect = health_config.expect.as_ref(); |
| 39 |
let mut in_drift = false; |
| 40 |
interval.tick().await; |
| 41 |
loop { |
| 42 |
tokio::select! { |
| 43 |
_ = cancel.cancelled() => break, |
| 44 |
_ = interval.tick() => {} |
| 45 |
} |
| 46 |
let previous = db::get_latest_health(&pool, &name).await.ok().flatten(); |
| 47 |
let snapshot = http::check_health(&name, &health_config, expect).await; |
| 48 |
info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms); |
| 49 |
if let Err(e) = db::insert_health_check(&pool, &snapshot).await { |
| 50 |
tracing::error!("{name}: failed to store health check: {e}"); |
| 51 |
} |
| 52 |
|
| 53 |
|
| 54 |
if let Some(ref alerter) = alerter |
| 55 |
&& let Some(ref prev) = previous |
| 56 |
&& prev.status != snapshot.status |
| 57 |
{ |
| 58 |
let from = prev.status.to_string(); |
| 59 |
let to = snapshot.status.to_string(); |
| 60 |
if snapshot.status == HealthStatus::Operational { |
| 61 |
alerter.send_health_recovery(&name, &label, &from).await; |
| 62 |
} else { |
| 63 |
alerter.send_health_alert( |
| 64 |
&name, |
| 65 |
&label, |
| 66 |
&from, |
| 67 |
&to, |
| 68 |
snapshot.error.as_deref(), |
| 69 |
).await; |
| 70 |
} |
| 71 |
} |
| 72 |
|
| 73 |
|
| 74 |
if let Some(ref prev) = previous { |
| 75 |
match incident_action(prev.status, snapshot.status) { |
| 76 |
IncidentAction::None => {} |
| 77 |
IncidentAction::Open => { |
| 78 |
if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await { |
| 79 |
tracing::error!("{name}: failed to open incident: {e}"); |
| 80 |
} |
| 81 |
} |
| 82 |
IncidentAction::Close => { |
| 83 |
if let Err(e) = db::close_open_incidents(&pool, &name).await { |
| 84 |
tracing::error!("{name}: failed to close incidents: {e}"); |
| 85 |
} |
| 86 |
} |
| 87 |
IncidentAction::CloseAndOpen => { |
| 88 |
if let Err(e) = db::close_open_incidents(&pool, &name).await { |
| 89 |
tracing::error!("{name}: failed to close incidents: {e}"); |
| 90 |
} |
| 91 |
if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await { |
| 92 |
tracing::error!("{name}: failed to open incident: {e}"); |
| 93 |
} |
| 94 |
} |
| 95 |
} |
| 96 |
} |
| 97 |
|
| 98 |
|
| 99 |
if let Some(ref trending) = trending_config |
| 100 |
&& snapshot.status == HealthStatus::Operational |
| 101 |
{ |
| 102 |
let baseline_cutoff = (chrono::Utc::now() |
| 103 |
- chrono::Duration::hours(trending.baseline_window_hours as i64)) |
| 104 |
.to_rfc3339(); |
| 105 |
let baseline_data = db::get_response_times(&pool, &name, &baseline_cutoff) |
| 106 |
.await |
| 107 |
.unwrap_or_default(); |
| 108 |
let operational_times: Vec<i64> = baseline_data.iter() |
| 109 |
.filter(|(_, ms)| *ms > 0) |
| 110 |
.map(|(_, ms)| *ms) |
| 111 |
.collect(); |
| 112 |
let baseline = LatencyStats::from_times(&operational_times); |
| 113 |
let recent = db::get_recent_response_times(&pool, &name, 3) |
| 114 |
.await |
| 115 |
.unwrap_or_default(); |
| 116 |
|
| 117 |
if let Some(ref bl) = baseline { |
| 118 |
if let Some(msg) = http::detect_latency_drift(&recent, bl, trending.spike_threshold) { |
| 119 |
if !in_drift { |
| 120 |
info!("{name}: {msg}"); |
| 121 |
if let Some(ref alerter) = alerter { |
| 122 |
alerter.send_latency_drift_alert(&name, &label, &msg).await; |
| 123 |
} |
| 124 |
in_drift = true; |
| 125 |
} |
| 126 |
} else if in_drift { |
| 127 |
info!("{name}: latency drift recovered"); |
| 128 |
if let Some(ref alerter) = alerter { |
| 129 |
alerter.send_latency_recovery(&name, &label).await; |
| 130 |
} |
| 131 |
in_drift = false; |
| 132 |
} |
| 133 |
} |
| 134 |
} |
| 135 |
} |
| 136 |
})); |
| 137 |
} |
| 138 |
} |
| 139 |
|
| 140 |
|
| 141 |
for name in config.target_names() { |
| 142 |
let target_config = config.get_target(&name).unwrap().clone(); |
| 143 |
if let Some(ssh_config) = target_config.ssh_banner { |
| 144 |
let interval_secs = default_interval; |
| 145 |
let pool = pool.clone(); |
| 146 |
let check_name = format!("{name}:ssh"); |
| 147 |
let label = target_config.label.clone(); |
| 148 |
let alerter = alerter.clone(); |
| 149 |
let cancel = cancel.clone(); |
| 150 |
|
| 151 |
info!("{check_name}: SSH banner check every {interval_secs}s"); |
| 152 |
|
| 153 |
handles.push(tokio::spawn(async move { |
| 154 |
let mut interval = tokio::time::interval( |
| 155 |
std::time::Duration::from_secs(interval_secs), |
| 156 |
); |
| 157 |
interval.tick().await; |
| 158 |
loop { |
| 159 |
tokio::select! { |
| 160 |
_ = cancel.cancelled() => break, |
| 161 |
_ = interval.tick() => {} |
| 162 |
} |
| 163 |
let previous = db::get_latest_health(&pool, &check_name).await.ok().flatten(); |
| 164 |
let snapshot = ssh_banner::check_ssh_banner(&check_name, &ssh_config).await; |
| 165 |
info!("{}: {} ({}ms)", check_name, snapshot.status, snapshot.response_time_ms); |
| 166 |
if let Err(e) = db::insert_health_check(&pool, &snapshot).await { |
| 167 |
tracing::error!("{check_name}: failed to store SSH banner check: {e}"); |
| 168 |
} |
| 169 |
|
| 170 |
if let Some(ref alerter) = alerter |
| 171 |
&& let Some(ref prev) = previous |
| 172 |
&& prev.status != snapshot.status |
| 173 |
{ |
| 174 |
let from = prev.status.to_string(); |
| 175 |
let to = snapshot.status.to_string(); |
| 176 |
if snapshot.status == HealthStatus::Operational { |
| 177 |
alerter.send_health_recovery(&check_name, &label, &from).await; |
| 178 |
} else { |
| 179 |
alerter.send_health_alert( |
| 180 |
&check_name, |
| 181 |
&label, |
| 182 |
&from, |
| 183 |
&to, |
| 184 |
snapshot.error.as_deref(), |
| 185 |
).await; |
| 186 |
} |
| 187 |
} |
| 188 |
} |
| 189 |
})); |
| 190 |
} |
| 191 |
} |
| 192 |
|
| 193 |
handles |
| 194 |
} |
| 195 |
|