Skip to main content

max / pom

5.1 KB · 147 lines History Blame Raw
1 use tokio::task::JoinHandle;
2 use tracing::info;
3
4 use pom::alerts::Alerter;
5 use pom::config::Config;
6 use pom::db;
7 use pom::error::Result;
8 use pom::peer;
9
10 use super::tasks;
11
12 pub(crate) async fn cmd_serve(
13 pool: &sqlx::SqlitePool,
14 config: &Config,
15 ) -> Result<()> {
16 let default_interval = config.serve.interval_secs;
17 let prune_days = config.serve.prune_days;
18 let listen_addr = config.serve.listen.clone();
19
20 // --- Cancellation token for graceful shutdown ---
21 let token = tokio_util::sync::CancellationToken::new();
22
23 // --- Instance identity ---
24 let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?;
25 let instance_name = config.instance_name();
26 let instance_info = peer::InstanceInfo {
27 id: instance_id.clone(),
28 name: instance_name.clone(),
29 version: env!("CARGO_PKG_VERSION").to_string(),
30 targets: config.target_names(),
31 started_at: chrono::Utc::now().to_rfc3339(),
32 };
33
34 // --- Alerter ---
35 let alerter = config.alerts.as_ref().map(|alert_config| {
36 info!("Alerts enabled (to: {})", alert_config.to);
37 Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone())
38 });
39
40 info!("Instance: {instance_name} (id={instance_id})");
41 info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)");
42
43 // --- Mesh state ---
44 let mesh = peer::new_mesh_state(instance_info, &config.peers);
45
46 // Load known peer identities from DB
47 {
48 let mut state = mesh.write().await;
49 for (name, peer) in state.peers.iter_mut() {
50 if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await {
51 peer.known_id = Some(known_id);
52 }
53 }
54 }
55
56 let mut handles: Vec<JoinHandle<()>> = Vec::new();
57
58 // Spawn all monitoring tasks
59 handles.extend(tasks::spawn_health_tasks(config, pool, &token, &alerter));
60 handles.extend(tasks::spawn_tls_tasks(config, pool, &token, &alerter));
61 handles.extend(tasks::spawn_route_tasks(config, pool, &token, &alerter));
62 handles.extend(tasks::spawn_dns_tasks(config, pool, &token, &alerter));
63 handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter));
64 handles.extend(tasks::spawn_cors_tasks(config, pool, &token, &alerter));
65 handles.push(tasks::spawn_prune_task(pool, prune_days, &token));
66
67 // Spawn peer heartbeat tasks
68 if !config.peers.is_empty() {
69 let heartbeat_secs = config.serve.peer_heartbeat_secs;
70 info!("Peer mesh: {} peers, heartbeat every {heartbeat_secs}s", config.peers.len());
71 let hb_handles = peer::spawn_heartbeat_tasks(
72 mesh.clone(),
73 pool.clone(),
74 heartbeat_secs,
75 alerter.clone(),
76 token.clone(),
77 ).await;
78 handles.extend(hb_handles);
79 }
80
81 // Spawn monitoring-offline meta-alert task
82 if let Some(handle) = tasks::spawn_meta_alert_task(config, pool, default_interval, &token, &alerter) {
83 handles.push(handle);
84 }
85
86 // Start HTTP API server
87 let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone()));
88 let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?;
89 info!("API server listening on {listen_addr}");
90 let api_cancel = token.clone();
91 handles.push(tokio::spawn(async move {
92 if let Err(e) = axum::serve(api_listener, api_app)
93 .with_graceful_shutdown(async move { api_cancel.cancelled().await })
94 .await
95 {
96 tracing::error!("API server error: {e}");
97 }
98 }));
99
100 // Wait for shutdown signal or unexpected task exit
101 let mut sigterm = tokio::signal::unix::signal(
102 tokio::signal::unix::SignalKind::terminate(),
103 )?;
104
105 let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60));
106 watchdog_interval.tick().await; // consume immediate first tick
107
108 loop {
109 tokio::select! {
110 _ = tokio::signal::ctrl_c() => {
111 info!("Received SIGINT, shutting down");
112 break;
113 }
114 _ = sigterm.recv() => {
115 info!("Received SIGTERM, shutting down");
116 break;
117 }
118 _ = watchdog_interval.tick() => {
119 for (i, handle) in handles.iter().enumerate() {
120 if handle.is_finished() {
121 tracing::error!("Background task {i} exited unexpectedly (possible panic)");
122 }
123 }
124 }
125 }
126 }
127
128 // Graceful shutdown: cancel all tasks, then wait with grace period
129 token.cancel();
130 info!("Waiting for tasks to finish (5s grace period)...");
131
132 let shutdown = async {
133 for handle in handles {
134 if let Err(e) = handle.await {
135 tracing::error!("Task shutdown error: {e}");
136 }
137 }
138 };
139
140 if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() {
141 tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly");
142 }
143
144 info!("Shutdown complete");
145 Ok(())
146 }
147