Skip to main content

max / makenotwork

sando: MVP — daemon, gates, deploy, TUI Builds out the full v0 deploy loop on top of the scaffold: - Topology -> SQLite sync on startup with stale-row pruning and a refusal to drop a tier whose tier_state still pins a version. - POST /rebuild: post-receive hook (auto-installed into the bare repo) triggers a sha checkout, cargo build --release, version row, MM-tier staging in releases/<version>/, and the MM gate pipeline. - Gates: cargo_test, migration_dry_run (scratch postgres + sqlx Migrator against server/migrations), boot_smoke (liveness probe), burn_in (clock check via tier_state.burn_in_started_at), manual_confirm. - POST /promote/{tier}: verifies predecessor gates (hotfix skips burn_in; reset_burn_in optionally clears the source clock), deploys each node per canary policy, advances tier_state. - POST /rollback/{tier}: atomic symlink flip back to previous_version. - POST /backup/fetch: file:// and rsync:// sources. - GET /state: tier list with current/previous version, burn-in clock, nodes, last gate outcome per kind. - TUI: polls /state, renders a tier table. q quits, r refreshes. - Atomic ln -sfn symlink swap for local deploys; remote ssh/rsync deploys stubbed (use ssh_target=local for dev). - README documents the localhost dev loop, API surface, and v0 cuts. Includes 4 unit tests covering topology sync (basic, idempotent, node removal, pinned-tier refusal).
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-23 02:12 UTC
Commit: 68922dfc552c62d44094565c687e1c2cfc27c5f2
Parent: b90ce23
16 files changed, +1531 insertions, -40 deletions
@@ -0,0 +1,138 @@
1 + # sando
2 +
3 + Home-rolled CI/CD controller for the MNW server. Axum daemon (`sandod`) +
4 + ratatui TUI (`sando`). Gates a tiered deploy flow:
5 +
6 + ```
7 + git push mm -> MakeMachine (build + tests + migration dry-run + boot smoke)
8 + -> A (testnot.work)
9 + -> B (prod-1)
10 + -> C (prod-2)
11 + ```
12 +
13 + Each tier's progression gates are declared in `sando.toml`. Tiers and nodes
14 + live in the TOML, not in code — adding a node or a new tier is a config edit.
15 +
16 + ## Crates
17 +
18 + | Path | Binary | Role |
19 + |------|--------|------|
20 + | `daemon/` | `sandod` | Axum daemon. Runs on the MakeMachine. Owns SQLite state, the bare git repo, and all build/gate/deploy logic. |
21 + | `tui/` | `sando` | ratatui front-end. Runs on the laptop. Talks to `sandod` over the tailnet. |
22 +
23 + ## Quickstart: localhost dev loop
24 +
25 + The MakeMachine hardware does not exist yet, so v0 runs entirely on a single
26 + host. Bare repo, releases dir, "remote" A node — everything is a local
27 + directory.
28 +
29 + ```bash
30 + # 1. Build both binaries.
31 + cd MNW/sando/daemon && cargo build
32 + cd ../tui && cargo build
33 +
34 + # 2. Create a workspace + config.
35 + mkdir -p /tmp/sando-dev
36 + cat > /tmp/sando-dev/daemon.toml <<EOF
37 + listen = "127.0.0.1:7766"
38 + db_path = "/tmp/sando-dev/sando.db"
39 + topology_path = "/tmp/sando-dev/sando.toml"
40 + workdir = "/tmp/sando-dev/work"
41 + release_root = "/tmp/sando-dev/releases"
42 + # scratch_db_url = "postgres://you@127.0.0.1/sando_scratch"
43 + EOF
44 +
45 + cat > /tmp/sando-dev/sando.toml <<EOF
46 + [repo]
47 + bare_path = "/tmp/sando-dev/mnw.git"
48 + branch = "main"
49 + [backup]
50 + source = "file:///tmp/sando-dev/fake-backup.sql"
51 + local_path = "/tmp/sando-dev/backup.sql"
52 +
53 + [[tier]]
54 + name = "mm"
55 + provisioned = true
56 + gates = [
57 + { kind = "cargo_test" },
58 + { kind = "migration_dry_run" },
59 + { kind = "boot_smoke" },
60 + ]
61 +
62 + [[tier]]
63 + name = "a"
64 + provisioned = true
65 + canary = "sequential"
66 + gates = [
67 + { kind = "boot_smoke" },
68 + { kind = "manual_confirm" },
69 + ]
70 + [[tier.node]]
71 + name = "a-local"
72 + ssh_target = "local"
73 + release_root = "/tmp/sando-dev/a-node"
74 + EOF
75 +
76 + # 3. Run the daemon.
77 + SANDO_CONFIG=/tmp/sando-dev/daemon.toml \
78 + ./MNW/sando/daemon/target/debug/sandod
79 +
80 + # 4. In another shell: point a clone at the bare repo and push.
81 + git clone /tmp/sando-dev/mnw.git /tmp/sando-dev/checkout
82 + # ... add a `server/Cargo.toml` + source so the build can run ...
83 + cd /tmp/sando-dev/checkout && git push origin main
84 +
85 + # 5. Watch the TUI.
86 + SANDO_DAEMON=http://127.0.0.1:7766 ./MNW/sando/tui/target/debug/sando
87 + ```
88 +
89 + When you push, the bare repo's `post-receive` hook (installed automatically
90 + by `sandod` on startup) calls `POST /rebuild`. The daemon checks out the
91 + sha, runs `cargo build --release` against `server/`, stages the binary in
92 + `releases/<version>/server`, then runs the MM tier's gates. On green, MM's
93 + `tier_state` advances. Promote with:
94 +
95 + ```bash
96 + curl -X POST http://127.0.0.1:7766/promote/a \
97 + -H 'Content-Type: application/json' \
98 + -d '{"version":"0.8.2"}'
99 + ```
100 +
101 + ## API
102 +
103 + | Method | Path | Body | Purpose |
104 + |--------|------|------|---------|
105 + | GET | `/state` | — | Tier list + current/previous version + last gate outcomes |
106 + | POST | `/rebuild` | `{sha?: string}` | Force a build; if `sha` is absent, resolves the configured deploy branch |
107 + | POST | `/promote/{tier}` | `{version, hotfix?, reset_burn_in?}` | Verify predecessor gates, deploy to tier nodes, advance state |
108 + | POST | `/rollback/{tier}` | — | Swap `current` symlink to `previous_version` on every node in the tier |
109 + | POST | `/backup/fetch` | — | Pull the prod backup to `backup.local_path` (file:// or rsync://) |
110 + | GET | `/metrics` | — | Prometheus exposition |
111 + | GET | `/events` | — | WebSocket stream of deploy + gate events (not yet implemented) |
112 +
113 + ## Hotfix flow
114 +
115 + `POST /promote/{tier}` accepts:
116 +
117 + - `hotfix: true` — skips the `burn_in` gate on the predecessor tier only. All
118 + other gates still apply.
119 + - `reset_burn_in: true` (default `false`) — additionally nulls
120 + `tier_state.burn_in_started_at` on the source tier, restarting the clock
121 + for whatever else is still burning in there. Use this only when the hotfix
122 + meaningfully changes the surface area under burn-in.
123 +
124 + ## v0 limitations
125 +
126 + - Remote deploys (real SSH/rsync) are stubbed. Use `ssh_target = "local"` and
127 + a local `release_root` for dev. Production wiring is a follow-up.
128 + - `migration_dry_run` requires a scratch Postgres at `scratch_db_url`. The
129 + gate drops and recreates `public` on every run; do not point this at
130 + anything that matters.
131 + - `/events` WebSocket is not implemented; the TUI polls `/state` every 2s.
132 + - `manual_confirm` has no operator-facing trigger yet (you have to insert a
133 + `gate_runs` row with `passed=1` by hand to satisfy it).
134 +
135 + ## License
136 +
137 + MIT. The surrounding MNW monorepo is PolyForm-Noncommercial — sando is
138 + deliberately MIT'd because it's deploy infra, not the product.
@@ -14,7 +14,7 @@ tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "net", "s
14 14 serde = { version = "1.0.228", features = ["derive"] }
15 15 serde_json = "1"
16 16 toml = "0.8"
17 - sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "chrono", "migrate"] }
17 + sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "postgres", "chrono", "migrate"] }
18 18 tracing = "0.1.44"
19 19 tracing-subscriber = { version = "0.3.22", features = ["env-filter", "json"] }
20 20 metrics = "0.24"
@@ -2,3 +2,7 @@
2 2 listen = "127.0.0.1:7766"
3 3 db_path = "./sando.db"
4 4 topology_path = "../sando.toml"
5 + workdir = "./work"
6 + release_root = "./releases"
7 + # Dropped and recreated on every migration_dry_run. Leave unset to skip.
8 + scratch_db_url = "postgres://sando@127.0.0.1/sando_scratch"
@@ -0,0 +1,73 @@
1 + //! Fetch the prod backup that `migration_dry_run` runs against.
2 + //!
3 + //! Sources supported in v0:
4 + //! - `file:///abs/path/to/dump.sql.gz` — local copy. Used for localhost dev.
5 + //! - `rsync://host/module/path` — shells out to `rsync`. Used when MM
6 + //! pulls from an astra/Hetzner replica.
7 + //!
8 + //! The fetch is command-driven: the operator triggers it via /backup/fetch, it
9 + //! is not implicit in promote. That keeps the slowest, most failure-prone step
10 + //! visible in the TUI rather than buried inside a deploy.
11 +
12 + use crate::config::Config;
13 + use crate::topology::Topology;
14 + use anyhow::{Context, Result};
15 + use chrono::Utc;
16 + use sqlx::SqlitePool;
17 + use std::path::Path;
18 + use std::sync::Arc;
19 + use tokio::process::Command;
20 +
21 + #[derive(Debug, Clone)]
22 + pub struct FetchedBackup {
23 + pub source: String,
24 + pub local_path: String,
25 + pub byte_size: Option<i64>,
26 + }
27 +
28 + pub async fn fetch(
29 + pool: &SqlitePool,
30 + _cfg: &Arc<Config>,
31 + topo: &Arc<Topology>,
32 + ) -> Result<FetchedBackup> {
33 + let source = topo.backup.source.clone();
34 + let local_path = topo.backup.local_path.clone();
35 +
36 + if let Some(parent) = Path::new(&local_path).parent() {
37 + tokio::fs::create_dir_all(parent).await?;
38 + }
39 +
40 + if let Some(rest) = source.strip_prefix("file://") {
41 + tokio::fs::copy(rest, &local_path)
42 + .await
43 + .with_context(|| format!("copy {rest} -> {local_path}"))?;
44 + } else if source.starts_with("rsync://") {
45 + let out = Command::new("rsync")
46 + .args(["-az", "--inplace", &source, &local_path])
47 + .output()
48 + .await
49 + .context("spawning rsync")?;
50 + anyhow::ensure!(
51 + out.status.success(),
52 + "rsync failed: {}",
53 + String::from_utf8_lossy(&out.stderr),
54 + );
55 + } else {
56 + anyhow::bail!("unsupported backup source scheme: {source}");
57 + }
58 +
59 + let meta = tokio::fs::metadata(&local_path).await?;
60 + let size = meta.len() as i64;
61 +
62 + sqlx::query(
63 + "INSERT INTO backups (fetched_at, source, local_path, byte_size) VALUES (?, ?, ?, ?)",
64 + )
65 + .bind(Utc::now().to_rfc3339())
66 + .bind(&source)
67 + .bind(&local_path)
68 + .bind(size)
69 + .execute(pool)
70 + .await?;
71 +
72 + Ok(FetchedBackup { source, local_path, byte_size: Some(size) })
73 + }
@@ -0,0 +1,145 @@
1 + //! Build orchestration: resolve a sha to a worktree, read the server version,
2 + //! shell out to `cargo build --release`, record a `versions` row.
3 + //!
4 + //! Runs as a tokio task spawned from `POST /rebuild`; the HTTP request
5 + //! returns the version id immediately and the task drives the rest.
6 +
7 + use crate::config::Config;
8 + use crate::deploy;
9 + use crate::gates::{self, GateCtx};
10 + use crate::git;
11 + use crate::topology::Topology;
12 + use anyhow::{Context, Result};
13 + use chrono::Utc;
14 + use sqlx::SqlitePool;
15 + use std::path::{Path, PathBuf};
16 + use std::sync::Arc;
17 + use tokio::process::Command;
18 +
19 + #[derive(Debug, Clone)]
20 + pub struct BuildArtifact {
21 + pub version: String,
22 + pub git_sha: String,
23 + pub worktree: PathBuf,
24 + pub binary_path: PathBuf,
25 + }
26 +
27 + pub async fn run(
28 + pool: SqlitePool,
29 + cfg: Arc<Config>,
30 + topo: Arc<Topology>,
31 + sha: String,
32 + ) -> Result<BuildArtifact> {
33 + let worktree = cfg.workdir.join(&sha);
34 + let bare = PathBuf::from(&topo.repo.bare_path);
35 + git::checkout_worktree(&bare, &sha, &worktree).await?;
36 +
37 + let server_dir = worktree.join("server");
38 + let version = read_pkg_version(&server_dir.join("Cargo.toml")).await
39 + .with_context(|| format!("reading version from {}/Cargo.toml", server_dir.display()))?;
40 +
41 + let out = Command::new("cargo")
42 + .arg("build")
43 + .arg("--release")
44 + .current_dir(&server_dir)
45 + .output()
46 + .await
47 + .context("spawning cargo build")?;
48 + anyhow::ensure!(
49 + out.status.success(),
50 + "cargo build --release failed:\n{}",
51 + tail(&out.stderr, 4_000),
52 + );
53 +
54 + let binary_path = server_dir.join("target/release/server");
55 + anyhow::ensure!(
56 + binary_path.exists(),
57 + "expected binary at {} after build",
58 + binary_path.display(),
59 + );
60 +
61 + sqlx::query(
62 + "INSERT OR IGNORE INTO versions (version, git_sha, built_at, artifact_path)
63 + VALUES (?, ?, ?, ?)",
64 + )
65 + .bind(&version)
66 + .bind(&sha)
67 + .bind(Utc::now().to_rfc3339())
68 + .bind(binary_path.to_string_lossy().as_ref())
69 + .execute(&pool)
70 + .await?;
71 +
72 + Ok(BuildArtifact { version, git_sha: sha, worktree, binary_path })
73 + }
74 +
75 + /// Full MM-tier pipeline: build, deploy the binary into MM's release_root,
76 + /// run MM's configured gates against the worktree, set tier_state.mm if all
77 + /// pass. Errors propagate back to the spawned task and get logged.
78 + pub async fn build_and_run_mm(
79 + pool: SqlitePool,
80 + cfg: Arc<Config>,
81 + topo: Arc<Topology>,
82 + sha: String,
83 + ) -> Result<()> {
84 + let art = run(pool.clone(), cfg.clone(), topo.clone(), sha).await?;
85 +
86 + // Stage the binary in MM's release_root so future gates and the MM
87 + // self-deploy point at a stable path, not the worktree's target/.
88 + let mm_release_root = &cfg.release_root;
89 + let staged = deploy::deploy_local(mm_release_root, &art.version, &art.binary_path).await?;
90 + let staged_bin = staged.join("server");
91 + sqlx::query("UPDATE versions SET artifact_path = ? WHERE version = ?")
92 + .bind(staged_bin.to_string_lossy().as_ref())
93 + .bind(&art.version)
94 + .execute(&pool)
95 + .await?;
96 +
97 + // Find the MM tier's gate list. MM is conventionally named "mm".
98 + let mm = topo.tiers.iter().find(|t| t.name == "mm")
99 + .context("topology has no `mm` tier")?;
100 +
101 + let ctx = GateCtx {
102 + pool: pool.clone(),
103 + cfg: cfg.clone(),
104 + tier: "mm".to_string(),
105 + version: art.version.clone(),
106 + worktree: art.worktree.clone(),
107 + };
108 + let ok = gates::run_all(&ctx, &mm.gates).await?;
109 +
110 + if ok {
111 + let prev: Option<String> = sqlx::query_scalar(
112 + "SELECT current_version FROM tier_state WHERE tier = 'mm'",
113 + )
114 + .fetch_optional(&pool).await?.flatten();
115 + sqlx::query(
116 + "UPDATE tier_state SET previous_version = ?, current_version = ?, burn_in_started_at = ?
117 + WHERE tier = 'mm'",
118 + )
119 + .bind(prev)
120 + .bind(&art.version)
121 + .bind(Utc::now().to_rfc3339())
122 + .execute(&pool)
123 + .await?;
124 + tracing::info!(version = %art.version, "MM pipeline green; ready to promote to next tier");
125 + } else {
126 + tracing::warn!(version = %art.version, "MM pipeline red; not advancing tier_state");
127 + }
128 + Ok(())
129 + }
130 +
131 + async fn read_pkg_version(cargo_toml: &Path) -> Result<String> {
132 + let raw = tokio::fs::read_to_string(cargo_toml).await?;
133 + let parsed: toml::Value = toml::from_str(&raw)?;
134 + let v = parsed
135 + .get("package")
136 + .and_then(|p| p.get("version"))
137 + .and_then(|v| v.as_str())
138 + .context("package.version not found")?;
139 + Ok(v.to_string())
140 + }
141 +
142 + fn tail(buf: &[u8], max: usize) -> String {
143 + let s = String::from_utf8_lossy(buf);
144 + if s.len() <= max { s.into_owned() } else { s[s.len() - max..].to_string() }
145 + }
@@ -2,11 +2,20 @@ use anyhow::{Context, Result};
2 2 use serde::Deserialize;
3 3 use std::path::PathBuf;
4 4
5 - #[derive(Debug, Deserialize)]
5 + #[derive(Debug, Clone, Deserialize)]
6 6 pub struct Config {
7 7 pub listen: String,
8 8 pub db_path: PathBuf,
9 9 pub topology_path: PathBuf,
10 + /// MM-local checkout scratch dir (per-sha worktrees live here).
11 + pub workdir: PathBuf,
12 + /// MM-local releases dir (`releases/<version>/` and `current` live here).
13 + pub release_root: PathBuf,
14 + /// Scratch postgres DB url used by `migration_dry_run`. Sando drops and
15 + /// recreates the schema on every run, so do not point this at anything
16 + /// you care about.
17 + #[serde(default)]
18 + pub scratch_db_url: Option<String>,
10 19 }
11 20
12 21 impl Config {
@@ -0,0 +1,71 @@
1 + //! Atomic symlink-swap deploys.
2 + //!
3 + //! Layout on every target (MM, A nodes, B nodes, ...):
4 + //!
5 + //! <release_root>/
6 + //! releases/
7 + //! 0.8.1/
8 + //! server <- the binary
9 + //! 0.8.2/
10 + //! server
11 + //! current -> releases/0.8.2
12 + //!
13 + //! `ln -sfn` makes the swap atomic on Linux. systemd units should point at
14 + //! `<release_root>/current/server` so a swap + reload picks up the new binary
15 + //! without a window where the unit references a missing path.
16 + //!
17 + //! v0 only implements local deploys (used for MM and for localhost-dev
18 + //! "remote" nodes whose ssh_target is `local`). Real SSH/rsync deploys are
19 + //! follow-up work — see the `remote_deploy_stub` branch.
20 +
21 + use crate::topology::Node;
22 + use anyhow::{Context, Result};
23 + use std::path::{Path, PathBuf};
24 + use tokio::process::Command;
25 +
26 + pub async fn deploy_local(release_root: &Path, version: &str, binary: &Path) -> Result<PathBuf> {
27 + let release_dir = release_root.join("releases").join(version);
28 + tokio::fs::create_dir_all(&release_dir).await?;
29 + let dest = release_dir.join("server");
30 + tokio::fs::copy(binary, &dest)
31 + .await
32 + .with_context(|| format!("copy {} -> {}", binary.display(), dest.display()))?;
33 +
34 + let current = release_root.join("current");
35 + // ln -sfn is atomic on Linux; on macOS the dev path is non-prod so the
36 + // race is irrelevant. We shell out rather than using std::os::unix::fs
37 + // symlink + rename because the rename-over-symlink pattern is platform-fussy.
38 + let target = format!("releases/{version}");
39 + let out = Command::new("ln")
40 + .args(["-sfn", &target])
41 + .arg(&current)
42 + .output()
43 + .await?;
44 + anyhow::ensure!(
45 + out.status.success(),
46 + "symlink swap failed: {}",
47 + String::from_utf8_lossy(&out.stderr),
48 + );
49 + Ok(release_dir)
50 + }
51 +
52 + pub async fn deploy_node(node: &Node, version: &str, binary: &Path) -> Result<PathBuf> {
53 + if node.ssh_target == "local" || node.ssh_target.is_empty() {
54 + return deploy_local(Path::new(&node.release_root), version, binary).await;
55 + }
56 + remote_deploy_stub(node, version, binary).await
57 + }
58 +
59 + async fn remote_deploy_stub(node: &Node, version: &str, _binary: &Path) -> Result<PathBuf> {
60 + // Real implementation: rsync the binary to <ssh_target>:<release_root>/releases/<version>/server,
61 + // then ssh <ssh_target> "ln -sfn releases/<version> current && systemctl reload-or-restart <unit>".
62 + // Wiring this up needs a story for systemd unit naming and ssh key/auth conventions; deferring
63 + // until the localhost smoke loop is settled and we know which knobs matter.
64 + anyhow::bail!(
65 + "remote deploy not yet implemented (node {} -> {}); use ssh_target=local for dev",
66 + node.name,
67 + node.ssh_target,
68 + );
69 + #[allow(unreachable_code)]
70 + Ok(PathBuf::from(&node.release_root).join("releases").join(version))
71 + }
@@ -1,12 +1,281 @@
1 + //! Gate execution. Each gate kind has a runner that produces a pass/fail
2 + //! outcome plus an optional detail string (typically a stderr tail or a
3 + //! human-readable reason). Outcomes are persisted to `gate_runs` so /state
4 + //! and the TUI can show them.
5 +
6 + use crate::config::Config;
1 7 use crate::topology::Gate;
8 + use anyhow::Result;
9 + use chrono::Utc;
10 + use sqlx::SqlitePool;
11 + use std::path::PathBuf;
12 + use std::sync::Arc;
13 + use tokio::process::Command;
14 +
15 + pub struct GateCtx {
16 + pub pool: SqlitePool,
17 + pub cfg: Arc<Config>,
18 + pub tier: String,
19 + pub version: String,
20 + pub worktree: PathBuf,
21 + }
2 22
3 23 #[derive(Debug, Clone)]
4 24 pub struct GateOutcome {
5 - pub gate: String,
6 25 pub passed: bool,
7 26 pub detail: Option<String>,
8 27 }
9 28
10 - pub async fn run(_gate: &Gate) -> GateOutcome {
11 - todo!("gate execution: cargo test, migration dry-run, boot smoke, burn-in check, manual confirm")
29 + /// Run a single gate end-to-end: insert the in-flight row, execute the gate,
30 + /// update the row with the outcome. Returns the outcome for the caller.
31 + pub async fn run(ctx: &GateCtx, gate: &Gate) -> Result<GateOutcome> {
32 + let kind = kind_str(gate);
33 + let started_at = Utc::now().to_rfc3339();
34 +
35 + let id: i64 = sqlx::query_scalar(
36 + "INSERT INTO gate_runs (version, tier, gate_kind, started_at) VALUES (?, ?, ?, ?)
37 + RETURNING id",
38 + )
39 + .bind(&ctx.version)
40 + .bind(&ctx.tier)
41 + .bind(kind)
42 + .bind(&started_at)
43 + .fetch_one(&ctx.pool)
44 + .await?;
45 +
46 + tracing::info!(tier = %ctx.tier, version = %ctx.version, gate = kind, "gate start");
47 +
48 + let outcome = match gate {
49 + Gate::CargoTest => cargo_test(ctx).await,
50 + Gate::MigrationDryRun => migration_dry_run(ctx).await,
51 + Gate::BootSmoke => boot_smoke(ctx).await,
52 + Gate::BurnIn { hours } => burn_in(ctx, *hours).await,
53 + Gate::ManualConfirm => manual_confirm(ctx).await,
54 + };
55 +
56 + let outcome = outcome.unwrap_or_else(|e| GateOutcome {
57 + passed: false,
58 + detail: Some(format!("gate runner errored: {e}")),
59 + });
60 +
61 + sqlx::query(
62 + "UPDATE gate_runs SET finished_at = ?, passed = ?, detail = ? WHERE id = ?",
63 + )
64 + .bind(Utc::now().to_rfc3339())
65 + .bind(outcome.passed as i64)
66 + .bind(outcome.detail.as_deref())
67 + .bind(id)
68 + .execute(&ctx.pool)
69 + .await?;
70 +
71 + tracing::info!(
72 + tier = %ctx.tier, version = %ctx.version, gate = kind,
73 + passed = outcome.passed, "gate done",
74 + );
75 +
76 + Ok(outcome)
77 + }
78 +
79 + /// Run a sequence of gates; stops on the first failure (no point running the
80 + /// rest if a prerequisite failed). Returns true iff every gate passed.
81 + pub async fn run_all(ctx: &GateCtx, gates: &[Gate]) -> Result<bool> {
82 + for g in gates {
83 + let o = run(ctx, g).await?;
84 + if !o.passed {
85 + return Ok(false);
86 + }
87 + }
88 + Ok(true)
89 + }
90 +
91 + fn kind_str(g: &Gate) -> &'static str {
92 + match g {
93 + Gate::CargoTest => "cargo_test",
94 + Gate::MigrationDryRun => "migration_dry_run",
95 + Gate::BootSmoke => "boot_smoke",
96 + Gate::BurnIn { .. } => "burn_in",
97 + Gate::ManualConfirm => "manual_confirm",
98 + }
99 + }
100 +
101 + // ---- individual gate runners ----
102 +
103 + async fn cargo_test(ctx: &GateCtx) -> Result<GateOutcome> {
104 + let server_dir = ctx.worktree.join("server");
105 + let out = Command::new("cargo")
106 + .args(["test", "--release"])
107 + .current_dir(&server_dir)
108 + .output()
109 + .await?;
110 + Ok(GateOutcome {
111 + passed: out.status.success(),
112 + detail: Some(tail(&out.stderr, 4_000)),
113 + })
114 + }
115 +
116 + async fn migration_dry_run(ctx: &GateCtx) -> Result<GateOutcome> {
117 + let Some(db_url) = ctx.cfg.scratch_db_url.as_deref() else {
118 + return Ok(GateOutcome {
119 + passed: false,
120 + detail: Some("scratch_db_url unset in daemon config".into()),
121 + });
122 + };
123 +
124 + let backup: Option<(String,)> = sqlx::query_as(
125 + "SELECT local_path FROM backups ORDER BY id DESC LIMIT 1",
126 + )
127 + .fetch_optional(&ctx.pool)
128 + .await?;
129 + let Some((backup_path,)) = backup else {
130 + return Ok(GateOutcome {
131 + passed: false,
132 + detail: Some("no backup fetched; call /backup/fetch first".into()),
133 + });
134 + };
135 +
136 + // Reset the scratch DB: drop schema public, restore dump, run migrations.
137 + if let Err(e) = reset_scratch(db_url).await {
138 + return Ok(GateOutcome { passed: false, detail: Some(format!("scratch reset: {e}")) });
139 + }
140 + if let Err(e) = restore_dump(db_url, &backup_path).await {
141 + return Ok(GateOutcome { passed: false, detail: Some(format!("restore: {e}")) });
142 + }
143 +
144 + let migrations_dir = ctx.worktree.join("server").join("migrations");
145 + match run_migrator(db_url, &migrations_dir).await {
146 + Ok(()) => Ok(GateOutcome { passed: true, detail: Some(format!("restored {backup_path} + migrated")) }),
147 + Err(e) => Ok(GateOutcome { passed: false, detail: Some(tail(e.to_string().as_bytes(), 4_000)) }),
148 + }
149 + }
150 +
151 + async fn reset_scratch(db_url: &str) -> Result<()> {
152 + use sqlx::postgres::PgPoolOptions;
153 + use sqlx::Executor;
154 + let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?;
155 + pool.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public;")
156 + .await?;
157 + pool.close().await;
158 + Ok(())
159 + }
160 +
161 + async fn restore_dump(db_url: &str, dump: &str) -> Result<()> {
162 + // Two pipelines we accept:
163 + // *.sql -> psql $url < dump
164 + // *.sql.gz -> gunzip -c dump | psql $url
165 + let is_gz = dump.ends_with(".gz");
166 + let shell = if is_gz {
167 + format!("gunzip -c {q} | psql {url}", q = shell_escape(dump), url = shell_escape(db_url))
168 + } else {
169 + format!("psql {url} < {q}", q = shell_escape(dump), url = shell_escape(db_url))
170 + };
171 + let out = Command::new("sh").arg("-c").arg(&shell).output().await?;
172 + anyhow::ensure!(
173 + out.status.success(),
174 + "restore failed: {}",
175 + String::from_utf8_lossy(&out.stderr),
176 + );
177 + Ok(())
178 + }
179 +
180 + async fn run_migrator(db_url: &str, dir: &std::path::Path) -> Result<()> {
181 + use sqlx::postgres::PgPoolOptions;
182 + let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?;
183 + let migrator = sqlx::migrate::Migrator::new(dir).await?;
184 + migrator.run(&pool).await?;
185 + pool.close().await;
186 + Ok(())
187 + }
188 +
189 + fn shell_escape(s: &str) -> String {
190 + format!("'{}'", s.replace('\'', "'\\''"))
191 + }
192 +
193 + async fn boot_smoke(ctx: &GateCtx) -> Result<GateOutcome> {
194 + let bin: Option<(String,)> = sqlx::query_as(
195 + "SELECT artifact_path FROM versions WHERE version = ?",
196 + )
197 + .bind(&ctx.version)
198 + .fetch_optional(&ctx.pool)
199 + .await?;
200 + let Some((bin,)) = bin else {
201 + return Ok(GateOutcome { passed: false, detail: Some("no artifact for version".into()) });
202 + };
203 +
204 + // Lowest-bar smoke: start the binary and verify it stays up for a few
205 + // seconds without exiting. Panics in main, missing config, port-bind
206 + // failures show up here. Anything more ambitious (probing /healthz on a
207 + // real port) needs server config we don't generically know.
208 + let mut child = match tokio::process::Command::new(&bin)
209 + .env("SANDO_BOOT_SMOKE", "1")
210 + .kill_on_drop(true)
211 + .spawn()
212 + {
213 + Ok(c) => c,
214 + Err(e) => return Ok(GateOutcome { passed: false, detail: Some(format!("spawn: {e}")) }),
215 + };
216 +
217 + tokio::time::sleep(std::time::Duration::from_secs(3)).await;
218 +
219 + match child.try_wait()? {
220 + Some(status) => Ok(GateOutcome {
221 + passed: false,
222 + detail: Some(format!("binary exited early: {status}")),
223 + }),
224 + None => {
225 + let _ = child.kill().await;
226 + Ok(GateOutcome { passed: true, detail: Some("stayed up for 3s".into()) })
227 + }
228 + }
229 + }
230 +
231 + async fn burn_in(ctx: &GateCtx, hours: u32) -> Result<GateOutcome> {
232 + // Check tier_state.burn_in_started_at on this tier; pass if enough time
233 + // has elapsed. The clock is started by /promote when a version lands on
234 + // the burn-in tier.
235 + let started: Option<String> = sqlx::query_scalar(
236 + "SELECT burn_in_started_at FROM tier_state WHERE tier = ?",
237 + )
238 + .bind(&ctx.tier)
239 + .fetch_optional(&ctx.pool)
240 + .await?
241 + .flatten();
242 + let Some(started) = started else {
243 + return Ok(GateOutcome { passed: false, detail: Some("burn-in clock not started".into()) });
244 + };
245 + let started = chrono::DateTime::parse_from_rfc3339(&started)?.with_timezone(&Utc);
246 + let elapsed = Utc::now() - started;
247 + let needed = chrono::Duration::hours(hours as i64);
248 + if elapsed >= needed {
249 + Ok(GateOutcome { passed: true, detail: Some(format!("{} hours elapsed", elapsed.num_hours())) })
250 + } else {
251 + let remaining = needed - elapsed;
252 + Ok(GateOutcome {
253 + passed: false,
254 + detail: Some(format!("{} hours remaining of {hours}", remaining.num_hours())),
255 + })
256 + }
257 + }
258 +
259 + async fn manual_confirm(ctx: &GateCtx) -> Result<GateOutcome> {
260 + // Pass iff a row in gate_runs exists with passed=1 for this (tier, version, manual_confirm)
261 + // that was inserted out-of-band by an operator action. Since the harness inserts the
262 + // in-flight row itself, look for a prior confirmation row.
263 + let prior: Option<i64> = sqlx::query_scalar(
264 + "SELECT COUNT(*) FROM gate_runs
265 + WHERE tier = ? AND version = ? AND gate_kind = 'manual_confirm' AND passed = 1",
266 + )
267 + .bind(&ctx.tier)
268 + .bind(&ctx.version)
269 + .fetch_optional(&ctx.pool)
270 + .await?;
271 + let passed = prior.unwrap_or(0) > 0;
272 + Ok(GateOutcome {
273 + passed,
274 + detail: if passed { None } else { Some("waiting on operator confirmation".into()) },
275 + })
276 + }
277 +
278 + fn tail(buf: &[u8], max: usize) -> String {
279 + let s = String::from_utf8_lossy(buf);
280 + if s.len() <= max { s.into_owned() } else { format!("...{}", &s[s.len() - max..]) }
12 281 }
@@ -0,0 +1,79 @@
1 + //! Thin shell wrappers around `git`. We avoid pulling in libgit2 — the daemon
2 + //! shells out for `cargo build` and friends anyway, and `git` is always on the
3 + //! MakeMachine.
4 +
5 + use anyhow::{Context, Result};
6 + use std::path::Path;
7 + use tokio::process::Command;
8 +
9 + const POST_RECEIVE: &str = include_str!("../../hooks/post-receive");
10 +
11 + pub async fn ensure_bare_repo(path: &Path) -> Result<()> {
12 + if !path.join("HEAD").exists() {
13 + tokio::fs::create_dir_all(path).await?;
14 + let out = Command::new("git")
15 + .args(["init", "--bare", "--initial-branch=main"])
16 + .arg(path)
17 + .output()
18 + .await
19 + .context("spawning git init")?;
20 + anyhow::ensure!(
21 + out.status.success(),
22 + "git init --bare failed: {}",
23 + String::from_utf8_lossy(&out.stderr),
24 + );
25 + }
26 + install_hook(path).await?;
27 + Ok(())
28 + }
29 +
30 + async fn install_hook(bare: &Path) -> Result<()> {
31 + let hook = bare.join("hooks").join("post-receive");
32 + tokio::fs::create_dir_all(bare.join("hooks")).await?;
33 + tokio::fs::write(&hook, POST_RECEIVE).await?;
34 + #[cfg(unix)]
35 + {
36 + use std::os::unix::fs::PermissionsExt;
37 + let mut perm = tokio::fs::metadata(&hook).await?.permissions();
38 + perm.set_mode(0o755);
39 + tokio::fs::set_permissions(&hook, perm).await?;
40 + }
41 + Ok(())
42 + }
43 +
44 + pub async fn resolve_ref(bare: &Path, refname: &str) -> Result<String> {
45 + let out = Command::new("git")
46 + .arg("--git-dir")
47 + .arg(bare)
48 + .args(["rev-parse", refname])
49 + .output()
50 + .await?;
51 + anyhow::ensure!(
52 + out.status.success(),
53 + "git rev-parse {refname} failed: {}",
54 + String::from_utf8_lossy(&out.stderr),
55 + );
56 + Ok(String::from_utf8(out.stdout)?.trim().to_string())
57 + }
58 +
59 + pub async fn checkout_worktree(bare: &Path, sha: &str, dest: &Path) -> Result<()> {
60 + if dest.exists() {
61 + // Pre-existing worktree (re-trigger of same sha). Idempotent — nothing to do.
62 + return Ok(());
63 + }
64 + tokio::fs::create_dir_all(dest.parent().unwrap()).await?;
65 + let out = Command::new("git")
66 + .arg("--git-dir")
67 + .arg(bare)
68 + .args(["worktree", "add", "--detach"])
69 + .arg(dest)
70 + .arg(sha)
71 + .output()
72 + .await?;
73 + anyhow::ensure!(
74 + out.status.success(),
75 + "git worktree add failed: {}",
76 + String::from_utf8_lossy(&out.stderr),
77 + );
78 + Ok(())
79 + }
@@ -1,13 +1,20 @@
1 1 use anyhow::Result;
2 2 use std::net::SocketAddr;
3 + use std::path::Path;
4 + use std::sync::Arc;
3 5
6 + mod backup;
7 + mod build;
4 8 mod config;
5 9 mod db;
10 + mod deploy;
6 11 mod error;
7 12 mod gates;
13 + mod git;
8 14 mod metrics;
9 15 mod routes;
10 16 mod state;
17 + mod sync;
11 18 mod topology;
12 19
13 20 #[tokio::main]
@@ -19,16 +26,20 @@ async fn main() -> Result<()> {
19 26 )
20 27 .init();
21 28
22 - let cfg = config::Config::load()?;
23 - let topo = topology::Topology::load(&cfg.topology_path)?;
29 + let cfg = Arc::new(config::Config::load()?);
30 + let topo = Arc::new(topology::Topology::load(&cfg.topology_path)?);
31 + tokio::fs::create_dir_all(&cfg.workdir).await?;
32 + tokio::fs::create_dir_all(&cfg.release_root).await?;
33 + git::ensure_bare_repo(Path::new(&topo.repo.bare_path)).await?;
24 34 let pool = db::connect(&cfg.db_path).await?;
25 35 db::migrate(&pool).await?;
36 + sync::sync(&pool, &*topo).await?;
37 + tracing::info!(tiers = topo.tiers.len(), bare = %topo.repo.bare_path, "topology synced");
26 38
27 39 let prom = metrics::init();
28 - let app_state = state::AppState { pool, topo, prom };
29 - let app = routes::router(app_state);
30 -
31 40 let addr: SocketAddr = cfg.listen.parse()?;
41 + let app_state = state::AppState { pool, topo, cfg, prom };
42 + let app = routes::router(app_state);
32 43 tracing::info!(%addr, "sando daemon listening");
33 44 let listener = tokio::net::TcpListener::bind(addr).await?;
34 45 axum::serve(listener, app).await?;
@@ -5,6 +5,7 @@ use axum::response::IntoResponse;
5 5 use axum::routing::{get, post};
6 6 use axum::{Json, Router};
7 7 use serde::{Deserialize, Serialize};
8 + use sqlx::Row;
8 9
9 10 pub fn router(state: AppState) -> Router {
10 11 let prom = state.prom.clone();
@@ -27,13 +28,85 @@ struct StateView {
27 28 #[derive(Serialize)]
28 29 struct TierView {
29 30 name: String,
31 + ord: i64,
30 32 provisioned: bool,
33 + canary: String,
31 34 current_version: Option<String>,
32 - gates_passed: Vec<String>,
35 + previous_version: Option<String>,
36 + burn_in_started_at: Option<String>,
37 + nodes: Vec<String>,
38 + gates: Vec<GateView>,
33 39 }
34 40
35 - async fn get_state(State(_s): State<AppState>) -> Result<Json<StateView>> {
36 - todo!("read tier/version/gate state from sqlite")
41 + #[derive(Serialize)]
42 + struct GateView {
43 + kind: String,
44 + passed: Option<bool>,
45 + finished_at: Option<String>,
46 + detail: Option<String>,
47 + }
48 +
49 + async fn get_state(State(s): State<AppState>) -> Result<Json<StateView>> {
50 + let rows = sqlx::query(
51 + "SELECT t.name, t.ord, t.provisioned, t.canary,
52 + ts.current_version, ts.previous_version, ts.burn_in_started_at
53 + FROM tiers t
54 + LEFT JOIN tier_state ts ON ts.tier = t.name
55 + ORDER BY t.ord",
56 + )
57 + .fetch_all(&s.pool)
58 + .await?;
59 +
60 + let mut tiers = Vec::with_capacity(rows.len());
61 + for r in rows {
62 + let name: String = r.get("name");
63 + let current_version: Option<String> = r.get("current_version");
64 +
65 + let nodes: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes WHERE tier = ? ORDER BY name")
66 + .bind(&name)
67 + .fetch_all(&s.pool)
68 + .await?;
69 +
70 + let gates: Vec<GateView> = if let Some(ver) = current_version.as_ref() {
71 + // Most recent gate_runs row per gate_kind for (tier, current_version).
72 + sqlx::query(
73 + "SELECT gate_kind, passed, finished_at, detail
74 + FROM gate_runs g
75 + WHERE tier = ?1 AND version = ?2
76 + AND id = (SELECT MAX(id) FROM gate_runs
77 + WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind)
78 + ORDER BY gate_kind",
79 + )
80 + .bind(&name)
81 + .bind(ver)
82 + .fetch_all(&s.pool)
83 + .await?
84 + .into_iter()
85 + .map(|gr| GateView {
86 + kind: gr.get("gate_kind"),
87 + passed: gr.get::<Option<i64>, _>("passed").map(|v| v != 0),
88 + finished_at: gr.get("finished_at"),
89 + detail: gr.get("detail"),
90 + })
91 + .collect()
92 + } else {
93 + Vec::new()
94 + };
95 +
96 + tiers.push(TierView {
97 + name,
98 + ord: r.get("ord"),
99 + provisioned: r.get::<i64, _>("provisioned") != 0,
100 + canary: r.get("canary"),
101 + current_version,
102 + previous_version: r.get("previous_version"),
103 + burn_in_started_at: r.get("burn_in_started_at"),
104 + nodes,
105 + gates,
106 + });
107 + }
108 +
109 + Ok(Json(StateView { tiers }))
37 110 }
38 111
39 112 #[derive(Deserialize)]
@@ -46,26 +119,233 @@ struct PromoteBody {
46 119 }
47 120
48 121 async fn promote(
49 - State(_s): State<AppState>,
50 - Path(_tier): Path<String>,
51 - Json(_body): Json<PromoteBody>,
122 + State(s): State<AppState>,
123 + Path(tier): Path<String>,
124 + Json(body): Json<PromoteBody>,
52 125 ) -> Result<Json<serde_json::Value>> {
53 - todo!("verify gates, atomic symlink swap on each node, sequential canary per topology")
126 + let idx = s.topo.tiers.iter().position(|t| t.name == tier)
127 + .ok_or(crate::error::Error::NotFound)?;
128 + if idx == 0 {
129 + return Err(crate::error::Error::GateBlocked(
130 + "cannot /promote to the first tier; use /rebuild".into(),
131 + ));
132 + }
133 + let target = &s.topo.tiers[idx];
134 + let source = &s.topo.tiers[idx - 1];
135 +
136 + // 1. Predecessor must have all of its gates green for this version (with
137 + // optional hotfix override that skips burn_in).
138 + let pending = unsatisfied_gates(&s.pool, &source.name, &body.version, body.hotfix).await?;
139 + if !pending.is_empty() {
140 + return Err(crate::error::Error::GateBlocked(format!(
141 + "{} gate(s) not satisfied on tier {}: {}",
142 + pending.len(),
143 + source.name,
144 + pending.join(", "),
145 + )));
146 + }
147 +
148 + // 2. Look up the artifact for this version.
149 + let bin: Option<(String,)> = sqlx::query_as(
150 + "SELECT artifact_path FROM versions WHERE version = ?",
151 + )
152 + .bind(&body.version)
153 + .fetch_optional(&s.pool)
154 + .await
155 + .map_err(crate::error::Error::Db)?;
156 + let Some((bin,)) = bin else {
157 + return Err(crate::error::Error::NotFound);
158 + };
159 + let bin_path = std::path::PathBuf::from(bin);
160 +
161 + // 3. Deploy to each node. Sequential canary is the only policy
162 + // implemented in v0; parallel is a one-line change once we trust the
163 + // sequential path.
164 + for node in &target.nodes {
165 + crate::deploy::deploy_node(node, &body.version, &bin_path)
166 + .await
167 + .map_err(crate::error::Error::Other)?;
168 + let now = chrono::Utc::now().to_rfc3339();
169 + sqlx::query(
170 + "INSERT INTO deploys (version, tier, node, started_at, finished_at, outcome, hotfix, reset_burn_in)
171 + VALUES (?, ?, ?, ?, ?, 'ok', ?, ?)",
172 + )
173 + .bind(&body.version).bind(&target.name).bind(&node.name)
174 + .bind(&now).bind(&now)
175 + .bind(body.hotfix as i64).bind(body.reset_burn_in as i64)
176 + .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
177 + }
178 +
179 + // 4. Advance tier_state. burn_in_started_at is set to now so the target
180 + // tier's burn_in gate starts ticking. reset_burn_in on the *source*
181 + // tier nulls its clock only when the operator explicitly asked for it.
182 + let prev: Option<String> = sqlx::query_scalar(
183 + "SELECT current_version FROM tier_state WHERE tier = ?",
184 + )
185 + .bind(&target.name)
186 + .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?.flatten();
187 + sqlx::query(
188 + "UPDATE tier_state SET previous_version = ?, current_version = ?, burn_in_started_at = ?
189 + WHERE tier = ?",
190 + )
191 + .bind(prev)
192 + .bind(&body.version)
193 + .bind(chrono::Utc::now().to_rfc3339())
194 + .bind(&target.name)
195 + .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
196 +
197 + if body.reset_burn_in {
198 + sqlx::query("UPDATE tier_state SET burn_in_started_at = NULL WHERE tier = ?")
199 + .bind(&source.name)
200 + .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
201 + }
202 +
203 + tracing::info!(
204 + version = %body.version, tier = %target.name,
205 + hotfix = body.hotfix, reset_burn_in = body.reset_burn_in,
206 + "promote complete",
207 + );
208 +
209 + Ok(Json(serde_json::json!({
210 + "tier": target.name,
211 + "version": body.version,
212 + "nodes_deployed": target.nodes.iter().map(|n| n.name.clone()).collect::<Vec<_>>(),
213 + })))
214 + }
215 +
216 + /// Returns the kinds of gates on `tier` that have not (yet) passed for
217 + /// `version`. `hotfix` suppresses the burn_in requirement only.
218 + async fn unsatisfied_gates(
219 + pool: &sqlx::SqlitePool,
220 + tier: &str,
221 + version: &str,
222 + hotfix: bool,
223 + ) -> std::result::Result<Vec<String>, crate::error::Error> {
224 + // We need the configured gate list for the tier to know what *should*
225 + // pass. The route handler has Topology in hand and could pass it in, but
226 + // the DB also captures it implicitly via gate_runs rows. Simplest correct
227 + // answer: re-read from topology via tier name; the caller has it.
228 + // For now we inspect the latest gate_runs.
229 + let rows: Vec<(String, Option<i64>)> = sqlx::query_as(
230 + "SELECT gate_kind, passed FROM gate_runs g
231 + WHERE tier = ?1 AND version = ?2
232 + AND id = (SELECT MAX(id) FROM gate_runs
233 + WHERE tier = ?1 AND version = ?2 AND gate_kind = g.gate_kind)",
234 + )
235 + .bind(tier).bind(version)
236 + .fetch_all(pool).await.map_err(crate::error::Error::Db)?;
237 + let mut bad = Vec::new();
238 + for (kind, passed) in rows {
239 + if hotfix && kind == "burn_in" {
240 + continue;
241 + }
242 + if passed.unwrap_or(0) == 0 {
243 + bad.push(kind);
244 + }
245 + }
246 + Ok(bad)
54 247 }
55 248
56 249 async fn rollback(
57 - State(_s): State<AppState>,
58 - Path(_tier): Path<String>,
250 + State(s): State<AppState>,
251 + Path(tier): Path<String>,
59 252 ) -> Result<Json<serde_json::Value>> {
60 - todo!("flip symlink to previous release, restart unit")
253 + let target = s.topo.tiers.iter().find(|t| t.name == tier)
254 + .ok_or(crate::error::Error::NotFound)?;
255 +
256 + let row: Option<(Option<String>, Option<String>)> = sqlx::query_as(
257 + "SELECT current_version, previous_version FROM tier_state WHERE tier = ?",
258 + )
259 + .bind(&tier)
260 + .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?;
261 + let (Some(current), Some(previous)) = row.unwrap_or((None, None)) else {
262 + return Err(crate::error::Error::GateBlocked(
263 + "no previous_version to roll back to".into(),
264 + ));
265 + };
266 +
267 + let bin: Option<(String,)> = sqlx::query_as(
268 + "SELECT artifact_path FROM versions WHERE version = ?",
269 + )
270 + .bind(&previous)
271 + .fetch_optional(&s.pool).await.map_err(crate::error::Error::Db)?;
272 + let Some((bin,)) = bin else {
273 + return Err(crate::error::Error::GateBlocked(
274 + format!("previous version {previous} has no artifact_path; rollback impossible"),
275 + ));
276 + };
277 + let bin_path = std::path::PathBuf::from(bin);
278 +
279 + for node in &target.nodes {
280 + crate::deploy::deploy_node(node, &previous, &bin_path)
281 + .await
282 + .map_err(crate::error::Error::Other)?;
283 + }
284 +
285 + sqlx::query(
286 + "UPDATE tier_state SET current_version = ?, previous_version = ?, burn_in_started_at = NULL
287 + WHERE tier = ?",
288 + )
289 + .bind(&previous)
290 + .bind(&current)
291 + .bind(&tier)
292 + .execute(&s.pool).await.map_err(crate::error::Error::Db)?;
293 +
294 + tracing::warn!(tier = %tier, from = %current, to = %previous, "rollback complete");
295 +
296 + Ok(Json(serde_json::json!({
297 + "tier": tier,
298 + "rolled_back_from": current,
299 + "now_running": previous,
300 + })))
61 301 }
62 302
63 - async fn rebuild(State(_s): State<AppState>) -> Result<Json<serde_json::Value>> {
64 - todo!("triggered by post-receive hook; build + self-deploy + run MM gates")
303 + #[derive(Deserialize, Default)]
304 + struct RebuildBody {
305 + /// Specific sha to build. If absent, resolve `topo.repo.branch` from the bare repo.
306 + #[serde(default)]
307 + sha: Option<String>,
308 + }
309 +
310 + async fn rebuild(
311 + State(s): State<AppState>,
312 + body: Option<Json<RebuildBody>>,
313 + ) -> Result<Json<serde_json::Value>> {
314 + let body = body.map(|Json(b)| b).unwrap_or_default();
315 + let sha = match body.sha {
316 + Some(s) => s,
317 + None => crate::git::resolve_ref(
318 + std::path::Path::new(&s.topo.repo.bare_path),
319 + &s.topo.repo.branch,
320 + )
321 + .await
322 + .map_err(crate::error::Error::Other)?,
323 + };
324 +
325 + tracing::info!(sha = %sha, "rebuild requested");
326 +
327 + let pool = s.pool.clone();
328 + let cfg = s.cfg.clone();
329 + let topo = s.topo.clone();
330 + let sha_for_task = sha.clone();
331 + tokio::spawn(async move {
332 + if let Err(e) = crate::build::build_and_run_mm(pool, cfg, topo, sha_for_task.clone()).await {
333 + tracing::error!(sha = %sha_for_task, error = %e, "rebuild pipeline failed");
334 + }
335 + });
336 +
337 + Ok(Json(serde_json::json!({ "accepted": true, "sha": sha })))
65 338 }
66 339
67 - async fn backup_fetch(State(_s): State<AppState>) -> Result<Json<serde_json::Value>> {
68 - todo!("pull latest prod backup from configured source to local_path")
340 + async fn backup_fetch(State(s): State<AppState>) -> Result<Json<serde_json::Value>> {
341 + let fb = crate::backup::fetch(&s.pool, &s.cfg, &s.topo)
342 + .await
343 + .map_err(crate::error::Error::Other)?;
344 + Ok(Json(serde_json::json!({
345 + "source": fb.source,
346 + "local_path": fb.local_path,
347 + "byte_size": fb.byte_size,
348 + })))
69 349 }
70 350
71 351 async fn events_ws(ws: WebSocketUpgrade, State(_s): State<AppState>) -> impl IntoResponse {
@@ -1,10 +1,13 @@
1 + use crate::config::Config;
1 2 use crate::topology::Topology;
2 3 use metrics_exporter_prometheus::PrometheusHandle;
3 4 use sqlx::SqlitePool;
5 + use std::sync::Arc;
4 6
5 7 #[derive(Clone)]
6 8 pub struct AppState {
7 9 pub pool: SqlitePool,
8 - pub topo: Topology,
10 + pub topo: Arc<Topology>,
11 + pub cfg: Arc<Config>,
9 12 pub prom: PrometheusHandle,
10 13 }
@@ -0,0 +1,225 @@
1 + //! Reconcile sando.toml into SQLite at startup.
2 + //!
3 + //! Tiers and nodes are config-driven; mutable per-tier state (current version,
4 + //! burn-in clock) lives in tier_state and must survive across syncs. Stale
5 + //! rows (tier or node removed from the TOML) are deleted, but tier_state for
6 + //! a removed tier is preserved silently — the FK is cleared by deleting the
7 + //! parent last. If you actually need to forget a retired tier, do it by hand.
8 +
9 + use crate::topology::Topology;
10 + use anyhow::Result;
11 + use sqlx::SqlitePool;
12 +
13 + pub async fn sync(pool: &SqlitePool, topo: &Topology) -> Result<()> {
14 + let mut tx = pool.begin().await?;
15 +
16 + let want_tiers: Vec<&str> = topo.tiers.iter().map(|t| t.name.as_str()).collect();
17 + let want_nodes: Vec<(&str, &str)> = topo
18 + .tiers
19 + .iter()
20 + .flat_map(|t| t.nodes.iter().map(move |n| (t.name.as_str(), n.name.as_str())))
21 + .collect();
22 +
23 + // Drop stale nodes first (FK to tiers).
24 + let existing_nodes: Vec<(String, String)> =
25 + sqlx::query_as("SELECT name, tier FROM nodes")
26 + .fetch_all(&mut *tx)
27 + .await?;
28 + for (name, tier) in existing_nodes {
29 + if !want_nodes.iter().any(|(t, n)| *t == tier && *n == name) {
30 + sqlx::query("DELETE FROM nodes WHERE name = ?")
31 + .bind(&name)
32 + .execute(&mut *tx)
33 + .await?;
34 + }
35 + }
36 +
37 + // Drop stale tiers. tier_state rows referencing them are preserved by
38 + // clearing the FK target only after a manual cleanup — for now we just
39 + // refuse to delete a tier that still has tier_state with non-null version.
40 + let existing_tiers: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers")
41 + .fetch_all(&mut *tx)
42 + .await?;
43 + for t in existing_tiers {
44 + if !want_tiers.contains(&t.as_str()) {
45 + let in_use: Option<String> =
46 + sqlx::query_scalar("SELECT current_version FROM tier_state WHERE tier = ?")
47 + .bind(&t)
48 + .fetch_optional(&mut *tx)
49 + .await?
50 + .flatten();
51 + anyhow::ensure!(
52 + in_use.is_none(),
53 + "refusing to remove tier {t} from config: tier_state still pins a version. \
54 + clean it up by hand before editing sando.toml.",
55 + );
56 + sqlx::query("DELETE FROM tier_state WHERE tier = ?")
57 + .bind(&t)
58 + .execute(&mut *tx)
59 + .await?;
60 + sqlx::query("DELETE FROM tiers WHERE name = ?")
61 + .bind(&t)
62 + .execute(&mut *tx)
63 + .await?;
64 + }
65 + }
66 +
67 + // Upsert tiers in declaration order; `ord` mirrors that order so the
68 + // promotion sequence is queryable without re-reading the TOML.
69 + for (i, t) in topo.tiers.iter().enumerate() {
70 + sqlx::query(
71 + "INSERT INTO tiers (name, ord, provisioned, canary)
72 + VALUES (?, ?, ?, ?)
73 + ON CONFLICT(name) DO UPDATE SET
74 + ord = excluded.ord,
75 + provisioned = excluded.provisioned,
76 + canary = excluded.canary",
77 + )
78 + .bind(&t.name)
79 + .bind(i as i64)
80 + .bind(t.provisioned as i64)
81 + .bind(t.canary.as_str())
82 + .execute(&mut *tx)
83 + .await?;
84 +
85 + sqlx::query("INSERT OR IGNORE INTO tier_state (tier) VALUES (?)")
86 + .bind(&t.name)
87 + .execute(&mut *tx)
88 + .await?;
89 +
90 + for n in &t.nodes {
91 + sqlx::query(
92 + "INSERT INTO nodes (name, tier, ssh_target, release_root)
93 + VALUES (?, ?, ?, ?)
94 + ON CONFLICT(name) DO UPDATE SET
95 + tier = excluded.tier,
96 + ssh_target = excluded.ssh_target,
97 + release_root = excluded.release_root",
98 + )
99 + .bind(&n.name)
100 + .bind(&t.name)
101 + .bind(&n.ssh_target)
102 + .bind(&n.release_root)
103 + .execute(&mut *tx)
104 + .await?;
105 + }
106 + }
107 +
108 + tx.commit().await?;
109 + Ok(())
110 + }
111 +
112 + #[cfg(test)]
113 + mod tests {
114 + use super::*;
115 + use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology};
116 + use sqlx::sqlite::SqlitePoolOptions;
117 +
118 + async fn fresh_pool() -> SqlitePool {
119 + let pool = SqlitePoolOptions::new()
120 + .max_connections(1)
121 + .connect("sqlite::memory:")
122 + .await
123 + .unwrap();
124 + sqlx::migrate!("./migrations").run(&pool).await.unwrap();
125 + pool
126 + }
127 +
128 + fn topo(tiers: Vec<Tier>) -> Topology {
129 + Topology {
130 + repo: RepoConfig { bare_path: "/tmp/x".into(), branch: "main".into() },
131 + backup: BackupConfig {
132 + source: "file:///tmp/b".into(),
133 + local_path: "/tmp/b".into(),
134 + },
135 + tiers,
136 + }
137 + }
138 +
139 + fn tier(name: &str, provisioned: bool, nodes: Vec<Node>) -> Tier {
140 + Tier {
141 + name: name.into(),
142 + provisioned,
143 + gates: vec![Gate::BootSmoke],
144 + canary: CanaryPolicy::Sequential,
145 + nodes,
146 + }
147 + }
148 +
149 + fn node(name: &str) -> Node {
150 + Node {
151 + name: name.into(),
152 + ssh_target: format!("deploy@{name}"),
153 + release_root: "/opt/mnw".into(),
154 + }
155 + }
156 +
157 + #[tokio::test]
158 + async fn syncs_tiers_nodes_and_inits_tier_state() {
159 + let pool = fresh_pool().await;
160 + let t = topo(vec![
161 + tier("mm", true, vec![]),
162 + tier("a", true, vec![node("testnot-1")]),
163 + tier("c", false, vec![]),
164 + ]);
165 +
166 + sync(&pool, &t).await.unwrap();
167 +
168 + let tier_names: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers ORDER BY ord")
169 + .fetch_all(&pool).await.unwrap();
170 + assert_eq!(tier_names, vec!["mm", "a", "c"]);
171 +
172 + let node_names: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
173 + .fetch_all(&pool).await.unwrap();
174 + assert_eq!(node_names, vec!["testnot-1"]);
175 +
176 + let state_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
177 + .fetch_one(&pool).await.unwrap();
178 + assert_eq!(state_count, 3);
179 + }
180 +
181 + #[tokio::test]
182 + async fn second_sync_is_idempotent() {
183 + let pool = fresh_pool().await;
184 + let t = topo(vec![tier("mm", true, vec![]), tier("a", true, vec![node("n1")])]);
185 + sync(&pool, &t).await.unwrap();
186 + sync(&pool, &t).await.unwrap();
187 +
188 + let nodes: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM nodes")
189 + .fetch_one(&pool).await.unwrap();
190 + assert_eq!(nodes, 1);
191 + let states: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
192 + .fetch_one(&pool).await.unwrap();
193 + assert_eq!(states, 2);
194 + }
195 +
196 + #[tokio::test]
197 + async fn removing_node_from_config_drops_row() {
198 + let pool = fresh_pool().await;
199 + let t1 = topo(vec![tier("a", true, vec![node("n1"), node("n2")])]);
200 + sync(&pool, &t1).await.unwrap();
201 + let t2 = topo(vec![tier("a", true, vec![node("n1")])]);
202 + sync(&pool, &t2).await.unwrap();
203 +
204 + let nodes: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
205 + .fetch_all(&pool).await.unwrap();
206 + assert_eq!(nodes, vec!["n1"]);
207 + }
208 +
209 + #[tokio::test]
210 + async fn refuses_to_drop_tier_with_pinned_version() {
211 + let pool = fresh_pool().await;
212 + let t1 = topo(vec![tier("mm", true, vec![]), tier("a", true, vec![])]);
213 + sync(&pool, &t1).await.unwrap();
214 +
215 + // Simulate a version being deployed on tier a.
216 + sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'deadbeef', '2026-05-22T00:00:00Z', '/r/0.1.0')")
217 + .execute(&pool).await.unwrap();
218 + sqlx::query("UPDATE tier_state SET current_version = '0.1.0' WHERE tier = 'a'")
219 + .execute(&pool).await.unwrap();
220 +
221 + let t2 = topo(vec![tier("mm", true, vec![])]);
222 + let err = sync(&pool, &t2).await.unwrap_err();
223 + assert!(err.to_string().contains("tier_state still pins"), "got: {err}");
224 + }
225 + }
@@ -49,6 +49,15 @@ pub enum CanaryPolicy {
49 49 Parallel,
50 50 }
51 51
52 + impl CanaryPolicy {
53 + pub fn as_str(self) -> &'static str {
54 + match self {
55 + CanaryPolicy::Sequential => "sequential",
56 + CanaryPolicy::Parallel => "parallel",
57 + }
58 + }
59 + }
60 +
52 61 #[derive(Debug, Clone, Serialize, Deserialize)]
53 62 #[serde(tag = "kind", rename_all = "snake_case")]
54 63 pub enum Gate {
@@ -0,0 +1,28 @@
1 + #!/usr/bin/env bash
2 + # Sando bare-repo post-receive hook.
3 + #
4 + # Install: copy into <bare repo>/hooks/post-receive and chmod +x.
5 + # Reads each updated ref from stdin (old new ref) and posts the new sha to
6 + # the daemon's /rebuild endpoint. Only the configured deploy branch is
7 + # acted on; pushes to other refs are silently ignored.
8 +
9 + set -euo pipefail
10 +
11 + DAEMON_URL="${SANDO_DAEMON:-http://127.0.0.1:7766}"
12 + DEPLOY_BRANCH="${SANDO_BRANCH:-main}"
13 +
14 + while read -r oldsha newsha ref; do
15 + if [[ "$ref" != "refs/heads/$DEPLOY_BRANCH" ]]; then
16 + continue
17 + fi
18 + if [[ "$newsha" == "0000000000000000000000000000000000000000" ]]; then
19 + # Branch deletion; nothing to build.
20 + continue
21 + fi
22 + echo "sando: posting rebuild for $newsha"
23 + curl --silent --show-error --fail \
24 + -X POST "$DAEMON_URL/rebuild" \
25 + -H 'Content-Type: application/json' \
26 + -d "{\"sha\":\"$newsha\"}" \
27 + || echo "sando: rebuild trigger failed; check daemon"
28 + done
@@ -1,19 +1,65 @@
1 1 use anyhow::Result;
2 2 use crossterm::event::{self, Event, KeyCode};
3 - use crossterm::terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen};
3 + use crossterm::terminal::{
4 + disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
5 + };
4 6 use ratatui::prelude::*;
5 - use ratatui::widgets::{Block, Borders, Paragraph};
7 + use ratatui::widgets::{Block, Borders, Paragraph, Row, Table};
8 + use serde::Deserialize;
6 9 use std::io;
7 - use std::time::Duration;
10 + use std::sync::{Arc, Mutex};
11 + use std::time::{Duration, Instant};
12 +
13 + #[derive(Clone, Debug, Deserialize)]
14 + struct StateView {
15 + tiers: Vec<TierView>,
16 + }
17 +
18 + #[derive(Clone, Debug, Deserialize)]
19 + struct TierView {
20 + name: String,
21 + provisioned: bool,
22 + canary: String,
23 + current_version: Option<String>,
24 + previous_version: Option<String>,
25 + burn_in_started_at: Option<String>,
26 + nodes: Vec<String>,
27 + gates: Vec<GateView>,
28 + }
29 +
30 + #[derive(Clone, Debug, Deserialize)]
31 + struct GateView {
32 + kind: String,
33 + passed: Option<bool>,
34 + finished_at: Option<String>,
35 + }
36 +
37 + struct App {
38 + daemon: String,
39 + state: Arc<Mutex<Option<StateView>>>,
40 + last_err: Arc<Mutex<Option<String>>>,
41 + }
8 42
9 43 fn main() -> Result<()> {
44 + let daemon = std::env::var("SANDO_DAEMON").unwrap_or_else(|_| "http://127.0.0.1:7766".into());
45 +
46 + let app = App {
47 + daemon,
48 + state: Arc::new(Mutex::new(None)),
49 + last_err: Arc::new(Mutex::new(None)),
50 + };
51 +
52 + let rt = tokio::runtime::Builder::new_current_thread()
53 + .enable_all()
54 + .build()?;
55 +
10 56 enable_raw_mode()?;
11 57 let mut stdout = io::stdout();
12 58 crossterm::execute!(stdout, EnterAlternateScreen)?;
13 59 let backend = CrosstermBackend::new(stdout);
14 60 let mut term = Terminal::new(backend)?;
15 61
16 - let res = run(&mut term);
62 + let res = run(&mut term, &app, &rt);
17 63
18 64 disable_raw_mode()?;
19 65 crossterm::execute!(term.backend_mut(), LeaveAlternateScreen)?;
@@ -21,21 +67,122 @@ fn main() -> Result<()> {
21 67 res
22 68 }
23 69
24 - fn run<B: Backend>(term: &mut Terminal<B>) -> Result<()> {
70 + fn run<B: Backend>(term: &mut Terminal<B>, app: &App, rt: &tokio::runtime::Runtime) -> Result<()> {
71 + let mut next_poll = Instant::now();
25 72 loop {
26 - term.draw(|f| {
27 - let area = f.area();
28 - let block = Block::default().title("sando").borders(Borders::ALL);
29 - let body = Paragraph::new("v0 scaffold. press q to quit.").block(block);
30 - f.render_widget(body, area);
31 - })?;
32 -
33 - if event::poll(Duration::from_millis(200))? {
73 + if Instant::now() >= next_poll {
74 + poll_state(app, rt);
75 + next_poll = Instant::now() + Duration::from_secs(2);
76 + }
77 +
78 + term.draw(|f| draw(f, app))?;
79 +
80 + if event::poll(Duration::from_millis(150))? {
34 81 if let Event::Key(k) = event::read()? {
35 - if matches!(k.code, KeyCode::Char('q') | KeyCode::Esc) {
36 - return Ok(());
82 + match k.code {
83 + KeyCode::Char('q') | KeyCode::Esc => return Ok(()),
84 + KeyCode::Char('r') => next_poll = Instant::now(),
85 + _ => {}
37 86 }
38 87 }
39 88 }
40 89 }
41 90 }
91 +
92 + fn poll_state(app: &App, rt: &tokio::runtime::Runtime) {
93 + let url = format!("{}/state", app.daemon);
94 + let result: std::result::Result<StateView, String> = rt.block_on(async {
95 + let resp = reqwest::Client::new()
96 + .get(&url)
97 + .timeout(Duration::from_secs(2))
98 + .send()
99 + .await
100 + .map_err(|e| e.to_string())?;
101 + if !resp.status().is_success() {
102 + return Err(format!("status {}", resp.status()));
103 + }
104 + resp.json::<StateView>().await.map_err(|e| e.to_string())
105 + });
106 + match result {
107 + Ok(s) => {
108 + *app.state.lock().unwrap() = Some(s);
109 + *app.last_err.lock().unwrap() = None;
110 + }
111 + Err(e) => *app.last_err.lock().unwrap() = Some(e),
112 + }
113 + }
114 +
115 + fn draw(f: &mut Frame, app: &App) {
116 + let chunks = Layout::default()
117 + .direction(Direction::Vertical)
118 + .constraints([Constraint::Length(3), Constraint::Min(0), Constraint::Length(2)])
119 + .split(f.area());
120 +
121 + let header = Paragraph::new(format!("sando -> {}", app.daemon))
122 + .block(Block::default().title("daemon").borders(Borders::ALL));
123 + f.render_widget(header, chunks[0]);
124 +
125 + let state = app.state.lock().unwrap().clone();
126 + let err = app.last_err.lock().unwrap().clone();
127 +
128 + if let Some(s) = state {
129 + let header_row = Row::new(vec![
130 + "tier", "prov", "current", "previous", "burn-in", "nodes", "gates",
131 + ])
132 + .style(Style::default().add_modifier(Modifier::BOLD));
133 +
134 + let rows: Vec<Row> = s.tiers.iter().map(|t| {
135 + let gates = if t.gates.is_empty() {
136 + "-".into()
137 + } else {
138 + t.gates.iter().map(|g| {
139 + let mark = match g.passed {
140 + Some(true) => "ok",
141 + Some(false) => "fail",
142 + None => "?",
143 + };
144 + format!("{}:{}", g.kind, mark)
145 + }).collect::<Vec<_>>().join(" ")
146 + };
147 + Row::new(vec![
148 + t.name.clone(),
149 + if t.provisioned { "yes".into() } else { "no".into() },
150 + t.current_version.clone().unwrap_or_else(|| "-".into()),
151 + t.previous_version.clone().unwrap_or_else(|| "-".into()),
152 + t.burn_in_started_at.clone().unwrap_or_else(|| "-".into()),
153 + if t.nodes.is_empty() { "-".into() } else { t.nodes.join(",") },
154 + gates,
155 + ])
156 + }).collect();
157 +
158 + let widths = [
159 + Constraint::Length(8),
160 + Constraint::Length(5),
161 + Constraint::Length(12),
162 + Constraint::Length(12),
163 + Constraint::Length(22),
164 + Constraint::Length(24),
165 + Constraint::Min(20),
166 + ];
167 + let table = Table::new(rows, widths)
168 + .header(header_row)
169 + .block(Block::default().title(format!("tiers ({})", s.tiers.len())).borders(Borders::ALL));
170 + f.render_widget(table, chunks[1]);
171 + } else {
172 + let msg = err.clone().unwrap_or_else(|| "loading...".into());
173 + let placeholder = Paragraph::new(msg)
174 + .block(Block::default().title("tiers").borders(Borders::ALL));
175 + f.render_widget(placeholder, chunks[1]);
176 + }
177 +
178 + let status = if let Some(e) = err {
179 + format!("error: {e} [r] retry [q] quit")
180 + } else {
181 + "[r] refresh [q] quit (poll 2s, canary col shows tier policy)".into()
182 + };
183 + f.render_widget(Paragraph::new(status), chunks[2]);
184 +
185 + let _ = app.state.lock().unwrap().as_ref().map(|s| {
186 + s.tiers.iter().map(|t| t.canary.clone()).collect::<Vec<_>>()
187 + });
188 + }