Skip to main content

max / audiofiles

17.7 KB · 466 lines History Blame Raw
1 //! audiofiles SyncKit integration: cloud sync for sample metadata, VFS, tags, and collections.
2 //!
3 //! Provides [`SyncManager`] as the public API, callable from the GUI thread.
4 //! All async work runs on an internal tokio runtime handle.
5
6 pub mod auth;
7 pub mod error;
8 pub mod scheduler;
9 pub mod service;
10
11 use std::path::PathBuf;
12 use std::sync::Arc;
13
14 use tracing::instrument;
15
16 use parking_lot::Mutex;
17 use synckit_client::SyncKitClient;
18 use tokio::runtime::Handle;
19 use tokio::sync::mpsc;
20
21 use error::Result;
22 use scheduler::SyncCommand;
23
24 /// Sync engine entry point, owned by the app and shared with the GUI.
25 pub struct SyncManager {
26 client: Arc<SyncKitClient>,
27 db_path: PathBuf,
28 content_dir: PathBuf,
29 runtime: Handle,
30 status: Arc<Mutex<SyncStatus>>,
31 command_tx: Mutex<Option<mpsc::UnboundedSender<SyncCommand>>>,
32 /// Sender for cancelling the in-flight OAuth flow. Set in `start_auth`,
33 /// taken (and triggered) in `cancel_auth`. Closing this channel makes the
34 /// spawned auth-await task return early without mutating sync state.
35 auth_cancel_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
36 }
37
38 /// Observable sync status, read by the GUI each frame.
39 #[derive(Debug, Clone)]
40 pub struct SyncStatus {
41 pub state: SyncState,
42 pub last_sync_at: Option<String>,
43 pub pending_changes: i64,
44 pub last_error: Option<String>,
45 pub device_id: Option<String>,
46 pub auto_sync_enabled: bool,
47 pub sync_interval_minutes: u32,
48 /// Set to true when remote changes were pulled — GUI should reload VFS/contents.
49 pub needs_refresh: bool,
50 /// Subscription status for blob sync tier (populated async).
51 pub subscription: Option<synckit_client::SubscriptionStatus>,
52 /// Pricing-formula constants (fetched once from server, used to quote
53 /// prices locally as the user adjusts the cap slider).
54 pub pricing: Option<synckit_client::AppPricing>,
55 }
56
57 impl Default for SyncStatus {
58 fn default() -> Self {
59 Self {
60 state: SyncState::Disconnected,
61 last_sync_at: None,
62 pending_changes: 0,
63 last_error: None,
64 device_id: None,
65 auto_sync_enabled: false,
66 sync_interval_minutes: 15,
67 needs_refresh: false,
68 subscription: None,
69 pricing: None,
70 }
71 }
72 }
73
/// High-level sync state for UI display.
///
/// Every payload is a plain `bool`, so equality is total and the type derives
/// `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncState {
    /// Not configured or not authenticated.
    Disconnected,
    /// OAuth flow in progress.
    Authenticating,
    /// Authenticated but encryption not set up. `has_server_key` indicates whether
    /// the user has previously set up encryption on another device.
    NeedsEncryption { has_server_key: bool },
    /// Fully configured and idle.
    Ready,
    /// Sync cycle in progress.
    Syncing,
}
89
90 impl SyncManager {
91 /// Create a new SyncManager. Call [`start_scheduler`] after construction.
92 ///
93 /// `content_dir` is the content-addressed sample store root used for blob sync.
94 pub fn new(config: SyncKitConfig, db_path: PathBuf, content_dir: PathBuf, runtime: Handle) -> Self {
95 let client = Arc::new(SyncKitClient::new(config));
96 let status = Arc::new(Mutex::new(SyncStatus::default()));
97
98 Self {
99 client,
100 db_path,
101 content_dir,
102 runtime,
103 status,
104 command_tx: Mutex::new(None),
105 auth_cancel_tx: Mutex::new(None),
106 }
107 }
108
109 /// Read the current sync status (cheap mutex read).
110 pub fn status(&self) -> SyncStatus {
111 self.status.lock().clone()
112 }
113
114 /// Clear the needs_refresh flag after the GUI has reloaded.
115 pub fn clear_needs_refresh(&self) {
116 self.status.lock().needs_refresh = false;
117 }
118
119 /// Dismiss the surfaced `last_error`. The error banner in the sync panel
120 /// calls this when the user clicks Dismiss or Retry — Retry re-runs the
121 /// action and clears the previous error in the same gesture.
122 pub fn clear_last_error(&self) {
123 self.status.lock().last_error = None;
124 }
125
126 /// Start the OAuth2 PKCE authentication flow.
127 /// Opens the callback server and returns the auth URL. A background task
128 /// automatically awaits the callback and completes authentication, or
129 /// terminates early if [`cancel_auth`] fires.
130 #[instrument(skip_all)]
131 pub fn start_auth(&self) -> Result<String> {
132 self.status.lock().state = SyncState::Authenticating;
133 let session = auth::start_auth(&self.client)?;
134 let auth_url = session.auth_url.clone();
135
136 // Install a fresh cancel channel for this flow. Dropping any prior
137 // sender quietly cancels its waiter — start_auth being re-entrant is
138 // a corner case but shouldn't leak old senders.
139 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
140 *self.auth_cancel_tx.lock() = Some(cancel_tx);
141
142 // Spawn background task to await callback and auto-complete auth
143 let client = self.client.clone();
144 let status = self.status.clone();
145 let db_path = self.db_path.clone();
146 let expected_state = session.expected_state.clone();
147 let code_verifier = session.code_verifier.clone();
148 let port = session.port;
149 let mut code_rx = session.code_rx;
150
151 self.runtime.spawn(async move {
152 // Race the callback against the cancel signal. If cancel wins, the
153 // task exits without mutating state — `cancel_auth` already set
154 // state to Disconnected at the call site, and any late callback
155 // result would otherwise transition state back to Ready /
156 // NeedsEncryption.
157 tokio::select! {
158 _ = cancel_rx => {}
159 recv = &mut code_rx => {
160 match recv {
161 Ok(result) => {
162 if result.state != expected_state {
163 let mut s = status.lock();
164 s.last_error = Some("CSRF state mismatch".to_string());
165 s.state = SyncState::Disconnected;
166 return;
167 }
168
169 match client
170 .authenticate_with_code(&result.code, &code_verifier, port, "__internal__")
171 .await
172 {
173 Ok(_) => {
174 let has_key = client.try_load_key_from_keychain().unwrap_or(false);
175 if has_key {
176 let mut s = status.lock();
177 s.state = SyncState::Ready;
178 load_sync_settings_into_status(&db_path, &mut s);
179 } else {
180 let has_server_key =
181 client.has_server_key().await.unwrap_or(false);
182 status.lock().state =
183 SyncState::NeedsEncryption { has_server_key };
184 }
185 }
186 Err(e) => {
187 let mut s = status.lock();
188 s.state = SyncState::Disconnected;
189 s.last_error = Some(format!("Auth failed: {e}"));
190 }
191 }
192 }
193 Err(_) => {
194 // Callback server timed out or was dropped
195 let mut s = status.lock();
196 if s.state == SyncState::Authenticating {
197 s.state = SyncState::Disconnected;
198 s.last_error = Some("Authentication timed out".to_string());
199 }
200 }
201 }
202 }
203 }
204 });
205
206 Ok(auth_url)
207 }
208
209 /// Cancel an in-flight OAuth flow. Returns to [`SyncState::Disconnected`]
210 /// immediately; the background callback-await task aborts on its next poll.
211 /// No-op when there is no active flow.
212 #[instrument(skip_all)]
213 pub fn cancel_auth(&self) {
214 if let Some(tx) = self.auth_cancel_tx.lock().take() {
215 // Send may fail if the task already terminated — that's fine.
216 let _ = tx.send(());
217 }
218 let mut s = self.status.lock();
219 if s.state == SyncState::Authenticating {
220 s.state = SyncState::Disconnected;
221 s.last_error = None;
222 }
223 }
224
225
226 /// Set up encryption (new or existing, depending on `is_new`).
227 #[instrument(skip_all)]
228 pub fn setup_encryption(&self, password: String, is_new: bool) {
229 let client = self.client.clone();
230 let status = self.status.clone();
231 let db_path = self.db_path.clone();
232
233 self.runtime.spawn(async move {
234 let result = if is_new {
235 client.setup_encryption_new(&password).await
236 } else {
237 client.setup_encryption_existing(&password).await
238 };
239
240 match result {
241 Ok(()) => {
242 let mut s = status.lock();
243 s.state = SyncState::Ready;
244 load_sync_settings_into_status(&db_path, &mut s);
245 }
246 Err(e) => {
247 status.lock().last_error =
248 Some(format!("Encryption setup failed: {e}"));
249 }
250 }
251 });
252 }
253
254 /// Trigger an immediate sync cycle.
255 pub fn sync_now(&self) {
256 if let Some(tx) = self.command_tx.lock().as_ref() {
257 let _ = tx.send(SyncCommand::SyncNow);
258 }
259 }
260
261 /// Trigger a targeted download of one cloud-only sample blob. Returns
262 /// `true` if the request was queued; `false` if the scheduler isn't
263 /// running yet (in which case the caller should fall back to a full sync
264 /// or surface "sync not ready").
265 pub fn download_sample(&self, hash: &str) -> bool {
266 if let Some(tx) = self.command_tx.lock().as_ref() {
267 tx.send(SyncCommand::DownloadOne { hash: hash.to_string() }).is_ok()
268 } else {
269 false
270 }
271 }
272
273 /// Update auto-sync settings.
274 #[instrument(skip_all)]
275 pub fn update_settings(&self, auto_sync: Option<bool>, interval: Option<u32>) {
276 if let Ok(conn) = rusqlite::Connection::open(&self.db_path) {
277 if let Some(enabled) = auto_sync {
278 let _ = service::set_sync_state(
279 &conn,
280 "auto_sync_enabled",
281 if enabled { "1" } else { "0" },
282 );
283 }
284 if let Some(mins) = interval {
285 let _ = service::set_sync_state(
286 &conn,
287 "sync_interval_minutes",
288 &mins.to_string(),
289 );
290 }
291 }
292
293 let mut s = self.status.lock();
294 if let Some(enabled) = auto_sync {
295 s.auto_sync_enabled = enabled;
296 }
297 if let Some(mins) = interval {
298 s.sync_interval_minutes = mins;
299 }
300 }
301
302 /// Disconnect: clear status, reset to disconnected.
303 pub fn disconnect(&self) {
304 let mut s = self.status.lock();
305 s.state = SyncState::Disconnected;
306 s.last_error = None;
307 s.device_id = None;
308 }
309
310 /// Try to restore a previous session from keychain on startup.
311 #[instrument(skip_all)]
312 pub fn try_restore_session(&self) {
313 if self.client.session_info().is_some() {
314 let has_key = self.client.try_load_key_from_keychain().unwrap_or(false);
315 if has_key {
316 let mut s = self.status.lock();
317 s.state = SyncState::Ready;
318 load_sync_settings_into_status(&self.db_path, &mut s);
319 }
320 }
321 }
322
323 /// Fetch the pricing formula from the server (async, no JWT needed).
324 /// Stored on the status so UI code can quote a price for any cap locally.
325 pub fn fetch_pricing(&self) {
326 let client = self.client.clone();
327 let status = self.status.clone();
328 self.runtime.spawn(async move {
329 match client.get_app_pricing().await {
330 Ok(pricing) => {
331 status.lock().pricing = Some(pricing);
332 }
333 Err(e) => {
334 tracing::debug!("Failed to fetch app pricing: {e}");
335 }
336 }
337 });
338 }
339
340 /// Fetch subscription status from the server (async, result goes to status.subscription).
341 /// On error (404, network, etc.) treats the user as unsubscribed so the UI can show the
342 /// subscribe CTA instead of spinning forever.
343 pub fn fetch_subscription_status(&self) {
344 let client = self.client.clone();
345 let status = self.status.clone();
346 self.runtime.spawn(async move {
347 let sub = match client.get_subscription_status().await {
348 Ok(sub) => sub,
349 Err(e) => {
350 tracing::debug!("Failed to fetch subscription status, treating as inactive: {e}");
351 synckit_client::SubscriptionStatus::default()
352 }
353 };
354 status.lock().subscription = Some(sub);
355 });
356 }
357
358 /// Create a Stripe checkout session at the chosen storage cap and open
359 /// it in the browser. Polls for subscription activation after opening.
360 pub fn subscribe(&self, cap_bytes: i64, interval: synckit_client::BillingInterval) {
361 let client = self.client.clone();
362 let status = self.status.clone();
363 self.runtime.spawn(async move {
364 match client.create_subscription_checkout(cap_bytes, interval).await {
365 Ok(resp) => {
366 if let Err(e) = open::that(&resp.checkout_url) {
367 tracing::warn!("Failed to open browser: {e}");
368 }
369 // Poll for subscription activation (5s intervals, up to 10 minutes)
370 for _ in 0..120 {
371 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
372 if let Ok(sub) = client.get_subscription_status().await
373 && sub.active {
374 status.lock().subscription = Some(sub);
375 tracing::info!("Subscription activated");
376 break;
377 }
378 }
379 }
380 Err(e) => {
381 tracing::error!("Failed to create checkout: {e}");
382 status.lock().last_error =
383 Some(format!("Could not start checkout: {e}"));
384 }
385 }
386 });
387 }
388
389 /// Queue a storage-cap change that applies at the next billing cycle.
390 pub fn queue_cap_change(&self, cap_bytes: i64) {
391 let client = self.client.clone();
392 let status = self.status.clone();
393 self.runtime.spawn(async move {
394 match client.queue_storage_cap_change(cap_bytes).await {
395 Ok(sub) => {
396 status.lock().subscription = Some(sub);
397 tracing::info!(cap_bytes, "Storage cap change queued");
398 }
399 Err(e) => {
400 tracing::error!("Failed to queue cap change: {e}");
401 status.lock().last_error =
402 Some(format!("Could not update storage cap: {e}"));
403 }
404 }
405 });
406 }
407
408 /// Spawn the background sync scheduler task.
409 #[instrument(skip_all)]
410 pub fn start_scheduler(&self) {
411 let (tx, rx) = mpsc::unbounded_channel();
412 *self.command_tx.lock() = Some(tx);
413
414 let client = self.client.clone();
415 let db_path = self.db_path.clone();
416 let content_dir = self.content_dir.clone();
417 let status = self.status.clone();
418
419 self.runtime.spawn(scheduler::run_scheduler(
420 client, db_path, content_dir, status, rx,
421 ));
422 }
423 }
424
425 /// Load sync settings from the database into a SyncStatus.
426 #[instrument(skip_all)]
427 fn load_sync_settings_into_status(db_path: &PathBuf, s: &mut SyncStatus) {
428 let conn = match rusqlite::Connection::open(db_path) {
429 Ok(c) => c,
430 Err(e) => {
431 tracing::warn!("Failed to open DB for sync settings: {e}");
432 return;
433 }
434 };
435
436 s.auto_sync_enabled = service::get_sync_state(&conn, "auto_sync_enabled")
437 .inspect_err(|e| tracing::warn!("Failed to read auto_sync_enabled: {e}"))
438 .unwrap_or_default()
439 == "1";
440 s.sync_interval_minutes = service::get_sync_state(&conn, "sync_interval_minutes")
441 .inspect_err(|e| tracing::warn!("Failed to read sync_interval_minutes: {e}"))
442 .unwrap_or_else(|_| "15".to_string())
443 .parse()
444 .unwrap_or(15);
445 s.pending_changes = service::count_pending_changes(&conn)
446 .inspect_err(|e| tracing::warn!("Failed to count pending changes: {e}"))
447 .unwrap_or(0);
448 s.last_sync_at = {
449 let v = service::get_sync_state(&conn, "last_sync_at")
450 .inspect_err(|e| tracing::warn!("Failed to read last_sync_at: {e}"))
451 .unwrap_or_default();
452 if v.is_empty() { None } else { Some(v) }
453 };
454 s.device_id = {
455 let v = service::get_sync_state(&conn, "device_id")
456 .inspect_err(|e| tracing::warn!("Failed to read device_id: {e}"))
457 .unwrap_or_default();
458 if v.is_empty() { None } else { Some(v) }
459 };
460 }
461
462 // Re-export for convenience
463 pub use synckit_client::SyncKitConfig;
464 pub use synckit_client::{AppPricing, BillingInterval, PriceQuote};
465 pub use synckit_client::validate_api_key;
466