Skip to main content

max / audiofiles

8.8 KB · 279 lines History Blame Raw
1 //! Upload logic: perform_sync, device registration, push changes, blob uploads.
2
3 use std::path::Path;
4
5 use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient};
6 use uuid::Uuid;
7
8 use tracing::instrument;
9
10 use audiofiles_core::store::{validate_hash, validate_extension};
11
12 use crate::error::{Result, SyncError};
13
14 use super::{open_conn, get_sync_state, set_sync_state, SyncResult, PUSH_BATCH_LIMIT};
15
16 /// Run a full sync cycle: register device, push, pull, update timestamp.
17 ///
18 /// The `db_path` is used to open separate connections inside `spawn_blocking`,
19 /// avoiding the `Connection` is `!Send` issue.
20 #[instrument(skip_all)]
21 pub async fn perform_sync(
22 db_path: &std::path::Path,
23 client: &SyncKitClient,
24 ) -> Result<SyncResult> {
25 // Clear applying_remote flag in case a previous sync crashed mid-apply.
26 // If the flag is stuck at "1", all local changes silently skip the changelog.
27 let p = db_path.to_path_buf();
28 tokio::task::spawn_blocking(move || {
29 let conn = open_conn(&p)?;
30 set_sync_state(&conn, "applying_remote", "0")
31 })
32 .await
33 .map_err(|e| SyncError::Other(e.to_string()))??;
34
35 let device_id = ensure_device_registered(db_path, client).await?;
36 let pushed = push_changes(db_path, client, device_id).await?;
37 let pulled = super::download::pull_changes(db_path, client, device_id).await?;
38
39 let p = db_path.to_path_buf();
40 tokio::task::spawn_blocking(move || {
41 let conn = open_conn(&p)?;
42 set_sync_state(&conn, "last_sync_at", &chrono::Utc::now().to_rfc3339())
43 })
44 .await
45 .map_err(|e| SyncError::Other(e.to_string()))??;
46
47 Ok(SyncResult { pushed, pulled })
48 }
49
50 /// Upload blobs for samples in VFS entries with sync_files enabled.
51 ///
52 /// Finds samples that have local files (cloud_only = 0) in sync-enabled VFS
53 /// entries, and uploads them to the server via presigned URLs.
54 #[instrument(skip_all)]
55 pub async fn upload_pending_blobs(
56 db_path: &Path,
57 content_dir: &Path,
58 client: &SyncKitClient,
59 ) -> Result<u64> {
60 let p = db_path.to_path_buf();
61 let pending: Vec<(String, String, i64)> = tokio::task::spawn_blocking(move || -> Result<_> {
62 let conn = open_conn(&p)?;
63 let mut stmt = conn.prepare(
64 "SELECT DISTINCT s.hash, s.file_extension, s.file_size
65 FROM samples s
66 JOIN vfs_nodes vn ON vn.sample_hash = s.hash
67 JOIN vfs v ON v.id = vn.vfs_id
68 WHERE v.sync_files = 1 AND s.cloud_only = 0",
69 )?;
70 let rows = stmt
71 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
72 .collect::<std::result::Result<Vec<_>, _>>()?;
73 Ok(rows)
74 })
75 .await
76 .map_err(|e| SyncError::Other(e.to_string()))??;
77
78 let mut uploaded = 0u64;
79
80 for (hash, ext, size) in &pending {
81 if validate_hash(hash).is_err() || validate_extension(ext).is_err() {
82 tracing::warn!(hash, ext, "Skipping upload with invalid hash/extension");
83 continue;
84 }
85 // Content-addressed flat layout: content_dir/{hash}.{ext}
86 let blob_path = content_dir.join(format!("{}.{}", hash, ext));
87
88 if !blob_path.exists() {
89 continue;
90 }
91
92 let resp = client
93 .blob_upload_url(hash, *size)
94 .await
95 .map_err(|e| SyncError::Client(e.to_string()))?;
96
97 if resp.already_exists {
98 uploaded += 1;
99 continue;
100 }
101
102 let data = std::fs::read(&blob_path)
103 .map_err(SyncError::Io)?;
104
105 client
106 .blob_upload(&resp.upload_url, data)
107 .await
108 .map_err(|e| SyncError::Client(e.to_string()))?;
109
110 client
111 .blob_confirm(hash, *size)
112 .await
113 .map_err(|e| SyncError::Client(e.to_string()))?;
114
115 uploaded += 1;
116 }
117
118 Ok(uploaded)
119 }
120
121 /// Ensure this device is registered with the server. Caches device_id in sync_state.
122 #[instrument(skip_all)]
123 async fn ensure_device_registered(
124 db_path: &std::path::Path,
125 client: &SyncKitClient,
126 ) -> Result<Uuid> {
127 let p = db_path.to_path_buf();
128 let stored: String = tokio::task::spawn_blocking(move || {
129 let conn = open_conn(&p)?;
130 get_sync_state(&conn, "device_id")
131 })
132 .await
133 .map_err(|e| SyncError::Other(e.to_string()))??;
134
135 if !stored.is_empty()
136 && let Ok(id) = stored.parse::<Uuid>() {
137 return Ok(id);
138 }
139
140 let hostname = std::env::var("HOSTNAME")
141 .or_else(|_| std::env::var("COMPUTERNAME"))
142 .unwrap_or_else(|_| "unknown".to_string());
143 let platform = std::env::consts::OS;
144
145 let device = client
146 .register_device(&hostname, platform)
147 .await
148 .map_err(|e| SyncError::Client(e.to_string()))?;
149
150 let p = db_path.to_path_buf();
151 let device_id = device.id;
152 tokio::task::spawn_blocking(move || {
153 let conn = open_conn(&p)?;
154 set_sync_state(&conn, "device_id", &device_id.to_string())
155 })
156 .await
157 .map_err(|e| SyncError::Other(e.to_string()))??;
158
159 Ok(device.id)
160 }
161
162 /// Push unpushed changelog entries to the server.
163 #[instrument(skip_all)]
164 async fn push_changes(
165 db_path: &std::path::Path,
166 client: &SyncKitClient,
167 device_id: Uuid,
168 ) -> Result<i64> {
169 let p = db_path.to_path_buf();
170 let rows = tokio::task::spawn_blocking(move || -> Result<_> {
171 let conn = open_conn(&p)?;
172 let mut stmt = conn.prepare(
173 "SELECT id, table_name, op, row_id, timestamp, data
174 FROM sync_changelog
175 WHERE pushed = 0
176 ORDER BY id ASC
177 LIMIT ?1",
178 )?;
179
180 let rows: Vec<(i64, String, String, String, String, Option<String>)> = stmt
181 .query_map([PUSH_BATCH_LIMIT as i64], |row| {
182 Ok((
183 row.get(0)?,
184 row.get(1)?,
185 row.get(2)?,
186 row.get(3)?,
187 row.get(4)?,
188 row.get(5)?,
189 ))
190 })?
191 .collect::<std::result::Result<Vec<_>, _>>()?;
192
193 Ok(rows)
194 })
195 .await
196 .map_err(|e| SyncError::Other(e.to_string()))??;
197
198 if rows.is_empty() {
199 return Ok(0);
200 }
201
202 let mut pushed_ids: Vec<i64> = Vec::with_capacity(rows.len());
203 let mut skipped = 0i64;
204
205 let changes: Vec<ChangeEntry> = rows
206 .into_iter()
207 .filter_map(|(id, table, op, row_id, timestamp, data)| {
208 let ts = chrono::DateTime::parse_from_rfc3339(&timestamp)
209 .map(|dt| dt.with_timezone(&chrono::Utc))
210 .unwrap_or_else(|_| chrono::Utc::now());
211
212 let change_op = match ChangeOp::from_str_opt(&op) {
213 Some(o) => o,
214 None => {
215 tracing::warn!(id, "Skipping changelog entry with unknown op: {op} — marking as pushed to prevent retry loop");
216 pushed_ids.push(id);
217 skipped += 1;
218 return None;
219 }
220 };
221
222 let parsed_data = match data {
223 Some(d) => match serde_json::from_str(&d) {
224 Ok(v) => Some(v),
225 Err(e) => {
226 tracing::warn!(row_id = %row_id, "Unparseable changelog JSON, marking as pushed to prevent retry loop: {e}");
227 pushed_ids.push(id);
228 skipped += 1;
229 return None;
230 }
231 },
232 None => None,
233 };
234
235 pushed_ids.push(id);
236 Some(ChangeEntry {
237 table,
238 op: change_op,
239 row_id,
240 timestamp: ts,
241 data: parsed_data,
242 })
243 })
244 .collect();
245
246 if skipped > 0 {
247 tracing::warn!(skipped, "Some changelog entries could not be parsed (unknown op or bad JSON) and were marked pushed to break the retry loop; they are dropped, not retried");
248 }
249
250 if !changes.is_empty() {
251 client
252 .push(device_id, changes)
253 .await
254 .map_err(|e| SyncError::Client(e.to_string()))?;
255 }
256
257 let pushed_count = pushed_ids.len() as i64;
258
259 // Mark only successfully-pushed entries
260 if !pushed_ids.is_empty() {
261 let p = db_path.to_path_buf();
262 tokio::task::spawn_blocking(move || {
263 let conn = open_conn(&p)?;
264 let placeholders: String = pushed_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
265 let sql = format!("UPDATE sync_changelog SET pushed = 1 WHERE id IN ({})", placeholders);
266 let params: Vec<&dyn rusqlite::types::ToSql> = pushed_ids
267 .iter()
268 .map(|id| id as &dyn rusqlite::types::ToSql)
269 .collect();
270 conn.execute(&sql, params.as_slice())?;
271 Ok::<_, SyncError>(())
272 })
273 .await
274 .map_err(|e| SyncError::Other(e.to_string()))??;
275 }
276
277 Ok(pushed_count)
278 }
279