//! Background sync scheduler: periodic auto-sync with exponential backoff on failure. //! SSE push notifications trigger immediate sync when another device pushes changes. use std::path::{Path, PathBuf}; use std::sync::Arc; use parking_lot::Mutex; use synckit_client::SyncKitClient; use tokio::sync::mpsc; use tracing::instrument; use crate::error::SyncError; use crate::service; use crate::SyncStatus; /// Commands the GUI can send to the scheduler. pub enum SyncCommand { /// Trigger an immediate sync cycle. SyncNow, /// Download a single sample blob by hash. Used by the "Download" row-level /// context menu so the user can pull one cloud-only sample without /// triggering a full sync cycle. DownloadOne { hash: String }, /// Stop the scheduler loop. Stop, } /// Delay before reconnecting the SSE stream after a disconnect (seconds). const SSE_RECONNECT_DELAY_SECS: u64 = 5; /// Run the background sync scheduler loop. /// /// Uses `db_path` to open short-lived connections inside `spawn_blocking`. #[instrument(skip_all)] pub async fn run_scheduler( client: Arc, db_path: PathBuf, content_dir: PathBuf, status: Arc>, mut commands: mpsc::UnboundedReceiver, ) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); let mut consecutive_failures: u32 = 0; let mut backoff_until: Option> = None; let mut sse_stream: Option = None; loop { tokio::select! { _ = interval.tick() => { // Check backoff if let Some(until) = backoff_until && chrono::Utc::now() < until { continue; } // Check if auto-sync is enabled and client is ready if !should_auto_sync(&db_path, &client) { continue; } // Check interval elapsed if !interval_elapsed(&db_path) { continue; } match run_sync_cycle(&db_path, &content_dir, &client, &status).await { Ok(pulled) => { consecutive_failures = 0; backoff_until = None; if pulled > 0 { status.lock().needs_refresh = true; } } Err(e) => { consecutive_failures += 1; let backoff_minutes = std::cmp::min( 2u64.saturating_pow(consecutive_failures), 15, ); backoff_until = Some( chrono::Utc::now() + chrono::Duration::minutes(backoff_minutes as i64), ); tracing::warn!( "Auto-sync failed (attempt {consecutive_failures}, backoff {backoff_minutes}m): {e}" ); status.lock().last_error = Some(e.to_string()); } } } result = async { if let Some(ref mut stream) = sse_stream { stream.next_change().await } else { std::future::pending::>().await } } => { match result { Some(()) => { tracing::debug!("SSE: received change notification, triggering immediate sync"); if should_auto_sync(&db_path, &client) { match run_sync_cycle(&db_path, &content_dir, &client, &status).await { Ok(pulled) => { consecutive_failures = 0; backoff_until = None; if pulled > 0 { status.lock().needs_refresh = true; } } Err(e) => { tracing::warn!("SSE-triggered sync failed: {e}"); status.lock().last_error = Some(e.to_string()); } } } } None => { tracing::debug!("SSE: stream disconnected, will reconnect"); sse_stream = None; tokio::time::sleep(std::time::Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await; } } } cmd = commands.recv() => { match cmd { Some(SyncCommand::SyncNow) => { match run_sync_cycle(&db_path, &content_dir, &client, &status).await { Ok(pulled) => { consecutive_failures = 0; backoff_until = None; if pulled > 0 { status.lock().needs_refresh = true; } } Err(e) => { tracing::error!("Manual sync failed: {e}"); status.lock().last_error = Some(e.to_string()); } } } Some(SyncCommand::DownloadOne { hash }) => { match service::download_one_blob(&db_path, &content_dir, &client, &hash).await { Ok(()) => { status.lock().needs_refresh = true; } Err(e) => { tracing::warn!("Single-blob download failed for {hash}: {e}"); status.lock().last_error = Some(e.to_string()); } } } Some(SyncCommand::Stop) | None => { tracing::info!("Sync scheduler stopping"); break; } } } } // Try to establish SSE connection if not connected and client is ready if sse_stream.is_none() && client.session_info().is_some() && client.has_master_key() { match client.subscribe().await { Ok(stream) => { tracing::debug!("SSE: connected to push notification stream"); sse_stream = Some(stream); } Err(e) => { tracing::debug!("SSE: failed to connect (will retry): {e}"); } } } } } /// Check if auto-sync is enabled and the client is ready (authenticated + has key). #[instrument(skip_all)] fn should_auto_sync(db_path: &std::path::Path, client: &SyncKitClient) -> bool { if client.session_info().is_none() || !client.has_master_key() { return false; } let conn = match rusqlite::Connection::open(db_path) { Ok(c) => c, Err(e) => { tracing::warn!("Failed to open DB for auto_sync check: {e}"); return false; } }; service::get_sync_state(&conn, "auto_sync_enabled").unwrap_or_default() == "1" } /// Check if enough time has elapsed since the last sync. #[instrument(skip_all)] fn interval_elapsed(db_path: &std::path::Path) -> bool { let conn = match rusqlite::Connection::open(db_path) { Ok(c) => c, Err(e) => { tracing::warn!("Failed to open DB for interval check: {e}"); return true; } }; let last_sync = service::get_sync_state(&conn, "last_sync_at").unwrap_or_default(); let interval_str = service::get_sync_state(&conn, "sync_interval_minutes") .unwrap_or_else(|_| "15".to_string()); let interval_minutes: i64 = interval_str.parse().unwrap_or(15); if last_sync.is_empty() { return true; } match chrono::DateTime::parse_from_rfc3339(&last_sync) { Ok(last) => { let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc); elapsed.num_minutes() >= interval_minutes } Err(_) => true, } } /// Run a single sync cycle: snapshot if needed, sync, blob sync, cleanup. #[instrument(skip_all)] async fn run_sync_cycle( db_path: &Path, content_dir: &Path, client: &SyncKitClient, status: &Arc>, ) -> std::result::Result { { let mut s = status.lock(); s.state = crate::SyncState::Syncing; s.last_error = None; } // Create initial snapshot if first sync let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = rusqlite::Connection::open(&p)?; service::create_initial_snapshot(&conn) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let result = service::perform_sync(db_path, client).await?; // Mark samples as cloud_only when blob doesn't exist locally let p = db_path.to_path_buf(); let cd = content_dir.to_path_buf(); if let Err(e) = tokio::task::spawn_blocking(move || { let conn = rusqlite::Connection::open(&p)?; service::mark_cloud_only_samples(&conn, &cd) }) .await .map_err(|e| SyncError::Other(e.to_string()))? { tracing::warn!("Cloud-only marking failed (non-fatal): {e}"); } // Blob sync: upload pending, then download missing if let Err(e) = service::upload_pending_blobs(db_path, content_dir, client).await { tracing::warn!("Blob upload failed (non-fatal): {e}"); } if let Err(e) = service::download_missing_blobs(db_path, content_dir, client).await { tracing::warn!("Blob download failed (non-fatal): {e}"); } // Cleanup old entries + enforce retention cap let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = rusqlite::Connection::open(&p)?; service::cleanup_changelog(&conn)?; service::enforce_changelog_retention(&conn) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; // Update status { let p = db_path.to_path_buf(); let last_sync = tokio::task::spawn_blocking(move || { match rusqlite::Connection::open(&p) { Ok(c) => service::get_sync_state(&c, "last_sync_at").unwrap_or_default(), Err(e) => { tracing::warn!("Failed to open DB for last_sync_at: {e}"); String::new() } } }) .await .unwrap_or_default(); let p2 = db_path.to_path_buf(); let pending = tokio::task::spawn_blocking(move || { match rusqlite::Connection::open(&p2) { Ok(c) => service::count_pending_changes(&c).unwrap_or(0), Err(e) => { tracing::warn!("Failed to open DB for pending changes: {e}"); 0 } } }) .await .unwrap_or(0); let mut s = status.lock(); s.state = crate::SyncState::Ready; s.last_sync_at = if last_sync.is_empty() { None } else { Some(last_sync) }; s.pending_changes = pending; } Ok(result.pulled) }