Skip to main content

max / makenotwork

33.2 KB · 920 lines History Blame Raw
1 use base64::Engine;
2
3 use crate::{
4 crypto,
5 error::{Result, SyncKitError},
6 types::*,
7 };
8
9 use super::{BASE_DELAY, MAX_RETRIES, SyncKitClient};
10
11 impl SyncKitClient {
12 /// Retry an async HTTP operation with exponential backoff.
13 ///
14 /// Retries on transient errors (network failures, 5xx, 429) up to [`MAX_RETRIES`]
15 /// times with delays of 1s, 2s, 4s. Returns the last error if all attempts fail.
16 /// Client errors (4xx except 429) are considered permanent and returned immediately.
17 pub(super) async fn retry_request<F, Fut>(&self, mut operation: F) -> Result<reqwest::Response>
18 where
19 F: FnMut() -> Fut,
20 Fut: std::future::Future<Output = Result<reqwest::Response>>,
21 {
22 let mut last_err = None;
23
24 for attempt in 0..=MAX_RETRIES {
25 match operation().await {
26 Ok(resp) => return Ok(resp),
27 Err(err) => {
28 if !is_transient(&err) {
29 return Err(err);
30 }
31
32 if attempt < MAX_RETRIES {
33 let delay = retry_delay(&err, attempt);
34 tracing::debug!(
35 attempt = attempt + 1,
36 max_retries = MAX_RETRIES,
37 delay_ms = delay.as_millis() as u64,
38 error = %err,
39 "Transient error, retrying after backoff",
40 );
41 tokio::time::sleep(delay).await;
42 }
43
44 last_err = Some(err);
45 }
46 }
47 }
48
49 Err(last_err.expect("loop ran at least once"))
50 }
51
52 /// Retry an HTTP operation and deserialize the JSON response body inside
53 /// the retry loop. This ensures a transient body-read failure (truncated
54 /// response, connection reset mid-body) is retried rather than surfacing
55 /// as a permanent error after the server already committed the operation.
56 pub(super) async fn retry_request_json<F, Fut, T>(&self, mut operation: F) -> Result<T>
57 where
58 F: FnMut() -> Fut,
59 Fut: std::future::Future<Output = Result<reqwest::Response>>,
60 T: serde::de::DeserializeOwned,
61 {
62 let mut last_err = None;
63
64 for attempt in 0..=MAX_RETRIES {
65 match operation().await {
66 Ok(resp) => {
67 match resp.json::<T>().await {
68 Ok(parsed) => return Ok(parsed),
69 Err(e) => {
70 let err = SyncKitError::Http(e);
71 if attempt < MAX_RETRIES {
72 let delay = BASE_DELAY * 2u32.pow(attempt);
73 tracing::debug!(
74 attempt = attempt + 1,
75 max_retries = MAX_RETRIES,
76 delay_ms = delay.as_millis() as u64,
77 error = %err,
78 "Response body read failed, retrying",
79 );
80 tokio::time::sleep(delay).await;
81 }
82 last_err = Some(err);
83 }
84 }
85 }
86 Err(err) => {
87 if !is_transient(&err) {
88 return Err(err);
89 }
90
91 if attempt < MAX_RETRIES {
92 let delay = retry_delay(&err, attempt);
93 tracing::debug!(
94 attempt = attempt + 1,
95 max_retries = MAX_RETRIES,
96 delay_ms = delay.as_millis() as u64,
97 error = %err,
98 "Transient error, retrying after backoff",
99 );
100 tokio::time::sleep(delay).await;
101 }
102
103 last_err = Some(err);
104 }
105 }
106 }
107
108 Err(last_err.expect("loop ran at least once"))
109 }
110
111 /// Encrypt the data field of a change entry for the wire.
112 #[cfg(test)]
113 pub(super) fn encrypt_change(&self, entry: ChangeEntry) -> Result<WireChangeEntry> {
114 if entry.data.is_some() {
115 let master_key = self.require_master_key()?;
116 Self::encrypt_change_with_key(entry, &master_key)
117 } else {
118 Self::encrypt_change_no_data(entry)
119 }
120 }
121
122 /// Build a wire entry for a change with no data (e.g. Delete). No key needed.
123 #[cfg(test)]
124 pub(super) fn encrypt_change_no_data(entry: ChangeEntry) -> Result<WireChangeEntry> {
125 debug_assert!(entry.data.is_none());
126 Ok(WireChangeEntry {
127 table: entry.table,
128 op: entry.op,
129 row_id: entry.row_id,
130 timestamp: entry.timestamp,
131 data: None,
132 })
133 }
134
135 /// Decrypt a pulled entry that has no data. No key needed.
136 #[cfg(test)]
137 pub(super) fn decrypt_change_no_data(entry: PullChangeEntry) -> Result<ChangeEntry> {
138 debug_assert!(entry.data.is_none());
139 Ok(ChangeEntry {
140 table: entry.table,
141 op: entry.op,
142 row_id: entry.row_id,
143 timestamp: entry.timestamp,
144 data: None,
145 })
146 }
147
148 /// Encrypt with a pre-loaded key. Used by `push()` to avoid per-entry lock acquisition.
149 pub(super) fn encrypt_change_with_key(entry: ChangeEntry, master_key: &[u8; 32]) -> Result<WireChangeEntry> {
150 let encrypted_data = match entry.data {
151 Some(ref value) => Some(crypto::encrypt_json(value, master_key)?),
152 None => None,
153 };
154
155 Ok(WireChangeEntry {
156 table: entry.table,
157 op: entry.op,
158 row_id: entry.row_id,
159 timestamp: entry.timestamp,
160 data: encrypted_data,
161 })
162 }
163
164 /// Decrypt the data field of a pulled change entry.
165 #[cfg(test)]
166 pub(super) fn decrypt_change(&self, entry: PullChangeEntry) -> Result<ChangeEntry> {
167 if entry.data.is_some() {
168 let master_key = self.require_master_key()?;
169 Self::decrypt_change_with_key(entry, &master_key)
170 } else {
171 Self::decrypt_change_no_data(entry)
172 }
173 }
174
175 /// Decrypt with a pre-loaded key, preserving `device_id` and `seq` in a [`PulledChange`].
176 ///
177 /// Used by `pull_rich()` to produce conflict-detection-ready results.
178 pub(super) fn decrypt_change_to_pulled(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result<crate::types::PulledChange> {
179 let device_id = entry.device_id;
180 let seq = entry.seq;
181 let decrypted = Self::decrypt_change_with_key(entry, master_key)?;
182 Ok(crate::types::PulledChange {
183 entry: decrypted,
184 device_id,
185 seq,
186 })
187 }
188
189 /// Decrypt with key selection based on key_id. Used during rotation when
190 /// pulled entries may be encrypted with different keys.
191 #[allow(dead_code)]
192 pub(super) fn decrypt_change_multi_key(
193 entry: PullChangeEntry,
194 primary_key: &[u8; 32],
195 primary_key_id: i32,
196 pending_key: &[u8; 32],
197 pending_key_id: i32,
198 ) -> Result<ChangeEntry> {
199 let effective_key_id = entry.key_id.unwrap_or(1);
200 let key = if effective_key_id == pending_key_id {
201 pending_key
202 } else if effective_key_id == primary_key_id || effective_key_id <= 1 {
203 primary_key
204 } else {
205 // Unknown key_id — try primary, then pending
206 match Self::decrypt_change_with_key_inner(&entry, primary_key) {
207 Ok(result) => return Ok(result),
208 Err(_) => pending_key,
209 }
210 };
211 Self::decrypt_change_with_key(entry, key)
212 }
213
214 /// Inner decryption that borrows the entry (for fallback logic).
215 #[allow(dead_code)]
216 fn decrypt_change_with_key_inner(entry: &PullChangeEntry, master_key: &[u8; 32]) -> Result<ChangeEntry> {
217 let decrypted_data = match entry.data {
218 Some(ref value) => Some(crypto::decrypt_json(value, master_key)?),
219 None => None,
220 };
221
222 Ok(ChangeEntry {
223 table: entry.table.clone(),
224 op: entry.op,
225 row_id: entry.row_id.clone(),
226 timestamp: entry.timestamp,
227 data: decrypted_data,
228 })
229 }
230
231 /// Decrypt with a pre-loaded key. Used by `pull()` to avoid per-entry lock acquisition.
232 pub(super) fn decrypt_change_with_key(entry: PullChangeEntry, master_key: &[u8; 32]) -> Result<ChangeEntry> {
233 let decrypted_data = match entry.data {
234 Some(ref value) => Some(crypto::decrypt_json(value, master_key)?),
235 None => None,
236 };
237
238 Ok(ChangeEntry {
239 table: entry.table,
240 op: entry.op,
241 row_id: entry.row_id,
242 timestamp: entry.timestamp,
243 data: decrypted_data,
244 })
245 }
246 }
247
248 /// Extract the `exp` claim from a JWT without verifying the signature.
249 ///
250 /// JWTs are `header.payload.signature` where the payload is base64url-encoded JSON.
251 /// We decode the payload segment and read the `exp` field. Returns `None` if
252 /// the token is malformed or `exp` is missing.
253 pub(super) fn jwt_exp(token: &str) -> Option<i64> {
254 let payload = token.split('.').nth(1)?;
255 let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
256 .decode(payload)
257 .ok()?;
258 let claims: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
259 claims["exp"].as_i64()
260 }
261
262 /// Returns `true` if the token's `exp` claim is within [`TOKEN_EXPIRY_BUFFER_SECS`]
263 /// of the current time (or already past). Returns `false` if the token cannot
264 /// be decoded — in that case, let the server decide.
265 #[cfg(test)]
266 pub(super) fn token_is_expired(token: &str) -> bool {
267 let Some(exp) = jwt_exp(token) else {
268 return false;
269 };
270 let now = chrono::Utc::now().timestamp();
271 now >= exp - super::TOKEN_EXPIRY_BUFFER_SECS
272 }
273
274 /// Check an HTTP response for errors, returning the response on success.
275 ///
276 /// For 429 responses, parses the `Retry-After` header so the retry loop
277 /// can use it instead of the default exponential backoff delay.
278 pub(super) async fn check_response(resp: reqwest::Response) -> Result<reqwest::Response> {
279 let status = resp.status().as_u16();
280 if status >= 400 {
281 let retry_after_secs = parse_retry_after(&resp);
282 let message = resp.text().await.unwrap_or_default();
283 return Err(SyncKitError::Server { status, message, retry_after_secs });
284 }
285 Ok(resp)
286 }
287
288 /// Extract the `Retry-After` header from an HTTP response as seconds.
289 /// Returns `None` if the header is absent, non-numeric, or zero.
290 fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> {
291 resp.headers()
292 .get("retry-after")
293 .and_then(|v| v.to_str().ok())
294 .and_then(|v| v.parse::<u64>().ok())
295 .filter(|&secs| secs > 0)
296 }
297
298 /// Compute the backoff delay for a retry attempt.
299 ///
300 /// Uses the server's `Retry-After` header (if present on a 429) capped at
301 /// 60 seconds. Falls back to exponential backoff (1s, 2s, 4s).
302 fn retry_delay(err: &SyncKitError, attempt: u32) -> std::time::Duration {
303 if let SyncKitError::Server { retry_after_secs: Some(secs), .. } = err {
304 let capped = (*secs).min(60);
305 return std::time::Duration::from_secs(capped);
306 }
307 BASE_DELAY * 2u32.pow(attempt)
308 }
309
310 /// Returns true if the error is transient and worth retrying.
311 ///
312 /// Transient errors:
313 /// - Network-level failures (connection refused, timeout, DNS, etc.)
314 /// - Server errors (5xx)
315 /// - Rate limiting (429)
316 ///
317 /// Permanent errors (not retried):
318 /// - Client errors (4xx except 429) — bad request, auth failure, not found, etc.
319 /// - Serialization errors, encryption errors, missing session, etc.
320 pub(super) fn is_transient(err: &SyncKitError) -> bool {
321 match err {
322 SyncKitError::Http(e) => {
323 // Transport errors (timeout, connect, DNS) are transient.
324 // Builder errors are programming mistakes, redirect loops and
325 // body decode failures are permanent — retrying won't help.
326 !e.is_builder() && !e.is_redirect() && !e.is_decode()
327 }
328 SyncKitError::Server { status, .. } => {
329 // 5xx = server error (transient), 429 = rate limited (transient)
330 *status >= 500 || *status == 429
331 }
332 // Everything else (auth, crypto, serialization, keychain) is permanent.
333 // Note: some keychain errors (e.g. locked keychain, unavailable
334 // secret-service) are conceptually transient, but keychain operations
335 // are synchronous and not wrapped by retry_request.
336 _ => false,
337 }
338 }
339
340 #[cfg(test)]
341 mod tests {
342 use super::*;
343 use base64::Engine;
344 use chrono::Utc;
345 use std::time::Duration;
346
347 use super::super::TOKEN_EXPIRY_BUFFER_SECS;
348
349 fn test_config() -> super::super::SyncKitConfig {
350 super::super::SyncKitConfig {
351 server_url: "https://example.com".to_string(),
352 api_key: "test-api-key-123".to_string(),
353 }
354 }
355
356 /// Build a fake JWT with the given `exp` claim (no real signature).
357 fn fake_jwt(exp: i64) -> String {
358 let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
359 .encode(r#"{"alg":"HS256","typ":"JWT"}"#);
360 let payload_json = serde_json::json!({
361 "sub": "550e8400-e29b-41d4-a716-446655440000",
362 "app": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
363 "exp": exp,
364 "iat": exp - 3600,
365 });
366 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD
367 .encode(payload_json.to_string().as_bytes());
368 let signature = base64::engine::general_purpose::URL_SAFE_NO_PAD
369 .encode(b"fake-signature");
370 format!("{header}.{payload}.{signature}")
371 }
372
373 // ── encrypt_change / decrypt_change ──
374
375 #[test]
376 fn encrypt_change_with_no_data() {
377 let client = SyncKitClient::new(test_config());
378 let entry = ChangeEntry {
379 table: "tasks".to_string(),
380 op: ChangeOp::Delete,
381 row_id: "row-1".to_string(),
382 timestamp: Utc::now(),
383 data: None,
384 };
385
386 let wire = client.encrypt_change(entry.clone()).unwrap();
387 assert_eq!(wire.table, "tasks");
388 assert_eq!(wire.op, ChangeOp::Delete);
389 assert_eq!(wire.row_id, "row-1");
390 assert!(wire.data.is_none());
391 }
392
393 #[test]
394 fn encrypt_change_fails_without_master_key() {
395 let client = SyncKitClient::new(test_config());
396 let entry = ChangeEntry {
397 table: "tasks".to_string(),
398 op: ChangeOp::Insert,
399 row_id: "row-1".to_string(),
400 timestamp: Utc::now(),
401 data: Some(serde_json::json!({"title": "test"})),
402 };
403
404 let err = client.encrypt_change(entry).unwrap_err();
405 assert!(matches!(err, SyncKitError::NoMasterKey));
406 }
407
408 #[test]
409 fn encrypt_change_produces_encrypted_data() {
410 let client = SyncKitClient::new(test_config());
411 let key = crypto::generate_master_key();
412 *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key));
413
414 let original_data = serde_json::json!({"title": "Buy milk", "priority": 3});
415 let entry = ChangeEntry {
416 table: "tasks".to_string(),
417 op: ChangeOp::Insert,
418 row_id: "row-1".to_string(),
419 timestamp: Utc::now(),
420 data: Some(original_data.clone()),
421 };
422
423 let wire = client.encrypt_change(entry).unwrap();
424 assert!(wire.data.is_some());
425 let encrypted = wire.data.unwrap();
426 assert!(encrypted.is_string());
427 assert_ne!(encrypted, original_data);
428 }
429
430 #[test]
431 fn encrypt_decrypt_roundtrip() {
432 let client = SyncKitClient::new(test_config());
433 let key = crypto::generate_master_key();
434 *client.master_key.write() = Some(crypto::ZeroizeOnDrop(key));
435
436 let original_data = serde_json::json!({
437 "title": "Buy milk",
438 "tags": ["groceries", "urgent"],
439 "count": 42
440 });
441 let ts = Utc::now();
442 let entry = ChangeEntry {
443 table: "tasks".to_string(),
444 op: ChangeOp::Update,
445 row_id: "row-abc".to_string(),
446 timestamp: ts,
447 data: Some(original_data.clone()),
448 };
449
450 let wire = client.encrypt_change(entry).unwrap();
451 let pull_entry = PullChangeEntry {
452 seq: 1,
453 device_id: uuid::Uuid::new_v4(),
454 table: wire.table,
455 op: wire.op,
456 row_id: wire.row_id,
457 timestamp: wire.timestamp,
458 data: wire.data,
459 key_id: None,
460 };
461
462 let decrypted = client.decrypt_change(pull_entry).unwrap();
463 assert_eq!(decrypted.table, "tasks");
464 assert_eq!(decrypted.op, ChangeOp::Update);
465 assert_eq!(decrypted.row_id, "row-abc");
466 assert_eq!(decrypted.data.unwrap(), original_data);
467 }
468
469 #[test]
470 fn decrypt_change_with_no_data() {
471 let client = SyncKitClient::new(test_config());
472 let pull_entry = PullChangeEntry {
473 seq: 5,
474 device_id: uuid::Uuid::new_v4(),
475 table: "events".to_string(),
476 op: ChangeOp::Delete,
477 row_id: "evt-1".to_string(),
478 timestamp: Utc::now(),
479 data: None,
480 key_id: None,
481 };
482
483 let decrypted = client.decrypt_change(pull_entry).unwrap();
484 assert_eq!(decrypted.table, "events");
485 assert_eq!(decrypted.op, ChangeOp::Delete);
486 assert!(decrypted.data.is_none());
487 }
488
489 #[test]
490 fn decrypt_change_fails_without_master_key() {
491 let client = SyncKitClient::new(test_config());
492 let pull_entry = PullChangeEntry {
493 seq: 1,
494 device_id: uuid::Uuid::new_v4(),
495 table: "tasks".to_string(),
496 op: ChangeOp::Insert,
497 row_id: "row-1".to_string(),
498 timestamp: Utc::now(),
499 data: Some(serde_json::json!("some-encrypted-string")),
500 key_id: None,
501 };
502
503 let err = client.decrypt_change(pull_entry).unwrap_err();
504 assert!(matches!(err, SyncKitError::NoMasterKey));
505 }
506
507 // ── is_transient error classification ──
508
509 #[test]
510 fn is_transient_server_5xx() {
511 let err = SyncKitError::Server { status: 500, message: "Internal Server Error".to_string(), retry_after_secs: None };
512 assert!(is_transient(&err));
513 let err = SyncKitError::Server { status: 502, message: "Bad Gateway".to_string(), retry_after_secs: None };
514 assert!(is_transient(&err));
515 let err = SyncKitError::Server { status: 503, message: "Service Unavailable".to_string(), retry_after_secs: None };
516 assert!(is_transient(&err));
517 let err = SyncKitError::Server { status: 504, message: "Gateway Timeout".to_string(), retry_after_secs: None };
518 assert!(is_transient(&err));
519 }
520
521 #[test]
522 fn is_transient_rate_limited_429() {
523 let err = SyncKitError::Server { status: 429, message: "Too Many Requests".to_string(), retry_after_secs: None };
524 assert!(is_transient(&err));
525 }
526
527 #[test]
528 fn is_not_transient_client_4xx() {
529 let err = SyncKitError::Server { status: 400, message: "Bad Request".to_string(), retry_after_secs: None };
530 assert!(!is_transient(&err));
531 let err = SyncKitError::Server { status: 401, message: "Unauthorized".to_string(), retry_after_secs: None };
532 assert!(!is_transient(&err));
533 let err = SyncKitError::Server { status: 403, message: "Forbidden".to_string(), retry_after_secs: None };
534 assert!(!is_transient(&err));
535 let err = SyncKitError::Server { status: 404, message: "Not Found".to_string(), retry_after_secs: None };
536 assert!(!is_transient(&err));
537 let err = SyncKitError::Server { status: 409, message: "Conflict".to_string(), retry_after_secs: None };
538 assert!(!is_transient(&err));
539 let err = SyncKitError::Server { status: 422, message: "Unprocessable Entity".to_string(), retry_after_secs: None };
540 assert!(!is_transient(&err));
541 }
542
543 #[test]
544 fn is_not_transient_not_authenticated() {
545 assert!(!is_transient(&SyncKitError::NotAuthenticated));
546 }
547
548 #[test]
549 fn is_not_transient_no_master_key() {
550 assert!(!is_transient(&SyncKitError::NoMasterKey));
551 }
552
553 #[test]
554 fn is_not_transient_decryption_failed() {
555 assert!(!is_transient(&SyncKitError::DecryptionFailed));
556 }
557
558 #[test]
559 fn is_not_transient_invalid_envelope() {
560 assert!(!is_transient(&SyncKitError::InvalidEnvelope("bad version".to_string())));
561 }
562
563 #[test]
564 fn is_not_transient_crypto() {
565 assert!(!is_transient(&SyncKitError::Crypto("encrypt failed".to_string())));
566 }
567
568 #[test]
569 fn is_not_transient_json() {
570 let err: SyncKitError = serde_json::from_str::<serde_json::Value>("not json")
571 .unwrap_err()
572 .into();
573 assert!(!is_transient(&err));
574 }
575
576 #[test]
577 fn is_not_transient_base64() {
578 let err: SyncKitError = base64::engine::general_purpose::STANDARD
579 .decode("!!!invalid!!!")
580 .unwrap_err()
581 .into();
582 assert!(!is_transient(&err));
583 }
584
585 #[test]
586 fn is_not_transient_token_expired() {
587 assert!(!is_transient(&SyncKitError::TokenExpired));
588 }
589
590 #[test]
591 fn is_not_transient_internal() {
592 assert!(!is_transient(&SyncKitError::Internal("lock poisoned".to_string())));
593 }
594
595 // ── Retry constants ──
596
597 #[test]
598 fn retry_constants_are_sensible() {
599 assert_eq!(MAX_RETRIES, 3);
600 assert_eq!(BASE_DELAY, Duration::from_secs(1));
601 }
602
603 #[test]
604 fn backoff_delays_are_exponential() {
605 let delay_0 = BASE_DELAY * 2u32.pow(0);
606 let delay_1 = BASE_DELAY * 2u32.pow(1);
607 let delay_2 = BASE_DELAY * 2u32.pow(2);
608
609 assert_eq!(delay_0, Duration::from_secs(1));
610 assert_eq!(delay_1, Duration::from_secs(2));
611 assert_eq!(delay_2, Duration::from_secs(4));
612 }
613
614 // ── is_transient boundary: 429 vs 428, 499 vs 500 ──
615
616 #[test]
617 fn is_transient_boundary_values() {
618 assert!(!is_transient(&SyncKitError::Server { status: 428, message: String::new(), retry_after_secs: None }));
619 assert!(is_transient(&SyncKitError::Server { status: 429, message: String::new(), retry_after_secs: None }));
620 assert!(!is_transient(&SyncKitError::Server { status: 430, message: String::new(), retry_after_secs: None }));
621 assert!(!is_transient(&SyncKitError::Server { status: 499, message: String::new(), retry_after_secs: None }));
622 assert!(is_transient(&SyncKitError::Server { status: 500, message: String::new(), retry_after_secs: None }));
623 }
624
625 // ── Token expiry detection ──
626
627 #[test]
628 fn jwt_exp_extracts_expiry() {
629 let exp = Utc::now().timestamp() + 3600;
630 let token = fake_jwt(exp);
631 assert_eq!(jwt_exp(&token), Some(exp));
632 }
633
634 #[test]
635 fn jwt_exp_returns_none_for_garbage() {
636 assert_eq!(jwt_exp("not-a-jwt"), None);
637 assert_eq!(jwt_exp("a.b.c"), None);
638 assert_eq!(jwt_exp(""), None);
639 }
640
641 #[test]
642 fn token_is_expired_for_past_exp() {
643 let token = fake_jwt(Utc::now().timestamp() - 3600);
644 assert!(token_is_expired(&token));
645 }
646
647 #[test]
648 fn token_is_expired_within_buffer() {
649 let token = fake_jwt(Utc::now().timestamp() + 10);
650 assert!(token_is_expired(&token));
651 }
652
653 #[test]
654 fn token_is_not_expired_when_fresh() {
655 let token = fake_jwt(Utc::now().timestamp() + 3600);
656 assert!(!token_is_expired(&token));
657 }
658
659 #[test]
660 fn token_is_not_expired_for_garbage() {
661 assert!(!token_is_expired("garbage"));
662 }
663
664 #[test]
665 fn token_expires_exactly_at_buffer_boundary() {
666 let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS);
667 assert!(token_is_expired(&token));
668 }
669
670 #[test]
671 fn token_expires_just_past_buffer() {
672 let token = fake_jwt(Utc::now().timestamp() + TOKEN_EXPIRY_BUFFER_SECS + 1);
673 assert!(!token_is_expired(&token));
674 }
675
676 // ── encrypt_change preserves metadata ──
677
678 #[test]
679 fn encrypt_change_preserves_all_metadata() {
680 let client = SyncKitClient::new(test_config());
681 let key = crypto::generate_master_key();
682 client.set_master_key_raw(key);
683
684 let ts = Utc::now();
685 let entry = ChangeEntry {
686 table: "contacts".to_string(),
687 op: ChangeOp::Update,
688 row_id: "unique-row-id".to_string(),
689 timestamp: ts,
690 data: Some(serde_json::json!({"name": "Alice"})),
691 };
692
693 let wire = client.encrypt_change(entry).unwrap();
694 assert_eq!(wire.table, "contacts");
695 assert_eq!(wire.op, ChangeOp::Update);
696 assert_eq!(wire.row_id, "unique-row-id");
697 assert_eq!(wire.timestamp, ts);
698 }
699
700 // ── Multiple entries encrypt/decrypt ──
701
702 #[test]
703 fn multiple_entries_encrypt_decrypt_roundtrip() {
704 let client = SyncKitClient::new(test_config());
705 let key = crypto::generate_master_key();
706 client.set_master_key_raw(key);
707
708 let entries = [
709 ChangeEntry {
710 table: "tasks".to_string(),
711 op: ChangeOp::Insert,
712 row_id: "r1".to_string(),
713 timestamp: Utc::now(),
714 data: Some(serde_json::json!({"title": "Task 1"})),
715 },
716 ChangeEntry {
717 table: "tasks".to_string(),
718 op: ChangeOp::Update,
719 row_id: "r2".to_string(),
720 timestamp: Utc::now(),
721 data: Some(serde_json::json!({"title": "Task 2", "done": true})),
722 },
723 ChangeEntry {
724 table: "events".to_string(),
725 op: ChangeOp::Delete,
726 row_id: "r3".to_string(),
727 timestamp: Utc::now(),
728 data: None,
729 },
730 ];
731
732 let wire_entries: Vec<_> = entries
733 .iter()
734 .cloned()
735 .map(|e| client.encrypt_change(e).unwrap())
736 .collect();
737
738 assert_eq!(wire_entries.len(), 3);
739 assert!(wire_entries[0].data.is_some());
740 assert!(wire_entries[1].data.is_some());
741 assert!(wire_entries[2].data.is_none());
742
743 for (i, wire) in wire_entries.into_iter().enumerate() {
744 let pull = PullChangeEntry {
745 seq: i as i64,
746 device_id: uuid::Uuid::new_v4(),
747 table: wire.table,
748 op: wire.op,
749 row_id: wire.row_id,
750 timestamp: wire.timestamp,
751 data: wire.data,
752 key_id: None,
753 };
754 let decrypted = client.decrypt_change(pull).unwrap();
755 assert_eq!(decrypted.table, entries[i].table);
756 assert_eq!(decrypted.op, entries[i].op);
757 assert_eq!(decrypted.data, entries[i].data);
758 }
759 }
760
761 // ── Unicode and edge-case roundtrips ──
762
763 #[test]
764 fn encrypt_decrypt_roundtrip_unicode_table() {
765 let client = SyncKitClient::new(test_config());
766 let key = crypto::generate_master_key();
767 client.set_master_key_raw(key);
768
769 let entry = ChangeEntry {
770 table: "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}".into(),
771 op: ChangeOp::Insert,
772 row_id: "row-1".into(),
773 timestamp: Utc::now(),
774 data: Some(serde_json::json!({"name": "\u{30C6}\u{30B9}\u{30C8}"})),
775 };
776
777 let wire = client.encrypt_change(entry).unwrap();
778 let pull = PullChangeEntry {
779 seq: 1,
780 device_id: uuid::Uuid::new_v4(),
781 table: wire.table,
782 op: wire.op,
783 row_id: wire.row_id,
784 timestamp: wire.timestamp,
785 data: wire.data,
786 key_id: None,
787 };
788 let decrypted = client.decrypt_change(pull).unwrap();
789 assert_eq!(
790 decrypted.table,
791 "\u{65E5}\u{672C}\u{8A9E}\u{30C6}\u{30FC}\u{30D6}\u{30EB}"
792 );
793 }
794
795 #[test]
796 fn encrypt_decrypt_roundtrip_empty_row_id() {
797 let client = SyncKitClient::new(test_config());
798 let key = crypto::generate_master_key();
799 client.set_master_key_raw(key);
800
801 let entry = ChangeEntry {
802 table: "t".into(),
803 op: ChangeOp::Insert,
804 row_id: "".into(),
805 timestamp: Utc::now(),
806 data: Some(serde_json::json!(42)),
807 };
808
809 let wire = client.encrypt_change(entry).unwrap();
810 let pull = PullChangeEntry {
811 seq: 1,
812 device_id: uuid::Uuid::new_v4(),
813 table: wire.table,
814 op: wire.op,
815 row_id: wire.row_id,
816 timestamp: wire.timestamp,
817 data: wire.data,
818 key_id: None,
819 };
820 let decrypted = client.decrypt_change(pull).unwrap();
821 assert_eq!(decrypted.row_id, "");
822 assert_eq!(decrypted.data.unwrap(), serde_json::json!(42));
823 }
824
825 // ── decrypt_change_multi_key key selection ──
826 //
827 // Pins the key-id boundary logic that picks between primary and pending
828 // master keys during a rotation window. The mutations targeted here:
829 // * `effective_key_id == pending_key_id` (== ↔ !=)
830 // * `effective_key_id == primary_key_id || effective_key_id <= 1`
831 // (`||` ↔ `&&`, `<= 1` ↔ `< 1`/`<= 0`)
832 // * `entry.key_id.unwrap_or(1)` default
833 // * the Err-fallthrough that tries pending if primary fails
834
835 fn encrypt_with(key: &[u8; 32], value: serde_json::Value) -> serde_json::Value {
836 crypto::encrypt_json(&value, key).unwrap()
837 }
838
839 fn pull_entry_with(
840 encrypted: serde_json::Value,
841 key_id: Option<i32>,
842 ) -> PullChangeEntry {
843 PullChangeEntry {
844 seq: 1,
845 device_id: uuid::Uuid::new_v4(),
846 table: "tasks".to_string(),
847 op: ChangeOp::Insert,
848 row_id: "row-multikey".to_string(),
849 timestamp: Utc::now(),
850 data: Some(encrypted),
851 key_id,
852 }
853 }
854
855 #[test]
856 fn multi_key_picks_pending_when_key_id_matches() {
857 let primary = crypto::generate_master_key();
858 let pending = crypto::generate_master_key();
859 let plaintext = serde_json::json!({"v": "pending-payload"});
860 let entry = pull_entry_with(encrypt_with(&pending, plaintext.clone()), Some(7));
861
862 let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 1, &pending, 7).unwrap();
863 assert_eq!(decrypted.data.unwrap(), plaintext);
864 }
865
866 #[test]
867 fn multi_key_picks_primary_when_key_id_matches_primary() {
868 let primary = crypto::generate_master_key();
869 let pending = crypto::generate_master_key();
870 let plaintext = serde_json::json!({"v": "primary-payload"});
871 let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), Some(3));
872
873 let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 3, &pending, 9).unwrap();
874 assert_eq!(decrypted.data.unwrap(), plaintext);
875 }
876
877 #[test]
878 fn multi_key_treats_missing_key_id_as_primary() {
879 // `entry.key_id.unwrap_or(1)` defaults to 1; `effective_key_id <= 1`
880 // arm routes to primary. A mutation changing `unwrap_or(1)` to
881 // `unwrap_or(99)` would route to "unknown" path and try primary anyway
882 // via the fallback — but a mutation to `<= 1` → `< 1` would skip the
883 // direct-primary branch and fall into the fallback path.
884 let primary = crypto::generate_master_key();
885 let pending = crypto::generate_master_key();
886 let plaintext = serde_json::json!({"v": "legacy-no-key-id"});
887 let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), None);
888
889 let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap();
890 assert_eq!(decrypted.data.unwrap(), plaintext);
891 }
892
893 #[test]
894 fn multi_key_unknown_key_id_falls_back_to_primary_first() {
895 // effective_key_id (42) matches neither primary (5) nor pending (6),
896 // and is > 1. The fallback first tries primary; here the data WAS
897 // encrypted with primary, so the fallback succeeds.
898 let primary = crypto::generate_master_key();
899 let pending = crypto::generate_master_key();
900 let plaintext = serde_json::json!({"v": "via-fallback-primary"});
901 let entry = pull_entry_with(encrypt_with(&primary, plaintext.clone()), Some(42));
902
903 let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap();
904 assert_eq!(decrypted.data.unwrap(), plaintext);
905 }
906
907 #[test]
908 fn multi_key_unknown_key_id_falls_back_to_pending_when_primary_fails() {
909 // Unknown key_id and the data was encrypted with pending — fallback
910 // must try primary first (fails), then pending (succeeds).
911 let primary = crypto::generate_master_key();
912 let pending = crypto::generate_master_key();
913 let plaintext = serde_json::json!({"v": "via-fallback-pending"});
914 let entry = pull_entry_with(encrypt_with(&pending, plaintext.clone()), Some(42));
915
916 let decrypted = SyncKitClient::decrypt_change_multi_key(entry, &primary, 5, &pending, 6).unwrap();
917 assert_eq!(decrypted.data.unwrap(), plaintext);
918 }
919 }
920