| 1 |
|
| 2 |
|
| 3 |
mod download; |
| 4 |
mod snapshot; |
| 5 |
mod upload; |
| 6 |
|
| 7 |
use chrono::Utc; |
| 8 |
use serde::{Deserialize, Serialize}; |
| 9 |
use sqlx::SqlitePool; |
| 10 |
use synckit_client::SyncKitClient; |
| 11 |
use tracing::{info, instrument}; |
| 12 |
use uuid::Uuid; |
| 13 |
|
| 14 |
use crate::commands::error::ApiError; |
| 15 |
|
| 16 |
pub use download::*; |
| 17 |
pub use snapshot::*; |
| 18 |
pub use upload::*; |
| 19 |
|
| 20 |
|
| 21 |
|
| 22 |
|
| 23 |
pub(crate) fn db_err(e: sqlx::Error) -> ApiError { |
| 24 |
tracing::error!(error = %e, "Sync database error"); |
| 25 |
ApiError::database("A database error occurred") |
| 26 |
} |
| 27 |
|
| 28 |
|
| 29 |
pub(crate) const PUSH_BATCH_LIMIT: i64 = 500; |
| 30 |
|
| 31 |
|
| 32 |
|
| 33 |
pub(crate) const MAX_CHANGELOG_ENTRIES: i64 = 10_000; |
| 34 |
|
| 35 |
|
| 36 |
pub(crate) const UPSERT_ORDER: &[&str] = &["feeds", "feed_tags", "query_feeds", "bookmarks", "bookmark_tags", "user_config", "feed_items"]; |
| 37 |
|
| 38 |
|
| 39 |
pub(crate) const DELETE_ORDER: &[&str] = &["feed_items", "user_config", "bookmark_tags", "bookmarks", "query_feeds", "feed_tags", "feeds"]; |
| 40 |
|
| 41 |
|
| 42 |
#[derive(Debug, Serialize, Deserialize)] |
| 43 |
#[serde(rename_all = "camelCase")] |
| 44 |
pub struct SyncResult { |
| 45 |
pub pushed: i64, |
| 46 |
pub pulled: i64, |
| 47 |
} |
| 48 |
|
| 49 |
|
| 50 |
|
| 51 |
#[instrument(skip_all)] |
| 52 |
pub async fn get_sync_state(pool: &SqlitePool, key: &str) -> Result<String, ApiError> { |
| 53 |
let row: Option<(String,)> = |
| 54 |
sqlx::query_as("SELECT value FROM sync_state WHERE key = ?") |
| 55 |
.bind(key) |
| 56 |
.fetch_optional(pool) |
| 57 |
.await |
| 58 |
.map_err(db_err)?; |
| 59 |
|
| 60 |
Ok(row.map(|r| r.0).unwrap_or_default()) |
| 61 |
} |
| 62 |
|
| 63 |
#[instrument(skip_all)] |
| 64 |
pub async fn set_sync_state(pool: &SqlitePool, key: &str, value: &str) -> Result<(), ApiError> { |
| 65 |
sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES (?, ?)") |
| 66 |
.bind(key) |
| 67 |
.bind(value) |
| 68 |
.execute(pool) |
| 69 |
.await |
| 70 |
.map_err(db_err)?; |
| 71 |
Ok(()) |
| 72 |
} |
| 73 |
|
| 74 |
|
| 75 |
#[instrument(skip_all)] |
| 76 |
pub async fn clear_all_sync_state(pool: &SqlitePool) -> Result<(), ApiError> { |
| 77 |
sqlx::query("DELETE FROM sync_state") |
| 78 |
.execute(pool) |
| 79 |
.await |
| 80 |
.map_err(db_err)?; |
| 81 |
sqlx::query("DELETE FROM sync_changelog") |
| 82 |
.execute(pool) |
| 83 |
.await |
| 84 |
.map_err(db_err)?; |
| 85 |
Ok(()) |
| 86 |
} |
| 87 |
|
| 88 |
|
| 89 |
|
| 90 |
#[instrument(skip_all)] |
| 91 |
pub async fn ensure_device_registered( |
| 92 |
pool: &SqlitePool, |
| 93 |
client: &SyncKitClient, |
| 94 |
) -> Result<Uuid, ApiError> { |
| 95 |
let stored = get_sync_state(pool, "device_id").await?; |
| 96 |
if !stored.is_empty() { |
| 97 |
return stored |
| 98 |
.parse::<Uuid>() |
| 99 |
.map_err(|e| ApiError::internal(format!("invalid stored device_id: {}", e))); |
| 100 |
} |
| 101 |
|
| 102 |
let hostname = std::env::var("HOSTNAME") |
| 103 |
.or_else(|_| std::env::var("COMPUTERNAME")) |
| 104 |
.unwrap_or_else(|_| "BalancedBreakfast Desktop".to_string()); |
| 105 |
|
| 106 |
let platform = std::env::consts::OS.to_string(); |
| 107 |
|
| 108 |
let device = client |
| 109 |
.register_device(&hostname, &platform) |
| 110 |
.await |
| 111 |
.map_err(|e| ApiError::internal(format!("device registration failed: {}", e)))?; |
| 112 |
|
| 113 |
set_sync_state(pool, "device_id", &device.id.to_string()).await?; |
| 114 |
info!(device_name = %device.device_name, device_id = %device.id, "Registered device"); |
| 115 |
|
| 116 |
Ok(device.id) |
| 117 |
} |
| 118 |
|
| 119 |
|
| 120 |
|
| 121 |
#[instrument(skip_all)] |
| 122 |
pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result<SyncResult, ApiError> { |
| 123 |
|
| 124 |
|
| 125 |
set_sync_state(pool, "applying_remote", "0").await?; |
| 126 |
|
| 127 |
let device_id = ensure_device_registered(pool, client).await?; |
| 128 |
|
| 129 |
|
| 130 |
let pushed = push_changes(pool, client, device_id).await?; |
| 131 |
let pulled = pull_changes(pool, client, device_id).await?; |
| 132 |
|
| 133 |
|
| 134 |
let now = Utc::now().to_rfc3339(); |
| 135 |
set_sync_state(pool, "last_sync_at", &now).await?; |
| 136 |
|
| 137 |
Ok(SyncResult { pushed, pulled }) |
| 138 |
} |
| 139 |
|
| 140 |
#[cfg(test)] |
| 141 |
mod tests { |
| 142 |
use super::*; |
| 143 |
use uuid::Uuid; |
| 144 |
|
| 145 |
pub(crate) async fn setup_test_db() -> SqlitePool { |
| 146 |
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); |
| 147 |
sqlx::migrate!("../migrations/sqlite") |
| 148 |
.run(&pool) |
| 149 |
.await |
| 150 |
.unwrap(); |
| 151 |
pool |
| 152 |
} |
| 153 |
|
| 154 |
pub(crate) async fn create_test_feed(pool: &SqlitePool, name: &str) -> String { |
| 155 |
let id = Uuid::new_v4().to_string(); |
| 156 |
sqlx::query( |
| 157 |
"INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at, consecutive_failures) \ |
| 158 |
VALUES (?, 'rss', ?, '{}', 1, datetime('now'), datetime('now'), 0)" |
| 159 |
) |
| 160 |
.bind(&id) |
| 161 |
.bind(name) |
| 162 |
.execute(pool) |
| 163 |
.await |
| 164 |
.unwrap(); |
| 165 |
id |
| 166 |
} |
| 167 |
|
| 168 |
pub(crate) async fn create_test_item(pool: &SqlitePool, feed_id: &str, external_id: &str) -> String { |
| 169 |
let id = Uuid::new_v4().to_string(); |
| 170 |
sqlx::query( |
| 171 |
"INSERT INTO feed_items (id, external_id, feed_id, busser_id, bite_author, bite_text, \ |
| 172 |
title, published_at, fetched_at, source_name, media, tags, created_at, updated_at) \ |
| 173 |
VALUES (?, ?, ?, 'rss', 'author', 'text', 'title', datetime('now'), datetime('now'), \ |
| 174 |
'Test', '[]', '[]', datetime('now'), datetime('now'))" |
| 175 |
) |
| 176 |
.bind(&id) |
| 177 |
.bind(external_id) |
| 178 |
.bind(feed_id) |
| 179 |
.execute(pool) |
| 180 |
.await |
| 181 |
.unwrap(); |
| 182 |
id |
| 183 |
} |
| 184 |
|
| 185 |
|
| 186 |
|
| 187 |
#[test] |
| 188 |
fn upsert_order_parents_before_children() { |
| 189 |
let pos = |t: &str| UPSERT_ORDER.iter().position(|x| *x == t).unwrap(); |
| 190 |
assert!(pos("feeds") < pos("feed_tags")); |
| 191 |
assert!(pos("feeds") < pos("feed_items")); |
| 192 |
} |
| 193 |
|
| 194 |
#[test] |
| 195 |
fn delete_order_children_before_parents() { |
| 196 |
let pos = |t: &str| DELETE_ORDER.iter().position(|x| *x == t).unwrap(); |
| 197 |
assert!(pos("feed_tags") < pos("feeds")); |
| 198 |
assert!(pos("feed_items") < pos("feeds")); |
| 199 |
} |
| 200 |
|
| 201 |
#[test] |
| 202 |
fn orders_are_exact_reverses() { |
| 203 |
let reversed: Vec<&str> = UPSERT_ORDER.iter().rev().copied().collect(); |
| 204 |
assert_eq!(reversed, DELETE_ORDER); |
| 205 |
} |
| 206 |
|
| 207 |
|
| 208 |
|
| 209 |
#[test] |
| 210 |
fn all_tables_have_column_whitelists() { |
| 211 |
for table in UPSERT_ORDER { |
| 212 |
assert!( |
| 213 |
super::download::table_columns(table).is_some(), |
| 214 |
"missing column whitelist for: {}", table |
| 215 |
); |
| 216 |
} |
| 217 |
} |
| 218 |
|
| 219 |
#[test] |
| 220 |
fn unknown_table_returns_none() { |
| 221 |
assert!(super::download::table_columns("nonexistent").is_none()); |
| 222 |
assert!(super::download::table_columns("busser_state").is_none()); |
| 223 |
} |
| 224 |
|
| 225 |
|
| 226 |
|
| 227 |
#[tokio::test] |
| 228 |
async fn feed_insert_fires_trigger() { |
| 229 |
let pool = setup_test_db().await; |
| 230 |
sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); |
| 231 |
|
| 232 |
let feed_id = create_test_feed(&pool, "Test Feed").await; |
| 233 |
|
| 234 |
let count: (i64,) = sqlx::query_as( |
| 235 |
"SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feeds' AND op = 'INSERT'" |
| 236 |
) |
| 237 |
.fetch_one(&pool) |
| 238 |
.await |
| 239 |
.unwrap(); |
| 240 |
assert_eq!(count.0, 1); |
| 241 |
|
| 242 |
|
| 243 |
let data: (String,) = sqlx::query_as( |
| 244 |
"SELECT data FROM sync_changelog WHERE table_name = 'feeds' AND row_id = ?" |
| 245 |
) |
| 246 |
.bind(&feed_id) |
| 247 |
.fetch_one(&pool) |
| 248 |
.await |
| 249 |
.unwrap(); |
| 250 |
let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); |
| 251 |
let obj = parsed.as_object().unwrap(); |
| 252 |
assert_eq!(obj.len(), 12); |
| 253 |
assert_eq!(parsed["name"], "Test Feed"); |
| 254 |
} |
| 255 |
|
| 256 |
#[tokio::test] |
| 257 |
async fn feed_tags_insert_fires_trigger() { |
| 258 |
let pool = setup_test_db().await; |
| 259 |
let feed_id = create_test_feed(&pool, "Tagged Feed").await; |
| 260 |
sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); |
| 261 |
|
| 262 |
sqlx::query("INSERT INTO feed_tags (feed_id, tag) VALUES (?, 'tech')") |
| 263 |
.bind(&feed_id) |
| 264 |
.execute(&pool) |
| 265 |
.await |
| 266 |
.unwrap(); |
| 267 |
|
| 268 |
let row: (String, String) = sqlx::query_as( |
| 269 |
"SELECT row_id, data FROM sync_changelog WHERE table_name = 'feed_tags'" |
| 270 |
) |
| 271 |
.fetch_one(&pool) |
| 272 |
.await |
| 273 |
.unwrap(); |
| 274 |
assert_eq!(row.0, format!("{}:tech", feed_id)); |
| 275 |
|
| 276 |
let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); |
| 277 |
assert_eq!(parsed["feed_id"], feed_id); |
| 278 |
assert_eq!(parsed["tag"], "tech"); |
| 279 |
} |
| 280 |
|
| 281 |
#[tokio::test] |
| 282 |
async fn user_config_insert_fires_trigger() { |
| 283 |
let pool = setup_test_db().await; |
| 284 |
sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); |
| 285 |
|
| 286 |
sqlx::query("INSERT INTO user_config (key, value) VALUES ('theme', 'dark')") |
| 287 |
.execute(&pool) |
| 288 |
.await |
| 289 |
.unwrap(); |
| 290 |
|
| 291 |
let row: (String, String) = sqlx::query_as( |
| 292 |
"SELECT row_id, data FROM sync_changelog WHERE table_name = 'user_config'" |
| 293 |
) |
| 294 |
.fetch_one(&pool) |
| 295 |
.await |
| 296 |
.unwrap(); |
| 297 |
assert_eq!(row.0, "theme"); |
| 298 |
|
| 299 |
let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); |
| 300 |
assert_eq!(parsed["key"], "theme"); |
| 301 |
assert_eq!(parsed["value"], "dark"); |
| 302 |
} |
| 303 |
|
| 304 |
#[tokio::test] |
| 305 |
async fn feed_items_update_fires_trigger() { |
| 306 |
let pool = setup_test_db().await; |
| 307 |
let feed_id = create_test_feed(&pool, "Items Feed").await; |
| 308 |
let item_id = create_test_item(&pool, &feed_id, "ext-1").await; |
| 309 |
sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); |
| 310 |
|
| 311 |
|
| 312 |
sqlx::query("UPDATE feed_items SET is_read = 1 WHERE id = ?") |
| 313 |
.bind(&item_id) |
| 314 |
.execute(&pool) |
| 315 |
.await |
| 316 |
.unwrap(); |
| 317 |
|
| 318 |
let count: (i64,) = sqlx::query_as( |
| 319 |
"SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'feed_items' AND op = 'UPDATE'" |
| 320 |
) |
| 321 |
.fetch_one(&pool) |
| 322 |
.await |
| 323 |
.unwrap(); |
| 324 |
assert_eq!(count.0, 1); |
| 325 |
|
| 326 |
|
| 327 |
let data: (String,) = sqlx::query_as( |
| 328 |
"SELECT data FROM sync_changelog WHERE table_name = 'feed_items'" |
| 329 |
) |
| 330 |
.fetch_one(&pool) |
| 331 |
.await |
| 332 |
.unwrap(); |
| 333 |
let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); |
| 334 |
let obj = parsed.as_object().unwrap(); |
| 335 |
assert_eq!(obj.len(), 3); |
| 336 |
assert!(obj.contains_key("id")); |
| 337 |
assert!(obj.contains_key("is_read")); |
| 338 |
assert!(obj.contains_key("is_starred")); |
| 339 |
} |
| 340 |
|
| 341 |
|
| 342 |
|
| 343 |
#[tokio::test] |
| 344 |
async fn trigger_suppression_during_remote_apply() { |
| 345 |
let pool = setup_test_db().await; |
| 346 |
sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); |
| 347 |
|
| 348 |
set_sync_state(&pool, "applying_remote", "1").await.unwrap(); |
| 349 |
let _id = create_test_feed(&pool, "Suppressed").await; |
| 350 |
|
| 351 |
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") |
| 352 |
.fetch_one(&pool) |
| 353 |
.await |
| 354 |
.unwrap(); |
| 355 |
assert_eq!(count.0, 0, "triggers should be suppressed"); |
| 356 |
|
| 357 |
set_sync_state(&pool, "applying_remote", "0").await.unwrap(); |
| 358 |
let _id = create_test_feed(&pool, "Not Suppressed").await; |
| 359 |
|
| 360 |
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") |
| 361 |
.fetch_one(&pool) |
| 362 |
.await |
| 363 |
.unwrap(); |
| 364 |
assert_eq!(count.0, 1, "trigger should fire normally"); |
| 365 |
} |
| 366 |
} |
| 367 |
|