//! Client-side conflict detection and resolution for SyncKit. //! //! SyncKit uses E2E encryption — the server never sees row contents and cannot //! merge or compare data. All conflict resolution must happen client-side after //! decryption. //! //! This module provides: //! - [`detect_conflicts`]: pure function that splits pulled changes into clean //! (non-conflicting) and conflicting sets //! - [`resolve_lww`]: last-write-wins resolution by `client_timestamp` //! - [`resolve_field_merge`]: 3-way JSON object merge using a base version //! - [`ConflictResolver`]: trait for custom resolution strategies use std::collections::HashMap; use chrono::{DateTime, Utc}; use uuid::Uuid; use crate::types::{ChangeEntry, ChangeOp, PulledChange}; /// A remote change that conflicts with a local pending change. #[derive(Debug, Clone)] pub struct ConflictPair { /// The remote change from pull. pub remote: PulledChange, /// The local pending change that conflicts. pub local: ChangeEntry, } /// How a conflict should be resolved. #[derive(Debug, Clone)] pub enum Resolution { /// Keep the local version; skip the remote change. /// The local version will push on the next sync cycle. KeepLocal, /// Apply the remote change, discarding the local version. KeepRemote, /// Apply a merged result that combines both changes. Merged(serde_json::Value), /// Skip both changes (neither apply nor push). Skip, } /// Trait for custom conflict resolution strategies. /// /// Implement this to plug in app-specific merge logic. The `base` parameter /// is `Some` only if the app provides a base-store adapter. pub trait ConflictResolver: Send + Sync { fn resolve( &self, local: &ChangeEntry, remote: &PulledChange, base: Option<&serde_json::Value>, ) -> Resolution; } /// Split pulled changes into non-conflicting and conflicting sets. /// /// A conflict exists when a remote change and a local pending change both /// modify the same `(table, row_id)` from different devices. Changes from /// our own device (echoes) are never treated as conflicts. /// /// Returns `(clean, conflicts)` where `clean` changes can be applied directly. pub fn detect_conflicts( remote: Vec, local_pending: &[ChangeEntry], our_device_id: Uuid, ) -> (Vec, Vec) { // Build lookup: (table, row_id) -> last local pending entry let mut local_map: HashMap<(&str, &str), &ChangeEntry> = HashMap::new(); for entry in local_pending { local_map.insert((&entry.table, &entry.row_id), entry); } let mut clean = Vec::new(); let mut conflicts = Vec::new(); for pulled in remote { // Our own echo — never a conflict if pulled.device_id == our_device_id { clean.push(pulled); continue; } let key = (pulled.entry.table.as_str(), pulled.entry.row_id.as_str()); if let Some(&local_entry) = local_map.get(&key) { conflicts.push(ConflictPair { remote: pulled, local: local_entry.clone(), }); } else { clean.push(pulled); } } (clean, conflicts) } /// Last-write-wins resolution by `client_timestamp`. /// /// - DELETE vs non-DELETE: delete wins regardless of timestamp. /// - Both INSERT/UPDATE: newer timestamp wins. Ties go to local. pub fn resolve_lww(local: &ChangeEntry, remote: &PulledChange) -> Resolution { // DELETE wins over non-DELETE if local.op == ChangeOp::Delete || remote.entry.op == ChangeOp::Delete { return if local.op == ChangeOp::Delete { Resolution::KeepLocal } else { Resolution::KeepRemote }; } // Both are INSERT/UPDATE: newer timestamp wins, ties go to local if local.timestamp >= remote.entry.timestamp { Resolution::KeepLocal } else { Resolution::KeepRemote } } /// 3-way field-level merge for JSON objects. /// /// Compares `local` and `remote` against `base` to determine which fields /// each side changed, then merges non-overlapping changes. For overlapping /// fields, the newer timestamp wins. /// /// Returns `Resolution::KeepRemote` if any input is not a JSON object. pub fn resolve_field_merge( local: &serde_json::Value, remote: &serde_json::Value, base: &serde_json::Value, local_ts: DateTime, remote_ts: DateTime, ) -> Resolution { let (Some(local_obj), Some(remote_obj), Some(base_obj)) = ( local.as_object(), remote.as_object(), base.as_object(), ) else { return Resolution::KeepRemote; }; // Compute diffs: keys where local/remote differ from base let mut local_changed: HashMap<&str, Option<&serde_json::Value>> = HashMap::new(); let mut remote_changed: HashMap<&str, Option<&serde_json::Value>> = HashMap::new(); // Check keys in base for changes or deletions for key in base_obj.keys() { let base_val = &base_obj[key]; match local_obj.get(key) { Some(local_val) if local_val != base_val => { local_changed.insert(key, Some(local_val)); } None => { // Key deleted on local side local_changed.insert(key, None); } _ => {} } match remote_obj.get(key) { Some(remote_val) if remote_val != base_val => { remote_changed.insert(key, Some(remote_val)); } None => { // Key deleted on remote side remote_changed.insert(key, None); } _ => {} } } // Check for new keys added by local (not in base) for (key, val) in local_obj { if !base_obj.contains_key(key) { local_changed.insert(key, Some(val)); } } // Check for new keys added by remote (not in base) for (key, val) in remote_obj { if !base_obj.contains_key(key) { remote_changed.insert(key, Some(val)); } } // Build merged result starting from base let mut result = base_obj.clone(); // Apply local changes for (key, val) in &local_changed { match val { Some(v) => { result.insert((*key).to_string(), (*v).clone()); } None => { result.remove(*key); } } } // Apply remote changes (non-overlapping only, or newer timestamp wins) for (key, val) in &remote_changed { if local_changed.contains_key(key) { // Overlapping: newer timestamp wins, ties go to local if remote_ts > local_ts { match val { Some(v) => { result.insert((*key).to_string(), (*v).clone()); } None => { result.remove(*key); } } } // else: local already applied above } else { // Non-overlapping: apply remote match val { Some(v) => { result.insert((*key).to_string(), (*v).clone()); } None => { result.remove(*key); } } } } Resolution::Merged(serde_json::Value::Object(result)) } #[cfg(test)] mod tests { use super::*; use serde_json::json; fn make_entry(table: &str, row_id: &str, op: ChangeOp, ts: DateTime) -> ChangeEntry { ChangeEntry { table: table.to_string(), op, row_id: row_id.to_string(), timestamp: ts, data: Some(json!({"value": "test"})), } } fn make_pulled( table: &str, row_id: &str, op: ChangeOp, ts: DateTime, device_id: Uuid, seq: i64, ) -> PulledChange { PulledChange { entry: make_entry(table, row_id, op, ts), device_id, seq, } } // ── detect_conflicts ── #[test] fn no_conflicts_when_different_rows() { let our_device = Uuid::new_v4(); let other_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1), ]; let local = vec![ make_entry("tasks", "r2", ChangeOp::Update, now), ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert_eq!(clean.len(), 1); assert!(conflicts.is_empty()); } #[test] fn conflict_detected_same_row_different_device() { let our_device = Uuid::new_v4(); let other_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1), ]; let local = vec![ make_entry("tasks", "r1", ChangeOp::Update, now), ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert!(clean.is_empty()); assert_eq!(conflicts.len(), 1); assert_eq!(conflicts[0].remote.entry.row_id, "r1"); assert_eq!(conflicts[0].local.row_id, "r1"); } #[test] fn own_echo_not_treated_as_conflict() { let our_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, our_device, 1), ]; let local = vec![ make_entry("tasks", "r1", ChangeOp::Update, now), ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert_eq!(clean.len(), 1); assert!(conflicts.is_empty()); } #[test] fn different_tables_same_row_id_no_conflict() { let our_device = Uuid::new_v4(); let other_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1), ]; let local = vec![ make_entry("events", "r1", ChangeOp::Update, now), ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert_eq!(clean.len(), 1); assert!(conflicts.is_empty()); } #[test] fn detect_conflicts_correct_split() { let our_device = Uuid::new_v4(); let other_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1), make_pulled("tasks", "r2", ChangeOp::Insert, now, other_device, 2), make_pulled("events", "r3", ChangeOp::Delete, now, other_device, 3), ]; let local = vec![ make_entry("tasks", "r1", ChangeOp::Update, now), // r2 not in local → clean // r3 not in local → clean ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert_eq!(clean.len(), 2); assert_eq!(conflicts.len(), 1); assert_eq!(conflicts[0].remote.entry.row_id, "r1"); } #[test] fn empty_remote_produces_no_conflicts() { let our_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![]; let local = vec![ make_entry("tasks", "r1", ChangeOp::Update, now), ]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert!(clean.is_empty()); assert!(conflicts.is_empty()); } #[test] fn empty_local_produces_no_conflicts() { let our_device = Uuid::new_v4(); let other_device = Uuid::new_v4(); let now = Utc::now(); let remote = vec![ make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1), ]; let local: Vec = vec![]; let (clean, conflicts) = detect_conflicts(remote, &local, our_device); assert_eq!(clean.len(), 1); assert!(conflicts.is_empty()); } // ── resolve_lww ── #[test] fn lww_picks_newer_timestamp() { let other_device = Uuid::new_v4(); let old = Utc::now() - chrono::Duration::seconds(60); let new = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Update, old); let remote = make_pulled("tasks", "r1", ChangeOp::Update, new, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepRemote)); } #[test] fn lww_local_wins_when_newer() { let other_device = Uuid::new_v4(); let old = Utc::now() - chrono::Duration::seconds(60); let new = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Update, new); let remote = make_pulled("tasks", "r1", ChangeOp::Update, old, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal)); } #[test] fn lww_equal_timestamps_local_wins() { let other_device = Uuid::new_v4(); let now = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Update, now); let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal)); } #[test] fn lww_local_delete_wins_over_update() { let other_device = Uuid::new_v4(); let now = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Delete, now); let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal)); } #[test] fn lww_remote_delete_wins_over_update() { let other_device = Uuid::new_v4(); let now = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Update, now); let remote = make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepRemote)); } #[test] fn lww_both_delete_local_wins() { let other_device = Uuid::new_v4(); let now = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Delete, now); let remote = make_pulled("tasks", "r1", ChangeOp::Delete, now, other_device, 1); assert!(matches!(resolve_lww(&local, &remote), Resolution::KeepLocal)); } // ── resolve_field_merge ── #[test] fn field_merge_non_overlapping_changes() { let base = json!({"title": "old", "status": "pending", "priority": 1}); let local = json!({"title": "new title", "status": "pending", "priority": 1}); let remote = json!({"title": "old", "status": "done", "priority": 1}); let now = Utc::now(); let result = resolve_field_merge(&local, &remote, &base, now, now); match result { Resolution::Merged(v) => { assert_eq!(v["title"], "new title"); assert_eq!(v["status"], "done"); assert_eq!(v["priority"], 1); } _ => panic!("Expected Merged"), } } #[test] fn field_merge_overlapping_newer_wins() { let base = json!({"title": "old"}); let local = json!({"title": "local title"}); let remote = json!({"title": "remote title"}); let old = Utc::now() - chrono::Duration::seconds(60); let new = Utc::now(); // Remote is newer → remote wins the overlapping field let result = resolve_field_merge(&local, &remote, &base, old, new); match result { Resolution::Merged(v) => { assert_eq!(v["title"], "remote title"); } _ => panic!("Expected Merged"), } // Local is newer → local wins the overlapping field let result = resolve_field_merge(&local, &remote, &base, new, old); match result { Resolution::Merged(v) => { assert_eq!(v["title"], "local title"); } _ => panic!("Expected Merged"), } } #[test] fn field_merge_key_deleted_on_one_side() { let base = json!({"title": "old", "notes": "some notes"}); let local = json!({"title": "old"}); // notes deleted let remote = json!({"title": "old", "notes": "some notes"}); let now = Utc::now(); let result = resolve_field_merge(&local, &remote, &base, now, now); match result { Resolution::Merged(v) => { assert_eq!(v["title"], "old"); assert!(v.get("notes").is_none(), "notes should be deleted"); } _ => panic!("Expected Merged"), } } #[test] fn field_merge_non_object_falls_back() { let base = json!("string value"); let local = json!("local string"); let remote = json!("remote string"); let now = Utc::now(); assert!(matches!( resolve_field_merge(&local, &remote, &base, now, now), Resolution::KeepRemote )); } #[test] fn field_merge_empty_base_treats_all_as_changed() { let base = json!({}); let local = json!({"title": "from local"}); let remote = json!({"status": "from remote"}); let now = Utc::now(); let result = resolve_field_merge(&local, &remote, &base, now, now); match result { Resolution::Merged(v) => { assert_eq!(v["title"], "from local"); assert_eq!(v["status"], "from remote"); } _ => panic!("Expected Merged"), } } #[test] fn field_merge_new_keys_from_both_sides() { let base = json!({"existing": 1}); let local = json!({"existing": 1, "local_new": "a"}); let remote = json!({"existing": 1, "remote_new": "b"}); let now = Utc::now(); let result = resolve_field_merge(&local, &remote, &base, now, now); match result { Resolution::Merged(v) => { assert_eq!(v["existing"], 1); assert_eq!(v["local_new"], "a"); assert_eq!(v["remote_new"], "b"); } _ => panic!("Expected Merged"), } } // ── PulledChange preserves metadata ── #[test] fn pulled_change_preserves_device_id_and_seq() { let device_id = Uuid::new_v4(); let now = Utc::now(); let pulled = make_pulled("tasks", "r1", ChangeOp::Insert, now, device_id, 42); assert_eq!(pulled.device_id, device_id); assert_eq!(pulled.seq, 42); assert_eq!(pulled.entry.table, "tasks"); assert_eq!(pulled.entry.row_id, "r1"); } #[test] fn pulled_change_clone_works() { let device_id = Uuid::new_v4(); let now = Utc::now(); let pulled = make_pulled("tasks", "r1", ChangeOp::Insert, now, device_id, 1); let cloned = pulled.clone(); assert_eq!(cloned.device_id, pulled.device_id); assert_eq!(cloned.seq, pulled.seq); assert_eq!(cloned.entry.table, pulled.entry.table); } // ── Resolution variants ── #[test] fn resolution_debug_format() { let keep_local = Resolution::KeepLocal; let keep_remote = Resolution::KeepRemote; let merged = Resolution::Merged(json!({"a": 1})); let skip = Resolution::Skip; assert!(format!("{:?}", keep_local).contains("KeepLocal")); assert!(format!("{:?}", keep_remote).contains("KeepRemote")); assert!(format!("{:?}", merged).contains("Merged")); assert!(format!("{:?}", skip).contains("Skip")); } // ── ConflictResolver trait ── #[test] fn custom_resolver_works() { struct AlwaysRemote; impl ConflictResolver for AlwaysRemote { fn resolve( &self, _local: &ChangeEntry, _remote: &PulledChange, _base: Option<&serde_json::Value>, ) -> Resolution { Resolution::KeepRemote } } let resolver = AlwaysRemote; let now = Utc::now(); let local = make_entry("tasks", "r1", ChangeOp::Update, now); let remote = make_pulled("tasks", "r1", ChangeOp::Update, now, Uuid::new_v4(), 1); assert!(matches!( resolver.resolve(&local, &remote, None), Resolution::KeepRemote )); } }