//! Blob sync: upload local blobs to SyncKit, download missing blobs from SyncKit.

use std::path::Path;

use goingson_core::CoreError;
use sqlx::SqlitePool;
use synckit_client::SyncKitClient;
use tracing::{debug, info, warn};

use crate::commands::attachment::blob_path;
use crate::state::DESKTOP_USER_ID;

/// Upload local blobs that haven't been synced to the server yet.
///
/// Queries all distinct blob hashes from attachments, checks which ones have local
/// files, and uploads them via the SyncKit blob API (presigned S3 + E2E encryption).
///
/// Each blob goes through: request upload URL -> (skip if the server already has it)
/// -> read local file -> upload -> confirm. A failure at any step is logged with
/// `warn!` and that blob is skipped; it does not abort the remaining blobs.
///
/// # Returns
///
/// The number of blobs that were uploaded *and* confirmed during this call.
///
/// # Errors
///
/// Returns an error only when the initial attachments query fails.
pub async fn upload_pending_blobs(
    pool: &SqlitePool,
    data_dir: &Path,
    client: &SyncKitClient,
) -> Result<i64, CoreError> {
    let hashes: Vec<(String, i64)> = sqlx::query_as(
        "SELECT DISTINCT a.blob_hash, a.file_size FROM attachments a WHERE a.user_id = ?",
    )
    .bind(DESKTOP_USER_ID.to_string())
    .fetch_all(pool)
    .await
    .map_err(CoreError::database)?;

    let mut uploaded = 0i64;

    for (hash, size) in &hashes {
        // Short prefix used only for log output. `get` returns `None` instead of
        // panicking when the stored hash is shorter than 8 bytes (or byte 8 is not
        // a char boundary), in which case the whole hash is logged.
        let short = hash.get(..8).unwrap_or(hash.as_str());

        let path = blob_path(data_dir, hash);
        // An error while probing for the file is treated the same as "not present".
        if !tokio::fs::try_exists(&path).await.unwrap_or(false) {
            continue; // No local blob to upload
        }

        // Request upload URL — server tells us if blob already exists
        let upload_resp = match client.blob_upload_url(hash, *size).await {
            Ok(r) => r,
            Err(e) => {
                warn!("Failed to get upload URL for blob {}: {}", short, e);
                continue;
            }
        };

        if upload_resp.already_exists {
            debug!("Blob {} already on server, skipping", short);
            continue;
        }

        // Read and upload. The file is read only after the server confirmed it
        // needs the blob, so already-synced blobs cost no disk read.
        let data = match tokio::fs::read(&path).await {
            Ok(d) => d,
            Err(e) => {
                warn!("Failed to read blob {}: {}", short, e);
                continue;
            }
        };

        if let Err(e) = client.blob_upload(&upload_resp.upload_url, data).await {
            warn!("Failed to upload blob {}: {}", short, e);
            continue;
        }

        // Confirm upload; a blob only counts as uploaded once this succeeds.
        if let Err(e) = client.blob_confirm(hash, *size).await {
            warn!("Failed to confirm blob {}: {}", short, e);
            continue;
        }

        uploaded += 1;
        debug!("Uploaded blob {}", short);
    }

    if uploaded > 0 {
        info!("Uploaded {} blobs", uploaded);
    }

    Ok(uploaded)
}

/// Download blobs
that exist in attachment records but not on local disk. /// /// After metadata sync pulls attachment records from other devices, this function /// downloads the actual blob data from SyncKit (presigned S3 + E2E decryption). pub async fn download_missing_blobs( pool: &SqlitePool, data_dir: &Path, client: &SyncKitClient, ) -> Result { let hashes: Vec<(String,)> = sqlx::query_as( "SELECT DISTINCT blob_hash FROM attachments WHERE user_id = ?" ) .bind(DESKTOP_USER_ID.to_string()) .fetch_all(pool) .await .map_err(CoreError::database)?; let blobs_dir = data_dir.join("blobs"); tokio::fs::create_dir_all(&blobs_dir) .await .map_err(|e| CoreError::internal(format!("Failed to create blobs dir: {}", e)))?; let mut downloaded = 0i64; for (hash,) in &hashes { let path = blob_path(data_dir, hash); if tokio::fs::try_exists(&path).await.unwrap_or(false) { continue; // Already have it locally } // Get download URL let download_url = match client.blob_download_url(hash).await { Ok(url) => url, Err(e) => { warn!("Failed to get download URL for blob {}: {}", &hash[..8], e); continue; } }; // Download and decrypt let data = match client.blob_download(&download_url).await { Ok(d) => d, Err(e) => { warn!("Failed to download blob {}: {}", &hash[..8], e); continue; } }; // Write to disk atomically (tmp + rename) to prevent corrupt partial files let tmp_path = path.with_extension("tmp"); if let Err(e) = tokio::fs::write(&tmp_path, &data).await { warn!("Failed to write blob {}: {}", &hash[..8], e); continue; } if let Err(e) = tokio::fs::rename(&tmp_path, &path).await { warn!("Failed to rename blob {}: {}", &hash[..8], e); let _ = tokio::fs::remove_file(&tmp_path).await; continue; } downloaded += 1; debug!("Downloaded blob {}", &hash[..8]); } if downloaded > 0 { info!("Downloaded {} blobs", downloaded); } Ok(downloaded) }