Skip to main content

max / balanced_breakfast

19.8 KB · 512 lines History Blame Raw
1 //! Application state wrapping the Orchestrator
2 use bb_core::scheduler::any_feed_due;
3 use bb_core::{Orchestrator, OrchestratorConfig};
4 use bb_interface::ErrorCategory;
5 use parking_lot::RwLock;
6 use std::collections::HashMap;
7 use std::path::PathBuf;
8 use std::sync::Arc;
9 use synckit_client::{SyncKitClient, SyncKitConfig};
10 use tauri::{AppHandle, Emitter, Manager};
11 use tokio::task::AbortHandle;
12 use tracing::{debug, error, info, warn};
13
14 /// Default sync server URL.
15 pub const SYNC_SERVER_URL: &str = "https://makenot.work";
16
17 /// Bundled SyncKit config. Compile-time embed so a missing file is a build
18 /// error, not a silent runtime no-op. The API key is a public client
19 /// identifier, not a secret.
20 const SYNCKIT_TOML: &str = include_str!("../../synckit.toml");
21
22 /// Extract `api_key = "..."` from the bundled synckit.toml.
23 /// Returns None for an empty value or the unconfigured placeholder so
24 /// sync stays cleanly disabled until a real key is provisioned.
25 fn parse_synckit_toml_key() -> Option<&'static str> {
26 for line in SYNCKIT_TOML.lines() {
27 let line = line.trim();
28 if let Some(rest) = line.strip_prefix("api_key") {
29 let rest = rest.trim_start().strip_prefix('=')?.trim();
30 let key = rest.trim_matches('"');
31 if !key.is_empty() && key != "REPLACE_WITH_BB_SYNC_API_KEY" {
32 return Some(key);
33 }
34 }
35 }
36 None
37 }
38
39 /// Application state wrapping the Orchestrator
40 pub struct AppState {
41 pub orchestrator: Orchestrator,
42 /// SyncKit client for cloud sync (None until configured via API key).
43 pub sync_client: RwLock<Option<Arc<SyncKitClient>>>,
44 /// App data directory for key persistence.
45 pub data_dir: PathBuf,
46 /// Guard preventing concurrent sync operations (manual + scheduler).
47 pub sync_mutex: tokio::sync::Mutex<()>,
48 /// Handle to abort the background auto-fetch task on shutdown.
49 auto_fetch_handle: parking_lot::Mutex<Option<AbortHandle>>,
50 /// Handle to abort the background stale-item cleanup task on shutdown.
51 cleanup_handle: parking_lot::Mutex<Option<AbortHandle>>,
52 /// Handle to abort the background sync scheduler task on shutdown.
53 sync_scheduler_handle: parking_lot::Mutex<Option<AbortHandle>>,
54 }
55
56 impl AppState {
57 pub async fn new(app: &AppHandle) -> Result<Self, String> {
58 let app_data_dir = app
59 .path()
60 .app_data_dir()
61 .map_err(|e| format!("Failed to get app data dir: {}", e))?;
62
63 info!(?app_data_dir, "Initializing application state");
64
65 std::fs::create_dir_all(&app_data_dir)
66 .map_err(|e| format!("Failed to create app data dir: {}", e))?;
67
68 let db_path = app_data_dir.join("balanced_breakfast.db");
69 let db_url = format!("sqlite:{}?mode=rwc", db_path.display());
70
71 debug!(?db_path, "Connecting to database");
72
73 // Determine plugins directory
74 let plugins_dir = find_plugins_dir(app);
75 info!(?plugins_dir, "Using plugins directory");
76
77 // Copy bundled plugins to config dir on first launch
78 copy_bundled_plugins(app, &plugins_dir);
79
80 let config = OrchestratorConfig {
81 database_url: db_url,
82 plugins_dir: plugins_dir.to_string_lossy().to_string(),
83 fetch_interval_secs: 300,
84 };
85
86 let mut orchestrator = Orchestrator::new(config)
87 .await
88 .map_err(|e| format!("Failed to create orchestrator: {}", e))?;
89
90 // Load or create encryption key for plugin secrets (keychain preferred, file fallback).
91 // This is a hard requirement — without an encryption key, plugin secrets
92 // would be stored in plaintext, which is a security risk.
93 let key_path = app_data_dir.join("encryption.key");
94 let key = bb_core::crypto::load_or_create_key_from_keychain(&key_path)
95 .map_err(|e| format!("Failed to load encryption key: {e}"))?;
96 info!("Encryption key loaded");
97 orchestrator.set_encryption_key(key);
98
99 info!("Orchestrator created, running migrations");
100
101 orchestrator
102 .migrate()
103 .await
104 .map_err(|e| format!("Failed to run migrations: {}", e))?;
105
106 info!("Migrations complete, loading plugins");
107
108 match orchestrator.load_plugins().await {
109 Ok(loaded) => info!(count = loaded.len(), "Loaded plugins"),
110 Err(e) => tracing::warn!(error = %e, "Failed to load some plugins"),
111 }
112
113 // Encrypt any plaintext secrets in existing feeds
114 if let Err(e) = orchestrator.encrypt_existing_secrets().await {
115 tracing::warn!(error = %e, "Failed to encrypt existing secrets");
116 }
117
118 // Initialize plugins from DB
119 init_plugins_from_db(&orchestrator).await;
120
121 // Initialize SyncKit client from saved key or env vars
122 let sync_client = load_sync_client(&app_data_dir);
123
124 // Clean up old download temp files from previous sessions
125 let downloads_dir = std::env::temp_dir().join("bb-downloads");
126 if downloads_dir.exists() {
127 if let Err(e) = std::fs::remove_dir_all(&downloads_dir) {
128 debug!(error = %e, "Failed to clean up old download temp files");
129 }
130 }
131
132 info!("Application state initialized");
133
134 Ok(Self {
135 orchestrator,
136 sync_client: RwLock::new(sync_client.map(Arc::new)),
137 data_dir: app_data_dir,
138 sync_mutex: tokio::sync::Mutex::new(()),
139 auto_fetch_handle: parking_lot::Mutex::new(None),
140 cleanup_handle: parking_lot::Mutex::new(None),
141 sync_scheduler_handle: parking_lot::Mutex::new(None),
142 })
143 }
144
145 /// Store the abort handle for the background auto-fetch task.
146 pub fn set_auto_fetch_handle(&self, handle: AbortHandle) {
147 let mut guard = self.auto_fetch_handle.lock();
148 // If there's already a running task, abort it before replacing.
149 if let Some(old) = guard.take() {
150 old.abort();
151 }
152 *guard = Some(handle);
153 }
154
155 /// Abort the background auto-fetch task if it is running.
156 pub fn abort_auto_fetch(&self) {
157 let mut guard = self.auto_fetch_handle.lock();
158 if let Some(handle) = guard.take() {
159 info!("Aborting auto-fetch background task");
160 handle.abort();
161 }
162 }
163
164 /// Store the abort handle for the background stale-item cleanup task.
165 pub fn set_cleanup_handle(&self, handle: AbortHandle) {
166 let mut guard = self.cleanup_handle.lock();
167 if let Some(old) = guard.take() {
168 old.abort();
169 }
170 *guard = Some(handle);
171 }
172
173 /// Abort the background cleanup task if it is running.
174 pub fn abort_cleanup(&self) {
175 let mut guard = self.cleanup_handle.lock();
176 if let Some(handle) = guard.take() {
177 info!("Aborting stale-cleanup background task");
178 handle.abort();
179 }
180 }
181
182 /// Store the abort handle for the background sync scheduler task.
183 pub fn set_sync_scheduler_handle(&self, handle: AbortHandle) {
184 let mut guard = self.sync_scheduler_handle.lock();
185 if let Some(old) = guard.take() {
186 old.abort();
187 }
188 *guard = Some(handle);
189 }
190
191 /// Abort the background sync scheduler task if it is running.
192 pub fn abort_sync_scheduler(&self) {
193 let mut guard = self.sync_scheduler_handle.lock();
194 if let Some(handle) = guard.take() {
195 info!("Aborting sync-scheduler background task");
196 handle.abort();
197 }
198 }
199 }
200
201 impl Drop for AppState {
202 fn drop(&mut self) {
203 self.abort_auto_fetch();
204 self.abort_cleanup();
205 self.abort_sync_scheduler();
206 }
207 }
208
209 /// Load API key from the saved file, falling back to env var.
210 pub fn load_api_key(data_dir: &std::path::Path) -> Option<String> {
211 let key_path = data_dir.join("sync_api_key");
212 if let Ok(key) = std::fs::read_to_string(&key_path) {
213 let key = key.trim().to_string();
214 if !key.is_empty() {
215 return Some(key);
216 }
217 }
218 if let Ok(key) = std::env::var("BB_SYNC_API_KEY") {
219 return Some(key);
220 }
221
222 parse_synckit_toml_key().map(String::from)
223 }
224
225 /// Save API key to the data directory.
226 pub fn save_api_key(data_dir: &std::path::Path, api_key: &str) {
227 let key_path = data_dir.join("sync_api_key");
228 if let Err(e) = std::fs::write(&key_path, api_key) {
229 tracing::error!(error = %e, path = %key_path.display(), "Failed to save API key");
230 return;
231 }
232 // Restrict file permissions to owner-only (0600) on Unix
233 #[cfg(unix)]
234 {
235 use std::os::unix::fs::PermissionsExt;
236 let perms = std::fs::Permissions::from_mode(0o600);
237 if let Err(e) = std::fs::set_permissions(&key_path, perms) {
238 tracing::warn!(error = %e, path = %key_path.display(), "Failed to set API key file permissions");
239 }
240 }
241 }
242
243 /// Create a SyncKitClient from the saved or env-var API key.
244 fn load_sync_client(data_dir: &std::path::Path) -> Option<SyncKitClient> {
245 let api_key = load_api_key(data_dir)?;
246 let server_url = std::env::var("BB_SYNC_SERVER_URL")
247 .unwrap_or_else(|_| SYNC_SERVER_URL.to_string());
248 info!(%server_url, "SyncKit client configured");
249 let client = SyncKitClient::new(SyncKitConfig { server_url, api_key });
250 match client.try_load_key_from_keychain() {
251 Ok(true) => info!("Sync encryption key loaded from keychain"),
252 Ok(false) => debug!("No sync encryption key in keychain"),
253 Err(e) => warn!(error = %e, "Failed to load sync encryption key"),
254 }
255 Some(client)
256 }
257
258 /// Find the plugins directory, preferring dev-mode project root
259 fn find_plugins_dir(app: &AppHandle) -> PathBuf {
260 // In dev mode, use the project-root plugins/ directory
261 let dev_plugins = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
262 .parent()
263 .map(|p| p.join("plugins"));
264
265 if let Some(ref dev_path) = dev_plugins {
266 if dev_path.exists() {
267 return dev_path.clone();
268 }
269 }
270
271 // In production, use the app config dir (fall back to app data dir)
272 let base = app
273 .path()
274 .app_config_dir()
275 .or_else(|_| app.path().app_data_dir())
276 .unwrap_or_else(|_| PathBuf::from("."));
277 base.join("plugins")
278 }
279
280 /// Copy bundled plugin resources to the config plugins directory.
281 /// Always updates bundled plugins to the latest version while preserving user-added plugins.
282 fn copy_bundled_plugins(app: &AppHandle, plugins_dir: &PathBuf) {
283 if let Ok(resource_dir) = app.path().resource_dir() {
284 let bundled = resource_dir.join("plugins");
285 if bundled.exists() {
286 if let Err(e) = std::fs::create_dir_all(plugins_dir) {
287 tracing::warn!(error = %e, path = ?plugins_dir, "Failed to create plugins directory");
288 return;
289 }
290 if let Ok(entries) = std::fs::read_dir(&bundled) {
291 for entry in entries.flatten() {
292 let path = entry.path();
293 if path.extension().is_some_and(|e| e == "rhai") {
294 let dest = plugins_dir.join(entry.file_name());
295 // Skip if installed file is identical to bundled version
296 if dest.exists() {
297 if let (Ok(bundled_bytes), Ok(installed_bytes)) =
298 (std::fs::read(&path), std::fs::read(&dest))
299 {
300 if bundled_bytes == installed_bytes {
301 continue;
302 }
303 }
304 }
305 if let Err(e) = std::fs::copy(&path, &dest) {
306 tracing::warn!(error = %e, ?path, "Failed to copy plugin");
307 } else {
308 info!(file = ?entry.file_name(), "Copied bundled plugin");
309 }
310 }
311 }
312 }
313 }
314 }
315 }
316
317 /// Initialize all plugins that have feeds configured in the database
318 async fn init_plugins_from_db(orchestrator: &Orchestrator) {
319 let plugin_ids = {
320 let plugins = orchestrator.plugins();
321 let plugins = plugins.read().await;
322 plugins.list_plugins()
323 };
324
325 for plugin_id in plugin_ids {
326 if let Err(e) = orchestrator.init_plugin_from_db(&plugin_id).await {
327 tracing::warn!(error = %e, %plugin_id, "Failed to init plugin from DB");
328 }
329 }
330 }
331
332 /// Spawn a background task that auto-fetches plugins on their preferred interval.
333 /// Checks every 60 seconds which plugins are due for a fetch.
334 /// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown.
335 pub fn spawn_auto_fetch(app_handle: AppHandle, state: Arc<AppState>) {
336 const CHECK_INTERVAL_SECS: u64 = 60;
337
338 let task_state = Arc::clone(&state);
339 let handle = tauri::async_runtime::spawn(async move {
340 // Per-plugin rate-limit backoff: maps plugin_id → earliest next fetch time.
341 let mut backoff_until: HashMap<String, tokio::time::Instant> = HashMap::new();
342
343 loop {
344 tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await;
345
346 let plugin_ids = {
347 let plugins = task_state.orchestrator.plugins();
348 let plugins = plugins.read().await;
349 plugins.list_plugins()
350 };
351
352 for plugin_id in plugin_ids {
353 let interval = task_state.orchestrator.fetch_interval_secs(&plugin_id).await;
354 if interval == 0 {
355 continue; // Auto-fetch disabled for this plugin
356 }
357
358 // Skip if rate-limit backoff hasn't expired
359 if let Some(&until) = backoff_until.get(&plugin_id) {
360 if tokio::time::Instant::now() < until {
361 debug!(%plugin_id, "Skipping rate-limited feed (backoff active)");
362 continue;
363 }
364 backoff_until.remove(&plugin_id);
365 }
366
367 // Skip circuit-broken feeds
368 match task_state.orchestrator.is_circuit_broken(&plugin_id).await {
369 Ok(true) => {
370 debug!(%plugin_id, "Skipping circuit-broken feed");
371 continue;
372 }
373 Ok(false) => {}
374 Err(e) => {
375 error!(error = %e, %plugin_id, "Failed to check circuit breaker");
376 continue;
377 }
378 }
379
380 // Check if this plugin is due for a fetch
381 let due = match is_fetch_due(&task_state, &plugin_id, interval).await {
382 Ok(due) => due,
383 Err(e) => {
384 error!(error = %e, %plugin_id, "Failed to check fetch status");
385 continue;
386 }
387 };
388
389 if !due {
390 continue;
391 }
392
393 info!(%plugin_id, "Auto-fetching plugin");
394 match task_state.orchestrator.fetch_plugin(&plugin_id).await {
395 Ok(count) => {
396 if count > 0 {
397 info!(count, %plugin_id, "Auto-fetch complete");
398 // Notify the frontend so it can refresh
399 if let Err(e) = app_handle.emit("auto-fetch-complete", &plugin_id) {
400 debug!(error = %e, "Failed to emit auto-fetch-complete event");
401 }
402 }
403 }
404 Err(e) => {
405 // Classify the error to determine backoff behavior
406 let structured = bb_core::classify_error(&e.to_string());
407 let category_str = structured.category.to_string();
408
409 // Apply rate-limit backoff
410 if structured.category == ErrorCategory::RateLimited {
411 let delay_secs = structured.retry_after_secs.unwrap_or(60);
412 info!(%plugin_id, delay_secs, "Rate limited, backing off");
413 backoff_until.insert(
414 plugin_id.clone(),
415 tokio::time::Instant::now()
416 + std::time::Duration::from_secs(delay_secs),
417 );
418 }
419
420 error!(error = %e, %plugin_id, category = %category_str, "Auto-fetch failed");
421
422 // Check if the circuit breaker just tripped
423 if let Ok(true) = task_state.orchestrator.is_circuit_broken(&plugin_id).await
424 {
425 info!(%plugin_id, "Circuit breaker tripped, emitting event");
426 if let Err(emit_err) = app_handle.emit(
427 "feed-circuit-broken",
428 serde_json::json!({
429 "pluginId": plugin_id,
430 "error": e.to_string(),
431 "category": category_str,
432 }),
433 ) {
434 debug!(error = %emit_err, "Failed to emit feed-circuit-broken event");
435 }
436 }
437
438 if let Err(emit_err) = app_handle.emit(
439 "auto-fetch-error",
440 serde_json::json!({
441 "pluginId": plugin_id,
442 "error": e.to_string(),
443 "category": category_str,
444 }),
445 ) {
446 debug!(error = %emit_err, "Failed to emit auto-fetch-error event");
447 }
448 }
449 }
450 }
451 }
452 });
453
454 state.set_auto_fetch_handle(handle.inner().abort_handle());
455 }
456
457 /// Spawn a background task that periodically deletes old read items.
458 /// Runs every 6 hours, removing read (non-starred) items older than 30 days.
459 /// The task's `AbortHandle` is stored in `AppState` so it can be cancelled on shutdown.
460 pub fn spawn_stale_cleanup(state: Arc<AppState>) {
461 const CLEANUP_INTERVAL_SECS: u64 = 21600; // 6 hours
462 const STALE_DAYS: i64 = 30;
463
464 let task_state = Arc::clone(&state);
465 let handle = tauri::async_runtime::spawn(async move {
466 loop {
467 tokio::time::sleep(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS)).await;
468
469 let cutoff = chrono::Utc::now() - chrono::Duration::days(STALE_DAYS);
470 let db = task_state.orchestrator.database();
471 match db.items().delete_stale_read(cutoff).await {
472 Ok(count) if count > 0 => {
473 info!(count, "Stale cleanup: deleted old read items");
474 }
475 Ok(_) => {}
476 Err(e) => {
477 error!(error = %e, "Stale cleanup failed");
478 }
479 }
480 }
481 });
482
483 state.set_cleanup_handle(handle.inner().abort_handle());
484 }
485
486 /// Check if a plugin is due for a fetch by comparing its last_fetch time
487 /// against the configured interval.
488 async fn is_fetch_due(
489 state: &AppState,
490 plugin_id: &str,
491 interval_secs: u64,
492 ) -> Result<bool, String> {
493 let db = state.orchestrator.database();
494 let feeds = db
495 .feeds()
496 .get_by_busser(plugin_id)
497 .await
498 .map_err(|e| e.to_string())?;
499
500 // If no feeds configured, nothing to fetch
501 if feeds.is_empty() {
502 return Ok(false);
503 }
504
505 let now = chrono::Utc::now();
506 let feed_times: Vec<_> = feeds
507 .iter()
508 .map(|f| f.last_fetch.as_deref())
509 .collect();
510 Ok(any_feed_due(&feed_times, interval_secs, now))
511 }
512