| 1 |
|
| 2 |
use bb_core::scheduler::any_feed_due; |
| 3 |
use bb_core::{Orchestrator, OrchestratorConfig}; |
| 4 |
use bb_interface::ErrorCategory; |
| 5 |
use parking_lot::RwLock; |
| 6 |
use std::collections::HashMap; |
| 7 |
use std::path::PathBuf; |
| 8 |
use std::sync::Arc; |
| 9 |
use synckit_client::{SyncKitClient, SyncKitConfig}; |
| 10 |
use tauri::{AppHandle, Emitter, Manager}; |
| 11 |
use tokio::task::AbortHandle; |
| 12 |
use tracing::{debug, error, info, warn}; |
| 13 |
|
| 14 |
|
| 15 |
/// Default base URL of the sync server.
///
/// `load_sync_client` falls back to this value when the `BB_SYNC_SERVER_URL`
/// environment variable cannot be read.
pub const SYNC_SERVER_URL: &str = "https://makenot.work";
| 16 |
|
| 17 |
|
| 18 |
|
| 19 |
|
| 20 |
const SYNCKIT_TOML: &str = include_str!("../../synckit.toml"); |
| 21 |
|
| 22 |
|
| 23 |
|
| 24 |
|
| 25 |
fn parse_synckit_toml_key() -> Option<&'static str> { |
| 26 |
for line in SYNCKIT_TOML.lines() { |
| 27 |
let line = line.trim(); |
| 28 |
if let Some(rest) = line.strip_prefix("api_key") { |
| 29 |
let rest = rest.trim_start().strip_prefix('=')?.trim(); |
| 30 |
let key = rest.trim_matches('"'); |
| 31 |
if !key.is_empty() && key != "REPLACE_WITH_BB_SYNC_API_KEY" { |
| 32 |
return Some(key); |
| 33 |
} |
| 34 |
} |
| 35 |
} |
| 36 |
None |
| 37 |
} |
| 38 |
|
| 39 |
|
| 40 |
pub struct AppState { |
| 41 |
pub orchestrator: Orchestrator, |
| 42 |
|
| 43 |
pub sync_client: RwLock<Option<Arc<SyncKitClient>>>, |
| 44 |
|
| 45 |
pub data_dir: PathBuf, |
| 46 |
|
| 47 |
pub sync_mutex: tokio::sync::Mutex<()>, |
| 48 |
|
| 49 |
auto_fetch_handle: parking_lot::Mutex<Option<AbortHandle>>, |
| 50 |
|
| 51 |
cleanup_handle: parking_lot::Mutex<Option<AbortHandle>>, |
| 52 |
|
| 53 |
sync_scheduler_handle: parking_lot::Mutex<Option<AbortHandle>>, |
| 54 |
} |
| 55 |
|
| 56 |
impl AppState { |
| 57 |
pub async fn new(app: &AppHandle) -> Result<Self, String> { |
| 58 |
let app_data_dir = app |
| 59 |
.path() |
| 60 |
.app_data_dir() |
| 61 |
.map_err(|e| format!("Failed to get app data dir: {}", e))?; |
| 62 |
|
| 63 |
info!(?app_data_dir, "Initializing application state"); |
| 64 |
|
| 65 |
std::fs::create_dir_all(&app_data_dir) |
| 66 |
.map_err(|e| format!("Failed to create app data dir: {}", e))?; |
| 67 |
|
| 68 |
let db_path = app_data_dir.join("balanced_breakfast.db"); |
| 69 |
let db_url = format!("sqlite:{}?mode=rwc", db_path.display()); |
| 70 |
|
| 71 |
debug!(?db_path, "Connecting to database"); |
| 72 |
|
| 73 |
|
| 74 |
let plugins_dir = find_plugins_dir(app); |
| 75 |
info!(?plugins_dir, "Using plugins directory"); |
| 76 |
|
| 77 |
|
| 78 |
copy_bundled_plugins(app, &plugins_dir); |
| 79 |
|
| 80 |
let config = OrchestratorConfig { |
| 81 |
database_url: db_url, |
| 82 |
plugins_dir: plugins_dir.to_string_lossy().to_string(), |
| 83 |
fetch_interval_secs: 300, |
| 84 |
}; |
| 85 |
|
| 86 |
let mut orchestrator = Orchestrator::new(config) |
| 87 |
.await |
| 88 |
.map_err(|e| format!("Failed to create orchestrator: {}", e))?; |
| 89 |
|
| 90 |
|
| 91 |
|
| 92 |
|
| 93 |
let key_path = app_data_dir.join("encryption.key"); |
| 94 |
let key = bb_core::crypto::load_or_create_key_from_keychain(&key_path) |
| 95 |
.map_err(|e| format!("Failed to load encryption key: {e}"))?; |
| 96 |
info!("Encryption key loaded"); |
| 97 |
orchestrator.set_encryption_key(key); |
| 98 |
|
| 99 |
info!("Orchestrator created, running migrations"); |
| 100 |
|
| 101 |
orchestrator |
| 102 |
.migrate() |
| 103 |
.await |
| 104 |
.map_err(|e| format!("Failed to run migrations: {}", e))?; |
| 105 |
|
| 106 |
info!("Migrations complete, loading plugins"); |
| 107 |
|
| 108 |
match orchestrator.load_plugins().await { |
| 109 |
Ok(loaded) => info!(count = loaded.len(), "Loaded plugins"), |
| 110 |
Err(e) => tracing::warn!(error = %e, "Failed to load some plugins"), |
| 111 |
} |
| 112 |
|
| 113 |
|
| 114 |
if let Err(e) = orchestrator.encrypt_existing_secrets().await { |
| 115 |
tracing::warn!(error = %e, "Failed to encrypt existing secrets"); |
| 116 |
} |
| 117 |
|
| 118 |
|
| 119 |
init_plugins_from_db(&orchestrator).await; |
| 120 |
|
| 121 |
|
| 122 |
let sync_client = load_sync_client(&app_data_dir); |
| 123 |
|
| 124 |
|
| 125 |
let downloads_dir = std::env::temp_dir().join("bb-downloads"); |
| 126 |
if downloads_dir.exists() { |
| 127 |
if let Err(e) = std::fs::remove_dir_all(&downloads_dir) { |
| 128 |
debug!(error = %e, "Failed to clean up old download temp files"); |
| 129 |
} |
| 130 |
} |
| 131 |
|
| 132 |
info!("Application state initialized"); |
| 133 |
|
| 134 |
Ok(Self { |
| 135 |
orchestrator, |
| 136 |
sync_client: RwLock::new(sync_client.map(Arc::new)), |
| 137 |
data_dir: app_data_dir, |
| 138 |
sync_mutex: tokio::sync::Mutex::new(()), |
| 139 |
auto_fetch_handle: parking_lot::Mutex::new(None), |
| 140 |
cleanup_handle: parking_lot::Mutex::new(None), |
| 141 |
sync_scheduler_handle: parking_lot::Mutex::new(None), |
| 142 |
}) |
| 143 |
} |
| 144 |
|
| 145 |
|
| 146 |
pub fn set_auto_fetch_handle(&self, handle: AbortHandle) { |
| 147 |
let mut guard = self.auto_fetch_handle.lock(); |
| 148 |
|
| 149 |
if let Some(old) = guard.take() { |
| 150 |
old.abort(); |
| 151 |
} |
| 152 |
*guard = Some(handle); |
| 153 |
} |
| 154 |
|
| 155 |
|
| 156 |
pub fn abort_auto_fetch(&self) { |
| 157 |
let mut guard = self.auto_fetch_handle.lock(); |
| 158 |
if let Some(handle) = guard.take() { |
| 159 |
info!("Aborting auto-fetch background task"); |
| 160 |
handle.abort(); |
| 161 |
} |
| 162 |
} |
| 163 |
|
| 164 |
|
| 165 |
pub fn set_cleanup_handle(&self, handle: AbortHandle) { |
| 166 |
let mut guard = self.cleanup_handle.lock(); |
| 167 |
if let Some(old) = guard.take() { |
| 168 |
old.abort(); |
| 169 |
} |
| 170 |
*guard = Some(handle); |
| 171 |
} |
| 172 |
|
| 173 |
|
| 174 |
pub fn abort_cleanup(&self) { |
| 175 |
let mut guard = self.cleanup_handle.lock(); |
| 176 |
if let Some(handle) = guard.take() { |
| 177 |
info!("Aborting stale-cleanup background task"); |
| 178 |
handle.abort(); |
| 179 |
} |
| 180 |
} |
| 181 |
|
| 182 |
|
| 183 |
pub fn set_sync_scheduler_handle(&self, handle: AbortHandle) { |
| 184 |
let mut guard = self.sync_scheduler_handle.lock(); |
| 185 |
if let Some(old) = guard.take() { |
| 186 |
old.abort(); |
| 187 |
} |
| 188 |
*guard = Some(handle); |
| 189 |
} |
| 190 |
|
| 191 |
|
| 192 |
pub fn abort_sync_scheduler(&self) { |
| 193 |
let mut guard = self.sync_scheduler_handle.lock(); |
| 194 |
if let Some(handle) = guard.take() { |
| 195 |
info!("Aborting sync-scheduler background task"); |
| 196 |
handle.abort(); |
| 197 |
} |
| 198 |
} |
| 199 |
} |
| 200 |
|
| 201 |
impl Drop for AppState { |
| 202 |
fn drop(&mut self) { |
| 203 |
self.abort_auto_fetch(); |
| 204 |
self.abort_cleanup(); |
| 205 |
self.abort_sync_scheduler(); |
| 206 |
} |
| 207 |
} |
| 208 |
|
| 209 |
|
| 210 |
pub fn load_api_key(data_dir: &std::path::Path) -> Option<String> { |
| 211 |
let key_path = data_dir.join("sync_api_key"); |
| 212 |
if let Ok(key) = std::fs::read_to_string(&key_path) { |
| 213 |
let key = key.trim().to_string(); |
| 214 |
if !key.is_empty() { |
| 215 |
return Some(key); |
| 216 |
} |
| 217 |
} |
| 218 |
if let Ok(key) = std::env::var("BB_SYNC_API_KEY") { |
| 219 |
return Some(key); |
| 220 |
} |
| 221 |
|
| 222 |
parse_synckit_toml_key().map(String::from) |
| 223 |
} |
| 224 |
|
| 225 |
|
| 226 |
pub fn save_api_key(data_dir: &std::path::Path, api_key: &str) { |
| 227 |
let key_path = data_dir.join("sync_api_key"); |
| 228 |
if let Err(e) = std::fs::write(&key_path, api_key) { |
| 229 |
tracing::error!(error = %e, path = %key_path.display(), "Failed to save API key"); |
| 230 |
return; |
| 231 |
} |
| 232 |
|
| 233 |
#[cfg(unix)] |
| 234 |
{ |
| 235 |
use std::os::unix::fs::PermissionsExt; |
| 236 |
let perms = std::fs::Permissions::from_mode(0o600); |
| 237 |
if let Err(e) = std::fs::set_permissions(&key_path, perms) { |
| 238 |
tracing::warn!(error = %e, path = %key_path.display(), "Failed to set API key file permissions"); |
| 239 |
} |
| 240 |
} |
| 241 |
} |
| 242 |
|
| 243 |
|
| 244 |
fn load_sync_client(data_dir: &std::path::Path) -> Option<SyncKitClient> { |
| 245 |
let api_key = load_api_key(data_dir)?; |
| 246 |
let server_url = std::env::var("BB_SYNC_SERVER_URL") |
| 247 |
.unwrap_or_else(|_| SYNC_SERVER_URL.to_string()); |
| 248 |
info!(%server_url, "SyncKit client configured"); |
| 249 |
let client = SyncKitClient::new(SyncKitConfig { server_url, api_key }); |
| 250 |
match client.try_load_key_from_keychain() { |
| 251 |
Ok(true) => info!("Sync encryption key loaded from keychain"), |
| 252 |
Ok(false) => debug!("No sync encryption key in keychain"), |
| 253 |
Err(e) => warn!(error = %e, "Failed to load sync encryption key"), |
| 254 |
} |
| 255 |
Some(client) |
| 256 |
} |
| 257 |
|
| 258 |
|
| 259 |
fn find_plugins_dir(app: &AppHandle) -> PathBuf { |
| 260 |
|
| 261 |
let dev_plugins = PathBuf::from(env!("CARGO_MANIFEST_DIR")) |
| 262 |
.parent() |
| 263 |
.map(|p| p.join("plugins")); |
| 264 |
|
| 265 |
if let Some(ref dev_path) = dev_plugins { |
| 266 |
if dev_path.exists() { |
| 267 |
return dev_path.clone(); |
| 268 |
} |
| 269 |
} |
| 270 |
|
| 271 |
|
| 272 |
let base = app |
| 273 |
.path() |
| 274 |
.app_config_dir() |
| 275 |
.or_else(|_| app.path().app_data_dir()) |
| 276 |
.unwrap_or_else(|_| PathBuf::from(".")); |
| 277 |
base.join("plugins") |
| 278 |
} |
| 279 |
|
| 280 |
|
| 281 |
|
| 282 |
fn copy_bundled_plugins(app: &AppHandle, plugins_dir: &PathBuf) { |
| 283 |
if let Ok(resource_dir) = app.path().resource_dir() { |
| 284 |
let bundled = resource_dir.join("plugins"); |
| 285 |
if bundled.exists() { |
| 286 |
if let Err(e) = std::fs::create_dir_all(plugins_dir) { |
| 287 |
tracing::warn!(error = %e, path = ?plugins_dir, "Failed to create plugins directory"); |
| 288 |
return; |
| 289 |
} |
| 290 |
if let Ok(entries) = std::fs::read_dir(&bundled) { |
| 291 |
for entry in entries.flatten() { |
| 292 |
let path = entry.path(); |
| 293 |
if path.extension().is_some_and(|e| e == "rhai") { |
| 294 |
let dest = plugins_dir.join(entry.file_name()); |
| 295 |
|
| 296 |
if dest.exists() { |
| 297 |
if let (Ok(bundled_bytes), Ok(installed_bytes)) = |
| 298 |
(std::fs::read(&path), std::fs::read(&dest)) |
| 299 |
{ |
| 300 |
if bundled_bytes == installed_bytes { |
| 301 |
continue; |
| 302 |
} |
| 303 |
} |
| 304 |
} |
| 305 |
if let Err(e) = std::fs::copy(&path, &dest) { |
| 306 |
tracing::warn!(error = %e, ?path, "Failed to copy plugin"); |
| 307 |
} else { |
| 308 |
info!(file = ?entry.file_name(), "Copied bundled plugin"); |
| 309 |
} |
| 310 |
} |
| 311 |
} |
| 312 |
} |
| 313 |
} |
| 314 |
} |
| 315 |
} |
| 316 |
|
| 317 |
|
| 318 |
async fn init_plugins_from_db(orchestrator: &Orchestrator) { |
| 319 |
let plugin_ids = { |
| 320 |
let plugins = orchestrator.plugins(); |
| 321 |
let plugins = plugins.read().await; |
| 322 |
plugins.list_plugins() |
| 323 |
}; |
| 324 |
|
| 325 |
for plugin_id in plugin_ids { |
| 326 |
if let Err(e) = orchestrator.init_plugin_from_db(&plugin_id).await { |
| 327 |
tracing::warn!(error = %e, %plugin_id, "Failed to init plugin from DB"); |
| 328 |
} |
| 329 |
} |
| 330 |
} |
| 331 |
|
| 332 |
|
| 333 |
|
| 334 |
|
| 335 |
pub fn spawn_auto_fetch(app_handle: AppHandle, state: Arc<AppState>) { |
| 336 |
const CHECK_INTERVAL_SECS: u64 = 60; |
| 337 |
|
| 338 |
let task_state = Arc::clone(&state); |
| 339 |
let handle = tauri::async_runtime::spawn(async move { |
| 340 |
|
| 341 |
let mut backoff_until: HashMap<String, tokio::time::Instant> = HashMap::new(); |
| 342 |
|
| 343 |
loop { |
| 344 |
tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)).await; |
| 345 |
|
| 346 |
let plugin_ids = { |
| 347 |
let plugins = task_state.orchestrator.plugins(); |
| 348 |
let plugins = plugins.read().await; |
| 349 |
plugins.list_plugins() |
| 350 |
}; |
| 351 |
|
| 352 |
for plugin_id in plugin_ids { |
| 353 |
let interval = task_state.orchestrator.fetch_interval_secs(&plugin_id).await; |
| 354 |
if interval == 0 { |
| 355 |
continue; |
| 356 |
} |
| 357 |
|
| 358 |
|
| 359 |
if let Some(&until) = backoff_until.get(&plugin_id) { |
| 360 |
if tokio::time::Instant::now() < until { |
| 361 |
debug!(%plugin_id, "Skipping rate-limited feed (backoff active)"); |
| 362 |
continue; |
| 363 |
} |
| 364 |
backoff_until.remove(&plugin_id); |
| 365 |
} |
| 366 |
|
| 367 |
|
| 368 |
match task_state.orchestrator.is_circuit_broken(&plugin_id).await { |
| 369 |
Ok(true) => { |
| 370 |
debug!(%plugin_id, "Skipping circuit-broken feed"); |
| 371 |
continue; |
| 372 |
} |
| 373 |
Ok(false) => {} |
| 374 |
Err(e) => { |
| 375 |
error!(error = %e, %plugin_id, "Failed to check circuit breaker"); |
| 376 |
continue; |
| 377 |
} |
| 378 |
} |
| 379 |
|
| 380 |
|
| 381 |
let due = match is_fetch_due(&task_state, &plugin_id, interval).await { |
| 382 |
Ok(due) => due, |
| 383 |
Err(e) => { |
| 384 |
error!(error = %e, %plugin_id, "Failed to check fetch status"); |
| 385 |
continue; |
| 386 |
} |
| 387 |
}; |
| 388 |
|
| 389 |
if !due { |
| 390 |
continue; |
| 391 |
} |
| 392 |
|
| 393 |
info!(%plugin_id, "Auto-fetching plugin"); |
| 394 |
match task_state.orchestrator.fetch_plugin(&plugin_id).await { |
| 395 |
Ok(count) => { |
| 396 |
if count > 0 { |
| 397 |
info!(count, %plugin_id, "Auto-fetch complete"); |
| 398 |
|
| 399 |
if let Err(e) = app_handle.emit("auto-fetch-complete", &plugin_id) { |
| 400 |
debug!(error = %e, "Failed to emit auto-fetch-complete event"); |
| 401 |
} |
| 402 |
} |
| 403 |
} |
| 404 |
Err(e) => { |
| 405 |
|
| 406 |
let structured = bb_core::classify_error(&e.to_string()); |
| 407 |
let category_str = structured.category.to_string(); |
| 408 |
|
| 409 |
|
| 410 |
if structured.category == ErrorCategory::RateLimited { |
| 411 |
let delay_secs = structured.retry_after_secs.unwrap_or(60); |
| 412 |
info!(%plugin_id, delay_secs, "Rate limited, backing off"); |
| 413 |
backoff_until.insert( |
| 414 |
plugin_id.clone(), |
| 415 |
tokio::time::Instant::now() |
| 416 |
+ std::time::Duration::from_secs(delay_secs), |
| 417 |
); |
| 418 |
} |
| 419 |
|
| 420 |
error!(error = %e, %plugin_id, category = %category_str, "Auto-fetch failed"); |
| 421 |
|
| 422 |
|
| 423 |
if let Ok(true) = task_state.orchestrator.is_circuit_broken(&plugin_id).await |
| 424 |
{ |
| 425 |
info!(%plugin_id, "Circuit breaker tripped, emitting event"); |
| 426 |
if let Err(emit_err) = app_handle.emit( |
| 427 |
"feed-circuit-broken", |
| 428 |
serde_json::json!({ |
| 429 |
"pluginId": plugin_id, |
| 430 |
"error": e.to_string(), |
| 431 |
"category": category_str, |
| 432 |
}), |
| 433 |
) { |
| 434 |
debug!(error = %emit_err, "Failed to emit feed-circuit-broken event"); |
| 435 |
} |
| 436 |
} |
| 437 |
|
| 438 |
if let Err(emit_err) = app_handle.emit( |
| 439 |
"auto-fetch-error", |
| 440 |
serde_json::json!({ |
| 441 |
"pluginId": plugin_id, |
| 442 |
"error": e.to_string(), |
| 443 |
"category": category_str, |
| 444 |
}), |
| 445 |
) { |
| 446 |
debug!(error = %emit_err, "Failed to emit auto-fetch-error event"); |
| 447 |
} |
| 448 |
} |
| 449 |
} |
| 450 |
} |
| 451 |
} |
| 452 |
}); |
| 453 |
|
| 454 |
state.set_auto_fetch_handle(handle.inner().abort_handle()); |
| 455 |
} |
| 456 |
|
| 457 |
|
| 458 |
|
| 459 |
|
| 460 |
pub fn spawn_stale_cleanup(state: Arc<AppState>) { |
| 461 |
const CLEANUP_INTERVAL_SECS: u64 = 21600; |
| 462 |
const STALE_DAYS: i64 = 30; |
| 463 |
|
| 464 |
let task_state = Arc::clone(&state); |
| 465 |
let handle = tauri::async_runtime::spawn(async move { |
| 466 |
loop { |
| 467 |
tokio::time::sleep(std::time::Duration::from_secs(CLEANUP_INTERVAL_SECS)).await; |
| 468 |
|
| 469 |
let cutoff = chrono::Utc::now() - chrono::Duration::days(STALE_DAYS); |
| 470 |
let db = task_state.orchestrator.database(); |
| 471 |
match db.items().delete_stale_read(cutoff).await { |
| 472 |
Ok(count) if count > 0 => { |
| 473 |
info!(count, "Stale cleanup: deleted old read items"); |
| 474 |
} |
| 475 |
Ok(_) => {} |
| 476 |
Err(e) => { |
| 477 |
error!(error = %e, "Stale cleanup failed"); |
| 478 |
} |
| 479 |
} |
| 480 |
} |
| 481 |
}); |
| 482 |
|
| 483 |
state.set_cleanup_handle(handle.inner().abort_handle()); |
| 484 |
} |
| 485 |
|
| 486 |
|
| 487 |
|
| 488 |
async fn is_fetch_due( |
| 489 |
state: &AppState, |
| 490 |
plugin_id: &str, |
| 491 |
interval_secs: u64, |
| 492 |
) -> Result<bool, String> { |
| 493 |
let db = state.orchestrator.database(); |
| 494 |
let feeds = db |
| 495 |
.feeds() |
| 496 |
.get_by_busser(plugin_id) |
| 497 |
.await |
| 498 |
.map_err(|e| e.to_string())?; |
| 499 |
|
| 500 |
|
| 501 |
if feeds.is_empty() { |
| 502 |
return Ok(false); |
| 503 |
} |
| 504 |
|
| 505 |
let now = chrono::Utc::now(); |
| 506 |
let feed_times: Vec<_> = feeds |
| 507 |
.iter() |
| 508 |
.map(|f| f.last_fetch.as_deref()) |
| 509 |
.collect(); |
| 510 |
Ok(any_feed_due(&feed_times, interval_secs, now)) |
| 511 |
} |
| 512 |
|