Skip to main content

max / synckit-client

20.1 KB · 631 lines History Blame Raw
1 //! Client-side conflict detection and resolution for SyncKit.
2 //!
3 //! SyncKit uses E2E encryption — the server never sees row contents and cannot
4 //! merge or compare data. All conflict resolution must happen client-side after
5 //! decryption.
6 //!
7 //! This module provides:
8 //! - [`detect_conflicts`]: pure function that splits pulled changes into clean
9 //! (non-conflicting) and conflicting sets
10 //! - [`resolve_lww`]: last-write-wins resolution by `client_timestamp`
11 //! - [`resolve_field_merge`]: 3-way JSON object merge using a base version
12 //! - [`ConflictResolver`]: trait for custom resolution strategies
13
14 use std::collections::HashMap;
15
16 use chrono::{DateTime, Utc};
17 use uuid::Uuid;
18
19 use crate::types::{ChangeEntry, ChangeOp, PulledChange};
20
21 /// A remote change that conflicts with a local pending change.
22 #[derive(Debug, Clone)]
23 pub struct ConflictPair {
24 /// The remote change from pull.
25 pub remote: PulledChange,
26 /// The local pending change that conflicts.
27 pub local: ChangeEntry,
28 }
29
30 /// How a conflict should be resolved.
31 #[derive(Debug, Clone)]
32 pub enum Resolution {
33 /// Keep the local version; skip the remote change.
34 /// The local version will push on the next sync cycle.
35 KeepLocal,
36 /// Apply the remote change, discarding the local version.
37 KeepRemote,
38 /// Apply a merged result that combines both changes.
39 Merged(serde_json::Value),
40 /// Skip both changes (neither apply nor push).
41 Skip,
42 }
43
44 /// Trait for custom conflict resolution strategies.
45 ///
46 /// Implement this to plug in app-specific merge logic. The `base` parameter
47 /// is `Some` only if the app provides a base-store adapter.
48 pub trait ConflictResolver: Send + Sync {
49 fn resolve(
50 &self,
51 local: &ChangeEntry,
52 remote: &PulledChange,
53 base: Option<&serde_json::Value>,
54 ) -> Resolution;
55 }
56
57 /// Split pulled changes into non-conflicting and conflicting sets.
58 ///
59 /// A conflict exists when a remote change and a local pending change both
60 /// modify the same `(table, row_id)` from different devices. Changes from
61 /// our own device (echoes) are never treated as conflicts.
62 ///
63 /// Returns `(clean, conflicts)` where `clean` changes can be applied directly.
64 pub fn detect_conflicts(
65 remote: Vec<PulledChange>,
66 local_pending: &[ChangeEntry],
67 our_device_id: Uuid,
68 ) -> (Vec<PulledChange>, Vec<ConflictPair>) {
69 // Build lookup: (table, row_id) -> last local pending entry
70 let mut local_map: HashMap<(&str, &str), &ChangeEntry> = HashMap::new();
71 for entry in local_pending {
72 local_map.insert((&entry.table, &entry.row_id), entry);
73 }
74
75 let mut clean = Vec::new();
76 let mut conflicts = Vec::new();
77
78 for pulled in remote {
79 // Our own echo — never a conflict
80 if pulled.device_id == our_device_id {
81 clean.push(pulled);
82 continue;
83 }
84
85 let key = (pulled.entry.table.as_str(), pulled.entry.row_id.as_str());
86 if let Some(&local_entry) = local_map.get(&key) {
87 conflicts.push(ConflictPair {
88 remote: pulled,
89 local: local_entry.clone(),
90 });
91 } else {
92 clean.push(pulled);
93 }
94 }
95
96 (clean, conflicts)
97 }
98
99 /// Last-write-wins resolution by `client_timestamp`.
100 ///
101 /// - DELETE vs non-DELETE: delete wins regardless of timestamp.
102 /// - Both INSERT/UPDATE: newer timestamp wins. Ties go to local.
103 pub fn resolve_lww(local: &ChangeEntry, remote: &PulledChange) -> Resolution {
104 // DELETE wins over non-DELETE
105 if local.op == ChangeOp::Delete || remote.entry.op == ChangeOp::Delete {
106 return if local.op == ChangeOp::Delete {
107 Resolution::KeepLocal
108 } else {
109 Resolution::KeepRemote
110 };
111 }
112
113 // Both are INSERT/UPDATE: newer timestamp wins, ties go to local
114 if local.timestamp >= remote.entry.timestamp {
115 Resolution::KeepLocal
116 } else {
117 Resolution::KeepRemote
118 }
119 }
120
121 /// 3-way field-level merge for JSON objects.
122 ///
123 /// Compares `local` and `remote` against `base` to determine which fields
124 /// each side changed, then merges non-overlapping changes. For overlapping
125 /// fields, the newer timestamp wins.
126 ///
127 /// Returns `Resolution::KeepRemote` if any input is not a JSON object.
128 pub fn resolve_field_merge(
129 local: &serde_json::Value,
130 remote: &serde_json::Value,
131 base: &serde_json::Value,
132 local_ts: DateTime<Utc>,
133 remote_ts: DateTime<Utc>,
134 ) -> Resolution {
135 let (Some(local_obj), Some(remote_obj), Some(base_obj)) = (
136 local.as_object(),
137 remote.as_object(),
138 base.as_object(),
139 ) else {
140 return Resolution::KeepRemote;
141 };
142
143 // Compute diffs: keys where local/remote differ from base
144 let mut local_changed: HashMap<&str, Option<&serde_json::Value>> = HashMap::new();
145 let mut remote_changed: HashMap<&str, Option<&serde_json::Value>> = HashMap::new();
146
147 // Check keys in base for changes or deletions
148 for key in base_obj.keys() {
149 let base_val = &base_obj[key];
150
151 match local_obj.get(key) {
152 Some(local_val) if local_val != base_val => {
153 local_changed.insert(key, Some(local_val));
154 }
155 None => {
156 // Key deleted on local side
157 local_changed.insert(key, None);
158 }
159 _ => {}
160 }
161
162 match remote_obj.get(key) {
163 Some(remote_val) if remote_val != base_val => {
164 remote_changed.insert(key, Some(remote_val));
165 }
166 None => {
167 // Key deleted on remote side
168 remote_changed.insert(key, None);
169 }
170 _ => {}
171 }
172 }
173
174 // Check for new keys added by local (not in base)
175 for (key, val) in local_obj {
176 if !base_obj.contains_key(key) {
177 local_changed.insert(key, Some(val));
178 }
179 }
180
181 // Check for new keys added by remote (not in base)
182 for (key, val) in remote_obj {
183 if !base_obj.contains_key(key) {
184 remote_changed.insert(key, Some(val));
185 }
186 }
187
188 // Build merged result starting from base
189 let mut result = base_obj.clone();
190
191 // Apply local changes
192 for (key, val) in &local_changed {
193 match val {
194 Some(v) => { result.insert((*key).to_string(), (*v).clone()); }
195 None => { result.remove(*key); }
196 }
197 }
198
199 // Apply remote changes (non-overlapping only, or newer timestamp wins)
200 for (key, val) in &remote_changed {
201 if local_changed.contains_key(key) {
202 // Overlapping: newer timestamp wins, ties go to local
203 if remote_ts > local_ts {
204 match val {
205 Some(v) => { result.insert((*key).to_string(), (*v).clone()); }
206 None => { result.remove(*key); }
207 }
208 }
209 // else: local already applied above
210 } else {
211 // Non-overlapping: apply remote
212 match val {
213 Some(v) => { result.insert((*key).to_string(), (*v).clone()); }
214 None => { result.remove(*key); }
215 }
216 }
217 }
218
219 Resolution::Merged(serde_json::Value::Object(result))
220 }
221
222 #[cfg(test)]
223 mod tests {
224 use super::*;
225 use serde_json::json;
226
227 fn make_entry(table: &str, row_id: &str, op: ChangeOp, ts: DateTime<Utc>) -> ChangeEntry {
228 ChangeEntry {
229 table: table.to_string(),
230 op,
231 row_id: row_id.to_string(),
232 timestamp: ts,
233 data: Some(json!({"value": "test"})),
234 }
235 }
236
237 fn make_pulled(
238 table: &str,
239 row_id: &str,
240 op: ChangeOp,
241 ts: DateTime<Utc>,
242 device_id: Uuid,
243 seq: i64,
244 ) -> PulledChange {
245 PulledChange {
246 entry: make_entry(table, row_id, op, ts),
247 device_id,
248 seq,
249 }
250 }
251
252 // ── detect_conflicts ──
253
254 #[test]
255 fn no_conflicts_when_different_rows() {
256 let our_device = Uuid::new_v4();
257 let other_device = Uuid::new_v4();
258 let now = Utc::now();
259
260 let remote = vec![
261 make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
262 ];
263 let local = vec![
264 make_entry("tasks", "r2", ChangeOp::Update, now),
265 ];
266
267 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
268 assert_eq!(clean.len(), 1);
269 assert!(conflicts.is_empty());
270 }
271
272 #[test]
273 fn conflict_detected_same_row_different_device() {
274 let our_device = Uuid::new_v4();
275 let other_device = Uuid::new_v4();
276 let now = Utc::now();
277
278 let remote = vec![
279 make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
280 ];
281 let local = vec![
282 make_entry("tasks", "r1", ChangeOp::Update, now),
283 ];
284
285 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
286 assert!(clean.is_empty());
287 assert_eq!(conflicts.len(), 1);
288 assert_eq!(conflicts[0].remote.entry.row_id, "r1");
289 assert_eq!(conflicts[0].local.row_id, "r1");
290 }
291
292 #[test]
293 fn own_echo_not_treated_as_conflict() {
294 let our_device = Uuid::new_v4();
295 let now = Utc::now();
296
297 let remote = vec![
298 make_pulled("tasks", "r1", ChangeOp::Update, now, our_device, 1),
299 ];
300 let local = vec![
301 make_entry("tasks", "r1", ChangeOp::Update, now),
302 ];
303
304 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
305 assert_eq!(clean.len(), 1);
306 assert!(conflicts.is_empty());
307 }
308
309 #[test]
310 fn different_tables_same_row_id_no_conflict() {
311 let our_device = Uuid::new_v4();
312 let other_device = Uuid::new_v4();
313 let now = Utc::now();
314
315 let remote = vec![
316 make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
317 ];
318 let local = vec![
319 make_entry("events", "r1", ChangeOp::Update, now),
320 ];
321
322 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
323 assert_eq!(clean.len(), 1);
324 assert!(conflicts.is_empty());
325 }
326
327 #[test]
328 fn detect_conflicts_correct_split() {
329 let our_device = Uuid::new_v4();
330 let other_device = Uuid::new_v4();
331 let now = Utc::now();
332
333 let remote = vec![
334 make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
335 make_pulled("tasks", "r2", ChangeOp::Insert, now, other_device, 2),
336 make_pulled("events", "r3", ChangeOp::Delete, now, other_device, 3),
337 ];
338 let local = vec![
339 make_entry("tasks", "r1", ChangeOp::Update, now),
340 // r2 not in local → clean
341 // r3 not in local → clean
342 ];
343
344 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
345 assert_eq!(clean.len(), 2);
346 assert_eq!(conflicts.len(), 1);
347 assert_eq!(conflicts[0].remote.entry.row_id, "r1");
348 }
349
350 #[test]
351 fn empty_remote_produces_no_conflicts() {
352 let our_device = Uuid::new_v4();
353 let now = Utc::now();
354
355 let remote = vec![];
356 let local = vec![
357 make_entry("tasks", "r1", ChangeOp::Update, now),
358 ];
359
360 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
361 assert!(clean.is_empty());
362 assert!(conflicts.is_empty());
363 }
364
365 #[test]
366 fn empty_local_produces_no_conflicts() {
367 let our_device = Uuid::new_v4();
368 let other_device = Uuid::new_v4();
369 let now = Utc::now();
370
371 let remote = vec![
372 make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1),
373 ];
374 let local: Vec<ChangeEntry> = vec![];
375
376 let (clean, conflicts) = detect_conflicts(remote, &local, our_device);
377 assert_eq!(clean.len(), 1);
378 assert!(conflicts.is_empty());
379 }
380
381 // ── resolve_lww ──
382
383 #[test]
384 fn lww_picks_newer_timestamp() {
385 let other_device = Uuid::new_v4();
386 let old = Utc::now() - chrono::Duration::seconds(60);
387 let new = Utc::now();
388
389 let local = make_entry("tasks", "r1", ChangeOp::Update, old);
390 let remote = make_pulled("tasks", "r1", ChangeOp::Update, new, other_device, 1);
391
392 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepRemote));
393 }
394
395 #[test]
396 fn lww_local_wins_when_newer() {
397 let other_device = Uuid::new_v4();
398 let old = Utc::now() - chrono::Duration::seconds(60);
399 let new = Utc::now();
400
401 let local = make_entry("tasks", "r1", ChangeOp::Update, new);
402 let remote = make_pulled("tasks", "r1", ChangeOp::Update, old, other_device, 1);
403
404 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
405 }
406
407 #[test]
408 fn lww_equal_timestamps_local_wins() {
409 let other_device = Uuid::new_v4();
410 let now = Utc::now();
411
412 let local = make_entry("tasks", "r1", ChangeOp::Update, now);
413 let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1);
414
415 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
416 }
417
418 #[test]
419 fn lww_local_delete_wins_over_update() {
420 let other_device = Uuid::new_v4();
421 let now = Utc::now();
422
423 let local = make_entry("tasks", "r1", ChangeOp::Delete, now);
424 let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1);
425
426 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
427 }
428
429 #[test]
430 fn lww_remote_delete_wins_over_update() {
431 let other_device = Uuid::new_v4();
432 let now = Utc::now();
433
434 let local = make_entry("tasks", "r1", ChangeOp::Update, now);
435 let remote = make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 1);
436
437 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepRemote));
438 }
439
440 #[test]
441 fn lww_both_delete_local_wins() {
442 let other_device = Uuid::new_v4();
443 let now = Utc::now();
444
445 let local = make_entry("tasks", "r1", ChangeOp::Delete, now);
446 let remote = make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 1);
447
448 assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal));
449 }
450
451 // ── resolve_field_merge ──
452
453 #[test]
454 fn field_merge_non_overlapping_changes() {
455 let base = json!({"title": "old", "status": "pending", "priority": 1});
456 let local = json!({"title": "new title", "status": "pending", "priority": 1});
457 let remote = json!({"title": "old", "status": "done", "priority": 1});
458 let now = Utc::now();
459
460 let result = resolve_field_merge(&local, &remote, &base, now, now);
461 match result {
462 Resolution::Merged(v) => {
463 assert_eq!(v["title"], "new title");
464 assert_eq!(v["status"], "done");
465 assert_eq!(v["priority"], 1);
466 }
467 _ => panic!("Expected Merged"),
468 }
469 }
470
471 #[test]
472 fn field_merge_overlapping_newer_wins() {
473 let base = json!({"title": "old"});
474 let local = json!({"title": "local title"});
475 let remote = json!({"title": "remote title"});
476 let old = Utc::now() - chrono::Duration::seconds(60);
477 let new = Utc::now();
478
479 // Remote is newer → remote wins the overlapping field
480 let result = resolve_field_merge(&local, &remote, &base, old, new);
481 match result {
482 Resolution::Merged(v) => {
483 assert_eq!(v["title"], "remote title");
484 }
485 _ => panic!("Expected Merged"),
486 }
487
488 // Local is newer → local wins the overlapping field
489 let result = resolve_field_merge(&local, &remote, &base, new, old);
490 match result {
491 Resolution::Merged(v) => {
492 assert_eq!(v["title"], "local title");
493 }
494 _ => panic!("Expected Merged"),
495 }
496 }
497
498 #[test]
499 fn field_merge_key_deleted_on_one_side() {
500 let base = json!({"title": "old", "notes": "some notes"});
501 let local = json!({"title": "old"}); // notes deleted
502 let remote = json!({"title": "old", "notes": "some notes"});
503 let now = Utc::now();
504
505 let result = resolve_field_merge(&local, &remote, &base, now, now);
506 match result {
507 Resolution::Merged(v) => {
508 assert_eq!(v["title"], "old");
509 assert!(v.get("notes").is_none(), "notes should be deleted");
510 }
511 _ => panic!("Expected Merged"),
512 }
513 }
514
515 #[test]
516 fn field_merge_non_object_falls_back() {
517 let base = json!("string value");
518 let local = json!("local string");
519 let remote = json!("remote string");
520 let now = Utc::now();
521
522 assert!(matches!(
523 resolve_field_merge(&local, &remote, &base, now, now),
524 Resolution::KeepRemote
525 ));
526 }
527
528 #[test]
529 fn field_merge_empty_base_treats_all_as_changed() {
530 let base = json!({});
531 let local = json!({"title": "from local"});
532 let remote = json!({"status": "from remote"});
533 let now = Utc::now();
534
535 let result = resolve_field_merge(&local, &remote, &base, now, now);
536 match result {
537 Resolution::Merged(v) => {
538 assert_eq!(v["title"], "from local");
539 assert_eq!(v["status"], "from remote");
540 }
541 _ => panic!("Expected Merged"),
542 }
543 }
544
545 #[test]
546 fn field_merge_new_keys_from_both_sides() {
547 let base = json!({"existing": 1});
548 let local = json!({"existing": 1, "local_new": "a"});
549 let remote = json!({"existing": 1, "remote_new": "b"});
550 let now = Utc::now();
551
552 let result = resolve_field_merge(&local, &remote, &base, now, now);
553 match result {
554 Resolution::Merged(v) => {
555 assert_eq!(v["existing"], 1);
556 assert_eq!(v["local_new"], "a");
557 assert_eq!(v["remote_new"], "b");
558 }
559 _ => panic!("Expected Merged"),
560 }
561 }
562
563 // ── PulledChange preserves metadata ──
564
565 #[test]
566 fn pulled_change_preserves_device_id_and_seq() {
567 let device_id = Uuid::new_v4();
568 let now = Utc::now();
569 let pulled = make_pulled("tasks", "r1", ChangeOp::Insert, now, device_id, 42);
570
571 assert_eq!(pulled.device_id, device_id);
572 assert_eq!(pulled.seq, 42);
573 assert_eq!(pulled.entry.table, "tasks");
574 assert_eq!(pulled.entry.row_id, "r1");
575 }
576
577 #[test]
578 fn pulled_change_clone_works() {
579 let device_id = Uuid::new_v4();
580 let now = Utc::now();
581 let pulled = make_pulled("tasks", "r1", ChangeOp::Insert, now, device_id, 1);
582 let cloned = pulled.clone();
583
584 assert_eq!(cloned.device_id, pulled.device_id);
585 assert_eq!(cloned.seq, pulled.seq);
586 assert_eq!(cloned.entry.table, pulled.entry.table);
587 }
588
589 // ── Resolution variants ──
590
591 #[test]
592 fn resolution_debug_format() {
593 let keep_local = Resolution::KeepLocal;
594 let keep_remote = Resolution::KeepRemote;
595 let merged = Resolution::Merged(json!({"a": 1}));
596 let skip = Resolution::Skip;
597
598 assert!(format!("{:?}", keep_local).contains("KeepLocal"));
599 assert!(format!("{:?}", keep_remote).contains("KeepRemote"));
600 assert!(format!("{:?}", merged).contains("Merged"));
601 assert!(format!("{:?}", skip).contains("Skip"));
602 }
603
604 // ── ConflictResolver trait ──
605
606 #[test]
607 fn custom_resolver_works() {
608 struct AlwaysRemote;
609 impl ConflictResolver for AlwaysRemote {
610 fn resolve(
611 &self,
612 _local: &ChangeEntry,
613 _remote: &PulledChange,
614 _base: Option<&serde_json::Value>,
615 ) -> Resolution {
616 Resolution::KeepRemote
617 }
618 }
619
620 let resolver = AlwaysRemote;
621 let now = Utc::now();
622 let local = make_entry("tasks", "r1", ChangeOp::Update, now);
623 let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, Uuid::new_v4(), 1);
624
625 assert!(matches!(
626 resolver.resolve(&local, &remote, None),
627 Resolution::KeepRemote
628 ));
629 }
630 }
631