//! Background analysis worker thread: processes sample batches off the GUI thread with cancel support. //! //! The worker runs in a dedicated thread, receiving [`WorkerCommand`]s over a channel //! and emitting [`WorkerEvent`]s back. Batches are processed in parallel using rayon. //! An `AtomicBool` cancel flag allows interrupting in-flight parallel work. use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use rayon::prelude::*; use super::config::AnalysisConfig; use super::suggest::TagSuggestion; use super::{analyze_sample, AnalysisResult}; use tracing::instrument; /// Command sent from the GUI thread to the analysis worker. pub enum WorkerCommand { /// Analyze a batch of samples. Each tuple: (hash, file_extension, path_on_disk). AnalyzeBatch { /// List of `(hash, file_extension, path_on_disk)` tuples to analyze. samples: Vec<(String, String, PathBuf)>, /// Which analysis stages to run. config: AnalysisConfig, }, /// Cancel the current batch. Cancel, /// Shut down the worker thread. Shutdown, } /// Event sent from the analysis worker back to the GUI thread. pub enum WorkerEvent { /// Progress update. Progress { /// Number of samples finished so far. completed: usize, /// Total number of samples in the batch. total: usize, /// Filename of the sample currently being analyzed. current_name: String, }, /// A single sample finished analysis. SampleDone { /// The computed analysis result. result: Box, /// Auto-generated tag suggestions based on the analysis. suggestions: Vec, }, /// A single sample failed. SampleError { /// Hash of the sample that failed. hash: String, /// Human-readable error description. error: String, }, /// The entire batch is complete (or was cancelled). BatchComplete, } /// Handle for communicating with the background worker. /// /// The receiver is wrapped in a Mutex to satisfy `Sync` bounds required /// by nih-plug's editor state (BrowserState must be Send + Sync). /// Only the GUI thread actually calls try_recv, so contention is zero. pub struct WorkerHandle { cmd_tx: mpsc::Sender, event_rx: Mutex>, cancel_flag: Arc, thread: Option>, } impl WorkerHandle { /// Poll for the next event without blocking. Returns `None` if no events pending. pub fn try_recv(&self) -> Option { self.event_rx.lock().ok()?.try_recv().ok() } /// Send a command to the worker. pub fn send(&self, cmd: WorkerCommand) { if matches!(cmd, WorkerCommand::Cancel) { self.cancel_flag.store(true, Ordering::Release); } let _ = self.cmd_tx.send(cmd); } } impl Drop for WorkerHandle { fn drop(&mut self) { self.cancel_flag.store(true, Ordering::Release); let _ = self.cmd_tx.send(WorkerCommand::Shutdown); if let Some(handle) = self.thread.take() { let _ = handle.join(); } } } /// Spawn the background analysis worker thread. /// /// Returns a handle with sender/receiver channels. The GUI thread /// should call `handle.try_recv()` each frame to poll for events. #[instrument(skip_all)] pub fn spawn_worker() -> std::io::Result { let (cmd_tx, cmd_rx) = mpsc::channel::(); let (event_tx, event_rx) = mpsc::channel::(); let cancel_flag = Arc::new(AtomicBool::new(false)); let cancel_clone = Arc::clone(&cancel_flag); let thread = thread::Builder::new() .name("analysis-worker".to_string()) .spawn(move || { worker_loop(cmd_rx, event_tx, cancel_clone); })?; Ok(WorkerHandle { cmd_tx, event_rx: Mutex::new(event_rx), cancel_flag, thread: Some(thread), }) } fn worker_loop( cmd_rx: mpsc::Receiver, event_tx: mpsc::Sender, cancel_flag: Arc, ) { while let Ok(cmd) = cmd_rx.recv() { match cmd { WorkerCommand::Shutdown => break, WorkerCommand::Cancel => { // Cancel flag already set by WorkerHandle::send; nothing else to do continue; } WorkerCommand::AnalyzeBatch { samples, config } => { // Reset cancel flag for this batch cancel_flag.store(false, Ordering::Release); let total = samples.len(); let completed = Arc::new(AtomicUsize::new(0)); // Process samples in parallel using rayon samples.par_iter().for_each(|(hash, _ext, path)| { // Check cancel flag before starting each sample if cancel_flag.load(Ordering::Acquire) { return; } let name = crate::util::get_filename(path, "unknown"); let done = completed.load(Ordering::Acquire); let _ = event_tx.send(WorkerEvent::Progress { completed: done, total, current_name: name, }); // Catch panics so a single bad sample doesn't kill the worker let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { analyze_sample(hash, path, &config) })); match result { Ok(Ok(result)) => { let suggestions = if config.auto_suggest_tags { super::suggest::suggest_tags(&result) } else { Vec::new() }; let _ = event_tx.send(WorkerEvent::SampleDone { result: Box::new(result), suggestions, }); } Ok(Err(e)) => { let _ = event_tx.send(WorkerEvent::SampleError { hash: hash.clone(), error: e.to_string(), }); } Err(_panic) => { let _ = event_tx.send(WorkerEvent::SampleError { hash: hash.clone(), error: "analysis panicked (internal error)".to_string(), }); } } completed.fetch_add(1, Ordering::Release); }); let _ = event_tx.send(WorkerEvent::BatchComplete); } } } } #[cfg(test)] mod tests { use super::*; #[test] fn worker_command_cancel_construction() { // Verify WorkerCommand variants can be constructed and matched let cmd = WorkerCommand::Cancel; assert!(matches!(cmd, WorkerCommand::Cancel)); } #[test] fn worker_command_shutdown_construction() { let cmd = WorkerCommand::Shutdown; assert!(matches!(cmd, WorkerCommand::Shutdown)); } #[test] fn worker_command_analyze_batch_construction() { let config = AnalysisConfig::default(); let samples = vec![ ("hash1".to_string(), "wav".to_string(), PathBuf::from("/a.wav")), ("hash2".to_string(), "mp3".to_string(), PathBuf::from("/b.mp3")), ]; let cmd = WorkerCommand::AnalyzeBatch { samples: samples.clone(), config, }; match cmd { WorkerCommand::AnalyzeBatch { samples: s, config: c } => { assert_eq!(s.len(), 2); assert_eq!(s[0].0, "hash1"); assert_eq!(s[1].2, PathBuf::from("/b.mp3")); assert!(c.loudness); } _ => panic!("expected AnalyzeBatch"), } } #[test] fn spawn_worker_and_shutdown_via_drop() { // spawn_worker creates a handle; dropping it sends Shutdown and joins let handle = spawn_worker().unwrap(); // Verify try_recv returns None when idle assert!(handle.try_recv().is_none()); // Drop triggers Shutdown + join — should not hang or panic drop(handle); } #[test] fn worker_event_variants_constructible() { let _progress = WorkerEvent::Progress { completed: 5, total: 10, current_name: "kick.wav".to_string(), }; let _sample_err = WorkerEvent::SampleError { hash: "abc".to_string(), error: "decode failed".to_string(), }; let _batch_complete = WorkerEvent::BatchComplete; // Just verify these can be constructed without panic } #[test] fn cancel_flag_set_on_cancel() { let handle = spawn_worker().unwrap(); handle.send(WorkerCommand::Cancel); // Give it a moment to process std::thread::sleep(std::time::Duration::from_millis(10)); assert!(handle.cancel_flag.load(Ordering::Acquire)); drop(handle); } }