Skip to main content

max / makenotwork

12.8 KB · 361 lines History Blame Raw
1 //! Build orchestration: fan a `(app, version)` out across its targets, each
2 //! running its recipe concurrently on the host that can build it.
3 //!
4 //! A build inserts one `builds` row, then spawns one task per target. Each
5 //! target task registers itself in the single-slot guard (a newer build for
6 //! the same `(app, target)` aborts the in-flight one — latest wins, but other
7 //! targets keep running, which is the fan-out), then runs the recipe.
8
9 use crate::domain::{AppId, Status, Step, Target, Version};
10 use crate::engine::{self, RecipeCtx};
11 use crate::events::{self, Event};
12 use crate::state::AppState;
13 use anyhow::{Context, Result};
14 use std::path::PathBuf;
15 use std::sync::Arc;
16
17 /// Resolve the version to build: explicit, or read from `tauri.conf.json`.
18 pub fn resolve_version(state: &AppState, app: &AppId, explicit: Option<String>) -> Result<Version> {
19 if let Some(v) = explicit {
20 return Version::parse(&v).map_err(|e| anyhow::anyhow!(e));
21 }
22 let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?;
23 engine::version_from_tauri_conf(&cfg.repo)
24 }
25
26 /// Validate + default the target list against what the app ships.
27 pub fn resolve_targets(state: &AppState, app: &AppId, requested: Vec<Target>) -> Result<Vec<Target>> {
28 let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?;
29 if requested.is_empty() {
30 return Ok(cfg.targets.clone());
31 }
32 for t in &requested {
33 anyhow::ensure!(cfg.targets.contains(t), "app `{app}` does not ship target {t}");
34 anyhow::ensure!(state.topo.host_for(*t).is_some(), "no host can build {t}");
35 }
36 Ok(requested)
37 }
38
39 /// Insert the build row and spawn per-target tasks. Returns the build id.
40 pub async fn start_build(
41 state: AppState,
42 app: AppId,
43 version: Version,
44 targets: Vec<Target>,
45 ) -> Result<i64> {
46 let build_id: i64 = sqlx::query_scalar(
47 "INSERT INTO builds (app, version, status, created_at) VALUES (?, ?, 'running', ?) RETURNING id",
48 )
49 .bind(app.as_str())
50 .bind(version.to_string())
51 .bind(chrono::Utc::now().to_rfc3339())
52 .fetch_one(&state.pool)
53 .await
54 .context("insert build")?;
55
56 events::emit(
57 &state.events,
58 Event::BuildRequested { app: app.clone(), version: version.clone(), targets: targets.clone() },
59 );
60
61 for target in targets {
62 let state = state.clone();
63 let app = app.clone();
64 let version = version.clone();
65 let key = (app.clone(), target);
66
67 // Latest-wins: abort any in-flight run for this (app, target).
68 {
69 let mut active = state.active.lock().await;
70 if let Some(prev) = active.remove(&key) {
71 if !prev.is_finished() {
72 events::emit(
73 &state.events,
74 Event::TargetAborted { app: app.clone(), target },
75 );
76 prev.abort();
77 }
78 }
79 }
80
81 let handle = tokio::spawn(run_target(state.clone(), build_id, app, version, target));
82 state.active.lock().await.insert(key, handle.abort_handle());
83 }
84
85 // Mark the build done once all target tasks settle. Spawned so /build
86 // returns immediately.
87 tokio::spawn(finalize_build(state, build_id));
88 Ok(build_id)
89 }
90
91 /// Run one target's recipe end to end, updating its `target_runs` row.
92 async fn run_target(state: AppState, build_id: i64, app: AppId, version: Version, target: Target) {
93 let target_run_id: i64 = match sqlx::query_scalar(
94 "INSERT INTO target_runs (build_id, app, version, target, status, started_at)
95 VALUES (?, ?, ?, ?, 'running', ?) RETURNING id",
96 )
97 .bind(build_id)
98 .bind(app.as_str())
99 .bind(version.to_string())
100 .bind(target.to_string())
101 .bind(chrono::Utc::now().to_rfc3339())
102 .fetch_one(&state.pool)
103 .await
104 {
105 Ok(id) => id,
106 Err(e) => {
107 tracing::error!(%app, %target, error = %e, "could not create target_run");
108 return;
109 }
110 };
111
112 events::emit(
113 &state.events,
114 Event::TargetStart { app: app.clone(), version: version.clone(), target },
115 );
116
117 let recipe_src = match read_recipe(&state, &app, target) {
118 Ok(s) => s,
119 Err(e) => {
120 fail_target(&state, target_run_id, &app, &version, target, Step::Checkout, &format!("{e:#}")).await;
121 return;
122 }
123 };
124
125 let ctx = Arc::new(RecipeCtx::new(
126 app.clone(),
127 version.clone(),
128 target,
129 target_run_id,
130 state.topo.remote_hosts(),
131 state.pool.clone(),
132 state.events.clone(),
133 state.cfg.clone(),
134 state.ota.clone(),
135 tokio::runtime::Handle::current(),
136 ));
137
138 // Rhai is synchronous; run the recipe (and its final step finalization) on
139 // a blocking thread so host functions can `block_on` without sitting on a
140 // runtime worker.
141 let ctx_run = ctx.clone();
142 let outcome = tokio::task::spawn_blocking(move || {
143 let engine = engine::build_engine(ctx_run.clone());
144 let res = engine.run(&recipe_src);
145 let last_step = ctx_run.current_step();
146 match &res {
147 Ok(_) => {
148 let _ = ctx_run.finish_step(Status::Ok);
149 }
150 Err(_) => {
151 let _ = ctx_run.finish_step(Status::Failed);
152 }
153 }
154 res.map(|_| ()).map_err(|e| (last_step, e.to_string()))
155 })
156 .await;
157
158 match outcome {
159 Ok(Ok(())) => {
160 let artifacts = collected_artifacts(&state, &app, &version);
161 let _ = sqlx::query(
162 "UPDATE target_runs SET status = 'ok', current_step = NULL, finished_at = ? WHERE id = ?",
163 )
164 .bind(chrono::Utc::now().to_rfc3339())
165 .bind(target_run_id)
166 .execute(&state.pool)
167 .await;
168 events::emit(&state.events, Event::TargetOk { app, version, target, artifacts });
169 }
170 Ok(Err((step, msg))) => {
171 fail_target(&state, target_run_id, &app, &version, target, step, &msg).await;
172 }
173 Err(join_err) => {
174 // Task was aborted (superseded) or panicked.
175 let msg = if join_err.is_cancelled() { "aborted (superseded)".to_string() } else { format!("recipe task panicked: {join_err}") };
176 fail_target(&state, target_run_id, &app, &version, target, Step::Build, &msg).await;
177 }
178 }
179 }
180
181 async fn fail_target(
182 state: &AppState,
183 target_run_id: i64,
184 app: &AppId,
185 version: &Version,
186 target: Target,
187 step: Step,
188 error: &str,
189 ) {
190 let _ = sqlx::query(
191 "UPDATE target_runs SET status = 'failed', error = ?, finished_at = ? WHERE id = ?",
192 )
193 .bind(error)
194 .bind(chrono::Utc::now().to_rfc3339())
195 .bind(target_run_id)
196 .execute(&state.pool)
197 .await;
198 events::emit(
199 &state.events,
200 Event::TargetFailed { app: app.clone(), version: version.clone(), target, step, error: error.to_string() },
201 );
202 }
203
204 /// Wait for all target runs of a build to leave `running`, then stamp the
205 /// build's terminal status.
206 async fn finalize_build(state: AppState, build_id: i64) {
207 loop {
208 let running: i64 = sqlx::query_scalar(
209 "SELECT COUNT(*) FROM target_runs WHERE build_id = ? AND status = 'running'",
210 )
211 .bind(build_id)
212 .fetch_one(&state.pool)
213 .await
214 .unwrap_or(0);
215 if running == 0 {
216 break;
217 }
218 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
219 }
220 let failed: i64 = sqlx::query_scalar(
221 "SELECT COUNT(*) FROM target_runs WHERE build_id = ? AND status = 'failed'",
222 )
223 .bind(build_id)
224 .fetch_one(&state.pool)
225 .await
226 .unwrap_or(0);
227 let status = if failed == 0 { "ok" } else { "failed" };
228 let _ = sqlx::query("UPDATE builds SET status = ?, finished_at = ? WHERE id = ?")
229 .bind(status)
230 .bind(chrono::Utc::now().to_rfc3339())
231 .bind(build_id)
232 .execute(&state.pool)
233 .await;
234 }
235
236 /// Read the recipe text for `(app, target)` from the app's checkout on the
237 /// daemon host. `<repo>/<recipe_dir>/<platform>.rhai`.
238 fn read_recipe(state: &AppState, app: &AppId, target: Target) -> Result<String> {
239 let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?;
240 let path: PathBuf = engine::expand_tilde(&cfg.repo)
241 .join(&cfg.recipe_dir)
242 .join(format!("{}.rhai", target.platform.as_str()));
243 std::fs::read_to_string(&path).with_context(|| format!("reading recipe {}", path.display()))
244 }
245
246 fn collected_artifacts(state: &AppState, app: &AppId, version: &Version) -> Vec<String> {
247 let dir = state.cfg.dist_root.join(app.as_str()).join(version.to_string());
248 let Ok(rd) = std::fs::read_dir(&dir) else { return Vec::new() };
249 rd.filter_map(|e| e.ok()).map(|e| e.file_name().to_string_lossy().into_owned()).collect()
250 }
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::ota::OtaRegistry;
    use crate::topology::Topology;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// End-to-end smoke test against a throwaway app checkout and topology:
    /// a real local recipe is executed, covering step transitions, streamed
    /// `sh_ok`, `version_of`, `log`, and a `collect` that copies the built
    /// artifact into dist_root.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn local_linux_recipe_runs_end_to_end() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();

        // Fake app checkout: tauri.conf.json + a linux recipe.
        let repo = root.join("app");
        let tauri_dir = repo.join("src-tauri");
        std::fs::create_dir_all(&tauri_dir).unwrap();
        std::fs::write(tauri_dir.join("tauri.conf.json"), r#"{"version":"0.0.1"}"#).unwrap();

        // The build step drops an artifact inside the repo; the collect step
        // then pulls it into dist_root.
        let recipe_dir = repo.join("dist/recipes");
        std::fs::create_dir_all(&recipe_dir).unwrap();
        let recipe_text = r#"
            step("build");
            let v = version_of("demo");
            log("building demo " + v);
            sh_ok("fw13", "echo compiling; mkdir -p REPO/out && echo bin > REPO/out/demo.bin");
            step("collect");
            collect("fw13", "REPO/out/demo.bin", "demo", v);
        "#
        .replace("REPO", repo.to_str().unwrap());
        std::fs::write(recipe_dir.join("linux.rhai"), recipe_text).unwrap();

        let cfg = Config::for_tests(root);
        let pool = crate::db::open(&cfg.db_path).await.unwrap();

        let topo_toml = format!(
            r#"
            [[host]]
            name = "fw13"
            ssh = "local"
            targets = ["linux/x86_64"]

            [app.demo]
            repo = "{}"
            targets = ["linux/x86_64"]
            "#,
            repo.display()
        );
        let topo: Topology = toml::from_str(&topo_toml).unwrap();

        let state = AppState {
            pool: pool.clone(),
            topo: Arc::new(topo),
            cfg: Arc::new(cfg),
            prom: crate::metrics::test_handle(),
            events: crate::events::channel(),
            ota: Arc::new(OtaRegistry::standard("https://makenot.work")),
            active: Arc::new(Mutex::new(HashMap::new())),
        };

        let requested = vec!["linux/x86_64".parse().unwrap()];
        let build_id = start_build(
            state.clone(),
            AppId::new("demo"),
            Version::parse("0.0.1").unwrap(),
            requested,
        )
        .await
        .unwrap();

        // Poll (up to 100 x 50 ms) until the target run leaves `running`.
        let mut status = String::new();
        let mut attempts_left = 100;
        while attempts_left > 0 {
            status = sqlx::query_scalar("SELECT status FROM target_runs WHERE build_id = ?")
                .bind(build_id)
                .fetch_one(&pool)
                .await
                .unwrap();
            if status != "running" {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            attempts_left -= 1;
        }
        assert_eq!(status, "ok", "target run should succeed");

        // Both steps were recorded, in order, and each finished ok.
        let step_rows: Vec<(String, String)> =
            sqlx::query_as("SELECT step, status FROM step_runs WHERE target_run_id IN (SELECT id FROM target_runs WHERE build_id = ?) ORDER BY id")
                .bind(build_id)
                .fetch_all(&pool)
                .await
                .unwrap();
        let (step_names, step_statuses): (Vec<String>, Vec<String>) = step_rows.into_iter().unzip();
        assert_eq!(step_names, ["build", "collect"]);
        assert!(step_statuses.iter().all(|st| st == "ok"));

        // The artifact reached dist_root and the build step left a log behind.
        let artifact = state.cfg.dist_root.join("demo/0.0.1/demo.bin");
        assert!(artifact.exists(), "collect should copy the artifact");
        let build_log = state.cfg.logs_root.join("demo/0.0.1/linux-x86_64/build.log");
        assert!(build_log.exists(), "build step log should exist");
        let log_text = std::fs::read_to_string(&build_log).unwrap();
        assert!(log_text.contains("compiling"));
    }
}
361