Skip to main content

max / makenotwork

server: clamav INSTREAM streaming entry
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-27 15:55 UTC
Commit: f93487b8af4a833c627c588a19e8812839fed58c
Parent: 44cef50
1 file changed, +78 insertions, -0 deletions
@@ -41,6 +41,30 @@ pub async fn scan_with_clamav(socket_path: &str, data: &[u8]) -> LayerResult {
41 41 }
42 42 }
43 43
44 + /// Streaming entry. Pumps `reader` into ClamAV INSTREAM frame-by-frame
45 + /// rather than buffering the whole object first. Frame shape matches the
46 + /// buffered path: 4-byte BE length + bytes, terminated with `0u32`.
47 + pub async fn scan_with_clamav_stream<R>(socket_path: &str, reader: R) -> LayerResult
48 + where
49 + R: tokio::io::AsyncRead + Unpin,
50 + {
51 + let timeout = Duration::from_secs(constants::SCAN_CLAMAV_TIMEOUT_SECS);
52 +
53 + match tokio::time::timeout(timeout, scan_instream_streaming(socket_path, reader)).await {
54 + Ok(Ok(result)) => result,
55 + Ok(Err(e)) => LayerResult {
56 + layer: "clamav",
57 + verdict: LayerVerdict::Error,
58 + detail: Some(format!("ClamAV error: {}", e)),
59 + },
60 + Err(_) => LayerResult {
61 + layer: "clamav",
62 + verdict: LayerVerdict::Error,
63 + detail: Some("ClamAV scan timed out".to_string()),
64 + },
65 + }
66 + }
67 +
44 68 async fn scan_instream(socket_path: &str, data: &[u8]) -> Result<LayerResult, String> {
45 69 let mut stream = UnixStream::connect(socket_path)
46 70 .await
@@ -85,6 +109,60 @@ async fn scan_instream(socket_path: &str, data: &[u8]) -> Result<LayerResult, St
85 109 Ok(parse_clamav_response(response_str))
86 110 }
87 111
112 + async fn scan_instream_streaming<R>(
113 + socket_path: &str,
114 + mut reader: R,
115 + ) -> Result<LayerResult, String>
116 + where
117 + R: tokio::io::AsyncRead + Unpin,
118 + {
119 + let mut stream = UnixStream::connect(socket_path)
120 + .await
121 + .map_err(|e| format!("Failed to connect to ClamAV socket: {}", e))?;
122 +
123 + stream
124 + .write_all(b"zINSTREAM\0")
125 + .await
126 + .map_err(|e| format!("Failed to send INSTREAM command: {}", e))?;
127 +
128 + let mut buf = vec![0u8; CHUNK_SIZE];
129 + loop {
130 + let n = reader
131 + .read(&mut buf)
132 + .await
133 + .map_err(|e| format!("Failed to read source: {}", e))?;
134 + if n == 0 {
135 + break;
136 + }
137 + let len = (n as u32).to_be_bytes();
138 + stream
139 + .write_all(&len)
140 + .await
141 + .map_err(|e| format!("Failed to write chunk length: {}", e))?;
142 + stream
143 + .write_all(&buf[..n])
144 + .await
145 + .map_err(|e| format!("Failed to write chunk data: {}", e))?;
146 + }
147 +
148 + stream
149 + .write_all(&0u32.to_be_bytes())
150 + .await
151 + .map_err(|e| format!("Failed to send end marker: {}", e))?;
152 +
153 + let mut response = Vec::with_capacity(256);
154 + stream
155 + .take(16_384)
156 + .read_to_end(&mut response)
157 + .await
158 + .map_err(|e| format!("Failed to read ClamAV response: {}", e))?;
159 +
160 + let response_str = String::from_utf8_lossy(&response);
161 + let response_str = response_str.trim_end_matches('\0').trim();
162 +
163 + Ok(parse_clamav_response(response_str))
164 + }
165 +
88 166 /// Parse a ClamAV INSTREAM response string into a LayerResult.
89 167 /// Extracted for testability — the socket/IO layer just feeds the string in.
90 168 ///