//! Conflict resolution: apply remote changes locally with FK-safe ordering. use rusqlite::Connection; use synckit_client::{ChangeEntry, ChangeOp}; use tracing::instrument; use crate::error::Result; use super::{json_to_sql, pk_columns, table_columns, UPSERT_ORDER, DELETE_ORDER}; /// Apply a batch of remote changes locally, with trigger suppression. /// /// The flag set, data changes, and flag clear all happen inside a single /// transaction so a process crash mid-apply rolls back everything — the flag /// never gets stuck in the DB. #[instrument(skip_all)] pub(crate) fn apply_remote_changes(conn: &Connection, changes: &[ChangeEntry]) -> Result { let tx = conn.unchecked_transaction()?; // Suppress triggers while applying (inside the transaction) tx.execute( "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'", [], )?; let mut count: i64 = 0; // Separate upserts and deletes let mut upserts: Vec<&ChangeEntry> = Vec::new(); let mut deletes: Vec<&ChangeEntry> = Vec::new(); for change in changes { match change.op { ChangeOp::Insert | ChangeOp::Update => upserts.push(change), ChangeOp::Delete => deletes.push(change), } } // Apply upserts in FK-safe order for table in UPSERT_ORDER { for change in &upserts { if change.table == *table && let Some(data) = &change.data { apply_upsert(&tx, table, data)?; count += 1; } } } // Apply deletes in reverse FK order. Pass `data` (decrypted by synckit) // so apply_delete can read canonical key fields from JSON — post-M018 // sync_changelog DELETE rows hash the row_id, so the cleartext PK lives // only in `data`. for table in DELETE_ORDER { for change in &deletes { if change.table == *table { apply_delete(&tx, table, &change.row_id, change.data.as_ref())?; count += 1; } } } // Clear the flag (still inside the transaction) tx.execute( "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'", [], )?; tx.commit()?; Ok(count) } /// Apply an upsert (INSERT OR REPLACE) for a single row. pub(crate) fn apply_upsert( conn: &Connection, table: &str, data: &serde_json::Value, ) -> Result<()> { let columns = match table_columns(table) { Some(c) => c, None => return Ok(()), }; let obj = match data.as_object() { Some(o) => o, None => return Ok(()), }; let mut col_names = Vec::new(); let mut placeholders = Vec::new(); let mut values: Vec> = Vec::new(); for (i, col) in columns.iter().enumerate() { if let Some(val) = obj.get(*col) { col_names.push(*col); placeholders.push(format!("?{}", i + 1)); values.push(json_to_sql(val)); } } if col_names.is_empty() { return Ok(()); } // Use INSERT ... ON CONFLICT DO UPDATE to avoid DELETE+INSERT behavior of // INSERT OR REPLACE, which would cascade FK deletes to child rows. let pks = pk_columns(table); let pk_set: std::collections::HashSet<&str> = pks.iter().copied().collect(); let non_pk_updates: Vec = col_names .iter() .enumerate() .filter(|(_, col)| !pk_set.contains(*col)) .map(|(i, col)| format!("{} = ?{}", col, i + 1)) .collect(); let sql = if non_pk_updates.is_empty() { // All columns are PKs (e.g. tags) — just ignore conflicts format!( "INSERT OR IGNORE INTO {} ({}) VALUES ({})", table, col_names.join(", "), placeholders.join(", "), ) } else { format!( "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT({}) DO UPDATE SET {}", table, col_names.join(", "), placeholders.join(", "), pks.join(", "), non_pk_updates.join(", "), ) }; conn.execute( &sql, values .iter() .map(|v| v.as_ref()) .collect::>() .as_slice(), )?; Ok(()) } /// Apply a delete for a single row, handling composite primary keys. /// /// Reads the canonical primary-key columns from the decrypted `data` JSON /// when present (the post-M018 path; row_id is then a hash of the key). /// Falls back to parsing `row_id` for backwards compatibility with rows /// pushed by pre-M018 clients, which set data=NULL on DELETE and packed /// the cleartext PK into row_id as `"pk1:pk2"`. pub(crate) fn apply_delete( conn: &Connection, table: &str, row_id: &str, data: Option<&serde_json::Value>, ) -> Result<()> { let pks = pk_columns(table); // Prefer the `data` JSON: M018+ DELETE triggers always emit it, and it // carries the canonical key without depending on the wire-side row_id. if let Some(value) = data && let Some(obj) = value.as_object() { let mut clauses = Vec::with_capacity(pks.len()); let mut params: Vec = Vec::with_capacity(pks.len()); let mut missing = false; for (i, pk) in pks.iter().enumerate() { match obj.get(*pk) { Some(v) => { let s = match v { serde_json::Value::String(s) => s.clone(), serde_json::Value::Number(n) => n.to_string(), serde_json::Value::Null => { missing = true; break; } other => other.to_string(), }; clauses.push(format!("{} = ?{}", pk, i + 1)); params.push(s); } None => { missing = true; break; } } } if !missing && !clauses.is_empty() { let sql = format!("DELETE FROM {} WHERE {}", table, clauses.join(" AND ")); let params_dyn: Vec<&dyn rusqlite::ToSql> = params.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); conn.execute(&sql, params_dyn.as_slice())?; return Ok(()); } } // Pre-M018 fallback: parse row_id as the literal PK or "pk1:pk2". if pks.len() == 1 { let sql = format!("DELETE FROM {} WHERE {} = ?1", table, pks[0]); conn.execute(&sql, [row_id])?; } else if pks.len() == 2 { let (first, second) = match row_id.find(':') { Some(pos) => (&row_id[..pos], &row_id[pos + 1..]), None => { tracing::warn!("Cannot split composite row_id for {table}: {row_id}"); return Ok(()); } }; let sql = format!( "DELETE FROM {} WHERE {} = ?1 AND {} = ?2", table, pks[0], pks[1] ); conn.execute(&sql, rusqlite::params![first, second])?; } Ok(()) }