Skip to main content

max / makenotwork

10.0 KB · 318 lines History Blame Raw
1 //! HTTP + WS contract (mirrors Sando's shape).
2 //!
3 //! - `GET /state` — latest build + its target x step matrix (loose JSON).
4 //! - `POST /build` — `{app, version?, targets?[]}` kick a build.
5 //! - `POST /retry` — `{app, target, version?}` re-run one target.
6 //! - `GET /logs/{app}/{version}/{target}/{step}` — post-mortem step log.
7 //! - `GET /events` — WS; broadcasts `EventEnvelope` JSON frames.
8 //! - `GET /metrics` — Prometheus.
9
10 use crate::domain::{AppId, Target};
11 use crate::error::{Error, Result};
12 use crate::runner;
13 use crate::state::AppState;
14 use axum::extract::{Path, State, WebSocketUpgrade};
15 use axum::response::IntoResponse;
16 use axum::routing::{get, post};
17 use axum::{Json, Router};
18 use serde::{Deserialize, Serialize};
19 use sqlx::Row;
20
21 pub fn router(state: AppState) -> Router {
22 let prom = state.prom.clone();
23 Router::new()
24 .route("/state", get(get_state))
25 .route("/build", post(build))
26 .route("/retry", post(retry))
27 .route("/logs/{app}/{version}/{target}/{step}", get(get_step_log))
28 .route("/events", get(events_ws))
29 .with_state(state)
30 .route("/metrics", get(crate::metrics::render).with_state(prom))
31 }
32
33 #[derive(Serialize)]
34 struct StateView {
35 build: Option<BuildView>,
36 }
37
38 #[derive(Serialize)]
39 struct BuildView {
40 id: i64,
41 app: String,
42 version: String,
43 status: String,
44 created_at: String,
45 targets: Vec<TargetView>,
46 }
47
48 #[derive(Serialize)]
49 struct TargetView {
50 target: String,
51 status: String,
52 current_step: Option<String>,
53 error: Option<String>,
54 steps: Vec<StepView>,
55 }
56
57 #[derive(Serialize)]
58 struct StepView {
59 run_id: i64,
60 step: String,
61 status: String,
62 log_ref: Option<String>,
63 }
64
65 async fn get_state(State(s): State<AppState>) -> Result<Json<StateView>> {
66 let latest = sqlx::query(
67 "SELECT id, app, version, status, created_at FROM builds ORDER BY id DESC LIMIT 1",
68 )
69 .fetch_optional(&s.pool)
70 .await?;
71
72 let Some(b) = latest else {
73 return Ok(Json(StateView { build: None }));
74 };
75 let build_id: i64 = b.get("id");
76
77 let target_rows = sqlx::query(
78 "SELECT id, target, status, current_step, error FROM target_runs
79 WHERE build_id = ? ORDER BY target",
80 )
81 .bind(build_id)
82 .fetch_all(&s.pool)
83 .await?;
84
85 let mut targets = Vec::with_capacity(target_rows.len());
86 for tr in target_rows {
87 let target_run_id: i64 = tr.get("id");
88 // Latest row per step for this target run.
89 let steps: Vec<StepView> = sqlx::query(
90 "SELECT id, step, status, log_ref FROM step_runs sr
91 WHERE target_run_id = ?1
92 AND id = (SELECT MAX(id) FROM step_runs
93 WHERE target_run_id = ?1 AND step = sr.step)
94 ORDER BY id",
95 )
96 .bind(target_run_id)
97 .fetch_all(&s.pool)
98 .await?
99 .into_iter()
100 .map(|r| StepView {
101 run_id: r.get("id"),
102 step: r.get("step"),
103 status: r.get("status"),
104 log_ref: r.get("log_ref"),
105 })
106 .collect();
107
108 targets.push(TargetView {
109 target: tr.get("target"),
110 status: tr.get("status"),
111 current_step: tr.get("current_step"),
112 error: tr.get("error"),
113 steps,
114 });
115 }
116
117 Ok(Json(StateView {
118 build: Some(BuildView {
119 id: build_id,
120 app: b.get("app"),
121 version: b.get("version"),
122 status: b.get("status"),
123 created_at: b.get("created_at"),
124 targets,
125 }),
126 }))
127 }
128
129 #[derive(Deserialize, Default)]
130 struct BuildBody {
131 app: String,
132 #[serde(default)]
133 version: Option<String>,
134 #[serde(default)]
135 targets: Vec<String>,
136 }
137
138 async fn build(State(s): State<AppState>, Json(body): Json<BuildBody>) -> Result<Json<serde_json::Value>> {
139 let app = AppId::new(body.app);
140 let targets = parse_targets(body.targets)?;
141 let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?;
142 let targets = runner::resolve_targets(&s, &app, targets).map_err(Error::Other)?;
143 let build_id = runner::start_build(s, app, version.clone(), targets)
144 .await
145 .map_err(Error::Other)?;
146 Ok(Json(serde_json::json!({ "accepted": true, "build_id": build_id, "version": version.to_string() })))
147 }
148
149 #[derive(Deserialize)]
150 struct RetryBody {
151 app: String,
152 target: String,
153 #[serde(default)]
154 version: Option<String>,
155 }
156
157 async fn retry(State(s): State<AppState>, Json(body): Json<RetryBody>) -> Result<Json<serde_json::Value>> {
158 let app = AppId::new(body.app);
159 let target: Target = body.target.parse().map_err(Error::BadRequest)?;
160 let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?;
161 let targets = runner::resolve_targets(&s, &app, vec![target]).map_err(Error::Other)?;
162 let build_id = runner::start_build(s, app, version.clone(), targets)
163 .await
164 .map_err(Error::Other)?;
165 Ok(Json(serde_json::json!({ "accepted": true, "build_id": build_id, "target": target.to_string() })))
166 }
167
168 fn parse_targets(raw: Vec<String>) -> Result<Vec<Target>> {
169 raw.into_iter().map(|t| t.parse::<Target>().map_err(Error::BadRequest)).collect()
170 }
171
172 async fn get_step_log(
173 State(s): State<AppState>,
174 Path((app, version, target, step)): Path<(String, String, String, String)>,
175 ) -> Result<axum::response::Response> {
176 fn safe(seg: &str) -> bool {
177 !seg.is_empty() && !seg.contains('/') && !seg.contains('\\') && seg != "." && seg != ".."
178 }
179 if ![&app, &version, &target, &step].into_iter().all(|s| safe(s)) {
180 return Err(Error::NotFound);
181 }
182 let path = s.cfg.logs_root.join(&app).join(&version).join(&target).join(format!("{step}.log"));
183 match tokio::fs::read(&path).await {
184 Ok(bytes) => Ok((
185 [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")],
186 bytes,
187 )
188 .into_response()),
189 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(Error::NotFound),
190 Err(e) => Err(Error::Other(e.into())),
191 }
192 }
193
194 async fn events_ws(ws: WebSocketUpgrade, State(s): State<AppState>) -> impl IntoResponse {
195 use axum::extract::ws::Message;
196 use tokio::sync::broadcast::error::RecvError;
197
198 ws.on_upgrade(move |mut socket| async move {
199 let mut rx = s.events.subscribe();
200 loop {
201 match rx.recv().await {
202 Ok(env) => {
203 let json = match serde_json::to_string(&env) {
204 Ok(s) => s,
205 Err(e) => {
206 tracing::warn!(error = %e, "events ws: serialize failed");
207 continue;
208 }
209 };
210 if socket.send(Message::Text(json.into())).await.is_err() {
211 break;
212 }
213 }
214 Err(RecvError::Lagged(n)) => {
215 let _ = socket
216 .send(Message::Text(format!(r#"{{"kind":"lagged","skipped":{n}}}"#).into()))
217 .await;
218 }
219 Err(RecvError::Closed) => break,
220 }
221 }
222 })
223 }
224
225 #[cfg(test)]
226 mod tests {
227 use super::*;
228 use crate::config::Config;
229 use crate::ota::OtaRegistry;
230 use crate::topology::Topology;
231 use axum::body::Body;
232 use axum::http::{Request, StatusCode};
233 use http_body_util::BodyExt;
234 use std::collections::HashMap;
235 use std::sync::Arc;
236 use tokio::sync::Mutex;
237 use tower::ServiceExt;
238
239 async fn test_state(root: &std::path::Path) -> AppState {
240 let cfg = Config::for_tests(root);
241 let pool = crate::db::open(&cfg.db_path).await.unwrap();
242 let topo: Topology = toml::from_str(
243 r#"
244 [[host]]
245 name = "fw13"
246 ssh = "local"
247 targets = ["linux/x86_64"]
248
249 [app.goingson]
250 repo = "/tmp/none"
251 targets = ["linux/x86_64"]
252 "#,
253 )
254 .unwrap();
255 AppState {
256 pool,
257 topo: Arc::new(topo),
258 cfg: Arc::new(cfg),
259 prom: crate::metrics::test_handle(),
260 events: crate::events::channel(),
261 ota: Arc::new(OtaRegistry::standard("https://makenot.work")),
262 active: Arc::new(Mutex::new(HashMap::new())),
263 }
264 }
265
266 async fn body_string(resp: axum::response::Response) -> String {
267 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
268 String::from_utf8(bytes.to_vec()).unwrap()
269 }
270
271 #[tokio::test]
272 async fn state_is_empty_initially() {
273 let tmp = tempfile::tempdir().unwrap();
274 let app = router(test_state(tmp.path()).await);
275 let resp = app
276 .oneshot(Request::builder().uri("/state").body(Body::empty()).unwrap())
277 .await
278 .unwrap();
279 assert_eq!(resp.status(), StatusCode::OK);
280 assert_eq!(body_string(resp).await, r#"{"build":null}"#);
281 }
282
283 #[tokio::test]
284 async fn step_log_rejects_traversal() {
285 let tmp = tempfile::tempdir().unwrap();
286 let app = router(test_state(tmp.path()).await);
287 let resp = app
288 .oneshot(
289 Request::builder()
290 .uri("/logs/goingson/0.4.1/..%2f..%2fetc/passwd")
291 .body(Body::empty())
292 .unwrap(),
293 )
294 .await
295 .unwrap();
296 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
297 }
298
299 #[tokio::test]
300 async fn build_rejects_unknown_target() {
301 let tmp = tempfile::tempdir().unwrap();
302 let app = router(test_state(tmp.path()).await);
303 let resp = app
304 .oneshot(
305 Request::builder()
306 .method("POST")
307 .uri("/build")
308 .header("content-type", "application/json")
309 .body(Body::from(r#"{"app":"goingson","targets":["windows/x86_64"]}"#))
310 .unwrap(),
311 )
312 .await
313 .unwrap();
314 // windows/x86_64 isn't shipped by the test app -> 500 (Other).
315 assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
316 }
317 }
318