//! Core sync engine: push local changes, pull remote changes, apply to DB. mod download; mod snapshot; mod upload; use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; use synckit_client::SyncKitClient; use tracing::{info, instrument}; use uuid::Uuid; use crate::commands::error::ApiError; pub use download::*; pub use snapshot::*; pub use upload::*; /// Convert a sqlx error to an ApiError, logging the full error server-side /// and returning a generic message to the frontend. Mirrors the behavior of /// `From for ApiError` in commands/error.rs. pub(crate) fn db_err(e: sqlx::Error) -> ApiError { tracing::error!(error = %e, "Sync database error"); ApiError::database("A database error occurred") } /// Maximum changes to push in a single batch. pub(crate) const PUSH_BATCH_LIMIT: i64 = 500; /// Hard cap on total changelog entries. Prevents unbounded growth when sync is /// disconnected or failing. Oldest entries (by rowid) are dropped when exceeded. pub(crate) const MAX_CHANGELOG_ENTRIES: i64 = 10_000; /// Tables in FK-safe order for upserts (parents first). pub(crate) const UPSERT_ORDER: &[&str] = &["feeds", "feed_tags", "query_feeds", "bookmarks", "bookmark_tags", "user_config", "feed_items"]; /// Tables in reverse FK-safe order for deletes (children first). pub(crate) const DELETE_ORDER: &[&str] = &["feed_items", "user_config", "bookmark_tags", "bookmarks", "query_feeds", "feed_tags", "feeds"]; /// Result of a sync operation. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SyncResult { pub pushed: i64, pub pulled: i64, } // ── sync_state helpers ── #[instrument(skip_all)] pub async fn get_sync_state(pool: &SqlitePool, key: &str) -> Result { let row: Option<(String,)> = sqlx::query_as("SELECT value FROM sync_state WHERE key = ?") .bind(key) .fetch_optional(pool) .await .map_err(db_err)?; Ok(row.map(|r| r.0).unwrap_or_default()) } #[instrument(skip_all)] pub async fn set_sync_state(pool: &SqlitePool, key: &str, value: &str) -> Result<(), ApiError> { sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES (?, ?)") .bind(key) .bind(value) .execute(pool) .await .map_err(db_err)?; Ok(()) } /// Clear all sync state (device_id, cursors, flags) for disconnect. #[instrument(skip_all)] pub async fn clear_all_sync_state(pool: &SqlitePool) -> Result<(), ApiError> { sqlx::query("DELETE FROM sync_state") .execute(pool) .await .map_err(db_err)?; sqlx::query("DELETE FROM sync_changelog") .execute(pool) .await .map_err(db_err)?; Ok(()) } // ── Device registration ── #[instrument(skip_all)] pub async fn ensure_device_registered( pool: &SqlitePool, client: &SyncKitClient, ) -> Result { let stored = get_sync_state(pool, "device_id").await?; if !stored.is_empty() { return stored .parse::() .map_err(|e| ApiError::internal(format!("invalid stored device_id: {}", e))); } let hostname = std::env::var("HOSTNAME") .or_else(|_| std::env::var("COMPUTERNAME")) .unwrap_or_else(|_| "BalancedBreakfast Desktop".to_string()); let platform = std::env::consts::OS.to_string(); let device = client .register_device(&hostname, &platform) .await .map_err(|e| ApiError::internal(format!("device registration failed: {}", e)))?; set_sync_state(pool, "device_id", &device.id.to_string()).await?; info!(device_name = %device.device_name, device_id = %device.id, "Registered device"); Ok(device.id) } // ── High-level sync ── #[instrument(skip_all)] pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result { // Clear applying_remote flag in case a previous sync crashed mid-apply. // If the flag is stuck at "1", all local changes silently skip the changelog. set_sync_state(pool, "applying_remote", "0").await?; let device_id = ensure_device_registered(pool, client).await?; // Push first, then pull let pushed = push_changes(pool, client, device_id).await?; let pulled = pull_changes(pool, client, device_id).await?; // Update last sync timestamp let now = Utc::now().to_rfc3339(); set_sync_state(pool, "last_sync_at", &now).await?; Ok(SyncResult { pushed, pulled }) } #[cfg(test)] mod tests { use super::*; use uuid::Uuid; pub(crate) async fn setup_test_db() -> SqlitePool { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); sqlx::migrate!("../migrations/sqlite") .run(&pool) .await .unwrap(); pool } pub(crate) async fn create_test_feed(pool: &SqlitePool, name: &str) -> String { let id = Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at, consecutive_failures) \ VALUES (?, 'rss', ?, '{}', 1, datetime('now'), datetime('now'), 0)" ) .bind(&id) .bind(name) .execute(pool) .await .unwrap(); id } pub(crate) async fn create_test_item(pool: &SqlitePool, feed_id: &str, external_id: &str) -> String { let id = Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO feed_items (id, external_id, feed_id, busser_id, bite_author, bite_text, \ title, published_at, fetched_at, source_name, media, tags, created_at, updated_at) \ VALUES (?, ?, ?, 'rss', 'author', 'text', 'title', datetime('now'), datetime('now'), \ 'Test', '[]', '[]', datetime('now'), datetime('now'))" ) .bind(&id) .bind(external_id) .bind(feed_id) .execute(pool) .await .unwrap(); id } // ── FK ordering ── #[test] fn upsert_order_parents_before_children() { let pos = |t: &str| UPSERT_ORDER.iter().position(|x| *x == t).unwrap(); assert!(pos("feeds") < pos("feed_tags")); assert!(pos("feeds") < pos("feed_items")); } #[test] fn delete_order_children_before_parents() { let pos = |t: &str| DELETE_ORDER.iter().position(|x| *x == t).unwrap(); assert!(pos("feed_tags") < pos("feeds")); assert!(pos("feed_items") < pos("feeds")); } #[test] fn orders_are_exact_reverses() { let reversed: Vec<&str> = UPSERT_ORDER.iter().rev().copied().collect(); assert_eq!(reversed, DELETE_ORDER); } // ── Column whitelists ── #[test] fn all_tables_have_column_whitelists() { for table in UPSERT_ORDER { assert!( super::download::table_columns(table).is_some(), "missing column whitelist for: {}", table ); } } #[test] fn unknown_table_returns_none() { assert!(super::download::table_columns("nonexistent").is_none()); assert!(super::download::table_columns("busser_state").is_none()); } // ── Triggers ── #[tokio::test] async fn feed_insert_fires_trigger() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let feed_id = create_test_feed(&pool, "Test Feed").await; let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feeds' AND op = 'INSERT'" ) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1); // Verify 12-column JSON data (includes circuit_broken) let data: (String,) = sqlx::query_as( "SELECT data FROM sync_changelog WHERE table_name = 'feeds' AND row_id = ?" ) .bind(&feed_id) .fetch_one(&pool) .await .unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); let obj = parsed.as_object().unwrap(); assert_eq!(obj.len(), 12); assert_eq!(parsed["name"], "Test Feed"); } #[tokio::test] async fn feed_tags_insert_fires_trigger() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Tagged Feed").await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'tech')") .bind(&feed_id) .execute(&pool) .await .unwrap(); let row: (String, String) = sqlx::query_as( "SELECT row_id, data FROM sync_changelog WHERE table_name = 'feed_tags'" ) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, format!("{}:tech", feed_id)); let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); assert_eq!(parsed["feed_id"], feed_id); assert_eq!(parsed["tag"], "tech"); } #[tokio::test] async fn user_config_insert_fires_trigger() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); sqlx::query("INSERT INTO user_config (key, value) VALUES ('theme', 'dark')") .execute(&pool) .await .unwrap(); let row: (String, String) = sqlx::query_as( "SELECT row_id, data FROM sync_changelog WHERE table_name = 'user_config'" ) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "theme"); let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); assert_eq!(parsed["key"], "theme"); assert_eq!(parsed["value"], "dark"); } #[tokio::test] async fn feed_items_update_fires_trigger() { let pool = setup_test_db().await; let feed_id = create_test_feed(&pool, "Items Feed").await; let item_id = create_test_item(&pool, &feed_id, "ext-1").await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); // Update is_read — should fire trigger sqlx::query("UPDATE feed_items SET is_read = 1 WHERE id = ?") .bind(&item_id) .execute(&pool) .await .unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feed_items' AND op = 'UPDATE'" ) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1); // Data should only have id, is_read, is_starred let data: (String,) = sqlx::query_as( "SELECT data FROM sync_changelog WHERE table_name = 'feed_items'" ) .fetch_one(&pool) .await .unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); let obj = parsed.as_object().unwrap(); assert_eq!(obj.len(), 3); assert!(obj.contains_key("id")); assert!(obj.contains_key("is_read")); assert!(obj.contains_key("is_starred")); } // ── Trigger suppression ── #[tokio::test] async fn trigger_suppression_during_remote_apply() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let _id = create_test_feed(&pool, "Suppressed").await; let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0, "triggers should be suppressed"); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); let _id = create_test_feed(&pool, "Not Suppressed").await; let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1, "trigger should fire normally"); } }