Skip to main content

max / balanced_breakfast

9.8 KB · 300 lines History Blame Raw
1 //! Initial snapshot creation, changelog cleanup, and retention enforcement.
2
3 use sqlx::SqlitePool;
4 use tracing::{debug, info, instrument, warn};
5
6 use crate::commands::error::ApiError;
7
8 use super::{set_sync_state, MAX_CHANGELOG_ENTRIES};
9
10 /// One-time: snapshot all existing rows into the changelog for the first push.
11 #[instrument(skip_all)]
12 pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> {
13 let mut total: i64 = 0;
14
15 // feeds
16 let result = sqlx::query(
17 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
18 SELECT 'feeds', 'INSERT', id, json_object(\
19 'id', id, 'busser_id', busser_id, 'name', name, 'config', config, \
20 'enabled', enabled, 'last_fetch', last_fetch, \
21 'created_at', created_at, 'updated_at', updated_at, \
22 'consecutive_failures', consecutive_failures, \
23 'last_error', last_error, 'last_success_at', last_success_at, \
24 'circuit_broken', circuit_broken\
25 ) FROM feeds",
26 )
27 .execute(pool)
28 .await
29 .map_err(super::db_err)?;
30 total += result.rows_affected() as i64;
31
32 // feed_tags
33 let result = sqlx::query(
34 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
35 SELECT 'feed_tags', 'INSERT', feed_id || ':' || tag, \
36 json_object('feed_id', feed_id, 'tag', tag) FROM feed_tags",
37 )
38 .execute(pool)
39 .await
40 .map_err(super::db_err)?;
41 total += result.rows_affected() as i64;
42
43 // query_feeds
44 let result = sqlx::query(
45 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
46 SELECT 'query_feeds', 'INSERT', id, json_object(\
47 'id', id, 'name', name, 'rules', rules, \
48 'created_at', created_at, 'updated_at', updated_at\
49 ) FROM query_feeds",
50 )
51 .execute(pool)
52 .await
53 .map_err(super::db_err)?;
54 total += result.rows_affected() as i64;
55
56 // bookmarks
57 let result = sqlx::query(
58 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
59 SELECT 'bookmarks', 'INSERT', id, json_object(\
60 'id', id, 'url', url, 'title', title, 'description', description, \
61 'author', author, 'source_name', source_name, 'feed_item_id', feed_item_id, \
62 'notes', notes, 'is_pinned', is_pinned, \
63 'created_at', created_at, 'updated_at', updated_at\
64 ) FROM bookmarks",
65 )
66 .execute(pool)
67 .await
68 .map_err(super::db_err)?;
69 total += result.rows_affected() as i64;
70
71 // bookmark_tags
72 let result = sqlx::query(
73 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
74 SELECT 'bookmark_tags', 'INSERT', bookmark_id || ':' || tag, \
75 json_object('bookmark_id', bookmark_id, 'tag', tag) FROM bookmark_tags",
76 )
77 .execute(pool)
78 .await
79 .map_err(super::db_err)?;
80 total += result.rows_affected() as i64;
81
82 // user_config
83 let result = sqlx::query(
84 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
85 SELECT 'user_config', 'INSERT', key, \
86 json_object('key', key, 'value', value) FROM user_config",
87 )
88 .execute(pool)
89 .await
90 .map_err(super::db_err)?;
91 total += result.rows_affected() as i64;
92
93 // feed_items (user state only — is_read, is_starred)
94 let result = sqlx::query(
95 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
96 SELECT 'feed_items', 'UPDATE', id, \
97 json_object('id', id, 'is_read', is_read, 'is_starred', is_starred) \
98 FROM feed_items WHERE is_read = 1 OR is_starred = 1",
99 )
100 .execute(pool)
101 .await
102 .map_err(super::db_err)?;
103 total += result.rows_affected() as i64;
104
105 set_sync_state(pool, "initial_snapshot_done", "1").await?;
106 info!(rows = total, "Initial sync snapshot created");
107 Ok(total)
108 }
109
110 // ── Changelog cleanup ──
111
112 /// Prune pushed entries older than 7 days.
113 #[instrument(skip_all)]
114 pub async fn cleanup_changelog(pool: &SqlitePool) -> Result<i64, ApiError> {
115 let result = sqlx::query(
116 "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')",
117 )
118 .execute(pool)
119 .await
120 .map_err(super::db_err)?;
121
122 let deleted = result.rows_affected() as i64;
123 if deleted > 0 {
124 debug!(count = deleted, "Cleaned up old changelog entries");
125 }
126 Ok(deleted)
127 }
128
129 /// Enforce a hard cap on total changelog entries to prevent unbounded growth
130 /// when sync is disconnected or failing. Deletes the oldest entries (by rowid)
131 /// to bring the count back under `MAX_CHANGELOG_ENTRIES`.
132 #[instrument(skip_all)]
133 pub async fn enforce_changelog_retention(pool: &SqlitePool) -> Result<i64, ApiError> {
134 let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
135 .fetch_one(pool)
136 .await
137 .map_err(super::db_err)?;
138
139 if count <= MAX_CHANGELOG_ENTRIES {
140 return Ok(0);
141 }
142
143 let excess = count - MAX_CHANGELOG_ENTRIES;
144
145 let result = sqlx::query(
146 "DELETE FROM sync_changelog WHERE id IN \
147 (SELECT id FROM sync_changelog ORDER BY id ASC LIMIT ?)",
148 )
149 .bind(excess)
150 .execute(pool)
151 .await
152 .map_err(super::db_err)?;
153
154 let deleted = result.rows_affected() as i64;
155 warn!(
156 deleted,
157 total = count,
158 limit = MAX_CHANGELOG_ENTRIES,
159 "Changelog retention cap enforced"
160 );
161 Ok(deleted)
162 }
163
164 /// Count unpushed changes.
165 #[instrument(skip_all)]
166 pub async fn count_pending_changes(pool: &SqlitePool) -> Result<i64, ApiError> {
167 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0")
168 .fetch_one(pool)
169 .await
170 .map_err(super::db_err)?;
171 Ok(row.0)
172 }
173
#[cfg(test)]
mod tests {
    use super::super::tests::*;
    use super::super::{set_sync_state, MAX_CHANGELOG_ENTRIES};
    use super::*;

    /// Current number of rows in `sync_changelog`.
    async fn changelog_count(pool: &sqlx::SqlitePool) -> i64 {
        let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
            .fetch_one(pool)
            .await
            .unwrap();
        count
    }

    /// Empty the changelog so a test starts from a known state.
    async fn clear_changelog(pool: &sqlx::SqlitePool) {
        sqlx::query("DELETE FROM sync_changelog")
            .execute(pool)
            .await
            .unwrap();
    }

    // ── Initial snapshot ──

    #[tokio::test]
    async fn create_initial_snapshot_captures_all_tables() {
        let pool = setup_test_db().await;

        // Suppress triggers during data setup
        set_sync_state(&pool, "applying_remote", "1").await.unwrap();

        let feed_id = create_test_feed(&pool, "Snapshot Feed").await;

        sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'snap')")
            .bind(&feed_id)
            .execute(&pool)
            .await
            .unwrap();

        sqlx::query("INSERT INTO user_config (key, value) VALUES ('pref', 'yes')")
            .execute(&pool)
            .await
            .unwrap();

        // Create a starred item (only starred/read items are snapshotted for feed_items)
        let item_id = create_test_item(&pool, &feed_id, "ext-star").await;
        sqlx::query("UPDATE feed_items SET is_starred = 1 WHERE id = ?")
            .bind(&item_id)
            .execute(&pool)
            .await
            .unwrap();

        set_sync_state(&pool, "applying_remote", "0").await.unwrap();
        clear_changelog(&pool).await;

        let total = create_initial_snapshot(&pool).await.unwrap();
        assert_eq!(total, 4); // 1 feed + 1 feed_tag + 1 user_config + 1 starred item

        // One entry per table, with feed_items recorded as a user-state UPDATE.
        let rows: Vec<(String, String)> = sqlx::query_as(
            "SELECT table_name, op FROM sync_changelog ORDER BY table_name",
        )
        .fetch_all(&pool)
        .await
        .unwrap();
        let got: Vec<(&str, &str)> = rows
            .iter()
            .map(|(table, op)| (table.as_str(), op.as_str()))
            .collect();
        assert_eq!(
            got,
            vec![
                ("feed_items", "UPDATE"),
                ("feed_tags", "INSERT"),
                ("feeds", "INSERT"),
                ("user_config", "INSERT"),
            ]
        );
    }

    // ── Changelog cleanup ──

    #[tokio::test]
    async fn cleanup_changelog_prunes_only_old_pushed_entries() {
        let pool = setup_test_db().await;
        clear_changelog(&pool).await;

        for &(row_id, pushed, age) in &[
            ("old-pushed", 1, "-8 days"),
            ("old-pending", 0, "-8 days"),
            ("new-pushed", 1, "-1 days"),
        ] {
            sqlx::query(
                "INSERT INTO sync_changelog (table_name, op, row_id, data, pushed, timestamp) \
                 VALUES ('feeds', 'UPDATE', ?, '{}', ?, datetime('now', ?))",
            )
            .bind(row_id)
            .bind(pushed)
            .bind(age)
            .execute(&pool)
            .await
            .unwrap();
        }

        let deleted = cleanup_changelog(&pool).await.unwrap();
        assert_eq!(deleted, 1);

        let remaining: Vec<(String,)> =
            sqlx::query_as("SELECT row_id FROM sync_changelog ORDER BY row_id")
                .fetch_all(&pool)
                .await
                .unwrap();
        let remaining: Vec<String> = remaining.into_iter().map(|(id,)| id).collect();
        assert_eq!(remaining, vec!["new-pushed".to_string(), "old-pending".to_string()]);
    }

    #[tokio::test]
    async fn count_pending_changes_ignores_pushed_entries() {
        let pool = setup_test_db().await;
        clear_changelog(&pool).await;

        for &(row_id, pushed) in &[("a", 0), ("b", 0), ("c", 1)] {
            sqlx::query(
                "INSERT INTO sync_changelog (table_name, op, row_id, data, pushed) \
                 VALUES ('feeds', 'UPDATE', ?, '{}', ?)",
            )
            .bind(row_id)
            .bind(pushed)
            .execute(&pool)
            .await
            .unwrap();
        }

        assert_eq!(count_pending_changes(&pool).await.unwrap(), 2);
    }

    // ── Changelog retention cap ──

    #[tokio::test]
    async fn enforce_changelog_retention_caps_entries() {
        let pool = setup_test_db().await;
        clear_changelog(&pool).await;

        // Seed `excess` more rows than the cap, named row-0, row-1, ... in id
        // order. A single recursive-CTE INSERT keeps this fast; one
        // autocommitted INSERT per row would be a separate transaction each.
        let excess = 500;
        let total_to_insert = MAX_CHANGELOG_ENTRIES + excess;
        sqlx::query(
            "WITH RECURSIVE seq(i) AS \
             (SELECT 0 UNION ALL SELECT i + 1 FROM seq WHERE i + 1 < ?) \
             INSERT INTO sync_changelog (table_name, op, row_id, data) \
             SELECT 'feeds', 'UPDATE', 'row-' || i, '{}' FROM seq ORDER BY i",
        )
        .bind(total_to_insert)
        .execute(&pool)
        .await
        .unwrap();

        assert_eq!(changelog_count(&pool).await, total_to_insert);

        let deleted = enforce_changelog_retention(&pool).await.unwrap();
        assert_eq!(deleted, excess);
        assert_eq!(changelog_count(&pool).await, MAX_CHANGELOG_ENTRIES);

        // The `excess` oldest rows (lowest ids) are gone; the survivors are
        // row-{excess} through row-{total_to_insert - 1}.
        let (oldest,): (String,) =
            sqlx::query_as("SELECT row_id FROM sync_changelog ORDER BY id ASC LIMIT 1")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(oldest, format!("row-{}", excess));

        let (newest,): (String,) =
            sqlx::query_as("SELECT row_id FROM sync_changelog ORDER BY id DESC LIMIT 1")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(newest, format!("row-{}", total_to_insert - 1));
    }

    #[tokio::test]
    async fn enforce_changelog_retention_no_op_under_cap() {
        let pool = setup_test_db().await;
        clear_changelog(&pool).await;

        // Insert fewer entries than the cap
        for i in 0..10 {
            sqlx::query(
                "INSERT INTO sync_changelog (table_name, op, row_id, data) \
                 VALUES ('feeds', 'INSERT', ?, '{}')",
            )
            .bind(format!("row-{}", i))
            .execute(&pool)
            .await
            .unwrap();
        }

        let deleted = enforce_changelog_retention(&pool).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(changelog_count(&pool).await, 10);
    }
}
300