Skip to main content

max / makenotwork

5.0 KB · 130 lines History Blame Raw
1 //! The [`Executor`] trait: a handle scoped to one `(host, capability set)`.
2 //!
3 //! Callers hold executors as `Arc<dyn Executor>` (one per node, built from
4 //! topology) so the concrete transport — [`crate::LocalExec`],
5 //! [`crate::SshExec`], or `AgentRpc` — is chosen per host without the caller
6 //! caring which.
7
8 use crate::capability::CapabilitySet;
9 use crate::remote::{LogSink, RunOutput};
10 use crate::step::{ObserveKind, Step};
11 use anyhow::Result;
12 use async_trait::async_trait;
13 use std::path::Path;
14 use tokio::process::Command;
15
16 /// Options controlling an rsync push/pull. Default = a plain `-a --partial`
17 /// mirror that never prunes the destination (safe for artifact collection).
18 /// Sando's release-dir deploy opts in to `delete` + `chmod` to keep its exact
19 /// behavior.
20 #[derive(Clone, Debug, Default)]
21 pub struct SyncOpts {
22 /// `--delete`: prune files on the destination that are gone from the
23 /// source (a true mirror). Off by default.
24 pub delete: bool,
25 /// `--chmod=<spec>`: force destination permissions per-file.
26 pub chmod: Option<String>,
27 }
28
29 impl SyncOpts {
30 /// The Sando release-dir mirror: prune stale assets and force exec bits the
31 /// way `deploy.rs` always has.
32 pub fn release_mirror() -> Self {
33 Self { delete: true, chmod: Some("Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X".into()) }
34 }
35 }
36
37 /// A read-only host observation. v1 can synthesize these from SSH-streamed
38 /// commands; a resident `ops-agent` is a drop-in upgrade (E3). The variants
39 /// mirror the executor spec.
40 #[derive(Clone, Debug, PartialEq)]
41 pub enum ObserveEvent {
42 ProcessExited { unit: String, code: i32 },
43 ResourceSample { cpu: f64, rss: u64, disk: u64 },
44 JournalLine { unit: String, line: String },
45 HealthChanged { check: String, from: String, to: String },
46 }
47
48 /// A live stream of [`ObserveEvent`]s from a host's observe plane.
49 pub type EventStream = tokio::sync::mpsc::Receiver<ObserveEvent>;
50
51 #[async_trait]
52 pub trait Executor: Send + Sync {
53 /// Run a typed step, streaming merged stdout/stderr into `sink`.
54 ///
55 /// Returns [`crate::CapabilityDenied`] (boxed into the error) if
56 /// `step.action` is outside this executor's grant — checked *before* the
57 /// command is dispatched.
58 async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput>;
59
60 /// Pull artifacts a prior step produced back to the caller (rsync from the
61 /// host into `local`).
62 async fn pull(&self, remote: &Path, local: &Path, opts: &SyncOpts) -> Result<()>;
63
64 /// Push a directory/file from the caller to the host (rsync into `remote`).
65 async fn push(&self, local: &Path, remote: &Path, opts: &SyncOpts) -> Result<()>;
66
67 /// Subscribe to the host's observe stream; `None` if no observe capability
68 /// (or, in v1, if no resident observer is wired — E3).
69 fn observe(&self) -> Option<EventStream> {
70 None
71 }
72
73 /// The capability set this executor was granted (introspection / audit).
74 fn capabilities(&self) -> &CapabilitySet;
75
76 /// Convenience: does this executor's grant cover `kind`?
77 fn can_observe(&self, kind: &ObserveKind) -> bool {
78 self.capabilities().permits_observe(kind)
79 }
80 }
81
82 /// Spawn `cmd`, draining stdout+stderr concurrently into the single
83 /// `&mut dyn LogSink`, and return the exit status plus full captured bytes.
84 ///
85 /// Unlike [`crate::remote::RemoteHost::run_streaming`] (which shares the sink
86 /// across two spawned tasks via `Arc<Mutex<_>>`), this drains both pipes in one
87 /// task with `select!` so it can take a borrowed `&mut dyn` sink — exactly the
88 /// shape the [`Executor`] trait exposes.
89 pub(crate) async fn run_command_into_sink(
90 mut cmd: Command,
91 sink: &mut dyn LogSink,
92 ) -> Result<RunOutput> {
93 use std::process::Stdio;
94 use tokio::io::AsyncReadExt;
95
96 cmd.stdout(Stdio::piped());
97 cmd.stderr(Stdio::piped());
98 cmd.kill_on_drop(true);
99 let mut child = cmd.spawn().map_err(|e| anyhow::anyhow!("spawning command: {e}"))?;
100
101 let mut out = child.stdout.take();
102 let mut err = child.stderr.take();
103 let mut stdout_buf = Vec::new();
104 let mut stderr_buf = Vec::new();
105 let mut ob = [0u8; 4096];
106 let mut eb = [0u8; 4096];
107 let mut out_done = out.is_none();
108 let mut err_done = err.is_none();
109
110 while !(out_done && err_done) {
111 tokio::select! {
112 r = async { out.as_mut().unwrap().read(&mut ob).await }, if !out_done => {
113 match r {
114 Ok(0) | Err(_) => out_done = true,
115 Ok(n) => { stdout_buf.extend_from_slice(&ob[..n]); sink.write_chunk(&ob[..n]).await; }
116 }
117 }
118 r = async { err.as_mut().unwrap().read(&mut eb).await }, if !err_done => {
119 match r {
120 Ok(0) | Err(_) => err_done = true,
121 Ok(n) => { stderr_buf.extend_from_slice(&eb[..n]); sink.write_chunk(&eb[..n]).await; }
122 }
123 }
124 }
125 }
126
127 let status = child.wait().await.map_err(|e| anyhow::anyhow!("waiting on child: {e}"))?;
128 Ok(RunOutput { status, stdout: stdout_buf, stderr: stderr_buf })
129 }
130