Skip to main content

max / makenotwork

5.9 KB · 155 lines History Blame Raw
1 //! `AgentRpc` — the Executor transport that talks to a remote `ops-agent`.
2 //!
3 //! Used for the macOS in-session sign step (the agent runs in the Aqua security
4 //! session where codesign can use the Developer ID key) and, later, the
5 //! resident observe plane. Non-signing, Linux/Windows work stays on
6 //! [`crate::SshExec`].
7 //!
8 //! Trust is the tailnet: the connection rides WireGuard, and the agent
9 //! independently re-checks the caller's identity via `whois`. This client also
10 //! enforces its own grant caller-side (the first half of double enforcement).
11
12 use crate::capability::{CapabilityDenied, CapabilitySet};
13 use crate::executor::{Executor, SyncOpts};
14 use crate::remote::{LogSink, RunOutput};
15 use crate::step::Step;
16 use crate::wire::{Frame, HealthResponse, RunRequest};
17 use anyhow::{Context, Result};
18 use async_trait::async_trait;
19 use futures_util::StreamExt;
20 use std::path::Path;
21 use std::process::ExitStatus;
22
23 /// A handle to one `ops-agent`, scoped to a caller-side capability set.
24 pub struct AgentRpc {
25 base_url: String,
26 host_label: String,
27 caps: CapabilitySet,
28 client: reqwest::Client,
29 }
30
31 impl AgentRpc {
32 /// `base_url` is e.g. `http://mbp.tailnet:8765` (the agent listens on the
33 /// tailnet interface). `host_label` is used only for audit messages.
34 pub fn new(base_url: impl Into<String>, host_label: impl Into<String>, caps: CapabilitySet) -> Self {
35 Self {
36 base_url: base_url.into(),
37 host_label: host_label.into(),
38 caps,
39 client: reqwest::Client::new(),
40 }
41 }
42
43 /// `GET /health` — liveness plus the agent's own declared grant.
44 pub async fn health(&self) -> Result<HealthResponse> {
45 let resp = self
46 .client
47 .get(format!("{}/health", self.base_url))
48 .send()
49 .await
50 .context("GET /health")?
51 .error_for_status()
52 .context("agent /health status")?;
53 resp.json().await.context("decoding /health")
54 }
55 }
56
57 /// Reconstruct an `ExitStatus` from a raw exit code (Unix wait-status encoding:
58 /// the code lives in the high byte).
59 #[cfg(unix)]
60 fn exit_status(code: i32) -> ExitStatus {
61 use std::os::unix::process::ExitStatusExt;
62 ExitStatus::from_raw((code & 0xff) << 8)
63 }
64 #[cfg(not(unix))]
65 fn exit_status(code: i32) -> ExitStatus {
66 use std::os::windows::process::ExitStatusExt;
67 ExitStatus::from_raw(code as u32)
68 }
69
70 #[async_trait]
71 impl Executor for AgentRpc {
72 async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput> {
73 // Caller-side enforcement (half 1 of 2). The agent re-checks against
74 // its own grant (half 2) — this just fails fast before a round-trip.
75 if !self.caps.permits(&step.action) {
76 return Err(CapabilityDenied::new(&self.host_label, &step.action).into());
77 }
78
79 let resp = self
80 .client
81 .post(format!("{}/run", self.base_url))
82 .json(&RunRequest { step: step.clone() })
83 .send()
84 .await
85 .context("POST /run")?;
86 if resp.status() == reqwest::StatusCode::FORBIDDEN {
87 let body = resp.text().await.unwrap_or_default();
88 anyhow::bail!("agent denied /run: {}", body.trim());
89 }
90 let resp = resp.error_for_status().context("agent /run status")?;
91
92 let mut stream = resp.bytes_stream();
93 let mut buf: Vec<u8> = Vec::new();
94 let mut captured: Vec<u8> = Vec::new();
95 let mut exit_code: Option<i32> = None;
96
97 while let Some(chunk) = stream.next().await {
98 let chunk = chunk.context("reading /run stream")?;
99 buf.extend_from_slice(&chunk);
100 while let Some(nl) = buf.iter().position(|&b| b == b'\n') {
101 let line: Vec<u8> = buf.drain(..=nl).collect();
102 let line = &line[..line.len() - 1];
103 if line.is_empty() {
104 continue;
105 }
106 let frame: Frame = serde_json::from_slice(line)
107 .with_context(|| format!("decoding agent frame: {}", String::from_utf8_lossy(line)))?;
108 match frame {
109 Frame::Chunk { text } => {
110 captured.extend_from_slice(text.as_bytes());
111 sink.write_chunk(text.as_bytes()).await;
112 }
113 Frame::Exit { code } => exit_code = Some(code),
114 Frame::Error { message } => anyhow::bail!("agent error: {message}"),
115 }
116 }
117 }
118
119 let code = exit_code.context("agent closed /run stream without an exit frame")?;
120 Ok(RunOutput { status: exit_status(code), stdout: captured, stderr: Vec::new() })
121 }
122
123 async fn pull(&self, remote: &Path, local: &Path, _opts: &SyncOpts) -> Result<()> {
124 let resp = self
125 .client
126 .get(format!("{}/pull", self.base_url))
127 .query(&[("path", remote.to_string_lossy().as_ref())])
128 .send()
129 .await
130 .context("GET /pull")?
131 .error_for_status()
132 .context("agent /pull status")?;
133 let bytes = resp.bytes().await.context("reading /pull body")?;
134 if let Some(parent) = local.parent() {
135 tokio::fs::create_dir_all(parent).await.ok();
136 }
137 tokio::fs::write(local, &bytes).await.context("writing pulled artifact")?;
138 Ok(())
139 }
140
141 async fn push(&self, _local: &Path, _remote: &Path, _opts: &SyncOpts) -> Result<()> {
142 // The agent transport is for in-session *execution*. Bulk data movement
143 // onto the host uses SshExec/rsync or a `git pull` step inside the
144 // recipe — keeping the agent's surface small (one open port, exec only).
145 anyhow::bail!(
146 "AgentRpc::push is unsupported by design; move source/data with SshExec/rsync \
147 or a `git pull` step, not the agent"
148 )
149 }
150
151 fn capabilities(&self) -> &CapabilitySet {
152 &self.caps
153 }
154 }
155