| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
|
| 9 |
use crate::topology::Topology; |
| 10 |
use anyhow::Result; |
| 11 |
use sqlx::SqlitePool; |
| 12 |
|
| 13 |
pub async fn sync(pool: &SqlitePool, topo: &Topology) -> Result<()> { |
| 14 |
let mut tx = pool.begin().await?; |
| 15 |
|
| 16 |
let want_tiers: Vec<&str> = topo.tiers.iter().map(|t| t.name.as_str()).collect(); |
| 17 |
let want_nodes: Vec<(&str, &str)> = topo |
| 18 |
.tiers |
| 19 |
.iter() |
| 20 |
.flat_map(|t| t.nodes.iter().map(move |n| (t.name.as_str(), n.name.as_str()))) |
| 21 |
.collect(); |
| 22 |
|
| 23 |
|
| 24 |
let existing_nodes: Vec<(String, String)> = |
| 25 |
sqlx::query_as("SELECT name, tier FROM nodes") |
| 26 |
.fetch_all(&mut *tx) |
| 27 |
.await?; |
| 28 |
for (name, tier) in existing_nodes { |
| 29 |
if !want_nodes.iter().any(|(t, n)| *t == tier && *n == name) { |
| 30 |
sqlx::query("DELETE FROM nodes WHERE name = ?") |
| 31 |
.bind(&name) |
| 32 |
.execute(&mut *tx) |
| 33 |
.await?; |
| 34 |
} |
| 35 |
} |
| 36 |
|
| 37 |
|
| 38 |
|
| 39 |
|
| 40 |
let existing_tiers: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers") |
| 41 |
.fetch_all(&mut *tx) |
| 42 |
.await?; |
| 43 |
for t in existing_tiers { |
| 44 |
if !want_tiers.contains(&t.as_str()) { |
| 45 |
let in_use: Option<String> = |
| 46 |
sqlx::query_scalar("SELECT current_version FROM tier_state WHERE tier = ?") |
| 47 |
.bind(&t) |
| 48 |
.fetch_optional(&mut *tx) |
| 49 |
.await? |
| 50 |
.flatten(); |
| 51 |
anyhow::ensure!( |
| 52 |
in_use.is_none(), |
| 53 |
"refusing to remove tier {t} from config: tier_state still pins a version. \ |
| 54 |
clean it up by hand before editing sando.toml.", |
| 55 |
); |
| 56 |
sqlx::query("DELETE FROM tier_state WHERE tier = ?") |
| 57 |
.bind(&t) |
| 58 |
.execute(&mut *tx) |
| 59 |
.await?; |
| 60 |
sqlx::query("DELETE FROM tiers WHERE name = ?") |
| 61 |
.bind(&t) |
| 62 |
.execute(&mut *tx) |
| 63 |
.await?; |
| 64 |
} |
| 65 |
} |
| 66 |
|
| 67 |
|
| 68 |
|
| 69 |
for (i, t) in topo.tiers.iter().enumerate() { |
| 70 |
sqlx::query( |
| 71 |
"INSERT INTO tiers (name, ord, provisioned, canary) |
| 72 |
VALUES (?, ?, ?, ?) |
| 73 |
ON CONFLICT(name) DO UPDATE SET |
| 74 |
ord = excluded.ord, |
| 75 |
provisioned = excluded.provisioned, |
| 76 |
canary = excluded.canary", |
| 77 |
) |
| 78 |
.bind(&t.name) |
| 79 |
.bind(i as i64) |
| 80 |
.bind(t.provisioned as i64) |
| 81 |
.bind(t.canary.as_str()) |
| 82 |
.execute(&mut *tx) |
| 83 |
.await?; |
| 84 |
|
| 85 |
sqlx::query("INSERT OR IGNORE INTO tier_state (tier) VALUES (?)") |
| 86 |
.bind(&t.name) |
| 87 |
.execute(&mut *tx) |
| 88 |
.await?; |
| 89 |
|
| 90 |
for n in &t.nodes { |
| 91 |
sqlx::query( |
| 92 |
"INSERT INTO nodes (name, tier, ssh_target, release_root) |
| 93 |
VALUES (?, ?, ?, ?) |
| 94 |
ON CONFLICT(name) DO UPDATE SET |
| 95 |
tier = excluded.tier, |
| 96 |
ssh_target = excluded.ssh_target, |
| 97 |
release_root = excluded.release_root", |
| 98 |
) |
| 99 |
.bind(&n.name) |
| 100 |
.bind(&t.name) |
| 101 |
.bind(&n.ssh_target) |
| 102 |
.bind(&n.release_root) |
| 103 |
.execute(&mut *tx) |
| 104 |
.await?; |
| 105 |
} |
| 106 |
} |
| 107 |
|
| 108 |
tx.commit().await?; |
| 109 |
Ok(()) |
| 110 |
} |
| 111 |
|
| 112 |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology};
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens an in-memory SQLite pool limited to one connection and runs the
    /// migrations from `./migrations` against it.
    async fn fresh_pool() -> SqlitePool {
        let options = SqlitePoolOptions::new().max_connections(1);
        let pool = options.connect("sqlite::memory:").await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Wraps `tiers` in a `Topology` with fixed placeholder repo/backup settings.
    fn topo(tiers: Vec<Tier>) -> Topology {
        let repo = RepoConfig {
            bare_path: "/tmp/x".into(),
            branch: "main".into(),
        };
        let backup = BackupConfig {
            source: "file:///tmp/b".into(),
            local_path: "/tmp/b".into(),
        };
        Topology { repo, backup, tiers }
    }

    /// Builds a tier with a single `BootSmoke` gate and the `Sequential` canary policy.
    fn tier(name: &str, provisioned: bool, nodes: Vec<Node>) -> Tier {
        Tier {
            name: name.into(),
            provisioned,
            nodes,
            gates: vec![Gate::BootSmoke],
            canary: CanaryPolicy::Sequential,
        }
    }

    /// Builds a node called `name` whose SSH target is `deploy@<name>`; the
    /// actuate/observe settings come from the topology module's defaults.
    fn node(name: &str) -> Node {
        let ssh_target = format!("deploy@{name}");
        Node {
            name: name.into(),
            ssh_target,
            release_root: "/opt/mnw".into(),
            service_name: "makenotwork.service".into(),
            actuate: crate::topology::default_actuate(),
            observe: crate::topology::default_observe(),
        }
    }

    /// A first sync writes all tiers in config order, their nodes, and one
    /// `tier_state` row per tier.
    #[tokio::test]
    async fn syncs_tiers_nodes_and_inits_tier_state() {
        let pool = fresh_pool().await;
        let config = topo(vec![
            tier("host", true, vec![]),
            tier("a", true, vec![node("testnot-1")]),
            tier("c", false, vec![]),
        ]);

        sync(&pool, &config).await.unwrap();

        let stored_tiers: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers ORDER BY ord")
            .fetch_all(&pool)
            .await
            .unwrap();
        assert_eq!(stored_tiers, vec!["host", "a", "c"]);

        let stored_nodes: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
            .fetch_all(&pool)
            .await
            .unwrap();
        assert_eq!(stored_nodes, vec!["testnot-1"]);

        let state_rows: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(state_rows, 3);
    }

    /// Syncing the same topology twice leaves the row counts unchanged.
    #[tokio::test]
    async fn second_sync_is_idempotent() {
        let pool = fresh_pool().await;
        let config = topo(vec![
            tier("host", true, vec![]),
            tier("a", true, vec![node("n1")]),
        ]);
        for _ in 0..2 {
            sync(&pool, &config).await.unwrap();
        }

        let node_rows: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM nodes")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(node_rows, 1);

        let state_rows: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(state_rows, 2);
    }

    /// A node that disappears from the config is deleted on the next sync.
    #[tokio::test]
    async fn removing_node_from_config_drops_row() {
        let pool = fresh_pool().await;

        let before = topo(vec![tier("a", true, vec![node("n1"), node("n2")])]);
        sync(&pool, &before).await.unwrap();

        let after = topo(vec![tier("a", true, vec![node("n1")])]);
        sync(&pool, &after).await.unwrap();

        let remaining: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
            .fetch_all(&pool)
            .await
            .unwrap();
        assert_eq!(remaining, vec!["n1"]);
    }

    /// Dropping a tier from the config fails while its `tier_state` row has a
    /// `current_version` set.
    #[tokio::test]
    async fn refuses_to_drop_tier_with_pinned_version() {
        let pool = fresh_pool().await;
        let initial = topo(vec![tier("host", true, vec![]), tier("a", true, vec![])]);
        sync(&pool, &initial).await.unwrap();

        // Record a version and point tier `a` at it.
        sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'deadbeef', '2026-05-22T00:00:00Z', '/r/0.1.0')")
            .execute(&pool)
            .await
            .unwrap();
        sqlx::query("UPDATE tier_state SET current_version = '0.1.0' WHERE tier = 'a'")
            .execute(&pool)
            .await
            .unwrap();

        let without_a = topo(vec![tier("host", true, vec![])]);
        let err = sync(&pool, &without_a).await.unwrap_err();
        let message = err.to_string();
        assert!(message.contains("tier_state still pins"), "got: {err}");
    }
}
| 229 |
|