Skip to main content

max / makenotwork

server: bound broadcast fan-out at 16 with 100ms cadence Both broadcast paths (POST /api/broadcast and the internal CLI equivalent) ran an unbounded sequential for-loop inside the spawned fan-out task. At thousand-follower scale that's minutes of wall-clock serialized through a single email_client connection, plus a Postmark API burst we'd rather smooth out. Replaces the loop with a JoinSet that keeps up to BROADCAST_PARALLELISM (16) sends in flight and sleeps BROADCAST_CHUNK_DELAY_MS (100ms) between spawns. Steady-state is ~10 sends/sec with first-recipient latency unchanged. Both call sites get the same shape so future drift is one-shot to fix. Note: the 100ms pacing is new — there was no per-recipient sleep before, so this is a Postmark-friendliness improvement, not a preservation of prior behavior. Plan: _private/docs/mnw/server-docs/plans/broadcast-bounded-fanout.md.
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-27 14:36 UTC
Commit: 5d6d93d19a43a6ddf5f3f04ad89b413b1fc09008
Parent: c5cc1b2
3 files changed, +87 insertions, -24 deletions
@@ -142,6 +142,16 @@ pub const DISCOVER_PAGE_SIZE: u32 = 25;
142 142 pub const FEED_PAGE_SIZE: u32 = 25;
143 143 pub const PAGINATION_WINDOW_SIZE: u32 = 5;
144 144
145 + // -- Creator broadcast fan-out --
146 + /// Max concurrent in-flight email sends per broadcast. The outer worker
147 + /// task spawns up to this many child tasks, then waits on one to drain
148 + /// before spawning the next.
149 + pub const BROADCAST_PARALLELISM: usize = 16;
150 + /// Delay between successive broadcast send-task spawns. Spreads Postmark
151 + /// API load when a creator with thousands of followers fires a broadcast
152 + /// — at parallelism 16 + 100 ms cadence, steady-state is ~10 sends/sec.
153 + pub const BROADCAST_CHUNK_DELAY_MS: u64 = 100;
154 +
145 155 // -- File scanning --
146 156 pub const SCAN_MAX_MEMORY_BYTES: usize = 100 * 1024 * 1024; // 100 MB in-memory threshold
147 157 pub const SCAN_MAX_CONCURRENT: usize = 4; // Max concurrent file scans (each can use up to 100 MB RAM)
@@ -461,6 +471,11 @@ mod tests {
461 471 }
462 472
463 473 #[test]
474 + fn broadcast_parallelism_sane() {
475 + assert!(BROADCAST_PARALLELISM > 0 && BROADCAST_PARALLELISM <= 64);
476 + }
477 +
478 + #[test]
464 479 fn scan_zip_max_uncompressed_exceeds_memory_threshold() {
465 480 assert!(SCAN_ZIP_MAX_UNCOMPRESSED > SCAN_MAX_MEMORY_BYTES as u64);
466 481 }
@@ -7,6 +7,7 @@ use axum::Json;
7 7 use serde::{Deserialize, Serialize};
8 8
9 9 use crate::auth::ServiceAuth;
10 + use crate::constants;
10 11 use crate::db::{self, CollectionId, ItemId, ProjectId, Slug, UserId};
11 12 use crate::error::{AppError, Result, ResultExt};
12 13 use crate::AppState;
@@ -193,21 +194,44 @@ pub(super) async fn send_broadcast(
193 194 let email_client = state.email.clone();
194 195
195 196 tokio::spawn(async move {
197 + let mut set = tokio::task::JoinSet::new();
198 + let chunk_delay = std::time::Duration::from_millis(constants::BROADCAST_CHUNK_DELAY_MS);
199 +
196 200 for follower in followers {
197 - let unsub_url = crate::email::generate_unsubscribe_url(
198 - &host_url, follower.id, crate::email::UnsubscribeAction::Broadcast, &creator_id.to_string(), &signing_secret,
199 - );
200 - if let Err(e) = email_client.send_broadcast(
201 - &follower.email,
202 - follower.display_name.as_deref(),
203 - &creator_name,
204 - &subject,
205 - &body,
206 - Some(&unsub_url),
207 - ).await {
208 - tracing::warn!(error = ?e, to = %follower.email, "broadcast email failed");
201 + if set.len() >= constants::BROADCAST_PARALLELISM {
202 + let _ = set.join_next().await;
209 203 }
204 +
205 + let email_client = email_client.clone();
206 + let host_url = host_url.clone();
207 + let signing_secret = signing_secret.clone();
208 + let creator_name = creator_name.clone();
209 + let subject = subject.clone();
210 + let body = body.clone();
211 + let creator_id_str = creator_id.to_string();
212 +
213 + set.spawn(async move {
214 + let unsub_url = crate::email::generate_unsubscribe_url(
215 + &host_url, follower.id,
216 + crate::email::UnsubscribeAction::Broadcast,
217 + &creator_id_str, &signing_secret,
218 + );
219 + if let Err(e) = email_client.send_broadcast(
220 + &follower.email,
221 + follower.display_name.as_deref(),
222 + &creator_name,
223 + &subject,
224 + &body,
225 + Some(&unsub_url),
226 + ).await {
227 + tracing::warn!(error = ?e, to = %follower.email, "broadcast email failed");
228 + }
229 + });
230 +
231 + tokio::time::sleep(chunk_delay).await;
210 232 }
233 +
234 + while set.join_next().await.is_some() {}
211 235 });
212 236 }
213 237
@@ -9,6 +9,7 @@ use serde::Deserialize;
9 9
10 10 use crate::{
11 11 auth::AuthUser,
12 + constants,
12 13 db,
13 14 error::{AppError, Result},
14 15 templates::FormStatusTemplate,
@@ -93,21 +94,44 @@ pub(in crate::routes::api) async fn broadcast_send(
93 94 let signing_secret = state.config.signing_secret.clone();
94 95
95 96 tokio::spawn(async move {
97 + let mut set = tokio::task::JoinSet::new();
98 + let chunk_delay = std::time::Duration::from_millis(constants::BROADCAST_CHUNK_DELAY_MS);
99 +
96 100 for follower in followers {
97 - let unsub_url = crate::email::generate_unsubscribe_url(
98 - &host_url, follower.id, crate::email::UnsubscribeAction::Broadcast, &creator_id.to_string(), &signing_secret,
99 - );
100 - if let Err(e) = email_client.send_broadcast(
101 - &follower.email,
102 - follower.display_name.as_deref(),
103 - &creator_name,
104 - &subject,
105 - &body,
106 - Some(&unsub_url),
107 - ).await {
108 - tracing::error!(error = ?e, "failed to send broadcast email");
101 + if set.len() >= constants::BROADCAST_PARALLELISM {
102 + let _ = set.join_next().await;
109 103 }
104 +
105 + let email_client = email_client.clone();
106 + let host_url = host_url.clone();
107 + let signing_secret = signing_secret.clone();
108 + let creator_name = creator_name.clone();
109 + let subject = subject.clone();
110 + let body = body.clone();
111 + let creator_id_str = creator_id.to_string();
112 +
113 + set.spawn(async move {
114 + let unsub_url = crate::email::generate_unsubscribe_url(
115 + &host_url, follower.id,
116 + crate::email::UnsubscribeAction::Broadcast,
117 + &creator_id_str, &signing_secret,
118 + );
119 + if let Err(e) = email_client.send_broadcast(
120 + &follower.email,
121 + follower.display_name.as_deref(),
122 + &creator_name,
123 + &subject,
124 + &body,
125 + Some(&unsub_url),
126 + ).await {
127 + tracing::warn!(error = ?e, to = %follower.email, "broadcast email failed");
128 + }
129 + });
130 +
131 + tokio::time::sleep(chunk_delay).await;
110 132 }
133 +
134 + while set.join_next().await.is_some() {}
111 135 });
112 136
113 137 tracing::info!(user_id = %user.id, recipient_count = count, "broadcast sent");