Skip to main content

max / makenotwork

3.7 KB · 105 lines History Blame Raw
1 //! Generic event bus for live operator visibility.
2 //!
3 //! Sando and Bento both broadcast typed events to a connected TUI over a
4 //! `tokio::sync::broadcast` channel; the WS handler forwards each envelope as
5 //! a JSON text frame. The only thing that differs between the tools is the
6 //! concrete event payload, so the bus is generic over `E`.
7 //!
8 //! The envelope flattens the payload, so a payload enum tagged
9 //! `#[serde(tag = "kind", rename_all = "snake_case")]` serializes with a
10 //! top-level `kind` field plus the variant's fields inline — the wire shape
11 //! the TUIs parse.
12
13 use chrono::{DateTime, Utc};
14 use serde::{Deserialize, Serialize};
15 use tokio::sync::broadcast;
16
17 /// Capacity of the broadcast channel. Subscribers that fall behind by more
18 /// than this many events get `RecvError::Lagged`; WS handlers treat that as a
19 /// recoverable hiccup (emit a `lagged` notice), not a disconnect.
20 pub const CAPACITY: usize = 256;
21
22 /// An event payload `E` stamped with the time it was emitted.
23 #[derive(Clone, Debug, Serialize, Deserialize)]
24 pub struct EventEnvelope<E> {
25 pub at: DateTime<Utc>,
26 #[serde(flatten)]
27 pub event: E,
28 }
29
30 /// The broadcast sender every emit site clones. `subscribe()` yields a
31 /// receiver for the WS handler.
32 pub type EventTx<E> = broadcast::Sender<EventEnvelope<E>>;
33
34 /// Create a fresh bus. The dropped receiver from `broadcast::channel` is fine
35 /// — emitters don't care whether anyone is listening (see [`emit`]).
36 pub fn channel<E: Clone>() -> EventTx<E> {
37 broadcast::channel(CAPACITY).0
38 }
39
40 /// Send an event without caring whether anyone is subscribed. `send` errors
41 /// only when there are zero receivers, which is the normal idle case for an
42 /// operator tool, so the error is intentionally dropped.
43 pub fn emit<E: Clone>(tx: &EventTx<E>, event: E) {
44 let envelope = EventEnvelope { at: Utc::now(), event };
45 let _ = tx.send(envelope);
46 }
47
48 #[cfg(test)]
49 mod tests {
50 use super::*;
51 use serde::{Deserialize, Serialize};
52
53 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
54 #[serde(tag = "kind", rename_all = "snake_case")]
55 enum TestEvent {
56 Started { name: String },
57 Done { code: i32 },
58 }
59
60 #[test]
61 fn emit_with_zero_subscribers_does_not_panic() {
62 let tx = channel::<TestEvent>();
63 emit(&tx, TestEvent::Started { name: "x".into() });
64 }
65
66 #[tokio::test]
67 async fn emit_reaches_a_subscriber() {
68 let tx = channel::<TestEvent>();
69 let mut rx = tx.subscribe();
70 emit(&tx, TestEvent::Done { code: 7 });
71 let env = rx.recv().await.expect("envelope");
72 assert_eq!(env.event, TestEvent::Done { code: 7 });
73 }
74
75 #[test]
76 fn envelope_serializes_with_flat_kind() {
77 let env = EventEnvelope {
78 at: Utc::now(),
79 event: TestEvent::Started { name: "build".into() },
80 };
81 let v: serde_json::Value =
82 serde_json::from_str(&serde_json::to_string(&env).unwrap()).unwrap();
83 assert_eq!(v["kind"], "started");
84 assert_eq!(v["name"], "build");
85 // Flattened, not nested under `event`.
86 assert!(v.get("event").is_none());
87 // `at` is preserved.
88 assert!(v.get("at").is_some());
89 }
90
91 #[tokio::test]
92 async fn lagged_subscriber_observes_recv_error_lagged() {
93 let tx = channel::<TestEvent>();
94 let mut rx = tx.subscribe();
95 for i in 0..(CAPACITY + 10) {
96 emit(&tx, TestEvent::Done { code: i as i32 });
97 }
98 let err = rx.recv().await.expect_err("expected Lagged");
99 match err {
100 broadcast::error::RecvError::Lagged(n) => assert!(n >= 10),
101 other => panic!("unexpected error: {other:?}"),
102 }
103 }
104 }
105