//! sando-tui — operator front-end for sandod. //! //! Layout (top to bottom): //! //! ┌ daemon ─────────────────────────────────────────┐ //! │ sando -> http://...:7766 (ws: connected) │ //! ├ tiers ───────────────────── (arrows to select) │ //! │ tier prov current previous burn-in gates │ //! │ mm yes 0.8.12 - - ... │ //! │ a >yes< 0.8.12 - 02:34Z ... │ ← highlighted //! ├ events (WS /events) ────────────────────────────│ //! │ 02:55:39 backup_fetched 36MB │ //! │ 02:55:40 manual_confirm tier=a v=0.8.12 │ //! ├ tail [17] a/0.9.6 cargo_test — in-flight ───────│ //! │ running 1 test │ //! │ test routes::tests::ok ... ok │ //! ├ status / keys ──────────────────────────────────│ //! │ [p] promote [R] rollback [b] backup [c] confirm [[/]] tail [r] refresh [q] quit │ //! └─────────────────────────────────────────────────┘ //! //! State is refreshed every 2s by a background task. Events arrive in real //! time via WS. Actions are issued as background HTTP POSTs; their result //! shows up in the events log a moment later. use anyhow::{Context, Result}; use crossterm::event::{self, Event as XEvent, KeyCode, KeyModifiers}; use crossterm::terminal::{ disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen, }; use futures_util::StreamExt; use ratatui::prelude::*; use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Row, Table}; use sando_daemon::domain::{GateKind, GateRunId, TierId, Version}; use sando_daemon::events::{Event, EventEnvelope}; use sando_daemon::outcome::{DeployFailureKind, GateBlocker, GateFailure, GateStatus, PassNote}; use serde::Deserialize; use std::collections::{BTreeMap, VecDeque}; use std::io; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::mpsc; // ---------- daemon types ---------- // // `StateView` mirrors the JSON shape of `GET /state` from // `sando_daemon::routes`. 
The TUI deserializes a subset (only what it // renders) — adding new fields on the daemon side is a non-breaking // change as long as the field names stay stable. #[derive(Clone, Debug, Deserialize)] struct StateView { tiers: Vec, } #[derive(Clone, Debug, Deserialize)] struct TierView { name: String, provisioned: bool, #[allow(dead_code)] canary: String, current_version: Option, previous_version: Option, burn_in_started_at: Option, nodes: Vec, gates: Vec, } #[derive(Clone, Debug, Deserialize)] struct GateView { kind: String, #[allow(dead_code)] finished_at: Option, /// `'passed' | 'failed' | 'blocked'` or NULL. NULL = in-flight. #[serde(default)] status: Option, /// Full typed `GateOutcome`, JSON object. We don't deserialize the /// inner shape on the polling path — the events handler does that /// when the gate finishes, since that's when classification actually /// matters for the operator's render. Carried here as opaque JSON. #[serde(default)] #[allow(dead_code)] outcome: Option, #[serde(default)] #[allow(dead_code)] log_ref: Option, } // ---------- app state shared between tasks ---------- #[derive(Default)] struct Shared { state: Option, last_err: Option, events: VecDeque, ws_ok: bool, selected: usize, notice: Option, /// Per-run live-tail buffers keyed by `GateRunId`. Populated by /// `GateStart` / `GateLogChunk` / `GateDone`. Bounded to the most /// recent `GATE_TAILS_CAP` runs (LRU by start time). gate_tails: BTreeMap, /// Run shown in the tail pane. Defaults to the most recently active /// run; updated as new `GateStart` events arrive. focus_run: Option, } const EVENTS_CAP: usize = 200; const GATE_TAILS_CAP: usize = 10; const TAIL_LINES_CAP: usize = 200; /// Per-run live-tail state. Receives `GateLogChunk` bytes and presents /// them as a line-aware ring buffer. 
Chunks are NOT line-aligned at the /// transport — we buffer a trailing partial line across chunks so the UI /// only renders complete lines (and one trailing in-flight line, on /// `close`/finalize). struct GateTail { tier: TierId, version: Version, gate: GateKind, /// Complete lines, oldest first. Newest end of the deque is what the /// pane shows at the bottom. lines: VecDeque, /// Trailing bytes after the last `\n` in any chunk so far. Flushed /// into `lines` on `finalize` (so the operator sees the last line /// even if the gate exited without a final newline). partial: String, status: TailStatus, } #[derive(Clone, Debug, PartialEq, Eq)] enum TailStatus { InFlight, Finished(String), } impl GateTail { fn new(tier: TierId, version: Version, gate: GateKind) -> Self { Self { tier, version, gate, lines: VecDeque::new(), partial: String::new(), status: TailStatus::InFlight, } } /// Append a chunk of stdout/stderr. Splits on `\n`, buffers the /// trailing partial line, caps the ring at `TAIL_LINES_CAP`. fn push_chunk(&mut self, text: &str) { let combined = std::mem::take(&mut self.partial) + text; let mut rest = combined.as_str(); while let Some(idx) = rest.find('\n') { let (line, after) = rest.split_at(idx); self.push_line(line.trim_end_matches('\r').to_string()); rest = &after[1..]; } self.partial = rest.to_string(); } fn push_line(&mut self, line: String) { if self.lines.len() >= TAIL_LINES_CAP { self.lines.pop_front(); } self.lines.push_back(line); } /// Called when `GateDone` arrives. Flushes any trailing partial /// line and records the gate's final status word. 
fn finalize(&mut self, status_word: String) { let trailing = std::mem::take(&mut self.partial); if !trailing.is_empty() { self.push_line(trailing); } self.status = TailStatus::Finished(status_word); } } impl Shared { fn push_event(&mut self, line: String) { if self.events.len() >= EVENTS_CAP { self.events.pop_front(); } self.events.push_back(line); } fn open_tail(&mut self, run_id: GateRunId, tier: TierId, version: Version, gate: GateKind) { if self.gate_tails.len() >= GATE_TAILS_CAP { // BTreeMap is sorted by GateRunId (monotonic from the daemon's // INSERT … RETURNING id), so the smallest key is the oldest run. if let Some((&oldest, _)) = self.gate_tails.iter().next() { self.gate_tails.remove(&oldest); } } self.gate_tails.insert(run_id, GateTail::new(tier, version, gate)); self.focus_run = Some(run_id); } fn push_tail_chunk(&mut self, run_id: GateRunId, text: &str) { if let Some(t) = self.gate_tails.get_mut(&run_id) { t.push_chunk(text); } } fn finalize_tail(&mut self, run_id: GateRunId, status_word: String) { if let Some(t) = self.gate_tails.get_mut(&run_id) { t.finalize(status_word); } } } // ---------- main ---------- fn main() -> Result<()> { let daemon = std::env::var("SANDO_DAEMON").unwrap_or_else(|_| "http://127.0.0.1:7766".into()); let shared = Arc::new(Mutex::new(Shared::default())); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(2) .build()?; // Background pollers + WS subscriber. 
let _g = rt.enter(); rt.spawn(state_poller(daemon.clone(), shared.clone())); rt.spawn(events_subscriber(daemon.clone(), shared.clone())); enable_raw_mode()?; let mut stdout = io::stdout(); crossterm::execute!(stdout, EnterAlternateScreen)?; let backend = CrosstermBackend::new(stdout); let mut term = Terminal::new(backend)?; let res = ui_loop(&mut term, &daemon, &shared, rt.handle()); disable_raw_mode()?; crossterm::execute!(term.backend_mut(), LeaveAlternateScreen)?; term.show_cursor()?; res } // ---------- background tasks ---------- async fn state_poller(daemon: String, shared: Arc>) { let url = format!("{}/state", daemon); let client = reqwest::Client::new(); loop { match client.get(&url).timeout(Duration::from_secs(2)).send().await { Ok(resp) if resp.status().is_success() => match resp.json::().await { Ok(s) => { let mut g = shared.lock().unwrap(); let n = s.tiers.len().saturating_sub(1); if g.selected > n { g.selected = n; } g.state = Some(s); g.last_err = None; } Err(e) => shared.lock().unwrap().last_err = Some(format!("decode: {e}")), }, Ok(resp) => { shared.lock().unwrap().last_err = Some(format!("status {}", resp.status())); } Err(e) => shared.lock().unwrap().last_err = Some(e.to_string()), } tokio::time::sleep(Duration::from_secs(2)).await; } } async fn events_subscriber(daemon: String, shared: Arc>) { let ws_url = ws_url_from(&daemon); loop { match tokio_tungstenite::connect_async(&ws_url).await { Ok((mut socket, _resp)) => { shared.lock().unwrap().ws_ok = true; while let Some(msg) = socket.next().await { match msg { Ok(tokio_tungstenite::tungstenite::Message::Text(t)) => { dispatch_ws_frame(&shared, &t); } Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => break, Err(_) => break, _ => {} } } shared.lock().unwrap().ws_ok = false; } Err(_) => { shared.lock().unwrap().ws_ok = false; } } tokio::time::sleep(Duration::from_secs(3)).await; } } /// Convert the daemon's HTTP URL into the matching WebSocket URL for /events. 
/// Convert the daemon's HTTP URL into the matching WebSocket URL for `/events`.
///
/// Only a *leading* scheme is rewritten (`https://` → `wss://`,
/// `http://` → `ws://`). Anything else is passed through untouched — bad
/// input fails later when `connect_async` actually dials.
///
/// The previous chained `replacen` calls rewrote the first `http://`
/// found anywhere in the string after the `https://` pass, so an
/// `https://…?next=http://…` URL had its query string corrupted.
fn ws_url_from(daemon: &str) -> String {
    let (scheme, rest) = if let Some(rest) = daemon.strip_prefix("https://") {
        ("wss://", rest)
    } else if let Some(rest) = daemon.strip_prefix("http://") {
        ("ws://", rest)
    } else {
        ("", daemon)
    };
    format!("{scheme}{rest}/events")
}
No more `serde_json::Value` /// reflection; the compiler enforces every field name. The `lagged` /// frame is a daemon-emitted ad-hoc envelope that doesn't match the /// `Event` enum (it's `{"kind":"lagged","skipped":N}`), so we handle it /// with a one-shot probe before the typed parse. #[cfg(test)] fn format_event(json: &str) -> Option { if let Some(line) = format_lagged(json) { return Some(line); } let env: EventEnvelope = serde_json::from_str(json).ok()?; Some(format_event_line(&env)) } fn format_lagged(json: &str) -> Option { let v: serde_json::Value = serde_json::from_str(json).ok()?; if v.get("kind").and_then(|x| x.as_str()) != Some("lagged") { return None; } let at = v.get("at").and_then(|x| x.as_str()).unwrap_or(""); let time = at.get(11..19).unwrap_or(""); let skipped = v.get("skipped") .and_then(|x| x.as_u64()) .map(|n| n.to_string()) .unwrap_or_else(|| "?".into()); Some(format!("{time} lagged (skipped {skipped} events)")) } fn event_kind_str(e: &Event) -> &'static str { match e { Event::RebuildRequested { .. } => "rebuild_requested", Event::BuildAborted { .. } => "build_aborted", Event::BuildStart { .. } => "build_start", Event::BuildOk { .. } => "build_ok", Event::BuildFailed { .. } => "build_failed", Event::GateStart { .. } => "gate_start", Event::GateLogChunk { .. } => "gate_log", Event::GateDone { .. } => "gate_done", Event::DeployStart { .. } => "deploy_start", Event::DeployOk { .. } => "deploy_ok", Event::DeployFailed { .. } => "deploy_failed", Event::PromoteComplete { .. } => "promote_complete", Event::Rollback { .. } => "rollback", Event::BackupFetched { .. } => "backup_fetched", Event::ManualConfirm { .. 
} => "manual_confirm", } } fn format_event_body(e: &Event) -> String { match e { Event::RebuildRequested { sha } => format!("sha={}", sha.short()), Event::BuildAborted { sha_aborted } => format!("aborted prev sha={}", sha_aborted.short()), Event::BuildStart { sha, version } => format!("v={version} sha={}", sha.short()), Event::BuildOk { version, elapsed_s, .. } => format!("v={version} ({elapsed_s}s)"), Event::BuildFailed { version, elapsed_s, .. } => format!("v={version} FAILED ({elapsed_s}s)"), Event::GateStart { run_id, tier, version, gate } => format!("[{run_id}] {tier}/{version} {gate} start"), Event::GateLogChunk { run_id, seq, text } => { // Single-line summary used by `format_event` (test path only); // production rendering routes chunks through `dispatch_ws_frame` // into the per-run tail buffer, never the events ring. let one = text.lines().next().unwrap_or("").trim_end(); let truncated: String = one.chars().take(160).collect(); format!("[{run_id} #{seq}] {truncated}") } Event::GateDone { run_id, tier, version, gate, outcome, .. 
} => { let summary = match &outcome.status { GateStatus::Passed { note } => format!("ok ({})", pass_note_short(note)), GateStatus::Failed { failure } => format!("FAIL ({})", failure_short(failure)), GateStatus::Blocked { blocker } => format!("blocked ({})", blocker_short(blocker)), }; format!("[{run_id}] {tier}/{version} {gate} {summary}") } Event::DeployStart { tier, node, version } => format!("{tier} -> {node} v={version}"), Event::DeployOk { tier, node, version } => format!("{tier} -> {node} v={version} ok"), Event::DeployFailed { tier, node, version, failure } => format!("{tier} -> {node} v={version} FAILED: {}", deploy_failure_short(failure)), Event::PromoteComplete { tier, version } => format!("{tier} -> v={version} complete"), Event::Rollback { tier, from, to } => format!("{tier} {from} -> {to}"), Event::BackupFetched { source, byte_size } => format!("backup {byte_size} bytes from {source}"), Event::ManualConfirm { tier, version } => format!("{tier} v={version} confirmed"), } } fn pass_note_short(n: &PassNote) -> String { match n { PassNote::StayedUp { duration_s } => format!("up {duration_s}s"), PassNote::BurnInElapsed { hours } => format!("{hours}h elapsed"), PassNote::Migrated { backup_path } => { // Show just the basename; full path lives in /state if needed. let name = backup_path.rsplit('/').next().unwrap_or(backup_path); format!("migrated {name}") } PassNote::TestsPassed { duration_s } => format!("{duration_s}s"), PassNote::OperatorConfirmed { .. } => "operator".into(), PassNote::Legacy { text } => text.clone(), } } fn failure_short(f: &GateFailure) -> String { match f { GateFailure::CargoTest { failed_count, first_failed: Some(name) } => format!("{failed_count} test(s); first {name}"), GateFailure::CargoTest { failed_count, .. 
} => format!("{failed_count} test(s) failed"), GateFailure::MigrationDrift { migration } => format!("drift {migration}"), GateFailure::MigrationModified { migration } => format!("modified {migration}"), GateFailure::MigrationSqlError { migration, sqlstate: Some(s) } => format!("sql {migration} {s}"), GateFailure::MigrationSqlError { migration, .. } => format!("sql {migration}"), GateFailure::RestoreFailed { reason } => format!("restore: {reason}"), GateFailure::BootPanic { exit_code: Some(c) } => format!("panic exit {c}"), GateFailure::BootPanic { .. } => "panic".into(), GateFailure::BootExitedEarly { exit_code: Some(c) } => format!("exited {c}"), GateFailure::BootExitedEarly { .. } => "exited early".into(), GateFailure::SpawnFailed { message } => format!("spawn: {message}"), GateFailure::Timeout { gate, after_s } => format!("{gate} timeout {after_s}s"), GateFailure::Unclassified { legacy_detail: Some(d) } => { // Truncate aggressively — the events log is single-line. d.lines().next().unwrap_or("").chars().take(80).collect() } GateFailure::Unclassified { .. } => "unclassified".into(), } } /// Pick the one-word mark + color for a gate row in the tiers table. /// `status` is NULL while the gate is in-flight. fn gate_mark_and_style(g: &GateView) -> (&'static str, Style) { match g.status.as_deref() { Some("passed") => ("ok", Style::default().fg(Color::Green)), Some("failed") => ("FAIL", Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)), Some("blocked") => ("blocked", Style::default().fg(Color::Yellow)), Some(_) | None => ("...", Style::default().fg(Color::DarkGray)), } } fn deploy_failure_short(f: &DeployFailureKind) -> String { match f { DeployFailureKind::NodeUnreachable { .. } => "node unreachable".into(), DeployFailureKind::RsyncFailed { detail } => { // Show the rsync exit hint if present (e.g. "(28)" for ENOSPC). 
let first = detail.lines().next().unwrap_or("").trim_end(); format!("rsync: {}", first.chars().take(80).collect::()) } DeployFailureKind::SymlinkSwapFailed { .. } => "symlink swap".into(), DeployFailureKind::ServiceRestartFailed { .. } => "service restart".into(), DeployFailureKind::Unclassified { detail } => detail.lines().next().unwrap_or("").chars().take(80).collect(), } } fn blocker_short(b: &GateBlocker) -> String { match b { GateBlocker::BurnInClockNotStarted => "burn-in clock not started".into(), GateBlocker::BurnInRemaining { hours_remaining, hours_total } => format!("{hours_remaining}h of {hours_total}h remaining"), GateBlocker::AwaitingOperatorConfirmation => "needs operator".into(), GateBlocker::NoBackupAvailable => "no backup".into(), GateBlocker::ScratchDbUrlUnset => "scratch_db_url unset".into(), GateBlocker::ArtifactMissing { version } => format!("no artifact for {version}"), } } // ---------- UI loop ---------- fn ui_loop( term: &mut Terminal, daemon: &str, shared: &Arc>, rt: &tokio::runtime::Handle, ) -> Result<()> { let (action_tx, mut action_rx) = mpsc::channel::(32); // Action dispatcher task: posts to daemon, drops notice on Shared. { let shared = shared.clone(); let daemon = daemon.to_string(); rt.spawn(async move { while let Some(act) = action_rx.recv().await { let res = dispatch_action(&daemon, &act).await; let line = match res { Ok(msg) => format!("[ok] {act}: {msg}"), Err(e) => format!("[err] {act}: {e}"), }; let mut g = shared.lock().unwrap(); g.notice = Some(line.clone()); g.push_event(format!(" action {line}")); } }); } loop { term.draw(|f| draw(f, daemon, shared))?; if event::poll(Duration::from_millis(120))? { if let XEvent::Key(k) = event::read()? 
{ let selected_tier = { let g = shared.lock().unwrap(); g.state .as_ref() .and_then(|s| s.tiers.get(g.selected).cloned()) }; match k.code { KeyCode::Char('q') | KeyCode::Esc => return Ok(()), KeyCode::Char('c') if k.modifiers.contains(KeyModifiers::CONTROL) => { return Ok(()); } KeyCode::Up | KeyCode::Char('k') => { let mut g = shared.lock().unwrap(); g.selected = g.selected.saturating_sub(1); } KeyCode::Down | KeyCode::Char('j') => { let mut g = shared.lock().unwrap(); let max = g .state .as_ref() .map(|s| s.tiers.len().saturating_sub(1)) .unwrap_or(0); if g.selected < max { g.selected += 1; } } KeyCode::Char('r') => { // Nudge: state poller wakes every 2s anyway; this just notes intent. shared.lock().unwrap().notice = Some("refresh on next tick".into()); } KeyCode::Char('b') => { let _ = action_tx.try_send(Action::BackupFetch); } KeyCode::Char('p') => { if let Some(t) = selected_tier { let _ = action_tx.try_send(Action::Promote { tier: t.name }); } } KeyCode::Char('R') => { if let Some(t) = selected_tier { let _ = action_tx.try_send(Action::Rollback { tier: t.name }); } } KeyCode::Char('c') => { if let Some(t) = selected_tier { let _ = action_tx.try_send(Action::Confirm { tier: t.name }); } } KeyCode::Char('[') => { let mut g = shared.lock().unwrap(); g.focus_run = cycle_focus(&g.gate_tails, g.focus_run, -1); } KeyCode::Char(']') => { let mut g = shared.lock().unwrap(); g.focus_run = cycle_focus(&g.gate_tails, g.focus_run, 1); } _ => {} } } } } } // ---------- actions ---------- #[derive(Clone, Debug)] enum Action { BackupFetch, Promote { tier: String }, Rollback { tier: String }, Confirm { tier: String }, } impl std::fmt::Display for Action { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Action::BackupFetch => write!(f, "backup/fetch"), Action::Promote { tier } => write!(f, "promote/{tier}"), Action::Rollback { tier } => write!(f, "rollback/{tier}"), Action::Confirm { tier } => write!(f, "confirm/{tier}"), } } } async fn 
/// Return `s` unchanged when it has at most `n` chars; otherwise its
/// first `n` chars followed by `…`.
///
/// Counts `char`s, not bytes, so multi-byte text is never split inside a
/// code point. The cut position is found in a single scan that stops at
/// char `n`, instead of counting the whole string and then collecting.
fn truncate(s: &str, n: usize) -> String {
    match s.char_indices().nth(n) {
        // Fewer than n + 1 chars: nothing to cut.
        None => s.to_string(),
        Some((cut, _)) => {
            let mut out = String::with_capacity(cut + '…'.len_utf8());
            out.push_str(&s[..cut]);
            out.push('…');
            out
        }
    }
}
}; let header = Paragraph::new(format!("sando -> {daemon} ({ws_label})")) .block(Block::default().title("daemon").borders(Borders::ALL)); f.render_widget(header, chunks[0]); // Tiers table if let Some(s) = g.state.as_ref() { let hr = Row::new(vec![ " ", "tier", "prov", "current", "previous", "burn-in", "nodes", "gates", ]) .style(Style::default().add_modifier(Modifier::BOLD)); let rows: Vec = s .tiers .iter() .enumerate() .map(|(i, t)| { let marker = if i == g.selected { ">" } else { " " }; let gates = if t.gates.is_empty() { Line::from("-") } else { // Step 5: render Passed (green) / Failed (red) / // Blocked (yellow) / in-flight (default) from the // typed `status` column. Falls back to `passed` for // rows written before migration 003 (which the // backfill should have set, but defensive anyway). let mut spans: Vec = Vec::new(); for (idx, g) in t.gates.iter().enumerate() { if idx > 0 { spans.push(Span::raw(" ")); } let (mark, style) = gate_mark_and_style(g); spans.push(Span::raw(format!("{}:", g.kind))); spans.push(Span::styled(mark, style)); } Line::from(spans) }; let cells: Vec = vec![ marker.to_string().into(), t.name.clone().into(), if t.provisioned { "yes".into() } else { "no".into() }, t.current_version.clone().unwrap_or_else(|| "-".into()).into(), t.previous_version.clone().unwrap_or_else(|| "-".into()).into(), t.burn_in_started_at .as_deref() .and_then(|s| s.get(11..19)) .unwrap_or("-") .to_string() .into(), if t.nodes.is_empty() { "-".into() } else { t.nodes.join(",").into() }, gates.into(), ]; let row = Row::new(cells); if i == g.selected { row.style(Style::default().add_modifier(Modifier::REVERSED)) } else { row } }) .collect(); let widths = [ Constraint::Length(2), Constraint::Length(8), Constraint::Length(5), Constraint::Length(12), Constraint::Length(12), Constraint::Length(10), Constraint::Length(20), Constraint::Min(20), ]; let table = Table::new(rows, widths).header(hr).block( Block::default() .title(format!("tiers ({}) ↑/↓ select", 
s.tiers.len())) .borders(Borders::ALL), ); f.render_widget(table, chunks[1]); } else { let msg = g .last_err .clone() .unwrap_or_else(|| "loading...".into()); let placeholder = Paragraph::new(msg).block(Block::default().title("tiers").borders(Borders::ALL)); f.render_widget(placeholder, chunks[1]); } // Events let area = chunks[2]; let visible = area.height.saturating_sub(2) as usize; // minus borders let items: Vec = g .events .iter() .rev() .take(visible) .rev() .map(|line| ListItem::new(line.clone())) .collect(); let events_block = List::new(items).block( Block::default() .title(format!("events ({})", g.events.len())) .borders(Borders::ALL), ); f.render_widget(events_block, area); // Gate tail (per-run live log buffer). let tail_area = chunks[3]; let tail = g.focus_run.and_then(|id| g.gate_tails.get(&id).map(|t| (id, t))); let tail_widget = match tail { Some((id, t)) => { let title_state = match &t.status { TailStatus::InFlight => "in-flight".to_string(), TailStatus::Finished(s) => s.clone(), }; let title = format!( "tail [{}] {}/{} {} — {title_state} ([) prev / (]) next {} of {}", id, t.tier, t.version, t.gate, tail_position(&g.gate_tails, id), g.gate_tails.len(), ); let visible = tail_area.height.saturating_sub(2) as usize; let items: Vec = t.lines .iter() .rev() .take(visible) .rev() .map(|l| ListItem::new(truncate(l, 200))) .collect(); List::new(items).block(Block::default().title(title).borders(Borders::ALL)) } None => { let body = "no recent gate runs"; List::new(vec![ListItem::new(body)]) .block(Block::default().title("tail").borders(Borders::ALL)) } }; f.render_widget(tail_widget, tail_area); // Status / keys let keys = "[p] promote [R] rollback [b] backup [c] confirm [↑↓] select [[/]] tail [r] refresh [q] quit"; let status = if let Some(n) = &g.notice { format!("{n} {keys}") } else if let Some(e) = &g.last_err { format!("error: {e} {keys}") } else { keys.into() }; f.render_widget(Paragraph::new(status), chunks[4]); } /// 1-based index of `id` within the 
/// 1-based index of `id` within the sorted tail map; 0 if not present.
///
/// Generic over the key/value types so the helper does not depend on the
/// daemon's domain types; existing call sites infer them unchanged.
fn tail_position<K: Ord, V>(tails: &BTreeMap<K, V>, id: K) -> usize {
    tails.keys().position(|k| *k == id).map_or(0, |i| i + 1)
}

/// Move `current` by `step` positions through the sorted run map.
/// Wraps at both ends. Returns the new focus, or `None` if the map is
/// empty.
///
/// When `current` is `None` or no longer in the map, stepping starts
/// from the last (largest) key. The index arithmetic is widened to
/// `i64` and uses `rem_euclid`, so no `step` value can overflow.
fn cycle_focus<K: Ord + Copy, V>(
    tails: &BTreeMap<K, V>,
    current: Option<K>,
    step: i32,
) -> Option<K> {
    let len = tails.len();
    if len == 0 {
        return None;
    }
    let idx = current
        .and_then(|c| tails.keys().position(|k| *k == c))
        .unwrap_or(len - 1);
    let target = (idx as i64 + i64::from(step)).rem_euclid(len as i64) as usize;
    tails.keys().nth(target).copied()
}
let s = ws_url_from("garbage://host");
        assert_eq!(s, "garbage://host/events");
    }

    // ----- format_event: one test per event kind the log renders -----

    #[test]
    fn format_event_build_ok() {
        // The rendered line leads with the HH:MM:SS slice of `at` followed by
        // the event kind; version and elapsed time appear in the body.
        let json = r#"{"at":"2026-06-01T02:55:39.123Z","kind":"build_ok","sha":"3dc8dca7a120c4","version":"0.8.12","elapsed_s":132}"#;
        let out = format_event(json).unwrap();
        assert!(out.starts_with("02:55:39 build_ok"));
        assert!(out.contains("v=0.8.12"));
        assert!(out.contains("(132s)"));
    }

    #[test]
    fn format_event_gate_done_passed_uses_pass_note() {
        // Step 5+: GateDone carries a typed `outcome` and `run_id`.
        let pass = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":17,"tier":"mm","version":"0.8.12","gate":"cargo_test","outcome":{"status":{"kind":"passed","note":{"kind":"tests_passed","duration_s":42}}}}"#,
        )
        .unwrap();
        assert!(pass.contains("[17]"), "got: {pass}");
        assert!(pass.contains("mm/0.8.12 cargo_test ok"), "got: {pass}");
        assert!(pass.contains("42s"), "got: {pass}");
    }

    #[test]
    fn format_event_gate_done_failed_uses_failure_variant() {
        // A failed cargo_test gate surfaces the failure count and the first
        // failing test name alongside the FAIL marker.
        let fail = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":18,"tier":"mm","version":"0.8.12","gate":"cargo_test","outcome":{"status":{"kind":"failed","failure":{"kind":"cargo_test","failed_count":3,"first_failed":"foo::bar"}}}}"#,
        )
        .unwrap();
        assert!(fail.contains("FAIL"), "got: {fail}");
        assert!(fail.contains("3 test(s)"), "got: {fail}");
        assert!(fail.contains("foo::bar"), "got: {fail}");
    }

    // `FAIL` stays upper-case in the name to mirror the literal string the
    // assertions below look for; the naming lint is silenced for this one
    // test so the name (and any test filter using it) stays stable.
    #[test]
    #[allow(non_snake_case)]
    fn format_event_gate_done_blocked_is_not_FAIL() {
        // The whole point of the Blocked variant: an unstarted burn-in
        // clock is not a defect, so the TUI renders it distinctly from a
        // genuine failure. No "FAIL" string.
        let blocked = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_done","run_id":19,"tier":"a","version":"0.9.6","gate":"burn_in","outcome":{"status":{"kind":"blocked","blocker":{"kind":"burn_in_clock_not_started"}}}}"#,
        )
        .unwrap();
        assert!(blocked.contains("burn_in blocked"), "got: {blocked}");
        assert!(!blocked.contains("FAIL"), "got: {blocked}");
        assert!(blocked.contains("burn-in clock not started"), "got: {blocked}");
    }

    #[test]
    fn format_event_gate_log_chunk_renders_truncated_line() {
        let line = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":42,"seq":7,"text":" Compiling foo v0.1.0\n"}"#,
        )
        .unwrap();
        assert!(line.contains("[42 #7]"), "got: {line}");
        assert!(line.contains("Compiling foo"), "got: {line}");
        // Trailing newline stripped, no FAIL/ok decorations.
        assert!(!line.ends_with('\n'), "got: {line}");
    }

    #[test]
    fn format_event_gate_log_chunk_truncates_long_input() {
        // 300-char line should be cut to 160 — single-line UX.
        let text = "x".repeat(300);
        let json = format!(
            r#"{{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":1,"seq":0,"text":"{text}"}}"#
        );
        let line = format_event(&json).unwrap();
        // Total line includes timestamp + kind + prefix + the truncated body.
        // The truncated body alone should be ≤ 160 chars of 'x'.
        let x_count = line.chars().filter(|c| *c == 'x').count();
        // Lower bound: without it the upper-bound check would pass vacuously
        // if the chunk body were dropped from the rendered line altogether.
        assert!(x_count > 0, "body missing entirely; full line: {line}");
        assert!(x_count <= 160, "got {x_count} x's; full line: {line}");
    }

    #[test]
    fn format_event_backup_fetched() {
        // Both the byte size and the source URL must be visible to the
        // operator in the rendered line.
        let out = format_event(
            r#"{"at":"2026-06-01T04:00:01Z","kind":"backup_fetched","source":"ssh://backup-puller@alpha-west-1:2200/latest.sql.gz","byte_size":36079398}"#,
        )
        .unwrap();
        assert!(out.starts_with("04:00:01 backup_fetched"));
        assert!(out.contains("36079398"));
        assert!(out.contains("ssh://"));
    }

    #[test]
    fn format_event_deploy_failed_uses_typed_failure() {
        // Step 7: DeployFailed carries a typed DeployFailureKind.
        let out = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"deploy_failed","tier":"a","node":"testnot-1","version":"0.8.12","failure":{"kind":"rsync_failed","detail":"rsync: write failed: No space left on device (28)"}}"#,
        )
        .unwrap();
        assert!(out.contains("a -> testnot-1"), "got: {out}");
        assert!(out.contains("FAILED"), "got: {out}");
        assert!(out.contains("rsync"), "got: {out}");
        assert!(out.contains("No space left"), "got: {out}");
    }

    #[test]
    fn format_event_deploy_failed_node_unreachable_short_form() {
        // NodeUnreachable renders as a stable short label; the operator
        // doesn't need the full ssh chain in the events log.
        let out = format_event(
            r#"{"at":"2026-06-01T02:55:39Z","kind":"deploy_failed","tier":"a","node":"testnot-1","version":"0.8.12","failure":{"kind":"node_unreachable","detail":"Connection refused"}}"#,
        )
        .unwrap();
        assert!(out.contains("node unreachable"), "got: {out}");
    }

    #[test]
    fn format_event_unknown_kind_returns_none() {
        // Step 5: typed parse rejects unknown kinds. The caller falls
        // back to the raw text (see events_subscriber's
        // `unwrap_or_else(|| t.to_string())`), so unknown events still
        // appear in the log; they just don't get the prettified line.
        let json = r#"{"at":"2026-06-01T02:55:39Z","kind":"new_thing","whatever":42}"#;
        assert_eq!(format_event(json), None);
    }

    #[test]
    fn format_event_malformed_returns_none() {
        // Not a JSON object → None; caller falls back to the raw text.
        assert_eq!(format_event("not json"), None);
        assert_eq!(format_event("[1,2,3]"), None);
        // Object without `kind` → None too.
        assert_eq!(format_event(r#"{"at":"2026-06-01T02:55:39Z"}"#), None);
    }

    #[test]
    fn action_display_strings() {
        // These show up verbatim in the events log; the daemon's gate parser
        // doesn't read them, but the operator does. Keep stable.
        assert_eq!(Action::BackupFetch.to_string(), "backup/fetch");
        assert_eq!(
            Action::Promote { tier: "a".into() }.to_string(),
            "promote/a"
        );
        assert_eq!(
            Action::Rollback { tier: "b".into() }.to_string(),
            "rollback/b"
        );
        assert_eq!(
            Action::Confirm { tier: "mm".into() }.to_string(),
            "confirm/mm"
        );
    }

    // ----- truncate / events ring -----

    #[test]
    fn truncate_short_returns_same() {
        // Input shorter than the limit is passed through untouched.
        assert_eq!(truncate("hi", 10), "hi");
    }

    #[test]
    fn truncate_long_appends_ellipsis() {
        let s = "a".repeat(200);
        let out = truncate(&s, 10);
        assert_eq!(out.chars().count(), 11); // 10 chars + ellipsis
        assert!(out.ends_with('…'));
    }

    #[test]
    fn shared_push_event_caps_at_capacity() {
        let mut sh = Shared::default();
        for i in 0..(EVENTS_CAP + 50) {
            sh.push_event(format!("e{i}"));
        }
        assert_eq!(sh.events.len(), EVENTS_CAP);
        // Oldest 50 were dropped: front should be "e50".
        assert_eq!(sh.events.front().unwrap(), "e50");
        assert_eq!(sh.events.back().unwrap(), &format!("e{}", EVENTS_CAP + 49));
    }

    // ----- gate tail buffer (Phase C) -----

    /// Fresh tail for tier `a`, version `0.9.6`, gate `cargo_test`, shared by
    /// the `gate_tail_*` tests below.
    fn tail_fixture() -> GateTail {
        GateTail::new(TierId::new("a"), "0.9.6".parse().unwrap(), GateKind::CargoTest)
    }

    #[test]
    fn gate_tail_splits_chunks_on_newlines() {
        // One chunk holding two complete lines commits both and leaves no
        // partial remainder.
        let mut t = tail_fixture();
        t.push_chunk("one\ntwo\n");
        assert_eq!(t.lines.len(), 2);
        assert_eq!(t.lines[0], "one");
        assert_eq!(t.lines[1], "two");
        assert!(t.partial.is_empty());
    }

    #[test]
    fn gate_tail_buffers_partial_line_across_chunks() {
        // Chunks arrive at tokio read boundaries — a single logical line
        // can span multiple chunks. The tail must wait for `\n` before
        // committing.
        let mut t = tail_fixture();
        t.push_chunk("Compi");
        t.push_chunk("ling foo");
        assert!(t.lines.is_empty());
        assert_eq!(t.partial, "Compiling foo");
        t.push_chunk(" v0.1.0\nDone\n");
        assert_eq!(t.lines.len(), 2);
        assert_eq!(t.lines[0], "Compiling foo v0.1.0");
        assert_eq!(t.lines[1], "Done");
    }

    #[test]
    fn gate_tail_strips_carriage_returns() {
        // Cargo emits `\r\n` on Windows-builds; treat the `\r` as part of
        // the line terminator so the pane doesn't render visible `^M`s.
        let mut t = tail_fixture();
        t.push_chunk("hello\r\nworld\r\n");
        // Pin the count too: a `\r` mistaken for its own terminator would
        // show up as extra (empty) lines rather than as a wrong line body.
        assert_eq!(t.lines.len(), 2);
        assert_eq!(t.lines[0], "hello");
        assert_eq!(t.lines[1], "world");
    }

    #[test]
    fn gate_tail_caps_lines_at_tail_lines_cap() {
        let mut t = tail_fixture();
        for i in 0..(TAIL_LINES_CAP + 50) {
            t.push_chunk(&format!("line {i}\n"));
        }
        assert_eq!(t.lines.len(), TAIL_LINES_CAP);
        // The 50 oldest lines were evicted, so the ring now starts at 50.
        assert_eq!(t.lines.front().unwrap(), "line 50");
    }

    #[test]
    fn gate_tail_finalize_flushes_trailing_partial() {
        // If the gate's last write didn't end in `\n` (panic mid-line,
        // SIGKILL), the partial-buffered tail must still surface on
        // finalize.
        let mut t = tail_fixture();
        t.push_chunk("partial without newline");
        assert!(t.lines.is_empty());
        t.finalize("failed".into());
        assert_eq!(t.lines.len(), 1);
        assert_eq!(t.lines[0], "partial without newline");
        assert_eq!(t.status, TailStatus::Finished("failed".into()));
    }

    #[test]
    fn shared_open_tail_evicts_oldest_at_capacity() {
        let mut sh = Shared::default();
        for i in 0..(GATE_TAILS_CAP + 3) as i64 {
            sh.open_tail(
                GateRunId(i + 1),
                TierId::new("a"),
                "0.9.6".parse().unwrap(),
                GateKind::CargoTest,
            );
        }
        assert_eq!(sh.gate_tails.len(), GATE_TAILS_CAP);
        // The first 3 (run_id 1, 2, 3) were evicted; the smallest key
        // remaining should be 4.
        assert_eq!(*sh.gate_tails.keys().next().unwrap(), GateRunId(4));
        // focus_run follows the most recent insertion.
        assert_eq!(sh.focus_run, Some(GateRunId((GATE_TAILS_CAP + 3) as i64)));
    }

    #[test]
    fn shared_push_tail_chunk_is_noop_for_unknown_run() {
        // GateLogChunk can arrive before GateStart if a frame lags; just
        // drop it rather than synthesizing a header-less run.
        let mut sh = Shared::default();
        sh.push_tail_chunk(GateRunId(99), "orphaned\n");
        assert!(sh.gate_tails.is_empty());
    }

    #[test]
    fn cycle_focus_wraps_in_both_directions() {
        let mut sh = Shared::default();
        for i in 1..=3i64 {
            sh.open_tail(
                GateRunId(i),
                TierId::new("a"),
                "0.9.6".parse().unwrap(),
                GateKind::CargoTest,
            );
        }
        // Currently focused on the most recent (run 3).
        assert_eq!(sh.focus_run, Some(GateRunId(3)));
        // Forward wraps to the smallest.
        let next = cycle_focus(&sh.gate_tails, sh.focus_run, 1);
        assert_eq!(next, Some(GateRunId(1)));
        // Backward from the smallest wraps to the largest.
        let prev = cycle_focus(&sh.gate_tails, Some(GateRunId(1)), -1);
        assert_eq!(prev, Some(GateRunId(3)));
    }

    #[test]
    fn cycle_focus_handles_empty_map() {
        // With no tails there is nothing to focus, regardless of the
        // previous focus or the direction.
        let sh = Shared::default();
        assert_eq!(cycle_focus(&sh.gate_tails, None, 1), None);
        assert_eq!(cycle_focus(&sh.gate_tails, Some(GateRunId(7)), -1), None);
    }

    #[test]
    fn dispatch_ws_routes_gate_log_chunks_into_tail_not_events() {
        // The contract: a busy gate (hundreds of GateLogChunks) must NOT
        // evict other events from the main ring. Only GateStart and
        // GateDone get an events-log line.
        let sh = Arc::new(Mutex::new(Shared::default()));
        let start = r#"{"at":"2026-06-01T02:55:39Z","kind":"gate_start","run_id":17,"tier":"a","version":"0.9.6","gate":"cargo_test"}"#;
        dispatch_ws_frame(&sh, start);
        for i in 0..50 {
            let chunk = format!(
                r#"{{"at":"2026-06-01T02:55:39Z","kind":"gate_log_chunk","run_id":17,"seq":{i},"text":"line {i}\n"}}"#
            );
            dispatch_ws_frame(&sh, &chunk);
        }
        let done = r#"{"at":"2026-06-01T02:55:40Z","kind":"gate_done","run_id":17,"tier":"a","version":"0.9.6","gate":"cargo_test","outcome":{"status":{"kind":"passed","note":{"kind":"tests_passed","duration_s":1}}}}"#;
        dispatch_ws_frame(&sh, done);

        let g = sh.lock().unwrap();
        // Two lines in the events ring: start + done. Zero chunk lines.
        assert_eq!(g.events.len(), 2);
        assert!(g.events.front().unwrap().contains("gate_start"));
        assert!(g.events.back().unwrap().contains("gate_done"));
        // 50 lines in the tail buffer.
        let tail = g.gate_tails.get(&GateRunId(17)).expect("tail entry");
        assert_eq!(tail.lines.len(), 50);
        assert_eq!(tail.lines.back().unwrap(), "line 49");
        assert_eq!(tail.status, TailStatus::Finished("passed".into()));
    }
}