Skip to main content

max / makenotwork

43.1 KB · 1452 lines History Blame Raw
1 //! SQLite persistence — schema, health checks, test runs, and peer data.
2 //!
3 //! Uses a migration versioning system: each schema change is a numbered migration
4 //! stored in [`MIGRATIONS`]. On startup, [`run_migrations`] checks the current
5 //! version and runs any pending migrations. Existing databases (pre-migration)
6 //! are detected by the presence of the `health_checks` table and marked as v1.
7
8 use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
9 use std::path::Path;
10 use std::str::FromStr;
11 use tracing::{info, instrument};
12
13 use crate::error::Result;
14 use crate::types::{BackupCheckResult, CorsCheckResult, DnsCheckResult, HealthDetails, HealthSnapshot, HealthStatus, TestDetail, TestRun, TestRunId, TestSummary, TlsStatus, WhoisResult};
15
16 /// Each migration is a (version, description, SQL) tuple. Versions start at 1.
17 /// The SQL may contain multiple statements separated by semicolons.
18 const MIGRATIONS: &[(i64, &str, &str)] = &[
19 (1, "initial schema", r#"
20 CREATE TABLE IF NOT EXISTS health_checks (
21 id INTEGER PRIMARY KEY AUTOINCREMENT,
22 target TEXT NOT NULL,
23 status TEXT NOT NULL,
24 checked_at TEXT NOT NULL,
25 response_time_ms INTEGER NOT NULL,
26 details_json TEXT,
27 error TEXT
28 );
29 CREATE TABLE IF NOT EXISTS test_runs (
30 id INTEGER PRIMARY KEY AUTOINCREMENT,
31 target TEXT NOT NULL,
32 started_at TEXT NOT NULL,
33 finished_at TEXT,
34 duration_secs INTEGER,
35 exit_code INTEGER,
36 passed INTEGER NOT NULL,
37 summary_json TEXT NOT NULL,
38 raw_output TEXT NOT NULL,
39 filter TEXT
40 );
41 CREATE TABLE IF NOT EXISTS peer_identities (
42 peer_name TEXT PRIMARY KEY,
43 instance_id TEXT NOT NULL,
44 first_seen TEXT NOT NULL
45 );
46 CREATE TABLE IF NOT EXISTS peer_heartbeats (
47 id INTEGER PRIMARY KEY AUTOINCREMENT,
48 peer_name TEXT NOT NULL,
49 status TEXT NOT NULL,
50 latency_ms INTEGER NOT NULL,
51 checked_at TEXT NOT NULL
52 );
53 CREATE INDEX IF NOT EXISTS idx_health_checks_target_id ON health_checks(target, id DESC);
54 CREATE INDEX IF NOT EXISTS idx_health_checks_target_checked ON health_checks(target, checked_at);
55 CREATE INDEX IF NOT EXISTS idx_test_runs_target_id ON test_runs(target, id DESC);
56 CREATE INDEX IF NOT EXISTS idx_peer_heartbeats_peer_id ON peer_heartbeats(peer_name, id DESC);
57 "#),
58 (2, "add alerts table", r#"
59 CREATE TABLE IF NOT EXISTS alerts (
60 id INTEGER PRIMARY KEY AUTOINCREMENT,
61 target TEXT NOT NULL,
62 alert_type TEXT NOT NULL,
63 from_status TEXT,
64 to_status TEXT,
65 sent_at TEXT NOT NULL,
66 error TEXT
67 );
68 CREATE INDEX IF NOT EXISTS idx_alerts_target_sent ON alerts(target, sent_at);
69 "#),
70 (3, "add tls_checks table", r#"
71 CREATE TABLE IF NOT EXISTS tls_checks (
72 id INTEGER PRIMARY KEY AUTOINCREMENT,
73 target TEXT NOT NULL,
74 host TEXT NOT NULL,
75 valid INTEGER NOT NULL,
76 days_remaining INTEGER NOT NULL,
77 not_before TEXT NOT NULL,
78 not_after TEXT NOT NULL,
79 subject TEXT NOT NULL,
80 issuer TEXT NOT NULL,
81 checked_at TEXT NOT NULL,
82 error TEXT
83 );
84 CREATE INDEX IF NOT EXISTS idx_tls_checks_target_id ON tls_checks(target, id DESC);
85 "#),
86 (4, "add incidents table", r#"
87 CREATE TABLE IF NOT EXISTS incidents (
88 id INTEGER PRIMARY KEY AUTOINCREMENT,
89 target TEXT NOT NULL,
90 started_at TEXT NOT NULL,
91 ended_at TEXT,
92 duration_secs INTEGER,
93 from_status TEXT NOT NULL,
94 to_status TEXT NOT NULL
95 );
96 CREATE INDEX IF NOT EXISTS idx_incidents_target_id ON incidents(target, id DESC);
97 "#),
98 (5, "add route_checks table", r#"
99 CREATE TABLE IF NOT EXISTS route_checks (
100 id INTEGER PRIMARY KEY AUTOINCREMENT,
101 target TEXT NOT NULL,
102 path TEXT NOT NULL,
103 status_code INTEGER NOT NULL,
104 ok INTEGER NOT NULL,
105 response_time_ms INTEGER NOT NULL,
106 checked_at TEXT NOT NULL,
107 error TEXT
108 );
109 CREATE INDEX IF NOT EXISTS idx_route_checks_target_path ON route_checks(target, path, id DESC);
110 CREATE INDEX IF NOT EXISTS idx_route_checks_target ON route_checks(target, checked_at DESC);
111 "#),
112 (6, "add dns_checks and whois_checks tables", r#"
113 CREATE TABLE IF NOT EXISTS dns_checks (
114 id INTEGER PRIMARY KEY AUTOINCREMENT,
115 target TEXT NOT NULL,
116 name TEXT NOT NULL,
117 record_type TEXT NOT NULL,
118 expected TEXT NOT NULL,
119 actual TEXT NOT NULL,
120 matches INTEGER NOT NULL,
121 checked_at TEXT NOT NULL,
122 error TEXT
123 );
124 CREATE INDEX IF NOT EXISTS idx_dns_checks_target ON dns_checks(target, name, id DESC);
125
126 CREATE TABLE IF NOT EXISTS whois_checks (
127 id INTEGER PRIMARY KEY AUTOINCREMENT,
128 target TEXT NOT NULL,
129 domain TEXT NOT NULL,
130 registrar TEXT,
131 expiry_date TEXT,
132 days_remaining INTEGER,
133 nameservers TEXT,
134 checked_at TEXT NOT NULL,
135 error TEXT
136 );
137 CREATE INDEX IF NOT EXISTS idx_whois_checks_target ON whois_checks(target, id DESC);
138 "#),
139 (7, "add test_details table", r#"
140 CREATE TABLE IF NOT EXISTS test_details (
141 id INTEGER PRIMARY KEY AUTOINCREMENT,
142 run_id INTEGER NOT NULL REFERENCES test_runs(id) ON DELETE CASCADE,
143 test_name TEXT NOT NULL,
144 passed INTEGER NOT NULL,
145 duration_ms INTEGER
146 );
147 CREATE INDEX IF NOT EXISTS idx_test_details_run_id ON test_details(run_id);
148 CREATE INDEX IF NOT EXISTS idx_test_details_name ON test_details(test_name, run_id DESC);
149 "#),
150 (8, "add cors_checks table", r#"
151 CREATE TABLE IF NOT EXISTS cors_checks (
152 id INTEGER PRIMARY KEY AUTOINCREMENT,
153 target TEXT NOT NULL,
154 url TEXT NOT NULL,
155 origin TEXT NOT NULL,
156 method TEXT NOT NULL,
157 passes INTEGER NOT NULL,
158 checked_at TEXT NOT NULL,
159 error TEXT
160 );
161 CREATE INDEX IF NOT EXISTS idx_cors_target_id ON cors_checks(target, id DESC);
162 "#),
163 (9, "add backup_checks table", r#"
164 CREATE TABLE IF NOT EXISTS backup_checks (
165 id INTEGER PRIMARY KEY AUTOINCREMENT,
166 target TEXT NOT NULL,
167 database_name TEXT NOT NULL,
168 status TEXT NOT NULL,
169 last_backup_at TEXT,
170 size_bytes INTEGER,
171 age_hours INTEGER,
172 checked_at TEXT NOT NULL,
173 error TEXT
174 );
175 CREATE INDEX IF NOT EXISTS idx_backup_checks_target ON backup_checks(target, id DESC);
176 "#),
177 ];
178
179 #[instrument(skip_all)]
180 pub async fn connect(path: &Path) -> Result<SqlitePool> {
181 let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))?
182 .create_if_missing(true)
183 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
184
185 let pool = SqlitePoolOptions::new()
186 .max_connections(5)
187 .connect_with(opts)
188 .await?;
189
190 run_migrations(&pool).await?;
191 Ok(pool)
192 }
193
194 #[instrument(skip_all)]
195 pub async fn connect_in_memory() -> Result<SqlitePool> {
196 let opts = SqliteConnectOptions::from_str("sqlite::memory:")?;
197 let pool = SqlitePoolOptions::new()
198 .max_connections(1)
199 .connect_with(opts)
200 .await?;
201
202 run_migrations(&pool).await?;
203 Ok(pool)
204 }
205
206 /// Run pending schema migrations. Detects pre-migration databases by checking
207 /// for existing tables and stamps them as version 1 without re-running.
208 #[instrument(skip_all)]
209 pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
210 // Ensure the schema_version table exists
211 sqlx::query(
212 "CREATE TABLE IF NOT EXISTS schema_version (
213 version INTEGER NOT NULL,
214 description TEXT NOT NULL,
215 applied_at TEXT NOT NULL
216 )",
217 )
218 .execute(pool)
219 .await?;
220
221 let current_version = get_schema_version(pool).await?;
222
223 // Detect pre-migration databases: if schema_version is empty but tables exist,
224 // this is an existing database that predates the migration system.
225 if current_version == 0 && has_existing_tables(pool).await? {
226 info!("detected pre-migration database, stamping as version 1");
227 stamp_version(pool, 1, "initial schema (pre-existing)").await?;
228 // Run remaining migrations (2+) if any
229 for &(version, description, sql) in MIGRATIONS {
230 if version > 1 {
231 run_one_migration(pool, version, description, sql).await?;
232 }
233 }
234 return Ok(());
235 }
236
237 // Run all migrations newer than current version
238 for &(version, description, sql) in MIGRATIONS {
239 if version > current_version {
240 run_one_migration(pool, version, description, sql).await?;
241 }
242 }
243
244 Ok(())
245 }
246
247 /// Get the current schema version (0 if no migrations have been applied).
248 #[instrument(skip_all)]
249 pub async fn get_schema_version(pool: &SqlitePool) -> Result<i64> {
250 let row = sqlx::query_as::<_, (i64,)>(
251 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
252 )
253 .fetch_one(pool)
254 .await?;
255 Ok(row.0)
256 }
257
258 /// Check whether the database has existing tables from before the migration system.
259 async fn has_existing_tables(pool: &SqlitePool) -> Result<bool> {
260 let row = sqlx::query_as::<_, (i64,)>(
261 "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'health_checks'",
262 )
263 .fetch_one(pool)
264 .await?;
265 Ok(row.0 > 0)
266 }
267
268 /// Execute a single migration's SQL and record it in schema_version.
269 /// Wrapped in an explicit transaction so partial failures roll back cleanly.
270 async fn run_one_migration(
271 pool: &SqlitePool,
272 version: i64,
273 description: &str,
274 sql: &str,
275 ) -> Result<()> {
276 info!(version, description, "running migration");
277
278 let mut tx = pool.begin().await?;
279
280 // Execute each statement in the migration SQL
281 for statement in sql.split(';') {
282 let trimmed = statement.trim();
283 if !trimmed.is_empty() {
284 sqlx::query(trimmed).execute(&mut *tx).await?;
285 }
286 }
287
288 // Record the version inside the same transaction
289 let now = chrono::Utc::now().to_rfc3339();
290 sqlx::query(
291 "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)",
292 )
293 .bind(version)
294 .bind(description)
295 .bind(&now)
296 .execute(&mut *tx)
297 .await?;
298
299 tx.commit().await?;
300 Ok(())
301 }
302
303 /// Record a version in the schema_version table.
304 async fn stamp_version(pool: &SqlitePool, version: i64, description: &str) -> Result<()> {
305 let now = chrono::Utc::now().to_rfc3339();
306 sqlx::query(
307 "INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)",
308 )
309 .bind(version)
310 .bind(description)
311 .bind(&now)
312 .execute(pool)
313 .await?;
314 Ok(())
315 }
316
317 // --- Health check queries ---
318
319 #[instrument(skip_all)]
320 pub async fn insert_health_check(
321 pool: &SqlitePool,
322 snapshot: &HealthSnapshot,
323 ) -> Result<i64> {
324 let status = snapshot.status.to_string();
325 let details_json = snapshot
326 .details
327 .as_ref()
328 .map(|d| serde_json::to_string(d).unwrap_or_default());
329
330 let result = sqlx::query(
331 "INSERT INTO health_checks (target, status, checked_at, response_time_ms, details_json, error)
332 VALUES (?, ?, ?, ?, ?, ?)",
333 )
334 .bind(&snapshot.target)
335 .bind(&status)
336 .bind(&snapshot.checked_at)
337 .bind(snapshot.response_time_ms)
338 .bind(&details_json)
339 .bind(&snapshot.error)
340 .execute(pool)
341 .await?;
342
343 Ok(result.last_insert_rowid())
344 }
345
346 #[instrument(skip_all)]
347 pub async fn get_health_history(
348 pool: &SqlitePool,
349 target: Option<&str>,
350 limit: i64,
351 ) -> Result<Vec<HealthSnapshot>> {
352 let rows = match target {
353 Some(t) => {
354 sqlx::query_as::<_, HealthCheckRow>(
355 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
356 FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT ?",
357 )
358 .bind(t)
359 .bind(limit)
360 .fetch_all(pool)
361 .await?
362 }
363 None => {
364 sqlx::query_as::<_, HealthCheckRow>(
365 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
366 FROM health_checks ORDER BY id DESC LIMIT ?",
367 )
368 .bind(limit)
369 .fetch_all(pool)
370 .await?
371 }
372 };
373
374 Ok(rows.into_iter().map(|r| r.into_snapshot()).collect())
375 }
376
377 #[instrument(skip_all)]
378 pub async fn get_latest_health(
379 pool: &SqlitePool,
380 target: &str,
381 ) -> Result<Option<HealthSnapshot>> {
382 let row = sqlx::query_as::<_, HealthCheckRow>(
383 "SELECT id, target, status, checked_at, response_time_ms, details_json, error
384 FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
385 )
386 .bind(target)
387 .fetch_optional(pool)
388 .await?;
389
390 Ok(row.map(|r| r.into_snapshot()))
391 }
392
393 // --- Test run queries ---
394
395 #[instrument(skip_all)]
396 pub async fn insert_test_run(
397 pool: &SqlitePool,
398 run: &TestRun,
399 ) -> Result<TestRunId> {
400 let summary_json = serde_json::to_string(&run.summary).unwrap_or_default();
401
402 let result = sqlx::query(
403 "INSERT INTO test_runs (target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter)
404 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
405 )
406 .bind(&run.target)
407 .bind(&run.started_at)
408 .bind(&run.finished_at)
409 .bind(run.duration_secs)
410 .bind(run.exit_code)
411 .bind(run.passed)
412 .bind(&summary_json)
413 .bind(&run.raw_output)
414 .bind(&run.filter)
415 .execute(pool)
416 .await?;
417
418 Ok(TestRunId(result.last_insert_rowid()))
419 }
420
421 #[instrument(skip_all)]
422 pub async fn get_test_history(
423 pool: &SqlitePool,
424 target: Option<&str>,
425 limit: i64,
426 ) -> Result<Vec<TestRun>> {
427 let rows = match target {
428 Some(t) => {
429 sqlx::query_as::<_, TestRunRow>(
430 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
431 FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT ?",
432 )
433 .bind(t)
434 .bind(limit)
435 .fetch_all(pool)
436 .await?
437 }
438 None => {
439 sqlx::query_as::<_, TestRunRow>(
440 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
441 FROM test_runs ORDER BY id DESC LIMIT ?",
442 )
443 .bind(limit)
444 .fetch_all(pool)
445 .await?
446 }
447 };
448
449 Ok(rows.into_iter().map(|r| r.into_test_run()).collect())
450 }
451
452 #[instrument(skip_all)]
453 pub async fn get_latest_test_run(
454 pool: &SqlitePool,
455 target: &str,
456 ) -> Result<Option<TestRun>> {
457 let row = sqlx::query_as::<_, TestRunRow>(
458 "SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter
459 FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT 1",
460 )
461 .bind(target)
462 .fetch_optional(pool)
463 .await?;
464
465 Ok(row.map(|r| r.into_test_run()))
466 }
467
468 // --- Test detail queries ---
469
470 /// Insert per-test results for a given test run.
471 #[instrument(skip_all)]
472 pub async fn insert_test_details(
473 pool: &SqlitePool,
474 run_id: TestRunId,
475 details: &[TestDetail],
476 ) -> Result<()> {
477 for detail in details {
478 sqlx::query(
479 "INSERT INTO test_details (run_id, test_name, passed) VALUES (?, ?, ?)",
480 )
481 .bind(run_id.0)
482 .bind(&detail.test_name)
483 .bind(detail.passed)
484 .execute(pool)
485 .await?;
486 }
487 Ok(())
488 }
489
490 /// Find tests that passed in the previous run but failed in this one (regressions).
491 #[instrument(skip_all)]
492 pub async fn get_test_regressions(
493 pool: &SqlitePool,
494 target: &str,
495 current_run_id: TestRunId,
496 ) -> Result<Vec<String>> {
497 // Find the run immediately before this one for the same target
498 let prev_run = sqlx::query_as::<_, (i64,)>(
499 "SELECT id FROM test_runs
500 WHERE target = ? AND id < ?
501 ORDER BY id DESC LIMIT 1",
502 )
503 .bind(target)
504 .bind(current_run_id.0)
505 .fetch_optional(pool)
506 .await?;
507
508 let Some((prev_id,)) = prev_run else {
509 return Ok(vec![]);
510 };
511
512 // Tests that passed in prev run but failed in current run
513 let rows = sqlx::query_as::<_, (String,)>(
514 "SELECT curr.test_name FROM test_details curr
515 INNER JOIN test_details prev ON prev.test_name = curr.test_name AND prev.run_id = ?
516 WHERE curr.run_id = ? AND curr.passed = 0 AND prev.passed = 1",
517 )
518 .bind(prev_id)
519 .bind(current_run_id.0)
520 .fetch_all(pool)
521 .await?;
522
523 Ok(rows.into_iter().map(|r| r.0).collect())
524 }
525
526 /// Get test duration history for a target (most recent first).
527 #[instrument(skip_all)]
528 pub async fn get_test_durations(
529 pool: &SqlitePool,
530 target: &str,
531 limit: i64,
532 ) -> Result<Vec<(String, i64)>> {
533 let rows = sqlx::query_as::<_, (String, i64)>(
534 "SELECT started_at, duration_secs FROM test_runs
535 WHERE target = ? AND duration_secs IS NOT NULL
536 ORDER BY id DESC LIMIT ?",
537 )
538 .bind(target)
539 .bind(limit)
540 .fetch_all(pool)
541 .await?;
542 Ok(rows)
543 }
544
545 /// Get the version from the health check closest to (but before) a given timestamp.
546 #[instrument(skip_all)]
547 pub async fn get_version_at_time(
548 pool: &SqlitePool,
549 target: &str,
550 before_rfc3339: &str,
551 ) -> Result<Option<String>> {
552 let row = sqlx::query_as::<_, (Option<String>,)>(
553 "SELECT details_json FROM health_checks
554 WHERE target = ? AND checked_at <= ?
555 ORDER BY checked_at DESC LIMIT 1",
556 )
557 .bind(target)
558 .bind(before_rfc3339)
559 .fetch_optional(pool)
560 .await?;
561
562 let version = row
563 .and_then(|r| r.0)
564 .and_then(|json_str| serde_json::from_str::<serde_json::Value>(&json_str).ok())
565 .and_then(|json| json.get("version").and_then(|v| v.as_str()).map(String::from));
566
567 Ok(version)
568 }
569
570 /// Calculate uptime percentage for a target over the given number of hours.
571 /// Returns the percentage of health checks with "operational" status.
572 #[instrument(skip_all)]
573 pub async fn get_uptime_percent(
574 pool: &SqlitePool,
575 target: &str,
576 hours: i64,
577 ) -> Result<Option<f64>> {
578 let cutoff = chrono::Utc::now() - chrono::Duration::hours(hours);
579 let cutoff_str = cutoff.to_rfc3339();
580
581 let row = sqlx::query_as::<_, (i64, i64)>(
582 "SELECT
583 COUNT(*) as total,
584 SUM(CASE WHEN status = 'operational' THEN 1 ELSE 0 END) as operational
585 FROM health_checks
586 WHERE target = ? AND checked_at >= ?",
587 )
588 .bind(target)
589 .bind(&cutoff_str)
590 .fetch_one(pool)
591 .await?;
592
593 if row.0 == 0 {
594 Ok(None)
595 } else {
596 Ok(Some(row.1 as f64 / row.0 as f64 * 100.0))
597 }
598 }
599
600 // --- Latency trending queries ---
601
602 /// Fetch all response times for a target since a given timestamp, ordered ASC.
603 #[instrument(skip_all)]
604 pub async fn get_response_times(
605 pool: &SqlitePool,
606 target: &str,
607 since_rfc3339: &str,
608 ) -> Result<Vec<(String, i64)>> {
609 let rows = sqlx::query_as::<_, (String, i64)>(
610 "SELECT checked_at, response_time_ms FROM health_checks
611 WHERE target = ? AND checked_at >= ?
612 ORDER BY checked_at ASC",
613 )
614 .bind(target)
615 .bind(since_rfc3339)
616 .fetch_all(pool)
617 .await?;
618 Ok(rows)
619 }
620
621 /// Fetch the last N response times for **operational** checks only (most recent first).
622 #[instrument(skip_all)]
623 pub async fn get_recent_response_times(
624 pool: &SqlitePool,
625 target: &str,
626 count: i64,
627 ) -> Result<Vec<i64>> {
628 let rows = sqlx::query_as::<_, (i64,)>(
629 "SELECT response_time_ms FROM health_checks
630 WHERE target = ? AND status = 'operational'
631 ORDER BY id DESC LIMIT ?",
632 )
633 .bind(target)
634 .bind(count)
635 .fetch_all(pool)
636 .await?;
637 Ok(rows.into_iter().map(|r| r.0).collect())
638 }
639
640 // --- Alert queries ---
641
642 #[derive(Debug, sqlx::FromRow)]
643 pub struct AlertRow {
644 pub id: i64,
645 pub target: String,
646 pub alert_type: String,
647 pub from_status: Option<String>,
648 pub to_status: Option<String>,
649 pub sent_at: String,
650 pub error: Option<String>,
651 }
652
653 #[instrument(skip_all)]
654 pub async fn insert_alert(
655 pool: &SqlitePool,
656 target: &str,
657 alert_type: &str,
658 from_status: Option<&str>,
659 to_status: Option<&str>,
660 error: Option<&str>,
661 ) -> Result<i64> {
662 let now = chrono::Utc::now().to_rfc3339();
663 let result = sqlx::query(
664 "INSERT INTO alerts (target, alert_type, from_status, to_status, sent_at, error)
665 VALUES (?, ?, ?, ?, ?, ?)",
666 )
667 .bind(target)
668 .bind(alert_type)
669 .bind(from_status)
670 .bind(to_status)
671 .bind(&now)
672 .bind(error)
673 .execute(pool)
674 .await?;
675 Ok(result.last_insert_rowid())
676 }
677
678 #[instrument(skip_all)]
679 pub async fn get_latest_alert_for_target(
680 pool: &SqlitePool,
681 target: &str,
682 ) -> Result<Option<AlertRow>> {
683 Ok(sqlx::query_as::<_, AlertRow>(
684 "SELECT id, target, alert_type, from_status, to_status, sent_at, error
685 FROM alerts WHERE target = ? AND alert_type NOT LIKE '%recovery%'
686 ORDER BY id DESC LIMIT 1",
687 )
688 .bind(target)
689 .fetch_optional(pool)
690 .await?)
691 }
692
693 // --- TLS check queries ---
694
695 #[derive(Debug, sqlx::FromRow, serde::Serialize)]
696 pub struct TlsCheckRow {
697 pub id: i64,
698 pub target: String,
699 pub host: String,
700 pub valid: bool,
701 pub days_remaining: i64,
702 pub not_before: String,
703 pub not_after: String,
704 pub subject: String,
705 pub issuer: String,
706 pub checked_at: String,
707 pub error: Option<String>,
708 }
709
710 #[instrument(skip_all)]
711 pub async fn insert_tls_check(
712 pool: &SqlitePool,
713 status: &TlsStatus,
714 ) -> Result<i64> {
715 let result = sqlx::query(
716 "INSERT INTO tls_checks (target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error)
717 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
718 )
719 .bind(&status.target)
720 .bind(&status.host)
721 .bind(status.valid)
722 .bind(status.days_remaining)
723 .bind(&status.not_before)
724 .bind(&status.not_after)
725 .bind(&status.subject)
726 .bind(&status.issuer)
727 .bind(&status.checked_at)
728 .bind(&status.error)
729 .execute(pool)
730 .await?;
731
732 Ok(result.last_insert_rowid())
733 }
734
735 #[instrument(skip_all)]
736 pub async fn get_latest_tls_check(
737 pool: &SqlitePool,
738 target: &str,
739 ) -> Result<Option<TlsCheckRow>> {
740 Ok(sqlx::query_as::<_, TlsCheckRow>(
741 "SELECT id, target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error
742 FROM tls_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
743 )
744 .bind(target)
745 .fetch_optional(pool)
746 .await?)
747 }
748
749 // --- Incident queries ---
750
751 #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
752 pub struct IncidentRow {
753 pub id: i64,
754 pub target: String,
755 pub started_at: String,
756 pub ended_at: Option<String>,
757 pub duration_secs: Option<i64>,
758 pub from_status: String,
759 pub to_status: String,
760 }
761
762 #[instrument(skip_all)]
763 pub async fn insert_incident(
764 pool: &SqlitePool,
765 target: &str,
766 from_status: &str,
767 to_status: &str,
768 ) -> Result<i64> {
769 let now = chrono::Utc::now().to_rfc3339();
770 let result = sqlx::query(
771 "INSERT INTO incidents (target, started_at, from_status, to_status)
772 VALUES (?, ?, ?, ?)",
773 )
774 .bind(target)
775 .bind(&now)
776 .bind(from_status)
777 .bind(to_status)
778 .execute(pool)
779 .await?;
780 Ok(result.last_insert_rowid())
781 }
782
783 #[instrument(skip_all)]
784 pub async fn close_open_incidents(
785 pool: &SqlitePool,
786 target: &str,
787 ) -> Result<u64> {
788 let now = chrono::Utc::now().to_rfc3339();
789 let result = sqlx::query(
790 "UPDATE incidents SET ended_at = ?, duration_secs = CAST((julianday(?) - julianday(started_at)) * 86400 AS INTEGER)
791 WHERE target = ? AND ended_at IS NULL",
792 )
793 .bind(&now)
794 .bind(&now)
795 .bind(target)
796 .execute(pool)
797 .await?;
798 Ok(result.rows_affected())
799 }
800
801 #[instrument(skip_all)]
802 pub async fn get_open_incident(
803 pool: &SqlitePool,
804 target: &str,
805 ) -> Result<Option<IncidentRow>> {
806 Ok(sqlx::query_as::<_, IncidentRow>(
807 "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status
808 FROM incidents WHERE target = ? AND ended_at IS NULL ORDER BY id DESC LIMIT 1",
809 )
810 .bind(target)
811 .fetch_optional(pool)
812 .await?)
813 }
814
815 #[instrument(skip_all)]
816 pub async fn get_recent_incidents(
817 pool: &SqlitePool,
818 target: &str,
819 limit: i64,
820 ) -> Result<Vec<IncidentRow>> {
821 Ok(sqlx::query_as::<_, IncidentRow>(
822 "SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status
823 FROM incidents WHERE target = ? ORDER BY id DESC LIMIT ?",
824 )
825 .bind(target)
826 .bind(limit)
827 .fetch_all(pool)
828 .await?)
829 }
830
831 // --- Route check queries ---
832
833 #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
834 pub struct RouteCheckRow {
835 pub id: i64,
836 pub target: String,
837 pub path: String,
838 pub status_code: i64,
839 pub ok: bool,
840 pub response_time_ms: i64,
841 pub checked_at: String,
842 pub error: Option<String>,
843 }
844
845 #[instrument(skip_all)]
846 pub async fn insert_route_check(
847 pool: &SqlitePool,
848 result: &crate::checks::routes::RouteCheckResult,
849 ) -> Result<i64> {
850 let row = sqlx::query(
851 "INSERT INTO route_checks (target, path, status_code, ok, response_time_ms, checked_at, error)
852 VALUES (?, ?, ?, ?, ?, ?, ?)",
853 )
854 .bind(&result.target)
855 .bind(&result.path)
856 .bind(result.status_code as i64)
857 .bind(result.ok)
858 .bind(result.response_time_ms)
859 .bind(&result.checked_at)
860 .bind(&result.error)
861 .execute(pool)
862 .await?;
863 Ok(row.last_insert_rowid())
864 }
865
866 /// Get the latest route check per path for a target.
867 #[instrument(skip_all)]
868 pub async fn get_latest_route_checks(
869 pool: &SqlitePool,
870 target: &str,
871 ) -> Result<Vec<RouteCheckRow>> {
872 Ok(sqlx::query_as::<_, RouteCheckRow>(
873 "SELECT r.id, r.target, r.path, r.status_code, r.ok, r.response_time_ms, r.checked_at, r.error
874 FROM route_checks r
875 INNER JOIN (SELECT path, MAX(id) as max_id FROM route_checks WHERE target = ? GROUP BY path) latest
876 ON r.id = latest.max_id
877 ORDER BY r.path",
878 )
879 .bind(target)
880 .fetch_all(pool)
881 .await?)
882 }
883
884 // --- DNS check queries ---
885
886 #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
887 pub struct DnsCheckRow {
888 pub id: i64,
889 pub target: String,
890 pub name: String,
891 pub record_type: String,
892 pub expected: String,
893 pub actual: String,
894 pub matches: bool,
895 pub checked_at: String,
896 pub error: Option<String>,
897 }
898
899 #[instrument(skip_all)]
900 pub async fn insert_dns_check(
901 pool: &SqlitePool,
902 result: &DnsCheckResult,
903 ) -> Result<i64> {
904 let expected = serde_json::to_string(&result.expected).unwrap_or_default();
905 let actual = serde_json::to_string(&result.actual).unwrap_or_default();
906 let record_type_str = result.record_type.to_string();
907
908 let row = sqlx::query(
909 "INSERT INTO dns_checks (target, name, record_type, expected, actual, matches, checked_at, error)
910 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
911 )
912 .bind(&result.target)
913 .bind(&result.name)
914 .bind(&record_type_str)
915 .bind(&expected)
916 .bind(&actual)
917 .bind(result.matches)
918 .bind(&result.checked_at)
919 .bind(&result.error)
920 .execute(pool)
921 .await?;
922 Ok(row.last_insert_rowid())
923 }
924
925 /// Get the latest DNS check per (name, record_type) for a target.
926 #[instrument(skip_all)]
927 pub async fn get_latest_dns_checks(
928 pool: &SqlitePool,
929 target: &str,
930 ) -> Result<Vec<DnsCheckRow>> {
931 Ok(sqlx::query_as::<_, DnsCheckRow>(
932 "SELECT d.id, d.target, d.name, d.record_type, d.expected, d.actual, d.matches, d.checked_at, d.error
933 FROM dns_checks d
934 INNER JOIN (SELECT name, record_type, MAX(id) as max_id FROM dns_checks WHERE target = ? GROUP BY name, record_type) latest
935 ON d.id = latest.max_id
936 ORDER BY d.name, d.record_type",
937 )
938 .bind(target)
939 .fetch_all(pool)
940 .await?)
941 }
942
943 // --- WHOIS check queries ---
944
945 #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
946 pub struct WhoisCheckRow {
947 pub id: i64,
948 pub target: String,
949 pub domain: String,
950 pub registrar: Option<String>,
951 pub expiry_date: Option<String>,
952 pub days_remaining: Option<i64>,
953 pub nameservers: Option<String>,
954 pub checked_at: String,
955 pub error: Option<String>,
956 }
957
958 #[instrument(skip_all)]
959 pub async fn insert_whois_check(
960 pool: &SqlitePool,
961 result: &WhoisResult,
962 ) -> Result<i64> {
963 let nameservers = serde_json::to_string(&result.nameservers).unwrap_or_default();
964
965 let row = sqlx::query(
966 "INSERT INTO whois_checks (target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error)
967 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
968 )
969 .bind(&result.target)
970 .bind(&result.domain)
971 .bind(&result.registrar)
972 .bind(&result.expiry_date)
973 .bind(result.days_remaining)
974 .bind(&nameservers)
975 .bind(&result.checked_at)
976 .bind(&result.error)
977 .execute(pool)
978 .await?;
979 Ok(row.last_insert_rowid())
980 }
981
982 #[instrument(skip_all)]
983 pub async fn get_latest_whois_check(
984 pool: &SqlitePool,
985 target: &str,
986 ) -> Result<Option<WhoisCheckRow>> {
987 Ok(sqlx::query_as::<_, WhoisCheckRow>(
988 "SELECT id, target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error
989 FROM whois_checks WHERE target = ? ORDER BY id DESC LIMIT 1",
990 )
991 .bind(target)
992 .fetch_optional(pool)
993 .await?)
994 }
995
996 // --- CORS check queries ---
997
998 #[instrument(skip_all)]
999 pub async fn insert_cors_check(
1000 pool: &SqlitePool,
1001 result: &CorsCheckResult,
1002 ) -> Result<i64> {
1003 let row = sqlx::query(
1004 "INSERT INTO cors_checks (target, url, origin, method, passes, checked_at, error)
1005 VALUES (?, ?, ?, ?, ?, ?, ?)",
1006 )
1007 .bind(&result.target)
1008 .bind(&result.url)
1009 .bind(&result.origin)
1010 .bind(&result.method)
1011 .bind(result.passes)
1012 .bind(&result.checked_at)
1013 .bind(&result.error)
1014 .execute(pool)
1015 .await?;
1016 Ok(row.last_insert_rowid())
1017 }
1018
1019 /// Get the latest CORS check per URL for a target.
1020 #[instrument(skip_all)]
1021 pub async fn get_latest_cors_checks(
1022 pool: &SqlitePool,
1023 target: &str,
1024 ) -> Result<Vec<CorsCheckResult>> {
1025 let rows = sqlx::query_as::<_, (String, String, String, String, bool, String, Option<String>)>(
1026 "SELECT c.target, c.url, c.origin, c.method, c.passes, c.checked_at, c.error
1027 FROM cors_checks c
1028 INNER JOIN (SELECT url, MAX(id) as max_id FROM cors_checks WHERE target = ? GROUP BY url) latest
1029 ON c.id = latest.max_id
1030 ORDER BY c.url",
1031 )
1032 .bind(target)
1033 .fetch_all(pool)
1034 .await?;
1035
1036 Ok(rows
1037 .into_iter()
1038 .map(|(target, url, origin, method, passes, checked_at, error)| CorsCheckResult {
1039 target,
1040 url,
1041 origin,
1042 method,
1043 passes,
1044 checked_at,
1045 error,
1046 })
1047 .collect())
1048 }
1049
1050 // --- Backup check queries ---
1051
1052 #[instrument(skip_all)]
1053 pub async fn insert_backup_check(
1054 pool: &SqlitePool,
1055 result: &BackupCheckResult,
1056 ) -> Result<i64> {
1057 let row = sqlx::query(
1058 "INSERT INTO backup_checks (target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error)
1059 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
1060 )
1061 .bind(&result.target)
1062 .bind(&result.database_name)
1063 .bind(&result.status)
1064 .bind(&result.last_backup_at)
1065 .bind(result.size_bytes)
1066 .bind(result.age_hours)
1067 .bind(&result.checked_at)
1068 .bind(&result.error)
1069 .execute(pool)
1070 .await?;
1071 Ok(row.last_insert_rowid())
1072 }
1073
1074 /// Get the latest backup check for a specific target and database.
1075 #[instrument(skip_all)]
1076 pub async fn get_latest_backup_check(
1077 pool: &SqlitePool,
1078 target: &str,
1079 database_name: &str,
1080 ) -> Result<Option<BackupCheckRow>> {
1081 Ok(sqlx::query_as::<_, BackupCheckRow>(
1082 "SELECT id, target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error
1083 FROM backup_checks WHERE target = ? AND database_name = ? ORDER BY id DESC LIMIT 1",
1084 )
1085 .bind(target)
1086 .bind(database_name)
1087 .fetch_optional(pool)
1088 .await?)
1089 }
1090
1091 #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
1092 pub struct BackupCheckRow {
1093 pub id: i64,
1094 pub target: String,
1095 pub database_name: String,
1096 pub status: String,
1097 pub last_backup_at: Option<String>,
1098 pub size_bytes: Option<i64>,
1099 pub age_hours: Option<i64>,
1100 pub checked_at: String,
1101 pub error: Option<String>,
1102 }
1103
1104 // --- Stale data cleanup ---
1105
1106 /// Delete route_checks for paths no longer in the config.
1107 #[instrument(skip_all)]
1108 pub async fn prune_stale_routes(
1109 pool: &SqlitePool,
1110 target: &str,
1111 expected_routes: &[String],
1112 ) -> Result<u64> {
1113 if expected_routes.is_empty() {
1114 // No configured routes — delete all route checks for this target
1115 let result = sqlx::query("DELETE FROM route_checks WHERE target = ?")
1116 .bind(target)
1117 .execute(pool)
1118 .await?;
1119 return Ok(result.rows_affected());
1120 }
1121
1122 // Build placeholders for the IN clause
1123 let placeholders: Vec<&str> = expected_routes.iter().map(|_| "?").collect();
1124 let sql = format!(
1125 "DELETE FROM route_checks WHERE target = ? AND path NOT IN ({})",
1126 placeholders.join(", ")
1127 );
1128 let mut query = sqlx::query(&sql).bind(target);
1129 for route in expected_routes {
1130 query = query.bind(route);
1131 }
1132 let result = query.execute(pool).await?;
1133 Ok(result.rows_affected())
1134 }
1135
1136 /// Delete dns_checks for (name, record_type) pairs no longer in the config.
1137 #[instrument(skip_all)]
1138 pub async fn prune_stale_dns(
1139 pool: &SqlitePool,
1140 target: &str,
1141 expected_dns: &[(String, String)],
1142 ) -> Result<u64> {
1143 if expected_dns.is_empty() {
1144 // No configured DNS records — delete all DNS checks for this target
1145 let result = sqlx::query("DELETE FROM dns_checks WHERE target = ?")
1146 .bind(target)
1147 .execute(pool)
1148 .await?;
1149 return Ok(result.rows_affected());
1150 }
1151
1152 // Build a compound condition: keep rows matching any configured (name, record_type) pair
1153 let conditions: Vec<String> = expected_dns
1154 .iter()
1155 .map(|_| "(name = ? AND record_type = ?)".to_string())
1156 .collect();
1157 let sql = format!(
1158 "DELETE FROM dns_checks WHERE target = ? AND NOT ({})",
1159 conditions.join(" OR ")
1160 );
1161 let mut query = sqlx::query(&sql).bind(target);
1162 for (name, record_type) in expected_dns {
1163 query = query.bind(name).bind(record_type);
1164 }
1165 let result = query.execute(pool).await?;
1166 Ok(result.rows_affected())
1167 }
1168
1169 // --- Maintenance ---
1170
1171 /// Prune result with counts for each table.
1172 pub struct PruneResult {
1173 pub health: u64,
1174 pub tests: u64,
1175 pub test_details: u64,
1176 pub heartbeats: u64,
1177 pub alerts: u64,
1178 pub tls: u64,
1179 pub incidents: u64,
1180 pub routes: u64,
1181 pub dns: u64,
1182 pub whois: u64,
1183 pub backups: u64,
1184 }
1185
1186 /// Delete records older than `days` from all tables.
1187 /// Only closed incidents (with a non-NULL `ended_at`) are pruned.
1188 #[instrument(skip_all)]
1189 pub async fn prune_old_records(
1190 pool: &SqlitePool,
1191 days: i64,
1192 ) -> Result<PruneResult> {
1193 // Guard: days <= 0 would set cutoff to now (or the future), deleting
1194 // everything. Treat this as a no-op instead.
1195 if days <= 0 {
1196 return Ok(PruneResult { health: 0, tests: 0, test_details: 0, heartbeats: 0, alerts: 0, tls: 0, incidents: 0, routes: 0, dns: 0, whois: 0, backups: 0 });
1197 }
1198
1199 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
1200 let cutoff_str = cutoff.to_rfc3339();
1201
1202 let health_result = sqlx::query("DELETE FROM health_checks WHERE checked_at < ?")
1203 .bind(&cutoff_str)
1204 .execute(pool)
1205 .await?;
1206
1207 let test_result = sqlx::query("DELETE FROM test_runs WHERE started_at < ?")
1208 .bind(&cutoff_str)
1209 .execute(pool)
1210 .await?;
1211
1212 // Prune orphaned test_details (run was deleted above).
1213 let test_details_result = sqlx::query(
1214 "DELETE FROM test_details WHERE run_id NOT IN (SELECT id FROM test_runs)",
1215 )
1216 .execute(pool)
1217 .await?;
1218
1219 let peer_hb_result = sqlx::query("DELETE FROM peer_heartbeats WHERE checked_at < ?")
1220 .bind(&cutoff_str)
1221 .execute(pool)
1222 .await?;
1223
1224 let alerts_result = sqlx::query("DELETE FROM alerts WHERE sent_at < ?")
1225 .bind(&cutoff_str)
1226 .execute(pool)
1227 .await?;
1228
1229 let tls_result = sqlx::query("DELETE FROM tls_checks WHERE checked_at < ?")
1230 .bind(&cutoff_str)
1231 .execute(pool)
1232 .await?;
1233
1234 let incidents_result = sqlx::query("DELETE FROM incidents WHERE ended_at IS NOT NULL AND ended_at < ?")
1235 .bind(&cutoff_str)
1236 .execute(pool)
1237 .await?;
1238
1239 let routes_result = sqlx::query("DELETE FROM route_checks WHERE checked_at < ?")
1240 .bind(&cutoff_str)
1241 .execute(pool)
1242 .await?;
1243
1244 let dns_result = sqlx::query("DELETE FROM dns_checks WHERE checked_at < ?")
1245 .bind(&cutoff_str)
1246 .execute(pool)
1247 .await?;
1248
1249 let whois_result = sqlx::query("DELETE FROM whois_checks WHERE checked_at < ?")
1250 .bind(&cutoff_str)
1251 .execute(pool)
1252 .await?;
1253
1254 let backups_result = sqlx::query("DELETE FROM backup_checks WHERE checked_at < ?")
1255 .bind(&cutoff_str)
1256 .execute(pool)
1257 .await?;
1258
1259 Ok(PruneResult {
1260 health: health_result.rows_affected(),
1261 tests: test_result.rows_affected(),
1262 test_details: test_details_result.rows_affected(),
1263 heartbeats: peer_hb_result.rows_affected(),
1264 alerts: alerts_result.rows_affected(),
1265 tls: tls_result.rows_affected(),
1266 incidents: incidents_result.rows_affected(),
1267 routes: routes_result.rows_affected(),
1268 dns: dns_result.rows_affected(),
1269 whois: whois_result.rows_affected(),
1270 backups: backups_result.rows_affected(),
1271 })
1272 }
1273
1274 // --- Peer identity queries ---
1275
1276 /// Store a peer's identity (first-seen UUID). INSERT OR IGNORE — first wins.
1277 #[instrument(skip_all)]
1278 pub async fn store_peer_identity(
1279 pool: &SqlitePool,
1280 peer_name: &str,
1281 instance_id: &str,
1282 ) -> Result<()> {
1283 let now = chrono::Utc::now().to_rfc3339();
1284 sqlx::query(
1285 "INSERT OR IGNORE INTO peer_identities (peer_name, instance_id, first_seen)
1286 VALUES (?, ?, ?)",
1287 )
1288 .bind(peer_name)
1289 .bind(instance_id)
1290 .bind(&now)
1291 .execute(pool)
1292 .await?;
1293 Ok(())
1294 }
1295
1296 /// Update a peer's identity (UUID changed, e.g. after reinstall).
1297 #[instrument(skip_all)]
1298 pub async fn update_peer_identity(
1299 pool: &SqlitePool,
1300 peer_name: &str,
1301 instance_id: &str,
1302 ) -> Result<()> {
1303 let now = chrono::Utc::now().to_rfc3339();
1304 sqlx::query(
1305 "UPDATE peer_identities SET instance_id = ?, first_seen = ?
1306 WHERE peer_name = ?",
1307 )
1308 .bind(instance_id)
1309 .bind(&now)
1310 .bind(peer_name)
1311 .execute(pool)
1312 .await?;
1313 Ok(())
1314 }
1315
1316 /// Get a peer's first-seen instance ID.
1317 #[instrument(skip_all)]
1318 pub async fn get_peer_identity(
1319 pool: &SqlitePool,
1320 peer_name: &str,
1321 ) -> Result<Option<String>> {
1322 let row = sqlx::query_as::<_, (String,)>(
1323 "SELECT instance_id FROM peer_identities WHERE peer_name = ?",
1324 )
1325 .bind(peer_name)
1326 .fetch_optional(pool)
1327 .await?;
1328 Ok(row.map(|r| r.0))
1329 }
1330
1331 // --- Peer heartbeat queries ---
1332
1333 #[instrument(skip_all)]
1334 pub async fn insert_peer_heartbeat(
1335 pool: &SqlitePool,
1336 peer_name: &str,
1337 status: &str,
1338 latency_ms: i64,
1339 ) -> Result<i64> {
1340 let now = chrono::Utc::now().to_rfc3339();
1341 let result = sqlx::query(
1342 "INSERT INTO peer_heartbeats (peer_name, status, latency_ms, checked_at)
1343 VALUES (?, ?, ?, ?)",
1344 )
1345 .bind(peer_name)
1346 .bind(status)
1347 .bind(latency_ms)
1348 .bind(&now)
1349 .execute(pool)
1350 .await?;
1351 Ok(result.last_insert_rowid())
1352 }
1353
1354 #[instrument(skip_all)]
1355 pub async fn get_peer_heartbeat_history(
1356 pool: &SqlitePool,
1357 peer_name: &str,
1358 limit: i64,
1359 ) -> Result<Vec<PeerHeartbeatRow>> {
1360 Ok(sqlx::query_as::<_, PeerHeartbeatRow>(
1361 "SELECT id, peer_name, status, latency_ms, checked_at
1362 FROM peer_heartbeats WHERE peer_name = ? ORDER BY id DESC LIMIT ?",
1363 )
1364 .bind(peer_name)
1365 .bind(limit)
1366 .fetch_all(pool)
1367 .await?)
1368 }
1369
1370 #[derive(Debug, sqlx::FromRow, serde::Serialize)]
1371 pub struct PeerHeartbeatRow {
1372 pub id: i64,
1373 pub peer_name: String,
1374 pub status: String,
1375 pub latency_ms: i64,
1376 pub checked_at: String,
1377 }
1378
1379 // --- Internal row types ---
1380
1381 #[derive(sqlx::FromRow)]
1382 struct HealthCheckRow {
1383 id: i64,
1384 target: String,
1385 status: String,
1386 checked_at: String,
1387 response_time_ms: i64,
1388 details_json: Option<String>,
1389 error: Option<String>,
1390 }
1391
1392 impl HealthCheckRow {
1393 fn into_snapshot(self) -> HealthSnapshot {
1394 let status = self
1395 .status
1396 .parse::<HealthStatus>()
1397 .unwrap_or(HealthStatus::Error);
1398 let details = self
1399 .details_json
1400 .as_deref()
1401 .and_then(|s| serde_json::from_str::<HealthDetails>(s).ok());
1402
1403 HealthSnapshot {
1404 id: Some(self.id),
1405 target: self.target,
1406 status,
1407 checked_at: self.checked_at,
1408 response_time_ms: self.response_time_ms,
1409 details,
1410 error: self.error,
1411 }
1412 }
1413 }
1414
1415 #[derive(sqlx::FromRow)]
1416 struct TestRunRow {
1417 id: i64,
1418 target: String,
1419 started_at: String,
1420 finished_at: Option<String>,
1421 duration_secs: Option<i64>,
1422 exit_code: Option<i32>,
1423 passed: bool,
1424 summary_json: String,
1425 raw_output: String,
1426 filter: Option<String>,
1427 }
1428
1429 impl TestRunRow {
1430 fn into_test_run(self) -> TestRun {
1431 let summary = serde_json::from_str::<TestSummary>(&self.summary_json).unwrap_or(TestSummary {
1432 steps: vec![],
1433 total_passed: None,
1434 total_failed: None,
1435 details: vec![],
1436 });
1437
1438 TestRun {
1439 id: Some(self.id),
1440 target: self.target,
1441 started_at: self.started_at,
1442 finished_at: self.finished_at,
1443 duration_secs: self.duration_secs,
1444 exit_code: self.exit_code,
1445 passed: self.passed,
1446 summary,
1447 raw_output: self.raw_output,
1448 filter: self.filter,
1449 }
1450 }
1451 }
1452