Skip to main content

max / makenotwork

6.2 KB · 167 lines History Blame Raw
//! Event bus for live operator visibility.
//!
//! Sites that previously logged via `tracing::info!` also emit a typed event
//! onto a `broadcast::Sender<EventEnvelope>`. The WS handler at `/events`
//! subscribes to the bus and forwards each envelope to the connected TUI as
//! a JSON text frame.
7
8 use crate::domain::{GateKind, GateRunId, GitSha, NodeId, TierId, Version};
9 use crate::outcome::{DeployFailureKind, GateOutcome};
10 use chrono::{DateTime, Utc};
11 use serde::{Deserialize, Serialize};
12 use tokio::sync::broadcast;
13
/// Capacity of the broadcast channel.
///
/// Slow subscribers that fall behind by more than this many events get
/// `RecvError::Lagged`; the WS handler treats that as a recoverable hiccup,
/// not a disconnect.
///
/// Keep this a power of two. `tokio::sync::broadcast::channel` may allocate
/// more slots than requested (it rounds the capacity up to a power of two).
/// The "more than this many" threshold above is exact only when no rounding
/// happens. The const assertion below enforces that at compile time.
pub const CAPACITY: usize = 256;

// Compile-time guard for the power-of-two invariant documented on `CAPACITY`.
const _: () = assert!(
    CAPACITY.is_power_of_two(),
    "CAPACITY must be a power of two"
);
18
19 pub type EventTx = broadcast::Sender<EventEnvelope>;
20
21 #[derive(Clone, Debug, Serialize, Deserialize)]
22 pub struct EventEnvelope {
23 pub at: DateTime<Utc>,
24 #[serde(flatten)]
25 pub event: Event,
26 }
27
28 #[derive(Clone, Debug, Serialize, Deserialize)]
29 #[serde(tag = "kind", rename_all = "snake_case")]
30 pub enum Event {
31 /// A /rebuild was accepted (post-receive hook or operator).
32 RebuildRequested { sha: GitSha },
33 /// A previous in-flight build was aborted because a newer /rebuild arrived.
34 BuildAborted { sha_aborted: GitSha },
35 BuildStart { sha: GitSha, version: Version },
36 BuildOk { sha: GitSha, version: Version, elapsed_s: u64 },
37 BuildFailed { sha: GitSha, version: Version, elapsed_s: u64 },
38 GateStart {
39 run_id: GateRunId,
40 tier: TierId,
41 version: Version,
42 gate: GateKind,
43 },
44 /// Chunk of combined stdout+stderr from a gate that's currently running.
45 /// `run_id` correlates back to the `GateStart` for the same gate run; the
46 /// TUI uses it to group chunks if it wants a per-run buffer. `seq` is a
47 /// monotonic counter scoped to one run (resets across runs). `text` is a
48 /// UTF-8-lossy slice of bytes — chunks reflect tokio read boundaries, not
49 /// line boundaries; the on-disk log at `outcome.log_ref` is the full,
50 /// byte-exact stream.
51 GateLogChunk {
52 run_id: GateRunId,
53 seq: u32,
54 text: String,
55 },
56 /// `outcome` carries classification, blocker variants, and the log_ref —
57 /// consumers should read `outcome.status` to decide pass/fail/blocked.
58 GateDone {
59 run_id: GateRunId,
60 tier: TierId,
61 version: Version,
62 gate: GateKind,
63 outcome: GateOutcome,
64 },
65 DeployStart { tier: TierId, node: NodeId, version: Version },
66 DeployOk { tier: TierId, node: NodeId, version: Version },
67 DeployFailed {
68 tier: TierId,
69 node: NodeId,
70 version: Version,
71 failure: DeployFailureKind,
72 },
73 PromoteComplete { tier: TierId, version: Version },
74 Rollback { tier: TierId, from: Version, to: Version },
75 /// `source` is an ssh URL, kept freeform on purpose — it's a transport
76 /// detail, not a domain identifier.
77 BackupFetched { source: String, byte_size: i64 },
78 ManualConfirm { tier: TierId, version: Version },
79 }
80
81 pub fn channel() -> EventTx {
82 broadcast::channel(CAPACITY).0
83 }
84
85 /// Send an event without caring whether anyone is listening. The `send` call
86 /// fails only when there are zero subscribers, which is the normal case for
87 /// most operator-tool deployments.
88 pub fn emit(tx: &EventTx, event: Event) {
89 let envelope = EventEnvelope { at: Utc::now(), event };
90 let _ = tx.send(envelope);
91 }
92
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::broadcast::error::RecvError;

    #[test]
    fn channel_starts_with_no_subscribers() {
        // `channel()` discards the receiver that `broadcast::channel` creates,
        // so a fresh bus has nobody listening until someone subscribes.
        let tx = channel();
        assert_eq!(tx.receiver_count(), 0);
    }

    #[test]
    fn emit_with_zero_subscribers_does_not_panic() {
        // The whole point of `let _ = tx.send(...)` is that emitting into an
        // unsubscribed bus is fine. Verify the contract — if this regresses
        // to `.unwrap()` someday, every build/deploy site will start
        // crashing.
        let tx = channel();
        emit(&tx, Event::RebuildRequested { sha: GitSha::parse("abc1234").unwrap() });
        emit(&tx, Event::BackupFetched { source: "x".into(), byte_size: 1 });
    }

    #[tokio::test]
    async fn emit_reaches_a_subscriber() {
        let tx = channel();
        let mut rx = tx.subscribe();
        emit(
            &tx,
            Event::PromoteComplete {
                tier: TierId::new("a"),
                version: "0.8.12".parse().unwrap(),
            },
        );
        match rx.recv().await.expect("envelope").event {
            Event::PromoteComplete { tier, version } => {
                assert_eq!(tier.as_str(), "a");
                assert_eq!(version.to_string(), "0.8.12");
            }
            other => panic!("wrong event kind: {other:?}"),
        }
    }

    // Pure serialization — no `.await`, so no async runtime is needed.
    #[test]
    fn envelope_serializes_with_flat_kind() {
        // Contract for the WS handler + TUI's `format_event`: the JSON has a
        // top-level `kind` field, not nested under `event`. Locking this in.
        let env = EventEnvelope {
            at: Utc::now(),
            event: Event::GateStart {
                run_id: GateRunId(42),
                tier: TierId::new("host"),
                version: "0.8.12".parse().unwrap(),
                gate: GateKind::CargoTest,
            },
        };
        let s = serde_json::to_string(&env).unwrap();
        let v: serde_json::Value = serde_json::from_str(&s).unwrap();
        assert_eq!(v["kind"], "gate_start");
        assert_eq!(v["tier"], "host");
        assert_eq!(v["gate"], "cargo_test");
        // The timestamp sits beside the event fields...
        assert!(v.get("at").is_some());
        // ...and there is no nested `event` object.
        assert!(v.get("event").is_none());
    }

    #[tokio::test]
    async fn lagged_subscriber_observes_recv_error_lagged() {
        // If a subscriber falls behind by more than CAPACITY, the next
        // recv() returns RecvError::Lagged(n) — not Closed, not a panic.
        // The WS handler turns this into a `lagged` envelope.
        const OVERFLOW: usize = 10;
        let tx = channel();
        let mut rx = tx.subscribe();
        for i in 0..(CAPACITY + OVERFLOW) {
            // 7+ hex chars satisfy GitSha::parse; pad i into that shape.
            let sha = GitSha::parse(&format!("{i:0>7x}")).unwrap();
            emit(&tx, Event::RebuildRequested { sha });
        }
        match rx.recv().await {
            Err(RecvError::Lagged(n)) => assert!(n >= OVERFLOW as u64),
            other => panic!("expected Lagged, got {other:?}"),
        }
    }
}
167