Skip to main content

max / makenotwork

5.2 KB · 149 lines History Blame Raw
1 use tokio::task::JoinHandle;
2 use tracing::info;
3
4 use pom::alerts::Alerter;
5 use pom::config::Config;
6 use pom::db;
7 use pom::error::Result;
8 use pom::peer;
9
10 use super::tasks;
11
12 pub(crate) async fn cmd_serve(
13 pool: &sqlx::SqlitePool,
14 config: &Config,
15 ) -> Result<()> {
16 let default_interval = config.serve.interval_secs;
17 let prune_days = config.serve.prune_days;
18 let listen_addr = config.serve.listen.clone();
19
20 // --- Cancellation token for graceful shutdown ---
21 let token = tokio_util::sync::CancellationToken::new();
22
23 // --- Instance identity ---
24 let instance_id = peer::load_or_create_instance_id(config.instance.id.as_deref())?;
25 let instance_name = config.instance_name();
26 let instance_info = peer::InstanceInfo {
27 id: instance_id.clone(),
28 name: instance_name.clone(),
29 version: env!("CARGO_PKG_VERSION").to_string(),
30 targets: config.target_names(),
31 started_at: chrono::Utc::now().to_rfc3339(),
32 };
33
34 // --- Alerter ---
35 let alerter = config.alerts.as_ref().map(|alert_config| {
36 info!("Alerts enabled (to: {})", alert_config.to);
37 Alerter::new(alert_config.clone(), pool.clone(), instance_name.clone())
38 });
39
40 info!("Instance: {instance_name} (id={instance_id})");
41 info!("Starting serve mode (default interval: {default_interval}s, prune: {prune_days}d)");
42
43 // --- Mesh state ---
44 let mesh = peer::new_mesh_state(instance_info, &config.peers);
45
46 // Load known peer identities from DB
47 {
48 let mut state = mesh.write().await;
49 for (name, peer) in state.peers.iter_mut() {
50 if let Ok(Some(known_id)) = db::get_peer_identity(pool, name).await {
51 peer.known_id = Some(known_id);
52 }
53 }
54 }
55
56 let mut handles: Vec<JoinHandle<()>> = Vec::new();
57
58 // Spawn all monitoring tasks
59 handles.extend(tasks::spawn_health_tasks(config, pool, &token, &alerter));
60 handles.extend(tasks::spawn_tls_tasks(config, pool, &token, &alerter));
61 handles.extend(tasks::spawn_route_tasks(config, pool, &token, &alerter));
62 handles.extend(tasks::spawn_dns_tasks(config, pool, &token, &alerter));
63 handles.extend(tasks::spawn_whois_tasks(config, pool, &token, &alerter));
64 handles.extend(tasks::spawn_cors_tasks(config, pool, &token, &alerter));
65 handles.extend(tasks::spawn_backup_tasks(config, pool, &token, &alerter));
66 handles.extend(tasks::spawn_scan_pipeline_tasks(config, &token, &alerter));
67 handles.push(tasks::spawn_prune_task(pool, prune_days, &token));
68
69 // Spawn peer heartbeat tasks
70 if !config.peers.is_empty() {
71 let heartbeat_secs = config.serve.peer_heartbeat_secs;
72 info!("Peer mesh: {} peers, heartbeat every {heartbeat_secs}s", config.peers.len());
73 let hb_handles = peer::spawn_heartbeat_tasks(
74 mesh.clone(),
75 pool.clone(),
76 heartbeat_secs,
77 alerter.clone(),
78 token.clone(),
79 ).await;
80 handles.extend(hb_handles);
81 }
82
83 // Spawn monitoring-offline meta-alert task
84 if let Some(handle) = tasks::spawn_meta_alert_task(config, pool, default_interval, &token, &alerter) {
85 handles.push(handle);
86 }
87
88 // Start HTTP API server
89 let api_app = pom::api::router(pool.clone(), config.clone(), Some(mesh.clone()));
90 let api_listener = tokio::net::TcpListener::bind(&listen_addr).await?;
91 info!("API server listening on {listen_addr}");
92 let api_cancel = token.clone();
93 handles.push(tokio::spawn(async move {
94 if let Err(e) = axum::serve(api_listener, api_app)
95 .with_graceful_shutdown(async move { api_cancel.cancelled().await })
96 .await
97 {
98 tracing::error!("API server error: {e}");
99 }
100 }));
101
102 // Wait for shutdown signal or unexpected task exit
103 let mut sigterm = tokio::signal::unix::signal(
104 tokio::signal::unix::SignalKind::terminate(),
105 )?;
106
107 let mut watchdog_interval = tokio::time::interval(std::time::Duration::from_secs(60));
108 watchdog_interval.tick().await; // consume immediate first tick
109
110 loop {
111 tokio::select! {
112 _ = tokio::signal::ctrl_c() => {
113 info!("Received SIGINT, shutting down");
114 break;
115 }
116 _ = sigterm.recv() => {
117 info!("Received SIGTERM, shutting down");
118 break;
119 }
120 _ = watchdog_interval.tick() => {
121 for (i, handle) in handles.iter().enumerate() {
122 if handle.is_finished() {
123 tracing::error!("Background task {i} exited unexpectedly (possible panic)");
124 }
125 }
126 }
127 }
128 }
129
130 // Graceful shutdown: cancel all tasks, then wait with grace period
131 token.cancel();
132 info!("Waiting for tasks to finish (5s grace period)...");
133
134 let shutdown = async {
135 for handle in handles {
136 if let Err(e) = handle.await {
137 tracing::error!("Task shutdown error: {e}");
138 }
139 }
140 };
141
142 if tokio::time::timeout(std::time::Duration::from_secs(5), shutdown).await.is_err() {
143 tracing::warn!("Grace period elapsed, some tasks may not have finished cleanly");
144 }
145
146 info!("Shutdown complete");
147 Ok(())
148 }
149