use bytes::Bytes; use tracing::instrument; use uuid::Uuid; use crate::{ crypto, error::Result, types::*, }; use super::SyncKitClient; use super::helpers::check_response; impl SyncKitClient { // ── Devices ── /// Register a device for sync. /// /// If a device with the same name already exists for this user/app, the /// server upserts: it updates the existing device's platform and /// `last_seen_at` rather than creating a duplicate. #[instrument(skip(self))] pub async fn register_device( &self, device_name: &str, platform: &str, ) -> Result { let token = self.require_token()?; let body = Bytes::from(serde_json::to_vec(&RegisterDeviceRequest { device_name: device_name.to_string(), platform: platform.to_string(), })?); self.retry_request_json(|| { let req = self .http .post(&self.endpoints.devices) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await } /// List all devices for the current user. #[instrument(skip(self))] pub async fn list_devices(&self) -> Result> { let token = self.require_token()?; self.retry_request_json(|| { let req = self.http.get(&self.endpoints.devices).bearer_auth(&token); async move { check_response(req.send().await?).await } }) .await } // ── Push / Pull ── /// Push changes to the server. Encrypts `data` fields automatically. /// Returns the server cursor after the push. /// /// Retries on transient failures (network errors, 5xx, 429) with exponential backoff. #[instrument(skip(self, changes))] pub async fn push( &self, device_id: Uuid, changes: Vec, ) -> Result { let token = self.require_token()?; // Extract key once for the entire batch (only needed if any entry has data) let has_data = changes.iter().any(|c| c.data.is_some()); let key_holder = if has_data { self.require_master_key()? } else { crypto::ZeroizeOnDrop([0u8; 32]) }; let master_key: &[u8; 32] = &key_holder; let wire_changes = changes .into_iter() .map(|c| Self::encrypt_change_with_key(c, master_key)) .collect::>>()?; let body = Bytes::from(serde_json::to_vec(&WirePushRequest { device_id, batch_id: Uuid::new_v4(), changes: wire_changes, })?); let push_resp: PushResponse = self .retry_request_json(|| { let req = self .http .post(&self.endpoints.push) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await?; Ok(push_resp.cursor) } /// Pull changes from the server since the given cursor. /// Decrypts `data` fields automatically. /// Returns (changes, new_cursor, has_more). /// /// Retries on transient failures (network errors, 5xx, 429) with exponential backoff. #[instrument(skip(self))] pub async fn pull( &self, device_id: Uuid, cursor: i64, ) -> Result<(Vec, i64, bool)> { let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?); self.pull_inner(body, Self::decrypt_change_with_key).await } /// Pull changes from the server with optional table and timestamp filters. /// Decrypts `data` fields automatically. /// Returns (changes, new_cursor, has_more). /// /// Identical to [`pull`](SyncKitClient::pull) when the filter is empty/default. #[instrument(skip(self, filter))] pub async fn pull_filtered( &self, device_id: Uuid, cursor: i64, filter: PullFilter, ) -> Result<(Vec, i64, bool)> { let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest { device_id, cursor, tables: filter.tables, since: filter.since, })?); self.pull_inner(body, Self::decrypt_change_with_key).await } /// Pull changes from the server, preserving `device_id` and `seq` metadata. /// /// Same HTTP call and decryption as [`pull`](SyncKitClient::pull), but returns /// [`PulledChange`] wrappers that retain server metadata needed for conflict /// detection. Returns (changes, new_cursor, has_more). #[instrument(skip(self))] pub async fn pull_rich( &self, device_id: Uuid, cursor: i64, ) -> Result<(Vec, i64, bool)> { let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?); self.pull_inner(body, Self::decrypt_change_to_pulled).await } /// Pull changes with filters, preserving `device_id` and `seq` metadata. /// /// Same as [`pull_rich`](SyncKitClient::pull_rich) but with table/timestamp /// filtering support. Returns (changes, new_cursor, has_more). #[instrument(skip(self, filter))] pub async fn pull_filtered_rich( &self, device_id: Uuid, cursor: i64, filter: PullFilter, ) -> Result<(Vec, i64, bool)> { let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest { device_id, cursor, tables: filter.tables, since: filter.since, })?); self.pull_inner(body, Self::decrypt_change_to_pulled).await } /// Shared pull implementation: sends the request, extracts the master key, /// and decrypts each change using the provided function. /// /// If a pending rotation key is cached on the client, entries are decrypted /// with key selection based on each entry's `key_id` field. async fn pull_inner( &self, body: Bytes, decrypt_fn: F, ) -> Result<(Vec, i64, bool)> where F: Fn(PullChangeEntry, &[u8; 32]) -> Result, { let token = self.require_token()?; let pull_resp: PullResponse = self .retry_request_json(|| { let req = self .http .post(&self.endpoints.pull) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await?; // Extract key once for the entire batch (only needed if any entry has data) let has_data = pull_resp.changes.iter().any(|c| c.data.is_some()); let key_holder = if has_data { self.require_master_key()? } else { crypto::ZeroizeOnDrop([0u8; 32]) }; let master_key: &[u8; 32] = &key_holder; // Check for pending rotation key (multi-key decryption) let pending_guard = self.pending_key.read(); let has_pending = pending_guard.is_some() && has_data; let changes = if has_pending { let pending = pending_guard.as_ref().unwrap(); let _primary_key_id = *self.master_key_id.read(); pull_resp .changes .into_iter() .map(|c| { let effective_key_id = c.key_id.unwrap_or(1); if effective_key_id == pending.key_id { decrypt_fn(c, &pending.key) } else { decrypt_fn(c, master_key) } }) .collect::>>()? } else { drop(pending_guard); pull_resp .changes .into_iter() .map(|c| decrypt_fn(c, master_key)) .collect::>>()? }; Ok((changes, pull_resp.cursor, pull_resp.has_more)) } /// Get sync status (total changes, latest cursor). #[instrument(skip(self))] pub async fn status(&self) -> Result { let token = self.require_token()?; self.retry_request_json(|| { let req = self.http.get(&self.endpoints.status).bearer_auth(&token); async move { check_response(req.send().await?).await } }) .await } } #[cfg(test)] mod tests { use chrono::Utc; use uuid::Uuid; use crate::types::*; // ── Type serialization / deserialization ── #[test] fn change_entry_serialization_roundtrip() { let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: Uuid::new_v4().to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "Test task", "done": false})), }; let json = serde_json::to_string(&entry).unwrap(); let deserialized: ChangeEntry = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.table, entry.table); assert_eq!(deserialized.op, entry.op); assert_eq!(deserialized.row_id, entry.row_id); assert_eq!(deserialized.data, entry.data); } #[test] fn change_entry_with_none_data_omits_field() { let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Delete, row_id: "abc-123".to_string(), timestamp: Utc::now(), data: None, }; let json = serde_json::to_string(&entry).unwrap(); assert!(!json.contains("\"data\"")); } #[test] fn change_entry_deserialization_with_missing_data() { let json = r#"{ "table": "events", "op": "DELETE", "row_id": "evt-1", "timestamp": "2025-01-15T10:00:00Z" }"#; let entry: ChangeEntry = serde_json::from_str(json).unwrap(); assert_eq!(entry.table, "events"); assert_eq!(entry.op, ChangeOp::Delete); assert!(entry.data.is_none()); } #[test] fn device_serialization_roundtrip() { let device = Device { id: Uuid::new_v4(), app_id: Uuid::new_v4(), user_id: Uuid::new_v4(), device_name: "MacBook Pro".to_string(), platform: "macos".to_string(), last_seen_at: Utc::now(), created_at: Utc::now(), }; let json = serde_json::to_string(&device).unwrap(); let deserialized: Device = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.id, device.id); assert_eq!(deserialized.device_name, device.device_name); assert_eq!(deserialized.platform, device.platform); } #[test] fn sync_status_deserialization() { let json = r#"{"total_changes": 42, "latest_cursor": 100}"#; let status: SyncStatus = serde_json::from_str(json).unwrap(); assert_eq!(status.total_changes, 42); assert_eq!(status.latest_cursor, Some(100)); } #[test] fn sync_status_with_null_cursor() { let json = r#"{"total_changes": 0, "latest_cursor": null}"#; let status: SyncStatus = serde_json::from_str(json).unwrap(); assert_eq!(status.total_changes, 0); assert_eq!(status.latest_cursor, None); } #[test] fn register_device_request_serialization() { let req = RegisterDeviceRequest { device_name: "iPhone 15".to_string(), platform: "ios".to_string(), }; let json = serde_json::to_string(&req).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["device_name"], "iPhone 15"); assert_eq!(parsed["platform"], "ios"); } // ── Wire types ── #[test] fn wire_push_request_serialization() { let device_id = Uuid::new_v4(); let req = WirePushRequest { device_id, batch_id: Uuid::new_v4(), changes: vec![WireChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "r1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!("encrypted-blob")), }], }; let json = serde_json::to_string(&req).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["device_id"], device_id.to_string()); assert_eq!(parsed["changes"].as_array().unwrap().len(), 1); } #[test] fn pull_request_serialization() { let device_id = Uuid::new_v4(); let req = PullRequest { device_id, cursor: 42, }; let json = serde_json::to_string(&req).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["device_id"], device_id.to_string()); assert_eq!(parsed["cursor"], 42); } #[test] fn pull_response_deserialization() { let device_id = Uuid::new_v4(); let json = format!( r#"{{ "changes": [ {{ "seq": 1, "device_id": "{}", "table": "tasks", "op": "INSERT", "row_id": "r1", "timestamp": "2025-06-01T12:00:00Z", "data": "encrypted" }} ], "cursor": 5, "has_more": true }}"#, device_id ); let resp: PullResponse = serde_json::from_str(&json).unwrap(); assert_eq!(resp.changes.len(), 1); assert_eq!(resp.cursor, 5); assert!(resp.has_more); assert_eq!(resp.changes[0].seq, 1); assert_eq!(resp.changes[0].table, "tasks"); } #[test] fn pull_response_empty_changes() { let json = r#"{"changes": [], "cursor": 0, "has_more": false}"#; let resp: PullResponse = serde_json::from_str(json).unwrap(); assert!(resp.changes.is_empty()); assert_eq!(resp.cursor, 0); assert!(!resp.has_more); } #[test] fn push_response_deserialization() { let json = r#"{"cursor": 99}"#; let resp: PushResponse = serde_json::from_str(json).unwrap(); assert_eq!(resp.cursor, 99); } // ── ChangeOp display and parsing ── #[test] fn change_op_display() { assert_eq!(ChangeOp::Insert.to_string(), "INSERT"); assert_eq!(ChangeOp::Update.to_string(), "UPDATE"); assert_eq!(ChangeOp::Delete.to_string(), "DELETE"); } #[test] fn change_op_from_str_valid() { assert_eq!(ChangeOp::from_str_opt("INSERT"), Some(ChangeOp::Insert)); assert_eq!(ChangeOp::from_str_opt("UPDATE"), Some(ChangeOp::Update)); assert_eq!(ChangeOp::from_str_opt("DELETE"), Some(ChangeOp::Delete)); } #[test] fn change_op_from_str_invalid() { assert_eq!(ChangeOp::from_str_opt("insert"), None); assert_eq!(ChangeOp::from_str_opt("UPSERT"), None); assert_eq!(ChangeOp::from_str_opt(""), None); } // ── Malformed response types ── #[test] fn pull_response_missing_changes_fails() { let json = r#"{"cursor": 0, "has_more": false}"#; assert!(serde_json::from_str::(json).is_err()); } #[test] fn pull_response_missing_cursor_fails() { let json = r#"{"changes": [], "has_more": false}"#; assert!(serde_json::from_str::(json).is_err()); } #[test] fn push_response_missing_cursor_fails() { let json = r#"{}"#; assert!(serde_json::from_str::(json).is_err()); } }