Skip to main content

max / balanced_breakfast

8.8 KB · 218 lines History Blame Raw
1 //! Background sync scheduler with exponential backoff and SSE push notifications.
2
3 use bb_core::scheduler::exponential_backoff_secs;
4 use crate::state::AppState;
5 use crate::sync_service;
6 use std::sync::Arc;
7 use tauri::{AppHandle, Emitter, Manager};
8 use tracing::{debug, error, info, warn};
9
/// Length, in seconds, of the scheduler's timer tick: the longest the
/// background loop sleeps before re-evaluating whether a sync should run.
const CHECK_INTERVAL_SECS: u64 = 60;

/// Pause, in seconds, taken after the SSE push stream reports a disconnect
/// and before the loop carries on (and attempts to subscribe again).
const SSE_RECONNECT_DELAY_SECS: u64 = 5;
15
16 /// Spawn a background task that periodically syncs if auto-sync is enabled.
17 /// Also maintains an SSE connection for real-time push notifications.
18 /// Returns the `AbortHandle` so the caller can cancel the task on shutdown.
19 pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle {
20 let handle = tauri::async_runtime::spawn(async move {
21 let mut consecutive_failures: u32 = 0;
22 let mut backoff_until: Option<chrono::DateTime<chrono::Utc>> = None;
23 let mut sse_triggered = false;
24 let mut sse_stream: Option<synckit_client::SyncNotifyStream> = None;
25
26 loop {
27 tokio::select! {
28 _ = tokio::time::sleep(std::time::Duration::from_secs(CHECK_INTERVAL_SECS)) => {}
29 result = async {
30 if let Some(ref mut stream) = sse_stream {
31 stream.next_change().await
32 } else {
33 std::future::pending::<Option<()>>().await
34 }
35 } => {
36 match result {
37 Some(()) => {
38 debug!("SSE: received change notification, triggering immediate sync");
39 sse_triggered = true;
40 }
41 None => {
42 debug!("SSE: stream disconnected, will reconnect");
43 sse_stream = None;
44 tokio::time::sleep(std::time::Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
45 }
46 }
47 }
48 }
49
50 let state = match app.try_state::<Arc<AppState>>() {
51 Some(s) => s,
52 None => {
53 debug!("Sync scheduler: app state not available yet");
54 continue;
55 }
56 };
57
58 let pool = state.orchestrator.database().pool();
59
60 // Enforce changelog retention cap every tick, even when sync is
61 // disconnected/disabled — that's exactly when entries accumulate.
62 if let Err(e) = sync_service::enforce_changelog_retention(pool).await {
63 warn!(error = %e, "Sync changelog retention check failed");
64 }
65
66 let client: Arc<synckit_client::SyncKitClient> = match state.sync_client.read().clone() {
67 Some(c) => c,
68 None => continue,
69 };
70
71 // Must be authenticated
72 if client.session_info().is_none() {
73 continue;
74 }
75
76 // Must have encryption key loaded
77 if !client.has_master_key() {
78 continue;
79 }
80
81 // Try to establish SSE connection if not connected
82 if sse_stream.is_none() {
83 match client.subscribe().await {
84 Ok(stream) => {
85 debug!("SSE: connected to push notification stream");
86 sse_stream = Some(stream);
87 }
88 Err(e) => {
89 debug!("SSE: failed to connect (will retry): {}", e);
90 }
91 }
92 }
93
94 // Check auto_sync_enabled
95 let enabled = match sync_service::get_sync_state(pool, "auto_sync_enabled").await {
96 Ok(v) => v == "1",
97 Err(e) => {
98 warn!(error = %e, "Sync scheduler: failed to read auto_sync_enabled");
99 continue;
100 }
101 };
102 if !enabled {
103 continue;
104 }
105
106 // Check if sync interval has elapsed (skip check if SSE-triggered)
107 if !sse_triggered {
108 let interval_minutes: u64 = match sync_service::get_sync_state(pool, "sync_interval_minutes").await {
109 Ok(v) => match v.parse() {
110 Ok(mins) => mins,
111 Err(e) => {
112 warn!(value = %v, error = %e, "Failed to parse sync_interval_minutes, using default 15");
113 15
114 }
115 },
116 Err(e) => {
117 warn!(error = %e, "Failed to read sync_interval_minutes, using default 15");
118 15
119 }
120 };
121
122 let last_sync = match sync_service::get_sync_state(pool, "last_sync_at").await {
123 Ok(v) => v,
124 Err(e) => {
125 warn!(error = %e, "Failed to read last_sync_at");
126 String::new()
127 }
128 };
129
130 if !last_sync.is_empty() {
131 if let Ok(last) = chrono::DateTime::parse_from_rfc3339(&last_sync) {
132 let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc);
133 if elapsed.num_minutes() < interval_minutes as i64 {
134 continue;
135 }
136 }
137 }
138 }
139 sse_triggered = false;
140
141 // Check backoff
142 if let Some(until) = backoff_until {
143 if chrono::Utc::now() < until {
144 debug!(%until, "Sync scheduler: backing off");
145 continue;
146 }
147 }
148
149 // Prevent concurrent sync operations (manual + scheduler).
150 let _sync_guard = state.sync_mutex.lock().await;
151
152 // Create initial snapshot on first sync (inside mutex to prevent
153 // duplicate snapshots from concurrent manual + scheduled sync).
154 let snapshot_done = sync_service::get_sync_state(pool, "initial_snapshot_done")
155 .await
156 .unwrap_or_default();
157
158 if snapshot_done != "1" {
159 match sync_service::create_initial_snapshot(pool).await {
160 Ok(count) => info!(rows = count, "Initial sync snapshot created"),
161 Err(e) => {
162 error!(error = %e, "Failed to create initial snapshot");
163 continue;
164 }
165 }
166 }
167
168 // Perform sync
169 match sync_service::perform_sync(pool, &client).await {
170 Ok(result) => {
171 consecutive_failures = 0;
172 backoff_until = None;
173 if result.pushed > 0 || result.pulled > 0 {
174 info!(
175 pushed = result.pushed,
176 pulled = result.pulled,
177 "Auto-sync complete"
178 );
179 }
180 if result.pulled > 0 {
181 let _ = app.emit("sync:changes-applied", ());
182 }
183 }
184 Err(e) => {
185 // If the server returned 402 (payment required), stop retrying —
186 // the user needs to subscribe before sync will work.
187 let is_payment_required = e.to_string().contains("402");
188 if is_payment_required {
189 let _ = app.emit("sync:subscription-required", ());
190 warn!("Auto-sync: subscription required, pausing scheduler for 1 hour");
191 backoff_until = Some(chrono::Utc::now() + chrono::Duration::hours(1));
192 } else {
193 consecutive_failures += 1;
194 let backoff_secs =
195 exponential_backoff_secs(consecutive_failures, 15) * 60;
196 backoff_until = Some(
197 chrono::Utc::now()
198 + chrono::Duration::seconds(backoff_secs as i64),
199 );
200 warn!(
201 error = %e,
202 attempt = consecutive_failures,
203 backoff_secs,
204 "Auto-sync failed"
205 );
206 }
207 }
208 }
209
210 // Cleanup old changelog entries
211 if let Err(e) = sync_service::cleanup_changelog(pool).await {
212 warn!(error = %e, "Sync changelog cleanup failed");
213 }
214 }
215 });
216 handle.inner().abort_handle()
217 }
218