use bytes::Bytes; use tracing::instrument; use uuid::Uuid; use crate::{ crypto, error::Result, types::*, }; use super::SyncKitClient; use super::helpers::check_response; impl SyncKitClient { /// Rotate the master encryption key. /// /// Generates a new 256-bit key, re-encrypts all sync log entries in batches, /// and commits the new key to the server. Idempotent: if interrupted, calling /// `rotate_key` again resumes from where it left off. /// /// Requires the encryption password (to wrap the new key) and a device_id /// (the device performing the rotation). Only one device can rotate at a time. /// /// After completion, all devices will receive the new key on their next /// `GET /keys` call. During rotation, both old and new keys are available /// so pulls continue to work. #[instrument(skip(self, password))] pub async fn rotate_key( &self, device_id: Uuid, password: &str, ) -> Result<()> { // 1. Verify password and get old key let (envelope_json, key_version) = self.get_server_key().await?; let old_key = crypto::verify_password_against_envelope(&envelope_json, password)?; let old_key = crypto::ZeroizeOnDrop(old_key); // 2. Generate new key and wrap with password let new_master_key = crypto::generate_master_key(); let new_envelope = crypto::wrap_master_key(&new_master_key, password)?; // 3. Begin rotation let begin_resp = self.begin_rotation(device_id, &new_envelope, key_version).await?; let rotation_id = begin_resp.rotation_id; let new_key_id = begin_resp.new_key_id; tracing::info!( rotation_id = %rotation_id, target_seq = begin_resp.target_seq, new_key_id = new_key_id, "Key rotation started", ); // 4. Re-encrypt loop loop { let all_done = self.reencrypt_batch( rotation_id, &old_key, &new_master_key, ).await?; if all_done { break; } } // 5. Complete — retry if stragglers from concurrent pushes loop { match self.complete_rotation(rotation_id).await { Ok(()) => break, Err(crate::error::SyncKitError::Server { status: 409, .. }) => { tracing::debug!("Stragglers detected, re-encrypting remaining entries"); // Re-encrypt any new entries pushed during our rotation loop { let all_done = self.reencrypt_batch( rotation_id, &old_key, &new_master_key, ).await?; if all_done { break; } } } Err(e) => return Err(e), } } // 6. Update local state let (app_id, user_id) = self.require_session_ids()?; crate::keystore::store_key(app_id, user_id, &new_master_key)?; *self.master_key.write() = Some(crypto::ZeroizeOnDrop(new_master_key)); *self.master_key_id.write() = new_key_id; *self.pending_key.write() = None; tracing::info!("Key rotation completed"); Ok(()) } /// Pull a batch of entries needing re-encryption, re-encrypt them, and push back. /// Returns true if there are no more entries to process. async fn reencrypt_batch( &self, rotation_id: Uuid, old_key: &[u8; 32], new_key: &[u8; 32], ) -> Result { // Pull entries needing re-encryption (starting from seq 0 each time, // since the server filters by key_id != new_key_id) let entries_resp = self.rotation_entries(rotation_id, 0).await?; if entries_resp.entries.is_empty() { return Ok(true); } let has_more = entries_resp.has_more; // Re-encrypt each entry let reencrypted: Vec = entries_resp .entries .into_iter() .map(|entry| { let new_data = match entry.data { Some(ref encrypted_value) => { // Decrypt with old key, re-encrypt with new key let plaintext = crypto::decrypt_json(encrypted_value, old_key)?; let reencrypted = crypto::encrypt_json(&plaintext, new_key)?; Some(reencrypted) } None => None, // DELETE entries have no data }; Ok(RotationBatchEntry { seq: entry.seq, data: new_data, }) }) .collect::>>()?; // Push re-encrypted batch let count = reencrypted.len(); self.rotation_batch(rotation_id, reencrypted).await?; tracing::debug!(count, "Re-encrypted batch submitted"); Ok(!has_more) } // ── HTTP helpers ── async fn begin_rotation( &self, device_id: Uuid, new_encrypted_key: &str, expected_key_version: i32, ) -> Result { let token = self.require_token()?; let url = format!("{}/api/v1/sync/keys/rotate", self.config.server_url.trim_end_matches('/')); let body = Bytes::from(serde_json::to_vec(&BeginRotationRequest { device_id, new_encrypted_key: new_encrypted_key.to_string(), expected_key_version, })?); self.retry_request_json(|| { let req = self .http .post(&url) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await } async fn rotation_entries( &self, rotation_id: Uuid, after_seq: i64, ) -> Result { let token = self.require_token()?; let url = format!("{}/api/v1/sync/keys/rotate/entries", self.config.server_url.trim_end_matches('/')); let body = Bytes::from(serde_json::to_vec(&RotationEntriesRequest { rotation_id, after_seq, })?); self.retry_request_json(|| { let req = self .http .post(&url) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await } async fn rotation_batch( &self, rotation_id: Uuid, entries: Vec, ) -> Result { let token = self.require_token()?; let url = format!("{}/api/v1/sync/keys/rotate/batch", self.config.server_url.trim_end_matches('/')); let body = Bytes::from(serde_json::to_vec(&RotationBatchRequest { rotation_id, entries, })?); self.retry_request_json(|| { let req = self .http .post(&url) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await } async fn complete_rotation( &self, rotation_id: Uuid, ) -> Result<()> { let token = self.require_token()?; let url = format!("{}/api/v1/sync/keys/rotate/complete", self.config.server_url.trim_end_matches('/')); let body = Bytes::from(serde_json::to_vec(&CompleteRotationRequest { rotation_id, })?); self.retry_request(|| { let req = self .http .post(&url) .bearer_auth(&token) .header("content-type", "application/json") .body(body.clone()); async move { check_response(req.send().await?).await } }) .await?; Ok(()) } } #[cfg(test)] mod tests { use super::*; fn test_config() -> super::super::SyncKitConfig { super::super::SyncKitConfig { server_url: "https://example.com".to_string(), api_key: "test-api-key-123".to_string(), } } #[test] fn reencrypt_preserves_plaintext() { // Simulate re-encryption: encrypt with old key, decrypt, re-encrypt with new key let old_key = crypto::generate_master_key(); let new_key = crypto::generate_master_key(); let original = serde_json::json!({"title": "Test task", "priority": 3}); // Encrypt with old key (simulates existing sync_log entry) let encrypted_old = crypto::encrypt_json(&original, &old_key).unwrap(); // Re-encrypt: decrypt with old, encrypt with new let plaintext = crypto::decrypt_json(&encrypted_old, &old_key).unwrap(); let encrypted_new = crypto::encrypt_json(&plaintext, &new_key).unwrap(); // Verify: new key can decrypt to original data let recovered = crypto::decrypt_json(&encrypted_new, &new_key).unwrap(); assert_eq!(recovered, original); // Verify: old key cannot decrypt re-encrypted data assert!(crypto::decrypt_json(&encrypted_new, &old_key).is_err()); } #[test] fn reencrypt_null_data_stays_null() { // DELETE entries have no data — rotation should preserve this let old_key = crypto::generate_master_key(); let new_key = crypto::generate_master_key(); // No data to re-encrypt let data: Option = None; let reencrypted = match data { Some(ref val) => { let plaintext = crypto::decrypt_json(val, &old_key).unwrap(); Some(crypto::encrypt_json(&plaintext, &new_key).unwrap()) } None => None, }; assert!(reencrypted.is_none()); } #[test] fn rotation_request_types_serialize() { let req = BeginRotationRequest { device_id: Uuid::new_v4(), new_encrypted_key: "envelope-json".to_string(), expected_key_version: 1, }; let json = serde_json::to_string(&req).unwrap(); assert!(json.contains("expected_key_version")); let req = RotationBatchRequest { rotation_id: Uuid::new_v4(), entries: vec![ RotationBatchEntry { seq: 1, data: Some(serde_json::json!("encrypted")) }, RotationBatchEntry { seq: 2, data: None }, ], }; let json = serde_json::to_string(&req).unwrap(); assert!(json.contains("rotation_id")); assert!(json.contains("\"seq\":1")); } #[test] fn rotation_response_types_deserialize() { let json = r#"{"rotation_id": "550e8400-e29b-41d4-a716-446655440000", "target_seq": 100, "new_key_id": 2}"#; let resp: BeginRotationResponse = serde_json::from_str(json).unwrap(); assert_eq!(resp.target_seq, 100); assert_eq!(resp.new_key_id, 2); let json = r#"{"entries": [{"seq": 1, "data": "encrypted"}, {"seq": 2, "data": null}], "has_more": true}"#; let resp: RotationEntriesResponse = serde_json::from_str(json).unwrap(); assert_eq!(resp.entries.len(), 2); assert!(resp.has_more); assert!(resp.entries[0].data.is_some()); assert!(resp.entries[1].data.is_none()); } #[test] fn pending_key_info_deserializes() { let json = r#"{ "encrypted_key": "envelope", "key_version": 2, "key_id": 1, "pending_key": {"encrypted_key": "new-envelope", "key_id": 2} }"#; let resp: GetKeyResponse = serde_json::from_str(json).unwrap(); assert!(resp.pending_key.is_some()); let pending = resp.pending_key.unwrap(); assert_eq!(pending.key_id, 2); assert_eq!(pending.encrypted_key, "new-envelope"); } #[test] fn get_key_response_without_pending_key() { let json = r#"{"encrypted_key": "envelope", "key_version": 1}"#; let resp: GetKeyResponse = serde_json::from_str(json).unwrap(); assert!(resp.pending_key.is_none()); assert_eq!(resp.key_id, None); // backward compat } #[test] fn pull_change_entry_with_key_id() { let device_id = Uuid::new_v4(); let json = format!( r#"{{"seq": 1, "device_id": "{}", "table": "t", "op": "INSERT", "row_id": "r", "timestamp": "2025-06-01T12:00:00Z", "data": null, "key_id": 2}}"#, device_id ); let entry: PullChangeEntry = serde_json::from_str(&json).unwrap(); assert_eq!(entry.key_id, Some(2)); } #[test] fn pull_change_entry_without_key_id_defaults_none() { let device_id = Uuid::new_v4(); let json = format!( r#"{{"seq": 1, "device_id": "{}", "table": "t", "op": "INSERT", "row_id": "r", "timestamp": "2025-06-01T12:00:00Z", "data": null}}"#, device_id ); let entry: PullChangeEntry = serde_json::from_str(&json).unwrap(); assert_eq!(entry.key_id, None); } }