//! Push local changes to the remote sync server. use chrono::Utc; use sqlx::SqlitePool; use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient}; use tracing::{debug, instrument, warn}; use uuid::Uuid; use crate::commands::error::ApiError; use super::PUSH_BATCH_LIMIT; #[instrument(skip_all)] pub async fn push_changes( pool: &SqlitePool, client: &SyncKitClient, device_id: Uuid, ) -> Result { let rows: Vec<(i64, String, String, String, String, Option)> = sqlx::query_as( "SELECT id, table_name, op, row_id, timestamp, data \ FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?", ) .bind(PUSH_BATCH_LIMIT) .fetch_all(pool) .await .map_err(super::db_err)?; if rows.is_empty() { return Ok(0); } // Track which changelog IDs are actually included in the push. // Entries with corrupt JSON or unknown ops are skipped — they must NOT // be marked as pushed, or their data is silently lost. let mut pushed_ids: Vec = Vec::with_capacity(rows.len()); let mut skipped_ids: Vec = Vec::new(); let changes: Vec = rows .into_iter() .filter_map(|(id, table, op, row_id, timestamp, data)| { let ts = chrono::DateTime::parse_from_rfc3339(×tamp) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|e| { warn!(raw = %timestamp, error = %e, "Malformed changelog timestamp, using now"); Utc::now() }); let json_data = data.and_then(|d| match serde_json::from_str(&d) { Ok(v) => Some(v), Err(e) => { warn!(id, error = %e, raw = %d, "Corrupt changelog JSON, skipping entry"); None } }); let change_op = match ChangeOp::from_str_opt(&op) { Some(o) => o, None => { warn!(id, op = %op, "Skipping changelog entry with unknown op"); skipped_ids.push(id); return None; } }; pushed_ids.push(id); Some(ChangeEntry { table, op: change_op, row_id, timestamp: ts, data: json_data, }) }) .collect(); if !skipped_ids.is_empty() { warn!(count = skipped_ids.len(), "Skipped unpushable changelog entries (will retry)"); } let count = changes.len() as i64; if !changes.is_empty() { client .push(device_id, changes) .await .map_err(|e| ApiError::internal(format!("push failed: {}", e)))?; } // Mark only successfully-pushed IDs (not skipped entries). // Batch into groups of 999 to stay under SQLite's variable limit. for chunk in pushed_ids.chunks(999) { let placeholders = chunk.iter().map(|_| "?").collect::>().join(","); let sql = format!( "UPDATE sync_changelog SET pushed = 1 WHERE id IN ({})", placeholders ); let mut query = sqlx::query(&sql); for id in chunk { query = query.bind(id); } query .execute(pool) .await .map_err(super::db_err)?; } debug!(count, "Pushed changes"); Ok(count) } #[cfg(test)] mod tests { use super::super::tests::*; use super::*; use synckit_client::{SyncKitClient, SyncKitConfig}; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; /// Create a SyncKitClient pointing at the mock server with auth and key set. fn mock_client(server_url: &str) -> SyncKitClient { let client = SyncKitClient::new(SyncKitConfig { server_url: server_url.to_string(), api_key: "test-key".to_string(), }); // Use a plain token (no JWT exp check when jwt_exp returns None) client.restore_session( "fake-token", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(), ); client.set_master_key_raw([42u8; 32]); client } // ── Empty changelog ── #[tokio::test] async fn push_changes_empty_changelog_returns_zero() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // No mock server needed -- push_changes returns early before HTTP let server = MockServer::start().await; let client = mock_client(&server.uri()); let device_id = Uuid::new_v4(); let result = push_changes(&pool, &client, device_id).await.unwrap(); assert_eq!(result, 0); } // ── Successful push ── #[tokio::test] async fn push_changes_sends_and_marks_pushed() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Insert changelog entries directly (bypass triggers) sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('feeds', 'INSERT', 'feed-1', datetime('now'), '{\"id\":\"feed-1\",\"name\":\"Test\"}', 0)" ) .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('user_config', 'INSERT', 'theme', datetime('now'), '{\"key\":\"theme\",\"value\":\"dark\"}', 0)" ) .execute(&pool) .await .unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/push")) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ "cursor": 42 }))) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let device_id = Uuid::new_v4(); let result = push_changes(&pool, &client, device_id).await.unwrap(); assert_eq!(result, 2); // Verify entries are marked as pushed let (unpushed,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") .fetch_one(&pool) .await .unwrap(); assert_eq!(unpushed, 0); let (pushed,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 1") .fetch_one(&pool) .await .unwrap(); assert_eq!(pushed, 2); } // ── Only unpushed entries are sent ── #[tokio::test] async fn push_changes_skips_already_pushed() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // One already pushed, one pending sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('feeds', 'INSERT', 'old', datetime('now'), '{\"id\":\"old\"}', 1)" ) .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('feeds', 'UPDATE', 'new', datetime('now'), '{\"id\":\"new\"}', 0)" ) .execute(&pool) .await .unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/push")) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ "cursor": 10 }))) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 1); } // ── Delete entries (no data) ── #[tokio::test] async fn push_changes_handles_delete_with_null_data() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('feeds', 'DELETE', 'deleted-feed', datetime('now'), NULL, 0)" ) .execute(&pool) .await .unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/push")) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ "cursor": 1 }))) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 1); } // ── Trigger-produced entries push correctly ── #[tokio::test] async fn push_changes_from_trigger_produced_entries() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Create a feed via the normal path (triggers fire, producing changelog entries) let _feed_id = create_test_feed(&pool, "Trigger Feed").await; let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/push")) .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ "cursor": 5 }))) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 1); // One INSERT from the trigger // Verify marked as pushed let (unpushed,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") .fetch_one(&pool) .await .unwrap(); assert_eq!(unpushed, 0); } // ── Server error propagates ── #[tokio::test] async fn push_changes_propagates_server_error() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \ VALUES ('feeds', 'INSERT', 'fail', datetime('now'), '{\"id\":\"fail\"}', 0)" ) .execute(&pool) .await .unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/push")) .respond_with(ResponseTemplate::new(400).set_body_string("bad request")) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = push_changes(&pool, &client, Uuid::new_v4()).await; assert!(result.is_err()); // Entries should NOT be marked as pushed on failure let (unpushed,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") .fetch_one(&pool) .await .unwrap(); assert_eq!(unpushed, 1, "entries should remain unpushed after server error"); } }