//! Reconcile sando.toml into SQLite at startup. //! //! Tiers and nodes are config-driven; mutable per-tier state (current version, //! burn-in clock) lives in tier_state and must survive across syncs. Stale //! rows (tier or node removed from the TOML) are deleted, but tier_state for //! a removed tier is preserved silently — the FK is cleared by deleting the //! parent last. If you actually need to forget a retired tier, do it by hand. use crate::topology::Topology; use anyhow::Result; use sqlx::SqlitePool; pub async fn sync(pool: &SqlitePool, topo: &Topology) -> Result<()> { let mut tx = pool.begin().await?; let want_tiers: Vec<&str> = topo.tiers.iter().map(|t| t.name.as_str()).collect(); let want_nodes: Vec<(&str, &str)> = topo .tiers .iter() .flat_map(|t| t.nodes.iter().map(move |n| (t.name.as_str(), n.name.as_str()))) .collect(); // Drop stale nodes first (FK to tiers). let existing_nodes: Vec<(String, String)> = sqlx::query_as("SELECT name, tier FROM nodes") .fetch_all(&mut *tx) .await?; for (name, tier) in existing_nodes { if !want_nodes.iter().any(|(t, n)| *t == tier && *n == name) { sqlx::query("DELETE FROM nodes WHERE name = ?") .bind(&name) .execute(&mut *tx) .await?; } } // Drop stale tiers. tier_state rows referencing them are preserved by // clearing the FK target only after a manual cleanup — for now we just // refuse to delete a tier that still has tier_state with non-null version. let existing_tiers: Vec = sqlx::query_scalar("SELECT name FROM tiers") .fetch_all(&mut *tx) .await?; for t in existing_tiers { if !want_tiers.contains(&t.as_str()) { let in_use: Option = sqlx::query_scalar("SELECT current_version FROM tier_state WHERE tier = ?") .bind(&t) .fetch_optional(&mut *tx) .await? .flatten(); anyhow::ensure!( in_use.is_none(), "refusing to remove tier {t} from config: tier_state still pins a version. \ clean it up by hand before editing sando.toml.", ); sqlx::query("DELETE FROM tier_state WHERE tier = ?") .bind(&t) .execute(&mut *tx) .await?; sqlx::query("DELETE FROM tiers WHERE name = ?") .bind(&t) .execute(&mut *tx) .await?; } } // Upsert tiers in declaration order; `ord` mirrors that order so the // promotion sequence is queryable without re-reading the TOML. for (i, t) in topo.tiers.iter().enumerate() { sqlx::query( "INSERT INTO tiers (name, ord, provisioned, canary) VALUES (?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET ord = excluded.ord, provisioned = excluded.provisioned, canary = excluded.canary", ) .bind(&t.name) .bind(i as i64) .bind(t.provisioned as i64) .bind(t.canary.as_str()) .execute(&mut *tx) .await?; sqlx::query("INSERT OR IGNORE INTO tier_state (tier) VALUES (?)") .bind(&t.name) .execute(&mut *tx) .await?; for n in &t.nodes { sqlx::query( "INSERT INTO nodes (name, tier, ssh_target, release_root) VALUES (?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET tier = excluded.tier, ssh_target = excluded.ssh_target, release_root = excluded.release_root", ) .bind(&n.name) .bind(&t.name) .bind(&n.ssh_target) .bind(&n.release_root) .execute(&mut *tx) .await?; } } tx.commit().await?; Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology}; use sqlx::sqlite::SqlitePoolOptions; async fn fresh_pool() -> SqlitePool { let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await .unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); pool } fn topo(tiers: Vec) -> Topology { Topology { repo: RepoConfig { bare_path: "/tmp/x".into(), branch: "main".into() }, backup: BackupConfig { source: "file:///tmp/b".into(), local_path: "/tmp/b".into(), }, tiers, } } fn tier(name: &str, provisioned: bool, nodes: Vec) -> Tier { Tier { name: name.into(), provisioned, gates: vec![Gate::BootSmoke], canary: CanaryPolicy::Sequential, nodes, } } fn node(name: &str) -> Node { Node { name: name.into(), ssh_target: format!("deploy@{name}"), release_root: "/opt/mnw".into(), service_name: "makenotwork.service".into(), actuate: crate::topology::default_actuate(), observe: crate::topology::default_observe(), } } #[tokio::test] async fn syncs_tiers_nodes_and_inits_tier_state() { let pool = fresh_pool().await; let t = topo(vec![ tier("host", true, vec![]), tier("a", true, vec![node("testnot-1")]), tier("c", false, vec![]), ]); sync(&pool, &t).await.unwrap(); let tier_names: Vec = sqlx::query_scalar("SELECT name FROM tiers ORDER BY ord") .fetch_all(&pool).await.unwrap(); assert_eq!(tier_names, vec!["host", "a", "c"]); let node_names: Vec = sqlx::query_scalar("SELECT name FROM nodes") .fetch_all(&pool).await.unwrap(); assert_eq!(node_names, vec!["testnot-1"]); let state_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state") .fetch_one(&pool).await.unwrap(); assert_eq!(state_count, 3); } #[tokio::test] async fn second_sync_is_idempotent() { let pool = fresh_pool().await; let t = topo(vec![tier("host", true, vec![]), tier("a", true, vec![node("n1")])]); sync(&pool, &t).await.unwrap(); sync(&pool, &t).await.unwrap(); let nodes: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM nodes") .fetch_one(&pool).await.unwrap(); assert_eq!(nodes, 1); let states: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state") .fetch_one(&pool).await.unwrap(); assert_eq!(states, 2); } #[tokio::test] async fn removing_node_from_config_drops_row() { let pool = fresh_pool().await; let t1 = topo(vec![tier("a", true, vec![node("n1"), node("n2")])]); sync(&pool, &t1).await.unwrap(); let t2 = topo(vec![tier("a", true, vec![node("n1")])]); sync(&pool, &t2).await.unwrap(); let nodes: Vec = sqlx::query_scalar("SELECT name FROM nodes") .fetch_all(&pool).await.unwrap(); assert_eq!(nodes, vec!["n1"]); } #[tokio::test] async fn refuses_to_drop_tier_with_pinned_version() { let pool = fresh_pool().await; let t1 = topo(vec![tier("host", true, vec![]), tier("a", true, vec![])]); sync(&pool, &t1).await.unwrap(); // Simulate a version being deployed on tier a. sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'deadbeef', '2026-05-22T00:00:00Z', '/r/0.1.0')") .execute(&pool).await.unwrap(); sqlx::query("UPDATE tier_state SET current_version = '0.1.0' WHERE tier = 'a'") .execute(&pool).await.unwrap(); let t2 = topo(vec![tier("host", true, vec![])]); let err = sync(&pool, &t2).await.unwrap_err(); assert!(err.to_string().contains("tier_state still pins"), "got: {err}"); } }