//! Build orchestration: fan a `(app, version)` out across its targets, each //! running its recipe concurrently on the host that can build it. //! //! A build inserts one `builds` row, then spawns one task per target. Each //! target task registers itself in the single-slot guard (a newer build for //! the same `(app, target)` aborts the in-flight one — latest wins, but other //! targets keep running, which is the fan-out), then runs the recipe. use crate::domain::{AppId, Status, Step, Target, Version}; use crate::engine::{self, RecipeCtx}; use crate::events::{self, Event}; use crate::state::AppState; use anyhow::{Context, Result}; use std::path::PathBuf; use std::sync::Arc; /// Resolve the version to build: explicit, or read from `tauri.conf.json`. pub fn resolve_version(state: &AppState, app: &AppId, explicit: Option) -> Result { if let Some(v) = explicit { return Version::parse(&v).map_err(|e| anyhow::anyhow!(e)); } let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?; engine::version_from_tauri_conf(&cfg.repo) } /// Validate + default the target list against what the app ships. pub fn resolve_targets(state: &AppState, app: &AppId, requested: Vec) -> Result> { let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?; if requested.is_empty() { return Ok(cfg.targets.clone()); } for t in &requested { anyhow::ensure!(cfg.targets.contains(t), "app `{app}` does not ship target {t}"); anyhow::ensure!(state.topo.host_for(*t).is_some(), "no host can build {t}"); } Ok(requested) } /// Insert the build row and spawn per-target tasks. Returns the build id. pub async fn start_build( state: AppState, app: AppId, version: Version, targets: Vec, ) -> Result { let build_id: i64 = sqlx::query_scalar( "INSERT INTO builds (app, version, status, created_at) VALUES (?, ?, 'running', ?) RETURNING id", ) .bind(app.as_str()) .bind(version.to_string()) .bind(chrono::Utc::now().to_rfc3339()) .fetch_one(&state.pool) .await .context("insert build")?; events::emit( &state.events, Event::BuildRequested { app: app.clone(), version: version.clone(), targets: targets.clone() }, ); for target in targets { let state = state.clone(); let app = app.clone(); let version = version.clone(); let key = (app.clone(), target); // Latest-wins: abort any in-flight run for this (app, target). { let mut active = state.active.lock().await; if let Some(prev) = active.remove(&key) { if !prev.is_finished() { events::emit( &state.events, Event::TargetAborted { app: app.clone(), target }, ); prev.abort(); } } } let handle = tokio::spawn(run_target(state.clone(), build_id, app, version, target)); state.active.lock().await.insert(key, handle.abort_handle()); } // Mark the build done once all target tasks settle. Spawned so /build // returns immediately. tokio::spawn(finalize_build(state, build_id)); Ok(build_id) } /// Run one target's recipe end to end, updating its `target_runs` row. async fn run_target(state: AppState, build_id: i64, app: AppId, version: Version, target: Target) { let target_run_id: i64 = match sqlx::query_scalar( "INSERT INTO target_runs (build_id, app, version, target, status, started_at) VALUES (?, ?, ?, ?, 'running', ?) RETURNING id", ) .bind(build_id) .bind(app.as_str()) .bind(version.to_string()) .bind(target.to_string()) .bind(chrono::Utc::now().to_rfc3339()) .fetch_one(&state.pool) .await { Ok(id) => id, Err(e) => { tracing::error!(%app, %target, error = %e, "could not create target_run"); return; } }; events::emit( &state.events, Event::TargetStart { app: app.clone(), version: version.clone(), target }, ); let recipe_src = match read_recipe(&state, &app, target) { Ok(s) => s, Err(e) => { fail_target(&state, target_run_id, &app, &version, target, Step::Checkout, &format!("{e:#}")).await; return; } }; let ctx = Arc::new(RecipeCtx::new( app.clone(), version.clone(), target, target_run_id, state.topo.remote_hosts(), state.pool.clone(), state.events.clone(), state.cfg.clone(), state.ota.clone(), tokio::runtime::Handle::current(), )); // Rhai is synchronous; run the recipe (and its final step finalization) on // a blocking thread so host functions can `block_on` without sitting on a // runtime worker. let ctx_run = ctx.clone(); let outcome = tokio::task::spawn_blocking(move || { let engine = engine::build_engine(ctx_run.clone()); let res = engine.run(&recipe_src); let last_step = ctx_run.current_step(); match &res { Ok(_) => { let _ = ctx_run.finish_step(Status::Ok); } Err(_) => { let _ = ctx_run.finish_step(Status::Failed); } } res.map(|_| ()).map_err(|e| (last_step, e.to_string())) }) .await; match outcome { Ok(Ok(())) => { let artifacts = collected_artifacts(&state, &app, &version); let _ = sqlx::query( "UPDATE target_runs SET status = 'ok', current_step = NULL, finished_at = ? WHERE id = ?", ) .bind(chrono::Utc::now().to_rfc3339()) .bind(target_run_id) .execute(&state.pool) .await; events::emit(&state.events, Event::TargetOk { app, version, target, artifacts }); } Ok(Err((step, msg))) => { fail_target(&state, target_run_id, &app, &version, target, step, &msg).await; } Err(join_err) => { // Task was aborted (superseded) or panicked. let msg = if join_err.is_cancelled() { "aborted (superseded)".to_string() } else { format!("recipe task panicked: {join_err}") }; fail_target(&state, target_run_id, &app, &version, target, Step::Build, &msg).await; } } } async fn fail_target( state: &AppState, target_run_id: i64, app: &AppId, version: &Version, target: Target, step: Step, error: &str, ) { let _ = sqlx::query( "UPDATE target_runs SET status = 'failed', error = ?, finished_at = ? WHERE id = ?", ) .bind(error) .bind(chrono::Utc::now().to_rfc3339()) .bind(target_run_id) .execute(&state.pool) .await; events::emit( &state.events, Event::TargetFailed { app: app.clone(), version: version.clone(), target, step, error: error.to_string() }, ); } /// Wait for all target runs of a build to leave `running`, then stamp the /// build's terminal status. async fn finalize_build(state: AppState, build_id: i64) { loop { let running: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM target_runs WHERE build_id = ? AND status = 'running'", ) .bind(build_id) .fetch_one(&state.pool) .await .unwrap_or(0); if running == 0 { break; } tokio::time::sleep(std::time::Duration::from_millis(500)).await; } let failed: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM target_runs WHERE build_id = ? AND status = 'failed'", ) .bind(build_id) .fetch_one(&state.pool) .await .unwrap_or(0); let status = if failed == 0 { "ok" } else { "failed" }; let _ = sqlx::query("UPDATE builds SET status = ?, finished_at = ? WHERE id = ?") .bind(status) .bind(chrono::Utc::now().to_rfc3339()) .bind(build_id) .execute(&state.pool) .await; } /// Read the recipe text for `(app, target)` from the app's checkout on the /// daemon host. `//.rhai`. fn read_recipe(state: &AppState, app: &AppId, target: Target) -> Result { let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?; let path: PathBuf = engine::expand_tilde(&cfg.repo) .join(&cfg.recipe_dir) .join(format!("{}.rhai", target.platform.as_str())); std::fs::read_to_string(&path).with_context(|| format!("reading recipe {}", path.display())) } fn collected_artifacts(state: &AppState, app: &AppId, version: &Version) -> Vec { let dir = state.cfg.dist_root.join(app.as_str()).join(version.to_string()); let Ok(rd) = std::fs::read_dir(&dir) else { return Vec::new() }; rd.filter_map(|e| e.ok()).map(|e| e.file_name().to_string_lossy().into_owned()).collect() } #[cfg(test)] mod tests { use super::*; use crate::config::Config; use crate::ota::OtaRegistry; use crate::topology::Topology; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; /// Stand up a tmp app repo + topology and run a real local recipe end to /// end: step transitions, streamed `sh_ok`, `version_of`, `log`, and a /// `collect` that pulls a built artifact into dist_root. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn local_linux_recipe_runs_end_to_end() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Fake app checkout: tauri.conf.json + a linux recipe. let repo = root.join("app"); std::fs::create_dir_all(repo.join("src-tauri")).unwrap(); std::fs::write( repo.join("src-tauri/tauri.conf.json"), r#"{"version":"0.0.1"}"#, ) .unwrap(); std::fs::create_dir_all(repo.join("dist/recipes")).unwrap(); // Build writes an artifact into the repo; collect pulls it to dist_root. std::fs::write( repo.join("dist/recipes/linux.rhai"), r#" step("build"); let v = version_of("demo"); log("building demo " + v); sh_ok("fw13", "echo compiling; mkdir -p REPO/out && echo bin > REPO/out/demo.bin"); step("collect"); collect("fw13", "REPO/out/demo.bin", "demo", v); "# .replace("REPO", repo.to_str().unwrap()), ) .unwrap(); let cfg = Config::for_tests(root); let pool = crate::db::open(&cfg.db_path).await.unwrap(); let topo: Topology = toml::from_str(&format!( r#" [[host]] name = "fw13" ssh = "local" targets = ["linux/x86_64"] [app.demo] repo = "{}" targets = ["linux/x86_64"] "#, repo.display() )) .unwrap(); let state = AppState { pool: pool.clone(), topo: Arc::new(topo), cfg: Arc::new(cfg), prom: crate::metrics::test_handle(), events: crate::events::channel(), ota: Arc::new(OtaRegistry::standard("https://makenot.work")), active: Arc::new(Mutex::new(HashMap::new())), }; let app = AppId::new("demo"); let version = Version::parse("0.0.1").unwrap(); let build_id = start_build(state.clone(), app, version, vec!["linux/x86_64".parse().unwrap()]) .await .unwrap(); // Wait for the target run to settle. let mut status = String::new(); for _ in 0..100 { status = sqlx::query_scalar("SELECT status FROM target_runs WHERE build_id = ?") .bind(build_id) .fetch_one(&pool) .await .unwrap(); if status != "running" { break; } tokio::time::sleep(std::time::Duration::from_millis(50)).await; } assert_eq!(status, "ok", "target run should succeed"); // Both steps recorded and finished ok. let steps: Vec<(String, String)> = sqlx::query_as("SELECT step, status FROM step_runs WHERE target_run_id IN (SELECT id FROM target_runs WHERE build_id = ?) ORDER BY id") .bind(build_id) .fetch_all(&pool) .await .unwrap(); let names: Vec<&str> = steps.iter().map(|(s, _)| s.as_str()).collect(); assert_eq!(names, vec!["build", "collect"]); assert!(steps.iter().all(|(_, st)| st == "ok")); // Artifact landed in dist_root and a step log was written. let artifact = state.cfg.dist_root.join("demo/0.0.1/demo.bin"); assert!(artifact.exists(), "collect should copy the artifact"); let log = state.cfg.logs_root.join("demo/0.0.1/linux-x86_64/build.log"); assert!(log.exists(), "build step log should exist"); assert!(std::fs::read_to_string(&log).unwrap().contains("compiling")); } }