Skip to main content

max / makenotwork

3.9 KB · 103 lines History Blame Raw
1 use tokio::task::JoinHandle;
2 use tracing::info;
3
4 use pom::alerts::Alerter;
5 use pom::checks::routes;
6 use pom::config::Config;
7 use pom::db;
8
9 pub(crate) fn spawn_route_tasks(
10 config: &Config,
11 pool: &sqlx::SqlitePool,
12 cancel: &tokio_util::sync::CancellationToken,
13 alerter: &Option<Alerter>,
14 ) -> Vec<JoinHandle<()>> {
15 let route_interval = config.serve.route_check_interval_secs;
16 let mut handles = Vec::new();
17
18 for name in config.target_names() {
19 let target_config = config.get_target(&name).unwrap().clone();
20 if target_config.expected_routes.is_empty() {
21 continue;
22 }
23 let Some(ref health_config) = target_config.health else { continue };
24 let Some(base_url) = routes::base_url_from_health_url(&health_config.url) else { continue };
25 let route_paths = target_config.expected_routes.clone();
26 let timeout = std::time::Duration::from_secs(health_config.timeout_secs);
27 let label = target_config.label.clone();
28 let pool = pool.clone();
29 let alerter = alerter.clone();
30 let n = route_paths.len();
31 let cancel = cancel.clone();
32
33 info!("{name}: route checks every {route_interval}s ({n} routes)");
34
35 handles.push(tokio::spawn(async move {
36 // Prune stale route data from DB (paths removed from config)
37 match db::prune_stale_routes(&pool, &name, &route_paths).await {
38 Ok(0) => {}
39 Ok(n) => info!("{name}: pruned {n} stale route check rows"),
40 Err(e) => tracing::error!("{name}: failed to prune stale routes: {e}"),
41 }
42
43 let mut interval = tokio::time::interval(
44 std::time::Duration::from_secs(route_interval),
45 );
46 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
47 let client = reqwest::Client::builder()
48 .redirect(reqwest::redirect::Policy::none())
49 .build()
50 .unwrap_or_default();
51 let mut prev_failed: std::collections::HashSet<String> = std::collections::HashSet::new();
52
53 interval.tick().await; // consume immediate first tick
54 loop {
55 tokio::select! {
56 _ = cancel.cancelled() => break,
57 _ = interval.tick() => {}
58 }
59 let results = routes::check_routes(&client, &name, &base_url, &route_paths, timeout).await;
60
61 for result in &results {
62 if let Err(e) = db::insert_route_check(&pool, result).await {
63 tracing::error!("{}: failed to store route check for {}: {e}", name, result.path);
64 }
65 }
66
67 let current_failed: std::collections::HashSet<String> = results
68 .iter()
69 .filter(|r| !r.ok)
70 .map(|r| r.path.clone())
71 .collect();
72
73 let ok_count = results.iter().filter(|r| r.ok).count();
74 info!("{name}: routes {ok_count}/{n} OK");
75
76 if let Some(ref alerter) = alerter {
77 // New failures
78 let new_failures: Vec<String> = current_failed
79 .difference(&prev_failed)
80 .cloned()
81 .collect();
82 if !new_failures.is_empty() {
83 alerter.send_route_failure_alert(&name, &label, &new_failures).await;
84 }
85
86 // Recoveries
87 let recoveries: Vec<String> = prev_failed
88 .difference(&current_failed)
89 .cloned()
90 .collect();
91 if !recoveries.is_empty() {
92 alerter.send_route_recovery_alert(&name, &label, &recoveries).await;
93 }
94 }
95
96 prev_failed = current_failed;
97 }
98 }));
99 }
100
101 handles
102 }
103