//! Download logic: pull remote changes, download missing blobs. use std::path::Path; use sha2::{Digest, Sha256}; use synckit_client::SyncKitClient; use uuid::Uuid; use tracing::instrument; use audiofiles_core::store::{validate_hash, validate_extension}; use crate::error::{Result, SyncError}; use super::{open_conn, get_sync_state, set_sync_state}; /// Download blobs for samples that exist in metadata but not on disk. /// /// For samples in sync-enabled VFS entries where the local file is missing, /// downloads from the server and writes to the content directory. #[instrument(skip_all)] pub async fn download_missing_blobs( db_path: &Path, content_dir: &Path, client: &SyncKitClient, ) -> Result { let cd = content_dir.to_path_buf(); let p = db_path.to_path_buf(); let missing: Vec<(String, String)> = tokio::task::spawn_blocking(move || -> Result<_> { let conn = open_conn(&p)?; let mut stmt = conn.prepare( "SELECT DISTINCT s.hash, s.file_extension FROM samples s JOIN vfs_nodes vn ON vn.sample_hash = s.hash JOIN vfs v ON v.id = vn.vfs_id WHERE v.sync_files = 1", )?; let rows: Vec<(String, String)> = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()?; // Filter to only samples whose blob file doesn't exist locally let missing: Vec<(String, String)> = rows .into_iter() .filter(|(hash, ext)| { if validate_hash(hash).is_err() || validate_extension(ext).is_err() { tracing::warn!(hash, ext, "Skipping blob with invalid hash/extension"); return false; } let path = cd.join(format!("{}.{}", hash, ext)); !path.exists() }) .collect(); Ok(missing) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let mut downloaded = 0u64; for (hash, ext) in &missing { // Mark cloud_only = 1 while downloading (in a transaction so the // applying_remote flag is always cleared, even on error) let p = db_path.to_path_buf(); let h = hash.clone(); tokio::task::spawn_blocking(move || -> Result<()> { let conn = open_conn(&p)?; conn.execute_batch("BEGIN IMMEDIATE")?; conn.execute( "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'", [], )?; conn.execute( "UPDATE samples SET cloud_only = 1 WHERE hash = ?1", [&h], )?; conn.execute( "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'", [], )?; conn.execute_batch("COMMIT")?; Ok(()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let download_url = match client.blob_download_url(hash).await { Ok(url) => url, Err(e) => { tracing::warn!("Failed to get download URL for {hash}: {e}"); continue; } }; let data = match client.blob_download(&download_url).await { Ok(d) => d, Err(e) => { tracing::warn!("Failed to download blob {hash}: {e}"); continue; } }; // Verify SHA-256 before writing let computed = format!("{:x}", Sha256::digest(&data)); if computed != *hash { tracing::warn!( expected = hash, actual = computed, "Downloaded blob hash mismatch, skipping" ); continue; } // Write blob atomically: temp file + rename to prevent partial writes on crash std::fs::create_dir_all(content_dir)?; let blob_path = content_dir.join(format!("{}.{}", hash, ext)); let tmp_path = content_dir.join(format!("{}.{}.tmp", hash, ext)); std::fs::write(&tmp_path, &data)?; if let Err(e) = std::fs::rename(&tmp_path, &blob_path) { let _ = std::fs::remove_file(&tmp_path); return Err(SyncError::Io(e)); } // Clear cloud_only flag (in a transaction) let p = db_path.to_path_buf(); let h = hash.clone(); tokio::task::spawn_blocking(move || -> Result<()> { let conn = open_conn(&p)?; conn.execute_batch("BEGIN IMMEDIATE")?; conn.execute( "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'", [], )?; conn.execute( "UPDATE samples SET cloud_only = 0 WHERE hash = ?1", [&h], )?; conn.execute( "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'", [], )?; conn.execute_batch("COMMIT")?; Ok(()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; downloaded += 1; } Ok(downloaded) } /// Download a single sample blob by hash. Used by the "Download" context-menu /// item on cloud-only rows so the user can fetch just the sample they care /// about without triggering a full sync cycle. #[instrument(skip(client))] pub async fn download_one_blob( db_path: &Path, content_dir: &Path, client: &SyncKitClient, hash: &str, ) -> Result<()> { // Look up the sample's extension and verify it exists in metadata. let p = db_path.to_path_buf(); let h = hash.to_string(); let ext: String = tokio::task::spawn_blocking(move || -> Result { let conn = open_conn(&p)?; let ext: String = conn .query_row( "SELECT file_extension FROM samples WHERE hash = ?1", [&h], |row| row.get(0), ) .map_err(|e| match e { rusqlite::Error::QueryReturnedNoRows => { SyncError::Other(format!("Sample {h} not found in metadata")) } other => other.into(), })?; Ok(ext) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; if validate_hash(hash).is_err() || validate_extension(&ext).is_err() { return Err(SyncError::Other(format!( "Invalid hash or extension: {hash}.{ext}" ))); } let blob_path = content_dir.join(format!("{hash}.{ext}")); if blob_path.exists() { // Nothing to do — clear the cloud_only flag in case it's stale. let p = db_path.to_path_buf(); let h = hash.to_string(); tokio::task::spawn_blocking(move || -> Result<()> { let conn = open_conn(&p)?; conn.execute("UPDATE samples SET cloud_only = 0 WHERE hash = ?1", [&h])?; Ok(()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; return Ok(()); } let download_url = client .blob_download_url(hash) .await .map_err(|e| SyncError::Client(e.to_string()))?; let data = client .blob_download(&download_url) .await .map_err(|e| SyncError::Client(e.to_string()))?; let computed = format!("{:x}", Sha256::digest(&data)); if computed != *hash { return Err(SyncError::Other(format!( "Downloaded blob hash mismatch (expected {hash}, got {computed})" ))); } std::fs::create_dir_all(content_dir)?; let tmp_path = content_dir.join(format!("{hash}.{ext}.tmp")); std::fs::write(&tmp_path, &data)?; if let Err(e) = std::fs::rename(&tmp_path, &blob_path) { let _ = std::fs::remove_file(&tmp_path); return Err(SyncError::Io(e)); } // Clear cloud_only flag in a transaction (mirrors download_missing_blobs). let p = db_path.to_path_buf(); let h = hash.to_string(); tokio::task::spawn_blocking(move || -> Result<()> { let conn = open_conn(&p)?; conn.execute_batch("BEGIN IMMEDIATE")?; conn.execute( "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'", [], )?; conn.execute( "UPDATE samples SET cloud_only = 0 WHERE hash = ?1", [&h], )?; conn.execute( "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'", [], )?; conn.execute_batch("COMMIT")?; Ok(()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; Ok(()) } /// Pull remote changes from the server and apply them locally. #[instrument(skip_all)] pub(crate) async fn pull_changes( db_path: &std::path::Path, client: &SyncKitClient, device_id: Uuid, ) -> Result { let mut total_applied: i64 = 0; loop { let p = db_path.to_path_buf(); let cursor_str: String = tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; get_sync_state(&conn, "pull_cursor") }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let cursor: i64 = cursor_str.parse().unwrap_or(0); let (changes, new_cursor, has_more) = client .pull(device_id, cursor) .await .map_err(|e| SyncError::Client(e.to_string()))?; if !changes.is_empty() { let p = db_path.to_path_buf(); let applied = tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; super::resolve::apply_remote_changes(&conn, &changes) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; total_applied += applied; } let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; set_sync_state(&conn, "pull_cursor", &new_cursor.to_string()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; if !has_more { break; } } Ok(total_applied) }