Skip to main content

max / makenotwork

11.9 KB · 371 lines History Blame Raw
//! Async scan-job queue for the malware pipeline.
//!
//! Upload routes call [`enqueue`] to register a scan job and then return their
//! response to the client without waiting for the scan to run. A pool of scan
//! workers (`crate::scanning::worker`) drains the queue with
//! `FOR UPDATE SKIP LOCKED` via [`claim_next`], runs the pipeline against the
//! S3 object, and finalizes the job with [`mark_done`] or [`mark_failed`].
//!
//! See `docs/scan-pipeline-audit.md` for the wider architecture.
10
11 use chrono::{DateTime, Utc};
12 use sqlx::{FromRow, PgPool};
13 use uuid::Uuid;
14
15 use crate::storage::FileType;
16
17 use super::UserId;
18
/// The kind of entity a scan job points at; the worker uses it to decide what
/// to update when the scan completes.
///
/// `Item`, `Version`, and `Media` have `scan_status` columns that the worker
/// updates. `ProjectImage`, `GalleryImage`, and `ContentInsertion` do not: the
/// worker still scans them (recording results in `file_scan_results`), and on
/// a `Quarantined` verdict it creates a WAM ticket for admin follow-up and
/// purges the object (see [`ScanTargetKind::is_cdn_served_without_gate`]).
///
/// NOTE(review): `ItemImage` is not described by either statement above —
/// confirm whether it carries a `scan_status` column or is gated some other
/// way.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanTargetKind {
    Item,
    Version,
    Media,
    ProjectImage,
    ItemImage,
    GalleryImage,
    ContentInsertion,
}

impl ScanTargetKind {
    /// Stable string tag for this kind, as stored in `scan_jobs.target_kind`.
    ///
    /// The match is exhaustive, so adding a variant fails to compile until a
    /// tag is chosen for it. Remember to add the same tag to [`Self::from_str`].
    pub fn as_str(&self) -> &'static str {
        match self {
            ScanTargetKind::Item => "item",
            ScanTargetKind::Version => "version",
            ScanTargetKind::Media => "media",
            ScanTargetKind::ProjectImage => "project_image",
            ScanTargetKind::ItemImage => "item_image",
            ScanTargetKind::GalleryImage => "gallery_image",
            ScanTargetKind::ContentInsertion => "content_insertion",
        }
    }

    /// Inverse of [`Self::as_str`]: parse a stored tag back into a kind.
    ///
    /// Matching is exact and case-sensitive. Returns `None` for any string
    /// that is not one of the tags produced by `as_str`.
    // Kept as an inherent, `Option`-returning method because existing callers
    // rely on that shape; it intentionally shares its name with
    // `std::str::FromStr::from_str`.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        let kind = match s {
            "item" => ScanTargetKind::Item,
            "version" => ScanTargetKind::Version,
            "media" => ScanTargetKind::Media,
            "project_image" => ScanTargetKind::ProjectImage,
            "item_image" => ScanTargetKind::ItemImage,
            "gallery_image" => ScanTargetKind::GalleryImage,
            "content_insertion" => ScanTargetKind::ContentInsertion,
            _ => return None,
        };
        Some(kind)
    }

    /// Whether this kind is served directly from the CDN by `s3_key` with no
    /// app-proxied download route that can consult a `scan_status` column.
    ///
    /// `Item`/`Version`/`Media` are gated at their download handlers (the route
    /// refuses to issue a non-`Clean` object). `ProjectImage`, `GalleryImage`,
    /// and `ContentInsertion` carry no `scan_status` column and are rendered
    /// straight from `cdn.makenot.work/{key}`, so the ONLY enforcement point
    /// for a `Quarantined` verdict is removing the object from storage —
    /// otherwise the malicious file keeps serving from the CDN until an admin
    /// manually acts on the WAM ticket. The worker purges the object for these
    /// kinds on quarantine.
    ///
    /// NOTE(review): `ItemImage` currently answers `false`. If item images are
    /// also rendered straight from the CDN without a `scan_status` gate, it
    /// belongs in the `true` arm — confirm against the serving path.
    pub fn is_cdn_served_without_gate(&self) -> bool {
        // Exhaustive on purpose: a newly added variant must force an explicit
        // decision here instead of silently defaulting to "gated".
        match self {
            ScanTargetKind::ProjectImage
            | ScanTargetKind::GalleryImage
            | ScanTargetKind::ContentInsertion => true,
            ScanTargetKind::Item
            | ScanTargetKind::Version
            | ScanTargetKind::Media
            | ScanTargetKind::ItemImage => false,
        }
    }
}
80
81 /// A queued or running scan job, as claimed by a worker. Most fields are
82 /// populated via `FromRow` from sqlx; fields not consumed by the worker
83 /// today are kept for the admin dashboard (Phase 2 of the audit).
84 #[allow(dead_code)]
85 #[derive(Debug, Clone, FromRow)]
86 pub struct ScanJob {
87 pub id: Uuid,
88 pub target_kind: String,
89 pub target_id: Uuid,
90 pub s3_key: String,
91 pub file_type: String,
92 pub user_id: UserId,
93 pub file_size_bytes: i64,
94 pub status: String,
95 pub attempts: i32,
96 pub enqueued_at: DateTime<Utc>,
97 pub started_at: Option<DateTime<Utc>>,
98 pub completed_at: Option<DateTime<Utc>>,
99 pub last_error: Option<String>,
100 }
101
102 impl ScanJob {
103 pub fn typed_kind(&self) -> Option<ScanTargetKind> {
104 ScanTargetKind::from_str(&self.target_kind)
105 }
106
107 pub fn typed_file_type(&self) -> Option<FileType> {
108 self.file_type.parse().ok()
109 }
110 }
111
112 /// Enqueue a scan job. Returns the job id.
113 #[tracing::instrument(skip_all, fields(target_kind = target_kind.as_str(), %target_id, s3_key))]
114 pub async fn enqueue(
115 db: &PgPool,
116 target_kind: ScanTargetKind,
117 target_id: Uuid,
118 s3_key: &str,
119 file_type: FileType,
120 user_id: UserId,
121 file_size_bytes: i64,
122 ) -> Result<Uuid, sqlx::Error> {
123 let id = sqlx::query_scalar::<_, Uuid>(
124 r#"
125 INSERT INTO scan_jobs
126 (target_kind, target_id, s3_key, file_type, user_id, file_size_bytes)
127 VALUES ($1, $2, $3, $4, $5, $6)
128 RETURNING id
129 "#,
130 )
131 .bind(target_kind.as_str())
132 .bind(target_id)
133 .bind(s3_key)
134 .bind(file_type.as_str())
135 .bind(user_id)
136 .bind(file_size_bytes)
137 .fetch_one(db)
138 .await?;
139
140 Ok(id)
141 }
142
143 /// Atomically claim the next queued scan job for processing.
144 ///
145 /// Uses `FOR UPDATE SKIP LOCKED` so multiple workers can drain the queue in
146 /// parallel without contention. Sets `status='running'`, increments `attempts`,
147 /// and stamps `started_at`. Returns `Ok(None)` if the queue is empty.
148 #[tracing::instrument(skip_all)]
149 pub async fn claim_next(db: &PgPool) -> Result<Option<ScanJob>, sqlx::Error> {
150 let job = sqlx::query_as::<_, ScanJob>(
151 r#"
152 WITH next AS (
153 SELECT id FROM scan_jobs
154 WHERE status = 'queued'
155 ORDER BY enqueued_at ASC
156 FOR UPDATE SKIP LOCKED
157 LIMIT 1
158 )
159 UPDATE scan_jobs
160 SET status = 'running',
161 attempts = attempts + 1,
162 started_at = NOW()
163 WHERE id = (SELECT id FROM next)
164 RETURNING *
165 "#,
166 )
167 .fetch_optional(db)
168 .await?;
169
170 Ok(job)
171 }
172
173 /// Mark a job as completed successfully.
174 #[tracing::instrument(skip_all, fields(%job_id))]
175 pub async fn mark_done(db: &PgPool, job_id: Uuid) -> Result<(), sqlx::Error> {
176 sqlx::query(
177 r#"
178 UPDATE scan_jobs
179 SET status = 'done', completed_at = NOW(), last_error = NULL
180 WHERE id = $1
181 "#,
182 )
183 .bind(job_id)
184 .execute(db)
185 .await?;
186 Ok(())
187 }
188
189 /// Mark a job as failed (worker exception, S3 fetch failure, etc.).
190 ///
191 /// Records `last_error` for admin inspection. The job stays in `failed`
192 /// status; an admin or operator can manually re-enqueue via the dashboard
193 /// (Phase 2) by inserting a fresh row.
194 #[tracing::instrument(skip_all, fields(%job_id))]
195 pub async fn mark_failed(db: &PgPool, job_id: Uuid, err: &str) -> Result<(), sqlx::Error> {
196 sqlx::query(
197 r#"
198 UPDATE scan_jobs
199 SET status = 'failed', completed_at = NOW(), last_error = $1
200 WHERE id = $2
201 "#,
202 )
203 .bind(err)
204 .bind(job_id)
205 .execute(db)
206 .await?;
207 Ok(())
208 }
209
210 /// Reset jobs that have been stuck in `running` longer than `max_age_secs`.
211 ///
212 /// Run on worker startup to recover from a previous-process crash mid-scan:
213 /// the row would otherwise stay `running` forever and never be re-claimed.
214 /// Increments `attempts` so a perpetually-crashing scan eventually trips the
215 /// retry budget.
216 #[tracing::instrument(skip_all)]
217 pub async fn reap_stuck(db: &PgPool, max_age_secs: i64) -> Result<u64, sqlx::Error> {
218 let affected = sqlx::query(
219 r#"
220 UPDATE scan_jobs
221 SET status = 'queued', started_at = NULL
222 WHERE status = 'running'
223 AND started_at < NOW() - ($1 || ' seconds')::interval
224 "#,
225 )
226 .bind(max_age_secs.to_string())
227 .execute(db)
228 .await?
229 .rows_affected();
230 Ok(affected)
231 }
232
233 /// Delete terminal-state rows older than `older_than`. Returns the count.
234 ///
235 /// Only touches `done`/`failed` rows — operational state (`queued`,
236 /// `running`) is owned by the worker loop and `reap_stuck`. The verdict
237 /// (Clean / Quarantined / HeldForReview) lives on the entity's
238 /// `scan_status` column, not here, so dropping a `done` row loses queue
239 /// history only, not malware-detection state.
240 ///
241 /// No supporting index today: at soft-launch volume Postgres seq-scans the
242 /// table fine. Revisit once `EXPLAIN ANALYZE` shows it as a bottleneck.
243 #[tracing::instrument(skip_all)]
244 pub async fn purge_old_terminal(
245 db: &PgPool,
246 older_than: chrono::Duration,
247 ) -> Result<u64, sqlx::Error> {
248 let cutoff = chrono::Utc::now() - older_than;
249 let n = sqlx::query(
250 r#"
251 DELETE FROM scan_jobs
252 WHERE status IN ('done', 'failed')
253 AND COALESCE(completed_at, started_at, enqueued_at) < $1
254 "#,
255 )
256 .bind(cutoff)
257 .execute(db)
258 .await?
259 .rows_affected();
260 Ok(n)
261 }
262
263 /// Count of currently-queued jobs. Used by the admin dashboard health panel
264 /// (Phase 2 of the audit). Allowed dead code until that route lands.
265 #[allow(dead_code)]
266 pub async fn queued_count(db: &PgPool) -> Result<i64, sqlx::Error> {
267 sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'queued'")
268 .fetch_one(db)
269 .await
270 }
271
272 /// Count of currently-running jobs. Phase 2 dashboard consumer.
273 #[allow(dead_code)]
274 pub async fn running_count(db: &PgPool) -> Result<i64, sqlx::Error> {
275 sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM scan_jobs WHERE status = 'running'")
276 .fetch_one(db)
277 .await
278 }
279
280 /// Count of running jobs that have been in flight longer than `max_age_secs`.
281 /// Used by PoM to alert on stuck workers.
282 pub async fn stuck_count(db: &PgPool, max_age_secs: i64) -> Result<i64, sqlx::Error> {
283 sqlx::query_scalar::<_, i64>(
284 "SELECT COUNT(*) FROM scan_jobs WHERE status = 'running' AND started_at < NOW() - ($1 || ' seconds')::interval",
285 )
286 .bind(max_age_secs.to_string())
287 .fetch_one(db)
288 .await
289 }
290
291 /// A held version with enough context to re-enqueue it for scanning.
292 #[allow(dead_code)]
293 #[derive(Debug, Clone, FromRow)]
294 pub struct RescanCandidateVersion {
295 pub version_id: Uuid,
296 pub s3_key: String,
297 pub file_size_bytes: i64,
298 pub user_id: UserId,
299 }
300
301 /// A held item (audio or cover) with re-enqueue context.
302 #[allow(dead_code)]
303 #[derive(Debug, Clone, FromRow)]
304 pub struct RescanCandidateItem {
305 pub item_id: Uuid,
306 pub s3_key: String,
307 pub file_size_bytes: i64,
308 pub user_id: UserId,
309 /// Audio / cover, selected by the query.
310 pub file_type: String,
311 }
312
313 /// Find currently-held versions with enough context to be re-scanned.
314 pub async fn rescan_candidates_versions(db: &PgPool) -> Result<Vec<RescanCandidateVersion>, sqlx::Error> {
315 sqlx::query_as::<_, RescanCandidateVersion>(
316 r#"
317 SELECT v.id AS version_id,
318 v.s3_key,
319 COALESCE(v.file_size_bytes, 0) AS file_size_bytes,
320 p.user_id
321 FROM versions v
322 JOIN items i ON i.id = v.item_id
323 JOIN projects p ON p.id = i.project_id
324 WHERE v.scan_status = 'held_for_review'
325 AND v.s3_key IS NOT NULL
326 "#,
327 )
328 .fetch_all(db)
329 .await
330 }
331
332 /// Find currently-held items (audio or cover) with re-enqueue context.
333 pub async fn rescan_candidates_items(db: &PgPool) -> Result<Vec<RescanCandidateItem>, sqlx::Error> {
334 sqlx::query_as::<_, RescanCandidateItem>(
335 r#"
336 SELECT i.id AS item_id,
337 COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key,
338 COALESCE(i.audio_file_size_bytes, i.cover_file_size_bytes, 0) AS file_size_bytes,
339 p.user_id,
340 CASE WHEN i.audio_s3_key IS NOT NULL THEN 'audio' ELSE 'cover' END AS file_type
341 FROM items i
342 JOIN projects p ON p.id = i.project_id
343 WHERE i.scan_status = 'held_for_review'
344 AND COALESCE(i.audio_s3_key, i.cover_s3_key) IS NOT NULL
345 "#,
346 )
347 .fetch_all(db)
348 .await
349 }
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant must survive `as_str` -> `from_str`, and an unknown tag
    /// must be rejected.
    #[test]
    fn target_kind_round_trip() {
        let every_kind = [
            ScanTargetKind::Item,
            ScanTargetKind::Version,
            ScanTargetKind::Media,
            ScanTargetKind::ProjectImage,
            ScanTargetKind::ItemImage,
            ScanTargetKind::GalleryImage,
            ScanTargetKind::ContentInsertion,
        ];

        every_kind.iter().for_each(|&kind| {
            assert_eq!(ScanTargetKind::from_str(kind.as_str()), Some(kind));
        });

        assert_eq!(ScanTargetKind::from_str("bogus"), None);
    }
}
371