Skip to main content

max / balanced_breakfast

9.5 KB · 291 lines History Blame Raw
1 //! Orchestrator integration, source health status, and circuit breaker
2 //! reset integration tests.
3
4 mod common;
5
6 use bb_db::{BusserId, CreateFeed, CreateFeedItem};
7 use bb_feed::FeedGenerator;
8 use chrono::Utc;
9
10 // ── Orchestrator Integration ─────────────────────────────────────────
11
12 #[tokio::test]
13 async fn orchestrator_create_and_query_items() {
14 let orch = common::setup("orch_items").await;
15 let db = orch.database();
16
17 let feed = db
18 .feeds()
19 .create(CreateFeed {
20 busser_id: BusserId::new("test"),
21 name: "Test Source".to_string(),
22 config: serde_json::json!({}),
23 })
24 .await
25 .unwrap();
26
27 let item = db
28 .items()
29 .upsert(CreateFeedItem {
30 external_id: "test:art1".to_string(),
31 feed_id: feed.id,
32 busser_id: BusserId::new("test"),
33 bite_author: "Author".to_string(),
34 bite_text: "Breaking news".to_string(),
35 bite_secondary: Some("42 points".to_string()),
36 bite_indicator: None,
37 title: Some("Big Story".to_string()),
38 body: Some("<p>Full article body</p>".to_string()),
39 url: Some("https://example.com/story".to_string()),
40 media: vec!["https://example.com/img.jpg".to_string()],
41 published_at: Utc::now(),
42 source_name: "Test Source".to_string(),
43 score: Some(42),
44 tags: vec!["breaking".to_string()],
45 actions: vec![],
46 })
47 .await
48 .unwrap();
49
50 // Verify all fields round-trip correctly
51 let fetched = db.items().get(item.id).await.unwrap().unwrap();
52 assert_eq!(fetched.bite_author, "Author");
53 assert_eq!(fetched.bite_text, "Breaking news");
54 assert_eq!(fetched.bite_secondary, Some("42 points".to_string()));
55 assert_eq!(fetched.title, Some("Big Story".to_string()));
56 assert_eq!(fetched.body, Some("<p>Full article body</p>".to_string()));
57 assert_eq!(fetched.url, Some("https://example.com/story".to_string()));
58 assert_eq!(fetched.media_vec(), vec!["https://example.com/img.jpg"]);
59 assert_eq!(fetched.score, Some(42));
60 assert_eq!(fetched.tags_vec(), vec!["breaking"]);
61 assert!(!fetched.is_read);
62 assert!(!fetched.is_starred);
63 }
64
65 #[tokio::test]
66 async fn orchestrator_source_listing() {
67 let orch = common::setup("orch_sources").await;
68 let db = orch.database();
69
70 let feed_rss = db
71 .feeds()
72 .create(CreateFeed {
73 busser_id: BusserId::new("rss"),
74 name: "RSS Feed".to_string(),
75 config: serde_json::json!({ "feed_url": "https://example.com/rss" }),
76 })
77 .await
78 .unwrap();
79
80 let feed_hn = db
81 .feeds()
82 .create(CreateFeed {
83 busser_id: BusserId::new("hn"),
84 name: "Hacker News".to_string(),
85 config: serde_json::json!({}),
86 })
87 .await
88 .unwrap();
89
90 common::insert_item(db, &feed_rss, "rss:s1", "RSS Item", 1).await;
91 common::insert_item(db, &feed_hn, "hn:s1", "HN Item 1", 1).await;
92 common::insert_item(db, &feed_hn, "hn:s2", "HN Item 2", 2).await;
93
94 // Replicate list_sources command logic
95 let fg = FeedGenerator::new(db.clone());
96 let sources = fg.get_sources().await.unwrap();
97 assert_eq!(sources.len(), 2);
98
99 let hn = sources.iter().find(|s| s.id == "hn").unwrap();
100 assert_eq!(hn.name, "Hacker News");
101 assert_eq!(hn.total_count, 2);
102 assert_eq!(hn.unread_count, 2);
103
104 let rss = sources.iter().find(|s| s.id == "rss").unwrap();
105 assert_eq!(rss.total_count, 1);
106 }
107
108 // ── Source Health Status ────────────────────────────────────────────���
109
110 #[tokio::test]
111 async fn source_health_green_on_no_failures() {
112 let db = common::test_db().await;
113 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
114 common::insert_item(&db, &feed, "rss:1", "Item", 1).await;
115
116 let fg = FeedGenerator::new(db);
117 let sources = fg.get_sources().await.unwrap();
118 assert_eq!(sources[0].consecutive_failures, 0);
119
120 // Replicate health mapping from sources.rs command
121 let health = match sources[0].consecutive_failures {
122 0 => "green",
123 1..=2 => "yellow",
124 _ => "red",
125 };
126 assert_eq!(health, "green");
127 }
128
129 #[tokio::test]
130 async fn source_health_yellow_on_few_failures() {
131 let db = common::test_db().await;
132 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
133
134 db.feeds()
135 .record_fetch_failure(feed.id, "timeout")
136 .await
137 .unwrap();
138
139 let fg = FeedGenerator::new(db);
140 let sources = fg.get_sources().await.unwrap();
141 let health = match sources[0].consecutive_failures {
142 0 => "green",
143 1..=2 => "yellow",
144 _ => "red",
145 };
146 assert_eq!(health, "yellow");
147 }
148
149 #[tokio::test]
150 async fn source_health_red_on_many_failures() {
151 let db = common::test_db().await;
152 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
153
154 for _ in 0..3 {
155 db.feeds()
156 .record_fetch_failure(feed.id, "connection refused")
157 .await
158 .unwrap();
159 }
160
161 let fg = FeedGenerator::new(db);
162 let sources = fg.get_sources().await.unwrap();
163 let health = match sources[0].consecutive_failures {
164 0 => "green",
165 1..=2 => "yellow",
166 _ => "red",
167 };
168 assert_eq!(health, "red");
169 assert_eq!(
170 sources[0].last_error.as_deref(),
171 Some("connection refused")
172 );
173 }
174
175 #[tokio::test]
176 async fn source_health_resets_after_success() {
177 let db = common::test_db().await;
178 let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await;
179
180 // Fail twice, then succeed
181 db.feeds()
182 .record_fetch_failure(feed.id, "error")
183 .await
184 .unwrap();
185 db.feeds()
186 .record_fetch_failure(feed.id, "error")
187 .await
188 .unwrap();
189 db.feeds()
190 .record_fetch_success(feed.id)
191 .await
192 .unwrap();
193
194 let fg = FeedGenerator::new(db);
195 let sources = fg.get_sources().await.unwrap();
196 assert_eq!(sources[0].consecutive_failures, 0);
197 assert!(sources[0].last_error.is_none());
198 }
199
200 // ── Circuit Breaker Reset ────────────────────────────────────────────
201
202 #[tokio::test]
203 async fn circuit_breaker_trips_after_threshold() {
204 let db = common::test_db().await;
205 let feed = common::create_rss_feed(&db, "Flaky Feed", "https://example.com/rss").await;
206
207 // Record failures up to threshold (10)
208 for i in 0..10 {
209 let tripped = db
210 .feeds()
211 .record_fetch_failure(feed.id, &format!("error {}", i))
212 .await
213 .unwrap();
214 if i < 9 {
215 assert!(!tripped, "should not trip before threshold");
216 } else {
217 assert!(tripped, "should trip at threshold");
218 }
219 }
220
221 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
222 assert!(fetched.circuit_broken);
223 assert_eq!(fetched.consecutive_failures, 10);
224 }
225
226 #[tokio::test]
227 async fn circuit_breaker_reset_clears_state() {
228 let db = common::test_db().await;
229 let feed = common::create_rss_feed(&db, "Broken Feed", "https://example.com/rss").await;
230
231 // Trip the circuit breaker
232 for _ in 0..10 {
233 db.feeds()
234 .record_fetch_failure(feed.id, "connection refused")
235 .await
236 .unwrap();
237 }
238
239 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
240 assert!(fetched.circuit_broken);
241
242 // Reset the circuit breaker (replicates reset_circuit_breaker command path)
243 db.feeds().reset_circuit_breaker(feed.id).await.unwrap();
244
245 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
246 assert!(!fetched.circuit_broken);
247 assert_eq!(fetched.consecutive_failures, 0);
248 assert!(fetched.last_error.is_none());
249 }
250
251 #[tokio::test]
252 async fn circuit_breaker_reset_on_non_broken_feed_is_safe() {
253 let db = common::test_db().await;
254 let feed = common::create_rss_feed(&db, "Healthy Feed", "https://example.com/rss").await;
255
256 // Feed has a couple failures but hasn't tripped
257 db.feeds().record_fetch_failure(feed.id, "transient").await.unwrap();
258 db.feeds().record_fetch_failure(feed.id, "transient").await.unwrap();
259
260 // Reset even though not circuit-broken — should still clear counters
261 db.feeds().reset_circuit_breaker(feed.id).await.unwrap();
262
263 let fetched = db.feeds().get(feed.id).await.unwrap().unwrap();
264 assert!(!fetched.circuit_broken);
265 assert_eq!(fetched.consecutive_failures, 0);
266 assert!(fetched.last_error.is_none());
267 }
268
269 #[tokio::test]
270 async fn circuit_breaker_excludes_broken_feeds_from_enabled_list() {
271 let db = common::test_db().await;
272 let healthy = common::create_rss_feed(&db, "Healthy", "https://healthy.com/rss").await;
273 let broken = common::create_rss_feed(&db, "Broken", "https://broken.com/rss").await;
274
275 // Trip the circuit breaker on one feed
276 for _ in 0..10 {
277 db.feeds()
278 .record_fetch_failure(broken.id, "error")
279 .await
280 .unwrap();
281 }
282
283 let enabled = db.feeds().list_enabled().await.unwrap();
284 assert_eq!(enabled.len(), 1);
285 assert_eq!(enabled[0].id, healthy.id);
286
287 // list_all still includes the broken feed
288 let all = db.feeds().list_all().await.unwrap();
289 assert_eq!(all.len(), 2);
290 }
291