Skip to main content

max / makenotwork

38.2 KB · 994 lines History Blame Raw
1 use crate::error::Result;
2 use crate::state::AppState;
3 use axum::extract::{Path, State, WebSocketUpgrade};
4 use axum::response::IntoResponse;
5 use axum::routing::{get, post};
6 use axum::{Json, Router};
7 use serde::{Deserialize, Serialize};
8 use sqlx::Row;
9
10 pub fn router(state: AppState) -> Router {
11 let prom = state.prom.clone();
12 Router::new()
13 .route("/state", get(get_state))
14 .route("/promote/{tier}", post(promote))
15 .route("/rollback/{tier}", post(rollback))
16 .route("/rebuild", post(rebuild))
17 .route("/confirm/{tier}", post(confirm))
18 .route("/backup/fetch", post(backup_fetch))
19 .route("/logs/{version}/{gate}", get(get_gate_log))
20 .route("/events", get(events_ws))
21 .with_state(state)
22 .route("/metrics", get(crate::metrics::render).with_state(prom))
23 }
24
/// Response body of `GET /state`: one entry per row of the `tiers` table,
/// in ascending `ord` order.
#[derive(Serialize)]
struct StateView {
    tiers: Vec<TierView>,
}
29
/// One tier as reported by `GET /state`.
///
/// Built in `get_state` from `tiers` LEFT JOINed with `tier_state`, so the
/// `tier_state`-sourced fields are `None` when no such row exists or the
/// column is NULL.
#[derive(Serialize)]
struct TierView {
    name: String,
    /// `tiers.ord`; the listing is sorted by this column.
    ord: i64,
    /// Stored as an integer column; any non-zero value reads as `true`.
    provisioned: bool,
    /// Raw `tiers.canary` text.
    canary: String,
    current_version: Option<String>,
    previous_version: Option<String>,
    burn_in_started_at: Option<String>,
    /// Names from the `nodes` table for this tier, sorted by name.
    nodes: Vec<String>,
    /// Latest `gate_runs` row per gate kind for `current_version`, or — when
    /// that is unset — for the version of the tier's most recent gate run.
    gates: Vec<GateView>,
}
42
/// The most recent `gate_runs` row of one gate kind, as exposed via `/state`.
#[derive(Serialize)]
struct GateView {
    /// The row's `gate_kind` column.
    kind: String,
    finished_at: Option<String>,
    /// `'passed' | 'failed' | 'blocked'` or NULL while in-flight. The TUI
    /// uses this to choose green/red/yellow rendering.
    status: Option<String>,
    /// Full typed `GateOutcome` as a JSON object, when present.
    /// `get_state` parses the stored `outcome_json` text only into an untyped
    /// `serde_json::Value`; typed deserialization is left to the consumer.
    /// Text that fails to parse as JSON is exposed as `None`.
    outcome: Option<serde_json::Value>,
    /// Relative path under `cfg.logs_root` to the persisted stdout/stderr.
    log_ref: Option<String>,
}
56
57 async fn get_state(State(s): State<AppState>) -> Result<Json<StateView>> {
58 let rows = sqlx::query(
59 "SELECT t.name, t.ord, t.provisioned, t.canary,
60 ts.current_version, ts.previous_version, ts.burn_in_started_at
61 FROM tiers t
62 LEFT JOIN tier_state ts ON ts.tier = t.name
63 ORDER BY t.ord",
64 )
65 .fetch_all(&s.pool)
66 .await?;
67
68 let mut tiers = Vec::with_capacity(rows.len());
69 for r in rows {
70 let name: String = r.get("name");
71 let current_version: Option<String> = r.get("current_version");
72
73 let nodes: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes WHERE tier = ? ORDER BY name")
74 .bind(&name)
75 .fetch_all(&s.pool)
76 .await?;
77
78 // Surface gates for current_version when set, otherwise for the most
79 // recently attempted version on this tier. Without the fallback, a
80 // tier that has never gone green (MM after a build failure, B before
81 // first deploy) exposes no gate detail via /state — debugging required
82 // SSH and direct SQLite access. See sando todo: gate observability.
83 let gate_version: Option<String> = if current_version.is_some() {
84 current_version.clone()
85 } else {
86 sqlx::query_scalar(
87 "SELECT version FROM gate_runs WHERE tier = ?
88 ORDER BY id DESC LIMIT 1",
89 )
90 .bind(&name)
91 .fetch_optional(&s.pool)
92 .await?
93 };
94
95 let gates: Vec<GateView> = if let Some(ver) = gate_version.as_ref() {
96 // Most recent gate_runs row per gate_kind for (tier, ver).
97 sqlx::query(
98 "SELECT gate_kind, finished_at, status, outcome_json, log_ref
99 FROM gate_runs g
100 WHERE tier = ?1 AND version = ?2
101 AND id = (SELECT MAX(id) FROM gate_runs
102 WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind)
103 ORDER BY gate_kind",
104 )
105 .bind(&name)
106 .bind(ver)
107 .fetch_all(&s.pool)
108 .await?
109 .into_iter()
110 .map(|gr| GateView {
111 kind: gr.get("gate_kind"),
112 finished_at: gr.get("finished_at"),
113 status: gr.get("status"),
114 outcome: gr.get::<Option<String>, _>("outcome_json")
115 .and_then(|s| serde_json::from_str(&s).ok()),
116 log_ref: gr.get("log_ref"),
117 })
118 .collect()
119 } else {
120 Vec::new()
121 };
122
123 tiers.push(TierView {
124 name,
125 ord: r.get("ord"),
126 provisioned: r.get::<i64, _>("provisioned") != 0,
127 canary: r.get("canary"),
128 current_version,
129 previous_version: r.get("previous_version"),
130 burn_in_started_at: r.get("burn_in_started_at"),
131 nodes,
132 gates,
133 });
134 }
135
136 Ok(Json(StateView { tiers }))
137 }
138
/// Optional JSON body of `POST /promote/{tier}`. Every field has a default,
/// and the handler treats a missing body as `PromoteBody::default()`.
#[derive(Deserialize, Default)]
struct PromoteBody {
    /// Optional. If absent, defaults to the predecessor tier's `current_version`
    /// (i.e. promote whatever just finished baking on the previous tier).
    #[serde(default)]
    version: Option<String>,
    /// When true, an unsatisfied `burn_in` gate on the predecessor tier does
    /// not block the promote; all other gate kinds are still enforced.
    /// Recorded on each `deploys` row.
    #[serde(default)]
    hotfix: bool,
    /// When true, the predecessor tier's `burn_in_started_at` is set to NULL
    /// after the promote succeeds. Recorded on each `deploys` row.
    #[serde(default)]
    reset_burn_in: bool,
}
150
151 async fn promote(
152 State(s): State<AppState>,
153 Path(tier): Path<String>,
154 body: Option<Json<PromoteBody>>,
155 ) -> Result<Json<serde_json::Value>> {
156 let body = body.map(|Json(b)| b).unwrap_or_default();
157 let tier = crate::domain::TierId::new(tier);
158 let idx = s.topo.tiers.iter().position(|t| t.name == tier)
159 .ok_or(crate::error::Error::NotFound)?;
160 if idx == 0 {
161 return Err(crate::error::Error::GateBlocked(
162 "cannot /promote to the first tier; use /rebuild".into(),
163 ));
164 }
165 let target = &s.topo.tiers[idx];
166 let source = &s.topo.tiers[idx - 1];
167
168 // Resolve version: explicit if given, else the source tier's current.
169 let version_str = match body.version {
170 Some(v) => v,
171 None => sqlx::query_scalar::<_, Option<String>>(
172 "SELECT current_version FROM tier_state WHERE tier = ?",
173 )
174 .bind(&source.name)
175 .fetch_optional(&s.pool).await
176 .map_err(crate::error::Error::Db)?
177 .flatten()
178 .ok_or_else(|| crate::error::Error::GateBlocked(
179 format!("no version specified and tier {} has no current_version", source.name),
180 ))?,
181 };
182 let version = crate::domain::Version::parse(&version_str)
183 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
184
185 // 1. Predecessor must have all of its gates green for this version (with
186 // optional hotfix override that skips burn_in).
187 let pending = unsatisfied_gates(&s.pool, source.name.as_str(), &version_str, body.hotfix).await?;
188 if !pending.is_empty() {
189 return Err(crate::error::Error::GateBlocked(format!(
190 "{} gate(s) not satisfied on tier {}: {}",
191 pending.len(),
192 source.name,
193 pending.join(", "),
194 )));
195 }
196
197 // 2. Look up the artifact for this version.
198 let bin: Option<(String,)> = sqlx::query_as(
199 "SELECT artifact_path FROM versions WHERE version = ?",
200 )
201 .bind(&version)
202 .fetch_optional(&s.pool)
203 .await
204 .map_err(crate::error::Error::Db)?;
205 let Some((bin,)) = bin else {
206 return Err(crate::error::Error::NotFound);
207 };
208 let bin_path = std::path::PathBuf::from(bin);
209 // `artifact_path` is the primary binary; the staged release dir is its parent.
210 let staged_dir = bin_path.parent()
211 .ok_or_else(|| crate::error::Error::Other(anyhow::anyhow!("artifact_path has no parent")))?
212 .to_path_buf();
213
214 // 3. Deploy to each node. Sequential canary is the only policy
215 // implemented in v0; parallel is a one-line change once we trust the
216 // sequential path.
217 for node in &target.nodes {
218 let started = chrono::Utc::now().to_rfc3339();
219 crate::events::emit(&s.events, crate::events::Event::DeployStart {
220 tier: target.name.clone(), node: node.name.clone(), version: version.clone(),
221 });
222 let executor = s.executors.get(&node.name).cloned()
223 .unwrap_or_else(|| crate::state::build_executor(node));
224 let result = crate::deploy::deploy_node(executor.as_ref(), node, &version_str, &staged_dir, s.cfg.primary_bin()).await;
225 let finished = chrono::Utc::now().to_rfc3339();
226 let (outcome_obj, err_for_propagation) = match result {
227 Ok(_) => (crate::outcome::DeployOutcome::ok(), None),
228 Err(e) => {
229 let msg = format!("{e:#}");
230 let kind = crate::classify::classify_deploy_error(&msg);
231 (crate::outcome::DeployOutcome::failed(kind), Some(e))
232 }
233 };
234 let outcome_json = serde_json::to_string(&outcome_obj)
235 .unwrap_or_else(|e| format!("{{\"_serialize_error\":{e:?}}}"));
236 sqlx::query(
237 "INSERT INTO deploys (version, tier, node, started_at, finished_at, outcome, outcome_json, hotfix, reset_burn_in)
238 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
239 )
240 .bind(&version).bind(&target.name).bind(&node.name)
241 .bind(&started).bind(&finished).bind(outcome_obj.status_str())
242 .bind(&outcome_json)
243 .bind(body.hotfix as i64).bind(body.reset_burn_in as i64)
244 .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
245 if let Some(e) = err_for_propagation {
246 let crate::outcome::DeployStatus::Failed { failure } = outcome_obj.status else {
247 unreachable!("err_for_propagation is Some iff status is Failed");
248 };
249 tracing::error!(
250 tier = %target.name, node = %node.name, version = %version,
251 failure = failure.summary(),
252 "deploy failed; current symlink left intact, tier_state not advanced"
253 );
254 crate::events::emit(&s.events, crate::events::Event::DeployFailed {
255 tier: target.name.clone(), node: node.name.clone(),
256 version: version.clone(), failure,
257 });
258 return Err(crate::error::Error::Other(e));
259 }
260 crate::events::emit(&s.events, crate::events::Event::DeployOk {
261 tier: target.name.clone(), node: node.name.clone(), version: version.clone(),
262 });
263 }
264
265 // 4. Advance tier_state. burn_in_started_at is set to now so the target
266 // tier's burn_in gate starts ticking. reset_burn_in on the *source*
267 // tier nulls its clock only when the operator explicitly asked for it.
268 let prev: Option<String> = sqlx::query_scalar(
269 "SELECT current_version FROM tier_state WHERE tier = ?",
270 )
271 .bind(&target.name)
272 .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?.flatten();
273 sqlx::query(
274 "UPDATE tier_state SET previous_version = ?, current_version = ?, burn_in_started_at = ?
275 WHERE tier = ?",
276 )
277 .bind(prev)
278 .bind(&version)
279 .bind(chrono::Utc::now().to_rfc3339())
280 .bind(&target.name)
281 .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
282
283 if body.reset_burn_in {
284 sqlx::query("UPDATE tier_state SET burn_in_started_at = NULL WHERE tier = ?")
285 .bind(&source.name)
286 .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
287 }
288
289 crate::events::emit(&s.events, crate::events::Event::PromoteComplete {
290 tier: target.name.clone(), version: version.clone(),
291 });
292 tracing::info!(
293 version = %version, tier = %target.name,
294 hotfix = body.hotfix, reset_burn_in = body.reset_burn_in,
295 "promote complete",
296 );
297
298 Ok(Json(serde_json::json!({
299 "tier": target.name,
300 "version": version,
301 "nodes_deployed": target.nodes.iter().map(|n| n.name.clone()).collect::<Vec<_>>(),
302 })))
303 }
304
305 /// Returns the kinds of gates on `tier` that have not (yet) passed for
306 /// `version`. `hotfix` suppresses the burn_in requirement only.
307 async fn unsatisfied_gates(
308 pool: &sqlx::SqlitePool,
309 tier: &str,
310 version: &str,
311 hotfix: bool,
312 ) -> std::result::Result<Vec<String>, crate::error::Error> {
313 // We need the configured gate list for the tier to know what *should*
314 // pass. The route handler has Topology in hand and could pass it in, but
315 // the DB also captures it implicitly via gate_runs rows. Simplest correct
316 // answer: re-read from topology via tier name; the caller has it.
317 // For now we inspect the latest gate_runs.
318 let rows: Vec<(String, Option<String>)> = sqlx::query_as(
319 "SELECT gate_kind, status FROM gate_runs g
320 WHERE tier = ?1 AND version = ?2
321 AND id = (SELECT MAX(id) FROM gate_runs
322 WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind)",
323 )
324 .bind(tier).bind(version)
325 .fetch_all(pool).await.map_err(crate::error::Error::Db)?;
326 let mut bad = Vec::new();
327 for (kind, status) in rows {
328 if hotfix && kind == "burn_in" {
329 continue;
330 }
331 // NULL status (in-flight) and any non-passed status both count as
332 // unsatisfied; only an explicit 'passed' clears the gate.
333 if status.as_deref() != Some("passed") {
334 bad.push(kind);
335 }
336 }
337 Ok(bad)
338 }
339
340 async fn rollback(
341 State(s): State<AppState>,
342 Path(tier): Path<String>,
343 ) -> Result<Json<serde_json::Value>> {
344 let tier = crate::domain::TierId::new(tier);
345 let target = s.topo.tiers.iter().find(|t| t.name == tier)
346 .ok_or(crate::error::Error::NotFound)?;
347
348 let row: Option<(Option<String>, Option<String>)> = sqlx::query_as(
349 "SELECT current_version, previous_version FROM tier_state WHERE tier = ?",
350 )
351 .bind(&tier)
352 .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?;
353 let (Some(current_str), Some(previous_str)) = row.unwrap_or((None, None)) else {
354 return Err(crate::error::Error::GateBlocked(
355 "no previous_version to roll back to".into(),
356 ));
357 };
358 let current = crate::domain::Version::parse(&current_str)
359 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
360 let previous = crate::domain::Version::parse(&previous_str)
361 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
362
363 let bin: Option<(String,)> = sqlx::query_as(
364 "SELECT artifact_path FROM versions WHERE version = ?",
365 )
366 .bind(&previous)
367 .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?;
368 let Some((bin,)) = bin else {
369 return Err(crate::error::Error::GateBlocked(
370 format!("previous version {previous} has no artifact_path; rollback impossible"),
371 ));
372 };
373 let bin_path = std::path::PathBuf::from(bin);
374 let staged_dir = bin_path.parent()
375 .ok_or_else(|| crate::error::Error::Other(anyhow::anyhow!("artifact_path has no parent")))?
376 .to_path_buf();
377
378 for node in &target.nodes {
379 let executor = s.executors.get(&node.name).cloned()
380 .unwrap_or_else(|| crate::state::build_executor(node));
381 crate::deploy::deploy_node(executor.as_ref(), node, &previous_str, &staged_dir, s.cfg.primary_bin())
382 .await
383 .map_err(crate::error::Error::Other)?;
384 }
385
386 sqlx::query(
387 "UPDATE tier_state SET current_version = ?, previous_version = ?, burn_in_started_at = NULL
388 WHERE tier = ?",
389 )
390 .bind(&previous)
391 .bind(&current)
392 .bind(&tier)
393 .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
394
395 tracing::warn!(tier = %tier, from = %current, to = %previous, "rollback complete");
396 crate::events::emit(&s.events, crate::events::Event::Rollback {
397 tier: tier.clone(), from: current.clone(), to: previous.clone(),
398 });
399
400 Ok(Json(serde_json::json!({
401 "tier": tier,
402 "rolled_back_from": current,
403 "now_running": previous,
404 })))
405 }
406
/// Optional JSON body of `POST /rebuild`. The handler treats a missing body
/// as `RebuildBody::default()`.
#[derive(Deserialize, Default)]
struct RebuildBody {
    /// Specific sha to build. If absent, resolve `topo.repo.branch` from the bare repo.
    #[serde(default)]
    sha: Option<String>,
}
413
414 async fn rebuild(
415 State(s): State<AppState>,
416 body: Option<Json<RebuildBody>>,
417 ) -> Result<Json<serde_json::Value>> {
418 let body = body.map(|Json(b)| b).unwrap_or_default();
419 let sha = match body.sha {
420 Some(s) => s,
421 None => crate::git::resolve_ref(
422 std::path::Path::new(&s.topo.repo.bare_path),
423 &s.topo.repo.branch,
424 )
425 .await
426 .map_err(crate::error::Error::Other)?,
427 };
428
429 // Boundary parse: a sha entering Sando must be hex of plausible length.
430 // The build pipeline downstream only ever sees `GitSha`.
431 let sha = crate::domain::GitSha::parse(&sha)
432 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
433
434 tracing::info!(sha = %sha, "rebuild requested");
435 crate::events::emit(&s.events, crate::events::Event::RebuildRequested { sha: sha.clone() });
436
437 // Latest /rebuild wins: abort any in-flight build before spawning a new
438 // one. Aborting drops the spawned task's future, which drops any
439 // tokio::process::Child it owns; with `kill_on_drop(true)` set on the
440 // cargo Command, SIGKILL propagates to cargo + its rustc children.
441 let mut slot = s.active_build.lock().await;
442 if let Some(prev) = slot.take() {
443 if !prev.is_finished() {
444 tracing::warn!("aborting in-flight build for newer /rebuild request");
445 crate::events::emit(&s.events, crate::events::Event::BuildAborted { sha_aborted: sha.clone() });
446 prev.abort();
447 }
448 }
449
450 let pool = s.pool.clone();
451 let cfg = s.cfg.clone();
452 let topo = s.topo.clone();
453 let events_for_task = s.events.clone();
454 let sha_for_task = sha.clone();
455 let sha_response = sha.to_string();
456 let handle = tokio::spawn(async move {
457 if let Err(e) = crate::build::build_and_run_host(pool, cfg, topo, sha_for_task.clone(), events_for_task).await {
458 tracing::error!(sha = %sha_for_task, error = %e, "rebuild pipeline failed");
459 }
460 });
461 *slot = Some(handle.abort_handle());
462
463 Ok(Json(serde_json::json!({ "accepted": true, "sha": sha_response })))
464 }
465
466 async fn confirm(
467 State(s): State<AppState>,
468 Path(tier): Path<String>,
469 ) -> Result<Json<serde_json::Value>> {
470 // Operator-driven satisfaction of a `manual_confirm` gate. Looks up the
471 // pending version (current MM version, or the tier's own if non-mm) and
472 // inserts a passing gate_runs row so /promote can advance.
473 let tier = crate::domain::TierId::new(tier);
474 let target = s.topo.tiers.iter().find(|t| t.name == tier)
475 .ok_or(crate::error::Error::NotFound)?;
476
477 let version_str: Option<String> = sqlx::query_scalar(
478 "SELECT current_version FROM tier_state WHERE tier = ?",
479 )
480 .bind(&target.name)
481 .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?.flatten();
482 let version_str = version_str.ok_or_else(|| crate::error::Error::GateBlocked(
483 format!("tier {tier} has no current_version; nothing to confirm"),
484 ))?;
485 let version = crate::domain::Version::parse(&version_str)
486 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
487
488 let now = chrono::Utc::now().to_rfc3339();
489 let outcome = crate::outcome::GateOutcome::passed(
490 crate::outcome::PassNote::OperatorConfirmed { at: chrono::Utc::now() },
491 );
492 let outcome_json = serde_json::to_string(&outcome)
493 .map_err(|e| crate::error::Error::Other(anyhow::anyhow!(e)))?;
494 sqlx::query(
495 "INSERT INTO gate_runs (version, tier, gate_kind, started_at, finished_at, status, outcome_json)
496 VALUES (?, ?, 'manual_confirm', ?, ?, 'passed', ?)",
497 )
498 .bind(&version).bind(&target.name).bind(&now).bind(&now).bind(&outcome_json)
499 .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
500
501 tracing::info!(tier = %tier, version = %version, "manual_confirm recorded");
502 crate::events::emit(&s.events, crate::events::Event::ManualConfirm {
503 tier: tier.clone(),
504 version: version.clone(),
505 });
506
507 Ok(Json(serde_json::json!({ "tier": tier, "version": version })))
508 }
509
510 async fn backup_fetch(State(s): State<AppState>) -> Result<Json<serde_json::Value>> {
511 let fb = crate::backup::fetch(&s.pool, &s.cfg, &s.topo)
512 .await
513 .map_err(crate::error::Error::Other)?;
514 crate::events::emit(&s.events, crate::events::Event::BackupFetched {
515 source: fb.source.clone(),
516 byte_size: fb.byte_size.unwrap_or(0),
517 });
518 Ok(Json(serde_json::json!({
519 "source": fb.source,
520 "local_path": fb.local_path,
521 "byte_size": fb.byte_size,
522 })))
523 }
524
525 async fn get_gate_log(
526 State(s): State<AppState>,
527 Path((version, gate)): Path<(String, String)>,
528 ) -> Result<axum::response::Response> {
529 // Guard against `..` / absolute paths — both segments must be a single
530 // safe component. Without this, `GET /logs/..%2Fetc/passwd` would escape
531 // logs_root.
532 fn safe(seg: &str) -> bool {
533 !seg.is_empty()
534 && !seg.contains('/')
535 && !seg.contains('\\')
536 && seg != "."
537 && seg != ".."
538 }
539 if !safe(&version) || !safe(&gate) {
540 return Err(crate::error::Error::NotFound);
541 }
542 let path = s.cfg.logs_root.join(&version).join(format!("{gate}.log"));
543 match tokio::fs::read(&path).await {
544 Ok(bytes) => Ok((
545 [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")],
546 bytes,
547 ).into_response()),
548 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(crate::error::Error::NotFound),
549 Err(e) => Err(crate::error::Error::Other(e.into())),
550 }
551 }
552
553 async fn events_ws(ws: WebSocketUpgrade, State(s): State<AppState>) -> impl IntoResponse {
554 use axum::extract::ws::Message;
555 use tokio::sync::broadcast::error::RecvError;
556
557 ws.on_upgrade(move |mut socket| async move {
558 let mut rx = s.events.subscribe();
559 loop {
560 match rx.recv().await {
561 Ok(env) => {
562 let json = match serde_json::to_string(&env) {
563 Ok(s) => s,
564 Err(e) => {
565 tracing::warn!(error = %e, "events ws: serialize failed");
566 continue;
567 }
568 };
569 if socket.send(Message::Text(json.into())).await.is_err() {
570 break;
571 }
572 }
573 Err(RecvError::Lagged(n)) => {
574 let _ = socket.send(Message::Text(
575 format!(r#"{{"kind":"lagged","skipped":{n}}}"#).into(),
576 )).await;
577 }
578 Err(RecvError::Closed) => break,
579 }
580 }
581 })
582 }
583
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology};
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use http_body_util::BodyExt;
    use metrics_exporter_prometheus::PrometheusBuilder;
    use sqlx::sqlite::SqlitePoolOptions;
    use sqlx::SqlitePool;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tower::ServiceExt;

    /// In-memory SQLite pool with all migrations applied. Limited to a single
    /// connection so every query goes through the same connection.
    async fn fresh_pool() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Two-tier topology used by the route tests: host (provisioned, no
    /// nodes) → a (provisioned, one local node). Mirrors the production shape
    /// without involving real ssh / postgres.
    fn test_topo() -> Topology {
        Topology {
            repo: RepoConfig { bare_path: "/tmp/test.git".into(), branch: "main".into() },
            backup: BackupConfig {
                source: "file:///tmp/test-backup.sql".into(),
                local_path: "/tmp/local-backup.sql".into(),
            },
            tiers: vec![
                Tier {
                    name: "host".into(),
                    provisioned: true,
                    gates: vec![],
                    canary: CanaryPolicy::Sequential,
                    nodes: vec![],
                },
                Tier {
                    name: "a".into(),
                    provisioned: true,
                    gates: vec![Gate::BootSmoke],
                    canary: CanaryPolicy::Sequential,
                    nodes: vec![Node {
                        name: "a-local".into(),
                        ssh_target: "local".into(),
                        release_root: "/tmp/a-node".into(),
                        service_name: "makenotwork.service".into(),
                        actuate: crate::topology::default_actuate(),
                        observe: crate::topology::default_observe(),
                    }],
                },
            ],
        }
    }

    fn test_cfg() -> Config {
        Config {
            listen: "127.0.0.1:0".into(),
            db_path: PathBuf::from(":memory:"),
            topology_path: PathBuf::from("/tmp/test-sando.toml"),
            workdir: PathBuf::from("/tmp/sando-work"),
            release_root: PathBuf::from("/tmp/sando-releases"),
            scratch_db_url: None,
            bin_names: vec!["makenotwork".into()],
            logs_root: PathBuf::from("/tmp/sando-logs"),
            release_contents: vec![],
        }
    }

    async fn test_state() -> AppState {
        let pool = fresh_pool().await;
        // Seed tier rows so FKs on tier_state / gate_runs are satisfied.
        for (i, name) in ["host", "a"].iter().enumerate() {
            sqlx::query(
                "INSERT INTO tiers (name, ord, provisioned, canary) VALUES (?, ?, 1, 'sequential')",
            )
            .bind(name).bind(i as i64).execute(&pool).await.unwrap();
            sqlx::query("INSERT INTO tier_state (tier) VALUES (?)")
                .bind(name).execute(&pool).await.unwrap();
        }
        // Don't call install_recorder in tests — it touches a process-global
        // and conflicts when tests run in parallel.
        let prom = PrometheusBuilder::new().build_recorder().handle();
        let topo = test_topo();
        let executors = Arc::new(crate::state::build_executors(&topo));
        AppState {
            pool,
            topo: Arc::new(topo),
            cfg: Arc::new(test_cfg()),
            prom,
            active_build: Arc::new(tokio::sync::Mutex::new(None)),
            events: crate::events::channel(),
            executors,
        }
    }

    async fn body_string(resp: axum::response::Response) -> String {
        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
        String::from_utf8(bytes.to_vec()).unwrap()
    }

    /// Insert the FK prerequisites for inserting gate_runs/tier_state rows.
    /// Safe to call repeatedly: every insert is `ON CONFLICT DO NOTHING`.
    async fn seed(pool: &SqlitePool, tier: &str, version: &str) {
        sqlx::query("INSERT INTO tiers (name, ord, provisioned, canary) VALUES (?, 0, 1, 'sequential') ON CONFLICT DO NOTHING")
            .bind(tier).execute(pool).await.unwrap();
        sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES (?, 'sha', datetime('now'), '/tmp/x') ON CONFLICT DO NOTHING")
            .bind(version).execute(pool).await.unwrap();
        sqlx::query("INSERT INTO tier_state (tier, current_version) VALUES (?, NULL) ON CONFLICT DO NOTHING")
            .bind(tier).execute(pool).await.unwrap();
    }

    /// Insert a finished gate_runs row with status `'passed'` or `'failed'`.
    async fn insert_gate(pool: &SqlitePool, tier: &str, version: &str, kind: &str, passed: bool) {
        let status = if passed { "passed" } else { "failed" };
        sqlx::query(
            "INSERT INTO gate_runs (version, tier, gate_kind, started_at, finished_at, status) \
             VALUES (?, ?, ?, datetime('now'), datetime('now'), ?)",
        )
        .bind(version).bind(tier).bind(kind).bind(status)
        .execute(pool).await.unwrap();
    }

    // ---- unsatisfied_gates ----

    #[tokio::test]
    async fn unsatisfied_gates_empty_when_no_runs() {
        // No gate_runs rows means there's nothing to check — caller treats
        // empty as "all green" which is correct iff the predecessor tier
        // has no configured gates. The topology validation is upstream.
        let pool = fresh_pool().await;
        seed(&pool, "host", "0.8.12").await;
        let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap();
        assert_eq!(pending, Vec::<String>::new());
    }

    #[tokio::test]
    async fn unsatisfied_gates_flags_failed_kind() {
        let pool = fresh_pool().await;
        seed(&pool, "host", "0.8.12").await;
        insert_gate(&pool, "host", "0.8.12", "cargo_test", false).await;
        insert_gate(&pool, "host", "0.8.12", "boot_smoke", true).await;
        let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap();
        assert_eq!(pending, vec!["cargo_test".to_string()]);
    }

    #[tokio::test]
    async fn unsatisfied_gates_latest_row_wins() {
        // Two runs of the same gate; only the latest counts. A flap from
        // red to green should clear the pending entry.
        let pool = fresh_pool().await;
        seed(&pool, "host", "0.8.12").await;
        insert_gate(&pool, "host", "0.8.12", "cargo_test", false).await;
        insert_gate(&pool, "host", "0.8.12", "cargo_test", true).await;
        let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap();
        assert!(pending.is_empty());
    }

    #[tokio::test]
    async fn unsatisfied_gates_hotfix_skips_only_burn_in() {
        // hotfix=true is supposed to bypass burn_in failures specifically —
        // not cargo_test, not boot_smoke. Lock the semantic so a future
        // rename doesn't accidentally widen it.
        let pool = fresh_pool().await;
        seed(&pool, "a", "0.8.12").await;
        insert_gate(&pool, "a", "0.8.12", "burn_in", false).await;
        insert_gate(&pool, "a", "0.8.12", "cargo_test", false).await;

        let normal = unsatisfied_gates(&pool, "a", "0.8.12", false).await.unwrap();
        let mut sorted = normal.clone();
        sorted.sort();
        assert_eq!(sorted, vec!["burn_in".to_string(), "cargo_test".to_string()]);

        let with_hotfix = unsatisfied_gates(&pool, "a", "0.8.12", true).await.unwrap();
        assert_eq!(with_hotfix, vec!["cargo_test".to_string()]);
    }

    #[tokio::test]
    async fn unsatisfied_gates_ignores_other_tiers_and_versions() {
        let pool = fresh_pool().await;
        seed(&pool, "host", "0.8.12").await;
        seed(&pool, "host", "0.8.11").await;
        seed(&pool, "a", "0.8.12").await;
        // Mark host/0.8.12 cargo_test failing, but unrelated tiers/versions
        // shouldn't pollute the query.
        insert_gate(&pool, "host", "0.8.12", "cargo_test", false).await;
        insert_gate(&pool, "a", "0.8.12", "cargo_test", false).await;
        insert_gate(&pool, "host", "0.8.11", "cargo_test", false).await;

        let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap();
        assert_eq!(pending, vec!["cargo_test".to_string()]);
    }

    #[tokio::test]
    async fn unsatisfied_gates_null_status_is_treated_as_failing() {
        // An in-flight gate (started_at set, finished_at + status NULL)
        // should NOT be treated as green. Otherwise a race could promote
        // before the gate concludes.
        let pool = fresh_pool().await;
        seed(&pool, "host", "0.8.12").await;
        sqlx::query(
            "INSERT INTO gate_runs (version, tier, gate_kind, started_at) \
             VALUES ('0.8.12', 'host', 'cargo_test', datetime('now'))",
        )
        .execute(&pool).await.unwrap();

        let pending = unsatisfied_gates(&pool, "host", "0.8.12", false).await.unwrap();
        assert_eq!(pending, vec!["cargo_test".to_string()]);
    }

    // ---- /confirm/{tier} ----

    #[tokio::test]
    async fn confirm_rejects_when_tier_has_no_current_version() {
        // tier_state.a.current_version is NULL by default. /confirm has
        // nothing to confirm against → GateBlocked (409 Conflict).
        let state = test_state().await;
        let app = router(state.clone());
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/confirm/a")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::CONFLICT);
        let body = body_string(resp).await;
        assert!(body.contains("no current_version"), "got: {body}");
    }

    #[tokio::test]
    async fn confirm_accepts_when_current_version_set_and_inserts_row() {
        let state = test_state().await;
        // Seed a version + advance tier a's state to it.
        sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.8.12','sha',datetime('now'),'/tmp/x')")
            .execute(&state.pool).await.unwrap();
        sqlx::query("UPDATE tier_state SET current_version = '0.8.12' WHERE tier = 'a'")
            .execute(&state.pool).await.unwrap();

        let app = router(state.clone());
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/confirm/a")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        let body = body_string(resp).await;
        assert!(body.contains("\"tier\":\"a\""));
        assert!(body.contains("\"version\":\"0.8.12\""));

        // A passing gate_runs row was inserted.
        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM gate_runs WHERE tier='a' AND gate_kind='manual_confirm' AND status='passed'",
        )
        .fetch_one(&state.pool)
        .await
        .unwrap();
        assert_eq!(count.0, 1);
    }

    #[tokio::test]
    async fn confirm_404s_for_unknown_tier() {
        let state = test_state().await;
        let app = router(state);
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/confirm/zzzz")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    // ---- /promote/{tier} default-version resolution ----

    #[tokio::test]
    async fn promote_to_first_tier_is_rejected() {
        // tier 0 is host — you /rebuild, not /promote.
        let state = test_state().await;
        let app = router(state);
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/promote/host")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::CONFLICT);
        let body = body_string(resp).await;
        assert!(body.contains("cannot /promote to the first tier"), "got: {body}");
    }

    #[tokio::test]
    async fn promote_without_body_and_no_predecessor_version_errors() {
        // tier a has no body version supplied AND its predecessor host has
        // current_version=NULL. Should fail before any deploy.
        let state = test_state().await;
        let app = router(state);
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/promote/a")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::CONFLICT);
        let body = body_string(resp).await;
        assert!(
            body.contains("no version specified") || body.contains("no current_version"),
            "got: {body}"
        );
    }

    #[tokio::test]
    async fn promote_with_explicit_version_but_missing_artifact_404s() {
        // Explicit version supplied, gates trivially pass (host has no
        // gate_runs rows for it), but `versions` table has no row → 404.
        let state = test_state().await;
        let app = router(state);
        let resp = app
            .oneshot(
                Request::builder()
                    .method("POST")
                    .uri("/promote/a")
                    .header("content-type", "application/json")
                    .body(Body::from(r#"{"version":"9.9.9"}"#))
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    // ---- GET /logs/{version}/{gate} ----

    async fn state_with_logs_root(logs_root: PathBuf) -> AppState {
        let mut s = test_state().await;
        let mut cfg = (*s.cfg).clone();
        cfg.logs_root = logs_root;
        s.cfg = Arc::new(cfg);
        s
    }

    #[tokio::test]
    async fn get_gate_log_returns_file_contents() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().join("0.9.5");
        tokio::fs::create_dir_all(&dir).await.unwrap();
        tokio::fs::write(dir.join("cargo_test.log"), b"hello sandod\n").await.unwrap();

        let state = state_with_logs_root(tmp.path().to_path_buf()).await;
        let app = router(state);
        let resp = app
            .oneshot(Request::builder().uri("/logs/0.9.5/cargo_test").body(Body::empty()).unwrap())
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::OK);
        assert_eq!(body_string(resp).await, "hello sandod\n");
    }

    #[tokio::test]
    async fn get_gate_log_404s_when_missing() {
        let tmp = tempfile::tempdir().unwrap();
        let state = state_with_logs_root(tmp.path().to_path_buf()).await;
        let app = router(state);
        let resp = app
            .oneshot(Request::builder().uri("/logs/0.9.5/cargo_test").body(Body::empty()).unwrap())
            .await
            .unwrap();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    /// Path-traversal guard: a `..` segment must not escape logs_root.
    /// axum's `{name}` param already rejects a literal `/` in the value, but
    /// `..` as a whole segment is structurally valid and must be blocked at
    /// the handler.
    ///
    /// A bait file is planted directly above logs_root, where
    /// `<logs_root>/../cargo_test.log` would resolve. The two-segment URIs
    /// are intended to reach the handler, so the guard is what keeps them
    /// from returning the bait's contents.
    #[tokio::test]
    async fn get_gate_log_rejects_dotdot_segments() {
        let tmp = tempfile::tempdir().unwrap();
        let logs_root = tmp.path().join("logs");
        tokio::fs::create_dir_all(&logs_root).await.unwrap();
        tokio::fs::write(tmp.path().join("cargo_test.log"), b"outside logs_root\n")
            .await
            .unwrap();

        let state = state_with_logs_root(logs_root).await;
        let uris = [
            // `..` as the whole {version} segment.
            "/logs/../cargo_test",
            // Same segment, percent-encoded.
            "/logs/%2E%2E/cargo_test",
            // Extra segments: does not fit the two-parameter route.
            "/logs/../etc/passwd",
        ];
        for uri in uris {
            let resp = router(state.clone())
                .oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
                .await
                .unwrap();
            assert_eq!(resp.status(), StatusCode::NOT_FOUND, "uri: {uri}");
        }
    }
}
994