Skip to main content

max / pom

Audit remediation: observability, security hardening, MT monitoring, docs Multi-round audit improvements: - Observability A: 57 #[instrument] annotations across checks, API, alerts, display - Security: API auth + rate limiting, peer mesh auth, graceful shutdown with CancellationToken + grace period, self-monitoring /api/health endpoint - Monitoring: MT target added (astra health + TLS + routes), htpy.app target added, MNW health page integration (incidents, expandable checks, route status) - Features: TLS cert monitoring, response validation, latency trending, anomaly detection, incident history, smart test prompting, route spec checks - Code documentation: module-level docs, public function docs, README, description.md - Deploy: updated configs for 3 targets, deploy.sh for astra + hetzner Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-03-14 03:36 UTC
Commit: a661dc1aa4a78cfc261779dc1922d94b2f16de6e
Parent: bf22a1e
27 files changed, +2414 insertions, -106 deletions
M .gitignore +4
@@ -3,6 +3,10 @@
3 3 *.db-wal
4 4 *.db-shm
5 5
6 + # Environment
7 + .env
8 + .env.*
9 +
6 10 # OS
7 11 .DS_Store
8 12
M Cargo.lock +113 -8
@@ -88,7 +88,7 @@ version = "0.6.2"
88 88 source = "registry+https://github.com/rust-lang/crates.io-index"
89 89 checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048"
90 90 dependencies = [
91 - "asn1-rs-derive",
91 + "asn1-rs-derive 0.5.1",
92 92 "asn1-rs-impl",
93 93 "displaydoc",
94 94 "nom",
@@ -99,6 +99,22 @@ dependencies = [
99 99 ]
100 100
101 101 [[package]]
102 + name = "asn1-rs"
103 + version = "0.7.1"
104 + source = "registry+https://github.com/rust-lang/crates.io-index"
105 + checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60"
106 + dependencies = [
107 + "asn1-rs-derive 0.6.0",
108 + "asn1-rs-impl",
109 + "displaydoc",
110 + "nom",
111 + "num-traits",
112 + "rusticata-macros",
113 + "thiserror 2.0.18",
114 + "time",
115 + ]
116 +
117 + [[package]]
102 118 name = "asn1-rs-derive"
103 119 version = "0.5.1"
104 120 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -111,6 +127,18 @@ dependencies = [
111 127 ]
112 128
113 129 [[package]]
130 + name = "asn1-rs-derive"
131 + version = "0.6.0"
132 + source = "registry+https://github.com/rust-lang/crates.io-index"
133 + checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c"
134 + dependencies = [
135 + "proc-macro2",
136 + "quote",
137 + "syn",
138 + "synstructure",
139 + ]
140 +
141 + [[package]]
114 142 name = "asn1-rs-impl"
115 143 version = "0.2.0"
116 144 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -454,7 +482,21 @@ version = "9.0.0"
454 482 source = "registry+https://github.com/rust-lang/crates.io-index"
455 483 checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553"
456 484 dependencies = [
457 - "asn1-rs",
485 + "asn1-rs 0.6.2",
486 + "displaydoc",
487 + "nom",
488 + "num-bigint",
489 + "num-traits",
490 + "rusticata-macros",
491 + ]
492 +
493 + [[package]]
494 + name = "der-parser"
495 + version = "10.0.0"
496 + source = "registry+https://github.com/rust-lang/crates.io-index"
497 + checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6"
498 + dependencies = [
499 + "asn1-rs 0.7.1",
458 500 "displaydoc",
459 501 "nom",
460 502 "num-bigint",
@@ -1360,7 +1402,16 @@ version = "0.7.1"
1360 1402 source = "registry+https://github.com/rust-lang/crates.io-index"
1361 1403 checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9"
1362 1404 dependencies = [
1363 - "asn1-rs",
1405 + "asn1-rs 0.6.2",
1406 + ]
1407 +
1408 + [[package]]
1409 + name = "oid-registry"
1410 + version = "0.8.1"
1411 + source = "registry+https://github.com/rust-lang/crates.io-index"
1412 + checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7"
1413 + dependencies = [
1414 + "asn1-rs 0.7.1",
1364 1415 ]
1365 1416
1366 1417 [[package]]
@@ -1417,6 +1468,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
1417 1468 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
1418 1469
1419 1470 [[package]]
1471 + name = "pem"
1472 + version = "3.0.6"
1473 + source = "registry+https://github.com/rust-lang/crates.io-index"
1474 + checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
1475 + dependencies = [
1476 + "base64 0.22.1",
1477 + "serde_core",
1478 + ]
1479 +
1480 + [[package]]
1420 1481 name = "pem-rfc7468"
1421 1482 version = "0.7.0"
1422 1483 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1478,7 +1539,7 @@ checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
1478 1539
1479 1540 [[package]]
1480 1541 name = "pom"
1481 - version = "0.2.1"
1542 + version = "0.2.3"
1482 1543 dependencies = [
1483 1544 "axum",
1484 1545 "chrono",
@@ -1486,6 +1547,7 @@ dependencies = [
1486 1547 "dirs",
1487 1548 "hostname",
1488 1549 "http-body-util",
1550 + "rcgen",
1489 1551 "reqwest",
1490 1552 "rmcp",
1491 1553 "rustls-pki-types",
@@ -1496,13 +1558,14 @@ dependencies = [
1496 1558 "thiserror 2.0.18",
1497 1559 "tokio",
1498 1560 "tokio-rustls",
1561 + "tokio-util",
1499 1562 "toml",
1500 1563 "tower",
1501 1564 "tracing",
1502 1565 "tracing-subscriber",
1503 1566 "uuid",
1504 1567 "webpki-roots",
1505 - "x509-parser",
1568 + "x509-parser 0.16.0",
1506 1569 ]
1507 1570
1508 1571 [[package]]
@@ -1684,6 +1747,20 @@ dependencies = [
1684 1747 ]
1685 1748
1686 1749 [[package]]
1750 + name = "rcgen"
1751 + version = "0.14.7"
1752 + source = "registry+https://github.com/rust-lang/crates.io-index"
1753 + checksum = "10b99e0098aa4082912d4c649628623db6aba77335e4f4569ff5083a6448b32e"
1754 + dependencies = [
1755 + "pem",
1756 + "ring",
1757 + "rustls-pki-types",
1758 + "time",
1759 + "x509-parser 0.18.1",
1760 + "yasna",
1761 + ]
1762 +
1763 + [[package]]
1687 1764 name = "redox_syscall"
1688 1765 version = "0.5.18"
1689 1766 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2531,6 +2608,7 @@ dependencies = [
2531 2608 "bytes",
2532 2609 "futures-core",
2533 2610 "futures-sink",
2611 + "futures-util",
2534 2612 "pin-project-lite",
2535 2613 "tokio",
2536 2614 ]
@@ -3274,18 +3352,45 @@ version = "0.16.0"
3274 3352 source = "registry+https://github.com/rust-lang/crates.io-index"
3275 3353 checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69"
3276 3354 dependencies = [
3277 - "asn1-rs",
3355 + "asn1-rs 0.6.2",
3278 3356 "data-encoding",
3279 - "der-parser",
3357 + "der-parser 9.0.0",
3280 3358 "lazy_static",
3281 3359 "nom",
3282 - "oid-registry",
3360 + "oid-registry 0.7.1",
3283 3361 "rusticata-macros",
3284 3362 "thiserror 1.0.69",
3285 3363 "time",
3286 3364 ]
3287 3365
3288 3366 [[package]]
3367 + name = "x509-parser"
3368 + version = "0.18.1"
3369 + source = "registry+https://github.com/rust-lang/crates.io-index"
3370 + checksum = "d43b0f71ce057da06bc0851b23ee24f3f86190b07203dd8f567d0b706a185202"
3371 + dependencies = [
3372 + "asn1-rs 0.7.1",
3373 + "data-encoding",
3374 + "der-parser 10.0.0",
3375 + "lazy_static",
3376 + "nom",
3377 + "oid-registry 0.8.1",
3378 + "ring",
3379 + "rusticata-macros",
3380 + "thiserror 2.0.18",
3381 + "time",
3382 + ]
3383 +
3384 + [[package]]
3385 + name = "yasna"
3386 + version = "0.5.2"
3387 + source = "registry+https://github.com/rust-lang/crates.io-index"
3388 + checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd"
3389 + dependencies = [
3390 + "time",
3391 + ]
3392 +
3393 + [[package]]
3289 3394 name = "yoke"
3290 3395 version = "0.8.1"
3291 3396 source = "registry+https://github.com/rust-lang/crates.io-index"
M Cargo.toml +6 -1
@@ -1,7 +1,8 @@
1 1 [package]
2 2 name = "pom"
3 - version = "0.2.2"
3 + version = "0.2.3"
4 4 edition = "2024"
5 + license-file = "LICENSE"
5 6
6 7 [lib]
7 8 name = "pom"
@@ -61,6 +62,10 @@ webpki-roots = "1"
61 62 tracing = "0.1"
62 63 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
63 64
65 + # Graceful shutdown
66 + tokio-util = { version = "0.7", features = ["rt"] }
67 +
64 68 [dev-dependencies]
65 69 tower = { version = "0.5", features = ["util"] }
66 70 http-body-util = "0.1"
71 + rcgen = "0.14.7"
A LICENSE +120
@@ -0,0 +1,120 @@
1 + Required Notice: Copyright 2026 Make Creative, LLC (https://makenot.work)
2 +
3 + PolyForm Noncommercial License 1.0.0
4 +
5 + <https://polyformproject.org/licenses/noncommercial/1.0.0>
6 +
7 + Acceptance
8 +
9 + In order to get any license under these terms, you must agree to them as
10 + both strict obligations and conditions to all your licenses.
11 +
12 + Copyright License
13 +
14 + The licensor grants you a copyright license for the software to do
15 + everything you might do with the software that would otherwise infringe
16 + the licensor's copyright in it for any permitted purpose. However, you
17 + may only distribute the software according to Distribution License and
18 + make changes or new works based on the software according to Changes and
19 + New Works License.
20 +
21 + Distribution License
22 +
23 + The licensor grants you an additional copyright license to distribute
24 + copies of the software. Your license to distribute covers distributing
25 + the software with changes and new works permitted by Changes and New
26 + Works License.
27 +
28 + Notices
29 +
30 + You must ensure that anyone who gets a copy of any part of the software
31 + from you also gets a copy of these terms or the URL for them above, as
32 + well as copies of any plain-text lines beginning with "Required Notice:"
33 + that the licensor provided with the software. For example:
34 +
35 + Required Notice: Copyright Yoyodyne, Inc. (http://example.com)
36 +
37 + Changes and New Works License
38 +
39 + The licensor grants you an additional copyright license to make changes
40 + and new works based on the software for any permitted purpose.
41 +
42 + Patent License
43 +
44 + The licensor grants you a patent license for the software that covers
45 + patent claims the licensor can license, or becomes able to license, that
46 + you would infringe by using the software.
47 +
48 + Noncommercial Purposes
49 +
50 + Any noncommercial purpose is a permitted purpose.
51 +
52 + Personal Uses
53 +
54 + Personal use for research, experiment, and testing for the benefit of
55 + public knowledge, personal study, private entertainment, hobby projects,
56 + amateur pursuits, or religious observance, without any anticipated
57 + commercial application, is use for a permitted purpose.
58 +
59 + Noncommercial Organizations
60 +
61 + Use by any charitable organization, educational institution, public
62 + research organization, public safety or health organization,
63 + environmental protection organization, or government institution is use
64 + for a permitted purpose regardless of the source of funding or
65 + obligations resulting from the funding.
66 +
67 + Fair Use
68 +
69 + You may have "fair use" rights for the software under the law. These
70 + terms do not limit them.
71 +
72 + No Other Rights
73 +
74 + These terms do not allow you to sublicense or transfer any of your
75 + licenses to anyone else, or prevent the licensor from granting licenses
76 + to anyone else. These terms do not imply any other licenses.
77 +
78 + Patent Defense
79 +
80 + If you make any written claim that the software infringes or contributes
81 + to infringement of any patent, your patent license for the software
82 + granted under these terms ends immediately. If your company makes such a
83 + claim, your patent license ends immediately for work on behalf of your
84 + company.
85 +
86 + Violations
87 +
88 + The first time you are notified in writing that you have violated any of
89 + these terms, or done anything with the software not covered by your
90 + licenses, your licenses can nonetheless continue if you come into full
91 + compliance with these terms, and take practical steps to correct past
92 + violations, within 32 days of receiving notice. Otherwise, all your
93 + licenses end immediately.
94 +
95 + No Liability
96 +
97 + As far as the law allows, the software comes as is, without any warranty
98 + or condition, and the licensor will not be liable to you for any damages
99 + arising out of these terms or the use or nature of the software, under
100 + any kind of legal claim.
101 +
102 + Definitions
103 +
104 + The licensor is the individual or entity offering these terms, and the
105 + software is the software the licensor makes available under these terms.
106 +
107 + You refers to the individual or entity agreeing to these terms.
108 +
109 + Your company is any legal entity, sole proprietorship, or other kind of
110 + organization that you work for, plus all organizations that have control
111 + over, are under the control of, or are under common control with that
112 + organization. Control means ownership of substantially all the assets of
113 + an entity, or the power to direct its management and policies by vote,
114 + contract, or otherwise. Control can be direct or indirect.
115 +
116 + Your licenses are all the licenses granted to you for the software under
117 + these terms.
118 +
119 + Use means anything you do with the software requiring one of your
120 + licenses.
A README.md +62
@@ -0,0 +1,62 @@
1 + # Peace of Mind
2 +
3 + A production operations monitor -- health checks, TLS certificate tracking, remote test orchestration, peer mesh, and email alerts. Built with Rust, Tokio, Axum, and SQLite.
4 +
5 + ## Prerequisites
6 +
7 + - **Rust** (stable toolchain, 2024 edition)
8 + - **Config file** at `~/.config/pom/pom.toml` (targets, peers, alert settings)
9 +
10 + ## Usage
11 +
12 + PoM operates in three modes: CLI, HTTP daemon, and MCP server.
13 +
14 + ```sh
15 + # Run all configured health checks once
16 + pom check
17 +
18 + # Start the HTTP API daemon (periodic checks, peer heartbeats, pruning)
19 + pom serve
20 +
21 + # Start as an MCP server (stdio transport, for Claude integration)
22 + pom mcp
23 +
24 + # Show current status of all targets
25 + pom status
26 +
27 + # Run remote test suites via SSH
28 + pom test
29 +
30 + # Show TLS certificate expiry for monitored hosts
31 + pom tls
32 + ```
33 +
34 + ## Configuration
35 +
36 + PoM reads `~/.config/pom/pom.toml`. The config defines:
37 +
38 + - **Targets** -- HTTP endpoints to monitor, with expected status codes, JSON field checks, body substring matches, and check intervals
39 + - **Peers** -- other PoM instances in the mesh (URL, bearer token, heartbeat interval, grace period)
40 + - **Alerts** -- Postmark API credentials, recipient addresses, per-target cooldowns (falls back to stdout in dev mode)
41 + - **TLS** -- hosts to probe for certificate expiry warnings
42 + - **Tests** -- SSH targets and commands for remote test suite execution
43 +
44 + ## Module Overview
45 +
46 + | Module | Role |
47 + |--------|------|
48 + | `main.rs` / `cli.rs` | Binary entry point, CLI argument parsing and dispatch |
49 + | `config.rs` | TOML config loading and validation |
50 + | `types.rs` | Shared domain types |
51 + | `checks/` | HTTP health checks, TLS probes, SSH test runners |
52 + | `peer.rs` | Peer mesh heartbeats, identity verification, grace periods |
53 + | `db.rs` | SQLite persistence (incidents, history, trends) |
54 + | `api.rs` | Axum HTTP API (status, trends, mesh data) |
55 + | `alerts.rs` | Email alerts via Postmark API |
56 + | `tools/` | MCP tool definitions for Claude integration |
57 + | `display.rs` | Terminal output formatting |
58 + | `error.rs` | Error types |
59 +
60 + ## License
61 +
62 + PolyForm Noncommercial 1.0.0
@@ -45,6 +45,10 @@ deploy_target() {
45 45 scp "$SCRIPT_DIR/pom.service" "$host:/tmp/pom.service"
46 46
47 47 ssh "$host" "$sudo_prefix mv /tmp/pom /usr/local/bin/pom && $sudo_prefix chmod +x /usr/local/bin/pom && $sudo_prefix mv /tmp/pom.toml /etc/pom/pom.toml && $sudo_prefix mv /tmp/pom.service /etc/systemd/system/pom.service"
48 +
49 + # Create env file with Postmark token if it doesn't exist
50 + ssh "$host" "if [ ! -f /etc/pom/env ]; then echo 'POM_POSTMARK_TOKEN=SET_ME' | $sudo_prefix tee /etc/pom/env > /dev/null && $sudo_prefix chmod 600 /etc/pom/env; fi"
51 +
48 52 ssh "$host" "$sudo_prefix systemctl daemon-reload && $sudo_prefix systemctl enable pom && $sudo_prefix systemctl restart pom"
49 53
50 54 echo "=== $name: deployed ==="
@@ -3,12 +3,15 @@ interval_secs = 300
3 3 prune_days = 30
4 4 listen = "0.0.0.0:9100"
5 5 peer_heartbeat_secs = 60
6 + route_check_interval_secs = 300
7 + # api_token loaded from POM_API_TOKEN env var
6 8
7 9 [instance]
8 10 name = "astra"
9 11
10 12 [targets.mnw]
11 13 label = "Makenotwork Production"
14 + expected_routes = ["/", "/discover", "/login", "/docs", "/health"]
12 15
13 16 [targets.mnw.health]
14 17 url = "https://makenot.work/api/health"
@@ -31,14 +34,78 @@ command = "/home/max/staging/run-ci.sh"
31 34 timeout_secs = 600
32 35 staleness_days = 7
33 36
37 + [targets.mt]
38 + label = "Multithreaded Forum"
39 + expected_routes = ["/"]
40 +
41 + [targets.mt.health]
42 + url = "http://127.0.0.1:3400/api/health"
43 + timeout_secs = 5
44 +
45 + [targets.mt.health.expect]
46 + status_code = 200
47 + json_fields = { "status" = "operational" }
48 +
49 + [targets.htpy]
50 + label = "htpy.app"
51 +
52 + [targets.htpy.health]
53 + url = "http://100.99.153.68:8080/archive/S_2"
54 + timeout_secs = 10
55 +
56 + [targets.htpy.health.expect]
57 + status_code = 200
58 + body_contains = "htpy"
59 +
60 + [targets.htpy.tls]
61 + host = "htpy.app"
62 +
63 + [targets.go]
64 + label = "GoingsOn"
65 +
66 + [targets.go.tests]
67 + ssh = "max@100.106.221.39"
68 + command = "cd /home/max/staging/goingson && cargo test --workspace 2>&1"
69 + timeout_secs = 600
70 + staleness_days = 7
71 +
72 + [targets.bb]
73 + label = "Balanced Breakfast"
74 +
75 + [targets.bb.tests]
76 + ssh = "max@100.106.221.39"
77 + command = "cd /home/max/staging/balanced_breakfast && cargo test --workspace 2>&1"
78 + timeout_secs = 600
79 + staleness_days = 7
80 +
81 + [targets.af]
82 + label = "AudioFiles"
83 +
84 + [targets.af.tests]
85 + ssh = "max@100.106.221.39"
86 + command = "cd /home/max/staging/audiofiles && cargo test --workspace 2>&1"
87 + timeout_secs = 600
88 + staleness_days = 7
89 +
90 + [targets.sk]
91 + label = "SyncKit SDK"
92 +
93 + [targets.sk.tests]
94 + ssh = "max@100.106.221.39"
95 + command = "cd /home/max/staging/synckit-client && cargo test 2>&1"
96 + timeout_secs = 300
97 + staleness_days = 7
98 +
34 99 [peers.hetzner]
35 100 address = "100.120.174.96:9100"
36 101 on_missing = "alert"
102 + # token = "<hetzner's POM_API_TOKEN value>"
37 103
38 104 [peers.macbook]
39 105 address = "100.100.246.136:9100"
40 106 on_missing = "log"
107 + # token = "<macbook's POM_API_TOKEN value>"
41 108
42 109 [alerts]
43 - postmark_token = "29ec3abb-b3c7-4ffd-bf51-09d0f31865cd"
110 + # postmark_token loaded from POM_POSTMARK_TOKEN env var
44 111 to = "pom-alerts@makenot.work"
@@ -3,12 +3,15 @@ interval_secs = 300
3 3 prune_days = 30
4 4 listen = "0.0.0.0:9100"
5 5 peer_heartbeat_secs = 60
6 + route_check_interval_secs = 300
7 + # api_token loaded from POM_API_TOKEN env var
6 8
7 9 [instance]
8 10 name = "hetzner"
9 11
10 12 [targets.mnw]
11 13 label = "Makenotwork Production"
14 + expected_routes = ["/", "/discover", "/login", "/docs", "/health"]
12 15
13 16 [targets.mnw.health]
14 17 url = "https://makenot.work/api/health"
@@ -31,14 +34,42 @@ command = "/home/max/staging/run-ci.sh"
31 34 timeout_secs = 600
32 35 staleness_days = 7
33 36
37 + [targets.mt]
38 + label = "Multithreaded Forum"
39 + expected_routes = ["/"]
40 +
41 + [targets.mt.health]
42 + url = "http://100.106.221.39:3400/api/health"
43 + timeout_secs = 10
44 +
45 + [targets.mt.health.expect]
46 + status_code = 200
47 + json_fields = { "status" = "operational" }
48 +
49 + [targets.htpy]
50 + label = "htpy.app"
51 +
52 + [targets.htpy.health]
53 + url = "http://100.99.153.68:8080/archive/S_2"
54 + timeout_secs = 10
55 +
56 + [targets.htpy.health.expect]
57 + status_code = 200
58 + body_contains = "htpy"
59 +
60 + [targets.htpy.tls]
61 + host = "htpy.app"
62 +
34 63 [peers.astra]
35 64 address = "100.106.221.39:9100"
36 65 on_missing = "alert"
66 + # token = "<astra's POM_API_TOKEN value>"
37 67
38 68 [peers.macbook]
39 69 address = "100.100.246.136:9100"
40 70 on_missing = "log"
71 + # token = "<macbook's POM_API_TOKEN value>"
41 72
42 73 [alerts]
43 - postmark_token = "29ec3abb-b3c7-4ffd-bf51-09d0f31865cd"
74 + # postmark_token loaded from POM_POSTMARK_TOKEN env var
44 75 to = "pom-alerts@makenot.work"
@@ -5,6 +5,7 @@ Wants=network-online.target
5 5
6 6 [Service]
7 7 Type=simple
8 + EnvironmentFile=-/etc/pom/env
8 9 ExecStart=/usr/local/bin/pom serve --config /etc/pom/pom.toml
9 10 Restart=on-failure
10 11 RestartSec=10
M src/alerts.rs +118 -4
@@ -4,7 +4,7 @@
4 4 //! If no `postmark_token` is configured, alerts are logged to stdout instead.
5 5
6 6 use sqlx::SqlitePool;
7 - use tracing::{info, warn};
7 + use tracing::{info, instrument, warn};
8 8
9 9 use crate::config::AlertConfig;
10 10 use crate::db;
@@ -26,6 +26,7 @@ impl Alerter {
26 26 Self { config, client, pool, instance_name }
27 27 }
28 28
29 + #[instrument(skip_all)]
29 30 pub async fn send_health_alert(
30 31 &self,
31 32 target: &str,
@@ -55,9 +56,10 @@ impl Alerter {
55 56 body.push_str("\n- PoM");
56 57
57 58 self.send_email(&subject, &body).await;
58 - self.record_alert(target, "health", Some(from_status), Some(to_status), error).await;
59 + self.record_alert(&alert_key, "health", Some(from_status), Some(to_status), error).await;
59 60 }
60 61
62 + #[instrument(skip_all)]
61 63 pub async fn send_health_recovery(
62 64 &self,
63 65 target: &str,
@@ -81,6 +83,7 @@ impl Alerter {
81 83 self.record_alert(&alert_key, "recovery", Some(from_status), Some("operational"), None).await;
82 84 }
83 85
86 + #[instrument(skip_all)]
84 87 pub async fn send_tls_expiry_alert(
85 88 &self,
86 89 target: &str,
@@ -111,6 +114,7 @@ impl Alerter {
111 114 self.record_alert(&alert_key, "tls_expiry", None, None, None).await;
112 115 }
113 116
117 + #[instrument(skip_all)]
114 118 pub async fn send_tls_error_alert(
115 119 &self,
116 120 target: &str,
@@ -139,6 +143,7 @@ impl Alerter {
139 143 self.record_alert(&alert_key, "tls_error", None, None, Some(error)).await;
140 144 }
141 145
146 + #[instrument(skip_all)]
142 147 pub async fn send_tls_recovery(
143 148 &self,
144 149 target: &str,
@@ -162,6 +167,7 @@ impl Alerter {
162 167 self.record_alert(&alert_key, "tls_recovery", None, None, None).await;
163 168 }
164 169
170 + #[instrument(skip_all)]
165 171 pub async fn send_peer_missing(
166 172 &self,
167 173 peer_name: &str,
@@ -190,6 +196,7 @@ impl Alerter {
190 196 self.record_alert(&alert_key, "peer_missing", None, None, None).await;
191 197 }
192 198
199 + #[instrument(skip_all)]
193 200 pub async fn send_peer_recovery(
194 201 &self,
195 202 peer_name: &str,
@@ -211,6 +218,62 @@ impl Alerter {
211 218 self.record_alert(&alert_key, "peer_recovery", None, None, None).await;
212 219 }
213 220
221 + #[instrument(skip_all)]
222 + pub async fn send_route_failure_alert(
223 + &self,
224 + target: &str,
225 + label: &str,
226 + failed_paths: &[String],
227 + ) {
228 + let alert_key = format!("route:{target}");
229 + if self.is_within_cooldown(&alert_key).await {
230 + info!("alert cooldown active for {alert_key}, skipping");
231 + return;
232 + }
233 +
234 + let n = failed_paths.len();
235 + let subject = format!("[PoM] {label}: {n} route(s) failing");
236 + let body = format!(
237 + "Target: {label} ({target})\n\
238 + Failed routes:\n{}\n\
239 + Instance: {}\n\
240 + Time: {}\n\n\
241 + - PoM",
242 + failed_paths.iter().map(|p| format!(" - {p}")).collect::<Vec<_>>().join("\n"),
243 + self.instance_name,
244 + chrono::Utc::now().to_rfc3339(),
245 + );
246 +
247 + self.send_email(&subject, &body).await;
248 + self.record_alert(&alert_key, "route_failure", None, None, None).await;
249 + }
250 +
251 + #[instrument(skip_all)]
252 + pub async fn send_route_recovery_alert(
253 + &self,
254 + target: &str,
255 + label: &str,
256 + recovered_paths: &[String],
257 + ) {
258 + // No cooldown on recovery — always send
259 + let alert_key = format!("route:{target}");
260 + let subject = format!("[PoM] {label}: routes recovered");
261 + let body = format!(
262 + "Target: {label} ({target})\n\
263 + Recovered routes:\n{}\n\
264 + Instance: {}\n\
265 + Time: {}\n\n\
266 + - PoM",
267 + recovered_paths.iter().map(|p| format!(" - {p}")).collect::<Vec<_>>().join("\n"),
268 + self.instance_name,
269 + chrono::Utc::now().to_rfc3339(),
270 + );
271 +
272 + self.send_email(&subject, &body).await;
273 + self.record_alert(&alert_key, "route_recovery", None, None, None).await;
274 + }
275 +
276 + #[instrument(skip_all)]
214 277 pub async fn send_latency_drift_alert(
215 278 &self,
216 279 target: &str,
@@ -238,6 +301,7 @@ impl Alerter {
238 301 self.record_alert(&alert_key, "latency_drift", None, None, Some(drift_message)).await;
239 302 }
240 303
304 + #[instrument(skip_all)]
241 305 pub async fn send_latency_recovery(
242 306 &self,
243 307 target: &str,
@@ -378,12 +442,62 @@ mod tests {
378 442 // This should log instead of making HTTP calls (no panic, no error)
379 443 alerter.send_health_alert("mnw", "MakeNotWork", "operational", "error", None).await;
380 444
381 - // Verify alert was recorded in DB
382 - let latest = db::get_latest_alert_for_target(&pool, "mnw").await.unwrap();
445 + // Verify alert was recorded in DB with the prefixed key (health:mnw),
446 + // matching the cooldown lookup key format.
447 + let latest = db::get_latest_alert_for_target(&pool, "health:mnw").await.unwrap();
383 448 assert!(latest.is_some());
384 449 let row = latest.unwrap();
385 450 assert_eq!(row.alert_type, "health");
386 451 assert_eq!(row.from_status.as_deref(), Some("operational"));
387 452 assert_eq!(row.to_status.as_deref(), Some("error"));
388 453 }
454 +
455 + #[tokio::test]
456 + async fn route_alert_cooldown_key() {
457 + let pool = db::connect_in_memory().await.unwrap();
458 + let alerter = test_alerter(pool.clone());
459 +
460 + assert!(!alerter.is_within_cooldown("route:mnw").await);
461 +
462 + alerter.send_route_failure_alert("mnw", "MakeNotWork", &["/docs/faq".to_string()]).await;
463 +
464 + assert!(alerter.is_within_cooldown("route:mnw").await);
465 + assert!(!alerter.is_within_cooldown("route:mt").await);
466 + }
467 +
468 + #[tokio::test]
469 + async fn recovery_does_not_start_cooldown_for_next_failure() {
470 + let pool = db::connect_in_memory().await.unwrap();
471 + let alerter = test_alerter(pool.clone());
472 +
473 + // Send a failure alert — starts cooldown
474 + alerter.send_health_alert("mnw", "MakeNotWork", "operational", "error", None).await;
475 + assert!(alerter.is_within_cooldown("health:mnw").await);
476 +
477 + // Send a recovery alert (always sends, no cooldown check)
478 + alerter.send_health_recovery("mnw", "MakeNotWork", "error").await;
479 +
480 + // The recovery alert should NOT reset cooldown for failures.
481 + // is_within_cooldown now excludes recovery-type alerts, so it checks
482 + // the original failure alert's timestamp — which is still within cooldown.
483 + assert!(alerter.is_within_cooldown("health:mnw").await);
484 + }
485 +
486 + #[tokio::test]
487 + async fn health_alert_cooldown_key_matches_record_key() {
488 + let pool = db::connect_in_memory().await.unwrap();
489 + let alerter = test_alerter(pool.clone());
490 +
491 + // Not in cooldown initially
492 + assert!(!alerter.is_within_cooldown("health:example.com").await);
493 +
494 + // Send an alert for "example.com"
495 + alerter.send_health_alert("example.com", "Example", "operational", "error", None).await;
496 +
497 + // Same target should now be in cooldown
498 + assert!(alerter.is_within_cooldown("health:example.com").await);
499 +
500 + // Different target should NOT be in cooldown
501 + assert!(!alerter.is_within_cooldown("health:other.com").await);
502 + }
389 503 }
M src/api.rs +242 -3
@@ -2,9 +2,11 @@
2 2
3 3 use std::collections::HashMap;
4 4 use std::sync::Arc;
5 + use std::sync::atomic::{AtomicU64, Ordering};
5 6
6 - use axum::extract::{Path, State as AxumState};
7 + use axum::extract::{Path, Request, State as AxumState};
7 8 use axum::http::StatusCode;
9 + use axum::middleware::{self, Next};
8 10 use axum::response::IntoResponse;
9 11 use axum::routing::get;
10 12 use axum::{Json, Router};
@@ -16,12 +18,101 @@ use crate::db;
16 18 use crate::peer::SharedMeshState;
17 19 use crate::types::{HealthSnapshot, LatencyBucket, LatencyStats, TestStaleness};
18 20
21 + /// Fixed-window rate limiter: allows `max_per_window` requests per `window_duration`.
22 + #[derive(Clone)]
23 + pub struct RateLimiter {
24 + count: Arc<AtomicU64>,
25 + window_start: Arc<std::sync::Mutex<std::time::Instant>>,
26 + max_per_window: u64,
27 + window_duration: std::time::Duration,
28 + }
29 +
30 + impl RateLimiter {
31 + pub fn new(max_per_window: u64, window_duration: std::time::Duration) -> Self {
32 + Self {
33 + count: Arc::new(AtomicU64::new(0)),
34 + window_start: Arc::new(std::sync::Mutex::new(std::time::Instant::now())),
35 + max_per_window,
36 + window_duration,
37 + }
38 + }
39 +
40 + pub fn try_acquire(&self) -> bool {
41 + let mut start = self.window_start.lock().unwrap();
42 + let now = std::time::Instant::now();
43 + if now.duration_since(*start) > self.window_duration {
44 + *start = now;
45 + self.count.store(1, Ordering::Relaxed);
46 + true
47 + } else {
48 + let prev = self.count.fetch_add(1, Ordering::Relaxed);
49 + prev < self.max_per_window
50 + }
51 + }
52 + }
53 +
19 54 /// Shared state for the API server.
20 55 #[derive(Clone)]
21 56 pub struct ApiState {
22 57 pub pool: sqlx::SqlitePool,
23 58 pub config: Arc<Config>,
24 59 pub mesh: Option<SharedMeshState>,
60 + pub rate_limiter: RateLimiter,
61 + }
62 +
63 + /// Rate limiting middleware. Returns 429 if the request rate exceeds the limit.
64 + async fn rate_limit(
65 + AxumState(state): AxumState<ApiState>,
66 + req: Request,
67 + next: Next,
68 + ) -> impl IntoResponse {
69 + if state.rate_limiter.try_acquire() {
70 + Ok(next.run(req).await)
71 + } else {
72 + Err((StatusCode::TOO_MANY_REQUESTS, Json(serde_json::json!({
73 + "error": "rate limit exceeded"
74 + }))))
75 + }
76 + }
77 +
78 + /// Bearer token authentication middleware.
79 + /// If `api_token` is configured, requires `Authorization: Bearer <token>` on every request.
80 + /// If no token is configured, all requests pass through.
81 + async fn require_bearer_token(
82 + AxumState(state): AxumState<ApiState>,
83 + req: Request,
84 + next: Next,
85 + ) -> impl IntoResponse {
86 + let expected = state.config.serve.api_token.as_deref();
87 + let Some(expected) = expected else {
88 + return Ok(next.run(req).await);
89 + };
90 +
91 + let auth_header = req.headers().get("authorization").and_then(|v| v.to_str().ok());
92 + match auth_header {
93 + Some(header) if header.starts_with("Bearer ") => {
94 + let token = &header[7..];
95 + if token == expected {
96 + Ok(next.run(req).await)
97 + } else {
98 + Err((StatusCode::UNAUTHORIZED, Json(serde_json::json!({
99 + "error": "invalid bearer token"
100 + }))))
101 + }
102 + }
103 + _ => Err((StatusCode::UNAUTHORIZED, Json(serde_json::json!({
104 + "error": "missing or malformed Authorization header"
105 + })))),
106 + }
107 + }
108 +
109 + /// `GET /api/health` — simple health endpoint for PoM itself.
110 + /// Not behind auth — allows external monitoring without credentials.
111 + async fn self_health() -> impl IntoResponse {
112 + Json(serde_json::json!({
113 + "status": "operational",
114 + "version": env!("CARGO_PKG_VERSION"),
115 + }))
25 116 }
26 117
27 118 /// Build the axum router for the PoM API.
@@ -30,51 +121,88 @@ pub fn router(pool: sqlx::SqlitePool, config: Config, mesh: Option<SharedMeshSta
30 121 pool,
31 122 config: Arc::new(config),
32 123 mesh,
124 + rate_limiter: RateLimiter::new(60, std::time::Duration::from_secs(60)),
33 125 };
34 126
35 - Router::new()
127 + // Authenticated routes (behind bearer token + rate limit)
128 + let authenticated = Router::new()
36 129 .route("/api/status", get(status_all))
37 130 .route("/api/status/{target}", get(status_target))
38 131 .route("/api/trends/{target}", get(trends))
39 132 .route("/api/peer/info", get(peer_info))
40 133 .route("/api/peer/status", get(peer_status))
41 134 .route("/api/mesh", get(mesh_view))
42 - .with_state(state)
135 + .layer(middleware::from_fn_with_state(state.clone(), require_bearer_token))
136 + .layer(middleware::from_fn_with_state(state.clone(), rate_limit));
137 +
138 + // Public routes (no auth, no rate limit)
139 + let public = Router::new()
140 + .route("/api/health", get(self_health));
141 +
142 + public.merge(authenticated).with_state(state)
43 143 }
44 144
45 145 // --- Response types ---
46 146
47 147 #[derive(Serialize)]
48 148 struct StatusResponse {
149 + /// Per-target status summaries, keyed by target config name.
49 150 targets: HashMap<String, TargetStatus>,
50 151 }
51 152
52 153 #[derive(Serialize)]
53 154 struct TargetStatus {
155 + /// Human-readable display label for this target.
54 156 label: String,
157 + /// Most recent health check snapshot. `None` if no checks have been recorded yet.
55 158 latest: Option<SnapshotJson>,
159 + /// Last 10 health check snapshots, most recent first.
56 160 recent: Vec<SnapshotJson>,
161 + /// Uptime percentage over the last 24 hours. `None` if no checks in that window.
57 162 uptime_24h: Option<f64>,
163 + /// Uptime percentage over the last 7 days. `None` if no checks in that window.
58 164 uptime_7d: Option<f64>,
165 + /// Latency statistics over the last 24 hours. Omitted if no operational checks exist.
59 166 #[serde(skip_serializing_if = "Option::is_none")]
60 167 latency_24h: Option<LatencyStats>,
168 + /// Latest TLS certificate check result. Omitted if TLS monitoring is not configured.
61 169 #[serde(skip_serializing_if = "Option::is_none")]
62 170 tls: Option<db::TlsCheckRow>,
171 + /// Test staleness assessment. Omitted if test running is not configured for this target.
63 172 #[serde(skip_serializing_if = "Option::is_none")]
64 173 test_staleness: Option<TestStaleness>,
174 + /// Currently open incident. Omitted if the target is not in an incident state.
65 175 #[serde(skip_serializing_if = "Option::is_none")]
66 176 current_incident: Option<db::IncidentRow>,
177 + /// Recent resolved and open incidents (up to 10). Omitted if empty.
67 178 #[serde(skip_serializing_if = "Vec::is_empty")]
68 179 incidents: Vec<db::IncidentRow>,
180 + /// Latest route check results per path. Omitted if empty.
181 + #[serde(skip_serializing_if = "Vec::is_empty")]
182 + route_status: Vec<RouteStatusJson>,
183 + }
184 +
185 + #[derive(Serialize)]
186 + struct RouteStatusJson {
187 + path: String,
188 + status_code: i64,
189 + ok: bool,
190 + checked_at: String,
191 + response_time_ms: i64,
69 192 }
70 193
71 194 #[derive(Serialize)]
72 195 struct SnapshotJson {
196 + /// Health status as a lowercase string (e.g. "operational", "degraded").
73 197 status: String,
198 + /// Timestamp of the check in RFC 3339 format.
74 199 checked_at: String,
200 + /// Round-trip response time in milliseconds.
75 201 response_time_ms: i64,
202 + /// Structured health details from the endpoint. Omitted when unavailable.
76 203 #[serde(skip_serializing_if = "Option::is_none")]
77 204 details: Option<serde_json::Value>,
205 + /// Error message if the check failed. Omitted on success.
78 206 #[serde(skip_serializing_if = "Option::is_none")]
79 207 error: Option<String>,
80 208 }
@@ -168,6 +296,20 @@ async fn build_target_status(
168 296 .await
169 297 .unwrap_or_default();
170 298
299 + let route_checks = db::get_latest_route_checks(pool, name)
300 + .await
301 + .unwrap_or_default();
302 + let route_status: Vec<RouteStatusJson> = route_checks
303 + .into_iter()
304 + .map(|r| RouteStatusJson {
305 + path: r.path,
306 + status_code: r.status_code,
307 + ok: r.ok,
308 + checked_at: r.checked_at,
309 + response_time_ms: r.response_time_ms,
310 + })
311 + .collect();
312 +
171 313 TargetStatus {
172 314 label: label.to_string(),
173 315 latest,
@@ -179,6 +321,7 @@ async fn build_target_status(
179 321 test_staleness,
180 322 current_incident,
181 323 incidents,
324 + route_status,
182 325 }
183 326 }
184 327
@@ -344,11 +487,17 @@ async fn mesh_view(
344 487
345 488 #[derive(Serialize)]
346 489 struct TrendResponse {
490 + /// Target config name this trend data belongs to.
347 491 target: String,
492 + /// Requested time window in hours (from query param, default 24).
348 493 window_hours: u64,
494 + /// Requested bucket width in minutes (from query param, default 60).
349 495 bucket_minutes: u64,
496 + /// Per-bucket latency statistics within the requested window.
350 497 buckets: Vec<LatencyBucket>,
498 + /// Aggregate latency statistics across the entire requested window.
351 499 overall: Option<LatencyStats>,
500 + /// 7-day baseline latency statistics for drift comparison.
352 501 baseline: Option<LatencyStats>,
353 502 }
354 503
@@ -406,6 +555,96 @@ async fn trends(
406 555
407 556 #[derive(serde::Deserialize)]
408 557 struct TrendQueryParams {
558 + /// Time window to query, in hours. Defaults to 24 if omitted.
409 559 hours: Option<u64>,
560 + /// Width of each latency bucket, in minutes. Defaults to 60 if omitted.
410 561 bucket_minutes: Option<u64>,
411 562 }
563 +
564 + #[cfg(test)]
565 + mod tests {
566 + use super::*;
567 + use axum::body::Body;
568 + use axum::http::Request as HttpRequest;
569 + use tower::ServiceExt;
570 +
571 + fn test_config(api_token: Option<&str>) -> Config {
572 + let mut config = Config {
573 + serve: crate::config::ServeConfig::default(),
574 + instance: Default::default(),
575 + targets: HashMap::new(),
576 + peers: HashMap::new(),
577 + alerts: None,
578 + };
579 + config.serve.api_token = api_token.map(|s| s.to_string());
580 + config
581 + }
582 +
583 + #[tokio::test]
584 + async fn no_token_configured_allows_all_requests() {
585 + let pool = crate::db::connect_in_memory().await.unwrap();
586 + let app = router(pool, test_config(None), None);
587 +
588 + let req = HttpRequest::builder()
589 + .uri("/api/status")
590 + .body(Body::empty())
591 + .unwrap();
592 + let resp = app.oneshot(req).await.unwrap();
593 + assert_eq!(resp.status(), StatusCode::OK);
594 + }
595 +
596 + #[tokio::test]
597 + async fn valid_token_allows_request() {
598 + let pool = crate::db::connect_in_memory().await.unwrap();
599 + let app = router(pool, test_config(Some("secret123")), None);
600 +
601 + let req = HttpRequest::builder()
602 + .uri("/api/status")
603 + .header("authorization", "Bearer secret123")
604 + .body(Body::empty())
605 + .unwrap();
606 + let resp = app.oneshot(req).await.unwrap();
607 + assert_eq!(resp.status(), StatusCode::OK);
608 + }
609 +
610 + #[tokio::test]
611 + async fn wrong_token_returns_401() {
612 + let pool = crate::db::connect_in_memory().await.unwrap();
613 + let app = router(pool, test_config(Some("secret123")), None);
614 +
615 + let req = HttpRequest::builder()
616 + .uri("/api/status")
617 + .header("authorization", "Bearer wrong-token")
618 + .body(Body::empty())
619 + .unwrap();
620 + let resp = app.oneshot(req).await.unwrap();
621 + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
622 + }
623 +
624 + #[tokio::test]
625 + async fn missing_header_returns_401() {
626 + let pool = crate::db::connect_in_memory().await.unwrap();
627 + let app = router(pool, test_config(Some("secret123")), None);
628 +
629 + let req = HttpRequest::builder()
630 + .uri("/api/status")
631 + .body(Body::empty())
632 + .unwrap();
633 + let resp = app.oneshot(req).await.unwrap();
634 + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
635 + }
636 +
637 + #[tokio::test]
638 + async fn malformed_header_returns_401() {
639 + let pool = crate::db::connect_in_memory().await.unwrap();
640 + let app = router(pool, test_config(Some("secret123")), None);
641 +
642 + let req = HttpRequest::builder()
643 + .uri("/api/status")
644 + .header("authorization", "Basic dXNlcjpwYXNz")
645 + .body(Body::empty())
646 + .unwrap();
647 + let resp = app.oneshot(req).await.unwrap();
648 + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
649 + }
650 + }
@@ -1,8 +1,14 @@
1 1 use std::time::Instant;
2 2
3 + use tracing::instrument;
4 +
3 5 use crate::config::{HealthConfig, HealthExpectation};
4 6 use crate::types::{HealthDetails, HealthSnapshot, HealthStatus};
5 7
8 + /// Maximum response body size we'll read into memory (10 MB).
9 + const MAX_RESPONSE_BYTES: u64 = 10 * 1024 * 1024;
10 +
11 + #[instrument(skip_all)]
6 12 pub async fn check_health(
7 13 target_name: &str,
8 14 config: &HealthConfig,
@@ -21,7 +27,40 @@ pub async fn check_health(
21 27 let response_time_ms = start.elapsed().as_millis() as i64;
22 28 let status_code = response.status().as_u16();
23 29
24 - match response.text().await {
30 + // Reject responses that declare a content-length exceeding our limit.
31 + if let Some(len) = response.content_length() {
32 + if len > MAX_RESPONSE_BYTES {
33 + return HealthSnapshot {
34 + id: None,
35 + target: target_name.to_string(),
36 + status: HealthStatus::Degraded,
37 + checked_at,
38 + response_time_ms,
39 + details: None,
40 + error: Some(format!(
41 + "Response body too large: {len} bytes (limit: {MAX_RESPONSE_BYTES} bytes)"
42 + )),
43 + };
44 + }
45 + }
46 +
47 + // Read body with size cap (handles chunked/streaming responses without content-length).
48 + let body_result = match response.bytes().await {
49 + Ok(bytes) => {
50 + if bytes.len() as u64 > MAX_RESPONSE_BYTES {
51 + Err(format!(
52 + "Response body too large: {} bytes (limit: {MAX_RESPONSE_BYTES} bytes)",
53 + bytes.len()
54 + ))
55 + } else {
56 + String::from_utf8(bytes.to_vec())
57 + .map_err(|e| format!("Response body not valid UTF-8: {e}"))
58 + }
59 + }
60 + Err(e) => Err(format!("Failed to read response body: {e}")),
61 + };
62 +
63 + match body_result {
25 64 Ok(body) => {
26 65 let json: Option<serde_json::Value> = serde_json::from_str(&body).ok();
27 66
@@ -38,6 +77,10 @@ pub async fn check_health(
38 77 if !failures.is_empty() {
39 78 status = HealthStatus::Degraded;
40 79 error = Some(failures.join("; "));
80 + } else if json.is_none() {
81 + // Non-JSON response but all expectations passed — treat as operational
82 + status = HealthStatus::Operational;
83 + error = None;
41 84 }
42 85 }
43 86
@@ -54,11 +97,11 @@ pub async fn check_health(
54 97 Err(e) => HealthSnapshot {
55 98 id: None,
56 99 target: target_name.to_string(),
57 - status: classify_non_json(status_code),
100 + status: HealthStatus::Degraded,
58 101 checked_at,
59 102 response_time_ms,
60 103 details: None,
61 - error: Some(format!("Failed to read response body: {e}")),
104 + error: Some(e),
62 105 },
63 106 }
64 107 }
@@ -1,4 +1,5 @@
1 1 pub mod http;
2 2 pub mod parse;
3 + pub mod routes;
3 4 pub mod ssh;
4 5 pub mod tls;
@@ -0,0 +1,168 @@
1 + //! Route accessibility checks — verify expected pages are reachable.
2 +
3 + use std::time::{Duration, Instant};
4 +
5 + use tracing::instrument;
6 +
7 + /// Result of checking a single route.
8 + #[derive(Debug, Clone)]
9 + pub struct RouteCheckResult {
10 + pub target: String,
11 + pub path: String,
12 + pub status_code: u16,
13 + pub ok: bool,
14 + pub checked_at: String,
15 + pub response_time_ms: i64,
16 + pub error: Option<String>,
17 + }
18 +
19 + /// Extract the base URL (scheme + host + port) from a health endpoint URL.
20 + ///
21 + /// ```text
22 + /// "https://makenot.work/api/health" → "https://makenot.work"
23 + /// "http://100.106.221.39:3400/api/health" → "http://100.106.221.39:3400"
24 + /// ```
25 + pub fn base_url_from_health_url(health_url: &str) -> Option<String> {
26 + let rest = health_url.strip_prefix("https://")
27 + .map(|r| ("https", r))
28 + .or_else(|| health_url.strip_prefix("http://").map(|r| ("http", r)))?;
29 + let (scheme, after_scheme) = rest;
30 + // Host (+ optional port) ends at the first '/' or end of string
31 + let authority = after_scheme.split('/').next()?;
32 + if authority.is_empty() {
33 + return None;
34 + }
35 + Some(format!("{scheme}://{authority}"))
36 + }
37 +
38 + /// Check all expected routes for a target, returning one result per path.
39 + ///
40 + /// Runs sequentially within a target to avoid hammering the server.
41 + #[instrument(skip_all, fields(target, route_count = paths.len()))]
42 + pub async fn check_routes(
43 + client: &reqwest::Client,
44 + target: &str,
45 + base_url: &str,
46 + paths: &[String],
47 + timeout: Duration,
48 + ) -> Vec<RouteCheckResult> {
49 + let mut results = Vec::with_capacity(paths.len());
50 +
51 + for path in paths {
52 + let url = format!("{base_url}{path}");
53 + let checked_at = chrono::Utc::now().to_rfc3339();
54 + let start = Instant::now();
55 +
56 + let result = match client
57 + .get(&url)
58 + .timeout(timeout)
59 + .send()
60 + .await
61 + {
62 + Ok(response) => {
63 + let status_code = response.status().as_u16();
64 + let ok = response.status().is_success();
65 + RouteCheckResult {
66 + target: target.to_string(),
67 + path: path.clone(),
68 + status_code,
69 + ok,
70 + checked_at,
71 + response_time_ms: start.elapsed().as_millis() as i64,
72 + error: if ok { None } else { Some(format!("HTTP {status_code}")) },
73 + }
74 + }
75 + Err(e) => {
76 + let error_msg = if e.is_timeout() {
77 + "timeout".to_string()
78 + } else {
79 + e.to_string()
80 + };
81 + RouteCheckResult {
82 + target: target.to_string(),
83 + path: path.clone(),
84 + status_code: 0,
85 + ok: false,
86 + checked_at,
87 + response_time_ms: start.elapsed().as_millis() as i64,
88 + error: Some(error_msg),
89 + }
90 + }
91 + };
92 +
93 + results.push(result);
94 + }
95 +
96 + results
97 + }
98 +
99 + #[cfg(test)]
100 + mod tests {
101 + use super::*;
102 +
103 + #[test]
104 + fn base_url_from_https() {
105 + assert_eq!(
106 + base_url_from_health_url("https://makenot.work/api/health"),
107 + Some("https://makenot.work".to_string()),
108 + );
109 + }
110 +
111 + #[test]
112 + fn base_url_from_http_with_port() {
113 + assert_eq!(
114 + base_url_from_health_url("http://100.106.221.39:3400/api/health"),
115 + Some("http://100.106.221.39:3400".to_string()),
116 + );
117 + }
118 +
119 + #[test]
120 + fn base_url_from_invalid_url() {
121 + assert_eq!(base_url_from_health_url("not-a-url"), None);
122 + }
123 +
124 + #[test]
125 + fn base_url_from_localhost() {
126 + assert_eq!(
127 + base_url_from_health_url("http://127.0.0.1:9100/api/health"),
128 + Some("http://127.0.0.1:9100".to_string()),
129 + );
130 + }
131 +
132 + #[tokio::test]
133 + async fn check_routes_handles_unreachable() {
134 + let client = reqwest::Client::builder()
135 + .timeout(Duration::from_millis(100))
136 + .build()
137 + .unwrap();
138 +
139 + let results = check_routes(
140 + &client,
141 + "test",
142 + "http://127.0.0.1:19999",
143 + &["/".to_string()],
144 + Duration::from_millis(100),
145 + )
146 + .await;
147 +
148 + assert_eq!(results.len(), 1);
149 + assert!(!results[0].ok);
150 + assert_eq!(results[0].target, "test");
151 + assert_eq!(results[0].path, "/");
152 + assert!(results[0].error.is_some());
153 + }
154 +
155 + #[tokio::test]
156 + async fn check_routes_empty_paths_returns_empty() {
157 + let client = reqwest::Client::builder().build().unwrap();
158 + let results = check_routes(
159 + &client,
160 + "test",
161 + "http://127.0.0.1:19999",
162 + &[],
163 + Duration::from_millis(100),
164 + )
165 + .await;
166 + assert!(results.is_empty());
167 + }
168 + }
@@ -1,9 +1,11 @@
1 1 use tokio::process::Command;
2 + use tracing::instrument;
2 3
3 4 use crate::checks::parse;
4 5 use crate::config::TestsConfig;
5 6 use crate::types::{TestRun, TestSummary};
6 7
8 + #[instrument(skip_all)]
7 9 pub async fn run_tests(
8 10 target_name: &str,
9 11 config: &TestsConfig,
@@ -12,6 +14,33 @@ pub async fn run_tests(
12 14 let started_at = chrono::Utc::now().to_rfc3339();
13 15 let start = std::time::Instant::now();
14 16
17 + // Validate filter characters before appending to SSH command.
18 + // Only allow alphanumeric, underscore, colon, dash — covers all valid Rust test filter patterns.
19 + if let Some(f) = filter {
20 + if !f.chars().all(|c| c.is_alphanumeric() || c == '_' || c == ':' || c == '-') {
21 + let finished_at = chrono::Utc::now().to_rfc3339();
22 + let duration_secs = start.elapsed().as_secs() as i64;
23 + return TestRun {
24 + id: None,
25 + target: target_name.to_string(),
26 + started_at,
27 + finished_at: Some(finished_at),
28 + duration_secs: Some(duration_secs),
29 + exit_code: None,
30 + passed: false,
31 + summary: TestSummary {
32 + steps: vec![],
33 + total_passed: None,
34 + total_failed: None,
35 + },
36 + raw_output: format!(
37 + "Invalid filter: contains characters outside [a-zA-Z0-9_:-]. Got: {f}"
38 + ),
39 + filter: Some(f.to_string()),
40 + };
41 + }
42 + }
43 +
15 44 let mut cmd_str = config.command.clone();
16 45 if let Some(f) = filter {
17 46 cmd_str.push(' ');
@@ -22,8 +51,9 @@ pub async fn run_tests(
22 51 .arg("-o")
23 52 .arg("BatchMode=yes")
24 53 .arg("-o")
25 - .arg("ConnectTimeout=10")
54 + .arg(format!("ConnectTimeout={}", config.timeout_secs))
26 55 .arg(&config.ssh)
56 + .arg("--")
27 57 .arg(&cmd_str)
28 58 .output()
29 59 .await;
@@ -6,10 +6,13 @@ use tokio::net::TcpStream;
6 6 use tokio_rustls::rustls;
7 7 use tokio_rustls::TlsConnector;
8 8
9 + use tracing::instrument;
10 +
9 11 use crate::config::TlsConfig;
10 12 use crate::types::TlsStatus;
11 13
12 14 /// Connect to host:port, complete TLS handshake, and extract leaf cert fields.
15 + #[instrument(skip_all)]
13 16 pub async fn check_tls(target_name: &str, config: &TlsConfig) -> TlsStatus {
14 17 let checked_at = chrono::Utc::now().to_rfc3339();
15 18 let addr = format!("{}:{}", config.host, config.port);
@@ -83,7 +86,14 @@ pub fn parse_leaf_cert(
83 86 .unwrap_or(now);
84 87 let not_before_chrono = chrono::DateTime::from_timestamp(not_before_ts, 0)
85 88 .unwrap_or(now);
86 - let days_remaining = (not_after_chrono - now).num_days();
89 +
90 + // Use date-level comparison for consistent day boundary behavior.
91 + // A certificate expiring today (same calendar day in UTC) gets 0 days remaining
92 + // and is treated as expired. This avoids time-of-day inconsistencies where
93 + // num_days() might return 0 for both "expires later today" and "expired earlier today".
94 + let today = now.date_naive();
95 + let expiry_date = not_after_chrono.date_naive();
96 + let days_remaining = (expiry_date - today).num_days();
87 97
88 98 let subject = cert.subject().to_string();
89 99 let issuer = cert.issuer().to_string();
@@ -150,4 +160,69 @@ mod tests {
150 160 assert_eq!(result.days_remaining, 0);
151 161 assert_eq!(result.error.as_deref(), Some("connection refused"));
152 162 }
163 +
164 + /// Generate a self-signed DER certificate with the given validity period.
165 + fn make_cert_der(not_before: chrono::DateTime<chrono::Utc>, not_after: chrono::DateTime<chrono::Utc>) -> Vec<u8> {
166 + use rcgen::{CertificateParams, KeyPair};
167 +
168 + let mut params = CertificateParams::new(vec!["example.com".to_string()]).unwrap();
169 + // Convert chrono::DateTime<Utc> -> std::time::SystemTime -> time::OffsetDateTime
170 + let nb_sys = std::time::UNIX_EPOCH + std::time::Duration::from_secs(not_before.timestamp() as u64);
171 + let na_sys = std::time::UNIX_EPOCH + std::time::Duration::from_secs(not_after.timestamp() as u64);
172 + params.not_before = nb_sys.into();
173 + params.not_after = na_sys.into();
174 +
175 + let key = KeyPair::generate().unwrap();
176 + let cert = params.self_signed(&key).unwrap();
177 + cert.der().to_vec()
178 + }
179 +
180 + #[test]
181 + fn tls_cert_expiring_today_is_zero_days_and_invalid() {
182 + let config = test_config();
183 + let now = chrono::Utc::now();
184 + let today_start = now.date_naive().and_hms_opt(0, 0, 0).unwrap()
185 + .and_utc();
186 + let today_end = now.date_naive().and_hms_opt(23, 59, 59).unwrap()
187 + .and_utc();
188 +
189 + // Cert that started yesterday, expires today
190 + let not_before = today_start - chrono::Duration::days(1);
191 + let der = make_cert_der(not_before, today_end);
192 + let result = parse_leaf_cert("test", &config, &now.to_rfc3339(), &der);
193 +
194 + assert_eq!(result.days_remaining, 0, "cert expiring today should show 0 days");
195 + assert!(!result.valid, "cert expiring today should be invalid");
196 + assert!(result.error.is_none());
197 + }
198 +
199 + #[test]
200 + fn tls_cert_expiring_tomorrow_has_one_day_remaining() {
201 + let config = test_config();
202 + let now = chrono::Utc::now();
203 + let tomorrow = (now + chrono::Duration::days(1)).date_naive()
204 + .and_hms_opt(23, 59, 59).unwrap().and_utc();
205 +
206 + let not_before = now - chrono::Duration::days(30);
207 + let der = make_cert_der(not_before, tomorrow);
208 + let result = parse_leaf_cert("test", &config, &now.to_rfc3339(), &der);
209 +
210 + assert_eq!(result.days_remaining, 1, "cert expiring tomorrow should show 1 day");
211 + assert!(result.valid, "cert expiring tomorrow should be valid");
212 + }
213 +
214 + #[test]
215 + fn tls_cert_already_expired_is_invalid() {
216 + let config = test_config();
217 + let now = chrono::Utc::now();
218 + let yesterday = (now - chrono::Duration::days(1)).date_naive()
219 + .and_hms_opt(23, 59, 59).unwrap().and_utc();
220 +
221 + let not_before = now - chrono::Duration::days(90);
222 + let der = make_cert_der(not_before, yesterday);
223 + let result = parse_leaf_cert("test", &config, &now.to_rfc3339(), &der);
224 +
225 + assert!(result.days_remaining <= 0, "expired cert should show 0 or negative days, got {}", result.days_remaining);
226 + assert!(!result.valid, "expired cert should be invalid");
227 + }
153 228 }
M src/cli.rs +148 -17
@@ -4,7 +4,7 @@ use clap::Subcommand;
4 4 use tracing::info;
5 5
6 6 use pom::alerts::Alerter;
7 - use pom::checks::{http, ssh, tls};
7 + use pom::checks::{http, routes, ssh, tls};
8 8 use pom::config::Config;
9 9 use pom::db;
10 10 use pom::display;
@@ -126,6 +126,7 @@ pub(crate) async fn cmd_status(
126 126 let target = config.get_target(&name).unwrap();
127 127 let health = db::get_latest_health(pool, &name).await?;
128 128 let tls_check = db::get_latest_tls_check(pool, &name).await?;
129 + let route_checks = db::get_latest_route_checks(pool, &name).await?;
129 130 let test = db::get_latest_test_run(pool, &name).await?;
130 131 let incident = db::get_open_incident(pool, &name).await?;
131 132
@@ -180,6 +181,7 @@ pub(crate) async fn cmd_status(
180 181 "incident": incident,
181 182 }));
182 183 } else {
184 + let route_slice = if route_checks.is_empty() { None } else { Some(route_checks.as_slice()) };
183 185 print!(
184 186 "{}",
185 187 display::format_status_target(
@@ -188,6 +190,7 @@ pub(crate) async fn cmd_status(
188 190 health.as_ref(),
189 191 latency_24h.as_ref(),
190 192 tls_check.as_ref(),
193 + route_slice,
191 194 test.as_ref(),
192 195 staleness.as_ref(),
193 196 incident.as_ref(),
@@ -245,10 +248,10 @@ pub(crate) async fn cmd_prune(
245 248 pool: &sqlx::SqlitePool,
246 249 days: i64,
247 250 ) -> Result<()> {
248 - let (health_pruned, test_pruned, heartbeat_pruned, alerts_pruned, tls_pruned, incidents_pruned) = db::prune_old_records(pool, days).await?;
251 + let (health_pruned, test_pruned, heartbeat_pruned, alerts_pruned, tls_pruned, incidents_pruned, routes_pruned) = db::prune_old_records(pool, days).await?;
249 252 print!(
250 253 "{}",
251 - display::format_prune(health_pruned, test_pruned, heartbeat_pruned, alerts_pruned, tls_pruned, incidents_pruned, days),
254 + display::format_prune(health_pruned, test_pruned, heartbeat_pruned, alerts_pruned, tls_pruned, incidents_pruned, routes_pruned, days),
252 255 );
253 256 Ok(())
254 257 }
@@ -261,6 +264,9 @@ pub(crate) async fn cmd_serve(
261 264 let prune_days = config.serve.prune_days;
262 265 let listen_addr = config.serve.listen.clone();
263 266
267 + // --- Cancellation token for graceful shutdown ---
268 + let token = tokio_util::sync::CancellationToken::new();
269 +
264 270 // --- Instance identity ---
265 271 let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?;
266 272 let instance_name = config.instance_name();
@@ -305,6 +311,7 @@ pub(crate) async fn cmd_serve(
305 311 let name = name.clone();
306 312 let label = target_config.label.clone();
307 313 let alerter = alerter.clone();
314 + let cancel = token.clone();
308 315
309 316 info!("{name}: health check every {interval_secs}s");
310 317
@@ -316,8 +323,12 @@ pub(crate) async fn cmd_serve(
316 323 );
317 324 let expect = health_config.expect.as_ref();
318 325 let mut in_drift = false;
326 + interval.tick().await; // consume immediate first tick
319 327 loop {
320 - interval.tick().await;
328 + tokio::select! {
329 + _ = cancel.cancelled() => break,
330 + _ = interval.tick() => {}
331 + }
321 332 let previous = db::get_latest_health(&pool, &name).await.ok().flatten();
322 333 let snapshot = http::check_health(&name, &health_config, expect).await;
323 334 info!("{}: {} ({}ms)", name, snapshot.status, snapshot.response_time_ms);
@@ -425,6 +436,7 @@ pub(crate) async fn cmd_serve(
425 436 let label = target_config.label.clone();
426 437 let alerter = alerter.clone();
427 438 let warn_days = tls_config.warn_days;
439 + let cancel = token.clone();
428 440
429 441 info!("{name}: TLS check every {tls_interval_secs}s (host={})", tls_config.host);
430 442
@@ -432,8 +444,12 @@ pub(crate) async fn cmd_serve(
432 444 let mut interval = tokio::time::interval(
433 445 std::time::Duration::from_secs(tls_interval_secs),
434 446 );
447 + interval.tick().await; // consume immediate first tick
435 448 loop {
436 - interval.tick().await;
449 + tokio::select! {
450 + _ = cancel.cancelled() => break,
451 + _ = interval.tick() => {}
452 + }
437 453 let previous = db::get_latest_tls_check(&pool, &name).await.ok().flatten();
438 454 let status = tls::check_tls(&name, &tls_config).await;
439 455 info!("{}: TLS {} — {}d remaining", name, if status.valid { "valid" } else { "invalid" }, status.days_remaining);
@@ -477,16 +493,99 @@ pub(crate) async fn cmd_serve(
477 493 }
478 494 }
479 495
496 + // Spawn route check tasks
497 + let route_interval = config.serve.route_check_interval_secs;
498 + for name in config.target_names() {
499 + let target_config = config.get_target(&name).unwrap().clone();
500 + if target_config.expected_routes.is_empty() {
501 + continue;
502 + }
503 + let Some(ref health_config) = target_config.health else { continue };
504 + let Some(base_url) = routes::base_url_from_health_url(&health_config.url) else { continue };
505 + let route_paths = target_config.expected_routes.clone();
506 + let timeout = std::time::Duration::from_secs(health_config.timeout_secs);
507 + let label = target_config.label.clone();
508 + let pool = pool.clone();
509 + let alerter = alerter.clone();
510 + let n = route_paths.len();
511 + let cancel = token.clone();
512 +
513 + info!("{name}: route checks every {route_interval}s ({n} routes)");
514 +
515 + handles.push(tokio::spawn(async move {
516 + let mut interval = tokio::time::interval(
517 + std::time::Duration::from_secs(route_interval),
518 + );
519 + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
520 + let client = reqwest::Client::builder()
521 + .redirect(reqwest::redirect::Policy::none())
522 + .build()
523 + .unwrap_or_default();
524 + let mut prev_failed: std::collections::HashSet<String> = std::collections::HashSet::new();
525 +
526 + interval.tick().await; // consume immediate first tick
527 + loop {
528 + tokio::select! {
529 + _ = cancel.cancelled() => break,
530 + _ = interval.tick() => {}
531 + }
532 + let results = routes::check_routes(&client, &name, &base_url, &route_paths, timeout).await;
533 +
534 + for result in &results {
535 + if let Err(e) = db::insert_route_check(&pool, result).await {
536 + tracing::error!("{}: failed to store route check for {}: {e}", name, result.path);
537 + }
538 + }
539 +
540 + let current_failed: std::collections::HashSet<String> = results
541 + .iter()
542 + .filter(|r| !r.ok)
543 + .map(|r| r.path.clone())
544 + .collect();
545 +
546 + let ok_count = results.iter().filter(|r| r.ok).count();
547 + info!("{name}: routes {ok_count}/{n} OK");
548 +
549 + if let Some(ref alerter) = alerter {
550 + // New failures
551 + let new_failures: Vec<String> = current_failed
552 + .difference(&prev_failed)
553 + .cloned()
554 + .collect();
555 + if !new_failures.is_empty() {
556 + alerter.send_route_failure_alert(&name, &label, &new_failures).await;
557 + }
558 +
559 + // Recoveries
560 + let recoveries: Vec<String> = prev_failed
561 + .difference(&current_failed)
562 + .cloned()
563 + .collect();
564 + if !recoveries.is_empty() {
565 + alerter.send_route_recovery_alert(&name, &label, &recoveries).await;
566 + }
567 + }
568 +
569 + prev_failed = current_failed;
570 + }
571 + }));
572 + }
573 +
480 574 // Spawn daily prune task
481 575 let prune_pool = pool.clone();
576 + let prune_cancel = token.clone();
482 577 handles.push(tokio::spawn(async move {
483 578 let mut interval = tokio::time::interval(
484 579 std::time::Duration::from_secs(86400),
485 580 );
581 + interval.tick().await; // consume immediate first tick
486 582 loop {
487 - interval.tick().await;
583 + tokio::select! {
584 + _ = prune_cancel.cancelled() => break,
585 + _ = interval.tick() => {}
586 + }
488 587 match db::prune_old_records(&prune_pool, prune_days).await {
489 - Ok((h, t, p, a, tl, inc)) => info!("Pruned {h} health checks, {t} test runs, {p} peer heartbeats, {a} alerts, {tl} TLS checks, {inc} incidents"),
588 + Ok((h, t, p, a, tl, inc, rc)) => info!("Pruned {h} health checks, {t} test runs, {p} peer heartbeats, {a} alerts, {tl} TLS checks, {inc} incidents, {rc} route checks"),
490 589 Err(e) => tracing::error!("Prune failed: {e}"),
491 590 }
492 591 }
@@ -501,6 +600,7 @@ pub(crate) async fn cmd_serve(
501 600 pool.clone(),
502 601 heartbeat_secs,
503 602 alerter.clone(),
603 + token.clone(),
504 604 ).await;
505 605 handles.extend(hb_handles);
506 606 }
@@ -509,30 +609,61 @@ pub(crate) async fn cmd_serve(
509 609 let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone()));
510 610 let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?;
511 611 info!("API server listening on {listen_addr}");
612 + let api_cancel = token.clone();
512 613 handles.push(tokio::spawn(async move {
513 - if let Err(e) = axum::serve(api_listener, api_app).await {
614 + if let Err(e) = axum::serve(api_listener, api_app)
615 + .with_graceful_shutdown(async move { api_cancel.cancelled().await })
616 + .await
617 + {
514 618 tracing::error!("API server error: {e}");
515 619 }
516 620 }));
517 621
518 - // Wait for shutdown signal
622 + // Wait for shutdown signal or unexpected task exit
519 623 let mut sigterm = tokio::signal::unix::signal(
520 624 tokio::signal::unix::SignalKind::terminate(),
521 625 )?;
522 626
523 - tokio::select! {
524 - _ = tokio::signal::ctrl_c() => {
525 - info!("Received SIGINT, shutting down");
526 - }
527 - _ = sigterm.recv() => {
528 - info!("Received SIGTERM, shutting down");
627 + let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60));
628 + watchdog_interval.tick().await; // consume immediate first tick
629 +
630 + loop {
631 + tokio::select! {
632 + _ = tokio::signal::ctrl_c() => {
633 + info!("Received SIGINT, shutting down");
634 + break;
635 + }
636 + _ = sigterm.recv() => {
637 + info!("Received SIGTERM, shutting down");
638 + break;
639 + }
640 + _ = watchdog_interval.tick() => {
641 + for (i, handle) in handles.iter().enumerate() {
642 + if handle.is_finished() {
643 + tracing::error!("Background task {i} exited unexpectedly (possible panic)");
644 + }
645 + }
646 + }
529 647 }
530 648 }
531 649
532 - for handle in handles {
533 - handle.abort();
650 + // Graceful shutdown: cancel all tasks, then wait with grace period
651 + token.cancel();
652 + info!("Waiting for tasks to finish (5s grace period)...");
653 +
654 + let shutdown = async {
655 + for handle in handles {
656 + if let Err(e) = handle.await {
657 + tracing::error!("Task shutdown error: {e}");
658 + }
659 + }
660 + };
661 +
662 + if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() {
663 + tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly");
534 664 }
535 665
666 + info!("Shutdown complete");
536 667 Ok(())
537 668 }
538 669
M src/config.rs +187 -4
@@ -9,53 +9,81 @@ use crate::peer::OnMissing;
9 9
10 10 #[derive(Debug, Clone, Deserialize)]
11 11 pub struct Config {
12 + /// Serve-mode settings (intervals, listen address, pruning).
12 13 #[serde(default)]
13 14 pub serve: ServeConfig,
15 + /// This PoM instance's identity (name, optional fixed ID).
14 16 #[serde(default)]
15 17 pub instance: InstanceConfig,
18 + /// Monitored targets, keyed by short name (e.g. "mnw", "go").
16 19 #[serde(default)]
17 20 pub targets: HashMap<String, TargetConfig>,
21 + /// Peer PoM instances for mesh monitoring, keyed by peer name.
18 22 #[serde(default)]
19 23 pub peers: HashMap<String, PeerConfig>,
24 + /// Email alert configuration via Postmark. `None` disables alerting.
20 25 pub alerts: Option<AlertConfig>,
21 26 }
22 27
23 28 #[derive(Debug, Clone, Deserialize)]
24 29 pub struct AlertConfig {
30 + /// Postmark server API token. Can also be set via `POM_POSTMARK_TOKEN` env var.
25 31 pub postmark_token: Option<String>,
32 + /// Recipient email address for alert notifications.
26 33 pub to: String,
34 + /// Sender email address for alert notifications.
27 35 #[serde(default = "default_alert_from")]
28 36 pub from: String,
37 + /// Minimum seconds between repeated alerts for the same target.
29 38 #[serde(default = "default_cooldown_secs")]
30 39 pub cooldown_secs: u64,
31 40 }
32 41
33 42 #[derive(Debug, Clone, Default, Deserialize)]
34 43 pub struct InstanceConfig {
44 + /// Human-readable instance name. Falls back to OS hostname if unset.
35 45 pub name: Option<String>,
46 + /// Fixed instance UUID. Auto-generated and persisted to disk if unset.
36 47 pub id: Option<String>,
37 48 }
38 49
39 50 #[derive(Debug, Clone, Deserialize)]
40 51 pub struct PeerConfig {
52 + /// Network address of the peer (host:port).
41 53 pub address: String,
54 + /// Action to take when the peer is declared missing.
42 55 #[serde(default)]
43 56 pub on_missing: OnMissing,
57 + /// Number of consecutive heartbeat failures before declaring the peer missing.
58 + /// Defaults to 3 at runtime if unset.
44 59 pub grace_count: Option<u32>,
60 + /// Bearer token for authenticating with this peer's API.
61 + pub token: Option<String>,
45 62 }
46 63
47 64 #[derive(Debug, Clone, Deserialize)]
48 65 pub struct ServeConfig {
66 + /// Seconds between health check cycles for all targets.
49 67 #[serde(default = "default_serve_interval")]
50 68 pub interval_secs: u64,
69 + /// Number of days of history to retain before pruning.
51 70 #[serde(default = "default_prune_days")]
52 71 pub prune_days: i64,
72 + /// Socket address the API server binds to (e.g. "127.0.0.1:9100").
53 73 #[serde(default = "default_listen")]
54 74 pub listen: String,
75 + /// Seconds between peer heartbeat probes.
55 76 #[serde(default = "default_peer_heartbeat")]
56 77 pub peer_heartbeat_secs: u64,
78 + /// Seconds between TLS certificate checks.
57 79 #[serde(default = "default_tls_check_interval")]
58 80 pub tls_check_interval_secs: u64,
81 + /// Seconds between route accessibility checks for all targets.
82 + #[serde(default = "default_route_check_interval")]
83 + pub route_check_interval_secs: u64,
84 + /// Bearer token required for API access. If set, all /api/* requests must
85 + /// include `Authorization: Bearer <token>`. Can also be set via POM_API_TOKEN env var.
86 + pub api_token: Option<String>,
59 87 }
60 88
61 89 impl Default for ServeConfig {
@@ -66,23 +94,35 @@ impl Default for ServeConfig {
66 94 listen: default_listen(),
67 95 peer_heartbeat_secs: 60,
68 96 tls_check_interval_secs: 3600,
97 + route_check_interval_secs: 300,
98 + api_token: None,
69 99 }
70 100 }
71 101 }
72 102
73 103 fn default_peer_heartbeat() -> u64 {
104 + // 1 minute: detects peer failures within the grace period
74 105 60
75 106 }
76 107
77 108 fn default_tls_check_interval() -> u64 {
109 + // 1 hour: certificates change slowly, no need to probe frequently
78 110 3600
79 111 }
80 112
113 + fn default_route_check_interval() -> u64 {
114 + // 5 minutes: same cadence as health checks, catches broken pages quickly
115 + 300
116 + }
117 +
81 118 fn default_serve_interval() -> u64 {
119 + // 5 minutes: frequent enough to catch outages within an SLA window,
120 + // infrequent enough to avoid noise
82 121 300
83 122 }
84 123
85 124 fn default_prune_days() -> i64 {
125 + // 30 days: enough history for monthly reporting, keeps DB small
86 126 30
87 127 }
88 128
@@ -92,17 +132,28 @@ fn default_listen() -> String {
92 132
93 133 #[derive(Debug, Clone, Deserialize)]
94 134 pub struct TargetConfig {
135 + /// Human-readable display name for this target.
95 136 pub label: String,
137 + /// HTTP health check configuration. `None` disables health monitoring.
96 138 pub health: Option<HealthConfig>,
139 + /// Remote test runner configuration. `None` disables test execution.
97 140 pub tests: Option<TestsConfig>,
141 + /// TLS certificate monitoring configuration. `None` disables TLS checks.
98 142 pub tls: Option<TlsConfig>,
143 + /// Expected routes to check for accessibility. Empty disables route checks.
144 + /// Requires `health` config for base URL derivation.
145 + #[serde(default)]
146 + pub expected_routes: Vec<String>,
99 147 }
100 148
101 149 #[derive(Debug, Clone, Deserialize)]
102 150 pub struct TlsConfig {
151 + /// Hostname to connect to for the TLS check.
103 152 pub host: String,
153 + /// TCP port for the TLS connection.
104 154 #[serde(default = "default_tls_port")]
105 155 pub port: u16,
156 + /// Days before expiry at which to start warning.
106 157 #[serde(default = "default_tls_warn_days")]
107 158 pub warn_days: u32,
108 159 }
@@ -112,65 +163,84 @@ fn default_tls_port() -> u16 {
112 163 }
113 164
114 165 fn default_tls_warn_days() -> u32 {
166 + // 2 weeks: enough lead time to renew before expiry
115 167 14
116 168 }
117 169
118 170 #[derive(Debug, Clone, Deserialize)]
119 171 pub struct HealthConfig {
172 + /// URL of the health endpoint to check.
120 173 pub url: String,
174 + /// HTTP request timeout in seconds for this health check.
121 175 #[serde(default = "default_health_timeout")]
122 176 pub timeout_secs: u64,
123 - /// Per-target interval override for serve mode
177 + /// Per-target interval override for serve mode.
124 178 pub interval_secs: Option<u64>,
125 - /// Response validation expectations
179 + /// Response validation expectations.
126 180 pub expect: Option<HealthExpectation>,
127 - /// Latency trending and drift detection
181 + /// Latency trending and drift detection.
128 182 pub trending: Option<TrendingConfig>,
129 183 }
130 184
131 185 #[derive(Debug, Clone, Deserialize)]
132 186 pub struct TrendingConfig {
187 + /// Number of hours of history used to compute the baseline average latency.
133 188 #[serde(default = "default_baseline_window_hours")]
134 189 pub baseline_window_hours: u64,
190 + /// Multiplier over the baseline average that constitutes a latency spike.
135 191 #[serde(default = "default_spike_threshold")]
136 192 pub spike_threshold: f64,
137 193 }
138 194
139 195 fn default_baseline_window_hours() -> u64 {
196 + // 7 days: captures weekly traffic patterns for stable baseline
140 197 168
141 198 }
142 199
143 200 fn default_spike_threshold() -> f64 {
201 + // 2x baseline average: significant deviation without false positives
202 + // from normal variance
144 203 2.0
145 204 }
146 205
147 206 #[derive(Debug, Clone, Deserialize, Default)]
148 207 pub struct HealthExpectation {
208 + /// Expected HTTP status code (e.g. 200). `None` accepts any 2xx.
149 209 pub status_code: Option<u16>,
210 + /// JSON field paths and their expected string values (e.g. `{"status": "operational"}`).
150 211 #[serde(default)]
151 212 pub json_fields: HashMap<String, String>,
213 + /// Substring that must appear in the response body.
152 214 pub body_contains: Option<String>,
153 215 }
154 216
155 217 #[derive(Debug, Clone, Deserialize)]
156 218 pub struct TestsConfig {
219 + /// SSH host alias (from ~/.ssh/config) for the test runner machine.
157 220 pub ssh: String,
221 + /// Shell command to execute on the remote host to run tests.
158 222 pub command: String,
223 + /// Maximum seconds to wait for the test command before killing it.
159 224 #[serde(default = "default_test_timeout")]
160 225 pub timeout_secs: u64,
226 + /// Number of days after which a test run is considered stale.
161 227 #[serde(default = "default_staleness_days")]
162 228 pub staleness_days: u64,
163 229 }
164 230
165 231 fn default_staleness_days() -> u64 {
232 + // 1 week: tests older than a week may not reflect current code
166 233 7
167 234 }
168 235
169 236 fn default_health_timeout() -> u64 {
237 + // 10 seconds: generous for most HTTP endpoints, avoids false positives
238 + // on slow networks
170 239 10
171 240 }
172 241
173 242 fn default_test_timeout() -> u64 {
243 + // 10 minutes: full CI suites can take time, especially on slow machines
174 244 600
175 245 }
176 246
@@ -179,6 +249,7 @@ fn default_alert_from() -> String {
179 249 }
180 250
181 251 fn default_cooldown_secs() -> u64 {
252 + // 5 minutes: prevents alert storms during sustained outages
182 253 300
183 254 }
184 255
@@ -197,7 +268,35 @@ impl Config {
197 268 }
198 269
199 270 let contents = std::fs::read_to_string(&config_path)?;
200 - let config: Config = toml::from_str(&contents)?;
271 + let mut config: Config = toml::from_str(&contents)?;
272 +
273 + // Allow postmark_token from environment variable (preferred over config file)
274 + if let Some(ref mut alerts) = config.alerts {
275 + if alerts.postmark_token.is_none() {
276 + if let Ok(token) = std::env::var("POM_POSTMARK_TOKEN") {
277 + alerts.postmark_token = Some(token);
278 + }
279 + }
280 + }
281 +
282 + // Allow api_token from environment variable (preferred over config file)
283 + if config.serve.api_token.is_none() {
284 + if let Ok(token) = std::env::var("POM_API_TOKEN") {
285 + config.serve.api_token = Some(token);
286 + }
287 + }
288 +
289 + // Validate expected_routes paths start with '/'
290 + for (name, target) in &config.targets {
291 + for route in &target.expected_routes {
292 + if !route.starts_with('/') {
293 + return Err(PomError::Config(format!(
294 + "target {name}: expected_route \"{route}\" must start with '/'"
295 + )));
296 + }
297 + }
298 + }
299 +
201 300 Ok(config)
202 301 }
203 302
@@ -303,6 +402,35 @@ address = "10.0.0.1:9100"
303 402 let peer = config.peers.get("test").unwrap();
304 403 assert_eq!(peer.on_missing, OnMissing::Log);
305 404 assert_eq!(peer.grace_count, None);
405 + assert!(peer.token.is_none());
406 + }
407 +
408 + #[test]
409 + fn peer_with_token() {
410 + let toml = r#"
411 + [peers.test]
412 + address = "10.0.0.1:9100"
413 + token = "peer-secret-123"
414 + "#;
415 + let config: Config = toml::from_str(toml).unwrap();
416 + let peer = config.peers.get("test").unwrap();
417 + assert_eq!(peer.token.as_deref(), Some("peer-secret-123"));
418 + }
419 +
420 + #[test]
421 + fn serve_api_token_from_config() {
422 + let toml = r#"
423 + [serve]
424 + api_token = "my-api-secret"
425 + "#;
426 + let config: Config = toml::from_str(toml).unwrap();
427 + assert_eq!(config.serve.api_token.as_deref(), Some("my-api-secret"));
428 + }
429 +
430 + #[test]
431 + fn serve_api_token_defaults_to_none() {
432 + let config: Config = toml::from_str("").unwrap();
433 + assert!(config.serve.api_token.is_none());
306 434 }
307 435
308 436 #[test]
@@ -524,4 +652,59 @@ cooldown_secs = 60
524 652 assert_eq!(alerts.from, "Custom <custom@example.com>");
525 653 assert_eq!(alerts.cooldown_secs, 60);
526 654 }
655 +
656 + #[test]
657 + fn config_expected_routes() {
658 + let toml = r#"
659 + [targets.mnw]
660 + label = "MakeNotWork"
661 + expected_routes = ["/", "/discover", "/login", "/docs"]
662 + [targets.mnw.health]
663 + url = "https://makenot.work/api/health"
664 + "#;
665 + let config: Config = toml::from_str(toml).unwrap();
666 + let mnw = config.get_target("mnw").unwrap();
667 + assert_eq!(mnw.expected_routes, vec!["/", "/discover", "/login", "/docs"]);
668 + }
669 +
670 + #[test]
671 + fn config_expected_routes_default_empty() {
672 + let toml = r#"
673 + [targets.mnw]
674 + label = "MakeNotWork"
675 + "#;
676 + let config: Config = toml::from_str(toml).unwrap();
677 + assert!(config.get_target("mnw").unwrap().expected_routes.is_empty());
678 + }
679 +
680 + #[test]
681 + fn config_route_check_interval_default() {
682 + let config: Config = toml::from_str("").unwrap();
683 + assert_eq!(config.serve.route_check_interval_secs, 300);
684 + }
685 +
686 + #[test]
687 + fn config_route_check_interval_custom() {
688 + let toml = r#"
689 + [serve]
690 + route_check_interval_secs = 600
691 + "#;
692 + let config: Config = toml::from_str(toml).unwrap();
693 + assert_eq!(config.serve.route_check_interval_secs, 600);
694 + }
695 +
696 + #[test]
697 + fn config_expected_routes_without_slash_detected() {
698 + let toml = r#"
699 + [targets.mnw]
700 + label = "MakeNotWork"
701 + expected_routes = ["discover", "/login"]
702 + "#;
703 + let config: Config = toml::from_str(toml).unwrap();
704 + let bad_routes: Vec<_> = config.get_target("mnw").unwrap()
705 + .expected_routes.iter()
706 + .filter(|r| !r.starts_with('/'))
707 + .collect();
708 + assert_eq!(bad_routes, vec!["discover"]);
709 + }
527 710 }
M src/db.rs +135 -3
@@ -8,7 +8,7 @@
8 8 use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
9 9 use std::path::Path;
10 10 use std::str::FromStr;
11 - use tracing::info;
11 + use tracing::{info, instrument};
12 12
13 13 use crate::error::Result;
14 14 use crate::types::{HealthDetails, HealthSnapshot, HealthStatus, TestRun, TestSummary, TlsStatus};
@@ -95,8 +95,23 @@ const MIGRATIONS: &[(i64, &str, &str)] = &[
95 95 );
96 96 CREATE INDEX idx_incidents_target_id ON incidents(target, id DESC);
97 97 "#),
98 + (5, "add route_checks table", r#"
99 + CREATE TABLE route_checks (
100 + id INTEGER PRIMARY KEY AUTOINCREMENT,
101 + target TEXT NOT NULL,
102 + path TEXT NOT NULL,
103 + status_code INTEGER NOT NULL,
104 + ok INTEGER NOT NULL,
105 + response_time_ms INTEGER NOT NULL,
106 + checked_at TEXT NOT NULL,
107 + error TEXT
108 + );
109 + CREATE INDEX idx_route_checks_target_path ON route_checks(target, path, id DESC);
110 + CREATE INDEX idx_route_checks_target ON route_checks(target, checked_at DESC);
111 + "#),
98 112 ];
99 113
114 + #[instrument(skip_all)]
100 115 pub async fn connect(path: &Path) -> Result<SqlitePool> {
101 116 let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))?
102 117 .create_if_missing(true)
@@ -111,6 +126,7 @@ pub async fn connect(path: &Path) -> Result<SqlitePool> {
111 126 Ok(pool)
112 127 }
113 128
129 + #[instrument(skip_all)]
114 130 pub async fn connect_in_memory() -> Result<SqlitePool> {
115 131 let opts = SqliteConnectOptions::from_str("sqlite::memory:")?;
116 132 let pool = SqlitePoolOptions::new()
@@ -124,6 +140,7 @@ pub async fn connect_in_memory() -> Result<SqlitePool> {
124 140
125 141 /// Run pending schema migrations. Detects pre-migration databases by checking
126 142 /// for existing tables and stamps them as version 1 without re-running.
143 + #[instrument(skip_all)]
127 144 pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
128 145 // Ensure the schema_version table exists
129 146 sqlx::query(
@@ -163,6 +180,7 @@ pub async fn run_migrations(pool: &SqlitePool) -> Result<()> {
163 180 }
164 181
165 182 /// Get the current schema version (0 if no migrations have been applied).
183 + #[instrument(skip_all)]
166 184 pub async fn get_schema_version(pool: &SqlitePool) -> Result<i64> {
167 185 let row = sqlx::query_as::<_, (i64,)>(
168 186 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
@@ -219,6 +237,7 @@ async fn stamp_version(pool: &SqlitePool, version: i64, description: &str) -> Re
219 237
220 238 // --- Health check queries ---
221 239
240 + #[instrument(skip_all)]
222 241 pub async fn insert_health_check(
223 242 pool: &SqlitePool,
224 243 snapshot: &HealthSnapshot,
@@ -245,6 +264,7 @@ pub async fn insert_health_check(
245 264 Ok(result.last_insert_rowid())
246 265 }
247 266
267 + #[instrument(skip_all)]
248 268 pub async fn get_health_history(
249 269 pool: &SqlitePool,
250 270 target: Option<&str>,
@@ -275,6 +295,7 @@ pub async fn get_health_history(
275 295 Ok(rows.into_iter().map(|r| r.into_snapshot()).collect())
276 296 }
277 297
298 + #[instrument(skip_all)]
278 299 pub async fn get_latest_health(
279 300 pool: &SqlitePool,
280 301 target: &str,
@@ -292,6 +313,7 @@ pub async fn get_latest_health(
292 313
293 314 // --- Test run queries ---
294 315
316 + #[instrument(skip_all)]
295 317 pub async fn insert_test_run(
296 318 pool: &SqlitePool,
297 319 run: &TestRun,
@@ -317,6 +339,7 @@ pub async fn insert_test_run(
317 339 Ok(result.last_insert_rowid())
318 340 }
319 341
342 + #[instrument(skip_all)]
320 343 pub async fn get_test_history(
321 344 pool: &SqlitePool,
322 345 target: Option<&str>,
@@ -347,6 +370,7 @@ pub async fn get_test_history(
347 370 Ok(rows.into_iter().map(|r| r.into_test_run()).collect())
348 371 }
349 372
373 + #[instrument(skip_all)]
350 374 pub async fn get_latest_test_run(
351 375 pool: &SqlitePool,
352 376 target: &str,
@@ -363,6 +387,7 @@ pub async fn get_latest_test_run(
363 387 }
364 388
365 389 /// Get the version from the health check closest to (but before) a given timestamp.
390 + #[instrument(skip_all)]
366 391 pub async fn get_version_at_time(
367 392 pool: &SqlitePool,
368 393 target: &str,
@@ -388,6 +413,7 @@ pub async fn get_version_at_time(
388 413
389 414 /// Calculate uptime percentage for a target over the given number of hours.
390 415 /// Returns the percentage of health checks with "operational" status.
416 + #[instrument(skip_all)]
391 417 pub async fn get_uptime_percent(
392 418 pool: &SqlitePool,
393 419 target: &str,
@@ -418,6 +444,7 @@ pub async fn get_uptime_percent(
418 444 // --- Latency trending queries ---
419 445
420 446 /// Fetch all response times for a target since a given timestamp, ordered ASC.
447 + #[instrument(skip_all)]
421 448 pub async fn get_response_times(
422 449 pool: &SqlitePool,
423 450 target: &str,
@@ -436,6 +463,7 @@ pub async fn get_response_times(
436 463 }
437 464
438 465 /// Fetch the last N response times for **operational** checks only (most recent first).
466 + #[instrument(skip_all)]
439 467 pub async fn get_recent_response_times(
440 468 pool: &SqlitePool,
441 469 target: &str,
@@ -466,6 +494,7 @@ pub struct AlertRow {
466 494 pub error: Option<String>,
467 495 }
468 496
497 + #[instrument(skip_all)]
469 498 pub async fn insert_alert(
470 499 pool: &SqlitePool,
471 500 target: &str,
@@ -490,13 +519,15 @@ pub async fn insert_alert(
490 519 Ok(result.last_insert_rowid())
491 520 }
492 521
522 + #[instrument(skip_all)]
493 523 pub async fn get_latest_alert_for_target(
494 524 pool: &SqlitePool,
495 525 target: &str,
496 526 ) -> Result<Option<AlertRow>> {
497 527 Ok(sqlx::query_as::<_, AlertRow>(
498 528 "SELECT id, target, alert_type, from_status, to_status, sent_at, error
499 - FROM alerts WHERE target = ? ORDER BY id DESC LIMIT 1",
529 + FROM alerts WHERE target = ? AND alert_type NOT LIKE '%recovery%'
530 + ORDER BY id DESC LIMIT 1",
500 531 )
501 532 .bind(target)
502 533 .fetch_optional(pool)
@@ -520,6 +551,7 @@ pub struct TlsCheckRow {
520 551 pub error: Option<String>,
521 552 }
522 553
554 + #[instrument(skip_all)]
523 555 pub async fn insert_tls_check(
524 556 pool: &SqlitePool,
525 557 status: &TlsStatus,
@@ -544,6 +576,7 @@ pub async fn insert_tls_check(
544 576 Ok(result.last_insert_rowid())
545 577 }
546 578
579 + #[instrument(skip_all)]
547 580 pub async fn get_latest_tls_check(
548 581 pool: &SqlitePool,
549 582 target: &str,
@@ -570,6 +603,7 @@ pub struct IncidentRow {
570 603 pub to_status: String,
571 604 }
572 605
606 + #[instrument(skip_all)]
573 607 pub async fn insert_incident(
574 608 pool: &SqlitePool,
575 609 target: &str,
@@ -590,6 +624,7 @@ pub async fn insert_incident(
590 624 Ok(result.last_insert_rowid())
591 625 }
592 626
627 + #[instrument(skip_all)]
593 628 pub async fn close_open_incidents(
594 629 pool: &SqlitePool,
595 630 target: &str,
@@ -607,6 +642,7 @@ pub async fn close_open_incidents(
607 642 Ok(result.rows_affected())
608 643 }
609 644
645 + #[instrument(skip_all)]
610 646 pub async fn get_open_incident(
611 647 pool: &SqlitePool,
612 648 target: &str,
@@ -620,6 +656,7 @@ pub async fn get_open_incident(
620 656 .await?)
621 657 }
622 658
659 + #[instrument(skip_all)]
623 660 pub async fn get_recent_incidents(
624 661 pool: &SqlitePool,
625 662 target: &str,
@@ -635,12 +672,77 @@ pub async fn get_recent_incidents(
635 672 .await?)
636 673 }
637 674
675 + // --- Route check queries ---
676 +
677 + #[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
678 + pub struct RouteCheckRow {
679 + pub id: i64,
680 + pub target: String,
681 + pub path: String,
682 + pub status_code: i64,
683 + pub ok: bool,
684 + pub response_time_ms: i64,
685 + pub checked_at: String,
686 + pub error: Option<String>,
687 + }
688 +
689 + #[instrument(skip_all)]
690 + pub async fn insert_route_check(
691 + pool: &SqlitePool,
692 + result: &crate::checks::routes::RouteCheckResult,
693 + ) -> Result<i64> {
694 + let row = sqlx::query(
695 + "INSERT INTO route_checks (target, path, status_code, ok, response_time_ms, checked_at, error)
696 + VALUES (?, ?, ?, ?, ?, ?, ?)",
697 + )
698 + .bind(&result.target)
699 + .bind(&result.path)
700 + .bind(result.status_code as i64)
701 + .bind(result.ok)
702 + .bind(result.response_time_ms)
703 + .bind(&result.checked_at)
704 + .bind(&result.error)
705 + .execute(pool)
706 + .await?;
707 + Ok(row.last_insert_rowid())
708 + }
709 +
710 + /// Get the latest route check per path for a target.
711 + #[instrument(skip_all)]
712 + pub async fn get_latest_route_checks(
713 + pool: &SqlitePool,
714 + target: &str,
715 + ) -> Result<Vec<RouteCheckRow>> {
716 + Ok(sqlx::query_as::<_, RouteCheckRow>(
717 + "SELECT r.id, r.target, r.path, r.status_code, r.ok, r.response_time_ms, r.checked_at, r.error
718 + FROM route_checks r
719 + INNER JOIN (SELECT path, MAX(id) as max_id FROM route_checks WHERE target = ? GROUP BY path) latest
720 + ON r.id = latest.max_id
721 + ORDER BY r.path",
722 + )
723 + .bind(target)
724 + .fetch_all(pool)
725 + .await?)
726 + }
727 +
638 728 // --- Maintenance ---
639 729
730 + /// Delete records older than `days` from all tables.
731 + ///
732 + /// Returns a tuple of deleted row counts in this order:
733 + /// (health_checks, test_runs, heartbeats, alerts, tls_checks, incidents, route_checks).
734 + /// Only closed incidents (with a non-NULL `ended_at`) are pruned.
735 + #[instrument(skip_all)]
640 736 pub async fn prune_old_records(
641 737 pool: &SqlitePool,
642 738 days: i64,
643 - ) -> Result<(u64, u64, u64, u64, u64, u64)> {
739 + ) -> Result<(u64, u64, u64, u64, u64, u64, u64)> {
740 + // Guard: days <= 0 would set cutoff to now (or the future), deleting
741 + // everything. Treat this as a no-op instead.
742 + if days <= 0 {
743 + return Ok((0, 0, 0, 0, 0, 0, 0));
744 + }
745 +
644 746 let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
645 747 let cutoff_str = cutoff.to_rfc3339();
646 748
@@ -674,6 +776,11 @@ pub async fn prune_old_records(
674 776 .execute(pool)
675 777 .await?;
676 778
779 + let routes_result = sqlx::query("DELETE FROM route_checks WHERE checked_at < ?")
780 + .bind(&cutoff_str)
781 + .execute(pool)
782 + .await?;
783 +
677 784 Ok((
678 785 health_result.rows_affected(),
679 786 test_result.rows_affected(),
@@ -681,12 +788,14 @@ pub async fn prune_old_records(
681 788 alerts_result.rows_affected(),
682 789 tls_result.rows_affected(),
683 790 incidents_result.rows_affected(),
791 + routes_result.rows_affected(),
684 792 ))
685 793 }
686 794
687 795 // --- Peer identity queries ---
688 796
689 797 /// Store a peer's identity (first-seen UUID). INSERT OR IGNORE — first wins.
798 + #[instrument(skip_all)]
690 799 pub async fn store_peer_identity(
691 800 pool: &SqlitePool,
692 801 peer_name: &str,
@@ -705,7 +814,28 @@ pub async fn store_peer_identity(
705 814 Ok(())
706 815 }
707 816
817 + /// Update a peer's identity (UUID changed, e.g. after reinstall).
818 + #[instrument(skip_all)]
819 + pub async fn update_peer_identity(
820 + pool: &SqlitePool,
821 + peer_name: &str,
822 + instance_id: &str,
823 + ) -> Result<()> {
824 + let now = chrono::Utc::now().to_rfc3339();
825 + sqlx::query(
826 + "UPDATE peer_identities SET instance_id = ?, first_seen = ?
827 + WHERE peer_name = ?",
828 + )
829 + .bind(instance_id)
830 + .bind(&now)
831 + .bind(peer_name)
832 + .execute(pool)
833 + .await?;
834 + Ok(())
835 + }
836 +
708 837 /// Get a peer's first-seen instance ID.
838 + #[instrument(skip_all)]
709 839 pub async fn get_peer_identity(
710 840 pool: &SqlitePool,
711 841 peer_name: &str,
@@ -721,6 +851,7 @@ pub async fn get_peer_identity(
721 851
722 852 // --- Peer heartbeat queries ---
723 853
854 + #[instrument(skip_all)]
724 855 pub async fn insert_peer_heartbeat(
725 856 pool: &SqlitePool,
726 857 peer_name: &str,
@@ -741,6 +872,7 @@ pub async fn insert_peer_heartbeat(
741 872 Ok(result.last_insert_rowid())
742 873 }
743 874
875 + #[instrument(skip_all)]
744 876 pub async fn get_peer_heartbeat_history(
745 877 pool: &SqlitePool,
746 878 peer_name: &str,
M src/display.rs +66 -23
@@ -5,7 +5,7 @@
5 5
6 6 use std::fmt::Write;
7 7
8 - use crate::db::{IncidentRow, TlsCheckRow};
8 + use crate::db::{IncidentRow, RouteCheckRow, TlsCheckRow};
9 9 use crate::types::{HealthSnapshot, LatencyStats, TestRun, TestStaleness};
10 10
11 11 /// Format a single health snapshot as a human-readable line.
@@ -58,7 +58,7 @@ pub fn format_test_result(target_name: &str, run: &TestRun) -> String {
58 58 out
59 59 }
60 60
61 - /// Format a single target's status block (health + latency + TLS + tests + staleness + incident) for CLI display.
61 + /// Format a single target's status block (health + latency + TLS + routes + tests + staleness + incident) for CLI display.
62 62 #[allow(clippy::too_many_arguments)]
63 63 pub fn format_status_target(
64 64 name: &str,
@@ -66,6 +66,7 @@ pub fn format_status_target(
66 66 health: Option<&HealthSnapshot>,
67 67 latency: Option<&LatencyStats>,
68 68 tls: Option<&TlsCheckRow>,
69 + route_checks: Option<&[RouteCheckRow]>,
69 70 test: Option<&TestRun>,
70 71 staleness: Option<&TestStaleness>,
71 72 incident: Option<&IncidentRow>,
@@ -107,6 +108,19 @@ pub fn format_status_target(
107 108 }
108 109 }
109 110
111 + if let Some(checks) = route_checks {
112 + if !checks.is_empty() {
113 + let total = checks.len();
114 + let ok_count = checks.iter().filter(|c| c.ok).count();
115 + if ok_count == total {
116 + writeln!(out, " Routes: {ok_count}/{total} OK").unwrap();
117 + } else {
118 + let failed: Vec<&str> = checks.iter().filter(|c| !c.ok).map(|c| c.path.as_str()).collect();
119 + writeln!(out, " Routes: {ok_count}/{total} (FAIL: {})", failed.join(", ")).unwrap();
120 + }
121 + }
122 + }
123 +
110 124 if let Some(t) = test {
111 125 let result = if t.passed { "PASSED" } else { "FAILED" };
112 126 write!(out, " Tests: {result}").unwrap();
@@ -179,8 +193,8 @@ pub fn format_test_history(history: &[TestRun]) -> String {
179 193 }
180 194
181 195 /// Format prune results for CLI display.
182 - pub fn format_prune(health_pruned: u64, test_pruned: u64, heartbeat_pruned: u64, alerts_pruned: u64, tls_pruned: u64, incidents_pruned: u64, days: i64) -> String {
183 - format!("Pruned {health_pruned} health checks, {test_pruned} test runs, {heartbeat_pruned} peer heartbeats, {alerts_pruned} alerts, {tls_pruned} TLS checks, {incidents_pruned} incidents older than {days} days.\n")
196 + pub fn format_prune(health_pruned: u64, test_pruned: u64, heartbeat_pruned: u64, alerts_pruned: u64, tls_pruned: u64, incidents_pruned: u64, routes_pruned: u64, days: i64) -> String {
197 + format!("Pruned {health_pruned} health checks, {test_pruned} test runs, {heartbeat_pruned} peer heartbeats, {alerts_pruned} alerts, {tls_pruned} TLS checks, {incidents_pruned} incidents, {routes_pruned} route checks older than {days} days.\n")
184 198 }
185 199
186 200 /// Format mesh data (from JSON) for human-readable CLI display.
@@ -480,7 +494,7 @@ mod tests {
480 494 raw_output: String::new(),
481 495 filter: None,
482 496 };
483 - let out = format_status_target("mnw", "MakeNotWork", Some(&health), None, None, Some(&test), None, None);
497 + let out = format_status_target("mnw", "MakeNotWork", Some(&health), None, None, None, Some(&test), None, None);
484 498 assert!(out.contains("=== mnw (MakeNotWork) ==="));
485 499 assert!(out.contains("Health: [OK] operational (95ms) v2.1.0"));
486 500 assert!(out.contains("Tests: PASSED (60s)"));
@@ -489,7 +503,7 @@ mod tests {
489 503
490 504 #[test]
491 505 fn status_target_no_data() {
492 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None);
506 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, None);
493 507 assert!(out.contains("=== mnw (MakeNotWork) ==="));
494 508 assert!(out.contains("Health: no data"));
495 509 assert!(out.contains("Tests: no data"));
@@ -506,7 +520,7 @@ mod tests {
506 520 details: None,
507 521 error: None,
508 522 };
509 - let out = format_status_target("mnw", "MakeNotWork", Some(&health), None, None, None, None, None);
523 + let out = format_status_target("mnw", "MakeNotWork", Some(&health), None, None, None, None, None, None);
510 524 assert!(out.contains("Health: [WARN] degraded (2000ms)"));
511 525 assert!(out.contains("Tests: no data"));
512 526 }
@@ -529,7 +543,7 @@ mod tests {
529 543 raw_output: String::new(),
530 544 filter: None,
531 545 };
532 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, Some(&test), None, None);
546 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, Some(&test), None, None);
533 547 assert!(out.contains("Tests: FAILED"));
534 548 assert!(out.contains("80 passed, 5 failed"));
535 549 }
@@ -551,7 +565,7 @@ mod tests {
551 565 checked_at: "2026-03-11T00:00:00Z".to_string(),
552 566 error: None,
553 567 };
554 - let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None);
568 + let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None, None);
555 569 assert!(out.contains("TLS: [OK] makenot.work"));
556 570 assert!(out.contains("47d remaining"));
557 571 assert!(out.contains("expires 2026-04-27"));
@@ -572,7 +586,7 @@ mod tests {
572 586 checked_at: "2026-03-11T00:00:00Z".to_string(),
573 587 error: None,
574 588 };
575 - let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None);
589 + let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None, None);
576 590 assert!(out.contains("TLS: [WARN] makenot.work"));
577 591 assert!(out.contains("12d remaining"));
578 592 }
@@ -592,7 +606,7 @@ mod tests {
592 606 checked_at: "2026-03-11T00:00:00Z".to_string(),
593 607 error: Some("connection refused".to_string()),
594 608 };
595 - let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None);
609 + let out = format_status_target("mnw", "MakeNotWork", None, None, Some(&tls), None, None, None, None);
596 610 assert!(out.contains("TLS: [ERR] makenot.work"));
597 611 assert!(out.contains("connection refused"));
598 612 }
@@ -610,13 +624,13 @@ mod tests {
610 624 from_status: "operational".to_string(),
611 625 to_status: "degraded".to_string(),
612 626 };
613 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, Some(&incident));
627 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, Some(&incident));
614 628 assert!(out.contains("Incident: [ACTIVE] degraded since 2026-03-11T14:30:00Z"));
615 629 }
616 630
617 631 #[test]
618 632 fn status_target_no_incident() {
619 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None);
633 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, None);
620 634 assert!(!out.contains("Incident"));
621 635 }
622 636
@@ -631,13 +645,13 @@ mod tests {
631 645 p95_ms: 180,
632 646 sample_count: 288,
633 647 };
634 - let out = format_status_target("mnw", "MakeNotWork", None, Some(&latency), None, None, None, None);
648 + let out = format_status_target("mnw", "MakeNotWork", None, Some(&latency), None, None, None, None, None);
635 649 assert!(out.contains("Latency (24h): avg 120ms, p95 180ms, range 95-210ms (288 samples)"));
636 650 }
637 651
638 652 #[test]
639 653 fn status_target_without_latency() {
640 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None);
654 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, None);
641 655 assert!(!out.contains("Latency"));
642 656 }
643 657
@@ -757,17 +771,17 @@ mod tests {
757 771
758 772 #[test]
759 773 fn prune_formatting() {
760 - let out = format_prune(5, 3, 10, 2, 1, 4, 30);
774 + let out = format_prune(5, 3, 10, 2, 1, 4, 0, 30);
761 775 assert_eq!(
762 776 out,
763 - "Pruned 5 health checks, 3 test runs, 10 peer heartbeats, 2 alerts, 1 TLS checks, 4 incidents older than 30 days.\n"
777 + "Pruned 5 health checks, 3 test runs, 10 peer heartbeats, 2 alerts, 1 TLS checks, 4 incidents, 0 route checks older than 30 days.\n"
764 778 );
765 779 }
766 780
767 781 #[test]
768 782 fn prune_zero_records() {
769 - let out = format_prune(0, 0, 0, 0, 0, 0, 7);
770 - assert!(out.contains("Pruned 0 health checks, 0 test runs, 0 peer heartbeats, 0 alerts, 0 TLS checks, 0 incidents older than 7 days."));
783 + let out = format_prune(0, 0, 0, 0, 0, 0, 0, 7);
784 + assert!(out.contains("Pruned 0 health checks, 0 test runs, 0 peer heartbeats, 0 alerts, 0 TLS checks, 0 incidents, 0 route checks older than 7 days."));
771 785 }
772 786
773 787 // --- format_mesh ---
@@ -913,7 +927,7 @@ mod tests {
913 927 last_test_at: Some("2026-03-10T00:00:00Z".to_string()),
914 928 days_since_test: Some(1),
915 929 };
916 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, Some(&staleness), None);
930 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, Some(&staleness), None);
917 931 assert!(out.contains("Tests: STALE"));
918 932 assert!(out.contains("version changed: 0.1.8 -> 0.1.9"));
919 933 }
@@ -928,7 +942,7 @@ mod tests {
928 942 last_test_at: Some("2026-03-01T00:00:00Z".to_string()),
929 943 days_since_test: Some(10),
930 944 };
931 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, Some(&staleness), None);
945 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, Some(&staleness), None);
932 946 assert!(out.contains("Tests: STALE"));
933 947 assert!(out.contains("tests are 10 days old"));
934 948 }
@@ -943,13 +957,42 @@ mod tests {
943 957 last_test_at: Some("2026-03-10T00:00:00Z".to_string()),
944 958 days_since_test: Some(1),
945 959 };
946 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, Some(&staleness), None);
960 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, Some(&staleness), None);
947 961 assert!(!out.contains("STALE"));
948 962 }
949 963
950 964 #[test]
951 965 fn status_target_no_staleness_data() {
952 - let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None);
966 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, None);
953 967 assert!(!out.contains("STALE"));
954 968 }
969 +
970 + // --- format_status_target with routes ---
971 +
972 + #[test]
973 + fn status_target_all_routes_ok() {
974 + let checks = vec![
975 + RouteCheckRow { id: 1, target: "mnw".to_string(), path: "/".to_string(), status_code: 200, ok: true, response_time_ms: 50, checked_at: "2026-03-13T00:00:00Z".to_string(), error: None },
976 + RouteCheckRow { id: 2, target: "mnw".to_string(), path: "/docs".to_string(), status_code: 200, ok: true, response_time_ms: 60, checked_at: "2026-03-13T00:00:00Z".to_string(), error: None },
977 + ];
978 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, Some(&checks), None, None, None);
979 + assert!(out.contains("Routes: 2/2 OK"));
980 + }
981 +
982 + #[test]
983 + fn status_target_some_routes_failing() {
984 + let checks = vec![
985 + RouteCheckRow { id: 1, target: "mnw".to_string(), path: "/".to_string(), status_code: 200, ok: true, response_time_ms: 50, checked_at: "2026-03-13T00:00:00Z".to_string(), error: None },
986 + RouteCheckRow { id: 2, target: "mnw".to_string(), path: "/docs/faq".to_string(), status_code: 404, ok: false, response_time_ms: 30, checked_at: "2026-03-13T00:00:00Z".to_string(), error: Some("HTTP 404".to_string()) },
987 + RouteCheckRow { id: 3, target: "mnw".to_string(), path: "/pricing".to_string(), status_code: 500, ok: false, response_time_ms: 20, checked_at: "2026-03-13T00:00:00Z".to_string(), error: Some("HTTP 500".to_string()) },
988 + ];
989 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, Some(&checks), None, None, None);
990 + assert!(out.contains("Routes: 1/3 (FAIL: /docs/faq, /pricing)"));
991 + }
992 +
993 + #[test]
994 + fn status_target_no_route_checks() {
995 + let out = format_status_target("mnw", "MakeNotWork", None, None, None, None, None, None, None);
996 + assert!(!out.contains("Routes"));
997 + }
955 998 }
M src/error.rs +10
M src/peer.rs +119 -19
M src/types.rs +63