Skip to main content

max / balanced_breakfast

12.0 KB · 367 lines History Blame Raw
1 //! Core sync engine: push local changes, pull remote changes, apply to DB.
2
3 mod download;
4 mod snapshot;
5 mod upload;
6
7 use chrono::Utc;
8 use serde::{Deserialize, Serialize};
9 use sqlx::SqlitePool;
10 use synckit_client::SyncKitClient;
11 use tracing::{info, instrument};
12 use uuid::Uuid;
13
14 use crate::commands::error::ApiError;
15
16 pub use download::*;
17 pub use snapshot::*;
18 pub use upload::*;
19
20 /// Convert a sqlx error to an ApiError, logging the full error server-side
21 /// and returning a generic message to the frontend. Mirrors the behavior of
22 /// `From<sqlx::Error> for ApiError` in commands/error.rs.
23 pub(crate) fn db_err(e: sqlx::Error) -> ApiError {
24 tracing::error!(error = %e, "Sync database error");
25 ApiError::database("A database error occurred")
26 }
27
/// Upper bound on how many changes go into one push batch.
pub(crate) const PUSH_BATCH_LIMIT: i64 = 500;

/// Ceiling on the total number of changelog entries.
///
/// Keeps the changelog from growing without bound while sync is disconnected
/// or failing: once the cap is exceeded, the oldest entries (by rowid) are
/// dropped. The trimming itself is not implemented in this part of the file.
pub(crate) const MAX_CHANGELOG_ENTRIES: i64 = 10_000;
34
/// Synced tables listed parents-first, i.e. the FK-safe order for upserts.
pub(crate) const UPSERT_ORDER: &[&str] = &[
    "feeds",
    "feed_tags",
    "query_feeds",
    "bookmarks",
    "bookmark_tags",
    "user_config",
    "feed_items",
];

/// Synced tables listed children-first, i.e. the FK-safe order for deletes.
///
/// This is [`UPSERT_ORDER`] read back to front; keep the two lists in step.
pub(crate) const DELETE_ORDER: &[&str] = &[
    "feed_items",
    "user_config",
    "bookmark_tags",
    "bookmarks",
    "query_feeds",
    "feed_tags",
    "feeds",
];
40
41 /// Result of a sync operation.
42 #[derive(Debug, Serialize, Deserialize)]
43 #[serde(rename_all = "camelCase")]
44 pub struct SyncResult {
45 pub pushed: i64,
46 pub pulled: i64,
47 }
48
49 // ── sync_state helpers ──
50
51 #[instrument(skip_all)]
52 pub async fn get_sync_state(pool: &SqlitePool, key: &str) -> Result<String, ApiError> {
53 let row: Option<(String,)> =
54 sqlx::query_as("SELECT value FROM sync_state WHERE key = ?")
55 .bind(key)
56 .fetch_optional(pool)
57 .await
58 .map_err(db_err)?;
59
60 Ok(row.map(|r| r.0).unwrap_or_default())
61 }
62
63 #[instrument(skip_all)]
64 pub async fn set_sync_state(pool: &SqlitePool, key: &str, value: &str) -> Result<(), ApiError> {
65 sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES (?, ?)")
66 .bind(key)
67 .bind(value)
68 .execute(pool)
69 .await
70 .map_err(db_err)?;
71 Ok(())
72 }
73
74 /// Clear all sync state (device_id, cursors, flags) for disconnect.
75 #[instrument(skip_all)]
76 pub async fn clear_all_sync_state(pool: &SqlitePool) -> Result<(), ApiError> {
77 sqlx::query("DELETE FROM sync_state")
78 .execute(pool)
79 .await
80 .map_err(db_err)?;
81 sqlx::query("DELETE FROM sync_changelog")
82 .execute(pool)
83 .await
84 .map_err(db_err)?;
85 Ok(())
86 }
87
88 // ── Device registration ──
89
90 #[instrument(skip_all)]
91 pub async fn ensure_device_registered(
92 pool: &SqlitePool,
93 client: &SyncKitClient,
94 ) -> Result<Uuid, ApiError> {
95 let stored = get_sync_state(pool, "device_id").await?;
96 if !stored.is_empty() {
97 return stored
98 .parse::<Uuid>()
99 .map_err(|e| ApiError::internal(format!("invalid stored device_id: {}", e)));
100 }
101
102 let hostname = std::env::var("HOSTNAME")
103 .or_else(|_| std::env::var("COMPUTERNAME"))
104 .unwrap_or_else(|_| "BalancedBreakfast Desktop".to_string());
105
106 let platform = std::env::consts::OS.to_string();
107
108 let device = client
109 .register_device(&hostname, &platform)
110 .await
111 .map_err(|e| ApiError::internal(format!("device registration failed: {}", e)))?;
112
113 set_sync_state(pool, "device_id", &device.id.to_string()).await?;
114 info!(device_name = %device.device_name, device_id = %device.id, "Registered device");
115
116 Ok(device.id)
117 }
118
119 // ── High-level sync ──
120
121 #[instrument(skip_all)]
122 pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result<SyncResult, ApiError> {
123 // Clear applying_remote flag in case a previous sync crashed mid-apply.
124 // If the flag is stuck at "1", all local changes silently skip the changelog.
125 set_sync_state(pool, "applying_remote", "0").await?;
126
127 let device_id = ensure_device_registered(pool, client).await?;
128
129 // Push first, then pull
130 let pushed = push_changes(pool, client, device_id).await?;
131 let pulled = pull_changes(pool, client, device_id).await?;
132
133 // Update last sync timestamp
134 let now = Utc::now().to_rfc3339();
135 set_sync_state(pool, "last_sync_at", &now).await?;
136
137 Ok(SyncResult { pushed, pulled })
138 }
139
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Open an in-memory SQLite pool and apply the SQLite migrations to it.
    pub(crate) async fn setup_test_db() -> SqlitePool {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        let migrator = sqlx::migrate!("../migrations/sqlite");
        migrator.run(&pool).await.unwrap();
        pool
    }

    /// Insert an enabled `rss` feed called `name` and return its generated id.
    pub(crate) async fn create_test_feed(pool: &SqlitePool, name: &str) -> String {
        let feed_id = Uuid::new_v4().to_string();
        sqlx::query(
            "INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at, consecutive_failures) \
             VALUES (?, 'rss', ?, '{}', 1, datetime('now'), datetime('now'), 0)",
        )
        .bind(&feed_id)
        .bind(name)
        .execute(pool)
        .await
        .unwrap();
        feed_id
    }

    /// Insert a feed item under `feed_id` with the given `external_id` and
    /// return its generated id.
    pub(crate) async fn create_test_item(pool: &SqlitePool, feed_id: &str, external_id: &str) -> String {
        let item_id = Uuid::new_v4().to_string();
        sqlx::query(
            "INSERT INTO feed_items (id, external_id, feed_id, busser_id, bite_author, bite_text, \
             title, published_at, fetched_at, source_name, media, tags, created_at, updated_at) \
             VALUES (?, ?, ?, 'rss', 'author', 'text', 'title', datetime('now'), datetime('now'), \
             'Test', '[]', '[]', datetime('now'), datetime('now'))",
        )
        .bind(&item_id)
        .bind(external_id)
        .bind(feed_id)
        .execute(pool)
        .await
        .unwrap();
        item_id
    }

    /// Empty the changelog so a test only sees entries it produced itself.
    async fn wipe_changelog(pool: &SqlitePool) {
        sqlx::query("DELETE FROM sync_changelog").execute(pool).await.unwrap();
    }

    /// Index of `table` within `order`; panics if the table is not listed.
    fn index_in(order: &[&str], table: &str) -> usize {
        order.iter().position(|entry| *entry == table).unwrap()
    }

    // ── FK ordering ──

    #[test]
    fn upsert_order_parents_before_children() {
        let feeds = index_in(UPSERT_ORDER, "feeds");
        assert!(feeds < index_in(UPSERT_ORDER, "feed_tags"));
        assert!(feeds < index_in(UPSERT_ORDER, "feed_items"));
    }

    #[test]
    fn delete_order_children_before_parents() {
        let feeds = index_in(DELETE_ORDER, "feeds");
        assert!(index_in(DELETE_ORDER, "feed_tags") < feeds);
        assert!(index_in(DELETE_ORDER, "feed_items") < feeds);
    }

    #[test]
    fn orders_are_exact_reverses() {
        let mut flipped: Vec<&str> = UPSERT_ORDER.to_vec();
        flipped.reverse();
        assert_eq!(flipped, DELETE_ORDER);
    }

    // ── Column whitelists ──

    #[test]
    fn all_tables_have_column_whitelists() {
        for table in UPSERT_ORDER.iter().copied() {
            let whitelist = super::download::table_columns(table);
            assert!(whitelist.is_some(), "missing column whitelist for: {}", table);
        }
    }

    #[test]
    fn unknown_table_returns_none() {
        for table in ["nonexistent", "busser_state"] {
            assert!(super::download::table_columns(table).is_none());
        }
    }

    // ── Triggers ──

    #[tokio::test]
    async fn feed_insert_fires_trigger() {
        let pool = setup_test_db().await;
        wipe_changelog(&pool).await;

        let feed_id = create_test_feed(&pool, "Test Feed").await;

        let (insert_rows,): (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feeds' AND op = 'INSERT'",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(insert_rows, 1);

        // The logged JSON should carry 12 columns (the original note says this
        // count includes circuit_broken).
        let (payload,): (String,) = sqlx::query_as(
            "SELECT data FROM sync_changelog WHERE table_name = 'feeds' AND row_id = ?",
        )
        .bind(&feed_id)
        .fetch_one(&pool)
        .await
        .unwrap();
        let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
        let fields = json.as_object().unwrap();
        assert_eq!(fields.len(), 12);
        assert_eq!(json["name"], "Test Feed");
    }

    #[tokio::test]
    async fn feed_tags_insert_fires_trigger() {
        let pool = setup_test_db().await;
        let feed_id = create_test_feed(&pool, "Tagged Feed").await;
        // Drop the entry produced by creating the feed itself.
        wipe_changelog(&pool).await;

        sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'tech')")
            .bind(&feed_id)
            .execute(&pool)
            .await
            .unwrap();

        let (row_id, payload): (String, String) = sqlx::query_as(
            "SELECT row_id, data FROM sync_changelog WHERE table_name = 'feed_tags'",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        // Composite key is logged as "<feed_id>:<tag>".
        assert_eq!(row_id, format!("{}:tech", feed_id));

        let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(json["feed_id"], feed_id);
        assert_eq!(json["tag"], "tech");
    }

    #[tokio::test]
    async fn user_config_insert_fires_trigger() {
        let pool = setup_test_db().await;
        wipe_changelog(&pool).await;

        sqlx::query("INSERT INTO user_config (key, value) VALUES ('theme', 'dark')")
            .execute(&pool)
            .await
            .unwrap();

        let (row_id, payload): (String, String) = sqlx::query_as(
            "SELECT row_id, data FROM sync_changelog WHERE table_name = 'user_config'",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(row_id, "theme");

        let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(json["key"], "theme");
        assert_eq!(json["value"], "dark");
    }

    #[tokio::test]
    async fn feed_items_update_fires_trigger() {
        let pool = setup_test_db().await;
        let feed_id = create_test_feed(&pool, "Items Feed").await;
        let item_id = create_test_item(&pool, &feed_id, "ext-1").await;
        // Discard whatever the setup inserts logged.
        wipe_changelog(&pool).await;

        // Flipping is_read must be recorded as an UPDATE.
        sqlx::query("UPDATE feed_items SET is_read = 1 WHERE id = ?")
            .bind(&item_id)
            .execute(&pool)
            .await
            .unwrap();

        let (update_rows,): (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feed_items' AND op = 'UPDATE'",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        assert_eq!(update_rows, 1);

        // The payload is limited to id, is_read and is_starred.
        let (payload,): (String,) = sqlx::query_as(
            "SELECT data FROM sync_changelog WHERE table_name = 'feed_items'",
        )
        .fetch_one(&pool)
        .await
        .unwrap();
        let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
        let fields = json.as_object().unwrap();
        assert_eq!(fields.len(), 3);
        assert!(fields.contains_key("id"));
        assert!(fields.contains_key("is_read"));
        assert!(fields.contains_key("is_starred"));
    }

    // ── Trigger suppression ──

    #[tokio::test]
    async fn trigger_suppression_during_remote_apply() {
        let pool = setup_test_db().await;
        wipe_changelog(&pool).await;

        // With the flag raised, inserts must not be logged.
        set_sync_state(&pool, "applying_remote", "1").await.unwrap();
        let _suppressed = create_test_feed(&pool, "Suppressed").await;

        let (logged,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(logged, 0, "triggers should be suppressed");

        // With the flag lowered, logging resumes.
        set_sync_state(&pool, "applying_remote", "0").await.unwrap();
        let _logged_feed = create_test_feed(&pool, "Not Suppressed").await;

        let (logged,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(logged, 1, "trigger should fire normally");
    }
}
367