Skip to main content

max / audiofiles

Sync: SSE push notifications, blob validation, OAuth state check, schema expansion Scheduler subscribes to SSE stream for real-time sync triggers. Download/upload validate hash and extension before blob operations. OAuth callback validates state parameter before sending success. Add edit_history, new analysis columns, and duration to sync whitelist. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-04-13 21:58 UTC
Commit: 870cca5706ea57e1547aede1bf1e9a05785659f4
Parent: 72e3502
6 files changed, +107 insertions, -12 deletions
@@ -123,19 +123,26 @@ pub fn start_auth(client: &SyncKitClient) -> Result<AuthSession> {
123 123 }
124 124 }
125 125
126 - // Send HTML response
127 - let body = "<html><body><h1>Authentication successful</h1><p>You can close this tab and return to audiofiles.</p></body></html>";
128 - let response = format!(
129 - "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
130 - body.len(),
131 - body
132 - );
133 - let _ = stream.write_all(response.as_bytes());
134 -
135 126 if let (Some(code), Some(state)) = (code, cb_state) {
136 127 if state != thread_expected_state {
137 - tracing::warn!("OAuth callback state mismatch");
128 + tracing::warn!("OAuth callback state mismatch — rejecting");
129 + let body = "<html><body><h1>Authentication failed</h1><p>CSRF state mismatch. Please try again.</p></body></html>";
130 + let response = format!(
131 + "HTTP/1.1 403 Forbidden\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
132 + body.len(),
133 + body
134 + );
135 + let _ = stream.write_all(response.as_bytes());
136 + break;
138 137 }
138 +
139 + let body = "<html><body><h1>Authentication successful</h1><p>You can close this tab and return to audiofiles.</p></body></html>";
140 + let response = format!(
141 + "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
142 + body.len(),
143 + body
144 + );
145 + let _ = stream.write_all(response.as_bytes());
139 146 let _ = tx.send(CallbackResult { code, state });
140 147 }
141 148 }
@@ -1,4 +1,5 @@
1 1 //! Background sync scheduler: periodic auto-sync with exponential backoff on failure.
2 + //! SSE push notifications trigger immediate sync when another device pushes changes.
2 3
3 4 use std::path::{Path, PathBuf};
4 5 use std::sync::Arc;
@@ -21,6 +22,9 @@ pub enum SyncCommand {
21 22 Stop,
22 23 }
23 24
25 + /// Delay before reconnecting the SSE stream after a disconnect (seconds).
26 + const SSE_RECONNECT_DELAY_SECS: u64 = 5;
27 +
24 28 /// Run the background sync scheduler loop.
25 29 ///
26 30 /// Uses `db_path` to open short-lived connections inside `spawn_blocking`.
@@ -35,6 +39,7 @@ pub async fn run_scheduler(
35 39 let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
36 40 let mut consecutive_failures: u32 = 0;
37 41 let mut backoff_until: Option<chrono::DateTime<chrono::Utc>> = None;
42 + let mut sse_stream: Option<synckit_client::SyncNotifyStream> = None;
38 43
39 44 loop {
40 45 tokio::select! {
@@ -81,6 +86,39 @@ pub async fn run_scheduler(
81 86 }
82 87 }
83 88 }
89 + result = async {
90 + if let Some(ref mut stream) = sse_stream {
91 + stream.next_change().await
92 + } else {
93 + std::future::pending::<Option<()>>().await
94 + }
95 + } => {
96 + match result {
97 + Some(()) => {
98 + tracing::debug!("SSE: received change notification, triggering immediate sync");
99 + if should_auto_sync(&db_path, &client) {
100 + match run_sync_cycle(&db_path, &content_dir, &client, &status).await {
101 + Ok(pulled) => {
102 + consecutive_failures = 0;
103 + backoff_until = None;
104 + if pulled > 0 {
105 + status.lock().needs_refresh = true;
106 + }
107 + }
108 + Err(e) => {
109 + tracing::warn!("SSE-triggered sync failed: {e}");
110 + status.lock().last_error = Some(e.to_string());
111 + }
112 + }
113 + }
114 + }
115 + None => {
116 + tracing::debug!("SSE: stream disconnected, will reconnect");
117 + sse_stream = None;
118 + tokio::time::sleep(std::time::Duration::from_secs(SSE_RECONNECT_DELAY_SECS)).await;
119 + }
120 + }
121 + }
84 122 cmd = commands.recv() => {
85 123 match cmd {
86 124 Some(SyncCommand::SyncNow) => {
@@ -105,6 +143,22 @@ pub async fn run_scheduler(
105 143 }
106 144 }
107 145 }
146 +
147 + // Try to establish SSE connection if not connected and client is ready
148 + if sse_stream.is_none()
149 + && client.session_info().is_some()
150 + && client.has_master_key()
151 + {
152 + match client.subscribe().await {
153 + Ok(stream) => {
154 + tracing::debug!("SSE: connected to push notification stream");
155 + sse_stream = Some(stream);
156 + }
157 + Err(e) => {
158 + tracing::debug!("SSE: failed to connect (will retry): {e}");
159 + }
160 + }
161 + }
108 162 }
109 163 }
110 164
@@ -2,11 +2,14 @@
2 2
3 3 use std::path::Path;
4 4
5 + use sha2::{Digest, Sha256};
5 6 use synckit_client::SyncKitClient;
6 7 use uuid::Uuid;
7 8
8 9 use tracing::instrument;
9 10
11 + use audiofiles_core::store::{validate_hash, validate_extension};
12 +
10 13 use crate::error::{Result, SyncError};
11 14
12 15 use super::{open_conn, get_sync_state, set_sync_state};
@@ -40,6 +43,10 @@ pub async fn download_missing_blobs(
40 43 let missing: Vec<(String, String)> = rows
41 44 .into_iter()
42 45 .filter(|(hash, ext)| {
46 + if validate_hash(hash).is_err() || validate_extension(ext).is_err() {
47 + tracing::warn!(hash, ext, "Skipping blob with invalid hash/extension");
48 + return false;
49 + }
43 50 let path = cd.join(format!("{}.{}", hash, ext));
44 51 !path.exists()
45 52 })
@@ -93,6 +100,17 @@ pub async fn download_missing_blobs(
93 100 }
94 101 };
95 102
103 + // Verify SHA-256 before writing
104 + let computed = format!("{:x}", Sha256::digest(&data));
105 + if computed != *hash {
106 + tracing::warn!(
107 + expected = hash,
108 + actual = computed,
109 + "Downloaded blob hash mismatch, skipping"
110 + );
111 + continue;
112 + }
113 +
96 114 // Write blob to content directory (flat layout)
97 115 std::fs::create_dir_all(content_dir)?;
98 116 let blob_path = content_dir.join(format!("{}.{}", hash, ext));
@@ -28,10 +28,12 @@ pub(crate) const UPSERT_ORDER: &[&str] = &[
28 28 "collection_members",
29 29 "smart_folders",
30 30 "user_config",
31 + "edit_history",
31 32 ];
32 33
33 34 /// FK-safe order: children first for deletes.
34 35 pub(crate) const DELETE_ORDER: &[&str] = &[
36 + "edit_history",
35 37 "user_config",
36 38 "smart_folders",
37 39 "collection_members",
@@ -54,13 +56,15 @@ pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> {
54 56 match table {
55 57 "samples" => Some(&[
56 58 "hash", "original_name", "file_extension", "file_size",
57 - "import_date", "last_modified", "cloud_only",
59 + "import_date", "last_modified", "cloud_only", "duration",
58 60 ]),
59 61 "audio_analysis" => Some(&[
60 62 "hash", "bpm", "musical_key", "duration", "sample_rate", "channels",
61 63 "peak_db", "rms_db", "is_loop", "spectral_centroid", "onset_strength",
62 64 "analyzed_at", "lufs", "spectral_flatness", "spectral_rolloff",
63 - "zero_crossing_rate", "classification",
65 + "zero_crossing_rate", "classification", "spectral_bandwidth",
66 + "centroid_variance", "crest_factor", "attack_time",
67 + "classification_confidence",
64 68 ]),
65 69 "vfs" => Some(&["id", "name", "created_at", "modified_at", "sync_files"]),
66 70 "vfs_nodes" => Some(&[
@@ -72,6 +76,10 @@ pub(crate) fn table_columns(table: &str) -> Option<&'static [&'static str]> {
72 76 "collection_members" => Some(&["collection_id", "sample_hash", "added_at"]),
73 77 "smart_folders" => Some(&["id", "vfs_id", "name", "query_json", "created_at"]),
74 78 "user_config" => Some(&["key", "value"]),
79 + "edit_history" => Some(&[
80 + "id", "source_hash", "result_hash", "operation", "params_json",
81 + "created_at",
82 + ]),
75 83 _ => None,
76 84 }
77 85 }
@@ -88,6 +96,7 @@ pub(crate) fn pk_columns(table: &str) -> &'static [&'static str] {
88 96 "collection_members" => &["collection_id", "sample_hash"],
89 97 "smart_folders" => &["id"],
90 98 "user_config" => &["key"],
99 + "edit_history" => &["id"],
91 100 _ => &[],
92 101 }
93 102 }
@@ -30,6 +30,7 @@ pub fn create_initial_snapshot(conn: &Connection) -> Result<i64> {
30 30 ("collection_members", "SELECT CAST(collection_id AS TEXT) || ':' || sample_hash, json_object('collection_id', collection_id, 'sample_hash', sample_hash, 'added_at', added_at) FROM collection_members"),
31 31 ("smart_folders", "SELECT CAST(id AS TEXT), json_object('id', id, 'vfs_id', vfs_id, 'name', name, 'query_json', query_json, 'created_at', created_at) FROM smart_folders"),
32 32 ("user_config", "SELECT key, json_object('key', key, 'value', value) FROM user_config WHERE key NOT LIKE 'sync_%'"),
33 + ("edit_history", "SELECT CAST(id AS TEXT), json_object('id', id, 'source_hash', source_hash, 'result_hash', result_hash, 'operation', operation, 'params_json', params_json, 'created_at', created_at) FROM edit_history"),
33 34 ];
34 35
35 36 for (table, query) in table_queries {
@@ -7,6 +7,8 @@ use uuid::Uuid;
7 7
8 8 use tracing::instrument;
9 9
10 + use audiofiles_core::store::{validate_hash, validate_extension};
11 +
10 12 use crate::error::{Result, SyncError};
11 13
12 14 use super::{open_conn, get_sync_state, set_sync_state, SyncResult, PUSH_BATCH_LIMIT};
@@ -76,6 +78,10 @@ pub async fn upload_pending_blobs(
76 78 let mut uploaded = 0u64;
77 79
78 80 for (hash, ext, size) in &pending {
81 + if validate_hash(hash).is_err() || validate_extension(ext).is_err() {
82 + tracing::warn!(hash, ext, "Skipping upload with invalid hash/extension");
83 + continue;
84 + }
79 85 // Content-addressed flat layout: content_dir/{hash}.{ext}
80 86 let blob_path = content_dir.join(format!("{}.{}", hash, ext));
81 87