//! Initial snapshot creation, changelog cleanup, and retention enforcement. use sqlx::SqlitePool; use tracing::{debug, info, instrument, warn}; use crate::commands::error::ApiError; use super::{set_sync_state, MAX_CHANGELOG_ENTRIES}; /// One-time: snapshot all existing rows into the changelog for the first push. #[instrument(skip_all)] pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result { let mut total: i64 = 0; // feeds let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'feeds', 'INSERT', id, json_object(\ 'id', id, 'busser_id', busser_id, 'name', name, 'config', config, \ 'enabled', enabled, 'last_fetch', last_fetch, \ 'created_at', created_at, 'updated_at', updated_at, \ 'consecutive_failures', consecutive_failures, \ 'last_error', last_error, 'last_success_at', last_success_at, \ 'circuit_broken', circuit_broken\ ) FROM feeds", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // feed_tags let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'feed_tags', 'INSERT', feed_id || ':' || tag, \ json_object('feed_id', feed_id, 'tag', tag) FROM feed_tags", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // query_feeds let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'query_feeds', 'INSERT', id, json_object(\ 'id', id, 'name', name, 'rules', rules, \ 'created_at', created_at, 'updated_at', updated_at\ ) FROM query_feeds", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // bookmarks let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'bookmarks', 'INSERT', id, json_object(\ 'id', id, 'url', url, 'title', title, 'description', description, \ 'author', author, 'source_name', source_name, 'feed_item_id', feed_item_id, \ 'notes', notes, 'is_pinned', is_pinned, \ 'created_at', created_at, 'updated_at', updated_at\ ) FROM bookmarks", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // bookmark_tags let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'bookmark_tags', 'INSERT', bookmark_id || ':' || tag, \ json_object('bookmark_id', bookmark_id, 'tag', tag) FROM bookmark_tags", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // user_config let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'user_config', 'INSERT', key, \ json_object('key', key, 'value', value) FROM user_config", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; // feed_items (user state only — is_read, is_starred) let result = sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT 'feed_items', 'UPDATE', id, \ json_object('id', id, 'is_read', is_read, 'is_starred', is_starred) \ FROM feed_items WHERE is_read = 1 OR is_starred = 1", ) .execute(pool) .await .map_err(super::db_err)?; total += result.rows_affected() as i64; set_sync_state(pool, "initial_snapshot_done", "1").await?; info!(rows = total, "Initial sync snapshot created"); Ok(total) } // ── Changelog cleanup ── /// Prune pushed entries older than 7 days. #[instrument(skip_all)] pub async fn cleanup_changelog(pool: &SqlitePool) -> Result { let result = sqlx::query( "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')", ) .execute(pool) .await .map_err(super::db_err)?; let deleted = result.rows_affected() as i64; if deleted > 0 { debug!(count = deleted, "Cleaned up old changelog entries"); } Ok(deleted) } /// Enforce a hard cap on total changelog entries to prevent unbounded growth /// when sync is disconnected or failing. Deletes the oldest entries (by rowid) /// to bring the count back under `MAX_CHANGELOG_ENTRIES`. #[instrument(skip_all)] pub async fn enforce_changelog_retention(pool: &SqlitePool) -> Result { let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(pool) .await .map_err(super::db_err)?; if count <= MAX_CHANGELOG_ENTRIES { return Ok(0); } let excess = count - MAX_CHANGELOG_ENTRIES; let result = sqlx::query( "DELETE FROM sync_changelog WHERE id IN \ (SELECT id FROM sync_changelog ORDER BY id ASC LIMIT ?)", ) .bind(excess) .execute(pool) .await .map_err(super::db_err)?; let deleted = result.rows_affected() as i64; warn!( deleted, total = count, limit = MAX_CHANGELOG_ENTRIES, "Changelog retention cap enforced" ); Ok(deleted) } /// Count unpushed changes. #[instrument(skip_all)] pub async fn count_pending_changes(pool: &SqlitePool) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") .fetch_one(pool) .await .map_err(super::db_err)?; Ok(row.0) } #[cfg(test)] mod tests { use super::super::tests::*; use super::super::{set_sync_state, MAX_CHANGELOG_ENTRIES}; use super::*; // ── Initial snapshot ── #[tokio::test] async fn create_initial_snapshot_captures_all_tables() { let pool = setup_test_db().await; // Suppress triggers during data setup set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let feed_id = create_test_feed(&pool, "Snapshot Feed").await; sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'snap')") .bind(&feed_id) .execute(&pool) .await .unwrap(); sqlx::query("INSERT INTO user_config (key, value) VALUES ('pref', 'yes')") .execute(&pool) .await .unwrap(); // Create a starred item (only starred/read items are snapshotted for feed_items) let item_id = create_test_item(&pool, &feed_id, "ext-star").await; sqlx::query("UPDATE feed_items SET is_starred = 1 WHERE id = ?") .bind(&item_id) .execute(&pool) .await .unwrap(); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let total = create_initial_snapshot(&pool).await.unwrap(); assert_eq!(total, 4); // 1 feed + 1 feed_tag + 1 user_config + 1 starred item } // ── Changelog retention cap ── #[tokio::test] async fn enforce_changelog_retention_caps_entries() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Insert more entries than the cap (use a smaller batch for test speed). // We test with MAX_CHANGELOG_ENTRIES + 500 entries. let total_to_insert = MAX_CHANGELOG_ENTRIES + 500; for i in 0..total_to_insert { sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('feeds', 'UPDATE', ?, '{}')", ) .bind(format!("row-{}", i)) .execute(&pool) .await .unwrap(); } // Verify we have more than the cap let (before,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(before, total_to_insert); // Enforce retention let deleted = enforce_changelog_retention(&pool).await.unwrap(); assert_eq!(deleted, 500); // Verify count is at the cap let (after,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(after, MAX_CHANGELOG_ENTRIES); // The oldest entries (lowest rowids) should have been removed. // The remaining entries should be the newest ones (row-500 through row-10499). let (min_row_id,): (String,) = sqlx::query_as( "SELECT row_id FROM sync_changelog ORDER BY id ASC LIMIT 1", ) .fetch_one(&pool) .await .unwrap(); assert_eq!(min_row_id, "row-500"); } #[tokio::test] async fn enforce_changelog_retention_no_op_under_cap() { let pool = setup_test_db().await; sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Insert fewer entries than the cap for i in 0..10 { sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('feeds', 'INSERT', ?, '{}')", ) .bind(format!("row-{}", i)) .execute(&pool) .await .unwrap(); } let deleted = enforce_changelog_retention(&pool).await.unwrap(); assert_eq!(deleted, 0); let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(count, 10); } }