Skip to main content

max / makenotwork

ops-exec: trusted executor (E1+E2), Sando refactor, thin macOS driver Lane A release pipeline. New MIT crate shared/ops-exec: the capability-gated Executor trait with LocalExec/SshExec/AgentRpc transports, typed Step/Action, double-enforced CapabilitySet, and the ops-agent binary (tailscale whois identity, Aqua LaunchAgent for in-session macOS signing). ops-core re-exports it so bento compiles unchanged. Sando deploy refactored onto the executor, behavior-preserving: ssh/rsync now run through the per-node executor map built from topology; Node gains actuate/observe caps defaulting to deploy+restart/health so sando.toml is unchanged. LiveLog implements ops_exec::LogSink. bento/driver (bento-release-macos): the thin agent-driven driver (launchplan A decision (a)) — checkout -> release-macos.sh --keychain -> spctl verify -> pull DMG, transport-agnostic via ops-exec. Tests: ops-exec 25 unit + 4 HTTP e2e, ops-core 8, sando 76, driver 6; zero warnings, clippy clean. Real DMG run / clean-Mac smoke / mbp install remain handoffs H1/M7. Runbook: _private/docs/ops-core/operator-runbook.md. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-07 19:03 UTC
Commit: 14fd620d9683016a5a47c8f3fbc8a03b57381e0d
Parent: 9cceb18
36 files changed, +5179 insertions, -349 deletions
@@ -47,6 +47,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
47 47 checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
48 48
49 49 [[package]]
50 + name = "async-trait"
51 + version = "0.1.89"
52 + source = "registry+https://github.com/rust-lang/crates.io-index"
53 + checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
54 + dependencies = [
55 + "proc-macro2",
56 + "quote",
57 + "syn",
58 + ]
59 +
60 + [[package]]
50 61 name = "atoi"
51 62 version = "2.0.0"
52 63 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1262,7 +1273,9 @@ name = "ops-core"
1262 1273 version = "0.1.0"
1263 1274 dependencies = [
1264 1275 "anyhow",
1276 + "async-trait",
1265 1277 "chrono",
1278 + "ops-exec",
1266 1279 "serde",
1267 1280 "sqlx",
1268 1281 "tokio",
@@ -1270,6 +1283,18 @@ dependencies = [
1270 1283 ]
1271 1284
1272 1285 [[package]]
1286 + name = "ops-exec"
1287 + version = "0.1.0"
1288 + dependencies = [
1289 + "anyhow",
1290 + "async-trait",
1291 + "serde",
1292 + "thiserror",
1293 + "tokio",
1294 + "tracing",
1295 + ]
1296 +
1297 + [[package]]
1273 1298 name = "parking"
1274 1299 version = "2.2.1"
1275 1300 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -0,0 +1,1920 @@
1 + # This file is automatically @generated by Cargo.
2 + # It is not intended for manual editing.
3 + version = 4
4 +
5 + [[package]]
6 + name = "aho-corasick"
7 + version = "1.1.4"
8 + source = "registry+https://github.com/rust-lang/crates.io-index"
9 + checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
10 + dependencies = [
11 + "memchr",
12 + ]
13 +
14 + [[package]]
15 + name = "anyhow"
16 + version = "1.0.102"
17 + source = "registry+https://github.com/rust-lang/crates.io-index"
18 + checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
19 +
20 + [[package]]
21 + name = "async-trait"
22 + version = "0.1.89"
23 + source = "registry+https://github.com/rust-lang/crates.io-index"
24 + checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
25 + dependencies = [
26 + "proc-macro2",
27 + "quote",
28 + "syn",
29 + ]
30 +
31 + [[package]]
32 + name = "atomic-waker"
33 + version = "1.1.2"
34 + source = "registry+https://github.com/rust-lang/crates.io-index"
35 + checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
36 +
37 + [[package]]
38 + name = "base64"
39 + version = "0.22.1"
40 + source = "registry+https://github.com/rust-lang/crates.io-index"
41 + checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
42 +
43 + [[package]]
44 + name = "bento-driver"
45 + version = "0.1.0"
46 + dependencies = [
47 + "anyhow",
48 + "async-trait",
49 + "ops-exec",
50 + "serde",
51 + "tempfile",
52 + "tokio",
53 + "toml",
54 + "tracing",
55 + "tracing-subscriber",
56 + ]
57 +
58 + [[package]]
59 + name = "bitflags"
60 + version = "2.13.0"
61 + source = "registry+https://github.com/rust-lang/crates.io-index"
62 + checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8"
63 +
64 + [[package]]
65 + name = "bumpalo"
66 + version = "3.20.3"
67 + source = "registry+https://github.com/rust-lang/crates.io-index"
68 + checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649"
69 +
70 + [[package]]
71 + name = "bytes"
72 + version = "1.11.1"
73 + source = "registry+https://github.com/rust-lang/crates.io-index"
74 + checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
75 +
76 + [[package]]
77 + name = "cc"
78 + version = "1.2.63"
79 + source = "registry+https://github.com/rust-lang/crates.io-index"
80 + checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f"
81 + dependencies = [
82 + "find-msvc-tools",
83 + "shlex",
84 + ]
85 +
86 + [[package]]
87 + name = "cfg-if"
88 + version = "1.0.4"
89 + source = "registry+https://github.com/rust-lang/crates.io-index"
90 + checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
91 +
92 + [[package]]
93 + name = "cfg_aliases"
94 + version = "0.2.1"
95 + source = "registry+https://github.com/rust-lang/crates.io-index"
96 + checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
97 +
98 + [[package]]
99 + name = "displaydoc"
100 + version = "0.2.6"
101 + source = "registry+https://github.com/rust-lang/crates.io-index"
102 + checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f"
103 + dependencies = [
104 + "proc-macro2",
105 + "quote",
106 + "syn",
107 + ]
108 +
109 + [[package]]
110 + name = "equivalent"
111 + version = "1.0.2"
112 + source = "registry+https://github.com/rust-lang/crates.io-index"
113 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
114 +
115 + [[package]]
116 + name = "errno"
117 + version = "0.3.14"
118 + source = "registry+https://github.com/rust-lang/crates.io-index"
119 + checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
120 + dependencies = [
121 + "libc",
122 + "windows-sys 0.61.2",
123 + ]
124 +
125 + [[package]]
126 + name = "fastrand"
127 + version = "2.4.1"
128 + source = "registry+https://github.com/rust-lang/crates.io-index"
129 + checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
130 +
131 + [[package]]
132 + name = "find-msvc-tools"
133 + version = "0.1.9"
134 + source = "registry+https://github.com/rust-lang/crates.io-index"
135 + checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
136 +
137 + [[package]]
138 + name = "foldhash"
139 + version = "0.1.5"
140 + source = "registry+https://github.com/rust-lang/crates.io-index"
141 + checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
142 +
143 + [[package]]
144 + name = "form_urlencoded"
145 + version = "1.2.2"
146 + source = "registry+https://github.com/rust-lang/crates.io-index"
147 + checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf"
148 + dependencies = [
149 + "percent-encoding",
150 + ]
151 +
152 + [[package]]
153 + name = "futures-channel"
154 + version = "0.3.32"
155 + source = "registry+https://github.com/rust-lang/crates.io-index"
156 + checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
157 + dependencies = [
158 + "futures-core",
159 + ]
160 +
161 + [[package]]
162 + name = "futures-core"
163 + version = "0.3.32"
164 + source = "registry+https://github.com/rust-lang/crates.io-index"
165 + checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
166 +
167 + [[package]]
168 + name = "futures-io"
169 + version = "0.3.32"
170 + source = "registry+https://github.com/rust-lang/crates.io-index"
171 + checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
172 +
173 + [[package]]
174 + name = "futures-macro"
175 + version = "0.3.32"
176 + source = "registry+https://github.com/rust-lang/crates.io-index"
177 + checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
178 + dependencies = [
179 + "proc-macro2",
180 + "quote",
181 + "syn",
182 + ]
183 +
184 + [[package]]
185 + name = "futures-sink"
186 + version = "0.3.32"
187 + source = "registry+https://github.com/rust-lang/crates.io-index"
188 + checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
189 +
190 + [[package]]
191 + name = "futures-task"
192 + version = "0.3.32"
193 + source = "registry+https://github.com/rust-lang/crates.io-index"
194 + checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
195 +
196 + [[package]]
197 + name = "futures-util"
198 + version = "0.3.32"
199 + source = "registry+https://github.com/rust-lang/crates.io-index"
200 + checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
201 + dependencies = [
202 + "futures-core",
203 + "futures-io",
204 + "futures-macro",
205 + "futures-sink",
206 + "futures-task",
207 + "memchr",
208 + "pin-project-lite",
209 + "slab",
210 + ]
211 +
212 + [[package]]
213 + name = "getrandom"
214 + version = "0.2.17"
215 + source = "registry+https://github.com/rust-lang/crates.io-index"
216 + checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
217 + dependencies = [
218 + "cfg-if",
219 + "js-sys",
220 + "libc",
221 + "wasi",
222 + "wasm-bindgen",
223 + ]
224 +
225 + [[package]]
226 + name = "getrandom"
227 + version = "0.3.4"
228 + source = "registry+https://github.com/rust-lang/crates.io-index"
229 + checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
230 + dependencies = [
231 + "cfg-if",
232 + "js-sys",
233 + "libc",
234 + "r-efi 5.3.0",
235 + "wasip2",
236 + "wasm-bindgen",
237 + ]
238 +
239 + [[package]]
240 + name = "getrandom"
241 + version = "0.4.2"
242 + source = "registry+https://github.com/rust-lang/crates.io-index"
243 + checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
244 + dependencies = [
245 + "cfg-if",
246 + "libc",
247 + "r-efi 6.0.0",
248 + "wasip2",
249 + "wasip3",
250 + ]
251 +
252 + [[package]]
253 + name = "hashbrown"
254 + version = "0.15.5"
255 + source = "registry+https://github.com/rust-lang/crates.io-index"
256 + checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
257 + dependencies = [
258 + "foldhash",
259 + ]
260 +
261 + [[package]]
262 + name = "hashbrown"
263 + version = "0.17.1"
264 + source = "registry+https://github.com/rust-lang/crates.io-index"
265 + checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a"
266 +
267 + [[package]]
268 + name = "heck"
269 + version = "0.5.0"
270 + source = "registry+https://github.com/rust-lang/crates.io-index"
271 + checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
272 +
273 + [[package]]
274 + name = "http"
275 + version = "1.4.1"
276 + source = "registry+https://github.com/rust-lang/crates.io-index"
277 + checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0"
278 + dependencies = [
279 + "bytes",
280 + "itoa",
281 + ]
282 +
283 + [[package]]
284 + name = "http-body"
285 + version = "1.0.1"
286 + source = "registry+https://github.com/rust-lang/crates.io-index"
287 + checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
288 + dependencies = [
289 + "bytes",
290 + "http",
291 + ]
292 +
293 + [[package]]
294 + name = "http-body-util"
295 + version = "0.1.3"
296 + source = "registry+https://github.com/rust-lang/crates.io-index"
297 + checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
298 + dependencies = [
299 + "bytes",
300 + "futures-core",
301 + "http",
302 + "http-body",
303 + "pin-project-lite",
304 + ]
305 +
306 + [[package]]
307 + name = "httparse"
308 + version = "1.10.1"
309 + source = "registry+https://github.com/rust-lang/crates.io-index"
310 + checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
311 +
312 + [[package]]
313 + name = "hyper"
314 + version = "1.10.1"
315 + source = "registry+https://github.com/rust-lang/crates.io-index"
316 + checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498"
317 + dependencies = [
318 + "atomic-waker",
319 + "bytes",
320 + "futures-channel",
321 + "futures-core",
322 + "http",
323 + "http-body",
324 + "httparse",
325 + "itoa",
326 + "pin-project-lite",
327 + "smallvec",
328 + "tokio",
329 + "want",
330 + ]
331 +
332 + [[package]]
333 + name = "hyper-rustls"
334 + version = "0.27.9"
335 + source = "registry+https://github.com/rust-lang/crates.io-index"
336 + checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f"
337 + dependencies = [
338 + "http",
339 + "hyper",
340 + "hyper-util",
341 + "rustls",
342 + "tokio",
343 + "tokio-rustls",
344 + "tower-service",
345 + "webpki-roots",
346 + ]
347 +
348 + [[package]]
349 + name = "hyper-util"
350 + version = "0.1.20"
351 + source = "registry+https://github.com/rust-lang/crates.io-index"
352 + checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
353 + dependencies = [
354 + "base64",
355 + "bytes",
356 + "futures-channel",
357 + "futures-util",
358 + "http",
359 + "http-body",
360 + "hyper",
361 + "ipnet",
362 + "libc",
363 + "percent-encoding",
364 + "pin-project-lite",
365 + "socket2",
366 + "tokio",
367 + "tower-service",
368 + "tracing",
369 + ]
370 +
371 + [[package]]
372 + name = "icu_collections"
373 + version = "2.2.0"
374 + source = "registry+https://github.com/rust-lang/crates.io-index"
375 + checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c"
376 + dependencies = [
377 + "displaydoc",
378 + "potential_utf",
379 + "utf8_iter",
380 + "yoke",
381 + "zerofrom",
382 + "zerovec",
383 + ]
384 +
385 + [[package]]
386 + name = "icu_locale_core"
387 + version = "2.2.0"
388 + source = "registry+https://github.com/rust-lang/crates.io-index"
389 + checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29"
390 + dependencies = [
391 + "displaydoc",
392 + "litemap",
393 + "tinystr",
394 + "writeable",
395 + "zerovec",
396 + ]
397 +
398 + [[package]]
399 + name = "icu_normalizer"
400 + version = "2.2.0"
401 + source = "registry+https://github.com/rust-lang/crates.io-index"
402 + checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4"
403 + dependencies = [
404 + "icu_collections",
405 + "icu_normalizer_data",
406 + "icu_properties",
407 + "icu_provider",
408 + "smallvec",
409 + "zerovec",
410 + ]
411 +
412 + [[package]]
413 + name = "icu_normalizer_data"
414 + version = "2.2.0"
415 + source = "registry+https://github.com/rust-lang/crates.io-index"
416 + checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38"
417 +
418 + [[package]]
419 + name = "icu_properties"
420 + version = "2.2.0"
421 + source = "registry+https://github.com/rust-lang/crates.io-index"
422 + checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de"
423 + dependencies = [
424 + "icu_collections",
425 + "icu_locale_core",
426 + "icu_properties_data",
427 + "icu_provider",
428 + "zerotrie",
429 + "zerovec",
430 + ]
431 +
432 + [[package]]
433 + name = "icu_properties_data"
434 + version = "2.2.0"
435 + source = "registry+https://github.com/rust-lang/crates.io-index"
436 + checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14"
437 +
438 + [[package]]
439 + name = "icu_provider"
440 + version = "2.2.0"
441 + source = "registry+https://github.com/rust-lang/crates.io-index"
442 + checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421"
443 + dependencies = [
444 + "displaydoc",
445 + "icu_locale_core",
446 + "writeable",
447 + "yoke",
448 + "zerofrom",
449 + "zerotrie",
450 + "zerovec",
451 + ]
452 +
453 + [[package]]
454 + name = "id-arena"
455 + version = "2.3.0"
456 + source = "registry+https://github.com/rust-lang/crates.io-index"
457 + checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
458 +
459 + [[package]]
460 + name = "idna"
461 + version = "1.1.0"
462 + source = "registry+https://github.com/rust-lang/crates.io-index"
463 + checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
464 + dependencies = [
465 + "idna_adapter",
466 + "smallvec",
467 + "utf8_iter",
468 + ]
469 +
470 + [[package]]
471 + name = "idna_adapter"
472 + version = "1.2.2"
473 + source = "registry+https://github.com/rust-lang/crates.io-index"
474 + checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714"
475 + dependencies = [
476 + "icu_normalizer",
477 + "icu_properties",
478 + ]
479 +
480 + [[package]]
481 + name = "indexmap"
482 + version = "2.14.0"
483 + source = "registry+https://github.com/rust-lang/crates.io-index"
484 + checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9"
485 + dependencies = [
486 + "equivalent",
487 + "hashbrown 0.17.1",
488 + "serde",
489 + "serde_core",
490 + ]
491 +
492 + [[package]]
493 + name = "ipnet"
494 + version = "2.12.0"
495 + source = "registry+https://github.com/rust-lang/crates.io-index"
496 + checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
497 +
498 + [[package]]
499 + name = "itoa"
500 + version = "1.0.18"
Lines truncated
@@ -0,0 +1,23 @@
1 + [package]
2 + name = "bento-driver"
3 + version = "0.1.0"
4 + edition = "2024"
5 + license = "MIT"
6 + description = "Thin agent-driven release driver: runs the proven macOS recipe (build -> sign -> notarize -> staple -> verify) on a build host via the ops-exec executor, then pulls the artifact. The full bentod/TUI orchestrator is deferred (launchplan §J)."
7 +
8 + [[bin]]
9 + name = "bento-release-macos"
10 + path = "src/main.rs"
11 +
12 + [dependencies]
13 + ops-exec = { path = "../../shared/ops-exec", features = ["rpc"] }
14 + tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "fs", "io-std"] }
15 + anyhow = "1.0.102"
16 + async-trait = "0.1.83"
17 + serde = { version = "1.0.228", features = ["derive"] }
18 + toml = "0.8"
19 + tracing = "0.1.44"
20 + tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
21 +
22 + [dev-dependencies]
23 + tempfile = "3.20"
@@ -0,0 +1,26 @@
1 + # bento-release-macos driver config.
2 + #
3 + # Run: bento-release-macos --config driver.toml [--version 0.4.1]
4 + #
5 + # Drives the macOS release on the build host's in-session ops-agent and pulls
6 + # the signed DMG back to this machine (fw13).
7 +
8 + [agent]
9 + # The ops-agent's tailnet URL on the Mac build host. Must match the agent's
10 + # `listen` address. http (not https): the tailnet (WireGuard) is the transport
11 + # security; identity is re-checked by the agent via `tailscale whois`.
12 + base_url = "http://100.64.0.2:8765"
13 + host_label = "mbp"
14 +
15 + [plan]
16 + app = "goingson"
17 + version = "0.4.1" # overridable with --version
18 + product_name = "GoingsOn" # used for the default DMG name
19 + repo_path = "~/Code/Apps/goingson" # checkout ON THE MAC
20 + env_file = "~/.tauri/passwords.env" # secrets sourced before the build
21 + dist_root = "~/Dist/goingson" # where the signed DMG lands ON THE MAC
22 + # dmg_name = "GoingsOn_0.4.1_aarch64.dmg" # optional; default shown
23 +
24 + [local]
25 + # Where on THIS host (fw13) to pull the signed DMG to.
26 + dest_dir = "~/Dist/goingson"
@@ -0,0 +1,301 @@
1 + //! The thin agent-driven release driver.
2 + //!
3 + //! Decision gate (launchplan §A "Driver question") resolved to **(a) a thin
4 + //! standalone driver** for the launch window — the full `bentod`/ratatui
5 + //! orchestrator is deferred (§J). This drives the *proven* macOS path:
6 + //!
7 + //! checkout -> release-macos.sh --keychain (build+sign+notarize+staple)
8 + //! -> verify gatekeeper -> pull the DMG
9 + //!
10 + //! It runs each step through an [`ops_exec::Executor`] — in production an
11 + //! [`ops_exec::AgentRpc`] to the in-session `ops-agent` on the Mac (the only
12 + //! place codesign can use the Developer ID key), but it works against any
13 + //! executor, so a `LocalExec`/`SshExec` exercises the exact same recipe.
14 + //!
15 + //! The recipe shape ([`ReleasePlan`]) is pure and unit-tested; the run loop is
16 + //! transport-agnostic. Producing a *real* notarized DMG additionally needs the
17 + //! Mac (Aqua session, secrets, Apple notary) and is handoff H1.
18 +
19 + use anyhow::{Context, Result};
20 + use ops_exec::{Action, Executor, LogSink, ObserveKind, RunOutput, Step};
21 +
22 + /// Everything needed to drive one app's macOS release on a build host.
23 + #[derive(Clone, Debug)]
24 + pub struct ReleasePlan {
25 + /// App slug, e.g. `goingson`.
26 + pub app: String,
27 + /// Version being released, e.g. `0.4.1` (no leading `v`).
28 + pub version: String,
29 + /// Repo checkout path ON THE BUILD HOST, e.g. `~/Code/Apps/goingson`.
30 + pub repo_path: String,
31 + /// Env file sourced before the release script (secrets), e.g.
32 + /// `~/.tauri/passwords.env`.
33 + pub env_file: String,
34 + /// Directory ON THE BUILD HOST where the signed DMG lands, e.g.
35 + /// `~/Dist/goingson`. The DMG name follows Tauri's convention.
36 + pub dist_root: String,
37 + /// The DMG file name. Defaults via [`ReleasePlan::default_dmg_name`].
38 + pub dmg_name: String,
39 + }
40 +
41 + impl ReleasePlan {
42 + /// Tauri's default macOS DMG name for an aarch64 build.
43 + pub fn default_dmg_name(app_product_name: &str, version: &str) -> String {
44 + format!("{app_product_name}_{version}_aarch64.dmg")
45 + }
46 +
47 + /// `git pull` + checkout the release tag on the build host. Gated as
48 + /// `build` (source prep is part of building).
49 + pub fn checkout_step(&self) -> Step {
50 + Step::shell(
51 + Action::Build,
52 + format!("set -e; git pull --ff-only && git checkout v{}", self.version),
53 + )
54 + .with_cwd(&self.repo_path)
55 + }
56 +
57 + /// Source secrets, then run the proven release script with the dedicated
58 + /// build keychain. The script does build + codesign + notarize + staple +
59 + /// its own verify. Gated as `sign` — running it requires the sign grant,
60 + /// which is the whole point of the capability model.
61 + pub fn release_step(&self) -> Step {
62 + Step::shell(
63 + Action::Sign,
64 + format!(". {} && ./dist/release-macos.sh --keychain", self.env_file),
65 + )
66 + .with_cwd(&self.repo_path)
67 + }
68 +
69 + /// Independent Gatekeeper assessment of the produced DMG (read-only, so
70 + /// it is an `observe` action). The driver double-checks the script's own
71 + /// verify because the daemon's verify gate is load-bearing, not decorative.
72 + pub fn verify_step(&self) -> Step {
73 + Step::shell(
74 + Action::Observe(ObserveKind::Custom("gatekeeper".into())),
75 + format!("spctl --assess -vv --type install {} 2>&1", shell_quote(&self.dmg_remote_path())),
76 + )
77 + }
78 +
79 + /// The DMG path on the build host.
80 + pub fn dmg_remote_path(&self) -> String {
81 + format!("{}/{}", self.dist_root.trim_end_matches('/'), self.dmg_name)
82 + }
83 + }
84 +
85 + /// Minimal single-quote for the one path we interpolate into the verify
86 + /// command. (The executor sh-quotes argv for us; this is only for the literal
87 + /// string we build here.)
88 + fn shell_quote(s: &str) -> String {
89 + format!("'{}'", s.replace('\'', r"'\''"))
90 + }
91 +
92 + /// The result of a release run.
93 + #[derive(Debug)]
94 + pub struct ReleaseOutcome {
95 + /// Did `spctl` report the DMG as notarized + accepted?
96 + pub gatekeeper_accepted: bool,
97 + /// The DMG path on the build host (pull it with `executor.pull`).
98 + pub dmg_remote: String,
99 + }
100 +
101 + /// Run a single step, streaming into `sink`, and fail on a non-zero exit.
102 + async fn run_step(
103 + exec: &dyn Executor,
104 + step: &Step,
105 + sink: &mut dyn LogSink,
106 + label: &str,
107 + ) -> Result<RunOutput> {
108 + let out = exec
109 + .run_streaming(step, sink)
110 + .await
111 + .with_context(|| format!("running {label}"))?;
112 + anyhow::ensure!(
113 + out.status.success(),
114 + "{label} failed (exit {}): {}",
115 + out.status.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into()),
116 + String::from_utf8_lossy(&out.stderr),
117 + );
118 + Ok(out)
119 + }
120 +
121 + /// Drive the full macOS release recipe through `exec`, streaming all output to
122 + /// `sink`. Does NOT pull the artifact — the caller does that (transport choice
123 + /// belongs there: `AgentRpc::pull` for a single DMG, or `SshExec::pull` for a
124 + /// dir). Returns whether Gatekeeper accepted the DMG.
125 + pub async fn run_release(
126 + exec: &dyn Executor,
127 + plan: &ReleasePlan,
128 + sink: &mut dyn LogSink,
129 + ) -> Result<ReleaseOutcome> {
130 + run_step(exec, &plan.checkout_step(), sink, "checkout").await?;
131 + run_step(exec, &plan.release_step(), sink, "release-macos.sh --keychain").await?;
132 + let verify = run_step(exec, &plan.verify_step(), sink, "verify gatekeeper").await?;
133 +
134 + // spctl writes its verdict to stderr/stdout (we merged with 2>&1).
135 + let combined = format!(
136 + "{}{}",
137 + String::from_utf8_lossy(&verify.stdout),
138 + String::from_utf8_lossy(&verify.stderr)
139 + );
140 + Ok(ReleaseOutcome {
141 + gatekeeper_accepted: gatekeeper_accepted(&combined),
142 + dmg_remote: plan.dmg_remote_path(),
143 + })
144 + }
145 +
146 + /// Parse an `spctl --assess -vv --type install` verdict. The accept banner is
147 + /// `source=Notarized Developer ID` (`accepted` appears alongside `source=` on
148 + /// success). A rejection prints `rejected` and no notarized source.
149 + pub fn gatekeeper_accepted(spctl_output: &str) -> bool {
150 + spctl_output.contains("source=Notarized Developer ID")
151 + || (spctl_output.contains("accepted") && spctl_output.contains("source="))
152 + }
153 +
154 + /// A [`LogSink`] that writes streamed chunks straight to this process's stdout,
155 + /// so the operator watches the build live.
156 + pub struct StdoutSink;
157 +
158 + #[async_trait::async_trait]
159 + impl LogSink for StdoutSink {
160 + async fn write_chunk(&mut self, bytes: &[u8]) {
161 + use tokio::io::AsyncWriteExt;
162 + let mut out = tokio::io::stdout();
163 + let _ = out.write_all(bytes).await;
164 + let _ = out.flush().await;
165 + }
166 + }
167 +
168 + #[cfg(test)]
169 + mod tests {
170 + use super::*;
171 + use ops_exec::{CapabilitySet, LocalExec};
172 +
173 + fn go_plan(repo: &str, dist: &str) -> ReleasePlan {
174 + ReleasePlan {
175 + app: "goingson".into(),
176 + version: "0.4.1".into(),
177 + repo_path: repo.into(),
178 + env_file: "~/.tauri/passwords.env".into(),
179 + dist_root: dist.into(),
180 + dmg_name: ReleasePlan::default_dmg_name("GoingsOn", "0.4.1"),
181 + }
182 + }
183 +
184 + #[test]
185 + fn checkout_step_pulls_and_checks_out_the_tag() {
186 + let s = go_plan("/repo", "/dist").checkout_step();
187 + assert_eq!(s.action, Action::Build);
188 + let script = s.argv.last().unwrap();
189 + assert!(script.contains("git checkout v0.4.1"), "{script}");
190 + assert_eq!(s.cwd.as_deref(), Some(std::path::Path::new("/repo")));
191 + }
192 +
193 + #[test]
194 + fn release_step_is_gated_as_sign_and_sources_secrets() {
195 + let s = go_plan("/repo", "/dist").release_step();
196 + assert_eq!(s.action, Action::Sign);
197 + let script = s.argv.last().unwrap();
198 + assert!(script.contains("passwords.env"));
199 + assert!(script.contains("release-macos.sh --keychain"));
200 + }
201 +
202 + #[test]
203 + fn verify_step_is_an_observe_action_on_the_dmg() {
204 + let s = go_plan("/repo", "/dist").verify_step();
205 + assert_eq!(s.action, Action::Observe(ObserveKind::Custom("gatekeeper".into())));
206 + let script = s.argv.last().unwrap();
207 + assert!(script.contains("spctl --assess"));
208 + assert!(script.contains("/dist/GoingsOn_0.4.1_aarch64.dmg"));
209 + }
210 +
211 + #[test]
212 + fn dmg_path_trims_trailing_slash() {
213 + let p = go_plan("/repo", "/dist/").dmg_remote_path();
214 + assert_eq!(p, "/dist/GoingsOn_0.4.1_aarch64.dmg");
215 + }
216 +
217 + #[test]
218 + fn gatekeeper_banner_parsing() {
219 + assert!(gatekeeper_accepted(
220 + "GoingsOn.dmg: accepted\nsource=Notarized Developer ID\norigin=Developer ID Application: ..."
221 + ));
222 + assert!(gatekeeper_accepted("X.dmg: accepted\nsource=Notarized Developer ID"));
223 + assert!(!gatekeeper_accepted("X.dmg: rejected\nsource=no usable signature"));
224 + assert!(!gatekeeper_accepted(""));
225 + }
226 +
227 + /// End-to-end against a LocalExec, proving the whole run loop (gating,
228 + /// sequencing, DMG production, gatekeeper parsing) without a Mac. `git` and
229 + /// `spctl` are PATH shims; the fake `dist/release-macos.sh` writes the DMG;
230 + /// the `spctl` shim prints the notarized banner. Serialized via a mutex
231 + /// because it mutates `PATH` (process-global).
232 + #[tokio::test]
233 + async fn run_release_drives_the_full_recipe_against_a_local_fake() {
234 + let _guard = PATH_LOCK.lock().await;
235 + let dir = tempfile::tempdir().unwrap();
236 + let repo = dir.path().join("repo");
237 + let dist = dir.path().join("dist");
238 + let bindir = dir.path().join("bin");
239 + tokio::fs::create_dir_all(repo.join("dist")).await.unwrap();
240 + tokio::fs::create_dir_all(&dist).await.unwrap();
241 + tokio::fs::create_dir_all(&bindir).await.unwrap();
242 +
243 + let dmg_name = ReleasePlan::default_dmg_name("GoingsOn", "0.4.1");
244 + let dmg = dist.join(&dmg_name);
245 +
246 + // Fake `git` (any subcommand succeeds) and `spctl` (prints notarized).
247 + write_shim(&bindir.join("git"), "#!/bin/sh\nexit 0\n").await;
248 + write_shim(
249 + &bindir.join("spctl"),
250 + "#!/bin/sh\necho 'accepted'\necho 'source=Notarized Developer ID'\n",
251 + )
252 + .await;
253 + // The fake release script writes the DMG into dist_root.
254 + write_shim(
255 + &repo.join("dist/release-macos.sh"),
256 + &format!("#!/bin/sh\nset -e\nprintf 'building %s\\n' \"$PWD\"\n: > '{}'\n", dmg.display()),
257 + )
258 + .await;
259 +
260 + let orig_path = std::env::var("PATH").unwrap_or_default();
261 + // SAFETY: serialized by PATH_LOCK; restored before the guard drops.
262 + unsafe { std::env::set_var("PATH", format!("{}:{}", bindir.display(), orig_path)); }
263 +
264 + let plan = ReleasePlan {
265 + app: "goingson".into(),
266 + version: "0.4.1".into(),
267 + repo_path: repo.to_string_lossy().into_owned(),
268 + env_file: "/dev/null".into(), // `. /dev/null` is a harmless no-op
269 + dist_root: dist.to_string_lossy().into_owned(),
270 + dmg_name,
271 + };
272 + let exec = LocalExec::new(CapabilitySet::from_tokens(
273 + ["build", "sign", "notarize", "staple"],
274 + ["gatekeeper"],
275 + ));
276 +
277 + let mut sink = Discard;
278 + let outcome = run_release(&exec, &plan, &mut sink).await.expect("recipe should run");
279 +
280 + // SAFETY: serialized by PATH_LOCK.
281 + unsafe { std::env::set_var("PATH", orig_path); }
282 +
283 + assert!(dmg.exists(), "release step should produce the DMG");
284 + assert!(outcome.gatekeeper_accepted, "fake spctl reports notarized");
285 + assert_eq!(outcome.dmg_remote, dist.join("GoingsOn_0.4.1_aarch64.dmg").to_string_lossy());
286 + }
287 +
288 + static PATH_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
289 +
290 + async fn write_shim(path: &std::path::Path, body: &str) {
291 + tokio::fs::write(path, body).await.unwrap();
292 + let out = tokio::process::Command::new("chmod").arg("+x").arg(path).output().await.unwrap();
293 + assert!(out.status.success());
294 + }
295 +
296 + struct Discard;
297 + #[async_trait::async_trait]
298 + impl LogSink for Discard {
299 + async fn write_chunk(&mut self, _b: &[u8]) {}
300 + }
301 + }
@@ -0,0 +1,141 @@
1 + //! `bento-release-macos` — drive one macOS release through the in-session
2 + //! `ops-agent` and pull the signed DMG back.
3 + //!
4 + //! Usage: `bento-release-macos --config driver.toml [--version 0.4.1]`
5 + //!
6 + //! This is the thin driver (launchplan §A decision (a)); the full bentod/TUI is
7 + //! deferred. It is transport-agnostic via `ops-exec` — in production it points
8 + //! at the Mac's `ops-agent` (`AgentRpc`), the only context where codesign can
9 + //! use the Developer ID key.
10 +
11 + use anyhow::{Context, Result};
12 + use bento_driver::{ReleasePlan, StdoutSink, run_release};
13 + use ops_exec::{AgentRpc, CapabilitySet, Executor};
14 + use serde::Deserialize;
15 + use std::path::PathBuf;
16 +
17 + #[derive(Debug, Deserialize)]
18 + struct DriverConfig {
19 + agent: AgentSection,
20 + plan: PlanSection,
21 + /// Where on THIS host (fw13) to pull the signed DMG.
22 + local: LocalSection,
23 + }
24 +
25 + #[derive(Debug, Deserialize)]
26 + struct AgentSection {
27 + /// e.g. `http://mbp.tailnet:8765`
28 + base_url: String,
29 + /// Audit label, e.g. `mbp`.
30 + host_label: String,
31 + }
32 +
33 + #[derive(Debug, Deserialize)]
34 + struct PlanSection {
35 + app: String,
36 + version: String,
37 + repo_path: String,
38 + env_file: String,
39 + dist_root: String,
40 + /// Optional; defaults to `<product>_<version>_aarch64.dmg`.
41 + dmg_name: Option<String>,
42 + /// Product name used for the default DMG name, e.g. `GoingsOn`.
43 + product_name: String,
44 + }
45 +
46 + #[derive(Debug, Deserialize)]
47 + struct LocalSection {
48 + dest_dir: PathBuf,
49 + }
50 +
51 + #[tokio::main]
52 + async fn main() -> Result<()> {
53 + tracing_subscriber::fmt()
54 + .with_env_filter(
55 + tracing_subscriber::EnvFilter::try_from_default_env()
56 + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
57 + )
58 + .init();
59 +
60 + let args = Args::parse()?;
61 + let raw = std::fs::read_to_string(&args.config)
62 + .with_context(|| format!("reading {}", args.config))?;
63 + let mut cfg: DriverConfig = toml::from_str(&raw).context("parsing driver config")?;
64 + if let Some(v) = args.version {
65 + cfg.plan.version = v;
66 + }
67 +
68 + let dmg_name = cfg.plan.dmg_name.clone().unwrap_or_else(|| {
69 + ReleasePlan::default_dmg_name(&cfg.plan.product_name, &cfg.plan.version)
70 + });
71 + let plan = ReleasePlan {
72 + app: cfg.plan.app.clone(),
73 + version: cfg.plan.version.clone(),
74 + repo_path: cfg.plan.repo_path.clone(),
75 + env_file: cfg.plan.env_file.clone(),
76 + dist_root: cfg.plan.dist_root.clone(),
77 + dmg_name: dmg_name.clone(),
78 + };
79 +
80 + // The driver's caller-side grant: it may build/sign/notarize/staple and
81 + // observe the gatekeeper verdict. The agent re-checks against its own grant.
82 + let exec = AgentRpc::new(
83 + cfg.agent.base_url.clone(),
84 + cfg.agent.host_label.clone(),
85 + CapabilitySet::from_tokens(["build", "sign", "notarize", "staple"], ["gatekeeper"]),
86 + );
87 +
88 + // Fail fast if the agent isn't reachable / in-session.
89 + let health = exec.health().await.context(
90 + "agent /health failed — is ops-agent running in the Aqua session on the build host?",
91 + )?;
92 + tracing::info!(actuate = ?health.actuate, "agent reachable");
93 +
94 + println!("==> releasing {} {} via {}", plan.app, plan.version, cfg.agent.host_label);
95 + let mut sink = StdoutSink;
96 + let outcome = run_release(&exec, &plan, &mut sink).await?;
97 + if !outcome.gatekeeper_accepted {
98 + anyhow::bail!(
99 + "Gatekeeper did NOT accept {} — not pulling. Check the notarization log.",
100 + outcome.dmg_remote
101 + );
102 + }
103 + println!("\n==> Gatekeeper accepted; pulling DMG");
104 +
105 + tokio::fs::create_dir_all(&cfg.local.dest_dir).await.ok();
106 + let local_dmg = cfg.local.dest_dir.join(&dmg_name);
107 + exec.pull(
108 + std::path::Path::new(&outcome.dmg_remote),
109 + &local_dmg,
110 + &Default::default(),
111 + )
112 + .await
113 + .context("pulling the signed DMG back")?;
114 +
115 + println!("==> done: {}", local_dmg.display());
116 + Ok(())
117 + }
118 +
119 + struct Args {
120 + config: String,
121 + version: Option<String>,
122 + }
123 +
124 + impl Args {
125 + fn parse() -> Result<Self> {
126 + let mut config = None;
127 + let mut version = None;
128 + let mut it = std::env::args().skip(1);
129 + while let Some(a) = it.next() {
130 + match a.as_str() {
131 + "--config" | "-c" => config = it.next(),
132 + "--version" | "-v" => version = it.next(),
133 + other => anyhow::bail!("unexpected arg `{other}` (usage: --config <toml> [--version <ver>])"),
134 + }
135 + }
136 + Ok(Self {
137 + config: config.context("--config <driver.toml> is required")?,
138 + version,
139 + })
140 + }
141 + }
@@ -33,6 +33,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
33 33 checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
34 34
35 35 [[package]]
36 + name = "async-trait"
37 + version = "0.1.89"
38 + source = "registry+https://github.com/rust-lang/crates.io-index"
39 + checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
40 + dependencies = [
41 + "proc-macro2",
42 + "quote",
43 + "syn",
44 + ]
45 +
46 + [[package]]
36 47 name = "atoi"
37 48 version = "2.0.0"
38 49 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1153,6 +1164,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
1153 1164 checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
1154 1165
1155 1166 [[package]]
1167 + name = "ops-exec"
1168 + version = "0.1.0"
1169 + dependencies = [
1170 + "anyhow",
1171 + "async-trait",
1172 + "serde",
1173 + "thiserror",
1174 + "tokio",
1175 + "tracing",
1176 + ]
1177 +
1178 + [[package]]
1156 1179 name = "parking"
1157 1180 version = "2.2.1"
1158 1181 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1617,11 +1640,13 @@ name = "sando-daemon"
1617 1640 version = "0.2.0"
1618 1641 dependencies = [
1619 1642 "anyhow",
1643 + "async-trait",
1620 1644 "axum",
1621 1645 "chrono",
1622 1646 "http-body-util",
1623 1647 "metrics",
1624 1648 "metrics-exporter-prometheus",
1649 + "ops-exec",
1625 1650 "reqwest",
1626 1651 "semver",
1627 1652 "serde",
@@ -11,6 +11,8 @@ path = "src/main.rs"
11 11 [dependencies]
12 12 axum = { version = "0.8.8", features = ["macros", "ws"] }
13 13 tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "net", "signal", "fs", "process"] }
14 + ops-exec = { path = "../../shared/ops-exec" }
15 + async-trait = "0.1.83"
14 16 serde = { version = "1.0.228", features = ["derive"] }
15 17 serde_json = "1"
16 18 toml = "0.8"
@@ -14,28 +14,56 @@
14 14 //! `<release_root>/current/<bin_name>` so reload-or-restart picks up the new
15 15 //! binary without ever pointing at a missing path.
16 16 //!
17 - //! For nodes with `ssh_target` set to anything other than `"local"`, deploy
18 - //! goes via rsync + ssh; the bootstrap (creating release_root, installing the
19 - //! service unit, granting sudo for systemctl) is out of scope here — it
20 - //! happens once per node, not per deploy.
17 + //! The host-side transport — `ssh` for shell steps, `rsync` for the release
18 + //! dir — comes from the shared [`ops_exec::Executor`] (a `LocalExec` for
19 + //! `ssh_target = "local"`, an `SshExec` otherwise), built once per node in
20 + //! [`crate::state`]. This module owns the *deploy choreography* (mkdir, push,
21 + //! atomic swap, restart, gc); the transport is the crate's. SSH push behavior
22 + //! is identical to the pre-extraction code — this is a transport extraction,
23 + //! not a model change.
21 24
22 25 use crate::topology::Node;
23 26 use anyhow::{Context, Result};
27 + use async_trait::async_trait;
28 + use ops_exec::{Action, Executor, LogSink, RunOutput, Step, SyncOpts, sh_quote};
24 29 use std::path::{Path, PathBuf};
25 30 use tokio::process::Command;
26 31
27 - /// SSH options used everywhere we shell out to ssh — fail fast, no prompts.
28 - const SSH_FLAGS: &[&str] = &[
29 - "-o", "BatchMode=yes",
30 - "-o", "ConnectTimeout=10",
31 - "-o", "StrictHostKeyChecking=accept-new",
32 - ];
33 -
34 32 /// Keep this many release dirs per node; older ones get gc'd after a
35 33 /// successful deploy. Fixed for now; promote to config if the constant ever
36 34 /// needs to vary by tier.
37 35 const RELEASES_TO_KEEP: usize = 5;
38 36
37 + /// A sink that drops streamed bytes. Deploy steps don't have a live-log handle
38 + /// (gates do), so output is discarded as it streams; [`RunOutput`] still
39 + /// captures the full stdout/stderr for error reporting, preserving the
40 + /// pre-extraction behavior of surfacing `stderr` in failure messages.
41 + struct DiscardSink;
42 +
43 + #[async_trait]
44 + impl LogSink for DiscardSink {
45 + async fn write_chunk(&mut self, _bytes: &[u8]) {}
46 + }
47 +
48 + /// Run a shell step through `executor`, treating a non-zero exit as an error
49 + /// whose message carries the captured stderr — exactly as the old bespoke
50 + /// `ssh()` helper did (`ssh <target> failed: <stderr>`).
51 + async fn run_checked(executor: &dyn Executor, script: &str, what: &str) -> Result<RunOutput> {
52 + let step = Step::shell(Action::Deploy, script);
53 + let mut sink = DiscardSink;
54 + let out = executor
55 + .run_streaming(&step, &mut sink)
56 + .await
57 + .with_context(|| format!("{what}: spawning command"))?;
58 + anyhow::ensure!(
59 + out.status.success(),
60 + "{what} failed (exit {}): {}",
61 + out.status.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into()),
62 + String::from_utf8_lossy(&out.stderr),
63 + );
64 + Ok(out)
65 + }
66 +
39 67 pub async fn deploy_local(
40 68 release_root: &Path,
41 69 version: &crate::domain::Version,
@@ -72,12 +100,14 @@ pub async fn deploy_local(
72 100 }
73 101
74 102 /// Deploy `staged_release_dir` (a directory built on the Sando host by
75 - /// `deploy_local`) to `node`. For `ssh_target=local`, this is just symlink
76 - /// swap + restart; for remote nodes, we rsync the whole dir.
103 + /// `deploy_local`) to `node` using `executor` (its transport from the topology
104 + /// executor map). For `ssh_target=local`, this is just a symlink swap; for
105 + /// remote nodes, we rsync the whole dir over the executor.
77 106 ///
78 107 /// `primary_bin` is only used for logging — every file present in the staged
79 108 /// dir gets shipped.
80 109 pub async fn deploy_node(
110 + executor: &dyn Executor,
81 111 node: &Node,
82 112 version: &str,
83 113 staged_release_dir: &Path,
@@ -86,78 +116,63 @@ pub async fn deploy_node(
86 116 if node.ssh_target == "local" || node.ssh_target.is_empty() {
87 117 // Local deploy already happened when we staged on the Sando host.
88 118 // Just re-point `current` at the staged dir.
89 - return reset_local_current(Path::new(&node.release_root), version).await;
119 + return reset_local_current(executor, Path::new(&node.release_root), version).await;
90 120 }
91 - deploy_remote(node, version, staged_release_dir, primary_bin).await
121 + deploy_remote(executor, node, version, staged_release_dir, primary_bin).await
92 122 }
93 123
94 - async fn reset_local_current(release_root: &Path, version: &str) -> Result<PathBuf> {
124 + async fn reset_local_current(
125 + executor: &dyn Executor,
126 + release_root: &Path,
127 + version: &str,
128 + ) -> Result<PathBuf> {
95 129 let current = release_root.join("current");
96 130 let target = format!("releases/{version}");
97 - let out = Command::new("ln")
98 - .args(["-sfn", &target])
99 - .arg(&current)
100 - .output()
101 - .await?;
102 - anyhow::ensure!(
103 - out.status.success(),
104 - "symlink swap failed: {}",
105 - String::from_utf8_lossy(&out.stderr),
106 - );
131 + run_checked(
132 + executor,
133 + &format!("ln -sfn {} {}", sh_quote(&target), sh_quote(&current.to_string_lossy())),
134 + "local symlink swap",
135 + )
136 + .await?;
107 137 Ok(release_root.join("releases").join(version))
108 138 }
109 139
110 140 async fn deploy_remote(
141 + executor: &dyn Executor,
111 142 node: &Node,
112 143 version: &str,
113 144 staged_release_dir: &Path,
114 145 primary_bin: &str,
115 146 ) -> Result<PathBuf> {
116 147 let release_root = &node.release_root;
117 - let ssh_target = &node.ssh_target;
118 148 let service = &node.service_name;
119 149 let release_dir = format!("{release_root}/releases/{version}");
120 150
121 151 tracing::info!(node = %node.name, version, "deploy: mkdir release dir");
122 - ssh(ssh_target, &format!("set -e; mkdir -p {q}", q = sh_quote(&release_dir)))
123 - .await
124 - .context("creating remote release dir")?;
152 + run_checked(
153 + executor,
154 + &format!("set -e; mkdir -p {q}", q = sh_quote(&release_dir)),
155 + "creating remote release dir",
156 + )
157 + .await?;
125 158
126 159 tracing::info!(node = %node.name, version, primary = %primary_bin, "deploy: rsync release dir");
127 160 // Rsync the whole staged dir (binaries + every release_contents entry).
128 - // Trailing slash on source = contents of dir, not the dir itself.
129 - //
130 - // --delete: removed assets across versions don't accumulate on the
131 - // target. Bundle stays self-contained per version.
132 - // --chmod: `F+X` preserves execute bit per-file (binaries land 0755,
133 - // data files 0644) instead of the old blanket-0755 that was wrong for
134 - // static assets + docs.
135 - let rsync_src = format!("{}/", staged_release_dir.display());
136 - let rsync_dest = format!("{ssh_target}:{release_dir}/");
137 - let mut rsync = Command::new("rsync");
138 - rsync
139 - .arg("-az")
140 - .arg("--partial")
141 - .arg("--delete")
142 - .arg("--chmod=Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X")
143 - .arg("-e")
144 - .arg(format!(
145 - "ssh {}",
146 - SSH_FLAGS.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(" ")
147 - ))
148 - .arg(&rsync_src)
149 - .arg(&rsync_dest);
150 - let out = rsync.output().await.context("spawning rsync")?;
151 - anyhow::ensure!(
152 - out.status.success(),
153 - "rsync failed (current symlink left intact): {}",
154 - String::from_utf8_lossy(&out.stderr),
155 - );
161 + // `SyncOpts::release_mirror()` is the exact pre-extraction rsync flag set:
162 + // -az --partial --delete --chmod=Du=rwx,Dgo=rx,Fu=rw,Fgo=r,F+X.
163 + // --delete: removed assets across versions don't accumulate on the
164 + // target; the bundle stays self-contained per version.
165 + // --chmod: F+X preserves the execute bit per-file (binaries land 0755,
166 + // data files 0644) instead of a blanket 0755.
167 + executor
168 + .push(staged_release_dir, Path::new(&release_dir), &SyncOpts::release_mirror())
169 + .await
170 + .context("rsync failed (current symlink left intact)")?;
156 171
157 172 tracing::info!(node = %node.name, version, "deploy: symlink swap + service reload");
158 - // Symlink swap is atomic via `mv -T` of a freshly-created symlink over
159 - // the old one (the rename(2) is the atomic step; `ln -sfn` does
160 - // unlink+symlink which has a window).
173 + // Symlink swap is atomic via `mv -T` of a freshly-created symlink over the
174 + // old one (the rename(2) is the atomic step; `ln -sfn` does unlink+symlink
175 + // which has a window).
161 176 let swap_and_restart = format!(
162 177 "set -e; \
163 178 cd {root}; \
@@ -168,29 +183,15 @@ async fn deploy_remote(
168 183 ver = sh_quote(version),
169 184 svc = sh_quote(service),
170 185 );
171 - ssh(ssh_target, &swap_and_restart)
172 - .await
173 - .context("symlink swap + systemctl reload-or-restart")?;
186 + run_checked(executor, &swap_and_restart, "symlink swap + systemctl reload-or-restart").await?;
174 187
175 - if let Err(e) = gc_remote_releases(ssh_target, release_root).await {
188 + if let Err(e) = gc_remote_releases(executor, release_root).await {
176 189 tracing::warn!(error = %e, "remote release GC failed (non-fatal)");
177 190 }
178 191
179 192 Ok(PathBuf::from(release_root).join("releases").join(version))
180 193 }
181 194
182 - async fn ssh(target: &str, script: &str) -> Result<()> {
183 - let mut cmd = Command::new("ssh");
184 - cmd.args(SSH_FLAGS).arg(target).arg(script);
185 - let out = cmd.output().await.context("spawning ssh")?;
186 - anyhow::ensure!(
187 - out.status.success(),
188 - "ssh {target} failed: {}",
189 - String::from_utf8_lossy(&out.stderr),
190 - );
191 - Ok(())
192 - }
193 -
194 195 async fn gc_local_releases(release_root: &Path) -> Result<()> {
195 196 let releases = release_root.join("releases");
196 197 if !releases.exists() {
@@ -216,7 +217,7 @@ async fn gc_local_releases(release_root: &Path) -> Result<()> {
216 217 Ok(())
217 218 }
218 219
219 - async fn gc_remote_releases(ssh_target: &str, release_root: &str) -> Result<()> {
220 + async fn gc_remote_releases(executor: &dyn Executor, release_root: &str) -> Result<()> {
220 221 // `ls -t` orders by mtime desc. Skip the first N, rm the rest. `xargs -r`
221 222 // is a no-op when stdin is empty (avoids `rm` complaining).
222 223 let script = format!(
@@ -225,32 +226,18 @@ async fn gc_remote_releases(ssh_target: &str, release_root: &str) -> Result<()>
225 226 root = sh_quote(release_root),
226 227 keep_plus_one = RELEASES_TO_KEEP + 1,
227 228 );
228 - ssh(ssh_target, &script).await
229 - }
230 -
231 - /// Single-quote a string for safe inclusion in a /bin/sh command, escaping
232 - /// any single quote inside. Not bulletproof for adversarial input, but every
233 - /// path here comes from our own config files.
234 - fn sh_quote(s: &str) -> String {
235 - let escaped = s.replace('\'', r"'\''");
236 - format!("'{escaped}'")
229 + run_checked(executor, &script, "remote release gc").await.map(|_| ())
237 230 }
238 231
239 232 #[cfg(test)]
240 233 mod tests {
241 234 use super::*;
235 + use ops_exec::{CapabilitySet, LocalExec, SshExec};
242 236 use std::time::SystemTime;
243 237
244 - #[test]
245 - fn sh_quote_no_quote() {
246 - assert_eq!(sh_quote("hello"), "'hello'");
247 - assert_eq!(sh_quote("/opt/mnw/releases/0.8.12"), "'/opt/mnw/releases/0.8.12'");
248 - }
249 -
250 - #[test]
251 - fn sh_quote_with_quote() {
252 - // The string `it's` becomes `'it'\''s'` — close, escape, open.
253 - assert_eq!(sh_quote("it's"), r"'it'\''s'");
238 + /// A LocalExec granted the default node capabilities (deploy + restart).
239 + fn local_executor() -> LocalExec {
240 + LocalExec::new(CapabilitySet::from_tokens(["deploy", "restart"], ["health"]))
254 241 }
255 242
256 243 #[tokio::test]
@@ -258,7 +245,6 @@ mod tests {
258 245 let tmp = tempfile::tempdir().unwrap();
259 246 let root = tmp.path();
260 247
261 - // Source binaries (worktree's target/release/)
262 248 let src_dir = root.join("src");
263 249 tokio::fs::create_dir_all(&src_dir).await.unwrap();
264 250 let primary = src_dir.join("makenotwork");
@@ -266,7 +252,6 @@ mod tests {
266 252 tokio::fs::write(&primary, b"PRIMARY").await.unwrap();
267 253 tokio::fs::write(&admin, b"ADMIN").await.unwrap();
268 254
269 - // Release root (where staged versions live)
270 255 let release_root = root.join("releases-root");
271 256 tokio::fs::create_dir_all(&release_root).await.unwrap();
272 257
@@ -282,11 +267,9 @@ mod tests {
282 267 assert_eq!(tokio::fs::read(staged.join("makenotwork")).await.unwrap(), b"PRIMARY");
283 268 assert_eq!(tokio::fs::read(staged.join("mnw-admin")).await.unwrap(), b"ADMIN");
284 269
285 - // current symlink should resolve to staged
286 270 let current = release_root.join("current");
287 271 let target = tokio::fs::read_link(&current).await.unwrap();
288 272 assert_eq!(target.to_string_lossy(), "releases/0.8.12");
289 - // And reading through `current/` should give the new content.
290 273 let via_current = tokio::fs::read(current.join("makenotwork")).await.unwrap();
291 274 assert_eq!(via_current, b"PRIMARY");
292 275 }
@@ -304,14 +287,11 @@ mod tests {
304 287 tokio::fs::create_dir_all(&release_root).await.unwrap();
305 288
306 289 deploy_local(&release_root, &"0.1.0".parse().unwrap(), &[bin.clone()]).await.unwrap();
307 - // Rewrite source then deploy 0.2.0.
308 290 tokio::fs::write(&bin, b"V2").await.unwrap();
309 291 deploy_local(&release_root, &"0.2.0".parse().unwrap(), &[bin.clone()]).await.unwrap();
310 292
311 - // Both versions present on disk.
312 293 assert!(release_root.join("releases/0.1.0/server").exists());
313 294 assert!(release_root.join("releases/0.2.0/server").exists());
314 - // current points at the new one.
315 295 let target = tokio::fs::read_link(release_root.join("current")).await.unwrap();
316 296 assert_eq!(target.to_string_lossy(), "releases/0.2.0");
317 297 let via_current = tokio::fs::read(release_root.join("current/server")).await.unwrap();
@@ -320,8 +300,6 @@ mod tests {
320 300
321 301 #[tokio::test]
322 302 async fn gc_local_releases_keeps_last_n_by_mtime() {
323 - // Build > RELEASES_TO_KEEP fake release dirs with distinct mtimes,
324 - // then run gc and check which survived.
325 303 let tmp = tempfile::tempdir().unwrap();
326 304 let root = tmp.path();
327 305 let releases = root.join("releases");
@@ -333,10 +311,6 @@ mod tests {
333 311 let name = format!("v{i:02}");
334 312 let dir = releases.join(&name);
335 313 tokio::fs::create_dir(&dir).await.unwrap();
336 - // Stagger mtimes deterministically. tokio's File doesn't expose
337 - // set_times, so reach for std::fs::File + std::fs::FileTimes
338 - // (stable since 1.75). Synchronous is fine here — this is test
339 - // setup, not the hot path.
340 314 let f = std::fs::File::open(&dir).unwrap();
341 315 let when = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000 + i as u64);
342 316 let times = std::fs::FileTimes::new().set_modified(when);
@@ -346,23 +320,16 @@ mod tests {
346 320
347 321 gc_local_releases(root).await.unwrap();
348 322
349 - // The last RELEASES_TO_KEEP by mtime (i.e. highest i) survive.
350 323 let surviving_expected: Vec<_> = names
351 324 .iter()
352 325 .skip(total - RELEASES_TO_KEEP)
353 326 .cloned()
354 327 .collect();
355 328 for name in &surviving_expected {
356 - assert!(
357 - releases.join(name).exists(),
358 - "expected to survive: {name}"
359 - );
329 + assert!(releases.join(name).exists(), "expected to survive: {name}");
360 330 }
361 331 for name in names.iter().take(total - RELEASES_TO_KEEP) {
362 - assert!(
363 - !releases.join(name).exists(),
364 - "expected to be pruned: {name}"
365 - );
332 + assert!(!releases.join(name).exists(), "expected to be pruned: {name}");
366 333 }
367 334 }
368 335
@@ -401,24 +368,32 @@ mod tests {
401 368 ssh_target: "deploy@192.0.2.1".into(),
402 369 release_root: "/opt/never".into(),
403 370 service_name: "makenotwork.service".into(),
371 + actuate: crate::topology::default_actuate(),
372 + observe: crate::topology::default_observe(),
404 373 };
374 + let executor = SshExec::new(
375 + node.ssh_target.clone(),
376 + CapabilitySet::from_tokens(["deploy", "restart"], ["health"]),
377 + );
405 378
406 - let result = deploy_node(&node, "0.0.1", &staged, "server").await;
379 + let result = deploy_node(&executor, &node, "0.0.1", &staged, "server").await;
407 380 let err = result.expect_err("deploy to unreachable host should fail");
408 381 let msg = format!("{err:#}");
409 - // The ssh helper returns `ssh <target> failed: ...`. Don't pin the
410 - // exact wording, just that the failure is attributed and that no
411 - // panic / hang happened.
382 + // Don't pin exact wording, just that the failure is attributed (ssh /
383 + // rsync / connection) and that no panic / hang happened.
412 384 assert!(
413 - msg.contains("ssh") || msg.contains("rsync") || msg.contains("connection"),
385 + msg.contains("ssh")
386 + || msg.contains("rsync")
387 + || msg.contains("connection")
388 + || msg.contains("Connection"),
414 389 "unexpected error: {msg}"
415 390 );
416 391 }
417 392
418 393 #[tokio::test]
419 394 async fn deploy_node_with_local_ssh_target_swaps_symlink() {
420 - // ssh_target="local" should route to the local fast-path: just a
421 - // symlink swap, no remote calls. Helpful for dev loops.
395 + // ssh_target="local" routes to the local fast-path: just a symlink
396 + // swap, no remote calls.
422 397 let tmp = tempfile::tempdir().unwrap();
423 398 let release_root = tmp.path().to_path_buf();
424 399 let staged = release_root.join("releases").join("0.0.1");
@@ -430,11 +405,36 @@ mod tests {
430 405 ssh_target: "local".into(),
431 406 release_root: release_root.to_string_lossy().into_owned(),
432 407 service_name: "makenotwork.service".into(),
408 + actuate: crate::topology::default_actuate(),
409 + observe: crate::topology::default_observe(),
433 410 };
411 + let executor = local_executor();
434 412
435 - let out = deploy_node(&node, "0.0.1", &staged, "server").await.unwrap();
413 + let out = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap();
436 414 assert_eq!(out, staged);
437 415 let target = tokio::fs::read_link(release_root.join("current")).await.unwrap();
438 416 assert_eq!(target.to_string_lossy(), "releases/0.0.1");
439 417 }
418 +
419 + #[tokio::test]
420 + async fn deploy_node_denied_when_executor_lacks_deploy_grant() {
421 + // Defense in depth: an executor without the deploy grant refuses the
422 + // step before any filesystem / ssh action.
423 + let tmp = tempfile::tempdir().unwrap();
424 + let release_root = tmp.path().to_path_buf();
425 + let staged = release_root.join("releases").join("0.0.1");
426 + tokio::fs::create_dir_all(&staged).await.unwrap();
427 +
428 + let node = crate::topology::Node {
429 + name: "local-dev".into(),
430 + ssh_target: "local".into(),
431 + release_root: release_root.to_string_lossy().into_owned(),
432 + service_name: "makenotwork.service".into(),
433 + actuate: vec!["restart".into()], // no deploy
434 + observe: vec![],
435 + };
436 + let executor = LocalExec::new(CapabilitySet::from_tokens(["restart"], Vec::<&str>::new()));
437 + let err = deploy_node(&executor, &node, "0.0.1", &staged, "server").await.unwrap_err();
438 + assert!(format!("{err:#}").contains("capability denied"), "expected capability denial");
439 + }
440 440 }
@@ -73,6 +73,18 @@ impl LiveLog {
73 73 pub fn chunks_emitted(&self) -> u32 { self.seq }
74 74 }
75 75
76 + /// Lets a Sando `LiveLog` be used directly as an [`ops_exec::LogSink`], so a
77 + /// gate or deploy run through `executor.run_streaming` streams into the same
78 + /// on-disk file + `GateLogChunk` broadcast the bespoke runner used. Delegates
79 + /// to the inherent [`LiveLog::write_chunk`] (inherent methods take precedence,
80 + /// so this is not recursive).
81 + #[async_trait::async_trait]
82 + impl ops_exec::LogSink for LiveLog {
83 + async fn write_chunk(&mut self, bytes: &[u8]) {
84 + self.write_chunk(bytes).await;
85 + }
86 + }
87 +
76 88 async fn open_for_append(path: &Path) -> Option<File> {
77 89 if let Some(parent) = path.parent() {
78 90 if let Err(e) = tokio::fs::create_dir_all(parent).await {
@@ -32,6 +32,7 @@ async fn main() -> Result<()> {
32 32
33 33 let prom = metrics::init();
34 34 let addr: SocketAddr = cfg.listen.parse()?;
35 + let executors = Arc::new(state::build_executors(&topo));
35 36 let app_state = state::AppState {
36 37 pool,
37 38 topo,
@@ -39,6 +40,7 @@ async fn main() -> Result<()> {
39 40 prom,
40 41 active_build: Arc::new(tokio::sync::Mutex::new(None)),
41 42 events: events::channel(),
43 + executors,
42 44 };
43 45 let app = routes::router(app_state);
44 46 tracing::info!(%addr, "sando daemon listening");
@@ -219,7 +219,9 @@ async fn promote(
219 219 crate::events::emit(&s.events, crate::events::Event::DeployStart {
220 220 tier: target.name.clone(), node: node.name.clone(), version: version.clone(),
221 221 });
222 - let result = crate::deploy::deploy_node(node, &version_str, &staged_dir, s.cfg.primary_bin()).await;
222 + let executor = s.executors.get(&node.name).cloned()
223 + .unwrap_or_else(|| crate::state::build_executor(node));
224 + let result = crate::deploy::deploy_node(executor.as_ref(), node, &version_str, &staged_dir, s.cfg.primary_bin()).await;
223 225 let finished = chrono::Utc::now().to_rfc3339();
224 226 let (outcome_obj, err_for_propagation) = match result {
225 227 Ok(_) => (crate::outcome::DeployOutcome::ok(), None),
@@ -374,7 +376,9 @@ async fn rollback(
374 376 .to_path_buf();
375 377
376 378 for node in &target.nodes {
377 - crate::deploy::deploy_node(node, &previous_str, &staged_dir, s.cfg.primary_bin())
379 + let executor = s.executors.get(&node.name).cloned()
380 + .unwrap_or_else(|| crate::state::build_executor(node));
381 + crate::deploy::deploy_node(executor.as_ref(), node, &previous_str, &staged_dir, s.cfg.primary_bin())
378 382 .await
379 383 .map_err(crate::error::Error::Other)?;
380 384 }
@@ -630,6 +634,8 @@ mod tests {
630 634 ssh_target: "local".into(),
631 635 release_root: "/tmp/a-node".into(),
632 636 service_name: "makenotwork.service".into(),
637 + actuate: crate::topology::default_actuate(),
638 + observe: crate::topology::default_observe(),
633 639 }],
634 640 },
635 641 ],
@@ -664,13 +670,16 @@ mod tests {
664 670 // Don't call install_recorder in tests — it touches a process-global
665 671 // and conflicts when tests run in parallel.
666 672 let prom = PrometheusBuilder::new().build_recorder().handle();
673 + let topo = test_topo();
674 + let executors = Arc::new(crate::state::build_executors(&topo));
667 675 AppState {
668 676 pool,
669 - topo: Arc::new(test_topo()),
677 + topo: Arc::new(topo),
670 678 cfg: Arc::new(test_cfg()),
671 679 prom,
672 680 active_build: Arc::new(tokio::sync::Mutex::new(None)),
673 681 events: crate::events::channel(),
682 + executors,
674 683 }
675 684 }
676 685
@@ -1,12 +1,20 @@
1 1 use crate::config::Config;
2 + use crate::domain::NodeId;
2 3 use crate::events::EventTx;
3 - use crate::topology::Topology;
4 + use crate::topology::{Node, Topology};
4 5 use metrics_exporter_prometheus::PrometheusHandle;
6 + use ops_exec::{CapabilitySet, Executor, LocalExec, SshExec};
5 7 use sqlx::SqlitePool;
8 + use std::collections::HashMap;
6 9 use std::sync::Arc;
7 10 use tokio::sync::Mutex;
8 11 use tokio::task::AbortHandle;
9 12
13 + /// Per-node executors keyed by node id, built once from the topology at
14 + /// startup. The deploy path looks a node's executor up here instead of
15 + /// constructing ssh/rsync invocations inline.
16 + pub type ExecutorMap = HashMap<NodeId, Arc<dyn Executor>>;
17 +
10 18 #[derive(Clone)]
11 19 pub struct AppState {
12 20 pub pool: SqlitePool,
@@ -19,4 +27,27 @@ pub struct AppState {
19 27 /// Broadcast bus for live operator events. WS /events subscribes; all
20 28 /// build/gate/deploy code sites emit on this.
21 29 pub events: EventTx,
30 + /// One capability-scoped [`Executor`] per node, built from the topology.
31 + pub executors: Arc<ExecutorMap>,
32 + }
33 +
34 + /// Build the executor for one node: a `LocalExec` for the `local` fast-path, an
35 + /// `SshExec` otherwise, each granted the node's declared capabilities (which
36 + /// default to deploy+restart / observe health — the historical behavior).
37 + pub fn build_executor(node: &Node) -> Arc<dyn Executor> {
38 + let caps = CapabilitySet::from_tokens(&node.actuate, &node.observe);
39 + if node.ssh_target == "local" || node.ssh_target.is_empty() {
40 + Arc::new(LocalExec::new(caps))
41 + } else {
42 + Arc::new(SshExec::new(node.ssh_target.clone(), caps))
43 + }
44 + }
45 +
46 + /// Build the full node → executor map from every tier's nodes.
47 + pub fn build_executors(topo: &Topology) -> ExecutorMap {
48 + topo.tiers
49 + .iter()
50 + .flat_map(|t| t.nodes.iter())
51 + .map(|node| (node.name.clone(), build_executor(node)))
52 + .collect()
22 53 }
@@ -152,6 +152,8 @@ mod tests {
152 152 ssh_target: format!("deploy@{name}"),
153 153 release_root: "/opt/mnw".into(),
154 154 service_name: "makenotwork.service".into(),
155 + actuate: crate::topology::default_actuate(),
156 + observe: crate::topology::default_observe(),
155 157 }
156 158 }
157 159
@@ -44,10 +44,23 @@ pub struct Node {
44 44 /// Defaults to "makenotwork.service" because that's MNW's prod unit.
45 45 #[serde(default = "default_service_name")]
46 46 pub service_name: String,
47 + /// Capability grant for this node's executor (see `ops_exec`). Defaults to
48 + /// the current behavior of every Sando node — actuate deploy+restart,
49 + /// observe health — so an existing `sando.toml` keeps working unedited.
50 + #[serde(default = "default_actuate")]
51 + pub actuate: Vec<String>,
52 + #[serde(default = "default_observe")]
53 + pub observe: Vec<String>,
47 54 }
48 55
49 56 fn default_service_name() -> String { "makenotwork.service".into() }
50 57
58 + /// The capability set every pre-existing Sando node implicitly had: it deploys
59 + /// and restarts, and is health-observed. Keeping these as the defaults is what
60 + /// lets `sando.toml` stay unchanged through the executor refactor.
61 + pub fn default_actuate() -> Vec<String> { vec!["deploy".into(), "restart".into()] }
62 + pub fn default_observe() -> Vec<String> { vec!["health".into()] }
63 +
51 64 #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
52 65 #[serde(rename_all = "snake_case")]
53 66 pub enum CanaryPolicy {
@@ -24,6 +24,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
24 24 checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
25 25
26 26 [[package]]
27 + name = "async-trait"
28 + version = "0.1.89"
29 + source = "registry+https://github.com/rust-lang/crates.io-index"
30 + checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
31 + dependencies = [
32 + "proc-macro2",
33 + "quote",
34 + "syn",
35 + ]
36 +
37 + [[package]]
27 38 name = "atoi"
28 39 version = "2.0.0"
29 40 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -805,7 +816,9 @@ name = "ops-core"
805 816 version = "0.1.0"
806 817 dependencies = [
807 818 "anyhow",
819 + "async-trait",
808 820 "chrono",
821 + "ops-exec",
809 822 "serde",
810 823 "serde_json",
811 824 "sqlx",
@@ -815,6 +828,18 @@ dependencies = [
815 828 ]
816 829
817 830 [[package]]
831 + name = "ops-exec"
832 + version = "0.1.0"
833 + dependencies = [
834 + "anyhow",
835 + "async-trait",
836 + "serde",
837 + "thiserror",
838 + "tokio",
839 + "tracing",
840 + ]
841 +
842 + [[package]]
818 843 name = "parking"
819 844 version = "2.2.1"
820 845 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6,10 +6,12 @@ license = "MIT"
6 6 description = "Shared operator-tool infrastructure: streaming remote-exec, a generic event bus, a disk+broadcast live-log sink, and sqlite helpers. Used by sando-daemon and bento-daemon."
7 7
8 8 [dependencies]
9 + ops-exec = { path = "../ops-exec" }
9 10 tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "io-util", "fs", "process", "sync"] }
10 11 serde = { version = "1.0.228", features = ["derive"] }
11 12 chrono = { version = "0.4", features = ["serde"] }
12 13 anyhow = "1.0.102"
14 + async-trait = "0.1.83"
13 15 tracing = "0.1.44"
14 16 sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite"] }
15 17
@@ -6,7 +6,10 @@
6 6 //! recipes, and route handlers; only the transport-and-plumbing layer lives
7 7 //! here so there is exactly one copy.
8 8 //!
9 - //! - [`remote`] — streaming SSH (or local) command exec into a [`remote::LogSink`].
9 + //! - [`remote`] — re-exported from the `ops-exec` crate: the streaming SSH (or
10 + //! local) command primitive ([`remote::RemoteHost`]) and the
11 + //! [`remote::LogSink`] sink trait. The capability-gated `Executor` trait and
12 + //! its transports live in `ops-exec` proper (re-exported here as [`ops_exec`]).
10 13 //! - [`eventbus`] — a generic `EventEnvelope<E>` broadcast bus with flat-`kind`
11 14 //! serialization (each tool supplies its own concrete `Event` enum as `E`).
12 15 //! - [`live_log`] — a disk-append + callback live-log sink that implements
@@ -16,5 +19,11 @@
16 19
17 20 pub mod eventbus;
18 21 pub mod live_log;
19 - pub mod remote;
20 22 pub mod sqlite;
23 +
24 + /// The trusted executor crate, re-exported so consumers can depend on
25 + /// `ops-core` alone and get the `Executor` trait + transports transitively.
26 + pub use ops_exec;
27 + /// The low-level streaming primitive + `LogSink`, kept at the historical
28 + /// `ops_core::remote` path so existing consumers compile unchanged.
29 + pub use ops_exec::remote;
@@ -15,6 +15,7 @@
15 15 //! exact byte stream for that.
16 16
17 17 use crate::remote::LogSink;
18 + use async_trait::async_trait;
18 19 use std::path::{Path, PathBuf};
19 20 use tokio::fs::File;
20 21 use tokio::io::AsyncWriteExt;
@@ -57,6 +58,7 @@ impl LiveLog {
57 58 }
58 59 }
59 60
61 + #[async_trait]
60 62 impl LogSink for LiveLog {
61 63 /// Append `bytes` to the on-disk log and invoke the callback. The callback
62 64 /// fires even if the disk write fails — operators watching live still see
@@ -1,216 +0,0 @@
1 - //! Streaming command execution — local or over SSH.
2 - //!
3 - //! The single most valuable primitive both tools share. Sando's `deploy.rs`
4 - //! shells out to `ssh`/`rsync` with buffered `.output()`; an app-build
5 - //! orchestrator instead needs the merged stdout+stderr streamed live (a
6 - //! `cargo tauri build` writes progress to stderr for minutes), so this is a
7 - //! proper streaming runner rather than a buffered one.
8 - //!
9 - //! A [`RemoteHost`] is either the local machine (`ssh_target == "local"` or
10 - //! empty) or a tailnet host reached over SSH. [`RemoteHost::run_streaming`]
11 - //! spawns the command, drains both pipes concurrently into a shared
12 - //! [`LogSink`], and returns the exit status plus the full captured bytes so a
13 - //! caller can still post-process the whole output (classify, grep a success
14 - //! banner, etc.).
15 -
16 - use anyhow::{Context, Result};
17 - use std::process::{ExitStatus, Stdio};
18 - use std::sync::Arc;
19 - use tokio::io::{AsyncRead, AsyncReadExt};
20 - use tokio::process::Command;
21 - use tokio::sync::Mutex;
22 -
23 - /// SSH options used everywhere we shell out to ssh — fail fast, no prompts.
24 - /// Matches Sando's `deploy.rs` so behavior is identical across both tools.
25 - pub const SSH_FLAGS: &[&str] = &[
26 - "-o",
27 - "BatchMode=yes",
28 - "-o",
29 - "ConnectTimeout=10",
30 - "-o",
31 - "StrictHostKeyChecking=accept-new",
32 - ];
33 -
34 - /// A sink for streamed command output. Each chunk reflects a tokio read
35 - /// boundary — chunks are NOT line-aligned; consumers that want lines must
36 - /// reassemble. [`crate::live_log::LiveLog`] is the canonical implementation
37 - /// (append-to-disk + broadcast).
38 - ///
39 - /// `async fn` in trait via RPITIT (stable on the 2024 edition). The `Send`
40 - /// bound on the returned future lets callers drive a sink from a spawned task.
41 - pub trait LogSink: Send {
42 - fn write_chunk(&mut self, bytes: &[u8]) -> impl std::future::Future<Output = ()> + Send;
43 - }
44 -
45 - /// A build host: the local machine or an SSH target (a tailnet alias such as
46 - /// `mbp`, or `user@host`).
47 - #[derive(Debug, Clone)]
48 - pub struct RemoteHost {
49 - ssh_target: String,
50 - }
51 -
52 - /// Result of a streamed run: the exit status plus the full captured stdout and
53 - /// stderr byte streams (already forwarded to the sink as they arrived).
54 - pub struct RunOutput {
55 - pub status: ExitStatus,
56 - pub stdout: Vec<u8>,
57 - pub stderr: Vec<u8>,
58 - }
59 -
60 - impl RunOutput {
61 - pub fn success(&self) -> bool {
62 - self.status.success()
63 - }
64 - }
65 -
66 - impl RemoteHost {
67 - /// `"local"` (or empty) runs commands directly via `sh -c`; anything else
68 - /// is an SSH target.
69 - pub fn new(ssh_target: impl Into<String>) -> Self {
70 - Self { ssh_target: ssh_target.into() }
71 - }
72 -
73 - pub fn is_local(&self) -> bool {
74 - self.ssh_target == "local" || self.ssh_target.is_empty()
75 - }
76 -
77 - pub fn ssh_target(&self) -> &str {
78 - &self.ssh_target
79 - }
80 -
81 - /// Build the `Command` that runs `script` as a single `/bin/sh` program,
82 - /// locally or over SSH. The remote side runs `script` as the argument to
83 - /// the login shell (ssh joins argv with spaces and hands it to the shell),
84 - /// so multi-statement scripts and pipes work the same as locally.
85 - fn command(&self, script: &str) -> Command {
86 - if self.is_local() {
87 - let mut cmd = Command::new("sh");
88 - cmd.arg("-c").arg(script);
89 - cmd
90 - } else {
91 - let mut cmd = Command::new("ssh");
92 - cmd.args(SSH_FLAGS).arg(&self.ssh_target).arg(script);
93 - cmd
94 - }
95 - }
96 -
97 - /// Spawn `script`, stream its merged output into `sink`, and return the
98 - /// exit status plus captured bytes. `kill_on_drop` means cancelling the
99 - /// caller's task (e.g. a newer build superseding this one) SIGKILLs the
100 - /// child and — for SSH — drops the connection.
101 - pub async fn run_streaming<S>(&self, script: &str, sink: Arc<Mutex<S>>) -> Result<RunOutput>
102 - where
103 - S: LogSink + Send + 'static,
104 - {
105 - let mut child = self
106 - .command(script)
107 - .stdout(Stdio::piped())
108 - .stderr(Stdio::piped())
109 - .kill_on_drop(true)
110 - .spawn()
111 - .with_context(|| format!("spawning command on {}", self.ssh_target))?;
112 -
113 - let stdout_task = tokio::spawn(drain(child.stdout.take(), sink.clone()));
114 - let stderr_task = tokio::spawn(drain(child.stderr.take(), sink.clone()));
115 - let status = child.wait().await.context("waiting on child")?;
116 - let stdout = stdout_task.await.unwrap_or_default();
117 - let stderr = stderr_task.await.unwrap_or_default();
118 - Ok(RunOutput { status, stdout, stderr })
119 - }
120 - }
121 -
122 - /// Drain `stream` into the shared sink and return the concatenated bytes.
123 - /// Mirrors Sando's `gates.rs::stream_into_log`, generalized over `LogSink`.
124 - async fn drain<R, S>(stream: Option<R>, sink: Arc<Mutex<S>>) -> Vec<u8>
125 - where
126 - R: AsyncRead + Unpin + Send + 'static,
127 - S: LogSink + Send + 'static,
128 - {
129 - let mut total = Vec::new();
130 - let Some(mut s) = stream else { return total };
131 - let mut buf = [0u8; 4096];
132 - loop {
133 - match s.read(&mut buf).await {
134 - Ok(0) | Err(_) => break,
135 - Ok(n) => {
136 - total.extend_from_slice(&buf[..n]);
137 - sink.lock().await.write_chunk(&buf[..n]).await;
138 - }
139 - }
140 - }
141 - total
142 - }
143 -
144 - /// Single-quote a string for safe inclusion in a `/bin/sh` command, escaping
145 - /// any embedded single quote. Not bulletproof for adversarial input, but every
146 - /// path here comes from our own config files. (Same impl as Sando's.)
147 - pub fn sh_quote(s: &str) -> String {
148 - let escaped = s.replace('\'', r"'\''");
149 - format!("'{escaped}'")
150 - }
151 -
152 - #[cfg(test)]
153 - mod tests {
154 - use super::*;
155 -
156 - /// A simple sink that accumulates every chunk for assertions.
157 - #[derive(Default)]
158 - struct VecSink(Vec<u8>);
159 - impl LogSink for VecSink {
160 - async fn write_chunk(&mut self, bytes: &[u8]) {
161 - self.0.extend_from_slice(bytes);
162 - }
163 - }
164 -
165 - #[test]
166 - fn sh_quote_escapes() {
167 - assert_eq!(sh_quote("hello"), "'hello'");
168 - assert_eq!(sh_quote("it's"), r"'it'\''s'");
169 - }
170 -
171 - #[test]
172 - fn local_detection() {
173 - assert!(RemoteHost::new("local").is_local());
174 - assert!(RemoteHost::new("").is_local());
175 - assert!(!RemoteHost::new("mbp").is_local());
176 - }
177 -
178 - #[tokio::test]
179 - async fn local_run_streams_stdout_and_captures_status() {
180 - let host = RemoteHost::new("local");
181 - let sink = Arc::new(Mutex::new(VecSink::default()));
182 - let out = host
183 - .run_streaming("printf 'hello '; printf 'world'", sink.clone())
184 - .await
185 - .unwrap();
186 - assert!(out.success());
187 - assert_eq!(out.stdout, b"hello world");
188 - assert_eq!(sink.lock().await.0, b"hello world");
189 - }
190 -
191 - #[tokio::test]
192 - async fn local_run_streams_stderr_too() {
193 - let host = RemoteHost::new("local");
194 - let sink = Arc::new(Mutex::new(VecSink::default()));
195 - let out = host
196 - .run_streaming("echo out; echo err 1>&2", sink.clone())
197 - .await
198 - .unwrap();
199 - assert!(out.success());
200 - assert_eq!(out.stdout, b"out\n");
201 - assert_eq!(out.stderr, b"err\n");
202 - // The sink saw both streams (order between them is not guaranteed).
203 - let seen = sink.lock().await.0.clone();
204 - assert!(seen.windows(4).any(|w| w == b"out\n"));
205 - assert!(seen.windows(4).any(|w| w == b"err\n"));
206 - }
207 -
208 - #[tokio::test]
209 - async fn local_run_reports_nonzero_exit() {
210 - let host = RemoteHost::new("local");
211 - let sink = Arc::new(Mutex::new(VecSink::default()));
212 - let out = host.run_streaming("exit 3", sink).await.unwrap();
213 - assert!(!out.success());
214 - assert_eq!(out.status.code(), Some(3));
215 - }
216 - }