Skip to main content

max / makenotwork

8.6 KB · 267 lines History Blame Raw
1 //! Scan-pipeline health check — polls `<host>/admin/uploads/health.json` on
2 //! a target makenotwork instance and applies the audit-doc thresholds.
3 //!
4 //! Thresholds (audit doc § 6):
5 //! - Per-layer error rate > 10% over 1h → degraded
//! - Per-layer success count == 0 over 24h → down (layer fully unavailable).
//!   We use the cheaper proxy "no clean response in 24h", read from
//!   `last_clean_secs_ago` in the upstream payload, and only mark a layer
//!   down when it also handled no scans in the last hour (`total_1h == 0`).
9 //! - Queue depth > 50 pending → degraded
10 //! - Stuck-scan count > 5 → degraded (workers falling behind)
11 //! - Held-for-review total > 100 → degraded (review backlog growing)
12 //!
//! Any check that fires "down" raises the overall status to Unreachable;
//! any "degraded" check raises it to Degraded. When several checks fire,
//! every one of them is reported in the `issues` vec.
16
17 use std::time::Duration;
18
19 use tracing::instrument;
20
21 use crate::types::{ScanLayerSnapshot, ScanPipelineCheckResult};
22
/// Pending-queue depth above which the pipeline is reported as degraded.
const QUEUE_PENDING_THRESHOLD: i64 = 50;
/// Stuck-scan count above which the pipeline is reported as degraded.
const STUCK_SCAN_THRESHOLD: i64 = 5;
/// Held-for-review total above which the pipeline is reported as degraded.
const HELD_BACKLOG_THRESHOLD: i64 = 100;
/// Per-layer error rate, in whole percent, above which a layer is degraded.
const ERROR_RATE_PCT_THRESHOLD: i32 = 10;
/// Age in seconds (24 hours) past which a layer's last clean response is stale.
const LAYER_DOWN_AGE_SECS: i64 = 24 * 60 * 60;
28
29 #[derive(Debug, serde::Deserialize)]
30 struct UpstreamLayer {
31 layer: String,
32 total_1h: i64,
33 success_1h: i64,
34 error_1h: i64,
35 last_clean_secs_ago: Option<i64>,
36 }
37
38 #[derive(Debug, serde::Deserialize)]
39 struct UpstreamHealth {
40 queue_pending: i64,
41 queue_running: i64,
42 queue_stuck: i64,
43 held_total: i64,
44 layers: Vec<UpstreamLayer>,
45 }
46
47 #[instrument(skip_all)]
48 pub async fn check_scan_pipeline(
49 target_name: &str,
50 base_url: &str,
51 timeout_secs: u64,
52 ) -> ScanPipelineCheckResult {
53 let checked_at = chrono::Utc::now().to_rfc3339();
54 let url = format!("{}/admin/uploads/health.json", base_url.trim_end_matches('/'));
55
56 let client = match reqwest::Client::builder()
57 .timeout(Duration::from_secs(timeout_secs))
58 .build()
59 {
60 Ok(c) => c,
61 Err(e) => return unreachable(target_name, &checked_at, format!("client build: {e}")),
62 };
63
64 let response = match client.get(&url).send().await {
65 Ok(r) => r,
66 Err(e) => return unreachable(target_name, &checked_at, format!("request: {e}")),
67 };
68
69 let status = response.status();
70 if !status.is_success() {
71 return unreachable(target_name, &checked_at, format!("HTTP {}", status.as_u16()));
72 }
73
74 let body: UpstreamHealth = match response.json().await {
75 Ok(b) => b,
76 Err(e) => return unreachable(target_name, &checked_at, format!("parse: {e}")),
77 };
78
79 classify(target_name, &checked_at, body)
80 }
81
82 fn unreachable(target: &str, checked_at: &str, msg: String) -> ScanPipelineCheckResult {
83 ScanPipelineCheckResult {
84 target: target.to_string(),
85 status: "unreachable".to_string(),
86 queue_pending: 0,
87 queue_running: 0,
88 queue_stuck: 0,
89 held_total: 0,
90 layers: Vec::new(),
91 issues: vec![format!("scan-pipeline health endpoint: {msg}")],
92 checked_at: checked_at.to_string(),
93 error: Some(msg),
94 }
95 }
96
97 fn classify(target: &str, checked_at: &str, body: UpstreamHealth) -> ScanPipelineCheckResult {
98 let mut issues: Vec<String> = Vec::new();
99 let mut any_down = false;
100 let mut any_degraded = false;
101
102 if body.queue_pending > QUEUE_PENDING_THRESHOLD {
103 issues.push(format!("queue depth {} > {}", body.queue_pending, QUEUE_PENDING_THRESHOLD));
104 any_degraded = true;
105 }
106 if body.queue_stuck > STUCK_SCAN_THRESHOLD {
107 issues.push(format!("stuck scans {} > {}", body.queue_stuck, STUCK_SCAN_THRESHOLD));
108 any_degraded = true;
109 }
110 if body.held_total > HELD_BACKLOG_THRESHOLD {
111 issues.push(format!("held backlog {} > {}", body.held_total, HELD_BACKLOG_THRESHOLD));
112 any_degraded = true;
113 }
114
115 let layers: Vec<ScanLayerSnapshot> = body.layers.into_iter().map(|l| {
116 let mut layer_status = "operational";
117 let error_rate_pct = if l.total_1h > 0 {
118 (100 * l.error_1h / l.total_1h) as i32
119 } else { 0 };
120
121 let truly_idle = l.total_1h == 0
122 && l.last_clean_secs_ago.is_none_or(|s| s > LAYER_DOWN_AGE_SECS);
123 if truly_idle {
124 layer_status = "down";
125 any_down = true;
126 issues.push(format!("layer '{}': no clean response in >24h", l.layer));
127 } else if error_rate_pct > ERROR_RATE_PCT_THRESHOLD {
128 layer_status = "degraded";
129 any_degraded = true;
130 issues.push(format!("layer '{}': error rate {}% > {}%", l.layer, error_rate_pct, ERROR_RATE_PCT_THRESHOLD));
131 }
132
133 ScanLayerSnapshot {
134 layer: l.layer,
135 total_1h: l.total_1h,
136 success_1h: l.success_1h,
137 error_1h: l.error_1h,
138 last_clean_secs_ago: l.last_clean_secs_ago,
139 status: layer_status.to_string(),
140 }
141 }).collect();
142
143 let overall = if any_down {
144 "unreachable"
145 } else if any_degraded {
146 "degraded"
147 } else {
148 "operational"
149 };
150
151 ScanPipelineCheckResult {
152 target: target.to_string(),
153 status: overall.to_string(),
154 queue_pending: body.queue_pending,
155 queue_running: body.queue_running,
156 queue_stuck: body.queue_stuck,
157 held_total: body.held_total,
158 layers,
159 issues,
160 checked_at: checked_at.to_string(),
161 error: None,
162 }
163 }
164
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one upstream layer entry from its raw counters.
    fn layer(name: &str, total: i64, success: i64, errors: i64, last: Option<i64>) -> UpstreamLayer {
        UpstreamLayer {
            layer: name.to_string(),
            total_1h: total,
            success_1h: success,
            error_1h: errors,
            last_clean_secs_ago: last,
        }
    }

    /// Wraps `layers` in a payload whose queue counters are all zero.
    fn health(layers: Vec<UpstreamLayer>) -> UpstreamHealth {
        UpstreamHealth {
            queue_pending: 0,
            queue_running: 0,
            queue_stuck: 0,
            held_total: 0,
            layers,
        }
    }

    #[test]
    fn empty_pipeline_is_operational() {
        let result = classify("t", "now", health(vec![]));
        assert_eq!(result.status, "operational");
        assert!(result.issues.is_empty());
    }

    #[test]
    fn queue_depth_threshold_degrades() {
        let payload = UpstreamHealth { queue_pending: 99, ..health(vec![]) };
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "degraded");
        assert!(result.issues.iter().any(|issue| issue.contains("queue depth")));
    }

    #[test]
    fn stuck_scans_threshold_degrades() {
        let payload = UpstreamHealth { queue_stuck: 6, ..health(vec![]) };
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "degraded");
    }

    #[test]
    fn held_backlog_threshold_degrades() {
        let payload = UpstreamHealth { held_total: 150, ..health(vec![]) };
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "degraded");
    }

    #[test]
    fn high_layer_error_rate_degrades() {
        // 20 errors out of 100 is 20%, above the 10% threshold.
        let payload = health(vec![layer("malwarebazaar", 100, 80, 20, Some(60))]);
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "degraded");
        assert!(result.issues.iter().any(|issue| issue.contains("error rate")));
    }

    #[test]
    fn layer_silent_for_24h_is_down() {
        let payload = health(vec![layer("clamav", 0, 0, 0, None)]);
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "unreachable");
        assert!(result.issues.iter().any(|issue| issue.contains("no clean response in >24h")));
    }

    #[test]
    fn layer_silent_with_stale_clean_is_down() {
        // 172_800 seconds is two days, past the 24h limit.
        let payload = health(vec![layer("urlhaus", 0, 0, 0, Some(172_800))]);
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "unreachable");
    }

    #[test]
    fn layer_recently_clean_is_operational_even_with_no_recent_volume() {
        // No scans in the last hour, but a clean response 30 minutes ago.
        let payload = health(vec![layer("yara", 0, 0, 0, Some(1800))]);
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "operational");
    }

    #[test]
    fn down_layer_beats_degraded_signals() {
        let payload = UpstreamHealth {
            queue_pending: 99,
            ..health(vec![layer("malwarebazaar", 0, 0, 0, None)])
        };
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "unreachable", "down beats degraded");
    }

    #[test]
    fn boundary_error_rate_does_not_trigger() {
        // Exactly 10% sits on the boundary and must not count as "over".
        let payload = health(vec![layer("malwarebazaar", 100, 90, 10, Some(60))]);
        let result = classify("t", "now", payload);
        assert_eq!(result.status, "operational");
    }
}
267