//! Background sync scheduler with exponential backoff and SSE push notifications.

// NOTE(review): this file appears to have lost its angle-bracketed generic
// arguments (and its line breaks) somewhere upstream. The affected sites are
// marked individually below; the code tokens are left exactly as found and
// must be restored from version control before this will compile.

use bb_core::scheduler::exponential_backoff_secs;
use crate::state::AppState;
use crate::sync_service;
use std::sync::Arc;
use tauri::{AppHandle, Emitter, Manager};
use tracing::{debug, error, info, warn};

/// How often (seconds) the scheduler checks whether a sync is due.
const CHECK_INTERVAL_SECS: u64 = 60;

/// Delay before reconnecting the SSE stream after a disconnect (seconds).
const SSE_RECONNECT_DELAY_SECS: u64 = 5;

/// Spawn a background task that periodically syncs if auto-sync is enabled.
/// Also maintains an SSE connection for real-time push notifications.
///
/// The task loops forever. Each iteration ("tick") starts when either
/// `CHECK_INTERVAL_SECS` has elapsed or the SSE stream yields, and then:
///
/// 1. Enforces the changelog retention cap (runs even when sync is off).
/// 2. Skips the tick unless a sync client exists, has session info, and has
///    a master key loaded.
/// 3. Tries to (re)subscribe to the SSE stream if none is open.
/// 4. Skips the tick unless the `auto_sync_enabled` state value is `"1"`.
/// 5. For timer-driven ticks only, skips unless `sync_interval_minutes`
///    (default 15) have passed since `last_sync_at`.
/// 6. Skips while a failure backoff deadline is still in the future.
/// 7. Takes `sync_mutex`, creates the initial snapshot if needed, runs the
///    sync, updates the backoff state, and cleans up the changelog.
///
/// # Parameters
///
/// * `app` - Tauri application handle; moved into the task and used to look
///   up managed state and to emit `sync:*` events.
///
/// # Returns
///
/// Returns the `AbortHandle` so the caller can cancel the task on shutdown.
pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle {
    let handle = tauri::async_runtime::spawn(async move {
        // Number of non-402 sync failures since the last successful sync.
        let mut consecutive_failures: u32 = 0;
        // Earliest time the next sync attempt is allowed; `None` = no backoff.
        // NOTE(review): generic arguments stripped here — the later
        // `Some(chrono::Utc::now() + ...)` assignments suggest
        // `Option<chrono::DateTime<chrono::Utc>>`; confirm.
        let mut backoff_until: Option> = None;
        // Set when the current wake-up came from an SSE change notification.
        let mut sse_triggered = false;
        // Open push-notification stream, if any.
        // NOTE(review): the `Option` payload type was stripped and is not
        // recoverable from this file; it is whatever `client.subscribe()`
        // yields on success.
        let mut sse_stream: Option = None;

        loop {
            // Wait for whichever comes first: the periodic timer or an SSE event.
            tokio::select! {
                _ = tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)) => {}
                result = async {
                    if let Some(ref mut stream) = sse_stream {
                        stream.next_change().await
                    } else {
                        // No stream open: this branch must never complete so
                        // that only the timer can wake the loop.
                        // NOTE(review): generic argument stripped — the
                        // `Some(())` / `None` arms below suggest
                        // `pending::<Option<()>>()`; confirm.
                        std::future::pending::>().await
                    }
                } => {
                    match result {
                        Some(()) => {
                            debug!("SSE: received change notification, triggering immediate sync");
                            sse_triggered = true;
                        }
                        None => {
                            debug!("SSE: stream disconnected, will reconnect");
                            // Drop the dead stream; the subscribe step further
                            // down this same tick re-opens it after the delay.
                            sse_stream = None;
                            tokio::time::sleep(std::time::Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
                        }
                    }
                }
            }

            // NOTE(review): generic argument stripped — given the `Arc` and
            // `AppState` imports this is presumably
            // `try_state::<Arc<AppState>>()`; confirm.
            let state = match app.try_state::>() {
                Some(s) => s,
                None => {
                    debug!("Sync scheduler: app state not available yet");
                    continue;
                }
            };

            let pool = state.orchestrator.database().pool();

            // Enforce changelog retention cap every tick, even when sync is
            // disconnected/disabled — that's exactly when entries accumulate.
            // A failure here is logged and does not abort the tick.
            if let Err(e) = sync_service::enforce_changelog_retention(pool).await {
                warn!(error = %e, "Sync changelog retention check failed");
            }

            // Clone the client handle out of the lock; no client means
            // nothing to do this tick.
            // NOTE(review): the `Arc` type parameter was stripped and the
            // client type name is not visible in this file.
            let client: Arc = match state.sync_client.read().clone() {
                Some(c) => c,
                None => continue,
            };

            // Must be authenticated
            if client.session_info().is_none() {
                continue;
            }

            // Must have encryption key loaded
            if !client.has_master_key() {
                continue;
            }

            // Try to establish SSE connection if not connected.
            // Failure is non-fatal: the timer still drives syncs and the
            // subscribe is attempted again on a later tick.
            if sse_stream.is_none() {
                match client.subscribe().await {
                    Ok(stream) => {
                        debug!("SSE: connected to push notification stream");
                        sse_stream = Some(stream);
                    }
                    Err(e) => {
                        debug!("SSE: failed to connect (will retry): {}", e);
                    }
                }
            }

            // Check auto_sync_enabled: only the exact value "1" enables
            // syncing, and a read error skips the tick.
            let enabled = match sync_service::get_sync_state(pool, "auto_sync_enabled").await {
                Ok(v) => v == "1",
                Err(e) => {
                    warn!(error = %e, "Sync scheduler: failed to read auto_sync_enabled");
                    continue;
                }
            };
            if !enabled {
                continue;
            }

            // Check if sync interval has elapsed (skip check if SSE-triggered).
            // `sse_triggered` is only cleared after this block, so a
            // notification received on a tick that bailed out earlier is
            // still honoured on the next tick that gets this far.
            if !sse_triggered {
                // Both a read failure and an unparsable value fall back to 15.
                let interval_minutes: u64 = match sync_service::get_sync_state(pool, "sync_interval_minutes").await {
                    Ok(v) => match v.parse() {
                        Ok(mins) => mins,
                        Err(e) => {
                            warn!(value = %v, error = %e, "Failed to parse sync_interval_minutes, using default 15");
                            15
                        }
                    },
                    Err(e) => {
                        warn!(error = %e, "Failed to read sync_interval_minutes, using default 15");
                        15
                    }
                };

                // A read failure is treated the same as "never synced".
                let last_sync = match sync_service::get_sync_state(pool, "last_sync_at").await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!(error = %e, "Failed to read last_sync_at");
                        String::new()
                    }
                };

                // An empty or non-RFC-3339 timestamp never hits the `continue`
                // below, so the tick proceeds to sync in both cases.
                if !last_sync.is_empty() {
                    if let Ok(last) = chrono::DateTime::parse_from_rfc3339(&last_sync) {
                        let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc);
                        // NOTE(review): `as i64` wraps for intervals above
                        // i64::MAX minutes — confirm the stored value is bounded.
                        if elapsed.num_minutes() < interval_minutes as i64 {
                            continue;
                        }
                    }
                }
            }
            // Cleared before the backoff check, so a notification that arrives
            // during backoff is consumed without triggering a sync.
            sse_triggered = false;

            // Check backoff
            if let Some(until) = backoff_until {
                if chrono::Utc::now() < until {
                    debug!(%until, "Sync scheduler: backing off");
                    continue;
                }
            }

            // Prevent concurrent sync operations (manual + scheduler).
            // The guard is held until the end of this loop iteration, which
            // covers the snapshot, the sync, and the changelog cleanup.
            let _sync_guard = state.sync_mutex.lock().await;

            // Create initial snapshot on first sync (inside mutex to prevent
            // duplicate snapshots from concurrent manual + scheduled sync).
            // A read error yields the default value, which is not "1", so the
            // snapshot is attempted in that case as well.
            let snapshot_done = sync_service::get_sync_state(pool, "initial_snapshot_done")
                .await
                .unwrap_or_default();
            if snapshot_done != "1" {
                match sync_service::create_initial_snapshot(pool).await {
                    Ok(count) => info!(rows = count, "Initial sync snapshot created"),
                    Err(e) => {
                        // Skip the sync this tick; no backoff is applied, so
                        // the snapshot is retried on the next eligible tick.
                        error!(error = %e, "Failed to create initial snapshot");
                        continue;
                    }
                }
            }

            // Perform sync
            match sync_service::perform_sync(pool, &client).await {
                Ok(result) => {
                    // Success clears all failure/backoff state.
                    consecutive_failures = 0;
                    backoff_until = None;
                    // Only log when the sync actually moved data.
                    if result.pushed > 0 || result.pulled > 0 {
                        info!(
                            pushed = result.pushed,
                            pulled = result.pulled,
                            "Auto-sync complete"
                        );
                    }
                    // Notify listeners that changes were pulled; an emit
                    // failure is deliberately ignored.
                    if result.pulled > 0 {
                        let _ = app.emit("sync:changes-applied", ());
                    }
                }
                Err(e) => {
                    // If the server returned 402 (payment required), stop retrying —
                    // the user needs to subscribe before sync will work.
                    // NOTE(review): detection is a substring match on the
                    // error's display text, so any error whose message
                    // contains "402" takes this branch — confirm acceptable.
                    let is_payment_required = e.to_string().contains("402");
                    if is_payment_required {
                        let _ = app.emit("sync:subscription-required", ());
                        warn!("Auto-sync: subscription required, pausing scheduler for 1 hour");
                        // Fixed one-hour pause; `consecutive_failures` is
                        // left untouched on this path.
                        backoff_until = Some(chrono::Utc::now() + chrono::Duration::hours(1));
                    } else {
                        consecutive_failures += 1;
                        // NOTE(review): the helper's name says seconds but its
                        // result is multiplied by 60 and then logged and used
                        // as seconds — confirm the helper's units and the
                        // meaning of the `15` argument.
                        let backoff_secs = exponential_backoff_secs(consecutive_failures, 15) * 60;
                        backoff_until = Some(
                            chrono::Utc::now() + chrono::Duration::seconds(backoff_secs as i64),
                        );
                        warn!(
                            error = %e,
                            attempt = consecutive_failures,
                            backoff_secs,
                            "Auto-sync failed"
                        );
                    }
                }
            }

            // Cleanup old changelog entries. Runs after every sync attempt,
            // successful or not; a failure is only logged.
            if let Err(e) = sync_service::cleanup_changelog(pool).await {
                warn!(error = %e, "Sync changelog cleanup failed");
            }
        }
    });
    // Hand back the underlying Tokio abort handle for the spawned task.
    handle.inner().abort_handle()
}