use tokio::task::JoinHandle; use tracing::info; use pom::alerts::Alerter; use pom::checks::{http, ssh_banner}; use pom::config::Config; use pom::db; use pom::types::{HealthStatus, LatencyStats}; use super::super::incident::{incident_action, IncidentAction}; pub(crate) fn spawn_health_tasks( config: &Config, pool: &sqlx::SqlitePool, cancel: &tokio_util::sync::CancellationToken, alerter: &Option, ) -> Vec> { let default_interval = config.serve.interval_secs; let mut handles = Vec::new(); for name in config.target_names() { let target_config = config.get_target(&name).unwrap().clone(); if let Some(health_config) = target_config.health { let interval_secs = health_config.interval_secs.unwrap_or(default_interval); let pool = pool.clone(); let name = name.clone(); let label = target_config.label.clone(); let alerter = alerter.clone(); let cancel = cancel.clone(); let trending_config = health_config.trending.clone(); info!("{name}: health check every {interval_secs}s"); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(interval_secs), ); let expect = health_config.expect.as_ref(); let mut in_drift = false; interval.tick().await; // consume immediate first tick loop { tokio::select! { _ = cancel.cancelled() => break, _ = interval.tick() => {} } let previous = db::get_latest_health(&pool, &name).await.ok().flatten(); let snapshot = http::check_health(&name, &health_config, expect).await; info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms); if let Err(e) = db::insert_health_check(&pool, &snapshot).await { tracing::error!("{name}: failed to store health check: {e}"); } // Fire alerts on status transitions if let Some(ref alerter) = alerter && let Some(ref prev) = previous && prev.status != snapshot.status { let from = prev.status.to_string(); let to = snapshot.status.to_string(); if snapshot.status == HealthStatus::Operational { alerter.send_health_recovery(&name, &label, &from).await; } else { alerter.send_health_alert( &name, &label, &from, &to, snapshot.error.as_deref(), ).await; } } // Track incidents on status transitions if let Some(ref prev) = previous { match incident_action(prev.status, snapshot.status) { IncidentAction::None => {} IncidentAction::Open => { if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await { tracing::error!("{name}: failed to open incident: {e}"); } } IncidentAction::Close => { if let Err(e) = db::close_open_incidents(&pool, &name).await { tracing::error!("{name}: failed to close incidents: {e}"); } } IncidentAction::CloseAndOpen => { if let Err(e) = db::close_open_incidents(&pool, &name).await { tracing::error!("{name}: failed to close incidents: {e}"); } if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await { tracing::error!("{name}: failed to open incident: {e}"); } } } } // Latency drift detection if let Some(ref trending) = trending_config && snapshot.status == HealthStatus::Operational { let baseline_cutoff = (chrono::Utc::now() - chrono::Duration::hours(trending.baseline_window_hours as i64)) .to_rfc3339(); let baseline_data = db::get_response_times(&pool, &name, &baseline_cutoff) .await .unwrap_or_default(); let operational_times: Vec = baseline_data.iter() .filter(|(_, ms)| *ms > 0) .map(|(_, ms)| *ms) .collect(); let baseline = LatencyStats::from_times(&operational_times); let recent = db::get_recent_response_times(&pool, &name, 3) .await .unwrap_or_default(); if let Some(ref bl) = baseline { if let Some(msg) = http::detect_latency_drift(&recent, bl, trending.spike_threshold) { if !in_drift { info!("{name}: {msg}"); if let Some(ref alerter) = alerter { alerter.send_latency_drift_alert(&name, &label, &msg).await; } in_drift = true; } } else if in_drift { info!("{name}: latency drift recovered"); if let Some(ref alerter) = alerter { alerter.send_latency_recovery(&name, &label).await; } in_drift = false; } } } } })); } } // SSH banner checks — stored as health check entries with target name "{name}:ssh" for name in config.target_names() { let target_config = config.get_target(&name).unwrap().clone(); if let Some(ssh_config) = target_config.ssh_banner { let interval_secs = default_interval; let pool = pool.clone(); let check_name = format!("{name}:ssh"); let label = target_config.label.clone(); let alerter = alerter.clone(); let cancel = cancel.clone(); info!("{check_name}: SSH banner check every {interval_secs}s"); handles.push(tokio::spawn(async move { let mut interval = tokio::time::interval( std::time::Duration::from_secs(interval_secs), ); interval.tick().await; loop { tokio::select! { _ = cancel.cancelled() => break, _ = interval.tick() => {} } let previous = db::get_latest_health(&pool, &check_name).await.ok().flatten(); let snapshot = ssh_banner::check_ssh_banner(&check_name, &ssh_config).await; info!("{}: {} ({}ms)", check_name, snapshot.status, snapshot.response_time_ms); if let Err(e) = db::insert_health_check(&pool, &snapshot).await { tracing::error!("{check_name}: failed to store SSH banner check: {e}"); } if let Some(ref alerter) = alerter && let Some(ref prev) = previous && prev.status != snapshot.status { let from = prev.status.to_string(); let to = snapshot.status.to_string(); if snapshot.status == HealthStatus::Operational { alerter.send_health_recovery(&check_name, &label, &from).await; } else { alerter.send_health_alert( &check_name, &label, &from, &to, snapshot.error.as_deref(), ).await; } } } })); } } handles }