//! Admin upload review queue + scan-pipeline dashboard. //! //! Backs the routes registered under `/admin/uploads` and `/api/admin/uploads`. //! See `docs/scan-pipeline-audit.md` § 5 for the layout and behavior spec. use std::collections::HashMap; use axum::{ extract::{Path, State}, response::{IntoResponse, Response}, }; use crate::{ auth::AdminUser, db::{self, scan_admin_actions::AdminAction, FileScanStatus, ItemId, UserId, VersionId}, error::Result, helpers::get_csrf_token, storage::FileType, templates::*, types::*, AppState, }; const HEALTH_WINDOW_HOURS: i64 = 24; const STALE_LAYER_THRESHOLD_HOURS: i64 = 1; const HISTORY_WINDOW_HOURS: i64 = 24 * 7; const HISTORY_ROW_LIMIT: i64 = 100; const AUDIT_LOG_ROW_LIMIT: i64 = 500; /// Build the per-layer health-card vec from the DB rollup. async fn fetch_layer_health(state: &AppState) -> Result> { let rows = db::scanning::layer_health_window(&state.db, HEALTH_WINDOW_HOURS).await?; let mut by_layer: HashMap = HashMap::new(); // The canonical layer list — keep cards visible even if a layer hasn't // run at all in the window. The dashboard depends on seeing "clamav: ✗ // not running" rather than the layer just being absent. for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] { by_layer.insert(name.to_string(), LayerHealthCard { layer: name.to_string(), total: 0, success_rate_pct: 0, error_rate_pct: 0, fail_count: 0, status_badge: "down", last_seen: "never".to_string(), }); } for row in rows { let total = row.pass_count + row.skip_count + row.fail_count + row.error_count; let success = row.pass_count + row.skip_count; let success_rate = if total > 0 { (100 * success / total) as i32 } else { 0 }; let error_rate = if total > 0 { (100 * row.error_count / total) as i32 } else { 0 }; let stale_cutoff = chrono::Utc::now() - chrono::Duration::hours(STALE_LAYER_THRESHOLD_HOURS); let recent = row.last_pass_or_skip.is_some_and(|t| t > stale_cutoff); let status_badge = if total == 0 || (!recent && row.error_count > 0) { "down" } else if error_rate > 10 { "degraded" } else { "ok" }; let last_seen = match row.last_pass_or_skip { Some(t) => relative_age(t), None => "never".to_string(), }; by_layer.insert(row.layer.clone(), LayerHealthCard { layer: row.layer, total, success_rate_pct: success_rate, error_rate_pct: error_rate, fail_count: row.fail_count, status_badge, last_seen, }); } // Stable ordering matches the pipeline's actual execution order. let mut out: Vec = Vec::with_capacity(11); for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] { if let Some(card) = by_layer.remove(name) { out.push(card); } } Ok(out) } fn relative_age(t: chrono::DateTime) -> String { let now = chrono::Utc::now(); let delta = now - t; if delta.num_seconds() < 60 { format!("{}s ago", delta.num_seconds().max(0)) } else if delta.num_minutes() < 60 { format!("{}m ago", delta.num_minutes()) } else if delta.num_hours() < 24 { format!("{}h ago", delta.num_hours()) } else { format!("{}d ago", delta.num_days()) } } /// Render the admin upload review queue. #[tracing::instrument(skip_all, name = "admin::admin_uploads")] pub(super) async fn admin_uploads( State(state): State, session: tower_sessions::Session, AdminUser(user): AdminUser, ) -> Result { let csrf_token = get_csrf_token(&session).await; let held_items = db::scanning::get_held_items(&state.db).await?; let held_versions = db::scanning::get_held_versions(&state.db).await?; let mut held_uploads: Vec = Vec::new(); held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item)); held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version)); held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at)); // Attach the last admin action to each held row in a single batch lookup. attach_last_actions(&state, &mut held_uploads).await; let total_held = held_uploads.len(); let layer_health = fetch_layer_health(&state).await?; let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0); let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0); let recent_history: Vec = db::scanning::recent_history( &state.db, HISTORY_WINDOW_HOURS, HISTORY_ROW_LIMIT, ).await .unwrap_or_default() .iter() .map(ScanHistoryDisplay::from_row) .collect(); let history_total = recent_history.len(); Ok(AdminUploadsTemplate { csrf_token, session_user: Some(user), held_uploads, total_held, admin_active_page: "uploads", layer_health, queue_pending, queue_running, recent_history, history_total, }) } /// Batch-fetch the latest admin action for the displayed held rows and /// attach each as `last_action` on its row. async fn attach_last_actions(state: &AppState, rows: &mut [AdminHeldUploadRow]) { use std::str::FromStr; let mut version_ids: Vec = Vec::new(); let mut item_ids: Vec = Vec::new(); for r in rows.iter() { if let Some(ref vid) = r.version_id && let Ok(uuid) = uuid::Uuid::from_str(vid) { version_ids.push(uuid); } else if let Ok(uuid) = uuid::Uuid::from_str(&r.item_id) { item_ids.push(uuid); } } let version_actions = db::scan_admin_actions::latest_per_version(&state.db, &version_ids) .await.unwrap_or_default(); let item_actions = db::scan_admin_actions::latest_per_item(&state.db, &item_ids) .await.unwrap_or_default(); for r in rows.iter_mut() { let summary = if let Some(ref vid) = r.version_id { uuid::Uuid::from_str(vid).ok().and_then(|u| version_actions.get(&u)) } else { uuid::Uuid::from_str(&r.item_id).ok().and_then(|u| item_actions.get(&u)) }; if let Some(s) = summary { r.last_action = Some(LastAction { action: s.action.clone(), admin_username: s.admin_username.clone(), when: relative_age(s.created_at), }); } } } #[derive(serde::Deserialize, Default)] pub(super) struct AuditFilters { #[serde(default)] pub action: Option, #[serde(default)] pub admin: Option, #[serde(default)] pub since_days: Option, } /// Full audit-log page with query-string filters. /// /// `?action=promote&admin=max&since_days=30` is the canonical example. All /// filters are optional; empty/absent means no constraint on that column. #[tracing::instrument(skip_all, name = "admin::scan_audit")] pub(super) async fn admin_scan_audit( State(state): State, session: tower_sessions::Session, AdminUser(user): AdminUser, axum::extract::Query(filters): axum::extract::Query, ) -> Result { let csrf_token = get_csrf_token(&session).await; let action = filters.action.as_deref().filter(|s| !s.is_empty()); let admin = filters.admin.as_deref().filter(|s| !s.is_empty()); let entries: Vec = db::scan_admin_actions::list_filtered( &state.db, action, admin, filters.since_days, AUDIT_LOG_ROW_LIMIT, ).await .unwrap_or_default() .iter() .map(AdminAuditLogRow::from_db) .collect(); Ok(AdminScanAuditTemplate { csrf_token, session_user: Some(user), admin_active_page: "uploads", entries, filter_action: filters.action.unwrap_or_default(), filter_admin: filters.admin.unwrap_or_default(), filter_since_days: filters.since_days.map(|d| d.to_string()).unwrap_or_default(), }) } /// Re-query held uploads and return the entries partial. pub(super) async fn refresh_held_uploads_partial(state: &AppState) -> Result { let held_items = db::scanning::get_held_items(&state.db).await?; let held_versions = db::scanning::get_held_versions(&state.db).await?; let mut held_uploads: Vec = Vec::new(); held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item)); held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version)); held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at)); Ok(AdminUploadEntriesTemplate { held_uploads }.into_response()) } // ─── Per-row actions ───────────────────────────────────────────────────────── /// Promote a held item upload to Clean. Renames the legacy "approve" verb. #[tracing::instrument(skip_all, name = "admin::promote_item")] pub(super) async fn admin_promote_item( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Clean).await?; db::scan_admin_actions::log_item( &state.db, id, admin.id, AdminAction::Promote, Some("held_for_review"), Some("clean"), None, ).await.ok(); tracing::info!(item_id = %id, admin_id = %admin.id, "item promoted to clean"); refresh_held_uploads_partial(&state).await } /// Quarantine a held item upload. Renames the legacy "reject" verb. #[tracing::instrument(skip_all, name = "admin::quarantine_item")] pub(super) async fn admin_quarantine_item( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Quarantined).await?; db::scan_admin_actions::log_item( &state.db, id, admin.id, AdminAction::Quarantine, Some("held_for_review"), Some("quarantined"), None, ).await.ok(); tracing::info!(item_id = %id, admin_id = %admin.id, "item quarantined"); refresh_held_uploads_partial(&state).await } /// Promote a held version upload to Clean. #[tracing::instrument(skip_all, name = "admin::promote_version")] pub(super) async fn admin_promote_version( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Clean).await?; db::scan_admin_actions::log_version( &state.db, id, admin.id, AdminAction::Promote, Some("held_for_review"), Some("clean"), None, ).await.ok(); tracing::info!(version_id = %id, admin_id = %admin.id, "version promoted to clean"); refresh_held_uploads_partial(&state).await } /// Quarantine a held version upload. #[tracing::instrument(skip_all, name = "admin::quarantine_version")] pub(super) async fn admin_quarantine_version( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Quarantined).await?; db::scan_admin_actions::log_version( &state.db, id, admin.id, AdminAction::Quarantine, Some("held_for_review"), Some("quarantined"), None, ).await.ok(); tracing::info!(version_id = %id, admin_id = %admin.id, "version quarantined"); refresh_held_uploads_partial(&state).await } // ─── Rescan ────────────────────────────────────────────────────────────────── /// Re-enqueue a single version for scanning. Worker picks it up on its next /// claim cycle. The entity is moved back to Pending so the dashboard reflects /// that a scan is in flight. #[tracing::instrument(skip_all, name = "admin::rescan_version")] pub(super) async fn admin_rescan_version( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { rescan_version_inner(&state, id, admin.id, AdminAction::Rescan).await?; refresh_held_uploads_partial(&state).await } async fn rescan_version_inner( state: &AppState, id: VersionId, admin_id: UserId, action: AdminAction, ) -> Result<()> { let v = db::versions::get_version_by_id(&state.db, id).await? .ok_or(crate::error::AppError::NotFound)?; let item = db::items::get_item_by_id(&state.db, v.item_id).await? .ok_or(crate::error::AppError::NotFound)?; let owner = db::items::get_item_owner(&state.db, item.id).await? .ok_or(crate::error::AppError::NotFound)?; let s3_key = v.s3_key.clone().ok_or(crate::error::AppError::NotFound)?; let size = v.file_size_bytes.unwrap_or(0); // Route through commit_rescan so the enqueue → flip-to-Pending order is // the same as the chronic-disease commit_upload seal — admin paths used // to call scan_jobs::enqueue + update_*_scan_status directly, which made // ordering bugs hard to fence at the type level. crate::routes::storage::commit_rescan( state, crate::routes::storage::CommitTarget::Version(id), &s3_key, FileType::Download, owner, size, ).await?; db::scan_admin_actions::log_version( &state.db, id, admin_id, action, Some("held_for_review"), Some("pending"), None, ).await.ok(); tracing::info!(version_id = %id, admin_id = %admin_id, "version re-enqueued for scan"); Ok(()) } /// Re-enqueue a single item for scanning. #[tracing::instrument(skip_all, name = "admin::rescan_item")] pub(super) async fn admin_rescan_item( State(state): State, AdminUser(admin): AdminUser, Path(id): Path, ) -> Result { rescan_item_inner(&state, id, admin.id, AdminAction::Rescan).await?; refresh_held_uploads_partial(&state).await } async fn rescan_item_inner( state: &AppState, id: ItemId, admin_id: UserId, action: AdminAction, ) -> Result<()> { let item = db::items::get_item_by_id(&state.db, id).await? .ok_or(crate::error::AppError::NotFound)?; let owner = db::items::get_item_owner(&state.db, id).await? .ok_or(crate::error::AppError::NotFound)?; let (s3_key, size, file_type) = if let Some(k) = item.audio_s3_key.clone() { (k, item.audio_file_size_bytes.unwrap_or(0), FileType::Audio) } else if let Some(k) = item.cover_s3_key.clone() { (k, item.cover_file_size_bytes.unwrap_or(0), FileType::Cover) } else { return Err(crate::error::AppError::NotFound); }; crate::routes::storage::commit_rescan( state, crate::routes::storage::CommitTarget::Item(id), &s3_key, file_type, owner, size, ).await?; db::scan_admin_actions::log_item( &state.db, id, admin_id, action, Some("held_for_review"), Some("pending"), None, ).await.ok(); tracing::info!(item_id = %id, admin_id = %admin_id, "item re-enqueued for scan"); Ok(()) } /// Bulk-rescan every currently-held item and version. Used to clear backlogs /// accumulated under a previous broken pipeline configuration. #[tracing::instrument(skip_all, name = "admin::bulk_rescan_held")] pub(super) async fn admin_bulk_rescan_held( State(state): State, AdminUser(admin): AdminUser, ) -> Result { let mut total = 0usize; for cand in db::scan_jobs::rescan_candidates_versions(&state.db).await? { let id = VersionId::from_uuid(cand.version_id); if rescan_version_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() { total += 1; } } for cand in db::scan_jobs::rescan_candidates_items(&state.db).await? { let id = ItemId::from_uuid(cand.item_id); if rescan_item_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() { total += 1; } } tracing::info!(total, admin_id = %admin.id, "bulk rescan of held queue dispatched"); refresh_held_uploads_partial(&state).await } /// Bulk-promote every currently-held item and version to Clean. Sticky /// decision; requires a non-empty note for audit-trail attribution. Use /// after a manual review pass where the admin has eyes on the held set and /// has confirmed each is safe. /// /// Distinct from bulk-rescan: rescan re-runs the pipeline (system decides); /// promote bypasses the pipeline (admin decides). The note is the only /// record of *why*, so we require it. #[tracing::instrument(skip_all, name = "admin::bulk_promote_held")] pub(super) async fn admin_bulk_promote_held( State(state): State, AdminUser(admin): AdminUser, axum::Form(form): axum::Form, ) -> Result { let note = form.note.trim(); if note.is_empty() { return Err(crate::error::AppError::BadRequest( "Bulk promote requires a note explaining why every held file is safe to clear.".to_string(), )); } let held_items = db::scanning::get_held_items(&state.db).await?; let held_versions = db::scanning::get_held_versions(&state.db).await?; let mut total = 0usize; for v in &held_versions { if db::scanning::update_version_scan_status(&state.db, v.version_id, FileScanStatus::Clean).await.is_ok() { db::scan_admin_actions::log_version( &state.db, v.version_id, admin.id, AdminAction::BulkPromote, Some("held_for_review"), Some("clean"), Some(note), ).await.ok(); total += 1; } } for i in &held_items { if db::scanning::update_item_scan_status(&state.db, i.item_id, FileScanStatus::Clean).await.is_ok() { db::scan_admin_actions::log_item( &state.db, i.item_id, admin.id, AdminAction::BulkPromote, Some("held_for_review"), Some("clean"), Some(note), ).await.ok(); total += 1; } } tracing::warn!(total, admin_id = %admin.id, note = %note, "bulk promote of held queue executed"); refresh_held_uploads_partial(&state).await } #[derive(serde::Deserialize)] pub(super) struct BulkPromoteForm { pub note: String, } // ─── Public health endpoint (for PoM) ──────────────────────────────────────── #[derive(serde::Serialize)] struct LayerHealthJson { layer: String, total_1h: i64, success_1h: i64, error_1h: i64, last_clean_secs_ago: Option, } #[derive(serde::Serialize)] struct ScanPipelineHealth { queue_pending: i64, queue_running: i64, queue_stuck: i64, held_versions: i64, held_items: i64, held_media: i64, held_total: i64, layers: Vec, scan_spool_free_bytes: u64, scan_spool_file_count: u64, generated_at: String, } /// Aggregate scan-pipeline health stats for external monitors (PoM). /// /// **Unauthenticated** — returns counts only, no PII. The data is the same /// shape the admin dashboard's health panel uses; the JSON contract is /// stable for PoM threshold rules. Stuck threshold matches the worker's /// reaper (300s). #[tracing::instrument(skip_all, name = "admin::scan_health_json")] pub(super) async fn scan_health_json( State(state): State, ) -> Result { let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0); let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0); let queue_stuck = db::scan_jobs::stuck_count(&state.db, 300).await.unwrap_or(0); let held = db::scanning::held_counts(&state.db).await .unwrap_or(db::scanning::HeldCounts { held_versions: 0, held_items: 0, held_media: 0 }); let held_total = held.held_versions + held.held_items + held.held_media; let rows = db::scanning::layer_health_window(&state.db, 1).await.unwrap_or_default(); let now = chrono::Utc::now(); let layers: Vec = rows.into_iter().map(|r| { let success = r.pass_count + r.skip_count; let total = success + r.fail_count + r.error_count; LayerHealthJson { layer: r.layer, total_1h: total, success_1h: success, error_1h: r.error_count, last_clean_secs_ago: r.last_pass_or_skip.map(|t| (now - t).num_seconds()), } }).collect(); let spool_dir = std::path::Path::new(crate::constants::SCAN_SPOOL_DIR); let scan_spool_free_bytes = fs2::available_space(if spool_dir.exists() { spool_dir } else { spool_dir.parent().unwrap_or(std::path::Path::new("/")) }) .unwrap_or(0); let scan_spool_file_count = std::fs::read_dir(spool_dir) .map(|rd| rd.flatten().filter(|e| e.file_type().map(|t| t.is_file()).unwrap_or(false)).count() as u64) .unwrap_or(0); let body = ScanPipelineHealth { queue_pending, queue_running, queue_stuck, held_versions: held.held_versions, held_items: held.held_items, held_media: held.held_media, held_total, layers, scan_spool_free_bytes, scan_spool_file_count, generated_at: now.to_rfc3339(), }; Ok(axum::Json(body)) } // ─── Live partials ─────────────────────────────────────────────────────────── /// HTMX partial: current pending + running counts. Polled by the dashboard. #[tracing::instrument(skip_all, name = "admin::queue_summary_partial")] pub(super) async fn admin_queue_summary_partial( State(state): State, AdminUser(_admin): AdminUser, ) -> Result { let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0); let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0); Ok(AdminQueueSummaryTemplate { queue_pending, queue_running }) }