//! Peer mesh — identity, heartbeat monitoring, and mesh state aggregation. use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use serde::Serialize; use tracing::instrument; use crate::alerts::Alerter; use crate::config::PeerConfig; use crate::error::{PomError, Result}; // --- Identity --- /// Load or create a persistent instance ID (UUID v4). /// Stored at `~/.local/share/pom/instance_id`, same directory as `pom.db`. pub fn load_or_create_instance_id( override_id: Option<&str>, ) -> Result { if let Some(id) = override_id { return Ok(id.to_string()); } let data_dir = dirs::data_local_dir() .ok_or_else(|| PomError::Config("Could not determine data directory".into()))?; let pom_dir = data_dir.join("pom"); std::fs::create_dir_all(&pom_dir)?; let id_path = pom_dir.join("instance_id"); if id_path.exists() { let id = std::fs::read_to_string(&id_path)?.trim().to_string(); if !id.is_empty() { return Ok(id); } } let id = uuid::Uuid::new_v4().to_string(); std::fs::write(&id_path, &id)?; Ok(id) } // --- Types --- #[derive(Debug, Clone, Serialize, serde::Deserialize)] pub struct InstanceInfo { pub id: String, pub name: String, pub version: String, pub targets: Vec, pub started_at: String, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] #[serde(rename_all = "lowercase")] pub enum PeerStatus { /// Last heartbeat succeeded and UUID matches the known identity. Online, /// Heartbeat failed but consecutive failures are still below `grace_count`. GracePeriod, /// Heartbeat failed and consecutive failures have reached or exceeded `grace_count`. Missing, /// Initial state before the first heartbeat attempt has completed. Unknown, } impl std::fmt::Display for PeerStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Online => write!(f, "online"), Self::GracePeriod => write!(f, "grace_period"), Self::Missing => write!(f, "missing"), Self::Unknown => write!(f, "unknown"), } } } #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] pub enum OnMissing { /// Send an email alert via Postmark when the peer is declared missing. Alert, /// Emit a `tracing::warn` log line (default behavior). #[default] Log, /// Suppress all failure output for this peer. Ignore, } #[derive(Debug, Clone, Serialize)] pub struct PeerState { pub address: String, pub on_missing: OnMissing, pub grace_count: u32, pub status: PeerStatus, pub info: Option, pub last_seen: Option, pub latency_ms: Option, pub consecutive_failures: u32, #[serde(skip)] pub known_id: Option, /// Cached status data from the peer's /api/peer/status endpoint. #[serde(skip)] pub status_data: Option, /// Bearer token for authenticating with this peer's API. #[serde(skip)] pub token: Option, } // --- Mesh State --- #[derive(Debug)] pub struct MeshState { pub instance: InstanceInfo, pub peers: HashMap, } pub type SharedMeshState = Arc>; pub fn new_mesh_state( instance: InstanceInfo, peer_configs: &HashMap, ) -> SharedMeshState { let mut peers = HashMap::new(); for (name, cfg) in peer_configs { peers.insert( name.clone(), PeerState { address: cfg.address.clone(), on_missing: cfg.on_missing, grace_count: cfg.grace_count.unwrap_or(3), status: PeerStatus::Unknown, info: None, last_seen: None, latency_ms: None, consecutive_failures: 0, known_id: None, status_data: None, token: cfg.token.clone(), }, ); } Arc::new(RwLock::new(MeshState { instance, peers })) } // --- Heartbeat --- #[instrument(skip_all)] pub async fn spawn_heartbeat_tasks( mesh: SharedMeshState, pool: sqlx::SqlitePool, interval_secs: u64, alerter: Option, token: tokio_util::sync::CancellationToken, ) -> Vec> { let peer_names: Vec = { let mesh_guard = mesh.read().await; mesh_guard.peers.keys().cloned().collect() }; let mut handles = Vec::new(); for peer_name in peer_names { let mesh = Arc::clone(&mesh); let pool = pool.clone(); let alerter = alerter.clone(); let token = token.clone(); handles.push(tokio::spawn(async move { heartbeat_loop(&peer_name, mesh, pool, interval_secs, alerter, token).await; })); } handles } async fn heartbeat_loop( peer_name: &str, mesh: SharedMeshState, pool: sqlx::SqlitePool, interval_secs: u64, alerter: Option, cancel: tokio_util::sync::CancellationToken, ) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); // Skip the first immediate tick — give peers time to start up interval.tick().await; let (address, auth_token) = { let state = mesh.read().await; match state.peers.get(peer_name) { Some(p) => (p.address.clone(), p.token.clone()), None => return, } }; let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(10)) .build() .unwrap_or_default(); loop { tokio::select! { _ = cancel.cancelled() => break, _ = interval.tick() => {} } let start = std::time::Instant::now(); let mut req = client.get(format!("http://{address}/api/peer/info")); if let Some(ref t) = auth_token { req = req.bearer_auth(t); } let result = req.send().await; let latency_ms = start.elapsed().as_millis() as u64; match result.and_then(|r| r.error_for_status()) { Ok(response) => { let info: Option = response.json().await.ok(); handle_heartbeat_success(peer_name, &mesh, &pool, info, latency_ms, &alerter).await; } Err(_e) => { handle_heartbeat_failure(peer_name, &mesh, &pool, latency_ms, &alerter).await; } } // Also fetch /api/peer/status for mesh aggregation let mut status_req = client.get(format!("http://{address}/api/peer/status")); if let Some(ref t) = auth_token { status_req = status_req.bearer_auth(t); } let status_result = status_req.send().await; match status_result { Ok(resp) => { if let Ok(data) = resp.json::().await { let mut state = mesh.write().await; if let Some(peer) = state.peers.get_mut(peer_name) { peer.status_data = Some(data); } } } Err(e) => { tracing::debug!("{peer_name}: failed to fetch /api/peer/status: {e}"); } } } } async fn handle_heartbeat_success( peer_name: &str, mesh: &SharedMeshState, pool: &sqlx::SqlitePool, info: Option, latency_ms: u64, alerter: &Option, ) { let now = chrono::Utc::now().to_rfc3339(); // Update in-memory state under lock, collect data for DB writes let (new_identity_id, updated_identity_id, recovery_info) = { let mut state = mesh.write().await; let Some(peer) = state.peers.get_mut(peer_name) else { return; }; let was_missing = peer.status == PeerStatus::Missing || peer.status == PeerStatus::GracePeriod; // Check UUID consistency let mut new_identity = None; let mut updated_identity = None; if let Some(ref info) = info { match &peer.known_id { None => { peer.known_id = Some(info.id.clone()); new_identity = Some(info.id.clone()); tracing::info!("{peer_name}: first contact, id={}", info.id); } Some(known) if known != &info.id => { // UUID changed -- likely a reinstall. Log the error and reset // peer state so the new identity is accepted. tracing::error!( "{peer_name}: UUID mismatch! expected={known}, got={}. Resetting peer state (possible reinstall).", info.id ); peer.known_id = Some(info.id.clone()); updated_identity = Some(info.id.clone()); } _ => {} } } // Collect recovery data before mutating state let recovery = if was_missing && peer.on_missing == OnMissing::Alert { Some(peer.address.clone()) } else { None }; if was_missing { tracing::info!("{peer_name}: recovered (was {:?})", peer.status); } peer.status = PeerStatus::Online; peer.info = info; peer.last_seen = Some(now); peer.latency_ms = Some(latency_ms); peer.consecutive_failures = 0; (new_identity, updated_identity, recovery) }; // Lock dropped — DB writes and alerts happen without holding mesh lock if let Some(id) = new_identity_id { let _ = crate::db::store_peer_identity(pool, peer_name, &id).await; } if let Some(id) = updated_identity_id { let _ = crate::db::update_peer_identity(pool, peer_name, &id).await; } let _ = crate::db::insert_peer_heartbeat(pool, peer_name, "online", latency_ms as i64).await; if let (Some(address), Some(alerter)) = (recovery_info, alerter) { alerter.send_peer_recovery(peer_name, &address).await; } } async fn handle_heartbeat_failure( peer_name: &str, mesh: &SharedMeshState, pool: &sqlx::SqlitePool, latency_ms: u64, alerter: &Option, ) { // Update in-memory state under lock, collect data for alert after lock drop let (new_status, alert_info) = { let mut state = mesh.write().await; let Some(peer) = state.peers.get_mut(peer_name) else { return; }; peer.consecutive_failures += 1; let new_status = match peer.status { PeerStatus::Online | PeerStatus::Unknown | PeerStatus::GracePeriod => { if peer.consecutive_failures >= peer.grace_count { PeerStatus::Missing } else { PeerStatus::GracePeriod } } PeerStatus::Missing => PeerStatus::Missing, }; let transitioned_to_missing = new_status == PeerStatus::Missing && peer.status != PeerStatus::Missing; // Collect alert data before mutating state let alert_info = if transitioned_to_missing { match peer.on_missing { OnMissing::Alert => { tracing::warn!( "{peer_name}: MISSING after {} consecutive failures (action: alert)", peer.consecutive_failures ); Some((peer.address.clone(), peer.consecutive_failures)) } OnMissing::Log => { tracing::info!( "{peer_name}: missing after {} consecutive failures", peer.consecutive_failures ); None } OnMissing::Ignore => None, } } else { None }; peer.status = new_status; (new_status, alert_info) }; // Lock dropped — DB write and alerts happen without holding mesh lock let status_str = new_status.to_string(); let _ = crate::db::insert_peer_heartbeat(pool, peer_name, &status_str, latency_ms as i64).await; if let (Some((address, failures)), Some(alerter)) = (alert_info, alerter) { alerter.send_peer_missing(peer_name, &address, failures).await; } } #[cfg(test)] mod tests { use super::*; #[test] fn override_id_takes_precedence() { let id = load_or_create_instance_id(Some("override-id")).unwrap(); assert_eq!(id, "override-id"); } #[test] fn auto_id_is_valid_uuid() { let id = load_or_create_instance_id(None).unwrap(); assert!(uuid::Uuid::parse_str(&id).is_ok()); } #[test] fn on_missing_deserialize() { #[derive(serde::Deserialize)] struct Wrapper { #[serde(default)] on_missing: OnMissing, } let w: Wrapper = toml::from_str(r#"on_missing = "alert""#).unwrap(); assert_eq!(w.on_missing, OnMissing::Alert); let w: Wrapper = toml::from_str(r#"on_missing = "log""#).unwrap(); assert_eq!(w.on_missing, OnMissing::Log); let w: Wrapper = toml::from_str(r#"on_missing = "ignore""#).unwrap(); assert_eq!(w.on_missing, OnMissing::Ignore); // Default is Log let w: Wrapper = toml::from_str("").unwrap(); assert_eq!(w.on_missing, OnMissing::Log); } fn test_instance_info() -> InstanceInfo { InstanceInfo { id: "test-id".to_string(), name: "test".to_string(), version: "0.1.0".to_string(), targets: vec![], started_at: "2026-03-10T00:00:00Z".to_string(), } } fn test_mesh_with_peer(grace_count: u32) -> SharedMeshState { let mut peer_configs = HashMap::new(); peer_configs.insert( "peer1".to_string(), PeerConfig { address: "10.0.0.1:9100".to_string(), on_missing: OnMissing::Alert, grace_count: Some(grace_count), token: None, }, ); new_mesh_state(test_instance_info(), &peer_configs) } #[tokio::test] async fn heartbeat_failure_transitions_through_grace_to_missing() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(3); // Start at Unknown assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Unknown); // First failure → GracePeriod handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::GracePeriod); // Second failure → still GracePeriod handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::GracePeriod); // Third failure (= grace_count) → Missing handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing); // Fourth failure → stays Missing handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing); } #[tokio::test] async fn heartbeat_success_recovers_from_missing() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(1); // Drive to Missing handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Missing); // Success → Online let info = InstanceInfo { id: "remote-id".to_string(), name: "remote".to_string(), version: "0.1.0".to_string(), targets: vec![], started_at: "2026-03-10T00:00:00Z".to_string(), }; handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 42, &None).await; let state = mesh.read().await; let peer = &state.peers["peer1"]; assert_eq!(peer.status, PeerStatus::Online); assert_eq!(peer.consecutive_failures, 0); assert_eq!(peer.latency_ms, Some(42)); assert_eq!(peer.known_id.as_deref(), Some("remote-id")); } #[tokio::test] async fn heartbeat_success_detects_uuid_stored_on_first_contact() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(3); let info = InstanceInfo { id: "uuid-abc".to_string(), name: "remote".to_string(), version: "0.1.0".to_string(), targets: vec![], started_at: "2026-03-10T00:00:00Z".to_string(), }; handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 10, &None).await; // UUID should be persisted in DB let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap(); assert_eq!(stored, Some("uuid-abc".to_string())); } #[tokio::test] async fn heartbeat_records_to_db() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(3); handle_heartbeat_failure("peer1", &mesh, &pool, 0, &None).await; handle_heartbeat_success("peer1", &mesh, &pool, None, 55, &None).await; let history = crate::db::get_peer_heartbeat_history(&pool, "peer1", 10).await.unwrap(); assert_eq!(history.len(), 2); // Most recent first assert_eq!(history[0].status, "online"); assert_eq!(history[0].latency_ms, 55); assert_eq!(history[1].status, "grace_period"); } #[tokio::test] async fn heartbeat_uuid_mismatch_resets_state() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(3); // First contact with UUID "aaa" let info_a = InstanceInfo { id: "aaa".to_string(), name: "remote".to_string(), version: "0.1.0".to_string(), targets: vec![], started_at: "2026-03-10T00:00:00Z".to_string(), }; handle_heartbeat_success("peer1", &mesh, &pool, Some(info_a), 10, &None).await; assert_eq!(mesh.read().await.peers["peer1"].known_id.as_deref(), Some("aaa")); assert_eq!(mesh.read().await.peers["peer1"].status, PeerStatus::Online); // UUID stored in DB let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap(); assert_eq!(stored, Some("aaa".to_string())); // Reconnect with UUID "bbb" (simulating reinstall) let info_b = InstanceInfo { id: "bbb".to_string(), name: "remote-reinstalled".to_string(), version: "0.2.0".to_string(), targets: vec![], started_at: "2026-03-13T00:00:00Z".to_string(), }; handle_heartbeat_success("peer1", &mesh, &pool, Some(info_b), 20, &None).await; // State should be reset: new UUID accepted, status online, failures cleared let state = mesh.read().await; let peer = &state.peers["peer1"]; assert_eq!(peer.known_id.as_deref(), Some("bbb")); assert_eq!(peer.status, PeerStatus::Online); assert_eq!(peer.consecutive_failures, 0); // DB identity should be updated to "bbb" let stored = crate::db::get_peer_identity(&pool, "peer1").await.unwrap(); assert_eq!(stored, Some("bbb".to_string())); } #[tokio::test] async fn heartbeat_same_uuid_proceeds_normally() { let pool = crate::db::connect_in_memory().await.unwrap(); let mesh = test_mesh_with_peer(3); let info = InstanceInfo { id: "same-uuid".to_string(), name: "remote".to_string(), version: "0.1.0".to_string(), targets: vec![], started_at: "2026-03-10T00:00:00Z".to_string(), }; // First contact handle_heartbeat_success("peer1", &mesh, &pool, Some(info.clone()), 10, &None).await; assert_eq!(mesh.read().await.peers["peer1"].known_id.as_deref(), Some("same-uuid")); // Second heartbeat with same UUID handle_heartbeat_success("peer1", &mesh, &pool, Some(info), 15, &None).await; let state = mesh.read().await; let peer = &state.peers["peer1"]; assert_eq!(peer.known_id.as_deref(), Some("same-uuid")); assert_eq!(peer.status, PeerStatus::Online); assert_eq!(peer.latency_ms, Some(15)); } #[test] fn mesh_state_construction() { let info = InstanceInfo { id: "test-id".to_string(), name: "test".to_string(), version: "0.1.0".to_string(), targets: vec!["mnw".to_string()], started_at: "2026-03-10T00:00:00Z".to_string(), }; let mut peer_configs = HashMap::new(); peer_configs.insert( "peer1".to_string(), PeerConfig { address: "10.0.0.1:9100".to_string(), on_missing: OnMissing::Alert, grace_count: Some(5), token: None, }, ); let mesh = new_mesh_state(info, &peer_configs); let state = mesh.blocking_read(); assert_eq!(state.instance.id, "test-id"); assert_eq!(state.peers.len(), 1); let peer = state.peers.get("peer1").unwrap(); assert_eq!(peer.address, "10.0.0.1:9100"); assert_eq!(peer.on_missing, OnMissing::Alert); assert_eq!(peer.grace_count, 5); assert_eq!(peer.status, PeerStatus::Unknown); } }