//! Orchestrator integration, source health status, and circuit breaker //! reset integration tests. mod common; use bb_db::{BusserId, CreateFeed, CreateFeedItem}; use bb_feed::FeedGenerator; use chrono::Utc; // ── Orchestrator Integration ───────────────────────────────────────── #[tokio::test] async fn orchestrator_create_and_query_items() { let orch = common::setup("orch_items").await; let db = orch.database(); let feed = db .feeds() .create(CreateFeed { busser_id: BusserId::new("test"), name: "Test Source".to_string(), config: serde_json::json!({}), }) .await .unwrap(); let item = db .items() .upsert(CreateFeedItem { external_id: "test:art1".to_string(), feed_id: feed.id, busser_id: BusserId::new("test"), bite_author: "Author".to_string(), bite_text: "Breaking news".to_string(), bite_secondary: Some("42 points".to_string()), bite_indicator: None, title: Some("Big Story".to_string()), body: Some("
Full article body
".to_string()), url: Some("https://example.com/story".to_string()), media: vec!["https://example.com/img.jpg".to_string()], published_at: Utc::now(), source_name: "Test Source".to_string(), score: Some(42), tags: vec!["breaking".to_string()], actions: vec![], }) .await .unwrap(); // Verify all fields round-trip correctly let fetched = db.items().get(item.id).await.unwrap().unwrap(); assert_eq!(fetched.bite_author, "Author"); assert_eq!(fetched.bite_text, "Breaking news"); assert_eq!(fetched.bite_secondary, Some("42 points".to_string())); assert_eq!(fetched.title, Some("Big Story".to_string())); assert_eq!(fetched.body, Some("Full article body
".to_string())); assert_eq!(fetched.url, Some("https://example.com/story".to_string())); assert_eq!(fetched.media_vec(), vec!["https://example.com/img.jpg"]); assert_eq!(fetched.score, Some(42)); assert_eq!(fetched.tags_vec(), vec!["breaking"]); assert!(!fetched.is_read); assert!(!fetched.is_starred); } #[tokio::test] async fn orchestrator_source_listing() { let orch = common::setup("orch_sources").await; let db = orch.database(); let feed_rss = db .feeds() .create(CreateFeed { busser_id: BusserId::new("rss"), name: "RSS Feed".to_string(), config: serde_json::json!({ "feed_url": "https://example.com/rss" }), }) .await .unwrap(); let feed_hn = db .feeds() .create(CreateFeed { busser_id: BusserId::new("hn"), name: "Hacker News".to_string(), config: serde_json::json!({}), }) .await .unwrap(); common::insert_item(db, &feed_rss, "rss:s1", "RSS Item", 1).await; common::insert_item(db, &feed_hn, "hn:s1", "HN Item 1", 1).await; common::insert_item(db, &feed_hn, "hn:s2", "HN Item 2", 2).await; // Replicate list_sources command logic let fg = FeedGenerator::new(db.clone()); let sources = fg.get_sources().await.unwrap(); assert_eq!(sources.len(), 2); let hn = sources.iter().find(|s| s.id == "hn").unwrap(); assert_eq!(hn.name, "Hacker News"); assert_eq!(hn.total_count, 2); assert_eq!(hn.unread_count, 2); let rss = sources.iter().find(|s| s.id == "rss").unwrap(); assert_eq!(rss.total_count, 1); } // ── Source Health Status ────────────────────────────────────────────��� #[tokio::test] async fn source_health_green_on_no_failures() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; common::insert_item(&db, &feed, "rss:1", "Item", 1).await; let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); assert_eq!(sources[0].consecutive_failures, 0); // Replicate health mapping from sources.rs command let health = match sources[0].consecutive_failures { 0 => "green", 1..=2 => "yellow", _ => "red", }; assert_eq!(health, "green"); } #[tokio::test] async fn source_health_yellow_on_few_failures() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; db.feeds() .record_fetch_failure(feed.id, "timeout") .await .unwrap(); let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); let health = match sources[0].consecutive_failures { 0 => "green", 1..=2 => "yellow", _ => "red", }; assert_eq!(health, "yellow"); } #[tokio::test] async fn source_health_red_on_many_failures() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; for _ in 0..3 { db.feeds() .record_fetch_failure(feed.id, "connection refused") .await .unwrap(); } let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); let health = match sources[0].consecutive_failures { 0 => "green", 1..=2 => "yellow", _ => "red", }; assert_eq!(health, "red"); assert_eq!( sources[0].last_error.as_deref(), Some("connection refused") ); } #[tokio::test] async fn source_health_resets_after_success() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; // Fail twice, then succeed db.feeds() .record_fetch_failure(feed.id, "error") .await .unwrap(); db.feeds() .record_fetch_failure(feed.id, "error") .await .unwrap(); db.feeds() .record_fetch_success(feed.id) .await .unwrap(); let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); assert_eq!(sources[0].consecutive_failures, 0); assert!(sources[0].last_error.is_none()); } // ── Circuit Breaker Reset ──────────────────────────────────────────── #[tokio::test] async fn circuit_breaker_trips_after_threshold() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Flaky Feed", "https://example.com/rss").await; // Record failures up to threshold (10) for i in 0..10 { let tripped = db .feeds() .record_fetch_failure(feed.id, &format!("error {}", i)) .await .unwrap(); if i < 9 { assert!(!tripped, "should not trip before threshold"); } else { assert!(tripped, "should trip at threshold"); } } let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(fetched.circuit_broken); assert_eq!(fetched.consecutive_failures, 10); } #[tokio::test] async fn circuit_breaker_reset_clears_state() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Broken Feed", "https://example.com/rss").await; // Trip the circuit breaker for _ in 0..10 { db.feeds() .record_fetch_failure(feed.id, "connection refused") .await .unwrap(); } let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(fetched.circuit_broken); // Reset the circuit breaker (replicates reset_circuit_breaker command path) db.feeds().reset_circuit_breaker(feed.id).await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(!fetched.circuit_broken); assert_eq!(fetched.consecutive_failures, 0); assert!(fetched.last_error.is_none()); } #[tokio::test] async fn circuit_breaker_reset_on_non_broken_feed_is_safe() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Healthy Feed", "https://example.com/rss").await; // Feed has a couple failures but hasn't tripped db.feeds().record_fetch_failure(feed.id, "transient").await.unwrap(); db.feeds().record_fetch_failure(feed.id, "transient").await.unwrap(); // Reset even though not circuit-broken — should still clear counters db.feeds().reset_circuit_breaker(feed.id).await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(!fetched.circuit_broken); assert_eq!(fetched.consecutive_failures, 0); assert!(fetched.last_error.is_none()); } #[tokio::test] async fn circuit_breaker_excludes_broken_feeds_from_enabled_list() { let db = common::test_db().await; let healthy = common::create_rss_feed(&db, "Healthy", "https://healthy.com/rss").await; let broken = common::create_rss_feed(&db, "Broken", "https://broken.com/rss").await; // Trip the circuit breaker on one feed for _ in 0..10 { db.feeds() .record_fetch_failure(broken.id, "error") .await .unwrap(); } let enabled = db.feeds().list_enabled().await.unwrap(); assert_eq!(enabled.len(), 1); assert_eq!(enabled[0].id, healthy.id); // list_all still includes the broken feed let all = db.feeds().list_all().await.unwrap(); assert_eq!(all.len(), 2); }