//! Background cleanup worker: removes orphaned samples off the GUI thread. //! //! Mirrors the pattern in `export.rs` — dedicated thread with its own Database + //! SampleStore, communicating via channels. The GUI thread polls events each frame. use std::path::PathBuf; use std::sync::{mpsc, Mutex}; use std::thread; use tracing::{error, instrument}; use audiofiles_core::db::Database; use audiofiles_core::store::SampleStore; /// Command sent from the GUI thread to the cleanup worker. pub enum CleanupCommand { /// Start removing orphaned samples. RemoveOrphans, /// Cancel the current cleanup. Cancel, /// Shut down the worker thread. Shutdown, } /// Event sent from the cleanup worker back to the GUI thread. pub enum CleanupEvent { /// Progress update for one sample removed. Progress { completed: usize, total: usize, current_name: String, }, /// The cleanup is complete. Complete { removed: usize, errors: usize, }, } /// Handle for communicating with the background cleanup worker. /// /// The receiver is wrapped in a `Mutex` so `BrowserState` remains `Sync` (required by nih-plug). /// Only the GUI thread actually calls `try_recv`, so contention is zero. pub struct CleanupHandle { cmd_tx: mpsc::Sender, event_rx: Mutex>, _thread: Option>, } impl CleanupHandle { /// Poll for the next event without blocking. pub fn try_recv(&self) -> Option { self.event_rx.lock().ok()?.try_recv().ok() } /// Send a command to the worker. pub fn send(&self, cmd: CleanupCommand) { let _ = self.cmd_tx.send(cmd); } } impl Drop for CleanupHandle { fn drop(&mut self) { let _ = self.cmd_tx.send(CleanupCommand::Shutdown); if let Some(handle) = self._thread.take() { let _ = handle.join(); } } } /// Spawn the background cleanup worker thread. /// /// The worker opens its own `Database` and `SampleStore` to avoid Mutex contention /// with the GUI thread. #[instrument(skip_all)] pub fn spawn_cleanup_worker( db_path: PathBuf, store_root: PathBuf, ) -> std::io::Result { let (cmd_tx, cmd_rx) = mpsc::channel::(); let (event_tx, event_rx) = mpsc::channel::(); let thread = thread::Builder::new() .name("cleanup-worker".to_string()) .spawn(move || { worker_loop(cmd_rx, event_tx, &db_path, &store_root); })?; Ok(CleanupHandle { cmd_tx, event_rx: Mutex::new(event_rx), _thread: Some(thread), }) } #[instrument(skip_all)] fn worker_loop( cmd_rx: mpsc::Receiver, event_tx: mpsc::Sender, db_path: &std::path::Path, store_root: &std::path::Path, ) { let db = match Database::open(db_path) { Ok(d) => d, Err(e) => { let _ = event_tx.send(CleanupEvent::Complete { removed: 0, errors: 1, }); error!("Cleanup worker failed to open database: {e}"); return; } }; let store = match SampleStore::new(store_root) { Ok(s) => s, Err(e) => { let _ = event_tx.send(CleanupEvent::Complete { removed: 0, errors: 1, }); error!("Cleanup worker failed to open store: {e}"); return; } }; while let Ok(cmd) = cmd_rx.recv() { match cmd { CleanupCommand::Shutdown => break, CleanupCommand::Cancel => continue, CleanupCommand::RemoveOrphans => { remove_orphans(&cmd_rx, &event_tx, &db, &store); } } } } fn remove_orphans( cmd_rx: &mpsc::Receiver, event_tx: &mpsc::Sender, db: &Database, store: &SampleStore, ) { // Query all orphaned samples in one pass let orphans = match db.conn().prepare( "SELECT s.hash, s.file_extension, s.original_name FROM samples s LEFT JOIN vfs_nodes vn ON s.hash = vn.sample_hash WHERE vn.id IS NULL AND s.deleted_at IS NULL", ) { Ok(mut stmt) => { stmt .query_map([], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, String>(1)?, row.get::<_, String>(2)?, )) }) .ok() .map(|iter| iter.flatten().collect::>()) .unwrap_or_default() } Err(e) => { error!("Cleanup worker failed to query orphans: {e}"); let _ = event_tx.send(CleanupEvent::Complete { removed: 0, errors: 1, }); return; } }; let total = orphans.len(); if total == 0 { let _ = event_tx.send(CleanupEvent::Complete { removed: 0, errors: 0, }); return; } let mut removed = 0usize; let mut errors = 0usize; for (i, (hash, ext, name)) in orphans.iter().enumerate() { // Check for cancel between deletions if let Ok(CleanupCommand::Cancel) | Ok(CleanupCommand::Shutdown) = cmd_rx.try_recv() { let _ = event_tx.send(CleanupEvent::Complete { removed, errors }); return; } let _ = event_tx.send(CleanupEvent::Progress { completed: i, total, current_name: name.clone(), }); // Delete from DB — re-check orphan status to avoid racing with a concurrent import // that may have linked this sample to a VFS node since the initial query. match db.conn().execute( "DELETE FROM samples WHERE hash = ?1 \ AND NOT EXISTS (SELECT 1 FROM vfs_nodes WHERE sample_hash = ?1)", [hash], ) { Ok(_) => { // Delete from disk if let Ok(path) = store.sample_path(hash, ext) && path.exists() { let _ = std::fs::remove_file(&path); } removed += 1; } Err(e) => { error!("Cleanup: failed to delete sample {hash}: {e}"); errors += 1; } } } // WAL checkpoint for clean state let _ = db .conn() .execute_batch("PRAGMA wal_checkpoint(TRUNCATE)"); let _ = event_tx.send(CleanupEvent::Complete { removed, errors }); } #[cfg(test)] mod tests { use super::*; #[test] fn cleanup_command_variants_constructible() { let _remove = CleanupCommand::RemoveOrphans; let _cancel = CleanupCommand::Cancel; let _shutdown = CleanupCommand::Shutdown; } #[test] fn cleanup_event_variants_constructible() { let _progress = CleanupEvent::Progress { completed: 5, total: 10, current_name: "kick.wav".to_string(), }; let _complete = CleanupEvent::Complete { removed: 10, errors: 0, }; } #[test] fn spawn_and_drop_does_not_hang() { let dir = tempfile::TempDir::new().unwrap(); let db_path = dir.path().join("audiofiles.db"); let store_root = dir.path().join("store"); std::fs::create_dir_all(&store_root).unwrap(); // Create the database so the worker can open it let _db = Database::open(&db_path).unwrap(); let handle = spawn_cleanup_worker(db_path, store_root).unwrap(); assert!(handle.try_recv().is_none()); drop(handle); // Should send Shutdown and join cleanly } #[test] fn cleanup_no_orphans_completes_immediately() { let dir = tempfile::TempDir::new().unwrap(); let db_path = dir.path().join("audiofiles.db"); let store_root = dir.path().join("store"); std::fs::create_dir_all(&store_root).unwrap(); let _db = Database::open(&db_path).unwrap(); let handle = spawn_cleanup_worker(db_path, store_root).unwrap(); handle.send(CleanupCommand::RemoveOrphans); // Give the worker a moment to process std::thread::sleep(std::time::Duration::from_millis(100)); let mut got_complete = false; while let Some(event) = handle.try_recv() { if let CleanupEvent::Complete { removed, errors } = event { assert_eq!(removed, 0); assert_eq!(errors, 0); got_complete = true; } } assert!(got_complete); } }