Skip to main content

max / makenotwork

sando+bento: remediate ultra-fuzz Run 1 (CF1-5, serious, minors) Deploy-controller audit remediation across Sando, Bento, and shared ops-exec. Critical: tier-gate fail-closed + post-deploy gate runs; promote/rollback single-flight + atomic tier_state; atomic backup pull + pipefail/ON_ERROR_STOP restore; bearer-token auth on mutators (loopback-or-token bind); TUI panic terminal-restore guards. Serious: canary rollback of touched nodes; worktree HEAD validation; atomic symlink-swap-and-restart; Bento publish gated on supports()+step-success ledger and capability-scoped build hosts (no build-on-prod); 500->400 for unshipped targets; scp glob metachar-escaping; partial-deploy compensation surfaced via tier_state.partial_reason in /state + TUI. Minor: notarize JSON-parse / gatekeeper on-host sentinel; finalize_build backstop; publish non-empty-artifact guard; secret() per-component guard; manual_confirm freshness vs per-deploy clock; bounded log reads; tail() char-boundary; restore password via PGPASSWORD; agent /health pre-flight (Executor::preflight); TUI confirm modal + error-body surfacing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-12 21:29 UTC
Commit: bd8994851a0e0aa988b92097cd23481fe1d77f37
Parent: 722d138
31 files changed, +3205 insertions, -254 deletions
@@ -168,6 +168,7 @@ dependencies = [
168 168 "metrics",
169 169 "metrics-exporter-prometheus",
170 170 "ops-core",
171 + "ops-exec",
171 172 "reqwest",
172 173 "rhai",
173 174 "semver",
@@ -553,6 +554,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
553 554 checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
554 555
555 556 [[package]]
557 + name = "futures-macro"
558 + version = "0.3.32"
559 + source = "registry+https://github.com/rust-lang/crates.io-index"
560 + checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
561 + dependencies = [
562 + "proc-macro2",
563 + "quote",
564 + "syn",
565 + ]
566 +
567 + [[package]]
556 568 name = "futures-sink"
557 569 version = "0.3.32"
558 570 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -572,6 +584,7 @@ checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
572 584 dependencies = [
573 585 "futures-core",
574 586 "futures-io",
587 + "futures-macro",
575 588 "futures-sink",
576 589 "futures-task",
577 590 "memchr",
@@ -1288,7 +1301,10 @@ version = "0.1.0"
1288 1301 dependencies = [
1289 1302 "anyhow",
1290 1303 "async-trait",
1304 + "futures-util",
1305 + "reqwest",
1291 1306 "serde",
1307 + "serde_json",
1292 1308 "thiserror",
1293 1309 "tokio",
1294 1310 "tracing",
@@ -1641,6 +1657,7 @@ dependencies = [
1641 1657 "base64",
1642 1658 "bytes",
1643 1659 "futures-core",
1660 + "futures-util",
1644 1661 "http",
1645 1662 "http-body",
1646 1663 "http-body-util",
@@ -1660,12 +1677,14 @@ dependencies = [
1660 1677 "sync_wrapper",
1661 1678 "tokio",
1662 1679 "tokio-rustls",
1680 + "tokio-util",
1663 1681 "tower",
1664 1682 "tower-http",
1665 1683 "tower-service",
1666 1684 "url",
1667 1685 "wasm-bindgen",
1668 1686 "wasm-bindgen-futures",
1687 + "wasm-streams",
1669 1688 "web-sys",
1670 1689 "webpki-roots",
1671 1690 ]
@@ -2415,6 +2434,19 @@ dependencies = [
2415 2434 ]
2416 2435
2417 2436 [[package]]
2437 + name = "tokio-util"
2438 + version = "0.7.18"
2439 + source = "registry+https://github.com/rust-lang/crates.io-index"
2440 + checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
2441 + dependencies = [
2442 + "bytes",
2443 + "futures-core",
2444 + "futures-sink",
2445 + "pin-project-lite",
2446 + "tokio",
2447 + ]
2448 +
2449 + [[package]]
2418 2450 name = "toml"
2419 2451 version = "0.8.23"
2420 2452 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2796,6 +2828,19 @@ dependencies = [
2796 2828 ]
2797 2829
2798 2830 [[package]]
2831 + name = "wasm-streams"
2832 + version = "0.4.2"
2833 + source = "registry+https://github.com/rust-lang/crates.io-index"
2834 + checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
2835 + dependencies = [
2836 + "futures-util",
2837 + "js-sys",
2838 + "wasm-bindgen",
2839 + "wasm-bindgen-futures",
2840 + "web-sys",
2841 + ]
2842 +
2843 + [[package]]
2799 2844 name = "wasmparser"
2800 2845 version = "0.244.0"
2801 2846 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -10,6 +10,7 @@ path = "src/main.rs"
10 10
11 11 [dependencies]
12 12 ops-core = { path = "../../shared/ops-core" }
13 + ops-exec = { path = "../../shared/ops-exec", features = ["rpc"] }
13 14 axum = { version = "0.8.8", features = ["macros", "ws"] }
14 15 tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "net", "signal", "fs", "process", "sync"] }
15 16 serde = { version = "1.0.228", features = ["derive"] }
@@ -13,12 +13,14 @@
13 13 //! which `spawn_blocking` guarantees.
14 14
15 15 use crate::config::Config;
16 - use crate::domain::{AppId, Status, Step, StepRunId, Target, Version};
16 + use crate::domain::{AppId, Platform, Status, Step, StepRunId, Target, Version};
17 17 use crate::events::{self, Event, EventTx};
18 18 use crate::ota::{OtaRegistry, Release};
19 + use crate::state::ExecutorMap;
19 20 use anyhow::{Context as _, Result};
20 21 use ops_core::live_log::LiveLog;
21 22 use ops_core::remote::RemoteHost;
23 + use ops_exec::{Action, ObserveKind, Step as OpStep};
22 24 use rhai::{Engine, EvalAltResult, Map};
23 25 use sqlx::SqlitePool;
24 26 use std::collections::HashMap;
@@ -27,12 +29,35 @@ use std::sync::{Arc, Mutex};
27 29 use tokio::runtime::Handle;
28 30 use tokio::sync::Mutex as AsyncMutex;
29 31
32 + /// The capability label for a command, derived from the open recipe step. A
33 + /// recipe's `sh("mbp", …)` under `step("sign")` becomes an `Action::Sign`, gated
34 + /// by the mac host's `sign` grant — so recipes stay unchanged while every command
35 + /// is capability-checked at its transport. `Verify` is read-only (an observe).
36 + fn action_for(step: Step) -> Action {
37 + match step {
38 + Step::Checkout | Step::Prebuild | Step::Build => Action::Build,
39 + Step::Sign => Action::Sign,
40 + Step::Notarize => Action::Notarize,
41 + Step::Staple => Action::Staple,
42 + Step::Package => Action::Package,
43 + Step::Verify => Action::Observe(ObserveKind::Custom("gatekeeper".into())),
44 + // Publish/Collect run on the daemon, not through a host executor; this
45 + // label only applies if a recipe runs a bare `sh` while one is open.
46 + Step::Publish | Step::Collect => Action::Package,
47 + }
48 + }
49 +
30 50 /// The currently-open step within a recipe run: its DB row id, which step it
31 51 /// is, and the live-log sink that `sh`/`log` stream into.
32 52 struct StepState {
33 53 run_id: StepRunId,
34 54 step: Step,
35 55 log: Arc<AsyncMutex<LiveLog>>,
56 + /// Set when something in the step recorded a hard failure the recipe did
57 + /// not abort on (e.g. `verify_gatekeeper` rejected the artifact but the
58 + /// recipe ignored the bool). Forces the step's recorded status to `Failed`
59 + /// and bars `publish` (the step-success ledger).
60 + failed: bool,
36 61 }
37 62
38 63 /// Everything a recipe's host functions need, shared (Arc) into each closure.
@@ -41,13 +66,28 @@ pub struct RecipeCtx {
41 66 pub version: Version,
42 67 pub target: Target,
43 68 pub target_run_id: i64,
44 - pub hosts: HashMap<String, RemoteHost>,
69 + /// Capability-scoped executor per build host. Recipe commands dispatch
70 + /// through these — the transport (local / ssh / in-session agent) and the
71 + /// capability gate are the executor's, not the engine's.
72 + pub execs: Arc<ExecutorMap>,
73 + /// Host name -> ssh string, used only by `collect` to name the remote scp
74 + /// source for a daemon-local artifact pull. Not an execution path.
75 + pub host_ssh: HashMap<String, String>,
45 76 pub pool: SqlitePool,
46 77 pub events: EventTx,
47 78 pub cfg: Arc<Config>,
48 79 pub ota: Arc<OtaRegistry>,
49 80 pub rt: Handle,
50 81 current: Mutex<Option<StepState>>,
82 + /// Gatekeeper verdict recorded by `verify_gatekeeper`: `None` = never run,
83 + /// `Some(false)` = ran and rejected, `Some(true)` = accepted. `publish`
84 + /// requires `Some(true)` for a macOS/iOS artifact (the proof it is signed +
85 + /// notarized).
86 + gatekeeper_ok: Mutex<Option<bool>>,
87 + /// Steps finalized as `Failed` during this run. A non-empty ledger bars
88 + /// `publish` — an artifact is never shipped after a step failed, even if the
89 + /// recipe ignored the failure and ran on.
90 + failed_steps: Mutex<Vec<Step>>,
51 91 }
52 92
53 93 impl RecipeCtx {
@@ -57,7 +97,8 @@ impl RecipeCtx {
57 97 version: Version,
58 98 target: Target,
59 99 target_run_id: i64,
60 - hosts: HashMap<String, RemoteHost>,
100 + execs: Arc<ExecutorMap>,
101 + host_ssh: HashMap<String, String>,
61 102 pool: SqlitePool,
62 103 events: EventTx,
63 104 cfg: Arc<Config>,
@@ -69,13 +110,16 @@ impl RecipeCtx {
69 110 version,
70 111 target,
71 112 target_run_id,
72 - hosts,
113 + execs,
114 + host_ssh,
73 115 pool,
74 116 events,
75 117 cfg,
76 118 ota,
77 119 rt,
78 120 current: Mutex::new(None),
121 + gatekeeper_ok: Mutex::new(None),
122 + failed_steps: Mutex::new(Vec::new()),
79 123 }
80 124 }
81 125
@@ -150,15 +194,31 @@ impl RecipeCtx {
150 194 run_id,
151 195 step,
152 196 log: Arc::new(AsyncMutex::new(log)),
197 + failed: false,
153 198 });
154 199 Ok(())
155 200 }
156 201
202 + /// Flag the currently-open step as failed (no-op if none is open). Forces
203 + /// its recorded status to `Failed` at `finish_step` and adds it to the
204 + /// publish-barring ledger, even though the recipe kept running.
205 + fn fail_current_step(&self) {
206 + if let Some(st) = self.current.lock().unwrap().as_mut() {
207 + st.failed = true;
208 + }
209 + }
210 +
157 211 /// Finalize the open step (if any): close its log, stamp the DB row, emit
158 - /// `StepDone`. Idempotent when no step is open.
212 + /// `StepDone`. Idempotent when no step is open. A step flagged via
213 + /// [`fail_current_step`] is recorded `Failed` regardless of the requested
214 + /// status, and added to the ledger `publish` consults.
159 215 pub fn finish_step(self: &Arc<Self>, status: Status) -> Result<()> {
160 216 let st = self.current.lock().unwrap().take();
161 217 let Some(st) = st else { return Ok(()) };
218 + let status = if st.failed { Status::Failed } else { status };
219 + if status == Status::Failed {
220 + self.failed_steps.lock().unwrap().push(st.step);
221 + }
162 222 let me = self.clone();
163 223 self.rt.block_on(async move {
164 224 // Drop all log refs so the sink can be owned + flushed.
@@ -202,20 +262,37 @@ impl RecipeCtx {
202 262 self.current.lock().unwrap().as_ref().map(|s| s.step).unwrap_or(Step::Build)
203 263 }
204 264
205 - fn host(&self, name: &str) -> Result<RemoteHost> {
206 - self.hosts
265 + /// The capability-scoped executor for `name`, or an error if the host isn't
266 + /// in the topology.
267 + fn exec(&self, name: &str) -> Result<Arc<dyn ops_exec::Executor>> {
268 + self.execs
207 269 .get(name)
208 270 .cloned()
209 271 .ok_or_else(|| anyhow::anyhow!("unknown build host `{name}` (not in topology)"))
210 272 }
211 273
212 - /// Run `cmd` on `host`, streaming into the current step's log. Returns
274 + /// The ssh string for `name` (for `collect`'s remote scp source).
275 + fn host_ssh(&self, name: &str) -> Result<String> {
276 + self.host_ssh
277 + .get(name)
278 + .cloned()
279 + .ok_or_else(|| anyhow::anyhow!("unknown build host `{name}` (not in topology)"))
280 + }
281 +
282 + /// Run `cmd` on `host` through its capability-scoped executor, streaming into
283 + /// the current step's log. The command's [`Action`] is derived from the open
284 + /// step (see [`action_for`]) and gated at the transport before dispatch — so a
285 + /// `build` step on a host without the `build` grant is denied, and the macOS
286 + /// sign steps ride the in-session `AgentRpc` transport automatically. Returns
213 287 /// exit code + a tail of stdout for the recipe to branch on.
214 288 fn run(self: &Arc<Self>, host: &str, cmd: &str) -> Result<(i32, String)> {
215 289 let sink = self.ensure_step()?;
216 - let host = self.host(host)?;
217 - let cmd = cmd.to_string();
218 - let out = self.rt.block_on(async move { host.run_streaming(&cmd, sink).await })?;
290 + let exec = self.exec(host)?;
291 + let step = OpStep::shell(action_for(self.current_step()), cmd.to_string());
292 + let out = self.rt.block_on(async move {
293 + let mut guard = sink.lock().await;
294 + exec.run_streaming(&step, &mut *guard).await
295 + })?;
219 296 let code = out.status.code().unwrap_or(-1);
220 297 let stdout = String::from_utf8_lossy(&out.stdout);
221 298 let tail: String = stdout.chars().rev().take(2000).collect::<Vec<_>>().into_iter().rev().collect();
@@ -322,9 +399,21 @@ pub fn build_engine(ctx: Arc<RecipeCtx>) -> Engine {
322 399 {
323 400 let ctx = ctx.clone();
324 401 engine.register_fn("secret", move |key: &str| -> Result<String, Box<EvalAltResult>> {
325 - // Guard against path traversal out of secrets_root.
326 - if key.contains("..") || key.starts_with('/') {
327 - return Err(rhai_err("secret key must be a relative path without `..`"));
402 + // Guard against traversal out of secrets_root. Require every path
403 + // component to be `Normal` (rejects `..`, `.`, absolute roots and
404 + // drive prefixes) and forbid backslashes (a literal filename char on
405 + // Linux, but a separator elsewhere) — the per-component strength of
406 + // Sando's `safe()`. A multi-segment key like `app/token` is still
407 + // allowed; `foo..bar` (a legit filename) is no longer falsely blocked.
408 + let safe = !key.is_empty()
409 + && !key.contains('\\')
410 + && std::path::Path::new(key)
411 + .components()
412 + .all(|c| matches!(c, std::path::Component::Normal(_)));
413 + if !safe {
414 + return Err(rhai_err(
415 + "secret key must be a relative path under secrets_root (no `..`, `.`, absolute paths, or backslashes)",
416 + ));
328 417 }
329 418 let path = ctx.cfg.secrets_root.join(key);
330 419 std::fs::read_to_string(&path)
@@ -370,8 +459,12 @@ pub fn build_engine(ctx: Arc<RecipeCtx>) -> Engine {
370 459 );
371 460 }
372 461
373 - // --- macOS signing helpers (run on the named host; exercised once the Mac
374 - // keychain blocker clears — see _private/docs/bento/design.md §3/§7) ---
462 + // --- macOS signing helpers. They dispatch through the named host's
463 + // executor like any other step; when that host is the mac (transport =
464 + // "agent"), codesign/notarize/staple ride the in-session `AgentRpc`
465 + // transport — the only security session where the Developer ID key is
466 + // usable (design §7 "THE WALL"). Capability-gated by the host's `sign`
467 + // grant. ---
375 468 register_macos_fns(&mut engine, &ctx);
376 469
377 470 engine
@@ -381,19 +474,28 @@ impl RecipeCtx {
381 474 fn collect(self: &Arc<Self>, host: &str, glob: &str, app: &str, version: &str) -> Result<()> {
382 475 let dest = self.cfg.dist_root.join(app).join(version);
383 476 let dest_s = dest.to_string_lossy().into_owned();
384 - let h = self.host(host)?;
477 + let ssh = self.host_ssh(host)?;
478 + let is_local = ssh == "local" || ssh.is_empty();
385 479 // The daemon does the transfer locally: cp for a local host, scp for a
386 - // remote one. (Remote glob expansion happens on the remote shell.)
387 - let cmd = if h.is_local() {
480 + // remote one. The glob is sh-quoted on the local side; on the remote side
481 + // the wildcard must stay live for the remote shell to expand, so we quote
482 + // the host prefix and leave the glob unquoted but validated.
483 + let d = ops_core::remote::sh_quote(&dest_s);
484 + let cmd = if is_local {
388 485 let g = ops_core::remote::sh_quote(glob);
389 - let d = ops_core::remote::sh_quote(&dest_s);
390 486 format!("mkdir -p {d} && cp -vR {g} {d}/")
391 487 } else {
392 - let d = ops_core::remote::sh_quote(&dest_s);
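488 + // Reject shell metacharacters that would break out of the scp source
489 + // (the wildcard chars `*?[]` and path chars are allowed; `;`, `&`, `|`, `$`,
490 + // backticks, quotes, backslash, parentheses, `<`/`>`, space and newline are not). NOTE(review): tab and `\r` are not in the rejected set.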
491 + anyhow::ensure!(
492 + !glob.chars().any(|c| matches!(c, ';' | '&' | '|' | '$' | '`' | '\'' | '"' | '\\' | ' ' | '\n' | '(' | ')' | '<' | '>')),
493 + "collect glob `{glob}` contains shell metacharacters"
494 + );
393 495 format!(
394 496 "mkdir -p {d} && scp -r {flags} {tgt}:{glob} {d}/",
395 497 flags = ops_core::remote::SSH_FLAGS.join(" "),
396 - tgt = h.ssh_target(),
498 + tgt = ops_core::remote::sh_quote(&ssh),
397 499 )
398 500 };
399 501 // The daemon always runs the transfer itself (local cp or local scp),
@@ -433,6 +535,21 @@ impl RecipeCtx {
433 535 let target: Target = target.parse().map_err(|e: String| anyhow::anyhow!(e))?;
434 536 let version = Version::parse(version).map_err(|e| anyhow::anyhow!(e))?;
435 537 let app = AppId::new(app);
538 +
539 + // The backend must actually handle this target (e.g. the desktop updater
540 + // disclaims iOS) — otherwise publish would push an artifact through a
541 + // backend that does not support it.
542 + anyhow::ensure!(
543 + backend.supports(target),
544 + "publish channel `{channel}` does not support target {target}",
545 + );
546 +
547 + // Step-success ledger (the Bento analogue of Sando's gate fail-closed).
548 + {
549 + let failed = self.failed_steps.lock().unwrap();
550 + let gatekeeper = *self.gatekeeper_ok.lock().unwrap();
551 + publish_precondition(target, failed.as_slice(), gatekeeper)?;
552 + }
436 553 let notes = meta.get("notes").and_then(|v| v.clone().into_string().ok()).unwrap_or_default();
437 554 // Resolve the artifact relative to the collected dist dir if not absolute.
438 555 let artifact_path = {
@@ -485,19 +602,68 @@ fn dir_size(p: &Path) -> Option<i64> {
485 602 }
486 603
487 604 /// macOS signing/notarization host functions. Thin wrappers over the right
488 - /// shell incantations, run on the named host. Not exercised this session (the
489 - /// Mac keychain blocker, design §3, is uncleared) but staged so the macOS
490 - /// recipe runs unmodified once it is.
605 + /// shell incantations, dispatched through the named host's executor. On the mac
606 + /// host (`transport = "agent"`) they run via the in-session `ops-agent`, the only
607 + /// context where codesign can use the Developer ID key (design §7 "THE WALL"); a
608 + /// plain SSH session cannot. Each is gated by the host's `sign` capability.
609 + /// The publish step-success gate (CF1 analogue for Bento). Bars `publish` when
610 + /// any prior step failed, OR when a macOS/iOS artifact has not passed
611 + /// Gatekeeper (the proof it is actually signed + notarized). Both fail closed —
612 + /// an unverified or post-failure artifact is never shipped. Pure, so it is
613 + /// exhaustively unit-testable without a runtime.
614 + fn publish_precondition(
615 + target: Target,
616 + failed_steps: &[Step],
617 + gatekeeper_ok: Option<bool>,
618 + ) -> Result<()> {
619 + anyhow::ensure!(
620 + failed_steps.is_empty(),
621 + "refusing to publish {target}: {} prior step(s) failed ({})",
622 + failed_steps.len(),
623 + failed_steps.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(", "),
624 + );
625 + if matches!(target.platform, Platform::Macos | Platform::Ios) {
626 + match gatekeeper_ok {
627 + Some(true) => {}
628 + Some(false) => anyhow::bail!(
629 + "refusing to publish {target}: Gatekeeper rejected the artifact (verify_gatekeeper did not accept)"
630 + ),
631 + None => anyhow::bail!(
632 + "refusing to publish {target}: artifact was never verified — verify_gatekeeper must run and accept before publish"
633 + ),
634 + }
635 + }
636 + Ok(())
637 + }
638 +
491 639 fn register_macos_fns(engine: &mut Engine, ctx: &Arc<RecipeCtx>) {
492 640 {
493 641 let ctx = ctx.clone();
494 642 engine.register_fn(
495 643 "verify_gatekeeper",
496 644 move |host: &str, path: &str| -> Result<bool, Box<EvalAltResult>> {
497 - let (_, tail) = ctx
498 - .run(host, &format!("spctl --assess -vv --type install {} 2>&1 || true", ops_core::remote::sh_quote(path)))
499 - .map_err(rhai_err)?;
500 - Ok(tail.contains("source=Notarized Developer ID"))
645 + // spctl has no JSON mode, so assess on-host and decide there,
646 + // emitting an unambiguous sentinel as the final line. We look for that
647 + // sentinel rather than substring-hunting `source=Notarized...` in a
648 + // 2000-char tail: truncation only drops the front, so the sentinel is
649 + // always present. NOTE(review): the check is `contains` over the whole tail, which also holds the echoed assess output, so it resists spoofing only while that output never includes the sentinel text; an exact last-line match would be stricter.
650 + // The full assess output is still streamed to the step log.
651 + let q = ops_core::remote::sh_quote(path);
652 + let cmd = format!(
653 + "out=$(spctl --assess -vv --type install {q} 2>&1); printf '%s\\n' \"$out\"; \
654 + printf '%s' \"$out\" | grep -q 'source=Notarized Developer ID' \
655 + && echo BENTO_GATEKEEPER_OK || echo BENTO_GATEKEEPER_FAIL",
656 + );
657 + let (_, tail) = ctx.run(host, &cmd).map_err(rhai_err)?;
658 + let accepted = tail.contains("BENTO_GATEKEEPER_OK");
659 + // Record the verdict for the publish gate. A rejection also
660 + // fails the step, so the matrix shows red and `publish` is barred
661 + // even if the recipe ignores the returned bool.
662 + *ctx.gatekeeper_ok.lock().unwrap() = Some(accepted);
663 + if !accepted {
664 + ctx.fail_current_step();
665 + }
666 + Ok(accepted)
501 667 },
502 668 );
503 669 }
@@ -586,7 +752,7 @@ impl RecipeCtx {
586 752 let mut last = String::new();
587 753 for attempt in 1..=MAX_ATTEMPTS {
588 754 let (code, tail) = self.run(host, &cmd)?;
589 - if code == 0 && tail.contains("\"status\":\"Accepted\"") {
755 + if code == 0 && notary_accepted(&tail) {
590 756 return Ok(tail);
591 757 }
592 758 last = tail;
@@ -607,6 +773,25 @@ impl RecipeCtx {
607 773 }
608 774 }
609 775
776 + /// True iff `notarytool --output-format json` output reports `status: Accepted`.
777 + /// Isolates the JSON object (`{`..`}`) from any shell-sourcing noise and reads
778 + /// the typed `status` field, rather than substring-matching `"status":"Accepted"`
779 + /// in a possibly-truncated tail — which could match the literal inside an error
780 + /// message or miss it across a whitespace variant. Fails closed: any parse or
781 + /// field miss returns false.
782 + fn notary_accepted(output: &str) -> bool {
783 + let (Some(start), Some(end)) = (output.find('{'), output.rfind('}')) else {
784 + return false;
785 + };
786 + if start > end {
787 + return false;
788 + }
789 + serde_json::from_str::<serde_json::Value>(&output[start..=end])
790 + .ok()
791 + .and_then(|v| v.get("status").and_then(|s| s.as_str()).map(|s| s.eq_ignore_ascii_case("accepted")))
792 + .unwrap_or(false)
793 + }
794 +
610 795 #[cfg(test)]
611 796 mod tests {
612 797 use super::*;
@@ -617,4 +802,70 @@ mod tests {
617 802 assert_eq!(expand_tilde("~/Code/x"), PathBuf::from("/home/test/Code/x"));
618 803 assert_eq!(expand_tilde("/abs/path"), PathBuf::from("/abs/path"));
619 804 }
805 +
806 + // ---- publish step-success gate ----
807 +
808 + fn target(s: &str) -> Target {
809 + s.parse().unwrap()
810 + }
811 +
812 + #[test]
813 + fn publish_gate_blocks_macos_without_verification() {
814 + // Never verified -> blocked, with a message pointing at verify_gatekeeper.
815 + let err = publish_precondition(target("macos/aarch64"), &[], None).unwrap_err();
816 + assert!(format!("{err:#}").contains("never verified"), "{err:#}");
817 + }
818 +
819 + #[test]
820 + fn publish_gate_blocks_macos_when_gatekeeper_rejected() {
821 + let err = publish_precondition(target("macos/aarch64"), &[], Some(false)).unwrap_err();
822 + assert!(format!("{err:#}").contains("Gatekeeper rejected"), "{err:#}");
823 + }
824 +
825 + #[test]
826 + fn publish_gate_allows_macos_when_gatekeeper_accepted() {
827 + publish_precondition(target("macos/aarch64"), &[], Some(true)).unwrap();
828 + // iOS is gated the same way.
829 + publish_precondition(target("ios/universal"), &[], Some(true)).unwrap();
830 + assert!(publish_precondition(target("ios/universal"), &[], None).is_err());
831 + }
832 +
833 + #[test]
834 + fn publish_gate_does_not_require_gatekeeper_for_non_apple_targets() {
835 + // Linux/Windows aren't notarized; no gatekeeper proof needed.
836 + publish_precondition(target("linux/x86_64"), &[], None).unwrap();
837 + publish_precondition(target("windows/x86_64"), &[], None).unwrap();
838 + }
839 +
840 + #[test]
841 + fn publish_gate_blocks_when_any_prior_step_failed() {
842 + // A failed step bars publish on every target, even a verified macOS one.
843 + let err = publish_precondition(target("linux/x86_64"), &[Step::Build], None).unwrap_err();
844 + assert!(format!("{err:#}").contains("prior step(s) failed"), "{err:#}");
845 + assert!(format!("{err:#}").contains("build"), "names the failed step: {err:#}");
846 +
847 + let err = publish_precondition(target("macos/aarch64"), &[Step::Sign], Some(true)).unwrap_err();
848 + assert!(format!("{err:#}").contains("prior step(s) failed"), "{err:#}");
849 + }
850 +
851 + #[test]
852 + fn notary_accepted_parses_status_field() {
853 + assert!(notary_accepted(r#"{"id":"abc","status":"Accepted","message":"ok"}"#));
854 + // Embedded in shell-sourcing noise: the object is isolated and parsed.
855 + assert!(notary_accepted("sourcing env...\n{\n \"status\": \"Accepted\"\n}\nbye"));
856 + // Whitespace variant that a tight substring `"status":"Accepted"` misses.
857 + assert!(notary_accepted(r#"{ "status" : "Accepted" }"#));
858 + }
859 +
860 + #[test]
861 + fn notary_accepted_rejects_non_accepted_and_garbage() {
862 + assert!(!notary_accepted(r#"{"status":"Invalid"}"#));
863 + assert!(!notary_accepted(r#"{"status":"In Progress"}"#));
864 + assert!(!notary_accepted("no json here"));
865 + assert!(!notary_accepted("")); // empty / truncated -> fail closed
866 + // A truncated tail whose opening brace was cut off cannot parse -> closed.
867 + assert!(!notary_accepted(r#""status":"Accepted"}"#));
868 + // The literal appearing inside an error string must NOT pass as success.
869 + assert!(!notary_accepted(r#"{"status":"Invalid","message":"expected status:Accepted"}"#));
870 + }
620 871 }
@@ -29,6 +29,26 @@ async fn main() -> Result<()> {
29 29
30 30 let prom = metrics::init();
31 31 let addr: SocketAddr = cfg.listen.parse()?;
32 +
33 + // Build-API auth (CF2). Token from BENTO_API_TOKEN (systemd EnvironmentFile);
34 + // refuse to expose build triggers unauthenticated on a non-loopback bind.
35 + let api_token: Option<Arc<str>> = std::env::var("BENTO_API_TOKEN")
36 + .ok()
37 + .map(|s| s.trim().to_string())
38 + .filter(|s| !s.is_empty())
39 + .map(|s| Arc::from(s.as_str()));
40 + if api_token.is_none() && !addr.ip().is_loopback() {
41 + anyhow::bail!(
42 + "BENTO_API_TOKEN is unset but listen={addr} is not loopback; refusing to expose build \
43 + triggers unauthenticated. Set BENTO_API_TOKEN (via EnvironmentFile) or bind 127.0.0.1."
44 + );
45 + }
46 + match &api_token {
47 + Some(_) => tracing::info!("build endpoints require a bearer token"),
48 + None => tracing::warn!(%addr, "BENTO_API_TOKEN unset; build endpoints are UNAUTHENTICATED (loopback bind)"),
49 + }
50 +
51 + let executors = Arc::new(state::build_executors(&topo));
32 52 let app_state = state::AppState {
33 53 pool,
34 54 topo,
@@ -36,7 +56,9 @@ async fn main() -> Result<()> {
36 56 prom,
37 57 events: events::channel(),
38 58 ota: Arc::new(ota::OtaRegistry::standard(mnw_base_url())),
59 + executors,
39 60 active: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
61 + api_token,
40 62 };
41 63 let app = routes::router(app_state);
42 64 tracing::info!(%addr, "bento daemon listening");
@@ -13,7 +13,7 @@
13 13 //! and returns a descriptive receipt without transferring bytes.
14 14
15 15 use crate::domain::{AppId, Target, Version};
16 - use anyhow::Result;
16 + use anyhow::{Context, Result};
17 17 use std::collections::HashMap;
18 18 use std::path::Path;
19 19
@@ -85,8 +85,15 @@ impl OtaBackend for TauriMnwBackend {
85 85 }
86 86
87 87 fn publish(&self, rel: &Release, artifact: &Path) -> Result<String> {
88 - // Guardrail: never publish an artifact that isn't there.
89 - anyhow::ensure!(artifact.exists(), "artifact {} does not exist", artifact.display());
88 + // Guardrail: never publish an artifact that is missing OR present-but-empty
89 + // (a truncated/zero-byte collect would otherwise pass a bare `exists()`).
90 + // Full integrity (minisign .sig verification against the trusted pubkey)
91 + // lands with the P2 upload wiring below — this size floor is the cheap
92 + // pre-check that catches a failed transfer.
93 + let meta = std::fs::metadata(artifact)
94 + .with_context(|| format!("artifact {} is not accessible", artifact.display()))?;
95 + anyhow::ensure!(meta.is_file(), "artifact {} is not a regular file", artifact.display());
96 + anyhow::ensure!(meta.len() > 0, "artifact {} is empty (0 bytes)", artifact.display());
90 97 // P2: POST the release + upload the artifact + its minisign .sig to the
91 98 // MNW OTA API at `self.base_url`. Until then, report what would happen
92 99 // so the recipe path is exercised without a half-published release.
@@ -125,4 +132,22 @@ mod tests {
125 132 let rel = Release { app: &app, target: "macos/aarch64".parse().unwrap(), version: &ver, notes: String::new() };
126 133 assert!(b.publish(&rel, Path::new("/no/such/file")).is_err());
127 134 }
135 +
136 + #[test]
137 + fn publish_refuses_empty_artifact() {
138 + let reg = OtaRegistry::standard("https://makenot.work");
139 + let b = reg.get("tauri-mnw").unwrap();
140 + let app = AppId::new("goingson");
141 + let ver = Version::parse("0.4.1").unwrap();
142 + let rel = Release { app: &app, target: "macos/aarch64".parse().unwrap(), version: &ver, notes: String::new() };
143 + let tmp = tempfile::tempdir().unwrap();
144 + // Zero-byte file: exists() would pass, but a truncated collect must not publish.
145 + let empty = tmp.path().join("app.dmg");
146 + std::fs::write(&empty, b"").unwrap();
147 + let err = b.publish(&rel, &empty).unwrap_err();
148 + assert!(format!("{err:#}").contains("empty"), "{err:#}");
149 + // A non-empty file is accepted (P2 stub returns the would-publish receipt).
150 + std::fs::write(&empty, b"x").unwrap();
151 + assert!(b.publish(&rel, &empty).is_ok());
152 + }
128 153 }
@@ -20,16 +20,67 @@ use sqlx::Row;
20 20
21 21 pub fn router(state: AppState) -> Router {
22 22 let prom = state.prom.clone();
23 - Router::new()
24 - .route("/state", get(get_state))
23 + let token = state.api_token.clone();
24 +
25 + // Build triggers require a bearer token (when configured); reads stay open
26 + // so the TUI's state/event/log polling needs no credential (CF2).
27 + let mutating = Router::new()
25 28 .route("/build", post(build))
26 29 .route("/retry", post(retry))
30 + .route_layer(axum::middleware::from_fn(move |req, next| {
31 + require_bearer(token.clone(), req, next)
32 + }));
33 +
34 + let open = Router::new()
35 + .route("/state", get(get_state))
27 36 .route("/logs/{app}/{version}/{target}/{step}", get(get_step_log))
28 - .route("/events", get(events_ws))
37 + .route("/events", get(events_ws));
38 +
39 + Router::new()
40 + .merge(mutating)
41 + .merge(open)
29 42 .with_state(state)
30 43 .route("/metrics", get(crate::metrics::render).with_state(prom))
31 44 }
32 45
46 + /// Bearer-token gate for the build triggers. No token configured -> pass
47 + /// (main() only allows that on a loopback bind). Constant-time comparison.
48 + async fn require_bearer(
49 + token: Option<std::sync::Arc<str>>,
50 + req: axum::extract::Request,
51 + next: axum::middleware::Next,
52 + ) -> axum::response::Response {
53 + let Some(expected) = token.as_deref() else {
54 + return next.run(req).await;
55 + };
56 + let path = req.uri().path().to_string();
57 + let ok = req
58 + .headers()
59 + .get(axum::http::header::AUTHORIZATION)
60 + .and_then(|h| h.to_str().ok())
61 + .and_then(|h| h.strip_prefix("Bearer "))
62 + .is_some_and(|t| ct_eq(t, expected));
63 + if ok {
64 + next.run(req).await
65 + } else {
66 + tracing::warn!(path = %path, "rejected unauthenticated request to a build trigger");
67 + (axum::http::StatusCode::UNAUTHORIZED, "missing or invalid bearer token\n").into_response()
68 + }
69 + }
70 +
71 + /// Constant-time string compare (length may leak; bytes do not short-circuit).
72 + fn ct_eq(a: &str, b: &str) -> bool {
73 + let (a, b) = (a.as_bytes(), b.as_bytes());
74 + if a.len() != b.len() {
75 + return false;
76 + }
77 + let mut diff = 0u8;
78 + for (x, y) in a.iter().zip(b) {
79 + diff |= x ^ y;
80 + }
81 + diff == 0
82 + }
83 +
33 84 #[derive(Serialize)]
34 85 struct StateView {
35 86 build: Option<BuildView>,
@@ -138,8 +189,8 @@ struct BuildBody {
138 189 async fn build(State(s): State<AppState>, Json(body): Json<BuildBody>) -> Result<Json<serde_json::Value>> {
139 190 let app = AppId::new(body.app);
140 191 let targets = parse_targets(body.targets)?;
141 - let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?;
142 - let targets = runner::resolve_targets(&s, &app, targets).map_err(Error::Other)?;
192 + let version = runner::resolve_version(&s, &app, body.version)?;
193 + let targets = runner::resolve_targets(&s, &app, targets)?;
143 194 let build_id = runner::start_build(s, app, version.clone(), targets)
144 195 .await
145 196 .map_err(Error::Other)?;
@@ -157,8 +208,8 @@ struct RetryBody {
157 208 async fn retry(State(s): State<AppState>, Json(body): Json<RetryBody>) -> Result<Json<serde_json::Value>> {
158 209 let app = AppId::new(body.app);
159 210 let target: Target = body.target.parse().map_err(Error::BadRequest)?;
160 - let version = runner::resolve_version(&s, &app, body.version).map_err(Error::Other)?;
161 - let targets = runner::resolve_targets(&s, &app, vec![target]).map_err(Error::Other)?;
211 + let version = runner::resolve_version(&s, &app, body.version)?;
212 + let targets = runner::resolve_targets(&s, &app, vec![target])?;
162 213 let build_id = runner::start_build(s, app, version.clone(), targets)
163 214 .await
164 215 .map_err(Error::Other)?;
@@ -252,6 +303,7 @@ targets = ["linux/x86_64"]
252 303 "#,
253 304 )
254 305 .unwrap();
306 + let executors = Arc::new(crate::state::build_executors(&topo));
255 307 AppState {
256 308 pool,
257 309 topo: Arc::new(topo),
@@ -259,7 +311,9 @@ targets = ["linux/x86_64"]
259 311 prom: crate::metrics::test_handle(),
260 312 events: crate::events::channel(),
261 313 ota: Arc::new(OtaRegistry::standard("https://makenot.work")),
314 + executors,
262 315 active: Arc::new(Mutex::new(HashMap::new())),
316 + api_token: None,
263 317 }
264 318 }
265 319
@@ -280,6 +334,62 @@ targets = ["linux/x86_64"]
280 334 assert_eq!(body_string(resp).await, r#"{"build":null}"#);
281 335 }
282 336
337 + // ---- CF2: bearer-token auth on build triggers ----
338 +
339 + #[tokio::test]
340 + async fn build_route_requires_bearer_when_token_set() {
341 + let tmp = tempfile::tempdir().unwrap();
342 + let mut state = test_state(tmp.path()).await;
343 + state.api_token = Some(std::sync::Arc::from("s3cr3t"));
344 + let app = router(state);
345 +
346 + let build_req = || {
347 + Request::builder()
348 + .method("POST")
349 + .uri("/build")
350 + .header("content-type", "application/json")
351 + .body(Body::from(r#"{"app":"goingson"}"#))
352 + .unwrap()
353 + };
354 +
355 + // No token -> 401.
356 + let resp = app.clone().oneshot(build_req()).await.unwrap();
357 + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
358 +
359 + // Wrong token -> 401.
360 + let mut bad = build_req();
361 + bad.headers_mut().insert("authorization", "Bearer nope".parse().unwrap());
362 + let resp = app.clone().oneshot(bad).await.unwrap();
363 + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
364 +
365 + // Correct token -> passes auth (NOT 401; the build itself is accepted).
366 + let mut good = build_req();
367 + good.headers_mut().insert("authorization", "Bearer s3cr3t".parse().unwrap());
368 + let resp = app.clone().oneshot(good).await.unwrap();
369 + assert_ne!(resp.status(), StatusCode::UNAUTHORIZED);
370 + }
371 +
372 + #[tokio::test]
373 + async fn read_route_open_when_token_set() {
374 + let tmp = tempfile::tempdir().unwrap();
375 + let mut state = test_state(tmp.path()).await;
376 + state.api_token = Some(std::sync::Arc::from("s3cr3t"));
377 + let app = router(state);
378 + let resp = app
379 + .oneshot(Request::builder().uri("/state").body(Body::empty()).unwrap())
380 + .await
381 + .unwrap();
382 + assert_eq!(resp.status(), StatusCode::OK);
383 + }
384 +
#[test]
fn ct_eq_matches_only_identical_strings() {
    // (left, right, expected equality)
    let cases = [
        ("abc", "abc", true),
        ("abc", "abd", false),
        ("abc", "ab", false),
        ("", "", true),
    ];
    for (left, right, expected) in cases {
        assert_eq!(ct_eq(left, right), expected, "ct_eq({left:?}, {right:?})");
    }
}
392 +
283 393 #[tokio::test]
284 394 async fn step_log_rejects_traversal() {
285 395 let tmp = tempfile::tempdir().unwrap();
@@ -297,7 +407,51 @@ targets = ["linux/x86_64"]
297 407 }
298 408
299 409 #[tokio::test]
300 - async fn build_rejects_unknown_target() {
410 + async fn build_rejects_unshipped_target_as_bad_request() {
411 + // windows/x86_64 is a valid target but the test app doesn't ship it.
412 + // That's a client error (400), not a server error (500).
413 + let tmp = tempfile::tempdir().unwrap();
414 + let app = router(test_state(tmp.path()).await);
415 + let resp = app
416 + .oneshot(
417 + Request::builder()
418 + .method("POST")
419 + .uri("/build")
420 + .header("content-type", "application/json")
421 + // Explicit version so resolve_version doesn't read a
422 + // (nonexistent) tauri.conf first — we're testing the target
423 + // validation specifically.
424 + .body(Body::from(r#"{"app":"goingson","version":"0.4.1","targets":["windows/x86_64"]}"#))
425 + .unwrap(),
426 + )
427 + .await
428 + .unwrap();
429 + assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
430 + assert!(body_string(resp).await.contains("does not ship target"), "names the problem");
431 + }
432 +
433 + #[tokio::test]
434 + async fn build_rejects_unknown_app_as_bad_request() {
435 + let tmp = tempfile::tempdir().unwrap();
436 + let app = router(test_state(tmp.path()).await);
437 + let resp = app
438 + .oneshot(
439 + Request::builder()
440 + .method("POST")
441 + .uri("/build")
442 + .header("content-type", "application/json")
443 + .body(Body::from(r#"{"app":"nope","targets":["linux/x86_64"]}"#))
444 + .unwrap(),
445 + )
446 + .await
447 + .unwrap();
448 + assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
449 + assert!(body_string(resp).await.contains("unknown app"), "names the problem");
450 + }
451 +
452 + #[tokio::test]
453 + async fn build_rejects_malformed_target_as_bad_request() {
454 + // A non-parseable target string is also a 400 (via parse_targets).
301 455 let tmp = tempfile::tempdir().unwrap();
302 456 let app = router(test_state(tmp.path()).await);
303 457 let resp = app
@@ -306,12 +460,11 @@ targets = ["linux/x86_64"]
306 460 .method("POST")
307 461 .uri("/build")
308 462 .header("content-type", "application/json")
309 - .body(Body::from(r#"{"app":"goingson","targets":["windows/x86_64"]}"#))
463 + .body(Body::from(r#"{"app":"goingson","targets":["not-a-target"]}"#))
310 464 .unwrap(),
311 465 )
312 466 .await
313 467 .unwrap();
314 - // windows/x86_64 isn't shipped by the test app -> 500 (Other).
315 - assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
468 + assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
316 469 }
317 470 }
@@ -15,23 +15,49 @@ use std::path::PathBuf;
15 15 use std::sync::Arc;
16 16
17 17 /// Resolve the version to build: explicit, or read from `tauri.conf.json`.
18 - pub fn resolve_version(state: &AppState, app: &AppId, explicit: Option<String>) -> Result<Version> {
18 + ///
19 + /// Returns typed errors so the route maps user mistakes (unknown app, bad
20 + /// version string) to 400, and only genuine daemon-side failures (reading the
21 + /// app's `tauri.conf.json`) to 500.
22 + pub fn resolve_version(
23 + state: &AppState,
24 + app: &AppId,
25 + explicit: Option<String>,
26 + ) -> crate::error::Result<Version> {
27 + use crate::error::Error;
19 28 if let Some(v) = explicit {
20 - return Version::parse(&v).map_err(|e| anyhow::anyhow!(e));
29 + return Version::parse(&v).map_err(Error::BadRequest);
21 30 }
22 - let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?;
23 - engine::version_from_tauri_conf(&cfg.repo)
31 + let cfg = state
32 + .topo
33 + .app(app)
34 + .ok_or_else(|| Error::BadRequest(format!("unknown app `{app}`")))?;
35 + engine::version_from_tauri_conf(&cfg.repo).map_err(Error::Other)
24 36 }
25 37
26 - /// Validate + default the target list against what the app ships.
27 - pub fn resolve_targets(state: &AppState, app: &AppId, requested: Vec<Target>) -> Result<Vec<Target>> {
28 - let cfg = state.topo.app(app).ok_or_else(|| anyhow::anyhow!("unknown app `{app}`"))?;
38 + /// Validate + default the target list against what the app ships. Unknown app,
39 + /// an unshipped target, or a target no host can build are all client errors
40 + /// (400), not server errors (500).
41 + pub fn resolve_targets(
42 + state: &AppState,
43 + app: &AppId,
44 + requested: Vec<Target>,
45 + ) -> crate::error::Result<Vec<Target>> {
46 + use crate::error::Error;
47 + let cfg = state
48 + .topo
49 + .app(app)
50 + .ok_or_else(|| Error::BadRequest(format!("unknown app `{app}`")))?;
29 51 if requested.is_empty() {
30 52 return Ok(cfg.targets.clone());
31 53 }
32 54 for t in &requested {
33 - anyhow::ensure!(cfg.targets.contains(t), "app `{app}` does not ship target {t}");
34 - anyhow::ensure!(state.topo.host_for(*t).is_some(), "no host can build {t}");
55 + if !cfg.targets.contains(t) {
56 + return Err(Error::BadRequest(format!("app `{app}` does not ship target {t}")));
57 + }
58 + if state.topo.host_for(*t).is_none() {
59 + return Err(Error::BadRequest(format!("no host can build {t}")));
60 + }
35 61 }
36 62 Ok(requested)
37 63 }
@@ -122,12 +148,26 @@ async fn run_target(state: AppState, build_id: i64, app: AppId, version: Version
122 148 }
123 149 };
124 150
151 + // Pre-flight the build host before running the recipe. For local/ssh hosts
152 + // this is a no-op; for the agent host (macOS in-session signing) it hits
153 + // `/health` so a dead ops-agent fails here with a clear message — matching
154 + // what the driver does — instead of erroring opaquely on the recipe's first
155 + // dispatch to that host.
156 + if let Some(host) = state.topo.host_for(target)
157 + && let Some(exec) = state.executors.get(&host.name)
158 + && let Err(e) = exec.preflight().await
159 + {
160 + fail_target(&state, target_run_id, &app, &version, target, Step::Checkout, &format!("{e:#}")).await;
161 + return;
162 + }
163 +
125 164 let ctx = Arc::new(RecipeCtx::new(
126 165 app.clone(),
127 166 version.clone(),
128 167 target,
129 168 target_run_id,
130 - state.topo.remote_hosts(),
169 + state.executors.clone(),
170 + state.topo.host_ssh(),
131 171 state.pool.clone(),
132 172 state.events.clone(),
133 173 state.cfg.clone(),
@@ -204,6 +244,11 @@ async fn fail_target(
204 244 /// Wait for all target runs of a build to leave `running`, then stamp the
205 245 /// build's terminal status.
206 246 async fn finalize_build(state: AppState, build_id: i64) {
247 + // Generous backstop: a build (sign + notarize can be slow) shouldn't take
248 + // more than this. Without it, a target task SIGKILLed before it stamps its
249 + // terminal status leaves a `running` row forever and this loop never exits.
250 + const MAX_WAIT: std::time::Duration = std::time::Duration::from_secs(2 * 60 * 60);
251 + let start = std::time::Instant::now();
207 252 loop {
208 253 let running: i64 = sqlx::query_scalar(
209 254 "SELECT COUNT(*) FROM target_runs WHERE build_id = ? AND status = 'running'",
@@ -215,6 +260,21 @@ async fn finalize_build(state: AppState, build_id: i64) {
215 260 if running == 0 {
216 261 break;
217 262 }
263 + if start.elapsed() >= MAX_WAIT {
264 + // Force the stragglers failed so the build can finalize and the row
265 + // reflects reality (a vanished worker, not a clean success).
266 + let _ = sqlx::query(
267 + "UPDATE target_runs SET status = 'failed', \
268 + error = COALESCE(error, 'worker vanished before finishing (finalize timeout)'), \
269 + finished_at = ? WHERE build_id = ? AND status = 'running'",
270 + )
271 + .bind(chrono::Utc::now().to_rfc3339())
272 + .bind(build_id)
273 + .execute(&state.pool)
274 + .await;
275 + tracing::error!(build_id, "finalize_build timed out waiting on target_runs; marked stragglers failed");
276 + break;
277 + }
218 278 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
219 279 }
220 280 let failed: i64 = sqlx::query_scalar(
@@ -308,6 +368,7 @@ targets = ["linux/x86_64"]
308 368 ))
309 369 .unwrap();
310 370
371 + let executors = Arc::new(crate::state::build_executors(&topo));
311 372 let state = AppState {
312 373 pool: pool.clone(),
313 374 topo: Arc::new(topo),
@@ -315,7 +376,9 @@ targets = ["linux/x86_64"]
315 376 prom: crate::metrics::test_handle(),
316 377 events: crate::events::channel(),
317 378 ota: Arc::new(OtaRegistry::standard("https://makenot.work")),
379 + executors,
318 380 active: Arc::new(Mutex::new(HashMap::new())),
381 + api_token: None,
319 382 };
320 383
321 384 let app = AppId::new("demo");
@@ -324,14 +387,17 @@ targets = ["linux/x86_64"]
324 387 .await
325 388 .unwrap();
326 389
327 - // Wait for the target run to settle.
390 + // Wait for the target run to settle. The row may not be inserted on the
391 + // first poll (run_target is spawned, not awaited), so treat a missing row
392 + // as still-pending rather than an error.
328 393 let mut status = String::new();
329 394 for _ in 0..100 {
330 395 status = sqlx::query_scalar("SELECT status FROM target_runs WHERE build_id = ?")
331 396 .bind(build_id)
332 - .fetch_one(&pool)
397 + .fetch_optional(&pool)
333 398 .await
334 - .unwrap();
399 + .unwrap()
400 + .unwrap_or_else(|| "running".to_string());
335 401 if status != "running" {
336 402 break;
337 403 }
@@ -357,4 +423,104 @@ targets = ["linux/x86_64"]
357 423 assert!(log.exists(), "build step log should exist");
358 424 assert!(std::fs::read_to_string(&log).unwrap().contains("compiling"));
359 425 }
426 +
427 + /// The audit fix: a `build` step dispatched to a host whose executor lacks the
428 + /// `build` capability is denied at the transport BEFORE the command runs. This
429 + /// is the structural guarantee behind "never build on prod" — a recipe naming
430 + /// the wrong host can't compile there.
431 + #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
432 + async fn build_on_a_host_without_the_build_grant_is_denied() {
433 + let tmp = tempfile::tempdir().unwrap();
434 + let root = tmp.path();
435 +
436 + let repo = root.join("app");
437 + std::fs::create_dir_all(repo.join("src-tauri")).unwrap();
438 + std::fs::write(repo.join("src-tauri/tauri.conf.json"), r#"{"version":"0.0.1"}"#).unwrap();
439 + std::fs::create_dir_all(repo.join("dist/recipes")).unwrap();
440 + // The recipe (wrongly) tries to compile on `prod`, a host with no build
441 + // grant. A marker file would appear if the command actually ran.
442 + let marker = root.join("ran-on-prod");
443 + std::fs::write(
444 + repo.join("dist/recipes/linux.rhai"),
445 + r#"
446 + step("build");
447 + sh_ok("prod", "touch MARKER");
448 + "#
449 + .replace("MARKER", marker.to_str().unwrap()),
450 + )
451 + .unwrap();
452 +
453 + let cfg = Config::for_tests(root);
454 + let pool = crate::db::open(&cfg.db_path).await.unwrap();
455 + // fw13 builds linux; `prod` is local-but-restart-only (no build/package).
456 + let topo: Topology = toml::from_str(
457 + r#"
458 + [[host]]
459 + name = "fw13"
460 + ssh = "local"
461 + targets = ["linux/x86_64"]
462 +
463 + [[host]]
464 + name = "prod"
465 + ssh = "local"
466 + actuate = ["restart"]
467 + observe = []
468 +
469 + [app.demo]
470 + repo = "REPO"
471 + targets = ["linux/x86_64"]
472 + "#
473 + .replace("REPO", repo.to_str().unwrap())
474 + .as_str(),
475 + )
476 + .unwrap();
477 +
478 + let executors = Arc::new(crate::state::build_executors(&topo));
479 + let state = AppState {
480 + pool: pool.clone(),
481 + topo: Arc::new(topo),
482 + cfg: Arc::new(cfg),
483 + prom: crate::metrics::test_handle(),
484 + events: crate::events::channel(),
485 + ota: Arc::new(OtaRegistry::standard("https://makenot.work")),
486 + executors,
487 + active: Arc::new(Mutex::new(HashMap::new())),
488 + api_token: None,
489 + };
490 +
491 + let build_id = start_build(
492 + state.clone(),
493 + AppId::new("demo"),
494 + Version::parse("0.0.1").unwrap(),
495 + vec!["linux/x86_64".parse().unwrap()],
496 + )
497 + .await
498 + .unwrap();
499 +
500 + let mut status = String::new();
501 + let mut error = String::new();
502 + for _ in 0..100 {
503 + let row: Option<(String, Option<String>)> =
504 + sqlx::query_as("SELECT status, error FROM target_runs WHERE build_id = ?")
505 + .bind(build_id)
506 + .fetch_optional(&pool)
507 + .await
508 + .unwrap();
509 + if let Some((s, e)) = row {
510 + status = s;
511 + error = e.unwrap_or_default();
512 + if status != "running" {
513 + break;
514 + }
515 + }
516 + tokio::time::sleep(std::time::Duration::from_millis(50)).await;
517 + }
518 +
519 + assert_eq!(status, "failed", "build on an ungranted host must fail");
520 + assert!(
521 + error.contains("capability denied") && error.contains("build"),
522 + "failure must be a capability denial, got: {error}"
523 + );
524 + assert!(!marker.exists(), "denied build step must NOT have executed");
525 + }
360 526 }
@@ -2,14 +2,22 @@ use crate::config::Config;
2 2 use crate::domain::{AppId, Target};
3 3 use crate::events::EventTx;
4 4 use crate::ota::OtaRegistry;
5 - use crate::topology::Topology;
5 + use crate::topology::{Host, HostTransport, Topology};
6 6 use metrics_exporter_prometheus::PrometheusHandle;
7 + use ops_exec::{AgentRpc, CapabilitySet, Executor, LocalExec, SshExec};
7 8 use sqlx::SqlitePool;
8 9 use std::collections::HashMap;
9 10 use std::sync::Arc;
10 11 use tokio::sync::Mutex;
11 12 use tokio::task::AbortHandle;
12 13
14 + /// Per-host executors keyed by host name, built once from the topology at
15 + /// startup. The recipe engine looks a host's executor up here instead of
16 + /// constructing ssh/scp invocations inline — capability-scoped, transport
17 + /// chosen per host (local / ssh / in-session agent). Mirrors Sando's
18 + /// `ExecutorMap`.
19 + pub type ExecutorMap = HashMap<String, Arc<dyn Executor>>;
20 +
13 21 #[derive(Clone)]
14 22 pub struct AppState {
15 23 pub pool: SqlitePool,
@@ -18,8 +26,78 @@ pub struct AppState {
18 26 pub prom: PrometheusHandle,
19 27 pub events: EventTx,
20 28 pub ota: Arc<OtaRegistry>,
29 + /// One capability-scoped [`Executor`] per build host, from the topology.
30 + pub executors: Arc<ExecutorMap>,
31 + /// Bearer token required on the build-triggering routes (`/build`,
32 + /// `/retry`). Sourced from `BENTO_API_TOKEN` (systemd EnvironmentFile).
33 + /// `None` = unauthenticated, which main() permits only on a loopback bind
34 + /// (CF2).
35 + pub api_token: Option<Arc<str>>,
21 36 /// Single-slot guard per `(app, target)`: a newer build for the same
22 37 /// target aborts the in-flight one (latest request wins), mirroring
23 38 /// Sando's `active_build`. Other targets keep running — that's the fan-out.
24 39 pub active: Arc<Mutex<HashMap<(AppId, Target), AbortHandle>>>,
25 40 }
41 +
42 + /// Build one host's executor: `LocalExec` for `ssh = "local"`, `AgentRpc` for an
43 + /// agent-transport host (macOS in-session signing), `SshExec` otherwise — each
44 + /// granted exactly the host's declared capabilities.
45 + pub fn build_executor(host: &Host) -> Arc<dyn Executor> {
46 + let caps = CapabilitySet::from_tokens(&host.actuate, &host.observe);
47 + match host.transport {
48 + HostTransport::Agent => {
49 + // validate() guarantees agent_url is set for agent hosts.
50 + let url = host.agent_url.clone().unwrap_or_default();
51 + Arc::new(AgentRpc::new(url, host.name.clone(), caps))
52 + }
53 + HostTransport::Ssh if host.ssh == "local" || host.ssh.is_empty() => {
54 + Arc::new(LocalExec::new(caps))
55 + }
56 + HostTransport::Ssh => Arc::new(SshExec::new(host.ssh.clone(), caps)),
57 + }
58 + }
59 +
60 + /// Build the full host name -> executor map from the topology.
61 + pub fn build_executors(topo: &Topology) -> ExecutorMap {
62 + topo.hosts.iter().map(|h| (h.name.clone(), build_executor(h))).collect()
63 + }
64 +
#[cfg(test)]
mod tests {
    use super::*;
    use ops_exec::Action;

    /// Parse one `[[host]]` table by appending a stub app section so the
    /// fragment deserializes as a whole topology, then return its first host.
    fn host(toml_host: &str) -> Host {
        let fragment = format!("{toml_host}\n[app.x]\nrepo = \"/x\"\ntargets = []\n");
        let topo: Topology = toml::from_str(&fragment).unwrap();
        topo.hosts.into_iter().next().unwrap()
    }

    #[test]
    fn build_host_executor_permits_build_not_sign() {
        // No explicit `actuate`: the host gets the default build-host grant.
        let exec = build_executor(&host(
            "[[host]]\nname = \"fw13\"\nssh = \"local\"\ntargets = [\"linux/x86_64\"]",
        ));
        for granted in [Action::Build, Action::Package] {
            assert!(exec.capabilities().permits(&granted));
        }
        assert!(!exec.capabilities().permits(&Action::Sign));
    }

    #[test]
    fn agent_host_executor_permits_sign() {
        // The mac host: agent transport with a widened grant. Shows that an
        // AgentRpc can be constructed (no panic) and that it carries the sign
        // capability the default grant lacks.
        let mac = host(
            "[[host]]\nname = \"mbp\"\nssh = \"mbp\"\ntargets = [\"macos/aarch64\"]\n\
             transport = \"agent\"\nagent_url = \"http://mbp:8765\"\n\
             actuate = [\"build\", \"sign\", \"notarize\", \"staple\"]",
        );
        let exec = build_executor(&mac);
        for granted in [Action::Sign, Action::Notarize, Action::Build] {
            assert!(exec.capabilities().permits(&granted));
        }
    }
}
@@ -7,7 +7,6 @@
7 7
8 8 use crate::domain::{AppId, Target};
9 9 use anyhow::{Context, Result};
10 - use ops_core::remote::RemoteHost;
11 10 use serde::Deserialize;
12 11 use std::collections::HashMap;
13 12 use std::path::Path;
@@ -21,6 +20,20 @@ pub struct Topology {
21 20 pub app: HashMap<String, AppConfig>,
22 21 }
23 22
23 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
24 + #[serde(rename_all = "snake_case")]
25 + pub enum HostTransport {
26 + /// SSH push (or local, when `ssh = "local"`). Every Linux/Windows step and
27 + /// every non-signing macOS step.
28 + #[default]
29 + Ssh,
30 + /// In-session `ops-agent` over HTTP (`agent_url`). Required for macOS
31 + /// sign/notarize/staple — codesign can only use the Developer ID key from
32 + /// the Aqua GUI session (design §7 "THE WALL"), which a plain SSH session
33 + /// cannot reach.
34 + Agent,
35 + }
36 +
24 37 #[derive(Debug, Clone, Deserialize)]
25 38 pub struct Host {
26 39 pub name: String,
@@ -30,6 +43,31 @@ pub struct Host {
30 43 /// that lists it — this is how no-cross-compile is enforced structurally.
31 44 #[serde(default)]
32 45 pub targets: Vec<Target>,
46 + /// How `bentod` reaches this host to run steps (see [`HostTransport`]).
47 + #[serde(default)]
48 + pub transport: HostTransport,
49 + /// Base URL of this host's `ops-agent`, e.g. `http://mbp:8765`. Required
50 + /// when `transport = "agent"`; ignored otherwise.
51 + #[serde(default)]
52 + pub agent_url: Option<String>,
53 + /// Actuate capabilities this host's executor is granted (`build`, `sign`,
54 + /// …). Defaults to a plain build host so an existing `bento.toml` keeps
55 + /// loading; the mac host widens this to sign/notarize/staple.
56 + #[serde(default = "default_actuate")]
57 + pub actuate: Vec<String>,
58 + /// Observe capabilities (read-only host inspection).
59 + #[serde(default = "default_observe")]
60 + pub observe: Vec<String>,
61 + }
62 +
/// Default actuate grant for a host that does not list one: `build` and
/// `package`. With these as the defaults, an existing `bento.toml` that only
/// declares name/ssh/targets keeps loading unchanged after the executor
/// refactor.
fn default_actuate() -> Vec<String> {
    ["build", "package"].iter().map(|cap| cap.to_string()).collect()
}
/// Default observe grant for a host that does not list one: `build-log` only.
fn default_observe() -> Vec<String> {
    vec![String::from("build-log")]
}
34 72
35 73 #[derive(Debug, Clone, Deserialize)]
@@ -72,6 +110,20 @@ impl Topology {
72 110 }
73 111 }
74 112 }
113 + // Capability/transport coherence: a host that declares buildable targets
114 + // must be granted `build` (otherwise its own recipes would be denied at
115 + // dispatch), and an agent-transport host must say where its agent is.
116 + for h in &self.hosts {
117 + if !h.targets.is_empty() && !h.actuate.iter().any(|a| a == "build") {
118 + anyhow::bail!(
119 + "host `{}` declares buildable targets but is not granted the `build` capability",
120 + h.name
121 + );
122 + }
123 + if h.transport == HostTransport::Agent && h.agent_url.is_none() {
124 + anyhow::bail!("host `{}` uses transport = \"agent\" but sets no agent_url", h.name);
125 + }
126 + }
75 127 Ok(())
76 128 }
77 129
@@ -80,9 +132,11 @@ impl Topology {
80 132 self.hosts.iter().find(|h| h.targets.contains(&target))
81 133 }
82 134
83 - /// Map of host name -> `RemoteHost`, for the recipe `sh(host, cmd)` API.
84 - pub fn remote_hosts(&self) -> HashMap<String, RemoteHost> {
85 - self.hosts.iter().map(|h| (h.name.clone(), RemoteHost::new(h.ssh.clone()))).collect()
135 + /// Map of host name -> ssh string, for `collect`'s daemon-local scp source.
136 + /// (Command execution goes through the capability-scoped executors built in
137 + /// `state::build_executors`, not through this map.)
138 + pub fn host_ssh(&self) -> HashMap<String, String> {
139 + self.hosts.iter().map(|h| (h.name.clone(), h.ssh.clone())).collect()
86 140 }
87 141
88 142 pub fn app(&self, app: &AppId) -> Option<&AppConfig> {
@@ -141,10 +195,75 @@ targets = ["windows/x86_64"]
141 195 }
142 196
#[test]
fn host_ssh_maps_names_to_ssh_strings() {
    let by_name = load(SAMPLE).unwrap().host_ssh();
    // Each host name maps to the ssh string declared for it in SAMPLE.
    for (name, expected) in [("fw13", "local"), ("mbp", "mbp")] {
        assert_eq!(by_name[name], expected);
    }
}
204 +
#[test]
fn capability_defaults_make_a_build_host() {
    let topo = load(SAMPLE).unwrap();
    let fw13 = topo
        .hosts
        .iter()
        .find(|h| h.name == "fw13")
        .unwrap();
    // fw13 sets neither `transport` nor `actuate`, so both come from defaults.
    assert_eq!(fw13.transport, HostTransport::Ssh);
    for cap in ["build", "package"] {
        assert!(fw13.actuate.iter().any(|granted| granted == cap));
    }
}
213 +
#[test]
fn agent_transport_parses_with_url_and_caps() {
    // An agent host with an explicit URL and a widened actuate grant.
    let src = r#"
[[host]]
name = "mbp"
ssh = "mbp"
targets = ["macos/aarch64"]
transport = "agent"
agent_url = "http://mbp:8765"
actuate = ["build", "sign", "notarize", "staple"]

[app.goingson]
repo = "x"
targets = ["macos/aarch64"]
"#;
    let topo = load(src).unwrap();
    let mbp = &topo.hosts[0];
    assert_eq!(mbp.transport, HostTransport::Agent);
    assert_eq!(mbp.agent_url.as_deref(), Some("http://mbp:8765"));
    assert!(mbp.actuate.iter().any(|cap| cap == "sign"));
}
237 +
#[test]
fn agent_host_without_url_is_rejected() {
    // transport = "agent" with no agent_url must not load.
    let outcome = load(
        r#"
[[host]]
name = "mbp"
ssh = "mbp"
targets = ["macos/aarch64"]
transport = "agent"

[app.goingson]
repo = "x"
targets = ["macos/aarch64"]
"#,
    );
    assert!(outcome.is_err());
}
253 +
#[test]
fn build_host_without_build_capability_is_rejected() {
    // The host declares a buildable target but its grant omits `build`.
    let outcome = load(
        r#"
[[host]]
name = "fw13"
ssh = "local"
targets = ["linux/x86_64"]
actuate = ["package"]

[app.goingson]
repo = "x"
targets = ["linux/x86_64"]
"#,
    );
    assert!(outcome.is_err());
}
150 269 }
@@ -3,7 +3,7 @@ name = "bento-driver"
3 3 version = "0.1.0"
4 4 edition = "2024"
5 5 license = "MIT"
6 - description = "Thin agent-driven release driver: runs the proven macOS recipe (build -> sign -> notarize -> staple -> verify) on a build host via the ops-exec executor, then pulls the artifact. The full bentod/TUI orchestrator is deferred (launchplan §J)."
6 + description = "Thin agent-driven release driver: runs the proven macOS recipe (build -> sign -> notarize -> staple -> verify) on a build host via the ops-exec executor, then pulls the artifact. A focused one-shot path; bentod (daemon + TUI) now runs the same recipes through the same ops-exec executors, including AgentRpc for the in-session macOS sign steps."
7 7
8 8 [[bin]]
9 9 name = "bento-release-macos"
@@ -102,7 +102,9 @@ async fn main() -> Result<()> {
102 102 }
103 103 println!("\n==> Gatekeeper accepted; pulling DMG");
104 104
105 - tokio::fs::create_dir_all(&cfg.local.dest_dir).await.ok();
105 + tokio::fs::create_dir_all(&cfg.local.dest_dir)
106 + .await
107 + .with_context(|| format!("creating dest dir {}", cfg.local.dest_dir.display()))?;
106 108 let local_dmg = cfg.local.dest_dir.join(&dmg_name);
107 109 exec.pull(
108 110 std::path::Path::new(&outcome.dmg_remote),
@@ -0,0 +1,3546 @@
1 + # This file is automatically @generated by Cargo.
2 + # It is not intended for manual editing.
3 + version = 4
4 +
5 + [[package]]
6 + name = "ahash"
7 + version = "0.8.12"
8 + source = "registry+https://github.com/rust-lang/crates.io-index"
9 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
10 + dependencies = [
11 + "cfg-if",
12 + "const-random",
13 + "getrandom 0.3.4",
14 + "once_cell",
15 + "version_check",
16 + "zerocopy",
17 + ]
18 +
19 + [[package]]
20 + name = "aho-corasick"
21 + version = "1.1.4"
22 + source = "registry+https://github.com/rust-lang/crates.io-index"
23 + checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
24 + dependencies = [
25 + "memchr",
26 + ]
27 +
28 + [[package]]
29 + name = "allocator-api2"
30 + version = "0.2.21"
31 + source = "registry+https://github.com/rust-lang/crates.io-index"
32 + checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
33 +
34 + [[package]]
35 + name = "android_system_properties"
36 + version = "0.1.5"
37 + source = "registry+https://github.com/rust-lang/crates.io-index"
38 + checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
39 + dependencies = [
40 + "libc",
41 + ]
42 +
43 + [[package]]
44 + name = "anyhow"
45 + version = "1.0.102"
46 + source = "registry+https://github.com/rust-lang/crates.io-index"
47 + checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
48 +
49 + [[package]]
50 + name = "async-trait"
51 + version = "0.1.89"
52 + source = "registry+https://github.com/rust-lang/crates.io-index"
53 + checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
54 + dependencies = [
55 + "proc-macro2",
56 + "quote",
57 + "syn",
58 + ]
59 +
60 + [[package]]
61 + name = "atoi"
62 + version = "2.0.0"
63 + source = "registry+https://github.com/rust-lang/crates.io-index"
64 + checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
65 + dependencies = [
66 + "num-traits",
67 + ]
68 +
69 + [[package]]
70 + name = "atomic-waker"
71 + version = "1.1.2"
72 + source = "registry+https://github.com/rust-lang/crates.io-index"
73 + checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
74 +
75 + [[package]]
76 + name = "autocfg"
77 + version = "1.5.1"
78 + source = "registry+https://github.com/rust-lang/crates.io-index"
79 + checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53"
80 +
81 + [[package]]
82 + name = "axum"
83 + version = "0.8.9"
84 + source = "registry+https://github.com/rust-lang/crates.io-index"
85 + checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
86 + dependencies = [
87 + "axum-core",
88 + "axum-macros",
89 + "base64",
90 + "bytes",
91 + "form_urlencoded",
92 + "futures-util",
93 + "http",
94 + "http-body",
95 + "http-body-util",
96 + "hyper",
97 + "hyper-util",
98 + "itoa",
99 + "matchit",
100 + "memchr",
101 + "mime",
102 + "percent-encoding",
103 + "pin-project-lite",
104 + "serde_core",
105 + "serde_json",
106 + "serde_path_to_error",
107 + "serde_urlencoded",
108 + "sha1",
109 + "sync_wrapper",
110 + "tokio",
111 + "tokio-tungstenite 0.29.0",
112 + "tower",
113 + "tower-layer",
114 + "tower-service",
115 + "tracing",
116 + ]
117 +
118 + [[package]]
119 + name = "axum-core"
120 + version = "0.5.6"
121 + source = "registry+https://github.com/rust-lang/crates.io-index"
122 + checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
123 + dependencies = [
124 + "bytes",
125 + "futures-core",
126 + "http",
127 + "http-body",
128 + "http-body-util",
129 + "mime",
130 + "pin-project-lite",
131 + "sync_wrapper",
132 + "tower-layer",
133 + "tower-service",
134 + "tracing",
135 + ]
136 +
137 + [[package]]
138 + name = "axum-macros"
139 + version = "0.5.1"
140 + source = "registry+https://github.com/rust-lang/crates.io-index"
141 + checksum = "7aa268c23bfbbd2c4363b9cd302a4f504fb2a9dfe7e3451d66f35dd392e20aca"
142 + dependencies = [
143 + "proc-macro2",
144 + "quote",
145 + "syn",
146 + ]
147 +
148 + [[package]]
149 + name = "base64"
150 + version = "0.22.1"
151 + source = "registry+https://github.com/rust-lang/crates.io-index"
152 + checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
153 +
154 + [[package]]
155 + name = "base64ct"
156 + version = "1.8.3"
157 + source = "registry+https://github.com/rust-lang/crates.io-index"
158 + checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06"
159 +
160 + [[package]]
161 + name = "bento-daemon"
162 + version = "0.1.0"
163 + dependencies = [
164 + "anyhow",
165 + "axum",
166 + "chrono",
167 + "metrics",
168 + "metrics-exporter-prometheus",
169 + "ops-core",
170 + "ops-exec",
171 + "rhai",
172 + "semver",
173 + "serde",
174 + "serde_json",
175 + "sqlx",
176 + "thiserror 2.0.18",
177 + "tokio",
178 + "toml",
179 + "tracing",
180 + "tracing-subscriber",
181 + ]
182 +
183 + [[package]]
184 + name = "bento-tui"
185 + version = "0.1.0"
186 + dependencies = [
187 + "anyhow",
188 + "bento-daemon",
189 + "chrono",
190 + "crossterm",
191 + "futures-util",
192 + "ratatui",
193 + "reqwest",
194 + "serde",
195 + "serde_json",
196 + "tokio",
197 + "tokio-tungstenite 0.24.0",
198 + ]
199 +
200 + [[package]]
201 + name = "bitflags"
202 + version = "2.13.0"
203 + source = "registry+https://github.com/rust-lang/crates.io-index"
204 + checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8"
205 + dependencies = [
206 + "serde_core",
207 + ]
208 +
209 + [[package]]
210 + name = "block-buffer"
211 + version = "0.10.4"
212 + source = "registry+https://github.com/rust-lang/crates.io-index"
213 + checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
214 + dependencies = [
215 + "generic-array",
216 + ]
217 +
218 + [[package]]
219 + name = "bumpalo"
220 + version = "3.20.3"
221 + source = "registry+https://github.com/rust-lang/crates.io-index"
222 + checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649"
223 +
224 + [[package]]
225 + name = "byteorder"
226 + version = "1.5.0"
227 + source = "registry+https://github.com/rust-lang/crates.io-index"
228 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
229 +
230 + [[package]]
231 + name = "bytes"
232 + version = "1.11.1"
233 + source = "registry+https://github.com/rust-lang/crates.io-index"
234 + checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
235 +
236 + [[package]]
237 + name = "cassowary"
238 + version = "0.3.0"
239 + source = "registry+https://github.com/rust-lang/crates.io-index"
240 + checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53"
241 +
242 + [[package]]
243 + name = "castaway"
244 + version = "0.2.4"
245 + source = "registry+https://github.com/rust-lang/crates.io-index"
246 + checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a"
247 + dependencies = [
248 + "rustversion",
249 + ]
250 +
251 + [[package]]
252 + name = "cc"
253 + version = "1.2.64"
254 + source = "registry+https://github.com/rust-lang/crates.io-index"
255 + checksum = "dad887fd958be91b5098c0248def011f4523ab786cd411be668777e55063501f"
256 + dependencies = [
257 + "find-msvc-tools",
258 + "shlex",
259 + ]
260 +
261 + [[package]]
262 + name = "cfg-if"
263 + version = "1.0.4"
264 + source = "registry+https://github.com/rust-lang/crates.io-index"
265 + checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
266 +
267 + [[package]]
268 + name = "cfg_aliases"
269 + version = "0.2.1"
270 + source = "registry+https://github.com/rust-lang/crates.io-index"
271 + checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
272 +
273 + [[package]]
274 + name = "chrono"
275 + version = "0.4.45"
276 + source = "registry+https://github.com/rust-lang/crates.io-index"
277 + checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327"
278 + dependencies = [
279 + "iana-time-zone",
280 + "js-sys",
281 + "num-traits",
282 + "serde",
283 + "wasm-bindgen",
284 + "windows-link",
285 + ]
286 +
287 + [[package]]
288 + name = "compact_str"
289 + version = "0.8.2"
290 + source = "registry+https://github.com/rust-lang/crates.io-index"
291 + checksum = "7fd622ebbb56a5b2ccb651b32b911cdeb2a9b4b11776b2473bf26a26a286244e"
292 + dependencies = [
293 + "castaway",
294 + "cfg-if",
295 + "itoa",
296 + "rustversion",
297 + "ryu",
298 + "static_assertions",
299 + ]
300 +
301 + [[package]]
302 + name = "concurrent-queue"
303 + version = "2.5.0"
304 + source = "registry+https://github.com/rust-lang/crates.io-index"
305 + checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
306 + dependencies = [
307 + "crossbeam-utils",
308 + ]
309 +
310 + [[package]]
311 + name = "const-oid"
312 + version = "0.9.6"
313 + source = "registry+https://github.com/rust-lang/crates.io-index"
314 + checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
315 +
316 + [[package]]
317 + name = "const-random"
318 + version = "0.1.18"
319 + source = "registry+https://github.com/rust-lang/crates.io-index"
320 + checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
321 + dependencies = [
322 + "const-random-macro",
323 + ]
324 +
325 + [[package]]
326 + name = "const-random-macro"
327 + version = "0.1.16"
328 + source = "registry+https://github.com/rust-lang/crates.io-index"
329 + checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
330 + dependencies = [
331 + "getrandom 0.2.17",
332 + "once_cell",
333 + "tiny-keccak",
334 + ]
335 +
336 + [[package]]
337 + name = "core-foundation-sys"
338 + version = "0.8.7"
339 + source = "registry+https://github.com/rust-lang/crates.io-index"
340 + checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
341 +
342 + [[package]]
343 + name = "cpufeatures"
344 + version = "0.2.17"
345 + source = "registry+https://github.com/rust-lang/crates.io-index"
346 + checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
347 + dependencies = [
348 + "libc",
349 + ]
350 +
351 + [[package]]
352 + name = "crc"
353 + version = "3.4.0"
354 + source = "registry+https://github.com/rust-lang/crates.io-index"
355 + checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d"
356 + dependencies = [
357 + "crc-catalog",
358 + ]
359 +
360 + [[package]]
361 + name = "crc-catalog"
362 + version = "2.5.0"
363 + source = "registry+https://github.com/rust-lang/crates.io-index"
364 + checksum = "217698eaf96b4a3f0bc4f3662aaa55bdf913cd54d7204591faa790070c6d0853"
365 +
366 + [[package]]
367 + name = "crossbeam-epoch"
368 + version = "0.9.18"
369 + source = "registry+https://github.com/rust-lang/crates.io-index"
370 + checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
371 + dependencies = [
372 + "crossbeam-utils",
373 + ]
374 +
375 + [[package]]
376 + name = "crossbeam-queue"
377 + version = "0.3.12"
378 + source = "registry+https://github.com/rust-lang/crates.io-index"
379 + checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
380 + dependencies = [
381 + "crossbeam-utils",
382 + ]
383 +
384 + [[package]]
385 + name = "crossbeam-utils"
386 + version = "0.8.21"
387 + source = "registry+https://github.com/rust-lang/crates.io-index"
388 + checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
389 +
390 + [[package]]
391 + name = "crossterm"
392 + version = "0.28.1"
393 + source = "registry+https://github.com/rust-lang/crates.io-index"
394 + checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6"
395 + dependencies = [
396 + "bitflags",
397 + "crossterm_winapi",
398 + "mio",
399 + "parking_lot",
400 + "rustix",
401 + "signal-hook",
402 + "signal-hook-mio",
403 + "winapi",
404 + ]
405 +
406 + [[package]]
407 + name = "crossterm_winapi"
408 + version = "0.9.1"
409 + source = "registry+https://github.com/rust-lang/crates.io-index"
410 + checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
411 + dependencies = [
412 + "winapi",
413 + ]
414 +
415 + [[package]]
416 + name = "crunchy"
417 + version = "0.2.4"
418 + source = "registry+https://github.com/rust-lang/crates.io-index"
419 + checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
420 +
421 + [[package]]
422 + name = "crypto-common"
423 + version = "0.1.7"
424 + source = "registry+https://github.com/rust-lang/crates.io-index"
425 + checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
426 + dependencies = [
427 + "generic-array",
428 + "typenum",
429 + ]
430 +
431 + [[package]]
432 + name = "darling"
433 + version = "0.23.0"
434 + source = "registry+https://github.com/rust-lang/crates.io-index"
435 + checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d"
436 + dependencies = [
437 + "darling_core",
438 + "darling_macro",
439 + ]
440 +
441 + [[package]]
442 + name = "darling_core"
443 + version = "0.23.0"
444 + source = "registry+https://github.com/rust-lang/crates.io-index"
445 + checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0"
446 + dependencies = [
447 + "ident_case",
448 + "proc-macro2",
449 + "quote",
450 + "strsim",
451 + "syn",
452 + ]
453 +
454 + [[package]]
455 + name = "darling_macro"
456 + version = "0.23.0"
457 + source = "registry+https://github.com/rust-lang/crates.io-index"
458 + checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d"
459 + dependencies = [
460 + "darling_core",
461 + "quote",
462 + "syn",
463 + ]
464 +
465 + [[package]]
466 + name = "data-encoding"
467 + version = "2.11.0"
468 + source = "registry+https://github.com/rust-lang/crates.io-index"
469 + checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8"
470 +
471 + [[package]]
472 + name = "der"
473 + version = "0.7.10"
474 + source = "registry+https://github.com/rust-lang/crates.io-index"
475 + checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb"
476 + dependencies = [
477 + "const-oid",
478 + "pem-rfc7468",
479 + "zeroize",
480 + ]
481 +
482 + [[package]]
483 + name = "digest"
484 + version = "0.10.7"
485 + source = "registry+https://github.com/rust-lang/crates.io-index"
486 + checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
487 + dependencies = [
488 + "block-buffer",
489 + "const-oid",
490 + "crypto-common",
491 + "subtle",
492 + ]
493 +
494 + [[package]]
495 + name = "displaydoc"
496 + version = "0.2.6"
497 + source = "registry+https://github.com/rust-lang/crates.io-index"
498 + checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f"
499 + dependencies = [
500 + "proc-macro2",
Lines truncated
@@ -205,6 +205,27 @@ impl Shared {
205 205
206 206 // ---------- main ----------
207 207
208 + /// Leave raw mode + the alternate screen and show the cursor. Best-effort:
209 + /// every step is attempted regardless of earlier failures, so a partially
210 + /// entered terminal is still restored as far as possible.
211 + fn restore_terminal() {
212 + let _ = disable_raw_mode();
213 + let _ = crossterm::execute!(io::stdout(), LeaveAlternateScreen, crossterm::cursor::Show);
214 + }
215 +
216 + /// RAII guard that restores the terminal on drop — covering normal return, a
217 + /// `?` early-exit, AND unwinding from a panic (e.g. a poisoned `Mutex` lock).
218 + /// Without this, any panic between `enable_raw_mode` and the manual teardown
219 + /// left the operator's shell wedged in raw mode + alt screen until a blind
220 + /// `reset`. Pairs with the panic hook installed in `main`.
221 + struct TerminalGuard;
222 +
223 + impl Drop for TerminalGuard {
224 + fn drop(&mut self) {
225 + restore_terminal();
226 + }
227 + }
228 +
208 229 fn main() -> Result<()> {
209 230 let daemon = std::env::var("BENTO_DAEMON").unwrap_or_else(|_| "http://127.0.0.1:7800".into());
210 231 let default_app = std::env::var("BENTO_APP").unwrap_or_else(|_| "goingson".into());
@@ -219,18 +240,24 @@ fn main() -> Result<()> {
219 240 rt.spawn(state_poller(daemon.clone(), shared.clone()));
220 241 rt.spawn(events_subscriber(daemon.clone(), shared.clone()));
221 242
243 + // Restore the terminal on panic BEFORE the default hook prints, so the
244 + // panic message + backtrace land on the normal screen rather than being
245 + // swallowed by the abandoned alternate screen.
246 + let default_hook = std::panic::take_hook();
247 + std::panic::set_hook(Box::new(move |info| {
248 + restore_terminal();
249 + default_hook(info);
250 + }));
251 +
222 252 enable_raw_mode()?;
223 253 let mut stdout = io::stdout();
224 254 crossterm::execute!(stdout, EnterAlternateScreen)?;
225 255 let backend = CrosstermBackend::new(stdout);
226 256 let mut term = Terminal::new(backend)?;
257 + // From here on the guard owns teardown (normal, `?`, or panic).
258 + let _guard = TerminalGuard;
227 259
228 - let res = ui_loop(&mut term, &daemon, &default_app, &shared, rt.handle());
229 -
230 - disable_raw_mode()?;
231 - crossterm::execute!(term.backend_mut(), LeaveAlternateScreen)?;
232 - term.show_cursor()?;
233 - res
260 + ui_loop(&mut term, &daemon, &default_app, &shared, rt.handle())
234 261 }
235 262
236 263 // ---------- background tasks ----------
@@ -384,19 +411,25 @@ async fn dispatch_action(daemon: &str, act: &Action) -> Result<String> {
384 411 (format!("{daemon}/retry"), serde_json::json!({ "app": app, "target": target }))
385 412 }
386 413 };
387 - let resp = client
388 - .post(&url)
389 - .timeout(Duration::from_secs(30))
390 - .json(&body)
391 - .send()
392 - .await
393 - .context("send")?;
414 + let mut req = client.post(&url).timeout(Duration::from_secs(30)).json(&body);
415 + // Build triggers require a bearer token when the daemon is configured with
416 + // one (CF2). Operator supplies it via BENTO_API_TOKEN.
417 + if let Ok(tok) = std::env::var("BENTO_API_TOKEN") {
418 + let tok = tok.trim();
419 + if !tok.is_empty() {
420 + req = req.header(reqwest::header::AUTHORIZATION, format!("Bearer {tok}"));
421 + }
422 + }
423 + let resp = req.send().await.context("send")?;
394 424 let status = resp.status();
395 - let text = resp.text().await.unwrap_or_default();
396 - if status.is_success() {
397 - Ok(truncate(&text, 120))
398 - } else {
399 - Err(anyhow::anyhow!("HTTP {status}: {}", truncate(&text, 200)))
425 + // Distinguish a body-read failure from an empty body: the daemon's structured
426 + // error message is the operator's only signal on a rejected build, and silently
427 + // turning a read hiccup into "" renders as a contentless "HTTP 409: ".
428 + match (status.is_success(), resp.text().await) {
429 + (true, Ok(text)) => Ok(truncate(&text, 120)),
430 + (true, Err(e)) => Err(anyhow::anyhow!("HTTP {status} but response body unreadable: {e}")),
431 + (false, Ok(text)) => Err(anyhow::anyhow!("HTTP {status}: {}", truncate(&text, 200))),
432 + (false, Err(e)) => Err(anyhow::anyhow!("HTTP {status} (response body unreadable: {e})")),
400 433 }
401 434 }
402 435
@@ -0,0 +1,19 @@
1 + -- Record when a tier is left in a partial / mixed-version state.
2 + --
3 + -- A canary promote that fails mid-rollout tries to roll the touched nodes
4 + -- back to the prior version (CF3 canary rollback). That compensation is
5 + -- best-effort: a node whose rollback also fails, or a first-deploy failure
6 + -- with no prior version to restore, leaves the tier genuinely inconsistent —
7 + -- some nodes on the new (broken) version, some on the old. The rollback
8 + -- endpoint has the same exposure if a node deploy fails partway through the
9 + -- fleet.
10 + --
11 + -- Before this column that condition was invisible: `tier_state` still showed
12 + -- a single `current_version` (never advanced on failure), so `/state` and the
13 + -- TUI reported a clean tier while the fleet was split-brain — the operator
14 + -- only learned of it by reading `deploys` rows or SSHing the nodes.
15 + --
16 + -- `partial_reason` is NULL when the tier is consistent and a human-readable
17 + -- explanation when it is not. It is set by the promote/rollback failure paths
18 + -- and cleared by any subsequent clean full promote or rollback of the tier.
19 + ALTER TABLE tier_state ADD COLUMN partial_reason TEXT;
@@ -82,6 +82,43 @@ pub(crate) fn parse_source(s: &str) -> Result<BackupSource> {
82 82 bail!("unsupported backup source scheme: {s}");
83 83 }
84 84
85 + /// Backups smaller than this are treated as a failed/truncated transfer rather
86 + /// than a real dump. The prod backup is ~885M; even a dev fixture is well above
87 + /// this. The floor exists to catch an empty or header-only file (an rsync that
88 + /// wrote zero bytes, an empty source); `gzip -t` catches mid-stream truncation
89 + /// of a `.gz`.
90 + const MIN_BACKUP_BYTES: u64 = 64;
91 +
92 + /// Verify a freshly-downloaded backup before it is allowed to become the live
93 + /// dump: reject anything implausibly small, and for a gzip require a complete,
94 + /// valid stream (`gzip -t` fails on truncation/corruption). `is_gz` is taken
95 + /// from the *destination* name, not the temp path (which carries a `.partial`
96 + /// suffix).
97 + async fn verify_backup(tmp_path: &str, is_gz: bool) -> Result<()> {
98 + let meta = tokio::fs::metadata(tmp_path)
99 + .await
100 + .with_context(|| format!("stat fetched backup {tmp_path}"))?;
101 + anyhow::ensure!(
102 + meta.len() >= MIN_BACKUP_BYTES,
103 + "fetched backup {tmp_path} is implausibly small ({} bytes); treating as a failed/truncated transfer",
104 + meta.len(),
105 + );
106 + if is_gz {
107 + let out = Command::new("gzip")
108 + .arg("-t")
109 + .arg(tmp_path)
110 + .output()
111 + .await
112 + .with_context(|| format!("spawning gzip -t {tmp_path}"))?;
113 + anyhow::ensure!(
114 + out.status.success(),
115 + "fetched backup {tmp_path} failed gzip integrity check (truncated/corrupt): {}",
116 + String::from_utf8_lossy(&out.stderr),
117 + );
118 + }
119 + Ok(())
120 + }
121 +
85 122 pub async fn fetch(
86 123 pool: &SqlitePool,
87 124 _cfg: &Arc<Config>,
@@ -94,47 +131,70 @@ pub async fn fetch(
94 131 tokio::fs::create_dir_all(parent).await?;
95 132 }
96 133
134 + // Download to a sibling temp path, verify integrity, then atomically rename
135 + // into place. The live `local_path` is never the write target, so a partial
136 + // or corrupt transfer can never become the backup `migration_dry_run`
137 + // restores (CF4). `--inplace`/`--partial` are deliberately NOT used — those
138 + // keep a truncated file on failure, the opposite of what we want here.
139 + let tmp_path = format!("{local_path}.partial");
140 + let is_gz = local_path.ends_with(".gz");
141 +
97 142 let parsed = parse_source(&source)?;
98 - match parsed {
99 - BackupSource::File { path } => {
100 - tokio::fs::copy(&path, &local_path)
101 - .await
102 - .with_context(|| format!("copy {path} -> {local_path}"))?;
103 - }
104 - BackupSource::RsyncDaemon { url } => {
105 - let out = Command::new("rsync")
106 - .args(["-az", "--inplace", &url, &local_path])
107 - .output()
108 - .await
109 - .context("spawning rsync")?;
110 - anyhow::ensure!(
111 - out.status.success(),
112 - "rsync (daemon) failed: {}",
113 - String::from_utf8_lossy(&out.stderr),
114 - );
115 - }
116 - BackupSource::Ssh { user_host, port, path } => {
117 - let ssh_cmd = match port {
118 - Some(p) => format!("ssh -p {p} -o BatchMode=yes -o StrictHostKeyChecking=accept-new"),
119 - None => "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new".into(),
120 - };
121 - let remote = format!("{user_host}:{path}");
122 - let out = Command::new("rsync")
123 - .args(["-a", "--partial"])
124 - .arg("-e").arg(&ssh_cmd)
125 - .arg(&remote)
126 - .arg(&local_path)
127 - .output()
128 - .await
129 - .context("spawning rsync")?;
130 - anyhow::ensure!(
131 - out.status.success(),
132 - "rsync (ssh) failed: {}",
133 - String::from_utf8_lossy(&out.stderr),
134 - );
143 + let downloaded: Result<()> = async {
144 + match parsed {
145 + BackupSource::File { path } => {
146 + tokio::fs::copy(&path, &tmp_path)
147 + .await
148 + .with_context(|| format!("copy {path} -> {tmp_path}"))?;
149 + }
150 + BackupSource::RsyncDaemon { url } => {
151 + let out = Command::new("rsync")
152 + .args(["-az", &url, &tmp_path])
153 + .output()
154 + .await
155 + .context("spawning rsync")?;
156 + anyhow::ensure!(
157 + out.status.success(),
158 + "rsync (daemon) failed: {}",
159 + String::from_utf8_lossy(&out.stderr),
160 + );
161 + }
162 + BackupSource::Ssh { user_host, port, path } => {
163 + let ssh_cmd = match port {
164 + Some(p) => format!("ssh -p {p} -o BatchMode=yes -o StrictHostKeyChecking=accept-new"),
165 + None => "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new".into(),
166 + };
167 + let remote = format!("{user_host}:{path}");
168 + let out = Command::new("rsync")
169 + .args(["-a"])
170 + .arg("-e").arg(&ssh_cmd)
171 + .arg(&remote)
172 + .arg(&tmp_path)
173 + .output()
174 + .await
175 + .context("spawning rsync")?;
176 + anyhow::ensure!(
177 + out.status.success(),
178 + "rsync (ssh) failed: {}",
179 + String::from_utf8_lossy(&out.stderr),
180 + );
181 + }
135 182 }
183 + verify_backup(&tmp_path, is_gz).await
184 + }
185 + .await;
186 +
187 + // On any download/verify failure, remove the temp file so a corrupt
188 + // `.partial` never lingers, and leave the existing live backup untouched.
189 + if let Err(e) = downloaded {
190 + let _ = tokio::fs::remove_file(&tmp_path).await;
191 + return Err(e);
136 192 }
137 193
194 + tokio::fs::rename(&tmp_path, &local_path)
195 + .await
196 + .with_context(|| format!("atomic rename {tmp_path} -> {local_path}"))?;
197 +
138 198 let meta = tokio::fs::metadata(&local_path).await?;
139 199 let size = meta.len() as i64;
140 200
@@ -164,6 +224,100 @@ pub async fn fetch(
164 224 #[cfg(test)]
165 225 mod tests {
166 226 use super::*;
227 + use crate::topology::{BackupConfig, RepoConfig};
228 +
229 + // ---- CF4: atomic write + integrity ----
230 +
231 + async fn mem_pool() -> SqlitePool {
232 + let pool = sqlx::sqlite::SqlitePoolOptions::new()
233 + .max_connections(1)
234 + .connect("sqlite::memory:")
235 + .await
236 + .unwrap();
237 + crate::db::migrate(&pool).await.unwrap();
238 + pool
239 + }
240 +
241 + fn topo_with_backup(source: String, local_path: String) -> Topology {
242 + Topology {
243 + repo: RepoConfig { bare_path: "/tmp/x.git".into(), branch: "main".into(), upstream: None },
244 + backup: BackupConfig { source, local_path },
245 + tiers: vec![],
246 + }
247 + }
248 +
249 + /// `n` poorly-compressible bytes (callers pass ~4 KB) so a gzip of it stays well
250 + /// above the size floor and a half-truncation lands mid-stream (failing `gzip -t`).
251 + fn incompressible(n: usize) -> Vec<u8> {
252 + (0..n).map(|i| (i.wrapping_mul(2_654_435_761) >> 13) as u8).collect()
253 + }
254 +
255 + async fn write_valid_gz(path: &Path) {
256 + let plain = path.with_extension("plain");
257 + tokio::fs::write(&plain, incompressible(4096)).await.unwrap();
258 + let out = Command::new("sh")
259 + .arg("-c")
260 + .arg(format!("gzip -c {} > {}", plain.display(), path.display()))
261 + .output()
262 + .await
263 + .unwrap();
264 + assert!(out.status.success(), "gzip shim failed");
265 + }
266 +
267 + #[tokio::test]
268 + async fn fetch_file_source_writes_atomically_and_records_row() {
269 + let tmp = tempfile::tempdir().unwrap();
270 + let src = tmp.path().join("src.sql.gz");
271 + write_valid_gz(&src).await;
272 + let dest = tmp.path().join("backups/latest.sql.gz");
273 + let topo = Arc::new(topo_with_backup(
274 + format!("file://{}", src.display()),
275 + dest.to_string_lossy().into_owned(),
276 + ));
277 + let pool = mem_pool().await;
278 + let cfg = Arc::new(Config::for_tests());
279 +
280 + let fb = fetch(&pool, &cfg, &topo).await.unwrap();
281 + assert!(dest.exists(), "live backup written");
282 + assert!(
283 + !dest.with_file_name("latest.sql.gz.partial").exists(),
284 + "temp file consumed by the atomic rename",
285 + );
286 + assert!(fb.byte_size.unwrap() > 0);
287 + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM backups")
288 + .fetch_one(&pool).await.unwrap();
289 + assert_eq!(count.0, 1, "a row is recorded for a successful fetch");
290 + }
291 +
292 + #[tokio::test]
293 + async fn fetch_rejects_truncated_gz_and_leaves_no_live_file() {
294 + let tmp = tempfile::tempdir().unwrap();
295 + let src = tmp.path().join("src.sql.gz");
296 + write_valid_gz(&src).await;
297 + // Truncate to half: a valid gzip prefix that fails `gzip -t` mid-stream.
298 + let full = tokio::fs::read(&src).await.unwrap();
299 + assert!(full.len() / 2 > MIN_BACKUP_BYTES as usize, "half must clear the size floor to exercise gzip -t");
300 + tokio::fs::write(&src, &full[..full.len() / 2]).await.unwrap();
301 +
302 + let dest = tmp.path().join("backups/latest.sql.gz");
303 + let topo = Arc::new(topo_with_backup(
304 + format!("file://{}", src.display()),
305 + dest.to_string_lossy().into_owned(),
306 + ));
307 + let pool = mem_pool().await;
308 + let cfg = Arc::new(Config::for_tests());
309 +
310 + let res = fetch(&pool, &cfg, &topo).await;
311 + assert!(res.is_err(), "a truncated gzip must fail the fetch");
312 + assert!(!dest.exists(), "no live backup file results from a failed fetch");
313 + assert!(
314 + !dest.with_file_name("latest.sql.gz.partial").exists(),
315 + "the corrupt temp file is cleaned up",
316 + );
317 + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM backups")
318 + .fetch_one(&pool).await.unwrap();
319 + assert_eq!(count.0, 0, "no row recorded for a failed fetch");
320 + }
167 321
168 322 #[test]
169 323 fn parses_file_url() {
@@ -214,7 +214,18 @@ async fn read_pkg_version(cargo_toml: &Path) -> Result<Version> {
214 214
215 215 fn tail(buf: &[u8], max: usize) -> String {
216 216 let s = String::from_utf8_lossy(buf);
217 - if s.len() <= max { s.into_owned() } else { s[s.len() - max..].to_string() }
217 + if s.len() <= max {
218 + return s.into_owned();
219 + }
220 + // `s.len() - max` can land mid-codepoint; walk forward to the next char
221 + // boundary so the slice never panics (returns slightly fewer than `max`
222 + // bytes in that case). This is a hand-rolled `str::ceil_char_boundary`
223 + // (round *up*, not `floor_`), kept manual for toolchains that lack it.
224 + let mut start = s.len() - max;
225 + while start < s.len() && !s.is_char_boundary(start) {
226 + start += 1;
227 + }
228 + s[start..].to_string()
218 229 }
219 230
220 231 /// Copy `worktree/<entry.src>` into `staged/<entry.dst>`. Handles file or
@@ -273,3 +284,25 @@ async fn stage_entry(
273 284 );
274 285 Ok(())
275 286 }
287 +
288 + #[cfg(test)]
289 + mod tests {
290 + use super::tail;
291 +
292 + #[test]
293 + fn tail_does_not_panic_on_multibyte_boundary() {
294 + // Each '€' is 3 bytes; a byte cap landing mid-codepoint must not panic.
295 + let s = "€".repeat(10); // 30 bytes
296 + for max in 1..=30 {
297 + let out = tail(s.as_bytes(), max);
298 + assert!(out.len() <= max, "max={max} got {} bytes", out.len());
299 + // Result is always valid UTF-8 made only of whole '€'s.
300 + assert!(out.chars().all(|c| c == '€'), "max={max}: {out:?}");
301 + }
302 + }
303 +
304 + #[test]
305 + fn tail_returns_whole_input_when_under_cap() {
306 + assert_eq!(tail(b"hello", 100), "hello");
307 + }
308 + }
@@ -170,19 +170,8 @@ async fn deploy_remote(
170 170 .context("rsync failed (current symlink left intact)")?;
171 171
172 172 tracing::info!(node = %node.name, version, "deploy: symlink swap + service reload");
173 - // Symlink swap is atomic via `mv -T` of a freshly-created symlink over the
174 - // old one (the rename(2) is the atomic step; `ln -sfn` does unlink+symlink
175 - // which has a window).
176 - let swap_and_restart = format!(
177 - "set -e; \
178 - cd {root}; \
179 - ln -sfn releases/{ver} current.new; \
180 - mv -Tf current.new current; \
181 - sudo /bin/systemctl reload-or-restart {svc}",
182 - root = sh_quote(release_root),
183 - ver = sh_quote(version),
184 - svc = sh_quote(service),
185 - );
173 + let restart_cmd = format!("sudo /bin/systemctl reload-or-restart {}", sh_quote(service));
174 + let swap_and_restart = swap_and_restart_script(release_root, version, &restart_cmd);
186 175 run_checked(executor, &swap_and_restart, "symlink swap + systemctl reload-or-restart").await?;
187 176
188 177 if let Err(e) = gc_remote_releases(executor, release_root).await {
@@ -192,6 +181,42 @@ async fn deploy_remote(
192 181 Ok(PathBuf::from(release_root).join("releases").join(version))
193 182 }
194 183
184 + /// Build the swap-and-restart shell script for a remote node.
185 + ///
186 + /// The symlink swap is atomic via `mv -T` of a freshly-created symlink over the
187 + /// old one (the rename(2) is the atomic step; `ln -sfn` alone does
188 + /// unlink+symlink, which has a window). The load-bearing part: if `restart_cmd`
189 + /// fails *after* the flip, `current` is rolled back to its prior target before
190 + /// the script exits non-zero. Otherwise a failed restart would leave `current`
191 + /// pointing at the new, un-activated release while the service still runs the
192 + /// old one — and a later reboot/cron restart would then silently bring up the
193 + /// release the deploy reported as failed. Best-effort re-restart of the prior
194 + /// version keeps the running service consistent with the restored symlink.
195 + ///
196 + /// `restart_cmd` is injected (rather than hardcoded) so tests can drive the
197 + /// failure and success paths with a `false`/`true` stand-in.
198 + fn swap_and_restart_script(release_root: &str, version: &str, restart_cmd: &str) -> String {
199 + format!(
200 + "set -e\n\
201 + cd {root}\n\
202 + prev=$(readlink current 2>/dev/null || true)\n\
203 + ln -sfn releases/{ver} current.new\n\
204 + mv -Tf current.new current\n\
205 + if ! {restart}; then\n\
206 + if [ -n \"$prev\" ]; then\n\
207 + ln -sfn \"$prev\" current.rollback\n\
208 + mv -Tf current.rollback current\n\
209 + {restart} || true\n\
210 + fi\n\
211 + echo \"deploy: restart failed; rolled symlink back to ${{prev:-<none>}}\" >&2\n\
212 + exit 1\n\
213 + fi\n",
214 + root = sh_quote(release_root),
215 + ver = sh_quote(version),
216 + restart = restart_cmd,
217 + )
218 + }
219 +
195 220 async fn gc_local_releases(release_root: &Path) -> Result<()> {
196 221 let releases = release_root.join("releases");
197 222 if !releases.exists() {
@@ -416,6 +441,60 @@ mod tests {
416 441 assert_eq!(target.to_string_lossy(), "releases/0.0.1");
417 442 }
418 443
444 + // ---- swap_and_restart_script: symlink/restart consistency ----
445 +
446 + async fn run_script(script: &str) -> std::process::Output {
447 + Command::new("sh").arg("-c").arg(script).output().await.unwrap()
448 + }
449 +
450 + async fn setup_release_root(with_current: bool) -> tempfile::TempDir {
451 + let tmp = tempfile::tempdir().unwrap();
452 + let root = tmp.path();
453 + tokio::fs::create_dir_all(root.join("releases/old")).await.unwrap();
454 + tokio::fs::create_dir_all(root.join("releases/new")).await.unwrap();
455 + if with_current {
456 + std::os::unix::fs::symlink("releases/old", root.join("current")).unwrap();
457 + }
458 + tmp
459 + }
460 +
461 + #[tokio::test]
462 + async fn swap_and_restart_keeps_new_symlink_when_restart_succeeds() {
463 + let tmp = setup_release_root(true).await;
464 + let root = tmp.path().to_string_lossy().into_owned();
465 + let out = run_script(&swap_and_restart_script(&root, "new", "true")).await;
466 + assert!(out.status.success(), "script should succeed when restart succeeds");
467 + let target = tokio::fs::read_link(tmp.path().join("current")).await.unwrap();
468 + assert_eq!(target.to_string_lossy(), "releases/new", "symlink advanced to new");
469 + }
470 +
471 + #[tokio::test]
472 + async fn swap_and_restart_rolls_symlink_back_when_restart_fails() {
473 + // The bug: a restart failure after the flip must NOT leave `current`
474 + // pointing at the new (un-activated) release.
475 + let tmp = setup_release_root(true).await;
476 + let root = tmp.path().to_string_lossy().into_owned();
477 + let out = run_script(&swap_and_restart_script(&root, "new", "false")).await;
478 + assert!(!out.status.success(), "script must fail when restart fails");
479 + let target = tokio::fs::read_link(tmp.path().join("current")).await.unwrap();
480 + assert_eq!(
481 + target.to_string_lossy(), "releases/old",
482 + "symlink rolled back to prev so a later restart can't silently activate new",
483 + );
484 + }
485 +
486 + #[tokio::test]
487 + async fn swap_and_restart_first_deploy_failure_has_no_prev_to_restore() {
488 + // No prior `current`. A restart failure leaves `current` at new (the only
489 + // version) and still reports failure — documented degenerate case.
490 + let tmp = setup_release_root(false).await;
491 + let root = tmp.path().to_string_lossy().into_owned();
492 + let out = run_script(&swap_and_restart_script(&root, "new", "false")).await;
493 + assert!(!out.status.success(), "script must fail when restart fails");
494 + let target = tokio::fs::read_link(tmp.path().join("current")).await.unwrap();
495 + assert_eq!(target.to_string_lossy(), "releases/new", "no prev existed to roll back to");
496 + }
497 +
419 498 #[tokio::test]
420 499 async fn deploy_node_denied_when_executor_lacks_deploy_grant() {
421 500 // Defense in depth: an executor without the deploy grant refuses the
@@ -315,17 +315,47 @@ async fn clean_stale_test_dbs(db_url: &str) {
315 315 pool.close().await;
316 316 }
317 317
318 - async fn restore_dump(db_url: &str, dump: &str, log_buf: &mut Vec<u8>) -> Result<()> {
319 - // Two pipelines we accept:
320 - // *.sql -> psql $url < dump
321 - // *.sql.gz -> gunzip -c dump | psql $url
322 - let is_gz = dump.ends_with(".gz");
323 - let shell = if is_gz {
324 - format!("gunzip -c {q} | psql {url}", q = shell_escape(dump), url = shell_escape(db_url))
318 + /// Build the restore shell line. Two pipelines we accept:
319 + /// *.sql -> psql -v ON_ERROR_STOP=1 $url < dump
320 + /// *.sql.gz -> set -o pipefail; gunzip -c dump | psql -v ON_ERROR_STOP=1 $url
321 + ///
322 + /// Two safety flags are load-bearing (CF4):
323 + /// - `ON_ERROR_STOP=1`: without it, psql exits 0 even when individual statements
324 + /// error, so a partial/corrupt restore would *pass* the gate.
325 + /// - `set -o pipefail`: without it a shell pipeline reports only the last
326 + /// command's status, so a `gunzip` failure on a truncated archive is masked by
327 + /// psql's exit. pipefail is a bash builtin (not POSIX sh), so the runner uses
328 + /// `bash -c`.
329 + fn restore_shell(db_url: &str, dump: &str) -> String {
330 + if dump.ends_with(".gz") {
331 + format!(
332 + "set -o pipefail; gunzip -c {q} | psql -v ON_ERROR_STOP=1 {url}",
333 + q = shell_escape(dump),
334 + url = shell_escape(db_url),
335 + )
325 336 } else {
326 - format!("psql {url} < {q}", q = shell_escape(dump), url = shell_escape(db_url))
327 - };
328 - let out = Command::new("sh").arg("-c").arg(&shell).output().await?;
337 + format!(
338 + "psql -v ON_ERROR_STOP=1 {url} < {q}",
339 + url = shell_escape(db_url),
340 + q = shell_escape(dump),
341 + )
342 + }
343 + }
344 +
345 + async fn restore_dump(db_url: &str, dump: &str, log_buf: &mut Vec<u8>) -> Result<()> {
346 + // Split the password out of the URL and hand it to psql via PGPASSWORD, so it
347 + // never lands in argv (visible in /proc/<pid>/cmdline to any local user).
348 + // The sanitized URL — user/host/db, no secret — goes on the command line.
349 + let (sanitized, password) = split_pg_password(db_url);
350 + let shell = restore_shell(&sanitized, dump);
351 + // `bash` (not `sh`): `set -o pipefail` is a bash builtin. The restore runs
352 + // locally on the Sando host (fw13), which has bash.
353 + let mut cmd = Command::new("bash");
354 + cmd.arg("-c").arg(&shell);
355 + if let Some(pw) = password {
356 + cmd.env("PGPASSWORD", pw);
357 + }
358 + let out = cmd.output().await?;
329 359 log_buf.extend_from_slice(&out.stdout);
330 360 log_buf.extend_from_slice(&out.stderr);
331 361 anyhow::ensure!(
@@ -336,6 +366,62 @@ async fn restore_dump(db_url: &str, dump: &str, log_buf: &mut Vec<u8>) -> Result
336 366 Ok(())
337 367 }
338 368
369 + /// Split a `postgres://user:password@host/db` URL into its password-free form and
370 + /// the (percent-decoded) password. Returns the URL unchanged with `None` when
371 + /// there is no userinfo password. psql reads the password from `PGPASSWORD`, so
372 + /// keeping it off the command line removes the /proc exposure.
373 + fn split_pg_password(db_url: &str) -> (String, Option<String>) {
374 + let Some(after) = db_url.find("://").map(|i| i + 3) else {
375 + return (db_url.to_string(), None);
376 + };
377 + // The authority ends at the first '/', '?' or '#'; the password (if any) is
378 + // between the first ':' and the '@' within the userinfo of that authority.
379 + let authority_end = db_url[after..]
380 + .find(['/', '?', '#'])
381 + .map(|i| after + i)
382 + .unwrap_or(db_url.len());
383 + let Some(at) = db_url[after..authority_end].find('@').map(|i| after + i) else {
384 + return (db_url.to_string(), None);
385 + };
386 + let userinfo = &db_url[after..at];
387 + let Some(colon) = userinfo.find(':') else {
388 + return (db_url.to_string(), None);
389 + };
390 + let password = percent_decode(&userinfo[colon + 1..]);
391 + let sanitized = format!("{}{}{}", &db_url[..after], &userinfo[..colon], &db_url[at..]);
392 + (sanitized, Some(password))
393 + }
394 +
/// Decode `%XX` escapes in a URL userinfo component.
///
/// Bytes that are not part of a well-formed escape are copied through as-is;
/// a malformed or truncated escape (e.g. `%2` or `%zz`) is left literal. The
/// decoded bytes are converted to a `String` lossily, so invalid UTF-8
/// sequences become U+FFFD.
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut rest = bytes;
    while let Some((&first, after)) = rest.split_first() {
        if first == b'%' {
            // A full escape needs two hex digits after the '%'.
            if let [hi, lo, remainder @ ..] = after {
                if let (Some(h), Some(l)) = (hex_val(*hi), hex_val(*lo)) {
                    decoded.push((h << 4) | l);
                    rest = remainder;
                    continue;
                }
            }
        }
        decoded.push(first);
        rest = after;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Value of an ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None` for any other
/// byte.
fn hex_val(c: u8) -> Option<u8> {
    char::from(c)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
424 +
339 425 pub(crate) async fn run_migrator(db_url: &str, dir: &std::path::Path) -> Result<()> {
340 426 use sqlx::postgres::PgPoolOptions;
341 427 let pool = PgPoolOptions::new().max_connections(1).connect(db_url).await?;
@@ -490,6 +576,24 @@ fn gate_log_path(ctx: &GateCtx, gate: GateKind) -> PathBuf {
490 576 ctx.cfg.logs_root.join(ctx.version.to_string()).join(format!("{}.log", gate.as_str()))
491 577 }
492 578
579 + /// Live check: has `tier`'s burn-in window of `hours` elapsed since its clock
580 + /// (`tier_state.burn_in_started_at`, started by a promote onto the tier)? Used
581 + /// by the promote-time gate check (`unsatisfied_gates`) so a stale `blocked`
582 + /// row never masks an elapsed — or not-yet-elapsed — window. The `burn_in` gate
583 + /// runner below wraps the same state with a richer outcome for `/state`.
584 + pub async fn burn_in_satisfied(pool: &SqlitePool, tier: &TierId, hours: u32) -> Result<bool> {
585 + let started: Option<String> = sqlx::query_scalar(
586 + "SELECT burn_in_started_at FROM tier_state WHERE tier = ?",
587 + )
588 + .bind(tier)
589 + .fetch_optional(pool)
590 + .await?
591 + .flatten();
592 + let Some(started) = started else { return Ok(false) };
593 + let started = chrono::DateTime::parse_from_rfc3339(&started)?.with_timezone(&Utc);
594 + Ok(Utc::now() - started >= chrono::Duration::hours(hours as i64))
595 + }
596 +
493 597 async fn burn_in(ctx: &GateCtx, hours: u32) -> Result<GateOutcome> {
494 598 // Check tier_state.burn_in_started_at on this tier; pass if enough time
495 599 // has elapsed. The clock is started by /promote when a version lands on
@@ -661,6 +765,60 @@ mod tests {
661 765 pool.close().await;
662 766 }
663 767
/// CF4: pure string check (no postgres) that the restore command line carries
/// its safety flags. Both forms need `ON_ERROR_STOP=1` so psql fails on a bad
/// statement; the gzip form also needs `set -o pipefail` so a `gunzip` failure
/// is not masked by psql's exit status.
#[test]
fn restore_shell_has_error_stop_and_pipefail() {
    let gz = restore_shell("postgres:///scratch", "/srv/sando/backups/latest.sql.gz");
    for needle in ["ON_ERROR_STOP=1", "set -o pipefail", "gunzip -c"] {
        assert!(gz.contains(needle), "gz: {gz}");
    }

    let plain = restore_shell("postgres:///scratch", "/srv/sando/backups/dump.sql");
    assert!(plain.contains("ON_ERROR_STOP=1"), "plain: {plain}");
    // A plain .sql restore has no pipeline, hence no gunzip stage.
    assert!(!plain.contains("gunzip"), "plain: {plain}");
    // The db url appears single-quote escaped.
    assert!(plain.contains("'postgres:///scratch'"), "plain: {plain}");
}
786 +
/// The password is lifted out of the URL (percent-decoded) and the sanitized
/// URL keeps user, host, port and database.
#[test]
fn split_pg_password_extracts_and_sanitizes() {
    // (input, expected sanitized url, expected password)
    let cases = [
        (
            "postgres://sando:s3cret@db.host:5432/scratch",
            "postgres://sando@db.host:5432/scratch",
            "s3cret",
        ),
        // Percent-encoded password is decoded for PGPASSWORD.
        ("postgresql://u:p%40ss%2Fword@h/d", "postgresql://u@h/d", "p@ss/word"),
    ];
    for (input, want_url, want_pw) in cases {
        let (url, pw) = split_pg_password(input);
        assert_eq!(url, want_url);
        assert_eq!(pw.as_deref(), Some(want_pw));
    }
}
798 +
/// URLs without a userinfo password come back unchanged with `None`. A ':'
/// after the '@' (a port) must not be mistaken for the password delimiter.
#[test]
fn split_pg_password_noop_without_password() {
    for url in ["postgres:///scratch", "postgres://sando@db.host:5432/scratch"] {
        assert_eq!(split_pg_password(url), (url.to_string(), None));
    }
}
812 +
/// Well-formed escapes are decoded; malformed or truncated escapes are kept
/// literally rather than dropped.
#[test]
fn percent_decode_handles_escapes_and_malformed() {
    // (input, expected output)
    let cases = [
        ("plain", "plain"),
        ("a%2Fb", "a/b"),
        ("ab%2", "ab%2"),
        ("ab%zz", "ab%zz"),
    ];
    for (input, expected) in cases {
        assert_eq!(percent_decode(input), expected);
    }
}
821 +
664 822 /// Sanity: applying MNW migrations from a *non-existent* dir errors,
665 823 /// rather than silently no-op'ing. Cheap pure check, no postgres needed
666 824 /// (the sqlx::Migrator::new constructor itself reads the dir).
@@ -94,10 +94,24 @@ pub async fn sha_present(bare: &Path, sha: &str) -> Result<bool> {
94 94
95 95 pub async fn checkout_worktree(bare: &Path, sha: &str, dest: &Path) -> Result<()> {
96 96 if dest.exists() {
97 - // Pre-existing worktree (re-trigger of same sha). Idempotent — nothing to do.
98 - return Ok(());
97 + // A dir already exists at `dest`. Trust it ONLY if it is a real git
98 + // worktree whose checked-out HEAD is exactly `sha`. Otherwise it is a
99 + // stale or partial leftover — a crashed `worktree add`, a half-populated
100 + // build dir, or a worktree for a different sha — which would silently
101 + // get compiled and shipped as if it were `sha`. Remove and recreate.
102 + if worktree_head_matches(bare, dest, sha).await.unwrap_or(false) {
103 + return Ok(());
104 + }
105 + tracing::warn!(
106 + dest = %dest.display(), sha,
107 + "existing path is not a valid worktree at this sha; removing and recreating",
108 + );
109 + remove_worktree(bare, dest).await?;
99 110 }
100 111 tokio::fs::create_dir_all(dest.parent().unwrap()).await?;
112 + // Drop admin entries for worktrees whose dirs are gone, so `add` can't
113 + // collide with a leftover registration for this path.
114 + prune_worktrees(bare).await;
101 115 let out = Command::new("git")
102 116 .arg("--git-dir")
103 117 .arg(bare)
@@ -113,3 +127,169 @@ pub async fn checkout_worktree(bare: &Path, sha: &str, dest: &Path) -> Result<()
113 127 );
114 128 Ok(())
115 129 }
130 +
131 + /// True iff `dest` is a git worktree whose checked-out HEAD commit equals the
132 + /// commit `sha` resolves to in `bare`. Any failure (not a worktree, unresolvable
133 + /// sha) is reported as "no match" rather than an error — the caller recreates.
134 + async fn worktree_head_matches(bare: &Path, dest: &Path, sha: &str) -> Result<bool> {
135 + let want = resolve_commit(bare, sha).await?;
136 + let have = match worktree_head(dest).await {
137 + Ok(h) => h,
138 + Err(_) => return Ok(false),
139 + };
140 + Ok(have == want)
141 + }
142 +
143 + /// Resolve `sha` (short or full) to its full commit id in the bare repo.
144 + async fn resolve_commit(bare: &Path, sha: &str) -> Result<String> {
145 + let out = Command::new("git")
146 + .arg("--git-dir")
147 + .arg(bare)
148 + .args(["rev-parse", "--verify", "--quiet"])
149 + .arg(format!("{sha}^{{commit}}"))
150 + .output()
151 + .await?;
152 + anyhow::ensure!(out.status.success(), "sha {sha} does not resolve to a commit in the bare repo");
153 + Ok(String::from_utf8(out.stdout)?.trim().to_string())
154 + }
155 +
156 + /// The full HEAD commit id checked out in a worktree dir. Errors if `dest` is
157 + /// not a valid git worktree.
158 + async fn worktree_head(dest: &Path) -> Result<String> {
159 + let out = Command::new("git")
160 + .arg("-C")
161 + .arg(dest)
162 + .args(["rev-parse", "--verify", "--quiet", "HEAD"])
163 + .output()
164 + .await?;
165 + anyhow::ensure!(out.status.success(), "not a git worktree (rev-parse HEAD failed)");
166 + Ok(String::from_utf8(out.stdout)?.trim().to_string())
167 + }
168 +
169 + /// Remove a worktree dir and its admin registration. Prefers `git worktree
170 + /// remove --force`; falls back to a filesystem remove + prune when `dest` isn't
171 + /// a registered worktree (a partial leftover git won't recognize).
172 + async fn remove_worktree(bare: &Path, dest: &Path) -> Result<()> {
173 + let out = Command::new("git")
174 + .arg("--git-dir")
175 + .arg(bare)
176 + .args(["worktree", "remove", "--force"])
177 + .arg(dest)
178 + .output()
179 + .await?;
180 + if !out.status.success() {
181 + if dest.exists() {
182 + tokio::fs::remove_dir_all(dest)
183 + .await
184 + .with_context(|| format!("removing stale worktree dir {}", dest.display()))?;
185 + }
186 + prune_worktrees(bare).await;
187 + }
188 + Ok(())
189 + }
190 +
191 + /// `git worktree prune` — drop admin entries for worktrees whose dirs are gone.
192 + /// Best-effort: a prune failure is logged, never fatal.
193 + async fn prune_worktrees(bare: &Path) {
194 + match Command::new("git")
195 + .arg("--git-dir")
196 + .arg(bare)
197 + .args(["worktree", "prune"])
198 + .output()
199 + .await
200 + {
201 + Ok(o) if !o.status.success() => {
202 + tracing::warn!(stderr = %String::from_utf8_lossy(&o.stderr), "git worktree prune failed");
203 + }
204 + Err(e) => tracing::warn!(error = %e, "spawning git worktree prune failed"),
205 + _ => {}
206 + }
207 + }
208 +
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `git <args>` inside `repo` with a throwaway committer identity.
    async fn git(repo: &Path, args: &[&str]) -> std::process::Output {
        let mut cmd = Command::new("git");
        cmd.args(["-c", "user.email=t@t", "-c", "user.name=t"])
            .current_dir(repo)
            .args(args);
        cmd.output().await.unwrap()
    }

    /// Resolve revision `r` in `repo`, panicking if git rejects it.
    async fn rev_parse(repo: &Path, r: &str) -> String {
        let out = git(repo, &["rev-parse", r]).await;
        assert!(out.status.success(), "rev-parse {r} failed");
        String::from_utf8(out.stdout).unwrap().trim().to_string()
    }

    /// Write `file`, stage everything, commit with `message`, and return the
    /// new HEAD commit id.
    async fn commit_file(repo: &Path, file: &str, contents: &[u8], message: &str) -> String {
        tokio::fs::write(repo.join(file), contents).await.unwrap();
        assert!(git(repo, &["add", "."]).await.status.success());
        assert!(git(repo, &["commit", "-q", "-m", message]).await.status.success());
        rev_parse(repo, "HEAD").await
    }

    /// A repo with two commits; returns (tmp, gitdir, sha1, sha2). The first
    /// commit adds `a.txt`, the second adds `b.txt`.
    async fn two_commit_repo() -> (tempfile::TempDir, std::path::PathBuf, String, String) {
        let tmp = tempfile::tempdir().unwrap();
        let repo = tmp.path().join("repo");
        tokio::fs::create_dir_all(&repo).await.unwrap();
        assert!(git(&repo, &["init", "-q", "-b", "main"]).await.status.success());
        let sha1 = commit_file(&repo, "a.txt", b"one", "one").await;
        let sha2 = commit_file(&repo, "b.txt", b"two", "two").await;
        (tmp, repo.join(".git"), sha1, sha2)
    }

    /// A fresh destination gets a checkout of exactly the requested commit.
    #[tokio::test]
    async fn checkout_worktree_creates_at_sha() {
        let (tmp, gitdir, sha1, _sha2) = two_commit_repo().await;
        let dest = tmp.path().join("wt");
        checkout_worktree(&gitdir, &sha1, &dest).await.unwrap();
        assert!(dest.join("a.txt").exists());
        assert!(!dest.join("b.txt").exists(), "sha1 predates b.txt");
        assert_eq!(rev_parse(&dest, "HEAD").await, sha1);
    }

    /// Re-checking-out the same sha reuses the existing worktree untouched.
    #[tokio::test]
    async fn checkout_worktree_idempotent_reuses_valid_worktree() {
        let (tmp, gitdir, sha1, _) = two_commit_repo().await;
        let dest = tmp.path().join("wt");
        checkout_worktree(&gitdir, &sha1, &dest).await.unwrap();
        // An untracked marker file survives only if the dir is not recreated.
        let marker = dest.join("marker");
        tokio::fs::write(&marker, b"x").await.unwrap();
        checkout_worktree(&gitdir, &sha1, &dest).await.unwrap();
        assert!(marker.exists(), "valid worktree reused, not recreated");
        assert_eq!(rev_parse(&dest, "HEAD").await, sha1);
    }

    /// A pre-existing directory that is NOT a worktree (e.g. a crashed
    /// mid-add) is replaced with a real checkout rather than built as-is.
    #[tokio::test]
    async fn checkout_worktree_replaces_partial_leftover() {
        let (tmp, gitdir, sha1, _) = two_commit_repo().await;
        let dest = tmp.path().join("wt");
        tokio::fs::create_dir_all(&dest).await.unwrap();
        let garbage = dest.join("garbage");
        tokio::fs::write(&garbage, b"partial").await.unwrap();
        checkout_worktree(&gitdir, &sha1, &dest).await.unwrap();
        assert!(dest.join("a.txt").exists(), "real checkout populated the dir");
        assert!(!garbage.exists(), "stale leftover removed");
        assert_eq!(rev_parse(&dest, "HEAD").await, sha1);
    }

    /// A worktree checked out at a different sha is torn down and rebuilt,
    /// not silently reused.
    #[tokio::test]
    async fn checkout_worktree_recreates_when_existing_sha_differs() {
        let (tmp, gitdir, sha1, sha2) = two_commit_repo().await;
        let dest = tmp.path().join("wt");
        checkout_worktree(&gitdir, &sha2, &dest).await.unwrap();
        assert!(dest.join("b.txt").exists());
        // Same destination, older commit.
        checkout_worktree(&gitdir, &sha1, &dest).await.unwrap();
        assert_eq!(rev_parse(&dest, "HEAD").await, sha1);
        assert!(!dest.join("b.txt").exists(), "now at sha1, which predates b.txt");
    }
}
@@ -32,6 +32,27 @@ async fn main() -> Result<()> {
32 32
33 33 let prom = metrics::init();
34 34 let addr: SocketAddr = cfg.listen.parse()?;
35 +
36 + // Deploy-API auth (CF2). Token comes from SANDO_API_TOKEN (systemd
37 + // EnvironmentFile). Refuse to expose the mutators unauthenticated on a
38 + // non-loopback bind — that exact posture (tailnet bind, no auth) was the
39 + // finding.
40 + let api_token: Option<Arc<str>> = std::env::var("SANDO_API_TOKEN")
41 + .ok()
42 + .map(|s| s.trim().to_string())
43 + .filter(|s| !s.is_empty())
44 + .map(|s| Arc::from(s.as_str()));
45 + if api_token.is_none() && !addr.ip().is_loopback() {
46 + anyhow::bail!(
47 + "SANDO_API_TOKEN is unset but listen={addr} is not loopback; refusing to expose deploy \
48 + controls unauthenticated. Set SANDO_API_TOKEN (via EnvironmentFile) or bind 127.0.0.1."
49 + );
50 + }
51 + match &api_token {
52 + Some(_) => tracing::info!("deploy endpoints require a bearer token"),
53 + None => tracing::warn!(%addr, "SANDO_API_TOKEN unset; deploy endpoints are UNAUTHENTICATED (loopback bind)"),
54 + }
55 +
35 56 let executors = Arc::new(state::build_executors(&topo));
36 57 let app_state = state::AppState {
37 58 pool,
@@ -39,8 +60,10 @@ async fn main() -> Result<()> {
39 60 cfg,
40 61 prom,
41 62 active_build: Arc::new(tokio::sync::Mutex::new(None)),
63 + deploy_lock: Arc::new(tokio::sync::Mutex::new(())),
42 64 events: events::channel(),
43 65 executors,
66 + api_token,
44 67 };
45 68 let app = routes::router(app_state);
46 69 tracing::info!(%addr, "sando daemon listening");