Skip to main content

max / pom

Split cli.rs into cli/ directory module, add dashboard, test and monitoring enhancements (v0.2.6)

Break 1035-line cli.rs into cli/{mod,serve,status,incident}.rs and cli/tasks/{health,tls,routes,dns,whois,prune,meta_alert}.rs. Each background task loop is now a standalone spawn function. Also includes test duration drift detection, SSH test detail parsing, WHOIS monitoring, dashboard improvements, and expanded integration test coverage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-03-17 02:07 UTC
Commit: 3e1314507e60b08463ae0223fa996c279cac3bd3
Parent: c0f3d59
29 files changed, +2318 insertions, -582 deletions
M Cargo.lock +2 -1
@@ -1665,7 +1665,7 @@ checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
1665 1665
1666 1666 [[package]]
1667 1667 name = "pom"
1668 - version = "0.2.4"
1668 + version = "0.2.5"
1669 1669 dependencies = [
1670 1670 "axum",
1671 1671 "chrono",
@@ -1682,6 +1682,7 @@ dependencies = [
1682 1682 "serde",
1683 1683 "serde_json",
1684 1684 "sqlx",
1685 + "subtle",
1685 1686 "thiserror 2.0.18",
1686 1687 "tokio",
1687 1688 "tokio-rustls",
M Cargo.toml +4 -1
@@ -1,6 +1,6 @@
1 1 [package]
2 2 name = "pom"
3 - version = "0.2.5"
3 + version = "0.2.6"
4 4 edition = "2024"
5 5 license-file = "LICENSE"
6 6
@@ -39,6 +39,9 @@ schemars = "0.8"
39 39 # Config
40 40 toml = "0.8"
41 41
42 + # Constant-time comparison
43 + subtle = "2"
44 +
42 45 # Errors
43 46 thiserror = "2"
44 47
@@ -69,6 +69,12 @@ json_fields = { "status" = "operational" }
69 69 [targets.mt.tls]
70 70 host = "forums.makenot.work"
71 71
72 + [targets.mt.tests]
73 + ssh = "max@100.106.221.39"
74 + command = "cd /home/max/staging/multithreaded && cargo test --workspace 2>&1"
75 + timeout_secs = 300
76 + staleness_days = 7
77 +
72 78 [targets.htpy]
73 79 label = "htpy.app"
74 80
@@ -69,6 +69,12 @@ json_fields = { "status" = "operational" }
69 69 [targets.mt.tls]
70 70 host = "forums.makenot.work"
71 71
72 + [targets.mt.tests]
73 + ssh = "max@100.106.221.39"
74 + command = "cd /home/max/staging/multithreaded && cargo test --workspace 2>&1"
75 + timeout_secs = 300
76 + staleness_days = 7
77 +
72 78 [targets.htpy]
73 79 label = "htpy.app"
74 80
M src/alerts.rs +150
@@ -450,6 +450,77 @@ impl Alerter {
450 450 self.record_alert(&alert_key, "latency_recovery", None, None, None).await;
451 451 }
452 452
453 + #[instrument(skip_all)]
454 + pub async fn send_test_duration_drift_alert(
455 + &self,
456 + target: &str,
457 + label: &str,
458 + drift_message: &str,
459 + ) {
460 + let alert_key = format!("test_duration:{target}");
461 + if self.is_within_cooldown(&alert_key).await {
462 + info!("alert cooldown active for {alert_key}, skipping");
463 + return;
464 + }
465 +
466 + let subject = format!("[PoM] {target}: test duration drift detected");
467 + let body = format!(
468 + "Target: {label} ({target})\n\
469 + {drift_message}\n\
470 + Instance: {}\n\
471 + Time: {}\n\n\
472 + - PoM",
473 + self.instance_name,
474 + chrono::Utc::now().to_rfc3339(),
475 + );
476 +
477 + self.send_email(&subject, &body).await;
478 + self.record_alert(&alert_key, "test_duration_drift", None, None, Some(drift_message)).await;
479 + }
480 +
481 + /// All monitored targets are unreachable — likely a network issue with PoM itself.
482 + #[instrument(skip_all)]
483 + pub async fn send_monitoring_offline_alert(&self, target_count: usize) {
484 + let alert_key = "monitoring:self";
485 + if self.is_within_cooldown(alert_key).await {
486 + info!("alert cooldown active for {alert_key}, skipping");
487 + return;
488 + }
489 +
490 + let subject = format!("[PoM] all {target_count} targets unreachable");
491 + let body = format!(
492 + "All {target_count} monitored targets are non-operational.\n\
493 + This likely indicates a network issue with the PoM instance itself,\n\
494 + not an actual outage of all targets.\n\n\
495 + Instance: {}\n\
496 + Time: {}\n\n\
497 + - PoM",
498 + self.instance_name,
499 + chrono::Utc::now().to_rfc3339(),
500 + );
501 +
502 + self.send_email(&subject, &body).await;
503 + self.record_alert(alert_key, "monitoring_offline", None, None, None).await;
504 + }
505 +
506 + /// At least one target is reachable again after a monitoring-offline event.
507 + #[instrument(skip_all)]
508 + pub async fn send_monitoring_recovery(&self) {
509 + let alert_key = "monitoring:self";
510 + let subject = "[PoM] monitoring recovered".to_string();
511 + let body = format!(
512 + "At least one target is reachable again.\n\
513 + Instance: {}\n\
514 + Time: {}\n\n\
515 + - PoM",
516 + self.instance_name,
517 + chrono::Utc::now().to_rfc3339(),
518 + );
519 +
520 + self.send_email(&subject, &body).await;
521 + self.record_alert(alert_key, "monitoring_recovery", None, None, None).await;
522 + }
523 +
453 524 async fn is_within_cooldown(&self, target: &str) -> bool {
454 525 let latest = match db::get_latest_alert_for_target(&self.pool, target).await {
455 526 Ok(Some(row)) => row,
@@ -662,4 +733,83 @@ mod tests {
662 733 // Different target should NOT be in cooldown
663 734 assert!(!alerter.is_within_cooldown("health:other.com").await);
664 735 }
736 +
737 + #[tokio::test]
738 + async fn tls_expiry_alert_cooldown_key() {
739 + let pool = db::connect_in_memory().await.unwrap();
740 + let alerter = test_alerter(pool.clone());
741 +
742 + assert!(!alerter.is_within_cooldown("tls:mnw").await);
743 + alerter.send_tls_expiry_alert("mnw", "makenot.work", 10, "2026-04-01T00:00:00Z").await;
744 + assert!(alerter.is_within_cooldown("tls:mnw").await);
745 + }
746 +
747 + #[tokio::test]
748 + async fn tls_error_alert_cooldown_key() {
749 + let pool = db::connect_in_memory().await.unwrap();
750 + let alerter = test_alerter(pool.clone());
751 +
752 + assert!(!alerter.is_within_cooldown("tls:mnw").await);
753 + alerter.send_tls_error_alert("mnw", "makenot.work", "certificate expired").await;
754 + assert!(alerter.is_within_cooldown("tls:mnw").await);
755 + }
756 +
757 + #[tokio::test]
758 + async fn latency_drift_alert_cooldown_key() {
759 + let pool = db::connect_in_memory().await.unwrap();
760 + let alerter = test_alerter(pool.clone());
761 +
762 + assert!(!alerter.is_within_cooldown("latency:mnw").await);
763 + alerter.send_latency_drift_alert("mnw", "MakeNotWork", "avg 500ms, baseline 100ms").await;
764 + assert!(alerter.is_within_cooldown("latency:mnw").await);
765 + }
766 +
767 + #[tokio::test]
768 + async fn test_duration_drift_alert_cooldown_key() {
769 + let pool = db::connect_in_memory().await.unwrap();
770 + let alerter = test_alerter(pool.clone());
771 +
772 + assert!(!alerter.is_within_cooldown("test_duration:mnw").await);
773 + alerter.send_test_duration_drift_alert("mnw", "MakeNotWork", "drift: 120s vs 60s baseline").await;
774 + assert!(alerter.is_within_cooldown("test_duration:mnw").await);
775 + }
776 +
777 + #[tokio::test]
778 + async fn monitoring_offline_alert_cooldown_key() {
779 + let pool = db::connect_in_memory().await.unwrap();
780 + let alerter = test_alerter(pool.clone());
781 +
782 + assert!(!alerter.is_within_cooldown("monitoring:self").await);
783 + alerter.send_monitoring_offline_alert(3).await;
784 + assert!(alerter.is_within_cooldown("monitoring:self").await);
785 + }
786 +
787 + #[tokio::test]
788 + async fn route_recovery_does_not_start_cooldown() {
789 + let pool = db::connect_in_memory().await.unwrap();
790 + let alerter = test_alerter(pool.clone());
791 +
792 + alerter.send_route_recovery_alert("mnw", "MakeNotWork", &["/health".to_string()]).await;
793 + // Recovery alerts are excluded from cooldown lookups, so sending a recovery
794 + // should NOT put the key into cooldown.
795 + assert!(!alerter.is_within_cooldown("route:mnw").await);
796 + }
797 +
798 + #[tokio::test]
799 + async fn dns_recovery_does_not_start_cooldown() {
800 + let pool = db::connect_in_memory().await.unwrap();
801 + let alerter = test_alerter(pool.clone());
802 +
803 + alerter.send_dns_recovery_alert("mnw", "MakeNotWork").await;
804 + assert!(!alerter.is_within_cooldown("dns:mnw").await);
805 + }
806 +
807 + #[tokio::test]
808 + async fn tls_recovery_does_not_start_cooldown() {
809 + let pool = db::connect_in_memory().await.unwrap();
810 + let alerter = test_alerter(pool.clone());
811 +
812 + alerter.send_tls_recovery("mnw", "MakeNotWork", 90).await;
813 + assert!(!alerter.is_within_cooldown("tls:mnw").await);
814 + }
665 815 }
M src/api.rs +18 -2
@@ -12,7 +12,7 @@ use axum::routing::get;
12 12 use axum::{Json, Router};
13 13 use serde::Serialize;
14 14
15 - use crate::checks::http::compute_test_staleness;
15 + use crate::checks::http::{compute_test_staleness, detect_test_duration_drift};
16 16 use crate::config::Config;
17 17 use crate::db;
18 18 use crate::peer::SharedMeshState;
@@ -92,7 +92,9 @@ async fn require_bearer_token(
92 92 match auth_header {
93 93 Some(header) if header.starts_with("Bearer ") => {
94 94 let token = &header[7..];
95 - if token == expected {
95 + // Constant-time comparison to prevent timing side-channels
96 + use subtle::ConstantTimeEq;
97 + if token.as_bytes().ct_eq(expected.as_bytes()).into() {
96 98 Ok(next.run(req).await)
97 99 } else {
98 100 Err((StatusCode::UNAUTHORIZED, Json(serde_json::json!({
@@ -192,6 +194,9 @@ struct TargetStatus {
192 194 /// Latest WHOIS check result. Omitted if no WHOIS monitoring is configured.
193 195 #[serde(skip_serializing_if = "Option::is_none")]
194 196 whois: Option<db::WhoisCheckRow>,
197 + /// Test duration drift warning. Omitted if no drift detected or no test config.
198 + #[serde(skip_serializing_if = "Option::is_none")]
199 + test_duration_drift: Option<String>,
195 200 }
196 201
197 202 #[derive(Serialize)]
@@ -310,6 +315,16 @@ async fn build_target_status(
310 315 None
311 316 };
312 317
318 + // Compute test duration drift for targets with test config
319 + let test_duration_drift = if config.get_target(name).and_then(|t| t.tests.as_ref()).is_some() {
320 + let durations = db::get_test_durations(pool, name, 13)
321 + .await
322 + .unwrap_or_default();
323 + detect_test_duration_drift(&durations, 10, 3, 1.5)
324 + } else {
325 + None
326 + };
327 +
313 328 let current_incident = db::get_open_incident(pool, name)
314 329 .await
315 330 .unwrap_or(None);
@@ -360,6 +375,7 @@ async fn build_target_status(
360 375 latency_24h,
361 376 tls,
362 377 test_staleness,
378 + test_duration_drift,
363 379 current_incident,
364 380 incidents,
365 381 route_status,
M src/checks/http.rs +55 -14
@@ -28,20 +28,20 @@ pub async fn check_health(
28 28 let status_code = response.status().as_u16();
29 29
30 30 // Reject responses that declare a content-length exceeding our limit.
31 - if let Some(len) = response.content_length() {
32 - if len > MAX_RESPONSE_BYTES {
33 - return HealthSnapshot {
34 - id: None,
35 - target: target_name.to_string(),
36 - status: HealthStatus::Degraded,
37 - checked_at,
38 - response_time_ms,
39 - details: None,
40 - error: Some(format!(
41 - "Response body too large: {len} bytes (limit: {MAX_RESPONSE_BYTES} bytes)"
42 - )),
43 - };
44 - }
31 + if let Some(len) = response.content_length()
32 + && len > MAX_RESPONSE_BYTES
33 + {
34 + return HealthSnapshot {
35 + id: None,
36 + target: target_name.to_string(),
37 + status: HealthStatus::Degraded,
38 + checked_at,
39 + response_time_ms,
40 + details: None,
41 + error: Some(format!(
42 + "Response body too large: {len} bytes (limit: {MAX_RESPONSE_BYTES} bytes)"
43 + )),
44 + };
45 45 }
46 46
47 47 // Read body with size cap (handles chunked/streaming responses without content-length).
@@ -241,6 +241,47 @@ pub fn detect_latency_drift(
241 241 }
242 242 }
243 243
244 + /// Detect sustained test duration drift by checking if all recent durations
245 + /// exceed the baseline average by the given threshold multiplier.
246 + ///
247 + /// Returns a description string if drift is detected, `None` otherwise.
248 + /// Requires at least `baseline_count + recent_count` samples in total; with fewer, returns `None`.
249 + pub fn detect_test_duration_drift(
250 + durations: &[(String, i64)],
251 + baseline_count: usize,
252 + recent_count: usize,
253 + threshold: f64,
254 + ) -> Option<String> {
255 + if durations.len() < baseline_count + recent_count {
256 + return None;
257 + }
258 +
259 + // durations are ordered most recent first from get_test_durations
260 + let recent = &durations[..recent_count];
261 + let baseline = &durations[recent_count..];
262 +
263 + if baseline.is_empty() {
264 + return None;
265 + }
266 +
267 + let baseline_avg = baseline.iter().map(|(_, d)| *d).sum::<i64>() as f64 / baseline.len() as f64;
268 + let drift_threshold = baseline_avg * threshold;
269 +
270 + let all_over = recent.iter().all(|(_, d)| *d as f64 > drift_threshold);
271 + if all_over {
272 + let recent_avg = recent.iter().map(|(_, d)| *d).sum::<i64>() as f64 / recent.len() as f64;
273 + Some(format!(
274 + "test duration drift: last {} runs avg {:.0}s (baseline avg {:.0}s, threshold {:.0}s)",
275 + recent_count,
276 + recent_avg,
277 + baseline_avg,
278 + drift_threshold,
279 + ))
280 + } else {
281 + None
282 + }
283 + }
284 +
244 285 /// Compute test staleness from version and timing data.
245 286 ///
246 287 /// A target's tests are considered stale when:
@@ -1,10 +1,11 @@
1 - use crate::types::{StepResult, TestSummary};
1 + use crate::types::{StepResult, TestDetail, TestSummary};
2 2
3 3 /// Parse run-ci.sh output into a structured TestSummary.
4 4 ///
5 5 /// Looks for:
6 6 /// - `PASS <step name>` / `FAIL <step name>` lines from the CI summary
7 7 /// - `test result: ok. N passed; M failed` lines from cargo test
8 + /// - `test <name> ... ok` / `test <name> ... FAILED` individual test lines
8 9 pub fn parse_ci_output(output: &str) -> TestSummary {
9 10 let mut steps = Vec::new();
10 11 let mut total_passed: i64 = 0;
@@ -38,11 +39,44 @@ pub fn parse_ci_output(output: &str) -> TestSummary {
38 39 }
39 40 }
40 41
42 + let details = parse_individual_tests(output);
43 +
41 44 TestSummary {
42 45 steps,
43 46 total_passed: if found_test_results { Some(total_passed) } else { None },
44 47 total_failed: if found_test_results { Some(total_failed) } else { None },
48 + details,
49 + }
50 + }
51 +
52 + /// Parse individual test result lines from cargo test output.
53 + ///
54 + /// Matches lines like:
55 + /// - `test workflows::endorsements::toggle ... ok`
56 + /// - `test markdown::tests::bold_and_italic ... FAILED`
57 + pub fn parse_individual_tests(output: &str) -> Vec<TestDetail> {
58 + let mut details = Vec::new();
59 +
60 + for line in output.lines() {
61 + let trimmed = line.trim();
62 +
63 + // Match: "test <name> ... ok" or "test <name> ... FAILED"
64 + if let Some(rest) = trimmed.strip_prefix("test ") {
65 + if let Some(name) = rest.strip_suffix(" ... ok") {
66 + details.push(TestDetail {
67 + test_name: name.to_string(),
68 + passed: true,
69 + });
70 + } else if let Some(name) = rest.strip_suffix(" ... FAILED") {
71 + details.push(TestDetail {
72 + test_name: name.to_string(),
73 + passed: false,
74 + });
75 + }
76 + }
45 77 }
78 +
79 + details
46 80 }
47 81
48 82 fn parse_test_result_line(line: &str) -> Option<(i64, i64)> {
@@ -137,6 +171,67 @@ All steps passed.
137 171 }
138 172
139 173 #[test]
174 + fn parse_individual_tests_ok_and_failed() {
175 + let output = r#"
176 + running 4 tests
177 + test workflows::endorsements::toggle_endorsement_removes ... ok
178 + test workflows::endorsements::self_endorse_rejected ... ok
179 + test markdown::tests::bold_and_italic ... ok
180 + test workflows::endorsements::endorse_post_happy_path ... FAILED
181 +
182 + test result: FAILED. 3 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out
183 + "#;
184 + let details = parse_individual_tests(output);
185 + assert_eq!(details.len(), 4);
186 + assert!(details[0].passed);
187 + assert_eq!(details[0].test_name, "workflows::endorsements::toggle_endorsement_removes");
188 + assert!(details[1].passed);
189 + assert!(!details[3].passed);
190 + assert_eq!(details[3].test_name, "workflows::endorsements::endorse_post_happy_path");
191 + }
192 +
193 + #[test]
194 + fn parse_individual_tests_empty_when_no_individual_lines() {
195 + let output = r#"
196 + ========================================
197 + CI Summary
198 + ========================================
199 +
200 + PASS cargo check
201 + PASS cargo test --lib
202 + "#;
203 + let details = parse_individual_tests(output);
204 + assert!(details.is_empty());
205 + }
206 +
207 + #[test]
208 + fn parse_ci_output_includes_details() {
209 + let output = r#"
210 + ========================================
211 + cargo test --lib
212 + ========================================
213 +
214 + running 2 tests
215 + test foo::bar ... ok
216 + test foo::baz ... FAILED
217 +
218 + test result: FAILED. 1 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out
219 +
220 + ========================================
221 + CI Summary
222 + ========================================
223 +
224 + FAIL cargo test --lib
225 + "#;
226 + let summary = parse_ci_output(output);
227 + assert_eq!(summary.details.len(), 2);
228 + assert!(summary.details[0].passed);
229 + assert!(!summary.details[1].passed);
230 + assert_eq!(summary.total_passed, Some(1));
231 + assert_eq!(summary.total_failed, Some(1));
232 + }
233 +
234 + #[test]
140 235 fn parse_test_result_line_ok() {
141 236 let line = "test result: ok. 42 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out";
142 237 let (p, f) = parse_test_result_line(line).unwrap();
M src/checks/ssh.rs +91 -23
@@ -5,6 +5,12 @@ use crate::checks::parse;
5 5 use crate::config::TestsConfig;
6 6 use crate::types::{TestRun, TestSummary};
7 7
8 + /// Returns `true` if every character in `filter` is `_`, `:`, `-`, or alphanumeric per `char::is_alphanumeric` (Unicode-aware, so not limited to `[a-zA-Z0-9]`).
9 + /// An empty string is considered valid (no characters to reject).
10 + pub fn validate_test_filter(filter: &str) -> bool {
11 + filter.chars().all(|c| c.is_alphanumeric() || c == '_' || c == ':' || c == '-')
12 + }
13 +
8 14 #[instrument(skip_all)]
9 15 pub async fn run_tests(
10 16 target_name: &str,
@@ -16,29 +22,30 @@ pub async fn run_tests(
16 22
17 23 // Validate filter characters before appending to SSH command.
18 24 // Only allow alphanumeric, underscore, colon, dash — covers all valid Rust test filter patterns.
19 - if let Some(f) = filter {
20 - if !f.chars().all(|c| c.is_alphanumeric() || c == '_' || c == ':' || c == '-') {
21 - let finished_at = chrono::Utc::now().to_rfc3339();
22 - let duration_secs = start.elapsed().as_secs() as i64;
23 - return TestRun {
24 - id: None,
25 - target: target_name.to_string(),
26 - started_at,
27 - finished_at: Some(finished_at),
28 - duration_secs: Some(duration_secs),
29 - exit_code: None,
30 - passed: false,
31 - summary: TestSummary {
32 - steps: vec![],
33 - total_passed: None,
34 - total_failed: None,
35 - },
36 - raw_output: format!(
37 - "Invalid filter: contains characters outside [a-zA-Z0-9_:-]. Got: {f}"
38 - ),
39 - filter: Some(f.to_string()),
40 - };
41 - }
25 + if let Some(f) = filter
26 + && !validate_test_filter(f)
27 + {
28 + let finished_at = chrono::Utc::now().to_rfc3339();
29 + let duration_secs = start.elapsed().as_secs() as i64;
30 + return TestRun {
31 + id: None,
32 + target: target_name.to_string(),
33 + started_at,
34 + finished_at: Some(finished_at),
35 + duration_secs: Some(duration_secs),
36 + exit_code: None,
37 + passed: false,
38 + summary: TestSummary {
39 + steps: vec![],
40 + total_passed: None,
41 + total_failed: None,
42 + details: vec![],
43 + },
44 + raw_output: format!(
45 + "Invalid filter: contains characters outside [a-zA-Z0-9_:-]. Got: {f}"
46 + ),
47 + filter: Some(f.to_string()),
48 + };
42 49 }
43 50
44 51 let mut cmd_str = config.command.clone();
@@ -96,9 +103,70 @@ pub async fn run_tests(
96 103 steps: vec![],
97 104 total_passed: None,
98 105 total_failed: None,
106 + details: vec![],
99 107 },
100 108 raw_output: format!("SSH connection failed: {e}"),
101 109 filter: filter.map(String::from),
102 110 },
103 111 }
104 112 }
113 +
114 + #[cfg(test)]
115 + mod tests {
116 + use super::*;
117 +
118 + #[test]
119 + fn validate_test_filter_valid_simple() {
120 + assert!(validate_test_filter("foo"));
121 + }
122 +
123 + #[test]
124 + fn validate_test_filter_valid_module_path() {
125 + assert!(validate_test_filter("foo::bar"));
126 + }
127 +
128 + #[test]
129 + fn validate_test_filter_valid_underscore() {
130 + assert!(validate_test_filter("foo_bar"));
131 + }
132 +
133 + #[test]
134 + fn validate_test_filter_valid_dash() {
135 + assert!(validate_test_filter("foo-bar"));
136 + }
137 +
138 + #[test]
139 + fn validate_test_filter_valid_alphanumeric() {
140 + assert!(validate_test_filter("a123"));
141 + }
142 +
143 + #[test]
144 + fn validate_test_filter_empty_is_valid() {
145 + assert!(validate_test_filter(""));
146 + }
147 +
148 + #[test]
149 + fn validate_test_filter_rejects_semicolon() {
150 + assert!(!validate_test_filter("foo;rm"));
151 + }
152 +
153 + #[test]
154 + fn validate_test_filter_rejects_ampersand() {
155 + assert!(!validate_test_filter("foo && bar"));
156 + }
157 +
158 + #[test]
159 + fn validate_test_filter_rejects_pipe() {
160 + assert!(!validate_test_filter("foo|bar"));
161 + }
162 +
163 + #[test]
164 + fn validate_test_filter_rejects_subshell() {
165 + assert!(!validate_test_filter("$(cmd)"));
166 + }
167 +
168 + #[test]
169 + fn validate_test_filter_rejects_space() {
170 + assert!(!validate_test_filter("foo bar"));
171 + }
172 + }
D src/cli.rs -500
@@ -1,871 +0,0 @@
1 - //! CLI command handlers for PoM subcommands.
2 -
3 - use clap::Subcommand;
4 - use tracing::info;
5 -
6 - use pom::alerts::Alerter;
7 - use pom::checks::{dns, http, routes, ssh, tls, whois};
8 - use pom::config::Config;
9 - use pom::db;
10 - use pom::display;
11 - use pom::error::{PomError, Result};
12 - use pom::peer;
13 - use pom::types::LatencyStats;
14 - use pom::types::TestStaleness;
15 -
16 - #[derive(Subcommand)]
17 - pub(crate) enum HistoryKind {
18 - /// Health check history
19 - Health {
20 - /// Filter by target
21 - target: Option<String>,
22 - /// Number of results
23 - #[arg(short, default_value = "10")]
24 - n: i64,
25 - /// Output as JSON
26 - #[arg(long)]
27 - json: bool,
28 - },
29 - /// Test run history
30 - Tests {
31 - /// Filter by target
32 - target: Option<String>,
33 - /// Number of results
34 - #[arg(short, default_value = "10")]
35 - n: i64,
36 - /// Output as JSON
37 - #[arg(long)]
38 - json: bool,
39 - },
40 - }
41 -
42 - pub(crate) async fn cmd_health(
43 - pool: &sqlx::SqlitePool,
44 - config: &Config,
45 - target: Option<&str>,
46 - json: bool,
47 - ) -> Result<()> {
48 - let targets: Vec<String> = match target {
49 - Some(t) => {
50 - if config.get_target(t).is_none() {
51 - eprintln!("Unknown target: {t}");
52 - std::process::exit(1);
53 - }
54 - vec![t.to_string()]
55 - }
56 - None => config.target_names(),
57 - };
58 -
59 - let mut snapshots = Vec::new();
60 -
61 - for name in &targets {
62 - let target_config = config.get_target(name).unwrap();
63 - if let Some(health_config) = &target_config.health {
64 - let snapshot = http::check_health(name, health_config, health_config.expect.as_ref()).await;
65 - db::insert_health_check(pool, &snapshot).await?;
66 - snapshots.push(snapshot);
67 - } else {
68 - eprintln!("{name}: no health endpoint configured");
69 - }
70 - }
71 -
72 - if json {
73 - println!("{}", serde_json::to_string_pretty(&snapshots)?);
74 - } else {
75 - print!("{}", display::format_health_snapshots(&snapshots));
76 - }
77 -
78 - Ok(())
79 - }
80 -
81 - pub(crate) async fn cmd_test(
82 - pool: &sqlx::SqlitePool,
83 - config: &Config,
84 - target_name: &str,
85 - filter: Option<&str>,
86 - json: bool,
87 - ) -> Result<()> {
88 - let target = config.get_target(target_name).ok_or_else(|| {
89 - PomError::Config(format!("Unknown target: {target_name}"))
90 - })?;
91 - let tests_config = target.tests.as_ref().ok_or_else(|| {
92 - PomError::Config(format!("Target '{target_name}' has no test configuration"))
93 - })?;
94 -
95 - eprintln!("Running tests on {target_name}...");
96 - let run = ssh::run_tests(target_name, tests_config, filter).await;
97 - db::insert_test_run(pool, &run).await?;
98 -
99 - if json {
100 - let summary = serde_json::json!({
101 - "target": run.target,
102 - "passed": run.passed,
103 - "exit_code": run.exit_code,
104 - "duration_secs": run.duration_secs,
105 - "started_at": run.started_at,
106 - "finished_at": run.finished_at,
107 - "filter": run.filter,
108 - "summary": run.summary,
109 - });
110 - println!("{}", serde_json::to_string_pretty(&summary)?);
111 - } else {
112 - print!("{}", display::format_test_result(target_name, &run));
113 - }
114 -
115 - Ok(())
116 - }
117 -
118 - pub(crate) async fn cmd_status(
119 - pool: &sqlx::SqlitePool,
120 - config: &Config,
121 - json: bool,
122 - ) -> Result<()> {
123 - let mut target_statuses = Vec::new();
124 -
125 - for name in config.target_names() {
126 - let target = config.get_target(&name).unwrap();
127 - let health = db::get_latest_health(pool, &name).await?;
128 - let tls_check = db::get_latest_tls_check(pool, &name).await?;
129 - let route_checks = db::get_latest_route_checks(pool, &name).await?;
130 - let dns_checks = db::get_latest_dns_checks(pool, &name).await?;
131 - let whois_check = db::get_latest_whois_check(pool, &name).await?;
132 - let test = db::get_latest_test_run(pool, &name).await?;
133 - let incident = db::get_open_incident(pool, &name).await?;
134 -
135 - // Compute 24h latency stats
136 - let latency_24h = {
137 - let cutoff = (chrono::Utc::now() - chrono::Duration::hours(24)).to_rfc3339();
138 - let times = db::get_response_times(pool, &name, &cutoff).await.unwrap_or_default();
139 - let operational_times: Vec<i64> = times.iter()
140 - .filter(|(_, ms)| *ms > 0)
141 - .map(|(_, ms)| *ms)
142 - .collect();
143 - LatencyStats::from_times(&operational_times)
144 - };
145 -
146 - // Compute test staleness
147 - let staleness: Option<TestStaleness> = if let Some(tests_config) = &target.tests {
148 - let current_version = health.as_ref()
149 - .and_then(|h| h.details.as_ref())
150 - .and_then(|d| d.version.clone());
151 -
152 - let tested_version = if let Some(ref t) = test {
153 - db::get_version_at_time(pool, &name, &t.started_at).await.unwrap_or(None)
154 - } else {
155 - None
156 - };
157 -
158 - Some(http::compute_test_staleness(
159 - current_version.as_deref(),
160 - tested_version.as_deref(),
161 - test.as_ref().map(|t| t.started_at.as_str()),
162 - tests_config.staleness_days,
163 - ))
164 - } else {
165 - None
166 - };
167 -
168 - if json {
169 - target_statuses.push(serde_json::json!({
170 - "target": name,
171 - "label": target.label,
172 - "health": health,
173 - "tls": tls_check,
174 - "latency_24h": latency_24h,
175 - "dns": dns_checks,
176 - "whois": whois_check,
177 - "last_test": test.map(|t| serde_json::json!({
178 - "passed": t.passed,
179 - "exit_code": t.exit_code,
180 - "duration_secs": t.duration_secs,
181 - "started_at": t.started_at,
182 - "summary": t.summary,
183 - })),
184 - "test_staleness": staleness,
185 - "incident": incident,
186 - }));
187 - } else {
188 - let route_slice = if route_checks.is_empty() { None } else { Some(route_checks.as_slice()) };
189 - let dns_slice = if dns_checks.is_empty() { None } else { Some(dns_checks.as_slice()) };
190 - print!(
191 - "{}",
192 - display::format_status_target(
193 - &name,
194 - &target.label,
195 - health.as_ref(),
196 - latency_24h.as_ref(),
197 - tls_check.as_ref(),
198 - route_slice,
199 - dns_slice,
200 - whois_check.as_ref(),
201 - test.as_ref(),
202 - staleness.as_ref(),
203 - incident.as_ref(),
204 - )
205 - );
206 - }
207 - }
208 -
209 - if json {
210 - println!("{}", serde_json::to_string_pretty(&target_statuses)?);
211 - }
212 -
213 - Ok(())
214 - }
215 -
216 - pub(crate) async fn cmd_history(
217 - pool: &sqlx::SqlitePool,
218 - kind: HistoryKind,
219 - ) -> Result<()> {
220 - match kind {
221 - HistoryKind::Health { target, n, json } => {
222 - let history = db::get_health_history(pool, target.as_deref(), n).await?;
223 - if json {
224 - println!("{}", serde_json::to_string_pretty(&history)?);
225 - } else {
226 - print!("{}", display::format_health_history(&history));
227 - }
228 - }
229 - HistoryKind::Tests { target, n, json } => {
230 - let history = db::get_test_history(pool, target.as_deref(), n).await?;
231 - if json {
232 - let summaries: Vec<serde_json::Value> = history
233 - .iter()
234 - .map(|r| serde_json::json!({
235 - "id": r.id,
236 - "target": r.target,
237 - "passed": r.passed,
238 - "exit_code": r.exit_code,
239 - "duration_secs": r.duration_secs,
240 - "started_at": r.started_at,
241 - "summary": r.summary,
242 - }))
243 - .collect();
244 - println!("{}", serde_json::to_string_pretty(&summaries)?);
245 - } else {
246 - print!("{}", display::format_test_history(&history));
247 - }
248 - }
249 - }
250 -
251 - Ok(())
252 - }
253 -
254 - pub(crate) async fn cmd_prune(
255 - pool: &sqlx::SqlitePool,
256 - days: i64,
257 - ) -> Result<()> {
258 - let result = db::prune_old_records(pool, days).await?;
259 - print!("{}", display::format_prune(&result, days));
260 - Ok(())
261 - }
262 -
263 - pub(crate) async fn cmd_dns(
264 - pool: &sqlx::SqlitePool,
265 - config: &Config,
266 - target: Option<&str>,
267 - json: bool,
268 - ) -> Result<()> {
269 - let targets: Vec<String> = match target {
270 - Some(t) => {
271 - if config.get_target(t).is_none() {
272 - eprintln!("Unknown target: {t}");
273 - std::process::exit(1);
274 - }
275 - vec![t.to_string()]
276 - }
277 - None => config.target_names(),
278 - };
279 -
280 - let mut all_dns_results = Vec::new();
281 - let mut all_whois_results = Vec::new();
282 -
283 - for name in &targets {
284 - let target_config = config.get_target(name).unwrap();
285 -
286 - // DNS checks
287 - if !target_config.dns.is_empty() {
288 - let results = dns::check_dns(name, &target_config.dns).await;
289 - for result in &results {
290 - if let Err(e) = db::insert_dns_check(pool, result).await {
291 - tracing::error!("{name}: failed to store DNS check: {e}");
292 - }
293 - }
294 - all_dns_results.extend(results);
295 - }
296 -
297 - // WHOIS check
298 - if let Some(ref whois_config) = target_config.whois {
299 - let result = whois::check_whois(name, whois_config).await;
300 - if let Err(e) = db::insert_whois_check(pool, &result).await {
301 - tracing::error!("{name}: failed to store WHOIS check: {e}");
302 - }
303 - all_whois_results.push(result);
304 - }
305 - }
306 -
307 - if json {
308 - let output = serde_json::json!({
309 - "dns": all_dns_results,
310 - "whois": all_whois_results,
311 - });
312 - println!("{}", serde_json::to_string_pretty(&output)?);
313 - } else if all_dns_results.is_empty() && all_whois_results.is_empty() {
314 - println!("No DNS or WHOIS checks configured for the selected target(s).");
315 - } else {
316 - print!("{}", display::format_dns_results(&all_dns_results, &all_whois_results));
317 - }
318 -
319 - Ok(())
320 - }
321 -
322 - pub(crate) async fn cmd_serve(
323 - pool: &sqlx::SqlitePool,
324 - config: &Config,
325 - ) -> Result<()> {
326 - let default_interval = config.serve.interval_secs;
327 - let prune_days = config.serve.prune_days;
328 - let listen_addr = config.serve.listen.clone();
329 -
330 - // --- Cancellation token for graceful shutdown ---
331 - let token = tokio_util::sync::CancellationToken::new();
332 -
333 - // --- Instance identity ---
334 - let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?;
335 - let instance_name = config.instance_name();
336 - let instance_info = peer::InstanceInfo {
337 - id: instance_id.clone(),
338 - name: instance_name.clone(),
339 - version: env!("CARGO_PKG_VERSION").to_string(),
340 - targets: config.target_names(),
341 - started_at: chrono::Utc::now().to_rfc3339(),
342 - };
343 -
344 - // --- Alerter ---
345 - let alerter = config.alerts.as_ref().map(|alert_config| {
346 - info!("Alerts enabled (to: {})", alert_config.to);
347 - Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone())
348 - });
349 -
350 - info!("Instance: {instance_name} (id={instance_id})");
351 - info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)");
352 -
353 - // --- Mesh state ---
354 - let mesh = peer::new_mesh_state(instance_info, &config.peers);
355 -
356 - // Load known peer identities from DB
357 - {
358 - let mut state = mesh.write().await;
359 - for (name, peer) in state.peers.iter_mut() {
360 - if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await {
361 - peer.known_id = Some(known_id);
362 - }
363 - }
364 - }
365 -
366 - // Spawn a health check task per target
367 - let mut handles = Vec::new();
368 -
369 - for name in config.target_names() {
370 - let target_config = config.get_target(&name).unwrap().clone();
371 - if let Some(health_config) = target_config.health {
372 - let interval_secs = health_config.interval_secs.unwrap_or(default_interval);
373 - let pool = pool.clone();
374 - let name = name.clone();
375 - let label = target_config.label.clone();
376 - let alerter = alerter.clone();
377 - let cancel = token.clone();
378 -
379 - info!("{name}: health check every {interval_secs}s");
380 -
381 - let trending_config = health_config.trending.clone();
382 -
383 - handles.push(tokio::spawn(async move {
384 - let mut interval = tokio::time::interval(
385 - std::time::Duration::from_secs(interval_secs),
386 - );
387 - let expect = health_config.expect.as_ref();
388 - let mut in_drift = false;
389 - interval.tick().await; // consume immediate first tick
390 - loop {
391 - tokio::select! {
392 - _ = cancel.cancelled() => break,
393 - _ = interval.tick() => {}
394 - }
395 - let previous = db::get_latest_health(&pool, &name).await.ok().flatten();
396 - let snapshot = http::check_health(&name, &health_config, expect).await;
397 - info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms);
398 - if let Err(e) = db::insert_health_check(&pool, &snapshot).await {
399 - tracing::error!("{name}: failed to store health check: {e}");
400 - }
401 -
402 - // Fire alerts on status transitions
403 - if let Some(ref alerter) = alerter
404 - && let Some(ref prev) = previous
405 - && prev.status != snapshot.status
406 - {
407 - let from = prev.status.to_string();
408 - let to = snapshot.status.to_string();
409 - if snapshot.status == pom::types::HealthStatus::Operational {
410 - alerter.send_health_recovery(&name, &label, &from).await;
411 - } else {
412 - alerter.send_health_alert(
413 - &name,
414 - &label,
415 - &from,
416 - &to,
417 - snapshot.error.as_deref(),
418 - ).await;
419 - }
420 - }
421 -
422 - // Track incidents on status transitions
423 - if let Some(ref prev) = previous
424 - && prev.status != snapshot.status
425 - {
426 - let prev_op = prev.status == pom::types::HealthStatus::Operational;
427 - let now_op = snapshot.status == pom::types::HealthStatus::Operational;
428 -
429 - if prev_op && !now_op {
430 - // Was operational, now unhealthy — open incident
431 - if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
432 - tracing::error!("{name}: failed to open incident: {e}");
433 - }
434 - } else if !prev_op && now_op {
435 - // Was unhealthy, now operational — close incidents
436 - if let Err(e) = db::close_open_incidents(&pool, &name).await {
437 - tracing::error!("{name}: failed to close incidents: {e}");
438 - }
439 - } else {
440 - // Status changed between non-operational states — close old, open new
441 - if let Err(e) = db::close_open_incidents(&pool, &name).await {
442 - tracing::error!("{name}: failed to close incidents: {e}");
443 - }
444 - if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
445 - tracing::error!("{name}: failed to open incident: {e}");
446 - }
447 - }
448 - }
449 -
450 - // Latency drift detection
451 - if let Some(ref trending) = trending_config
452 - && snapshot.status == pom::types::HealthStatus::Operational
453 - {
454 - let baseline_cutoff = (chrono::Utc::now()
455 - - chrono::Duration::hours(trending.baseline_window_hours as i64))
456 - .to_rfc3339();
457 - let baseline_data = db::get_response_times(&pool, &name, &baseline_cutoff)
458 - .await
459 - .unwrap_or_default();
460 - let operational_times: Vec<i64> = baseline_data.iter()
461 - .filter(|(_, ms)| *ms > 0)
462 - .map(|(_, ms)| *ms)
463 - .collect();
464 - let baseline = LatencyStats::from_times(&operational_times);
465 - let recent = db::get_recent_response_times(&pool, &name, 3)
466 - .await
467 - .unwrap_or_default();
468 -
469 - if let Some(ref bl) = baseline {
470 - if let Some(msg) = http::detect_latency_drift(&recent, bl, trending.spike_threshold) {
471 - if !in_drift {
472 - info!("{name}: {msg}");
473 - if let Some(ref alerter) = alerter {
474 - alerter.send_latency_drift_alert(&name, &label, &msg).await;
475 - }
476 - in_drift = true;
477 - }
478 - } else if in_drift {
479 - info!("{name}: latency drift recovered");
480 - if let Some(ref alerter) = alerter {
481 - alerter.send_latency_recovery(&name, &label).await;
482 - }
483 - in_drift = false;
484 - }
485 - }
486 - }
487 - }
488 - }));
489 - }
490 - }
491 -
492 - // Spawn TLS check tasks
493 - let tls_interval_secs = config.serve.tls_check_interval_secs;
494 - for name in config.target_names() {
495 - let target_config = config.get_target(&name).unwrap().clone();
496 - if let Some(tls_config) = target_config.tls {
497 - let pool = pool.clone();
498 - let name = name.clone();
499 - let label = target_config.label.clone();
500 - let alerter = alerter.clone();
Lines truncated
@@ -0,0 +1,92 @@
1 + use pom::types::HealthStatus;
2 +
/// What the incident tracker should do in response to a health status transition.
///
/// The value only describes the decision; the caller performs the matching
/// database operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IncidentAction {
    /// Leave incident state untouched.
    None,
    /// Open a new incident.
    Open,
    /// Close the currently open incident(s).
    Close,
    /// Close the currently open incident(s), then open a new one.
    CloseAndOpen,
}
10 +
11 + pub(crate) fn incident_action(prev: HealthStatus, curr: HealthStatus) -> IncidentAction {
12 + if prev == curr {
13 + return IncidentAction::None;
14 + }
15 + let prev_op = prev == HealthStatus::Operational;
16 + let curr_op = curr == HealthStatus::Operational;
17 + match (prev_op, curr_op) {
18 + (true, false) => IncidentAction::Open,
19 + (false, true) => IncidentAction::Close,
20 + (false, false) => IncidentAction::CloseAndOpen,
21 + (true, true) => IncidentAction::None,
22 + }
23 + }
24 +
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the transition `prev -> curr` yields `expected`.
    ///
    /// `#[track_caller]` keeps the reported failure location on the calling test.
    #[track_caller]
    fn assert_transition(prev: HealthStatus, curr: HealthStatus, expected: IncidentAction) {
        assert_eq!(incident_action(prev, curr), expected);
    }

    // --- healthy -> unhealthy opens an incident ---

    #[test]
    fn incident_operational_to_error_opens() {
        assert_transition(HealthStatus::Operational, HealthStatus::Error, IncidentAction::Open);
    }

    #[test]
    fn incident_operational_to_degraded_opens() {
        assert_transition(HealthStatus::Operational, HealthStatus::Degraded, IncidentAction::Open);
    }

    // --- unhealthy -> healthy closes it ---

    #[test]
    fn incident_error_to_operational_closes() {
        assert_transition(HealthStatus::Error, HealthStatus::Operational, IncidentAction::Close);
    }

    #[test]
    fn incident_degraded_to_operational_closes() {
        assert_transition(HealthStatus::Degraded, HealthStatus::Operational, IncidentAction::Close);
    }

    // --- unhealthy -> differently unhealthy rolls the incident over ---

    #[test]
    fn incident_degraded_to_error_closes_and_opens() {
        assert_transition(HealthStatus::Degraded, HealthStatus::Error, IncidentAction::CloseAndOpen);
    }

    #[test]
    fn incident_error_to_degraded_closes_and_opens() {
        assert_transition(HealthStatus::Error, HealthStatus::Degraded, IncidentAction::CloseAndOpen);
    }

    // --- no transition, no action ---

    #[test]
    fn incident_operational_to_operational_none() {
        assert_transition(HealthStatus::Operational, HealthStatus::Operational, IncidentAction::None);
    }

    #[test]
    fn incident_error_to_error_none() {
        assert_transition(HealthStatus::Error, HealthStatus::Error, IncidentAction::None);
    }
}
@@ -0,0 +1,262 @@
1 + //! CLI command handlers for PoM subcommands.
2 +
3 + mod incident;
4 + mod serve;
5 + mod status;
6 + mod tasks;
7 +
8 + pub(crate) use serve::cmd_serve;
9 + pub(crate) use status::cmd_status;
10 +
11 + use clap::Subcommand;
12 +
13 + use pom::checks::{dns, http, ssh, whois};
14 + use pom::config::Config;
15 + use pom::db;
16 + use pom::display;
17 + use pom::error::{PomError, Result};
18 +
19 + #[derive(Subcommand)]
20 + pub(crate) enum HistoryKind {
21 + /// Health check history
22 + Health {
23 + /// Filter by target
24 + target: Option<String>,
25 + /// Number of results
26 + #[arg(short, default_value = "10")]
27 + n: i64,
28 + /// Output as JSON
29 + #[arg(long)]
30 + json: bool,
31 + },
32 + /// Test run history
33 + Tests {
34 + /// Filter by target
35 + target: Option<String>,
36 + /// Number of results
37 + #[arg(short, default_value = "10")]
38 + n: i64,
39 + /// Output as JSON
40 + #[arg(long)]
41 + json: bool,
42 + },
43 + }
44 +
45 + pub(crate) async fn cmd_health(
46 + pool: &sqlx::SqlitePool,
47 + config: &Config,
48 + target: Option<&str>,
49 + json: bool,
50 + ) -> Result<()> {
51 + let targets: Vec<String> = match target {
52 + Some(t) => {
53 + if config.get_target(t).is_none() {
54 + eprintln!("Unknown target: {t}");
55 + std::process::exit(1);
56 + }
57 + vec![t.to_string()]
58 + }
59 + None => config.target_names(),
60 + };
61 +
62 + let mut snapshots = Vec::new();
63 +
64 + for name in &targets {
65 + let target_config = config.get_target(name).unwrap();
66 + if let Some(health_config) = &target_config.health {
67 + let snapshot = http::check_health(name, health_config, health_config.expect.as_ref()).await;
68 + db::insert_health_check(pool, &snapshot).await?;
69 + snapshots.push(snapshot);
70 + } else {
71 + eprintln!("{name}: no health endpoint configured");
72 + }
73 + }
74 +
75 + if json {
76 + println!("{}", serde_json::to_string_pretty(&snapshots)?);
77 + } else {
78 + print!("{}", display::format_health_snapshots(&snapshots));
79 + }
80 +
81 + Ok(())
82 + }
83 +
84 + pub(crate) async fn cmd_test(
85 + pool: &sqlx::SqlitePool,
86 + config: &Config,
87 + target_name: &str,
88 + filter: Option<&str>,
89 + json: bool,
90 + ) -> Result<()> {
91 + let target = config.get_target(target_name).ok_or_else(|| {
92 + PomError::Config(format!("Unknown target: {target_name}"))
93 + })?;
94 + let tests_config = target.tests.as_ref().ok_or_else(|| {
95 + PomError::Config(format!("Target '{target_name}' has no test configuration"))
96 + })?;
97 +
98 + eprintln!("Running tests on {target_name}...");
99 + let run = ssh::run_tests(target_name, tests_config, filter).await;
100 + let run_id = db::insert_test_run(pool, &run).await?;
101 +
102 + // Store per-test details and detect regressions
103 + if !run.summary.details.is_empty() {
104 + db::insert_test_details(pool, run_id, &run.summary.details).await?;
105 + }
106 + let regressions = db::get_test_regressions(pool, target_name, run_id).await.unwrap_or_default();
107 +
108 + if json {
109 + let summary = serde_json::json!({
110 + "target": run.target,
111 + "passed": run.passed,
112 + "exit_code": run.exit_code,
113 + "duration_secs": run.duration_secs,
114 + "started_at": run.started_at,
115 + "finished_at": run.finished_at,
116 + "filter": run.filter,
117 + "summary": run.summary,
118 + "regressions": regressions,
119 + });
120 + println!("{}", serde_json::to_string_pretty(&summary)?);
121 + } else {
122 + print!("{}", display::format_test_result(target_name, &run));
123 + if !regressions.is_empty() {
124 + print!("{}", display::format_regressions(&regressions));
125 + }
126 + }
127 +
128 + Ok(())
129 + }
130 +
131 + pub(crate) async fn cmd_history(
132 + pool: &sqlx::SqlitePool,
133 + kind: HistoryKind,
134 + ) -> Result<()> {
135 + match kind {
136 + HistoryKind::Health { target, n, json } => {
137 + let history = db::get_health_history(pool, target.as_deref(), n).await?;
138 + if json {
139 + println!("{}", serde_json::to_string_pretty(&history)?);
140 + } else {
141 + print!("{}", display::format_health_history(&history));
142 + }
143 + }
144 + HistoryKind::Tests { target, n, json } => {
145 + let history = db::get_test_history(pool, target.as_deref(), n).await?;
146 + if json {
147 + let summaries: Vec<serde_json::Value> = history
148 + .iter()
149 + .map(|r| serde_json::json!({
150 + "id": r.id,
151 + "target": r.target,
152 + "passed": r.passed,
153 + "exit_code": r.exit_code,
154 + "duration_secs": r.duration_secs,
155 + "started_at": r.started_at,
156 + "summary": r.summary,
157 + }))
158 + .collect();
159 + println!("{}", serde_json::to_string_pretty(&summaries)?);
160 + } else {
161 + print!("{}", display::format_test_history(&history));
162 + }
163 + }
164 + }
165 +
166 + Ok(())
167 + }
168 +
169 + pub(crate) async fn cmd_prune(
170 + pool: &sqlx::SqlitePool,
171 + days: i64,
172 + ) -> Result<()> {
173 + let result = db::prune_old_records(pool, days).await?;
174 + print!("{}", display::format_prune(&result, days));
175 + Ok(())
176 + }
177 +
178 + pub(crate) async fn cmd_dns(
179 + pool: &sqlx::SqlitePool,
180 + config: &Config,
181 + target: Option<&str>,
182 + json: bool,
183 + ) -> Result<()> {
184 + let targets: Vec<String> = match target {
185 + Some(t) => {
186 + if config.get_target(t).is_none() {
187 + eprintln!("Unknown target: {t}");
188 + std::process::exit(1);
189 + }
190 + vec![t.to_string()]
191 + }
192 + None => config.target_names(),
193 + };
194 +
195 + let mut all_dns_results = Vec::new();
196 + let mut all_whois_results = Vec::new();
197 +
198 + for name in &targets {
199 + let target_config = config.get_target(name).unwrap();
200 +
201 + // DNS checks
202 + if !target_config.dns.is_empty() {
203 + let results = dns::check_dns(name, &target_config.dns).await;
204 + for result in &results {
205 + if let Err(e) = db::insert_dns_check(pool, result).await {
206 + tracing::error!("{name}: failed to store DNS check: {e}");
207 + }
208 + }
209 + all_dns_results.extend(results);
210 + }
211 +
212 + // WHOIS check
213 + if let Some(ref whois_config) = target_config.whois {
214 + let result = whois::check_whois(name, whois_config).await;
215 + if let Err(e) = db::insert_whois_check(pool, &result).await {
216 + tracing::error!("{name}: failed to store WHOIS check: {e}");
217 + }
218 + all_whois_results.push(result);
219 + }
220 + }
221 +
222 + if json {
223 + let output = serde_json::json!({
224 + "dns": all_dns_results,
225 + "whois": all_whois_results,
226 + });
227 + println!("{}", serde_json::to_string_pretty(&output)?);
228 + } else if all_dns_results.is_empty() && all_whois_results.is_empty() {
229 + println!("No DNS or WHOIS checks configured for the selected target(s).");
230 + } else {
231 + print!("{}", display::format_dns_results(&all_dns_results, &all_whois_results));
232 + }
233 +
234 + Ok(())
235 + }
236 +
237 + pub(crate) async fn cmd_mesh(
238 + config: &Config,
239 + json: bool,
240 + ) -> Result<()> {
241 + let listen = &config.serve.listen;
242 + let url = format!("http://{listen}/api/mesh");
243 +
244 + let client = reqwest::Client::builder()
245 + .timeout(std::time::Duration::from_secs(5))
246 + .build()?;
247 +
248 + let response = client.get(&url).send().await.map_err(|e| {
249 + PomError::Config(format!("Could not reach local PoM instance at {listen}: {e}"))
250 + })?;
251 +
252 + let data: serde_json::Value = response.json().await?;
253 +
254 + if json {
255 + println!("{}", serde_json::to_string_pretty(&data)?);
256 + return Ok(());
257 + }
258 +
259 + print!("{}", display::format_mesh(&data));
260 +
261 + Ok(())
262 + }
@@ -0,0 +1,145 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::alerts::Alerter;
5 + use pom::config::Config;
6 + use pom::db;
7 + use pom::error::Result;
8 + use pom::peer;
9 +
10 + use super::tasks;
11 +
12 + pub(crate) async fn cmd_serve(
13 + pool: &sqlx::SqlitePool,
14 + config: &Config,
15 + ) -> Result<()> {
16 + let default_interval = config.serve.interval_secs;
17 + let prune_days = config.serve.prune_days;
18 + let listen_addr = config.serve.listen.clone();
19 +
20 + // --- Cancellation token for graceful shutdown ---
21 + let token = tokio_util::sync::CancellationToken::new();
22 +
23 + // --- Instance identity ---
24 + let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?;
25 + let instance_name = config.instance_name();
26 + let instance_info = peer::InstanceInfo {
27 + id: instance_id.clone(),
28 + name: instance_name.clone(),
29 + version: env!("CARGO_PKG_VERSION").to_string(),
30 + targets: config.target_names(),
31 + started_at: chrono::Utc::now().to_rfc3339(),
32 + };
33 +
34 + // --- Alerter ---
35 + let alerter = config.alerts.as_ref().map(|alert_config| {
36 + info!("Alerts enabled (to: {})", alert_config.to);
37 + Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone())
38 + });
39 +
40 + info!("Instance: {instance_name} (id={instance_id})");
41 + info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)");
42 +
43 + // --- Mesh state ---
44 + let mesh = peer::new_mesh_state(instance_info, &config.peers);
45 +
46 + // Load known peer identities from DB
47 + {
48 + let mut state = mesh.write().await;
49 + for (name, peer) in state.peers.iter_mut() {
50 + if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await {
51 + peer.known_id = Some(known_id);
52 + }
53 + }
54 + }
55 +
56 + let mut handles: Vec<JoinHandle<()>> = Vec::new();
57 +
58 + // Spawn all monitoring tasks
59 + handles.extend(tasks::spawn_health_tasks(config, pool, &token, &alerter));
60 + handles.extend(tasks::spawn_tls_tasks(config, pool, &token, &alerter));
61 + handles.extend(tasks::spawn_route_tasks(config, pool, &token, &alerter));
62 + handles.extend(tasks::spawn_dns_tasks(config, pool, &token, &alerter));
63 + handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter));
64 + handles.push(tasks::spawn_prune_task(pool, prune_days, &token));
65 +
66 + // Spawn peer heartbeat tasks
67 + if !config.peers.is_empty() {
68 + let heartbeat_secs = config.serve.peer_heartbeat_secs;
69 + info!("Peer mesh: {} peers, heartbeat every {heartbeat_secs}s", config.peers.len());
70 + let hb_handles = peer::spawn_heartbeat_tasks(
71 + mesh.clone(),
72 + pool.clone(),
73 + heartbeat_secs,
74 + alerter.clone(),
75 + token.clone(),
76 + ).await;
77 + handles.extend(hb_handles);
78 + }
79 +
80 + // Spawn monitoring-offline meta-alert task
81 + if let Some(handle) = tasks::spawn_meta_alert_task(config, pool, default_interval, &token, &alerter) {
82 + handles.push(handle);
83 + }
84 +
85 + // Start HTTP API server
86 + let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone()));
87 + let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?;
88 + info!("API server listening on {listen_addr}");
89 + let api_cancel = token.clone();
90 + handles.push(tokio::spawn(async move {
91 + if let Err(e) = axum::serve(api_listener, api_app)
92 + .with_graceful_shutdown(async move { api_cancel.cancelled().await })
93 + .await
94 + {
95 + tracing::error!("API server error: {e}");
96 + }
97 + }));
98 +
99 + // Wait for shutdown signal or unexpected task exit
100 + let mut sigterm = tokio::signal::unix::signal(
101 + tokio::signal::unix::SignalKind::terminate(),
102 + )?;
103 +
104 + let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60));
105 + watchdog_interval.tick().await; // consume immediate first tick
106 +
107 + loop {
108 + tokio::select! {
109 + _ = tokio::signal::ctrl_c() => {
110 + info!("Received SIGINT, shutting down");
111 + break;
112 + }
113 + _ = sigterm.recv() => {
114 + info!("Received SIGTERM, shutting down");
115 + break;
116 + }
117 + _ = watchdog_interval.tick() => {
118 + for (i, handle) in handles.iter().enumerate() {
119 + if handle.is_finished() {
120 + tracing::error!("Background task {i} exited unexpectedly (possible panic)");
121 + }
122 + }
123 + }
124 + }
125 + }
126 +
127 + // Graceful shutdown: cancel all tasks, then wait with grace period
128 + token.cancel();
129 + info!("Waiting for tasks to finish (5s grace period)...");
130 +
131 + let shutdown = async {
132 + for handle in handles {
133 + if let Err(e) = handle.await {
134 + tracing::error!("Task shutdown error: {e}");
135 + }
136 + }
137 + };
138 +
139 + if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() {
140 + tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly");
141 + }
142 +
143 + info!("Shutdown complete");
144 + Ok(())
145 + }
@@ -0,0 +1,121 @@
1 + use pom::checks::http;
2 + use pom::config::Config;
3 + use pom::db;
4 + use pom::display;
5 + use pom::error::Result;
6 + use pom::types::{LatencyStats, TestStaleness};
7 +
8 + pub(crate) async fn cmd_status(
9 + pool: &sqlx::SqlitePool,
10 + config: &Config,
11 + json: bool,
12 + ) -> Result<()> {
13 + let mut target_statuses = Vec::new();
14 +
15 + for name in config.target_names() {
16 + let target = config.get_target(&name).unwrap();
17 + let health = db::get_latest_health(pool, &name).await?;
18 + let tls_check = db::get_latest_tls_check(pool, &name).await?;
19 + let route_checks = db::get_latest_route_checks(pool, &name).await?;
20 + let dns_checks = db::get_latest_dns_checks(pool, &name).await?;
21 + let whois_check = db::get_latest_whois_check(pool, &name).await?;
22 + let test = db::get_latest_test_run(pool, &name).await?;
23 + let incident = db::get_open_incident(pool, &name).await?;
24 +
25 + // Compute 24h latency stats
26 + let latency_24h = {
27 + let cutoff = (chrono::Utc::now() - chrono::Duration::hours(24)).to_rfc3339();
28 + let times = db::get_response_times(pool, &name, &cutoff).await.unwrap_or_default();
29 + let operational_times: Vec<i64> = times.iter()
30 + .filter(|(_, ms)| *ms > 0)
31 + .map(|(_, ms)| *ms)
32 + .collect();
33 + LatencyStats::from_times(&operational_times)
34 + };
35 +
36 + // Compute test staleness
37 + let staleness: Option<TestStaleness> = if let Some(tests_config) = &target.tests {
38 + let current_version = health.as_ref()
39 + .and_then(|h| h.details.as_ref())
40 + .and_then(|d| d.version.clone());
41 +
42 + let tested_version = if let Some(ref t) = test {
43 + db::get_version_at_time(pool, &name, &t.started_at).await.unwrap_or(None)
44 + } else {
45 + None
46 + };
47 +
48 + Some(http::compute_test_staleness(
49 + current_version.as_deref(),
50 + tested_version.as_deref(),
51 + test.as_ref().map(|t| t.started_at.as_str()),
52 + tests_config.staleness_days,
53 + ))
54 + } else {
55 + None
56 + };
57 +
58 + // Compute test duration trend
59 + let test_durations = if target.tests.is_some() {
60 + db::get_test_durations(pool, &name, 13).await.unwrap_or_default()
61 + } else {
62 + vec![]
63 + };
64 + let duration_drift = if !test_durations.is_empty() {
65 + http::detect_test_duration_drift(&test_durations, 10, 3, 1.5)
66 + } else {
67 + None
68 + };
69 +
70 + if json {
71 + target_statuses.push(serde_json::json!({
72 + "target": name,
73 + "label": target.label,
74 + "health": health,
75 + "tls": tls_check,
76 + "latency_24h": latency_24h,
77 + "dns": dns_checks,
78 + "whois": whois_check,
79 + "last_test": test.map(|t| serde_json::json!({
80 + "passed": t.passed,
81 + "exit_code": t.exit_code,
82 + "duration_secs": t.duration_secs,
83 + "started_at": t.started_at,
84 + "summary": t.summary,
85 + })),
86 + "test_staleness": staleness,
87 + "test_duration_drift": duration_drift,
88 + "incident": incident,
89 + }));
90 + } else {
91 + let route_slice = if route_checks.is_empty() { None } else { Some(route_checks.as_slice()) };
92 + let dns_slice = if dns_checks.is_empty() { None } else { Some(dns_checks.as_slice()) };
93 + print!(
94 + "{}",
95 + display::format_status_target(
96 + &name,
97 + &target.label,
98 + health.as_ref(),
99 + latency_24h.as_ref(),
100 + tls_check.as_ref(),
101 + route_slice,
102 + dns_slice,
103 + whois_check.as_ref(),
104 + test.as_ref(),
105 + staleness.as_ref(),
106 + incident.as_ref(),
107 + )
108 + );
109 + if !test_durations.is_empty() {
110 + let recent_5: Vec<(String, i64)> = test_durations.iter().take(5).cloned().collect();
111 + print!("{}", display::format_test_duration_trend(&recent_5, duration_drift.as_deref()));
112 + }
113 + }
114 + }
115 +
116 + if json {
117 + println!("{}", serde_json::to_string_pretty(&target_statuses)?);
118 + }
119 +
120 + Ok(())
121 + }
@@ -0,0 +1,85 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::alerts::Alerter;
5 + use pom::checks::dns;
6 + use pom::config::Config;
7 + use pom::db;
8 +
9 + pub(crate) fn spawn_dns_tasks(
10 + config: &Config,
11 + pool: &sqlx::SqlitePool,
12 + cancel: &tokio_util::sync::CancellationToken,
13 + alerter: &Option<Alerter>,
14 + ) -> Vec<JoinHandle<()>> {
15 + let dns_interval_secs = config.serve.dns_check_interval_secs;
16 + let mut handles = Vec::new();
17 +
18 + for name in config.target_names() {
19 + let target_config = config.get_target(&name).unwrap().clone();
20 + if target_config.dns.is_empty() {
21 + continue;
22 + }
23 + let dns_records = target_config.dns.clone();
24 + let label = target_config.label.clone();
25 + let pool = pool.clone();
26 + let alerter = alerter.clone();
27 + let cancel = cancel.clone();
28 + let n = dns_records.len();
29 +
30 + info!("{name}: DNS check every {dns_interval_secs}s ({n} records)");
31 +
32 + handles.push(tokio::spawn(async move {
33 + let mut interval = tokio::time::interval(
34 + std::time::Duration::from_secs(dns_interval_secs),
35 + );
36 + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
37 + let mut prev_mismatched: std::collections::HashSet<(String, String)> = std::collections::HashSet::new();
38 +
39 + interval.tick().await; // consume immediate first tick
40 + loop {
41 + tokio::select! {
42 + _ = cancel.cancelled() => break,
43 + _ = interval.tick() => {}
44 + }
45 + let results = dns::check_dns(&name, &dns_records).await;
46 +
47 + for result in &results {
48 + if let Err(e) = db::insert_dns_check(&pool, result).await {
49 + tracing::error!("{}: failed to store DNS check for {} {}: {e}", name, result.name, result.record_type);
50 + }
51 + }
52 +
53 + let current_mismatched: std::collections::HashSet<(String, String)> = results
54 + .iter()
55 + .filter(|r| !r.matches)
56 + .map(|r| (r.name.clone(), r.record_type.clone()))
57 + .collect();
58 +
59 + let ok_count = results.iter().filter(|r| r.matches).count();
60 + info!("{name}: DNS {ok_count}/{n} match");
61 +
62 + if let Some(ref alerter) = alerter {
63 + // New mismatches
64 + let new_mismatches: Vec<&pom::types::DnsCheckResult> = results
65 + .iter()
66 + .filter(|r| !r.matches && !prev_mismatched.contains(&(r.name.clone(), r.record_type.clone())))
67 + .collect();
68 + if !new_mismatches.is_empty() {
69 + let owned: Vec<pom::types::DnsCheckResult> = new_mismatches.into_iter().cloned().collect();
70 + alerter.send_dns_mismatch_alert(&name, &label, &owned).await;
71 + }
72 +
73 + // All recovered
74 + if !prev_mismatched.is_empty() && current_mismatched.is_empty() {
75 + alerter.send_dns_recovery_alert(&name, &label).await;
76 + }
77 + }
78 +
79 + prev_mismatched = current_mismatched;
80 + }
81 + }));
82 + }
83 +
84 + handles
85 + }
@@ -0,0 +1,141 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::alerts::Alerter;
5 + use pom::checks::http;
6 + use pom::config::Config;
7 + use pom::db;
8 + use pom::types::{HealthStatus, LatencyStats};
9 +
10 + use super::super::incident::{incident_action, IncidentAction};
11 +
12 + pub(crate) fn spawn_health_tasks(
13 + config: &Config,
14 + pool: &sqlx::SqlitePool,
15 + cancel: &tokio_util::sync::CancellationToken,
16 + alerter: &Option<Alerter>,
17 + ) -> Vec<JoinHandle<()>> {
18 + let default_interval = config.serve.interval_secs;
19 + let mut handles = Vec::new();
20 +
21 + for name in config.target_names() {
22 + let target_config = config.get_target(&name).unwrap().clone();
23 + if let Some(health_config) = target_config.health {
24 + let interval_secs = health_config.interval_secs.unwrap_or(default_interval);
25 + let pool = pool.clone();
26 + let name = name.clone();
27 + let label = target_config.label.clone();
28 + let alerter = alerter.clone();
29 + let cancel = cancel.clone();
30 + let trending_config = health_config.trending.clone();
31 +
32 + info!("{name}: health check every {interval_secs}s");
33 +
34 + handles.push(tokio::spawn(async move {
35 + let mut interval = tokio::time::interval(
36 + std::time::Duration::from_secs(interval_secs),
37 + );
38 + let expect = health_config.expect.as_ref();
39 + let mut in_drift = false;
40 + interval.tick().await; // consume immediate first tick
41 + loop {
42 + tokio::select! {
43 + _ = cancel.cancelled() => break,
44 + _ = interval.tick() => {}
45 + }
46 + let previous = db::get_latest_health(&pool, &name).await.ok().flatten();
47 + let snapshot = http::check_health(&name, &health_config, expect).await;
48 + info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms);
49 + if let Err(e) = db::insert_health_check(&pool, &snapshot).await {
50 + tracing::error!("{name}: failed to store health check: {e}");
51 + }
52 +
53 + // Fire alerts on status transitions
54 + if let Some(ref alerter) = alerter
55 + && let Some(ref prev) = previous
56 + && prev.status != snapshot.status
57 + {
58 + let from = prev.status.to_string();
59 + let to = snapshot.status.to_string();
60 + if snapshot.status == HealthStatus::Operational {
61 + alerter.send_health_recovery(&name, &label, &from).await;
62 + } else {
63 + alerter.send_health_alert(
64 + &name,
65 + &label,
66 + &from,
67 + &to,
68 + snapshot.error.as_deref(),
69 + ).await;
70 + }
71 + }
72 +
73 + // Track incidents on status transitions
74 + if let Some(ref prev) = previous {
75 + match incident_action(prev.status, snapshot.status) {
76 + IncidentAction::None => {}
77 + IncidentAction::Open => {
78 + if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
79 + tracing::error!("{name}: failed to open incident: {e}");
80 + }
81 + }
82 + IncidentAction::Close => {
83 + if let Err(e) = db::close_open_incidents(&pool, &name).await {
84 + tracing::error!("{name}: failed to close incidents: {e}");
85 + }
86 + }
87 + IncidentAction::CloseAndOpen => {
88 + if let Err(e) = db::close_open_incidents(&pool, &name).await {
89 + tracing::error!("{name}: failed to close incidents: {e}");
90 + }
91 + if let Err(e) = db::insert_incident(&pool, &name, &prev.status.to_string(), &snapshot.status.to_string()).await {
92 + tracing::error!("{name}: failed to open incident: {e}");
93 + }
94 + }
95 + }
96 + }
97 +
98 + // Latency drift detection
99 + if let Some(ref trending) = trending_config
100 + && snapshot.status == HealthStatus::Operational
101 + {
102 + let baseline_cutoff = (chrono::Utc::now()
103 + - chrono::Duration::hours(trending.baseline_window_hours as i64))
104 + .to_rfc3339();
105 + let baseline_data = db::get_response_times(&pool, &name, &baseline_cutoff)
106 + .await
107 + .unwrap_or_default();
108 + let operational_times: Vec<i64> = baseline_data.iter()
109 + .filter(|(_, ms)| *ms > 0)
110 + .map(|(_, ms)| *ms)
111 + .collect();
112 + let baseline = LatencyStats::from_times(&operational_times);
113 + let recent = db::get_recent_response_times(&pool, &name, 3)
114 + .await
115 + .unwrap_or_default();
116 +
117 + if let Some(ref bl) = baseline {
118 + if let Some(msg) = http::detect_latency_drift(&recent, bl, trending.spike_threshold) {
119 + if !in_drift {
120 + info!("{name}: {msg}");
121 + if let Some(ref alerter) = alerter {
122 + alerter.send_latency_drift_alert(&name, &label, &msg).await;
123 + }
124 + in_drift = true;
125 + }
126 + } else if in_drift {
127 + info!("{name}: latency drift recovered");
128 + if let Some(ref alerter) = alerter {
129 + alerter.send_latency_recovery(&name, &label).await;
130 + }
131 + in_drift = false;
132 + }
133 + }
134 + }
135 + }
136 + }));
137 + }
138 + }
139 +
140 + handles
141 + }
@@ -0,0 +1,64 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::alerts::Alerter;
5 + use pom::config::Config;
6 + use pom::db;
7 + use pom::types::HealthStatus;
8 +
9 + pub(crate) fn spawn_meta_alert_task(
10 + config: &Config,
11 + pool: &sqlx::SqlitePool,
12 + default_interval: u64,
13 + cancel: &tokio_util::sync::CancellationToken,
14 + alerter: &Option<Alerter>,
15 + ) -> Option<JoinHandle<()>> {
16 + let health_target_names: Vec<String> = config.target_names()
17 + .into_iter()
18 + .filter(|n| config.get_target(n).is_some_and(|t| t.health.is_some()))
19 + .collect();
20 +
21 + if health_target_names.len() < 2 {
22 + return None;
23 + }
24 + let alerter = alerter.clone()?;
25 +
26 + let pool = pool.clone();
27 + let cancel = cancel.clone();
28 + let meta_interval_secs = default_interval * 2;
29 +
30 + info!("Meta-alert: monitoring-offline check every {meta_interval_secs}s ({} targets)", health_target_names.len());
31 +
32 + Some(tokio::spawn(async move {
33 + let mut interval = tokio::time::interval(
34 + std::time::Duration::from_secs(meta_interval_secs),
35 + );
36 + interval.tick().await; // consume immediate first tick
37 + let mut was_all_down = false;
38 +
39 + loop {
40 + tokio::select! {
41 + _ = cancel.cancelled() => break,
42 + _ = interval.tick() => {}
43 + }
44 +
45 + let mut all_down = true;
46 + for name in &health_target_names {
47 + if let Ok(Some(snap)) = db::get_latest_health(&pool, name).await
48 + && (snap.status == HealthStatus::Operational
49 + || snap.status == HealthStatus::Degraded)
50 + {
51 + all_down = false;
52 + break;
53 + }
54 + }
55 +
56 + if all_down && !was_all_down {
57 + alerter.send_monitoring_offline_alert(health_target_names.len()).await;
58 + } else if !all_down && was_all_down {
59 + alerter.send_monitoring_recovery().await;
60 + }
61 + was_all_down = all_down;
62 + }
63 + }))
64 + }
@@ -0,0 +1,15 @@
1 + mod dns;
2 + mod health;
3 + mod meta_alert;
4 + mod prune;
5 + mod routes;
6 + mod tls;
7 + mod whois;
8 +
9 + pub(crate) use dns::spawn_dns_tasks;
10 + pub(crate) use health::spawn_health_tasks;
11 + pub(crate) use meta_alert::spawn_meta_alert_task;
12 + pub(crate) use prune::spawn_prune_task;
13 + pub(crate) use routes::spawn_route_tasks;
14 + pub(crate) use tls::spawn_tls_tasks;
15 + pub(crate) use whois::spawn_whois_tasks;
@@ -0,0 +1,30 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::db;
5 +
6 + pub(crate) fn spawn_prune_task(
7 + pool: &sqlx::SqlitePool,
8 + prune_days: i64,
9 + cancel: &tokio_util::sync::CancellationToken,
10 + ) -> JoinHandle<()> {
11 + let pool = pool.clone();
12 + let cancel = cancel.clone();
13 +
14 + tokio::spawn(async move {
15 + let mut interval = tokio::time::interval(
16 + std::time::Duration::from_secs(86400),
17 + );
18 + interval.tick().await; // consume immediate first tick
19 + loop {
20 + tokio::select! {
21 + _ = cancel.cancelled() => break,
22 + _ = interval.tick() => {}
23 + }
24 + match db::prune_old_records(&pool, prune_days).await {
25 + Ok(r) => info!("Pruned {} health checks, {} test runs, {} test details, {} peer heartbeats, {} alerts, {} TLS checks, {} incidents, {} route checks, {} DNS checks, {} WHOIS checks", r.health, r.tests, r.test_details, r.heartbeats, r.alerts, r.tls, r.incidents, r.routes, r.dns, r.whois),
26 + Err(e) => tracing::error!("Prune failed: {e}"),
27 + }
28 + }
29 + })
30 + }
@@ -0,0 +1,95 @@
1 + use tokio::task::JoinHandle;
2 + use tracing::info;
3 +
4 + use pom::alerts::Alerter;
5 + use pom::checks::routes;
6 + use pom::config::Config;
7 + use pom::db;
8 +
9 + pub(crate) fn spawn_route_tasks(
10 + config: &Config,
11 + pool: &sqlx::SqlitePool,
12 + cancel: &tokio_util::sync::CancellationToken,
13 + alerter: &Option<Alerter>,
14 + ) -> Vec<JoinHandle<()>> {
15 + let route_interval = config.serve.route_check_interval_secs;
16 + let mut handles = Vec::new();
17 +
18 + for name in config.target_names() {
19 + let target_config = config.get_target(&name).unwrap().clone();
20 + if target_config.expected_routes.is_empty() {
21 + continue;
22 + }
23 + let Some(ref health_config) = target_config.health else { continue };
24 + let Some(base_url) = routes::base_url_from_health_url(&health_config.url) else { continue };
25 + let route_paths = target_config.expected_routes.clone();
26 + let timeout = std::time::Duration::from_secs(health_config.timeout_secs);
27 + let label = target_config.label.clone();
28 + let pool = pool.clone();
29 + let alerter = alerter.clone();
30 + let n = route_paths.len();
31 + let cancel = cancel.clone();
32 +
33 + info!("{name}: route checks every {route_interval}s ({n} routes)");
34 +
35 + handles.push(tokio::spawn(async move {
36 + let mut interval = tokio::time::interval(
37 + std::time::Duration::from_secs(route_interval),
38 + );
39 + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
40 + let client = reqwest::Client::builder()
41 + .redirect(reqwest::redirect::Policy::none())
42 + .build()
43 + .unwrap_or_default();
44 + let mut prev_failed: std::collections::HashSet<String> = std::collections::HashSet::new();
45 +
46 + interval.tick().await; // consume immediate first tick
47 + loop {
48 + tokio::select! {
49 + _ = cancel.cancelled() => break,
50 + _ = interval.tick() => {}
51 + }
52 + let results = routes::check_routes(&client, &name, &base_url, &route_paths, timeout).await;
53 +
54 + for result in &results {
55 + if let Err(e) = db::insert_route_check(&pool, result).await {
56 + tracing::error!("{}: failed to store route check for {}: {e}", name, result.path);
57 + }
58 + }
59 +
60 + let current_failed: std::collections::HashSet<String> = results
61 + .iter()
62 + .filter(|r| !r.ok)
63 + .map(|r| r.path.clone())
64 + .collect();
65 +
66 + let ok_count = results.iter().filter(|r| r.ok).count();
67 + info!("{name}: routes {ok_count}/{n} OK");
68 +
69 + if let Some(ref alerter) = alerter {
70 + // New failures
71 + let new_failures: Vec<String> = current_failed
72 + .difference(&prev_failed)
73 + .cloned()
74 + .collect();
75 + if !new_failures.is_empty() {
76 + alerter.send_route_failure_alert(&name, &label, &new_failures).await;
77 + }
78 +
79 + // Recoveries
80 + let recoveries: Vec<String> = prev_failed
81 + .difference(&current_failed)
82 + .cloned()
83 + .collect();
84 + if !recoveries.is_empty() {
85 + alerter.send_route_recovery_alert(&name, &label, &recoveries).await;
86 + }
87 + }
88 +
89 + prev_failed = current_failed;
90 + }
91 + }));
92 + }
93 +
94 + handles
95 + }
M src/config.rs +19 -9
M src/dashboard.rs +34 -1
M src/db.rs +100 -2
M src/display.rs +38 -6
M src/types.rs +12 -1