Skip to main content

max / makenotwork

5.4 KB · 152 lines History Blame Raw
1 //! Disk-append + callback live-log sink.
2 //!
3 //! Wraps an on-disk per-run log file with a per-chunk callback. A streamed
4 //! command (via [`crate::remote::RemoteHost::run_streaming`]) pushes its
5 //! merged stdout/stderr through `write_chunk` as it arrives; the sink:
6 //! 1. appends to the on-disk file so a post-mortem `GET /logs/...` route
7 //! still has the full byte stream;
8 //! 2. invokes the chunk callback with `(seq, text)` so the owning daemon can
9 //! broadcast a tool-specific log-chunk event (the callback is where the
10 //! coupling to a concrete `Event` enum lives, keeping this module
11 //! generic).
12 //!
13 //! Chunks reflect tokio read boundaries — they are NOT line-aligned.
14 //! Consumers that want lines must reassemble; the on-disk log preserves the
15 //! exact byte stream for that.
16
17 use crate::remote::LogSink;
18 use async_trait::async_trait;
19 use std::path::{Path, PathBuf};
20 use tokio::fs::File;
21 use tokio::io::AsyncWriteExt;
22
/// Per-chunk notification hook owned by a [`LiveLog`].
///
/// Called as `callback(seq, text)`, once per non-empty chunk: `seq` is the
/// sink's per-run chunk counter (starting at 0) and `text` is that chunk
/// decoded with `String::from_utf8_lossy`. `FnMut` lets the callback keep
/// mutable state between calls. It is boxed (and `Send`) so the sink stays a
/// concrete type that can be shared behind an `Arc<Mutex<_>>`.
pub type ChunkCallback = Box<dyn FnMut(u32, &str) + Send>;
27
/// Log sink that appends every chunk to an on-disk file and forwards it to a
/// [`ChunkCallback`].
pub struct LiveLog {
    /// Open log file, or `None` when it could not be opened or a previous
    /// write failed (the sink then runs in callback-only mode).
    file: Option<File>,
    /// Kept for diagnostic logging when file IO is unavailable.
    path: PathBuf,
    /// Invoked once per non-empty chunk with `(seq, text)`.
    on_chunk: ChunkCallback,
    /// Sequence number passed to the next callback invocation; starts at 0.
    seq: u32,
}
35
36 impl LiveLog {
37 /// Open the log file for append-streaming, creating parent directories as
38 /// needed. If the file can't be opened the sink degrades to "callback
39 /// only" — chunks still reach the callback; the missing on-disk log is a
40 /// warning, never a failure (a broken log dir must not turn a passing run
41 /// red).
42 pub async fn open(path: PathBuf, on_chunk: ChunkCallback) -> Self {
43 let file = open_for_append(&path).await;
44 Self { file, path, on_chunk, seq: 0 }
45 }
46
47 /// Flush and close the file. Best-effort; errors are logged.
48 pub async fn close(mut self) {
49 if let Some(mut f) = self.file.take() {
50 if let Err(e) = f.flush().await {
51 tracing::warn!(error = %e, path = %self.path.display(), "live log flush failed");
52 }
53 }
54 }
55
56 pub fn chunks_emitted(&self) -> u32 {
57 self.seq
58 }
59 }
60
61 #[async_trait]
62 impl LogSink for LiveLog {
63 /// Append `bytes` to the on-disk log and invoke the callback. The callback
64 /// fires even if the disk write fails — operators watching live still see
65 /// the chunk.
66 async fn write_chunk(&mut self, bytes: &[u8]) {
67 if bytes.is_empty() {
68 return;
69 }
70 if let Some(f) = self.file.as_mut() {
71 if let Err(e) = f.write_all(bytes).await {
72 tracing::warn!(error = %e, path = %self.path.display(), "live log write failed");
73 self.file = None;
74 }
75 }
76 let text = String::from_utf8_lossy(bytes);
77 (self.on_chunk)(self.seq, &text);
78 self.seq = self.seq.saturating_add(1);
79 }
80 }
81
82 async fn open_for_append(path: &Path) -> Option<File> {
83 if let Some(parent) = path.parent() {
84 if let Err(e) = tokio::fs::create_dir_all(parent).await {
85 tracing::warn!(error = %e, dir = %parent.display(), "could not create log dir");
86 return None;
87 }
88 }
89 match tokio::fs::OpenOptions::new().create(true).append(true).open(path).await {
90 Ok(f) => Some(f),
91 Err(e) => {
92 tracing::warn!(error = %e, path = %path.display(), "could not open log file");
93 None
94 }
95 }
96 }
97
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared record of every `(seq, text)` pair a callback received.
    type Recorded = Arc<Mutex<Vec<(u32, String)>>>;

    /// Build a callback that appends each invocation to a fresh record, and
    /// return the record alongside it.
    fn recorder() -> (Recorded, ChunkCallback) {
        let record: Recorded = Arc::new(Mutex::new(Vec::new()));
        let sink = record.clone();
        let callback: ChunkCallback = Box::new(move |seq: u32, text: &str| {
            sink.lock().unwrap().push((seq, text.to_string()))
        });
        (record, callback)
    }

    #[tokio::test]
    async fn write_chunk_appends_to_file_and_fires_callback() {
        let tmp = tempfile::tempdir().unwrap();
        // The nested path exercises parent-directory creation.
        let log_path = tmp.path().join("nested/run.log");
        let (record, callback) = recorder();

        let mut log = LiveLog::open(log_path.clone(), callback).await;
        log.write_chunk(b"hello ").await;
        log.write_chunk(b"world\n").await;
        log.close().await;

        let on_disk = tokio::fs::read_to_string(&log_path).await.unwrap();
        assert_eq!(on_disk, "hello world\n");

        let expected = vec![(0u32, "hello ".to_string()), (1u32, "world\n".to_string())];
        assert_eq!(*record.lock().unwrap(), expected);
    }

    #[tokio::test]
    async fn callback_fires_even_when_file_cannot_open() {
        let tmp = tempfile::tempdir().unwrap();
        let regular_file = tmp.path().join("blocker");
        tokio::fs::write(&regular_file, b"i am a file").await.unwrap();
        // The parent of this path is a regular file, so the log cannot be opened.
        let unopenable = regular_file.join("inside.log");
        let (record, callback) = recorder();

        let mut log = LiveLog::open(unopenable, callback).await;
        log.write_chunk(b"streamed despite no file\n").await;
        log.close().await;

        let texts: Vec<String> = record
            .lock()
            .unwrap()
            .iter()
            .map(|(_, text)| text.clone())
            .collect();
        assert_eq!(texts, vec!["streamed despite no file\n".to_string()]);
    }

    #[tokio::test]
    async fn empty_chunk_is_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("empty.log");
        let mut log = LiveLog::open(log_path, Box::new(|_, _| {})).await;

        log.write_chunk(b"").await;

        assert_eq!(log.chunks_emitted(), 0);
    }
}
152