Skip to main content

max / audiofiles

8.6 KB · 295 lines History Blame Raw
1 //! Background cleanup worker: removes orphaned samples off the GUI thread.
2 //!
3 //! Mirrors the pattern in `export.rs` — dedicated thread with its own Database +
4 //! SampleStore, communicating via channels. The GUI thread polls events each frame.
5
6 use std::path::PathBuf;
7 use std::sync::{mpsc, Mutex};
8 use std::thread;
9
10 use tracing::{error, instrument};
11
12 use audiofiles_core::db::Database;
13 use audiofiles_core::store::SampleStore;
14
/// Command sent from the GUI thread to the cleanup worker.
///
/// The type is a plain tag with no payload, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CleanupCommand {
    /// Start removing orphaned samples.
    RemoveOrphans,
    /// Cancel the cleanup pass that is currently running; ignored when the worker is idle.
    Cancel,
    /// Shut down the worker thread.
    Shutdown,
}
24
/// Event sent from the cleanup worker back to the GUI thread.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CleanupEvent {
    /// Progress update, emitted just before a sample is processed.
    Progress {
        /// Number of samples already handled in this pass (zero for the first sample).
        completed: usize,
        /// Number of orphaned samples found when the pass started.
        total: usize,
        /// Original name of the sample about to be processed.
        current_name: String,
    },
    /// The cleanup pass has ended, either by finishing, being cancelled, or failing to start.
    Complete {
        /// Number of samples counted as removed.
        removed: usize,
        /// Number of failures encountered during the pass.
        errors: usize,
    },
}
39
40 /// Handle for communicating with the background cleanup worker.
41 ///
42 /// The receiver is wrapped in a `Mutex` so `BrowserState` remains `Sync` (required by nih-plug).
43 /// Only the GUI thread actually calls `try_recv`, so contention is zero.
44 pub struct CleanupHandle {
45 cmd_tx: mpsc::Sender<CleanupCommand>,
46 event_rx: Mutex<mpsc::Receiver<CleanupEvent>>,
47 _thread: Option<thread::JoinHandle<()>>,
48 }
49
50 impl CleanupHandle {
51 /// Poll for the next event without blocking.
52 pub fn try_recv(&self) -> Option<CleanupEvent> {
53 self.event_rx.lock().ok()?.try_recv().ok()
54 }
55
56 /// Send a command to the worker.
57 pub fn send(&self, cmd: CleanupCommand) {
58 let _ = self.cmd_tx.send(cmd);
59 }
60 }
61
62 impl Drop for CleanupHandle {
63 fn drop(&mut self) {
64 let _ = self.cmd_tx.send(CleanupCommand::Shutdown);
65 if let Some(handle) = self._thread.take() {
66 let _ = handle.join();
67 }
68 }
69 }
70
71 /// Spawn the background cleanup worker thread.
72 ///
73 /// The worker opens its own `Database` and `SampleStore` to avoid Mutex contention
74 /// with the GUI thread.
75 #[instrument(skip_all)]
76 pub fn spawn_cleanup_worker(
77 db_path: PathBuf,
78 store_root: PathBuf,
79 ) -> std::io::Result<CleanupHandle> {
80 let (cmd_tx, cmd_rx) = mpsc::channel::<CleanupCommand>();
81 let (event_tx, event_rx) = mpsc::channel::<CleanupEvent>();
82
83 let thread = thread::Builder::new()
84 .name("cleanup-worker".to_string())
85 .spawn(move || {
86 worker_loop(cmd_rx, event_tx, &db_path, &store_root);
87 })?;
88
89 Ok(CleanupHandle {
90 cmd_tx,
91 event_rx: Mutex::new(event_rx),
92 _thread: Some(thread),
93 })
94 }
95
96 #[instrument(skip_all)]
97 fn worker_loop(
98 cmd_rx: mpsc::Receiver<CleanupCommand>,
99 event_tx: mpsc::Sender<CleanupEvent>,
100 db_path: &std::path::Path,
101 store_root: &std::path::Path,
102 ) {
103 let db = match Database::open(db_path) {
104 Ok(d) => d,
105 Err(e) => {
106 let _ = event_tx.send(CleanupEvent::Complete {
107 removed: 0,
108 errors: 1,
109 });
110 error!("Cleanup worker failed to open database: {e}");
111 return;
112 }
113 };
114
115 let store = match SampleStore::new(store_root) {
116 Ok(s) => s,
117 Err(e) => {
118 let _ = event_tx.send(CleanupEvent::Complete {
119 removed: 0,
120 errors: 1,
121 });
122 error!("Cleanup worker failed to open store: {e}");
123 return;
124 }
125 };
126
127 while let Ok(cmd) = cmd_rx.recv() {
128 match cmd {
129 CleanupCommand::Shutdown => break,
130 CleanupCommand::Cancel => continue,
131 CleanupCommand::RemoveOrphans => {
132 remove_orphans(&cmd_rx, &event_tx, &db, &store);
133 }
134 }
135 }
136 }
137
138 fn remove_orphans(
139 cmd_rx: &mpsc::Receiver<CleanupCommand>,
140 event_tx: &mpsc::Sender<CleanupEvent>,
141 db: &Database,
142 store: &SampleStore,
143 ) {
144 // Query all orphaned samples in one pass
145 let orphans = match db.conn().prepare(
146 "SELECT s.hash, s.file_extension, s.original_name
147 FROM samples s
148 LEFT JOIN vfs_nodes vn ON s.hash = vn.sample_hash
149 WHERE vn.id IS NULL AND s.deleted_at IS NULL",
150 ) {
151 Ok(mut stmt) => {
152
153 stmt
154 .query_map([], |row| {
155 Ok((
156 row.get::<_, String>(0)?,
157 row.get::<_, String>(1)?,
158 row.get::<_, String>(2)?,
159 ))
160 })
161 .ok()
162 .map(|iter| iter.flatten().collect::<Vec<_>>())
163 .unwrap_or_default()
164 }
165 Err(e) => {
166 error!("Cleanup worker failed to query orphans: {e}");
167 let _ = event_tx.send(CleanupEvent::Complete {
168 removed: 0,
169 errors: 1,
170 });
171 return;
172 }
173 };
174
175 let total = orphans.len();
176 if total == 0 {
177 let _ = event_tx.send(CleanupEvent::Complete {
178 removed: 0,
179 errors: 0,
180 });
181 return;
182 }
183
184 let mut removed = 0usize;
185 let mut errors = 0usize;
186
187 for (i, (hash, ext, name)) in orphans.iter().enumerate() {
188 // Check for cancel between deletions
189 if let Ok(CleanupCommand::Cancel) | Ok(CleanupCommand::Shutdown) = cmd_rx.try_recv() {
190 let _ = event_tx.send(CleanupEvent::Complete { removed, errors });
191 return;
192 }
193
194 let _ = event_tx.send(CleanupEvent::Progress {
195 completed: i,
196 total,
197 current_name: name.clone(),
198 });
199
200 // Delete from DB — re-check orphan status to avoid racing with a concurrent import
201 // that may have linked this sample to a VFS node since the initial query.
202 match db.conn().execute(
203 "DELETE FROM samples WHERE hash = ?1 \
204 AND NOT EXISTS (SELECT 1 FROM vfs_nodes WHERE sample_hash = ?1)",
205 [hash],
206 ) {
207 Ok(_) => {
208 // Delete from disk
209 if let Ok(path) = store.sample_path(hash, ext)
210 && path.exists() {
211 let _ = std::fs::remove_file(&path);
212 }
213 removed += 1;
214 }
215 Err(e) => {
216 error!("Cleanup: failed to delete sample {hash}: {e}");
217 errors += 1;
218 }
219 }
220 }
221
222 // WAL checkpoint for clean state
223 let _ = db
224 .conn()
225 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE)");
226
227 let _ = event_tx.send(CleanupEvent::Complete { removed, errors });
228 }
229
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// Upper bound on how long a test waits for the worker to report back.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Creates a temporary directory holding a database path and an existing store directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the paths are in use.
    fn temp_library() -> (tempfile::TempDir, PathBuf, PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("audiofiles.db");
        let store_root = dir.path().join("store");
        std::fs::create_dir_all(&store_root).unwrap();
        (dir, db_path, store_root)
    }

    /// Polls `handle` until a `Complete` event arrives, returning `(removed, errors)`.
    ///
    /// Returns `None` if no `Complete` event shows up within `EVENT_TIMEOUT`. Polling against
    /// a deadline keeps the test independent of how quickly the worker thread is scheduled.
    fn wait_for_complete(handle: &CleanupHandle) -> Option<(usize, usize)> {
        let deadline = Instant::now() + EVENT_TIMEOUT;
        loop {
            match handle.try_recv() {
                Some(CleanupEvent::Complete { removed, errors }) => {
                    return Some((removed, errors));
                }
                Some(CleanupEvent::Progress { .. }) => {}
                None if Instant::now() >= deadline => return None,
                None => std::thread::sleep(Duration::from_millis(5)),
            }
        }
    }

    #[test]
    fn cleanup_command_variants_constructible() {
        let _remove = CleanupCommand::RemoveOrphans;
        let _cancel = CleanupCommand::Cancel;
        let _shutdown = CleanupCommand::Shutdown;
    }

    #[test]
    fn cleanup_event_variants_constructible() {
        let _progress = CleanupEvent::Progress {
            completed: 5,
            total: 10,
            current_name: "kick.wav".to_string(),
        };
        let _complete = CleanupEvent::Complete {
            removed: 10,
            errors: 0,
        };
    }

    #[test]
    fn spawn_and_drop_does_not_hang() {
        let (_dir, db_path, store_root) = temp_library();

        // Create the database so the worker can open it
        let _db = Database::open(&db_path).unwrap();

        let handle = spawn_cleanup_worker(db_path, store_root).unwrap();
        assert!(handle.try_recv().is_none());
        drop(handle); // Should send Shutdown and join cleanly
    }

    #[test]
    fn cleanup_no_orphans_completes_immediately() {
        let (_dir, db_path, store_root) = temp_library();

        let _db = Database::open(&db_path).unwrap();

        let handle = spawn_cleanup_worker(db_path, store_root).unwrap();
        handle.send(CleanupCommand::RemoveOrphans);

        // Wait for the worker's report instead of sleeping for a fixed interval.
        let (removed, errors) =
            wait_for_complete(&handle).expect("worker did not report completion in time");
        assert_eq!(removed, 0);
        assert_eq!(errors, 0);
    }
}
295