Skip to main content

max / makenotwork

10.9 KB · 324 lines History Blame Raw
1 //! Layer 5: ClamAV daemon scanning via Unix socket.
2 //!
3 //! Connects to `clamd` using the INSTREAM protocol: sends file data in
4 //! length-prefixed chunks, reads the verdict. Optional — only runs if
5 //! CLAMAV_SOCKET env var is set.
6
7 use std::time::Duration;
8
9 use tokio::io::{AsyncReadExt, AsyncWriteExt};
10 use tokio::net::UnixStream;
11
12 use crate::constants;
13
14 use super::{ErrorPolicy, LayerResult, LayerVerdict};
15
16 /// External local-service layer. An `Error` here means `clamd` is down,
17 /// unreachable, or out of memory — operational problems that must not block
18 /// every upload across the platform. Fail open; PoM surfaces degraded health.
19 pub const ERROR_POLICY: ErrorPolicy = ErrorPolicy::FailOpen;
20
21 /// Verify the `clamd` socket is reachable and responsive. Used at startup
22 /// to refuse to boot if scanning is configured but the daemon is dead —
23 /// otherwise the FailOpen policy would silently pass every upload as Clean
24 /// for the lifetime of the process.
25 pub async fn ping(socket_path: &str) -> Result<(), String> {
26 let timeout = Duration::from_secs(2);
27 let work = async {
28 let mut stream = UnixStream::connect(socket_path)
29 .await
30 .map_err(|e| format!("connect: {e}"))?;
31 stream
32 .write_all(b"zPING\0")
33 .await
34 .map_err(|e| format!("write PING: {e}"))?;
35 let mut response = Vec::with_capacity(8);
36 stream
37 .take(16)
38 .read_to_end(&mut response)
39 .await
40 .map_err(|e| format!("read PONG: {e}"))?;
41 let reply = String::from_utf8_lossy(&response);
42 let reply = reply.trim_end_matches('\0').trim();
43 if reply == "PONG" {
44 Ok(())
45 } else {
46 Err(format!("unexpected PING response: {reply:?}"))
47 }
48 };
49 tokio::time::timeout(timeout, work)
50 .await
51 .map_err(|_| "ClamAV PING timed out after 2s".to_string())?
52 }
53
54
/// Largest payload carried by one INSTREAM frame: 8 KiB, the ClamAV default.
const CHUNK_SIZE: usize = 8 * 1024;
57
58 /// Scan file data using a ClamAV daemon via Unix socket.
59 pub async fn scan_with_clamav(socket_path: &str, data: &[u8]) -> LayerResult {
60 let timeout = Duration::from_secs(constants::SCAN_CLAMAV_TIMEOUT_SECS);
61
62 match tokio::time::timeout(timeout, scan_instream(socket_path, data)).await {
63 Ok(Ok(result)) => result,
64 Ok(Err(e)) => LayerResult {
65 layer: "clamav",
66 verdict: LayerVerdict::Error,
67 detail: Some(format!("ClamAV error: {}", e)),
68 },
69 Err(_) => LayerResult {
70 layer: "clamav",
71 verdict: LayerVerdict::Error,
72 detail: Some("ClamAV scan timed out".to_string()),
73 },
74 }
75 }
76
77 /// Streaming entry. Pumps `reader` into ClamAV INSTREAM frame-by-frame
78 /// rather than buffering the whole object first. Frame shape matches the
79 /// buffered path: 4-byte BE length + bytes, terminated with `0u32`.
80 pub async fn scan_with_clamav_stream<R>(socket_path: &str, reader: R) -> LayerResult
81 where
82 R: tokio::io::AsyncRead + Unpin,
83 {
84 let timeout = Duration::from_secs(constants::SCAN_CLAMAV_TIMEOUT_SECS);
85
86 match tokio::time::timeout(timeout, scan_instream_streaming(socket_path, reader)).await {
87 Ok(Ok(result)) => result,
88 Ok(Err(e)) => LayerResult {
89 layer: "clamav",
90 verdict: LayerVerdict::Error,
91 detail: Some(format!("ClamAV error: {}", e)),
92 },
93 Err(_) => LayerResult {
94 layer: "clamav",
95 verdict: LayerVerdict::Error,
96 detail: Some("ClamAV scan timed out".to_string()),
97 },
98 }
99 }
100
101 async fn scan_instream(socket_path: &str, data: &[u8]) -> Result<LayerResult, String> {
102 let mut stream = UnixStream::connect(socket_path)
103 .await
104 .map_err(|e| format!("Failed to connect to ClamAV socket: {}", e))?;
105
106 // Send INSTREAM command
107 stream
108 .write_all(b"zINSTREAM\0")
109 .await
110 .map_err(|e| format!("Failed to send INSTREAM command: {}", e))?;
111
112 // Send data in chunks: each chunk prefixed with 4-byte big-endian length
113 for chunk in data.chunks(CHUNK_SIZE) {
114 let len = (chunk.len() as u32).to_be_bytes();
115 stream
116 .write_all(&len)
117 .await
118 .map_err(|e| format!("Failed to write chunk length: {}", e))?;
119 stream
120 .write_all(chunk)
121 .await
122 .map_err(|e| format!("Failed to write chunk data: {}", e))?;
123 }
124
125 // Send zero-length chunk to signal end of data
126 stream
127 .write_all(&0u32.to_be_bytes())
128 .await
129 .map_err(|e| format!("Failed to send end marker: {}", e))?;
130
131 // Read response (capped at 16 KB — verdicts are typically under 100 bytes)
132 let mut response = Vec::with_capacity(256);
133 stream
134 .take(16_384)
135 .read_to_end(&mut response)
136 .await
137 .map_err(|e| format!("Failed to read ClamAV response: {}", e))?;
138
139 // A 16 KB response means we hit the take() cap — clamd's verdict was
140 // longer than expected (typical verdicts are <100 bytes) and we don't
141 // know how much was truncated. Treat as suspicious / fail-closed rather
142 // than letting `parse_clamav_response` see a half-token and return Error
143 // (which under FailOpen would silently Pass). EICAR's "FOUND" suffix
144 // might be just beyond the cap; assume the worst.
145 if response.len() >= 16_384 {
146 return Ok(LayerResult {
147 layer: "clamav",
148 verdict: LayerVerdict::Fail,
149 detail: Some("ClamAV response truncated at 16 KB cap — refusing to interpret".to_string()),
150 });
151 }
152
153 let response_str = String::from_utf8_lossy(&response);
154 let response_str = response_str.trim_end_matches('\0').trim();
155
156 Ok(parse_clamav_response(response_str))
157 }
158
159 async fn scan_instream_streaming<R>(
160 socket_path: &str,
161 mut reader: R,
162 ) -> Result<LayerResult, String>
163 where
164 R: tokio::io::AsyncRead + Unpin,
165 {
166 let mut stream = UnixStream::connect(socket_path)
167 .await
168 .map_err(|e| format!("Failed to connect to ClamAV socket: {}", e))?;
169
170 stream
171 .write_all(b"zINSTREAM\0")
172 .await
173 .map_err(|e| format!("Failed to send INSTREAM command: {}", e))?;
174
175 let mut buf = vec![0u8; CHUNK_SIZE];
176 loop {
177 let n = reader
178 .read(&mut buf)
179 .await
180 .map_err(|e| format!("Failed to read source: {}", e))?;
181 if n == 0 {
182 break;
183 }
184 let len = (n as u32).to_be_bytes();
185 stream
186 .write_all(&len)
187 .await
188 .map_err(|e| format!("Failed to write chunk length: {}", e))?;
189 stream
190 .write_all(&buf[..n])
191 .await
192 .map_err(|e| format!("Failed to write chunk data: {}", e))?;
193 }
194
195 stream
196 .write_all(&0u32.to_be_bytes())
197 .await
198 .map_err(|e| format!("Failed to send end marker: {}", e))?;
199
200 let mut response = Vec::with_capacity(256);
201 stream
202 .take(16_384)
203 .read_to_end(&mut response)
204 .await
205 .map_err(|e| format!("Failed to read ClamAV response: {}", e))?;
206
207 // A 16 KB response means we hit the take() cap — clamd's verdict was
208 // longer than expected (typical verdicts are <100 bytes) and we don't
209 // know how much was truncated. Treat as suspicious / fail-closed rather
210 // than letting `parse_clamav_response` see a half-token and return Error
211 // (which under FailOpen would silently Pass). EICAR's "FOUND" suffix
212 // might be just beyond the cap; assume the worst.
213 if response.len() >= 16_384 {
214 return Ok(LayerResult {
215 layer: "clamav",
216 verdict: LayerVerdict::Fail,
217 detail: Some("ClamAV response truncated at 16 KB cap — refusing to interpret".to_string()),
218 });
219 }
220
221 let response_str = String::from_utf8_lossy(&response);
222 let response_str = response_str.trim_end_matches('\0').trim();
223
224 Ok(parse_clamav_response(response_str))
225 }
226
227 /// Parse a ClamAV INSTREAM response string into a LayerResult.
228 /// Extracted for testability — the socket/IO layer just feeds the string in.
229 ///
230 /// ClamAV response format:
231 /// - `"stream: OK"` — clean
232 /// - `"stream: <virus_name> FOUND"` — infected
233 /// - `"stream: <error> ERROR"` — scan error
234 fn parse_clamav_response(response: &str) -> LayerResult {
235 if response == "stream: OK" {
236 LayerResult {
237 layer: "clamav",
238 verdict: LayerVerdict::Pass,
239 detail: None,
240 }
241 } else if response.ends_with("FOUND") {
242 // Extract virus name: "stream: Eicar-Signature FOUND" → "Eicar-Signature"
243 let virus_name = response
244 .strip_prefix("stream: ")
245 .unwrap_or(response)
246 .strip_suffix(" FOUND")
247 .unwrap_or(response);
248
249 LayerResult {
250 layer: "clamav",
251 verdict: LayerVerdict::Fail,
252 detail: Some(format!("ClamAV detection: {}", virus_name)),
253 }
254 } else {
255 LayerResult {
256 layer: "clamav",
257 verdict: LayerVerdict::Error,
258 detail: Some(format!("Unexpected ClamAV response: {}", response)),
259 }
260 }
261 }
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `reply` is a detection whose detail names exactly `name`.
    /// An exact match (not `contains`) catches protocol text such as the
    /// `stream: ` prefix leaking into the reported signature name.
    fn assert_detection(reply: &str, name: &str) {
        let result = parse_clamav_response(reply);
        assert_eq!(result.layer, "clamav");
        assert_eq!(result.verdict, LayerVerdict::Fail);
        let expected = format!("ClamAV detection: {name}");
        assert_eq!(result.detail.as_deref(), Some(expected.as_str()));
    }

    // -- Clean responses --

    #[test]
    fn clean_response_passes() {
        let result = parse_clamav_response("stream: OK");
        assert_eq!(result.layer, "clamav");
        assert_eq!(result.verdict, LayerVerdict::Pass);
        assert!(result.detail.is_none());
    }

    #[test]
    fn ok_prefix_with_extra_text_is_not_clean() {
        // Only the exact clean token may pass; a reply that merely starts
        // with it must not be treated as clean.
        let result = parse_clamav_response("stream: OK but something else");
        assert_eq!(result.verdict, LayerVerdict::Error);
    }

    // -- Malware detections --

    #[test]
    fn eicar_detection_fails() {
        assert_detection("stream: Eicar-Signature FOUND", "Eicar-Signature");
    }

    #[test]
    fn complex_virus_name_extracted() {
        assert_detection("stream: Win.Test.EICAR_HDB-1 FOUND", "Win.Test.EICAR_HDB-1");
    }

    #[test]
    fn trojan_detection_fails() {
        assert_detection("stream: Win.Trojan.Agent-123456 FOUND", "Win.Trojan.Agent-123456");
    }

    // -- Error responses --

    #[test]
    fn empty_response_is_error() {
        let result = parse_clamav_response("");
        assert_eq!(result.verdict, LayerVerdict::Error);
    }

    #[test]
    fn garbage_response_is_error() {
        let result = parse_clamav_response("this is not a valid response");
        assert_eq!(result.verdict, LayerVerdict::Error);
        assert!(result.detail.unwrap().contains("Unexpected ClamAV response"));
    }

    #[test]
    fn error_keyword_in_response_is_error() {
        // ClamAV may respond with "stream: <message> ERROR" for scan errors;
        // the message should survive into the detail for diagnosis.
        let result = parse_clamav_response("stream: INSTREAM size limit exceeded ERROR");
        assert_eq!(result.verdict, LayerVerdict::Error);
        assert!(result.detail.unwrap().contains("size limit exceeded"));
    }
}
324