//! Scan-pipeline health check — polls `/admin/uploads/health.json` on //! a target makenotwork instance and applies the audit-doc thresholds. //! //! Thresholds (audit doc § 6): //! - Per-layer error rate > 10% over 1h → degraded //! - Per-layer success count == 0 over 24h → down (layer fully unavailable) //! (we use the cheaper proxy "no clean response in 24h" via //! `last_clean_secs_ago` from the upstream payload). //! - Queue depth > 50 pending → degraded //! - Stuck-scan count > 5 → degraded (workers falling behind) //! - Held-for-review total > 100 → degraded (review backlog growing) //! //! Anything that fires "down" elevates the overall status to Unreachable; //! anything "degraded" elevates to Degraded. Multiple issues all reported //! in the `issues` vec. use std::time::Duration; use tracing::instrument; use crate::types::{ScanLayerSnapshot, ScanPipelineCheckResult}; const QUEUE_PENDING_THRESHOLD: i64 = 50; const STUCK_SCAN_THRESHOLD: i64 = 5; const HELD_BACKLOG_THRESHOLD: i64 = 100; const ERROR_RATE_PCT_THRESHOLD: i32 = 10; const LAYER_DOWN_AGE_SECS: i64 = 24 * 3600; #[derive(Debug, serde::Deserialize)] struct UpstreamLayer { layer: String, total_1h: i64, success_1h: i64, error_1h: i64, last_clean_secs_ago: Option, } #[derive(Debug, serde::Deserialize)] struct UpstreamHealth { queue_pending: i64, queue_running: i64, queue_stuck: i64, held_total: i64, layers: Vec, } #[instrument(skip_all)] pub async fn check_scan_pipeline( target_name: &str, base_url: &str, timeout_secs: u64, ) -> ScanPipelineCheckResult { let checked_at = chrono::Utc::now().to_rfc3339(); let url = format!("{}/admin/uploads/health.json", base_url.trim_end_matches('/')); let client = match reqwest::Client::builder() .timeout(Duration::from_secs(timeout_secs)) .build() { Ok(c) => c, Err(e) => return unreachable(target_name, &checked_at, format!("client build: {e}")), }; let response = match client.get(&url).send().await { Ok(r) => r, Err(e) => return unreachable(target_name, &checked_at, format!("request: {e}")), }; let status = response.status(); if !status.is_success() { return unreachable(target_name, &checked_at, format!("HTTP {}", status.as_u16())); } let body: UpstreamHealth = match response.json().await { Ok(b) => b, Err(e) => return unreachable(target_name, &checked_at, format!("parse: {e}")), }; classify(target_name, &checked_at, body) } fn unreachable(target: &str, checked_at: &str, msg: String) -> ScanPipelineCheckResult { ScanPipelineCheckResult { target: target.to_string(), status: "unreachable".to_string(), queue_pending: 0, queue_running: 0, queue_stuck: 0, held_total: 0, layers: Vec::new(), issues: vec![format!("scan-pipeline health endpoint: {msg}")], checked_at: checked_at.to_string(), error: Some(msg), } } fn classify(target: &str, checked_at: &str, body: UpstreamHealth) -> ScanPipelineCheckResult { let mut issues: Vec = Vec::new(); let mut any_down = false; let mut any_degraded = false; if body.queue_pending > QUEUE_PENDING_THRESHOLD { issues.push(format!("queue depth {} > {}", body.queue_pending, QUEUE_PENDING_THRESHOLD)); any_degraded = true; } if body.queue_stuck > STUCK_SCAN_THRESHOLD { issues.push(format!("stuck scans {} > {}", body.queue_stuck, STUCK_SCAN_THRESHOLD)); any_degraded = true; } if body.held_total > HELD_BACKLOG_THRESHOLD { issues.push(format!("held backlog {} > {}", body.held_total, HELD_BACKLOG_THRESHOLD)); any_degraded = true; } let layers: Vec = body.layers.into_iter().map(|l| { let mut layer_status = "operational"; let error_rate_pct = if l.total_1h > 0 { (100 * l.error_1h / l.total_1h) as i32 } else { 0 }; let truly_idle = l.total_1h == 0 && l.last_clean_secs_ago.is_none_or(|s| s > LAYER_DOWN_AGE_SECS); if truly_idle { layer_status = "down"; any_down = true; issues.push(format!("layer '{}': no clean response in >24h", l.layer)); } else if error_rate_pct > ERROR_RATE_PCT_THRESHOLD { layer_status = "degraded"; any_degraded = true; issues.push(format!("layer '{}': error rate {}% > {}%", l.layer, error_rate_pct, ERROR_RATE_PCT_THRESHOLD)); } ScanLayerSnapshot { layer: l.layer, total_1h: l.total_1h, success_1h: l.success_1h, error_1h: l.error_1h, last_clean_secs_ago: l.last_clean_secs_ago, status: layer_status.to_string(), } }).collect(); let overall = if any_down { "unreachable" } else if any_degraded { "degraded" } else { "operational" }; ScanPipelineCheckResult { target: target.to_string(), status: overall.to_string(), queue_pending: body.queue_pending, queue_running: body.queue_running, queue_stuck: body.queue_stuck, held_total: body.held_total, layers, issues, checked_at: checked_at.to_string(), error: None, } } #[cfg(test)] mod tests { use super::*; fn body(layers: Vec<(&str, i64, i64, i64, Option)>) -> UpstreamHealth { UpstreamHealth { queue_pending: 0, queue_running: 0, queue_stuck: 0, held_total: 0, layers: layers.into_iter().map(|(name, total, success, errors, last)| UpstreamLayer { layer: name.to_string(), total_1h: total, success_1h: success, error_1h: errors, last_clean_secs_ago: last, }).collect(), } } #[test] fn empty_pipeline_is_operational() { let b = UpstreamHealth { queue_pending: 0, queue_running: 0, queue_stuck: 0, held_total: 0, layers: vec![] }; let r = classify("t", "now", b); assert_eq!(r.status, "operational"); assert!(r.issues.is_empty()); } #[test] fn queue_depth_threshold_degrades() { let mut b = body(vec![]); b.queue_pending = 99; let r = classify("t", "now", b); assert_eq!(r.status, "degraded"); assert!(r.issues.iter().any(|i| i.contains("queue depth"))); } #[test] fn stuck_scans_threshold_degrades() { let mut b = body(vec![]); b.queue_stuck = 6; let r = classify("t", "now", b); assert_eq!(r.status, "degraded"); } #[test] fn held_backlog_threshold_degrades() { let mut b = body(vec![]); b.held_total = 150; let r = classify("t", "now", b); assert_eq!(r.status, "degraded"); } #[test] fn high_layer_error_rate_degrades() { // 20 of 100 are errors = 20% > 10% threshold. let b = body(vec![("malwarebazaar", 100, 80, 20, Some(60))]); let r = classify("t", "now", b); assert_eq!(r.status, "degraded"); assert!(r.issues.iter().any(|i| i.contains("error rate"))); } #[test] fn layer_silent_for_24h_is_down() { let b = body(vec![("clamav", 0, 0, 0, None)]); let r = classify("t", "now", b); assert_eq!(r.status, "unreachable"); assert!(r.issues.iter().any(|i| i.contains("no clean response in >24h"))); } #[test] fn layer_silent_with_stale_clean_is_down() { // 2 days ago = 172_800 secs ago, beyond 24h threshold. let b = body(vec![("urlhaus", 0, 0, 0, Some(172_800))]); let r = classify("t", "now", b); assert_eq!(r.status, "unreachable"); } #[test] fn layer_recently_clean_is_operational_even_with_no_recent_volume() { // 0 scans in last hour but a clean response 30 min ago → operational. let b = body(vec![("yara", 0, 0, 0, Some(1800))]); let r = classify("t", "now", b); assert_eq!(r.status, "operational"); } #[test] fn down_layer_beats_degraded_signals() { let mut b = body(vec![("malwarebazaar", 0, 0, 0, None)]); b.queue_pending = 99; let r = classify("t", "now", b); assert_eq!(r.status, "unreachable", "down beats degraded"); } #[test] fn boundary_error_rate_does_not_trigger() { // exactly 10% — at the boundary, not over. let b = body(vec![("malwarebazaar", 100, 90, 10, Some(60))]); let r = classify("t", "now", b); assert_eq!(r.status, "operational"); } }