Skip to main content

max / goingson

8.7 KB · 221 lines History Blame Raw
1 //! Background scheduler for automatic cloud sync.
2 //!
3 //! Checks every 60 seconds whether a sync is due based on the configured
4 //! interval. On first run, creates an initial snapshot if needed.
5 //! An SSE connection receives real-time push notifications from the server,
6 //! triggering immediate sync when another device pushes changes.
7
8 use std::sync::Arc;
9 use tauri::{Emitter, Manager};
10 use tokio::time::{interval, Duration};
11 use tokio_util::sync::CancellationToken;
12 use tracing::{debug, error, info, warn};
13
14 use crate::state::AppState;
15 use crate::sync_service;
16
17 /// Wake-up period of the scheduler loop, in seconds. This is only how often due-ness is checked; the sync cadence itself comes from the stored `sync_interval_minutes` setting.
18 const CHECK_INTERVAL_SECS: u64 = 60;
19
20 /// Pause, in seconds, after the SSE stream ends before the scheduler tries to subscribe again.
21 const SSE_RECONNECT_DELAY_SECS: u64 = 5;
22
23 /// Starts the cloud sync scheduler background task.
24 pub async fn start_sync_scheduler(app: tauri::AppHandle, cancel: CancellationToken) {
25 let mut check_interval = interval(Duration::from_secs(CHECK_INTERVAL_SECS));
26
27 info!("Cloud sync scheduler started (checking every {} seconds)", CHECK_INTERVAL_SECS);
28
29 let mut consecutive_failures: u32 = 0;
30 let mut backoff_until: Option<chrono::DateTime<chrono::Utc>> = None;
31 // Flag set by SSE notification to trigger immediate sync
32 let mut sse_triggered = false;
33 // SSE stream handle — None until first successful connection
34 let mut sse_stream: Option<synckit_client::SyncNotifyStream> = None;
35
36 loop {
37 // Wait for either: timer tick, cancellation, or SSE notification
38 tokio::select! {
39 _ = cancel.cancelled() => {
40 info!("Cloud sync scheduler shutting down");
41 break;
42 }
43 _ = check_interval.tick() => {}
44 result = async {
45 if let Some(ref mut stream) = sse_stream {
46 stream.next_change().await
47 } else {
48 // No stream — sleep forever (timer or cancel will fire)
49 std::future::pending::<Option<()>>().await
50 }
51 } => {
52 match result {
53 Some(()) => {
54 debug!("SSE: received change notification, triggering immediate sync");
55 sse_triggered = true;
56 }
57 None => {
58 // Stream ended — reconnect after delay
59 debug!("SSE: stream disconnected, will reconnect");
60 sse_stream = None;
61 tokio::time::sleep(Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
62 }
63 }
64 }
65 }
66
67 let state: Arc<AppState> = match app.try_state::<Arc<AppState>>() {
68 Some(s) => s.inner().clone(),
69 None => {
70 debug!("Sync scheduler: app state not available yet");
71 continue;
72 }
73 };
74
75 let client: Arc<synckit_client::SyncKitClient> = match state.sync_client.read().expect("sync_client poisoned").clone() {
76 Some(c) => c,
77 None => continue,
78 };
79
80 // Must be authenticated with a non-expired token
81 if client.session_info().is_none() {
82 continue;
83 }
84 if client.is_token_expired() {
85 debug!("Sync scheduler: token expired, skipping sync");
86 client.clear_session();
87 let _ = app.emit("sync:status-changed", "logged_out");
88 sse_stream = None;
89 continue;
90 }
91
92 // Must have encryption key loaded
93 if !client.has_master_key() {
94 continue;
95 }
96
97 // Try to establish SSE connection if not connected
98 if sse_stream.is_none() {
99 match client.subscribe().await {
100 Ok(stream) => {
101 debug!("SSE: connected to push notification stream");
102 sse_stream = Some(stream);
103 }
104 Err(e) => {
105 debug!("SSE: failed to connect (will retry): {}", e);
106 }
107 }
108 }
109
110 // Check auto_sync_enabled
111 let enabled = match sync_service::get_sync_state(&state.pool, "auto_sync_enabled").await {
112 Ok(v) => v == "1",
113 Err(e) => {
114 warn!("Sync scheduler: failed to read auto_sync_enabled: {}", e);
115 continue;
116 }
117 };
118 if !enabled {
119 continue;
120 }
121
122 // Check if sync interval has elapsed (skip check if SSE-triggered)
123 if !sse_triggered {
124 let interval_minutes: u64 = match sync_service::get_sync_state(&state.pool, "sync_interval_minutes").await {
125 Ok(v) => match v.parse() {
126 Ok(mins) => mins,
127 Err(e) => {
128 warn!(value = %v, error = %e, "Failed to parse sync_interval_minutes, using default 5");
129 5
130 }
131 },
132 Err(e) => {
133 warn!(error = %e, "Failed to read sync_interval_minutes, using default 5");
134 5
135 }
136 };
137
138 let last_sync = match sync_service::get_sync_state(&state.pool, "last_sync_at").await {
139 Ok(v) => v,
140 Err(e) => {
141 warn!(error = %e, "Failed to read last_sync_at");
142 String::new()
143 }
144 };
145
146 if !last_sync.is_empty() {
147 if let Ok(last) = chrono::DateTime::parse_from_rfc3339(&last_sync) {
148 let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc);
149 if elapsed.num_minutes() < interval_minutes as i64 {
150 continue;
151 }
152 }
153 }
154 }
155 sse_triggered = false;
156
157 // Check backoff
158 if let Some(until) = backoff_until {
159 if chrono::Utc::now() < until {
160 debug!("Sync scheduler: backing off until {}", until);
161 continue;
162 }
163 }
164
165 // Create initial snapshot on first sync
166 let snapshot_done = sync_service::get_sync_state(&state.pool, "initial_snapshot_done")
167 .await
168 .unwrap_or_default();
169
170 if snapshot_done != "1" {
171 match sync_service::create_initial_snapshot(&state.pool).await {
172 Ok(count) => info!("Initial sync snapshot: {} rows", count),
173 Err(e) => {
174 error!("Failed to create initial snapshot: {}", e);
175 continue;
176 }
177 }
178 }
179
180 // Perform sync (acquire lock to prevent concurrent syncs with manual sync_now)
181 let _sync_guard = state.sync_lock.lock().await;
182 let _ = app.emit("sync:status-changed", "syncing");
183 match sync_service::perform_sync_with_blobs(&state.pool, &client, Some(&state.data_dir)).await {
184 Ok(result) => {
185 consecutive_failures = 0;
186 backoff_until = None;
187 let _ = app.emit("sync:status-changed", "idle");
188 if result.pushed > 0 || result.pulled > 0 {
189 info!("Auto-sync: pushed {}, pulled {}", result.pushed, result.pulled);
190 }
191 if result.pulled > 0 {
192 let _ = app.emit("sync:changes-applied", ());
193 }
194 }
195 Err(e) => {
196 // If the server returned 402 (payment required), stop retrying —
197 // the user needs to subscribe before sync will work.
198 let is_payment_required = e.to_string().contains("402");
199 if is_payment_required {
200 let _ = app.emit("sync:subscription-required", ());
201 let _ = app.emit("sync:status-changed", "subscription_required");
202 warn!("Auto-sync: subscription required, pausing scheduler");
203 // Back off for 1 hour — recheck after that in case user subscribes
204 backoff_until = Some(chrono::Utc::now() + chrono::Duration::hours(1));
205 } else {
206 consecutive_failures += 1;
207 let backoff_minutes = std::cmp::min(2u64.pow(consecutive_failures), 15);
208 backoff_until = Some(chrono::Utc::now() + chrono::Duration::minutes(backoff_minutes as i64));
209 let _ = app.emit("sync:status-changed", "error");
210 warn!("Auto-sync failed (attempt {}, backoff {}m): {}", consecutive_failures, backoff_minutes, e);
211 }
212 }
213 }
214
215 // Cleanup old changelog entries
216 if let Err(e) = sync_service::cleanup_changelog(&state.pool).await {
217 warn!("Sync changelog cleanup failed: {}", e);
218 }
219 }
220 }
221