Skip to main content

max / audiofiles

47.8 KB · 1366 lines History Blame Raw
1 //! State tracking: snapshots, changelog maintenance, cloud-only marking.
2
3 use std::path::Path;
4
5 use rusqlite::Connection;
6
7 use tracing::instrument;
8
9 use crate::error::Result;
10
11 use super::{get_sync_state, set_sync_state, MAX_CHANGELOG_ENTRIES};
12
13 /// Create initial snapshot: insert all existing rows into sync_changelog.
14 #[instrument(skip_all)]
15 pub fn create_initial_snapshot(conn: &Connection) -> Result<i64> {
16 let done = get_sync_state(conn, "initial_snapshot_done")?;
17 if done == "1" {
18 return Ok(0);
19 }
20
21 let tx = conn.unchecked_transaction()?;
22
23 let mut total: i64 = 0;
24
25 let table_queries: &[(&str, &str)] = &[
26 ("samples", "SELECT hash, json_object('hash', hash, 'original_name', original_name, 'file_extension', file_extension, 'file_size', file_size, 'import_date', import_date, 'last_modified', last_modified, 'cloud_only', cloud_only, 'duration', duration) FROM samples"),
27 ("audio_analysis", "SELECT hash, json_object('hash', hash, 'bpm', bpm, 'musical_key', musical_key, 'duration', duration, 'sample_rate', sample_rate, 'channels', channels, 'peak_db', peak_db, 'rms_db', rms_db, 'is_loop', is_loop, 'spectral_centroid', spectral_centroid, 'onset_strength', onset_strength, 'analyzed_at', analyzed_at, 'lufs', lufs, 'spectral_flatness', spectral_flatness, 'spectral_rolloff', spectral_rolloff, 'zero_crossing_rate', zero_crossing_rate, 'classification', classification, 'spectral_bandwidth', spectral_bandwidth, 'centroid_variance', centroid_variance, 'crest_factor', crest_factor, 'attack_time', attack_time, 'classification_confidence', classification_confidence) FROM audio_analysis"),
28 ("vfs", "SELECT CAST(id AS TEXT), json_object('id', id, 'name', name, 'created_at', created_at, 'modified_at', modified_at, 'sync_files', sync_files) FROM vfs"),
29 ("vfs_nodes", "SELECT CAST(id AS TEXT), json_object('id', id, 'vfs_id', vfs_id, 'parent_id', parent_id, 'name', name, 'node_type', node_type, 'sample_hash', sample_hash, 'created_at', created_at) FROM vfs_nodes"),
30 ("tags", "SELECT sample_hash || ':' || tag, json_object('sample_hash', sample_hash, 'tag', tag) FROM tags"),
31 ("collections", "SELECT CAST(id AS TEXT), json_object('id', id, 'name', name, 'description', description, 'created_at', created_at, 'filter_json', filter_json) FROM collections"),
32 ("collection_members", "SELECT CAST(collection_id AS TEXT) || ':' || sample_hash, json_object('collection_id', collection_id, 'sample_hash', sample_hash, 'added_at', added_at) FROM collection_members"),
33 ("user_config", "SELECT key, json_object('key', key, 'value', value) FROM user_config WHERE key NOT LIKE 'sync_%' AND key != 'loose_files'"),
34 ("edit_history", "SELECT CAST(id AS TEXT), json_object('id', id, 'source_hash', source_hash, 'result_hash', result_hash, 'operation', operation, 'params_json', params_json, 'created_at', created_at) FROM edit_history"),
35 ];
36
37 for (table, query) in table_queries {
38 let mut stmt = tx.prepare(query)?;
39 let rows: Vec<(String, String)> = stmt
40 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
41 .collect::<std::result::Result<Vec<_>, _>>()?;
42
43 for (row_id, data) in &rows {
44 tx.execute(
45 "INSERT INTO sync_changelog (table_name, op, row_id, data) VALUES (?1, 'INSERT', ?2, ?3)",
46 rusqlite::params![table, row_id, data],
47 )?;
48 }
49 total += rows.len() as i64;
50 }
51
52 set_sync_state(&tx, "initial_snapshot_done", "1")?;
53 tx.commit()?;
54 Ok(total)
55 }
56
57 /// Delete pushed changelog entries older than 7 days.
58 #[instrument(skip_all)]
59 pub fn cleanup_changelog(conn: &Connection) -> Result<i64> {
60 let deleted = conn.execute(
61 "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')",
62 [],
63 )?;
64 Ok(deleted as i64)
65 }
66
67 /// Enforce a hard cap on total changelog entries to prevent unbounded growth
68 /// when sync is disconnected or failing. Deletes the oldest entries (by rowid)
69 /// to bring the count back under `MAX_CHANGELOG_ENTRIES`.
70 #[instrument(skip_all)]
71 pub fn enforce_changelog_retention(conn: &Connection) -> Result<i64> {
72 let count: i64 = conn.query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0))?;
73
74 if count <= MAX_CHANGELOG_ENTRIES {
75 return Ok(0);
76 }
77
78 let excess = count - MAX_CHANGELOG_ENTRIES;
79
80 // Prefer deleting already-pushed entries first to avoid losing unsynced changes.
81 let deleted_pushed = conn.execute(
82 "DELETE FROM sync_changelog WHERE id IN \
83 (SELECT id FROM sync_changelog WHERE pushed = 1 ORDER BY id ASC LIMIT ?1)",
84 [excess],
85 )?;
86
87 let mut total_deleted = deleted_pushed as i64;
88
89 // If still over cap, reluctantly delete unpushed entries (oldest first).
90 if total_deleted < excess {
91 let remaining = excess - total_deleted;
92 let deleted_unpushed = conn.execute(
93 "DELETE FROM sync_changelog WHERE id IN \
94 (SELECT id FROM sync_changelog ORDER BY id ASC LIMIT ?1)",
95 [remaining],
96 )?;
97 if deleted_unpushed > 0 {
98 tracing::warn!(
99 deleted_unpushed,
100 "Changelog retention: dropped unpushed entries — sync was offline too long"
101 );
102 }
103 total_deleted += deleted_unpushed as i64;
104 }
105
106 tracing::warn!(
107 deleted = total_deleted,
108 total = count,
109 limit = MAX_CHANGELOG_ENTRIES,
110 "Changelog retention cap enforced"
111 );
112 Ok(total_deleted)
113 }
114
115 /// Mark samples as cloud_only when their blob doesn't exist on disk.
116 ///
117 /// After pulling remote changes, a sample row may be created with cloud_only=0
118 /// (as it was on the origin device). If the local content directory doesn't have
119 /// the blob file, we correct the flag to cloud_only=1. This runs with
120 /// applying_remote=1 to suppress changelog triggers.
121 #[instrument(skip_all)]
122 pub fn mark_cloud_only_samples(conn: &Connection, content_dir: &Path) -> Result<i64> {
123 let mut stmt = conn.prepare(
124 "SELECT hash, file_extension FROM samples WHERE cloud_only = 0",
125 )?;
126 let rows: Vec<(String, String)> = stmt
127 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
128 .collect::<std::result::Result<Vec<_>, _>>()?;
129
130 // Collect hashes missing on disk before taking a transaction.
131 let missing: Vec<&str> = rows
132 .iter()
133 .filter(|(hash, ext)| !content_dir.join(format!("{}.{}", hash, ext)).exists())
134 .map(|(hash, _)| hash.as_str())
135 .collect();
136
137 if missing.is_empty() {
138 return Ok(0);
139 }
140
141 conn.execute_batch("BEGIN IMMEDIATE")?;
142 conn.execute(
143 "UPDATE sync_state SET value = '1' WHERE key = 'applying_remote'",
144 [],
145 )?;
146 let result = (|| -> Result<i64> {
147 let mut marked = 0i64;
148 for hash in &missing {
149 conn.execute(
150 "UPDATE samples SET cloud_only = 1 WHERE hash = ?1",
151 [hash],
152 )?;
153 marked += 1;
154 }
155 Ok(marked)
156 })();
157 conn.execute(
158 "UPDATE sync_state SET value = '0' WHERE key = 'applying_remote'",
159 [],
160 )?;
161 match result {
162 Ok(marked) => {
163 conn.execute_batch("COMMIT")?;
164 Ok(marked)
165 }
166 Err(e) => {
167 let _ = conn.execute_batch("ROLLBACK");
168 Err(e)
169 }
170 }
171 }
172
173 #[cfg(test)]
174 mod tests {
175 use super::*;
176 use super::super::{
177 UPSERT_ORDER, DELETE_ORDER, table_columns, pk_columns,
178 resolve::{apply_upsert, apply_delete, apply_remote_changes},
179 };
180 use audiofiles_core::db::Database;
181 use serde_json::json;
182 use synckit_client::{ChangeEntry, ChangeOp};
183
184 fn setup_test_db() -> Database {
185 Database::open_in_memory().expect("Failed to create test DB")
186 }
187
188 fn insert_sample(conn: &Connection, hash: &str, name: &str, ext: &str) {
189 let now = chrono::Utc::now().timestamp();
190 conn.execute(
191 "INSERT INTO samples (hash, original_name, file_extension, file_size, import_date, last_modified) VALUES (?1, ?2, ?3, 1024, ?4, ?4)",
192 rusqlite::params![hash, name, ext, now],
193 ).unwrap();
194 }
195
196 fn insert_vfs(conn: &Connection, name: &str, sync_files: bool) -> i64 {
197 let now = chrono::Utc::now().timestamp();
198 conn.execute(
199 "INSERT INTO vfs (name, created_at, modified_at, sync_files) VALUES (?1, ?2, ?2, ?3)",
200 rusqlite::params![name, now, sync_files as i64],
201 ).unwrap();
202 conn.last_insert_rowid()
203 }
204
205 fn clear_changelog(conn: &Connection) {
206 conn.execute("DELETE FROM sync_changelog", []).unwrap();
207 }
208
209 fn changelog_count(conn: &Connection, table: Option<&str>, op: Option<&str>) -> i64 {
210 match (table, op) {
211 (Some(t), Some(o)) => conn.query_row(
212 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = ?1 AND op = ?2",
213 rusqlite::params![t, o],
214 |row| row.get(0),
215 ).unwrap(),
216 (Some(t), None) => conn.query_row(
217 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = ?1",
218 [t],
219 |row| row.get(0),
220 ).unwrap(),
221 (None, Some(o)) => conn.query_row(
222 "SELECT COUNT(*) FROM sync_changelog WHERE op = ?1",
223 [o],
224 |row| row.get(0),
225 ).unwrap(),
226 (None, None) => conn.query_row(
227 "SELECT COUNT(*) FROM sync_changelog",
228 [],
229 |row| row.get(0),
230 ).unwrap(),
231 }
232 }
233
234 fn change(table: &str, op: ChangeOp, row_id: &str, data: Option<serde_json::Value>) -> ChangeEntry {
235 ChangeEntry {
236 table: table.to_string(),
237 op,
238 row_id: row_id.to_string(),
239 timestamp: chrono::Utc::now(),
240 data,
241 }
242 }
243
244 // ── FK ordering ──
245
/// In `UPSERT_ORDER`, every parent table must come before the tables that reference it.
#[test]
fn upsert_order_parents_before_children() {
    let index_of = |name: &str| UPSERT_ORDER.iter().position(|entry| *entry == name).unwrap();

    // (parent, child) pairs that must appear in this relative order.
    let parent_then_child = [
        ("vfs", "vfs_nodes"),
        ("samples", "audio_analysis"),
        ("samples", "tags"),
        ("samples", "collection_members"),
        ("collections", "collection_members"),
    ];
    for &(parent, child) in parent_then_child.iter() {
        assert!(index_of(parent) < index_of(child));
    }
}
255
/// In `DELETE_ORDER`, every child table must come before the tables it references.
#[test]
fn delete_order_children_before_parents() {
    let index_of = |name: &str| DELETE_ORDER.iter().position(|entry| *entry == name).unwrap();

    // (child, parent) pairs that must appear in this relative order.
    let child_then_parent = [
        ("vfs_nodes", "vfs"),
        ("audio_analysis", "samples"),
        ("tags", "samples"),
        ("collection_members", "collections"),
        ("collection_members", "samples"),
    ];
    for &(child, parent) in child_then_parent.iter() {
        assert!(index_of(child) < index_of(parent));
    }
}
265
/// `DELETE_ORDER` must be `UPSERT_ORDER` read back to front.
#[test]
fn orders_are_exact_reverses() {
    let mut upsert_backwards: Vec<&str> = Vec::new();
    for name in UPSERT_ORDER.iter().rev() {
        upsert_backwards.push(*name);
    }
    assert_eq!(upsert_backwards, DELETE_ORDER);
}
271
272 // ── Column whitelists ──
273
/// Every table listed in `UPSERT_ORDER` must have a column whitelist.
#[test]
fn all_tables_have_column_whitelists() {
    for name in UPSERT_ORDER {
        let whitelist = table_columns(name);
        assert!(whitelist.is_some(), "missing column whitelist for: {}", name);
    }
}
283
/// `table_columns` yields `None` for the names "nonexistent" and "fingerprints".
#[test]
fn unknown_table_returns_none() {
    let nonexistent = table_columns("nonexistent");
    let fingerprints = table_columns("fingerprints");
    assert!(nonexistent.is_none());
    assert!(fingerprints.is_none());
}
289
/// Every synced table reports at least one primary-key column, and the two
/// composite-key tables report both of their key columns in order.
#[test]
fn pk_columns_covers_all_tables() {
    for name in UPSERT_ORDER {
        let key_cols = pk_columns(name);
        assert!(!key_cols.is_empty(), "missing pk_columns for: {}", name);
    }

    // Composite keys.
    assert_eq!(pk_columns("tags"), &["sample_hash", "tag"]);
    assert_eq!(pk_columns("collection_members"), &["collection_id", "sample_hash"]);
}
299
300 // ── Triggers ──
301
/// Inserting a sample must log exactly one `samples`/`INSERT` changelog entry
/// whose JSON payload carries the sample's hash and a `cloud_only` field.
#[test]
fn sample_insert_fires_trigger() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    insert_sample(conn, "abc123", "kick.wav", "wav");

    let logged = changelog_count(conn, Some("samples"), Some("INSERT"));
    assert_eq!(logged, 1);

    // Post-M018 the row_id is `hash_row_id(salt, "abc123")`, so the entry is
    // located by table+op and the canonical hash is checked inside the
    // `data` payload instead.
    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND op = 'INSERT'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
    assert!(json.get("cloud_only").is_some());
    assert_eq!(json["hash"], "abc123");
}
324
/// Inserting a vfs must log one `vfs`/`INSERT` entry whose row_id is the new
/// row's id rendered as text.
#[test]
fn vfs_insert_fires_trigger() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    let new_id = insert_vfs(conn, "Library", true);

    let logged = changelog_count(conn, Some("vfs"), Some("INSERT"));
    assert_eq!(logged, 1);

    let logged_row_id: String = conn
        .query_row(
            "SELECT row_id FROM sync_changelog WHERE table_name = 'vfs'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(logged_row_id, new_id.to_string());
}
342
/// Inserting a tag must log one `tags`/`INSERT` entry whose payload names the
/// sample hash and the tag.
#[test]
fn tag_insert_fires_trigger() {
    let db = setup_test_db();
    let conn = db.conn();
    // The parent sample is created first and its changelog entry discarded.
    insert_sample(conn, "hash1", "snare.wav", "wav");
    clear_changelog(conn);

    conn.execute(
        "INSERT INTO tags (sample_hash, tag) VALUES ('hash1', 'drums')",
        [],
    )
    .unwrap();

    let logged = changelog_count(conn, Some("tags"), Some("INSERT"));
    assert_eq!(logged, 1);

    // Post-M018: row_id is hashed; the cleartext key lives in `data`.
    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'tags'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
    assert_eq!(json["sample_hash"], "hash1");
    assert_eq!(json["tag"], "drums");
}
367
/// Adding a sample to a collection must log one `collection_members`/`INSERT`
/// entry whose payload carries both halves of the composite key.
#[test]
fn collection_member_insert_fires_trigger() {
    let db = setup_test_db();
    let conn = db.conn();

    // Parents first: a sample and a collection; their log entries are discarded.
    insert_sample(conn, "hash2", "hat.wav", "wav");
    let stamp = chrono::Utc::now().timestamp();
    conn.execute(
        "INSERT INTO collections (name, description, created_at) VALUES ('Faves', NULL, ?1)",
        [stamp],
    )
    .unwrap();
    let faves_id = conn.last_insert_rowid();
    clear_changelog(conn);

    conn.execute(
        "INSERT INTO collection_members (collection_id, sample_hash, added_at) VALUES (?1, 'hash2', ?2)",
        rusqlite::params![faves_id, stamp],
    )
    .unwrap();

    let logged = changelog_count(conn, Some("collection_members"), Some("INSERT"));
    assert_eq!(logged, 1);

    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'collection_members'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
    assert_eq!(json["collection_id"].as_i64().unwrap(), faves_id);
    assert_eq!(json["sample_hash"], "hash2");
}
397
398 // ── Trigger suppression ──
399
/// While `applying_remote` is "1" inserts must not be logged; once it is back
/// to "0" they must be.
#[test]
fn trigger_suppression_during_remote_apply() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Flag on: nothing is recorded.
    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "suppressed", "test.wav", "wav");
    let while_suppressed = changelog_count(conn, None, None);
    assert_eq!(while_suppressed, 0);

    // Flag off: the next insert is recorded.
    set_sync_state(conn, "applying_remote", "0").unwrap();
    insert_sample(conn, "unsuppressed", "test2.wav", "wav");
    let after_release = changelog_count(conn, None, None);
    assert_eq!(after_release, 1);
}
414
415 // ── apply_upsert ──
416
/// `apply_upsert` on `samples` must create the row described by the payload.
#[test]
fn apply_upsert_inserts_sample() {
    let db = setup_test_db();
    let conn = db.conn();

    let payload = json!({
        "hash": "upsert_hash",
        "original_name": "synced.wav",
        "file_extension": "wav",
        "file_size": 2048,
        "import_date": 1000000,
        "last_modified": 1000000,
        "cloud_only": 0
    });
    apply_upsert(conn, "samples", &payload).unwrap();

    let stored_name: String = conn
        .query_row(
            "SELECT original_name FROM samples WHERE hash = 'upsert_hash'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_name, "synced.wav");
}
441
/// `apply_upsert` on `vfs_nodes` must create the node with the id given in the payload.
#[test]
fn apply_upsert_inserts_vfs_node() {
    let db = setup_test_db();
    let conn = db.conn();

    // The node references both a vfs and a sample, so create those first.
    let parent_vfs = insert_vfs(conn, "TestVFS", false);
    insert_sample(conn, "node_hash", "pad.wav", "wav");

    let payload = json!({
        "id": 999,
        "vfs_id": parent_vfs,
        "parent_id": null,
        "name": "pad.wav",
        "node_type": "sample",
        "sample_hash": "node_hash",
        "created_at": 1000000
    });
    apply_upsert(conn, "vfs_nodes", &payload).unwrap();

    let stored_name: String = conn
        .query_row(
            "SELECT name FROM vfs_nodes WHERE id = 999",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_name, "pad.wav");
}
469
/// `apply_upsert` must return `Ok` when given a table name it does not know.
#[test]
fn apply_upsert_unknown_table_is_no_op() {
    let db = setup_test_db();
    let conn = db.conn();

    let payload = json!({"id": "abc"});
    let outcome = apply_upsert(conn, "nonexistent_table", &payload);
    assert!(outcome.is_ok());
}
479
480 // ── apply_delete ──
481
/// `apply_delete` on `samples` must remove the row with the given hash.
#[test]
fn apply_delete_removes_sample() {
    let db = setup_test_db();
    let conn = db.conn();
    insert_sample(conn, "del_hash", "delete_me.wav", "wav");

    apply_delete(conn, "samples", "del_hash", None).unwrap();

    let survivors: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM samples WHERE hash = 'del_hash'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(survivors, 0);
}
497
/// `apply_delete` on `tags` must accept a `sample_hash:tag` row id and remove
/// the matching row.
#[test]
fn apply_delete_composite_pk_tags() {
    let db = setup_test_db();
    let conn = db.conn();
    insert_sample(conn, "tag_hash", "tagged.wav", "wav");
    conn.execute(
        "INSERT INTO tags (sample_hash, tag) VALUES ('tag_hash', 'bass')",
        [],
    )
    .unwrap();

    apply_delete(conn, "tags", "tag_hash:bass", None).unwrap();

    let survivors: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM tags WHERE sample_hash = 'tag_hash' AND tag = 'bass'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(survivors, 0);
}
517
518 // ── Full pipeline ──
519
/// Three inserts handed over child-first must all be applied, and applying
/// them must leave the changelog empty.
#[test]
fn apply_remote_changes_full_pipeline() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Deliberately in the wrong FK order — apply_remote_changes reorders via UPSERT_ORDER.
    let node_insert = change("vfs_nodes", ChangeOp::Insert, "1", Some(json!({
        "id": 1, "vfs_id": 1, "parent_id": null,
        "name": "kick.wav", "node_type": "sample",
        "sample_hash": "pipe_hash", "created_at": 1000000
    })));
    let vfs_insert = change("vfs", ChangeOp::Insert, "1", Some(json!({
        "id": 1, "name": "Library", "created_at": 1000000,
        "modified_at": 1000000, "sync_files": 1
    })));
    let sample_insert = change("samples", ChangeOp::Insert, "pipe_hash", Some(json!({
        "hash": "pipe_hash", "original_name": "kick.wav",
        "file_extension": "wav", "file_size": 4096,
        "import_date": 1000000, "last_modified": 1000000,
        "cloud_only": 0
    })));
    let changes = vec![node_insert, vfs_insert, sample_insert];

    let applied = apply_remote_changes(conn, &changes).unwrap();
    assert_eq!(applied, 3);

    let samples_found: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM samples WHERE hash = 'pipe_hash'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(samples_found, 1);

    let vfs_found: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM vfs WHERE name = 'Library'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(vfs_found, 1);

    let nodes_found: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM vfs_nodes WHERE sample_hash = 'pipe_hash'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(nodes_found, 1);

    // Triggers were suppressed, so nothing should have been logged.
    let logged = changelog_count(conn, None, None);
    assert_eq!(logged, 0);
}
569
570 // ── Initial snapshot ──
571
/// The snapshot must write one entry per pre-existing row.
#[test]
fn create_initial_snapshot_captures_all_rows() {
    let db = setup_test_db();
    let conn = db.conn();

    // Seed data with triggers suppressed so only the snapshot writes log entries.
    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "snap1", "one.wav", "wav");
    insert_sample(conn, "snap2", "two.wav", "wav");
    insert_vfs(conn, "TestLib", false);
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    let written = create_initial_snapshot(conn).unwrap();

    // 2 samples + 1 vfs + 1 user_config seed
    // (`sample_tombstone_retain_days = 30`, seeded by M019).
    assert_eq!(written, 4);
}
591
/// A second snapshot call must write nothing and return 0.
#[test]
fn create_initial_snapshot_idempotent() {
    let db = setup_test_db();
    let conn = db.conn();

    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "idem", "test.wav", "wav");
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    let first_run = create_initial_snapshot(conn).unwrap();
    assert!(first_run > 0);

    let second_run = create_initial_snapshot(conn).unwrap();
    assert_eq!(second_run, 0);
}
608
609 // ── Changelog helpers ──
610
/// Cleanup must delete only entries that are both pushed and older than 7 days.
#[test]
fn cleanup_removes_old_pushed() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Pushed 10 days ago — the only row expected to be deleted.
    conn.execute(
        "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
         VALUES ('samples', 'INSERT', 'old', datetime('now', '-10 days'), 1)",
        [],
    )
    .unwrap();

    // Pushed just now — too recent, must stay.
    conn.execute(
        "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
         VALUES ('samples', 'INSERT', 'recent', datetime('now'), 1)",
        [],
    )
    .unwrap();

    // 10 days old but never pushed — must stay (unpushed rows are never cleaned up).
    conn.execute(
        "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
         VALUES ('samples', 'INSERT', 'unpushed', datetime('now', '-10 days'), 0)",
        [],
    )
    .unwrap();

    let removed = cleanup_changelog(conn).unwrap();
    assert_eq!(removed, 1);

    let left: i64 = conn
        .query_row("SELECT COUNT(*) FROM sync_changelog", [], |row| row.get(0))
        .unwrap();
    assert_eq!(left, 2);
}
647
/// With 500 rows over the cap, retention must delete exactly the 500 oldest.
#[test]
fn enforce_retention_caps_entries() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    let seeded = MAX_CHANGELOG_ENTRIES + 500;
    for n in 0..seeded {
        let row_id = format!("row-{}", n);
        conn.execute(
            "INSERT INTO sync_changelog (table_name, op, row_id, data) \
             VALUES ('samples', 'UPDATE', ?1, '{}')",
            [row_id],
        )
        .unwrap();
    }

    let count_before: i64 = conn
        .query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count_before, seeded);

    let removed = enforce_changelog_retention(conn).unwrap();
    assert_eq!(removed, 500);

    let count_after: i64 = conn
        .query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count_after, MAX_CHANGELOG_ENTRIES);

    // The oldest rows are the ones that went, so the survivors begin at row-500.
    let oldest_survivor: String = conn
        .query_row(
            "SELECT row_id FROM sync_changelog ORDER BY id ASC LIMIT 1",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(oldest_survivor, "row-500");
}
685
/// With only five entries in the log, retention must delete nothing.
#[test]
fn enforce_retention_noop_under_cap() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    for n in 0..5 {
        let row_id = format!("row-{}", n);
        conn.execute(
            "INSERT INTO sync_changelog (table_name, op, row_id, data) \
             VALUES ('samples', 'INSERT', ?1, '{}')",
            [row_id],
        )
        .unwrap();
    }

    let removed = enforce_changelog_retention(conn).unwrap();
    assert_eq!(removed, 0);
}
704
/// `count_pending_changes` must count only entries with `pushed = 0`.
#[test]
fn count_pending_counts_unpushed() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Three entries still waiting to be pushed.
    for n in 0..3 {
        let row_id = format!("un-{}", n);
        conn.execute(
            "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES ('samples', 'INSERT', ?1, 0)",
            [row_id],
        )
        .unwrap();
    }

    // Two entries that were already pushed.
    for n in 0..2 {
        let row_id = format!("push-{}", n);
        conn.execute(
            "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES ('samples', 'INSERT', ?1, 1)",
            [row_id],
        )
        .unwrap();
    }

    let pending = super::super::count_pending_changes(conn).unwrap();
    assert_eq!(pending, 3);
}
730
731 // --- mark_cloud_only_samples tests ---
732
/// With an empty content directory, every sample must be flagged cloud_only.
#[test]
fn mark_cloud_only_marks_missing_blobs() {
    let db = setup_test_db();
    let conn = db.conn();

    // Two samples, neither backed by a file on disk.
    insert_sample(conn, "aaa", "kick.wav", "wav");
    insert_sample(conn, "bbb", "snare.wav", "wav");

    // A fresh temp dir stands in for the content dir: it holds no blobs.
    let content_dir = tempfile::tempdir().unwrap();

    let marked = mark_cloud_only_samples(conn, content_dir.path()).unwrap();
    assert_eq!(marked, 2);

    // Both rows must now read cloud_only=1.
    let flagged: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM samples WHERE cloud_only = 1",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(flagged, 2);
}
756
/// Only the sample whose blob is absent from the content dir gets flagged.
#[test]
fn mark_cloud_only_skips_existing_blobs() {
    let db = setup_test_db();
    let conn = db.conn();

    insert_sample(conn, "aaa", "kick.wav", "wav");
    insert_sample(conn, "bbb", "snare.wav", "wav");

    // Provide a blob for "aaa" only; "bbb" stays missing.
    let content_dir = tempfile::tempdir().unwrap();
    let aaa_blob = content_dir.path().join("aaa.wav");
    std::fs::write(aaa_blob, b"fake audio data").unwrap();

    let marked = mark_cloud_only_samples(conn, content_dir.path()).unwrap();
    assert_eq!(marked, 1); // only bbb

    // "aaa" keeps cloud_only=0 …
    let aaa_flag: i32 = conn
        .query_row(
            "SELECT cloud_only FROM samples WHERE hash = 'aaa'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(aaa_flag, 0);

    // … while "bbb" is switched to cloud_only=1.
    let bbb_flag: i32 = conn
        .query_row(
            "SELECT cloud_only FROM samples WHERE hash = 'bbb'",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(bbb_flag, 1);
}
787
/// Flagging a sample as cloud_only must not produce a `samples`/`UPDATE` changelog entry.
#[test]
fn mark_cloud_only_suppresses_changelog() {
    let db = setup_test_db();
    let conn = db.conn();

    insert_sample(conn, "aaa", "kick.wav", "wav");
    clear_changelog(conn);

    let content_dir = tempfile::tempdir().unwrap();
    mark_cloud_only_samples(conn, content_dir.path()).unwrap();

    // The UPDATE ran under applying_remote suppression, so it must not be logged.
    let logged_updates = changelog_count(conn, Some("samples"), Some("UPDATE"));
    assert_eq!(logged_updates, 0);
}
803
/// A second pass over the same data must find nothing left to mark.
#[test]
fn mark_cloud_only_idempotent() {
    let db = setup_test_db();
    let conn = db.conn();

    insert_sample(conn, "aaa", "kick.wav", "wav");
    let content_dir = tempfile::tempdir().unwrap();

    // First pass flags the sample.
    let first_pass = mark_cloud_only_samples(conn, content_dir.path()).unwrap();
    assert_eq!(first_pass, 1);

    // Second pass: the row is already cloud_only=1 and the query only selects cloud_only=0.
    let second_pass = mark_cloud_only_samples(conn, content_dir.path()).unwrap();
    assert_eq!(second_pass, 0);
}
817
818 // ── Integration: multi-table snapshot ──
819
/// The snapshot must cover samples, tags, collections and collection members,
/// writing one `INSERT` entry per row.
#[test]
fn snapshot_captures_tags_collections_and_members() {
    let db = setup_test_db();
    let conn = db.conn();

    // Seed everything with triggers suppressed.
    set_sync_state(conn, "applying_remote", "1").unwrap();

    insert_sample(conn, "s1", "kick.wav", "wav");
    insert_sample(conn, "s2", "snare.wav", "wav");

    // One tag per sample.
    conn.execute(
        "INSERT INTO tags (sample_hash, tag) VALUES ('s1', 'drums')",
        [],
    )
    .unwrap();
    conn.execute(
        "INSERT INTO tags (sample_hash, tag) VALUES ('s2', 'perc')",
        [],
    )
    .unwrap();

    // One collection …
    let stamp = chrono::Utc::now().timestamp();
    conn.execute(
        "INSERT INTO collections (name, description, created_at) VALUES ('Kit', NULL, ?1)",
        [stamp],
    )
    .unwrap();
    let kit_id = conn.last_insert_rowid();

    // … with a single member.
    conn.execute(
        "INSERT INTO collection_members (collection_id, sample_hash, added_at) VALUES (?1, 's1', ?2)",
        rusqlite::params![kit_id, stamp],
    )
    .unwrap();

    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    let written = create_initial_snapshot(conn).unwrap();
    // 2 samples + 2 tags + 1 collection + 1 collection_member
    // + 1 user_config seed (sample_tombstone_retain_days, from M019) = 7
    assert_eq!(written, 7);

    let expected_per_table = [
        ("samples", 2),
        ("tags", 2),
        ("collections", 1),
        ("collection_members", 1),
    ];
    for &(table, expected) in expected_per_table.iter() {
        assert_eq!(changelog_count(conn, Some(table), Some("INSERT")), expected);
    }
}
868
/// The snapshot entry for a sample must hold parseable JSON with the expected fields.
#[test]
fn snapshot_changelog_entries_contain_valid_json() {
    let db = setup_test_db();
    let conn = db.conn();

    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "json_test", "pad.wav", "wav");
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    create_initial_snapshot(conn).unwrap();

    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND row_id = 'json_test'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();

    assert_eq!(json["hash"], "json_test");
    assert_eq!(json["original_name"], "pad.wav");
    assert_eq!(json["file_extension"], "wav");
    assert!(json.get("file_size").is_some());
    assert!(json.get("import_date").is_some());
    // Columns added in migrations 008-009 must be present in snapshot
    assert!(json.get("cloud_only").is_some(), "snapshot missing cloud_only");
    assert!(json.get("duration").is_some(), "snapshot missing duration");
}
897
/// Every whitelisted `samples` column must appear as a key in the snapshot JSON.
#[test]
fn snapshot_samples_columns_match_whitelist() {
    let db = setup_test_db();
    let conn = db.conn();

    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "col_test", "check.wav", "wav");
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    create_initial_snapshot(conn).unwrap();

    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'samples' AND row_id = 'col_test'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
    let fields = json.as_object().unwrap();

    // Walk the whitelist and make sure no column was left out of the snapshot.
    for col in table_columns("samples").unwrap() {
        assert!(fields.contains_key(*col), "snapshot samples missing column: {}", col);
    }
}
923
/// Every whitelisted `audio_analysis` column must appear as a key in the snapshot JSON.
#[test]
fn snapshot_audio_analysis_columns_match_whitelist() {
    let db = setup_test_db();
    let conn = db.conn();

    // Seed a sample plus its analysis row with triggers suppressed.
    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "aa_test", "tone.wav", "wav");
    conn.execute(
        "INSERT INTO audio_analysis (hash, duration, sample_rate, channels, analyzed_at) VALUES ('aa_test', 1.5, 44100, 2, 1000000)",
        [],
    )
    .unwrap();
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    create_initial_snapshot(conn).unwrap();

    let payload: String = conn
        .query_row(
            "SELECT data FROM sync_changelog WHERE table_name = 'audio_analysis' AND row_id = 'aa_test'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let json: serde_json::Value = serde_json::from_str(&payload).unwrap();
    let fields = json.as_object().unwrap();

    for col in table_columns("audio_analysis").unwrap() {
        assert!(fields.contains_key(*col), "snapshot audio_analysis missing column: {}", col);
    }
}
952
/// Writes to the `loose_files` config key must never be logged, while other
/// config keys are.
#[test]
fn loose_files_excluded_from_sync() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    let config_entries = |c: &Connection| changelog_count(c, Some("user_config"), None);

    // Creating loose_files must NOT be logged.
    conn.execute(
        "INSERT INTO user_config (key, value) VALUES ('loose_files', '1')",
        [],
    )
    .unwrap();
    assert_eq!(config_entries(conn), 0);

    // Changing loose_files must NOT be logged either.
    conn.execute(
        "UPDATE user_config SET value = '0' WHERE key = 'loose_files'",
        [],
    )
    .unwrap();
    assert_eq!(config_entries(conn), 0);

    // An ordinary key IS logged.
    conn.execute(
        "INSERT INTO user_config (key, value) VALUES ('theme', 'dark')",
        [],
    )
    .unwrap();
    assert_eq!(config_entries(conn), 1);
}
980
/// Once the snapshot has run, later rows are logged by triggers only; a
/// repeated snapshot call must not add entries.
#[test]
fn snapshot_after_adding_data_does_not_duplicate() {
    let db = setup_test_db();
    let conn = db.conn();

    set_sync_state(conn, "applying_remote", "1").unwrap();
    insert_sample(conn, "first", "one.wav", "wav");
    set_sync_state(conn, "applying_remote", "0").unwrap();
    clear_changelog(conn);

    let first_run = create_initial_snapshot(conn).unwrap();
    // 1 sample + 1 user_config seed (sample_tombstone_retain_days, M019)
    assert_eq!(first_run, 2);

    // This row arrives after the snapshot and is logged by the normal trigger.
    insert_sample(conn, "second", "two.wav", "wav");

    // The done-flag is already set, so the second call writes nothing.
    let second_run = create_initial_snapshot(conn).unwrap();
    assert_eq!(second_run, 0);

    // One `samples` entry from the snapshot plus one from the trigger.
    let sample_entries = changelog_count(conn, Some("samples"), None);
    assert_eq!(sample_entries, 2);
}
1005
/// `cleanup_changelog` must delete the pushed entry dated 8 days back and keep
/// the pushed entry dated 6 days back.
#[test]
fn cleanup_preserves_recent_pushed_entries() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Pushed entry timestamped 6 days ago (inside the 7-day window).
    let inside_window = "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
        VALUES ('samples', 'INSERT', 'six_days', datetime('now', '-6 days'), 1)";
    conn.execute(inside_window, []).unwrap();

    // Pushed entry timestamped 8 days ago (outside the 7-day window).
    let outside_window = "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
        VALUES ('samples', 'INSERT', 'eight_days', datetime('now', '-8 days'), 1)";
    conn.execute(outside_window, []).unwrap();

    assert_eq!(cleanup_changelog(conn).unwrap(), 1);

    // Only six_days may be left behind.
    let survivor = conn
        .query_row("SELECT row_id FROM sync_changelog", [], |r| r.get::<_, String>(0))
        .unwrap();
    assert_eq!(survivor, "six_days");
}
1037
/// With exactly `MAX_CHANGELOG_ENTRIES` rows present,
/// `enforce_changelog_retention` must delete nothing.
#[test]
fn enforce_retention_at_exact_cap_is_noop() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Fill the changelog up to the cap, one row per index.
    let insert_entry = "INSERT INTO sync_changelog (table_name, op, row_id, data) \
        VALUES ('samples', 'UPDATE', ?1, '{}')";
    (0..MAX_CHANGELOG_ENTRIES).for_each(|i| {
        conn.execute(insert_entry, [format!("row-{}", i)]).unwrap();
    });

    assert_eq!(enforce_changelog_retention(conn).unwrap(), 0);

    let total = conn
        .query_row("SELECT COUNT(*) FROM sync_changelog", [], |r| r.get::<_, i64>(0))
        .unwrap();
    assert_eq!(total, MAX_CHANGELOG_ENTRIES);
}
1060
/// Runs `cleanup_changelog` followed by `enforce_changelog_retention`: the
/// first must remove the 100 old pushed rows, the second must then remove none.
#[test]
fn cleanup_and_retention_combined() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // 100 pushed entries dated 14 days back; cleanup is expected to remove them.
    let stale_pushed = "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) \
        VALUES ('samples', 'UPDATE', ?1, datetime('now', '-14 days'), 1)";
    for row_id in (0..100).map(|n| format!("old-{}", n)) {
        conn.execute(stale_pushed, [row_id]).unwrap();
    }

    // A further MAX_CHANGELOG_ENTRIES fresh entries, i.e. exactly the retention cap.
    let fresh_entry = "INSERT INTO sync_changelog (table_name, op, row_id, data) \
        VALUES ('samples', 'INSERT', ?1, '{}')";
    for row_id in (0..MAX_CHANGELOG_ENTRIES).map(|n| format!("new-{}", n)) {
        conn.execute(fresh_entry, [row_id]).unwrap();
    }

    // Cleanup drops the old pushed entries.
    assert_eq!(cleanup_changelog(conn).unwrap(), 100);

    // Exactly MAX_CHANGELOG_ENTRIES rows remain, so retention has nothing to do.
    assert_eq!(enforce_changelog_retention(conn).unwrap(), 0);
}
1093
/// A value stored with `set_sync_state` is returned unchanged by
/// `get_sync_state`, including after the key is overwritten.
#[test]
fn sync_state_get_and_set_roundtrip() {
    let db = setup_test_db();
    let conn = db.conn();

    // Write "1" and then "0" to the same key, reading back after each write.
    for value in ["1", "0"] {
        set_sync_state(conn, "auto_sync_enabled", value).unwrap();
        let stored = get_sync_state(conn, "auto_sync_enabled").unwrap();
        assert_eq!(stored, value);
    }
}
1105
1106 // ── Download query logic ──
1107
/// The blob-download query must return only samples linked into a VFS whose
/// `sync_files` flag is set.
#[test]
fn missing_blobs_query_finds_sync_enabled_only() {
    let db = setup_test_db();
    let conn = db.conn();

    // Two VFS rows: the first with sync_files=true, the second without.
    let synced_vfs = insert_vfs(conn, "Synced", true);
    let local_vfs = insert_vfs(conn, "Local", false);

    insert_sample(conn, "hash_a", "kick.wav", "wav");
    insert_sample(conn, "hash_b", "snare.wav", "wav");

    // hash_a goes into the synced VFS, hash_b into the local-only one.
    let created_at = chrono::Utc::now().timestamp();
    let link_kick = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'kick.wav', 'sample', 'hash_a', ?2)";
    conn.execute(link_kick, rusqlite::params![synced_vfs, created_at]).unwrap();
    let link_snare = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'snare.wav', 'sample', 'hash_b', ?2)";
    conn.execute(link_snare, rusqlite::params![local_vfs, created_at]).unwrap();

    // Run the same query as download_missing_blobs.
    let mut query = conn.prepare(
        "SELECT DISTINCT s.hash, s.file_extension
         FROM samples s
         JOIN vfs_nodes vn ON vn.sample_hash = s.hash
         JOIN vfs v ON v.id = vn.vfs_id
         WHERE v.sync_files = 1",
    ).unwrap();
    let found: Vec<(String, String)> = query
        .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    assert_eq!(found.len(), 1);
    let (hash, _extension) = &found[0];
    assert_eq!(hash.as_str(), "hash_a");
}
1148
/// A sample linked into two synced VFS rows must come back from the
/// blob-download query exactly once.
#[test]
fn missing_blobs_query_deduplicates_multi_vfs() {
    let db = setup_test_db();
    let conn = db.conn();

    let first_lib = insert_vfs(conn, "Lib1", true);
    let second_lib = insert_vfs(conn, "Lib2", true);
    insert_sample(conn, "shared_hash", "shared.wav", "wav");

    // Link the one sample into both synced VFS entries under different names.
    let created_at = chrono::Utc::now().timestamp();
    let link_a = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'a.wav', 'sample', 'shared_hash', ?2)";
    conn.execute(link_a, rusqlite::params![first_lib, created_at]).unwrap();
    let link_b = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'b.wav', 'sample', 'shared_hash', ?2)";
    conn.execute(link_b, rusqlite::params![second_lib, created_at]).unwrap();

    let mut query = conn.prepare(
        "SELECT DISTINCT s.hash, s.file_extension
         FROM samples s
         JOIN vfs_nodes vn ON vn.sample_hash = s.hash
         JOIN vfs v ON v.id = vn.vfs_id
         WHERE v.sync_files = 1",
    ).unwrap();
    let found: Vec<(String, String)> = query
        .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)))
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    // DISTINCT collapses the two links into a single row.
    assert_eq!(found.len(), 1);
}
1185
1186 // ── Upload query logic ──
1187
/// The pending-upload query must skip samples flagged `cloud_only = 1` and
/// return the remaining samples of a synced VFS.
#[test]
fn upload_pending_query_finds_non_cloud_only() {
    let db = setup_test_db();
    let conn = db.conn();

    let synced_vfs = insert_vfs(conn, "Synced", true);

    // Despite its name, `local_hash` is the sample flagged cloud_only here.
    insert_sample(conn, "local_hash", "kick.wav", "wav");
    let flag_cloud_only = "UPDATE samples SET cloud_only = 1 WHERE hash = 'local_hash'";
    conn.execute(flag_cloud_only, []).unwrap();

    insert_sample(conn, "present_hash", "snare.wav", "wav");

    // Both samples are linked into the same synced VFS.
    let created_at = chrono::Utc::now().timestamp();
    let link_kick = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'kick.wav', 'sample', 'local_hash', ?2)";
    conn.execute(link_kick, rusqlite::params![synced_vfs, created_at]).unwrap();
    let link_snare = "INSERT INTO vfs_nodes (vfs_id, parent_id, name, node_type, sample_hash, created_at) VALUES (?1, NULL, 'snare.wav', 'sample', 'present_hash', ?2)";
    conn.execute(link_snare, rusqlite::params![synced_vfs, created_at]).unwrap();

    // Same query as upload_pending_blobs.
    let mut query = conn.prepare(
        "SELECT DISTINCT s.hash, s.file_extension, s.file_size
         FROM samples s
         JOIN vfs_nodes vn ON vn.sample_hash = s.hash
         JOIN vfs v ON v.id = vn.vfs_id
         WHERE v.sync_files = 1 AND s.cloud_only = 0",
    ).unwrap();
    let pending: Vec<(String, String, i64)> = query
        .query_map([], |row| {
            let hash = row.get::<_, String>(0)?;
            let extension = row.get::<_, String>(1)?;
            let size = row.get::<_, i64>(2)?;
            Ok((hash, extension, size))
        })
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    // Only present_hash (cloud_only=0) qualifies.
    assert_eq!(pending.len(), 1);
    let (hash, _extension, _size) = &pending[0];
    assert_eq!(hash.as_str(), "present_hash");
}
1229
/// The push query must return every unpushed changelog entry, skip entries
/// already marked pushed, and yield them in ascending `id` order.
///
/// The whole returned sequence is asserted, not only its endpoints, so a
/// mis-ordered middle element fails the test as well.
#[test]
fn push_changelog_reads_unpushed_in_order() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Insert five unpushed entries; insertion order determines their ids.
    for i in 0..5 {
        conn.execute(
            "INSERT INTO sync_changelog (table_name, op, row_id, data, pushed) VALUES ('samples', 'INSERT', ?1, '{}', 0)",
            [format!("push-{}", i)],
        ).unwrap();
    }
    // Mark one entry in the middle as already pushed.
    conn.execute(
        "UPDATE sync_changelog SET pushed = 1 WHERE row_id = 'push-2'",
        [],
    ).unwrap();

    // Same query as push_changes
    let mut stmt = conn.prepare(
        "SELECT id, table_name, op, row_id, timestamp, data
         FROM sync_changelog
         WHERE pushed = 0
         ORDER BY id ASC
         LIMIT ?1",
    ).unwrap();
    // Only the first four columns (id, table_name, op, row_id) are read back.
    let rows: Vec<(i64, String, String, String)> = stmt
        .query_map([500i64], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
        .unwrap()
        .collect::<std::result::Result<Vec<_>, _>>()
        .unwrap();

    // Exactly the four unpushed entries, in insertion order, with push-2 absent.
    let row_ids: Vec<&str> = rows.iter().map(|r| r.3.as_str()).collect();
    assert_eq!(row_ids, ["push-0", "push-1", "push-3", "push-4"]);

    // The ids themselves must be strictly ascending.
    assert!(
        rows.windows(2).all(|pair| pair[0].0 < pair[1].0),
        "changelog ids are not strictly ascending: {:?}",
        rows.iter().map(|r| r.0).collect::<Vec<_>>()
    );
}
1271
1272 // ── Resolve: mixed upsert/delete ordering ──
1273
/// Applies an insert batch (sample + tag) followed by a delete batch (tag, then
/// sample) and checks that both deletes are applied and no rows remain.
#[test]
fn apply_remote_changes_mixed_ops_correct_order() {
    let db = setup_test_db();
    let conn = db.conn();
    clear_changelog(conn);

    // Runs a COUNT(*) statement and returns its single integer result.
    let count_rows = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };

    // First batch: create the sample and tag it.
    let sample_payload = json!({
        "hash": "mix_hash", "original_name": "mixed.wav",
        "file_extension": "wav", "file_size": 1024,
        "import_date": 1000000, "last_modified": 1000000,
        "cloud_only": 0
    });
    let tag_payload = json!({
        "sample_hash": "mix_hash", "tag": "bass"
    });
    let inserts = vec![
        change("samples", ChangeOp::Insert, "mix_hash", Some(sample_payload)),
        change("tags", ChangeOp::Insert, "mix_hash:bass", Some(tag_payload)),
    ];
    apply_remote_changes(conn, &inserts).unwrap();

    // Second batch: delete the tag, then the sample (mixed batch).
    let deletes = vec![
        change("tags", ChangeOp::Delete, "mix_hash:bass", None),
        change("samples", ChangeOp::Delete, "mix_hash", None),
    ];
    assert_eq!(apply_remote_changes(conn, &deletes).unwrap(), 2);

    let samples_left = count_rows("SELECT COUNT(*) FROM samples WHERE hash = 'mix_hash'");
    assert_eq!(samples_left, 0);

    let tags_left = count_rows("SELECT COUNT(*) FROM tags WHERE sample_hash = 'mix_hash'");
    assert_eq!(tags_left, 0);
}
1314
/// Calling `apply_upsert` twice with the same `hash` must overwrite the stored
/// fields and leave a single row.
#[test]
fn apply_upsert_updates_existing_row() {
    let db = setup_test_db();
    let conn = db.conn();

    // Runs a single-value integer query against the connection.
    let query_int = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };

    // Initial version of the sample.
    let original = json!({
        "hash": "upd_hash", "original_name": "old.wav",
        "file_extension": "wav", "file_size": 1024,
        "import_date": 1000000, "last_modified": 1000000,
        "cloud_only": 0
    });
    apply_upsert(conn, "samples", &original).unwrap();

    // Same hash with new values: the upsert has to update (ON CONFLICT DO UPDATE).
    let revised = json!({
        "hash": "upd_hash", "original_name": "new.wav",
        "file_extension": "wav", "file_size": 2048,
        "import_date": 1000000, "last_modified": 2000000,
        "cloud_only": 0
    });
    apply_upsert(conn, "samples", &revised).unwrap();

    let stored_name = conn
        .query_row(
            "SELECT original_name FROM samples WHERE hash = 'upd_hash'",
            [],
            |row| row.get::<_, String>(0),
        )
        .unwrap();
    assert_eq!(stored_name, "new.wav");

    let stored_size = query_int("SELECT file_size FROM samples WHERE hash = 'upd_hash'");
    assert_eq!(stored_size, 2048);

    // Still exactly one row for this hash.
    let row_total = query_int("SELECT COUNT(*) FROM samples WHERE hash = 'upd_hash'");
    assert_eq!(row_total, 1);
}
1355
/// `apply_delete` on a hash that is not present must return `Ok`.
#[test]
fn apply_delete_nonexistent_is_noop() {
    let db = setup_test_db();
    let conn = db.conn();

    // No row with this hash exists, so the delete has nothing to remove.
    let outcome = apply_delete(conn, "samples", "nonexistent_hash", None);
    assert!(outcome.is_ok());
}
1365 }
1366