//! Application state wrapping the Orchestrator
use bb_core::scheduler::any_feed_due;
use bb_core::{Orchestrator, OrchestratorConfig};
use bb_interface::ErrorCategory;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use synckit_client::{SyncKitClient, SyncKitConfig};
use tauri::{AppHandle, Emitter, Manager};
use tokio::task::AbortHandle;
use tracing::{debug, error, info, warn};
/// Default sync server URL, used when the `BB_SYNC_SERVER_URL` environment
/// variable does not provide one.
pub const SYNC_SERVER_URL: &str = "https://makenot.work";
/// Bundled SyncKit config, embedded at compile time via `include_str!` so a
/// missing file is a build error rather than a silent runtime no-op.
/// The API key it carries is a public client identifier, not a secret.
const SYNCKIT_TOML: &str = include_str!("../../synckit.toml");
/// Extract `api_key = "..."` from the bundled synckit.toml.
///
/// Scans the embedded config line by line and returns the first usable key.
/// Returns `None` for an empty value or the unconfigured placeholder so
/// sync stays cleanly disabled until a real key is provisioned.
fn parse_synckit_toml_key() -> Option<&'static str> {
    for line in SYNCKIT_TOML.lines() {
        let Some(rest) = line.trim().strip_prefix("api_key") else {
            continue;
        };
        // Require `=` directly after the key name (modulo whitespace). A line
        // such as `api_key_id = ...` is skipped rather than aborting the whole
        // scan, so a later genuine `api_key` line is still found.
        let Some(value) = rest.trim_start().strip_prefix('=') else {
            continue;
        };
        let key = value.trim().trim_matches('"');
        if !key.is_empty() && key != "REPLACE_WITH_BB_SYNC_API_KEY" {
            return Some(key);
        }
    }
    None
}
/// Application state wrapping the Orchestrator
pub struct AppState {
pub orchestrator: Orchestrator,
/// SyncKit client for cloud sync (None until configured via API key).
pub sync_client: RwLock>>,
/// App data directory for key persistence.
pub data_dir: PathBuf,
/// Guard preventing concurrent sync operations (manual + scheduler).
pub sync_mutex: tokio::sync::Mutex<()>,
/// Handle to abort the background auto-fetch task on shutdown.
auto_fetch_handle: parking_lot::Mutex >,
/// Handle to abort the background stale-item cleanup task on shutdown.
cleanup_handle: parking_lot::Mutex >,
/// Handle to abort the background sync scheduler task on shutdown.
sync_scheduler_handle: parking_lot::Mutex >,
}
impl AppState {
pub async fn new(app: &AppHandle) -> Result {
let app_data_dir = app
.path()
.app_data_dir()
.map_err(|e| format!("Failed to get app data dir: {}", e))?;
info!(?app_data_dir, "Initializing application state");
std::fs::create_dir_all(&app_data_dir)
.map_err(|e| format!("Failed to create app data dir: {}", e))?;
let db_path = app_data_dir.join("balanced_breakfast.db");
let db_url = format!("sqlite:{}?mode=rwc", db_path.display());
debug!(?db_path, "Connecting to database");
// Determine plugins directory
let plugins_dir = find_plugins_dir(app);
info!(?plugins_dir, "Using plugins directory");
// Copy bundled plugins to config dir on first launch
copy_bundled_plugins(app, &plugins_dir);
let config = OrchestratorConfig {
database_url: db_url,
plugins_dir: plugins_dir.to_string_lossy().to_string(),
fetch_interval_secs: 300,
};
let mut orchestrator = Orchestrator::new(config)
.await
.map_err(|e| format!("Failed to create orchestrator: {}", e))?;
// Load or create encryption key for plugin secrets (keychain preferred, file fallback).
// This is a hard requirement — without an encryption key, plugin secrets
// would be stored in plaintext, which is a security risk.
let key_path = app_data_dir.join("encryption.key");
let key = bb_core::crypto::load_or_create_key_from_keychain(&key_path)
.map_err(|e| format!("Failed to load encryption key: {e}"))?;
info!("Encryption key loaded");
orchestrator.set_encryption_key(key);
info!("Orchestrator created, running migrations");
orchestrator
.migrate()
.await
.map_err(|e| format!("Failed to run migrations: {}", e))?;
info!("Migrations complete, loading plugins");
match orchestrator.load_plugins().await {
Ok(loaded) => info!(count = loaded.len(), "Loaded plugins"),
Err(e) => tracing::warn!(error = %e, "Failed to load some plugins"),
}
// Encrypt any plaintext secrets in existing feeds
if let Err(e) = orchestrator.encrypt_existing_secrets().await {
tracing::warn!(error = %e, "Failed to encrypt existing secrets");
}
// Initialize plugins from DB
init_plugins_from_db(&orchestrator).await;
// Initialize SyncKit client from saved key or env vars
let sync_client = load_sync_client(&app_data_dir);
// Clean up old download temp files from previous sessions
let downloads_dir = std::env::temp_dir().join("bb-downloads");
if downloads_dir.exists() {
if let Err(e) = std::fs::remove_dir_all(&downloads_dir) {
debug!(error = %e, "Failed to clean up old download temp files");
}
}
info!("Application state initialized");
Ok(Self {
orchestrator,
sync_client: RwLock::new(sync_client.map(Arc::new)),
data_dir: app_data_dir,
sync_mutex: tokio::sync::Mutex::new(()),
auto_fetch_handle: parking_lot::Mutex::new(None),
cleanup_handle: parking_lot::Mutex::new(None),
sync_scheduler_handle: parking_lot::Mutex::new(None),
})
}
/// Store the abort handle for the background auto-fetch task.
pub fn set_auto_fetch_handle(&self, handle: AbortHandle) {
let mut guard = self.auto_fetch_handle.lock();
// If there's already a running task, abort it before replacing.
if let Some(old) = guard.take() {
old.abort();
}
*guard = Some(handle);
}
/// Abort the background auto-fetch task if it is running.
pub fn abort_auto_fetch(&self) {
let mut guard = self.auto_fetch_handle.lock();
if let Some(handle) = guard.take() {
info!("Aborting auto-fetch background task");
handle.abort();
}
}
/// Store the abort handle for the background stale-item cleanup task.
pub fn set_cleanup_handle(&self, handle: AbortHandle) {
let mut guard = self.cleanup_handle.lock();
if let Some(old) = guard.take() {
old.abort();
}
*guard = Some(handle);
}
/// Abort the background cleanup task if it is running.
pub fn abort_cleanup(&self) {
let mut guard = self.cleanup_handle.lock();
if let Some(handle) = guard.take() {
info!("Aborting stale-cleanup background task");
handle.abort();
}
}
/// Store the abort handle for the background sync scheduler task.
pub fn set_sync_scheduler_handle(&self, handle: AbortHandle) {
let mut guard = self.sync_scheduler_handle.lock();
if let Some(old) = guard.take() {
old.abort();
}
*guard = Some(handle);
}
/// Abort the background sync scheduler task if it is running.
pub fn abort_sync_scheduler(&self) {
let mut guard = self.sync_scheduler_handle.lock();
if let Some(handle) = guard.take() {
info!("Aborting sync-scheduler background task");
handle.abort();
}
}
}
impl Drop for AppState {
    /// Cancel every background task whose abort handle is still stored, in the
    /// order auto-fetch, stale cleanup, sync scheduler.
    fn drop(&mut self) {
        // The abort helpers only need shared access; the handles sit behind mutexes.
        let state: &Self = self;
        state.abort_auto_fetch();
        state.abort_cleanup();
        state.abort_sync_scheduler();
    }
}
/// Load API key from the saved file, falling back to env var.
pub fn load_api_key(data_dir: &std::path::Path) -> Option {
let key_path = data_dir.join("sync_api_key");
if let Ok(key) = std::fs::read_to_string(&key_path) {
let key = key.trim().to_string();
if !key.is_empty() {
return Some(key);
}
}
if let Ok(key) = std::env::var("BB_SYNC_API_KEY") {
return Some(key);
}
parse_synckit_toml_key().map(String::from)
}
/// Save API key to the `sync_api_key` file in the data directory.
///
/// Best-effort: failures are logged and not returned to the caller.
/// On Unix the file is created with owner-only permissions (0600) from the
/// start, so the key is never readable by other users between the write and a
/// later chmod. Permissions are also re-applied afterwards to tighten a
/// pre-existing file that had a broader mode.
pub fn save_api_key(data_dir: &std::path::Path, api_key: &str) {
    let key_path = data_dir.join("sync_api_key");

    #[cfg(unix)]
    let write_result = {
        use std::io::Write;
        use std::os::unix::fs::OpenOptionsExt;
        std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o600)
            .open(&key_path)
            .and_then(|mut file| file.write_all(api_key.as_bytes()))
    };
    #[cfg(not(unix))]
    let write_result = std::fs::write(&key_path, api_key);

    if let Err(e) = write_result {
        tracing::error!(error = %e, path = %key_path.display(), "Failed to save API key");
        return;
    }

    // Restrict file permissions to owner-only (0600) on Unix. The creation
    // mode above only applies to newly created files.
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let perms = std::fs::Permissions::from_mode(0o600);
        if let Err(e) = std::fs::set_permissions(&key_path, perms) {
            tracing::warn!(error = %e, path = %key_path.display(), "Failed to set API key file permissions");
        }
    }
}
/// Create a SyncKitClient from the saved or env-var API key.
fn load_sync_client(data_dir: &std::path::Path) -> Option {
let api_key = load_api_key(data_dir)?;
let server_url = std::env::var("BB_SYNC_SERVER_URL")
.unwrap_or_else(|_| SYNC_SERVER_URL.to_string());
info!(%server_url, "SyncKit client configured");
let client = SyncKitClient::new(SyncKitConfig { server_url, api_key });
match client.try_load_key_from_keychain() {
Ok(true) => info!("Sync encryption key loaded from keychain"),
Ok(false) => debug!("No sync encryption key in keychain"),
Err(e) => warn!(error = %e, "Failed to load sync encryption key"),
}
Some(client)
}
/// Resolve the directory plugins are loaded from.
///
/// A `plugins` directory next to the crate's manifest directory wins when it
/// exists (dev mode). Otherwise the result is `plugins` under the app config
/// dir, then the app data dir, and finally the current directory.
fn find_plugins_dir(app: &AppHandle) -> PathBuf {
    // Dev mode: project-root plugins/ directory, resolved at compile time.
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    if let Some(candidate) = manifest_dir.parent().map(|root| root.join("plugins")) {
        if candidate.exists() {
            return candidate;
        }
    }

    // Production: app config dir, falling back to the app data dir.
    let base = match app.path().app_config_dir() {
        Ok(dir) => dir,
        Err(_) => app
            .path()
            .app_data_dir()
            .unwrap_or_else(|_| PathBuf::from(".")),
    };
    base.join("plugins")
}
/// Copy bundled plugin resources to the config plugins directory.
/// Always updates bundled plugins to the latest version while preserving user-added plugins.
fn copy_bundled_plugins(app: &AppHandle, plugins_dir: &PathBuf) {
if let Ok(resource_dir) = app.path().resource_dir() {
let bundled = resource_dir.join("plugins");
if bundled.exists() {
if let Err(e) = std::fs::create_dir_all(plugins_dir) {
tracing::warn!(error = %e, path = ?plugins_dir, "Failed to create plugins directory");
return;
}
if let Ok(entries) = std::fs::read_dir(&bundled) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_some_and(|e| e == "rhai") {
let dest = plugins_dir.join(entry.file_name());
// Skip if installed file is identical to bundled version
if dest.exists() {
if let (Ok(bundled_bytes), Ok(installed_bytes)) =
(std::fs::read(&path), std::fs::read(&dest))
{
if bundled_bytes == installed_bytes {
continue;
}
}
}
if let Err(e) = std::fs::copy(&path, &dest) {
tracing::warn!(error = %e, ?path, "Failed to copy plugin");
} else {
info!(file = ?entry.file_name(), "Copied bundled plugin");
}
}
}
}
}
}
}
/// Run `init_plugin_from_db` for every plugin the orchestrator currently lists.
///
/// A failure for one plugin is logged as a warning and does not stop the
/// remaining plugins from being initialized.
async fn init_plugins_from_db(orchestrator: &Orchestrator) {
    // Snapshot the ids inside a block so the read guard is released before
    // any plugin initialization is awaited.
    let ids = {
        let registry = orchestrator.plugins();
        let guard = registry.read().await;
        guard.list_plugins()
    };

    for plugin_id in ids {
        match orchestrator.init_plugin_from_db(&plugin_id).await {
            Ok(_) => {}
            Err(e) => {
                tracing::warn!(error = %e, %plugin_id, "Failed to init plugin from DB");
            }
        }
    }
}
/// Spawn a background task that auto-fetches plugins on their preferred interval.
/// Checks every 60 seconds which plugins are due for a fetch.
/// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown.
pub fn spawn_auto_fetch(app_handle: AppHandle, state: Arc) {
const CHECK_INTERVAL_SECS: u64 = 60;
let task_state = Arc::clone(&state);
let handle = tauri::async_runtime::spawn(async move {
// Per-plugin rate-limit backoff: maps plugin_id → earliest next fetch time.
let mut backoff_until: HashMap = HashMap::new();
loop {
tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await;
let plugin_ids = {
let plugins = task_state.orchestrator.plugins();
let plugins = plugins.read().await;
plugins.list_plugins()
};
for plugin_id in plugin_ids {
let interval = task_state.orchestrator.fetch_interval_secs(&plugin_id).await;
if interval == 0 {
continue; // Auto-fetch disabled for this plugin
}
// Skip if rate-limit backoff hasn't expired
if let Some(&until) = backoff_until.get(&plugin_id) {
if tokio::time::Instant::now() < until {
debug!(%plugin_id, "Skipping rate-limited feed (backoff active)");
continue;
}
backoff_until.remove(&plugin_id);
}
// Skip circuit-broken feeds
match task_state.orchestrator.is_circuit_broken(&plugin_id).await {
Ok(true) => {
debug!(%plugin_id, "Skipping circuit-broken feed");
continue;
}
Ok(false) => {}
Err(e) => {
error!(error = %e, %plugin_id, "Failed to check circuit breaker");
continue;
}
}
// Check if this plugin is due for a fetch
let due = match is_fetch_due(&task_state, &plugin_id, interval).await {
Ok(due) => due,
Err(e) => {
error!(error = %e, %plugin_id, "Failed to check fetch status");
continue;
}
};
if !due {
continue;
}
info!(%plugin_id, "Auto-fetching plugin");
match task_state.orchestrator.fetch_plugin(&plugin_id).await {
Ok(count) => {
if count > 0 {
info!(count, %plugin_id, "Auto-fetch complete");
// Notify the frontend so it can refresh
if let Err(e) = app_handle.emit("auto-fetch-complete", &plugin_id) {
debug!(error = %e, "Failed to emit auto-fetch-complete event");
}
}
}
Err(e) => {
// Classify the error to determine backoff behavior
let structured = bb_core::classify_error(&e.to_string());
let category_str = structured.category.to_string();
// Apply rate-limit backoff
if structured.category == ErrorCategory::RateLimited {
let delay_secs = structured.retry_after_secs.unwrap_or(60);
info!(%plugin_id, delay_secs, "Rate limited, backing off");
backoff_until.insert(
plugin_id.clone(),
tokio::time::Instant::now()
+ std::time::Duration::from_secs(delay_secs),
);
}
error!(error = %e, %plugin_id, category = %category_str, "Auto-fetch failed");
// Check if the circuit breaker just tripped
if let Ok(true) = task_state.orchestrator.is_circuit_broken(&plugin_id).await
{
info!(%plugin_id, "Circuit breaker tripped, emitting event");
if let Err(emit_err) = app_handle.emit(
"feed-circuit-broken",
serde_json::json!({
"pluginId": plugin_id,
"error": e.to_string(),
"category": category_str,
}),
) {
debug!(error = %emit_err, "Failed to emit feed-circuit-broken event");
}
}
if let Err(emit_err) = app_handle.emit(
"auto-fetch-error",
serde_json::json!({
"pluginId": plugin_id,
"error": e.to_string(),
"category": category_str,
}),
) {
debug!(error = %emit_err, "Failed to emit auto-fetch-error event");
}
}
}
}
}
});
state.set_auto_fetch_handle(handle.inner().abort_handle());
}
/// Spawn a background task that periodically deletes old read items.
/// Runs every 6 hours, removing read (non-starred) items older than 30 days.
/// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown.
pub fn spawn_stale_cleanup(state: Arc) {
const CLEANUP_INTERVAL_SECS: u64 = 21600; // 6 hours
const STALE_DAYS: i64 = 30;
let task_state = Arc::clone(&state);
let handle = tauri::async_runtime::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS)).await;
let cutoff = chrono::Utc::now() - chrono::Duration::days(STALE_DAYS);
let db = task_state.orchestrator.database();
match db.items().delete_stale_read(cutoff).await {
Ok(count) if count > 0 => {
info!(count, "Stale cleanup: deleted old read items");
}
Ok(_) => {}
Err(e) => {
error!(error = %e, "Stale cleanup failed");
}
}
}
});
state.set_cleanup_handle(handle.inner().abort_handle());
}
/// Check if a plugin is due for a fetch by comparing its last_fetch time
/// against the configured interval.
async fn is_fetch_due(
state: &AppState,
plugin_id: &str,
interval_secs: u64,
) -> Result {
let db = state.orchestrator.database();
let feeds = db
.feeds()
.get_by_busser(plugin_id)
.await
.map_err(|e| e.to_string())?;
// If no feeds configured, nothing to fetch
if feeds.is_empty() {
return Ok(false);
}
let now = chrono::Utc::now();
let feed_times: Vec<_> = feeds
.iter()
.map(|f| f.last_fetch.as_deref())
.collect();
Ok(any_feed_due(&feed_times, interval_secs, now))
}