//! Bounded background-task queue for fire-and-forget work. //! //! Replaces per-request `tokio::spawn(...)` for low-priority work that //! competes with request handlers for the DB pool — email sends, mailing-list //! subscriptions, etc. Run #4 fixed the same shape for page views via //! `db::page_views::PageViewTx`; Run #8 surfaced it again on the webhook //! hot path (`routes/stripe/webhook/checkout_helpers.rs`), so this module //! generalizes the pattern. //! //! Two bounds applied: //! - **Queue capacity**: `try_send` returns Err once `CAPACITY` tasks are //! queued; the task is dropped and a warning is logged. Caller is never //! blocked and never panics. //! - **Concurrent execution**: a semaphore caps the number of background //! tasks running at once, so a burst of email sends can't exhaust the //! 25-slot DB pool out from under request handlers. //! //! Use `state.bg.spawn(name, fut)` instead of `tokio::spawn(fut)` for any //! work that (a) doesn't need its result and (b) acquires DB pool conns or //! makes outbound network calls. use std::future::Future; use std::pin::Pin; use std::sync::Arc; use tokio::sync::{mpsc, Semaphore}; /// Max queued tasks before `spawn` starts dropping with a warning. const CAPACITY: usize = 1024; /// Max background tasks running concurrently. Set well below /// `DB_POOL_MAX_CONNECTIONS` (25) so background work can't starve requests. const PARALLELISM: usize = 8; type BoxFuture = Pin + Send + 'static>>; #[derive(Clone)] pub struct BackgroundTx { tx: mpsc::Sender<(&'static str, BoxFuture)>, } impl BackgroundTx { /// Submit a fire-and-forget task. On queue overflow the task is dropped /// and a warning is logged with `name` as the task identifier. Never /// blocks; never panics. pub fn spawn(&self, name: &'static str, fut: F) where F: Future + Send + 'static, { if self.tx.try_send((name, Box::pin(fut))).is_err() { tracing::warn!(task = name, "background queue full, task dropped"); } } } /// Spawn the drainer + return the sender to install on `AppState`. The /// drainer pulls tasks off the channel and runs each under a semaphore /// permit so concurrent execution is bounded. pub fn spawn_pool() -> BackgroundTx { let (tx, mut rx) = mpsc::channel::<(&'static str, BoxFuture)>(CAPACITY); let sem = Arc::new(Semaphore::new(PARALLELISM)); tokio::spawn(async move { while let Some((_, task)) = rx.recv().await { let permit = match sem.clone().acquire_owned().await { Ok(p) => p, Err(_) => break, // semaphore closed → shutting down }; tokio::spawn(async move { task.await; drop(permit); }); } tracing::info!("background-task drainer exited"); }); BackgroundTx { tx } }