Skip to main content

max / balanced_breakfast

15.6 KB · 440 lines History Blame Raw
1 //! Feed CRUD, update, get, enabled/disabled, transaction deletion,
2 //! and source listing integration tests.
3
4 mod common;
5
6 use bb_db::{BusserId, CreateFeed, FeedId};
7 use bb_feed::FeedGenerator;
8
9 // ── Feed CRUD ────────────────────────────────────────────────────────
10
11 #[tokio::test]
12 async fn feed_create_and_list() {
13 let orch = common::setup("feed_create_list").await;
14 let db = orch.database();
15
16 let feed = db
17 .feeds()
18 .create(CreateFeed {
19 busser_id: BusserId::new("rss"),
20 name: "Test Feed".to_string(),
21 config: serde_json::json!({ "feed_url": "https://example.com/feed.xml" }),
22 })
23 .await
24 .unwrap();
25
26 assert_eq!(feed.busser_id, "rss");
27 assert_eq!(feed.name, "Test Feed");
28 assert!(feed.enabled);
29
30 let all = db.feeds().list_all().await.unwrap();
31 assert_eq!(all.len(), 1);
32 assert_eq!(all[0].id, feed.id);
33 }
34
35 #[tokio::test]
36 async fn feed_delete_cascades_items() {
37 let db = common::test_db().await;
38 let feed = common::create_rss_feed(&db, "Doomed Feed", "https://example.com/rss").await;
39 common::insert_item(&db, &feed, "rss:d1", "Article 1", 1).await;
40 common::insert_item(&db, &feed, "rss:d2", "Article 2", 2).await;
41
42 assert_eq!(db.items().count_all().await.unwrap(), 2);
43
44 // Delete items first, then feed (replicating delete_feed command logic)
45 db.items().delete_by_feed(feed.id).await.unwrap();
46 db.feeds().delete(feed.id).await.unwrap();
47
48 assert_eq!(db.items().count_all().await.unwrap(), 0);
49 assert!(db.feeds().get(feed.id).await.unwrap().is_none());
50 }
51
52 #[tokio::test]
53 async fn feed_delete_by_busser() {
54 let db = common::test_db().await;
55 let feed_a = common::create_rss_feed(&db, "Feed A", "https://a.com/rss").await;
56 let feed_b = common::create_rss_feed(&db, "Feed B", "https://b.com/rss").await;
57 let feed_hn = common::create_other_feed(&db, "hn", "HN Feed").await;
58
59 common::insert_item(&db, &feed_a, "rss:a1", "A", 1).await;
60 common::insert_item(&db, &feed_b, "rss:b1", "B", 1).await;
61 common::insert_item(&db, &feed_hn, "hn:1", "HN", 1).await;
62
63 // Delete all RSS feeds (replicating delete_feeds_by_busser command logic)
64 let rss_feeds = db.feeds().get_by_busser("rss").await.unwrap();
65 for feed in &rss_feeds {
66 db.items().delete_by_feed(feed.id).await.unwrap();
67 db.feeds().delete(feed.id).await.unwrap();
68 }
69
70 // Only HN feed and its item should remain
71 let remaining_feeds = db.feeds().list_all().await.unwrap();
72 assert_eq!(remaining_feeds.len(), 1);
73 assert_eq!(remaining_feeds[0].busser_id, "hn");
74 assert_eq!(db.items().count_all().await.unwrap(), 1);
75 }
76
77 #[tokio::test]
78 async fn feed_get_by_busser() {
79 let db = common::test_db().await;
80 common::create_rss_feed(&db, "RSS A", "https://a.com/rss").await;
81 common::create_rss_feed(&db, "RSS B", "https://b.com/rss").await;
82 common::create_other_feed(&db, "hn", "HN").await;
83
84 let rss_feeds = db.feeds().get_by_busser("rss").await.unwrap();
85 assert_eq!(rss_feeds.len(), 2);
86
87 let hn_feeds = db.feeds().get_by_busser("hn").await.unwrap();
88 assert_eq!(hn_feeds.len(), 1);
89
90 let empty = db.feeds().get_by_busser("nonexistent").await.unwrap();
91 assert!(empty.is_empty());
92 }
93
94 // ── Feed Update ──────────────────────────────────────────────────────
95
96 #[tokio::test]
97 async fn feed_update_name() {
98 let db = common::test_db().await;
99 let feed = common::create_rss_feed(&db, "Original Name", "https://example.com/rss").await;
100
101 db.feeds().update_name(feed.id, "Updated Name").await.unwrap();
102
103 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
104 assert_eq!(fetched.name, "Updated Name");
105 }
106
107 #[tokio::test]
108 async fn feed_update_config() {
109 let db = common::test_db().await;
110 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
111
112 let new_config = serde_json::json!({ "feed_url": "https://new.example.com/rss" });
113 let config_str = serde_json::to_string(&new_config).unwrap();
114 db.feeds().update_config(feed.id, &config_str).await.unwrap();
115
116 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
117 let config = fetched.config_json();
118 assert_eq!(config["feed_url"], "https://new.example.com/rss");
119 }
120
121 #[tokio::test]
122 async fn feed_update_name_and_config_together() {
123 let db = common::test_db().await;
124 let feed = common::create_rss_feed(&db, "Old Name", "https://old.example.com/rss").await;
125
126 // Replicate update_feed command: update name then config
127 db.feeds().update_name(feed.id, "New Name").await.unwrap();
128 let new_config = serde_json::json!({ "feed_url": "https://new.example.com/rss" });
129 let config_str = serde_json::to_string(&new_config).unwrap();
130 db.feeds().update_config(feed.id, &config_str).await.unwrap();
131
132 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
133 assert_eq!(fetched.name, "New Name");
134 assert_eq!(fetched.config_json()["feed_url"], "https://new.example.com/rss");
135 }
136
137 #[tokio::test]
138 async fn feed_update_preserves_other_fields() {
139 let db = common::test_db().await;
140 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
141
142 // Record a failure so the feed has non-default state
143 db.feeds().record_fetch_failure(feed.id, "timeout").await.unwrap();
144
145 // Update just the name
146 db.feeds().update_name(feed.id, "Renamed").await.unwrap();
147
148 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
149 assert_eq!(fetched.name, "Renamed");
150 assert_eq!(fetched.consecutive_failures, 1);
151 assert_eq!(fetched.last_error.as_deref(), Some("timeout"));
152 assert!(fetched.enabled);
153 }
154
155 // ── Feed Get (single feed retrieval) ─────────────────────────────────
156
157 #[tokio::test]
158 async fn feed_get_by_id() {
159 let db = common::test_db().await;
160 let feed = common::create_rss_feed(&db, "My Feed", "https://example.com/rss").await;
161
162 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
163 assert_eq!(fetched.id, feed.id);
164 assert_eq!(fetched.name, "My Feed");
165 assert_eq!(fetched.busser_id, "rss");
166 assert!(fetched.enabled);
167 assert!(!fetched.circuit_broken);
168 }
169
170 #[tokio::test]
171 async fn feed_get_nonexistent_returns_none() {
172 let db = common::test_db().await;
173 let fake_id = FeedId::new();
174 let result = db.feeds().get(fake_id).await.unwrap();
175 assert!(result.is_none());
176 }
177
178 #[tokio::test]
179 async fn feed_get_first_by_busser_replicates_get_feed_command() {
180 let db = common::test_db().await;
181 // Create two feeds for the same busser
182 common::create_rss_feed(&db, "Alpha Feed", "https://alpha.com/rss").await;
183 common::create_rss_feed(&db, "Beta Feed", "https://beta.com/rss").await;
184
185 // The get_feed command returns the first feed for a busser
186 let feeds = db.feeds().get_by_busser("rss").await.unwrap();
187 let first = feeds.into_iter().next().unwrap();
188
189 // get_by_busser is ordered by name, so "Alpha Feed" should be first
190 assert_eq!(first.name, "Alpha Feed");
191 assert_eq!(first.busser_id, "rss");
192 }
193
194 // ── Feed Enabled/Disabled ────────────────────────────────────────────
195
196 #[tokio::test]
197 async fn feed_disable_excludes_from_enabled_list() {
198 let db = common::test_db().await;
199 let feed_a = common::create_rss_feed(&db, "Feed A", "https://a.com/rss").await;
200 let feed_b = common::create_rss_feed(&db, "Feed B", "https://b.com/rss").await;
201
202 db.feeds().set_enabled(feed_a.id, false).await.unwrap();
203
204 let enabled = db.feeds().list_enabled().await.unwrap();
205 assert_eq!(enabled.len(), 1);
206 assert_eq!(enabled[0].id, feed_b.id);
207
208 // Re-enable
209 db.feeds().set_enabled(feed_a.id, true).await.unwrap();
210 let enabled = db.feeds().list_enabled().await.unwrap();
211 assert_eq!(enabled.len(), 2);
212 }
213
214 // ── Feed Transaction Deletion ────────────────────────────────────────
215
216 #[tokio::test]
217 async fn feed_delete_via_transaction() {
218 use sqlx::Acquire;
219
220 let db = common::test_db().await;
221 let feed = common::create_rss_feed(&db, "Doomed", "https://example.com/rss").await;
222 common::insert_item(&db, &feed, "rss:d1", "Article", 1).await;
223 common::insert_item(&db, &feed, "rss:d2", "Article 2", 2).await;
224
225 // Replicate the actual command handler's transaction-based deletion
226 let mut conn = db.pool().acquire().await.unwrap();
227 let mut tx = conn.begin().await.unwrap();
228
229 sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1")
230 .bind(feed.id)
231 .execute(&mut *tx)
232 .await
233 .unwrap();
234
235 sqlx::query("DELETE FROM feeds WHERE id = ?1")
236 .bind(feed.id)
237 .execute(&mut *tx)
238 .await
239 .unwrap();
240
241 tx.commit().await.unwrap();
242
243 assert_eq!(db.items().count_all().await.unwrap(), 0);
244 assert!(db.feeds().get(feed.id).await.unwrap().is_none());
245 }
246
247 #[tokio::test]
248 async fn feed_delete_by_busser_via_transaction() {
249 use sqlx::Acquire;
250
251 let db = common::test_db().await;
252 let feed_a = common::create_rss_feed(&db, "RSS A", "https://a.com/rss").await;
253 let feed_b = common::create_rss_feed(&db, "RSS B", "https://b.com/rss").await;
254 let feed_hn = common::create_other_feed(&db, "hn", "HN").await;
255
256 common::insert_item(&db, &feed_a, "rss:a1", "A", 1).await;
257 common::insert_item(&db, &feed_b, "rss:b1", "B", 1).await;
258 common::insert_item(&db, &feed_hn, "hn:1", "HN", 1).await;
259
260 // Replicate delete_feeds_by_busser command logic
261 let feeds = db.feeds().get_by_busser("rss").await.unwrap();
262 let mut conn = db.pool().acquire().await.unwrap();
263 let mut tx = conn.begin().await.unwrap();
264
265 for feed in &feeds {
266 sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1")
267 .bind(feed.id)
268 .execute(&mut *tx)
269 .await
270 .unwrap();
271 sqlx::query("DELETE FROM feeds WHERE id = ?1")
272 .bind(feed.id)
273 .execute(&mut *tx)
274 .await
275 .unwrap();
276 }
277
278 tx.commit().await.unwrap();
279
280 // Only HN feed and its item should remain
281 let remaining = db.feeds().list_all().await.unwrap();
282 assert_eq!(remaining.len(), 1);
283 assert_eq!(remaining[0].busser_id, "hn");
284 assert_eq!(db.items().count_all().await.unwrap(), 1);
285 }
286
287 // ── Source Listing: Unread Counts After State Changes ────────────────
288
289 #[tokio::test]
290 async fn source_listing_unread_counts_update_after_mark_read() {
291 let db = common::test_db().await;
292 let feed_rss = common::create_rss_feed(&db, "RSS", "https://example.com/rss").await;
293 let feed_hn = common::create_other_feed(&db, "hn", "HN").await;
294
295 let rss1 = common::insert_item(&db, &feed_rss, "rss:1", "RSS 1", 1).await;
296 common::insert_item(&db, &feed_rss, "rss:2", "RSS 2", 2).await;
297 common::insert_item(&db, &feed_hn, "hn:1", "HN 1", 1).await;
298
299 let fg = FeedGenerator::new(db.clone());
300
301 // Initial state: RSS has 2 unread, HN has 1 unread
302 let sources = fg.get_sources().await.unwrap();
303 let rss = sources.iter().find(|s| s.id == "rss").unwrap();
304 assert_eq!(rss.unread_count, 2);
305 let hn = sources.iter().find(|s| s.id == "hn").unwrap();
306 assert_eq!(hn.unread_count, 1);
307
308 // Mark one RSS item read
309 db.items().mark_read(rss1.id, true).await.unwrap();
310
311 let sources = fg.get_sources().await.unwrap();
312 let rss = sources.iter().find(|s| s.id == "rss").unwrap();
313 assert_eq!(rss.unread_count, 1);
314 assert_eq!(rss.total_count, 2); // total unchanged
315 // HN unaffected
316 let hn = sources.iter().find(|s| s.id == "hn").unwrap();
317 assert_eq!(hn.unread_count, 1);
318 }
319
320 // ── Source Listing: Circuit Breaker Health Status ─────────────────────
321
322 #[tokio::test]
323 async fn source_health_circuit_broken_overrides_failure_count() {
324 let db = common::test_db().await;
325 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
326
327 // Accumulate failures past the circuit breaker threshold
328 for _ in 0..10 {
329 db.feeds()
330 .record_fetch_failure(feed.id, "connection timeout")
331 .await
332 .unwrap();
333 }
334
335 // Verify circuit breaker tripped
336 let refreshed = db.feeds().get(feed.id).await.unwrap().unwrap();
337 assert!(refreshed.circuit_broken);
338
339 // Replicate the health mapping from sources.rs
340 let fg = FeedGenerator::new(db.clone());
341 let sources = fg.get_sources().await.unwrap();
342 let source = &sources[0];
343
344 // The sources.rs command checks circuit_broken first, before failure count
345 let health = if source.circuit_broken {
346 "circuit_broken"
347 } else {
348 match source.consecutive_failures {
349 0 => "green",
350 1..=2 => "yellow",
351 _ => "red",
352 }
353 };
354 assert_eq!(health, "circuit_broken");
355 assert!(source.circuit_broken);
356 assert!(source.last_error.is_some());
357 }
358
359 #[tokio::test]
360 async fn source_health_circuit_broken_resets_to_green() {
361 let db = common::test_db().await;
362 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
363
364 // Trip the circuit breaker
365 for _ in 0..10 {
366 db.feeds()
367 .record_fetch_failure(feed.id, "error")
368 .await
369 .unwrap();
370 }
371
372 // Reset via the repository method (mirrors reset_circuit_breaker command)
373 db.feeds().reset_circuit_breaker(feed.id).await.unwrap();
374
375 let fg = FeedGenerator::new(db.clone());
376 let sources = fg.get_sources().await.unwrap();
377 let source = &sources[0];
378
379 assert!(!source.circuit_broken);
380 assert_eq!(source.consecutive_failures, 0);
381 assert!(source.last_error.is_none());
382
383 let health = if source.circuit_broken {
384 "circuit_broken"
385 } else {
386 match source.consecutive_failures {
387 0 => "green",
388 1..=2 => "yellow",
389 _ => "red",
390 }
391 };
392 assert_eq!(health, "green");
393 }
394
395 // ── Source Listing: Tags ─────────────────────────────────────────────
396
397 #[tokio::test]
398 async fn source_listing_includes_feed_tags() {
399 let db = common::test_db().await;
400 let feed_rss = common::create_rss_feed(&db, "RSS Feed", "https://example.com/rss").await;
401 let feed_hn = common::create_other_feed(&db, "hn", "HN").await;
402
403 db.tags()
404 .set_tags(feed_rss.id, &["tech".to_string(), "news".to_string()])
405 .await
406 .unwrap();
407 db.tags()
408 .set_tags(feed_hn.id, &["social".to_string()])
409 .await
410 .unwrap();
411
412 let fg = FeedGenerator::new(db);
413 let sources = fg.get_sources().await.unwrap();
414
415 let rss = sources.iter().find(|s| s.id == "rss").unwrap();
416 assert!(rss.tags.contains(&"tech".to_string()));
417 assert!(rss.tags.contains(&"news".to_string()));
418
419 let hn = sources.iter().find(|s| s.id == "hn").unwrap();
420 assert_eq!(hn.tags, vec!["social"]);
421 }
422
423 // ── Source Listing: Feed with No Items Shows Zero Counts ─────────────
424
425 #[tokio::test]
426 async fn source_listing_feed_with_no_items() {
427 let db = common::test_db().await;
428 common::create_rss_feed(&db, "Empty RSS", "https://example.com/rss").await;
429 common::create_other_feed(&db, "hn", "Empty HN").await;
430
431 let fg = FeedGenerator::new(db);
432 let sources = fg.get_sources().await.unwrap();
433 assert_eq!(sources.len(), 2);
434
435 for source in &sources {
436 assert_eq!(source.total_count, 0);
437 assert_eq!(source.unread_count, 0);
438 }
439 }
440