Skip to main content

max / makenotwork

11.9 KB · 341 lines History Blame Raw
1 //! `ops-agent` — the on-host half of the executor.
2 //!
3 //! One binary; behavior is set entirely by its local config (its own grant +
4 //! which caller identities may reach it). It listens **only on the tailnet
5 //! interface** and, on every request, resolves the caller via the local
6 //! Tailscale LocalAPI `whois`, maps node/tags to a caller grant, and runs the
7 //! step under the **intersection** of that grant with its own — the agent-side
8 //! half of double enforcement. A buggy or compromised daemon cannot make this
9 //! agent exceed its local grant.
10 //!
11 //! macOS deployment is an Aqua LaunchAgent (`LimitLoadToSessionType = Aqua`) so
12 //! build+sign run in the GUI security session where codesign can use the key.
13
14 use crate::capability::CapabilitySet;
15 use crate::executor::Executor;
16 use crate::remote::LogSink;
17 use crate::step::Action;
18 use crate::transport::LocalExec;
19 use crate::wire::{Frame, HealthResponse, RunRequest};
20 use anyhow::{Context, Result};
21 use async_trait::async_trait;
22 use serde::Deserialize;
23 use std::net::{IpAddr, SocketAddr};
24 use std::path::PathBuf;
25 use std::sync::Arc;
26
27 /// The agent's local configuration (TOML).
28 #[derive(Clone, Debug, Deserialize)]
29 pub struct AgentConfig {
30 /// Tailnet socket to bind — set this to the host's tailnet address so the
31 /// agent is never reachable off the tailnet.
32 pub listen: SocketAddr,
33 /// What this host itself is allowed to do. The ceiling for every caller.
34 #[serde(default)]
35 pub grant: GrantConfig,
36 /// Which caller identities may reach this agent, and the grant each
37 /// implies. The effective grant is `caller.caps ∩ self.grant`.
38 #[serde(default)]
39 pub allow: Vec<CallerGrant>,
40 }
41
42 /// Grant tokens as they appear in config: `actuate = [...]`, `observe = [...]`.
43 #[derive(Clone, Debug, Default, Deserialize)]
44 pub struct GrantConfig {
45 #[serde(default)]
46 pub actuate: Vec<String>,
47 #[serde(default)]
48 pub observe: Vec<String>,
49 }
50
51 impl GrantConfig {
52 pub fn to_caps(&self) -> CapabilitySet {
53 CapabilitySet::from_tokens(&self.actuate, &self.observe)
54 }
55 }
56
57 /// One allowed caller: a tailnet identity (a node name like `fw13` or a tag
58 /// like `tag:prod`) and the grant it implies.
59 #[derive(Clone, Debug, Deserialize)]
60 pub struct CallerGrant {
61 pub identity: String,
62 #[serde(default)]
63 pub actuate: Vec<String>,
64 #[serde(default)]
65 pub observe: Vec<String>,
66 }
67
68 impl CallerGrant {
69 fn to_caps(&self) -> CapabilitySet {
70 CapabilitySet::from_tokens(&self.actuate, &self.observe)
71 }
72 }
73
/// The identity `whois` resolved for a peer: its node name and its tags.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CallerIdentity {
    /// Node name of the peer.
    pub node: String,
    /// Tags attached to the peer (e.g. `tag:prod`); may be empty.
    pub tags: Vec<String>,
}

impl CallerIdentity {
    /// Returns `true` when `identity` (a config string: either a node name or
    /// a `tag:...`) equals this caller's node name or any one of its tags.
    /// The comparison is exact and case-sensitive.
    fn matches(&self, identity: &str) -> bool {
        std::iter::once(&self.node)
            .chain(&self.tags)
            .any(|candidate| candidate.as_str() == identity)
    }
}
88
/// Result of checking one caller against one action.
#[derive(Debug, Eq, PartialEq)]
pub enum AuthDecision {
    /// The action is inside the effective (intersected) grant. The variant
    /// carries no data; callers that need the grant recompute it.
    Allow,
    /// No allow-list entry matches the caller's identity.
    UnknownCaller,
    /// The caller is on the allow-list, but the effective grant does not
    /// permit the action.
    Denied,
}
99
100 /// Pure authorization core — no IO, fully unit-testable. The effective grant is
101 /// `caller_grant ∩ agent_grant`; the action must be permitted by it.
102 pub fn authorize(config: &AgentConfig, caller: &CallerIdentity, action: &Action) -> AuthDecision {
103 let Some(entry) = config.allow.iter().find(|c| caller.matches(&c.identity)) else {
104 return AuthDecision::UnknownCaller;
105 };
106 let effective = entry.to_caps().intersect(&config.grant.to_caps());
107 if effective.permits(action) {
108 AuthDecision::Allow
109 } else {
110 AuthDecision::Denied
111 }
112 }
113
114 /// Resolve a peer IP to a tailnet identity by shelling `tailscale whois
115 /// --json`. Works identically under Headscale (it serves the same client CLI).
116 /// Runtime-only — not exercised by unit tests (no live tailnet in CI).
117 pub async fn tailscale_whois(peer: IpAddr) -> Result<CallerIdentity> {
118 let out = tokio::process::Command::new("tailscale")
119 .args(["whois", "--json", &peer.to_string()])
120 .output()
121 .await
122 .context("spawning `tailscale whois`")?;
123 anyhow::ensure!(
124 out.status.success(),
125 "tailscale whois {peer} failed: {}",
126 String::from_utf8_lossy(&out.stderr)
127 );
128 #[derive(Deserialize)]
129 struct Whois {
130 #[serde(rename = "Node")]
131 node: WhoisNode,
132 }
133 #[derive(Deserialize)]
134 struct WhoisNode {
135 #[serde(rename = "Name", default)]
136 name: String,
137 #[serde(rename = "Tags", default)]
138 tags: Vec<String>,
139 }
140 let parsed: Whois = serde_json::from_slice(&out.stdout).context("parsing whois json")?;
141 // `Name` is the MagicDNS FQDN (e.g. `fw13.tailnet.ts.net.`); reduce to the
142 // bare hostname so config can say `fw13`.
143 let node = parsed.node.name.trim_end_matches('.').split('.').next().unwrap_or("").to_string();
144 Ok(CallerIdentity { node, tags: parsed.node.tags })
145 }
146
147 // ---------------------------------------------------------------------------
148 // HTTP layer (axum). Kept thin: it resolves identity, authorizes, and runs the
149 // step locally via `LocalExec`, streaming NDJSON frames back.
150 // ---------------------------------------------------------------------------
151
152 /// A resolver mapping a peer address to its tailnet identity. Boxed so tests
153 /// can inject a stub instead of shelling out to `tailscale`.
154 pub type WhoisResolver =
155 Arc<dyn Fn(IpAddr) -> futures_util::future::BoxFuture<'static, Result<CallerIdentity>> + Send + Sync>;
156
157 /// Shared server state.
158 #[derive(Clone)]
159 pub struct AgentState {
160 pub config: Arc<AgentConfig>,
161 pub whois: WhoisResolver,
162 }
163
164 impl AgentState {
165 /// Build state with the real `tailscale whois` resolver.
166 pub fn new(config: AgentConfig) -> Self {
167 Self {
168 config: Arc::new(config),
169 whois: Arc::new(|ip| Box::pin(tailscale_whois(ip))),
170 }
171 }
172 }
173
174 /// A [`LogSink`] that forwards each chunk as an NDJSON [`Frame::Chunk`] line
175 /// into the response body channel.
176 struct ChannelSink {
177 tx: tokio::sync::mpsc::Sender<Result<axum::body::Bytes, std::io::Error>>,
178 }
179
180 #[async_trait]
181 impl LogSink for ChannelSink {
182 async fn write_chunk(&mut self, bytes: &[u8]) {
183 let text = String::from_utf8_lossy(bytes).into_owned();
184 let line = Frame::Chunk { text }.to_line();
185 let _ = self.tx.send(Ok(axum::body::Bytes::from(line))).await;
186 }
187 }
188
189 /// Build the axum router. Serve it with
190 /// `.into_make_service_with_connect_info::<SocketAddr>()` so handlers see the
191 /// peer address.
192 pub fn router(state: AgentState) -> axum::Router {
193 use axum::routing::{get, post};
194 axum::Router::new()
195 .route("/health", get(health))
196 .route("/run", post(run))
197 .route("/pull", get(pull))
198 .with_state(state)
199 }
200
201 async fn health(
202 axum::extract::State(state): axum::extract::State<AgentState>,
203 ) -> axum::Json<HealthResponse> {
204 let caps = state.config.grant.to_caps();
205 axum::Json(HealthResponse {
206 ok: true,
207 actuate: caps.actuate_tokens().map(String::from).collect(),
208 observe: caps.observe_kinds().map(|k| k.token()).collect(),
209 })
210 }
211
212 async fn run(
213 axum::extract::State(state): axum::extract::State<AgentState>,
214 axum::extract::ConnectInfo(peer): axum::extract::ConnectInfo<SocketAddr>,
215 axum::Json(req): axum::Json<RunRequest>,
216 ) -> axum::response::Response {
217 use axum::http::StatusCode;
218 use axum::response::IntoResponse;
219
220 // Identity → authorization (agent-side enforcement).
221 let identity = match (state.whois)(peer.ip()).await {
222 Ok(id) => id,
223 Err(e) => {
224 return (StatusCode::FORBIDDEN, format!("whois failed: {e}")).into_response();
225 }
226 };
227 match authorize(&state.config, &identity, &req.step.action) {
228 AuthDecision::Allow => {}
229 AuthDecision::UnknownCaller => {
230 return (StatusCode::FORBIDDEN, format!("unknown caller: {}", identity.node)).into_response();
231 }
232 AuthDecision::Denied => {
233 return (
234 StatusCode::FORBIDDEN,
235 format!("action `{:?}` denied for {}", req.step.action, identity.node),
236 )
237 .into_response();
238 }
239 }
240
241 // Run locally under the effective grant, streaming NDJSON frames back.
242 let effective = state
243 .config
244 .allow
245 .iter()
246 .find(|c| identity.matches(&c.identity))
247 .map(|c| c.to_caps().intersect(&state.config.grant.to_caps()))
248 .unwrap_or_default();
249
250 let (tx, rx) = tokio::sync::mpsc::channel::<Result<axum::body::Bytes, std::io::Error>>(64);
251 tokio::spawn(async move {
252 let exec = LocalExec::new(effective);
253 let mut sink = ChannelSink { tx: tx.clone() };
254 let terminal = match exec.run_streaming(&req.step, &mut sink).await {
255 Ok(out) => Frame::Exit { code: out.status.code().unwrap_or(-1) },
256 Err(e) => Frame::Error { message: format!("{e:#}") },
257 };
258 let _ = tx.send(Ok(axum::body::Bytes::from(terminal.to_line()))).await;
259 });
260
261 let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
262 axum::body::Body::from_stream(stream).into_response()
263 }
264
265 #[derive(Deserialize)]
266 struct PullQuery {
267 path: PathBuf,
268 }
269
270 async fn pull(
271 axum::extract::State(_state): axum::extract::State<AgentState>,
272 axum::extract::Query(q): axum::extract::Query<PullQuery>,
273 ) -> axum::response::Response {
274 use axum::http::StatusCode;
275 use axum::response::IntoResponse;
276 match tokio::fs::read(&q.path).await {
277 Ok(bytes) => bytes.into_response(),
278 Err(e) => (StatusCode::NOT_FOUND, format!("pull {}: {e}", q.path.display())).into_response(),
279 }
280 }
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Actuate tokens shared by the host grant and the default caller grant.
    const PIPELINE: [&str; 4] = ["build", "sign", "notarize", "staple"];

    /// Turn string literals into the owned token list the config types hold.
    fn tokens(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Baseline config: a host that may build/sign/notarize/staple, with a
    /// single allowed caller `fw13` holding the same actuate tokens.
    fn cfg() -> AgentConfig {
        let grant = GrantConfig {
            actuate: tokens(&PIPELINE),
            observe: tokens(&["build-log"]),
        };
        let caller = CallerGrant {
            identity: "fw13".into(),
            actuate: tokens(&PIPELINE),
            observe: Vec::new(),
        };
        AgentConfig {
            listen: SocketAddr::from(([127, 0, 0, 1], 0)),
            grant,
            allow: vec![caller],
        }
    }

    /// The untagged caller that `cfg()` allows.
    fn fw13() -> CallerIdentity {
        CallerIdentity { node: "fw13".into(), tags: Vec::new() }
    }

    #[test]
    fn known_caller_granted_action_is_allowed() {
        let decision = authorize(&cfg(), &fw13(), &Action::Sign);
        assert_eq!(decision, AuthDecision::Allow);
    }

    #[test]
    fn unknown_caller_is_rejected() {
        let stranger = CallerIdentity { node: "laptop-x".into(), tags: Vec::new() };
        let decision = authorize(&cfg(), &stranger, &Action::Sign);
        assert_eq!(decision, AuthDecision::UnknownCaller);
    }

    #[test]
    fn action_outside_agent_grant_is_denied_even_if_caller_asks() {
        // `deploy` appears in neither the host grant nor the caller grant.
        let decision = authorize(&cfg(), &fw13(), &Action::Deploy);
        assert_eq!(decision, AuthDecision::Denied);
    }

    #[test]
    fn intersection_floors_a_too_broad_caller() {
        // The caller entry lists `deploy`, but the host grant does not: the
        // host grant is the ceiling, so deploy stays denied while sign, which
        // both sides list, is still allowed.
        let mut config = cfg();
        config.allow[0].actuate.push("deploy".into());
        assert_eq!(authorize(&config, &fw13(), &Action::Deploy), AuthDecision::Denied);
        assert_eq!(authorize(&config, &fw13(), &Action::Sign), AuthDecision::Allow);
    }

    #[test]
    fn tag_identity_matches() {
        // The allow-list entry names a tag; the node name is irrelevant.
        let mut config = cfg();
        config.allow[0].identity = "tag:builder".into();
        let tagged = CallerIdentity {
            node: "whatever".into(),
            tags: vec!["tag:builder".into()],
        };
        assert_eq!(authorize(&config, &tagged, &Action::Sign), AuthDecision::Allow);
    }
}
341