Skip to main content

max / makenotwork

server: route big-file scans through the streaming pipeline
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-27 16:35 UTC
Commit: 6aabeef1e18e7c908c68d7f039ce1f6ad5650150
Parent: dff45ff
2 files changed, +126 insertions, -20 deletions
@@ -254,6 +254,115 @@ impl ScanPipeline {
254 254 }
255 255 }
256 256
257 + /// Streaming counterpart to `scan`. Runs against a spooled tempfile so
258 + /// the >100 MB case doesn't hold the whole object in RAM. The CPU
259 + /// layers operate on a memory mapping (`spool::mmap_read`) — pages are
260 + /// demand-paged by the kernel as goblin / yara-x / archive walk them —
261 + /// and ClamAV streams the file via INSTREAM frames.
262 + pub async fn scan_stream(
263 + self: std::sync::Arc<Self>,
264 + spool: spool::SpoolHandle,
265 + file_type: FileType,
266 + ) -> ScanResult {
267 + let file_size = std::fs::metadata(spool.path())
268 + .map(|m| m.len())
269 + .unwrap_or(0);
270 +
271 + let map = match spool::mmap_read(spool.path()) {
272 + Ok(m) => std::sync::Arc::new(m),
273 + Err(e) => {
274 + let layer = LayerResult {
275 + layer: "spool",
276 + verdict: LayerVerdict::Error,
277 + detail: Some(e),
278 + };
279 + return ScanResult {
280 + status: final_status(std::slice::from_ref(&layer)),
281 + layers: vec![layer],
282 + sha256: String::new(),
283 + file_size,
284 + };
285 + }
286 + };
287 +
288 + let sync_map = std::sync::Arc::clone(&map);
289 + let sync_self = std::sync::Arc::clone(&self);
290 + let sync_fut = tokio::task::spawn_blocking(move || sync_self.run_sync_layers(&sync_map, file_type));
291 +
292 + let clamav_socket = self.clamav_socket.clone();
293 + let clamav_path = spool.path().to_path_buf();
294 + let clamav_fut = async move {
295 + match clamav_socket {
296 + Some(socket) => match tokio::fs::File::open(&clamav_path).await {
297 + Ok(file) => clamav::scan_with_clamav_stream(&socket, file).await,
298 + Err(e) => LayerResult {
299 + layer: "clamav",
300 + verdict: LayerVerdict::Error,
301 + detail: Some(format!("open spool for clamav: {e}")),
302 + },
303 + },
304 + None => LayerResult {
305 + layer: "clamav",
306 + verdict: LayerVerdict::Skip,
307 + detail: Some("ClamAV not configured".to_string()),
308 + },
309 + }
310 + };
311 +
312 + let urlhaus_map = std::sync::Arc::clone(&map);
313 + let urlhaus_enabled = self.urlhaus_enabled;
314 + let urlhaus_key = self.abuse_ch_auth_key.clone();
315 + let urlhaus_fut = async move {
316 + if urlhaus_enabled {
317 + urlhaus::check_urlhaus(&urlhaus_map, urlhaus_key.as_deref()).await
318 + } else {
319 + LayerResult {
320 + layer: "urlhaus",
321 + verdict: LayerVerdict::Skip,
322 + detail: Some("URLhaus lookups disabled".to_string()),
323 + }
324 + }
325 + };
326 +
327 + let (sync_result, clamav_result, urlhaus_result) =
328 + tokio::join!(sync_fut, clamav_fut, urlhaus_fut);
329 + let (mut layers, sha256) = sync_result
330 + .expect("scan_stream sync spawn_blocking panicked");
331 + layers.push(clamav_result);
332 + layers.push(urlhaus_result);
333 +
334 + layers.push(if self.malwarebazaar_enabled {
335 + hash_lookup::check_malwarebazaar(&sha256, self.abuse_ch_auth_key.as_deref()).await
336 + } else {
337 + LayerResult {
338 + layer: "malwarebazaar",
339 + verdict: LayerVerdict::Skip,
340 + detail: Some("MalwareBazaar lookups disabled".to_string()),
341 + }
342 + });
343 +
344 + layers.push(if suspicion_present(&layers) {
345 + metadefender::check_metadefender(&sha256, self.metadefender_api_key.as_deref()).await
346 + } else {
347 + LayerResult {
348 + layer: "metadefender",
349 + verdict: LayerVerdict::Skip,
350 + detail: Some("No prior suspicion; second-opinion not invoked".to_string()),
351 + }
352 + });
353 +
354 + let status = final_status(&layers);
355 + drop(map);
356 + drop(spool);
357 +
358 + ScanResult {
359 + status,
360 + layers,
361 + sha256,
362 + file_size,
363 + }
364 + }
365 +
257 366 /// CPU-bound layers + SHA-256. Pure sync; safe to call from `spawn_blocking`.
258 367 fn run_sync_layers(&self, data: &[u8], file_type: FileType) -> (Vec<LayerResult>, String) {
259 368 let mut layers = Vec::with_capacity(5);
@@ -156,28 +156,25 @@ async fn run_pipeline_and_decide(
156 156 job: &ScanJob,
157 157 file_type: FileType,
158 158 ) -> Result<FileScanStatus, Box<dyn std::error::Error + Send + Sync>> {
159 - // Size guard. Larger-than-RAM files don't get scanned in-process; hold
160 - // them for review. Operator can manually clear via the admin dashboard.
161 - if job.file_size_bytes as usize > constants::SCAN_MAX_MEMORY_BYTES {
162 - tracing::warn!(
163 - s3_key = %job.s3_key,
164 - file_size_bytes = job.file_size_bytes,
165 - max = constants::SCAN_MAX_MEMORY_BYTES,
166 - "file too large for in-memory scan; holding for review"
167 - );
168 - return Ok(FileScanStatus::HeldForReview);
169 - }
170 -
171 - // S3 download runs *outside* the scan_semaphore. The permit bounds the
172 - // CPU/clamd-heavy scan phase, not network IO — holding it across the GET
173 - // serializes downloads at SCAN_MAX_CONCURRENT and lets a scan backlog
174 - // starve the DB pool. Worker count is the operational cap on concurrent
175 - // downloads.
176 - let data = ctx.s3.download_object(&job.s3_key).await?;
177 -
178 - let result: ScanResult = {
159 + // Both branches run S3 IO *outside* the scan_semaphore: the permit
160 + // bounds the CPU/clamd-heavy scan phase, not network IO. Holding it
161 + // across the GET serializes downloads at SCAN_MAX_CONCURRENT and lets
162 + // a scan backlog starve the DB pool.
163 + let result: ScanResult = if (job.file_size_bytes as usize) < constants::SCAN_MAX_MEMORY_BYTES {
164 + let data = ctx.s3.download_object(&job.s3_key).await?;
179 165 let _permit = ctx.scan_semaphore.acquire().await?;
180 166 Arc::clone(&ctx.pipeline).scan(data, file_type).await
167 + } else {
168 + let stream = ctx.s3.download_stream(&job.s3_key).await?;
169 + let spool = super::spool::download_into_tempfile(
170 + std::path::Path::new(constants::SCAN_SPOOL_DIR),
171 + &job.s3_key,
172 + job.file_size_bytes as u64,
173 + stream,
174 + )
175 + .await?;
176 + let _permit = ctx.scan_semaphore.acquire().await?;
177 + Arc::clone(&ctx.pipeline).scan_stream(spool, file_type).await
181 178 };
182 179
183 180 db::scanning::insert_scan_result(&ctx.db, &job.s3_key, &result).await?;