//! bento-tui — operator front-end for bentod. //! //! Layout (top to bottom): //! //! ┌ daemon ─────────────────────────────────────────────────────┐ //! │ bento -> http://...:7800 (ws ok) build: goingson 0.4.1 │ //! ├ matrix ── target x step (↑/↓ select target) ────────────────│ //! │ target chk pre bld sgn ntz stp vfy pkg pub col │ //! │>macos/aarch64 O O > . . . . . . . │ //! │ linux/x86_64 O O O - - O - - O O │ //! ├ events (WS /events) ────────────────────────────────────────│ //! │ 09:21:03 target_start linux/x86_64 │ //! ├ tail [42] linux/x86_64 build — in-flight ───────────────────│ //! │ Compiling goingson v0.4.1 │ //! ├ status / keys ──────────────────────────────────────────────│ //! │ [b] build [R] retry target [↑↓] select [[/]] tail [q] quit│ //! └─────────────────────────────────────────────────────────────┘ use anyhow::{Context, Result}; use crossterm::event::{self, Event as XEvent, KeyCode, KeyModifiers}; use crossterm::terminal::{ EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode, }; use futures_util::StreamExt; use ratatui::prelude::*; use ratatui::widgets::{Block, Borders, Cell, List, ListItem, Paragraph, Row, Table}; use bento_daemon::domain::{Step, StepRunId}; use bento_daemon::events::{Event, EventEnvelope}; use serde::Deserialize; use std::collections::{BTreeMap, VecDeque}; use std::io; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::mpsc; // ---------- daemon types (subset of GET /state) ---------- #[derive(Clone, Debug, Deserialize)] struct StateView { build: Option, } #[derive(Clone, Debug, Deserialize)] struct BuildView { #[allow(dead_code)] id: i64, app: String, version: String, status: String, targets: Vec, } #[derive(Clone, Debug, Deserialize)] struct TargetView { target: String, status: String, #[allow(dead_code)] current_step: Option, error: Option, steps: Vec, } #[derive(Clone, Debug, Deserialize)] struct StepView { #[allow(dead_code)] run_id: i64, step: String, status: String, } /// Short column headers for the matrix, in canonical step order. const STEP_COLS: [(Step, &str); 10] = [ (Step::Checkout, "chk"), (Step::Prebuild, "pre"), (Step::Build, "bld"), (Step::Sign, "sgn"), (Step::Notarize, "ntz"), (Step::Staple, "stp"), (Step::Verify, "vfy"), (Step::Package, "pkg"), (Step::Publish, "pub"), (Step::Collect, "col"), ]; // ---------- shared app state ---------- #[derive(Default)] struct Shared { state: Option, last_err: Option, events: VecDeque, ws_ok: bool, selected: usize, notice: Option, tails: BTreeMap, focus_run: Option, } const EVENTS_CAP: usize = 200; const TAILS_CAP: usize = 10; const TAIL_LINES_CAP: usize = 200; /// Per-step live-tail buffer. Receives `StepLogChunk` text and presents it as a /// line-aware ring; chunks are not line-aligned at the transport, so a trailing /// partial line is buffered across chunks. struct StepTail { target: String, step: String, lines: VecDeque, partial: String, status: TailStatus, } #[derive(Clone, PartialEq, Eq)] enum TailStatus { InFlight, Finished(String), } impl StepTail { fn new(target: String, step: String) -> Self { Self { target, step, lines: VecDeque::new(), partial: String::new(), status: TailStatus::InFlight, } } fn push_chunk(&mut self, text: &str) { let combined = std::mem::take(&mut self.partial) + text; let mut rest = combined.as_str(); while let Some(idx) = rest.find('\n') { let (line, after) = rest.split_at(idx); self.push_line(line.trim_end_matches('\r').to_string()); rest = &after[1..]; } self.partial = rest.to_string(); } fn push_line(&mut self, line: String) { if self.lines.len() >= TAIL_LINES_CAP { self.lines.pop_front(); } self.lines.push_back(line); } fn finalize(&mut self, status_word: String) { let trailing = std::mem::take(&mut self.partial); if !trailing.is_empty() { self.push_line(trailing); } self.status = TailStatus::Finished(status_word); } } impl Shared { fn push_event(&mut self, line: String) { if self.events.len() >= EVENTS_CAP { self.events.pop_front(); } self.events.push_back(line); } fn open_tail(&mut self, run_id: StepRunId, target: String, step: String) { if self.tails.len() >= TAILS_CAP { if let Some((&oldest, _)) = self.tails.iter().next() { self.tails.remove(&oldest); } } self.tails.insert(run_id, StepTail::new(target, step)); self.focus_run = Some(run_id); } fn push_tail_chunk(&mut self, run_id: StepRunId, text: &str) { if let Some(t) = self.tails.get_mut(&run_id) { t.push_chunk(text); } } fn finalize_tail(&mut self, run_id: StepRunId, status_word: String) { if let Some(t) = self.tails.get_mut(&run_id) { t.finalize(status_word); } } /// App to act on: the latest build's app, else the env default. fn app_name(&self, default_app: &str) -> String { self.state .as_ref() .and_then(|s| s.build.as_ref()) .map(|b| b.app.clone()) .unwrap_or_else(|| default_app.to_string()) } fn selected_target(&self) -> Option { self.state .as_ref() .and_then(|s| s.build.as_ref()) .and_then(|b| b.targets.get(self.selected)) .map(|t| t.target.clone()) } } // ---------- main ---------- fn main() -> Result<()> { let daemon = std::env::var("BENTO_DAEMON").unwrap_or_else(|_| "http://127.0.0.1:7800".into()); let default_app = std::env::var("BENTO_APP").unwrap_or_else(|_| "goingson".into()); let shared = Arc::new(Mutex::new(Shared::default())); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(2) .build()?; let _g = rt.enter(); rt.spawn(state_poller(daemon.clone(), shared.clone())); rt.spawn(events_subscriber(daemon.clone(), shared.clone())); enable_raw_mode()?; let mut stdout = io::stdout(); crossterm::execute!(stdout, EnterAlternateScreen)?; let backend = CrosstermBackend::new(stdout); let mut term = Terminal::new(backend)?; let res = ui_loop(&mut term, &daemon, &default_app, &shared, rt.handle()); disable_raw_mode()?; crossterm::execute!(term.backend_mut(), LeaveAlternateScreen)?; term.show_cursor()?; res } // ---------- background tasks ---------- async fn state_poller(daemon: String, shared: Arc>) { let url = format!("{daemon}/state"); let client = reqwest::Client::new(); loop { match client.get(&url).timeout(Duration::from_secs(2)).send().await { Ok(resp) if resp.status().is_success() => match resp.json::().await { Ok(s) => { let mut g = shared.lock().unwrap(); let n = s .build .as_ref() .map(|b| b.targets.len().saturating_sub(1)) .unwrap_or(0); if g.selected > n { g.selected = n; } g.state = Some(s); g.last_err = None; } Err(e) => shared.lock().unwrap().last_err = Some(format!("decode: {e}")), }, Ok(resp) => shared.lock().unwrap().last_err = Some(format!("status {}", resp.status())), Err(e) => shared.lock().unwrap().last_err = Some(e.to_string()), } tokio::time::sleep(Duration::from_secs(2)).await; } } async fn events_subscriber(daemon: String, shared: Arc>) { let ws_url = ws_url_from(&daemon); loop { match tokio_tungstenite::connect_async(&ws_url).await { Ok((mut socket, _resp)) => { shared.lock().unwrap().ws_ok = true; while let Some(msg) = socket.next().await { match msg { Ok(tokio_tungstenite::tungstenite::Message::Text(t)) => { dispatch_ws_frame(&shared, &t) } Ok(tokio_tungstenite::tungstenite::Message::Close(_)) | Err(_) => break, _ => {} } } shared.lock().unwrap().ws_ok = false; } Err(_) => shared.lock().unwrap().ws_ok = false, } tokio::time::sleep(Duration::from_secs(3)).await; } } fn ws_url_from(daemon: &str) -> String { daemon.replacen("https://", "wss://", 1).replacen("http://", "ws://", 1) + "/events" } /// Route a WS frame: log chunks update the per-step tail; everything else /// becomes one line in the events ring. (Chunks never hit the ring — a busy /// build emits hundreds per second and would evict every other event.) fn dispatch_ws_frame(shared: &Arc>, raw: &str) { if let Some(line) = format_lagged(raw) { shared.lock().unwrap().push_event(line); return; } let Ok(env) = serde_json::from_str::(raw) else { shared.lock().unwrap().push_event(raw.to_string()); return; }; match &env.event { Event::StepStart { run_id, target, step, .. } => { let mut g = shared.lock().unwrap(); g.open_tail(*run_id, target.to_string(), step.to_string()); let line = format_event_line(&env); g.push_event(line); } Event::StepLogChunk { run_id, text, .. } => { shared.lock().unwrap().push_tail_chunk(*run_id, text); } Event::StepDone { run_id, status, .. } => { let word = status.as_str().to_string(); let mut g = shared.lock().unwrap(); g.finalize_tail(*run_id, word); let line = format_event_line(&env); g.push_event(line); } _ => { let line = format_event_line(&env); shared.lock().unwrap().push_event(line); } } } fn format_event_line(env: &EventEnvelope) -> String { let time = env.at.format("%H:%M:%S").to_string(); format!("{time} {}", format_event_body(&env.event)) } fn format_event_body(e: &Event) -> String { match e { Event::BuildRequested { app, version, targets } => { format!("build_requested {app} {version} [{}]", targets.iter().map(|t| t.to_string()).collect::>().join(",")) } Event::TargetAborted { target, .. } => format!("target_aborted {target}"), Event::TargetStart { target, .. } => format!("target_start {target}"), Event::StepStart { target, step, .. } => format!("step_start {target} {step}"), Event::StepLogChunk { run_id, text, .. } => format!("log[{run_id}] {}", truncate(text.trim_end(), 80)), Event::StepDone { target, step, status, .. } => format!("step_done {target} {step} {}", status.as_str()), Event::TargetOk { target, artifacts, .. } => format!("target_ok {target} ({} artifacts)", artifacts.len()), Event::TargetFailed { target, step, error, .. } => format!("target_failed {target} {step}: {}", truncate(error, 80)), Event::NotarizeRetry { target, attempt, reason, .. } => format!("notarize_retry {target} #{attempt} {reason}"), Event::ArtifactCollected { target, path, bytes, .. } => format!("artifact {target} {path} {bytes}B"), Event::PublishOk { target, channel, .. } => format!("publish_ok {target} {channel}"), Event::PublishFailed { target, channel, error, .. } => format!("publish_failed {target} {channel}: {}", truncate(error, 80)), } } fn format_lagged(raw: &str) -> Option { let v: serde_json::Value = serde_json::from_str(raw).ok()?; if v.get("kind")?.as_str()? == "lagged" { Some(format!("(lagged, skipped {})", v.get("skipped").and_then(|n| n.as_i64()).unwrap_or(0))) } else { None } } // ---------- actions ---------- #[derive(Clone, Debug)] enum Action { Build { app: String }, Retry { app: String, target: String }, } impl std::fmt::Display for Action { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Action::Build { app } => write!(f, "build {app}"), Action::Retry { app, target } => write!(f, "retry {app}/{target}"), } } } async fn dispatch_action(daemon: &str, act: &Action) -> Result { let client = reqwest::Client::new(); let (url, body) = match act { Action::Build { app } => (format!("{daemon}/build"), serde_json::json!({ "app": app })), Action::Retry { app, target } => { (format!("{daemon}/retry"), serde_json::json!({ "app": app, "target": target })) } }; let resp = client .post(&url) .timeout(Duration::from_secs(30)) .json(&body) .send() .await .context("send")?; let status = resp.status(); let text = resp.text().await.unwrap_or_default(); if status.is_success() { Ok(truncate(&text, 120)) } else { Err(anyhow::anyhow!("HTTP {status}: {}", truncate(&text, 200))) } } // ---------- ui loop ---------- fn ui_loop( term: &mut Terminal, daemon: &str, default_app: &str, shared: &Arc>, rt: &tokio::runtime::Handle, ) -> Result<()> { let (action_tx, mut action_rx) = mpsc::channel::(32); { let shared = shared.clone(); let daemon = daemon.to_string(); rt.spawn(async move { while let Some(act) = action_rx.recv().await { let res = dispatch_action(&daemon, &act).await; let line = match res { Ok(msg) => format!("[ok] {act}: {msg}"), Err(e) => format!("[err] {act}: {e}"), }; let mut g = shared.lock().unwrap(); g.notice = Some(line.clone()); g.push_event(format!(" action {line}")); } }); } loop { term.draw(|f| draw(f, daemon, shared))?; if event::poll(Duration::from_millis(120))? { if let XEvent::Key(k) = event::read()? { match k.code { KeyCode::Char('q') | KeyCode::Esc => return Ok(()), KeyCode::Char('c') if k.modifiers.contains(KeyModifiers::CONTROL) => { return Ok(()); } KeyCode::Up | KeyCode::Char('k') => { let mut g = shared.lock().unwrap(); g.selected = g.selected.saturating_sub(1); } KeyCode::Down | KeyCode::Char('j') => { let mut g = shared.lock().unwrap(); let max = g .state .as_ref() .and_then(|s| s.build.as_ref()) .map(|b| b.targets.len().saturating_sub(1)) .unwrap_or(0); if g.selected < max { g.selected += 1; } } KeyCode::Char('b') => { let app = shared.lock().unwrap().app_name(default_app); let _ = action_tx.try_send(Action::Build { app }); } KeyCode::Char('R') => { let (app, target) = { let g = shared.lock().unwrap(); (g.app_name(default_app), g.selected_target()) }; if let Some(target) = target { let _ = action_tx.try_send(Action::Retry { app, target }); } } KeyCode::Char('r') => { shared.lock().unwrap().notice = Some("refresh on next tick".into()); } KeyCode::Char('[') => { let mut g = shared.lock().unwrap(); g.focus_run = cycle_focus(&g.tails, g.focus_run, -1); } KeyCode::Char(']') => { let mut g = shared.lock().unwrap(); g.focus_run = cycle_focus(&g.tails, g.focus_run, 1); } _ => {} } } } } } // ---------- render ---------- fn draw(f: &mut Frame, daemon: &str, shared: &Arc>) { let g = shared.lock().unwrap(); let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ Constraint::Length(3), // header Constraint::Length(10), // matrix Constraint::Min(4), // events Constraint::Length(8), // tail Constraint::Length(2), // status ]) .split(f.area()); let ws_label = if g.ws_ok { "ws ok" } else { "ws ..." }; let build_label = g .state .as_ref() .and_then(|s| s.build.as_ref()) .map(|b| format!("{} {} ({})", b.app, b.version, b.status)) .unwrap_or_else(|| "no builds yet".into()); let header = Paragraph::new(format!("bento -> {daemon} ({ws_label}) build: {build_label}")) .block(Block::default().title("daemon").borders(Borders::ALL)); f.render_widget(header, chunks[0]); // Matrix: rows = targets, columns = steps. match g.state.as_ref().and_then(|s| s.build.as_ref()) { Some(b) if !b.targets.is_empty() => { let mut header_cells = vec![" target".to_string()]; header_cells.extend(STEP_COLS.iter().map(|(_, abbr)| abbr.to_string())); let hr = Row::new(header_cells).style(Style::default().add_modifier(Modifier::BOLD)); let rows: Vec = b .targets .iter() .enumerate() .map(|(i, t)| { let marker = if i == g.selected { ">" } else { " " }; let mut cells: Vec = vec![format!("{marker} {}", t.target).into()]; for (step, _) in STEP_COLS { let status = t .steps .iter() .find(|s| s.step == step.as_str()) .map(|s| s.status.as_str()); let (mark, style) = step_mark(status); cells.push(Cell::from(mark).style(style)); } let row = Row::new(cells); if i == g.selected { row.style(Style::default().add_modifier(Modifier::REVERSED)) } else { row } }) .collect(); let mut widths = vec![Constraint::Length(20)]; widths.extend(STEP_COLS.iter().map(|_| Constraint::Length(4))); let table = Table::new(rows, widths).header(hr).block( Block::default() .title(format!("matrix ({} targets) ↑/↓ select", b.targets.len())) .borders(Borders::ALL), ); f.render_widget(table, chunks[1]); } _ => { let msg = g.last_err.clone().unwrap_or_else(|| "no active build — press [b] to start".into()); let placeholder = Paragraph::new(msg).block(Block::default().title("matrix").borders(Borders::ALL)); f.render_widget(placeholder, chunks[1]); } } // Events. let area = chunks[2]; let visible = area.height.saturating_sub(2) as usize; let items: Vec = g.events.iter().rev().take(visible).rev().map(|l| ListItem::new(l.clone())).collect(); let events_block = List::new(items).block( Block::default().title(format!("events ({})", g.events.len())).borders(Borders::ALL), ); f.render_widget(events_block, area); // Tail. let tail_area = chunks[3]; let tail = g.focus_run.and_then(|id| g.tails.get(&id).map(|t| (id, t))); let tail_widget = match tail { Some((id, t)) => { let state = match &t.status { TailStatus::InFlight => "in-flight".to_string(), TailStatus::Finished(s) => s.clone(), }; let title = format!( "tail [{id}] {}/{} — {state} ([) prev / (]) next {} of {}", t.target, t.step, tail_position(&g.tails, id), g.tails.len(), ); let visible = tail_area.height.saturating_sub(2) as usize; let items: Vec = t.lines.iter().rev().take(visible).rev().map(|l| ListItem::new(truncate(l, 200))).collect(); List::new(items).block(Block::default().title(title).borders(Borders::ALL)) } None => List::new(vec![ListItem::new("no recent step runs")]) .block(Block::default().title("tail").borders(Borders::ALL)), }; f.render_widget(tail_widget, tail_area); // Status / keys. Surface the selected target's error if it failed. let keys = "[b] build [R] retry target [↑↓] select [[/]] tail [r] refresh [q] quit"; let sel_err = g .state .as_ref() .and_then(|s| s.build.as_ref()) .and_then(|b| b.targets.get(g.selected)) .filter(|t| t.status == "failed") .and_then(|t| t.error.clone()); let status = if let Some(e) = sel_err { format!("FAILED: {} {keys}", truncate(&e, 100)) } else if let Some(n) = &g.notice { format!("{n} {keys}") } else if let Some(e) = &g.last_err { format!("error: {e} {keys}") } else { keys.into() }; f.render_widget(Paragraph::new(status), chunks[4]); } /// Status -> (mark, style). `None` = no row yet (step not reached). fn step_mark(status: Option<&str>) -> (&'static str, Style) { match status { Some("ok") => ("O", Style::default().fg(Color::Green)), Some("failed") => ("X", Style::default().fg(Color::Red)), Some("running") => (">", Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD)), Some("pending") => (".", Style::default().fg(Color::DarkGray)), _ => ("-", Style::default().fg(Color::DarkGray)), } } fn tail_position(tails: &BTreeMap, id: StepRunId) -> usize { tails.keys().position(|k| *k == id).map(|i| i + 1).unwrap_or(0) } fn cycle_focus( tails: &BTreeMap, current: Option, step: i32, ) -> Option { if tails.is_empty() { return None; } let keys: Vec = tails.keys().copied().collect(); let idx = current .and_then(|c| keys.iter().position(|k| *k == c)) .unwrap_or(keys.len().saturating_sub(1)); let len = keys.len() as i32; let new_idx = ((idx as i32 + step) % len + len) % len; Some(keys[new_idx as usize]) } fn truncate(s: &str, n: usize) -> String { if s.chars().count() <= n { s.into() } else { s.chars().take(n).collect::() + "…" } } #[cfg(test)] mod tests { use super::*; #[test] fn ws_url_swaps_scheme_once() { assert_eq!(ws_url_from("http://x:7800"), "ws://x:7800/events"); assert_eq!(ws_url_from("https://x"), "wss://x/events"); } #[test] fn step_mark_colors() { assert_eq!(step_mark(Some("ok")).0, "O"); assert_eq!(step_mark(Some("failed")).0, "X"); assert_eq!(step_mark(Some("running")).0, ">"); assert_eq!(step_mark(None).0, "-"); } #[test] fn tail_chunk_reassembles_lines() { let mut t = StepTail::new("linux/x86_64".into(), "build".into()); t.push_chunk("Compil"); t.push_chunk("ing\nDone\n"); assert_eq!(t.lines, vec!["Compiling".to_string(), "Done".to_string()]); t.finalize("ok".into()); assert_eq!(t.status, TailStatus::Finished("ok".into())); } #[test] fn cycle_focus_wraps() { let mut tails = BTreeMap::new(); tails.insert(StepRunId(1), StepTail::new("a".into(), "build".into())); tails.insert(StepRunId(2), StepTail::new("b".into(), "sign".into())); assert_eq!(cycle_focus(&tails, Some(StepRunId(2)), 1), Some(StepRunId(1))); assert_eq!(cycle_focus(&tails, Some(StepRunId(1)), -1), Some(StepRunId(2))); assert_eq!(cycle_focus(&BTreeMap::new(), None, 1), None); } #[test] fn format_lagged_detects() { assert!(format_lagged(r#"{"kind":"lagged","skipped":5}"#).unwrap().contains("5")); assert!(format_lagged(r#"{"kind":"step_start"}"#).is_none()); } }