//! The [`Executor`] trait: a handle scoped to one `(host, capability set)`.
//!
//! Callers hold executors as `Arc<dyn Executor>` (one per node, built from
//! topology) so the concrete transport — [`crate::LocalExec`],
//! [`crate::SshExec`], or `AgentRpc` — is chosen per host without the caller
//! caring which.

use crate::capability::CapabilitySet;
use crate::remote::{LogSink, RunOutput};
use crate::step::{ObserveKind, Step};
use anyhow::Result;
use async_trait::async_trait;
use std::path::Path;
use tokio::process::Command;

/// Options controlling an rsync push/pull.
///
/// Default = a plain `-a --partial` mirror that never prunes the destination
/// (safe for artifact collection). Sando's release-dir deploy opts in to
/// `delete` + `chmod` to keep its exact behavior.
#[derive(Clone, Debug, Default)]
pub struct SyncOpts {
    /// `--delete`: prune files on the destination that are gone from the
    /// source (a true mirror). Off by default.
    pub delete: bool,
    /// `--chmod=<spec>`: force destination permissions per-file. `None` (the
    /// default) leaves permissions to rsync's normal behavior.
    pub chmod: Option<String>,
}

impl SyncOpts {
    /// The Sando release-dir mirror: prune stale assets and force exec bits the
    /// way `deploy.rs` always has.
    pub fn release_mirror() -> Self {
        Self {
            delete: true,
            chmod: Some("Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X".into()),
        }
    }
}

/// A read-only host observation.
///
/// v1 can synthesize these from SSH-streamed commands; a resident `ops-agent`
/// is a drop-in upgrade (E3). The variants mirror the executor spec.
#[derive(Clone, Debug, PartialEq)]
pub enum ObserveEvent {
    ProcessExited { unit: String, code: i32 },
    ResourceSample { cpu: f64, rss: u64, disk: u64 },
    JournalLine { unit: String, line: String },
    HealthChanged { check: String, from: String, to: String },
}

/// A live stream of [`ObserveEvent`]s from a host's observe plane.
pub type EventStream = tokio::sync::mpsc::Receiver<ObserveEvent>;

/// A handle for running steps and moving files on one host, limited to the
/// capability set the executor was granted.
#[async_trait]
pub trait Executor: Send + Sync {
    /// Run a typed step, streaming merged stdout/stderr into `sink`.
    ///
    /// Returns [`crate::CapabilityDenied`] (boxed into the error) if
    /// `step.action` is outside this executor's grant — checked *before* the
    /// command is dispatched.
    async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput>;

    /// Pull artifacts a prior step produced back to the caller (rsync from the
    /// host into `local`).
    async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()>;

    /// Push a directory/file from the caller to the host (rsync into `remote`).
    async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()>;

    /// Subscribe to the host's observe stream; `None` if no observe capability
    /// (or, in v1, if no resident observer is wired — E3).
    ///
    /// The provided default always returns `None`.
    fn observe(&self) -> Option<EventStream> {
        None
    }

    /// The capability set this executor was granted (introspection / audit).
    fn capabilities(&self) -> &CapabilitySet;

    /// Convenience: does this executor's grant cover `kind`?
    fn can_observe(&self, kind: &ObserveKind) -> bool {
        self.capabilities().permits_observe(kind)
    }
}

/// Spawn `cmd`, draining stdout+stderr concurrently into the single
/// `&mut dyn LogSink`, and return the exit status plus full captured bytes.
///
/// Unlike [`crate::remote::RemoteHost::run_streaming`] (which shares the sink
/// across two spawned tasks via `Arc<Mutex<_>>`), this drains both pipes in one
/// task with `select!` so it can take a borrowed `&mut dyn` sink — exactly the
/// shape the [`Executor`] trait exposes.
pub(crate) async fn run_command_into_sink( mut cmd: Command, sink: &mut dyn LogSink, ) -> Result { use std::process::Stdio; use tokio::io::AsyncReadExt; cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); cmd.kill_on_drop(true); let mut child = cmd.spawn().map_err(|e| anyhow::anyhow!("spawning command: {e}"))?; let mut out = child.stdout.take(); let mut err = child.stderr.take(); let mut stdout_buf = Vec::new(); let mut stderr_buf = Vec::new(); let mut ob = [0u8; 4096]; let mut eb = [0u8; 4096]; let mut out_done = out.is_none(); let mut err_done = err.is_none(); while !(out_done && err_done) { tokio::select! { r = async { out.as_mut().unwrap().read(&mut ob).await }, if !out_done => { match r { Ok(0) | Err(_) => out_done = true, Ok(n) => { stdout_buf.extend_from_slice(&ob[..n]); sink.write_chunk(&ob[..n]).await; } } } r = async { err.as_mut().unwrap().read(&mut eb).await }, if !err_done => { match r { Ok(0) | Err(_) => err_done = true, Ok(n) => { stderr_buf.extend_from_slice(&eb[..n]); sink.write_chunk(&eb[..n]).await; } } } } } let status = child.wait().await.map_err(|e| anyhow::anyhow!("waiting on child: {e}"))?; Ok(RunOutput { status, stdout: stdout_buf, stderr: stderr_buf }) }