//! HTTP + WS contract (mirrors Sando's shape). //! //! - `GET /state` — latest build + its target x step matrix (loose JSON). //! - `POST /build` — `{app, version?, targets?[]}` kick a build. //! - `POST /retry` — `{app, target, version?}` re-run one target. //! - `GET /logs/{app}/{version}/{target}/{step}` — post-mortem step log. //! - `GET /events` — WS; broadcasts `EventEnvelope` JSON frames. //! - `GET /metrics` — Prometheus. use crate::domain::{AppId, Target}; use crate::error::{Error, Result}; use crate::runner; use crate::state::AppState; use axum::extract::{Path, State, WebSocketUpgrade}; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use serde::{Deserialize, Serialize}; use sqlx::Row; pub fn router(state: AppState) -> Router { let prom = state.prom.clone(); Router::new() .route("/state", get(get_state)) .route("/build", post(build)) .route("/retry", post(retry)) .route("/logs/{app}/{version}/{target}/{step}", get(get_step_log)) .route("/events", get(events_ws)) .with_state(state) .route("/metrics", get(crate::metrics::render).with_state(prom)) } #[derive(Serialize)] struct StateView { build: Option, } #[derive(Serialize)] struct BuildView { id: i64, app: String, version: String, status: String, created_at: String, targets: Vec, } #[derive(Serialize)] struct TargetView { target: String, status: String, current_step: Option, error: Option, steps: Vec, } #[derive(Serialize)] struct StepView { run_id: i64, step: String, status: String, log_ref: Option, } async fn get_state(State(s): State) -> Result> { let latest = sqlx::query( "SELECT id, app, version, status, created_at FROM builds ORDER BY id DESC LIMIT 1", ) .fetch_optional(&s.pool) .await?; let Some(b) = latest else { return Ok(Json(StateView { build: None })); }; let build_id: i64 = b.get("id"); let target_rows = sqlx::query( "SELECT id, target, status, current_step, error FROM target_runs WHERE build_id = ? ORDER BY target", ) .bind(build_id) .fetch_all(&s.pool) .await?; let mut targets = Vec::with_capacity(target_rows.len()); for tr in target_rows { let target_run_id: i64 = tr.get("id"); // Latest row per step for this target run. let steps: Vec = sqlx::query( "SELECT id, step, status, log_ref FROM step_runs sr WHERE target_run_id = ?1 AND id = (SELECT MAX(id) FROM step_runs WHERE target_run_id = ?1 AND step = sr.step) ORDER BY id", ) .bind(target_run_id) .fetch_all(&s.pool) .await? .into_iter() .map(|r| StepView { run_id: r.get("id"), step: r.get("step"), status: r.get("status"), log_ref: r.get("log_ref"), }) .collect(); targets.push(TargetView { target: tr.get("target"), status: tr.get("status"), current_step: tr.get("current_step"), error: tr.get("error"), steps, }); } Ok(Json(StateView { build: Some(BuildView { id: build_id, app: b.get("app"), version: b.get("version"), status: b.get("status"), created_at: b.get("created_at"), targets, }), })) } #[derive(Deserialize, Default)] struct BuildBody { app: String, #[serde(default)] version: Option, #[serde(default)] targets: Vec, } async fn build(State(s): State, Json(body): Json) -> Result> { let app = AppId::new(body.app); let targets = parse_targets(body.targets)?; let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?; let targets = runner::resolve_targets(&s, &app, targets).map_err(Error::Other)?; let build_id = runner::start_build(s, app, version.clone(), targets) .await .map_err(Error::Other)?; Ok(Json(serde_json::json!({ "accepted": true, "build_id": build_id, "version": version.to_string() }))) } #[derive(Deserialize)] struct RetryBody { app: String, target: String, #[serde(default)] version: Option, } async fn retry(State(s): State, Json(body): Json) -> Result> { let app = AppId::new(body.app); let target: Target = body.target.parse().map_err(Error::BadRequest)?; let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?; let targets = runner::resolve_targets(&s, &app, vec![target]).map_err(Error::Other)?; let build_id = runner::start_build(s, app, version.clone(), targets) .await .map_err(Error::Other)?; Ok(Json(serde_json::json!({ "accepted": true, "build_id": build_id, "target": target.to_string() }))) } fn parse_targets(raw: Vec) -> Result> { raw.into_iter().map(|t| t.parse::().map_err(Error::BadRequest)).collect() } async fn get_step_log( State(s): State, Path((app, version, target, step)): Path<(String, String, String, String)>, ) -> Result { fn safe(seg: &str) -> bool { !seg.is_empty() && !seg.contains('/') && !seg.contains('\\') && seg != "." && seg != ".." } if ![&app, &version, &target, &step].into_iter().all(|s| safe(s)) { return Err(Error::NotFound); } let path = s.cfg.logs_root.join(&app).join(&version).join(&target).join(format!("{step}.log")); match tokio::fs::read(&path).await { Ok(bytes) => Ok(( [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], bytes, ) .into_response()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(Error::NotFound), Err(e) => Err(Error::Other(e.into())), } } async fn events_ws(ws: WebSocketUpgrade, State(s): State) -> impl IntoResponse { use axum::extract::ws::Message; use tokio::sync::broadcast::error::RecvError; ws.on_upgrade(move |mut socket| async move { let mut rx = s.events.subscribe(); loop { match rx.recv().await { Ok(env) => { let json = match serde_json::to_string(&env) { Ok(s) => s, Err(e) => { tracing::warn!(error = %e, "events ws: serialize failed"); continue; } }; if socket.send(Message::Text(json.into())).await.is_err() { break; } } Err(RecvError::Lagged(n)) => { let _ = socket .send(Message::Text(format!(r#"{{"kind":"lagged","skipped":{n}}}"#).into())) .await; } Err(RecvError::Closed) => break, } } }) } #[cfg(test)] mod tests { use super::*; use crate::config::Config; use crate::ota::OtaRegistry; use crate::topology::Topology; use axum::body::Body; use axum::http::{Request, StatusCode}; use http_body_util::BodyExt; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; use tower::ServiceExt; async fn test_state(root: &std::path::Path) -> AppState { let cfg = Config::for_tests(root); let pool = crate::db::open(&cfg.db_path).await.unwrap(); let topo: Topology = toml::from_str( r#" [[host]] name = "fw13" ssh = "local" targets = ["linux/x86_64"] [app.goingson] repo = "/tmp/none" targets = ["linux/x86_64"] "#, ) .unwrap(); AppState { pool, topo: Arc::new(topo), cfg: Arc::new(cfg), prom: crate::metrics::test_handle(), events: crate::events::channel(), ota: Arc::new(OtaRegistry::standard("https://makenot.work")), active: Arc::new(Mutex::new(HashMap::new())), } } async fn body_string(resp: axum::response::Response) -> String { let bytes = resp.into_body().collect().await.unwrap().to_bytes(); String::from_utf8(bytes.to_vec()).unwrap() } #[tokio::test] async fn state_is_empty_initially() { let tmp = tempfile::tempdir().unwrap(); let app = router(test_state(tmp.path()).await); let resp = app .oneshot(Request::builder().uri("/state").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(resp.status(), StatusCode::OK); assert_eq!(body_string(resp).await, r#"{"build":null}"#); } #[tokio::test] async fn step_log_rejects_traversal() { let tmp = tempfile::tempdir().unwrap(); let app = router(test_state(tmp.path()).await); let resp = app .oneshot( Request::builder() .uri("/logs/goingson/0.4.1/..%2f..%2fetc/passwd") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn build_rejects_unknown_target() { let tmp = tempfile::tempdir().unwrap(); let app = router(test_state(tmp.path()).await); let resp = app .oneshot( Request::builder() .method("POST") .uri("/build") .header("content-type", "application/json") .body(Body::from(r#"{"app":"goingson","targets":["windows/x86_64"]}"#)) .unwrap(), ) .await .unwrap(); // windows/x86_64 isn't shipped by the test app -> 500 (Other). assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); } }