| 1 |
|
| 2 |
|
| 3 |
use axum::http::Request; |
| 4 |
use sqlx::ConnectOptions; |
| 5 |
use sqlx::postgres::PgPoolOptions; |
| 6 |
use std::time::{Duration, Instant}; |
| 7 |
use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}; |
| 8 |
use tower_http::trace::TraceLayer; |
| 9 |
use tower_sessions::cookie::time::Duration as CookieDuration; |
| 10 |
use tower_sessions::cookie::SameSite; |
| 11 |
use tower_sessions::{Expiry, SessionManagerLayer}; |
| 12 |
use tower_sessions_sqlx_store::PostgresStore; |
| 13 |
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; |
| 14 |
|
| 15 |
use makenotwork::config::Config; |
| 16 |
use makenotwork::constants; |
| 17 |
use docengine::{Assumptions, DocLoader, DocLoaderConfig}; |
| 18 |
use makenotwork::email::{EmailClient, EmailConfig}; |
| 19 |
use makenotwork::payments::StripeClient; |
| 20 |
use makenotwork::storage::S3Client; |
| 21 |
use makenotwork::scanning::ScanPipeline; |
| 22 |
use makenotwork::{build_app, AppState}; |
| 23 |
use webauthn_rs::WebauthnBuilder; |
| 24 |
|
| 25 |
#[tokio::main] |
| 26 |
async fn main() { |
| 27 |
dotenvy::dotenv().ok(); |
| 28 |
|
| 29 |
|
| 30 |
tracing_subscriber::registry() |
| 31 |
.with( |
| 32 |
tracing_subscriber::EnvFilter::try_from_default_env() |
| 33 |
.unwrap_or_else(|_| "makenotwork=debug,tower_http=debug,sqlx=info".into()), |
| 34 |
) |
| 35 |
.with(if cfg!(debug_assertions) { |
| 36 |
tracing_subscriber::fmt::layer().boxed() |
| 37 |
} else { |
| 38 |
tracing_subscriber::fmt::layer().json().boxed() |
| 39 |
}) |
| 40 |
.init(); |
| 41 |
|
| 42 |
|
| 43 |
|
| 44 |
|
| 45 |
|
| 46 |
|
| 47 |
|
| 48 |
if std::env::var("SANDO_BOOT_SMOKE").is_ok() { |
| 49 |
tracing::info!("SANDO_BOOT_SMOKE=1; running minimal smoke server"); |
| 50 |
let app = axum::Router::new() |
| 51 |
.route("/health", axum::routing::get(|| async { "ok" })); |
| 52 |
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await |
| 53 |
.expect("smoke: bind 127.0.0.1:0"); |
| 54 |
axum::serve(listener, app).await.expect("smoke: serve"); |
| 55 |
return; |
| 56 |
} |
| 57 |
|
| 58 |
let config = Config::from_env().expect("Failed to load configuration"); |
| 59 |
tracing::info!("Configuration loaded"); |
| 60 |
|
| 61 |
|
| 62 |
|
| 63 |
|
| 64 |
|
| 65 |
|
| 66 |
|
| 67 |
let connect_options: sqlx::postgres::PgConnectOptions = config.database_url.parse() |
| 68 |
.expect("Invalid DATABASE_URL"); |
| 69 |
let connect_options = connect_options |
| 70 |
.log_statements(log::LevelFilter::Trace) |
| 71 |
.log_slow_statements(log::LevelFilter::Warn, Duration::from_millis(100)); |
| 72 |
|
| 73 |
let db = PgPoolOptions::new() |
| 74 |
.max_connections(constants::DB_POOL_MAX_CONNECTIONS) |
| 75 |
.min_connections(constants::DB_POOL_MIN_CONNECTIONS) |
| 76 |
.acquire_timeout(Duration::from_secs(constants::DB_ACQUIRE_TIMEOUT_SECS)) |
| 77 |
.max_lifetime(Duration::from_secs(constants::DB_MAX_LIFETIME_SECS)) |
| 78 |
.idle_timeout(Duration::from_secs(constants::DB_IDLE_TIMEOUT_SECS)) |
| 79 |
.test_before_acquire(true) |
| 80 |
.connect_with(connect_options) |
| 81 |
.await |
| 82 |
.expect("Failed to connect to database"); |
| 83 |
|
| 84 |
tracing::info!("Database connected"); |
| 85 |
|
| 86 |
|
| 87 |
|
| 88 |
|
| 89 |
if let Err(e) = sqlx::migrate!("./migrations").run(&db).await { |
| 90 |
tracing::error!(error = %e, "Migration failed — exiting without restart"); |
| 91 |
|
| 92 |
|
| 93 |
if let Ok(wam_url) = std::env::var("WAM_URL") { |
| 94 |
let body = format!("Migration error on startup:\n{e}"); |
| 95 |
let _ = reqwest::Client::new() |
| 96 |
.post(format!("{wam_url}/tickets")) |
| 97 |
.json(&serde_json::json!({ |
| 98 |
"title": "Migration failure — server not starting", |
| 99 |
"body": body, |
| 100 |
"priority": "critical", |
| 101 |
"source": "migration-failure", |
| 102 |
})) |
| 103 |
.timeout(std::time::Duration::from_secs(5)) |
| 104 |
.send() |
| 105 |
.await; |
| 106 |
} |
| 107 |
|
| 108 |
|
| 109 |
std::process::exit(2); |
| 110 |
} |
| 111 |
|
| 112 |
tracing::info!("Migrations complete"); |
| 113 |
|
| 114 |
|
| 115 |
let session_store = PostgresStore::new(db.clone()); |
| 116 |
session_store |
| 117 |
.migrate() |
| 118 |
.await |
| 119 |
.expect("Failed to migrate session store"); |
| 120 |
|
| 121 |
|
| 122 |
let secure_cookies = |
| 123 |
!cfg!(debug_assertions) && std::env::var("INSECURE_COOKIES").unwrap_or_default() != "1"; |
| 124 |
if secure_cookies { |
| 125 |
tracing::info!("Session cookies configured for HTTPS (secure=true)"); |
| 126 |
} else if !cfg!(debug_assertions) { |
| 127 |
tracing::warn!("Session cookies set to insecure (INSECURE_COOKIES=1)"); |
| 128 |
} |
| 129 |
|
| 130 |
let session_layer = SessionManagerLayer::new(session_store) |
| 131 |
.with_secure(secure_cookies) |
| 132 |
.with_http_only(true) |
| 133 |
.with_same_site(SameSite::Lax) |
| 134 |
.with_expiry(Expiry::OnInactivity(CookieDuration::days( |
| 135 |
constants::SESSION_EXPIRY_DAYS, |
| 136 |
))); |
| 137 |
|
| 138 |
tracing::info!("PostgreSQL session store initialized"); |
| 139 |
|
| 140 |
|
| 141 |
let s3: Option<std::sync::Arc<dyn makenotwork::storage::StorageBackend>> = |
| 142 |
if let Some(ref storage_config) = config.storage { |
| 143 |
match S3Client::new(storage_config, &config.host_url).await { |
| 144 |
Ok(client) => { |
| 145 |
tracing::info!(bucket = %storage_config.bucket, "S3 storage initialized"); |
| 146 |
Some(std::sync::Arc::new(client)) |
| 147 |
} |
| 148 |
Err(e) => { |
| 149 |
tracing::warn!(error = ?e, "failed to initialize S3 storage, file uploads unavailable"); |
| 150 |
None |
| 151 |
} |
| 152 |
} |
| 153 |
} else { |
| 154 |
tracing::info!("S3 storage not configured. File uploads will be unavailable."); |
| 155 |
None |
| 156 |
}; |
| 157 |
|
| 158 |
|
| 159 |
let synckit_s3: Option<std::sync::Arc<dyn makenotwork::storage::StorageBackend>> = |
| 160 |
if let Some(ref synckit_storage_config) = config.synckit_storage { |
| 161 |
match S3Client::new(synckit_storage_config, &config.host_url).await { |
| 162 |
Ok(client) => { |
| 163 |
tracing::info!(bucket = %synckit_storage_config.bucket, "SyncKit blob storage initialized"); |
| 164 |
Some(std::sync::Arc::new(client)) |
| 165 |
} |
| 166 |
Err(e) => { |
| 167 |
tracing::warn!(error = ?e, "Failed to initialize SyncKit blob storage"); |
| 168 |
None |
| 169 |
} |
| 170 |
} |
| 171 |
} else { |
| 172 |
tracing::info!("SyncKit blob storage not configured"); |
| 173 |
None |
| 174 |
}; |
| 175 |
|
| 176 |
|
| 177 |
let stripe: Option<std::sync::Arc<dyn makenotwork::payments::PaymentProvider>> = if let Some(ref stripe_config) = config.stripe { |
| 178 |
tracing::info!("Stripe payments initialized"); |
| 179 |
Some(std::sync::Arc::new(StripeClient::new(stripe_config))) |
| 180 |
} else { |
| 181 |
tracing::info!("Stripe not configured. Payments will be unavailable."); |
| 182 |
None |
| 183 |
}; |
| 184 |
|
| 185 |
|
| 186 |
let email = EmailClient::new(EmailConfig::from_env(), Some(db.clone())); |
| 187 |
|
| 188 |
|
| 189 |
|
| 190 |
let assumptions_path = std::env::var("ASSUMPTIONS_PATH") |
| 191 |
.unwrap_or_else(|_| "docs/business/assumptions.toml".to_string()); |
| 192 |
let assumptions = std::sync::Arc::new( |
| 193 |
match Assumptions::load(&assumptions_path) { |
| 194 |
Ok(a) => { |
| 195 |
if let Err(e) = a.validate() { |
| 196 |
panic!("assumptions validation failed:\n{e}"); |
| 197 |
} |
| 198 |
tracing::info!(path = %assumptions_path, "assumptions loaded and validated"); |
| 199 |
a |
| 200 |
} |
| 201 |
Err(e) => panic!("failed to load assumptions from {assumptions_path}: {e}"), |
| 202 |
} |
| 203 |
); |
| 204 |
|
| 205 |
|
| 206 |
let docs_path = std::env::var("DOCS_PATH").unwrap_or_else(|_| "site-docs/public".to_string()); |
| 207 |
let docs = std::sync::Arc::new(DocLoader::load( |
| 208 |
std::path::Path::new(&docs_path), |
| 209 |
&DocLoaderConfig { |
| 210 |
sections: vec![ |
| 211 |
("about".to_string(), "About".to_string()), |
| 212 |
("guide".to_string(), "Guide".to_string()), |
| 213 |
("developer".to_string(), "Developer".to_string()), |
| 214 |
("legal".to_string(), "Legal".to_string()), |
| 215 |
("support".to_string(), "Support".to_string()), |
| 216 |
("tech".to_string(), "Tech".to_string()), |
| 217 |
], |
| 218 |
link_prefix: "/docs".to_string(), |
| 219 |
unpublished_pattern: Some("unpublished/".to_string()), |
| 220 |
examples_path: Some(std::path::Path::new(&docs_path).join("../examples")), |
| 221 |
pre_process: { |
| 222 |
let assumptions = assumptions.clone(); |
| 223 |
Some(Box::new(move |md: &str| { |
| 224 |
assumptions.substitute(md).map_err(|e| e.to_string()) |
| 225 |
})) |
| 226 |
}, |
| 227 |
}, |
| 228 |
)); |
| 229 |
|
| 230 |
|
| 231 |
|
| 232 |
|
| 233 |
|
| 234 |
let scanner = if let Some(ref scan_config) = config.scan { |
| 235 |
match ScanPipeline::new(scan_config) { |
| 236 |
Ok(s) => match s.assert_live().await { |
| 237 |
Ok(()) => { |
| 238 |
tracing::info!("File scanning enabled"); |
| 239 |
Some(std::sync::Arc::new(s)) |
| 240 |
} |
| 241 |
Err(e) => panic!("Scanning configured but no live AV layer: {e}"), |
| 242 |
}, |
| 243 |
Err(e) => { |
| 244 |
tracing::warn!(error = %e, "File scanning disabled"); |
| 245 |
None |
| 246 |
} |
| 247 |
} |
| 248 |
} else { |
| 249 |
tracing::info!("File scanning not configured"); |
| 250 |
None |
| 251 |
}; |
| 252 |
|
| 253 |
|
| 254 |
let rp_origin = url::Url::parse(&config.host_url).expect("HOST_URL is not a valid URL"); |
| 255 |
let rp_id = rp_origin.host_str().expect("HOST_URL has no host").to_string(); |
| 256 |
let webauthn = std::sync::Arc::new( |
| 257 |
WebauthnBuilder::new(&rp_id, &rp_origin) |
| 258 |
.expect("Failed to create WebauthnBuilder") |
| 259 |
.rp_name("Makenotwork") |
| 260 |
.build() |
| 261 |
.expect("Failed to build Webauthn"), |
| 262 |
); |
| 263 |
tracing::info!("WebAuthn initialized (rp_id={rp_id})"); |
| 264 |
|
| 265 |
|
| 266 |
let syntax = if config.git_repos_path.is_some() { |
| 267 |
tracing::info!(path = ?config.git_repos_path, "Git source browser enabled"); |
| 268 |
Some(std::sync::Arc::new(makenotwork::git::SyntaxHighlighter::new())) |
| 269 |
} else { |
| 270 |
tracing::info!("Git source browser not configured (GIT_REPOS_PATH unset)"); |
| 271 |
None |
| 272 |
}; |
| 273 |
|
| 274 |
|
| 275 |
let mt_client = match (&config.mt_base_url, &config.internal_shared_secret) { |
| 276 |
(Some(base_url), Some(secret)) => { |
| 277 |
tracing::info!(base_url = %base_url, "MT integration enabled"); |
| 278 |
Some(makenotwork::mt_client::MtClient::new(base_url.clone(), secret.clone())) |
| 279 |
} |
| 280 |
_ => { |
| 281 |
tracing::info!("MT integration not configured (need MT_BASE_URL + INTERNAL_SHARED_SECRET)"); |
| 282 |
None |
| 283 |
} |
| 284 |
}; |
| 285 |
|
| 286 |
|
| 287 |
let wam = config.wam_url.as_ref().map(|url| { |
| 288 |
tracing::info!(url = %url, "WAM integration enabled"); |
| 289 |
makenotwork::wam_client::WamClient::new(url.clone()) |
| 290 |
}); |
| 291 |
|
| 292 |
|
| 293 |
let domain_cache = std::sync::Arc::new(dashmap::DashMap::new()); |
| 294 |
match makenotwork::db::custom_domains::get_all_verified_domains(&db).await { |
| 295 |
Ok(domains) => { |
| 296 |
for d in &domains { |
| 297 |
domain_cache.insert(d.domain.clone(), d.user_id); |
| 298 |
} |
| 299 |
if !domains.is_empty() { |
| 300 |
tracing::info!(count = domains.len(), "Custom domain cache warmed"); |
| 301 |
} |
| 302 |
} |
| 303 |
Err(e) => { |
| 304 |
tracing::warn!(error = ?e, "Failed to warm custom domain cache"); |
| 305 |
} |
| 306 |
} |
| 307 |
|
| 308 |
let started_at = chrono::Utc::now(); |
| 309 |
let start_instant = Instant::now(); |
| 310 |
let page_view_tx = makenotwork::db::page_views::spawn_batcher(db.clone()); |
| 311 |
let bg = makenotwork::background::spawn_pool(); |
| 312 |
let state = AppState { |
| 313 |
db, |
| 314 |
config: config.clone(), |
| 315 |
s3, |
| 316 |
synckit_s3, |
| 317 |
stripe, |
| 318 |
email, |
| 319 |
docs, |
| 320 |
tier_prices: makenotwork::tier_prices::TierPrices::from_assumptions(&assumptions), |
| 321 |
cost_allocation: { |
| 322 |
let tp = makenotwork::tier_prices::TierPrices::from_assumptions(&assumptions); |
| 323 |
makenotwork::tier_prices::CostAllocation::from_assumptions(&assumptions, &tp) |
| 324 |
}, |
| 325 |
runway_config: makenotwork::tier_prices::RunwayConfig::from_assumptions(&assumptions), |
| 326 |
scanner, |
| 327 |
webauthn, |
| 328 |
syntax, |
| 329 |
started_at, |
| 330 |
start_instant, |
| 331 |
session_cache: std::sync::Arc::new(dashmap::DashMap::new()), |
| 332 |
mt_client, |
| 333 |
wam, |
| 334 |
domain_cache, |
| 335 |
scan_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::SCAN_MAX_CONCURRENT)), |
| 336 |
caddy_ask_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::CADDY_ASK_MAX_CONCURRENT)), |
| 337 |
restart_at: std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0)), |
| 338 |
sync_notify: std::sync::Arc::new(dashmap::DashMap::new()), |
| 339 |
sse_connections: std::sync::Arc::new(dashmap::DashMap::new()), |
| 340 |
metrics_handle: Some(makenotwork::metrics::init()), |
| 341 |
page_view_tx, |
| 342 |
bg, |
| 343 |
}; |
| 344 |
|
| 345 |
|
| 346 |
tracing::info!( |
| 347 |
s3 = state.s3.is_some(), |
| 348 |
synckit_s3 = state.synckit_s3.is_some(), |
| 349 |
stripe = state.stripe.is_some(), |
| 350 |
scanner = state.scanner.is_some(), |
| 351 |
mt = state.mt_client.is_some(), |
| 352 |
wam = state.wam.is_some(), |
| 353 |
git = state.config.git_repos_path.is_some(), |
| 354 |
"Active features" |
| 355 |
); |
| 356 |
|
| 357 |
|
| 358 |
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(()); |
| 359 |
let _monitor_handle = makenotwork::monitor::spawn_monitor(state.clone(), shutdown_rx); |
| 360 |
let scheduler_shutdown_rx = shutdown_tx.subscribe(); |
| 361 |
let _scheduler_handle = makenotwork::scheduler::spawn_scheduler(state.clone(), scheduler_shutdown_rx); |
| 362 |
|
| 363 |
|
| 364 |
|
| 365 |
if let (Some(scanner), Some(s3_for_workers)) = (state.scanner.clone(), state.s3.clone()) { |
| 366 |
let scan_ctx = std::sync::Arc::new(makenotwork::scanning::worker::WorkerContext { |
| 367 |
db: state.db.clone(), |
| 368 |
s3: s3_for_workers, |
| 369 |
pipeline: scanner, |
| 370 |
scan_semaphore: state.scan_semaphore.clone(), |
| 371 |
wam: state.wam.clone(), |
| 372 |
}); |
| 373 |
let worker_count = makenotwork::constants::SCAN_WORKER_COUNT; |
| 374 |
let worker_shutdown_rx = shutdown_tx.subscribe(); |
| 375 |
makenotwork::scanning::worker::spawn_pool(worker_count, scan_ctx, worker_shutdown_rx); |
| 376 |
tracing::info!(worker_count, "scan worker pool started"); |
| 377 |
|
| 378 |
let report = makenotwork::scanning::spool::reap_all( |
| 379 |
std::path::Path::new(makenotwork::constants::SCAN_SPOOL_DIR), |
| 380 |
); |
| 381 |
if report.deleted > 0 || report.errors > 0 { |
| 382 |
tracing::info!( |
| 383 |
deleted = report.deleted, |
| 384 |
errors = report.errors, |
| 385 |
"scan spool startup reaper completed" |
| 386 |
); |
| 387 |
} |
| 388 |
} |
| 389 |
|
| 390 |
|
| 391 |
let app = build_app(state, session_layer) |
| 392 |
|
| 393 |
.layer(PropagateRequestIdLayer::x_request_id()) |
| 394 |
.layer( |
| 395 |
TraceLayer::new_for_http() |
| 396 |
.make_span_with(|request: &Request<_>| { |
| 397 |
let request_id = request |
| 398 |
.headers() |
| 399 |
.get("x-request-id") |
| 400 |
.and_then(|v| v.to_str().ok()) |
| 401 |
.unwrap_or("-"); |
| 402 |
|
| 403 |
tracing::info_span!( |
| 404 |
"request", |
| 405 |
method = %request.method(), |
| 406 |
uri = %request.uri(), |
| 407 |
request_id = %request_id, |
| 408 |
user_id = tracing::field::Empty, |
| 409 |
) |
| 410 |
}) |
| 411 |
.on_response(|response: &axum::http::Response<_>, latency: Duration, _span: &tracing::Span| { |
| 412 |
let status = response.status().as_u16(); |
| 413 |
if status >= 500 { |
| 414 |
tracing::error!(status, latency_ms = latency.as_millis() as u64, "response"); |
| 415 |
} else if status >= 400 { |
| 416 |
tracing::warn!(status, latency_ms = latency.as_millis() as u64, "response"); |
| 417 |
} else { |
| 418 |
tracing::debug!(status, latency_ms = latency.as_millis() as u64, "response"); |
| 419 |
} |
| 420 |
}), |
| 421 |
) |
| 422 |
.layer(SetRequestIdLayer::x_request_id(MakeRequestUuid)); |
| 423 |
|
| 424 |
let addr = config.socket_addr(); |
| 425 |
tracing::info!(addr = %addr, "listening"); |
| 426 |
|
| 427 |
let listener = tokio::net::TcpListener::bind(addr) |
| 428 |
.await |
| 429 |
.expect("Failed to bind to address"); |
| 430 |
|
| 431 |
|
| 432 |
|
| 433 |
|
| 434 |
|
| 435 |
axum::serve( |
| 436 |
listener, |
| 437 |
app.into_make_service_with_connect_info::<std::net::SocketAddr>(), |
| 438 |
) |
| 439 |
.with_graceful_shutdown(shutdown_signal()) |
| 440 |
.await |
| 441 |
.expect("Server error"); |
| 442 |
|
| 443 |
drop(shutdown_tx); |
| 444 |
tracing::info!("Server shut down gracefully"); |
| 445 |
} |
| 446 |
|
| 447 |
|
| 448 |
|
| 449 |
|
| 450 |
async fn shutdown_signal() { |
| 451 |
use tokio::signal; |
| 452 |
|
| 453 |
let ctrl_c = async { |
| 454 |
signal::ctrl_c() |
| 455 |
.await |
| 456 |
.expect("Failed to install Ctrl+C handler"); |
| 457 |
}; |
| 458 |
|
| 459 |
#[cfg(unix)] |
| 460 |
let terminate = async { |
| 461 |
signal::unix::signal(signal::unix::SignalKind::terminate()) |
| 462 |
.expect("Failed to install SIGTERM handler") |
| 463 |
.recv() |
| 464 |
.await; |
| 465 |
}; |
| 466 |
|
| 467 |
#[cfg(not(unix))] |
| 468 |
let terminate = std::future::pending::<()>(); |
| 469 |
|
| 470 |
tokio::select! { |
| 471 |
() = ctrl_c => tracing::info!("Received SIGINT, starting graceful shutdown"), |
| 472 |
() = terminate => tracing::info!("Received SIGTERM, starting graceful shutdown"), |
| 473 |
} |
| 474 |
|
| 475 |
|
| 476 |
tokio::spawn(async { |
| 477 |
tokio::time::sleep(Duration::from_secs(10)).await; |
| 478 |
eprintln!("Graceful shutdown timed out after 10s, forcing exit"); |
| 479 |
std::process::exit(1); |
| 480 |
}); |
| 481 |
} |
| 482 |
|