//! Concrete [`Executor`] transports. //! //! - [`LocalExec`] — spawn on the local machine (Sando's `ssh_target = "local"` //! fast-path; the agent's own in-process execution). //! - [`SshExec`] — spawn `ssh` / `rsync` over the tailnet. Auth is the existing //! SSH keys; the rsync push/pull is extracted near-verbatim from Sando's //! `deploy.rs`. //! //! Both render a [`Step`] to one `/bin/sh` line and run it through the shared //! [`RemoteHost`] command builder, so local and remote shell semantics are //! identical. The capability gate is enforced here, caller-side, before any //! command is dispatched. use crate::capability::{CapabilityDenied, CapabilitySet}; use crate::executor::{Executor, SyncOpts, run_command_into_sink}; use crate::remote::{LogSink, RemoteHost, RunOutput, SSH_FLAGS, sh_quote}; use crate::step::Step; use anyhow::{Context, Result}; use async_trait::async_trait; use std::path::Path; use tokio::process::Command; /// Render a step to a single `/bin/sh` command line: optional `cd`, env /// exports, then the body (a shell script verbatim, or sh-quoted argv). fn render_shell_line(step: &Step) -> String { let mut line = String::new(); if let Some(cwd) = &step.cwd { line.push_str(&format!("cd {} && ", sh_quote(&cwd.to_string_lossy()))); } for (k, v) in &step.env { line.push_str(&format!("{}={} ", k, sh_quote(v))); } match step.shell_script() { Some(script) => line.push_str(script), None => { let parts: Vec = step.argv.iter().map(|a| sh_quote(a)).collect(); line.push_str(&parts.join(" ")); } } line } /// Enforce the caller-side capability gate, returning [`CapabilityDenied`] as /// an `anyhow::Error` the caller can downcast for audit logging. fn gate(caps: &CapabilitySet, host: &str, step: &Step) -> Result<()> { if caps.permits(&step.action) { Ok(()) } else { Err(CapabilityDenied::new(host, &step.action).into()) } } /// Build the rsync `Command` shared by local and ssh push/pull. `remote_spec` /// is either a plain path (local) or `target:path` (ssh). fn rsync_command(src: &str, dst: &str, over_ssh: bool, opts: &SyncOpts) -> Command { let mut rsync = Command::new("rsync"); rsync.arg("-az").arg("--partial"); if opts.delete { rsync.arg("--delete"); } if let Some(chmod) = &opts.chmod { rsync.arg(format!("--chmod={chmod}")); } if over_ssh { rsync.arg("-e").arg(format!("ssh {}", SSH_FLAGS.join(" "))); } rsync.arg(src).arg(dst); rsync } async fn run_rsync(mut cmd: Command, what: &str) -> Result<()> { let out = cmd.output().await.with_context(|| format!("spawning rsync ({what})"))?; anyhow::ensure!( out.status.success(), "rsync {what} failed: {}", String::from_utf8_lossy(&out.stderr), ); Ok(()) } /// The local-machine transport. pub struct LocalExec { host: RemoteHost, caps: CapabilitySet, } impl LocalExec { pub fn new(caps: CapabilitySet) -> Self { Self { host: RemoteHost::new("local"), caps } } } #[async_trait] impl Executor for LocalExec { async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result { gate(&self.caps, "local", step)?; let cmd = self.host.command(&render_shell_line(step)); run_command_into_sink(cmd, sink).await } async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()> { // Local "pull" is just a local rsync; trailing slash = contents. let src = format!("{}/", remote.display()); run_rsync(rsync_command(&src, &local.to_string_lossy(), false, opts), "pull(local)").await } async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()> { let src = format!("{}/", local.display()); run_rsync(rsync_command(&src, &remote.to_string_lossy(), false, opts), "push(local)").await } fn capabilities(&self) -> &CapabilitySet { &self.caps } } /// The SSH transport: a tailnet host reached with the existing SSH keys. pub struct SshExec { host: RemoteHost, caps: CapabilitySet, } impl SshExec { pub fn new(ssh_target: impl Into, caps: CapabilitySet) -> Self { Self { host: RemoteHost::new(ssh_target), caps } } pub fn ssh_target(&self) -> &str { self.host.ssh_target() } } #[async_trait] impl Executor for SshExec { async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result { gate(&self.caps, self.host.ssh_target(), step)?; let cmd = self.host.command(&render_shell_line(step)); run_command_into_sink(cmd, sink).await } async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()> { let src = format!("{}:{}/", self.host.ssh_target(), remote.display()); run_rsync(rsync_command(&src, &local.to_string_lossy(), true, opts), "pull(ssh)").await } async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()> { let src = format!("{}/", local.display()); let dst = format!("{}:{}/", self.host.ssh_target(), remote.display()); run_rsync(rsync_command(&src, &dst, true, opts), "push(ssh)").await } fn capabilities(&self) -> &CapabilitySet { &self.caps } } #[cfg(test)] mod tests { use super::*; use crate::step::Action; use std::sync::Arc; #[derive(Default)] struct VecSink(Vec); #[async_trait] impl LogSink for VecSink { async fn write_chunk(&mut self, bytes: &[u8]) { self.0.extend_from_slice(bytes); } } fn vec_sink() -> VecSink { VecSink::default() } #[test] fn render_plain_argv_quotes_each_token() { let step = Step::new(Action::Build, ["echo", "a b", "c"]); assert_eq!(render_shell_line(&step), "'echo' 'a b' 'c'"); } #[test] fn render_shell_script_is_verbatim_with_env_and_cwd() { let step = Step::shell(Action::Deploy, "set -e; echo hi") .with_env("K", "v v") .with_cwd("/tmp/x"); assert_eq!(render_shell_line(&step), "cd '/tmp/x' && K='v v' set -e; echo hi"); } #[tokio::test] async fn local_exec_runs_granted_step() { let exec = LocalExec::new(CapabilitySet::actuate_only([Action::Deploy])); let mut sink = vec_sink(); let step = Step::shell(Action::Deploy, "printf ok"); let out = exec.run_streaming(&step, &mut sink).await.unwrap(); assert!(out.success()); assert_eq!(sink.0, b"ok"); } #[tokio::test] async fn local_exec_denies_ungranted_step_before_dispatch() { // Grant deploy only; ask it to sign. Must deny, and must NOT run the // command (the file the command would create must not appear). let dir = tempfile::tempdir().unwrap(); let marker = dir.path().join("ran"); let exec = LocalExec::new(CapabilitySet::actuate_only([Action::Deploy])); let mut sink = vec_sink(); let step = Step::shell(Action::Sign, format!("touch {}", marker.display())); let err = exec.run_streaming(&step, &mut sink).await.unwrap_err(); let denied = err.downcast_ref::().expect("CapabilityDenied"); assert_eq!(denied.action, "sign"); assert!(!marker.exists(), "denied step must not execute"); } #[tokio::test] async fn dyn_executor_object_is_usable() { // Prove the trait is object-safe and Arc works. let exec: Arc = Arc::new(LocalExec::new(CapabilitySet::actuate_only([Action::Restart]))); assert!(exec.capabilities().permits(&Action::Restart)); let mut sink = vec_sink(); let out = exec.run_streaming(&Step::shell(Action::Restart, "true"), &mut sink).await.unwrap(); assert!(out.success()); } #[tokio::test] async fn local_push_and_pull_move_a_dir() { let dir = tempfile::tempdir().unwrap(); let src = dir.path().join("src"); let mid = dir.path().join("mid"); let dst = dir.path().join("dst"); tokio::fs::create_dir_all(&src).await.unwrap(); tokio::fs::write(src.join("f.txt"), b"hi").await.unwrap(); let exec = LocalExec::new(CapabilitySet::default()); exec.push(&src, &mid, &SyncOpts::default()).await.unwrap(); assert_eq!(tokio::fs::read(mid.join("f.txt")).await.unwrap(), b"hi"); exec.pull(&mid, &dst, &SyncOpts::default()).await.unwrap(); assert_eq!(tokio::fs::read(dst.join("f.txt")).await.unwrap(), b"hi"); } }