//! HTTP API for serve mode — exposes health check data to consumers like MNW. use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use axum::extract::{Path, Request, State as AxumState}; use axum::http::StatusCode; use axum::middleware::{self, Next}; use axum::response::IntoResponse; use axum::routing::get; use axum::{Json, Router}; use serde::Serialize; use crate::checks::http::{compute_test_staleness, detect_test_duration_drift}; use crate::config::Config; use crate::db; use crate::peer::SharedMeshState; use crate::types::{HealthSnapshot, LatencyBucket, LatencyStats, TestStaleness}; /// Fixed-window rate limiter: allows `max_per_window` requests per `window_duration`. #[derive(Clone)] pub struct RateLimiter { count: Arc, window_start: Arc>, max_per_window: u64, window_duration: std::time::Duration, } impl RateLimiter { pub fn new(max_per_window: u64, window_duration: std::time::Duration) -> Self { Self { count: Arc::new(AtomicU64::new(0)), window_start: Arc::new(std::sync::Mutex::new(std::time::Instant::now())), max_per_window, window_duration, } } pub fn try_acquire(&self) -> bool { let mut start = self.window_start.lock().unwrap(); let now = std::time::Instant::now(); if now.duration_since(*start) > self.window_duration { *start = now; self.count.store(1, Ordering::Release); true } else { let prev = self.count.fetch_add(1, Ordering::Acquire); prev < self.max_per_window } } } /// Shared state for the API server. #[derive(Clone)] pub struct ApiState { pub pool: sqlx::SqlitePool, pub config: Arc, pub mesh: Option, pub rate_limiter: RateLimiter, } /// Rate limiting middleware. Returns 429 if the request rate exceeds the limit. 
async fn rate_limit(
    AxumState(state): AxumState<ApiState>,
    req: Request,
    next: Next,
) -> impl IntoResponse {
    if state.rate_limiter.try_acquire() {
        Ok(next.run(req).await)
    } else {
        Err((
            StatusCode::TOO_MANY_REQUESTS,
            Json(serde_json::json!({ "error": "rate limit exceeded" })),
        ))
    }
}

/// Returns the credential from an `Authorization` header value that uses the `Bearer` scheme.
///
/// The scheme name is matched case-insensitively, as RFC 7235 §2.1 requires. Any run of
/// spaces between the scheme and the credential is skipped. Returns `None` when the scheme
/// is not `Bearer` or no space follows it.
fn parse_bearer_token(header: &str) -> Option<&str> {
    let (scheme, credential) = header.split_once(' ')?;
    scheme
        .eq_ignore_ascii_case("Bearer")
        .then(|| credential.trim_start_matches(' '))
}

/// Bearer token authentication middleware.
/// If `api_token` is configured, requires `Authorization: Bearer <token>` on every request.
/// If no token is configured, all requests pass through.
///
/// A header that is absent, not valid visible ASCII, or not using the `Bearer` scheme yields
/// 401 "missing or malformed Authorization header". A wrong token yields 401 "invalid bearer token".
async fn require_bearer_token(
    AxumState(state): AxumState<ApiState>,
    req: Request,
    next: Next,
) -> impl IntoResponse {
    let Some(expected) = state.config.serve.api_token.as_deref() else {
        return Ok(next.run(req).await);
    };

    let auth_header = req
        .headers()
        .get(axum::http::header::AUTHORIZATION)
        .and_then(|v| v.to_str().ok());

    match auth_header.and_then(parse_bearer_token) {
        Some(token) => {
            // Constant-time comparison to prevent timing side-channels on the token contents.
            // (subtle's slice comparison returns early on a length mismatch, so only the
            // length can be observed.)
            use subtle::ConstantTimeEq;
            if token.as_bytes().ct_eq(expected.as_bytes()).into() {
                Ok(next.run(req).await)
            } else {
                Err((
                    StatusCode::UNAUTHORIZED,
                    Json(serde_json::json!({ "error": "invalid bearer token" })),
                ))
            }
        }
        None => Err((
            StatusCode::UNAUTHORIZED,
            Json(serde_json::json!({ "error": "missing or malformed Authorization header" })),
        )),
    }
}

/// `GET /api/health` — simple health endpoint for PoM itself.
/// Not behind auth or the rate limiter — allows external monitoring without credentials.
async fn self_health() -> impl IntoResponse {
    Json(serde_json::json!({
        "status": "operational",
        "version": env!("CARGO_PKG_VERSION"),
    }))
}

/// Build the axum router for the PoM API.
///
/// Authenticated routes (`/api/status`, `/api/status/{target}`, `/api/trends/{target}`,
/// `/api/peer/info`, `/api/peer/status`, `/api/mesh`) sit behind the bearer-token check and
/// one shared rate limiter (60 requests per 60 seconds). `/api/health` is always public.
/// `/` serves the dashboard only when `serve.dashboard` is set.
pub fn router(pool: sqlx::SqlitePool, config: Config, mesh: Option) -> Router {
    // A single limiter instance; every clone of `state` shares its counter.
    let state = ApiState {
        pool,
        config: Arc::new(config),
        mesh,
        rate_limiter: RateLimiter::new(60, std::time::Duration::from_secs(60)),
    };
    // Authenticated routes (behind bearer token + rate limit).
    // Layers added later wrap earlier ones, so `rate_limit` runs before
    // `require_bearer_token`: requests without a valid token still consume the shared budget.
    let authenticated = Router::new()
        .route("/api/status", get(status_all))
        .route("/api/status/{target}", get(status_target))
        .route("/api/trends/{target}", get(trends))
        .route("/api/peer/info", get(peer_info))
        .route("/api/peer/status", get(peer_status))
        .route("/api/mesh", get(mesh_view))
        .layer(middleware::from_fn_with_state(state.clone(), require_bearer_token))
        .layer(middleware::from_fn_with_state(state.clone(), rate_limit));
    // Public routes (no auth, no rate limit)
    let public = Router::new()
        .route("/api/health", get(self_health));
    let mut app = public.merge(authenticated);
    if state.config.serve.dashboard {
        app = app.route("/", get(crate::dashboard::dashboard_handler));
    }
    app.with_state(state)
}

// --- Response types ---

/// Body of `GET /api/status`.
#[derive(Serialize)]
struct StatusResponse {
    /// Per-target status summaries, keyed by target config name.
    targets: HashMap,
}

/// Full status report for one target, as built by `build_target_status`.
#[derive(Serialize)]
struct TargetStatus {
    /// Human-readable display label for this target.
    label: String,
    /// Most recent health check snapshot. `None` if no checks have been recorded yet.
    latest: Option,
    /// Last 10 health check snapshots, most recent first.
    recent: Vec,
    /// Uptime percentage over the last 24 hours. `None` if no checks in that window.
    uptime_24h: Option,
    /// Uptime percentage over the last 7 days. `None` if no checks in that window.
    uptime_7d: Option,
    /// Latency statistics over the last 24 hours. Omitted if no operational checks exist.
    #[serde(skip_serializing_if = "Option::is_none")]
    latency_24h: Option,
    /// Latest TLS certificate check result. Omitted if TLS monitoring is not configured.
    #[serde(skip_serializing_if = "Option::is_none")]
    tls: Option,
    /// Test staleness assessment. Omitted if test running is not configured for this target.
    #[serde(skip_serializing_if = "Option::is_none")]
    test_staleness: Option,
    /// Currently open incident. Omitted if the target is not in an incident state.
    #[serde(skip_serializing_if = "Option::is_none")]
    current_incident: Option,
    /// Recent resolved and open incidents (up to 10). Omitted if empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    incidents: Vec,
    /// Latest route check results per path. Omitted if empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    route_status: Vec,
    /// Latest DNS check results. Omitted if empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    dns_status: Vec,
    /// Latest WHOIS check result. Omitted if no WHOIS monitoring is configured.
    #[serde(skip_serializing_if = "Option::is_none")]
    whois: Option,
    /// Test duration drift warning. Omitted if no drift detected or no test config.
    #[serde(skip_serializing_if = "Option::is_none")]
    test_duration_drift: Option,
}

/// One DNS check row as exposed by the API.
#[derive(Serialize)]
struct DnsStatusJson {
    name: String,
    record_type: String,
    /// Values parsed from the stored JSON text; empty if that text failed to parse.
    expected: Vec,
    /// Values parsed from the stored JSON text; empty if that text failed to parse.
    actual: Vec,
    matches: bool,
    checked_at: String,
}

/// One route check row as exposed by the API.
#[derive(Serialize)]
struct RouteStatusJson {
    path: String,
    status_code: i64,
    ok: bool,
    checked_at: String,
    response_time_ms: i64,
}

/// API view of a single `HealthSnapshot`.
#[derive(Serialize)]
struct SnapshotJson {
    /// Health status as a lowercase string (e.g. "operational", "degraded").
    status: String,
    /// Timestamp of the check in RFC 3339 format.
    checked_at: String,
    /// Round-trip response time in milliseconds.
    response_time_ms: i64,
    /// Structured health details from the endpoint. Omitted when unavailable.
    #[serde(skip_serializing_if = "Option::is_none")]
    details: Option,
    /// Error message if the check failed. Omitted on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option,
}

impl From for SnapshotJson {
    fn from(s: HealthSnapshot) -> Self {
        Self {
            status: s.status.to_string(),
            checked_at: s.checked_at,
            response_time_ms: s.response_time_ms,
            // A serialization failure becomes `Some(Value::Null)`. That is still emitted as
            // `"details": null`, because `skip_serializing_if` only skips `None`.
            details: s.details.map(|d| serde_json::to_value(d).unwrap_or_default()),
            error: s.error,
        }
    }
}

/// Build a `TargetStatus` for a single target.
///
/// Every database lookup is best-effort. A query error is treated exactly like "no data":
/// an empty list or `None`, via `unwrap_or_default` / `unwrap_or(None)`. The function
/// therefore never fails; it just reports less. Queries run sequentially.
async fn build_target_status(
    pool: &sqlx::SqlitePool,
    name: &str,
    label: &str,
    config: &Config,
) -> TargetStatus {
    let recent = db::get_health_history(pool, Some(name), 10)
        .await
        .unwrap_or_default();
    // Extract the version info we need before consuming the snapshots.
    // `recent.first()` is used as the latest snapshot (history is documented as newest-first).
    let latest_version = recent.first()
        .and_then(|s| s.details.as_ref())
        .and_then(|d| d.version.clone());
    let latest = recent.first().cloned().map(SnapshotJson::from);
    let recent_json: Vec = recent.into_iter().map(SnapshotJson::from).collect();
    // Window sizes are in hours: 24 (1 day) and 168 (7 days).
    let uptime_24h = db::get_uptime_percent(pool, name, 24)
        .await
        .unwrap_or(None);
    let uptime_7d = db::get_uptime_percent(pool, name, 168)
        .await
        .unwrap_or(None);
    // Compute 24h latency stats from operational checks; only samples with a
    // positive response time are included.
    let latency_24h = {
        let cutoff = (chrono::Utc::now() - chrono::Duration::hours(24)).to_rfc3339();
        let times = db::get_response_times(pool, name, &cutoff)
            .await
            .unwrap_or_default();
        let operational_times: Vec = times.iter()
            .filter(|(_, ms)| *ms > 0)
            .map(|(_, ms)| *ms)
            .collect();
        LatencyStats::from_times(&operational_times)
    };
    let tls = db::get_latest_tls_check(pool, name)
        .await
        .unwrap_or(None);
    // Compute test staleness for targets with test config
    let test_staleness = if let Some(target_config) = config.get_target(name)
        && let Some(tests_config) = &target_config.tests
    {
        let current_version = latest_version.clone();
        let latest_test = db::get_latest_test_run(pool, name).await.unwrap_or(None);
        // Version recorded for this target at the moment the latest test run started.
        let tested_version = if let Some(ref test) = latest_test {
            db::get_version_at_time(pool, name, &test.started_at)
                .await
                .unwrap_or(None)
        } else {
            None
        };
        let staleness =
            compute_test_staleness(
                current_version.as_deref(),
                tested_version.as_deref(),
                latest_test.as_ref().map(|t| t.started_at.as_str()),
                tests_config.staleness_days,
            );
        Some(staleness)
    } else {
        None
    };
    // Compute test duration drift for targets with test config.
    // NOTE(review): 13 presumably equals 10 + 3 to match the (10, 3) window arguments
    // passed to detect_test_duration_drift — confirm against that function.
    let test_duration_drift = if config.get_target(name).and_then(|t| t.tests.as_ref()).is_some() {
        let durations = db::get_test_durations(pool, name, 13)
            .await
            .unwrap_or_default();
        detect_test_duration_drift(&durations, 10, 3, 1.5)
    } else {
        None
    };
    let current_incident = db::get_open_incident(pool, name)
        .await
        .unwrap_or(None);
    let incidents = db::get_recent_incidents(pool, name, 10)
        .await
        .unwrap_or_default();
    let route_checks = db::get_latest_route_checks(pool, name)
        .await
        .unwrap_or_default();
    // Only report route rows whose path is listed in the target's `expected_routes`.
    let expected_routes: HashSet<&str> = config.get_target(name)
        .map(|t| t.expected_routes.iter().map(|s| s.as_str()).collect())
        .unwrap_or_default();
    let route_status: Vec = route_checks
        .into_iter()
        .filter(|r| expected_routes.contains(r.path.as_str()))
        .map(|r| RouteStatusJson {
            path: r.path,
            status_code: r.status_code,
            ok: r.ok,
            checked_at: r.checked_at,
            response_time_ms: r.response_time_ms,
        })
        .collect();
    let dns_checks = db::get_latest_dns_checks(pool, name)
        .await
        .unwrap_or_default();
    // Only report DNS rows whose (name, record type) pair is in the target's `dns` config.
    let expected_dns: HashSet<(String, String)> = config.get_target(name)
        .map(|t| t.dns.iter().map(|d| (d.name.clone(), d.record_type.to_string())).collect())
        .unwrap_or_default();
    let dns_status: Vec = dns_checks
        .into_iter()
        .filter(|r| expected_dns.contains(&(r.name.clone(), r.record_type.clone())))
        .map(|r| DnsStatusJson {
            name: r.name,
            record_type: r.record_type,
            // `expected` / `actual` are stored as JSON text; unparsable text becomes empty.
            expected: serde_json::from_str(&r.expected).unwrap_or_default(),
            actual: serde_json::from_str(&r.actual).unwrap_or_default(),
            matches: r.matches,
            checked_at: r.checked_at,
        })
        .collect();
    let whois = db::get_latest_whois_check(pool, name)
        .await
        .unwrap_or(None);
    TargetStatus {
        label: label.to_string(),
        latest,
        recent: recent_json,
        uptime_24h,
        uptime_7d,
        latency_24h,
        tls,
        test_staleness,
        test_duration_drift,
        current_incident,
        incidents,
        route_status,
        dns_status,
        whois,
    }
}

/// `GET /api/status` — JSON summary for all targets.
///
/// Targets are processed one after another. Names without a config entry are skipped.
async fn status_all(
    AxumState(state): AxumState,
) -> impl IntoResponse {
    let mut targets = HashMap::new();
    for name in state.config.target_names() {
        if let Some(target_config) = state.config.get_target(&name) {
            let status = build_target_status(&state.pool, &name, &target_config.label, &state.config).await;
            targets.insert(name, status);
        }
    }
    Json(StatusResponse { targets })
}

/// `GET /api/status/{target}` — JSON summary for a single target.
///
/// Returns 404 with an `error` message if `target` is not configured.
async fn status_target(
    AxumState(state): AxumState,
    Path(target): Path,
) -> impl IntoResponse {
    let Some(target_config) = state.config.get_target(&target) else {
        return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("unknown target: {target}") }))));
    };
    let status = build_target_status(&state.pool, &target, &target_config.label, &state.config).await;
    Ok(Json(status))
}

// --- Peer endpoints ---

/// `GET /api/peer/info` — Returns this instance's identity info.
///
/// Returns 503 when the peer mesh is disabled. If the instance info fails to serialize,
/// the body is JSON `null`.
async fn peer_info(
    AxumState(state): AxumState,
) -> impl IntoResponse {
    let Some(ref mesh) = state.mesh else {
        return Err((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({ "error": "peer mesh not enabled" }))));
    };
    let mesh_state = mesh.read().await;
    Ok(Json(serde_json::to_value(&mesh_state.instance).unwrap_or_default()))
}

/// `GET /api/peer/status` — This instance's full view: own info + target statuses + peer summaries.
async fn peer_status( AxumState(state): AxumState, ) -> impl IntoResponse { let Some(ref mesh) = state.mesh else { return Err((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({ "error": "peer mesh not enabled" })))); }; // Collect mesh data under lock, then drop lock before DB queries let (instance, peers) = { let mesh_state = mesh.read().await; let instance = mesh_state.instance.clone(); let peers: HashMap = mesh_state.peers.iter().map(|(name, peer)| { (name.clone(), serde_json::json!({ "status": peer.status, "last_seen": peer.last_seen, "latency_ms": peer.latency_ms, })) }).collect(); (instance, peers) }; // Build target statuses (DB queries with no lock held) let mut targets = HashMap::new(); for name in state.config.target_names() { if let Some(target_config) = state.config.get_target(&name) && let Ok(Some(latest)) = db::get_latest_health(&state.pool, &name).await { targets.insert(name, serde_json::json!({ "label": target_config.label, "status": latest.status.to_string(), "response_time_ms": latest.response_time_ms, "checked_at": latest.checked_at, })); } } Ok(Json(serde_json::json!({ "instance": instance, "targets": targets, "peers": peers, }))) } /// `GET /api/mesh` — Aggregated view: self + each peer's cached status. 
async fn mesh_view( AxumState(state): AxumState, ) -> impl IntoResponse { let Some(ref mesh) = state.mesh else { return Err((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({ "error": "peer mesh not enabled" })))); }; // Collect all mesh data under lock, then drop lock before DB queries let (instance, own_peers_json, peer_entries) = { let mesh_state = mesh.read().await; let instance = mesh_state.instance.clone(); let own_peers: HashMap = mesh_state.peers.iter().map(|(name, peer)| { (name.clone(), serde_json::json!({ "status": peer.status, "last_seen": peer.last_seen, "latency_ms": peer.latency_ms, })) }).collect(); let peer_entries: Vec<(String, Option, serde_json::Value)> = mesh_state.peers.iter().map(|(name, peer)| { let fallback = serde_json::json!({ "status": peer.status, "last_seen": peer.last_seen, "error": "no status data cached", }); (name.clone(), peer.status_data.clone(), fallback) }).collect(); (instance, own_peers, peer_entries) }; // Build target statuses (DB queries with no lock held) let mut targets = HashMap::new(); for name in state.config.target_names() { if let Some(target_config) = state.config.get_target(&name) && let Ok(Some(latest)) = db::get_latest_health(&state.pool, &name).await { targets.insert(name, serde_json::json!({ "label": target_config.label, "status": latest.status.to_string(), "response_time_ms": latest.response_time_ms, "checked_at": latest.checked_at, })); } } let self_entry = serde_json::json!({ "instance": instance, "targets": targets, "peers": own_peers_json, }); let mut instances = serde_json::Map::new(); instances.insert(instance.name.clone(), self_entry); for (name, status_data, fallback) in peer_entries { instances.insert(name, status_data.unwrap_or(fallback)); } Ok(Json(serde_json::json!({ "instances": instances, }))) } // --- Trends endpoint --- #[derive(Serialize)] struct TrendResponse { /// Target config name this trend data belongs to. 
target: String, /// Requested time window in hours (from query param, default 24). window_hours: u64, /// Requested bucket width in minutes (from query param, default 60). bucket_minutes: u64, /// Per-bucket latency statistics within the requested window. buckets: Vec, /// Aggregate latency statistics across the entire requested window. overall: Option, /// 7-day baseline latency statistics for drift comparison. baseline: Option, } /// `GET /api/trends/{target}?hours=24&bucket_minutes=60` — latency trend data. async fn trends( AxumState(state): AxumState, Path(target): Path, axum::extract::Query(params): axum::extract::Query, ) -> impl IntoResponse { let Some(_target_config) = state.config.get_target(&target) else { return Err((StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("unknown target: {target}") })))); }; let hours = params.hours.unwrap_or(24); let bucket_minutes = params.bucket_minutes.unwrap_or(60); let cutoff = (chrono::Utc::now() - chrono::Duration::hours(hours as i64)).to_rfc3339(); let times = db::get_response_times(&state.pool, &target, &cutoff) .await .unwrap_or_default(); let operational_times: Vec = times.iter() .filter(|(_, ms)| *ms > 0) .map(|(_, ms)| *ms) .collect(); let overall = LatencyStats::from_times(&operational_times); let operational_data: Vec<(String, i64)> = times.into_iter() .filter(|(_, ms)| *ms > 0) .collect(); let buckets = LatencyStats::bucket_by_time(&operational_data, bucket_minutes); // 7d baseline for reference let baseline_cutoff = (chrono::Utc::now() - chrono::Duration::hours(168)).to_rfc3339(); let baseline_times = db::get_response_times(&state.pool, &target, &baseline_cutoff) .await .unwrap_or_default(); let baseline_operational: Vec = baseline_times.iter() .filter(|(_, ms)| *ms > 0) .map(|(_, ms)| *ms) .collect(); let baseline = LatencyStats::from_times(&baseline_operational); Ok(Json(TrendResponse { target, window_hours: hours, bucket_minutes, buckets, overall, baseline, })) } 
#[derive(serde::Deserialize)] struct TrendQueryParams { /// Time window to query, in hours. Defaults to 24 if omitted. hours: Option, /// Width of each latency bucket, in minutes. Defaults to 60 if omitted. bucket_minutes: Option, } #[cfg(test)] mod tests { use super::*; use axum::body::Body; use axum::http::Request as HttpRequest; use tower::ServiceExt; fn test_config(api_token: Option<&str>) -> Config { let mut config = Config { serve: crate::config::ServeConfig::default(), instance: Default::default(), targets: HashMap::new(), peers: HashMap::new(), alerts: None, }; config.serve.api_token = api_token.map(|s| s.to_string()); config } #[tokio::test] async fn no_token_configured_allows_all_requests() { let pool = crate::db::connect_in_memory().await.unwrap(); let app = router(pool, test_config(None), None); let req = HttpRequest::builder() .uri("/api/status") .body(Body::empty()) .unwrap(); let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); } #[tokio::test] async fn valid_token_allows_request() { let pool = crate::db::connect_in_memory().await.unwrap(); let app = router(pool, test_config(Some("secret123")), None); let req = HttpRequest::builder() .uri("/api/status") .header("authorization", "Bearer secret123") .body(Body::empty()) .unwrap(); let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); } #[tokio::test] async fn wrong_token_returns_401() { let pool = crate::db::connect_in_memory().await.unwrap(); let app = router(pool, test_config(Some("secret123")), None); let req = HttpRequest::builder() .uri("/api/status") .header("authorization", "Bearer wrong-token") .body(Body::empty()) .unwrap(); let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); } #[tokio::test] async fn missing_header_returns_401() { let pool = crate::db::connect_in_memory().await.unwrap(); let app = router(pool, test_config(Some("secret123")), None); let req = HttpRequest::builder() 
.uri("/api/status") .body(Body::empty()) .unwrap(); let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); } #[tokio::test] async fn malformed_header_returns_401() { let pool = crate::db::connect_in_memory().await.unwrap(); let app = router(pool, test_config(Some("secret123")), None); let req = HttpRequest::builder() .uri("/api/status") .header("authorization", "Basic dXNlcjpwYXNz") .body(Body::empty()) .unwrap(); let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); } #[test] fn rate_limiter_allows_within_limit() { let limiter = RateLimiter::new(3, std::time::Duration::from_secs(60)); assert!(limiter.try_acquire()); assert!(limiter.try_acquire()); assert!(limiter.try_acquire()); } #[test] fn rate_limiter_blocks_over_limit() { let limiter = RateLimiter::new(2, std::time::Duration::from_secs(60)); assert!(limiter.try_acquire()); assert!(limiter.try_acquire()); assert!(!limiter.try_acquire()); } #[tokio::test] async fn rate_limiter_resets_after_window() { let limiter = RateLimiter::new(1, std::time::Duration::from_millis(10)); assert!(limiter.try_acquire()); assert!(!limiter.try_acquire()); tokio::time::sleep(std::time::Duration::from_millis(15)).await; assert!(limiter.try_acquire()); } #[test] fn rate_limiter_counter_starts_at_one() { let limiter = RateLimiter::new(1, std::time::Duration::from_millis(0)); // First call resets window (counter stored as 1), returns true assert!(limiter.try_acquire()); // Window has already expired (0ms), so next call also resets assert!(limiter.try_acquire()); } }