Skip to main content

max / makenotwork

10.1 KB · 313 lines History Blame Raw
1 //! Database operations for file scan results.
2
3 use chrono::{DateTime, Utc};
4 use sqlx::{FromRow, PgPool};
5 use uuid::Uuid;
6
7 use super::FileScanStatus;
8 use super::ItemId;
9 use super::UserId;
10 use super::VersionId;
11 use crate::scanning::ScanResult;
12
13 /// An item held for review, joined with creator info and latest scan layers.
14 #[derive(Debug, Clone, FromRow)]
15 pub struct HeldItemRow {
16 pub item_id: ItemId,
17 pub item_title: String,
18 pub s3_key: Option<String>,
19 pub creator_username: String,
20 pub creator_id: UserId,
21 pub upload_trusted: bool,
22 pub held_at: DateTime<Utc>,
23 /// Latest `file_scan_results.scan_layers` JSON for this entity's s3_key,
24 /// or `null` if no scan has run yet. The dashboard renders this as chips.
25 pub scan_layers: Option<serde_json::Value>,
26 }
27
28 /// A version held for review, joined with creator info and latest scan layers.
29 #[derive(Debug, Clone, FromRow)]
30 pub struct HeldVersionRow {
31 pub item_id: ItemId,
32 pub item_title: String,
33 pub version_id: VersionId,
34 pub version_number: String,
35 pub s3_key: Option<String>,
36 pub creator_username: String,
37 pub creator_id: UserId,
38 pub upload_trusted: bool,
39 pub held_at: DateTime<Utc>,
40 pub scan_layers: Option<serde_json::Value>,
41 }
42
43 /// Insert a scan result record for audit trail.
44 #[tracing::instrument(skip_all)]
45 /// Remove every CDN-served image row referencing `s3_key`, across the three
46 /// image tables that have no per-row scan gate: `item_images.s3_key`,
47 /// `project_images.s3_key`, and `content_insertions.storage_key`. Returns the
48 /// number of rows removed.
49 ///
50 /// On quarantine of these kinds, deleting the row IS the primary enforcement: it
51 /// stops the app from ever rendering the (Cloudflare-served) URL again, and —
52 /// critically — makes the key non-live so the durable S3-deletion queue will
53 /// actually purge the object instead of parking it behind the `is_s3_key_live`
54 /// guard. `storage_used` counters self-heal on the weekly
55 /// `recalculate_all_storage_used` pass; we accept a transient over-count for a
56 /// malicious upload rather than join through three ownership paths here.
57 #[tracing::instrument(skip_all)]
58 pub async fn purge_cdn_image_rows_by_key(db: &PgPool, s3_key: &str) -> Result<u64, sqlx::Error> {
59 let mut removed = 0u64;
60 for sql in [
61 "DELETE FROM item_images WHERE s3_key = $1",
62 "DELETE FROM project_images WHERE s3_key = $1",
63 "DELETE FROM content_insertions WHERE storage_key = $1",
64 ] {
65 removed += sqlx::query(sql).bind(s3_key).execute(db).await?.rows_affected();
66 }
67 Ok(removed)
68 }
69
70 pub async fn insert_scan_result(
71 db: &PgPool,
72 s3_key: &str,
73 result: &ScanResult,
74 ) -> Result<Uuid, sqlx::Error> {
75 let layers_json = serde_json::to_value(&result.layers)
76 .unwrap_or_else(|_| serde_json::Value::Array(vec![]));
77
78 let id = sqlx::query_scalar::<_, Uuid>(
79 r#"
80 INSERT INTO file_scan_results (s3_key, scan_status, scan_layers, sha256, file_size_bytes)
81 VALUES ($1, $2, $3, $4, $5)
82 RETURNING id
83 "#,
84 )
85 .bind(s3_key)
86 .bind(result.status)
87 .bind(&layers_json)
88 .bind(&result.sha256)
89 .bind(result.file_size as i64)
90 .fetch_one(db)
91 .await?;
92
93 Ok(id)
94 }
95
96 /// Update the scan_status column on an item.
97 #[tracing::instrument(skip_all)]
98 pub async fn update_item_scan_status(
99 db: &PgPool,
100 item_id: ItemId,
101 status: FileScanStatus,
102 ) -> Result<(), sqlx::Error> {
103 sqlx::query(
104 r#"
105 UPDATE items SET scan_status = $1, updated_at = NOW() WHERE id = $2
106 "#,
107 )
108 .bind(status)
109 .bind(item_id)
110 .execute(db)
111 .await?;
112
113 Ok(())
114 }
115
116 /// Update the scan_status column on a version.
117 #[tracing::instrument(skip_all)]
118 pub async fn update_version_scan_status(
119 db: &PgPool,
120 version_id: VersionId,
121 status: FileScanStatus,
122 ) -> Result<(), sqlx::Error> {
123 sqlx::query(
124 r#"
125 UPDATE versions SET scan_status = $1 WHERE id = $2
126 "#,
127 )
128 .bind(status)
129 .bind(version_id)
130 .execute(db)
131 .await?;
132
133 Ok(())
134 }
135
136 /// Update the scan_status column on a media file.
137 #[tracing::instrument(skip_all)]
138 pub async fn update_media_file_scan_status(
139 db: &PgPool,
140 media_file_id: crate::db::MediaFileId,
141 status: FileScanStatus,
142 ) -> Result<(), sqlx::Error> {
143 let id: uuid::Uuid = media_file_id.into();
144 sqlx::query("UPDATE media_files SET scan_status = $1 WHERE id = $2")
145 .bind(status)
146 .bind(id)
147 .execute(db)
148 .await?;
149 Ok(())
150 }
151
152 /// Get items held for review, joined with creator info + latest scan layers.
153 /// Oldest first.
154 #[tracing::instrument(skip_all)]
155 pub async fn get_held_items(db: &PgPool) -> Result<Vec<HeldItemRow>, sqlx::Error> {
156 let rows = sqlx::query_as::<_, HeldItemRow>(
157 r#"
158 SELECT i.id AS item_id, i.title AS item_title,
159 COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key,
160 u.username AS creator_username, u.id AS creator_id,
161 u.upload_trusted, i.updated_at AS held_at,
162 (
163 SELECT fsr.scan_layers FROM file_scan_results fsr
164 WHERE fsr.s3_key = COALESCE(i.audio_s3_key, i.cover_s3_key)
165 ORDER BY fsr.scanned_at DESC LIMIT 1
166 ) AS scan_layers
167 FROM items i
168 JOIN projects p ON p.id = i.project_id
169 JOIN users u ON u.id = p.user_id
170 WHERE i.scan_status = 'held_for_review'
171 ORDER BY i.updated_at ASC
172 "#,
173 )
174 .fetch_all(db)
175 .await?;
176
177 Ok(rows)
178 }
179
180 /// Get versions held for review, joined with creator info + latest scan layers.
181 /// Oldest first.
182 #[tracing::instrument(skip_all)]
183 pub async fn get_held_versions(db: &PgPool) -> Result<Vec<HeldVersionRow>, sqlx::Error> {
184 let rows = sqlx::query_as::<_, HeldVersionRow>(
185 r#"
186 SELECT i.id AS item_id, i.title AS item_title,
187 v.id AS version_id, v.version_number,
188 v.s3_key,
189 u.username AS creator_username, u.id AS creator_id,
190 u.upload_trusted, v.created_at AS held_at,
191 (
192 SELECT fsr.scan_layers FROM file_scan_results fsr
193 WHERE fsr.s3_key = v.s3_key
194 ORDER BY fsr.scanned_at DESC LIMIT 1
195 ) AS scan_layers
196 FROM versions v
197 JOIN items i ON i.id = v.item_id
198 JOIN projects p ON p.id = i.project_id
199 JOIN users u ON u.id = p.user_id
200 WHERE v.scan_status = 'held_for_review'
201 ORDER BY v.created_at ASC
202 "#,
203 )
204 .fetch_all(db)
205 .await?;
206
207 Ok(rows)
208 }
209
210 /// Per-layer aggregate stats over a window for the admin dashboard.
211 #[derive(Debug, Clone, FromRow)]
212 pub struct LayerHealthRow {
213 pub layer: String,
214 pub pass_count: i64,
215 pub skip_count: i64,
216 pub fail_count: i64,
217 pub error_count: i64,
218 pub last_pass_or_skip: Option<DateTime<Utc>>,
219 }
220
221 /// Compute per-layer health stats over the last N hours.
222 ///
223 /// Reads `file_scan_results.scan_layers` JSONB and rolls up verdict counts
224 /// per layer. `last_pass_or_skip` is the most recent timestamp at which the
225 /// layer returned a non-error, non-fail verdict — the indicator the admin
226 /// panel uses to flag a layer as down.
227 #[tracing::instrument(skip_all)]
228 pub async fn layer_health_window(
229 db: &PgPool,
230 hours: i64,
231 ) -> Result<Vec<LayerHealthRow>, sqlx::Error> {
232 sqlx::query_as::<_, LayerHealthRow>(
233 r#"
234 WITH expanded AS (
235 SELECT fsr.scanned_at,
236 (l ->> 'layer') AS layer,
237 (l ->> 'verdict') AS verdict
238 FROM file_scan_results fsr,
239 jsonb_array_elements(fsr.scan_layers) AS l
240 WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval
241 )
242 SELECT layer,
243 COUNT(*) FILTER (WHERE verdict = 'pass') AS pass_count,
244 COUNT(*) FILTER (WHERE verdict = 'skip') AS skip_count,
245 COUNT(*) FILTER (WHERE verdict = 'fail') AS fail_count,
246 COUNT(*) FILTER (WHERE verdict = 'error') AS error_count,
247 MAX(scanned_at) FILTER (WHERE verdict IN ('pass', 'skip')) AS last_pass_or_skip
248 FROM expanded
249 GROUP BY layer
250 ORDER BY layer
251 "#,
252 )
253 .bind(hours.to_string())
254 .fetch_all(db)
255 .await
256 }
257
258 /// A scan-history row for the dashboard's "Recent" grid.
259 #[derive(Debug, Clone, FromRow)]
260 pub struct ScanHistoryRow {
261 pub scanned_at: DateTime<Utc>,
262 pub s3_key: String,
263 pub scan_status: String,
264 pub sha256: Option<String>,
265 pub file_size_bytes: Option<i64>,
266 pub scan_layers: serde_json::Value,
267 }
268
/// How many entities are currently held for review, per table. Used by the
/// PoM health endpoint to alert on growing review backlogs.
#[derive(Debug, Clone)]
pub struct HeldCounts {
    /// Rows in `versions` with `scan_status = 'held_for_review'`.
    pub held_versions: i64,
    /// Rows in `items` with `scan_status = 'held_for_review'`.
    pub held_items: i64,
    /// Rows in `media_files` with `scan_status = 'held_for_review'`.
    pub held_media: i64,
}
277
278 #[tracing::instrument(skip_all)]
279 pub async fn held_counts(db: &PgPool) -> Result<HeldCounts, sqlx::Error> {
280 let held_versions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM versions WHERE scan_status = 'held_for_review'")
281 .fetch_one(db).await?;
282 let held_items: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM items WHERE scan_status = 'held_for_review'")
283 .fetch_one(db).await?;
284 let held_media: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM media_files WHERE scan_status = 'held_for_review'")
285 .fetch_one(db).await?;
286 Ok(HeldCounts { held_versions, held_items, held_media })
287 }
288
289 /// Recent scan results across all entities. Newest first, capped at `limit`.
290 /// Used by the Recent History collapsible section. `since_hours` bounds the
291 /// window so the grid renders fast.
292 #[tracing::instrument(skip_all)]
293 pub async fn recent_history(
294 db: &PgPool,
295 since_hours: i64,
296 limit: i64,
297 ) -> Result<Vec<ScanHistoryRow>, sqlx::Error> {
298 sqlx::query_as::<_, ScanHistoryRow>(
299 r#"
300 SELECT fsr.scanned_at, fsr.s3_key, fsr.scan_status,
301 fsr.sha256, fsr.file_size_bytes, fsr.scan_layers
302 FROM file_scan_results fsr
303 WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval
304 ORDER BY fsr.scanned_at DESC
305 LIMIT $2
306 "#,
307 )
308 .bind(since_hours.to_string())
309 .bind(limit)
310 .fetch_all(db)
311 .await
312 }
313