Skip to main content

max / makenotwork

2.3 KB · 49 lines History Blame Raw
1 -- Async scan worker queue.
2 --
3 -- Before this migration, every upload route called `scan_and_classify`
4 -- synchronously: the request handler held the S3 download + the full
5 -- in-memory scan pipeline (incl. third-party network calls to abuse.ch /
6 -- ClamAV) on its own thread, with the scan semaphore serializing concurrent
7 -- uploads. A stuck third-party endpoint could stall every upload.
8 --
9 -- Going forward, the upload route enqueues a row here and returns immediately
10 -- with `scan_status = 'pending'`. A pool of scan workers (spawned from
11 -- main.rs) drains the queue with FOR UPDATE SKIP LOCKED, runs the pipeline,
12 -- and updates the target entity's scan_status when done.
13 --
14 -- target_kind/target_id identifies which entity to update on completion:
15 -- 'version' → versions.id
16 -- 'item' → items.id (item-level uploads, image-only items)
17 -- 'project_image' → projects.id (project cover image)
18 -- 'item_image' → items.id (item cover image; same row, different field)
19 -- 'media' → media.id
20 -- 'content_insertion' → content_insertions.id
21
-- One row per enqueued scan request. Column order and constraints match the
-- original definition, so existing INSERT/SELECT statements are unaffected.
CREATE TABLE scan_jobs (
    id              UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Kind of entity this job belongs to; together with target_id it names
    -- the row whose scan_status is updated when the job completes.
    target_kind     TEXT        NOT NULL CHECK (target_kind IN (
        'version', 'item', 'project_image', 'item_image', 'media', 'content_insertion'
    )),
    -- No foreign key here: the referenced table depends on target_kind.
    target_id       UUID        NOT NULL,
    s3_key          TEXT        NOT NULL,
    file_type       TEXT        NOT NULL,
    -- Deleting a user deletes that user's scan jobs.
    user_id         UUID        NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    file_size_bytes BIGINT      NOT NULL,
    -- New rows start as 'queued'; only the four listed states are accepted.
    status          TEXT        NOT NULL DEFAULT 'queued' CHECK (status IN ('queued', 'running', 'done', 'failed')),
    attempts        INT         NOT NULL DEFAULT 0,
    enqueued_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Nullable. NOTE(review): presumably filled in by the scan worker when it
    -- claims / finishes the job -- confirm against the worker code.
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    -- Nullable. NOTE(review): presumably the most recent failure message -- confirm.
    last_error      TEXT
);

-- Supports the ON DELETE CASCADE from users: PostgreSQL does not index the
-- referencing column of a foreign key automatically, so without this index
-- every user deletion has to scan all of scan_jobs (which retains
-- completed/failed rows) to find the rows to remove.
CREATE INDEX idx_scan_jobs_user ON scan_jobs (user_id);
39
-- Serves the worker claim query, which takes the oldest queued job first.
-- Only rows with status = 'queued' are indexed, so the index stays small even
-- when most of the table consists of completed/failed jobs.
CREATE INDEX idx_scan_jobs_queued
    ON scan_jobs USING btree (enqueued_at)
    WHERE status = 'queued';
43
-- Lets the admin dashboard find stuck scans: covers only jobs currently in
-- the 'running' state, ordered by when they started.
CREATE INDEX idx_scan_jobs_running
    ON scan_jobs USING btree (started_at)
    WHERE status = 'running';
46
-- Scan-history lookups for a single entity, e.g. every scan job recorded for
-- one particular version.
CREATE INDEX idx_scan_jobs_target
    ON scan_jobs USING btree (target_kind, target_id);
49