| 1 |
|
| 2 |
|
| 3 |
use rusqlite::Connection; |
| 4 |
use synckit_client::{ChangeEntry, ChangeOp}; |
| 5 |
|
| 6 |
use tracing::instrument; |
| 7 |
|
| 8 |
use crate::error::Result; |
| 9 |
|
| 10 |
use super::{json_to_sql, pk_columns, table_columns, UPSERT_ORDER, DELETE_ORDER}; |
| 11 |
|
| 12 |
|
| 13 |
|
| 14 |
|
| 15 |
|
| 16 |
|
| 17 |
#[instrument(skip_all)] |
| 18 |
pub(crate) fn apply_remote_changes(conn: &Connection, changes: &[ChangeEntry]) -> Result<i64> { |
| 19 |
let tx = conn.unchecked_transaction()?; |
| 20 |
|
| 21 |
|
| 22 |
tx.execute( |
| 23 |
"UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'", |
| 24 |
[], |
| 25 |
)?; |
| 26 |
|
| 27 |
let mut count: i64 = 0; |
| 28 |
|
| 29 |
|
| 30 |
let mut upserts: Vec<&ChangeEntry> = Vec::new(); |
| 31 |
let mut deletes: Vec<&ChangeEntry> = Vec::new(); |
| 32 |
|
| 33 |
for change in changes { |
| 34 |
match change.op { |
| 35 |
ChangeOp::Insert | ChangeOp::Update => upserts.push(change), |
| 36 |
ChangeOp::Delete => deletes.push(change), |
| 37 |
} |
| 38 |
} |
| 39 |
|
| 40 |
|
| 41 |
for table in UPSERT_ORDER { |
| 42 |
for change in &upserts { |
| 43 |
if change.table == *table |
| 44 |
&& let Some(data) = &change.data { |
| 45 |
apply_upsert(&tx, table, data)?; |
| 46 |
count += 1; |
| 47 |
} |
| 48 |
} |
| 49 |
} |
| 50 |
|
| 51 |
|
| 52 |
|
| 53 |
|
| 54 |
|
| 55 |
for table in DELETE_ORDER { |
| 56 |
for change in &deletes { |
| 57 |
if change.table == *table { |
| 58 |
apply_delete(&tx, table, &change.row_id, change.data.as_ref())?; |
| 59 |
count += 1; |
| 60 |
} |
| 61 |
} |
| 62 |
} |
| 63 |
|
| 64 |
|
| 65 |
tx.execute( |
| 66 |
"UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'", |
| 67 |
[], |
| 68 |
)?; |
| 69 |
|
| 70 |
tx.commit()?; |
| 71 |
|
| 72 |
Ok(count) |
| 73 |
} |
| 74 |
|
| 75 |
|
| 76 |
pub(crate) fn apply_upsert( |
| 77 |
conn: &Connection, |
| 78 |
table: &str, |
| 79 |
data: &serde_json::Value, |
| 80 |
) -> Result<()> { |
| 81 |
let columns = match table_columns(table) { |
| 82 |
Some(c) => c, |
| 83 |
None => return Ok(()), |
| 84 |
}; |
| 85 |
|
| 86 |
let obj = match data.as_object() { |
| 87 |
Some(o) => o, |
| 88 |
None => return Ok(()), |
| 89 |
}; |
| 90 |
|
| 91 |
let mut col_names = Vec::new(); |
| 92 |
let mut placeholders = Vec::new(); |
| 93 |
let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new(); |
| 94 |
|
| 95 |
for (i, col) in columns.iter().enumerate() { |
| 96 |
if let Some(val) = obj.get(*col) { |
| 97 |
col_names.push(*col); |
| 98 |
placeholders.push(format!("?{}", i + 1)); |
| 99 |
values.push(json_to_sql(val)); |
| 100 |
} |
| 101 |
} |
| 102 |
|
| 103 |
if col_names.is_empty() { |
| 104 |
return Ok(()); |
| 105 |
} |
| 106 |
|
| 107 |
|
| 108 |
|
| 109 |
let pks = pk_columns(table); |
| 110 |
let pk_set: std::collections::HashSet<&str> = pks.iter().copied().collect(); |
| 111 |
let non_pk_updates: Vec<String> = col_names |
| 112 |
.iter() |
| 113 |
.enumerate() |
| 114 |
.filter(|(_, col)| !pk_set.contains(*col)) |
| 115 |
.map(|(i, col)| format!("{} = ?{}", col, i + 1)) |
| 116 |
.collect(); |
| 117 |
|
| 118 |
let sql = if non_pk_updates.is_empty() { |
| 119 |
|
| 120 |
format!( |
| 121 |
"INSERT OR IGNORE INTO {} ({}) VALUES ({})", |
| 122 |
table, |
| 123 |
col_names.join(", "), |
| 124 |
placeholders.join(", "), |
| 125 |
) |
| 126 |
} else { |
| 127 |
format!( |
| 128 |
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT({}) DO UPDATE SET {}", |
| 129 |
table, |
| 130 |
col_names.join(", "), |
| 131 |
placeholders.join(", "), |
| 132 |
pks.join(", "), |
| 133 |
non_pk_updates.join(", "), |
| 134 |
) |
| 135 |
}; |
| 136 |
|
| 137 |
conn.execute( |
| 138 |
&sql, |
| 139 |
values |
| 140 |
.iter() |
| 141 |
.map(|v| v.as_ref()) |
| 142 |
.collect::<Vec<_>>() |
| 143 |
.as_slice(), |
| 144 |
)?; |
| 145 |
|
| 146 |
Ok(()) |
| 147 |
} |
| 148 |
|
| 149 |
|
| 150 |
|
| 151 |
|
| 152 |
|
| 153 |
|
| 154 |
|
| 155 |
|
| 156 |
pub(crate) fn apply_delete( |
| 157 |
conn: &Connection, |
| 158 |
table: &str, |
| 159 |
row_id: &str, |
| 160 |
data: Option<&serde_json::Value>, |
| 161 |
) -> Result<()> { |
| 162 |
let pks = pk_columns(table); |
| 163 |
|
| 164 |
|
| 165 |
|
| 166 |
if let Some(value) = data |
| 167 |
&& let Some(obj) = value.as_object() { |
| 168 |
let mut clauses = Vec::with_capacity(pks.len()); |
| 169 |
let mut params: Vec<String> = Vec::with_capacity(pks.len()); |
| 170 |
let mut missing = false; |
| 171 |
for (i, pk) in pks.iter().enumerate() { |
| 172 |
match obj.get(*pk) { |
| 173 |
Some(v) => { |
| 174 |
let s = match v { |
| 175 |
serde_json::Value::String(s) => s.clone(), |
| 176 |
serde_json::Value::Number(n) => n.to_string(), |
| 177 |
serde_json::Value::Null => { |
| 178 |
missing = true; |
| 179 |
break; |
| 180 |
} |
| 181 |
other => other.to_string(), |
| 182 |
}; |
| 183 |
clauses.push(format!("{} = ?{}", pk, i + 1)); |
| 184 |
params.push(s); |
| 185 |
} |
| 186 |
None => { |
| 187 |
missing = true; |
| 188 |
break; |
| 189 |
} |
| 190 |
} |
| 191 |
} |
| 192 |
if !missing && !clauses.is_empty() { |
| 193 |
let sql = format!("DELETE FROM {} WHERE {}", table, clauses.join(" AND ")); |
| 194 |
let params_dyn: Vec<&dyn rusqlite::ToSql> = |
| 195 |
params.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); |
| 196 |
conn.execute(&sql, params_dyn.as_slice())?; |
| 197 |
return Ok(()); |
| 198 |
} |
| 199 |
} |
| 200 |
|
| 201 |
|
| 202 |
if pks.len() == 1 { |
| 203 |
let sql = format!("DELETE FROM {} WHERE {} = ?1", table, pks[0]); |
| 204 |
conn.execute(&sql, [row_id])?; |
| 205 |
} else if pks.len() == 2 { |
| 206 |
let (first, second) = match row_id.find(':') { |
| 207 |
Some(pos) => (&row_id[..pos], &row_id[pos + 1..]), |
| 208 |
None => { |
| 209 |
tracing::warn!("Cannot split composite row_id for {table}: {row_id}"); |
| 210 |
return Ok(()); |
| 211 |
} |
| 212 |
}; |
| 213 |
let sql = format!( |
| 214 |
"DELETE FROM {} WHERE {} = ?1 AND {} = ?2", |
| 215 |
table, pks[0], pks[1] |
| 216 |
); |
| 217 |
conn.execute(&sql, rusqlite::params![first, second])?; |
| 218 |
} |
| 219 |
|
| 220 |
Ok(()) |
| 221 |
} |
| 222 |
|