Skip to main content

max / audiofiles

7.2 KB · 222 lines History Blame Raw
1 //! Conflict resolution: apply remote changes locally with FK-safe ordering.
2
3 use rusqlite::Connection;
4 use synckit_client::{ChangeEntry, ChangeOp};
5
6 use tracing::instrument;
7
8 use crate::error::Result;
9
10 use super::{json_to_sql, pk_columns, table_columns, UPSERT_ORDER, DELETE_ORDER};
11
12 /// Apply a batch of remote changes locally, with trigger suppression.
13 ///
14 /// The flag set, data changes, and flag clear all happen inside a single
15 /// transaction so a process crash mid-apply rolls back everything — the flag
16 /// never gets stuck in the DB.
17 #[instrument(skip_all)]
18 pub(crate) fn apply_remote_changes(conn: &Connection, changes: &[ChangeEntry]) -> Result<i64> {
19 let tx = conn.unchecked_transaction()?;
20
21 // Suppress triggers while applying (inside the transaction)
22 tx.execute(
23 "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
24 [],
25 )?;
26
27 let mut count: i64 = 0;
28
29 // Separate upserts and deletes
30 let mut upserts: Vec<&ChangeEntry> = Vec::new();
31 let mut deletes: Vec<&ChangeEntry> = Vec::new();
32
33 for change in changes {
34 match change.op {
35 ChangeOp::Insert | ChangeOp::Update => upserts.push(change),
36 ChangeOp::Delete => deletes.push(change),
37 }
38 }
39
40 // Apply upserts in FK-safe order
41 for table in UPSERT_ORDER {
42 for change in &upserts {
43 if change.table == *table
44 && let Some(data) = &change.data {
45 apply_upsert(&tx, table, data)?;
46 count += 1;
47 }
48 }
49 }
50
51 // Apply deletes in reverse FK order. Pass `data` (decrypted by synckit)
52 // so apply_delete can read canonical key fields from JSON — post-M018
53 // sync_changelog DELETE rows hash the row_id, so the cleartext PK lives
54 // only in `data`.
55 for table in DELETE_ORDER {
56 for change in &deletes {
57 if change.table == *table {
58 apply_delete(&tx, table, &change.row_id, change.data.as_ref())?;
59 count += 1;
60 }
61 }
62 }
63
64 // Clear the flag (still inside the transaction)
65 tx.execute(
66 "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
67 [],
68 )?;
69
70 tx.commit()?;
71
72 Ok(count)
73 }
74
75 /// Apply an upsert (INSERT OR REPLACE) for a single row.
76 pub(crate) fn apply_upsert(
77 conn: &Connection,
78 table: &str,
79 data: &serde_json::Value,
80 ) -> Result<()> {
81 let columns = match table_columns(table) {
82 Some(c) => c,
83 None => return Ok(()),
84 };
85
86 let obj = match data.as_object() {
87 Some(o) => o,
88 None => return Ok(()),
89 };
90
91 let mut col_names = Vec::new();
92 let mut placeholders = Vec::new();
93 let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
94
95 for (i, col) in columns.iter().enumerate() {
96 if let Some(val) = obj.get(*col) {
97 col_names.push(*col);
98 placeholders.push(format!("?{}", i + 1));
99 values.push(json_to_sql(val));
100 }
101 }
102
103 if col_names.is_empty() {
104 return Ok(());
105 }
106
107 // Use INSERT ... ON CONFLICT DO UPDATE to avoid DELETE+INSERT behavior of
108 // INSERT OR REPLACE, which would cascade FK deletes to child rows.
109 let pks = pk_columns(table);
110 let pk_set: std::collections::HashSet<&str> = pks.iter().copied().collect();
111 let non_pk_updates: Vec<String> = col_names
112 .iter()
113 .enumerate()
114 .filter(|(_, col)| !pk_set.contains(*col))
115 .map(|(i, col)| format!("{} = ?{}", col, i + 1))
116 .collect();
117
118 let sql = if non_pk_updates.is_empty() {
119 // All columns are PKs (e.g. tags) — just ignore conflicts
120 format!(
121 "INSERT OR IGNORE INTO {} ({}) VALUES ({})",
122 table,
123 col_names.join(", "),
124 placeholders.join(", "),
125 )
126 } else {
127 format!(
128 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT({}) DO UPDATE SET {}",
129 table,
130 col_names.join(", "),
131 placeholders.join(", "),
132 pks.join(", "),
133 non_pk_updates.join(", "),
134 )
135 };
136
137 conn.execute(
138 &sql,
139 values
140 .iter()
141 .map(|v| v.as_ref())
142 .collect::<Vec<_>>()
143 .as_slice(),
144 )?;
145
146 Ok(())
147 }
148
149 /// Apply a delete for a single row, handling composite primary keys.
150 ///
151 /// Reads the canonical primary-key columns from the decrypted `data` JSON
152 /// when present (the post-M018 path; row_id is then a hash of the key).
153 /// Falls back to parsing `row_id` for backwards compatibility with rows
154 /// pushed by pre-M018 clients, which set data=NULL on DELETE and packed
155 /// the cleartext PK into row_id as `"pk1:pk2"`.
156 pub(crate) fn apply_delete(
157 conn: &Connection,
158 table: &str,
159 row_id: &str,
160 data: Option<&serde_json::Value>,
161 ) -> Result<()> {
162 let pks = pk_columns(table);
163
164 // Prefer the `data` JSON: M018+ DELETE triggers always emit it, and it
165 // carries the canonical key without depending on the wire-side row_id.
166 if let Some(value) = data
167 && let Some(obj) = value.as_object() {
168 let mut clauses = Vec::with_capacity(pks.len());
169 let mut params: Vec<String> = Vec::with_capacity(pks.len());
170 let mut missing = false;
171 for (i, pk) in pks.iter().enumerate() {
172 match obj.get(*pk) {
173 Some(v) => {
174 let s = match v {
175 serde_json::Value::String(s) => s.clone(),
176 serde_json::Value::Number(n) => n.to_string(),
177 serde_json::Value::Null => {
178 missing = true;
179 break;
180 }
181 other => other.to_string(),
182 };
183 clauses.push(format!("{} = ?{}", pk, i + 1));
184 params.push(s);
185 }
186 None => {
187 missing = true;
188 break;
189 }
190 }
191 }
192 if !missing && !clauses.is_empty() {
193 let sql = format!("DELETE FROM {} WHERE {}", table, clauses.join(" AND "));
194 let params_dyn: Vec<&dyn rusqlite::ToSql> =
195 params.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
196 conn.execute(&sql, params_dyn.as_slice())?;
197 return Ok(());
198 }
199 }
200
201 // Pre-M018 fallback: parse row_id as the literal PK or "pk1:pk2".
202 if pks.len() == 1 {
203 let sql = format!("DELETE FROM {} WHERE {} = ?1", table, pks[0]);
204 conn.execute(&sql, [row_id])?;
205 } else if pks.len() == 2 {
206 let (first, second) = match row_id.find(':') {
207 Some(pos) => (&row_id[..pos], &row_id[pos + 1..]),
208 None => {
209 tracing::warn!("Cannot split composite row_id for {table}: {row_id}");
210 return Ok(());
211 }
212 };
213 let sql = format!(
214 "DELETE FROM {} WHERE {} = ?1 AND {} = ?2",
215 table, pks[0], pks[1]
216 );
217 conn.execute(&sql, rusqlite::params![first, second])?;
218 }
219
220 Ok(())
221 }
222