Skip to main content

max / audiofiles

10.0 KB · 309 lines History Blame Raw
1 //! Download logic: pull remote changes, download missing blobs.
2
3 use std::path::Path;
4
5 use sha2::{Digest, Sha256};
6 use synckit_client::SyncKitClient;
7 use uuid::Uuid;
8
9 use tracing::instrument;
10
11 use audiofiles_core::store::{validate_hash, validate_extension};
12
13 use crate::error::{Result, SyncError};
14
15 use super::{open_conn, get_sync_state, set_sync_state};
16
17 /// Download blobs for samples that exist in metadata but not on disk.
18 ///
19 /// For samples in sync-enabled VFS entries where the local file is missing,
20 /// downloads from the server and writes to the content directory.
21 #[instrument(skip_all)]
22 pub async fn download_missing_blobs(
23 db_path: &Path,
24 content_dir: &Path,
25 client: &SyncKitClient,
26 ) -> Result<u64> {
27 let cd = content_dir.to_path_buf();
28 let p = db_path.to_path_buf();
29 let missing: Vec<(String, String)> = tokio::task::spawn_blocking(move || -> Result<_> {
30 let conn = open_conn(&p)?;
31 let mut stmt = conn.prepare(
32 "SELECT DISTINCT s.hash, s.file_extension
33 FROM samples s
34 JOIN vfs_nodes vn ON vn.sample_hash = s.hash
35 JOIN vfs v ON v.id = vn.vfs_id
36 WHERE v.sync_files = 1",
37 )?;
38 let rows: Vec<(String, String)> = stmt
39 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
40 .collect::<std::result::Result<Vec<_>, _>>()?;
41
42 // Filter to only samples whose blob file doesn't exist locally
43 let missing: Vec<(String, String)> = rows
44 .into_iter()
45 .filter(|(hash, ext)| {
46 if validate_hash(hash).is_err() || validate_extension(ext).is_err() {
47 tracing::warn!(hash, ext, "Skipping blob with invalid hash/extension");
48 return false;
49 }
50 let path = cd.join(format!("{}.{}", hash, ext));
51 !path.exists()
52 })
53 .collect();
54 Ok(missing)
55 })
56 .await
57 .map_err(|e| SyncError::Other(e.to_string()))??;
58
59 let mut downloaded = 0u64;
60
61 for (hash, ext) in &missing {
62 // Mark cloud_only = 1 while downloading (in a transaction so the
63 // applying_remote flag is always cleared, even on error)
64 let p = db_path.to_path_buf();
65 let h = hash.clone();
66 tokio::task::spawn_blocking(move || -> Result<()> {
67 let conn = open_conn(&p)?;
68 conn.execute_batch("BEGIN IMMEDIATE")?;
69 conn.execute(
70 "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
71 [],
72 )?;
73 conn.execute(
74 "UPDATE samples SET cloud_only = 1 WHERE hash = ?1",
75 [&h],
76 )?;
77 conn.execute(
78 "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
79 [],
80 )?;
81 conn.execute_batch("COMMIT")?;
82 Ok(())
83 })
84 .await
85 .map_err(|e| SyncError::Other(e.to_string()))??;
86
87 let download_url = match client.blob_download_url(hash).await {
88 Ok(url) => url,
89 Err(e) => {
90 tracing::warn!("Failed to get download URL for {hash}: {e}");
91 continue;
92 }
93 };
94
95 let data = match client.blob_download(&download_url).await {
96 Ok(d) => d,
97 Err(e) => {
98 tracing::warn!("Failed to download blob {hash}: {e}");
99 continue;
100 }
101 };
102
103 // Verify SHA-256 before writing
104 let computed = format!("{:x}", Sha256::digest(&data));
105 if computed != *hash {
106 tracing::warn!(
107 expected = hash,
108 actual = computed,
109 "Downloaded blob hash mismatch, skipping"
110 );
111 continue;
112 }
113
114 // Write blob atomically: temp file + rename to prevent partial writes on crash
115 std::fs::create_dir_all(content_dir)?;
116 let blob_path = content_dir.join(format!("{}.{}", hash, ext));
117 let tmp_path = content_dir.join(format!("{}.{}.tmp", hash, ext));
118 std::fs::write(&tmp_path, &data)?;
119 if let Err(e) = std::fs::rename(&tmp_path, &blob_path) {
120 let _ = std::fs::remove_file(&tmp_path);
121 return Err(SyncError::Io(e));
122 }
123
124 // Clear cloud_only flag (in a transaction)
125 let p = db_path.to_path_buf();
126 let h = hash.clone();
127 tokio::task::spawn_blocking(move || -> Result<()> {
128 let conn = open_conn(&p)?;
129 conn.execute_batch("BEGIN IMMEDIATE")?;
130 conn.execute(
131 "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
132 [],
133 )?;
134 conn.execute(
135 "UPDATE samples SET cloud_only = 0 WHERE hash = ?1",
136 [&h],
137 )?;
138 conn.execute(
139 "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
140 [],
141 )?;
142 conn.execute_batch("COMMIT")?;
143 Ok(())
144 })
145 .await
146 .map_err(|e| SyncError::Other(e.to_string()))??;
147
148 downloaded += 1;
149 }
150
151 Ok(downloaded)
152 }
153
154 /// Download a single sample blob by hash. Used by the "Download" context-menu
155 /// item on cloud-only rows so the user can fetch just the sample they care
156 /// about without triggering a full sync cycle.
157 #[instrument(skip(client))]
158 pub async fn download_one_blob(
159 db_path: &Path,
160 content_dir: &Path,
161 client: &SyncKitClient,
162 hash: &str,
163 ) -> Result<()> {
164 // Look up the sample's extension and verify it exists in metadata.
165 let p = db_path.to_path_buf();
166 let h = hash.to_string();
167 let ext: String = tokio::task::spawn_blocking(move || -> Result<String> {
168 let conn = open_conn(&p)?;
169 let ext: String = conn
170 .query_row(
171 "SELECT file_extension FROM samples WHERE hash = ?1",
172 [&h],
173 |row| row.get(0),
174 )
175 .map_err(|e| match e {
176 rusqlite::Error::QueryReturnedNoRows => {
177 SyncError::Other(format!("Sample {h} not found in metadata"))
178 }
179 other => other.into(),
180 })?;
181 Ok(ext)
182 })
183 .await
184 .map_err(|e| SyncError::Other(e.to_string()))??;
185
186 if validate_hash(hash).is_err() || validate_extension(&ext).is_err() {
187 return Err(SyncError::Other(format!(
188 "Invalid hash or extension: {hash}.{ext}"
189 )));
190 }
191
192 let blob_path = content_dir.join(format!("{hash}.{ext}"));
193 if blob_path.exists() {
194 // Nothing to do — clear the cloud_only flag in case it's stale.
195 let p = db_path.to_path_buf();
196 let h = hash.to_string();
197 tokio::task::spawn_blocking(move || -> Result<()> {
198 let conn = open_conn(&p)?;
199 conn.execute("UPDATE samples SET cloud_only = 0 WHERE hash = ?1", [&h])?;
200 Ok(())
201 })
202 .await
203 .map_err(|e| SyncError::Other(e.to_string()))??;
204 return Ok(());
205 }
206
207 let download_url = client
208 .blob_download_url(hash)
209 .await
210 .map_err(|e| SyncError::Client(e.to_string()))?;
211 let data = client
212 .blob_download(&download_url)
213 .await
214 .map_err(|e| SyncError::Client(e.to_string()))?;
215
216 let computed = format!("{:x}", Sha256::digest(&data));
217 if computed != *hash {
218 return Err(SyncError::Other(format!(
219 "Downloaded blob hash mismatch (expected {hash}, got {computed})"
220 )));
221 }
222
223 std::fs::create_dir_all(content_dir)?;
224 let tmp_path = content_dir.join(format!("{hash}.{ext}.tmp"));
225 std::fs::write(&tmp_path, &data)?;
226 if let Err(e) = std::fs::rename(&tmp_path, &blob_path) {
227 let _ = std::fs::remove_file(&tmp_path);
228 return Err(SyncError::Io(e));
229 }
230
231 // Clear cloud_only flag in a transaction (mirrors download_missing_blobs).
232 let p = db_path.to_path_buf();
233 let h = hash.to_string();
234 tokio::task::spawn_blocking(move || -> Result<()> {
235 let conn = open_conn(&p)?;
236 conn.execute_batch("BEGIN IMMEDIATE")?;
237 conn.execute(
238 "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
239 [],
240 )?;
241 conn.execute(
242 "UPDATE samples SET cloud_only = 0 WHERE hash = ?1",
243 [&h],
244 )?;
245 conn.execute(
246 "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
247 [],
248 )?;
249 conn.execute_batch("COMMIT")?;
250 Ok(())
251 })
252 .await
253 .map_err(|e| SyncError::Other(e.to_string()))??;
254
255 Ok(())
256 }
257
258 /// Pull remote changes from the server and apply them locally.
259 #[instrument(skip_all)]
260 pub(crate) async fn pull_changes(
261 db_path: &std::path::Path,
262 client: &SyncKitClient,
263 device_id: Uuid,
264 ) -> Result<i64> {
265 let mut total_applied: i64 = 0;
266
267 loop {
268 let p = db_path.to_path_buf();
269 let cursor_str: String = tokio::task::spawn_blocking(move || {
270 let conn = open_conn(&p)?;
271 get_sync_state(&conn, "pull_cursor")
272 })
273 .await
274 .map_err(|e| SyncError::Other(e.to_string()))??;
275
276 let cursor: i64 = cursor_str.parse().unwrap_or(0);
277
278 let (changes, new_cursor, has_more) = client
279 .pull(device_id, cursor)
280 .await
281 .map_err(|e| SyncError::Client(e.to_string()))?;
282
283 if !changes.is_empty() {
284 let p = db_path.to_path_buf();
285 let applied = tokio::task::spawn_blocking(move || {
286 let conn = open_conn(&p)?;
287 super::resolve::apply_remote_changes(&conn, &changes)
288 })
289 .await
290 .map_err(|e| SyncError::Other(e.to_string()))??;
291 total_applied += applied;
292 }
293
294 let p = db_path.to_path_buf();
295 tokio::task::spawn_blocking(move || {
296 let conn = open_conn(&p)?;
297 set_sync_state(&conn, "pull_cursor", &new_cursor.to_string())
298 })
299 .await
300 .map_err(|e| SyncError::Other(e.to_string()))??;
301
302 if !has_more {
303 break;
304 }
305 }
306
307 Ok(total_applied)
308 }
309