//! Live gate-log sink.
//!
//! Wraps the on-disk per-run log file with chunk broadcasting. Gate
//! runners (`cargo_test`, `boot_smoke`) push their stdout/stderr through
//! `LiveLog::write_chunk` as it arrives; the sink:
//!   1. appends to the on-disk per-run log file (the `path` handed to
//!      `LiveLog::open`) so the post-mortem `GET /logs/...` route still has
//!      the full byte stream;
//!   2. broadcasts a `GateLogChunk` event with a UTF-8-lossy slice of the
//!      same bytes, so the TUI sees the tail in real time.
//!
//! Chunks reflect tokio read boundaries — they are NOT line-aligned.
//! Consumers that want lines must reassemble; the on-disk log preserves
//! the exact byte stream for that purpose.

use crate::domain::GateRunId;
use crate::events::{self, Event, EventTx};
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

/// Streaming sink for one gate run: tees every chunk to an append-only log
/// file and to the event channel.
pub struct LiveLog {
    /// Open log file, or `None` when it could not be opened or a previous
    /// write failed (the sink is then broadcast-only).
    file: Option<File>,
    /// Kept for diagnostic logging when file IO is unavailable.
    path: PathBuf,
    /// Channel on which `GateLogChunk` events are emitted.
    events: EventTx,
    /// Run this log belongs to; stamped on every emitted chunk.
    run_id: GateRunId,
    /// Sequence number of the next chunk to emit (starts at 0).
    seq: u32,
}

impl LiveLog {
    /// Open the log file for append-streaming. Creates parent directories
    /// as needed. If the file can't be opened, the sink degrades to
    /// "broadcast only" — chunks still go out as `GateLogChunk` events;
    /// the missing on-disk log is logged as a warning. This matches the
    /// pre-step-6 invariant: a broken log dir doesn't turn a passing gate
    /// red.
    pub async fn open(events: EventTx, run_id: GateRunId, path: PathBuf) -> Self {
        let file = open_for_append(&path).await;
        Self {
            file,
            path,
            events,
            run_id,
            seq: 0,
        }
    }

    /// Append `bytes` to the on-disk log and broadcast a `GateLogChunk`.
    /// The broadcast goes out even if the disk write fails — operators
    /// watching live still see the chunk.
    ///
    /// An empty `bytes` slice is a no-op: nothing is written, no event is
    /// emitted and the sequence number does not advance.
    pub async fn write_chunk(&mut self, bytes: &[u8]) {
        if bytes.is_empty() {
            return;
        }

        if let Some(f) = self.file.as_mut() {
            if let Err(e) = f.write_all(bytes).await {
                tracing::warn!(error = %e, path = %self.path.display(), "live log write failed");
                // Drop the handle so later chunks skip the disk and we warn
                // only once; broadcasting continues below.
                self.file = None;
            }
        }

        // Lossy conversion: invalid UTF-8 in the chunk becomes U+FFFD in the
        // event text; the on-disk file keeps the raw bytes.
        let text = String::from_utf8_lossy(bytes).into_owned();
        events::emit(
            &self.events,
            Event::GateLogChunk {
                run_id: self.run_id,
                seq: self.seq,
                text,
            },
        );
        // Saturates at `u32::MAX` rather than wrapping back to 0.
        self.seq = self.seq.saturating_add(1);
    }

    /// Flush and close the file. Best-effort: errors are logged.
    pub async fn close(mut self) {
        if let Some(mut f) = self.file.take() {
            if let Err(e) = f.flush().await {
                tracing::warn!(error = %e, path = %self.path.display(), "live log flush failed");
            }
            // `f` is dropped here, which closes the file handle.
        }
    }

    /// The gate run this sink is logging for.
    pub fn run_id(&self) -> GateRunId {
        self.run_id
    }

    /// Number of `GateLogChunk` events emitted so far (empty chunks are not
    /// counted).
    pub fn chunks_emitted(&self) -> u32 {
        self.seq
    }
}

/// Lets a Sando `LiveLog` be used directly as an [`ops_exec::LogSink`], so a
/// gate or deploy run through `executor.run_streaming` streams into the same
/// on-disk file + `GateLogChunk` broadcast the bespoke runner used. Delegates
/// to the inherent [`LiveLog::write_chunk`] (inherent methods take precedence,
/// so this is not recursive).
#[async_trait::async_trait] impl ops_exec::LogSink for LiveLog { async fn write_chunk(&mut self, bytes: &[u8]) { self.write_chunk(bytes).await; } } async fn open_for_append(path: &Path) -> Option { if let Some(parent) = path.parent() { if let Err(e) = tokio::fs::create_dir_all(parent).await { tracing::warn!(error = %e, dir = %parent.display(), "could not create gate log dir"); return None; } } match tokio::fs::OpenOptions::new() .create(true) .append(true) .open(path) .await { Ok(f) => Some(f), Err(e) => { tracing::warn!(error = %e, path = %path.display(), "could not open gate log file"); None } } } #[cfg(test)] mod tests { use super::*; use crate::events::EventEnvelope; #[tokio::test] async fn write_chunk_emits_event_and_appends_to_file() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("nested/test.log"); let events = events::channel(); let mut rx = events.subscribe(); let mut log = LiveLog::open(events.clone(), GateRunId(7), path.clone()).await; log.write_chunk(b"hello ").await; log.write_chunk(b"world\n").await; log.close().await; let on_disk = tokio::fs::read_to_string(&path).await.unwrap(); assert_eq!(on_disk, "hello world\n"); let mut chunks: Vec = Vec::new(); while let Ok(env) = rx.try_recv() { if let Event::GateLogChunk { run_id, seq, text } = env.event { assert_eq!(run_id, GateRunId(7)); assert_eq!(seq as usize, chunks.len()); chunks.push(text); } } assert_eq!(chunks, vec!["hello ".to_string(), "world\n".to_string()]); } #[tokio::test] async fn write_chunk_emits_even_when_file_cannot_be_opened() { // Point at a path whose parent is a regular file — create_dir_all // will fail. The broadcast must still fire. 
let dir = tempfile::tempdir().unwrap(); let blocker = dir.path().join("blocker"); tokio::fs::write(&blocker, b"i am a file, not a dir").await.unwrap(); let path = blocker.join("inside.log"); // parent is a file let events = events::channel(); let mut rx = events.subscribe(); let mut log = LiveLog::open(events.clone(), GateRunId(1), path).await; log.write_chunk(b"streamed despite no file\n").await; log.close().await; let env: EventEnvelope = rx.try_recv().unwrap(); match env.event { Event::GateLogChunk { text, .. } => assert_eq!(text, "streamed despite no file\n"), _ => panic!("expected GateLogChunk"), } } #[tokio::test] async fn empty_chunk_is_noop() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("empty.log"); let events = events::channel(); let mut rx = events.subscribe(); let mut log = LiveLog::open(events.clone(), GateRunId(9), path).await; log.write_chunk(b"").await; assert_eq!(log.chunks_emitted(), 0); assert!(rx.try_recv().is_err()); log.close().await; } }