Skip to main content

max / audiofiles

23.0 KB · 705 lines History Blame Raw
1 //! Background folder import worker: walks directories and imports audio files off the GUI thread.
2 //!
3 //! Mirrors the pattern in `audiofiles_core::analysis::worker` — the worker runs in a
4 //! dedicated thread with its own DB connection, communicating via channels. Between files
5 //! it checks for cancellation, keeping the UI responsive during large imports.
6
7 use std::fs;
8 use std::path::{Path, PathBuf};
9 use std::sync::{mpsc, Mutex};
10 use std::thread;
11 use std::time::{Duration, Instant};
12
13 use tracing::{error, instrument, warn};
14
15 use audiofiles_core::db::Database;
16 use audiofiles_core::error::CoreError;
17 use audiofiles_core::store::SampleStore;
18 use audiofiles_core::vfs::{self, NodeType};
19 use audiofiles_core::{NodeId, VfsId};
20
21 /// Check whether a path has an audio file extension.
22 fn is_audio_file(path: &Path) -> bool {
23 audiofiles_core::util::is_audio_file(path)
24 }
25
26 /// Check whether a directory should be skipped during traversal.
27 fn is_skipped_dir(path: &Path) -> bool {
28 audiofiles_core::util::is_macos_metadata_dir(path)
29 }
30
31 /// How imported files should be organized in the VFS.
32 pub enum ImportStrategy {
33 /// All links in one directory, no subdirs created.
34 Flat { vfs_id: VfsId, parent_id: Option<NodeId> },
35 /// Create a new VFS, preserve source directory structure.
36 NewVfs { vfs_name: String },
37 /// Merge with structure into an existing VFS.
38 MergeIntoVfs { vfs_id: VfsId, parent_id: Option<NodeId> },
39 }
40
/// A top-level source folder together with every sample imported from beneath it.
///
/// Only structured imports produce these; flat imports report an empty folder list.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ImportedFolder {
    /// File name of the top-level source directory.
    pub name: String,
    /// `(hash, ext)` pairs for each sample imported anywhere under this folder.
    pub samples: Vec<(String, String)>,
}
47
/// Non-error outcome of `import_single_file` for one file.
enum ImportFileResult {
    /// A new VFS link was created; carries the stored sample's `(hash, ext)`.
    Imported(String, String),
    /// The link already existed (`NameConflict`, or a SQLite `UNIQUE` violation).
    Duplicate,
}
53
54 /// Command sent from the GUI thread to the import worker.
55 pub enum ImportCommand {
56 /// Import all audio files from `source` using the given strategy.
57 ImportDirectory {
58 source: PathBuf,
59 strategy: ImportStrategy,
60 },
61 /// Cancel the current import.
62 Cancel,
63 /// Shut down the worker thread.
64 Shutdown,
65 }
66
67 /// Event sent from the import worker back to the GUI thread.
68 pub enum ImportEvent {
69 /// Throttled progress emitted during the pre-walk so the UI can show a
70 /// running file count instead of an indeterminate spinner (m-12). Fires
71 /// at most every ~100ms.
72 WalkProgress { count: usize, total_bytes: u64 },
73 /// Pre-walk finished — we now know the total file count and size.
74 WalkComplete { total: usize, total_bytes: u64 },
75 /// One file was processed.
76 Progress {
77 completed: usize,
78 total: usize,
79 current_name: String,
80 },
81 /// A single file failed to import.
82 FileError { path: String, error: String },
83 /// The entire import is done.
84 Complete {
85 /// `(hash, file_extension)` pairs for analysis flow.
86 imported: Vec<(String, String)>,
87 total_files: usize,
88 errors: usize,
89 duplicates: usize,
90 /// Top-level folders with their samples (empty for flat imports).
91 folders: Vec<ImportedFolder>,
92 },
93 }
94
95 /// Handle for communicating with the background import worker.
96 ///
97 /// The receiver is wrapped in a `Mutex` so `BrowserState` remains `Sync` (required by nih-plug).
98 /// Only the GUI thread actually calls `try_recv`, so contention is zero.
99 pub struct ImportHandle {
100 cmd_tx: mpsc::Sender<ImportCommand>,
101 event_rx: Mutex<mpsc::Receiver<ImportEvent>>,
102 _thread: Option<thread::JoinHandle<()>>,
103 }
104
105 impl ImportHandle {
106 /// Poll for the next event without blocking.
107 pub fn try_recv(&self) -> Option<ImportEvent> {
108 self.event_rx.lock().ok()?.try_recv().ok()
109 }
110
111 /// Send a command to the worker.
112 pub fn send(&self, cmd: ImportCommand) {
113 let _ = self.cmd_tx.send(cmd);
114 }
115 }
116
117 impl Drop for ImportHandle {
118 fn drop(&mut self) {
119 let _ = self.cmd_tx.send(ImportCommand::Shutdown);
120 if let Some(handle) = self._thread.take() {
121 let _ = handle.join();
122 }
123 }
124 }
125
126 /// Spawn the background import worker thread.
127 ///
128 /// The worker opens its own `Database` and `SampleStore` to avoid Mutex contention
129 /// with the GUI thread. Returns a handle for sending commands and polling events.
130 #[instrument(skip_all)]
131 pub fn spawn_import_worker(db_path: PathBuf, store_root: PathBuf) -> std::io::Result<ImportHandle> {
132 let (cmd_tx, cmd_rx) = mpsc::channel::<ImportCommand>();
133 let (event_tx, event_rx) = mpsc::channel::<ImportEvent>();
134
135 let thread = thread::Builder::new()
136 .name("import-worker".to_string())
137 .spawn(move || {
138 worker_loop(cmd_rx, event_tx, &db_path, &store_root);
139 })?;
140
141 Ok(ImportHandle {
142 cmd_tx,
143 event_rx: Mutex::new(event_rx),
144 _thread: Some(thread),
145 })
146 }
147
148 /// Recursively count audio files and sum their sizes under `dir`.
149 /// Checks for cancellation between entries. Returns `None` if cancelled.
150 #[instrument(skip_all)]
151 fn count_audio_files(
152 dir: &Path,
153 cmd_rx: &mpsc::Receiver<ImportCommand>,
154 event_tx: &mpsc::Sender<ImportEvent>,
155 ) -> Option<(usize, u64)> {
156 let mut count = 0;
157 let mut total_bytes = 0u64;
158 let mut stack = vec![dir.to_path_buf()];
159 let mut last_emit = Instant::now();
160 let emit_interval = Duration::from_millis(100);
161
162 while let Some(current) = stack.pop() {
163 // Check for cancel
164 if let Ok(ImportCommand::Cancel) | Ok(ImportCommand::Shutdown) = cmd_rx.try_recv() {
165 return None;
166 }
167
168 let entries = match fs::read_dir(&current) {
169 Ok(e) => e,
170 Err(e) => {
171 tracing::warn!(dir = %current.display(), "Failed to read directory during pre-walk: {e}");
172 continue;
173 }
174 };
175
176 for entry in entries.flatten() {
177 let path = entry.path();
178 if path.is_dir() {
179 if !is_skipped_dir(&path) {
180 stack.push(path);
181 }
182 } else if is_audio_file(&path) {
183 count += 1;
184 if let Ok(meta) = fs::metadata(&path) {
185 total_bytes += meta.len();
186 }
187 if last_emit.elapsed() >= emit_interval {
188 let _ = event_tx.send(ImportEvent::WalkProgress { count, total_bytes });
189 last_emit = Instant::now();
190 }
191 }
192 }
193 }
194
195 Some((count, total_bytes))
196 }
197
198 /// Import a single file into store + VFS, returning the result.
199 fn import_single_file(
200 path: &Path,
201 vfs_id: VfsId,
202 parent_id: Option<NodeId>,
203 store: &SampleStore,
204 db: &Database,
205 loose_files: bool,
206 ) -> Result<ImportFileResult, CoreError> {
207 let hash = if loose_files {
208 store.import_loose_files(path, db)?
209 } else {
210 store.import(path, db)?
211 };
212 let name = audiofiles_core::util::get_filename(path, "unknown");
213 let ext = audiofiles_core::util::get_extension(path);
214
215 match vfs::create_sample_link(db, vfs_id, parent_id, &name, &hash) {
216 Ok(_) => Ok(ImportFileResult::Imported(hash, ext)),
217 Err(CoreError::NameConflict(_)) => Ok(ImportFileResult::Duplicate),
218 Err(e) => {
219 if let CoreError::Db(ref sqlite_err) = e
220 && sqlite_err.to_string().contains("UNIQUE") {
221 return Ok(ImportFileResult::Duplicate);
222 }
223 Err(e)
224 }
225 }
226 }
227
228 /// Shared mutable state and dependencies for the import functions.
229 ///
230 /// Bundles the store, DB, channels, and progress counters so that
231 /// `import_directory_recursive`, `import_directory_flat`, and `import_structured`
232 /// don't each need 12 parameters.
233 struct ImportContext<'a> {
234 store: &'a SampleStore,
235 db: &'a Database,
236 event_tx: &'a mpsc::Sender<ImportEvent>,
237 cmd_rx: &'a mpsc::Receiver<ImportCommand>,
238 completed: &'a mut usize,
239 total: usize,
240 errors: &'a mut usize,
241 duplicates: &'a mut usize,
242 imported: &'a mut Vec<(String, String)>,
243 loose_files: bool,
244 }
245
246 impl ImportContext<'_> {
247 /// Check for a cancellation command without blocking.
248 fn is_cancelled(&self) -> bool {
249 matches!(
250 self.cmd_rx.try_recv(),
251 Ok(ImportCommand::Cancel) | Ok(ImportCommand::Shutdown)
252 )
253 }
254
255 /// Send a progress event for the current file.
256 fn send_progress(&self, name: String) {
257 let _ = self.event_tx.send(ImportEvent::Progress {
258 completed: *self.completed,
259 total: self.total,
260 current_name: name,
261 });
262 }
263
264 /// Import a single audio file, updating counters and sending error events.
265 fn process_file(&mut self, path: &Path, vfs_id: VfsId, parent_id: Option<NodeId>) {
266 let name = audiofiles_core::util::get_filename(path, "unknown");
267 self.send_progress(name);
268
269 match import_single_file(path, vfs_id, parent_id, self.store, self.db, self.loose_files) {
270 Ok(ImportFileResult::Imported(hash, ext)) => {
271 self.imported.push((hash, ext));
272 *self.completed += 1;
273 }
274 Ok(ImportFileResult::Duplicate) => {
275 *self.duplicates += 1;
276 *self.completed += 1;
277 }
278 Err(e) => {
279 *self.errors += 1;
280 let _ = self.event_tx.send(ImportEvent::FileError {
281 path: path.display().to_string(),
282 error: e.to_string(),
283 });
284 }
285 }
286 }
287 }
288
289 /// Recursively import a directory with structure, sending progress events.
290 /// Returns `true` if cancelled.
291 fn import_directory_recursive(
292 dir: &Path,
293 vfs_id: VfsId,
294 parent_id: Option<NodeId>,
295 ctx: &mut ImportContext<'_>,
296 ) -> bool {
297 let entries = match fs::read_dir(dir) {
298 Ok(e) => e,
299 Err(_) => {
300 *ctx.errors += 1;
301 return false; // not cancelled
302 }
303 };
304
305 let mut paths: Vec<PathBuf> = entries.flatten().map(|e| e.path()).collect();
306 paths.sort();
307
308 for path in paths {
309 if ctx.is_cancelled() {
310 return true;
311 }
312
313 if path.is_dir() {
314 if is_skipped_dir(&path) {
315 continue;
316 }
317 let dir_name = audiofiles_core::util::get_filename(&path, "folder");
318
319 let dir_node_id =
320 match vfs::create_directory(ctx.db, vfs_id, parent_id, &dir_name) {
321 Ok(id) => Some(id),
322 Err(CoreError::NameConflict(_)) => {
323 vfs::list_children(ctx.db, vfs_id, parent_id)
324 .unwrap_or_default()
325 .iter()
326 .find(|n| {
327 n.name == dir_name && n.node_type == NodeType::Directory
328 })
329 .map(|n| n.id)
330 }
331 Err(_) => {
332 *ctx.errors += 1;
333 continue;
334 }
335 };
336
337 let cancelled = import_directory_recursive(
338 &path,
339 vfs_id,
340 dir_node_id.or(parent_id),
341 ctx,
342 );
343 if cancelled {
344 return true;
345 }
346 } else if path.is_file() && is_audio_file(&path) {
347 ctx.process_file(&path, vfs_id, parent_id);
348 }
349 }
350
351 false // not cancelled
352 }
353
354 /// Flat import: walk source tree recursively but put all VFS links into a single directory.
355 /// Returns `true` if cancelled.
356 fn import_directory_flat(
357 dir: &Path,
358 vfs_id: VfsId,
359 parent_id: Option<NodeId>,
360 ctx: &mut ImportContext<'_>,
361 ) -> bool {
362 let entries = match fs::read_dir(dir) {
363 Ok(e) => e,
364 Err(_) => {
365 *ctx.errors += 1;
366 return false;
367 }
368 };
369
370 let mut paths: Vec<PathBuf> = entries.flatten().map(|e| e.path()).collect();
371 paths.sort();
372
373 for path in paths {
374 if ctx.is_cancelled() {
375 return true;
376 }
377
378 if path.is_dir() {
379 if is_skipped_dir(&path) {
380 continue;
381 }
382 // Recurse into subdirs but still put all links at the same flat level
383 let cancelled = import_directory_flat(&path, vfs_id, parent_id, ctx);
384 if cancelled {
385 return true;
386 }
387 } else if path.is_file() && is_audio_file(&path) {
388 ctx.process_file(&path, vfs_id, parent_id);
389 }
390 }
391
392 false
393 }
394
395 /// Structured import: iterate source dir's immediate children, import each top-level
396 /// subdirectory via `import_directory_recursive`, tracking `ImportedFolder` per top-level dir.
397 /// Files directly in the source root are imported without a folder grouping.
398 /// Returns `(cancelled, folders)`.
399 fn import_structured(
400 source: &Path,
401 vfs_id: VfsId,
402 parent_id: Option<NodeId>,
403 ctx: &mut ImportContext<'_>,
404 ) -> (bool, Vec<ImportedFolder>) {
405 let entries = match fs::read_dir(source) {
406 Ok(e) => e,
407 Err(_) => {
408 *ctx.errors += 1;
409 return (false, Vec::new());
410 }
411 };
412
413 let mut paths: Vec<PathBuf> = entries.flatten().map(|e| e.path()).collect();
414 paths.sort();
415
416 let mut folders = Vec::new();
417
418 for path in paths {
419 if ctx.is_cancelled() {
420 return (true, folders);
421 }
422
423 if path.is_dir() {
424 if is_skipped_dir(&path) {
425 continue;
426 }
427 let dir_name = audiofiles_core::util::get_filename(&path, "folder");
428
429 let dir_node_id =
430 match vfs::create_directory(ctx.db, vfs_id, parent_id, &dir_name) {
431 Ok(id) => Some(id),
432 Err(CoreError::NameConflict(_)) => {
433 vfs::list_children(ctx.db, vfs_id, parent_id)
434 .unwrap_or_default()
435 .iter()
436 .find(|n| {
437 n.name == dir_name && n.node_type == NodeType::Directory
438 })
439 .map(|n| n.id)
440 }
441 Err(_) => {
442 *ctx.errors += 1;
443 continue;
444 }
445 };
446
447 // Track where this folder's samples start in the imported list
448 let folder_start = ctx.imported.len();
449
450 let cancelled = import_directory_recursive(
451 &path,
452 vfs_id,
453 dir_node_id.or(parent_id),
454 ctx,
455 );
456
457 // Collect samples imported within this top-level folder
458 let folder_samples: Vec<(String, String)> =
459 ctx.imported[folder_start..].to_vec();
460
461 if !folder_samples.is_empty() {
462 folders.push(ImportedFolder {
463 name: dir_name,
464 samples: folder_samples,
465 });
466 }
467
468 if cancelled {
469 return (true, folders);
470 }
471 } else if path.is_file() && is_audio_file(&path) {
472 // Root-level files — import directly, no folder grouping
473 ctx.process_file(&path, vfs_id, parent_id);
474 }
475 }
476
477 (false, folders)
478 }
479
480 fn worker_loop(
481 cmd_rx: mpsc::Receiver<ImportCommand>,
482 event_tx: mpsc::Sender<ImportEvent>,
483 db_path: &Path,
484 store_root: &Path,
485 ) {
486 // Open our own DB connection and store
487 let db = match Database::open(db_path) {
488 Ok(db) => db,
489 Err(e) => {
490 let _ = event_tx.send(ImportEvent::Complete {
491 imported: Vec::new(),
492 total_files: 0,
493 errors: 1,
494 duplicates: 0,
495 folders: Vec::new(),
496 });
497 error!("Import worker failed to open DB: {e}");
498 return;
499 }
500 };
501
502 let store = match SampleStore::new(store_root) {
503 Ok(s) => s,
504 Err(e) => {
505 let _ = event_tx.send(ImportEvent::Complete {
506 imported: Vec::new(),
507 total_files: 0,
508 errors: 1,
509 duplicates: 0,
510 folders: Vec::new(),
511 });
512 error!("Import worker failed to open store: {e}");
513 return;
514 }
515 };
516
517 while let Ok(cmd) = cmd_rx.recv() {
518 match cmd {
519 ImportCommand::Shutdown => break,
520 ImportCommand::Cancel => continue,
521 ImportCommand::ImportDirectory { source, strategy } => {
522 // Resolve strategy to concrete (vfs_id, parent_id, flat)
523 let (vfs_id, parent_id, flat) = match strategy {
524 ImportStrategy::Flat { vfs_id, parent_id } => (vfs_id, parent_id, true),
525 ImportStrategy::NewVfs { vfs_name } => {
526 match vfs::create_vfs(&db, &vfs_name) {
527 Ok(id) => (id, None, false),
528 Err(e) => {
529 let _ = event_tx.send(ImportEvent::Complete {
530 imported: Vec::new(),
531 total_files: 0,
532 errors: 1,
533 duplicates: 0,
534 folders: Vec::new(),
535 });
536 error!("Failed to create VFS '{vfs_name}': {e}");
537 continue;
538 }
539 }
540 }
541 ImportStrategy::MergeIntoVfs { vfs_id, parent_id } => {
542 (vfs_id, parent_id, false)
543 }
544 };
545
546 // Phase 1: pre-walk to count audio files and sum sizes
547 let (total, total_bytes) = match count_audio_files(&source, &cmd_rx, &event_tx) {
548 Some(result) => result,
549 None => {
550 let _ = event_tx.send(ImportEvent::Complete {
551 imported: Vec::new(),
552 total_files: 0,
553 errors: 0,
554 duplicates: 0,
555 folders: Vec::new(),
556 });
557 continue;
558 }
559 };
560
561 let _ = event_tx.send(ImportEvent::WalkComplete { total, total_bytes });
562
563 // Check if loose-files mode is enabled for this vault
564 let loose_files = db
565 .conn()
566 .query_row(
567 "SELECT value FROM user_config WHERE key = 'loose_files'",
568 [],
569 |row| row.get::<_, String>(0),
570 )
571 .ok()
572 .is_some_and(|v| v == "1");
573
574 // Phase 2: import files with progress
575 let mut completed = 0usize;
576 let mut errors = 0usize;
577 let mut duplicates = 0usize;
578 let mut imported = Vec::new();
579
580 let mut ctx = ImportContext {
581 store: &store,
582 db: &db,
583 event_tx: &event_tx,
584 cmd_rx: &cmd_rx,
585 completed: &mut completed,
586 total,
587 errors: &mut errors,
588 duplicates: &mut duplicates,
589 imported: &mut imported,
590 loose_files,
591 };
592
593 let (cancelled, folders) = if flat {
594 let c = import_directory_flat(
595 &source, vfs_id, parent_id, &mut ctx,
596 );
597 (c, Vec::new())
598 } else {
599 import_structured(
600 &source, vfs_id, parent_id, &mut ctx,
601 )
602 };
603
604 let total_files = if cancelled { completed } else { total };
605
606 // Checkpoint WAL after large import to keep -shm file fresh
607 // and avoid stale memory-mapped state on macOS.
608 if let Err(e) = db.wal_checkpoint() {
609 warn!("WAL checkpoint after import failed: {e}");
610 }
611
612 let _ = event_tx.send(ImportEvent::Complete {
613 imported,
614 total_files,
615 errors,
616 duplicates,
617 folders,
618 });
619 }
620 }
621 }
622 }
623
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn is_audio_file_recognises_extensions() {
        // Matching is case-insensitive (`.FLAC`, `.AIF`); unknown or missing extensions fail.
        let audio = ["kick.wav", "pad.FLAC", "song.mp3", "loop.ogg", "strings.aiff", "brass.AIF"];
        let not_audio = ["readme.txt", "photo.png", "noext"];

        for name in audio {
            assert!(is_audio_file(Path::new(name)));
        }
        for name in not_audio {
            assert!(!is_audio_file(Path::new(name)));
        }
    }

    #[test]
    fn import_command_variants_constructible() {
        let merge = ImportStrategy::MergeIntoVfs {
            vfs_id: VfsId::from(1),
            parent_id: None,
        };
        let _commands = [
            ImportCommand::ImportDirectory {
                source: PathBuf::from("/tmp/samples"),
                strategy: merge,
            },
            ImportCommand::Cancel,
            ImportCommand::Shutdown,
        ];
    }

    #[test]
    fn import_event_variants_constructible() {
        let _events = [
            ImportEvent::WalkComplete { total: 42, total_bytes: 1024 },
            ImportEvent::WalkProgress { count: 17, total_bytes: 512 },
            ImportEvent::Progress {
                completed: 5,
                total: 42,
                current_name: "kick.wav".to_string(),
            },
            ImportEvent::FileError {
                path: "/tmp/bad.wav".to_string(),
                error: "decode failed".to_string(),
            },
            ImportEvent::Complete {
                imported: vec![("abc".to_string(), "wav".to_string())],
                total_files: 1,
                errors: 0,
                duplicates: 0,
                folders: vec![],
            },
        ];
    }

    #[test]
    fn import_strategy_variants_constructible() {
        let _strategies = [
            ImportStrategy::Flat {
                vfs_id: VfsId::from(1),
                parent_id: None,
            },
            ImportStrategy::NewVfs {
                vfs_name: "Test".to_string(),
            },
            ImportStrategy::MergeIntoVfs {
                vfs_id: VfsId::from(1),
                parent_id: Some(NodeId::from(5)),
            },
        ];
    }

    #[test]
    fn spawn_and_drop_does_not_hang() {
        let tmp = tempfile::TempDir::new().unwrap();
        let db_path = tmp.path().join("test.db");
        let store_root = tmp.path().join("store");

        // Create the database up front so the worker can open it.
        let _db = Database::open(&db_path).unwrap();

        let handle = spawn_import_worker(db_path, store_root).unwrap();
        assert!(handle.try_recv().is_none());
        // Dropping sends `Shutdown` and joins; this test hangs if that regresses.
        drop(handle);
    }
}
705