use tokio::task::JoinHandle; use tracing::info; use pom::alerts::Alerter; use pom::config::Config; use pom::db; use pom::error::Result; use pom::peer; use super::tasks; pub(crate) async fn cmd_serve( pool: &sqlx::SqlitePool, config: &Config, ) -> Result<()> { let default_interval = config.serve.interval_secs; let prune_days = config.serve.prune_days; let listen_addr = config.serve.listen.clone(); // --- Cancellation token for graceful shutdown --- let token = tokio_util::sync::CancellationToken::new(); // --- Instance identity --- let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?; let instance_name = config.instance_name(); let instance_info = peer::InstanceInfo { id: instance_id.clone(), name: instance_name.clone(), version: env!("CARGO_PKG_VERSION").to_string(), targets: config.target_names(), started_at: chrono::Utc::now().to_rfc3339(), }; // --- Alerter --- let alerter = config.alerts.as_ref().map(|alert_config| { info!("Alerts enabled (to: {})", alert_config.to); Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone()) }); info!("Instance: {instance_name} (id={instance_id})"); info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)"); // --- Mesh state --- let mesh = peer::new_mesh_state(instance_info, &config.peers); // Load known peer identities from DB { let mut state = mesh.write().await; for (name, peer) in state.peers.iter_mut() { if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await { peer.known_id = Some(known_id); } } } let mut handles: Vec> = Vec::new(); // Spawn all monitoring tasks handles.extend(tasks::spawn_health_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_tls_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_route_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_dns_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_cors_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_backup_tasks(config, pool, &token, &alerter)); handles.extend(tasks::spawn_scan_pipeline_tasks(config, &token, &alerter)); handles.push(tasks::spawn_prune_task(pool, prune_days, &token)); // Spawn peer heartbeat tasks if !config.peers.is_empty() { let heartbeat_secs = config.serve.peer_heartbeat_secs; info!("Peer mesh: {} peers, heartbeat every {heartbeat_secs}s", config.peers.len()); let hb_handles = peer::spawn_heartbeat_tasks( mesh.clone(), pool.clone(), heartbeat_secs, alerter.clone(), token.clone(), ).await; handles.extend(hb_handles); } // Spawn monitoring-offline meta-alert task if let Some(handle) = tasks::spawn_meta_alert_task(config, pool, default_interval, &token, &alerter) { handles.push(handle); } // Start HTTP API server let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone())); let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?; info!("API server listening on {listen_addr}"); let api_cancel = token.clone(); handles.push(tokio::spawn(async move { if let Err(e) = axum::serve(api_listener, api_app) .with_graceful_shutdown(async move { api_cancel.cancelled().await }) .await { tracing::error!("API server error: {e}"); } })); // Wait for shutdown signal or unexpected task exit let mut sigterm = tokio::signal::unix::signal( tokio::signal::unix::SignalKind::terminate(), )?; let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60)); watchdog_interval.tick().await; // consume immediate first tick loop { tokio::select! { _ = tokio::signal::ctrl_c() => { info!("Received SIGINT, shutting down"); break; } _ = sigterm.recv() => { info!("Received SIGTERM, shutting down"); break; } _ = watchdog_interval.tick() => { for (i, handle) in handles.iter().enumerate() { if handle.is_finished() { tracing::error!("Background task {i} exited unexpectedly (possible panic)"); } } } } } // Graceful shutdown: cancel all tasks, then wait with grace period token.cancel(); info!("Waiting for tasks to finish (5s grace period)..."); let shutdown = async { for handle in handles { if let Err(e) = handle.await { tracing::error!("Task shutdown error: {e}"); } } }; if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() { tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly"); } info!("Shutdown complete"); Ok(()) }