//! Layer 5: ClamAV daemon scanning via Unix socket. //! //! Connects to `clamd` using the INSTREAM protocol: sends file data in //! length-prefixed chunks, reads the verdict. Optional — only runs if //! CLAMAV_SOCKET env var is set. use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::UnixStream; use crate::constants; use super::{ErrorPolicy, LayerResult, LayerVerdict}; /// External local-service layer. An `Error` here means `clamd` is down, /// unreachable, or out of memory — operational problems that must not block /// every upload across the platform. Fail open; PoM surfaces degraded health. pub const ERROR_POLICY: ErrorPolicy = ErrorPolicy::FailOpen; /// Verify the `clamd` socket is reachable and responsive. Used at startup /// to refuse to boot if scanning is configured but the daemon is dead — /// otherwise the FailOpen policy would silently pass every upload as Clean /// for the lifetime of the process. pub async fn ping(socket_path: &str) -> Result<(), String> { let timeout = Duration::from_secs(2); let work = async { let mut stream = UnixStream::connect(socket_path) .await .map_err(|e| format!("connect: {e}"))?; stream .write_all(b"zPING\0") .await .map_err(|e| format!("write PING: {e}"))?; let mut response = Vec::with_capacity(8); stream .take(16) .read_to_end(&mut response) .await .map_err(|e| format!("read PONG: {e}"))?; let reply = String::from_utf8_lossy(&response); let reply = reply.trim_end_matches('\0').trim(); if reply == "PONG" { Ok(()) } else { Err(format!("unexpected PING response: {reply:?}")) } }; tokio::time::timeout(timeout, work) .await .map_err(|_| "ClamAV PING timed out after 2s".to_string())? } /// Maximum chunk size for INSTREAM protocol (ClamAV default) const CHUNK_SIZE: usize = 8192; /// Scan file data using a ClamAV daemon via Unix socket. pub async fn scan_with_clamav(socket_path: &str, data: &[u8]) -> LayerResult { let timeout = Duration::from_secs(constants::SCAN_CLAMAV_TIMEOUT_SECS); match tokio::time::timeout(timeout, scan_instream(socket_path, data)).await { Ok(Ok(result)) => result, Ok(Err(e)) => LayerResult { layer: "clamav", verdict: LayerVerdict::Error, detail: Some(format!("ClamAV error: {}", e)), }, Err(_) => LayerResult { layer: "clamav", verdict: LayerVerdict::Error, detail: Some("ClamAV scan timed out".to_string()), }, } } /// Streaming entry. Pumps `reader` into ClamAV INSTREAM frame-by-frame /// rather than buffering the whole object first. Frame shape matches the /// buffered path: 4-byte BE length + bytes, terminated with `0u32`. pub async fn scan_with_clamav_stream(socket_path: &str, reader: R) -> LayerResult where R: tokio::io::AsyncRead + Unpin, { let timeout = Duration::from_secs(constants::SCAN_CLAMAV_TIMEOUT_SECS); match tokio::time::timeout(timeout, scan_instream_streaming(socket_path, reader)).await { Ok(Ok(result)) => result, Ok(Err(e)) => LayerResult { layer: "clamav", verdict: LayerVerdict::Error, detail: Some(format!("ClamAV error: {}", e)), }, Err(_) => LayerResult { layer: "clamav", verdict: LayerVerdict::Error, detail: Some("ClamAV scan timed out".to_string()), }, } } async fn scan_instream(socket_path: &str, data: &[u8]) -> Result { let mut stream = UnixStream::connect(socket_path) .await .map_err(|e| format!("Failed to connect to ClamAV socket: {}", e))?; // Send INSTREAM command stream .write_all(b"zINSTREAM\0") .await .map_err(|e| format!("Failed to send INSTREAM command: {}", e))?; // Send data in chunks: each chunk prefixed with 4-byte big-endian length for chunk in data.chunks(CHUNK_SIZE) { let len = (chunk.len() as u32).to_be_bytes(); stream .write_all(&len) .await .map_err(|e| format!("Failed to write chunk length: {}", e))?; stream .write_all(chunk) .await .map_err(|e| format!("Failed to write chunk data: {}", e))?; } // Send zero-length chunk to signal end of data stream .write_all(&0u32.to_be_bytes()) .await .map_err(|e| format!("Failed to send end marker: {}", e))?; // Read response (capped at 16 KB — verdicts are typically under 100 bytes) let mut response = Vec::with_capacity(256); stream .take(16_384) .read_to_end(&mut response) .await .map_err(|e| format!("Failed to read ClamAV response: {}", e))?; // A 16 KB response means we hit the take() cap — clamd's verdict was // longer than expected (typical verdicts are <100 bytes) and we don't // know how much was truncated. Treat as suspicious / fail-closed rather // than letting `parse_clamav_response` see a half-token and return Error // (which under FailOpen would silently Pass). EICAR's "FOUND" suffix // might be just beyond the cap; assume the worst. if response.len() >= 16_384 { return Ok(LayerResult { layer: "clamav", verdict: LayerVerdict::Fail, detail: Some("ClamAV response truncated at 16 KB cap — refusing to interpret".to_string()), }); } let response_str = String::from_utf8_lossy(&response); let response_str = response_str.trim_end_matches('\0').trim(); Ok(parse_clamav_response(response_str)) } async fn scan_instream_streaming( socket_path: &str, mut reader: R, ) -> Result where R: tokio::io::AsyncRead + Unpin, { let mut stream = UnixStream::connect(socket_path) .await .map_err(|e| format!("Failed to connect to ClamAV socket: {}", e))?; stream .write_all(b"zINSTREAM\0") .await .map_err(|e| format!("Failed to send INSTREAM command: {}", e))?; let mut buf = vec![0u8; CHUNK_SIZE]; loop { let n = reader .read(&mut buf) .await .map_err(|e| format!("Failed to read source: {}", e))?; if n == 0 { break; } let len = (n as u32).to_be_bytes(); stream .write_all(&len) .await .map_err(|e| format!("Failed to write chunk length: {}", e))?; stream .write_all(&buf[..n]) .await .map_err(|e| format!("Failed to write chunk data: {}", e))?; } stream .write_all(&0u32.to_be_bytes()) .await .map_err(|e| format!("Failed to send end marker: {}", e))?; let mut response = Vec::with_capacity(256); stream .take(16_384) .read_to_end(&mut response) .await .map_err(|e| format!("Failed to read ClamAV response: {}", e))?; // A 16 KB response means we hit the take() cap — clamd's verdict was // longer than expected (typical verdicts are <100 bytes) and we don't // know how much was truncated. Treat as suspicious / fail-closed rather // than letting `parse_clamav_response` see a half-token and return Error // (which under FailOpen would silently Pass). EICAR's "FOUND" suffix // might be just beyond the cap; assume the worst. if response.len() >= 16_384 { return Ok(LayerResult { layer: "clamav", verdict: LayerVerdict::Fail, detail: Some("ClamAV response truncated at 16 KB cap — refusing to interpret".to_string()), }); } let response_str = String::from_utf8_lossy(&response); let response_str = response_str.trim_end_matches('\0').trim(); Ok(parse_clamav_response(response_str)) } /// Parse a ClamAV INSTREAM response string into a LayerResult. /// Extracted for testability — the socket/IO layer just feeds the string in. /// /// ClamAV response format: /// - `"stream: OK"` — clean /// - `"stream: FOUND"` — infected /// - `"stream: ERROR"` — scan error fn parse_clamav_response(response: &str) -> LayerResult { if response == "stream: OK" { LayerResult { layer: "clamav", verdict: LayerVerdict::Pass, detail: None, } } else if response.ends_with("FOUND") { // Extract virus name: "stream: Eicar-Signature FOUND" → "Eicar-Signature" let virus_name = response .strip_prefix("stream: ") .unwrap_or(response) .strip_suffix(" FOUND") .unwrap_or(response); LayerResult { layer: "clamav", verdict: LayerVerdict::Fail, detail: Some(format!("ClamAV detection: {}", virus_name)), } } else { LayerResult { layer: "clamav", verdict: LayerVerdict::Error, detail: Some(format!("Unexpected ClamAV response: {}", response)), } } } #[cfg(test)] mod tests { use super::*; // -- Clean responses -- #[test] fn clean_response_passes() { let result = parse_clamav_response("stream: OK"); assert_eq!(result.verdict, LayerVerdict::Pass); assert!(result.detail.is_none()); } // -- Malware detections -- #[test] fn eicar_detection_fails() { let result = parse_clamav_response("stream: Eicar-Signature FOUND"); assert_eq!(result.verdict, LayerVerdict::Fail); let detail = result.detail.unwrap(); assert!(detail.contains("Eicar-Signature")); } #[test] fn complex_virus_name_extracted() { let result = parse_clamav_response("stream: Win.Test.EICAR_HDB-1 FOUND"); assert_eq!(result.verdict, LayerVerdict::Fail); let detail = result.detail.unwrap(); assert!(detail.contains("Win.Test.EICAR_HDB-1")); } #[test] fn trojan_detection_fails() { let result = parse_clamav_response("stream: Win.Trojan.Agent-123456 FOUND"); assert_eq!(result.verdict, LayerVerdict::Fail); let detail = result.detail.unwrap(); assert!(detail.contains("Win.Trojan.Agent-123456")); } // -- Error responses -- #[test] fn empty_response_is_error() { let result = parse_clamav_response(""); assert_eq!(result.verdict, LayerVerdict::Error); } #[test] fn garbage_response_is_error() { let result = parse_clamav_response("this is not a valid response"); assert_eq!(result.verdict, LayerVerdict::Error); assert!(result.detail.unwrap().contains("Unexpected ClamAV response")); } #[test] fn error_keyword_in_response_is_error() { // ClamAV may respond with "stream: ERROR" for scan errors let result = parse_clamav_response("stream: INSTREAM size limit exceeded ERROR"); assert_eq!(result.verdict, LayerVerdict::Error); } }