max / makenotwork
6 files changed,
+164 insertions,
-2 deletions
| @@ -35,6 +35,12 @@ ProtectSystem=strict | |||
| 35 | 35 | ProtectHome=true | |
| 36 | 36 | PrivateTmp=true | |
| 37 | 37 | ReadWritePaths=/opt/makenotwork | |
| 38 | + | # Scan-spool tempfiles for streaming large uploads through the malware | |
| 39 | + | # pipeline. systemd creates /var/lib/makenotwork/scan-spool, chowns to | |
| 40 | + | # the service user, and adds it to ReadWritePaths automatically. Path | |
| 41 | + | # is mirrored in `constants::SCAN_SPOOL_DIR`. | |
| 42 | + | StateDirectory=makenotwork/scan-spool | |
| 43 | + | StateDirectoryMode=0700 | |
| 38 | 44 | RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6 | |
| 39 | 45 | RestrictNamespaces=true | |
| 40 | 46 | RestrictRealtime=true |
| @@ -163,7 +163,12 @@ pub const SCAN_JOB_RETENTION_DAYS: u32 = 30; | |||
| 163 | 163 | /// before invoking path/stream-based layer entries. On production, systemd | |
| 164 | 164 | /// provisions this via `StateDirectory=makenotwork/scan-spool` so the path resolves | |
| 165 | 165 | /// to `/var/lib/makenotwork/scan-spool`. Override with `MNW_SCAN_SPOOL_DIR` for dev. | |
| 166 | - | pub const SCAN_SPOOL_DIR: &str = "/var/lib/mnw/scan-spool"; | |
| 166 | + | pub const SCAN_SPOOL_DIR: &str = "/var/lib/makenotwork/scan-spool"; | |
| 167 | + | /// Files in `SCAN_SPOOL_DIR` older than this are considered orphaned | |
| 168 | + | /// (a panic, OOM, or hard kill left them behind) and reaped on the | |
| 169 | + | /// next sweep. RAII drop in `SpoolHandle` covers the live path; this | |
| 170 | + | /// covers process-death. | |
| 171 | + | pub const SCAN_SPOOL_ORPHAN_AGE_SECS: u64 = 3600; | |
| 167 | 172 | /// Hard cap on a single spooled object. Above this, the scanner refuses | |
| 168 | 173 | /// the job rather than risk filling the volume. 8 GiB matches the largest | |
| 169 | 174 | /// payload the upload tier currently allows. |
| @@ -342,6 +342,17 @@ async fn main() { | |||
| 342 | 342 | let worker_shutdown_rx = shutdown_tx.subscribe(); | |
| 343 | 343 | makenotwork::scanning::worker::spawn_pool(worker_count, scan_ctx, worker_shutdown_rx); | |
| 344 | 344 | tracing::info!(worker_count, "scan worker pool started"); | |
| 345 | + | ||
| 346 | + | let report = makenotwork::scanning::spool::reap_all( | |
| 347 | + | std::path::Path::new(makenotwork::constants::SCAN_SPOOL_DIR), | |
| 348 | + | ); | |
| 349 | + | if report.deleted > 0 || report.errors > 0 { | |
| 350 | + | tracing::info!( | |
| 351 | + | deleted = report.deleted, | |
| 352 | + | errors = report.errors, | |
| 353 | + | "scan spool startup reaper completed" | |
| 354 | + | ); | |
| 355 | + | } | |
| 345 | 356 | } | |
| 346 | 357 | ||
| 347 | 358 | // Build router (shared with integration tests via lib.rs) |
| @@ -498,6 +498,8 @@ struct ScanPipelineHealth { | |||
| 498 | 498 | held_media: i64, | |
| 499 | 499 | held_total: i64, | |
| 500 | 500 | layers: Vec<LayerHealthJson>, | |
| 501 | + | scan_spool_free_bytes: u64, | |
| 502 | + | scan_spool_file_count: u64, | |
| 501 | 503 | generated_at: String, | |
| 502 | 504 | } | |
| 503 | 505 | ||
| @@ -532,6 +534,17 @@ pub(super) async fn scan_health_json( | |||
| 532 | 534 | } | |
| 533 | 535 | }).collect(); | |
| 534 | 536 | ||
| 537 | + | let spool_dir = std::path::Path::new(crate::constants::SCAN_SPOOL_DIR); | |
| 538 | + | let scan_spool_free_bytes = fs2::available_space(if spool_dir.exists() { | |
| 539 | + | spool_dir | |
| 540 | + | } else { | |
| 541 | + | spool_dir.parent().unwrap_or(std::path::Path::new("/")) | |
| 542 | + | }) | |
| 543 | + | .unwrap_or(0); | |
| 544 | + | let scan_spool_file_count = std::fs::read_dir(spool_dir) | |
| 545 | + | .map(|rd| rd.flatten().filter(|e| e.file_type().map(|t| t.is_file()).unwrap_or(false)).count() as u64) | |
| 546 | + | .unwrap_or(0); | |
| 547 | + | ||
| 535 | 548 | let body = ScanPipelineHealth { | |
| 536 | 549 | queue_pending, | |
| 537 | 550 | queue_running, | |
| @@ -541,6 +554,8 @@ pub(super) async fn scan_health_json( | |||
| 541 | 554 | held_media: held.held_media, | |
| 542 | 555 | held_total, | |
| 543 | 556 | layers, | |
| 557 | + | scan_spool_free_bytes, | |
| 558 | + | scan_spool_file_count, | |
| 544 | 559 | generated_at: now.to_rfc3339(), | |
| 545 | 560 | }; | |
| 546 | 561 | Ok(axum::Json(body)) |
| @@ -9,7 +9,7 @@ | |||
| 9 | 9 | //! Nothing in the scan pipeline calls this module yet; it is wired up in | |
| 10 | 10 | //! later chunks of the scanner-streaming refactor. | |
| 11 | 11 | ||
| 12 | - | use crate::constants::{SCAN_SPOOL_FREE_RESERVE_BYTES, SCAN_SPOOL_MAX_BYTES}; | |
| 12 | + | use crate::constants::{SCAN_SPOOL_FREE_RESERVE_BYTES, SCAN_SPOOL_MAX_BYTES, SCAN_SPOOL_ORPHAN_AGE_SECS}; | |
| 13 | 13 | use s3_storage::ByteStream; | |
| 14 | 14 | use std::path::{Path, PathBuf}; | |
| 15 | 15 | use tokio::fs::{File, OpenOptions}; | |
| @@ -125,3 +125,118 @@ pub async fn download_into_tempfile( | |||
| 125 | 125 | ||
| 126 | 126 | Ok(SpoolHandle { path }) | |
| 127 | 127 | } | |
| 128 | + | ||
/// Outcome of one reaper pass over the spool directory, surfaced for logs
/// and metrics.
///
/// `Default` yields an all-zero report, which is also what a pass over a
/// missing directory returns.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReaperReport {
    /// Regular files that were successfully removed.
    pub deleted: u64,
    /// Regular files the predicate chose to leave in place.
    pub kept: u64,
    /// Failures encountered while listing, inspecting, or removing entries.
    pub errors: u64,
}
| 136 | + | ||
| 137 | + | /// Walk the spool directory and delete every regular file present — | |
| 138 | + | /// intended for startup, when no live scan can own anything on disk. | |
| 139 | + | /// Missing directory is not an error (a fresh box hasn't created it yet). | |
| 140 | + | pub fn reap_all(spool_dir: &Path) -> ReaperReport { | |
| 141 | + | reap_predicate(spool_dir, |_meta| true) | |
| 142 | + | } | |
| 143 | + | ||
| 144 | + | /// Walk the spool directory and delete regular files older than the | |
| 145 | + | /// orphan-age threshold. Intended for the scheduler's 5-minute tick. | |
| 146 | + | pub fn reap_orphans(spool_dir: &Path) -> ReaperReport { | |
| 147 | + | let threshold = std::time::Duration::from_secs(SCAN_SPOOL_ORPHAN_AGE_SECS); | |
| 148 | + | reap_predicate(spool_dir, |meta| { | |
| 149 | + | meta.modified() | |
| 150 | + | .ok() | |
| 151 | + | .and_then(|m| m.elapsed().ok()) | |
| 152 | + | .map(|age| age > threshold) | |
| 153 | + | .unwrap_or(false) | |
| 154 | + | }) | |
| 155 | + | } | |
| 156 | + | ||
| 157 | + | fn reap_predicate<F: Fn(&std::fs::Metadata) -> bool>( | |
| 158 | + | spool_dir: &Path, | |
| 159 | + | should_delete: F, | |
| 160 | + | ) -> ReaperReport { | |
| 161 | + | let mut report = ReaperReport::default(); | |
| 162 | + | let entries = match std::fs::read_dir(spool_dir) { | |
| 163 | + | Ok(rd) => rd, | |
| 164 | + | Err(e) if e.kind() == std::io::ErrorKind::NotFound => return report, | |
| 165 | + | Err(e) => { | |
| 166 | + | tracing::warn!(dir = %spool_dir.display(), error = %e, "spool reaper: read_dir failed"); | |
| 167 | + | report.errors += 1; | |
| 168 | + | return report; | |
| 169 | + | } | |
| 170 | + | }; | |
| 171 | + | for entry in entries.flatten() { | |
| 172 | + | let path = entry.path(); | |
| 173 | + | let meta = match entry.metadata() { | |
| 174 | + | Ok(m) => m, | |
| 175 | + | Err(_) => { | |
| 176 | + | report.errors += 1; | |
| 177 | + | continue; | |
| 178 | + | } | |
| 179 | + | }; | |
| 180 | + | if !meta.is_file() { | |
| 181 | + | continue; | |
| 182 | + | } | |
| 183 | + | if !should_delete(&meta) { | |
| 184 | + | report.kept += 1; | |
| 185 | + | continue; | |
| 186 | + | } | |
| 187 | + | match std::fs::remove_file(&path) { | |
| 188 | + | Ok(()) => { | |
| 189 | + | report.deleted += 1; | |
| 190 | + | tracing::info!(path = %path.display(), "spool reaper: deleted orphan"); | |
| 191 | + | } | |
| 192 | + | Err(e) => { | |
| 193 | + | report.errors += 1; | |
| 194 | + | tracing::warn!(path = %path.display(), error = %e, "spool reaper: delete failed"); | |
| 195 | + | } | |
| 196 | + | } | |
| 197 | + | } | |
| 198 | + | report | |
| 199 | + | } | |
| 200 | + | ||
#[cfg(test)]
mod tests {
    use super::*;

    /// The startup sweep removes every regular file in the directory.
    #[test]
    fn reap_all_deletes_present_files() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("scan-1.tmp"), b"x").unwrap();
        std::fs::write(dir.path().join("scan-2.tmp"), b"y").unwrap();
        let report = reap_all(dir.path());
        assert_eq!(report.deleted, 2);
        assert_eq!(report.errors, 0);
        assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 0);
    }

    /// Non-file entries are neither deleted nor counted.
    #[test]
    fn reap_all_skips_subdirectories() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("nested");
        std::fs::create_dir(&nested).unwrap();
        let report = reap_all(dir.path());
        assert_eq!(report.deleted, 0);
        assert_eq!(report.kept, 0);
        assert_eq!(report.errors, 0);
        assert!(nested.is_dir());
    }

    /// A file written just now is younger than the orphan threshold.
    #[test]
    fn reap_orphans_keeps_recent_files() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("scan-1.tmp"), b"x").unwrap();
        let report = reap_orphans(dir.path());
        assert_eq!(report.deleted, 0);
        assert_eq!(report.kept, 1);
    }

    /// A spool directory that does not exist is not an error.
    #[test]
    fn reap_handles_missing_directory() {
        let report = reap_all(std::path::Path::new("/nonexistent/spool/dir/xyz123"));
        assert_eq!(report.deleted, 0);
        assert_eq!(report.errors, 0);
    }

    /// Dropping a `SpoolHandle` removes the file it points at.
    #[test]
    fn spool_handle_drops_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("scan-drop.tmp");
        std::fs::write(&path, b"data").unwrap();
        let handle = SpoolHandle { path: path.clone() };
        assert!(path.exists());
        drop(handle);
        assert!(!path.exists());
    }
}
| @@ -186,6 +186,16 @@ pub fn spawn_scheduler( | |||
| 186 | 186 | if run_sandbox { | |
| 187 | 187 | cleanup::cleanup_sandbox_accounts(&state).await; | |
| 188 | 188 | cleanup::retry_pending_s3_deletions(&state).await; | |
| 189 | + | let report = crate::scanning::spool::reap_orphans( | |
| 190 | + | std::path::Path::new(constants::SCAN_SPOOL_DIR), | |
| 191 | + | ); | |
| 192 | + | if report.deleted > 0 || report.errors > 0 { | |
| 193 | + | tracing::info!( | |
| 194 | + | deleted = report.deleted, | |
| 195 | + | errors = report.errors, | |
| 196 | + | "scan spool reaper swept orphans" | |
| 197 | + | ); | |
| 198 | + | } | |
| 189 | 199 | } | |
| 190 | 200 | ||
| 191 | 201 | // Retry failed webhook events |