use base64::Engine; use crate::{ crypto, error::{Result, SyncKitError}, types::*, }; use super::{BASE_DELAY, MAX_RETRIES, SyncKitClient}; impl SyncKitClient { /// Retry an async HTTP operation with exponential backoff. /// /// Retries on transient errors (network failures, 5xx, 429) up to [`MAX_RETRIES`] /// times with delays of 1s, 2s, 4s. Returns the last error if all attempts fail. /// Client errors (4xx except 429) are considered permanent and returned immediately. pub(super) async fn retry_request(&self, mut operation: F) -> Result where F: FnMut() -> Fut, Fut: std::future::Future>, { let mut last_err = None; for attempt in 0..=MAX_RETRIES { match operation().await { Ok(resp) => return Ok(resp), Err(err) => { if !is_transient(&err) { return Err(err); } if attempt < MAX_RETRIES { let delay = BASE_DELAY * 2u32.pow(attempt); tracing::debug!( attempt = attempt + 1, max_retries = MAX_RETRIES, delay_ms = delay.as_millis() as u64, error = %err, "Transient error, retrying after backoff", ); tokio::time::sleep(delay).await; } last_err = Some(err); } } } Err(last_err.expect("loop ran at least once")) } /// Encrypt the data field of a change entry for the wire. #[cfg(test)] pub(super) fn encrypt_change(&self, entry: ChangeEntry) -> Result { if entry.data.is_some() { let master_key = self.require_master_key()?; Self::encrypt_change_with_key(entry, &master_key) } else { Self::encrypt_change_with_key(entry, &[0u8; 32]) } } /// Encrypt with a pre-loaded key. Used by `push()` to avoid per-entry lock acquisition. pub(super) fn encrypt_change_with_key(entry: ChangeEntry, master_key: &[u8; 32]) -> Result { let encrypted_data = match entry.data { Some(ref value) => Some(crypto::encrypt_json(value, master_key)?), None => None, }; Ok(WireChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: encrypted_data, }) } /// Decrypt the data field of a pulled change entry. #[cfg(test)] pub(super) fn decrypt_change(&self, entry: PullChangeEntry) -> Result { if entry.data.is_some() { let master_key = self.require_master_key()?; Self::decrypt_change_with_key(entry, &master_key) } else { Self::decrypt_change_with_key(entry, &[0u8; 32]) } } /// Decrypt with a pre-loaded key, preserving `device_id` and `seq` in a [`PulledChange`]. /// /// Used by `pull_rich()` to produce conflict-detection-ready results. pub(super) fn decrypt_change_to_pulled(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result { let device_id = entry.device_id; let seq = entry.seq; let decrypted = Self::decrypt_change_with_key(entry, master_key)?; Ok(crate::types::PulledChange { entry: decrypted, device_id, seq, }) } /// Decrypt with a pre-loaded key. Used by `pull()` to avoid per-entry lock acquisition. pub(super) fn decrypt_change_with_key(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result { let decrypted_data = match entry.data { Some(ref value) => Some(crypto::decrypt_json(value, master_key)?), None => None, }; Ok(ChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: decrypted_data, }) } } /// Extract the `exp` claim from a JWT without verifying the signature. /// /// JWTs are `header.payload.signature` where the payload is base64url-encoded JSON. /// We decode the payload segment and read the `exp` field. Returns `None` if /// the token is malformed or `exp` is missing. pub(super) fn jwt_exp(token: &str) -> Option { let payload = token.split('.').nth(1)?; let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD .decode(payload) .ok()?; let claims: serde_json::Value = serde_json::from_slice(&bytes).ok()?; claims["exp"].as_i64() } /// Returns `true` if the token's `exp` claim is within [`TOKEN_EXPIRY_BUFFER_SECS`] /// of the current time (or already past). Returns `false` if the token cannot /// be decoded — in that case, let the server decide. #[cfg(test)] pub(super) fn token_is_expired(token: &str) -> bool { let Some(exp) = jwt_exp(token) else { return false; }; let now = chrono::Utc::now().timestamp(); now >= exp - super::TOKEN_EXPIRY_BUFFER_SECS } /// Check an HTTP response for errors, returning the response on success. pub(super) async fn check_response(resp: reqwest::Response) -> Result { let status = resp.status().as_u16(); if status >= 400 { let message = resp.text().await.unwrap_or_default(); return Err(SyncKitError::Server { status, message }); } Ok(resp) } /// Returns true if the error is transient and worth retrying. /// /// Transient errors: /// - Network-level failures (connection refused, timeout, DNS, etc.) /// - Server errors (5xx) /// - Rate limiting (429) /// /// Permanent errors (not retried): /// - Client errors (4xx except 429) — bad request, auth failure, not found, etc. /// - Serialization errors, encryption errors, missing session, etc. pub(super) fn is_transient(err: &SyncKitError) -> bool { match err { SyncKitError::Http(e) => { // All reqwest transport errors are transient (timeout, connect, DNS, etc.) // except for builder errors which indicate programming mistakes. !e.is_builder() } SyncKitError::Server { status, .. } => { // 5xx = server error (transient), 429 = rate limited (transient) *status >= 500 || *status == 429 } // Everything else (auth, crypto, serialization) is permanent _ => false, } } #[cfg(test)] mod tests { use super::*; use base64::Engine; use chrono::Utc; use std::time::Duration; use super::super::TOKEN_EXPIRY_BUFFER_SECS; fn test_config() -> super::super::SyncKitConfig { super::super::SyncKitConfig { server_url: "https://example.com".to_string(), api_key: "test-api-key-123".to_string(), } } /// Build a fake JWT with the given `exp` claim (no real signature). fn fake_jwt(exp: i64) -> String { let header = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(r#"{"alg":"HS256","typ":"JWT"}"#); let payload_json = serde_json::json!({ "sub": "550e8400-e29b-41d4-a716-446655440000", "app": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "exp": exp, "iat": exp - 3600, }); let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(payload_json.to_string().as_bytes()); let signature = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(b"fake-signature"); format!("{header}.{payload}.{signature}") } // ── encrypt_change / decrypt_change ── #[test] fn encrypt_change_with_no_data() { let client = SyncKitClient::new(test_config()); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Delete, row_id: "row-1".to_string(), timestamp: Utc::now(), data: None, }; let wire = client.encrypt_change(entry.clone()).unwrap(); assert_eq!(wire.table, "tasks"); assert_eq!(wire.op, ChangeOp::Delete); assert_eq!(wire.row_id, "row-1"); assert!(wire.data.is_none()); } #[test] fn encrypt_change_fails_without_master_key() { let client = SyncKitClient::new(test_config()); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "test"})), }; let err = client.encrypt_change(entry).unwrap_err(); assert!(matches!(err, SyncKitError::NoMasterKey)); } #[test] fn encrypt_change_produces_encrypted_data() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key)); let original_data = serde_json::json!({"title": "Buy milk", "priority": 3}); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(original_data.clone()), }; let wire = client.encrypt_change(entry).unwrap(); assert!(wire.data.is_some()); let encrypted = wire.data.unwrap(); assert!(encrypted.is_string()); assert_ne!(encrypted, original_data); } #[test] fn encrypt_decrypt_roundtrip() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key)); let original_data = serde_json::json!({ "title": "Buy milk", "tags": ["groceries", "urgent"], "count": 42 }); let ts = Utc::now(); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Update, row_id: "row-abc".to_string(), timestamp: ts, data: Some(original_data.clone()), }; let wire = client.encrypt_change(entry).unwrap(); let pull_entry = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, }; let decrypted = client.decrypt_change(pull_entry).unwrap(); assert_eq!(decrypted.table, "tasks"); assert_eq!(decrypted.op, ChangeOp::Update); assert_eq!(decrypted.row_id, "row-abc"); assert_eq!(decrypted.data.unwrap(), original_data); } #[test] fn decrypt_change_with_no_data() { let client = SyncKitClient::new(test_config()); let pull_entry = PullChangeEntry { seq: 5, device_id: uuid::Uuid::new_v4(), table: "events".to_string(), op: ChangeOp::Delete, row_id: "evt-1".to_string(), timestamp: Utc::now(), data: None, }; let decrypted = client.decrypt_change(pull_entry).unwrap(); assert_eq!(decrypted.table, "events"); assert_eq!(decrypted.op, ChangeOp::Delete); assert!(decrypted.data.is_none()); } #[test] fn decrypt_change_fails_without_master_key() { let client = SyncKitClient::new(test_config()); let pull_entry = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!("some-encrypted-string")), }; let err = client.decrypt_change(pull_entry).unwrap_err(); assert!(matches!(err, SyncKitError::NoMasterKey)); } // ── is_transient error classification ── #[test] fn is_transient_server_5xx() { let err = SyncKitError::Server { status: 500, message: "Internal Server Error".to_string() }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 502, message: "Bad Gateway".to_string() }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 503, message: "Service Unavailable".to_string() }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 504, message: "Gateway Timeout".to_string() }; assert!(is_transient(&err)); } #[test] fn is_transient_rate_limited_429() { let err = SyncKitError::Server { status: 429, message: "Too Many Requests".to_string() }; assert!(is_transient(&err)); } #[test] fn is_not_transient_client_4xx() { let err = SyncKitError::Server { status: 400, message: "Bad Request".to_string() }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 401, message: "Unauthorized".to_string() }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 403, message: "Forbidden".to_string() }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 404, message: "Not Found".to_string() }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 409, message: "Conflict".to_string() }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 422, message: "Unprocessable Entity".to_string() }; assert!(!is_transient(&err)); } #[test] fn is_not_transient_not_authenticated() { assert!(!is_transient(&SyncKitError::NotAuthenticated)); } #[test] fn is_not_transient_no_master_key() { assert!(!is_transient(&SyncKitError::NoMasterKey)); } #[test] fn is_not_transient_decryption_failed() { assert!(!is_transient(&SyncKitError::DecryptionFailed)); } #[test] fn is_not_transient_invalid_envelope() { assert!(!is_transient(&SyncKitError::InvalidEnvelope("bad version".to_string()))); } #[test] fn is_not_transient_crypto() { assert!(!is_transient(&SyncKitError::Crypto("encrypt failed".to_string()))); } #[test] fn is_not_transient_json() { let err: SyncKitError = serde_json::from_str::("not json") .unwrap_err() .into(); assert!(!is_transient(&err)); } #[test] fn is_not_transient_base64() { let err: SyncKitError = base64::engine::general_purpose::STANDARD .decode("!!!invalid!!!") .unwrap_err() .into(); assert!(!is_transient(&err)); } #[test] fn is_not_transient_token_expired() { assert!(!is_transient(&SyncKitError::TokenExpired)); } #[test] fn is_not_transient_internal() { assert!(!is_transient(&SyncKitError::Internal("lock poisoned".to_string()))); } // ── Retry constants ── #[test] fn retry_constants_are_sensible() { assert_eq!(MAX_RETRIES, 3); assert_eq!(BASE_DELAY, Duration::from_secs(1)); } #[test] fn backoff_delays_are_exponential() { let delay_0 = BASE_DELAY * 2u32.pow(0); let delay_1 = BASE_DELAY * 2u32.pow(1); let delay_2 = BASE_DELAY * 2u32.pow(2); assert_eq!(delay_0, Duration::from_secs(1)); assert_eq!(delay_1, Duration::from_secs(2)); assert_eq!(delay_2, Duration::from_secs(4)); } // ── is_transient boundary: 429 vs 428, 499 vs 500 ── #[test] fn is_transient_boundary_values() { assert!(!is_transient(&SyncKitError::Server { status: 428, message: String::new() })); assert!(is_transient(&SyncKitError::Server { status: 429, message: String::new() })); assert!(!is_transient(&SyncKitError::Server { status: 430, message: String::new() })); assert!(!is_transient(&SyncKitError::Server { status: 499, message: String::new() })); assert!(is_transient(&SyncKitError::Server { status: 500, message: String::new() })); } // ── Token expiry detection ── #[test] fn jwt_exp_extracts_expiry() { let exp = Utc::now().timestamp() + 3600; let token = fake_jwt(exp); assert_eq!(jwt_exp(&token), Some(exp)); } #[test] fn jwt_exp_returns_none_for_garbage() { assert_eq!(jwt_exp("not-a-jwt"), None); assert_eq!(jwt_exp("a.b.c"), None); assert_eq!(jwt_exp(""), None); } #[test] fn token_is_expired_for_past_exp() { let token = fake_jwt(Utc::now().timestamp() - 3600); assert!(token_is_expired(&token)); } #[test] fn token_is_expired_within_buffer() { let token = fake_jwt(Utc::now().timestamp() + 10); assert!(token_is_expired(&token)); } #[test] fn token_is_not_expired_when_fresh() { let token = fake_jwt(Utc::now().timestamp() + 3600); assert!(!token_is_expired(&token)); } #[test] fn token_is_not_expired_for_garbage() { assert!(!token_is_expired("garbage")); } #[test] fn token_expires_exactly_at_buffer_boundary() { let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS); assert!(token_is_expired(&token)); } #[test] fn token_expires_just_past_buffer() { let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS + 1); assert!(!token_is_expired(&token)); } // ── encrypt_change preserves metadata ── #[test] fn encrypt_change_preserves_all_metadata() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let ts = Utc::now(); let entry = ChangeEntry { table: "contacts".to_string(), op: ChangeOp::Update, row_id: "unique-row-id".to_string(), timestamp: ts, data: Some(serde_json::json!({"name": "Alice"})), }; let wire = client.encrypt_change(entry).unwrap(); assert_eq!(wire.table, "contacts"); assert_eq!(wire.op, ChangeOp::Update); assert_eq!(wire.row_id, "unique-row-id"); assert_eq!(wire.timestamp, ts); } // ── Multiple entries encrypt/decrypt ── #[test] fn multiple_entries_encrypt_decrypt_roundtrip() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entries = [ ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "r1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "Task 1"})), }, ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Update, row_id: "r2".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "Task 2", "done": true})), }, ChangeEntry { table: "events".to_string(), op: ChangeOp::Delete, row_id: "r3".to_string(), timestamp: Utc::now(), data: None, }, ]; let wire_entries: Vec<_> = entries .iter() .cloned() .map(|e| client.encrypt_change(e).unwrap()) .collect(); assert_eq!(wire_entries.len(), 3); assert!(wire_entries[0].data.is_some()); assert!(wire_entries[1].data.is_some()); assert!(wire_entries[2].data.is_none()); for (i, wire) in wire_entries.into_iter().enumerate() { let pull = PullChangeEntry { seq: i as i64, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!(decrypted.table, entries[i].table); assert_eq!(decrypted.op, entries[i].op); assert_eq!(decrypted.data, entries[i].data); } } // ── Unicode and edge-case roundtrips ── #[test] fn encrypt_decrypt_roundtrip_unicode_table() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entry = ChangeEntry { table: "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}".into(), op: ChangeOp::Insert, row_id: "row-1".into(), timestamp: Utc::now(), data: Some(serde_json::json!({"name": "\u{30C6}\u{30B9}\u{30C8}"})), }; let wire = client.encrypt_change(entry).unwrap(); let pull = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!( decrypted.table, "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}" ); } #[test] fn encrypt_decrypt_roundtrip_empty_row_id() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entry = ChangeEntry { table: "t".into(), op: ChangeOp::Insert, row_id: "".into(), timestamp: Utc::now(), data: Some(serde_json::json!(42)), }; let wire = client.encrypt_change(entry).unwrap(); let pull = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!(decrypted.row_id, ""); assert_eq!(decrypted.data.unwrap(), serde_json::json!(42)); } }