Skip to main content

max / balanced_breakfast

11.8 KB · 375 lines History Blame Raw
1 //! Full plugin lifecycle test: load -> fetch -> classify error -> circuit break
2 //! -> reset -> fetch again.
3 //!
4 //! Exercises the end-to-end path through the orchestrator, plugin manager,
5 //! error classifier, and circuit breaker, using real Rhai plugins and an
6 //! in-memory SQLite database.
7
8 mod common;
9
10 use bb_db::{BusserId, CreateFeed};
11 use bb_interface::{ErrorCategory, StructuredError};
12
13 // ── Helpers ─────────────────────────────────────────────────────────
14
15 /// Write a Rhai plugin that returns one static item (no network).
16 fn write_ok_plugin(dir: &std::path::Path) -> std::path::PathBuf {
17 let code = r#"
18 fn id() { "lifecycle" }
19 fn name() { "Lifecycle Plugin" }
20
21 fn config_schema() {
22 #{
23 description: "Plugin for lifecycle test",
24 fields: []
25 }
26 }
27
28 fn fetch(config, cursor) {
29 #{
30 items: [
31 #{
32 id: "lc-1",
33 bite: #{
34 author: "Tester",
35 text: "Lifecycle item"
36 },
37 content: #{
38 title: "Lifecycle Title",
39 body: "<p>Body</p>",
40 url: "https://example.com/lifecycle"
41 },
42 meta: #{
43 source_name: "lifecycle",
44 published_at: 1700000000,
45 tags: ["test"]
46 }
47 }
48 ],
49 has_more: false
50 }
51 }
52 "#;
53 let path = dir.join("lifecycle.rhai");
54 std::fs::write(&path, code).unwrap();
55 path
56 }
57
58 /// Write a Rhai plugin with the same `id()` that throws a transient error.
59 fn write_transient_fail_plugin(dir: &std::path::Path) -> std::path::PathBuf {
60 let code = r#"
61 fn id() { "lifecycle" }
62 fn name() { "Lifecycle Plugin" }
63
64 fn config_schema() {
65 #{
66 description: "Plugin for lifecycle test (failing)",
67 fields: []
68 }
69 }
70
71 fn fetch(config, cursor) {
72 throw "BB_ERR:transient:HTTP 503: Service Unavailable";
73 }
74 "#;
75 let path = dir.join("lifecycle.rhai");
76 std::fs::write(&path, code).unwrap();
77 path
78 }
79
80 // ── Tests ───────────────────────────────────────────────────────────
81
82 /// Full lifecycle: load -> fetch OK -> swap to failing plugin -> accumulate
83 /// transient failures -> circuit break -> reset -> swap back -> fetch OK.
84 #[tokio::test]
85 async fn plugin_lifecycle_transient_circuit_break_and_recovery() {
86 let orch = common::setup("lifecycle_transient").await;
87 let db = orch.database();
88
89 // -- Step 1: Write and load the succeeding plugin -----------------
90 let plugins_dir = {
91 let pm = orch.plugins();
92 let pm = pm.read().await;
93 pm.plugins_dir().to_path_buf()
94 };
95 let plugin_path = write_ok_plugin(&plugins_dir);
96
97 {
98 let pm = orch.plugins();
99 let mut pm = pm.write().await;
100 pm.load_plugin(&plugin_path).unwrap();
101 pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new())
102 .unwrap();
103 }
104
105 // Create a feed row so the orchestrator can store items
106 let feed = db
107 .feeds()
108 .create(CreateFeed {
109 busser_id: BusserId::new("lifecycle"),
110 name: "Lifecycle Feed".to_string(),
111 config: serde_json::json!({}),
112 })
113 .await
114 .unwrap();
115
116 // -- Step 2: First fetch succeeds ---------------------------------
117 let count = orch.fetch_plugin("lifecycle").await.unwrap();
118 assert_eq!(count, 1, "first fetch should return 1 item");
119
120 let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap();
121 assert_eq!(feed_after.consecutive_failures, 0);
122 assert!(!feed_after.circuit_broken);
123
124 // -- Step 3: Swap to a failing plugin (transient errors) ----------
125 //
126 // Overwrite the .rhai file and reload the plugin so `fetch()` now
127 // throws a transient error.
128 write_transient_fail_plugin(&plugins_dir);
129 {
130 let pm = orch.plugins();
131 let mut pm = pm.write().await;
132 pm.load_plugin(&plugin_path).unwrap();
133 pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new())
134 .unwrap();
135 }
136
137 // -- Step 4: Accumulate failures up to circuit breaker threshold ---
138 //
139 // Transient errors use threshold-based circuit breaking (10 failures).
140 for i in 0..10 {
141 let result = orch.fetch_plugin("lifecycle").await;
142 assert!(result.is_err(), "fetch {} should fail", i);
143 }
144
145 // -- Step 5: Verify circuit breaker is tripped --------------------
146 let feed_broken = db.feeds().get(feed.id).await.unwrap().unwrap();
147 assert!(
148 feed_broken.circuit_broken,
149 "circuit breaker should be tripped after 10 transient failures"
150 );
151 assert_eq!(feed_broken.consecutive_failures, 10);
152
153 // Verify the stored error is classified as transient
154 let stored_err =
155 StructuredError::from_last_error(feed_broken.last_error.as_ref().unwrap());
156 assert_eq!(
157 stored_err.category,
158 ErrorCategory::Transient,
159 "error should be classified as transient"
160 );
161
162 // Orchestrator helper should also report broken
163 assert!(
164 orch.is_circuit_broken("lifecycle").await.unwrap(),
165 "is_circuit_broken should return true"
166 );
167
168 // The feed should be excluded from the enabled list
169 let enabled = db.feeds().list_enabled().await.unwrap();
170 assert!(
171 enabled.iter().all(|f| f.id != feed.id),
172 "broken feed should not appear in enabled list"
173 );
174
175 // -- Step 6: Swap back to the succeeding plugin -------------------
176 write_ok_plugin(&plugins_dir);
177 {
178 let pm = orch.plugins();
179 let mut pm = pm.write().await;
180 pm.load_plugin(&plugin_path).unwrap();
181 pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new())
182 .unwrap();
183 }
184
185 // -- Step 7: Reset circuit breaker and fetch again ----------------
186 let count = orch
187 .reset_circuit_breaker_and_fetch("lifecycle")
188 .await
189 .unwrap();
190 assert_eq!(count, 1, "fetch after reset should return 1 item");
191
192 // -- Step 8: Verify full recovery ---------------------------------
193 let feed_recovered = db.feeds().get(feed.id).await.unwrap().unwrap();
194 assert!(
195 !feed_recovered.circuit_broken,
196 "circuit breaker should be cleared after reset + successful fetch"
197 );
198 assert_eq!(
199 feed_recovered.consecutive_failures, 0,
200 "failure counter should be 0 after successful fetch"
201 );
202 assert!(
203 feed_recovered.last_error.is_none(),
204 "last_error should be cleared after successful fetch"
205 );
206
207 // The feed should be back in the enabled list
208 let enabled = db.feeds().list_enabled().await.unwrap();
209 assert!(
210 enabled.iter().any(|f| f.id == feed.id),
211 "recovered feed should appear in enabled list"
212 );
213 }
214
215 /// Auth errors immediately trip the circuit breaker (no threshold ramp).
216 #[tokio::test]
217 async fn plugin_lifecycle_auth_error_immediate_circuit_break() {
218 let orch = common::setup("lifecycle_auth").await;
219 let db = orch.database();
220
221 let plugins_dir = {
222 let pm = orch.plugins();
223 let pm = pm.read().await;
224 pm.plugins_dir().to_path_buf()
225 };
226
227 // Write a plugin that throws an auth error
228 let code = r#"
229 fn id() { "lifecycle_auth" }
230 fn name() { "Auth Fail Plugin" }
231 fn config_schema() { #{ description: "auth fail", fields: [] } }
232 fn fetch(config, cursor) {
233 throw "BB_ERR:auth:HTTP 401: Invalid API key";
234 }
235 "#;
236 let plugin_path = plugins_dir.join("lifecycle_auth.rhai");
237 std::fs::write(&plugin_path, code).unwrap();
238
239 {
240 let pm = orch.plugins();
241 let mut pm = pm.write().await;
242 pm.load_plugin(&plugin_path).unwrap();
243 pm.initialize_plugin("lifecycle_auth", bb_interface::BusserConfig::new())
244 .unwrap();
245 }
246
247 let feed = db
248 .feeds()
249 .create(CreateFeed {
250 busser_id: BusserId::new("lifecycle_auth"),
251 name: "Auth Feed".to_string(),
252 config: serde_json::json!({}),
253 })
254 .await
255 .unwrap();
256
257 // A single fetch failure with an auth error should trip immediately
258 let result = orch.fetch_plugin("lifecycle_auth").await;
259 assert!(result.is_err());
260
261 let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap();
262 assert!(
263 feed_after.circuit_broken,
264 "auth error should immediately trip circuit breaker"
265 );
266 assert_eq!(feed_after.consecutive_failures, 1);
267
268 let stored_err =
269 StructuredError::from_last_error(feed_after.last_error.as_ref().unwrap());
270 assert_eq!(stored_err.category, ErrorCategory::Auth);
271
272 // Now fix the plugin (swap to a succeeding one)
273 let ok_code = r#"
274 fn id() { "lifecycle_auth" }
275 fn name() { "Auth Fixed Plugin" }
276 fn config_schema() { #{ description: "fixed", fields: [] } }
277 fn fetch(config, cursor) {
278 #{
279 items: [
280 #{
281 id: "auth-fixed-1",
282 bite: #{ author: "Tester", text: "Fixed" },
283 content: #{ title: "Fixed", url: "https://example.com/fixed" },
284 meta: #{ source_name: "lifecycle_auth", published_at: 1700000000, tags: [] }
285 }
286 ],
287 has_more: false
288 }
289 }
290 "#;
291 std::fs::write(&plugin_path, ok_code).unwrap();
292 {
293 let pm = orch.plugins();
294 let mut pm = pm.write().await;
295 pm.load_plugin(&plugin_path).unwrap();
296 pm.initialize_plugin("lifecycle_auth", bb_interface::BusserConfig::new())
297 .unwrap();
298 }
299
300 // Reset and fetch
301 let count = orch
302 .reset_circuit_breaker_and_fetch("lifecycle_auth")
303 .await
304 .unwrap();
305 assert_eq!(count, 1);
306
307 let feed_recovered = db.feeds().get(feed.id).await.unwrap().unwrap();
308 assert!(!feed_recovered.circuit_broken);
309 assert_eq!(feed_recovered.consecutive_failures, 0);
310 }
311
312 /// Rate-limited errors do NOT increment the failure counter or trip the
313 /// circuit breaker.
314 #[tokio::test]
315 async fn plugin_lifecycle_rate_limited_no_circuit_break() {
316 let orch = common::setup("lifecycle_rate").await;
317 let db = orch.database();
318
319 let plugins_dir = {
320 let pm = orch.plugins();
321 let pm = pm.read().await;
322 pm.plugins_dir().to_path_buf()
323 };
324
325 let code = r#"
326 fn id() { "lifecycle_rate" }
327 fn name() { "Rate Limited Plugin" }
328 fn config_schema() { #{ description: "rate limited", fields: [] } }
329 fn fetch(config, cursor) {
330 throw "BB_ERR:rate_limited:120:HTTP 429: Too Many Requests";
331 }
332 "#;
333 let plugin_path = plugins_dir.join("lifecycle_rate.rhai");
334 std::fs::write(&plugin_path, code).unwrap();
335
336 {
337 let pm = orch.plugins();
338 let mut pm = pm.write().await;
339 pm.load_plugin(&plugin_path).unwrap();
340 pm.initialize_plugin("lifecycle_rate", bb_interface::BusserConfig::new())
341 .unwrap();
342 }
343
344 let feed = db
345 .feeds()
346 .create(CreateFeed {
347 busser_id: BusserId::new("lifecycle_rate"),
348 name: "Rate Feed".to_string(),
349 config: serde_json::json!({}),
350 })
351 .await
352 .unwrap();
353
354 // Fail 15 times — none should increment the failure counter
355 for _ in 0..15 {
356 let _ = orch.fetch_plugin("lifecycle_rate").await;
357 }
358
359 let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap();
360 assert_eq!(
361 feed_after.consecutive_failures, 0,
362 "rate_limited errors should not increment failure counter"
363 );
364 assert!(
365 !feed_after.circuit_broken,
366 "rate_limited errors should never trip circuit breaker"
367 );
368
369 // Verify the stored error has the retry_after hint
370 let stored_err =
371 StructuredError::from_last_error(feed_after.last_error.as_ref().unwrap());
372 assert_eq!(stored_err.category, ErrorCategory::RateLimited);
373 assert_eq!(stored_err.retry_after_secs, Some(120));
374 }
375