//! SQLite persistence — schema, health checks, test runs, and peer data. //! //! Uses a migration versioning system: each schema change is a numbered migration //! stored in [`MIGRATIONS`]. On startup, [`run_migrations`] checks the current //! version and runs any pending migrations. Existing databases (pre-migration) //! are detected by the presence of the `health_checks` table and marked as v1. use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; use std::path::Path; use std::str::FromStr; use tracing::{info, instrument}; use crate::error::Result; use crate::types::{BackupCheckResult, CorsCheckResult, DnsCheckResult, HealthDetails, HealthSnapshot, HealthStatus, TestDetail, TestRun, TestRunId, TestSummary, TlsStatus, WhoisResult}; /// Each migration is a (version, description, SQL) tuple. Versions start at 1. /// The SQL may contain multiple statements separated by semicolons. const MIGRATIONS: &[(i64, &str, &str)] = &[ (1, "initial schema", r#" CREATE TABLE IF NOT EXISTS health_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, status TEXT NOT NULL, checked_at TEXT NOT NULL, response_time_ms INTEGER NOT NULL, details_json TEXT, error TEXT ); CREATE TABLE IF NOT EXISTS test_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, started_at TEXT NOT NULL, finished_at TEXT, duration_secs INTEGER, exit_code INTEGER, passed INTEGER NOT NULL, summary_json TEXT NOT NULL, raw_output TEXT NOT NULL, filter TEXT ); CREATE TABLE IF NOT EXISTS peer_identities ( peer_name TEXT PRIMARY KEY, instance_id TEXT NOT NULL, first_seen TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS peer_heartbeats ( id INTEGER PRIMARY KEY AUTOINCREMENT, peer_name TEXT NOT NULL, status TEXT NOT NULL, latency_ms INTEGER NOT NULL, checked_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_health_checks_target_id ON health_checks(target, id DESC); CREATE INDEX IF NOT EXISTS idx_health_checks_target_checked ON health_checks(target, checked_at); CREATE INDEX IF NOT EXISTS idx_test_runs_target_id ON test_runs(target, id DESC); CREATE INDEX IF NOT EXISTS idx_peer_heartbeats_peer_id ON peer_heartbeats(peer_name, id DESC); "#), (2, "add alerts table", r#" CREATE TABLE IF NOT EXISTS alerts ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, alert_type TEXT NOT NULL, from_status TEXT, to_status TEXT, sent_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_alerts_target_sent ON alerts(target, sent_at); "#), (3, "add tls_checks table", r#" CREATE TABLE IF NOT EXISTS tls_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, host TEXT NOT NULL, valid INTEGER NOT NULL, days_remaining INTEGER NOT NULL, not_before TEXT NOT NULL, not_after TEXT NOT NULL, subject TEXT NOT NULL, issuer TEXT NOT NULL, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_tls_checks_target_id ON tls_checks(target, id DESC); "#), (4, "add incidents table", r#" CREATE TABLE IF NOT EXISTS incidents ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, started_at TEXT NOT NULL, ended_at TEXT, duration_secs INTEGER, from_status TEXT NOT NULL, to_status TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_incidents_target_id ON incidents(target, id DESC); "#), (5, "add route_checks table", r#" CREATE TABLE IF NOT EXISTS route_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, path TEXT NOT NULL, status_code INTEGER NOT NULL, ok INTEGER NOT NULL, response_time_ms INTEGER NOT NULL, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_route_checks_target_path ON route_checks(target, path, id DESC); CREATE INDEX IF NOT EXISTS idx_route_checks_target ON route_checks(target, checked_at DESC); "#), (6, "add dns_checks and whois_checks tables", r#" CREATE TABLE IF NOT EXISTS dns_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, name TEXT NOT NULL, record_type TEXT NOT NULL, expected TEXT NOT NULL, actual TEXT NOT NULL, matches INTEGER NOT NULL, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_dns_checks_target ON dns_checks(target, name, id DESC); CREATE TABLE IF NOT EXISTS whois_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, domain TEXT NOT NULL, registrar TEXT, expiry_date TEXT, days_remaining INTEGER, nameservers TEXT, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_whois_checks_target ON whois_checks(target, id DESC); "#), (7, "add test_details table", r#" CREATE TABLE IF NOT EXISTS test_details ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER NOT NULL REFERENCES test_runs(id) ON DELETE CASCADE, test_name TEXT NOT NULL, passed INTEGER NOT NULL, duration_ms INTEGER ); CREATE INDEX IF NOT EXISTS idx_test_details_run_id ON test_details(run_id); CREATE INDEX IF NOT EXISTS idx_test_details_name ON test_details(test_name, run_id DESC); "#), (8, "add cors_checks table", r#" CREATE TABLE IF NOT EXISTS cors_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, url TEXT NOT NULL, origin TEXT NOT NULL, method TEXT NOT NULL, passes INTEGER NOT NULL, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_cors_target_id ON cors_checks(target, id DESC); "#), (9, "add backup_checks table", r#" CREATE TABLE IF NOT EXISTS backup_checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, target TEXT NOT NULL, database_name TEXT NOT NULL, status TEXT NOT NULL, last_backup_at TEXT, size_bytes INTEGER, age_hours INTEGER, checked_at TEXT NOT NULL, error TEXT ); CREATE INDEX IF NOT EXISTS idx_backup_checks_target ON backup_checks(target, id DESC); "#), ]; #[instrument(skip_all)] pub async fn connect(path: &Path) -> Result { let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))? .create_if_missing(true) .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); let pool = SqlitePoolOptions::new() .max_connections(5) .connect_with(opts) .await?; run_migrations(&pool).await?; Ok(pool) } #[instrument(skip_all)] pub async fn connect_in_memory() -> Result { let opts = SqliteConnectOptions::from_str("sqlite::memory:")?; let pool = SqlitePoolOptions::new() .max_connections(1) .connect_with(opts) .await?; run_migrations(&pool).await?; Ok(pool) } /// Run pending schema migrations. Detects pre-migration databases by checking /// for existing tables and stamps them as version 1 without re-running. #[instrument(skip_all)] pub async fn run_migrations(pool: &SqlitePool) -> Result<()> { // Ensure the schema_version table exists sqlx::query( "CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER NOT NULL, description TEXT NOT NULL, applied_at TEXT NOT NULL )", ) .execute(pool) .await?; let current_version = get_schema_version(pool).await?; // Detect pre-migration databases: if schema_version is empty but tables exist, // this is an existing database that predates the migration system. if current_version == 0 && has_existing_tables(pool).await? { info!("detected pre-migration database, stamping as version 1"); stamp_version(pool, 1, "initial schema (pre-existing)").await?; // Run remaining migrations (2+) if any for &(version, description, sql) in MIGRATIONS { if version > 1 { run_one_migration(pool, version, description, sql).await?; } } return Ok(()); } // Run all migrations newer than current version for &(version, description, sql) in MIGRATIONS { if version > current_version { run_one_migration(pool, version, description, sql).await?; } } Ok(()) } /// Get the current schema version (0 if no migrations have been applied). #[instrument(skip_all)] pub async fn get_schema_version(pool: &SqlitePool) -> Result { let row = sqlx::query_as::<_, (i64,)>( "SELECT COALESCE(MAX(version), 0) FROM schema_version", ) .fetch_one(pool) .await?; Ok(row.0) } /// Check whether the database has existing tables from before the migration system. async fn has_existing_tables(pool: &SqlitePool) -> Result { let row = sqlx::query_as::<_, (i64,)>( "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'health_checks'", ) .fetch_one(pool) .await?; Ok(row.0 > 0) } /// Execute a single migration's SQL and record it in schema_version. /// Wrapped in an explicit transaction so partial failures roll back cleanly. async fn run_one_migration( pool: &SqlitePool, version: i64, description: &str, sql: &str, ) -> Result<()> { info!(version, description, "running migration"); let mut tx = pool.begin().await?; // Execute each statement in the migration SQL for statement in sql.split(';') { let trimmed = statement.trim(); if !trimmed.is_empty() { sqlx::query(trimmed).execute(&mut *tx).await?; } } // Record the version inside the same transaction let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)", ) .bind(version) .bind(description) .bind(&now) .execute(&mut *tx) .await?; tx.commit().await?; Ok(()) } /// Record a version in the schema_version table. async fn stamp_version(pool: &SqlitePool, version: i64, description: &str) -> Result<()> { let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)", ) .bind(version) .bind(description) .bind(&now) .execute(pool) .await?; Ok(()) } // --- Health check queries --- #[instrument(skip_all)] pub async fn insert_health_check( pool: &SqlitePool, snapshot: &HealthSnapshot, ) -> Result { let status = snapshot.status.to_string(); let details_json = snapshot .details .as_ref() .map(|d| serde_json::to_string(d).unwrap_or_default()); let result = sqlx::query( "INSERT INTO health_checks (target, status, checked_at, response_time_ms, details_json, error) VALUES (?, ?, ?, ?, ?, ?)", ) .bind(&snapshot.target) .bind(&status) .bind(&snapshot.checked_at) .bind(snapshot.response_time_ms) .bind(&details_json) .bind(&snapshot.error) .execute(pool) .await?; Ok(result.last_insert_rowid()) } #[instrument(skip_all)] pub async fn get_health_history( pool: &SqlitePool, target: Option<&str>, limit: i64, ) -> Result> { let rows = match target { Some(t) => { sqlx::query_as::<_, HealthCheckRow>( "SELECT id, target, status, checked_at, response_time_ms, details_json, error FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT ?", ) .bind(t) .bind(limit) .fetch_all(pool) .await? } None => { sqlx::query_as::<_, HealthCheckRow>( "SELECT id, target, status, checked_at, response_time_ms, details_json, error FROM health_checks ORDER BY id DESC LIMIT ?", ) .bind(limit) .fetch_all(pool) .await? } }; Ok(rows.into_iter().map(|r| r.into_snapshot()).collect()) } #[instrument(skip_all)] pub async fn get_latest_health( pool: &SqlitePool, target: &str, ) -> Result> { let row = sqlx::query_as::<_, HealthCheckRow>( "SELECT id, target, status, checked_at, response_time_ms, details_json, error FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?; Ok(row.map(|r| r.into_snapshot())) } // --- Test run queries --- #[instrument(skip_all)] pub async fn insert_test_run( pool: &SqlitePool, run: &TestRun, ) -> Result { let summary_json = serde_json::to_string(&run.summary).unwrap_or_default(); let result = sqlx::query( "INSERT INTO test_runs (target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&run.target) .bind(&run.started_at) .bind(&run.finished_at) .bind(run.duration_secs) .bind(run.exit_code) .bind(run.passed) .bind(&summary_json) .bind(&run.raw_output) .bind(&run.filter) .execute(pool) .await?; Ok(TestRunId(result.last_insert_rowid())) } #[instrument(skip_all)] pub async fn get_test_history( pool: &SqlitePool, target: Option<&str>, limit: i64, ) -> Result> { let rows = match target { Some(t) => { sqlx::query_as::<_, TestRunRow>( "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT ?", ) .bind(t) .bind(limit) .fetch_all(pool) .await? } None => { sqlx::query_as::<_, TestRunRow>( "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter FROM test_runs ORDER BY id DESC LIMIT ?", ) .bind(limit) .fetch_all(pool) .await? } }; Ok(rows.into_iter().map(|r| r.into_test_run()).collect()) } #[instrument(skip_all)] pub async fn get_latest_test_run( pool: &SqlitePool, target: &str, ) -> Result> { let row = sqlx::query_as::<_, TestRunRow>( "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?; Ok(row.map(|r| r.into_test_run())) } // --- Test detail queries --- /// Insert per-test results for a given test run. #[instrument(skip_all)] pub async fn insert_test_details( pool: &SqlitePool, run_id: TestRunId, details: &[TestDetail], ) -> Result<()> { for detail in details { sqlx::query( "INSERT INTO test_details (run_id, test_name, passed) VALUES (?, ?, ?)", ) .bind(run_id.0) .bind(&detail.test_name) .bind(detail.passed) .execute(pool) .await?; } Ok(()) } /// Find tests that passed in the previous run but failed in this one (regressions). #[instrument(skip_all)] pub async fn get_test_regressions( pool: &SqlitePool, target: &str, current_run_id: TestRunId, ) -> Result> { // Find the run immediately before this one for the same target let prev_run = sqlx::query_as::<_, (i64,)>( "SELECT id FROM test_runs WHERE target = ? AND id < ? ORDER BY id DESC LIMIT 1", ) .bind(target) .bind(current_run_id.0) .fetch_optional(pool) .await?; let Some((prev_id,)) = prev_run else { return Ok(vec![]); }; // Tests that passed in prev run but failed in current run let rows = sqlx::query_as::<_, (String,)>( "SELECT curr.test_name FROM test_details curr INNER JOIN test_details prev ON prev.test_name = curr.test_name AND prev.run_id = ? WHERE curr.run_id = ? AND curr.passed = 0 AND prev.passed = 1", ) .bind(prev_id) .bind(current_run_id.0) .fetch_all(pool) .await?; Ok(rows.into_iter().map(|r| r.0).collect()) } /// Get test duration history for a target (most recent first). #[instrument(skip_all)] pub async fn get_test_durations( pool: &SqlitePool, target: &str, limit: i64, ) -> Result> { let rows = sqlx::query_as::<_, (String, i64)>( "SELECT started_at, duration_secs FROM test_runs WHERE target = ? AND duration_secs IS NOT NULL ORDER BY id DESC LIMIT ?", ) .bind(target) .bind(limit) .fetch_all(pool) .await?; Ok(rows) } /// Get the version from the health check closest to (but before) a given timestamp. #[instrument(skip_all)] pub async fn get_version_at_time( pool: &SqlitePool, target: &str, before_rfc3339: &str, ) -> Result> { let row = sqlx::query_as::<_, (Option,)>( "SELECT details_json FROM health_checks WHERE target = ? AND checked_at <= ? ORDER BY checked_at DESC LIMIT 1", ) .bind(target) .bind(before_rfc3339) .fetch_optional(pool) .await?; let version = row .and_then(|r| r.0) .and_then(|json_str| serde_json::from_str::(&json_str).ok()) .and_then(|json| json.get("version").and_then(|v| v.as_str()).map(String::from)); Ok(version) } /// Calculate uptime percentage for a target over the given number of hours. /// Returns the percentage of health checks with "operational" status. #[instrument(skip_all)] pub async fn get_uptime_percent( pool: &SqlitePool, target: &str, hours: i64, ) -> Result> { let cutoff = chrono::Utc::now() - chrono::Duration::hours(hours); let cutoff_str = cutoff.to_rfc3339(); let row = sqlx::query_as::<_, (i64, i64)>( "SELECT COUNT(*) as total, SUM(CASE WHEN status = 'operational' THEN 1 ELSE 0 END) as operational FROM health_checks WHERE target = ? AND checked_at >= ?", ) .bind(target) .bind(&cutoff_str) .fetch_one(pool) .await?; if row.0 == 0 { Ok(None) } else { Ok(Some(row.1 as f64 / row.0 as f64 * 100.0)) } } // --- Latency trending queries --- /// Fetch all response times for a target since a given timestamp, ordered ASC. #[instrument(skip_all)] pub async fn get_response_times( pool: &SqlitePool, target: &str, since_rfc3339: &str, ) -> Result> { let rows = sqlx::query_as::<_, (String, i64)>( "SELECT checked_at, response_time_ms FROM health_checks WHERE target = ? AND checked_at >= ? ORDER BY checked_at ASC", ) .bind(target) .bind(since_rfc3339) .fetch_all(pool) .await?; Ok(rows) } /// Fetch the last N response times for **operational** checks only (most recent first). #[instrument(skip_all)] pub async fn get_recent_response_times( pool: &SqlitePool, target: &str, count: i64, ) -> Result> { let rows = sqlx::query_as::<_, (i64,)>( "SELECT response_time_ms FROM health_checks WHERE target = ? AND status = 'operational' ORDER BY id DESC LIMIT ?", ) .bind(target) .bind(count) .fetch_all(pool) .await?; Ok(rows.into_iter().map(|r| r.0).collect()) } // --- Alert queries --- #[derive(Debug, sqlx::FromRow)] pub struct AlertRow { pub id: i64, pub target: String, pub alert_type: String, pub from_status: Option, pub to_status: Option, pub sent_at: String, pub error: Option, } #[instrument(skip_all)] pub async fn insert_alert( pool: &SqlitePool, target: &str, alert_type: &str, from_status: Option<&str>, to_status: Option<&str>, error: Option<&str>, ) -> Result { let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO alerts (target, alert_type, from_status, to_status, sent_at, error) VALUES (?, ?, ?, ?, ?, ?)", ) .bind(target) .bind(alert_type) .bind(from_status) .bind(to_status) .bind(&now) .bind(error) .execute(pool) .await?; Ok(result.last_insert_rowid()) } #[instrument(skip_all)] pub async fn get_latest_alert_for_target( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, AlertRow>( "SELECT id, target, alert_type, from_status, to_status, sent_at, error FROM alerts WHERE target = ? AND alert_type NOT LIKE '%recovery%' ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?) } // --- TLS check queries --- #[derive(Debug, sqlx::FromRow, serde::Serialize)] pub struct TlsCheckRow { pub id: i64, pub target: String, pub host: String, pub valid: bool, pub days_remaining: i64, pub not_before: String, pub not_after: String, pub subject: String, pub issuer: String, pub checked_at: String, pub error: Option, } #[instrument(skip_all)] pub async fn insert_tls_check( pool: &SqlitePool, status: &TlsStatus, ) -> Result { let result = sqlx::query( "INSERT INTO tls_checks (target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&status.target) .bind(&status.host) .bind(status.valid) .bind(status.days_remaining) .bind(&status.not_before) .bind(&status.not_after) .bind(&status.subject) .bind(&status.issuer) .bind(&status.checked_at) .bind(&status.error) .execute(pool) .await?; Ok(result.last_insert_rowid()) } #[instrument(skip_all)] pub async fn get_latest_tls_check( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, TlsCheckRow>( "SELECT id, target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error FROM tls_checks WHERE target = ? ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?) } // --- Incident queries --- #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] pub struct IncidentRow { pub id: i64, pub target: String, pub started_at: String, pub ended_at: Option, pub duration_secs: Option, pub from_status: String, pub to_status: String, } #[instrument(skip_all)] pub async fn insert_incident( pool: &SqlitePool, target: &str, from_status: &str, to_status: &str, ) -> Result { let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO incidents (target, started_at, from_status, to_status) VALUES (?, ?, ?, ?)", ) .bind(target) .bind(&now) .bind(from_status) .bind(to_status) .execute(pool) .await?; Ok(result.last_insert_rowid()) } #[instrument(skip_all)] pub async fn close_open_incidents( pool: &SqlitePool, target: &str, ) -> Result { let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "UPDATE incidents SET ended_at = ?, duration_secs = CAST((julianday(?) - julianday(started_at)) * 86400 AS INTEGER) WHERE target = ? AND ended_at IS NULL", ) .bind(&now) .bind(&now) .bind(target) .execute(pool) .await?; Ok(result.rows_affected()) } #[instrument(skip_all)] pub async fn get_open_incident( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, IncidentRow>( "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status FROM incidents WHERE target = ? AND ended_at IS NULL ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?) } #[instrument(skip_all)] pub async fn get_recent_incidents( pool: &SqlitePool, target: &str, limit: i64, ) -> Result> { Ok(sqlx::query_as::<_, IncidentRow>( "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status FROM incidents WHERE target = ? ORDER BY id DESC LIMIT ?", ) .bind(target) .bind(limit) .fetch_all(pool) .await?) } // --- Route check queries --- #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] pub struct RouteCheckRow { pub id: i64, pub target: String, pub path: String, pub status_code: i64, pub ok: bool, pub response_time_ms: i64, pub checked_at: String, pub error: Option, } #[instrument(skip_all)] pub async fn insert_route_check( pool: &SqlitePool, result: &crate::checks::routes::RouteCheckResult, ) -> Result { let row = sqlx::query( "INSERT INTO route_checks (target, path, status_code, ok, response_time_ms, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&result.target) .bind(&result.path) .bind(result.status_code as i64) .bind(result.ok) .bind(result.response_time_ms) .bind(&result.checked_at) .bind(&result.error) .execute(pool) .await?; Ok(row.last_insert_rowid()) } /// Get the latest route check per path for a target. #[instrument(skip_all)] pub async fn get_latest_route_checks( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, RouteCheckRow>( "SELECT r.id, r.target, r.path, r.status_code, r.ok, r.response_time_ms, r.checked_at, r.error FROM route_checks r INNER JOIN (SELECT path, MAX(id) as max_id FROM route_checks WHERE target = ? GROUP BY path) latest ON r.id = latest.max_id ORDER BY r.path", ) .bind(target) .fetch_all(pool) .await?) } // --- DNS check queries --- #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] pub struct DnsCheckRow { pub id: i64, pub target: String, pub name: String, pub record_type: String, pub expected: String, pub actual: String, pub matches: bool, pub checked_at: String, pub error: Option, } #[instrument(skip_all)] pub async fn insert_dns_check( pool: &SqlitePool, result: &DnsCheckResult, ) -> Result { let expected = serde_json::to_string(&result.expected).unwrap_or_default(); let actual = serde_json::to_string(&result.actual).unwrap_or_default(); let record_type_str = result.record_type.to_string(); let row = sqlx::query( "INSERT INTO dns_checks (target, name, record_type, expected, actual, matches, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&result.target) .bind(&result.name) .bind(&record_type_str) .bind(&expected) .bind(&actual) .bind(result.matches) .bind(&result.checked_at) .bind(&result.error) .execute(pool) .await?; Ok(row.last_insert_rowid()) } /// Get the latest DNS check per (name, record_type) for a target. #[instrument(skip_all)] pub async fn get_latest_dns_checks( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, DnsCheckRow>( "SELECT d.id, d.target, d.name, d.record_type, d.expected, d.actual, d.matches, d.checked_at, d.error FROM dns_checks d INNER JOIN (SELECT name, record_type, MAX(id) as max_id FROM dns_checks WHERE target = ? GROUP BY name, record_type) latest ON d.id = latest.max_id ORDER BY d.name, d.record_type", ) .bind(target) .fetch_all(pool) .await?) } // --- WHOIS check queries --- #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] pub struct WhoisCheckRow { pub id: i64, pub target: String, pub domain: String, pub registrar: Option, pub expiry_date: Option, pub days_remaining: Option, pub nameservers: Option, pub checked_at: String, pub error: Option, } #[instrument(skip_all)] pub async fn insert_whois_check( pool: &SqlitePool, result: &WhoisResult, ) -> Result { let nameservers = serde_json::to_string(&result.nameservers).unwrap_or_default(); let row = sqlx::query( "INSERT INTO whois_checks (target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&result.target) .bind(&result.domain) .bind(&result.registrar) .bind(&result.expiry_date) .bind(result.days_remaining) .bind(&nameservers) .bind(&result.checked_at) .bind(&result.error) .execute(pool) .await?; Ok(row.last_insert_rowid()) } #[instrument(skip_all)] pub async fn get_latest_whois_check( pool: &SqlitePool, target: &str, ) -> Result> { Ok(sqlx::query_as::<_, WhoisCheckRow>( "SELECT id, target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error FROM whois_checks WHERE target = ? ORDER BY id DESC LIMIT 1", ) .bind(target) .fetch_optional(pool) .await?) } // --- CORS check queries --- #[instrument(skip_all)] pub async fn insert_cors_check( pool: &SqlitePool, result: &CorsCheckResult, ) -> Result { let row = sqlx::query( "INSERT INTO cors_checks (target, url, origin, method, passes, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&result.target) .bind(&result.url) .bind(&result.origin) .bind(&result.method) .bind(result.passes) .bind(&result.checked_at) .bind(&result.error) .execute(pool) .await?; Ok(row.last_insert_rowid()) } /// Get the latest CORS check per URL for a target. #[instrument(skip_all)] pub async fn get_latest_cors_checks( pool: &SqlitePool, target: &str, ) -> Result> { let rows = sqlx::query_as::<_, (String, String, String, String, bool, String, Option)>( "SELECT c.target, c.url, c.origin, c.method, c.passes, c.checked_at, c.error FROM cors_checks c INNER JOIN (SELECT url, MAX(id) as max_id FROM cors_checks WHERE target = ? GROUP BY url) latest ON c.id = latest.max_id ORDER BY c.url", ) .bind(target) .fetch_all(pool) .await?; Ok(rows .into_iter() .map(|(target, url, origin, method, passes, checked_at, error)| CorsCheckResult { target, url, origin, method, passes, checked_at, error, }) .collect()) } // --- Backup check queries --- #[instrument(skip_all)] pub async fn insert_backup_check( pool: &SqlitePool, result: &BackupCheckResult, ) -> Result { let row = sqlx::query( "INSERT INTO backup_checks (target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&result.target) .bind(&result.database_name) .bind(&result.status) .bind(&result.last_backup_at) .bind(result.size_bytes) .bind(result.age_hours) .bind(&result.checked_at) .bind(&result.error) .execute(pool) .await?; Ok(row.last_insert_rowid()) } /// Get the latest backup check for a specific target and database. #[instrument(skip_all)] pub async fn get_latest_backup_check( pool: &SqlitePool, target: &str, database_name: &str, ) -> Result> { Ok(sqlx::query_as::<_, BackupCheckRow>( "SELECT id, target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error FROM backup_checks WHERE target = ? AND database_name = ? ORDER BY id DESC LIMIT 1", ) .bind(target) .bind(database_name) .fetch_optional(pool) .await?) } #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] pub struct BackupCheckRow { pub id: i64, pub target: String, pub database_name: String, pub status: String, pub last_backup_at: Option, pub size_bytes: Option, pub age_hours: Option, pub checked_at: String, pub error: Option, } // --- Stale data cleanup --- /// Delete route_checks for paths no longer in the config. #[instrument(skip_all)] pub async fn prune_stale_routes( pool: &SqlitePool, target: &str, expected_routes: &[String], ) -> Result { if expected_routes.is_empty() { // No configured routes — delete all route checks for this target let result = sqlx::query("DELETE FROM route_checks WHERE target = ?") .bind(target) .execute(pool) .await?; return Ok(result.rows_affected()); } // Build placeholders for the IN clause let placeholders: Vec<&str> = expected_routes.iter().map(|_| "?").collect(); let sql = format!( "DELETE FROM route_checks WHERE target = ? AND path NOT IN ({})", placeholders.join(", ") ); let mut query = sqlx::query(&sql).bind(target); for route in expected_routes { query = query.bind(route); } let result = query.execute(pool).await?; Ok(result.rows_affected()) } /// Delete dns_checks for (name, record_type) pairs no longer in the config. #[instrument(skip_all)] pub async fn prune_stale_dns( pool: &SqlitePool, target: &str, expected_dns: &[(String, String)], ) -> Result { if expected_dns.is_empty() { // No configured DNS records — delete all DNS checks for this target let result = sqlx::query("DELETE FROM dns_checks WHERE target = ?") .bind(target) .execute(pool) .await?; return Ok(result.rows_affected()); } // Build a compound condition: keep rows matching any configured (name, record_type) pair let conditions: Vec = expected_dns .iter() .map(|_| "(name = ? AND record_type = ?)".to_string()) .collect(); let sql = format!( "DELETE FROM dns_checks WHERE target = ? AND NOT ({})", conditions.join(" OR ") ); let mut query = sqlx::query(&sql).bind(target); for (name, record_type) in expected_dns { query = query.bind(name).bind(record_type); } let result = query.execute(pool).await?; Ok(result.rows_affected()) } // --- Maintenance --- /// Prune result with counts for each table. pub struct PruneResult { pub health: u64, pub tests: u64, pub test_details: u64, pub heartbeats: u64, pub alerts: u64, pub tls: u64, pub incidents: u64, pub routes: u64, pub dns: u64, pub whois: u64, pub backups: u64, } /// Delete records older than `days` from all tables. /// Only closed incidents (with a non-NULL `ended_at`) are pruned. #[instrument(skip_all)] pub async fn prune_old_records( pool: &SqlitePool, days: i64, ) -> Result { // Guard: days <= 0 would set cutoff to now (or the future), deleting // everything. Treat this as a no-op instead. if days <= 0 { return Ok(PruneResult { health: 0, tests: 0, test_details: 0, heartbeats: 0, alerts: 0, tls: 0, incidents: 0, routes: 0, dns: 0, whois: 0, backups: 0 }); } let cutoff = chrono::Utc::now() - chrono::Duration::days(days); let cutoff_str = cutoff.to_rfc3339(); let health_result = sqlx::query("DELETE FROM health_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let test_result = sqlx::query("DELETE FROM test_runs WHERE started_at < ?") .bind(&cutoff_str) .execute(pool) .await?; // Prune orphaned test_details (run was deleted above). let test_details_result = sqlx::query( "DELETE FROM test_details WHERE run_id NOT IN (SELECT id FROM test_runs)", ) .execute(pool) .await?; let peer_hb_result = sqlx::query("DELETE FROM peer_heartbeats WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let alerts_result = sqlx::query("DELETE FROM alerts WHERE sent_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let tls_result = sqlx::query("DELETE FROM tls_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let incidents_result = sqlx::query("DELETE FROM incidents WHERE ended_at IS NOT NULL AND ended_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let routes_result = sqlx::query("DELETE FROM route_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let dns_result = sqlx::query("DELETE FROM dns_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let whois_result = sqlx::query("DELETE FROM whois_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; let backups_result = sqlx::query("DELETE FROM backup_checks WHERE checked_at < ?") .bind(&cutoff_str) .execute(pool) .await?; Ok(PruneResult { health: health_result.rows_affected(), tests: test_result.rows_affected(), test_details: test_details_result.rows_affected(), heartbeats: peer_hb_result.rows_affected(), alerts: alerts_result.rows_affected(), tls: tls_result.rows_affected(), incidents: incidents_result.rows_affected(), routes: routes_result.rows_affected(), dns: dns_result.rows_affected(), whois: whois_result.rows_affected(), backups: backups_result.rows_affected(), }) } // --- Peer identity queries --- /// Store a peer's identity (first-seen UUID). INSERT OR IGNORE — first wins. #[instrument(skip_all)] pub async fn store_peer_identity( pool: &SqlitePool, peer_name: &str, instance_id: &str, ) -> Result<()> { let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT OR IGNORE INTO peer_identities (peer_name, instance_id, first_seen) VALUES (?, ?, ?)", ) .bind(peer_name) .bind(instance_id) .bind(&now) .execute(pool) .await?; Ok(()) } /// Update a peer's identity (UUID changed, e.g. after reinstall). #[instrument(skip_all)] pub async fn update_peer_identity( pool: &SqlitePool, peer_name: &str, instance_id: &str, ) -> Result<()> { let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "UPDATE peer_identities SET instance_id = ?, first_seen = ? WHERE peer_name = ?", ) .bind(instance_id) .bind(&now) .bind(peer_name) .execute(pool) .await?; Ok(()) } /// Get a peer's first-seen instance ID. #[instrument(skip_all)] pub async fn get_peer_identity( pool: &SqlitePool, peer_name: &str, ) -> Result> { let row = sqlx::query_as::<_, (String,)>( "SELECT instance_id FROM peer_identities WHERE peer_name = ?", ) .bind(peer_name) .fetch_optional(pool) .await?; Ok(row.map(|r| r.0)) } // --- Peer heartbeat queries --- #[instrument(skip_all)] pub async fn insert_peer_heartbeat( pool: &SqlitePool, peer_name: &str, status: &str, latency_ms: i64, ) -> Result { let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO peer_heartbeats (peer_name, status, latency_ms, checked_at) VALUES (?, ?, ?, ?)", ) .bind(peer_name) .bind(status) .bind(latency_ms) .bind(&now) .execute(pool) .await?; Ok(result.last_insert_rowid()) } #[instrument(skip_all)] pub async fn get_peer_heartbeat_history( pool: &SqlitePool, peer_name: &str, limit: i64, ) -> Result> { Ok(sqlx::query_as::<_, PeerHeartbeatRow>( "SELECT id, peer_name, status, latency_ms, checked_at FROM peer_heartbeats WHERE peer_name = ? ORDER BY id DESC LIMIT ?", ) .bind(peer_name) .bind(limit) .fetch_all(pool) .await?) } #[derive(Debug, sqlx::FromRow, serde::Serialize)] pub struct PeerHeartbeatRow { pub id: i64, pub peer_name: String, pub status: String, pub latency_ms: i64, pub checked_at: String, } // --- Internal row types --- #[derive(sqlx::FromRow)] struct HealthCheckRow { id: i64, target: String, status: String, checked_at: String, response_time_ms: i64, details_json: Option, error: Option, } impl HealthCheckRow { fn into_snapshot(self) -> HealthSnapshot { let status = self .status .parse::() .unwrap_or(HealthStatus::Error); let details = self .details_json .as_deref() .and_then(|s| serde_json::from_str::(s).ok()); HealthSnapshot { id: Some(self.id), target: self.target, status, checked_at: self.checked_at, response_time_ms: self.response_time_ms, details, error: self.error, } } } #[derive(sqlx::FromRow)] struct TestRunRow { id: i64, target: String, started_at: String, finished_at: Option, duration_secs: Option, exit_code: Option, passed: bool, summary_json: String, raw_output: String, filter: Option, } impl TestRunRow { fn into_test_run(self) -> TestRun { let summary = serde_json::from_str::(&self.summary_json).unwrap_or(TestSummary { steps: vec![], total_passed: None, total_failed: None, details: vec![], }); TestRun { id: Some(self.id), target: self.target, started_at: self.started_at, finished_at: self.finished_at, duration_secs: self.duration_secs, exit_code: self.exit_code, passed: self.passed, summary, raw_output: self.raw_output, filter: self.filter, } } }