Skip to main content

max / makenotwork

20.2 KB · 482 lines History Blame Raw
1 //! Application entry point — tracing, config, pool, then hands off to `build_app`.
2
3 use axum::http::Request;
4 use sqlx::ConnectOptions;
5 use sqlx::postgres::PgPoolOptions;
6 use std::time::{Duration, Instant};
7 use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer};
8 use tower_http::trace::TraceLayer;
9 use tower_sessions::cookie::time::Duration as CookieDuration;
10 use tower_sessions::cookie::SameSite;
11 use tower_sessions::{Expiry, SessionManagerLayer};
12 use tower_sessions_sqlx_store::PostgresStore;
13 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
14
15 use makenotwork::config::Config;
16 use makenotwork::constants;
17 use docengine::{Assumptions, DocLoader, DocLoaderConfig};
18 use makenotwork::email::{EmailClient, EmailConfig};
19 use makenotwork::payments::StripeClient;
20 use makenotwork::storage::S3Client;
21 use makenotwork::scanning::ScanPipeline;
22 use makenotwork::{build_app, AppState};
23 use webauthn_rs::WebauthnBuilder;
24
25 #[tokio::main]
26 async fn main() {
27 dotenvy::dotenv().ok();
28
29 // JSON in release, human-readable in dev
30 tracing_subscriber::registry()
31 .with(
32 tracing_subscriber::EnvFilter::try_from_default_env()
33 .unwrap_or_else(|_| "makenotwork=debug,tower_http=debug,sqlx=info".into()),
34 )
35 .with(if cfg!(debug_assertions) {
36 tracing_subscriber::fmt::layer().boxed()
37 } else {
38 tracing_subscriber::fmt::layer().json().boxed()
39 })
40 .init();
41
42 // Sando boot-smoke gate spawns the binary with SANDO_BOOT_SMOKE=1 to
43 // verify it loads + links + survives. The real init loads
44 // assumptions.toml and other runtime files that don't exist in the
45 // sando build workspace. Short-circuit to a tiny axum server that
46 // proves the binary, tokio runtime, axum, and TCP bind all work,
47 // then idles until the gate kills it.
48 if std::env::var("SANDO_BOOT_SMOKE").is_ok() {
49 tracing::info!("SANDO_BOOT_SMOKE=1; running minimal smoke server");
50 let app = axum::Router::new()
51 .route("/health", axum::routing::get(|| async { "ok" }));
52 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await
53 .expect("smoke: bind 127.0.0.1:0");
54 axum::serve(listener, app).await.expect("smoke: serve");
55 return;
56 }
57
58 let config = Config::from_env().expect("Failed to load configuration");
59 tracing::info!("Configuration loaded");
60
61 // Create database connection pool with health checks and lifecycle limits.
62 // - test_before_acquire: validates connections before use (catches stale/broken conns)
63 // - max_lifetime: rotates connections to prevent long-lived session issues
64 // - idle_timeout: prunes idle connections to free server resources
65 // - min_connections: keeps warm connections ready for immediate use
66 // - log_slow_statements: logs queries exceeding 100ms at WARN level
67 let connect_options: sqlx::postgres::PgConnectOptions = config.database_url.parse()
68 .expect("Invalid DATABASE_URL");
69 let connect_options = connect_options
70 .log_statements(log::LevelFilter::Trace)
71 .log_slow_statements(log::LevelFilter::Warn, Duration::from_millis(100));
72
73 let db = PgPoolOptions::new()
74 .max_connections(constants::DB_POOL_MAX_CONNECTIONS)
75 .min_connections(constants::DB_POOL_MIN_CONNECTIONS)
76 .acquire_timeout(Duration::from_secs(constants::DB_ACQUIRE_TIMEOUT_SECS))
77 .max_lifetime(Duration::from_secs(constants::DB_MAX_LIFETIME_SECS))
78 .idle_timeout(Duration::from_secs(constants::DB_IDLE_TIMEOUT_SECS))
79 .test_before_acquire(true)
80 .connect_with(connect_options)
81 .await
82 .expect("Failed to connect to database");
83
84 tracing::info!("Database connected");
85
86 // Run migrations — exit cleanly (code 2) on failure instead of panicking.
87 // This prevents systemd from crash-looping on migration errors (e.g. version
88 // conflicts after a bad deploy). WAM ticket alerts the operator.
89 if let Err(e) = sqlx::migrate!("./migrations").run(&db).await {
90 tracing::error!(error = %e, "Migration failed — exiting without restart");
91
92 // Best-effort WAM alert (DB is up, WAM may be reachable)
93 if let Ok(wam_url) = std::env::var("WAM_URL") {
94 let body = format!("Migration error on startup:\n{e}");
95 let _ = reqwest::Client::new()
96 .post(format!("{wam_url}/tickets"))
97 .json(&serde_json::json!({
98 "title": "Migration failure — server not starting",
99 "body": body,
100 "priority": "critical",
101 "source": "migration-failure",
102 }))
103 .timeout(std::time::Duration::from_secs(5))
104 .send()
105 .await;
106 }
107
108 // Exit code 2 — systemd configured not to restart on this code
109 std::process::exit(2);
110 }
111
112 tracing::info!("Migrations complete");
113
114 // Create PostgreSQL-backed session store (persists across restarts)
115 let session_store = PostgresStore::new(db.clone());
116 session_store
117 .migrate()
118 .await
119 .expect("Failed to migrate session store");
120
121 // In release mode, require HTTPS for session cookies (override with INSECURE_COOKIES=1 for staging)
122 let secure_cookies =
123 !cfg!(debug_assertions) && std::env::var("INSECURE_COOKIES").unwrap_or_default() != "1";
124 if secure_cookies {
125 tracing::info!("Session cookies configured for HTTPS (secure=true)");
126 } else if !cfg!(debug_assertions) {
127 tracing::warn!("Session cookies set to insecure (INSECURE_COOKIES=1)");
128 }
129
130 let session_layer = SessionManagerLayer::new(session_store)
131 .with_secure(secure_cookies)
132 .with_http_only(true)
133 .with_same_site(SameSite::Lax) // Lax allows session on top-level navigations (OAuth redirects)
134 .with_expiry(Expiry::OnInactivity(CookieDuration::days(
135 constants::SESSION_EXPIRY_DAYS,
136 )));
137
138 tracing::info!("PostgreSQL session store initialized");
139
140 // Initialize S3 client if storage is configured
141 let s3: Option<std::sync::Arc<dyn makenotwork::storage::StorageBackend>> =
142 if let Some(ref storage_config) = config.storage {
143 match S3Client::new(storage_config, &config.host_url).await {
144 Ok(client) => {
145 tracing::info!(bucket = %storage_config.bucket, "S3 storage initialized");
146 Some(std::sync::Arc::new(client))
147 }
148 Err(e) => {
149 tracing::warn!(error = ?e, "failed to initialize S3 storage, file uploads unavailable");
150 None
151 }
152 }
153 } else {
154 tracing::info!("S3 storage not configured. File uploads will be unavailable.");
155 None
156 };
157
158 // Initialize SyncKit blob S3 client if configured (separate bucket)
159 let synckit_s3: Option<std::sync::Arc<dyn makenotwork::storage::StorageBackend>> =
160 if let Some(ref synckit_storage_config) = config.synckit_storage {
161 match S3Client::new(synckit_storage_config, &config.host_url).await {
162 Ok(client) => {
163 tracing::info!(bucket = %synckit_storage_config.bucket, "SyncKit blob storage initialized");
164 Some(std::sync::Arc::new(client))
165 }
166 Err(e) => {
167 tracing::warn!(error = ?e, "Failed to initialize SyncKit blob storage");
168 None
169 }
170 }
171 } else {
172 tracing::info!("SyncKit blob storage not configured");
173 None
174 };
175
176 // Initialize Stripe client if configured
177 let stripe: Option<std::sync::Arc<dyn makenotwork::payments::PaymentProvider>> = if let Some(ref stripe_config) = config.stripe {
178 tracing::info!("Stripe payments initialized");
179 Some(std::sync::Arc::new(StripeClient::new(stripe_config)))
180 } else {
181 tracing::info!("Stripe not configured. Payments will be unavailable.");
182 None
183 };
184
185 // Initialize email client (logs in dev mode when POSTMARK_TOKEN is not set)
186 let email = EmailClient::new(EmailConfig::from_env(), Some(db.clone()));
187
188 // Load business assumptions (single source of truth for figures in the docs).
189 // Validation failure aborts startup — we don't want to serve stale numbers.
190 let assumptions_path = std::env::var("ASSUMPTIONS_PATH")
191 .unwrap_or_else(|_| "docs/business/assumptions.toml".to_string());
192 let assumptions = std::sync::Arc::new(
193 match Assumptions::load(&assumptions_path) {
194 Ok(a) => {
195 if let Err(e) = a.validate() {
196 panic!("assumptions validation failed:\n{e}");
197 }
198 tracing::info!(path = %assumptions_path, "assumptions loaded and validated");
199 a
200 }
201 Err(e) => panic!("failed to load assumptions from {assumptions_path}: {e}"),
202 }
203 );
204
205 // Load documentation pages from disk
206 let docs_path = std::env::var("DOCS_PATH").unwrap_or_else(|_| "site-docs/public".to_string());
207 let docs = std::sync::Arc::new(DocLoader::load(
208 std::path::Path::new(&docs_path),
209 &DocLoaderConfig {
210 sections: vec![
211 ("about".to_string(), "About".to_string()),
212 ("guide".to_string(), "Guide".to_string()),
213 ("developer".to_string(), "Developer".to_string()),
214 ("legal".to_string(), "Legal".to_string()),
215 ("support".to_string(), "Support".to_string()),
216 ("tech".to_string(), "Tech".to_string()),
217 ],
218 link_prefix: "/docs".to_string(),
219 unpublished_pattern: Some("unpublished/".to_string()),
220 examples_path: Some(std::path::Path::new(&docs_path).join("../examples")),
221 pre_process: {
222 let assumptions = assumptions.clone();
223 Some(Box::new(move |md: &str| {
224 assumptions.substitute(md).map_err(|e| e.to_string())
225 }))
226 },
227 },
228 ));
229
230 // Initialize file scanning pipeline (optional). If scanning is configured,
231 // assert at least one AV layer is actually live — otherwise the FailOpen
232 // policy on ClamAV silently passes everything as Clean and the deploy
233 // looks healthy while shipping zero real coverage. Refuse to boot.
234 let scanner = if let Some(ref scan_config) = config.scan {
235 match ScanPipeline::new(scan_config) {
236 Ok(s) => match s.assert_live().await {
237 Ok(()) => {
238 tracing::info!("File scanning enabled");
239 Some(std::sync::Arc::new(s))
240 }
241 Err(e) => panic!("Scanning configured but no live AV layer: {e}"),
242 },
243 Err(e) => {
244 tracing::warn!(error = %e, "File scanning disabled");
245 None
246 }
247 }
248 } else {
249 tracing::info!("File scanning not configured");
250 None
251 };
252
253 // Initialize WebAuthn from HOST_URL (passkey / passwordless login)
254 let rp_origin = url::Url::parse(&config.host_url).expect("HOST_URL is not a valid URL");
255 let rp_id = rp_origin.host_str().expect("HOST_URL has no host").to_string();
256 let webauthn = std::sync::Arc::new(
257 WebauthnBuilder::new(&rp_id, &rp_origin)
258 .expect("Failed to create WebauthnBuilder")
259 .rp_name("Makenotwork")
260 .build()
261 .expect("Failed to build Webauthn"),
262 );
263 tracing::info!("WebAuthn initialized (rp_id={rp_id})");
264
265 // Initialize syntax highlighter if git repos path is configured
266 let syntax = if config.git_repos_path.is_some() {
267 tracing::info!(path = ?config.git_repos_path, "Git source browser enabled");
268 Some(std::sync::Arc::new(makenotwork::git::SyntaxHighlighter::new()))
269 } else {
270 tracing::info!("Git source browser not configured (GIT_REPOS_PATH unset)");
271 None
272 };
273
274 // Construct MT client when both mt_base_url and internal_shared_secret are set
275 let mt_client = match (&config.mt_base_url, &config.internal_shared_secret) {
276 (Some(base_url), Some(secret)) => {
277 tracing::info!(base_url = %base_url, "MT integration enabled");
278 Some(makenotwork::mt_client::MtClient::new(base_url.clone(), secret.clone()))
279 }
280 _ => {
281 tracing::info!("MT integration not configured (need MT_BASE_URL + INTERNAL_SHARED_SECRET)");
282 None
283 }
284 };
285
286 // WAM ticket manager client (tailnet-only, for operational alerts)
287 let wam = config.wam_url.as_ref().map(|url| {
288 tracing::info!(url = %url, "WAM integration enabled");
289 makenotwork::wam_client::WamClient::new(url.clone())
290 });
291
292 // Warm custom domain cache
293 let domain_cache = std::sync::Arc::new(dashmap::DashMap::new());
294 match makenotwork::db::custom_domains::get_all_verified_domains(&db).await {
295 Ok(domains) => {
296 for d in &domains {
297 domain_cache.insert(d.domain.clone(), d.user_id);
298 }
299 if !domains.is_empty() {
300 tracing::info!(count = domains.len(), "Custom domain cache warmed");
301 }
302 }
303 Err(e) => {
304 tracing::warn!(error = ?e, "Failed to warm custom domain cache");
305 }
306 }
307
308 let started_at = chrono::Utc::now();
309 let start_instant = Instant::now();
310 let page_view_tx = makenotwork::db::page_views::spawn_batcher(db.clone());
311 let bg = makenotwork::background::spawn_pool();
312 let state = AppState {
313 db,
314 config: config.clone(),
315 s3,
316 synckit_s3,
317 stripe,
318 email,
319 docs,
320 tier_prices: makenotwork::tier_prices::TierPrices::from_assumptions(&assumptions),
321 cost_allocation: {
322 let tp = makenotwork::tier_prices::TierPrices::from_assumptions(&assumptions);
323 makenotwork::tier_prices::CostAllocation::from_assumptions(&assumptions, &tp)
324 },
325 runway_config: makenotwork::tier_prices::RunwayConfig::from_assumptions(&assumptions),
326 scanner,
327 webauthn,
328 syntax,
329 started_at,
330 start_instant,
331 session_cache: std::sync::Arc::new(dashmap::DashMap::new()),
332 mt_client,
333 wam,
334 domain_cache,
335 scan_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::SCAN_MAX_CONCURRENT)),
336 caddy_ask_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::CADDY_ASK_MAX_CONCURRENT)),
337 restart_at: std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0)),
338 sync_notify: std::sync::Arc::new(dashmap::DashMap::new()),
339 sse_connections: std::sync::Arc::new(dashmap::DashMap::new()),
340 metrics_handle: Some(makenotwork::metrics::init()),
341 page_view_tx,
342 bg,
343 };
344
345 // Log active features at startup
346 tracing::info!(
347 s3 = state.s3.is_some(),
348 synckit_s3 = state.synckit_s3.is_some(),
349 stripe = state.stripe.is_some(),
350 scanner = state.scanner.is_some(),
351 mt = state.mt_client.is_some(),
352 wam = state.wam.is_some(),
353 git = state.config.git_repos_path.is_some(),
354 "Active features"
355 );
356
357 // Start background health monitor and scheduler
358 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
359 let _monitor_handle = makenotwork::monitor::spawn_monitor(state.clone(), shutdown_rx);
360 let scheduler_shutdown_rx = shutdown_tx.subscribe();
361 let _scheduler_handle = makenotwork::scheduler::spawn_scheduler(state.clone(), scheduler_shutdown_rx);
362
363 // Start scan worker pool. Only meaningful if a scanner is configured;
364 // otherwise enqueue_scan_for never enqueues (trust-gate fast path).
365 if let (Some(scanner), Some(s3_for_workers)) = (state.scanner.clone(), state.s3.clone()) {
366 let scan_ctx = std::sync::Arc::new(makenotwork::scanning::worker::WorkerContext {
367 db: state.db.clone(),
368 s3: s3_for_workers,
369 pipeline: scanner,
370 scan_semaphore: state.scan_semaphore.clone(),
371 wam: state.wam.clone(),
372 });
373 let worker_count = makenotwork::constants::SCAN_WORKER_COUNT;
374 let worker_shutdown_rx = shutdown_tx.subscribe();
375 makenotwork::scanning::worker::spawn_pool(worker_count, scan_ctx, worker_shutdown_rx);
376 tracing::info!(worker_count, "scan worker pool started");
377
378 let report = makenotwork::scanning::spool::reap_all(
379 std::path::Path::new(makenotwork::constants::SCAN_SPOOL_DIR),
380 );
381 if report.deleted > 0 || report.errors > 0 {
382 tracing::info!(
383 deleted = report.deleted,
384 errors = report.errors,
385 "scan spool startup reaper completed"
386 );
387 }
388 }
389
390 // Build router (shared with integration tests via lib.rs)
391 let app = build_app(state, session_layer)
392 // Request ID: propagate → trace → set (Axum applies inside-out)
393 .layer(PropagateRequestIdLayer::x_request_id())
394 .layer(
395 TraceLayer::new_for_http()
396 .make_span_with(|request: &Request<_>| {
397 let request_id = request
398 .headers()
399 .get("x-request-id")
400 .and_then(|v| v.to_str().ok())
401 .unwrap_or("-");
402
403 tracing::info_span!(
404 "request",
405 method = %request.method(),
406 uri = %request.uri(),
407 request_id = %request_id,
408 user_id = tracing::field::Empty,
409 )
410 })
411 .on_response(|response: &axum::http::Response<_>, latency: Duration, _span: &tracing::Span| {
412 let status = response.status().as_u16();
413 if status >= 500 {
414 tracing::error!(status, latency_ms = latency.as_millis() as u64, "response");
415 } else if status >= 400 {
416 tracing::warn!(status, latency_ms = latency.as_millis() as u64, "response");
417 } else {
418 tracing::debug!(status, latency_ms = latency.as_millis() as u64, "response");
419 }
420 }),
421 )
422 .layer(SetRequestIdLayer::x_request_id(MakeRequestUuid));
423
424 let addr = config.socket_addr();
425 tracing::info!(addr = %addr, "listening");
426
427 let listener = tokio::net::TcpListener::bind(addr)
428 .await
429 .expect("Failed to bind to address");
430
431 // Use into_make_service_with_connect_info to provide peer IP for rate limiting.
432 // After the shutdown signal fires, in-flight requests get a 10-second window
433 // to complete before the process force-exits. This prevents hung connections
434 // from blocking deployment restarts indefinitely.
435 axum::serve(
436 listener,
437 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
438 )
439 .with_graceful_shutdown(shutdown_signal())
440 .await
441 .expect("Server error");
442
443 drop(shutdown_tx); // signal monitor to stop
444 tracing::info!("Server shut down gracefully");
445 }
446
447 /// Wait for SIGINT (Ctrl-C) or SIGTERM, then return to trigger graceful shutdown.
448 /// In-flight requests are allowed up to 10 seconds to complete. After that,
449 /// the process force-exits to prevent hung connections from blocking restarts.
450 async fn shutdown_signal() {
451 use tokio::signal;
452
453 let ctrl_c = async {
454 signal::ctrl_c()
455 .await
456 .expect("Failed to install Ctrl+C handler");
457 };
458
459 #[cfg(unix)]
460 let terminate = async {
461 signal::unix::signal(signal::unix::SignalKind::terminate())
462 .expect("Failed to install SIGTERM handler")
463 .recv()
464 .await;
465 };
466
467 #[cfg(not(unix))]
468 let terminate = std::future::pending::<()>();
469
470 tokio::select! {
471 () = ctrl_c => tracing::info!("Received SIGINT, starting graceful shutdown"),
472 () = terminate => tracing::info!("Received SIGTERM, starting graceful shutdown"),
473 }
474
475 // Spawn a hard deadline: if graceful drain takes longer than 10s, force exit
476 tokio::spawn(async {
477 tokio::time::sleep(Duration::from_secs(10)).await;
478 eprintln!("Graceful shutdown timed out after 10s, forcing exit");
479 std::process::exit(1);
480 });
481 }
482