Skip to main content

max / makenotwork

22.4 KB · 584 lines History Blame Raw
1 //! Admin upload review queue + scan-pipeline dashboard.
2 //!
3 //! Backs the routes registered under `/admin/uploads` and `/api/admin/uploads`.
4 //! See `docs/scan-pipeline-audit.md` § 5 for the layout and behavior spec.
5
6 use std::collections::HashMap;
7
8 use axum::{
9 extract::{Path, State},
10 response::{IntoResponse, Response},
11 };
12
13 use crate::{
14 auth::AdminUser,
15 db::{self, scan_admin_actions::AdminAction, FileScanStatus, ItemId, UserId, VersionId},
16 error::Result,
17 helpers::get_csrf_token,
18 storage::FileType,
19 templates::*,
20 types::*,
21 AppState,
22 };
23
/// Look-back window, in hours, for the dashboard's per-layer health rollup.
const HEALTH_WINDOW_HOURS: i64 = 24;
/// A layer whose last pass/skip is older than this many hours is not treated
/// as recently seen when computing its status badge.
const STALE_LAYER_THRESHOLD_HOURS: i64 = 1;
/// Look-back window, in hours, for the recent scan-history table (seven days).
const HISTORY_WINDOW_HOURS: i64 = 7 * 24;
/// Row limit passed to the recent scan-history query.
const HISTORY_ROW_LIMIT: i64 = 100;
/// Row limit passed to the filtered audit-log query.
const AUDIT_LOG_ROW_LIMIT: i64 = 500;
29
30 /// Build the per-layer health-card vec from the DB rollup.
31 async fn fetch_layer_health(state: &AppState) -> Result<Vec<LayerHealthCard>> {
32 let rows = db::scanning::layer_health_window(&state.db, HEALTH_WINDOW_HOURS).await?;
33 let mut by_layer: HashMap<String, LayerHealthCard> = HashMap::new();
34
35 // The canonical layer list — keep cards visible even if a layer hasn't
36 // run at all in the window. The dashboard depends on seeing "clamav: ✗
37 // not running" rather than the layer just being absent.
38 for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] {
39 by_layer.insert(name.to_string(), LayerHealthCard {
40 layer: name.to_string(),
41 total: 0,
42 success_rate_pct: 0,
43 error_rate_pct: 0,
44 fail_count: 0,
45 status_badge: "down",
46 last_seen: "never".to_string(),
47 });
48 }
49
50 for row in rows {
51 let total = row.pass_count + row.skip_count + row.fail_count + row.error_count;
52 let success = row.pass_count + row.skip_count;
53 let success_rate = if total > 0 { (100 * success / total) as i32 } else { 0 };
54 let error_rate = if total > 0 { (100 * row.error_count / total) as i32 } else { 0 };
55
56 let stale_cutoff = chrono::Utc::now() - chrono::Duration::hours(STALE_LAYER_THRESHOLD_HOURS);
57 let recent = row.last_pass_or_skip.is_some_and(|t| t > stale_cutoff);
58 let status_badge = if total == 0 || (!recent && row.error_count > 0) {
59 "down"
60 } else if error_rate > 10 {
61 "degraded"
62 } else {
63 "ok"
64 };
65
66 let last_seen = match row.last_pass_or_skip {
67 Some(t) => relative_age(t),
68 None => "never".to_string(),
69 };
70
71 by_layer.insert(row.layer.clone(), LayerHealthCard {
72 layer: row.layer,
73 total,
74 success_rate_pct: success_rate,
75 error_rate_pct: error_rate,
76 fail_count: row.fail_count,
77 status_badge,
78 last_seen,
79 });
80 }
81
82 // Stable ordering matches the pipeline's actual execution order.
83 let mut out: Vec<LayerHealthCard> = Vec::with_capacity(11);
84 for name in ["content_type", "structural", "archive", "yara", "signing_macos", "signing_windows", "signing_linux", "clamav", "malwarebazaar", "urlhaus", "metadefender"] {
85 if let Some(card) = by_layer.remove(name) {
86 out.push(card);
87 }
88 }
89 Ok(out)
90 }
91
92 fn relative_age(t: chrono::DateTime<chrono::Utc>) -> String {
93 let now = chrono::Utc::now();
94 let delta = now - t;
95 if delta.num_seconds() < 60 {
96 format!("{}s ago", delta.num_seconds().max(0))
97 } else if delta.num_minutes() < 60 {
98 format!("{}m ago", delta.num_minutes())
99 } else if delta.num_hours() < 24 {
100 format!("{}h ago", delta.num_hours())
101 } else {
102 format!("{}d ago", delta.num_days())
103 }
104 }
105
106 /// Render the admin upload review queue.
107 #[tracing::instrument(skip_all, name = "admin::admin_uploads")]
108 pub(super) async fn admin_uploads(
109 State(state): State<AppState>,
110 session: tower_sessions::Session,
111 AdminUser(user): AdminUser,
112 ) -> Result<impl IntoResponse> {
113 let csrf_token = get_csrf_token(&session).await;
114
115 let held_items = db::scanning::get_held_items(&state.db).await?;
116 let held_versions = db::scanning::get_held_versions(&state.db).await?;
117
118 let mut held_uploads: Vec<AdminHeldUploadRow> = Vec::new();
119 held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item));
120 held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version));
121 held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at));
122
123 // Attach the last admin action to each held row in a single batch lookup.
124 attach_last_actions(&state, &mut held_uploads).await;
125
126 let total_held = held_uploads.len();
127 let layer_health = fetch_layer_health(&state).await?;
128 let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
129 let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
130
131 let recent_history: Vec<ScanHistoryDisplay> = db::scanning::recent_history(
132 &state.db, HISTORY_WINDOW_HOURS, HISTORY_ROW_LIMIT,
133 ).await
134 .unwrap_or_default()
135 .iter()
136 .map(ScanHistoryDisplay::from_row)
137 .collect();
138 let history_total = recent_history.len();
139
140 Ok(AdminUploadsTemplate {
141 csrf_token,
142 session_user: Some(user),
143 held_uploads,
144 total_held,
145 admin_active_page: "uploads",
146 layer_health,
147 queue_pending,
148 queue_running,
149 recent_history,
150 history_total,
151 })
152 }
153
154 /// Batch-fetch the latest admin action for the displayed held rows and
155 /// attach each as `last_action` on its row.
156 async fn attach_last_actions(state: &AppState, rows: &mut [AdminHeldUploadRow]) {
157 use std::str::FromStr;
158 let mut version_ids: Vec<uuid::Uuid> = Vec::new();
159 let mut item_ids: Vec<uuid::Uuid> = Vec::new();
160 for r in rows.iter() {
161 if let Some(ref vid) = r.version_id
162 && let Ok(uuid) = uuid::Uuid::from_str(vid)
163 {
164 version_ids.push(uuid);
165 } else if let Ok(uuid) = uuid::Uuid::from_str(&r.item_id) {
166 item_ids.push(uuid);
167 }
168 }
169 let version_actions = db::scan_admin_actions::latest_per_version(&state.db, &version_ids)
170 .await.unwrap_or_default();
171 let item_actions = db::scan_admin_actions::latest_per_item(&state.db, &item_ids)
172 .await.unwrap_or_default();
173
174 for r in rows.iter_mut() {
175 let summary = if let Some(ref vid) = r.version_id {
176 uuid::Uuid::from_str(vid).ok().and_then(|u| version_actions.get(&u))
177 } else {
178 uuid::Uuid::from_str(&r.item_id).ok().and_then(|u| item_actions.get(&u))
179 };
180 if let Some(s) = summary {
181 r.last_action = Some(LastAction {
182 action: s.action.clone(),
183 admin_username: s.admin_username.clone(),
184 when: relative_age(s.created_at),
185 });
186 }
187 }
188 }
189
190 #[derive(serde::Deserialize, Default)]
191 pub(super) struct AuditFilters {
192 #[serde(default)]
193 pub action: Option<String>,
194 #[serde(default)]
195 pub admin: Option<String>,
196 #[serde(default)]
197 pub since_days: Option<i64>,
198 }
199
200 /// Full audit-log page with query-string filters.
201 ///
202 /// `?action=promote&admin=max&since_days=30` is the canonical example. All
203 /// filters are optional; empty/absent means no constraint on that column.
204 #[tracing::instrument(skip_all, name = "admin::scan_audit")]
205 pub(super) async fn admin_scan_audit(
206 State(state): State<AppState>,
207 session: tower_sessions::Session,
208 AdminUser(user): AdminUser,
209 axum::extract::Query(filters): axum::extract::Query<AuditFilters>,
210 ) -> Result<impl IntoResponse> {
211 let csrf_token = get_csrf_token(&session).await;
212 let action = filters.action.as_deref().filter(|s| !s.is_empty());
213 let admin = filters.admin.as_deref().filter(|s| !s.is_empty());
214 let entries: Vec<AdminAuditLogRow> = db::scan_admin_actions::list_filtered(
215 &state.db, action, admin, filters.since_days, AUDIT_LOG_ROW_LIMIT,
216 ).await
217 .unwrap_or_default()
218 .iter()
219 .map(AdminAuditLogRow::from_db)
220 .collect();
221 Ok(AdminScanAuditTemplate {
222 csrf_token,
223 session_user: Some(user),
224 admin_active_page: "uploads",
225 entries,
226 filter_action: filters.action.unwrap_or_default(),
227 filter_admin: filters.admin.unwrap_or_default(),
228 filter_since_days: filters.since_days.map(|d| d.to_string()).unwrap_or_default(),
229 })
230 }
231
232 /// Re-query held uploads and return the entries partial.
233 pub(super) async fn refresh_held_uploads_partial(state: &AppState) -> Result<Response> {
234 let held_items = db::scanning::get_held_items(&state.db).await?;
235 let held_versions = db::scanning::get_held_versions(&state.db).await?;
236
237 let mut held_uploads: Vec<AdminHeldUploadRow> = Vec::new();
238 held_uploads.extend(held_items.iter().map(AdminHeldUploadRow::from_held_item));
239 held_uploads.extend(held_versions.iter().map(AdminHeldUploadRow::from_held_version));
240 held_uploads.sort_by(|a, b| a.held_at.cmp(&b.held_at));
241
242 Ok(AdminUploadEntriesTemplate { held_uploads }.into_response())
243 }
244
245 // ─── Per-row actions ─────────────────────────────────────────────────────────
246
247 /// Promote a held item upload to Clean. Renames the legacy "approve" verb.
248 #[tracing::instrument(skip_all, name = "admin::promote_item")]
249 pub(super) async fn admin_promote_item(
250 State(state): State<AppState>,
251 AdminUser(admin): AdminUser,
252 Path(id): Path<ItemId>,
253 ) -> Result<Response> {
254 db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Clean).await?;
255 db::scan_admin_actions::log_item(
256 &state.db, id, admin.id, AdminAction::Promote,
257 Some("held_for_review"), Some("clean"),
258 None,
259 ).await.ok();
260 tracing::info!(item_id = %id, admin_id = %admin.id, "item promoted to clean");
261 refresh_held_uploads_partial(&state).await
262 }
263
264 /// Quarantine a held item upload. Renames the legacy "reject" verb.
265 #[tracing::instrument(skip_all, name = "admin::quarantine_item")]
266 pub(super) async fn admin_quarantine_item(
267 State(state): State<AppState>,
268 AdminUser(admin): AdminUser,
269 Path(id): Path<ItemId>,
270 ) -> Result<Response> {
271 db::scanning::update_item_scan_status(&state.db, id, FileScanStatus::Quarantined).await?;
272 db::scan_admin_actions::log_item(
273 &state.db, id, admin.id, AdminAction::Quarantine,
274 Some("held_for_review"), Some("quarantined"),
275 None,
276 ).await.ok();
277 tracing::info!(item_id = %id, admin_id = %admin.id, "item quarantined");
278 refresh_held_uploads_partial(&state).await
279 }
280
281 /// Promote a held version upload to Clean.
282 #[tracing::instrument(skip_all, name = "admin::promote_version")]
283 pub(super) async fn admin_promote_version(
284 State(state): State<AppState>,
285 AdminUser(admin): AdminUser,
286 Path(id): Path<VersionId>,
287 ) -> Result<Response> {
288 db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Clean).await?;
289 db::scan_admin_actions::log_version(
290 &state.db, id, admin.id, AdminAction::Promote,
291 Some("held_for_review"), Some("clean"),
292 None,
293 ).await.ok();
294 tracing::info!(version_id = %id, admin_id = %admin.id, "version promoted to clean");
295 refresh_held_uploads_partial(&state).await
296 }
297
298 /// Quarantine a held version upload.
299 #[tracing::instrument(skip_all, name = "admin::quarantine_version")]
300 pub(super) async fn admin_quarantine_version(
301 State(state): State<AppState>,
302 AdminUser(admin): AdminUser,
303 Path(id): Path<VersionId>,
304 ) -> Result<Response> {
305 db::scanning::update_version_scan_status(&state.db, id, FileScanStatus::Quarantined).await?;
306 db::scan_admin_actions::log_version(
307 &state.db, id, admin.id, AdminAction::Quarantine,
308 Some("held_for_review"), Some("quarantined"),
309 None,
310 ).await.ok();
311 tracing::info!(version_id = %id, admin_id = %admin.id, "version quarantined");
312 refresh_held_uploads_partial(&state).await
313 }
314
315 // ─── Rescan ──────────────────────────────────────────────────────────────────
316
317 /// Re-enqueue a single version for scanning. Worker picks it up on its next
318 /// claim cycle. The entity is moved back to Pending so the dashboard reflects
319 /// that a scan is in flight.
320 #[tracing::instrument(skip_all, name = "admin::rescan_version")]
321 pub(super) async fn admin_rescan_version(
322 State(state): State<AppState>,
323 AdminUser(admin): AdminUser,
324 Path(id): Path<VersionId>,
325 ) -> Result<Response> {
326 rescan_version_inner(&state, id, admin.id, AdminAction::Rescan).await?;
327 refresh_held_uploads_partial(&state).await
328 }
329
330 async fn rescan_version_inner(
331 state: &AppState,
332 id: VersionId,
333 admin_id: UserId,
334 action: AdminAction,
335 ) -> Result<()> {
336 let v = db::versions::get_version_by_id(&state.db, id).await?
337 .ok_or(crate::error::AppError::NotFound)?;
338 let item = db::items::get_item_by_id(&state.db, v.item_id).await?
339 .ok_or(crate::error::AppError::NotFound)?;
340 let owner = db::items::get_item_owner(&state.db, item.id).await?
341 .ok_or(crate::error::AppError::NotFound)?;
342 let s3_key = v.s3_key.clone().ok_or(crate::error::AppError::NotFound)?;
343 let size = v.file_size_bytes.unwrap_or(0);
344
345 // Route through commit_rescan so the enqueue → flip-to-Pending order is
346 // the same as the chronic-disease commit_upload seal — admin paths used
347 // to call scan_jobs::enqueue + update_*_scan_status directly, which made
348 // ordering bugs hard to fence at the type level.
349 crate::routes::storage::commit_rescan(
350 state,
351 crate::routes::storage::CommitTarget::Version(id),
352 &s3_key,
353 FileType::Download,
354 owner,
355 size,
356 ).await?;
357 db::scan_admin_actions::log_version(
358 &state.db, id, admin_id, action,
359 Some("held_for_review"), Some("pending"), None,
360 ).await.ok();
361 tracing::info!(version_id = %id, admin_id = %admin_id, "version re-enqueued for scan");
362 Ok(())
363 }
364
365 /// Re-enqueue a single item for scanning.
366 #[tracing::instrument(skip_all, name = "admin::rescan_item")]
367 pub(super) async fn admin_rescan_item(
368 State(state): State<AppState>,
369 AdminUser(admin): AdminUser,
370 Path(id): Path<ItemId>,
371 ) -> Result<Response> {
372 rescan_item_inner(&state, id, admin.id, AdminAction::Rescan).await?;
373 refresh_held_uploads_partial(&state).await
374 }
375
376 async fn rescan_item_inner(
377 state: &AppState,
378 id: ItemId,
379 admin_id: UserId,
380 action: AdminAction,
381 ) -> Result<()> {
382 let item = db::items::get_item_by_id(&state.db, id).await?
383 .ok_or(crate::error::AppError::NotFound)?;
384 let owner = db::items::get_item_owner(&state.db, id).await?
385 .ok_or(crate::error::AppError::NotFound)?;
386
387 let (s3_key, size, file_type) = if let Some(k) = item.audio_s3_key.clone() {
388 (k, item.audio_file_size_bytes.unwrap_or(0), FileType::Audio)
389 } else if let Some(k) = item.cover_s3_key.clone() {
390 (k, item.cover_file_size_bytes.unwrap_or(0), FileType::Cover)
391 } else {
392 return Err(crate::error::AppError::NotFound);
393 };
394
395 crate::routes::storage::commit_rescan(
396 state,
397 crate::routes::storage::CommitTarget::Item(id),
398 &s3_key,
399 file_type,
400 owner,
401 size,
402 ).await?;
403 db::scan_admin_actions::log_item(
404 &state.db, id, admin_id, action,
405 Some("held_for_review"), Some("pending"), None,
406 ).await.ok();
407 tracing::info!(item_id = %id, admin_id = %admin_id, "item re-enqueued for scan");
408 Ok(())
409 }
410
411 /// Bulk-rescan every currently-held item and version. Used to clear backlogs
412 /// accumulated under a previous broken pipeline configuration.
413 #[tracing::instrument(skip_all, name = "admin::bulk_rescan_held")]
414 pub(super) async fn admin_bulk_rescan_held(
415 State(state): State<AppState>,
416 AdminUser(admin): AdminUser,
417 ) -> Result<Response> {
418 let mut total = 0usize;
419
420 for cand in db::scan_jobs::rescan_candidates_versions(&state.db).await? {
421 let id = VersionId::from_uuid(cand.version_id);
422 if rescan_version_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
423 total += 1;
424 }
425 }
426 for cand in db::scan_jobs::rescan_candidates_items(&state.db).await? {
427 let id = ItemId::from_uuid(cand.item_id);
428 if rescan_item_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
429 total += 1;
430 }
431 }
432 tracing::info!(total, admin_id = %admin.id, "bulk rescan of held queue dispatched");
433 refresh_held_uploads_partial(&state).await
434 }
435
436 /// Bulk-promote every currently-held item and version to Clean. Sticky
437 /// decision; requires a non-empty note for audit-trail attribution. Use
438 /// after a manual review pass where the admin has eyes on the held set and
439 /// has confirmed each is safe.
440 ///
441 /// Distinct from bulk-rescan: rescan re-runs the pipeline (system decides);
442 /// promote bypasses the pipeline (admin decides). The note is the only
443 /// record of *why*, so we require it.
444 #[tracing::instrument(skip_all, name = "admin::bulk_promote_held")]
445 pub(super) async fn admin_bulk_promote_held(
446 State(state): State<AppState>,
447 AdminUser(admin): AdminUser,
448 axum::Form(form): axum::Form<BulkPromoteForm>,
449 ) -> Result<Response> {
450 let note = form.note.trim();
451 if note.is_empty() {
452 return Err(crate::error::AppError::BadRequest(
453 "Bulk promote requires a note explaining why every held file is safe to clear.".to_string(),
454 ));
455 }
456
457 let held_items = db::scanning::get_held_items(&state.db).await?;
458 let held_versions = db::scanning::get_held_versions(&state.db).await?;
459 let mut total = 0usize;
460
461 for v in &held_versions {
462 if db::scanning::update_version_scan_status(&state.db, v.version_id, FileScanStatus::Clean).await.is_ok() {
463 db::scan_admin_actions::log_version(
464 &state.db, v.version_id, admin.id, AdminAction::BulkPromote,
465 Some("held_for_review"), Some("clean"), Some(note),
466 ).await.ok();
467 total += 1;
468 }
469 }
470 for i in &held_items {
471 if db::scanning::update_item_scan_status(&state.db, i.item_id, FileScanStatus::Clean).await.is_ok() {
472 db::scan_admin_actions::log_item(
473 &state.db, i.item_id, admin.id, AdminAction::BulkPromote,
474 Some("held_for_review"), Some("clean"), Some(note),
475 ).await.ok();
476 total += 1;
477 }
478 }
479 tracing::warn!(total, admin_id = %admin.id, note = %note, "bulk promote of held queue executed");
480 refresh_held_uploads_partial(&state).await
481 }
482
483 #[derive(serde::Deserialize)]
484 pub(super) struct BulkPromoteForm {
485 pub note: String,
486 }
487
488 // ─── Public health endpoint (for PoM) ────────────────────────────────────────
489
490 #[derive(serde::Serialize)]
491 struct LayerHealthJson {
492 layer: String,
493 total_1h: i64,
494 success_1h: i64,
495 error_1h: i64,
496 last_clean_secs_ago: Option<i64>,
497 }
498
499 #[derive(serde::Serialize)]
500 struct ScanPipelineHealth {
501 queue_pending: i64,
502 queue_running: i64,
503 queue_stuck: i64,
504 held_versions: i64,
505 held_items: i64,
506 held_media: i64,
507 held_total: i64,
508 layers: Vec<LayerHealthJson>,
509 scan_spool_free_bytes: u64,
510 scan_spool_file_count: u64,
511 generated_at: String,
512 }
513
514 /// Aggregate scan-pipeline health stats for external monitors (PoM).
515 ///
516 /// **Unauthenticated** — returns counts only, no PII. The data is the same
517 /// shape the admin dashboard's health panel uses; the JSON contract is
518 /// stable for PoM threshold rules. Stuck threshold matches the worker's
519 /// reaper (300s).
520 #[tracing::instrument(skip_all, name = "admin::scan_health_json")]
521 pub(super) async fn scan_health_json(
522 State(state): State<AppState>,
523 ) -> Result<impl IntoResponse> {
524 let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
525 let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
526 let queue_stuck = db::scan_jobs::stuck_count(&state.db, 300).await.unwrap_or(0);
527 let held = db::scanning::held_counts(&state.db).await
528 .unwrap_or(db::scanning::HeldCounts { held_versions: 0, held_items: 0, held_media: 0 });
529 let held_total = held.held_versions + held.held_items + held.held_media;
530
531 let rows = db::scanning::layer_health_window(&state.db, 1).await.unwrap_or_default();
532 let now = chrono::Utc::now();
533 let layers: Vec<LayerHealthJson> = rows.into_iter().map(|r| {
534 let success = r.pass_count + r.skip_count;
535 let total = success + r.fail_count + r.error_count;
536 LayerHealthJson {
537 layer: r.layer,
538 total_1h: total,
539 success_1h: success,
540 error_1h: r.error_count,
541 last_clean_secs_ago: r.last_pass_or_skip.map(|t| (now - t).num_seconds()),
542 }
543 }).collect();
544
545 let spool_dir = std::path::Path::new(crate::constants::SCAN_SPOOL_DIR);
546 let scan_spool_free_bytes = fs2::available_space(if spool_dir.exists() {
547 spool_dir
548 } else {
549 spool_dir.parent().unwrap_or(std::path::Path::new("/"))
550 })
551 .unwrap_or(0);
552 let scan_spool_file_count = std::fs::read_dir(spool_dir)
553 .map(|rd| rd.flatten().filter(|e| e.file_type().map(|t| t.is_file()).unwrap_or(false)).count() as u64)
554 .unwrap_or(0);
555
556 let body = ScanPipelineHealth {
557 queue_pending,
558 queue_running,
559 queue_stuck,
560 held_versions: held.held_versions,
561 held_items: held.held_items,
562 held_media: held.held_media,
563 held_total,
564 layers,
565 scan_spool_free_bytes,
566 scan_spool_file_count,
567 generated_at: now.to_rfc3339(),
568 };
569 Ok(axum::Json(body))
570 }
571
572 // ─── Live partials ───────────────────────────────────────────────────────────
573
574 /// HTMX partial: current pending + running counts. Polled by the dashboard.
575 #[tracing::instrument(skip_all, name = "admin::queue_summary_partial")]
576 pub(super) async fn admin_queue_summary_partial(
577 State(state): State<AppState>,
578 AdminUser(_admin): AdminUser,
579 ) -> Result<impl IntoResponse> {
580 let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
581 let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
582 Ok(AdminQueueSummaryTemplate { queue_pending, queue_running })
583 }
584