//! audiofiles SyncKit integration: cloud sync for sample metadata, VFS, tags, and collections. //! //! Provides [`SyncManager`] as the public API, callable from the GUI thread. //! All async work runs on an internal tokio runtime handle. pub mod auth; pub mod error; pub mod scheduler; pub mod service; use std::path::PathBuf; use std::sync::Arc; use tracing::instrument; use parking_lot::Mutex; use synckit_client::SyncKitClient; use tokio::runtime::Handle; use tokio::sync::mpsc; use error::Result; use scheduler::SyncCommand; /// Sync engine entry point, owned by the app and shared with the GUI. pub struct SyncManager { client: Arc, db_path: PathBuf, content_dir: PathBuf, runtime: Handle, status: Arc>, command_tx: Mutex>>, /// Sender for cancelling the in-flight OAuth flow. Set in `start_auth`, /// taken (and triggered) in `cancel_auth`. Closing this channel makes the /// spawned auth-await task return early without mutating sync state. auth_cancel_tx: Mutex>>, } /// Observable sync status, read by the GUI each frame. #[derive(Debug, Clone)] pub struct SyncStatus { pub state: SyncState, pub last_sync_at: Option, pub pending_changes: i64, pub last_error: Option, pub device_id: Option, pub auto_sync_enabled: bool, pub sync_interval_minutes: u32, /// Set to true when remote changes were pulled — GUI should reload VFS/contents. pub needs_refresh: bool, /// Subscription status for blob sync tier (populated async). pub subscription: Option, /// Pricing-formula constants (fetched once from server, used to quote /// prices locally as the user adjusts the cap slider). pub pricing: Option, } impl Default for SyncStatus { fn default() -> Self { Self { state: SyncState::Disconnected, last_sync_at: None, pending_changes: 0, last_error: None, device_id: None, auto_sync_enabled: false, sync_interval_minutes: 15, needs_refresh: false, subscription: None, pricing: None, } } } /// High-level sync state for UI display. 
#[derive(Debug, Clone, PartialEq)] pub enum SyncState { /// Not configured or not authenticated. Disconnected, /// OAuth flow in progress. Authenticating, /// Authenticated but encryption not set up. `has_server_key` indicates whether /// the user has previously set up encryption on another device. NeedsEncryption { has_server_key: bool }, /// Fully configured and idle. Ready, /// Sync cycle in progress. Syncing, } impl SyncManager { /// Create a new SyncManager. Call [`start_scheduler`] after construction. /// /// `content_dir` is the content-addressed sample store root used for blob sync. pub fn new(config: SyncKitConfig, db_path: PathBuf, content_dir: PathBuf, runtime: Handle) -> Self { let client = Arc::new(SyncKitClient::new(config)); let status = Arc::new(Mutex::new(SyncStatus::default())); Self { client, db_path, content_dir, runtime, status, command_tx: Mutex::new(None), auth_cancel_tx: Mutex::new(None), } } /// Read the current sync status (cheap mutex read). pub fn status(&self) -> SyncStatus { self.status.lock().clone() } /// Clear the needs_refresh flag after the GUI has reloaded. pub fn clear_needs_refresh(&self) { self.status.lock().needs_refresh = false; } /// Dismiss the surfaced `last_error`. The error banner in the sync panel /// calls this when the user clicks Dismiss or Retry — Retry re-runs the /// action and clears the previous error in the same gesture. pub fn clear_last_error(&self) { self.status.lock().last_error = None; } /// Start the OAuth2 PKCE authentication flow. /// Opens the callback server and returns the auth URL. A background task /// automatically awaits the callback and completes authentication, or /// terminates early if [`cancel_auth`] fires. #[instrument(skip_all)] pub fn start_auth(&self) -> Result { self.status.lock().state = SyncState::Authenticating; let session = auth::start_auth(&self.client)?; let auth_url = session.auth_url.clone(); // Install a fresh cancel channel for this flow. 
Dropping any prior // sender quietly cancels its waiter — start_auth being re-entrant is // a corner case but shouldn't leak old senders. let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>(); *self.auth_cancel_tx.lock() = Some(cancel_tx); // Spawn background task to await callback and auto-complete auth let client = self.client.clone(); let status = self.status.clone(); let db_path = self.db_path.clone(); let expected_state = session.expected_state.clone(); let code_verifier = session.code_verifier.clone(); let port = session.port; let mut code_rx = session.code_rx; self.runtime.spawn(async move { // Race the callback against the cancel signal. If cancel wins, the // task exits without mutating state — `cancel_auth` already set // state to Disconnected at the call site, and any late callback // result would otherwise transition state back to Ready / // NeedsEncryption. tokio::select! { _ = cancel_rx => {} recv = &mut code_rx => { match recv { Ok(result) => { if result.state != expected_state { let mut s = status.lock(); s.last_error = Some("CSRF state mismatch".to_string()); s.state = SyncState::Disconnected; return; } match client .authenticate_with_code(&result.code, &code_verifier, port, "__internal__") .await { Ok(_) => { let has_key = client.try_load_key_from_keychain().unwrap_or(false); if has_key { let mut s = status.lock(); s.state = SyncState::Ready; load_sync_settings_into_status(&db_path, &mut s); } else { let has_server_key = client.has_server_key().await.unwrap_or(false); status.lock().state = SyncState::NeedsEncryption { has_server_key }; } } Err(e) => { let mut s = status.lock(); s.state = SyncState::Disconnected; s.last_error = Some(format!("Auth failed: {e}")); } } } Err(_) => { // Callback server timed out or was dropped let mut s = status.lock(); if s.state == SyncState::Authenticating { s.state = SyncState::Disconnected; s.last_error = Some("Authentication timed out".to_string()); } } } } } }); Ok(auth_url) } /// Cancel an 
in-flight OAuth flow. Returns to [`SyncState::Disconnected`] /// immediately; the background callback-await task aborts on its next poll. /// No-op when there is no active flow. #[instrument(skip_all)] pub fn cancel_auth(&self) { if let Some(tx) = self.auth_cancel_tx.lock().take() { // Send may fail if the task already terminated — that's fine. let _ = tx.send(()); } let mut s = self.status.lock(); if s.state == SyncState::Authenticating { s.state = SyncState::Disconnected; s.last_error = None; } } /// Set up encryption (new or existing, depending on `is_new`). #[instrument(skip_all)] pub fn setup_encryption(&self, password: String, is_new: bool) { let client = self.client.clone(); let status = self.status.clone(); let db_path = self.db_path.clone(); self.runtime.spawn(async move { let result = if is_new { client.setup_encryption_new(&password).await } else { client.setup_encryption_existing(&password).await }; match result { Ok(()) => { let mut s = status.lock(); s.state = SyncState::Ready; load_sync_settings_into_status(&db_path, &mut s); } Err(e) => { status.lock().last_error = Some(format!("Encryption setup failed: {e}")); } } }); } /// Trigger an immediate sync cycle. pub fn sync_now(&self) { if let Some(tx) = self.command_tx.lock().as_ref() { let _ = tx.send(SyncCommand::SyncNow); } } /// Trigger a targeted download of one cloud-only sample blob. Returns /// `true` if the request was queued; `false` if the scheduler isn't /// running yet (in which case the caller should fall back to a full sync /// or surface "sync not ready"). pub fn download_sample(&self, hash: &str) -> bool { if let Some(tx) = self.command_tx.lock().as_ref() { tx.send(SyncCommand::DownloadOne { hash: hash.to_string() }).is_ok() } else { false } } /// Update auto-sync settings. 
#[instrument(skip_all)] pub fn update_settings(&self, auto_sync: Option, interval: Option) { if let Ok(conn) = rusqlite::Connection::open(&self.db_path) { if let Some(enabled) = auto_sync { let _ = service::set_sync_state( &conn, "auto_sync_enabled", if enabled { "1" } else { "0" }, ); } if let Some(mins) = interval { let _ = service::set_sync_state( &conn, "sync_interval_minutes", &mins.to_string(), ); } } let mut s = self.status.lock(); if let Some(enabled) = auto_sync { s.auto_sync_enabled = enabled; } if let Some(mins) = interval { s.sync_interval_minutes = mins; } } /// Disconnect: clear status, reset to disconnected. pub fn disconnect(&self) { let mut s = self.status.lock(); s.state = SyncState::Disconnected; s.last_error = None; s.device_id = None; } /// Try to restore a previous session from keychain on startup. #[instrument(skip_all)] pub fn try_restore_session(&self) { if self.client.session_info().is_some() { let has_key = self.client.try_load_key_from_keychain().unwrap_or(false); if has_key { let mut s = self.status.lock(); s.state = SyncState::Ready; load_sync_settings_into_status(&self.db_path, &mut s); } } } /// Fetch the pricing formula from the server (async, no JWT needed). /// Stored on the status so UI code can quote a price for any cap locally. pub fn fetch_pricing(&self) { let client = self.client.clone(); let status = self.status.clone(); self.runtime.spawn(async move { match client.get_app_pricing().await { Ok(pricing) => { status.lock().pricing = Some(pricing); } Err(e) => { tracing::debug!("Failed to fetch app pricing: {e}"); } } }); } /// Fetch subscription status from the server (async, result goes to status.subscription). /// On error (404, network, etc.) treats the user as unsubscribed so the UI can show the /// subscribe CTA instead of spinning forever. 
pub fn fetch_subscription_status(&self) {
        let client = self.client.clone();
        let status = self.status.clone();
        self.runtime.spawn(async move {
            // Fall back to the default (inactive) status on any failure so the
            // `subscription` field always ends up populated.
            let sub = client.get_subscription_status().await.unwrap_or_else(|e| {
                tracing::debug!("Failed to fetch subscription status, treating as inactive: {e}");
                synckit_client::SubscriptionStatus::default()
            });
            status.lock().subscription = Some(sub);
        });
    }

    /// Start a Stripe checkout for the given storage cap and billing interval,
    /// open the checkout URL in the browser, then poll until the subscription
    /// shows up as active.
    ///
    /// If the checkout session cannot be created, the failure is logged and
    /// written to `last_error`. A failure to open the browser is only logged;
    /// polling still runs.
    pub fn subscribe(&self, cap_bytes: i64, interval: synckit_client::BillingInterval) {
        let client = self.client.clone();
        let status = self.status.clone();
        self.runtime.spawn(async move {
            let resp = match client.create_subscription_checkout(cap_bytes, interval).await {
                Ok(resp) => resp,
                Err(e) => {
                    tracing::error!("Failed to create checkout: {e}");
                    status.lock().last_error = Some(format!("Could not start checkout: {e}"));
                    return;
                }
            };

            if let Err(e) = open::that(&resp.checkout_url) {
                tracing::warn!("Failed to open browser: {e}");
            }

            // Poll for subscription activation (5s intervals, up to 10 minutes)
            let poll_every = std::time::Duration::from_secs(5);
            for _ in 0..120 {
                tokio::time::sleep(poll_every).await;
                match client.get_subscription_status().await {
                    Ok(sub) if sub.active => {
                        status.lock().subscription = Some(sub);
                        tracing::info!("Subscription activated");
                        break;
                    }
                    // Not active yet, or the request failed: keep polling.
                    _ => {}
                }
            }
        });
    }

    /// Queue a storage-cap change that applies at the next billing cycle.
    ///
    /// On success the returned subscription replaces `status.subscription`;
    /// on failure the error is logged and written to `last_error`.
    pub fn queue_cap_change(&self, cap_bytes: i64) {
        let client = self.client.clone();
        let status = self.status.clone();
        self.runtime.spawn(async move {
            let outcome = client.queue_storage_cap_change(cap_bytes).await;
            match outcome {
                Err(e) => {
                    tracing::error!("Failed to queue cap change: {e}");
                    status.lock().last_error = Some(format!("Could not update storage cap: {e}"));
                }
                Ok(sub) => {
                    status.lock().subscription = Some(sub);
                    tracing::info!(cap_bytes, "Storage cap change queued");
                }
            }
        });
    }

    /// Spawn the background sync scheduler task.
#[instrument(skip_all)]
    pub fn start_scheduler(&self) {
        // Create the command channel and publish the sender so `sync_now` /
        // `download_sample` can reach the scheduler. Assigning here replaces
        // (and drops) any sender from an earlier call.
        let (tx, rx) = mpsc::unbounded_channel();
        *self.command_tx.lock() = Some(tx);

        let client = self.client.clone();
        let db_path = self.db_path.clone();
        let content_dir = self.content_dir.clone();
        let status = self.status.clone();
        self.runtime.spawn(scheduler::run_scheduler(
            client,
            db_path,
            content_dir,
            status,
            rx,
        ));
    }
}

/// Load sync settings from the database into a SyncStatus.
///
/// Fills `auto_sync_enabled`, `sync_interval_minutes`, `pending_changes`,
/// `last_sync_at` and `device_id`. Every read is best-effort: a failure is
/// logged with `tracing::warn!` and the field falls back to its default
/// (`false`, `15`, `0`, `None`). If the database cannot be opened at all, `s`
/// is left untouched.
// NOTE(review): callers in this file invoke this while holding the status
// mutex, so the SQLite reads happen under that lock — confirm this is fine
// for the GUI thread, which reads the same mutex.
#[instrument(skip_all)]
fn load_sync_settings_into_status(db_path: &std::path::Path, s: &mut SyncStatus) {
    let conn = match rusqlite::Connection::open(db_path) {
        Ok(c) => c,
        Err(e) => {
            tracing::warn!("Failed to open DB for sync settings: {e}");
            return;
        }
    };

    // Reads a string key and maps a missing, failed, or empty value to `None`.
    let read_optional = |key: &str| -> Option<String> {
        let v = service::get_sync_state(&conn, key)
            .inspect_err(|e| tracing::warn!("Failed to read {key}: {e}"))
            .unwrap_or_default();
        (!v.is_empty()).then_some(v)
    };

    s.auto_sync_enabled = service::get_sync_state(&conn, "auto_sync_enabled")
        .inspect_err(|e| tracing::warn!("Failed to read auto_sync_enabled: {e}"))
        .unwrap_or_default()
        == "1";

    // A read failure and an unparsable value both fall back to 15 minutes.
    s.sync_interval_minutes = service::get_sync_state(&conn, "sync_interval_minutes")
        .inspect_err(|e| tracing::warn!("Failed to read sync_interval_minutes: {e}"))
        .unwrap_or_else(|_| "15".to_string())
        .parse()
        .unwrap_or(15);

    s.pending_changes = service::count_pending_changes(&conn)
        .inspect_err(|e| tracing::warn!("Failed to count pending changes: {e}"))
        .unwrap_or(0);

    s.last_sync_at = read_optional("last_sync_at");
    s.device_id = read_optional("device_id");
}

// Re-export for convenience
pub use synckit_client::SyncKitConfig;
pub use synckit_client::{AppPricing, BillingInterval, PriceQuote};
pub use synckit_client::validate_api_key;