Skip to main content

max / synckit-client

4.7 KB · 149 lines History Blame Raw
1 //! SSE push notification stream for real-time sync notifications.
2 //!
3 //! The server sends zero-data `event: changed` events over SSE whenever
4 //! another device pushes changes. The client should pull on each event.
5 //! No auto-reconnect — consumer apps handle reconnect in their scheduler.
6
7 use crate::error::{Result, SyncKitError};
8
9 use super::SyncKitClient;
10
11 /// A stream of SSE "changed" notifications from the SyncKit server.
12 ///
13 /// Created by [`SyncKitClient::subscribe`]. Wraps a raw HTTP response byte
14 /// stream and parses SSE events line by line. Yields `Some(())` for each
15 /// `event: changed`, `None` when the stream ends or encounters an error.
16 pub struct SyncNotifyStream {
17 buffer: String,
18 response: reqwest::Response,
19 }
20
21 impl SyncNotifyStream {
22 pub(crate) fn new(response: reqwest::Response) -> Self {
23 Self {
24 buffer: String::new(),
25 response,
26 }
27 }
28
29 /// Wait for the next "changed" notification.
30 ///
31 /// Returns `Some(())` when the server signals new changes are available.
32 /// Returns `None` when the stream ends (server closed, network error).
33 /// Keepalive comments (lines starting with `:`) are silently ignored.
34 pub async fn next_change(&mut self) -> Option<()> {
35 loop {
36 // Try to extract a complete SSE block from the buffer
37 if let Some(pos) = self.buffer.find("\n\n") {
38 let block = self.buffer[..pos].to_string();
39 self.buffer = self.buffer[pos + 2..].to_string();
40
41 if parse_sse_block_is_changed(&block) {
42 return Some(());
43 }
44 // Not a "changed" event — skip and try next block
45 continue;
46 }
47
48 // Need more data from the stream
49 match self.response.chunk().await {
50 Ok(Some(chunk)) => {
51 let text = String::from_utf8_lossy(&chunk);
52 self.buffer.push_str(&text);
53 }
54 Ok(None) | Err(_) => return None,
55 }
56 }
57 }
58 }
59
60 /// Parse an SSE block and return `true` if it contains `event: changed`.
61 ///
62 /// An SSE block is one or more lines separated by `\n`, terminated by `\n\n`.
63 /// Lines starting with `:` are comments (keepalive). The `event:` field
64 /// specifies the event type, `data:` the payload (ignored here).
65 fn parse_sse_block_is_changed(block: &str) -> bool {
66 for line in block.lines() {
67 let trimmed = line.trim();
68 // Skip comments
69 if trimmed.starts_with(':') {
70 continue;
71 }
72 if let Some(value) = trimmed.strip_prefix("event:") {
73 if value.trim() == "changed" {
74 return true;
75 }
76 }
77 }
78 false
79 }
80
81 impl SyncKitClient {
82 /// Open an SSE connection for real-time push notifications.
83 ///
84 /// Returns a [`SyncNotifyStream`] that yields `Some(())` each time the
85 /// server signals new changes are available. The caller should pull after
86 /// each notification to get the actual changes.
87 ///
88 /// The stream does not auto-reconnect. If the connection drops,
89 /// `next_change()` returns `None` and the caller should reconnect
90 /// (typically after a short delay).
91 #[tracing::instrument(skip(self))]
92 pub async fn subscribe(&self) -> Result<SyncNotifyStream> {
93 let token = self.require_token()?;
94 let (app_id, _user_id) = self.require_session_ids()?;
95 let url = format!("{}?app_id={}", self.endpoints.subscribe, app_id);
96
97 let resp = self
98 .http
99 .get(&url)
100 .bearer_auth(&*token)
101 .send()
102 .await
103 .map_err(SyncKitError::Http)?;
104
105 let status = resp.status().as_u16();
106 if status >= 400 {
107 let message = resp.text().await.unwrap_or_default();
108 return Err(SyncKitError::Server { status, message });
109 }
110
111 Ok(SyncNotifyStream::new(resp))
112 }
113 }
114
115 #[cfg(test)]
116 mod tests {
117 use super::*;
118
119 #[test]
120 fn parse_changed_event() {
121 let block = "event: changed\ndata: {}";
122 assert!(parse_sse_block_is_changed(block));
123 }
124
125 #[test]
126 fn parse_ignores_keepalive_comments() {
127 let block = ": keepalive";
128 assert!(!parse_sse_block_is_changed(block));
129 }
130
131 #[test]
132 fn parse_handles_malformed_lines() {
133 let block = "garbled nonsense";
134 assert!(!parse_sse_block_is_changed(block));
135
136 let block2 = "";
137 assert!(!parse_sse_block_is_changed(block2));
138
139 let block3 = "event:other\ndata: test";
140 assert!(!parse_sse_block_is_changed(block3));
141 }
142
143 #[test]
144 fn parse_changed_with_extra_whitespace() {
145 let block = "event: changed \ndata: {}";
146 assert!(parse_sse_block_is_changed(block));
147 }
148 }
149