//! Application state wrapping the Orchestrator use bb_core::scheduler::any_feed_due; use bb_core::{Orchestrator, OrchestratorConfig}; use bb_interface::ErrorCategory; use parking_lot::RwLock; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use synckit_client::{SyncKitClient, SyncKitConfig}; use tauri::{AppHandle, Emitter, Manager}; use tokio::task::AbortHandle; use tracing::{debug, error, info, warn}; /// Default sync server URL. pub const SYNC_SERVER_URL: &str = "https://makenot.work"; /// Bundled SyncKit config. Compile-time embed so a missing file is a build /// error, not a silent runtime no-op. The API key is a public client /// identifier, not a secret. const SYNCKIT_TOML: &str = include_str!("../../synckit.toml"); /// Extract `api_key = "..."` from the bundled synckit.toml. /// Returns None for an empty value or the unconfigured placeholder so /// sync stays cleanly disabled until a real key is provisioned. fn parse_synckit_toml_key() -> Option<&'static str> { for line in SYNCKIT_TOML.lines() { let line = line.trim(); if let Some(rest) = line.strip_prefix("api_key") { let rest = rest.trim_start().strip_prefix('=')?.trim(); let key = rest.trim_matches('"'); if !key.is_empty() && key != "REPLACE_WITH_BB_SYNC_API_KEY" { return Some(key); } } } None } /// Application state wrapping the Orchestrator pub struct AppState { pub orchestrator: Orchestrator, /// SyncKit client for cloud sync (None until configured via API key). pub sync_client: RwLock>>, /// App data directory for key persistence. pub data_dir: PathBuf, /// Guard preventing concurrent sync operations (manual + scheduler). pub sync_mutex: tokio::sync::Mutex<()>, /// Handle to abort the background auto-fetch task on shutdown. auto_fetch_handle: parking_lot::Mutex>, /// Handle to abort the background stale-item cleanup task on shutdown. cleanup_handle: parking_lot::Mutex>, /// Handle to abort the background sync scheduler task on shutdown. sync_scheduler_handle: parking_lot::Mutex>, } impl AppState { pub async fn new(app: &AppHandle) -> Result { let app_data_dir = app .path() .app_data_dir() .map_err(|e| format!("Failed to get app data dir: {}", e))?; info!(?app_data_dir, "Initializing application state"); std::fs::create_dir_all(&app_data_dir) .map_err(|e| format!("Failed to create app data dir: {}", e))?; let db_path = app_data_dir.join("balanced_breakfast.db"); let db_url = format!("sqlite:{}?mode=rwc", db_path.display()); debug!(?db_path, "Connecting to database"); // Determine plugins directory let plugins_dir = find_plugins_dir(app); info!(?plugins_dir, "Using plugins directory"); // Copy bundled plugins to config dir on first launch copy_bundled_plugins(app, &plugins_dir); let config = OrchestratorConfig { database_url: db_url, plugins_dir: plugins_dir.to_string_lossy().to_string(), fetch_interval_secs: 300, }; let mut orchestrator = Orchestrator::new(config) .await .map_err(|e| format!("Failed to create orchestrator: {}", e))?; // Load or create encryption key for plugin secrets (keychain preferred, file fallback). // This is a hard requirement — without an encryption key, plugin secrets // would be stored in plaintext, which is a security risk. let key_path = app_data_dir.join("encryption.key"); let key = bb_core::crypto::load_or_create_key_from_keychain(&key_path) .map_err(|e| format!("Failed to load encryption key: {e}"))?; info!("Encryption key loaded"); orchestrator.set_encryption_key(key); info!("Orchestrator created, running migrations"); orchestrator .migrate() .await .map_err(|e| format!("Failed to run migrations: {}", e))?; info!("Migrations complete, loading plugins"); match orchestrator.load_plugins().await { Ok(loaded) => info!(count = loaded.len(), "Loaded plugins"), Err(e) => tracing::warn!(error = %e, "Failed to load some plugins"), } // Encrypt any plaintext secrets in existing feeds if let Err(e) = orchestrator.encrypt_existing_secrets().await { tracing::warn!(error = %e, "Failed to encrypt existing secrets"); } // Initialize plugins from DB init_plugins_from_db(&orchestrator).await; // Initialize SyncKit client from saved key or env vars let sync_client = load_sync_client(&app_data_dir); // Clean up old download temp files from previous sessions let downloads_dir = std::env::temp_dir().join("bb-downloads"); if downloads_dir.exists() { if let Err(e) = std::fs::remove_dir_all(&downloads_dir) { debug!(error = %e, "Failed to clean up old download temp files"); } } info!("Application state initialized"); Ok(Self { orchestrator, sync_client: RwLock::new(sync_client.map(Arc::new)), data_dir: app_data_dir, sync_mutex: tokio::sync::Mutex::new(()), auto_fetch_handle: parking_lot::Mutex::new(None), cleanup_handle: parking_lot::Mutex::new(None), sync_scheduler_handle: parking_lot::Mutex::new(None), }) } /// Store the abort handle for the background auto-fetch task. pub fn set_auto_fetch_handle(&self, handle: AbortHandle) { let mut guard = self.auto_fetch_handle.lock(); // If there's already a running task, abort it before replacing. if let Some(old) = guard.take() { old.abort(); } *guard = Some(handle); } /// Abort the background auto-fetch task if it is running. pub fn abort_auto_fetch(&self) { let mut guard = self.auto_fetch_handle.lock(); if let Some(handle) = guard.take() { info!("Aborting auto-fetch background task"); handle.abort(); } } /// Store the abort handle for the background stale-item cleanup task. pub fn set_cleanup_handle(&self, handle: AbortHandle) { let mut guard = self.cleanup_handle.lock(); if let Some(old) = guard.take() { old.abort(); } *guard = Some(handle); } /// Abort the background cleanup task if it is running. pub fn abort_cleanup(&self) { let mut guard = self.cleanup_handle.lock(); if let Some(handle) = guard.take() { info!("Aborting stale-cleanup background task"); handle.abort(); } } /// Store the abort handle for the background sync scheduler task. pub fn set_sync_scheduler_handle(&self, handle: AbortHandle) { let mut guard = self.sync_scheduler_handle.lock(); if let Some(old) = guard.take() { old.abort(); } *guard = Some(handle); } /// Abort the background sync scheduler task if it is running. pub fn abort_sync_scheduler(&self) { let mut guard = self.sync_scheduler_handle.lock(); if let Some(handle) = guard.take() { info!("Aborting sync-scheduler background task"); handle.abort(); } } } impl Drop for AppState { fn drop(&mut self) { self.abort_auto_fetch(); self.abort_cleanup(); self.abort_sync_scheduler(); } } /// Load API key from the saved file, falling back to env var. pub fn load_api_key(data_dir: &std::path::Path) -> Option { let key_path = data_dir.join("sync_api_key"); if let Ok(key) = std::fs::read_to_string(&key_path) { let key = key.trim().to_string(); if !key.is_empty() { return Some(key); } } if let Ok(key) = std::env::var("BB_SYNC_API_KEY") { return Some(key); } parse_synckit_toml_key().map(String::from) } /// Save API key to the data directory. pub fn save_api_key(data_dir: &std::path::Path, api_key: &str) { let key_path = data_dir.join("sync_api_key"); if let Err(e) = std::fs::write(&key_path, api_key) { tracing::error!(error = %e, path = %key_path.display(), "Failed to save API key"); return; } // Restrict file permissions to owner-only (0600) on Unix #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; let perms = std::fs::Permissions::from_mode(0o600); if let Err(e) = std::fs::set_permissions(&key_path, perms) { tracing::warn!(error = %e, path = %key_path.display(), "Failed to set API key file permissions"); } } } /// Create a SyncKitClient from the saved or env-var API key. fn load_sync_client(data_dir: &std::path::Path) -> Option { let api_key = load_api_key(data_dir)?; let server_url = std::env::var("BB_SYNC_SERVER_URL") .unwrap_or_else(|_| SYNC_SERVER_URL.to_string()); info!(%server_url, "SyncKit client configured"); let client = SyncKitClient::new(SyncKitConfig { server_url, api_key }); match client.try_load_key_from_keychain() { Ok(true) => info!("Sync encryption key loaded from keychain"), Ok(false) => debug!("No sync encryption key in keychain"), Err(e) => warn!(error = %e, "Failed to load sync encryption key"), } Some(client) } /// Find the plugins directory, preferring dev-mode project root fn find_plugins_dir(app: &AppHandle) -> PathBuf { // In dev mode, use the project-root plugins/ directory let dev_plugins = PathBuf::from(env!("CARGO_MANIFEST_DIR")) .parent() .map(|p| p.join("plugins")); if let Some(ref dev_path) = dev_plugins { if dev_path.exists() { return dev_path.clone(); } } // In production, use the app config dir (fall back to app data dir) let base = app .path() .app_config_dir() .or_else(|_| app.path().app_data_dir()) .unwrap_or_else(|_| PathBuf::from(".")); base.join("plugins") } /// Copy bundled plugin resources to the config plugins directory. /// Always updates bundled plugins to the latest version while preserving user-added plugins. fn copy_bundled_plugins(app: &AppHandle, plugins_dir: &PathBuf) { if let Ok(resource_dir) = app.path().resource_dir() { let bundled = resource_dir.join("plugins"); if bundled.exists() { if let Err(e) = std::fs::create_dir_all(plugins_dir) { tracing::warn!(error = %e, path = ?plugins_dir, "Failed to create plugins directory"); return; } if let Ok(entries) = std::fs::read_dir(&bundled) { for entry in entries.flatten() { let path = entry.path(); if path.extension().is_some_and(|e| e == "rhai") { let dest = plugins_dir.join(entry.file_name()); // Skip if installed file is identical to bundled version if dest.exists() { if let (Ok(bundled_bytes), Ok(installed_bytes)) = (std::fs::read(&path), std::fs::read(&dest)) { if bundled_bytes == installed_bytes { continue; } } } if let Err(e) = std::fs::copy(&path, &dest) { tracing::warn!(error = %e, ?path, "Failed to copy plugin"); } else { info!(file = ?entry.file_name(), "Copied bundled plugin"); } } } } } } } /// Initialize all plugins that have feeds configured in the database async fn init_plugins_from_db(orchestrator: &Orchestrator) { let plugin_ids = { let plugins = orchestrator.plugins(); let plugins = plugins.read().await; plugins.list_plugins() }; for plugin_id in plugin_ids { if let Err(e) = orchestrator.init_plugin_from_db(&plugin_id).await { tracing::warn!(error = %e, %plugin_id, "Failed to init plugin from DB"); } } } /// Spawn a background task that auto-fetches plugins on their preferred interval. /// Checks every 60 seconds which plugins are due for a fetch. /// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown. pub fn spawn_auto_fetch(app_handle: AppHandle, state: Arc) { const CHECK_INTERVAL_SECS: u64 = 60; let task_state = Arc::clone(&state); let handle = tauri::async_runtime::spawn(async move { // Per-plugin rate-limit backoff: maps plugin_id → earliest next fetch time. let mut backoff_until: HashMap = HashMap::new(); loop { tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await; let plugin_ids = { let plugins = task_state.orchestrator.plugins(); let plugins = plugins.read().await; plugins.list_plugins() }; for plugin_id in plugin_ids { let interval = task_state.orchestrator.fetch_interval_secs(&plugin_id).await; if interval == 0 { continue; // Auto-fetch disabled for this plugin } // Skip if rate-limit backoff hasn't expired if let Some(&until) = backoff_until.get(&plugin_id) { if tokio::time::Instant::now() < until { debug!(%plugin_id, "Skipping rate-limited feed (backoff active)"); continue; } backoff_until.remove(&plugin_id); } // Skip circuit-broken feeds match task_state.orchestrator.is_circuit_broken(&plugin_id).await { Ok(true) => { debug!(%plugin_id, "Skipping circuit-broken feed"); continue; } Ok(false) => {} Err(e) => { error!(error = %e, %plugin_id, "Failed to check circuit breaker"); continue; } } // Check if this plugin is due for a fetch let due = match is_fetch_due(&task_state, &plugin_id, interval).await { Ok(due) => due, Err(e) => { error!(error = %e, %plugin_id, "Failed to check fetch status"); continue; } }; if !due { continue; } info!(%plugin_id, "Auto-fetching plugin"); match task_state.orchestrator.fetch_plugin(&plugin_id).await { Ok(count) => { if count > 0 { info!(count, %plugin_id, "Auto-fetch complete"); // Notify the frontend so it can refresh if let Err(e) = app_handle.emit("auto-fetch-complete", &plugin_id) { debug!(error = %e, "Failed to emit auto-fetch-complete event"); } } } Err(e) => { // Classify the error to determine backoff behavior let structured = bb_core::classify_error(&e.to_string()); let category_str = structured.category.to_string(); // Apply rate-limit backoff if structured.category == ErrorCategory::RateLimited { let delay_secs = structured.retry_after_secs.unwrap_or(60); info!(%plugin_id, delay_secs, "Rate limited, backing off"); backoff_until.insert( plugin_id.clone(), tokio::time::Instant::now() + std::time::Duration::from_secs(delay_secs), ); } error!(error = %e, %plugin_id, category = %category_str, "Auto-fetch failed"); // Check if the circuit breaker just tripped if let Ok(true) = task_state.orchestrator.is_circuit_broken(&plugin_id).await { info!(%plugin_id, "Circuit breaker tripped, emitting event"); if let Err(emit_err) = app_handle.emit( "feed-circuit-broken", serde_json::json!({ "pluginId": plugin_id, "error": e.to_string(), "category": category_str, }), ) { debug!(error = %emit_err, "Failed to emit feed-circuit-broken event"); } } if let Err(emit_err) = app_handle.emit( "auto-fetch-error", serde_json::json!({ "pluginId": plugin_id, "error": e.to_string(), "category": category_str, }), ) { debug!(error = %emit_err, "Failed to emit auto-fetch-error event"); } } } } } }); state.set_auto_fetch_handle(handle.inner().abort_handle()); } /// Spawn a background task that periodically deletes old read items. /// Runs every 6 hours, removing read (non-starred) items older than 30 days. /// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown. pub fn spawn_stale_cleanup(state: Arc) { const CLEANUP_INTERVAL_SECS: u64 = 21600; // 6 hours const STALE_DAYS: i64 = 30; let task_state = Arc::clone(&state); let handle = tauri::async_runtime::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS)).await; let cutoff = chrono::Utc::now() - chrono::Duration::days(STALE_DAYS); let db = task_state.orchestrator.database(); match db.items().delete_stale_read(cutoff).await { Ok(count) if count > 0 => { info!(count, "Stale cleanup: deleted old read items"); } Ok(_) => {} Err(e) => { error!(error = %e, "Stale cleanup failed"); } } } }); state.set_cleanup_handle(handle.inner().abort_handle()); } /// Check if a plugin is due for a fetch by comparing its last_fetch time /// against the configured interval. async fn is_fetch_due( state: &AppState, plugin_id: &str, interval_secs: u64, ) -> Result { let db = state.orchestrator.database(); let feeds = db .feeds() .get_by_busser(plugin_id) .await .map_err(|e| e.to_string())?; // If no feeds configured, nothing to fetch if feeds.is_empty() { return Ok(false); } let now = chrono::Utc::now(); let feed_times: Vec<_> = feeds .iter() .map(|f| f.last_fetch.as_deref()) .collect(); Ok(any_feed_due(&feed_times, interval_secs, now)) }