//! Feed management commands (create, delete, fetch) use crate::commands::error::ApiError; use crate::state::AppState; use bb_db::FeedId; use futures::stream::{self, StreamExt}; use serde::{Deserialize, Serialize}; use sqlx::Acquire; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tauri::{Emitter, State}; use tracing::{info, instrument}; /// Frontend input for creating a new feed subscription. #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct CreateFeedInput { pub busser_id: String, pub name: String, pub config: serde_json::Value, } /// Serializable snapshot of a feed subscription (for undo/restore). #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct FeedSnapshot { pub busser_id: String, pub name: String, pub config: serde_json::Value, } /// Response returned after a bulk fetch operation. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct FetchResponse { pub items_fetched: usize, #[serde(skip_serializing_if = "Vec::is_empty")] pub errors: Vec, } /// Progress event emitted per-plugin during fetch. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] struct FetchProgress { completed: usize, total: usize, plugin_id: String, } /// Get all feed subscriptions for a busser, including their config. /// /// Used by the frontend to snapshot feed details before deletion (for undo). #[tauri::command] #[instrument(skip_all)] pub async fn get_feeds_by_busser( state: State<'_, Arc>, busser_id: String, ) -> Result, ApiError> { let db = state.orchestrator.database(); let feeds = db.feeds().get_by_busser(&busser_id).await?; // Decrypt Secret fields before returning to the frontend let key = state.orchestrator.encryption_key(); let schema = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.get_config_schema(&busser_id) }; Ok(feeds .into_iter() .map(|f| { let mut config = f.config_json(); if let (Some(k), Some(s)) = (key, &schema) { bb_core::crypto::decrypt_config_secrets(&mut config, s, k); } FeedSnapshot { busser_id: f.busser_id.to_string(), name: f.name, config, } }) .collect()) } /// Validate feed creation input against the plugin's config schema. /// /// Returns the trimmed feed name on success. fn validate_feed_input( name: &str, config: &serde_json::Value, schema: &bb_interface::ConfigSchema, ) -> Result { let name = name.trim().to_string(); if name.is_empty() { return Err(ApiError::bad_request("Feed name cannot be empty")); } if name.len() > 200 { return Err(ApiError::bad_request( "Feed name cannot exceed 200 characters", )); } let config_obj = config .as_object() .ok_or_else(|| ApiError::bad_request("Config must be a JSON object"))?; for field in &schema.fields { let value = config_obj.get(&field.key); if field.required && value.is_none_or(|v| v.as_str().is_some_and(|s| s.is_empty())) { return Err(ApiError::bad_request(format!( "Missing required field: {}", field.label ))); } if let Some(serde_json::Value::String(s)) = value { if !s.is_empty() { match field.field_type { bb_interface::ConfigFieldType::Url => { if !s.starts_with("http://") && !s.starts_with("https://") { return Err(ApiError::bad_request(format!( "'{}' must start with http:// or https://", field.label ))); } let after_scheme = s.split("://").nth(1).unwrap_or(""); if after_scheme.is_empty() || after_scheme == "/" { return Err(ApiError::bad_request(format!( "'{}' is not a valid URL", field.label ))); } } bb_interface::ConfigFieldType::Number => { if s.parse::().is_err() { return Err(ApiError::bad_request(format!( "'{}' must be a valid number", field.label ))); } } bb_interface::ConfigFieldType::Select => { if !field.options.contains(s) { return Err(ApiError::bad_request(format!( "'{}' must be one of: {}", field.label, field.options.join(", ") ))); } } bb_interface::ConfigFieldType::Toggle => { if s != "true" && s != "false" { return Err(ApiError::bad_request(format!( "'{}' must be \"true\" or \"false\"", field.label ))); } } bb_interface::ConfigFieldType::Text | bb_interface::ConfigFieldType::TextArea | bb_interface::ConfigFieldType::Secret => { if s.len() > 10_000 { return Err(ApiError::bad_request(format!( "'{}' exceeds maximum length of 10,000 characters", field.label ))); } } } } } } Ok(name) } /// Create a new feed and re-initialize its plugin. #[tauri::command] #[instrument(skip_all)] pub async fn create_feed( state: State<'_, Arc>, input: CreateFeedInput, ) -> Result<(), ApiError> { // Validate busser exists and get its schema let schema = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins .get_config_schema(&input.busser_id) .ok_or_else(|| { ApiError::not_found(format!("Plugin '{}' not found", input.busser_id)) })? }; let name = validate_feed_input(&input.name, &input.config, &schema)?; // Serialize the plaintext config for the duplicate check before we move // `input.config` into the encryption step (avoids an extra clone). let new_config_str = serde_json::to_string(&input.config).unwrap_or_default(); let db = state.orchestrator.database(); // Encrypt Secret-type fields before storage (mutates in place, no clone) let config = { let mut cfg = input.config; if let Some(key) = state.orchestrator.encryption_key() { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; if let Some(schema) = plugins.get_config_schema(&input.busser_id) { bb_core::crypto::encrypt_config_secrets(&mut cfg, &schema, key); } } cfg }; // Check for duplicate config and insert inside a single transaction to // avoid a TOCTOU race (two concurrent create_feed calls with the same // config could both pass the duplicate check before either inserts). let mut conn = db.pool().acquire().await?; let mut tx = conn.begin().await?; // Duplicate-config check (inside the transaction) { let existing: Vec = sqlx::query_as("SELECT * FROM feeds WHERE busser_id = ?1 ORDER BY name") .bind(&input.busser_id) .fetch_all(&mut *tx) .await?; let key = state.orchestrator.encryption_key(); let schema = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.get_config_schema(&input.busser_id) }; for feed in &existing { let mut existing_config = feed.config_json(); // Decrypt existing config for comparison against plaintext input if let (Some(k), Some(s)) = (key, &schema) { bb_core::crypto::decrypt_config_secrets(&mut existing_config, s, k); } let existing_config_str = serde_json::to_string(&existing_config).unwrap_or_default(); if existing_config_str == new_config_str { return Err(ApiError::bad_request( "A feed with this configuration already exists", )); } } } // Insert the new feed (still inside the transaction) let feed_id = bb_db::FeedId::new(); let now = chrono::Utc::now() .format(bb_db::TIMESTAMP_FMT) .to_string(); let config_str = serde_json::to_string(&config).unwrap_or_else(|_| "{}".to_string()); sqlx::query( "INSERT INTO feeds (id, busser_id, name, config, enabled, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, 1, ?5, ?5)", ) .bind(feed_id) .bind(&input.busser_id) .bind(&name) .bind(&config_str) .bind(&now) .execute(&mut *tx) .await?; tx.commit().await?; // Re-initialize the plugin with the new feed if let Err(e) = state .orchestrator .init_plugin_from_db(&input.busser_id) .await { tracing::warn!(error = %e, busser_id = %input.busser_id, "Failed to reinitialize plugin"); } info!(busser_id = %input.busser_id, "Created new feed"); Ok(()) } /// Response for a single feed with its database ID (for editing). #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct FeedResponse { pub id: String, pub busser_id: String, pub name: String, pub config: serde_json::Value, } /// Get the first feed for a busser, with decrypted config for editing. #[tauri::command] #[instrument(skip_all)] pub async fn get_feed( state: State<'_, Arc>, busser_id: String, ) -> Result { let db = state.orchestrator.database(); let feeds = db.feeds().get_by_busser(&busser_id).await?; let feed = feeds.into_iter().next().ok_or_else(|| ApiError::not_found("Feed not found"))?; let key = state.orchestrator.encryption_key(); let schema = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.get_config_schema(&busser_id) }; let mut config = feed.config_json(); if let (Some(k), Some(s)) = (key, &schema) { bb_core::crypto::decrypt_config_secrets(&mut config, s, k); } Ok(FeedResponse { id: feed.id.to_string(), busser_id: feed.busser_id.to_string(), name: feed.name, config, }) } /// Update an existing feed's name and config. #[tauri::command] #[instrument(skip_all)] pub async fn update_feed( state: State<'_, Arc>, id: String, name: String, config: serde_json::Value, ) -> Result<(), ApiError> { let feed_id: FeedId = id.parse().map_err(|_| ApiError::bad_request("Invalid feed ID"))?; let db = state.orchestrator.database(); // Load existing feed to get busser_id let feed = db.feeds().get(feed_id).await?.ok_or_else(|| ApiError::not_found("Feed not found"))?; let busser_id = feed.busser_id.to_string(); // Validate against plugin schema let schema = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.get_config_schema(&busser_id).ok_or_else(|| { ApiError::not_found(format!("Plugin '{}' not found", busser_id)) })? }; let name = validate_feed_input(&name, &config, &schema)?; // Encrypt Secret-type fields before storage (mutate in place, no clone) let mut config = config; if let Some(key) = state.orchestrator.encryption_key() { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; if let Some(schema) = plugins.get_config_schema(&busser_id) { bb_core::crypto::encrypt_config_secrets(&mut config, &schema, key); } } // Update name and config db.feeds().update_name(feed_id, &name).await?; let config_str = serde_json::to_string(&config) .map_err(|e| ApiError::internal(format!("Failed to serialize config: {}", e)))?; db.feeds().update_config(feed_id, &config_str).await?; // Re-initialize the plugin with the updated config if let Err(e) = state.orchestrator.init_plugin_from_db(&busser_id).await { tracing::warn!(error = %e, %busser_id, "Failed to reinitialize plugin"); } info!(%id, %busser_id, "Updated feed"); Ok(()) } /// Delete a single feed and all its items in a transaction. #[tauri::command] #[instrument(skip_all)] pub async fn delete_feed( state: State<'_, Arc>, id: String, ) -> Result<(), ApiError> { let feed_id: FeedId = id.parse().map_err(|_| ApiError::bad_request("Invalid feed ID"))?; let db = state.orchestrator.database(); // Delete items and feed in a single transaction let mut conn = db.pool().acquire().await?; let mut tx = conn.begin().await?; sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1") .bind(feed_id) .execute(&mut *tx) .await?; sqlx::query("DELETE FROM feeds WHERE id = ?1") .bind(feed_id) .execute(&mut *tx) .await?; tx.commit().await?; info!(%id, "Deleted feed"); Ok(()) } /// Delete all feeds (and their items) belonging to a busser. #[tauri::command] #[instrument(skip_all)] pub async fn delete_feeds_by_busser( state: State<'_, Arc>, busser_id: String, ) -> Result<(), ApiError> { let db = state.orchestrator.database(); // Get all feeds for this busser so we can delete their items let feeds = db.feeds().get_by_busser(&busser_id).await?; let mut conn = db.pool().acquire().await?; let mut tx = conn.begin().await?; for feed in &feeds { sqlx::query("DELETE FROM feed_items WHERE feed_id = ?1") .bind(feed.id) .execute(&mut *tx) .await?; sqlx::query("DELETE FROM feeds WHERE id = ?1") .bind(feed.id) .execute(&mut *tx) .await?; } tx.commit().await?; info!(count = feeds.len(), %busser_id, "Deleted feeds"); Ok(()) } /// Tag rules for Balanced Breakfast: shallow hierarchy, no required semantic prefix. const BB_TAG_CONFIG: tagtree::TagConfig = tagtree::TagConfig { max_depth: 3, max_length: 80, semantic_depth: 0, }; /// Set tags on all feeds belonging to a busser. #[tauri::command] #[instrument(skip_all)] pub async fn set_feed_tags( state: State<'_, Arc>, busser_id: String, tags: Vec, ) -> Result<(), ApiError> { for tag in &tags { tagtree::validate_with(tag, &BB_TAG_CONFIG) .map_err(|e| ApiError::bad_request(format!("invalid tag: {}", e.0)))?; } let db = state.orchestrator.database(); let feeds = db.feeds().get_by_busser(&busser_id).await?; for feed in &feeds { db.tags().set_tags(feed.id, &tags).await?; } info!(?tags, count = feeds.len(), %busser_id, "Set feed tags"); Ok(()) } /// List all distinct tags across all feeds. #[tauri::command] #[instrument(skip_all)] pub async fn list_all_tags( state: State<'_, Arc>, ) -> Result, ApiError> { Ok(state.orchestrator.database().tags().list_all_tags().await?) } /// Trigger a fetch from all active plugins and return the total item count. /// Emits `fetch-progress` events per plugin so the frontend can show a real progress bar. #[tauri::command] #[instrument(skip_all)] pub async fn fetch_all( app: tauri::AppHandle, state: State<'_, Arc>, ) -> Result { let plugin_ids = { let plugins = state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.list_plugins() }; let total = plugin_ids.len(); let completed = AtomicUsize::new(0); let results: Vec<_> = stream::iter(plugin_ids) .map(|plugin_id| { let app = &app; let orchestrator = &state.orchestrator; let completed = &completed; async move { let result = orchestrator.fetch_plugin(&plugin_id).await; let n = completed.fetch_add(1, Ordering::Relaxed) + 1; let _ = app.emit("fetch-progress", FetchProgress { completed: n, total, plugin_id: plugin_id.clone(), }); (plugin_id, result) } }) .buffer_unordered(4) .collect() .await; let mut items_fetched = 0; let mut errors = Vec::new(); for (plugin_id, result) in results { match result { Ok(count) => items_fetched += count, Err(e) => { tracing::error!(error = %e, %plugin_id, "Failed to fetch from plugin"); errors.push(format!("{}: {}", plugin_id, e)); } } } info!(items_fetched, errors = errors.len(), "Fetched from all sources"); Ok(FetchResponse { items_fetched, errors, }) } /// Reset the circuit breaker for a feed and attempt a fresh fetch. /// /// When a feed has been auto-disabled after too many consecutive failures, /// this command clears the circuit-broken state, resets the failure counter, /// and immediately retries the fetch. Returns the number of items fetched. #[tauri::command] #[instrument(skip_all)] pub async fn reset_circuit_breaker( state: State<'_, Arc>, busser_id: String, ) -> Result { let count = state .orchestrator .reset_circuit_breaker_and_fetch(&busser_id) .await?; info!(%busser_id, items_fetched = count, "Circuit breaker reset"); Ok(FetchResponse { items_fetched: count, errors: Vec::new(), }) } #[cfg(test)] mod tests { use super::*; use bb_interface::{ConfigField, ConfigFieldType, ConfigSchema}; /// Build a schema with a single required URL field (typical RSS feed config). fn rss_schema() -> ConfigSchema { ConfigSchema { description: "RSS feed".to_string(), fields: vec![ConfigField { key: "feed_url".to_string(), label: "Feed URL".to_string(), description: None, field_type: ConfigFieldType::Url, required: true, default: None, options: vec![], placeholder: None, }], } } /// Build an empty schema (no config fields required). fn empty_schema() -> ConfigSchema { ConfigSchema { description: "No config".to_string(), fields: vec![], } } /// Build a schema with one field of the given type. fn schema_with_field( key: &str, label: &str, field_type: ConfigFieldType, required: bool, options: Vec, ) -> ConfigSchema { ConfigSchema { description: "Test".to_string(), fields: vec![ConfigField { key: key.to_string(), label: label.to_string(), description: None, field_type, required, default: None, options, placeholder: None, }], } } // -- Name validation -- #[test] fn validate_empty_name() { let r = validate_feed_input("", &serde_json::json!({}), &empty_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("empty")); } #[test] fn validate_whitespace_only_name() { let r = validate_feed_input(" ", &serde_json::json!({}), &empty_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("empty")); } #[test] fn validate_name_too_long() { let long = "x".repeat(201); let r = validate_feed_input(&long, &serde_json::json!({}), &empty_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("200")); } #[test] fn validate_name_at_limit() { let name = "x".repeat(200); let r = validate_feed_input(&name, &serde_json::json!({}), &empty_schema()); assert!(r.is_ok()); assert_eq!(r.unwrap(), name); } #[test] fn validate_name_trimmed() { let r = validate_feed_input(" My Feed ", &serde_json::json!({}), &empty_schema()); assert_eq!(r.unwrap(), "My Feed"); } // -- Config structure -- #[test] fn validate_config_not_object() { let r = validate_feed_input("Feed", &serde_json::json!("string"), &empty_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("JSON object")); } #[test] fn validate_config_array_rejected() { let r = validate_feed_input("Feed", &serde_json::json!([1, 2]), &empty_schema()); assert!(r.is_err()); } // -- Required fields -- #[test] fn validate_missing_required_field() { let r = validate_feed_input("Feed", &serde_json::json!({}), &rss_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("Feed URL")); } #[test] fn validate_empty_required_field() { let config = serde_json::json!({"feed_url": ""}); let r = validate_feed_input("Feed", &config, &rss_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("Feed URL")); } #[test] fn validate_optional_field_missing_ok() { let schema = schema_with_field("notes", "Notes", ConfigFieldType::Text, false, vec![]); let r = validate_feed_input("Feed", &serde_json::json!({}), &schema); assert!(r.is_ok()); } // -- URL validation -- #[test] fn validate_url_valid() { let config = serde_json::json!({"feed_url": "https://example.com/feed.xml"}); assert!(validate_feed_input("Feed", &config, &rss_schema()).is_ok()); } #[test] fn validate_url_http_valid() { let config = serde_json::json!({"feed_url": "http://example.com/feed"}); assert!(validate_feed_input("Feed", &config, &rss_schema()).is_ok()); } #[test] fn validate_url_missing_scheme() { let config = serde_json::json!({"feed_url": "example.com/feed"}); let r = validate_feed_input("Feed", &config, &rss_schema()); assert!(r.unwrap_err().message.contains("http")); } #[test] fn validate_url_empty_after_scheme() { let config = serde_json::json!({"feed_url": "https://"}); let r = validate_feed_input("Feed", &config, &rss_schema()); assert!(r.is_err()); assert!(r.unwrap_err().message.contains("valid URL")); } #[test] fn validate_url_only_slash_after_scheme() { let config = serde_json::json!({"feed_url": "https:///"}); let r = validate_feed_input("Feed", &config, &rss_schema()); assert!(r.is_err()); } // -- Number validation -- #[test] fn validate_number_valid() { let schema = schema_with_field("count", "Count", ConfigFieldType::Number, true, vec![]); let config = serde_json::json!({"count": "42"}); assert!(validate_feed_input("Feed", &config, &schema).is_ok()); } #[test] fn validate_number_float_valid() { let schema = schema_with_field("count", "Count", ConfigFieldType::Number, true, vec![]); let config = serde_json::json!({"count": "3.14"}); assert!(validate_feed_input("Feed", &config, &schema).is_ok()); } #[test] fn validate_number_invalid() { let schema = schema_with_field("count", "Count", ConfigFieldType::Number, true, vec![]); let config = serde_json::json!({"count": "abc"}); let r = validate_feed_input("Feed", &config, &schema); assert!(r.unwrap_err().message.contains("valid number")); } // -- Select validation -- #[test] fn validate_select_valid() { let schema = schema_with_field( "sort", "Sort", ConfigFieldType::Select, true, vec!["new".into(), "top".into()], ); let config = serde_json::json!({"sort": "new"}); assert!(validate_feed_input("Feed", &config, &schema).is_ok()); } #[test] fn validate_select_invalid_option() { let schema = schema_with_field( "sort", "Sort", ConfigFieldType::Select, true, vec!["new".into(), "top".into()], ); let config = serde_json::json!({"sort": "random"}); let r = validate_feed_input("Feed", &config, &schema); assert!(r.unwrap_err().message.contains("must be one of")); } // -- Toggle validation -- #[test] fn validate_toggle_valid() { let schema = schema_with_field("enabled", "Enabled", ConfigFieldType::Toggle, true, vec![]); assert!(validate_feed_input("F", &serde_json::json!({"enabled": "true"}), &schema).is_ok()); assert!( validate_feed_input("F", &serde_json::json!({"enabled": "false"}), &schema).is_ok() ); } #[test] fn validate_toggle_invalid() { let schema = schema_with_field("enabled", "Enabled", ConfigFieldType::Toggle, true, vec![]); let config = serde_json::json!({"enabled": "yes"}); let r = validate_feed_input("Feed", &config, &schema); assert!(r.unwrap_err().message.contains("true")); } // -- Text length validation -- #[test] fn validate_text_within_limit() { let schema = schema_with_field("bio", "Bio", ConfigFieldType::TextArea, false, vec![]); let config = serde_json::json!({"bio": "Hello world"}); assert!(validate_feed_input("Feed", &config, &schema).is_ok()); } #[test] fn validate_text_exceeds_limit() { let schema = schema_with_field("bio", "Bio", ConfigFieldType::Text, false, vec![]); let long = "x".repeat(10_001); let config = serde_json::json!({"bio": long}); let r = validate_feed_input("Feed", &config, &schema); assert!(r.unwrap_err().message.contains("10,000")); } #[test] fn validate_secret_exceeds_limit() { let schema = schema_with_field("token", "API Token", ConfigFieldType::Secret, false, vec![]); let long = "x".repeat(10_001); let config = serde_json::json!({"token": long}); let r = validate_feed_input("Feed", &config, &schema); assert!(r.is_err()); } }