Skip to main content

max / makenotwork

8.6 KB · 250 lines History Blame Raw
1 //! Tempfile spool for large scan jobs.
2 //!
3 //! Objects above `SCAN_MAX_MEMORY_BYTES` stream from S3 into a tempfile
4 //! under `SCAN_SPOOL_DIR` rather than buffering into a `Vec<u8>`. The
5 //! resulting `SpoolHandle` exposes both a stable filesystem path (for
6 //! layers that need random access — ZIP central directory, YARA) and an
7 //! `AsyncRead` (for clamd INSTREAM). The file is removed on drop.
8 //!
9 //! Nothing in the scan pipeline calls this module yet; it is wired up in
10 //! later chunks of the scanner-streaming refactor.
11
12 use crate::constants::{SCAN_SPOOL_FREE_RESERVE_BYTES, SCAN_SPOOL_MAX_BYTES, SCAN_SPOOL_ORPHAN_AGE_SECS};
13 use s3_storage::ByteStream;
14 use std::path::{Path, PathBuf};
15 use tokio::fs::{File, OpenOptions};
16 use tokio::io::AsyncWriteExt;
17
18 /// Memory-map a spooled file for the byte-slice-only layers (structural,
19 /// signing_*). Mmap gives us `&[u8]` backed by the OS page cache: the
20 /// pages get demand-paged in as the parser walks them, so a 500 MB file
21 /// doesn't allocate 500 MB of RSS.
22 ///
23 /// Safety: the caller must guarantee no other process is writing to the
24 /// file while the mapping is alive. Scan-spool tempfiles are written
25 /// exclusively by the scanner before the layer runs and unlinked on
26 /// drop, so no other writer exists.
27 pub fn mmap_read(path: &Path) -> Result<memmap2::Mmap, String> {
28 let file = std::fs::File::open(path)
29 .map_err(|e| format!("open spool {}: {e}", path.display()))?;
30 unsafe { memmap2::Mmap::map(&file) }
31 .map_err(|e| format!("mmap spool {}: {e}", path.display()))
32 }
33
/// Owned handle to a spooled tempfile.
///
/// The handle's `Drop` implementation unlinks the file at `path`, so the
/// tempfile goes away whenever the handle does — including while unwinding
/// from a panic in a scan layer.
///
/// `Debug` is derived so the handle can appear in log output and inside other
/// `Debug`-derived types.
#[derive(Debug)]
pub struct SpoolHandle {
    /// Location of the spooled tempfile on disk.
    path: PathBuf,
}
39
40 impl SpoolHandle {
41 /// On-disk path. Layers that want random access (ZIP, YARA) open this
42 /// directly. Layers that want a stream open it and pass the `File`.
43 pub fn path(&self) -> &Path {
44 &self.path
45 }
46
47 /// Open a fresh read handle at offset 0. Cheap; each consumer gets
48 /// its own cursor.
49 pub async fn reader(&self) -> std::io::Result<File> {
50 OpenOptions::new().read(true).open(&self.path).await
51 }
52 }
53
54 impl Drop for SpoolHandle {
55 fn drop(&mut self) {
56 let _ = std::fs::remove_file(&self.path);
57 }
58 }
59
60 /// Refuse the job if writing `expected_size` would leave the spool volume
61 /// with less than `SCAN_SPOOL_FREE_RESERVE_BYTES` free, or if it exceeds
62 /// the per-object cap.
63 pub fn check_free_space(spool_dir: &Path, expected_size: u64) -> Result<(), String> {
64 if expected_size > SCAN_SPOOL_MAX_BYTES {
65 return Err(format!(
66 "object exceeds scan spool cap ({} > {} bytes)",
67 expected_size, SCAN_SPOOL_MAX_BYTES
68 ));
69 }
70 let probe = if spool_dir.exists() {
71 spool_dir
72 } else {
73 spool_dir.parent().unwrap_or(Path::new("/"))
74 };
75 let free = fs2::available_space(probe)
76 .map_err(|e| format!("statvfs {}: {e}", probe.display()))?;
77 if free.saturating_sub(expected_size) < SCAN_SPOOL_FREE_RESERVE_BYTES {
78 return Err(format!(
79 "scan spool volume short on space: free={} expected={} reserve={}",
80 free, expected_size, SCAN_SPOOL_FREE_RESERVE_BYTES
81 ));
82 }
83 Ok(())
84 }
85
86 /// Stream the entire `ByteStream` into a fresh tempfile under `spool_dir`
87 /// and return a handle. `expected_size` is used for the pre-flight cap
88 /// check; the actual write enforces nothing beyond the cap (the upload
89 /// presign already bounded the object size).
90 pub async fn download_into_tempfile(
91 spool_dir: &Path,
92 unique: &str,
93 s3_key: &str,
94 expected_size: u64,
95 mut stream: ByteStream,
96 ) -> Result<SpoolHandle, String> {
97 check_free_space(spool_dir, expected_size)?;
98
99 tokio::fs::create_dir_all(spool_dir)
100 .await
101 .map_err(|e| format!("create spool dir {}: {e}", spool_dir.display()))?;
102
103 // `unique` (the scan job's UUID) makes the path collision-free even when two
104 // workers scan the same s3_key concurrently (an item and its version, or a
105 // re-queued job). Keying solely on pid+s3_key let the second `create_new`
106 // fail `AlreadyExists` — failing a legitimately-running job — and let one
107 // scan's `SpoolHandle::drop` unlink the other's live file. The s3_key suffix
108 // is retained only for human-readable debugging.
109 let suffix = s3_key.replace('/', "_");
110 let path = spool_dir.join(format!("scan-{}-{}-{}.tmp", std::process::id(), unique, suffix));
111
112 let mut file = OpenOptions::new()
113 .create_new(true)
114 .write(true)
115 .open(&path)
116 .await
117 .map_err(|e| format!("open spool tempfile {}: {e}", path.display()))?;
118
119 while let Some(chunk) = stream
120 .try_next()
121 .await
122 .map_err(|e| format!("read S3 stream: {e}"))?
123 {
124 file.write_all(&chunk)
125 .await
126 .map_err(|e| format!("write spool tempfile: {e}"))?;
127 }
128 file.flush()
129 .await
130 .map_err(|e| format!("flush spool tempfile: {e}"))?;
131 drop(file);
132
133 Ok(SpoolHandle { path })
134 }
135
/// Reaper outcome — surfaced for logs and metrics.
///
/// All counters start at zero via `Default`. The type is plain data, so it
/// also derives `PartialEq`/`Eq` to let callers and tests compare whole
/// reports instead of field by field.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReaperReport {
    /// Number of files successfully removed.
    pub deleted: u64,
    /// Number of regular files the deletion predicate chose to keep.
    pub kept: u64,
    /// Number of failures while listing, inspecting, or deleting entries.
    pub errors: u64,
}
143
144 /// Walk the spool directory and delete every regular file present —
145 /// intended for startup, when no live scan can own anything on disk.
146 /// Missing directory is not an error (a fresh box hasn't created it yet).
147 pub fn reap_all(spool_dir: &Path) -> ReaperReport {
148 reap_predicate(spool_dir, |_meta| true)
149 }
150
151 /// Walk the spool directory and delete regular files older than the
152 /// orphan-age threshold. Intended for the scheduler's 5-minute tick.
153 pub fn reap_orphans(spool_dir: &Path) -> ReaperReport {
154 let threshold = std::time::Duration::from_secs(SCAN_SPOOL_ORPHAN_AGE_SECS);
155 reap_predicate(spool_dir, |meta| {
156 meta.modified()
157 .ok()
158 .and_then(|m| m.elapsed().ok())
159 .map(|age| age > threshold)
160 .unwrap_or(false)
161 })
162 }
163
164 fn reap_predicate<F: Fn(&std::fs::Metadata) -> bool>(
165 spool_dir: &Path,
166 should_delete: F,
167 ) -> ReaperReport {
168 let mut report = ReaperReport::default();
169 let entries = match std::fs::read_dir(spool_dir) {
170 Ok(rd) => rd,
171 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return report,
172 Err(e) => {
173 tracing::warn!(dir = %spool_dir.display(), error = %e, "spool reaper: read_dir failed");
174 report.errors += 1;
175 return report;
176 }
177 };
178 for entry in entries.flatten() {
179 let path = entry.path();
180 let meta = match entry.metadata() {
181 Ok(m) => m,
182 Err(_) => {
183 report.errors += 1;
184 continue;
185 }
186 };
187 if !meta.is_file() {
188 continue;
189 }
190 if !should_delete(&meta) {
191 report.kept += 1;
192 continue;
193 }
194 match std::fs::remove_file(&path) {
195 Ok(()) => {
196 report.deleted += 1;
197 tracing::info!(path = %path.display(), "spool reaper: deleted orphan");
198 }
199 Err(e) => {
200 report.errors += 1;
201 tracing::warn!(path = %path.display(), error = %e, "spool reaper: delete failed");
202 }
203 }
204 }
205 report
206 }
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `bytes` to `name` inside `dir` and returns the file's full path.
    fn seed(dir: &std::path::Path, name: &str, bytes: &[u8]) -> std::path::PathBuf {
        let target = dir.join(name);
        std::fs::write(&target, bytes).unwrap();
        target
    }

    /// `reap_all` removes every file and leaves the directory empty.
    #[test]
    fn reap_all_deletes_present_files() {
        let spool = tempfile::tempdir().unwrap();
        seed(spool.path(), "scan-1.tmp", b"x");
        seed(spool.path(), "scan-2.tmp", b"y");

        let outcome = reap_all(spool.path());

        assert_eq!(outcome.deleted, 2);
        assert_eq!(outcome.errors, 0);
        let leftovers = std::fs::read_dir(spool.path()).unwrap().count();
        assert_eq!(leftovers, 0);
    }

    /// A file written just now is younger than the orphan threshold and stays.
    #[test]
    fn reap_orphans_keeps_recent_files() {
        let spool = tempfile::tempdir().unwrap();
        seed(spool.path(), "scan-1.tmp", b"x");

        let outcome = reap_orphans(spool.path());

        assert_eq!(outcome.deleted, 0);
        assert_eq!(outcome.kept, 1);
    }

    /// A spool directory that does not exist produces an empty, error-free report.
    #[test]
    fn reap_handles_missing_directory() {
        let absent = std::path::Path::new("/nonexistent/spool/dir/xyz123");

        let outcome = reap_all(absent);

        assert_eq!(outcome.deleted, 0);
        assert_eq!(outcome.errors, 0);
    }

    /// Dropping a `SpoolHandle` unlinks the file it points at.
    #[test]
    fn spool_handle_drops_file() {
        let spool = tempfile::tempdir().unwrap();
        let target = seed(spool.path(), "scan-drop.tmp", b"data");

        {
            let _handle = SpoolHandle { path: target.clone() };
            assert!(target.exists());
        }

        assert!(!target.exists());
    }
}
250