max / balanced_breakfast
20 files changed,
+320 insertions,
-132 deletions
| @@ -36,22 +36,26 @@ fn load_or_create_key(path: &Path) -> Result<EncryptionKey, String> { | |||
| 36 | 36 | use std::io::Write; | |
| 37 | 37 | ||
| 38 | 38 | // Try to create the file exclusively first (atomic check-and-create). | |
| 39 | - | match std::fs::OpenOptions::new() | |
| 39 | + | #[cfg(unix)] | |
| 40 | + | let open_result = { | |
| 41 | + | use std::os::unix::fs::OpenOptionsExt; | |
| 42 | + | std::fs::OpenOptions::new() | |
| 43 | + | .write(true) | |
| 44 | + | .create_new(true) | |
| 45 | + | .mode(0o600) | |
| 46 | + | .open(path) | |
| 47 | + | }; | |
| 48 | + | #[cfg(not(unix))] | |
| 49 | + | let open_result = std::fs::OpenOptions::new() | |
| 40 | 50 | .write(true) | |
| 41 | 51 | .create_new(true) | |
| 42 | - | .open(path) | |
| 43 | - | { | |
| 52 | + | .open(path); | |
| 53 | + | ||
| 54 | + | match open_result { | |
| 44 | 55 | Ok(mut file) => { | |
| 45 | 56 | let key = generate_key(); | |
| 46 | 57 | file.write_all(&*key) | |
| 47 | 58 | .map_err(|e| format!("Failed to write encryption key: {e}"))?; | |
| 48 | - | #[cfg(unix)] | |
| 49 | - | { | |
| 50 | - | use std::os::unix::fs::PermissionsExt; | |
| 51 | - | let perms = std::fs::Permissions::from_mode(0o600); | |
| 52 | - | std::fs::set_permissions(path, perms) | |
| 53 | - | .map_err(|e| format!("Failed to set key file permissions: {e}"))?; | |
| 54 | - | } | |
| 55 | 59 | Ok(key) | |
| 56 | 60 | } | |
| 57 | 61 | Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { | |
| @@ -226,7 +230,7 @@ pub fn decrypt_config_secrets( | |||
| 226 | 230 | obj.insert(field.key.clone(), serde_json::Value::String(decrypted)); | |
| 227 | 231 | } | |
| 228 | 232 | Err(e) => { | |
| 229 | - | tracing::warn!(field = %field.key, error = %e, "Failed to decrypt secret, clearing field to prevent ciphertext leakage"); | |
| 233 | + | tracing::error!(field = %field.key, error = %e, "Failed to decrypt secret, clearing field to prevent ciphertext leakage. Feed may need re-configuration."); | |
| 230 | 234 | obj.insert(field.key.clone(), serde_json::Value::String(String::new())); | |
| 231 | 235 | } | |
| 232 | 236 | } |
| @@ -184,22 +184,29 @@ impl Orchestrator { | |||
| 184 | 184 | &self, | |
| 185 | 185 | plugin_id: &str, | |
| 186 | 186 | ) -> Result<usize, OrchestratorError> { | |
| 187 | - | // Get feed ID for this busser | |
| 187 | + | // Get all feed IDs for this busser (usually one, but can be multiple) | |
| 188 | 188 | let feeds = self.db.feeds().get_by_busser(plugin_id).await?; | |
| 189 | - | let feed_id = feeds.first().map(|f| f.id); | |
| 189 | + | let feed_ids: Vec<bb_db::FeedId> = feeds.iter().map(|f| f.id).collect(); | |
| 190 | 190 | ||
| 191 | - | let Some(feed_id) = feed_id else { | |
| 191 | + | if feed_ids.is_empty() { | |
| 192 | 192 | debug!(%plugin_id, "No feeds configured for plugin, skipping store"); | |
| 193 | 193 | return Ok(0); | |
| 194 | - | }; | |
| 195 | - | ||
| 196 | - | // Acquire the read lock only for the fetch call, then release it | |
| 197 | - | // before any async DB operations to avoid holding the lock across | |
| 198 | - | // blocking network I/O. | |
| 199 | - | let fetch_result = { | |
| 200 | - | let plugins = self.plugins.read().await; | |
| 201 | - | plugins.fetch(plugin_id, None) | |
| 202 | - | }; | |
| 194 | + | } | |
| 195 | + | // Primary feed for item storage (all items go under first feed) | |
| 196 | + | let feed_id = feed_ids[0]; | |
| 197 | + | ||
| 198 | + | // Run the blocking fetch on a dedicated thread to avoid starving | |
| 199 | + | // the tokio runtime. Plugin fetches do synchronous HTTP inside the | |
| 200 | + | // Rhai engine, which would block worker threads if run inline. | |
| 201 | + | let plugins = self.plugins.clone(); | |
| 202 | + | let pid = plugin_id.to_string(); | |
| 203 | + | let fetch_result = tokio::task::spawn_blocking(move || { | |
| 204 | + | let handle = tokio::runtime::Handle::current(); | |
| 205 | + | let plugins = handle.block_on(plugins.read()); | |
| 206 | + | plugins.fetch(&pid, None) | |
| 207 | + | }) | |
| 208 | + | .await | |
| 209 | + | .map_err(|e| OrchestratorError::Config(format!("Fetch task panicked: {e}")))?; | |
| 203 | 210 | ||
| 204 | 211 | let result = match fetch_result { | |
| 205 | 212 | Ok(r) => r, | |
| @@ -211,23 +218,25 @@ impl Orchestrator { | |||
| 211 | 218 | category = %structured.category, | |
| 212 | 219 | "Fetch failed" | |
| 213 | 220 | ); | |
| 214 | - | match self | |
| 215 | - | .db | |
| 216 | - | .feeds() | |
| 217 | - | .record_fetch_failure_structured(feed_id, &structured) | |
| 218 | - | .await | |
| 219 | - | { | |
| 220 | - | Ok(tripped) => { | |
| 221 | - | if tripped { | |
| 222 | - | info!( | |
| 223 | - | %feed_id, %plugin_id, | |
| 224 | - | category = %structured.category, | |
| 225 | - | "Circuit breaker tripped for feed" | |
| 226 | - | ); | |
| 221 | + | for fid in &feed_ids { | |
| 222 | + | match self | |
| 223 | + | .db | |
| 224 | + | .feeds() | |
| 225 | + | .record_fetch_failure_structured(*fid, &structured) | |
| 226 | + | .await | |
| 227 | + | { | |
| 228 | + | Ok(tripped) => { | |
| 229 | + | if tripped { | |
| 230 | + | info!( | |
| 231 | + | feed_id = %fid, %plugin_id, | |
| 232 | + | category = %structured.category, | |
| 233 | + | "Circuit breaker tripped for feed" | |
| 234 | + | ); | |
| 235 | + | } | |
| 236 | + | } | |
| 237 | + | Err(db_err) => { | |
| 238 | + | error!(error = %db_err, "Failed to record fetch failure"); | |
| 227 | 239 | } | |
| 228 | - | } | |
| 229 | - | Err(db_err) => { | |
| 230 | - | error!(error = %db_err, "Failed to record fetch failure"); | |
| 231 | 240 | } | |
| 232 | 241 | } | |
| 233 | 242 | return Err(e.into()); | |
| @@ -261,8 +270,10 @@ impl Orchestrator { | |||
| 261 | 270 | } | |
| 262 | 271 | } | |
| 263 | 272 | ||
| 264 | - | // Record successful fetch (resets failure counter) | |
| 265 | - | self.db.feeds().record_fetch_success(feed_id).await?; | |
| 273 | + | // Record successful fetch for all feeds (resets failure counter) | |
| 274 | + | for fid in &feed_ids { | |
| 275 | + | self.db.feeds().record_fetch_success(*fid).await?; | |
| 276 | + | } | |
| 266 | 277 | ||
| 267 | 278 | info!(count, %plugin_id, "Fetched items from plugin"); | |
| 268 | 279 | Ok(count) |
| @@ -30,6 +30,18 @@ const MAX_REQUESTS_PER_FETCH: usize = 100; | |||
| 30 | 30 | /// without this limit). | |
| 31 | 31 | pub(super) const MAX_FETCH_DURATION: Duration = Duration::from_secs(60); | |
| 32 | 32 | ||
| 33 | + | /// Check if a host is in the CGNAT/shared address space (100.64.0.0/10). | |
| 34 | + | fn is_cgnat(host: &str) -> bool { | |
| 35 | + | if let Some(rest) = host.strip_prefix("100.") { | |
| 36 | + | if let Some(second) = rest.split('.').next() { | |
| 37 | + | if let Ok(n) = second.parse::<u8>() { | |
| 38 | + | return (64..=127).contains(&n); | |
| 39 | + | } | |
| 40 | + | } | |
| 41 | + | } | |
| 42 | + | false | |
| 43 | + | } | |
| 44 | + | ||
| 33 | 45 | /// Validate a URL before making an HTTP request. Blocks non-HTTP schemes | |
| 34 | 46 | /// and requests to localhost/internal networks. | |
| 35 | 47 | fn validate_url(url: &str) -> Result<(), String> { | |
| @@ -59,7 +71,7 @@ fn validate_url(url: &str) -> Result<(), String> { | |||
| 59 | 71 | || host.starts_with("10.") | |
| 60 | 72 | || host.starts_with("192.168.") | |
| 61 | 73 | || host.starts_with("169.254.") | |
| 62 | - | || host.starts_with("100.") | |
| 74 | + | || is_cgnat(host) | |
| 63 | 75 | { | |
| 64 | 76 | return Err(format!("Blocked request to internal address: {}", url)); | |
| 65 | 77 | } | |
| @@ -85,6 +97,25 @@ fn validate_url(url: &str) -> Result<(), String> { | |||
| 85 | 97 | } | |
| 86 | 98 | } | |
| 87 | 99 | } | |
| 100 | + | // Block encoded IP forms: decimal (2130706433), hex (0x7f000001), | |
| 101 | + | // and octal-prefixed octets (0177.0.0.1). These bypass string-based | |
| 102 | + | // checks but are resolved by ureq to actual IPs. | |
| 103 | + | if host.starts_with("0x") || host.starts_with("0X") { | |
| 104 | + | return Err(format!("Blocked hex-encoded IP address: {}", url)); | |
| 105 | + | } | |
| 106 | + | // Pure-numeric hosts are decimal-encoded IPs (e.g. 2130706433 = 127.0.0.1) | |
| 107 | + | if host.chars().all(|c| c.is_ascii_digit()) && !host.is_empty() { | |
| 108 | + | return Err(format!("Blocked numeric IP address: {}", url)); | |
| 109 | + | } | |
| 110 | + | // Octal-prefixed octets (e.g. 0177.0.0.1) | |
| 111 | + | if host.contains('.') { | |
| 112 | + | let has_octal = host.split('.').any(|octet| { | |
| 113 | + | octet.len() > 1 && octet.starts_with('0') && octet.chars().all(|c| c.is_ascii_digit()) | |
| 114 | + | }); | |
| 115 | + | if has_octal { | |
| 116 | + | return Err(format!("Blocked octal-encoded IP address: {}", url)); | |
| 117 | + | } | |
| 118 | + | } | |
| 88 | 119 | Ok(()) | |
| 89 | 120 | } | |
| 90 | 121 | ||
| @@ -127,14 +158,28 @@ fn check_fetch_deadline(deadline: &AtomicU64) -> Result<(), String> { | |||
| 127 | 158 | /// Format: `BB_ERR:category:message` (or `BB_ERR:rate_limited:retry_secs:message`). | |
| 128 | 159 | const BB_ERR_PREFIX: &str = "BB_ERR:"; | |
| 129 | 160 | ||
| 161 | + | /// Create a ureq agent with redirects disabled to prevent SSRF via | |
| 162 | + | /// redirect to localhost/private IPs. | |
| 163 | + | fn make_agent() -> ureq::Agent { | |
| 164 | + | ureq::AgentBuilder::new() | |
| 165 | + | .timeout(HTTP_TIMEOUT) | |
| 166 | + | .redirects(0) | |
| 167 | + | .user_agent(USER_AGENT) | |
| 168 | + | .build() | |
| 169 | + | } | |
| 170 | + | ||
| 130 | 171 | /// Classify a `ureq::Error` into a `BB_ERR:`-prefixed string that carries | |
| 131 | 172 | /// error category information through the Rhai string-error channel. | |
| 132 | 173 | fn format_ureq_error(err: ureq::Error) -> String { | |
| 133 | 174 | match err { | |
| 134 | 175 | ureq::Error::Status(status, response) => { | |
| 135 | - | let body = response | |
| 136 | - | .into_string() | |
| 137 | - | .unwrap_or_default(); | |
| 176 | + | // Cap error body read to prevent OOM on malicious large error responses | |
| 177 | + | let mut bytes = Vec::new(); | |
| 178 | + | let _ = response | |
| 179 | + | .into_reader() | |
| 180 | + | .take(MAX_RESPONSE_BYTES) | |
| 181 | + | .read_to_end(&mut bytes); | |
| 182 | + | let body = String::from_utf8_lossy(&bytes); | |
| 138 | 183 | let body_preview = if body.len() > 200 { &body[..200] } else { &body }; | |
| 139 | 184 | match status { | |
| 140 | 185 | 401 | 403 => format!("{BB_ERR_PREFIX}auth:HTTP {status}: {body_preview}"), | |
| @@ -175,14 +220,14 @@ pub(super) fn register_host_functions( | |||
| 175 | 220 | // HTTP GET returning string (see trust model above) | |
| 176 | 221 | let counter = request_counter.clone(); | |
| 177 | 222 | let deadline = fetch_deadline.clone(); | |
| 223 | + | let agent = make_agent(); | |
| 224 | + | let agent_clone = agent.clone(); | |
| 178 | 225 | engine.register_fn("http_get", move |url: &str| -> Result<String, Box<rhai::EvalAltResult>> { | |
| 179 | 226 | validate_url(url)?; | |
| 180 | 227 | check_request_limit(&counter)?; | |
| 181 | 228 | check_fetch_deadline(&deadline)?; | |
| 182 | 229 | ||
| 183 | - | let response = ureq::get(url) | |
| 184 | - | .set("User-Agent", USER_AGENT) | |
| 185 | - | .timeout(HTTP_TIMEOUT) | |
| 230 | + | let response = agent_clone.get(url) | |
| 186 | 231 | .call() | |
| 187 | 232 | .map_err(format_ureq_error)?; | |
| 188 | 233 | ||
| @@ -205,9 +250,7 @@ pub(super) fn register_host_functions( | |||
| 205 | 250 | check_request_limit(&counter)?; | |
| 206 | 251 | check_fetch_deadline(&deadline)?; | |
| 207 | 252 | ||
| 208 | - | let response = ureq::get(url) | |
| 209 | - | .set("User-Agent", USER_AGENT) | |
| 210 | - | .timeout(HTTP_TIMEOUT) | |
| 253 | + | let response = agent.get(url) | |
| 211 | 254 | .call() | |
| 212 | 255 | .map_err(format_ureq_error)?; | |
| 213 | 256 | ||
| @@ -270,7 +313,7 @@ pub(super) fn register_host_functions( | |||
| 270 | 313 | ||
| 271 | 314 | // Truncate text with ellipsis (character-count aware for multibyte UTF-8) | |
| 272 | 315 | engine.register_fn("truncate", |text: &str, max_len: i64| -> String { | |
| 273 | - | let max = max_len as usize; | |
| 316 | + | let max = max_len.max(0) as usize; | |
| 274 | 317 | let char_count = text.chars().count(); | |
| 275 | 318 | if char_count <= max { | |
| 276 | 319 | text.to_string() |
| @@ -351,8 +351,11 @@ impl Default for RhaiPluginManager { | |||
| 351 | 351 | } | |
| 352 | 352 | ||
| 353 | 353 | #[tracing::instrument(skip_all)] | |
| 354 | - | /// Create a configured Rhai engine with all host functions registered | |
| 355 | - | pub fn create_engine() -> Engine { | |
| 354 | + | /// Create a configured Rhai engine with all host functions registered. | |
| 355 | + | /// | |
| 356 | + | /// If `set_deadline` is true, a 60-second aggregate fetch deadline is set | |
| 357 | + | /// (used for reader scripts). If false, no deadline is set (used for tests). | |
| 358 | + | fn create_engine_inner(set_deadline: bool) -> Engine { | |
| 356 | 359 | let mut engine = Engine::new(); | |
| 357 | 360 | engine.set_max_expr_depths(128, 128); | |
| 358 | 361 | engine.set_max_operations(100_000); | |
| @@ -363,11 +366,24 @@ pub fn create_engine() -> Engine { | |||
| 363 | 366 | // Disable `import` to prevent arbitrary file reads from disk. | |
| 364 | 367 | engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver::new()); | |
| 365 | 368 | let counter = Arc::new(AtomicUsize::new(0)); | |
| 366 | - | let deadline = Arc::new(AtomicU64::new(0)); | |
| 369 | + | let deadline = Arc::new(AtomicU64::new(if set_deadline { | |
| 370 | + | let now_ms = std::time::SystemTime::now() | |
| 371 | + | .duration_since(std::time::UNIX_EPOCH) | |
| 372 | + | .unwrap_or_default() | |
| 373 | + | .as_millis() as u64; | |
| 374 | + | now_ms + host_functions::MAX_FETCH_DURATION.as_millis() as u64 | |
| 375 | + | } else { | |
| 376 | + | 0 | |
| 377 | + | })); | |
| 367 | 378 | register_host_functions(&mut engine, counter, deadline); | |
| 368 | 379 | engine | |
| 369 | 380 | } | |
| 370 | 381 | ||
| 382 | + | /// Create a configured Rhai engine without a deadline (for tests and general use). | |
| 383 | + | pub fn create_engine() -> Engine { | |
| 384 | + | create_engine_inner(false) | |
| 385 | + | } | |
| 386 | + | ||
| 371 | 387 | // ── Per-node data size limits ──────────────────────────────────────────── | |
| 372 | 388 | // | |
| 373 | 389 | // Rhai's built-in max_string_size / max_array_size / max_map_size count | |
| @@ -440,7 +456,7 @@ pub struct ReaderResult { | |||
| 440 | 456 | #[tracing::instrument(skip_all)] | |
| 441 | 457 | /// Run the reader extraction plugin on a URL and return the extracted content | |
| 442 | 458 | pub fn run_reader_script(url: &str, plugins_dir: &Path) -> Result<ReaderResult, RhaiPluginError> { | |
| 443 | - | let engine = create_engine(); | |
| 459 | + | let engine = create_engine_inner(true); | |
| 444 | 460 | ||
| 445 | 461 | let plugin_path = plugins_dir.join("reader.rhai"); | |
| 446 | 462 | let script = std::fs::read_to_string(plugin_path).map_err(|e| { |
| @@ -26,6 +26,14 @@ impl Database { | |||
| 26 | 26 | pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> { | |
| 27 | 27 | let pool = SqlitePoolOptions::new() | |
| 28 | 28 | .max_connections(16) | |
| 29 | + | .after_connect(|conn, _meta| { | |
| 30 | + | Box::pin(async move { | |
| 31 | + | sqlx::query("PRAGMA foreign_keys = ON") | |
| 32 | + | .execute(&mut *conn) | |
| 33 | + | .await?; | |
| 34 | + | Ok(()) | |
| 35 | + | }) | |
| 36 | + | }) | |
| 29 | 37 | .connect(database_url) | |
| 30 | 38 | .await?; | |
| 31 | 39 |
| @@ -701,6 +701,32 @@ impl ItemsRepository { | |||
| 701 | 701 | Ok(row.0) | |
| 702 | 702 | } | |
| 703 | 703 | ||
| 704 | + | /// Count items matching a combination of source, unread, and starred filters. | |
| 705 | + | #[tracing::instrument(skip_all)] | |
| 706 | + | pub async fn count_filtered( | |
| 707 | + | &self, | |
| 708 | + | source: Option<&str>, | |
| 709 | + | unread_only: bool, | |
| 710 | + | starred_only: bool, | |
| 711 | + | ) -> Result<i64, sqlx::Error> { | |
| 712 | + | let mut sql = "SELECT COUNT(*) FROM feed_items WHERE 1=1".to_string(); | |
| 713 | + | if source.is_some() { | |
| 714 | + | sql.push_str(" AND busser_id = ?1"); | |
| 715 | + | } | |
| 716 | + | if unread_only { | |
| 717 | + | sql.push_str(" AND is_read = 0"); | |
| 718 | + | } | |
| 719 | + | if starred_only { | |
| 720 | + | sql.push_str(" AND is_starred = 1"); | |
| 721 | + | } | |
| 722 | + | let mut query = sqlx::query_as::<_, (i64,)>(&sql); | |
| 723 | + | if let Some(s) = source { | |
| 724 | + | query = query.bind(s); | |
| 725 | + | } | |
| 726 | + | let row = query.fetch_one(&self.pool).await?; | |
| 727 | + | Ok(row.0) | |
| 728 | + | } | |
| 729 | + | ||
| 704 | 730 | /// Get total and unread counts for all busser sources in a single query. | |
| 705 | 731 | /// | |
| 706 | 732 | /// Returns `(busser_id, total_count, unread_count)` tuples. This avoids | |
| @@ -1250,7 +1276,7 @@ impl BookmarksRepository { | |||
| 1250 | 1276 | if tag.is_empty() { | |
| 1251 | 1277 | continue; | |
| 1252 | 1278 | } | |
| 1253 | - | sqlx::query("INSERT INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)") | |
| 1279 | + | sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)") | |
| 1254 | 1280 | .bind(id) | |
| 1255 | 1281 | .bind(tag) | |
| 1256 | 1282 | .execute(&mut *tx) |
| @@ -118,7 +118,19 @@ impl FeedGenerator { | |||
| 118 | 118 | #[tracing::instrument(skip_all)] | |
| 119 | 119 | pub async fn get_all_items(&self) -> Result<PaginatedItems, FeedError> { | |
| 120 | 120 | let fetch_limit = Self::MAX_ALL_ITEMS + 1; | |
| 121 | - | let items = self.db.items().list_all(fetch_limit, 0).await?; | |
| 121 | + | // Push source/unread/starred/search filters to SQL for efficiency | |
| 122 | + | let items = self | |
| 123 | + | .db | |
| 124 | + | .items() | |
| 125 | + | .list_filtered( | |
| 126 | + | self.filter.search.as_deref(), | |
| 127 | + | self.filter.source.as_deref(), | |
| 128 | + | self.filter.unread_only, | |
| 129 | + | self.filter.starred_only, | |
| 130 | + | fetch_limit, | |
| 131 | + | 0, | |
| 132 | + | ) | |
| 133 | + | .await?; | |
| 122 | 134 | ||
| 123 | 135 | let mut feed_items: Vec<FeedItem> = items | |
| 124 | 136 | .into_iter() | |
| @@ -156,34 +168,15 @@ impl FeedGenerator { | |||
| 156 | 168 | /// Get total item count, respecting source, unread, and starred filters. | |
| 157 | 169 | #[tracing::instrument(skip_all)] | |
| 158 | 170 | pub async fn count(&self) -> Result<i64, FeedError> { | |
| 159 | - | // Use list_filtered with a large limit to count items that match all | |
| 160 | - | // SQL-level filters. This is less efficient than a dedicated COUNT query | |
| 161 | - | // for the combined case, but is correct for all filter combinations. | |
| 162 | - | if self.filter.source.is_some() && (self.filter.unread_only || self.filter.starred_only) { | |
| 163 | - | // Combined filters — count via filtered query | |
| 164 | - | let items = self | |
| 165 | - | .db | |
| 166 | - | .items() | |
| 167 | - | .list_filtered( | |
| 168 | - | None, | |
| 169 | - | self.filter.source.as_deref(), | |
| 170 | - | self.filter.unread_only, | |
| 171 | - | self.filter.starred_only, | |
| 172 | - | i64::MAX, | |
| 173 | - | 0, | |
| 174 | - | ) | |
| 175 | - | .await?; | |
| 176 | - | return Ok(items.len() as i64); | |
| 177 | - | } | |
| 178 | - | if let Some(ref source) = self.filter.source { | |
| 179 | - | Ok(self.db.items().count_by_busser(source).await?) | |
| 180 | - | } else if self.filter.unread_only { | |
| 181 | - | Ok(self.db.items().count_unread().await?) | |
| 182 | - | } else if self.filter.starred_only { | |
| 183 | - | Ok(self.db.items().count_starred().await?) | |
| 184 | - | } else { | |
| 185 | - | Ok(self.db.items().count_all().await?) | |
| 186 | - | } | |
| 171 | + | Ok(self | |
| 172 | + | .db | |
| 173 | + | .items() | |
| 174 | + | .count_filtered( | |
| 175 | + | self.filter.source.as_deref(), | |
| 176 | + | self.filter.unread_only, | |
| 177 | + | self.filter.starred_only, | |
| 178 | + | ) | |
| 179 | + | .await?) | |
| 187 | 180 | } | |
| 188 | 181 | ||
| 189 | 182 | /// Get unread count |
| @@ -43,20 +43,21 @@ impl OrderBy { | |||
| 43 | 43 | items.sort_by(|a, b| b.meta.published_at.cmp(&a.meta.published_at)); | |
| 44 | 44 | } | |
| 45 | 45 | OrderBy::Score => { | |
| 46 | + | // Use i64::MIN for None so scoreless items sort after all scored items | |
| 46 | 47 | items.sort_by(|a, b| { | |
| 47 | - | let score_a = a.meta.score.as_ref().copied().unwrap_or(0); | |
| 48 | - | let score_b = b.meta.score.as_ref().copied().unwrap_or(0); | |
| 48 | + | let score_a = a.meta.score.unwrap_or(i64::MIN); | |
| 49 | + | let score_b = b.meta.score.unwrap_or(i64::MIN); | |
| 49 | 50 | score_b.cmp(&score_a).then_with(|| { | |
| 50 | 51 | b.meta.published_at.cmp(&a.meta.published_at) | |
| 51 | 52 | }) | |
| 52 | 53 | }); | |
| 53 | 54 | } | |
| 54 | 55 | OrderBy::UnreadFirst => { | |
| 55 | - | // `b.is_read.cmp(&a.is_read)` sorts false (unread) before true (read) | |
| 56 | + | // `a.is_read.cmp(&b.is_read)` sorts false (unread) before true (read) | |
| 56 | 57 | // because false < true in Rust's bool Ord. | |
| 57 | 58 | items.sort_by(|a, b| { | |
| 58 | - | b.is_read | |
| 59 | - | .cmp(&a.is_read) | |
| 59 | + | a.is_read | |
| 60 | + | .cmp(&b.is_read) | |
| 60 | 61 | .then_with(|| b.meta.published_at.cmp(&a.meta.published_at)) | |
| 61 | 62 | }); | |
| 62 | 63 | } | |
| @@ -418,12 +419,12 @@ mod tests { | |||
| 418 | 419 | make_item_full("s", "unread2", 400, None, false, false), | |
| 419 | 420 | ]; | |
| 420 | 421 | OrderBy::UnreadFirst.apply(&mut items); | |
| 421 | - | // Items are grouped by read state, then by published_at descending within each group. | |
| 422 | - | // Note: the current sort uses b.is_read.cmp(&a.is_read) which groups | |
| 423 | - | // read=true items first (bool ordering: false < true). | |
| 424 | - | let first_group_read = items[0].is_read; | |
| 425 | - | let second_group_read = items[items.len() - 1].is_read; | |
| 426 | - | assert_ne!(first_group_read, second_group_read); | |
| 422 | + | // Unread items (is_read=false) come first, then read items. | |
| 423 | + | // Within each group, sorted by published_at descending. | |
| 424 | + | assert_eq!(items[0].id.item_id, "unread2"); | |
| 425 | + | assert_eq!(items[1].id.item_id, "unread1"); | |
| 426 | + | assert_eq!(items[2].id.item_id, "read1"); | |
| 427 | + | assert_eq!(items[3].id.item_id, "read2"); | |
| 427 | 428 | } | |
| 428 | 429 | ||
| 429 | 430 | #[test] |
| @@ -7,6 +7,17 @@ v0.3.1. Audit grade A. 601 tests (`--workspace`). | |||
| 7 | 7 | ||
| 8 | 8 | --- | |
| 9 | 9 | ||
| 10 | + | ## Sync Monetization | |
| 11 | + | ||
| 12 | + | BB is free. Cloud sync is the only revenue source. See `MNW/server/docs/internal/business/app_sync_pricing.md` for full pricing rationale. | |
| 13 | + | ||
| 14 | + | - [ ] Stripe product + prices: $1/mo monthly, $8/yr annual | |
| 15 | + | - [ ] Sync gate: check subscription status before enabling SyncKit sync | |
| 16 | + | - [ ] Subscription UI: in-app purchase/manage flow (settings panel) | |
| 17 | + | - [ ] Annual billing messaging: explain why annual is preferred (Stripe fee transparency) | |
| 18 | + | ||
| 19 | + | --- | |
| 20 | + | ||
| 10 | 21 | ## Fuzz Findings (2026-04-27) | |
| 11 | 22 | ||
| 12 | 23 | Findings from adversarial code review. Ordered by severity. | |
| @@ -65,6 +76,47 @@ Findings from adversarial code review. Ordered by severity. | |||
| 65 | 76 | ||
| 66 | 77 | --- | |
| 67 | 78 | ||
| 79 | + | ## Fuzz Findings (2026-05-02) | |
| 80 | + | ||
| 81 | + | Findings from adversarial code review. Ordered by severity. | |
| 82 | + | ||
| 83 | + | ### SERIOUS | |
| 84 | + | ||
| 85 | + | - [x] **`UnreadFirst` sorts read items first** (`ordering.rs:57-61`). Fixed: swapped to `a.is_read.cmp(&b.is_read)`. Fixed test to assert correct ordering. | |
| 86 | + | - [x] **`PRAGMA foreign_keys` never enabled** (`bb-db/src/lib.rs:26-31`). Fixed: added `after_connect` hook that runs `PRAGMA foreign_keys = ON` on every connection. | |
| 87 | + | - [x] **Feed edit "Save" is broken** (`sources.js:470`). Fixed: pass `feed.id` (UUID) instead of `source.id` (busser_id). | |
| 88 | + | - [x] **SSRF via HTTP redirect** (`host_functions.rs:183-196`). Fixed: created shared `ureq::Agent` with `redirects(0)`. Plugins no longer follow redirects. | |
| 89 | + | - [x] **Error response body read without size limit** (`host_functions.rs:133-157`). Fixed: `format_ureq_error` now reads error bodies via `.into_reader().take(MAX_RESPONSE_BYTES)`. | |
| 90 | + | - [x] **Tokio RwLock held across blocking HTTP** (`orchestrator.rs:199-202`). Fixed: wrapped `plugins.fetch()` in `tokio::task::spawn_blocking`. | |
| 91 | + | - [x] **Key file world-readable between write and chmod** (`crypto.rs:46-53`). Fixed: use `OpenOptionsExt::mode(0o600)` on Unix to set permissions atomically at creation. | |
| 92 | + | - [x] **Raw SQLite errors leaked to frontend in sync service** (`sync_service/*.rs`). Fixed: added `db_err()` helper that logs full error and returns generic message. Applied to all 27 call sites. | |
| 93 | + | ||
| 94 | + | ### MINOR | |
| 95 | + | ||
| 96 | + | - [x] **`count()` loads all rows into memory** (`generator/query.rs:162-176`). Fixed: added `count_filtered()` to `ItemsRepository` using `SELECT COUNT(*)` with dynamic WHERE clause. | |
| 97 | + | - [x] **`100.` prefix blocking overly broad** (`host_functions.rs:63`). Fixed: replaced with `is_cgnat()` that checks `100.64.0.0/10` only. | |
| 98 | + | - [x] **`validate_url` doesn't block encoded IPs** (`host_functions.rs:35-88`). Fixed: block hex-prefixed hosts, pure-numeric hosts (decimal IPs), and octal-prefixed octets. | |
| 99 | + | - [ ] ~~**Non-deterministic synthesized IDs** (`conversions.rs:280-286`)~~. False positive: `DefaultHasher::new()` uses fixed keys and IS deterministic across runs. | |
| 100 | + | - [x] **`truncate` with negative `max_len`** (`host_functions.rs:272-283`). Fixed: `max_len.max(0)` before cast to usize. | |
| 101 | + | - [x] **`bookmark_tags` INSERT without `OR IGNORE`** (`repository.rs:1253`). Fixed: added `OR IGNORE`. | |
| 102 | + | - [x] **Only first feed per plugin used** (`orchestrator.rs:189`). Fixed: track all feed IDs; record success/failure for every feed, not just first. | |
| 103 | + | - [x] **Decryption failure silently clears secret** (`crypto.rs:228-230`). Fixed: upgraded log level to `error` so it's visible in logs. The clear behavior is intentional (prevents ciphertext leakage to plugins). | |
| 104 | + | - [x] **Initial snapshot race** (`sync_scheduler.rs:150-165`). Fixed: moved snapshot creation after acquiring sync mutex. | |
| 105 | + | - [x] **Temp files never cleaned up** (`commands/items.rs:410-414`). Fixed: clean up `bb-downloads/` on startup; use PID-prefixed filenames to prevent collisions. | |
| 106 | + | - [x] **OAuth callback thread leak** (`commands/sync.rs:114-159`). Fixed: generation counter cancels previous callback servers when a new auth flow starts. | |
| 107 | + | - [x] **`Score` ordering NULL as 0** (`ordering.rs:46-52`). Fixed: use `i64::MIN` for None so scoreless items sort after all scored items. | |
| 108 | + | ||
| 109 | + | ### NOTE | |
| 110 | + | ||
| 111 | + | - [ ] **Aggregate memory per fetch uncapped** (`host_functions.rs:21,25`). 100 requests × 2MB = 200MB during execution. Acceptable: per-plugin, user-installed, single-threaded. | |
| 112 | + | - [x] **`run_reader_script` no timeout/validation** (`rhai_plugin/mod.rs:354-369`). Fixed: reader scripts now use 60-second aggregate deadline via `create_engine_inner(true)`. | |
| 113 | + | - [x] **`sanitizeHtml` missing `<meta>` tag** (`frontend/js/utils.js:21`). Fixed: added `meta` and `link` to `DANGEROUS_ELEMENTS`. | |
| 114 | + | - [ ] ~~**FTS5 rowid instability after VACUUM**~~ (`migrations/sqlite/005_create_fts.sql`). Won't fix: app never runs VACUUM. Documented for awareness. | |
| 115 | + | - [x] **`get_all_items` fetches then filters** (`generator/query.rs:119-153`). Fixed: use `list_filtered()` to push source/unread/starred/search to SQL. | |
| 116 | + | - [x] **Plugin errors leak internals** (`commands/error.rs:104`). Fixed: log full error server-side, send generic message to frontend. | |
| 117 | + | ||
| 118 | + | --- | |
| 119 | + | ||
| 68 | 120 | ## Phase 7: Plugin OAuth (Post-beta) | |
| 69 | 121 | ||
| 70 | 122 | ### 7A: OAuth Infrastructure |
| @@ -157,7 +157,7 @@ | |||
| 157 | 157 | <span class="source-name">${escapeHtml(qf.name)}</span> | |
| 158 | 158 | </div> | |
| 159 | 159 | <span class="source-actions"> | |
| 160 | - | <span class="source-count">${qf.matchCount}</span> | |
| 160 | + | <span class="source-count">${escapeHtml(String(qf.matchCount))}</span> | |
| 161 | 161 | <button class="source-edit" title="Edit query feed" aria-label="Edit query feed">⚙</button> | |
| 162 | 162 | <button class="source-delete" title="Delete query feed" aria-label="Delete query feed">×</button> | |
| 163 | 163 | </span> | |
| @@ -469,7 +469,7 @@ | |||
| 469 | 469 | onSubmit: async (data) => { | |
| 470 | 470 | const name = data.name; | |
| 471 | 471 | delete data.name; | |
| 472 | - | await BB.api.feeds.update(source.id, name, data); | |
| 472 | + | await BB.api.feeds.update(feed.id, name, data); | |
| 473 | 473 | BB.ui.showToast('Feed updated!'); | |
| 474 | 474 | load(); | |
| 475 | 475 | }, |
| @@ -20,6 +20,7 @@ | |||
| 20 | 20 | /** Set of element tag names that are stripped during sanitization. */ | |
| 21 | 21 | const DANGEROUS_ELEMENTS = new Set([ | |
| 22 | 22 | 'script', 'iframe', 'object', 'embed', 'form', 'style', 'base', 'svg', 'math', | |
| 23 | + | 'meta', 'link', | |
| 23 | 24 | ]); | |
| 24 | 25 | ||
| 25 | 26 | /** |
| @@ -101,7 +101,10 @@ impl From<bb_core::orchestrator::OrchestratorError> for ApiError { | |||
| 101 | 101 | tracing::error!(error = %e, "Database error"); | |
| 102 | 102 | Self::database("A database error occurred") | |
| 103 | 103 | } | |
| 104 | - | OrchestratorError::Plugin(e) => Self::plugin(e.to_string()), | |
| 104 | + | OrchestratorError::Plugin(e) => { | |
| 105 | + | tracing::error!(error = %e, "Plugin error"); | |
| 106 | + | Self::plugin("A plugin error occurred") | |
| 107 | + | } | |
| 105 | 108 | OrchestratorError::Feed(e) => { | |
| 106 | 109 | tracing::error!(error = %e, "Feed error"); | |
| 107 | 110 | Self::database("A feed query error occurred") |
| @@ -411,7 +411,9 @@ pub async fn download_and_open(url: String) -> Result<(), ApiError> { | |||
| 411 | 411 | std::fs::create_dir_all(&dir) | |
| 412 | 412 | .map_err(|e| ApiError::internal(format!("Failed to create temp dir: {}", e)))?; | |
| 413 | 413 | ||
| 414 | - | let path = dir.join(&filename); | |
| 414 | + | // Use a unique prefix to prevent filename collisions between different URLs | |
| 415 | + | let unique_name = format!("{}_{}", std::process::id(), filename); | |
| 416 | + | let path = dir.join(&unique_name); | |
| 415 | 417 | let mut file = std::fs::File::create(&path) | |
| 416 | 418 | .map_err(|e| ApiError::internal(format!("Failed to create file: {}", e)))?; | |
| 417 | 419 |
| @@ -97,9 +97,13 @@ fn generate_state() -> String { | |||
| 97 | 97 | ||
| 98 | 98 | // ── Callback server ── | |
| 99 | 99 | ||
| 100 | + | /// Shared flag to signal previous callback servers to stop. | |
| 101 | + | static CALLBACK_CANCEL: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); | |
| 102 | + | ||
| 100 | 103 | /// Start a minimal HTTP server on a random port that waits for the OAuth redirect. | |
| 101 | 104 | /// Returns the port. The server accepts one connection, parses the query string, | |
| 102 | 105 | /// responds with a success/error page, then shuts down. | |
| 106 | + | /// Any previously running callback server is cancelled via the shared generation counter. | |
| 103 | 107 | fn start_callback_server() -> Result<u16, ApiError> { | |
| 104 | 108 | let listener = std::net::TcpListener::bind("127.0.0.1:0") | |
| 105 | 109 | .map_err(|e| ApiError::internal(format!("Failed to bind callback server: {}", e)))?; | |
| @@ -111,12 +115,19 @@ fn start_callback_server() -> Result<u16, ApiError> { | |||
| 111 | 115 | .set_nonblocking(true) | |
| 112 | 116 | .map_err(|e| ApiError::internal(format!("Failed to set non-blocking: {}", e)))?; | |
| 113 | 117 | ||
| 118 | + | // Increment generation to cancel any previous callback server thread | |
| 119 | + | let generation = CALLBACK_CANCEL.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; | |
| 120 | + | ||
| 114 | 121 | std::thread::spawn(move || { | |
| 115 | 122 | use std::io::{Read, Write}; | |
| 116 | 123 | ||
| 117 | 124 | let timeout = std::time::Instant::now() + std::time::Duration::from_secs(300); | |
| 118 | 125 | ||
| 119 | 126 | while std::time::Instant::now() < timeout { | |
| 127 | + | // Check if a newer callback server has been started | |
| 128 | + | if CALLBACK_CANCEL.load(std::sync::atomic::Ordering::Relaxed) != generation { | |
| 129 | + | break; | |
| 130 | + | } | |
| 120 | 131 | match listener.accept() { | |
| 121 | 132 | Ok((mut stream, _)) => { | |
| 122 | 133 | let mut buf = [0u8; 4096]; |
| @@ -99,6 +99,14 @@ impl AppState { | |||
| 99 | 99 | // Initialize SyncKit client from saved key or env vars | |
| 100 | 100 | let sync_client = load_sync_client(&app_data_dir); | |
| 101 | 101 | ||
| 102 | + | // Clean up old download temp files from previous sessions | |
| 103 | + | let downloads_dir = std::env::temp_dir().join("bb-downloads"); | |
| 104 | + | if downloads_dir.exists() { | |
| 105 | + | if let Err(e) = std::fs::remove_dir_all(&downloads_dir) { | |
| 106 | + | debug!(error = %e, "Failed to clean up old download temp files"); | |
| 107 | + | } | |
| 108 | + | } | |
| 109 | + | ||
| 102 | 110 | info!("Application state initialized"); | |
| 103 | 111 | ||
| 104 | 112 | Ok(Self { |
| @@ -146,7 +146,11 @@ pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle { | |||
| 146 | 146 | } | |
| 147 | 147 | } | |
| 148 | 148 | ||
| 149 | - | // Create initial snapshot on first sync | |
| 149 | + | // Prevent concurrent sync operations (manual + scheduler). | |
| 150 | + | let _sync_guard = state.sync_mutex.lock().await; | |
| 151 | + | ||
| 152 | + | // Create initial snapshot on first sync (inside mutex to prevent | |
| 153 | + | // duplicate snapshots from concurrent manual + scheduled sync). | |
| 150 | 154 | let snapshot_done = sync_service::get_sync_state(pool, "initial_snapshot_done") | |
| 151 | 155 | .await | |
| 152 | 156 | .unwrap_or_default(); | |
| @@ -161,9 +165,6 @@ pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle { | |||
| 161 | 165 | } | |
| 162 | 166 | } | |
| 163 | 167 | ||
| 164 | - | // Prevent concurrent sync operations (manual + scheduler). | |
| 165 | - | let _sync_guard = state.sync_mutex.lock().await; | |
| 166 | - | ||
| 167 | 168 | // Perform sync | |
| 168 | 169 | match sync_service::perform_sync(pool, &client).await { | |
| 169 | 170 | Ok(result) => { |
| @@ -56,13 +56,13 @@ pub async fn pull_changes( | |||
| 56 | 56 | /// the flag — preventing a stuck flag from suppressing local changelog | |
| 57 | 57 | /// entries until the next sync. | |
| 58 | 58 | async fn apply_remote_changes(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> Result<(), ApiError> { | |
| 59 | - | let mut tx = pool.begin().await.map_err(|e| ApiError::database(e.to_string()))?; | |
| 59 | + | let mut tx = pool.begin().await.map_err(super::db_err)?; | |
| 60 | 60 | ||
| 61 | 61 | // Set flag inside the transaction | |
| 62 | 62 | sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '1')") | |
| 63 | 63 | .execute(&mut *tx) | |
| 64 | 64 | .await | |
| 65 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 65 | + | .map_err(super::db_err)?; | |
| 66 | 66 | ||
| 67 | 67 | let result = apply_changes_inner_tx(&mut tx, changes).await; | |
| 68 | 68 | ||
| @@ -70,12 +70,12 @@ async fn apply_remote_changes(pool: &SqlitePool, changes: Vec<ChangeEntry>) -> R | |||
| 70 | 70 | sqlx::query("INSERT OR REPLACE INTO sync_state (key, value) VALUES ('applying_remote', '0')") | |
| 71 | 71 | .execute(&mut *tx) | |
| 72 | 72 | .await | |
| 73 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 73 | + | .map_err(super::db_err)?; | |
| 74 | 74 | ||
| 75 | 75 | // If data application failed, still commit the flag clear (the partial | |
| 76 | 76 | // changes are acceptable since cursor hasn't been updated yet, and | |
| 77 | 77 | // re-pulling will re-apply the full batch idempotently). | |
| 78 | - | tx.commit().await.map_err(|e| ApiError::database(e.to_string()))?; | |
| 78 | + | tx.commit().await.map_err(super::db_err)?; | |
| 79 | 79 | ||
| 80 | 80 | result | |
| 81 | 81 | } | |
| @@ -156,7 +156,7 @@ async fn apply_upsert( | |||
| 156 | 156 | row_id: &str, | |
| 157 | 157 | data: &serde_json::Value, | |
| 158 | 158 | ) -> Result<(), ApiError> { | |
| 159 | - | let mut conn = pool.acquire().await.map_err(|e| ApiError::database(e.to_string()))?; | |
| 159 | + | let mut conn = pool.acquire().await.map_err(super::db_err)?; | |
| 160 | 160 | apply_upsert_exec(&mut *conn, table, row_id, data).await | |
| 161 | 161 | } | |
| 162 | 162 | ||
| @@ -187,7 +187,7 @@ async fn apply_upsert_exec( | |||
| 187 | 187 | .bind(id) | |
| 188 | 188 | .execute(&mut *conn) | |
| 189 | 189 | .await | |
| 190 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 190 | + | .map_err(super::db_err)?; | |
| 191 | 191 | return Ok(()); | |
| 192 | 192 | } | |
| 193 | 193 | ||
| @@ -256,7 +256,7 @@ async fn apply_upsert_exec( | |||
| 256 | 256 | query | |
| 257 | 257 | .execute(&mut *conn) | |
| 258 | 258 | .await | |
| 259 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 259 | + | .map_err(super::db_err)?; | |
| 260 | 260 | ||
| 261 | 261 | Ok(()) | |
| 262 | 262 | } | |
| @@ -272,7 +272,7 @@ fn json_to_i32(val: &serde_json::Value) -> i32 { | |||
| 272 | 272 | /// Apply a DELETE for a remote change (pool version, used by tests). | |
| 273 | 273 | #[cfg(test)] | |
| 274 | 274 | async fn apply_delete(pool: &SqlitePool, table: &str, row_id: &str) -> Result<(), ApiError> { | |
| 275 | - | let mut conn = pool.acquire().await.map_err(|e| ApiError::database(e.to_string()))?; | |
| 275 | + | let mut conn = pool.acquire().await.map_err(super::db_err)?; | |
| 276 | 276 | apply_delete_exec(&mut *conn, table, row_id).await | |
| 277 | 277 | } | |
| 278 | 278 | ||
| @@ -308,7 +308,7 @@ async fn apply_delete_exec( | |||
| 308 | 308 | .bind(tag) | |
| 309 | 309 | .execute(&mut *conn) | |
| 310 | 310 | .await | |
| 311 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 311 | + | .map_err(super::db_err)?; | |
| 312 | 312 | } | |
| 313 | 313 | return Ok(()); | |
| 314 | 314 | } | |
| @@ -318,7 +318,7 @@ async fn apply_delete_exec( | |||
| 318 | 318 | .bind(row_id) | |
| 319 | 319 | .execute(&mut *conn) | |
| 320 | 320 | .await | |
| 321 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 321 | + | .map_err(super::db_err)?; | |
| 322 | 322 | ||
| 323 | 323 | Ok(()) | |
| 324 | 324 | } |
| @@ -17,6 +17,14 @@ pub use download::*; | |||
| 17 | 17 | pub use snapshot::*; | |
| 18 | 18 | pub use upload::*; | |
| 19 | 19 | ||
| 20 | + | /// Convert a sqlx error to an ApiError, logging the full error server-side | |
| 21 | + | /// and returning a generic message to the frontend. Mirrors the behavior of | |
| 22 | + | /// `From<sqlx::Error> for ApiError` in commands/error.rs. | |
| 23 | + | pub(crate) fn db_err(e: sqlx::Error) -> ApiError { | |
| 24 | + | tracing::error!(error = %e, "Sync database error"); | |
| 25 | + | ApiError::database("A database error occurred") | |
| 26 | + | } | |
| 27 | + | ||
| 20 | 28 | /// Maximum changes to push in a single batch. | |
| 21 | 29 | pub(crate) const PUSH_BATCH_LIMIT: i64 = 500; | |
| 22 | 30 | ||
| @@ -47,7 +55,7 @@ pub async fn get_sync_state(pool: &SqlitePool, key: &str) -> Result<String, ApiE | |||
| 47 | 55 | .bind(key) | |
| 48 | 56 | .fetch_optional(pool) | |
| 49 | 57 | .await | |
| 50 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 58 | + | .map_err(db_err)?; | |
| 51 | 59 | ||
| 52 | 60 | Ok(row.map(|r| r.0).unwrap_or_default()) | |
| 53 | 61 | } | |
| @@ -59,7 +67,7 @@ pub async fn set_sync_state(pool: &SqlitePool, key: &str, value: &str) -> Result | |||
| 59 | 67 | .bind(value) | |
| 60 | 68 | .execute(pool) | |
| 61 | 69 | .await | |
| 62 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 70 | + | .map_err(db_err)?; | |
| 63 | 71 | Ok(()) | |
| 64 | 72 | } | |
| 65 | 73 | ||
| @@ -69,11 +77,11 @@ pub async fn clear_all_sync_state(pool: &SqlitePool) -> Result<(), ApiError> { | |||
| 69 | 77 | sqlx::query("DELETE FROM sync_state") | |
| 70 | 78 | .execute(pool) | |
| 71 | 79 | .await | |
| 72 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 80 | + | .map_err(db_err)?; | |
| 73 | 81 | sqlx::query("DELETE FROM sync_changelog") | |
| 74 | 82 | .execute(pool) | |
| 75 | 83 | .await | |
| 76 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 84 | + | .map_err(db_err)?; | |
| 77 | 85 | Ok(()) | |
| 78 | 86 | } | |
| 79 | 87 |
| @@ -26,7 +26,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 26 | 26 | ) | |
| 27 | 27 | .execute(pool) | |
| 28 | 28 | .await | |
| 29 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 29 | + | .map_err(super::db_err)?; | |
| 30 | 30 | total += result.rows_affected() as i64; | |
| 31 | 31 | ||
| 32 | 32 | // feed_tags | |
| @@ -37,7 +37,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 37 | 37 | ) | |
| 38 | 38 | .execute(pool) | |
| 39 | 39 | .await | |
| 40 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 40 | + | .map_err(super::db_err)?; | |
| 41 | 41 | total += result.rows_affected() as i64; | |
| 42 | 42 | ||
| 43 | 43 | // query_feeds | |
| @@ -50,7 +50,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 50 | 50 | ) | |
| 51 | 51 | .execute(pool) | |
| 52 | 52 | .await | |
| 53 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 53 | + | .map_err(super::db_err)?; | |
| 54 | 54 | total += result.rows_affected() as i64; | |
| 55 | 55 | ||
| 56 | 56 | // bookmarks | |
| @@ -65,7 +65,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 65 | 65 | ) | |
| 66 | 66 | .execute(pool) | |
| 67 | 67 | .await | |
| 68 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 68 | + | .map_err(super::db_err)?; | |
| 69 | 69 | total += result.rows_affected() as i64; | |
| 70 | 70 | ||
| 71 | 71 | // bookmark_tags | |
| @@ -76,7 +76,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 76 | 76 | ) | |
| 77 | 77 | .execute(pool) | |
| 78 | 78 | .await | |
| 79 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 79 | + | .map_err(super::db_err)?; | |
| 80 | 80 | total += result.rows_affected() as i64; | |
| 81 | 81 | ||
| 82 | 82 | // user_config | |
| @@ -87,7 +87,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 87 | 87 | ) | |
| 88 | 88 | .execute(pool) | |
| 89 | 89 | .await | |
| 90 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 90 | + | .map_err(super::db_err)?; | |
| 91 | 91 | total += result.rows_affected() as i64; | |
| 92 | 92 | ||
| 93 | 93 | // feed_items (user state only — is_read, is_starred) | |
| @@ -99,7 +99,7 @@ pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, ApiError> | |||
| 99 | 99 | ) | |
| 100 | 100 | .execute(pool) | |
| 101 | 101 | .await | |
| 102 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 102 | + | .map_err(super::db_err)?; | |
| 103 | 103 | total += result.rows_affected() as i64; | |
| 104 | 104 | ||
| 105 | 105 | set_sync_state(pool, "initial_snapshot_done", "1").await?; | |
| @@ -117,7 +117,7 @@ pub async fn cleanup_changelog(pool: &SqlitePool) -> Result<i64, ApiError> { | |||
| 117 | 117 | ) | |
| 118 | 118 | .execute(pool) | |
| 119 | 119 | .await | |
| 120 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 120 | + | .map_err(super::db_err)?; | |
| 121 | 121 | ||
| 122 | 122 | let deleted = result.rows_affected() as i64; | |
| 123 | 123 | if deleted > 0 { | |
| @@ -134,7 +134,7 @@ pub async fn enforce_changelog_retention(pool: &SqlitePool) -> Result<i64, ApiEr | |||
| 134 | 134 | let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") | |
| 135 | 135 | .fetch_one(pool) | |
| 136 | 136 | .await | |
| 137 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 137 | + | .map_err(super::db_err)?; | |
| 138 | 138 | ||
| 139 | 139 | if count <= MAX_CHANGELOG_ENTRIES { | |
| 140 | 140 | return Ok(0); | |
| @@ -149,7 +149,7 @@ pub async fn enforce_changelog_retention(pool: &SqlitePool) -> Result<i64, ApiEr | |||
| 149 | 149 | .bind(excess) | |
| 150 | 150 | .execute(pool) | |
| 151 | 151 | .await | |
| 152 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 152 | + | .map_err(super::db_err)?; | |
| 153 | 153 | ||
| 154 | 154 | let deleted = result.rows_affected() as i64; | |
| 155 | 155 | warn!( | |
| @@ -167,7 +167,7 @@ pub async fn count_pending_changes(pool: &SqlitePool) -> Result<i64, ApiError> { | |||
| 167 | 167 | let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") | |
| 168 | 168 | .fetch_one(pool) | |
| 169 | 169 | .await | |
| 170 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 170 | + | .map_err(super::db_err)?; | |
| 171 | 171 | Ok(row.0) | |
| 172 | 172 | } | |
| 173 | 173 |
| @@ -23,7 +23,7 @@ pub async fn push_changes( | |||
| 23 | 23 | .bind(PUSH_BATCH_LIMIT) | |
| 24 | 24 | .fetch_all(pool) | |
| 25 | 25 | .await | |
| 26 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 26 | + | .map_err(super::db_err)?; | |
| 27 | 27 | ||
| 28 | 28 | if rows.is_empty() { | |
| 29 | 29 | return Ok(0); | |
| @@ -101,7 +101,7 @@ pub async fn push_changes( | |||
| 101 | 101 | query | |
| 102 | 102 | .execute(pool) | |
| 103 | 103 | .await | |
| 104 | - | .map_err(|e| ApiError::database(e.to_string()))?; | |
| 104 | + | .map_err(super::db_err)?; | |
| 105 | 105 | } | |
| 106 | 106 | ||
| 107 | 107 | debug!(count, "Pushed changes"); |