Skip to main content

max / makenotwork

8.5 KB · 240 lines History Blame Raw
1 //! Concrete [`Executor`] transports.
2 //!
3 //! - [`LocalExec`] — spawn on the local machine (Sando's `ssh_target = "local"`
4 //! fast-path; the agent's own in-process execution).
5 //! - [`SshExec`] — spawn `ssh` / `rsync` over the tailnet. Auth is the existing
6 //! SSH keys; the rsync push/pull is extracted near-verbatim from Sando's
7 //! `deploy.rs`.
8 //!
9 //! Both render a [`Step`] to one `/bin/sh` line and run it through the shared
10 //! [`RemoteHost`] command builder, so local and remote shell semantics are
11 //! identical. The capability gate is enforced here, caller-side, before any
12 //! command is dispatched.
13
14 use crate::capability::{CapabilityDenied, CapabilitySet};
15 use crate::executor::{Executor, SyncOpts, run_command_into_sink};
16 use crate::remote::{LogSink, RemoteHost, RunOutput, SSH_FLAGS, sh_quote};
17 use crate::step::Step;
18 use anyhow::{Context, Result};
19 use async_trait::async_trait;
20 use std::path::Path;
21 use tokio::process::Command;
22
23 /// Render a step to a single `/bin/sh` command line: optional `cd`, env
24 /// exports, then the body (a shell script verbatim, or sh-quoted argv).
25 fn render_shell_line(step: &Step) -> String {
26 let mut line = String::new();
27 if let Some(cwd) = &step.cwd {
28 line.push_str(&format!("cd {} && ", sh_quote(&cwd.to_string_lossy())));
29 }
30 for (k, v) in &step.env {
31 line.push_str(&format!("{}={} ", k, sh_quote(v)));
32 }
33 match step.shell_script() {
34 Some(script) => line.push_str(script),
35 None => {
36 let parts: Vec<String> = step.argv.iter().map(|a| sh_quote(a)).collect();
37 line.push_str(&parts.join(" "));
38 }
39 }
40 line
41 }
42
43 /// Enforce the caller-side capability gate, returning [`CapabilityDenied`] as
44 /// an `anyhow::Error` the caller can downcast for audit logging.
45 fn gate(caps: &CapabilitySet, host: &str, step: &Step) -> Result<()> {
46 if caps.permits(&step.action) {
47 Ok(())
48 } else {
49 Err(CapabilityDenied::new(host, &step.action).into())
50 }
51 }
52
53 /// Build the rsync `Command` shared by local and ssh push/pull. `remote_spec`
54 /// is either a plain path (local) or `target:path` (ssh).
55 fn rsync_command(src: &str, dst: &str, over_ssh: bool, opts: &SyncOpts) -> Command {
56 let mut rsync = Command::new("rsync");
57 rsync.arg("-az").arg("--partial");
58 if opts.delete {
59 rsync.arg("--delete");
60 }
61 if let Some(chmod) = &opts.chmod {
62 rsync.arg(format!("--chmod={chmod}"));
63 }
64 if over_ssh {
65 rsync.arg("-e").arg(format!("ssh {}", SSH_FLAGS.join(" ")));
66 }
67 rsync.arg(src).arg(dst);
68 rsync
69 }
70
71 async fn run_rsync(mut cmd: Command, what: &str) -> Result<()> {
72 let out = cmd.output().await.with_context(|| format!("spawning rsync ({what})"))?;
73 anyhow::ensure!(
74 out.status.success(),
75 "rsync {what} failed: {}",
76 String::from_utf8_lossy(&out.stderr),
77 );
78 Ok(())
79 }
80
81 /// The local-machine transport.
82 pub struct LocalExec {
83 host: RemoteHost,
84 caps: CapabilitySet,
85 }
86
87 impl LocalExec {
88 pub fn new(caps: CapabilitySet) -> Self {
89 Self { host: RemoteHost::new("local"), caps }
90 }
91 }
92
93 #[async_trait]
94 impl Executor for LocalExec {
95 async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput> {
96 gate(&self.caps, "local", step)?;
97 let cmd = self.host.command(&render_shell_line(step));
98 run_command_into_sink(cmd, sink).await
99 }
100
101 async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()> {
102 // Local "pull" is just a local rsync; trailing slash = contents.
103 let src = format!("{}/", remote.display());
104 run_rsync(rsync_command(&src, &local.to_string_lossy(), false, opts), "pull(local)").await
105 }
106
107 async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()> {
108 let src = format!("{}/", local.display());
109 run_rsync(rsync_command(&src, &remote.to_string_lossy(), false, opts), "push(local)").await
110 }
111
112 fn capabilities(&self) -> &CapabilitySet {
113 &self.caps
114 }
115 }
116
117 /// The SSH transport: a tailnet host reached with the existing SSH keys.
118 pub struct SshExec {
119 host: RemoteHost,
120 caps: CapabilitySet,
121 }
122
123 impl SshExec {
124 pub fn new(ssh_target: impl Into<String>, caps: CapabilitySet) -> Self {
125 Self { host: RemoteHost::new(ssh_target), caps }
126 }
127
128 pub fn ssh_target(&self) -> &str {
129 self.host.ssh_target()
130 }
131 }
132
133 #[async_trait]
134 impl Executor for SshExec {
135 async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput> {
136 gate(&self.caps, self.host.ssh_target(), step)?;
137 let cmd = self.host.command(&render_shell_line(step));
138 run_command_into_sink(cmd, sink).await
139 }
140
141 async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()> {
142 let src = format!("{}:{}/", self.host.ssh_target(), remote.display());
143 run_rsync(rsync_command(&src, &local.to_string_lossy(), true, opts), "pull(ssh)").await
144 }
145
146 async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()> {
147 let src = format!("{}/", local.display());
148 let dst = format!("{}:{}/", self.host.ssh_target(), remote.display());
149 run_rsync(rsync_command(&src, &dst, true, opts), "push(ssh)").await
150 }
151
152 fn capabilities(&self) -> &CapabilitySet {
153 &self.caps
154 }
155 }
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::step::Action;
    use std::sync::Arc;

    /// In-memory [`LogSink`] that appends every chunk it receives.
    #[derive(Default)]
    struct VecSink(Vec<u8>);

    #[async_trait]
    impl LogSink for VecSink {
        async fn write_chunk(&mut self, bytes: &[u8]) {
            self.0.extend_from_slice(bytes);
        }
    }

    /// A fresh, empty sink.
    fn vec_sink() -> VecSink {
        VecSink::default()
    }

    #[test]
    fn render_plain_argv_quotes_each_token() {
        let rendered = render_shell_line(&Step::new(Action::Build, ["echo", "a b", "c"]));
        assert_eq!(rendered, "'echo' 'a b' 'c'");
    }

    #[test]
    fn render_shell_script_is_verbatim_with_env_and_cwd() {
        let step = Step::shell(Action::Deploy, "set -e; echo hi")
            .with_env("K", "v v")
            .with_cwd("/tmp/x");
        let rendered = render_shell_line(&step);
        assert_eq!(rendered, "cd '/tmp/x' && K='v v' set -e; echo hi");
    }

    #[tokio::test]
    async fn local_exec_runs_granted_step() {
        let caps = CapabilitySet::actuate_only([Action::Deploy]);
        let exec = LocalExec::new(caps);
        let mut sink = vec_sink();
        let step = Step::shell(Action::Deploy, "printf ok");

        let out = exec.run_streaming(&step, &mut sink).await.unwrap();

        assert!(out.success());
        assert_eq!(sink.0, b"ok");
    }

    #[tokio::test]
    async fn local_exec_denies_ungranted_step_before_dispatch() {
        // Only deploy is granted, but the step asks to sign. The call must be
        // denied AND the command must never run: the marker file it would
        // create has to stay absent.
        let scratch = tempfile::tempdir().unwrap();
        let marker = scratch.path().join("ran");
        let exec = LocalExec::new(CapabilitySet::actuate_only([Action::Deploy]));
        let mut sink = vec_sink();
        let step = Step::shell(Action::Sign, format!("touch {}", marker.display()));

        let err = exec.run_streaming(&step, &mut sink).await.unwrap_err();

        let denied = err.downcast_ref::<CapabilityDenied>().expect("CapabilityDenied");
        assert_eq!(denied.action, "sign");
        assert!(!marker.exists(), "denied step must not execute");
    }

    #[tokio::test]
    async fn dyn_executor_object_is_usable() {
        // Shows the trait is object-safe and usable behind Arc<dyn Executor>.
        let caps = CapabilitySet::actuate_only([Action::Restart]);
        let exec: Arc<dyn Executor> = Arc::new(LocalExec::new(caps));
        assert!(exec.capabilities().permits(&Action::Restart));

        let mut sink = vec_sink();
        let step = Step::shell(Action::Restart, "true");
        let out = exec.run_streaming(&step, &mut sink).await.unwrap();
        assert!(out.success());
    }

    #[tokio::test]
    async fn local_push_and_pull_move_a_dir() {
        let scratch = tempfile::tempdir().unwrap();
        let src = scratch.path().join("src");
        let mid = scratch.path().join("mid");
        let dst = scratch.path().join("dst");
        tokio::fs::create_dir_all(&src).await.unwrap();
        tokio::fs::write(src.join("f.txt"), b"hi").await.unwrap();

        let exec = LocalExec::new(CapabilitySet::default());
        let opts = SyncOpts::default();

        // src -> mid via push, then mid -> dst via pull.
        exec.push(&src, &mid, &opts).await.unwrap();
        assert_eq!(tokio::fs::read(mid.join("f.txt")).await.unwrap(), b"hi");

        exec.pull(&mid, &dst, &opts).await.unwrap();
        assert_eq!(tokio::fs::read(dst.join("f.txt")).await.unwrap(), b"hi");
    }
}
240