Skip to main content

max / makenotwork

47.8 KB · 1210 lines History Blame Raw
1 //! sando-tui — operator front-end for sandod.
2 //!
3 //! Layout (top to bottom):
4 //!
5 //! ┌ daemon ─────────────────────────────────────────┐
6 //! │ sando -> http://...:7766 (ws: connected) │
7 //! ├ tiers ───────────────────── (arrows to select) │
8 //! │ tier prov current previous burn-in gates │
9 //! │ mm yes 0.8.12 - - ... │
10 //! │ a >yes< 0.8.12 - 02:34Z ... │ ← highlighted
11 //! ├ events (WS /events) ────────────────────────────│
12 //! │ 02:55:39 backup_fetched 36MB │
13 //! │ 02:55:40 manual_confirm tier=a v=0.8.12 │
14 //! ├ tail [17] a/0.9.6 cargo_test — in-flight ───────│
15 //! │ running 1 test │
16 //! │ test routes::tests::ok ... ok │
17 //! ├ status / keys ──────────────────────────────────│
18 //! │ [p] promote [R] rollback [b] backup [c] confirm [[/]] tail [r] refresh [q] quit │
19 //! └─────────────────────────────────────────────────┘
20 //!
21 //! State is refreshed every 2s by a background task. Events arrive in real
22 //! time via WS. Actions are issued as background HTTP POSTs; their result
23 //! shows up in the events log a moment later.
24
25 use anyhow::{Context, Result};
26 use crossterm::event::{self, Event as XEvent, KeyCode, KeyModifiers};
27 use crossterm::terminal::{
28 disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
29 };
30 use futures_util::StreamExt;
31 use ratatui::prelude::*;
32 use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Row, Table};
33 use sando_daemon::domain::{GateKind, GateRunId, TierId, Version};
34 use sando_daemon::events::{Event, EventEnvelope};
35 use sando_daemon::outcome::{DeployFailureKind, GateBlocker, GateFailure, GateStatus, PassNote};
36 use serde::Deserialize;
37 use std::collections::{BTreeMap, VecDeque};
38 use std::io;
39 use std::sync::{Arc, Mutex};
40 use std::time::Duration;
41 use tokio::sync::mpsc;
42
43 // ---------- daemon types ----------
44 //
45 // `StateView` mirrors the JSON shape of `GET /state` from
46 // `sando_daemon::routes`. The TUI deserializes a subset (only what it
47 // renders) — adding new fields on the daemon side is a non-breaking
48 // change as long as the field names stay stable.
49
/// Payload decoded from the daemon's `GET /state` response by the state
/// poller. Only the fields the TUI renders are declared.
#[derive(Clone, Debug, Deserialize)]
struct StateView {
    /// One entry per tier; each becomes one row of the tiers table.
    tiers: Vec<TierView>,
}
54
/// One tier as reported by `GET /state`; rendered as a single table row.
#[derive(Clone, Debug, Deserialize)]
struct TierView {
    /// Tier name. Shown in the table and used as the path segment of the
    /// promote / rollback / confirm action URLs.
    name: String,
    /// Rendered as `yes` / `no` in the `prov` column.
    provisioned: bool,
    /// Deserialized (so the field must be present) but not rendered.
    #[allow(dead_code)]
    canary: String,
    /// Rendered in the `current` column; `-` when absent.
    current_version: Option<String>,
    /// Rendered in the `previous` column; `-` when absent.
    previous_version: Option<String>,
    /// The table shows characters 11..19 of this string, or `-`.
    /// NOTE(review): presumably an RFC 3339 timestamp, so that slice is
    /// `HH:MM:SS` — confirm against the daemon.
    burn_in_started_at: Option<String>,
    /// Node names, rendered comma-joined; `-` when empty.
    nodes: Vec<String>,
    /// Gate rows for this tier, rendered as `kind:mark` pairs.
    gates: Vec<GateView>,
}
67
/// One gate entry of a tier as reported by `GET /state`.
#[derive(Clone, Debug, Deserialize)]
struct GateView {
    /// Gate label; printed verbatim before the status mark in the table.
    kind: String,
    /// Deserialized but not rendered.
    #[allow(dead_code)]
    finished_at: Option<String>,
    /// `'passed' | 'failed' | 'blocked'` or NULL. NULL = in-flight.
    /// Any other value is rendered the same way as NULL.
    #[serde(default)]
    status: Option<String>,
    /// Full typed `GateOutcome`, JSON object. We don't deserialize the
    /// inner shape on the polling path — the events handler does that
    /// when the gate finishes, since that's when classification actually
    /// matters for the operator's render. Carried here as opaque JSON.
    #[serde(default)]
    #[allow(dead_code)]
    outcome: Option<serde_json::Value>,
    /// Deserialized but not rendered.
    #[serde(default)]
    #[allow(dead_code)]
    log_ref: Option<String>,
}
87
88 // ---------- app state shared between tasks ----------
89
/// State shared (behind `Arc<Mutex<_>>`) between the background tasks and
/// the UI loop.
#[derive(Default)]
struct Shared {
    /// Most recent successfully decoded `/state` payload; `None` until
    /// the first successful poll.
    state: Option<StateView>,
    /// Last state-polling error; cleared on the next successful poll.
    last_err: Option<String>,
    /// Formatted event lines, oldest first, capped at `EVENTS_CAP`.
    events: VecDeque<String>,
    /// Whether the `/events` WebSocket is currently connected.
    ws_ok: bool,
    /// Index of the highlighted row in the tiers table.
    selected: usize,
    /// Latest one-line notice; shown in the status bar ahead of the key
    /// hints when set.
    notice: Option<String>,
    /// Per-run live-tail buffers keyed by `GateRunId`. Populated by
    /// `GateStart` / `GateLogChunk` / `GateDone`. Bounded to
    /// `GATE_TAILS_CAP` runs; the entry with the smallest run id is
    /// evicted first.
    gate_tails: BTreeMap<GateRunId, GateTail>,
    /// Run shown in the tail pane. Defaults to the most recently active
    /// run; updated as new `GateStart` events arrive.
    focus_run: Option<GateRunId>,
}
106
/// Maximum number of lines kept in the events ring (`Shared::events`).
const EVENTS_CAP: usize = 200;
/// Maximum number of per-run tail buffers kept in `Shared::gate_tails`.
const GATE_TAILS_CAP: usize = 10;
/// Maximum number of complete lines kept per tail (`GateTail::lines`).
const TAIL_LINES_CAP: usize = 200;
110
/// Per-run live-tail state. Receives `GateLogChunk` bytes and presents
/// them as a line-aware ring buffer. Chunks are NOT line-aligned at the
/// transport — we buffer a trailing partial line across chunks so the UI
/// only renders complete lines (and one trailing in-flight line, on
/// `close`/finalize).
struct GateTail {
    /// Tier the run belongs to; shown in the tail pane title.
    tier: TierId,
    /// Version under test; shown in the tail pane title.
    version: Version,
    /// Gate being run; shown in the tail pane title.
    gate: GateKind,
    /// Complete lines, oldest first. Newest end of the deque is what the
    /// pane shows at the bottom. Capped at `TAIL_LINES_CAP`.
    lines: VecDeque<String>,
    /// Trailing bytes after the last `\n` in any chunk so far. Flushed
    /// into `lines` on `finalize` (so the operator sees the last line
    /// even if the gate exited without a final newline).
    partial: String,
    /// In-flight until `finalize` records the final status word.
    status: TailStatus,
}
129
/// Lifecycle of a tailed gate run as shown in the tail pane title.
#[derive(Clone, Debug, PartialEq, Eq)]
enum TailStatus {
    /// No `GateDone` has been seen for the run yet.
    InFlight,
    /// The run finished; carries the status word passed to
    /// `GateTail::finalize`.
    Finished(String),
}
135
136 impl GateTail {
137 fn new(tier: TierId, version: Version, gate: GateKind) -> Self {
138 Self {
139 tier,
140 version,
141 gate,
142 lines: VecDeque::new(),
143 partial: String::new(),
144 status: TailStatus::InFlight,
145 }
146 }
147
148 /// Append a chunk of stdout/stderr. Splits on `\n`, buffers the
149 /// trailing partial line, caps the ring at `TAIL_LINES_CAP`.
150 fn push_chunk(&mut self, text: &str) {
151 let combined = std::mem::take(&mut self.partial) + text;
152 let mut rest = combined.as_str();
153 while let Some(idx) = rest.find('\n') {
154 let (line, after) = rest.split_at(idx);
155 self.push_line(line.trim_end_matches('\r').to_string());
156 rest = &after[1..];
157 }
158 self.partial = rest.to_string();
159 }
160
161 fn push_line(&mut self, line: String) {
162 if self.lines.len() >= TAIL_LINES_CAP {
163 self.lines.pop_front();
164 }
165 self.lines.push_back(line);
166 }
167
168 /// Called when `GateDone` arrives. Flushes any trailing partial
169 /// line and records the gate's final status word.
170 fn finalize(&mut self, status_word: String) {
171 let trailing = std::mem::take(&mut self.partial);
172 if !trailing.is_empty() {
173 self.push_line(trailing);
174 }
175 self.status = TailStatus::Finished(status_word);
176 }
177 }
178
179 impl Shared {
180 fn push_event(&mut self, line: String) {
181 if self.events.len() >= EVENTS_CAP {
182 self.events.pop_front();
183 }
184 self.events.push_back(line);
185 }
186
187 fn open_tail(&mut self, run_id: GateRunId, tier: TierId, version: Version, gate: GateKind) {
188 if self.gate_tails.len() >= GATE_TAILS_CAP {
189 // BTreeMap is sorted by GateRunId (monotonic from the daemon's
190 // INSERT … RETURNING id), so the smallest key is the oldest run.
191 if let Some((&oldest, _)) = self.gate_tails.iter().next() {
192 self.gate_tails.remove(&oldest);
193 }
194 }
195 self.gate_tails.insert(run_id, GateTail::new(tier, version, gate));
196 self.focus_run = Some(run_id);
197 }
198
199 fn push_tail_chunk(&mut self, run_id: GateRunId, text: &str) {
200 if let Some(t) = self.gate_tails.get_mut(&run_id) {
201 t.push_chunk(text);
202 }
203 }
204
205 fn finalize_tail(&mut self, run_id: GateRunId, status_word: String) {
206 if let Some(t) = self.gate_tails.get_mut(&run_id) {
207 t.finalize(status_word);
208 }
209 }
210 }
211
212 // ---------- main ----------
213
214 fn main() -> Result<()> {
215 let daemon = std::env::var("SANDO_DAEMON").unwrap_or_else(|_| "http://127.0.0.1:7766".into());
216 let shared = Arc::new(Mutex::new(Shared::default()));
217
218 let rt = tokio::runtime::Builder::new_multi_thread()
219 .enable_all()
220 .worker_threads(2)
221 .build()?;
222
223 // Background pollers + WS subscriber.
224 let _g = rt.enter();
225 rt.spawn(state_poller(daemon.clone(), shared.clone()));
226 rt.spawn(events_subscriber(daemon.clone(), shared.clone()));
227
228 enable_raw_mode()?;
229 let mut stdout = io::stdout();
230 crossterm::execute!(stdout, EnterAlternateScreen)?;
231 let backend = CrosstermBackend::new(stdout);
232 let mut term = Terminal::new(backend)?;
233
234 let res = ui_loop(&mut term, &daemon, &shared, rt.handle());
235
236 disable_raw_mode()?;
237 crossterm::execute!(term.backend_mut(), LeaveAlternateScreen)?;
238 term.show_cursor()?;
239 res
240 }
241
242 // ---------- background tasks ----------
243
244 async fn state_poller(daemon: String, shared: Arc<Mutex<Shared>>) {
245 let url = format!("{}/state", daemon);
246 let client = reqwest::Client::new();
247 loop {
248 match client.get(&url).timeout(Duration::from_secs(2)).send().await {
249 Ok(resp) if resp.status().is_success() => match resp.json::<StateView>().await {
250 Ok(s) => {
251 let mut g = shared.lock().unwrap();
252 let n = s.tiers.len().saturating_sub(1);
253 if g.selected > n {
254 g.selected = n;
255 }
256 g.state = Some(s);
257 g.last_err = None;
258 }
259 Err(e) => shared.lock().unwrap().last_err = Some(format!("decode: {e}")),
260 },
261 Ok(resp) => {
262 shared.lock().unwrap().last_err = Some(format!("status {}", resp.status()));
263 }
264 Err(e) => shared.lock().unwrap().last_err = Some(e.to_string()),
265 }
266 tokio::time::sleep(Duration::from_secs(2)).await;
267 }
268 }
269
270 async fn events_subscriber(daemon: String, shared: Arc<Mutex<Shared>>) {
271 let ws_url = ws_url_from(&daemon);
272
273 loop {
274 match tokio_tungstenite::connect_async(&ws_url).await {
275 Ok((mut socket, _resp)) => {
276 shared.lock().unwrap().ws_ok = true;
277 while let Some(msg) = socket.next().await {
278 match msg {
279 Ok(tokio_tungstenite::tungstenite::Message::Text(t)) => {
280 dispatch_ws_frame(&shared, &t);
281 }
282 Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => break,
283 Err(_) => break,
284 _ => {}
285 }
286 }
287 shared.lock().unwrap().ws_ok = false;
288 }
289 Err(_) => {
290 shared.lock().unwrap().ws_ok = false;
291 }
292 }
293 tokio::time::sleep(Duration::from_secs(3)).await;
294 }
295 }
296
/// Convert the daemon's HTTP URL into the matching WebSocket URL for /events.
///
/// Only a *leading* scheme is rewritten: `https://` becomes `wss://` and
/// `http://` becomes `ws://`. Any later occurrence of either string (for
/// example inside a query parameter) is left untouched. Input with any
/// other scheme is passed through unchanged — nothing is rejected here;
/// bad input fails later when `connect_async` actually dials.
fn ws_url_from(daemon: &str) -> String {
    let base = if let Some(rest) = daemon.strip_prefix("https://") {
        format!("wss://{rest}")
    } else if let Some(rest) = daemon.strip_prefix("http://") {
        format!("ws://{rest}")
    } else {
        daemon.to_string()
    };
    base + "/events"
}
306
307 /// Route an incoming WS frame: gate live-tail events update the per-run
308 /// buffer, every other event becomes one line in the events ring.
309 ///
310 /// `GateLogChunk` does NOT push into the events ring — a busy gate emits
311 /// hundreds of chunks per second, which would evict every other event in
312 /// under a minute. The chunk text only lives in the per-run tail buffer.
313 fn dispatch_ws_frame(shared: &Arc<Mutex<Shared>>, raw: &str) {
314 if let Some(line) = format_lagged(raw) {
315 shared.lock().unwrap().push_event(line);
316 return;
317 }
318 let Ok(env) = serde_json::from_str::<EventEnvelope>(raw) else {
319 // Unparseable frame — surface verbatim so the operator can see it.
320 shared.lock().unwrap().push_event(raw.to_string());
321 return;
322 };
323 match &env.event {
324 Event::GateStart { run_id, tier, version, gate } => {
325 let mut g = shared.lock().unwrap();
326 g.open_tail(*run_id, tier.clone(), version.clone(), *gate);
327 let line = format_event_line(&env);
328 g.push_event(line);
329 }
330 Event::GateLogChunk { run_id, text, .. } => {
331 shared.lock().unwrap().push_tail_chunk(*run_id, text);
332 }
333 Event::GateDone { run_id, outcome, .. } => {
334 let status_word = outcome.status_str().to_string();
335 let mut g = shared.lock().unwrap();
336 g.finalize_tail(*run_id, status_word);
337 let line = format_event_line(&env);
338 g.push_event(line);
339 }
340 _ => {
341 let line = format_event_line(&env);
342 shared.lock().unwrap().push_event(line);
343 }
344 }
345 }
346
347 fn format_event_line(env: &EventEnvelope) -> String {
348 let time = env.at.format("%H:%M:%S").to_string();
349 let kind = event_kind_str(&env.event);
350 let body = format_event_body(&env.event);
351 format!("{time} {kind} {body}")
352 }
353
/// Render one raw JSON frame into a human-readable single line
/// (test-only helper).
///
/// The `lagged` frame is a daemon-emitted ad-hoc envelope
/// (`{"kind":"lagged","skipped":N}`) that does not match the `Event`
/// enum, so it is probed for first. Everything else is deserialized as
/// a typed `EventEnvelope` from `sando_daemon::events` — no
/// `serde_json::Value` reflection, the compiler enforces field names.
/// Returns `None` when the frame is neither.
#[cfg(test)]
fn format_event(json: &str) -> Option<String> {
    format_lagged(json).or_else(|| {
        serde_json::from_str::<EventEnvelope>(json)
            .ok()
            .map(|env| format_event_line(&env))
    })
}
370
371 fn format_lagged(json: &str) -> Option<String> {
372 let v: serde_json::Value = serde_json::from_str(json).ok()?;
373 if v.get("kind").and_then(|x| x.as_str()) != Some("lagged") { return None; }
374 let at = v.get("at").and_then(|x| x.as_str()).unwrap_or("");
375 let time = at.get(11..19).unwrap_or("");
376 let skipped = v.get("skipped")
377 .and_then(|x| x.as_u64())
378 .map(|n| n.to_string())
379 .unwrap_or_else(|| "?".into());
380 Some(format!("{time} lagged (skipped {skipped} events)"))
381 }
382
383 fn event_kind_str(e: &Event) -> &'static str {
384 match e {
385 Event::RebuildRequested { .. } => "rebuild_requested",
386 Event::BuildAborted { .. } => "build_aborted",
387 Event::BuildStart { .. } => "build_start",
388 Event::BuildOk { .. } => "build_ok",
389 Event::BuildFailed { .. } => "build_failed",
390 Event::GateStart { .. } => "gate_start",
391 Event::GateLogChunk { .. } => "gate_log",
392 Event::GateDone { .. } => "gate_done",
393 Event::DeployStart { .. } => "deploy_start",
394 Event::DeployOk { .. } => "deploy_ok",
395 Event::DeployFailed { .. } => "deploy_failed",
396 Event::PromoteComplete { .. } => "promote_complete",
397 Event::Rollback { .. } => "rollback",
398 Event::BackupFetched { .. } => "backup_fetched",
399 Event::ManualConfirm { .. } => "manual_confirm",
400 }
401 }
402
403 fn format_event_body(e: &Event) -> String {
404 match e {
405 Event::RebuildRequested { sha } => format!("sha={}", sha.short()),
406 Event::BuildAborted { sha_aborted } => format!("aborted prev sha={}", sha_aborted.short()),
407 Event::BuildStart { sha, version } => format!("v={version} sha={}", sha.short()),
408 Event::BuildOk { version, elapsed_s, .. } => format!("v={version} ({elapsed_s}s)"),
409 Event::BuildFailed { version, elapsed_s, .. } => format!("v={version} FAILED ({elapsed_s}s)"),
410 Event::GateStart { run_id, tier, version, gate } =>
411 format!("[{run_id}] {tier}/{version} {gate} start"),
412 Event::GateLogChunk { run_id, seq, text } => {
413 // Single-line summary used by `format_event` (test path only);
414 // production rendering routes chunks through `dispatch_ws_frame`
415 // into the per-run tail buffer, never the events ring.
416 let one = text.lines().next().unwrap_or("").trim_end();
417 let truncated: String = one.chars().take(160).collect();
418 format!("[{run_id} #{seq}] {truncated}")
419 }
420 Event::GateDone { run_id, tier, version, gate, outcome, .. } => {
421 let summary = match &outcome.status {
422 GateStatus::Passed { note } => format!("ok ({})", pass_note_short(note)),
423 GateStatus::Failed { failure } => format!("FAIL ({})", failure_short(failure)),
424 GateStatus::Blocked { blocker } => format!("blocked ({})", blocker_short(blocker)),
425 };
426 format!("[{run_id}] {tier}/{version} {gate} {summary}")
427 }
428 Event::DeployStart { tier, node, version } => format!("{tier} -> {node} v={version}"),
429 Event::DeployOk { tier, node, version } => format!("{tier} -> {node} v={version} ok"),
430 Event::DeployFailed { tier, node, version, failure } =>
431 format!("{tier} -> {node} v={version} FAILED: {}", deploy_failure_short(failure)),
432 Event::PromoteComplete { tier, version } => format!("{tier} -> v={version} complete"),
433 Event::Rollback { tier, from, to } => format!("{tier} {from} -> {to}"),
434 Event::BackupFetched { source, byte_size } => format!("backup {byte_size} bytes from {source}"),
435 Event::ManualConfirm { tier, version } => format!("{tier} v={version} confirmed"),
436 }
437 }
438
439 fn pass_note_short(n: &PassNote) -> String {
440 match n {
441 PassNote::StayedUp { duration_s } => format!("up {duration_s}s"),
442 PassNote::BurnInElapsed { hours } => format!("{hours}h elapsed"),
443 PassNote::Migrated { backup_path } => {
444 // Show just the basename; full path lives in /state if needed.
445 let name = backup_path.rsplit('/').next().unwrap_or(backup_path);
446 format!("migrated {name}")
447 }
448 PassNote::TestsPassed { duration_s } => format!("{duration_s}s"),
449 PassNote::OperatorConfirmed { .. } => "operator".into(),
450 PassNote::Legacy { text } => text.clone(),
451 }
452 }
453
454 fn failure_short(f: &GateFailure) -> String {
455 match f {
456 GateFailure::CargoTest { failed_count, first_failed: Some(name) } =>
457 format!("{failed_count} test(s); first {name}"),
458 GateFailure::CargoTest { failed_count, .. } => format!("{failed_count} test(s) failed"),
459 GateFailure::MigrationDrift { migration } => format!("drift {migration}"),
460 GateFailure::MigrationModified { migration } => format!("modified {migration}"),
461 GateFailure::MigrationSqlError { migration, sqlstate: Some(s) } =>
462 format!("sql {migration} {s}"),
463 GateFailure::MigrationSqlError { migration, .. } => format!("sql {migration}"),
464 GateFailure::RestoreFailed { reason } => format!("restore: {reason}"),
465 GateFailure::BootPanic { exit_code: Some(c) } => format!("panic exit {c}"),
466 GateFailure::BootPanic { .. } => "panic".into(),
467 GateFailure::BootExitedEarly { exit_code: Some(c) } => format!("exited {c}"),
468 GateFailure::BootExitedEarly { .. } => "exited early".into(),
469 GateFailure::SpawnFailed { message } => format!("spawn: {message}"),
470 GateFailure::Timeout { gate, after_s } => format!("{gate} timeout {after_s}s"),
471 GateFailure::Unclassified { legacy_detail: Some(d) } => {
472 // Truncate aggressively — the events log is single-line.
473 d.lines().next().unwrap_or("").chars().take(80).collect()
474 }
475 GateFailure::Unclassified { .. } => "unclassified".into(),
476 }
477 }
478
479 /// Pick the one-word mark + color for a gate row in the tiers table.
480 /// `status` is NULL while the gate is in-flight.
481 fn gate_mark_and_style(g: &GateView) -> (&'static str, Style) {
482 match g.status.as_deref() {
483 Some("passed") => ("ok", Style::default().fg(Color::Green)),
484 Some("failed") => ("FAIL", Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)),
485 Some("blocked") => ("blocked", Style::default().fg(Color::Yellow)),
486 Some(_) | None => ("...", Style::default().fg(Color::DarkGray)),
487 }
488 }
489
490 fn deploy_failure_short(f: &DeployFailureKind) -> String {
491 match f {
492 DeployFailureKind::NodeUnreachable { .. } => "node unreachable".into(),
493 DeployFailureKind::RsyncFailed { detail } => {
494 // Show the rsync exit hint if present (e.g. "(28)" for ENOSPC).
495 let first = detail.lines().next().unwrap_or("").trim_end();
496 format!("rsync: {}", first.chars().take(80).collect::<String>())
497 }
498 DeployFailureKind::SymlinkSwapFailed { .. } => "symlink swap".into(),
499 DeployFailureKind::ServiceRestartFailed { .. } => "service restart".into(),
500 DeployFailureKind::Unclassified { detail } =>
501 detail.lines().next().unwrap_or("").chars().take(80).collect(),
502 }
503 }
504
505 fn blocker_short(b: &GateBlocker) -> String {
506 match b {
507 GateBlocker::BurnInClockNotStarted => "burn-in clock not started".into(),
508 GateBlocker::BurnInRemaining { hours_remaining, hours_total } =>
509 format!("{hours_remaining}h of {hours_total}h remaining"),
510 GateBlocker::AwaitingOperatorConfirmation => "needs operator".into(),
511 GateBlocker::NoBackupAvailable => "no backup".into(),
512 GateBlocker::ScratchDbUrlUnset => "scratch_db_url unset".into(),
513 GateBlocker::ArtifactMissing { version } => format!("no artifact for {version}"),
514 }
515 }
516
517 // ---------- UI loop ----------
518
519 fn ui_loop<B: Backend>(
520 term: &mut Terminal<B>,
521 daemon: &str,
522 shared: &Arc<Mutex<Shared>>,
523 rt: &tokio::runtime::Handle,
524 ) -> Result<()> {
525 let (action_tx, mut action_rx) = mpsc::channel::<Action>(32);
526
527 // Action dispatcher task: posts to daemon, drops notice on Shared.
528 {
529 let shared = shared.clone();
530 let daemon = daemon.to_string();
531 rt.spawn(async move {
532 while let Some(act) = action_rx.recv().await {
533 let res = dispatch_action(&daemon, &act).await;
534 let line = match res {
535 Ok(msg) => format!("[ok] {act}: {msg}"),
536 Err(e) => format!("[err] {act}: {e}"),
537 };
538 let mut g = shared.lock().unwrap();
539 g.notice = Some(line.clone());
540 g.push_event(format!(" action {line}"));
541 }
542 });
543 }
544
545 loop {
546 term.draw(|f| draw(f, daemon, shared))?;
547
548 if event::poll(Duration::from_millis(120))? {
549 if let XEvent::Key(k) = event::read()? {
550 let selected_tier = {
551 let g = shared.lock().unwrap();
552 g.state
553 .as_ref()
554 .and_then(|s| s.tiers.get(g.selected).cloned())
555 };
556
557 match k.code {
558 KeyCode::Char('q') | KeyCode::Esc => return Ok(()),
559 KeyCode::Char('c') if k.modifiers.contains(KeyModifiers::CONTROL) => {
560 return Ok(());
561 }
562 KeyCode::Up | KeyCode::Char('k') => {
563 let mut g = shared.lock().unwrap();
564 g.selected = g.selected.saturating_sub(1);
565 }
566 KeyCode::Down | KeyCode::Char('j') => {
567 let mut g = shared.lock().unwrap();
568 let max = g
569 .state
570 .as_ref()
571 .map(|s| s.tiers.len().saturating_sub(1))
572 .unwrap_or(0);
573 if g.selected < max {
574 g.selected += 1;
575 }
576 }
577 KeyCode::Char('r') => {
578 // Nudge: state poller wakes every 2s anyway; this just notes intent.
579 shared.lock().unwrap().notice = Some("refresh on next tick".into());
580 }
581 KeyCode::Char('b') => {
582 let _ = action_tx.try_send(Action::BackupFetch);
583 }
584 KeyCode::Char('p') => {
585 if let Some(t) = selected_tier {
586 let _ = action_tx.try_send(Action::Promote { tier: t.name });
587 }
588 }
589 KeyCode::Char('R') => {
590 if let Some(t) = selected_tier {
591 let _ = action_tx.try_send(Action::Rollback { tier: t.name });
592 }
593 }
594 KeyCode::Char('c') => {
595 if let Some(t) = selected_tier {
596 let _ = action_tx.try_send(Action::Confirm { tier: t.name });
597 }
598 }
599 KeyCode::Char('[') => {
600 let mut g = shared.lock().unwrap();
601 g.focus_run = cycle_focus(&g.gate_tails, g.focus_run, -1);
602 }
603 KeyCode::Char(']') => {
604 let mut g = shared.lock().unwrap();
605 g.focus_run = cycle_focus(&g.gate_tails, g.focus_run, 1);
606 }
607 _ => {}
608 }
609 }
610 }
611 }
612 }
613
614 // ---------- actions ----------
615
/// An operator-triggered request, queued by the UI loop and performed by
/// `dispatch_action` as an HTTP POST to the daemon.
#[derive(Clone, Debug)]
enum Action {
    /// POST `/backup/fetch`.
    BackupFetch,
    /// POST `/promote/{tier}`.
    Promote { tier: String },
    /// POST `/rollback/{tier}`.
    Rollback { tier: String },
    /// POST `/confirm/{tier}`.
    Confirm { tier: String },
}
623
624 impl std::fmt::Display for Action {
625 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
626 match self {
627 Action::BackupFetch => write!(f, "backup/fetch"),
628 Action::Promote { tier } => write!(f, "promote/{tier}"),
629 Action::Rollback { tier } => write!(f, "rollback/{tier}"),
630 Action::Confirm { tier } => write!(f, "confirm/{tier}"),
631 }
632 }
633 }
634
635 async fn dispatch_action(daemon: &str, act: &Action) -> Result<String> {
636 let client = reqwest::Client::new();
637 let url = match act {
638 Action::BackupFetch => format!("{daemon}/backup/fetch"),
639 Action::Promote { tier } => format!("{daemon}/promote/{tier}"),
640 Action::Rollback { tier } => format!("{daemon}/rollback/{tier}"),
641 Action::Confirm { tier } => format!("{daemon}/confirm/{tier}"),
642 };
643 let mut req = client.post(&url).timeout(Duration::from_secs(120));
644 // /promote accepts an empty body now (defaults version to predecessor's
645 // current). The other endpoints take no body.
646 if matches!(act, Action::Promote { .. }) {
647 req = req.header("Content-Type", "application/json").body("{}");
648 }
649 let resp = req.send().await.context("send")?;
650 let status = resp.status();
651 let body = resp.text().await.unwrap_or_default();
652 if status.is_success() {
653 Ok(truncate(&body, 80))
654 } else {
655 Err(anyhow::anyhow!("HTTP {status}: {}", truncate(&body, 200)))
656 }
657 }
658
/// Return at most the first `n` characters of `s`.
///
/// Counts `char`s rather than bytes, so the cut never lands inside a
/// multi-byte UTF-8 sequence. A string of `n` or fewer characters comes
/// back unchanged; nothing is appended to mark a cut.
fn truncate(s: &str, n: usize) -> String {
    s.chars().take(n).collect()
}
666
667 // ---------- render ----------
668
669 fn draw(f: &mut Frame, daemon: &str, shared: &Arc<Mutex<Shared>>) {
670 let g = shared.lock().unwrap();
671 let chunks = Layout::default()
672 .direction(Direction::Vertical)
673 .constraints([
674 Constraint::Length(3), // header
675 Constraint::Length(10), // tiers
676 Constraint::Min(4), // events
677 Constraint::Length(8), // gate tail
678 Constraint::Length(2), // status
679 ])
680 .split(f.area());
681
682 let ws_label = if g.ws_ok { "ws ok" } else { "ws ..." };
683 let header = Paragraph::new(format!("sando -> {daemon} ({ws_label})"))
684 .block(Block::default().title("daemon").borders(Borders::ALL));
685 f.render_widget(header, chunks[0]);
686
687 // Tiers table
688 if let Some(s) = g.state.as_ref() {
689 let hr = Row::new(vec![
690 " ", "tier", "prov", "current", "previous", "burn-in", "nodes", "gates",
691 ])
692 .style(Style::default().add_modifier(Modifier::BOLD));
693
694 let rows: Vec<Row> = s
695 .tiers
696 .iter()
697 .enumerate()
698 .map(|(i, t)| {
699 let marker = if i == g.selected { ">" } else { " " };
700 let gates = if t.gates.is_empty() {
701 Line::from("-")
702 } else {
703 // Step 5: render Passed (green) / Failed (red) /
704 // Blocked (yellow) / in-flight (default) from the
705 // typed `status` column. Falls back to `passed` for
706 // rows written before migration 003 (which the
707 // backfill should have set, but defensive anyway).
708 let mut spans: Vec<Span> = Vec::new();
709 for (idx, g) in t.gates.iter().enumerate() {
710 if idx > 0 { spans.push(Span::raw(" ")); }
711 let (mark, style) = gate_mark_and_style(g);
712 spans.push(Span::raw(format!("{}:", g.kind)));
713 spans.push(Span::styled(mark, style));
714 }
715 Line::from(spans)
716 };
717 let cells: Vec<ratatui::widgets::Cell> = vec![
718 marker.to_string().into(),
719 t.name.clone().into(),
720 if t.provisioned { "yes".into() } else { "no".into() },
721 t.current_version.clone().unwrap_or_else(|| "-".into()).into(),
722 t.previous_version.clone().unwrap_or_else(|| "-".into()).into(),
723 t.burn_in_started_at
724 .as_deref()
725 .and_then(|s| s.get(11..19))
726 .unwrap_or("-")
727 .to_string()
728 .into(),
729 if t.nodes.is_empty() { "-".into() } else { t.nodes.join(",").into() },
730 gates.into(),
731 ];
732 let row = Row::new(cells);
733 if i == g.selected {
734 row.style(Style::default().add_modifier(Modifier::REVERSED))
735 } else {
736 row
737 }
738 })
739 .collect();
740
741 let widths = [
742 Constraint::Length(2),
743 Constraint::Length(8),
744 Constraint::Length(5),
745 Constraint::Length(12),
746 Constraint::Length(12),
747 Constraint::Length(10),
748 Constraint::Length(20),
749 Constraint::Min(20),
750 ];
751 let table = Table::new(rows, widths).header(hr).block(
752 Block::default()
753 .title(format!("tiers ({}) ↑/↓ select", s.tiers.len()))
754 .borders(Borders::ALL),
755 );
756 f.render_widget(table, chunks[1]);
757 } else {
758 let msg = g
759 .last_err
760 .clone()
761 .unwrap_or_else(|| "loading...".into());
762 let placeholder =
763 Paragraph::new(msg).block(Block::default().title("tiers").borders(Borders::ALL));
764 f.render_widget(placeholder, chunks[1]);
765 }
766
767 // Events
768 let area = chunks[2];
769 let visible = area.height.saturating_sub(2) as usize; // minus borders
770 let items: Vec<ListItem> = g
771 .events
772 .iter()
773 .rev()
774 .take(visible)
775 .rev()
776 .map(|line| ListItem::new(line.clone()))
777 .collect();
778 let events_block = List::new(items).block(
779 Block::default()
780 .title(format!("events ({})", g.events.len()))
781 .borders(Borders::ALL),
782 );
783 f.render_widget(events_block, area);
784
785 // Gate tail (per-run live log buffer).
786 let tail_area = chunks[3];
787 let tail = g.focus_run.and_then(|id| g.gate_tails.get(&id).map(|t| (id, t)));
788 let tail_widget = match tail {
789 Some((id, t)) => {
790 let title_state = match &t.status {
791 TailStatus::InFlight => "in-flight".to_string(),
792 TailStatus::Finished(s) => s.clone(),
793 };
794 let title = format!(
795 "tail [{}] {}/{} {}{title_state} ([) prev / (]) next {} of {}",
796 id, t.tier, t.version, t.gate,
797 tail_position(&g.gate_tails, id), g.gate_tails.len(),
798 );
799 let visible = tail_area.height.saturating_sub(2) as usize;
800 let items: Vec<ListItem> = t.lines
801 .iter()
802 .rev()
803 .take(visible)
804 .rev()
805 .map(|l| ListItem::new(truncate(l, 200)))
806 .collect();
807 List::new(items).block(Block::default().title(title).borders(Borders::ALL))
808 }
809 None => {
810 let body = "no recent gate runs";
811 List::new(vec![ListItem::new(body)])
812 .block(Block::default().title("tail").borders(Borders::ALL))
813 }
814 };
815 f.render_widget(tail_widget, tail_area);
816
817 // Status / keys
818 let keys = "[p] promote [R] rollback [b] backup [c] confirm [↑↓] select [[/]] tail [r] refresh [q] quit";
819 let status = if let Some(n) = &g.notice {
820 format!("{n} {keys}")
821 } else if let Some(e) = &g.last_err {
822 format!("error: {e} {keys}")
823 } else {
824 keys.into()
825 };
826 f.render_widget(Paragraph::new(status), chunks[4]);
827 }
828
829 /// 1-based index of `id` within the sorted tail map; 0 if not present.
830 fn tail_position(tails: &BTreeMap<GateRunId, GateTail>, id: GateRunId) -> usize {
831 tails.keys().position(|k| *k == id).map(|i| i + 1).unwrap_or(0)
832 }
833
834 /// Move `focus_run` by `step` positions through the sorted run map.
835 /// Wraps at both ends. Returns the new focus, or `None` if the map is
836 /// empty.
837 fn cycle_focus(
838 tails: &BTreeMap<GateRunId, GateTail>,
839 current: Option<GateRunId>,
840 step: i32,
841 ) -> Option<GateRunId> {
842 if tails.is_empty() { return None; }
843 let keys: Vec<GateRunId> = tails.keys().copied().collect();
844 let idx = current
845 .and_then(|c| keys.iter().position(|k| *k == c))
846 .unwrap_or(keys.len().saturating_sub(1));
847 let len = keys.len() as i32;
848 let new_idx = ((idx as i32 + step) % len + len) % len;
849 Some(keys[new_idx as usize])
850 }
851
852 // ---------- tests ----------
853
854 #[cfg(test)]
855 mod tests {
856 use super::*;
857
858 #[test]
859 fn ws_url_from_http() {
860 assert_eq!(ws_url_from("http://127.0.0.1:7766"), "ws://127.0.0.1:7766/events");
861 assert_eq!(ws_url_from("http://pop-os:7766"), "ws://pop-os:7766/events");
862 }
863
864 #[test]
865 fn ws_url_from_https() {
866 assert_eq!(
867 ws_url_from("https://sando.example.com"),
868 "wss://sando.example.com/events"
869 );
870 }
871
872 #[test]
873 fn ws_url_from_only_replaces_scheme_once() {
874 // A path or query containing the literal string `http://` should not be
875 // double-rewritten. Edge case but cheap to guard.
876 assert_eq!(
877 ws_url_from("http://host/api?next=http://other"),
878 "ws://host/api?next=http://other/events"
879 );
880 }
881
882 #[test]
883 fn ws_url_from_unknown_scheme_passes_through() {
884 // Bad input is left alone — `connect_async` will reject it later.
885 let s = ws_url_from("garbage://host");
886 assert_eq!(s, "garbage://host/events");
887 }
888
/// A `build_ok` frame renders as `HH:MM:SS build_ok` followed by the
/// version and the elapsed seconds.
#[test]
fn format_event_build_ok() {
    let frame = r#"{"at":"2026-06-01T02:55:39.123Z","kind":"build_ok","sha":"3dc8dca7a120c4","version":"0.8.12","elapsed_s":132}"#;
    let rendered = format_event(frame).unwrap();
    // Sub-second precision in `at` is dropped from the rendered timestamp.
    assert!(rendered.starts_with("02:55:39 build_ok"));
    assert!(rendered.contains("v=0.8.12"));
    assert!(rendered.contains("(132s)"));
}
897
/// A passed `gate_done` shows the run id, the tier/version/gate triple
/// with `ok`, and the duration taken from the typed pass note.
#[test]
fn format_event_gate_done_passed_uses_pass_note() {
    // Step 5+: GateDone carries a typed `outcome` and `run_id`.
    let frame = r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":17,"tier":"mm","version":"0.8.12","gate":"cargo_test","outcome":{"status":{"kind":"passed","note":{"kind":"tests_passed","duration_s":42}}}}"#;
    let rendered = format_event(frame).unwrap();
    for needle in ["[17]", "mm/0.8.12 cargo_test ok", "42s"] {
        assert!(rendered.contains(needle), "got: {rendered}");
    }
}
909
/// A failed `gate_done` is flagged `FAIL` and surfaces the failure
/// variant's details: failed-test count and the first failing test.
#[test]
fn format_event_gate_done_failed_uses_failure_variant() {
    let frame = r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":18,"tier":"mm","version":"0.8.12","gate":"cargo_test","outcome":{"status":{"kind":"failed","failure":{"kind":"cargo_test","failed_count":3,"first_failed":"foo::bar"}}}}"#;
    let rendered = format_event(frame).unwrap();
    for needle in ["FAIL", "3 test(s)", "foo::bar"] {
        assert!(rendered.contains(needle), "got: {rendered}");
    }
}
920
/// A blocked `gate_done` must render as "blocked", never as "FAIL".
#[test]
// The upper-case `FAIL` mirrors the literal string the test guards
// against; keep the name and silence the snake-case lint instead.
#[allow(non_snake_case)]
fn format_event_gate_done_blocked_is_not_FAIL() {
    // The whole point of the Blocked variant: an unstarted burn-in
    // clock is not a defect, so the TUI renders it distinctly from a
    // genuine failure. No "FAIL" string.
    let blocked = format_event(
        r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":19,"tier":"a","version":"0.9.6","gate":"burn_in","outcome":{"status":{"kind":"blocked","blocker":{"kind":"burn_in_clock_not_started"}}}}"#,
    )
    .unwrap();
    assert!(blocked.contains("burn_in blocked"), "got: {blocked}");
    assert!(!blocked.contains("FAIL"), "got: {blocked}");
    assert!(blocked.contains("burn-in clock not started"), "got: {blocked}");
}
934
/// A `gate_log_chunk` frame renders as `[run #seq]` plus the chunk text
/// on a single line.
#[test]
fn format_event_gate_log_chunk_renders_truncated_line() {
    let frame = r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":42,"seq":7,"text":" Compiling foo v0.1.0\n"}"#;
    let rendered = format_event(frame).unwrap();
    assert!(rendered.contains("[42 #7]"), "got: {rendered}");
    assert!(rendered.contains("Compiling foo"), "got: {rendered}");
    // Trailing newline stripped, no FAIL/ok decorations.
    let has_trailing_newline = rendered.ends_with('\n');
    assert!(!has_trailing_newline, "got: {rendered}");
}
946
/// Over-long chunk text is cut down so the rendered entry stays a single
/// manageable line.
#[test]
fn format_event_gate_log_chunk_truncates_long_input() {
    // 300-char line should be cut to 160 — single-line UX.
    let text = "x".repeat(300);
    let frame = format!(
        r#"{{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":1,"seq":0,"text":"{text}"}}"#
    );
    let line = format_event(&frame).unwrap();
    // The rendered line also holds the timestamp, kind and prefix, so count
    // only the 'x' characters: that isolates the (truncated) body, which
    // must be at most 160 of them.
    let x_count = line.matches('x').count();
    assert!(x_count <= 160, "got {x_count} x's; full line: {line}");
}
960
/// A `backup_fetched` frame shows the timestamp, the kind, the byte size
/// and the source URL.
#[test]
fn format_event_backup_fetched() {
    let frame = r#"{"at":"2026-06-01T04:00:01Z","kind":"backup_fetched","source":"ssh://backup-puller@alpha-west-1:2200/latest.sql.gz","byte_size":36079398}"#;
    let rendered = format_event(frame).unwrap();
    assert!(rendered.starts_with("04:00:01 backup_fetched"));
    assert!(rendered.contains("36079398"));
    assert!(rendered.contains("ssh://"));
}
971
/// A `deploy_failed` frame names the tier -> node hop, is flagged
/// `FAILED`, and includes the typed failure's detail text.
#[test]
fn format_event_deploy_failed_uses_typed_failure() {
    // Step 7: DeployFailed carries a typed DeployFailureKind.
    let frame = r#"{"at":"2026-06-01T02:55:39Z","kind":"deploy_failed","tier":"a","node":"testnot-1","version":"0.8.12","failure":{"kind":"rsync_failed","detail":"rsync: write failed: No space left on device (28)"}}"#;
    let rendered = format_event(frame).unwrap();
    for needle in ["a -> testnot-1", "FAILED", "rsync", "No space left"] {
        assert!(rendered.contains(needle), "got: {rendered}");
    }
}
984
/// `node_unreachable` deploy failures render with the short
/// "node unreachable" label.
#[test]
fn format_event_deploy_failed_node_unreachable_short_form() {
    // NodeUnreachable renders as a stable short label; the operator
    // doesn't need the full ssh chain in the events log.
    let frame = r#"{"at":"2026-06-01T02:55:39Z","kind":"deploy_failed","tier":"a","node":"testnot-1","version":"0.8.12","failure":{"kind":"node_unreachable","detail":"Connection refused"}}"#;
    let rendered = format_event(frame).unwrap();
    assert!(rendered.contains("node unreachable"), "got: {rendered}");
}
995
/// An event whose `kind` is not recognised yields `None`.
#[test]
fn format_event_unknown_kind_returns_none() {
    // Step 5: typed parse rejects unknown kinds. The caller falls
    // back to the raw text (see events_subscriber's
    // `unwrap_or_else(|| t.to_string())`), so unknown events still
    // appear in the log; they just don't get the prettified line.
    let rendered = format_event(r#"{"at":"2026-06-01T02:55:39Z","kind":"new_thing","whatever":42}"#);
    assert_eq!(rendered, None);
}
1005
/// Malformed frames yield `None`; the caller falls back to the raw text.
#[test]
fn format_event_malformed_returns_none() {
    let malformed = [
        // Not JSON at all.
        "not json",
        // Valid JSON, but not an object.
        "[1,2,3]",
        // An object without `kind`.
        r#"{"at":"2026-06-01T02:55:39Z"}"#,
    ];
    for input in malformed {
        assert_eq!(format_event(input), None);
    }
}
1014
/// Pins the `Display` output of every `Action` variant.
#[test]
fn action_display_strings() {
    // These show up verbatim in the events log; the daemon's gate parser
    // doesn't read them, but the operator does. Keep stable.
    let cases = [
        (Action::BackupFetch, "backup/fetch"),
        (Action::Promote { tier: "a".into() }, "promote/a"),
        (Action::Rollback { tier: "b".into() }, "rollback/b"),
        (Action::Confirm { tier: "mm".into() }, "confirm/mm"),
    ];
    for (action, expected) in cases {
        assert_eq!(action.to_string(), expected);
    }
}
1033
/// Input shorter than the limit comes back unchanged.
#[test]
fn truncate_short_returns_same() {
    let out = truncate("hi", 10);
    assert_eq!(out, "hi");
}
1038
/// Input longer than the limit is cut to the limit and gets a single
/// ellipsis character appended.
#[test]
fn truncate_long_appends_ellipsis() {
    let s = "a".repeat(200);
    let out = truncate(&s, 10);
    assert_eq!(out.chars().count(), 11); // 10 chars + ellipsis
    // U+2026 HORIZONTAL ELLIPSIS ("…"), written as an escape so the literal
    // cannot be emptied by an encoding mishap.
    assert!(out.ends_with('\u{2026}'));
}
1046
/// The events ring holds at most `EVENTS_CAP` entries and drops the oldest
/// ones first.
#[test]
fn shared_push_event_caps_at_capacity() {
    let mut shared = Shared::default();
    let pushed = EVENTS_CAP + 50;
    for i in 0..pushed {
        shared.push_event(format!("e{i}"));
    }
    assert_eq!(shared.events.len(), EVENTS_CAP);
    // Oldest 50 were dropped: front should be "e50".
    let oldest = shared.events.front().unwrap();
    assert_eq!(oldest, "e50");
    let newest = shared.events.back().unwrap();
    assert_eq!(newest, &format!("e{}", EVENTS_CAP + 49));
}
1058
1059 // ----- gate tail buffer (Phase C) -----
1060
1061 fn tail_fixture() -> GateTail {
1062 GateTail::new(TierId::new("a"), "0.9.6".parse().unwrap(), GateKind::CargoTest)
1063 }
1064
/// One chunk holding two newline-terminated lines yields two committed
/// lines and no leftover partial.
#[test]
fn gate_tail_splits_chunks_on_newlines() {
    let mut tail = tail_fixture();
    tail.push_chunk("one\ntwo\n");
    assert_eq!(tail.lines.len(), 2);
    for (idx, expected) in ["one", "two"].into_iter().enumerate() {
        assert_eq!(tail.lines[idx], expected);
    }
    assert!(tail.partial.is_empty());
}
1074
/// Text without a terminating `\n` stays in `partial` until a later chunk
/// completes the line.
#[test]
fn gate_tail_buffers_partial_line_across_chunks() {
    // Chunks arrive at tokio read boundaries — a single logical line
    // can span multiple chunks. The tail must wait for `\n` before
    // committing.
    let mut tail = tail_fixture();
    for fragment in ["Compi", "ling foo"] {
        tail.push_chunk(fragment);
    }
    assert!(tail.lines.is_empty());
    assert_eq!(tail.partial, "Compiling foo");

    tail.push_chunk(" v0.1.0\nDone\n");
    assert_eq!(tail.lines.len(), 2);
    assert_eq!(tail.lines[0], "Compiling foo v0.1.0");
    assert_eq!(tail.lines[1], "Done");
}
1090
/// `\r\n` line endings are committed without the trailing `\r`.
#[test]
fn gate_tail_strips_carriage_returns() {
    // Cargo emits `\r\n` on Windows-builds; treat the `\r` as part of
    // the line terminator so the pane doesn't render visible `^M`s.
    let mut tail = tail_fixture();
    tail.push_chunk("hello\r\nworld\r\n");
    for (idx, expected) in ["hello", "world"].into_iter().enumerate() {
        assert_eq!(tail.lines[idx], expected);
    }
}
1100
/// The tail keeps at most `TAIL_LINES_CAP` lines, discarding the oldest.
#[test]
fn gate_tail_caps_lines_at_tail_lines_cap() {
    let mut tail = tail_fixture();
    let pushed = TAIL_LINES_CAP + 50;
    for i in 0..pushed {
        let chunk = format!("line {i}\n");
        tail.push_chunk(&chunk);
    }
    assert_eq!(tail.lines.len(), TAIL_LINES_CAP);
    // The first 50 lines were pushed out, so the oldest survivor is #50.
    let oldest = tail.lines.front().unwrap();
    assert_eq!(oldest, &format!("line {}", 50));
}
1110
/// `finalize` commits any buffered partial line and records the final
/// status.
#[test]
fn gate_tail_finalize_flushes_trailing_partial() {
    // If the gate's last write didn't end in `\n` (panic mid-line,
    // SIGKILL), the partial-buffered tail must still surface on
    // finalize.
    let mut tail = tail_fixture();
    tail.push_chunk("partial without newline");
    assert!(tail.lines.is_empty());

    tail.finalize("failed".into());
    assert_eq!(tail.lines.len(), 1);
    assert_eq!(tail.lines[0], "partial without newline");
    let expected_status = TailStatus::Finished("failed".into());
    assert_eq!(tail.status, expected_status);
}
1124
/// Opening more than `GATE_TAILS_CAP` tails evicts the lowest run ids and
/// leaves focus on the newest run.
#[test]
fn shared_open_tail_evicts_oldest_at_capacity() {
    let mut shared = Shared::default();
    let newest_id = (GATE_TAILS_CAP + 3) as i64;
    for id in 1..=newest_id {
        shared.open_tail(
            GateRunId(id),
            TierId::new("a"),
            "0.9.6".parse().unwrap(),
            GateKind::CargoTest,
        );
    }
    assert_eq!(shared.gate_tails.len(), GATE_TAILS_CAP);
    // The first 3 (run_id 1, 2, 3) were evicted; the smallest key
    // remaining should be 4.
    let smallest = shared.gate_tails.keys().next().unwrap();
    assert_eq!(*smallest, GateRunId(4));
    // focus_run follows the most recent insertion.
    assert_eq!(shared.focus_run, Some(GateRunId((GATE_TAILS_CAP + 3) as i64)));
}
1143
/// A chunk for a run that was never opened must not create a tail entry.
#[test]
fn shared_push_tail_chunk_is_noop_for_unknown_run() {
    // GateLogChunk can arrive before GateStart if a frame lags; just
    // drop it rather than synthesizing a header-less run.
    let mut shared = Shared::default();
    let unknown_run = GateRunId(99);
    shared.push_tail_chunk(unknown_run, "orphaned\n");
    assert!(shared.gate_tails.is_empty());
}
1152
/// Stepping past either end of the run map wraps to the opposite end.
#[test]
fn cycle_focus_wraps_in_both_directions() {
    let mut shared = Shared::default();
    for id in 1..=3i64 {
        shared.open_tail(
            GateRunId(id),
            TierId::new("a"),
            "0.9.6".parse().unwrap(),
            GateKind::CargoTest,
        );
    }
    // Currently focused on the most recent (run 3).
    assert_eq!(shared.focus_run, Some(GateRunId(3)));

    // Forward wraps to the smallest.
    let forward = cycle_focus(&shared.gate_tails, shared.focus_run, 1);
    assert_eq!(forward, Some(GateRunId(1)));

    // Backward from the smallest wraps to the largest.
    let backward = cycle_focus(&shared.gate_tails, Some(GateRunId(1)), -1);
    assert_eq!(backward, Some(GateRunId(3)));
}
1173
/// With no tails there is nothing to focus, whatever the current focus or
/// step direction.
#[test]
fn cycle_focus_handles_empty_map() {
    let shared = Shared::default();
    let without_focus = cycle_focus(&shared.gate_tails, None, 1);
    assert_eq!(without_focus, None);
    let with_stale_focus = cycle_focus(&shared.gate_tails, Some(GateRunId(7)), -1);
    assert_eq!(with_stale_focus, None);
}
1180
/// Log chunks go to the per-run tail buffer; only the start and done
/// frames produce lines in the main events ring.
#[test]
fn dispatch_ws_routes_gate_log_chunks_into_tail_not_events() {
    // The contract: a busy gate (hundreds of GateLogChunks) must NOT
    // evict other events from the main ring. Only GateStart and
    // GateDone get an events-log line.
    let shared = Arc::new(Mutex::new(Shared::default()));

    dispatch_ws_frame(
        &shared,
        r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_start","run_id":17,"tier":"a","version":"0.9.6","gate":"cargo_test"}"#,
    );
    for i in 0..50 {
        let frame = format!(
            r#"{{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":17,"seq":{i},"text":"line {i}\n"}}"#
        );
        dispatch_ws_frame(&shared, &frame);
    }
    dispatch_ws_frame(
        &shared,
        r#"{"at":"2026-06-01T02:55:40Z","kind":"gate_done","run_id":17,"tier":"a","version":"0.9.6","gate":"cargo_test","outcome":{"status":{"kind":"passed","note":{"kind":"tests_passed","duration_s":1}}}}"#,
    );

    let guard = shared.lock().unwrap();
    // Two lines in the events ring: start + done. Zero chunk lines.
    assert_eq!(guard.events.len(), 2);
    assert!(guard.events.front().unwrap().contains("gate_start"));
    assert!(guard.events.back().unwrap().contains("gate_done"));
    // 50 lines in the tail buffer.
    let tail = guard.gate_tails.get(&GateRunId(17)).expect("tail entry");
    assert_eq!(tail.lines.len(), 50);
    assert_eq!(tail.lines.back().unwrap(), "line 49");
    assert_eq!(tail.status, TailStatus::Finished("passed".into()));
}
1209 }
1210