use crate::error::Result; use crate::state::AppState; use axum::extract::{Path, State, WebSocketUpgrade}; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use serde::{Deserialize, Serialize}; use sqlx::Row; pub fn router(state: AppState) -> Router { let prom = state.prom.clone(); Router::new() .route("/state", get(get_state)) .route("/promote/{tier}", post(promote)) .route("/rollback/{tier}", post(rollback)) .route("/rebuild", post(rebuild)) .route("/confirm/{tier}", post(confirm)) .route("/backup/fetch", post(backup_fetch)) .route("/logs/{version}/{gate}", get(get_gate_log)) .route("/events", get(events_ws)) .with_state(state) .route("/metrics", get(crate::metrics::render).with_state(prom)) } #[derive(Serialize)] struct StateView { tiers: Vec, } #[derive(Serialize)] struct TierView { name: String, ord: i64, provisioned: bool, canary: String, current_version: Option, previous_version: Option, burn_in_started_at: Option, nodes: Vec, gates: Vec, } #[derive(Serialize)] struct GateView { kind: String, finished_at: Option, /// `'passed' | 'failed' | 'blocked'` or NULL while in-flight. The TUI /// uses this to choose green/red/yellow rendering. status: Option, /// Full typed `GateOutcome` as a JSON object, when present. /// Deserialized lazily by the consumer; sandod doesn't re-parse it. outcome: Option, /// Relative path under `cfg.logs_root` to the persisted stdout/stderr. log_ref: Option, } async fn get_state(State(s): State) -> Result> { let rows = sqlx::query( "SELECT t.name, t.ord, t.provisioned, t.canary, ts.current_version, ts.previous_version, ts.burn_in_started_at FROM tiers t LEFT JOIN tier_state ts ON ts.tier = t.name ORDER BY t.ord", ) .fetch_all(&s.pool) .await?; let mut tiers = Vec::with_capacity(rows.len()); for r in rows { let name: String = r.get("name"); let current_version: Option = r.get("current_version"); let nodes: Vec = sqlx::query_scalar("SELECT name FROM nodes WHERE tier = ? ORDER BY name") .bind(&name) .fetch_all(&s.pool) .await?; // Surface gates for current_version when set, otherwise for the most // recently attempted version on this tier. Without the fallback, a // tier that has never gone green (MM after a build failure, B before // first deploy) exposes no gate detail via /state — debugging required // SSH and direct SQLite access. See sando todo: gate observability. let gate_version: Option = if current_version.is_some() { current_version.clone() } else { sqlx::query_scalar( "SELECT version FROM gate_runs WHERE tier = ? ORDER BY id DESC LIMIT 1", ) .bind(&name) .fetch_optional(&s.pool) .await? }; let gates: Vec = if let Some(ver) = gate_version.as_ref() { // Most recent gate_runs row per gate_kind for (tier, ver). sqlx::query( "SELECT gate_kind, finished_at, status, outcome_json, log_ref FROM gate_runs g WHERE tier = ?1 AND version = ?2 AND id = (SELECT MAX(id) FROM gate_runs WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind) ORDER BY gate_kind", ) .bind(&name) .bind(ver) .fetch_all(&s.pool) .await? .into_iter() .map(|gr| GateView { kind: gr.get("gate_kind"), finished_at: gr.get("finished_at"), status: gr.get("status"), outcome: gr.get::, _>("outcome_json") .and_then(|s| serde_json::from_str(&s).ok()), log_ref: gr.get("log_ref"), }) .collect() } else { Vec::new() }; tiers.push(TierView { name, ord: r.get("ord"), provisioned: r.get::("provisioned") != 0, canary: r.get("canary"), current_version, previous_version: r.get("previous_version"), burn_in_started_at: r.get("burn_in_started_at"), nodes, gates, }); } Ok(Json(StateView { tiers })) } #[derive(Deserialize, Default)] struct PromoteBody { /// Optional. If absent, defaults to the predecessor tier's `current_version` /// (i.e. promote whatever just finished baking on the previous tier). #[serde(default)] version: Option, #[serde(default)] hotfix: bool, #[serde(default)] reset_burn_in: bool, } async fn promote( State(s): State, Path(tier): Path, body: Option>, ) -> Result> { let body = body.map(|Json(b)| b).unwrap_or_default(); let tier = crate::domain::TierId::new(tier); let idx = s.topo.tiers.iter().position(|t| t.name == tier) .ok_or(crate::error::Error::NotFound)?; if idx == 0 { return Err(crate::error::Error::GateBlocked( "cannot /promote to the first tier; use /rebuild".into(), )); } let target = &s.topo.tiers[idx]; let source = &s.topo.tiers[idx - 1]; // Resolve version: explicit if given, else the source tier's current. let version_str = match body.version { Some(v) => v, None => sqlx::query_scalar::<_, Option>( "SELECT current_version FROM tier_state WHERE tier = ?", ) .bind(&source.name) .fetch_optional(&s.pool).await .map_err(crate::error::Error::Db)? .flatten() .ok_or_else(|| crate::error::Error::GateBlocked( format!("no version specified and tier {} has no current_version", source.name), ))?, }; let version = crate::domain::Version::parse(&version_str) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; // 1. Predecessor must have all of its gates green for this version (with // optional hotfix override that skips burn_in). let pending = unsatisfied_gates(&s.pool, source.name.as_str(), &version_str, body.hotfix).await?; if !pending.is_empty() { return Err(crate::error::Error::GateBlocked(format!( "{} gate(s) not satisfied on tier {}: {}", pending.len(), source.name, pending.join(", "), ))); } // 2. Look up the artifact for this version. let bin: Option<(String,)> = sqlx::query_as( "SELECT artifact_path FROM versions WHERE version = ?", ) .bind(&version) .fetch_optional(&s.pool) .await .map_err(crate::error::Error::Db)?; let Some((bin,)) = bin else { return Err(crate::error::Error::NotFound); }; let bin_path = std::path::PathBuf::from(bin); // `artifact_path` is the primary binary; the staged release dir is its parent. let staged_dir = bin_path.parent() .ok_or_else(|| crate::error::Error::Other(anyhow::anyhow!("artifact_path has no parent")))? .to_path_buf(); // 3. Deploy to each node. Sequential canary is the only policy // implemented in v0; parallel is a one-line change once we trust the // sequential path. for node in &target.nodes { let started = chrono::Utc::now().to_rfc3339(); crate::events::emit(&s.events, crate::events::Event::DeployStart { tier: target.name.clone(), node: node.name.clone(), version: version.clone(), }); let executor = s.executors.get(&node.name).cloned() .unwrap_or_else(|| crate::state::build_executor(node)); let result = crate::deploy::deploy_node(executor.as_ref(), node, &version_str, &staged_dir, s.cfg.primary_bin()).await; let finished = chrono::Utc::now().to_rfc3339(); let (outcome_obj, err_for_propagation) = match result { Ok(_) => (crate::outcome::DeployOutcome::ok(), None), Err(e) => { let msg = format!("{e:#}"); let kind = crate::classify::classify_deploy_error(&msg); (crate::outcome::DeployOutcome::failed(kind), Some(e)) } }; let outcome_json = serde_json::to_string(&outcome_obj) .unwrap_or_else(|e| format!("{{\"_serialize_error\":{e:?}}}")); sqlx::query( "INSERT INTO deploys (version, tier, node, started_at, finished_at, outcome, outcome_json, hotfix, reset_burn_in) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&version).bind(&target.name).bind(&node.name) .bind(&started).bind(&finished).bind(outcome_obj.status_str()) .bind(&outcome_json) .bind(body.hotfix as i64).bind(body.reset_burn_in as i64) .execute(&s.pool).await.map_err(crate::error::Error::Db)?; if let Some(e) = err_for_propagation { let crate::outcome::DeployStatus::Failed { failure } = outcome_obj.status else { unreachable!("err_for_propagation is Some iff status is Failed"); }; tracing::error!( tier = %target.name, node = %node.name, version = %version, failure = failure.summary(), "deploy failed; current symlink left intact, tier_state not advanced" ); crate::events::emit(&s.events, crate::events::Event::DeployFailed { tier: target.name.clone(), node: node.name.clone(), version: version.clone(), failure, }); return Err(crate::error::Error::Other(e)); } crate::events::emit(&s.events, crate::events::Event::DeployOk { tier: target.name.clone(), node: node.name.clone(), version: version.clone(), }); } // 4. Advance tier_state. burn_in_started_at is set to now so the target // tier's burn_in gate starts ticking. reset_burn_in on the *source* // tier nulls its clock only when the operator explicitly asked for it. let prev: Option = sqlx::query_scalar( "SELECT current_version FROM tier_state WHERE tier = ?", ) .bind(&target.name) .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?.flatten(); sqlx::query( "UPDATE tier_state SET previous_version = ?, current_version = ?, burn_in_started_at = ? WHERE tier = ?", ) .bind(prev) .bind(&version) .bind(chrono::Utc::now().to_rfc3339()) .bind(&target.name) .execute(&s.pool).await.map_err(crate::error::Error::Db)?; if body.reset_burn_in { sqlx::query("UPDATE tier_state SET burn_in_started_at = NULL WHERE tier = ?") .bind(&source.name) .execute(&s.pool).await.map_err(crate::error::Error::Db)?; } crate::events::emit(&s.events, crate::events::Event::PromoteComplete { tier: target.name.clone(), version: version.clone(), }); tracing::info!( version = %version, tier = %target.name, hotfix = body.hotfix, reset_burn_in = body.reset_burn_in, "promote complete", ); Ok(Json(serde_json::json!({ "tier": target.name, "version": version, "nodes_deployed": target.nodes.iter().map(|n| n.name.clone()).collect::>(), }))) } /// Returns the kinds of gates on `tier` that have not (yet) passed for /// `version`. `hotfix` suppresses the burn_in requirement only. async fn unsatisfied_gates( pool: &sqlx::SqlitePool, tier: &str, version: &str, hotfix: bool, ) -> std::result::Result, crate::error::Error> { // We need the configured gate list for the tier to know what *should* // pass. The route handler has Topology in hand and could pass it in, but // the DB also captures it implicitly via gate_runs rows. Simplest correct // answer: re-read from topology via tier name; the caller has it. // For now we inspect the latest gate_runs. let rows: Vec<(String, Option)> = sqlx::query_as( "SELECT gate_kind, status FROM gate_runs g WHERE tier = ?1 AND version = ?2 AND id = (SELECT MAX(id) FROM gate_runs WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind)", ) .bind(tier).bind(version) .fetch_all(pool).await.map_err(crate::error::Error::Db)?; let mut bad = Vec::new(); for (kind, status) in rows { if hotfix && kind == "burn_in" { continue; } // NULL status (in-flight) and any non-passed status both count as // unsatisfied; only an explicit 'passed' clears the gate. if status.as_deref() != Some("passed") { bad.push(kind); } } Ok(bad) } async fn rollback( State(s): State, Path(tier): Path, ) -> Result> { let tier = crate::domain::TierId::new(tier); let target = s.topo.tiers.iter().find(|t| t.name == tier) .ok_or(crate::error::Error::NotFound)?; let row: Option<(Option, Option)> = sqlx::query_as( "SELECT current_version, previous_version FROM tier_state WHERE tier = ?", ) .bind(&tier) .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?; let (Some(current_str), Some(previous_str)) = row.unwrap_or((None, None)) else { return Err(crate::error::Error::GateBlocked( "no previous_version to roll back to".into(), )); }; let current = crate::domain::Version::parse(¤t_str) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; let previous = crate::domain::Version::parse(&previous_str) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; let bin: Option<(String,)> = sqlx::query_as( "SELECT artifact_path FROM versions WHERE version = ?", ) .bind(&previous) .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?; let Some((bin,)) = bin else { return Err(crate::error::Error::GateBlocked( format!("previous version {previous} has no artifact_path; rollback impossible"), )); }; let bin_path = std::path::PathBuf::from(bin); let staged_dir = bin_path.parent() .ok_or_else(|| crate::error::Error::Other(anyhow::anyhow!("artifact_path has no parent")))? .to_path_buf(); for node in &target.nodes { let executor = s.executors.get(&node.name).cloned() .unwrap_or_else(|| crate::state::build_executor(node)); crate::deploy::deploy_node(executor.as_ref(), node, &previous_str, &staged_dir, s.cfg.primary_bin()) .await .map_err(crate::error::Error::Other)?; } sqlx::query( "UPDATE tier_state SET current_version = ?, previous_version = ?, burn_in_started_at = NULL WHERE tier = ?", ) .bind(&previous) .bind(¤t) .bind(&tier) .execute(&s.pool).await.map_err(crate::error::Error::Db)?; tracing::warn!(tier = %tier, from = %current, to = %previous, "rollback complete"); crate::events::emit(&s.events, crate::events::Event::Rollback { tier: tier.clone(), from: current.clone(), to: previous.clone(), }); Ok(Json(serde_json::json!({ "tier": tier, "rolled_back_from": current, "now_running": previous, }))) } #[derive(Deserialize, Default)] struct RebuildBody { /// Specific sha to build. If absent, resolve `topo.repo.branch` from the bare repo. #[serde(default)] sha: Option, } async fn rebuild( State(s): State, body: Option>, ) -> Result> { let body = body.map(|Json(b)| b).unwrap_or_default(); let sha = match body.sha { Some(s) => s, None => crate::git::resolve_ref( std::path::Path::new(&s.topo.repo.bare_path), &s.topo.repo.branch, ) .await .map_err(crate::error::Error::Other)?, }; // Boundary parse: a sha entering Sando must be hex of plausible length. // The build pipeline downstream only ever sees `GitSha`. let sha = crate::domain::GitSha::parse(&sha) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; tracing::info!(sha = %sha, "rebuild requested"); crate::events::emit(&s.events, crate::events::Event::RebuildRequested { sha: sha.clone() }); // Latest /rebuild wins: abort any in-flight build before spawning a new // one. Aborting drops the spawned task's future, which drops any // tokio::process::Child it owns; with `kill_on_drop(true)` set on the // cargo Command, SIGKILL propagates to cargo + its rustc children. let mut slot = s.active_build.lock().await; if let Some(prev) = slot.take() { if !prev.is_finished() { tracing::warn!("aborting in-flight build for newer /rebuild request"); crate::events::emit(&s.events, crate::events::Event::BuildAborted { sha_aborted: sha.clone() }); prev.abort(); } } let pool = s.pool.clone(); let cfg = s.cfg.clone(); let topo = s.topo.clone(); let events_for_task = s.events.clone(); let sha_for_task = sha.clone(); let sha_response = sha.to_string(); let handle = tokio::spawn(async move { if let Err(e) = crate::build::build_and_run_host(pool, cfg, topo, sha_for_task.clone(), events_for_task).await { tracing::error!(sha = %sha_for_task, error = %e, "rebuild pipeline failed"); } }); *slot = Some(handle.abort_handle()); Ok(Json(serde_json::json!({ "accepted": true, "sha": sha_response }))) } async fn confirm( State(s): State, Path(tier): Path, ) -> Result> { // Operator-driven satisfaction of a `manual_confirm` gate. Looks up the // pending version (current MM version, or the tier's own if non-mm) and // inserts a passing gate_runs row so /promote can advance. let tier = crate::domain::TierId::new(tier); let target = s.topo.tiers.iter().find(|t| t.name == tier) .ok_or(crate::error::Error::NotFound)?; let version_str: Option = sqlx::query_scalar( "SELECT current_version FROM tier_state WHERE tier = ?", ) .bind(&target.name) .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?.flatten(); let version_str = version_str.ok_or_else(|| crate::error::Error::GateBlocked( format!("tier {tier} has no current_version; nothing to confirm"), ))?; let version = crate::domain::Version::parse(&version_str) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; let now = chrono::Utc::now().to_rfc3339(); let outcome = crate::outcome::GateOutcome::passed( crate::outcome::PassNote::OperatorConfirmed { at: chrono::Utc::now() }, ); let outcome_json = serde_json::to_string(&outcome) .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?; sqlx::query( "INSERT INTO gate_runs (version, tier, gate_kind, started_at, finished_at, status, outcome_json) VALUES (?, ?, 'manual_confirm', ?, ?, 'passed', ?)", ) .bind(&version).bind(&target.name).bind(&now).bind(&now).bind(&outcome_json) .execute(&s.pool).await.map_err(crate::error::Error::Db)?; tracing::info!(tier = %tier, version = %version, "manual_confirm recorded"); crate::events::emit(&s.events, crate::events::Event::ManualConfirm { tier: tier.clone(), version: version.clone(), }); Ok(Json(serde_json::json!({ "tier": tier, "version": version }))) } async fn backup_fetch(State(s): State) -> Result> { let fb = crate::backup::fetch(&s.pool, &s.cfg, &s.topo) .await .map_err(crate::error::Error::Other)?; crate::events::emit(&s.events, crate::events::Event::BackupFetched { source: fb.source.clone(), byte_size: fb.byte_size.unwrap_or(0), }); Ok(Json(serde_json::json!({ "source": fb.source, "local_path": fb.local_path, "byte_size": fb.byte_size, }))) } async fn get_gate_log( State(s): State, Path((version, gate)): Path<(String, String)>, ) -> Result { // Guard against `..` / absolute paths — both segments must be a single // safe component. Without this, `GET /logs/..%2Fetc/passwd` would escape // logs_root. fn safe(seg: &str) -> bool { !seg.is_empty() && !seg.contains('/') && !seg.contains('\\') && seg != "." && seg != ".." } if !safe(&version) || !safe(&gate) { return Err(crate::error::Error::NotFound); } let path = s.cfg.logs_root.join(&version).join(format!("{gate}.log")); match tokio::fs::read(&path).await { Ok(bytes) => Ok(( [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], bytes, ).into_response()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(crate::error::Error::NotFound), Err(e) => Err(crate::error::Error::Other(e.into())), } } async fn events_ws(ws: WebSocketUpgrade, State(s): State) -> impl IntoResponse { use axum::extract::ws::Message; use tokio::sync::broadcast::error::RecvError; ws.on_upgrade(move |mut socket| async move { let mut rx = s.events.subscribe(); loop { match rx.recv().await { Ok(env) => { let json = match serde_json::to_string(&env) { Ok(s) => s, Err(e) => { tracing::warn!(error = %e, "events ws: serialize failed"); continue; } }; if socket.send(Message::Text(json.into())).await.is_err() { break; } } Err(RecvError::Lagged(n)) => { let _ = socket.send(Message::Text( format!(r#"{{"kind":"lagged","skipped":{n}}}"#).into(), )).await; } Err(RecvError::Closed) => break, } } }) } #[cfg(test)] mod tests { use super::*; use crate::config::Config; use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology}; use axum::body::Body; use axum::http::{Request, StatusCode}; use http_body_util::BodyExt; use metrics_exporter_prometheus::PrometheusBuilder; use sqlx::sqlite::SqlitePoolOptions; use sqlx::SqlitePool; use std::path::PathBuf; use std::sync::Arc; use tower::ServiceExt; async fn fresh_pool() -> SqlitePool { let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await .unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); pool } /// Two-tier topology used by the route tests: mm (provisioned, no nodes) /// → a (provisioned, one local node). Mirrors the production shape /// without involving real ssh / postgres. fn test_topo() -> Topology { Topology { repo: RepoConfig { bare_path: "/tmp/test.git".into(), branch: "main".into() }, backup: BackupConfig { source: "file:///tmp/test-backup.sql".into(), local_path: "/tmp/local-backup.sql".into(), }, tiers: vec![ Tier { name: "host".into(), provisioned: true, gates: vec![], canary: CanaryPolicy::Sequential, nodes: vec![], }, Tier { name: "a".into(), provisioned: true, gates: vec![Gate::BootSmoke], canary: CanaryPolicy::Sequential, nodes: vec![Node { name: "a-local".into(), ssh_target: "local".into(), release_root: "/tmp/a-node".into(), service_name: "makenotwork.service".into(), actuate: crate::topology::default_actuate(), observe: crate::topology::default_observe(), }], }, ], } } fn test_cfg() -> Config { Config { listen: "127.0.0.1:0".into(), db_path: PathBuf::from(":memory:"), topology_path: PathBuf::from("/tmp/test-sando.toml"), workdir: PathBuf::from("/tmp/sando-work"), release_root: PathBuf::from("/tmp/sando-releases"), scratch_db_url: None, bin_names: vec!["makenotwork".into()], logs_root: PathBuf::from("/tmp/sando-logs"), release_contents: vec![], } } async fn test_state() -> AppState { let pool = fresh_pool().await; // Seed tier rows so FKs on tier_state / gate_runs are satisfied. for (i, name) in ["host", "a"].iter().enumerate() { sqlx::query( "INSERT INTO tiers (name, ord, provisioned, canary) VALUES (?, ?, 1, 'sequential')", ) .bind(name).bind(i as i64).execute(&pool).await.unwrap(); sqlx::query("INSERT INTO tier_state (tier) VALUES (?)") .bind(name).execute(&pool).await.unwrap(); } // Don't call install_recorder in tests — it touches a process-global // and conflicts when tests run in parallel. let prom = PrometheusBuilder::new().build_recorder().handle(); let topo = test_topo(); let executors = Arc::new(crate::state::build_executors(&topo)); AppState { pool, topo: Arc::new(topo), cfg: Arc::new(test_cfg()), prom, active_build: Arc::new(tokio::sync::Mutex::new(None)), events: crate::events::channel(), executors, } } async fn body_string(resp: axum::response::Response) -> String { let bytes = resp.into_body().collect().await.unwrap().to_bytes(); String::from_utf8(bytes.to_vec()).unwrap() } /// Insert the FK prerequisites for inserting gate_runs/tier_state rows. async fn seed(pool: &SqlitePool, tier: &str, version: &str) { sqlx::query("INSERT INTO tiers (name, ord, provisioned, canary) VALUES (?, 0, 1, 'sequential') ON CONFLICT DO NOTHING") .bind(tier).execute(pool).await.unwrap(); sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES (?, 'sha', datetime('now'), '/tmp/x') ON CONFLICT DO NOTHING") .bind(version).execute(pool).await.unwrap(); sqlx::query("INSERT INTO tier_state (tier, current_version) VALUES (?, NULL) ON CONFLICT DO NOTHING") .bind(tier).execute(pool).await.unwrap(); } async fn insert_gate(pool: &SqlitePool, tier: &str, version: &str, kind: &str, passed: i64) { let status = if passed == 1 { "passed" } else { "failed" }; sqlx::query( "INSERT INTO gate_runs (version, tier, gate_kind, started_at, finished_at, status) \ VALUES (?, ?, ?, datetime('now'), datetime('now'), ?)", ) .bind(version).bind(tier).bind(kind).bind(status) .execute(pool).await.unwrap(); } // ---- unsatisfied_gates ---- #[tokio::test] async fn unsatisfied_gates_empty_when_no_runs() { // No gate_runs rows means there's nothing to check — caller treats // empty as "all green" which is correct iff the predecessor tier // has no configured gates. The topology validation is upstream. let pool = fresh_pool().await; seed(&pool, "host", "0.8.12").await; let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap(); assert_eq!(pending, Vec::::new()); } #[tokio::test] async fn unsatisfied_gates_flags_failed_kind() { let pool = fresh_pool().await; seed(&pool, "host", "0.8.12").await; insert_gate(&pool, "host", "0.8.12", "cargo_test", 0).await; insert_gate(&pool, "host", "0.8.12", "boot_smoke", 1).await; let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap(); assert_eq!(pending, vec!["cargo_test".to_string()]); } #[tokio::test] async fn unsatisfied_gates_latest_row_wins() { // Two runs of the same gate; only the latest counts. A flap from // red to green should clear the pending entry. let pool = fresh_pool().await; seed(&pool, "host", "0.8.12").await; insert_gate(&pool, "host", "0.8.12", "cargo_test", 0).await; insert_gate(&pool, "host", "0.8.12", "cargo_test", 1).await; let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap(); assert!(pending.is_empty()); } #[tokio::test] async fn unsatisfied_gates_hotfix_skips_only_burn_in() { // hotfix=true is supposed to bypass burn_in failures specifically — // not cargo_test, not boot_smoke. Lock the semantic so a future // rename doesn't accidentally widen it. let pool = fresh_pool().await; seed(&pool, "a", "0.8.12").await; insert_gate(&pool, "a", "0.8.12", "burn_in", 0).await; insert_gate(&pool, "a", "0.8.12", "cargo_test", 0).await; let normal = unsatisfied_gates(&pool, "a", "0.8.12", false).await.unwrap(); let mut sorted = normal.clone(); sorted.sort(); assert_eq!(sorted, vec!["burn_in".to_string(), "cargo_test".to_string()]); let with_hotfix = unsatisfied_gates(&pool, "a", "0.8.12", true).await.unwrap(); assert_eq!(with_hotfix, vec!["cargo_test".to_string()]); } #[tokio::test] async fn unsatisfied_gates_ignores_other_tiers_and_versions() { let pool = fresh_pool().await; seed(&pool, "host", "0.8.12").await; seed(&pool, "host", "0.8.11").await; seed(&pool, "a", "0.8.12").await; // Mark mm/0.8.12 cargo_test failing, but unrelated tiers/versions // shouldn't pollute the query. insert_gate(&pool, "host", "0.8.12", "cargo_test", 0).await; insert_gate(&pool, "a", "0.8.12", "cargo_test", 0).await; insert_gate(&pool, "host", "0.8.11", "cargo_test", 0).await; let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap(); assert_eq!(pending, vec!["cargo_test".to_string()]); } #[tokio::test] async fn unsatisfied_gates_null_status_is_treated_as_failing() { // An in-flight gate (started_at set, finished_at + status NULL) // should NOT be treated as green. Otherwise a race could promote // before the gate concludes. let pool = fresh_pool().await; seed(&pool, "host", "0.8.12").await; sqlx::query( "INSERT INTO gate_runs (version, tier, gate_kind, started_at) \ VALUES ('0.8.12', 'host', 'cargo_test', datetime('now'))", ) .execute(&pool).await.unwrap(); let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap(); assert_eq!(pending, vec!["cargo_test".to_string()]); } // ---- /confirm/{tier} ---- #[tokio::test] async fn confirm_rejects_when_tier_has_no_current_version() { // tier_state.a.current_version is NULL by default. /confirm has // nothing to confirm against → GateBlocked (400). let state = test_state().await; let app = router(state.clone()); let resp = app .oneshot( Request::builder() .method("POST") .uri("/confirm/a") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::CONFLICT); let body = body_string(resp).await; assert!(body.contains("no current_version"), "got: {body}"); } #[tokio::test] async fn confirm_accepts_when_current_version_set_and_inserts_row() { let state = test_state().await; // Seed a version + advance tier a's state to it. sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.8.12','sha',datetime('now'),'/tmp/x')") .execute(&state.pool).await.unwrap(); sqlx::query("UPDATE tier_state SET current_version = '0.8.12' WHERE tier = 'a'") .execute(&state.pool).await.unwrap(); let app = router(state.clone()); let resp = app .oneshot( Request::builder() .method("POST") .uri("/confirm/a") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::OK); let body = body_string(resp).await; assert!(body.contains("\"tier\":\"a\"")); assert!(body.contains("\"version\":\"0.8.12\"")); // A passing gate_runs row was inserted. let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM gate_runs WHERE tier='a' AND gate_kind='manual_confirm' AND status='passed'", ) .fetch_one(&state.pool) .await .unwrap(); assert_eq!(count.0, 1); } #[tokio::test] async fn confirm_404s_for_unknown_tier() { let state = test_state().await; let app = router(state); let resp = app .oneshot( Request::builder() .method("POST") .uri("/confirm/zzzz") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } // ---- /promote/{tier} default-version resolution ---- #[tokio::test] async fn promote_to_first_tier_is_rejected() { // tier 0 is host — you /rebuild, not /promote. let state = test_state().await; let app = router(state); let resp = app .oneshot( Request::builder() .method("POST") .uri("/promote/host") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::CONFLICT); let body = body_string(resp).await; assert!(body.contains("cannot /promote to the first tier"), "got: {body}"); } #[tokio::test] async fn promote_without_body_and_no_predecessor_version_errors() { // tier a has no body version supplied AND its predecessor mm has // current_version=NULL. Should fail before any deploy. let state = test_state().await; let app = router(state); let resp = app .oneshot( Request::builder() .method("POST") .uri("/promote/a") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::CONFLICT); let body = body_string(resp).await; assert!( body.contains("no version specified") || body.contains("no current_version"), "got: {body}" ); } #[tokio::test] async fn promote_with_explicit_version_but_missing_artifact_404s() { // Explicit version supplied, gates trivially pass (mm has none in // test_topo), but `versions` table has no row → 404. let state = test_state().await; let app = router(state); let resp = app .oneshot( Request::builder() .method("POST") .uri("/promote/a") .header("content-type", "application/json") .body(Body::from(r#"{"version":"9.9.9"}"#)) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } // ---- GET /logs/{version}/{gate} ---- async fn state_with_logs_root(logs_root: PathBuf) -> AppState { let mut s = test_state().await; let mut cfg = (*s.cfg).clone(); cfg.logs_root = logs_root; s.cfg = Arc::new(cfg); s } #[tokio::test] async fn get_gate_log_returns_file_contents() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().join("0.9.5"); tokio::fs::create_dir_all(&dir).await.unwrap(); tokio::fs::write(dir.join("cargo_test.log"), b"hello sandod\n").await.unwrap(); let state = state_with_logs_root(tmp.path().to_path_buf()).await; let app = router(state); let resp = app .oneshot(Request::builder().uri("/logs/0.9.5/cargo_test").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(resp.status(), StatusCode::OK); assert_eq!(body_string(resp).await, "hello sandod\n"); } #[tokio::test] async fn get_gate_log_404s_when_missing() { let tmp = tempfile::tempdir().unwrap(); let state = state_with_logs_root(tmp.path().to_path_buf()).await; let app = router(state); let resp = app .oneshot(Request::builder().uri("/logs/0.9.5/cargo_test").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } /// Path-traversal guard: a `..` segment must not escape logs_root. /// axum's `{name}` param already rejects a literal `/` in the value, but /// `..` as a whole segment is structurally valid and must be blocked at /// the handler. #[tokio::test] async fn get_gate_log_rejects_dotdot_segments() { let tmp = tempfile::tempdir().unwrap(); let state = state_with_logs_root(tmp.path().to_path_buf()).await; let app = router(state); let resp = app .oneshot(Request::builder().uri("/logs/../etc/passwd").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } }