Skip to main content

max / makenotwork

22.4 KB · 621 lines History Blame Raw
1 //! Rhai recipe engine + host-function API.
2 //!
3 //! A `(app, target)` resolves to a `.rhai` recipe composed from a shared step
4 //! vocabulary. The daemon embeds Rhai and registers the host functions recipes
5 //! call; the recipe is the orchestration, the host functions are the
6 //! privileged primitives (run a command, read a secret, collect artifacts,
7 //! publish). Recipes are otherwise sandboxed โ€” no arbitrary FS/network except
8 //! through these functions โ€” matching the Balanced Breakfast plugin model.
9 //!
10 //! Rhai is synchronous; the engine runs each recipe on a blocking thread
11 //! (`spawn_blocking`, see [`crate::runner`]) and host functions bridge to async
12 //! work via `Handle::block_on`. That is sound only off a runtime worker thread,
13 //! which `spawn_blocking` guarantees.
14
15 use crate::config::Config;
16 use crate::domain::{AppId, Status, Step, StepRunId, Target, Version};
17 use crate::events::{self, Event, EventTx};
18 use crate::ota::{OtaRegistry, Release};
19 use anyhow::{Context as _, Result};
20 use ops_core::live_log::LiveLog;
21 use ops_core::remote::RemoteHost;
22 use rhai::{Engine, EvalAltResult, Map};
23 use sqlx::SqlitePool;
24 use std::collections::HashMap;
25 use std::path::{Path, PathBuf};
26 use std::sync::{Arc, Mutex};
27 use tokio::runtime::Handle;
28 use tokio::sync::Mutex as AsyncMutex;
29
30 /// The currently-open step within a recipe run: its DB row id, which step it
31 /// is, and the live-log sink that `sh`/`log` stream into.
32 struct StepState {
33 run_id: StepRunId,
34 step: Step,
35 log: Arc<AsyncMutex<LiveLog>>,
36 }
37
38 /// Everything a recipe's host functions need, shared (Arc) into each closure.
39 pub struct RecipeCtx {
40 pub app: AppId,
41 pub version: Version,
42 pub target: Target,
43 pub target_run_id: i64,
44 pub hosts: HashMap<String, RemoteHost>,
45 pub pool: SqlitePool,
46 pub events: EventTx,
47 pub cfg: Arc<Config>,
48 pub ota: Arc<OtaRegistry>,
49 pub rt: Handle,
50 current: Mutex<Option<StepState>>,
51 }
52
53 impl RecipeCtx {
54 #[allow(clippy::too_many_arguments)]
55 pub fn new(
56 app: AppId,
57 version: Version,
58 target: Target,
59 target_run_id: i64,
60 hosts: HashMap<String, RemoteHost>,
61 pool: SqlitePool,
62 events: EventTx,
63 cfg: Arc<Config>,
64 ota: Arc<OtaRegistry>,
65 rt: Handle,
66 ) -> Self {
67 Self {
68 app,
69 version,
70 target,
71 target_run_id,
72 hosts,
73 pool,
74 events,
75 cfg,
76 ota,
77 rt,
78 current: Mutex::new(None),
79 }
80 }
81
82 fn now() -> String {
83 chrono::Utc::now().to_rfc3339()
84 }
85
86 /// `<logs_root>/<app>/<version>/<target-with-slash-as-dash>/<step>.log`.
87 fn log_path(&self, step: Step) -> PathBuf {
88 let target_dir = self.target.to_string().replace('/', "-");
89 self.cfg
90 .logs_root
91 .join(self.app.as_str())
92 .join(self.version.to_string())
93 .join(target_dir)
94 .join(format!("{}.log", step.as_str()))
95 }
96
97 /// Close the previous step (as `Ok`), open a new one: insert its DB row,
98 /// open a live log whose chunks broadcast `StepLogChunk`, emit `StepStart`.
99 fn begin_step(self: &Arc<Self>, step: Step) -> Result<()> {
100 self.finish_step(Status::Ok)?;
101 let me = self.clone();
102 let run_id = self.rt.block_on(async move {
103 let started = Self::now();
104 let log_ref = me.log_path(step).to_string_lossy().into_owned();
105 let id: i64 = sqlx::query_scalar(
106 "INSERT INTO step_runs (target_run_id, step, status, log_ref, started_at)
107 VALUES (?, ?, 'running', ?, ?) RETURNING id",
108 )
109 .bind(me.target_run_id)
110 .bind(step.as_str())
111 .bind(&log_ref)
112 .bind(&started)
113 .fetch_one(&me.pool)
114 .await
115 .context("insert step_run")?;
116 sqlx::query("UPDATE target_runs SET current_step = ? WHERE id = ?")
117 .bind(step.as_str())
118 .bind(me.target_run_id)
119 .execute(&me.pool)
120 .await
121 .context("update current_step")?;
122 anyhow::Ok(StepRunId(id))
123 })?;
124
125 // Live log: each chunk fans out as a StepLogChunk event keyed by run_id.
126 let events = self.events.clone();
127 let cb_run_id = run_id;
128 let log = self.rt.block_on(LiveLog::open(
129 self.log_path(step),
130 Box::new(move |seq, text| {
131 events::emit(
132 &events,
133 Event::StepLogChunk { run_id: cb_run_id, seq, text: text.to_string() },
134 );
135 }),
136 ));
137
138 events::emit(
139 &self.events,
140 Event::StepStart {
141 run_id,
142 app: self.app.clone(),
143 version: self.version.clone(),
144 target: self.target,
145 step,
146 },
147 );
148
149 *self.current.lock().unwrap() = Some(StepState {
150 run_id,
151 step,
152 log: Arc::new(AsyncMutex::new(log)),
153 });
154 Ok(())
155 }
156
157 /// Finalize the open step (if any): close its log, stamp the DB row, emit
158 /// `StepDone`. Idempotent when no step is open.
159 pub fn finish_step(self: &Arc<Self>, status: Status) -> Result<()> {
160 let st = self.current.lock().unwrap().take();
161 let Some(st) = st else { return Ok(()) };
162 let me = self.clone();
163 self.rt.block_on(async move {
164 // Drop all log refs so the sink can be owned + flushed.
165 if let Ok(m) = Arc::try_unwrap(st.log) {
166 m.into_inner().close().await;
167 }
168 let _ = sqlx::query(
169 "UPDATE step_runs SET status = ?, finished_at = ? WHERE id = ?",
170 )
171 .bind(status.as_str())
172 .bind(Self::now())
173 .bind(st.run_id.0)
174 .execute(&me.pool)
175 .await;
176 });
177 events::emit(
178 &self.events,
179 Event::StepDone {
180 run_id: st.run_id,
181 app: self.app.clone(),
182 target: self.target,
183 step: st.step,
184 status,
185 },
186 );
187 Ok(())
188 }
189
190 /// Ensure a step is open; default to `Build` if a recipe runs a command
191 /// before declaring one.
192 fn ensure_step(self: &Arc<Self>) -> Result<Arc<AsyncMutex<LiveLog>>> {
193 if self.current.lock().unwrap().is_none() {
194 self.begin_step(Step::Build)?;
195 }
196 Ok(self.current.lock().unwrap().as_ref().unwrap().log.clone())
197 }
198
199 /// The step currently open, or `Build` as a default for failure
200 /// attribution before any step was declared.
201 pub fn current_step(&self) -> Step {
202 self.current.lock().unwrap().as_ref().map(|s| s.step).unwrap_or(Step::Build)
203 }
204
205 fn host(&self, name: &str) -> Result<RemoteHost> {
206 self.hosts
207 .get(name)
208 .cloned()
209 .ok_or_else(|| anyhow::anyhow!("unknown build host `{name}` (not in topology)"))
210 }
211
212 /// Run `cmd` on `host`, streaming into the current step's log. Returns
213 /// exit code + a tail of stdout for the recipe to branch on.
214 fn run(self: &Arc<Self>, host: &str, cmd: &str) -> Result<(i32, String)> {
215 let sink = self.ensure_step()?;
216 let host = self.host(host)?;
217 let cmd = cmd.to_string();
218 let out = self.rt.block_on(async move { host.run_streaming(&cmd, sink).await })?;
219 let code = out.status.code().unwrap_or(-1);
220 let stdout = String::from_utf8_lossy(&out.stdout);
221 let tail: String = stdout.chars().rev().take(2000).collect::<Vec<_>>().into_iter().rev().collect();
222 Ok((code, tail))
223 }
224 }
225
226 // ----- error bridging: anyhow -> Rhai runtime error -----
227
228 fn rhai_err(e: impl std::fmt::Display) -> Box<EvalAltResult> {
229 Box::new(EvalAltResult::ErrorRuntime(e.to_string().into(), rhai::Position::NONE))
230 }
231
232 /// Read `tauri.conf.json`'s version for `app`, from its checkout on the daemon
233 /// host. Used by `version_of()` and the runner's default-version path.
234 pub fn version_from_tauri_conf(repo: &str) -> Result<Version> {
235 let path = expand_tilde(repo).join("src-tauri").join("tauri.conf.json");
236 let raw = std::fs::read_to_string(&path)
237 .with_context(|| format!("reading {}", path.display()))?;
238 let v: serde_json::Value = serde_json::from_str(&raw).context("parsing tauri.conf.json")?;
239 let ver = v.get("version").and_then(|x| x.as_str()).context("no `version` in tauri.conf.json")?;
240 Version::parse(ver).map_err(|e| anyhow::anyhow!(e))
241 }
242
243 /// Expand a leading `~/` to `$HOME`. Paths in the topology are written with `~`.
244 pub fn expand_tilde(p: &str) -> PathBuf {
245 if let Some(rest) = p.strip_prefix("~/") {
246 if let Ok(home) = std::env::var("HOME") {
247 return Path::new(&home).join(rest);
248 }
249 }
250 PathBuf::from(p)
251 }
252
253 /// Build a Rhai engine with the host API bound to `ctx`. Sandboxed: recipes
254 /// touch the outside world only through these functions.
255 pub fn build_engine(ctx: Arc<RecipeCtx>) -> Engine {
256 let mut engine = Engine::new();
257 // Defensive caps โ€” recipes are first-party but bound the blast radius.
258 engine.set_max_operations(5_000_000);
259 engine.set_max_call_levels(64);
260 engine.set_max_string_size(0);
261
262 // --- step(name) ---
263 {
264 let ctx = ctx.clone();
265 engine.register_fn("step", move |name: &str| -> Result<(), Box<EvalAltResult>> {
266 let step: Step = name.parse().map_err(rhai_err)?;
267 ctx.begin_step(step).map_err(rhai_err)
268 });
269 }
270
271 // --- sh(host, cmd) -> #{ code, stdout_tail } ---
272 {
273 let ctx = ctx.clone();
274 engine.register_fn("sh", move |host: &str, cmd: &str| -> Result<Map, Box<EvalAltResult>> {
275 let (code, tail) = ctx.run(host, cmd).map_err(rhai_err)?;
276 let mut m = Map::new();
277 m.insert("code".into(), (code as i64).into());
278 m.insert("stdout_tail".into(), tail.into());
279 Ok(m)
280 });
281 }
282
283 // --- sh_ok(host, cmd): run + assert exit 0 ---
284 {
285 let ctx = ctx.clone();
286 engine.register_fn("sh_ok", move |host: &str, cmd: &str| -> Result<(), Box<EvalAltResult>> {
287 let (code, _) = ctx.run(host, cmd).map_err(rhai_err)?;
288 if code != 0 {
289 return Err(rhai_err(format!("command on `{host}` exited {code}: {cmd}")));
290 }
291 Ok(())
292 });
293 }
294
295 // --- log(msg): operator-visible line into the current step's tail ---
296 {
297 let ctx = ctx.clone();
298 engine.register_fn("log", move |msg: &str| -> Result<(), Box<EvalAltResult>> {
299 let sink = ctx.ensure_step().map_err(rhai_err)?;
300 let line = format!("[recipe] {msg}\n");
301 ctx.rt.block_on(async {
302 use ops_core::remote::LogSink;
303 sink.lock().await.write_chunk(line.as_bytes()).await;
304 });
305 Ok(())
306 });
307 }
308
309 // --- version_of(app) -> string ---
310 {
311 let ctx = ctx.clone();
312 engine.register_fn("version_of", move |app: &str| -> Result<String, Box<EvalAltResult>> {
313 // Only the current app is in scope; cross-app reads aren't needed.
314 if app != ctx.app.as_str() {
315 return Err(rhai_err(format!("version_of: `{app}` is not the app being built")));
316 }
317 Ok(ctx.version.to_string())
318 });
319 }
320
321 // --- secret(key) -> string (file under secrets_root; never logged) ---
322 {
323 let ctx = ctx.clone();
324 engine.register_fn("secret", move |key: &str| -> Result<String, Box<EvalAltResult>> {
325 // Guard against path traversal out of secrets_root.
326 if key.contains("..") || key.starts_with('/') {
327 return Err(rhai_err("secret key must be a relative path without `..`"));
328 }
329 let path = ctx.cfg.secrets_root.join(key);
330 std::fs::read_to_string(&path)
331 .map(|s| s.trim_end().to_string())
332 .map_err(|e| rhai_err(format!("secret `{key}`: {e}")))
333 });
334 }
335
336 // --- env(host, key) -> string ---
337 {
338 let ctx = ctx.clone();
339 engine.register_fn("env", move |host: &str, key: &str| -> Result<String, Box<EvalAltResult>> {
340 // Read via the shell so it works on remote hosts too.
341 let (code, tail) = ctx
342 .run(host, &format!("printf '%s' \"${{{key}}}\""))
343 .map_err(rhai_err)?;
344 if code != 0 {
345 return Err(rhai_err(format!("env `{key}` on `{host}` failed")));
346 }
347 Ok(tail.trim().to_string())
348 });
349 }
350
351 // --- collect(host, glob, app, version): pull artifacts to dist_root ---
352 {
353 let ctx = ctx.clone();
354 engine.register_fn(
355 "collect",
356 move |host: &str, glob: &str, app: &str, version: &str| -> Result<(), Box<EvalAltResult>> {
357 ctx.collect(host, glob, app, version).map_err(rhai_err)
358 },
359 );
360 }
361
362 // --- publish(channel, app, target, version, artifact, meta) ---
363 {
364 let ctx = ctx.clone();
365 engine.register_fn(
366 "publish",
367 move |channel: &str, app: &str, target: &str, version: &str, artifact: &str, meta: Map| -> Result<String, Box<EvalAltResult>> {
368 ctx.publish(channel, app, target, version, artifact, meta).map_err(rhai_err)
369 },
370 );
371 }
372
373 // --- macOS signing helpers (run on the named host; exercised once the Mac
374 // keychain blocker clears โ€” see _private/docs/bento/design.md ยง3/ยง7) ---
375 register_macos_fns(&mut engine, &ctx);
376
377 engine
378 }
379
380 impl RecipeCtx {
381 fn collect(self: &Arc<Self>, host: &str, glob: &str, app: &str, version: &str) -> Result<()> {
382 let dest = self.cfg.dist_root.join(app).join(version);
383 let dest_s = dest.to_string_lossy().into_owned();
384 let h = self.host(host)?;
385 // The daemon does the transfer locally: cp for a local host, scp for a
386 // remote one. (Remote glob expansion happens on the remote shell.)
387 let cmd = if h.is_local() {
388 let g = ops_core::remote::sh_quote(glob);
389 let d = ops_core::remote::sh_quote(&dest_s);
390 format!("mkdir -p {d} && cp -vR {g} {d}/")
391 } else {
392 let d = ops_core::remote::sh_quote(&dest_s);
393 format!(
394 "mkdir -p {d} && scp -r {flags} {tgt}:{glob} {d}/",
395 flags = ops_core::remote::SSH_FLAGS.join(" "),
396 tgt = h.ssh_target(),
397 )
398 };
399 // The daemon always runs the transfer itself (local cp or local scp),
400 // regardless of which host built the artifact.
401 let sink = self.ensure_step()?;
402 let local = RemoteHost::new("local");
403 let out = self.rt.block_on(async move { local.run_streaming(&cmd, sink).await })?;
404 if !out.success() {
405 anyhow::bail!("collect failed (exit {:?})", out.status.code());
406 }
407 // Best-effort size accounting for the event.
408 events::emit(
409 &self.events,
410 Event::ArtifactCollected {
411 app: self.app.clone(),
412 target: self.target,
413 path: dest_s,
414 bytes: dir_size(&dest).unwrap_or(0),
415 },
416 );
417 Ok(())
418 }
419
420 fn publish(
421 self: &Arc<Self>,
422 channel: &str,
423 app: &str,
424 target: &str,
425 version: &str,
426 artifact: &str,
427 meta: Map,
428 ) -> Result<String> {
429 let backend = self
430 .ota
431 .get(channel)
432 .ok_or_else(|| anyhow::anyhow!("unknown publish channel `{channel}`"))?;
433 let target: Target = target.parse().map_err(|e: String| anyhow::anyhow!(e))?;
434 let version = Version::parse(version).map_err(|e| anyhow::anyhow!(e))?;
435 let app = AppId::new(app);
436 let notes = meta.get("notes").and_then(|v| v.clone().into_string().ok()).unwrap_or_default();
437 // Resolve the artifact relative to the collected dist dir if not absolute.
438 let artifact_path = {
439 let p = PathBuf::from(artifact);
440 if p.is_absolute() {
441 p
442 } else {
443 self.cfg.dist_root.join(app.as_str()).join(version.to_string()).join(artifact)
444 }
445 };
446 let rel = Release { app: &app, target, version: &version, notes };
447 let receipt = backend
448 .publish(&rel, &artifact_path)
449 .with_context(|| format!("publish to `{channel}`"))?;
450 // Record for idempotency / monotonicity.
451 let me = self.clone();
452 let (app_s, target_s, ver_s, chan_s) =
453 (app.to_string(), target.to_string(), version.to_string(), channel.to_string());
454 self.rt.block_on(async move {
455 let _ = sqlx::query(
456 "INSERT OR IGNORE INTO releases (app, target, version, channel, published_at)
457 VALUES (?, ?, ?, ?, ?)",
458 )
459 .bind(app_s)
460 .bind(target_s)
461 .bind(ver_s)
462 .bind(chan_s)
463 .bind(Self::now())
464 .execute(&me.pool)
465 .await;
466 });
467 events::emit(
468 &self.events,
469 Event::PublishOk { app: self.app.clone(), target: self.target, channel: channel.to_string() },
470 );
471 Ok(receipt)
472 }
473 }
474
475 fn dir_size(p: &Path) -> Option<i64> {
476 let mut total = 0i64;
477 for entry in std::fs::read_dir(p).ok()? {
478 let entry = entry.ok()?;
479 let md = entry.metadata().ok()?;
480 if md.is_file() {
481 total += md.len() as i64;
482 }
483 }
484 Some(total)
485 }
486
487 /// macOS signing/notarization host functions. Thin wrappers over the right
488 /// shell incantations, run on the named host. Not exercised this session (the
489 /// Mac keychain blocker, design ยง3, is uncleared) but staged so the macOS
490 /// recipe runs unmodified once it is.
491 fn register_macos_fns(engine: &mut Engine, ctx: &Arc<RecipeCtx>) {
492 {
493 let ctx = ctx.clone();
494 engine.register_fn(
495 "verify_gatekeeper",
496 move |host: &str, path: &str| -> Result<bool, Box<EvalAltResult>> {
497 let (_, tail) = ctx
498 .run(host, &format!("spctl --assess -vv --type install {} 2>&1 || true", ops_core::remote::sh_quote(path)))
499 .map_err(rhai_err)?;
500 Ok(tail.contains("source=Notarized Developer ID"))
501 },
502 );
503 }
504 {
505 let ctx = ctx.clone();
506 engine.register_fn(
507 "codesign",
508 move |host: &str, identity: &str, path: &str| -> Result<(), Box<EvalAltResult>> {
509 let cmd = format!(
510 "codesign --force --options runtime --timestamp --sign {} {}",
511 ops_core::remote::sh_quote(identity),
512 ops_core::remote::sh_quote(path),
513 );
514 let (code, _) = ctx.run(host, &cmd).map_err(rhai_err)?;
515 if code != 0 {
516 return Err(rhai_err("codesign failed"));
517 }
518 Ok(())
519 },
520 );
521 }
522 {
523 let ctx = ctx.clone();
524 engine.register_fn(
525 "staple",
526 move |host: &str, path: &str| -> Result<(), Box<EvalAltResult>> {
527 let (code, _) = ctx
528 .run(host, &format!("xcrun stapler staple {}", ops_core::remote::sh_quote(path)))
529 .map_err(rhai_err)?;
530 if code != 0 {
531 return Err(rhai_err("stapler failed"));
532 }
533 Ok(())
534 },
535 );
536 }
537 {
538 let ctx = ctx.clone();
539 engine.register_fn(
540 "notarize",
541 move |host: &str, path: &str| -> Result<String, Box<EvalAltResult>> {
542 ctx.notarize(host, path).map_err(rhai_err)
543 },
544 );
545 }
546 {
547 let ctx = ctx.clone();
548 engine.register_fn(
549 "keychain_open",
550 move |host: &str, name: &str| -> Result<(), Box<EvalAltResult>> {
551 // The full build-keychain lifecycle lives in dist/build-keychain.sh
552 // (design ยง7); this drives it by name so the recipe stays short.
553 let (code, _) = ctx
554 .run(host, &format!(". ~/.tauri/passwords.env && ./dist/build-keychain.sh open {}", ops_core::remote::sh_quote(name)))
555 .map_err(rhai_err)?;
556 if code != 0 {
557 return Err(rhai_err("keychain_open failed"));
558 }
559 Ok(())
560 },
561 );
562 }
563 {
564 let ctx = ctx.clone();
565 engine.register_fn(
566 "keychain_close",
567 move |host: &str, name: &str| -> Result<(), Box<EvalAltResult>> {
568 let _ = ctx.run(host, &format!("./dist/build-keychain.sh close {}", ops_core::remote::sh_quote(name)));
569 Ok(())
570 },
571 );
572 }
573 }
574
575 impl RecipeCtx {
576 /// `xcrun notarytool submit --wait` with bounded retry (the one flaky,
577 /// network-bound step). Emits `NotarizeRetry` per attempt.
578 fn notarize(self: &Arc<Self>, host: &str, path: &str) -> Result<String> {
579 const MAX_ATTEMPTS: u32 = 3;
580 let cmd = format!(
581 ". ~/.tauri/passwords.env && xcrun notarytool submit {} \
582 --key \"$NOTARY_KEY\" --key-id \"$NOTARY_KEY_ID\" --issuer \"$NOTARY_ISSUER\" \
583 --wait --output-format json",
584 ops_core::remote::sh_quote(path),
585 );
586 let mut last = String::new();
587 for attempt in 1..=MAX_ATTEMPTS {
588 let (code, tail) = self.run(host, &cmd)?;
589 if code == 0 && tail.contains("\"status\":\"Accepted\"") {
590 return Ok(tail);
591 }
592 last = tail;
593 if attempt < MAX_ATTEMPTS {
594 events::emit(
595 &self.events,
596 Event::NotarizeRetry {
597 app: self.app.clone(),
598 target: self.target,
599 attempt,
600 reason: format!("exit {code}"),
601 },
602 );
603 self.rt.block_on(tokio::time::sleep(std::time::Duration::from_secs(15)));
604 }
605 }
606 anyhow::bail!("notarization failed after {MAX_ATTEMPTS} attempts: {last}")
607 }
608 }
609
610 #[cfg(test)]
611 mod tests {
612 use super::*;
613
614 #[test]
615 fn expand_tilde_handles_home() {
616 unsafe { std::env::set_var("HOME", "/home/test") };
617 assert_eq!(expand_tilde("~/Code/x"), PathBuf::from("/home/test/Code/x"));
618 assert_eq!(expand_tilde("/abs/path"), PathBuf::from("/abs/path"));
619 }
620 }
621