//! Event bus for live operator visibility. //! //! Sites that previously logged via `tracing::info!` also emit a typed event //! onto a `broadcast::Sender`. The WS handler at `/events` //! subscribes to the bus and forwards each envelope to the connected TUI as //! a JSON text frame. use crate::domain::{GateKind, GateRunId, GitSha, NodeId, TierId, Version}; use crate::outcome::{DeployFailureKind, GateOutcome}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; /// Capacity of the broadcast channel. Slow subscribers that fall behind by /// more than this many events get `RecvError::Lagged`; the WS handler treats /// that as a recoverable hiccup, not a disconnect. pub const CAPACITY: usize = 256; pub type EventTx = broadcast::Sender; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EventEnvelope { pub at: DateTime, #[serde(flatten)] pub event: Event, } #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum Event { /// A /rebuild was accepted (post-receive hook or operator). RebuildRequested { sha: GitSha }, /// A previous in-flight build was aborted because a newer /rebuild arrived. BuildAborted { sha_aborted: GitSha }, BuildStart { sha: GitSha, version: Version }, BuildOk { sha: GitSha, version: Version, elapsed_s: u64 }, BuildFailed { sha: GitSha, version: Version, elapsed_s: u64 }, GateStart { run_id: GateRunId, tier: TierId, version: Version, gate: GateKind, }, /// Chunk of combined stdout+stderr from a gate that's currently running. /// `run_id` correlates back to the `GateStart` for the same gate run; the /// TUI uses it to group chunks if it wants a per-run buffer. `seq` is a /// monotonic counter scoped to one run (resets across runs). `text` is a /// UTF-8-lossy slice of bytes — chunks reflect tokio read boundaries, not /// line boundaries; the on-disk log at `outcome.log_ref` is the full, /// byte-exact stream. GateLogChunk { run_id: GateRunId, seq: u32, text: String, }, /// `outcome` carries classification, blocker variants, and the log_ref — /// consumers should read `outcome.status` to decide pass/fail/blocked. GateDone { run_id: GateRunId, tier: TierId, version: Version, gate: GateKind, outcome: GateOutcome, }, DeployStart { tier: TierId, node: NodeId, version: Version }, DeployOk { tier: TierId, node: NodeId, version: Version }, DeployFailed { tier: TierId, node: NodeId, version: Version, failure: DeployFailureKind, }, PromoteComplete { tier: TierId, version: Version }, Rollback { tier: TierId, from: Version, to: Version }, /// `source` is an ssh URL, kept freeform on purpose — it's a transport /// detail, not a domain identifier. BackupFetched { source: String, byte_size: i64 }, ManualConfirm { tier: TierId, version: Version }, } pub fn channel() -> EventTx { broadcast::channel(CAPACITY).0 } /// Send an event without caring whether anyone is listening. The `send` call /// fails only when there are zero subscribers, which is the normal case for /// most operator-tool deployments. pub fn emit(tx: &EventTx, event: Event) { let envelope = EventEnvelope { at: Utc::now(), event }; let _ = tx.send(envelope); } #[cfg(test)] mod tests { use super::*; #[test] fn emit_with_zero_subscribers_does_not_panic() { // The whole point of `let _ = tx.send(...)` is that emitting into an // unsubscribed bus is fine. Verify the contract — if this regresses // to `.unwrap()` someday, every build/deploy site will start // crashing. let tx = channel(); emit(&tx, Event::RebuildRequested { sha: GitSha::parse("abc1234").unwrap() }); emit(&tx, Event::BackupFetched { source: "x".into(), byte_size: 1 }); } #[tokio::test] async fn emit_reaches_a_subscriber() { let tx = channel(); let mut rx = tx.subscribe(); emit(&tx, Event::PromoteComplete { tier: TierId::new("a"), version: "0.8.12".parse().unwrap(), }); let env = rx.recv().await.expect("envelope"); match env.event { Event::PromoteComplete { tier, version } => { assert_eq!(tier.as_str(), "a"); assert_eq!(version.to_string(), "0.8.12"); } _ => panic!("wrong event kind"), } } #[tokio::test] async fn envelope_serializes_with_flat_kind() { // Contract for the WS handler + TUI's `format_event`: the JSON has a // top-level `kind` field, not nested under `event`. Locking this in. let env = EventEnvelope { at: Utc::now(), event: Event::GateStart { run_id: GateRunId(42), tier: TierId::new("host"), version: "0.8.12".parse().unwrap(), gate: GateKind::CargoTest, }, }; let s = serde_json::to_string(&env).unwrap(); let v: serde_json::Value = serde_json::from_str(&s).unwrap(); assert_eq!(v["kind"], "gate_start"); assert_eq!(v["tier"], "host"); assert_eq!(v["gate"], "cargo_test"); // No nested `event` object. assert!(v.get("event").is_none()); } #[tokio::test] async fn lagged_subscriber_observes_recv_error_lagged() { // If a subscriber falls behind by more than CAPACITY, the next // recv() returns RecvError::Lagged(n) — not Closed, not a panic. // The WS handler turns this into a `lagged` envelope. let tx = channel(); let mut rx = tx.subscribe(); for i in 0..(CAPACITY + 10) { // 7+ hex chars satisfy GitSha::parse; pad i into that shape. let sha = GitSha::parse(&format!("{i:0>7x}")).unwrap(); emit(&tx, Event::RebuildRequested { sha }); } let err = rx.recv().await.expect_err("expected Lagged"); match err { tokio::sync::broadcast::error::RecvError::Lagged(n) => assert!(n >= 10), other => panic!("unexpected error: {other:?}"), } } }