Skip to main content

max / makenotwork

7.7 KB · 224 lines History Blame Raw
1 //! Streaming command execution — local or over SSH.
2 //!
3 //! The single most valuable primitive both tools share. Sando's `deploy.rs`
4 //! shelled out to `ssh`/`rsync` with buffered `.output()`; an app-build
5 //! orchestrator instead needs the merged stdout+stderr streamed live (a
6 //! `cargo tauri build` writes progress to stderr for minutes), so this is a
7 //! proper streaming runner rather than a buffered one.
8 //!
9 //! A [`RemoteHost`] is either the local machine (`ssh_target == "local"` or
10 //! empty) or a tailnet host reached over SSH. [`RemoteHost::run_streaming`]
11 //! spawns the command, drains both pipes concurrently into a shared
12 //! [`LogSink`], and returns the exit status plus the full captured bytes so a
13 //! caller can still post-process the whole output (classify, grep a success
14 //! banner, etc.).
15 //!
16 //! This is the low-level transport primitive. The capability-gated
17 //! [`crate::Executor`] trait ([`crate::LocalExec`] / [`crate::SshExec`]) is the
18 //! layer built on top of it.
19
20 use anyhow::{Context, Result};
21 use async_trait::async_trait;
22 use std::process::{ExitStatus, Stdio};
23 use std::sync::Arc;
24 use tokio::io::{AsyncRead, AsyncReadExt};
25 use tokio::process::Command;
26 use tokio::sync::Mutex;
27
/// Options placed on every `ssh` command line this module builds, so that a
/// connection either works unattended or fails quickly:
///
/// * `BatchMode=yes` — never stop to ask for a password or passphrase.
/// * `ConnectTimeout=10` — stop waiting for an unreachable host after 10 s.
/// * `StrictHostKeyChecking=accept-new` — record a host key the first time a
///   host is seen, but refuse a host whose recorded key has changed.
///
/// The list mirrors Sando's original `deploy.rs` so both tools behave the same.
#[rustfmt::skip] // keep each `-o` next to the option it introduces
pub const SSH_FLAGS: &[&str] = &[
    "-o", "BatchMode=yes",
    "-o", "ConnectTimeout=10",
    "-o", "StrictHostKeyChecking=accept-new",
];
38
39 /// A sink for streamed command output. Each chunk reflects a tokio read
40 /// boundary — chunks are NOT line-aligned; consumers that want lines must
41 /// reassemble. `ops_core::live_log::LiveLog` is the canonical implementation
42 /// (append-to-disk + broadcast).
43 ///
44 /// `#[async_trait]` keeps the trait object-safe so it can be passed as
45 /// `&mut dyn LogSink` into the [`crate::Executor`] trait while still being
46 /// usable behind an `Arc<Mutex<_>>` by [`RemoteHost::run_streaming`].
47 #[async_trait]
48 pub trait LogSink: Send {
49 async fn write_chunk(&mut self, bytes: &[u8]);
50 }
51
/// Where a build command runs: on this machine, or on a host reached over SSH
/// (named by a tailnet alias such as `mbp`, or by `user@host`).
#[derive(Clone, Debug)]
pub struct RemoteHost {
    /// `"local"` or the empty string selects this machine; any other value is
    /// used as the SSH destination.
    ssh_target: String,
}
58
/// What a streamed run produced: the process exit status together with every
/// byte read from stdout and from stderr. The same bytes were already handed
/// to the sink while the command was running; they are kept here so a caller
/// can still inspect the complete output afterwards.
#[derive(Debug)]
pub struct RunOutput {
    /// Exit status of the spawned process.
    pub status: ExitStatus,
    /// All bytes read from the child's stdout.
    pub stdout: Vec<u8>,
    /// All bytes read from the child's stderr.
    pub stderr: Vec<u8>,
}

impl RunOutput {
    /// Shorthand for [`ExitStatus::success`] on [`RunOutput::status`].
    pub fn success(&self) -> bool {
        ExitStatus::success(&self.status)
    }
}
73
74 impl RemoteHost {
75 /// `"local"` (or empty) runs commands directly via `sh -c`; anything else
76 /// is an SSH target.
77 pub fn new(ssh_target: impl Into<String>) -> Self {
78 Self { ssh_target: ssh_target.into() }
79 }
80
81 pub fn is_local(&self) -> bool {
82 self.ssh_target == "local" || self.ssh_target.is_empty()
83 }
84
85 pub fn ssh_target(&self) -> &str {
86 &self.ssh_target
87 }
88
89 /// Build the `Command` that runs `script` as a single `/bin/sh` program,
90 /// locally or over SSH. The remote side runs `script` as the argument to
91 /// the login shell (ssh joins argv with spaces and hands it to the shell),
92 /// so multi-statement scripts and pipes work the same as locally.
93 pub(crate) fn command(&self, script: &str) -> Command {
94 if self.is_local() {
95 let mut cmd = Command::new("sh");
96 cmd.arg("-c").arg(script);
97 cmd
98 } else {
99 let mut cmd = Command::new("ssh");
100 cmd.args(SSH_FLAGS).arg(&self.ssh_target).arg(script);
101 cmd
102 }
103 }
104
105 /// Spawn `script`, stream its merged output into `sink`, and return the
106 /// exit status plus captured bytes. `kill_on_drop` means cancelling the
107 /// caller's task (e.g. a newer build superseding this one) SIGKILLs the
108 /// child and — for SSH — drops the connection.
109 pub async fn run_streaming<S>(&self, script: &str, sink: Arc<Mutex<S>>) -> Result<RunOutput>
110 where
111 S: LogSink + Send + 'static,
112 {
113 let mut child = self
114 .command(script)
115 .stdout(Stdio::piped())
116 .stderr(Stdio::piped())
117 .kill_on_drop(true)
118 .spawn()
119 .with_context(|| format!("spawning command on {}", self.ssh_target))?;
120
121 let stdout_task = tokio::spawn(drain(child.stdout.take(), sink.clone()));
122 let stderr_task = tokio::spawn(drain(child.stderr.take(), sink.clone()));
123 let status = child.wait().await.context("waiting on child")?;
124 let stdout = stdout_task.await.unwrap_or_default();
125 let stderr = stderr_task.await.unwrap_or_default();
126 Ok(RunOutput { status, stdout, stderr })
127 }
128 }
129
130 /// Drain `stream` into the shared sink and return the concatenated bytes.
131 async fn drain<R, S>(stream: Option<R>, sink: Arc<Mutex<S>>) -> Vec<u8>
132 where
133 R: AsyncRead + Unpin + Send + 'static,
134 S: LogSink + Send + 'static,
135 {
136 let mut total = Vec::new();
137 let Some(mut s) = stream else { return total };
138 let mut buf = [0u8; 4096];
139 loop {
140 match s.read(&mut buf).await {
141 Ok(0) | Err(_) => break,
142 Ok(n) => {
143 total.extend_from_slice(&buf[..n]);
144 sink.lock().await.write_chunk(&buf[..n]).await;
145 }
146 }
147 }
148 total
149 }
150
/// Wrap `s` in single quotes for use inside a `/bin/sh` command line.
///
/// Each embedded `'` is rewritten as `'\''` (close the quote, emit an escaped
/// quote, reopen the quote); every other character is copied unchanged.
/// The original author's caveat still applies: this is not claimed to be
/// bulletproof against adversarial input, and is intended for paths that come
/// from our own config files.
pub fn sh_quote(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        match ch {
            '\'' => quoted.push_str(r"'\''"),
            other => quoted.push(other),
        }
    }
    quoted.push('\'');
    quoted
}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// Test sink that keeps every byte it is handed, in arrival order.
    #[derive(Default)]
    pub(crate) struct VecSink(pub Vec<u8>);

    #[async_trait]
    impl LogSink for VecSink {
        async fn write_chunk(&mut self, bytes: &[u8]) {
            self.0.extend_from_slice(bytes);
        }
    }

    /// A local host plus a fresh, empty shared sink.
    fn local_host_and_sink() -> (RemoteHost, Arc<Mutex<VecSink>>) {
        (RemoteHost::new("local"), Arc::new(Mutex::new(VecSink::default())))
    }

    /// Whether the (non-empty) `needle` occurs anywhere in `haystack`.
    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
        haystack.windows(needle.len()).any(|window| window == needle)
    }

    #[test]
    fn sh_quote_escapes() {
        assert_eq!(sh_quote("hello"), "'hello'");
        assert_eq!(sh_quote("it's"), r"'it'\''s'");
    }

    #[test]
    fn local_detection() {
        for target in ["local", ""] {
            assert!(RemoteHost::new(target).is_local());
        }
        assert!(!RemoteHost::new("mbp").is_local());
    }

    #[tokio::test]
    async fn local_run_streams_stdout_and_captures_status() {
        let (host, sink) = local_host_and_sink();
        let result = host
            .run_streaming("printf 'hello '; printf 'world'", sink.clone())
            .await
            .unwrap();
        assert!(result.success());
        assert_eq!(result.stdout, b"hello world");
        // The sink must have received exactly what was captured.
        let streamed = sink.lock().await.0.clone();
        assert_eq!(streamed, b"hello world");
    }

    #[tokio::test]
    async fn local_run_streams_stderr_too() {
        let (host, sink) = local_host_and_sink();
        let result = host
            .run_streaming("echo out; echo err 1>&2", sink.clone())
            .await
            .unwrap();
        assert!(result.success());
        assert_eq!(result.stdout, b"out\n");
        assert_eq!(result.stderr, b"err\n");
        // Interleaving order between the two pipes is not fixed, so only
        // check that both lines reached the sink.
        let streamed = sink.lock().await.0.clone();
        assert!(contains(&streamed, b"out\n"));
        assert!(contains(&streamed, b"err\n"));
    }

    #[tokio::test]
    async fn local_run_reports_nonzero_exit() {
        let (host, sink) = local_host_and_sink();
        let result = host.run_streaming("exit 3", sink).await.unwrap();
        assert!(!result.success());
        assert_eq!(result.status.code(), Some(3));
    }
}
224