Skip to main content

max / balanced_breakfast

37.9 KB · 1104 lines History Blame Raw
1 //! Pull remote changes and apply to local DB.
2
3 use sqlx::SqlitePool;
4 use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient};
5 use tracing::{debug, instrument, warn};
6 use uuid::Uuid;
7
8 use crate::commands::error::ApiError;
9
10 use super::{get_sync_state, set_sync_state, DELETE_ORDER, UPSERT_ORDER};
11
12 #[instrument(skip_all)]
13 pub async fn pull_changes(
14 pool: &SqlitePool,
15 client: &SyncKitClient,
16 device_id: Uuid,
17 ) -> Result<i64, ApiError> {
18 let cursor_str = get_sync_state(pool, "pull_cursor").await?;
19 let mut cursor: i64 = cursor_str.parse().unwrap_or(0);
20 let mut total_applied: i64 = 0;
21
22 loop {
23 let (changes, new_cursor, has_more) = client
24 .pull(device_id, cursor)
25 .await
26 .map_err(|e| ApiError::internal(format!("pull failed: {}", e)))?;
27
28 if changes.is_empty() {
29 set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?;
30 break;
31 }
32
33 let batch_count = changes.len() as i64;
34 apply_remote_changes(pool, changes).await?;
35 total_applied += batch_count;
36
37 // Save cursor after each batch (crash-safe)
38 set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?;
39 cursor = new_cursor;
40
41 if !has_more {
42 break;
43 }
44 }
45
46 if total_applied > 0 {
47 debug!(count = total_applied, "Pulled and applied remote changes");
48 }
49 Ok(total_applied)
50 }
51
52 /// Apply remote changes to local DB with triggers suppressed.
53 ///
54 /// The `applying_remote` flag and the data changes are wrapped in a single
55 /// transaction so that a crash mid-apply rolls back everything — including
56 /// the flag — preventing a stuck flag from suppressing local changelog
57 /// entries until the next sync.
58 async fn apply_remote_changes(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Result<(), ApiError> {
59 let mut tx = pool.begin().await.map_err(super::db_err)?;
60
61 // Set flag inside the transaction
62 sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '1')")
63 .execute(&mut *tx)
64 .await
65 .map_err(super::db_err)?;
66
67 let result = apply_changes_inner_tx(&mut tx, changes).await;
68
69 // Clear the flag inside the same transaction
70 sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '0')")
71 .execute(&mut *tx)
72 .await
73 .map_err(super::db_err)?;
74
75 // If data application failed, still commit the flag clear (the partial
76 // changes are acceptable since cursor hasn't been updated yet, and
77 // re-pulling will re-apply the full batch idempotently).
78 tx.commit().await.map_err(super::db_err)?;
79
80 result
81 }
82
83 /// Apply changes within a transaction.
84 async fn apply_changes_inner_tx(
85 tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
86 changes: Vec<ChangeEntry>,
87 ) -> Result<(), ApiError> {
88 let mut upserts: Vec<&ChangeEntry> = Vec::new();
89 let mut deletes: Vec<&ChangeEntry> = Vec::new();
90
91 for change in &changes {
92 match change.op {
93 ChangeOp::Insert | ChangeOp::Update => upserts.push(change),
94 ChangeOp::Delete => deletes.push(change),
95 }
96 }
97
98 for table in UPSERT_ORDER {
99 for change in &upserts {
100 if change.table == *table {
101 if let Some(ref data) = change.data {
102 apply_upsert_exec(&mut **tx, table, &change.row_id, data).await?;
103 }
104 }
105 }
106 }
107
108 for table in DELETE_ORDER {
109 for change in &deletes {
110 if change.table == *table {
111 apply_delete_exec(&mut **tx, table, &change.row_id).await?;
112 }
113 }
114 }
115
116 Ok(())
117 }
118
119 // ── Table column whitelist ──
120
121 pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> {
122 match table {
123 "feeds" => Some(&[
124 "id", "busser_id", "name", "config", "enabled", "last_fetch",
125 "created_at", "updated_at", "consecutive_failures", "last_error",
126 "last_success_at", "circuit_broken",
127 ]),
128 "feed_tags" => Some(&["feed_id", "tag"]),
129 "query_feeds" => Some(&["id", "name", "rules", "created_at", "updated_at"]),
130 "user_config" => Some(&["key", "value"]),
131 "bookmarks" => Some(&[
132 "id", "url", "title", "description", "author", "source_name",
133 "feed_item_id", "notes", "is_pinned", "created_at", "updated_at",
134 ]),
135 "bookmark_tags" => Some(&["bookmark_id", "tag"]),
136 "feed_items" => Some(&["id", "is_read", "is_starred"]),
137 _ => None,
138 }
139 }
140
141 /// Primary key column(s) for each synced table.
142 fn pk_column(table: &str) -> &'static str {
143 match table {
144 "feed_tags" => "feed_id, tag",
145 "bookmark_tags" => "bookmark_id, tag",
146 "user_config" => "key",
147 _ => "id",
148 }
149 }
150
151 /// Apply an INSERT OR REPLACE for a remote change (pool version, used by tests).
152 #[cfg(test)]
153 async fn apply_upsert(
154 pool: &SqlitePool,
155 table: &str,
156 row_id: &str,
157 data: &serde_json::Value,
158 ) -> Result<(), ApiError> {
159 let mut conn = pool.acquire().await.map_err(super::db_err)?;
160 apply_upsert_exec(&mut *conn, table, row_id, data).await
161 }
162
163 /// Apply an INSERT OR REPLACE for a remote change on any SQLite executor.
164 async fn apply_upsert_exec(
165 conn: &mut sqlx::SqliteConnection,
166 table: &str,
167 _row_id: &str,
168 data: &serde_json::Value,
169 ) -> Result<(), ApiError> {
170 let columns = table_columns(table)
171 .ok_or_else(|| ApiError::bad_request(format!("unknown syncable table: {}", table)))?;
172
173 // feed_items are partial syncs (only user state), so use UPDATE instead of INSERT OR REPLACE
174 if table == "feed_items" {
175 let sql = "UPDATE feed_items SET is_read = ?, is_starred = ? WHERE id = ?";
176 let is_read = json_to_i32(&data["is_read"]);
177 let is_starred = json_to_i32(&data["is_starred"]);
178 let id = data["id"].as_str().unwrap_or("");
179 if id.is_empty() {
180 warn!(table, "Skipping remote change with missing ID");
181 return Ok(());
182 }
183
184 sqlx::query(sql)
185 .bind(is_read)
186 .bind(is_starred)
187 .bind(id)
188 .execute(&mut *conn)
189 .await
190 .map_err(super::db_err)?;
191 return Ok(());
192 }
193
194 // Validate that the incoming JSON has the primary key column(s).
195 // INSERT OR REPLACE in SQLite means DELETE + INSERT, so missing columns
196 // get NULL values, silently overwriting existing data.
197 let pk = pk_column(table);
198 let pk_cols: Vec<&str> = pk.split(", ").collect();
199 for pk_col in &pk_cols {
200 if data.get(*pk_col).is_none() || data[*pk_col].is_null() {
201 warn!(table, pk_col, "Skipping remote upsert with missing primary key column");
202 return Ok(());
203 }
204 }
205
206 let missing: Vec<&&str> = columns
207 .iter()
208 .filter(|col| data.get(**col).is_none() || data[**col].is_null())
209 .collect();
210
211 if !missing.is_empty() {
212 debug!(
213 table,
214 missing_columns = ?missing,
215 "Remote upsert has NULL columns (may overwrite existing data)"
216 );
217 }
218
219 let col_list = columns.join(", ");
220 let placeholders = columns.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
221 let sql = format!(
222 "INSERT OR REPLACE INTO {} ({}) VALUES ({})",
223 table, col_list, placeholders
224 );
225
226 let mut query = sqlx::query(&sql);
227
228 for col in columns {
229 let val = &data[*col];
230 match val {
231 serde_json::Value::String(s) => {
232 query = query.bind(s.as_str());
233 }
234 serde_json::Value::Number(n) => {
235 if let Some(i) = n.as_i64() {
236 query = query.bind(i);
237 } else if let Some(f) = n.as_f64() {
238 query = query.bind(f);
239 } else {
240 query = query.bind(None::<String>);
241 }
242 }
243 serde_json::Value::Bool(b) => {
244 query = query.bind(if *b { 1i32 } else { 0i32 });
245 }
246 serde_json::Value::Null => {
247 query = query.bind(None::<String>);
248 }
249 _ => {
250 // Arrays/objects: serialize as JSON string
251 query = query.bind(val.to_string());
252 }
253 }
254 }
255
256 query
257 .execute(&mut *conn)
258 .await
259 .map_err(super::db_err)?;
260
261 Ok(())
262 }
263
264 fn json_to_i32(val: &serde_json::Value) -> i32 {
265 match val {
266 serde_json::Value::Number(n) => n.as_i64().unwrap_or(0) as i32,
267 serde_json::Value::Bool(b) => if *b { 1 } else { 0 },
268 _ => 0,
269 }
270 }
271
272 /// Apply a DELETE for a remote change (pool version, used by tests).
273 #[cfg(test)]
274 async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<(), ApiError> {
275 let mut conn = pool.acquire().await.map_err(super::db_err)?;
276 apply_delete_exec(&mut *conn, table, row_id).await
277 }
278
279 /// Apply a DELETE for a remote change on any SQLite executor.
280 async fn apply_delete_exec(
281 conn: &mut sqlx::SqliteConnection,
282 table: &str,
283 row_id: &str,
284 ) -> Result<(), ApiError> {
285 if table_columns(table).is_none() {
286 return Err(ApiError::bad_request(format!("unknown syncable table: {}", table)));
287 }
288
289 // feed_items: don't delete remotely (content re-fetches from source)
290 if table == "feed_items" {
291 return Ok(());
292 }
293
294 let pk = pk_column(table);
295
296 if table == "feed_tags" || table == "bookmark_tags" {
297 // Composite PK: row_id is "uuid:tag" where uuid is 36 chars.
298 // We can't use split_once(':') because tags may contain colons (e.g. "cat:subcat").
299 // Instead, parse the first 36 characters as the ID and everything after
300 // the separator colon (char 37) as the tag.
301 if row_id.len() > 36 && row_id.as_bytes()[36] == b':' {
302 let id_val = &row_id[..36];
303 let tag = &row_id[37..];
304 let id_col = if table == "feed_tags" { "feed_id" } else { "bookmark_id" };
305 let sql = format!("DELETE FROM {} WHERE {} = ? AND tag = ?", table, id_col);
306 sqlx::query(&sql)
307 .bind(id_val)
308 .bind(tag)
309 .execute(&mut *conn)
310 .await
311 .map_err(super::db_err)?;
312 }
313 return Ok(());
314 }
315
316 let sql = format!("DELETE FROM {} WHERE {} = ?", table, pk);
317 sqlx::query(&sql)
318 .bind(row_id)
319 .execute(&mut *conn)
320 .await
321 .map_err(super::db_err)?;
322
323 Ok(())
324 }
325
326 #[cfg(test)]
327 mod tests {
328 use super::super::tests::*;
329 use super::*;
330 use serde_json::json;
331 use uuid::Uuid;
332
333 // ── apply_remote_changes (integration of ordering + suppression) ──
334
335 #[tokio::test]
336 async fn apply_remote_changes_upserts_in_parent_first_order() {
337 let pool = setup_test_db().await;
338 let feed_id = Uuid::new_v4().to_string();
339
340 // Provide both a feed and a tag referencing it. If ordering is wrong,
341 // the tag insert would fail due to FK constraint.
342 let changes = vec![
343 ChangeEntry {
344 table: "feed_tags".to_string(),
345 op: ChangeOp::Insert,
346 row_id: format!("{}:ordered", feed_id),
347 timestamp: chrono::Utc::now(),
348 data: Some(json!({"feed_id": feed_id, "tag": "ordered"})),
349 },
350 ChangeEntry {
351 table: "feeds".to_string(),
352 op: ChangeOp::Insert,
353 row_id: feed_id.clone(),
354 timestamp: chrono::Utc::now(),
355 data: Some(json!({
356 "id": feed_id,
357 "busser_id": "rss",
358 "name": "Ordered Feed",
359 "config": "{}",
360 "enabled": 1,
361 "last_fetch": null,
362 "created_at": "2024-01-01 00:00:00",
363 "updated_at": "2024-01-01 00:00:00",
364 "consecutive_failures": 0,
365 "last_error": null,
366 "last_success_at": null,
367 "circuit_broken": 0,
368 })),
369 },
370 ];
371
372 // Even though feed_tags comes first in the vec, UPSERT_ORDER ensures
373 // feeds is applied before feed_tags.
374 apply_remote_changes(&pool, changes).await.unwrap();
375
376 let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?")
377 .bind(&feed_id)
378 .fetch_one(&pool)
379 .await
380 .unwrap();
381 assert_eq!(count, 1);
382
383 let (tag_count,): (i64,) = sqlx::query_as(
384 "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'ordered'"
385 )
386 .bind(&feed_id)
387 .fetch_one(&pool)
388 .await
389 .unwrap();
390 assert_eq!(tag_count, 1);
391 }
392
393 #[tokio::test]
394 async fn apply_remote_changes_deletes_in_child_first_order() {
395 let pool = setup_test_db().await;
396
397 // Suppress triggers so we can set up data without changelog entries
398 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
399 let feed_id = create_test_feed(&pool, "Delete Order Feed").await;
400 sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'doomed')")
401 .bind(&feed_id)
402 .execute(&pool)
403 .await
404 .unwrap();
405 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
406
407 // Provide delete for both feed and tag. DELETE_ORDER ensures
408 // feed_tags is deleted before feeds (child first).
409 let changes = vec![
410 ChangeEntry {
411 table: "feeds".to_string(),
412 op: ChangeOp::Delete,
413 row_id: feed_id.clone(),
414 timestamp: chrono::Utc::now(),
415 data: None,
416 },
417 ChangeEntry {
418 table: "feed_tags".to_string(),
419 op: ChangeOp::Delete,
420 row_id: format!("{}:doomed", feed_id),
421 timestamp: chrono::Utc::now(),
422 data: None,
423 },
424 ];
425
426 apply_remote_changes(&pool, changes).await.unwrap();
427
428 let (tag_count,): (i64,) =
429 sqlx::query_as("SELECT COUNT(*) FROM feed_tags WHERE feed_id = ?")
430 .bind(&feed_id)
431 .fetch_one(&pool)
432 .await
433 .unwrap();
434 assert_eq!(tag_count, 0);
435
436 let (feed_count,): (i64,) =
437 sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?")
438 .bind(&feed_id)
439 .fetch_one(&pool)
440 .await
441 .unwrap();
442 assert_eq!(feed_count, 0);
443 }
444
445 #[tokio::test]
446 async fn apply_remote_changes_suppresses_triggers() {
447 let pool = setup_test_db().await;
448 sqlx::query("DELETE FROM sync_changelog")
449 .execute(&pool)
450 .await
451 .unwrap();
452
453 let feed_id = Uuid::new_v4().to_string();
454 let changes = vec![ChangeEntry {
455 table: "feeds".to_string(),
456 op: ChangeOp::Insert,
457 row_id: feed_id.clone(),
458 timestamp: chrono::Utc::now(),
459 data: Some(json!({
460 "id": feed_id,
461 "busser_id": "rss",
462 "name": "Remote Only",
463 "config": "{}",
464 "enabled": 1,
465 "last_fetch": null,
466 "created_at": "2024-01-01 00:00:00",
467 "updated_at": "2024-01-01 00:00:00",
468 "consecutive_failures": 0,
469 "last_error": null,
470 "last_success_at": null,
471 "circuit_broken": 0,
472 })),
473 }];
474
475 apply_remote_changes(&pool, changes).await.unwrap();
476
477 // The feed should exist but no changelog entry should have been created
478 // because apply_remote_changes sets applying_remote = "1".
479 let (changelog_count,): (i64,) = sqlx::query_as(
480 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feeds' AND row_id = ?"
481 )
482 .bind(&feed_id)
483 .fetch_one(&pool)
484 .await
485 .unwrap();
486 assert_eq!(
487 changelog_count, 0,
488 "triggers should be suppressed during remote apply"
489 );
490
491 // applying_remote flag should be cleared after apply
492 let flag = get_sync_state(&pool, "applying_remote").await.unwrap();
493 assert_eq!(flag, "0", "applying_remote flag should be reset after apply");
494 }
495
496 #[tokio::test]
497 async fn apply_remote_changes_mixed_ops() {
498 let pool = setup_test_db().await;
499
500 // Suppress triggers for setup
501 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
502 let existing_feed_id = create_test_feed(&pool, "Existing Feed").await;
503 let updatable_feed_id = create_test_feed(&pool, "Updatable Feed").await;
504 let item_id = create_test_item(&pool, &updatable_feed_id, "ext-mixed").await;
505 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
506
507 let new_feed_id = Uuid::new_v4().to_string();
508
509 let changes = vec![
510 // Insert a new feed
511 ChangeEntry {
512 table: "feeds".to_string(),
513 op: ChangeOp::Insert,
514 row_id: new_feed_id.clone(),
515 timestamp: chrono::Utc::now(),
516 data: Some(json!({
517 "id": new_feed_id,
518 "busser_id": "hn",
519 "name": "New Remote Feed",
520 "config": "{}",
521 "enabled": 1,
522 "last_fetch": null,
523 "created_at": "2024-01-01 00:00:00",
524 "updated_at": "2024-01-01 00:00:00",
525 "consecutive_failures": 0,
526 "last_error": null,
527 "last_success_at": null,
528 "circuit_broken": 0,
529 })),
530 },
531 // Update an existing feed_item's read state
532 ChangeEntry {
533 table: "feed_items".to_string(),
534 op: ChangeOp::Update,
535 row_id: item_id.clone(),
536 timestamp: chrono::Utc::now(),
537 data: Some(json!({
538 "id": item_id,
539 "is_read": 1,
540 "is_starred": 0,
541 })),
542 },
543 // Delete a different feed (not the one with the item)
544 ChangeEntry {
545 table: "feeds".to_string(),
546 op: ChangeOp::Delete,
547 row_id: existing_feed_id.clone(),
548 timestamp: chrono::Utc::now(),
549 data: None,
550 },
551 ];
552
553 apply_remote_changes(&pool, changes).await.unwrap();
554
555 // New feed exists
556 let (new_count,): (i64,) =
557 sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?")
558 .bind(&new_feed_id)
559 .fetch_one(&pool)
560 .await
561 .unwrap();
562 assert_eq!(new_count, 1);
563
564 // Item is_read updated (item's parent feed was not deleted)
565 let (is_read,): (i32,) =
566 sqlx::query_as("SELECT is_read FROM feed_items WHERE id = ?")
567 .bind(&item_id)
568 .fetch_one(&pool)
569 .await
570 .unwrap();
571 assert_eq!(is_read, 1);
572
573 // Existing feed deleted
574 let (old_count,): (i64,) =
575 sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?")
576 .bind(&existing_feed_id)
577 .fetch_one(&pool)
578 .await
579 .unwrap();
580 assert_eq!(old_count, 0);
581 }
582
583 #[tokio::test]
584 async fn apply_remote_changes_empty_is_no_op() {
585 let pool = setup_test_db().await;
586 apply_remote_changes(&pool, vec![]).await.unwrap();
587
588 // applying_remote flag should be cleared
589 let flag = get_sync_state(&pool, "applying_remote").await.unwrap();
590 assert_eq!(flag, "0");
591 }
592
593 // ── pull_changes with wiremock ──
594
595 mod pull {
596 use super::*;
597 use synckit_client::{SyncKitClient, SyncKitConfig};
598 use wiremock::matchers::{method, path};
599 use wiremock::{Mock, MockServer, ResponseTemplate};
600
601 const TEST_KEY: [u8; 32] = [42u8; 32];
602
603 fn mock_client(server_url: &str) -> SyncKitClient {
604 let client = SyncKitClient::new(SyncKitConfig {
605 server_url: server_url.to_string(),
606 api_key: "test-key".to_string(),
607 });
608 client.restore_session(
609 "fake-token",
610 Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
611 Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(),
612 );
613 client.set_master_key_raw(TEST_KEY);
614 client
615 }
616
617 /// Encrypt a JSON value with the test key for mock responses.
618 fn encrypt_for_mock(value: &serde_json::Value) -> serde_json::Value {
619 synckit_client::crypto::encrypt_json(value, &TEST_KEY).unwrap()
620 }
621
622 #[tokio::test]
623 async fn pull_changes_no_remote_changes() {
624 let pool = setup_test_db().await;
625 set_sync_state(&pool, "pull_cursor", "0").await.unwrap();
626
627 let server = MockServer::start().await;
628 Mock::given(method("POST"))
629 .and(path("/api/v1/sync/pull"))
630 .respond_with(
631 ResponseTemplate::new(200).set_body_json(json!({
632 "changes": [],
633 "cursor": 0,
634 "has_more": false
635 })),
636 )
637 .expect(1)
638 .mount(&server)
639 .await;
640
641 let client = mock_client(&server.uri());
642 let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
643 assert_eq!(result, 0);
644 }
645
646 #[tokio::test]
647 async fn pull_changes_applies_remote_feed_insert() {
648 let pool = setup_test_db().await;
649 set_sync_state(&pool, "pull_cursor", "0").await.unwrap();
650
651 let feed_id = Uuid::new_v4().to_string();
652 let device_id = Uuid::new_v4();
653
654 let feed_data = json!({
655 "id": feed_id,
656 "busser_id": "rss",
657 "name": "Pulled Feed",
658 "config": "{}",
659 "enabled": 1,
660 "last_fetch": null,
661 "created_at": "2024-06-01 00:00:00",
662 "updated_at": "2024-06-01 00:00:00",
663 "consecutive_failures": 0,
664 "last_error": null,
665 "last_success_at": null,
666 "circuit_broken": 0,
667 });
668
669 let server = MockServer::start().await;
670 Mock::given(method("POST"))
671 .and(path("/api/v1/sync/pull"))
672 .respond_with(
673 ResponseTemplate::new(200).set_body_json(json!({
674 "changes": [{
675 "seq": 1,
676 "device_id": device_id.to_string(),
677 "table": "feeds",
678 "op": "INSERT",
679 "row_id": feed_id,
680 "timestamp": "2024-06-01T00:00:00Z",
681 "data": encrypt_for_mock(&feed_data),
682 }],
683 "cursor": 1,
684 "has_more": false
685 })),
686 )
687 .expect(1)
688 .mount(&server)
689 .await;
690
691 let client = mock_client(&server.uri());
692 let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
693 assert_eq!(result, 1);
694
695 // Verify the feed was inserted into local DB
696 let (name,): (String,) =
697 sqlx::query_as("SELECT name FROM feeds WHERE id = ?")
698 .bind(&feed_id)
699 .fetch_one(&pool)
700 .await
701 .unwrap();
702 assert_eq!(name, "Pulled Feed");
703
704 // Verify cursor was updated
705 let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap();
706 assert_eq!(cursor, "1");
707 }
708
709 #[tokio::test]
710 async fn pull_changes_updates_cursor_on_empty_response() {
711 let pool = setup_test_db().await;
712 set_sync_state(&pool, "pull_cursor", "5").await.unwrap();
713
714 let server = MockServer::start().await;
715 Mock::given(method("POST"))
716 .and(path("/api/v1/sync/pull"))
717 .respond_with(
718 ResponseTemplate::new(200).set_body_json(json!({
719 "changes": [],
720 "cursor": 10,
721 "has_more": false
722 })),
723 )
724 .expect(1)
725 .mount(&server)
726 .await;
727
728 let client = mock_client(&server.uri());
729 let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
730 assert_eq!(result, 0);
731
732 // Cursor should be updated even with no changes
733 let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap();
734 assert_eq!(cursor, "10");
735 }
736
737 #[tokio::test]
738 async fn pull_changes_handles_has_more_pagination() {
739 let pool = setup_test_db().await;
740 set_sync_state(&pool, "pull_cursor", "0").await.unwrap();
741
742 let feed_id_1 = Uuid::new_v4().to_string();
743 let feed_id_2 = Uuid::new_v4().to_string();
744 let device_id = Uuid::new_v4();
745
746 let feed_data_1 = json!({
747 "id": feed_id_1,
748 "busser_id": "rss",
749 "name": "Page 1 Feed",
750 "config": "{}",
751 "enabled": 1,
752 "last_fetch": null,
753 "created_at": "2024-06-01 00:00:00",
754 "updated_at": "2024-06-01 00:00:00",
755 "consecutive_failures": 0,
756 "last_error": null,
757 "last_success_at": null,
758 "circuit_broken": 0,
759 });
760
761 let feed_data_2 = json!({
762 "id": feed_id_2,
763 "busser_id": "rss",
764 "name": "Page 2 Feed",
765 "config": "{}",
766 "enabled": 1,
767 "last_fetch": null,
768 "created_at": "2024-06-01 00:00:00",
769 "updated_at": "2024-06-01 00:00:00",
770 "consecutive_failures": 0,
771 "last_error": null,
772 "last_success_at": null,
773 "circuit_broken": 0,
774 });
775
776 let server = MockServer::start().await;
777
778 // First page: has_more = true
779 Mock::given(method("POST"))
780 .and(path("/api/v1/sync/pull"))
781 .respond_with(
782 ResponseTemplate::new(200).set_body_json(json!({
783 "changes": [{
784 "seq": 1,
785 "device_id": device_id.to_string(),
786 "table": "feeds",
787 "op": "INSERT",
788 "row_id": feed_id_1,
789 "timestamp": "2024-06-01T00:00:00Z",
790 "data": encrypt_for_mock(&feed_data_1),
791 }],
792 "cursor": 1,
793 "has_more": true
794 })),
795 )
796 .up_to_n_times(1)
797 .mount(&server)
798 .await;
799
800 // Second page: has_more = false
801 Mock::given(method("POST"))
802 .and(path("/api/v1/sync/pull"))
803 .respond_with(
804 ResponseTemplate::new(200).set_body_json(json!({
805 "changes": [{
806 "seq": 2,
807 "device_id": device_id.to_string(),
808 "table": "feeds",
809 "op": "INSERT",
810 "row_id": feed_id_2,
811 "timestamp": "2024-06-01T00:00:01Z",
812 "data": encrypt_for_mock(&feed_data_2),
813 }],
814 "cursor": 2,
815 "has_more": false
816 })),
817 )
818 .up_to_n_times(1)
819 .mount(&server)
820 .await;
821
822 let client = mock_client(&server.uri());
823 let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
824 assert_eq!(result, 2);
825
826 // Both feeds should exist
827 let (count,): (i64,) =
828 sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id IN (?, ?)")
829 .bind(&feed_id_1)
830 .bind(&feed_id_2)
831 .fetch_one(&pool)
832 .await
833 .unwrap();
834 assert_eq!(count, 2);
835
836 // Cursor should be at the final value
837 let cursor = get_sync_state(&pool, "pull_cursor").await.unwrap();
838 assert_eq!(cursor, "2");
839 }
840
841 #[tokio::test]
842 async fn pull_changes_applies_delete() {
843 let pool = setup_test_db().await;
844 set_sync_state(&pool, "pull_cursor", "0").await.unwrap();
845
846 // Suppress triggers and create a feed to delete
847 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
848 let feed_id = create_test_feed(&pool, "Soon Deleted").await;
849 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
850
851 let device_id = Uuid::new_v4();
852
853 let server = MockServer::start().await;
854 Mock::given(method("POST"))
855 .and(path("/api/v1/sync/pull"))
856 .respond_with(
857 ResponseTemplate::new(200).set_body_json(json!({
858 "changes": [{
859 "seq": 1,
860 "device_id": device_id.to_string(),
861 "table": "feeds",
862 "op": "DELETE",
863 "row_id": feed_id,
864 "timestamp": "2024-06-01T00:00:00Z",
865 }],
866 "cursor": 1,
867 "has_more": false
868 })),
869 )
870 .expect(1)
871 .mount(&server)
872 .await;
873
874 let client = mock_client(&server.uri());
875 let result = pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
876 assert_eq!(result, 1);
877
878 let (count,): (i64,) =
879 sqlx::query_as("SELECT COUNT(*) FROM feeds WHERE id = ?")
880 .bind(&feed_id)
881 .fetch_one(&pool)
882 .await
883 .unwrap();
884 assert_eq!(count, 0, "feed should be deleted after pull");
885 }
886
887 #[tokio::test]
888 async fn pull_changes_does_not_create_changelog_entries() {
889 let pool = setup_test_db().await;
890 set_sync_state(&pool, "pull_cursor", "0").await.unwrap();
891 sqlx::query("DELETE FROM sync_changelog")
892 .execute(&pool)
893 .await
894 .unwrap();
895
896 let feed_id = Uuid::new_v4().to_string();
897 let device_id = Uuid::new_v4();
898
899 let feed_data = json!({
900 "id": feed_id,
901 "busser_id": "rss",
902 "name": "No Changelog Feed",
903 "config": "{}",
904 "enabled": 1,
905 "last_fetch": null,
906 "created_at": "2024-06-01 00:00:00",
907 "updated_at": "2024-06-01 00:00:00",
908 "consecutive_failures": 0,
909 "last_error": null,
910 "last_success_at": null,
911 "circuit_broken": 0,
912 });
913
914 let server = MockServer::start().await;
915 Mock::given(method("POST"))
916 .and(path("/api/v1/sync/pull"))
917 .respond_with(
918 ResponseTemplate::new(200).set_body_json(json!({
919 "changes": [{
920 "seq": 1,
921 "device_id": device_id.to_string(),
922 "table": "feeds",
923 "op": "INSERT",
924 "row_id": feed_id,
925 "timestamp": "2024-06-01T00:00:00Z",
926 "data": encrypt_for_mock(&feed_data),
927 }],
928 "cursor": 1,
929 "has_more": false
930 })),
931 )
932 .expect(1)
933 .mount(&server)
934 .await;
935
936 let client = mock_client(&server.uri());
937 pull_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
938
939 // No sync_changelog entries should be created (triggers suppressed)
940 let (count,): (i64,) =
941 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
942 .fetch_one(&pool)
943 .await
944 .unwrap();
945 assert_eq!(
946 count, 0,
947 "pulled changes should not create local changelog entries"
948 );
949 }
950 }
951
952 // ── apply_upsert ──
953
954 #[tokio::test]
955 async fn apply_upsert_inserts_feed() {
956 let pool = setup_test_db().await;
957 let feed_id = Uuid::new_v4().to_string();
958
959 let data = json!({
960 "id": feed_id,
961 "busser_id": "rss",
962 "name": "Remote Feed",
963 "config": "{}",
964 "enabled": 1,
965 "last_fetch": null,
966 "created_at": "2024-01-01 00:00:00",
967 "updated_at": "2024-01-01 00:00:00",
968 "consecutive_failures": 0,
969 "last_error": null,
970 "last_success_at": null,
971 "circuit_broken": 0,
972 });
973
974 apply_upsert(&pool, "feeds", &feed_id, &data).await.unwrap();
975
976 let row: (String,) = sqlx::query_as("SELECT name FROM feeds WHERE id = ?")
977 .bind(&feed_id)
978 .fetch_one(&pool)
979 .await
980 .unwrap();
981 assert_eq!(row.0, "Remote Feed");
982 }
983
984 #[tokio::test]
985 async fn apply_upsert_feed_items_partial_update() {
986 let pool = setup_test_db().await;
987 let feed_id = create_test_feed(&pool, "Partial Feed").await;
988 let item_id = create_test_item(&pool, &feed_id, "ext-partial").await;
989
990 // BB's unique pattern: feed_items use UPDATE not INSERT OR REPLACE
991 let data = json!({
992 "id": item_id,
993 "is_read": 1,
994 "is_starred": 1,
995 });
996
997 apply_upsert(&pool, "feed_items", &item_id, &data).await.unwrap();
998
999 // Other columns should be preserved
1000 let row: (String, i32, i32) = sqlx::query_as(
1001 "SELECT title, is_read, is_starred FROM feed_items WHERE id = ?"
1002 )
1003 .bind(&item_id)
1004 .fetch_one(&pool)
1005 .await
1006 .unwrap();
1007 assert_eq!(row.0, "title"); // Original value preserved
1008 assert_eq!(row.1, 1); // Updated
1009 assert_eq!(row.2, 1); // Updated
1010 }
1011
1012 // ── apply_delete ──
1013
1014 #[tokio::test]
1015 async fn apply_delete_feed_tags_composite_pk() {
1016 let pool = setup_test_db().await;
1017 let feed_id = create_test_feed(&pool, "Tag Delete Feed").await;
1018 sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'delete-me')")
1019 .bind(&feed_id)
1020 .execute(&pool)
1021 .await
1022 .unwrap();
1023
1024 apply_delete(&pool, "feed_tags", &format!("{}:delete-me", feed_id)).await.unwrap();
1025
1026 let count: (i64,) = sqlx::query_as(
1027 "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'delete-me'"
1028 )
1029 .bind(&feed_id)
1030 .fetch_one(&pool)
1031 .await
1032 .unwrap();
1033 assert_eq!(count.0, 0);
1034 }
1035
1036 #[tokio::test]
1037 async fn apply_delete_feed_tags_with_colon_in_tag() {
1038 let pool = setup_test_db().await;
1039 let feed_id = create_test_feed(&pool, "Colon Tag Feed").await;
1040
1041 // Insert a tag that contains a colon
1042 sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'cat:subcat')")
1043 .bind(&feed_id)
1044 .execute(&pool)
1045 .await
1046 .unwrap();
1047
1048 // The row_id format is "feed_id:tag", so with a colon in the tag
1049 // it becomes "uuid:cat:subcat"
1050 let row_id = format!("{}:cat:subcat", feed_id);
1051 apply_delete(&pool, "feed_tags", &row_id).await.unwrap();
1052
1053 let count: (i64,) = sqlx::query_as(
1054 "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'cat:subcat'"
1055 )
1056 .bind(&feed_id)
1057 .fetch_one(&pool)
1058 .await
1059 .unwrap();
1060 assert_eq!(count.0, 0, "tag with colon should be deleted");
1061 }
1062
1063 #[tokio::test]
1064 async fn apply_delete_feed_tags_simple_tag() {
1065 let pool = setup_test_db().await;
1066 let feed_id = create_test_feed(&pool, "Simple Tag Feed").await;
1067
1068 sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'tech')")
1069 .bind(&feed_id)
1070 .execute(&pool)
1071 .await
1072 .unwrap();
1073
1074 let row_id = format!("{}:tech", feed_id);
1075 apply_delete(&pool, "feed_tags", &row_id).await.unwrap();
1076
1077 let count: (i64,) = sqlx::query_as(
1078 "SELECT COUNT(*) FROM feed_tags WHERE feed_id = ? AND tag = 'tech'"
1079 )
1080 .bind(&feed_id)
1081 .fetch_one(&pool)
1082 .await
1083 .unwrap();
1084 assert_eq!(count.0, 0, "simple tag should be deleted");
1085 }
1086
1087 #[tokio::test]
1088 async fn apply_delete_feed_items_is_no_op() {
1089 let pool = setup_test_db().await;
1090 let feed_id = create_test_feed(&pool, "No-op Delete Feed").await;
1091 let item_id = create_test_item(&pool, &feed_id, "ext-no-op").await;
1092
1093 // BB skips feed_items deletes (content re-fetches from source)
1094 apply_delete(&pool, "feed_items", &item_id).await.unwrap();
1095
1096 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE id = ?")
1097 .bind(&item_id)
1098 .fetch_one(&pool)
1099 .await
1100 .unwrap();
1101 assert_eq!(count.0, 1);
1102 }
1103 }
1104