//! Background scheduler for automatic cloud sync.
//!
//! Checks every 60 seconds whether a sync is due based on the configured
//! interval. On first run, creates an initial snapshot if needed.
//! An SSE connection receives real-time push notifications from the server,
//! triggering immediate sync when another device pushes changes.

use std::sync::Arc;

use tauri::{Emitter, Manager};
use tokio::time::{interval, Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::state::AppState;
use crate::sync_service;

/// How often the scheduler checks if sync is due (seconds).
const CHECK_INTERVAL_SECS: u64 = 60;

/// Delay before reconnecting the SSE stream after a disconnect (seconds).
const SSE_RECONNECT_DELAY_SECS: u64 = 5;

/// Starts the cloud sync scheduler background task.
///
/// Runs a loop that only exits when `cancel` is cancelled. Each pass waits for
/// one of three wake-ups (cancellation, the periodic timer, or an SSE change
/// notification) and then walks a chain of gates; failing any gate skips to
/// the next wake-up:
///
/// 1. the managed app state and a sync client are available,
/// 2. the client has a session whose token has not expired,
/// 3. the client has a master key loaded,
/// 4. the `auto_sync_enabled` sync-state value is `"1"`,
/// 5. the configured interval has elapsed since `last_sync_at` (bypassed when
///    the wake-up was an SSE notification),
/// 6. no failure backoff is currently active.
///
/// When all gates pass it creates the initial snapshot if
/// `initial_snapshot_done` is not `"1"`, takes `sync_lock`, runs
/// `sync_service::perform_sync_with_blobs`, and finally cleans up the
/// changelog.
///
/// # Parameters
///
/// * `app` - Tauri handle used to look up the managed state and to emit the
///   `sync:status-changed`, `sync:changes-applied` and
///   `sync:subscription-required` events.
/// * `cancel` - token whose cancellation ends the loop.
///
/// # Failure handling
///
/// Sync errors never end the loop. An error whose text contains `"402"` is
/// treated as "subscription required" and pauses syncing for one hour; any
/// other error applies an exponential backoff of 2, 4, 8, then 15 minutes
/// (capped). A successful sync resets the failure counter and the backoff.
pub async fn start_sync_scheduler(app: tauri::AppHandle, cancel: CancellationToken) {
    // tokio's `interval` completes its first tick immediately, so the first
    // pass through the loop happens right after startup rather than after
    // CHECK_INTERVAL_SECS.
    // NOTE(review): the interval keeps tokio's default missed-tick behaviour;
    // confirm that catching up on ticks missed during a long sync is intended.
    let mut check_interval = interval(Duration::from_secs(CHECK_INTERVAL_SECS));
    info!("Cloud sync scheduler started (checking every {} seconds)", CHECK_INTERVAL_SECS);

    // Number of non-402 sync failures since the last success; drives the backoff.
    let mut consecutive_failures: u32 = 0;
    // Earliest instant at which the next sync attempt is allowed, if backing off.
    // NOTE(review): the generic arguments on this annotation and on
    // `sse_stream`, `state`, `client`, `try_state` and `pending` below appear
    // to be missing in this copy of the file; restore them before building.
    let mut backoff_until: Option> = None;
    // Flag set by SSE notification to trigger immediate sync
    let mut sse_triggered = false;
    // SSE stream handle — None until first successful connection
    let mut sse_stream: Option = None;

    loop {
        // Wait for either: timer tick, cancellation, or SSE notification
        tokio::select! {
            // Cancellation is the only way out of the loop.
            _ = cancel.cancelled() => {
                info!("Cloud sync scheduler shutting down");
                break;
            }
            // Periodic wake-up; the gates below decide whether a sync is due.
            _ = check_interval.tick() => {}
            result = async {
                if let Some(ref mut stream) = sse_stream {
                    stream.next_change().await
                } else {
                    // No stream — sleep forever (timer or cancel will fire)
                    std::future::pending::>().await
                }
            } => {
                match result {
                    Some(()) => {
                        debug!("SSE: received change notification, triggering immediate sync");
                        sse_triggered = true;
                    }
                    None => {
                        // Stream ended — reconnect after delay
                        debug!("SSE: stream disconnected, will reconnect");
                        sse_stream = None;
                        // NOTE(review): this sleep runs after `select!` has
                        // already resolved, so cancellation is not observed
                        // until it finishes. Execution then falls through to
                        // the gates below, where the reconnect is attempted.
                        tokio::time::sleep(Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
                    }
                }
            }
        }

        // The managed state may not be registered yet; try again on the next wake-up.
        let state: Arc = match app.try_state::>() {
            Some(s) => s.inner().clone(),
            None => {
                debug!("Sync scheduler: app state not available yet");
                continue;
            }
        };

        // Clone the client out of the lock so the read guard is released
        // before any `.await` below. Panics if the lock is poisoned.
        let client: Arc = match state.sync_client.read().expect("sync_client poisoned").clone() {
            Some(c) => c,
            None => continue,
        };

        // Must be authenticated with a non-expired token
        if client.session_info().is_none() {
            continue;
        }
        if client.is_token_expired() {
            debug!("Sync scheduler: token expired, skipping sync");
            client.clear_session();
            // Emit failures are deliberately ignored here and below.
            let _ = app.emit("sync:status-changed", "logged_out");
            // Drop the push stream together with the session.
            sse_stream = None;
            continue;
        }

        // Must have encryption key loaded
        if !client.has_master_key() {
            continue;
        }

        // Try to establish SSE connection if not connected
        // A failed attempt is only logged; it is retried on a later pass.
        if sse_stream.is_none() {
            match client.subscribe().await {
                Ok(stream) => {
                    debug!("SSE: connected to push notification stream");
                    sse_stream = Some(stream);
                }
                Err(e) => {
                    debug!("SSE: failed to connect (will retry): {}", e);
                }
            }
        }

        // Check auto_sync_enabled
        // Only the exact value "1" enables auto-sync; a read error skips this pass.
        let enabled = match sync_service::get_sync_state(&state.pool, "auto_sync_enabled").await {
            Ok(v) => v == "1",
            Err(e) => {
                warn!("Sync scheduler: failed to read auto_sync_enabled: {}", e);
                continue;
            }
        };
        if !enabled {
            continue;
        }

        // Check if sync interval has elapsed (skip check if SSE-triggered)
        if !sse_triggered {
            // Falls back to 5 minutes when the setting is unreadable or not a number.
            let interval_minutes: u64 = match sync_service::get_sync_state(&state.pool, "sync_interval_minutes").await {
                Ok(v) => match v.parse() {
                    Ok(mins) => mins,
                    Err(e) => {
                        warn!(value = %v, error = %e, "Failed to parse sync_interval_minutes, using default 5");
                        5
                    }
                },
                Err(e) => {
                    warn!(error = %e, "Failed to read sync_interval_minutes, using default 5");
                    5
                }
            };
            // A read error is treated like "never synced" (empty string).
            let last_sync = match sync_service::get_sync_state(&state.pool, "last_sync_at").await {
                Ok(v) => v,
                Err(e) => {
                    warn!(error = %e, "Failed to read last_sync_at");
                    String::new()
                }
            };
            // An empty or unparseable timestamp does not block the sync:
            // only a valid, recent timestamp causes this pass to be skipped.
            if !last_sync.is_empty() {
                if let Ok(last) = chrono::DateTime::parse_from_rfc3339(&last_sync) {
                    let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc);
                    if elapsed.num_minutes() < interval_minutes as i64 {
                        continue;
                    }
                }
            }
        }
        // The flag is consumed here, before the backoff check, so an SSE
        // notification that arrives during a backoff does not carry over.
        sse_triggered = false;

        // Check backoff
        if let Some(until) = backoff_until {
            if chrono::Utc::now() < until {
                debug!("Sync scheduler: backing off until {}", until);
                continue;
            }
        }

        // Create initial snapshot on first sync
        // A failed read of the flag is treated as "not done" via the default value.
        // NOTE(review): the snapshot is created before `sync_lock` is taken;
        // confirm it cannot race with a manual sync.
        let snapshot_done = sync_service::get_sync_state(&state.pool, "initial_snapshot_done")
            .await
            .unwrap_or_default();
        if snapshot_done != "1" {
            match sync_service::create_initial_snapshot(&state.pool).await {
                Ok(count) => info!("Initial sync snapshot: {} rows", count),
                Err(e) => {
                    error!("Failed to create initial snapshot: {}", e);
                    continue;
                }
            }
        }

        // Perform sync (acquire lock to prevent concurrent syncs with manual sync_now)
        // The guard lives until the end of this loop iteration, so the
        // changelog cleanup below also runs while the lock is held.
        let _sync_guard = state.sync_lock.lock().await;
        let _ = app.emit("sync:status-changed", "syncing");

        match sync_service::perform_sync_with_blobs(&state.pool, &client, Some(&state.data_dir)).await {
            Ok(result) => {
                // Success clears any failure history.
                consecutive_failures = 0;
                backoff_until = None;
                let _ = app.emit("sync:status-changed", "idle");
                if result.pushed > 0 || result.pulled > 0 {
                    info!("Auto-sync: pushed {}, pulled {}", result.pushed, result.pulled);
                }
                // Tell listeners that remote changes were applied.
                if result.pulled > 0 {
                    let _ = app.emit("sync:changes-applied", ());
                }
            }
            Err(e) => {
                // If the server returned 402 (payment required), stop retrying —
                // the user needs to subscribe before sync will work.
                // NOTE(review): detection is a substring match on the error
                // text, so any message containing "402" takes this branch.
                let is_payment_required = e.to_string().contains("402");
                if is_payment_required {
                    let _ = app.emit("sync:subscription-required", ());
                    let _ = app.emit("sync:status-changed", "subscription_required");
                    warn!("Auto-sync: subscription required, pausing scheduler");
                    // Back off for 1 hour — recheck after that in case user subscribes
                    backoff_until = Some(chrono::Utc::now() + chrono::Duration::hours(1));
                } else {
                    consecutive_failures += 1;
                    // Exponential backoff: 2, 4, 8, then capped at 15 minutes.
                    // NOTE(review): `2u64.pow(n)` overflows once
                    // `consecutive_failures` reaches 64 (panic in debug builds,
                    // wrap-around in release); consider `saturating_pow`.
                    let backoff_minutes = std::cmp::min(2u64.pow(consecutive_failures), 15);
                    backoff_until = Some(chrono::Utc::now() + chrono::Duration::minutes(backoff_minutes as i64));
                    let _ = app.emit("sync:status-changed", "error");
                    warn!("Auto-sync failed (attempt {}, backoff {}m): {}", consecutive_failures, backoff_minutes, e);
                }
            }
        }

        // Cleanup old changelog entries
        // Runs after every sync attempt, successful or not; failure is only logged.
        if let Err(e) = sync_service::cleanup_changelog(&state.pool).await {
            warn!("Sync changelog cleanup failed: {}", e);
        }
    }
}