Skip to main content

max / makenotwork

21.4 KB · 637 lines History Blame Raw
1 //! Peer mesh — identity, heartbeat monitoring, and mesh state aggregation.
2
3 use std::collections::HashMap;
4 use std::sync::Arc;
5 use tokio::sync::RwLock;
6 use serde::Serialize;
7
8 use tracing::instrument;
9
10 use crate::alerts::Alerter;
11 use crate::config::PeerConfig;
12 use crate::error::{PomError, Result};
13
14 // --- Identity ---
15
16 /// Load or create a persistent instance ID (UUID v4).
17 /// Stored at `~/.local/share/pom/instance_id`, same directory as `pom.db`.
18 pub fn load_or_create_instance_id(
19 override_id: Option<&str>,
20 ) -> Result<String> {
21 if let Some(id) = override_id {
22 return Ok(id.to_string());
23 }
24
25 let data_dir = dirs::data_local_dir()
26 .ok_or_else(|| PomError::Config("Could not determine data directory".into()))?;
27 let pom_dir = data_dir.join("pom");
28 std::fs::create_dir_all(&pom_dir)?;
29 let id_path = pom_dir.join("instance_id");
30
31 if id_path.exists() {
32 let id = std::fs::read_to_string(&id_path)?.trim().to_string();
33 if !id.is_empty() {
34 return Ok(id);
35 }
36 }
37
38 let id = uuid::Uuid::new_v4().to_string();
39 std::fs::write(&id_path, &id)?;
40 Ok(id)
41 }
42
43 // --- Types ---
44
/// Identity and metadata an instance advertises to its peers.
///
/// The heartbeat loop deserializes this from a peer's `GET /api/peer/info` response,
/// and `MeshState::instance` holds this node's own copy.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct InstanceInfo {
    /// Persistent instance ID; peers compare it across heartbeats to detect reinstalls.
    pub id: String,
    /// Instance name.
    pub name: String,
    /// Version string reported by the instance.
    pub version: String,
    /// NOTE(review): presumably the names of the targets this instance monitors —
    /// confirm against where it is populated.
    pub targets: Vec<String>,
    /// Start timestamp; presumably RFC 3339 (tests use `2026-03-10T00:00:00Z`) — confirm.
    pub started_at: String,
}
53
54 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
55 #[serde(rename_all = "lowercase")]
56 pub enum PeerStatus {
57 /// Last heartbeat succeeded and UUID matches the known identity.
58 Online,
59 /// Heartbeat failed but consecutive failures are still below `grace_count`.
60 GracePeriod,
61 /// Heartbeat failed and consecutive failures have reached or exceeded `grace_count`.
62 Missing,
63 /// Initial state before the first heartbeat attempt has completed.
64 Unknown,
65 }
66
67 impl std::fmt::Display for PeerStatus {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 match self {
70 Self::Online => write!(f, "online"),
71 Self::GracePeriod => write!(f, "grace_period"),
72 Self::Missing => write!(f, "missing"),
73 Self::Unknown => write!(f, "unknown"),
74 }
75 }
76 }
77
/// Per-peer policy applied when the peer transitions into `PeerStatus::Missing`.
///
/// Deserialized from the lowercase strings `"alert"`, `"log"` and `"ignore"`. When
/// the field is absent, `#[default]` selects `Log`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OnMissing {
    /// Emit a `tracing::warn` line and send a missing alert through the configured
    /// `Alerter` (if one is configured). A recovery alert is sent when the peer comes
    /// back online. NOTE(review): the delivery channel lives in `crate::alerts`; it was
    /// previously documented as Postmark email — confirm there.
    Alert,
    /// Emit a `tracing::info` log line on the transition to missing (default behavior).
    #[default]
    Log,
    /// Emit neither a log line nor an alert when the peer is declared missing.
    Ignore,
}
89
/// Runtime state tracked for one configured peer.
///
/// The serialized form omits `known_id`, `status_data` and `token` (`#[serde(skip)]`).
#[derive(Debug, Clone, Serialize)]
pub struct PeerState {
    /// `host:port` the heartbeat loop contacts as `http://{address}/api/peer/...`.
    pub address: String,
    /// Policy applied when the peer is declared missing.
    pub on_missing: OnMissing,
    /// Consecutive failures required before the peer is declared missing
    /// (`new_mesh_state` defaults it to 3).
    pub grace_count: u32,
    /// Current liveness state.
    pub status: PeerStatus,
    /// Info from the last successful heartbeat; `None` if its body did not parse.
    pub info: Option<InstanceInfo>,
    /// RFC 3339 (UTC) time of the last successful heartbeat.
    pub last_seen: Option<String>,
    /// Round-trip latency of the last successful heartbeat, in milliseconds.
    pub latency_ms: Option<u64>,
    /// Failed heartbeats since the last success; reset to 0 on success.
    pub consecutive_failures: u32,
    /// Peer UUID recorded on first contact; replaced if the peer reports a new one.
    #[serde(skip)]
    pub known_id: Option<String>,
    /// Cached status data from the peer's /api/peer/status endpoint.
    #[serde(skip)]
    pub status_data: Option<serde_json::Value>,
    /// Bearer token for authenticating with this peer's API.
    #[serde(skip)]
    pub token: Option<String>,
}
109
110 // --- Mesh State ---
111
/// Aggregated view of this instance and every configured peer.
#[derive(Debug)]
pub struct MeshState {
    /// This instance's own identity and metadata.
    pub instance: InstanceInfo,
    /// Per-peer state, keyed by the peer name from the configuration.
    pub peers: HashMap<String, PeerState>,
}

/// Mesh state shared by the per-peer heartbeat tasks and any other holder of the `Arc`,
/// guarded by a tokio `RwLock`.
pub type SharedMeshState = Arc<RwLock<MeshState>>;
119
120 pub fn new_mesh_state(
121 instance: InstanceInfo,
122 peer_configs: &HashMap<String, PeerConfig>,
123 ) -> SharedMeshState {
124 let mut peers = HashMap::new();
125 for (name, cfg) in peer_configs {
126 peers.insert(
127 name.clone(),
128 PeerState {
129 address: cfg.address.clone(),
130 on_missing: cfg.on_missing,
131 grace_count: cfg.grace_count.unwrap_or(3),
132 status: PeerStatus::Unknown,
133 info: None,
134 last_seen: None,
135 latency_ms: None,
136 consecutive_failures: 0,
137 known_id: None,
138 status_data: None,
139 token: cfg.token.clone(),
140 },
141 );
142 }
143 Arc::new(RwLock::new(MeshState { instance, peers }))
144 }
145
146 // --- Heartbeat ---
147
148 #[instrument(skip_all)]
149 pub async fn spawn_heartbeat_tasks(
150 mesh: SharedMeshState,
151 pool: sqlx::SqlitePool,
152 interval_secs: u64,
153 alerter: Option<Alerter>,
154 token: tokio_util::sync::CancellationToken,
155 ) -> Vec<tokio::task::JoinHandle<()>> {
156 let peer_names: Vec<String> = {
157 let mesh_guard = mesh.read().await;
158 mesh_guard.peers.keys().cloned().collect()
159 };
160
161 let mut handles = Vec::new();
162 for peer_name in peer_names {
163 let mesh = Arc::clone(&mesh);
164 let pool = pool.clone();
165 let alerter = alerter.clone();
166 let token = token.clone();
167 handles.push(tokio::spawn(async move {
168 heartbeat_loop(&peer_name, mesh, pool, interval_secs, alerter, token).await;
169 }));
170 }
171 handles
172 }
173
174 async fn heartbeat_loop(
175 peer_name: &str,
176 mesh: SharedMeshState,
177 pool: sqlx::SqlitePool,
178 interval_secs: u64,
179 alerter: Option<Alerter>,
180 cancel: tokio_util::sync::CancellationToken,
181 ) {
182 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
183 // Skip the first immediate tick — give peers time to start up
184 interval.tick().await;
185
186 let (address, auth_token) = {
187 let state = mesh.read().await;
188 match state.peers.get(peer_name) {
189 Some(p) => (p.address.clone(), p.token.clone()),
190 None => return,
191 }
192 };
193
194 let client = reqwest::Client::builder()
195 .timeout(std::time::Duration::from_secs(10))
196 .build()
197 .unwrap_or_default();
198
199 loop {
200 tokio::select! {
201 _ = cancel.cancelled() => break,
202 _ = interval.tick() => {}
203 }
204
205 let start = std::time::Instant::now();
206 let mut req = client.get(format!("http://{address}/api/peer/info"));
207 if let Some(ref t) = auth_token {
208 req = req.bearer_auth(t);
209 }
210 let result = req.send().await;
211 let latency_ms = start.elapsed().as_millis() as u64;
212
213 match result.and_then(|r| r.error_for_status()) {
214 Ok(response) => {
215 let info: Option<InstanceInfo> = response.json().await.ok();
216 handle_heartbeat_success(peer_name, &mesh, &pool, info, latency_ms, &alerter).await;
217 }
218 Err(_e) => {
219 handle_heartbeat_failure(peer_name, &mesh, &pool, latency_ms, &alerter).await;
220 }
221 }
222
223 // Also fetch /api/peer/status for mesh aggregation
224 let mut status_req = client.get(format!("http://{address}/api/peer/status"));
225 if let Some(ref t) = auth_token {
226 status_req = status_req.bearer_auth(t);
227 }
228 let status_result = status_req.send().await;
229 match status_result {
230 Ok(resp) => {
231 if let Ok(data) = resp.json::<serde_json::Value>().await {
232 let mut state = mesh.write().await;
233 if let Some(peer) = state.peers.get_mut(peer_name) {
234 peer.status_data = Some(data);
235 }
236 }
237 }
238 Err(e) => {
239 tracing::debug!("{peer_name}: failed to fetch /api/peer/status: {e}");
240 }
241 }
242 }
243 }
244
245 async fn handle_heartbeat_success(
246 peer_name: &str,
247 mesh: &SharedMeshState,
248 pool: &sqlx::SqlitePool,
249 info: Option<InstanceInfo>,
250 latency_ms: u64,
251 alerter: &Option<Alerter>,
252 ) {
253 let now = chrono::Utc::now().to_rfc3339();
254
255 // Update in-memory state under lock, collect data for DB writes
256 let (new_identity_id, updated_identity_id, recovery_info) = {
257 let mut state = mesh.write().await;
258 let Some(peer) = state.peers.get_mut(peer_name) else {
259 return;
260 };
261
262 let was_missing = peer.status == PeerStatus::Missing || peer.status == PeerStatus::GracePeriod;
263
264 // Check UUID consistency
265 let mut new_identity = None;
266 let mut updated_identity = None;
267 if let Some(ref info) = info {
268 match &peer.known_id {
269 None => {
270 peer.known_id = Some(info.id.clone());
271 new_identity = Some(info.id.clone());
272 tracing::info!("{peer_name}: first contact, id={}", info.id);
273 }
274 Some(known) if known != &info.id => {
275 // UUID changed -- likely a reinstall. Log the error and reset
276 // peer state so the new identity is accepted.
277 tracing::error!(
278 "{peer_name}: UUID mismatch! expected={known}, got={}. Resetting peer state (possible reinstall).",
279 info.id
280 );
281 peer.known_id = Some(info.id.clone());
282 updated_identity = Some(info.id.clone());
283 }
284 _ => {}
285 }
286 }
287
288 // Collect recovery data before mutating state
289 let recovery = if was_missing && peer.on_missing == OnMissing::Alert {
290 Some(peer.address.clone())
291 } else {
292 None
293 };
294
295 if was_missing {
296 tracing::info!("{peer_name}: recovered (was {:?})", peer.status);
297 }
298
299 peer.status = PeerStatus::Online;
300 peer.info = info;
301 peer.last_seen = Some(now);
302 peer.latency_ms = Some(latency_ms);
303 peer.consecutive_failures = 0;
304
305 (new_identity, updated_identity, recovery)
306 };
307 // Lock dropped — DB writes and alerts happen without holding mesh lock
308
309 if let Some(id) = new_identity_id {
310 let _ = crate::db::store_peer_identity(pool, peer_name, &id).await;
311 }
312 if let Some(id) = updated_identity_id {
313 let _ = crate::db::update_peer_identity(pool, peer_name, &id).await;
314 }
315 let _ = crate::db::insert_peer_heartbeat(pool, peer_name, "online", latency_ms as i64).await;
316
317 if let (Some(address), Some(alerter)) = (recovery_info, alerter) {
318 alerter.send_peer_recovery(peer_name, &address).await;
319 }
320 }
321
322 async fn handle_heartbeat_failure(
323 peer_name: &str,
324 mesh: &SharedMeshState,
325 pool: &sqlx::SqlitePool,
326 latency_ms: u64,
327 alerter: &Option<Alerter>,
328 ) {
329 // Update in-memory state under lock, collect data for alert after lock drop
330 let (new_status, alert_info) = {
331 let mut state = mesh.write().await;
332 let Some(peer) = state.peers.get_mut(peer_name) else {
333 return;
334 };
335
336 peer.consecutive_failures += 1;
337
338 let new_status = match peer.status {
339 PeerStatus::Online | PeerStatus::Unknown | PeerStatus::GracePeriod => {
340 if peer.consecutive_failures >= peer.grace_count {
341 PeerStatus::Missing
342 } else {
343 PeerStatus::GracePeriod
344 }
345 }
346 PeerStatus::Missing => PeerStatus::Missing,
347 };
348
349 let transitioned_to_missing = new_status == PeerStatus::Missing && peer.status != PeerStatus::Missing;
350
351 // Collect alert data before mutating state
352 let alert_info = if transitioned_to_missing {
353 match peer.on_missing {
354 OnMissing::Alert => {
355 tracing::warn!(
356 "{peer_name}: MISSING after {} consecutive failures (action: alert)",
357 peer.consecutive_failures
358 );
359 Some((peer.address.clone(), peer.consecutive_failures))
360 }
361 OnMissing::Log => {
362 tracing::info!(
363 "{peer_name}: missing after {} consecutive failures",
364 peer.consecutive_failures
365 );
366 None
367 }
368 OnMissing::Ignore => None,
369 }
370 } else {
371 None
372 };
373
374 peer.status = new_status;
375
376 (new_status, alert_info)
377 };
378 // Lock dropped — DB write and alerts happen without holding mesh lock
379
380 let status_str = new_status.to_string();
381 let _ = crate::db::insert_peer_heartbeat(pool, peer_name, &status_str, latency_ms as i64).await;
382
383 if let (Some((address, failures)), Some(alerter)) = (alert_info, alerter) {
384 alerter.send_peer_missing(peer_name, &address, failures).await;
385 }
386 }
387
/// Unit tests for instance identity, `OnMissing` parsing, and the heartbeat state
/// machine. The heartbeat tests run against an in-memory database created by
/// `crate::db::connect_in_memory`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn override_id_takes_precedence() {
        let id = load_or_create_instance_id(Some("override-id")).unwrap();
        assert_eq!(id, "override-id");
    }

    // NOTE(review): with no override this reads (and, on first run, creates) the real
    // `instance_id` file under the user's local data directory. The test is therefore
    // not hermetic, and it fails if that file holds a non-UUID ID.
    #[test]
    fn auto_id_is_valid_uuid() {
        let id = load_or_create_instance_id(None).unwrap();
        assert!(uuid::Uuid::parse_str(&id).is_ok());
    }

    /// Each lowercase config string maps to its variant; an absent key yields `Log`.
    #[test]
    fn on_missing_deserialize() {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            #[serde(default)]
            on_missing: OnMissing,
        }

        let w: Wrapper = toml::from_str(r#"on_missing = "alert""#).unwrap();
        assert_eq!(w.on_missing, OnMissing::Alert);

        let w: Wrapper = toml::from_str(r#"on_missing = "log""#).unwrap();
        assert_eq!(w.on_missing, OnMissing::Log);

        let w: Wrapper = toml::from_str(r#"on_missing = "ignore""#).unwrap();
        assert_eq!(w.on_missing, OnMissing::Ignore);

        // Default is Log
        let w: Wrapper = toml::from_str("").unwrap();
        assert_eq!(w.on_missing, OnMissing::Log);
    }

    /// Minimal local `InstanceInfo` used as this node's identity in test meshes.
    fn test_instance_info() -> InstanceInfo {
        InstanceInfo {
            id: "test-id".to_string(),
            name: "test".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        }
    }

    /// Mesh with a single peer `"peer1"` configured with `OnMissing::Alert` and the
    /// given `grace_count`.
    fn test_mesh_with_peer(grace_count: u32) -> SharedMeshState {
        let mut peer_configs = HashMap::new();
        peer_configs.insert(
            "peer1".to_string(),
            PeerConfig {
                address: "10.0.0.1:9100".to_string(),
                on_missing: OnMissing::Alert,
                grace_count: Some(grace_count),
                token: None,
            },
        );
        new_mesh_state(test_instance_info(), &peer_configs)
    }

    #[tokio::test]
    async fn heartbeat_failure_transitions_through_grace_to_missing() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        // Start at Unknown
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Unknown);

        // First failure → GracePeriod
        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::GracePeriod);

        // Second failure → still GracePeriod
        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::GracePeriod);

        // Third failure (= grace_count) → Missing
        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing);

        // Fourth failure → stays Missing
        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing);
    }

    #[tokio::test]
    async fn heartbeat_success_recovers_from_missing() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        // grace_count = 1: a single failure is enough to reach Missing.
        let mesh = test_mesh_with_peer(1);

        // Drive to Missing
        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing);

        // Success → Online
        let info = InstanceInfo {
            id: "remote-id".to_string(),
            name: "remote".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        };
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 42, &None).await;

        let state = mesh.read().await;
        let peer = &state.peers["peer1"];
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.consecutive_failures, 0);
        assert_eq!(peer.latency_ms, Some(42));
        assert_eq!(peer.known_id.as_deref(), Some("remote-id"));
    }

    #[tokio::test]
    async fn heartbeat_success_detects_uuid_stored_on_first_contact() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        let info = InstanceInfo {
            id: "uuid-abc".to_string(),
            name: "remote".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        };
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 10, &None).await;

        // UUID should be persisted in DB
        let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap();
        assert_eq!(stored, Some("uuid-abc".to_string()));
    }

    /// One failure then one success produce two history rows, with the newest first.
    #[tokio::test]
    async fn heartbeat_records_to_db() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await;
        handle_heartbeat_success("peer1", &mesh, &pool, None, 55, &None).await;

        let history = crate::db::get_peer_heartbeat_history(&pool, "peer1", 10).await.unwrap();
        assert_eq!(history.len(), 2);
        // Most recent first
        assert_eq!(history[0].status, "online");
        assert_eq!(history[0].latency_ms, 55);
        assert_eq!(history[1].status, "grace_period");
    }

    #[tokio::test]
    async fn heartbeat_uuid_mismatch_resets_state() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        // First contact with UUID "aaa"
        let info_a = InstanceInfo {
            id: "aaa".to_string(),
            name: "remote".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        };
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info_a), 10, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].known_id.as_deref(), Some("aaa"));
        assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Online);

        // UUID stored in DB
        let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap();
        assert_eq!(stored, Some("aaa".to_string()));

        // Reconnect with UUID "bbb" (simulating reinstall)
        let info_b = InstanceInfo {
            id: "bbb".to_string(),
            name: "remote-reinstalled".to_string(),
            version: "0.2.0".to_string(),
            targets: vec![],
            started_at: "2026-03-13T00:00:00Z".to_string(),
        };
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info_b), 20, &None).await;

        // State should be reset: new UUID accepted, status online, failures cleared
        let state = mesh.read().await;
        let peer = &state.peers["peer1"];
        assert_eq!(peer.known_id.as_deref(), Some("bbb"));
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.consecutive_failures, 0);

        // DB identity should be updated to "bbb"
        let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap();
        assert_eq!(stored, Some("bbb".to_string()));
    }

    #[tokio::test]
    async fn heartbeat_same_uuid_proceeds_normally() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        let info = InstanceInfo {
            id: "same-uuid".to_string(),
            name: "remote".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        };
        // First contact
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info.clone()), 10, &None).await;
        assert_eq!(mesh.read().await.peers["peer1"].known_id.as_deref(), Some("same-uuid"));

        // Second heartbeat with same UUID
        handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 15, &None).await;
        let state = mesh.read().await;
        let peer = &state.peers["peer1"];
        assert_eq!(peer.known_id.as_deref(), Some("same-uuid"));
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.latency_ms, Some(15));
    }

    // Plain #[test] (no tokio runtime), so the synchronous `blocking_read` is allowed here.
    #[test]
    fn mesh_state_construction() {
        let info = InstanceInfo {
            id: "test-id".to_string(),
            name: "test".to_string(),
            version: "0.1.0".to_string(),
            targets: vec!["mnw".to_string()],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        };

        let mut peer_configs = HashMap::new();
        peer_configs.insert(
            "peer1".to_string(),
            PeerConfig {
                address: "10.0.0.1:9100".to_string(),
                on_missing: OnMissing::Alert,
                grace_count: Some(5),
                token: None,
            },
        );

        let mesh = new_mesh_state(info, &peer_configs);
        let state = mesh.blocking_read();
        assert_eq!(state.instance.id, "test-id");
        assert_eq!(state.peers.len(), 1);
        let peer = state.peers.get("peer1").unwrap();
        assert_eq!(peer.address, "10.0.0.1:9100");
        assert_eq!(peer.on_missing, OnMissing::Alert);
        assert_eq!(peer.grace_count, 5);
        assert_eq!(peer.status, PeerStatus::Unknown);
    }
}
637