//! State tracking: snapshots, changelog maintenance, cloud-only marking. use std::path::Path; use rusqlite::Connection; use tracing::instrument; use crate::error::Result; use super::{get_sync_state, set_sync_state, MAX_CHANGELOG_ENTRIES}; /// Create initial snapshot: insert all existing rows into sync_changelog. #[instrument(skip_all)] pub fn create_initial_snapshot(conn: &Connection) -> Result { let done = get_sync_state(conn, "initial_snapshot_done")?; if done == "1" { return Ok(0); } let tx = conn.unchecked_transaction()?; let mut total: i64 = 0; let table_queries: &[(&str, &str)] = &[ ("samples", "SELECT hash, json_object('hash', hash, 'original_name', original_name, 'file_extension', file_extension, 'file_size', file_size, 'import_date', import_date, 'last_modified', last_modified, 'cloud_only', cloud_only, 'duration', duration) FROM samples"), ("audio_analysis", "SELECT hash, json_object('hash', hash, 'bpm', bpm, 'musical_key', musical_key, 'duration', duration, 'sample_rate', sample_rate, 'channels', channels, 'peak_db', peak_db, 'rms_db', rms_db, 'is_loop', is_loop, 'spectral_centroid', spectral_centroid, 'onset_strength', onset_strength, 'analyzed_at', analyzed_at, 'lufs', lufs, 'spectral_flatness', spectral_flatness, 'spectral_rolloff', spectral_rolloff, 'zero_crossing_rate', zero_crossing_rate, 'classification', classification, 'spectral_bandwidth', spectral_bandwidth, 'centroid_variance', centroid_variance, 'crest_factor', crest_factor, 'attack_time', attack_time, 'classification_confidence', classification_confidence) FROM audio_analysis"), ("vfs", "SELECT CAST(id AS TEXT), json_object('id', id, 'name', name, 'created_at', created_at, 'modified_at', modified_at, 'sync_files', sync_files) FROM vfs"), ("vfs_nodes", "SELECT CAST(id AS TEXT), json_object('id', id, 'vfs_id', vfs_id, 'parent_id', parent_id, 'name', name, 'node_type', node_type, 'sample_hash', sample_hash, 'created_at', created_at) FROM vfs_nodes"), ("tags", "SELECT sample_hash || ':' || 
tag, json_object('sample_hash', sample_hash, 'tag', tag) FROM tags"), ("collections", "SELECT CAST(id AS TEXT), json_object('id', id, 'name', name, 'description', description, 'created_at', created_at, 'filter_json', filter_json) FROM collections"), ("collection_members", "SELECT CAST(collection_id AS TEXT) || ':' || sample_hash, json_object('collection_id', collection_id, 'sample_hash', sample_hash, 'added_at', added_at) FROM collection_members"), ("user_config", "SELECT key, json_object('key', key, 'value', value) FROM user_config WHERE key NOT LIKE 'sync_%' AND key != 'loose_files'"), ("edit_history", "SELECT CAST(id AS TEXT), json_object('id', id, 'source_hash', source_hash, 'result_hash', result_hash, 'operation', operation, 'params_json', params_json, 'created_at', created_at) FROM edit_history"), ]; for (table, query) in table_queries { let mut stmt = tx.prepare(query)?; let rows: Vec<(String, String)> = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()?; for (row_id, data) in &rows { tx.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data) VALUES (?1, 'INSERT', ?2, ?3)", rusqlite::params![table, row_id, data], )?; } total += rows.len() as i64; } set_sync_state(&tx, "initial_snapshot_done", "1")?; tx.commit()?; Ok(total) } /// Delete pushed changelog entries older than 7 days. #[instrument(skip_all)] pub fn cleanup_changelog(conn: &Connection) -> Result { let deleted = conn.execute( "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')", [], )?; Ok(deleted as i64) } /// Enforce a hard cap on total changelog entries to prevent unbounded growth /// when sync is disconnected or failing. Deletes the oldest entries (by rowid) /// to bring the count back under `MAX_CHANGELOG_ENTRIES`. 
///
/// Already-pushed entries are deleted first; unpushed entries are dropped
/// (oldest first, with a warning) only when that alone does not remove the
/// excess.
///
/// Returns the number of entries deleted, or `Ok(0)` when the table holds at
/// most `MAX_CHANGELOG_ENTRIES` rows.
#[instrument(skip_all)]
pub fn enforce_changelog_retention(conn: &Connection) -> Result<i64> {
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0))?;
    if count <= MAX_CHANGELOG_ENTRIES {
        return Ok(0);
    }

    let excess = count - MAX_CHANGELOG_ENTRIES;

    // Prefer deleting already-pushed entries first to avoid losing unsynced changes.
    let deleted_pushed = conn.execute(
        "DELETE FROM sync_changelog WHERE id IN \
         (SELECT id FROM sync_changelog WHERE pushed = 1 ORDER BY id ASC LIMIT ?1)",
        [excess],
    )?;
    let mut total_deleted = deleted_pushed as i64;

    // If still over cap, reluctantly delete unpushed entries (oldest first).
    if total_deleted < excess {
        let remaining = excess - total_deleted;
        let deleted_unpushed = conn.execute(
            "DELETE FROM sync_changelog WHERE id IN \
             (SELECT id FROM sync_changelog ORDER BY id ASC LIMIT ?1)",
            [remaining],
        )?;
        if deleted_unpushed > 0 {
            tracing::warn!(
                deleted_unpushed,
                "Changelog retention: dropped unpushed entries — sync was offline too long"
            );
        }
        total_deleted += deleted_unpushed as i64;
    }

    tracing::warn!(
        deleted = total_deleted,
        total = count,
        limit = MAX_CHANGELOG_ENTRIES,
        "Changelog retention cap enforced"
    );

    Ok(total_deleted)
}

/// Mark samples as cloud_only when their blob doesn't exist on disk.
///
/// After pulling remote changes, a sample row may be created with cloud_only=0
/// (as it was on the origin device). If the local content directory doesn't have
/// the blob file, we correct the flag to cloud_only=1. This runs with
/// applying_remote=1 to suppress changelog triggers.
///
/// A blob is looked up as `<content_dir>/<hash>.<file_extension>`.
///
/// # Returns
/// The number of samples for which an update was issued; `Ok(0)` when no
/// blob is missing (no transaction is opened in that case).
///
/// # Errors
/// Propagates any database error. If anything fails after `BEGIN IMMEDIATE`
/// (setting or clearing `applying_remote`, a row update, or the commit
/// itself) the transaction is rolled back before the error is returned, so
/// the connection is never left inside an open transaction.
#[instrument(skip_all)]
pub fn mark_cloud_only_samples(conn: &Connection, content_dir: &Path) -> Result<i64> {
    let mut stmt = conn.prepare(
        "SELECT hash, file_extension FROM samples WHERE cloud_only = 0",
    )?;
    let rows: Vec<(String, String)> = stmt
        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    // Collect hashes missing on disk before taking a transaction.
    let missing: Vec<&str> = rows
        .iter()
        .filter(|(hash, ext)| !content_dir.join(format!("{}.{}", hash, ext)).exists())
        .map(|(hash, _)| hash.as_str())
        .collect();

    if missing.is_empty() {
        return Ok(0);
    }

    conn.execute_batch("BEGIN IMMEDIATE")?;

    // Everything after BEGIN runs inside this closure so that every failure
    // path funnels into the single ROLLBACK below. Previously an error while
    // setting or clearing the flag, or while committing, returned early and
    // left the transaction open.
    let result = (|| -> Result<i64> {
        conn.execute(
            "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
            [],
        )?;

        let mut marked = 0i64;
        for hash in &missing {
            conn.execute(
                "UPDATE samples SET cloud_only = 1 WHERE hash = ?1",
                [hash],
            )?;
            marked += 1;
        }

        conn.execute(
            "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
            [],
        )?;
        conn.execute_batch("COMMIT")?;
        Ok(marked)
    })();

    if result.is_err() {
        // Best effort: the original error is what the caller needs to see.
        // Rolling back also restores `applying_remote` to its pre-transaction value.
        let _ = conn.execute_batch("ROLLBACK");
    }
    result
}

#[cfg(test)]
mod tests {
    use super::*;
    use super::super::{
        UPSERT_ORDER, DELETE_ORDER, table_columns, pk_columns,
        resolve::{apply_upsert, apply_delete, apply_remote_changes},
    };
    use audiofiles_core::db::Database;
    use serde_json::json;
    use synckit_client::{ChangeEntry, ChangeOp};

    fn setup_test_db() -> Database {
        Database::open_in_memory().expect("Failed to create test DB")
    }

    fn insert_sample(conn: &Connection, hash: &str, name: &str, ext: &str) {
        let now = chrono::Utc::now().timestamp();
        conn.execute(
            "INSERT INTO samples (hash, original_name, file_extension, file_size, import_date, last_modified) VALUES (?1, ?2, ?3, 1024, ?4, ?4)",
            rusqlite::params![hash, name, ext, now],
        ).unwrap();
    }

    fn insert_vfs(conn: &Connection, name: &str, sync_files: bool) -> i64 {
        let now = chrono::Utc::now().timestamp();
        conn.execute(
            "INSERT INTO vfs (name, created_at, modified_at, sync_files) VALUES (?1, ?2, ?2, ?3)",
            rusqlite::params![name, now, sync_files as i64],
        ).unwrap();
        conn.last_insert_rowid()
    }

    fn clear_changelog(conn: &Connection) {
        conn.execute("DELETE FROM sync_changelog", []).unwrap();
    }

    fn changelog_count(conn: &Connection, table: Option<&str>, op: Option<&str>) -> i64 {
        match (table, op) {
            (Some(t), Some(o)) => conn.query_row(
                "SELECT 
COUNT(*) FROM sync_changelog WHERE table_name = ?1 AND op = ?2", rusqlite::params![t, o], |row| row.get(0), ).unwrap(), (Some(t), None) => conn.query_row( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = ?1", [t], |row| row.get(0), ).unwrap(), (None, Some(o)) => conn.query_row( "SELECT COUNT(*) FROM sync_changelog WHERE op = ?1", [o], |row| row.get(0), ).unwrap(), (None, None) => conn.query_row( "SELECT COUNT(*) FROM sync_changelog", [], |row| row.get(0), ).unwrap(), } } fn change(table: &str, op: ChangeOp, row_id: &str, data: Option) -> ChangeEntry { ChangeEntry { table: table.to_string(), op, row_id: row_id.to_string(), timestamp: chrono::Utc::now(), data, } } // ── FK ordering ── #[test] fn upsert_order_parents_before_children() { let pos = |t: &str| UPSERT_ORDER.iter().position(|x| *x == t).unwrap(); assert!(pos("vfs") < pos("vfs_nodes")); assert!(pos("samples") < pos("audio_analysis")); assert!(pos("samples") < pos("tags")); assert!(pos("samples") < pos("collection_members")); assert!(pos("collections") < pos("collection_members")); } #[test] fn delete_order_children_before_parents() { let pos = |t: &str| DELETE_ORDER.iter().position(|x| *x == t).unwrap(); assert!(pos("vfs_nodes") < pos("vfs")); assert!(pos("audio_analysis") < pos("samples")); assert!(pos("tags") < pos("samples")); assert!(pos("collection_members") < pos("collections")); assert!(pos("collection_members") < pos("samples")); } #[test] fn orders_are_exact_reverses() { let reversed: Vec<&str> = UPSERT_ORDER.iter().rev().copied().collect(); assert_eq!(reversed, DELETE_ORDER); } // ── Column whitelists ── #[test] fn all_tables_have_column_whitelists() { for table in UPSERT_ORDER { assert!( table_columns(table).is_some(), "missing column whitelist for: {}", table ); } } #[test] fn unknown_table_returns_none() { assert!(table_columns("nonexistent").is_none()); assert!(table_columns("fingerprints").is_none()); } #[test] fn pk_columns_covers_all_tables() { for table in UPSERT_ORDER { let pks = 
pk_columns(table); assert!(!pks.is_empty(), "missing pk_columns for: {}", table); } assert_eq!(pk_columns("tags"), &["sample_hash", "tag"]); assert_eq!(pk_columns("collection_members"), &["collection_id", "sample_hash"]); } // ── Triggers ── #[test] fn sample_insert_fires_trigger() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); insert_sample(conn, "abc123", "kick.wav", "wav"); assert_eq!(changelog_count(conn, Some("samples"), Some("INSERT")), 1); // Post-M018: row_id is `hash_row_id(salt, "abc123")` so we look up // by table+op and inspect the canonical hash inside the encrypted- // at-the-wire `data` field. let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND op = 'INSERT'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); assert!(parsed.get("cloud_only").is_some()); assert_eq!(parsed["hash"], "abc123"); } #[test] fn vfs_insert_fires_trigger() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); let vfs_id = insert_vfs(conn, "Library", true); assert_eq!(changelog_count(conn, Some("vfs"), Some("INSERT")), 1); let row_id: String = conn.query_row( "SELECT row_id FROM sync_changelog WHERE table_name = 'vfs'", [], |row| row.get(0), ).unwrap(); assert_eq!(row_id, vfs_id.to_string()); } #[test] fn tag_insert_fires_trigger() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "hash1", "snare.wav", "wav"); clear_changelog(conn); conn.execute( "INSERT INTO tags (sample_hash, tag) VALUES ('hash1', 'drums')", [], ).unwrap(); assert_eq!(changelog_count(conn, Some("tags"), Some("INSERT")), 1); // Post-M018: row_id is hashed; the cleartext key lives in `data`. 
let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'tags'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); assert_eq!(parsed["sample_hash"], "hash1"); assert_eq!(parsed["tag"], "drums"); } #[test] fn collection_member_insert_fires_trigger() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "hash2", "hat.wav", "wav"); let now = chrono::Utc::now().timestamp(); conn.execute( "INSERT INTO collections (name, description, created_at) VALUES ('Faves', NULL, ?1)", [now], ).unwrap(); let collection_id = conn.last_insert_rowid(); clear_changelog(conn); conn.execute( "INSERT INTO collection_members (collection_id, sample_hash, added_at) VALUES (?1, 'hash2', ?2)", rusqlite::params![collection_id, now], ).unwrap(); assert_eq!(changelog_count(conn, Some("collection_members"), Some("INSERT")), 1); let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'collection_members'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); assert_eq!(parsed["collection_id"].as_i64().unwrap(), collection_id); assert_eq!(parsed["sample_hash"], "hash2"); } // ── Trigger suppression ── #[test] fn trigger_suppression_during_remote_apply() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "suppressed", "test.wav", "wav"); assert_eq!(changelog_count(conn, None, None), 0); set_sync_state(conn, "applying_remote", "0").unwrap(); insert_sample(conn, "unsuppressed", "test2.wav", "wav"); assert_eq!(changelog_count(conn, None, None), 1); } // ── apply_upsert ── #[test] fn apply_upsert_inserts_sample() { let db = setup_test_db(); let conn = db.conn(); let data = json!({ "hash": "upsert_hash", "original_name": "synced.wav", "file_extension": "wav", "file_size": 2048, "import_date": 1000000, "last_modified": 1000000, 
"cloud_only": 0 }); apply_upsert(conn, "samples", &data).unwrap(); let name: String = conn.query_row( "SELECT original_name FROM samples WHERE hash = 'upsert_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(name, "synced.wav"); } #[test] fn apply_upsert_inserts_vfs_node() { let db = setup_test_db(); let conn = db.conn(); let vfs_id = insert_vfs(conn, "TestVFS", false); insert_sample(conn, "node_hash", "pad.wav", "wav"); let data = json!({ "id": 999, "vfs_id": vfs_id, "parent_id": null, "name": "pad.wav", "node_type": "sample", "sample_hash": "node_hash", "created_at": 1000000 }); apply_upsert(conn, "vfs_nodes", &data).unwrap(); let name: String = conn.query_row( "SELECT name FROM vfs_nodes WHERE id = 999", [], |row| row.get(0), ).unwrap(); assert_eq!(name, "pad.wav"); } #[test] fn apply_upsert_unknown_table_is_no_op() { let db = setup_test_db(); let conn = db.conn(); let data = json!({"id": "abc"}); let result = apply_upsert(conn, "nonexistent_table", &data); assert!(result.is_ok()); } // ── apply_delete ── #[test] fn apply_delete_removes_sample() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "del_hash", "delete_me.wav", "wav"); apply_delete(conn, "samples", "del_hash", None).unwrap(); let count: i64 = conn.query_row( "SELECT COUNT(*) FROM samples WHERE hash = 'del_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(count, 0); } #[test] fn apply_delete_composite_pk_tags() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "tag_hash", "tagged.wav", "wav"); conn.execute( "INSERT INTO tags (sample_hash, tag) VALUES ('tag_hash', 'bass')", [], ).unwrap(); apply_delete(conn, "tags", "tag_hash:bass", None).unwrap(); let count: i64 = conn.query_row( "SELECT COUNT(*) FROM tags WHERE sample_hash = 'tag_hash' AND tag = 'bass'", [], |row| row.get(0), ).unwrap(); assert_eq!(count, 0); } // ── Full pipeline ── #[test] fn apply_remote_changes_full_pipeline() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // 
Changes in wrong FK order — apply_remote_changes reorders via UPSERT_ORDER let changes = vec![ change("vfs_nodes", ChangeOp::Insert, "1", Some(json!({ "id": 1, "vfs_id": 1, "parent_id": null, "name": "kick.wav", "node_type": "sample", "sample_hash": "pipe_hash", "created_at": 1000000 }))), change("vfs", ChangeOp::Insert, "1", Some(json!({ "id": 1, "name": "Library", "created_at": 1000000, "modified_at": 1000000, "sync_files": 1 }))), change("samples", ChangeOp::Insert, "pipe_hash", Some(json!({ "hash": "pipe_hash", "original_name": "kick.wav", "file_extension": "wav", "file_size": 4096, "import_date": 1000000, "last_modified": 1000000, "cloud_only": 0 }))), ]; let applied = apply_remote_changes(conn, &changes).unwrap(); assert_eq!(applied, 3); let sample_count: i64 = conn.query_row( "SELECT COUNT(*) FROM samples WHERE hash = 'pipe_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(sample_count, 1); let vfs_count: i64 = conn.query_row( "SELECT COUNT(*) FROM vfs WHERE name = 'Library'", [], |row| row.get(0), ).unwrap(); assert_eq!(vfs_count, 1); let node_count: i64 = conn.query_row( "SELECT COUNT(*) FROM vfs_nodes WHERE sample_hash = 'pipe_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(node_count, 1); // Changelog should be empty (triggers were suppressed) assert_eq!(changelog_count(conn, None, None), 0); } // ── Initial snapshot ── #[test] fn create_initial_snapshot_captures_all_rows() { let db = setup_test_db(); let conn = db.conn(); // Suppress triggers during data setup set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "snap1", "one.wav", "wav"); insert_sample(conn, "snap2", "two.wav", "wav"); insert_vfs(conn, "TestLib", false); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); let total = create_initial_snapshot(conn).unwrap(); // 2 samples + 1 vfs + 1 user_config seed // (`sample_tombstone_retain_days = 30`, seeded by M019). 
assert_eq!(total, 4); } #[test] fn create_initial_snapshot_idempotent() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "idem", "test.wav", "wav"); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); let first = create_initial_snapshot(conn).unwrap(); assert!(first > 0); let second = create_initial_snapshot(conn).unwrap(); assert_eq!(second, 0); } // ── Changelog helpers ── #[test] fn cleanup_removes_old_pushed() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Old pushed — should be deleted conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'INSERT', 'old', datetime('now', '-10 days'), 1)", [], ).unwrap(); // Recent pushed — should remain conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'INSERT', 'recent', datetime('now'), 1)", [], ).unwrap(); // Old unpushed — should remain (never delete unpushed) conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'INSERT', 'unpushed', datetime('now', '-10 days'), 0)", [], ).unwrap(); let deleted = cleanup_changelog(conn).unwrap(); assert_eq!(deleted, 1); let remaining: i64 = conn.query_row( "SELECT COUNT(*) FROM sync_changelog", [], |row| row.get(0), ).unwrap(); assert_eq!(remaining, 2); } #[test] fn enforce_retention_caps_entries() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); let total = MAX_CHANGELOG_ENTRIES + 500; for i in 0..total { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('samples', 'UPDATE', ?1, '{}')", [format!("row-{}", i)], ) .unwrap(); } let before: i64 = conn.query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0)).unwrap(); assert_eq!(before, total); let deleted = enforce_changelog_retention(conn).unwrap(); assert_eq!(deleted, 500); let after: 
i64 = conn.query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0)).unwrap(); assert_eq!(after, MAX_CHANGELOG_ENTRIES); // Oldest entries removed — the remaining start at row-500 let min_row_id: String = conn .query_row( "SELECT row_id FROM sync_changelog ORDER BY id ASC LIMIT 1", [], |r| r.get(0), ) .unwrap(); assert_eq!(min_row_id, "row-500"); } #[test] fn enforce_retention_noop_under_cap() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); for i in 0..5 { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('samples', 'INSERT', ?1, '{}')", [format!("row-{}", i)], ) .unwrap(); } let deleted = enforce_changelog_retention(conn).unwrap(); assert_eq!(deleted, 0); } #[test] fn count_pending_counts_unpushed() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // 3 unpushed for i in 0..3 { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES ('samples', 'INSERT', ?1, 0)", [format!("un-{}", i)], ).unwrap(); } // 2 pushed for i in 0..2 { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES ('samples', 'INSERT', ?1, 1)", [format!("push-{}", i)], ).unwrap(); } let count = super::super::count_pending_changes(conn).unwrap(); assert_eq!(count, 3); } // --- mark_cloud_only_samples tests --- #[test] fn mark_cloud_only_marks_missing_blobs() { let db = setup_test_db(); let conn = db.conn(); // Insert two samples — neither has a file on disk insert_sample(conn, "aaa", "kick.wav", "wav"); insert_sample(conn, "bbb", "snare.wav", "wav"); // Use a temp dir as content_dir (empty — no blobs) let tmp = tempfile::tempdir().unwrap(); let marked = mark_cloud_only_samples(conn, tmp.path()).unwrap(); assert_eq!(marked, 2); // Verify both are now cloud_only=1 let count: i64 = conn.query_row( "SELECT COUNT(*) FROM samples WHERE cloud_only = 1", [], |r| r.get(0), ).unwrap(); assert_eq!(count, 2); } #[test] fn mark_cloud_only_skips_existing_blobs() { let db 
= setup_test_db(); let conn = db.conn(); insert_sample(conn, "aaa", "kick.wav", "wav"); insert_sample(conn, "bbb", "snare.wav", "wav"); // Create one blob file, leave other missing let tmp = tempfile::tempdir().unwrap(); std::fs::write(tmp.path().join("aaa.wav"), b"fake audio data").unwrap(); let marked = mark_cloud_only_samples(conn, tmp.path()).unwrap(); assert_eq!(marked, 1); // only bbb // aaa still cloud_only=0, bbb is cloud_only=1 let aaa_co: i32 = conn.query_row( "SELECT cloud_only FROM samples WHERE hash = 'aaa'", [], |r| r.get(0), ).unwrap(); assert_eq!(aaa_co, 0); let bbb_co: i32 = conn.query_row( "SELECT cloud_only FROM samples WHERE hash = 'bbb'", [], |r| r.get(0), ).unwrap(); assert_eq!(bbb_co, 1); } #[test] fn mark_cloud_only_suppresses_changelog() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "aaa", "kick.wav", "wav"); clear_changelog(conn); let tmp = tempfile::tempdir().unwrap(); mark_cloud_only_samples(conn, tmp.path()).unwrap(); // The UPDATE should not appear in changelog (applying_remote suppression) let count = changelog_count(conn, Some("samples"), Some("UPDATE")); assert_eq!(count, 0); } #[test] fn mark_cloud_only_idempotent() { let db = setup_test_db(); let conn = db.conn(); insert_sample(conn, "aaa", "kick.wav", "wav"); let tmp = tempfile::tempdir().unwrap(); // First call marks it assert_eq!(mark_cloud_only_samples(conn, tmp.path()).unwrap(), 1); // Second call: already cloud_only=1, query only selects cloud_only=0 assert_eq!(mark_cloud_only_samples(conn, tmp.path()).unwrap(), 0); } // ── Integration: multi-table snapshot ── #[test] fn snapshot_captures_tags_collections_and_members() { let db = setup_test_db(); let conn = db.conn(); // Suppress triggers during data setup set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "s1", "kick.wav", "wav"); insert_sample(conn, "s2", "snare.wav", "wav"); // Tags conn.execute( "INSERT INTO tags (sample_hash, tag) VALUES ('s1', 'drums')", [], ).unwrap(); 
conn.execute( "INSERT INTO tags (sample_hash, tag) VALUES ('s2', 'perc')", [], ).unwrap(); // Collection let now = chrono::Utc::now().timestamp(); conn.execute( "INSERT INTO collections (name, description, created_at) VALUES ('Kit', NULL, ?1)", [now], ).unwrap(); let coll_id = conn.last_insert_rowid(); // Collection members conn.execute( "INSERT INTO collection_members (collection_id, sample_hash, added_at) VALUES (?1, 's1', ?2)", rusqlite::params![coll_id, now], ).unwrap(); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); let total = create_initial_snapshot(conn).unwrap(); // 2 samples + 2 tags + 1 collection + 1 collection_member // + 1 user_config seed (sample_tombstone_retain_days, from M019) = 7 assert_eq!(total, 7); assert_eq!(changelog_count(conn, Some("samples"), Some("INSERT")), 2); assert_eq!(changelog_count(conn, Some("tags"), Some("INSERT")), 2); assert_eq!(changelog_count(conn, Some("collections"), Some("INSERT")), 1); assert_eq!(changelog_count(conn, Some("collection_members"), Some("INSERT")), 1); } #[test] fn snapshot_changelog_entries_contain_valid_json() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "json_test", "pad.wav", "wav"); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); create_initial_snapshot(conn).unwrap(); let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND row_id = 'json_test'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); assert_eq!(parsed["hash"], "json_test"); assert_eq!(parsed["original_name"], "pad.wav"); assert_eq!(parsed["file_extension"], "wav"); assert!(parsed.get("file_size").is_some()); assert!(parsed.get("import_date").is_some()); // Columns added in migrations 008-009 must be present in snapshot assert!(parsed.get("cloud_only").is_some(), "snapshot missing cloud_only"); 
assert!(parsed.get("duration").is_some(), "snapshot missing duration"); } #[test] fn snapshot_samples_columns_match_whitelist() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "col_test", "check.wav", "wav"); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); create_initial_snapshot(conn).unwrap(); let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND row_id = 'col_test'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); let obj = parsed.as_object().unwrap(); // Every column in the whitelist must appear in the snapshot JSON for col in table_columns("samples").unwrap() { assert!(obj.contains_key(*col), "snapshot samples missing column: {}", col); } } #[test] fn snapshot_audio_analysis_columns_match_whitelist() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "aa_test", "tone.wav", "wav"); conn.execute( "INSERT INTO audio_analysis (hash, duration, sample_rate, channels, analyzed_at) VALUES ('aa_test', 1.5, 44100, 2, 1000000)", [], ).unwrap(); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); create_initial_snapshot(conn).unwrap(); let data: String = conn.query_row( "SELECT data FROM sync_changelog WHERE table_name = 'audio_analysis' AND row_id = 'aa_test'", [], |row| row.get(0), ).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data).unwrap(); let obj = parsed.as_object().unwrap(); for col in table_columns("audio_analysis").unwrap() { assert!(obj.contains_key(*col), "snapshot audio_analysis missing column: {}", col); } } #[test] fn loose_files_excluded_from_sync() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Insert loose_files — should NOT fire trigger conn.execute( "INSERT INTO user_config (key, value) VALUES 
('loose_files', '1')", [], ).unwrap(); assert_eq!(changelog_count(conn, Some("user_config"), None), 0); // Update loose_files — should NOT fire trigger conn.execute( "UPDATE user_config SET value = '0' WHERE key = 'loose_files'", [], ).unwrap(); assert_eq!(changelog_count(conn, Some("user_config"), None), 0); // Normal key — should fire trigger conn.execute( "INSERT INTO user_config (key, value) VALUES ('theme', 'dark')", [], ).unwrap(); assert_eq!(changelog_count(conn, Some("user_config"), None), 1); } #[test] fn snapshot_after_adding_data_does_not_duplicate() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "applying_remote", "1").unwrap(); insert_sample(conn, "first", "one.wav", "wav"); set_sync_state(conn, "applying_remote", "0").unwrap(); clear_changelog(conn); let first = create_initial_snapshot(conn).unwrap(); // 1 sample + 1 user_config seed (sample_tombstone_retain_days, M019) assert_eq!(first, 2); // Add more data after snapshot (these go through normal triggers) insert_sample(conn, "second", "two.wav", "wav"); // Second snapshot call should return 0 (flag already set) let second = create_initial_snapshot(conn).unwrap(); assert_eq!(second, 0); // Total changelog: 1 from snapshot + 1 from trigger assert_eq!(changelog_count(conn, Some("samples"), None), 2); } #[test] fn cleanup_preserves_recent_pushed_entries() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Entry pushed exactly 6 days ago (within 7-day window) conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'INSERT', 'six_days', datetime('now', '-6 days'), 1)", [], ).unwrap(); // Entry pushed exactly 8 days ago (outside 7-day window) conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'INSERT', 'eight_days', datetime('now', '-8 days'), 1)", [], ).unwrap(); let deleted = cleanup_changelog(conn).unwrap(); assert_eq!(deleted, 1); // six_days should 
remain, eight_days should be deleted let remaining_id: String = conn.query_row( "SELECT row_id FROM sync_changelog", [], |r| r.get(0), ).unwrap(); assert_eq!(remaining_id, "six_days"); } #[test] fn enforce_retention_at_exact_cap_is_noop() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); for i in 0..MAX_CHANGELOG_ENTRIES { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('samples', 'UPDATE', ?1, '{}')", [format!("row-{}", i)], ).unwrap(); } let deleted = enforce_changelog_retention(conn).unwrap(); assert_eq!(deleted, 0); let count: i64 = conn.query_row( "SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0), ).unwrap(); assert_eq!(count, MAX_CHANGELOG_ENTRIES); } #[test] fn cleanup_and_retention_combined() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Insert many old pushed entries (should be cleaned by cleanup) for i in 0..100 { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \ VALUES ('samples', 'UPDATE', ?1, datetime('now', '-14 days'), 1)", [format!("old-{}", i)], ).unwrap(); } // Insert entries at the retention cap to test combined behavior for i in 0..MAX_CHANGELOG_ENTRIES { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ VALUES ('samples', 'INSERT', ?1, '{}')", [format!("new-{}", i)], ).unwrap(); } // cleanup removes old pushed entries let cleaned = cleanup_changelog(conn).unwrap(); assert_eq!(cleaned, 100); // retention should be a noop now (exactly MAX_CHANGELOG_ENTRIES remain) let retained = enforce_changelog_retention(conn).unwrap(); assert_eq!(retained, 0); } #[test] fn sync_state_get_and_set_roundtrip() { let db = setup_test_db(); let conn = db.conn(); set_sync_state(conn, "auto_sync_enabled", "1").unwrap(); assert_eq!(get_sync_state(conn, "auto_sync_enabled").unwrap(), "1"); set_sync_state(conn, "auto_sync_enabled", "0").unwrap(); assert_eq!(get_sync_state(conn, "auto_sync_enabled").unwrap(), "0"); } 
// ── Download query logic ── #[test] fn missing_blobs_query_finds_sync_enabled_only() { let db = setup_test_db(); let conn = db.conn(); // Create two VFS: one with sync_files=true, one without let vfs_sync = insert_vfs(conn, "Synced", true); let vfs_local = insert_vfs(conn, "Local", false); insert_sample(conn, "hash_a", "kick.wav", "wav"); insert_sample(conn, "hash_b", "snare.wav", "wav"); // Link hash_a to synced VFS, hash_b to local-only VFS let now = chrono::Utc::now().timestamp(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'kick.wav', 'sample', 'hash_a', ?2)", rusqlite::params![vfs_sync, now], ).unwrap(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'snare.wav', 'sample', 'hash_b', ?2)", rusqlite::params![vfs_local, now], ).unwrap(); // Run the same query as download_missing_blobs let mut stmt = conn.prepare( "SELECT DISTINCT s.hash, s.file_extension FROM samples s JOIN vfs_nodes vn ON vn.sample_hash = s.hash JOIN vfs v ON v.id = vn.vfs_id WHERE v.sync_files = 1", ).unwrap(); let rows: Vec<(String, String)> = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?))) .unwrap() .collect::, _>>() .unwrap(); assert_eq!(rows.len(), 1); assert_eq!(rows[0].0, "hash_a"); } #[test] fn missing_blobs_query_deduplicates_multi_vfs() { let db = setup_test_db(); let conn = db.conn(); let vfs1 = insert_vfs(conn, "Lib1", true); let vfs2 = insert_vfs(conn, "Lib2", true); insert_sample(conn, "shared_hash", "shared.wav", "wav"); // Same sample linked in two synced VFS entries let now = chrono::Utc::now().timestamp(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'a.wav', 'sample', 'shared_hash', ?2)", rusqlite::params![vfs1, now], ).unwrap(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'b.wav', 
'sample', 'shared_hash', ?2)", rusqlite::params![vfs2, now], ).unwrap(); let mut stmt = conn.prepare( "SELECT DISTINCT s.hash, s.file_extension FROM samples s JOIN vfs_nodes vn ON vn.sample_hash = s.hash JOIN vfs v ON v.id = vn.vfs_id WHERE v.sync_files = 1", ).unwrap(); let rows: Vec<(String, String)> = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?))) .unwrap() .collect::, _>>() .unwrap(); // DISTINCT should yield exactly one row assert_eq!(rows.len(), 1); } // ── Upload query logic ── #[test] fn upload_pending_query_finds_non_cloud_only() { let db = setup_test_db(); let conn = db.conn(); let vfs = insert_vfs(conn, "Synced", true); insert_sample(conn, "local_hash", "kick.wav", "wav"); // Mark one sample as cloud_only conn.execute("UPDATE samples SET cloud_only = 1 WHERE hash = 'local_hash'", []).unwrap(); insert_sample(conn, "present_hash", "snare.wav", "wav"); let now = chrono::Utc::now().timestamp(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'kick.wav', 'sample', 'local_hash', ?2)", rusqlite::params![vfs, now], ).unwrap(); conn.execute( "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'snare.wav', 'sample', 'present_hash', ?2)", rusqlite::params![vfs, now], ).unwrap(); // Same query as upload_pending_blobs let mut stmt = conn.prepare( "SELECT DISTINCT s.hash, s.file_extension, s.file_size FROM samples s JOIN vfs_nodes vn ON vn.sample_hash = s.hash JOIN vfs v ON v.id = vn.vfs_id WHERE v.sync_files = 1 AND s.cloud_only = 0", ).unwrap(); let rows: Vec<(String, String, i64)> = stmt .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))) .unwrap() .collect::, _>>() .unwrap(); // Only present_hash (cloud_only=0) assert_eq!(rows.len(), 1); assert_eq!(rows[0].0, "present_hash"); } #[test] fn push_changelog_reads_unpushed_in_order() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Insert changelog 
entries with specific order for i in 0..5 { conn.execute( "INSERT INTO sync_changelog (table_name, op, row_id, data, pushed) VALUES ('samples', 'INSERT', ?1, '{}', 0)", [format!("push-{}", i)], ).unwrap(); } // Mark one as already pushed conn.execute( "UPDATE sync_changelog SET pushed = 1 WHERE row_id = 'push-2'", [], ).unwrap(); // Same query as push_changes let mut stmt = conn.prepare( "SELECT id, table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?1", ).unwrap(); let rows: Vec<(i64, String, String, String)> = stmt .query_map([500i64], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) .unwrap() .collect::, _>>() .unwrap(); // Should skip push-2 (already pushed) assert_eq!(rows.len(), 4); let row_ids: Vec<&str> = rows.iter().map(|r| r.3.as_str()).collect(); assert!(!row_ids.contains(&"push-2")); // Should be in ASC order assert_eq!(row_ids[0], "push-0"); assert_eq!(row_ids[3], "push-4"); } // ── Resolve: mixed upsert/delete ordering ── #[test] fn apply_remote_changes_mixed_ops_correct_order() { let db = setup_test_db(); let conn = db.conn(); clear_changelog(conn); // Insert a sample first let changes_insert = vec![ change("samples", ChangeOp::Insert, "mix_hash", Some(json!({ "hash": "mix_hash", "original_name": "mixed.wav", "file_extension": "wav", "file_size": 1024, "import_date": 1000000, "last_modified": 1000000, "cloud_only": 0 }))), change("tags", ChangeOp::Insert, "mix_hash:bass", Some(json!({ "sample_hash": "mix_hash", "tag": "bass" }))), ]; apply_remote_changes(conn, &changes_insert).unwrap(); // Now delete tag then sample (mixed batch) let changes_delete = vec![ change("tags", ChangeOp::Delete, "mix_hash:bass", None), change("samples", ChangeOp::Delete, "mix_hash", None), ]; let applied = apply_remote_changes(conn, &changes_delete).unwrap(); assert_eq!(applied, 2); let sample_count: i64 = conn.query_row( "SELECT COUNT(*) FROM samples WHERE hash = 'mix_hash'", [], |row| row.get(0), ).unwrap(); 
assert_eq!(sample_count, 0); let tag_count: i64 = conn.query_row( "SELECT COUNT(*) FROM tags WHERE sample_hash = 'mix_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(tag_count, 0); } #[test] fn apply_upsert_updates_existing_row() { let db = setup_test_db(); let conn = db.conn(); // Insert initial sample apply_upsert(conn, "samples", &json!({ "hash": "upd_hash", "original_name": "old.wav", "file_extension": "wav", "file_size": 1024, "import_date": 1000000, "last_modified": 1000000, "cloud_only": 0 })).unwrap(); // Update via upsert (ON CONFLICT DO UPDATE) apply_upsert(conn, "samples", &json!({ "hash": "upd_hash", "original_name": "new.wav", "file_extension": "wav", "file_size": 2048, "import_date": 1000000, "last_modified": 2000000, "cloud_only": 0 })).unwrap(); let name: String = conn.query_row( "SELECT original_name FROM samples WHERE hash = 'upd_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(name, "new.wav"); let size: i64 = conn.query_row( "SELECT file_size FROM samples WHERE hash = 'upd_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(size, 2048); // Should still be only 1 row let count: i64 = conn.query_row( "SELECT COUNT(*) FROM samples WHERE hash = 'upd_hash'", [], |row| row.get(0), ).unwrap(); assert_eq!(count, 1); } #[test] fn apply_delete_nonexistent_is_noop() { let db = setup_test_db(); let conn = db.conn(); // Delete a hash that doesn't exist — should succeed (no-op) let result = apply_delete(conn, "samples", "nonexistent_hash", None); assert!(result.is_ok()); } }