//! Database repositories for feeds, items, and busser state. //! //! Each repository wraps an SQLite connection pool and provides typed //! CRUD operations for a single table. use chrono::{DateTime, Utc}; use sqlx::SqlitePool; use bb_interface::{ErrorCategory, StructuredError}; use crate::id_types::{BookmarkId, BusserStateId, FeedId, ItemId, QueryFeedId}; use crate::models::*; use crate::TIMESTAMP_FMT; /// Sanitize a user-provided search string for FTS5 MATCH syntax. /// /// Wraps each word in double quotes to prevent FTS5 syntax injection /// (e.g. a user typing `AND`, `OR`, `NOT`, `NEAR`, or `NEAR/N` won't be /// interpreted as FTS5 operators). Words are joined with spaces (implicit AND). /// /// Inside quoted strings FTS5 still honours two special characters: /// - `*` at the end of the last token triggers prefix matching /// - `^` at the start of the first token triggers beginning-of-column matching /// /// We strip those so user input like `^hello` or `world*` is treated literally. /// The `column:` prefix syntax (e.g. `title:word`) is neutralised by the quoting /// itself — colons inside double-quoted strings are treated as literal characters. fn sanitize_fts_query(query: &str) -> String { query .split_whitespace() .map(|word| { // Escape embedded double quotes (FTS5 uses "" to represent a literal ") let escaped = word.replace('"', "\"\""); // Strip `^` prefix and `*` suffix — both are special inside FTS5 quotes let escaped = escaped.trim_start_matches('^'); let escaped = escaped.trim_end_matches('*'); format!("\"{}\"", escaped) }) // Drop tokens that became empty after stripping (e.g. bare `^`, `*`, `^*`) .filter(|token| token != "\"\"") .collect::>() .join(" ") } /// Maximum allowed length for a search query string. /// /// Queries longer than this are rejected early (returning an empty result set) /// to prevent excessively large FTS5 MATCH expressions from consuming memory /// or CPU in SQLite. const MAX_SEARCH_QUERY_LENGTH: usize = 500; /// Number of consecutive failures before a feed is automatically disabled. /// /// Once a feed accumulates this many failures without a successful fetch, /// the circuit breaker trips: the feed is marked `circuit_broken = 1` and /// excluded from automatic fetch scheduling until manually reset. pub const CIRCUIT_BREAKER_THRESHOLD: i64 = 10; #[derive(Clone)] /// Repository for feed subscription CRUD and fetch tracking pub struct FeedsRepository { pool: SqlitePool, } impl FeedsRepository { /// Create a new feeds repository backed by the given pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Insert a new feed and return the created row. /// New feeds default to `enabled = 1` (auto-fetch active) and /// `created_at = updated_at` (no separate "first modified" vs "created" notion). #[tracing::instrument(skip_all)] pub async fn create(&self, input: CreateFeed) -> Result { let id = FeedId::new(); let now = Utc::now().format(TIMESTAMP_FMT).to_string(); // serde_json::Value is always serializable; unwrap is safe here. let config = serde_json::to_string(&input.config).unwrap_or_else(|_| "{}".to_string()); sqlx::query_as( r#" INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5) RETURNING * "#, ) .bind(id) .bind(input.busser_id.as_str()) .bind(&input.name) .bind(&config) .bind(&now) .fetch_one(&self.pool) .await } /// Look up a single feed by its ID. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get(&self, id: FeedId) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM feeds WHERE id = ?1") .bind(id) .fetch_optional(&self.pool) .await } /// List all feeds belonging to a given busser, ordered by name. #[tracing::instrument(skip_all)] pub async fn get_by_busser(&self, busser_id: &str) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM feeds WHERE busser_id = ?1 ORDER BY name") .bind(busser_id) .fetch_all(&self.pool) .await } /// List only enabled feeds that are not circuit-broken, ordered by name. #[tracing::instrument(skip_all)] pub async fn list_enabled(&self) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feeds WHERE enabled = 1 AND circuit_broken = 0 ORDER BY name", ) .fetch_all(&self.pool) .await } /// List every feed (enabled or disabled), ordered by name. #[tracing::instrument(skip_all)] pub async fn list_all(&self) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM feeds ORDER BY name") .fetch_all(&self.pool) .await } /// Update the `last_fetch` and `updated_at` timestamps to now. #[tracing::instrument(skip_all)] pub async fn update_last_fetch(&self, id: FeedId) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feeds SET last_fetch = ?1, updated_at = ?1 WHERE id = ?2") .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Enable or disable a feed. #[tracing::instrument(skip_all)] pub async fn set_enabled(&self, id: FeedId, enabled: bool) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feeds SET enabled = ?1, updated_at = ?2 WHERE id = ?3") .bind(enabled) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Record a successful fetch: reset failure counter, clear error, update timestamps. #[tracing::instrument(skip_all)] pub async fn record_fetch_success(&self, id: FeedId) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( "UPDATE feeds SET consecutive_failures = 0, last_error = NULL, \ last_success_at = ?1, last_fetch = ?1, updated_at = ?1 WHERE id = ?2", ) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Record a fetch failure: increment counter, store error, update timestamp. /// /// Returns `true` if the circuit breaker tripped (i.e. the feed just crossed /// the [`CIRCUIT_BREAKER_THRESHOLD`] and was marked `circuit_broken = 1`). #[tracing::instrument(skip_all)] pub async fn record_fetch_failure( &self, id: FeedId, error: &str, ) -> Result { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \ last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3", ) .bind(error) .bind(&now) .bind(id) .execute(&self.pool) .await?; // Check if we just crossed the threshold let feed = self.get(id).await?; if let Some(feed) = feed { if !feed.circuit_broken && feed.consecutive_failures >= CIRCUIT_BREAKER_THRESHOLD { self.set_circuit_broken(id, true).await?; return Ok(true); } } Ok(false) } /// Record a structured fetch failure with category-aware behavior: /// /// - `RateLimited` — store error JSON, do NOT increment `consecutive_failures`. /// - `Auth` / `Config` — increment + immediately set `circuit_broken = 1`. /// - `Transient` / `Parse` / `Unknown` — increment normally (existing behavior). /// /// Returns `true` if the circuit breaker tripped. #[tracing::instrument(skip_all)] pub async fn record_fetch_failure_structured( &self, id: FeedId, error: &StructuredError, ) -> Result { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let error_json = error.to_json(); match error.category { ErrorCategory::RateLimited => { // Store error but don't increment failure counter sqlx::query( "UPDATE feeds SET last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3", ) .bind(&error_json) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(false) } ErrorCategory::Auth | ErrorCategory::Config => { // Increment + immediate circuit break sqlx::query( "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \ last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3", ) .bind(&error_json) .bind(&now) .bind(id) .execute(&self.pool) .await?; self.set_circuit_broken(id, true).await?; Ok(true) } ErrorCategory::Transient | ErrorCategory::Parse | ErrorCategory::Unknown => { // Normal behavior: increment and check threshold sqlx::query( "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \ last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3", ) .bind(&error_json) .bind(&now) .bind(id) .execute(&self.pool) .await?; let feed = self.get(id).await?; if let Some(feed) = feed { if !feed.circuit_broken && feed.consecutive_failures >= CIRCUIT_BREAKER_THRESHOLD { self.set_circuit_broken(id, true).await?; return Ok(true); } } Ok(false) } } } /// Mark a feed as circuit-broken (or clear the circuit breaker). #[tracing::instrument(skip_all)] pub async fn set_circuit_broken( &self, id: FeedId, broken: bool, ) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( "UPDATE feeds SET circuit_broken = ?1, updated_at = ?2 WHERE id = ?3", ) .bind(broken) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Reset the circuit breaker on a feed: clear `circuit_broken`, reset /// `consecutive_failures` to 0, and clear `last_error`. /// /// Called when a user manually triggers a fetch for a circuit-broken feed. #[tracing::instrument(skip_all)] pub async fn reset_circuit_breaker(&self, id: FeedId) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( "UPDATE feeds SET circuit_broken = 0, consecutive_failures = 0, \ last_error = NULL, updated_at = ?1 WHERE id = ?2", ) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Update a feed's config JSON string. #[tracing::instrument(skip_all)] pub async fn update_config(&self, id: FeedId, config: &str) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feeds SET config = ?1, updated_at = ?2 WHERE id = ?3") .bind(config) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Update a feed's display name. #[tracing::instrument(skip_all)] pub async fn update_name(&self, id: FeedId, name: &str) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feeds SET name = ?1, updated_at = ?2 WHERE id = ?3") .bind(name) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Delete a feed by ID. #[tracing::instrument(skip_all)] pub async fn delete(&self, id: FeedId) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM feeds WHERE id = ?1") .bind(id) .execute(&self.pool) .await?; Ok(()) } } #[derive(Clone)] /// Repository for feed item CRUD, read/star toggling, and paginated listing pub struct ItemsRepository { pool: SqlitePool, } impl ItemsRepository { /// Create a new items repository backed by the given pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Insert a feed item or update it if `external_id` already exists. /// /// The ON CONFLICT clause deliberately preserves user state (`is_read`, /// `is_starred`) — these are never overwritten by a re-fetch. Content fields /// (author, text, body, etc.) are updated in case the source edited the post. #[tracing::instrument(skip_all)] pub async fn upsert(&self, input: CreateFeedItem) -> Result { let id = ItemId::new(); let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let published = input.published_at.format(TIMESTAMP_FMT).to_string(); // Vec is always serializable; unwrap is safe here. let media = serde_json::to_string(&input.media).unwrap_or_else(|_| "[]".to_string()); let tags = serde_json::to_string(&input.tags).unwrap_or_else(|_| "[]".to_string()); let actions = serde_json::to_string(&input.actions).unwrap_or_else(|_| "[]".to_string()); sqlx::query_as( r#" INSERT INTO feed_items ( id, external_id, feed_id, busser_id, bite_author, bite_text, bite_secondary, bite_indicator, title, body, url, media, actions, published_at, fetched_at, source_name, score, tags, is_read, is_starred, created_at, updated_at ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, 0, 0, ?19, ?19) ON CONFLICT (external_id) DO UPDATE SET bite_author = EXCLUDED.bite_author, bite_text = EXCLUDED.bite_text, bite_secondary = EXCLUDED.bite_secondary, bite_indicator = EXCLUDED.bite_indicator, title = EXCLUDED.title, body = EXCLUDED.body, url = EXCLUDED.url, media = EXCLUDED.media, actions = EXCLUDED.actions, score = EXCLUDED.score, tags = EXCLUDED.tags, updated_at = EXCLUDED.updated_at RETURNING * "#, ) .bind(id) .bind(&input.external_id) .bind(input.feed_id) .bind(input.busser_id.as_str()) .bind(&input.bite_author) .bind(&input.bite_text) .bind(&input.bite_secondary) .bind(&input.bite_indicator) .bind(&input.title) .bind(&input.body) .bind(&input.url) .bind(&media) .bind(&actions) .bind(&published) .bind(&now) .bind(&input.source_name) .bind(input.score) .bind(&tags) .bind(&now) .fetch_one(&self.pool) .await } /// Look up a feed item by its internal ID. #[tracing::instrument(skip_all)] pub async fn get(&self, id: ItemId) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM feed_items WHERE id = ?1") .bind(id) .fetch_optional(&self.pool) .await } /// Look up a feed item by the `external_id` produced by the busser. #[tracing::instrument(skip_all)] pub async fn get_by_external_id( &self, external_id: &str, ) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM feed_items WHERE external_id = ?1") .bind(external_id) .fetch_optional(&self.pool) .await } /// List all items ordered by `published_at` descending, with pagination. /// Sorted by publish date (not fetch date) so items appear where the author /// intended, even if fetched out of order during backfill. #[tracing::instrument(skip_all)] pub async fn list_all(&self, limit: i64, offset: i64) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feed_items ORDER BY published_at DESC LIMIT ?1 OFFSET ?2", ) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await } /// List items belonging to a specific feed, newest first. #[tracing::instrument(skip_all)] pub async fn list_by_feed( &self, feed_id: FeedId, limit: i64, offset: i64, ) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feed_items WHERE feed_id = ?1 ORDER BY published_at DESC LIMIT ?2 OFFSET ?3", ) .bind(feed_id) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await } /// List items from a specific busser source, newest first. #[tracing::instrument(skip_all)] pub async fn list_by_busser( &self, busser_id: &str, limit: i64, offset: i64, ) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feed_items WHERE busser_id = ?1 ORDER BY published_at DESC LIMIT ?2 OFFSET ?3", ) .bind(busser_id) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await } /// List unread items only, newest first. #[tracing::instrument(skip_all)] pub async fn list_unread(&self, limit: i64, offset: i64) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feed_items WHERE is_read = 0 ORDER BY published_at DESC LIMIT ?1 OFFSET ?2", ) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await } /// List starred items only, newest first. #[tracing::instrument(skip_all)] pub async fn list_starred( &self, limit: i64, offset: i64, ) -> Result, sqlx::Error> { sqlx::query_as( "SELECT * FROM feed_items WHERE is_starred = 1 ORDER BY published_at DESC LIMIT ?1 OFFSET ?2", ) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await } /// Full-text search using FTS5 across title, body, and bite_text. /// /// Uses the `feed_items_fts` virtual table for fast ranked search. /// Results are ordered by FTS5 relevance then by `published_at DESC`. /// Additional boolean filters (source, unread, starred) are combined. #[tracing::instrument(skip_all)] pub async fn list_search( &self, query: &str, source: Option<&str>, unread_only: bool, starred_only: bool, limit: i64, offset: i64, ) -> Result, sqlx::Error> { // Reject excessively long queries before any processing. if query.len() > MAX_SEARCH_QUERY_LENGTH { return Ok(vec![]); } let fts_query = sanitize_fts_query(query); // If sanitization stripped everything (e.g. query was just `*` or `^`), // return early — an empty MATCH expression would be a SQL error. if fts_query.is_empty() { return Ok(vec![]); } let mut sql = String::from( "SELECT fi.* FROM feed_items fi \ INNER JOIN feed_items_fts fts ON fi.rowid = fts.rowid \ WHERE feed_items_fts MATCH ?1", ); if source.is_some() { sql.push_str(" AND fi.busser_id = ?4"); } if unread_only { sql.push_str(" AND fi.is_read = 0"); } if starred_only { sql.push_str(" AND fi.is_starred = 1"); } sql.push_str(" ORDER BY fts.rank, fi.published_at DESC LIMIT ?2 OFFSET ?3"); let mut q = sqlx::query_as::<_, DbFeedItem>(&sql) .bind(&fts_query) // ?1 .bind(limit) // ?2 .bind(offset); // ?3 if let Some(src) = source { q = q.bind(src); // ?4 } q.fetch_all(&self.pool).await } /// List feed items with any combination of filters pushed into SQL. /// /// Unifies `list_all`, `list_by_busser`, `list_unread`, `list_starred`, /// and `list_search` into a single method so callers don't need an /// if/else chain that drops filter combinations. #[tracing::instrument(skip_all)] pub async fn list_filtered( &self, search: Option<&str>, source: Option<&str>, unread_only: bool, starred_only: bool, limit: i64, offset: i64, ) -> Result, sqlx::Error> { // When a search query is present, use FTS5. if let Some(query) = search { return self .list_search(query, source, unread_only, starred_only, limit, offset) .await; } // Build a dynamic query with conditional WHERE clauses. let mut sql = String::from("SELECT * FROM feed_items WHERE 1=1"); if source.is_some() { sql.push_str(" AND busser_id = ?3"); } if unread_only { sql.push_str(" AND is_read = 0"); } if starred_only { sql.push_str(" AND is_starred = 1"); } sql.push_str(" ORDER BY published_at DESC LIMIT ?1 OFFSET ?2"); let mut q = sqlx::query_as::<_, DbFeedItem>(&sql) .bind(limit) // ?1 .bind(offset); // ?2 if let Some(src) = source { q = q.bind(src); // ?3 } q.fetch_all(&self.pool).await } /// Set the read flag on a feed item. #[tracing::instrument(skip_all)] pub async fn mark_read(&self, id: ItemId, is_read: bool) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feed_items SET is_read = ?1, updated_at = ?2 WHERE id = ?3") .bind(is_read) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Set the starred flag on a feed item. #[tracing::instrument(skip_all)] pub async fn mark_starred(&self, id: ItemId, is_starred: bool) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query("UPDATE feed_items SET is_starred = ?1, updated_at = ?2 WHERE id = ?3") .bind(is_starred) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Mark all unread items as read, optionally filtered to a specific source. #[tracing::instrument(skip_all)] pub async fn mark_all_read(&self, busser_id: Option<&str>) -> Result { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let result = match busser_id { Some(id) => { sqlx::query( "UPDATE feed_items SET is_read = 1, updated_at = ?1 WHERE is_read = 0 AND busser_id = ?2", ) .bind(&now) .bind(id) .execute(&self.pool) .await? } None => { sqlx::query( "UPDATE feed_items SET is_read = 1, updated_at = ?1 WHERE is_read = 0", ) .bind(&now) .execute(&self.pool) .await? } }; Ok(result.rows_affected()) } /// Count all feed items. #[tracing::instrument(skip_all)] pub async fn count_all(&self) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items") .fetch_one(&self.pool) .await?; Ok(row.0) } /// Count items from a specific busser source. #[tracing::instrument(skip_all)] pub async fn count_by_busser(&self, busser_id: &str) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE busser_id = ?1") .bind(busser_id) .fetch_one(&self.pool) .await?; Ok(row.0) } /// Count unread items across all sources. #[tracing::instrument(skip_all)] pub async fn count_unread(&self) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE is_read = 0") .fetch_one(&self.pool) .await?; Ok(row.0) } /// Count starred items. #[tracing::instrument(skip_all)] pub async fn count_starred(&self) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE is_starred = 1") .fetch_one(&self.pool) .await?; Ok(row.0) } /// Count unread items from a specific busser source. #[tracing::instrument(skip_all)] pub async fn count_unread_by_busser(&self, busser_id: &str) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE busser_id = ?1 AND is_read = 0") .bind(busser_id) .fetch_one(&self.pool) .await?; Ok(row.0) } /// Count items matching a combination of source, unread, and starred filters. #[tracing::instrument(skip_all)] pub async fn count_filtered( &self, source: Option<&str>, unread_only: bool, starred_only: bool, ) -> Result { let mut sql = "SELECT COUNT(*) FROM feed_items WHERE 1=1".to_string(); if source.is_some() { sql.push_str(" AND busser_id = ?1"); } if unread_only { sql.push_str(" AND is_read = 0"); } if starred_only { sql.push_str(" AND is_starred = 1"); } let mut query = sqlx::query_as::<_, (i64,)>(&sql); if let Some(s) = source { query = query.bind(s); } let row = query.fetch_one(&self.pool).await?; Ok(row.0) } /// Get total and unread counts for all busser sources in a single query. /// /// Returns `(busser_id, total_count, unread_count)` tuples. This avoids /// issuing separate count queries per source (N+1 problem). pub async fn counts_by_busser(&self) -> Result, sqlx::Error> { let rows: Vec<(String, i64, i64)> = sqlx::query_as( r#" SELECT busser_id, COUNT(*) AS total_count, SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END) AS unread_count FROM feed_items GROUP BY busser_id "#, ) .fetch_all(&self.pool) .await?; Ok(rows) } /// Delete a single feed item by ID. #[tracing::instrument(skip_all)] pub async fn delete(&self, id: ItemId) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM feed_items WHERE id = ?1") .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Delete all items belonging to a feed. Returns the number of rows removed. #[tracing::instrument(skip_all)] pub async fn delete_by_feed(&self, feed_id: FeedId) -> Result { let result = sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1") .bind(feed_id) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// Delete old read items that are not starred. Returns the number of rows removed. /// /// Items published before `before` that have been read and are not starred /// will be permanently deleted. Starred items are always preserved regardless /// of age or read status. #[tracing::instrument(skip_all)] pub async fn delete_stale_read(&self, before: DateTime) -> Result { let cutoff = before.format(TIMESTAMP_FMT).to_string(); let result = sqlx::query( "DELETE FROM feed_items WHERE is_read = 1 AND is_starred = 0 AND published_at < ?1", ) .bind(&cutoff) .execute(&self.pool) .await?; Ok(result.rows_affected()) } } #[derive(Clone)] /// Repository for per-feed tag assignment and listing pub struct TagsRepository { pool: SqlitePool, } impl TagsRepository { /// Create a new tags repository backed by the given pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Replace all tags on a feed. Deletes existing tags, then inserts the new set. /// /// Wrapped in a transaction so the delete + inserts are atomic — a failure /// mid-way won't leave the feed with zero tags. #[tracing::instrument(skip_all)] pub async fn set_tags(&self, feed_id: FeedId, tags: &[String]) -> Result<(), sqlx::Error> { let mut tx = self.pool.begin().await?; sqlx::query("DELETE FROM feed_tags WHERE feed_id = ?1") .bind(feed_id) .execute(&mut *tx) .await?; for tag in tags { if !tag.is_empty() { sqlx::query("INSERT OR IGNORE INTO feed_tags (feed_id, tag) VALUES (?1, ?2)") .bind(feed_id) .bind(tag) .execute(&mut *tx) .await?; } } tx.commit().await?; Ok(()) } /// Add a single tag to a feed (idempotent). #[tracing::instrument(skip_all)] pub async fn add_tag(&self, feed_id: FeedId, tag: &str) -> Result<(), sqlx::Error> { sqlx::query("INSERT OR IGNORE INTO feed_tags (feed_id, tag) VALUES (?1, ?2)") .bind(feed_id) .bind(tag) .execute(&self.pool) .await?; Ok(()) } /// Remove a single tag from a feed. #[tracing::instrument(skip_all)] pub async fn remove_tag(&self, feed_id: FeedId, tag: &str) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM feed_tags WHERE feed_id = ?1 AND tag = ?2") .bind(feed_id) .bind(tag) .execute(&self.pool) .await?; Ok(()) } /// Get all tags for a feed, ordered alphabetically. #[tracing::instrument(skip_all)] pub async fn get_tags(&self, feed_id: FeedId) -> Result, sqlx::Error> { let rows: Vec<(String,)> = sqlx::query_as("SELECT tag FROM feed_tags WHERE feed_id = ?1 ORDER BY tag") .bind(feed_id) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(|(t,)| t).collect()) } /// List all distinct tags across all feeds, ordered alphabetically. #[tracing::instrument(skip_all)] pub async fn list_all_tags(&self) -> Result, sqlx::Error> { let rows: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT tag FROM feed_tags ORDER BY tag") .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(|(t,)| t).collect()) } /// Get all (feed_id, tag) pairs for bulk rendering. #[tracing::instrument(skip_all)] pub async fn all_feed_tags(&self) -> Result, sqlx::Error> { sqlx::query_as("SELECT feed_id, tag FROM feed_tags ORDER BY feed_id, tag") .fetch_all(&self.pool) .await } /// Get feed IDs that have any of the given tags. #[tracing::instrument(skip_all)] pub async fn feed_ids_with_tags(&self, tags: &[String]) -> Result, sqlx::Error> { if tags.is_empty() { return Ok(Vec::new()); } // Build a query with placeholders for each tag let placeholders: Vec = (1..=tags.len()).map(|i| format!("?{}", i)).collect(); let sql = format!( "SELECT DISTINCT feed_id FROM feed_tags WHERE tag IN ({})", placeholders.join(", ") ); let mut query = sqlx::query_as::<_, (FeedId,)>(&sql); for tag in tags { query = query.bind(tag); } let rows = query.fetch_all(&self.pool).await?; Ok(rows.into_iter().map(|(id,)| id).collect()) } } #[derive(Clone)] /// Repository for busser key-value state (cursors, tokens, pagination markers) pub struct StateRepository { pool: SqlitePool, } impl StateRepository { /// Create a new state repository backed by the given pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Get a single state value for a busser by key. #[tracing::instrument(skip_all)] pub async fn get(&self, busser_id: &str, key: &str) -> Result, sqlx::Error> { let row: Option<(String,)> = sqlx::query_as("SELECT value FROM busser_state WHERE busser_id = ?1 AND key = ?2") .bind(busser_id) .bind(key) .fetch_optional(&self.pool) .await?; Ok(row.map(|(v,)| v)) } /// Set a state value, inserting or updating on conflict. /// Uses upsert on the `(busser_id, key)` composite unique constraint so /// callers don't need to check existence first. #[tracing::instrument(skip_all)] pub async fn set(&self, busser_id: &str, key: &str, value: &str) -> Result<(), sqlx::Error> { let id = BusserStateId::new(); let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( r#" INSERT INTO busser_state (id, busser_id, key, value, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?5) ON CONFLICT (busser_id, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at "#, ) .bind(id) .bind(busser_id) .bind(key) .bind(value) .bind(&now) .execute(&self.pool) .await?; Ok(()) } /// Delete a single state entry by busser ID and key. #[tracing::instrument(skip_all)] pub async fn delete(&self, busser_id: &str, key: &str) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM busser_state WHERE busser_id = ?1 AND key = ?2") .bind(busser_id) .bind(key) .execute(&self.pool) .await?; Ok(()) } /// Delete all state entries for a busser. Returns the number of rows removed. #[tracing::instrument(skip_all)] pub async fn delete_all(&self, busser_id: &str) -> Result { let result = sqlx::query("DELETE FROM busser_state WHERE busser_id = ?1") .bind(busser_id) .execute(&self.pool) .await?; Ok(result.rows_affected()) } /// List all state entries for a busser, ordered by key. #[tracing::instrument(skip_all)] pub async fn list(&self, busser_id: &str) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM busser_state WHERE busser_id = ?1 ORDER BY key") .bind(busser_id) .fetch_all(&self.pool) .await } } #[derive(Clone)] /// Repository for user_config key-value pairs (theme, welcome flag, etc.) pub struct ConfigRepository { pool: SqlitePool, } impl ConfigRepository { #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Get a config value by key. #[tracing::instrument(skip_all)] pub async fn get(&self, key: &str) -> Result, sqlx::Error> { let row: Option<(String,)> = sqlx::query_as("SELECT value FROM user_config WHERE key = ?1") .bind(key) .fetch_optional(&self.pool) .await?; Ok(row.map(|(v,)| v)) } /// Set a config value, inserting or updating on conflict. #[tracing::instrument(skip_all)] pub async fn set(&self, key: &str, value: &str) -> Result<(), sqlx::Error> { sqlx::query( r#" INSERT INTO user_config (key, value) VALUES (?1, ?2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value "#, ) .bind(key) .bind(value) .execute(&self.pool) .await?; Ok(()) } /// Delete a config entry by key. #[tracing::instrument(skip_all)] pub async fn delete(&self, key: &str) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM user_config WHERE key = ?1") .bind(key) .execute(&self.pool) .await?; Ok(()) } } #[derive(Clone)] /// Repository for saved query feed (virtual source) CRUD pub struct QueryFeedsRepository { pool: SqlitePool, } impl QueryFeedsRepository { #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Insert a new query feed and return the created row. pub async fn create(&self, input: CreateQueryFeed) -> Result { let id = QueryFeedId::new(); let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let rules_json = serde_json::to_string(&input.rules).unwrap_or_else(|_| "[]".to_string()); sqlx::query_as( r#" INSERT INTO query_feeds (id, name, rules, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?4) RETURNING * "#, ) .bind(id) .bind(&input.name) .bind(&rules_json) .bind(&now) .fetch_one(&self.pool) .await } /// Look up a single query feed by ID. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get(&self, id: QueryFeedId) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM query_feeds WHERE id = ?1") .bind(id) .fetch_optional(&self.pool) .await } /// List all query feeds, ordered by name. #[tracing::instrument(skip_all)] pub async fn list_all(&self) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM query_feeds ORDER BY name") .fetch_all(&self.pool) .await } /// Update a query feed's name and rules. #[tracing::instrument(skip_all)] pub async fn update( &self, id: QueryFeedId, name: &str, rules: &[QueryCondition], ) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let rules_json = serde_json::to_string(rules).unwrap_or_else(|_| "[]".to_string()); sqlx::query( "UPDATE query_feeds SET name = ?1, rules = ?2, updated_at = ?3 WHERE id = ?4", ) .bind(name) .bind(&rules_json) .bind(&now) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Delete a query feed by ID. #[tracing::instrument(skip_all)] pub async fn delete(&self, id: QueryFeedId) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM query_feeds WHERE id = ?1") .bind(id) .execute(&self.pool) .await?; Ok(()) } } // ── BookmarksRepository ────────────────────────────────────────── #[derive(Clone)] /// Repository for bookmark (reading list) CRUD pub struct BookmarksRepository { pool: SqlitePool, } impl BookmarksRepository { pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Insert a new bookmark and return the created row. #[tracing::instrument(skip_all)] pub async fn create(&self, input: CreateBookmark) -> Result { let id = BookmarkId::new(); let now = Utc::now().format(TIMESTAMP_FMT).to_string(); let mut tx = self.pool.begin().await?; let bookmark: DbBookmark = sqlx::query_as( r#" INSERT INTO bookmarks (id, url, title, description, author, source_name, feed_item_id, notes, is_pinned, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 0, ?9, ?9) RETURNING * "#, ) .bind(id) .bind(&input.url) .bind(&input.title) .bind(&input.description) .bind(&input.author) .bind(&input.source_name) .bind(&input.feed_item_id) .bind(&input.notes) .bind(&now) .fetch_one(&mut *tx) .await?; // Insert tags for tag in &input.tags { let tag = tag.trim(); if tag.is_empty() { continue; } sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)") .bind(id) .bind(tag) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(bookmark) } /// Look up a single bookmark by ID. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get(&self, id: BookmarkId) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM bookmarks WHERE id = ?1") .bind(id) .fetch_optional(&self.pool) .await } /// Look up a bookmark by URL. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_by_url(&self, url: &str) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM bookmarks WHERE url = ?1") .bind(url) .fetch_optional(&self.pool) .await } /// Look up a bookmark by its linked feed item ID. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_by_feed_item(&self, feed_item_id: &str) -> Result, sqlx::Error> { sqlx::query_as("SELECT * FROM bookmarks WHERE feed_item_id = ?1") .bind(feed_item_id) .fetch_optional(&self.pool) .await } /// List bookmarks, optionally filtered by tag. Ordered by pinned first, then newest. #[tracing::instrument(skip_all)] pub async fn list(&self, tag: Option<&str>) -> Result, sqlx::Error> { match tag { Some(tag) => { sqlx::query_as( r#" SELECT b.* FROM bookmarks b INNER JOIN bookmark_tags bt ON bt.bookmark_id = b.id WHERE bt.tag = ?1 ORDER BY b.is_pinned DESC, b.created_at DESC "#, ) .bind(tag) .fetch_all(&self.pool) .await } None => { sqlx::query_as( "SELECT * FROM bookmarks ORDER BY is_pinned DESC, created_at DESC", ) .fetch_all(&self.pool) .await } } } /// Update a bookmark's mutable fields. #[tracing::instrument(skip_all)] pub async fn update(&self, id: BookmarkId, input: UpdateBookmark) -> Result<(), sqlx::Error> { let now = Utc::now().format(TIMESTAMP_FMT).to_string(); sqlx::query( r#" UPDATE bookmarks SET title = COALESCE(?2, title), description = COALESCE(?3, description), notes = COALESCE(?4, notes), is_pinned = COALESCE(?5, is_pinned), updated_at = ?1 WHERE id = ?6 "#, ) .bind(&now) .bind(&input.title) .bind(&input.description) .bind(&input.notes) .bind(input.is_pinned.map(|b| b as i32)) .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Delete a bookmark by ID (cascade deletes its tags). #[tracing::instrument(skip_all)] pub async fn delete(&self, id: BookmarkId) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM bookmarks WHERE id = ?1") .bind(id) .execute(&self.pool) .await?; Ok(()) } /// Replace all tags for a bookmark (delete-all-then-insert). #[tracing::instrument(skip_all)] pub async fn set_tags(&self, id: BookmarkId, tags: &[String]) -> Result<(), sqlx::Error> { let mut tx = self.pool.begin().await?; sqlx::query("DELETE FROM bookmark_tags WHERE bookmark_id = ?1") .bind(id) .execute(&mut *tx) .await?; for tag in tags { let tag = tag.trim(); if tag.is_empty() { continue; } sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)") .bind(id) .bind(tag) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(()) } /// Get all tags for a bookmark. #[tracing::instrument(skip_all)] pub async fn get_tags(&self, id: BookmarkId) -> Result, sqlx::Error> { let rows: Vec<(String,)> = sqlx::query_as("SELECT tag FROM bookmark_tags WHERE bookmark_id = ?1 ORDER BY tag") .bind(id) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(|r| r.0).collect()) } /// Get all distinct tags across all bookmarks. #[tracing::instrument(skip_all)] pub async fn list_all_tags(&self) -> Result, sqlx::Error> { let rows: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT tag FROM bookmark_tags ORDER BY tag") .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(|r| r.0).collect()) } /// Count total bookmarks. #[tracing::instrument(skip_all)] pub async fn count(&self) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM bookmarks") .fetch_one(&self.pool) .await?; Ok(row.0) } } #[cfg(test)] mod tests { use super::*; use crate::id_types::BusserId; use chrono::{DateTime, Duration}; use serde_json::json; /// Connect to in-memory SQLite and run all migrations. async fn test_db() -> SqlitePool { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); sqlx::migrate!("../../migrations/sqlite") .run(&pool) .await .unwrap(); pool } /// Insert a feed with the given busser and name, return the created row. async fn make_feed(pool: &SqlitePool, busser_id: &str, name: &str) -> DbFeed { FeedsRepository::new(pool.clone()) .create(CreateFeed { busser_id: BusserId::new(busser_id), name: name.to_string(), config: json!({}), }) .await .unwrap() } /// Insert a feed item with minimal required fields, return the created row. async fn make_item(pool: &SqlitePool, feed: &DbFeed, external_id: &str) -> DbFeedItem { make_item_at(pool, feed, external_id, Utc::now()).await } /// Insert a feed item at a specific published_at time. async fn make_item_at( pool: &SqlitePool, feed: &DbFeed, external_id: &str, published_at: DateTime, ) -> DbFeedItem { ItemsRepository::new(pool.clone()) .upsert(CreateFeedItem { external_id: external_id.to_string(), feed_id: feed.id, busser_id: feed.busser_id.clone(), bite_author: "author".to_string(), bite_text: format!("Item {external_id}"), bite_secondary: None, bite_indicator: None, title: Some(format!("Title {external_id}")), body: None, url: None, media: vec![], published_at, source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap() } // ── FeedsRepository ─────────────────────────────────────────── #[tokio::test] async fn feeds_create_and_get() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "My Feed").await; assert_eq!(feed.busser_id, "rss"); assert_eq!(feed.name, "My Feed"); assert!(feed.enabled); assert!(feed.last_fetch.is_none()); let fetched = FeedsRepository::new(pool.clone()) .get(feed.id) .await .unwrap() .expect("feed should exist"); assert_eq!(fetched.id, feed.id); assert_eq!(fetched.name, "My Feed"); } #[tokio::test] async fn feeds_get_nonexistent_returns_none() { let pool = test_db().await; let result = FeedsRepository::new(pool.clone()) .get(FeedId::new()) .await .unwrap(); assert!(result.is_none()); } #[tokio::test] async fn feeds_list_all_returns_created() { let pool = test_db().await; make_feed(&pool, "rss", "Beta Feed").await; make_feed(&pool, "rss", "Alpha Feed").await; let all = FeedsRepository::new(pool.clone()) .list_all() .await .unwrap(); assert_eq!(all.len(), 2); assert_eq!(all[0].name, "Alpha Feed"); assert_eq!(all[1].name, "Beta Feed"); } #[tokio::test] async fn feeds_get_by_busser_filters() { let pool = test_db().await; make_feed(&pool, "rss", "RSS Feed").await; make_feed(&pool, "hn", "HN Feed").await; make_feed(&pool, "rss", "RSS Feed 2").await; let rss = FeedsRepository::new(pool.clone()) .get_by_busser("rss") .await .unwrap(); assert_eq!(rss.len(), 2); for f in &rss { assert_eq!(f.busser_id, "rss"); } let hn = FeedsRepository::new(pool.clone()) .get_by_busser("hn") .await .unwrap(); assert_eq!(hn.len(), 1); assert_eq!(hn[0].busser_id, "hn"); } #[tokio::test] async fn feeds_list_enabled_excludes_disabled() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Disabled Feed").await; make_feed(&pool, "rss", "Enabled Feed").await; feeds_repo.set_enabled(feed.id, false).await.unwrap(); let enabled = feeds_repo.list_enabled().await.unwrap(); assert_eq!(enabled.len(), 1); assert_eq!(enabled[0].name, "Enabled Feed"); } #[tokio::test] async fn feeds_set_enabled_toggle() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Toggle Feed").await; assert!(feed.enabled); feeds_repo.set_enabled(feed.id, false).await.unwrap(); let updated = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert!(!updated.enabled); feeds_repo.set_enabled(feed.id, true).await.unwrap(); let updated = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert!(updated.enabled); } #[tokio::test] async fn feeds_update_last_fetch_sets_timestamp() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Fetch Feed").await; assert!(feed.last_fetch.is_none()); feeds_repo.update_last_fetch(feed.id).await.unwrap(); let updated = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert!(updated.last_fetch.is_some()); } #[tokio::test] async fn feeds_delete_removes_feed() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Doomed Feed").await; feeds_repo.delete(feed.id).await.unwrap(); let result = feeds_repo.get(feed.id).await.unwrap(); assert!(result.is_none()); } // ── ItemsRepository ─────────────────────────────────────────── #[tokio::test] async fn items_upsert_and_get() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let item = make_item(&pool, &feed, "rss:1").await; assert_eq!(item.external_id, "rss:1"); assert_eq!(item.feed_id, feed.id); assert_eq!(item.busser_id, "rss"); assert_eq!(item.bite_author, "author"); assert!(!item.is_read); assert!(!item.is_starred); let fetched = ItemsRepository::new(pool.clone()) .get(item.id) .await .unwrap() .expect("item should exist"); assert_eq!(fetched.id, item.id); } #[tokio::test] async fn items_upsert_conflict_updates() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let first = make_item(&pool, &feed, "rss:dup").await; let second = items_repo .upsert(CreateFeedItem { external_id: "rss:dup".to_string(), feed_id: feed.id, busser_id: feed.busser_id.clone(), bite_author: "updated_author".to_string(), bite_text: "Updated text".to_string(), bite_secondary: None, bite_indicator: None, title: Some("Updated Title".to_string()), body: None, url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); assert_eq!(first.id, second.id); assert_eq!(second.bite_author, "updated_author"); assert_eq!(second.bite_text, "Updated text"); assert_eq!(items_repo.count_all().await.unwrap(), 1); } #[tokio::test] async fn items_get_by_external_id() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let item = make_item(&pool, &feed, "rss:ext1").await; let found = ItemsRepository::new(pool.clone()) .get_by_external_id("rss:ext1") .await .unwrap() .expect("should find by external_id"); assert_eq!(found.id, item.id); } #[tokio::test] async fn items_list_all_pagination() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let now = Utc::now(); make_item_at(&pool, &feed, "p:1", now - Duration::hours(3)).await; make_item_at(&pool, &feed, "p:2", now - Duration::hours(2)).await; make_item_at(&pool, &feed, "p:3", now - Duration::hours(1)).await; let items_repo = ItemsRepository::new(pool.clone()); let page1 = items_repo.list_all(2, 0).await.unwrap(); assert_eq!(page1.len(), 2); assert_eq!(page1[0].external_id, "p:3"); assert_eq!(page1[1].external_id, "p:2"); let page2 = items_repo.list_all(2, 2).await.unwrap(); assert_eq!(page2.len(), 1); assert_eq!(page2[0].external_id, "p:1"); } #[tokio::test] async fn items_list_by_feed_filters() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "Feed A").await; let feed_b = make_feed(&pool, "rss", "Feed B").await; make_item(&pool, &feed_a, "a:1").await; make_item(&pool, &feed_a, "a:2").await; make_item(&pool, &feed_b, "b:1").await; let items_repo = ItemsRepository::new(pool.clone()); let a_items = items_repo.list_by_feed(feed_a.id, 100, 0).await.unwrap(); assert_eq!(a_items.len(), 2); for i in &a_items { assert_eq!(i.feed_id, feed_a.id); } let b_items = items_repo.list_by_feed(feed_b.id, 100, 0).await.unwrap(); assert_eq!(b_items.len(), 1); assert_eq!(b_items[0].feed_id, feed_b.id); } #[tokio::test] async fn items_list_by_busser_filters() { let pool = test_db().await; let feed_rss = make_feed(&pool, "rss", "RSS Feed").await; let feed_hn = make_feed(&pool, "hn", "HN Feed").await; make_item(&pool, &feed_rss, "rss:1").await; make_item(&pool, &feed_hn, "hn:1").await; make_item(&pool, &feed_hn, "hn:2").await; let items_repo = ItemsRepository::new(pool.clone()); let rss = items_repo.list_by_busser("rss", 100, 0).await.unwrap(); assert_eq!(rss.len(), 1); let hn = items_repo.list_by_busser("hn", 100, 0).await.unwrap(); assert_eq!(hn.len(), 2); } #[tokio::test] async fn items_list_unread_excludes_read() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let item1 = make_item(&pool, &feed, "u:1").await; make_item(&pool, &feed, "u:2").await; items_repo.mark_read(item1.id, true).await.unwrap(); let unread = items_repo.list_unread(100, 0).await.unwrap(); assert_eq!(unread.len(), 1); assert_eq!(unread[0].external_id, "u:2"); } #[tokio::test] async fn items_list_starred_only() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "s:1").await; let item2 = make_item(&pool, &feed, "s:2").await; items_repo.mark_starred(item2.id, true).await.unwrap(); let starred = items_repo.list_starred(100, 0).await.unwrap(); assert_eq!(starred.len(), 1); assert_eq!(starred[0].external_id, "s:2"); } #[tokio::test] async fn items_mark_read_and_unread() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let item = make_item(&pool, &feed, "r:1").await; assert!(!item.is_read); items_repo.mark_read(item.id, true).await.unwrap(); let updated = items_repo.get(item.id).await.unwrap().unwrap(); assert!(updated.is_read); items_repo.mark_read(item.id, false).await.unwrap(); let updated = items_repo.get(item.id).await.unwrap().unwrap(); assert!(!updated.is_read); } #[tokio::test] async fn items_mark_starred_and_unstarred() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let item = make_item(&pool, &feed, "st:1").await; assert!(!item.is_starred); items_repo.mark_starred(item.id, true).await.unwrap(); let updated = items_repo.get(item.id).await.unwrap().unwrap(); assert!(updated.is_starred); items_repo.mark_starred(item.id, false).await.unwrap(); let updated = items_repo.get(item.id).await.unwrap().unwrap(); assert!(!updated.is_starred); } #[tokio::test] async fn items_count_all_and_unread() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let item1 = make_item(&pool, &feed, "c:1").await; make_item(&pool, &feed, "c:2").await; make_item(&pool, &feed, "c:3").await; assert_eq!(items_repo.count_all().await.unwrap(), 3); assert_eq!(items_repo.count_unread().await.unwrap(), 3); items_repo.mark_read(item1.id, true).await.unwrap(); assert_eq!(items_repo.count_all().await.unwrap(), 3); assert_eq!(items_repo.count_unread().await.unwrap(), 2); } #[tokio::test] async fn items_delete_by_feed_removes_items() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "d:1").await; make_item(&pool, &feed, "d:2").await; assert_eq!(items_repo.count_all().await.unwrap(), 2); let removed = items_repo.delete_by_feed(feed.id).await.unwrap(); assert_eq!(removed, 2); assert_eq!(items_repo.count_all().await.unwrap(), 0); } #[tokio::test] async fn items_delete_stale_read() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); let now = Utc::now(); // Old read item (should be deleted) let old_read = make_item_at(&pool, &feed, "stale:1", now - Duration::days(60)).await; items_repo.mark_read(old_read.id, true).await.unwrap(); // Old read + starred item (should be preserved) let old_starred = make_item_at(&pool, &feed, "stale:2", now - Duration::days(60)).await; items_repo.mark_read(old_starred.id, true).await.unwrap(); items_repo.mark_starred(old_starred.id, true).await.unwrap(); // Old unread item (should be preserved) make_item_at(&pool, &feed, "stale:3", now - Duration::days(60)).await; // Recent read item (should be preserved) let recent_read = make_item_at(&pool, &feed, "stale:4", now - Duration::days(5)).await; items_repo.mark_read(recent_read.id, true).await.unwrap(); // Recent unread item (should be preserved) make_item_at(&pool, &feed, "stale:5", now - Duration::days(5)).await; assert_eq!(items_repo.count_all().await.unwrap(), 5); let cutoff = now - Duration::days(30); let deleted = items_repo.delete_stale_read(cutoff).await.unwrap(); assert_eq!(deleted, 1); // only old_read assert_eq!(items_repo.count_all().await.unwrap(), 4); // Verify the right items remain assert!(items_repo.get(old_starred.id).await.unwrap().is_some()); assert!(items_repo.get(recent_read.id).await.unwrap().is_some()); assert!(items_repo.get(old_read.id).await.unwrap().is_none()); } // ── Feed Health ────────────────────────────────────────────── #[tokio::test] async fn health_success_resets_counter() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; // Simulate two failures feeds_repo.record_fetch_failure(feed.id, "timeout").await.unwrap(); feeds_repo.record_fetch_failure(feed.id, "timeout").await.unwrap(); let f = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 2); assert_eq!(f.last_error.as_deref(), Some("timeout")); // Then a success feeds_repo.record_fetch_success(feed.id).await.unwrap(); let f = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 0); assert!(f.last_error.is_none()); assert!(f.last_success_at.is_some()); } #[tokio::test] async fn health_failure_increments() { let pool = test_db().await; let feeds_repo = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; feeds_repo.record_fetch_failure(feed.id, "dns error").await.unwrap(); let f = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 1); assert_eq!(f.last_error.as_deref(), Some("dns error")); feeds_repo.record_fetch_failure(feed.id, "connection refused").await.unwrap(); let f = feeds_repo.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 2); assert_eq!(f.last_error.as_deref(), Some("connection refused")); } #[tokio::test] async fn health_defaults_on_new_feed() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; assert_eq!(feed.consecutive_failures, 0); assert!(feed.last_error.is_none()); assert!(feed.last_success_at.is_none()); } // ── FTS5 Search ─────────────────────────────────────────────── #[tokio::test] async fn fts5_search_matches_title() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); items_repo .upsert(CreateFeedItem { external_id: "fts:1".to_string(), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "Some bite text".to_string(), bite_secondary: None, bite_indicator: None, title: Some("Rust programming language".to_string()), body: Some("Body content here".to_string()), url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); let results = items_repo .list_search("Rust", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].external_id, "fts:1"); } #[tokio::test] async fn fts5_search_matches_body() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); items_repo .upsert(CreateFeedItem { external_id: "fts:2".to_string(), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "Bite".to_string(), bite_secondary: None, bite_indicator: None, title: Some("Title".to_string()), body: Some("SQLite full text search is powerful".to_string()), url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); let results = items_repo .list_search("powerful", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 1); } #[tokio::test] async fn fts5_search_matches_bite_text() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); items_repo .upsert(CreateFeedItem { external_id: "fts:3".to_string(), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "Breaking news about databases".to_string(), bite_secondary: None, bite_indicator: None, title: None, body: None, url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); let results = items_repo .list_search("databases", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 1); } #[tokio::test] async fn fts5_multi_word_query() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); items_repo .upsert(CreateFeedItem { external_id: "fts:4".to_string(), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "bite".to_string(), bite_secondary: None, bite_indicator: None, title: Some("Rust async programming guide".to_string()), body: None, url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); let results = items_repo .list_search("Rust programming", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 1); } #[tokio::test] async fn fts5_special_characters_handled() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "fts:5").await; // Search with FTS5 operators should be safely escaped let results = items_repo .list_search("AND OR NOT", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); // no match but no crash } // ── sanitize_fts_query unit tests ──────────────────────────── #[test] fn fts5_sanitize_basic_words() { assert_eq!(sanitize_fts_query("hello world"), r#""hello" "world""#); } #[test] fn fts5_sanitize_operators_quoted() { // AND, OR, NOT become quoted literal strings assert_eq!( sanitize_fts_query("AND OR NOT"), r#""AND" "OR" "NOT""#, ); } #[test] fn fts5_sanitize_near_operator() { assert_eq!(sanitize_fts_query("NEAR"), r#""NEAR""#); assert_eq!(sanitize_fts_query("NEAR/3"), r#""NEAR/3""#); assert_eq!(sanitize_fts_query("word NEAR/5 other"), r#""word" "NEAR/5" "other""#); } #[test] fn fts5_sanitize_column_prefix() { // column:term syntax — colon inside quotes is literal assert_eq!(sanitize_fts_query("title:rust"), r#""title:rust""#); assert_eq!( sanitize_fts_query("body:hello title:world"), r#""body:hello" "title:world""#, ); } #[test] fn fts5_sanitize_caret_stripped() { // ^ is beginning-of-column marker, must be stripped assert_eq!(sanitize_fts_query("^hello"), r#""hello""#); assert_eq!(sanitize_fts_query("^hello world"), r#""hello" "world""#); // Multiple carets assert_eq!(sanitize_fts_query("^^hello"), r#""hello""#); } #[test] fn fts5_sanitize_star_stripped() { // * is prefix query suffix, must be stripped assert_eq!(sanitize_fts_query("hello*"), r#""hello""#); assert_eq!(sanitize_fts_query("hel*"), r#""hel""#); // Multiple stars assert_eq!(sanitize_fts_query("hello**"), r#""hello""#); } #[test] fn fts5_sanitize_caret_and_star_combined() { assert_eq!(sanitize_fts_query("^hello*"), r#""hello""#); assert_eq!(sanitize_fts_query("^*"), ""); } #[test] fn fts5_sanitize_bare_special_chars_dropped() { // Bare `^`, `*`, `^*` should produce empty string (dropped) assert_eq!(sanitize_fts_query("^"), ""); assert_eq!(sanitize_fts_query("*"), ""); assert_eq!(sanitize_fts_query("^ word"), r#""word""#); assert_eq!(sanitize_fts_query("* word"), r#""word""#); } #[test] fn fts5_sanitize_embedded_quotes() { // Embedded double quotes are escaped as "" // Input: say "hi" -> tokens: [say] ["hi"] // "hi" has two quotes -> each becomes "" -> ""hi"" -> wrapped: """hi""" assert_eq!( sanitize_fts_query("say \"hi\""), "\"say\" \"\"\"hi\"\"\"", ); } #[test] fn fts5_sanitize_empty_and_whitespace() { assert_eq!(sanitize_fts_query(""), ""); assert_eq!(sanitize_fts_query(" "), ""); } #[test] fn fts5_sanitize_mixed_special_syntax() { // Realistic adversarial input combining multiple FTS5 features assert_eq!( sanitize_fts_query("^title:rust* NEAR/3 OR body:hello*"), r#""title:rust" "NEAR/3" "OR" "body:hello""#, ); } // ── FTS5 integration tests for hardened sanitization ───────── #[tokio::test] async fn fts5_near_operator_does_not_crash() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "fts:near1").await; // NEAR and NEAR/N should be safely quoted let results = items_repo .list_search("NEAR", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); let results = items_repo .list_search("word NEAR/3 other", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); } #[tokio::test] async fn fts5_column_prefix_does_not_target_column() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); items_repo .upsert(CreateFeedItem { external_id: "fts:col1".to_string(), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "bite".to_string(), bite_secondary: None, bite_indicator: None, title: Some("rust programming".to_string()), body: Some("unrelated body".to_string()), url: None, media: vec![], published_at: Utc::now(), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); // "title:rust" should NOT act as a column filter — it should search // for the literal string "title:rust" which won't match anything let results = items_repo .list_search("title:rust", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); } #[tokio::test] async fn fts5_caret_prefix_does_not_crash() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "fts:caret1").await; let results = items_repo .list_search("^Title", None, false, false, 10, 0) .await .unwrap(); // Caret is stripped, so this searches for "Title" which matches our items assert!(!results.is_empty()); } #[tokio::test] async fn fts5_star_suffix_does_not_crash() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "fts:star1").await; let results = items_repo .list_search("Titl*", None, false, false, 10, 0) .await .unwrap(); // Star is stripped, so this searches for "Titl" which won't match "Title" // (exact token match, not prefix) assert_eq!(results.len(), 0); } #[tokio::test] async fn fts5_bare_star_and_caret_safe() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); make_item(&pool, &feed, "fts:bare1").await; // Bare special chars should not crash let results = items_repo .list_search("*", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); let results = items_repo .list_search("^", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); let results = items_repo .list_search("^*", None, false, false, 10, 0) .await .unwrap(); assert_eq!(results.len(), 0); } #[tokio::test] async fn fts5_pagination_works() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let items_repo = ItemsRepository::new(pool.clone()); for i in 0..5 { items_repo .upsert(CreateFeedItem { external_id: format!("fts:page:{i}"), feed_id: feed.id, busser_id: BusserId::new("rss"), bite_author: "author".to_string(), bite_text: "bite".to_string(), bite_secondary: None, bite_indicator: None, title: Some("Searchable title here".to_string()), body: None, url: None, media: vec![], published_at: Utc::now() - Duration::hours(i), source_name: "test".to_string(), score: None, tags: vec![], actions: vec![], }) .await .unwrap(); } let page1 = items_repo .list_search("Searchable", None, false, false, 2, 0) .await .unwrap(); assert_eq!(page1.len(), 2); let page2 = items_repo .list_search("Searchable", None, false, false, 2, 2) .await .unwrap(); assert_eq!(page2.len(), 2); let page3 = items_repo .list_search("Searchable", None, false, false, 2, 4) .await .unwrap(); assert_eq!(page3.len(), 1); } // ── TagsRepository ──────────────────────────────────────────── #[tokio::test] async fn tags_set_and_get() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let tags_repo = TagsRepository::new(pool.clone()); tags_repo .set_tags(feed.id, &["tech".into(), "rust".into()]) .await .unwrap(); let tags = tags_repo.get_tags(feed.id).await.unwrap(); assert_eq!(tags, vec!["rust", "tech"]); // alphabetical } #[tokio::test] async fn tags_set_idempotent() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let tags_repo = TagsRepository::new(pool.clone()); tags_repo.set_tags(feed.id, &["a".into(), "b".into()]).await.unwrap(); tags_repo.set_tags(feed.id, &["c".into()]).await.unwrap(); let tags = tags_repo.get_tags(feed.id).await.unwrap(); assert_eq!(tags, vec!["c"]); } #[tokio::test] async fn tags_add_and_remove() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let tags_repo = TagsRepository::new(pool.clone()); tags_repo.add_tag(feed.id, "news").await.unwrap(); tags_repo.add_tag(feed.id, "tech").await.unwrap(); tags_repo.add_tag(feed.id, "news").await.unwrap(); // duplicate let tags = tags_repo.get_tags(feed.id).await.unwrap(); assert_eq!(tags, vec!["news", "tech"]); tags_repo.remove_tag(feed.id, "news").await.unwrap(); let tags = tags_repo.get_tags(feed.id).await.unwrap(); assert_eq!(tags, vec!["tech"]); } #[tokio::test] async fn tags_list_all_tags() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "A").await; let feed_b = make_feed(&pool, "hn", "B").await; let tags_repo = TagsRepository::new(pool.clone()); tags_repo.set_tags(feed_a.id, &["tech".into(), "news".into()]).await.unwrap(); tags_repo.set_tags(feed_b.id, &["tech".into(), "fun".into()]).await.unwrap(); let all = tags_repo.list_all_tags().await.unwrap(); assert_eq!(all, vec!["fun", "news", "tech"]); } #[tokio::test] async fn tags_cascade_on_feed_delete() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let tags_repo = TagsRepository::new(pool.clone()); let feeds_repo = FeedsRepository::new(pool.clone()); tags_repo.set_tags(feed.id, &["x".into()]).await.unwrap(); feeds_repo.delete(feed.id).await.unwrap(); let tags = tags_repo.get_tags(feed.id).await.unwrap(); assert!(tags.is_empty()); } #[tokio::test] async fn tags_feed_ids_with_tags() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "A").await; let feed_b = make_feed(&pool, "hn", "B").await; let _feed_c = make_feed(&pool, "reddit", "C").await; let tags_repo = TagsRepository::new(pool.clone()); tags_repo.set_tags(feed_a.id, &["tech".into()]).await.unwrap(); tags_repo.set_tags(feed_b.id, &["tech".into(), "news".into()]).await.unwrap(); let ids = tags_repo.feed_ids_with_tags(&["tech".into()]).await.unwrap(); assert_eq!(ids.len(), 2); assert!(ids.contains(&feed_a.id)); assert!(ids.contains(&feed_b.id)); } // ── StateRepository ─────────────────────────────────────────── #[tokio::test] async fn state_set_and_get() { let pool = test_db().await; let state = StateRepository::new(pool.clone()); state.set("rss", "cursor", "abc123").await.unwrap(); let val = state.get("rss", "cursor").await.unwrap(); assert_eq!(val, Some("abc123".to_string())); } #[tokio::test] async fn state_set_overwrites_value() { let pool = test_db().await; let state = StateRepository::new(pool.clone()); state.set("rss", "cursor", "first").await.unwrap(); state.set("rss", "cursor", "second").await.unwrap(); let val = state.get("rss", "cursor").await.unwrap(); assert_eq!(val, Some("second".to_string())); } #[tokio::test] async fn state_get_missing_returns_none() { let pool = test_db().await; let state = StateRepository::new(pool.clone()); let val = state.get("rss", "nonexistent").await.unwrap(); assert!(val.is_none()); } #[tokio::test] async fn state_delete_removes_key() { let pool = test_db().await; let state = StateRepository::new(pool.clone()); state.set("rss", "token", "secret").await.unwrap(); state.delete("rss", "token").await.unwrap(); let val = state.get("rss", "token").await.unwrap(); assert!(val.is_none()); } #[tokio::test] async fn state_delete_all_clears_busser() { let pool = test_db().await; let state = StateRepository::new(pool.clone()); state.set("rss", "key1", "val1").await.unwrap(); state.set("rss", "key2", "val2").await.unwrap(); state.set("rss", "key3", "val3").await.unwrap(); let removed = state.delete_all("rss").await.unwrap(); assert_eq!(removed, 3); let remaining = state.list("rss").await.unwrap(); assert!(remaining.is_empty()); } // ── ConfigRepository ───────────────────────────────────────── #[tokio::test] async fn config_set_and_get() { let pool = test_db().await; let config = ConfigRepository::new(pool); config.set("theme", "dark").await.unwrap(); let val = config.get("theme").await.unwrap(); assert_eq!(val, Some("dark".to_string())); } #[tokio::test] async fn config_set_overwrites() { let pool = test_db().await; let config = ConfigRepository::new(pool); config.set("lang", "en").await.unwrap(); config.set("lang", "fr").await.unwrap(); let val = config.get("lang").await.unwrap(); assert_eq!(val, Some("fr".to_string())); } #[tokio::test] async fn config_get_missing_returns_none() { let pool = test_db().await; let config = ConfigRepository::new(pool); let val = config.get("nonexistent").await.unwrap(); assert!(val.is_none()); } #[tokio::test] async fn config_delete() { let pool = test_db().await; let config = ConfigRepository::new(pool); config.set("key", "value").await.unwrap(); config.delete("key").await.unwrap(); let val = config.get("key").await.unwrap(); assert!(val.is_none()); } // ── FeedsRepository (additional) ───────────────────────────── #[tokio::test] async fn feeds_update_name() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Old Name").await; let feeds = FeedsRepository::new(pool); feeds.update_name(feed.id, "New Name").await.unwrap(); let updated = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(updated.name, "New Name"); } #[tokio::test] async fn feeds_update_config() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); feeds.update_config(feed.id, r#"{"url":"https://example.com/rss"}"#).await.unwrap(); let updated = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(updated.config, r#"{"url":"https://example.com/rss"}"#); } #[tokio::test] async fn feeds_record_fetch_success_resets_failures() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); // Record some failures first feeds.record_fetch_failure(feed.id, "timeout").await.unwrap(); feeds.record_fetch_failure(feed.id, "dns error").await.unwrap(); let failed = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(failed.consecutive_failures, 2); assert_eq!(failed.last_error.as_deref(), Some("dns error")); // Record success feeds.record_fetch_success(feed.id).await.unwrap(); let ok = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(ok.consecutive_failures, 0); assert!(ok.last_error.is_none()); assert!(ok.last_success_at.is_some()); } #[tokio::test] async fn feeds_record_fetch_failure_increments() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); feeds.record_fetch_failure(feed.id, "err1").await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 1); assert_eq!(f.last_error.as_deref(), Some("err1")); feeds.record_fetch_failure(feed.id, "err2").await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 2); assert_eq!(f.last_error.as_deref(), Some("err2")); } // ── record_fetch_failure_structured ──────────────────────────── #[tokio::test] async fn structured_failure_rate_limited_no_increment() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); let err = StructuredError::rate_limited("429 Too Many Requests", 120); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(!tripped); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 0, "rate_limited should not increment failures"); assert!(f.last_error.is_some(), "error should be stored"); // Verify stored as JSON let stored = StructuredError::from_last_error(f.last_error.as_ref().unwrap()); assert_eq!(stored.category, ErrorCategory::RateLimited); assert_eq!(stored.retry_after_secs, Some(120)); } #[tokio::test] async fn structured_failure_auth_immediate_circuit_break() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); let err = StructuredError::new(ErrorCategory::Auth, "401 Unauthorized"); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(tripped, "auth error should immediately trip circuit breaker"); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); assert_eq!(f.consecutive_failures, 1); } #[tokio::test] async fn structured_failure_config_immediate_circuit_break() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); let err = StructuredError::new(ErrorCategory::Config, "404 Not Found"); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(tripped, "config error should immediately trip circuit breaker"); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); } #[tokio::test] async fn structured_failure_transient_increments_normally() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); let err = StructuredError::new(ErrorCategory::Transient, "HTTP 503"); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(!tripped); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 1); assert!(!f.circuit_broken); } #[tokio::test] async fn structured_failure_transient_trips_at_threshold() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let feeds = FeedsRepository::new(pool); // Fill up to threshold - 1 for i in 0..CIRCUIT_BREAKER_THRESHOLD - 1 { let err = StructuredError::new(ErrorCategory::Transient, format!("error {i}")); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(!tripped); } // One more should trip it let err = StructuredError::new(ErrorCategory::Transient, "final error"); let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap(); assert!(tripped); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); } // ── ItemsRepository (additional) ───────────────────────────── #[tokio::test] async fn items_count_by_busser() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "A").await; let feed_b = make_feed(&pool, "hn", "B").await; make_item(&pool, &feed_a, "a1").await; make_item(&pool, &feed_a, "a2").await; make_item(&pool, &feed_b, "b1").await; let items = ItemsRepository::new(pool); assert_eq!(items.count_by_busser("rss").await.unwrap(), 2); assert_eq!(items.count_by_busser("hn").await.unwrap(), 1); assert_eq!(items.count_by_busser("nonexistent").await.unwrap(), 0); } #[tokio::test] async fn items_count_unread_by_busser() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let item1 = make_item(&pool, &feed, "i1").await; make_item(&pool, &feed, "i2").await; let items = ItemsRepository::new(pool); assert_eq!(items.count_unread_by_busser("rss").await.unwrap(), 2); items.mark_read(item1.id, true).await.unwrap(); assert_eq!(items.count_unread_by_busser("rss").await.unwrap(), 1); } #[tokio::test] async fn items_counts_by_busser_bulk() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "A").await; let feed_b = make_feed(&pool, "hn", "B").await; let item_a1 = make_item(&pool, &feed_a, "a1").await; make_item(&pool, &feed_a, "a2").await; make_item(&pool, &feed_b, "b1").await; let items = ItemsRepository::new(pool); items.mark_read(item_a1.id, true).await.unwrap(); let counts = items.counts_by_busser().await.unwrap(); // Should have entries for rss (total=2, unread=1) and hn (total=1, unread=1) let rss = counts.iter().find(|(b, _, _)| b == "rss").unwrap(); assert_eq!(rss.1, 2); // total assert_eq!(rss.2, 1); // unread let hn = counts.iter().find(|(b, _, _)| b == "hn").unwrap(); assert_eq!(hn.1, 1); assert_eq!(hn.2, 1); } #[tokio::test] async fn items_search_with_source_filter() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "RSS Feed").await; let feed_b = make_feed(&pool, "hn", "HN Feed").await; make_item(&pool, &feed_a, "rss1").await; make_item(&pool, &feed_b, "hn1").await; let items = ItemsRepository::new(pool); // Search all - should find both (searching for "Title" which is in default title) let all = items.list_search("Title", None, false, false, 10, 0).await.unwrap(); assert_eq!(all.len(), 2); // Search with source filter let rss_only = items.list_search("Title", Some("rss"), false, false, 10, 0).await.unwrap(); assert_eq!(rss_only.len(), 1); assert_eq!(rss_only[0].busser_id.as_str(), "rss"); } #[tokio::test] async fn items_search_unread_only() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let item1 = make_item(&pool, &feed, "i1").await; make_item(&pool, &feed, "i2").await; let items = ItemsRepository::new(pool); items.mark_read(item1.id, true).await.unwrap(); let unread = items.list_search("Title", None, true, false, 10, 0).await.unwrap(); assert_eq!(unread.len(), 1); } #[tokio::test] async fn items_search_starred_only() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; let item1 = make_item(&pool, &feed, "i1").await; make_item(&pool, &feed, "i2").await; let items = ItemsRepository::new(pool); items.mark_starred(item1.id, true).await.unwrap(); let starred = items.list_search("Title", None, false, true, 10, 0).await.unwrap(); assert_eq!(starred.len(), 1); } // ── TagsRepository (additional) ────────────────────────────── #[tokio::test] async fn tags_all_feed_tags_bulk() { let pool = test_db().await; let feed_a = make_feed(&pool, "rss", "A").await; let feed_b = make_feed(&pool, "hn", "B").await; let tags = TagsRepository::new(pool); tags.set_tags(feed_a.id, &["tech".into(), "news".into()]).await.unwrap(); tags.set_tags(feed_b.id, &["fun".into()]).await.unwrap(); let all = tags.all_feed_tags().await.unwrap(); assert_eq!(all.len(), 3); // Verify ordering (by feed_id, then tag) assert!(all.iter().any(|(id, tag)| *id == feed_a.id && tag == "news")); assert!(all.iter().any(|(id, tag)| *id == feed_a.id && tag == "tech")); assert!(all.iter().any(|(id, tag)| *id == feed_b.id && tag == "fun")); } // ── StateRepository (additional) ───────────────────────────── #[tokio::test] async fn state_list_returns_ordered() { let pool = test_db().await; let state = StateRepository::new(pool); state.set("rss", "cursor", "abc").await.unwrap(); state.set("rss", "auth_token", "xyz").await.unwrap(); state.set("rss", "page", "2").await.unwrap(); state.set("hn", "unrelated", "val").await.unwrap(); let list = state.list("rss").await.unwrap(); assert_eq!(list.len(), 3); assert_eq!(list[0].key, "auth_token"); // alphabetical assert_eq!(list[1].key, "cursor"); assert_eq!(list[2].key, "page"); } // ── Circuit Breaker ───────────────────────────────────────── #[tokio::test] async fn circuit_breaker_new_feed_not_broken() { let pool = test_db().await; let feed = make_feed(&pool, "rss", "Feed").await; assert!(!feed.circuit_broken); } #[tokio::test] async fn circuit_breaker_trips_at_threshold() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; // Record failures up to threshold - 1: should not trip for i in 0..(CIRCUIT_BREAKER_THRESHOLD - 1) { let tripped = feeds .record_fetch_failure(feed.id, &format!("error {}", i)) .await .unwrap(); assert!(!tripped, "should not trip at failure {}", i + 1); } let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(!f.circuit_broken); assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD - 1); // The Nth failure should trip the breaker let tripped = feeds .record_fetch_failure(feed.id, "final error") .await .unwrap(); assert!(tripped, "should trip at threshold"); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD); } #[tokio::test] async fn circuit_breaker_does_not_trip_again_once_broken() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; // Trip the breaker for _ in 0..CIRCUIT_BREAKER_THRESHOLD { feeds.record_fetch_failure(feed.id, "err").await.unwrap(); } let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); // Additional failures should return false (already broken) let tripped = feeds .record_fetch_failure(feed.id, "extra error") .await .unwrap(); assert!(!tripped, "should not re-trip"); } #[tokio::test] async fn circuit_breaker_reset_clears_state() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; // Trip the breaker for _ in 0..CIRCUIT_BREAKER_THRESHOLD { feeds.record_fetch_failure(feed.id, "err").await.unwrap(); } let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD); assert!(f.last_error.is_some()); // Reset feeds.reset_circuit_breaker(feed.id).await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(!f.circuit_broken); assert_eq!(f.consecutive_failures, 0); assert!(f.last_error.is_none()); } #[tokio::test] async fn circuit_breaker_success_resets_counter_but_not_broken_flag() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; // Accumulate some failures (below threshold) for _ in 0..5 { feeds.record_fetch_failure(feed.id, "err").await.unwrap(); } let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 5); assert!(!f.circuit_broken); // Success resets the counter feeds.record_fetch_success(feed.id).await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert_eq!(f.consecutive_failures, 0); assert!(!f.circuit_broken); } #[tokio::test] async fn circuit_broken_feed_excluded_from_list_enabled() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed_ok = make_feed(&pool, "rss", "OK Feed").await; let feed_broken = make_feed(&pool, "hn", "Broken Feed").await; // Trip the circuit breaker on one feed for _ in 0..CIRCUIT_BREAKER_THRESHOLD { feeds .record_fetch_failure(feed_broken.id, "err") .await .unwrap(); } let enabled = feeds.list_enabled().await.unwrap(); assert_eq!(enabled.len(), 1); assert_eq!(enabled[0].id, feed_ok.id); // list_all should still include both let all = feeds.list_all().await.unwrap(); assert_eq!(all.len(), 2); } #[tokio::test] async fn circuit_breaker_set_and_clear() { let pool = test_db().await; let feeds = FeedsRepository::new(pool.clone()); let feed = make_feed(&pool, "rss", "Feed").await; assert!(!feed.circuit_broken); feeds.set_circuit_broken(feed.id, true).await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(f.circuit_broken); feeds.set_circuit_broken(feed.id, false).await.unwrap(); let f = feeds.get(feed.id).await.unwrap().unwrap(); assert!(!f.circuit_broken); } }