Skip to main content

max / makenotwork

6.4 KB · 173 lines History Blame Raw
1 //! Live gate-log sink.
2 //!
3 //! Wraps the on-disk per-run log file with chunk broadcasting. Gate
4 //! runners (`cargo_test`, `boot_smoke`) push their stdout/stderr through
5 //! `LiveLog::write_chunk` as it arrives; the sink:
6 //! 1. appends to the on-disk file at `<logs_root>/<version>/<gate>.log`
7 //! so the post-mortem `GET /logs/...` route still has the full byte
8 //! stream;
9 //! 2. broadcasts a `GateLogChunk` event with a UTF-8-lossy slice of the
10 //! same bytes, so the TUI sees the tail in real time.
11 //!
12 //! Chunks reflect tokio read boundaries — they are NOT line-aligned.
13 //! Consumers that want lines must reassemble; the on-disk log preserves
14 //! the exact byte stream for that purpose.
15
16 use crate::domain::GateRunId;
17 use crate::events::{self, Event, EventTx};
18 use std::path::{Path, PathBuf};
19 use tokio::fs::File;
20 use tokio::io::AsyncWriteExt;
21
22 pub struct LiveLog {
23 file: Option<File>,
24 /// Kept for diagnostic logging when file IO is unavailable.
25 path: PathBuf,
26 events: EventTx,
27 run_id: GateRunId,
28 seq: u32,
29 }
30
31 impl LiveLog {
32 /// Open the log file for append-streaming. Creates parent directories
33 /// as needed. If the file can't be opened, the sink degrades to
34 /// "broadcast only" — chunks still go out as `GateLogChunk` events;
35 /// the missing on-disk log is logged as a warning. This matches the
36 /// pre-step-6 invariant: a broken log dir doesn't turn a passing gate
37 /// red.
38 pub async fn open(events: EventTx, run_id: GateRunId, path: PathBuf) -> Self {
39 let file = open_for_append(&path).await;
40 Self { file, path, events, run_id, seq: 0 }
41 }
42
43 /// Append `bytes` to the on-disk log and broadcast a `GateLogChunk`.
44 /// The broadcast goes out even if the disk write fails — operators
45 /// watching live still see the chunk.
46 pub async fn write_chunk(&mut self, bytes: &[u8]) {
47 if bytes.is_empty() { return; }
48 if let Some(f) = self.file.as_mut() {
49 if let Err(e) = f.write_all(bytes).await {
50 tracing::warn!(error = %e, path = %self.path.display(), "live log write failed");
51 self.file = None;
52 }
53 }
54 let text = String::from_utf8_lossy(bytes).into_owned();
55 events::emit(&self.events, Event::GateLogChunk {
56 run_id: self.run_id,
57 seq: self.seq,
58 text,
59 });
60 self.seq = self.seq.saturating_add(1);
61 }
62
63 /// Flush and close the file. Best-effort: errors are logged.
64 pub async fn close(mut self) {
65 if let Some(mut f) = self.file.take() {
66 if let Err(e) = f.flush().await {
67 tracing::warn!(error = %e, path = %self.path.display(), "live log flush failed");
68 }
69 }
70 }
71
72 pub fn run_id(&self) -> GateRunId { self.run_id }
73 pub fn chunks_emitted(&self) -> u32 { self.seq }
74 }
75
76 /// Lets a Sando `LiveLog` be used directly as an [`ops_exec::LogSink`], so a
77 /// gate or deploy run through `executor.run_streaming` streams into the same
78 /// on-disk file + `GateLogChunk` broadcast the bespoke runner used. Delegates
79 /// to the inherent [`LiveLog::write_chunk`] (inherent methods take precedence,
80 /// so this is not recursive).
81 #[async_trait::async_trait]
82 impl ops_exec::LogSink for LiveLog {
83 async fn write_chunk(&mut self, bytes: &[u8]) {
84 self.write_chunk(bytes).await;
85 }
86 }
87
88 async fn open_for_append(path: &Path) -> Option<File> {
89 if let Some(parent) = path.parent() {
90 if let Err(e) = tokio::fs::create_dir_all(parent).await {
91 tracing::warn!(error = %e, dir = %parent.display(), "could not create gate log dir");
92 return None;
93 }
94 }
95 match tokio::fs::OpenOptions::new()
96 .create(true)
97 .append(true)
98 .open(path)
99 .await
100 {
101 Ok(f) => Some(f),
102 Err(e) => {
103 tracing::warn!(error = %e, path = %path.display(), "could not open gate log file");
104 None
105 }
106 }
107 }
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::EventEnvelope;

    /// Two chunks end up concatenated on disk and are broadcast as two
    /// events with consecutive sequence numbers.
    #[tokio::test]
    async fn write_chunk_emits_event_and_appends_to_file() {
        let tmp = tempfile::tempdir().unwrap();
        // The nested component exercises parent-directory creation.
        let log_path = tmp.path().join("nested/test.log");
        let tx = events::channel();
        let mut rx = tx.subscribe();

        let mut sink = LiveLog::open(tx.clone(), GateRunId(7), log_path.clone()).await;
        sink.write_chunk(b"hello ").await;
        sink.write_chunk(b"world\n").await;
        sink.close().await;

        let contents = tokio::fs::read_to_string(&log_path).await.unwrap();
        assert_eq!(contents, "hello world\n");

        let mut received: Vec<String> = Vec::new();
        while let Ok(envelope) = rx.try_recv() {
            if let Event::GateLogChunk { run_id, seq, text } = envelope.event {
                assert_eq!(run_id, GateRunId(7));
                // Sequence numbers count up from zero in emission order.
                assert_eq!(seq as usize, received.len());
                received.push(text);
            }
        }
        let expected = vec![String::from("hello "), String::from("world\n")];
        assert_eq!(received, expected);
    }

    /// A sink whose file cannot be opened still broadcasts its chunks.
    #[tokio::test]
    async fn write_chunk_emits_even_when_file_cannot_be_opened() {
        // Use a regular file as the log's parent directory, so that
        // create_dir_all fails. The broadcast must still fire.
        let tmp = tempfile::tempdir().unwrap();
        let blocker = tmp.path().join("blocker");
        tokio::fs::write(&blocker, b"i am a file, not a dir").await.unwrap();
        let log_path = blocker.join("inside.log"); // parent is a file
        let tx = events::channel();
        let mut rx = tx.subscribe();

        let mut sink = LiveLog::open(tx.clone(), GateRunId(1), log_path).await;
        sink.write_chunk(b"streamed despite no file\n").await;
        sink.close().await;

        let envelope: EventEnvelope = rx.try_recv().unwrap();
        if let Event::GateLogChunk { text, .. } = envelope.event {
            assert_eq!(text, "streamed despite no file\n");
        } else {
            panic!("expected GateLogChunk");
        }
    }

    /// An empty chunk neither advances the counter nor emits an event.
    #[tokio::test]
    async fn empty_chunk_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("empty.log");
        let tx = events::channel();
        let mut rx = tx.subscribe();

        let mut sink = LiveLog::open(tx.clone(), GateRunId(9), log_path).await;
        sink.write_chunk(b"").await;

        assert_eq!(sink.chunks_emitted(), 0);
        assert!(rx.try_recv().is_err());
        sink.close().await;
    }
}
173