Skip to main content

max / makenotwork

8.1 KB · 229 lines History Blame Raw
1 //! Reconcile sando.toml into SQLite at startup.
2 //!
3 //! Tiers and nodes are config-driven; mutable per-tier state (current version,
4 //! burn-in clock) lives in tier_state and must survive across syncs. Stale
5 //! rows (tier or node removed from the TOML) are deleted, but tier_state for
6 //! a removed tier is preserved silently — the FK is cleared by deleting the
7 //! parent last. If you actually need to forget a retired tier, do it by hand.
8
9 use crate::topology::Topology;
10 use anyhow::Result;
11 use sqlx::SqlitePool;
12
13 pub async fn sync(pool: &SqlitePool, topo: &Topology) -> Result<()> {
14 let mut tx = pool.begin().await?;
15
16 let want_tiers: Vec<&str> = topo.tiers.iter().map(|t| t.name.as_str()).collect();
17 let want_nodes: Vec<(&str, &str)> = topo
18 .tiers
19 .iter()
20 .flat_map(|t| t.nodes.iter().map(move |n| (t.name.as_str(), n.name.as_str())))
21 .collect();
22
23 // Drop stale nodes first (FK to tiers).
24 let existing_nodes: Vec<(String, String)> =
25 sqlx::query_as("SELECT name, tier FROM nodes")
26 .fetch_all(&mut *tx)
27 .await?;
28 for (name, tier) in existing_nodes {
29 if !want_nodes.iter().any(|(t, n)| *t == tier && *n == name) {
30 sqlx::query("DELETE FROM nodes WHERE name = ?")
31 .bind(&name)
32 .execute(&mut *tx)
33 .await?;
34 }
35 }
36
37 // Drop stale tiers. tier_state rows referencing them are preserved by
38 // clearing the FK target only after a manual cleanup — for now we just
39 // refuse to delete a tier that still has tier_state with non-null version.
40 let existing_tiers: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers")
41 .fetch_all(&mut *tx)
42 .await?;
43 for t in existing_tiers {
44 if !want_tiers.contains(&t.as_str()) {
45 let in_use: Option<String> =
46 sqlx::query_scalar("SELECT current_version FROM tier_state WHERE tier = ?")
47 .bind(&t)
48 .fetch_optional(&mut *tx)
49 .await?
50 .flatten();
51 anyhow::ensure!(
52 in_use.is_none(),
53 "refusing to remove tier {t} from config: tier_state still pins a version. \
54 clean it up by hand before editing sando.toml.",
55 );
56 sqlx::query("DELETE FROM tier_state WHERE tier = ?")
57 .bind(&t)
58 .execute(&mut *tx)
59 .await?;
60 sqlx::query("DELETE FROM tiers WHERE name = ?")
61 .bind(&t)
62 .execute(&mut *tx)
63 .await?;
64 }
65 }
66
67 // Upsert tiers in declaration order; `ord` mirrors that order so the
68 // promotion sequence is queryable without re-reading the TOML.
69 for (i, t) in topo.tiers.iter().enumerate() {
70 sqlx::query(
71 "INSERT INTO tiers (name, ord, provisioned, canary)
72 VALUES (?, ?, ?, ?)
73 ON CONFLICT(name) DO UPDATE SET
74 ord = excluded.ord,
75 provisioned = excluded.provisioned,
76 canary = excluded.canary",
77 )
78 .bind(&t.name)
79 .bind(i as i64)
80 .bind(t.provisioned as i64)
81 .bind(t.canary.as_str())
82 .execute(&mut *tx)
83 .await?;
84
85 sqlx::query("INSERT OR IGNORE INTO tier_state (tier) VALUES (?)")
86 .bind(&t.name)
87 .execute(&mut *tx)
88 .await?;
89
90 for n in &t.nodes {
91 sqlx::query(
92 "INSERT INTO nodes (name, tier, ssh_target, release_root)
93 VALUES (?, ?, ?, ?)
94 ON CONFLICT(name) DO UPDATE SET
95 tier = excluded.tier,
96 ssh_target = excluded.ssh_target,
97 release_root = excluded.release_root",
98 )
99 .bind(&n.name)
100 .bind(&t.name)
101 .bind(&n.ssh_target)
102 .bind(&n.release_root)
103 .execute(&mut *tx)
104 .await?;
105 }
106 }
107
108 tx.commit().await?;
109 Ok(())
110 }
111
112 #[cfg(test)]
113 mod tests {
114 use super::*;
115 use crate::topology::{BackupConfig, CanaryPolicy, Gate, Node, RepoConfig, Tier, Topology};
116 use sqlx::sqlite::SqlitePoolOptions;
117
118 async fn fresh_pool() -> SqlitePool {
119 let pool = SqlitePoolOptions::new()
120 .max_connections(1)
121 .connect("sqlite::memory:")
122 .await
123 .unwrap();
124 sqlx::migrate!("./migrations").run(&pool).await.unwrap();
125 pool
126 }
127
128 fn topo(tiers: Vec<Tier>) -> Topology {
129 Topology {
130 repo: RepoConfig { bare_path: "/tmp/x".into(), branch: "main".into() },
131 backup: BackupConfig {
132 source: "file:///tmp/b".into(),
133 local_path: "/tmp/b".into(),
134 },
135 tiers,
136 }
137 }
138
139 fn tier(name: &str, provisioned: bool, nodes: Vec<Node>) -> Tier {
140 Tier {
141 name: name.into(),
142 provisioned,
143 gates: vec![Gate::BootSmoke],
144 canary: CanaryPolicy::Sequential,
145 nodes,
146 }
147 }
148
149 fn node(name: &str) -> Node {
150 Node {
151 name: name.into(),
152 ssh_target: format!("deploy@{name}"),
153 release_root: "/opt/mnw".into(),
154 service_name: "makenotwork.service".into(),
155 actuate: crate::topology::default_actuate(),
156 observe: crate::topology::default_observe(),
157 }
158 }
159
160 #[tokio::test]
161 async fn syncs_tiers_nodes_and_inits_tier_state() {
162 let pool = fresh_pool().await;
163 let t = topo(vec![
164 tier("host", true, vec![]),
165 tier("a", true, vec![node("testnot-1")]),
166 tier("c", false, vec![]),
167 ]);
168
169 sync(&pool, &t).await.unwrap();
170
171 let tier_names: Vec<String> = sqlx::query_scalar("SELECT name FROM tiers ORDER BY ord")
172 .fetch_all(&pool).await.unwrap();
173 assert_eq!(tier_names, vec!["host", "a", "c"]);
174
175 let node_names: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
176 .fetch_all(&pool).await.unwrap();
177 assert_eq!(node_names, vec!["testnot-1"]);
178
179 let state_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
180 .fetch_one(&pool).await.unwrap();
181 assert_eq!(state_count, 3);
182 }
183
184 #[tokio::test]
185 async fn second_sync_is_idempotent() {
186 let pool = fresh_pool().await;
187 let t = topo(vec![tier("host", true, vec![]), tier("a", true, vec![node("n1")])]);
188 sync(&pool, &t).await.unwrap();
189 sync(&pool, &t).await.unwrap();
190
191 let nodes: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM nodes")
192 .fetch_one(&pool).await.unwrap();
193 assert_eq!(nodes, 1);
194 let states: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tier_state")
195 .fetch_one(&pool).await.unwrap();
196 assert_eq!(states, 2);
197 }
198
199 #[tokio::test]
200 async fn removing_node_from_config_drops_row() {
201 let pool = fresh_pool().await;
202 let t1 = topo(vec![tier("a", true, vec![node("n1"), node("n2")])]);
203 sync(&pool, &t1).await.unwrap();
204 let t2 = topo(vec![tier("a", true, vec![node("n1")])]);
205 sync(&pool, &t2).await.unwrap();
206
207 let nodes: Vec<String> = sqlx::query_scalar("SELECT name FROM nodes")
208 .fetch_all(&pool).await.unwrap();
209 assert_eq!(nodes, vec!["n1"]);
210 }
211
212 #[tokio::test]
213 async fn refuses_to_drop_tier_with_pinned_version() {
214 let pool = fresh_pool().await;
215 let t1 = topo(vec![tier("host", true, vec![]), tier("a", true, vec![])]);
216 sync(&pool, &t1).await.unwrap();
217
218 // Simulate a version being deployed on tier a.
219 sqlx::query("INSERT INTO versions (version, git_sha, built_at, artifact_path) VALUES ('0.1.0', 'deadbeef', '2026-05-22T00:00:00Z', '/r/0.1.0')")
220 .execute(&pool).await.unwrap();
221 sqlx::query("UPDATE tier_state SET current_version = '0.1.0' WHERE tier = 'a'")
222 .execute(&pool).await.unwrap();
223
224 let t2 = topo(vec![tier("host", true, vec![])]);
225 let err = sync(&pool, &t2).await.unwrap_err();
226 assert!(err.to_string().contains("tier_state still pins"), "got: {err}");
227 }
228 }
229