Skip to main content

max / pom

40.5 KB · 1377 lines History Blame Raw
1 //! SQLite persistence — schema, health checks, test runs, and peer data.
2 //!
3 //! Uses a migration versioning system: each schema change is a numbered migration
4 //! stored in [`MIGRATIONS`]. On startup, [`run_migrations`] checks the current
5 //! version and runs any pending migrations. Existing databases (pre-migration)
6 //! are detected by the presence of the `health_checks` table and marked as v1.
7
8 use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
9 use std::path::Path;
10 use std::str::FromStr;
11 use tracing::{info, instrument};
12
13 use crate::error::Result;
14 use crate::types::{CorsCheckResult, DnsCheckResult, HealthDetails, HealthSnapshot, HealthStatus, TestDetail, TestRun, TestSummary, TlsStatus, WhoisResult};
15
/// Each migration is a (version, description, SQL) tuple. Versions start at 1.
/// The SQL may contain multiple statements separated by semicolons.
///
/// `run_one_migration` splits each SQL string on `;` and executes the pieces
/// one at a time, so no statement here may contain a literal semicolon.
/// Entries are listed in ascending version order.
const MIGRATIONS: &[(i64, &str, &str)] = &[
    // v1: health checks, test runs, peer identities/heartbeats and their indexes.
    (1, "initial schema", r#"
        CREATE TABLE IF NOT EXISTS health_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            status TEXT NOT NULL,
            checked_at TEXT NOT NULL,
            response_time_ms INTEGER NOT NULL,
            details_json TEXT,
            error TEXT
        );
        CREATE TABLE IF NOT EXISTS test_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            duration_secs INTEGER,
            exit_code INTEGER,
            passed INTEGER NOT NULL,
            summary_json TEXT NOT NULL,
            raw_output TEXT NOT NULL,
            filter TEXT
        );
        CREATE TABLE IF NOT EXISTS peer_identities (
            peer_name TEXT PRIMARY KEY,
            instance_id TEXT NOT NULL,
            first_seen TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS peer_heartbeats (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            peer_name TEXT NOT NULL,
            status TEXT NOT NULL,
            latency_ms INTEGER NOT NULL,
            checked_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_health_checks_target_id ON health_checks(target, id DESC);
        CREATE INDEX IF NOT EXISTS idx_health_checks_target_checked ON health_checks(target, checked_at);
        CREATE INDEX IF NOT EXISTS idx_test_runs_target_id ON test_runs(target, id DESC);
        CREATE INDEX IF NOT EXISTS idx_peer_heartbeats_peer_id ON peer_heartbeats(peer_name, id DESC);
    "#),
    // v2: log of alerts that were sent.
    (2, "add alerts table", r#"
        CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            alert_type TEXT NOT NULL,
            from_status TEXT,
            to_status TEXT,
            sent_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_alerts_target_sent ON alerts(target, sent_at);
    "#),
    // v3: TLS certificate check results.
    (3, "add tls_checks table", r#"
        CREATE TABLE tls_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            host TEXT NOT NULL,
            valid INTEGER NOT NULL,
            days_remaining INTEGER NOT NULL,
            not_before TEXT NOT NULL,
            not_after TEXT NOT NULL,
            subject TEXT NOT NULL,
            issuer TEXT NOT NULL,
            checked_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX idx_tls_checks_target_id ON tls_checks(target, id DESC);
    "#),
    // v4: incidents; `ended_at` / `duration_secs` stay NULL while an incident is open.
    (4, "add incidents table", r#"
        CREATE TABLE incidents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            started_at TEXT NOT NULL,
            ended_at TEXT,
            duration_secs INTEGER,
            from_status TEXT NOT NULL,
            to_status TEXT NOT NULL
        );
        CREATE INDEX idx_incidents_target_id ON incidents(target, id DESC);
    "#),
    // v5: per-path route check results.
    (5, "add route_checks table", r#"
        CREATE TABLE route_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            path TEXT NOT NULL,
            status_code INTEGER NOT NULL,
            ok INTEGER NOT NULL,
            response_time_ms INTEGER NOT NULL,
            checked_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX idx_route_checks_target_path ON route_checks(target, path, id DESC);
        CREATE INDEX idx_route_checks_target ON route_checks(target, checked_at DESC);
    "#),
    // v6: DNS record checks and WHOIS lookups.
    (6, "add dns_checks and whois_checks tables", r#"
        CREATE TABLE dns_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            name TEXT NOT NULL,
            record_type TEXT NOT NULL,
            expected TEXT NOT NULL,
            actual TEXT NOT NULL,
            matches INTEGER NOT NULL,
            checked_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX idx_dns_checks_target ON dns_checks(target, name, id DESC);

        CREATE TABLE whois_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            domain TEXT NOT NULL,
            registrar TEXT,
            expiry_date TEXT,
            days_remaining INTEGER,
            nameservers TEXT,
            checked_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX idx_whois_checks_target ON whois_checks(target, id DESC);
    "#),
    // v7: per-test results, tied to `test_runs` with ON DELETE CASCADE.
    (7, "add test_details table", r#"
        CREATE TABLE IF NOT EXISTS test_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL REFERENCES test_runs(id) ON DELETE CASCADE,
            test_name TEXT NOT NULL,
            passed INTEGER NOT NULL,
            duration_ms INTEGER
        );
        CREATE INDEX IF NOT EXISTS idx_test_details_run_id ON test_details(run_id);
        CREATE INDEX IF NOT EXISTS idx_test_details_name ON test_details(test_name, run_id DESC);
    "#),
    // v8: CORS check results.
    (8, "add cors_checks table", r#"
        CREATE TABLE IF NOT EXISTS cors_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            target TEXT NOT NULL,
            url TEXT NOT NULL,
            origin TEXT NOT NULL,
            method TEXT NOT NULL,
            passes INTEGER NOT NULL,
            checked_at TEXT NOT NULL,
            error TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_cors_target_id ON cors_checks(target, id DESC);
    "#),
];
164
165 #[instrument(skip_all)]
166 pub async fn connect(path: &Path) -> Result<SqlitePool> {
167 let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))?
168 .create_if_missing(true)
169 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
170
171 let pool = SqlitePoolOptions::new()
172 .max_connections(5)
173 .connect_with(opts)
174 .await?;
175
176 run_migrations(&pool).await?;
177 Ok(pool)
178 }
179
180 #[instrument(skip_all)]
181 pub async fn connect_in_memory() -> Result<SqlitePool> {
182 let opts = SqliteConnectOptions::from_str("sqlite::memory:")?;
183 let pool = SqlitePoolOptions::new()
184 .max_connections(1)
185 .connect_with(opts)
186 .await?;
187
188 run_migrations(&pool).await?;
189 Ok(pool)
190 }
191
192 /// Run pending schema migrations. Detects pre-migration databases by checking
193 /// for existing tables and stamps them as version 1 without re-running.
194 #[instrument(skip_all)]
195 pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
196 // Ensure the schema_version table exists
197 sqlx::query(
198 "CREATE TABLE IF NOT EXISTS schema_version (
199 version INTEGER NOT NULL,
200 description TEXT NOT NULL,
201 applied_at TEXT NOT NULL
202 )",
203 )
204 .execute(pool)
205 .await?;
206
207 let current_version = get_schema_version(pool).await?;
208
209 // Detect pre-migration databases: if schema_version is empty but tables exist,
210 // this is an existing database that predates the migration system.
211 if current_version == 0 && has_existing_tables(pool).await? {
212 info!("detected pre-migration database, stamping as version 1");
213 stamp_version(pool, 1, "initial schema (pre-existing)").await?;
214 // Run remaining migrations (2+) if any
215 for &(version, description, sql) in MIGRATIONS {
216 if version > 1 {
217 run_one_migration(pool, version, description, sql).await?;
218 }
219 }
220 return Ok(());
221 }
222
223 // Run all migrations newer than current version
224 for &(version, description, sql) in MIGRATIONS {
225 if version > current_version {
226 run_one_migration(pool, version, description, sql).await?;
227 }
228 }
229
230 Ok(())
231 }
232
233 /// Get the current schema version (0 if no migrations have been applied).
234 #[instrument(skip_all)]
235 pub async fn get_schema_version(pool: &SqlitePool) -> Result<i64> {
236 let row = sqlx::query_as::<_, (i64,)>(
237 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
238 )
239 .fetch_one(pool)
240 .await?;
241 Ok(row.0)
242 }
243
244 /// Check whether the database has existing tables from before the migration system.
245 async fn has_existing_tables(pool: &SqlitePool) -> Result<bool> {
246 let row = sqlx::query_as::<_, (i64,)>(
247 "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'health_checks'",
248 )
249 .fetch_one(pool)
250 .await?;
251 Ok(row.0 > 0)
252 }
253
254 /// Execute a single migration's SQL and record it in schema_version.
255 /// Wrapped in an explicit transaction so partial failures roll back cleanly.
256 async fn run_one_migration(
257 pool: &SqlitePool,
258 version: i64,
259 description: &str,
260 sql: &str,
261 ) -> Result<()> {
262 info!(version, description, "running migration");
263
264 let mut tx = pool.begin().await?;
265
266 // Execute each statement in the migration SQL
267 for statement in sql.split(';') {
268 let trimmed = statement.trim();
269 if !trimmed.is_empty() {
270 sqlx::query(trimmed).execute(&mut *tx).await?;
271 }
272 }
273
274 // Record the version inside the same transaction
275 let now = chrono::Utc::now().to_rfc3339();
276 sqlx::query(
277 "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)",
278 )
279 .bind(version)
280 .bind(description)
281 .bind(&now)
282 .execute(&mut *tx)
283 .await?;
284
285 tx.commit().await?;
286 Ok(())
287 }
288
289 /// Record a version in the schema_version table.
290 async fn stamp_version(pool: &SqlitePool, version: i64, description: &str) -> Result<()> {
291 let now = chrono::Utc::now().to_rfc3339();
292 sqlx::query(
293 "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)",
294 )
295 .bind(version)
296 .bind(description)
297 .bind(&now)
298 .execute(pool)
299 .await?;
300 Ok(())
301 }
302
303 // --- Health check queries ---
304
305 #[instrument(skip_all)]
306 pub async fn insert_health_check(
307 pool: &SqlitePool,
308 snapshot: &HealthSnapshot,
309 ) -> Result<i64> {
310 let status = snapshot.status.to_string();
311 let details_json = snapshot
312 .details
313 .as_ref()
314 .map(|d| serde_json::to_string(d).unwrap_or_default());
315
316 let result = sqlx::query(
317 "INSERT INTO health_checks (target, status, checked_at, response_time_ms, details_json, error)
318 VALUES (?, ?, ?, ?, ?, ?)",
319 )
320 .bind(&snapshot.target)
321 .bind(&status)
322 .bind(&snapshot.checked_at)
323 .bind(snapshot.response_time_ms)
324 .bind(&details_json)
325 .bind(&snapshot.error)
326 .execute(pool)
327 .await?;
328
329 Ok(result.last_insert_rowid())
330 }
331
332 #[instrument(skip_all)]
333 pub async fn get_health_history(
334 pool: &SqlitePool,
335 target: Option<&str>,
336 limit: i64,
337 ) -> Result<Vec<HealthSnapshot>> {
338 let rows = match target {
339 Some(t) => {
340 sqlx::query_as::<_, HealthCheckRow>(
341 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
342 FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT ?",
343 )
344 .bind(t)
345 .bind(limit)
346 .fetch_all(pool)
347 .await?
348 }
349 None => {
350 sqlx::query_as::<_, HealthCheckRow>(
351 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
352 FROM health_checks ORDER BY id DESC LIMIT ?",
353 )
354 .bind(limit)
355 .fetch_all(pool)
356 .await?
357 }
358 };
359
360 Ok(rows.into_iter().map(|r| r.into_snapshot()).collect())
361 }
362
363 #[instrument(skip_all)]
364 pub async fn get_latest_health(
365 pool: &SqlitePool,
366 target: &str,
367 ) -> Result<Option<HealthSnapshot>> {
368 let row = sqlx::query_as::<_, HealthCheckRow>(
369 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
370 FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
371 )
372 .bind(target)
373 .fetch_optional(pool)
374 .await?;
375
376 Ok(row.map(|r| r.into_snapshot()))
377 }
378
379 // --- Test run queries ---
380
381 #[instrument(skip_all)]
382 pub async fn insert_test_run(
383 pool: &SqlitePool,
384 run: &TestRun,
385 ) -> Result<i64> {
386 let summary_json = serde_json::to_string(&run.summary).unwrap_or_default();
387
388 let result = sqlx::query(
389 "INSERT INTO test_runs (target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter)
390 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
391 )
392 .bind(&run.target)
393 .bind(&run.started_at)
394 .bind(&run.finished_at)
395 .bind(run.duration_secs)
396 .bind(run.exit_code)
397 .bind(run.passed)
398 .bind(&summary_json)
399 .bind(&run.raw_output)
400 .bind(&run.filter)
401 .execute(pool)
402 .await?;
403
404 Ok(result.last_insert_rowid())
405 }
406
407 #[instrument(skip_all)]
408 pub async fn get_test_history(
409 pool: &SqlitePool,
410 target: Option<&str>,
411 limit: i64,
412 ) -> Result<Vec<TestRun>> {
413 let rows = match target {
414 Some(t) => {
415 sqlx::query_as::<_, TestRunRow>(
416 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
417 FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT ?",
418 )
419 .bind(t)
420 .bind(limit)
421 .fetch_all(pool)
422 .await?
423 }
424 None => {
425 sqlx::query_as::<_, TestRunRow>(
426 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
427 FROM test_runs ORDER BY id DESC LIMIT ?",
428 )
429 .bind(limit)
430 .fetch_all(pool)
431 .await?
432 }
433 };
434
435 Ok(rows.into_iter().map(|r| r.into_test_run()).collect())
436 }
437
438 #[instrument(skip_all)]
439 pub async fn get_latest_test_run(
440 pool: &SqlitePool,
441 target: &str,
442 ) -> Result<Option<TestRun>> {
443 let row = sqlx::query_as::<_, TestRunRow>(
444 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
445 FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT 1",
446 )
447 .bind(target)
448 .fetch_optional(pool)
449 .await?;
450
451 Ok(row.map(|r| r.into_test_run()))
452 }
453
454 // --- Test detail queries ---
455
456 /// Insert per-test results for a given test run.
457 #[instrument(skip_all)]
458 pub async fn insert_test_details(
459 pool: &SqlitePool,
460 run_id: i64,
461 details: &[TestDetail],
462 ) -> Result<()> {
463 for detail in details {
464 sqlx::query(
465 "INSERT INTO test_details (run_id, test_name, passed) VALUES (?, ?, ?)",
466 )
467 .bind(run_id)
468 .bind(&detail.test_name)
469 .bind(detail.passed)
470 .execute(pool)
471 .await?;
472 }
473 Ok(())
474 }
475
476 /// Find tests that passed in the previous run but failed in this one (regressions).
477 #[instrument(skip_all)]
478 pub async fn get_test_regressions(
479 pool: &SqlitePool,
480 target: &str,
481 current_run_id: i64,
482 ) -> Result<Vec<String>> {
483 // Find the run immediately before this one for the same target
484 let prev_run = sqlx::query_as::<_, (i64,)>(
485 "SELECT id FROM test_runs
486 WHERE target = ? AND id < ?
487 ORDER BY id DESC LIMIT 1",
488 )
489 .bind(target)
490 .bind(current_run_id)
491 .fetch_optional(pool)
492 .await?;
493
494 let Some((prev_id,)) = prev_run else {
495 return Ok(vec![]);
496 };
497
498 // Tests that passed in prev run but failed in current run
499 let rows = sqlx::query_as::<_, (String,)>(
500 "SELECT curr.test_name FROM test_details curr
501 INNER JOIN test_details prev ON prev.test_name = curr.test_name AND prev.run_id = ?
502 WHERE curr.run_id = ? AND curr.passed = 0 AND prev.passed = 1",
503 )
504 .bind(prev_id)
505 .bind(current_run_id)
506 .fetch_all(pool)
507 .await?;
508
509 Ok(rows.into_iter().map(|r| r.0).collect())
510 }
511
512 /// Get test duration history for a target (most recent first).
513 #[instrument(skip_all)]
514 pub async fn get_test_durations(
515 pool: &SqlitePool,
516 target: &str,
517 limit: i64,
518 ) -> Result<Vec<(String, i64)>> {
519 let rows = sqlx::query_as::<_, (String, i64)>(
520 "SELECT started_at, duration_secs FROM test_runs
521 WHERE target = ? AND duration_secs IS NOT NULL
522 ORDER BY id DESC LIMIT ?",
523 )
524 .bind(target)
525 .bind(limit)
526 .fetch_all(pool)
527 .await?;
528 Ok(rows)
529 }
530
531 /// Get the version from the health check closest to (but before) a given timestamp.
532 #[instrument(skip_all)]
533 pub async fn get_version_at_time(
534 pool: &SqlitePool,
535 target: &str,
536 before_rfc3339: &str,
537 ) -> Result<Option<String>> {
538 let row = sqlx::query_as::<_, (Option<String>,)>(
539 "SELECT details_json FROM health_checks
540 WHERE target = ? AND checked_at <= ?
541 ORDER BY checked_at DESC LIMIT 1",
542 )
543 .bind(target)
544 .bind(before_rfc3339)
545 .fetch_optional(pool)
546 .await?;
547
548 let version = row
549 .and_then(|r| r.0)
550 .and_then(|json_str| serde_json::from_str::<serde_json::Value>(&json_str).ok())
551 .and_then(|json| json.get("version").and_then(|v| v.as_str()).map(String::from));
552
553 Ok(version)
554 }
555
556 /// Calculate uptime percentage for a target over the given number of hours.
557 /// Returns the percentage of health checks with "operational" status.
558 #[instrument(skip_all)]
559 pub async fn get_uptime_percent(
560 pool: &SqlitePool,
561 target: &str,
562 hours: i64,
563 ) -> Result<Option<f64>> {
564 let cutoff = chrono::Utc::now() - chrono::Duration::hours(hours);
565 let cutoff_str = cutoff.to_rfc3339();
566
567 let row = sqlx::query_as::<_, (i64, i64)>(
568 "SELECT
569 COUNT(*) as total,
570 SUM(CASE WHEN status = 'operational' THEN 1 ELSE 0 END) as operational
571 FROM health_checks
572 WHERE target = ? AND checked_at >= ?",
573 )
574 .bind(target)
575 .bind(&cutoff_str)
576 .fetch_one(pool)
577 .await?;
578
579 if row.0 == 0 {
580 Ok(None)
581 } else {
582 Ok(Some(row.1 as f64 / row.0 as f64 * 100.0))
583 }
584 }
585
586 // --- Latency trending queries ---
587
588 /// Fetch all response times for a target since a given timestamp, ordered ASC.
589 #[instrument(skip_all)]
590 pub async fn get_response_times(
591 pool: &SqlitePool,
592 target: &str,
593 since_rfc3339: &str,
594 ) -> Result<Vec<(String, i64)>> {
595 let rows = sqlx::query_as::<_, (String, i64)>(
596 "SELECT checked_at, response_time_ms FROM health_checks
597 WHERE target = ? AND checked_at >= ?
598 ORDER BY checked_at ASC",
599 )
600 .bind(target)
601 .bind(since_rfc3339)
602 .fetch_all(pool)
603 .await?;
604 Ok(rows)
605 }
606
607 /// Fetch the last N response times for **operational** checks only (most recent first).
608 #[instrument(skip_all)]
609 pub async fn get_recent_response_times(
610 pool: &SqlitePool,
611 target: &str,
612 count: i64,
613 ) -> Result<Vec<i64>> {
614 let rows = sqlx::query_as::<_, (i64,)>(
615 "SELECT response_time_ms FROM health_checks
616 WHERE target = ? AND status = 'operational'
617 ORDER BY id DESC LIMIT ?",
618 )
619 .bind(target)
620 .bind(count)
621 .fetch_all(pool)
622 .await?;
623 Ok(rows.into_iter().map(|r| r.0).collect())
624 }
625
626 // --- Alert queries ---
627
/// One row of the `alerts` table, as returned by
/// [`get_latest_alert_for_target`].
#[derive(Debug, sqlx::FromRow)]
pub struct AlertRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the alert concerns.
    pub target: String,
    /// Free-form alert kind; types containing `recovery` are excluded by
    /// [`get_latest_alert_for_target`].
    pub alert_type: String,
    /// Status before the transition, when one was recorded.
    pub from_status: Option<String>,
    /// Status after the transition, when one was recorded.
    pub to_status: Option<String>,
    /// RFC 3339 timestamp; [`insert_alert`] sets it to the current UTC time.
    pub sent_at: String,
    /// Optional error text stored with the alert.
    pub error: Option<String>,
}
638
639 #[instrument(skip_all)]
640 pub async fn insert_alert(
641 pool: &SqlitePool,
642 target: &str,
643 alert_type: &str,
644 from_status: Option<&str>,
645 to_status: Option<&str>,
646 error: Option<&str>,
647 ) -> Result<i64> {
648 let now = chrono::Utc::now().to_rfc3339();
649 let result = sqlx::query(
650 "INSERT INTO alerts (target, alert_type, from_status, to_status, sent_at, error)
651 VALUES (?, ?, ?, ?, ?, ?)",
652 )
653 .bind(target)
654 .bind(alert_type)
655 .bind(from_status)
656 .bind(to_status)
657 .bind(&now)
658 .bind(error)
659 .execute(pool)
660 .await?;
661 Ok(result.last_insert_rowid())
662 }
663
664 #[instrument(skip_all)]
665 pub async fn get_latest_alert_for_target(
666 pool: &SqlitePool,
667 target: &str,
668 ) -> Result<Option<AlertRow>> {
669 Ok(sqlx::query_as::<_, AlertRow>(
670 "SELECT id, target, alert_type, from_status, to_status, sent_at, error
671 FROM alerts WHERE target = ? AND alert_type NOT LIKE '%recovery%'
672 ORDER BY id DESC LIMIT 1",
673 )
674 .bind(target)
675 .fetch_optional(pool)
676 .await?)
677 }
678
679 // --- TLS check queries ---
680
/// One row of the `tls_checks` table, as returned by
/// [`get_latest_tls_check`].
#[derive(Debug, sqlx::FromRow, serde::Serialize)]
pub struct TlsCheckRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the check belongs to.
    pub target: String,
    /// Host recorded for the check.
    pub host: String,
    /// Validity flag (stored as an INTEGER column).
    pub valid: bool,
    /// Days remaining as recorded at check time.
    pub days_remaining: i64,
    /// Value of the `not_before` column, stored as text.
    pub not_before: String,
    /// Value of the `not_after` column, stored as text.
    pub not_after: String,
    /// Certificate subject as recorded.
    pub subject: String,
    /// Certificate issuer as recorded.
    pub issuer: String,
    /// Timestamp of the check, stored as text.
    pub checked_at: String,
    /// Optional error text stored with the check.
    pub error: Option<String>,
}
695
696 #[instrument(skip_all)]
697 pub async fn insert_tls_check(
698 pool: &SqlitePool,
699 status: &TlsStatus,
700 ) -> Result<i64> {
701 let result = sqlx::query(
702 "INSERT INTO tls_checks (target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error)
703 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
704 )
705 .bind(&status.target)
706 .bind(&status.host)
707 .bind(status.valid)
708 .bind(status.days_remaining)
709 .bind(&status.not_before)
710 .bind(&status.not_after)
711 .bind(&status.subject)
712 .bind(&status.issuer)
713 .bind(&status.checked_at)
714 .bind(&status.error)
715 .execute(pool)
716 .await?;
717
718 Ok(result.last_insert_rowid())
719 }
720
721 #[instrument(skip_all)]
722 pub async fn get_latest_tls_check(
723 pool: &SqlitePool,
724 target: &str,
725 ) -> Result<Option<TlsCheckRow>> {
726 Ok(sqlx::query_as::<_, TlsCheckRow>(
727 "SELECT id, target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error
728 FROM tls_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
729 )
730 .bind(target)
731 .fetch_optional(pool)
732 .await?)
733 }
734
735 // --- Incident queries ---
736
/// One row of the `incidents` table.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct IncidentRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the incident concerns.
    pub target: String,
    /// RFC 3339 timestamp; [`insert_incident`] sets it to the current UTC time.
    pub started_at: String,
    /// `None` while the incident is open; set by [`close_open_incidents`].
    pub ended_at: Option<String>,
    /// Whole seconds between start and end; set by [`close_open_incidents`].
    pub duration_secs: Option<i64>,
    /// Status before the transition that opened the incident.
    pub from_status: String,
    /// Status after the transition that opened the incident.
    pub to_status: String,
}
747
748 #[instrument(skip_all)]
749 pub async fn insert_incident(
750 pool: &SqlitePool,
751 target: &str,
752 from_status: &str,
753 to_status: &str,
754 ) -> Result<i64> {
755 let now = chrono::Utc::now().to_rfc3339();
756 let result = sqlx::query(
757 "INSERT INTO incidents (target, started_at, from_status, to_status)
758 VALUES (?, ?, ?, ?)",
759 )
760 .bind(target)
761 .bind(&now)
762 .bind(from_status)
763 .bind(to_status)
764 .execute(pool)
765 .await?;
766 Ok(result.last_insert_rowid())
767 }
768
769 #[instrument(skip_all)]
770 pub async fn close_open_incidents(
771 pool: &SqlitePool,
772 target: &str,
773 ) -> Result<u64> {
774 let now = chrono::Utc::now().to_rfc3339();
775 let result = sqlx::query(
776 "UPDATE incidents SET ended_at = ?, duration_secs = CAST((julianday(?) - julianday(started_at)) * 86400 AS INTEGER)
777 WHERE target = ? AND ended_at IS NULL",
778 )
779 .bind(&now)
780 .bind(&now)
781 .bind(target)
782 .execute(pool)
783 .await?;
784 Ok(result.rows_affected())
785 }
786
787 #[instrument(skip_all)]
788 pub async fn get_open_incident(
789 pool: &SqlitePool,
790 target: &str,
791 ) -> Result<Option<IncidentRow>> {
792 Ok(sqlx::query_as::<_, IncidentRow>(
793 "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status
794 FROM incidents WHERE target = ? AND ended_at IS NULL ORDER BY id DESC LIMIT 1",
795 )
796 .bind(target)
797 .fetch_optional(pool)
798 .await?)
799 }
800
801 #[instrument(skip_all)]
802 pub async fn get_recent_incidents(
803 pool: &SqlitePool,
804 target: &str,
805 limit: i64,
806 ) -> Result<Vec<IncidentRow>> {
807 Ok(sqlx::query_as::<_, IncidentRow>(
808 "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status
809 FROM incidents WHERE target = ? ORDER BY id DESC LIMIT ?",
810 )
811 .bind(target)
812 .bind(limit)
813 .fetch_all(pool)
814 .await?)
815 }
816
817 // --- Route check queries ---
818
/// One row of the `route_checks` table, as returned by
/// [`get_latest_route_checks`].
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct RouteCheckRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the route belongs to.
    pub target: String,
    /// Route path that was checked.
    pub path: String,
    /// Status code recorded for the check.
    pub status_code: i64,
    /// Pass/fail flag (stored as an INTEGER column).
    pub ok: bool,
    /// Response time in milliseconds.
    pub response_time_ms: i64,
    /// Timestamp of the check, stored as text.
    pub checked_at: String,
    /// Optional error text stored with the check.
    pub error: Option<String>,
}
830
831 #[instrument(skip_all)]
832 pub async fn insert_route_check(
833 pool: &SqlitePool,
834 result: &crate::checks::routes::RouteCheckResult,
835 ) -> Result<i64> {
836 let row = sqlx::query(
837 "INSERT INTO route_checks (target, path, status_code, ok, response_time_ms, checked_at, error)
838 VALUES (?, ?, ?, ?, ?, ?, ?)",
839 )
840 .bind(&result.target)
841 .bind(&result.path)
842 .bind(result.status_code as i64)
843 .bind(result.ok)
844 .bind(result.response_time_ms)
845 .bind(&result.checked_at)
846 .bind(&result.error)
847 .execute(pool)
848 .await?;
849 Ok(row.last_insert_rowid())
850 }
851
852 /// Get the latest route check per path for a target.
853 #[instrument(skip_all)]
854 pub async fn get_latest_route_checks(
855 pool: &SqlitePool,
856 target: &str,
857 ) -> Result<Vec<RouteCheckRow>> {
858 Ok(sqlx::query_as::<_, RouteCheckRow>(
859 "SELECT r.id, r.target, r.path, r.status_code, r.ok, r.response_time_ms, r.checked_at, r.error
860 FROM route_checks r
861 INNER JOIN (SELECT path, MAX(id) as max_id FROM route_checks WHERE target = ? GROUP BY path) latest
862 ON r.id = latest.max_id
863 ORDER BY r.path",
864 )
865 .bind(target)
866 .fetch_all(pool)
867 .await?)
868 }
869
870 // --- DNS check queries ---
871
/// One row of the `dns_checks` table, as returned by
/// [`get_latest_dns_checks`].
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct DnsCheckRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the record belongs to.
    pub target: String,
    /// DNS name that was checked.
    pub name: String,
    /// Record type, stored in its string form.
    pub record_type: String,
    /// Expected value; [`insert_dns_check`] stores it JSON-serialised.
    pub expected: String,
    /// Observed value; [`insert_dns_check`] stores it JSON-serialised.
    pub actual: String,
    /// Match flag (stored as an INTEGER column).
    pub matches: bool,
    /// Timestamp of the check, stored as text.
    pub checked_at: String,
    /// Optional error text stored with the check.
    pub error: Option<String>,
}
884
885 #[instrument(skip_all)]
886 pub async fn insert_dns_check(
887 pool: &SqlitePool,
888 result: &DnsCheckResult,
889 ) -> Result<i64> {
890 let expected = serde_json::to_string(&result.expected).unwrap_or_default();
891 let actual = serde_json::to_string(&result.actual).unwrap_or_default();
892 let record_type_str = result.record_type.to_string();
893
894 let row = sqlx::query(
895 "INSERT INTO dns_checks (target, name, record_type, expected, actual, matches, checked_at, error)
896 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
897 )
898 .bind(&result.target)
899 .bind(&result.name)
900 .bind(&record_type_str)
901 .bind(&expected)
902 .bind(&actual)
903 .bind(result.matches)
904 .bind(&result.checked_at)
905 .bind(&result.error)
906 .execute(pool)
907 .await?;
908 Ok(row.last_insert_rowid())
909 }
910
911 /// Get the latest DNS check per (name, record_type) for a target.
912 #[instrument(skip_all)]
913 pub async fn get_latest_dns_checks(
914 pool: &SqlitePool,
915 target: &str,
916 ) -> Result<Vec<DnsCheckRow>> {
917 Ok(sqlx::query_as::<_, DnsCheckRow>(
918 "SELECT d.id, d.target, d.name, d.record_type, d.expected, d.actual, d.matches, d.checked_at, d.error
919 FROM dns_checks d
920 INNER JOIN (SELECT name, record_type, MAX(id) as max_id FROM dns_checks WHERE target = ? GROUP BY name, record_type) latest
921 ON d.id = latest.max_id
922 ORDER BY d.name, d.record_type",
923 )
924 .bind(target)
925 .fetch_all(pool)
926 .await?)
927 }
928
929 // --- WHOIS check queries ---
930
/// One row of the `whois_checks` table, as returned by
/// [`get_latest_whois_check`].
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct WhoisCheckRow {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Name of the target the domain belongs to.
    pub target: String,
    /// Domain that was looked up.
    pub domain: String,
    /// Registrar, when one was recorded.
    pub registrar: Option<String>,
    /// Expiry date as text, when one was recorded.
    pub expiry_date: Option<String>,
    /// Days remaining, when recorded.
    pub days_remaining: Option<i64>,
    /// Nameservers; [`insert_whois_check`] stores them JSON-serialised.
    pub nameservers: Option<String>,
    /// Timestamp of the check, stored as text.
    pub checked_at: String,
    /// Optional error text stored with the check.
    pub error: Option<String>,
}
943
944 #[instrument(skip_all)]
945 pub async fn insert_whois_check(
946 pool: &SqlitePool,
947 result: &WhoisResult,
948 ) -> Result<i64> {
949 let nameservers = serde_json::to_string(&result.nameservers).unwrap_or_default();
950
951 let row = sqlx::query(
952 "INSERT INTO whois_checks (target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error)
953 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
954 )
955 .bind(&result.target)
956 .bind(&result.domain)
957 .bind(&result.registrar)
958 .bind(&result.expiry_date)
959 .bind(result.days_remaining)
960 .bind(&nameservers)
961 .bind(&result.checked_at)
962 .bind(&result.error)
963 .execute(pool)
964 .await?;
965 Ok(row.last_insert_rowid())
966 }
967
968 #[instrument(skip_all)]
969 pub async fn get_latest_whois_check(
970 pool: &SqlitePool,
971 target: &str,
972 ) -> Result<Option<WhoisCheckRow>> {
973 Ok(sqlx::query_as::<_, WhoisCheckRow>(
974 "SELECT id, target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error
975 FROM whois_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
976 )
977 .bind(target)
978 .fetch_optional(pool)
979 .await?)
980 }
981
982 // --- CORS check queries ---
983
984 #[instrument(skip_all)]
985 pub async fn insert_cors_check(
986 pool: &SqlitePool,
987 result: &CorsCheckResult,
988 ) -> Result<i64> {
989 let row = sqlx::query(
990 "INSERT INTO cors_checks (target, url, origin, method, passes, checked_at, error)
991 VALUES (?, ?, ?, ?, ?, ?, ?)",
992 )
993 .bind(&result.target)
994 .bind(&result.url)
995 .bind(&result.origin)
996 .bind(&result.method)
997 .bind(result.passes)
998 .bind(&result.checked_at)
999 .bind(&result.error)
1000 .execute(pool)
1001 .await?;
1002 Ok(row.last_insert_rowid())
1003 }
1004
1005 /// Get the latest CORS check per URL for a target.
1006 #[instrument(skip_all)]
1007 pub async fn get_latest_cors_checks(
1008 pool: &SqlitePool,
1009 target: &str,
1010 ) -> Result<Vec<CorsCheckResult>> {
1011 let rows = sqlx::query_as::<_, (String, String, String, String, bool, String, Option<String>)>(
1012 "SELECT c.target, c.url, c.origin, c.method, c.passes, c.checked_at, c.error
1013 FROM cors_checks c
1014 INNER JOIN (SELECT url, MAX(id) as max_id FROM cors_checks WHERE target = ? GROUP BY url) latest
1015 ON c.id = latest.max_id
1016 ORDER BY c.url",
1017 )
1018 .bind(target)
1019 .fetch_all(pool)
1020 .await?;
1021
1022 Ok(rows
1023 .into_iter()
1024 .map(|(target, url, origin, method, passes, checked_at, error)| CorsCheckResult {
1025 target,
1026 url,
1027 origin,
1028 method,
1029 passes,
1030 checked_at,
1031 error,
1032 })
1033 .collect())
1034 }
1035
1036 // --- Stale data cleanup ---
1037
1038 /// Delete route_checks for paths no longer in the config.
1039 #[instrument(skip_all)]
1040 pub async fn prune_stale_routes(
1041 pool: &SqlitePool,
1042 target: &str,
1043 expected_routes: &[String],
1044 ) -> Result<u64> {
1045 if expected_routes.is_empty() {
1046 // No configured routes — delete all route checks for this target
1047 let result = sqlx::query("DELETE FROM route_checks WHERE target = ?")
1048 .bind(target)
1049 .execute(pool)
1050 .await?;
1051 return Ok(result.rows_affected());
1052 }
1053
1054 // Build placeholders for the IN clause
1055 let placeholders: Vec<&str> = expected_routes.iter().map(|_| "?").collect();
1056 let sql = format!(
1057 "DELETE FROM route_checks WHERE target = ? AND path NOT IN ({})",
1058 placeholders.join(", ")
1059 );
1060 let mut query = sqlx::query(&sql).bind(target);
1061 for route in expected_routes {
1062 query = query.bind(route);
1063 }
1064 let result = query.execute(pool).await?;
1065 Ok(result.rows_affected())
1066 }
1067
1068 /// Delete dns_checks for (name, record_type) pairs no longer in the config.
1069 #[instrument(skip_all)]
1070 pub async fn prune_stale_dns(
1071 pool: &SqlitePool,
1072 target: &str,
1073 expected_dns: &[(String, String)],
1074 ) -> Result<u64> {
1075 if expected_dns.is_empty() {
1076 // No configured DNS records — delete all DNS checks for this target
1077 let result = sqlx::query("DELETE FROM dns_checks WHERE target = ?")
1078 .bind(target)
1079 .execute(pool)
1080 .await?;
1081 return Ok(result.rows_affected());
1082 }
1083
1084 // Build a compound condition: keep rows matching any configured (name, record_type) pair
1085 let conditions: Vec<String> = expected_dns
1086 .iter()
1087 .map(|_| "(name = ? AND record_type = ?)".to_string())
1088 .collect();
1089 let sql = format!(
1090 "DELETE FROM dns_checks WHERE target = ? AND NOT ({})",
1091 conditions.join(" OR ")
1092 );
1093 let mut query = sqlx::query(&sql).bind(target);
1094 for (name, record_type) in expected_dns {
1095 query = query.bind(name).bind(record_type);
1096 }
1097 let result = query.execute(pool).await?;
1098 Ok(result.rows_affected())
1099 }
1100
1101 // --- Maintenance ---
1102
/// Per-table row counts removed by one [`prune_old_records`] pass.
///
/// `Default` yields the all-zero result (nothing pruned); `Debug`, `Clone`,
/// `PartialEq` and `Eq` let callers log, copy and compare results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PruneResult {
    /// Rows deleted from `health_checks`.
    pub health: u64,
    /// Rows deleted from `test_runs`.
    pub tests: u64,
    /// Rows deleted from `test_details` whose run no longer exists.
    pub test_details: u64,
    /// Rows deleted from `peer_heartbeats`.
    pub heartbeats: u64,
    /// Rows deleted from `alerts`.
    pub alerts: u64,
    /// Rows deleted from `tls_checks`.
    pub tls: u64,
    /// Closed rows deleted from `incidents`.
    pub incidents: u64,
    /// Rows deleted from `route_checks`.
    pub routes: u64,
    /// Rows deleted from `dns_checks`.
    pub dns: u64,
    /// Rows deleted from `whois_checks`.
    pub whois: u64,
}
1116
1117 /// Delete records older than `days` from all tables.
1118 /// Only closed incidents (with a non-NULL `ended_at`) are pruned.
1119 #[instrument(skip_all)]
1120 pub async fn prune_old_records(
1121 pool: &SqlitePool,
1122 days: i64,
1123 ) -> Result<PruneResult> {
1124 // Guard: days <= 0 would set cutoff to now (or the future), deleting
1125 // everything. Treat this as a no-op instead.
1126 if days <= 0 {
1127 return Ok(PruneResult { health: 0, tests: 0, test_details: 0, heartbeats: 0, alerts: 0, tls: 0, incidents: 0, routes: 0, dns: 0, whois: 0 });
1128 }
1129
1130 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
1131 let cutoff_str = cutoff.to_rfc3339();
1132
1133 let health_result = sqlx::query("DELETE FROM health_checks WHERE checked_at < ?")
1134 .bind(&cutoff_str)
1135 .execute(pool)
1136 .await?;
1137
1138 let test_result = sqlx::query("DELETE FROM test_runs WHERE started_at < ?")
1139 .bind(&cutoff_str)
1140 .execute(pool)
1141 .await?;
1142
1143 // Prune orphaned test_details (run was deleted above).
1144 let test_details_result = sqlx::query(
1145 "DELETE FROM test_details WHERE run_id NOT IN (SELECT id FROM test_runs)",
1146 )
1147 .execute(pool)
1148 .await?;
1149
1150 let peer_hb_result = sqlx::query("DELETE FROM peer_heartbeats WHERE checked_at < ?")
1151 .bind(&cutoff_str)
1152 .execute(pool)
1153 .await?;
1154
1155 let alerts_result = sqlx::query("DELETE FROM alerts WHERE sent_at < ?")
1156 .bind(&cutoff_str)
1157 .execute(pool)
1158 .await?;
1159
1160 let tls_result = sqlx::query("DELETE FROM tls_checks WHERE checked_at < ?")
1161 .bind(&cutoff_str)
1162 .execute(pool)
1163 .await?;
1164
1165 let incidents_result = sqlx::query("DELETE FROM incidents WHERE ended_at IS NOT NULL AND ended_at < ?")
1166 .bind(&cutoff_str)
1167 .execute(pool)
1168 .await?;
1169
1170 let routes_result = sqlx::query("DELETE FROM route_checks WHERE checked_at < ?")
1171 .bind(&cutoff_str)
1172 .execute(pool)
1173 .await?;
1174
1175 let dns_result = sqlx::query("DELETE FROM dns_checks WHERE checked_at < ?")
1176 .bind(&cutoff_str)
1177 .execute(pool)
1178 .await?;
1179
1180 let whois_result = sqlx::query("DELETE FROM whois_checks WHERE checked_at < ?")
1181 .bind(&cutoff_str)
1182 .execute(pool)
1183 .await?;
1184
1185 Ok(PruneResult {
1186 health: health_result.rows_affected(),
1187 tests: test_result.rows_affected(),
1188 test_details: test_details_result.rows_affected(),
1189 heartbeats: peer_hb_result.rows_affected(),
1190 alerts: alerts_result.rows_affected(),
1191 tls: tls_result.rows_affected(),
1192 incidents: incidents_result.rows_affected(),
1193 routes: routes_result.rows_affected(),
1194 dns: dns_result.rows_affected(),
1195 whois: whois_result.rows_affected(),
1196 })
1197 }
1198
1199 // --- Peer identity queries ---
1200
1201 /// Store a peer's identity (first-seen UUID). INSERT OR IGNORE — first wins.
1202 #[instrument(skip_all)]
1203 pub async fn store_peer_identity(
1204 pool: &SqlitePool,
1205 peer_name: &str,
1206 instance_id: &str,
1207 ) -> Result<()> {
1208 let now = chrono::Utc::now().to_rfc3339();
1209 sqlx::query(
1210 "INSERT OR IGNORE INTO peer_identities (peer_name, instance_id, first_seen)
1211 VALUES (?, ?, ?)",
1212 )
1213 .bind(peer_name)
1214 .bind(instance_id)
1215 .bind(&now)
1216 .execute(pool)
1217 .await?;
1218 Ok(())
1219 }
1220
1221 /// Update a peer's identity (UUID changed, e.g. after reinstall).
1222 #[instrument(skip_all)]
1223 pub async fn update_peer_identity(
1224 pool: &SqlitePool,
1225 peer_name: &str,
1226 instance_id: &str,
1227 ) -> Result<()> {
1228 let now = chrono::Utc::now().to_rfc3339();
1229 sqlx::query(
1230 "UPDATE peer_identities SET instance_id = ?, first_seen = ?
1231 WHERE peer_name = ?",
1232 )
1233 .bind(instance_id)
1234 .bind(&now)
1235 .bind(peer_name)
1236 .execute(pool)
1237 .await?;
1238 Ok(())
1239 }
1240
1241 /// Get a peer's first-seen instance ID.
1242 #[instrument(skip_all)]
1243 pub async fn get_peer_identity(
1244 pool: &SqlitePool,
1245 peer_name: &str,
1246 ) -> Result<Option<String>> {
1247 let row = sqlx::query_as::<_, (String,)>(
1248 "SELECT instance_id FROM peer_identities WHERE peer_name = ?",
1249 )
1250 .bind(peer_name)
1251 .fetch_optional(pool)
1252 .await?;
1253 Ok(row.map(|r| r.0))
1254 }
1255
1256 // --- Peer heartbeat queries ---
1257
1258 #[instrument(skip_all)]
1259 pub async fn insert_peer_heartbeat(
1260 pool: &SqlitePool,
1261 peer_name: &str,
1262 status: &str,
1263 latency_ms: i64,
1264 ) -> Result<i64> {
1265 let now = chrono::Utc::now().to_rfc3339();
1266 let result = sqlx::query(
1267 "INSERT INTO peer_heartbeats (peer_name, status, latency_ms, checked_at)
1268 VALUES (?, ?, ?, ?)",
1269 )
1270 .bind(peer_name)
1271 .bind(status)
1272 .bind(latency_ms)
1273 .bind(&now)
1274 .execute(pool)
1275 .await?;
1276 Ok(result.last_insert_rowid())
1277 }
1278
1279 #[instrument(skip_all)]
1280 pub async fn get_peer_heartbeat_history(
1281 pool: &SqlitePool,
1282 peer_name: &str,
1283 limit: i64,
1284 ) -> Result<Vec<PeerHeartbeatRow>> {
1285 Ok(sqlx::query_as::<_, PeerHeartbeatRow>(
1286 "SELECT id, peer_name, status, latency_ms, checked_at
1287 FROM peer_heartbeats WHERE peer_name = ? ORDER BY id DESC LIMIT ?",
1288 )
1289 .bind(peer_name)
1290 .bind(limit)
1291 .fetch_all(pool)
1292 .await?)
1293 }
1294
/// One row of the `peer_heartbeats` table, as returned by
/// [`get_peer_heartbeat_history`]. Decoded by column name via `sqlx::FromRow`
/// and serializable with serde.
#[derive(Debug, sqlx::FromRow, serde::Serialize)]
pub struct PeerHeartbeatRow {
    /// Autoincrement primary key of the heartbeat row.
    pub id: i64,
    /// Name of the peer the heartbeat belongs to.
    pub peer_name: String,
    /// Status text stored with the heartbeat.
    pub status: String,
    /// Latency value stored with the heartbeat, in milliseconds per the column name.
    pub latency_ms: i64,
    /// Timestamp text stored at insert time (RFC 3339 when written by
    /// `insert_peer_heartbeat`).
    pub checked_at: String,
}
1303
1304 // --- Internal row types ---
1305
1306 #[derive(sqlx::FromRow)]
1307 struct HealthCheckRow {
1308 id: i64,
1309 target: String,
1310 status: String,
1311 checked_at: String,
1312 response_time_ms: i64,
1313 details_json: Option<String>,
1314 error: Option<String>,
1315 }
1316
1317 impl HealthCheckRow {
1318 fn into_snapshot(self) -> HealthSnapshot {
1319 let status = self
1320 .status
1321 .parse::<HealthStatus>()
1322 .unwrap_or(HealthStatus::Error);
1323 let details = self
1324 .details_json
1325 .as_deref()
1326 .and_then(|s| serde_json::from_str::<HealthDetails>(s).ok());
1327
1328 HealthSnapshot {
1329 id: Some(self.id),
1330 target: self.target,
1331 status,
1332 checked_at: self.checked_at,
1333 response_time_ms: self.response_time_ms,
1334 details,
1335 error: self.error,
1336 }
1337 }
1338 }
1339
1340 #[derive(sqlx::FromRow)]
1341 struct TestRunRow {
1342 id: i64,
1343 target: String,
1344 started_at: String,
1345 finished_at: Option<String>,
1346 duration_secs: Option<i64>,
1347 exit_code: Option<i32>,
1348 passed: bool,
1349 summary_json: String,
1350 raw_output: String,
1351 filter: Option<String>,
1352 }
1353
1354 impl TestRunRow {
1355 fn into_test_run(self) -> TestRun {
1356 let summary = serde_json::from_str::<TestSummary>(&self.summary_json).unwrap_or(TestSummary {
1357 steps: vec![],
1358 total_passed: None,
1359 total_failed: None,
1360 details: vec![],
1361 });
1362
1363 TestRun {
1364 id: Some(self.id),
1365 target: self.target,
1366 started_at: self.started_at,
1367 finished_at: self.finished_at,
1368 duration_secs: self.duration_secs,
1369 exit_code: self.exit_code,
1370 passed: self.passed,
1371 summary,
1372 raw_output: self.raw_output,
1373 filter: self.filter,
1374 }
1375 }
1376 }
1377