//! Database operations for file scan results.

use chrono::{DateTime, Utc};
use sqlx::{FromRow, PgPool};
use uuid::Uuid;

use super::FileScanStatus;
use super::ItemId;
use super::UserId;
use super::VersionId;
use crate::scanning::ScanResult;

/// An item held for review, joined with creator info and latest scan layers.
#[derive(Debug, Clone, FromRow)]
pub struct HeldItemRow {
    pub item_id: ItemId,
    pub item_title: String,
    /// The item's audio key, falling back to its cover key; `None` when the
    /// item has neither.
    pub s3_key: Option<String>,
    pub creator_username: String,
    pub creator_id: UserId,
    pub upload_trusted: bool,
    /// The item's `updated_at`, reported as the held-since timestamp.
    pub held_at: DateTime<Utc>,
    /// Latest `file_scan_results.scan_layers` JSON for this entity's s3_key,
    /// or `null` if no scan has run yet. The dashboard renders this as chips.
    pub scan_layers: Option<serde_json::Value>,
}

/// A version held for review, joined with creator info and latest scan layers.
#[derive(Debug, Clone, FromRow)]
pub struct HeldVersionRow {
    pub item_id: ItemId,
    pub item_title: String,
    pub version_id: VersionId,
    pub version_number: String,
    pub s3_key: Option<String>,
    pub creator_username: String,
    pub creator_id: UserId,
    pub upload_trusted: bool,
    /// The version's `created_at`, reported as the held-since timestamp.
    pub held_at: DateTime<Utc>,
    /// Latest `file_scan_results.scan_layers` JSON for this version's s3_key,
    /// or `None` if no scan has run yet.
    pub scan_layers: Option<serde_json::Value>,
}

/// Remove every CDN-served image row referencing `s3_key`, across the three
/// image tables that have no per-row scan gate: `item_images.s3_key`,
/// `project_images.s3_key`, and `content_insertions.storage_key`. Returns the
/// number of rows removed.
///
/// On quarantine of these kinds, deleting the row IS the primary enforcement: it
/// stops the app from ever rendering the (Cloudflare-served) URL again, and —
/// critically — makes the key non-live so the durable S3-deletion queue will
/// actually purge the object instead of parking it behind the `is_s3_key_live`
/// guard. `storage_used` counters self-heal on the weekly
/// `recalculate_all_storage_used` pass; we accept a transient over-count for a
/// malicious upload rather than join through three ownership paths here.
///
/// # Errors
///
/// Returns the first `sqlx::Error` encountered. The three `DELETE`s run as
/// independent statements (no transaction), so rows removed by an earlier
/// statement stay removed if a later one fails; re-running the purge for the
/// same key is safe.
#[tracing::instrument(skip_all)]
pub async fn purge_cdn_image_rows_by_key(db: &PgPool, s3_key: &str) -> Result<u64, sqlx::Error> {
    let mut removed = 0u64;
    for sql in [
        "DELETE FROM item_images WHERE s3_key = $1",
        "DELETE FROM project_images WHERE s3_key = $1",
        "DELETE FROM content_insertions WHERE storage_key = $1",
    ] {
        removed += sqlx::query(sql).bind(s3_key).execute(db).await?.rows_affected();
    }
    Ok(removed)
}

/// Insert a scan result record for audit trail.
///
/// Stores the scan status, the per-layer results (as JSON), `sha256` and file
/// size for `s3_key`, and returns the new row's `id`.
///
/// If the layer results cannot be serialized to JSON, an empty JSON array is
/// stored instead so the audit row is still written. The failure is logged at
/// `warn` level rather than dropped silently.
#[tracing::instrument(skip_all)]
pub async fn insert_scan_result(
    db: &PgPool,
    s3_key: &str,
    result: &ScanResult,
) -> Result<Uuid, sqlx::Error> {
    let layers_json = serde_json::to_value(&result.layers).unwrap_or_else(|err| {
        tracing::warn!(
            error = %err,
            s3_key = %s3_key,
            "failed to serialize scan layers; storing an empty layer list"
        );
        serde_json::Value::Array(vec![])
    });
    let id = sqlx::query_scalar::<_, Uuid>(
        r#" INSERT INTO file_scan_results (s3_key, scan_status, scan_layers, sha256, file_size_bytes) VALUES ($1, $2, $3, $4, $5) RETURNING id "#,
    )
    .bind(s3_key)
    .bind(result.status)
    .bind(&layers_json)
    .bind(&result.sha256)
    .bind(result.file_size as i64)
    .fetch_one(db)
    .await?;
    Ok(id)
}

/// Update the scan_status column on an item.
///
/// Also bumps `updated_at` to `NOW()`. The held-items query reports that
/// column as the held-since timestamp.
#[tracing::instrument(skip_all)]
pub async fn update_item_scan_status(
    db: &PgPool,
    item_id: ItemId,
    status: FileScanStatus,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#" UPDATE items SET scan_status = $1, updated_at = NOW() WHERE id = $2 "#,
    )
    .bind(status)
    .bind(item_id)
    .execute(db)
    .await?;
    Ok(())
}

/// Update the scan_status column on a version.
///
/// Unlike the item variant, no timestamp column is touched.
#[tracing::instrument(skip_all)]
pub async fn update_version_scan_status(
    db: &PgPool,
    version_id: VersionId,
    status: FileScanStatus,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#" UPDATE versions SET scan_status = $1 WHERE id = $2 "#,
    )
    .bind(status)
    .bind(version_id)
    .execute(db)
    .await?;
    Ok(())
}

/// Update the scan_status column on a media file.
///
/// The typed `media_file_id` is converted to a raw UUID before it is bound.
#[tracing::instrument(skip_all)]
pub async fn update_media_file_scan_status(
    db: &PgPool,
    media_file_id: crate::db::MediaFileId,
    status: FileScanStatus,
) -> Result<(), sqlx::Error> {
    let raw_id: uuid::Uuid = media_file_id.into();
    sqlx::query("UPDATE media_files SET scan_status = $1 WHERE id = $2")
        .bind(status)
        .bind(raw_id)
        .execute(db)
        .await
        .map(|_outcome| ())
}

/// Fetch every item currently in `held_for_review`, oldest hold first
/// (ordered by the item's `updated_at`).
///
/// Each row carries the creator who owns the item's project. It also carries
/// the most recent `scan_layers` recorded for the item's audio key, or for its
/// cover key when there is no audio key.
#[tracing::instrument(skip_all)]
pub async fn get_held_items(db: &PgPool) -> Result<Vec<HeldItemRow>, sqlx::Error> {
    const HELD_ITEMS_SQL: &str = r#" SELECT i.id AS item_id, i.title AS item_title, COALESCE(i.audio_s3_key, i.cover_s3_key) AS s3_key, u.username AS creator_username, u.id AS creator_id, u.upload_trusted, i.updated_at AS held_at, ( SELECT fsr.scan_layers FROM file_scan_results fsr WHERE fsr.s3_key = COALESCE(i.audio_s3_key, i.cover_s3_key) ORDER BY fsr.scanned_at DESC LIMIT 1 ) AS scan_layers FROM items i JOIN projects p ON p.id = i.project_id JOIN users u ON u.id = p.user_id WHERE i.scan_status = 'held_for_review' ORDER BY i.updated_at ASC "#;

    sqlx::query_as::<_, HeldItemRow>(HELD_ITEMS_SQL)
        .fetch_all(db)
        .await
}

/// Fetch every version currently in `held_for_review`, oldest first (ordered
/// by the version's `created_at`), with creator info and the latest scan
/// layers recorded for the version's s3_key.
#[tracing::instrument(skip_all)] pub async fn get_held_versions(db: &PgPool) -> Result, sqlx::Error> { let rows = sqlx::query_as::<_, HeldVersionRow>( r#" SELECT i.id AS item_id, i.title AS item_title, v.id AS version_id, v.version_number, v.s3_key, u.username AS creator_username, u.id AS creator_id, u.upload_trusted, v.created_at AS held_at, ( SELECT fsr.scan_layers FROM file_scan_results fsr WHERE fsr.s3_key = v.s3_key ORDER BY fsr.scanned_at DESC LIMIT 1 ) AS scan_layers FROM versions v JOIN items i ON i.id = v.item_id JOIN projects p ON p.id = i.project_id JOIN users u ON u.id = p.user_id WHERE v.scan_status = 'held_for_review' ORDER BY v.created_at ASC "#, ) .fetch_all(db) .await?; Ok(rows) } /// Per-layer aggregate stats over a window for the admin dashboard. #[derive(Debug, Clone, FromRow)] pub struct LayerHealthRow { pub layer: String, pub pass_count: i64, pub skip_count: i64, pub fail_count: i64, pub error_count: i64, pub last_pass_or_skip: Option>, } /// Compute per-layer health stats over the last N hours. /// /// Reads `file_scan_results.scan_layers` JSONB and rolls up verdict counts /// per layer. `last_pass_or_skip` is the most recent timestamp at which the /// layer returned a non-error, non-fail verdict — the indicator the admin /// panel uses to flag a layer as down. 
#[tracing::instrument(skip_all)] pub async fn layer_health_window( db: &PgPool, hours: i64, ) -> Result, sqlx::Error> { sqlx::query_as::<_, LayerHealthRow>( r#" WITH expanded AS ( SELECT fsr.scanned_at, (l ->> 'layer') AS layer, (l ->> 'verdict') AS verdict FROM file_scan_results fsr, jsonb_array_elements(fsr.scan_layers) AS l WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval ) SELECT layer, COUNT(*) FILTER (WHERE verdict = 'pass') AS pass_count, COUNT(*) FILTER (WHERE verdict = 'skip') AS skip_count, COUNT(*) FILTER (WHERE verdict = 'fail') AS fail_count, COUNT(*) FILTER (WHERE verdict = 'error') AS error_count, MAX(scanned_at) FILTER (WHERE verdict IN ('pass', 'skip')) AS last_pass_or_skip FROM expanded GROUP BY layer ORDER BY layer "#, ) .bind(hours.to_string()) .fetch_all(db) .await } /// A scan-history row for the dashboard's "Recent" grid. #[derive(Debug, Clone, FromRow)] pub struct ScanHistoryRow { pub scanned_at: DateTime, pub s3_key: String, pub scan_status: String, pub sha256: Option, pub file_size_bytes: Option, pub scan_layers: serde_json::Value, } /// Aggregate counts of entities currently in non-clean states. Used by the /// PoM health endpoint to alert on growing review backlogs. #[derive(Debug, Clone)] pub struct HeldCounts { pub held_versions: i64, pub held_items: i64, pub held_media: i64, } #[tracing::instrument(skip_all)] pub async fn held_counts(db: &PgPool) -> Result { let held_versions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM versions WHERE scan_status = 'held_for_review'") .fetch_one(db).await?; let held_items: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM items WHERE scan_status = 'held_for_review'") .fetch_one(db).await?; let held_media: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM media_files WHERE scan_status = 'held_for_review'") .fetch_one(db).await?; Ok(HeldCounts { held_versions, held_items, held_media }) } /// Recent scan results across all entities. Newest first, capped at `limit`. 
/// Used by the Recent History collapsible section. `since_hours` bounds the /// window so the grid renders fast. #[tracing::instrument(skip_all)] pub async fn recent_history( db: &PgPool, since_hours: i64, limit: i64, ) -> Result, sqlx::Error> { sqlx::query_as::<_, ScanHistoryRow>( r#" SELECT fsr.scanned_at, fsr.s3_key, fsr.scan_status, fsr.sha256, fsr.file_size_bytes, fsr.scan_layers FROM file_scan_results fsr WHERE fsr.scanned_at > NOW() - ($1 || ' hours')::interval ORDER BY fsr.scanned_at DESC LIMIT $2 "#, ) .bind(since_hours.to_string()) .bind(limit) .fetch_all(db) .await }