//! Streaming command execution — local or over SSH. //! //! The single most valuable primitive both tools share. Sando's `deploy.rs` //! shelled out to `ssh`/`rsync` with buffered `.output()`; an app-build //! orchestrator instead needs the merged stdout+stderr streamed live (a //! `cargo tauri build` writes progress to stderr for minutes), so this is a //! proper streaming runner rather than a buffered one. //! //! A [`RemoteHost`] is either the local machine (`ssh_target == "local"` or //! empty) or a tailnet host reached over SSH. [`RemoteHost::run_streaming`] //! spawns the command, drains both pipes concurrently into a shared //! [`LogSink`], and returns the exit status plus the full captured bytes so a //! caller can still post-process the whole output (classify, grep a success //! banner, etc.). //! //! This is the low-level transport primitive. The capability-gated //! [`crate::Executor`] trait ([`crate::LocalExec`] / [`crate::SshExec`]) is the //! layer built on top of it. use anyhow::{Context, Result}; use async_trait::async_trait; use std::process::{ExitStatus, Stdio}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::process::Command; use tokio::sync::Mutex; /// SSH options used everywhere we shell out to ssh — fail fast, no prompts. /// Matches Sando's original `deploy.rs` so behavior is identical across tools. pub const SSH_FLAGS: &[&str] = &[ "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=accept-new", ]; /// A sink for streamed command output. Each chunk reflects a tokio read /// boundary — chunks are NOT line-aligned; consumers that want lines must /// reassemble. `ops_core::live_log::LiveLog` is the canonical implementation /// (append-to-disk + broadcast). /// /// `#[async_trait]` keeps the trait object-safe so it can be passed as /// `&mut dyn LogSink` into the [`crate::Executor`] trait while still being /// usable behind an `Arc>` by [`RemoteHost::run_streaming`]. #[async_trait] pub trait LogSink: Send { async fn write_chunk(&mut self, bytes: &[u8]); } /// A build host: the local machine or an SSH target (a tailnet alias such as /// `mbp`, or `user@host`). #[derive(Debug, Clone)] pub struct RemoteHost { ssh_target: String, } /// Result of a streamed run: the exit status plus the full captured stdout and /// stderr byte streams (already forwarded to the sink as they arrived). #[derive(Debug)] pub struct RunOutput { pub status: ExitStatus, pub stdout: Vec, pub stderr: Vec, } impl RunOutput { pub fn success(&self) -> bool { self.status.success() } } impl RemoteHost { /// `"local"` (or empty) runs commands directly via `sh -c`; anything else /// is an SSH target. pub fn new(ssh_target: impl Into) -> Self { Self { ssh_target: ssh_target.into() } } pub fn is_local(&self) -> bool { self.ssh_target == "local" || self.ssh_target.is_empty() } pub fn ssh_target(&self) -> &str { &self.ssh_target } /// Build the `Command` that runs `script` as a single `/bin/sh` program, /// locally or over SSH. The remote side runs `script` as the argument to /// the login shell (ssh joins argv with spaces and hands it to the shell), /// so multi-statement scripts and pipes work the same as locally. pub(crate) fn command(&self, script: &str) -> Command { if self.is_local() { let mut cmd = Command::new("sh"); cmd.arg("-c").arg(script); cmd } else { let mut cmd = Command::new("ssh"); cmd.args(SSH_FLAGS).arg(&self.ssh_target).arg(script); cmd } } /// Spawn `script`, stream its merged output into `sink`, and return the /// exit status plus captured bytes. `kill_on_drop` means cancelling the /// caller's task (e.g. a newer build superseding this one) SIGKILLs the /// child and — for SSH — drops the connection. pub async fn run_streaming(&self, script: &str, sink: Arc>) -> Result where S: LogSink + Send + 'static, { let mut child = self .command(script) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true) .spawn() .with_context(|| format!("spawning command on {}", self.ssh_target))?; let stdout_task = tokio::spawn(drain(child.stdout.take(), sink.clone())); let stderr_task = tokio::spawn(drain(child.stderr.take(), sink.clone())); let status = child.wait().await.context("waiting on child")?; let stdout = stdout_task.await.unwrap_or_default(); let stderr = stderr_task.await.unwrap_or_default(); Ok(RunOutput { status, stdout, stderr }) } } /// Drain `stream` into the shared sink and return the concatenated bytes. async fn drain(stream: Option, sink: Arc>) -> Vec where R: AsyncRead + Unpin + Send + 'static, S: LogSink + Send + 'static, { let mut total = Vec::new(); let Some(mut s) = stream else { return total }; let mut buf = [0u8; 4096]; loop { match s.read(&mut buf).await { Ok(0) | Err(_) => break, Ok(n) => { total.extend_from_slice(&buf[..n]); sink.lock().await.write_chunk(&buf[..n]).await; } } } total } /// Single-quote a string for safe inclusion in a `/bin/sh` command, escaping /// any embedded single quote. Not bulletproof for adversarial input, but every /// path here comes from our own config files. pub fn sh_quote(s: &str) -> String { let escaped = s.replace('\'', r"'\''"); format!("'{escaped}'") } #[cfg(test)] mod tests { use super::*; /// A simple sink that accumulates every chunk for assertions. #[derive(Default)] pub(crate) struct VecSink(pub Vec); #[async_trait] impl LogSink for VecSink { async fn write_chunk(&mut self, bytes: &[u8]) { self.0.extend_from_slice(bytes); } } #[test] fn sh_quote_escapes() { assert_eq!(sh_quote("hello"), "'hello'"); assert_eq!(sh_quote("it's"), r"'it'\''s'"); } #[test] fn local_detection() { assert!(RemoteHost::new("local").is_local()); assert!(RemoteHost::new("").is_local()); assert!(!RemoteHost::new("mbp").is_local()); } #[tokio::test] async fn local_run_streams_stdout_and_captures_status() { let host = RemoteHost::new("local"); let sink = Arc::new(Mutex::new(VecSink::default())); let out = host .run_streaming("printf 'hello '; printf 'world'", sink.clone()) .await .unwrap(); assert!(out.success()); assert_eq!(out.stdout, b"hello world"); assert_eq!(sink.lock().await.0, b"hello world"); } #[tokio::test] async fn local_run_streams_stderr_too() { let host = RemoteHost::new("local"); let sink = Arc::new(Mutex::new(VecSink::default())); let out = host .run_streaming("echo out; echo err 1>&2", sink.clone()) .await .unwrap(); assert!(out.success()); assert_eq!(out.stdout, b"out\n"); assert_eq!(out.stderr, b"err\n"); let seen = sink.lock().await.0.clone(); assert!(seen.windows(4).any(|w| w == b"out\n")); assert!(seen.windows(4).any(|w| w == b"err\n")); } #[tokio::test] async fn local_run_reports_nonzero_exit() { let host = RemoteHost::new("local"); let sink = Arc::new(Mutex::new(VecSink::default())); let out = host.run_streaming("exit 3", sink).await.unwrap(); assert!(!out.success()); assert_eq!(out.status.code(), Some(3)); } }