|
1 |
+ |
//! Tempfile spool for large scan jobs.
|
|
2 |
+ |
//!
|
|
3 |
+ |
//! Objects above `SCAN_MAX_MEMORY_BYTES` stream from S3 into a tempfile
|
|
4 |
+ |
//! under `SCAN_SPOOL_DIR` rather than buffering into a `Vec<u8>`. The
|
|
5 |
+ |
//! resulting `SpoolHandle` exposes both a stable filesystem path (for
|
|
6 |
+ |
//! layers that need random access — ZIP central directory, YARA) and an
|
|
7 |
+ |
//! `AsyncRead` (for clamd INSTREAM). The file is removed on drop.
|
|
8 |
+ |
//!
|
|
9 |
+ |
//! Nothing in the scan pipeline calls this module yet; it is wired up in
|
|
10 |
+ |
//! later chunks of the scanner-streaming refactor.
|
|
11 |
+ |
|
|
12 |
+ |
use crate::constants::{SCAN_SPOOL_FREE_RESERVE_BYTES, SCAN_SPOOL_MAX_BYTES};
|
|
13 |
+ |
use s3_storage::ByteStream;
|
|
14 |
+ |
use std::path::{Path, PathBuf};
|
|
15 |
+ |
use tokio::fs::{File, OpenOptions};
|
|
16 |
+ |
use tokio::io::AsyncWriteExt;
|
|
17 |
+ |
|
|
18 |
+ |
/// Owned handle to a spooled tempfile. The file is unlinked when the
|
|
19 |
+ |
/// handle drops, even if a layer panics mid-scan.
|
|
20 |
+ |
pub struct SpoolHandle {
|
|
21 |
+ |
path: PathBuf,
|
|
22 |
+ |
}
|
|
23 |
+ |
|
|
24 |
+ |
impl SpoolHandle {
|
|
25 |
+ |
/// On-disk path. Layers that want random access (ZIP, YARA) open this
|
|
26 |
+ |
/// directly. Layers that want a stream open it and pass the `File`.
|
|
27 |
+ |
pub fn path(&self) -> &Path {
|
|
28 |
+ |
&self.path
|
|
29 |
+ |
}
|
|
30 |
+ |
|
|
31 |
+ |
/// Open a fresh read handle at offset 0. Cheap; each consumer gets
|
|
32 |
+ |
/// its own cursor.
|
|
33 |
+ |
pub async fn reader(&self) -> std::io::Result<File> {
|
|
34 |
+ |
OpenOptions::new().read(true).open(&self.path).await
|
|
35 |
+ |
}
|
|
36 |
+ |
}
|
|
37 |
+ |
|
|
38 |
+ |
impl Drop for SpoolHandle {
|
|
39 |
+ |
fn drop(&mut self) {
|
|
40 |
+ |
let _ = std::fs::remove_file(&self.path);
|
|
41 |
+ |
}
|
|
42 |
+ |
}
|
|
43 |
+ |
|
|
44 |
+ |
/// Refuse the job if writing `expected_size` would leave the spool volume
|
|
45 |
+ |
/// with less than `SCAN_SPOOL_FREE_RESERVE_BYTES` free, or if it exceeds
|
|
46 |
+ |
/// the per-object cap.
|
|
47 |
+ |
///
|
|
48 |
+ |
/// TODO: the free-space half of this check requires a `statvfs` call.
|
|
49 |
+ |
/// Pending the user's call on adding a `fs2` (or `nix`) dependency, this
|
|
50 |
+ |
/// only enforces the per-object cap; full free-space enforcement lands
|
|
51 |
+ |
/// with the dep.
|
|
52 |
+ |
pub fn check_free_space(_spool_dir: &Path, expected_size: u64) -> Result<(), String> {
|
|
53 |
+ |
if expected_size > SCAN_SPOOL_MAX_BYTES {
|
|
54 |
+ |
return Err(format!(
|
|
55 |
+ |
"object exceeds scan spool cap ({} > {} bytes)",
|
|
56 |
+ |
expected_size, SCAN_SPOOL_MAX_BYTES
|
|
57 |
+ |
));
|
|
58 |
+ |
}
|
|
59 |
+ |
let _ = SCAN_SPOOL_FREE_RESERVE_BYTES;
|
|
60 |
+ |
Ok(())
|
|
61 |
+ |
}
|
|
62 |
+ |
|
|
63 |
+ |
/// Stream the entire `ByteStream` into a fresh tempfile under `spool_dir`
|
|
64 |
+ |
/// and return a handle. `expected_size` is used for the pre-flight cap
|
|
65 |
+ |
/// check; the actual write enforces nothing beyond the cap (the upload
|
|
66 |
+ |
/// presign already bounded the object size).
|
|
67 |
+ |
pub async fn download_into_tempfile(
|
|
68 |
+ |
spool_dir: &Path,
|
|
69 |
+ |
s3_key: &str,
|
|
70 |
+ |
expected_size: u64,
|
|
71 |
+ |
mut stream: ByteStream,
|
|
72 |
+ |
) -> Result<SpoolHandle, String> {
|
|
73 |
+ |
check_free_space(spool_dir, expected_size)?;
|
|
74 |
+ |
|
|
75 |
+ |
tokio::fs::create_dir_all(spool_dir)
|
|
76 |
+ |
.await
|
|
77 |
+ |
.map_err(|e| format!("create spool dir {}: {e}", spool_dir.display()))?;
|
|
78 |
+ |
|
|
79 |
+ |
let suffix = s3_key.replace('/', "_");
|
|
80 |
+ |
let path = spool_dir.join(format!("scan-{}-{}.tmp", std::process::id(), suffix));
|
|
81 |
+ |
|
|
82 |
+ |
let mut file = OpenOptions::new()
|
|
83 |
+ |
.create_new(true)
|
|
84 |
+ |
.write(true)
|
|
85 |
+ |
.open(&path)
|
|
86 |
+ |
.await
|
|
87 |
+ |
.map_err(|e| format!("open spool tempfile {}: {e}", path.display()))?;
|
|
88 |
+ |
|
|
89 |
+ |
while let Some(chunk) = stream
|
|
90 |
+ |
.try_next()
|
|
91 |
+ |
.await
|
|
92 |
+ |
.map_err(|e| format!("read S3 stream: {e}"))?
|
|
93 |
+ |
{
|
|
94 |
+ |
file.write_all(&chunk)
|
|
95 |
+ |
.await
|
|
96 |
+ |
.map_err(|e| format!("write spool tempfile: {e}"))?;
|
|
97 |
+ |
}
|
|
98 |
+ |
file.flush()
|
|
99 |
+ |
.await
|
|
100 |
+ |
.map_err(|e| format!("flush spool tempfile: {e}"))?;
|
|
101 |
+ |
drop(file);
|
|
102 |
+ |
|
|
103 |
+ |
Ok(SpoolHandle { path })
|
|
104 |
+ |
}
|