Skip to main content

max / makenotwork

17.2 KB · 441 lines History Blame Raw
1 //! Atomic symlink-swap deploys.
2 //!
3 //! Layout on every target (local host, A nodes, B nodes, ...):
4 //!
5 //! <release_root>/
6 //! releases/
7 //! 0.8.1/
8 //! <bin_name>
9 //! 0.8.2/
10 //! <bin_name>
11 //! current -> releases/0.8.2
12 //!
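//! The new `current` link is created with `ln -sfn`; remote nodes create it
//! as `current.new` and `mv -Tf` it over `current`, so the swap is a single
//! atomic rename(2) rather than `ln -sfn`'s unlink + re-create. systemd units
//! point at `<release_root>/current/<bin_name>` so reload-or-restart picks up
//! the new binary without ever pointing at a missing path.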
16 //!
17 //! The host-side transport — `ssh` for shell steps, `rsync` for the release
18 //! dir — comes from the shared [`ops_exec::Executor`] (a `LocalExec` for
19 //! `ssh_target = "local"`, an `SshExec` otherwise), built once per node in
20 //! [`crate::state`]. This module owns the *deploy choreography* (mkdir, push,
21 //! atomic swap, restart, gc); the transport is the crate's. SSH push behavior
22 //! is identical to the pre-extraction code — this is a transport extraction,
23 //! not a model change.
24
25 use crate::topology::Node;
26 use anyhow::{Context, Result};
27 use async_trait::async_trait;
28 use ops_exec::{Action, Executor, LogSink, RunOutput, Step, SyncOpts, sh_quote};
29 use std::path::{Path, PathBuf};
30 use tokio::process::Command;
31
/// How many release directories each node retains. Older ones are pruned
/// once a deploy has succeeded. This is deliberately a constant rather than
/// config; make it configurable only if retention ever has to differ per tier.
const RELEASES_TO_KEEP: usize = 5;
36
37 /// A sink that drops streamed bytes. Deploy steps don't have a live-log handle
38 /// (gates do), so output is discarded as it streams; [`RunOutput`] still
39 /// captures the full stdout/stderr for error reporting, preserving the
40 /// pre-extraction behavior of surfacing `stderr` in failure messages.
41 struct DiscardSink;
42
43 #[async_trait]
44 impl LogSink for DiscardSink {
45 async fn write_chunk(&mut self, _bytes: &[u8]) {}
46 }
47
48 /// Run a shell step through `executor`, treating a non-zero exit as an error
49 /// whose message carries the captured stderr — exactly as the old bespoke
50 /// `ssh()` helper did (`ssh <target> failed: <stderr>`).
51 async fn run_checked(executor: &dyn Executor, script: &str, what: &str) -> Result<RunOutput> {
52 let step = Step::shell(Action::Deploy, script);
53 let mut sink = DiscardSink;
54 let out = executor
55 .run_streaming(&step, &mut sink)
56 .await
57 .with_context(|| format!("{what}: spawning command"))?;
58 anyhow::ensure!(
59 out.status.success(),
60 "{what} failed (exit {}): {}",
61 out.status.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into()),
62 String::from_utf8_lossy(&out.stderr),
63 );
64 Ok(out)
65 }
66
67 pub async fn deploy_local(
68 release_root: &Path,
69 version: &crate::domain::Version,
70 binaries: &[PathBuf],
71 ) -> Result<PathBuf> {
72 let release_dir = release_root.join("releases").join(version.to_string());
73 tokio::fs::create_dir_all(&release_dir).await?;
74 for binary in binaries {
75 let name = binary.file_name()
76 .context("binary path has no file name")?;
77 let dest = release_dir.join(name);
78 tokio::fs::copy(binary, &dest)
79 .await
80 .with_context(|| format!("copy {} -> {}", binary.display(), dest.display()))?;
81 }
82
83 let current = release_root.join("current");
84 let target = format!("releases/{version}");
85 let out = Command::new("ln")
86 .args(["-sfn", &target])
87 .arg(&current)
88 .output()
89 .await?;
90 anyhow::ensure!(
91 out.status.success(),
92 "symlink swap failed: {}",
93 String::from_utf8_lossy(&out.stderr),
94 );
95
96 if let Err(e) = gc_local_releases(release_root).await {
97 tracing::warn!(error = %e, "local release GC failed (non-fatal)");
98 }
99 Ok(release_dir)
100 }
101
102 /// Deploy `staged_release_dir` (a directory built on the Sando host by
103 /// `deploy_local`) to `node` using `executor` (its transport from the topology
104 /// executor map). For `ssh_target=local`, this is just a symlink swap; for
105 /// remote nodes, we rsync the whole dir over the executor.
106 ///
107 /// `primary_bin` is only used for logging — every file present in the staged
108 /// dir gets shipped.
109 pub async fn deploy_node(
110 executor: &dyn Executor,
111 node: &Node,
112 version: &str,
113 staged_release_dir: &Path,
114 primary_bin: &str,
115 ) -> Result<PathBuf> {
116 if node.ssh_target == "local" || node.ssh_target.is_empty() {
117 // Local deploy already happened when we staged on the Sando host.
118 // Just re-point `current` at the staged dir.
119 return reset_local_current(executor, Path::new(&node.release_root), version).await;
120 }
121 deploy_remote(executor, node, version, staged_release_dir, primary_bin).await
122 }
123
124 async fn reset_local_current(
125 executor: &dyn Executor,
126 release_root: &Path,
127 version: &str,
128 ) -> Result<PathBuf> {
129 let current = release_root.join("current");
130 let target = format!("releases/{version}");
131 run_checked(
132 executor,
133 &format!("ln -sfn {} {}", sh_quote(&target), sh_quote(&current.to_string_lossy())),
134 "local symlink swap",
135 )
136 .await?;
137 Ok(release_root.join("releases").join(version))
138 }
139
140 async fn deploy_remote(
141 executor: &dyn Executor,
142 node: &Node,
143 version: &str,
144 staged_release_dir: &Path,
145 primary_bin: &str,
146 ) -> Result<PathBuf> {
147 let release_root = &node.release_root;
148 let service = &node.service_name;
149 let release_dir = format!("{release_root}/releases/{version}");
150
151 tracing::info!(node = %node.name, version, "deploy: mkdir release dir");
152 run_checked(
153 executor,
154 &format!("set -e; mkdir -p {q}", q = sh_quote(&release_dir)),
155 "creating remote release dir",
156 )
157 .await?;
158
159 tracing::info!(node = %node.name, version, primary = %primary_bin, "deploy: rsync release dir");
160 // Rsync the whole staged dir (binaries + every release_contents entry).
161 // `SyncOpts::release_mirror()` is the exact pre-extraction rsync flag set:
162 // -az --partial --delete --chmod=Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X.
163 // --delete: removed assets across versions don't accumulate on the
164 // target; the bundle stays self-contained per version.
165 // --chmod: F+X preserves the execute bit per-file (binaries land 0755,
166 // data files 0644) instead of a blanket 0755.
167 executor
168 .push(staged_release_dir, Path::new(&release_dir), &SyncOpts::release_mirror())
169 .await
170 .context("rsync failed (current symlink left intact)")?;
171
172 tracing::info!(node = %node.name, version, "deploy: symlink swap + service reload");
173 // Symlink swap is atomic via `mv -T` of a freshly-created symlink over the
174 // old one (the rename(2) is the atomic step; `ln -sfn` does unlink+symlink
175 // which has a window).
176 let swap_and_restart = format!(
177 "set -e; \
178 cd {root}; \
179 ln -sfn releases/{ver} current.new; \
180 mv -Tf current.new current; \
181 sudo /bin/systemctl reload-or-restart {svc}",
182 root = sh_quote(release_root),
183 ver = sh_quote(version),
184 svc = sh_quote(service),
185 );
186 run_checked(executor, &swap_and_restart, "symlink swap + systemctl reload-or-restart").await?;
187
188 if let Err(e) = gc_remote_releases(executor, release_root).await {
189 tracing::warn!(error = %e, "remote release GC failed (non-fatal)");
190 }
191
192 Ok(PathBuf::from(release_root).join("releases").join(version))
193 }
194
195 async fn gc_local_releases(release_root: &Path) -> Result<()> {
196 let releases = release_root.join("releases");
197 if !releases.exists() {
198 return Ok(());
199 }
200 let mut entries = Vec::new();
201 let mut rd = tokio::fs::read_dir(&releases).await?;
202 while let Some(entry) = rd.next_entry().await? {
203 if !entry.file_type().await?.is_dir() {
204 continue;
205 }
206 let meta = entry.metadata().await?;
207 entries.push((entry.path(), meta.modified()?));
208 }
209 entries.sort_by(|a, b| b.1.cmp(&a.1));
210 for (path, _) in entries.into_iter().skip(RELEASES_TO_KEEP) {
211 if let Err(e) = tokio::fs::remove_dir_all(&path).await {
212 tracing::warn!(path = %path.display(), error = %e, "gc: rm failed");
213 } else {
214 tracing::debug!(path = %path.display(), "gc: removed old release");
215 }
216 }
217 Ok(())
218 }
219
220 async fn gc_remote_releases(executor: &dyn Executor, release_root: &str) -> Result<()> {
221 // `ls -t` orders by mtime desc. Skip the first N, rm the rest. `xargs -r`
222 // is a no-op when stdin is empty (avoids `rm` complaining).
223 let script = format!(
224 "set -e; cd {root}/releases 2>/dev/null || exit 0; \
225 ls -1t | tail -n +{keep_plus_one} | xargs -r -I{{}} rm -rf -- {{}}",
226 root = sh_quote(release_root),
227 keep_plus_one = RELEASES_TO_KEEP + 1,
228 );
229 run_checked(executor, &script, "remote release gc").await.map(|_| ())
230 }
231
#[cfg(test)]
mod tests {
    use super::*;
    use ops_exec::{CapabilitySet, LocalExec, SshExec};
    use std::time::SystemTime;

    /// A LocalExec granted the default node capabilities (`deploy` + `restart`,
    /// plus `health` in the second token list).
    fn local_executor() -> LocalExec {
        LocalExec::new(CapabilitySet::from_tokens(["deploy", "restart"], ["health"]))
    }

    /// Two binaries land in the same release dir under their own file names,
    /// and both are reachable through the relative `current` symlink.
    #[tokio::test]
    async fn deploy_local_copies_multiple_binaries_and_swaps_symlink() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Source binaries with distinct contents so a mix-up would be caught.
        let src_dir = root.join("src");
        tokio::fs::create_dir_all(&src_dir).await.unwrap();
        let primary = src_dir.join("makenotwork");
        let admin = src_dir.join("mnw-admin");
        tokio::fs::write(&primary, b"PRIMARY").await.unwrap();
        tokio::fs::write(&admin, b"ADMIN").await.unwrap();

        let release_root = root.join("releases-root");
        tokio::fs::create_dir_all(&release_root).await.unwrap();

        let staged = deploy_local(
            &release_root,
            &"0.8.12".parse().unwrap(),
            &[primary.clone(), admin.clone()],
        )
        .await
        .expect("deploy_local should succeed");

        assert_eq!(staged, release_root.join("releases").join("0.8.12"));
        assert_eq!(tokio::fs::read(staged.join("makenotwork")).await.unwrap(), b"PRIMARY");
        assert_eq!(tokio::fs::read(staged.join("mnw-admin")).await.unwrap(), b"ADMIN");

        // The link target must be relative (`releases/<ver>`), not absolute.
        let current = release_root.join("current");
        let target = tokio::fs::read_link(&current).await.unwrap();
        assert_eq!(target.to_string_lossy(), "releases/0.8.12");
        let via_current = tokio::fs::read(current.join("makenotwork")).await.unwrap();
        assert_eq!(via_current, b"PRIMARY");
    }

    /// A second deploy re-points `current` at the new version while the
    /// previous release dir stays on disk (well under the GC threshold).
    #[tokio::test]
    async fn deploy_local_second_release_swaps_symlink_and_keeps_old_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let src_dir = root.join("src");
        tokio::fs::create_dir_all(&src_dir).await.unwrap();
        let bin = src_dir.join("server");
        tokio::fs::write(&bin, b"V1").await.unwrap();

        let release_root = root.join("rr");
        tokio::fs::create_dir_all(&release_root).await.unwrap();

        // Same source path, rewritten between deploys to tell the versions apart.
        deploy_local(&release_root, &"0.1.0".parse().unwrap(), &[bin.clone()]).await.unwrap();
        tokio::fs::write(&bin, b"V2").await.unwrap();
        deploy_local(&release_root, &"0.2.0".parse().unwrap(), &[bin.clone()]).await.unwrap();

        assert!(release_root.join("releases/0.1.0/server").exists());
        assert!(release_root.join("releases/0.2.0/server").exists());
        let target = tokio::fs::read_link(release_root.join("current")).await.unwrap();
        assert_eq!(target.to_string_lossy(), "releases/0.2.0");
        let via_current = tokio::fs::read(release_root.join("current/server")).await.unwrap();
        assert_eq!(via_current, b"V2");
    }

    /// With RELEASES_TO_KEEP + 3 dirs, the 3 oldest by mtime are pruned and the
    /// newest RELEASES_TO_KEEP survive.
    #[tokio::test]
    async fn gc_local_releases_keeps_last_n_by_mtime() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let releases = root.join("releases");
        tokio::fs::create_dir_all(&releases).await.unwrap();

        let total = RELEASES_TO_KEEP + 3;
        let mut names = Vec::new();
        for i in 0..total {
            let name = format!("v{i:02}");
            let dir = releases.join(&name);
            tokio::fs::create_dir(&dir).await.unwrap();
            // Pin explicit, strictly increasing mtimes (1s apart) through an
            // fd on the directory, so ordering doesn't depend on how fast the
            // dirs were created or on filesystem timestamp resolution.
            let f = std::fs::File::open(&dir).unwrap();
            let when = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000 + i as u64);
            let times = std::fs::FileTimes::new().set_modified(when);
            f.set_times(times).unwrap();
            names.push(name);
        }

        gc_local_releases(root).await.unwrap();

        // `names` is in ascending-mtime order, so the tail is the newest N.
        let surviving_expected: Vec<_> = names
            .iter()
            .skip(total - RELEASES_TO_KEEP)
            .cloned()
            .collect();
        for name in &surviving_expected {
            assert!(releases.join(name).exists(), "expected to survive: {name}");
        }
        for name in names.iter().take(total - RELEASES_TO_KEEP) {
            assert!(!releases.join(name).exists(), "expected to be pruned: {name}");
        }
    }

    /// Fewer dirs than RELEASES_TO_KEEP: nothing is removed.
    #[tokio::test]
    async fn gc_local_releases_noop_when_below_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let releases = root.join("releases");
        tokio::fs::create_dir_all(&releases).await.unwrap();
        for i in 0..3 {
            tokio::fs::create_dir(releases.join(format!("v{i}"))).await.unwrap();
        }
        gc_local_releases(root).await.unwrap();
        for i in 0..3 {
            assert!(releases.join(format!("v{i}")).exists());
        }
    }

    /// No `releases/` dir at all is not an error.
    #[tokio::test]
    async fn gc_local_releases_noop_when_releases_dir_missing() {
        let tmp = tempfile::tempdir().unwrap();
        gc_local_releases(tmp.path()).await.unwrap();
    }

    /// A remote deploy to an unroutable host returns an attributed error
    /// instead of panicking or hanging.
    #[tokio::test]
    async fn deploy_remote_fails_cleanly_when_host_unreachable() {
        // 192.0.2.0/24 is reserved for documentation and routes nowhere.
        // NOTE(review): the ~10s worst-case wallclock assumes SshExec passes
        // ConnectTimeout=10 to ssh; that setting lives in ops_exec, not here.
        let tmp = tempfile::tempdir().unwrap();
        let staged = tmp.path().join("releases").join("0.0.1");
        tokio::fs::create_dir_all(&staged).await.unwrap();
        tokio::fs::write(staged.join("server"), b"x").await.unwrap();

        let node = crate::topology::Node {
            name: "unreachable".into(),
            ssh_target: "deploy@192.0.2.1".into(),
            release_root: "/opt/never".into(),
            service_name: "makenotwork.service".into(),
            actuate: crate::topology::default_actuate(),
            observe: crate::topology::default_observe(),
        };
        let executor = SshExec::new(
            node.ssh_target.clone(),
            CapabilitySet::from_tokens(["deploy", "restart"], ["health"]),
        );

        let result = deploy_node(&executor, &node, "0.0.1", &staged, "server").await;
        let err = result.expect_err("deploy to unreachable host should fail");
        // `{:#}` renders the full anyhow context chain.
        let msg = format!("{err:#}");
        // Don't pin exact wording, just that the failure is attributed (ssh /
        // rsync / connection) and that no panic / hang happened.
        assert!(
            msg.contains("ssh")
                || msg.contains("rsync")
                || msg.contains("connection")
                || msg.contains("Connection"),
            "unexpected error: {msg}"
        );
    }

    /// ssh_target="local" takes the local fast path (symlink swap only) and
    /// returns the release dir under `node.release_root`.
    #[tokio::test]
    async fn deploy_node_with_local_ssh_target_swaps_symlink() {
        // ssh_target="local" routes to the local fast-path: just a symlink
        // swap, no remote calls.
        let tmp = tempfile::tempdir().unwrap();
        let release_root = tmp.path().to_path_buf();
        let staged = release_root.join("releases").join("0.0.1");
        tokio::fs::create_dir_all(&staged).await.unwrap();
        tokio::fs::write(staged.join("server"), b"x").await.unwrap();

        let node = crate::topology::Node {
            name: "local-dev".into(),
            ssh_target: "local".into(),
            release_root: release_root.to_string_lossy().into_owned(),
            service_name: "makenotwork.service".into(),
            actuate: crate::topology::default_actuate(),
            observe: crate::topology::default_observe(),
        };
        let executor = local_executor();

        let out = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap();
        assert_eq!(out, staged);
        let target = tokio::fs::read_link(release_root.join("current")).await.unwrap();
        assert_eq!(target.to_string_lossy(), "releases/0.0.1");
    }

    /// The executor's capability check rejects the deploy step when the
    /// `deploy` grant is missing.
    #[tokio::test]
    async fn deploy_node_denied_when_executor_lacks_deploy_grant() {
        // Defense in depth: an executor without the deploy grant refuses the
        // step before any filesystem / ssh action.
        let tmp = tempfile::tempdir().unwrap();
        let release_root = tmp.path().to_path_buf();
        let staged = release_root.join("releases").join("0.0.1");
        tokio::fs::create_dir_all(&staged).await.unwrap();

        let node = crate::topology::Node {
            name: "local-dev".into(),
            ssh_target: "local".into(),
            release_root: release_root.to_string_lossy().into_owned(),
            service_name: "makenotwork.service".into(),
            actuate: vec!["restart".into()], // no deploy
            observe: vec![],
        };
        let executor = LocalExec::new(CapabilitySet::from_tokens(["restart"], Vec::<&str>::new()));
        let err = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap_err();
        assert!(format!("{err:#}").contains("capability denied"), "expected capability denial");
    }
}
441