Skip to main content

max / goingson

8.5 KB · 243 lines History Blame Raw
1 //! Pull remote changes and apply them to the local database.
2
3 use chrono::Utc;
4 use goingson_core::CoreError;
5 use sqlx::{SqliteConnection, SqlitePool};
6 use synckit_client::{
7 ChangeEntry, ChangeOp, Resolution, SyncKitClient,
8 detect_conflicts, resolve_lww,
9 };
10 use tracing::debug;
11 use uuid::Uuid;
12
13 use super::state::{get_sync_state, set_sync_state};
14 use super::{UPSERT_ORDER, DELETE_ORDER};
15 use super::apply::{apply_upsert, apply_delete};
16
17 pub async fn pull_changes(
18 pool: &SqlitePool,
19 client: &SyncKitClient,
20 device_id: Uuid,
21 ) -> Result<i64, CoreError> {
22 let cursor_str = get_sync_state(pool, "pull_cursor").await?;
23 let mut cursor: i64 = cursor_str.parse().unwrap_or(0);
24 let mut total_applied: i64 = 0;
25
26 // Read local pending changes once (unpushed changelog entries)
27 let local_pending = read_local_pending(pool).await?;
28
29 loop {
30 let (changes, new_cursor, has_more) = client
31 .pull_rich(device_id, cursor)
32 .await
33 .map_err(|e| CoreError::sync(format!("pull failed: {}", e)))?;
34
35 if changes.is_empty() {
36 set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?;
37 break;
38 }
39
40 // Detect conflicts between remote changes and local pending
41 let (clean, conflicts) = detect_conflicts(changes, &local_pending, device_id);
42
43 // Apply non-conflicting changes directly
44 let clean_count = clean.len() as i64;
45 if !clean.is_empty() {
46 let clean_entries: Vec<ChangeEntry> =
47 clean.into_iter().map(|p| p.entry).collect();
48 apply_remote_changes(pool, clean_entries).await?;
49 }
50
51 // Resolve conflicts with LWW
52 let mut resolved_count: i64 = 0;
53 if !conflicts.is_empty() {
54 let conflict_count = conflicts.len();
55 let mut resolved_entries: Vec<ChangeEntry> = Vec::new();
56
57 for pair in &conflicts {
58 match resolve_lww(&pair.local, &pair.remote) {
59 Resolution::KeepRemote => {
60 resolved_entries.push(pair.remote.entry.clone());
61 }
62 Resolution::KeepLocal => {
63 // Skip — local version will push on next cycle
64 }
65 Resolution::Merged(data) => {
66 // Apply merged data using the remote entry as template
67 let mut merged = pair.remote.entry.clone();
68 merged.data = Some(data);
69 resolved_entries.push(merged);
70 }
71 Resolution::Skip => {
72 // Skip both
73 }
74 }
75 }
76
77 resolved_count = resolved_entries.len() as i64;
78 if !resolved_entries.is_empty() {
79 apply_remote_changes(pool, resolved_entries).await?;
80 }
81
82 debug!(
83 "Resolved {} conflicts ({} applied as remote wins)",
84 conflict_count,
85 conflict_count - conflicts.iter().filter(|p| {
86 matches!(resolve_lww(&p.local, &p.remote), Resolution::KeepLocal | Resolution::Skip)
87 }).count(),
88 );
89 }
90
91 total_applied += clean_count + resolved_count;
92
93 // Save cursor after each batch (crash-safe)
94 set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?;
95 cursor = new_cursor;
96
97 if !has_more {
98 break;
99 }
100 }
101
102 if total_applied > 0 {
103 debug!("Pulled and applied {} remote changes", total_applied);
104 }
105 Ok(total_applied)
106 }
107
108 /// Read unpushed changelog entries as ChangeEntry values for conflict detection.
109 async fn read_local_pending(pool: &SqlitePool) -> Result<Vec<ChangeEntry>, CoreError> {
110 let rows: Vec<(String, String, String, String, Option<String>)> = sqlx::query_as(
111 "SELECT table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC"
112 )
113 .fetch_all(pool)
114 .await
115 .map_err(CoreError::database)?;
116
117 let entries: Vec<ChangeEntry> = rows
118 .into_iter()
119 .filter_map(|(table, op, row_id, timestamp, data)| {
120 let ts = chrono::DateTime::parse_from_rfc3339(&timestamp)
121 .map(|dt| dt.with_timezone(&Utc))
122 .unwrap_or(chrono::DateTime::UNIX_EPOCH);
123
124 let change_op = ChangeOp::from_str_opt(&op)?;
125 let json_data = data.and_then(|d| serde_json::from_str(&d).ok());
126
127 Some(ChangeEntry {
128 table,
129 op: change_op,
130 row_id,
131 timestamp: ts,
132 data: json_data,
133 })
134 })
135 .collect();
136
137 Ok(entries)
138 }
139
140 /// Apply remote changes to local DB with triggers suppressed and FK enforcement off.
141 ///
142 /// FK enforcement is disabled so that tasks with `source_email_id` pointing to
143 /// emails not yet fetched locally can be inserted without error. Uses a dedicated
144 /// connection (same pattern as `crates/db-sqlite/src/migrations.rs`).
145 ///
146 /// The `applying_remote` flag is set inside a transaction on the dedicated connection.
147 /// In WAL mode, uncommitted changes are only visible to the writing connection, so
148 /// other connections (handling user edits) never see the flag and their triggers fire
149 /// normally. The flag is reset to '0' before commit so it's never globally visible as '1'.
150 ///
151 /// Uses `detach()` to prevent returning the connection to the pool with FK OFF
152 /// if pragma restoration fails. The pool will create a fresh connection (with
153 /// FK ON via connect options) to replace it.
154 pub(crate) async fn apply_remote_changes(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Result<(), CoreError> {
155 // Acquire a dedicated connection for FK pragma and transaction
156 let mut conn = pool.acquire().await.map_err(CoreError::database)?;
157
158 // FK pragma must be set outside a transaction (SQLite requirement)
159 sqlx::query("PRAGMA foreign_keys = OFF")
160 .execute(&mut *conn)
161 .await
162 .map_err(CoreError::database)?;
163
164 // Use a transaction so applying_remote is only visible to this connection (WAL isolation).
165 // Other connections see the committed value ('0') and their triggers fire normally.
166 sqlx::query("BEGIN IMMEDIATE")
167 .execute(&mut *conn)
168 .await
169 .map_err(CoreError::database)?;
170
171 sqlx::query("UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'")
172 .execute(&mut *conn)
173 .await
174 .map_err(CoreError::database)?;
175
176 let result = apply_changes_inner(&mut conn, changes).await;
177
178 // Reset flag before commit so it's never globally visible as '1'
179 let _ = sqlx::query("UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'")
180 .execute(&mut *conn)
181 .await;
182
183 if result.is_ok() {
184 if let Err(e) = sqlx::query("COMMIT").execute(&mut *conn).await {
185 let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
186 let _ = sqlx::query("PRAGMA foreign_keys = ON").execute(&mut *conn).await;
187 return Err(CoreError::database(e));
188 }
189 } else {
190 let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
191 }
192
193 // Always restore FK enforcement. If this fails, detach the connection
194 // so it doesn't return to the pool with FK OFF.
195 if let Err(e) = sqlx::query("PRAGMA foreign_keys = ON")
196 .execute(&mut *conn)
197 .await
198 {
199 conn.detach();
200 return Err(CoreError::database(e));
201 }
202
203 result
204 }
205
206 pub(crate) async fn apply_changes_inner(
207 conn: &mut SqliteConnection,
208 changes: Vec<ChangeEntry>,
209 ) -> Result<(), CoreError> {
210 // Separate upserts from deletes
211 let mut upserts: Vec<&ChangeEntry> = Vec::new();
212 let mut deletes: Vec<&ChangeEntry> = Vec::new();
213
214 for change in &changes {
215 match change.op {
216 ChangeOp::Insert | ChangeOp::Update => upserts.push(change),
217 ChangeOp::Delete => deletes.push(change),
218 }
219 }
220
221 // Apply upserts in parent-first FK order
222 for table in UPSERT_ORDER {
223 for change in &upserts {
224 if change.table == *table {
225 if let Some(ref data) = change.data {
226 apply_upsert(&mut *conn, table, &change.row_id, data).await?;
227 }
228 }
229 }
230 }
231
232 // Apply deletes in child-first FK order
233 for table in DELETE_ORDER {
234 for change in &deletes {
235 if change.table == *table {
236 apply_delete(&mut *conn, table, &change.row_id).await?;
237 }
238 }
239 }
240
241 Ok(())
242 }
243