//! Push local changes to the remote sync server. use chrono::Utc; use goingson_core::CoreError; use sqlx::SqlitePool; use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient}; use tracing::{debug, warn}; use uuid::Uuid; use super::PUSH_BATCH_LIMIT; pub async fn push_changes( pool: &SqlitePool, client: &SyncKitClient, device_id: Uuid, ) -> Result { let mut total_pushed: i64 = 0; loop { let rows: Vec<(i64, String, String, String, String, Option)> = sqlx::query_as( "SELECT id, table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?" ) .bind(PUSH_BATCH_LIMIT) .fetch_all(pool) .await .map_err(CoreError::database)?; if rows.is_empty() { break; } let row_count = rows.len() as i64; let mut pushed_ids: Vec = Vec::new(); let changes: Vec = rows .into_iter() .filter_map(|(id, table, op, row_id, timestamp, data)| { // Always mark as pushed (even unknown ops) to avoid infinite re-fetch pushed_ids.push(id); let ts = chrono::DateTime::parse_from_rfc3339(×tamp) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or(chrono::DateTime::UNIX_EPOCH); let json_data = data.and_then(|d| serde_json::from_str(&d).ok()); let change_op = match ChangeOp::from_str_opt(&op) { Some(o) => o, None => { warn!("Skipping changelog entry with unknown op: {}", op); return None; } }; Some(ChangeEntry { table, op: change_op, row_id, timestamp: ts, data: json_data, }) }) .collect(); let count = changes.len() as i64; let is_last_batch = row_count < PUSH_BATCH_LIMIT; client .push(device_id, changes) .await .map_err(|e| CoreError::sync(format!("push failed: {}", e)))?; // Mark pushed entries in a single transaction let mut tx = pool.begin().await.map_err(CoreError::database)?; for id in &pushed_ids { sqlx::query("UPDATE sync_changelog SET pushed = 1 WHERE id = ?") .bind(id) .execute(&mut *tx) .await .map_err(CoreError::database)?; } tx.commit().await.map_err(CoreError::database)?; total_pushed += count; if is_last_batch { break; } } if total_pushed > 0 { debug!("Pushed {} changes", total_pushed); } Ok(total_pushed) }