Skip to main content

max / makenotwork

pom: scan-pipeline check + alerting Polls the makenotwork /admin/uploads/health.json endpoint on a per-target interval (default 300s), applies thresholds from the scan-pipeline-audit doc § 6, fires alerts on status transitions. Thresholds: - Per-layer error rate > 10% over 1h → degraded - Layer silent (no clean response in >24h, zero recent traffic) → down - Queue depth > 50 pending → degraded - Stuck-scan count > 5 → degraded - Held-for-review total > 100 → degraded Down beats degraded for the overall verdict. Multiple issues all reported in the alert body. New types: ScanPipelineConfig (target), ScanPipelineCheckResult, ScanLayerSnapshot. New AlertCategory variants ScanPipelineDegraded and ScanPipelineRecovery, with wam_ticket + email + cooldown handling matching the existing backup-alert pattern. 10 unit tests cover empty pipeline, every threshold individually, boundary error rate, down-beats-degraded precedence, and the "recently clean but currently idle" Operational case. This converts the dashboard from "check it manually" into the audit-doc-mandated alerting that would have caught the original MalwareBazaar regression in hours.
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-24 22:24 UTC
Commit: 218c512272b59387f73c8cd9106954f169675474
Parent: 0962909
8 files changed, +465 insertions, -0 deletions
@@ -642,6 +642,61 @@ impl Alerter {
642 642 self.record_alert(&alert_key, AlertCategory::BackupRecovery, None, Some("ok"), None).await;
643 643 }
644 644
645 + /// Fire when the scan pipeline transitions from operational → degraded or
646 + /// unreachable. Includes the audit-doc threshold issues that fired.
647 + #[instrument(skip_all)]
648 + pub async fn send_scan_pipeline_alert(
649 + &self,
650 + target: &str,
651 + label: &str,
652 + status: &str,
653 + issues: &[String],
654 + ) {
655 + let alert_key = format!("scan_pipeline:{target}");
656 + if self.is_within_cooldown(&alert_key).await {
657 + info!("alert cooldown active for {alert_key}, skipping");
658 + return;
659 + }
660 +
661 + let subject = format!("[PoM] {label}: scan pipeline {status}");
662 + let body = format!(
663 + "Target: {label} ({target})\n\
664 + Status: {status}\n\
665 + Issues:\n{}\n\
666 + Instance: {}\n\
667 + Time: {}\n\n\
668 + Dashboard: <https://{}/admin/uploads>\n\n\
669 + - PoM",
670 + issues.iter().map(|i| format!(" - {i}")).collect::<Vec<_>>().join("\n"),
671 + self.instance_name,
672 + chrono::Utc::now().to_rfc3339(),
673 + target,
674 + );
675 +
676 + let priority = if status == "unreachable" { "high" } else { "medium" };
677 + self.wam_ticket(&subject, &body, priority, "pom-scan-pipeline", Some(target)).await;
678 + self.record_alert(&alert_key, AlertCategory::ScanPipelineDegraded, None, Some(status), None).await;
679 + }
680 +
681 + /// Fire on recovery from a degraded / unreachable scan-pipeline state.
682 + #[instrument(skip_all)]
683 + pub async fn send_scan_pipeline_recovery(&self, target: &str, label: &str) {
684 + let alert_key = format!("scan_pipeline:{target}");
685 + let subject = format!("[PoM] {label}: scan pipeline recovered");
686 + let body = format!(
687 + "Target: {label} ({target})\n\
688 + Scan pipeline is operational.\n\
689 + Instance: {}\n\
690 + Time: {}\n\n\
691 + - PoM",
692 + self.instance_name,
693 + chrono::Utc::now().to_rfc3339(),
694 + );
695 +
696 + self.send_email(&subject, &body).await;
697 + self.record_alert(&alert_key, AlertCategory::ScanPipelineRecovery, None, Some("operational"), None).await;
698 + }
699 +
645 700 /// All monitored targets are unreachable — likely a network issue with PoM itself.
646 701 #[instrument(skip_all)]
647 702 pub async fn send_monitoring_offline_alert(&self, target_count: usize) {
@@ -4,6 +4,7 @@ pub mod dns;
4 4 pub mod http;
5 5 pub mod parse;
6 6 pub mod routes;
7 + pub mod scan_pipeline;
7 8 pub mod ssh;
8 9 pub mod ssh_banner;
9 10 pub mod tls;
@@ -0,0 +1,266 @@
1 + //! Scan-pipeline health check — polls `<host>/admin/uploads/health.json` on
2 + //! a target makenotwork instance and applies the audit-doc thresholds.
3 + //!
4 + //! Thresholds (audit doc § 6):
5 + //! - Per-layer error rate > 10% over 1h → degraded
6 + //! - Per-layer success count == 0 over 24h → down (layer fully unavailable)
7 + //! (we use the cheaper proxy "no clean response in 24h" via
8 + //! `last_clean_secs_ago` from the upstream payload).
9 + //! - Queue depth > 50 pending → degraded
10 + //! - Stuck-scan count > 5 → degraded (workers falling behind)
11 + //! - Held-for-review total > 100 → degraded (review backlog growing)
12 + //!
13 + //! Anything that fires "down" elevates the overall status to Unreachable;
14 + //! anything "degraded" elevates to Degraded. Multiple issues all reported
15 + //! in the `issues` vec.
16 +
17 + use std::time::Duration;
18 +
19 + use tracing::instrument;
20 +
21 + use crate::types::{ScanLayerSnapshot, ScanPipelineCheckResult};
22 +
23 + const QUEUE_PENDING_THRESHOLD: i64 = 50;
24 + const STUCK_SCAN_THRESHOLD: i64 = 5;
25 + const HELD_BACKLOG_THRESHOLD: i64 = 100;
26 + const ERROR_RATE_PCT_THRESHOLD: i32 = 10;
27 + const LAYER_DOWN_AGE_SECS: i64 = 24 * 3600;
28 +
29 + #[derive(Debug, serde::Deserialize)]
30 + struct UpstreamLayer {
31 + layer: String,
32 + total_1h: i64,
33 + success_1h: i64,
34 + error_1h: i64,
35 + last_clean_secs_ago: Option<i64>,
36 + }
37 +
38 + #[derive(Debug, serde::Deserialize)]
39 + struct UpstreamHealth {
40 + queue_pending: i64,
41 + queue_running: i64,
42 + queue_stuck: i64,
43 + held_total: i64,
44 + layers: Vec<UpstreamLayer>,
45 + }
46 +
47 + #[instrument(skip_all)]
48 + pub async fn check_scan_pipeline(
49 + target_name: &str,
50 + base_url: &str,
51 + timeout_secs: u64,
52 + ) -> ScanPipelineCheckResult {
53 + let checked_at = chrono::Utc::now().to_rfc3339();
54 + let url = format!("{}/admin/uploads/health.json", base_url.trim_end_matches('/'));
55 +
56 + let client = match reqwest::Client::builder()
57 + .timeout(Duration::from_secs(timeout_secs))
58 + .build()
59 + {
60 + Ok(c) => c,
61 + Err(e) => return unreachable(target_name, &checked_at, format!("client build: {e}")),
62 + };
63 +
64 + let response = match client.get(&url).send().await {
65 + Ok(r) => r,
66 + Err(e) => return unreachable(target_name, &checked_at, format!("request: {e}")),
67 + };
68 +
69 + let status = response.status();
70 + if !status.is_success() {
71 + return unreachable(target_name, &checked_at, format!("HTTP {}", status.as_u16()));
72 + }
73 +
74 + let body: UpstreamHealth = match response.json().await {
75 + Ok(b) => b,
76 + Err(e) => return unreachable(target_name, &checked_at, format!("parse: {e}")),
77 + };
78 +
79 + classify(target_name, &checked_at, body)
80 + }
81 +
82 + fn unreachable(target: &str, checked_at: &str, msg: String) -> ScanPipelineCheckResult {
83 + ScanPipelineCheckResult {
84 + target: target.to_string(),
85 + status: "unreachable".to_string(),
86 + queue_pending: 0,
87 + queue_running: 0,
88 + queue_stuck: 0,
89 + held_total: 0,
90 + layers: Vec::new(),
91 + issues: vec![format!("scan-pipeline health endpoint: {msg}")],
92 + checked_at: checked_at.to_string(),
93 + error: Some(msg),
94 + }
95 + }
96 +
97 + fn classify(target: &str, checked_at: &str, body: UpstreamHealth) -> ScanPipelineCheckResult {
98 + let mut issues: Vec<String> = Vec::new();
99 + let mut any_down = false;
100 + let mut any_degraded = false;
101 +
102 + if body.queue_pending > QUEUE_PENDING_THRESHOLD {
103 + issues.push(format!("queue depth {} > {}", body.queue_pending, QUEUE_PENDING_THRESHOLD));
104 + any_degraded = true;
105 + }
106 + if body.queue_stuck > STUCK_SCAN_THRESHOLD {
107 + issues.push(format!("stuck scans {} > {}", body.queue_stuck, STUCK_SCAN_THRESHOLD));
108 + any_degraded = true;
109 + }
110 + if body.held_total > HELD_BACKLOG_THRESHOLD {
111 + issues.push(format!("held backlog {} > {}", body.held_total, HELD_BACKLOG_THRESHOLD));
112 + any_degraded = true;
113 + }
114 +
115 + let layers: Vec<ScanLayerSnapshot> = body.layers.into_iter().map(|l| {
116 + let mut layer_status = "operational";
117 + let error_rate_pct = if l.total_1h > 0 {
118 + (100 * l.error_1h / l.total_1h) as i32
119 + } else { 0 };
120 +
121 + let truly_idle = l.total_1h == 0
122 + && l.last_clean_secs_ago.is_none_or(|s| s > LAYER_DOWN_AGE_SECS);
123 + if truly_idle {
124 + layer_status = "down";
125 + any_down = true;
126 + issues.push(format!("layer '{}': no clean response in >24h", l.layer));
127 + } else if error_rate_pct > ERROR_RATE_PCT_THRESHOLD {
128 + layer_status = "degraded";
129 + any_degraded = true;
130 + issues.push(format!("layer '{}': error rate {}% > {}%", l.layer, error_rate_pct, ERROR_RATE_PCT_THRESHOLD));
131 + }
132 +
133 + ScanLayerSnapshot {
134 + layer: l.layer,
135 + total_1h: l.total_1h,
136 + success_1h: l.success_1h,
137 + error_1h: l.error_1h,
138 + last_clean_secs_ago: l.last_clean_secs_ago,
139 + status: layer_status.to_string(),
140 + }
141 + }).collect();
142 +
143 + let overall = if any_down {
144 + "unreachable"
145 + } else if any_degraded {
146 + "degraded"
147 + } else {
148 + "operational"
149 + };
150 +
151 + ScanPipelineCheckResult {
152 + target: target.to_string(),
153 + status: overall.to_string(),
154 + queue_pending: body.queue_pending,
155 + queue_running: body.queue_running,
156 + queue_stuck: body.queue_stuck,
157 + held_total: body.held_total,
158 + layers,
159 + issues,
160 + checked_at: checked_at.to_string(),
161 + error: None,
162 + }
163 + }
164 +
165 + #[cfg(test)]
166 + mod tests {
167 + use super::*;
168 +
169 + fn body(layers: Vec<(&str, i64, i64, i64, Option<i64>)>) -> UpstreamHealth {
170 + UpstreamHealth {
171 + queue_pending: 0,
172 + queue_running: 0,
173 + queue_stuck: 0,
174 + held_total: 0,
175 + layers: layers.into_iter().map(|(name, total, success, errors, last)| UpstreamLayer {
176 + layer: name.to_string(),
177 + total_1h: total,
178 + success_1h: success,
179 + error_1h: errors,
180 + last_clean_secs_ago: last,
181 + }).collect(),
182 + }
183 + }
184 +
185 + #[test]
186 + fn empty_pipeline_is_operational() {
187 + let b = UpstreamHealth { queue_pending: 0, queue_running: 0, queue_stuck: 0, held_total: 0, layers: vec![] };
188 + let r = classify("t", "now", b);
189 + assert_eq!(r.status, "operational");
190 + assert!(r.issues.is_empty());
191 + }
192 +
193 + #[test]
194 + fn queue_depth_threshold_degrades() {
195 + let mut b = body(vec![]);
196 + b.queue_pending = 99;
197 + let r = classify("t", "now", b);
198 + assert_eq!(r.status, "degraded");
199 + assert!(r.issues.iter().any(|i| i.contains("queue depth")));
200 + }
201 +
202 + #[test]
203 + fn stuck_scans_threshold_degrades() {
204 + let mut b = body(vec![]);
205 + b.queue_stuck = 6;
206 + let r = classify("t", "now", b);
207 + assert_eq!(r.status, "degraded");
208 + }
209 +
210 + #[test]
211 + fn held_backlog_threshold_degrades() {
212 + let mut b = body(vec![]);
213 + b.held_total = 150;
214 + let r = classify("t", "now", b);
215 + assert_eq!(r.status, "degraded");
216 + }
217 +
218 + #[test]
219 + fn high_layer_error_rate_degrades() {
220 + // 20 of 100 are errors = 20% > 10% threshold.
221 + let b = body(vec![("malwarebazaar", 100, 80, 20, Some(60))]);
222 + let r = classify("t", "now", b);
223 + assert_eq!(r.status, "degraded");
224 + assert!(r.issues.iter().any(|i| i.contains("error rate")));
225 + }
226 +
227 + #[test]
228 + fn layer_silent_for_24h_is_down() {
229 + let b = body(vec![("clamav", 0, 0, 0, None)]);
230 + let r = classify("t", "now", b);
231 + assert_eq!(r.status, "unreachable");
232 + assert!(r.issues.iter().any(|i| i.contains("no clean response in >24h")));
233 + }
234 +
235 + #[test]
236 + fn layer_silent_with_stale_clean_is_down() {
237 + // 2 days ago = 172_800 secs ago, beyond 24h threshold.
238 + let b = body(vec![("urlhaus", 0, 0, 0, Some(172_800))]);
239 + let r = classify("t", "now", b);
240 + assert_eq!(r.status, "unreachable");
241 + }
242 +
243 + #[test]
244 + fn layer_recently_clean_is_operational_even_with_no_recent_volume() {
245 + // 0 scans in last hour but a clean response 30 min ago → operational.
246 + let b = body(vec![("yara", 0, 0, 0, Some(1800))]);
247 + let r = classify("t", "now", b);
248 + assert_eq!(r.status, "operational");
249 + }
250 +
251 + #[test]
252 + fn down_layer_beats_degraded_signals() {
253 + let mut b = body(vec![("malwarebazaar", 0, 0, 0, None)]);
254 + b.queue_pending = 99;
255 + let r = classify("t", "now", b);
256 + assert_eq!(r.status, "unreachable", "down beats degraded");
257 + }
258 +
259 + #[test]
260 + fn boundary_error_rate_does_not_trigger() {
261 + // exactly 10% — at the boundary, not over.
262 + let b = body(vec![("malwarebazaar", 100, 90, 10, Some(60))]);
263 + let r = classify("t", "now", b);
264 + assert_eq!(r.status, "operational");
265 + }
266 + }
@@ -63,6 +63,7 @@ pub(crate) async fn cmd_serve(
63 63 handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter));
64 64 handles.extend(tasks::spawn_cors_tasks(config, pool, &token, &alerter));
65 65 handles.extend(tasks::spawn_backup_tasks(config, pool, &token, &alerter));
66 + handles.extend(tasks::spawn_scan_pipeline_tasks(config, &token, &alerter));
66 67 handles.push(tasks::spawn_prune_task(pool, prune_days, &token));
67 68
68 69 // Spawn peer heartbeat tasks
@@ -5,6 +5,7 @@ mod health;
5 5 mod meta_alert;
6 6 mod prune;
7 7 mod routes;
8 + mod scan_pipeline;
8 9 mod tls;
9 10 mod whois;
10 11
@@ -15,5 +16,6 @@ pub(crate) use health::spawn_health_tasks;
15 16 pub(crate) use meta_alert::spawn_meta_alert_task;
16 17 pub(crate) use prune::spawn_prune_task;
17 18 pub(crate) use routes::spawn_route_tasks;
19 + pub(crate) use scan_pipeline::spawn_scan_pipeline_tasks;
18 20 pub(crate) use tls::spawn_tls_tasks;
19 21 pub(crate) use whois::spawn_whois_tasks;
@@ -0,0 +1,85 @@
1 + //! Scan-pipeline health-check task. Polls the makenotwork health endpoint
2 + //! on a per-target interval, applies thresholds (audit doc § 6), and fires
3 + //! alerts on operational → degraded / unreachable transitions.
4 +
5 + use tokio::task::JoinHandle;
6 + use tracing::{info, warn};
7 +
8 + use pom::alerts::Alerter;
9 + use pom::checks::scan_pipeline;
10 + use pom::config::Config;
11 +
12 + pub(crate) fn spawn_scan_pipeline_tasks(
13 + config: &Config,
14 + cancel: &tokio_util::sync::CancellationToken,
15 + alerter: &Option<Alerter>,
16 + ) -> Vec<JoinHandle<()>> {
17 + let mut handles = Vec::new();
18 +
19 + for name in config.target_names() {
20 + let target_config = config.get_target(&name).unwrap().clone();
21 + let Some(sp_config) = target_config.scan_pipeline else { continue };
22 +
23 + let name = name.clone();
24 + let label = target_config.label.clone();
25 + let alerter = alerter.clone();
26 + let cancel = cancel.clone();
27 +
28 + info!(
29 + "{name}: scan-pipeline check every {}s against {}",
30 + sp_config.interval_secs, sp_config.base_url,
31 + );
32 +
33 + handles.push(tokio::spawn(async move {
34 + let mut interval = tokio::time::interval(
35 + std::time::Duration::from_secs(sp_config.interval_secs),
36 + );
37 + interval.tick().await; // consume immediate first tick
38 +
39 + // Track previous status so we only fire alerts on transitions.
40 + // None means we haven't observed yet.
41 + let mut previous_status: Option<String> = None;
42 +
43 + loop {
44 + tokio::select! {
45 + _ = cancel.cancelled() => break,
46 + _ = interval.tick() => {}
47 + }
48 +
49 + let result = scan_pipeline::check_scan_pipeline(
50 + &name, &sp_config.base_url, sp_config.timeout_secs,
51 + ).await;
52 +
53 + if result.issues.is_empty() && result.error.is_none() {
54 + info!("{name}: scan pipeline operational (queue p={}/r={}, held={})",
55 + result.queue_pending, result.queue_running, result.held_total);
56 + } else {
57 + warn!(
58 + target = %name,
59 + status = %result.status,
60 + issues = ?result.issues,
61 + "scan pipeline non-operational"
62 + );
63 + }
64 +
65 + // Fire alerts on status transitions only.
66 + if let Some(ref alerter) = alerter {
67 + let prev_ok = previous_status.as_deref().is_none_or(|s| s == "operational");
68 + let now_ok = result.status == "operational";
69 +
70 + if prev_ok && !now_ok {
71 + alerter.send_scan_pipeline_alert(
72 + &name, &label, &result.status, &result.issues,
73 + ).await;
74 + } else if !prev_ok && now_ok {
75 + alerter.send_scan_pipeline_recovery(&name, &label).await;
76 + }
77 + }
78 +
79 + previous_status = Some(result.status);
80 + }
81 + }));
82 + }
83 +
84 + handles
85 + }
@@ -190,9 +190,27 @@ pub struct TargetConfig {
190 190 pub backups: Option<BackupConfig>,
191 191 /// SSH banner check (TCP connect + verify "SSH-" banner). `None` disables.
192 192 pub ssh_banner: Option<SshBannerConfig>,
193 + /// Scan-pipeline health check against `<base_url>/admin/uploads/health.json`.
194 + /// `None` disables.
195 + pub scan_pipeline: Option<ScanPipelineConfig>,
193 196 }
194 197
195 198 #[derive(Debug, Clone, Deserialize)]
199 + pub struct ScanPipelineConfig {
200 + /// Base URL of the makenotwork instance (e.g. "https://makenot.work").
201 + pub base_url: String,
202 + /// Check interval. Defaults to 300s (5 min).
203 + #[serde(default = "default_scan_pipeline_interval")]
204 + pub interval_secs: u64,
205 + /// HTTP request timeout. Defaults to 10s.
206 + #[serde(default = "default_scan_pipeline_timeout")]
207 + pub timeout_secs: u64,
208 + }
209 +
210 + fn default_scan_pipeline_interval() -> u64 { 300 }
211 + fn default_scan_pipeline_timeout() -> u64 { 10 }
212 +
213 + #[derive(Debug, Clone, Deserialize)]
196 214 pub struct DnsRecord {
197 215 /// Hostname to resolve (e.g. "makenot.work").
198 216 pub name: String,
@@ -29,6 +29,8 @@ pub enum AlertCategory {
29 29 CorsRecovery,
30 30 BackupStale,
31 31 BackupRecovery,
32 + ScanPipelineDegraded,
33 + ScanPipelineRecovery,
32 34 MonitoringOffline,
33 35 MonitoringRecovery,
34 36 }
@@ -56,6 +58,8 @@ impl fmt::Display for AlertCategory {
56 58 Self::CorsRecovery => write!(f, "cors_recovery"),
57 59 Self::BackupStale => write!(f, "backup_stale"),
58 60 Self::BackupRecovery => write!(f, "backup_recovery"),
61 + Self::ScanPipelineDegraded => write!(f, "scan_pipeline_degraded"),
62 + Self::ScanPipelineRecovery => write!(f, "scan_pipeline_recovery"),
59 63 Self::MonitoringOffline => write!(f, "monitoring_offline"),
60 64 Self::MonitoringRecovery => write!(f, "monitoring_recovery"),
61 65 }
@@ -86,6 +90,8 @@ impl std::str::FromStr for AlertCategory {
86 90 "cors_recovery" => Ok(Self::CorsRecovery),
87 91 "backup_stale" => Ok(Self::BackupStale),
88 92 "backup_recovery" => Ok(Self::BackupRecovery),
93 + "scan_pipeline_degraded" => Ok(Self::ScanPipelineDegraded),
94 + "scan_pipeline_recovery" => Ok(Self::ScanPipelineRecovery),
89 95 "monitoring_offline" => Ok(Self::MonitoringOffline),
90 96 "monitoring_recovery" => Ok(Self::MonitoringRecovery),
91 97 other => Err(format!("unknown alert category: {other}")),
@@ -414,6 +420,37 @@ pub struct CorsCheckResult {
414 420 }
415 421
416 422 #[derive(Debug, Clone, Serialize, Deserialize)]
423 + pub struct ScanLayerSnapshot {
424 + pub layer: String,
425 + pub total_1h: i64,
426 + pub success_1h: i64,
427 + pub error_1h: i64,
428 + pub last_clean_secs_ago: Option<i64>,
429 + /// Status derived from the audit-doc thresholds.
430 + pub status: String,
431 + }
432 +
433 + #[derive(Debug, Clone, Serialize, Deserialize)]
434 + pub struct ScanPipelineCheckResult {
435 + /// Config key identifying the monitored target.
436 + pub target: String,
437 + /// Overall pipeline status — worst of any layer + queue + backlog signal.
438 + pub status: String,
439 + pub queue_pending: i64,
440 + pub queue_running: i64,
441 + pub queue_stuck: i64,
442 + pub held_total: i64,
443 + pub layers: Vec<ScanLayerSnapshot>,
444 + /// Aggregated issues found (one short line per fired threshold).
445 + pub issues: Vec<String>,
446 + /// When this check was performed, in RFC 3339 format (UTC).
447 + pub checked_at: String,
448 + /// Error message if the upstream endpoint was unreachable or returned
449 + /// an unexpected shape. `None` on success even if `issues` is non-empty.
450 + pub error: Option<String>,
451 + }
452 +
453 + #[derive(Debug, Clone, Serialize, Deserialize)]
417 454 pub struct BackupCheckResult {
418 455 /// Config key identifying the monitored target.
419 456 pub target: String,