//! SSE push notification stream for real-time sync notifications. //! //! The server sends zero-data `event: changed` events over SSE whenever //! another device pushes changes. The client should pull on each event. //! No auto-reconnect — consumer apps handle reconnect in their scheduler. use crate::error::{Result, SyncKitError}; use super::SyncKitClient; /// A stream of SSE "changed" notifications from the SyncKit server. /// /// Created by [`SyncKitClient::subscribe`]. Wraps a raw HTTP response byte /// stream and parses SSE events line by line. Yields `Some(())` for each /// `event: changed`, `None` when the stream ends or encounters an error. pub struct SyncNotifyStream { buffer: String, response: reqwest::Response, } impl SyncNotifyStream { pub(crate) fn new(response: reqwest::Response) -> Self { Self { buffer: String::new(), response, } } /// Wait for the next "changed" notification. /// /// Returns `Some(())` when the server signals new changes are available. /// Returns `None` when the stream ends (server closed, network error). /// Keepalive comments (lines starting with `:`) are silently ignored. pub async fn next_change(&mut self) -> Option<()> { loop { // Try to extract a complete SSE block from the buffer if let Some(pos) = self.buffer.find("\n\n") { let block = self.buffer[..pos].to_string(); self.buffer = self.buffer[pos + 2..].to_string(); if parse_sse_block_is_changed(&block) { return Some(()); } // Not a "changed" event — skip and try next block continue; } // Need more data from the stream match self.response.chunk().await { Ok(Some(chunk)) => { let text = String::from_utf8_lossy(&chunk); self.buffer.push_str(&text); } Ok(None) | Err(_) => return None, } } } } /// Parse an SSE block and return `true` if it contains `event: changed`. /// /// An SSE block is one or more lines separated by `\n`, terminated by `\n\n`. /// Lines starting with `:` are comments (keepalive). The `event:` field /// specifies the event type, `data:` the payload (ignored here). fn parse_sse_block_is_changed(block: &str) -> bool { for line in block.lines() { let trimmed = line.trim(); // Skip comments if trimmed.starts_with(':') { continue; } if let Some(value) = trimmed.strip_prefix("event:") { if value.trim() == "changed" { return true; } } } false } impl SyncKitClient { /// Open an SSE connection for real-time push notifications. /// /// Returns a [`SyncNotifyStream`] that yields `Some(())` each time the /// server signals new changes are available. The caller should pull after /// each notification to get the actual changes. /// /// The stream does not auto-reconnect. If the connection drops, /// `next_change()` returns `None` and the caller should reconnect /// (typically after a short delay). #[tracing::instrument(skip(self))] pub async fn subscribe(&self) -> Result { let token = self.require_token()?; let (app_id, _user_id) = self.require_session_ids()?; let url = format!("{}?app_id={}", self.endpoints.subscribe, app_id); let resp = self .http .get(&url) .bearer_auth(&*token) .send() .await .map_err(SyncKitError::Http)?; let status = resp.status().as_u16(); if status >= 400 { let message = resp.text().await.unwrap_or_default(); return Err(SyncKitError::Server { status, message }); } Ok(SyncNotifyStream::new(resp)) } } #[cfg(test)] mod tests { use super::*; #[test] fn parse_changed_event() { let block = "event: changed\ndata: {}"; assert!(parse_sse_block_is_changed(block)); } #[test] fn parse_ignores_keepalive_comments() { let block = ": keepalive"; assert!(!parse_sse_block_is_changed(block)); } #[test] fn parse_handles_malformed_lines() { let block = "garbled nonsense"; assert!(!parse_sse_block_is_changed(block)); let block2 = ""; assert!(!parse_sse_block_is_changed(block2)); let block3 = "event:other\ndata: test"; assert!(!parse_sse_block_is_changed(block3)); } #[test] fn parse_changed_with_extra_whitespace() { let block = "event: changed \ndata: {}"; assert!(parse_sse_block_is_changed(block)); } }