Skip to main content

max / makenotwork

24.8 KB · 626 lines History Blame Raw
1 //! Gate execution. Each gate kind has a runner that produces a pass/fail
2 //! outcome plus an optional detail string (typically a stderr tail or a
3 //! human-readable reason). Outcomes are persisted to `gate_runs` so /state
4 //! and the TUI can show them.
5
6 use crate::classify;
7 use crate::config::Config;
8 use crate::domain::{GateKind, GateRunId, TierId, Version};
9 use crate::events::{self, Event, EventTx};
10 use crate::live_log::LiveLog;
11 use crate::outcome::{GateBlocker, GateFailure, GateOutcome, LogRef, PassNote};
12 use crate::topology::Gate;
13 use anyhow::Result;
14 use chrono::Utc;
15 use sqlx::SqlitePool;
16 use std::path::PathBuf;
17 use std::sync::Arc;
18 use tokio::io::AsyncReadExt;
19 use tokio::process::Command;
20
/// Everything a gate runner needs to evaluate gates for one (tier, version)
/// pair.
pub struct GateCtx {
    /// SQLite pool; the runners in this module query `gate_runs`, `backups`,
    /// `versions` and `tier_state` through it.
    pub pool: SqlitePool,
    /// Shared daemon configuration (`scratch_db_url` and `logs_root` are the
    /// fields read in this module).
    pub cfg: Arc<Config>,
    /// Tier the gates are evaluated for; bound into the `gate_runs` and
    /// `tier_state` queries.
    pub tier: TierId,
    /// Version under test; also names the per-version log directory.
    pub version: Version,
    /// Checkout of the version; `server/` and `server/migrations/` are
    /// resolved underneath it.
    pub worktree: PathBuf,
    /// Event sender used for gate start/done events and handed to `LiveLog`.
    pub events: EventTx,
}
29
30 /// Run a single gate end-to-end: insert the in-flight row, execute the gate,
31 /// update the row with the outcome. Returns the outcome for the caller.
32 pub async fn run(ctx: &GateCtx, gate: &Gate) -> Result<GateOutcome> {
33 let kind = gate.kind();
34 let started_at = Utc::now().to_rfc3339();
35
36 let id: i64 = sqlx::query_scalar(
37 "INSERT INTO gate_runs (version, tier, gate_kind, started_at) VALUES (?, ?, ?, ?)
38 RETURNING id",
39 )
40 .bind(&ctx.version)
41 .bind(&ctx.tier)
42 .bind(kind)
43 .bind(&started_at)
44 .fetch_one(&ctx.pool)
45 .await?;
46 let run_id = GateRunId(id);
47
48 tracing::info!(
49 run_id = %run_id, tier = %ctx.tier, version = %ctx.version, gate = %kind,
50 "gate start",
51 );
52 events::emit(&ctx.events, Event::GateStart {
53 run_id,
54 tier: ctx.tier.clone(),
55 version: ctx.version.clone(),
56 gate: kind,
57 });
58
59 let outcome = match gate {
60 Gate::CargoTest => cargo_test(ctx, run_id).await,
61 Gate::MigrationDryRun => migration_dry_run(ctx).await,
62 Gate::BootSmoke => boot_smoke(ctx, run_id).await,
63 Gate::BurnIn { hours } => burn_in(ctx, *hours).await,
64 Gate::ManualConfirm => manual_confirm(ctx).await,
65 };
66
67 let outcome = outcome.unwrap_or_else(|e| GateOutcome::failed(GateFailure::Unclassified {
68 legacy_detail: Some(format!("gate runner errored: {e}")),
69 }));
70
71 let outcome_json = serde_json::to_string(&outcome)
72 .unwrap_or_else(|e| format!("{{\"_serialize_error\":{e:?}}}"));
73 sqlx::query(
74 "UPDATE gate_runs
75 SET finished_at = ?, status = ?, outcome_json = ?, log_ref = ?
76 WHERE id = ?",
77 )
78 .bind(Utc::now().to_rfc3339())
79 .bind(outcome.status_str())
80 .bind(&outcome_json)
81 .bind(outcome.log_ref.as_ref().map(|l| l.as_str()))
82 .bind(id)
83 .execute(&ctx.pool)
84 .await?;
85
86 tracing::info!(
87 tier = %ctx.tier, version = %ctx.version, gate = %kind,
88 status = outcome.status_str(), "gate done",
89 );
90 events::emit(&ctx.events, Event::GateDone {
91 run_id,
92 tier: ctx.tier.clone(),
93 version: ctx.version.clone(),
94 gate: kind,
95 outcome: outcome.clone(),
96 });
97
98 Ok(outcome)
99 }
100
101 /// Run a sequence of gates; stops on the first failure (no point running the
102 /// Run every gate in order and return true iff all passed. We deliberately do
103 /// NOT short-circuit on first failure — every gate's outcome is recorded in
104 /// `gate_runs`, which is the operator's only visibility into pipeline health.
105 /// Hiding later gates because an earlier one failed makes diagnosis worse.
106 pub async fn run_all(ctx: &GateCtx, gates: &[Gate]) -> Result<bool> {
107 let mut all_ok = true;
108 for g in gates {
109 let o = run(ctx, g).await?;
110 if !o.is_passed() {
111 all_ok = false;
112 }
113 }
114 Ok(all_ok)
115 }
116
117 // ---- individual gate runners ----
118
119 async fn cargo_test(ctx: &GateCtx, run_id: GateRunId) -> Result<GateOutcome> {
120 let server_dir = ctx.worktree.join("server");
121 let mut cmd = Command::new("cargo");
122 // Match CI (`server/deploy/run-ci.sh`): `--features fast-tests` relaxes
123 // auth rate-limit burst (5 → 20) and argon2 cost so signup-heavy + lockout
124 // workflow tests can complete without hitting Governor before the
125 // hand-rolled lockout check. The feature is specifically documented for
126 // this in `server/src/constants.rs:87`.
127 cmd.args(["test", "--release", "--features", "fast-tests"])
128 .current_dir(&server_dir)
129 .stdout(std::process::Stdio::piped())
130 .stderr(std::process::Stdio::piped())
131 .kill_on_drop(true);
132 // Same online-mode rationale as the build step: sqlx query macros need a
133 // live DB to type-check against. The scratch DB is left in migrated state
134 // by the preceding build, so we can reuse it here.
135 if let Some(scratch_url) = ctx.cfg.scratch_db_url.as_deref() {
136 cmd.env("DATABASE_URL", scratch_url);
137 // The server test harness (tests/harness/db.rs) parses TEST_DATABASE_URL
138 // with rfind('/'), which mangles URLs whose query string contains '/'
139 // (e.g. `?host=/var/run/postgresql`). Strip the query — libpq defaults
140 // to /var/run/postgresql on Debian/Ubuntu when host is unspecified.
141 let test_url = scratch_url
142 .split_once('?')
143 .map(|(base, _)| base)
144 .unwrap_or(scratch_url);
145 cmd.env("TEST_DATABASE_URL", test_url);
146 }
147 let started = std::time::Instant::now();
148 let log_path = gate_log_path(ctx, GateKind::CargoTest);
149 let log_ref = LogRef::new(&ctx.version, GateKind::CargoTest);
150 let mut child = match cmd.spawn() {
151 Ok(c) => c,
152 Err(e) => {
153 return Ok(GateOutcome::failed(GateFailure::SpawnFailed {
154 message: e.to_string(),
155 }).with_log_ref(log_ref));
156 }
157 };
158 let (stdout_buf, stderr_buf, status) =
159 stream_child_to_live_log(&mut child, ctx.events.clone(), run_id, log_path).await?;
160 let duration_s = started.elapsed().as_secs() as u32;
161 if status.success() {
162 Ok(GateOutcome::passed(PassNote::TestsPassed { duration_s }).with_log_ref(log_ref))
163 } else {
164 let failure = classify::classify_cargo_test(&stdout_buf, &stderr_buf);
165 Ok(GateOutcome::failed(failure).with_log_ref(log_ref))
166 }
167 }
168
169 async fn migration_dry_run(ctx: &GateCtx) -> Result<GateOutcome> {
170 let mut log_buf: Vec<u8> = Vec::new();
171 let log_ref = LogRef::new(&ctx.version, GateKind::MigrationDryRun);
172 let finish = |outcome: GateOutcome, buf: Vec<u8>| async move {
173 persist_gate_log(ctx, GateKind::MigrationDryRun, &buf, &[]).await;
174 outcome
175 };
176
177 let Some(db_url) = ctx.cfg.scratch_db_url.as_deref() else {
178 log_buf.extend_from_slice(b"scratch_db_url unset in daemon config\n");
179 return Ok(finish(
180 GateOutcome::blocked(GateBlocker::ScratchDbUrlUnset).with_log_ref(log_ref),
181 log_buf,
182 ).await);
183 };
184
185 let backup: Option<(String,)> = sqlx::query_as(
186 "SELECT local_path FROM backups ORDER BY id DESC LIMIT 1",
187 )
188 .fetch_optional(&ctx.pool)
189 .await?;
190 let Some((backup_path,)) = backup else {
191 log_buf.extend_from_slice(b"no backup fetched; call /backup/fetch first\n");
192 return Ok(finish(
193 GateOutcome::blocked(GateBlocker::NoBackupAvailable).with_log_ref(log_ref),
194 log_buf,
195 ).await);
196 };
197
198 log_buf.extend_from_slice(b"---- reset_scratch ----\n");
199 if let Err(e) = reset_scratch(db_url).await {
200 let msg = format!("scratch reset: {e}");
201 log_buf.extend_from_slice(msg.as_bytes());
202 return Ok(finish(
203 GateOutcome::failed(GateFailure::RestoreFailed { reason: msg }).with_log_ref(log_ref),
204 log_buf,
205 ).await);
206 }
207 log_buf.extend_from_slice(format!("---- restore_dump ({backup_path}) ----\n").as_bytes());
208 if let Err(e) = restore_dump(db_url, &backup_path, &mut log_buf).await {
209 let msg = format!("restore: {e}");
210 log_buf.extend_from_slice(msg.as_bytes());
211 return Ok(finish(
212 GateOutcome::failed(GateFailure::RestoreFailed { reason: msg }).with_log_ref(log_ref),
213 log_buf,
214 ).await);
215 }
216
217 let migrations_dir = ctx.worktree.join("server").join("migrations");
218 log_buf.extend_from_slice(b"---- run_migrator ----\n");
219 match run_migrator(db_url, &migrations_dir).await {
220 Ok(()) => {
221 let detail = format!("restored {backup_path} + migrated");
222 log_buf.extend_from_slice(detail.as_bytes());
223 Ok(finish(
224 GateOutcome::passed(PassNote::Migrated { backup_path: backup_path.clone() })
225 .with_log_ref(log_ref),
226 log_buf,
227 ).await)
228 }
229 Err(e) => {
230 let err_s = e.to_string();
231 log_buf.extend_from_slice(err_s.as_bytes());
232 let failure = classify::classify_migration_error(&err_s, None);
233 Ok(finish(
234 GateOutcome::failed(failure).with_log_ref(log_ref),
235 log_buf,
236 ).await)
237 }
238 }
239 }
240
241 pub(crate) async fn reset_scratch(db_url: &str) -> Result<()> {
242 use sqlx::postgres::PgPoolOptions;
243 use sqlx::Executor;
244 let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?;
245 // Drop every non-system schema, not just public — migrations create custom
246 // schemas (e.g. tower_sessions) that survive `DROP SCHEMA public CASCADE`
247 // and then collide on the next migration run.
248 pool.execute(
249 r#"
250 DO $$
251 DECLARE s text;
252 BEGIN
253 FOR s IN
254 SELECT nspname FROM pg_namespace
255 WHERE nspname NOT LIKE 'pg_%'
256 AND nspname NOT IN ('information_schema')
257 LOOP
258 EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', s);
259 END LOOP;
260 EXECUTE 'CREATE SCHEMA public';
261 END $$;
262 "#,
263 )
264 .await?;
265 pool.close().await;
266 Ok(())
267 }
268
269 async fn restore_dump(db_url: &str, dump: &str, log_buf: &mut Vec<u8>) -> Result<()> {
270 // Two pipelines we accept:
271 // *.sql -> psql $url < dump
272 // *.sql.gz -> gunzip -c dump | psql $url
273 let is_gz = dump.ends_with(".gz");
274 let shell = if is_gz {
275 format!("gunzip -c {q} | psql {url}", q = shell_escape(dump), url = shell_escape(db_url))
276 } else {
277 format!("psql {url} < {q}", q = shell_escape(dump), url = shell_escape(db_url))
278 };
279 let out = Command::new("sh").arg("-c").arg(&shell).output().await?;
280 log_buf.extend_from_slice(&out.stdout);
281 log_buf.extend_from_slice(&out.stderr);
282 anyhow::ensure!(
283 out.status.success(),
284 "restore failed: {}",
285 String::from_utf8_lossy(&out.stderr),
286 );
287 Ok(())
288 }
289
290 pub(crate) async fn run_migrator(db_url: &str, dir: &std::path::Path) -> Result<()> {
291 use sqlx::postgres::PgPoolOptions;
292 let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?;
293 let migrator = sqlx::migrate::Migrator::new(dir).await?;
294 migrator.run(&pool).await?;
295 pool.close().await;
296 Ok(())
297 }
298
/// Wrap `s` in single quotes for use as one word in a POSIX shell command.
/// Each embedded single quote is emitted as `'\''` (close the quote, an
/// escaped quote, reopen the quote).
fn shell_escape(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
302
303 async fn boot_smoke(ctx: &GateCtx, run_id: GateRunId) -> Result<GateOutcome> {
304 let bin: Option<(String,)> = sqlx::query_as(
305 "SELECT artifact_path FROM versions WHERE version = ?",
306 )
307 .bind(&ctx.version)
308 .fetch_optional(&ctx.pool)
309 .await?;
310 let Some((bin,)) = bin else {
311 return Ok(GateOutcome::blocked(GateBlocker::ArtifactMissing {
312 version: ctx.version.clone(),
313 }));
314 };
315
316 // Lowest-bar smoke: start the binary and verify it stays up for a few
317 // seconds without exiting. Panics in main, missing config, port-bind
318 // failures show up here. Anything more ambitious (probing /healthz on a
319 // real port) needs server config we don't generically know.
320 //
321 // The server requires DATABASE_URL or it panics on config load before
322 // we can observe anything. We point it at the scratch DB (already
323 // migrated by the build step and refreshed by migration_dry_run if
324 // that gate ran first). SCAN_ENABLED=false skips loading YARA rules
325 // from /opt/makenotwork/yara-rules which doesn't exist on the build
326 // host. Other config has sane optional defaults.
327 let mut cmd = tokio::process::Command::new(&bin);
328 cmd.env("SANDO_BOOT_SMOKE", "1")
329 .env("SCAN_ENABLED", "false")
330 .stdout(std::process::Stdio::piped())
331 .stderr(std::process::Stdio::piped())
332 .kill_on_drop(true);
333 if let Some(scratch_url) = ctx.cfg.scratch_db_url.as_deref() {
334 cmd.env("DATABASE_URL", scratch_url);
335 }
336 let log_path = gate_log_path(ctx, GateKind::BootSmoke);
337 let log_ref = LogRef::new(&ctx.version, GateKind::BootSmoke);
338 let mut child = match cmd.spawn() {
339 Ok(c) => c,
340 Err(e) => {
341 // Spawn failures get a one-off log line via LiveLog so the
342 // on-disk file still exists for `GET /logs/...`.
343 let mut log = LiveLog::open(ctx.events.clone(), run_id, log_path).await;
344 log.write_chunk(format!("spawn: {e}\n").as_bytes()).await;
345 log.close().await;
346 return Ok(GateOutcome::failed(GateFailure::SpawnFailed {
347 message: e.to_string(),
348 }).with_log_ref(log_ref));
349 }
350 };
351
352 // The boot smoke window is 3s. Drain stdout/stderr concurrently through
353 // a shared LiveLog sink so the operator sees panics/log lines stream in
354 // real time before the kill, AND the on-disk log gets the full byte
355 // stream for post-mortem reads. The drainers exit when their pipe
356 // closes — which happens when the child exits naturally or after kill.
357 let log = std::sync::Arc::new(tokio::sync::Mutex::new(
358 LiveLog::open(ctx.events.clone(), run_id, log_path).await,
359 ));
360 let stdout_task = tokio::spawn(stream_into_log(child.stdout.take(), log.clone()));
361 let stderr_task = tokio::spawn(stream_into_log(child.stderr.take(), log.clone()));
362
363 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
364
365 let exit = child.try_wait()?;
366 if exit.is_none() {
367 let _ = child.kill().await;
368 }
369 // The boot_smoke classifier looks at exit code only — the streamed
370 // bytes already landed in the live log and the on-disk file for the
371 // post-mortem reader. Drain the join handles to avoid hangs.
372 let _ = stdout_task.await;
373 let _ = stderr_task.await;
374 // Unique owner of the Arc at this point (both tasks dropped their clones).
375 if let Ok(mutex) = std::sync::Arc::try_unwrap(log) {
376 mutex.into_inner().close().await;
377 }
378
379 match exit {
380 Some(status) => {
381 let failure = classify::classify_boot_smoke(status.code());
382 Ok(GateOutcome::failed(failure).with_log_ref(log_ref))
383 }
384 None => Ok(GateOutcome::passed(PassNote::StayedUp { duration_s: 3 })
385 .with_log_ref(log_ref)),
386 }
387 }
388
389 /// Drain `stream` into the shared `LiveLog` (which forwards each chunk to
390 /// the on-disk log file AND broadcasts a `GateLogChunk` event), and return
391 /// the concatenated bytes so the classifier can still operate on the full
392 /// output post-hoc.
393 async fn stream_into_log<R>(
394 stream: Option<R>,
395 log: std::sync::Arc<tokio::sync::Mutex<LiveLog>>,
396 ) -> Vec<u8>
397 where
398 R: tokio::io::AsyncRead + Unpin + Send + 'static,
399 {
400 let mut total = Vec::new();
401 let Some(mut s) = stream else { return total };
402 let mut buf = [0u8; 4096];
403 loop {
404 match s.read(&mut buf).await {
405 Ok(0) => break,
406 Err(_) => break,
407 Ok(n) => {
408 total.extend_from_slice(&buf[..n]);
409 log.lock().await.write_chunk(&buf[..n]).await;
410 }
411 }
412 }
413 total
414 }
415
416 /// Spawn a child, drain its stdout/stderr through a `LiveLog`, return the
417 /// combined buffers and exit status. Shared by `cargo_test` (no deadline)
418 /// and ad-hoc callers — `boot_smoke` rolls its own variant because of its
419 /// 3s kill window.
420 async fn stream_child_to_live_log(
421 child: &mut tokio::process::Child,
422 events: EventTx,
423 run_id: GateRunId,
424 log_path: PathBuf,
425 ) -> Result<(Vec<u8>, Vec<u8>, std::process::ExitStatus)> {
426 let log = std::sync::Arc::new(tokio::sync::Mutex::new(
427 LiveLog::open(events, run_id, log_path).await,
428 ));
429 let stdout_task = tokio::spawn(stream_into_log(child.stdout.take(), log.clone()));
430 let stderr_task = tokio::spawn(stream_into_log(child.stderr.take(), log.clone()));
431 let status = child.wait().await?;
432 let stdout_buf = stdout_task.await.unwrap_or_default();
433 let stderr_buf = stderr_task.await.unwrap_or_default();
434 if let Ok(mutex) = std::sync::Arc::try_unwrap(log) {
435 mutex.into_inner().close().await;
436 }
437 Ok((stdout_buf, stderr_buf, status))
438 }
439
440 fn gate_log_path(ctx: &GateCtx, gate: GateKind) -> PathBuf {
441 ctx.cfg.logs_root.join(ctx.version.to_string()).join(format!("{}.log", gate.as_str()))
442 }
443
444 async fn burn_in(ctx: &GateCtx, hours: u32) -> Result<GateOutcome> {
445 // Check tier_state.burn_in_started_at on this tier; pass if enough time
446 // has elapsed. The clock is started by /promote when a version lands on
447 // the burn-in tier.
448 let started: Option<String> = sqlx::query_scalar(
449 "SELECT burn_in_started_at FROM tier_state WHERE tier = ?",
450 )
451 .bind(&ctx.tier)
452 .fetch_optional(&ctx.pool)
453 .await?
454 .flatten();
455 let Some(started) = started else {
456 return Ok(GateOutcome::blocked(GateBlocker::BurnInClockNotStarted));
457 };
458 let started = chrono::DateTime::parse_from_rfc3339(&started)?.with_timezone(&Utc);
459 let elapsed = Utc::now() - started;
460 let needed = chrono::Duration::hours(hours as i64);
461 if elapsed >= needed {
462 Ok(GateOutcome::passed(PassNote::BurnInElapsed { hours: elapsed.num_hours() as u32 }))
463 } else {
464 let remaining = (needed - elapsed).num_hours().max(0) as u32;
465 Ok(GateOutcome::blocked(GateBlocker::BurnInRemaining {
466 hours_remaining: remaining,
467 hours_total: hours,
468 }))
469 }
470 }
471
472 async fn manual_confirm(ctx: &GateCtx) -> Result<GateOutcome> {
473 // Pass iff a row in gate_runs exists with status='passed' for this
474 // (tier, version, manual_confirm) that was inserted out-of-band by an
475 // operator action. Since the harness inserts the in-flight row itself,
476 // look for a prior confirmation row.
477 let prior_at: Option<String> = sqlx::query_scalar(
478 "SELECT finished_at FROM gate_runs
479 WHERE tier = ? AND version = ? AND gate_kind = 'manual_confirm' AND status = 'passed'
480 ORDER BY id DESC LIMIT 1",
481 )
482 .bind(&ctx.tier)
483 .bind(&ctx.version)
484 .fetch_optional(&ctx.pool)
485 .await?;
486 match prior_at {
487 Some(at_str) => {
488 let at = chrono::DateTime::parse_from_rfc3339(&at_str)
489 .map(|d| d.with_timezone(&Utc))
490 .unwrap_or_else(|_| Utc::now());
491 Ok(GateOutcome::passed(PassNote::OperatorConfirmed { at }))
492 }
493 None => Ok(GateOutcome::blocked(GateBlocker::AwaitingOperatorConfirmation)),
494 }
495 }
496
497 /// Write `<logs_root>/<version>/<gate_kind>.log` containing the gate's
498 /// captured stdout + stderr. Best-effort: any IO error is logged and
499 /// swallowed — a missing log must not turn a passing gate red.
500 ///
501 /// Layout matches §6.8 of launchplan_final.md so the `GET /logs/{version}/{gate}`
502 /// route can find it. We use `ctx.version` (semver, what `gate_runs.version`
503 /// already stores) rather than the git sha; the launchplan uses "sha" in
504 /// prose but switching the on-disk key is a Phase B change after `GitSha` /
505 /// `Version` newtypes land.
506 async fn persist_gate_log(ctx: &GateCtx, gate_kind: GateKind, stdout: &[u8], stderr: &[u8]) {
507 let dir = ctx.cfg.logs_root.join(ctx.version.to_string());
508 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
509 tracing::warn!(error = %e, dir = %dir.display(), "could not create gate log dir");
510 return;
511 }
512 let path = dir.join(format!("{}.log", gate_kind.as_str()));
513 let mut body = Vec::with_capacity(stdout.len() + stderr.len() + 64);
514 body.extend_from_slice(b"==== stdout ====\n");
515 body.extend_from_slice(stdout);
516 if !stdout.last().is_some_and(|b| *b == b'\n') {
517 body.push(b'\n');
518 }
519 body.extend_from_slice(b"==== stderr ====\n");
520 body.extend_from_slice(stderr);
521 if let Err(e) = tokio::fs::write(&path, &body).await {
522 tracing::warn!(error = %e, path = %path.display(), "could not write gate log");
523 }
524 }
525
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events;
    use sqlx::sqlite::SqlitePoolOptions;

    /// With no burn-in clock started, `burn_in` yields a typed Blocked outcome
    /// and the runner persists status='blocked' plus an outcome_json whose
    /// blocker.kind is 'burn_in_clock_not_started'.
    #[tokio::test]
    async fn burn_in_blocked_persists_typed_outcome() {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::migrate(&pool).await.unwrap();

        // Seed rows, in order: the tier row (topology sync expects one before
        // gate_runs can reference it), its tier_state row with no burn-in
        // timestamp, and the versions FK target.
        let seed_statements = [
            "INSERT INTO tiers (name, ord, provisioned, canary) VALUES ('host', 0, 0, 'sequential')",
            "INSERT INTO tier_state (tier) VALUES ('host')",
            "INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'abc1234', '2026-01-01T00:00:00Z', '/tmp/x')",
        ];
        for statement in seed_statements {
            sqlx::query(statement).execute(&pool).await.unwrap();
        }

        let ctx = GateCtx {
            pool: pool.clone(),
            cfg: std::sync::Arc::new(crate::config::Config::for_tests()),
            tier: TierId::new("host"),
            version: "0.1.0".parse().unwrap(),
            worktree: std::path::PathBuf::from("/tmp/unused"),
            events: events::channel(),
        };
        let outcome = run(&ctx, &Gate::BurnIn { hours: 24 }).await.unwrap();
        assert_eq!(outcome.status_str(), "blocked");
        assert!(!outcome.is_passed());

        // Read the persisted row back.
        let (status, outcome_json): (Option<String>, Option<String>) = sqlx::query_as(
            "SELECT status, outcome_json FROM gate_runs ORDER BY id DESC LIMIT 1",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(status.as_deref(), Some("blocked"), "typed status");
        let json: serde_json::Value =
            serde_json::from_str(outcome_json.as_deref().unwrap()).unwrap();
        assert_eq!(json["status"]["kind"], "blocked");
        assert_eq!(json["status"]["blocker"]["kind"], "burn_in_clock_not_started");
    }

    /// reset_scratch must drop every non-system schema, not just `public` —
    /// otherwise migrations that create custom schemas (e.g. tower_sessions)
    /// collide on the next run. This regressed once (Phase 0) and the fix is
    /// load-bearing for migration_dry_run.
    ///
    /// Gated on `SANDO_TEST_PG_URL` so it only runs where postgres is
    /// available. Set `SANDO_TEST_PG_URL=postgres:///sando_scratch?host=/var/run/postgresql`
    /// (or similar) before `cargo test`.
    #[tokio::test]
    async fn reset_scratch_drops_all_non_system_schemas() {
        let url = match std::env::var("SANDO_TEST_PG_URL") {
            Ok(url) => url,
            Err(_) => {
                eprintln!("skipping: SANDO_TEST_PG_URL not set");
                return;
            }
        };
        use sqlx::Executor;
        use sqlx::postgres::PgPoolOptions;

        // Plant two non-system schemas, each holding a table.
        let setup = PgPoolOptions::new().max_connections(1).connect(&url).await.unwrap();
        setup
            .execute("DROP SCHEMA IF EXISTS foo CASCADE; CREATE SCHEMA foo; CREATE TABLE foo.t (i int);")
            .await
            .unwrap();
        setup
            .execute("DROP SCHEMA IF EXISTS tower_sessions CASCADE; CREATE SCHEMA tower_sessions; CREATE TABLE tower_sessions.session (id text);")
            .await
            .unwrap();
        setup.close().await;

        reset_scratch(&url).await.expect("reset_scratch");

        let verify = PgPoolOptions::new().max_connections(1).connect(&url).await.unwrap();
        let rows: Vec<(String,)> = sqlx::query_as(
            "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname <> 'information_schema'",
        )
        .fetch_all(&verify)
        .await
        .unwrap();
        let names: Vec<String> = rows.into_iter().map(|(name,)| name).collect();
        // After reset, only `public` should remain among non-system schemas.
        assert_eq!(names, vec!["public".to_string()], "got: {names:?}");
        verify.close().await;
    }

    /// Sanity: pointing run_migrator at a *non-existent* migrations dir must
    /// return an error rather than silently no-op'ing. The database URL is a
    /// placeholder; whichever step fails first, the call must not succeed.
    #[tokio::test]
    async fn run_migrator_errors_on_missing_dir() {
        let missing_dir = std::path::Path::new("/nonexistent/sando-test-migrations");
        let res = run_migrator("postgres:///does-not-matter", missing_dir).await;
        assert!(res.is_err());
    }
}
626