Skip to main content

max / makenotwork

server: rewrite scan pipeline (async + dashboard + 5 new layers) Implements Phases 1-3 of docs/scan-pipeline-audit.md as a single coherent rewrite of the malware-scan pipeline. Triggered by the MalwareBazaar regression that silently held every upload since 2026-05-10 because of the global fail-closed-by-default policy. Architecture (Phase 1) - Per-layer ERROR_POLICY const co-located in each layer file (FailClosed for in-process deterministic layers; FailOpen for external network layers). final_status aggregator consults each layer's declared policy via error_policy_for. Removes the global has-error switch that caused the regression. - Async scan_jobs queue (migration 121) drained by a pool of tokio workers (constants::SCAN_WORKER_COUNT = 2) using FOR UPDATE SKIP LOCKED. Upload routes enqueue and return Pending immediately; worker flips the target entity's scan_status when done. Stuck-job reaper resets jobs older than 5 minutes. - scan_and_classify is replaced by enqueue_scan_for; all 7 callsites migrated. Trusted-user fast-path preserved; quarantine-then-bail semantics replaced by admin dashboard cleanup. - FileScanStatus::Scanning added. Admin dashboard (Phase 2) - /admin/uploads expanded to three-section layout (Pipeline Health panel, Active Queue counts, Held for Review table with per-layer chips) plus collapsible Recent History grid and full audit log at /admin/uploads/audit. - scan_admin_actions table (migration 122) records every promote, quarantine, and rescan. Each held row carries an inline "last: promote by max 2m ago" tooltip via batched DISTINCT ON lookups. - Per-row actions: Promote, Quarantine, Re-scan. Bulk operation: Re-scan all held (the operational fix for the existing 9-version backlog after deploy). - HTMX polls /admin/uploads/queue-summary every 10s for live pending+running counts. - CSS for layer chips (verdict color-coded), layer health cards (status-badge left border), scan-history grid. Layer-set expansion (Phase 3) - malwarebazaar: adds Auth-Key header for the abuse.ch API change, short-circuits to Skip without a key, explicit handling for unauthorized / key_required / key_invalid status values. - urlhaus: new layer. Pure-Rust URL extraction from binary strings, dedupes hosts, queries up to 5 against URLhaus with same key. - signing_macos: new layer (apple-codesign). Mach-O / .app / DMG verification with team-id extraction and notarization staple presence detection. Powered by indygreg's apple-codesign 0.29.0. - signing_windows: new layer (authenticode). PE detection, Authenticode signature parse, subject-CN extraction. - signing_linux: new layer. AppImage detection via ELF + AI marker; presence check for .sha256_sig + .sig_key sections. - metadefender: new layer. Second-opinion only — invoked when another layer flagged the upload suspicious (suspicion_present gating). Free-tier 40/day. Fail-open per policy. - Config: URLHAUS_ENABLED, ABUSE_CH_AUTH_KEY, METADEFENDER_API_KEY env vars wired through ScanConfig. Trade-offs - Quarantine is no longer rejected synchronously. The upload completes, scan runs async, admin handles cleanup via dashboard. - Project images and content_insertions enqueue but have no entity scan_status column — worker only records file_scan_results and tickets quarantine via WAM. - Notarization staple verification is presence + structural sanity; cryptographic CMS chain verification against Apple's notarization CA is deferred to a hardening pass. Tests: 1622 passing, including 21 new policy + suspicion + classify tests + per-layer verdict tests for each of the 5 new layers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-24 20:54 UTC
Commit: 34a68b7bc9683da41eaa2564c1a1af99206bd876
Parent: 0d02c2f
41 files changed, +3762 insertions, -236 deletions
@@ -0,0 +1,48 @@
1 + -- Async scan worker queue.
2 + --
3 + -- Before this migration, every upload route called `scan_and_classify`
4 + -- synchronously: the request handler held the S3 download + the full
5 + -- in-memory scan pipeline (incl. third-party network calls to abuse.ch /
6 + -- ClamAV) on its own thread, with the scan semaphore serializing concurrent
7 + -- uploads. A stuck third-party endpoint could stall every upload.
8 + --
9 + -- Going forward, the upload route enqueues a row here and returns immediately
10 + -- with `scan_status = 'pending'`. A pool of scan workers (spawned from
11 + -- main.rs) drains the queue with FOR UPDATE SKIP LOCKED, runs the pipeline,
12 + -- and updates the target entity's scan_status when done.
13 + --
14 + -- target_kind/target_id identifies which entity to update on completion:
15 + -- 'version' → versions.id
16 + -- 'item' → items.id (item-level uploads, image-only items)
17 + -- 'project_image' → projects.id (project cover image)
18 + -- 'item_image' → items.id (item cover image; same row, different field)
19 + -- 'media' → media.id
20 + -- 'content_insertion' → content_insertions.id
21 +
22 + CREATE TABLE scan_jobs (
23 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
24 + target_kind TEXT NOT NULL CHECK (target_kind IN (
25 + 'version', 'item', 'project_image', 'item_image', 'media', 'content_insertion'
26 + )),
27 + target_id UUID NOT NULL,
28 + s3_key TEXT NOT NULL,
29 + file_type TEXT NOT NULL,
30 + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
31 + file_size_bytes BIGINT NOT NULL,
32 + status TEXT NOT NULL DEFAULT 'queued' CHECK (status IN ('queued', 'running', 'done', 'failed')),
33 + attempts INT NOT NULL DEFAULT 0,
34 + enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
35 + started_at TIMESTAMPTZ,
36 + completed_at TIMESTAMPTZ,
37 + last_error TEXT
38 + );
39 +
40 + -- Worker claim path: pull the oldest queued job. Partial index keeps it tiny
41 + -- once the queue mostly contains completed/failed rows.
42 + CREATE INDEX idx_scan_jobs_queued ON scan_jobs (enqueued_at) WHERE status = 'queued';
43 +
44 + -- Stuck-scan detection in the admin dashboard.
45 + CREATE INDEX idx_scan_jobs_running ON scan_jobs (started_at) WHERE status = 'running';
46 +
47 + -- History lookups by target entity (e.g. "show me the scan history for this version").
48 + CREATE INDEX idx_scan_jobs_target ON scan_jobs (target_kind, target_id);
@@ -0,0 +1,32 @@
1 + -- Audit log of admin scan-pipeline actions.
2 + --
3 + -- Every promote / quarantine / rescan that an admin performs from the
4 + -- /admin/uploads dashboard writes a row here. The dashboard surfaces the
5 + -- last action inline on each held row; the full log lives at
6 + -- /admin/uploads/audit (Phase 2 of `docs/scan-pipeline-audit.md`).
7 + --
8 + -- One of `version_id` or `item_id` is populated to identify the target;
9 + -- bulk operations write one row per affected target (no separate "bulk"
10 + -- target_kind). `action` covers both per-row and bulk variants so we can
11 + -- distinguish a single promote from one issued via bulk-promote when
12 + -- reviewing intent later.
13 +
14 + CREATE TABLE scan_admin_actions (
15 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
16 + version_id UUID REFERENCES versions(id) ON DELETE SET NULL,
17 + item_id UUID REFERENCES items(id) ON DELETE SET NULL,
18 + admin_id UUID NOT NULL REFERENCES users(id) ON DELETE RESTRICT,
19 + action TEXT NOT NULL CHECK (action IN (
20 + 'promote', 'quarantine', 'rescan',
21 + 'bulk_promote', 'bulk_rescan'
22 + )),
23 + prev_status TEXT,
24 + new_status TEXT,
25 + note TEXT,
26 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
27 + );
28 +
29 + CREATE INDEX idx_scan_admin_actions_version ON scan_admin_actions(version_id) WHERE version_id IS NOT NULL;
30 + CREATE INDEX idx_scan_admin_actions_item ON scan_admin_actions(item_id) WHERE item_id IS NOT NULL;
31 + CREATE INDEX idx_scan_admin_actions_admin ON scan_admin_actions(admin_id, created_at DESC);
32 + CREATE INDEX idx_scan_admin_actions_created ON scan_admin_actions(created_at DESC);
@@ -352,6 +352,16 @@ pub struct ScanConfig {
352 352 pub yara_rules_dir: String,
353 353 /// Whether to enable MalwareBazaar hash lookups
354 354 pub malwarebazaar_enabled: bool,
355 + /// Whether to enable URLhaus URL-reputation lookups
356 + pub urlhaus_enabled: bool,
357 + /// Shared abuse.ch Auth-Key (issued at https://auth.abuse.ch/). Required
358 + /// for MalwareBazaar and URLhaus as of 2024+; without it both layers
359 + /// fail-open and the dashboard surfaces them as degraded.
360 + pub abuse_ch_auth_key: Option<String>,
361 + /// MetaDefender Cloud API key (free tier at
362 + /// <https://metadefender.com/account>). Second-opinion layer; only
363 + /// invoked when another layer flagged the file as suspicious.
364 + pub metadefender_api_key: Option<String>,
355 365 }
356 366
357 367 impl ScanConfig {
@@ -373,6 +383,11 @@ impl ScanConfig {
373 383 malwarebazaar_enabled: std::env::var("MALWAREBAZAAR_ENABLED")
374 384 .map(|v| v != "false" && v != "0")
375 385 .unwrap_or(true),
386 + urlhaus_enabled: std::env::var("URLHAUS_ENABLED")
387 + .map(|v| v != "false" && v != "0")
388 + .unwrap_or(true),
389 + abuse_ch_auth_key: std::env::var("ABUSE_CH_AUTH_KEY").ok().filter(|s| !s.is_empty()),
390 + metadefender_api_key: std::env::var("METADEFENDER_API_KEY").ok().filter(|s| !s.is_empty()),
376 391 })
377 392 }
378 393 }
@@ -383,6 +398,9 @@ impl std::fmt::Debug for ScanConfig {
383 398 .field("clamav_socket", &self.clamav_socket)
384 399 .field("yara_rules_dir", &self.yara_rules_dir)
385 400 .field("malwarebazaar_enabled", &self.malwarebazaar_enabled)
401 + .field("urlhaus_enabled", &self.urlhaus_enabled)
402 + .field("abuse_ch_auth_key", &self.abuse_ch_auth_key.as_ref().map(|_| "<set>"))
403 + .field("metadefender_api_key", &self.metadefender_api_key.as_ref().map(|_| "<set>"))
386 404 .finish()
387 405 }
388 406 }
@@ -517,7 +535,8 @@ mod tests {
517 535 "SYNCKIT_S3_SECRET_KEY", "SYNCKIT_S3_REGION",
518 536 "STRIPE_SECRET_KEY", "STRIPE_WEBHOOK_SECRET", "STRIPE_WEBHOOK_SECRET_V2",
519 537 "ADMIN_USER_ID", "SYNCKIT_JWT_SECRET", "SCAN_ENABLED", "CLAMAV_SOCKET",
520 - "YARA_RULES_DIR", "MALWAREBAZAAR_ENABLED", "GIT_REPOS_PATH",
538 + "YARA_RULES_DIR", "MALWAREBAZAAR_ENABLED", "URLHAUS_ENABLED",
539 + "ABUSE_CH_AUTH_KEY", "METADEFENDER_API_KEY", "GIT_REPOS_PATH",
521 540 "POSTMARK_WEBHOOK_TOKEN", "POSTMARK_BROADCAST_WEBHOOK_TOKEN",
522 541 "GIT_SSH_HOST", "MT_BASE_URL", "FAN_PLUS_STRIPE_PRICE_ID",
523 542 "CREATOR_TIER_BASIC_PRICE_ID", "CREATOR_TIER_SMALL_FILES_PRICE_ID",
@@ -145,6 +145,7 @@ pub const PAGINATION_WINDOW_SIZE: u32 = 5;
145 145 // -- File scanning --
146 146 pub const SCAN_MAX_MEMORY_BYTES: usize = 100 * 1024 * 1024; // 100 MB in-memory threshold
147 147 pub const SCAN_MAX_CONCURRENT: usize = 4; // Max concurrent file scans (each can use up to 100 MB RAM)
148 + pub const SCAN_WORKER_COUNT: usize = 2; // Background worker tasks draining scan_jobs queue
148 149
149 150 // -- Caddy on-demand TLS --
150 151 // Caps concurrent cache-miss DB lookups in `/api/domains/caddy-ask`. Cache hits
@@ -277,10 +277,22 @@ impl_str_enum!(SyncPlatform {
277 277
278 278 // ── File scanning ──
279 279
280 + /// Status of an uploaded file in the scan pipeline.
281 + ///
282 + /// `Pending` — accepted, waiting in `scan_jobs` queue for a worker.
283 + /// `Scanning` — worker has claimed the job and is running the pipeline.
284 + /// `Clean` — pipeline completed, no Fail verdicts, no fail-closed Errors.
285 + /// `HeldForReview` — pipeline completed with a fail-closed Error, OR the
286 + /// uploader is untrusted (every untrusted upload routes to admin review).
287 + /// `Quarantined` — pipeline returned a Fail verdict on at least one layer.
288 + /// `Error` — pipeline itself crashed (worker exception, S3 fetch failed, etc.).
289 + ///
290 + /// State machine in `docs/scan-pipeline-audit.md`.
280 291 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
281 292 #[serde(rename_all = "snake_case")]
282 293 pub enum FileScanStatus {
283 294 Pending,
295 + Scanning,
284 296 Clean,
285 297 Quarantined,
286 298 HeldForReview,
@@ -289,6 +301,7 @@ pub enum FileScanStatus {
289 301
290 302 impl_str_enum!(FileScanStatus {
291 303 Pending => "pending",
304 + Scanning => "scanning",
292 305 Clean => "clean",
293 306 Quarantined => "quarantined",
294 307 HeldForReview => "held_for_review",
@@ -1068,6 +1081,10 @@ mod tests {
1068 1081 #[test]
1069 1082 fn file_scan_status_round_trip() {
1070 1083 assert_eq!(FileScanStatus::Clean.to_string(), "clean");
1084 + assert_eq!(FileScanStatus::Pending.to_string(), "pending");
1085 + assert_eq!(FileScanStatus::Scanning.to_string(), "scanning");
1086 + assert_eq!("pending".parse::<FileScanStatus>().unwrap(), FileScanStatus::Pending);
1087 + assert_eq!("scanning".parse::<FileScanStatus>().unwrap(), FileScanStatus::Scanning);
1071 1088 assert_eq!("held_for_review".parse::<FileScanStatus>().unwrap(), FileScanStatus::HeldForReview);
1072 1089 assert_eq!(FileScanStatus::HeldForReview.to_string(), "held_for_review");
1073 1090 assert_eq!("quarantined".parse::<FileScanStatus>().unwrap(), FileScanStatus::Quarantined);
@@ -36,6 +36,8 @@ pub(crate) mod passkeys;
36 36 pub(crate) mod health;
37 37 pub(crate) mod monitor;
38 38 pub(crate) mod scanning;
39 + pub(crate) mod scan_jobs;
40 + pub(crate) mod scan_admin_actions;
39 41 pub(crate) mod content_insertions;
40 42 pub(crate) mod invites;
41 43 pub(crate) mod analytics;
@@ -0,0 +1,247 @@
1 + //! Audit log for admin scan-pipeline actions.
2 + //!
3 + //! Every promote / quarantine / rescan from the `/admin/uploads` dashboard
4 + //! writes one row here. Bulk operations write one row per affected target
5 + //! (the `action` column distinguishes per-row from bulk). See
6 + //! `docs/scan-pipeline-audit.md` § 5.2 (Audit Trail).
7 +
8 + use chrono::{DateTime, Utc};
9 + use sqlx::{FromRow, PgPool};
10 + use uuid::Uuid;
11 +
12 + use super::{ItemId, UserId, VersionId};
13 +
14 + #[derive(Debug, Clone, Copy, PartialEq, Eq)]
15 + pub enum AdminAction {
16 + Promote,
17 + Quarantine,
18 + Rescan,
19 + /// Reserved for the Phase 2b bulk-promote action; not yet wired into routes.
20 + #[allow(dead_code)]
21 + BulkPromote,
22 + BulkRescan,
23 + }
24 +
25 + impl AdminAction {
26 + pub fn as_str(&self) -> &'static str {
27 + match self {
28 + AdminAction::Promote => "promote",
29 + AdminAction::Quarantine => "quarantine",
30 + AdminAction::Rescan => "rescan",
31 + AdminAction::BulkPromote => "bulk_promote",
32 + AdminAction::BulkRescan => "bulk_rescan",
33 + }
34 + }
35 + }
36 +
37 + /// An audit-log row. Either `version_id` or `item_id` is populated.
38 + #[allow(dead_code)]
39 + #[derive(Debug, Clone, FromRow)]
40 + pub struct ScanAdminActionRow {
41 + pub id: Uuid,
42 + pub version_id: Option<Uuid>,
43 + pub item_id: Option<Uuid>,
44 + pub admin_id: UserId,
45 + pub action: String,
46 + pub prev_status: Option<String>,
47 + pub new_status: Option<String>,
48 + pub note: Option<String>,
49 + pub created_at: DateTime<Utc>,
50 + }
51 +
52 + /// Log an admin action against a version.
53 + #[tracing::instrument(skip_all, fields(%version_id, %admin_id, action = action.as_str()))]
54 + pub async fn log_version(
55 + db: &PgPool,
56 + version_id: VersionId,
57 + admin_id: UserId,
58 + action: AdminAction,
59 + prev_status: Option<&str>,
60 + new_status: Option<&str>,
61 + note: Option<&str>,
62 + ) -> Result<(), sqlx::Error> {
63 + sqlx::query(
64 + r#"
65 + INSERT INTO scan_admin_actions (version_id, admin_id, action, prev_status, new_status, note)
66 + VALUES ($1, $2, $3, $4, $5, $6)
67 + "#,
68 + )
69 + .bind(*version_id.as_uuid())
70 + .bind(admin_id)
71 + .bind(action.as_str())
72 + .bind(prev_status)
73 + .bind(new_status)
74 + .bind(note)
75 + .execute(db)
76 + .await?;
77 + Ok(())
78 + }
79 +
80 + /// Log an admin action against an item.
81 + #[tracing::instrument(skip_all, fields(%item_id, %admin_id, action = action.as_str()))]
82 + pub async fn log_item(
83 + db: &PgPool,
84 + item_id: ItemId,
85 + admin_id: UserId,
86 + action: AdminAction,
87 + prev_status: Option<&str>,
88 + new_status: Option<&str>,
89 + note: Option<&str>,
90 + ) -> Result<(), sqlx::Error> {
91 + sqlx::query(
92 + r#"
93 + INSERT INTO scan_admin_actions (item_id, admin_id, action, prev_status, new_status, note)
94 + VALUES ($1, $2, $3, $4, $5, $6)
95 + "#,
96 + )
97 + .bind(*item_id.as_uuid())
98 + .bind(admin_id)
99 + .bind(action.as_str())
100 + .bind(prev_status)
101 + .bind(new_status)
102 + .bind(note)
103 + .execute(db)
104 + .await?;
105 + Ok(())
106 + }
107 +
108 + /// Brief "last action" summary attached to a held row inline.
109 + #[derive(Debug, Clone, FromRow)]
110 + pub struct LastActionSummary {
111 + pub action: String,
112 + pub admin_username: String,
113 + pub created_at: DateTime<Utc>,
114 + }
115 +
116 + /// Latest admin action recorded against each version_id in the given set.
117 + /// Returns a map keyed by version_id string. Empty input returns empty map.
118 + pub async fn latest_per_version(
119 + db: &PgPool,
120 + version_ids: &[Uuid],
121 + ) -> Result<std::collections::HashMap<Uuid, LastActionSummary>, sqlx::Error> {
122 + if version_ids.is_empty() {
123 + return Ok(std::collections::HashMap::new());
124 + }
125 + let rows = sqlx::query(
126 + r#"
127 + SELECT DISTINCT ON (saa.version_id)
128 + saa.version_id, saa.action, saa.created_at, u.username AS admin_username
129 + FROM scan_admin_actions saa
130 + JOIN users u ON u.id = saa.admin_id
131 + WHERE saa.version_id = ANY($1)
132 + ORDER BY saa.version_id, saa.created_at DESC
133 + "#,
134 + )
135 + .bind(version_ids)
136 + .fetch_all(db)
137 + .await?;
138 +
139 + use sqlx::Row;
140 + let mut out = std::collections::HashMap::with_capacity(rows.len());
141 + for row in rows {
142 + let id: Uuid = row.try_get("version_id")?;
143 + out.insert(id, LastActionSummary {
144 + action: row.try_get("action")?,
145 + admin_username: row.try_get("admin_username")?,
146 + created_at: row.try_get("created_at")?,
147 + });
148 + }
149 + Ok(out)
150 + }
151 +
152 + /// Latest admin action per item_id in the given set.
153 + pub async fn latest_per_item(
154 + db: &PgPool,
155 + item_ids: &[Uuid],
156 + ) -> Result<std::collections::HashMap<Uuid, LastActionSummary>, sqlx::Error> {
157 + if item_ids.is_empty() {
158 + return Ok(std::collections::HashMap::new());
159 + }
160 + let rows = sqlx::query(
161 + r#"
162 + SELECT DISTINCT ON (saa.item_id)
163 + saa.item_id, saa.action, saa.created_at, u.username AS admin_username
164 + FROM scan_admin_actions saa
165 + JOIN users u ON u.id = saa.admin_id
166 + WHERE saa.item_id = ANY($1)
167 + ORDER BY saa.item_id, saa.created_at DESC
168 + "#,
169 + )
170 + .bind(item_ids)
171 + .fetch_all(db)
172 + .await?;
173 +
174 + use sqlx::Row;
175 + let mut out = std::collections::HashMap::with_capacity(rows.len());
176 + for row in rows {
177 + let id: Uuid = row.try_get("item_id")?;
178 + out.insert(id, LastActionSummary {
179 + action: row.try_get("action")?,
180 + admin_username: row.try_get("admin_username")?,
181 + created_at: row.try_get("created_at")?,
182 + });
183 + }
184 + Ok(out)
185 + }
186 +
187 + /// Audit-log row joined with the actor's username, for the audit page.
188 + #[derive(Debug, Clone, FromRow)]
189 + pub struct AuditLogRow {
190 + pub version_id: Option<Uuid>,
191 + pub item_id: Option<Uuid>,
192 + pub admin_username: String,
193 + pub action: String,
194 + pub prev_status: Option<String>,
195 + pub new_status: Option<String>,
196 + pub note: Option<String>,
197 + pub created_at: DateTime<Utc>,
198 + }
199 +
200 + /// Recent audit entries joined with the admin's username, newest first.
201 + /// Phase 2b consumer for the audit-log page.
202 + pub async fn list_recent_with_admin(db: &PgPool, limit: i64) -> Result<Vec<AuditLogRow>, sqlx::Error> {
203 + sqlx::query_as::<_, AuditLogRow>(
204 + r#"
205 + SELECT saa.version_id, saa.item_id, u.username AS admin_username,
206 + saa.action, saa.prev_status, saa.new_status, saa.note, saa.created_at
207 + FROM scan_admin_actions saa
208 + JOIN users u ON u.id = saa.admin_id
209 + ORDER BY saa.created_at DESC
210 + LIMIT $1
211 + "#,
212 + )
213 + .bind(limit)
214 + .fetch_all(db)
215 + .await
216 + }
217 +
218 + /// Recent audit entries for the full-log page. Phase 2 surface.
219 + #[allow(dead_code)]
220 + pub async fn list_recent(db: &PgPool, limit: i64) -> Result<Vec<ScanAdminActionRow>, sqlx::Error> {
221 + sqlx::query_as::<_, ScanAdminActionRow>(
222 + r#"
223 + SELECT id, version_id, item_id, admin_id, action,
224 + prev_status, new_status, note, created_at
225 + FROM scan_admin_actions
226 + ORDER BY created_at DESC
227 + LIMIT $1
228 + "#,
229 + )
230 + .bind(limit)
231 + .fetch_all(db)
232 + .await
233 + }
234 +
235 + #[cfg(test)]
236 + mod tests {
237 + use super::*;
238 +
239 + #[test]
240 + fn admin_action_as_str() {
241 + assert_eq!(AdminAction::Promote.as_str(), "promote");
242 + assert_eq!(AdminAction::Quarantine.as_str(), "quarantine");
243 + assert_eq!(AdminAction::Rescan.as_str(), "rescan");
244 + assert_eq!(AdminAction::BulkPromote.as_str(), "bulk_promote");
245 + assert_eq!(AdminAction::BulkRescan.as_str(), "bulk_rescan");
246 + }
247 + }
@@ -0,0 +1,306 @@
1 + //! Async scan-job queue for the malware pipeline.
2 + //!
3 + //! Upload routes call [`enqueue`] to register a scan job and return their
4 + //! request to the client. A pool of scan workers (`crate::scanning::worker`)
5 + //! drains the queue with `FOR UPDATE SKIP LOCKED` via [`claim_next`], runs the
6 + //! pipeline against the S3 object, and finalizes the job with [`mark_done`] or
7 + //! [`mark_failed`].
8 + //!
9 + //! See `docs/scan-pipeline-audit.md` for the wider architecture.
10 +
11 + use chrono::{DateTime, Utc};
12 + use sqlx::{FromRow, PgPool};
13 + use uuid::Uuid;
14 +
15 + use crate::storage::FileType;
16 +
17 + use super::UserId;
18 +
19 + /// The entity whose `scan_status` the worker should update when the scan
20 + /// completes. `Item`, `Version`, and `Media` have `scan_status` columns;
21 + /// `ProjectImage` and `ContentInsertion` do not — for those, the worker
22 + /// still scans (recording results in `file_scan_results`) but only acts on
23 + /// `Quarantined` by creating a WAM ticket for admin follow-up.
24 + #[derive(Debug, Clone, Copy, PartialEq, Eq)]
25 + pub enum ScanTargetKind {
26 + Item,
27 + Version,
28 + Media,
29 + ProjectImage,
30 + ItemImage,
31 + ContentInsertion,
32 + }
33 +
34 + impl ScanTargetKind {
35 + pub fn as_str(&self) -> &'static str {
36 + match self {
37 + ScanTargetKind::Item => "item",
38 + ScanTargetKind::Version => "version",
39 + ScanTargetKind::Media => "media",
40 + ScanTargetKind::ProjectImage => "project_image",
41 + ScanTargetKind::ItemImage => "item_image",
42 + ScanTargetKind::ContentInsertion => "content_insertion",
43 + }
44 + }
45 +
46 + pub fn from_str(s: &str) -> Option<Self> {
47 + Some(match s {
48 + "item" => ScanTargetKind::Item,
49 + "version" => ScanTargetKind::Version,
50 + "media" => ScanTargetKind::Media,
51 + "project_image" => ScanTargetKind::ProjectImage,
52 + "item_image" => ScanTargetKind::ItemImage,
53 + "content_insertion" => ScanTargetKind::ContentInsertion,
54 + _ => return None,
55 + })
56 + }
57 + }
58 +
59 + /// A queued or running scan job, as claimed by a worker. Most fields are
60 + /// populated via `FromRow` from sqlx; fields not consumed by the worker
61 + /// today are kept for the admin dashboard (Phase 2 of the audit).
62 + #[allow(dead_code)]
63 + #[derive(Debug, Clone, FromRow)]
64 + pub struct ScanJob {
65 + pub id: Uuid,
66 + pub target_kind: String,
67 + pub target_id: Uuid,
68 + pub s3_key: String,
69 + pub file_type: String,
70 + pub user_id: UserId,
71 + pub file_size_bytes: i64,
72 + pub status: String,
73 + pub attempts: i32,
74 + pub enqueued_at: DateTime<Utc>,
75 + pub started_at: Option<DateTime<Utc>>,
76 + pub completed_at: Option<DateTime<Utc>>,
77 + pub last_error: Option<String>,
78 + }
79 +
80 + impl ScanJob {
81 + pub fn typed_kind(&self) -> Option<ScanTargetKind> {
82 + ScanTargetKind::from_str(&self.target_kind)
83 + }
84 +
85 + pub fn typed_file_type(&self) -> Option<FileType> {
86 + self.file_type.parse().ok()
87 + }
88 + }
89 +
90 + /// Enqueue a scan job. Returns the job id.
91 + #[tracing::instrument(skip_all, fields(target_kind = target_kind.as_str(), %target_id, s3_key))]
92 + pub async fn enqueue(
93 + db: &PgPool,
94 + target_kind: ScanTargetKind,
95 + target_id: Uuid,
96 + s3_key: &str,
97 + file_type: FileType,
98 + user_id: UserId,
99 + file_size_bytes: i64,
100 + ) -> Result<Uuid, sqlx::Error> {
101 + let id = sqlx::query_scalar::<_, Uuid>(
102 + r#"
103 + INSERT INTO scan_jobs
104 + (target_kind, target_id, s3_key, file_type, user_id, file_size_bytes)
105 + VALUES ($1, $2, $3, $4, $5, $6)
106 + RETURNING id
107 + "#,
108 + )
109 + .bind(target_kind.as_str())
110 + .bind(target_id)
111 + .bind(s3_key)
112 + .bind(file_type.as_str())
113 + .bind(user_id)
114 + .bind(file_size_bytes)
115 + .fetch_one(db)
116 + .await?;
117 +
118 + Ok(id)
119 + }
120 +
121 + /// Atomically claim the next queued scan job for processing.
122 + ///
123 + /// Uses `FOR UPDATE SKIP LOCKED` so multiple workers can drain the queue in
124 + /// parallel without contention. Sets `status='running'`, increments `attempts`,
125 + /// and stamps `started_at`. Returns `Ok(None)` if the queue is empty.
126 + #[tracing::instrument(skip_all)]
127 + pub async fn claim_next(db: &PgPool) -> Result<Option<ScanJob>, sqlx::Error> {
128 + let job = sqlx::query_as::<_, ScanJob>(
129 + r#"
130 + WITH next AS (
131 + SELECT id FROM scan_jobs
132 + WHERE status = 'queued'
133 + ORDER BY enqueued_at ASC
134 + FOR UPDATE SKIP LOCKED
135 + LIMIT 1
136 + )
137 + UPDATE scan_jobs
138 + SET status = 'running',
139 + attempts = attempts + 1,
140 + started_at = NOW()
141 + WHERE id = (SELECT id FROM next)
142 + RETURNING *
143 + "#,
144 + )
145 + .fetch_optional(db)
146 + .await?;
147 +
148 + Ok(job)
149 + }
150 +
151 + /// Mark a job as completed successfully.
152 + #[tracing::instrument(skip_all, fields(%job_id))]
153 + pub async fn mark_done(db: &PgPool, job_id: Uuid) -> Result<(), sqlx::Error> {
154 + sqlx::query(
155 + r#"
156 + UPDATE scan_jobs
157 + SET status = 'done', completed_at = NOW(), last_error = NULL
158 + WHERE id = $1
159 + "#,
160 + )
161 + .bind(job_id)
162 + .execute(db)
163 + .await?;
164 + Ok(())
165 + }
166 +
167 + /// Mark a job as failed (worker exception, S3 fetch failure, etc.).
168 + ///
169 + /// Records `last_error` for admin inspection. The job stays in `failed`
170 + /// status; an admin or operator can manually re-enqueue via the dashboard
171 + /// (Phase 2) by inserting a fresh row.
172 + #[tracing::instrument(skip_all, fields(%job_id))]
173 + pub async fn mark_failed(db: &PgPool, job_id: Uuid, err: &str) -> Result<(), sqlx::Error> {
174 + sqlx::query(
175 + r#"
176 + UPDATE scan_jobs
177 + SET status = 'failed', completed_at = NOW(), last_error = $1
178 + WHERE id = $2
179 + "#,
180 + )
181 + .bind(err)
182 + .bind(job_id)
183 + .execute(db)
184 + .await?;
185 + Ok(())
186 + }
187 +
188 + /// Reset jobs that have been stuck in `running` longer than `max_age_secs`.
189 + ///
190 + /// Run on worker startup to recover from a previous-process crash mid-scan:
191 + /// the row would otherwise stay `running` forever and never be re-claimed.
192 + /// Increments `attempts` so a perpetually-crashing scan eventually trips the
193 + /// retry budget.
194 + #[tracing::instrument(skip_all)]
195 + pub async fn reap_stuck(db: &PgPool, max_age_secs: i64) -> Result<u64, sqlx::Error> {
196 + let affected = sqlx::query(
197 + r#"
198 + UPDATE scan_jobs
199 + SET status = 'queued', started_at = NULL
200 + WHERE status = 'running'
201 + AND started_at < NOW() - ($1 || ' seconds')::interval
202 + "#,
203 + )
204 + .bind(max_age_secs.to_string())
205 + .execute(db)
206 + .await?
207 + .rows_affected();
208 + Ok(affected)
209 + }
210 +
211 + /// Count of currently-queued jobs. Used by the admin dashboard health panel
212 + /// (Phase 2 of the audit). Allowed dead code until that route lands.
213 + #[allow(dead_code)]
214 + pub async fn queued_count(db: &PgPool) -> Result<i64, sqlx::Error> {
215 + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'queued'")
216 + .fetch_one(db)
217 + .await
218 + }
219 +
220 + /// Count of currently-running jobs. Phase 2 dashboard consumer.
221 + #[allow(dead_code)]
222 + pub async fn running_count(db: &PgPool) -> Result<i64, sqlx::Error> {
223 + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'running'")
224 + .fetch_one(db)
225 + .await
226 + }
227 +
228 + /// A held version with enough context to re-enqueue it for scanning.
229 + #[allow(dead_code)]
230 + #[derive(Debug, Clone, FromRow)]
231 + pub struct RescanCandidateVersion {
232 + pub version_id: Uuid,
233 + pub s3_key: String,
234 + pub file_size_bytes: i64,
235 + pub user_id: UserId,
236 + }
237 +
238 + /// A held item (audio or cover) with re-enqueue context.
239 + #[allow(dead_code)]
240 + #[derive(Debug, Clone, FromRow)]
241 + pub struct RescanCandidateItem {
242 + pub item_id: Uuid,
243 + pub s3_key: String,
244 + pub file_size_bytes: i64,
245 + pub user_id: UserId,
246 + /// Audio / cover, selected by the query.
247 + pub file_type: String,
248 + }
249 +
250 + /// Find currently-held versions with enough context to be re-scanned.
251 + pub async fn rescan_candidates_versions(db: &PgPool) -> Result<Vec<RescanCandidateVersion>, sqlx::Error> {
252 + sqlx::query_as::<_, RescanCandidateVersion>(
253 + r#"
254 + SELECT v.id AS version_id,
255 + v.s3_key,
256 + COALESCE(v.file_size_bytes, 0) AS file_size_bytes,
257 + p.user_id
258 + FROM versions v
259 + JOIN items i ON i.id = v.item_id
260 + JOIN projects p ON p.id = i.project_id
261 + WHERE v.scan_status = 'held_for_review'
262 + AND v.s3_key IS NOT NULL
263 + "#,
264 + )
265 + .fetch_all(db)
266 + .await
267 + }
268 +
269 + /// Find currently-held items (audio or cover) with re-enqueue context.
270 + pub async fn rescan_candidates_items(db: &PgPool) -> Result<Vec<RescanCandidateItem>, sqlx::Error> {
271 + sqlx::query_as::<_, RescanCandidateItem>(
272 + r#"
273 + SELECT i.id AS item_id,
274 + COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key,
275 + COALESCE(i.audio_file_size_bytes, i.cover_file_size_bytes, 0) AS file_size_bytes,
276 + p.user_id,
277 + CASE WHEN i.audio_s3_key IS NOT NULL THEN 'audio' ELSE 'cover' END AS file_type
278 + FROM items i
279 + JOIN projects p ON p.id = i.project_id
280 + WHERE i.scan_status = 'held_for_review'
281 + AND COALESCE(i.audio_s3_key, i.cover_s3_key) IS NOT NULL
282 + "#,
283 + )
284 + .fetch_all(db)
285 + .await
286 + }
287 +
288 + #[cfg(test)]
289 + mod tests {
290 + use super::*;
291 +
292 + #[test]
293 + fn target_kind_round_trip() {
294 + for kind in [
295 + ScanTargetKind::Item,
296 + ScanTargetKind::Version,
297 + ScanTargetKind::Media,
298 + ScanTargetKind::ProjectImage,
299 + ScanTargetKind::ItemImage,
300 + ScanTargetKind::ContentInsertion,
301 + ] {
302 + assert_eq!(ScanTargetKind::from_str(kind.as_str()), Some(kind));
303 + }
304 + assert_eq!(ScanTargetKind::from_str("bogus"), None);
305 + }
306 + }
@@ -10,7 +10,7 @@ use super::UserId;
10 10 use super::VersionId;
11 11 use crate::scanning::ScanResult;
12 12
13 - /// An item held for review, joined with creator info.
13 + /// An item held for review, joined with creator info and latest scan layers.
14 14 #[derive(Debug, Clone, FromRow)]
15 15 pub struct HeldItemRow {
16 16 pub item_id: ItemId,
@@ -20,9 +20,12 @@ pub struct HeldItemRow {
20 20 pub creator_id: UserId,
21 21 pub upload_trusted: bool,
22 22 pub held_at: DateTime<Utc>,
23 + /// Latest `file_scan_results.scan_layers` JSON for this entity's s3_key,
24 + /// or `null` if no scan has run yet. The dashboard renders this as chips.
25 + pub scan_layers: Option<serde_json::Value>,
23 26 }
24 27
25 - /// A version held for review, joined with creator info.
28 + /// A version held for review, joined with creator info and latest scan layers.
26 29 #[derive(Debug, Clone, FromRow)]
27 30 pub struct HeldVersionRow {
28 31 pub item_id: ItemId,
@@ -34,6 +37,7 @@ pub struct HeldVersionRow {
34 37 pub creator_id: UserId,
35 38 pub upload_trusted: bool,
36 39 pub held_at: DateTime<Utc>,
40 + pub scan_layers: Option<serde_json::Value>,
37 41 }
38 42
39 43 /// Insert a scan result record for audit trail.
@@ -104,7 +108,8 @@ pub async fn update_version_scan_status(
104 108 Ok(())
105 109 }
106 110
107 - /// Get items held for review, joined with creator info. Oldest first.
111 + /// Get items held for review, joined with creator info + latest scan layers.
112 + /// Oldest first.
108 113 #[tracing::instrument(skip_all)]
109 114 pub async fn get_held_items(db: &PgPool) -> Result<Vec<HeldItemRow>, sqlx::Error> {
110 115 let rows = sqlx::query_as::<_, HeldItemRow>(
@@ -112,7 +117,12 @@ pub async fn get_held_items(db: &PgPool) -> Result<Vec<HeldItemRow>, sqlx::Error
112 117 SELECT i.id AS item_id, i.title AS item_title,
113 118 COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key,
114 119 u.username AS creator_username, u.id AS creator_id,
115 - u.upload_trusted, i.updated_at AS held_at
120 + u.upload_trusted, i.updated_at AS held_at,
121 + (
122 + SELECT fsr.scan_layers FROM file_scan_results fsr
123 + WHERE fsr.s3_key = COALESCE(i.audio_s3_key, i.cover_s3_key)
124 + ORDER BY fsr.scanned_at DESC LIMIT 1
125 + ) AS scan_layers
116 126 FROM items i
117 127 JOIN projects p ON p.id = i.project_id
118 128 JOIN users u ON u.id = p.user_id
@@ -126,7 +136,8 @@ pub async fn get_held_items(db: &PgPool) -> Result<Vec<HeldItemRow>, sqlx::Error
126 136 Ok(rows)
127 137 }
128 138
129 - /// Get versions held for review, joined with creator info. Oldest first.
139 + /// Get versions held for review, joined with creator info + latest scan layers.
140 + /// Oldest first.
130 141 #[tracing::instrument(skip_all)]
131 142 pub async fn get_held_versions(db: &PgPool) -> Result<Vec<HeldVersionRow>, sqlx::Error> {
132 143 let rows = sqlx::query_as::<_, HeldVersionRow>(
@@ -135,7 +146,12 @@ pub async fn get_held_versions(db: &PgPool) -> Result<Vec<HeldVersionRow>, sqlx:
135 146 v.id AS version_id, v.version_number,
136 147 v.s3_key,
137 148 u.username AS creator_username, u.id AS creator_id,
138 - u.upload_trusted, v.created_at AS held_at
149 + u.upload_trusted, v.created_at AS held_at,
150 + (
151 + SELECT fsr.scan_layers FROM file_scan_results fsr
152 + WHERE fsr.s3_key = v.s3_key
153 + ORDER BY fsr.scanned_at DESC LIMIT 1
154 + ) AS scan_layers
139 155 FROM versions v
140 156 JOIN items i ON i.id = v.item_id
141 157 JOIN projects p ON p.id = i.project_id
@@ -149,3 +165,87 @@ pub async fn get_held_versions(db: &PgPool) -> Result<Vec<HeldVersionRow>, sqlx:
149 165
150 166 Ok(rows)
151 167 }
168 +
169 + /// Per-layer aggregate stats over a window for the admin dashboard.
170 + #[derive(Debug, Clone, FromRow)]
171 + pub struct LayerHealthRow {
172 + pub layer: String,
173 + pub pass_count: i64,
174 + pub skip_count: i64,
175 + pub fail_count: i64,
176 + pub error_count: i64,
177 + pub last_pass_or_skip: Option<DateTime<Utc>>,
178 + }
179 +
180 + /// Compute per-layer health stats over the last N hours.
181 + ///
182 + /// Reads `file_scan_results.scan_layers` JSONB and rolls up verdict counts
183 + /// per layer. `last_pass_or_skip` is the most recent timestamp at which the
184 + /// layer returned a non-error, non-fail verdict — the indicator the admin
185 + /// panel uses to flag a layer as down.
186 + #[tracing::instrument(skip_all)]
187 + pub async fn layer_health_window(
188 + db: &PgPool,
189 + hours: i64,
190 + ) -> Result<Vec<LayerHealthRow>, sqlx::Error> {
191 + sqlx::query_as::<_, LayerHealthRow>(
192 + r#"
193 + WITH expanded AS (
194 + SELECT fsr.scanned_at,
195 + (l ->> 'layer') AS layer,
196 + (l ->> 'verdict') AS verdict
197 + FROM file_scan_results fsr,
198 + jsonb_array_elements(fsr.scan_layers) AS l
199 + WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval
200 + )
201 + SELECT layer,
202 + COUNT(*) FILTER (WHERE verdict = 'pass') AS pass_count,
203 + COUNT(*) FILTER (WHERE verdict = 'skip') AS skip_count,
204 + COUNT(*) FILTER (WHERE verdict = 'fail') AS fail_count,
205 + COUNT(*) FILTER (WHERE verdict = 'error') AS error_count,
206 + MAX(scanned_at) FILTER (WHERE verdict IN ('pass', 'skip')) AS last_pass_or_skip
207 + FROM expanded
208 + GROUP BY layer
209 + ORDER BY layer
210 + "#,
211 + )
212 + .bind(hours.to_string())
213 + .fetch_all(db)
214 + .await
215 + }
216 +
217 + /// A scan-history row for the dashboard's "Recent" grid.
218 + #[derive(Debug, Clone, FromRow)]
219 + pub struct ScanHistoryRow {
220 + pub scanned_at: DateTime<Utc>,
221 + pub s3_key: String,
222 + pub scan_status: String,
223 + pub sha256: Option<String>,
224 + pub file_size_bytes: Option<i64>,
225 + pub scan_layers: serde_json::Value,
226 + }
227 +
228 + /// Recent scan results across all entities. Newest first, capped at `limit`.
229 + /// Used by the Recent History collapsible section. `since_hours` bounds the
230 + /// window so the grid renders fast.
231 + #[tracing::instrument(skip_all)]
232 + pub async fn recent_history(
233 + db: &PgPool,
234 + since_hours: i64,
235 + limit: i64,
236 + ) -> Result<Vec<ScanHistoryRow>, sqlx::Error> {
237 + sqlx::query_as::<_, ScanHistoryRow>(
238 + r#"
239 + SELECT fsr.scanned_at, fsr.s3_key, fsr.scan_status,
240 + fsr.sha256, fsr.file_size_bytes, fsr.scan_layers
241 + FROM file_scan_results fsr
242 + WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval
243 + ORDER BY fsr.scanned_at DESC
244 + LIMIT $2
245 + "#,
246 + )
247 + .bind(since_hours.to_string())
248 + .bind(limit)
249 + .fetch_all(db)
250 + .await
251 + }
@@ -327,6 +327,22 @@ async fn main() {
327 327 let scheduler_shutdown_rx = shutdown_tx.subscribe();
328 328 let _scheduler_handle = makenotwork::scheduler::spawn_scheduler(state.clone(), scheduler_shutdown_rx);
329 329
330 + // Start scan worker pool. Only meaningful if a scanner is configured;
331 + // otherwise enqueue_scan_for never enqueues (trust-gate fast path).
332 + if let (Some(scanner), Some(s3_for_workers)) = (state.scanner.clone(), state.s3.clone()) {
333 + let scan_ctx = std::sync::Arc::new(makenotwork::scanning::worker::WorkerContext {
334 + db: state.db.clone(),
335 + s3: s3_for_workers,
336 + pipeline: scanner,
337 + scan_semaphore: state.scan_semaphore.clone(),
338 + wam: state.wam.clone(),
339 + });
340 + let worker_count = makenotwork::constants::SCAN_WORKER_COUNT;
341 + let worker_shutdown_rx = shutdown_tx.subscribe();
342 + makenotwork::scanning::worker::spawn_pool(worker_count, scan_ctx, worker_shutdown_rx);
343 + tracing::info!(worker_count, "scan worker pool started");
344 + }
345 +
330 346 // Build router (shared with integration tests via lib.rs)
331 347 let app = build_app(state, session_layer)
332 348 // Request ID: propagate → trace → set (Axum applies inside-out)
@@ -39,10 +39,15 @@ pub fn admin_routes() -> Router<AppState> {
39 39 .route("/api/admin/users/{id}/terminate", post(users::admin_terminate_user))
40 40 // Upload review queue
41 41 .route("/admin/uploads", get(uploads::admin_uploads))
42 - .route("/api/admin/uploads/items/{id}/approve", post(uploads::admin_approve_item_upload))
43 - .route("/api/admin/uploads/items/{id}/reject", post(uploads::admin_reject_item_upload))
44 - .route("/api/admin/uploads/versions/{id}/approve", post(uploads::admin_approve_version_upload))
45 - .route("/api/admin/uploads/versions/{id}/reject", post(uploads::admin_reject_version_upload))
42 + .route("/api/admin/uploads/items/{id}/promote", post(uploads::admin_promote_item))
43 + .route("/api/admin/uploads/items/{id}/quarantine", post(uploads::admin_quarantine_item))
44 + .route("/api/admin/uploads/items/{id}/rescan", post(uploads::admin_rescan_item))
45 + .route("/api/admin/uploads/versions/{id}/promote", post(uploads::admin_promote_version))
46 + .route("/api/admin/uploads/versions/{id}/quarantine", post(uploads::admin_quarantine_version))
47 + .route("/api/admin/uploads/versions/{id}/rescan", post(uploads::admin_rescan_version))
48 + .route("/api/admin/uploads/bulk/rescan", post(uploads::admin_bulk_rescan_held))
49 + .route("/admin/uploads/queue-summary", get(uploads::admin_queue_summary_partial))
50 + .route("/admin/uploads/audit", get(uploads::admin_scan_audit))
46 51 .route("/api/admin/users/{id}/trust", post(users::admin_trust_user))
47 52 .route("/api/admin/users/{id}/untrust", post(users::admin_untrust_user))
48 53 // Appeals
@@ -1,4 +1,9 @@
1 - //! Admin upload review queue: approve/reject held items and versions.
1 + //! Admin upload review queue + scan-pipeline dashboard.
2 + //!
3 + //! Backs the routes registered under `/admin/uploads` and `/api/admin/uploads`.
4 + //! See `docs/scan-pipeline-audit.md` § 5 for the layout and behavior spec.
5 +
6 + use std::collections::HashMap;
2 7
3 8 use axum::{
4 9 extract::{Path, State},
@@ -7,25 +12,97 @@ use axum::{
7 12
8 13 use crate::{
9 14 auth::AdminUser,
10 - db::{self, ItemId, VersionId},
15 + db::{self, scan_admin_actions::AdminAction, scan_jobs::ScanTargetKind, FileScanStatus, ItemId, UserId, VersionId},
11 16 error::Result,
12 17 helpers::get_csrf_token,
18 + storage::FileType,
13 19 templates::*,
14 20 types::*,
15 21 AppState,
16 22 };
17 23
18 - /// Re-query held uploads and return the entries partial.
19 - pub(super) async fn refresh_held_uploads_partial(state: &AppState) -> Result<Response> {
20 - let held_items = db::scanning::get_held_items(&state.db).await?;
21 - let held_versions = db::scanning::get_held_versions(&state.db).await?;
24 + const HEALTH_WINDOW_HOURS: i64 = 24;
25 + const STALE_LAYER_THRESHOLD_HOURS: i64 = 1;
26 + const HISTORY_WINDOW_HOURS: i64 = 24 * 7;
27 + const HISTORY_ROW_LIMIT: i64 = 100;
28 + const AUDIT_LOG_ROW_LIMIT: i64 = 500;
22 29
23 - let mut held_uploads: Vec<AdminHeldUploadRow> = Vec::new();
24 - held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item));
25 - held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version));
26 - held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at));
30 + /// Build the per-layer health-card vec from the DB rollup.
31 + async fn fetch_layer_health(state: &AppState) -> Result<Vec<LayerHealthCard>> {
32 + let rows = db::scanning::layer_health_window(&state.db, HEALTH_WINDOW_HOURS).await?;
33 + let mut by_layer: HashMap<String, LayerHealthCard> = HashMap::new();
27 34
28 - Ok(AdminUploadEntriesTemplate { held_uploads }.into_response())
35 + // The canonical layer list — keep cards visible even if a layer hasn't
36 + // run at all in the window. The dashboard depends on seeing "clamav: ✗
37 + // not running" rather than the layer just being absent.
38 + for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] {
39 + by_layer.insert(name.to_string(), LayerHealthCard {
40 + layer: name.to_string(),
41 + total: 0,
42 + success_rate_pct: 0,
43 + error_rate_pct: 0,
44 + fail_count: 0,
45 + status_badge: "down",
46 + last_seen: "never".to_string(),
47 + });
48 + }
49 +
50 + for row in rows {
51 + let total = row.pass_count + row.skip_count + row.fail_count + row.error_count;
52 + let success = row.pass_count + row.skip_count;
53 + let success_rate = if total > 0 { (100 * success / total) as i32 } else { 0 };
54 + let error_rate = if total > 0 { (100 * row.error_count / total) as i32 } else { 0 };
55 +
56 + let stale_cutoff = chrono::Utc::now() - chrono::Duration::hours(STALE_LAYER_THRESHOLD_HOURS);
57 + let recent = row.last_pass_or_skip.is_some_and(|t| t > stale_cutoff);
58 + let status_badge = if total == 0 {
59 + "down"
60 + } else if !recent && row.error_count > 0 {
61 + "down"
62 + } else if error_rate > 10 {
63 + "degraded"
64 + } else {
65 + "ok"
66 + };
67 +
68 + let last_seen = match row.last_pass_or_skip {
69 + Some(t) => relative_age(t),
70 + None => "never".to_string(),
71 + };
72 +
73 + by_layer.insert(row.layer.clone(), LayerHealthCard {
74 + layer: row.layer,
75 + total,
76 + success_rate_pct: success_rate,
77 + error_rate_pct: error_rate,
78 + fail_count: row.fail_count,
79 + status_badge,
80 + last_seen,
81 + });
82 + }
83 +
84 + // Stable ordering matches the pipeline's actual execution order.
85 + let mut out: Vec<LayerHealthCard> = Vec::with_capacity(11);
86 + for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] {
87 + if let Some(card) = by_layer.remove(name) {
88 + out.push(card);
89 + }
90 + }
91 + Ok(out)
92 + }
93 +
94 + fn relative_age(t: chrono::DateTime<chrono::Utc>) -> String {
95 + let now = chrono::Utc::now();
96 + let delta = now - t;
97 + if delta.num_seconds() < 60 {
98 + format!("{}s ago", delta.num_seconds().max(0))
99 + } else if delta.num_minutes() < 60 {
100 + format!("{}m ago", delta.num_minutes())
101 + } else if delta.num_hours() < 24 {
102 + format!("{}h ago", delta.num_hours())
103 + } else {
104 + format!("{}d ago", delta.num_days())
105 + }
29 106 }
30 107
31 108 /// Render the admin upload review queue.
@@ -45,7 +122,22 @@ pub(super) async fn admin_uploads(
45 122 held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version));
46 123 held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at));
47 124
125 + // Attach the last admin action to each held row in a single batch lookup.
126 + attach_last_actions(&state, &mut held_uploads).await;
127 +
48 128 let total_held = held_uploads.len();
129 + let layer_health = fetch_layer_health(&state).await?;
130 + let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
131 + let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
132 +
133 + let recent_history: Vec<ScanHistoryDisplay> = db::scanning::recent_history(
134 + &state.db, HISTORY_WINDOW_HOURS, HISTORY_ROW_LIMIT,
135 + ).await
136 + .unwrap_or_default()
137 + .iter()
138 + .map(ScanHistoryDisplay::from_row)
139 + .collect();
140 + let history_total = recent_history.len();
49 141
50 142 Ok(AdminUploadsTemplate {
51 143 csrf_token,
@@ -53,53 +145,276 @@ pub(super) async fn admin_uploads(
53 145 held_uploads,
54 146 total_held,
55 147 admin_active_page: "uploads",
148 + layer_health,
149 + queue_pending,
150 + queue_running,
151 + recent_history,
152 + history_total,
56 153 })
57 154 }
58 155
59 - /// Approve a held item upload (set scan_status to clean).
60 - #[tracing::instrument(skip_all, name = "admin::admin_approve_item_upload")]
61 - pub(super) async fn admin_approve_item_upload(
156 + /// Batch-fetch the latest admin action for the displayed held rows and
157 + /// attach each as `last_action` on its row.
158 + async fn attach_last_actions(state: &AppState, rows: &mut [AdminHeldUploadRow]) {
159 + use std::str::FromStr;
160 + let mut version_ids: Vec<uuid::Uuid> = Vec::new();
161 + let mut item_ids: Vec<uuid::Uuid> = Vec::new();
162 + for r in rows.iter() {
163 + if let Some(ref vid) = r.version_id
164 + && let Ok(uuid) = uuid::Uuid::from_str(vid)
165 + {
166 + version_ids.push(uuid);
167 + } else if let Ok(uuid) = uuid::Uuid::from_str(&r.item_id) {
168 + item_ids.push(uuid);
169 + }
170 + }
171 + let version_actions = db::scan_admin_actions::latest_per_version(&state.db, &version_ids)
172 + .await.unwrap_or_default();
173 + let item_actions = db::scan_admin_actions::latest_per_item(&state.db, &item_ids)
174 + .await.unwrap_or_default();
175 +
176 + for r in rows.iter_mut() {
177 + let summary = if let Some(ref vid) = r.version_id {
178 + uuid::Uuid::from_str(vid).ok().and_then(|u| version_actions.get(&u))
179 + } else {
180 + uuid::Uuid::from_str(&r.item_id).ok().and_then(|u| item_actions.get(&u))
181 + };
182 + if let Some(s) = summary {
183 + r.last_action = Some(LastAction {
184 + action: s.action.clone(),
185 + admin_username: s.admin_username.clone(),
186 + when: relative_age(s.created_at),
187 + });
188 + }
189 + }
190 + }
191 +
192 + /// Full audit-log page.
193 + #[tracing::instrument(skip_all, name = "admin::scan_audit")]
194 + pub(super) async fn admin_scan_audit(
62 195 State(state): State<AppState>,
63 - AdminUser(_admin): AdminUser,
196 + session: tower_sessions::Session,
197 + AdminUser(user): AdminUser,
198 + ) -> Result<impl IntoResponse> {
199 + let csrf_token = get_csrf_token(&session).await;
200 + let entries: Vec<AdminAuditLogRow> = db::scan_admin_actions::list_recent_with_admin(
201 + &state.db, AUDIT_LOG_ROW_LIMIT,
202 + ).await
203 + .unwrap_or_default()
204 + .iter()
205 + .map(AdminAuditLogRow::from_db)
206 + .collect();
207 + Ok(AdminScanAuditTemplate {
208 + csrf_token,
209 + session_user: Some(user),
210 + admin_active_page: "uploads",
211 + entries,
212 + })
213 + }
214 +
215 + /// Re-query held uploads and return the entries partial.
216 + pub(super) async fn refresh_held_uploads_partial(state: &AppState) -> Result<Response> {
217 + let held_items = db::scanning::get_held_items(&state.db).await?;
218 + let held_versions = db::scanning::get_held_versions(&state.db).await?;
219 +
220 + let mut held_uploads: Vec<AdminHeldUploadRow> = Vec::new();
221 + held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item));
222 + held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version));
223 + held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at));
224 +
225 + Ok(AdminUploadEntriesTemplate { held_uploads }.into_response())
226 + }
227 +
228 + // ─── Per-row actions ─────────────────────────────────────────────────────────
229 +
230 + /// Promote a held item upload to Clean. Renames the legacy "approve" verb.
231 + #[tracing::instrument(skip_all, name = "admin::promote_item")]
232 + pub(super) async fn admin_promote_item(
233 + State(state): State<AppState>,
234 + AdminUser(admin): AdminUser,
64 235 Path(id): Path<ItemId>,
65 236 ) -> Result<Response> {
66 - db::scanning::update_item_scan_status(&state.db, id, db::FileScanStatus::Clean).await?;
67 - tracing::info!(item_id = %id, "admin approved item upload");
237 + db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Clean).await?;
238 + db::scan_admin_actions::log_item(
239 + &state.db, id, admin.id, AdminAction::Promote,
240 + Some("held_for_review"), Some("clean"),
241 + None,
242 + ).await.ok();
243 + tracing::info!(item_id = %id, admin_id = %admin.id, "item promoted to clean");
68 244 refresh_held_uploads_partial(&state).await
69 245 }
70 246
71 - /// Reject a held item upload (set scan_status to quarantined).
72 - #[tracing::instrument(skip_all, name = "admin::admin_reject_item_upload")]
73 - pub(super) async fn admin_reject_item_upload(
247 + /// Quarantine a held item upload. Renames the legacy "reject" verb.
248 + #[tracing::instrument(skip_all, name = "admin::quarantine_item")]
249 + pub(super) async fn admin_quarantine_item(
74 250 State(state): State<AppState>,
75 - AdminUser(_admin): AdminUser,
251 + AdminUser(admin): AdminUser,
76 252 Path(id): Path<ItemId>,
77 253 ) -> Result<Response> {
78 - db::scanning::update_item_scan_status(&state.db, id, db::FileScanStatus::Quarantined).await?;
79 - tracing::info!(item_id = %id, "admin rejected item upload");
254 + db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Quarantined).await?;
255 + db::scan_admin_actions::log_item(
256 + &state.db, id, admin.id, AdminAction::Quarantine,
257 + Some("held_for_review"), Some("quarantined"),
258 + None,
259 + ).await.ok();
260 + tracing::info!(item_id = %id, admin_id = %admin.id, "item quarantined");
80 261 refresh_held_uploads_partial(&state).await
81 262 }
82 263
83 - /// Approve a held version upload (set scan_status to clean).
84 - #[tracing::instrument(skip_all, name = "admin::admin_approve_version_upload")]
85 - pub(super) async fn admin_approve_version_upload(
264 + /// Promote a held version upload to Clean.
265 + #[tracing::instrument(skip_all, name = "admin::promote_version")]
266 + pub(super) async fn admin_promote_version(
86 267 State(state): State<AppState>,
87 - AdminUser(_admin): AdminUser,
268 + AdminUser(admin): AdminUser,
88 269 Path(id): Path<VersionId>,
89 270 ) -> Result<Response> {
90 - db::scanning::update_version_scan_status(&state.db, id, db::FileScanStatus::Clean).await?;
91 - tracing::info!(version_id = %id, "admin approved version upload");
271 + db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Clean).await?;
272 + db::scan_admin_actions::log_version(
273 + &state.db, id, admin.id, AdminAction::Promote,
274 + Some("held_for_review"), Some("clean"),
275 + None,
276 + ).await.ok();
277 + tracing::info!(version_id = %id, admin_id = %admin.id, "version promoted to clean");
92 278 refresh_held_uploads_partial(&state).await
93 279 }
94 280
95 - /// Reject a held version upload (set scan_status to quarantined).
96 - #[tracing::instrument(skip_all, name = "admin::admin_reject_version_upload")]
97 - pub(super) async fn admin_reject_version_upload(
281 + /// Quarantine a held version upload.
282 + #[tracing::instrument(skip_all, name = "admin::quarantine_version")]
283 + pub(super) async fn admin_quarantine_version(
98 284 State(state): State<AppState>,
99 - AdminUser(_admin): AdminUser,
285 + AdminUser(admin): AdminUser,
100 286 Path(id): Path<VersionId>,
101 287 ) -> Result<Response> {
102 - db::scanning::update_version_scan_status(&state.db, id, db::FileScanStatus::Quarantined).await?;
103 - tracing::info!(version_id = %id, "admin rejected version upload");
288 + db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Quarantined).await?;
289 + db::scan_admin_actions::log_version(
290 + &state.db, id, admin.id, AdminAction::Quarantine,
291 + Some("held_for_review"), Some("quarantined"),
292 + None,
293 + ).await.ok();
294 + tracing::info!(version_id = %id, admin_id = %admin.id, "version quarantined");
104 295 refresh_held_uploads_partial(&state).await
105 296 }
297 +
298 + // ─── Rescan ──────────────────────────────────────────────────────────────────
299 +
300 + /// Re-enqueue a single version for scanning. Worker picks it up on its next
301 + /// claim cycle. The entity is moved back to Pending so the dashboard reflects
302 + /// that a scan is in flight.
303 + #[tracing::instrument(skip_all, name = "admin::rescan_version")]
304 + pub(super) async fn admin_rescan_version(
305 + State(state): State<AppState>,
306 + AdminUser(admin): AdminUser,
307 + Path(id): Path<VersionId>,
308 + ) -> Result<Response> {
309 + rescan_version_inner(&state, id, admin.id, AdminAction::Rescan).await?;
310 + refresh_held_uploads_partial(&state).await
311 + }
312 +
313 + async fn rescan_version_inner(
314 + state: &AppState,
315 + id: VersionId,
316 + admin_id: UserId,
317 + action: AdminAction,
318 + ) -> Result<()> {
319 + let v = db::versions::get_version_by_id(&state.db, id).await?
320 + .ok_or(crate::error::AppError::NotFound)?;
321 + let item = db::items::get_item_by_id(&state.db, v.item_id).await?
322 + .ok_or(crate::error::AppError::NotFound)?;
323 + let owner = db::items::get_item_owner(&state.db, item.id).await?
324 + .ok_or(crate::error::AppError::NotFound)?;
325 + let s3_key = v.s3_key.clone().ok_or(crate::error::AppError::NotFound)?;
326 + let size = v.file_size_bytes.unwrap_or(0);
327 +
328 + db::scan_jobs::enqueue(
329 + &state.db, ScanTargetKind::Version, *id.as_uuid(),
330 + &s3_key, FileType::Download, owner, size,
331 + ).await?;
332 + db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Pending).await?;
333 + db::scan_admin_actions::log_version(
334 + &state.db, id, admin_id, action,
335 + Some("held_for_review"), Some("pending"), None,
336 + ).await.ok();
337 + tracing::info!(version_id = %id, admin_id = %admin_id, "version re-enqueued for scan");
338 + Ok(())
339 + }
340 +
341 + /// Re-enqueue a single item for scanning.
342 + #[tracing::instrument(skip_all, name = "admin::rescan_item")]
343 + pub(super) async fn admin_rescan_item(
344 + State(state): State<AppState>,
345 + AdminUser(admin): AdminUser,
346 + Path(id): Path<ItemId>,
347 + ) -> Result<Response> {
348 + rescan_item_inner(&state, id, admin.id, AdminAction::Rescan).await?;
349 + refresh_held_uploads_partial(&state).await
350 + }
351 +
352 + async fn rescan_item_inner(
353 + state: &AppState,
354 + id: ItemId,
355 + admin_id: UserId,
356 + action: AdminAction,
357 + ) -> Result<()> {
358 + let item = db::items::get_item_by_id(&state.db, id).await?
359 + .ok_or(crate::error::AppError::NotFound)?;
360 + let owner = db::items::get_item_owner(&state.db, id).await?
361 + .ok_or(crate::error::AppError::NotFound)?;
362 +
363 + let (s3_key, size, file_type) = if let Some(k) = item.audio_s3_key.clone() {
364 + (k, item.audio_file_size_bytes.unwrap_or(0), FileType::Audio)
365 + } else if let Some(k) = item.cover_s3_key.clone() {
366 + (k, item.cover_file_size_bytes.unwrap_or(0), FileType::Cover)
367 + } else {
368 + return Err(crate::error::AppError::NotFound);
369 + };
370 +
371 + db::scan_jobs::enqueue(
372 + &state.db, ScanTargetKind::Item, *id.as_uuid(),
373 + &s3_key, file_type, owner, size,
374 + ).await?;
375 + db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Pending).await?;
376 + db::scan_admin_actions::log_item(
377 + &state.db, id, admin_id, action,
378 + Some("held_for_review"), Some("pending"), None,
379 + ).await.ok();
380 + tracing::info!(item_id = %id, admin_id = %admin_id, "item re-enqueued for scan");
381 + Ok(())
382 + }
383 +
384 + /// Bulk-rescan every currently-held item and version. Used to clear backlogs
385 + /// accumulated under a previous broken pipeline configuration.
386 + #[tracing::instrument(skip_all, name = "admin::bulk_rescan_held")]
387 + pub(super) async fn admin_bulk_rescan_held(
388 + State(state): State<AppState>,
389 + AdminUser(admin): AdminUser,
390 + ) -> Result<Response> {
391 + let mut total = 0usize;
392 +
393 + for cand in db::scan_jobs::rescan_candidates_versions(&state.db).await? {
394 + let id = VersionId::from_uuid(cand.version_id);
395 + if rescan_version_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
396 + total += 1;
397 + }
398 + }
399 + for cand in db::scan_jobs::rescan_candidates_items(&state.db).await? {
400 + let id = ItemId::from_uuid(cand.item_id);
401 + if rescan_item_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
402 + total += 1;
403 + }
404 + }
405 + tracing::info!(total, admin_id = %admin.id, "bulk rescan of held queue dispatched");
406 + refresh_held_uploads_partial(&state).await
407 + }
408 +
409 + // ─── Live partials ───────────────────────────────────────────────────────────
410 +
411 + /// HTMX partial: current pending + running counts. Polled by the dashboard.
412 + #[tracing::instrument(skip_all, name = "admin::queue_summary_partial")]
413 + pub(super) async fn admin_queue_summary_partial(
414 + State(state): State<AppState>,
415 + AdminUser(_admin): AdminUser,
416 + ) -> Result<impl IntoResponse> {
417 + let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
418 + let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
419 + Ok(AdminQueueSummaryTemplate { queue_pending, queue_running })
420 + }
@@ -13,7 +13,8 @@ use crate::{
13 13 db::{self, ContentInsertionId, ContentInsertionPlacementId, InsertionPosition, ItemId},
14 14 error::{AppError, Result, ResultExt},
15 15 helpers::htmx_toast_response,
16 - routes::storage::scan_and_classify,
16 + db::scan_jobs::ScanTargetKind,
17 + routes::storage::enqueue_scan_for,
17 18 storage::{FileType, S3Client},
18 19 templates::InsertionListTemplate,
19 20 AppState,
@@ -163,13 +164,6 @@ pub(super) async fn confirm_insertion(
163 164 }
164 165 };
165 166
166 - // Scan for malware before accepting the upload
167 - let (_scan_status, malware_err) =
168 - scan_and_classify(&state, s3.as_ref(), &req.s3_key, FileType::Insertion, user.id, file_size_bytes).await?;
169 - if let Some(err) = malware_err {
170 - return Err(err);
171 - }
172 -
173 167 // Atomically increment storage BEFORE writing the DB record.
174 168 // Avoids orphaned unbilled file references.
175 169 if let Err(e) = db::creator_tiers::try_increment_storage(&state.db, user.id, file_size_bytes, max_storage).await {
@@ -192,6 +186,19 @@ pub(super) async fn confirm_insertion(
192 186 )
193 187 .await?;
194 188
189 + // Enqueue async scan. Content insertions don't carry a scan_status column,
190 + // so the worker only records the scan result and creates a WAM ticket on
191 + // quarantine for admin follow-up.
192 + enqueue_scan_for(
193 + &state,
194 + ScanTargetKind::ContentInsertion,
195 + insertion.id.into(),
196 + &req.s3_key,
197 + FileType::Insertion,
198 + user.id,
199 + file_size_bytes,
200 + ).await?;
201 +
195 202 tracing::info!(
196 203 "Insertion confirmed: id={}, user={}, key={}",
197 204 insertion.id,
@@ -166,14 +166,17 @@ pub(super) async fn confirm_upload(
166 166 }
167 167 };
168 168
169 - // Scan + classify
170 - let (status, malware_err) =
171 - crate::routes::storage::scan_and_classify(&state, s3.as_ref(), &req.s3_key, file_type, req.user_id, file_size_bytes)
172 - .await?;
169 + // Enqueue async scan. Worker flips item's scan_status when done.
170 + let status = crate::routes::storage::enqueue_scan_for(
171 + &state,
172 + crate::db::scan_jobs::ScanTargetKind::Item,
173 + req.item_id.into(),
174 + &req.s3_key,
175 + file_type,
176 + req.user_id,
177 + file_size_bytes,
178 + ).await?;
173 179 db::scanning::update_item_scan_status(&state.db, req.item_id, status).await?;
174 - if let Some(err) = malware_err {
175 - return Err(err);
176 - }
177 180
178 181 // Increment storage BEFORE writing the DB record (if quota exceeded, the
179 182 // S3 object is cleaned up and no DB record is created)
@@ -15,7 +15,8 @@ use crate::{
15 15 AppState,
16 16 };
17 17
18 - use super::{scan_and_classify, PresignUploadResponse};
18 + use super::{enqueue_scan_for, PresignUploadResponse};
19 + use crate::db::scan_jobs::ScanTargetKind;
19 20
20 21 /// JSON input for requesting a presigned project image upload URL.
21 22 #[derive(Debug, Deserialize)]
@@ -162,12 +163,18 @@ pub(super) async fn project_image_confirm(
162 163 }
163 164 };
164 165
165 - // Scan + classify — block both Quarantined and HeldForReview (fail closed)
166 - let (_status, malware_err) = scan_and_classify(&state, s3.as_ref(), &req.s3_key, FileType::Cover, user.id, file_size_bytes).await?;
167 - if let Some(err) = malware_err {
168 - s3.delete_object(&req.s3_key).await.ok();
169 - return Err(err);
170 - }
166 + // Enqueue async scan. Project images don't carry their own scan_status
167 + // column, so the worker only records the scan result + creates a WAM
168 + // ticket on quarantine; admin handles deletion via the dashboard.
169 + enqueue_scan_for(
170 + &state,
171 + ScanTargetKind::ProjectImage,
172 + req.project_id.into(),
173 + &req.s3_key,
174 + FileType::Cover,
175 + user.id,
176 + file_size_bytes,
177 + ).await?;
171 178
172 179 // Atomically replace storage (decrement old, increment new) BEFORE writing the DB record
173 180 if let Some(ref old_url) = project.cover_image_url
@@ -348,14 +355,19 @@ pub(super) async fn item_image_confirm(
348 355 }));
349 356 }
350 357
351 - // Scan + classify BEFORE touching storage or the old cover. Fail-closed:
352 - // on malware/quarantine, the new upload is dropped and the old cover stays intact.
353 - let (status, malware_err) = scan_and_classify(&state, s3.as_ref(), &req.s3_key, FileType::Cover, user.id, file_size_bytes).await?;
358 + // Enqueue async scan. The worker flips the item's scan_status when done.
359 + // The old cover is still replaced below; if a later scan quarantines the
360 + // new cover, the admin dashboard surfaces it for cleanup.
361 + let status = enqueue_scan_for(
362 + &state,
363 + ScanTargetKind::ItemImage,
364 + req.item_id.into(),
365 + &req.s3_key,
366 + FileType::Cover,
367 + user.id,
368 + file_size_bytes,
369 + ).await?;
354 370 db::scanning::update_item_scan_status(&state.db, req.item_id, status).await?;
355 - if let Some(err) = malware_err {
356 - s3.delete_object(&req.s3_key).await.ok();
357 - return Err(err);
358 - }
359 371
360 372 // Atomically replace storage (decrement old, increment new) BEFORE writing the DB record.
361 373 // Old S3 object is durably enqueued for deletion only after the swap commits.
@@ -20,7 +20,8 @@ use crate::{
20 20 AppState,
21 21 };
22 22
23 - use super::{scan_and_classify, ConfirmUploadResponse, PresignUploadResponse};
23 + use super::{enqueue_scan_for, ConfirmUploadResponse, PresignUploadResponse};
24 + use crate::db::scan_jobs::ScanTargetKind;
24 25
25 26 // =============================================================================
26 27 // Request / Response Types
@@ -232,13 +233,6 @@ pub(super) async fn media_confirm(
232 233 }
233 234 };
234 235
235 - // Scan
236 - let (scan_status, malware_err) =
237 - scan_and_classify(&state, s3.as_ref(), &req.s3_key, file_type, user.id, file_size_bytes).await?;
238 - if let Some(err) = malware_err {
239 - return Err(err);
240 - }
241 -
242 236 // Atomically increment storage BEFORE writing the DB record.
243 237 // Avoids orphaned unbilled file references.
244 238 if let Err(e) = db::creator_tiers::try_increment_storage(&state.db, user.id, file_size_bytes, max_storage).await {
@@ -255,9 +249,13 @@ pub(super) async fn media_confirm(
255 249 .filter(|c| c.is_alphanumeric() || *c == '.' || *c == '-' || *c == '_')
256 250 .collect::<String>();
257 251
258 - // Insert DB record — unique index on (user_id, folder, filename) prevents
259 - // TOCTOU race between presign uniqueness check and confirm.
260 - match db::media_files::create(
252 + // Insert DB record with `scan_status='pending'` and capture the new id so
253 + // the scan worker can flip it to Clean/Quarantined/HeldForReview when the
254 + // background scan completes. Order matters: media row must exist before
255 + // we enqueue the scan job, otherwise the worker has no target to update.
256 + // Unique index on (user_id, folder, filename) prevents TOCTOU race
257 + // between presign uniqueness check and confirm.
258 + let inserted = match db::media_files::create(
261 259 &state.db,
262 260 user.id,
263 261 &folder,
@@ -266,11 +264,11 @@ pub(super) async fn media_confirm(
266 264 &req.content_type,
267 265 file_size_bytes,
268 266 media_type,
269 - &scan_status.to_string(),
267 + db::FileScanStatus::Pending.to_string().as_str(),
270 268 )
271 269 .await
272 270 {
273 - Ok(_) => {}
271 + Ok(row) => row,
274 272 Err(e) => {
275 273 // Any INSERT failure (conflict or otherwise) must roll back both the storage
276 274 // credit we just incremented and the S3 object — otherwise we leak both.
@@ -285,6 +283,29 @@ pub(super) async fn media_confirm(
285 283 }
286 284 return Err(e);
287 285 }
286 + };
287 +
288 + // Enqueue async scan. The worker will flip the row's scan_status when done.
289 + let scan_status = enqueue_scan_for(
290 + &state,
291 + ScanTargetKind::Media,
292 + inserted.id.into(),
293 + &req.s3_key,
294 + file_type,
295 + user.id,
296 + file_size_bytes,
297 + ).await?;
298 + // If no scanner is configured at all, `enqueue_scan_for` returned a
299 + // terminal status (Clean / HeldForReview) directly. Reflect that on the
300 + // row immediately rather than leaving it at Pending forever.
301 + if scan_status != db::FileScanStatus::Pending {
302 + let media_uuid: uuid::Uuid = inserted.id.into();
303 + sqlx::query("UPDATE media_files SET scan_status = $1 WHERE id = $2")
304 + .bind(scan_status)
305 + .bind(media_uuid)
306 + .execute(&state.db)
307 + .await
308 + .ok();
288 309 }
289 310
290 311 tracing::info!(
@@ -9,11 +9,13 @@ mod versions;
9 9 use axum::{routing::{delete, get, post}, Router};
10 10 use serde::Serialize;
11 11 use tower_governor::GovernorLayer;
12 + use uuid::Uuid;
12 13
13 14 use crate::{
14 15 constants,
15 16 db,
16 - error::{AppError, Result, ResultExt},
17 + db::scan_jobs::ScanTargetKind,
18 + error::Result,
17 19 storage::FileType,
18 20 AppState,
19 21 };
@@ -88,82 +90,50 @@ pub struct ConfirmUploadResponse {
88 90 // Helpers
89 91 // =============================================================================
90 92
91 - /// Scan a file (if scanner is available) and determine final upload status
92 - /// based on scan result and user trust level. Returns the status to apply
93 - /// and optionally a `MalwareDetected` error if the file was quarantined.
94 - /// The caller must update the entity-specific scan status before propagating
95 - /// the quarantine error.
96 - pub(crate) async fn scan_and_classify(
93 + /// Enqueue an async scan job for an uploaded file and return the initial
94 + /// `scan_status` to write onto the target entity.
95 + ///
96 + /// Replaces the previous synchronous `scan_and_classify`. The upload route
97 + /// no longer waits on the scan pipeline — it stamps the returned status
98 + /// (`Pending` if a scan job was enqueued, `Clean`/`HeldForReview` if no
99 + /// scanner is configured at all) and returns. A scan worker (see
100 + /// `crate::scanning::worker`) finishes the work in the background, updates
101 + /// the target entity's `scan_status`, and creates a WAM ticket on
102 + /// quarantine.
103 + ///
104 + /// The legacy `is_upload_trusted` overlay is preserved: untrusted users
105 + /// route to admin review even on a clean scan. That decision now lives in
106 + /// the worker rather than here, since the worker is the one that sees the
107 + /// scan result. When no scanner is configured at all (test/dev), the trust
108 + /// gate fires here so we don't enqueue jobs no worker will consume.
109 + pub(crate) async fn enqueue_scan_for(
97 110 state: &AppState,
98 - s3: &dyn crate::storage::StorageBackend,
111 + target_kind: ScanTargetKind,
112 + target_id: Uuid,
99 113 s3_key: &str,
100 114 file_type: FileType,
101 115 user_id: db::UserId,
102 116 file_size_bytes: i64,
103 - ) -> Result<(db::FileScanStatus, Option<AppError>)> {
104 - if let Some(ref scanner) = state.scanner {
105 - // Guard against OOM: files larger than SCAN_MAX_MEMORY_BYTES cannot
106 - // be loaded into RAM for in-process scanning. Hold them for admin
107 - // review instead of skipping the scan entirely (fail closed).
108 - if file_size_bytes as usize > constants::SCAN_MAX_MEMORY_BYTES {
109 - tracing::warn!(
110 - s3_key,
111 - file_size_bytes,
112 - max = constants::SCAN_MAX_MEMORY_BYTES,
113 - "file too large for in-memory scan, holding for review"
114 - );
115 - return Ok((db::FileScanStatus::HeldForReview, None));
116 - }
117 -
118 - // Limit concurrent scans to avoid memory exhaustion (each downloads
119 - // up to 100 MB into a Vec<u8>). The permit covers the in-memory work
120 - // — download buffer + scan — and is dropped BEFORE the DB insert and
121 - // WAM ticket creation. Those tail ops don't allocate big buffers, so
122 - // holding the semaphore through them just serializes uploads for no
123 - // benefit. The scope below ensures the permit's drop is the last
124 - // thing that happens before we leave the memory-heavy region.
125 - let result = {
126 - let _permit = state.scan_semaphore.acquire().await
127 - .context("acquire scan semaphore")?;
128 - let data = s3.download_object(s3_key).await?;
129 - // `scanner` is `Arc<ScanPipeline>`; clone the Arc so the
130 - // spawn_blocking closure inside `scan` can own it for the
131 - // duration of CPU-bound layers. `scan` consumes the Vec, so
132 - // by the time it returns we hold only the ScanResult + layer
133 - // metadata — small enough that the permit can drop here.
134 - std::sync::Arc::clone(scanner).scan(data, file_type).await
135 - };
136 -
137 - db::scanning::insert_scan_result(&state.db, s3_key, &result).await?;
138 -
139 - if result.status == db::FileScanStatus::Quarantined {
140 - let failed_layers: Vec<&str> = result
141 - .layers
142 - .iter()
143 - .filter(|l| l.verdict == crate::scanning::LayerVerdict::Fail)
144 - .map(|l| l.layer)
145 - .collect();
146 - if let Some(ref wam) = state.wam {
147 - let title = format!("File quarantined: {s3_key}");
148 - let body = format!(
149 - "Upload by user {user_id} flagged as malicious.\n\
150 - Failed layers: {}\nFile type: {file_type:?}\nSize: {file_size_bytes}",
151 - failed_layers.join(", "),
152 - );
153 - wam.create_ticket(&title, Some(&body), "high", "malware-quarantine", Some(s3_key)).await;
154 - }
155 - return Ok((
156 - db::FileScanStatus::Quarantined,
157 - Some(AppError::MalwareDetected(failed_layers.join(", "))),
158 - ));
159 - }
117 + ) -> Result<db::FileScanStatus> {
118 + if state.scanner.is_none() {
119 + let is_trusted = db::users::is_upload_trusted(&state.db, user_id).await?;
120 + return Ok(if is_trusted {
121 + db::FileScanStatus::Clean
122 + } else {
123 + db::FileScanStatus::HeldForReview
124 + });
160 125 }
161 126
162 - let is_trusted = db::users::is_upload_trusted(&state.db, user_id).await?;
163 - let status = if is_trusted {
164 - db::FileScanStatus::Clean
165 - } else {
166 - db::FileScanStatus::HeldForReview
167 - };
168 - Ok((status, None))
127 + db::scan_jobs::enqueue(
128 + &state.db,
129 + target_kind,
130 + target_id,
131 + s3_key,
132 + file_type,
133 + user_id,
134 + file_size_bytes,
135 + )
136 + .await?;
137 +
138 + Ok(db::FileScanStatus::Pending)
169 139 }
@@ -16,7 +16,8 @@ use crate::{
16 16 AppState,
17 17 };
18 18
19 - use super::{scan_and_classify, ConfirmUploadResponse, PresignUploadResponse};
19 + use super::{enqueue_scan_for, ConfirmUploadResponse, PresignUploadResponse};
20 + use crate::db::scan_jobs::ScanTargetKind;
20 21
21 22 /// JSON input for requesting a presigned S3 upload URL.
22 23 ///
@@ -199,12 +200,17 @@ pub(super) async fn confirm_upload(
199 200 }
200 201 };
201 202
202 - // Scan + classify trust level
203 - let (scan_status, malware_err) = scan_and_classify(&state, s3.as_ref(), &req.s3_key, file_type, user.id, file_size_bytes).await?;
203 + // Enqueue async scan. Worker flips the item's scan_status when done.
204 + let scan_status = enqueue_scan_for(
205 + &state,
206 + ScanTargetKind::Item,
207 + req.item_id.into(),
208 + &req.s3_key,
209 + file_type,
210 + user.id,
211 + file_size_bytes,
212 + ).await?;
204 213 db::scanning::update_item_scan_status(&state.db, req.item_id, scan_status).await?;
205 - if let Some(err) = malware_err {
206 - return Err(err);
207 - }
208 214
209 215 // Reject file types that have their own dedicated confirm routes before
210 216 // touching the storage counter — otherwise the increment is never undone.
@@ -15,7 +15,8 @@ use crate::{
15 15 AppState,
16 16 };
17 17
18 - use super::{scan_and_classify, ConfirmUploadResponse, PresignUploadResponse};
18 + use super::{enqueue_scan_for, ConfirmUploadResponse, PresignUploadResponse};
19 + use crate::db::scan_jobs::ScanTargetKind;
19 20
20 21 /// JSON input for requesting a presigned version upload URL.
21 22 #[derive(Debug, Deserialize)]
@@ -155,12 +156,17 @@ pub(super) async fn version_confirm_upload(
155 156 }
156 157 };
157 158
158 - // Scan + classify trust level
159 - let (scan_status, malware_err) = scan_and_classify(&state, s3.as_ref(), &req.s3_key, FileType::Download, user.id, file_size_bytes).await?;
159 + // Enqueue async scan. Worker flips the version's scan_status when done.
160 + let scan_status = enqueue_scan_for(
161 + &state,
162 + ScanTargetKind::Version,
163 + version_id.into(),
164 + &req.s3_key,
165 + FileType::Download,
166 + user.id,
167 + file_size_bytes,
168 + ).await?;
160 169 db::scanning::update_version_scan_status(&state.db, version_id, scan_status).await?;
161 - if let Some(err) = malware_err {
162 - return Err(err);
163 - }
164 170
165 171 // Idempotency: if the version already has this exact s3_key, return success (no-op)
166 172 if version.s3_key.as_deref() == Some(&req.s3_key) {
@@ -9,7 +9,12 @@ use std::io::Cursor;
9 9 use crate::constants;
10 10 use crate::storage::FileType;
11 11
12 - use super::{LayerResult, LayerVerdict};
12 + use super::{ErrorPolicy, LayerResult, LayerVerdict};
13 +
14 + /// In-process deterministic layer. ZIP parser errors fail closed because they
15 + /// indicate either a corrupt archive or an evasion attempt — both warrant a
16 + /// human look rather than an automatic pass.
17 + pub const ERROR_POLICY: ErrorPolicy = ErrorPolicy::FailClosed;
13 18
14 19 /// Check a file for ZIP bomb and archive safety issues.
15 20 /// Runs for any file that has ZIP magic bytes, regardless of claimed type.