//! `AgentRpc` — the Executor transport that talks to a remote `ops-agent`. //! //! Used for the macOS in-session sign step (the agent runs in the Aqua security //! session where codesign can use the Developer ID key) and, later, the //! resident observe plane. Non-signing, Linux/Windows work stays on //! [`crate::SshExec`]. //! //! Trust is the tailnet: the connection rides WireGuard, and the agent //! independently re-checks the caller's identity via `whois`. This client also //! enforces its own grant caller-side (the first half of double enforcement). use crate::capability::{CapabilityDenied, CapabilitySet}; use crate::executor::{Executor, SyncOpts}; use crate::remote::{LogSink, RunOutput}; use crate::step::Step; use crate::wire::{Frame, HealthResponse, RunRequest}; use anyhow::{Context, Result}; use async_trait::async_trait; use futures_util::StreamExt; use std::path::Path; use std::process::ExitStatus; /// A handle to one `ops-agent`, scoped to a caller-side capability set. pub struct AgentRpc { base_url: String, host_label: String, caps: CapabilitySet, client: reqwest::Client, } impl AgentRpc { /// `base_url` is e.g. `http://mbp.tailnet:8765` (the agent listens on the /// tailnet interface). `host_label` is used only for audit messages. pub fn new(base_url: impl Into, host_label: impl Into, caps: CapabilitySet) -> Self { Self { base_url: base_url.into(), host_label: host_label.into(), caps, client: reqwest::Client::new(), } } /// `GET /health` — liveness plus the agent's own declared grant. pub async fn health(&self) -> Result { let resp = self .client .get(format!("{}/health", self.base_url)) .send() .await .context("GET /health")? .error_for_status() .context("agent /health status")?; resp.json().await.context("decoding /health") } } /// Reconstruct an `ExitStatus` from a raw exit code (Unix wait-status encoding: /// the code lives in the high byte). #[cfg(unix)] fn exit_status(code: i32) -> ExitStatus { use std::os::unix::process::ExitStatusExt; ExitStatus::from_raw((code & 0xff) << 8) } #[cfg(not(unix))] fn exit_status(code: i32) -> ExitStatus { use std::os::windows::process::ExitStatusExt; ExitStatus::from_raw(code as u32) } #[async_trait] impl Executor for AgentRpc { async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result { // Caller-side enforcement (half 1 of 2). The agent re-checks against // its own grant (half 2) — this just fails fast before a round-trip. if !self.caps.permits(&step.action) { return Err(CapabilityDenied::new(&self.host_label, &step.action).into()); } let resp = self .client .post(format!("{}/run", self.base_url)) .json(&RunRequest { step: step.clone() }) .send() .await .context("POST /run")?; if resp.status() == reqwest::StatusCode::FORBIDDEN { let body = resp.text().await.unwrap_or_default(); anyhow::bail!("agent denied /run: {}", body.trim()); } let resp = resp.error_for_status().context("agent /run status")?; let mut stream = resp.bytes_stream(); let mut buf: Vec = Vec::new(); let mut captured: Vec = Vec::new(); let mut exit_code: Option = None; while let Some(chunk) = stream.next().await { let chunk = chunk.context("reading /run stream")?; buf.extend_from_slice(&chunk); while let Some(nl) = buf.iter().position(|&b| b == b'\n') { let line: Vec = buf.drain(..=nl).collect(); let line = &line[..line.len() - 1]; if line.is_empty() { continue; } let frame: Frame = serde_json::from_slice(line) .with_context(|| format!("decoding agent frame: {}", String::from_utf8_lossy(line)))?; match frame { Frame::Chunk { text } => { captured.extend_from_slice(text.as_bytes()); sink.write_chunk(text.as_bytes()).await; } Frame::Exit { code } => exit_code = Some(code), Frame::Error { message } => anyhow::bail!("agent error: {message}"), } } } let code = exit_code.context("agent closed /run stream without an exit frame")?; Ok(RunOutput { status: exit_status(code), stdout: captured, stderr: Vec::new() }) } async fn pull(&self, remote: &Path, local: &Path, _opts: &SyncOpts) -> Result<()> { let resp = self .client .get(format!("{}/pull", self.base_url)) .query(&[("path", remote.to_string_lossy().as_ref())]) .send() .await .context("GET /pull")? .error_for_status() .context("agent /pull status")?; let bytes = resp.bytes().await.context("reading /pull body")?; if let Some(parent) = local.parent() { tokio::fs::create_dir_all(parent).await.ok(); } tokio::fs::write(local, &bytes).await.context("writing pulled artifact")?; Ok(()) } async fn push(&self, _local: &Path, _remote: &Path, _opts: &SyncOpts) -> Result<()> { // The agent transport is for in-session *execution*. Bulk data movement // onto the host uses SshExec/rsync or a `git pull` step inside the // recipe — keeping the agent's surface small (one open port, exec only). anyhow::bail!( "AgentRpc::push is unsupported by design; move source/data with SshExec/rsync \ or a `git pull` step, not the agent" ) } fn capabilities(&self) -> &CapabilitySet { &self.caps } }