Skip to main content

max / makenotwork

13.3 KB · 387 lines History Blame Raw
1 use bytes::Bytes;
2 use tracing::instrument;
3 use uuid::Uuid;
4
5 use crate::{
6 crypto,
7 error::Result,
8 types::*,
9 };
10
11 use super::SyncKitClient;
12 use super::helpers::check_response;
13
14 impl SyncKitClient {
15 /// Rotate the master encryption key.
16 ///
17 /// Generates a new 256-bit key, re-encrypts all sync log entries in batches,
18 /// and commits the new key to the server. Idempotent: if interrupted, calling
19 /// `rotate_key` again resumes from where it left off.
20 ///
21 /// Requires the encryption password (to wrap the new key) and a device_id
22 /// (the device performing the rotation). Only one device can rotate at a time.
23 ///
24 /// After completion, all devices will receive the new key on their next
25 /// `GET /keys` call. During rotation, both old and new keys are available
26 /// so pulls continue to work.
27 #[instrument(skip(self, password))]
28 pub async fn rotate_key(
29 &self,
30 device_id: Uuid,
31 password: &str,
32 ) -> Result<()> {
33 // 1. Verify password and get old key
34 let (envelope_json, key_version) = self.get_server_key().await?;
35 let old_key = crypto::verify_password_against_envelope(&envelope_json, password)?;
36 let old_key = crypto::ZeroizeOnDrop(old_key);
37
38 // 2. Generate new key and wrap with password
39 let new_master_key = crypto::generate_master_key();
40 let new_envelope = crypto::wrap_master_key(&new_master_key, password)?;
41
42 // 3. Begin rotation
43 let begin_resp = self.begin_rotation(device_id, &new_envelope, key_version).await?;
44 let rotation_id = begin_resp.rotation_id;
45 let new_key_id = begin_resp.new_key_id;
46
47 tracing::info!(
48 rotation_id = %rotation_id,
49 target_seq = begin_resp.target_seq,
50 new_key_id = new_key_id,
51 "Key rotation started",
52 );
53
54 // 4. Re-encrypt loop
55 loop {
56 let all_done = self.reencrypt_batch(
57 rotation_id,
58 &old_key,
59 &new_master_key,
60 ).await?;
61
62 if all_done {
63 break;
64 }
65 }
66
67 // 5. Complete — retry if stragglers from concurrent pushes
68 loop {
69 match self.complete_rotation(rotation_id).await {
70 Ok(()) => break,
71 Err(crate::error::SyncKitError::Server { status: 409, .. }) => {
72 tracing::debug!("Stragglers detected, re-encrypting remaining entries");
73 // Re-encrypt any new entries pushed during our rotation
74 loop {
75 let all_done = self.reencrypt_batch(
76 rotation_id,
77 &old_key,
78 &new_master_key,
79 ).await?;
80 if all_done {
81 break;
82 }
83 }
84 }
85 Err(e) => return Err(e),
86 }
87 }
88
89 // 6. Update local state
90 let (app_id, user_id) = self.require_session_ids()?;
91 crate::keystore::store_key(app_id, user_id, &new_master_key)?;
92 *self.master_key.write() = Some(crypto::ZeroizeOnDrop(new_master_key));
93 *self.master_key_id.write() = new_key_id;
94 *self.pending_key.write() = None;
95
96 tracing::info!("Key rotation completed");
97 Ok(())
98 }
99
100 /// Pull a batch of entries needing re-encryption, re-encrypt them, and push back.
101 /// Returns true if there are no more entries to process.
102 async fn reencrypt_batch(
103 &self,
104 rotation_id: Uuid,
105 old_key: &[u8; 32],
106 new_key: &[u8; 32],
107 ) -> Result<bool> {
108 // Pull entries needing re-encryption (starting from seq 0 each time,
109 // since the server filters by key_id != new_key_id)
110 let entries_resp = self.rotation_entries(rotation_id, 0).await?;
111
112 if entries_resp.entries.is_empty() {
113 return Ok(true);
114 }
115
116 let has_more = entries_resp.has_more;
117
118 // Re-encrypt each entry
119 let reencrypted: Vec<RotationBatchEntry> = entries_resp
120 .entries
121 .into_iter()
122 .map(|entry| {
123 let new_data = match entry.data {
124 Some(ref encrypted_value) => {
125 // Decrypt with old key, re-encrypt with new key
126 let plaintext = crypto::decrypt_json(encrypted_value, old_key)?;
127 let reencrypted = crypto::encrypt_json(&plaintext, new_key)?;
128 Some(reencrypted)
129 }
130 None => None, // DELETE entries have no data
131 };
132 Ok(RotationBatchEntry {
133 seq: entry.seq,
134 data: new_data,
135 })
136 })
137 .collect::<Result<Vec<_>>>()?;
138
139 // Push re-encrypted batch
140 let count = reencrypted.len();
141 self.rotation_batch(rotation_id, reencrypted).await?;
142
143 tracing::debug!(count, "Re-encrypted batch submitted");
144
145 Ok(!has_more)
146 }
147
148 // ── HTTP helpers ──
149
150 async fn begin_rotation(
151 &self,
152 device_id: Uuid,
153 new_encrypted_key: &str,
154 expected_key_version: i32,
155 ) -> Result<BeginRotationResponse> {
156 let token = self.require_token()?;
157 let url = format!("{}/api/v1/sync/keys/rotate", self.config.server_url.trim_end_matches('/'));
158
159 let body = Bytes::from(serde_json::to_vec(&BeginRotationRequest {
160 device_id,
161 new_encrypted_key: new_encrypted_key.to_string(),
162 expected_key_version,
163 })?);
164
165 self.retry_request_json(|| {
166 let req = self
167 .http
168 .post(&url)
169 .bearer_auth(&token)
170 .header("content-type", "application/json")
171 .body(body.clone());
172 async move { check_response(req.send().await?).await }
173 })
174 .await
175 }
176
177 async fn rotation_entries(
178 &self,
179 rotation_id: Uuid,
180 after_seq: i64,
181 ) -> Result<RotationEntriesResponse> {
182 let token = self.require_token()?;
183 let url = format!("{}/api/v1/sync/keys/rotate/entries", self.config.server_url.trim_end_matches('/'));
184
185 let body = Bytes::from(serde_json::to_vec(&RotationEntriesRequest {
186 rotation_id,
187 after_seq,
188 })?);
189
190 self.retry_request_json(|| {
191 let req = self
192 .http
193 .post(&url)
194 .bearer_auth(&token)
195 .header("content-type", "application/json")
196 .body(body.clone());
197 async move { check_response(req.send().await?).await }
198 })
199 .await
200 }
201
202 async fn rotation_batch(
203 &self,
204 rotation_id: Uuid,
205 entries: Vec<RotationBatchEntry>,
206 ) -> Result<RotationBatchResponse> {
207 let token = self.require_token()?;
208 let url = format!("{}/api/v1/sync/keys/rotate/batch", self.config.server_url.trim_end_matches('/'));
209
210 let body = Bytes::from(serde_json::to_vec(&RotationBatchRequest {
211 rotation_id,
212 entries,
213 })?);
214
215 self.retry_request_json(|| {
216 let req = self
217 .http
218 .post(&url)
219 .bearer_auth(&token)
220 .header("content-type", "application/json")
221 .body(body.clone());
222 async move { check_response(req.send().await?).await }
223 })
224 .await
225 }
226
227 async fn complete_rotation(
228 &self,
229 rotation_id: Uuid,
230 ) -> Result<()> {
231 let token = self.require_token()?;
232 let url = format!("{}/api/v1/sync/keys/rotate/complete", self.config.server_url.trim_end_matches('/'));
233
234 let body = Bytes::from(serde_json::to_vec(&CompleteRotationRequest {
235 rotation_id,
236 })?);
237
238 self.retry_request(|| {
239 let req = self
240 .http
241 .post(&url)
242 .bearer_auth(&token)
243 .header("content-type", "application/json")
244 .body(body.clone());
245 async move { check_response(req.send().await?).await }
246 })
247 .await?;
248
249 Ok(())
250 }
251 }
252
#[cfg(test)]
mod tests {
    //! Unit tests for the key-rotation client: the decrypt/re-encrypt round
    //! trip and the serde shape of the rotation request/response types.
    //! None of these tests perform network I/O.

    use super::*;

    /// Re-encrypting under a new key keeps the plaintext and makes the old key useless.
    #[test]
    fn reencrypt_preserves_plaintext() {
        // Simulate re-encryption: encrypt with old key, decrypt, re-encrypt with new key
        let old_key = crypto::generate_master_key();
        let new_key = crypto::generate_master_key();
        let original = serde_json::json!({"title": "Test task", "priority": 3});

        // Encrypt with old key (simulates existing sync_log entry)
        let encrypted_old = crypto::encrypt_json(&original, &old_key).unwrap();

        // Re-encrypt: decrypt with old, encrypt with new
        let plaintext = crypto::decrypt_json(&encrypted_old, &old_key).unwrap();
        let encrypted_new = crypto::encrypt_json(&plaintext, &new_key).unwrap();

        // Verify: new key can decrypt to original data
        let recovered = crypto::decrypt_json(&encrypted_new, &new_key).unwrap();
        assert_eq!(recovered, original);

        // Verify: old key cannot decrypt re-encrypted data
        assert!(crypto::decrypt_json(&encrypted_new, &old_key).is_err());
    }

    /// Mirrors the per-entry branch used during rotation: `None` stays `None`.
    #[test]
    fn reencrypt_null_data_stays_null() {
        // DELETE entries have no data — rotation should preserve this
        let old_key = crypto::generate_master_key();
        let new_key = crypto::generate_master_key();

        // No data to re-encrypt
        let data: Option<serde_json::Value> = None;
        let reencrypted = match data {
            Some(ref val) => {
                let plaintext = crypto::decrypt_json(val, &old_key).unwrap();
                Some(crypto::encrypt_json(&plaintext, &new_key).unwrap())
            }
            None => None,
        };

        assert!(reencrypted.is_none());
    }

    /// Request types serialize with the field names the assertions look for.
    #[test]
    fn rotation_request_types_serialize() {
        let req = BeginRotationRequest {
            device_id: Uuid::new_v4(),
            new_encrypted_key: "envelope-json".to_string(),
            expected_key_version: 1,
        };
        let json = serde_json::to_string(&req).unwrap();
        assert!(json.contains("expected_key_version"));

        let req = RotationBatchRequest {
            rotation_id: Uuid::new_v4(),
            entries: vec![
                RotationBatchEntry { seq: 1, data: Some(serde_json::json!("encrypted")) },
                RotationBatchEntry { seq: 2, data: None },
            ],
        };
        let json = serde_json::to_string(&req).unwrap();
        assert!(json.contains("rotation_id"));
        assert!(json.contains("\"seq\":1"));
    }

    /// Response types deserialize from representative JSON payloads.
    #[test]
    fn rotation_response_types_deserialize() {
        let json = r#"{"rotation_id": "550e8400-e29b-41d4-a716-446655440000", "target_seq": 100, "new_key_id": 2}"#;
        let resp: BeginRotationResponse = serde_json::from_str(json).unwrap();
        assert_eq!(resp.target_seq, 100);
        assert_eq!(resp.new_key_id, 2);

        let json = r#"{"entries": [{"seq": 1, "data": "encrypted"}, {"seq": 2, "data": null}], "has_more": true}"#;
        let resp: RotationEntriesResponse = serde_json::from_str(json).unwrap();
        assert_eq!(resp.entries.len(), 2);
        assert!(resp.has_more);
        assert!(resp.entries[0].data.is_some());
        assert!(resp.entries[1].data.is_none());
    }

    /// A key response that includes `pending_key` exposes its id and envelope.
    #[test]
    fn pending_key_info_deserializes() {
        let json = r#"{
            "encrypted_key": "envelope",
            "key_version": 2,
            "key_id": 1,
            "pending_key": {"encrypted_key": "new-envelope", "key_id": 2}
        }"#;
        let resp: GetKeyResponse = serde_json::from_str(json).unwrap();
        assert!(resp.pending_key.is_some());
        let pending = resp.pending_key.unwrap();
        assert_eq!(pending.key_id, 2);
        assert_eq!(pending.encrypted_key, "new-envelope");
    }

    /// A key response without `pending_key` / `key_id` still parses; both are `None`.
    #[test]
    fn get_key_response_without_pending_key() {
        let json = r#"{"encrypted_key": "envelope", "key_version": 1}"#;
        let resp: GetKeyResponse = serde_json::from_str(json).unwrap();
        assert!(resp.pending_key.is_none());
        assert_eq!(resp.key_id, None); // backward compat
    }

    /// `key_id` present in a pulled change entry is surfaced as `Some`.
    #[test]
    fn pull_change_entry_with_key_id() {
        let device_id = Uuid::new_v4();
        let json = format!(
            r#"{{"seq": 1, "device_id": "{}", "table": "t", "op": "INSERT", "row_id": "r", "timestamp": "2025-06-01T12:00:00Z", "data": null, "key_id": 2}}"#,
            device_id
        );
        let entry: PullChangeEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(entry.key_id, Some(2));
    }

    /// `key_id` absent from a pulled change entry defaults to `None`.
    #[test]
    fn pull_change_entry_without_key_id_defaults_none() {
        let device_id = Uuid::new_v4();
        let json = format!(
            r#"{{"seq": 1, "device_id": "{}", "table": "t", "op": "INSERT", "row_id": "r", "timestamp": "2025-06-01T12:00:00Z", "data": null}}"#,
            device_id
        );
        let entry: PullChangeEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(entry.key_id, None);
    }
}
387