//! Background folder import worker: walks directories and imports audio files off the GUI thread. //! //! Mirrors the pattern in `audiofiles_core::analysis::worker` — the worker runs in a //! dedicated thread with its own DB connection, communicating via channels. Between files //! it checks for cancellation, keeping the UI responsive during large imports. use std::fs; use std::path::{Path, PathBuf}; use std::sync::{mpsc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use tracing::{error, instrument, warn}; use audiofiles_core::db::Database; use audiofiles_core::error::CoreError; use audiofiles_core::store::SampleStore; use audiofiles_core::vfs::{self, NodeType}; use audiofiles_core::{NodeId, VfsId}; /// Check whether a path has an audio file extension. fn is_audio_file(path: &Path) -> bool { audiofiles_core::util::is_audio_file(path) } /// Check whether a directory should be skipped during traversal. fn is_skipped_dir(path: &Path) -> bool { audiofiles_core::util::is_macos_metadata_dir(path) } /// How imported files should be organized in the VFS. pub enum ImportStrategy { /// All links in one directory, no subdirs created. Flat { vfs_id: VfsId, parent_id: Option }, /// Create a new VFS, preserve source directory structure. NewVfs { vfs_name: String }, /// Merge with structure into an existing VFS. MergeIntoVfs { vfs_id: VfsId, parent_id: Option }, } /// A top-level source folder and its imported samples. #[derive(Clone)] pub struct ImportedFolder { pub name: String, pub samples: Vec<(String, String)>, // (hash, ext) } /// Result of importing a single file into the VFS. enum ImportFileResult { Imported(String, String), // (hash, ext) Duplicate, // NameConflict — link already existed } /// Command sent from the GUI thread to the import worker. pub enum ImportCommand { /// Import all audio files from `source` using the given strategy. ImportDirectory { source: PathBuf, strategy: ImportStrategy, }, /// Cancel the current import. Cancel, /// Shut down the worker thread. Shutdown, } /// Event sent from the import worker back to the GUI thread. pub enum ImportEvent { /// Throttled progress emitted during the pre-walk so the UI can show a /// running file count instead of an indeterminate spinner (m-12). Fires /// at most every ~100ms. WalkProgress { count: usize, total_bytes: u64 }, /// Pre-walk finished — we now know the total file count and size. WalkComplete { total: usize, total_bytes: u64 }, /// One file was processed. Progress { completed: usize, total: usize, current_name: String, }, /// A single file failed to import. FileError { path: String, error: String }, /// The entire import is done. Complete { /// `(hash, file_extension)` pairs for analysis flow. imported: Vec<(String, String)>, total_files: usize, errors: usize, duplicates: usize, /// Top-level folders with their samples (empty for flat imports). folders: Vec, }, } /// Handle for communicating with the background import worker. /// /// The receiver is wrapped in a `Mutex` so `BrowserState` remains `Sync` (required by nih-plug). /// Only the GUI thread actually calls `try_recv`, so contention is zero. pub struct ImportHandle { cmd_tx: mpsc::Sender, event_rx: Mutex>, _thread: Option>, } impl ImportHandle { /// Poll for the next event without blocking. pub fn try_recv(&self) -> Option { self.event_rx.lock().ok()?.try_recv().ok() } /// Send a command to the worker. pub fn send(&self, cmd: ImportCommand) { let _ = self.cmd_tx.send(cmd); } } impl Drop for ImportHandle { fn drop(&mut self) { let _ = self.cmd_tx.send(ImportCommand::Shutdown); if let Some(handle) = self._thread.take() { let _ = handle.join(); } } } /// Spawn the background import worker thread. /// /// The worker opens its own `Database` and `SampleStore` to avoid Mutex contention /// with the GUI thread. Returns a handle for sending commands and polling events. #[instrument(skip_all)] pub fn spawn_import_worker(db_path: PathBuf, store_root: PathBuf) -> std::io::Result { let (cmd_tx, cmd_rx) = mpsc::channel::(); let (event_tx, event_rx) = mpsc::channel::(); let thread = thread::Builder::new() .name("import-worker".to_string()) .spawn(move || { worker_loop(cmd_rx, event_tx, &db_path, &store_root); })?; Ok(ImportHandle { cmd_tx, event_rx: Mutex::new(event_rx), _thread: Some(thread), }) } /// Recursively count audio files and sum their sizes under `dir`. /// Checks for cancellation between entries. Returns `None` if cancelled. #[instrument(skip_all)] fn count_audio_files( dir: &Path, cmd_rx: &mpsc::Receiver, event_tx: &mpsc::Sender, ) -> Option<(usize, u64)> { let mut count = 0; let mut total_bytes = 0u64; let mut stack = vec![dir.to_path_buf()]; let mut last_emit = Instant::now(); let emit_interval = Duration::from_millis(100); while let Some(current) = stack.pop() { // Check for cancel if let Ok(ImportCommand::Cancel) | Ok(ImportCommand::Shutdown) = cmd_rx.try_recv() { return None; } let entries = match fs::read_dir(¤t) { Ok(e) => e, Err(e) => { tracing::warn!(dir = %current.display(), "Failed to read directory during pre-walk: {e}"); continue; } }; for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { if !is_skipped_dir(&path) { stack.push(path); } } else if is_audio_file(&path) { count += 1; if let Ok(meta) = fs::metadata(&path) { total_bytes += meta.len(); } if last_emit.elapsed() >= emit_interval { let _ = event_tx.send(ImportEvent::WalkProgress { count, total_bytes }); last_emit = Instant::now(); } } } } Some((count, total_bytes)) } /// Import a single file into store + VFS, returning the result. fn import_single_file( path: &Path, vfs_id: VfsId, parent_id: Option, store: &SampleStore, db: &Database, loose_files: bool, ) -> Result { let hash = if loose_files { store.import_loose_files(path, db)? } else { store.import(path, db)? }; let name = audiofiles_core::util::get_filename(path, "unknown"); let ext = audiofiles_core::util::get_extension(path); match vfs::create_sample_link(db, vfs_id, parent_id, &name, &hash) { Ok(_) => Ok(ImportFileResult::Imported(hash, ext)), Err(CoreError::NameConflict(_)) => Ok(ImportFileResult::Duplicate), Err(e) => { if let CoreError::Db(ref sqlite_err) = e && sqlite_err.to_string().contains("UNIQUE") { return Ok(ImportFileResult::Duplicate); } Err(e) } } } /// Shared mutable state and dependencies for the import functions. /// /// Bundles the store, DB, channels, and progress counters so that /// `import_directory_recursive`, `import_directory_flat`, and `import_structured` /// don't each need 12 parameters. struct ImportContext<'a> { store: &'a SampleStore, db: &'a Database, event_tx: &'a mpsc::Sender, cmd_rx: &'a mpsc::Receiver, completed: &'a mut usize, total: usize, errors: &'a mut usize, duplicates: &'a mut usize, imported: &'a mut Vec<(String, String)>, loose_files: bool, } impl ImportContext<'_> { /// Check for a cancellation command without blocking. fn is_cancelled(&self) -> bool { matches!( self.cmd_rx.try_recv(), Ok(ImportCommand::Cancel) | Ok(ImportCommand::Shutdown) ) } /// Send a progress event for the current file. fn send_progress(&self, name: String) { let _ = self.event_tx.send(ImportEvent::Progress { completed: *self.completed, total: self.total, current_name: name, }); } /// Import a single audio file, updating counters and sending error events. fn process_file(&mut self, path: &Path, vfs_id: VfsId, parent_id: Option) { let name = audiofiles_core::util::get_filename(path, "unknown"); self.send_progress(name); match import_single_file(path, vfs_id, parent_id, self.store, self.db, self.loose_files) { Ok(ImportFileResult::Imported(hash, ext)) => { self.imported.push((hash, ext)); *self.completed += 1; } Ok(ImportFileResult::Duplicate) => { *self.duplicates += 1; *self.completed += 1; } Err(e) => { *self.errors += 1; let _ = self.event_tx.send(ImportEvent::FileError { path: path.display().to_string(), error: e.to_string(), }); } } } } /// Recursively import a directory with structure, sending progress events. /// Returns `true` if cancelled. fn import_directory_recursive( dir: &Path, vfs_id: VfsId, parent_id: Option, ctx: &mut ImportContext<'_>, ) -> bool { let entries = match fs::read_dir(dir) { Ok(e) => e, Err(_) => { *ctx.errors += 1; return false; // not cancelled } }; let mut paths: Vec = entries.flatten().map(|e| e.path()).collect(); paths.sort(); for path in paths { if ctx.is_cancelled() { return true; } if path.is_dir() { if is_skipped_dir(&path) { continue; } let dir_name = audiofiles_core::util::get_filename(&path, "folder"); let dir_node_id = match vfs::create_directory(ctx.db, vfs_id, parent_id, &dir_name) { Ok(id) => Some(id), Err(CoreError::NameConflict(_)) => { vfs::list_children(ctx.db, vfs_id, parent_id) .unwrap_or_default() .iter() .find(|n| { n.name == dir_name && n.node_type == NodeType::Directory }) .map(|n| n.id) } Err(_) => { *ctx.errors += 1; continue; } }; let cancelled = import_directory_recursive( &path, vfs_id, dir_node_id.or(parent_id), ctx, ); if cancelled { return true; } } else if path.is_file() && is_audio_file(&path) { ctx.process_file(&path, vfs_id, parent_id); } } false // not cancelled } /// Flat import: walk source tree recursively but put all VFS links into a single directory. /// Returns `true` if cancelled. fn import_directory_flat( dir: &Path, vfs_id: VfsId, parent_id: Option, ctx: &mut ImportContext<'_>, ) -> bool { let entries = match fs::read_dir(dir) { Ok(e) => e, Err(_) => { *ctx.errors += 1; return false; } }; let mut paths: Vec = entries.flatten().map(|e| e.path()).collect(); paths.sort(); for path in paths { if ctx.is_cancelled() { return true; } if path.is_dir() { if is_skipped_dir(&path) { continue; } // Recurse into subdirs but still put all links at the same flat level let cancelled = import_directory_flat(&path, vfs_id, parent_id, ctx); if cancelled { return true; } } else if path.is_file() && is_audio_file(&path) { ctx.process_file(&path, vfs_id, parent_id); } } false } /// Structured import: iterate source dir's immediate children, import each top-level /// subdirectory via `import_directory_recursive`, tracking `ImportedFolder` per top-level dir. /// Files directly in the source root are imported without a folder grouping. /// Returns `(cancelled, folders)`. fn import_structured( source: &Path, vfs_id: VfsId, parent_id: Option, ctx: &mut ImportContext<'_>, ) -> (bool, Vec) { let entries = match fs::read_dir(source) { Ok(e) => e, Err(_) => { *ctx.errors += 1; return (false, Vec::new()); } }; let mut paths: Vec = entries.flatten().map(|e| e.path()).collect(); paths.sort(); let mut folders = Vec::new(); for path in paths { if ctx.is_cancelled() { return (true, folders); } if path.is_dir() { if is_skipped_dir(&path) { continue; } let dir_name = audiofiles_core::util::get_filename(&path, "folder"); let dir_node_id = match vfs::create_directory(ctx.db, vfs_id, parent_id, &dir_name) { Ok(id) => Some(id), Err(CoreError::NameConflict(_)) => { vfs::list_children(ctx.db, vfs_id, parent_id) .unwrap_or_default() .iter() .find(|n| { n.name == dir_name && n.node_type == NodeType::Directory }) .map(|n| n.id) } Err(_) => { *ctx.errors += 1; continue; } }; // Track where this folder's samples start in the imported list let folder_start = ctx.imported.len(); let cancelled = import_directory_recursive( &path, vfs_id, dir_node_id.or(parent_id), ctx, ); // Collect samples imported within this top-level folder let folder_samples: Vec<(String, String)> = ctx.imported[folder_start..].to_vec(); if !folder_samples.is_empty() { folders.push(ImportedFolder { name: dir_name, samples: folder_samples, }); } if cancelled { return (true, folders); } } else if path.is_file() && is_audio_file(&path) { // Root-level files — import directly, no folder grouping ctx.process_file(&path, vfs_id, parent_id); } } (false, folders) } fn worker_loop( cmd_rx: mpsc::Receiver, event_tx: mpsc::Sender, db_path: &Path, store_root: &Path, ) { // Open our own DB connection and store let db = match Database::open(db_path) { Ok(db) => db, Err(e) => { let _ = event_tx.send(ImportEvent::Complete { imported: Vec::new(), total_files: 0, errors: 1, duplicates: 0, folders: Vec::new(), }); error!("Import worker failed to open DB: {e}"); return; } }; let store = match SampleStore::new(store_root) { Ok(s) => s, Err(e) => { let _ = event_tx.send(ImportEvent::Complete { imported: Vec::new(), total_files: 0, errors: 1, duplicates: 0, folders: Vec::new(), }); error!("Import worker failed to open store: {e}"); return; } }; while let Ok(cmd) = cmd_rx.recv() { match cmd { ImportCommand::Shutdown => break, ImportCommand::Cancel => continue, ImportCommand::ImportDirectory { source, strategy } => { // Resolve strategy to concrete (vfs_id, parent_id, flat) let (vfs_id, parent_id, flat) = match strategy { ImportStrategy::Flat { vfs_id, parent_id } => (vfs_id, parent_id, true), ImportStrategy::NewVfs { vfs_name } => { match vfs::create_vfs(&db, &vfs_name) { Ok(id) => (id, None, false), Err(e) => { let _ = event_tx.send(ImportEvent::Complete { imported: Vec::new(), total_files: 0, errors: 1, duplicates: 0, folders: Vec::new(), }); error!("Failed to create VFS '{vfs_name}': {e}"); continue; } } } ImportStrategy::MergeIntoVfs { vfs_id, parent_id } => { (vfs_id, parent_id, false) } }; // Phase 1: pre-walk to count audio files and sum sizes let (total, total_bytes) = match count_audio_files(&source, &cmd_rx, &event_tx) { Some(result) => result, None => { let _ = event_tx.send(ImportEvent::Complete { imported: Vec::new(), total_files: 0, errors: 0, duplicates: 0, folders: Vec::new(), }); continue; } }; let _ = event_tx.send(ImportEvent::WalkComplete { total, total_bytes }); // Check if loose-files mode is enabled for this vault let loose_files = db .conn() .query_row( "SELECT value FROM user_config WHERE key = 'loose_files'", [], |row| row.get::<_, String>(0), ) .ok() .is_some_and(|v| v == "1"); // Phase 2: import files with progress let mut completed = 0usize; let mut errors = 0usize; let mut duplicates = 0usize; let mut imported = Vec::new(); let mut ctx = ImportContext { store: &store, db: &db, event_tx: &event_tx, cmd_rx: &cmd_rx, completed: &mut completed, total, errors: &mut errors, duplicates: &mut duplicates, imported: &mut imported, loose_files, }; let (cancelled, folders) = if flat { let c = import_directory_flat( &source, vfs_id, parent_id, &mut ctx, ); (c, Vec::new()) } else { import_structured( &source, vfs_id, parent_id, &mut ctx, ) }; let total_files = if cancelled { completed } else { total }; // Checkpoint WAL after large import to keep -shm file fresh // and avoid stale memory-mapped state on macOS. if let Err(e) = db.wal_checkpoint() { warn!("WAL checkpoint after import failed: {e}"); } let _ = event_tx.send(ImportEvent::Complete { imported, total_files, errors, duplicates, folders, }); } } } } #[cfg(test)] mod tests { use super::*; #[test] fn is_audio_file_recognises_extensions() { assert!(is_audio_file(Path::new("kick.wav"))); assert!(is_audio_file(Path::new("pad.FLAC"))); assert!(is_audio_file(Path::new("song.mp3"))); assert!(is_audio_file(Path::new("loop.ogg"))); assert!(is_audio_file(Path::new("strings.aiff"))); assert!(is_audio_file(Path::new("brass.AIF"))); assert!(!is_audio_file(Path::new("readme.txt"))); assert!(!is_audio_file(Path::new("photo.png"))); assert!(!is_audio_file(Path::new("noext"))); } #[test] fn import_command_variants_constructible() { let _import = ImportCommand::ImportDirectory { source: PathBuf::from("/tmp/samples"), strategy: ImportStrategy::MergeIntoVfs { vfs_id: VfsId::from(1), parent_id: None, }, }; let _cancel = ImportCommand::Cancel; let _shutdown = ImportCommand::Shutdown; } #[test] fn import_event_variants_constructible() { let _walk = ImportEvent::WalkComplete { total: 42, total_bytes: 1024 }; let _walk_progress = ImportEvent::WalkProgress { count: 17, total_bytes: 512 }; let _progress = ImportEvent::Progress { completed: 5, total: 42, current_name: "kick.wav".to_string(), }; let _err = ImportEvent::FileError { path: "/tmp/bad.wav".to_string(), error: "decode failed".to_string(), }; let _done = ImportEvent::Complete { imported: vec![("abc".to_string(), "wav".to_string())], total_files: 1, errors: 0, duplicates: 0, folders: vec![], }; } #[test] fn import_strategy_variants_constructible() { let _flat = ImportStrategy::Flat { vfs_id: VfsId::from(1), parent_id: None, }; let _new = ImportStrategy::NewVfs { vfs_name: "Test".to_string(), }; let _merge = ImportStrategy::MergeIntoVfs { vfs_id: VfsId::from(1), parent_id: Some(NodeId::from(5)), }; } #[test] fn spawn_and_drop_does_not_hang() { let dir = tempfile::TempDir::new().unwrap(); let db_path = dir.path().join("test.db"); let store_root = dir.path().join("store"); // Create the DB so worker can open it let _db = Database::open(&db_path).unwrap(); let handle = spawn_import_worker(db_path, store_root).unwrap(); assert!(handle.try_recv().is_none()); drop(handle); // Should send Shutdown and join cleanly } }