| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; |
| 9 |
use std::path::Path; |
| 10 |
use std::str::FromStr; |
| 11 |
use tracing::{info, instrument}; |
| 12 |
|
| 13 |
use crate::error::Result; |
| 14 |
use crate::types::{BackupCheckResult, CorsCheckResult, DnsCheckResult, HealthDetails, HealthSnapshot, HealthStatus, TestDetail, TestRun, TestRunId, TestSummary, TlsStatus, WhoisResult}; |
| 15 |
|
| 16 |
|
| 17 |
|
| 18 |
/// Ordered schema migrations as `(version, description, sql)` tuples.
///
/// `run_migrations` applies every entry whose version is greater than the
/// highest version recorded in `schema_version`. `run_one_migration` splits
/// each script on `;` and runs the statements in one transaction, so a script
/// must not contain a `;` inside a statement.
const MIGRATIONS: &[(i64, &str, &str)] = &[ |
| 19 |
(1, "initial schema", r#" |
| 20 |
CREATE TABLE IF NOT EXISTS health_checks ( |
| 21 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 22 |
target TEXT NOT NULL, |
| 23 |
status TEXT NOT NULL, |
| 24 |
checked_at TEXT NOT NULL, |
| 25 |
response_time_ms INTEGER NOT NULL, |
| 26 |
details_json TEXT, |
| 27 |
error TEXT |
| 28 |
); |
| 29 |
CREATE TABLE IF NOT EXISTS test_runs ( |
| 30 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 31 |
target TEXT NOT NULL, |
| 32 |
started_at TEXT NOT NULL, |
| 33 |
finished_at TEXT, |
| 34 |
duration_secs INTEGER, |
| 35 |
exit_code INTEGER, |
| 36 |
passed INTEGER NOT NULL, |
| 37 |
summary_json TEXT NOT NULL, |
| 38 |
raw_output TEXT NOT NULL, |
| 39 |
filter TEXT |
| 40 |
); |
| 41 |
CREATE TABLE IF NOT EXISTS peer_identities ( |
| 42 |
peer_name TEXT PRIMARY KEY, |
| 43 |
instance_id TEXT NOT NULL, |
| 44 |
first_seen TEXT NOT NULL |
| 45 |
); |
| 46 |
CREATE TABLE IF NOT EXISTS peer_heartbeats ( |
| 47 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 48 |
peer_name TEXT NOT NULL, |
| 49 |
status TEXT NOT NULL, |
| 50 |
latency_ms INTEGER NOT NULL, |
| 51 |
checked_at TEXT NOT NULL |
| 52 |
); |
| 53 |
CREATE INDEX IF NOT EXISTS idx_health_checks_target_id ON health_checks(target, id DESC); |
| 54 |
CREATE INDEX IF NOT EXISTS idx_health_checks_target_checked ON health_checks(target, checked_at); |
| 55 |
CREATE INDEX IF NOT EXISTS idx_test_runs_target_id ON test_runs(target, id DESC); |
| 56 |
CREATE INDEX IF NOT EXISTS idx_peer_heartbeats_peer_id ON peer_heartbeats(peer_name, id DESC); |
| 57 |
"#), |
| 58 |
(2, "add alerts table", r#" |
| 59 |
CREATE TABLE IF NOT EXISTS alerts ( |
| 60 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 61 |
target TEXT NOT NULL, |
| 62 |
alert_type TEXT NOT NULL, |
| 63 |
from_status TEXT, |
| 64 |
to_status TEXT, |
| 65 |
sent_at TEXT NOT NULL, |
| 66 |
error TEXT |
| 67 |
); |
| 68 |
CREATE INDEX IF NOT EXISTS idx_alerts_target_sent ON alerts(target, sent_at); |
| 69 |
"#), |
| 70 |
(3, "add tls_checks table", r#" |
| 71 |
CREATE TABLE IF NOT EXISTS tls_checks ( |
| 72 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 73 |
target TEXT NOT NULL, |
| 74 |
host TEXT NOT NULL, |
| 75 |
valid INTEGER NOT NULL, |
| 76 |
days_remaining INTEGER NOT NULL, |
| 77 |
not_before TEXT NOT NULL, |
| 78 |
not_after TEXT NOT NULL, |
| 79 |
subject TEXT NOT NULL, |
| 80 |
issuer TEXT NOT NULL, |
| 81 |
checked_at TEXT NOT NULL, |
| 82 |
error TEXT |
| 83 |
); |
| 84 |
CREATE INDEX IF NOT EXISTS idx_tls_checks_target_id ON tls_checks(target, id DESC); |
| 85 |
"#), |
| 86 |
(4, "add incidents table", r#" |
| 87 |
CREATE TABLE IF NOT EXISTS incidents ( |
| 88 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 89 |
target TEXT NOT NULL, |
| 90 |
started_at TEXT NOT NULL, |
| 91 |
ended_at TEXT, |
| 92 |
duration_secs INTEGER, |
| 93 |
from_status TEXT NOT NULL, |
| 94 |
to_status TEXT NOT NULL |
| 95 |
); |
| 96 |
CREATE INDEX IF NOT EXISTS idx_incidents_target_id ON incidents(target, id DESC); |
| 97 |
"#), |
| 98 |
(5, "add route_checks table", r#" |
| 99 |
CREATE TABLE IF NOT EXISTS route_checks ( |
| 100 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 101 |
target TEXT NOT NULL, |
| 102 |
path TEXT NOT NULL, |
| 103 |
status_code INTEGER NOT NULL, |
| 104 |
ok INTEGER NOT NULL, |
| 105 |
response_time_ms INTEGER NOT NULL, |
| 106 |
checked_at TEXT NOT NULL, |
| 107 |
error TEXT |
| 108 |
); |
| 109 |
CREATE INDEX IF NOT EXISTS idx_route_checks_target_path ON route_checks(target, path, id DESC); |
| 110 |
CREATE INDEX IF NOT EXISTS idx_route_checks_target ON route_checks(target, checked_at DESC); |
| 111 |
"#), |
| 112 |
(6, "add dns_checks and whois_checks tables", r#" |
| 113 |
CREATE TABLE IF NOT EXISTS dns_checks ( |
| 114 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 115 |
target TEXT NOT NULL, |
| 116 |
name TEXT NOT NULL, |
| 117 |
record_type TEXT NOT NULL, |
| 118 |
expected TEXT NOT NULL, |
| 119 |
actual TEXT NOT NULL, |
| 120 |
matches INTEGER NOT NULL, |
| 121 |
checked_at TEXT NOT NULL, |
| 122 |
error TEXT |
| 123 |
); |
| 124 |
CREATE INDEX IF NOT EXISTS idx_dns_checks_target ON dns_checks(target, name, id DESC); |
| 125 |
|
| 126 |
CREATE TABLE IF NOT EXISTS whois_checks ( |
| 127 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 128 |
target TEXT NOT NULL, |
| 129 |
domain TEXT NOT NULL, |
| 130 |
registrar TEXT, |
| 131 |
expiry_date TEXT, |
| 132 |
days_remaining INTEGER, |
| 133 |
nameservers TEXT, |
| 134 |
checked_at TEXT NOT NULL, |
| 135 |
error TEXT |
| 136 |
); |
| 137 |
CREATE INDEX IF NOT EXISTS idx_whois_checks_target ON whois_checks(target, id DESC); |
| 138 |
"#), |
| 139 |
(7, "add test_details table", r#" |
| 140 |
CREATE TABLE IF NOT EXISTS test_details ( |
| 141 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 142 |
run_id INTEGER NOT NULL REFERENCES test_runs(id) ON DELETE CASCADE, |
| 143 |
test_name TEXT NOT NULL, |
| 144 |
passed INTEGER NOT NULL, |
| 145 |
duration_ms INTEGER |
| 146 |
); |
| 147 |
CREATE INDEX IF NOT EXISTS idx_test_details_run_id ON test_details(run_id); |
| 148 |
CREATE INDEX IF NOT EXISTS idx_test_details_name ON test_details(test_name, run_id DESC); |
| 149 |
"#), |
| 150 |
(8, "add cors_checks table", r#" |
| 151 |
CREATE TABLE IF NOT EXISTS cors_checks ( |
| 152 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 153 |
target TEXT NOT NULL, |
| 154 |
url TEXT NOT NULL, |
| 155 |
origin TEXT NOT NULL, |
| 156 |
method TEXT NOT NULL, |
| 157 |
passes INTEGER NOT NULL, |
| 158 |
checked_at TEXT NOT NULL, |
| 159 |
error TEXT |
| 160 |
); |
| 161 |
CREATE INDEX IF NOT EXISTS idx_cors_target_id ON cors_checks(target, id DESC); |
| 162 |
"#), |
| 163 |
(9, "add backup_checks table", r#" |
| 164 |
CREATE TABLE IF NOT EXISTS backup_checks ( |
| 165 |
id INTEGER PRIMARY KEY AUTOINCREMENT, |
| 166 |
target TEXT NOT NULL, |
| 167 |
database_name TEXT NOT NULL, |
| 168 |
status TEXT NOT NULL, |
| 169 |
last_backup_at TEXT, |
| 170 |
size_bytes INTEGER, |
| 171 |
age_hours INTEGER, |
| 172 |
checked_at TEXT NOT NULL, |
| 173 |
error TEXT |
| 174 |
); |
| 175 |
CREATE INDEX IF NOT EXISTS idx_backup_checks_target ON backup_checks(target, id DESC); |
| 176 |
"#), |
| 177 |
]; |
| 178 |
|
| 179 |
#[instrument(skip_all)] |
| 180 |
pub async fn connect(path: &Path) -> Result<SqlitePool> { |
| 181 |
let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))? |
| 182 |
.create_if_missing(true) |
| 183 |
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); |
| 184 |
|
| 185 |
let pool = SqlitePoolOptions::new() |
| 186 |
.max_connections(5) |
| 187 |
.connect_with(opts) |
| 188 |
.await?; |
| 189 |
|
| 190 |
run_migrations(&pool).await?; |
| 191 |
Ok(pool) |
| 192 |
} |
| 193 |
|
| 194 |
#[instrument(skip_all)] |
| 195 |
pub async fn connect_in_memory() -> Result<SqlitePool> { |
| 196 |
let opts = SqliteConnectOptions::from_str("sqlite::memory:")?; |
| 197 |
let pool = SqlitePoolOptions::new() |
| 198 |
.max_connections(1) |
| 199 |
.connect_with(opts) |
| 200 |
.await?; |
| 201 |
|
| 202 |
run_migrations(&pool).await?; |
| 203 |
Ok(pool) |
| 204 |
} |
| 205 |
|
| 206 |
|
| 207 |
|
| 208 |
#[instrument(skip_all)] |
| 209 |
pub async fn run_migrations(pool: &SqlitePool) -> Result<()> { |
| 210 |
|
| 211 |
sqlx::query( |
| 212 |
"CREATE TABLE IF NOT EXISTS schema_version ( |
| 213 |
version INTEGER NOT NULL, |
| 214 |
description TEXT NOT NULL, |
| 215 |
applied_at TEXT NOT NULL |
| 216 |
)", |
| 217 |
) |
| 218 |
.execute(pool) |
| 219 |
.await?; |
| 220 |
|
| 221 |
let current_version = get_schema_version(pool).await?; |
| 222 |
|
| 223 |
|
| 224 |
|
| 225 |
if current_version == 0 && has_existing_tables(pool).await? { |
| 226 |
info!("detected pre-migration database, stamping as version 1"); |
| 227 |
stamp_version(pool, 1, "initial schema (pre-existing)").await?; |
| 228 |
|
| 229 |
for &(version, description, sql) in MIGRATIONS { |
| 230 |
if version > 1 { |
| 231 |
run_one_migration(pool, version, description, sql).await?; |
| 232 |
} |
| 233 |
} |
| 234 |
return Ok(()); |
| 235 |
} |
| 236 |
|
| 237 |
|
| 238 |
for &(version, description, sql) in MIGRATIONS { |
| 239 |
if version > current_version { |
| 240 |
run_one_migration(pool, version, description, sql).await?; |
| 241 |
} |
| 242 |
} |
| 243 |
|
| 244 |
Ok(()) |
| 245 |
} |
| 246 |
|
| 247 |
|
| 248 |
#[instrument(skip_all)] |
| 249 |
pub async fn get_schema_version(pool: &SqlitePool) -> Result<i64> { |
| 250 |
let row = sqlx::query_as::<_, (i64,)>( |
| 251 |
"SELECT COALESCE(MAX(version), 0) FROM schema_version", |
| 252 |
) |
| 253 |
.fetch_one(pool) |
| 254 |
.await?; |
| 255 |
Ok(row.0) |
| 256 |
} |
| 257 |
|
| 258 |
|
| 259 |
async fn has_existing_tables(pool: &SqlitePool) -> Result<bool> { |
| 260 |
let row = sqlx::query_as::<_, (i64,)>( |
| 261 |
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'health_checks'", |
| 262 |
) |
| 263 |
.fetch_one(pool) |
| 264 |
.await?; |
| 265 |
Ok(row.0 > 0) |
| 266 |
} |
| 267 |
|
| 268 |
|
| 269 |
|
| 270 |
async fn run_one_migration( |
| 271 |
pool: &SqlitePool, |
| 272 |
version: i64, |
| 273 |
description: &str, |
| 274 |
sql: &str, |
| 275 |
) -> Result<()> { |
| 276 |
info!(version, description, "running migration"); |
| 277 |
|
| 278 |
let mut tx = pool.begin().await?; |
| 279 |
|
| 280 |
|
| 281 |
for statement in sql.split(';') { |
| 282 |
let trimmed = statement.trim(); |
| 283 |
if !trimmed.is_empty() { |
| 284 |
sqlx::query(trimmed).execute(&mut *tx).await?; |
| 285 |
} |
| 286 |
} |
| 287 |
|
| 288 |
|
| 289 |
let now = chrono::Utc::now().to_rfc3339(); |
| 290 |
sqlx::query( |
| 291 |
"INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)", |
| 292 |
) |
| 293 |
.bind(version) |
| 294 |
.bind(description) |
| 295 |
.bind(&now) |
| 296 |
.execute(&mut *tx) |
| 297 |
.await?; |
| 298 |
|
| 299 |
tx.commit().await?; |
| 300 |
Ok(()) |
| 301 |
} |
| 302 |
|
| 303 |
|
| 304 |
async fn stamp_version(pool: &SqlitePool, version: i64, description: &str) -> Result<()> { |
| 305 |
let now = chrono::Utc::now().to_rfc3339(); |
| 306 |
sqlx::query( |
| 307 |
"INSERT INTO schema_version (version, description, applied_at) VALUES (?, ?, ?)", |
| 308 |
) |
| 309 |
.bind(version) |
| 310 |
.bind(description) |
| 311 |
.bind(&now) |
| 312 |
.execute(pool) |
| 313 |
.await?; |
| 314 |
Ok(()) |
| 315 |
} |
| 316 |
|
| 317 |
|
| 318 |
|
| 319 |
#[instrument(skip_all)] |
| 320 |
pub async fn insert_health_check( |
| 321 |
pool: &SqlitePool, |
| 322 |
snapshot: &HealthSnapshot, |
| 323 |
) -> Result<i64> { |
| 324 |
let status = snapshot.status.to_string(); |
| 325 |
let details_json = snapshot |
| 326 |
.details |
| 327 |
.as_ref() |
| 328 |
.map(|d| serde_json::to_string(d).unwrap_or_default()); |
| 329 |
|
| 330 |
let result = sqlx::query( |
| 331 |
"INSERT INTO health_checks (target, status, checked_at, response_time_ms, details_json, error) |
| 332 |
VALUES (?, ?, ?, ?, ?, ?)", |
| 333 |
) |
| 334 |
.bind(&snapshot.target) |
| 335 |
.bind(&status) |
| 336 |
.bind(&snapshot.checked_at) |
| 337 |
.bind(snapshot.response_time_ms) |
| 338 |
.bind(&details_json) |
| 339 |
.bind(&snapshot.error) |
| 340 |
.execute(pool) |
| 341 |
.await?; |
| 342 |
|
| 343 |
Ok(result.last_insert_rowid()) |
| 344 |
} |
| 345 |
|
| 346 |
#[instrument(skip_all)] |
| 347 |
pub async fn get_health_history( |
| 348 |
pool: &SqlitePool, |
| 349 |
target: Option<&str>, |
| 350 |
limit: i64, |
| 351 |
) -> Result<Vec<HealthSnapshot>> { |
| 352 |
let rows = match target { |
| 353 |
Some(t) => { |
| 354 |
sqlx::query_as::<_, HealthCheckRow>( |
| 355 |
"SELECT id, target, status, checked_at, response_time_ms, details_json, error |
| 356 |
FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT ?", |
| 357 |
) |
| 358 |
.bind(t) |
| 359 |
.bind(limit) |
| 360 |
.fetch_all(pool) |
| 361 |
.await? |
| 362 |
} |
| 363 |
None => { |
| 364 |
sqlx::query_as::<_, HealthCheckRow>( |
| 365 |
"SELECT id, target, status, checked_at, response_time_ms, details_json, error |
| 366 |
FROM health_checks ORDER BY id DESC LIMIT ?", |
| 367 |
) |
| 368 |
.bind(limit) |
| 369 |
.fetch_all(pool) |
| 370 |
.await? |
| 371 |
} |
| 372 |
}; |
| 373 |
|
| 374 |
Ok(rows.into_iter().map(|r| r.into_snapshot()).collect()) |
| 375 |
} |
| 376 |
|
| 377 |
#[instrument(skip_all)] |
| 378 |
pub async fn get_latest_health( |
| 379 |
pool: &SqlitePool, |
| 380 |
target: &str, |
| 381 |
) -> Result<Option<HealthSnapshot>> { |
| 382 |
let row = sqlx::query_as::<_, HealthCheckRow>( |
| 383 |
"SELECT id, target, status, checked_at, response_time_ms, details_json, error |
| 384 |
FROM health_checks WHERE target = ? ORDER BY id DESC LIMIT 1", |
| 385 |
) |
| 386 |
.bind(target) |
| 387 |
.fetch_optional(pool) |
| 388 |
.await?; |
| 389 |
|
| 390 |
Ok(row.map(|r| r.into_snapshot())) |
| 391 |
} |
| 392 |
|
| 393 |
|
| 394 |
|
| 395 |
#[instrument(skip_all)] |
| 396 |
pub async fn insert_test_run( |
| 397 |
pool: &SqlitePool, |
| 398 |
run: &TestRun, |
| 399 |
) -> Result<TestRunId> { |
| 400 |
let summary_json = serde_json::to_string(&run.summary).unwrap_or_default(); |
| 401 |
|
| 402 |
let result = sqlx::query( |
| 403 |
"INSERT INTO test_runs (target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter) |
| 404 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", |
| 405 |
) |
| 406 |
.bind(&run.target) |
| 407 |
.bind(&run.started_at) |
| 408 |
.bind(&run.finished_at) |
| 409 |
.bind(run.duration_secs) |
| 410 |
.bind(run.exit_code) |
| 411 |
.bind(run.passed) |
| 412 |
.bind(&summary_json) |
| 413 |
.bind(&run.raw_output) |
| 414 |
.bind(&run.filter) |
| 415 |
.execute(pool) |
| 416 |
.await?; |
| 417 |
|
| 418 |
Ok(TestRunId(result.last_insert_rowid())) |
| 419 |
} |
| 420 |
|
| 421 |
#[instrument(skip_all)] |
| 422 |
pub async fn get_test_history( |
| 423 |
pool: &SqlitePool, |
| 424 |
target: Option<&str>, |
| 425 |
limit: i64, |
| 426 |
) -> Result<Vec<TestRun>> { |
| 427 |
let rows = match target { |
| 428 |
Some(t) => { |
| 429 |
sqlx::query_as::<_, TestRunRow>( |
| 430 |
"SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter |
| 431 |
FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT ?", |
| 432 |
) |
| 433 |
.bind(t) |
| 434 |
.bind(limit) |
| 435 |
.fetch_all(pool) |
| 436 |
.await? |
| 437 |
} |
| 438 |
None => { |
| 439 |
sqlx::query_as::<_, TestRunRow>( |
| 440 |
"SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter |
| 441 |
FROM test_runs ORDER BY id DESC LIMIT ?", |
| 442 |
) |
| 443 |
.bind(limit) |
| 444 |
.fetch_all(pool) |
| 445 |
.await? |
| 446 |
} |
| 447 |
}; |
| 448 |
|
| 449 |
Ok(rows.into_iter().map(|r| r.into_test_run()).collect()) |
| 450 |
} |
| 451 |
|
| 452 |
#[instrument(skip_all)] |
| 453 |
pub async fn get_latest_test_run( |
| 454 |
pool: &SqlitePool, |
| 455 |
target: &str, |
| 456 |
) -> Result<Option<TestRun>> { |
| 457 |
let row = sqlx::query_as::<_, TestRunRow>( |
| 458 |
"SELECT id, target, started_at, finished_at, duration_secs, exit_code, passed, summary_json, raw_output, filter |
| 459 |
FROM test_runs WHERE target = ? ORDER BY id DESC LIMIT 1", |
| 460 |
) |
| 461 |
.bind(target) |
| 462 |
.fetch_optional(pool) |
| 463 |
.await?; |
| 464 |
|
| 465 |
Ok(row.map(|r| r.into_test_run())) |
| 466 |
} |
| 467 |
|
| 468 |
|
| 469 |
|
| 470 |
|
| 471 |
#[instrument(skip_all)] |
| 472 |
pub async fn insert_test_details( |
| 473 |
pool: &SqlitePool, |
| 474 |
run_id: TestRunId, |
| 475 |
details: &[TestDetail], |
| 476 |
) -> Result<()> { |
| 477 |
for detail in details { |
| 478 |
sqlx::query( |
| 479 |
"INSERT INTO test_details (run_id, test_name, passed) VALUES (?, ?, ?)", |
| 480 |
) |
| 481 |
.bind(run_id.0) |
| 482 |
.bind(&detail.test_name) |
| 483 |
.bind(detail.passed) |
| 484 |
.execute(pool) |
| 485 |
.await?; |
| 486 |
} |
| 487 |
Ok(()) |
| 488 |
} |
| 489 |
|
| 490 |
|
| 491 |
#[instrument(skip_all)] |
| 492 |
pub async fn get_test_regressions( |
| 493 |
pool: &SqlitePool, |
| 494 |
target: &str, |
| 495 |
current_run_id: TestRunId, |
| 496 |
) -> Result<Vec<String>> { |
| 497 |
|
| 498 |
let prev_run = sqlx::query_as::<_, (i64,)>( |
| 499 |
"SELECT id FROM test_runs |
| 500 |
WHERE target = ? AND id < ? |
| 501 |
ORDER BY id DESC LIMIT 1", |
| 502 |
) |
| 503 |
.bind(target) |
| 504 |
.bind(current_run_id.0) |
| 505 |
.fetch_optional(pool) |
| 506 |
.await?; |
| 507 |
|
| 508 |
let Some((prev_id,)) = prev_run else { |
| 509 |
return Ok(vec![]); |
| 510 |
}; |
| 511 |
|
| 512 |
|
| 513 |
let rows = sqlx::query_as::<_, (String,)>( |
| 514 |
"SELECT curr.test_name FROM test_details curr |
| 515 |
INNER JOIN test_details prev ON prev.test_name = curr.test_name AND prev.run_id = ? |
| 516 |
WHERE curr.run_id = ? AND curr.passed = 0 AND prev.passed = 1", |
| 517 |
) |
| 518 |
.bind(prev_id) |
| 519 |
.bind(current_run_id.0) |
| 520 |
.fetch_all(pool) |
| 521 |
.await?; |
| 522 |
|
| 523 |
Ok(rows.into_iter().map(|r| r.0).collect()) |
| 524 |
} |
| 525 |
|
| 526 |
|
| 527 |
#[instrument(skip_all)] |
| 528 |
pub async fn get_test_durations( |
| 529 |
pool: &SqlitePool, |
| 530 |
target: &str, |
| 531 |
limit: i64, |
| 532 |
) -> Result<Vec<(String, i64)>> { |
| 533 |
let rows = sqlx::query_as::<_, (String, i64)>( |
| 534 |
"SELECT started_at, duration_secs FROM test_runs |
| 535 |
WHERE target = ? AND duration_secs IS NOT NULL |
| 536 |
ORDER BY id DESC LIMIT ?", |
| 537 |
) |
| 538 |
.bind(target) |
| 539 |
.bind(limit) |
| 540 |
.fetch_all(pool) |
| 541 |
.await?; |
| 542 |
Ok(rows) |
| 543 |
} |
| 544 |
|
| 545 |
|
| 546 |
#[instrument(skip_all)] |
| 547 |
pub async fn get_version_at_time( |
| 548 |
pool: &SqlitePool, |
| 549 |
target: &str, |
| 550 |
before_rfc3339: &str, |
| 551 |
) -> Result<Option<String>> { |
| 552 |
let row = sqlx::query_as::<_, (Option<String>,)>( |
| 553 |
"SELECT details_json FROM health_checks |
| 554 |
WHERE target = ? AND checked_at <= ? |
| 555 |
ORDER BY checked_at DESC LIMIT 1", |
| 556 |
) |
| 557 |
.bind(target) |
| 558 |
.bind(before_rfc3339) |
| 559 |
.fetch_optional(pool) |
| 560 |
.await?; |
| 561 |
|
| 562 |
let version = row |
| 563 |
.and_then(|r| r.0) |
| 564 |
.and_then(|json_str| serde_json::from_str::<serde_json::Value>(&json_str).ok()) |
| 565 |
.and_then(|json| json.get("version").and_then(|v| v.as_str()).map(String::from)); |
| 566 |
|
| 567 |
Ok(version) |
| 568 |
} |
| 569 |
|
| 570 |
|
| 571 |
|
| 572 |
#[instrument(skip_all)] |
| 573 |
pub async fn get_uptime_percent( |
| 574 |
pool: &SqlitePool, |
| 575 |
target: &str, |
| 576 |
hours: i64, |
| 577 |
) -> Result<Option<f64>> { |
| 578 |
let cutoff = chrono::Utc::now() - chrono::Duration::hours(hours); |
| 579 |
let cutoff_str = cutoff.to_rfc3339(); |
| 580 |
|
| 581 |
let row = sqlx::query_as::<_, (i64, i64)>( |
| 582 |
"SELECT |
| 583 |
COUNT(*) as total, |
| 584 |
SUM(CASE WHEN status = 'operational' THEN 1 ELSE 0 END) as operational |
| 585 |
FROM health_checks |
| 586 |
WHERE target = ? AND checked_at >= ?", |
| 587 |
) |
| 588 |
.bind(target) |
| 589 |
.bind(&cutoff_str) |
| 590 |
.fetch_one(pool) |
| 591 |
.await?; |
| 592 |
|
| 593 |
if row.0 == 0 { |
| 594 |
Ok(None) |
| 595 |
} else { |
| 596 |
Ok(Some(row.1 as f64 / row.0 as f64 * 100.0)) |
| 597 |
} |
| 598 |
} |
| 599 |
|
| 600 |
|
| 601 |
|
| 602 |
|
| 603 |
#[instrument(skip_all)] |
| 604 |
pub async fn get_response_times( |
| 605 |
pool: &SqlitePool, |
| 606 |
target: &str, |
| 607 |
since_rfc3339: &str, |
| 608 |
) -> Result<Vec<(String, i64)>> { |
| 609 |
let rows = sqlx::query_as::<_, (String, i64)>( |
| 610 |
"SELECT checked_at, response_time_ms FROM health_checks |
| 611 |
WHERE target = ? AND checked_at >= ? |
| 612 |
ORDER BY checked_at ASC", |
| 613 |
) |
| 614 |
.bind(target) |
| 615 |
.bind(since_rfc3339) |
| 616 |
.fetch_all(pool) |
| 617 |
.await?; |
| 618 |
Ok(rows) |
| 619 |
} |
| 620 |
|
| 621 |
|
| 622 |
#[instrument(skip_all)] |
| 623 |
pub async fn get_recent_response_times( |
| 624 |
pool: &SqlitePool, |
| 625 |
target: &str, |
| 626 |
count: i64, |
| 627 |
) -> Result<Vec<i64>> { |
| 628 |
let rows = sqlx::query_as::<_, (i64,)>( |
| 629 |
"SELECT response_time_ms FROM health_checks |
| 630 |
WHERE target = ? AND status = 'operational' |
| 631 |
ORDER BY id DESC LIMIT ?", |
| 632 |
) |
| 633 |
.bind(target) |
| 634 |
.bind(count) |
| 635 |
.fetch_all(pool) |
| 636 |
.await?; |
| 637 |
Ok(rows.into_iter().map(|r| r.0).collect()) |
| 638 |
} |
| 639 |
|
| 640 |
|
| 641 |
|
| 642 |
/// One row of the `alerts` table, as read back by `get_latest_alert_for_target`.
#[derive(Debug, sqlx::FromRow)] |
| 643 |
pub struct AlertRow { |
| 644 |
pub id: i64, |
| 645 |
pub target: String, |
| 646 |
pub alert_type: String, |
| 647 |
// Nullable in the schema; `insert_alert` accepts them as optional.
pub from_status: Option<String>, |
| 648 |
pub to_status: Option<String>, |
| 649 |
// Written by `insert_alert` as the current UTC time in RFC 3339 form.
pub sent_at: String, |
| 650 |
pub error: Option<String>, |
| 651 |
} |
| 652 |
|
| 653 |
#[instrument(skip_all)] |
| 654 |
pub async fn insert_alert( |
| 655 |
pool: &SqlitePool, |
| 656 |
target: &str, |
| 657 |
alert_type: &str, |
| 658 |
from_status: Option<&str>, |
| 659 |
to_status: Option<&str>, |
| 660 |
error: Option<&str>, |
| 661 |
) -> Result<i64> { |
| 662 |
let now = chrono::Utc::now().to_rfc3339(); |
| 663 |
let result = sqlx::query( |
| 664 |
"INSERT INTO alerts (target, alert_type, from_status, to_status, sent_at, error) |
| 665 |
VALUES (?, ?, ?, ?, ?, ?)", |
| 666 |
) |
| 667 |
.bind(target) |
| 668 |
.bind(alert_type) |
| 669 |
.bind(from_status) |
| 670 |
.bind(to_status) |
| 671 |
.bind(&now) |
| 672 |
.bind(error) |
| 673 |
.execute(pool) |
| 674 |
.await?; |
| 675 |
Ok(result.last_insert_rowid()) |
| 676 |
} |
| 677 |
|
| 678 |
#[instrument(skip_all)] |
| 679 |
pub async fn get_latest_alert_for_target( |
| 680 |
pool: &SqlitePool, |
| 681 |
target: &str, |
| 682 |
) -> Result<Option<AlertRow>> { |
| 683 |
Ok(sqlx::query_as::<_, AlertRow>( |
| 684 |
"SELECT id, target, alert_type, from_status, to_status, sent_at, error |
| 685 |
FROM alerts WHERE target = ? AND alert_type NOT LIKE '%recovery%' |
| 686 |
ORDER BY id DESC LIMIT 1", |
| 687 |
) |
| 688 |
.bind(target) |
| 689 |
.fetch_optional(pool) |
| 690 |
.await?) |
| 691 |
} |
| 692 |
|
| 693 |
|
| 694 |
|
| 695 |
/// One row of the `tls_checks` table, as read back by `get_latest_tls_check`.
#[derive(Debug, sqlx::FromRow, serde::Serialize)] |
| 696 |
pub struct TlsCheckRow { |
| 697 |
pub id: i64, |
| 698 |
pub target: String, |
| 699 |
pub host: String, |
| 700 |
// Stored as an INTEGER column in the schema.
pub valid: bool, |
| 701 |
pub days_remaining: i64, |
| 702 |
pub not_before: String, |
| 703 |
pub not_after: String, |
| 704 |
pub subject: String, |
| 705 |
pub issuer: String, |
| 706 |
pub checked_at: String, |
| 707 |
pub error: Option<String>, |
| 708 |
} |
| 709 |
|
| 710 |
#[instrument(skip_all)] |
| 711 |
pub async fn insert_tls_check( |
| 712 |
pool: &SqlitePool, |
| 713 |
status: &TlsStatus, |
| 714 |
) -> Result<i64> { |
| 715 |
let result = sqlx::query( |
| 716 |
"INSERT INTO tls_checks (target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error) |
| 717 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", |
| 718 |
) |
| 719 |
.bind(&status.target) |
| 720 |
.bind(&status.host) |
| 721 |
.bind(status.valid) |
| 722 |
.bind(status.days_remaining) |
| 723 |
.bind(&status.not_before) |
| 724 |
.bind(&status.not_after) |
| 725 |
.bind(&status.subject) |
| 726 |
.bind(&status.issuer) |
| 727 |
.bind(&status.checked_at) |
| 728 |
.bind(&status.error) |
| 729 |
.execute(pool) |
| 730 |
.await?; |
| 731 |
|
| 732 |
Ok(result.last_insert_rowid()) |
| 733 |
} |
| 734 |
|
| 735 |
#[instrument(skip_all)] |
| 736 |
pub async fn get_latest_tls_check( |
| 737 |
pool: &SqlitePool, |
| 738 |
target: &str, |
| 739 |
) -> Result<Option<TlsCheckRow>> { |
| 740 |
Ok(sqlx::query_as::<_, TlsCheckRow>( |
| 741 |
"SELECT id, target, host, valid, days_remaining, not_before, not_after, subject, issuer, checked_at, error |
| 742 |
FROM tls_checks WHERE target = ? ORDER BY id DESC LIMIT 1", |
| 743 |
) |
| 744 |
.bind(target) |
| 745 |
.fetch_optional(pool) |
| 746 |
.await?) |
| 747 |
} |
| 748 |
|
| 749 |
|
| 750 |
|
| 751 |
/// One row of the `incidents` table.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] |
| 752 |
pub struct IncidentRow { |
| 753 |
pub id: i64, |
| 754 |
pub target: String, |
| 755 |
// Written by `insert_incident` as the current UTC time in RFC 3339 form.
pub started_at: String, |
| 756 |
// Not set by `insert_incident`; filled in by `close_open_incidents`.
pub ended_at: Option<String>, |
| 757 |
pub duration_secs: Option<i64>, |
| 758 |
pub from_status: String, |
| 759 |
pub to_status: String, |
| 760 |
} |
| 761 |
|
| 762 |
#[instrument(skip_all)] |
| 763 |
pub async fn insert_incident( |
| 764 |
pool: &SqlitePool, |
| 765 |
target: &str, |
| 766 |
from_status: &str, |
| 767 |
to_status: &str, |
| 768 |
) -> Result<i64> { |
| 769 |
let now = chrono::Utc::now().to_rfc3339(); |
| 770 |
let result = sqlx::query( |
| 771 |
"INSERT INTO incidents (target, started_at, from_status, to_status) |
| 772 |
VALUES (?, ?, ?, ?)", |
| 773 |
) |
| 774 |
.bind(target) |
| 775 |
.bind(&now) |
| 776 |
.bind(from_status) |
| 777 |
.bind(to_status) |
| 778 |
.execute(pool) |
| 779 |
.await?; |
| 780 |
Ok(result.last_insert_rowid()) |
| 781 |
} |
| 782 |
|
| 783 |
#[instrument(skip_all)] |
| 784 |
pub async fn close_open_incidents( |
| 785 |
pool: &SqlitePool, |
| 786 |
target: &str, |
| 787 |
) -> Result<u64> { |
| 788 |
let now = chrono::Utc::now().to_rfc3339(); |
| 789 |
let result = sqlx::query( |
| 790 |
"UPDATE incidents SET ended_at = ?, duration_secs = CAST((julianday(?) - julianday(started_at)) * 86400 AS INTEGER) |
| 791 |
WHERE target = ? AND ended_at IS NULL", |
| 792 |
) |
| 793 |
.bind(&now) |
| 794 |
.bind(&now) |
| 795 |
.bind(target) |
| 796 |
.execute(pool) |
| 797 |
.await?; |
| 798 |
Ok(result.rows_affected()) |
| 799 |
} |
| 800 |
|
| 801 |
#[instrument(skip_all)] |
| 802 |
pub async fn get_open_incident( |
| 803 |
pool: &SqlitePool, |
| 804 |
target: &str, |
| 805 |
) -> Result<Option<IncidentRow>> { |
| 806 |
Ok(sqlx::query_as::<_, IncidentRow>( |
| 807 |
"SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status |
| 808 |
FROM incidents WHERE target = ? AND ended_at IS NULL ORDER BY id DESC LIMIT 1", |
| 809 |
) |
| 810 |
.bind(target) |
| 811 |
.fetch_optional(pool) |
| 812 |
.await?) |
| 813 |
} |
| 814 |
|
| 815 |
#[instrument(skip_all)] |
| 816 |
pub async fn get_recent_incidents( |
| 817 |
pool: &SqlitePool, |
| 818 |
target: &str, |
| 819 |
limit: i64, |
| 820 |
) -> Result<Vec<IncidentRow>> { |
| 821 |
Ok(sqlx::query_as::<_, IncidentRow>( |
| 822 |
"SELECT id, target, started_at, ended_at, duration_secs, from_status, to_status |
| 823 |
FROM incidents WHERE target = ? ORDER BY id DESC LIMIT ?", |
| 824 |
) |
| 825 |
.bind(target) |
| 826 |
.bind(limit) |
| 827 |
.fetch_all(pool) |
| 828 |
.await?) |
| 829 |
} |
| 830 |
|
| 831 |
|
| 832 |
|
| 833 |
/// One row of the `route_checks` table, as read back by `get_latest_route_checks`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] |
| 834 |
pub struct RouteCheckRow { |
| 835 |
pub id: i64, |
| 836 |
pub target: String, |
| 837 |
pub path: String, |
| 838 |
pub status_code: i64, |
| 839 |
// Stored as an INTEGER column in the schema.
pub ok: bool, |
| 840 |
pub response_time_ms: i64, |
| 841 |
pub checked_at: String, |
| 842 |
pub error: Option<String>, |
| 843 |
} |
| 844 |
|
| 845 |
#[instrument(skip_all)] |
| 846 |
pub async fn insert_route_check( |
| 847 |
pool: &SqlitePool, |
| 848 |
result: &crate::checks::routes::RouteCheckResult, |
| 849 |
) -> Result<i64> { |
| 850 |
let row = sqlx::query( |
| 851 |
"INSERT INTO route_checks (target, path, status_code, ok, response_time_ms, checked_at, error) |
| 852 |
VALUES (?, ?, ?, ?, ?, ?, ?)", |
| 853 |
) |
| 854 |
.bind(&result.target) |
| 855 |
.bind(&result.path) |
| 856 |
.bind(result.status_code as i64) |
| 857 |
.bind(result.ok) |
| 858 |
.bind(result.response_time_ms) |
| 859 |
.bind(&result.checked_at) |
| 860 |
.bind(&result.error) |
| 861 |
.execute(pool) |
| 862 |
.await?; |
| 863 |
Ok(row.last_insert_rowid()) |
| 864 |
} |
| 865 |
|
| 866 |
|
| 867 |
#[instrument(skip_all)] |
| 868 |
pub async fn get_latest_route_checks( |
| 869 |
pool: &SqlitePool, |
| 870 |
target: &str, |
| 871 |
) -> Result<Vec<RouteCheckRow>> { |
| 872 |
Ok(sqlx::query_as::<_, RouteCheckRow>( |
| 873 |
"SELECT r.id, r.target, r.path, r.status_code, r.ok, r.response_time_ms, r.checked_at, r.error |
| 874 |
FROM route_checks r |
| 875 |
INNER JOIN (SELECT path, MAX(id) as max_id FROM route_checks WHERE target = ? GROUP BY path) latest |
| 876 |
ON r.id = latest.max_id |
| 877 |
ORDER BY r.path", |
| 878 |
) |
| 879 |
.bind(target) |
| 880 |
.fetch_all(pool) |
| 881 |
.await?) |
| 882 |
} |
| 883 |
|
| 884 |
|
| 885 |
|
| 886 |
/// One row of the `dns_checks` table, as read back by `get_latest_dns_checks`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] |
| 887 |
pub struct DnsCheckRow { |
| 888 |
pub id: i64, |
| 889 |
pub target: String, |
| 890 |
pub name: String, |
| 891 |
pub record_type: String, |
| 892 |
// `insert_dns_check` writes `expected` and `actual` as JSON text.
pub expected: String, |
| 893 |
pub actual: String, |
| 894 |
pub matches: bool, |
| 895 |
pub checked_at: String, |
| 896 |
pub error: Option<String>, |
| 897 |
} |
| 898 |
|
| 899 |
#[instrument(skip_all)] |
| 900 |
pub async fn insert_dns_check( |
| 901 |
pool: &SqlitePool, |
| 902 |
result: &DnsCheckResult, |
| 903 |
) -> Result<i64> { |
| 904 |
let expected = serde_json::to_string(&result.expected).unwrap_or_default(); |
| 905 |
let actual = serde_json::to_string(&result.actual).unwrap_or_default(); |
| 906 |
let record_type_str = result.record_type.to_string(); |
| 907 |
|
| 908 |
let row = sqlx::query( |
| 909 |
"INSERT INTO dns_checks (target, name, record_type, expected, actual, matches, checked_at, error) |
| 910 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?)", |
| 911 |
) |
| 912 |
.bind(&result.target) |
| 913 |
.bind(&result.name) |
| 914 |
.bind(&record_type_str) |
| 915 |
.bind(&expected) |
| 916 |
.bind(&actual) |
| 917 |
.bind(result.matches) |
| 918 |
.bind(&result.checked_at) |
| 919 |
.bind(&result.error) |
| 920 |
.execute(pool) |
| 921 |
.await?; |
| 922 |
Ok(row.last_insert_rowid()) |
| 923 |
} |
| 924 |
|
| 925 |
|
| 926 |
#[instrument(skip_all)] |
| 927 |
pub async fn get_latest_dns_checks( |
| 928 |
pool: &SqlitePool, |
| 929 |
target: &str, |
| 930 |
) -> Result<Vec<DnsCheckRow>> { |
| 931 |
Ok(sqlx::query_as::<_, DnsCheckRow>( |
| 932 |
"SELECT d.id, d.target, d.name, d.record_type, d.expected, d.actual, d.matches, d.checked_at, d.error |
| 933 |
FROM dns_checks d |
| 934 |
INNER JOIN (SELECT name, record_type, MAX(id) as max_id FROM dns_checks WHERE target = ? GROUP BY name, record_type) latest |
| 935 |
ON d.id = latest.max_id |
| 936 |
ORDER BY d.name, d.record_type", |
| 937 |
) |
| 938 |
.bind(target) |
| 939 |
.fetch_all(pool) |
| 940 |
.await?) |
| 941 |
} |
| 942 |
|
| 943 |
|
| 944 |
|
| 945 |
/// One row of the `whois_checks` table, as read back by `get_latest_whois_check`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] |
| 946 |
pub struct WhoisCheckRow { |
| 947 |
pub id: i64, |
| 948 |
pub target: String, |
| 949 |
pub domain: String, |
| 950 |
pub registrar: Option<String>, |
| 951 |
pub expiry_date: Option<String>, |
| 952 |
pub days_remaining: Option<i64>, |
| 953 |
// `insert_whois_check` writes this column as JSON text.
pub nameservers: Option<String>, |
| 954 |
pub checked_at: String, |
| 955 |
pub error: Option<String>, |
| 956 |
} |
| 957 |
|
| 958 |
#[instrument(skip_all)] |
| 959 |
pub async fn insert_whois_check( |
| 960 |
pool: &SqlitePool, |
| 961 |
result: &WhoisResult, |
| 962 |
) -> Result<i64> { |
| 963 |
let nameservers = serde_json::to_string(&result.nameservers).unwrap_or_default(); |
| 964 |
|
| 965 |
let row = sqlx::query( |
| 966 |
"INSERT INTO whois_checks (target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error) |
| 967 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?)", |
| 968 |
) |
| 969 |
.bind(&result.target) |
| 970 |
.bind(&result.domain) |
| 971 |
.bind(&result.registrar) |
| 972 |
.bind(&result.expiry_date) |
| 973 |
.bind(result.days_remaining) |
| 974 |
.bind(&nameservers) |
| 975 |
.bind(&result.checked_at) |
| 976 |
.bind(&result.error) |
| 977 |
.execute(pool) |
| 978 |
.await?; |
| 979 |
Ok(row.last_insert_rowid()) |
| 980 |
} |
| 981 |
|
| 982 |
#[instrument(skip_all)] |
| 983 |
pub async fn get_latest_whois_check( |
| 984 |
pool: &SqlitePool, |
| 985 |
target: &str, |
| 986 |
) -> Result<Option<WhoisCheckRow>> { |
| 987 |
Ok(sqlx::query_as::<_, WhoisCheckRow>( |
| 988 |
"SELECT id, target, domain, registrar, expiry_date, days_remaining, nameservers, checked_at, error |
| 989 |
FROM whois_checks WHERE target = ? ORDER BY id DESC LIMIT 1", |
| 990 |
) |
| 991 |
.bind(target) |
| 992 |
.fetch_optional(pool) |
| 993 |
.await?) |
| 994 |
} |
| 995 |
|
| 996 |
|
| 997 |
|
| 998 |
#[instrument(skip_all)] |
| 999 |
pub async fn insert_cors_check( |
| 1000 |
pool: &SqlitePool, |
| 1001 |
result: &CorsCheckResult, |
| 1002 |
) -> Result<i64> { |
| 1003 |
let row = sqlx::query( |
| 1004 |
"INSERT INTO cors_checks (target, url, origin, method, passes, checked_at, error) |
| 1005 |
VALUES (?, ?, ?, ?, ?, ?, ?)", |
| 1006 |
) |
| 1007 |
.bind(&result.target) |
| 1008 |
.bind(&result.url) |
| 1009 |
.bind(&result.origin) |
| 1010 |
.bind(&result.method) |
| 1011 |
.bind(result.passes) |
| 1012 |
.bind(&result.checked_at) |
| 1013 |
.bind(&result.error) |
| 1014 |
.execute(pool) |
| 1015 |
.await?; |
| 1016 |
Ok(row.last_insert_rowid()) |
| 1017 |
} |
| 1018 |
|
| 1019 |
|
| 1020 |
#[instrument(skip_all)] |
| 1021 |
pub async fn get_latest_cors_checks( |
| 1022 |
pool: &SqlitePool, |
| 1023 |
target: &str, |
| 1024 |
) -> Result<Vec<CorsCheckResult>> { |
| 1025 |
let rows = sqlx::query_as::<_, (String, String, String, String, bool, String, Option<String>)>( |
| 1026 |
"SELECT c.target, c.url, c.origin, c.method, c.passes, c.checked_at, c.error |
| 1027 |
FROM cors_checks c |
| 1028 |
INNER JOIN (SELECT url, MAX(id) as max_id FROM cors_checks WHERE target = ? GROUP BY url) latest |
| 1029 |
ON c.id = latest.max_id |
| 1030 |
ORDER BY c.url", |
| 1031 |
) |
| 1032 |
.bind(target) |
| 1033 |
.fetch_all(pool) |
| 1034 |
.await?; |
| 1035 |
|
| 1036 |
Ok(rows |
| 1037 |
.into_iter() |
| 1038 |
.map(|(target, url, origin, method, passes, checked_at, error)| CorsCheckResult { |
| 1039 |
target, |
| 1040 |
url, |
| 1041 |
origin, |
| 1042 |
method, |
| 1043 |
passes, |
| 1044 |
checked_at, |
| 1045 |
error, |
| 1046 |
}) |
| 1047 |
.collect()) |
| 1048 |
} |
| 1049 |
|
| 1050 |
|
| 1051 |
|
| 1052 |
#[instrument(skip_all)] |
| 1053 |
pub async fn insert_backup_check( |
| 1054 |
pool: &SqlitePool, |
| 1055 |
result: &BackupCheckResult, |
| 1056 |
) -> Result<i64> { |
| 1057 |
let row = sqlx::query( |
| 1058 |
"INSERT INTO backup_checks (target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error) |
| 1059 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?)", |
| 1060 |
) |
| 1061 |
.bind(&result.target) |
| 1062 |
.bind(&result.database_name) |
| 1063 |
.bind(&result.status) |
| 1064 |
.bind(&result.last_backup_at) |
| 1065 |
.bind(result.size_bytes) |
| 1066 |
.bind(result.age_hours) |
| 1067 |
.bind(&result.checked_at) |
| 1068 |
.bind(&result.error) |
| 1069 |
.execute(pool) |
| 1070 |
.await?; |
| 1071 |
Ok(row.last_insert_rowid()) |
| 1072 |
} |
| 1073 |
|
| 1074 |
|
| 1075 |
#[instrument(skip_all)] |
| 1076 |
pub async fn get_latest_backup_check( |
| 1077 |
pool: &SqlitePool, |
| 1078 |
target: &str, |
| 1079 |
database_name: &str, |
| 1080 |
) -> Result<Option<BackupCheckRow>> { |
| 1081 |
Ok(sqlx::query_as::<_, BackupCheckRow>( |
| 1082 |
"SELECT id, target, database_name, status, last_backup_at, size_bytes, age_hours, checked_at, error |
| 1083 |
FROM backup_checks WHERE target = ? AND database_name = ? ORDER BY id DESC LIMIT 1", |
| 1084 |
) |
| 1085 |
.bind(target) |
| 1086 |
.bind(database_name) |
| 1087 |
.fetch_optional(pool) |
| 1088 |
.await?) |
| 1089 |
} |
| 1090 |
|
| 1091 |
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)] |
| 1092 |
pub struct BackupCheckRow { |
| 1093 |
pub id: i64, |
| 1094 |
pub target: String, |
| 1095 |
pub database_name: String, |
| 1096 |
pub status: String, |
| 1097 |
pub last_backup_at: Option<String>, |
| 1098 |
pub size_bytes: Option<i64>, |
| 1099 |
pub age_hours: Option<i64>, |
| 1100 |
pub checked_at: String, |
| 1101 |
pub error: Option<String>, |
| 1102 |
} |
| 1103 |
|
| 1104 |
|
| 1105 |
|
| 1106 |
|
| 1107 |
#[instrument(skip_all)] |
| 1108 |
pub async fn prune_stale_routes( |
| 1109 |
pool: &SqlitePool, |
| 1110 |
target: &str, |
| 1111 |
expected_routes: &[String], |
| 1112 |
) -> Result<u64> { |
| 1113 |
if expected_routes.is_empty() { |
| 1114 |
|
| 1115 |
let result = sqlx::query("DELETE FROM route_checks WHERE target = ?") |
| 1116 |
.bind(target) |
| 1117 |
.execute(pool) |
| 1118 |
.await?; |
| 1119 |
return Ok(result.rows_affected()); |
| 1120 |
} |
| 1121 |
|
| 1122 |
|
| 1123 |
let placeholders: Vec<&str> = expected_routes.iter().map(|_| "?").collect(); |
| 1124 |
let sql = format!( |
| 1125 |
"DELETE FROM route_checks WHERE target = ? AND path NOT IN ({})", |
| 1126 |
placeholders.join(", ") |
| 1127 |
); |
| 1128 |
let mut query = sqlx::query(&sql).bind(target); |
| 1129 |
for route in expected_routes { |
| 1130 |
query = query.bind(route); |
| 1131 |
} |
| 1132 |
let result = query.execute(pool).await?; |
| 1133 |
Ok(result.rows_affected()) |
| 1134 |
} |
| 1135 |
|
| 1136 |
|
| 1137 |
#[instrument(skip_all)] |
| 1138 |
pub async fn prune_stale_dns( |
| 1139 |
pool: &SqlitePool, |
| 1140 |
target: &str, |
| 1141 |
expected_dns: &[(String, String)], |
| 1142 |
) -> Result<u64> { |
| 1143 |
if expected_dns.is_empty() { |
| 1144 |
|
| 1145 |
let result = sqlx::query("DELETE FROM dns_checks WHERE target = ?") |
| 1146 |
.bind(target) |
| 1147 |
.execute(pool) |
| 1148 |
.await?; |
| 1149 |
return Ok(result.rows_affected()); |
| 1150 |
} |
| 1151 |
|
| 1152 |
|
| 1153 |
let conditions: Vec<String> = expected_dns |
| 1154 |
.iter() |
| 1155 |
.map(|_| "(name = ? AND record_type = ?)".to_string()) |
| 1156 |
.collect(); |
| 1157 |
let sql = format!( |
| 1158 |
"DELETE FROM dns_checks WHERE target = ? AND NOT ({})", |
| 1159 |
conditions.join(" OR ") |
| 1160 |
); |
| 1161 |
let mut query = sqlx::query(&sql).bind(target); |
| 1162 |
for (name, record_type) in expected_dns { |
| 1163 |
query = query.bind(name).bind(record_type); |
| 1164 |
} |
| 1165 |
let result = query.execute(pool).await?; |
| 1166 |
Ok(result.rows_affected()) |
| 1167 |
} |
| 1168 |
|
| 1169 |
|
| 1170 |
|
| 1171 |
|
| 1172 |
/// Per-table count of rows deleted by `prune_old_records`.
///
/// `Default` yields the "nothing pruned" value (every counter zero).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PruneResult {
    /// Rows deleted from `health_checks`.
    pub health: u64,
    /// Rows deleted from `test_runs`.
    pub tests: u64,
    /// Rows deleted from `test_details` (those without a surviving run).
    pub test_details: u64,
    /// Rows deleted from `peer_heartbeats`.
    pub heartbeats: u64,
    /// Rows deleted from `alerts`.
    pub alerts: u64,
    /// Rows deleted from `tls_checks`.
    pub tls: u64,
    /// Rows deleted from `incidents` (only ended incidents are pruned).
    pub incidents: u64,
    /// Rows deleted from `route_checks`.
    pub routes: u64,
    /// Rows deleted from `dns_checks`.
    pub dns: u64,
    /// Rows deleted from `whois_checks`.
    pub whois: u64,
    /// Rows deleted from `backup_checks`.
    pub backups: u64,
}
| 1185 |
|
| 1186 |
|
| 1187 |
|
| 1188 |
#[instrument(skip_all)] |
| 1189 |
pub async fn prune_old_records( |
| 1190 |
pool: &SqlitePool, |
| 1191 |
days: i64, |
| 1192 |
) -> Result<PruneResult> { |
| 1193 |
|
| 1194 |
|
| 1195 |
if days <= 0 { |
| 1196 |
return Ok(PruneResult { health: 0, tests: 0, test_details: 0, heartbeats: 0, alerts: 0, tls: 0, incidents: 0, routes: 0, dns: 0, whois: 0, backups: 0 }); |
| 1197 |
} |
| 1198 |
|
| 1199 |
let cutoff = chrono::Utc::now() - chrono::Duration::days(days); |
| 1200 |
let cutoff_str = cutoff.to_rfc3339(); |
| 1201 |
|
| 1202 |
let health_result = sqlx::query("DELETE FROM health_checks WHERE checked_at < ?") |
| 1203 |
.bind(&cutoff_str) |
| 1204 |
.execute(pool) |
| 1205 |
.await?; |
| 1206 |
|
| 1207 |
let test_result = sqlx::query("DELETE FROM test_runs WHERE started_at < ?") |
| 1208 |
.bind(&cutoff_str) |
| 1209 |
.execute(pool) |
| 1210 |
.await?; |
| 1211 |
|
| 1212 |
|
| 1213 |
let test_details_result = sqlx::query( |
| 1214 |
"DELETE FROM test_details WHERE run_id NOT IN (SELECT id FROM test_runs)", |
| 1215 |
) |
| 1216 |
.execute(pool) |
| 1217 |
.await?; |
| 1218 |
|
| 1219 |
let peer_hb_result = sqlx::query("DELETE FROM peer_heartbeats WHERE checked_at < ?") |
| 1220 |
.bind(&cutoff_str) |
| 1221 |
.execute(pool) |
| 1222 |
.await?; |
| 1223 |
|
| 1224 |
let alerts_result = sqlx::query("DELETE FROM alerts WHERE sent_at < ?") |
| 1225 |
.bind(&cutoff_str) |
| 1226 |
.execute(pool) |
| 1227 |
.await?; |
| 1228 |
|
| 1229 |
let tls_result = sqlx::query("DELETE FROM tls_checks WHERE checked_at < ?") |
| 1230 |
.bind(&cutoff_str) |
| 1231 |
.execute(pool) |
| 1232 |
.await?; |
| 1233 |
|
| 1234 |
let incidents_result = sqlx::query("DELETE FROM incidents WHERE ended_at IS NOT NULL AND ended_at < ?") |
| 1235 |
.bind(&cutoff_str) |
| 1236 |
.execute(pool) |
| 1237 |
.await?; |
| 1238 |
|
| 1239 |
let routes_result = sqlx::query("DELETE FROM route_checks WHERE checked_at < ?") |
| 1240 |
.bind(&cutoff_str) |
| 1241 |
.execute(pool) |
| 1242 |
.await?; |
| 1243 |
|
| 1244 |
let dns_result = sqlx::query("DELETE FROM dns_checks WHERE checked_at < ?") |
| 1245 |
.bind(&cutoff_str) |
| 1246 |
.execute(pool) |
| 1247 |
.await?; |
| 1248 |
|
| 1249 |
let whois_result = sqlx::query("DELETE FROM whois_checks WHERE checked_at < ?") |
| 1250 |
.bind(&cutoff_str) |
| 1251 |
.execute(pool) |
| 1252 |
.await?; |
| 1253 |
|
| 1254 |
let backups_result = sqlx::query("DELETE FROM backup_checks WHERE checked_at < ?") |
| 1255 |
.bind(&cutoff_str) |
| 1256 |
.execute(pool) |
| 1257 |
.await?; |
| 1258 |
|
| 1259 |
Ok(PruneResult { |
| 1260 |
health: health_result.rows_affected(), |
| 1261 |
tests: test_result.rows_affected(), |
| 1262 |
test_details: test_details_result.rows_affected(), |
| 1263 |
heartbeats: peer_hb_result.rows_affected(), |
| 1264 |
alerts: alerts_result.rows_affected(), |
| 1265 |
tls: tls_result.rows_affected(), |
| 1266 |
incidents: incidents_result.rows_affected(), |
| 1267 |
routes: routes_result.rows_affected(), |
| 1268 |
dns: dns_result.rows_affected(), |
| 1269 |
whois: whois_result.rows_affected(), |
| 1270 |
backups: backups_result.rows_affected(), |
| 1271 |
}) |
| 1272 |
} |
| 1273 |
|
| 1274 |
|
| 1275 |
|
| 1276 |
|
| 1277 |
#[instrument(skip_all)] |
| 1278 |
pub async fn store_peer_identity( |
| 1279 |
pool: &SqlitePool, |
| 1280 |
peer_name: &str, |
| 1281 |
instance_id: &str, |
| 1282 |
) -> Result<()> { |
| 1283 |
let now = chrono::Utc::now().to_rfc3339(); |
| 1284 |
sqlx::query( |
| 1285 |
"INSERT OR IGNORE INTO peer_identities (peer_name, instance_id, first_seen) |
| 1286 |
VALUES (?, ?, ?)", |
| 1287 |
) |
| 1288 |
.bind(peer_name) |
| 1289 |
.bind(instance_id) |
| 1290 |
.bind(&now) |
| 1291 |
.execute(pool) |
| 1292 |
.await?; |
| 1293 |
Ok(()) |
| 1294 |
} |
| 1295 |
|
| 1296 |
|
| 1297 |
#[instrument(skip_all)] |
| 1298 |
pub async fn update_peer_identity( |
| 1299 |
pool: &SqlitePool, |
| 1300 |
peer_name: &str, |
| 1301 |
instance_id: &str, |
| 1302 |
) -> Result<()> { |
| 1303 |
let now = chrono::Utc::now().to_rfc3339(); |
| 1304 |
sqlx::query( |
| 1305 |
"UPDATE peer_identities SET instance_id = ?, first_seen = ? |
| 1306 |
WHERE peer_name = ?", |
| 1307 |
) |
| 1308 |
.bind(instance_id) |
| 1309 |
.bind(&now) |
| 1310 |
.bind(peer_name) |
| 1311 |
.execute(pool) |
| 1312 |
.await?; |
| 1313 |
Ok(()) |
| 1314 |
} |
| 1315 |
|
| 1316 |
|
| 1317 |
#[instrument(skip_all)] |
| 1318 |
pub async fn get_peer_identity( |
| 1319 |
pool: &SqlitePool, |
| 1320 |
peer_name: &str, |
| 1321 |
) -> Result<Option<String>> { |
| 1322 |
let row = sqlx::query_as::<_, (String,)>( |
| 1323 |
"SELECT instance_id FROM peer_identities WHERE peer_name = ?", |
| 1324 |
) |
| 1325 |
.bind(peer_name) |
| 1326 |
.fetch_optional(pool) |
| 1327 |
.await?; |
| 1328 |
Ok(row.map(|r| r.0)) |
| 1329 |
} |
| 1330 |
|
| 1331 |
|
| 1332 |
|
| 1333 |
#[instrument(skip_all)] |
| 1334 |
pub async fn insert_peer_heartbeat( |
| 1335 |
pool: &SqlitePool, |
| 1336 |
peer_name: &str, |
| 1337 |
status: &str, |
| 1338 |
latency_ms: i64, |
| 1339 |
) -> Result<i64> { |
| 1340 |
let now = chrono::Utc::now().to_rfc3339(); |
| 1341 |
let result = sqlx::query( |
| 1342 |
"INSERT INTO peer_heartbeats (peer_name, status, latency_ms, checked_at) |
| 1343 |
VALUES (?, ?, ?, ?)", |
| 1344 |
) |
| 1345 |
.bind(peer_name) |
| 1346 |
.bind(status) |
| 1347 |
.bind(latency_ms) |
| 1348 |
.bind(&now) |
| 1349 |
.execute(pool) |
| 1350 |
.await?; |
| 1351 |
Ok(result.last_insert_rowid()) |
| 1352 |
} |
| 1353 |
|
| 1354 |
#[instrument(skip_all)] |
| 1355 |
pub async fn get_peer_heartbeat_history( |
| 1356 |
pool: &SqlitePool, |
| 1357 |
peer_name: &str, |
| 1358 |
limit: i64, |
| 1359 |
) -> Result<Vec<PeerHeartbeatRow>> { |
| 1360 |
Ok(sqlx::query_as::<_, PeerHeartbeatRow>( |
| 1361 |
"SELECT id, peer_name, status, latency_ms, checked_at |
| 1362 |
FROM peer_heartbeats WHERE peer_name = ? ORDER BY id DESC LIMIT ?", |
| 1363 |
) |
| 1364 |
.bind(peer_name) |
| 1365 |
.bind(limit) |
| 1366 |
.fetch_all(pool) |
| 1367 |
.await?) |
| 1368 |
} |
| 1369 |
|
| 1370 |
#[derive(Debug, sqlx::FromRow, serde::Serialize)] |
| 1371 |
pub struct PeerHeartbeatRow { |
| 1372 |
pub id: i64, |
| 1373 |
pub peer_name: String, |
| 1374 |
pub status: String, |
| 1375 |
pub latency_ms: i64, |
| 1376 |
pub checked_at: String, |
| 1377 |
} |
| 1378 |
|
| 1379 |
|
| 1380 |
|
| 1381 |
#[derive(sqlx::FromRow)] |
| 1382 |
struct HealthCheckRow { |
| 1383 |
id: i64, |
| 1384 |
target: String, |
| 1385 |
status: String, |
| 1386 |
checked_at: String, |
| 1387 |
response_time_ms: i64, |
| 1388 |
details_json: Option<String>, |
| 1389 |
error: Option<String>, |
| 1390 |
} |
| 1391 |
|
| 1392 |
impl HealthCheckRow { |
| 1393 |
fn into_snapshot(self) -> HealthSnapshot { |
| 1394 |
let status = self |
| 1395 |
.status |
| 1396 |
.parse::<HealthStatus>() |
| 1397 |
.unwrap_or(HealthStatus::Error); |
| 1398 |
let details = self |
| 1399 |
.details_json |
| 1400 |
.as_deref() |
| 1401 |
.and_then(|s| serde_json::from_str::<HealthDetails>(s).ok()); |
| 1402 |
|
| 1403 |
HealthSnapshot { |
| 1404 |
id: Some(self.id), |
| 1405 |
target: self.target, |
| 1406 |
status, |
| 1407 |
checked_at: self.checked_at, |
| 1408 |
response_time_ms: self.response_time_ms, |
| 1409 |
details, |
| 1410 |
error: self.error, |
| 1411 |
} |
| 1412 |
} |
| 1413 |
} |
| 1414 |
|
| 1415 |
#[derive(sqlx::FromRow)] |
| 1416 |
struct TestRunRow { |
| 1417 |
id: i64, |
| 1418 |
target: String, |
| 1419 |
started_at: String, |
| 1420 |
finished_at: Option<String>, |
| 1421 |
duration_secs: Option<i64>, |
| 1422 |
exit_code: Option<i32>, |
| 1423 |
passed: bool, |
| 1424 |
summary_json: String, |
| 1425 |
raw_output: String, |
| 1426 |
filter: Option<String>, |
| 1427 |
} |
| 1428 |
|
| 1429 |
impl TestRunRow { |
| 1430 |
fn into_test_run(self) -> TestRun { |
| 1431 |
let summary = serde_json::from_str::<TestSummary>(&self.summary_json).unwrap_or(TestSummary { |
| 1432 |
steps: vec![], |
| 1433 |
total_passed: None, |
| 1434 |
total_failed: None, |
| 1435 |
details: vec![], |
| 1436 |
}); |
| 1437 |
|
| 1438 |
TestRun { |
| 1439 |
id: Some(self.id), |
| 1440 |
target: self.target, |
| 1441 |
started_at: self.started_at, |
| 1442 |
finished_at: self.finished_at, |
| 1443 |
duration_secs: self.duration_secs, |
| 1444 |
exit_code: self.exit_code, |
| 1445 |
passed: self.passed, |
| 1446 |
summary, |
| 1447 |
raw_output: self.raw_output, |
| 1448 |
filter: self.filter, |
| 1449 |
} |
| 1450 |
} |
| 1451 |
} |
| 1452 |
|