Skip to main content

max / balanced_breakfast

Theme standardization, plugin sandboxing, sync service split, integration tests Map all 14 TOML theme slots (GO-style COLOR_MAP), add contrastColor() for text-on-accent, theme import/export commands, fix hardcoded colors in HTML export. Rename CSS vars (--accent→--accent-red, --yolk→--accent-yellow, --success→--accent-green), add --accent-blue/--accent-purple/--accent-cyan/ --bg-surface/--text-on-accent. Primary actions use accent-blue, logo uses accent-blue. Also: Rhai plugin sandbox hardening, sync service modularization, split command_integration.rs into per-domain test files, feed generator split, bb-interface error types, navigation JS, OTA updater, mobile layout improvements, query feeds, new API endpoints, docs updates. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-04-08 20:12 UTC
Commit: 319bc1e996641c851c1c90d046e6370ac1ccfdc5
Parent: 2a421da
78 files changed, +8715 insertions, -4896 deletions
A CHANGELOG.md +27
@@ -0,0 +1,27 @@
1 + # Changelog
2 +
3 + All notable changes to Balanced Breakfast will be documented in this file.
4 +
5 + Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). Versioning follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
6 +
7 + ## [0.3.0] — 2026-03-28
8 +
9 + First beta-ready release.
10 +
11 + ### Added
12 + - Cloud sync via SyncKit SDK (configs, feed sources, plugin manifests)
13 + - OTA updates via tauri-plugin-updater
14 + - Rhai plugin system for custom feed sources ("bussers")
15 + - RSS, Atom, JSON Feed, Hacker News, and arXiv source types
16 + - FTS5 full-text search across all feed items
17 + - Circuit breaker for failing feed sources
18 + - 16 bundled color themes
19 + - macOS, Windows, and Linux builds (signed and notarized on macOS)
20 +
21 + ### Fixed
22 + - All issues identified in audit runs 1–12
23 + - Concurrency and type safety patterns (Rust patterns audit)
24 +
25 + ### Security
26 + - Full observability instrumentation (61 traced functions)
27 + - No production unsafe code
M Cargo.lock +14 -12
@@ -210,6 +210,7 @@ dependencies = [
210 210 "toml 0.8.2",
211 211 "tracing",
212 212 "tracing-subscriber",
213 + "ureq",
213 214 "uuid",
214 215 ]
215 216
@@ -296,6 +297,7 @@ version = "0.3.0"
296 297 dependencies = [
297 298 "chrono",
298 299 "serde",
300 + "serde_json",
299 301 ]
300 302
301 303 [[package]]
@@ -918,7 +920,7 @@ dependencies = [
918 920 "libc",
919 921 "option-ext",
920 922 "redox_users",
921 - "windows-sys 0.61.2",
923 + "windows-sys 0.59.0",
922 924 ]
923 925
924 926 [[package]]
@@ -1086,7 +1088,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
1086 1088 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
1087 1089 dependencies = [
1088 1090 "libc",
1089 - "windows-sys 0.61.2",
1091 + "windows-sys 0.59.0",
1090 1092 ]
1091 1093
1092 1094 [[package]]
@@ -2639,7 +2641,7 @@ version = "0.50.3"
2639 2641 source = "registry+https://github.com/rust-lang/crates.io-index"
2640 2642 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
2641 2643 dependencies = [
2642 - "windows-sys 0.61.2",
2644 + "windows-sys 0.59.0",
2643 2645 ]
2644 2646
2645 2647 [[package]]
@@ -3024,7 +3026,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
3024 3026 checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967"
3025 3027 dependencies = [
3026 3028 "libc",
3027 - "windows-sys 0.61.2",
3029 + "windows-sys 0.45.0",
3028 3030 ]
3029 3031
3030 3032 [[package]]
@@ -3883,7 +3885,7 @@ dependencies = [
3883 3885 "errno",
3884 3886 "libc",
3885 3887 "linux-raw-sys",
3886 - "windows-sys 0.61.2",
3888 + "windows-sys 0.59.0",
3887 3889 ]
3888 3890
3889 3891 [[package]]
@@ -3940,7 +3942,7 @@ dependencies = [
3940 3942 "security-framework",
3941 3943 "security-framework-sys",
3942 3944 "webpki-root-certs",
3943 - "windows-sys 0.61.2",
3945 + "windows-sys 0.59.0",
3944 3946 ]
3945 3947
3946 3948 [[package]]
@@ -3951,9 +3953,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
3951 3953
3952 3954 [[package]]
3953 3955 name = "rustls-webpki"
3954 - version = "0.103.9"
3956 + version = "0.103.10"
3955 3957 source = "registry+https://github.com/rust-lang/crates.io-index"
3956 - checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53"
3958 + checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
3957 3959 dependencies = [
3958 3960 "ring",
3959 3961 "rustls-pki-types",
@@ -4939,9 +4941,9 @@ dependencies = [
4939 4941
4940 4942 [[package]]
4941 4943 name = "tar"
4942 - version = "0.4.44"
4944 + version = "0.4.45"
4943 4945 source = "registry+https://github.com/rust-lang/crates.io-index"
4944 - checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
4946 + checksum = "22692a6476a21fa75fdfc11d452fda482af402c008cdbaf3476414e122040973"
4945 4947 dependencies = [
4946 4948 "filetime",
4947 4949 "libc",
@@ -5305,7 +5307,7 @@ dependencies = [
5305 5307 "getrandom 0.3.4",
5306 5308 "once_cell",
5307 5309 "rustix",
5308 - "windows-sys 0.61.2",
5310 + "windows-sys 0.59.0",
5309 5311 ]
5310 5312
5311 5313 [[package]]
@@ -6283,7 +6285,7 @@ version = "0.1.11"
6283 6285 source = "registry+https://github.com/rust-lang/crates.io-index"
6284 6286 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
6285 6287 dependencies = [
6286 - "windows-sys 0.61.2",
6288 + "windows-sys 0.48.0",
6287 6289 ]
6288 6290
6289 6291 [[package]]
M README.md +47 -12
@@ -24,7 +24,7 @@ A desktop feed aggregator that unifies RSS, Hacker News, arXiv, and other source
24 24 # Development (hot-reload frontend, debug backend)
25 25 cargo tauri dev
26 26
27 - # Production build
27 + # Production build (macOS DMG, Windows installer, Linux AppImage)
28 28 cargo tauri build
29 29
30 30 # Run all workspace tests
@@ -45,6 +45,22 @@ The project is a Cargo workspace with four library crates and one application cr
45 45
46 46 Dependency flow: `bb-interface` is leaf (no internal deps) -> `bb-core` and `bb-feed` depend on `bb-interface` -> `bb-db` depends on `bb-interface` -> `src-tauri` depends on all four.
47 47
48 + Shared libraries from `../Shared/`: [theme-common](../Shared/theme-common/) (theme loading), [tagtree](../Shared/tagtree/) (tag validation), [synckit-client](../Shared/synckit-client/) (cloud sync SDK).
49 +
50 + ## Features
51 +
52 + - **Unified timeline** -- RSS, Atom, JSON Feed, Hacker News, arXiv, and custom sources merged into one feed
53 + - **Plugin system** -- Rhai scripting for extensible feed fetching (write a plugin for any source)
54 + - **Reader view** -- clean article rendering with HTML sanitization via DocEngine
55 + - **Search** -- FTS5 full-text search across all items with sanitized indexing
56 + - **Organization** -- tags, starred items, read/unread tracking, query feeds (saved dynamic filters)
57 + - **Auto-fetch scheduling** -- configurable per-plugin fetch intervals, circuit breaker after consecutive failures
58 + - **Feed health tracking** -- visual status indicators (green/yellow/red) per source
59 + - **Cloud sync** -- SyncKit integration with E2E encryption (feeds, tags, read state, preferences)
60 + - **OTA updates** -- background update checker with consent dialog (Tauri updater v2)
61 + - **17 bundled themes** -- dark, light, and high-contrast variants in TOML format, system auto-detection
62 + - **Platforms** -- macOS, Windows, Linux (native via Tauri 2)
63 +
48 64 ## Plugin Authoring
49 65
50 66 Plugins ("bussers") are `.rhai` script files. Drop one into the plugins directory and it loads on next launch.
@@ -54,19 +70,38 @@ Plugins ("bussers") are `.rhai` script files. Drop one into the plugins director
54 70
55 71 Every plugin defines four functions (`id`, `name`, `config_schema`, `fetch`) plus an optional `capabilities()`. Full authoring guide with field types, return shapes, host functions, and examples: [docs/plugin_authoring.md](docs/plugin_authoring.md).
56 72
57 - ## Features
58 -
59 - - **Unified timeline** -- RSS, Hacker News, arXiv, and custom sources merged into one feed
60 - - **Plugin system** -- Rhai scripting for extensible feed fetching (write a plugin for any source)
61 - - **Reader view** -- clean article rendering with HTML sanitization
62 - - **Search** -- FTS5 full-text search across all items
63 - - **Organization** -- tags, starred items, read/unread tracking, query feeds (saved filters)
64 - - **Cloud sync** -- SyncKit integration with E2E encryption (feeds, tags, read state, preferences)
65 - - **Themes** -- light and dark themes, system auto-detection
66 -
67 73 ## Bundled Plugins
68 74
69 - Four plugins ship with the app: **rss.rhai** (RSS/Atom/JSON Feed), **hackernews.rhai** (HN stories), **arxiv.rhai** (arXiv papers), **reader.rhai** (web page reader view).
75 + Eleven plugins ship with the app:
76 +
77 + | Plugin | Source |
78 + |--------|--------|
79 + | `rss.rhai` | RSS, Atom, and JSON Feed |
80 + | `hackernews.rhai` | Hacker News stories |
81 + | `arxiv.rhai` | arXiv papers |
82 + | `reader.rhai` | Web page reader view |
83 + | `github_trending.rhai` | GitHub trending repositories |
84 + | `devto.rhai` | Dev.to articles |
85 + | `lobsters.rhai` | Lobsters community |
86 + | `xkcd.rhai` | XKCD comics |
87 + | `nasa_apod.rhai` | NASA Astronomy Picture of the Day |
88 + | `earthquakes.rhai` | USGS earthquake data |
89 + | `nws_alerts.rhai` | National Weather Service alerts |
90 +
91 + ## Key Paths
92 +
93 + | What | Where |
94 + |------|-------|
95 + | Plugin contract types | `crates/bb-interface/src/` |
96 + | Plugin runtime + orchestrator | `crates/bb-core/src/` |
97 + | Feed aggregation | `crates/bb-feed/src/` |
98 + | Database layer | `crates/bb-db/src/` |
99 + | Tauri commands | `src-tauri/src/commands/` |
100 + | Frontend JS | `src-tauri/frontend/js/` |
101 + | Styles | `src-tauri/frontend/css/styles.css` |
102 + | Bundled plugins | `plugins/` |
103 + | Plugin authoring guide | `docs/plugin_authoring.md` |
104 + | Architecture | `docs/architecture.md` |
70 105
71 106 ## License
72 107
@@ -77,7 +77,9 @@ pub fn load_or_create_key_from_keychain(file_path: &Path) -> Result<[u8; 32], St
77 77 let b64 = BASE64.encode(key);
78 78 if entry.set_password(&b64).is_ok() {
79 79 // Delete the file now that it's in the keychain
80 - let _ = std::fs::remove_file(file_path);
80 + if let Err(e) = std::fs::remove_file(file_path) {
81 + tracing::warn!(error = %e, path = %file_path.display(), "Failed to delete encryption key file after keychain migration");
82 + }
81 83 tracing::info!("Migrated encryption key from file to keychain");
82 84 }
83 85 return Ok(key);
@@ -13,4 +13,6 @@ pub mod url_cleaner;
13 13
14 14 pub use orchestrator::*;
15 15 pub use plugin_manager::*;
16 - pub use rhai_plugin::{ReaderResult, RhaiPlugin, RhaiPluginError, RhaiPluginManager};
16 + pub use rhai_plugin::{
17 + classify_error, ReaderResult, RhaiPlugin, RhaiPluginError, RhaiPluginManager,
18 + };
@@ -12,6 +12,7 @@ use thiserror::Error;
12 12 use tokio::sync::RwLock;
13 13 use tracing::{debug, error, info, instrument};
14 14
15 + use crate::rhai_plugin::classify_error;
15 16 use crate::url_cleaner;
16 17 use crate::PluginManager;
17 18
@@ -200,18 +201,24 @@ impl Orchestrator {
200 201 let result = match fetch_result {
201 202 Ok(r) => r,
202 203 Err(e) => {
203 - // Record fetch failure before propagating
204 - let error_msg = e.to_string();
204 + // Classify the error and record structured failure
205 + let structured = classify_error(&e.to_string());
206 + debug!(
207 + %feed_id, %plugin_id,
208 + category = %structured.category,
209 + "Fetch failed"
210 + );
205 211 match self
206 212 .db
207 213 .feeds()
208 - .record_fetch_failure(feed_id, &error_msg)
214 + .record_fetch_failure_structured(feed_id, &structured)
209 215 .await
210 216 {
211 217 Ok(tripped) => {
212 218 if tripped {
213 219 info!(
214 220 %feed_id, %plugin_id,
221 + category = %structured.category,
215 222 "Circuit breaker tripped for feed"
216 223 );
217 224 }
@@ -469,6 +476,7 @@ mod tests {
469 476 source_name: "Test".to_string(),
470 477 score: None,
471 478 tags: vec![],
479 + actions: vec![],
472 480 };
473 481
474 482 db.items().upsert(item).await.unwrap();
@@ -518,6 +526,7 @@ mod tests {
518 526 source_name: "Test".to_string(),
519 527 score: None,
520 528 tags: vec![],
529 + actions: vec![],
521 530 };
522 531
523 532 // Insert the same item twice (same external_id).
@@ -11,7 +11,9 @@ use bb_interface::{BusserCapabilities, BusserConfig, ConfigSchema, FetchResult};
11 11 use thiserror::Error;
12 12 use tracing::{debug, error, info, warn};
13 13
14 - use crate::rhai_plugin::{RhaiPluginError, RhaiPluginManager};
14 + use bb_interface::StructuredError;
15 +
16 + use crate::rhai_plugin::{classify_error, RhaiPluginError, RhaiPluginManager};
15 17
16 18 #[derive(Error, Debug)]
17 19 pub enum PluginError {
@@ -38,6 +40,17 @@ pub enum PluginError {
38 40 RhaiError(#[from] RhaiPluginError),
39 41 }
40 42
43 + impl PluginError {
44 + /// Convert this error to a [`StructuredError`] by classifying the inner message.
45 + pub fn to_structured(&self) -> StructuredError {
46 + match self {
47 + PluginError::RhaiError(e) => e.to_structured(),
48 + PluginError::FetchError(msg) => classify_error(msg),
49 + _ => classify_error(&self.to_string()),
50 + }
51 + }
52 + }
53 +
41 54 /// Manages Rhai plugin loading and lifecycle
42 55 pub struct PluginManager {
43 56 plugins_dir: PathBuf,
@@ -46,6 +59,7 @@ pub struct PluginManager {
46 59 }
47 60
48 61 impl PluginManager {
62 + /// Create a new plugin manager rooted at the given plugins directory.
49 63 pub fn new(plugins_dir: impl AsRef<Path>) -> Self {
50 64 Self {
51 65 plugins_dir: plugins_dir.as_ref().to_path_buf(),
@@ -19,7 +19,7 @@
19 19
20 20 use bb_interface::{
21 21 BiteDisplay, BusserCapabilities, BusserConfig, ConfigField, ConfigFieldType, ConfigSchema,
22 - FeedItem, FeedItemContent, FeedItemId, FeedItemMeta, FetchResult,
22 + FeedItem, FeedItemContent, FeedItemId, FeedItemMeta, FetchResult, ItemAction,
23 23 };
24 24 use rhai::{Dynamic, Map};
25 25 use tracing::warn;
@@ -671,6 +671,22 @@ pub(super) fn dynamic_to_feed_item(
671 671 .collect();
672 672 }
673 673 }
674 +
675 + if let Some(actions_val) = content_map.get("actions") {
676 + if let Some(actions_arr) = actions_val.clone().try_cast::<rhai::Array>() {
677 + content.actions = actions_arr
678 + .into_iter()
679 + .filter_map(|v| {
680 + let m = v.try_cast::<Map>()?;
681 + Some(ItemAction {
682 + label: m.get("label")?.clone().try_cast::<String>()?,
683 + action_type: m.get("action_type")?.clone().try_cast::<String>()?,
684 + url: m.get("url")?.clone().try_cast::<String>()?,
685 + })
686 + })
687 + .collect();
688 + }
689 + }
674 690 content
675 691 } else {
676 692 FeedItemContent::new()
@@ -107,6 +107,40 @@ fn check_fetch_deadline(deadline: &AtomicU64) -> Result<(), String> {
107 107 }
108 108 }
109 109
110 + /// Prefix string for structured error signals passed through Rhai.
111 + ///
112 + /// Format: `BB_ERR:category:message` (or `BB_ERR:rate_limited:retry_secs:message`).
113 + const BB_ERR_PREFIX: &str = "BB_ERR:";
114 +
115 + /// Classify a `ureq::Error` into a `BB_ERR:`-prefixed string that carries
116 + /// error category information through the Rhai string-error channel.
117 + fn format_ureq_error(err: ureq::Error) -> String {
118 + match err {
119 + ureq::Error::Status(status, response) => {
120 + let body = response
121 + .into_string()
122 + .unwrap_or_default();
123 + let body_preview = if body.len() > 200 { &body[..200] } else { &body };
124 + match status {
125 + 401 | 403 => format!("{BB_ERR_PREFIX}auth:HTTP {status}: {body_preview}"),
126 + 429 => {
127 + // Try to read Retry-After header (already consumed, use default)
128 + // ureq consumes the response, so we default to 60s
129 + let retry_secs = 60;
130 + format!("{BB_ERR_PREFIX}rate_limited:{retry_secs}:HTTP 429: {body_preview}")
131 + }
132 + 404 | 410 => format!("{BB_ERR_PREFIX}config:HTTP {status}: {body_preview}"),
133 + 500..=599 => format!("{BB_ERR_PREFIX}transient:HTTP {status}: {body_preview}"),
134 + _ => format!("{BB_ERR_PREFIX}transient:HTTP {status}: {body_preview}"),
135 + }
136 + }
137 + ureq::Error::Transport(ref transport) => {
138 + let msg = transport.to_string();
139 + format!("{BB_ERR_PREFIX}transient:Transport error: {msg}")
140 + }
141 + }
142 + }
143 +
110 144 /// Register host functions available to Rhai scripts.
111 145 ///
112 146 /// # Trust model
@@ -135,15 +169,15 @@ pub(super) fn register_host_functions(
135 169 .set("User-Agent", USER_AGENT)
136 170 .timeout(HTTP_TIMEOUT)
137 171 .call()
138 - .map_err(|e| format!("HTTP request failed: {}", e))?;
172 + .map_err(format_ureq_error)?;
139 173
140 - let mut body = String::new();
174 + let mut bytes = Vec::new();
141 175 response
142 176 .into_reader()
143 177 .take(MAX_RESPONSE_BYTES)
144 - .read_to_string(&mut body)
178 + .read_to_end(&mut bytes)
145 179 .map_err(|e| format!("Failed to read response: {}", e))?;
146 - Ok(body)
180 + Ok(String::from_utf8_lossy(&bytes).into_owned())
147 181 });
148 182
149 183 // HTTP GET returning parsed JSON as Dynamic
@@ -160,7 +194,7 @@ pub(super) fn register_host_functions(
160 194 .set("User-Agent", USER_AGENT)
161 195 .timeout(HTTP_TIMEOUT)
162 196 .call()
163 - .map_err(|e| format!("HTTP request failed: {}", e))?;
197 + .map_err(format_ureq_error)?;
164 198
165 199 let mut body = Vec::new();
166 200 response
@@ -170,7 +204,7 @@ pub(super) fn register_host_functions(
170 204 .map_err(|e| format!("Failed to read response: {}", e))?;
171 205
172 206 let json: serde_json::Value = serde_json::from_slice(&body)
173 - .map_err(|e| format!("JSON parse failed: {}", e))?;
207 + .map_err(|e| format!("{BB_ERR_PREFIX}parse:JSON parse failed: {e}"))?;
174 208
175 209 json_to_dynamic(json)
176 210 },
@@ -181,7 +215,7 @@ pub(super) fn register_host_functions(
181 215 "parse_json",
182 216 |json_str: &str| -> Result<Dynamic, Box<rhai::EvalAltResult>> {
183 217 let json: serde_json::Value =
184 - serde_json::from_str(json_str).map_err(|e| format!("JSON parse error: {}", e))?;
218 + serde_json::from_str(json_str).map_err(|e| format!("{BB_ERR_PREFIX}parse:JSON parse error: {e}"))?;
185 219 json_to_dynamic(json)
186 220 },
187 221 );
@@ -281,6 +315,37 @@ pub(super) fn register_host_functions(
281 315 }
282 316 });
283 317
318 + // ── Structured error signaling ──────────────────────────────────
319 + // Plugins can throw categorized errors that the host recognizes.
320 +
321 + engine.register_fn(
322 + "throw_rate_limited",
323 + |retry_secs: i64| -> Result<(), Box<rhai::EvalAltResult>> {
324 + Err(format!("{BB_ERR_PREFIX}rate_limited:{retry_secs}:Rate limited by plugin").into())
325 + },
326 + );
327 +
328 + engine.register_fn(
329 + "throw_auth_error",
330 + |msg: &str| -> Result<(), Box<rhai::EvalAltResult>> {
331 + Err(format!("{BB_ERR_PREFIX}auth:{msg}").into())
332 + },
333 + );
334 +
335 + engine.register_fn(
336 + "throw_config_error",
337 + |msg: &str| -> Result<(), Box<rhai::EvalAltResult>> {
338 + Err(format!("{BB_ERR_PREFIX}config:{msg}").into())
339 + },
340 + );
341 +
342 + engine.register_fn(
343 + "throw_parse_error",
344 + |msg: &str| -> Result<(), Box<rhai::EvalAltResult>> {
345 + Err(format!("{BB_ERR_PREFIX}parse:{msg}").into())
346 + },
347 + );
348 +
284 349 // Extract main article content from HTML using the readability algorithm.
285 350 // Returns a map with "title", "content" (cleaned HTML), and "text" (plain text).
286 351 engine.register_fn("extract_article", |html: String| -> rhai::Map {
@@ -315,7 +380,10 @@ mod tests {
315 380
316 381 use std::sync::atomic::AtomicU64;
317 382
318 - use super::{check_fetch_deadline, check_request_limit, validate_url, MAX_REQUESTS_PER_FETCH};
383 + use super::{
384 + check_fetch_deadline, check_request_limit, validate_url, BB_ERR_PREFIX,
385 + MAX_REQUESTS_PER_FETCH,
386 + };
319 387
320 388 /// Truncate text with ellipsis (mirrors the Rhai-registered closure for testing).
321 389 fn truncate_text(text: &str, max: usize) -> String {
@@ -673,4 +741,43 @@ mod tests {
673 741 assert!(result.contains_key("content"));
674 742 assert!(result.contains_key("text"));
675 743 }
744 +
745 + // ── BB_ERR prefix tests ─────────────────────────────────────
746 +
747 + #[test]
748 + fn bb_err_prefix_constant() {
749 + assert_eq!(BB_ERR_PREFIX, "BB_ERR:");
750 + }
751 +
752 + #[test]
753 + fn throw_auth_error_produces_prefixed_string() {
754 + let engine = super::super::create_engine();
755 + let result = engine.eval::<()>(r#"throw_auth_error("bad key")"#);
756 + let err = result.unwrap_err().to_string();
757 + assert!(err.contains("BB_ERR:auth:bad key"), "got: {err}");
758 + }
759 +
760 + #[test]
761 + fn throw_config_error_produces_prefixed_string() {
762 + let engine = super::super::create_engine();
763 + let result = engine.eval::<()>(r#"throw_config_error("missing URL")"#);
764 + let err = result.unwrap_err().to_string();
765 + assert!(err.contains("BB_ERR:config:missing URL"), "got: {err}");
766 + }
767 +
768 + #[test]
769 + fn throw_parse_error_produces_prefixed_string() {
770 + let engine = super::super::create_engine();
771 + let result = engine.eval::<()>(r#"throw_parse_error("bad XML")"#);
772 + let err = result.unwrap_err().to_string();
773 + assert!(err.contains("BB_ERR:parse:bad XML"), "got: {err}");
774 + }
775 +
776 + #[test]
777 + fn throw_rate_limited_produces_prefixed_string() {
778 + let engine = super::super::create_engine();
779 + let result = engine.eval::<()>("throw_rate_limited(120)");
780 + let err = result.unwrap_err().to_string();
781 + assert!(err.contains("BB_ERR:rate_limited:120:"), "got: {err}");
782 + }
676 783 }
@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16 16 use std::sync::Arc;
17 17
18 18 use bb_interface::{BusserCapabilities, ConfigSchema};
19 - use rhai::{Dynamic, Engine, Scope, AST};
19 + use rhai::{Dynamic, Engine, ImmutableString, Scope, AST};
20 20 use thiserror::Error;
21 21 use tracing::debug;
22 22
@@ -24,6 +24,7 @@ use conversions::{
24 24 busser_config_to_dynamic, dynamic_to_capabilities, dynamic_to_config_schema,
25 25 dynamic_to_fetch_result,
26 26 };
27 + use bb_interface::{ErrorCategory, StructuredError};
27 28 use host_functions::register_host_functions;
28 29
29 30 #[derive(Error, Debug)]
@@ -55,6 +56,75 @@ pub enum RhaiPluginError {
55 56 JsonError(String),
56 57 }
57 58
59 + /// Prefix used by host functions and `throw_*` helpers to signal structured errors.
60 + const BB_ERR_PREFIX: &str = "BB_ERR:";
61 +
62 + impl RhaiPluginError {
63 + /// Convert this error to a [`StructuredError`] by parsing the `BB_ERR:` prefix
64 + /// convention, falling back to heuristic keyword matching.
65 + pub fn to_structured(&self) -> StructuredError {
66 + let msg = self.to_string();
67 + classify_error(&msg)
68 + }
69 + }
70 +
71 + /// Parse a `BB_ERR:category:message` string into a [`StructuredError`].
72 + /// Falls back to [`heuristic_classify`] for unprefixed errors.
73 + pub fn classify_error(error_str: &str) -> StructuredError {
74 + // The error string from Rhai wraps the inner error message. Extract the
75 + // BB_ERR portion wherever it appears in the string.
76 + if let Some(pos) = error_str.find(BB_ERR_PREFIX) {
77 + let rest = &error_str[pos + BB_ERR_PREFIX.len()..];
78 + if let Some((category_str, remainder)) = rest.split_once(':') {
79 + match category_str {
80 + "rate_limited" => {
81 + // Format: BB_ERR:rate_limited:retry_secs:message
82 + let (retry_secs, message) = match remainder.split_once(':') {
83 + Some((secs_str, msg)) => {
84 + let secs = secs_str.parse::<u64>().unwrap_or(60);
85 + (secs, msg.to_string())
86 + }
87 + None => (60, remainder.to_string()),
88 + };
89 + return StructuredError::rate_limited(message, retry_secs);
90 + }
91 + "auth" => {
92 + return StructuredError::new(ErrorCategory::Auth, remainder);
93 + }
94 + "config" => {
95 + return StructuredError::new(ErrorCategory::Config, remainder);
96 + }
97 + "parse" => {
98 + return StructuredError::new(ErrorCategory::Parse, remainder);
99 + }
100 + "transient" => {
101 + return StructuredError::new(ErrorCategory::Transient, remainder);
102 + }
103 + _ => {}
104 + }
105 + }
106 + }
107 + heuristic_classify(error_str)
108 + }
109 +
110 + /// Best-effort classification of an error string by looking for common keywords.
111 + fn heuristic_classify(error_str: &str) -> StructuredError {
112 + let lower = error_str.to_ascii_lowercase();
113 + if lower.contains("401") || lower.contains("403") || lower.contains("unauthorized") || lower.contains("forbidden") {
114 + StructuredError::new(ErrorCategory::Auth, error_str)
115 + } else if lower.contains("429") || lower.contains("rate limit") {
116 + StructuredError::rate_limited(error_str, 60)
117 + } else if lower.contains("404") || lower.contains("not found") || lower.contains("410") {
118 + StructuredError::new(ErrorCategory::Config, error_str)
119 + } else if lower.contains("timeout") || lower.contains("timed out") || lower.contains("500") || lower.contains("502") || lower.contains("503") || lower.contains("connection") {
120 + StructuredError::new(ErrorCategory::Transient, error_str)
121 + } else if lower.contains("parse") || lower.contains("invalid xml") || lower.contains("invalid json") || lower.contains("malformed") {
122 + StructuredError::new(ErrorCategory::Parse, error_str)
123 + } else {
124 + StructuredError::new(ErrorCategory::Unknown, error_str)
125 + }
126 + }
127 +
58 128 /// A compiled Rhai plugin.
59 129 ///
60 130 /// Each plugin is a single `.rhai` script that implements the busser
@@ -82,9 +152,19 @@ pub struct RhaiPlugin {
82 152 }
83 153
84 154 impl RhaiPlugin {
155 + /// Evaluate top-level code (e.g. `const` declarations) into a scope
156 + /// so that constants are visible inside functions called via `call_fn`.
157 + fn eval_scope(&self) -> Result<Scope<'static>, RhaiPluginError> {
158 + let mut scope = Scope::new();
159 + self.engine
160 + .run_ast_with_scope(&mut scope, &self.ast)
161 + .map_err(|e| RhaiPluginError::RuntimeError(e.to_string()))?;
162 + Ok(scope)
163 + }
164 +
85 165 /// Get the plugin's configuration schema.
86 166 pub fn config_schema(&self) -> Result<ConfigSchema, RhaiPluginError> {
87 - let mut scope = Scope::new();
167 + let mut scope = self.eval_scope()?;
88 168 let result: Dynamic = self
89 169 .engine
90 170 .call_fn(&mut scope, &self.ast, "config_schema", ())
@@ -95,7 +175,13 @@ impl RhaiPlugin {
95 175
96 176 /// Get the plugin's capabilities.
97 177 pub fn capabilities(&self) -> BusserCapabilities {
98 - let mut scope = Scope::new();
178 + let mut scope = match self.eval_scope() {
179 + Ok(s) => s,
180 + Err(e) => {
181 + tracing::warn!(error = %e, plugin_id = %self.id, "Plugin scope eval failed");
182 + return BusserCapabilities::default();
183 + }
184 + };
99 185 match self
100 186 .engine
101 187 .call_fn::<Dynamic>(&mut scope, &self.ast, "capabilities", ())
@@ -127,7 +213,7 @@ impl RhaiPlugin {
127 213 Ordering::Relaxed,
128 214 );
129 215
130 - let mut scope = Scope::new();
216 + let mut scope = self.eval_scope()?;
131 217
132 218 let config_map = busser_config_to_dynamic(config);
133 219
@@ -141,6 +227,9 @@ impl RhaiPlugin {
141 227 .call_fn(&mut scope, &self.ast, "fetch", (config_map, cursor_val))
142 228 .map_err(|e| RhaiPluginError::RuntimeError(e.to_string()))?;
143 229
230 + validate_dynamic_sizes(&result, 0)
231 + .map_err(|e| RhaiPluginError::RuntimeError(e))?;
232 +
144 233 dynamic_to_fetch_result(result, &self.id)
145 234 }
146 235 }
@@ -163,20 +252,19 @@ impl RhaiPluginManager {
163 252 /// - `max_expr_depths(128, 128)`: limits AST nesting depth for both expressions
164 253 /// and functions, preventing stack overflows from deeply recursive scripts.
165 254 /// - `max_call_levels(32)`: limits function call nesting depth.
166 - /// - `max_string_size(10_000)`: prevents scripts from building huge strings.
167 - /// - `max_array_size(1_000)`: prevents scripts from building huge arrays.
168 - /// - `max_map_size(100)`: prevents scripts from building huge maps.
169 255 /// - HTTP limits: 15s timeout, 2 MB response cap, 100 requests per fetch,
170 256 /// http/https only, no localhost/internal addresses.
257 + ///
258 + /// Data size limits (max_string_size, max_array_size, max_map_size) are
259 + /// intentionally disabled. Rhai counts these cumulatively across the entire
260 + /// nested value tree (not per container), so normal API responses trip modest
261 + /// limits. The 2 MB HTTP response cap is the real memory gate.
171 262 pub fn new() -> Self {
172 263 let mut engine = Engine::new();
173 264
174 265 engine.set_max_expr_depths(128, 128);
175 266 engine.set_max_operations(100_000);
176 267 engine.set_max_call_levels(32);
177 - engine.set_max_string_size(10_000);
178 - engine.set_max_array_size(1_000);
179 - engine.set_max_map_size(100);
180 268
181 269 let request_counter = Arc::new(AtomicUsize::new(0));
182 270 let fetch_deadline = Arc::new(AtomicU64::new(0));
@@ -200,7 +288,11 @@ impl RhaiPluginManager {
200 288 .compile(&script)
201 289 .map_err(|e| RhaiPluginError::CompileError(e.to_string()))?;
202 290
291 + // Evaluate top-level code (const declarations) so they're visible in functions
203 292 let mut scope = Scope::new();
293 + self.engine
294 + .run_ast_with_scope(&mut scope, &ast)
295 + .map_err(|e| RhaiPluginError::CompileError(format!("Top-level eval failed: {}", e)))?;
204 296
205 297 let id: String = self
206 298 .engine
@@ -266,15 +358,69 @@ pub fn create_engine() -> Engine {
266 358 engine.set_max_expr_depths(128, 128);
267 359 engine.set_max_operations(100_000);
268 360 engine.set_max_call_levels(32);
269 - engine.set_max_string_size(10_000);
270 - engine.set_max_array_size(1_000);
271 - engine.set_max_map_size(100);
272 361 let counter = Arc::new(AtomicUsize::new(0));
273 362 let deadline = Arc::new(AtomicU64::new(0));
274 363 register_host_functions(&mut engine, counter, deadline);
275 364 engine
276 365 }
277 366
367 + // ── Per-node data size limits ────────────────────────────────────────────
368 + //
369 + // Rhai's built-in max_string_size / max_array_size / max_map_size count
370 + // cumulatively across the entire nested value tree, which makes them trip
371 + // on normal API responses. Instead we enforce per-node limits: no single
372 + // string, array, or map may exceed these thresholds.
373 +
374 + /// Maximum length (in bytes) for any single string value.
375 + const MAX_SINGLE_STRING: usize = 1_000_000; // 1 MB
376 + /// Maximum number of elements in any single array.
377 + const MAX_SINGLE_ARRAY: usize = 500;
378 + /// Maximum number of entries in any single map.
379 + const MAX_SINGLE_MAP: usize = 200;
380 + /// Maximum nesting depth to walk (prevents pathological recursion).
381 + const MAX_DEPTH: usize = 32;
382 +
383 + /// Recursively validate per-node size limits on a Dynamic value tree.
384 + ///
385 + /// Returns `Ok(())` if every node is within limits, or an `Err` describing
386 + /// the first violation found.
387 + fn validate_dynamic_sizes(val: &Dynamic, depth: usize) -> Result<(), String> {
388 + if depth > MAX_DEPTH {
389 + return Err(format!("value nesting exceeds {MAX_DEPTH} levels"));
390 + }
391 +
392 + if let Some(s) = val.read_lock::<ImmutableString>() {
393 + if s.len() > MAX_SINGLE_STRING {
394 + return Err(format!(
395 + "single string too large ({} bytes, max {MAX_SINGLE_STRING})",
396 + s.len()
397 + ));
398 + }
399 + } else if let Ok(arr) = val.as_array_ref() {
400 + if arr.len() > MAX_SINGLE_ARRAY {
401 + return Err(format!(
402 + "single array too large ({} elements, max {MAX_SINGLE_ARRAY})",
403 + arr.len()
404 + ));
405 + }
406 + for elem in arr.iter() {
407 + validate_dynamic_sizes(elem, depth + 1)?;
408 + }
409 + } else if let Ok(map) = val.as_map_ref() {
410 + if map.len() > MAX_SINGLE_MAP {
411 + return Err(format!(
412 + "single map too large ({} entries, max {MAX_SINGLE_MAP})",
413 + map.len()
414 + ));
415 + }
416 + for v in map.values() {
417 + validate_dynamic_sizes(v, depth + 1)?;
418 + }
419 + }
420 +
421 + Ok(())
422 + }
423 +
278 424 /// Result from extracting article content via the reader plugin.
279 425 #[derive(Debug, Clone)]
280 426 pub struct ReaderResult {
@@ -334,6 +480,70 @@ mod tests {
334 480 use super::*;
335 481
336 482 #[test]
483 + fn classify_error_auth_prefix() {
484 + let err = classify_error("Script execution failed: BB_ERR:auth:HTTP 401: Unauthorized");
485 + assert_eq!(err.category, bb_interface::ErrorCategory::Auth);
486 + assert!(err.message.contains("HTTP 401"));
487 + }
488 +
489 + #[test]
490 + fn classify_error_rate_limited_prefix() {
491 + let err = classify_error("Script execution failed: BB_ERR:rate_limited:120:HTTP 429: slow down");
492 + assert_eq!(err.category, bb_interface::ErrorCategory::RateLimited);
493 + assert_eq!(err.retry_after_secs, Some(120));
494 + assert!(err.message.contains("429"));
495 + }
496 +
497 + #[test]
498 + fn classify_error_config_prefix() {
499 + let err = classify_error("BB_ERR:config:HTTP 404: not found");
500 + assert_eq!(err.category, bb_interface::ErrorCategory::Config);
501 + }
502 +
503 + #[test]
504 + fn classify_error_parse_prefix() {
505 + let err = classify_error("BB_ERR:parse:JSON parse failed: unexpected EOF");
506 + assert_eq!(err.category, bb_interface::ErrorCategory::Parse);
507 + }
508 +
509 + #[test]
510 + fn classify_error_transient_prefix() {
511 + let err = classify_error("BB_ERR:transient:HTTP 503: Service Unavailable");
512 + assert_eq!(err.category, bb_interface::ErrorCategory::Transient);
513 + }
514 +
515 + #[test]
516 + fn heuristic_classify_timeout() {
517 + let err = classify_error("HTTP request failed: connection timed out");
518 + assert_eq!(err.category, bb_interface::ErrorCategory::Transient);
519 + }
520 +
521 + #[test]
522 + fn heuristic_classify_401() {
523 + let err = classify_error("unexpected 401 response from server");
524 + assert_eq!(err.category, bb_interface::ErrorCategory::Auth);
525 + }
526 +
527 + #[test]
528 + fn heuristic_classify_parse() {
529 + let err = classify_error("parse error: invalid XML structure");
530 + assert_eq!(err.category, bb_interface::ErrorCategory::Parse);
531 + }
532 +
533 + #[test]
534 + fn heuristic_classify_unknown() {
535 + let err = classify_error("something went wrong");
536 + assert_eq!(err.category, bb_interface::ErrorCategory::Unknown);
537 + }
538 +
539 + #[test]
540 + fn rhai_plugin_error_to_structured() {
541 + let e = RhaiPluginError::RuntimeError("BB_ERR:auth:bad token".into());
542 + let s = e.to_structured();
543 + assert_eq!(s.category, bb_interface::ErrorCategory::Auth);
544 + }
545 +
546 + #[test]
337 547 fn error_display_compile() {
338 548 let e = RhaiPluginError::CompileError("syntax error".into());
339 549 assert_eq!(e.to_string(), "Script compilation failed: syntax error");
@@ -398,41 +608,59 @@ mod tests {
398 608 }
399 609
400 610 #[test]
401 - fn sandbox_enforces_string_size_limit() {
402 - let manager = RhaiPluginManager::new();
403 - let script = r#"
404 - let s = "x";
405 - for i in 0..20 { s += s; }
406 - s
407 - "#;
408 - let result = manager.engine.eval::<String>(script);
409 - assert!(result.is_err(), "huge string should be stopped by string size limit");
611 + fn validate_dynamic_sizes_accepts_normal_data() {
612 + // A map with 50 entries, each containing a nested map with 10 keys
613 + // and a string. Simulates a typical API response.
614 + let mut outer = rhai::Map::new();
615 + for i in 0..50 {
616 + let mut inner = rhai::Map::new();
617 + for j in 0..10 {
618 + inner.insert(format!("k{j}").into(), Dynamic::from(format!("val-{i}-{j}")));
619 + }
620 + outer.insert(format!("item{i}").into(), Dynamic::from(inner));
621 + }
622 + let val = Dynamic::from(outer);
623 + assert!(validate_dynamic_sizes(&val, 0).is_ok());
410 624 }
411 625
412 626 #[test]
413 - fn sandbox_enforces_array_size_limit() {
414 - let manager = RhaiPluginManager::new();
415 - let script = r#"
416 - let a = [];
417 - for i in 0..2000 { a.push(i); }
418 - a
419 - "#;
420 - let result = manager.engine.eval::<rhai::Array>(script);
421 - assert!(result.is_err(), "huge array should be stopped by array size limit");
627 + fn validate_dynamic_sizes_rejects_oversized_map() {
628 + let mut big = rhai::Map::new();
629 + for i in 0..201 {
630 + big.insert(format!("k{i}").into(), Dynamic::from(i as i64));
631 + }
632 + let val = Dynamic::from(big);
633 + let err = validate_dynamic_sizes(&val, 0).unwrap_err();
634 + assert!(err.contains("single map too large"), "got: {err}");
422 635 }
423 636
424 637 #[test]
425 - fn sandbox_enforces_map_size_limit() {
426 - let manager = RhaiPluginManager::new();
427 - // Build a map by merging — Rhai checks map size limits on object map literals
428 - // and merge operations, not index assignment. Use += to trigger the check.
429 - let mut entries = String::new();
430 - for i in 0..200 {
431 - entries.push_str(&format!("m += #{{ k{i}: {i} }};\n"));
638 + fn validate_dynamic_sizes_rejects_oversized_array() {
639 + let arr: Vec<Dynamic> = (0..501).map(|i| Dynamic::from(i as i64)).collect();
640 + let val = Dynamic::from(arr);
641 + let err = validate_dynamic_sizes(&val, 0).unwrap_err();
642 + assert!(err.contains("single array too large"), "got: {err}");
643 + }
644 +
645 + #[test]
646 + fn validate_dynamic_sizes_rejects_oversized_string() {
647 + let big_str = "x".repeat(MAX_SINGLE_STRING + 1);
648 + let val = Dynamic::from(big_str);
649 + let err = validate_dynamic_sizes(&val, 0).unwrap_err();
650 + assert!(err.contains("single string too large"), "got: {err}");
651 + }
652 +
653 + #[test]
654 + fn validate_dynamic_sizes_rejects_deep_nesting() {
655 + // Build a value nested MAX_DEPTH+1 levels deep
656 + let mut val = Dynamic::from(1_i64);
657 + for _ in 0..MAX_DEPTH + 1 {
658 + let mut m = rhai::Map::new();
659 + m.insert("nested".into(), val);
660 + val = Dynamic::from(m);
432 661 }
433 - let script = format!("let m = #{{}};\n{entries}m");
434 - let result = manager.engine.eval::<rhai::Map>(&script);
435 - assert!(result.is_err(), "huge map should be stopped by map size limit");
662 + let err = validate_dynamic_sizes(&val, 0).unwrap_err();
663 + assert!(err.contains("nesting exceeds"), "got: {err}");
436 664 }
437 665
438 666 #[test]
@@ -453,15 +681,31 @@ mod tests {
453 681 assert!(result.is_err(), "deep recursion should be stopped by call level limit");
454 682 }
455 683
684 +
456 685 #[test]
457 - fn create_engine_enforces_string_size_limit() {
458 - let engine = create_engine();
459 - let script = r#"
460 - let s = "x";
461 - for i in 0..20 { s += s; }
462 - s
463 - "#;
464 - let result = engine.eval::<String>(script);
465 - assert!(result.is_err(), "huge string should be stopped by string size limit");
686 + fn devto_plugin_live_fetch() {
687 + let manifest_dir = env!("CARGO_MANIFEST_DIR");
688 + let plugins_dir = std::path::Path::new(manifest_dir).join("../../plugins");
689 + let plugin_path = plugins_dir.join("devto.rhai");
690 + if !plugin_path.exists() {
691 + eprintln!("Skipping: devto.rhai not found at {:?}", plugin_path);
692 + return;
693 + }
694 +
695 + let mut manager = RhaiPluginManager::new();
696 + let id = manager.load_plugin(&plugin_path).expect("failed to load devto plugin");
697 + let plugin = manager.get(&id).unwrap();
698 +
699 + let config = bb_interface::BusserConfig {
700 + options: std::collections::HashMap::new(),
701 + feeds: vec![],
702 + };
703 +
704 + let result = plugin.fetch(&config, None);
705 + match &result {
706 + Ok(r) => eprintln!("OK: got {} items, has_more={}", r.items.len(), r.has_more),
707 + Err(e) => eprintln!("ERROR: {e}"),
708 + }
709 + result.expect("devto fetch should succeed");
466 710 }
467 711 }
@@ -103,6 +103,8 @@ pub struct DbFeedItem {
103 103 pub url: Option<String>,
104 104 /// JSON-encoded list of media attachment URLs.
105 105 pub media: String,
106 + /// JSON-encoded list of plugin-declared custom actions.
107 + pub actions: String,
106 108
107 109 /// When the item was originally published (SQLite timestamp text).
108 110 pub published_at: String,
@@ -153,6 +155,14 @@ impl DbFeedItem {
153 155 )
154 156 }
155 157
158 + /// Deserialize the JSON `actions` column into a `Vec<ItemAction>`.
159 + pub fn actions_vec(&self) -> Vec<bb_interface::ItemAction> {
160 + parse_or_default(
161 + serde_json::from_str(&self.actions),
162 + "Failed to parse feed item actions JSON",
163 + )
164 + }
165 +
156 166 /// Reconstruct a full [`FeedItem`](bb_interface::FeedItem) from the flat DB row.
157 167 ///
158 168 /// The interface type splits an item into three parts:
@@ -178,6 +188,7 @@ impl DbFeedItem {
178 188 content.body = self.body.clone();
179 189 content.url = self.url.clone();
180 190 content.media = self.media_vec();
191 + content.actions = self.actions_vec();
181 192
182 193 // 3. Meta — timestamps, score, tags
183 194 let published_at = self.published_at_dt();
@@ -247,6 +258,8 @@ pub struct CreateFeedItem {
247 258 pub url: Option<String>,
248 259 /// Media attachment URLs.
249 260 pub media: Vec<String>,
261 + /// Plugin-declared custom actions.
262 + pub actions: Vec<bb_interface::ItemAction>,
250 263 /// When the item was originally published.
251 264 pub published_at: DateTime<Utc>,
252 265 /// Human-readable source name.
@@ -272,6 +285,7 @@ impl CreateFeedItem {
272 285 body: item.content.body.clone(),
273 286 url: item.content.url.clone(),
274 287 media: item.content.media.clone(),
288 + actions: item.content.actions.clone(),
275 289 published_at: DateTime::from_timestamp(item.meta.published_at, 0)
276 290 .unwrap_or_else(Utc::now),
277 291 source_name: item.meta.source_name.clone(),
@@ -6,6 +6,8 @@
6 6 use chrono::{DateTime, Utc};
7 7 use sqlx::SqlitePool;
8 8
9 + use bb_interface::{ErrorCategory, StructuredError};
10 +
9 11 use crate::id_types::{BusserStateId, FeedId, ItemId, QueryFeedId};
10 12 use crate::models::*;
11 13 use crate::TIMESTAMP_FMT;
@@ -193,6 +195,74 @@ impl FeedsRepository {
193 195 Ok(false)
194 196 }
195 197
198 + /// Record a structured fetch failure with category-aware behavior:
199 + ///
200 + /// - `RateLimited` — store error JSON, do NOT increment `consecutive_failures`.
201 + /// - `Auth` / `Config` — increment + immediately set `circuit_broken = 1`.
202 + /// - `Transient` / `Parse` / `Unknown` — increment normally (existing behavior).
203 + ///
204 + /// Returns `true` if the circuit breaker tripped.
205 + pub async fn record_fetch_failure_structured(
206 + &self,
207 + id: FeedId,
208 + error: &StructuredError,
209 + ) -> Result<bool, sqlx::Error> {
210 + let now = Utc::now().format(TIMESTAMP_FMT).to_string();
211 + let error_json = error.to_json();
212 +
213 + match error.category {
214 + ErrorCategory::RateLimited => {
215 + // Store error but don't increment failure counter
216 + sqlx::query(
217 + "UPDATE feeds SET last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
218 + )
219 + .bind(&error_json)
220 + .bind(&now)
221 + .bind(id)
222 + .execute(&self.pool)
223 + .await?;
224 + Ok(false)
225 + }
226 + ErrorCategory::Auth | ErrorCategory::Config => {
227 + // Increment + immediate circuit break
228 + sqlx::query(
229 + "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \
230 + last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
231 + )
232 + .bind(&error_json)
233 + .bind(&now)
234 + .bind(id)
235 + .execute(&self.pool)
236 + .await?;
237 + self.set_circuit_broken(id, true).await?;
238 + Ok(true)
239 + }
240 + ErrorCategory::Transient | ErrorCategory::Parse | ErrorCategory::Unknown => {
241 + // Normal behavior: increment and check threshold
242 + sqlx::query(
243 + "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \
244 + last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
245 + )
246 + .bind(&error_json)
247 + .bind(&now)
248 + .bind(id)
249 + .execute(&self.pool)
250 + .await?;
251 +
252 + let feed = self.get(id).await?;
253 + if let Some(feed) = feed {
254 + if !feed.circuit_broken
255 + && feed.consecutive_failures >= CIRCUIT_BREAKER_THRESHOLD
256 + {
257 + self.set_circuit_broken(id, true).await?;
258 + return Ok(true);
259 + }
260 + }
261 + Ok(false)
262 + }
263 + }
264 + }
265 +
196 266 /// Mark a feed as circuit-broken (or clear the circuit breaker).
197 267 pub async fn set_circuit_broken(
198 268 &self,
@@ -284,17 +354,19 @@ impl ItemsRepository {
284 354 // Vec<String> is always serializable; unwrap is safe here.
285 355 let media = serde_json::to_string(&input.media).unwrap_or_else(|_| "[]".to_string());
286 356 let tags = serde_json::to_string(&input.tags).unwrap_or_else(|_| "[]".to_string());
357 + let actions =
358 + serde_json::to_string(&input.actions).unwrap_or_else(|_| "[]".to_string());
287 359
288 360 sqlx::query_as(
289 361 r#"
290 362 INSERT INTO feed_items (
291 363 id, external_id, feed_id, busser_id,
292 364 bite_author, bite_text, bite_secondary, bite_indicator,
293 - title, body, url, media,
365 + title, body, url, media, actions,
294 366 published_at, fetched_at, source_name, score, tags,
295 367 is_read, is_starred, created_at, updated_at
296 368 )
297 - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, 0, 0, ?18, ?18)
369 + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, 0, 0, ?19, ?19)
298 370 ON CONFLICT (external_id) DO UPDATE SET
299 371 bite_author = EXCLUDED.bite_author,
300 372 bite_text = EXCLUDED.bite_text,
@@ -304,6 +376,7 @@ impl ItemsRepository {
304 376 body = EXCLUDED.body,
305 377 url = EXCLUDED.url,
306 378 media = EXCLUDED.media,
379 + actions = EXCLUDED.actions,
307 380 score = EXCLUDED.score,
308 381 tags = EXCLUDED.tags,
309 382 updated_at = EXCLUDED.updated_at
@@ -322,6 +395,7 @@ impl ItemsRepository {
322 395 .bind(&input.body)
323 396 .bind(&input.url)
324 397 .bind(&media)
398 + .bind(&actions)
325 399 .bind(&published)
326 400 .bind(&now)
327 401 .bind(&input.source_name)
@@ -963,6 +1037,7 @@ mod tests {
963 1037 source_name: "test".to_string(),
964 1038 score: None,
965 1039 tags: vec![],
1040 + actions: vec![],
966 1041 })
967 1042 .await
968 1043 .unwrap()
@@ -1139,6 +1214,7 @@ mod tests {
1139 1214 source_name: "test".to_string(),
1140 1215 score: None,
1141 1216 tags: vec![],
1217 + actions: vec![],
1142 1218 })
1143 1219 .await
1144 1220 .unwrap();
@@ -1443,6 +1519,7 @@ mod tests {
1443 1519 source_name: "test".to_string(),
1444 1520 score: None,
1445 1521 tags: vec![],
1522 + actions: vec![],
1446 1523 })
1447 1524 .await
1448 1525 .unwrap();
@@ -1478,6 +1555,7 @@ mod tests {
1478 1555 source_name: "test".to_string(),
1479 1556 score: None,
1480 1557 tags: vec![],
1558 + actions: vec![],
1481 1559 })
1482 1560 .await
1483 1561 .unwrap();
@@ -1512,6 +1590,7 @@ mod tests {
1512 1590 source_name: "test".to_string(),
1513 1591 score: None,
1514 1592 tags: vec![],
1593 + actions: vec![],
1515 1594 })
1516 1595 .await
1517 1596 .unwrap();
@@ -1546,6 +1625,7 @@ mod tests {
1546 1625 source_name: "test".to_string(),
1547 1626 score: None,
1548 1627 tags: vec![],
1628 + actions: vec![],
1549 1629 })
1550 1630 .await
1551 1631 .unwrap();
@@ -1711,6 +1791,7 @@ mod tests {
1711 1791 source_name: "test".to_string(),
1712 1792 score: None,
1713 1793 tags: vec![],
1794 + actions: vec![],
1714 1795 })
1715 1796 .await
1716 1797 .unwrap();
@@ -1806,6 +1887,7 @@ mod tests {
1806 1887 source_name: "test".to_string(),
1807 1888 score: None,
1808 1889 tags: vec![],
1890 + actions: vec![],
1809 1891 })
1810 1892 .await
1811 1893 .unwrap();
@@ -2079,6 +2161,82 @@ mod tests {
2079 2161 assert_eq!(f.last_error.as_deref(), Some("err2"));
2080 2162 }
2081 2163
2164 + // ── record_fetch_failure_structured ────────────────────────────
2165 +
2166 + #[tokio::test]
2167 + async fn structured_failure_rate_limited_no_increment() {
2168 + let pool = test_db().await;
2169 + let feed = make_feed(&pool, "rss", "Feed").await;
2170 + let feeds = FeedsRepository::new(pool);
2171 + let err = StructuredError::rate_limited("429 Too Many Requests", 120);
2172 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2173 + assert!(!tripped);
2174 + let f = feeds.get(feed.id).await.unwrap().unwrap();
2175 + assert_eq!(f.consecutive_failures, 0, "rate_limited should not increment failures");
2176 + assert!(f.last_error.is_some(), "error should be stored");
2177 + // Verify stored as JSON
2178 + let stored = StructuredError::from_last_error(f.last_error.as_ref().unwrap());
2179 + assert_eq!(stored.category, ErrorCategory::RateLimited);
2180 + assert_eq!(stored.retry_after_secs, Some(120));
2181 + }
2182 +
2183 + #[tokio::test]
2184 + async fn structured_failure_auth_immediate_circuit_break() {
2185 + let pool = test_db().await;
2186 + let feed = make_feed(&pool, "rss", "Feed").await;
2187 + let feeds = FeedsRepository::new(pool);
2188 + let err = StructuredError::new(ErrorCategory::Auth, "401 Unauthorized");
2189 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2190 + assert!(tripped, "auth error should immediately trip circuit breaker");
2191 + let f = feeds.get(feed.id).await.unwrap().unwrap();
2192 + assert!(f.circuit_broken);
2193 + assert_eq!(f.consecutive_failures, 1);
2194 + }
2195 +
2196 + #[tokio::test]
2197 + async fn structured_failure_config_immediate_circuit_break() {
2198 + let pool = test_db().await;
2199 + let feed = make_feed(&pool, "rss", "Feed").await;
2200 + let feeds = FeedsRepository::new(pool);
2201 + let err = StructuredError::new(ErrorCategory::Config, "404 Not Found");
2202 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2203 + assert!(tripped, "config error should immediately trip circuit breaker");
2204 + let f = feeds.get(feed.id).await.unwrap().unwrap();
2205 + assert!(f.circuit_broken);
2206 + }
2207 +
2208 + #[tokio::test]
2209 + async fn structured_failure_transient_increments_normally() {
2210 + let pool = test_db().await;
2211 + let feed = make_feed(&pool, "rss", "Feed").await;
2212 + let feeds = FeedsRepository::new(pool);
2213 + let err = StructuredError::new(ErrorCategory::Transient, "HTTP 503");
2214 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2215 + assert!(!tripped);
2216 + let f = feeds.get(feed.id).await.unwrap().unwrap();
2217 + assert_eq!(f.consecutive_failures, 1);
2218 + assert!(!f.circuit_broken);
2219 + }
2220 +
2221 + #[tokio::test]
2222 + async fn structured_failure_transient_trips_at_threshold() {
2223 + let pool = test_db().await;
2224 + let feed = make_feed(&pool, "rss", "Feed").await;
2225 + let feeds = FeedsRepository::new(pool);
2226 + // Fill up to threshold - 1
2227 + for i in 0..CIRCUIT_BREAKER_THRESHOLD - 1 {
2228 + let err = StructuredError::new(ErrorCategory::Transient, format!("error {i}"));
2229 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2230 + assert!(!tripped);
2231 + }
2232 + // One more should trip it
2233 + let err = StructuredError::new(ErrorCategory::Transient, "final error");
2234 + let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2235 + assert!(tripped);
2236 + let f = feeds.get(feed.id).await.unwrap().unwrap();
2237 + assert!(f.circuit_broken);
2238 + }
2239 +
2082 2240 // ── ItemsRepository (additional) ─────────────────────────────
2083 2241
2084 2242 #[tokio::test]
@@ -1,1371 +0,0 @@
1 - //! Feed generator for aggregating, filtering, and ordering items.
2 - //!
3 - //! Reads items from the database, applies user-defined filters and
4 - //! ordering, and returns paginated results for display.
5 -
6 - use bb_db::Database;
7 - use bb_interface::FeedItem;
8 - use thiserror::Error;
9 - use tracing::debug;
10 -
11 - use crate::{FeedFilter, OrderBy};
12 -
13 - #[derive(Error, Debug)]
14 - pub enum FeedError {
15 - #[error("Database error: {0}")]
16 - Database(#[from] sqlx::Error),
17 - }
18 -
19 - /// A page of feed items with a flag indicating whether more pages exist.
20 - #[derive(Debug)]
21 - pub struct PaginatedItems {
22 - /// The items for this page (at most `page_size` items).
23 - pub items: Vec<FeedItem>,
24 - /// Whether additional pages of results are available beyond this one.
25 - pub has_more: bool,
26 - }
27 -
28 - /// Source info with item counts
29 - #[derive(Debug, Clone)]
30 - pub struct SourceInfo {
31 - /// Busser/plugin identifier.
32 - pub id: String,
33 - /// Human-readable source name.
34 - pub name: String,
35 - /// Total number of items from this source.
36 - pub total_count: i64,
37 - /// Number of unread items from this source.
38 - pub unread_count: i64,
39 - /// User-assigned tags for this feed.
40 - pub tags: Vec<String>,
41 - /// Number of consecutive fetch failures (0 = healthy).
42 - pub consecutive_failures: i64,
43 - /// Error message from the last failed fetch.
44 - pub last_error: Option<String>,
45 - /// Whether the circuit breaker has tripped for this feed.
46 - pub circuit_broken: bool,
47 - }
48 -
49 - /// The feed generator aggregates and orders items from the database
50 - pub struct FeedGenerator {
51 - db: Database,
52 - order_by: OrderBy,
53 - filter: FeedFilter,
54 - page_size: i64,
55 - }
56 -
57 - impl FeedGenerator {
58 - pub fn new(db: Database) -> Self {
59 - Self {
60 - db,
61 - order_by: OrderBy::default(),
62 - filter: FeedFilter::default(),
63 - page_size: 50,
64 - }
65 - }
66 -
67 - pub fn with_order(mut self, order: OrderBy) -> Self {
68 - self.order_by = order;
69 - self
70 - }
71 -
72 - pub fn with_filter(mut self, filter: FeedFilter) -> Self {
73 - self.filter = filter;
74 - self
75 - }
76 -
77 - pub fn with_page_size(mut self, size: i64) -> Self {
78 - self.page_size = size;
79 - self
80 - }
81 -
82 - pub fn set_order(&mut self, order: OrderBy) {
83 - self.order_by = order;
84 - }
85 -
86 - pub fn set_filter(&mut self, filter: FeedFilter) {
87 - self.filter = filter;
88 - }
89 -
90 - pub fn order(&self) -> OrderBy {
91 - self.order_by
92 - }
93 -
94 - pub fn filter(&self) -> &FeedFilter {
95 - &self.filter
96 - }
97 -
98 - /// Get a page of feed items with pagination metadata.
99 - ///
100 - /// When a search query is present, the search predicate is pushed into SQL
101 - /// so that LIMIT/OFFSET pagination works correctly with filtered results.
102 - /// Other filters (source, unread, starred) are combined in the same query.
103 - ///
104 - /// Fetches `page_size + 1` items to detect whether more pages exist,
105 - /// then applies in-memory ordering before truncating to the exact page size.
106 - pub async fn get_items(
107 - &self,
108 - page: i64,
109 - ) -> Result<PaginatedItems, FeedError> {
110 - let offset = page * self.page_size;
111 -
112 - // Fetch page_size + 1 to detect whether more pages exist.
113 - let fetch_limit = self.page_size + 1;
114 -
115 - // When a search query is present, push it into SQL so pagination
116 - // is accurate (in-memory search after LIMIT would miss results).
117 - let items = if let Some(ref search) = self.filter.search {
118 - self.db
119 - .items()
120 - .list_search(
121 - search,
122 - self.filter.source.as_deref(),
123 - self.filter.unread_only,
124 - self.filter.starred_only,
125 - fetch_limit,
126 - offset,
127 - )
128 - .await?
129 - } else if let Some(ref source) = self.filter.source {
130 - self.db
131 - .items()
132 - .list_by_busser(source, fetch_limit, offset)
133 - .await?
134 - } else if self.filter.unread_only {
135 - self.db
136 - .items()
137 - .list_unread(fetch_limit, offset)
138 - .await?
139 - } else if self.filter.starred_only {
140 - self.db
141 - .items()
142 - .list_starred(fetch_limit, offset)
143 - .await?
144 - } else {
145 - self.db
146 - .items()
147 - .list_all(fetch_limit, offset)
148 - .await?
149 - };
150 -
151 - // Convert to FeedItem
152 - let mut feed_items: Vec<FeedItem> = items
153 - .into_iter()
154 - .map(|db_item| db_item.to_feed_item())
155 - .collect();
156 -
157 - // Track whether SQL returned a full page (indicating more rows exist)
158 - // BEFORE any in-memory filtering reduces the count.
159 - let sql_had_more = feed_items.len() > self.page_size as usize;
160 -
161 - // Whether any in-memory filter is active (used for has_more logic below).
162 - let needs_inmemory_filter =
163 - !self.filter.tags.is_empty() || !self.filter.feed_tags.is_empty() || !self.filter.conditions.is_empty();
164 -
165 - // Apply feed-tag filtering: only keep items whose feed has a matching tag.
166 - if !self.filter.feed_tags.is_empty() {
167 - let matching_feed_ids = self
168 - .db
169 - .tags()
170 - .feed_ids_with_tags(&self.filter.feed_tags)
171 - .await?;
172 -
173 - // Resolve feed_ids to busser_ids so we can filter FeedItems
174 - let feeds = self.db.feeds().list_all().await?;
175 - let matching_busser_ids: std::collections::HashSet<String> = feeds
176 - .iter()
177 - .filter(|f| matching_feed_ids.contains(&f.id))
178 - .map(|f| f.busser_id.to_string())
179 - .collect();
180 -
181 - feed_items.retain(|item| matching_busser_ids.contains(&item.id.source));
182 - }
183 -
184 - if !self.filter.tags.is_empty() {
185 - feed_items = self.filter.apply_tags_only(feed_items);
186 - }
187 -
188 - // Apply query feed conditions that require in-memory evaluation
189 - // (title/author/body contains, not_contains, equals, matches_regex).
190 - if !self.filter.conditions.is_empty() {
191 - feed_items.retain(|item| self.filter.matches(item));
192 - }
193 - self.order_by.apply(&mut feed_items);
194 - // When in-memory filtering is active and SQL indicated more rows exist,
195 - // we cannot know whether subsequent SQL pages contain matching items.
196 - // Conservatively report has_more = true so the UI can request the next
197 - // page. This may occasionally produce an empty last page, which is a
198 - // better UX than silently hiding matching items.
199 - let has_more = if needs_inmemory_filter && sql_had_more && feed_items.len() <= self.page_size as usize {
200 - true
201 - } else {
202 - feed_items.len() > self.page_size as usize
203 - };
204 - feed_items.truncate(self.page_size as usize);
205 -
206 - debug!(count = feed_items.len(), page, has_more, "Returning items");
207 - Ok(PaginatedItems {
208 - items: feed_items,
209 - has_more,
210 - })
211 - }
212 -
213 - /// Maximum number of items to fetch in a single `get_all_items` call.
214 - /// Guards against unbounded memory usage when the database is large.
215 - /// Increase this (or switch to paginated iteration) if feeds grow past
216 - /// this threshold.
217 - const MAX_ALL_ITEMS: i64 = 10_000;
218 -
219 - /// Get all items (for offline use or small feeds).
220 - ///
221 - /// Capped at [`Self::MAX_ALL_ITEMS`] rows to prevent unbounded memory use.
222 - /// Fetches `MAX_ALL_ITEMS + 1` to detect whether more items exist beyond
223 - /// the cap, using the same strategy as [`Self::get_items`].
224 - pub async fn get_all_items(&self) -> Result<PaginatedItems, FeedError> {
225 - let fetch_limit = Self::MAX_ALL_ITEMS + 1;
226 - let items = self.db.items().list_all(fetch_limit, 0).await?;
227 -
228 - let mut feed_items: Vec<FeedItem> = items
229 - .into_iter()
230 - .map(|db_item| db_item.to_feed_item())
231 - .collect();
232 -
233 - feed_items = self.filter.apply(feed_items);
234 - self.order_by.apply(&mut feed_items);
235 -
236 - let has_more = feed_items.len() > Self::MAX_ALL_ITEMS as usize;
237 - feed_items.truncate(Self::MAX_ALL_ITEMS as usize);
238 -
239 - Ok(PaginatedItems {
240 - items: feed_items,
241 - has_more,
242 - })
243 - }
244 -
245 - /// Get total item count
246 - pub async fn count(&self) -> Result<i64, FeedError> {
247 - if let Some(ref source) = self.filter.source {
248 - Ok(self.db.items().count_by_busser(source).await?)
249 - } else if self.filter.unread_only {
250 - Ok(self.db.items().count_unread().await?)
251 - } else {
252 - Ok(self.db.items().count_all().await?)
253 - }
254 - }
255 -
256 - /// Get unread count
257 - pub async fn unread_count(&self) -> Result<i64, FeedError> {
258 - Ok(self.db.items().count_unread().await?)
259 - }
260 -
261 - /// Get source information.
262 - ///
263 - /// Fetches all feeds and their item counts in two queries (feeds + a single
264 - /// GROUP BY on feed_items) instead of issuing per-source count queries.
265 - pub async fn get_sources(&self) -> Result<Vec<SourceInfo>, FeedError> {
266 - let feeds = self.db.feeds().list_all().await?;
267 - let counts = self.db.items().counts_by_busser().await?;
268 - let all_feed_tags = self.db.tags().all_feed_tags().await?;
269 -
270 - // Build a lookup map: busser_id -> (total, unread)
271 - let count_map: std::collections::HashMap<&str, (i64, i64)> = counts
272 - .iter()
273 - .map(|(id, total, unread)| (id.as_str(), (*total, *unread)))
274 - .collect();
275 -
276 - // Build a lookup map: feed_id -> Vec<tag>
277 - let mut tag_map: std::collections::HashMap<bb_db::FeedId, Vec<String>> =
278 - std::collections::HashMap::new();
279 - for (feed_id, tag) in all_feed_tags {
280 - tag_map.entry(feed_id).or_default().push(tag);
281 - }
282 -
283 - let sources = feeds
284 - .into_iter()
285 - .map(|feed| {
286 - let (total_count, unread_count) =
287 - count_map.get(feed.busser_id.as_str()).copied().unwrap_or((0, 0));
288 - let tags = tag_map.remove(&feed.id).unwrap_or_default();
289 - SourceInfo {
290 - id: feed.busser_id.to_string(),
291 - name: feed.name,
292 - total_count,
293 - unread_count,
294 - tags,
295 - consecutive_failures: feed.consecutive_failures,
296 - last_error: feed.last_error,
297 - circuit_broken: feed.circuit_broken,
298 - }
299 - })
300 - .collect();
301 -
302 - Ok(sources)
303 - }
304 -
305 - /// Mark an item as read
306 - pub async fn mark_read(&self, item_id: &str, is_read: bool) -> Result<(), FeedError> {
307 - // Find item by external_id
308 - if let Some(item) = self.db.items().get_by_external_id(item_id).await? {
309 - self.db.items().mark_read(item.id, is_read).await?;
310 - }
311 - Ok(())
312 - }
313 -
314 - /// Mark an item as starred
315 - pub async fn mark_starred(&self, item_id: &str, is_starred: bool) -> Result<(), FeedError> {
316 - if let Some(item) = self.db.items().get_by_external_id(item_id).await? {
317 - self.db.items().mark_starred(item.id, is_starred).await?;
318 - }
319 - Ok(())
320 - }
321 -
322 - /// Get the database handle
323 - pub fn database(&self) -> &Database {
324 - &self.db
325 - }
326 - }
327 -
328 - #[cfg(test)]
329 - mod tests {
330 - use super::*;
331 - use bb_db::{BusserId, CreateFeed, CreateFeedItem};
332 - use chrono::{Duration, Utc};
333 -
334 - async fn test_db() -> Database {
335 - let db = Database::connect("sqlite::memory:").await.unwrap();
336 - db.migrate().await.unwrap();
337 - db
338 - }
339 -
340 - async fn seed_feed(db: &Database, busser_id: &str, name: &str) -> bb_db::DbFeed {
341 - db.feeds()
342 - .create(CreateFeed {
343 - busser_id: BusserId::new(busser_id),
344 - name: name.to_string(),
345 - config: serde_json::json!({}),
346 - })
347 - .await
348 - .unwrap()
349 - }
350 -
351 - async fn seed_item(
352 - db: &Database,
353 - feed: &bb_db::DbFeed,
354 - external_id: &str,
355 - hours_ago: i64,
356 - ) -> bb_db::DbFeedItem {
357 - db.items()
358 - .upsert(CreateFeedItem {
359 - external_id: external_id.to_string(),
360 - feed_id: feed.id,
361 - busser_id: feed.busser_id.clone(),
362 - bite_author: "author".to_string(),
363 - bite_text: format!("Item {external_id}"),
364 - bite_secondary: None,
365 - bite_indicator: None,
366 - title: Some(format!("Title {external_id}")),
367 - body: None,
368 - url: None,
369 - media: vec![],
370 - published_at: Utc::now() - Duration::hours(hours_ago),
371 - source_name: "test".to_string(),
372 - score: None,
373 - tags: vec![],
374 - })
375 - .await
376 - .unwrap()
377 - }
378 -
379 - // ── Constructor & builders ────────────────────────────────────
380 -
381 - #[tokio::test]
382 - async fn new_has_default_order_and_filter() {
383 - let db = test_db().await;
384 - let gen = FeedGenerator::new(db);
385 - assert_eq!(gen.order(), OrderBy::default());
386 - assert_eq!(gen.filter().source, None);
387 - assert!(!gen.filter().unread_only);
388 - assert!(!gen.filter().starred_only);
389 - }
390 -
391 - #[tokio::test]
392 - async fn with_order_changes_order() {
393 - let db = test_db().await;
394 - let gen = FeedGenerator::new(db).with_order(OrderBy::Score);
395 - assert_eq!(gen.order(), OrderBy::Score);
396 - }
397 -
398 - #[tokio::test]
399 - async fn with_filter_changes_filter() {
400 - let db = test_db().await;
401 - let filter = FeedFilter::new().source("rss");
402 - let gen = FeedGenerator::new(db).with_filter(filter);
403 - assert_eq!(gen.filter().source.as_deref(), Some("rss"));
404 - }
405 -
406 - #[tokio::test]
407 - async fn set_order_mutates() {
408 - let db = test_db().await;
409 - let mut gen = FeedGenerator::new(db);
410 - gen.set_order(OrderBy::Score);
411 - assert_eq!(gen.order(), OrderBy::Score);
412 - }
413 -
414 - #[tokio::test]
415 - async fn set_filter_mutates() {
416 - let db = test_db().await;
417 - let mut gen = FeedGenerator::new(db);
418 - gen.set_filter(FeedFilter::new().unread_only());
419 - assert!(gen.filter().unread_only);
420 - }
421 -
422 - // ── get_items ────────────────────────────────────────────────
423 -
424 - #[tokio::test]
425 - async fn get_items_empty_db_returns_empty() {
426 - let db = test_db().await;
427 - let gen = FeedGenerator::new(db);
428 - let result = gen.get_items(0).await.unwrap();
429 - assert!(result.items.is_empty());
430 - assert!(!result.has_more);
431 - }
432 -
433 - #[tokio::test]
434 - async fn get_items_returns_inserted_items() {
435 - let db = test_db().await;
436 - let feed = seed_feed(&db, "rss", "Test Feed").await;
437 - seed_item(&db, &feed, "rss:1", 2).await;
438 - seed_item(&db, &feed, "rss:2", 1).await;
439 -
440 - let gen = FeedGenerator::new(db);
441 - let result = gen.get_items(0).await.unwrap();
442 - assert_eq!(result.items.len(), 2);
443 - assert!(!result.has_more);
444 - }
445 -
446 - #[tokio::test]
447 - async fn get_items_respects_page_size() {
448 - let db = test_db().await;
449 - let feed = seed_feed(&db, "rss", "Feed").await;
450 - for i in 0..10 {
451 - seed_item(&db, &feed, &format!("rss:{i}"), i).await;
452 - }
453 -
454 - let gen = FeedGenerator::new(db).with_page_size(3);
455 - let result = gen.get_items(0).await.unwrap();
456 - assert_eq!(result.items.len(), 3);
457 - assert!(result.has_more);
458 - }
459 -
460 - // ── count ────────────────────────────────────────────────────
461 -
462 - #[tokio::test]
463 - async fn count_returns_total() {
464 - let db = test_db().await;
465 - let feed = seed_feed(&db, "rss", "Feed").await;
466 - seed_item(&db, &feed, "rss:1", 0).await;
467 - seed_item(&db, &feed, "rss:2", 0).await;
468 - seed_item(&db, &feed, "rss:3", 0).await;
469 -
470 - let gen = FeedGenerator::new(db);
471 - assert_eq!(gen.count().await.unwrap(), 3);
472 - }
473 -
474 - #[tokio::test]
475 - async fn count_with_source_filter() {
476 - let db = test_db().await;
477 - let feed_a = seed_feed(&db, "rss", "RSS").await;
478 - let feed_b = seed_feed(&db, "hn", "HN").await;
479 - seed_item(&db, &feed_a, "rss:1", 0).await;
480 - seed_item(&db, &feed_b, "hn:1", 0).await;
481 - seed_item(&db, &feed_b, "hn:2", 0).await;
482 -
483 - let gen = FeedGenerator::new(db).with_filter(FeedFilter::new().source("hn"));
484 - assert_eq!(gen.count().await.unwrap(), 2);
485 - }
486 -
487 - // ── get_sources ──────────────────────────────────────────────
488 -
489 - #[tokio::test]
490 - async fn get_sources_aggregates_feeds() {
491 - let db = test_db().await;
492 - let feed_a = seed_feed(&db, "rss", "RSS Feed").await;
493 - let feed_b = seed_feed(&db, "hn", "HN Feed").await;
494 - seed_item(&db, &feed_a, "rss:1", 0).await;
495 - seed_item(&db, &feed_b, "hn:1", 0).await;
496 - seed_item(&db, &feed_b, "hn:2", 0).await;
497 -
498 - let gen = FeedGenerator::new(db);
499 - let sources = gen.get_sources().await.unwrap();
500 - assert_eq!(sources.len(), 2);
Lines truncated
@@ -0,0 +1,111 @@
1 + //! Item mutation and database access methods for the feed generator.
2 +
3 + use bb_db::Database;
4 + use super::{FeedError, FeedGenerator};
5 +
6 + impl FeedGenerator {
7 + /// Mark an item as read
8 + pub async fn mark_read(&self, item_id: &str, is_read: bool) -> Result<(), FeedError> {
9 + // Find item by external_id
10 + if let Some(item) = self.db.items().get_by_external_id(item_id).await? {
11 + self.db.items().mark_read(item.id, is_read).await?;
12 + }
13 + Ok(())
14 + }
15 +
16 + /// Mark an item as starred
17 + pub async fn mark_starred(&self, item_id: &str, is_starred: bool) -> Result<(), FeedError> {
18 + if let Some(item) = self.db.items().get_by_external_id(item_id).await? {
19 + self.db.items().mark_starred(item.id, is_starred).await?;
20 + }
21 + Ok(())
22 + }
23 +
24 + /// Get the database handle
25 + pub fn database(&self) -> &Database {
26 + &self.db
27 + }
28 + }
29 +
30 + #[cfg(test)]
31 + mod tests {
32 + use super::super::tests::*;
33 + use super::super::FeedGenerator;
34 +
35 + // ── mark_read / mark_starred ─────────────────────────────────
36 +
37 + #[tokio::test]
38 + async fn mark_read_updates_item() {
39 + let db = test_db().await;
40 + let feed = seed_feed(&db, "rss", "Feed").await;
41 + seed_item(&db, &feed, "rss:mr", 0).await;
42 +
43 + let gen = FeedGenerator::new(db.clone());
44 + gen.mark_read("rss:mr", true).await.unwrap();
45 +
46 + let item = db.items().get_by_external_id("rss:mr").await.unwrap().unwrap();
47 + assert!(item.is_read);
48 + }
49 +
50 + #[tokio::test]
51 + async fn mark_starred_updates_item() {
52 + let db = test_db().await;
53 + let feed = seed_feed(&db, "rss", "Feed").await;
54 + seed_item(&db, &feed, "rss:ms", 0).await;
55 +
56 + let gen = FeedGenerator::new(db.clone());
57 + gen.mark_starred("rss:ms", true).await.unwrap();
58 +
59 + let item = db.items().get_by_external_id("rss:ms").await.unwrap().unwrap();
60 + assert!(item.is_starred);
61 + }
62 +
63 + #[tokio::test]
64 + async fn mark_read_nonexistent_is_noop() {
65 + let db = test_db().await;
66 + let gen = FeedGenerator::new(db);
67 + // Should not error on missing item
68 + gen.mark_read("nonexistent:1", true).await.unwrap();
69 + }
70 +
71 + // ── mark_starred edge case ──────────────────────────────────
72 +
73 + #[tokio::test]
74 + async fn mark_starred_nonexistent_is_noop() {
75 + let db = test_db().await;
76 + let gen = FeedGenerator::new(db);
77 + gen.mark_starred("nonexistent:1", true).await.unwrap();
78 + }
79 +
80 + #[tokio::test]
81 + async fn mark_read_toggle_off() {
82 + let db = test_db().await;
83 + let feed = seed_feed(&db, "rss", "Feed").await;
84 + seed_item(&db, &feed, "rss:toggle", 0).await;
85 +
86 + let gen = FeedGenerator::new(db.clone());
87 + gen.mark_read("rss:toggle", true).await.unwrap();
88 + let item = db.items().get_by_external_id("rss:toggle").await.unwrap().unwrap();
89 + assert!(item.is_read);
90 +
91 + gen.mark_read("rss:toggle", false).await.unwrap();
92 + let item = db.items().get_by_external_id("rss:toggle").await.unwrap().unwrap();
93 + assert!(!item.is_read);
94 + }
95 +
96 + #[tokio::test]
97 + async fn mark_starred_toggle_off() {
98 + let db = test_db().await;
99 + let feed = seed_feed(&db, "rss", "Feed").await;
100 + seed_item(&db, &feed, "rss:toggle", 0).await;
101 +
102 + let gen = FeedGenerator::new(db.clone());
103 + gen.mark_starred("rss:toggle", true).await.unwrap();
104 + let item = db.items().get_by_external_id("rss:toggle").await.unwrap().unwrap();
105 + assert!(item.is_starred);
106 +
107 + gen.mark_starred("rss:toggle", false).await.unwrap();
108 + let item = db.items().get_by_external_id("rss:toggle").await.unwrap().unwrap();
109 + assert!(!item.is_starred);
110 + }
111 + }
@@ -0,0 +1,258 @@
1 + //! Feed generator for aggregating, filtering, and ordering items.
2 + //!
3 + //! Reads items from the database, applies user-defined filters and
4 + //! ordering, and returns paginated results for display.
5 +
6 + mod items;
7 + mod query;
8 +
9 + use bb_db::Database;
10 + use thiserror::Error;
11 +
12 + use crate::{FeedFilter, OrderBy};
13 +
14 + // items and query add impl blocks to FeedGenerator (no new public types to re-export).
15 +
16 + #[derive(Error, Debug)]
17 + pub enum FeedError {
18 + #[error("Database error: {0}")]
19 + Database(#[from] sqlx::Error),
20 + }
21 +
22 + /// A page of feed items with a flag indicating whether more pages exist.
23 + #[derive(Debug)]
24 + pub struct PaginatedItems {
25 + /// The items for this page (at most `page_size` items).
26 + pub items: Vec<bb_interface::FeedItem>,
27 + /// Whether additional pages of results are available beyond this one.
28 + pub has_more: bool,
29 + }
30 +
31 + /// Source info with item counts
32 + #[derive(Debug, Clone)]
33 + pub struct SourceInfo {
34 + /// Busser/plugin identifier.
35 + pub id: String,
36 + /// Human-readable source name.
37 + pub name: String,
38 + /// Total number of items from this source.
39 + pub total_count: i64,
40 + /// Number of unread items from this source.
41 + pub unread_count: i64,
42 + /// User-assigned tags for this feed.
43 + pub tags: Vec<String>,
44 + /// Number of consecutive fetch failures (0 = healthy).
45 + pub consecutive_failures: i64,
46 + /// Error message from the last failed fetch.
47 + pub last_error: Option<String>,
48 + /// Whether the circuit breaker has tripped for this feed.
49 + pub circuit_broken: bool,
50 + }
51 +
52 + /// The feed generator aggregates and orders items from the database
53 + pub struct FeedGenerator {
54 + db: Database,
55 + order_by: OrderBy,
56 + filter: FeedFilter,
57 + page_size: i64,
58 + }
59 +
60 + impl FeedGenerator {
61 + pub fn new(db: Database) -> Self {
62 + Self {
63 + db,
64 + order_by: OrderBy::default(),
65 + filter: FeedFilter::default(),
66 + page_size: 50,
67 + }
68 + }
69 +
70 + pub fn with_order(mut self, order: OrderBy) -> Self {
71 + self.order_by = order;
72 + self
73 + }
74 +
75 + pub fn with_filter(mut self, filter: FeedFilter) -> Self {
76 + self.filter = filter;
77 + self
78 + }
79 +
80 + pub fn with_page_size(mut self, size: i64) -> Self {
81 + self.page_size = size;
82 + self
83 + }
84 +
85 + pub fn set_order(&mut self, order: OrderBy) {
86 + self.order_by = order;
87 + }
88 +
89 + pub fn set_filter(&mut self, filter: FeedFilter) {
90 + self.filter = filter;
91 + }
92 +
93 + pub fn order(&self) -> OrderBy {
94 + self.order_by
95 + }
96 +
97 + pub fn filter(&self) -> &FeedFilter {
98 + &self.filter
99 + }
100 + }
101 +
102 + #[cfg(test)]
103 + mod tests {
104 + use super::*;
105 + use bb_db::{BusserId, CreateFeed, CreateFeedItem};
106 + use chrono::{Duration, Utc};
107 +
108 + pub(crate) async fn test_db() -> Database {
109 + let db = Database::connect("sqlite::memory:").await.unwrap();
110 + db.migrate().await.unwrap();
111 + db
112 + }
113 +
114 + pub(crate) async fn seed_feed(db: &Database, busser_id: &str, name: &str) -> bb_db::DbFeed {
115 + db.feeds()
116 + .create(CreateFeed {
117 + busser_id: BusserId::new(busser_id),
118 + name: name.to_string(),
119 + config: serde_json::json!({}),
120 + })
121 + .await
122 + .unwrap()
123 + }
124 +
125 + pub(crate) async fn seed_item(
126 + db: &Database,
127 + feed: &bb_db::DbFeed,
128 + external_id: &str,
129 + hours_ago: i64,
130 + ) -> bb_db::DbFeedItem {
131 + db.items()
132 + .upsert(CreateFeedItem {
133 + external_id: external_id.to_string(),
134 + feed_id: feed.id,
135 + busser_id: feed.busser_id.clone(),
136 + bite_author: "author".to_string(),
137 + bite_text: format!("Item {external_id}"),
138 + bite_secondary: None,
139 + bite_indicator: None,
140 + title: Some(format!("Title {external_id}")),
141 + body: None,
142 + url: None,
143 + media: vec![],
144 + published_at: Utc::now() - Duration::hours(hours_ago),
145 + source_name: "test".to_string(),
146 + score: None,
147 + tags: vec![],
148 + actions: vec![],
149 + })
150 + .await
151 + .unwrap()
152 + }
153 +
154 + /// Seed an item with a score for ordering tests.
155 + pub(crate) async fn seed_scored_item(
156 + db: &Database,
157 + feed: &bb_db::DbFeed,
158 + external_id: &str,
159 + hours_ago: i64,
160 + score: Option<i64>,
161 + ) -> bb_db::DbFeedItem {
162 + db.items()
163 + .upsert(CreateFeedItem {
164 + external_id: external_id.to_string(),
165 + feed_id: feed.id,
166 + busser_id: feed.busser_id.clone(),
167 + bite_author: "author".to_string(),
168 + bite_text: format!("Item {external_id}"),
169 + bite_secondary: None,
170 + bite_indicator: None,
171 + title: Some(format!("Title {external_id}")),
172 + body: None,
173 + url: None,
174 + media: vec![],
175 + published_at: Utc::now() - Duration::hours(hours_ago),
176 + source_name: "test".to_string(),
177 + score,
178 + tags: vec![],
179 + actions: vec![],
180 + })
181 + .await
182 + .unwrap()
183 + }
184 +
185 + /// Seed an item with item-level tags.
186 + pub(crate) async fn seed_tagged_item(
187 + db: &Database,
188 + feed: &bb_db::DbFeed,
189 + external_id: &str,
190 + hours_ago: i64,
191 + tags: Vec<String>,
192 + ) -> bb_db::DbFeedItem {
193 + db.items()
194 + .upsert(CreateFeedItem {
195 + external_id: external_id.to_string(),
196 + feed_id: feed.id,
197 + busser_id: feed.busser_id.clone(),
198 + bite_author: "author".to_string(),
199 + bite_text: format!("Item {external_id}"),
200 + bite_secondary: None,
201 + bite_indicator: None,
202 + title: Some(format!("Title {external_id}")),
203 + body: Some(format!("Body of {external_id}")),
204 + url: None,
205 + media: vec![],
206 + published_at: Utc::now() - Duration::hours(hours_ago),
207 + source_name: "test".to_string(),
208 + score: None,
209 + tags,
210 + actions: vec![],
211 + })
212 + .await
213 + .unwrap()
214 + }
215 +
216 + // ── Constructor & builders ────────────────────────────────────
217 +
218 + #[tokio::test]
219 + async fn new_has_default_order_and_filter() {
220 + let db = test_db().await;
221 + let gen = FeedGenerator::new(db);
222 + assert_eq!(gen.order(), OrderBy::default());
223 + assert_eq!(gen.filter().source, None);
224 + assert!(!gen.filter().unread_only);
225 + assert!(!gen.filter().starred_only);
226 + }
227 +
228 + #[tokio::test]
229 + async fn with_order_changes_order() {
230 + let db = test_db().await;
231 + let gen = FeedGenerator::new(db).with_order(OrderBy::Score);
232 + assert_eq!(gen.order(), OrderBy::Score);
233 + }
234 +
235 + #[tokio::test]
236 + async fn with_filter_changes_filter() {
237 + let db = test_db().await;
238 + let filter = FeedFilter::new().source("rss");
239 + let gen = FeedGenerator::new(db).with_filter(filter);
240 + assert_eq!(gen.filter().source.as_deref(), Some("rss"));
241 + }
242 +
243 + #[tokio::test]
244 + async fn set_order_mutates() {
245 + let db = test_db().await;
246 + let mut gen = FeedGenerator::new(db);
247 + gen.set_order(OrderBy::Score);
248 + assert_eq!(gen.order(), OrderBy::Score);
249 + }
250 +
251 + #[tokio::test]
252 + async fn set_filter_mutates() {
253 + let db = test_db().await;
254 + let mut gen = FeedGenerator::new(db);
255 + gen.set_filter(FeedFilter::new().unread_only());
256 + assert!(gen.filter().unread_only);
257 + }
258 + }
@@ -0,0 +1,1035 @@
1 + //! Query and pagination methods for the feed generator.
2 +
3 + use bb_interface::FeedItem;
4 + use tracing::debug;
5 +
6 + use super::{FeedError, FeedGenerator, PaginatedItems, SourceInfo};
7 +
8 + impl FeedGenerator {
9 + /// Get a page of feed items with pagination metadata.
10 + ///
11 + /// When a search query is present, the search predicate is pushed into SQL
12 + /// so that LIMIT/OFFSET pagination works correctly with filtered results.
13 + /// Other filters (source, unread, starred) are combined in the same query.
14 + ///
15 + /// Fetches `page_size + 1` items to detect whether more pages exist,
16 + /// then applies in-memory ordering before truncating to the exact page size.
17 + pub async fn get_items(
18 + &self,
19 + page: i64,
20 + ) -> Result<PaginatedItems, FeedError> {
21 + let offset = page * self.page_size;
22 +
23 + // Fetch page_size + 1 to detect whether more pages exist.
24 + let fetch_limit = self.page_size + 1;
25 +
26 + // When a search query is present, push it into SQL so pagination
27 + // is accurate (in-memory search after LIMIT would miss results).
28 + let items = if let Some(ref search) = self.filter.search {
29 + self.db
30 + .items()
31 + .list_search(
32 + search,
33 + self.filter.source.as_deref(),
34 + self.filter.unread_only,
35 + self.filter.starred_only,
36 + fetch_limit,
37 + offset,
38 + )
39 + .await?
40 + } else if let Some(ref source) = self.filter.source {
41 + self.db
42 + .items()
43 + .list_by_busser(source, fetch_limit, offset)
44 + .await?
45 + } else if self.filter.unread_only {
46 + self.db
47 + .items()
48 + .list_unread(fetch_limit, offset)
49 + .await?
50 + } else if self.filter.starred_only {
51 + self.db
52 + .items()
53 + .list_starred(fetch_limit, offset)
54 + .await?
55 + } else {
56 + self.db
57 + .items()
58 + .list_all(fetch_limit, offset)
59 + .await?
60 + };
61 +
62 + // Convert to FeedItem
63 + let mut feed_items: Vec<FeedItem> = items
64 + .into_iter()
65 + .map(|db_item| db_item.to_feed_item())
66 + .collect();
67 +
68 + // Track whether SQL returned a full page (indicating more rows exist)
69 + // BEFORE any in-memory filtering reduces the count.
70 + let sql_had_more = feed_items.len() > self.page_size as usize;
71 +
72 + // Whether any in-memory filter is active (used for has_more logic below).
73 + let needs_inmemory_filter =
74 + !self.filter.tags.is_empty() || !self.filter.feed_tags.is_empty() || !self.filter.conditions.is_empty();
75 +
76 + // Apply feed-tag filtering: only keep items whose feed has a matching tag.
77 + if !self.filter.feed_tags.is_empty() {
78 + let matching_feed_ids = self
79 + .db
80 + .tags()
81 + .feed_ids_with_tags(&self.filter.feed_tags)
82 + .await?;
83 +
84 + // Resolve feed_ids to busser_ids so we can filter FeedItems
85 + let feeds = self.db.feeds().list_all().await?;
86 + let matching_busser_ids: std::collections::HashSet<String> = feeds
87 + .iter()
88 + .filter(|f| matching_feed_ids.contains(&f.id))
89 + .map(|f| f.busser_id.to_string())
90 + .collect();
91 +
92 + feed_items.retain(|item| matching_busser_ids.contains(&item.id.source));
93 + }
94 +
95 + if !self.filter.tags.is_empty() {
96 + feed_items = self.filter.apply_tags_only(feed_items);
97 + }
98 +
99 + // Apply query feed conditions that require in-memory evaluation
100 + // (title/author/body contains, not_contains, equals, matches_regex).
101 + if !self.filter.conditions.is_empty() {
102 + feed_items.retain(|item| self.filter.matches(item));
103 + }
104 + self.order_by.apply(&mut feed_items);
105 + // When in-memory filtering is active and SQL indicated more rows exist,
106 + // we cannot know whether subsequent SQL pages contain matching items.
107 + // Conservatively report has_more = true so the UI can request the next
108 + // page. This may occasionally produce an empty last page, which is a
109 + // better UX than silently hiding matching items.
110 + let has_more = if needs_inmemory_filter && sql_had_more && feed_items.len() <= self.page_size as usize {
111 + true
112 + } else {
113 + feed_items.len() > self.page_size as usize
114 + };
115 + feed_items.truncate(self.page_size as usize);
116 +
117 + debug!(count = feed_items.len(), page, has_more, "Returning items");
118 + Ok(PaginatedItems {
119 + items: feed_items,
120 + has_more,
121 + })
122 + }
123 +
124 + /// Maximum number of items to fetch in a single `get_all_items` call.
125 + /// Guards against unbounded memory usage when the database is large.
126 + /// Increase this (or switch to paginated iteration) if feeds grow past
127 + /// this threshold.
128 + const MAX_ALL_ITEMS: i64 = 10_000;
129 +
130 + /// Get all items (for offline use or small feeds).
131 + ///
132 + /// Capped at [`Self::MAX_ALL_ITEMS`] rows to prevent unbounded memory use.
133 + /// Fetches `MAX_ALL_ITEMS + 1` to detect whether more items exist beyond
134 + /// the cap, using the same strategy as [`Self::get_items`].
135 + pub async fn get_all_items(&self) -> Result<PaginatedItems, FeedError> {
136 + let fetch_limit = Self::MAX_ALL_ITEMS + 1;
137 + let items = self.db.items().list_all(fetch_limit, 0).await?;
138 +
139 + let mut feed_items: Vec<FeedItem> = items
140 + .into_iter()
141 + .map(|db_item| db_item.to_feed_item())
142 + .collect();
143 +
144 + feed_items = self.filter.apply(feed_items);
145 + self.order_by.apply(&mut feed_items);
146 +
147 + let has_more = feed_items.len() > Self::MAX_ALL_ITEMS as usize;
148 + feed_items.truncate(Self::MAX_ALL_ITEMS as usize);
149 +
150 + Ok(PaginatedItems {
151 + items: feed_items,
152 + has_more,
153 + })
154 + }
155 +
156 + /// Get total item count
157 + pub async fn count(&self) -> Result<i64, FeedError> {
158 + if let Some(ref source) = self.filter.source {
159 + Ok(self.db.items().count_by_busser(source).await?)
160 + } else if self.filter.unread_only {
161 + Ok(self.db.items().count_unread().await?)
162 + } else {
163 + Ok(self.db.items().count_all().await?)
164 + }
165 + }
166 +
167 + /// Get unread count
168 + pub async fn unread_count(&self) -> Result<i64, FeedError> {
169 + Ok(self.db.items().count_unread().await?)
170 + }
171 +
172 + /// Get source information.
173 + ///
174 + /// Fetches all feeds and their item counts in two queries (feeds + a single
175 + /// GROUP BY on feed_items) instead of issuing per-source count queries.
176 + pub async fn get_sources(&self) -> Result<Vec<SourceInfo>, FeedError> {
177 + let feeds = self.db.feeds().list_all().await?;
178 + let counts = self.db.items().counts_by_busser().await?;
179 + let all_feed_tags = self.db.tags().all_feed_tags().await?;
180 +
181 + // Build a lookup map: busser_id -> (total, unread)
182 + let count_map: std::collections::HashMap<&str, (i64, i64)> = counts
183 + .iter()
184 + .map(|(id, total, unread)| (id.as_str(), (*total, *unread)))
185 + .collect();
186 +
187 + // Build a lookup map: feed_id -> Vec<tag>
188 + let mut tag_map: std::collections::HashMap<bb_db::FeedId, Vec<String>> =
189 + std::collections::HashMap::new();
190 + for (feed_id, tag) in all_feed_tags {
191 + tag_map.entry(feed_id).or_default().push(tag);
192 + }
193 +
194 + let sources = feeds
195 + .into_iter()
196 + .map(|feed| {
197 + let (total_count, unread_count) =
198 + count_map.get(feed.busser_id.as_str()).copied().unwrap_or((0, 0));
199 + let tags = tag_map.remove(&feed.id).unwrap_or_default();
200 + SourceInfo {
201 + id: feed.busser_id.to_string(),
202 + name: feed.name,
203 + total_count,
204 + unread_count,
205 + tags,
206 + consecutive_failures: feed.consecutive_failures,
207 + last_error: feed.last_error,
208 + circuit_broken: feed.circuit_broken,
209 + }
210 + })
211 + .collect();
212 +
213 + Ok(sources)
214 + }
215 + }
216 +
217 + #[cfg(test)]
218 + mod tests {
219 + use super::super::tests::*;
220 + use super::super::FeedGenerator;
221 + use crate::{FeedFilter, OrderBy};
222 +
223 + // ── get_items ────────────────────────────────────────────────
224 +
225 + #[tokio::test]
226 + async fn get_items_empty_db_returns_empty() {
227 + let db = test_db().await;
228 + let gen = FeedGenerator::new(db);
229 + let result = gen.get_items(0).await.unwrap();
230 + assert!(result.items.is_empty());
231 + assert!(!result.has_more);
232 + }
233 +
234 + #[tokio::test]
235 + async fn get_items_returns_inserted_items() {
236 + let db = test_db().await;
237 + let feed = seed_feed(&db, "rss", "Test Feed").await;
238 + seed_item(&db, &feed, "rss:1", 2).await;
239 + seed_item(&db, &feed, "rss:2", 1).await;
240 +
241 + let gen = FeedGenerator::new(db);
242 + let result = gen.get_items(0).await.unwrap();
243 + assert_eq!(result.items.len(), 2);
244 + assert!(!result.has_more);
245 + }
246 +
247 + #[tokio::test]
248 + async fn get_items_respects_page_size() {
249 + let db = test_db().await;
250 + let feed = seed_feed(&db, "rss", "Feed").await;
251 + for i in 0..10 {
252 + seed_item(&db, &feed, &format!("rss:{i}"), i).await;
253 + }
254 +
255 + let gen = FeedGenerator::new(db).with_page_size(3);
256 + let result = gen.get_items(0).await.unwrap();
257 + assert_eq!(result.items.len(), 3);
258 + assert!(result.has_more);
259 + }
260 +
261 + // ── count ────────────────────────────────────────────────────
262 +
263 + #[tokio::test]
264 + async fn count_returns_total() {
265 + let db = test_db().await;
266 + let feed = seed_feed(&db, "rss", "Feed").await;
267 + seed_item(&db, &feed, "rss:1", 0).await;
268 + seed_item(&db, &feed, "rss:2", 0).await;
269 + seed_item(&db, &feed, "rss:3", 0).await;
270 +
271 + let gen = FeedGenerator::new(db);
272 + assert_eq!(gen.count().await.unwrap(), 3);
273 + }
274 +
275 + #[tokio::test]
276 + async fn count_with_source_filter() {
277 + let db = test_db().await;
278 + let feed_a = seed_feed(&db, "rss", "RSS").await;
279 + let feed_b = seed_feed(&db, "hn", "HN").await;
280 + seed_item(&db, &feed_a, "rss:1", 0).await;
281 + seed_item(&db, &feed_b, "hn:1", 0).await;
282 + seed_item(&db, &feed_b, "hn:2", 0).await;
283 +
284 + let gen = FeedGenerator::new(db).with_filter(FeedFilter::new().source("hn"));
285 + assert_eq!(gen.count().await.unwrap(), 2);
286 + }
287 +
288 + // ── get_sources ──────────────────────────────────────────────
289 +
290 + #[tokio::test]
291 + async fn get_sources_aggregates_feeds() {
292 + let db = test_db().await;
293 + let feed_a = seed_feed(&db, "rss", "RSS Feed").await;
294 + let feed_b = seed_feed(&db, "hn", "HN Feed").await;
295 + seed_item(&db, &feed_a, "rss:1", 0).await;
296 + seed_item(&db, &feed_b, "hn:1", 0).await;
297 + seed_item(&db, &feed_b, "hn:2", 0).await;
298 +
299 + let gen = FeedGenerator::new(db);
300 + let sources = gen.get_sources().await.unwrap();
301 + assert_eq!(sources.len(), 2);
302 +
303 + let hn_source = sources.iter().find(|s| s.id == "hn").unwrap();
304 + assert_eq!(hn_source.total_count, 2);
305 + }
306 +
307 + // ── Pagination edge cases ────────────────────────────────────
308 +
309 + #[tokio::test]
310 + async fn get_items_exact_page_boundary_no_has_more() {
311 + let db = test_db().await;
312 + let feed = seed_feed(&db, "rss", "Feed").await;
313 + // Insert exactly page_size items
314 + for i in 0..3 {
315 + seed_item(&db, &feed, &format!("rss:{i}"), i).await;
316 + }
317 +
318 + let gen = FeedGenerator::new(db).with_page_size(3);
319 + let result = gen.get_items(0).await.unwrap();
320 + assert_eq!(result.items.len(), 3);
321 + assert!(!result.has_more);
322 + }
323 +
324 + #[tokio::test]
325 + async fn get_items_page_boundary_plus_one_has_more() {
326 + let db = test_db().await;
327 + let feed = seed_feed(&db, "rss", "Feed").await;
328 + // Insert page_size + 1 items
329 + for i in 0..4 {
330 + seed_item(&db, &feed, &format!("rss:{i}"), i).await;
331 + }
332 +
333 + let gen = FeedGenerator::new(db).with_page_size(3);
334 + let result = gen.get_items(0).await.unwrap();
335 + assert_eq!(result.items.len(), 3);
336 + assert!(result.has_more);
337 + }
338 +
339 + #[tokio::test]
340 + async fn get_items_last_page_partial() {
341 + let db = test_db().await;
342 + let feed = seed_feed(&db, "rss", "Feed").await;
343 + for i in 0..5 {
344 + seed_item(&db, &feed, &format!("rss:{i}"), i).await;
345 + }
346 +
347 + let gen = FeedGenerator::new(db).with_page_size(3);
348 + let page1 = gen.get_items(1).await.unwrap();
349 + assert_eq!(page1.items.len(), 2);
350 + assert!(!page1.has_more);
351 + }
352 +
353 + #[tokio::test]
354 + async fn get_items_past_end_returns_empty() {
355 + let db = test_db().await;
356 + let feed = seed_feed(&db, "rss", "Feed").await;
357 + seed_item(&db, &feed, "rss:1", 0).await;
358 +
359 + let gen = FeedGenerator::new(db).with_page_size(3);
360 + let result = gen.get_items(5).await.unwrap();
361 + assert!(result.items.is_empty());
362 + assert!(!result.has_more);
363 + }
364 +
365 + // ── Feed-tag filtering ──────────────────────────────────────
366 +
367 + #[tokio::test]
368 + async fn get_items_feed_tag_filter_keeps_matching() {
369 + let db = test_db().await;
370 + let feed_a = seed_feed(&db, "rss", "Tech Blog").await;
371 + let feed_b = seed_feed(&db, "hn", "News").await;
372 + seed_item(&db, &feed_a, "rss:1", 0).await;
373 + seed_item(&db, &feed_b, "hn:1", 0).await;
374 +
375 + // Tag only feed_a
376 + db.tags()
377 + .set_tags(feed_a.id, &["tech".to_string()])
378 + .await
379 + .unwrap();
380 +
381 + let gen = FeedGenerator::new(db)
382 + .with_filter(FeedFilter::new().with_feed_tag("tech"));
383 + let result = gen.get_items(0).await.unwrap();
384 + assert_eq!(result.items.len(), 1);
385 + assert_eq!(result.items[0].id.source, "rss");
386 + }
387 +
388 + #[tokio::test]
389 + async fn get_items_feed_tag_filter_no_match_returns_empty() {
390 + let db = test_db().await;
391 + let feed = seed_feed(&db, "rss", "Feed").await;
392 + seed_item(&db, &feed, "rss:1", 0).await;
393 +
394 + let gen = FeedGenerator::new(db)
395 + .with_filter(FeedFilter::new().with_feed_tag("nonexistent"));
396 + let result = gen.get_items(0).await.unwrap();
397 + assert!(result.items.is_empty());
398 + }
399 +
400 + #[tokio::test]
401 + async fn get_items_feed_tag_filter_multiple_feeds_same_tag() {
402 + let db = test_db().await;
403 + let feed_a = seed_feed(&db, "rss", "Blog A").await;
404 + let feed_b = seed_feed(&db, "hn", "Blog B").await;
405 + let feed_c = seed_feed(&db, "arxiv", "Science").await;
406 + seed_item(&db, &feed_a, "rss:1", 0).await;
407 + seed_item(&db, &feed_b, "hn:1", 0).await;
408 + seed_item(&db, &feed_c, "arxiv:1", 0).await;
409 +
410 + // Tag feeds A and B with "tech"
411 + db.tags()
412 + .set_tags(feed_a.id, &["tech".to_string()])
413 + .await
414 + .unwrap();
415 + db.tags()
416 + .set_tags(feed_b.id, &["tech".to_string()])
417 + .await
418 + .unwrap();
419 +
420 + let gen = FeedGenerator::new(db)
421 + .with_filter(FeedFilter::new().with_feed_tag("tech"));
422 + let result = gen.get_items(0).await.unwrap();
423 + assert_eq!(result.items.len(), 2);
424 + }
425 +
426 + // ── Filter combinations ─────────────────────────────────────
427 +
428 + #[tokio::test]
429 + async fn get_items_unread_filter() {
430 + let db = test_db().await;
431 + let feed = seed_feed(&db, "rss", "Feed").await;
432 + let item1 = seed_item(&db, &feed, "rss:1", 0).await;
433 + seed_item(&db, &feed, "rss:2", 1).await;
434 +
435 + db.items().mark_read(item1.id, true).await.unwrap();
436 +
437 + let gen = FeedGenerator::new(db)
438 + .with_filter(FeedFilter::new().unread_only());
439 + let result = gen.get_items(0).await.unwrap();
440 + assert_eq!(result.items.len(), 1);
441 + }
442 +
443 + #[tokio::test]
444 + async fn get_items_starred_filter() {
445 + let db = test_db().await;
446 + let feed = seed_feed(&db, "rss", "Feed").await;
447 + seed_item(&db, &feed, "rss:1", 0).await;
448 + let item2 = seed_item(&db, &feed, "rss:2", 1).await;
449 +
450 + db.items().mark_starred(item2.id, true).await.unwrap();
451 +
452 + let gen = FeedGenerator::new(db)
453 + .with_filter(FeedFilter::new().starred_only());
454 + let result = gen.get_items(0).await.unwrap();
455 + assert_eq!(result.items.len(), 1);
456 + assert!(result.items[0].is_starred);
457 + }
458 +
459 + // ── get_all_items ───────────────────────────────────────────
460 +
461 + #[tokio::test]
462 + async fn get_all_items_returns_all() {
463 + let db = test_db().await;
464 + let feed = seed_feed(&db, "rss", "Feed").await;
465 + for i in 0..5 {
466 + seed_item(&db, &feed, &format!("rss:{i}"), i).await;
467 + }
468 +
469 + let gen = FeedGenerator::new(db).with_page_size(2); // page_size irrelevant for get_all
470 + let result = gen.get_all_items().await.unwrap();
471 + assert_eq!(result.items.len(), 5);
472 + assert!(!result.has_more);
473 + }
474 +
475 + #[tokio::test]
476 + async fn get_all_items_applies_filter() {
477 + let db = test_db().await;
478 + let feed_a = seed_feed(&db, "rss", "RSS").await;
479 + let feed_b = seed_feed(&db, "hn", "HN").await;
480 + seed_item(&db, &feed_a, "rss:1", 0).await;
481 + seed_item(&db, &feed_b, "hn:1", 0).await;
482 +
483 + let gen = FeedGenerator::new(db)
484 + .with_filter(FeedFilter::new().source("rss"));
485 + let result = gen.get_all_items().await.unwrap();
486 + assert_eq!(result.items.len(), 1);
487 + assert_eq!(result.items[0].id.source, "rss");
488 + }
489 +
490 + #[tokio::test]
491 + async fn get_all_items_applies_ordering() {
492 + let db = test_db().await;
493 + let feed = seed_feed(&db, "rss", "Feed").await;
494 + seed_item(&db, &feed, "rss:old", 10).await;
495 + seed_item(&db, &feed, "rss:new", 1).await;
496 +
497 + let gen = FeedGenerator::new(db).with_order(OrderBy::Chronological);
498 + let result = gen.get_all_items().await.unwrap();
499 + assert_eq!(result.items[0].id.item_id, "rss:new");
500 + assert_eq!(result.items[1].id.item_id, "rss:old");
Lines truncated
@@ -9,4 +9,5 @@ crate-type = ["lib"]
9 9
10 10 [dependencies]
11 11 serde.workspace = true
12 + serde_json.workspace = true
12 13 chrono.workspace = true
@@ -85,6 +85,119 @@ impl std::fmt::Display for BusserError {
85 85
86 86 impl std::error::Error for BusserError {}
87 87
88 + /// Broad error categories for plugin fetch failures.
89 + ///
90 + /// Each category determines retry behavior, circuit-breaker weight,
91 + /// and what action the user should take.
92 + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
93 + #[serde(rename_all = "snake_case")]
94 + pub enum ErrorCategory {
95 + /// DNS timeout, 500, 502, 503 — retry normally, +1 failure weight.
96 + Transient,
97 + /// 429 or explicit retry-after — retry after delay, no failure penalty.
98 + RateLimited,
99 + /// 401, 403, invalid API key — no retry, immediate circuit break.
100 + Auth,
101 + /// Bad URL, missing field, 404 — no retry, immediate circuit break.
102 + Config,
103 + /// Malformed RSS/JSON — retry normally, +1 failure weight.
104 + Parse,
105 + /// Anything else — retry normally, +1 failure weight.
106 + Unknown,
107 + }
108 +
109 + /// A structured error carrying category, display message, and optional
110 + /// retry-after hint. Stored as JSON in the `last_error` column.
111 + #[derive(Clone, Debug, Serialize, Deserialize)]
112 + pub struct StructuredError {
113 + pub category: ErrorCategory,
114 + pub message: String,
115 + #[serde(skip_serializing_if = "Option::is_none")]
116 + pub retry_after_secs: Option<u64>,
117 + }
118 +
119 + impl BusserError {
120 + /// Map each variant to its default [`ErrorCategory`].
121 + pub fn category(&self) -> ErrorCategory {
122 + match self {
123 + BusserError::InitializationFailed { .. } => ErrorCategory::Config,
124 + BusserError::FetchFailed { .. } => ErrorCategory::Transient,
125 + BusserError::ConfigError { .. } => ErrorCategory::Config,
126 + BusserError::AuthError { .. } => ErrorCategory::Auth,
127 + BusserError::RateLimited { .. } => ErrorCategory::RateLimited,
128 + BusserError::ParseError { .. } => ErrorCategory::Parse,
129 + BusserError::Other { .. } => ErrorCategory::Unknown,
130 + }
131 + }
132 +
133 + /// Convert to a [`StructuredError`] using default category mapping.
134 + pub fn to_structured(&self) -> StructuredError {
135 + let retry_after_secs = match self {
136 + BusserError::RateLimited { retry_after_secs } => Some(*retry_after_secs),
137 + _ => None,
138 + };
139 + StructuredError {
140 + category: self.category(),
141 + message: self.to_string(),
142 + retry_after_secs,
143 + }
144 + }
145 + }
146 +
147 + impl StructuredError {
148 + /// Create a new structured error.
149 + pub fn new(category: ErrorCategory, message: impl Into<String>) -> Self {
150 + Self {
151 + category,
152 + message: message.into(),
153 + retry_after_secs: None,
154 + }
155 + }
156 +
157 + /// Create a rate-limited error with a retry-after hint.
158 + pub fn rate_limited(message: impl Into<String>, retry_after_secs: u64) -> Self {
159 + Self {
160 + category: ErrorCategory::RateLimited,
161 + message: message.into(),
162 + retry_after_secs: Some(retry_after_secs),
163 + }
164 + }
165 +
166 + /// Serialize to JSON for storage in `last_error`.
167 + pub fn to_json(&self) -> String {
168 + serde_json::to_string(self).unwrap_or_else(|_| self.message.clone())
169 + }
170 +
171 + /// Parse a `last_error` string. Tries JSON first, falls back to treating
172 + /// the raw string as an `Unknown` category error (backward compat).
173 + pub fn from_last_error(s: &str) -> Self {
174 + serde_json::from_str(s).unwrap_or_else(|_| Self {
175 + category: ErrorCategory::Unknown,
176 + message: s.to_string(),
177 + retry_after_secs: None,
178 + })
179 + }
180 + }
181 +
182 + impl std::fmt::Display for StructuredError {
183 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 + write!(f, "{}", self.message)
185 + }
186 + }
187 +
188 + impl std::fmt::Display for ErrorCategory {
189 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 + match self {
191 + ErrorCategory::Transient => write!(f, "transient"),
192 + ErrorCategory::RateLimited => write!(f, "rate_limited"),
193 + ErrorCategory::Auth => write!(f, "auth"),
194 + ErrorCategory::Config => write!(f, "config"),
195 + ErrorCategory::Parse => write!(f, "parse"),
196 + ErrorCategory::Unknown => write!(f, "unknown"),
197 + }
198 + }
199 + }
200 +
88 201 #[cfg(test)]
89 202 mod tests {
90 203 use super::*;
@@ -105,4 +218,62 @@ mod tests {
105 218 let rate = BusserError::RateLimited { retry_after_secs: 30 };
106 219 assert!(rate.to_string().contains("30"));
107 220 }
221 +
222 + #[test]
223 + fn category_mapping() {
224 + assert_eq!(BusserError::init_failed("x").category(), ErrorCategory::Config);
225 + assert_eq!(BusserError::fetch_failed("x").category(), ErrorCategory::Transient);
226 + assert_eq!(BusserError::config_error("x").category(), ErrorCategory::Config);
227 + assert_eq!(BusserError::auth_error("x").category(), ErrorCategory::Auth);
228 + assert_eq!(BusserError::parse_error("x").category(), ErrorCategory::Parse);
229 + assert_eq!(BusserError::other("x").category(), ErrorCategory::Unknown);
230 + assert_eq!(
231 + BusserError::RateLimited { retry_after_secs: 60 }.category(),
232 + ErrorCategory::RateLimited
233 + );
234 + }
235 +
236 + #[test]
237 + fn to_structured_rate_limited() {
238 + let e = BusserError::RateLimited { retry_after_secs: 60 };
239 + let s = e.to_structured();
240 + assert_eq!(s.category, ErrorCategory::RateLimited);
241 + assert_eq!(s.retry_after_secs, Some(60));
242 + }
243 +
244 + #[test]
245 + fn to_structured_regular() {
246 + let s = BusserError::auth_error("bad key").to_structured();
247 + assert_eq!(s.category, ErrorCategory::Auth);
248 + assert!(s.retry_after_secs.is_none());
249 + assert!(s.message.contains("bad key"));
250 + }
251 +
252 + #[test]
253 + fn structured_error_json_roundtrip() {
254 + let err = StructuredError::rate_limited("Too many requests", 120);
255 + let json = err.to_json();
256 + let parsed = StructuredError::from_last_error(&json);
257 + assert_eq!(parsed.category, ErrorCategory::RateLimited);
258 + assert_eq!(parsed.retry_after_secs, Some(120));
259 + assert!(parsed.message.contains("Too many requests"));
260 + }
261 +
262 + #[test]
263 + fn structured_error_from_legacy_plain_text() {
264 + let parsed = StructuredError::from_last_error("HTTP request failed: timeout");
265 + assert_eq!(parsed.category, ErrorCategory::Unknown);
266 + assert_eq!(parsed.message, "HTTP request failed: timeout");
267 + assert!(parsed.retry_after_secs.is_none());
268 + }
269 +
270 + #[test]
271 + fn error_category_display() {
272 + assert_eq!(ErrorCategory::Transient.to_string(), "transient");
273 + assert_eq!(ErrorCategory::RateLimited.to_string(), "rate_limited");
274 + assert_eq!(ErrorCategory::Auth.to_string(), "auth");
275 + assert_eq!(ErrorCategory::Config.to_string(), "config");
276 + assert_eq!(ErrorCategory::Parse.to_string(), "parse");
277 + assert_eq!(ErrorCategory::Unknown.to_string(), "unknown");
278 + }
108 279 }
@@ -69,6 +69,17 @@ impl BiteDisplay {
69 69 }
70 70 }
71 71
72 + /// A custom action button declared by a plugin (e.g. "View PDF", "ar5iv HTML").
73 + #[derive(Clone, Debug, Serialize, Deserialize)]
74 + pub struct ItemAction {
75 + /// Button label shown in the detail view.
76 + pub label: String,
77 + /// Action type: `"open"` (system browser) or `"download"` (download + open).
78 + pub action_type: String,
79 + /// Target URL for the action.
80 + pub url: String,
81 + }
82 +
72 83 /// Full content for detail view
73 84 #[derive(Clone, Debug, Default, Serialize, Deserialize)]
74 85 pub struct FeedItemContent {
@@ -80,6 +91,9 @@ pub struct FeedItemContent {
80 91 pub url: Option<String>,
81 92 /// Media attachments (URLs)
82 93 pub media: Vec<String>,
94 + /// Plugin-declared custom action buttons.
95 + #[serde(default)]
96 + pub actions: Vec<ItemAction>,
83 97 }
84 98
85 99 impl FeedItemContent {
@@ -105,7 +105,7 @@ The `FeedGenerator` (`bb-feed::generator`) reads items from the database, applie
105 105
106 106 ## Database Layer
107 107
108 - SQLite via sqlx with compile-time migrations (9 migrations). The `Database` struct holds a connection pool (`max_connections: 5`) and provides typed repository accessors.
108 + SQLite via sqlx with compile-time migrations (10 migrations). The `Database` struct holds a connection pool (`max_connections: 16`) and provides typed repository accessors.
109 109
110 110 ### Tables
111 111
@@ -207,7 +207,7 @@ Each feed tracks `consecutive_failures` and `last_error`. On fetch success, fail
207 207 | Ordering/filtering | `crates/bb-feed/src/ordering.rs` |
208 208 | Database layer | `crates/bb-db/src/` |
209 209 | Repositories | `crates/bb-db/src/repository.rs` |
210 - | Migrations | `migrations/sqlite/` (001-009) |
210 + | Migrations | `migrations/sqlite/` (001-010) |
211 211 | Tauri app state | `src-tauri/src/state.rs` |
212 212 | Tauri commands | `src-tauri/src/commands/` |
213 213 | Sync service | `src-tauri/src/sync_service.rs` |
M docs/todo.md +58 -3