Skip to main content

max / makenotwork

server: add /admin/uploads/health.json for PoM scan-pipeline check

Unauthenticated aggregate stats endpoint for external monitors (PoM). Returns counts only, no PII. Same shape the admin dashboard health panel uses; JSON contract is stable for PoM threshold rules.

Body:
  queue_pending / queue_running / queue_stuck (>300s)
  held_versions / held_items / held_media / held_total
  layers[]: { layer, total_1h, success_1h, error_1h, last_clean_secs_ago }
  generated_at

Supporting queries:
- db::scan_jobs::stuck_count for the 5-min worker reaper threshold.
- db::scanning::held_counts for backlog visibility across all three scan_status-bearing tables (versions, items, media_files).
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-24 22:24 UTC
Commit: 0962909b083d6722d8830a82967895c9805328c2
Parent: d225f23
4 files changed, +101 insertions, -0 deletions
@@ -225,6 +225,17 @@ pub async fn running_count(db: &PgPool) -> Result<i64, sqlx::Error> {
225 225 .await
226 226 }
227 227
228 + /// Count of running jobs that have been in flight longer than `max_age_secs`.
229 + /// Used by PoM to alert on stuck workers.
230 + pub async fn stuck_count(db: &PgPool, max_age_secs: i64) -> Result<i64, sqlx::Error> {
231 + sqlx::query_scalar::<_, i64>(
232 + "SELECT COUNT(*) FROM scan_jobs WHERE status = 'running' AND started_at < NOW() - ($1 || ' seconds')::interval",
233 + )
234 + .bind(max_age_secs.to_string())
235 + .fetch_one(db)
236 + .await
237 + }
238 +
228 239 /// A held version with enough context to re-enqueue it for scanning.
229 240 #[allow(dead_code)]
230 241 #[derive(Debug, Clone, FromRow)]
@@ -225,6 +225,26 @@ pub struct ScanHistoryRow {
225 225 pub scan_layers: serde_json::Value,
226 226 }
227 227
/// Aggregate counts of entities currently held for review, one counter per
/// table that carries a `scan_status` column.
///
/// Used by the PoM health endpoint to alert on growing review backlogs.
/// `Default` yields all-zero counts, which is the natural fallback when the
/// counts cannot be loaded.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct HeldCounts {
    /// Rows in `versions` with `scan_status = 'held_for_review'`.
    pub held_versions: i64,
    /// Rows in `items` with `scan_status = 'held_for_review'`.
    pub held_items: i64,
    /// Rows in `media_files` with `scan_status = 'held_for_review'`.
    pub held_media: i64,
}
236 +
237 + #[tracing::instrument(skip_all)]
238 + pub async fn held_counts(db: &PgPool) -> Result<HeldCounts, sqlx::Error> {
239 + let held_versions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM versions WHERE scan_status = 'held_for_review'")
240 + .fetch_one(db).await?;
241 + let held_items: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM items WHERE scan_status = 'held_for_review'")
242 + .fetch_one(db).await?;
243 + let held_media: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM media_files WHERE scan_status = 'held_for_review'")
244 + .fetch_one(db).await?;
245 + Ok(HeldCounts { held_versions, held_items, held_media })
246 + }
247 +
228 248 /// Recent scan results across all entities. Newest first, capped at `limit`.
229 249 /// Used by the Recent History collapsible section. `since_hours` bounds the
230 250 /// window so the grid renders fast.
@@ -49,6 +49,7 @@ pub fn admin_routes() -> Router<AppState> {
49 49 .route("/api/admin/uploads/bulk/promote", post(uploads::admin_bulk_promote_held))
50 50 .route("/admin/uploads/queue-summary", get(uploads::admin_queue_summary_partial))
51 51 .route("/admin/uploads/audit", get(uploads::admin_scan_audit))
52 + .route("/admin/uploads/health.json", get(uploads::scan_health_json))
52 53 .route("/api/admin/users/{id}/trust", post(users::admin_trust_user))
53 54 .route("/api/admin/users/{id}/untrust", post(users::admin_untrust_user))
54 55 // Appeals
@@ -477,6 +477,75 @@ pub(super) struct BulkPromoteForm {
477 477 pub note: String,
478 478 }
479 479
480 + // ─── Public health endpoint (for PoM) ────────────────────────────────────────
481 +
482 + #[derive(serde::Serialize)]
483 + struct LayerHealthJson {
484 + layer: String,
485 + total_1h: i64,
486 + success_1h: i64,
487 + error_1h: i64,
488 + last_clean_secs_ago: Option<i64>,
489 + }
490 +
491 + #[derive(serde::Serialize)]
492 + struct ScanPipelineHealth {
493 + queue_pending: i64,
494 + queue_running: i64,
495 + queue_stuck: i64,
496 + held_versions: i64,
497 + held_items: i64,
498 + held_media: i64,
499 + held_total: i64,
500 + layers: Vec<LayerHealthJson>,
501 + generated_at: String,
502 + }
503 +
504 + /// Aggregate scan-pipeline health stats for external monitors (PoM).
505 + ///
506 + /// **Unauthenticated** — returns counts only, no PII. The data is the same
507 + /// shape the admin dashboard's health panel uses; the JSON contract is
508 + /// stable for PoM threshold rules. Stuck threshold matches the worker's
509 + /// reaper (300s).
510 + #[tracing::instrument(skip_all, name = "admin::scan_health_json")]
511 + pub(super) async fn scan_health_json(
512 + State(state): State<AppState>,
513 + ) -> Result<impl IntoResponse> {
514 + let queue_pending = db::scan_jobs::queued_count(&state.db).await.unwrap_or(0);
515 + let queue_running = db::scan_jobs::running_count(&state.db).await.unwrap_or(0);
516 + let queue_stuck = db::scan_jobs::stuck_count(&state.db, 300).await.unwrap_or(0);
517 + let held = db::scanning::held_counts(&state.db).await
518 + .unwrap_or(db::scanning::HeldCounts { held_versions: 0, held_items: 0, held_media: 0 });
519 + let held_total = held.held_versions + held.held_items + held.held_media;
520 +
521 + let rows = db::scanning::layer_health_window(&state.db, 1).await.unwrap_or_default();
522 + let now = chrono::Utc::now();
523 + let layers: Vec<LayerHealthJson> = rows.into_iter().map(|r| {
524 + let success = r.pass_count + r.skip_count;
525 + let total = success + r.fail_count + r.error_count;
526 + LayerHealthJson {
527 + layer: r.layer,
528 + total_1h: total,
529 + success_1h: success,
530 + error_1h: r.error_count,
531 + last_clean_secs_ago: r.last_pass_or_skip.map(|t| (now - t).num_seconds()),
532 + }
533 + }).collect();
534 +
535 + let body = ScanPipelineHealth {
536 + queue_pending,
537 + queue_running,
538 + queue_stuck,
539 + held_versions: held.held_versions,
540 + held_items: held.held_items,
541 + held_media: held.held_media,
542 + held_total,
543 + layers,
544 + generated_at: now.to_rfc3339(),
545 + };
546 + Ok(axum::Json(body))
547 + }
548 +
480 549 // ─── Live partials ───────────────────────────────────────────────────────────
481 550
482 551 /// HTMX partial: current pending + running counts. Polled by the dashboard.