//! Async scan-job queue for the malware pipeline.
//!
//! Upload routes call [`enqueue`] to register a scan job and return their
//! request to the client. A pool of scan workers (`crate::scanning::worker`)
//! drains the queue with `FOR UPDATE SKIP LOCKED` via [`claim_next`], runs the
//! pipeline against the S3 object, and finalizes the job with [`mark_done`] or
//! [`mark_failed`].
//!
//! See `docs/scan-pipeline-audit.md` for the wider architecture.

use chrono::{DateTime, Utc};
use sqlx::{FromRow, PgPool};
use uuid::Uuid;

use crate::storage::FileType;

use super::UserId;

/// The entity whose `scan_status` the worker should update when the scan
/// completes.
///
/// `Item`, `Version`, and `Media` have `scan_status` columns. `ProjectImage`,
/// `GalleryImage`, and `ContentInsertion` do not (see
/// [`ScanTargetKind::is_cdn_served_without_gate`]); for those, the worker
/// still scans (recording results in `file_scan_results`) and acts on
/// `Quarantined` by creating a WAM ticket for admin follow-up and purging the
/// stored object.
///
/// NOTE(review): `ItemImage` appears in neither group above and is not
/// reported by `is_cdn_served_without_gate` — confirm whether it is gated by a
/// download route / `scan_status` column; if not, a quarantined item image
/// keeps serving from the CDN.
///
/// The string form from [`ScanTargetKind::as_str`] is what [`enqueue`] writes
/// to `scan_jobs.target_kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanTargetKind {
    Item,
    Version,
    Media,
    ProjectImage,
    ItemImage,
    GalleryImage,
    ContentInsertion,
}

impl ScanTargetKind {
    /// Stable string form of the kind, as stored in `scan_jobs.target_kind`.
    pub fn as_str(&self) -> &'static str {
        match self {
            ScanTargetKind::Item => "item",
            ScanTargetKind::Version => "version",
            ScanTargetKind::Media => "media",
            ScanTargetKind::ProjectImage => "project_image",
            ScanTargetKind::ItemImage => "item_image",
            ScanTargetKind::GalleryImage => "gallery_image",
            ScanTargetKind::ContentInsertion => "content_insertion",
        }
    }

    /// Inverse of [`ScanTargetKind::as_str`]. Returns `None` for any string
    /// that `as_str` never produces.
    // Deliberately an inherent fn returning `Option` rather than a `FromStr`
    // impl: callers only need "known kind or not", not a bespoke error type.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        Some(match s {
            "item" => ScanTargetKind::Item,
            "version" => ScanTargetKind::Version,
            "media" => ScanTargetKind::Media,
            "project_image" => ScanTargetKind::ProjectImage,
            "item_image" => ScanTargetKind::ItemImage,
            "gallery_image" => ScanTargetKind::GalleryImage,
            "content_insertion" => ScanTargetKind::ContentInsertion,
            _ => return None,
        })
    }

    /// Whether this kind is served directly from the CDN by `s3_key`, with no
    /// app-proxied download route that can consult a `scan_status` column.
    ///
    /// `Item`/`Version`/`Media` are gated at their download handlers (the route
    /// refuses to issue a non-`Clean` object). `ProjectImage`, `GalleryImage`,
    /// and `ContentInsertion` carry no `scan_status` column and are rendered
    /// straight from `cdn.makenot.work/{key}`, so the ONLY enforcement point
    /// for a `Quarantined` verdict is removing the object from storage —
    /// otherwise the malicious file keeps serving from the CDN until an admin
    /// manually acts on the WAM ticket. The worker purges the object for these
    /// kinds on quarantine.
    pub fn is_cdn_served_without_gate(&self) -> bool {
        matches!(
            self,
            ScanTargetKind::ProjectImage
                | ScanTargetKind::GalleryImage
                | ScanTargetKind::ContentInsertion
        )
    }
}

/// A queued or running scan job, as claimed by a worker.
///
/// Every field is populated via `FromRow` (e.g. from the `RETURNING *` of
/// [`claim_next`]). Fields not consumed by the worker today are kept for the
/// admin dashboard (Phase 2 of the audit), hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, Clone, FromRow)]
pub struct ScanJob {
    pub id: Uuid,
    /// Raw `target_kind` column; see [`ScanJob::typed_kind`].
    pub target_kind: String,
    pub target_id: Uuid,
    pub s3_key: String,
    /// Raw `file_type` column; see [`ScanJob::typed_file_type`].
    pub file_type: String,
    pub user_id: UserId,
    pub file_size_bytes: i64,
    pub status: String,
    /// Incremented by [`claim_next`] on every claim.
    pub attempts: i32,
    pub enqueued_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub last_error: Option<String>,
}

impl ScanJob {
    /// Parses `target_kind`; `None` if the stored string is not a known kind.
    pub fn typed_kind(&self) -> Option<ScanTargetKind> {
        ScanTargetKind::from_str(&self.target_kind)
    }

    /// Parses `file_type` via `FileType`'s `FromStr`; `None` if it fails.
    pub fn typed_file_type(&self) -> Option<FileType> {
        self.file_type.parse().ok()
    }
}

/// Enqueue a scan job. Returns the new job's id.
///
/// Only the target, object, and size columns are supplied; `id`, `status`,
/// `attempts`, and `enqueued_at` come from the table's defaults (presumably
/// `status = 'queued'`, which [`claim_next`] requires — confirm against the
/// migration).
// A bare identifier in `#[instrument(fields(..))]` declares an *empty*
// (`tracing::field::Empty`) field instead of capturing the local, so `s3_key`
// must be given an explicit value to actually appear on the span.
#[tracing::instrument(
    skip_all,
    fields(target_kind = target_kind.as_str(), %target_id, s3_key = s3_key)
)]
pub async fn enqueue(
    db: &PgPool,
    target_kind: ScanTargetKind,
    target_id: Uuid,
    s3_key: &str,
    file_type: FileType,
    user_id: UserId,
    file_size_bytes: i64,
) -> Result<Uuid, sqlx::Error> {
    let id = sqlx::query_scalar::<_, Uuid>(
        r#"
        INSERT INTO scan_jobs
            (target_kind, target_id, s3_key, file_type, user_id, file_size_bytes)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING id
        "#,
    )
    .bind(target_kind.as_str())
    .bind(target_id)
    .bind(s3_key)
    .bind(file_type.as_str())
    .bind(user_id)
    .bind(file_size_bytes)
    .fetch_one(db)
    .await?;

    Ok(id)
}

/// Atomically claim the next queued scan job for processing.
///
/// Uses `FOR UPDATE SKIP LOCKED` so multiple workers can drain the queue in
/// parallel without contention. In a single statement, the oldest unlocked
/// `queued` row (by `enqueued_at`) gets `status = 'running'`, `attempts`
/// incremented, and `started_at` stamped; the updated row is returned.
///
/// Returns `Ok(None)` when nothing is claimable: the queue is empty, or every
/// queued row is currently locked by another worker's claim.
#[tracing::instrument(skip_all)]
pub async fn claim_next(db: &PgPool) -> Result<Option<ScanJob>, sqlx::Error> {
    // When `next` is empty the scalar subquery yields NULL, which matches no
    // row, so the UPDATE touches nothing and `fetch_optional` returns `None`.
    sqlx::query_as::<_, ScanJob>(
        r#"
        WITH next AS (
            SELECT id
            FROM scan_jobs
            WHERE status = 'queued'
            ORDER BY enqueued_at ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        UPDATE scan_jobs
        SET status = 'running',
            attempts = attempts + 1,
            started_at = NOW()
        WHERE id = (SELECT id FROM next)
        RETURNING *
        "#,
    )
    .fetch_optional(db)
    .await
}

/// Mark a job as completed successfully.
///
/// Sets `status = 'done'`, stamps `completed_at`, and clears any `last_error`
/// left over from an earlier attempt. The job's current status is not checked,
/// and an unknown `job_id` is a silent no-op.
#[tracing::instrument(skip_all, fields(%job_id))]
pub async fn mark_done(db: &PgPool, job_id: Uuid) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        UPDATE scan_jobs
        SET status = 'done',
            completed_at = NOW(),
            last_error = NULL
        WHERE id = $1
        "#,
    )
    .bind(job_id)
    .execute(db)
    .await?;

    Ok(())
}

/// Mark a job as failed (worker exception, S3 fetch failure, etc.).
///
/// Records `err` in `last_error` for admin inspection and stamps
/// `completed_at`. The job stays in `failed` status; an admin or operator can
/// manually re-enqueue via the dashboard (Phase 2) by inserting a fresh row.
#[tracing::instrument(skip_all, fields(%job_id))] pub async fn mark_failed(db: &PgPool, job_id: Uuid, err: &str) -> Result<(), sqlx::Error> { sqlx::query( r#" UPDATE scan_jobs SET status = 'failed', completed_at = NOW(), last_error = $1 WHERE id = $2 "#, ) .bind(err) .bind(job_id) .execute(db) .await?; Ok(()) } /// Reset jobs that have been stuck in `running` longer than `max_age_secs`. /// /// Run on worker startup to recover from a previous-process crash mid-scan: /// the row would otherwise stay `running` forever and never be re-claimed. /// Increments `attempts` so a perpetually-crashing scan eventually trips the /// retry budget. #[tracing::instrument(skip_all)] pub async fn reap_stuck(db: &PgPool, max_age_secs: i64) -> Result { let affected = sqlx::query( r#" UPDATE scan_jobs SET status = 'queued', started_at = NULL WHERE status = 'running' AND started_at < NOW() - ($1 || ' seconds')::interval "#, ) .bind(max_age_secs.to_string()) .execute(db) .await? .rows_affected(); Ok(affected) } /// Delete terminal-state rows older than `older_than`. Returns the count. /// /// Only touches `done`/`failed` rows — operational state (`queued`, /// `running`) is owned by the worker loop and `reap_stuck`. The verdict /// (Clean / Quarantined / HeldForReview) lives on the entity's /// `scan_status` column, not here, so dropping a `done` row loses queue /// history only, not malware-detection state. /// /// No supporting index today: at soft-launch volume Postgres seq-scans the /// table fine. Revisit once `EXPLAIN ANALYZE` shows it as a bottleneck. #[tracing::instrument(skip_all)] pub async fn purge_old_terminal( db: &PgPool, older_than: chrono::Duration, ) -> Result { let cutoff = chrono::Utc::now() - older_than; let n = sqlx::query( r#" DELETE FROM scan_jobs WHERE status IN ('done', 'failed') AND COALESCE(completed_at, started_at, enqueued_at) < $1 "#, ) .bind(cutoff) .execute(db) .await? .rows_affected(); Ok(n) } /// Count of currently-queued jobs. 
Used by the admin dashboard health panel /// (Phase 2 of the audit). Allowed dead code until that route lands. #[allow(dead_code)] pub async fn queued_count(db: &PgPool) -> Result { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'queued'") .fetch_one(db) .await } /// Count of currently-running jobs. Phase 2 dashboard consumer. #[allow(dead_code)] pub async fn running_count(db: &PgPool) -> Result { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'running'") .fetch_one(db) .await } /// Count of running jobs that have been in flight longer than `max_age_secs`. /// Used by PoM to alert on stuck workers. pub async fn stuck_count(db: &PgPool, max_age_secs: i64) -> Result { sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM scan_jobs WHERE status = 'running' AND started_at < NOW() - ($1 || ' seconds')::interval", ) .bind(max_age_secs.to_string()) .fetch_one(db) .await } /// A held version with enough context to re-enqueue it for scanning. #[allow(dead_code)] #[derive(Debug, Clone, FromRow)] pub struct RescanCandidateVersion { pub version_id: Uuid, pub s3_key: String, pub file_size_bytes: i64, pub user_id: UserId, } /// A held item (audio or cover) with re-enqueue context. #[allow(dead_code)] #[derive(Debug, Clone, FromRow)] pub struct RescanCandidateItem { pub item_id: Uuid, pub s3_key: String, pub file_size_bytes: i64, pub user_id: UserId, /// Audio / cover, selected by the query. pub file_type: String, } /// Find currently-held versions with enough context to be re-scanned. 
pub async fn rescan_candidates_versions(db: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, RescanCandidateVersion>( r#" SELECT v.id AS version_id, v.s3_key, COALESCE(v.file_size_bytes, 0) AS file_size_bytes, p.user_id FROM versions v JOIN items i ON i.id = v.item_id JOIN projects p ON p.id = i.project_id WHERE v.scan_status = 'held_for_review' AND v.s3_key IS NOT NULL "#, ) .fetch_all(db) .await } /// Find currently-held items (audio or cover) with re-enqueue context. pub async fn rescan_candidates_items(db: &PgPool) -> Result, sqlx::Error> { sqlx::query_as::<_, RescanCandidateItem>( r#" SELECT i.id AS item_id, COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key, COALESCE(i.audio_file_size_bytes, i.cover_file_size_bytes, 0) AS file_size_bytes, p.user_id, CASE WHEN i.audio_s3_key IS NOT NULL THEN 'audio' ELSE 'cover' END AS file_type FROM items i JOIN projects p ON p.id = i.project_id WHERE i.scan_status = 'held_for_review' AND COALESCE(i.audio_s3_key, i.cover_s3_key) IS NOT NULL "#, ) .fetch_all(db) .await } #[cfg(test)] mod tests { use super::*; #[test] fn target_kind_round_trip() { for kind in [ ScanTargetKind::Item, ScanTargetKind::Version, ScanTargetKind::Media, ScanTargetKind::ProjectImage, ScanTargetKind::ItemImage, ScanTargetKind::GalleryImage, ScanTargetKind::ContentInsertion, ] { assert_eq!(ScanTargetKind::from_str(kind.as_str()), Some(kind)); } assert_eq!(ScanTargetKind::from_str("bogus"), None); } }