Skip to main content

max / goingson

4.6 KB · 151 lines History Blame Raw
1 //! Blob sync: upload local blobs to SyncKit, download missing blobs from SyncKit.
2
3 use std::path::Path;
4
5 use goingson_core::CoreError;
6 use sqlx::SqlitePool;
7 use synckit_client::SyncKitClient;
8 use tracing::{debug, info, warn};
9
10 use crate::commands::attachment::blob_path;
11 use crate::state::DESKTOP_USER_ID;
12
13 /// Upload local blobs that haven't been synced to the server yet.
14 ///
15 /// Queries all distinct blob hashes from attachments, checks which ones have local
16 /// files, and uploads them via the SyncKit blob API (presigned S3 + E2E encryption).
17 pub async fn upload_pending_blobs(
18 pool: &SqlitePool,
19 data_dir: &Path,
20 client: &SyncKitClient,
21 ) -> Result<i64, CoreError> {
22 let hashes: Vec<(String, i64)> = sqlx::query_as(
23 "SELECT DISTINCT a.blob_hash, a.file_size FROM attachments a WHERE a.user_id = ?"
24 )
25 .bind(DESKTOP_USER_ID.to_string())
26 .fetch_all(pool)
27 .await
28 .map_err(CoreError::database)?;
29
30 let mut uploaded = 0i64;
31
32 for (hash, size) in &hashes {
33 let path = blob_path(data_dir, hash);
34 if !tokio::fs::try_exists(&path).await.unwrap_or(false) {
35 continue; // No local blob to upload
36 }
37
38 // Request upload URL — server tells us if blob already exists
39 let upload_resp = match client.blob_upload_url(hash, *size).await {
40 Ok(r) => r,
41 Err(e) => {
42 warn!("Failed to get upload URL for blob {}: {}", &hash[..8], e);
43 continue;
44 }
45 };
46
47 if upload_resp.already_exists {
48 debug!("Blob {} already on server, skipping", &hash[..8]);
49 continue;
50 }
51
52 // Read and upload
53 let data = match tokio::fs::read(&path).await {
54 Ok(d) => d,
55 Err(e) => {
56 warn!("Failed to read blob {}: {}", &hash[..8], e);
57 continue;
58 }
59 };
60
61 if let Err(e) = client.blob_upload(&upload_resp.upload_url, data).await {
62 warn!("Failed to upload blob {}: {}", &hash[..8], e);
63 continue;
64 }
65
66 // Confirm upload
67 if let Err(e) = client.blob_confirm(hash, *size).await {
68 warn!("Failed to confirm blob {}: {}", &hash[..8], e);
69 continue;
70 }
71
72 uploaded += 1;
73 debug!("Uploaded blob {}", &hash[..8]);
74 }
75
76 if uploaded > 0 {
77 info!("Uploaded {} blobs", uploaded);
78 }
79 Ok(uploaded)
80 }
81
82 /// Download blobs that exist in attachment records but not on local disk.
83 ///
84 /// After metadata sync pulls attachment records from other devices, this function
85 /// downloads the actual blob data from SyncKit (presigned S3 + E2E decryption).
86 pub async fn download_missing_blobs(
87 pool: &SqlitePool,
88 data_dir: &Path,
89 client: &SyncKitClient,
90 ) -> Result<i64, CoreError> {
91 let hashes: Vec<(String,)> = sqlx::query_as(
92 "SELECT DISTINCT blob_hash FROM attachments WHERE user_id = ?"
93 )
94 .bind(DESKTOP_USER_ID.to_string())
95 .fetch_all(pool)
96 .await
97 .map_err(CoreError::database)?;
98
99 let blobs_dir = data_dir.join("blobs");
100 tokio::fs::create_dir_all(&blobs_dir)
101 .await
102 .map_err(|e| CoreError::internal(format!("Failed to create blobs dir: {}", e)))?;
103
104 let mut downloaded = 0i64;
105
106 for (hash,) in &hashes {
107 let path = blob_path(data_dir, hash);
108 if tokio::fs::try_exists(&path).await.unwrap_or(false) {
109 continue; // Already have it locally
110 }
111
112 // Get download URL
113 let download_url = match client.blob_download_url(hash).await {
114 Ok(url) => url,
115 Err(e) => {
116 warn!("Failed to get download URL for blob {}: {}", &hash[..8], e);
117 continue;
118 }
119 };
120
121 // Download and decrypt
122 let data = match client.blob_download(&download_url).await {
123 Ok(d) => d,
124 Err(e) => {
125 warn!("Failed to download blob {}: {}", &hash[..8], e);
126 continue;
127 }
128 };
129
130 // Write to disk atomically (tmp + rename) to prevent corrupt partial files
131 let tmp_path = path.with_extension("tmp");
132 if let Err(e) = tokio::fs::write(&tmp_path, &data).await {
133 warn!("Failed to write blob {}: {}", &hash[..8], e);
134 continue;
135 }
136 if let Err(e) = tokio::fs::rename(&tmp_path, &path).await {
137 warn!("Failed to rename blob {}: {}", &hash[..8], e);
138 let _ = tokio::fs::remove_file(&tmp_path).await;
139 continue;
140 }
141
142 downloaded += 1;
143 debug!("Downloaded blob {}", &hash[..8]);
144 }
145
146 if downloaded > 0 {
147 info!("Downloaded {} blobs", downloaded);
148 }
149 Ok(downloaded)
150 }
151