Skip to main content

max / makenotwork

14.3 KB · 324 lines History Blame Raw
1 //! Async scan worker.
2 //!
3 //! Spawned at startup from `main.rs`. Drains `scan_jobs` via
4 //! `db::scan_jobs::claim_next` and runs each job through the pipeline. On
5 //! completion, updates the target entity's `scan_status` (for entities that
6 //! have one) and creates a WAM ticket on `Quarantined`.
7 //!
8 //! See `docs/scan-pipeline-audit.md` § 4.4 for the architecture.
9
10 use std::sync::Arc;
11 use std::time::Duration;
12
13 use sqlx::PgPool;
14 use tokio::sync::Semaphore;
15 use uuid::Uuid;
16
17 use crate::constants;
18 use crate::db::{self, scan_jobs::{ScanJob, ScanTargetKind}, FileScanStatus, ItemId, VersionId};
19 use crate::storage::{FileType, StorageBackend};
20 use crate::wam_client::WamClient;
21
22 use super::{LayerVerdict, ScanPipeline, ScanResult};
23
/// How long an idle worker sleeps before polling the queue again after
/// `claim_next` reported that no job was available.
const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(500);

/// Maximum age, in seconds, a `running` job may reach before the reaper resets
/// it to `queued`. A scan that legitimately takes longer than this is an
/// outlier and warrants admin attention anyway.
const STUCK_JOB_SECS: i64 = 300;

/// Cadence at which the pool's single reaper task looks for stuck jobs.
/// (Workers themselves never reap; see `spawn_pool`.)
const REAPER_INTERVAL: Duration = Duration::from_secs(60);
34
35 /// Shared dependencies the worker pool needs.
36 pub struct WorkerContext {
37 pub db: PgPool,
38 pub s3: Arc<dyn StorageBackend>,
39 pub pipeline: Arc<ScanPipeline>,
40 pub scan_semaphore: Arc<Semaphore>,
41 pub wam: Option<WamClient>,
42 }
43
44 /// Spawn `n` scan workers on the current tokio runtime. Each worker drains
45 /// `scan_jobs` independently with FOR UPDATE SKIP LOCKED. A single reaper
46 /// task per pool resets jobs that get stuck in `running`.
47 ///
48 /// All tasks observe `shutdown_rx`: when the sender is dropped (or the value
49 /// changes), they exit on their next idle cycle.
50 pub fn spawn_pool(n: usize, ctx: Arc<WorkerContext>, shutdown_rx: tokio::sync::watch::Receiver<()>) {
51 for worker_id in 0..n {
52 let ctx = Arc::clone(&ctx);
53 let mut shutdown_rx = shutdown_rx.clone();
54 tokio::spawn(async move {
55 tracing::info!(worker_id, "scan worker started");
56 loop {
57 match db::scan_jobs::claim_next(&ctx.db).await {
58 Ok(Some(job)) => {
59 let job_id = job.id;
60 if let Err(e) = process_job(&ctx, job).await {
61 tracing::error!(worker_id, %job_id, error = %e, "scan job failed");
62 if let Err(e2) = db::scan_jobs::mark_failed(&ctx.db, job_id, &e.to_string()).await {
63 tracing::error!(worker_id, %job_id, error = %e2, "failed to mark job failed");
64 }
65 }
66 }
67 Ok(None) => {
68 tokio::select! {
69 _ = tokio::time::sleep(IDLE_POLL_INTERVAL) => {}
70 res = shutdown_rx.changed() => {
71 if res.is_err() {
72 tracing::info!(worker_id, "scan worker shutting down");
73 break;
74 }
75 }
76 }
77 }
78 Err(e) => {
79 tracing::error!(worker_id, error = %e, "claim_next failed; backing off");
80 tokio::select! {
81 _ = tokio::time::sleep(Duration::from_secs(5)) => {}
82 res = shutdown_rx.changed() => {
83 if res.is_err() {
84 break;
85 }
86 }
87 }
88 }
89 }
90 }
91 });
92 }
93
94 let ctx_reaper = Arc::clone(&ctx);
95 let mut shutdown_rx = shutdown_rx;
96 tokio::spawn(async move {
97 loop {
98 match db::scan_jobs::reap_stuck(&ctx_reaper.db, STUCK_JOB_SECS).await {
99 Ok(n) if n > 0 => {
100 tracing::warn!(reset = n, max_age_secs = STUCK_JOB_SECS, "reset stuck scan jobs");
101 }
102 Ok(_) => {}
103 Err(e) => tracing::error!(error = %e, "scan job reaper failed"),
104 }
105 tokio::select! {
106 _ = tokio::time::sleep(REAPER_INTERVAL) => {}
107 res = shutdown_rx.changed() => {
108 if res.is_err() {
109 break;
110 }
111 }
112 }
113 }
114 });
115 }
116
117 /// Test/dev helper: claim and process at most one queued scan job synchronously.
118 /// Returns `Ok(true)` when a job ran, `Ok(false)` when the queue was empty.
119 /// Mirrors `spawn_pool`'s per-iteration logic without spawning a background
120 /// task, so integration tests can deterministically drain the queue between
121 /// upload-confirm and assertion.
122 pub async fn process_next_for_test(ctx: &WorkerContext) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
123 match db::scan_jobs::claim_next(&ctx.db).await? {
124 Some(job) => {
125 let job_id = job.id;
126 if let Err(e) = process_job(ctx, job).await {
127 db::scan_jobs::mark_failed(&ctx.db, job_id, &e.to_string()).await?;
128 return Err(e);
129 }
130 Ok(true)
131 }
132 None => Ok(false),
133 }
134 }
135
136 /// Run a single scan job end-to-end. On success the job is marked done; the
137 /// caller marks failed if this returns an error.
138 ///
139 /// On pipeline error (e.g. S3 download failure), reset the entity from
140 /// Scanning back to HeldForReview before bubbling the error up. Otherwise
141 /// the entity stays stuck at Scanning forever — a real regression we hit
142 /// in production with stale s3_keys.
143 #[tracing::instrument(skip_all, fields(%job_id = job.id, target_kind = %job.target_kind, %target_id = job.target_id, attempts = job.attempts))]
144 async fn process_job(ctx: &WorkerContext, job: ScanJob) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
145 let job_id = job.id;
146 let kind = job.typed_kind().ok_or_else(|| format!("unknown target_kind: {}", job.target_kind))?;
147 let file_type = job.typed_file_type().ok_or_else(|| format!("unknown file_type: {}", job.file_type))?;
148 let target_id = job.target_id;
149
150 // Mark target as Scanning while the worker is running (only entities with
151 // a scan_status column). This is a visible signal in the admin dashboard
152 // queue panel.
153 update_entity_status(&ctx.db, kind, target_id, FileScanStatus::Scanning).await.ok();
154
155 let entity_status = match run_pipeline_and_decide(ctx, &job, kind, file_type).await {
156 Ok(s) => s,
157 Err(e) => {
158 // Pipeline blew up — most often a stale s3_key. Reset entity to
159 // HeldForReview so admins see it on the dashboard and decide
160 // whether to delete the orphan record.
161 update_entity_status(&ctx.db, kind, target_id, FileScanStatus::HeldForReview).await.ok();
162 return Err(e);
163 }
164 };
165 update_entity_status(&ctx.db, kind, target_id, entity_status).await?;
166
167 db::scan_jobs::mark_done(&ctx.db, job_id).await?;
168 Ok(())
169 }
170
171 /// Run the pipeline against the S3 object and return the entity status to
172 /// apply, honoring the size guard, trust gate, and WAM ticketing.
173 async fn run_pipeline_and_decide(
174 ctx: &WorkerContext,
175 job: &ScanJob,
176 kind: ScanTargetKind,
177 file_type: FileType,
178 ) -> Result<FileScanStatus, Box<dyn std::error::Error + Send + Sync>> {
179 // Two paths, gated on file size. Small files go through the original
180 // buffered `Pipeline::scan(Vec<u8>)`: a single S3 GET into a heap
181 // buffer, then layers walk the slice. Big files (>= SCAN_MAX_MEMORY_BYTES)
182 // stream from S3 into a tempfile under SCAN_SPOOL_DIR, then layers
183 // run against the spooled path (mmap or streamed). The buffered path
184 // stays alive: it's the hot path for tip-jar avatars / small audio /
185 // download files, and avoiding the tempfile syscall + write matters at
186 // that scale. Both branches run S3 IO *outside* the scan_semaphore:
187 // the permit bounds the CPU/clamd-heavy scan phase, not network IO.
188 // Holding it across the GET serializes downloads at SCAN_MAX_CONCURRENT
189 // and lets a scan backlog starve the DB pool.
190 let result: ScanResult = if (job.file_size_bytes as usize) < constants::SCAN_MAX_MEMORY_BYTES {
191 let data = ctx.s3.download_object(&job.s3_key).await?;
192 let _permit = ctx.scan_semaphore.acquire().await?;
193 Arc::clone(&ctx.pipeline).scan(data, file_type).await
194 } else {
195 let stream = ctx.s3.download_stream(&job.s3_key).await?;
196 let spool = super::spool::download_into_tempfile(
197 std::path::Path::new(constants::SCAN_SPOOL_DIR),
198 &job.id.to_string(),
199 &job.s3_key,
200 job.file_size_bytes as u64,
201 stream,
202 )
203 .await?;
204 let _permit = ctx.scan_semaphore.acquire().await?;
205 Arc::clone(&ctx.pipeline).scan_stream(spool, file_type).await
206 };
207
208 db::scanning::insert_scan_result(&ctx.db, &job.s3_key, &result).await?;
209
210 if result.status == FileScanStatus::Quarantined {
211 let failed_layers: Vec<&str> = result.layers.iter()
212 .filter(|l| l.verdict == LayerVerdict::Fail)
213 .map(|l| l.layer)
214 .collect();
215 if let Some(ref wam) = ctx.wam {
216 let title = format!("File quarantined: {}", job.s3_key);
217 let body = format!(
218 "Upload by user {} flagged as malicious.\n\
219 Failed layers: {}\nFile type: {file_type:?}\nSize: {}",
220 job.user_id, failed_layers.join(", "), job.file_size_bytes,
221 );
222 wam.create_ticket(&title, Some(&body), "high", "malware-quarantine", Some(&job.s3_key)).await;
223 }
224
225 // CDN-served image kinds (project/gallery/content-insertion) have no
226 // scan_status column and no app-proxied download route to gate on, so a
227 // verdict has to be enforced by removing the content, not by flipping a
228 // status. Two-step enforcement:
229 //
230 // 1. Delete the DB image row. This stops the app from ever rendering
231 // the (Cloudflare-served) URL again AND makes the s3_key non-live,
232 // so the durable-deletion queue below won't park it behind the
233 // `is_s3_key_live` guard.
234 // 2. Delete the S3 object; on failure, enqueue it for durable retry
235 // (the scheduler worker retries to completion and dead-letters on
236 // exhaustion) rather than a single best-effort attempt that could
237 // leave the origin object behind forever.
238 //
239 // Residual exposure: Cloudflare caches these objects immutably for a
240 // year, so an already-edge-cached copy survives origin deletion until
241 // the cache TTL lapses. Purging the edge cache needs the Cloudflare API
242 // (token + zone), which the app doesn't currently hold — the WAM ticket
243 // above is the manual trigger for that. The verdict + failed layers stay
244 // in file_scan_results for admin review.
245 if kind.is_cdn_served_without_gate() {
246 match db::scanning::purge_cdn_image_rows_by_key(&ctx.db, &job.s3_key).await {
247 Ok(n) => tracing::warn!(
248 s3_key = %job.s3_key, target_kind = %kind.as_str(), rows_removed = n,
249 "removed quarantined CDN-served image row(s); URL is no longer rendered"
250 ),
251 Err(e) => tracing::error!(
252 s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e,
253 "FAILED to remove quarantined image row(s); the URL may still render until manual removal"
254 ),
255 }
256
257 match ctx.s3.delete_object(&job.s3_key).await {
258 Ok(()) => tracing::warn!(
259 s3_key = %job.s3_key, target_kind = %kind.as_str(),
260 "purged quarantined CDN-served image object from storage"
261 ),
262 Err(e) => {
263 tracing::error!(
264 s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e,
265 "immediate purge of quarantined image object failed; enqueuing for durable retry"
266 );
267 if let Err(enqueue_err) = db::pending_s3_deletions::enqueue_deletions(
268 &ctx.db,
269 &[(job.s3_key.clone(), "main".to_string())],
270 "malware_quarantine",
271 )
272 .await
273 {
274 tracing::error!(
275 s3_key = %job.s3_key, error = %enqueue_err,
276 "FAILED to enqueue quarantined image object for durable deletion; object may persist at origin until manual removal"
277 );
278 }
279 }
280 }
281 }
282 return Ok(FileScanStatus::Quarantined);
283 }
284
285 // Pipeline returned Clean or HeldForReview. Apply the uploader-trust
286 // overlay: untrusted users always route to admin review even on a clean
287 // scan. This preserves the pre-async semantics — the architecture changed,
288 // not the policy.
289 let is_trusted = db::users::is_upload_trusted(&ctx.db, job.user_id).await?;
290 Ok(if is_trusted { FileScanStatus::Clean } else { FileScanStatus::HeldForReview })
291 }
292
293 /// Update the per-entity `scan_status` column for the kinds that have one.
294 /// `ProjectImage` / `ContentInsertion` don't carry their own column today —
295 /// the worker still scanned the file and recorded results, but there's no
296 /// status to flip on those entities.
297 async fn update_entity_status(
298 db: &PgPool,
299 kind: ScanTargetKind,
300 target_id: Uuid,
301 status: FileScanStatus,
302 ) -> Result<(), sqlx::Error> {
303 match kind {
304 ScanTargetKind::Version => {
305 db::scanning::update_version_scan_status(db, VersionId::from(target_id), status).await
306 }
307 ScanTargetKind::Item | ScanTargetKind::ItemImage => {
308 db::scanning::update_item_scan_status(db, ItemId::from(target_id), status).await
309 }
310 ScanTargetKind::Media => {
311 db::scanning::update_media_file_scan_status(
312 db,
313 db::MediaFileId::from(target_id),
314 status,
315 )
316 .await
317 }
318 ScanTargetKind::ProjectImage
319 | ScanTargetKind::GalleryImage
320 | ScanTargetKind::ContentInsertion => Ok(()),
321 }
322 }
323
324