//! Atomic symlink-swap deploys. //! //! Layout on every target (local host, A nodes, B nodes, ...): //! //! / //! releases/ //! 0.8.1/ //! //! 0.8.2/ //! //! current -> releases/0.8.2 //! //! `ln -sfn` swaps the symlink. systemd units point at //! `/current/` so reload-or-restart picks up the new //! binary without ever pointing at a missing path. //! //! The host-side transport — `ssh` for shell steps, `rsync` for the release //! dir — comes from the shared [`ops_exec::Executor`] (a `LocalExec` for //! `ssh_target = "local"`, an `SshExec` otherwise), built once per node in //! [`crate::state`]. This module owns the *deploy choreography* (mkdir, push, //! atomic swap, restart, gc); the transport is the crate's. SSH push behavior //! is identical to the pre-extraction code — this is a transport extraction, //! not a model change. use crate::topology::Node; use anyhow::{Context, Result}; use async_trait::async_trait; use ops_exec::{Action, Executor, LogSink, RunOutput, Step, SyncOpts, sh_quote}; use std::path::{Path, PathBuf}; use tokio::process::Command; /// Keep this many release dirs per node; older ones get gc'd after a /// successful deploy. Fixed for now; promote to config if the constant ever /// needs to vary by tier. const RELEASES_TO_KEEP: usize = 5; /// A sink that drops streamed bytes. Deploy steps don't have a live-log handle /// (gates do), so output is discarded as it streams; [`RunOutput`] still /// captures the full stdout/stderr for error reporting, preserving the /// pre-extraction behavior of surfacing `stderr` in failure messages. struct DiscardSink; #[async_trait] impl LogSink for DiscardSink { async fn write_chunk(&mut self, _bytes: &[u8]) {} } /// Run a shell step through `executor`, treating a non-zero exit as an error /// whose message carries the captured stderr — exactly as the old bespoke /// `ssh()` helper did (`ssh failed: `). async fn run_checked(executor: &dyn Executor, script: &str, what: &str) -> Result { let step = Step::shell(Action::Deploy, script); let mut sink = DiscardSink; let out = executor .run_streaming(&step, &mut sink) .await .with_context(|| format!("{what}: spawning command"))?; anyhow::ensure!( out.status.success(), "{what} failed (exit {}): {}", out.status.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into()), String::from_utf8_lossy(&out.stderr), ); Ok(out) } pub async fn deploy_local( release_root: &Path, version: &crate::domain::Version, binaries: &[PathBuf], ) -> Result { let release_dir = release_root.join("releases").join(version.to_string()); tokio::fs::create_dir_all(&release_dir).await?; for binary in binaries { let name = binary.file_name() .context("binary path has no file name")?; let dest = release_dir.join(name); tokio::fs::copy(binary, &dest) .await .with_context(|| format!("copy {} -> {}", binary.display(), dest.display()))?; } let current = release_root.join("current"); let target = format!("releases/{version}"); let out = Command::new("ln") .args(["-sfn", &target]) .arg(¤t) .output() .await?; anyhow::ensure!( out.status.success(), "symlink swap failed: {}", String::from_utf8_lossy(&out.stderr), ); if let Err(e) = gc_local_releases(release_root).await { tracing::warn!(error = %e, "local release GC failed (non-fatal)"); } Ok(release_dir) } /// Deploy `staged_release_dir` (a directory built on the Sando host by /// `deploy_local`) to `node` using `executor` (its transport from the topology /// executor map). For `ssh_target=local`, this is just a symlink swap; for /// remote nodes, we rsync the whole dir over the executor. /// /// `primary_bin` is only used for logging — every file present in the staged /// dir gets shipped. pub async fn deploy_node( executor: &dyn Executor, node: &Node, version: &str, staged_release_dir: &Path, primary_bin: &str, ) -> Result { if node.ssh_target == "local" || node.ssh_target.is_empty() { // Local deploy already happened when we staged on the Sando host. // Just re-point `current` at the staged dir. return reset_local_current(executor, Path::new(&node.release_root), version).await; } deploy_remote(executor, node, version, staged_release_dir, primary_bin).await } async fn reset_local_current( executor: &dyn Executor, release_root: &Path, version: &str, ) -> Result { let current = release_root.join("current"); let target = format!("releases/{version}"); run_checked( executor, &format!("ln -sfn {} {}", sh_quote(&target), sh_quote(¤t.to_string_lossy())), "local symlink swap", ) .await?; Ok(release_root.join("releases").join(version)) } async fn deploy_remote( executor: &dyn Executor, node: &Node, version: &str, staged_release_dir: &Path, primary_bin: &str, ) -> Result { let release_root = &node.release_root; let service = &node.service_name; let release_dir = format!("{release_root}/releases/{version}"); tracing::info!(node = %node.name, version, "deploy: mkdir release dir"); run_checked( executor, &format!("set -e; mkdir -p {q}", q = sh_quote(&release_dir)), "creating remote release dir", ) .await?; tracing::info!(node = %node.name, version, primary = %primary_bin, "deploy: rsync release dir"); // Rsync the whole staged dir (binaries + every release_contents entry). // `SyncOpts::release_mirror()` is the exact pre-extraction rsync flag set: // -az --partial --delete --chmod=Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X. // --delete: removed assets across versions don't accumulate on the // target; the bundle stays self-contained per version. // --chmod: F+X preserves the execute bit per-file (binaries land 0755, // data files 0644) instead of a blanket 0755. executor .push(staged_release_dir, Path::new(&release_dir), &SyncOpts::release_mirror()) .await .context("rsync failed (current symlink left intact)")?; tracing::info!(node = %node.name, version, "deploy: symlink swap + service reload"); // Symlink swap is atomic via `mv -T` of a freshly-created symlink over the // old one (the rename(2) is the atomic step; `ln -sfn` does unlink+symlink // which has a window). let swap_and_restart = format!( "set -e; \ cd {root}; \ ln -sfn releases/{ver} current.new; \ mv -Tf current.new current; \ sudo /bin/systemctl reload-or-restart {svc}", root = sh_quote(release_root), ver = sh_quote(version), svc = sh_quote(service), ); run_checked(executor, &swap_and_restart, "symlink swap + systemctl reload-or-restart").await?; if let Err(e) = gc_remote_releases(executor, release_root).await { tracing::warn!(error = %e, "remote release GC failed (non-fatal)"); } Ok(PathBuf::from(release_root).join("releases").join(version)) } async fn gc_local_releases(release_root: &Path) -> Result<()> { let releases = release_root.join("releases"); if !releases.exists() { return Ok(()); } let mut entries = Vec::new(); let mut rd = tokio::fs::read_dir(&releases).await?; while let Some(entry) = rd.next_entry().await? { if !entry.file_type().await?.is_dir() { continue; } let meta = entry.metadata().await?; entries.push((entry.path(), meta.modified()?)); } entries.sort_by(|a, b| b.1.cmp(&a.1)); for (path, _) in entries.into_iter().skip(RELEASES_TO_KEEP) { if let Err(e) = tokio::fs::remove_dir_all(&path).await { tracing::warn!(path = %path.display(), error = %e, "gc: rm failed"); } else { tracing::debug!(path = %path.display(), "gc: removed old release"); } } Ok(()) } async fn gc_remote_releases(executor: &dyn Executor, release_root: &str) -> Result<()> { // `ls -t` orders by mtime desc. Skip the first N, rm the rest. `xargs -r` // is a no-op when stdin is empty (avoids `rm` complaining). let script = format!( "set -e; cd {root}/releases 2>/dev/null || exit 0; \ ls -1t | tail -n +{keep_plus_one} | xargs -r -I{{}} rm -rf -- {{}}", root = sh_quote(release_root), keep_plus_one = RELEASES_TO_KEEP + 1, ); run_checked(executor, &script, "remote release gc").await.map(|_| ()) } #[cfg(test)] mod tests { use super::*; use ops_exec::{CapabilitySet, LocalExec, SshExec}; use std::time::SystemTime; /// A LocalExec granted the default node capabilities (deploy + restart). fn local_executor() -> LocalExec { LocalExec::new(CapabilitySet::from_tokens(["deploy", "restart"], ["health"])) } #[tokio::test] async fn deploy_local_copies_multiple_binaries_and_swaps_symlink() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let src_dir = root.join("src"); tokio::fs::create_dir_all(&src_dir).await.unwrap(); let primary = src_dir.join("makenotwork"); let admin = src_dir.join("mnw-admin"); tokio::fs::write(&primary, b"PRIMARY").await.unwrap(); tokio::fs::write(&admin, b"ADMIN").await.unwrap(); let release_root = root.join("releases-root"); tokio::fs::create_dir_all(&release_root).await.unwrap(); let staged = deploy_local( &release_root, &"0.8.12".parse().unwrap(), &[primary.clone(), admin.clone()], ) .await .expect("deploy_local should succeed"); assert_eq!(staged, release_root.join("releases").join("0.8.12")); assert_eq!(tokio::fs::read(staged.join("makenotwork")).await.unwrap(), b"PRIMARY"); assert_eq!(tokio::fs::read(staged.join("mnw-admin")).await.unwrap(), b"ADMIN"); let current = release_root.join("current"); let target = tokio::fs::read_link(¤t).await.unwrap(); assert_eq!(target.to_string_lossy(), "releases/0.8.12"); let via_current = tokio::fs::read(current.join("makenotwork")).await.unwrap(); assert_eq!(via_current, b"PRIMARY"); } #[tokio::test] async fn deploy_local_second_release_swaps_symlink_and_keeps_old_dir() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let src_dir = root.join("src"); tokio::fs::create_dir_all(&src_dir).await.unwrap(); let bin = src_dir.join("server"); tokio::fs::write(&bin, b"V1").await.unwrap(); let release_root = root.join("rr"); tokio::fs::create_dir_all(&release_root).await.unwrap(); deploy_local(&release_root, &"0.1.0".parse().unwrap(), &[bin.clone()]).await.unwrap(); tokio::fs::write(&bin, b"V2").await.unwrap(); deploy_local(&release_root, &"0.2.0".parse().unwrap(), &[bin.clone()]).await.unwrap(); assert!(release_root.join("releases/0.1.0/server").exists()); assert!(release_root.join("releases/0.2.0/server").exists()); let target = tokio::fs::read_link(release_root.join("current")).await.unwrap(); assert_eq!(target.to_string_lossy(), "releases/0.2.0"); let via_current = tokio::fs::read(release_root.join("current/server")).await.unwrap(); assert_eq!(via_current, b"V2"); } #[tokio::test] async fn gc_local_releases_keeps_last_n_by_mtime() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let releases = root.join("releases"); tokio::fs::create_dir_all(&releases).await.unwrap(); let total = RELEASES_TO_KEEP + 3; let mut names = Vec::new(); for i in 0..total { let name = format!("v{i:02}"); let dir = releases.join(&name); tokio::fs::create_dir(&dir).await.unwrap(); let f = std::fs::File::open(&dir).unwrap(); let when = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000 + i as u64); let times = std::fs::FileTimes::new().set_modified(when); f.set_times(times).unwrap(); names.push(name); } gc_local_releases(root).await.unwrap(); let surviving_expected: Vec<_> = names .iter() .skip(total - RELEASES_TO_KEEP) .cloned() .collect(); for name in &surviving_expected { assert!(releases.join(name).exists(), "expected to survive: {name}"); } for name in names.iter().take(total - RELEASES_TO_KEEP) { assert!(!releases.join(name).exists(), "expected to be pruned: {name}"); } } #[tokio::test] async fn gc_local_releases_noop_when_below_threshold() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let releases = root.join("releases"); tokio::fs::create_dir_all(&releases).await.unwrap(); for i in 0..3 { tokio::fs::create_dir(releases.join(format!("v{i}"))).await.unwrap(); } gc_local_releases(root).await.unwrap(); for i in 0..3 { assert!(releases.join(format!("v{i}")).exists()); } } #[tokio::test] async fn gc_local_releases_noop_when_releases_dir_missing() { let tmp = tempfile::tempdir().unwrap(); gc_local_releases(tmp.path()).await.unwrap(); } #[tokio::test] async fn deploy_remote_fails_cleanly_when_host_unreachable() { // 192.0.2.0/24 is reserved for documentation and routes nowhere. // ConnectTimeout=10 limits the test wallclock to ~10s worst case. let tmp = tempfile::tempdir().unwrap(); let staged = tmp.path().join("releases").join("0.0.1"); tokio::fs::create_dir_all(&staged).await.unwrap(); tokio::fs::write(staged.join("server"), b"x").await.unwrap(); let node = crate::topology::Node { name: "unreachable".into(), ssh_target: "deploy@192.0.2.1".into(), release_root: "/opt/never".into(), service_name: "makenotwork.service".into(), actuate: crate::topology::default_actuate(), observe: crate::topology::default_observe(), }; let executor = SshExec::new( node.ssh_target.clone(), CapabilitySet::from_tokens(["deploy", "restart"], ["health"]), ); let result = deploy_node(&executor, &node, "0.0.1", &staged, "server").await; let err = result.expect_err("deploy to unreachable host should fail"); let msg = format!("{err:#}"); // Don't pin exact wording, just that the failure is attributed (ssh / // rsync / connection) and that no panic / hang happened. assert!( msg.contains("ssh") || msg.contains("rsync") || msg.contains("connection") || msg.contains("Connection"), "unexpected error: {msg}" ); } #[tokio::test] async fn deploy_node_with_local_ssh_target_swaps_symlink() { // ssh_target="local" routes to the local fast-path: just a symlink // swap, no remote calls. let tmp = tempfile::tempdir().unwrap(); let release_root = tmp.path().to_path_buf(); let staged = release_root.join("releases").join("0.0.1"); tokio::fs::create_dir_all(&staged).await.unwrap(); tokio::fs::write(staged.join("server"), b"x").await.unwrap(); let node = crate::topology::Node { name: "local-dev".into(), ssh_target: "local".into(), release_root: release_root.to_string_lossy().into_owned(), service_name: "makenotwork.service".into(), actuate: crate::topology::default_actuate(), observe: crate::topology::default_observe(), }; let executor = local_executor(); let out = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap(); assert_eq!(out, staged); let target = tokio::fs::read_link(release_root.join("current")).await.unwrap(); assert_eq!(target.to_string_lossy(), "releases/0.0.1"); } #[tokio::test] async fn deploy_node_denied_when_executor_lacks_deploy_grant() { // Defense in depth: an executor without the deploy grant refuses the // step before any filesystem / ssh action. let tmp = tempfile::tempdir().unwrap(); let release_root = tmp.path().to_path_buf(); let staged = release_root.join("releases").join("0.0.1"); tokio::fs::create_dir_all(&staged).await.unwrap(); let node = crate::topology::Node { name: "local-dev".into(), ssh_target: "local".into(), release_root: release_root.to_string_lossy().into_owned(), service_name: "makenotwork.service".into(), actuate: vec!["restart".into()], // no deploy observe: vec![], }; let executor = LocalExec::new(CapabilitySet::from_tokens(["restart"], Vec::<&str>::new())); let err = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap_err(); assert!(format!("{err:#}").contains("capability denied"), "expected capability denial"); } }