//! Feed CRUD, update, get, enabled/disabled, transaction deletion, //! and source listing integration tests. mod common; use bb_db::{BusserId, CreateFeed, FeedId}; use bb_feed::FeedGenerator; // ── Feed CRUD ──────────────────────────────────────────────────────── #[tokio::test] async fn feed_create_and_list() { let orch = common::setup("feed_create_list").await; let db = orch.database(); let feed = db .feeds() .create(CreateFeed { busser_id: BusserId::new("rss"), name: "Test Feed".to_string(), config: serde_json::json!({ "feed_url": "https://example.com/feed.xml" }), }) .await .unwrap(); assert_eq!(feed.busser_id, "rss"); assert_eq!(feed.name, "Test Feed"); assert!(feed.enabled); let all = db.feeds().list_all().await.unwrap(); assert_eq!(all.len(), 1); assert_eq!(all[0].id, feed.id); } #[tokio::test] async fn feed_delete_cascades_items() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Doomed Feed", "https://example.com/rss").await; common::insert_item(&db, &feed, "rss:d1", "Article 1", 1).await; common::insert_item(&db, &feed, "rss:d2", "Article 2", 2).await; assert_eq!(db.items().count_all().await.unwrap(), 2); // Delete items first, then feed (replicating delete_feed command logic) db.items().delete_by_feed(feed.id).await.unwrap(); db.feeds().delete(feed.id).await.unwrap(); assert_eq!(db.items().count_all().await.unwrap(), 0); assert!(db.feeds().get(feed.id).await.unwrap().is_none()); } #[tokio::test] async fn feed_delete_by_busser() { let db = common::test_db().await; let feed_a = common::create_rss_feed(&db, "Feed A", "https://a.com/rss").await; let feed_b = common::create_rss_feed(&db, "Feed B", "https://b.com/rss").await; let feed_hn = common::create_other_feed(&db, "hn", "HN Feed").await; common::insert_item(&db, &feed_a, "rss:a1", "A", 1).await; common::insert_item(&db, &feed_b, "rss:b1", "B", 1).await; common::insert_item(&db, &feed_hn, "hn:1", "HN", 1).await; // Delete all RSS feeds (replicating delete_feeds_by_busser command logic) let rss_feeds = db.feeds().get_by_busser("rss").await.unwrap(); for feed in &rss_feeds { db.items().delete_by_feed(feed.id).await.unwrap(); db.feeds().delete(feed.id).await.unwrap(); } // Only HN feed and its item should remain let remaining_feeds = db.feeds().list_all().await.unwrap(); assert_eq!(remaining_feeds.len(), 1); assert_eq!(remaining_feeds[0].busser_id, "hn"); assert_eq!(db.items().count_all().await.unwrap(), 1); } #[tokio::test] async fn feed_get_by_busser() { let db = common::test_db().await; common::create_rss_feed(&db, "RSS A", "https://a.com/rss").await; common::create_rss_feed(&db, "RSS B", "https://b.com/rss").await; common::create_other_feed(&db, "hn", "HN").await; let rss_feeds = db.feeds().get_by_busser("rss").await.unwrap(); assert_eq!(rss_feeds.len(), 2); let hn_feeds = db.feeds().get_by_busser("hn").await.unwrap(); assert_eq!(hn_feeds.len(), 1); let empty = db.feeds().get_by_busser("nonexistent").await.unwrap(); assert!(empty.is_empty()); } // ── Feed Update ────────────────────────────────────────────────────── #[tokio::test] async fn feed_update_name() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Original Name", "https://example.com/rss").await; db.feeds().update_name(feed.id, "Updated Name").await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!(fetched.name, "Updated Name"); } #[tokio::test] async fn feed_update_config() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; let new_config = serde_json::json!({ "feed_url": "https://new.example.com/rss" }); let config_str = serde_json::to_string(&new_config).unwrap(); db.feeds().update_config(feed.id, &config_str).await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); let config = fetched.config_json(); assert_eq!(config["feed_url"], "https://new.example.com/rss"); } #[tokio::test] async fn feed_update_name_and_config_together() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Old Name", "https://old.example.com/rss").await; // Replicate update_feed command: update name then config db.feeds().update_name(feed.id, "New Name").await.unwrap(); let new_config = serde_json::json!({ "feed_url": "https://new.example.com/rss" }); let config_str = serde_json::to_string(&new_config).unwrap(); db.feeds().update_config(feed.id, &config_str).await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!(fetched.name, "New Name"); assert_eq!(fetched.config_json()["feed_url"], "https://new.example.com/rss"); } #[tokio::test] async fn feed_update_preserves_other_fields() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; // Record a failure so the feed has non-default state db.feeds().record_fetch_failure(feed.id, "timeout").await.unwrap(); // Update just the name db.feeds().update_name(feed.id, "Renamed").await.unwrap(); let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!(fetched.name, "Renamed"); assert_eq!(fetched.consecutive_failures, 1); assert_eq!(fetched.last_error.as_deref(), Some("timeout")); assert!(fetched.enabled); } // ── Feed Get (single feed retrieval) ───────────────────────────────── #[tokio::test] async fn feed_get_by_id() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "My Feed", "https://example.com/rss").await; let fetched = db.feeds().get(feed.id).await.unwrap().unwrap(); assert_eq!(fetched.id, feed.id); assert_eq!(fetched.name, "My Feed"); assert_eq!(fetched.busser_id, "rss"); assert!(fetched.enabled); assert!(!fetched.circuit_broken); } #[tokio::test] async fn feed_get_nonexistent_returns_none() { let db = common::test_db().await; let fake_id = FeedId::new(); let result = db.feeds().get(fake_id).await.unwrap(); assert!(result.is_none()); } #[tokio::test] async fn feed_get_first_by_busser_replicates_get_feed_command() { let db = common::test_db().await; // Create two feeds for the same busser common::create_rss_feed(&db, "Alpha Feed", "https://alpha.com/rss").await; common::create_rss_feed(&db, "Beta Feed", "https://beta.com/rss").await; // The get_feed command returns the first feed for a busser let feeds = db.feeds().get_by_busser("rss").await.unwrap(); let first = feeds.into_iter().next().unwrap(); // get_by_busser is ordered by name, so "Alpha Feed" should be first assert_eq!(first.name, "Alpha Feed"); assert_eq!(first.busser_id, "rss"); } // ── Feed Enabled/Disabled ──────────────────────────────────────────── #[tokio::test] async fn feed_disable_excludes_from_enabled_list() { let db = common::test_db().await; let feed_a = common::create_rss_feed(&db, "Feed A", "https://a.com/rss").await; let feed_b = common::create_rss_feed(&db, "Feed B", "https://b.com/rss").await; db.feeds().set_enabled(feed_a.id, false).await.unwrap(); let enabled = db.feeds().list_enabled().await.unwrap(); assert_eq!(enabled.len(), 1); assert_eq!(enabled[0].id, feed_b.id); // Re-enable db.feeds().set_enabled(feed_a.id, true).await.unwrap(); let enabled = db.feeds().list_enabled().await.unwrap(); assert_eq!(enabled.len(), 2); } // ── Feed Transaction Deletion ──────────────────────────────────────── #[tokio::test] async fn feed_delete_via_transaction() { use sqlx::Acquire; let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Doomed", "https://example.com/rss").await; common::insert_item(&db, &feed, "rss:d1", "Article", 1).await; common::insert_item(&db, &feed, "rss:d2", "Article 2", 2).await; // Replicate the actual command handler's transaction-based deletion let mut conn = db.pool().acquire().await.unwrap(); let mut tx = conn.begin().await.unwrap(); sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1") .bind(feed.id) .execute(&mut *tx) .await .unwrap(); sqlx::query("DELETE FROM feeds WHERE id = ?1") .bind(feed.id) .execute(&mut *tx) .await .unwrap(); tx.commit().await.unwrap(); assert_eq!(db.items().count_all().await.unwrap(), 0); assert!(db.feeds().get(feed.id).await.unwrap().is_none()); } #[tokio::test] async fn feed_delete_by_busser_via_transaction() { use sqlx::Acquire; let db = common::test_db().await; let feed_a = common::create_rss_feed(&db, "RSS A", "https://a.com/rss").await; let feed_b = common::create_rss_feed(&db, "RSS B", "https://b.com/rss").await; let feed_hn = common::create_other_feed(&db, "hn", "HN").await; common::insert_item(&db, &feed_a, "rss:a1", "A", 1).await; common::insert_item(&db, &feed_b, "rss:b1", "B", 1).await; common::insert_item(&db, &feed_hn, "hn:1", "HN", 1).await; // Replicate delete_feeds_by_busser command logic let feeds = db.feeds().get_by_busser("rss").await.unwrap(); let mut conn = db.pool().acquire().await.unwrap(); let mut tx = conn.begin().await.unwrap(); for feed in &feeds { sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1") .bind(feed.id) .execute(&mut *tx) .await .unwrap(); sqlx::query("DELETE FROM feeds WHERE id = ?1") .bind(feed.id) .execute(&mut *tx) .await .unwrap(); } tx.commit().await.unwrap(); // Only HN feed and its item should remain let remaining = db.feeds().list_all().await.unwrap(); assert_eq!(remaining.len(), 1); assert_eq!(remaining[0].busser_id, "hn"); assert_eq!(db.items().count_all().await.unwrap(), 1); } // ── Source Listing: Unread Counts After State Changes ──────────────── #[tokio::test] async fn source_listing_unread_counts_update_after_mark_read() { let db = common::test_db().await; let feed_rss = common::create_rss_feed(&db, "RSS", "https://example.com/rss").await; let feed_hn = common::create_other_feed(&db, "hn", "HN").await; let rss1 = common::insert_item(&db, &feed_rss, "rss:1", "RSS 1", 1).await; common::insert_item(&db, &feed_rss, "rss:2", "RSS 2", 2).await; common::insert_item(&db, &feed_hn, "hn:1", "HN 1", 1).await; let fg = FeedGenerator::new(db.clone()); // Initial state: RSS has 2 unread, HN has 1 unread let sources = fg.get_sources().await.unwrap(); let rss = sources.iter().find(|s| s.id == "rss").unwrap(); assert_eq!(rss.unread_count, 2); let hn = sources.iter().find(|s| s.id == "hn").unwrap(); assert_eq!(hn.unread_count, 1); // Mark one RSS item read db.items().mark_read(rss1.id, true).await.unwrap(); let sources = fg.get_sources().await.unwrap(); let rss = sources.iter().find(|s| s.id == "rss").unwrap(); assert_eq!(rss.unread_count, 1); assert_eq!(rss.total_count, 2); // total unchanged // HN unaffected let hn = sources.iter().find(|s| s.id == "hn").unwrap(); assert_eq!(hn.unread_count, 1); } // ── Source Listing: Circuit Breaker Health Status ───────────────────── #[tokio::test] async fn source_health_circuit_broken_overrides_failure_count() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; // Accumulate failures past the circuit breaker threshold for _ in 0..10 { db.feeds() .record_fetch_failure(feed.id, "connection timeout") .await .unwrap(); } // Verify circuit breaker tripped let refreshed = db.feeds().get(feed.id).await.unwrap().unwrap(); assert!(refreshed.circuit_broken); // Replicate the health mapping from sources.rs let fg = FeedGenerator::new(db.clone()); let sources = fg.get_sources().await.unwrap(); let source = &sources[0]; // The sources.rs command checks circuit_broken first, before failure count let health = if source.circuit_broken { "circuit_broken" } else { match source.consecutive_failures { 0 => "green", 1..=2 => "yellow", _ => "red", } }; assert_eq!(health, "circuit_broken"); assert!(source.circuit_broken); assert!(source.last_error.is_some()); } #[tokio::test] async fn source_health_circuit_broken_resets_to_green() { let db = common::test_db().await; let feed = common::create_rss_feed(&db, "Feed", "https://example.com/rss").await; // Trip the circuit breaker for _ in 0..10 { db.feeds() .record_fetch_failure(feed.id, "error") .await .unwrap(); } // Reset via the repository method (mirrors reset_circuit_breaker command) db.feeds().reset_circuit_breaker(feed.id).await.unwrap(); let fg = FeedGenerator::new(db.clone()); let sources = fg.get_sources().await.unwrap(); let source = &sources[0]; assert!(!source.circuit_broken); assert_eq!(source.consecutive_failures, 0); assert!(source.last_error.is_none()); let health = if source.circuit_broken { "circuit_broken" } else { match source.consecutive_failures { 0 => "green", 1..=2 => "yellow", _ => "red", } }; assert_eq!(health, "green"); } // ── Source Listing: Tags ───────────────────────────────────────────── #[tokio::test] async fn source_listing_includes_feed_tags() { let db = common::test_db().await; let feed_rss = common::create_rss_feed(&db, "RSS Feed", "https://example.com/rss").await; let feed_hn = common::create_other_feed(&db, "hn", "HN").await; db.tags() .set_tags(feed_rss.id, &["tech".to_string(), "news".to_string()]) .await .unwrap(); db.tags() .set_tags(feed_hn.id, &["social".to_string()]) .await .unwrap(); let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); let rss = sources.iter().find(|s| s.id == "rss").unwrap(); assert!(rss.tags.contains(&"tech".to_string())); assert!(rss.tags.contains(&"news".to_string())); let hn = sources.iter().find(|s| s.id == "hn").unwrap(); assert_eq!(hn.tags, vec!["social"]); } // ── Source Listing: Feed with No Items Shows Zero Counts ───────────── #[tokio::test] async fn source_listing_feed_with_no_items() { let db = common::test_db().await; common::create_rss_feed(&db, "Empty RSS", "https://example.com/rss").await; common::create_other_feed(&db, "hn", "Empty HN").await; let fg = FeedGenerator::new(db); let sources = fg.get_sources().await.unwrap(); assert_eq!(sources.len(), 2); for source in &sources { assert_eq!(source.total_count, 0); assert_eq!(source.unread_count, 0); } }