Skip to main content

max / audiofiles

11.5 KB · 316 lines History Blame Raw
1 //! Background sync scheduler: periodic auto-sync with exponential backoff on failure.
2 //! SSE push notifications trigger immediate sync when another device pushes changes.
3
4 use std::path::{Path, PathBuf};
5 use std::sync::Arc;
6
7 use parking_lot::Mutex;
8 use synckit_client::SyncKitClient;
9 use tokio::sync::mpsc;
10
11 use tracing::instrument;
12
13 use crate::error::SyncError;
14 use crate::service;
15 use crate::SyncStatus;
16
17 /// Commands the GUI can send to the scheduler.
18 pub enum SyncCommand {
19 /// Trigger an immediate sync cycle.
20 SyncNow,
21 /// Download a single sample blob by hash. Used by the "Download" row-level
22 /// context menu so the user can pull one cloud-only sample without
23 /// triggering a full sync cycle.
24 DownloadOne { hash: String },
25 /// Stop the scheduler loop.
26 Stop,
27 }
28
29 /// Delay before reconnecting the SSE stream after a disconnect (seconds).
30 const SSE_RECONNECT_DELAY_SECS: u64 = 5;
31
32 /// Run the background sync scheduler loop.
33 ///
34 /// Uses `db_path` to open short-lived connections inside `spawn_blocking`.
35 #[instrument(skip_all)]
36 pub async fn run_scheduler(
37 client: Arc<SyncKitClient>,
38 db_path: PathBuf,
39 content_dir: PathBuf,
40 status: Arc<Mutex<SyncStatus>>,
41 mut commands: mpsc::UnboundedReceiver<SyncCommand>,
42 ) {
43 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
44 let mut consecutive_failures: u32 = 0;
45 let mut backoff_until: Option<chrono::DateTime<chrono::Utc>> = None;
46 let mut sse_stream: Option<synckit_client::SyncNotifyStream> = None;
47
48 loop {
49 tokio::select! {
50 _ = interval.tick() => {
51 // Check backoff
52 if let Some(until) = backoff_until
53 && chrono::Utc::now() < until {
54 continue;
55 }
56
57 // Check if auto-sync is enabled and client is ready
58 if !should_auto_sync(&db_path, &client) {
59 continue;
60 }
61
62 // Check interval elapsed
63 if !interval_elapsed(&db_path) {
64 continue;
65 }
66
67 match run_sync_cycle(&db_path, &content_dir, &client, &status).await {
68 Ok(pulled) => {
69 consecutive_failures = 0;
70 backoff_until = None;
71 if pulled > 0 {
72 status.lock().needs_refresh = true;
73 }
74 }
75 Err(e) => {
76 consecutive_failures += 1;
77 let backoff_minutes = std::cmp::min(
78 2u64.saturating_pow(consecutive_failures),
79 15,
80 );
81 backoff_until = Some(
82 chrono::Utc::now()
83 + chrono::Duration::minutes(backoff_minutes as i64),
84 );
85 tracing::warn!(
86 "Auto-sync failed (attempt {consecutive_failures}, backoff {backoff_minutes}m): {e}"
87 );
88 status.lock().last_error = Some(e.to_string());
89 }
90 }
91 }
92 result = async {
93 if let Some(ref mut stream) = sse_stream {
94 stream.next_change().await
95 } else {
96 std::future::pending::<Option<()>>().await
97 }
98 } => {
99 match result {
100 Some(()) => {
101 tracing::debug!("SSE: received change notification, triggering immediate sync");
102 if should_auto_sync(&db_path, &client) {
103 match run_sync_cycle(&db_path, &content_dir, &client, &status).await {
104 Ok(pulled) => {
105 consecutive_failures = 0;
106 backoff_until = None;
107 if pulled > 0 {
108 status.lock().needs_refresh = true;
109 }
110 }
111 Err(e) => {
112 tracing::warn!("SSE-triggered sync failed: {e}");
113 status.lock().last_error = Some(e.to_string());
114 }
115 }
116 }
117 }
118 None => {
119 tracing::debug!("SSE: stream disconnected, will reconnect");
120 sse_stream = None;
121 tokio::time::sleep(std::time::Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
122 }
123 }
124 }
125 cmd = commands.recv() => {
126 match cmd {
127 Some(SyncCommand::SyncNow) => {
128 match run_sync_cycle(&db_path, &content_dir, &client, &status).await {
129 Ok(pulled) => {
130 consecutive_failures = 0;
131 backoff_until = None;
132 if pulled > 0 {
133 status.lock().needs_refresh = true;
134 }
135 }
136 Err(e) => {
137 tracing::error!("Manual sync failed: {e}");
138 status.lock().last_error = Some(e.to_string());
139 }
140 }
141 }
142 Some(SyncCommand::DownloadOne { hash }) => {
143 match service::download_one_blob(&db_path, &content_dir, &client, &hash).await {
144 Ok(()) => {
145 status.lock().needs_refresh = true;
146 }
147 Err(e) => {
148 tracing::warn!("Single-blob download failed for {hash}: {e}");
149 status.lock().last_error = Some(e.to_string());
150 }
151 }
152 }
153 Some(SyncCommand::Stop) | None => {
154 tracing::info!("Sync scheduler stopping");
155 break;
156 }
157 }
158 }
159 }
160
161 // Try to establish SSE connection if not connected and client is ready
162 if sse_stream.is_none()
163 && client.session_info().is_some()
164 && client.has_master_key()
165 {
166 match client.subscribe().await {
167 Ok(stream) => {
168 tracing::debug!("SSE: connected to push notification stream");
169 sse_stream = Some(stream);
170 }
171 Err(e) => {
172 tracing::debug!("SSE: failed to connect (will retry): {e}");
173 }
174 }
175 }
176 }
177 }
178
179 /// Check if auto-sync is enabled and the client is ready (authenticated + has key).
180 #[instrument(skip_all)]
181 fn should_auto_sync(db_path: &std::path::Path, client: &SyncKitClient) -> bool {
182 if client.session_info().is_none()
183 || !client.has_master_key()
184 {
185 return false;
186 }
187 let conn = match rusqlite::Connection::open(db_path) {
188 Ok(c) => c,
189 Err(e) => {
190 tracing::warn!("Failed to open DB for auto_sync check: {e}");
191 return false;
192 }
193 };
194 service::get_sync_state(&conn, "auto_sync_enabled").unwrap_or_default() == "1"
195 }
196
197 /// Check if enough time has elapsed since the last sync.
198 #[instrument(skip_all)]
199 fn interval_elapsed(db_path: &std::path::Path) -> bool {
200 let conn = match rusqlite::Connection::open(db_path) {
201 Ok(c) => c,
202 Err(e) => {
203 tracing::warn!("Failed to open DB for interval check: {e}");
204 return true;
205 }
206 };
207 let last_sync = service::get_sync_state(&conn, "last_sync_at").unwrap_or_default();
208 let interval_str = service::get_sync_state(&conn, "sync_interval_minutes")
209 .unwrap_or_else(|_| "15".to_string());
210 let interval_minutes: i64 = interval_str.parse().unwrap_or(15);
211
212 if last_sync.is_empty() {
213 return true;
214 }
215
216 match chrono::DateTime::parse_from_rfc3339(&last_sync) {
217 Ok(last) => {
218 let elapsed = chrono::Utc::now() - last.with_timezone(&chrono::Utc);
219 elapsed.num_minutes() >= interval_minutes
220 }
221 Err(_) => true,
222 }
223 }
224
225 /// Run a single sync cycle: snapshot if needed, sync, blob sync, cleanup.
226 #[instrument(skip_all)]
227 async fn run_sync_cycle(
228 db_path: &Path,
229 content_dir: &Path,
230 client: &SyncKitClient,
231 status: &Arc<Mutex<SyncStatus>>,
232 ) -> std::result::Result<i64, SyncError> {
233 {
234 let mut s = status.lock();
235 s.state = crate::SyncState::Syncing;
236 s.last_error = None;
237 }
238
239 // Create initial snapshot if first sync
240 let p = db_path.to_path_buf();
241 tokio::task::spawn_blocking(move || {
242 let conn = rusqlite::Connection::open(&p)?;
243 service::create_initial_snapshot(&conn)
244 })
245 .await
246 .map_err(|e| SyncError::Other(e.to_string()))??;
247
248 let result = service::perform_sync(db_path, client).await?;
249
250 // Mark samples as cloud_only when blob doesn't exist locally
251 let p = db_path.to_path_buf();
252 let cd = content_dir.to_path_buf();
253 if let Err(e) = tokio::task::spawn_blocking(move || {
254 let conn = rusqlite::Connection::open(&p)?;
255 service::mark_cloud_only_samples(&conn, &cd)
256 })
257 .await
258 .map_err(|e| SyncError::Other(e.to_string()))? {
259 tracing::warn!("Cloud-only marking failed (non-fatal): {e}");
260 }
261
262 // Blob sync: upload pending, then download missing
263 if let Err(e) = service::upload_pending_blobs(db_path, content_dir, client).await {
264 tracing::warn!("Blob upload failed (non-fatal): {e}");
265 }
266 if let Err(e) = service::download_missing_blobs(db_path, content_dir, client).await {
267 tracing::warn!("Blob download failed (non-fatal): {e}");
268 }
269
270 // Cleanup old entries + enforce retention cap
271 let p = db_path.to_path_buf();
272 tokio::task::spawn_blocking(move || {
273 let conn = rusqlite::Connection::open(&p)?;
274 service::cleanup_changelog(&conn)?;
275 service::enforce_changelog_retention(&conn)
276 })
277 .await
278 .map_err(|e| SyncError::Other(e.to_string()))??;
279
280 // Update status
281 {
282 let p = db_path.to_path_buf();
283 let last_sync = tokio::task::spawn_blocking(move || {
284 match rusqlite::Connection::open(&p) {
285 Ok(c) => service::get_sync_state(&c, "last_sync_at").unwrap_or_default(),
286 Err(e) => {
287 tracing::warn!("Failed to open DB for last_sync_at: {e}");
288 String::new()
289 }
290 }
291 })
292 .await
293 .unwrap_or_default();
294
295 let p2 = db_path.to_path_buf();
296 let pending = tokio::task::spawn_blocking(move || {
297 match rusqlite::Connection::open(&p2) {
298 Ok(c) => service::count_pending_changes(&c).unwrap_or(0),
299 Err(e) => {
300 tracing::warn!("Failed to open DB for pending changes: {e}");
301 0
302 }
303 }
304 })
305 .await
306 .unwrap_or(0);
307
308 let mut s = status.lock();
309 s.state = crate::SyncState::Ready;
310 s.last_sync_at = if last_sync.is_empty() { None } else { Some(last_sync) };
311 s.pending_changes = pending;
312 }
313
314 Ok(result.pulled)
315 }
316