use tokio::task::JoinHandle; use tracing::info; use pom::alerts::Alerter; use pom::checks::routes; use pom::config::Config; use pom::db; pub(crate) fn spawn_route_tasks( config: &Config, pool: &sqlx::SqlitePool, cancel: &tokio_util::sync::CancellationToken, alerter: &Option, ) -> Vec> { let route_interval = config.serve.route_check_interval_secs; let mut handles = Vec::new(); for name in config.target_names() { let target_config = config.get_target(&name).unwrap().clone(); if target_config.expected_routes.is_empty() { continue; } let Some(ref health_config) = target_config.health else { continue }; let Some(base_url) = routes::base_url_from_health_url(&health_config.url) else { continue }; let route_paths = target_config.expected_routes.clone(); let timeout = std::time::Duration::from_secs(health_config.timeout_secs); let label = target_config.label.clone(); let pool = pool.clone(); let alerter = alerter.clone(); let n = route_paths.len(); let cancel = cancel.clone(); info!("{name}: route checks every {route_interval}s ({n} routes)"); handles.push(tokio::spawn(async move { // Prune stale route data from DB (paths removed from config) match db::prune_stale_routes(&pool, &name, &route_paths).await { Ok(0) => {} Ok(n) => info!("{name}: pruned {n} stale route check rows"), Err(e) => tracing::error!("{name}: failed to prune stale routes: {e}"), } let mut interval = tokio::time::interval( std::time::Duration::from_secs(route_interval), ); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let client = reqwest::Client::builder() .redirect(reqwest::redirect::Policy::none()) .build() .unwrap_or_default(); let mut prev_failed: std::collections::HashSet = std::collections::HashSet::new(); interval.tick().await; // consume immediate first tick loop { tokio::select! { _ = cancel.cancelled() => break, _ = interval.tick() => {} } let results = routes::check_routes(&client, &name, &base_url, &route_paths, timeout).await; for result in &results { if let Err(e) = db::insert_route_check(&pool, result).await { tracing::error!("{}: failed to store route check for {}: {e}", name, result.path); } } let current_failed: std::collections::HashSet = results .iter() .filter(|r| !r.ok) .map(|r| r.path.clone()) .collect(); let ok_count = results.iter().filter(|r| r.ok).count(); info!("{name}: routes {ok_count}/{n} OK"); if let Some(ref alerter) = alerter { // New failures let new_failures: Vec = current_failed .difference(&prev_failed) .cloned() .collect(); if !new_failures.is_empty() { alerter.send_route_failure_alert(&name, &label, &new_failures).await; } // Recoveries let recoveries: Vec = prev_failed .difference(¤t_failed) .cloned() .collect(); if !recoveries.is_empty() { alerter.send_route_recovery_alert(&name, &label, &recoveries).await; } } prev_failed = current_failed; } })); } handles }