//! Upload logic: perform_sync, device registration, push changes, blob uploads. use std::path::Path; use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient}; use uuid::Uuid; use tracing::instrument; use audiofiles_core::store::{validate_hash, validate_extension}; use crate::error::{Result, SyncError}; use super::{open_conn, get_sync_state, set_sync_state, SyncResult, PUSH_BATCH_LIMIT}; /// Run a full sync cycle: register device, push, pull, update timestamp. /// /// The `db_path` is used to open separate connections inside `spawn_blocking`, /// avoiding the `Connection` is `!Send` issue. #[instrument(skip_all)] pub async fn perform_sync( db_path: &std::path::Path, client: &SyncKitClient, ) -> Result { // Clear applying_remote flag in case a previous sync crashed mid-apply. // If the flag is stuck at "1", all local changes silently skip the changelog. let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; set_sync_state(&conn, "applying_remote", "0") }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let device_id = ensure_device_registered(db_path, client).await?; let pushed = push_changes(db_path, client, device_id).await?; let pulled = super::download::pull_changes(db_path, client, device_id).await?; let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; set_sync_state(&conn, "last_sync_at", &chrono::Utc::now().to_rfc3339()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; Ok(SyncResult { pushed, pulled }) } /// Upload blobs for samples in VFS entries with sync_files enabled. /// /// Finds samples that have local files (cloud_only = 0) in sync-enabled VFS /// entries, and uploads them to the server via presigned URLs. #[instrument(skip_all)] pub async fn upload_pending_blobs( db_path: &Path, content_dir: &Path, client: &SyncKitClient, ) -> Result { let p = db_path.to_path_buf(); let pending: Vec<(String, String, i64)> = tokio::task::spawn_blocking(move || -> Result<_> { let conn = open_conn(&p)?; let mut stmt = conn.prepare( "SELECT DISTINCT s.hash, s.file_extension, s.file_size FROM samples s JOIN vfs_nodes vn ON vn.sample_hash = s.hash JOIN vfs v ON v.id = vn.vfs_id WHERE v.sync_files = 1 AND s.cloud_only = 0", )?; let rows = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))? .collect::, _>>()?; Ok(rows) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; let mut uploaded = 0u64; for (hash, ext, size) in &pending { if validate_hash(hash).is_err() || validate_extension(ext).is_err() { tracing::warn!(hash, ext, "Skipping upload with invalid hash/extension"); continue; } // Content-addressed flat layout: content_dir/{hash}.{ext} let blob_path = content_dir.join(format!("{}.{}", hash, ext)); if !blob_path.exists() { continue; } let resp = client .blob_upload_url(hash, *size) .await .map_err(|e| SyncError::Client(e.to_string()))?; if resp.already_exists { uploaded += 1; continue; } let data = std::fs::read(&blob_path) .map_err(SyncError::Io)?; client .blob_upload(&resp.upload_url, data) .await .map_err(|e| SyncError::Client(e.to_string()))?; client .blob_confirm(hash, *size) .await .map_err(|e| SyncError::Client(e.to_string()))?; uploaded += 1; } Ok(uploaded) } /// Ensure this device is registered with the server. Caches device_id in sync_state. #[instrument(skip_all)] async fn ensure_device_registered( db_path: &std::path::Path, client: &SyncKitClient, ) -> Result { let p = db_path.to_path_buf(); let stored: String = tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; get_sync_state(&conn, "device_id") }) .await .map_err(|e| SyncError::Other(e.to_string()))??; if !stored.is_empty() && let Ok(id) = stored.parse::() { return Ok(id); } let hostname = std::env::var("HOSTNAME") .or_else(|_| std::env::var("COMPUTERNAME")) .unwrap_or_else(|_| "unknown".to_string()); let platform = std::env::consts::OS; let device = client .register_device(&hostname, platform) .await .map_err(|e| SyncError::Client(e.to_string()))?; let p = db_path.to_path_buf(); let device_id = device.id; tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; set_sync_state(&conn, "device_id", &device_id.to_string()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; Ok(device.id) } /// Push unpushed changelog entries to the server. #[instrument(skip_all)] async fn push_changes( db_path: &std::path::Path, client: &SyncKitClient, device_id: Uuid, ) -> Result { let p = db_path.to_path_buf(); let rows = tokio::task::spawn_blocking(move || -> Result<_> { let conn = open_conn(&p)?; let mut stmt = conn.prepare( "SELECT id, table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?1", )?; let rows: Vec<(i64, String, String, String, String, Option)> = stmt .query_map([PUSH_BATCH_LIMIT as i64], |row| { Ok(( row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?, )) })? .collect::, _>>()?; Ok(rows) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; if rows.is_empty() { return Ok(0); } let mut pushed_ids: Vec = Vec::with_capacity(rows.len()); let mut skipped = 0i64; let changes: Vec = rows .into_iter() .filter_map(|(id, table, op, row_id, timestamp, data)| { let ts = chrono::DateTime::parse_from_rfc3339(×tamp) .map(|dt| dt.with_timezone(&chrono::Utc)) .unwrap_or_else(|_| chrono::Utc::now()); let change_op = match ChangeOp::from_str_opt(&op) { Some(o) => o, None => { tracing::warn!(id, "Skipping changelog entry with unknown op: {op} — marking as pushed to prevent retry loop"); pushed_ids.push(id); skipped += 1; return None; } }; let parsed_data = match data { Some(d) => match serde_json::from_str(&d) { Ok(v) => Some(v), Err(e) => { tracing::warn!(row_id = %row_id, "Unparseable changelog JSON, marking as pushed to prevent retry loop: {e}"); pushed_ids.push(id); skipped += 1; return None; } }, None => None, }; pushed_ids.push(id); Some(ChangeEntry { table, op: change_op, row_id, timestamp: ts, data: parsed_data, }) }) .collect(); if skipped > 0 { tracing::warn!(skipped, "Some changelog entries could not be parsed (unknown op or bad JSON) and were marked pushed to break the retry loop; they are dropped, not retried"); } if !changes.is_empty() { client .push(device_id, changes) .await .map_err(|e| SyncError::Client(e.to_string()))?; } let pushed_count = pushed_ids.len() as i64; // Mark only successfully-pushed entries if !pushed_ids.is_empty() { let p = db_path.to_path_buf(); tokio::task::spawn_blocking(move || { let conn = open_conn(&p)?; let placeholders: String = pushed_ids.iter().map(|_| "?").collect::>().join(","); let sql = format!("UPDATE sync_changelog SET pushed = 1 WHERE id IN ({})", placeholders); let params: Vec<&dyn rusqlite::types::ToSql> = pushed_ids .iter() .map(|id| id as &dyn rusqlite::types::ToSql) .collect(); conn.execute(&sql, params.as_slice())?; Ok::<_, SyncError>(()) }) .await .map_err(|e| SyncError::Other(e.to_string()))??; } Ok(pushed_count) }