Skip to main content

max / audiofiles

9.3 KB · 274 lines History Blame Raw
1 //! Background analysis worker thread: processes sample batches off the GUI thread with cancel support.
2 //!
3 //! The worker runs in a dedicated thread, receiving [`WorkerCommand`]s over a channel
4 //! and emitting [`WorkerEvent`]s back. Batches are processed in parallel using rayon.
5 //! An `AtomicBool` cancel flag allows interrupting in-flight parallel work.
6
7 use std::path::PathBuf;
8 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9 use std::sync::{mpsc, Arc, Mutex};
10 use std::thread;
11
12 use rayon::prelude::*;
13
14 use super::config::AnalysisConfig;
15 use super::suggest::TagSuggestion;
16 use super::{analyze_sample, AnalysisResult};
17 use tracing::instrument;
18
19 /// Command sent from the GUI thread to the analysis worker.
20 pub enum WorkerCommand {
21 /// Analyze a batch of samples. Each tuple: (hash, file_extension, path_on_disk).
22 AnalyzeBatch {
23 /// List of `(hash, file_extension, path_on_disk)` tuples to analyze.
24 samples: Vec<(String, String, PathBuf)>,
25 /// Which analysis stages to run.
26 config: AnalysisConfig,
27 },
28 /// Cancel the current batch.
29 Cancel,
30 /// Shut down the worker thread.
31 Shutdown,
32 }
33
34 /// Event sent from the analysis worker back to the GUI thread.
35 pub enum WorkerEvent {
36 /// Progress update.
37 Progress {
38 /// Number of samples finished so far.
39 completed: usize,
40 /// Total number of samples in the batch.
41 total: usize,
42 /// Filename of the sample currently being analyzed.
43 current_name: String,
44 },
45 /// A single sample finished analysis.
46 SampleDone {
47 /// The computed analysis result.
48 result: Box<AnalysisResult>,
49 /// Auto-generated tag suggestions based on the analysis.
50 suggestions: Vec<TagSuggestion>,
51 },
52 /// A single sample failed.
53 SampleError {
54 /// Hash of the sample that failed.
55 hash: String,
56 /// Human-readable error description.
57 error: String,
58 },
59 /// The entire batch is complete (or was cancelled).
60 BatchComplete,
61 }
62
63 /// Handle for communicating with the background worker.
64 ///
65 /// The receiver is wrapped in a Mutex to satisfy `Sync` bounds required
66 /// by nih-plug's editor state (BrowserState must be Send + Sync).
67 /// Only the GUI thread actually calls try_recv, so contention is zero.
68 pub struct WorkerHandle {
69 cmd_tx: mpsc::Sender<WorkerCommand>,
70 event_rx: Mutex<mpsc::Receiver<WorkerEvent>>,
71 cancel_flag: Arc<AtomicBool>,
72 thread: Option<thread::JoinHandle<()>>,
73 }
74
75 impl WorkerHandle {
76 /// Poll for the next event without blocking. Returns `None` if no events pending.
77 pub fn try_recv(&self) -> Option<WorkerEvent> {
78 self.event_rx.lock().ok()?.try_recv().ok()
79 }
80
81 /// Send a command to the worker.
82 pub fn send(&self, cmd: WorkerCommand) {
83 if matches!(cmd, WorkerCommand::Cancel) {
84 self.cancel_flag.store(true, Ordering::Release);
85 }
86 let _ = self.cmd_tx.send(cmd);
87 }
88 }
89
90 impl Drop for WorkerHandle {
91 fn drop(&mut self) {
92 self.cancel_flag.store(true, Ordering::Release);
93 let _ = self.cmd_tx.send(WorkerCommand::Shutdown);
94 if let Some(handle) = self.thread.take() {
95 let _ = handle.join();
96 }
97 }
98 }
99
100 /// Spawn the background analysis worker thread.
101 ///
102 /// Returns a handle with sender/receiver channels. The GUI thread
103 /// should call `handle.try_recv()` each frame to poll for events.
104 #[instrument(skip_all)]
105 pub fn spawn_worker() -> std::io::Result<WorkerHandle> {
106 let (cmd_tx, cmd_rx) = mpsc::channel::<WorkerCommand>();
107 let (event_tx, event_rx) = mpsc::channel::<WorkerEvent>();
108 let cancel_flag = Arc::new(AtomicBool::new(false));
109 let cancel_clone = Arc::clone(&cancel_flag);
110
111 let thread = thread::Builder::new()
112 .name("analysis-worker".to_string())
113 .spawn(move || {
114 worker_loop(cmd_rx, event_tx, cancel_clone);
115 })?;
116
117 Ok(WorkerHandle {
118 cmd_tx,
119 event_rx: Mutex::new(event_rx),
120 cancel_flag,
121 thread: Some(thread),
122 })
123 }
124
125 fn worker_loop(
126 cmd_rx: mpsc::Receiver<WorkerCommand>,
127 event_tx: mpsc::Sender<WorkerEvent>,
128 cancel_flag: Arc<AtomicBool>,
129 ) {
130 while let Ok(cmd) = cmd_rx.recv() {
131 match cmd {
132 WorkerCommand::Shutdown => break,
133 WorkerCommand::Cancel => {
134 // Cancel flag already set by WorkerHandle::send; nothing else to do
135 continue;
136 }
137 WorkerCommand::AnalyzeBatch { samples, config } => {
138 // Reset cancel flag for this batch
139 cancel_flag.store(false, Ordering::Release);
140
141 let total = samples.len();
142 let completed = Arc::new(AtomicUsize::new(0));
143
144 // Process samples in parallel using rayon
145 samples.par_iter().for_each(|(hash, _ext, path)| {
146 // Check cancel flag before starting each sample
147 if cancel_flag.load(Ordering::Acquire) {
148 return;
149 }
150
151 let name = crate::util::get_filename(path, "unknown");
152 let done = completed.load(Ordering::Acquire);
153
154 let _ = event_tx.send(WorkerEvent::Progress {
155 completed: done,
156 total,
157 current_name: name,
158 });
159
160 // Catch panics so a single bad sample doesn't kill the worker
161 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
162 analyze_sample(hash, path, &config)
163 }));
164
165 match result {
166 Ok(Ok(result)) => {
167 let suggestions = if config.auto_suggest_tags {
168 super::suggest::suggest_tags(&result)
169 } else {
170 Vec::new()
171 };
172 let _ = event_tx.send(WorkerEvent::SampleDone {
173 result: Box::new(result),
174 suggestions,
175 });
176 }
177 Ok(Err(e)) => {
178 let _ = event_tx.send(WorkerEvent::SampleError {
179 hash: hash.clone(),
180 error: e.to_string(),
181 });
182 }
183 Err(_panic) => {
184 let _ = event_tx.send(WorkerEvent::SampleError {
185 hash: hash.clone(),
186 error: "analysis panicked (internal error)".to_string(),
187 });
188 }
189 }
190
191 completed.fetch_add(1, Ordering::Release);
192 });
193
194 let _ = event_tx.send(WorkerEvent::BatchComplete);
195 }
196 }
197 }
198 }
199
200 #[cfg(test)]
201 mod tests {
202 use super::*;
203
204 #[test]
205 fn worker_command_cancel_construction() {
206 // Verify WorkerCommand variants can be constructed and matched
207 let cmd = WorkerCommand::Cancel;
208 assert!(matches!(cmd, WorkerCommand::Cancel));
209 }
210
211 #[test]
212 fn worker_command_shutdown_construction() {
213 let cmd = WorkerCommand::Shutdown;
214 assert!(matches!(cmd, WorkerCommand::Shutdown));
215 }
216
217 #[test]
218 fn worker_command_analyze_batch_construction() {
219 let config = AnalysisConfig::default();
220 let samples = vec![
221 ("hash1".to_string(), "wav".to_string(), PathBuf::from("/a.wav")),
222 ("hash2".to_string(), "mp3".to_string(), PathBuf::from("/b.mp3")),
223 ];
224 let cmd = WorkerCommand::AnalyzeBatch {
225 samples: samples.clone(),
226 config,
227 };
228 match cmd {
229 WorkerCommand::AnalyzeBatch { samples: s, config: c } => {
230 assert_eq!(s.len(), 2);
231 assert_eq!(s[0].0, "hash1");
232 assert_eq!(s[1].2, PathBuf::from("/b.mp3"));
233 assert!(c.loudness);
234 }
235 _ => panic!("expected AnalyzeBatch"),
236 }
237 }
238
239 #[test]
240 fn spawn_worker_and_shutdown_via_drop() {
241 // spawn_worker creates a handle; dropping it sends Shutdown and joins
242 let handle = spawn_worker().unwrap();
243 // Verify try_recv returns None when idle
244 assert!(handle.try_recv().is_none());
245 // Drop triggers Shutdown + join — should not hang or panic
246 drop(handle);
247 }
248
249 #[test]
250 fn worker_event_variants_constructible() {
251 let _progress = WorkerEvent::Progress {
252 completed: 5,
253 total: 10,
254 current_name: "kick.wav".to_string(),
255 };
256 let _sample_err = WorkerEvent::SampleError {
257 hash: "abc".to_string(),
258 error: "decode failed".to_string(),
259 };
260 let _batch_complete = WorkerEvent::BatchComplete;
261 // Just verify these can be constructed without panic
262 }
263
264 #[test]
265 fn cancel_flag_set_on_cancel() {
266 let handle = spawn_worker().unwrap();
267 handle.send(WorkerCommand::Cancel);
268 // Give it a moment to process
269 std::thread::sleep(std::time::Duration::from_millis(10));
270 assert!(handle.cancel_flag.load(Ordering::Acquire));
271 drop(handle);
272 }
273 }
274