| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
|
| 9 |
|
| 10 |
|
| 11 |
|
| 12 |
use crate::capability::{CapabilityDenied, CapabilitySet}; |
| 13 |
use crate::executor::{Executor, SyncOpts}; |
| 14 |
use crate::remote::{LogSink, RunOutput}; |
| 15 |
use crate::step::Step; |
| 16 |
use crate::wire::{Frame, HealthResponse, RunRequest}; |
| 17 |
use anyhow::{Context, Result}; |
| 18 |
use async_trait::async_trait; |
| 19 |
use futures_util::StreamExt; |
| 20 |
use std::path::Path; |
| 21 |
use std::process::ExitStatus; |
| 22 |
|
| 23 |
|
| 24 |
pub struct AgentRpc { |
| 25 |
base_url: String, |
| 26 |
host_label: String, |
| 27 |
caps: CapabilitySet, |
| 28 |
client: reqwest::Client, |
| 29 |
} |
| 30 |
|
| 31 |
impl AgentRpc { |
| 32 |
|
| 33 |
|
| 34 |
pub fn new(base_url: impl Into<String>, host_label: impl Into<String>, caps: CapabilitySet) -> Self { |
| 35 |
Self { |
| 36 |
base_url: base_url.into(), |
| 37 |
host_label: host_label.into(), |
| 38 |
caps, |
| 39 |
client: reqwest::Client::new(), |
| 40 |
} |
| 41 |
} |
| 42 |
|
| 43 |
|
| 44 |
pub async fn health(&self) -> Result<HealthResponse> { |
| 45 |
let resp = self |
| 46 |
.client |
| 47 |
.get(format!("{}/health", self.base_url)) |
| 48 |
.send() |
| 49 |
.await |
| 50 |
.context("GET /health")? |
| 51 |
.error_for_status() |
| 52 |
.context("agent /health status")?; |
| 53 |
resp.json().await.context("decoding /health") |
| 54 |
} |
| 55 |
} |
| 56 |
|
| 57 |
|
| 58 |
|
| 59 |
/// Converts a numeric exit code into an [`ExitStatus`] (Unix variant).
///
/// Only the low 8 bits of `code` are kept; they are shifted into bits 8..16
/// of the raw value passed to `from_raw`, which is where a wait status
/// conventionally carries the exit code of a normally terminated process.
#[cfg(unix)]
fn exit_status(code: i32) -> ExitStatus {
    use std::os::unix::process::ExitStatusExt;

    let low_byte = code & 0xff;
    let wait_status = low_byte << 8;
    ExitStatus::from_raw(wait_status)
}

/// Converts a numeric exit code into an [`ExitStatus`] (non-Unix variant).
///
/// The code is reinterpreted as `u32` and passed to the Windows `from_raw`
/// unchanged.
#[cfg(not(unix))]
fn exit_status(code: i32) -> ExitStatus {
    use std::os::windows::process::ExitStatusExt;

    let raw = code as u32;
    ExitStatus::from_raw(raw)
}
| 69 |
|
| 70 |
#[async_trait] |
| 71 |
impl Executor for AgentRpc { |
| 72 |
async fn run_streaming(&self, step: &Step, sink: &mut dyn LogSink) -> Result<RunOutput> { |
| 73 |
|
| 74 |
|
| 75 |
if !self.caps.permits(&step.action) { |
| 76 |
return Err(CapabilityDenied::new(&self.host_label, &step.action).into()); |
| 77 |
} |
| 78 |
|
| 79 |
let resp = self |
| 80 |
.client |
| 81 |
.post(format!("{}/run", self.base_url)) |
| 82 |
.json(&RunRequest { step: step.clone() }) |
| 83 |
.send() |
| 84 |
.await |
| 85 |
.context("POST /run")?; |
| 86 |
if resp.status() == reqwest::StatusCode::FORBIDDEN { |
| 87 |
let body = resp.text().await.unwrap_or_default(); |
| 88 |
anyhow::bail!("agent denied /run: {}", body.trim()); |
| 89 |
} |
| 90 |
let resp = resp.error_for_status().context("agent /run status")?; |
| 91 |
|
| 92 |
let mut stream = resp.bytes_stream(); |
| 93 |
let mut buf: Vec<u8> = Vec::new(); |
| 94 |
let mut captured: Vec<u8> = Vec::new(); |
| 95 |
let mut exit_code: Option<i32> = None; |
| 96 |
|
| 97 |
while let Some(chunk) = stream.next().await { |
| 98 |
let chunk = chunk.context("reading /run stream")?; |
| 99 |
buf.extend_from_slice(&chunk); |
| 100 |
while let Some(nl) = buf.iter().position(|&b| b == b'\n') { |
| 101 |
let line: Vec<u8> = buf.drain(..=nl).collect(); |
| 102 |
let line = &line[..line.len() - 1]; |
| 103 |
if line.is_empty() { |
| 104 |
continue; |
| 105 |
} |
| 106 |
let frame: Frame = serde_json::from_slice(line) |
| 107 |
.with_context(|| format!("decoding agent frame: {}", String::from_utf8_lossy(line)))?; |
| 108 |
match frame { |
| 109 |
Frame::Chunk { text } => { |
| 110 |
captured.extend_from_slice(text.as_bytes()); |
| 111 |
sink.write_chunk(text.as_bytes()).await; |
| 112 |
} |
| 113 |
Frame::Exit { code } => exit_code = Some(code), |
| 114 |
Frame::Error { message } => anyhow::bail!("agent error: {message}"), |
| 115 |
} |
| 116 |
} |
| 117 |
} |
| 118 |
|
| 119 |
let code = exit_code.context("agent closed /run stream without an exit frame")?; |
| 120 |
Ok(RunOutput { status: exit_status(code), stdout: captured, stderr: Vec::new() }) |
| 121 |
} |
| 122 |
|
| 123 |
async fn pull(&self, remote: &Path, local: &Path, _opts: &SyncOpts) -> Result<()> { |
| 124 |
let resp = self |
| 125 |
.client |
| 126 |
.get(format!("{}/pull", self.base_url)) |
| 127 |
.query(&[("path", remote.to_string_lossy().as_ref())]) |
| 128 |
.send() |
| 129 |
.await |
| 130 |
.context("GET /pull")? |
| 131 |
.error_for_status() |
| 132 |
.context("agent /pull status")?; |
| 133 |
let bytes = resp.bytes().await.context("reading /pull body")?; |
| 134 |
if let Some(parent) = local.parent() { |
| 135 |
tokio::fs::create_dir_all(parent).await.ok(); |
| 136 |
} |
| 137 |
tokio::fs::write(local, &bytes).await.context("writing pulled artifact")?; |
| 138 |
Ok(()) |
| 139 |
} |
| 140 |
|
| 141 |
async fn push(&self, _local: &Path, _remote: &Path, _opts: &SyncOpts) -> Result<()> { |
| 142 |
|
| 143 |
|
| 144 |
|
| 145 |
anyhow::bail!( |
| 146 |
"AgentRpc::push is unsupported by design; move source/data with SshExec/rsync \ |
| 147 |
or a `git pull` step, not the agent" |
| 148 |
) |
| 149 |
} |
| 150 |
|
| 151 |
fn capabilities(&self) -> &CapabilitySet { |
| 152 |
&self.caps |
| 153 |
} |
| 154 |
} |
| 155 |
|