Skip to main content

max / makenotwork

2.9 KB · 76 lines History Blame Raw
1 //! Bounded background-task queue for fire-and-forget work.
2 //!
3 //! Replaces per-request `tokio::spawn(...)` for low-priority work that
4 //! competes with request handlers for the DB pool — email sends, mailing-list
5 //! subscriptions, etc. Run #4 fixed the same shape for page views via
6 //! `db::page_views::PageViewTx`; Run #8 surfaced it again on the webhook
7 //! hot path (`routes/stripe/webhook/checkout_helpers.rs`), so this module
8 //! generalizes the pattern.
9 //!
10 //! Two bounds applied:
11 //! - **Queue capacity**: `try_send` returns Err once `CAPACITY` tasks are
12 //! queued; the task is dropped and a warning is logged. Caller is never
13 //! blocked and never panics.
14 //! - **Concurrent execution**: a semaphore caps the number of background
15 //! tasks running at once, so a burst of email sends can't exhaust the
16 //! 25-slot DB pool out from under request handlers.
17 //!
18 //! Use `state.bg.spawn(name, fut)` instead of `tokio::spawn(fut)` for any
19 //! work that (a) doesn't need its result and (b) acquires DB pool conns or
20 //! makes outbound network calls.
21
22 use std::future::Future;
23 use std::pin::Pin;
24 use std::sync::Arc;
25 use tokio::sync::{mpsc, Semaphore};
26
/// Upper bound on tasks waiting in the channel. Once this many are queued,
/// further `spawn` calls drop their task and log a warning.
const CAPACITY: usize = 1_024;

/// Upper bound on background tasks executing at the same time. Kept well
/// under `DB_POOL_MAX_CONNECTIONS` (25) so request handlers always have
/// pool connections left over.
const PARALLELISM: usize = 8;

/// Type-erased, heap-pinned unit future — the payload carried by the queue.
/// (`Box<dyn Trait>` defaults to a `'static` object bound.)
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
35
36 #[derive(Clone)]
37 pub struct BackgroundTx {
38 tx: mpsc::Sender<(&'static str, BoxFuture)>,
39 }
40
41 impl BackgroundTx {
42 /// Submit a fire-and-forget task. On queue overflow the task is dropped
43 /// and a warning is logged with `name` as the task identifier. Never
44 /// blocks; never panics.
45 pub fn spawn<F>(&self, name: &'static str, fut: F)
46 where
47 F: Future<Output = ()> + Send + 'static,
48 {
49 if self.tx.try_send((name, Box::pin(fut))).is_err() {
50 tracing::warn!(task = name, "background queue full, task dropped");
51 }
52 }
53 }
54
55 /// Spawn the drainer + return the sender to install on `AppState`. The
56 /// drainer pulls tasks off the channel and runs each under a semaphore
57 /// permit so concurrent execution is bounded.
58 pub fn spawn_pool() -> BackgroundTx {
59 let (tx, mut rx) = mpsc::channel::<(&'static str, BoxFuture)>(CAPACITY);
60 let sem = Arc::new(Semaphore::new(PARALLELISM));
61 tokio::spawn(async move {
62 while let Some((_, task)) = rx.recv().await {
63 let permit = match sem.clone().acquire_owned().await {
64 Ok(p) => p,
65 Err(_) => break, // semaphore closed → shutting down
66 };
67 tokio::spawn(async move {
68 task.await;
69 drop(permit);
70 });
71 }
72 tracing::info!("background-task drainer exited");
73 });
74 BackgroundTx { tx }
75 }
76