//! Pull remote changes and apply them to the local database. use chrono::Utc; use goingson_core::CoreError; use sqlx::{SqliteConnection, SqlitePool}; use synckit_client::{ ChangeEntry, ChangeOp, Resolution, SyncKitClient, detect_conflicts, resolve_lww, }; use tracing::debug; use uuid::Uuid; use super::state::{get_sync_state, set_sync_state}; use super::{UPSERT_ORDER, DELETE_ORDER}; use super::apply::{apply_upsert, apply_delete}; pub async fn pull_changes( pool: &SqlitePool, client: &SyncKitClient, device_id: Uuid, ) -> Result { let cursor_str = get_sync_state(pool, "pull_cursor").await?; let mut cursor: i64 = cursor_str.parse().unwrap_or(0); let mut total_applied: i64 = 0; // Read local pending changes once (unpushed changelog entries) let local_pending = read_local_pending(pool).await?; loop { let (changes, new_cursor, has_more) = client .pull_rich(device_id, cursor) .await .map_err(|e| CoreError::sync(format!("pull failed: {}", e)))?; if changes.is_empty() { set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?; break; } // Detect conflicts between remote changes and local pending let (clean, conflicts) = detect_conflicts(changes, &local_pending, device_id); // Apply non-conflicting changes directly let clean_count = clean.len() as i64; if !clean.is_empty() { let clean_entries: Vec = clean.into_iter().map(|p| p.entry).collect(); apply_remote_changes(pool, clean_entries).await?; } // Resolve conflicts with LWW let mut resolved_count: i64 = 0; if !conflicts.is_empty() { let conflict_count = conflicts.len(); let mut resolved_entries: Vec = Vec::new(); for pair in &conflicts { match resolve_lww(&pair.local, &pair.remote) { Resolution::KeepRemote => { resolved_entries.push(pair.remote.entry.clone()); } Resolution::KeepLocal => { // Skip — local version will push on next cycle } Resolution::Merged(data) => { // Apply merged data using the remote entry as template let mut merged = pair.remote.entry.clone(); merged.data = Some(data); resolved_entries.push(merged); } Resolution::Skip => { // Skip both } } } resolved_count = resolved_entries.len() as i64; if !resolved_entries.is_empty() { apply_remote_changes(pool, resolved_entries).await?; } debug!( "Resolved {} conflicts ({} applied as remote wins)", conflict_count, conflict_count - conflicts.iter().filter(|p| { matches!(resolve_lww(&p.local, &p.remote), Resolution::KeepLocal | Resolution::Skip) }).count(), ); } total_applied += clean_count + resolved_count; // Save cursor after each batch (crash-safe) set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?; cursor = new_cursor; if !has_more { break; } } if total_applied > 0 { debug!("Pulled and applied {} remote changes", total_applied); } Ok(total_applied) } /// Read unpushed changelog entries as ChangeEntry values for conflict detection. async fn read_local_pending(pool: &SqlitePool) -> Result, CoreError> { let rows: Vec<(String, String, String, String, Option)> = sqlx::query_as( "SELECT table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC" ) .fetch_all(pool) .await .map_err(CoreError::database)?; let entries: Vec = rows .into_iter() .filter_map(|(table, op, row_id, timestamp, data)| { let ts = chrono::DateTime::parse_from_rfc3339(×tamp) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or(chrono::DateTime::UNIX_EPOCH); let change_op = ChangeOp::from_str_opt(&op)?; let json_data = data.and_then(|d| serde_json::from_str(&d).ok()); Some(ChangeEntry { table, op: change_op, row_id, timestamp: ts, data: json_data, }) }) .collect(); Ok(entries) } /// Apply remote changes to local DB with triggers suppressed and FK enforcement off. /// /// FK enforcement is disabled so that tasks with `source_email_id` pointing to /// emails not yet fetched locally can be inserted without error. Uses a dedicated /// connection (same pattern as `crates/db-sqlite/src/migrations.rs`). /// /// The `applying_remote` flag is set inside a transaction on the dedicated connection. /// In WAL mode, uncommitted changes are only visible to the writing connection, so /// other connections (handling user edits) never see the flag and their triggers fire /// normally. The flag is reset to '0' before commit so it's never globally visible as '1'. /// /// Uses `detach()` to prevent returning the connection to the pool with FK OFF /// if pragma restoration fails. The pool will create a fresh connection (with /// FK ON via connect options) to replace it. pub(crate) async fn apply_remote_changes(pool: &SqlitePool, changes: Vec) -> Result<(), CoreError> { // Acquire a dedicated connection for FK pragma and transaction let mut conn = pool.acquire().await.map_err(CoreError::database)?; // FK pragma must be set outside a transaction (SQLite requirement) sqlx::query("PRAGMA foreign_keys = OFF") .execute(&mut *conn) .await .map_err(CoreError::database)?; // Use a transaction so applying_remote is only visible to this connection (WAL isolation). // Other connections see the committed value ('0') and their triggers fire normally. sqlx::query("BEGIN IMMEDIATE") .execute(&mut *conn) .await .map_err(CoreError::database)?; sqlx::query("UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'") .execute(&mut *conn) .await .map_err(CoreError::database)?; let result = apply_changes_inner(&mut conn, changes).await; // Reset flag before commit so it's never globally visible as '1' let _ = sqlx::query("UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'") .execute(&mut *conn) .await; if result.is_ok() { if let Err(e) = sqlx::query("COMMIT").execute(&mut *conn).await { let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await; let _ = sqlx::query("PRAGMA foreign_keys = ON").execute(&mut *conn).await; return Err(CoreError::database(e)); } } else { let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await; } // Always restore FK enforcement. If this fails, detach the connection // so it doesn't return to the pool with FK OFF. if let Err(e) = sqlx::query("PRAGMA foreign_keys = ON") .execute(&mut *conn) .await { conn.detach(); return Err(CoreError::database(e)); } result } pub(crate) async fn apply_changes_inner( conn: &mut SqliteConnection, changes: Vec, ) -> Result<(), CoreError> { // Separate upserts from deletes let mut upserts: Vec<&ChangeEntry> = Vec::new(); let mut deletes: Vec<&ChangeEntry> = Vec::new(); for change in &changes { match change.op { ChangeOp::Insert | ChangeOp::Update => upserts.push(change), ChangeOp::Delete => deletes.push(change), } } // Apply upserts in parent-first FK order for table in UPSERT_ORDER { for change in &upserts { if change.table == *table { if let Some(ref data) = change.data { apply_upsert(&mut *conn, table, &change.row_id, data).await?; } } } } // Apply deletes in child-first FK order for table in DELETE_ORDER { for change in &deletes { if change.table == *table { apply_delete(&mut *conn, table, &change.row_id).await?; } } } Ok(()) }