use std::collections::HashSet;
use std::time::Duration;

use tokio::task::JoinHandle;
use tracing::info;

use pom::alerts::Alerter;
use pom::checks::routes;
use pom::config::Config;
use pom::db;

pub(crate) fn spawn_route_tasks( |
| 10 |
config: &Config, |
| 11 |
pool: &sqlx::SqlitePool, |
| 12 |
cancel: &tokio_util::sync::CancellationToken, |
| 13 |
alerter: &Option<Alerter>, |
| 14 |
) -> Vec<JoinHandle<()>> { |
| 15 |
let route_interval = config.serve.route_check_interval_secs; |
| 16 |
let mut handles = Vec::new(); |
| 17 |
|
| 18 |
for name in config.target_names() { |
| 19 |
let target_config = config.get_target(&name).unwrap().clone(); |
| 20 |
if target_config.expected_routes.is_empty() { |
| 21 |
continue; |
| 22 |
} |
| 23 |
let Some(ref health_config) = target_config.health else { continue }; |
| 24 |
let Some(base_url) = routes::base_url_from_health_url(&health_config.url) else { continue }; |
| 25 |
let route_paths = target_config.expected_routes.clone(); |
| 26 |
let timeout = std::time::Duration::from_secs(health_config.timeout_secs); |
| 27 |
let label = target_config.label.clone(); |
| 28 |
let pool = pool.clone(); |
| 29 |
let alerter = alerter.clone(); |
| 30 |
let n = route_paths.len(); |
| 31 |
let cancel = cancel.clone(); |
| 32 |
|
| 33 |
info!("{name}: route checks every {route_interval}s ({n} routes)"); |
| 34 |
|
| 35 |
handles.push(tokio::spawn(async move { |
| 36 |
|
| 37 |
match db::prune_stale_routes(&pool, &name, &route_paths).await { |
| 38 |
Ok(0) => {} |
| 39 |
Ok(n) => info!("{name}: pruned {n} stale route check rows"), |
| 40 |
Err(e) => tracing::error!("{name}: failed to prune stale routes: {e}"), |
| 41 |
} |
| 42 |
|
| 43 |
let mut interval = tokio::time::interval( |
| 44 |
std::time::Duration::from_secs(route_interval), |
| 45 |
); |
| 46 |
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); |
| 47 |
let client = reqwest::Client::builder() |
| 48 |
.redirect(reqwest::redirect::Policy::none()) |
| 49 |
.build() |
| 50 |
.unwrap_or_default(); |
| 51 |
let mut prev_failed: std::collections::HashSet<String> = std::collections::HashSet::new(); |
| 52 |
|
| 53 |
interval.tick().await; |
| 54 |
loop { |
| 55 |
tokio::select! { |
| 56 |
_ = cancel.cancelled() => break, |
| 57 |
_ = interval.tick() => {} |
| 58 |
} |
| 59 |
let results = routes::check_routes(&client, &name, &base_url, &route_paths, timeout).await; |
| 60 |
|
| 61 |
for result in &results { |
| 62 |
if let Err(e) = db::insert_route_check(&pool, result).await { |
| 63 |
tracing::error!("{}: failed to store route check for {}: {e}", name, result.path); |
| 64 |
} |
| 65 |
} |
| 66 |
|
| 67 |
let current_failed: std::collections::HashSet<String> = results |
| 68 |
.iter() |
| 69 |
.filter(|r| !r.ok) |
| 70 |
.map(|r| r.path.clone()) |
| 71 |
.collect(); |
| 72 |
|
| 73 |
let ok_count = results.iter().filter(|r| r.ok).count(); |
| 74 |
info!("{name}: routes {ok_count}/{n} OK"); |
| 75 |
|
| 76 |
if let Some(ref alerter) = alerter { |
| 77 |
|
| 78 |
let new_failures: Vec<String> = current_failed |
| 79 |
.difference(&prev_failed) |
| 80 |
.cloned() |
| 81 |
.collect(); |
| 82 |
if !new_failures.is_empty() { |
| 83 |
alerter.send_route_failure_alert(&name, &label, &new_failures).await; |
| 84 |
} |
| 85 |
|
| 86 |
|
| 87 |
let recoveries: Vec<String> = prev_failed |
| 88 |
.difference(¤t_failed) |
| 89 |
.cloned() |
| 90 |
.collect(); |
| 91 |
if !recoveries.is_empty() { |
| 92 |
alerter.send_route_recovery_alert(&name, &label, &recoveries).await; |
| 93 |
} |
| 94 |
} |
| 95 |
|
| 96 |
prev_failed = current_failed; |
| 97 |
} |
| 98 |
})); |
| 99 |
} |
| 100 |
|
| 101 |
handles |
| 102 |
} |
| 103 |
|