Skip to main content

max / audiofiles

5.2 KB · 166 lines History Blame Raw
1 //! Push/pull sync engine: reads from sync_changelog, pushes to server, pulls remote changes.
2 //!
3 //! All rusqlite operations run synchronously inside `spawn_blocking` to avoid
4 //! holding a `Connection` across async `.await` points (Connection is !Send).
5
6 mod download;
7 mod resolve;
8 mod state;
9 mod upload;
10
11 use rusqlite::Connection;
12
13 use tracing::instrument;
14
15 use crate::error::{Result, SyncError};
16
17 pub(crate) const PUSH_BATCH_LIMIT: usize = 500;
18 pub(crate) const MAX_CHANGELOG_ENTRIES: i64 = 10_000;
19
20 /// FK-safe order: parents first for inserts/updates.
21 pub(crate) const UPSERT_ORDER: &[&str] = &[
22 "vfs",
23 "samples",
24 "collections",
25 "vfs_nodes",
26 "audio_analysis",
27 "tags",
28 "collection_members",
29 "user_config",
30 "edit_history",
31 ];
32
33 /// FK-safe order: children first for deletes.
34 pub(crate) const DELETE_ORDER: &[&str] = &[
35 "edit_history",
36 "user_config",
37 "collection_members",
38 "tags",
39 "audio_analysis",
40 "vfs_nodes",
41 "collections",
42 "samples",
43 "vfs",
44 ];
45
46 /// Result of a sync round.
47 pub struct SyncResult {
48 pub pushed: i64,
49 pub pulled: i64,
50 }
51
52 /// Column whitelist per synced table.
53 pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> {
54 match table {
55 "samples" => Some(&[
56 "hash", "original_name", "file_extension", "file_size",
57 "import_date", "last_modified", "cloud_only", "duration",
58 ]),
59 "audio_analysis" => Some(&[
60 "hash", "bpm", "musical_key", "duration", "sample_rate", "channels",
61 "peak_db", "rms_db", "is_loop", "spectral_centroid", "onset_strength",
62 "analyzed_at", "lufs", "spectral_flatness", "spectral_rolloff",
63 "zero_crossing_rate", "classification", "spectral_bandwidth",
64 "centroid_variance", "crest_factor", "attack_time",
65 "classification_confidence",
66 ]),
67 "vfs" => Some(&["id", "name", "created_at", "modified_at", "sync_files"]),
68 "vfs_nodes" => Some(&[
69 "id", "vfs_id", "parent_id", "name", "node_type",
70 "sample_hash", "created_at",
71 ]),
72 "tags" => Some(&["sample_hash", "tag"]),
73 "collections" => Some(&["id", "name", "description", "created_at", "filter_json"]),
74 "collection_members" => Some(&["collection_id", "sample_hash", "added_at"]),
75 "user_config" => Some(&["key", "value"]),
76 "edit_history" => Some(&[
77 "id", "source_hash", "result_hash", "operation", "params_json",
78 "created_at",
79 ]),
80 _ => None,
81 }
82 }
83
84 /// Primary key column(s) for each synced table.
85 pub(crate) fn pk_columns(table: &str) -> &'static [&'static str] {
86 match table {
87 "samples" => &["hash"],
88 "audio_analysis" => &["hash"],
89 "vfs" => &["id"],
90 "vfs_nodes" => &["id"],
91 "tags" => &["sample_hash", "tag"],
92 "collections" => &["id"],
93 "collection_members" => &["collection_id", "sample_hash"],
94 "user_config" => &["key"],
95 "edit_history" => &["id"],
96 _ => &[],
97 }
98 }
99
100 /// Open a WAL-mode connection for use inside spawn_blocking.
101 #[instrument(skip_all)]
102 pub(crate) fn open_conn(db_path: &std::path::Path) -> Result<Connection> {
103 let conn = Connection::open(db_path)?;
104 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
105 Ok(conn)
106 }
107
108 /// Convert a JSON value to a rusqlite-compatible SQL parameter.
109 pub(crate) fn json_to_sql(val: &serde_json::Value) -> Box<dyn rusqlite::types::ToSql> {
110 match val {
111 serde_json::Value::String(s) => Box::new(s.clone()),
112 serde_json::Value::Number(n) => {
113 if let Some(i) = n.as_i64() {
114 Box::new(i)
115 } else if let Some(f) = n.as_f64() {
116 Box::new(f)
117 } else {
118 Box::new(n.to_string())
119 }
120 }
121 serde_json::Value::Bool(b) => Box::new(*b as i32),
122 serde_json::Value::Null => Box::new(rusqlite::types::Null),
123 other => Box::new(other.to_string()),
124 }
125 }
126
127 /// Read a value from the sync_state key-value table.
128 #[instrument(skip_all)]
129 pub fn get_sync_state(conn: &Connection, key: &str) -> Result<String> {
130 conn.query_row(
131 "SELECT value FROM sync_state WHERE key = ?1",
132 [key],
133 |row| row.get(0),
134 )
135 .map_err(SyncError::Db)
136 }
137
138 /// Write a value to the sync_state key-value table.
139 ///
140 /// Uses INSERT OR REPLACE to handle keys that may not exist yet (e.g. after
141 /// partial migration or DB repair).
142 #[instrument(skip_all)]
143 pub fn set_sync_state(conn: &Connection, key: &str, value: &str) -> Result<()> {
144 conn.execute(
145 "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?1, ?2)",
146 rusqlite::params![key, value],
147 )?;
148 Ok(())
149 }
150
151 /// Count unpushed changelog entries.
152 #[instrument(skip_all)]
153 pub fn count_pending_changes(conn: &Connection) -> Result<i64> {
154 conn.query_row(
155 "SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0",
156 [],
157 |row| row.get(0),
158 )
159 .map_err(SyncError::Db)
160 }
161
162 // Re-exports from submodules
163 pub use download::{download_missing_blobs, download_one_blob};
164 pub use state::{cleanup_changelog, create_initial_snapshot, enforce_changelog_retention, mark_cloud_only_samples};
165 pub use upload::{perform_sync, upload_pending_blobs};
166