//! Disk-append + callback live-log sink. //! //! Wraps an on-disk per-run log file with a per-chunk callback. A streamed //! command (via [`crate::remote::RemoteHost::run_streaming`]) pushes its //! merged stdout/stderr through `write_chunk` as it arrives; the sink: //! 1. appends to the on-disk file so a post-mortem `GET /logs/...` route //! still has the full byte stream; //! 2. invokes the chunk callback with `(seq, text)` so the owning daemon can //! broadcast a tool-specific log-chunk event (the callback is where the //! coupling to a concrete `Event` enum lives, keeping this module //! generic). //! //! Chunks reflect tokio read boundaries — they are NOT line-aligned. //! Consumers that want lines must reassemble; the on-disk log preserves the //! exact byte stream for that. use crate::remote::LogSink; use async_trait::async_trait; use std::path::{Path, PathBuf}; use tokio::fs::File; use tokio::io::AsyncWriteExt; /// Callback invoked once per chunk with the monotonic per-run sequence number /// and the UTF-8-lossy text. Boxed so the sink stays a concrete type that can /// be shared behind an `Arc>`. pub type ChunkCallback = Box; pub struct LiveLog { file: Option, /// Kept for diagnostic logging when file IO is unavailable. path: PathBuf, on_chunk: ChunkCallback, seq: u32, } impl LiveLog { /// Open the log file for append-streaming, creating parent directories as /// needed. If the file can't be opened the sink degrades to "callback /// only" — chunks still reach the callback; the missing on-disk log is a /// warning, never a failure (a broken log dir must not turn a passing run /// red). pub async fn open(path: PathBuf, on_chunk: ChunkCallback) -> Self { let file = open_for_append(&path).await; Self { file, path, on_chunk, seq: 0 } } /// Flush and close the file. Best-effort; errors are logged. pub async fn close(mut self) { if let Some(mut f) = self.file.take() { if let Err(e) = f.flush().await { tracing::warn!(error = %e, path = %self.path.display(), "live log flush failed"); } } } pub fn chunks_emitted(&self) -> u32 { self.seq } } #[async_trait] impl LogSink for LiveLog { /// Append `bytes` to the on-disk log and invoke the callback. The callback /// fires even if the disk write fails — operators watching live still see /// the chunk. async fn write_chunk(&mut self, bytes: &[u8]) { if bytes.is_empty() { return; } if let Some(f) = self.file.as_mut() { if let Err(e) = f.write_all(bytes).await { tracing::warn!(error = %e, path = %self.path.display(), "live log write failed"); self.file = None; } } let text = String::from_utf8_lossy(bytes); (self.on_chunk)(self.seq, &text); self.seq = self.seq.saturating_add(1); } } async fn open_for_append(path: &Path) -> Option { if let Some(parent) = path.parent() { if let Err(e) = tokio::fs::create_dir_all(parent).await { tracing::warn!(error = %e, dir = %parent.display(), "could not create log dir"); return None; } } match tokio::fs::OpenOptions::new().create(true).append(true).open(path).await { Ok(f) => Some(f), Err(e) => { tracing::warn!(error = %e, path = %path.display(), "could not open log file"); None } } } #[cfg(test)] mod tests { use super::*; use std::sync::{Arc, Mutex}; #[tokio::test] async fn write_chunk_appends_to_file_and_fires_callback() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("nested/run.log"); let seen: Arc>> = Arc::new(Mutex::new(Vec::new())); let cb_seen = seen.clone(); let mut log = LiveLog::open( path.clone(), Box::new(move |seq, text| cb_seen.lock().unwrap().push((seq, text.to_string()))), ) .await; log.write_chunk(b"hello ").await; log.write_chunk(b"world\n").await; log.close().await; assert_eq!(tokio::fs::read_to_string(&path).await.unwrap(), "hello world\n"); assert_eq!( *seen.lock().unwrap(), vec![(0, "hello ".to_string()), (1, "world\n".to_string())] ); } #[tokio::test] async fn callback_fires_even_when_file_cannot_open() { let dir = tempfile::tempdir().unwrap(); let blocker = dir.path().join("blocker"); tokio::fs::write(&blocker, b"i am a file").await.unwrap(); let path = blocker.join("inside.log"); // parent is a file let seen = Arc::new(Mutex::new(Vec::new())); let cb_seen = seen.clone(); let mut log = LiveLog::open( path, Box::new(move |_, text| cb_seen.lock().unwrap().push(text.to_string())), ) .await; log.write_chunk(b"streamed despite no file\n").await; log.close().await; assert_eq!(*seen.lock().unwrap(), vec!["streamed despite no file\n".to_string()]); } #[tokio::test] async fn empty_chunk_is_noop() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("empty.log"); let mut log = LiveLog::open(path, Box::new(|_, _| {})).await; log.write_chunk(b"").await; assert_eq!(log.chunks_emitted(), 0); } }