//! Push/pull sync engine: reads from sync_changelog, pushes to server, pulls remote changes. //! //! All rusqlite operations run synchronously inside `spawn_blocking` to avoid //! holding a `Connection` across async `.await` points (Connection is !Send). mod download; mod resolve; mod state; mod upload; use rusqlite::Connection; use tracing::instrument; use crate::error::{Result, SyncError}; pub(crate) const PUSH_BATCH_LIMIT: usize = 500; pub(crate) const MAX_CHANGELOG_ENTRIES: i64 = 10_000; /// FK-safe order: parents first for inserts/updates. pub(crate) const UPSERT_ORDER: &[&str] = &[ "vfs", "samples", "collections", "vfs_nodes", "audio_analysis", "tags", "collection_members", "user_config", "edit_history", ]; /// FK-safe order: children first for deletes. pub(crate) const DELETE_ORDER: &[&str] = &[ "edit_history", "user_config", "collection_members", "tags", "audio_analysis", "vfs_nodes", "collections", "samples", "vfs", ]; /// Result of a sync round. pub struct SyncResult { pub pushed: i64, pub pulled: i64, } /// Column whitelist per synced table. 
///
/// Returns the whitelisted column names for `table`, or `None` when `table`
/// is not one of the synced tables listed here.
pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> {
    match table {
        "samples" => Some(&[
            "hash",
            "original_name",
            "file_extension",
            "file_size",
            "import_date",
            "last_modified",
            "cloud_only",
            "duration",
        ]),
        "audio_analysis" => Some(&[
            "hash",
            "bpm",
            "musical_key",
            "duration",
            "sample_rate",
            "channels",
            "peak_db",
            "rms_db",
            "is_loop",
            "spectral_centroid",
            "onset_strength",
            "analyzed_at",
            "lufs",
            "spectral_flatness",
            "spectral_rolloff",
            "zero_crossing_rate",
            "classification",
            "spectral_bandwidth",
            "centroid_variance",
            "crest_factor",
            "attack_time",
            "classification_confidence",
        ]),
        "vfs" => Some(&["id", "name", "created_at", "modified_at", "sync_files"]),
        "vfs_nodes" => Some(&[
            "id",
            "vfs_id",
            "parent_id",
            "name",
            "node_type",
            "sample_hash",
            "created_at",
        ]),
        "tags" => Some(&["sample_hash", "tag"]),
        "collections" => Some(&["id", "name", "description", "created_at", "filter_json"]),
        "collection_members" => Some(&["collection_id", "sample_hash", "added_at"]),
        "user_config" => Some(&["key", "value"]),
        "edit_history" => Some(&[
            "id",
            "source_hash",
            "result_hash",
            "operation",
            "params_json",
            "created_at",
        ]),
        _ => None,
    }
}

/// Primary key column(s) for each synced table.
///
/// Returns an empty slice when `table` is not one of the synced tables, so
/// callers must treat an empty result as "unknown table".
pub(crate) fn pk_columns(table: &str) -> &'static [&'static str] {
    match table {
        "samples" => &["hash"],
        "audio_analysis" => &["hash"],
        "vfs" => &["id"],
        "vfs_nodes" => &["id"],
        // Composite keys: both columns together identify a row.
        "tags" => &["sample_hash", "tag"],
        "collections" => &["id"],
        "collection_members" => &["collection_id", "sample_hash"],
        "user_config" => &["key"],
        "edit_history" => &["id"],
        _ => &[],
    }
}

/// Open a WAL-mode connection for use inside spawn_blocking.
///
/// Opens the database at `db_path`, switches it to WAL journaling and turns on
/// foreign-key enforcement for this connection.
///
/// # Errors
///
/// Propagates any rusqlite error from opening the database or from running
/// the PRAGMA batch.
#[instrument(skip_all)]
pub(crate) fn open_conn(db_path: &std::path::Path) -> Result<Connection> {
    let conn = Connection::open(db_path)?;
    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
    Ok(conn)
}

/// Convert a JSON value to a rusqlite-compatible SQL parameter.
pub(crate) fn json_to_sql(val: &serde_json::Value) -> Box { match val { serde_json::Value::String(s) => Box::new(s.clone()), serde_json::Value::Number(n) => { if let Some(i) = n.as_i64() { Box::new(i) } else if let Some(f) = n.as_f64() { Box::new(f) } else { Box::new(n.to_string()) } } serde_json::Value::Bool(b) => Box::new(*b as i32), serde_json::Value::Null => Box::new(rusqlite::types::Null), other => Box::new(other.to_string()), } } /// Read a value from the sync_state key-value table. #[instrument(skip_all)] pub fn get_sync_state(conn: &Connection, key: &str) -> Result { conn.query_row( "SELECT value FROM sync_state WHERE key = ?1", [key], |row| row.get(0), ) .map_err(SyncError::Db) } /// Write a value to the sync_state key-value table. /// /// Uses INSERT OR REPLACE to handle keys that may not exist yet (e.g. after /// partial migration or DB repair). #[instrument(skip_all)] pub fn set_sync_state(conn: &Connection, key: &str, value: &str) -> Result<()> { conn.execute( "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?1, ?2)", rusqlite::params![key, value], )?; Ok(()) } /// Count unpushed changelog entries. #[instrument(skip_all)] pub fn count_pending_changes(conn: &Connection) -> Result { conn.query_row( "SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0", [], |row| row.get(0), ) .map_err(SyncError::Db) } // Re-exports from submodules pub use download::{download_missing_blobs, download_one_blob}; pub use state::{cleanup_changelog, create_initial_snapshot, enforce_changelog_retention, mark_cloud_only_samples}; pub use upload::{perform_sync, upload_pending_blobs};