max / makenotwork

SyncKit client: refactor pull variants, expand conflict resolution, harden crypto

- Extract pull_inner generic helper, eliminating ~120 LOC duplication across 4 pull variants
- Expand conflict resolution with field-merge and LWW strategies
- Add SSE buffer limit, retry-safe JSON deserialization, single-shot OAuth exchange
- Zeroize normalized password after Argon2 derivation
- Update audit review and competition docs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-01 14:24 UTC
Commit: 582ccc7857643182f78c15833491c831160dff32
Parent: 384217c
17 files changed, +799 insertions, -275 deletions
@@ -4,6 +4,14 @@ See [audit_review.md](./audit_review.md) for current scorecard and grades.
4 4
5 5 ## Changes Since Last Audit
6 6
7 + ### Tenth audit (2026-04-30, Run 17 cross-project)
8 + - **Test count:** 340 (241 unit + 99 integration). 0 clippy warnings. 0 failures.
9 + - **Grade:** A (maintained). v0.3.1. ~5,945 LOC (src) + 2,742 (integration) = 8,687 total.
10 + - **Module heatmap updated** with per-file line counts. Client split into 7 submodules (mod.rs, helpers.rs, auth.rs, sync.rs, encryption.rs, blob.rs, subscribe.rs). conflict.rs at 950 LOC is the second largest module after crypto.rs (1,049 LOC).
11 + - **New cold spots:** (1) sync.rs pull duplication -- key-extraction + decrypt pattern copy-pasted across 5 pull variants (lines 112-288), a private `do_pull_inner` could eliminate ~100 LOC. (2) `fake_jwt` test helper defined identically in helpers.rs and auth.rs tests. (3) rand 0.8 not latest (0.9.x available, compatible with chacha20poly1305 0.10).
12 + - **Mandatory surprise:** `resolve_lww` semantic asymmetry -- DELETE always beats non-DELETE regardless of timestamp. Remote Delete kills local Insert even if Insert is newer. Convergent and safe, but potentially surprising. Documented in test at line 847.
13 + - **Previous items unchanged:** subscribe.rs:107 unwrap_or_default still present, rustls-webpki vulns still upstream-blocked, no key rotation mechanism still open.
14 +
7 15 ### Seventh audit (2026-03-28, Run 12 cross-project)
8 16 - **Test count:** 297 (197 unit + 99 integration + 1 doctest). 0 clippy warnings. 0 failures.
9 17 - **Grade:** A (maintained). v0.3.0.
@@ -1,51 +1,58 @@
1 1 # SyncKit Client SDK Audit Review
2 2
3 - - Last audited: 2026-04-18 (ninth audit, Run 15 cross-project)
4 - - Previous audit: 2026-04-15 (eighth audit, Run 14 cross-project)
3 + - Last audited: 2026-04-30 (tenth audit, Run 17 cross-project)
4 + - Previous audit: 2026-04-18 (ninth audit, Run 15 cross-project)
5 5 - Crate: `synckit-client` v0.3.1
6 6 - Path: `MNW/shared/synckit-client/`
7 7
8 8 ## Overall Grade: A
9 9
10 - Run 15: 327 tests (all pass). 0 clippy warnings. Grade stable at A. v0.3.1. ~5,426 LOC. Minor issues only: subscribe.rs:107 unwrap_or_default masks server errors, rustls-webpki vulns (upstream-blocked), no key rotation mechanism.
10 + Run 17: 340 tests (241 unit + 99 integration). 0 clippy warnings. v0.3.1. ~5,945 LOC (src) + 2,742 (integration) = 8,687 total. Grade stable at A. Minor issues only: subscribe.rs:107 unwrap_or_default masks server errors, rustls-webpki vulns (upstream-blocked), no key rotation mechanism.
11 11
12 12 ## Scorecard
13 13
14 - | Dimension | Grade |
15 - |-----------|-------|
16 - | Code Quality | A |
17 - | Architecture | A |
18 - | Testing | A+ |
19 - | Security | A |
20 - | Performance | A |
21 - | Documentation | A- |
22 - | Dependencies | A- |
23 - | Type Safety | A |
24 - | Observability | A |
25 - | Concurrency | A |
26 - | Resilience | A |
27 - | API Consistency | A |
28 - | Codebase Size | A |
14 + | Dimension | Grade | Notes |
15 + |-----------|-------|-------|
16 + | Code Quality | A | Zero unwraps in production code (1 expect on infallible Client::build) |
17 + | Architecture | A | Clean module boundaries. Public API minimal. Wire types pub(crate). |
18 + | Testing | A | 340 tests for 5,945 LOC = 57.2 tests/KLOC. 54 crypto tests alone. |
19 + | Security | A+ | XChaCha20-Poly1305, Argon2id (OWASP params), NFC normalization, zeroize, envelope versioning |
20 + | Performance | A- | Pre-computed endpoints. Minor: 5x duplicated pull pattern. |
21 + | Documentation | A | Module-level docs on every file. Crypto rationale documented. |
22 + | Dependencies | A- | All deps on latest stable major except rand (0.8; 0.9 exists). |
23 + | Type Safety | A | Proper enums, ZeroizeOnDrop wrapper, pub(crate) on wire types. |
24 + | Concurrency | A | parking_lot::RwLock, never held across .await. Send+Sync asserted. |
25 + | Resilience | A | 3-retry exponential backoff, Retry-After support. Token expiry pre-flight. SSE buffer bounded. |
26 + | Codebase Size | A | 5,945 LOC lean for feature set. Pull variants refactored to shared `pull_inner` helper (2026-05-01). |
29 27
30 28 ## Module Heatmap
31 29
32 30 | Module | File | LOC | Tests | Grade | Notes |
33 31 |--------|------|-----|-------|-------|-------|
34 - | client | `src/client.rs` | ~750 | 85 | A | Comprehensive unit tests. Auth state, encrypt/decrypt roundtrip, type serialization, error classification, token expiry, OAuth URL construction. Blob E2E encryption. Send+Sync assert. with_http_client constructor. |
35 - | crypto | `src/crypto.rs` | ~480 | 19 | A+ | Excellent coverage. Roundtrip, wrong-key, version, truncation, uniqueness, zeroize. Binary blob encrypt/decrypt. Random salt wrapping. |
36 - | types | `src/types.rs` | ~240 | 13 | A | Serde roundtrip, Display/serde consistency, from_str_opt rejection, Copy/Hash traits, skip_serializing_if, extra field tolerance, ISO timestamp parsing. |
37 - | keystore | `src/keystore.rs` | 94 | 18 | A | Service name construction, base64 roundtrips, length validation, error handling. Platform behavior documented. |
38 - | subscribe | `src/subscribe.rs` | ~100 | ~10 | A- | SSE subscription. unwrap_or_default at line 107 masks server errors. |
39 - | error | `src/error.rs` | ~100 | 10 | A | Send+Sync assert, Display for all variants, Debug no-panic, source() chain. |
40 - | lib | `src/lib.rs` | 53 | 1 (doctest) | A | Clean re-exports. Doctest validates API surface compiles. |
32 + | crypto | `src/crypto.rs` | 1,049 | 54 | A+ | Excellent coverage. Roundtrip, wrong-key, version, truncation, uniqueness, zeroize. Binary blob encrypt/decrypt. Random salt wrapping. |
33 + | client/mod | `src/client/mod.rs` | 560 | 28 | A | Auth state, encrypt/decrypt roundtrip, type serialization, error classification, token expiry, OAuth URL construction. Send+Sync assert. |
34 + | client/helpers | `src/client/helpers.rs` | 776 | 33 | A | Pre-built endpoints, token management, session helpers. |
35 + | client/auth | `src/client/auth.rs` | 428 | 16 | A | OAuth flows, token refresh, session management. |
36 + | client/sync | `src/client/sync.rs` | 414 | 18 | A | Push/pull operations. Pull duplication resolved via `pull_inner` helper. |
37 + | client/encryption | `src/client/encryption.rs` | 233 | 4 | A | Encryption setup, key wrapping, password change. |
38 + | client/blob | `src/client/blob.rs` | 181 | 3 | A- | Blob upload/download with E2E encryption. |
39 + | client/subscribe | `src/client/subscribe.rs` | 166 | 4 | A- | SSE subscription. unwrap_or_default at line 107 masks server errors. |
40 + | conflict | `src/conflict.rs` | 950 | 36 | A | LWW conflict resolution. Semantic asymmetry: DELETE beats non-DELETE regardless of timestamp. |
41 + | types | `src/types.rs` | 500 | 17 | A | Serde roundtrip, Display/serde consistency, from_str_opt rejection, Copy/Hash traits, skip_serializing_if, extra field tolerance, ISO timestamp parsing. |
42 + | error | `src/error.rs` | 193 | 10 | A | Send+Sync assert, Display for all variants, Debug no-panic, source() chain. |
43 + | keystore | `src/keystore.rs` | 318 | 18 | A- | Service name construction, base64 roundtrips, length validation, error handling. Platform behavior documented. |
41 44
42 45 ## Cold Spots
43 46
44 - All previous cold spots resolved. Minor issues:
47 + 1. ~~**sync.rs pull duplication**~~ -- Fixed 2026-05-01. Extracted `pull_inner` generic helper. 4 public methods now delegate to shared implementation.
48 + 2. **fake_jwt test helper duplication** -- Defined identically in helpers.rs and auth.rs tests.
49 + 3. **rand 0.8** -- 0.9.x available. Not urgent (chacha20poly1305 0.10 compatible).
45 50
46 - 1. **subscribe.rs:107 unwrap_or_default** -- Masks server errors during SSE event parsing. If the server sends a malformed event, the client silently produces a default value instead of propagating the error.
47 - 2. **rustls-webpki vulns** -- Transitive dependency advisories. Blocked on upstream.
48 - 3. **No key rotation mechanism** -- No way to rotate the master key without re-encrypting all data.
51 + ### Carried forward
52 +
53 + 4. **subscribe.rs:107 unwrap_or_default** -- Masks server errors during SSE event parsing. If the server sends a malformed event, the client silently produces a default value instead of propagating the error.
54 + 5. **rustls-webpki vulns** -- Transitive dependency advisories. Blocked on upstream.
55 + 6. **No key rotation mechanism** -- No way to rotate the master key without re-encrypting all data.
49 56
50 57 ## Strengths
51 58
@@ -54,7 +61,7 @@ All previous cold spots resolved. Minor issues:
54 61 - **Clean key hierarchy** -- Three layers (password -> wrapping key -> master key -> per-entry encryption) well-documented and correctly implemented.
55 62 - **No consumer-specific logic** -- Fully generic. No references to GO, BB, or AF. Table names and data shapes are opaque to the SDK.
56 63 - **ZeroizeOnDrop** -- In-memory keys are zeroed on drop via volatile writes.
57 - - **Comprehensive test suite** -- 327 tests at ~60 tests/KLOC. Coverage across all modules including adversarial inputs.
64 + - **Comprehensive test suite** -- 340 tests at ~57 tests/KLOC. Coverage across all modules including adversarial inputs.
58 65
59 66 ## Weaknesses
60 67
@@ -62,6 +69,18 @@ All previous cold spots resolved. Minor issues:
62 69 - **subscribe.rs:107 unwrap_or_default** -- Masks server errors during SSE event parsing.
63 70 - **rustls-webpki transitive vulns** -- Blocked on upstream.
64 71
72 + ## Action Items
73 +
74 + ### Previous (carried forward)
75 + - subscribe.rs:107 unwrap_or_default: Still present (unchanged)
76 + - rustls-webpki vulns: Still upstream-blocked
77 + - No key rotation mechanism: Still open (architectural decision)
78 +
79 + ### New (Run 17)
80 + - ~~[LOW] Refactor pull variants in sync.rs to share a private `pull_inner` helper~~ -- Done 2026-05-01
81 + - [LOW] Extract `fake_jwt` test helper to shared `#[cfg(test)]` module
82 + - [LOW] Track rand 0.9 upgrade (currently compatible with chacha20poly1305 0.10)
83 +
65 84 ### Resolved (previous audits)
66 85 - ~~**CRITICAL: Blob data NOT encrypted**~~ -- RESOLVED
67 86 - ~~**Deterministic Argon2 salt**~~ -- RESOLVED
@@ -75,33 +94,29 @@ All previous cold spots resolved. Minor issues:
75 94
76 95 ## Mandatory Surprise
77 96
78 - **subscribe.rs:107 unwrap_or_default masks server errors.**
79 -
80 - The SSE event handler at subscribe.rs:107 uses `unwrap_or_default()` when parsing incoming server events. If the server sends a malformed JSON payload, instead of surfacing the parse error (which could indicate a protocol mismatch, server bug, or man-in-the-middle attack), the client silently produces a default empty event and continues processing.
81 -
82 - This means a broken server deployment that sends corrupted sync data would appear to work -- the client would process empty changesets and report success, while actually missing all changes. The consumer app would show "sync complete" with stale data.
83 -
84 - **Verdict:** Low severity in practice (the server is well-tested and E2E encryption would cause decryption failures on tampered data), but the silent masking violates the "fail loudly" principle. Should log at warn level and/or return an error variant.
97 + **The `resolve_lww` function has a deliberate semantic asymmetry: DELETE always beats non-DELETE regardless of timestamp.** A remote Delete can kill a local Insert even if the Insert is newer. This is convergent and safe (both peers agree on the outcome), but potentially surprising to users who expect timestamp to be the sole tiebreaker. Documented in test at line 847 (`lww_remote_delete_beats_local_insert`).
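The asymmetry described above can be illustrated with a minimal sketch. This is not the SDK's `resolve_lww`; the type names (`Op`, `Change`, `Resolution`), the function name, and the tie-breaking rule for equal timestamps are assumptions made only for illustration.

```rust
// Hypothetical stand-ins for the SDK's change and resolution types.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    Insert,
    Update,
    Delete,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    KeepLocal,
    KeepRemote,
}

struct Change {
    op: Op,
    timestamp: u64,
}

/// Last-writer-wins with the DELETE asymmetry: a Delete on either side
/// beats a non-Delete on the other side, whatever the timestamps say.
fn resolve_lww_sketch(local: &Change, remote: &Change) -> Resolution {
    match (local.op, remote.op) {
        // Exactly one side is a Delete: the Delete wins unconditionally.
        (Op::Delete, Op::Insert | Op::Update) => Resolution::KeepLocal,
        (Op::Insert | Op::Update, Op::Delete) => Resolution::KeepRemote,
        // Otherwise fall back to timestamps. Preferring remote on a tie is
        // an assumption of this sketch, not documented SDK behavior.
        _ => {
            if remote.timestamp >= local.timestamp {
                Resolution::KeepRemote
            } else {
                Resolution::KeepLocal
            }
        }
    }
}
```

With a local Insert at timestamp 200 and a remote Delete at timestamp 100, this sketch returns `KeepRemote` even though the Insert is newer, which is the surprising case the audit calls out.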
85 98
86 - ### Previous Surprise
99 + ### Previous Surprises
87 100
88 - **change_password race + deterministic salt** -- Both resolved. Lock guard dropped before await. Random salt per operation.
101 + - **subscribe.rs:107 unwrap_or_default masks server errors** -- Still present. SSE event handler silently produces default on malformed JSON. Low severity (E2E encryption catches tampering).
102 + - **change_password race + deterministic salt** -- Both resolved. Lock guard dropped before await. Random salt per operation.
89 103
90 104 ## Metrics Over Time
91 105
92 - | Date | LOC | Files | Tests | Tests/KLOC |
93 - |------|-----|-------|-------|------------|
94 - | 2026-03-11 | 1,416 | 6 | 13 | 9.2 |
95 - | 2026-03-13 | ~1.4K | 6 | 109 | ~77 |
96 - | 2026-03-13 (post-fix) | ~1.5K | 6 | 118 | ~79 |
97 - | 2026-03-13 (adversarial) | ~2.5K | 6 | 234 | ~94 |
98 - | 2026-03-13 (perf+resilience) | ~2.6K | 6 | 243 | ~94 |
99 - | 2026-03-13 (testing push) | ~2.8K | 6 | 297 | ~106 |
100 - | 2026-03-16 (Run 6) | 4,327 | 6 | 297 | ~69 |
101 - | 2026-03-18 (Run 9) | 4,327 | 6 | 298 | ~69 |
102 - | 2026-03-28 (Run 12) | 4,327 | 6 | 297 | ~69 |
103 - | 2026-04-15 (Run 14) | ~5,426 | -- | 327 | ~60 |
104 - | 2026-04-18 (Run 15) | ~5,426 | -- | 327 | ~60 |
106 + | Date | LOC | Files | Tests | Tests/KLOC | Clippy | Expects | Grade |
107 + |------|-----|-------|-------|------------|--------|---------|-------|
108 + | 2026-03-11 | 1,416 | 6 | 13 | 9.2 | -- | -- | -- |
109 + | 2026-03-13 | ~1.4K | 6 | 109 | ~77 | -- | -- | -- |
110 + | 2026-03-13 (post-fix) | ~1.5K | 6 | 118 | ~79 | -- | -- | -- |
111 + | 2026-03-13 (adversarial) | ~2.5K | 6 | 234 | ~94 | -- | -- | -- |
112 + | 2026-03-13 (perf+resilience) | ~2.6K | 6 | 243 | ~94 | -- | -- | -- |
113 + | 2026-03-13 (testing push) | ~2.8K | 6 | 297 | ~106 | -- | -- | -- |
114 + | 2026-03-16 (Run 6) | 4,327 | 6 | 297 | ~69 | -- | -- | A |
115 + | 2026-03-18 (Run 9) | 4,327 | 6 | 298 | ~69 | -- | -- | A |
116 + | 2026-03-28 (Run 12) | 4,327 | 6 | 297 | ~69 | -- | -- | A |
117 + | 2026-04-15 (Run 14) | ~5,426 | -- | 327 | ~60 | -- | -- | A |
118 + | 2026-04-18 (Run 15) | ~5,426 | -- | 327 | ~60 | -- | -- | A |
119 + | 2026-04-30 (Run 17) | ~5,945 | -- | 340 | ~57 | 0 | 1 | A |
105 120
106 121 ---
107 122
@@ -6,13 +6,13 @@ Last updated: 2026-04-10
6 6
7 7 SyncKit is an E2E encrypted, Rust-native, offline-first sync SDK for indie desktop and mobile apps. The server (hosted on MNW) stores only encrypted blobs -- zero-knowledge by design. Bundled with OTA updates and device management. Consumers: GoingsOn, Balanced Breakfast, audiofiles (all Tauri apps).
8 8
9 - The key differentiators are server-zero-knowledge encryption (XChaCha20-Poly1305 + Argon2id, keys never leave the device), opaque-blob storage (bring-your-own-schema, no server-side migrations), and the bundled OTA + device management layer that no sync competitor offers. Pricing is bundled with MNW creator tiers ($10-40/mo), not per-read/write metered.
9 + The key differentiators are server-zero-knowledge encryption (XChaCha20-Poly1305 + Argon2id, keys never leave the device), opaque-blob storage (bring-your-own-schema, no server-side migrations), and the bundled OTA + device management layer that no sync competitor offers. Pricing is bundled with MNW creator tiers ($10-60/mo), not per-read/write metered.
10 10
11 11 ## Pricing Comparison
12 12
13 13 | Tool | Price | Model |
14 14 |------|-------|-------|
15 - | **SyncKit** | $10-40/mo (bundled) | Included in MNW creator tier |
15 + | **SyncKit** | $10-60/mo (bundled) | Included in MNW creator tier |
16 16 | Firebase Firestore | Pay-per-use | $0.18/100K reads+writes, $0.26/GB |
17 17 | Supabase | $0-$599/mo | Freemium + usage overages |
18 18 | PowerSync | $0-$599/mo | Usage-based (GB synced) |
@@ -2,7 +2,61 @@
2 2
3 3 Done: All phases (S1-S5). Active: None. Next: Post-beta items below.
4 4
5 - v0.3.0. Audit grade A. 304 tests.
5 + v0.3.1. Audit grade A. 340 tests.
6 +
7 + ## Fuzz Findings (2026-04-27)
8 +
9 + ### Serious
10 + - [x] SSE buffer can grow unbounded — added 1MB `MAX_SSE_BUFFER` limit in `subscribe.rs`.
11 + - [x] `resp.json()` failure after successful server-side push — added `retry_request_json` that deserializes inside the retry loop. Push now uses it.
12 + - [x] OAuth code exchange retry permanently fails on single-use codes — removed retry wrapper from `authenticate_with_code`, sends exactly once.
13 +
14 + ### Medium
15 + - [x] Normalized password string not zeroized after Argon2 derivation — `derive_wrapping_key` now zeroizes the normalized `String` before returning.
16 + - [x] `detect_conflicts` silently drops duplicate local pending changes — documented precondition and behavior in doc comment.
17 + - [x] Multiple remote changes for same row produce multiple ConflictPairs — documented in doc comment that caller must resolve in seq order.
18 + - [x] `PullFilter { tables: Some(vec![]) }` sends empty array — added `tables_is_empty` skip-if predicate; `Some(vec![])` now serializes same as `None`.
19 +
20 + ### Minor
21 + - [x] 429 `Retry-After` header ignored — `check_response` now parses `Retry-After` into `Server.retry_after_secs`; retry loop uses it (capped at 60s) instead of default backoff.
22 + - [x] `is_transient` misclassifies redirect loops and decode errors as retryable — added `!e.is_redirect() && !e.is_decode()` checks.
23 + - [x] `resolve_field_merge` with non-object base silently returns `KeepRemote` — improved doc comment to note `Value::Null` case and suggest LWW fallback.
24 + - [x] Trailing slash in `server_url` creates `//api/v1/...` double-slash paths — `Endpoints::new` now trims trailing slash.
25 + - [x] SSE error swallowed — `Err` branch now logs the error via `tracing::debug` before returning `None`.
26 + - [x] Invalid UTF-8 in SSE stream replaced silently — changed from `from_utf8_lossy` to `from_utf8`; invalid UTF-8 now closes the stream with a warning.
27 + - [x] Decrypted master key `Vec<u8>` in `unwrap_master_key` not zeroized before drop — added `plaintext.zeroize()`.
28 + - [x] `size_bytes: i64` accepts negative values — added validation in `blob_upload_url` and `blob_confirm`, returns `Internal` error for negative values.
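The `Retry-After` handling listed above (parse the header, cap at 60s, otherwise fall back to exponential backoff) might look roughly like the following. The function names, the 500 ms `BASE_DELAY` value, and taking the header as a plain `&str` are assumptions for this sketch; the real code reads the header from a `reqwest::Response`.

```rust
use std::time::Duration;

// Assumed value; the commit does not show the SDK's actual BASE_DELAY.
const BASE_DELAY: Duration = Duration::from_millis(500);
// Cap stated in the fuzz finding.
const MAX_RETRY_AFTER_SECS: u64 = 60;

/// Parse a `Retry-After` header value given in seconds.
/// Returns `None` if the header is absent, non-numeric, or zero.
fn parse_retry_after_sketch(header: Option<&str>) -> Option<u64> {
    header?.trim().parse::<u64>().ok().filter(|&secs| secs > 0)
}

/// Choose the delay before the next attempt: a server-provided hint
/// (capped) when present, otherwise exponential backoff.
fn retry_delay_sketch(retry_after_secs: Option<u64>, attempt: u32) -> Duration {
    match retry_after_secs {
        Some(secs) => Duration::from_secs(secs.min(MAX_RETRY_AFTER_SECS)),
        None => BASE_DELAY * 2u32.pow(attempt),
    }
}
```

Capping the hint keeps a misbehaving or hostile server from parking the client for an arbitrary length of time.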
29 +
30 + ### Notes
31 + - [x] Dummy `[0u8; 32]` key passed when no data — replaced with dedicated `encrypt_change_no_data`/`decrypt_change_no_data` that skip the key entirely.
32 + - [x] Field merge is shallow (top-level keys only) — documented as "top-level keys only" in function doc.
33 + - [x] Concurrent `authenticate()` calls silently overwrite each other — documented last-writer-wins behavior in `authenticate` doc comment.
34 + - [x] Keychain errors always classified non-transient — added comment in `is_transient` explaining keychain ops are synchronous and not wrapped by retry_request.
35 + - [x] `ChangeOp::from_str_opt` is case-sensitive but undocumented as such — doc comment now states "case-sensitive" and lists accepted values.
36 +
37 + ## Fuzz Findings (2026-04-29)
38 +
39 + ### Serious
40 + - [x] Push retry creates duplicate sync log entries — added `batch_id` UUID to push request + server dedup via `idx_sync_log_batch_id`. Migration 083.
41 + - [x] Auth timing oracle leaks account status — moved `verify_password` before account status checks. All failures now return uniform 401.
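The push-retry fix above relies on an idempotency key: a retried push carries the same `batch_id`, so the server can recognize and skip it. A minimal in-memory sketch of that idea follows. The real server deduplicates through a database index (`idx_sync_log_batch_id`); the `SyncLogSketch` type and its method are hypothetical.

```rust
use std::collections::HashSet;

/// In-memory stand-in for a sync log that deduplicates pushes by batch id.
struct SyncLogSketch {
    seen_batches: HashSet<String>,
    entries: Vec<String>,
}

impl SyncLogSketch {
    fn new() -> Self {
        Self {
            seen_batches: HashSet::new(),
            entries: Vec::new(),
        }
    }

    /// Apply a push. Returns `true` if the batch was recorded and `false`
    /// if the same `batch_id` was already applied (a retried request).
    fn apply_push(&mut self, batch_id: &str, changes: &[&str]) -> bool {
        // `insert` returns false when the id is already present.
        if !self.seen_batches.insert(batch_id.to_string()) {
            return false;
        }
        self.entries.extend(changes.iter().map(|c| c.to_string()));
        true
    }
}
```

Because a duplicate batch is a no-op, the client can retry a push after a lost response without creating duplicate log entries.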
42 +
43 + ### Medium
44 + - [x] `change_password` TOCTOU race — added `expected_version` to `PutKeyRequest`. Server rejects with 409 on version mismatch. Client sends version from `get_server_key`.
45 + - [x] Pull/status JSON deserialization outside retry loop — reads use `retry_request` then `resp.json()` outside loop. Body-read failures not retried for idempotent reads. Switched to `retry_request_json`.
46 + - [x] Keystore doesn't zeroize base64 intermediates — `store_key`/`load_key` leave base64 strings on heap without zeroizing.
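The `expected_version` check for `change_password` is an optimistic lock. The sketch below models the server side in memory under two assumptions that the commit does not confirm: the version starts at 0 and is incremented on every successful write. `KeySlotSketch` and `PutKeyError` are hypothetical names; the real server responds with HTTP 409 on mismatch.

```rust
#[derive(Debug, PartialEq, Eq)]
enum PutKeyError {
    /// Another writer changed the key since the caller last read it.
    VersionConflict { current: i32 },
}

/// In-memory stand-in for the server's stored key envelope.
#[allow(dead_code)]
struct KeySlotSketch {
    envelope: Option<String>,
    version: i32,
}

impl KeySlotSketch {
    fn new() -> Self {
        Self { envelope: None, version: 0 }
    }

    /// Store `envelope` only if the caller's `expected_version` matches
    /// the current one. Returns the new version on success.
    fn put(&mut self, envelope: &str, expected_version: i32) -> Result<i32, PutKeyError> {
        if expected_version != self.version {
            return Err(PutKeyError::VersionConflict { current: self.version });
        }
        self.envelope = Some(envelope.to_string());
        self.version += 1;
        Ok(self.version)
    }
}
```

Two devices that both read version 1 and then try to write will see exactly one success; the loser gets a conflict and must re-fetch the envelope before retrying.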
47 +
48 + ### Minor
49 + - [x] `prune_sync_log` accepts negative `retain_days` — would delete all entries. Added guard.
50 + - [x] Client `GetKeyResponse` missing `key_version` field — server returns it but client drops it.
51 + - [x] Token can expire during push retry sequence — mitigated by push idempotency key (retries are now safe).
52 + - [x] Blob size mismatch on concurrent upload — changed `ON CONFLICT DO NOTHING` to `DO UPDATE SET size_bytes = EXCLUDED.size_bytes`.
53 + - [x] `validate-app` endpoint uses GET with API key in query string — changed to POST with JSON body.
54 +
55 + ### Notes
56 + - SSE buffer can overshoot MAX_SSE_BUFFER by one chunk size before check triggers — bounded, not unbounded.
57 + - SSE parser only handles `\n\n` delimiters, not `\r\n\r\n` — fine since MNW server sends `\n`.
58 + - Integer vs float (`1` vs `1.0`) treated as different in `resolve_field_merge` — serde_json Value semantics.
59 + - Null base in `resolve_field_merge` returns KeepRemote — documented, callers should use LWW fallback.
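The SSE notes above (bounded buffer, strict UTF-8, `\n\n` delimiters only) can be pictured with a small std-only sketch. It is not the code in `subscribe.rs`: the `SseBufferSketch` type, the error enum, and the choice to check the limit after draining complete events are assumptions. As a simplification, it also validates UTF-8 per chunk, so a multi-byte character split across two chunks would be rejected here.

```rust
/// 1 MB limit, matching the `MAX_SSE_BUFFER` value named in the findings.
const MAX_SSE_BUFFER: usize = 1024 * 1024;

#[derive(Debug, PartialEq, Eq)]
enum SseError {
    BufferOverflow,
    InvalidUtf8,
}

struct SseBufferSketch {
    buf: String,
}

impl SseBufferSketch {
    fn new() -> Self {
        Self { buf: String::new() }
    }

    /// Append a chunk and return every complete event it finishes.
    fn feed(&mut self, chunk: &[u8]) -> Result<Vec<String>, SseError> {
        // Strict decoding: invalid bytes are an error, not U+FFFD.
        let text = std::str::from_utf8(chunk).map_err(|_| SseError::InvalidUtf8)?;
        self.buf.push_str(text);

        // Events are delimited by a blank line ("\n\n" only).
        let mut events = Vec::new();
        while let Some(pos) = self.buf.find("\n\n") {
            let event: String = self.buf.drain(..pos + 2).collect();
            events.push(event.trim_end().to_string());
        }

        // The buffer may briefly exceed the limit by one chunk before
        // this check runs, so growth is bounded rather than exact.
        if self.buf.len() > MAX_SSE_BUFFER {
            return Err(SseError::BufferOverflow);
        }
        Ok(events)
    }
}
```

A caller would feed each network chunk in order and close the stream on the first `Err`, which mirrors the "close with a warning" behavior described for invalid UTF-8.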
6 60
7 61 ## Deferred (Post-Beta)
8 62 - [ ] Conflict resolution helpers — LWW, field-level merge, custom resolver callback in the SDK. Reduces client-side boilerplate. (Gap vs Ditto, Couchbase)
@@ -14,6 +14,9 @@ use super::helpers::{check_response, jwt_exp};
14 14 impl SyncKitClient {
15 15 /// Authenticate with the MNW server. Returns (user_id, app_id).
16 16 ///
17 + /// Stores the session internally. If called concurrently from multiple
18 + /// threads, the last write wins — earlier sessions are silently overwritten.
19 + ///
17 20 /// # Errors
18 21 ///
19 22 /// Returns `Server { status: 401, .. }` for wrong credentials.
@@ -144,16 +147,19 @@ impl SyncKitClient {
144 147 client_id: &self.config.api_key,
145 148 })?);
146 149
147 - let resp = self
148 - .retry_request(|| {
149 - let req = self
150 - .http
151 - .post(&self.endpoints.oauth_token)
152 - .header("content-type", "application/json")
153 - .body(body.clone());
154 - async move { check_response(req.send().await?).await }
155 - })
156 - .await?;
150 + // OAuth authorization codes are single-use. Retrying after a
151 + // network error would send an already-consumed code, producing a
152 + // permanent 400. Send exactly once and let the caller restart the
153 + // OAuth flow on failure.
154 + let resp = check_response(
155 + self.http
156 + .post(&self.endpoints.oauth_token)
157 + .header("content-type", "application/json")
158 + .body(body)
159 + .send()
160 + .await?,
161 + )
162 + .await?;
157 163 let token_resp: OAuthTokenResponse = resp.json().await?;
158 164
159 165 let user_id = token_resp.user_id;
@@ -20,6 +20,11 @@ impl SyncKitClient {
20 20 hash: &str,
21 21 size_bytes: i64,
22 22 ) -> Result<BlobUploadUrlResponse> {
23 + if size_bytes < 0 {
24 + return Err(crate::error::SyncKitError::Internal(
25 + "size_bytes must be non-negative".into(),
26 + ));
27 + }
23 28 let token = self.require_token()?;
24 29
25 30 let body = Bytes::from(serde_json::to_vec(&BlobUploadUrlRequest {
@@ -27,18 +32,16 @@ impl SyncKitClient {
27 32 size_bytes,
28 33 })?);
29 34
30 - let resp = self
31 - .retry_request(|| {
32 - let req = self
33 - .http
34 - .post(&self.endpoints.blobs_upload)
35 - .bearer_auth(&token)
36 - .header("content-type", "application/json")
37 - .body(body.clone());
38 - async move { check_response(req.send().await?).await }
39 - })
40 - .await?;
41 - Ok(resp.json().await?)
35 + self.retry_request_json(|| {
36 + let req = self
37 + .http
38 + .post(&self.endpoints.blobs_upload)
39 + .bearer_auth(&token)
40 + .header("content-type", "application/json")
41 + .body(body.clone());
42 + async move { check_response(req.send().await?).await }
43 + })
44 + .await
42 45 }
43 46
44 47 /// Upload blob data directly to S3 via a presigned PUT URL.
@@ -67,6 +70,11 @@ impl SyncKitClient {
67 70 /// The server verifies the object exists in S3 and records it.
68 71 #[instrument(skip(self))]
69 72 pub async fn blob_confirm(&self, hash: &str, size_bytes: i64) -> Result<()> {
73 + if size_bytes < 0 {
74 + return Err(crate::error::SyncKitError::Internal(
75 + "size_bytes must be non-negative".into(),
76 + ));
77 + }
70 78 let token = self.require_token()?;
71 79
72 80 let body = Bytes::from(serde_json::to_vec(&BlobConfirmRequest {
@@ -96,8 +104,8 @@ impl SyncKitClient {
96 104 hash: hash.to_string(),
97 105 })?);
98 106
99 - let resp = self
100 - .retry_request(|| {
107 + let download: BlobDownloadUrlResponse = self
108 + .retry_request_json(|| {
101 109 let req = self
102 110 .http
103 111 .post(&self.endpoints.blobs_download)
@@ -107,7 +115,6 @@ impl SyncKitClient {
107 115 async move { check_response(req.send().await?).await }
108 116 })
109 117 .await?;
110 - let download: BlobDownloadUrlResponse = resp.json().await?;
111 118 Ok(download.download_url)
112 119 }
113 120
@@ -27,7 +27,7 @@ impl SyncKitClient {
27 27 200 | 404 => Ok(resp),
28 28 status => {
29 29 let message = resp.text().await.unwrap_or_default();
30 - Err(crate::error::SyncKitError::Server { status, message })
30 + Err(crate::error::SyncKitError::Server { status, message, retry_after_secs: None })
31 31 }
32 32 }
33 33 }
@@ -45,8 +45,8 @@ impl SyncKitClient {
45 45 let master_key = crypto::generate_master_key();
46 46 let envelope = crypto::wrap_master_key(&master_key, password)?;
47 47
48 - // Push to server
49 - self.put_server_key(&envelope).await?;
48 + // Push to server (expected_version 0 = first key)
49 + self.put_server_key(&envelope, 0).await?;
50 50
51 51 // Cache in OS keychain
52 52 keystore::store_key(app_id, user_id, &master_key)?;
@@ -63,7 +63,7 @@ impl SyncKitClient {
63 63 pub async fn setup_encryption_existing(&self, password: &str) -> Result<()> {
64 64 let (app_id, user_id) = self.require_session_ids()?;
65 65
66 - let envelope_json = self.get_server_key().await?;
66 + let (envelope_json, _key_version) = self.get_server_key().await?;
67 67 let master_key = crypto::unwrap_master_key(&envelope_json, password)?;
68 68
69 69 // Cache in OS keychain
@@ -105,7 +105,7 @@ impl SyncKitClient {
105 105 ) -> Result<()> {
106 106 // Always fetch the envelope from the server and verify old_password
107 107 // can decrypt it, regardless of whether we have a cached key.
108 - let envelope_json = self.get_server_key().await?;
108 + let (envelope_json, key_version) = self.get_server_key().await?;
109 109 let verified_key = crypto::verify_password_against_envelope(
110 110 &envelope_json,
111 111 old_password,
@@ -116,7 +116,9 @@ impl SyncKitClient {
116 116 // Re-wrap with new password (generates fresh random salt)
117 117 let new_envelope = crypto::wrap_master_key(&master_key, new_password)?;
118 118
119 - self.put_server_key(&new_envelope).await?;
119 + // Optimistic lock: reject if another device changed the password
120 + // between our GET and this PUT.
121 + self.put_server_key(&new_envelope, key_version).await?;
120 122
121 123 tracing::info!("Encryption password changed");
122 124 Ok(())
@@ -129,11 +131,16 @@ impl SyncKitClient {
129 131 }
130 132
131 133 /// Upload the encrypted master key envelope to the server (PUT /api/sync/keys).
132 - pub(super) async fn put_server_key(&self, envelope_json: &str) -> Result<()> {
134 + ///
135 + /// `expected_version` is the key version the client expects. The server
136 + /// rejects with 409 if the current version doesn't match (another device
137 + /// changed the password). Use 0 for the initial key upload.
138 + pub(super) async fn put_server_key(&self, envelope_json: &str, expected_version: i32) -> Result<()> {
133 139 let (url, token) = self.key_url_and_token()?;
134 140
135 141 let body = Bytes::from(serde_json::to_vec(&PutKeyRequest {
136 142 encrypted_key: envelope_json.to_string(),
143 + expected_version,
137 144 })?);
138 145
139 146 self.retry_request(|| {
@@ -150,17 +157,17 @@ impl SyncKitClient {
150 157 }
151 158
152 159 /// Download the encrypted master key envelope from the server (GET /api/sync/keys).
153 - pub(super) async fn get_server_key(&self) -> Result<String> {
160 + /// Returns `(envelope_json, key_version)`.
161 + pub(super) async fn get_server_key(&self) -> Result<(String, i32)> {
154 162 let (url, token) = self.key_url_and_token()?;
155 163
156 - let resp = self
157 - .retry_request(|| {
164 + let key_resp: GetKeyResponse = self
165 + .retry_request_json(|| {
158 166 let req = self.http.get(url).bearer_auth(&token);
159 167 async move { check_response(req.send().await?).await }
160 168 })
161 169 .await?;
162 - let key_resp: GetKeyResponse = resp.json().await?;
163 - Ok(key_resp.encrypted_key)
170 + Ok((key_resp.encrypted_key, key_resp.key_version.unwrap_or(0)))
164 171 }
165 172 }
166 173
@@ -208,11 +215,13 @@ mod tests {
208 215 fn put_key_request_serialization() {
209 216 let req = PutKeyRequest {
210 217 encrypted_key: "envelope-json-here".to_string(),
218 + expected_version: 1,
211 219 };
212 220
213 221 let json = serde_json::to_string(&req).unwrap();
214 222 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
215 223 assert_eq!(parsed["encrypted_key"], "envelope-json-here");
224 + assert_eq!(parsed["expected_version"], 1);
216 225 }
217 226
218 227 #[test]
@@ -30,7 +30,66 @@ impl SyncKitClient {
30 30 }
31 31
32 32 if attempt < MAX_RETRIES {
33 - let delay = BASE_DELAY * 2u32.pow(attempt);
33 + let delay = retry_delay(&err, attempt);
34 + tracing::debug!(
35 + attempt = attempt + 1,
36 + max_retries = MAX_RETRIES,
37 + delay_ms = delay.as_millis() as u64,
38 + error = %err,
39 + "Transient error, retrying after backoff",
40 + );
41 + tokio::time::sleep(delay).await;
42 + }
43 +
44 + last_err = Some(err);
45 + }
46 + }
47 + }
48 +
49 + Err(last_err.expect("loop ran at least once"))
50 + }
51 +
52 + /// Retry an HTTP operation and deserialize the JSON response body inside
53 + /// the retry loop. This ensures a transient body-read failure (truncated
54 + /// response, connection reset mid-body) is retried rather than surfacing
55 + /// as a permanent error after the server already committed the operation.
56 + pub(super) async fn retry_request_json<F, Fut, T>(&self, mut operation: F) -> Result<T>
57 + where
58 + F: FnMut() -> Fut,
59 + Fut: std::future::Future<Output = Result<reqwest::Response>>,
60 + T: serde::de::DeserializeOwned,
61 + {
62 + let mut last_err = None;
63 +
64 + for attempt in 0..=MAX_RETRIES {
65 + match operation().await {
66 + Ok(resp) => {
67 + match resp.json::<T>().await {
68 + Ok(parsed) => return Ok(parsed),
69 + Err(e) => {
70 + let err = SyncKitError::Http(e);
71 + if attempt < MAX_RETRIES {
72 + let delay = BASE_DELAY * 2u32.pow(attempt);
73 + tracing::debug!(
74 + attempt = attempt + 1,
75 + max_retries = MAX_RETRIES,
76 + delay_ms = delay.as_millis() as u64,
77 + error = %err,
78 + "Response body read failed, retrying",
79 + );
80 + tokio::time::sleep(delay).await;
81 + }
82 + last_err = Some(err);
83 + }
84 + }
85 + }
86 + Err(err) => {
87 + if !is_transient(&err) {
88 + return Err(err);
89 + }
90 +
91 + if attempt < MAX_RETRIES {
92 + let delay = retry_delay(&err, attempt);
34 93 tracing::debug!(
35 94 attempt = attempt + 1,
36 95 max_retries = MAX_RETRIES,
@@ -56,10 +115,36 @@ impl SyncKitClient {
56 115 let master_key = self.require_master_key()?;
57 116 Self::encrypt_change_with_key(entry, &master_key)
58 117 } else {
59 - Self::encrypt_change_with_key(entry, &[0u8; 32])
118 + Self::encrypt_change_no_data(entry)
60 119 }
61 120 }
62 121
122 + /// Build a wire entry for a change with no data (e.g. Delete). No key needed.
123 + #[cfg(test)]
124 + pub(super) fn encrypt_change_no_data(entry: ChangeEntry) -> Result<WireChangeEntry> {
125 + debug_assert!(entry.data.is_none());
126 + Ok(WireChangeEntry {
127 + table: entry.table,
128 + op: entry.op,
129 + row_id: entry.row_id,
130 + timestamp: entry.timestamp,
131 + data: None,
132 + })
133 + }
134 +
135 + /// Decrypt a pulled entry that has no data. No key needed.
136 + #[cfg(test)]
137 + pub(super) fn decrypt_change_no_data(entry: PullChangeEntry) -> Result<ChangeEntry> {
138 + debug_assert!(entry.data.is_none());
139 + Ok(ChangeEntry {
140 + table: entry.table,
141 + op: entry.op,
142 + row_id: entry.row_id,
143 + timestamp: entry.timestamp,
144 + data: None,
145 + })
146 + }
147 +
63 148 /// Encrypt with a pre-loaded key. Used by `push()` to avoid per-entry lock acquisition.
64 149 pub(super) fn encrypt_change_with_key(entry: ChangeEntry, master_key: &[u8; 32]) -> Result<WireChangeEntry> {
65 150 let encrypted_data = match entry.data {
@@ -83,7 +168,7 @@ impl SyncKitClient {
83 168 let master_key = self.require_master_key()?;
84 169 Self::decrypt_change_with_key(entry, &master_key)
85 170 } else {
86 - Self::decrypt_change_with_key(entry, &[0u8; 32])
171 + Self::decrypt_change_no_data(entry)
87 172 }
88 173 }
89 174
@@ -145,15 +230,41 @@ pub(super) fn token_is_expired(token: &str) -> bool {
145 230 }
146 231
147 232 /// Check an HTTP response for errors, returning the response on success.
233 + ///
234 + /// For any error response, parses the `Retry-After` header (usually sent with a 429)
235 + /// so the retry loop can use it instead of the default exponential backoff delay.
148 236 pub(super) async fn check_response(resp: reqwest::Response) -> Result<reqwest::Response> {
149 237 let status = resp.status().as_u16();
150 238 if status >= 400 {
239 + let retry_after_secs = parse_retry_after(&resp);
151 240 let message = resp.text().await.unwrap_or_default();
152 - return Err(SyncKitError::Server { status, message });
241 + return Err(SyncKitError::Server { status, message, retry_after_secs });
153 242 }
154 243 Ok(resp)
155 244 }
156 245
246 + /// Extract the `Retry-After` header from an HTTP response as seconds.
247 + /// Returns `None` if the header is absent, non-numeric, or zero.
248 + fn parse_retry_after(resp: &reqwest::Response) -> Option<u64> {
249 + resp.headers()
250 + .get("retry-after")
251 + .and_then(|v| v.to_str().ok())
252 + .and_then(|v| v.parse::<u64>().ok())
253 + .filter(|&secs| secs > 0)
254 + }
255 +
256 + /// Compute the backoff delay for a retry attempt.
257 + ///
258 + /// Uses the server's `Retry-After` value when the error carries one (usually a 429),
259 + /// capped at 60 seconds. Otherwise falls back to exponential backoff (1s, 2s, 4s).
260 + fn retry_delay(err: &SyncKitError, attempt: u32) -> std::time::Duration {
261 + if let SyncKitError::Server { retry_after_secs: Some(secs), .. } = err {
262 + let capped = (*secs).min(60);
263 + return std::time::Duration::from_secs(capped);
264 + }
265 + BASE_DELAY * 2u32.pow(attempt)
266 + }
267 +
157 268 /// Returns true if the error is transient and worth retrying.
158 269 ///
159 270 /// Transient errors:
@@ -167,15 +278,19 @@ pub(super) async fn check_response(resp: reqwest::Response) -> Result<reqwest::R
167 278 pub(super) fn is_transient(err: &SyncKitError) -> bool {
168 279 match err {
169 280 SyncKitError::Http(e) => {
170 - // All reqwest transport errors are transient (timeout, connect, DNS, etc.)
171 - // except for builder errors which indicate programming mistakes.
172 - !e.is_builder()
281 + // Transport errors (timeout, connect, DNS) are transient.
282 + // Builder errors are programming mistakes; redirect loops and
283 + // body decode failures are permanent, so retrying won't help.
284 + !e.is_builder() && !e.is_redirect() && !e.is_decode()
173 285 }
174 286 SyncKitError::Server { status, .. } => {
175 287 // 5xx = server error (transient), 429 = rate limited (transient)
176 288 *status >= 500 || *status == 429
177 289 }
178 - // Everything else (auth, crypto, serialization) is permanent
290 + // Everything else (auth, crypto, serialization, keychain) is permanent.
291 + // Note: some keychain errors (e.g. locked keychain, unavailable
292 + // secret-service) are conceptually transient, but keychain operations
293 + // are synchronous and not wrapped by retry_request.
179 294 _ => false,
180 295 }
181 296 }
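The delay selection added above can be sketched in isolation. This is a minimal, std-only illustration, not the crate's code: `parse_retry_after_value` and `MAX_RETRY_AFTER_SECS` are hypothetical names, the header is passed in as a plain string instead of a `reqwest::Response`, and `BASE_DELAY` is assumed to be one second, as the "(1s, 2s, 4s)" doc comment suggests.

```rust
use std::time::Duration;

/// Assumed base delay, inferred from the "(1s, 2s, 4s)" doc comment.
const BASE_DELAY: Duration = Duration::from_secs(1);
/// Hypothetical name for the 60-second cap used in the diff.
const MAX_RETRY_AFTER_SECS: u64 = 60;

/// Parse a `Retry-After` value in delta-seconds form.
/// HTTP-date values, garbage, and zero all yield `None`.
fn parse_retry_after_value(raw: Option<&str>) -> Option<u64> {
    raw.and_then(|v| v.parse::<u64>().ok())
        .filter(|&secs| secs > 0)
}

/// Prefer the server hint (capped), otherwise back off exponentially.
fn retry_delay(retry_after_secs: Option<u64>, attempt: u32) -> Duration {
    match retry_after_secs {
        Some(secs) => Duration::from_secs(secs.min(MAX_RETRY_AFTER_SECS)),
        None => BASE_DELAY * 2u32.pow(attempt),
    }
}
```

Capping the server hint keeps a misconfigured or hostile server from stalling the client for an arbitrary time. Note that a `Retry-After` given as an HTTP-date is treated as absent in this sketch, which matches the "non-numeric" case in the doc comment.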
@@ -348,35 +463,35 @@ mod tests {
348 463
349 464 #[test]
350 465 fn is_transient_server_5xx() {
351 - let err = SyncKitError::Server { status: 500, message: "Internal Server Error".to_string() };
466 + let err = SyncKitError::Server { status: 500, message: "Internal Server Error".to_string(), retry_after_secs: None };
352 467 assert!(is_transient(&err));
353 - let err = SyncKitError::Server { status: 502, message: "Bad Gateway".to_string() };
468 + let err = SyncKitError::Server { status: 502, message: "Bad Gateway".to_string(), retry_after_secs: None };
354 469 assert!(is_transient(&err));
355 - let err = SyncKitError::Server { status: 503, message: "Service Unavailable".to_string() };
470 + let err = SyncKitError::Server { status: 503, message: "Service Unavailable".to_string(), retry_after_secs: None };
356 471 assert!(is_transient(&err));
357 - let err = SyncKitError::Server { status: 504, message: "Gateway Timeout".to_string() };
472 + let err = SyncKitError::Server { status: 504, message: "Gateway Timeout".to_string(), retry_after_secs: None };
358 473 assert!(is_transient(&err));
359 474 }
360 475
361 476 #[test]
362 477 fn is_transient_rate_limited_429() {
363 - let err = SyncKitError::Server { status: 429, message: "Too Many Requests".to_string() };
478 + let err = SyncKitError::Server { status: 429, message: "Too Many Requests".to_string(), retry_after_secs: None };
364 479 assert!(is_transient(&err));
365 480 }
366 481
367 482 #[test]
368 483 fn is_not_transient_client_4xx() {
369 - let err = SyncKitError::Server { status: 400, message: "Bad Request".to_string() };
484 + let err = SyncKitError::Server { status: 400, message: "Bad Request".to_string(), retry_after_secs: None };
370 485 assert!(!is_transient(&err));
371 - let err = SyncKitError::Server { status: 401, message: "Unauthorized".to_string() };
486 + let err = SyncKitError::Server { status: 401, message: "Unauthorized".to_string(), retry_after_secs: None };
372 487 assert!(!is_transient(&err));
373 - let err = SyncKitError::Server { status: 403, message: "Forbidden".to_string() };
488 + let err = SyncKitError::Server { status: 403, message: "Forbidden".to_string(), retry_after_secs: None };
374 489 assert!(!is_transient(&err));
375 - let err = SyncKitError::Server { status: 404, message: "Not Found".to_string() };
490 + let err = SyncKitError::Server { status: 404, message: "Not Found".to_string(), retry_after_secs: None };
376 491 assert!(!is_transient(&err));
377 - let err = SyncKitError::Server { status: 409, message: "Conflict".to_string() };
492 + let err = SyncKitError::Server { status: 409, message: "Conflict".to_string(), retry_after_secs: None };
378 493 assert!(!is_transient(&err));
379 - let err = SyncKitError::Server { status: 422, message: "Unprocessable Entity".to_string() };
494 + let err = SyncKitError::Server { status: 422, message: "Unprocessable Entity".to_string(), retry_after_secs: None };
380 495 assert!(!is_transient(&err));
381 496 }
382 497
@@ -455,11 +570,11 @@ mod tests {
455 570
456 571 #[test]
457 572 fn is_transient_boundary_values() {
458 - assert!(!is_transient(&SyncKitError::Server { status: 428, message: String::new() }));
459 - assert!(is_transient(&SyncKitError::Server { status: 429, message: String::new() }));
460 - assert!(!is_transient(&SyncKitError::Server { status: 430, message: String::new() }));
461 - assert!(!is_transient(&SyncKitError::Server { status: 499, message: String::new() }));
462 - assert!(is_transient(&SyncKitError::Server { status: 500, message: String::new() }));
573 + assert!(!is_transient(&SyncKitError::Server { status: 428, message: String::new(), retry_after_secs: None }));
574 + assert!(is_transient(&SyncKitError::Server { status: 429, message: String::new(), retry_after_secs: None }));
575 + assert!(!is_transient(&SyncKitError::Server { status: 430, message: String::new(), retry_after_secs: None }));
576 + assert!(!is_transient(&SyncKitError::Server { status: 499, message: String::new(), retry_after_secs: None }));
577 + assert!(is_transient(&SyncKitError::Server { status: 500, message: String::new(), retry_after_secs: None }));
463 578 }
464 579
465 580 // ── Token expiry detection ──
@@ -105,6 +105,7 @@ struct Endpoints {
105 105
106 106 impl Endpoints {
107 107 fn new(base: &str) -> Self {
108 + let base = base.trim_end_matches('/');
108 109 Self {
109 110 auth: format!("{base}/api/v1/sync/auth"),
110 111 oauth_token: format!("{base}/oauth/token"),
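The effect of the `trim_end_matches('/')` line can be shown with a small standalone helper. This is only a sketch: `join_endpoint` is a hypothetical function, not part of the crate, and it assumes every path starts with a single `/`.

```rust
/// Join a base URL and an absolute path without producing `//`.
/// Hypothetical helper; assumes `path` begins with exactly one '/'.
fn join_endpoint(base: &str, path: &str) -> String {
    // Strip every trailing slash from the base so "https://x/" and
    // "https://x" produce the same endpoint.
    let base = base.trim_end_matches('/');
    format!("{base}{path}")
}
```

With this normalization, a configured `server_url` of `https://example.com/` and one of `https://example.com` resolve to the same endpoint strings.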
@@ -265,8 +266,9 @@ pub async fn validate_api_key(server_url: &str, api_key: &str) -> Result<String>
265 266 .timeout(std::time::Duration::from_secs(10))
266 267 .build()?;
267 268 let resp = http
268 - .get(&url)
269 - .query(&[("api_key", api_key)])
269 + .post(&url)
270 + .header("content-type", "application/json")
271 + .body(serde_json::to_vec(&serde_json::json!({"api_key": api_key}))?)
270 272 .send()
271 273 .await?;
272 274 let status = resp.status().as_u16();
@@ -274,6 +276,7 @@ pub async fn validate_api_key(server_url: &str, api_key: &str) -> Result<String>
274 276 return Err(SyncKitError::Server {
275 277 status: 401,
276 278 message: "Invalid API key".to_string(),
279 + retry_after_secs: None,
277 280 });
278 281 }
279 282 let resp = helpers::check_response(resp).await?;
@@ -474,6 +477,8 @@ mod tests {
474 477 };
475 478 let client = SyncKitClient::new(config);
476 479 assert_eq!(client.config().server_url, "https://example.com/");
480 + // Endpoints should not have double slashes
481 + assert_eq!(client.endpoints.auth, "https://example.com/api/v1/sync/auth");
477 482 }
478 483
479 484 // ── SyncKitError Display ──
@@ -492,7 +497,7 @@ mod tests {
492 497
493 498 #[test]
494 499 fn error_display_server() {
495 - let err = SyncKitError::Server { status: 500, message: "boom".to_string() };
500 + let err = SyncKitError::Server { status: 500, message: "boom".to_string(), retry_after_secs: None };
496 501 let msg = err.to_string();
497 502 assert!(msg.contains("500"));
498 503 assert!(msg.contains("boom"));
@@ -18,6 +18,11 @@ pub struct SyncNotifyStream {
18 18 response: reqwest::Response,
19 19 }
20 20
21 + /// Maximum SSE buffer size (1 MiB). If the buffer grows past this limit
22 + /// before a block terminator (`\n\n`) is consumed, we treat it as an error
23 + /// and close the stream to prevent unbounded memory growth.
24 + const MAX_SSE_BUFFER: usize = 1024 * 1024;
25 +
21 26 impl SyncNotifyStream {
22 27 pub(crate) fn new(response: reqwest::Response) -> Self {
23 28 Self {
@@ -48,10 +53,23 @@ impl SyncNotifyStream {
48 53 // Need more data from the stream
49 54 match self.response.chunk().await {
50 55 Ok(Some(chunk)) => {
51 - let text = String::from_utf8_lossy(&chunk);
52 - self.buffer.push_str(&text);
56 + match std::str::from_utf8(&chunk) {
57 + Ok(text) => self.buffer.push_str(text),
58 + Err(_) => {
59 + tracing::warn!("SSE stream contained invalid UTF-8, closing");
60 + return None;
61 + }
62 + }
63 + if self.buffer.len() > MAX_SSE_BUFFER {
64 + tracing::warn!("SSE buffer exceeded {MAX_SSE_BUFFER} bytes, closing stream");
65 + return None;
66 + }
67 + }
68 + Ok(None) => return None,
69 + Err(e) => {
70 + tracing::debug!(error = %e, "SSE stream error, closing");
71 + return None;
53 72 }
54 - Ok(None) | Err(_) => return None,
55 73 }
56 74 }
57 75 }
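One caveat of validating each chunk with `std::str::from_utf8` is that a multi-byte character split across two network chunks fails validation even though the stream as a whole is valid UTF-8. A possible alternative, sketched below under assumptions, is to buffer raw bytes and decode only complete blocks. `SseBuffer`, `SseError`, and `push_chunk` are hypothetical names, and the size check here applies to the unterminated remainder, not to the whole buffer as in the diff.

```rust
/// Assumed limit, mirroring `MAX_SSE_BUFFER` in the diff.
const MAX_SSE_BUFFER: usize = 1024 * 1024;

#[derive(Debug, PartialEq)]
enum SseError {
    BufferOverflow,
    InvalidUtf8,
}

/// Hypothetical byte-level accumulator for SSE blocks.
#[derive(Default)]
struct SseBuffer {
    bytes: Vec<u8>,
}

impl SseBuffer {
    /// Append a raw chunk and return every complete block it finishes.
    fn push_chunk(&mut self, chunk: &[u8]) -> Result<Vec<String>, SseError> {
        self.bytes.extend_from_slice(chunk);
        let mut blocks = Vec::new();
        // Drain every block terminated by a blank line ("\n\n").
        while let Some(pos) = self.bytes.windows(2).position(|w| w == b"\n\n") {
            let rest = self.bytes.split_off(pos + 2);
            let mut block = std::mem::replace(&mut self.bytes, rest);
            block.truncate(pos); // drop the terminator itself
            // Decode per block, so split characters are already rejoined.
            let text = String::from_utf8(block).map_err(|_| SseError::InvalidUtf8)?;
            blocks.push(text);
        }
        // Only the unterminated remainder counts against the limit.
        if self.bytes.len() > MAX_SSE_BUFFER {
            return Err(SseError::BufferOverflow);
        }
        Ok(blocks)
    }
}
```

Decoding at block boundaries works because the terminator bytes can never appear inside a multi-byte UTF-8 sequence, so a complete block is always a self-contained unit to validate.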
@@ -105,7 +123,7 @@ impl SyncKitClient {
105 123 let status = resp.status().as_u16();
106 124 if status >= 400 {
107 125 let message = resp.text().await.unwrap_or_default();
108 - return Err(SyncKitError::Server { status, message });
126 + return Err(SyncKitError::Server { status, message, retry_after_secs: None });
109 127 }
110 128
111 129 Ok(SyncNotifyStream::new(resp))
@@ -32,18 +32,16 @@ impl SyncKitClient {
32 32 platform: platform.to_string(),
33 33 })?);
34 34
35 - let resp = self
36 - .retry_request(|| {
37 - let req = self
38 - .http
39 - .post(&self.endpoints.devices)
40 - .bearer_auth(&token)
41 - .header("content-type", "application/json")
42 - .body(body.clone());
43 - async move { check_response(req.send().await?).await }
44 - })
45 - .await?;
46 - Ok(resp.json().await?)
35 + self.retry_request_json(|| {
36 + let req = self
37 + .http
38 + .post(&self.endpoints.devices)
39 + .bearer_auth(&token)
40 + .header("content-type", "application/json")
41 + .body(body.clone());
42 + async move { check_response(req.send().await?).await }
43 + })
44 + .await
47 45 }
48 46
49 47 /// List all devices for the current user.
@@ -51,13 +49,11 @@ impl SyncKitClient {
51 49 pub async fn list_devices(&self) -> Result<Vec<Device>> {
52 50 let token = self.require_token()?;
53 51
54 - let resp = self
55 - .retry_request(|| {
56 - let req = self.http.get(&self.endpoints.devices).bearer_auth(&token);
57 - async move { check_response(req.send().await?).await }
58 - })
59 - .await?;
60 - Ok(resp.json().await?)
52 + self.retry_request_json(|| {
53 + let req = self.http.get(&self.endpoints.devices).bearer_auth(&token);
54 + async move { check_response(req.send().await?).await }
55 + })
56 + .await
61 57 }
62 58
63 59 // ── Push / Pull ──
@@ -89,11 +85,12 @@ impl SyncKitClient {
89 85
90 86 let body = Bytes::from(serde_json::to_vec(&WirePushRequest {
91 87 device_id,
88 + batch_id: Uuid::new_v4(),
92 89 changes: wire_changes,
93 90 })?);
94 91
95 - let resp = self
96 - .retry_request(|| {
92 + let push_resp: PushResponse = self
93 + .retry_request_json(|| {
97 94 let req = self
98 95 .http
99 96 .post(&self.endpoints.push)
@@ -103,8 +100,6 @@ impl SyncKitClient {
103 100 async move { check_response(req.send().await?).await }
104 101 })
105 102 .await?;
106 -
107 - let push_resp: PushResponse = resp.json().await?;
108 103 Ok(push_resp.cursor)
109 104 }
110 105
@@ -119,39 +114,8 @@ impl SyncKitClient {
119 114 device_id: Uuid,
120 115 cursor: i64,
121 116 ) -> Result<(Vec<ChangeEntry>, i64, bool)> {
122 - let token = self.require_token()?;
123 -
124 117 let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?);
125 -
126 - let resp = self
127 - .retry_request(|| {
128 - let req = self
129 - .http
130 - .post(&self.endpoints.pull)
131 - .bearer_auth(&token)
132 - .header("content-type", "application/json")
133 - .body(body.clone());
134 - async move { check_response(req.send().await?).await }
135 - })
136 - .await?;
137 -
138 - let pull_resp: PullResponse = resp.json().await?;
139 -
140 - // Extract key once for the entire batch (only needed if any entry has data)
141 - let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
142 - let key_holder = if has_data {
143 - self.require_master_key()?
144 - } else {
145 - crypto::ZeroizeOnDrop([0u8; 32])
146 - };
147 - let master_key: &[u8; 32] = &key_holder;
148 - let changes = pull_resp
149 - .changes
150 - .into_iter()
151 - .map(|c| Self::decrypt_change_with_key(c, master_key))
152 - .collect::<Result<Vec<_>>>()?;
153 -
154 - Ok((changes, pull_resp.cursor, pull_resp.has_more))
118 + self.pull_inner(body, Self::decrypt_change_with_key).await
155 119 }
156 120
157 121 /// Pull changes from the server with optional table and timestamp filters.
@@ -166,43 +130,13 @@ impl SyncKitClient {
166 130 cursor: i64,
167 131 filter: PullFilter,
168 132 ) -> Result<(Vec<ChangeEntry>, i64, bool)> {
169 - let token = self.require_token()?;
170 -
171 133 let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest {
172 134 device_id,
173 135 cursor,
174 136 tables: filter.tables,
175 137 since: filter.since,
176 138 })?);
177 -
178 - let resp = self
179 - .retry_request(|| {
180 - let req = self
181 - .http
182 - .post(&self.endpoints.pull)
183 - .bearer_auth(&token)
184 - .header("content-type", "application/json")
185 - .body(body.clone());
186 - async move { check_response(req.send().await?).await }
187 - })
188 - .await?;
189 -
190 - let pull_resp: PullResponse = resp.json().await?;
191 -
192 - let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
193 - let key_holder = if has_data {
194 - self.require_master_key()?
195 - } else {
196 - crypto::ZeroizeOnDrop([0u8; 32])
197 - };
198 - let master_key: &[u8; 32] = &key_holder;
199 - let changes = pull_resp
200 - .changes
201 - .into_iter()
202 - .map(|c| Self::decrypt_change_with_key(c, master_key))
203 - .collect::<Result<Vec<_>>>()?;
204 -
205 - Ok((changes, pull_resp.cursor, pull_resp.has_more))
139 + self.pull_inner(body, Self::decrypt_change_with_key).await
206 140 }
207 141
208 142 /// Pull changes from the server, preserving `device_id` and `seq` metadata.
@@ -216,38 +150,8 @@ impl SyncKitClient {
216 150 device_id: Uuid,
217 151 cursor: i64,
218 152 ) -> Result<(Vec<PulledChange>, i64, bool)> {
219 - let token = self.require_token()?;
220 -
221 153 let body = Bytes::from(serde_json::to_vec(&PullRequest { device_id, cursor })?);
222 -
223 - let resp = self
224 - .retry_request(|| {
225 - let req = self
226 - .http
227 - .post(&self.endpoints.pull)
228 - .bearer_auth(&token)
229 - .header("content-type", "application/json")
230 - .body(body.clone());
231 - async move { check_response(req.send().await?).await }
232 - })
233 - .await?;
234 -
235 - let pull_resp: PullResponse = resp.json().await?;
236 -
237 - let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
238 - let key_holder = if has_data {
239 - self.require_master_key()?
240 - } else {
241 - crypto::ZeroizeOnDrop([0u8; 32])
242 - };
243 - let master_key: &[u8; 32] = &key_holder;
244 - let changes = pull_resp
245 - .changes
246 - .into_iter()
247 - .map(|c| Self::decrypt_change_to_pulled(c, master_key))
248 - .collect::<Result<Vec<_>>>()?;
249 -
250 - Ok((changes, pull_resp.cursor, pull_resp.has_more))
154 + self.pull_inner(body, Self::decrypt_change_to_pulled).await
251 155 }
252 156
253 157 /// Pull changes with filters, preserving `device_id` and `seq` metadata.
@@ -261,17 +165,29 @@ impl SyncKitClient {
261 165 cursor: i64,
262 166 filter: PullFilter,
263 167 ) -> Result<(Vec<PulledChange>, i64, bool)> {
264 - let token = self.require_token()?;
265 -
266 168 let body = Bytes::from(serde_json::to_vec(&FilteredPullRequest {
267 169 device_id,
268 170 cursor,
269 171 tables: filter.tables,
270 172 since: filter.since,
271 173 })?);
174 + self.pull_inner(body, Self::decrypt_change_to_pulled).await
175 + }
272 176
273 - let resp = self
274 - .retry_request(|| {
177 + /// Shared pull implementation: sends the request, extracts the master key,
178 + /// and decrypts each change using the provided function.
179 + async fn pull_inner<T, F>(
180 + &self,
181 + body: Bytes,
182 + decrypt_fn: F,
183 + ) -> Result<(Vec<T>, i64, bool)>
184 + where
185 + F: Fn(PullChangeEntry, &[u8; 32]) -> Result<T>,
186 + {
187 + let token = self.require_token()?;
188 +
189 + let pull_resp: PullResponse = self
190 + .retry_request_json(|| {
275 191 let req = self
276 192 .http
277 193 .post(&self.endpoints.pull)
@@ -282,8 +198,7 @@ impl SyncKitClient {
282 198 })
283 199 .await?;
284 200
285 - let pull_resp: PullResponse = resp.json().await?;
286 -
201 + // Extract key once for the entire batch (only needed if any entry has data)
287 202 let has_data = pull_resp.changes.iter().any(|c| c.data.is_some());
288 203 let key_holder = if has_data {
289 204 self.require_master_key()?
@@ -294,7 +209,7 @@ impl SyncKitClient {
294 209 let changes = pull_resp
295 210 .changes
296 211 .into_iter()
297 - .map(|c| Self::decrypt_change_to_pulled(c, master_key))
212 + .map(|c| decrypt_fn(c, master_key))
298 213 .collect::<Result<Vec<_>>>()?;
299 214
300 215 Ok((changes, pull_resp.cursor, pull_resp.has_more))
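The shape of `pull_inner` (one shared body, parameterized by the per-entry decrypt function) can be illustrated without the HTTP and crypto layers. The sketch below is an assumption-laden stand-in: `WireEntry`, `Plain`, `Pulled`, `decode_batch`, and the XOR "cipher" are all hypothetical, and errors are plain `String`s in place of `SyncKitError`.

```rust
/// Hypothetical, simplified stand-in for `PullChangeEntry`.
struct WireEntry {
    row_id: String,
    seq: i64,
    data: Option<Vec<u8>>,
}

/// Stand-in for `ChangeEntry` (no server metadata).
#[derive(Debug, PartialEq)]
struct Plain {
    row_id: String,
    data: Option<Vec<u8>>,
}

/// Stand-in for `PulledChange` (keeps `seq`).
#[derive(Debug, PartialEq)]
struct Pulled {
    row_id: String,
    seq: i64,
    data: Option<Vec<u8>>,
}

/// Toy XOR transform so the sketch runs. This is NOT real cryptography.
fn toy_decrypt(data: Option<Vec<u8>>, key: &[u8; 32]) -> Option<Vec<u8>> {
    data.map(|bytes| bytes.into_iter().map(|b| b ^ key[0]).collect())
}

fn to_plain(e: WireEntry, key: &[u8; 32]) -> Result<Plain, String> {
    Ok(Plain { row_id: e.row_id, data: toy_decrypt(e.data, key) })
}

fn to_pulled(e: WireEntry, key: &[u8; 32]) -> Result<Pulled, String> {
    Ok(Pulled { row_id: e.row_id, seq: e.seq, data: toy_decrypt(e.data, key) })
}

/// Shared batch logic: fetch the key once, then map each entry through
/// the caller-supplied decrypt function.
fn decode_batch<T, F>(
    entries: Vec<WireEntry>,
    master_key: Option<[u8; 32]>,
    decrypt_fn: F,
) -> Result<Vec<T>, String>
where
    F: Fn(WireEntry, &[u8; 32]) -> Result<T, String>,
{
    // The key is only required when at least one entry carries data.
    let has_data = entries.iter().any(|e| e.data.is_some());
    let key = if has_data {
        master_key.ok_or_else(|| "master key required".to_string())?
    } else {
        [0u8; 32] // placeholder, never used to decrypt anything
    };
    entries.into_iter().map(|e| decrypt_fn(e, &key)).collect()
}
```

Passing the decrypt step as a generic `F` lets the variants differ only in their output type, which is what allows the four public pull methods in the diff to shrink to a request-body builder plus one call.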
@@ -305,13 +220,11 @@ impl SyncKitClient {
305 220 pub async fn status(&self) -> Result<SyncStatus> {
306 221 let token = self.require_token()?;
307 222
308 - let resp = self
309 - .retry_request(|| {
310 - let req = self.http.get(&self.endpoints.status).bearer_auth(&token);
311 - async move { check_response(req.send().await?).await }
312 - })
313 - .await?;
314 - Ok(resp.json().await?)
223 + self.retry_request_json(|| {
224 + let req = self.http.get(&self.endpoints.status).bearer_auth(&token);
225 + async move { check_response(req.send().await?).await }
226 + })
227 + .await
315 228 }
316 229 }
317 230
@@ -428,6 +341,7 @@ mod tests {
428 341 let device_id = Uuid::new_v4();
429 342 let req = WirePushRequest {
430 343 device_id,
344 + batch_id: Uuid::new_v4(),
431 345 changes: vec![WireChangeEntry {
432 346 table: "tasks".to_string(),
433 347 op: ChangeOp::Insert,
@@ -60,13 +60,23 @@ pub trait ConflictResolver: Send + Sync {
60 60 /// modify the same `(table, row_id)` from different devices. Changes from
61 61 /// our own device (echoes) are never treated as conflicts.
62 62 ///
63 + /// **Precondition:** `local_pending` should contain at most one entry per
64 + /// `(table, row_id)`. If duplicates exist, only the last one participates
65 + /// in conflict detection (earlier entries are silently ignored). Callers
66 + /// should compact local pending changes before calling this function.
67 + ///
68 + /// If `remote` contains multiple changes for the same `(table, row_id)`,
69 + /// each produces a separate `ConflictPair` with a clone of the same local
70 + /// entry. The caller must resolve them in order (by `seq`).
71 + ///
63 72 /// Returns `(clean, conflicts)` where `clean` changes can be applied directly.
64 73 pub fn detect_conflicts(
65 74 remote: Vec<PulledChange>,
66 75 local_pending: &[ChangeEntry],
67 76 our_device_id: Uuid,
68 77 ) -> (Vec<PulledChange>, Vec<ConflictPair>) {
69 - // Build lookup: (table, row_id) -> last local pending entry
78 + // Build lookup: (table, row_id) -> last local pending entry.
79 + // If local_pending has duplicates for the same key, the last entry wins.
70 80 let mut local_map: HashMap<(&str, &str), &ChangeEntry> = HashMap::new();
71 81 for entry in local_pending {
72 82 local_map.insert((&entry.table, &entry.row_id), entry);
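The precondition above asks callers to compact pending changes first. A minimal sketch of such a compaction, keeping only the last entry per `(table, row_id)` in original order, might look like the following. `PendingChange` and `compact_pending` are hypothetical, and a real compaction would probably need to fold operations (for example Insert followed by Update) instead of simply dropping the earlier entry.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `ChangeEntry`.
#[derive(Debug, Clone, PartialEq)]
struct PendingChange {
    table: String,
    row_id: String,
    op: &'static str,
}

/// Keep only the last pending entry for each `(table, row_id)`,
/// preserving the relative order of the survivors.
fn compact_pending(pending: &[PendingChange]) -> Vec<PendingChange> {
    // Record the index of the last occurrence of every key. Later
    // inserts overwrite earlier ones, like `local_map` in the diff.
    let mut last: HashMap<(&str, &str), usize> = HashMap::new();
    for (i, e) in pending.iter().enumerate() {
        last.insert((e.table.as_str(), e.row_id.as_str()), i);
    }
    pending
        .iter()
        .enumerate()
        .filter(|&(i, e)| last.get(&(e.table.as_str(), e.row_id.as_str())) == Some(&i))
        .map(|(_, e)| e.clone())
        .collect()
}
```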
@@ -118,13 +128,15 @@ pub fn resolve_lww(local: &ChangeEntry, remote: &PulledChange) -> Resolution {
118 128 }
119 129 }
120 130
121 - /// 3-way field-level merge for JSON objects.
131 + /// 3-way field-level merge for JSON objects (top-level keys only).
122 132 ///
123 133 /// Compares `local` and `remote` against `base` to determine which fields
124 134 /// each side changed, then merges non-overlapping changes. For overlapping
125 135 /// fields, the newer timestamp wins.
126 136 ///
127 - /// Returns `Resolution::KeepRemote` if any input is not a JSON object.
137 + /// Returns `Resolution::KeepRemote` if any input is not a JSON object
138 + /// (including `Value::Null` for a missing base). Callers should fall back
139 + /// to [`resolve_lww`] when the base snapshot is unavailable.
128 140 pub fn resolve_field_merge(
129 141 local: &serde_json::Value,
130 142 remote: &serde_json::Value,
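The merge rule described in the doc comment can be sketched over flat string maps. This is an illustration under assumptions, not the crate's implementation: `Object`, `obj`, and `merge_fields` are hypothetical, values are plain strings in place of `serde_json::Value`, timestamps are bare integers, and ties go to local, as the tests later in this diff state.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for a JSON object with top-level keys only.
type Object = BTreeMap<String, String>;

/// Convenience constructor for the sketch.
fn obj(pairs: &[(&str, &str)]) -> Object {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// 3-way merge: a side "changed" a key if its value (or absence)
/// differs from the base. Overlapping changes go to the newer side;
/// ties go to local.
fn merge_fields(
    local: &Object,
    remote: &Object,
    base: &Object,
    local_ts: u64,
    remote_ts: u64,
) -> Object {
    let keys: BTreeSet<&String> =
        local.keys().chain(remote.keys()).chain(base.keys()).collect();
    let mut merged = Object::new();
    for key in keys {
        let (l, r, b) = (local.get(key), remote.get(key), base.get(key));
        let winner = match (l != b, r != b) {
            (true, true) => if remote_ts > local_ts { r } else { l },
            (true, false) => l,
            (false, true) => r,
            (false, false) => b,
        };
        // `None` means the winning side deleted (or never had) the key.
        if let Some(value) = winner {
            merged.insert(key.clone(), value.clone());
        }
    }
    merged
}
```

Treating a missing key as a value of its own (`None`) is what lets deletions take part in the same changed-versus-base comparison as edits.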
@@ -627,4 +639,312 @@ mod tests {
627 639 Resolution::KeepRemote
628 640 ));
629 641 }
642 +
643 + // ── Fuzz: edge cases and attack vectors ──
644 +
645 + // Attack vector 1: duplicate local entries for same (table, row_id).
646 + // HashMap insert means last entry wins. Verify the conflict pair uses
647 + // the LATER local entry, not the earlier one.
648 + #[test]
649 + fn detect_conflicts_duplicate_local_uses_last_entry() {
650 + let our_device = Uuid::new_v4();
651 + let other_device = Uuid::new_v4();
652 + let t1 = Utc::now() - chrono::Duration::seconds(60);
653 + let t2 = Utc::now();
654 +
655 + let remote = vec![
656 + make_pulled("tasks", "r1", ChangeOp::Update, t2, other_device, 1),
657 + ];
658 + // Two local entries for same (table, row_id): Insert then Update.
659 + // The Update (last) should participate in conflict detection.
660 + let local = vec![
661 + make_entry("tasks", "r1", ChangeOp::Insert, t1),
662 + make_entry("tasks", "r1", ChangeOp::Update, t2),
663 + ];
664 +
665 + let (_clean, conflicts) = detect_conflicts(remote, &local, our_device);
666 + assert_eq!(conflicts.len(), 1);
667 + // The conflict should use the Update (last entry), not the Insert
668 + assert_eq!(conflicts[0].local.op, ChangeOp::Update);
669 + assert_eq!(conflicts[0].local.timestamp, t2);
670 + }
671 +
672 + // Attack vector 2: DELETE vs DELETE in LWW.
673 + // Both are Delete, so it hits `local.op == ChangeOp::Delete` and returns
674 + // KeepLocal — even if remote Delete has a newer timestamp. This is the
675 + // current behavior. Verify it explicitly.
676 + #[test]
677 + fn lww_both_delete_local_wins_even_when_remote_newer() {
678 + let other_device = Uuid::new_v4();
679 + let old = Utc::now() - chrono::Duration::seconds(60);
680 + let new = Utc::now();
681 +
682 + let local = make_entry("tasks", "r1", ChangeOp::Delete, old);
683 + let remote = make_pulled("tasks", "r1", ChangeOp::Delete, new, other_device, 1);
684 +
685 + // BUG CANDIDATE: remote Delete is newer but local wins because the
686 + // code checks `local.op == Delete` first, returning KeepLocal always.
687 + // For Delete-vs-Delete this is semantically fine (both delete the row),
688 + // but the timestamp is ignored entirely.
689 + assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
690 + }
691 +
692 + // Attack vector 3: field_merge overlapping fields with equal timestamps.
693 + // Ties go to local (remote_ts > local_ts is false). Verify.
694 + #[test]
695 + fn field_merge_overlapping_equal_timestamps_local_wins() {
696 + let base = json!({"title": "base"});
697 + let local = json!({"title": "local version"});
698 + let remote = json!({"title": "remote version"});
699 + let now = Utc::now();
700 +
701 + let result = resolve_field_merge(&local, &remote, &base, now, now);
702 + match result {
703 + Resolution::Merged(v) => {
704 + assert_eq!(v["title"], "local version",
705 + "Equal timestamps: local should win for overlapping fields");
706 + }
707 + _ => panic!("Expected Merged"),
708 + }
709 + }
710 +
711 + // Attack vector 4: HashMap iteration order determinism.
712 + // Non-overlapping keys from both sides should merge deterministically
713 + // regardless of HashMap iteration order.
714 + #[test]
715 + fn field_merge_deterministic_with_many_keys() {
716 + let base = json!({});
717 + let local = json!({"a": 1, "c": 3, "e": 5, "g": 7, "i": 9});
718 + let remote = json!({"b": 2, "d": 4, "f": 6, "h": 8, "j": 10});
719 + let now = Utc::now();
720 +
721 + // Run multiple times to catch iteration-order bugs
722 + for _ in 0..10 {
723 + let result = resolve_field_merge(&local, &remote, &base, now, now);
724 + match result {
725 + Resolution::Merged(v) => {
726 + assert_eq!(v["a"], 1);
727 + assert_eq!(v["b"], 2);
728 + assert_eq!(v["c"], 3);
729 + assert_eq!(v["d"], 4);
730 + assert_eq!(v["e"], 5);
731 + assert_eq!(v["f"], 6);
732 + assert_eq!(v["g"], 7);
733 + assert_eq!(v["h"], 8);
734 + assert_eq!(v["i"], 9);
735 + assert_eq!(v["j"], 10);
736 + }
737 + _ => panic!("Expected Merged"),
738 + }
739 + }
740 + }
741 +
742 + // Attack vector 5: null base with both local and remote as objects.
743 + // Falls back to KeepRemote, silently discarding local changes.
744 + #[test]
745 + fn field_merge_null_base_discards_local() {
746 + let base = json!(null);
747 + let local = json!({"title": "important local edit"});
748 + let remote = json!({"status": "remote only"});
749 + let now = Utc::now();
750 +
751 + // BUG: Both sides are valid objects but null base causes KeepRemote,
752 + // which silently drops "title": "important local edit".
753 + // A better fallback might be to merge both against an empty base,
754 + // or fall back to LWW.
755 + let result = resolve_field_merge(&local, &remote, &base, now, now);
756 + assert!(matches!(result, Resolution::KeepRemote),
757 + "Null base should fall back to KeepRemote (current behavior)");
758 + }
759 +
760 + // Attack vector 6: empty object vs changed value in field_merge.
761 + // Both `{}` and a new value differ from the base value, so both are
762 + // "changed". This is an overlapping-field conflict.
763 + #[test]
764 + fn field_merge_empty_object_counts_as_change() {
765 + let base = json!({"meta": {"nested": "data"}});
766 + let local = json!({"meta": {}}); // Changed to empty object
767 + let remote = json!({"meta": "flat string"}); // Changed to string
768 + let old = Utc::now() - chrono::Duration::seconds(60);
769 + let new = Utc::now();
770 +
771 + // Remote is newer, so remote wins the overlapping field
772 + let result = resolve_field_merge(&local, &remote, &base, old, new);
773 + match result {
774 + Resolution::Merged(v) => {
775 + assert_eq!(v["meta"], "flat string");
776 + }
777 + _ => panic!("Expected Merged"),
778 + }
779 +
780 + // Local is newer, so local wins — meta becomes empty object
781 + let result = resolve_field_merge(&local, &remote, &base, new, old);
782 + match result {
783 + Resolution::Merged(v) => {
784 + assert_eq!(v["meta"], json!({}));
785 + }
786 + _ => panic!("Expected Merged"),
787 + }
788 + }
789 +
790 + // Attack vector 7: multiple remote changes for same (table, row_id).
791 + // Each produces a separate ConflictPair with a CLONE of the same local entry.
792 + #[test]
793 + fn detect_conflicts_multiple_remote_same_row() {
794 + let our_device = Uuid::new_v4();
795 + let other_device = Uuid::new_v4();
796 + let now = Utc::now();
797 +
798 + let remote = vec![
799 + make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
800 + make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 2),
801 + make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 3),
802 + ];
803 + let local = vec![
804 + make_entry("tasks", "r1", ChangeOp::Update, now),
805 + ];
806 +
807 + let (_clean, conflicts) = detect_conflicts(remote, &local, our_device);
808 + // All 3 remote changes conflict with the same local entry
809 + assert_eq!(conflicts.len(), 3);
810 + // Each gets a clone of the same local entry
811 + assert_eq!(conflicts[0].local.row_id, "r1");
812 + assert_eq!(conflicts[1].local.row_id, "r1");
813 + assert_eq!(conflicts[2].local.row_id, "r1");
814 + // Verify seq ordering is preserved
815 + assert_eq!(conflicts[0].remote.seq, 1);
816 + assert_eq!(conflicts[1].remote.seq, 2);
817 + assert_eq!(conflicts[2].remote.seq, 3);
818 + }
819 +
820 + // Attack vector 8: numeric type coercion in serde_json PartialEq.
821 + // JSON `1` (u64) and `1.0` (f64) are both `Value::Number`, but with different
822 + // internal representations, so serde_json's PartialEq does NOT treat them as equal.
823 + #[test]
824 + fn field_merge_integer_vs_float_treated_as_different() {
825 + let base = json!({"count": 1}); // serde_json: Number(PosInt(1))
826 + let local = json!({"count": 1.0}); // serde_json: Number(Float(1.0))
827 + let remote = json!({"count": 1}); // unchanged from base
828 + let now = Utc::now();
829 +
830 + let result = resolve_field_merge(&local, &remote, &base, now, now);
831 + match result {
832 + Resolution::Merged(v) => {
833 + // BUG CANDIDATE: `1` != `1.0` in serde_json, so local sees
834 + // "count" as changed (1 -> 1.0) even though semantically
835 + // identical. Remote sees no change. Merge applies local's 1.0.
836 + assert_eq!(v["count"], 1.0,
837 + "serde_json treats 1 and 1.0 as different values");
838 + }
839 + _ => panic!("Expected Merged"),
840 + }
841 + }
842 +
843 + // Bonus: INSERT vs DELETE in LWW. Local is Insert (not Delete), so the
844 + // remote-Delete branch fires: if local is Insert and remote is Delete, remote wins.
845 + // This means a remote delete can kill a row that was just created locally.
846 + #[test]
847 + fn lww_remote_delete_beats_local_insert() {
848 + let other_device = Uuid::new_v4();
849 + let now = Utc::now();
850 +
851 + let local = make_entry("tasks", "r1", ChangeOp::Insert, now);
852 + let remote = make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 1);
853 +
854 + // Remote Delete wins over local Insert — the local insert is discarded.
855 + // This could be surprising: user creates a row, but a concurrent
856 + // delete from another device kills it, even if the insert is newer (this test uses equal timestamps).
857 + assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepRemote));
858 + }
859 +
860 + // Bonus: local delete wins over remote insert. Same logic in reverse.
861 + #[test]
862 + fn lww_local_delete_beats_remote_insert() {
863 + let other_device = Uuid::new_v4();
864 + let now = Utc::now();
865 +
866 + let local = make_entry("tasks", "r1", ChangeOp::Delete, now);
867 + let remote = make_pulled("tasks", "r1", ChangeOp::Insert, now, other_device, 1);
868 +
869 + assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
870 + }
871 +
872 + // Bonus: field_merge where both sides delete the same key.
873 + // Both detect the key as deleted. Local applies deletion first.
874 + // Remote sees it as overlapping but equal-ts, so local's deletion stands.
875 + #[test]
876 + fn field_merge_both_delete_same_key() {
877 + let base = json!({"title": "old", "notes": "old notes"});
878 + let local = json!({"title": "old"}); // deleted "notes"
879 + let remote = json!({"title": "old"}); // also deleted "notes"
880 + let now = Utc::now();
881 +
882 + let result = resolve_field_merge(&local, &remote, &base, now, now);
883 + match result {
884 + Resolution::Merged(v) => {
885 + assert_eq!(v["title"], "old");
886 + assert!(v.get("notes").is_none(),
887 + "Both sides deleted notes, should stay deleted");
888 + }
889 + _ => panic!("Expected Merged"),
890 + }
891 + }
892 +
893 + // Bonus: field_merge where local deletes a key and remote modifies it.
894 + // Equal timestamps: local wins (deletion).
895 + #[test]
896 + fn field_merge_local_deletes_remote_modifies_equal_ts() {
897 + let base = json!({"title": "old", "notes": "original"});
898 + let local = json!({"title": "old"}); // deleted "notes"
899 + let remote = json!({"title": "old", "notes": "updated notes"});
900 + let now = Utc::now();
901 +
902 + let result = resolve_field_merge(&local, &remote, &base, now, now);
903 + match result {
904 + Resolution::Merged(v) => {
905 + // Local wins at equal timestamps: notes stays deleted
906 + assert!(v.get("notes").is_none(),
907 + "Equal ts: local delete should win over remote modify");
908 + }
909 + _ => panic!("Expected Merged"),
910 + }
911 + }
912 +
913 + // Bonus: field_merge where both sides add the SAME new key with
914 + // different values. Both are in local_changed and remote_changed.
915 + #[test]
916 + fn field_merge_both_add_same_new_key_different_values() {
917 + let base = json!({"existing": 1});
918 + let local = json!({"existing": 1, "new_key": "local value"});
919 + let remote = json!({"existing": 1, "new_key": "remote value"});
920 + let old = Utc::now() - chrono::Duration::seconds(60);
921 + let new = Utc::now();
922 +
923 + // Remote newer: remote wins the overlapping new key
924 + let result = resolve_field_merge(&local, &remote, &base, old, new);
925 + match result {
926 + Resolution::Merged(v) => {
927 + assert_eq!(v["new_key"], "remote value");
928 + }
929 + _ => panic!("Expected Merged"),
930 + }
931 +
932 + // Local newer: local wins the overlapping new key
933 + let result = resolve_field_merge(&local, &remote, &base, new, old);
934 + match result {
935 + Resolution::Merged(v) => {
936 + assert_eq!(v["new_key"], "local value");
937 + }
938 + _ => panic!("Expected Merged"),
939 + }
940 +
941 + // Equal: local wins
942 + let result = resolve_field_merge(&local, &remote, &base, new, new);
943 + match result {
944 + Resolution::Merged(v) => {
945 + assert_eq!(v["new_key"], "local value");
946 + }
947 + _ => panic!("Expected Merged"),
948 + }
949 + }
630 950 }
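The field-merge tests above pin down three rules: edits to different keys are both kept, an overlapping key goes to the strictly newer side, and equal timestamps favour local. The following is a minimal sketch of those rules only, not the crate's `resolve_field_merge`. It assumes a flat string map (`Row`) as a stand-in for `serde_json::Value`, plain `u64` timestamps, and hypothetical helper names (`row`, `changed_keys`, `apply`, `field_merge`).

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Stand-in for a flat JSON object (assumption; the crate uses serde_json::Value).
type Row = BTreeMap<String, String>;

/// Hypothetical helper: builds a Row from string pairs.
fn row(pairs: &[(&str, &str)]) -> Row {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// Keys whose value differs from `base`: added, modified, or deleted.
fn changed_keys(base: &Row, side: &Row) -> BTreeSet<String> {
    base.keys()
        .chain(side.keys())
        .filter(|k| base.get(*k) != side.get(*k))
        .cloned()
        .collect()
}

/// Copies one side's state for `key` into `merged` (a missing key is a deletion).
fn apply(merged: &mut Row, side: &Row, key: &str) {
    match side.get(key) {
        Some(value) => {
            merged.insert(key.to_string(), value.clone());
        }
        None => {
            merged.remove(key);
        }
    }
}

/// Three-way merge: local edits are applied first; a remote edit is applied
/// when the key does not overlap, or when remote is strictly newer.
fn field_merge(local: &Row, remote: &Row, base: &Row, local_ts: u64, remote_ts: u64) -> Row {
    let local_changed = changed_keys(base, local);
    let remote_changed = changed_keys(base, remote);
    let mut merged = base.clone();
    for key in &local_changed {
        apply(&mut merged, local, key);
    }
    for key in &remote_changed {
        let overlaps = local_changed.contains(key);
        if !overlaps || remote_ts > local_ts {
            apply(&mut merged, remote, key);
        }
    }
    merged
}
```

Because the tie-break is the strict `remote_ts > local_ts` comparison, the "equal timestamps: local wins" cases fall out without a separate branch.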
@@ -24,6 +24,7 @@ use chacha20poly1305::{
24 24 use rand::RngCore;
25 25 use serde::{Deserialize, Serialize};
26 26 use unicode_normalization::UnicodeNormalization;
27 + use zeroize::Zeroize;
27 28
28 29 use crate::error::{Result, SyncKitError};
29 30
@@ -105,10 +106,15 @@ fn derive_wrapping_key(
105 106 let argon2 = Argon2::new(Algorithm::Argon2id, Version::V0x13, params);
106 107
107 108 let mut wrapping_key = ZeroizeOnDrop([0u8; KEY_SIZE]);
108 - argon2
109 + let result = argon2
109 110 .hash_password_into(normalized.as_bytes(), salt, &mut wrapping_key.0)
110 - .map_err(|e| SyncKitError::Crypto(format!("Argon2 hash: {e}")))?;
111 + .map_err(|e| SyncKitError::Crypto(format!("Argon2 hash: {e}")));
111 112
113 + // Zeroize the normalized password before returning (defense-in-depth).
114 + let mut norm_bytes = normalized.into_bytes();
115 + norm_bytes.zeroize();
116 +
117 + result?;
112 118 Ok(wrapping_key)
113 119 }
114 120
@@ -203,11 +209,12 @@ pub fn unwrap_master_key(
203 209 let cipher = XChaCha20Poly1305::new((&*wrapping_key).into());
204 210 let nonce = XNonce::from_slice(&nonce_bytes);
205 211
206 - let plaintext = cipher
212 + let mut plaintext = cipher
207 213 .decrypt(nonce, ciphertext.as_ref())
208 214 .map_err(|_| SyncKitError::DecryptionFailed)?;
209 215
210 216 if plaintext.len() != KEY_SIZE {
217 + plaintext.zeroize();
211 218 return Err(SyncKitError::InvalidEnvelope(
212 219 "decrypted key has wrong length".into(),
213 220 ));
@@ -215,6 +222,7 @@ pub fn unwrap_master_key(
215 222
216 223 let mut key = [0u8; KEY_SIZE];
217 224 key.copy_from_slice(&plaintext);
225 + plaintext.zeroize();
218 226 Ok(key)
219 227 }
220 228
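The crypto hunks above share one pattern: hold on to the fallible result, wipe the sensitive buffer, and only then propagate the error. Below is a std-only sketch of that pattern under stated assumptions. `wipe` is a hand-rolled stand-in for `zeroize::Zeroize::zeroize` (the crate the commit actually uses), and `extract_key` is a hypothetical function that takes the buffer by mutable reference so the wipe is observable.

```rust
use std::sync::atomic::{compiler_fence, Ordering};

const KEY_SIZE: usize = 32;

/// Best-effort wipe using volatile writes so the stores are not optimized away.
/// Stand-in for the `zeroize` crate; prefer the crate in real code.
fn wipe(buf: &mut [u8]) {
    for byte in buf.iter_mut() {
        // SAFETY: `byte` is a valid, aligned, exclusive reference to a u8.
        unsafe { std::ptr::write_volatile(byte, 0) };
    }
    compiler_fence(Ordering::SeqCst);
}

/// Copies a decrypted key out of `plaintext` and wipes the buffer on every path.
fn extract_key(plaintext: &mut Vec<u8>) -> Result<[u8; KEY_SIZE], String> {
    // Compute the outcome first, without returning early.
    let result = if plaintext.len() == KEY_SIZE {
        let mut key = [0u8; KEY_SIZE];
        key.copy_from_slice(&plaintext[..]);
        Ok(key)
    } else {
        Err("decrypted key has wrong length".to_string())
    };
    // Wipe unconditionally, then hand the outcome back.
    wipe(plaintext.as_mut_slice());
    result
}
```

The diff wipes explicitly on each return path; the single-exit shape here is an alternative that makes it harder to add a new early return that forgets the wipe.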
@@ -12,7 +12,14 @@ pub enum SyncKitError {
12 12 /// Server returned a non-success HTTP status (4xx or 5xx). Status and message
13 13 /// extracted from response.
14 14 #[error("Server returned {status}: {message}")]
15 - Server { status: u16, message: String },
15 + Server {
16 + status: u16,
17 + message: String,
18 + /// Parsed `Retry-After` header value in seconds (429 responses only).
19 + /// Hidden from rustdoc via `#[doc(hidden)]`; used internally by the retry loop.
20 + #[doc(hidden)]
21 + retry_after_secs: Option<u64>,
22 + },
16 23
17 24 /// Response body could not be parsed as expected JSON type.
18 25 #[error("JSON serialization error: {0}")]
@@ -85,7 +92,7 @@ mod tests {
85 92 fn display_all_variants() {
86 93 let cases: Vec<(SyncKitError, &str)> = vec![
87 94 (
88 - SyncKitError::Server { status: 500, message: "boom".into() },
95 + SyncKitError::Server { status: 500, message: "boom".into(), retry_after_secs: None },
89 96 "Server returned 500: boom",
90 97 ),
91 98 (SyncKitError::NoMasterKey, "Encryption not initialized"),
@@ -108,7 +115,7 @@ mod tests {
108 115 #[test]
109 116 fn debug_format_no_panic() {
110 117 let variants: Vec<SyncKitError> = vec![
111 - SyncKitError::Server { status: 500, message: "err".into() },
118 + SyncKitError::Server { status: 500, message: "err".into(), retry_after_secs: None },
112 119 SyncKitError::NoMasterKey,
113 120 SyncKitError::DecryptionFailed,
114 121 SyncKitError::InvalidEnvelope("v".into()),
@@ -149,13 +156,13 @@ mod tests {
149 156 assert!(SyncKitError::InvalidEnvelope("x".into()).source().is_none());
150 157 assert!(SyncKitError::Crypto("x".into()).source().is_none());
151 158 assert!(SyncKitError::Internal("x".into()).source().is_none());
152 - let server = SyncKitError::Server { status: 500, message: "x".into() };
159 + let server = SyncKitError::Server { status: 500, message: "x".into(), retry_after_secs: None };
153 160 assert!(server.source().is_none());
154 161 }
155 162
156 163 #[test]
157 164 fn server_error_empty_message() {
158 - let err = SyncKitError::Server { status: 503, message: String::new() };
165 + let err = SyncKitError::Server { status: 503, message: String::new(), retry_after_secs: None };
159 166 let msg = err.to_string();
160 167 assert!(msg.contains("503"));
161 168 assert!(msg.contains(": "), "Should have colon separator even with empty message");
@@ -164,7 +171,7 @@ mod tests {
164 171 #[test]
165 172 fn server_error_very_long_message() {
166 173 let long = "x".repeat(1_000_000);
167 - let err = SyncKitError::Server { status: 500, message: long };
174 + let err = SyncKitError::Server { status: 500, message: long, retry_after_secs: None };
168 175 let msg = err.to_string();
169 176 assert!(msg.contains("500"));
170 177 assert!(msg.len() > 1_000_000);
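The new `retry_after_secs` field carries a parsed `Retry-After` value to the retry loop, which this diff does not show. As a rough illustration of how such a value might be produced and consumed, here is a std-only sketch. Both function names, the 60-second cap, and the exponential fallback are assumptions, and only the delta-seconds form of the header is handled (the HTTP-date form yields `None`).

```rust
use std::time::Duration;

/// Parses the delta-seconds form of `Retry-After`, e.g. "120".
/// The HTTP-date form is not handled in this sketch and yields `None`.
fn parse_retry_after_secs(header: &str) -> Option<u64> {
    header.trim().parse::<u64>().ok()
}

/// Picks the wait before the next attempt: the server hint when present,
/// otherwise exponential backoff. Both are capped (hypothetical 60 s cap).
fn retry_delay(retry_after_secs: Option<u64>, attempt: u32) -> Duration {
    const MAX_SECS: u64 = 60;
    let secs = match retry_after_secs {
        Some(hint) => hint,
        None => 1u64 << attempt.min(6), // 1, 2, 4, ... up to 64
    };
    Duration::from_secs(secs.min(MAX_SECS))
}
```

Capping the server-supplied hint keeps a misbehaving or hostile server from stalling the client indefinitely.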
@@ -36,9 +36,12 @@ fn user_key(user_id: Uuid) -> String {
36 36 /// Store the master key in the OS keychain.
37 37 #[cfg(feature = "keychain")]
38 38 pub fn store_key(app_id: Uuid, user_id: Uuid, master_key: &[u8; 32]) -> Result<()> {
39 + use zeroize::Zeroize;
39 40 let entry = keyring::Entry::new(&service_name(app_id), &user_key(user_id))?;
40 - let encoded = B64.encode(master_key);
41 - entry.set_password(&encoded)?;
41 + let mut encoded = B64.encode(master_key);
42 + let result = entry.set_password(&encoded);
43 + encoded.zeroize();
44 + result?;
42 45
43 46 tracing::debug!("Master key stored in OS keychain");
44 47 Ok(())
@@ -52,14 +55,17 @@ pub fn load_key(app_id: Uuid, user_id: Uuid) -> Result<Option<[u8; 32]>> {
52 55
53 56 match entry.get_password() {
54 57 Ok(encoded) => {
55 - let bytes = B64.decode(&encoded)?;
58 + use zeroize::Zeroize;
59 + let mut bytes = B64.decode(&encoded)?;
56 60 if bytes.len() != 32 {
61 + bytes.zeroize();
57 62 return Err(SyncKitError::Keychain(
58 63 "stored key has wrong length".into(),
59 64 ));
60 65 }
61 66 let mut key = [0u8; 32];
62 67 key.copy_from_slice(&bytes);
68 + bytes.zeroize();
63 69 tracing::debug!("Master key loaded from OS keychain");
64 70 Ok(Some(key))
65 71 }
@@ -31,8 +31,9 @@ impl fmt::Display for ChangeOp {
31 31 }
32 32
33 33 impl ChangeOp {
34 - /// Parse from a string, returning `None` for unrecognized values.
34 + /// Parse from an uppercase string, returning `None` for unrecognized values.
35 35 ///
36 + /// Case-sensitive: only `"INSERT"`, `"UPDATE"`, `"DELETE"` are accepted.
36 37 /// Useful when reading raw op strings from a local database.
37 38 pub fn from_str_opt(s: &str) -> Option<Self> {
38 39 match s {
@@ -111,6 +112,9 @@ pub struct ChangeEntry {
111 112 pub(crate) struct WirePushRequest {
112 113 /// The device sending the changes.
113 114 pub device_id: Uuid,
115 + /// Client-generated UUID for idempotent push. If a push with the same
116 + /// batch_id was already committed, the server returns the existing cursor.
117 + pub batch_id: Uuid,
114 118 /// Encrypted change entries ready for the wire.
115 119 pub changes: Vec<WireChangeEntry>,
116 120 }
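The `batch_id` doc comment describes an idempotency contract: replaying a push with an already-committed id returns the existing cursor instead of applying the changes again. The toy in-memory model below illustrates that contract only. It is not the server's implementation; `PushLog`, the `u128` stand-in for a UUID, and the "cursor advances by change count" rule are all assumptions.

```rust
use std::collections::HashMap;

/// Stand-in for a UUID batch id (assumption, to stay std-only).
type BatchId = u128;

/// Toy model of a server-side push log.
#[derive(Default)]
struct PushLog {
    cursor: i64,
    committed: HashMap<BatchId, i64>,
}

impl PushLog {
    /// Applies a batch once; a replay with the same id returns the stored cursor.
    fn push(&mut self, batch_id: BatchId, change_count: usize) -> i64 {
        if let Some(&existing) = self.committed.get(&batch_id) {
            return existing; // already committed: no double-apply
        }
        self.cursor += change_count as i64;
        self.committed.insert(batch_id, self.cursor);
        self.cursor
    }
}
```

Under this contract a client can safely resend a push after a timeout, as long as it reuses the same `batch_id` for the retry.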
@@ -169,18 +173,28 @@ pub(crate) struct PullChangeEntry {
169 173 #[derive(Debug, Clone, Default, Serialize)]
170 174 pub struct PullFilter {
171 175 /// Only return entries for these table names.
172 - #[serde(skip_serializing_if = "Option::is_none")]
176 + /// `None` and `Some(vec![])` are treated identically (no table filter).
177 + #[serde(skip_serializing_if = "PullFilter::tables_is_empty")]
173 178 pub tables: Option<Vec<String>>,
174 179 /// Only return entries with `client_timestamp >= since`.
175 180 #[serde(skip_serializing_if = "Option::is_none")]
176 181 pub since: Option<DateTime<Utc>>,
177 182 }
178 183
184 + impl PullFilter {
185 + fn tables_is_empty(tables: &Option<Vec<String>>) -> bool {
186 + match tables {
187 + None => true,
188 + Some(v) => v.is_empty(),
189 + }
190 + }
191 + }
192 +
179 193 #[derive(Serialize)]
180 194 pub(crate) struct FilteredPullRequest {
181 195 pub device_id: Uuid,
182 196 pub cursor: i64,
183 - #[serde(skip_serializing_if = "Option::is_none")]
197 + #[serde(skip_serializing_if = "PullFilter::tables_is_empty")]
184 198 pub tables: Option<Vec<String>>,
185 199 #[serde(skip_serializing_if = "Option::is_none")]
186 200 pub since: Option<DateTime<Utc>>,
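serde calls a `skip_serializing_if` predicate with a reference to the field, so `tables_is_empty` receives `&Option<Vec<String>>`. The `match` in the hunk above can also be written as a one-liner. This is a behaviour-equivalent sketch shown as a free function rather than an associated one, not a change the commit makes.

```rust
/// Same predicate as the `match` form: true for `None` and for `Some(vec![])`.
fn tables_is_empty(tables: &Option<Vec<String>>) -> bool {
    tables.as_ref().map_or(true, Vec::is_empty)
}
```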
@@ -209,11 +223,18 @@ pub struct PulledChange {
209 223 #[derive(Serialize)]
210 224 pub(crate) struct PutKeyRequest {
211 225 pub encrypted_key: String,
226 + /// Expected key version for optimistic concurrency control.
227 + pub expected_version: i32,
212 228 }
213 229
214 230 #[derive(Deserialize)]
215 231 pub(crate) struct GetKeyResponse {
216 232 pub encrypted_key: String,
233 + /// Server-side key version, incremented on each password change.
234 + /// Used for optimistic concurrency detection.
235 + #[serde(default)]
236 + #[allow(dead_code)]
237 + pub key_version: Option<i32>,
217 238 }
218 239
219 240 // ── OAuth ──
@@ -452,6 +473,17 @@ mod tests {
452 473 }
453 474
454 475 #[test]
476 + fn pull_filter_empty_tables_vec_omitted() {
477 + let filter = PullFilter {
478 + tables: Some(vec![]),
479 + since: None,
480 + };
481 + let json = serde_json::to_string(&filter).unwrap();
482 + assert!(!json.contains("tables"), "empty tables vec should be omitted: {json}");
483 + assert_eq!(json, "{}");
484 + }
485 +
486 + #[test]
455 487 fn filtered_pull_request_includes_filter_fields() {
456 488 let req = FilteredPullRequest {
457 489 device_id: Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(),
@@ -1094,7 +1094,7 @@ async fn server_error_message_preserved() {
1094 1094 let client = authed_client(&server);
1095 1095 let err = client.status().await.unwrap_err();
1096 1096 match err {
1097 - SyncKitError::Server { status, message } => {
1097 + SyncKitError::Server { status, message, .. } => {
1098 1098 assert_eq!(status, 422);
1099 1099 assert!(
1100 1100 message.contains("Validation failed"),
@@ -1823,7 +1823,7 @@ async fn all_4xx_error_codes_mapped() {
1823 1823 let err = client.status().await.unwrap_err();
1824 1824
1825 1825 match err {
1826 - SyncKitError::Server { status, message } => {
1826 + SyncKitError::Server { status, message, .. } => {
1827 1827 assert_eq!(status, status_code);
1828 1828 assert!(message.contains(&format!("Error {status_code}")));
1829 1829 }
@@ -2290,7 +2290,7 @@ async fn server_returns_413_request_entity_too_large() {
2290 2290
2291 2291 let err = client.push(Uuid::new_v4(), vec![]).await.unwrap_err();
2292 2292 match err {
2293 - SyncKitError::Server { status, message } => {
2293 + SyncKitError::Server { status, message, .. } => {
2294 2294 assert_eq!(status, 413);
2295 2295 assert!(message.contains("too large"));
2296 2296 }