//! Full plugin lifecycle test: load -> fetch -> classify error -> circuit break //! -> reset -> fetch again. //! //! Exercises the end-to-end path through the orchestrator, plugin manager, //! error classifier, and circuit breaker, using real Rhai plugins and an //! in-memory SQLite database. mod common; use bb_db::{BusserId, CreateFeed}; use bb_interface::{ErrorCategory, StructuredError}; // ── Helpers ───────────────────────────────────────────────────────── /// Write a Rhai plugin that returns one static item (no network). fn write_ok_plugin(dir: &std::path::Path) -> std::path::PathBuf { let code = r#" fn id() { "lifecycle" } fn name() { "Lifecycle Plugin" } fn config_schema() { #{ description: "Plugin for lifecycle test", fields: [] } } fn fetch(config, cursor) { #{ items: [ #{ id: "lc-1", bite: #{ author: "Tester", text: "Lifecycle item" }, content: #{ title: "Lifecycle Title", body: "
Body
", url: "https://example.com/lifecycle" }, meta: #{ source_name: "lifecycle", published_at: 1700000000, tags: ["test"] } } ], has_more: false } } "#; let path = dir.join("lifecycle.rhai"); std::fs::write(&path, code).unwrap(); path } /// Write a Rhai plugin with the same `id()` that throws a transient error. fn write_transient_fail_plugin(dir: &std::path::Path) -> std::path::PathBuf { let code = r#" fn id() { "lifecycle" } fn name() { "Lifecycle Plugin" } fn config_schema() { #{ description: "Plugin for lifecycle test (failing)", fields: [] } } fn fetch(config, cursor) { throw "BB_ERR:transient:HTTP 503: Service Unavailable"; } "#; let path = dir.join("lifecycle.rhai"); std::fs::write(&path, code).unwrap(); path } // ── Tests ─────────────────────────────────────────────────────────── /// Full lifecycle: load -> fetch OK -> swap to failing plugin -> accumulate /// transient failures -> circuit break -> reset -> swap back -> fetch OK. #[tokio::test] async fn plugin_lifecycle_transient_circuit_break_and_recovery() { let orch = common::setup("lifecycle_transient").await; let db = orch.database(); // -- Step 1: Write and load the succeeding plugin ----------------- let plugins_dir = { let pm = orch.plugins(); let pm = pm.read().await; pm.plugins_dir().to_path_buf() }; let plugin_path = write_ok_plugin(&plugins_dir); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new()) .unwrap(); } // Create a feed row so the orchestrator can store items let feed = db .feeds() .create(CreateFeed { busser_id: BusserId::new("lifecycle"), name: "Lifecycle Feed".to_string(), config: serde_json::json!({}), }) .await .unwrap(); // -- Step 2: First fetch succeeds --------------------------------- let count = orch.fetch_plugin("lifecycle").await.unwrap(); assert_eq!(count, 1, "first fetch should return 1 item"); let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!(feed_after.consecutive_failures, 0); assert!(!feed_after.circuit_broken); // -- Step 3: Swap to a failing plugin (transient errors) ---------- // // Overwrite the .rhai file and reload the plugin so `fetch()` now // throws a transient error. write_transient_fail_plugin(&plugins_dir); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new()) .unwrap(); } // -- Step 4: Accumulate failures up to circuit breaker threshold --- // // Transient errors use threshold-based circuit breaking (10 failures). for i in 0..10 { let result = orch.fetch_plugin("lifecycle").await; assert!(result.is_err(), "fetch {} should fail", i); } // -- Step 5: Verify circuit breaker is tripped -------------------- let feed_broken = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!( feed_broken.circuit_broken, "circuit breaker should be tripped after 10 transient failures" ); assert_eq!(feed_broken.consecutive_failures, 10); // Verify the stored error is classified as transient let stored_err = StructuredError::from_last_error(feed_broken.last_error.as_ref().unwrap()); assert_eq!( stored_err.category, ErrorCategory::Transient, "error should be classified as transient" ); // Orchestrator helper should also report broken assert!( orch.is_circuit_broken("lifecycle").await.unwrap(), "is_circuit_broken should return true" ); // The feed should be excluded from the enabled list let enabled = db.feeds().list_enabled().await.unwrap(); assert!( enabled.iter().all(|f| f.id != feed.id), "broken feed should not appear in enabled list" ); // -- Step 6: Swap back to the succeeding plugin ------------------- write_ok_plugin(&plugins_dir); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle", bb_interface::BusserConfig::new()) .unwrap(); } // -- Step 7: Reset circuit breaker and fetch again ---------------- let count = orch .reset_circuit_breaker_and_fetch("lifecycle") .await .unwrap(); assert_eq!(count, 1, "fetch after reset should return 1 item"); // -- Step 8: Verify full recovery --------------------------------- let feed_recovered = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!( !feed_recovered.circuit_broken, "circuit breaker should be cleared after reset + successful fetch" ); assert_eq!( feed_recovered.consecutive_failures, 0, "failure counter should be 0 after successful fetch" ); assert!( feed_recovered.last_error.is_none(), "last_error should be cleared after successful fetch" ); // The feed should be back in the enabled list let enabled = db.feeds().list_enabled().await.unwrap(); assert!( enabled.iter().any(|f| f.id == feed.id), "recovered feed should appear in enabled list" ); } /// Auth errors immediately trip the circuit breaker (no threshold ramp). #[tokio::test] async fn plugin_lifecycle_auth_error_immediate_circuit_break() { let orch = common::setup("lifecycle_auth").await; let db = orch.database(); let plugins_dir = { let pm = orch.plugins(); let pm = pm.read().await; pm.plugins_dir().to_path_buf() }; // Write a plugin that throws an auth error let code = r#" fn id() { "lifecycle_auth" } fn name() { "Auth Fail Plugin" } fn config_schema() { #{ description: "auth fail", fields: [] } } fn fetch(config, cursor) { throw "BB_ERR:auth:HTTP 401: Invalid API key"; } "#; let plugin_path = plugins_dir.join("lifecycle_auth.rhai"); std::fs::write(&plugin_path, code).unwrap(); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle_auth", bb_interface::BusserConfig::new()) .unwrap(); } let feed = db .feeds() .create(CreateFeed { busser_id: BusserId::new("lifecycle_auth"), name: "Auth Feed".to_string(), config: serde_json::json!({}), }) .await .unwrap(); // A single fetch failure with an auth error should trip immediately let result = orch.fetch_plugin("lifecycle_auth").await; assert!(result.is_err()); let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!( feed_after.circuit_broken, "auth error should immediately trip circuit breaker" ); assert_eq!(feed_after.consecutive_failures, 1); let stored_err = StructuredError::from_last_error(feed_after.last_error.as_ref().unwrap()); assert_eq!(stored_err.category, ErrorCategory::Auth); // Now fix the plugin (swap to a succeeding one) let ok_code = r#" fn id() { "lifecycle_auth" } fn name() { "Auth Fixed Plugin" } fn config_schema() { #{ description: "fixed", fields: [] } } fn fetch(config, cursor) { #{ items: [ #{ id: "auth-fixed-1", bite: #{ author: "Tester", text: "Fixed" }, content: #{ title: "Fixed", url: "https://example.com/fixed" }, meta: #{ source_name: "lifecycle_auth", published_at: 1700000000, tags: [] } } ], has_more: false } } "#; std::fs::write(&plugin_path, ok_code).unwrap(); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle_auth", bb_interface::BusserConfig::new()) .unwrap(); } // Reset and fetch let count = orch .reset_circuit_breaker_and_fetch("lifecycle_auth") .await .unwrap(); assert_eq!(count, 1); let feed_recovered = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(!feed_recovered.circuit_broken); assert_eq!(feed_recovered.consecutive_failures, 0); } /// Rate-limited errors do NOT increment the failure counter or trip the /// circuit breaker. #[tokio::test] async fn plugin_lifecycle_rate_limited_no_circuit_break() { let orch = common::setup("lifecycle_rate").await; let db = orch.database(); let plugins_dir = { let pm = orch.plugins(); let pm = pm.read().await; pm.plugins_dir().to_path_buf() }; let code = r#" fn id() { "lifecycle_rate" } fn name() { "Rate Limited Plugin" } fn config_schema() { #{ description: "rate limited", fields: [] } } fn fetch(config, cursor) { throw "BB_ERR:rate_limited:120:HTTP 429: Too Many Requests"; } "#; let plugin_path = plugins_dir.join("lifecycle_rate.rhai"); std::fs::write(&plugin_path, code).unwrap(); { let pm = orch.plugins(); let mut pm = pm.write().await; pm.load_plugin(&plugin_path).unwrap(); pm.initialize_plugin("lifecycle_rate", bb_interface::BusserConfig::new()) .unwrap(); } let feed = db .feeds() .create(CreateFeed { busser_id: BusserId::new("lifecycle_rate"), name: "Rate Feed".to_string(), config: serde_json::json!({}), }) .await .unwrap(); // Fail 15 times — none should increment the failure counter for _ in 0..15 { let _ = orch.fetch_plugin("lifecycle_rate").await; } let feed_after = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!( feed_after.consecutive_failures, 0, "rate_limited errors should not increment failure counter" ); assert!( !feed_after.circuit_broken, "rate_limited errors should never trip circuit breaker" ); // Verify the stored error has the retry_after hint let stored_err = StructuredError::from_last_error(feed_after.last_error.as_ref().unwrap()); assert_eq!(stored_err.category, ErrorCategory::RateLimited); assert_eq!(stored_err.retry_after_secs, Some(120)); }