| 1 |
use tokio::task::JoinHandle; |
| 2 |
use tracing::info; |
| 3 |
|
| 4 |
use pom::alerts::Alerter; |
| 5 |
use pom::config::Config; |
| 6 |
use pom::db; |
| 7 |
use pom::error::Result; |
| 8 |
use pom::peer; |
| 9 |
|
| 10 |
use super::tasks; |
| 11 |
|
| 12 |
pub(crate) async fn cmd_serve( |
| 13 |
pool: &sqlx::SqlitePool, |
| 14 |
config: &Config, |
| 15 |
) -> Result<()> { |
| 16 |
let default_interval = config.serve.interval_secs; |
| 17 |
let prune_days = config.serve.prune_days; |
| 18 |
let listen_addr = config.serve.listen.clone(); |
| 19 |
|
| 20 |
|
| 21 |
let token = tokio_util::sync::CancellationToken::new(); |
| 22 |
|
| 23 |
|
| 24 |
let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?; |
| 25 |
let instance_name = config.instance_name(); |
| 26 |
let instance_info = peer::InstanceInfo { |
| 27 |
id: instance_id.clone(), |
| 28 |
name: instance_name.clone(), |
| 29 |
version: env!("CARGO_PKG_VERSION").to_string(), |
| 30 |
targets: config.target_names(), |
| 31 |
started_at: chrono::Utc::now().to_rfc3339(), |
| 32 |
}; |
| 33 |
|
| 34 |
|
| 35 |
let alerter = config.alerts.as_ref().map(|alert_config| { |
| 36 |
info!("Alerts enabled (to: {})", alert_config.to); |
| 37 |
Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone()) |
| 38 |
}); |
| 39 |
|
| 40 |
info!("Instance: {instance_name} (id={instance_id})"); |
| 41 |
info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)"); |
| 42 |
|
| 43 |
|
| 44 |
let mesh = peer::new_mesh_state(instance_info, &config.peers); |
| 45 |
|
| 46 |
|
| 47 |
{ |
| 48 |
let mut state = mesh.write().await; |
| 49 |
for (name, peer) in state.peers.iter_mut() { |
| 50 |
if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await { |
| 51 |
peer.known_id = Some(known_id); |
| 52 |
} |
| 53 |
} |
| 54 |
} |
| 55 |
|
| 56 |
let mut handles: Vec<JoinHandle<()>> = Vec::new(); |
| 57 |
|
| 58 |
|
| 59 |
handles.extend(tasks::spawn_health_tasks(config, pool, &token, &alerter)); |
| 60 |
handles.extend(tasks::spawn_tls_tasks(config, pool, &token, &alerter)); |
| 61 |
handles.extend(tasks::spawn_route_tasks(config, pool, &token, &alerter)); |
| 62 |
handles.extend(tasks::spawn_dns_tasks(config, pool, &token, &alerter)); |
| 63 |
handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter)); |
| 64 |
handles.extend(tasks::spawn_cors_tasks(config, pool, &token, &alerter)); |
| 65 |
handles.extend(tasks::spawn_backup_tasks(config, pool, &token, &alerter)); |
| 66 |
handles.extend(tasks::spawn_scan_pipeline_tasks(config, &token, &alerter)); |
| 67 |
handles.push(tasks::spawn_prune_task(pool, prune_days, &token)); |
| 68 |
|
| 69 |
|
| 70 |
if !config.peers.is_empty() { |
| 71 |
let heartbeat_secs = config.serve.peer_heartbeat_secs; |
| 72 |
info!("Peer mesh: {} peers, heartbeat every {heartbeat_secs}s", config.peers.len()); |
| 73 |
let hb_handles = peer::spawn_heartbeat_tasks( |
| 74 |
mesh.clone(), |
| 75 |
pool.clone(), |
| 76 |
heartbeat_secs, |
| 77 |
alerter.clone(), |
| 78 |
token.clone(), |
| 79 |
).await; |
| 80 |
handles.extend(hb_handles); |
| 81 |
} |
| 82 |
|
| 83 |
|
| 84 |
if let Some(handle) = tasks::spawn_meta_alert_task(config, pool, default_interval, &token, &alerter) { |
| 85 |
handles.push(handle); |
| 86 |
} |
| 87 |
|
| 88 |
|
| 89 |
let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone())); |
| 90 |
let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?; |
| 91 |
info!("API server listening on {listen_addr}"); |
| 92 |
let api_cancel = token.clone(); |
| 93 |
handles.push(tokio::spawn(async move { |
| 94 |
if let Err(e) = axum::serve(api_listener, api_app) |
| 95 |
.with_graceful_shutdown(async move { api_cancel.cancelled().await }) |
| 96 |
.await |
| 97 |
{ |
| 98 |
tracing::error!("API server error: {e}"); |
| 99 |
} |
| 100 |
})); |
| 101 |
|
| 102 |
|
| 103 |
let mut sigterm = tokio::signal::unix::signal( |
| 104 |
tokio::signal::unix::SignalKind::terminate(), |
| 105 |
)?; |
| 106 |
|
| 107 |
let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60)); |
| 108 |
watchdog_interval.tick().await; |
| 109 |
|
| 110 |
loop { |
| 111 |
tokio::select! { |
| 112 |
_ = tokio::signal::ctrl_c() => { |
| 113 |
info!("Received SIGINT, shutting down"); |
| 114 |
break; |
| 115 |
} |
| 116 |
_ = sigterm.recv() => { |
| 117 |
info!("Received SIGTERM, shutting down"); |
| 118 |
break; |
| 119 |
} |
| 120 |
_ = watchdog_interval.tick() => { |
| 121 |
for (i, handle) in handles.iter().enumerate() { |
| 122 |
if handle.is_finished() { |
| 123 |
tracing::error!("Background task {i} exited unexpectedly (possible panic)"); |
| 124 |
} |
| 125 |
} |
| 126 |
} |
| 127 |
} |
| 128 |
} |
| 129 |
|
| 130 |
|
| 131 |
token.cancel(); |
| 132 |
info!("Waiting for tasks to finish (5s grace period)..."); |
| 133 |
|
| 134 |
let shutdown = async { |
| 135 |
for handle in handles { |
| 136 |
if let Err(e) = handle.await { |
| 137 |
tracing::error!("Task shutdown error: {e}"); |
| 138 |
} |
| 139 |
} |
| 140 |
}; |
| 141 |
|
| 142 |
if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() { |
| 143 |
tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly"); |
| 144 |
} |
| 145 |
|
| 146 |
info!("Shutdown complete"); |
| 147 |
Ok(()) |
| 148 |
} |
| 149 |
|