| 1 |
|
| 2 |
|
| 3 |
use std::collections::HashMap; |
| 4 |
use std::sync::Arc; |
| 5 |
use tokio::sync::RwLock; |
| 6 |
use serde::Serialize; |
| 7 |
|
| 8 |
use tracing::instrument; |
| 9 |
|
| 10 |
use crate::alerts::Alerter; |
| 11 |
use crate::config::PeerConfig; |
| 12 |
use crate::error::{PomError, Result}; |
| 13 |
|
| 14 |
|
| 15 |
|
| 16 |
|
| 17 |
|
| 18 |
pub fn load_or_create_instance_id( |
| 19 |
override_id: Option<&str>, |
| 20 |
) -> Result<String> { |
| 21 |
if let Some(id) = override_id { |
| 22 |
return Ok(id.to_string()); |
| 23 |
} |
| 24 |
|
| 25 |
let data_dir = dirs::data_local_dir() |
| 26 |
.ok_or_else(|| PomError::Config("Could not determine data directory".into()))?; |
| 27 |
let pom_dir = data_dir.join("pom"); |
| 28 |
std::fs::create_dir_all(&pom_dir)?; |
| 29 |
let id_path = pom_dir.join("instance_id"); |
| 30 |
|
| 31 |
if id_path.exists() { |
| 32 |
let id = std::fs::read_to_string(&id_path)?.trim().to_string(); |
| 33 |
if !id.is_empty() { |
| 34 |
return Ok(id); |
| 35 |
} |
| 36 |
} |
| 37 |
|
| 38 |
let id = uuid::Uuid::new_v4().to_string(); |
| 39 |
std::fs::write(&id_path, &id)?; |
| 40 |
Ok(id) |
| 41 |
} |
| 42 |
|
| 43 |
|
| 44 |
|
| 45 |
#[derive(Debug, Clone, Serialize, serde::Deserialize)] |
| 46 |
pub struct InstanceInfo { |
| 47 |
pub id: String, |
| 48 |
pub name: String, |
| 49 |
pub version: String, |
| 50 |
pub targets: Vec<String>, |
| 51 |
pub started_at: String, |
| 52 |
} |
| 53 |
|
| 54 |
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] |
| 55 |
#[serde(rename_all = "lowercase")] |
| 56 |
pub enum PeerStatus { |
| 57 |
|
| 58 |
Online, |
| 59 |
|
| 60 |
GracePeriod, |
| 61 |
|
| 62 |
Missing, |
| 63 |
|
| 64 |
Unknown, |
| 65 |
} |
| 66 |
|
| 67 |
impl std::fmt::Display for PeerStatus { |
| 68 |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 69 |
match self { |
| 70 |
Self::Online => write!(f, "online"), |
| 71 |
Self::GracePeriod => write!(f, "grace_period"), |
| 72 |
Self::Missing => write!(f, "missing"), |
| 73 |
Self::Unknown => write!(f, "unknown"), |
| 74 |
} |
| 75 |
} |
| 76 |
} |
| 77 |
|
| 78 |
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, serde::Deserialize)] |
| 79 |
#[serde(rename_all = "lowercase")] |
| 80 |
pub enum OnMissing { |
| 81 |
|
| 82 |
Alert, |
| 83 |
|
| 84 |
#[default] |
| 85 |
Log, |
| 86 |
|
| 87 |
Ignore, |
| 88 |
} |
| 89 |
|
| 90 |
#[derive(Debug, Clone, Serialize)] |
| 91 |
pub struct PeerState { |
| 92 |
pub address: String, |
| 93 |
pub on_missing: OnMissing, |
| 94 |
pub grace_count: u32, |
| 95 |
pub status: PeerStatus, |
| 96 |
pub info: Option<InstanceInfo>, |
| 97 |
pub last_seen: Option<String>, |
| 98 |
pub latency_ms: Option<u64>, |
| 99 |
pub consecutive_failures: u32, |
| 100 |
#[serde(skip)] |
| 101 |
pub known_id: Option<String>, |
| 102 |
|
| 103 |
#[serde(skip)] |
| 104 |
pub status_data: Option<serde_json::Value>, |
| 105 |
|
| 106 |
#[serde(skip)] |
| 107 |
pub token: Option<String>, |
| 108 |
} |
| 109 |
|
| 110 |
|
| 111 |
|
| 112 |
#[derive(Debug)] |
| 113 |
pub struct MeshState { |
| 114 |
pub instance: InstanceInfo, |
| 115 |
pub peers: HashMap<String, PeerState>, |
| 116 |
} |
| 117 |
|
| 118 |
pub type SharedMeshState = Arc<RwLock<MeshState>>; |
| 119 |
|
| 120 |
pub fn new_mesh_state( |
| 121 |
instance: InstanceInfo, |
| 122 |
peer_configs: &HashMap<String, PeerConfig>, |
| 123 |
) -> SharedMeshState { |
| 124 |
let mut peers = HashMap::new(); |
| 125 |
for (name, cfg) in peer_configs { |
| 126 |
peers.insert( |
| 127 |
name.clone(), |
| 128 |
PeerState { |
| 129 |
address: cfg.address.clone(), |
| 130 |
on_missing: cfg.on_missing, |
| 131 |
grace_count: cfg.grace_count.unwrap_or(3), |
| 132 |
status: PeerStatus::Unknown, |
| 133 |
info: None, |
| 134 |
last_seen: None, |
| 135 |
latency_ms: None, |
| 136 |
consecutive_failures: 0, |
| 137 |
known_id: None, |
| 138 |
status_data: None, |
| 139 |
token: cfg.token.clone(), |
| 140 |
}, |
| 141 |
); |
| 142 |
} |
| 143 |
Arc::new(RwLock::new(MeshState { instance, peers })) |
| 144 |
} |
| 145 |
|
| 146 |
|
| 147 |
|
| 148 |
#[instrument(skip_all)] |
| 149 |
pub async fn spawn_heartbeat_tasks( |
| 150 |
mesh: SharedMeshState, |
| 151 |
pool: sqlx::SqlitePool, |
| 152 |
interval_secs: u64, |
| 153 |
alerter: Option<Alerter>, |
| 154 |
token: tokio_util::sync::CancellationToken, |
| 155 |
) -> Vec<tokio::task::JoinHandle<()>> { |
| 156 |
let peer_names: Vec<String> = { |
| 157 |
let mesh_guard = mesh.read().await; |
| 158 |
mesh_guard.peers.keys().cloned().collect() |
| 159 |
}; |
| 160 |
|
| 161 |
let mut handles = Vec::new(); |
| 162 |
for peer_name in peer_names { |
| 163 |
let mesh = Arc::clone(&mesh); |
| 164 |
let pool = pool.clone(); |
| 165 |
let alerter = alerter.clone(); |
| 166 |
let token = token.clone(); |
| 167 |
handles.push(tokio::spawn(async move { |
| 168 |
heartbeat_loop(&peer_name, mesh, pool, interval_secs, alerter, token).await; |
| 169 |
})); |
| 170 |
} |
| 171 |
handles |
| 172 |
} |
| 173 |
|
| 174 |
async fn heartbeat_loop( |
| 175 |
peer_name: &str, |
| 176 |
mesh: SharedMeshState, |
| 177 |
pool: sqlx::SqlitePool, |
| 178 |
interval_secs: u64, |
| 179 |
alerter: Option<Alerter>, |
| 180 |
cancel: tokio_util::sync::CancellationToken, |
| 181 |
) { |
| 182 |
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); |
| 183 |
|
| 184 |
interval.tick().await; |
| 185 |
|
| 186 |
let (address, auth_token) = { |
| 187 |
let state = mesh.read().await; |
| 188 |
match state.peers.get(peer_name) { |
| 189 |
Some(p) => (p.address.clone(), p.token.clone()), |
| 190 |
None => return, |
| 191 |
} |
| 192 |
}; |
| 193 |
|
| 194 |
let client = reqwest::Client::builder() |
| 195 |
.timeout(std::time::Duration::from_secs(10)) |
| 196 |
.build() |
| 197 |
.unwrap_or_default(); |
| 198 |
|
| 199 |
loop { |
| 200 |
tokio::select! { |
| 201 |
_ = cancel.cancelled() => break, |
| 202 |
_ = interval.tick() => {} |
| 203 |
} |
| 204 |
|
| 205 |
let start = std::time::Instant::now(); |
| 206 |
let mut req = client.get(format!("http://{address}/api/peer/info")); |
| 207 |
if let Some(ref t) = auth_token { |
| 208 |
req = req.bearer_auth(t); |
| 209 |
} |
| 210 |
let result = req.send().await; |
| 211 |
let latency_ms = start.elapsed().as_millis() as u64; |
| 212 |
|
| 213 |
match result.and_then(|r| r.error_for_status()) { |
| 214 |
Ok(response) => { |
| 215 |
let info: Option<InstanceInfo> = response.json().await.ok(); |
| 216 |
handle_heartbeat_success(peer_name, &mesh, &pool, info, latency_ms, &alerter).await; |
| 217 |
} |
| 218 |
Err(_e) => { |
| 219 |
handle_heartbeat_failure(peer_name, &mesh, &pool, latency_ms, &alerter).await; |
| 220 |
} |
| 221 |
} |
| 222 |
|
| 223 |
|
| 224 |
let mut status_req = client.get(format!("http://{address}/api/peer/status")); |
| 225 |
if let Some(ref t) = auth_token { |
| 226 |
status_req = status_req.bearer_auth(t); |
| 227 |
} |
| 228 |
let status_result = status_req.send().await; |
| 229 |
match status_result { |
| 230 |
Ok(resp) => { |
| 231 |
if let Ok(data) = resp.json::<serde_json::Value>().await { |
| 232 |
let mut state = mesh.write().await; |
| 233 |
if let Some(peer) = state.peers.get_mut(peer_name) { |
| 234 |
peer.status_data = Some(data); |
| 235 |
} |
| 236 |
} |
| 237 |
} |
| 238 |
Err(e) => { |
| 239 |
tracing::debug!("{peer_name}: failed to fetch /api/peer/status: {e}"); |
| 240 |
} |
| 241 |
} |
| 242 |
} |
| 243 |
} |
| 244 |
|
| 245 |
async fn handle_heartbeat_success( |
| 246 |
peer_name: &str, |
| 247 |
mesh: &SharedMeshState, |
| 248 |
pool: &sqlx::SqlitePool, |
| 249 |
info: Option<InstanceInfo>, |
| 250 |
latency_ms: u64, |
| 251 |
alerter: &Option<Alerter>, |
| 252 |
) { |
| 253 |
let now = chrono::Utc::now().to_rfc3339(); |
| 254 |
|
| 255 |
|
| 256 |
let (new_identity_id, updated_identity_id, recovery_info) = { |
| 257 |
let mut state = mesh.write().await; |
| 258 |
let Some(peer) = state.peers.get_mut(peer_name) else { |
| 259 |
return; |
| 260 |
}; |
| 261 |
|
| 262 |
let was_missing = peer.status == PeerStatus::Missing || peer.status == PeerStatus::GracePeriod; |
| 263 |
|
| 264 |
|
| 265 |
let mut new_identity = None; |
| 266 |
let mut updated_identity = None; |
| 267 |
if let Some(ref info) = info { |
| 268 |
match &peer.known_id { |
| 269 |
None => { |
| 270 |
peer.known_id = Some(info.id.clone()); |
| 271 |
new_identity = Some(info.id.clone()); |
| 272 |
tracing::info!("{peer_name}: first contact, id={}", info.id); |
| 273 |
} |
| 274 |
Some(known) if known != &info.id => { |
| 275 |
|
| 276 |
|
| 277 |
tracing::error!( |
| 278 |
"{peer_name}: UUID mismatch! expected={known}, got={}. Resetting peer state (possible reinstall).", |
| 279 |
info.id |
| 280 |
); |
| 281 |
peer.known_id = Some(info.id.clone()); |
| 282 |
updated_identity = Some(info.id.clone()); |
| 283 |
} |
| 284 |
_ => {} |
| 285 |
} |
| 286 |
} |
| 287 |
|
| 288 |
|
| 289 |
let recovery = if was_missing && peer.on_missing == OnMissing::Alert { |
| 290 |
Some(peer.address.clone()) |
| 291 |
} else { |
| 292 |
None |
| 293 |
}; |
| 294 |
|
| 295 |
if was_missing { |
| 296 |
tracing::info!("{peer_name}: recovered (was {:?})", peer.status); |
| 297 |
} |
| 298 |
|
| 299 |
peer.status = PeerStatus::Online; |
| 300 |
peer.info = info; |
| 301 |
peer.last_seen = Some(now); |
| 302 |
peer.latency_ms = Some(latency_ms); |
| 303 |
peer.consecutive_failures = 0; |
| 304 |
|
| 305 |
(new_identity, updated_identity, recovery) |
| 306 |
}; |
| 307 |
|
| 308 |
|
| 309 |
if let Some(id) = new_identity_id { |
| 310 |
let _ = crate::db::store_peer_identity(pool, peer_name, &id).await; |
| 311 |
} |
| 312 |
if let Some(id) = updated_identity_id { |
| 313 |
let _ = crate::db::update_peer_identity(pool, peer_name, &id).await; |
| 314 |
} |
| 315 |
let _ = crate::db::insert_peer_heartbeat(pool, peer_name, "online", latency_ms as i64).await; |
| 316 |
|
| 317 |
if let (Some(address), Some(alerter)) = (recovery_info, alerter) { |
| 318 |
alerter.send_peer_recovery(peer_name, &address).await; |
| 319 |
} |
| 320 |
} |
| 321 |
|
| 322 |
async fn handle_heartbeat_failure( |
| 323 |
peer_name: &str, |
| 324 |
mesh: &SharedMeshState, |
| 325 |
pool: &sqlx::SqlitePool, |
| 326 |
latency_ms: u64, |
| 327 |
alerter: &Option<Alerter>, |
| 328 |
) { |
| 329 |
|
| 330 |
let (new_status, alert_info) = { |
| 331 |
let mut state = mesh.write().await; |
| 332 |
let Some(peer) = state.peers.get_mut(peer_name) else { |
| 333 |
return; |
| 334 |
}; |
| 335 |
|
| 336 |
peer.consecutive_failures += 1; |
| 337 |
|
| 338 |
let new_status = match peer.status { |
| 339 |
PeerStatus::Online | PeerStatus::Unknown | PeerStatus::GracePeriod => { |
| 340 |
if peer.consecutive_failures >= peer.grace_count { |
| 341 |
PeerStatus::Missing |
| 342 |
} else { |
| 343 |
PeerStatus::GracePeriod |
| 344 |
} |
| 345 |
} |
| 346 |
PeerStatus::Missing => PeerStatus::Missing, |
| 347 |
}; |
| 348 |
|
| 349 |
let transitioned_to_missing = new_status == PeerStatus::Missing && peer.status != PeerStatus::Missing; |
| 350 |
|
| 351 |
|
| 352 |
let alert_info = if transitioned_to_missing { |
| 353 |
match peer.on_missing { |
| 354 |
OnMissing::Alert => { |
| 355 |
tracing::warn!( |
| 356 |
"{peer_name}: MISSING after {} consecutive failures (action: alert)", |
| 357 |
peer.consecutive_failures |
| 358 |
); |
| 359 |
Some((peer.address.clone(), peer.consecutive_failures)) |
| 360 |
} |
| 361 |
OnMissing::Log => { |
| 362 |
tracing::info!( |
| 363 |
"{peer_name}: missing after {} consecutive failures", |
| 364 |
peer.consecutive_failures |
| 365 |
); |
| 366 |
None |
| 367 |
} |
| 368 |
OnMissing::Ignore => None, |
| 369 |
} |
| 370 |
} else { |
| 371 |
None |
| 372 |
}; |
| 373 |
|
| 374 |
peer.status = new_status; |
| 375 |
|
| 376 |
(new_status, alert_info) |
| 377 |
}; |
| 378 |
|
| 379 |
|
| 380 |
let status_str = new_status.to_string(); |
| 381 |
let _ = crate::db::insert_peer_heartbeat(pool, peer_name, &status_str, latency_ms as i64).await; |
| 382 |
|
| 383 |
if let (Some((address, failures)), Some(alerter)) = (alert_info, alerter) { |
| 384 |
alerter.send_peer_missing(peer_name, &address, failures).await; |
| 385 |
} |
| 386 |
} |
| 387 |
|
| 388 |
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the single peer used by every mesh fixture in this module.
    const PEER: &str = "peer1";

    /// Instance info for the mesh under test; its contents are not significant.
    fn test_instance_info() -> InstanceInfo {
        InstanceInfo {
            id: "test-id".to_string(),
            name: "test".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        }
    }

    /// Info as a remote peer would report it, varying only the id.
    fn remote_info(id: &str) -> InstanceInfo {
        InstanceInfo {
            id: id.to_string(),
            name: "remote".to_string(),
            version: "0.1.0".to_string(),
            targets: vec![],
            started_at: "2026-03-10T00:00:00Z".to_string(),
        }
    }

    /// Mesh with one alerting peer (`peer1`) and the given grace count.
    fn test_mesh_with_peer(grace_count: u32) -> SharedMeshState {
        let peer_configs = HashMap::from([(
            PEER.to_string(),
            PeerConfig {
                address: "10.0.0.1:9100".to_string(),
                on_missing: OnMissing::Alert,
                grace_count: Some(grace_count),
                token: None,
            },
        )]);
        new_mesh_state(test_instance_info(), &peer_configs)
    }

    /// Current status of the fixture peer.
    async fn peer_status(mesh: &SharedMeshState) -> PeerStatus {
        mesh.read().await.peers[PEER].status
    }

    #[test]
    fn override_id_takes_precedence() {
        let id = load_or_create_instance_id(Some("override-id")).unwrap();
        assert_eq!(id, "override-id");
    }

    // Goes through the real lookup path, i.e. reads (and possibly creates) the
    // id file under the user's local data directory.
    #[test]
    fn auto_id_is_valid_uuid() {
        let id = load_or_create_instance_id(None).unwrap();
        assert!(uuid::Uuid::parse_str(&id).is_ok());
    }

    #[test]
    fn on_missing_deserialize() {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            #[serde(default)]
            on_missing: OnMissing,
        }

        let parse = |src: &str| -> OnMissing {
            let w: Wrapper = toml::from_str(src).unwrap();
            w.on_missing
        };

        assert_eq!(parse(r#"on_missing = "alert""#), OnMissing::Alert);
        assert_eq!(parse(r#"on_missing = "log""#), OnMissing::Log);
        assert_eq!(parse(r#"on_missing = "ignore""#), OnMissing::Ignore);

        // An absent key falls back to the default variant.
        assert_eq!(parse(""), OnMissing::Log);
    }

    #[tokio::test]
    async fn heartbeat_failure_transitions_through_grace_to_missing() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        // Before any heartbeat the peer is in its initial state.
        assert_eq!(peer_status(&mesh).await, PeerStatus::Unknown);

        // Failures 1 and 2 stay within the grace period, failure 3 reaches the
        // grace count, and failure 4 leaves the peer missing.
        let expected_after_each_failure = [
            PeerStatus::GracePeriod,
            PeerStatus::GracePeriod,
            PeerStatus::Missing,
            PeerStatus::Missing,
        ];
        for expected in expected_after_each_failure {
            handle_heartbeat_failure(PEER, &mesh, &pool, 0, &None).await;
            assert_eq!(peer_status(&mesh).await, expected);
        }
    }

    #[tokio::test]
    async fn heartbeat_success_recovers_from_missing() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(1);

        // With a grace count of 1 a single failure is enough to go missing.
        handle_heartbeat_failure(PEER, &mesh, &pool, 0, &None).await;
        assert_eq!(peer_status(&mesh).await, PeerStatus::Missing);

        // A successful heartbeat brings the peer back online.
        handle_heartbeat_success(PEER, &mesh, &pool, Some(remote_info("remote-id")), 42, &None)
            .await;

        let state = mesh.read().await;
        let peer = &state.peers[PEER];
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.consecutive_failures, 0);
        assert_eq!(peer.latency_ms, Some(42));
        assert_eq!(peer.known_id.as_deref(), Some("remote-id"));
    }

    #[tokio::test]
    async fn heartbeat_success_detects_uuid_stored_on_first_contact() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        handle_heartbeat_success(PEER, &mesh, &pool, Some(remote_info("uuid-abc")), 10, &None)
            .await;

        // The id seen on first contact is persisted.
        let stored = crate::db::get_peer_identity(&pool, PEER).await.unwrap();
        assert_eq!(stored, Some("uuid-abc".to_string()));
    }

    #[tokio::test]
    async fn heartbeat_records_to_db() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        handle_heartbeat_failure(PEER, &mesh, &pool, 0, &None).await;
        handle_heartbeat_success(PEER, &mesh, &pool, None, 55, &None).await;

        let history = crate::db::get_peer_heartbeat_history(&pool, PEER, 10).await.unwrap();
        assert_eq!(history.len(), 2);

        // The success recorded last is expected at index 0.
        assert_eq!(history[0].status, "online");
        assert_eq!(history[0].latency_ms, 55);
        assert_eq!(history[1].status, "grace_period");
    }

    #[tokio::test]
    async fn heartbeat_uuid_mismatch_resets_state() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        // First contact establishes the identity "aaa".
        handle_heartbeat_success(PEER, &mesh, &pool, Some(remote_info("aaa")), 10, &None).await;
        assert_eq!(mesh.read().await.peers[PEER].known_id.as_deref(), Some("aaa"));
        assert_eq!(peer_status(&mesh).await, PeerStatus::Online);

        let stored = crate::db::get_peer_identity(&pool, PEER).await.unwrap();
        assert_eq!(stored, Some("aaa".to_string()));

        // The same peer name now reports a different id.
        let reinstalled = InstanceInfo {
            name: "remote-reinstalled".to_string(),
            version: "0.2.0".to_string(),
            started_at: "2026-03-13T00:00:00Z".to_string(),
            ..remote_info("bbb")
        };
        handle_heartbeat_success(PEER, &mesh, &pool, Some(reinstalled), 20, &None).await;

        // The in-memory identity follows the new id...
        let state = mesh.read().await;
        let peer = &state.peers[PEER];
        assert_eq!(peer.known_id.as_deref(), Some("bbb"));
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.consecutive_failures, 0);

        // ...and so does the persisted one.
        let stored = crate::db::get_peer_identity(&pool, PEER).await.unwrap();
        assert_eq!(stored, Some("bbb".to_string()));
    }

    #[tokio::test]
    async fn heartbeat_same_uuid_proceeds_normally() {
        let pool = crate::db::connect_in_memory().await.unwrap();
        let mesh = test_mesh_with_peer(3);

        let info = remote_info("same-uuid");

        handle_heartbeat_success(PEER, &mesh, &pool, Some(info.clone()), 10, &None).await;
        assert_eq!(mesh.read().await.peers[PEER].known_id.as_deref(), Some("same-uuid"));

        // A second heartbeat with the same id only refreshes the measurements.
        handle_heartbeat_success(PEER, &mesh, &pool, Some(info), 15, &None).await;
        let state = mesh.read().await;
        let peer = &state.peers[PEER];
        assert_eq!(peer.known_id.as_deref(), Some("same-uuid"));
        assert_eq!(peer.status, PeerStatus::Online);
        assert_eq!(peer.latency_ms, Some(15));
    }

    #[test]
    fn mesh_state_construction() {
        let info = InstanceInfo {
            targets: vec!["mnw".to_string()],
            ..test_instance_info()
        };

        let peer_configs = HashMap::from([(
            PEER.to_string(),
            PeerConfig {
                address: "10.0.0.1:9100".to_string(),
                on_missing: OnMissing::Alert,
                grace_count: Some(5),
                token: None,
            },
        )]);

        let mesh = new_mesh_state(info, &peer_configs);
        // Synchronous test, so take the lock without an async runtime.
        let state = mesh.blocking_read();
        assert_eq!(state.instance.id, "test-id");
        assert_eq!(state.peers.len(), 1);

        let peer = state.peers.get(PEER).unwrap();
        assert_eq!(peer.address, "10.0.0.1:9100");
        assert_eq!(peer.on_missing, OnMissing::Alert);
        assert_eq!(peer.grace_count, 5);
        assert_eq!(peer.status, PeerStatus::Unknown);
    }
}
| 637 |
|