use base64::Engine; use crate::{ crypto, error::{Result, SyncKitError}, types::*, }; use super::{BASE_DELAY, MAX_RETRIES, SyncKitClient}; impl SyncKitClient { /// Retry an async HTTP operation with exponential backoff. /// /// Retries on transient errors (network failures, 5xx, 429) up to [`MAX_RETRIES`] /// times with delays of 1s, 2s, 4s. Returns the last error if all attempts fail. /// Client errors (4xx except 429) are considered permanent and returned immediately. pub(super) async fn retry_request(&self, mut operation: F) -> Result where F: FnMut() -> Fut, Fut: std::future::Future>, { let mut last_err = None; for attempt in 0..=MAX_RETRIES { match operation().await { Ok(resp) => return Ok(resp), Err(err) => { if !is_transient(&err) { return Err(err); } if attempt < MAX_RETRIES { let delay = retry_delay(&err, attempt); tracing::debug!( attempt = attempt + 1, max_retries = MAX_RETRIES, delay_ms = delay.as_millis() as u64, error = %err, "Transient error, retrying after backoff", ); tokio::time::sleep(delay).await; } last_err = Some(err); } } } Err(last_err.expect("loop ran at least once")) } /// Retry an HTTP operation and deserialize the JSON response body inside /// the retry loop. This ensures a transient body-read failure (truncated /// response, connection reset mid-body) is retried rather than surfacing /// as a permanent error after the server already committed the operation. pub(super) async fn retry_request_json(&self, mut operation: F) -> Result where F: FnMut() -> Fut, Fut: std::future::Future>, T: serde::de::DeserializeOwned, { let mut last_err = None; for attempt in 0..=MAX_RETRIES { match operation().await { Ok(resp) => { match resp.json::().await { Ok(parsed) => return Ok(parsed), Err(e) => { let err = SyncKitError::Http(e); if attempt < MAX_RETRIES { let delay = BASE_DELAY * 2u32.pow(attempt); tracing::debug!( attempt = attempt + 1, max_retries = MAX_RETRIES, delay_ms = delay.as_millis() as u64, error = %err, "Response body read failed, retrying", ); tokio::time::sleep(delay).await; } last_err = Some(err); } } } Err(err) => { if !is_transient(&err) { return Err(err); } if attempt < MAX_RETRIES { let delay = retry_delay(&err, attempt); tracing::debug!( attempt = attempt + 1, max_retries = MAX_RETRIES, delay_ms = delay.as_millis() as u64, error = %err, "Transient error, retrying after backoff", ); tokio::time::sleep(delay).await; } last_err = Some(err); } } } Err(last_err.expect("loop ran at least once")) } /// Encrypt the data field of a change entry for the wire. #[cfg(test)] pub(super) fn encrypt_change(&self, entry: ChangeEntry) -> Result { if entry.data.is_some() { let master_key = self.require_master_key()?; Self::encrypt_change_with_key(entry, &master_key) } else { Self::encrypt_change_no_data(entry) } } /// Build a wire entry for a change with no data (e.g. Delete). No key needed. #[cfg(test)] pub(super) fn encrypt_change_no_data(entry: ChangeEntry) -> Result { debug_assert!(entry.data.is_none()); Ok(WireChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: None, }) } /// Decrypt a pulled entry that has no data. No key needed. #[cfg(test)] pub(super) fn decrypt_change_no_data(entry: PullChangeEntry) -> Result { debug_assert!(entry.data.is_none()); Ok(ChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: None, }) } /// Encrypt with a pre-loaded key. Used by `push()` to avoid per-entry lock acquisition. pub(super) fn encrypt_change_with_key(entry: ChangeEntry, master_key: &[u8; 32]) -> Result { let encrypted_data = match entry.data { Some(ref value) => Some(crypto::encrypt_json(value, master_key)?), None => None, }; Ok(WireChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: encrypted_data, }) } /// Decrypt the data field of a pulled change entry. #[cfg(test)] pub(super) fn decrypt_change(&self, entry: PullChangeEntry) -> Result { if entry.data.is_some() { let master_key = self.require_master_key()?; Self::decrypt_change_with_key(entry, &master_key) } else { Self::decrypt_change_no_data(entry) } } /// Decrypt with a pre-loaded key, preserving `device_id` and `seq` in a [`PulledChange`]. /// /// Used by `pull_rich()` to produce conflict-detection-ready results. pub(super) fn decrypt_change_to_pulled(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result { let device_id = entry.device_id; let seq = entry.seq; let decrypted = Self::decrypt_change_with_key(entry, master_key)?; Ok(crate::types::PulledChange { entry: decrypted, device_id, seq, }) } /// Decrypt with key selection based on key_id. Used during rotation when /// pulled entries may be encrypted with different keys. #[allow(dead_code)] pub(super) fn decrypt_change_multi_key( entry: PullChangeEntry, primary_key: &[u8; 32], primary_key_id: i32, pending_key: &[u8; 32], pending_key_id: i32, ) -> Result { let effective_key_id = entry.key_id.unwrap_or(1); let key = if effective_key_id == pending_key_id { pending_key } else if effective_key_id == primary_key_id || effective_key_id <= 1 { primary_key } else { // Unknown key_id — try primary, then pending match Self::decrypt_change_with_key_inner(&entry, primary_key) { Ok(result) => return Ok(result), Err(_) => pending_key, } }; Self::decrypt_change_with_key(entry, key) } /// Inner decryption that borrows the entry (for fallback logic). #[allow(dead_code)] fn decrypt_change_with_key_inner(entry: &PullChangeEntry, master_key: &[u8; 32]) -> Result { let decrypted_data = match entry.data { Some(ref value) => Some(crypto::decrypt_json(value, master_key)?), None => None, }; Ok(ChangeEntry { table: entry.table.clone(), op: entry.op, row_id: entry.row_id.clone(), timestamp: entry.timestamp, data: decrypted_data, }) } /// Decrypt with a pre-loaded key. Used by `pull()` to avoid per-entry lock acquisition. pub(super) fn decrypt_change_with_key(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result { let decrypted_data = match entry.data { Some(ref value) => Some(crypto::decrypt_json(value, master_key)?), None => None, }; Ok(ChangeEntry { table: entry.table, op: entry.op, row_id: entry.row_id, timestamp: entry.timestamp, data: decrypted_data, }) } } /// Extract the `exp` claim from a JWT without verifying the signature. /// /// JWTs are `header.payload.signature` where the payload is base64url-encoded JSON. /// We decode the payload segment and read the `exp` field. Returns `None` if /// the token is malformed or `exp` is missing. pub(super) fn jwt_exp(token: &str) -> Option { let payload = token.split('.').nth(1)?; let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD .decode(payload) .ok()?; let claims: serde_json::Value = serde_json::from_slice(&bytes).ok()?; claims["exp"].as_i64() } /// Returns `true` if the token's `exp` claim is within [`TOKEN_EXPIRY_BUFFER_SECS`] /// of the current time (or already past). Returns `false` if the token cannot /// be decoded — in that case, let the server decide. #[cfg(test)] pub(super) fn token_is_expired(token: &str) -> bool { let Some(exp) = jwt_exp(token) else { return false; }; let now = chrono::Utc::now().timestamp(); now >= exp - super::TOKEN_EXPIRY_BUFFER_SECS } /// Check an HTTP response for errors, returning the response on success. /// /// For 429 responses, parses the `Retry-After` header so the retry loop /// can use it instead of the default exponential backoff delay. pub(super) async fn check_response(resp: reqwest::Response) -> Result { let status = resp.status().as_u16(); if status >= 400 { let retry_after_secs = parse_retry_after(&resp); let message = resp.text().await.unwrap_or_default(); return Err(SyncKitError::Server { status, message, retry_after_secs }); } Ok(resp) } /// Extract the `Retry-After` header from an HTTP response as seconds. /// Returns `None` if the header is absent, non-numeric, or zero. fn parse_retry_after(resp: &reqwest::Response) -> Option { resp.headers() .get("retry-after") .and_then(|v| v.to_str().ok()) .and_then(|v| v.parse::().ok()) .filter(|&secs| secs > 0) } /// Compute the backoff delay for a retry attempt. /// /// Uses the server's `Retry-After` header (if present on a 429) capped at /// 60 seconds. Falls back to exponential backoff (1s, 2s, 4s). fn retry_delay(err: &SyncKitError, attempt: u32) -> std::time::Duration { if let SyncKitError::Server { retry_after_secs: Some(secs), .. } = err { let capped = (*secs).min(60); return std::time::Duration::from_secs(capped); } BASE_DELAY * 2u32.pow(attempt) } /// Returns true if the error is transient and worth retrying. /// /// Transient errors: /// - Network-level failures (connection refused, timeout, DNS, etc.) /// - Server errors (5xx) /// - Rate limiting (429) /// /// Permanent errors (not retried): /// - Client errors (4xx except 429) — bad request, auth failure, not found, etc. /// - Serialization errors, encryption errors, missing session, etc. pub(super) fn is_transient(err: &SyncKitError) -> bool { match err { SyncKitError::Http(e) => { // Transport errors (timeout, connect, DNS) are transient. // Builder errors are programming mistakes, redirect loops and // body decode failures are permanent — retrying won't help. !e.is_builder() && !e.is_redirect() && !e.is_decode() } SyncKitError::Server { status, .. } => { // 5xx = server error (transient), 429 = rate limited (transient) *status >= 500 || *status == 429 } // Everything else (auth, crypto, serialization, keychain) is permanent. // Note: some keychain errors (e.g. locked keychain, unavailable // secret-service) are conceptually transient, but keychain operations // are synchronous and not wrapped by retry_request. _ => false, } } #[cfg(test)] mod tests { use super::*; use base64::Engine; use chrono::Utc; use std::time::Duration; use super::super::TOKEN_EXPIRY_BUFFER_SECS; fn test_config() -> super::super::SyncKitConfig { super::super::SyncKitConfig { server_url: "https://example.com".to_string(), api_key: "test-api-key-123".to_string(), } } /// Build a fake JWT with the given `exp` claim (no real signature). fn fake_jwt(exp: i64) -> String { let header = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(r#"{"alg":"HS256","typ":"JWT"}"#); let payload_json = serde_json::json!({ "sub": "550e8400-e29b-41d4-a716-446655440000", "app": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "exp": exp, "iat": exp - 3600, }); let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(payload_json.to_string().as_bytes()); let signature = base64::engine::general_purpose::URL_SAFE_NO_PAD .encode(b"fake-signature"); format!("{header}.{payload}.{signature}") } // ── encrypt_change / decrypt_change ── #[test] fn encrypt_change_with_no_data() { let client = SyncKitClient::new(test_config()); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Delete, row_id: "row-1".to_string(), timestamp: Utc::now(), data: None, }; let wire = client.encrypt_change(entry.clone()).unwrap(); assert_eq!(wire.table, "tasks"); assert_eq!(wire.op, ChangeOp::Delete); assert_eq!(wire.row_id, "row-1"); assert!(wire.data.is_none()); } #[test] fn encrypt_change_fails_without_master_key() { let client = SyncKitClient::new(test_config()); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "test"})), }; let err = client.encrypt_change(entry).unwrap_err(); assert!(matches!(err, SyncKitError::NoMasterKey)); } #[test] fn encrypt_change_produces_encrypted_data() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key)); let original_data = serde_json::json!({"title": "Buy milk", "priority": 3}); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(original_data.clone()), }; let wire = client.encrypt_change(entry).unwrap(); assert!(wire.data.is_some()); let encrypted = wire.data.unwrap(); assert!(encrypted.is_string()); assert_ne!(encrypted, original_data); } #[test] fn encrypt_decrypt_roundtrip() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key)); let original_data = serde_json::json!({ "title": "Buy milk", "tags": ["groceries", "urgent"], "count": 42 }); let ts = Utc::now(); let entry = ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Update, row_id: "row-abc".to_string(), timestamp: ts, data: Some(original_data.clone()), }; let wire = client.encrypt_change(entry).unwrap(); let pull_entry = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, key_id: None, }; let decrypted = client.decrypt_change(pull_entry).unwrap(); assert_eq!(decrypted.table, "tasks"); assert_eq!(decrypted.op, ChangeOp::Update); assert_eq!(decrypted.row_id, "row-abc"); assert_eq!(decrypted.data.unwrap(), original_data); } #[test] fn decrypt_change_with_no_data() { let client = SyncKitClient::new(test_config()); let pull_entry = PullChangeEntry { seq: 5, device_id: uuid::Uuid::new_v4(), table: "events".to_string(), op: ChangeOp::Delete, row_id: "evt-1".to_string(), timestamp: Utc::now(), data: None, key_id: None, }; let decrypted = client.decrypt_change(pull_entry).unwrap(); assert_eq!(decrypted.table, "events"); assert_eq!(decrypted.op, ChangeOp::Delete); assert!(decrypted.data.is_none()); } #[test] fn decrypt_change_fails_without_master_key() { let client = SyncKitClient::new(test_config()); let pull_entry = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!("some-encrypted-string")), key_id: None, }; let err = client.decrypt_change(pull_entry).unwrap_err(); assert!(matches!(err, SyncKitError::NoMasterKey)); } // ── is_transient error classification ── #[test] fn is_transient_server_5xx() { let err = SyncKitError::Server { status: 500, message: "Internal Server Error".to_string(), retry_after_secs: None }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 502, message: "Bad Gateway".to_string(), retry_after_secs: None }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 503, message: "Service Unavailable".to_string(), retry_after_secs: None }; assert!(is_transient(&err)); let err = SyncKitError::Server { status: 504, message: "Gateway Timeout".to_string(), retry_after_secs: None }; assert!(is_transient(&err)); } #[test] fn is_transient_rate_limited_429() { let err = SyncKitError::Server { status: 429, message: "Too Many Requests".to_string(), retry_after_secs: None }; assert!(is_transient(&err)); } #[test] fn is_not_transient_client_4xx() { let err = SyncKitError::Server { status: 400, message: "Bad Request".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 401, message: "Unauthorized".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 403, message: "Forbidden".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 404, message: "Not Found".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 409, message: "Conflict".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); let err = SyncKitError::Server { status: 422, message: "Unprocessable Entity".to_string(), retry_after_secs: None }; assert!(!is_transient(&err)); } #[test] fn is_not_transient_not_authenticated() { assert!(!is_transient(&SyncKitError::NotAuthenticated)); } #[test] fn is_not_transient_no_master_key() { assert!(!is_transient(&SyncKitError::NoMasterKey)); } #[test] fn is_not_transient_decryption_failed() { assert!(!is_transient(&SyncKitError::DecryptionFailed)); } #[test] fn is_not_transient_invalid_envelope() { assert!(!is_transient(&SyncKitError::InvalidEnvelope("bad version".to_string()))); } #[test] fn is_not_transient_crypto() { assert!(!is_transient(&SyncKitError::Crypto("encrypt failed".to_string()))); } #[test] fn is_not_transient_json() { let err: SyncKitError = serde_json::from_str::("not json") .unwrap_err() .into(); assert!(!is_transient(&err)); } #[test] fn is_not_transient_base64() { let err: SyncKitError = base64::engine::general_purpose::STANDARD .decode("!!!invalid!!!") .unwrap_err() .into(); assert!(!is_transient(&err)); } #[test] fn is_not_transient_token_expired() { assert!(!is_transient(&SyncKitError::TokenExpired)); } #[test] fn is_not_transient_internal() { assert!(!is_transient(&SyncKitError::Internal("lock poisoned".to_string()))); } // ── Retry constants ── #[test] fn retry_constants_are_sensible() { assert_eq!(MAX_RETRIES, 3); assert_eq!(BASE_DELAY, Duration::from_secs(1)); } #[test] fn backoff_delays_are_exponential() { let delay_0 = BASE_DELAY * 2u32.pow(0); let delay_1 = BASE_DELAY * 2u32.pow(1); let delay_2 = BASE_DELAY * 2u32.pow(2); assert_eq!(delay_0, Duration::from_secs(1)); assert_eq!(delay_1, Duration::from_secs(2)); assert_eq!(delay_2, Duration::from_secs(4)); } // ── is_transient boundary: 429 vs 428, 499 vs 500 ── #[test] fn is_transient_boundary_values() { assert!(!is_transient(&SyncKitError::Server { status: 428, message: String::new(), retry_after_secs: None })); assert!(is_transient(&SyncKitError::Server { status: 429, message: String::new(), retry_after_secs: None })); assert!(!is_transient(&SyncKitError::Server { status: 430, message: String::new(), retry_after_secs: None })); assert!(!is_transient(&SyncKitError::Server { status: 499, message: String::new(), retry_after_secs: None })); assert!(is_transient(&SyncKitError::Server { status: 500, message: String::new(), retry_after_secs: None })); } // ── Token expiry detection ── #[test] fn jwt_exp_extracts_expiry() { let exp = Utc::now().timestamp() + 3600; let token = fake_jwt(exp); assert_eq!(jwt_exp(&token), Some(exp)); } #[test] fn jwt_exp_returns_none_for_garbage() { assert_eq!(jwt_exp("not-a-jwt"), None); assert_eq!(jwt_exp("a.b.c"), None); assert_eq!(jwt_exp(""), None); } #[test] fn token_is_expired_for_past_exp() { let token = fake_jwt(Utc::now().timestamp() - 3600); assert!(token_is_expired(&token)); } #[test] fn token_is_expired_within_buffer() { let token = fake_jwt(Utc::now().timestamp() + 10); assert!(token_is_expired(&token)); } #[test] fn token_is_not_expired_when_fresh() { let token = fake_jwt(Utc::now().timestamp() + 3600); assert!(!token_is_expired(&token)); } #[test] fn token_is_not_expired_for_garbage() { assert!(!token_is_expired("garbage")); } #[test] fn token_expires_exactly_at_buffer_boundary() { let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS); assert!(token_is_expired(&token)); } #[test] fn token_expires_just_past_buffer() { let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS + 1); assert!(!token_is_expired(&token)); } // ── encrypt_change preserves metadata ── #[test] fn encrypt_change_preserves_all_metadata() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let ts = Utc::now(); let entry = ChangeEntry { table: "contacts".to_string(), op: ChangeOp::Update, row_id: "unique-row-id".to_string(), timestamp: ts, data: Some(serde_json::json!({"name": "Alice"})), }; let wire = client.encrypt_change(entry).unwrap(); assert_eq!(wire.table, "contacts"); assert_eq!(wire.op, ChangeOp::Update); assert_eq!(wire.row_id, "unique-row-id"); assert_eq!(wire.timestamp, ts); } // ── Multiple entries encrypt/decrypt ── #[test] fn multiple_entries_encrypt_decrypt_roundtrip() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entries = [ ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "r1".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "Task 1"})), }, ChangeEntry { table: "tasks".to_string(), op: ChangeOp::Update, row_id: "r2".to_string(), timestamp: Utc::now(), data: Some(serde_json::json!({"title": "Task 2", "done": true})), }, ChangeEntry { table: "events".to_string(), op: ChangeOp::Delete, row_id: "r3".to_string(), timestamp: Utc::now(), data: None, }, ]; let wire_entries: Vec<_> = entries .iter() .cloned() .map(|e| client.encrypt_change(e).unwrap()) .collect(); assert_eq!(wire_entries.len(), 3); assert!(wire_entries[0].data.is_some()); assert!(wire_entries[1].data.is_some()); assert!(wire_entries[2].data.is_none()); for (i, wire) in wire_entries.into_iter().enumerate() { let pull = PullChangeEntry { seq: i as i64, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, key_id: None, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!(decrypted.table, entries[i].table); assert_eq!(decrypted.op, entries[i].op); assert_eq!(decrypted.data, entries[i].data); } } // ── Unicode and edge-case roundtrips ── #[test] fn encrypt_decrypt_roundtrip_unicode_table() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entry = ChangeEntry { table: "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}".into(), op: ChangeOp::Insert, row_id: "row-1".into(), timestamp: Utc::now(), data: Some(serde_json::json!({"name": "\u{30C6}\u{30B9}\u{30C8}"})), }; let wire = client.encrypt_change(entry).unwrap(); let pull = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, key_id: None, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!( decrypted.table, "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}" ); } #[test] fn encrypt_decrypt_roundtrip_empty_row_id() { let client = SyncKitClient::new(test_config()); let key = crypto::generate_master_key(); client.set_master_key_raw(key); let entry = ChangeEntry { table: "t".into(), op: ChangeOp::Insert, row_id: "".into(), timestamp: Utc::now(), data: Some(serde_json::json!(42)), }; let wire = client.encrypt_change(entry).unwrap(); let pull = PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: wire.table, op: wire.op, row_id: wire.row_id, timestamp: wire.timestamp, data: wire.data, key_id: None, }; let decrypted = client.decrypt_change(pull).unwrap(); assert_eq!(decrypted.row_id, ""); assert_eq!(decrypted.data.unwrap(), serde_json::json!(42)); } // ── decrypt_change_multi_key key selection ── // // Pins the key-id boundary logic that picks between primary and pending // master keys during a rotation window. The mutations targeted here: // * `effective_key_id == pending_key_id` (== ↔ !=) // * `effective_key_id == primary_key_id || effective_key_id <= 1` // (`||` ↔ `&&`, `<= 1` ↔ `< 1`/`<= 0`) // * `entry.key_id.unwrap_or(1)` default // * the Err-fallthrough that tries pending if primary fails fn encrypt_with(key: &[u8; 32], value: serde_json::Value) -> serde_json::Value { crypto::encrypt_json(&value, key).unwrap() } fn pull_entry_with( encrypted: serde_json::Value, key_id: Option, ) -> PullChangeEntry { PullChangeEntry { seq: 1, device_id: uuid::Uuid::new_v4(), table: "tasks".to_string(), op: ChangeOp::Insert, row_id: "row-multikey".to_string(), timestamp: Utc::now(), data: Some(encrypted), key_id, } } #[test] fn multi_key_picks_pending_when_key_id_matches() { let primary = crypto::generate_master_key(); let pending = crypto::generate_master_key(); let plaintext = serde_json::json!({"v": "pending-payload"}); let entry = pull_entry_with(encrypt_with(&pending, plaintext.clone()), Some(7)); let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 1, &pending, 7).unwrap(); assert_eq!(decrypted.data.unwrap(), plaintext); } #[test] fn multi_key_picks_primary_when_key_id_matches_primary() { let primary = crypto::generate_master_key(); let pending = crypto::generate_master_key(); let plaintext = serde_json::json!({"v": "primary-payload"}); let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), Some(3)); let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 3, &pending, 9).unwrap(); assert_eq!(decrypted.data.unwrap(), plaintext); } #[test] fn multi_key_treats_missing_key_id_as_primary() { // `entry.key_id.unwrap_or(1)` defaults to 1; `effective_key_id <= 1` // arm routes to primary. A mutation changing `unwrap_or(1)` to // `unwrap_or(99)` would route to "unknown" path and try primary anyway // via the fallback — but a mutation to `<= 1` → `< 1` would skip the // direct-primary branch and fall into the fallback path. let primary = crypto::generate_master_key(); let pending = crypto::generate_master_key(); let plaintext = serde_json::json!({"v": "legacy-no-key-id"}); let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), None); let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap(); assert_eq!(decrypted.data.unwrap(), plaintext); } #[test] fn multi_key_unknown_key_id_falls_back_to_primary_first() { // effective_key_id (42) matches neither primary (5) nor pending (6), // and is > 1. The fallback first tries primary; here the data WAS // encrypted with primary, so the fallback succeeds. let primary = crypto::generate_master_key(); let pending = crypto::generate_master_key(); let plaintext = serde_json::json!({"v": "via-fallback-primary"}); let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), Some(42)); let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap(); assert_eq!(decrypted.data.unwrap(), plaintext); } #[test] fn multi_key_unknown_key_id_falls_back_to_pending_when_primary_fails() { // Unknown key_id and the data was encrypted with pending — fallback // must try primary first (fails), then pending (succeeds). let primary = crypto::generate_master_key(); let pending = crypto::generate_master_key(); let plaintext = serde_json::json!({"v": "via-fallback-pending"}); let entry = pull_entry_with(encrypt_with(&pending, plaintext.clone()), Some(42)); let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap(); assert_eq!(decrypted.data.unwrap(), plaintext); } }