Skip to main content

max / balanced_breakfast

102.2 KB · 2873 lines History Blame Raw
1 //! Database repositories for feeds, items, and busser state.
2 //!
3 //! Each repository wraps an SQLite connection pool and provides typed
4 //! CRUD operations for a single table.
5
6 use chrono::{DateTime, Utc};
7 use sqlx::SqlitePool;
8
9 use bb_interface::{ErrorCategory, StructuredError};
10
11 use crate::id_types::{BookmarkId, BusserStateId, FeedId, ItemId, QueryFeedId};
12 use crate::models::*;
13 use crate::TIMESTAMP_FMT;
14
15 /// Sanitize a user-provided search string for FTS5 MATCH syntax.
16 ///
17 /// Wraps each word in double quotes to prevent FTS5 syntax injection
18 /// (e.g. a user typing `AND`, `OR`, `NOT`, `NEAR`, or `NEAR/N` won't be
19 /// interpreted as FTS5 operators). Words are joined with spaces (implicit AND).
20 ///
21 /// Inside quoted strings FTS5 still honours two special characters:
22 /// - `*` at the end of the last token triggers prefix matching
23 /// - `^` at the start of the first token triggers beginning-of-column matching
24 ///
25 /// We strip those so user input like `^hello` or `world*` is treated literally.
26 /// The `column:` prefix syntax (e.g. `title:word`) is neutralised by the quoting
27 /// itself — colons inside double-quoted strings are treated as literal characters.
28 fn sanitize_fts_query(query: &str) -> String {
29 query
30 .split_whitespace()
31 .map(|word| {
32 // Escape embedded double quotes (FTS5 uses "" to represent a literal ")
33 let escaped = word.replace('"', "\"\"");
34 // Strip `^` prefix and `*` suffix — both are special inside FTS5 quotes
35 let escaped = escaped.trim_start_matches('^');
36 let escaped = escaped.trim_end_matches('*');
37 format!("\"{}\"", escaped)
38 })
39 // Drop tokens that became empty after stripping (e.g. bare `^`, `*`, `^*`)
40 .filter(|token| token != "\"\"")
41 .collect::<Vec<_>>()
42 .join(" ")
43 }
44
45 /// Maximum allowed length for a search query string.
46 ///
47 /// Queries longer than this are rejected early (returning an empty result set)
48 /// to prevent excessively large FTS5 MATCH expressions from consuming memory
49 /// or CPU in SQLite.
50 const MAX_SEARCH_QUERY_LENGTH: usize = 500;
51
52 /// Number of consecutive failures before a feed is automatically disabled.
53 ///
54 /// Once a feed accumulates this many failures without a successful fetch,
55 /// the circuit breaker trips: the feed is marked `circuit_broken = 1` and
56 /// excluded from automatic fetch scheduling until manually reset.
57 pub const CIRCUIT_BREAKER_THRESHOLD: i64 = 10;
58
59 #[derive(Clone)]
60 /// Repository for feed subscription CRUD and fetch tracking
61 pub struct FeedsRepository {
62 pool: SqlitePool,
63 }
64
65 impl FeedsRepository {
66 /// Create a new feeds repository backed by the given pool.
67 #[tracing::instrument(skip_all)]
68 pub fn new(pool: SqlitePool) -> Self {
69 Self { pool }
70 }
71
72 /// Insert a new feed and return the created row.
73 /// New feeds default to `enabled = 1` (auto-fetch active) and
74 /// `created_at = updated_at` (no separate "first modified" vs "created" notion).
75 #[tracing::instrument(skip_all)]
76 pub async fn create(&self, input: CreateFeed) -> Result<DbFeed, sqlx::Error> {
77 let id = FeedId::new();
78 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
79 // serde_json::Value is always serializable; unwrap is safe here.
80 let config = serde_json::to_string(&input.config).unwrap_or_else(|_| "{}".to_string());
81
82 sqlx::query_as(
83 r#"
84 INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at)
85 VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5)
86 RETURNING *
87 "#,
88 )
89 .bind(id)
90 .bind(input.busser_id.as_str())
91 .bind(&input.name)
92 .bind(&config)
93 .bind(&now)
94 .fetch_one(&self.pool)
95 .await
96 }
97
98 /// Look up a single feed by its ID. Returns `None` if not found.
99 #[tracing::instrument(skip_all)]
100 pub async fn get(&self, id: FeedId) -> Result<Option<DbFeed>, sqlx::Error> {
101 sqlx::query_as("SELECT * FROM feeds WHERE id = ?1")
102 .bind(id)
103 .fetch_optional(&self.pool)
104 .await
105 }
106
107 /// List all feeds belonging to a given busser, ordered by name.
108 #[tracing::instrument(skip_all)]
109 pub async fn get_by_busser(&self, busser_id: &str) -> Result<Vec<DbFeed>, sqlx::Error> {
110 sqlx::query_as("SELECT * FROM feeds WHERE busser_id = ?1 ORDER BY name")
111 .bind(busser_id)
112 .fetch_all(&self.pool)
113 .await
114 }
115
116 /// List only enabled feeds that are not circuit-broken, ordered by name.
117 #[tracing::instrument(skip_all)]
118 pub async fn list_enabled(&self) -> Result<Vec<DbFeed>, sqlx::Error> {
119 sqlx::query_as(
120 "SELECT * FROM feeds WHERE enabled = 1 AND circuit_broken = 0 ORDER BY name",
121 )
122 .fetch_all(&self.pool)
123 .await
124 }
125
126 /// List every feed (enabled or disabled), ordered by name.
127 #[tracing::instrument(skip_all)]
128 pub async fn list_all(&self) -> Result<Vec<DbFeed>, sqlx::Error> {
129 sqlx::query_as("SELECT * FROM feeds ORDER BY name")
130 .fetch_all(&self.pool)
131 .await
132 }
133
134 /// Update the `last_fetch` and `updated_at` timestamps to now.
135 #[tracing::instrument(skip_all)]
136 pub async fn update_last_fetch(&self, id: FeedId) -> Result<(), sqlx::Error> {
137 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
138 sqlx::query("UPDATE feeds SET last_fetch = ?1, updated_at = ?1 WHERE id = ?2")
139 .bind(&now)
140 .bind(id)
141 .execute(&self.pool)
142 .await?;
143 Ok(())
144 }
145
146 /// Enable or disable a feed.
147 #[tracing::instrument(skip_all)]
148 pub async fn set_enabled(&self, id: FeedId, enabled: bool) -> Result<(), sqlx::Error> {
149 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
150 sqlx::query("UPDATE feeds SET enabled = ?1, updated_at = ?2 WHERE id = ?3")
151 .bind(enabled)
152 .bind(&now)
153 .bind(id)
154 .execute(&self.pool)
155 .await?;
156 Ok(())
157 }
158
159 /// Record a successful fetch: reset failure counter, clear error, update timestamps.
160 #[tracing::instrument(skip_all)]
161 pub async fn record_fetch_success(&self, id: FeedId) -> Result<(), sqlx::Error> {
162 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
163 sqlx::query(
164 "UPDATE feeds SET consecutive_failures = 0, last_error = NULL, \
165 last_success_at = ?1, last_fetch = ?1, updated_at = ?1 WHERE id = ?2",
166 )
167 .bind(&now)
168 .bind(id)
169 .execute(&self.pool)
170 .await?;
171 Ok(())
172 }
173
174 /// Record a fetch failure: increment counter, store error, update timestamp.
175 ///
176 /// Returns `true` if the circuit breaker tripped (i.e. the feed just crossed
177 /// the [`CIRCUIT_BREAKER_THRESHOLD`] and was marked `circuit_broken = 1`).
178 #[tracing::instrument(skip_all)]
179 pub async fn record_fetch_failure(
180 &self,
181 id: FeedId,
182 error: &str,
183 ) -> Result<bool, sqlx::Error> {
184 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
185 sqlx::query(
186 "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \
187 last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
188 )
189 .bind(error)
190 .bind(&now)
191 .bind(id)
192 .execute(&self.pool)
193 .await?;
194
195 // Check if we just crossed the threshold
196 let feed = self.get(id).await?;
197 if let Some(feed) = feed {
198 if !feed.circuit_broken
199 && feed.consecutive_failures >= CIRCUIT_BREAKER_THRESHOLD
200 {
201 self.set_circuit_broken(id, true).await?;
202 return Ok(true);
203 }
204 }
205 Ok(false)
206 }
207
208 /// Record a structured fetch failure with category-aware behavior:
209 ///
210 /// - `RateLimited` — store error JSON, do NOT increment `consecutive_failures`.
211 /// - `Auth` / `Config` — increment + immediately set `circuit_broken = 1`.
212 /// - `Transient` / `Parse` / `Unknown` — increment normally (existing behavior).
213 ///
214 /// Returns `true` if the circuit breaker tripped.
215 #[tracing::instrument(skip_all)]
216 pub async fn record_fetch_failure_structured(
217 &self,
218 id: FeedId,
219 error: &StructuredError,
220 ) -> Result<bool, sqlx::Error> {
221 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
222 let error_json = error.to_json();
223
224 match error.category {
225 ErrorCategory::RateLimited => {
226 // Store error but don't increment failure counter
227 sqlx::query(
228 "UPDATE feeds SET last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
229 )
230 .bind(&error_json)
231 .bind(&now)
232 .bind(id)
233 .execute(&self.pool)
234 .await?;
235 Ok(false)
236 }
237 ErrorCategory::Auth | ErrorCategory::Config => {
238 // Increment + immediate circuit break
239 sqlx::query(
240 "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \
241 last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
242 )
243 .bind(&error_json)
244 .bind(&now)
245 .bind(id)
246 .execute(&self.pool)
247 .await?;
248 self.set_circuit_broken(id, true).await?;
249 Ok(true)
250 }
251 ErrorCategory::Transient | ErrorCategory::Parse | ErrorCategory::Unknown => {
252 // Normal behavior: increment and check threshold
253 sqlx::query(
254 "UPDATE feeds SET consecutive_failures = consecutive_failures + 1, \
255 last_error = ?1, last_fetch = ?2, updated_at = ?2 WHERE id = ?3",
256 )
257 .bind(&error_json)
258 .bind(&now)
259 .bind(id)
260 .execute(&self.pool)
261 .await?;
262
263 let feed = self.get(id).await?;
264 if let Some(feed) = feed {
265 if !feed.circuit_broken
266 && feed.consecutive_failures >= CIRCUIT_BREAKER_THRESHOLD
267 {
268 self.set_circuit_broken(id, true).await?;
269 return Ok(true);
270 }
271 }
272 Ok(false)
273 }
274 }
275 }
276
277 /// Mark a feed as circuit-broken (or clear the circuit breaker).
278 #[tracing::instrument(skip_all)]
279 pub async fn set_circuit_broken(
280 &self,
281 id: FeedId,
282 broken: bool,
283 ) -> Result<(), sqlx::Error> {
284 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
285 sqlx::query(
286 "UPDATE feeds SET circuit_broken = ?1, updated_at = ?2 WHERE id = ?3",
287 )
288 .bind(broken)
289 .bind(&now)
290 .bind(id)
291 .execute(&self.pool)
292 .await?;
293 Ok(())
294 }
295
296 /// Reset the circuit breaker on a feed: clear `circuit_broken`, reset
297 /// `consecutive_failures` to 0, and clear `last_error`.
298 ///
299 /// Called when a user manually triggers a fetch for a circuit-broken feed.
300 #[tracing::instrument(skip_all)]
301 pub async fn reset_circuit_breaker(&self, id: FeedId) -> Result<(), sqlx::Error> {
302 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
303 sqlx::query(
304 "UPDATE feeds SET circuit_broken = 0, consecutive_failures = 0, \
305 last_error = NULL, updated_at = ?1 WHERE id = ?2",
306 )
307 .bind(&now)
308 .bind(id)
309 .execute(&self.pool)
310 .await?;
311 Ok(())
312 }
313
314 /// Update a feed's config JSON string.
315 #[tracing::instrument(skip_all)]
316 pub async fn update_config(&self, id: FeedId, config: &str) -> Result<(), sqlx::Error> {
317 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
318 sqlx::query("UPDATE feeds SET config = ?1, updated_at = ?2 WHERE id = ?3")
319 .bind(config)
320 .bind(&now)
321 .bind(id)
322 .execute(&self.pool)
323 .await?;
324 Ok(())
325 }
326
327 /// Update a feed's display name.
328 #[tracing::instrument(skip_all)]
329 pub async fn update_name(&self, id: FeedId, name: &str) -> Result<(), sqlx::Error> {
330 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
331 sqlx::query("UPDATE feeds SET name = ?1, updated_at = ?2 WHERE id = ?3")
332 .bind(name)
333 .bind(&now)
334 .bind(id)
335 .execute(&self.pool)
336 .await?;
337 Ok(())
338 }
339
340 /// Delete a feed by ID.
341 #[tracing::instrument(skip_all)]
342 pub async fn delete(&self, id: FeedId) -> Result<(), sqlx::Error> {
343 sqlx::query("DELETE FROM feeds WHERE id = ?1")
344 .bind(id)
345 .execute(&self.pool)
346 .await?;
347 Ok(())
348 }
349 }
350
351 #[derive(Clone)]
352 /// Repository for feed item CRUD, read/star toggling, and paginated listing
353 pub struct ItemsRepository {
354 pool: SqlitePool,
355 }
356
357 impl ItemsRepository {
358 /// Create a new items repository backed by the given pool.
359 #[tracing::instrument(skip_all)]
360 pub fn new(pool: SqlitePool) -> Self {
361 Self { pool }
362 }
363
364 /// Insert a feed item or update it if `external_id` already exists.
365 ///
366 /// The ON CONFLICT clause deliberately preserves user state (`is_read`,
367 /// `is_starred`) — these are never overwritten by a re-fetch. Content fields
368 /// (author, text, body, etc.) are updated in case the source edited the post.
369 #[tracing::instrument(skip_all)]
370 pub async fn upsert(&self, input: CreateFeedItem) -> Result<DbFeedItem, sqlx::Error> {
371 let id = ItemId::new();
372 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
373 let published = input.published_at.format(TIMESTAMP_FMT).to_string();
374 // Vec<String> is always serializable; unwrap is safe here.
375 let media = serde_json::to_string(&input.media).unwrap_or_else(|_| "[]".to_string());
376 let tags = serde_json::to_string(&input.tags).unwrap_or_else(|_| "[]".to_string());
377 let actions =
378 serde_json::to_string(&input.actions).unwrap_or_else(|_| "[]".to_string());
379
380 sqlx::query_as(
381 r#"
382 INSERT INTO feed_items (
383 id, external_id, feed_id, busser_id,
384 bite_author, bite_text, bite_secondary, bite_indicator,
385 title, body, url, media, actions,
386 published_at, fetched_at, source_name, score, tags,
387 is_read, is_starred, created_at, updated_at
388 )
389 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, 0, 0, ?19, ?19)
390 ON CONFLICT (external_id) DO UPDATE SET
391 bite_author = EXCLUDED.bite_author,
392 bite_text = EXCLUDED.bite_text,
393 bite_secondary = EXCLUDED.bite_secondary,
394 bite_indicator = EXCLUDED.bite_indicator,
395 title = EXCLUDED.title,
396 body = EXCLUDED.body,
397 url = EXCLUDED.url,
398 media = EXCLUDED.media,
399 actions = EXCLUDED.actions,
400 score = EXCLUDED.score,
401 tags = EXCLUDED.tags,
402 updated_at = EXCLUDED.updated_at
403 RETURNING *
404 "#,
405 )
406 .bind(id)
407 .bind(&input.external_id)
408 .bind(input.feed_id)
409 .bind(input.busser_id.as_str())
410 .bind(&input.bite_author)
411 .bind(&input.bite_text)
412 .bind(&input.bite_secondary)
413 .bind(&input.bite_indicator)
414 .bind(&input.title)
415 .bind(&input.body)
416 .bind(&input.url)
417 .bind(&media)
418 .bind(&actions)
419 .bind(&published)
420 .bind(&now)
421 .bind(&input.source_name)
422 .bind(input.score)
423 .bind(&tags)
424 .bind(&now)
425 .fetch_one(&self.pool)
426 .await
427 }
428
429 /// Look up a feed item by its internal ID.
430 #[tracing::instrument(skip_all)]
431 pub async fn get(&self, id: ItemId) -> Result<Option<DbFeedItem>, sqlx::Error> {
432 sqlx::query_as("SELECT * FROM feed_items WHERE id = ?1")
433 .bind(id)
434 .fetch_optional(&self.pool)
435 .await
436 }
437
438 /// Look up a feed item by the `external_id` produced by the busser.
439 #[tracing::instrument(skip_all)]
440 pub async fn get_by_external_id(
441 &self,
442 external_id: &str,
443 ) -> Result<Option<DbFeedItem>, sqlx::Error> {
444 sqlx::query_as("SELECT * FROM feed_items WHERE external_id = ?1")
445 .bind(external_id)
446 .fetch_optional(&self.pool)
447 .await
448 }
449
450 /// List all items ordered by `published_at` descending, with pagination.
451 /// Sorted by publish date (not fetch date) so items appear where the author
452 /// intended, even if fetched out of order during backfill.
453 #[tracing::instrument(skip_all)]
454 pub async fn list_all(&self, limit: i64, offset: i64) -> Result<Vec<DbFeedItem>, sqlx::Error> {
455 sqlx::query_as(
456 "SELECT * FROM feed_items ORDER BY published_at DESC LIMIT ?1 OFFSET ?2",
457 )
458 .bind(limit)
459 .bind(offset)
460 .fetch_all(&self.pool)
461 .await
462 }
463
464 /// List items belonging to a specific feed, newest first.
465 #[tracing::instrument(skip_all)]
466 pub async fn list_by_feed(
467 &self,
468 feed_id: FeedId,
469 limit: i64,
470 offset: i64,
471 ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
472 sqlx::query_as(
473 "SELECT * FROM feed_items WHERE feed_id = ?1 ORDER BY published_at DESC LIMIT ?2 OFFSET ?3",
474 )
475 .bind(feed_id)
476 .bind(limit)
477 .bind(offset)
478 .fetch_all(&self.pool)
479 .await
480 }
481
482 /// List items from a specific busser source, newest first.
483 #[tracing::instrument(skip_all)]
484 pub async fn list_by_busser(
485 &self,
486 busser_id: &str,
487 limit: i64,
488 offset: i64,
489 ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
490 sqlx::query_as(
491 "SELECT * FROM feed_items WHERE busser_id = ?1 ORDER BY published_at DESC LIMIT ?2 OFFSET ?3",
492 )
493 .bind(busser_id)
494 .bind(limit)
495 .bind(offset)
496 .fetch_all(&self.pool)
497 .await
498 }
499
500 /// List unread items only, newest first.
501 #[tracing::instrument(skip_all)]
502 pub async fn list_unread(&self, limit: i64, offset: i64) -> Result<Vec<DbFeedItem>, sqlx::Error> {
503 sqlx::query_as(
504 "SELECT * FROM feed_items WHERE is_read = 0 ORDER BY published_at DESC LIMIT ?1 OFFSET ?2",
505 )
506 .bind(limit)
507 .bind(offset)
508 .fetch_all(&self.pool)
509 .await
510 }
511
512 /// List starred items only, newest first.
513 #[tracing::instrument(skip_all)]
514 pub async fn list_starred(
515 &self,
516 limit: i64,
517 offset: i64,
518 ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
519 sqlx::query_as(
520 "SELECT * FROM feed_items WHERE is_starred = 1 ORDER BY published_at DESC LIMIT ?1 OFFSET ?2",
521 )
522 .bind(limit)
523 .bind(offset)
524 .fetch_all(&self.pool)
525 .await
526 }
527
528 /// Full-text search using FTS5 across title, body, and bite_text.
529 ///
530 /// Uses the `feed_items_fts` virtual table for fast ranked search.
531 /// Results are ordered by FTS5 relevance then by `published_at DESC`.
532 /// Additional boolean filters (source, unread, starred) are combined.
533 #[tracing::instrument(skip_all)]
534 pub async fn list_search(
535 &self,
536 query: &str,
537 source: Option<&str>,
538 unread_only: bool,
539 starred_only: bool,
540 limit: i64,
541 offset: i64,
542 ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
543 // Reject excessively long queries before any processing.
544 if query.len() > MAX_SEARCH_QUERY_LENGTH {
545 return Ok(vec![]);
546 }
547
548 let fts_query = sanitize_fts_query(query);
549
550 // If sanitization stripped everything (e.g. query was just `*` or `^`),
551 // return early — an empty MATCH expression would be a SQL error.
552 if fts_query.is_empty() {
553 return Ok(vec![]);
554 }
555
556 let mut sql = String::from(
557 "SELECT fi.* FROM feed_items fi \
558 INNER JOIN feed_items_fts fts ON fi.rowid = fts.rowid \
559 WHERE feed_items_fts MATCH ?1",
560 );
561 if source.is_some() {
562 sql.push_str(" AND fi.busser_id = ?4");
563 }
564 if unread_only {
565 sql.push_str(" AND fi.is_read = 0");
566 }
567 if starred_only {
568 sql.push_str(" AND fi.is_starred = 1");
569 }
570 sql.push_str(" ORDER BY fts.rank, fi.published_at DESC LIMIT ?2 OFFSET ?3");
571
572 let mut q = sqlx::query_as::<_, DbFeedItem>(&sql)
573 .bind(&fts_query) // ?1
574 .bind(limit) // ?2
575 .bind(offset); // ?3
576
577 if let Some(src) = source {
578 q = q.bind(src); // ?4
579 }
580
581 q.fetch_all(&self.pool).await
582 }
583
584 /// List feed items with any combination of filters pushed into SQL.
585 ///
586 /// Unifies `list_all`, `list_by_busser`, `list_unread`, `list_starred`,
587 /// and `list_search` into a single method so callers don't need an
588 /// if/else chain that drops filter combinations.
589 #[tracing::instrument(skip_all)]
590 pub async fn list_filtered(
591 &self,
592 search: Option<&str>,
593 source: Option<&str>,
594 unread_only: bool,
595 starred_only: bool,
596 limit: i64,
597 offset: i64,
598 ) -> Result<Vec<DbFeedItem>, sqlx::Error> {
599 // When a search query is present, use FTS5.
600 if let Some(query) = search {
601 return self
602 .list_search(query, source, unread_only, starred_only, limit, offset)
603 .await;
604 }
605
606 // Build a dynamic query with conditional WHERE clauses.
607 let mut sql = String::from("SELECT * FROM feed_items WHERE 1=1");
608 if source.is_some() {
609 sql.push_str(" AND busser_id = ?3");
610 }
611 if unread_only {
612 sql.push_str(" AND is_read = 0");
613 }
614 if starred_only {
615 sql.push_str(" AND is_starred = 1");
616 }
617 sql.push_str(" ORDER BY published_at DESC LIMIT ?1 OFFSET ?2");
618
619 let mut q = sqlx::query_as::<_, DbFeedItem>(&sql)
620 .bind(limit) // ?1
621 .bind(offset); // ?2
622
623 if let Some(src) = source {
624 q = q.bind(src); // ?3
625 }
626
627 q.fetch_all(&self.pool).await
628 }
629
630 /// Set the read flag on a feed item.
631 #[tracing::instrument(skip_all)]
632 pub async fn mark_read(&self, id: ItemId, is_read: bool) -> Result<(), sqlx::Error> {
633 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
634 sqlx::query("UPDATE feed_items SET is_read = ?1, updated_at = ?2 WHERE id = ?3")
635 .bind(is_read)
636 .bind(&now)
637 .bind(id)
638 .execute(&self.pool)
639 .await?;
640 Ok(())
641 }
642
643 /// Set the starred flag on a feed item.
644 #[tracing::instrument(skip_all)]
645 pub async fn mark_starred(&self, id: ItemId, is_starred: bool) -> Result<(), sqlx::Error> {
646 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
647 sqlx::query("UPDATE feed_items SET is_starred = ?1, updated_at = ?2 WHERE id = ?3")
648 .bind(is_starred)
649 .bind(&now)
650 .bind(id)
651 .execute(&self.pool)
652 .await?;
653 Ok(())
654 }
655
656 /// Mark all unread items as read, optionally filtered to a specific source.
657 #[tracing::instrument(skip_all)]
658 pub async fn mark_all_read(&self, busser_id: Option<&str>) -> Result<u64, sqlx::Error> {
659 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
660 let result = match busser_id {
661 Some(id) => {
662 sqlx::query(
663 "UPDATE feed_items SET is_read = 1, updated_at = ?1 WHERE is_read = 0 AND busser_id = ?2",
664 )
665 .bind(&now)
666 .bind(id)
667 .execute(&self.pool)
668 .await?
669 }
670 None => {
671 sqlx::query(
672 "UPDATE feed_items SET is_read = 1, updated_at = ?1 WHERE is_read = 0",
673 )
674 .bind(&now)
675 .execute(&self.pool)
676 .await?
677 }
678 };
679 Ok(result.rows_affected())
680 }
681
682 /// Count all feed items.
683 #[tracing::instrument(skip_all)]
684 pub async fn count_all(&self) -> Result<i64, sqlx::Error> {
685 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items")
686 .fetch_one(&self.pool)
687 .await?;
688 Ok(row.0)
689 }
690
691 /// Count items from a specific busser source.
692 #[tracing::instrument(skip_all)]
693 pub async fn count_by_busser(&self, busser_id: &str) -> Result<i64, sqlx::Error> {
694 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE busser_id = ?1")
695 .bind(busser_id)
696 .fetch_one(&self.pool)
697 .await?;
698 Ok(row.0)
699 }
700
701 /// Count unread items across all sources.
702 #[tracing::instrument(skip_all)]
703 pub async fn count_unread(&self) -> Result<i64, sqlx::Error> {
704 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE is_read = 0")
705 .fetch_one(&self.pool)
706 .await?;
707 Ok(row.0)
708 }
709
710 /// Count starred items.
711 #[tracing::instrument(skip_all)]
712 pub async fn count_starred(&self) -> Result<i64, sqlx::Error> {
713 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE is_starred = 1")
714 .fetch_one(&self.pool)
715 .await?;
716 Ok(row.0)
717 }
718
719 /// Count unread items from a specific busser source.
720 #[tracing::instrument(skip_all)]
721 pub async fn count_unread_by_busser(&self, busser_id: &str) -> Result<i64, sqlx::Error> {
722 let row: (i64,) =
723 sqlx::query_as("SELECT COUNT(*) FROM feed_items WHERE busser_id = ?1 AND is_read = 0")
724 .bind(busser_id)
725 .fetch_one(&self.pool)
726 .await?;
727 Ok(row.0)
728 }
729
730 /// Count items matching a combination of source, unread, and starred filters.
731 #[tracing::instrument(skip_all)]
732 pub async fn count_filtered(
733 &self,
734 source: Option<&str>,
735 unread_only: bool,
736 starred_only: bool,
737 ) -> Result<i64, sqlx::Error> {
738 let mut sql = "SELECT COUNT(*) FROM feed_items WHERE 1=1".to_string();
739 if source.is_some() {
740 sql.push_str(" AND busser_id = ?1");
741 }
742 if unread_only {
743 sql.push_str(" AND is_read = 0");
744 }
745 if starred_only {
746 sql.push_str(" AND is_starred = 1");
747 }
748 let mut query = sqlx::query_as::<_, (i64,)>(&sql);
749 if let Some(s) = source {
750 query = query.bind(s);
751 }
752 let row = query.fetch_one(&self.pool).await?;
753 Ok(row.0)
754 }
755
756 /// Get total and unread counts for all busser sources in a single query.
757 ///
758 /// Returns `(busser_id, total_count, unread_count)` tuples. This avoids
759 /// issuing separate count queries per source (N+1 problem).
760 pub async fn counts_by_busser(&self) -> Result<Vec<(String, i64, i64)>, sqlx::Error> {
761 let rows: Vec<(String, i64, i64)> = sqlx::query_as(
762 r#"
763 SELECT busser_id,
764 COUNT(*) AS total_count,
765 SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END) AS unread_count
766 FROM feed_items
767 GROUP BY busser_id
768 "#,
769 )
770 .fetch_all(&self.pool)
771 .await?;
772 Ok(rows)
773 }
774
775 /// Delete a single feed item by ID.
776 #[tracing::instrument(skip_all)]
777 pub async fn delete(&self, id: ItemId) -> Result<(), sqlx::Error> {
778 sqlx::query("DELETE FROM feed_items WHERE id = ?1")
779 .bind(id)
780 .execute(&self.pool)
781 .await?;
782 Ok(())
783 }
784
785 /// Delete all items belonging to a feed. Returns the number of rows removed.
786 #[tracing::instrument(skip_all)]
787 pub async fn delete_by_feed(&self, feed_id: FeedId) -> Result<u64, sqlx::Error> {
788 let result = sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1")
789 .bind(feed_id)
790 .execute(&self.pool)
791 .await?;
792 Ok(result.rows_affected())
793 }
794
795 /// Delete old read items that are not starred. Returns the number of rows removed.
796 ///
797 /// Items published before `before` that have been read and are not starred
798 /// will be permanently deleted. Starred items are always preserved regardless
799 /// of age or read status.
800 #[tracing::instrument(skip_all)]
801 pub async fn delete_stale_read(&self, before: DateTime<Utc>) -> Result<u64, sqlx::Error> {
802 let cutoff = before.format(TIMESTAMP_FMT).to_string();
803 let result = sqlx::query(
804 "DELETE FROM feed_items WHERE is_read = 1 AND is_starred = 0 AND published_at < ?1",
805 )
806 .bind(&cutoff)
807 .execute(&self.pool)
808 .await?;
809 Ok(result.rows_affected())
810 }
811 }
812
813 #[derive(Clone)]
814 /// Repository for per-feed tag assignment and listing
815 pub struct TagsRepository {
816 pool: SqlitePool,
817 }
818
819 impl TagsRepository {
820 /// Create a new tags repository backed by the given pool.
821 #[tracing::instrument(skip_all)]
822 pub fn new(pool: SqlitePool) -> Self {
823 Self { pool }
824 }
825
826 /// Replace all tags on a feed. Deletes existing tags, then inserts the new set.
827 ///
828 /// Wrapped in a transaction so the delete + inserts are atomic — a failure
829 /// mid-way won't leave the feed with zero tags.
830 #[tracing::instrument(skip_all)]
831 pub async fn set_tags(&self, feed_id: FeedId, tags: &[String]) -> Result<(), sqlx::Error> {
832 let mut tx = self.pool.begin().await?;
833
834 sqlx::query("DELETE FROM feed_tags WHERE feed_id = ?1")
835 .bind(feed_id)
836 .execute(&mut *tx)
837 .await?;
838
839 for tag in tags {
840 if !tag.is_empty() {
841 sqlx::query("INSERT OR IGNORE INTO feed_tags (feed_id, tag) VALUES (?1, ?2)")
842 .bind(feed_id)
843 .bind(tag)
844 .execute(&mut *tx)
845 .await?;
846 }
847 }
848
849 tx.commit().await?;
850 Ok(())
851 }
852
853 /// Add a single tag to a feed (idempotent).
854 #[tracing::instrument(skip_all)]
855 pub async fn add_tag(&self, feed_id: FeedId, tag: &str) -> Result<(), sqlx::Error> {
856 sqlx::query("INSERT OR IGNORE INTO feed_tags (feed_id, tag) VALUES (?1, ?2)")
857 .bind(feed_id)
858 .bind(tag)
859 .execute(&self.pool)
860 .await?;
861 Ok(())
862 }
863
864 /// Remove a single tag from a feed.
865 #[tracing::instrument(skip_all)]
866 pub async fn remove_tag(&self, feed_id: FeedId, tag: &str) -> Result<(), sqlx::Error> {
867 sqlx::query("DELETE FROM feed_tags WHERE feed_id = ?1 AND tag = ?2")
868 .bind(feed_id)
869 .bind(tag)
870 .execute(&self.pool)
871 .await?;
872 Ok(())
873 }
874
875 /// Get all tags for a feed, ordered alphabetically.
876 #[tracing::instrument(skip_all)]
877 pub async fn get_tags(&self, feed_id: FeedId) -> Result<Vec<String>, sqlx::Error> {
878 let rows: Vec<(String,)> =
879 sqlx::query_as("SELECT tag FROM feed_tags WHERE feed_id = ?1 ORDER BY tag")
880 .bind(feed_id)
881 .fetch_all(&self.pool)
882 .await?;
883 Ok(rows.into_iter().map(|(t,)| t).collect())
884 }
885
886 /// List all distinct tags across all feeds, ordered alphabetically.
887 #[tracing::instrument(skip_all)]
888 pub async fn list_all_tags(&self) -> Result<Vec<String>, sqlx::Error> {
889 let rows: Vec<(String,)> =
890 sqlx::query_as("SELECT DISTINCT tag FROM feed_tags ORDER BY tag")
891 .fetch_all(&self.pool)
892 .await?;
893 Ok(rows.into_iter().map(|(t,)| t).collect())
894 }
895
896 /// Get all (feed_id, tag) pairs for bulk rendering.
897 #[tracing::instrument(skip_all)]
898 pub async fn all_feed_tags(&self) -> Result<Vec<(FeedId, String)>, sqlx::Error> {
899 sqlx::query_as("SELECT feed_id, tag FROM feed_tags ORDER BY feed_id, tag")
900 .fetch_all(&self.pool)
901 .await
902 }
903
904 /// Get feed IDs that have any of the given tags.
905 #[tracing::instrument(skip_all)]
906 pub async fn feed_ids_with_tags(&self, tags: &[String]) -> Result<Vec<FeedId>, sqlx::Error> {
907 if tags.is_empty() {
908 return Ok(Vec::new());
909 }
910 // Build a query with placeholders for each tag
911 let placeholders: Vec<String> = (1..=tags.len()).map(|i| format!("?{}", i)).collect();
912 let sql = format!(
913 "SELECT DISTINCT feed_id FROM feed_tags WHERE tag IN ({})",
914 placeholders.join(", ")
915 );
916 let mut query = sqlx::query_as::<_, (FeedId,)>(&sql);
917 for tag in tags {
918 query = query.bind(tag);
919 }
920 let rows = query.fetch_all(&self.pool).await?;
921 Ok(rows.into_iter().map(|(id,)| id).collect())
922 }
923 }
924
925 #[derive(Clone)]
926 /// Repository for busser key-value state (cursors, tokens, pagination markers)
927 pub struct StateRepository {
928 pool: SqlitePool,
929 }
930
931 impl StateRepository {
932 /// Create a new state repository backed by the given pool.
933 #[tracing::instrument(skip_all)]
934 pub fn new(pool: SqlitePool) -> Self {
935 Self { pool }
936 }
937
938 /// Get a single state value for a busser by key.
939 #[tracing::instrument(skip_all)]
940 pub async fn get(&self, busser_id: &str, key: &str) -> Result<Option<String>, sqlx::Error> {
941 let row: Option<(String,)> =
942 sqlx::query_as("SELECT value FROM busser_state WHERE busser_id = ?1 AND key = ?2")
943 .bind(busser_id)
944 .bind(key)
945 .fetch_optional(&self.pool)
946 .await?;
947 Ok(row.map(|(v,)| v))
948 }
949
950 /// Set a state value, inserting or updating on conflict.
951 /// Uses upsert on the `(busser_id, key)` composite unique constraint so
952 /// callers don't need to check existence first.
953 #[tracing::instrument(skip_all)]
954 pub async fn set(&self, busser_id: &str, key: &str, value: &str) -> Result<(), sqlx::Error> {
955 let id = BusserStateId::new();
956 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
957 sqlx::query(
958 r#"
959 INSERT INTO busser_state (id, busser_id, key, value, created_at, updated_at)
960 VALUES (?1, ?2, ?3, ?4, ?5, ?5)
961 ON CONFLICT (busser_id, key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at
962 "#,
963 )
964 .bind(id)
965 .bind(busser_id)
966 .bind(key)
967 .bind(value)
968 .bind(&now)
969 .execute(&self.pool)
970 .await?;
971 Ok(())
972 }
973
974 /// Delete a single state entry by busser ID and key.
975 #[tracing::instrument(skip_all)]
976 pub async fn delete(&self, busser_id: &str, key: &str) -> Result<(), sqlx::Error> {
977 sqlx::query("DELETE FROM busser_state WHERE busser_id = ?1 AND key = ?2")
978 .bind(busser_id)
979 .bind(key)
980 .execute(&self.pool)
981 .await?;
982 Ok(())
983 }
984
985 /// Delete all state entries for a busser. Returns the number of rows removed.
986 #[tracing::instrument(skip_all)]
987 pub async fn delete_all(&self, busser_id: &str) -> Result<u64, sqlx::Error> {
988 let result = sqlx::query("DELETE FROM busser_state WHERE busser_id = ?1")
989 .bind(busser_id)
990 .execute(&self.pool)
991 .await?;
992 Ok(result.rows_affected())
993 }
994
995 /// List all state entries for a busser, ordered by key.
996 #[tracing::instrument(skip_all)]
997 pub async fn list(&self, busser_id: &str) -> Result<Vec<DbBusserState>, sqlx::Error> {
998 sqlx::query_as("SELECT * FROM busser_state WHERE busser_id = ?1 ORDER BY key")
999 .bind(busser_id)
1000 .fetch_all(&self.pool)
1001 .await
1002 }
1003 }
1004
1005 #[derive(Clone)]
1006 /// Repository for user_config key-value pairs (theme, welcome flag, etc.)
1007 pub struct ConfigRepository {
1008 pool: SqlitePool,
1009 }
1010
1011 impl ConfigRepository {
1012 #[tracing::instrument(skip_all)]
1013 pub fn new(pool: SqlitePool) -> Self {
1014 Self { pool }
1015 }
1016
1017 /// Get a config value by key.
1018 #[tracing::instrument(skip_all)]
1019 pub async fn get(&self, key: &str) -> Result<Option<String>, sqlx::Error> {
1020 let row: Option<(String,)> =
1021 sqlx::query_as("SELECT value FROM user_config WHERE key = ?1")
1022 .bind(key)
1023 .fetch_optional(&self.pool)
1024 .await?;
1025 Ok(row.map(|(v,)| v))
1026 }
1027
1028 /// Set a config value, inserting or updating on conflict.
1029 #[tracing::instrument(skip_all)]
1030 pub async fn set(&self, key: &str, value: &str) -> Result<(), sqlx::Error> {
1031 sqlx::query(
1032 r#"
1033 INSERT INTO user_config (key, value)
1034 VALUES (?1, ?2)
1035 ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
1036 "#,
1037 )
1038 .bind(key)
1039 .bind(value)
1040 .execute(&self.pool)
1041 .await?;
1042 Ok(())
1043 }
1044
1045 /// Delete a config entry by key.
1046 #[tracing::instrument(skip_all)]
1047 pub async fn delete(&self, key: &str) -> Result<(), sqlx::Error> {
1048 sqlx::query("DELETE FROM user_config WHERE key = ?1")
1049 .bind(key)
1050 .execute(&self.pool)
1051 .await?;
1052 Ok(())
1053 }
1054 }
1055
1056 #[derive(Clone)]
1057 /// Repository for saved query feed (virtual source) CRUD
1058 pub struct QueryFeedsRepository {
1059 pool: SqlitePool,
1060 }
1061
1062 impl QueryFeedsRepository {
1063 #[tracing::instrument(skip_all)]
1064 pub fn new(pool: SqlitePool) -> Self {
1065 Self { pool }
1066 }
1067
1068 /// Insert a new query feed and return the created row.
1069 pub async fn create(&self, input: CreateQueryFeed) -> Result<DbQueryFeed, sqlx::Error> {
1070 let id = QueryFeedId::new();
1071 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
1072 let rules_json =
1073 serde_json::to_string(&input.rules).unwrap_or_else(|_| "[]".to_string());
1074
1075 sqlx::query_as(
1076 r#"
1077 INSERT INTO query_feeds (id, name, rules, created_at, updated_at)
1078 VALUES (?1, ?2, ?3, ?4, ?4)
1079 RETURNING *
1080 "#,
1081 )
1082 .bind(id)
1083 .bind(&input.name)
1084 .bind(&rules_json)
1085 .bind(&now)
1086 .fetch_one(&self.pool)
1087 .await
1088 }
1089
1090 /// Look up a single query feed by ID. Returns `None` if not found.
1091 #[tracing::instrument(skip_all)]
1092 pub async fn get(&self, id: QueryFeedId) -> Result<Option<DbQueryFeed>, sqlx::Error> {
1093 sqlx::query_as("SELECT * FROM query_feeds WHERE id = ?1")
1094 .bind(id)
1095 .fetch_optional(&self.pool)
1096 .await
1097 }
1098
1099 /// List all query feeds, ordered by name.
1100 #[tracing::instrument(skip_all)]
1101 pub async fn list_all(&self) -> Result<Vec<DbQueryFeed>, sqlx::Error> {
1102 sqlx::query_as("SELECT * FROM query_feeds ORDER BY name")
1103 .fetch_all(&self.pool)
1104 .await
1105 }
1106
1107 /// Update a query feed's name and rules.
1108 #[tracing::instrument(skip_all)]
1109 pub async fn update(
1110 &self,
1111 id: QueryFeedId,
1112 name: &str,
1113 rules: &[QueryCondition],
1114 ) -> Result<(), sqlx::Error> {
1115 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
1116 let rules_json = serde_json::to_string(rules).unwrap_or_else(|_| "[]".to_string());
1117
1118 sqlx::query(
1119 "UPDATE query_feeds SET name = ?1, rules = ?2, updated_at = ?3 WHERE id = ?4",
1120 )
1121 .bind(name)
1122 .bind(&rules_json)
1123 .bind(&now)
1124 .bind(id)
1125 .execute(&self.pool)
1126 .await?;
1127 Ok(())
1128 }
1129
1130 /// Delete a query feed by ID.
1131 #[tracing::instrument(skip_all)]
1132 pub async fn delete(&self, id: QueryFeedId) -> Result<(), sqlx::Error> {
1133 sqlx::query("DELETE FROM query_feeds WHERE id = ?1")
1134 .bind(id)
1135 .execute(&self.pool)
1136 .await?;
1137 Ok(())
1138 }
1139 }
1140
1141 // ── BookmarksRepository ──────────────────────────────────────────
1142
1143 #[derive(Clone)]
1144 /// Repository for bookmark (reading list) CRUD
1145 pub struct BookmarksRepository {
1146 pool: SqlitePool,
1147 }
1148
1149 impl BookmarksRepository {
1150 pub fn new(pool: SqlitePool) -> Self {
1151 Self { pool }
1152 }
1153
1154 /// Insert a new bookmark and return the created row.
1155 #[tracing::instrument(skip_all)]
1156 pub async fn create(&self, input: CreateBookmark) -> Result<DbBookmark, sqlx::Error> {
1157 let id = BookmarkId::new();
1158 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
1159
1160 let mut tx = self.pool.begin().await?;
1161
1162 let bookmark: DbBookmark = sqlx::query_as(
1163 r#"
1164 INSERT INTO bookmarks (id, url, title, description, author, source_name,
1165 feed_item_id, notes, is_pinned, created_at, updated_at)
1166 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 0, ?9, ?9)
1167 RETURNING *
1168 "#,
1169 )
1170 .bind(id)
1171 .bind(&input.url)
1172 .bind(&input.title)
1173 .bind(&input.description)
1174 .bind(&input.author)
1175 .bind(&input.source_name)
1176 .bind(&input.feed_item_id)
1177 .bind(&input.notes)
1178 .bind(&now)
1179 .fetch_one(&mut *tx)
1180 .await?;
1181
1182 // Insert tags
1183 for tag in &input.tags {
1184 let tag = tag.trim();
1185 if tag.is_empty() {
1186 continue;
1187 }
1188 sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)")
1189 .bind(id)
1190 .bind(tag)
1191 .execute(&mut *tx)
1192 .await?;
1193 }
1194
1195 tx.commit().await?;
1196 Ok(bookmark)
1197 }
1198
1199 /// Look up a single bookmark by ID. Returns `None` if not found.
1200 #[tracing::instrument(skip_all)]
1201 pub async fn get(&self, id: BookmarkId) -> Result<Option<DbBookmark>, sqlx::Error> {
1202 sqlx::query_as("SELECT * FROM bookmarks WHERE id = ?1")
1203 .bind(id)
1204 .fetch_optional(&self.pool)
1205 .await
1206 }
1207
1208 /// Look up a bookmark by URL. Returns `None` if not found.
1209 #[tracing::instrument(skip_all)]
1210 pub async fn get_by_url(&self, url: &str) -> Result<Option<DbBookmark>, sqlx::Error> {
1211 sqlx::query_as("SELECT * FROM bookmarks WHERE url = ?1")
1212 .bind(url)
1213 .fetch_optional(&self.pool)
1214 .await
1215 }
1216
1217 /// Look up a bookmark by its linked feed item ID. Returns `None` if not found.
1218 #[tracing::instrument(skip_all)]
1219 pub async fn get_by_feed_item(&self, feed_item_id: &str) -> Result<Option<DbBookmark>, sqlx::Error> {
1220 sqlx::query_as("SELECT * FROM bookmarks WHERE feed_item_id = ?1")
1221 .bind(feed_item_id)
1222 .fetch_optional(&self.pool)
1223 .await
1224 }
1225
1226 /// List bookmarks, optionally filtered by tag. Ordered by pinned first, then newest.
1227 #[tracing::instrument(skip_all)]
1228 pub async fn list(&self, tag: Option<&str>) -> Result<Vec<DbBookmark>, sqlx::Error> {
1229 match tag {
1230 Some(tag) => {
1231 sqlx::query_as(
1232 r#"
1233 SELECT b.* FROM bookmarks b
1234 INNER JOIN bookmark_tags bt ON bt.bookmark_id = b.id
1235 WHERE bt.tag = ?1
1236 ORDER BY b.is_pinned DESC, b.created_at DESC
1237 "#,
1238 )
1239 .bind(tag)
1240 .fetch_all(&self.pool)
1241 .await
1242 }
1243 None => {
1244 sqlx::query_as(
1245 "SELECT * FROM bookmarks ORDER BY is_pinned DESC, created_at DESC",
1246 )
1247 .fetch_all(&self.pool)
1248 .await
1249 }
1250 }
1251 }
1252
1253 /// Update a bookmark's mutable fields.
1254 #[tracing::instrument(skip_all)]
1255 pub async fn update(&self, id: BookmarkId, input: UpdateBookmark) -> Result<(), sqlx::Error> {
1256 let now = Utc::now().format(TIMESTAMP_FMT).to_string();
1257
1258 sqlx::query(
1259 r#"
1260 UPDATE bookmarks SET
1261 title = COALESCE(?2, title),
1262 description = COALESCE(?3, description),
1263 notes = COALESCE(?4, notes),
1264 is_pinned = COALESCE(?5, is_pinned),
1265 updated_at = ?1
1266 WHERE id = ?6
1267 "#,
1268 )
1269 .bind(&now)
1270 .bind(&input.title)
1271 .bind(&input.description)
1272 .bind(&input.notes)
1273 .bind(input.is_pinned.map(|b| b as i32))
1274 .bind(id)
1275 .execute(&self.pool)
1276 .await?;
1277 Ok(())
1278 }
1279
1280 /// Delete a bookmark by ID (cascade deletes its tags).
1281 #[tracing::instrument(skip_all)]
1282 pub async fn delete(&self, id: BookmarkId) -> Result<(), sqlx::Error> {
1283 sqlx::query("DELETE FROM bookmarks WHERE id = ?1")
1284 .bind(id)
1285 .execute(&self.pool)
1286 .await?;
1287 Ok(())
1288 }
1289
1290 /// Replace all tags for a bookmark (delete-all-then-insert).
1291 #[tracing::instrument(skip_all)]
1292 pub async fn set_tags(&self, id: BookmarkId, tags: &[String]) -> Result<(), sqlx::Error> {
1293 let mut tx = self.pool.begin().await?;
1294
1295 sqlx::query("DELETE FROM bookmark_tags WHERE bookmark_id = ?1")
1296 .bind(id)
1297 .execute(&mut *tx)
1298 .await?;
1299
1300 for tag in tags {
1301 let tag = tag.trim();
1302 if tag.is_empty() {
1303 continue;
1304 }
1305 sqlx::query("INSERT OR IGNORE INTO bookmark_tags (bookmark_id, tag) VALUES (?1, ?2)")
1306 .bind(id)
1307 .bind(tag)
1308 .execute(&mut *tx)
1309 .await?;
1310 }
1311
1312 tx.commit().await?;
1313 Ok(())
1314 }
1315
1316 /// Get all tags for a bookmark.
1317 #[tracing::instrument(skip_all)]
1318 pub async fn get_tags(&self, id: BookmarkId) -> Result<Vec<String>, sqlx::Error> {
1319 let rows: Vec<(String,)> =
1320 sqlx::query_as("SELECT tag FROM bookmark_tags WHERE bookmark_id = ?1 ORDER BY tag")
1321 .bind(id)
1322 .fetch_all(&self.pool)
1323 .await?;
1324 Ok(rows.into_iter().map(|r| r.0).collect())
1325 }
1326
1327 /// Get all distinct tags across all bookmarks.
1328 #[tracing::instrument(skip_all)]
1329 pub async fn list_all_tags(&self) -> Result<Vec<String>, sqlx::Error> {
1330 let rows: Vec<(String,)> =
1331 sqlx::query_as("SELECT DISTINCT tag FROM bookmark_tags ORDER BY tag")
1332 .fetch_all(&self.pool)
1333 .await?;
1334 Ok(rows.into_iter().map(|r| r.0).collect())
1335 }
1336
1337 /// Count total bookmarks.
1338 #[tracing::instrument(skip_all)]
1339 pub async fn count(&self) -> Result<i64, sqlx::Error> {
1340 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM bookmarks")
1341 .fetch_one(&self.pool)
1342 .await?;
1343 Ok(row.0)
1344 }
1345 }
1346
1347 #[cfg(test)]
1348 mod tests {
1349 use super::*;
1350 use crate::id_types::BusserId;
1351 use chrono::{DateTime, Duration};
1352 use serde_json::json;
1353
1354 /// Connect to in-memory SQLite and run all migrations.
1355 async fn test_db() -> SqlitePool {
1356 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1357 sqlx::migrate!("../../migrations/sqlite")
1358 .run(&pool)
1359 .await
1360 .unwrap();
1361 pool
1362 }
1363
1364 /// Insert a feed with the given busser and name, return the created row.
1365 async fn make_feed(pool: &SqlitePool, busser_id: &str, name: &str) -> DbFeed {
1366 FeedsRepository::new(pool.clone())
1367 .create(CreateFeed {
1368 busser_id: BusserId::new(busser_id),
1369 name: name.to_string(),
1370 config: json!({}),
1371 })
1372 .await
1373 .unwrap()
1374 }
1375
1376 /// Insert a feed item with minimal required fields, return the created row.
1377 async fn make_item(pool: &SqlitePool, feed: &DbFeed, external_id: &str) -> DbFeedItem {
1378 make_item_at(pool, feed, external_id, Utc::now()).await
1379 }
1380
1381 /// Insert a feed item at a specific published_at time.
1382 async fn make_item_at(
1383 pool: &SqlitePool,
1384 feed: &DbFeed,
1385 external_id: &str,
1386 published_at: DateTime<Utc>,
1387 ) -> DbFeedItem {
1388 ItemsRepository::new(pool.clone())
1389 .upsert(CreateFeedItem {
1390 external_id: external_id.to_string(),
1391 feed_id: feed.id,
1392 busser_id: feed.busser_id.clone(),
1393 bite_author: "author".to_string(),
1394 bite_text: format!("Item {external_id}"),
1395 bite_secondary: None,
1396 bite_indicator: None,
1397 title: Some(format!("Title {external_id}")),
1398 body: None,
1399 url: None,
1400 media: vec![],
1401 published_at,
1402 source_name: "test".to_string(),
1403 score: None,
1404 tags: vec![],
1405 actions: vec![],
1406 })
1407 .await
1408 .unwrap()
1409 }
1410
1411 // ── FeedsRepository ───────────────────────────────────────────
1412
1413 #[tokio::test]
1414 async fn feeds_create_and_get() {
1415 let pool = test_db().await;
1416 let feed = make_feed(&pool, "rss", "My Feed").await;
1417
1418 assert_eq!(feed.busser_id, "rss");
1419 assert_eq!(feed.name, "My Feed");
1420 assert!(feed.enabled);
1421 assert!(feed.last_fetch.is_none());
1422
1423 let fetched = FeedsRepository::new(pool.clone())
1424 .get(feed.id)
1425 .await
1426 .unwrap()
1427 .expect("feed should exist");
1428 assert_eq!(fetched.id, feed.id);
1429 assert_eq!(fetched.name, "My Feed");
1430 }
1431
1432 #[tokio::test]
1433 async fn feeds_get_nonexistent_returns_none() {
1434 let pool = test_db().await;
1435 let result = FeedsRepository::new(pool.clone())
1436 .get(FeedId::new())
1437 .await
1438 .unwrap();
1439 assert!(result.is_none());
1440 }
1441
1442 #[tokio::test]
1443 async fn feeds_list_all_returns_created() {
1444 let pool = test_db().await;
1445 make_feed(&pool, "rss", "Beta Feed").await;
1446 make_feed(&pool, "rss", "Alpha Feed").await;
1447
1448 let all = FeedsRepository::new(pool.clone())
1449 .list_all()
1450 .await
1451 .unwrap();
1452 assert_eq!(all.len(), 2);
1453 assert_eq!(all[0].name, "Alpha Feed");
1454 assert_eq!(all[1].name, "Beta Feed");
1455 }
1456
1457 #[tokio::test]
1458 async fn feeds_get_by_busser_filters() {
1459 let pool = test_db().await;
1460 make_feed(&pool, "rss", "RSS Feed").await;
1461 make_feed(&pool, "hn", "HN Feed").await;
1462 make_feed(&pool, "rss", "RSS Feed 2").await;
1463
1464 let rss = FeedsRepository::new(pool.clone())
1465 .get_by_busser("rss")
1466 .await
1467 .unwrap();
1468 assert_eq!(rss.len(), 2);
1469 for f in &rss {
1470 assert_eq!(f.busser_id, "rss");
1471 }
1472
1473 let hn = FeedsRepository::new(pool.clone())
1474 .get_by_busser("hn")
1475 .await
1476 .unwrap();
1477 assert_eq!(hn.len(), 1);
1478 assert_eq!(hn[0].busser_id, "hn");
1479 }
1480
1481 #[tokio::test]
1482 async fn feeds_list_enabled_excludes_disabled() {
1483 let pool = test_db().await;
1484 let feeds_repo = FeedsRepository::new(pool.clone());
1485 let feed = make_feed(&pool, "rss", "Disabled Feed").await;
1486 make_feed(&pool, "rss", "Enabled Feed").await;
1487
1488 feeds_repo.set_enabled(feed.id, false).await.unwrap();
1489
1490 let enabled = feeds_repo.list_enabled().await.unwrap();
1491 assert_eq!(enabled.len(), 1);
1492 assert_eq!(enabled[0].name, "Enabled Feed");
1493 }
1494
1495 #[tokio::test]
1496 async fn feeds_set_enabled_toggle() {
1497 let pool = test_db().await;
1498 let feeds_repo = FeedsRepository::new(pool.clone());
1499 let feed = make_feed(&pool, "rss", "Toggle Feed").await;
1500 assert!(feed.enabled);
1501
1502 feeds_repo.set_enabled(feed.id, false).await.unwrap();
1503 let updated = feeds_repo.get(feed.id).await.unwrap().unwrap();
1504 assert!(!updated.enabled);
1505
1506 feeds_repo.set_enabled(feed.id, true).await.unwrap();
1507 let updated = feeds_repo.get(feed.id).await.unwrap().unwrap();
1508 assert!(updated.enabled);
1509 }
1510
1511 #[tokio::test]
1512 async fn feeds_update_last_fetch_sets_timestamp() {
1513 let pool = test_db().await;
1514 let feeds_repo = FeedsRepository::new(pool.clone());
1515 let feed = make_feed(&pool, "rss", "Fetch Feed").await;
1516 assert!(feed.last_fetch.is_none());
1517
1518 feeds_repo.update_last_fetch(feed.id).await.unwrap();
1519 let updated = feeds_repo.get(feed.id).await.unwrap().unwrap();
1520 assert!(updated.last_fetch.is_some());
1521 }
1522
1523 #[tokio::test]
1524 async fn feeds_delete_removes_feed() {
1525 let pool = test_db().await;
1526 let feeds_repo = FeedsRepository::new(pool.clone());
1527 let feed = make_feed(&pool, "rss", "Doomed Feed").await;
1528
1529 feeds_repo.delete(feed.id).await.unwrap();
1530 let result = feeds_repo.get(feed.id).await.unwrap();
1531 assert!(result.is_none());
1532 }
1533
1534 // ── ItemsRepository ───────────────────────────────────────────
1535
1536 #[tokio::test]
1537 async fn items_upsert_and_get() {
1538 let pool = test_db().await;
1539 let feed = make_feed(&pool, "rss", "Feed").await;
1540 let item = make_item(&pool, &feed, "rss:1").await;
1541
1542 assert_eq!(item.external_id, "rss:1");
1543 assert_eq!(item.feed_id, feed.id);
1544 assert_eq!(item.busser_id, "rss");
1545 assert_eq!(item.bite_author, "author");
1546 assert!(!item.is_read);
1547 assert!(!item.is_starred);
1548
1549 let fetched = ItemsRepository::new(pool.clone())
1550 .get(item.id)
1551 .await
1552 .unwrap()
1553 .expect("item should exist");
1554 assert_eq!(fetched.id, item.id);
1555 }
1556
1557 #[tokio::test]
1558 async fn items_upsert_conflict_updates() {
1559 let pool = test_db().await;
1560 let feed = make_feed(&pool, "rss", "Feed").await;
1561 let items_repo = ItemsRepository::new(pool.clone());
1562
1563 let first = make_item(&pool, &feed, "rss:dup").await;
1564
1565 let second = items_repo
1566 .upsert(CreateFeedItem {
1567 external_id: "rss:dup".to_string(),
1568 feed_id: feed.id,
1569 busser_id: feed.busser_id.clone(),
1570 bite_author: "updated_author".to_string(),
1571 bite_text: "Updated text".to_string(),
1572 bite_secondary: None,
1573 bite_indicator: None,
1574 title: Some("Updated Title".to_string()),
1575 body: None,
1576 url: None,
1577 media: vec![],
1578 published_at: Utc::now(),
1579 source_name: "test".to_string(),
1580 score: None,
1581 tags: vec![],
1582 actions: vec![],
1583 })
1584 .await
1585 .unwrap();
1586
1587 assert_eq!(first.id, second.id);
1588 assert_eq!(second.bite_author, "updated_author");
1589 assert_eq!(second.bite_text, "Updated text");
1590 assert_eq!(items_repo.count_all().await.unwrap(), 1);
1591 }
1592
1593 #[tokio::test]
1594 async fn items_get_by_external_id() {
1595 let pool = test_db().await;
1596 let feed = make_feed(&pool, "rss", "Feed").await;
1597 let item = make_item(&pool, &feed, "rss:ext1").await;
1598
1599 let found = ItemsRepository::new(pool.clone())
1600 .get_by_external_id("rss:ext1")
1601 .await
1602 .unwrap()
1603 .expect("should find by external_id");
1604 assert_eq!(found.id, item.id);
1605 }
1606
1607 #[tokio::test]
1608 async fn items_list_all_pagination() {
1609 let pool = test_db().await;
1610 let feed = make_feed(&pool, "rss", "Feed").await;
1611 let now = Utc::now();
1612
1613 make_item_at(&pool, &feed, "p:1", now - Duration::hours(3)).await;
1614 make_item_at(&pool, &feed, "p:2", now - Duration::hours(2)).await;
1615 make_item_at(&pool, &feed, "p:3", now - Duration::hours(1)).await;
1616
1617 let items_repo = ItemsRepository::new(pool.clone());
1618
1619 let page1 = items_repo.list_all(2, 0).await.unwrap();
1620 assert_eq!(page1.len(), 2);
1621 assert_eq!(page1[0].external_id, "p:3");
1622 assert_eq!(page1[1].external_id, "p:2");
1623
1624 let page2 = items_repo.list_all(2, 2).await.unwrap();
1625 assert_eq!(page2.len(), 1);
1626 assert_eq!(page2[0].external_id, "p:1");
1627 }
1628
1629 #[tokio::test]
1630 async fn items_list_by_feed_filters() {
1631 let pool = test_db().await;
1632 let feed_a = make_feed(&pool, "rss", "Feed A").await;
1633 let feed_b = make_feed(&pool, "rss", "Feed B").await;
1634
1635 make_item(&pool, &feed_a, "a:1").await;
1636 make_item(&pool, &feed_a, "a:2").await;
1637 make_item(&pool, &feed_b, "b:1").await;
1638
1639 let items_repo = ItemsRepository::new(pool.clone());
1640
1641 let a_items = items_repo.list_by_feed(feed_a.id, 100, 0).await.unwrap();
1642 assert_eq!(a_items.len(), 2);
1643 for i in &a_items {
1644 assert_eq!(i.feed_id, feed_a.id);
1645 }
1646
1647 let b_items = items_repo.list_by_feed(feed_b.id, 100, 0).await.unwrap();
1648 assert_eq!(b_items.len(), 1);
1649 assert_eq!(b_items[0].feed_id, feed_b.id);
1650 }
1651
1652 #[tokio::test]
1653 async fn items_list_by_busser_filters() {
1654 let pool = test_db().await;
1655 let feed_rss = make_feed(&pool, "rss", "RSS Feed").await;
1656 let feed_hn = make_feed(&pool, "hn", "HN Feed").await;
1657
1658 make_item(&pool, &feed_rss, "rss:1").await;
1659 make_item(&pool, &feed_hn, "hn:1").await;
1660 make_item(&pool, &feed_hn, "hn:2").await;
1661
1662 let items_repo = ItemsRepository::new(pool.clone());
1663
1664 let rss = items_repo.list_by_busser("rss", 100, 0).await.unwrap();
1665 assert_eq!(rss.len(), 1);
1666
1667 let hn = items_repo.list_by_busser("hn", 100, 0).await.unwrap();
1668 assert_eq!(hn.len(), 2);
1669 }
1670
1671 #[tokio::test]
1672 async fn items_list_unread_excludes_read() {
1673 let pool = test_db().await;
1674 let feed = make_feed(&pool, "rss", "Feed").await;
1675 let items_repo = ItemsRepository::new(pool.clone());
1676
1677 let item1 = make_item(&pool, &feed, "u:1").await;
1678 make_item(&pool, &feed, "u:2").await;
1679
1680 items_repo.mark_read(item1.id, true).await.unwrap();
1681
1682 let unread = items_repo.list_unread(100, 0).await.unwrap();
1683 assert_eq!(unread.len(), 1);
1684 assert_eq!(unread[0].external_id, "u:2");
1685 }
1686
1687 #[tokio::test]
1688 async fn items_list_starred_only() {
1689 let pool = test_db().await;
1690 let feed = make_feed(&pool, "rss", "Feed").await;
1691 let items_repo = ItemsRepository::new(pool.clone());
1692
1693 make_item(&pool, &feed, "s:1").await;
1694 let item2 = make_item(&pool, &feed, "s:2").await;
1695
1696 items_repo.mark_starred(item2.id, true).await.unwrap();
1697
1698 let starred = items_repo.list_starred(100, 0).await.unwrap();
1699 assert_eq!(starred.len(), 1);
1700 assert_eq!(starred[0].external_id, "s:2");
1701 }
1702
1703 #[tokio::test]
1704 async fn items_mark_read_and_unread() {
1705 let pool = test_db().await;
1706 let feed = make_feed(&pool, "rss", "Feed").await;
1707 let items_repo = ItemsRepository::new(pool.clone());
1708 let item = make_item(&pool, &feed, "r:1").await;
1709
1710 assert!(!item.is_read);
1711
1712 items_repo.mark_read(item.id, true).await.unwrap();
1713 let updated = items_repo.get(item.id).await.unwrap().unwrap();
1714 assert!(updated.is_read);
1715
1716 items_repo.mark_read(item.id, false).await.unwrap();
1717 let updated = items_repo.get(item.id).await.unwrap().unwrap();
1718 assert!(!updated.is_read);
1719 }
1720
1721 #[tokio::test]
1722 async fn items_mark_starred_and_unstarred() {
1723 let pool = test_db().await;
1724 let feed = make_feed(&pool, "rss", "Feed").await;
1725 let items_repo = ItemsRepository::new(pool.clone());
1726 let item = make_item(&pool, &feed, "st:1").await;
1727
1728 assert!(!item.is_starred);
1729
1730 items_repo.mark_starred(item.id, true).await.unwrap();
1731 let updated = items_repo.get(item.id).await.unwrap().unwrap();
1732 assert!(updated.is_starred);
1733
1734 items_repo.mark_starred(item.id, false).await.unwrap();
1735 let updated = items_repo.get(item.id).await.unwrap().unwrap();
1736 assert!(!updated.is_starred);
1737 }
1738
1739 #[tokio::test]
1740 async fn items_count_all_and_unread() {
1741 let pool = test_db().await;
1742 let feed = make_feed(&pool, "rss", "Feed").await;
1743 let items_repo = ItemsRepository::new(pool.clone());
1744
1745 let item1 = make_item(&pool, &feed, "c:1").await;
1746 make_item(&pool, &feed, "c:2").await;
1747 make_item(&pool, &feed, "c:3").await;
1748
1749 assert_eq!(items_repo.count_all().await.unwrap(), 3);
1750 assert_eq!(items_repo.count_unread().await.unwrap(), 3);
1751
1752 items_repo.mark_read(item1.id, true).await.unwrap();
1753 assert_eq!(items_repo.count_all().await.unwrap(), 3);
1754 assert_eq!(items_repo.count_unread().await.unwrap(), 2);
1755 }
1756
1757 #[tokio::test]
1758 async fn items_delete_by_feed_removes_items() {
1759 let pool = test_db().await;
1760 let feed = make_feed(&pool, "rss", "Feed").await;
1761 let items_repo = ItemsRepository::new(pool.clone());
1762
1763 make_item(&pool, &feed, "d:1").await;
1764 make_item(&pool, &feed, "d:2").await;
1765 assert_eq!(items_repo.count_all().await.unwrap(), 2);
1766
1767 let removed = items_repo.delete_by_feed(feed.id).await.unwrap();
1768 assert_eq!(removed, 2);
1769 assert_eq!(items_repo.count_all().await.unwrap(), 0);
1770 }
1771
1772 #[tokio::test]
1773 async fn items_delete_stale_read() {
1774 let pool = test_db().await;
1775 let feed = make_feed(&pool, "rss", "Feed").await;
1776 let items_repo = ItemsRepository::new(pool.clone());
1777 let now = Utc::now();
1778
1779 // Old read item (should be deleted)
1780 let old_read = make_item_at(&pool, &feed, "stale:1", now - Duration::days(60)).await;
1781 items_repo.mark_read(old_read.id, true).await.unwrap();
1782
1783 // Old read + starred item (should be preserved)
1784 let old_starred = make_item_at(&pool, &feed, "stale:2", now - Duration::days(60)).await;
1785 items_repo.mark_read(old_starred.id, true).await.unwrap();
1786 items_repo.mark_starred(old_starred.id, true).await.unwrap();
1787
1788 // Old unread item (should be preserved)
1789 make_item_at(&pool, &feed, "stale:3", now - Duration::days(60)).await;
1790
1791 // Recent read item (should be preserved)
1792 let recent_read = make_item_at(&pool, &feed, "stale:4", now - Duration::days(5)).await;
1793 items_repo.mark_read(recent_read.id, true).await.unwrap();
1794
1795 // Recent unread item (should be preserved)
1796 make_item_at(&pool, &feed, "stale:5", now - Duration::days(5)).await;
1797
1798 assert_eq!(items_repo.count_all().await.unwrap(), 5);
1799
1800 let cutoff = now - Duration::days(30);
1801 let deleted = items_repo.delete_stale_read(cutoff).await.unwrap();
1802 assert_eq!(deleted, 1); // only old_read
1803
1804 assert_eq!(items_repo.count_all().await.unwrap(), 4);
1805
1806 // Verify the right items remain
1807 assert!(items_repo.get(old_starred.id).await.unwrap().is_some());
1808 assert!(items_repo.get(recent_read.id).await.unwrap().is_some());
1809 assert!(items_repo.get(old_read.id).await.unwrap().is_none());
1810 }
1811
1812 // ── Feed Health ──────────────────────────────────────────────
1813
1814 #[tokio::test]
1815 async fn health_success_resets_counter() {
1816 let pool = test_db().await;
1817 let feeds_repo = FeedsRepository::new(pool.clone());
1818 let feed = make_feed(&pool, "rss", "Feed").await;
1819
1820 // Simulate two failures
1821 feeds_repo.record_fetch_failure(feed.id, "timeout").await.unwrap();
1822 feeds_repo.record_fetch_failure(feed.id, "timeout").await.unwrap();
1823
1824 let f = feeds_repo.get(feed.id).await.unwrap().unwrap();
1825 assert_eq!(f.consecutive_failures, 2);
1826 assert_eq!(f.last_error.as_deref(), Some("timeout"));
1827
1828 // Then a success
1829 feeds_repo.record_fetch_success(feed.id).await.unwrap();
1830 let f = feeds_repo.get(feed.id).await.unwrap().unwrap();
1831 assert_eq!(f.consecutive_failures, 0);
1832 assert!(f.last_error.is_none());
1833 assert!(f.last_success_at.is_some());
1834 }
1835
1836 #[tokio::test]
1837 async fn health_failure_increments() {
1838 let pool = test_db().await;
1839 let feeds_repo = FeedsRepository::new(pool.clone());
1840 let feed = make_feed(&pool, "rss", "Feed").await;
1841
1842 feeds_repo.record_fetch_failure(feed.id, "dns error").await.unwrap();
1843 let f = feeds_repo.get(feed.id).await.unwrap().unwrap();
1844 assert_eq!(f.consecutive_failures, 1);
1845 assert_eq!(f.last_error.as_deref(), Some("dns error"));
1846
1847 feeds_repo.record_fetch_failure(feed.id, "connection refused").await.unwrap();
1848 let f = feeds_repo.get(feed.id).await.unwrap().unwrap();
1849 assert_eq!(f.consecutive_failures, 2);
1850 assert_eq!(f.last_error.as_deref(), Some("connection refused"));
1851 }
1852
1853 #[tokio::test]
1854 async fn health_defaults_on_new_feed() {
1855 let pool = test_db().await;
1856 let feed = make_feed(&pool, "rss", "Feed").await;
1857 assert_eq!(feed.consecutive_failures, 0);
1858 assert!(feed.last_error.is_none());
1859 assert!(feed.last_success_at.is_none());
1860 }
1861
1862 // ── FTS5 Search ───────────────────────────────────────────────
1863
1864 #[tokio::test]
1865 async fn fts5_search_matches_title() {
1866 let pool = test_db().await;
1867 let feed = make_feed(&pool, "rss", "Feed").await;
1868 let items_repo = ItemsRepository::new(pool.clone());
1869
1870 items_repo
1871 .upsert(CreateFeedItem {
1872 external_id: "fts:1".to_string(),
1873 feed_id: feed.id,
1874 busser_id: BusserId::new("rss"),
1875 bite_author: "author".to_string(),
1876 bite_text: "Some bite text".to_string(),
1877 bite_secondary: None,
1878 bite_indicator: None,
1879 title: Some("Rust programming language".to_string()),
1880 body: Some("Body content here".to_string()),
1881 url: None,
1882 media: vec![],
1883 published_at: Utc::now(),
1884 source_name: "test".to_string(),
1885 score: None,
1886 tags: vec![],
1887 actions: vec![],
1888 })
1889 .await
1890 .unwrap();
1891
1892 let results = items_repo
1893 .list_search("Rust", None, false, false, 10, 0)
1894 .await
1895 .unwrap();
1896 assert_eq!(results.len(), 1);
1897 assert_eq!(results[0].external_id, "fts:1");
1898 }
1899
1900 #[tokio::test]
1901 async fn fts5_search_matches_body() {
1902 let pool = test_db().await;
1903 let feed = make_feed(&pool, "rss", "Feed").await;
1904 let items_repo = ItemsRepository::new(pool.clone());
1905
1906 items_repo
1907 .upsert(CreateFeedItem {
1908 external_id: "fts:2".to_string(),
1909 feed_id: feed.id,
1910 busser_id: BusserId::new("rss"),
1911 bite_author: "author".to_string(),
1912 bite_text: "Bite".to_string(),
1913 bite_secondary: None,
1914 bite_indicator: None,
1915 title: Some("Title".to_string()),
1916 body: Some("SQLite full text search is powerful".to_string()),
1917 url: None,
1918 media: vec![],
1919 published_at: Utc::now(),
1920 source_name: "test".to_string(),
1921 score: None,
1922 tags: vec![],
1923 actions: vec![],
1924 })
1925 .await
1926 .unwrap();
1927
1928 let results = items_repo
1929 .list_search("powerful", None, false, false, 10, 0)
1930 .await
1931 .unwrap();
1932 assert_eq!(results.len(), 1);
1933 }
1934
1935 #[tokio::test]
1936 async fn fts5_search_matches_bite_text() {
1937 let pool = test_db().await;
1938 let feed = make_feed(&pool, "rss", "Feed").await;
1939 let items_repo = ItemsRepository::new(pool.clone());
1940
1941 items_repo
1942 .upsert(CreateFeedItem {
1943 external_id: "fts:3".to_string(),
1944 feed_id: feed.id,
1945 busser_id: BusserId::new("rss"),
1946 bite_author: "author".to_string(),
1947 bite_text: "Breaking news about databases".to_string(),
1948 bite_secondary: None,
1949 bite_indicator: None,
1950 title: None,
1951 body: None,
1952 url: None,
1953 media: vec![],
1954 published_at: Utc::now(),
1955 source_name: "test".to_string(),
1956 score: None,
1957 tags: vec![],
1958 actions: vec![],
1959 })
1960 .await
1961 .unwrap();
1962
1963 let results = items_repo
1964 .list_search("databases", None, false, false, 10, 0)
1965 .await
1966 .unwrap();
1967 assert_eq!(results.len(), 1);
1968 }
1969
1970 #[tokio::test]
1971 async fn fts5_multi_word_query() {
1972 let pool = test_db().await;
1973 let feed = make_feed(&pool, "rss", "Feed").await;
1974 let items_repo = ItemsRepository::new(pool.clone());
1975
1976 items_repo
1977 .upsert(CreateFeedItem {
1978 external_id: "fts:4".to_string(),
1979 feed_id: feed.id,
1980 busser_id: BusserId::new("rss"),
1981 bite_author: "author".to_string(),
1982 bite_text: "bite".to_string(),
1983 bite_secondary: None,
1984 bite_indicator: None,
1985 title: Some("Rust async programming guide".to_string()),
1986 body: None,
1987 url: None,
1988 media: vec![],
1989 published_at: Utc::now(),
1990 source_name: "test".to_string(),
1991 score: None,
1992 tags: vec![],
1993 actions: vec![],
1994 })
1995 .await
1996 .unwrap();
1997
1998 let results = items_repo
1999 .list_search("Rust programming", None, false, false, 10, 0)
2000 .await
2001 .unwrap();
2002 assert_eq!(results.len(), 1);
2003 }
2004
2005 #[tokio::test]
2006 async fn fts5_special_characters_handled() {
2007 let pool = test_db().await;
2008 let feed = make_feed(&pool, "rss", "Feed").await;
2009 let items_repo = ItemsRepository::new(pool.clone());
2010
2011 make_item(&pool, &feed, "fts:5").await;
2012
2013 // Search with FTS5 operators should be safely escaped
2014 let results = items_repo
2015 .list_search("AND OR NOT", None, false, false, 10, 0)
2016 .await
2017 .unwrap();
2018 assert_eq!(results.len(), 0); // no match but no crash
2019 }
2020
2021 // ── sanitize_fts_query unit tests ────────────────────────────
2022
2023 #[test]
2024 fn fts5_sanitize_basic_words() {
2025 assert_eq!(sanitize_fts_query("hello world"), r#""hello" "world""#);
2026 }
2027
2028 #[test]
2029 fn fts5_sanitize_operators_quoted() {
2030 // AND, OR, NOT become quoted literal strings
2031 assert_eq!(
2032 sanitize_fts_query("AND OR NOT"),
2033 r#""AND" "OR" "NOT""#,
2034 );
2035 }
2036
2037 #[test]
2038 fn fts5_sanitize_near_operator() {
2039 assert_eq!(sanitize_fts_query("NEAR"), r#""NEAR""#);
2040 assert_eq!(sanitize_fts_query("NEAR/3"), r#""NEAR/3""#);
2041 assert_eq!(sanitize_fts_query("word NEAR/5 other"), r#""word" "NEAR/5" "other""#);
2042 }
2043
2044 #[test]
2045 fn fts5_sanitize_column_prefix() {
2046 // column:term syntax — colon inside quotes is literal
2047 assert_eq!(sanitize_fts_query("title:rust"), r#""title:rust""#);
2048 assert_eq!(
2049 sanitize_fts_query("body:hello title:world"),
2050 r#""body:hello" "title:world""#,
2051 );
2052 }
2053
2054 #[test]
2055 fn fts5_sanitize_caret_stripped() {
2056 // ^ is beginning-of-column marker, must be stripped
2057 assert_eq!(sanitize_fts_query("^hello"), r#""hello""#);
2058 assert_eq!(sanitize_fts_query("^hello world"), r#""hello" "world""#);
2059 // Multiple carets
2060 assert_eq!(sanitize_fts_query("^^hello"), r#""hello""#);
2061 }
2062
2063 #[test]
2064 fn fts5_sanitize_star_stripped() {
2065 // * is prefix query suffix, must be stripped
2066 assert_eq!(sanitize_fts_query("hello*"), r#""hello""#);
2067 assert_eq!(sanitize_fts_query("hel*"), r#""hel""#);
2068 // Multiple stars
2069 assert_eq!(sanitize_fts_query("hello**"), r#""hello""#);
2070 }
2071
2072 #[test]
2073 fn fts5_sanitize_caret_and_star_combined() {
2074 assert_eq!(sanitize_fts_query("^hello*"), r#""hello""#);
2075 assert_eq!(sanitize_fts_query("^*"), "");
2076 }
2077
2078 #[test]
2079 fn fts5_sanitize_bare_special_chars_dropped() {
2080 // Bare `^`, `*`, `^*` should produce empty string (dropped)
2081 assert_eq!(sanitize_fts_query("^"), "");
2082 assert_eq!(sanitize_fts_query("*"), "");
2083 assert_eq!(sanitize_fts_query("^ word"), r#""word""#);
2084 assert_eq!(sanitize_fts_query("* word"), r#""word""#);
2085 }
2086
2087 #[test]
2088 fn fts5_sanitize_embedded_quotes() {
2089 // Embedded double quotes are escaped as ""
2090 // Input: say "hi" -> tokens: [say] ["hi"]
2091 // "hi" has two quotes -> each becomes "" -> ""hi"" -> wrapped: """hi"""
2092 assert_eq!(
2093 sanitize_fts_query("say \"hi\""),
2094 "\"say\" \"\"\"hi\"\"\"",
2095 );
2096 }
2097
2098 #[test]
2099 fn fts5_sanitize_empty_and_whitespace() {
2100 assert_eq!(sanitize_fts_query(""), "");
2101 assert_eq!(sanitize_fts_query(" "), "");
2102 }
2103
2104 #[test]
2105 fn fts5_sanitize_mixed_special_syntax() {
2106 // Realistic adversarial input combining multiple FTS5 features
2107 assert_eq!(
2108 sanitize_fts_query("^title:rust* NEAR/3 OR body:hello*"),
2109 r#""title:rust" "NEAR/3" "OR" "body:hello""#,
2110 );
2111 }
2112
2113 // ── FTS5 integration tests for hardened sanitization ─────────
2114
2115 #[tokio::test]
2116 async fn fts5_near_operator_does_not_crash() {
2117 let pool = test_db().await;
2118 let feed = make_feed(&pool, "rss", "Feed").await;
2119 let items_repo = ItemsRepository::new(pool.clone());
2120 make_item(&pool, &feed, "fts:near1").await;
2121
2122 // NEAR and NEAR/N should be safely quoted
2123 let results = items_repo
2124 .list_search("NEAR", None, false, false, 10, 0)
2125 .await
2126 .unwrap();
2127 assert_eq!(results.len(), 0);
2128
2129 let results = items_repo
2130 .list_search("word NEAR/3 other", None, false, false, 10, 0)
2131 .await
2132 .unwrap();
2133 assert_eq!(results.len(), 0);
2134 }
2135
2136 #[tokio::test]
2137 async fn fts5_column_prefix_does_not_target_column() {
2138 let pool = test_db().await;
2139 let feed = make_feed(&pool, "rss", "Feed").await;
2140 let items_repo = ItemsRepository::new(pool.clone());
2141
2142 items_repo
2143 .upsert(CreateFeedItem {
2144 external_id: "fts:col1".to_string(),
2145 feed_id: feed.id,
2146 busser_id: BusserId::new("rss"),
2147 bite_author: "author".to_string(),
2148 bite_text: "bite".to_string(),
2149 bite_secondary: None,
2150 bite_indicator: None,
2151 title: Some("rust programming".to_string()),
2152 body: Some("unrelated body".to_string()),
2153 url: None,
2154 media: vec![],
2155 published_at: Utc::now(),
2156 source_name: "test".to_string(),
2157 score: None,
2158 tags: vec![],
2159 actions: vec![],
2160 })
2161 .await
2162 .unwrap();
2163
2164 // "title:rust" should NOT act as a column filter — it should search
2165 // for the literal string "title:rust" which won't match anything
2166 let results = items_repo
2167 .list_search("title:rust", None, false, false, 10, 0)
2168 .await
2169 .unwrap();
2170 assert_eq!(results.len(), 0);
2171 }
2172
2173 #[tokio::test]
2174 async fn fts5_caret_prefix_does_not_crash() {
2175 let pool = test_db().await;
2176 let feed = make_feed(&pool, "rss", "Feed").await;
2177 let items_repo = ItemsRepository::new(pool.clone());
2178 make_item(&pool, &feed, "fts:caret1").await;
2179
2180 let results = items_repo
2181 .list_search("^Title", None, false, false, 10, 0)
2182 .await
2183 .unwrap();
2184 // Caret is stripped, so this searches for "Title" which matches our items
2185 assert!(!results.is_empty());
2186 }
2187
2188 #[tokio::test]
2189 async fn fts5_star_suffix_does_not_crash() {
2190 let pool = test_db().await;
2191 let feed = make_feed(&pool, "rss", "Feed").await;
2192 let items_repo = ItemsRepository::new(pool.clone());
2193 make_item(&pool, &feed, "fts:star1").await;
2194
2195 let results = items_repo
2196 .list_search("Titl*", None, false, false, 10, 0)
2197 .await
2198 .unwrap();
2199 // Star is stripped, so this searches for "Titl" which won't match "Title"
2200 // (exact token match, not prefix)
2201 assert_eq!(results.len(), 0);
2202 }
2203
2204 #[tokio::test]
2205 async fn fts5_bare_star_and_caret_safe() {
2206 let pool = test_db().await;
2207 let feed = make_feed(&pool, "rss", "Feed").await;
2208 let items_repo = ItemsRepository::new(pool.clone());
2209 make_item(&pool, &feed, "fts:bare1").await;
2210
2211 // Bare special chars should not crash
2212 let results = items_repo
2213 .list_search("*", None, false, false, 10, 0)
2214 .await
2215 .unwrap();
2216 assert_eq!(results.len(), 0);
2217
2218 let results = items_repo
2219 .list_search("^", None, false, false, 10, 0)
2220 .await
2221 .unwrap();
2222 assert_eq!(results.len(), 0);
2223
2224 let results = items_repo
2225 .list_search("^*", None, false, false, 10, 0)
2226 .await
2227 .unwrap();
2228 assert_eq!(results.len(), 0);
2229 }
2230
2231 #[tokio::test]
2232 async fn fts5_pagination_works() {
2233 let pool = test_db().await;
2234 let feed = make_feed(&pool, "rss", "Feed").await;
2235 let items_repo = ItemsRepository::new(pool.clone());
2236
2237 for i in 0..5 {
2238 items_repo
2239 .upsert(CreateFeedItem {
2240 external_id: format!("fts:page:{i}"),
2241 feed_id: feed.id,
2242 busser_id: BusserId::new("rss"),
2243 bite_author: "author".to_string(),
2244 bite_text: "bite".to_string(),
2245 bite_secondary: None,
2246 bite_indicator: None,
2247 title: Some("Searchable title here".to_string()),
2248 body: None,
2249 url: None,
2250 media: vec![],
2251 published_at: Utc::now() - Duration::hours(i),
2252 source_name: "test".to_string(),
2253 score: None,
2254 tags: vec![],
2255 actions: vec![],
2256 })
2257 .await
2258 .unwrap();
2259 }
2260
2261 let page1 = items_repo
2262 .list_search("Searchable", None, false, false, 2, 0)
2263 .await
2264 .unwrap();
2265 assert_eq!(page1.len(), 2);
2266
2267 let page2 = items_repo
2268 .list_search("Searchable", None, false, false, 2, 2)
2269 .await
2270 .unwrap();
2271 assert_eq!(page2.len(), 2);
2272
2273 let page3 = items_repo
2274 .list_search("Searchable", None, false, false, 2, 4)
2275 .await
2276 .unwrap();
2277 assert_eq!(page3.len(), 1);
2278 }
2279
2280 // ── TagsRepository ────────────────────────────────────────────
2281
2282 #[tokio::test]
2283 async fn tags_set_and_get() {
2284 let pool = test_db().await;
2285 let feed = make_feed(&pool, "rss", "Feed").await;
2286 let tags_repo = TagsRepository::new(pool.clone());
2287
2288 tags_repo
2289 .set_tags(feed.id, &["tech".into(), "rust".into()])
2290 .await
2291 .unwrap();
2292
2293 let tags = tags_repo.get_tags(feed.id).await.unwrap();
2294 assert_eq!(tags, vec!["rust", "tech"]); // alphabetical
2295 }
2296
2297 #[tokio::test]
2298 async fn tags_set_idempotent() {
2299 let pool = test_db().await;
2300 let feed = make_feed(&pool, "rss", "Feed").await;
2301 let tags_repo = TagsRepository::new(pool.clone());
2302
2303 tags_repo.set_tags(feed.id, &["a".into(), "b".into()]).await.unwrap();
2304 tags_repo.set_tags(feed.id, &["c".into()]).await.unwrap();
2305
2306 let tags = tags_repo.get_tags(feed.id).await.unwrap();
2307 assert_eq!(tags, vec!["c"]);
2308 }
2309
2310 #[tokio::test]
2311 async fn tags_add_and_remove() {
2312 let pool = test_db().await;
2313 let feed = make_feed(&pool, "rss", "Feed").await;
2314 let tags_repo = TagsRepository::new(pool.clone());
2315
2316 tags_repo.add_tag(feed.id, "news").await.unwrap();
2317 tags_repo.add_tag(feed.id, "tech").await.unwrap();
2318 tags_repo.add_tag(feed.id, "news").await.unwrap(); // duplicate
2319
2320 let tags = tags_repo.get_tags(feed.id).await.unwrap();
2321 assert_eq!(tags, vec!["news", "tech"]);
2322
2323 tags_repo.remove_tag(feed.id, "news").await.unwrap();
2324 let tags = tags_repo.get_tags(feed.id).await.unwrap();
2325 assert_eq!(tags, vec!["tech"]);
2326 }
2327
2328 #[tokio::test]
2329 async fn tags_list_all_tags() {
2330 let pool = test_db().await;
2331 let feed_a = make_feed(&pool, "rss", "A").await;
2332 let feed_b = make_feed(&pool, "hn", "B").await;
2333 let tags_repo = TagsRepository::new(pool.clone());
2334
2335 tags_repo.set_tags(feed_a.id, &["tech".into(), "news".into()]).await.unwrap();
2336 tags_repo.set_tags(feed_b.id, &["tech".into(), "fun".into()]).await.unwrap();
2337
2338 let all = tags_repo.list_all_tags().await.unwrap();
2339 assert_eq!(all, vec!["fun", "news", "tech"]);
2340 }
2341
2342 #[tokio::test]
2343 async fn tags_cascade_on_feed_delete() {
2344 let pool = test_db().await;
2345 let feed = make_feed(&pool, "rss", "Feed").await;
2346 let tags_repo = TagsRepository::new(pool.clone());
2347 let feeds_repo = FeedsRepository::new(pool.clone());
2348
2349 tags_repo.set_tags(feed.id, &["x".into()]).await.unwrap();
2350 feeds_repo.delete(feed.id).await.unwrap();
2351
2352 let tags = tags_repo.get_tags(feed.id).await.unwrap();
2353 assert!(tags.is_empty());
2354 }
2355
2356 #[tokio::test]
2357 async fn tags_feed_ids_with_tags() {
2358 let pool = test_db().await;
2359 let feed_a = make_feed(&pool, "rss", "A").await;
2360 let feed_b = make_feed(&pool, "hn", "B").await;
2361 let _feed_c = make_feed(&pool, "reddit", "C").await;
2362 let tags_repo = TagsRepository::new(pool.clone());
2363
2364 tags_repo.set_tags(feed_a.id, &["tech".into()]).await.unwrap();
2365 tags_repo.set_tags(feed_b.id, &["tech".into(), "news".into()]).await.unwrap();
2366
2367 let ids = tags_repo.feed_ids_with_tags(&["tech".into()]).await.unwrap();
2368 assert_eq!(ids.len(), 2);
2369 assert!(ids.contains(&feed_a.id));
2370 assert!(ids.contains(&feed_b.id));
2371 }
2372
2373 // ── StateRepository ───────────────────────────────────────────
2374
2375 #[tokio::test]
2376 async fn state_set_and_get() {
2377 let pool = test_db().await;
2378 let state = StateRepository::new(pool.clone());
2379
2380 state.set("rss", "cursor", "abc123").await.unwrap();
2381 let val = state.get("rss", "cursor").await.unwrap();
2382 assert_eq!(val, Some("abc123".to_string()));
2383 }
2384
2385 #[tokio::test]
2386 async fn state_set_overwrites_value() {
2387 let pool = test_db().await;
2388 let state = StateRepository::new(pool.clone());
2389
2390 state.set("rss", "cursor", "first").await.unwrap();
2391 state.set("rss", "cursor", "second").await.unwrap();
2392
2393 let val = state.get("rss", "cursor").await.unwrap();
2394 assert_eq!(val, Some("second".to_string()));
2395 }
2396
2397 #[tokio::test]
2398 async fn state_get_missing_returns_none() {
2399 let pool = test_db().await;
2400 let state = StateRepository::new(pool.clone());
2401
2402 let val = state.get("rss", "nonexistent").await.unwrap();
2403 assert!(val.is_none());
2404 }
2405
2406 #[tokio::test]
2407 async fn state_delete_removes_key() {
2408 let pool = test_db().await;
2409 let state = StateRepository::new(pool.clone());
2410
2411 state.set("rss", "token", "secret").await.unwrap();
2412 state.delete("rss", "token").await.unwrap();
2413
2414 let val = state.get("rss", "token").await.unwrap();
2415 assert!(val.is_none());
2416 }
2417
2418 #[tokio::test]
2419 async fn state_delete_all_clears_busser() {
2420 let pool = test_db().await;
2421 let state = StateRepository::new(pool.clone());
2422
2423 state.set("rss", "key1", "val1").await.unwrap();
2424 state.set("rss", "key2", "val2").await.unwrap();
2425 state.set("rss", "key3", "val3").await.unwrap();
2426
2427 let removed = state.delete_all("rss").await.unwrap();
2428 assert_eq!(removed, 3);
2429
2430 let remaining = state.list("rss").await.unwrap();
2431 assert!(remaining.is_empty());
2432 }
2433
2434 // ── ConfigRepository ─────────────────────────────────────────
2435
2436 #[tokio::test]
2437 async fn config_set_and_get() {
2438 let pool = test_db().await;
2439 let config = ConfigRepository::new(pool);
2440 config.set("theme", "dark").await.unwrap();
2441 let val = config.get("theme").await.unwrap();
2442 assert_eq!(val, Some("dark".to_string()));
2443 }
2444
2445 #[tokio::test]
2446 async fn config_set_overwrites() {
2447 let pool = test_db().await;
2448 let config = ConfigRepository::new(pool);
2449 config.set("lang", "en").await.unwrap();
2450 config.set("lang", "fr").await.unwrap();
2451 let val = config.get("lang").await.unwrap();
2452 assert_eq!(val, Some("fr".to_string()));
2453 }
2454
2455 #[tokio::test]
2456 async fn config_get_missing_returns_none() {
2457 let pool = test_db().await;
2458 let config = ConfigRepository::new(pool);
2459 let val = config.get("nonexistent").await.unwrap();
2460 assert!(val.is_none());
2461 }
2462
2463 #[tokio::test]
2464 async fn config_delete() {
2465 let pool = test_db().await;
2466 let config = ConfigRepository::new(pool);
2467 config.set("key", "value").await.unwrap();
2468 config.delete("key").await.unwrap();
2469 let val = config.get("key").await.unwrap();
2470 assert!(val.is_none());
2471 }
2472
2473 // ── FeedsRepository (additional) ─────────────────────────────
2474
2475 #[tokio::test]
2476 async fn feeds_update_name() {
2477 let pool = test_db().await;
2478 let feed = make_feed(&pool, "rss", "Old Name").await;
2479 let feeds = FeedsRepository::new(pool);
2480 feeds.update_name(feed.id, "New Name").await.unwrap();
2481 let updated = feeds.get(feed.id).await.unwrap().unwrap();
2482 assert_eq!(updated.name, "New Name");
2483 }
2484
2485 #[tokio::test]
2486 async fn feeds_update_config() {
2487 let pool = test_db().await;
2488 let feed = make_feed(&pool, "rss", "Feed").await;
2489 let feeds = FeedsRepository::new(pool);
2490 feeds.update_config(feed.id, r#"{"url":"https://example.com/rss"}"#).await.unwrap();
2491 let updated = feeds.get(feed.id).await.unwrap().unwrap();
2492 assert_eq!(updated.config, r#"{"url":"https://example.com/rss"}"#);
2493 }
2494
2495 #[tokio::test]
2496 async fn feeds_record_fetch_success_resets_failures() {
2497 let pool = test_db().await;
2498 let feed = make_feed(&pool, "rss", "Feed").await;
2499 let feeds = FeedsRepository::new(pool);
2500 // Record some failures first
2501 feeds.record_fetch_failure(feed.id, "timeout").await.unwrap();
2502 feeds.record_fetch_failure(feed.id, "dns error").await.unwrap();
2503 let failed = feeds.get(feed.id).await.unwrap().unwrap();
2504 assert_eq!(failed.consecutive_failures, 2);
2505 assert_eq!(failed.last_error.as_deref(), Some("dns error"));
2506 // Record success
2507 feeds.record_fetch_success(feed.id).await.unwrap();
2508 let ok = feeds.get(feed.id).await.unwrap().unwrap();
2509 assert_eq!(ok.consecutive_failures, 0);
2510 assert!(ok.last_error.is_none());
2511 assert!(ok.last_success_at.is_some());
2512 }
2513
2514 #[tokio::test]
2515 async fn feeds_record_fetch_failure_increments() {
2516 let pool = test_db().await;
2517 let feed = make_feed(&pool, "rss", "Feed").await;
2518 let feeds = FeedsRepository::new(pool);
2519 feeds.record_fetch_failure(feed.id, "err1").await.unwrap();
2520 let f = feeds.get(feed.id).await.unwrap().unwrap();
2521 assert_eq!(f.consecutive_failures, 1);
2522 assert_eq!(f.last_error.as_deref(), Some("err1"));
2523 feeds.record_fetch_failure(feed.id, "err2").await.unwrap();
2524 let f = feeds.get(feed.id).await.unwrap().unwrap();
2525 assert_eq!(f.consecutive_failures, 2);
2526 assert_eq!(f.last_error.as_deref(), Some("err2"));
2527 }
2528
2529 // ── record_fetch_failure_structured ────────────────────────────
2530
2531 #[tokio::test]
2532 async fn structured_failure_rate_limited_no_increment() {
2533 let pool = test_db().await;
2534 let feed = make_feed(&pool, "rss", "Feed").await;
2535 let feeds = FeedsRepository::new(pool);
2536 let err = StructuredError::rate_limited("429 Too Many Requests", 120);
2537 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2538 assert!(!tripped);
2539 let f = feeds.get(feed.id).await.unwrap().unwrap();
2540 assert_eq!(f.consecutive_failures, 0, "rate_limited should not increment failures");
2541 assert!(f.last_error.is_some(), "error should be stored");
2542 // Verify stored as JSON
2543 let stored = StructuredError::from_last_error(f.last_error.as_ref().unwrap());
2544 assert_eq!(stored.category, ErrorCategory::RateLimited);
2545 assert_eq!(stored.retry_after_secs, Some(120));
2546 }
2547
2548 #[tokio::test]
2549 async fn structured_failure_auth_immediate_circuit_break() {
2550 let pool = test_db().await;
2551 let feed = make_feed(&pool, "rss", "Feed").await;
2552 let feeds = FeedsRepository::new(pool);
2553 let err = StructuredError::new(ErrorCategory::Auth, "401 Unauthorized");
2554 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2555 assert!(tripped, "auth error should immediately trip circuit breaker");
2556 let f = feeds.get(feed.id).await.unwrap().unwrap();
2557 assert!(f.circuit_broken);
2558 assert_eq!(f.consecutive_failures, 1);
2559 }
2560
2561 #[tokio::test]
2562 async fn structured_failure_config_immediate_circuit_break() {
2563 let pool = test_db().await;
2564 let feed = make_feed(&pool, "rss", "Feed").await;
2565 let feeds = FeedsRepository::new(pool);
2566 let err = StructuredError::new(ErrorCategory::Config, "404 Not Found");
2567 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2568 assert!(tripped, "config error should immediately trip circuit breaker");
2569 let f = feeds.get(feed.id).await.unwrap().unwrap();
2570 assert!(f.circuit_broken);
2571 }
2572
2573 #[tokio::test]
2574 async fn structured_failure_transient_increments_normally() {
2575 let pool = test_db().await;
2576 let feed = make_feed(&pool, "rss", "Feed").await;
2577 let feeds = FeedsRepository::new(pool);
2578 let err = StructuredError::new(ErrorCategory::Transient, "HTTP 503");
2579 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2580 assert!(!tripped);
2581 let f = feeds.get(feed.id).await.unwrap().unwrap();
2582 assert_eq!(f.consecutive_failures, 1);
2583 assert!(!f.circuit_broken);
2584 }
2585
2586 #[tokio::test]
2587 async fn structured_failure_transient_trips_at_threshold() {
2588 let pool = test_db().await;
2589 let feed = make_feed(&pool, "rss", "Feed").await;
2590 let feeds = FeedsRepository::new(pool);
2591 // Fill up to threshold - 1
2592 for i in 0..CIRCUIT_BREAKER_THRESHOLD - 1 {
2593 let err = StructuredError::new(ErrorCategory::Transient, format!("error {i}"));
2594 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2595 assert!(!tripped);
2596 }
2597 // One more should trip it
2598 let err = StructuredError::new(ErrorCategory::Transient, "final error");
2599 let tripped = feeds.record_fetch_failure_structured(feed.id, &err).await.unwrap();
2600 assert!(tripped);
2601 let f = feeds.get(feed.id).await.unwrap().unwrap();
2602 assert!(f.circuit_broken);
2603 }
2604
2605 // ── ItemsRepository (additional) ─────────────────────────────
2606
2607 #[tokio::test]
2608 async fn items_count_by_busser() {
2609 let pool = test_db().await;
2610 let feed_a = make_feed(&pool, "rss", "A").await;
2611 let feed_b = make_feed(&pool, "hn", "B").await;
2612 make_item(&pool, &feed_a, "a1").await;
2613 make_item(&pool, &feed_a, "a2").await;
2614 make_item(&pool, &feed_b, "b1").await;
2615 let items = ItemsRepository::new(pool);
2616 assert_eq!(items.count_by_busser("rss").await.unwrap(), 2);
2617 assert_eq!(items.count_by_busser("hn").await.unwrap(), 1);
2618 assert_eq!(items.count_by_busser("nonexistent").await.unwrap(), 0);
2619 }
2620
2621 #[tokio::test]
2622 async fn items_count_unread_by_busser() {
2623 let pool = test_db().await;
2624 let feed = make_feed(&pool, "rss", "Feed").await;
2625 let item1 = make_item(&pool, &feed, "i1").await;
2626 make_item(&pool, &feed, "i2").await;
2627 let items = ItemsRepository::new(pool);
2628 assert_eq!(items.count_unread_by_busser("rss").await.unwrap(), 2);
2629 items.mark_read(item1.id, true).await.unwrap();
2630 assert_eq!(items.count_unread_by_busser("rss").await.unwrap(), 1);
2631 }
2632
2633 #[tokio::test]
2634 async fn items_counts_by_busser_bulk() {
2635 let pool = test_db().await;
2636 let feed_a = make_feed(&pool, "rss", "A").await;
2637 let feed_b = make_feed(&pool, "hn", "B").await;
2638 let item_a1 = make_item(&pool, &feed_a, "a1").await;
2639 make_item(&pool, &feed_a, "a2").await;
2640 make_item(&pool, &feed_b, "b1").await;
2641 let items = ItemsRepository::new(pool);
2642 items.mark_read(item_a1.id, true).await.unwrap();
2643 let counts = items.counts_by_busser().await.unwrap();
2644 // Should have entries for rss (total=2, unread=1) and hn (total=1, unread=1)
2645 let rss = counts.iter().find(|(b, _, _)| b == "rss").unwrap();
2646 assert_eq!(rss.1, 2); // total
2647 assert_eq!(rss.2, 1); // unread
2648 let hn = counts.iter().find(|(b, _, _)| b == "hn").unwrap();
2649 assert_eq!(hn.1, 1);
2650 assert_eq!(hn.2, 1);
2651 }
2652
2653 #[tokio::test]
2654 async fn items_search_with_source_filter() {
2655 let pool = test_db().await;
2656 let feed_a = make_feed(&pool, "rss", "RSS Feed").await;
2657 let feed_b = make_feed(&pool, "hn", "HN Feed").await;
2658 make_item(&pool, &feed_a, "rss1").await;
2659 make_item(&pool, &feed_b, "hn1").await;
2660 let items = ItemsRepository::new(pool);
2661 // Search all - should find both (searching for "Title" which is in default title)
2662 let all = items.list_search("Title", None, false, false, 10, 0).await.unwrap();
2663 assert_eq!(all.len(), 2);
2664 // Search with source filter
2665 let rss_only = items.list_search("Title", Some("rss"), false, false, 10, 0).await.unwrap();
2666 assert_eq!(rss_only.len(), 1);
2667 assert_eq!(rss_only[0].busser_id.as_str(), "rss");
2668 }
2669
2670 #[tokio::test]
2671 async fn items_search_unread_only() {
2672 let pool = test_db().await;
2673 let feed = make_feed(&pool, "rss", "Feed").await;
2674 let item1 = make_item(&pool, &feed, "i1").await;
2675 make_item(&pool, &feed, "i2").await;
2676 let items = ItemsRepository::new(pool);
2677 items.mark_read(item1.id, true).await.unwrap();
2678 let unread = items.list_search("Title", None, true, false, 10, 0).await.unwrap();
2679 assert_eq!(unread.len(), 1);
2680 }
2681
2682 #[tokio::test]
2683 async fn items_search_starred_only() {
2684 let pool = test_db().await;
2685 let feed = make_feed(&pool, "rss", "Feed").await;
2686 let item1 = make_item(&pool, &feed, "i1").await;
2687 make_item(&pool, &feed, "i2").await;
2688 let items = ItemsRepository::new(pool);
2689 items.mark_starred(item1.id, true).await.unwrap();
2690 let starred = items.list_search("Title", None, false, true, 10, 0).await.unwrap();
2691 assert_eq!(starred.len(), 1);
2692 }
2693
2694 // ── TagsRepository (additional) ──────────────────────────────
2695
2696 #[tokio::test]
2697 async fn tags_all_feed_tags_bulk() {
2698 let pool = test_db().await;
2699 let feed_a = make_feed(&pool, "rss", "A").await;
2700 let feed_b = make_feed(&pool, "hn", "B").await;
2701 let tags = TagsRepository::new(pool);
2702 tags.set_tags(feed_a.id, &["tech".into(), "news".into()]).await.unwrap();
2703 tags.set_tags(feed_b.id, &["fun".into()]).await.unwrap();
2704 let all = tags.all_feed_tags().await.unwrap();
2705 assert_eq!(all.len(), 3);
2706 // Verify ordering (by feed_id, then tag)
2707 assert!(all.iter().any(|(id, tag)| *id == feed_a.id && tag == "news"));
2708 assert!(all.iter().any(|(id, tag)| *id == feed_a.id && tag == "tech"));
2709 assert!(all.iter().any(|(id, tag)| *id == feed_b.id && tag == "fun"));
2710 }
2711
2712 // ── StateRepository (additional) ─────────────────────────────
2713
2714 #[tokio::test]
2715 async fn state_list_returns_ordered() {
2716 let pool = test_db().await;
2717 let state = StateRepository::new(pool);
2718 state.set("rss", "cursor", "abc").await.unwrap();
2719 state.set("rss", "auth_token", "xyz").await.unwrap();
2720 state.set("rss", "page", "2").await.unwrap();
2721 state.set("hn", "unrelated", "val").await.unwrap();
2722 let list = state.list("rss").await.unwrap();
2723 assert_eq!(list.len(), 3);
2724 assert_eq!(list[0].key, "auth_token"); // alphabetical
2725 assert_eq!(list[1].key, "cursor");
2726 assert_eq!(list[2].key, "page");
2727 }
2728
2729 // ── Circuit Breaker ─────────────────────────────────────────
2730
2731 #[tokio::test]
2732 async fn circuit_breaker_new_feed_not_broken() {
2733 let pool = test_db().await;
2734 let feed = make_feed(&pool, "rss", "Feed").await;
2735 assert!(!feed.circuit_broken);
2736 }
2737
2738 #[tokio::test]
2739 async fn circuit_breaker_trips_at_threshold() {
2740 let pool = test_db().await;
2741 let feeds = FeedsRepository::new(pool.clone());
2742 let feed = make_feed(&pool, "rss", "Feed").await;
2743
2744 // Record failures up to threshold - 1: should not trip
2745 for i in 0..(CIRCUIT_BREAKER_THRESHOLD - 1) {
2746 let tripped = feeds
2747 .record_fetch_failure(feed.id, &format!("error {}", i))
2748 .await
2749 .unwrap();
2750 assert!(!tripped, "should not trip at failure {}", i + 1);
2751 }
2752 let f = feeds.get(feed.id).await.unwrap().unwrap();
2753 assert!(!f.circuit_broken);
2754 assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD - 1);
2755
2756 // The Nth failure should trip the breaker
2757 let tripped = feeds
2758 .record_fetch_failure(feed.id, "final error")
2759 .await
2760 .unwrap();
2761 assert!(tripped, "should trip at threshold");
2762
2763 let f = feeds.get(feed.id).await.unwrap().unwrap();
2764 assert!(f.circuit_broken);
2765 assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD);
2766 }
2767
2768 #[tokio::test]
2769 async fn circuit_breaker_does_not_trip_again_once_broken() {
2770 let pool = test_db().await;
2771 let feeds = FeedsRepository::new(pool.clone());
2772 let feed = make_feed(&pool, "rss", "Feed").await;
2773
2774 // Trip the breaker
2775 for _ in 0..CIRCUIT_BREAKER_THRESHOLD {
2776 feeds.record_fetch_failure(feed.id, "err").await.unwrap();
2777 }
2778 let f = feeds.get(feed.id).await.unwrap().unwrap();
2779 assert!(f.circuit_broken);
2780
2781 // Additional failures should return false (already broken)
2782 let tripped = feeds
2783 .record_fetch_failure(feed.id, "extra error")
2784 .await
2785 .unwrap();
2786 assert!(!tripped, "should not re-trip");
2787 }
2788
2789 #[tokio::test]
2790 async fn circuit_breaker_reset_clears_state() {
2791 let pool = test_db().await;
2792 let feeds = FeedsRepository::new(pool.clone());
2793 let feed = make_feed(&pool, "rss", "Feed").await;
2794
2795 // Trip the breaker
2796 for _ in 0..CIRCUIT_BREAKER_THRESHOLD {
2797 feeds.record_fetch_failure(feed.id, "err").await.unwrap();
2798 }
2799 let f = feeds.get(feed.id).await.unwrap().unwrap();
2800 assert!(f.circuit_broken);
2801 assert_eq!(f.consecutive_failures, CIRCUIT_BREAKER_THRESHOLD);
2802 assert!(f.last_error.is_some());
2803
2804 // Reset
2805 feeds.reset_circuit_breaker(feed.id).await.unwrap();
2806 let f = feeds.get(feed.id).await.unwrap().unwrap();
2807 assert!(!f.circuit_broken);
2808 assert_eq!(f.consecutive_failures, 0);
2809 assert!(f.last_error.is_none());
2810 }
2811
2812 #[tokio::test]
2813 async fn circuit_breaker_success_resets_counter_but_not_broken_flag() {
2814 let pool = test_db().await;
2815 let feeds = FeedsRepository::new(pool.clone());
2816 let feed = make_feed(&pool, "rss", "Feed").await;
2817
2818 // Accumulate some failures (below threshold)
2819 for _ in 0..5 {
2820 feeds.record_fetch_failure(feed.id, "err").await.unwrap();
2821 }
2822 let f = feeds.get(feed.id).await.unwrap().unwrap();
2823 assert_eq!(f.consecutive_failures, 5);
2824 assert!(!f.circuit_broken);
2825
2826 // Success resets the counter
2827 feeds.record_fetch_success(feed.id).await.unwrap();
2828 let f = feeds.get(feed.id).await.unwrap().unwrap();
2829 assert_eq!(f.consecutive_failures, 0);
2830 assert!(!f.circuit_broken);
2831 }
2832
2833 #[tokio::test]
2834 async fn circuit_broken_feed_excluded_from_list_enabled() {
2835 let pool = test_db().await;
2836 let feeds = FeedsRepository::new(pool.clone());
2837 let feed_ok = make_feed(&pool, "rss", "OK Feed").await;
2838 let feed_broken = make_feed(&pool, "hn", "Broken Feed").await;
2839
2840 // Trip the circuit breaker on one feed
2841 for _ in 0..CIRCUIT_BREAKER_THRESHOLD {
2842 feeds
2843 .record_fetch_failure(feed_broken.id, "err")
2844 .await
2845 .unwrap();
2846 }
2847
2848 let enabled = feeds.list_enabled().await.unwrap();
2849 assert_eq!(enabled.len(), 1);
2850 assert_eq!(enabled[0].id, feed_ok.id);
2851
2852 // list_all should still include both
2853 let all = feeds.list_all().await.unwrap();
2854 assert_eq!(all.len(), 2);
2855 }
2856
2857 #[tokio::test]
2858 async fn circuit_breaker_set_and_clear() {
2859 let pool = test_db().await;
2860 let feeds = FeedsRepository::new(pool.clone());
2861 let feed = make_feed(&pool, "rss", "Feed").await;
2862 assert!(!feed.circuit_broken);
2863
2864 feeds.set_circuit_broken(feed.id, true).await.unwrap();
2865 let f = feeds.get(feed.id).await.unwrap().unwrap();
2866 assert!(f.circuit_broken);
2867
2868 feeds.set_circuit_broken(feed.id, false).await.unwrap();
2869 let f = feeds.get(feed.id).await.unwrap().unwrap();
2870 assert!(!f.circuit_broken);
2871 }
2872 }
2873