Skip to main content

max / balanced_breakfast

Harden security, sync, and query subsystems Code fuzz remediation: crypto key zeroing, sync mutex, download validation, bookmark XSS fix, path traversal fix, open-file safety, query filter accuracy, and regex precompilation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-02 23:13 UTC
Commit: 15c8da3f1ccb8e18d075c3d82f8e8ddf99c6315a
Parent: 41f9224
22 files changed, +567 insertions, -162 deletions
@@ -14,40 +14,61 @@ use base64::Engine;
14 14 use bb_interface::{ConfigFieldType, ConfigSchema};
15 15 use rand::RngCore;
16 16 use std::path::Path;
17 + use zeroize::Zeroizing;
17 18
18 19 const PREFIX: &str = "bb_enc:v1:";
19 20
21 + /// A 256-bit encryption key that is zeroed from memory on drop.
22 + pub type EncryptionKey = Zeroizing<[u8; 32]>;
23 +
20 24 /// Generate a random 256-bit encryption key.
21 - fn generate_key() -> [u8; 32] {
22 - let mut key = [0u8; 32];
23 - rand::thread_rng().fill_bytes(&mut key);
25 + fn generate_key() -> EncryptionKey {
26 + let mut key = Zeroizing::new([0u8; 32]);
27 + rand::thread_rng().fill_bytes(key.as_mut());
24 28 key
25 29 }
26 30
27 31 /// Load an encryption key from a file, or generate and save one if it doesn't exist.
28 - fn load_or_create_key(path: &Path) -> Result<[u8; 32], String> {
29 - if path.exists() {
30 - let data = std::fs::read(path).map_err(|e| format!("Failed to read encryption key: {e}"))?;
31 - if data.len() != 32 {
32 - return Err(format!(
33 - "Encryption key file has wrong size: {} bytes (expected 32)",
34 - data.len()
35 - ));
32 + ///
33 + /// Uses `create_new(true)` to atomically create the file, preventing a TOCTOU race
34 + /// where two processes could both generate different keys and one overwrites the other.
35 + fn load_or_create_key(path: &Path) -> Result<EncryptionKey, String> {
36 + use std::io::Write;
37 +
38 + // Try to create the file exclusively first (atomic check-and-create).
39 + match std::fs::OpenOptions::new()
40 + .write(true)
41 + .create_new(true)
42 + .open(path)
43 + {
44 + Ok(mut file) => {
45 + let key = generate_key();
46 + file.write_all(&*key)
47 + .map_err(|e| format!("Failed to write encryption key: {e}"))?;
48 + #[cfg(unix)]
49 + {
50 + use std::os::unix::fs::PermissionsExt;
51 + let perms = std::fs::Permissions::from_mode(0o600);
52 + std::fs::set_permissions(path, perms)
53 + .map_err(|e| format!("Failed to set key file permissions: {e}"))?;
54 + }
55 + Ok(key)
36 56 }
37 - let mut key = [0u8; 32];
38 - key.copy_from_slice(&data);
39 - Ok(key)
40 - } else {
41 - let key = generate_key();
42 - std::fs::write(path, key).map_err(|e| format!("Failed to write encryption key: {e}"))?;
43 - #[cfg(unix)]
44 - {
45 - use std::os::unix::fs::PermissionsExt;
46 - let perms = std::fs::Permissions::from_mode(0o600);
47 - std::fs::set_permissions(path, perms)
48 - .map_err(|e| format!("Failed to set key file permissions: {e}"))?;
57 + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
58 + // File already exists — read it.
59 + let data =
60 + std::fs::read(path).map_err(|e| format!("Failed to read encryption key: {e}"))?;
61 + if data.len() != 32 {
62 + return Err(format!(
63 + "Encryption key file has wrong size: {} bytes (expected 32)",
64 + data.len()
65 + ));
66 + }
67 + let mut key = Zeroizing::new([0u8; 32]);
68 + key.copy_from_slice(&data);
69 + Ok(key)
49 70 }
50 - Ok(key)
71 + Err(e) => Err(format!("Failed to create encryption key file: {e}")),
51 72 }
52 73 }
53 74
@@ -56,7 +77,7 @@ const KEYCHAIN_KEY: &str = "encryption:master";
56 77
57 78 #[tracing::instrument(skip_all)]
58 79 /// Load an encryption key from the OS keychain, falling back to file-based storage
59 - pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<[u8; 32], String> {
80 + pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<EncryptionKey, String> {
60 81 // Try keychain first
61 82 if let Ok(entry) = keyring::Entry::new(KEYCHAIN_SERVICE, KEYCHAIN_KEY) {
62 83 match entry.get_password() {
@@ -65,7 +86,7 @@ pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<[u8; 32], St
65 86 if bytes.len() != 32 {
66 87 return Err(format!("Keychain key wrong size: {} (expected 32)", bytes.len()));
67 88 }
68 - let mut key = [0u8; 32];
89 + let mut key = Zeroizing::new([0u8; 32]);
69 90 key.copy_from_slice(&bytes);
70 91 return Ok(key);
71 92 }
@@ -74,19 +95,27 @@ pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<[u8; 32], St
74 95 if file_path.exists() {
75 96 let key = load_or_create_key(file_path)?;
76 97 // Migrate to keychain
77 - let b64 = BASE64.encode(key);
98 + let b64 = BASE64.encode(&*key);
78 99 if entry.set_password(&b64).is_ok() {
79 - // Delete the file now that it's in the keychain
80 - if let Err(e) = std::fs::remove_file(file_path) {
81 - tracing::warn!(error = %e, path = %file_path.display(), "Failed to delete encryption key file after keychain migration");
100 + // Read back and verify before deleting the file fallback
101 + if let Ok(readback) = entry.get_password() {
102 + if readback == b64 {
103 + if let Err(e) = std::fs::remove_file(file_path) {
104 + tracing::warn!(error = %e, path = %file_path.display(), "Failed to delete encryption key file after keychain migration");
105 + }
106 + tracing::info!("Migrated encryption key from file to keychain");
107 + } else {
108 + tracing::warn!("Keychain read-back mismatch, keeping file fallback");
109 + }
110 + } else {
111 + tracing::warn!("Keychain read-back failed, keeping file fallback");
82 112 }
83 - tracing::info!("Migrated encryption key from file to keychain");
84 113 }
85 114 return Ok(key);
86 115 }
87 116 // No file either — generate new key and store in keychain
88 117 let key = generate_key();
89 - let b64 = BASE64.encode(key);
118 + let b64 = BASE64.encode(&*key);
90 119 if entry.set_password(&b64).is_ok() {
91 120 return Ok(key);
92 121 }
@@ -106,8 +135,8 @@ pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<[u8; 32], St
106 135
107 136 #[tracing::instrument(skip_all)]
108 137 /// Encrypt a plaintext string field using AES-256-GCM
109 - pub fn encrypt_field(plaintext: &str, key: &[u8; 32]) -> Result<String, String> {
110 - let cipher = Aes256Gcm::new(key.into());
138 + pub fn encrypt_field(plaintext: &str, key: &EncryptionKey) -> Result<String, String> {
139 + let cipher = Aes256Gcm::new((&**key).into());
111 140 let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
112 141 let ciphertext = cipher
113 142 .encrypt(&nonce, plaintext.as_bytes())
@@ -123,7 +152,7 @@ pub fn encrypt_field(plaintext: &str, key: &[u8; 32]) -> Result<String, String>
123 152
124 153 #[tracing::instrument(skip_all)]
125 154 /// Decrypt a field value, passing through plaintext values without the `bb_enc:v1:` prefix
126 - pub fn decrypt_field(value: &str, key: &[u8; 32]) -> Result<String, String> {
155 + pub fn decrypt_field(value: &str, key: &EncryptionKey) -> Result<String, String> {
127 156 let Some(encoded) = value.strip_prefix(PREFIX) else {
128 157 return Ok(value.to_string());
129 158 };
@@ -138,7 +167,7 @@ pub fn decrypt_field(value: &str, key: &[u8; 32]) -> Result<String, String> {
138 167
139 168 let (nonce_bytes, ciphertext) = payload.split_at(12);
140 169 let nonce = aes_gcm::Nonce::from_slice(nonce_bytes);
141 - let cipher = Aes256Gcm::new(key.into());
170 + let cipher = Aes256Gcm::new((&**key).into());
142 171
143 172 let plaintext = cipher
144 173 .decrypt(nonce, ciphertext)
@@ -152,7 +181,7 @@ pub fn decrypt_field(value: &str, key: &[u8; 32]) -> Result<String, String> {
152 181 pub fn encrypt_config_secrets(
153 182 config: &mut serde_json::Value,
154 183 schema: &ConfigSchema,
155 - key: &[u8; 32],
184 + key: &EncryptionKey,
156 185 ) {
157 186 let Some(obj) = config.as_object_mut() else {
158 187 return;
@@ -181,7 +210,7 @@ pub fn encrypt_config_secrets(
181 210 pub fn decrypt_config_secrets(
182 211 config: &mut serde_json::Value,
183 212 schema: &ConfigSchema,
184 - key: &[u8; 32],
213 + key: &EncryptionKey,
185 214 ) {
186 215 let Some(obj) = config.as_object_mut() else {
187 216 return;
@@ -197,7 +226,8 @@ pub fn decrypt_config_secrets(
197 226 obj.insert(field.key.clone(), serde_json::Value::String(decrypted));
198 227 }
199 228 Err(e) => {
200 - tracing::warn!(field = %field.key, error = %e, "Failed to decrypt secret, ciphertext passed through");
229 + tracing::warn!(field = %field.key, error = %e, "Failed to decrypt secret, clearing field to prevent ciphertext leakage");
230 + obj.insert(field.key.clone(), serde_json::Value::String(String::new()));
201 231 }
202 232 }
203 233 }
@@ -60,7 +60,8 @@ pub struct Orchestrator {
60 60 db: Database,
61 61 plugins: Arc<RwLock<PluginManager>>,
62 62 /// AES-256-GCM key for encrypting/decrypting plugin secrets at rest.
63 - encryption_key: Option<[u8; 32]>,
63 + /// Wrapped in `Zeroizing` so key material is zeroed from memory on drop.
64 + encryption_key: Option<crate::crypto::EncryptionKey>,
64 65 }
65 66
66 67 impl Orchestrator {
@@ -342,13 +343,13 @@ impl Orchestrator {
342 343
343 344 /// Set the encryption key for Secret-field encryption at rest.
344 345 #[tracing::instrument(skip_all)]
345 - pub fn set_encryption_key(&mut self, key: [u8; 32]) {
346 + pub fn set_encryption_key(&mut self, key: crate::crypto::EncryptionKey) {
346 347 self.encryption_key = Some(key);
347 348 }
348 349
349 350 /// Get the encryption key, if set.
350 351 #[tracing::instrument(skip_all)]
351 - pub fn encryption_key(&self) -> Option<&[u8; 32]> {
352 + pub fn encryption_key(&self) -> Option<&crate::crypto::EncryptionKey> {
352 353 self.encryption_key.as_ref()
353 354 }
354 355
@@ -386,8 +387,13 @@ impl Orchestrator {
386 387 crate::crypto::encrypt_config_secrets(&mut config, &schema, key);
387 388
388 389 // Update the feed's config in the database
389 - let config_str =
390 - serde_json::to_string(&config).unwrap_or_else(|_| "{}".to_string());
390 + let config_str = match serde_json::to_string(&config) {
391 + Ok(s) => s,
392 + Err(e) => {
393 + error!(feed_name = %feed.name, error = %e, "Failed to serialize config after encryption, skipping update to prevent data loss");
394 + continue;
395 + }
396 + };
391 397 self.db.feeds().update_config(feed.id, &config_str).await?;
392 398
393 399 info!(feed_name = %feed.name, "Encrypted secrets for feed");
@@ -272,6 +272,14 @@ impl RhaiPluginManager {
272 272 engine.set_max_expr_depths(128, 128);
273 273 engine.set_max_operations(100_000);
274 274 engine.set_max_call_levels(32);
275 + // Limit intermediate value sizes to prevent memory exhaustion during
276 + // script execution (complements the post-return validate_dynamic_sizes).
277 + engine.set_max_string_size(2 * 1024 * 1024); // 2 MB per string
278 + engine.set_max_array_size(5_000);
279 + engine.set_max_map_size(2_000);
280 + // Disable `import` statements to prevent plugins from reading arbitrary
281 + // .rhai files from disk via the default FileModuleResolver.
282 + engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver::new());
275 283 let request_counter = Arc::new(AtomicUsize::new(0));
276 284 let fetch_deadline = Arc::new(AtomicU64::new(0));
277 285 register_host_functions(&mut engine, request_counter.clone(), fetch_deadline.clone());
@@ -349,6 +357,11 @@ pub fn create_engine() -> Engine {
349 357 engine.set_max_expr_depths(128, 128);
350 358 engine.set_max_operations(100_000);
351 359 engine.set_max_call_levels(32);
360 + engine.set_max_string_size(2 * 1024 * 1024);
361 + engine.set_max_array_size(5_000);
362 + engine.set_max_map_size(2_000);
363 + // Disable `import` to prevent arbitrary file reads from disk.
364 + engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver::new());
352 365 let counter = Arc::new(AtomicUsize::new(0));
353 366 let deadline = Arc::new(AtomicU64::new(0));
354 367 register_host_functions(&mut engine, counter, deadline);
@@ -83,9 +83,10 @@ pub fn strip_tracking_params(url_str: &str) -> String {
83 83 parsed.to_string()
84 84 }
85 85
86 - /// Regex matching `href="..."` and `src="..."` attribute values in HTML.
86 + /// Regex matching `href="..."` / `href='...'` and `src="..."` / `src='...'`
87 + /// attribute values in HTML (both double- and single-quoted).
87 88 static ATTR_URL_RE: LazyLock<Regex> = LazyLock::new(|| {
88 - Regex::new(r#"(href|src)\s*=\s*"([^"]+)""#).expect("invalid regex")
89 + Regex::new(r#"(href|src)\s*=\s*(?:"([^"]+)"|'([^']+)')"#).expect("invalid regex")
89 90 });
90 91
91 92 #[tracing::instrument(skip_all)]
@@ -94,9 +95,14 @@ pub fn strip_tracking_from_html(html: &str) -> String {
94 95 ATTR_URL_RE
95 96 .replace_all(html, |caps: &regex::Captures| {
96 97 let attr = &caps[1];
97 - let url = &caps[2];
98 + // Group 2 = double-quoted value, group 3 = single-quoted value
99 + let (url, quote) = if let Some(m) = caps.get(2) {
100 + (m.as_str(), '"')
101 + } else {
102 + (caps.get(3).unwrap().as_str(), '\'')
103 + };
98 104 let cleaned = strip_tracking_params(url);
99 - format!("{}=\"{}\"", attr, cleaned)
105 + format!("{}={}{}{}", attr, quote, cleaned, quote)
100 106 })
101 107 .into_owned()
102 108 }
@@ -314,8 +314,10 @@ impl FeedsRepository {
314 314 /// Update a feed's config JSON string.
315 315 #[tracing::instrument(skip_all)]
316 316 pub async fn update_config(&self, id: FeedId, config: &str) -> Result<(), sqlx::Error> {
317 - sqlx::query("UPDATE feeds SET config = ?1 WHERE id = ?2")
317 + let now = Utc::now().format(TIMESTAMP_FMT).to_string();
318 + sqlx::query("UPDATE feeds SET config = ?1, updated_at = ?2 WHERE id = ?3")
318 319 .bind(config)
320 + .bind(&now)
319 321 .bind(id)
320 322 .execute(&self.pool)
321 323 .await?;
@@ -579,6 +581,52 @@ impl ItemsRepository {
579 581 q.fetch_all(&self.pool).await
580 582 }
581 583
584 + /// List feed items with any combination of filters pushed into SQL.
585 + ///
586 + /// Unifies `list_all`, `list_by_busser`, `list_unread`, `list_starred`,
587 + /// and `list_search` into a single method so callers don't need an
588 + /// if/else chain that drops filter combinations.
589 + #[tracing::instrument(skip_all)]
590 + pub async fn list_filtered(
591 + &self,
592 + search: Option<&str>,
593 + source: Option<&str>,
594 + unread_only: bool,
595 + starred_only: bool,
596 + limit: i64,
597 + offset: i64,
598 + ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
599 + // When a search query is present, use FTS5.
600 + if let Some(query) = search {
601 + return self
602 + .list_search(query, source, unread_only, starred_only, limit, offset)
603 + .await;
604 + }
605 +
606 + // Build a dynamic query with conditional WHERE clauses.
607 + let mut sql = String::from("SELECT * FROM feed_items WHERE 1=1");
608 + if source.is_some() {
609 + sql.push_str(" AND busser_id = ?3");
610 + }
611 + if unread_only {
612 + sql.push_str(" AND is_read = 0");
613 + }
614 + if starred_only {
615 + sql.push_str(" AND is_starred = 1");
616 + }
617 + sql.push_str(" ORDER BY published_at DESC LIMIT ?1 OFFSET ?2");
618 +
619 + let mut q = sqlx::query_as::<_, DbFeedItem>(&sql)
620 + .bind(limit) // ?1
621 + .bind(offset); // ?2
622 +
623 + if let Some(src) = source {
624 + q = q.bind(src); // ?3
625 + }
626 +
627 + q.fetch_all(&self.pool).await
628 + }
629 +
582 630 /// Set the read flag on a feed item.
583 631 #[tracing::instrument(skip_all)]
584 632 pub async fn mark_read(&self, id: ItemId, is_read: bool) -> Result<(), sqlx::Error> {
@@ -633,6 +681,15 @@ impl ItemsRepository {
633 681 Ok(row.0)
634 682 }
635 683
684 + /// Count starred items.
685 + #[tracing::instrument(skip_all)]
686 + pub async fn count_starred(&self) -> Result<i64, sqlx::Error> {
687 + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE is_starred = 1")
688 + .fetch_one(&self.pool)
689 + .await?;
690 + Ok(row.0)
691 + }
692 +
636 693 /// Count unread items from a specific busser source.
637 694 #[tracing::instrument(skip_all)]
638 695 pub async fn count_unread_by_busser(&self, busser_id: &str) -> Result<i64, sqlx::Error> {
@@ -1048,6 +1105,8 @@ impl BookmarksRepository {
1048 1105 let id = BookmarkId::new();
1049 1106 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
1050 1107
1108 + let mut tx = self.pool.begin().await?;
1109 +
1051 1110 let bookmark: DbBookmark = sqlx::query_as(
1052 1111 r#"
1053 1112 INSERT INTO bookmarks (id, url, title, description, author, source_name,
@@ -1065,7 +1124,7 @@ impl BookmarksRepository {
1065 1124 .bind(&input.feed_item_id)
1066 1125 .bind(&input.notes)
1067 1126 .bind(&now)
1068 - .fetch_one(&self.pool)
1127 + .fetch_one(&mut *tx)
1069 1128 .await?;
1070 1129
1071 1130 // Insert tags
@@ -1077,10 +1136,11 @@ impl BookmarksRepository {
1077 1136 sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)")
1078 1137 .bind(id)
1079 1138 .bind(tag)
1080 - .execute(&self.pool)
1139 + .execute(&mut *tx)
1081 1140 .await?;
1082 1141 }
1083 1142
1143 + tx.commit().await?;
1084 1144 Ok(bookmark)
1085 1145 }
1086 1146
@@ -1178,9 +1238,11 @@ impl BookmarksRepository {
1178 1238 /// Replace all tags for a bookmark (delete-all-then-insert).
1179 1239 #[tracing::instrument(skip_all)]
1180 1240 pub async fn set_tags(&self, id: BookmarkId, tags: &[String]) -> Result<(), sqlx::Error> {
1241 + let mut tx = self.pool.begin().await?;
1242 +
1181 1243 sqlx::query("DELETE FROM bookmark_tags WHERE bookmark_id = ?1")
1182 1244 .bind(id)
1183 - .execute(&self.pool)
1245 + .execute(&mut *tx)
1184 1246 .await?;
1185 1247
1186 1248 for tag in tags {
@@ -1191,9 +1253,11 @@ impl BookmarksRepository {
1191 1253 sqlx::query("INSERT INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)")
1192 1254 .bind(id)
1193 1255 .bind(tag)
1194 - .execute(&self.pool)
1256 + .execute(&mut *tx)
1195 1257 .await?;
1196 1258 }
1259 +
1260 + tx.commit().await?;
1197 1261 Ok(())
1198 1262 }
1199 1263
@@ -24,41 +24,21 @@ impl FeedGenerator {
24 24 // Fetch page_size + 1 to detect whether more pages exist.
25 25 let fetch_limit = self.page_size + 1;
26 26
27 - // When a search query is present, push it into SQL so pagination
28 - // is accurate (in-memory search after LIMIT would miss results).
29 - let items = if let Some(ref search) = self.filter.search {
30 - self.db
31 - .items()
32 - .list_search(
33 - search,
34 - self.filter.source.as_deref(),
35 - self.filter.unread_only,
36 - self.filter.starred_only,
37 - fetch_limit,
38 - offset,
39 - )
40 - .await?
41 - } else if let Some(ref source) = self.filter.source {
42 - self.db
43 - .items()
44 - .list_by_busser(source, fetch_limit, offset)
45 - .await?
46 - } else if self.filter.unread_only {
47 - self.db
48 - .items()
49 - .list_unread(fetch_limit, offset)
50 - .await?
51 - } else if self.filter.starred_only {
52 - self.db
53 - .items()
54 - .list_starred(fetch_limit, offset)
55 - .await?
56 - } else {
57 - self.db
58 - .items()
59 - .list_all(fetch_limit, offset)
60 - .await?
61 - };
27 + // Push as many filters as possible into SQL so pagination is
28 + // accurate. `list_filtered` handles any combination of source,
29 + // unread, starred, and full-text search in a single query.
30 + let items = self
31 + .db
32 + .items()
33 + .list_filtered(
34 + self.filter.search.as_deref(),
35 + self.filter.source.as_deref(),
36 + self.filter.unread_only,
37 + self.filter.starred_only,
38 + fetch_limit,
39 + offset,
40 + )
41 + .await?;
62 42
63 43 // Convert to FeedItem
64 44 let mut feed_items: Vec<FeedItem> = items
@@ -99,8 +79,10 @@ impl FeedGenerator {
99 79
100 80 // Apply query feed conditions that require in-memory evaluation
101 81 // (title/author/body contains, not_contains, equals, matches_regex).
82 + // Pre-compile regexes once to avoid O(N×M) recompilation per item.
102 83 if !self.filter.conditions.is_empty() {
103 - feed_items.retain(|item| self.filter.matches(item));
84 + let regex_cache = self.filter.compile_regexes();
85 + feed_items.retain(|item| self.filter.matches_with_cache(item, &regex_cache));
104 86 }
105 87 self.order_by.apply(&mut feed_items);
106 88 // When in-memory filtering is active and SQL indicated more rows exist,
@@ -143,6 +125,22 @@ impl FeedGenerator {
143 125 .map(|db_item| db_item.to_feed_item())
144 126 .collect();
145 127
128 + // Apply feed-tag filtering (same as get_items) before general filter.
129 + if !self.filter.feed_tags.is_empty() {
130 + let matching_feed_ids = self
131 + .db
132 + .tags()
133 + .feed_ids_with_tags(&self.filter.feed_tags)
134 + .await?;
135 + let feeds = self.db.feeds().list_all().await?;
136 + let matching_busser_ids: std::collections::HashSet<String> = feeds
137 + .iter()
138 + .filter(|f| matching_feed_ids.contains(&f.id))
139 + .map(|f| f.busser_id.to_string())
140 + .collect();
141 + feed_items.retain(|item| matching_busser_ids.contains(&item.id.source));
142 + }
143 +
146 144 feed_items = self.filter.apply(feed_items);
147 145 self.order_by.apply(&mut feed_items);
148 146
@@ -155,13 +153,34 @@ impl FeedGenerator {
155 153 })
156 154 }
157 155
158 - /// Get total item count
156 + /// Get total item count, respecting source, unread, and starred filters.
159 157 #[tracing::instrument(skip_all)]
160 158 pub async fn count(&self) -> Result<i64, FeedError> {
159 + // Use list_filtered with a large limit to count items that match all
160 + // SQL-level filters. This is less efficient than a dedicated COUNT query
161 + // for the combined case, but is correct for all filter combinations.
162 + if self.filter.source.is_some() && (self.filter.unread_only || self.filter.starred_only) {
163 + // Combined filters — count via filtered query
164 + let items = self
165 + .db
166 + .items()
167 + .list_filtered(
168 + None,
169 + self.filter.source.as_deref(),
170 + self.filter.unread_only,
171 + self.filter.starred_only,
172 + i64::MAX,
173 + 0,
174 + )
175 + .await?;
176 + return Ok(items.len() as i64);
177 + }
161 178 if let Some(ref source) = self.filter.source {
162 179 Ok(self.db.items().count_by_busser(source).await?)
163 180 } else if self.filter.unread_only {
164 181 Ok(self.db.items().count_unread().await?)
182 + } else if self.filter.starred_only {
183 + Ok(self.db.items().count_starred().await?)
165 184 } else {
166 185 Ok(self.db.items().count_all().await?)
167 186 }
@@ -173,7 +173,7 @@ impl FeedFilter {
173 173
174 174 /// Pre-compile regex patterns from conditions. Called once before filtering
175 175 /// a batch of items to avoid recompiling per-item.
176 - fn compile_regexes(&self) -> HashMap<usize, Regex> {
176 + pub fn compile_regexes(&self) -> HashMap<usize, Regex> {
177 177 let mut cache = HashMap::new();
178 178 for (i, c) in self.conditions.iter().enumerate() {
179 179 if c.operator == "matches_regex" {
@@ -200,7 +200,7 @@ impl FeedFilter {
200 200 }
201 201
202 202 /// Check if an item matches using pre-compiled regex cache.
203 - fn matches_with_cache(&self, item: &FeedItem, regex_cache: &HashMap<usize, Regex>) -> bool {
203 + pub fn matches_with_cache(&self, item: &FeedItem, regex_cache: &HashMap<usize, Regex>) -> bool {
204 204 // Check source filter
205 205 if let Some(ref source) = self.source {
206 206 if item.id.source.as_str() != source {
@@ -20,12 +20,23 @@ impl FeedItemId {
20 20 }
21 21 }
22 22
23 - /// Create a combined string ID for database storage
23 + /// Create a combined string ID for database storage.
24 + ///
25 + /// Format: `source:item_id`. Uses `split_once(':')` for parsing, so the
26 + /// `source` field must not contain `:`. This is guaranteed by convention —
27 + /// busser IDs are simple ASCII identifiers (e.g. `rss`, `hn`, `arxiv`).
24 28 pub fn to_combined(&self) -> String {
29 + debug_assert!(
30 + !self.source.contains(':'),
31 + "FeedItemId source must not contain ':', got: {}",
32 + self.source
33 + );
25 34 format!("{}:{}", self.source, self.item_id)
26 35 }
27 36
28 - /// Parse a combined string ID
37 + /// Parse a combined string ID.
38 + ///
39 + /// Splits at the first `:`, so item_id may contain colons but source must not.
29 40 pub fn from_combined(s: &str) -> Option<Self> {
30 41 let (source, item_id) = s.split_once(':')?;
31 42 Some(Self::new(source, item_id))
M docs/todo.md +58
@@ -7,6 +7,64 @@ v0.3.1. Audit grade A. 601 tests (`--workspace`).
7 7
8 8 ---
9 9
10 + ## Fuzz Findings (2026-04-27)
11 +
12 + Findings from adversarial code review. Ordered by severity.
13 +
14 + ### HIGH
15 +
16 + - [x] **XSS in bookmark HTML export** (`commands/bookmarks.rs:403`). The `body` field (attacker-controlled RSS content) is interpolated into the HTML template without escaping. `title`/`url`/`author` are escaped but `body` is not. Fix: pass `body` through `html_escape` or sanitize.
17 + - [x] **Path traversal in `download_and_open`** (`commands/items.rs:384-388`). URL filename extracted via `rsplit('/')` is passed to `dir.join()` without sanitizing `..` components. Fix: strip path separators and `..` from derived filename.
18 + - [x] **Arbitrary code exec via `open::that`** (`commands/items.rs:404`). Downloaded files are opened with the system default handler with no extension validation. A `.exe`/`.command`/`.scpt` URL would be downloaded and launched. Fix: allowlist safe extensions or skip auto-open for dangerous ones.
19 + - [x] **Source+unread/starred filter bypass** (`bb-feed/generator/query.rs:29-61`). The `if/else if` chain gives `source` priority over `unread_only`/`starred_only`. When both are set (no search), the unread/starred filter is silently dropped. Fix: add `list_by_busser_unread`/`list_by_busser_starred` queries, or combine the filters in the existing chain.
20 + - [x] **Regex recompilation per-item** (`ordering.rs:197` + `query.rs:103`). `retain` calls `matches()` which calls `compile_regexes()` per item. O(N×M) regex compilations. Fix: pre-compile once before the `retain` loop.
21 +
22 + ### MEDIUM — sync service
23 +
24 + - [x] **No mutex on `perform_sync`** (`sync_service/mod.rs`, `sync_scheduler.rs`, `commands/sync.rs`). Fixed: added `tokio::sync::Mutex` in `AppState`, acquired in both `sync_now` and `sync_scheduler`.
25 + - [x] **Skipped changelog entries still marked as pushed** (`sync_service/upload.rs:77`). Fixed: track individual pushed IDs; only mark those as pushed. Skipped entries remain `pushed=0` and will be retried.
26 + - [x] **`INSERT OR REPLACE` with partial JSON nulls columns** (`sync_service/download.rs:158-199`). Fixed: validate primary key columns exist before upserting; log debug warning for other missing columns so they're visible but don't block the sync.
27 + - [x] **`applying_remote` flag not in transaction** (`sync_service/download.rs:53-58`). Fixed: flag set, data changes, and flag clear are all wrapped in a single SQLite transaction. A crash mid-apply rolls back the flag too.
28 +
29 + ### MEDIUM — crypto
30 +
31 + - [x] **TOCTOU race in `load_or_create_key`** (`crypto.rs:29-51`). Fixed: uses `OpenOptions::create_new(true)` for atomic check-and-create.
32 + - [x] **Keychain migration deletes file before verifying durability** (`crypto.rs:78-82`). Fixed: read-back and compare before deleting file.
33 + - [x] **Key creation failure silently degrades to plaintext** (`state.rs:68-76`). Fixed: encryption key loading is now a hard startup error. App will not start without a working encryption key.
34 + - [x] **Decrypt failure passes raw ciphertext to plugins** (`crypto.rs:199-200`). Fixed: clears field to empty string on decrypt failure instead of passing ciphertext through.
35 + - [x] **No key zeroing on drop** (`crypto.rs:21-25`). Fixed: all key material wrapped in `zeroize::Zeroizing<[u8; 32]>` via `EncryptionKey` type alias. Keys are zeroed from memory when dropped.
36 +
37 + ### MEDIUM — database
38 +
39 + - [x] **Missing transaction in `BookmarksRepository::create`** (`repository.rs:1047-1085`). Bookmark insert + tag inserts are not transactional; partial failure leaves bookmark with incomplete tags.
40 + - [x] **Missing transaction in `BookmarksRepository::set_tags`** (`repository.rs:1180-1198`). Delete-all + inserts not transactional; failure mid-insert loses tags permanently.
41 + - [x] **`update_config` does not update `updated_at`** (`repository.rs:316-323`). Every other mutation method updates `updated_at`; this one doesn't, which could break sync/cache-invalidation.
42 +
43 + ### MEDIUM — plugin sandbox
44 +
45 + - [x] **No `set_max_string_size`/`set_max_array_size` on Rhai engine** (`rhai_plugin/mod.rs:271-274`). Fixed: set `max_string_size` (2 MB), `max_array_size` (5000), `max_map_size` (2000) on both engine creation sites. Complements the existing per-node `validate_dynamic_sizes` on return values.
46 + - [x] **Rhai `import` may not be disabled** (`rhai_plugin/mod.rs:271`). Fixed: set `DummyModuleResolver` on both `load_plugin` and `create_engine` engines.
47 + - [x] **DNS rebinding bypasses URL blocklist** (`rhai_plugin/host_functions.rs:35-86`). Fixed: strip `user@` from host before blocklist check; added `100.x` (CGNAT/Tailscale) block. Note: DNS rebinding itself is inherent to string-level checks; mitigated by trust model (local plugins only).
48 +
49 + ### MEDIUM — other
50 +
51 + - [x] **`count()` ignores most filters** (`bb-feed/generator/query.rs:160-168`). Fixed: added `starred_only` support and combined source+unread/starred via `list_filtered`.
52 + - [x] **`get_all_items()` ignores `feed_tags` filter** (`bb-feed/generator/query.rs:137-156`). Fixed: added feed_tags filtering before `apply()`.
53 + - [x] **No URL scheme validation in `extract_reader_view`** (`commands/query_feeds.rs:192-210`). Fixed: validates http/https before calling reader script.
54 + - [x] **HTML URL cleaner only matches double-quoted attributes** (`url_cleaner.rs:88`). Single-quoted URLs (`href='...'`) bypass tracking parameter removal entirely.
55 +
56 + ### LOW / NOTE
57 +
58 + - [x] **No size limit on OPML import** (`commands/opml.rs:70`). Fixed: capped at 10 MB before parsing.
59 + - [x] **Bookmark duplicate check not transactional** (`commands/bookmarks.rs:143-145`). Kept check-then-insert (no UNIQUE constraint on url column). Acceptable for desktop single-user app. Documented.
60 + - [x] **No bookmark tag validation** (`commands/bookmarks.rs:274-291`). Fixed: added `validate_bookmark_tags` using same `tagtree::validate_with` rules as feed tags. Applied to `create_bookmark`, `create_bookmark_from_item`, and `set_bookmark_tags`.
61 + - [x] **Error leakage**: raw sqlx errors forwarded to frontend (`commands/error.rs:81-84`). Fixed: log full error server-side, send generic message to frontend.
62 + - [x] **`expect()` on `AppState::new()` in startup** (`lib.rs:39`). Fixed: replaced `expect` with `match` that returns `Err` to Tauri setup, logging + printing the error. No more panic on DB/key failure.
63 + - [x] **`serde_json::to_string` fallback replaces config with `{}`** (`orchestrator.rs:389-390`). Fixed: serialization failure now logs an error and skips the update, preserving existing config.
64 + - [x] **`FeedItemId::to_combined`/`from_combined` roundtrip fails when source contains `:`** (`feed_item.rs:24-32`). Fixed: added `debug_assert` that source must not contain `:`. Documented the invariant. Source IDs are simple ASCII identifiers by convention.
65 +
66 + ---
67 +
10 68 ## Phase 7: Plugin OAuth (Post-beta)
11 69
12 70 ### 7A: OAuth Infrastructure
@@ -66,6 +66,9 @@ futures.workspace = true
66 66 tracing.workspace = true
67 67 tracing-subscriber.workspace = true
68 68
69 + # HTML sanitization for export
70 + regex.workspace = true
71 +
69 72 # Tag standard
70 73 tagtree.workspace = true
71 74
@@ -3,8 +3,9 @@
3 3 use crate::commands::error::ApiError;
4 4 use crate::state::AppState;
5 5 use bb_db::{BookmarkId, CreateBookmark, ItemId, UpdateBookmark};
6 + use regex::Regex;
6 7 use serde::{Deserialize, Serialize};
7 - use std::sync::Arc;
8 + use std::sync::{Arc, LazyLock};
8 9 use tauri::State;
9 10 use tracing::instrument;
10 11
@@ -97,6 +98,21 @@ async fn bookmark_to_response(
97 98 })
98 99 }
99 100
101 + /// Tag rules for bookmarks (same as feed tags for consistency).
102 + const BB_TAG_CONFIG: tagtree::TagConfig = tagtree::TagConfig {
103 + max_depth: 3,
104 + max_length: 80,
105 + semantic_depth: 0,
106 + };
107 +
108 + fn validate_bookmark_tags(tags: &[String]) -> Result<(), ApiError> {
109 + for tag in tags {
110 + tagtree::validate_with(tag, &BB_TAG_CONFIG)
111 + .map_err(|e| ApiError::bad_request(format!("invalid tag: {}", e.0)))?;
112 + }
113 + Ok(())
114 + }
115 +
100 116 fn validate_url(url: &str) -> Result<(), ApiError> {
101 117 let url = url.trim();
102 118 if url.is_empty() {
@@ -136,10 +152,11 @@ pub async fn create_bookmark(
136 152 input: CreateBookmarkInput,
137 153 ) -> Result<BookmarkResponse, ApiError> {
138 154 validate_url(&input.url)?;
155 + validate_bookmark_tags(&input.tags)?;
139 156
140 157 let db = state.orchestrator.database();
141 158
142 - // Check for duplicate
159 + // Check for duplicate (best-effort; no UNIQUE constraint on url column)
143 160 if db.bookmarks().get_by_url(input.url.trim()).await?.is_some() {
144 161 return Err(ApiError::bad_request("URL is already bookmarked"));
145 162 }
@@ -169,6 +186,10 @@ pub async fn create_bookmark_from_item(
169 186 item_id: String,
170 187 tags: Option<Vec<String>>,
171 188 ) -> Result<BookmarkResponse, ApiError> {
189 + if let Some(ref t) = tags {
190 + validate_bookmark_tags(t)?;
191 + }
192 +
172 193 let id: ItemId = item_id
173 194 .parse()
174 195 .map_err(|_| ApiError::bad_request("Invalid item ID"))?;
@@ -276,6 +297,8 @@ pub async fn set_bookmark_tags(
276 297 id: String,
277 298 tags: Vec<String>,
278 299 ) -> Result<(), ApiError> {
300 + validate_bookmark_tags(&tags)?;
301 +
279 302 let bookmark_id: BookmarkId = id
280 303 .parse()
281 304 .map_err(|_| ApiError::bad_request("Invalid bookmark ID"))?;
@@ -400,12 +423,29 @@ pub async fn export_bookmark_html(
400 423 } else {
401 424 format!("By {}<br>", html_escape(&bookmark.author))
402 425 },
403 - body = body,
426 + // Body is pre-sanitized at ingest (docengine::sanitize_html strips
427 + // script/iframe/on* etc.), but re-sanitize defensively for the export
428 + // context in case DB was modified directly or via sync.
429 + body = sanitize_body_for_export(&body),
404 430 );
405 431
406 432 Ok(html)
407 433 }
408 434
435 + /// Strip dangerous HTML elements from body content for standalone HTML export.
436 + /// The body is already sanitized at ingest time by docengine, but this provides
437 + /// defense-in-depth in case the DB was modified directly or via sync.
438 + fn sanitize_body_for_export(html: &str) -> String {
439 + static DANGEROUS_TAGS: LazyLock<Regex> = LazyLock::new(|| {
440 + Regex::new(r#"(?i)<\s*/?\s*(script|iframe|object|embed|form|base|style)\b[^>]*>"#).expect("invalid regex")
441 + });
442 + static ON_HANDLERS: LazyLock<Regex> = LazyLock::new(|| {
443 + Regex::new(r#"(?i)\bon\w+\s*=\s*["'][^"']*["']"#).expect("invalid regex")
444 + });
445 + let s = DANGEROUS_TAGS.replace_all(html, "");
446 + ON_HANDLERS.replace_all(&s, "").into_owned()
447 + }
448 +
409 449 fn html_escape(s: &str) -> String {
410 450 s.replace('&', "&amp;")
411 451 .replace('<', "&lt;")
@@ -79,13 +79,17 @@ impl ApiError {
79 79
80 80 impl From<sqlx::Error> for ApiError {
81 81 fn from(e: sqlx::Error) -> Self {
82 - Self::database(e.to_string())
82 + // Log the full error for debugging but send a generic message to the
83 + // frontend to avoid leaking file paths, SQL text, or schema details.
84 + tracing::error!(error = %e, "Database error");
85 + Self::database("A database error occurred")
83 86 }
84 87 }
85 88
86 89 impl From<bb_feed::FeedError> for ApiError {
87 90 fn from(e: bb_feed::FeedError) -> Self {
88 - Self::database(e.to_string())
91 + tracing::error!(error = %e, "Feed error");
92 + Self::database("A feed query error occurred")
89 93 }
90 94 }
91 95
@@ -93,9 +97,15 @@ impl From<bb_core::orchestrator::OrchestratorError> for ApiError {
93 97 fn from(e: bb_core::orchestrator::OrchestratorError) -> Self {
94 98 use bb_core::orchestrator::OrchestratorError;
95 99 match e {
96 - OrchestratorError::Database(e) => Self::database(e.to_string()),
100 + OrchestratorError::Database(e) => {
101 + tracing::error!(error = %e, "Database error");
102 + Self::database("A database error occurred")
103 + }
97 104 OrchestratorError::Plugin(e) => Self::plugin(e.to_string()),
98 - OrchestratorError::Feed(e) => Self::database(e.to_string()),
105 + OrchestratorError::Feed(e) => {
106 + tracing::error!(error = %e, "Feed error");
107 + Self::database("A feed query error occurred")
108 + }
99 109 OrchestratorError::Config(msg) => Self::internal(msg),
100 110 }
101 111 }
@@ -380,18 +380,38 @@ pub async fn download_and_open(url: String) -> Result<(), ApiError> {
380 380 .call()
381 381 .map_err(|e| ApiError::internal(format!("Download failed: {}", e)))?;
382 382
383 - // Derive filename from URL path, or fall back to "download"
384 - let filename = url
383 + // Derive filename from URL path, or fall back to "download.bin".
384 + // Sanitize: strip path separators and ".." to prevent directory traversal.
385 + let raw_filename = url
385 386 .rsplit('/')
386 387 .next()
387 388 .filter(|s| !s.is_empty() && s.contains('.'))
388 389 .unwrap_or("download.bin");
390 + let filename: String = raw_filename
391 + .replace(['/', '\\'], "")
392 + .replace("..", "");
393 + let filename = if filename.is_empty() { "download.bin".to_string() } else { filename };
394 +
395 + // Block executable extensions to prevent arbitrary code execution.
396 + const BLOCKED_EXTENSIONS: &[&str] = &[
397 + "exe", "msi", "bat", "cmd", "com", "scr", "pif", // Windows
398 + "app", "command", "scpt", "scptd", "action", // macOS
399 + "sh", "bash", "csh", "ksh", "run", // Unix
400 + "ps1", "psm1", "vbs", "vbe", "js", "jse", "wsf", // Script
401 + ];
402 + let ext = filename.rsplit('.').next().unwrap_or("").to_ascii_lowercase();
403 + if BLOCKED_EXTENSIONS.contains(&ext.as_str()) {
404 + return Err(ApiError::bad_request(format!(
405 + "Cannot open files with .{} extension for security reasons",
406 + ext
407 + )));
408 + }
389 409
390 410 let dir = std::env::temp_dir().join("bb-downloads");
391 411 std::fs::create_dir_all(&dir)
392 412 .map_err(|e| ApiError::internal(format!("Failed to create temp dir: {}", e)))?;
393 413
394 - let path = dir.join(filename);
414 + let path = dir.join(&filename);
395 415 let mut file = std::fs::File::create(&path)
396 416 .map_err(|e| ApiError::internal(format!("Failed to create file: {}", e)))?;
397 417
@@ -67,6 +67,16 @@ pub async fn import_opml(
67 67 state: State<'_, Arc<AppState>>,
68 68 content: String,
69 69 ) -> Result<ImportResult, ApiError> {
70 + // Cap OPML input size to prevent memory exhaustion from huge XML payloads.
71 + const MAX_OPML_SIZE: usize = 10 * 1024 * 1024; // 10 MB
72 + if content.len() > MAX_OPML_SIZE {
73 + return Err(ApiError::bad_request(format!(
74 + "OPML file too large ({} bytes, max {} bytes)",
75 + content.len(),
76 + MAX_OPML_SIZE
77 + )));
78 + }
79 +
70 80 let doc = roxmltree::Document::parse(&content)
71 81 .map_err(|e| ApiError::bad_request(format!("Invalid OPML: {}", e)))?;
72 82
@@ -200,6 +200,12 @@ pub async fn extract_reader_view(
200 200 .await
201 201 .plugins_dir()
202 202 .to_path_buf();
203 + // Validate URL scheme to prevent file:// and other local access.
204 + let lower = url.to_ascii_lowercase();
205 + if !lower.starts_with("http://") && !lower.starts_with("https://") {
206 + return Err(ApiError::bad_request("Only http and https URLs are allowed for reader view"));
207 + }
208 +
203 209 // Run in a blocking task since Rhai engine and HTTP are synchronous.
204 210 let result = tokio::task::spawn_blocking(move || {
205 211 bb_core::rhai_plugin::run_reader_script(&url, &plugins_dir).map_err(|e| {
@@ -340,6 +340,9 @@ pub async fn sync_now(
340 340
341 341 let pool = state.orchestrator.database().pool();
342 342
343 + // Prevent concurrent sync operations (manual + scheduler).
344 + let _sync_guard = state.sync_mutex.lock().await;
345 +
343 346 // Create initial snapshot if needed
344 347 let snapshot_done = sync_service::get_sync_state(pool, "initial_snapshot_done")
345 348 .await
@@ -34,9 +34,14 @@ pub fn build_app() -> tauri::Builder<tauri::Wry> {
34 34 .setup(|app| {
35 35 let app_handle = app.handle().clone();
36 36 tauri::async_runtime::block_on(async move {
37 - let state = AppState::new(&app_handle)
38 - .await
39 - .expect("Failed to initialize app state");
37 + let state = match AppState::new(&app_handle).await {
38 + Ok(s) => s,
39 + Err(e) => {
40 + tracing::error!(error = %e, "Fatal: failed to initialize app state");
41 + eprintln!("Fatal: failed to initialize app state: {e}");
42 + return Err(e);
43 + }
44 + };
40 45 let state = Arc::new(state);
41 46 app_handle.manage(state.clone());
42 47
@@ -49,7 +54,9 @@ pub fn build_app() -> tauri::Builder<tauri::Wry> {
49 54 // Start sync scheduler
50 55 let sync_handle = sync_scheduler::start_sync_scheduler(app_handle);
51 56 state.set_sync_scheduler_handle(sync_handle);
52 - });
57 + Ok(())
58 + })
59 + .map_err(|e: String| -> Box<dyn std::error::Error> { e.into() })?;
53 60
54 61 // Check for OTA updates after a short delay (desktop only)
55 62 #[cfg(not(any(target_os = "ios", target_os = "android")))]
@@ -21,6 +21,8 @@ pub struct AppState {
21 21 pub sync_client: RwLock<Option<Arc<SyncKitClient>>>,
22 22 /// App data directory for key persistence.
23 23 pub data_dir: PathBuf,
24 + /// Guard preventing concurrent sync operations (manual + scheduler).
25 + pub sync_mutex: tokio::sync::Mutex<()>,
24 26 /// Handle to abort the background auto-fetch task on shutdown.
25 27 auto_fetch_handle: parking_lot::Mutex<Option<AbortHandle>>,
26 28 /// Handle to abort the background stale-item cleanup task on shutdown.
@@ -63,17 +65,14 @@ impl AppState {
63 65 .await
64 66 .map_err(|e| format!("Failed to create orchestrator: {}", e))?;
65 67
66 - // Load or create encryption key for plugin secrets (keychain preferred, file fallback)
68 + // Load or create encryption key for plugin secrets (keychain preferred, file fallback).
69 + // This is a hard requirement — without an encryption key, plugin secrets
70 + // would be stored in plaintext, which is a security risk.
67 71 let key_path = app_data_dir.join("encryption.key");
68 - match bb_core::crypto::load_or_create_key_from_keychain(&key_path) {
69 - Ok(key) => {
70 - info!("Encryption key loaded");
71 - orchestrator.set_encryption_key(key);
72 - }
73 - Err(e) => {
74 - tracing::warn!(error = %e, "Failed to load encryption key, secrets will not be encrypted");
75 - }
76 - }
72 + let key = bb_core::crypto::load_or_create_key_from_keychain(&key_path)
73 + .map_err(|e| format!("Failed to load encryption key: {e}"))?;
74 + info!("Encryption key loaded");
75 + orchestrator.set_encryption_key(key);
77 76
78 77 info!("Orchestrator created, running migrations");
79 78
@@ -106,6 +105,7 @@ impl AppState {
106 105 orchestrator,
107 106 sync_client: RwLock::new(sync_client.map(Arc::new)),
108 107 data_dir: app_data_dir,
108 + sync_mutex: tokio::sync::Mutex::new(()),
109 109 auto_fetch_handle: parking_lot::Mutex::new(None),
110 110 cleanup_handle: parking_lot::Mutex::new(None),
111 111 sync_scheduler_handle: parking_lot::Mutex::new(None),
@@ -161,6 +161,9 @@ pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle {
161 161 }
162 162 }
163 163
164 + // Prevent concurrent sync operations (manual + scheduler).
165 + let _sync_guard = state.sync_mutex.lock().await;
166 +
164 167 // Perform sync
165 168 match sync_service::perform_sync(pool, &client).await {
166 169 Ok(result) => {
@@ -50,15 +50,41 @@ pub async fn pull_changes(
50 50 }
51 51
52 52 /// Apply remote changes to local DB with triggers suppressed.
53 + ///
54 + /// The `applying_remote` flag and the data changes are wrapped in a single
55 + /// transaction so that a crash mid-apply rolls back everything — including
56 + /// the flag — preventing a stuck flag from suppressing local changelog
57 + /// entries until the next sync.
53 58 async fn apply_remote_changes(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Result<(), ApiError> {
54 - set_sync_state(pool, "applying_remote", "1").await?;
55 - let result = apply_changes_inner(pool, changes).await;
56 - // Always clear the flag, even on error
57 - set_sync_state(pool, "applying_remote", "0").await?;
59 + let mut tx = pool.begin().await.map_err(|e| ApiError::database(e.to_string()))?;
60 +
61 + // Set flag inside the transaction
62 + sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '1')")
63 + .execute(&mut *tx)
64 + .await
65 + .map_err(|e| ApiError::database(e.to_string()))?;
66 +
67 + let result = apply_changes_inner_tx(&mut tx, changes).await;
68 +
69 + // Clear the flag inside the same transaction
70 + sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '0')")
71 + .execute(&mut *tx)
72 + .await
73 + .map_err(|e| ApiError::database(e.to_string()))?;
74 +
75 + // If data application failed, still commit the flag clear (the partial
76 + // changes are acceptable since cursor hasn't been updated yet, and
77 + // re-pulling will re-apply the full batch idempotently).
78 + tx.commit().await.map_err(|e| ApiError::database(e.to_string()))?;
79 +
58 80 result
59 81 }
60 82
61 - async fn apply_changes_inner(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Result<(), ApiError> {
83 + /// Apply changes within a transaction.
84 + async fn apply_changes_inner_tx(
85 + tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
86 + changes: Vec<ChangeEntry>,
87 + ) -> Result<(), ApiError> {
62 88 let mut upserts: Vec<&ChangeEntry> = Vec::new();
63 89 let mut deletes: Vec<&ChangeEntry> = Vec::new();
64 90
@@ -69,22 +95,20 @@ async fn apply_changes_inner(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Re
69 95 }
70 96 }
71 97
72 - // Apply upserts in parent-first FK order
73 98 for table in UPSERT_ORDER {
74 99 for change in &upserts {
75 100 if change.table == *table {
76 101 if let Some(ref data) = change.data {
77 - apply_upsert(pool, table, &change.row_id, data).await?;
102 + apply_upsert_exec(&mut **tx, table, &change.row_id, data).await?;
78 103 }
79 104 }
80 105 }
81 106 }
82 107
83 - // Apply deletes in child-first FK order
84 108 for table in DELETE_ORDER {
85 109 for change in &deletes {
86 110 if change.table == *table {
87 - apply_delete(pool, table, &change.row_id).await?;
111 + apply_delete_exec(&mut **tx, table, &change.row_id).await?;
88 112 }
89 113 }
90 114 }
@@ -124,10 +148,22 @@ fn pk_column(table: &str) -> &'static str {
124 148 }
125 149 }
126 150
127 - /// Apply an INSERT OR REPLACE for a remote change.
151 + /// Apply an INSERT OR REPLACE for a remote change (pool version, used by tests).
152 + #[cfg(test)]
128 153 async fn apply_upsert(
129 154 pool: &SqlitePool,
130 155 table: &str,
156 + row_id: &str,
157 + data: &serde_json::Value,
158 + ) -> Result<(), ApiError> {
159 + let mut conn = pool.acquire().await.map_err(|e| ApiError::database(e.to_string()))?;
160 + apply_upsert_exec(&mut *conn, table, row_id, data).await
161 + }
162 +
163 + /// Apply an INSERT OR REPLACE for a remote change on any SQLite executor.
164 + async fn apply_upsert_exec(
165 + conn: &mut sqlx::SqliteConnection,
166 + table: &str,
131 167 _row_id: &str,
132 168 data: &serde_json::Value,
133 169 ) -> Result<(), ApiError> {
@@ -149,12 +185,37 @@ async fn apply_upsert(
149 185 .bind(is_read)
150 186 .bind(is_starred)
151 187 .bind(id)
152 - .execute(pool)
188 + .execute(&mut *conn)
153 189 .await
154 190 .map_err(|e| ApiError::database(e.to_string()))?;
155 191 return Ok(());
156 192 }
157 193
194 + // Validate that the incoming JSON has the primary key column(s).
195 + // INSERT OR REPLACE in SQLite means DELETE + INSERT, so missing columns
196 + // get NULL values, silently overwriting existing data.
197 + let pk = pk_column(table);
198 + let pk_cols: Vec<&str> = pk.split(", ").collect();
199 + for pk_col in &pk_cols {
200 + if data.get(*pk_col).is_none() || data[*pk_col].is_null() {
201 + warn!(table, pk_col, "Skipping remote upsert with missing primary key column");
202 + return Ok(());
203 + }
204 + }
205 +
206 + let missing: Vec<&&str> = columns
207 + .iter()
208 + .filter(|col| data.get(**col).is_none() || data[**col].is_null())
209 + .collect();
210 +
211 + if !missing.is_empty() {
212 + debug!(
213 + table,
214 + missing_columns = ?missing,
215 + "Remote upsert has NULL columns (may overwrite existing data)"
216 + );
217 + }
218 +
158 219 let col_list = columns.join(", ");
159 220 let placeholders = columns.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
160 221 let sql = format!(
@@ -193,7 +254,7 @@ async fn apply_upsert(
193 254 }
194 255
195 256 query
196 - .execute(pool)
257 + .execute(&mut *conn)
197 258 .await
198 259 .map_err(|e| ApiError::database(e.to_string()))?;
199 260
@@ -208,8 +269,19 @@ fn json_to_i32(val: &serde_json::Value) -> i32 {
208 269 }
209 270 }
210 271
211 - /// Apply a DELETE for a remote change.
272 + /// Apply a DELETE for a remote change (pool version, used by tests).
273 + #[cfg(test)]
212 274 async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<(), ApiError> {
275 + let mut conn = pool.acquire().await.map_err(|e| ApiError::database(e.to_string()))?;
276 + apply_delete_exec(&mut *conn, table, row_id).await
277 + }
278 +
279 + /// Apply a DELETE for a remote change on any SQLite executor.
280 + async fn apply_delete_exec(
281 + conn: &mut sqlx::SqliteConnection,
282 + table: &str,
283 + row_id: &str,
284 + ) -> Result<(), ApiError> {
213 285 if table_columns(table).is_none() {
214 286 return Err(ApiError::bad_request(format!("unknown syncable table: {}", table)));
215 287 }
@@ -234,7 +306,7 @@ async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<()
234 306 sqlx::query(&sql)
235 307 .bind(id_val)
236 308 .bind(tag)
237 - .execute(pool)
309 + .execute(&mut *conn)
238 310 .await
239 311 .map_err(|e| ApiError::database(e.to_string()))?;
240 312 }
@@ -244,7 +316,7 @@ async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<()
244 316 let sql = format!("DELETE FROM {} WHERE {} = ?", table, pk);
245 317 sqlx::query(&sql)
246 318 .bind(row_id)
247 - .execute(pool)
319 + .execute(&mut *conn)
248 320 .await
249 321 .map_err(|e| ApiError::database(e.to_string()))?;
250 322
@@ -554,7 +626,7 @@ mod tests {
554 626
555 627 let server = MockServer::start().await;
556 628 Mock::given(method("POST"))
557 - .and(path("/api/sync/pull"))
629 + .and(path("/api/v1/sync/pull"))
558 630 .respond_with(
559 631 ResponseTemplate::new(200).set_body_json(json!({
560 632 "changes": [],
@@ -596,7 +668,7 @@ mod tests {
596 668
597 669 let server = MockServer::start().await;
598 670 Mock::given(method("POST"))
599 - .and(path("/api/sync/pull"))
671 + .and(path("/api/v1/sync/pull"))
600 672 .respond_with(
601 673 ResponseTemplate::new(200).set_body_json(json!({
602 674 "changes": [{
@@ -641,7 +713,7 @@ mod tests {
641 713
642 714 let server = MockServer::start().await;
643 715 Mock::given(method("POST"))
644 - .and(path("/api/sync/pull"))
716 + .and(path("/api/v1/sync/pull"))
645 717 .respond_with(
646 718 ResponseTemplate::new(200).set_body_json(json!({
647 719 "changes": [],
@@ -705,7 +777,7 @@ mod tests {
705 777
706 778 // First page: has_more = true
707 779 Mock::given(method("POST"))
708 - .and(path("/api/sync/pull"))
780 + .and(path("/api/v1/sync/pull"))
709 781 .respond_with(
710 782 ResponseTemplate::new(200).set_body_json(json!({
711 783 "changes": [{
@@ -727,7 +799,7 @@ mod tests {
727 799
728 800 // Second page: has_more = false
729 801 Mock::given(method("POST"))
730 - .and(path("/api/sync/pull"))
802 + .and(path("/api/v1/sync/pull"))
731 803 .respond_with(
732 804 ResponseTemplate::new(200).set_body_json(json!({
733 805 "changes": [{
@@ -780,7 +852,7 @@ mod tests {
780 852
781 853 let server = MockServer::start().await;
782 854 Mock::given(method("POST"))
783 - .and(path("/api/sync/pull"))
855 + .and(path("/api/v1/sync/pull"))
784 856 .respond_with(
785 857 ResponseTemplate::new(200).set_body_json(json!({
786 858 "changes": [{
@@ -841,7 +913,7 @@ mod tests {
841 913
842 914 let server = MockServer::start().await;
843 915 Mock::given(method("POST"))
844 - .and(path("/api/sync/pull"))
916 + .and(path("/api/v1/sync/pull"))
845 917 .respond_with(
846 918 ResponseTemplate::new(200).set_body_json(json!({
847 919 "changes": [{