//! Tempfile spool for large scan jobs. //! //! Objects above `SCAN_MAX_MEMORY_BYTES` stream from S3 into a tempfile //! under `SCAN_SPOOL_DIR` rather than buffering into a `Vec`. The //! resulting `SpoolHandle` exposes both a stable filesystem path (for //! layers that need random access — ZIP central directory, YARA) and an //! `AsyncRead` (for clamd INSTREAM). The file is removed on drop. //! //! Nothing in the scan pipeline calls this module yet; it is wired up in //! later chunks of the scanner-streaming refactor. use crate::constants::{SCAN_SPOOL_FREE_RESERVE_BYTES, SCAN_SPOOL_MAX_BYTES, SCAN_SPOOL_ORPHAN_AGE_SECS}; use s3_storage::ByteStream; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; /// Memory-map a spooled file for the byte-slice-only layers (structural, /// signing_*). Mmap gives us `&[u8]` backed by the OS page cache: the /// pages get demand-paged in as the parser walks them, so a 500 MB file /// doesn't allocate 500 MB of RSS. /// /// Safety: the caller must guarantee no other process is writing to the /// file while the mapping is alive. Scan-spool tempfiles are written /// exclusively by the scanner before the layer runs and unlinked on /// drop, so no other writer exists. pub fn mmap_read(path: &Path) -> Result { let file = std::fs::File::open(path) .map_err(|e| format!("open spool {}: {e}", path.display()))?; unsafe { memmap2::Mmap::map(&file) } .map_err(|e| format!("mmap spool {}: {e}", path.display())) } /// Owned handle to a spooled tempfile. The file is unlinked when the /// handle drops, even if a layer panics mid-scan. pub struct SpoolHandle { path: PathBuf, } impl SpoolHandle { /// On-disk path. Layers that want random access (ZIP, YARA) open this /// directly. Layers that want a stream open it and pass the `File`. pub fn path(&self) -> &Path { &self.path } /// Open a fresh read handle at offset 0. Cheap; each consumer gets /// its own cursor. pub async fn reader(&self) -> std::io::Result { OpenOptions::new().read(true).open(&self.path).await } } impl Drop for SpoolHandle { fn drop(&mut self) { let _ = std::fs::remove_file(&self.path); } } /// Refuse the job if writing `expected_size` would leave the spool volume /// with less than `SCAN_SPOOL_FREE_RESERVE_BYTES` free, or if it exceeds /// the per-object cap. pub fn check_free_space(spool_dir: &Path, expected_size: u64) -> Result<(), String> { if expected_size > SCAN_SPOOL_MAX_BYTES { return Err(format!( "object exceeds scan spool cap ({} > {} bytes)", expected_size, SCAN_SPOOL_MAX_BYTES )); } let probe = if spool_dir.exists() { spool_dir } else { spool_dir.parent().unwrap_or(Path::new("/")) }; let free = fs2::available_space(probe) .map_err(|e| format!("statvfs {}: {e}", probe.display()))?; if free.saturating_sub(expected_size) < SCAN_SPOOL_FREE_RESERVE_BYTES { return Err(format!( "scan spool volume short on space: free={} expected={} reserve={}", free, expected_size, SCAN_SPOOL_FREE_RESERVE_BYTES )); } Ok(()) } /// Stream the entire `ByteStream` into a fresh tempfile under `spool_dir` /// and return a handle. `expected_size` is used for the pre-flight cap /// check; the actual write enforces nothing beyond the cap (the upload /// presign already bounded the object size). pub async fn download_into_tempfile( spool_dir: &Path, unique: &str, s3_key: &str, expected_size: u64, mut stream: ByteStream, ) -> Result { check_free_space(spool_dir, expected_size)?; tokio::fs::create_dir_all(spool_dir) .await .map_err(|e| format!("create spool dir {}: {e}", spool_dir.display()))?; // `unique` (the scan job's UUID) makes the path collision-free even when two // workers scan the same s3_key concurrently (an item and its version, or a // re-queued job). Keying solely on pid+s3_key let the second `create_new` // fail `AlreadyExists` — failing a legitimately-running job — and let one // scan's `SpoolHandle::drop` unlink the other's live file. The s3_key suffix // is retained only for human-readable debugging. let suffix = s3_key.replace('/', "_"); let path = spool_dir.join(format!("scan-{}-{}-{}.tmp", std::process::id(), unique, suffix)); let mut file = OpenOptions::new() .create_new(true) .write(true) .open(&path) .await .map_err(|e| format!("open spool tempfile {}: {e}", path.display()))?; while let Some(chunk) = stream .try_next() .await .map_err(|e| format!("read S3 stream: {e}"))? { file.write_all(&chunk) .await .map_err(|e| format!("write spool tempfile: {e}"))?; } file.flush() .await .map_err(|e| format!("flush spool tempfile: {e}"))?; drop(file); Ok(SpoolHandle { path }) } /// Reaper outcome — surfaced for logs and metrics. #[derive(Debug, Default, Clone, Copy)] pub struct ReaperReport { pub deleted: u64, pub kept: u64, pub errors: u64, } /// Walk the spool directory and delete every regular file present — /// intended for startup, when no live scan can own anything on disk. /// Missing directory is not an error (a fresh box hasn't created it yet). pub fn reap_all(spool_dir: &Path) -> ReaperReport { reap_predicate(spool_dir, |_meta| true) } /// Walk the spool directory and delete regular files older than the /// orphan-age threshold. Intended for the scheduler's 5-minute tick. pub fn reap_orphans(spool_dir: &Path) -> ReaperReport { let threshold = std::time::Duration::from_secs(SCAN_SPOOL_ORPHAN_AGE_SECS); reap_predicate(spool_dir, |meta| { meta.modified() .ok() .and_then(|m| m.elapsed().ok()) .map(|age| age > threshold) .unwrap_or(false) }) } fn reap_predicate bool>( spool_dir: &Path, should_delete: F, ) -> ReaperReport { let mut report = ReaperReport::default(); let entries = match std::fs::read_dir(spool_dir) { Ok(rd) => rd, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return report, Err(e) => { tracing::warn!(dir = %spool_dir.display(), error = %e, "spool reaper: read_dir failed"); report.errors += 1; return report; } }; for entry in entries.flatten() { let path = entry.path(); let meta = match entry.metadata() { Ok(m) => m, Err(_) => { report.errors += 1; continue; } }; if !meta.is_file() { continue; } if !should_delete(&meta) { report.kept += 1; continue; } match std::fs::remove_file(&path) { Ok(()) => { report.deleted += 1; tracing::info!(path = %path.display(), "spool reaper: deleted orphan"); } Err(e) => { report.errors += 1; tracing::warn!(path = %path.display(), error = %e, "spool reaper: delete failed"); } } } report } #[cfg(test)] mod tests { use super::*; #[test] fn reap_all_deletes_present_files() { let dir = tempfile::tempdir().unwrap(); std::fs::write(dir.path().join("scan-1.tmp"), b"x").unwrap(); std::fs::write(dir.path().join("scan-2.tmp"), b"y").unwrap(); let report = reap_all(dir.path()); assert_eq!(report.deleted, 2); assert_eq!(report.errors, 0); assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 0); } #[test] fn reap_orphans_keeps_recent_files() { let dir = tempfile::tempdir().unwrap(); std::fs::write(dir.path().join("scan-1.tmp"), b"x").unwrap(); let report = reap_orphans(dir.path()); assert_eq!(report.deleted, 0); assert_eq!(report.kept, 1); } #[test] fn reap_handles_missing_directory() { let report = reap_all(std::path::Path::new("/nonexistent/spool/dir/xyz123")); assert_eq!(report.deleted, 0); assert_eq!(report.errors, 0); } #[test] fn spool_handle_drops_file() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("scan-drop.tmp"); std::fs::write(&path, b"data").unwrap(); let handle = SpoolHandle { path: path.clone() }; assert!(path.exists()); drop(handle); assert!(!path.exists()); } }