//! Generic event bus for live operator visibility. //! //! Sando and Bento both broadcast typed events to a connected TUI over a //! `tokio::sync::broadcast` channel; the WS handler forwards each envelope as //! a JSON text frame. The only thing that differs between the tools is the //! concrete event payload, so the bus is generic over `E`. //! //! The envelope flattens the payload, so a payload enum tagged //! `#[serde(tag = "kind", rename_all = "snake_case")]` serializes with a //! top-level `kind` field plus the variant's fields inline — the wire shape //! the TUIs parse. use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; /// Capacity of the broadcast channel. Subscribers that fall behind by more /// than this many events get `RecvError::Lagged`; WS handlers treat that as a /// recoverable hiccup (emit a `lagged` notice), not a disconnect. pub const CAPACITY: usize = 256; /// An event payload `E` stamped with the time it was emitted. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EventEnvelope { pub at: DateTime, #[serde(flatten)] pub event: E, } /// The broadcast sender every emit site clones. `subscribe()` yields a /// receiver for the WS handler. pub type EventTx = broadcast::Sender>; /// Create a fresh bus. The dropped receiver from `broadcast::channel` is fine /// — emitters don't care whether anyone is listening (see [`emit`]). pub fn channel() -> EventTx { broadcast::channel(CAPACITY).0 } /// Send an event without caring whether anyone is subscribed. `send` errors /// only when there are zero receivers, which is the normal idle case for an /// operator tool, so the error is intentionally dropped. pub fn emit(tx: &EventTx, event: E) { let envelope = EventEnvelope { at: Utc::now(), event }; let _ = tx.send(envelope); } #[cfg(test)] mod tests { use super::*; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(tag = "kind", rename_all = "snake_case")] enum TestEvent { Started { name: String }, Done { code: i32 }, } #[test] fn emit_with_zero_subscribers_does_not_panic() { let tx = channel::(); emit(&tx, TestEvent::Started { name: "x".into() }); } #[tokio::test] async fn emit_reaches_a_subscriber() { let tx = channel::(); let mut rx = tx.subscribe(); emit(&tx, TestEvent::Done { code: 7 }); let env = rx.recv().await.expect("envelope"); assert_eq!(env.event, TestEvent::Done { code: 7 }); } #[test] fn envelope_serializes_with_flat_kind() { let env = EventEnvelope { at: Utc::now(), event: TestEvent::Started { name: "build".into() }, }; let v: serde_json::Value = serde_json::from_str(&serde_json::to_string(&env).unwrap()).unwrap(); assert_eq!(v["kind"], "started"); assert_eq!(v["name"], "build"); // Flattened, not nested under `event`. assert!(v.get("event").is_none()); // `at` is preserved. assert!(v.get("at").is_some()); } #[tokio::test] async fn lagged_subscriber_observes_recv_error_lagged() { let tx = channel::(); let mut rx = tx.subscribe(); for i in 0..(CAPACITY + 10) { emit(&tx, TestEvent::Done { code: i as i32 }); } let err = rx.recv().await.expect_err("expected Lagged"); match err { broadcast::error::RecvError::Lagged(n) => assert!(n >= 10), other => panic!("unexpected error: {other:?}"), } } }