//! Async scan worker. //! //! Spawned at startup from `main.rs`. Drains `scan_jobs` via //! `db::scan_jobs::claim_next` and runs each job through the pipeline. On //! completion, updates the target entity's `scan_status` (for entities that //! have one) and creates a WAM ticket on `Quarantined`. //! //! See `docs/scan-pipeline-audit.md` § 4.4 for the architecture. use std::sync::Arc; use std::time::Duration; use sqlx::PgPool; use tokio::sync::Semaphore; use uuid::Uuid; use crate::constants; use crate::db::{self, scan_jobs::{ScanJob, ScanTargetKind}, FileScanStatus, ItemId, VersionId}; use crate::storage::{FileType, StorageBackend}; use crate::wam_client::WamClient; use super::{LayerVerdict, ScanPipeline, ScanResult}; /// Worker poll interval when the queue is empty. const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(500); /// How long a `running` job can sit before the reaper resets it to `queued`. /// A scan that legitimately takes longer than this is an outlier and warrants /// admin attention anyway. const STUCK_JOB_SECS: i64 = 300; /// Cadence at which any worker tries to reap stuck jobs. const REAPER_INTERVAL: Duration = Duration::from_secs(60); /// Shared dependencies the worker pool needs. pub struct WorkerContext { pub db: PgPool, pub s3: Arc, pub pipeline: Arc, pub scan_semaphore: Arc, pub wam: Option, } /// Spawn `n` scan workers on the current tokio runtime. Each worker drains /// `scan_jobs` independently with FOR UPDATE SKIP LOCKED. A single reaper /// task per pool resets jobs that get stuck in `running`. /// /// All tasks observe `shutdown_rx`: when the sender is dropped (or the value /// changes), they exit on their next idle cycle. 
pub fn spawn_pool(n: usize, ctx: Arc<WorkerContext>, shutdown_rx: tokio::sync::watch::Receiver<()>) {
    // Back-off applied after `claim_next` itself fails (e.g. the database is
    // unreachable), so a broken connection doesn't turn into a hot loop.
    const CLAIM_ERROR_BACKOFF: Duration = Duration::from_secs(5);

    for worker_id in 0..n {
        let ctx = Arc::clone(&ctx);
        let mut shutdown_rx = shutdown_rx.clone();
        tokio::spawn(async move {
            tracing::info!(worker_id, "scan worker started");
            loop {
                match db::scan_jobs::claim_next(&ctx.db).await {
                    Ok(Some(job)) => {
                        let job_id = job.id;
                        if let Err(e) = process_job(&ctx, job).await {
                            tracing::error!(worker_id, %job_id, error = %e, "scan job failed");
                            if let Err(e2) =
                                db::scan_jobs::mark_failed(&ctx.db, job_id, &e.to_string()).await
                            {
                                tracing::error!(worker_id, %job_id, error = %e2, "failed to mark job failed");
                            }
                        }
                        // No shutdown check here on purpose: a busy worker keeps
                        // draining and only observes shutdown once it goes idle.
                    }
                    Ok(None) => {
                        if sleep_or_shutdown(&mut shutdown_rx, IDLE_POLL_INTERVAL).await {
                            tracing::info!(worker_id, "scan worker shutting down");
                            break;
                        }
                    }
                    Err(e) => {
                        tracing::error!(worker_id, error = %e, "claim_next failed; backing off");
                        if sleep_or_shutdown(&mut shutdown_rx, CLAIM_ERROR_BACKOFF).await {
                            tracing::info!(worker_id, "scan worker shutting down");
                            break;
                        }
                    }
                }
            }
        });
    }

    // One reaper per pool; it takes ownership of the caller's receiver.
    let ctx_reaper = Arc::clone(&ctx);
    let mut shutdown_rx = shutdown_rx;
    tokio::spawn(async move {
        loop {
            match db::scan_jobs::reap_stuck(&ctx_reaper.db, STUCK_JOB_SECS).await {
                Ok(count) if count > 0 => {
                    tracing::warn!(reset = count, max_age_secs = STUCK_JOB_SECS, "reset stuck scan jobs");
                }
                Ok(_) => {}
                Err(e) => tracing::error!(error = %e, "scan job reaper failed"),
            }
            if sleep_or_shutdown(&mut shutdown_rx, REAPER_INTERVAL).await {
                break;
            }
        }
    });
}

/// Sleep for `dur`, or return early when shutdown is signalled.
///
/// Returns `true` when the task should exit. Shutdown is signalled either by
/// a value being sent on the watch channel or by the sender being dropped, so
/// both outcomes of `changed()` count. Previously only the dropped-sender
/// case was honoured and an explicit `send(())` was silently consumed.
async fn sleep_or_shutdown(
    shutdown_rx: &mut tokio::sync::watch::Receiver<()>,
    dur: Duration,
) -> bool {
    tokio::select! {
        _ = tokio::time::sleep(dur) => false,
        _ = shutdown_rx.changed() => true,
    }
}

/// Test/dev helper: claim and process at most one queued scan job synchronously.
/// Returns `Ok(true)` when a job ran, `Ok(false)` when the queue was empty.
/// Mirrors `spawn_pool`'s per-iteration logic without spawning a background
/// task, so integration tests can deterministically drain the queue between
/// upload-confirm and assertion.
pub async fn process_next_for_test(ctx: &WorkerContext) -> Result> { match db::scan_jobs::claim_next(&ctx.db).await? { Some(job) => { let job_id = job.id; if let Err(e) = process_job(ctx, job).await { db::scan_jobs::mark_failed(&ctx.db, job_id, &e.to_string()).await?; return Err(e); } Ok(true) } None => Ok(false), } } /// Run a single scan job end-to-end. On success the job is marked done; the /// caller marks failed if this returns an error. /// /// On pipeline error (e.g. S3 download failure), reset the entity from /// Scanning back to HeldForReview before bubbling the error up. Otherwise /// the entity stays stuck at Scanning forever — a real regression we hit /// in production with stale s3_keys. #[tracing::instrument(skip_all, fields(%job_id = job.id, target_kind = %job.target_kind, %target_id = job.target_id, attempts = job.attempts))] async fn process_job(ctx: &WorkerContext, job: ScanJob) -> Result<(), Box> { let job_id = job.id; let kind = job.typed_kind().ok_or_else(|| format!("unknown target_kind: {}", job.target_kind))?; let file_type = job.typed_file_type().ok_or_else(|| format!("unknown file_type: {}", job.file_type))?; let target_id = job.target_id; // Mark target as Scanning while the worker is running (only entities with // a scan_status column). This is a visible signal in the admin dashboard // queue panel. update_entity_status(&ctx.db, kind, target_id, FileScanStatus::Scanning).await.ok(); let entity_status = match run_pipeline_and_decide(ctx, &job, kind, file_type).await { Ok(s) => s, Err(e) => { // Pipeline blew up — most often a stale s3_key. Reset entity to // HeldForReview so admins see it on the dashboard and decide // whether to delete the orphan record. 
update_entity_status(&ctx.db, kind, target_id, FileScanStatus::HeldForReview).await.ok(); return Err(e); } }; update_entity_status(&ctx.db, kind, target_id, entity_status).await?; db::scan_jobs::mark_done(&ctx.db, job_id).await?; Ok(()) } /// Run the pipeline against the S3 object and return the entity status to /// apply, honoring the size guard, trust gate, and WAM ticketing. async fn run_pipeline_and_decide( ctx: &WorkerContext, job: &ScanJob, kind: ScanTargetKind, file_type: FileType, ) -> Result> { // Two paths, gated on file size. Small files go through the original // buffered `Pipeline::scan(Vec)`: a single S3 GET into a heap // buffer, then layers walk the slice. Big files (>= SCAN_MAX_MEMORY_BYTES) // stream from S3 into a tempfile under SCAN_SPOOL_DIR, then layers // run against the spooled path (mmap or streamed). The buffered path // stays alive: it's the hot path for tip-jar avatars / small audio / // download files, and avoiding the tempfile syscall + write matters at // that scale. Both branches run S3 IO *outside* the scan_semaphore: // the permit bounds the CPU/clamd-heavy scan phase, not network IO. // Holding it across the GET serializes downloads at SCAN_MAX_CONCURRENT // and lets a scan backlog starve the DB pool. 
let result: ScanResult = if (job.file_size_bytes as usize) < constants::SCAN_MAX_MEMORY_BYTES { let data = ctx.s3.download_object(&job.s3_key).await?; let _permit = ctx.scan_semaphore.acquire().await?; Arc::clone(&ctx.pipeline).scan(data, file_type).await } else { let stream = ctx.s3.download_stream(&job.s3_key).await?; let spool = super::spool::download_into_tempfile( std::path::Path::new(constants::SCAN_SPOOL_DIR), &job.id.to_string(), &job.s3_key, job.file_size_bytes as u64, stream, ) .await?; let _permit = ctx.scan_semaphore.acquire().await?; Arc::clone(&ctx.pipeline).scan_stream(spool, file_type).await }; db::scanning::insert_scan_result(&ctx.db, &job.s3_key, &result).await?; if result.status == FileScanStatus::Quarantined { let failed_layers: Vec<&str> = result.layers.iter() .filter(|l| l.verdict == LayerVerdict::Fail) .map(|l| l.layer) .collect(); if let Some(ref wam) = ctx.wam { let title = format!("File quarantined: {}", job.s3_key); let body = format!( "Upload by user {} flagged as malicious.\n\ Failed layers: {}\nFile type: {file_type:?}\nSize: {}", job.user_id, failed_layers.join(", "), job.file_size_bytes, ); wam.create_ticket(&title, Some(&body), "high", "malware-quarantine", Some(&job.s3_key)).await; } // CDN-served image kinds (project/gallery/content-insertion) have no // scan_status column and no app-proxied download route to gate on, so a // verdict has to be enforced by removing the content, not by flipping a // status. Two-step enforcement: // // 1. Delete the DB image row. This stops the app from ever rendering // the (Cloudflare-served) URL again AND makes the s3_key non-live, // so the durable-deletion queue below won't park it behind the // `is_s3_key_live` guard. // 2. Delete the S3 object; on failure, enqueue it for durable retry // (the scheduler worker retries to completion and dead-letters on // exhaustion) rather than a single best-effort attempt that could // leave the origin object behind forever. 
// // Residual exposure: Cloudflare caches these objects immutably for a // year, so an already-edge-cached copy survives origin deletion until // the cache TTL lapses. Purging the edge cache needs the Cloudflare API // (token + zone), which the app doesn't currently hold — the WAM ticket // above is the manual trigger for that. The verdict + failed layers stay // in file_scan_results for admin review. if kind.is_cdn_served_without_gate() { match db::scanning::purge_cdn_image_rows_by_key(&ctx.db, &job.s3_key).await { Ok(n) => tracing::warn!( s3_key = %job.s3_key, target_kind = %kind.as_str(), rows_removed = n, "removed quarantined CDN-served image row(s); URL is no longer rendered" ), Err(e) => tracing::error!( s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e, "FAILED to remove quarantined image row(s); the URL may still render until manual removal" ), } match ctx.s3.delete_object(&job.s3_key).await { Ok(()) => tracing::warn!( s3_key = %job.s3_key, target_kind = %kind.as_str(), "purged quarantined CDN-served image object from storage" ), Err(e) => { tracing::error!( s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e, "immediate purge of quarantined image object failed; enqueuing for durable retry" ); if let Err(enqueue_err) = db::pending_s3_deletions::enqueue_deletions( &ctx.db, &[(job.s3_key.clone(), "main".to_string())], "malware_quarantine", ) .await { tracing::error!( s3_key = %job.s3_key, error = %enqueue_err, "FAILED to enqueue quarantined image object for durable deletion; object may persist at origin until manual removal" ); } } } } return Ok(FileScanStatus::Quarantined); } // Pipeline returned Clean or HeldForReview. Apply the uploader-trust // overlay: untrusted users always route to admin review even on a clean // scan. This preserves the pre-async semantics — the architecture changed, // not the policy. 
let is_trusted = db::users::is_upload_trusted(&ctx.db, job.user_id).await?; Ok(if is_trusted { FileScanStatus::Clean } else { FileScanStatus::HeldForReview }) } /// Update the per-entity `scan_status` column for the kinds that have one. /// `ProjectImage` / `ContentInsertion` don't carry their own column today — /// the worker still scanned the file and recorded results, but there's no /// status to flip on those entities. async fn update_entity_status( db: &PgPool, kind: ScanTargetKind, target_id: Uuid, status: FileScanStatus, ) -> Result<(), sqlx::Error> { match kind { ScanTargetKind::Version => { db::scanning::update_version_scan_status(db, VersionId::from(target_id), status).await } ScanTargetKind::Item | ScanTargetKind::ItemImage => { db::scanning::update_item_scan_status(db, ItemId::from(target_id), status).await } ScanTargetKind::Media => { db::scanning::update_media_file_scan_status( db, db::MediaFileId::from(target_id), status, ) .await } ScanTargetKind::ProjectImage | ScanTargetKind::GalleryImage | ScanTargetKind::ContentInsertion => Ok(()), } }