Skip to main content

max / makenotwork

5.6 KB · 167 lines History Blame Raw
1 //! SSE push notification stream for real-time sync notifications.
2 //!
3 //! The server sends zero-data `event: changed` events over SSE whenever
4 //! another device pushes changes. The client should pull on each event.
5 //! No auto-reconnect — consumer apps handle reconnect in their scheduler.
6
7 use crate::error::{Result, SyncKitError};
8
9 use super::SyncKitClient;
10
11 /// A stream of SSE "changed" notifications from the SyncKit server.
12 ///
13 /// Created by [`SyncKitClient::subscribe`]. Wraps a raw HTTP response byte
14 /// stream and parses SSE events line by line. Yields `Some(())` for each
15 /// `event: changed`, `None` when the stream ends or encounters an error.
16 pub struct SyncNotifyStream {
17 buffer: String,
18 response: reqwest::Response,
19 }
20
/// Upper bound, in bytes, on buffered SSE data (1 MiB).
///
/// Once the pending buffer grows past this size the stream logs a warning
/// and closes, so a server that never sends a block terminator (`\n\n`)
/// cannot make the client's memory use grow without bound.
const MAX_SSE_BUFFER: usize = 1 << 20;
25
26 impl SyncNotifyStream {
27 pub(crate) fn new(response: reqwest::Response) -> Self {
28 Self {
29 buffer: String::new(),
30 response,
31 }
32 }
33
34 /// Wait for the next "changed" notification.
35 ///
36 /// Returns `Some(())` when the server signals new changes are available.
37 /// Returns `None` when the stream ends (server closed, network error).
38 /// Keepalive comments (lines starting with `:`) are silently ignored.
39 pub async fn next_change(&mut self) -> Option<()> {
40 loop {
41 // Try to extract a complete SSE block from the buffer
42 if let Some(pos) = self.buffer.find("\n\n") {
43 let block = self.buffer[..pos].to_string();
44 self.buffer = self.buffer[pos + 2..].to_string();
45
46 if parse_sse_block_is_changed(&block) {
47 return Some(());
48 }
49 // Not a "changed" event — skip and try next block
50 continue;
51 }
52
53 // Need more data from the stream
54 match self.response.chunk().await {
55 Ok(Some(chunk)) => {
56 match std::str::from_utf8(&chunk) {
57 Ok(text) => self.buffer.push_str(text),
58 Err(_) => {
59 tracing::warn!("SSE stream contained invalid UTF-8, closing");
60 return None;
61 }
62 }
63 if self.buffer.len() > MAX_SSE_BUFFER {
64 tracing::warn!("SSE buffer exceeded {MAX_SSE_BUFFER} bytes, closing stream");
65 return None;
66 }
67 }
68 Ok(None) => return None,
69 Err(e) => {
70 tracing::debug!(error = %e, "SSE stream error, closing");
71 return None;
72 }
73 }
74 }
75 }
76 }
77
/// Report whether an SSE block announces a `changed` event.
///
/// `block` is the text of one SSE block without its `\n\n` terminator. Each
/// line is trimmed of surrounding whitespace, comment lines (those starting
/// with `:`, used for keepalives) are skipped, and the block matches as soon
/// as any `event:` field has the value `changed` (surrounding whitespace in
/// the value is ignored). All other fields, such as `data:`, are ignored.
fn parse_sse_block_is_changed(block: &str) -> bool {
    block
        .lines()
        .map(str::trim)
        // Comment (keepalive) lines carry no fields.
        .filter(|line| !line.starts_with(':'))
        .filter_map(|line| line.strip_prefix("event:"))
        .any(|event_type| event_type.trim() == "changed")
}
98
99 impl SyncKitClient {
100 /// Open an SSE connection for real-time push notifications.
101 ///
102 /// Returns a [`SyncNotifyStream`] that yields `Some(())` each time the
103 /// server signals new changes are available. The caller should pull after
104 /// each notification to get the actual changes.
105 ///
106 /// The stream does not auto-reconnect. If the connection drops,
107 /// `next_change()` returns `None` and the caller should reconnect
108 /// (typically after a short delay).
109 #[tracing::instrument(skip(self))]
110 pub async fn subscribe(&self) -> Result<SyncNotifyStream> {
111 let token = self.require_token()?;
112 let (app_id, _user_id) = self.require_session_ids()?;
113 let url = format!("{}?app_id={}", self.endpoints.subscribe, app_id);
114
115 let resp = self
116 .http
117 .get(&url)
118 .bearer_auth(&*token)
119 .send()
120 .await
121 .map_err(SyncKitError::Http)?;
122
123 let status = resp.status().as_u16();
124 if status >= 400 {
125 let message = resp.text().await.unwrap_or_default();
126 return Err(SyncKitError::Server { status, message, retry_after_secs: None });
127 }
128
129 Ok(SyncNotifyStream::new(resp))
130 }
131 }
132
#[cfg(test)]
mod tests {
    use super::*;

    /// A block with `event: changed` is recognised even when other fields follow.
    #[test]
    fn parse_changed_event() {
        let is_changed = parse_sse_block_is_changed("event: changed\ndata: {}");
        assert!(is_changed);
    }

    /// Keepalive comment blocks never count as a change notification.
    #[test]
    fn parse_ignores_keepalive_comments() {
        let is_changed = parse_sse_block_is_changed(": keepalive");
        assert!(!is_changed);
    }

    /// Garbage, empty blocks, and other event types are all rejected.
    #[test]
    fn parse_handles_malformed_lines() {
        let rejected = ["garbled nonsense", "", "event:other\ndata: test"];
        for block in rejected {
            assert!(!parse_sse_block_is_changed(block));
        }
    }

    /// Trailing whitespace after the event name does not prevent a match.
    #[test]
    fn parse_changed_with_extra_whitespace() {
        let is_changed = parse_sse_block_is_changed("event: changed \ndata: {}");
        assert!(is_changed);
    }
}
167