//! Gate execution. Each gate kind has a runner that produces a pass/fail //! outcome plus an optional detail string (typically a stderr tail or a //! human-readable reason). Outcomes are persisted to `gate_runs` so /state //! and the TUI can show them. use crate::classify; use crate::config::Config; use crate::domain::{GateKind, GateRunId, TierId, Version}; use crate::events::{self, Event, EventTx}; use crate::live_log::LiveLog; use crate::outcome::{GateBlocker, GateFailure, GateOutcome, LogRef, PassNote}; use crate::topology::Gate; use anyhow::Result; use chrono::Utc; use sqlx::SqlitePool; use std::path::PathBuf; use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::process::Command; pub struct GateCtx { pub pool: SqlitePool, pub cfg: Arc, pub tier: TierId, pub version: Version, pub worktree: PathBuf, pub events: EventTx, } /// Run a single gate end-to-end: insert the in-flight row, execute the gate, /// update the row with the outcome. Returns the outcome for the caller. pub async fn run(ctx: &GateCtx, gate: &Gate) -> Result { let kind = gate.kind(); let started_at = Utc::now().to_rfc3339(); let id: i64 = sqlx::query_scalar( "INSERT INTO gate_runs (version, tier, gate_kind, started_at) VALUES (?, ?, ?, ?) RETURNING id", ) .bind(&ctx.version) .bind(&ctx.tier) .bind(kind) .bind(&started_at) .fetch_one(&ctx.pool) .await?; let run_id = GateRunId(id); tracing::info!( run_id = %run_id, tier = %ctx.tier, version = %ctx.version, gate = %kind, "gate start", ); events::emit(&ctx.events, Event::GateStart { run_id, tier: ctx.tier.clone(), version: ctx.version.clone(), gate: kind, }); let outcome = match gate { Gate::CargoTest => cargo_test(ctx, run_id).await, Gate::MigrationDryRun => migration_dry_run(ctx).await, Gate::BootSmoke => boot_smoke(ctx, run_id).await, Gate::BurnIn { hours } => burn_in(ctx, *hours).await, Gate::ManualConfirm => manual_confirm(ctx).await, }; let outcome = outcome.unwrap_or_else(|e| GateOutcome::failed(GateFailure::Unclassified { legacy_detail: Some(format!("gate runner errored: {e}")), })); let outcome_json = serde_json::to_string(&outcome) .unwrap_or_else(|e| format!("{{\"_serialize_error\":{e:?}}}")); sqlx::query( "UPDATE gate_runs SET finished_at = ?, status = ?, outcome_json = ?, log_ref = ? WHERE id = ?", ) .bind(Utc::now().to_rfc3339()) .bind(outcome.status_str()) .bind(&outcome_json) .bind(outcome.log_ref.as_ref().map(|l| l.as_str())) .bind(id) .execute(&ctx.pool) .await?; tracing::info!( tier = %ctx.tier, version = %ctx.version, gate = %kind, status = outcome.status_str(), "gate done", ); events::emit(&ctx.events, Event::GateDone { run_id, tier: ctx.tier.clone(), version: ctx.version.clone(), gate: kind, outcome: outcome.clone(), }); Ok(outcome) } /// Run a sequence of gates; stops on the first failure (no point running the /// Run every gate in order and return true iff all passed. We deliberately do /// NOT short-circuit on first failure — every gate's outcome is recorded in /// `gate_runs`, which is the operator's only visibility into pipeline health. /// Hiding later gates because an earlier one failed makes diagnosis worse. pub async fn run_all(ctx: &GateCtx, gates: &[Gate]) -> Result { let mut all_ok = true; for g in gates { let o = run(ctx, g).await?; if !o.is_passed() { all_ok = false; } } Ok(all_ok) } // ---- individual gate runners ---- async fn cargo_test(ctx: &GateCtx, run_id: GateRunId) -> Result { let server_dir = ctx.worktree.join("server"); let mut cmd = Command::new("cargo"); // Match CI (`server/deploy/run-ci.sh`): `--features fast-tests` relaxes // auth rate-limit burst (5 → 20) and argon2 cost so signup-heavy + lockout // workflow tests can complete without hitting Governor before the // hand-rolled lockout check. The feature is specifically documented for // this in `server/src/constants.rs:87`. cmd.args(["test", "--release", "--features", "fast-tests"]) .current_dir(&server_dir) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .kill_on_drop(true); // Same online-mode rationale as the build step: sqlx query macros need a // live DB to type-check against. The scratch DB is left in migrated state // by the preceding build, so we can reuse it here. if let Some(scratch_url) = ctx.cfg.scratch_db_url.as_deref() { cmd.env("DATABASE_URL", scratch_url); // The server test harness (tests/harness/db.rs) parses TEST_DATABASE_URL // with rfind('/'), which mangles URLs whose query string contains '/' // (e.g. `?host=/var/run/postgresql`). Strip the query — libpq defaults // to /var/run/postgresql on Debian/Ubuntu when host is unspecified. let test_url = scratch_url .split_once('?') .map(|(base, _)| base) .unwrap_or(scratch_url); cmd.env("TEST_DATABASE_URL", test_url); } let started = std::time::Instant::now(); let log_path = gate_log_path(ctx, GateKind::CargoTest); let log_ref = LogRef::new(&ctx.version, GateKind::CargoTest); let mut child = match cmd.spawn() { Ok(c) => c, Err(e) => { return Ok(GateOutcome::failed(GateFailure::SpawnFailed { message: e.to_string(), }).with_log_ref(log_ref)); } }; let (stdout_buf, stderr_buf, status) = stream_child_to_live_log(&mut child, ctx.events.clone(), run_id, log_path).await?; let duration_s = started.elapsed().as_secs() as u32; if status.success() { Ok(GateOutcome::passed(PassNote::TestsPassed { duration_s }).with_log_ref(log_ref)) } else { let failure = classify::classify_cargo_test(&stdout_buf, &stderr_buf); Ok(GateOutcome::failed(failure).with_log_ref(log_ref)) } } async fn migration_dry_run(ctx: &GateCtx) -> Result { let mut log_buf: Vec = Vec::new(); let log_ref = LogRef::new(&ctx.version, GateKind::MigrationDryRun); let finish = |outcome: GateOutcome, buf: Vec| async move { persist_gate_log(ctx, GateKind::MigrationDryRun, &buf, &[]).await; outcome }; let Some(db_url) = ctx.cfg.scratch_db_url.as_deref() else { log_buf.extend_from_slice(b"scratch_db_url unset in daemon config\n"); return Ok(finish( GateOutcome::blocked(GateBlocker::ScratchDbUrlUnset).with_log_ref(log_ref), log_buf, ).await); }; let backup: Option<(String,)> = sqlx::query_as( "SELECT local_path FROM backups ORDER BY id DESC LIMIT 1", ) .fetch_optional(&ctx.pool) .await?; let Some((backup_path,)) = backup else { log_buf.extend_from_slice(b"no backup fetched; call /backup/fetch first\n"); return Ok(finish( GateOutcome::blocked(GateBlocker::NoBackupAvailable).with_log_ref(log_ref), log_buf, ).await); }; log_buf.extend_from_slice(b"---- reset_scratch ----\n"); if let Err(e) = reset_scratch(db_url).await { let msg = format!("scratch reset: {e}"); log_buf.extend_from_slice(msg.as_bytes()); return Ok(finish( GateOutcome::failed(GateFailure::RestoreFailed { reason: msg }).with_log_ref(log_ref), log_buf, ).await); } log_buf.extend_from_slice(format!("---- restore_dump ({backup_path}) ----\n").as_bytes()); if let Err(e) = restore_dump(db_url, &backup_path, &mut log_buf).await { let msg = format!("restore: {e}"); log_buf.extend_from_slice(msg.as_bytes()); return Ok(finish( GateOutcome::failed(GateFailure::RestoreFailed { reason: msg }).with_log_ref(log_ref), log_buf, ).await); } let migrations_dir = ctx.worktree.join("server").join("migrations"); log_buf.extend_from_slice(b"---- run_migrator ----\n"); match run_migrator(db_url, &migrations_dir).await { Ok(()) => { let detail = format!("restored {backup_path} + migrated"); log_buf.extend_from_slice(detail.as_bytes()); Ok(finish( GateOutcome::passed(PassNote::Migrated { backup_path: backup_path.clone() }) .with_log_ref(log_ref), log_buf, ).await) } Err(e) => { let err_s = e.to_string(); log_buf.extend_from_slice(err_s.as_bytes()); let failure = classify::classify_migration_error(&err_s, None); Ok(finish( GateOutcome::failed(failure).with_log_ref(log_ref), log_buf, ).await) } } } pub(crate) async fn reset_scratch(db_url: &str) -> Result<()> { use sqlx::postgres::PgPoolOptions; use sqlx::Executor; let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?; // Drop every non-system schema, not just public — migrations create custom // schemas (e.g. tower_sessions) that survive `DROP SCHEMA public CASCADE` // and then collide on the next migration run. pool.execute( r#" DO $$ DECLARE s text; BEGIN FOR s IN SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname NOT IN ('information_schema') LOOP EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', s); END LOOP; EXECUTE 'CREATE SCHEMA public'; END $$; "#, ) .await?; pool.close().await; Ok(()) } async fn restore_dump(db_url: &str, dump: &str, log_buf: &mut Vec) -> Result<()> { // Two pipelines we accept: // *.sql -> psql $url < dump // *.sql.gz -> gunzip -c dump | psql $url let is_gz = dump.ends_with(".gz"); let shell = if is_gz { format!("gunzip -c {q} | psql {url}", q = shell_escape(dump), url = shell_escape(db_url)) } else { format!("psql {url} < {q}", q = shell_escape(dump), url = shell_escape(db_url)) }; let out = Command::new("sh").arg("-c").arg(&shell).output().await?; log_buf.extend_from_slice(&out.stdout); log_buf.extend_from_slice(&out.stderr); anyhow::ensure!( out.status.success(), "restore failed: {}", String::from_utf8_lossy(&out.stderr), ); Ok(()) } pub(crate) async fn run_migrator(db_url: &str, dir: &std::path::Path) -> Result<()> { use sqlx::postgres::PgPoolOptions; let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?; let migrator = sqlx::migrate::Migrator::new(dir).await?; migrator.run(&pool).await?; pool.close().await; Ok(()) } fn shell_escape(s: &str) -> String { format!("'{}'", s.replace('\'', "'\\''")) } async fn boot_smoke(ctx: &GateCtx, run_id: GateRunId) -> Result { let bin: Option<(String,)> = sqlx::query_as( "SELECT artifact_path FROM versions WHERE version = ?", ) .bind(&ctx.version) .fetch_optional(&ctx.pool) .await?; let Some((bin,)) = bin else { return Ok(GateOutcome::blocked(GateBlocker::ArtifactMissing { version: ctx.version.clone(), })); }; // Lowest-bar smoke: start the binary and verify it stays up for a few // seconds without exiting. Panics in main, missing config, port-bind // failures show up here. Anything more ambitious (probing /healthz on a // real port) needs server config we don't generically know. // // The server requires DATABASE_URL or it panics on config load before // we can observe anything. We point it at the scratch DB (already // migrated by the build step and refreshed by migration_dry_run if // that gate ran first). SCAN_ENABLED=false skips loading YARA rules // from /opt/makenotwork/yara-rules which doesn't exist on the build // host. Other config has sane optional defaults. let mut cmd = tokio::process::Command::new(&bin); cmd.env("SANDO_BOOT_SMOKE", "1") .env("SCAN_ENABLED", "false") .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .kill_on_drop(true); if let Some(scratch_url) = ctx.cfg.scratch_db_url.as_deref() { cmd.env("DATABASE_URL", scratch_url); } let log_path = gate_log_path(ctx, GateKind::BootSmoke); let log_ref = LogRef::new(&ctx.version, GateKind::BootSmoke); let mut child = match cmd.spawn() { Ok(c) => c, Err(e) => { // Spawn failures get a one-off log line via LiveLog so the // on-disk file still exists for `GET /logs/...`. let mut log = LiveLog::open(ctx.events.clone(), run_id, log_path).await; log.write_chunk(format!("spawn: {e}\n").as_bytes()).await; log.close().await; return Ok(GateOutcome::failed(GateFailure::SpawnFailed { message: e.to_string(), }).with_log_ref(log_ref)); } }; // The boot smoke window is 3s. Drain stdout/stderr concurrently through // a shared LiveLog sink so the operator sees panics/log lines stream in // real time before the kill, AND the on-disk log gets the full byte // stream for post-mortem reads. The drainers exit when their pipe // closes — which happens when the child exits naturally or after kill. let log = std::sync::Arc::new(tokio::sync::Mutex::new( LiveLog::open(ctx.events.clone(), run_id, log_path).await, )); let stdout_task = tokio::spawn(stream_into_log(child.stdout.take(), log.clone())); let stderr_task = tokio::spawn(stream_into_log(child.stderr.take(), log.clone())); tokio::time::sleep(std::time::Duration::from_secs(3)).await; let exit = child.try_wait()?; if exit.is_none() { let _ = child.kill().await; } // The boot_smoke classifier looks at exit code only — the streamed // bytes already landed in the live log and the on-disk file for the // post-mortem reader. Drain the join handles to avoid hangs. let _ = stdout_task.await; let _ = stderr_task.await; // Unique owner of the Arc at this point (both tasks dropped their clones). if let Ok(mutex) = std::sync::Arc::try_unwrap(log) { mutex.into_inner().close().await; } match exit { Some(status) => { let failure = classify::classify_boot_smoke(status.code()); Ok(GateOutcome::failed(failure).with_log_ref(log_ref)) } None => Ok(GateOutcome::passed(PassNote::StayedUp { duration_s: 3 }) .with_log_ref(log_ref)), } } /// Drain `stream` into the shared `LiveLog` (which forwards each chunk to /// the on-disk log file AND broadcasts a `GateLogChunk` event), and return /// the concatenated bytes so the classifier can still operate on the full /// output post-hoc. async fn stream_into_log( stream: Option, log: std::sync::Arc>, ) -> Vec where R: tokio::io::AsyncRead + Unpin + Send + 'static, { let mut total = Vec::new(); let Some(mut s) = stream else { return total }; let mut buf = [0u8; 4096]; loop { match s.read(&mut buf).await { Ok(0) => break, Err(_) => break, Ok(n) => { total.extend_from_slice(&buf[..n]); log.lock().await.write_chunk(&buf[..n]).await; } } } total } /// Spawn a child, drain its stdout/stderr through a `LiveLog`, return the /// combined buffers and exit status. Shared by `cargo_test` (no deadline) /// and ad-hoc callers — `boot_smoke` rolls its own variant because of its /// 3s kill window. async fn stream_child_to_live_log( child: &mut tokio::process::Child, events: EventTx, run_id: GateRunId, log_path: PathBuf, ) -> Result<(Vec, Vec, std::process::ExitStatus)> { let log = std::sync::Arc::new(tokio::sync::Mutex::new( LiveLog::open(events, run_id, log_path).await, )); let stdout_task = tokio::spawn(stream_into_log(child.stdout.take(), log.clone())); let stderr_task = tokio::spawn(stream_into_log(child.stderr.take(), log.clone())); let status = child.wait().await?; let stdout_buf = stdout_task.await.unwrap_or_default(); let stderr_buf = stderr_task.await.unwrap_or_default(); if let Ok(mutex) = std::sync::Arc::try_unwrap(log) { mutex.into_inner().close().await; } Ok((stdout_buf, stderr_buf, status)) } fn gate_log_path(ctx: &GateCtx, gate: GateKind) -> PathBuf { ctx.cfg.logs_root.join(ctx.version.to_string()).join(format!("{}.log", gate.as_str())) } async fn burn_in(ctx: &GateCtx, hours: u32) -> Result { // Check tier_state.burn_in_started_at on this tier; pass if enough time // has elapsed. The clock is started by /promote when a version lands on // the burn-in tier. let started: Option = sqlx::query_scalar( "SELECT burn_in_started_at FROM tier_state WHERE tier = ?", ) .bind(&ctx.tier) .fetch_optional(&ctx.pool) .await? .flatten(); let Some(started) = started else { return Ok(GateOutcome::blocked(GateBlocker::BurnInClockNotStarted)); }; let started = chrono::DateTime::parse_from_rfc3339(&started)?.with_timezone(&Utc); let elapsed = Utc::now() - started; let needed = chrono::Duration::hours(hours as i64); if elapsed >= needed { Ok(GateOutcome::passed(PassNote::BurnInElapsed { hours: elapsed.num_hours() as u32 })) } else { let remaining = (needed - elapsed).num_hours().max(0) as u32; Ok(GateOutcome::blocked(GateBlocker::BurnInRemaining { hours_remaining: remaining, hours_total: hours, })) } } async fn manual_confirm(ctx: &GateCtx) -> Result { // Pass iff a row in gate_runs exists with status='passed' for this // (tier, version, manual_confirm) that was inserted out-of-band by an // operator action. Since the harness inserts the in-flight row itself, // look for a prior confirmation row. let prior_at: Option = sqlx::query_scalar( "SELECT finished_at FROM gate_runs WHERE tier = ? AND version = ? AND gate_kind = 'manual_confirm' AND status = 'passed' ORDER BY id DESC LIMIT 1", ) .bind(&ctx.tier) .bind(&ctx.version) .fetch_optional(&ctx.pool) .await?; match prior_at { Some(at_str) => { let at = chrono::DateTime::parse_from_rfc3339(&at_str) .map(|d| d.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); Ok(GateOutcome::passed(PassNote::OperatorConfirmed { at })) } None => Ok(GateOutcome::blocked(GateBlocker::AwaitingOperatorConfirmation)), } } /// Write `//.log` containing the gate's /// captured stdout + stderr. Best-effort: any IO error is logged and /// swallowed — a missing log must not turn a passing gate red. /// /// Layout matches §6.8 of launchplan_final.md so the `GET /logs/{version}/{gate}` /// route can find it. We use `ctx.version` (semver, what `gate_runs.version` /// already stores) rather than the git sha; the launchplan uses "sha" in /// prose but switching the on-disk key is a Phase B change after `GitSha` / /// `Version` newtypes land. async fn persist_gate_log(ctx: &GateCtx, gate_kind: GateKind, stdout: &[u8], stderr: &[u8]) { let dir = ctx.cfg.logs_root.join(ctx.version.to_string()); if let Err(e) = tokio::fs::create_dir_all(&dir).await { tracing::warn!(error = %e, dir = %dir.display(), "could not create gate log dir"); return; } let path = dir.join(format!("{}.log", gate_kind.as_str())); let mut body = Vec::with_capacity(stdout.len() + stderr.len() + 64); body.extend_from_slice(b"==== stdout ====\n"); body.extend_from_slice(stdout); if !stdout.last().is_some_and(|b| *b == b'\n') { body.push(b'\n'); } body.extend_from_slice(b"==== stderr ====\n"); body.extend_from_slice(stderr); if let Err(e) = tokio::fs::write(&path, &body).await { tracing::warn!(error = %e, path = %path.display(), "could not write gate log"); } } #[cfg(test)] mod tests { use super::*; use crate::events; use sqlx::sqlite::SqlitePoolOptions; /// burn_in returns a typed Blocked when the clock isn't started; the /// runner persists status='blocked' + outcome_json (the json carries /// blocker.kind = 'burn_in_clock_not_started'). #[tokio::test] async fn burn_in_blocked_persists_typed_outcome() { let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await .unwrap(); crate::db::migrate(&pool).await.unwrap(); // Topology sync expects a tier row before gate_runs can reference it. sqlx::query("INSERT INTO tiers (name, ord, provisioned, canary) VALUES ('host', 0, 0, 'sequential')") .execute(&pool).await.unwrap(); sqlx::query("INSERT INTO tier_state (tier) VALUES ('host')") .execute(&pool).await.unwrap(); // versions FK target. sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'abc1234', '2026-01-01T00:00:00Z', '/tmp/x')") .execute(&pool).await.unwrap(); let cfg = std::sync::Arc::new(crate::config::Config::for_tests()); let ctx = GateCtx { pool: pool.clone(), cfg, tier: TierId::new("host"), version: "0.1.0".parse().unwrap(), worktree: std::path::PathBuf::from("/tmp/unused"), events: events::channel(), }; let out = run(&ctx, &Gate::BurnIn { hours: 24 }).await.unwrap(); assert_eq!(out.status_str(), "blocked"); assert!(!out.is_passed()); // Read the persisted row. let row: (Option, Option) = sqlx::query_as( "SELECT status, outcome_json FROM gate_runs ORDER BY id DESC LIMIT 1", ).fetch_one(&pool).await.unwrap(); assert_eq!(row.0.as_deref(), Some("blocked"), "typed status"); let json: serde_json::Value = serde_json::from_str(row.1.as_deref().unwrap()).unwrap(); assert_eq!(json["status"]["kind"], "blocked"); assert_eq!(json["status"]["blocker"]["kind"], "burn_in_clock_not_started"); } /// reset_scratch must drop every non-system schema, not just `public` — /// otherwise migrations that create custom schemas (e.g. tower_sessions) /// collide on the next run. This regressed once (Phase 0) and the fix is /// load-bearing for migration_dry_run. /// /// Gated on `SANDO_TEST_PG_URL` so it only runs where postgres is /// available. Set `SANDO_TEST_PG_URL=postgres:///sando_scratch?host=/var/run/postgresql` /// (or similar) before `cargo test`. #[tokio::test] async fn reset_scratch_drops_all_non_system_schemas() { let Ok(url) = std::env::var("SANDO_TEST_PG_URL") else { eprintln!("skipping: SANDO_TEST_PG_URL not set"); return; }; use sqlx::Executor; use sqlx::postgres::PgPoolOptions; let pool = PgPoolOptions::new().max_connections(1).connect(&url).await.unwrap(); // Plant two non-system schemas + a table in each. pool.execute("DROP SCHEMA IF EXISTS foo CASCADE; CREATE SCHEMA foo; CREATE TABLE foo.t (i int);") .await.unwrap(); pool.execute("DROP SCHEMA IF EXISTS tower_sessions CASCADE; CREATE SCHEMA tower_sessions; CREATE TABLE tower_sessions.session (id text);") .await.unwrap(); pool.close().await; reset_scratch(&url).await.expect("reset_scratch"); let pool = PgPoolOptions::new().max_connections(1).connect(&url).await.unwrap(); let rows: Vec<(String,)> = sqlx::query_as( "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname <> 'information_schema'", ) .fetch_all(&pool) .await .unwrap(); let names: Vec = rows.into_iter().map(|(s,)| s).collect(); // After reset, only `public` should remain among non-system schemas. assert_eq!(names, vec!["public".to_string()], "got: {names:?}"); pool.close().await; } /// Sanity: applying MNW migrations from a *non-existent* dir errors, /// rather than silently no-op'ing. Cheap pure check, no postgres needed /// (the sqlx::Migrator::new constructor itself reads the dir). #[tokio::test] async fn run_migrator_errors_on_missing_dir() { // The first thing run_migrator does is `Migrator::new(dir)`, which // needs a real dir to read migration files from. let res = run_migrator("postgres:///does-not-matter", std::path::Path::new("/nonexistent/sando-test-migrations")).await; assert!(res.is_err()); } }