Skip to main content

max / synckit-client

17.6 KB · 549 lines History Blame Raw
1 use bytes::Bytes;
2 use tracing::instrument;
3 use uuid::Uuid;
4
5 use crate::{
6 crypto,
7 error::Result,
8 types::*,
9 };
10
11 use super::SyncKitClient;
12 use super::helpers::check_response;
13
14 impl SyncKitClient {
15 // ── Devices ──
16
17 /// Register a device for sync.
18 ///
19 /// If a device with the same name already exists for this user/app, the
20 /// server upserts: it updates the existing device's platform and
21 /// `last_seen_at` rather than creating a duplicate.
22 #[instrument(skip(self))]
23 pub async fn register_device(
24 &self,
25 device_name: &str,
26 platform: &str,
27 ) -> Result<Device> {
28 let token = self.require_token()?;
29
30 let body = Bytes::from(serde_json::to_vec(&RegisterDeviceRequest {
31 device_name: device_name.to_string(),
32 platform: platform.to_string(),
33 })?);
34
35 let resp = self
36 .retry_request(|| {
37 let req = self
38 .http
39 .post(&self.endpoints.devices)
40 .bearer_auth(&token)
41 .header("content-type", "application/json")
42 .body(body.clone());
43 async move { check_response(req.send().await?).await }
44 })
45 .await?;
46 Ok(resp.json().await?)
47 }
48
49 /// List all devices for the current user.
50 #[instrument(skip(self))]
51 pub async fn list_devices(&self) -> Result<Vec<Device>> {
52 let token = self.require_token()?;
53
54 let resp = self
55 .retry_request(|| {
56 let req = self.http.get(&self.endpoints.devices).bearer_auth(&token);
57 async move { check_response(req.send().await?).await }
58 })
59 .await?;
60 Ok(resp.json().await?)
61 }
62
63 // ── Push / Pull ──
64
65 /// Push changes to the server. Encrypts `data` fields automatically.
66 /// Returns the server cursor after the push.
67 ///
68 /// Retries on transient failures (network errors, 5xx, 429) with exponential backoff.
69 #[instrument(skip(self, changes))]
70 pub async fn push(
71 &self,
72 device_id: Uuid,
73 changes: Vec<ChangeEntry>,
74 ) -> Result<i64> {
75 let token = self.require_token()?;
76
77 // Extract key once for the entire batch (only needed if any entry has data)
78 let has_data = changes.iter().any(|c| c.data.is_some());
79 let key_holder = if has_data {
80 self.require_master_key()?
81 } else {
82 crypto::ZeroizeOnDrop([0u8; 32])
83 };
84 let master_key: &[u8; 32] = &key_holder;
85 let wire_changes = changes
86 .into_iter()
87 .map(|c| Self::encrypt_change_with_key(c, master_key))
88 .collect::<Result<Vec<_>>>()?;
89
90 let body = Bytes::from(serde_json::to_vec(&WirePushRequest {
91 device_id,
92 changes: wire_changes,
93 })?);
94
95 let resp = self
96 .retry_request(|| {
97 let req = self
98 .http
99 .post(&self.endpoints.push)
100 .bearer_auth(&token)
101 .header("content-type", "application/json")
102 .body(body.clone());
103 async move { check_response(req.send().await?).await }
104 })
105 .await?;
106
107 let push_resp: PushResponse = resp.json().await?;
108 Ok(push_resp.cursor)
109 }
110
111 /// Pull changes from the server since the given cursor.
112 /// Decrypts `data` fields automatically.
113 /// Returns (changes, new_cursor, has_more).
114 ///
115 /// Retries on transient failures (network errors, 5xx, 429) with exponential backoff.
116 #[instrument(skip(self))]
117 pub async fn pull(
118 &self,
119 device_id: Uuid,
120 cursor: i64,
121 ) -> Result<(Vec<ChangeEntry>, i64, bool)> {
122 let token = self.require_token()?;
123
124 let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?);
125
126 let resp = self
127 .retry_request(|| {
128 let req = self
129 .http
130 .post(&self.endpoints.pull)
131 .bearer_auth(&token)
132 .header("content-type", "application/json")
133 .body(body.clone());
134 async move { check_response(req.send().await?).await }
135 })
136 .await?;
137
138 let pull_resp: PullResponse = resp.json().await?;
139
140 // Extract key once for the entire batch (only needed if any entry has data)
141 let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
142 let key_holder = if has_data {
143 self.require_master_key()?
144 } else {
145 crypto::ZeroizeOnDrop([0u8; 32])
146 };
147 let master_key: &[u8; 32] = &key_holder;
148 let changes = pull_resp
149 .changes
150 .into_iter()
151 .map(|c| Self::decrypt_change_with_key(c, master_key))
152 .collect::<Result<Vec<_>>>()?;
153
154 Ok((changes, pull_resp.cursor, pull_resp.has_more))
155 }
156
157 /// Pull changes from the server with optional table and timestamp filters.
158 /// Decrypts `data` fields automatically.
159 /// Returns (changes, new_cursor, has_more).
160 ///
161 /// Identical to [`pull`](SyncKitClient::pull) when the filter is empty/default.
162 #[instrument(skip(self, filter))]
163 pub async fn pull_filtered(
164 &self,
165 device_id: Uuid,
166 cursor: i64,
167 filter: PullFilter,
168 ) -> Result<(Vec<ChangeEntry>, i64, bool)> {
169 let token = self.require_token()?;
170
171 let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest {
172 device_id,
173 cursor,
174 tables: filter.tables,
175 since: filter.since,
176 })?);
177
178 let resp = self
179 .retry_request(|| {
180 let req = self
181 .http
182 .post(&self.endpoints.pull)
183 .bearer_auth(&token)
184 .header("content-type", "application/json")
185 .body(body.clone());
186 async move { check_response(req.send().await?).await }
187 })
188 .await?;
189
190 let pull_resp: PullResponse = resp.json().await?;
191
192 let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
193 let key_holder = if has_data {
194 self.require_master_key()?
195 } else {
196 crypto::ZeroizeOnDrop([0u8; 32])
197 };
198 let master_key: &[u8; 32] = &key_holder;
199 let changes = pull_resp
200 .changes
201 .into_iter()
202 .map(|c| Self::decrypt_change_with_key(c, master_key))
203 .collect::<Result<Vec<_>>>()?;
204
205 Ok((changes, pull_resp.cursor, pull_resp.has_more))
206 }
207
208 /// Pull changes from the server, preserving `device_id` and `seq` metadata.
209 ///
210 /// Same HTTP call and decryption as [`pull`](SyncKitClient::pull), but returns
211 /// [`PulledChange`] wrappers that retain server metadata needed for conflict
212 /// detection. Returns (changes, new_cursor, has_more).
213 #[instrument(skip(self))]
214 pub async fn pull_rich(
215 &self,
216 device_id: Uuid,
217 cursor: i64,
218 ) -> Result<(Vec<PulledChange>, i64, bool)> {
219 let token = self.require_token()?;
220
221 let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?);
222
223 let resp = self
224 .retry_request(|| {
225 let req = self
226 .http
227 .post(&self.endpoints.pull)
228 .bearer_auth(&token)
229 .header("content-type", "application/json")
230 .body(body.clone());
231 async move { check_response(req.send().await?).await }
232 })
233 .await?;
234
235 let pull_resp: PullResponse = resp.json().await?;
236
237 let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
238 let key_holder = if has_data {
239 self.require_master_key()?
240 } else {
241 crypto::ZeroizeOnDrop([0u8; 32])
242 };
243 let master_key: &[u8; 32] = &key_holder;
244 let changes = pull_resp
245 .changes
246 .into_iter()
247 .map(|c| Self::decrypt_change_to_pulled(c, master_key))
248 .collect::<Result<Vec<_>>>()?;
249
250 Ok((changes, pull_resp.cursor, pull_resp.has_more))
251 }
252
253 /// Pull changes with filters, preserving `device_id` and `seq` metadata.
254 ///
255 /// Same as [`pull_rich`](SyncKitClient::pull_rich) but with table/timestamp
256 /// filtering support. Returns (changes, new_cursor, has_more).
257 #[instrument(skip(self, filter))]
258 pub async fn pull_filtered_rich(
259 &self,
260 device_id: Uuid,
261 cursor: i64,
262 filter: PullFilter,
263 ) -> Result<(Vec<PulledChange>, i64, bool)> {
264 let token = self.require_token()?;
265
266 let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest {
267 device_id,
268 cursor,
269 tables: filter.tables,
270 since: filter.since,
271 })?);
272
273 let resp = self
274 .retry_request(|| {
275 let req = self
276 .http
277 .post(&self.endpoints.pull)
278 .bearer_auth(&token)
279 .header("content-type", "application/json")
280 .body(body.clone());
281 async move { check_response(req.send().await?).await }
282 })
283 .await?;
284
285 let pull_resp: PullResponse = resp.json().await?;
286
287 let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
288 let key_holder = if has_data {
289 self.require_master_key()?
290 } else {
291 crypto::ZeroizeOnDrop([0u8; 32])
292 };
293 let master_key: &[u8; 32] = &key_holder;
294 let changes = pull_resp
295 .changes
296 .into_iter()
297 .map(|c| Self::decrypt_change_to_pulled(c, master_key))
298 .collect::<Result<Vec<_>>>()?;
299
300 Ok((changes, pull_resp.cursor, pull_resp.has_more))
301 }
302
303 /// Get sync status (total changes, latest cursor).
304 #[instrument(skip(self))]
305 pub async fn status(&self) -> Result<SyncStatus> {
306 let token = self.require_token()?;
307
308 let resp = self
309 .retry_request(|| {
310 let req = self.http.get(&self.endpoints.status).bearer_auth(&token);
311 async move { check_response(req.send().await?).await }
312 })
313 .await?;
314 Ok(resp.json().await?)
315 }
316 }
317
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use uuid::Uuid;

    use crate::types::*;

    // ── Type serialization / deserialization ──

    /// A `ChangeEntry` with a payload keeps its table, op, row id and data
    /// across a JSON encode/decode cycle.
    #[test]
    fn change_entry_serialization_roundtrip() {
        let original = ChangeEntry {
            table: "tasks".to_string(),
            op: ChangeOp::Insert,
            row_id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            data: Some(serde_json::json!({"title": "Test task", "done": false})),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<ChangeEntry>(&encoded).unwrap();

        assert_eq!(decoded.table, original.table);
        assert_eq!(decoded.op, original.op);
        assert_eq!(decoded.row_id, original.row_id);
        assert_eq!(decoded.data, original.data);
    }

    /// A `ChangeEntry` without data must not emit a `"data"` key at all.
    #[test]
    fn change_entry_with_none_data_omits_field() {
        let encoded = serde_json::to_string(&ChangeEntry {
            table: "tasks".to_string(),
            op: ChangeOp::Delete,
            row_id: "abc-123".to_string(),
            timestamp: Utc::now(),
            data: None,
        })
        .unwrap();

        assert!(!encoded.contains("\"data\""));
    }

    /// JSON lacking the `data` key decodes to `data: None`.
    #[test]
    fn change_entry_deserialization_with_missing_data() {
        let raw = r#"{
            "table": "events",
            "op": "DELETE",
            "row_id": "evt-1",
            "timestamp": "2025-01-15T10:00:00Z"
        }"#;

        let decoded = serde_json::from_str::<ChangeEntry>(raw).unwrap();
        assert_eq!(decoded.table, "events");
        assert_eq!(decoded.op, ChangeOp::Delete);
        assert!(decoded.data.is_none());
    }

    /// A `Device` keeps its id, name and platform across a JSON roundtrip.
    #[test]
    fn device_serialization_roundtrip() {
        let original = Device {
            id: Uuid::new_v4(),
            app_id: Uuid::new_v4(),
            user_id: Uuid::new_v4(),
            device_name: "MacBook Pro".to_string(),
            platform: "macos".to_string(),
            last_seen_at: Utc::now(),
            created_at: Utc::now(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<Device>(&encoded).unwrap();

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.device_name, original.device_name);
        assert_eq!(decoded.platform, original.platform);
    }

    /// A numeric `latest_cursor` decodes to `Some(..)`.
    #[test]
    fn sync_status_deserialization() {
        let decoded = serde_json::from_str::<SyncStatus>(
            r#"{"total_changes": 42, "latest_cursor": 100}"#,
        )
        .unwrap();
        assert_eq!(decoded.total_changes, 42);
        assert_eq!(decoded.latest_cursor, Some(100));
    }

    /// A `null` `latest_cursor` decodes to `None`.
    #[test]
    fn sync_status_with_null_cursor() {
        let decoded = serde_json::from_str::<SyncStatus>(
            r#"{"total_changes": 0, "latest_cursor": null}"#,
        )
        .unwrap();
        assert_eq!(decoded.total_changes, 0);
        assert_eq!(decoded.latest_cursor, None);
    }

    /// `RegisterDeviceRequest` serializes both fields under their own names.
    #[test]
    fn register_device_request_serialization() {
        let encoded = serde_json::to_string(&RegisterDeviceRequest {
            device_name: "iPhone 15".to_string(),
            platform: "ios".to_string(),
        })
        .unwrap();

        let value = serde_json::from_str::<serde_json::Value>(&encoded).unwrap();
        assert_eq!(value["device_name"], "iPhone 15");
        assert_eq!(value["platform"], "ios");
    }

    // ── Wire types ──

    /// `WirePushRequest` serializes the device id and the list of changes.
    #[test]
    fn wire_push_request_serialization() {
        let device_id = Uuid::new_v4();
        let single_change = WireChangeEntry {
            table: "tasks".to_string(),
            op: ChangeOp::Insert,
            row_id: "r1".to_string(),
            timestamp: Utc::now(),
            data: Some(serde_json::json!("encrypted-blob")),
        };
        let request = WirePushRequest {
            device_id,
            changes: vec![single_change],
        };

        let encoded = serde_json::to_string(&request).unwrap();
        let value = serde_json::from_str::<serde_json::Value>(&encoded).unwrap();
        assert_eq!(value["device_id"], device_id.to_string());
        assert_eq!(value["changes"].as_array().unwrap().len(), 1);
    }

    /// `PullRequest` serializes the device id and the cursor.
    #[test]
    fn pull_request_serialization() {
        let device_id = Uuid::new_v4();

        let encoded = serde_json::to_string(&PullRequest {
            device_id,
            cursor: 42,
        })
        .unwrap();

        let value = serde_json::from_str::<serde_json::Value>(&encoded).unwrap();
        assert_eq!(value["device_id"], device_id.to_string());
        assert_eq!(value["cursor"], 42);
    }

    /// A full pull response decodes its changes, cursor and `has_more` flag.
    #[test]
    fn pull_response_deserialization() {
        let device_id = Uuid::new_v4();
        let payload = format!(
            r#"{{
                "changes": [
                    {{
                        "seq": 1,
                        "device_id": "{}",
                        "table": "tasks",
                        "op": "INSERT",
                        "row_id": "r1",
                        "timestamp": "2025-06-01T12:00:00Z",
                        "data": "encrypted"
                    }}
                ],
                "cursor": 5,
                "has_more": true
            }}"#,
            device_id
        );

        let decoded = serde_json::from_str::<PullResponse>(&payload).unwrap();
        assert_eq!(decoded.changes.len(), 1);
        assert_eq!(decoded.cursor, 5);
        assert!(decoded.has_more);
        assert_eq!(decoded.changes[0].seq, 1);
        assert_eq!(decoded.changes[0].table, "tasks");
    }

    /// An empty `changes` array is a valid pull response.
    #[test]
    fn pull_response_empty_changes() {
        let decoded = serde_json::from_str::<PullResponse>(
            r#"{"changes": [], "cursor": 0, "has_more": false}"#,
        )
        .unwrap();
        assert!(decoded.changes.is_empty());
        assert_eq!(decoded.cursor, 0);
        assert!(!decoded.has_more);
    }

    /// A push response exposes the cursor it carries.
    #[test]
    fn push_response_deserialization() {
        let decoded = serde_json::from_str::<PushResponse>(r#"{"cursor": 99}"#).unwrap();
        assert_eq!(decoded.cursor, 99);
    }

    // ── ChangeOp display and parsing ──

    /// `ChangeOp` renders as its upper-case operation name.
    #[test]
    fn change_op_display() {
        assert_eq!(ChangeOp::Insert.to_string(), "INSERT");
        assert_eq!(ChangeOp::Update.to_string(), "UPDATE");
        assert_eq!(ChangeOp::Delete.to_string(), "DELETE");
    }

    /// The three upper-case names parse to their variants.
    #[test]
    fn change_op_from_str_valid() {
        assert_eq!(ChangeOp::from_str_opt("INSERT"), Some(ChangeOp::Insert));
        assert_eq!(ChangeOp::from_str_opt("UPDATE"), Some(ChangeOp::Update));
        assert_eq!(ChangeOp::from_str_opt("DELETE"), Some(ChangeOp::Delete));
    }

    /// Lower-case, unknown and empty names are rejected.
    #[test]
    fn change_op_from_str_invalid() {
        assert_eq!(ChangeOp::from_str_opt("insert"), None);
        assert_eq!(ChangeOp::from_str_opt("UPSERT"), None);
        assert_eq!(ChangeOp::from_str_opt(""), None);
    }

    // ── Malformed response types ──

    /// A pull response without `changes` is rejected.
    #[test]
    fn pull_response_missing_changes_fails() {
        let outcome: serde_json::Result<PullResponse> =
            serde_json::from_str(r#"{"cursor": 0, "has_more": false}"#);
        assert!(outcome.is_err());
    }

    /// A pull response without `cursor` is rejected.
    #[test]
    fn pull_response_missing_cursor_fails() {
        let outcome: serde_json::Result<PullResponse> =
            serde_json::from_str(r#"{"changes": [], "has_more": false}"#);
        assert!(outcome.is_err());
    }

    /// A push response without `cursor` is rejected.
    #[test]
    fn push_response_missing_cursor_fails() {
        let outcome: serde_json::Result<PushResponse> = serde_json::from_str(r#"{}"#);
        assert!(outcome.is_err());
    }
}
549