//! Pull remote changes and apply to local DB. use sqlx::SqlitePool; use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient}; use tracing::{debug, instrument, warn}; use uuid::Uuid; use crate::commands::error::ApiError; use super::{get_sync_state, set_sync_state, DELETE_ORDER, UPSERT_ORDER}; #[instrument(skip_all)] pub async fn pull_changes( pool: &SqlitePool, client: &SyncKitClient, device_id: Uuid, ) -> Result { let cursor_str = get_sync_state(pool, "pull_cursor").await?; let mut cursor: i64 = cursor_str.parse().unwrap_or(0); let mut total_applied: i64 = 0; loop { let (changes, new_cursor, has_more) = client .pull(device_id, cursor) .await .map_err(|e| ApiError::internal(format!("pull failed: {}", e)))?; if changes.is_empty() { set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?; break; } let batch_count = changes.len() as i64; apply_remote_changes(pool, changes).await?; total_applied += batch_count; // Save cursor after each batch (crash-safe) set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?; cursor = new_cursor; if !has_more { break; } } if total_applied > 0 { debug!(count = total_applied, "Pulled and applied remote changes"); } Ok(total_applied) } /// Apply remote changes to local DB with triggers suppressed. /// /// The `applying_remote` flag and the data changes are wrapped in a single /// transaction so that a crash mid-apply rolls back everything — including /// the flag — preventing a stuck flag from suppressing local changelog /// entries until the next sync. async fn apply_remote_changes(pool: &SqlitePool, changes: Vec) -> Result<(), ApiError> { let mut tx = pool.begin().await.map_err(super::db_err)?; // Set flag inside the transaction sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '1')") .execute(&mut *tx) .await .map_err(super::db_err)?; let result = apply_changes_inner_tx(&mut tx, changes).await; // Clear the flag inside the same transaction sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '0')") .execute(&mut *tx) .await .map_err(super::db_err)?; // If data application failed, still commit the flag clear (the partial // changes are acceptable since cursor hasn't been updated yet, and // re-pulling will re-apply the full batch idempotently). tx.commit().await.map_err(super::db_err)?; result } /// Apply changes within a transaction. async fn apply_changes_inner_tx( tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, changes: Vec, ) -> Result<(), ApiError> { let mut upserts: Vec<&ChangeEntry> = Vec::new(); let mut deletes: Vec<&ChangeEntry> = Vec::new(); for change in &changes { match change.op { ChangeOp::Insert | ChangeOp::Update => upserts.push(change), ChangeOp::Delete => deletes.push(change), } } for table in UPSERT_ORDER { for change in &upserts { if change.table == *table { if let Some(ref data) = change.data { apply_upsert_exec(&mut **tx, table, &change.row_id, data).await?; } } } } for table in DELETE_ORDER { for change in &deletes { if change.table == *table { apply_delete_exec(&mut **tx, table, &change.row_id).await?; } } } Ok(()) } // ── Table column whitelist ── pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> { match table { "feeds" => Some(&[ "id", "busser_id", "name", "config", "enabled", "last_fetch", "created_at", "updated_at", "consecutive_failures", "last_error", "last_success_at", "circuit_broken", ]), "feed_tags" => Some(&["feed_id", "tag"]), "query_feeds" => Some(&["id", "name", "rules", "created_at", "updated_at"]), "user_config" => Some(&["key", "value"]), "bookmarks" => Some(&[ "id", "url", "title", "description", "author", "source_name", "feed_item_id", "notes", "is_pinned", "created_at", "updated_at", ]), "bookmark_tags" => Some(&["bookmark_id", "tag"]), "feed_items" => Some(&["id", "is_read", "is_starred"]), _ => None, } } /// Primary key column(s) for each synced table. fn pk_column(table: &str) -> &'static str { match table { "feed_tags" => "feed_id, tag", "bookmark_tags" => "bookmark_id, tag", "user_config" => "key", _ => "id", } } /// Apply an INSERT OR REPLACE for a remote change (pool version, used by tests). #[cfg(test)] async fn apply_upsert( pool: &SqlitePool, table: &str, row_id: &str, data: &serde_json::Value, ) -> Result<(), ApiError> { let mut conn = pool.acquire().await.map_err(super::db_err)?; apply_upsert_exec(&mut *conn, table, row_id, data).await } /// Apply an INSERT OR REPLACE for a remote change on any SQLite executor. async fn apply_upsert_exec( conn: &mut sqlx::SqliteConnection, table: &str, _row_id: &str, data: &serde_json::Value, ) -> Result<(), ApiError> { let columns = table_columns(table) .ok_or_else(|| ApiError::bad_request(format!("unknown syncable table: {}", table)))?; // feed_items are partial syncs (only user state), so use UPDATE instead of INSERT OR REPLACE if table == "feed_items" { let sql = "UPDATE feed_items SET is_read = ?, is_starred = ? WHERE id = ?"; let is_read = json_to_i32(&data["is_read"]); let is_starred = json_to_i32(&data["is_starred"]); let id = data["id"].as_str().unwrap_or(""); if id.is_empty() { warn!(table, "Skipping remote change with missing ID"); return Ok(()); } sqlx::query(sql) .bind(is_read) .bind(is_starred) .bind(id) .execute(&mut *conn) .await .map_err(super::db_err)?; return Ok(()); } // Validate that the incoming JSON has the primary key column(s). // INSERT OR REPLACE in SQLite means DELETE + INSERT, so missing columns // get NULL values, silently overwriting existing data. let pk = pk_column(table); let pk_cols: Vec<&str> = pk.split(", ").collect(); for pk_col in &pk_cols { if data.get(*pk_col).is_none() || data[*pk_col].is_null() { warn!(table, pk_col, "Skipping remote upsert with missing primary key column"); return Ok(()); } } let missing: Vec<&&str> = columns .iter() .filter(|col| data.get(**col).is_none() || data[**col].is_null()) .collect(); if !missing.is_empty() { debug!( table, missing_columns = ?missing, "Remote upsert has NULL columns (may overwrite existing data)" ); } let col_list = columns.join(", "); let placeholders = columns.iter().map(|_| "?").collect::>().join(", "); let sql = format!( "INSERT OR REPLACE INTO {} ({}) VALUES ({})", table, col_list, placeholders ); let mut query = sqlx::query(&sql); for col in columns { let val = &data[*col]; match val { serde_json::Value::String(s) => { query = query.bind(s.as_str()); } serde_json::Value::Number(n) => { if let Some(i) = n.as_i64() { query = query.bind(i); } else if let Some(f) = n.as_f64() { query = query.bind(f); } else { query = query.bind(None::); } } serde_json::Value::Bool(b) => { query = query.bind(if *b { 1i32 } else { 0i32 }); } serde_json::Value::Null => { query = query.bind(None::); } _ => { // Arrays/objects: serialize as JSON string query = query.bind(val.to_string()); } } } query .execute(&mut *conn) .await .map_err(super::db_err)?; Ok(()) } fn json_to_i32(val: &serde_json::Value) -> i32 { match val { serde_json::Value::Number(n) => n.as_i64().unwrap_or(0) as i32, serde_json::Value::Bool(b) => if *b { 1 } else { 0 }, _ => 0, } } /// Apply a DELETE for a remote change (pool version, used by tests). #[cfg(test)] async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<(), ApiError> { let mut conn = pool.acquire().await.map_err(super::db_err)?; apply_delete_exec(&mut *conn, table, row_id).await } /// Apply a DELETE for a remote change on any SQLite executor. async fn apply_delete_exec( conn: &mut sqlx::SqliteConnection, table: &str, row_id: &str, ) -> Result<(), ApiError> { if table_columns(table).is_none() { return Err(ApiError::bad_request(format!("unknown syncable table: {}", table))); } // feed_items: don't delete remotely (content re-fetches from source) if table == "feed_items" { return Ok(()); } let pk = pk_column(table); if table == "feed_tags" || table == "bookmark_tags" { // Composite PK: row_id is "uuid:tag" where uuid is 36 chars. // We can't use split_once(':') because tags may contain colons (e.g. "cat:subcat"). // Instead, parse the first 36 characters as the ID and everything after // the separator colon (char 37) as the tag. if row_id.len() > 36 && row_id.as_bytes()[36] == b':' { let id_val = &row_id[..36]; let tag = &row_id[37..]; let id_col = if table == "feed_tags" { "feed_id" } else { "bookmark_id" }; let sql = format!("DELETE FROM {} WHERE {} = ? AND tag = ?", table, id_col); sqlx::query(&sql) .bind(id_val) .bind(tag) .execute(&mut *conn) .await .map_err(super::db_err)?; } return Ok(()); } let sql = format!("DELETE FROM {} WHERE {} = ?", table, pk); sqlx::query(&sql) .bind(row_id) .execute(&mut *conn) .await .map_err(super::db_err)?; Ok(()) } #[cfg(test)] mod tests { use super::super::tests::*; use super::*; use serde_json::json; use uuid::Uuid; // ── apply_remote_changes (integration of ordering + suppression) ── #[tokio::test] async fn apply_remote_changes_upserts_in_parent_first_order() { let pool = setup_test_db().await; let feed_id = Uuid::new_v4().to_string(); // Provide both a feed and a tag referencing it. If ordering is wrong, // the tag insert would fail due to FK constraint. let changes = vec![ ChangeEntry { table: "feed_tags".to_string(), op: ChangeOp::Insert, row_id: format!("{}:ordered", feed_id), timestamp: chrono::Utc::now(), data: Some(json!({"feed_id": feed_id, "tag": "ordered"})), }, ChangeEntry { table: "feeds".to_string(), op: ChangeOp::Insert, row_id: feed_id.clone(), timestamp: chrono::Utc::now(), data: Some(json!({ "id": feed_id, "busser_id": "rss", "name": "Ordered Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-01-01 00:00:00", "updated_at": "2024-01-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, })), }, ]; // Even though feed_tags comes first in the vec, UPSERT_ORDER ensures // feeds is applied before feed_tags. apply_remote_changes(&pool, changes).await.unwrap(); let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count, 1); let (tag_count,): (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'ordered'" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(tag_count, 1); } #[tokio::test] async fn apply_remote_changes_deletes_in_child_first_order() { let pool = setup_test_db().await; // Suppress triggers so we can set up data without changelog entries set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let feed_id = create_test_feed(&pool, "Delete Order Feed").await; sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'doomed')") .bind(&feed_id) .execute(&pool) .await .unwrap(); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); // Provide delete for both feed and tag. DELETE_ORDER ensures // feed_tags is deleted before feeds (child first). let changes = vec![ ChangeEntry { table: "feeds".to_string(), op: ChangeOp::Delete, row_id: feed_id.clone(), timestamp: chrono::Utc::now(), data: None, }, ChangeEntry { table: "feed_tags".to_string(), op: ChangeOp::Delete, row_id: format!("{}:doomed", feed_id), timestamp: chrono::Utc::now(), data: None, }, ]; apply_remote_changes(&pool, changes).await.unwrap(); let (tag_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_tags WHERE feed_id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(tag_count, 0); let (feed_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(feed_count, 0); } #[tokio::test] async fn apply_remote_changes_suppresses_triggers() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); let feed_id = Uuid::new_v4().to_string(); let changes = vec![ChangeEntry { table: "feeds".to_string(), op: ChangeOp::Insert, row_id: feed_id.clone(), timestamp: chrono::Utc::now(), data: Some(json!({ "id": feed_id, "busser_id": "rss", "name": "Remote Only", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-01-01 00:00:00", "updated_at": "2024-01-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, })), }]; apply_remote_changes(&pool, changes).await.unwrap(); // The feed should exist but no changelog entry should have been created // because apply_remote_changes sets applying_remote = "1". let (changelog_count,): (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feeds' AND row_id = ?" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!( changelog_count, 0, "triggers should be suppressed during remote apply" ); // applying_remote flag should be cleared after apply let flag = get_sync_state(&pool, "applying_remote").await.unwrap(); assert_eq!(flag, "0", "applying_remote flag should be reset after apply"); } #[tokio::test] async fn apply_remote_changes_mixed_ops() { let pool = setup_test_db().await; // Suppress triggers for setup set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let existing_feed_id = create_test_feed(&pool, "Existing Feed").await; let updatable_feed_id = create_test_feed(&pool, "Updatable Feed").await; let item_id = create_test_item(&pool, &updatable_feed_id, "ext-mixed").await; set_sync_state(&pool, "applying_remote", "0").await.unwrap(); let new_feed_id = Uuid::new_v4().to_string(); let changes = vec![ // Insert a new feed ChangeEntry { table: "feeds".to_string(), op: ChangeOp::Insert, row_id: new_feed_id.clone(), timestamp: chrono::Utc::now(), data: Some(json!({ "id": new_feed_id, "busser_id": "hn", "name": "New Remote Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-01-01 00:00:00", "updated_at": "2024-01-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, })), }, // Update an existing feed_item's read state ChangeEntry { table: "feed_items".to_string(), op: ChangeOp::Update, row_id: item_id.clone(), timestamp: chrono::Utc::now(), data: Some(json!({ "id": item_id, "is_read": 1, "is_starred": 0, })), }, // Delete a different feed (not the one with the item) ChangeEntry { table: "feeds".to_string(), op: ChangeOp::Delete, row_id: existing_feed_id.clone(), timestamp: chrono::Utc::now(), data: None, }, ]; apply_remote_changes(&pool, changes).await.unwrap(); // New feed exists let (new_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?") .bind(&new_feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(new_count, 1); // Item is_read updated (item's parent feed was not deleted) let (is_read,): (i32,) = sqlx::query_as("SELECT is_read FROM feed_items WHERE id = ?") .bind(&item_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(is_read, 1); // Existing feed deleted let (old_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?") .bind(&existing_feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(old_count, 0); } #[tokio::test] async fn apply_remote_changes_empty_is_no_op() { let pool = setup_test_db().await; apply_remote_changes(&pool, vec![]).await.unwrap(); // applying_remote flag should be cleared let flag = get_sync_state(&pool, "applying_remote").await.unwrap(); assert_eq!(flag, "0"); } // ── pull_changes with wiremock ── mod pull { use super::*; use synckit_client::{SyncKitClient, SyncKitConfig}; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; const TEST_KEY: [u8; 32] = [42u8; 32]; fn mock_client(server_url: &str) -> SyncKitClient { let client = SyncKitClient::new(SyncKitConfig { server_url: server_url.to_string(), api_key: "test-key".to_string(), }); client.restore_session( "fake-token", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(), ); client.set_master_key_raw(TEST_KEY); client } /// Encrypt a JSON value with the test key for mock responses. fn encrypt_for_mock(value: &serde_json::Value) -> serde_json::Value { synckit_client::crypto::encrypt_json(value, &TEST_KEY).unwrap() } #[tokio::test] async fn pull_changes_no_remote_changes() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "0").await.unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [], "cursor": 0, "has_more": false })), ) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 0); } #[tokio::test] async fn pull_changes_applies_remote_feed_insert() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "0").await.unwrap(); let feed_id = Uuid::new_v4().to_string(); let device_id = Uuid::new_v4(); let feed_data = json!({ "id": feed_id, "busser_id": "rss", "name": "Pulled Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-06-01 00:00:00", "updated_at": "2024-06-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, }); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [{ "seq": 1, "device_id": device_id.to_string(), "table": "feeds", "op": "INSERT", "row_id": feed_id, "timestamp": "2024-06-01T00:00:00Z", "data": encrypt_for_mock(&feed_data), }], "cursor": 1, "has_more": false })), ) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 1); // Verify the feed was inserted into local DB let (name,): (String,) = sqlx::query_as("SELECT name FROM feeds WHERE id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(name, "Pulled Feed"); // Verify cursor was updated let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap(); assert_eq!(cursor, "1"); } #[tokio::test] async fn pull_changes_updates_cursor_on_empty_response() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "5").await.unwrap(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [], "cursor": 10, "has_more": false })), ) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 0); // Cursor should be updated even with no changes let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap(); assert_eq!(cursor, "10"); } #[tokio::test] async fn pull_changes_handles_has_more_pagination() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "0").await.unwrap(); let feed_id_1 = Uuid::new_v4().to_string(); let feed_id_2 = Uuid::new_v4().to_string(); let device_id = Uuid::new_v4(); let feed_data_1 = json!({ "id": feed_id_1, "busser_id": "rss", "name": "Page 1 Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-06-01 00:00:00", "updated_at": "2024-06-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, }); let feed_data_2 = json!({ "id": feed_id_2, "busser_id": "rss", "name": "Page 2 Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-06-01 00:00:00", "updated_at": "2024-06-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, }); let server = MockServer::start().await; // First page: has_more = true Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [{ "seq": 1, "device_id": device_id.to_string(), "table": "feeds", "op": "INSERT", "row_id": feed_id_1, "timestamp": "2024-06-01T00:00:00Z", "data": encrypt_for_mock(&feed_data_1), }], "cursor": 1, "has_more": true })), ) .up_to_n_times(1) .mount(&server) .await; // Second page: has_more = false Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [{ "seq": 2, "device_id": device_id.to_string(), "table": "feeds", "op": "INSERT", "row_id": feed_id_2, "timestamp": "2024-06-01T00:00:01Z", "data": encrypt_for_mock(&feed_data_2), }], "cursor": 2, "has_more": false })), ) .up_to_n_times(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 2); // Both feeds should exist let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id IN (?, ?)") .bind(&feed_id_1) .bind(&feed_id_2) .fetch_one(&pool) .await .unwrap(); assert_eq!(count, 2); // Cursor should be at the final value let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap(); assert_eq!(cursor, "2"); } #[tokio::test] async fn pull_changes_applies_delete() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "0").await.unwrap(); // Suppress triggers and create a feed to delete set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let feed_id = create_test_feed(&pool, "Soon Deleted").await; set_sync_state(&pool, "applying_remote", "0").await.unwrap(); let device_id = Uuid::new_v4(); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [{ "seq": 1, "device_id": device_id.to_string(), "table": "feeds", "op": "DELETE", "row_id": feed_id, "timestamp": "2024-06-01T00:00:00Z", }], "cursor": 1, "has_more": false })), ) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); assert_eq!(result, 1); let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count, 0, "feed should be deleted after pull"); } #[tokio::test] async fn pull_changes_does_not_create_changelog_entries() { let pool = setup_test_db().await; set_sync_state(&pool, "pull_cursor", "0").await.unwrap(); sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); let feed_id = Uuid::new_v4().to_string(); let device_id = Uuid::new_v4(); let feed_data = json!({ "id": feed_id, "busser_id": "rss", "name": "No Changelog Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-06-01 00:00:00", "updated_at": "2024-06-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, }); let server = MockServer::start().await; Mock::given(method("POST")) .and(path("/api/v1/sync/pull")) .respond_with( ResponseTemplate::new(200).set_body_json(json!({ "changes": [{ "seq": 1, "device_id": device_id.to_string(), "table": "feeds", "op": "INSERT", "row_id": feed_id, "timestamp": "2024-06-01T00:00:00Z", "data": encrypt_for_mock(&feed_data), }], "cursor": 1, "has_more": false })), ) .expect(1) .mount(&server) .await; let client = mock_client(&server.uri()); pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap(); // No sync_changelog entries should be created (triggers suppressed) let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!( count, 0, "pulled changes should not create local changelog entries" ); } } // ── apply_upsert ── #[tokio::test] async fn apply_upsert_inserts_feed() { let pool = setup_test_db().await; let feed_id = Uuid::new_v4().to_string(); let data = json!({ "id": feed_id, "busser_id": "rss", "name": "Remote Feed", "config": "{}", "enabled": 1, "last_fetch": null, "created_at": "2024-01-01 00:00:00", "updated_at": "2024-01-01 00:00:00", "consecutive_failures": 0, "last_error": null, "last_success_at": null, "circuit_broken": 0, }); apply_upsert(&pool, "feeds", &feed_id, &data).await.unwrap(); let row: (String,) = sqlx::query_as("SELECT name FROM feeds WHERE id = ?") .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "Remote Feed"); } #[tokio::test] async fn apply_upsert_feed_items_partial_update() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Partial Feed").await; let item_id = create_test_item(&pool, &feed_id, "ext-partial").await; // BB's unique pattern: feed_items use UPDATE not INSERT OR REPLACE let data = json!({ "id": item_id, "is_read": 1, "is_starred": 1, }); apply_upsert(&pool, "feed_items", &item_id, &data).await.unwrap(); // Other columns should be preserved let row: (String, i32, i32) = sqlx::query_as( "SELECT title, is_read, is_starred FROM feed_items WHERE id = ?" ) .bind(&item_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "title"); // Original value preserved assert_eq!(row.1, 1); // Updated assert_eq!(row.2, 1); // Updated } // ── apply_delete ── #[tokio::test] async fn apply_delete_feed_tags_composite_pk() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Tag Delete Feed").await; sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'delete-me')") .bind(&feed_id) .execute(&pool) .await .unwrap(); apply_delete(&pool, "feed_tags", &format!("{}:delete-me", feed_id)).await.unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'delete-me'" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); } #[tokio::test] async fn apply_delete_feed_tags_with_colon_in_tag() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Colon Tag Feed").await; // Insert a tag that contains a colon sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'cat:subcat')") .bind(&feed_id) .execute(&pool) .await .unwrap(); // The row_id format is "feed_id:tag", so with a colon in the tag // it becomes "uuid:cat:subcat" let row_id = format!("{}:cat:subcat", feed_id); apply_delete(&pool, "feed_tags", &row_id).await.unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'cat:subcat'" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0, "tag with colon should be deleted"); } #[tokio::test] async fn apply_delete_feed_tags_simple_tag() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Simple Tag Feed").await; sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'tech')") .bind(&feed_id) .execute(&pool) .await .unwrap(); let row_id = format!("{}:tech", feed_id); apply_delete(&pool, "feed_tags", &row_id).await.unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'tech'" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0, "simple tag should be deleted"); } #[tokio::test] async fn apply_delete_feed_items_is_no_op() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "No-op Delete Feed").await; let item_id = create_test_item(&pool, &feed_id, "ext-no-op").await; // BB skips feed_items deletes (content re-fetches from source) apply_delete(&pool, "feed_items", &item_id).await.unwrap(); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE id = ?") .bind(&item_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1); } }