Skip to main content

max / makenotwork

Run #14 remediation: governor sweeper, scan/storage/payments hardening - rate_limit/lib/config: register a governor bucket-map retain_recent sweeper at boot so per-IP GCRA stores are reclaimed (CHRONIC 1) - scheduler/announcements: run onboarding fan-out off the lock-held connection with a bounded candidate query - stripe: collapse the duplicated cart checkout paths, unify the connect account-id parser, reconcile credited amount against Stripe subtotal - storage/gallery: enforce the per-entity cap inside the confirm tx; media_files list WARNs on cap-hit instead of silently truncating - scanning: YARA expected-rule-count boot floor; quarantine purges the CDN-served image rows before deleting the object - git: run libgit2 read paths on the blocking pool via ResolvedRepo::with_repo; escape_html now escapes single quotes Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-09 06:24 UTC
Commit: cf0c3454dd90a7a3ae6dab7ea94eb04bd57bceb4
Parent: 5b7cc20
21 files changed, +749 insertions, -461 deletions
@@ -427,6 +427,12 @@ pub struct ScanConfig {
427 427 /// <https://metadefender.com/account>). Second-opinion layer; only
428 428 /// invoked when another layer flagged the file as suspicious.
429 429 pub metadefender_api_key: Option<String>,
430 + /// Minimum number of YARA rule files that must compile for the corpus to be
431 + /// considered healthy. `0` disables the check (default). Set it to the known
432 + /// deployed corpus size so a silent drop (a dependency/format change that
433 + /// makes rules uncompilable) fails boot loudly rather than degrading
434 + /// coverage unnoticed.
435 + pub yara_min_rule_files: usize,
430 436 }
431 437
432 438 impl ScanConfig {
@@ -453,6 +459,10 @@ impl ScanConfig {
453 459 .unwrap_or(true),
454 460 abuse_ch_auth_key: std::env::var("ABUSE_CH_AUTH_KEY").ok().filter(|s| !s.is_empty()),
455 461 metadefender_api_key: std::env::var("METADEFENDER_API_KEY").ok().filter(|s| !s.is_empty()),
462 + yara_min_rule_files: std::env::var("YARA_MIN_RULE_FILES")
463 + .ok()
464 + .and_then(|v| v.parse().ok())
465 + .unwrap_or(0),
456 466 })
457 467 }
458 468 }
@@ -72,6 +72,10 @@ pub const HEALTH_HISTORY_RETAIN_DAYS: i64 = 90;
72 72 // -- Scheduled publish --
73 73 pub const SCHEDULER_INTERVAL_SECS: u64 = 60;
74 74
75 + // How often the rate-limiter bucket-map sweeper reclaims stale GCRA entries.
76 + // Bounds limiter map size by active (not cumulative-unique) client keys.
77 + pub const GOVERNOR_SWEEP_INTERVAL_SECS: u64 = 60;
78 +
75 79 // -- TOTP / 2FA --
76 80 pub const TOTP_SKEW: u8 = 1; // Allow +/-1 time step (+/-30s)
77 81 pub const TOTP_STEP: u64 = 30; // 30-second windows
@@ -41,6 +41,11 @@ pub async fn create<'e>(
41 41 Ok(row)
42 42 }
43 43
44 + /// Safety cap on the media picker listing. Hitting it is logged at WARN so a
45 + /// creator with more than this many clean files in one folder isn't silently
46 + /// truncated (mirrors `versions::VERSIONS_LIST_HARD_CAP`).
47 + pub const MEDIA_LIST_HARD_CAP: i64 = 500;
48 +
44 49 /// List media files for a user, optionally filtered by folder.
45 50 #[tracing::instrument(skip_all)]
46 51 pub async fn list_by_user_folder(
@@ -50,21 +55,30 @@ pub async fn list_by_user_folder(
50 55 ) -> Result<Vec<DbMediaFile>> {
51 56 let rows = if let Some(f) = folder {
52 57 sqlx::query_as::<_, DbMediaFile>(
53 - "SELECT * FROM media_files WHERE user_id = $1 AND folder = $2 AND scan_status = 'clean' ORDER BY created_at DESC LIMIT 500",
58 + "SELECT * FROM media_files WHERE user_id = $1 AND folder = $2 AND scan_status = 'clean' ORDER BY created_at DESC LIMIT $3",
54 59 )
55 60 .bind(user_id)
56 61 .bind(f)
62 + .bind(MEDIA_LIST_HARD_CAP)
57 63 .fetch_all(pool)
58 64 .await?
59 65 } else {
60 66 sqlx::query_as::<_, DbMediaFile>(
61 - "SELECT * FROM media_files WHERE user_id = $1 AND scan_status = 'clean' ORDER BY created_at DESC LIMIT 500",
67 + "SELECT * FROM media_files WHERE user_id = $1 AND scan_status = 'clean' ORDER BY created_at DESC LIMIT $2",
62 68 )
63 69 .bind(user_id)
70 + .bind(MEDIA_LIST_HARD_CAP)
64 71 .fetch_all(pool)
65 72 .await?
66 73 };
67 74
75 + if rows.len() as i64 == MEDIA_LIST_HARD_CAP {
76 + tracing::warn!(
77 + %user_id, cap = MEDIA_LIST_HARD_CAP,
78 + "list_by_user_folder hit hard cap; some media omitted from the picker"
79 + );
80 + }
81 +
68 82 Ok(rows)
69 83 }
70 84
@@ -42,6 +42,31 @@ pub struct HeldVersionRow {
42 42
43 43 /// Insert a scan result record for audit trail.
44 44 #[tracing::instrument(skip_all)]
45 + /// Remove every CDN-served image row referencing `s3_key`, across the three
46 + /// image tables that have no per-row scan gate: `item_images.s3_key`,
47 + /// `project_images.s3_key`, and `content_insertions.storage_key`. Returns the
48 + /// number of rows removed.
49 + ///
50 + /// On quarantine of these kinds, deleting the row IS the primary enforcement: it
51 + /// stops the app from ever rendering the (Cloudflare-served) URL again, and —
52 + /// critically — makes the key non-live so the durable S3-deletion queue will
53 + /// actually purge the object instead of parking it behind the `is_s3_key_live`
54 + /// guard. `storage_used` counters self-heal on the weekly
55 + /// `recalculate_all_storage_used` pass; we accept a transient over-count for a
56 + /// malicious upload rather than join through three ownership paths here.
57 + #[tracing::instrument(skip_all)]
58 + pub async fn purge_cdn_image_rows_by_key(db: &PgPool, s3_key: &str) -> Result<u64, sqlx::Error> {
59 + let mut removed = 0u64;
60 + for sql in [
61 + "DELETE FROM item_images WHERE s3_key = $1",
62 + "DELETE FROM project_images WHERE s3_key = $1",
63 + "DELETE FROM content_insertions WHERE storage_key = $1",
64 + ] {
65 + removed += sqlx::query(sql).bind(s3_key).execute(db).await?.rows_affected();
66 + }
67 + Ok(removed)
68 + }
69 +
45 70 pub async fn insert_scan_result(
46 71 db: &PgPool,
47 72 s3_key: &str,
@@ -256,6 +256,14 @@ pub fn open_repo(repos_root: &Path, owner: &str, repo: &str) -> Result<Repositor
256 256 Repository::open_bare(&canonical_repo).map_err(|_| GitError::RepoNotFound)
257 257 }
258 258
259 + /// Open a bare repository at an already-resolved, validated path (e.g. one
260 + /// returned by [`repo_disk_path`]). Used to (re)open the repo inside a
261 + /// `spawn_blocking` closure, since `git2::Repository` is `!Send` and cannot
262 + /// cross the await boundary.
263 + pub fn open_repo_at(repo_path: &Path) -> Result<Repository, GitError> {
264 + Repository::open_bare(repo_path).map_err(|_| GitError::RepoNotFound)
265 + }
266 +
259 267 /// Get basic repository info.
260 268 pub fn repo_info(repo: &Repository, name: &str) -> RepoInfo {
261 269 let description = std::fs::read_to_string(repo.path().join("description"))
@@ -224,6 +224,11 @@ pub fn build_app(
224 224 );
225 225 }
226 226
227 + // All rate limiters were registered as they were built above; start the
228 + // periodic GC that sweeps their bucket maps so they don't grow unbounded for
229 + // process lifetime (Run #14 CHRONIC 1). Guarded by `Once` internally.
230 + crate::rate_limit::start_governor_sweeper();
231 +
227 232 app.layer(middleware::from_fn_with_state(state.clone(), access_gate::access_gate_middleware))
228 233 .layer(middleware::from_fn_with_state(state.clone(), security_headers_middleware))
229 234 .layer(middleware::from_fn(metrics::cache_control_middleware))
@@ -25,12 +25,6 @@ fn parse_subscription_id(stripe_sub_id: &str) -> Result<stripe_shared::Subscript
25 25 })
26 26 }
27 27
28 - fn parse_account_id_internal(account_id: &str) -> Result<stripe_shared::AccountId> {
29 - account_id.parse().map_err(|_| {
30 - AppError::Internal(anyhow::anyhow!("Invalid Stripe account ID"))
31 - })
32 - }
33 -
34 28 impl StripeClient {
35 29 /// Create a Stripe Standard connected account for a creator.
36 30 #[tracing::instrument(skip_all, name = "payments::create_connect_account")]
@@ -155,7 +149,7 @@ impl StripeClient {
155 149 stripe_sub_id: &str,
156 150 connected_account_id: &str,
157 151 ) -> Result<()> {
158 - let acct = parse_account_id_internal(connected_account_id)?;
152 + let acct = Self::parse_account_id(connected_account_id)?;
159 153 let sub_id = parse_subscription_id(stripe_sub_id)?;
160 154
161 155 UpdateSubscription::new(sub_id)
@@ -184,7 +178,7 @@ impl StripeClient {
184 178 stripe_sub_id: &str,
185 179 connected_account_id: &str,
186 180 ) -> Result<()> {
187 - let acct = parse_account_id_internal(connected_account_id)?;
181 + let acct = Self::parse_account_id(connected_account_id)?;
188 182 let sub_id = parse_subscription_id(stripe_sub_id)?;
189 183
190 184 ResumeSubscription::new(sub_id)
@@ -207,7 +201,7 @@ impl StripeClient {
207 201 stripe_sub_id: &str,
208 202 connected_account_id: &str,
209 203 ) -> Result<()> {
210 - let acct = parse_account_id_internal(connected_account_id)?;
204 + let acct = Self::parse_account_id(connected_account_id)?;
211 205 let sub_id = parse_subscription_id(stripe_sub_id)?;
212 206
213 207 CancelSubscription::new(sub_id)
@@ -264,7 +258,7 @@ impl StripeClient {
264 258 connected_account_id: &str,
265 259 cancel: bool,
266 260 ) -> Result<()> {
267 - let acct = parse_account_id_internal(connected_account_id)?;
261 + let acct = Self::parse_account_id(connected_account_id)?;
268 262 let sub_id = parse_subscription_id(stripe_sub_id)?;
269 263 UpdateSubscription::new(sub_id)
270 264 .cancel_at_period_end(cancel)
@@ -305,7 +299,7 @@ impl StripeClient {
305 299 payment_intent_id: &str,
306 300 connected_account_id: &str,
307 301 ) -> Result<()> {
308 - let acct = parse_account_id_internal(connected_account_id)?;
302 + let acct = Self::parse_account_id(connected_account_id)?;
309 303 CreateRefund::new()
310 304 .payment_intent(payment_intent_id.to_string())
311 305 .customize()
@@ -319,3 +313,27 @@ impl StripeClient {
319 313 Ok(())
320 314 }
321 315 }
316 +
317 + #[cfg(test)]
318 + mod tests {
319 + use super::*;
320 +
321 + // NOTE: async-stripe's `*Id` types are permissive newtypes — `FromStr`
322 + // accepts any non-pathological string without validating the `acct_`/`sub_`
323 + // prefix, so there is no error path to assert on normal input. These tests
324 + // pin what is actually observable: canonical IDs parse and round-trip, and
325 + // both account-id call sites now go through the single `parse_account_id`
326 + // (the divergent `parse_account_id_internal` was deleted in Run #14).
327 +
328 + #[test]
329 + fn account_id_parses_and_round_trips() {
330 + let acct = StripeClient::parse_account_id("acct_1A2b3C4d5E6f7G").unwrap();
331 + assert_eq!(acct.to_string(), "acct_1A2b3C4d5E6f7G");
332 + }
333 +
334 + #[test]
335 + fn subscription_id_parses_and_round_trips() {
336 + let sub = parse_subscription_id("sub_1A2b3C4d5E6f7G8h").unwrap();
337 + assert_eq!(sub.to_string(), "sub_1A2b3C4d5E6f7G8h");
338 + }
339 + }
@@ -48,9 +48,13 @@ impl StripeClient {
48 48 }
49 49
50 50 /// Parse a connected account ID string into an `AccountId`.
51 + ///
52 + /// Account IDs are read from our own DB (`users.stripe_account_id`), so a
53 + /// parse failure is an internal invariant violation rather than bad user
54 + /// input — classify it `Internal` and keep the underlying error for ops.
51 55 pub(crate) fn parse_account_id(account_id: &str) -> Result<stripe_shared::AccountId> {
52 - account_id.parse().map_err(|_| {
53 - AppError::BadRequest("Invalid Stripe account ID format".to_string())
56 + account_id.parse().map_err(|e| {
57 + AppError::Internal(anyhow::anyhow!("Invalid Stripe account ID '{}': {}", account_id, e))
54 58 })
55 59 }
56 60 }
@@ -127,6 +127,11 @@ pub struct CheckoutSessionView {
127 127 pub customer: Option<String>,
128 128 #[serde(default)]
129 129 pub customer_details: Option<CheckoutCustomerDetailsView>,
130 + /// Pre-tax line-item total (cents) Stripe computed for the session. Used
131 + /// only as a defense-in-depth reconciliation against our server-built line
132 + /// items; absent on older/edge events, hence `Option`.
133 + #[serde(default)]
134 + pub amount_subtotal: Option<i64>,
130 135 }
131 136
132 137 #[derive(Debug, Default, serde::Deserialize)]
@@ -141,6 +141,54 @@ impl KeyExtractor for SyncAppKeyExtractor {
141 141
142 142 // ── Config builders ──
143 143
144 + // ── Bucket-map sweeping (Run #14 CHRONIC 1) ──
145 +
146 + /// Type-erased `retain_recent` GC hooks, one per limiter built below. Each hook
147 + /// sweeps one limiter's keyed GCRA store and returns its post-sweep entry count.
148 + ///
149 + /// tower_governor's in-memory store grows one entry per unique client key for
150 + /// process lifetime unless swept, so EVERY limiter must be registered here. The
151 + /// three `rate_limiter_*` constructors below are the only sanctioned way to
152 + /// build a limiter precisely because they register on construct — do not build a
153 + /// `GovernorConfig` directly (it would leak, unswept). The registry is touched
154 + /// only at startup (registration) and once per sweep interval, so the `Mutex` is
155 + /// effectively uncontended; no lock is ever held across an `.await`.
156 + static GOVERNOR_SWEEPERS: std::sync::Mutex<Vec<Box<dyn Fn() -> usize + Send + Sync>>> =
157 + std::sync::Mutex::new(Vec::new());
158 +
159 + /// Register a limiter's GC hook. Monomorphized at each call site (where the
160 + /// limiter's concrete key type is known), so this stays non-generic.
161 + fn register_for_sweep(hook: impl Fn() -> usize + Send + Sync + 'static) {
162 + if let Ok(mut hooks) = GOVERNOR_SWEEPERS.lock() {
163 + hooks.push(Box::new(hook));
164 + }
165 + }
166 +
167 + /// Spawn the periodic task that sweeps every registered limiter's bucket map.
168 + /// Call once at startup (guarded by `Once`, so extra calls — e.g. per-test
169 + /// `build_app` — are no-ops). Requires a Tokio runtime.
170 + pub fn start_governor_sweeper() {
171 + static STARTED: std::sync::Once = std::sync::Once::new();
172 + STARTED.call_once(|| {
173 + tokio::spawn(async {
174 + let interval =
175 + std::time::Duration::from_secs(crate::constants::GOVERNOR_SWEEP_INTERVAL_SECS);
176 + loop {
177 + tokio::time::sleep(interval).await;
178 + // Collect counts without holding the lock across any await.
179 + let (limiters, retained) = {
180 + let Ok(hooks) = GOVERNOR_SWEEPERS.lock() else {
181 + continue;
182 + };
183 + let retained: usize = hooks.iter().map(|hook| hook()).sum();
184 + (hooks.len(), retained)
185 + };
186 + tracing::debug!(limiters, retained_keys = retained, "swept governor bucket maps");
187 + }
188 + });
189 + });
190 + }
191 +
144 192 /// Build an IP-based rate limiter from a per-millisecond interval and burst size.
145 193 pub fn rate_limiter_ms(
146 194 ms: u64,
@@ -151,7 +199,7 @@ pub fn rate_limiter_ms(
151 199 ::governor::middleware::StateInformationMiddleware,
152 200 >,
153 201 > {
154 - std::sync::Arc::new(
202 + let config = std::sync::Arc::new(
155 203 tower_governor::governor::GovernorConfigBuilder::default()
156 204 .key_extractor(CloudflareIpKeyExtractor)
157 205 .per_millisecond(ms)
@@ -159,7 +207,13 @@ pub fn rate_limiter_ms(
159 207 .use_headers()
160 208 .finish()
161 209 .expect("rate limiter config"),
162 - )
210 + );
211 + let limiter = config.limiter().clone();
212 + register_for_sweep(move || {
213 + limiter.retain_recent();
214 + limiter.len()
215 + });
216 + config
163 217 }
164 218
165 219 /// Build an IP-based rate limiter from a per-second rate and burst size.
@@ -172,7 +226,7 @@ pub fn rate_limiter_per_sec(
172 226 ::governor::middleware::StateInformationMiddleware,
173 227 >,
174 228 > {
175 - std::sync::Arc::new(
229 + let config = std::sync::Arc::new(
176 230 tower_governor::governor::GovernorConfigBuilder::default()
177 231 .key_extractor(CloudflareIpKeyExtractor)
178 232 .per_second(per_sec)
@@ -180,7 +234,13 @@ pub fn rate_limiter_per_sec(
180 234 .use_headers()
181 235 .finish()
182 236 .expect("rate limiter config"),
183 - )
237 + );
238 + let limiter = config.limiter().clone();
239 + register_for_sweep(move || {
240 + limiter.retain_recent();
241 + limiter.len()
242 + });
243 + config
184 244 }
185 245
186 246 /// Build a per-SyncKit-app rate limiter from a per-millisecond interval and burst size.
@@ -197,7 +257,7 @@ pub fn synckit_app_rate_limiter_ms(
197 257 ::governor::middleware::StateInformationMiddleware,
198 258 >,
199 259 > {
200 - std::sync::Arc::new(
260 + let config = std::sync::Arc::new(
201 261 tower_governor::governor::GovernorConfigBuilder::default()
202 262 .key_extractor(SyncAppKeyExtractor::new(secret))
203 263 .per_millisecond(ms)
@@ -205,7 +265,13 @@ pub fn synckit_app_rate_limiter_ms(
205 265 .use_headers()
206 266 .finish()
207 267 .expect("synckit app rate limiter config"),
208 - )
268 + );
269 + let limiter = config.limiter().clone();
270 + register_for_sweep(move || {
271 + limiter.retain_recent();
272 + limiter.len()
273 + });
274 + config
209 275 }
210 276
211 277 #[cfg(test)]
@@ -32,12 +32,15 @@ pub(super) async fn repo_overview(
32 32 Path((owner, repo_name)): Path<(String, String)>,
33 33 ) -> Result<impl IntoResponse> {
34 34 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
35 - let info = git::repo_info(&resolved.git_repo, &repo_name);
36 - let refs = git::list_refs(&resolved.git_repo);
37 -
38 - let commit_oid = git::resolve_ref(&resolved.git_repo, &info.default_branch)?;
39 - let tree_items = git::list_tree(&resolved.git_repo, commit_oid, "")?;
40 - let readme_html = git::find_readme(&resolved.git_repo, commit_oid);
35 + let repo_name_c = repo_name.clone();
36 + let (info, refs, tree_items, readme_html) = resolved.with_repo(move |repo| {
37 + let info = git::repo_info(repo, &repo_name_c);
38 + let refs = git::list_refs(repo);
39 + let commit_oid = git::resolve_ref(repo, &info.default_branch)?;
40 + let tree_items = git::list_tree(repo, commit_oid, "")?;
41 + let readme_html = git::find_readme(repo, commit_oid);
42 + Ok((info, refs, tree_items, readme_html))
43 + }).await?;
41 44
42 45 let csrf_token = get_csrf_token(&session).await;
43 46
@@ -80,10 +83,14 @@ pub(super) async fn tree_root(
80 83 Path((owner, repo_name, git_ref)): Path<(String, String, String)>,
81 84 ) -> Result<impl IntoResponse> {
82 85 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
83 - let refs = git::list_refs(&resolved.git_repo);
84 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
85 - let tree_items = git::list_tree(&resolved.git_repo, commit_oid, "")?;
86 - let readme_html = git::find_readme(&resolved.git_repo, commit_oid);
86 + let git_ref_c = git_ref.clone();
87 + let (refs, tree_items, readme_html) = resolved.with_repo(move |repo| {
88 + let refs = git::list_refs(repo);
89 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
90 + let tree_items = git::list_tree(repo, commit_oid, "")?;
91 + let readme_html = git::find_readme(repo, commit_oid);
92 + Ok((refs, tree_items, readme_html))
93 + }).await?;
87 94
88 95 let csrf_token = get_csrf_token(&session).await;
89 96 let is_owner = maybe_user.as_ref().map(|u| u.id) == Some(resolved.db_user.id);
@@ -126,39 +133,58 @@ pub(super) async fn tree_or_file(
126 133 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
127 134 ) -> Result<Response> {
128 135 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
129 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
130 136 let is_owner = maybe_user.as_ref().map(|u| u.id) == Some(resolved.db_user.id);
131 137 let (open_issue_count, _) = db::issues::get_issue_counts(&state.db, resolved.db_repo.id).await.unwrap_or((0, 0));
132 138
133 - // Try as directory first
134 - if let Ok(tree_items) = git::list_tree(&resolved.git_repo, commit_oid, &path) {
135 - let refs = git::list_refs(&resolved.git_repo);
136 - let breadcrumbs = build_breadcrumbs(&path);
137 - let parent_path = parent_of(&path);
138 - let csrf_token = get_csrf_token(&session).await;
139 -
140 - let in_subdir = !path.is_empty();
141 - return Ok(GitTreeTemplate {
142 - csrf_token,
143 - session_user: maybe_user,
144 - owner,
145 - repo_name,
146 - current_ref: git_ref,
147 - refs,
148 - path,
149 - parent_path,
150 - in_subdir,
151 - breadcrumbs,
152 - tree_items,
153 - open_issue_count,
154 - is_owner,
155 - active_tab: "files",
156 - }
157 - .into_response());
139 + // Resolve the ref and read the path as a directory (preferred) or file in
140 + // one blocking-pool pass.
141 + enum DirOrFile {
142 + Dir(Vec<git::TreeItem>, Vec<git::RefInfo>),
143 + File(git::FileContent, Vec<git::RefInfo>),
158 144 }
145 + let git_ref_c = git_ref.clone();
146 + let path_c = path.clone();
147 + let dir_or_file = resolved.with_repo(move |repo| {
148 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
149 + match git::list_tree(repo, commit_oid, &path_c) {
150 + Ok(tree_items) => Ok(DirOrFile::Dir(tree_items, git::list_refs(repo))),
151 + Err(_) => {
152 + let file_content = git::read_file(repo, commit_oid, &path_c)?;
153 + Ok(DirOrFile::File(file_content, git::list_refs(repo)))
154 + }
155 + }
156 + }).await?;
157 +
158 + // A directory listing renders and returns here; a file falls through.
159 + let (file_content, refs) = match dir_or_file {
160 + DirOrFile::Dir(tree_items, refs) => {
161 + let breadcrumbs = build_breadcrumbs(&path);
162 + let parent_path = parent_of(&path);
163 + let csrf_token = get_csrf_token(&session).await;
164 +
165 + let in_subdir = !path.is_empty();
166 + return Ok(GitTreeTemplate {
167 + csrf_token,
168 + session_user: maybe_user,
169 + owner,
170 + repo_name,
171 + current_ref: git_ref,
172 + refs,
173 + path,
174 + parent_path,
175 + in_subdir,
176 + breadcrumbs,
177 + tree_items,
178 + open_issue_count,
179 + is_owner,
180 + active_tab: "files",
181 + }
182 + .into_response());
183 + }
184 + DirOrFile::File(file_content, refs) => (file_content, refs),
185 + };
159 186
160 - // Try as file
161 - let file_content = git::read_file(&resolved.git_repo, commit_oid, &path)?;
187 + // File view.
162 188 let filename = path
163 189 .rsplit('/')
164 190 .next()
@@ -166,7 +192,6 @@ pub(super) async fn tree_or_file(
166 192 .to_string();
167 193 let breadcrumbs = build_breadcrumbs(&path);
168 194 let file_size = format_size(&file_content.size);
169 - let refs = git::list_refs(&resolved.git_repo);
170 195
171 196 let highlighted_lines = if file_content.is_binary {
172 197 Vec::new()
@@ -225,14 +250,18 @@ pub(super) async fn commit_log(
225 250 Query(query): Query<CommitQuery>,
226 251 ) -> Result<impl IntoResponse> {
227 252 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
228 - let refs = git::list_refs(&resolved.git_repo);
229 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
230 253
231 254 let page = query.page.unwrap_or(1).clamp(1, 10_000);
232 255 let limit = constants::GIT_COMMITS_PER_PAGE;
233 256 let offset = (page - 1).saturating_mul(limit);
234 257
235 - let commits = git::commit_log(&resolved.git_repo, commit_oid, limit + 1, offset)?;
258 + let git_ref_c = git_ref.clone();
259 + let (refs, commits) = resolved.with_repo(move |repo| {
260 + let refs = git::list_refs(repo);
261 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
262 + let commits = git::commit_log(repo, commit_oid, limit + 1, offset)?;
263 + Ok((refs, commits))
264 + }).await?;
236 265 let has_more = commits.len() > limit;
237 266 let commits: Vec<_> = commits.into_iter().take(limit).collect();
238 267
@@ -268,22 +297,25 @@ pub(super) async fn commit_detail_page(
268 297 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
269 298
270 299 let oid = git2::Oid::from_str(&oid_str).map_err(|_| AppError::NotFound)?;
271 - resolved.git_repo.find_commit(oid).map_err(|_| AppError::NotFound)?;
272 -
273 - let detail = git::commit_detail(&resolved.git_repo, oid)?;
274 - let diff_files = git::commit_diff(
275 - &resolved.git_repo,
276 - oid,
277 - constants::GIT_DIFF_MAX_FILES,
278 - constants::GIT_DIFF_MAX_LINES,
279 - )?;
300 + let repo_name_c = repo_name.clone();
301 + let (detail, diff_files, refs, info) = resolved.with_repo(move |repo| {
302 + repo.find_commit(oid).map_err(|_| AppError::NotFound)?;
303 + let detail = git::commit_detail(repo, oid)?;
304 + let diff_files = git::commit_diff(
305 + repo,
306 + oid,
307 + constants::GIT_DIFF_MAX_FILES,
308 + constants::GIT_DIFF_MAX_LINES,
309 + )?;
310 + let refs = git::list_refs(repo);
311 + let info = git::repo_info(repo, &repo_name_c);
312 + Ok((detail, diff_files, refs, info))
313 + }).await?;
280 314
281 315 let total_files = diff_files.len();
282 316 let total_additions: usize = diff_files.iter().map(|f| f.additions).sum();
283 317 let total_deletions: usize = diff_files.iter().map(|f| f.deletions).sum();
284 318
285 - let refs = git::list_refs(&resolved.git_repo);
286 - let info = git::repo_info(&resolved.git_repo, &repo_name);
287 319 let csrf_token = get_csrf_token(&session).await;
288 320 let is_owner = maybe_user.as_ref().map(|u| u.id) == Some(resolved.db_user.id);
289 321 let (open_issue_count, _) = db::issues::get_issue_counts(&state.db, resolved.db_repo.id).await.unwrap_or((0, 0));
@@ -315,12 +347,17 @@ pub(super) async fn blame_view(
315 347 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
316 348 ) -> Result<impl IntoResponse> {
317 349 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
318 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
319 - let blame_lines = git::blame_file(&resolved.git_repo, commit_oid, &path)?;
350 + let git_ref_c = git_ref.clone();
351 + let path_c = path.clone();
352 + let (blame_lines, refs) = resolved.with_repo(move |repo| {
353 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
354 + let blame_lines = git::blame_file(repo, commit_oid, &path_c)?;
355 + let refs = git::list_refs(repo);
356 + Ok((blame_lines, refs))
357 + }).await?;
320 358
321 359 let filename = path.rsplit('/').next().unwrap_or(&path).to_string();
322 360 let breadcrumbs = build_breadcrumbs(&path);
323 - let refs = git::list_refs(&resolved.git_repo);
324 361
325 362 let csrf_token = get_csrf_token(&session).await;
326 363 let is_owner = maybe_user.as_ref().map(|u| u.id) == Some(resolved.db_user.id);
@@ -420,21 +457,26 @@ pub(super) async fn file_log(
420 457 Query(query): Query<CommitQuery>,
421 458 ) -> Result<impl IntoResponse> {
422 459 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
423 - let refs = git::list_refs(&resolved.git_repo);
424 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
425 460
426 461 let page = query.page.unwrap_or(1).clamp(1, 10_000);
427 462 let limit = constants::GIT_COMMITS_PER_PAGE;
428 463 let offset = (page - 1).saturating_mul(limit);
429 464
430 - let commits = git::file_commit_log(
431 - &resolved.git_repo,
432 - commit_oid,
433 - &path,
434 - limit + 1,
435 - offset,
436 - constants::GIT_FILE_LOG_MAX_WALK,
437 - )?;
465 + let git_ref_c = git_ref.clone();
466 + let path_c = path.clone();
467 + let (refs, commits) = resolved.with_repo(move |repo| {
468 + let refs = git::list_refs(repo);
469 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
470 + let commits = git::file_commit_log(
471 + repo,
472 + commit_oid,
473 + &path_c,
474 + limit + 1,
475 + offset,
476 + constants::GIT_FILE_LOG_MAX_WALK,
477 + )?;
478 + Ok((refs, commits))
479 + }).await?;
438 480 let has_more = commits.len() > limit;
439 481 let commits: Vec<_> = commits.into_iter().take(limit).collect();
440 482
@@ -3,12 +3,13 @@
3 3 mod browsing;
4 4 mod raw;
5 5
6 + use std::path::PathBuf;
7 +
6 8 use axum::{
7 9 extract::DefaultBodyLimit,
8 10 routing::{get, post},
9 11 Router,
10 12 };
11 - use git2::Repository;
12 13 use tower_governor::GovernorLayer;
13 14
14 15 use crate::{
@@ -69,12 +70,14 @@ pub fn format_size(bytes: &u64) -> String {
69 70 }
70 71 }
71 72
72 - /// Escape HTML special characters.
73 + /// Escape HTML special characters, including `'` so the output is safe in
74 + /// single-quoted attribute contexts as well as element text.
73 75 fn escape_html(s: &str) -> String {
74 76 s.replace('&', "&amp;")
75 77 .replace('<', "&lt;")
76 78 .replace('>', "&gt;")
77 79 .replace('"', "&quot;")
80 + .replace('\'', "&#39;")
78 81 }
79 82
80 83 /// Get the repos root path from config, or return 404 if git is not configured.
@@ -121,13 +124,39 @@ fn parent_of(path: &str) -> String {
121 124 // Repo resolution + visibility
122 125 // ============================================================================
123 126
124 - /// Result of resolving a git repo from the URL: DB record, opened git2 repo, and owner user.
127 + /// Result of resolving a git repo from the URL: DB record, validated on-disk
128 + /// path, and owner user.
129 + ///
130 + /// We store the *path* rather than an open `git2::Repository` because the repo
131 + /// handle is `!Send` and would pin the synchronous libgit2 work to the async
132 + /// runtime. Handlers run their libgit2 reads through [`ResolvedRepo::with_repo`],
133 + /// which reopens the repo on the blocking pool.
125 134 pub(crate) struct ResolvedRepo {
126 135 pub(crate) db_repo: DbGitRepo,
127 - pub(crate) git_repo: Repository,
136 + pub(crate) repo_path: PathBuf,
128 137 pub(crate) db_user: DbUser,
129 138 }
130 139
140 + impl ResolvedRepo {
141 + /// Run a synchronous libgit2 closure on the blocking pool against a freshly
142 + /// opened handle. `git2::Repository` is `!Send`, so the handle is reopened
143 + /// inside the closure rather than moved across the await. Keeps libgit2's
144 + /// disk + zlib work off the Tokio worker threads (Run #14 Perf LOW).
145 + pub(crate) async fn with_repo<T, F>(&self, f: F) -> Result<T>
146 + where
147 + T: Send + 'static,
148 + F: FnOnce(&git2::Repository) -> Result<T> + Send + 'static,
149 + {
150 + let path = self.repo_path.clone();
151 + tokio::task::spawn_blocking(move || {
152 + let repo = git::open_repo_at(&path)?;
153 + f(&repo)
154 + })
155 + .await
156 + .map_err(|e| AppError::Internal(anyhow::anyhow!("git worker task failed: {e}")))?
157 + }
158 + }
159 +
131 160 /// Strip optional `.git` suffix from a repo name (for HTTP clone URLs).
132 161 fn resolve_repo_name(repo_name: &str) -> &str {
133 162 repo_name.strip_suffix(".git").unwrap_or(repo_name)
@@ -175,10 +204,12 @@ pub(crate) async fn resolve_repo(
175 204 return Err(AppError::NotFound);
176 205 }
177 206
178 - // 4. Open repo from disk
179 - let git_repo = git::open_repo(&root, owner, repo_name)?;
207 + // 4. Resolve + validate the on-disk path (segment + symlink-traversal
208 + // checks; canonicalize also confirms the repo exists). The handle itself is
209 + // opened per-operation on the blocking pool via `with_repo`.
210 + let repo_path = git::repo_disk_path(&root, owner, repo_name)?;
180 211
181 - Ok(ResolvedRepo { db_repo, git_repo, db_user })
212 + Ok(ResolvedRepo { db_repo, repo_path, db_user })
182 213 }
183 214
184 215 // ============================================================================
@@ -26,28 +26,29 @@ pub(super) async fn raw_file(
26 26 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
27 27 ) -> Result<Response> {
28 28 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
29 - let commit_oid = git::resolve_ref(&resolved.git_repo, &git_ref)?;
30 -
31 - let commit = resolved.git_repo
32 - .find_commit(commit_oid)
33 - .map_err(|_| AppError::NotFound)?;
34 - let tree = commit.tree().map_err(|_| AppError::NotFound)?;
35 - let entry = tree
36 - .get_path(std::path::Path::new(&path))
37 - .map_err(|_| AppError::NotFound)?;
38 - let obj = entry.to_object(&resolved.git_repo).map_err(|_| AppError::NotFound)?;
39 - let blob = obj.as_blob().ok_or(AppError::NotFound)?;
40 -
41 - // Prevent unbounded memory allocation from large blobs
42 - let size = blob.size();
43 - if size > constants::GIT_RAW_MAX_BYTES {
44 - return Err(AppError::BadRequest(format!(
45 - "File too large for raw download ({} bytes, max {})",
46 - size, constants::GIT_RAW_MAX_BYTES,
47 - )));
48 - }
49 29
50 - let content = blob.content().to_vec();
30 + let git_ref_c = git_ref.clone();
31 + let path_c = path.clone();
32 + let content: Vec<u8> = resolved.with_repo(move |repo| {
33 + let commit_oid = git::resolve_ref(repo, &git_ref_c)?;
34 + let commit = repo.find_commit(commit_oid).map_err(|_| AppError::NotFound)?;
35 + let tree = commit.tree().map_err(|_| AppError::NotFound)?;
36 + let entry = tree
37 + .get_path(std::path::Path::new(&path_c))
38 + .map_err(|_| AppError::NotFound)?;
39 + let obj = entry.to_object(repo).map_err(|_| AppError::NotFound)?;
40 + let blob = obj.as_blob().ok_or(AppError::NotFound)?;
41 +
42 + // Prevent unbounded memory allocation from large blobs.
43 + let size = blob.size();
44 + if size > constants::GIT_RAW_MAX_BYTES {
45 + return Err(AppError::BadRequest(format!(
46 + "File too large for raw download ({} bytes, max {})",
47 + size, constants::GIT_RAW_MAX_BYTES,
48 + )));
49 + }
50 + Ok(blob.content().to_vec())
51 + }).await?;
51 52
52 53 let filename = path.rsplit('/').next().unwrap_or(&path);
53 54 let content_type = if filename.ends_with(".rs")
@@ -257,10 +257,44 @@ pub(super) async fn gallery_confirm(
257 257
258 258 let alt = req.alt.trim().to_string();
259 259
260 + // Advisory-lock key for serializing concurrent confirms on this gallery:
261 + // class separates item vs project; obj folds the target UUID to i32 (a
262 + // collision only over-serializes two unrelated galleries, never under-).
263 + let gallery_lock_class: i32 = match target {
264 + GalleryTarget::Item => 0,
265 + GalleryTarget::Project => 1,
266 + };
267 + let gallery_lock_obj: i32 = {
268 + let b = req.target_id.as_bytes();
269 + i32::from_le_bytes([b[0], b[1], b[2], b[3]])
270 + ^ i32::from_le_bytes([b[4], b[5], b[6], b[7]])
271 + ^ i32::from_le_bytes([b[8], b[9], b[10], b[11]])
272 + ^ i32::from_le_bytes([b[12], b[13], b[14], b[15]])
273 + };
274 +
260 275 // Storage increment + row INSERT in ONE transaction (gallery is add-only, so
261 276 // a pure increment — no old-key replacement). A rollback restores the counter.
262 277 let committed: Result<Uuid> = async {
263 278 let mut tx = state.db.begin().await?;
279 + // Serialize concurrent confirms for this gallery and re-count INSIDE the
280 + // tx, so two inserts that both passed the best-effort pre-check above
281 + // can't push it over MAX_GALLERY_IMAGES (Run #14 Storage LOW). A failure
282 + // here routes through the orphan-enqueue cleanup below.
283 + sqlx::query("SELECT pg_advisory_xact_lock($1, $2)")
284 + .bind(gallery_lock_class)
285 + .bind(gallery_lock_obj)
286 + .execute(&mut *tx)
287 + .await?;
288 + let count_in_tx = match target {
289 + GalleryTarget::Item => db::gallery_images::count_for_item(&mut *tx, ItemId::from(req.target_id)).await?,
290 + GalleryTarget::Project => db::gallery_images::count_for_project(&mut *tx, ProjectId::from(req.target_id)).await?,
291 + };
292 + if count_in_tx >= db::gallery_images::MAX_GALLERY_IMAGES {
293 + return Err(AppError::BadRequest(format!(
294 + "Gallery is full (max {} images)",
295 + db::gallery_images::MAX_GALLERY_IMAGES
296 + )));
297 + }
264 298 db::creator_tiers::try_increment_storage_on(&mut tx, user.id, file_size_bytes, max_storage).await?;
265 299 let id = match target {
266 300 GalleryTarget::Item => {
@@ -43,7 +43,11 @@ pub(in crate::routes::stripe) struct CartCheckoutForm {
43 43 pub promo_code: Option<String>,
44 44 }
45 45
46 - /// POST /stripe/checkout/cart - Checkout all cart items from one seller
46 + /// POST /stripe/checkout/cart - Checkout all cart items from one seller.
47 + ///
48 + /// Thin wrapper over [`checkout_seller_cart`]: enforces the buyer-side
49 + /// preconditions (suspended/sandbox/self-purchase) the chained path doesn't
50 + /// need, then maps the core's `Option<url>` onto a redirect.
47 51 #[tracing::instrument(skip_all, name = "stripe::cart_checkout", fields(user_id = %user.id))]
48 52 pub(in crate::routes::stripe) async fn create_cart_checkout(
49 53 State(state): State<AppState>,
@@ -60,62 +64,98 @@ pub(in crate::routes::stripe) async fn create_cart_checkout(
60 64 return Err(AppError::BadRequest("You cannot purchase your own items".to_string()));
61 65 }
62 66
63 - // Get all cart items for this seller
64 - let cart_items = db::cart::get_cart_items_for_seller(&state.db, user.id, seller_id).await
65 - .context("fetch cart items for seller")?;
67 + match checkout_seller_cart(&state, &user, seller_id, form.share_contact, form.promo_code.as_deref()).await? {
68 + Some(url) => Ok(Redirect::to(&url).into_response()),
69 + None => Ok(Redirect::to("/library?purchase=success").into_response()),
70 + }
71 + }
72 +
73 + /// Form data for checkout-all (cross-seller).
74 + #[derive(Debug, Deserialize)]
75 + pub(in crate::routes::stripe) struct CartCheckoutAllForm {
76 + #[serde(default)]
77 + pub share_contact: bool,
78 + }
79 +
80 + /// POST /stripe/checkout/cart/all - Checkout all cart items across all sellers.
81 + ///
82 + /// Queues seller IDs in the session, processes the first seller, then chains
83 + /// through the rest via checkout_success redirects.
84 + #[tracing::instrument(skip_all, name = "stripe::cart_checkout_all", fields(user_id = %user.id))]
85 + pub(in crate::routes::stripe) async fn create_cart_checkout_all(
86 + State(state): State<AppState>,
87 + AuthUser(user): AuthUser,
88 + session: tower_sessions::Session,
89 + Form(form): Form<CartCheckoutAllForm>,
90 + ) -> Result<Response> {
91 + user.check_not_suspended()?;
92 + user.check_not_sandbox()?;
93 +
94 + let cart_items = db::cart::get_cart_items(&state.db, user.id).await
95 + .context("fetch all cart items")?;
66 96
67 97 if cart_items.is_empty() {
68 - return Err(AppError::BadRequest("No items in cart for this creator".to_string()));
98 + return Ok(Redirect::to("/cart").into_response());
69 99 }
70 100
71 - // Verify seller has Stripe connected
72 - let seller = db::users::get_user_by_id(&state.db, seller_id)
73 - .await
74 - .context("fetch seller")?
75 - .ok_or(AppError::NotFound)?;
101 + // Group by seller, collect unique seller IDs in order
102 + let mut seen = std::collections::HashSet::new();
103 + let mut seller_ids: Vec<String> = Vec::new();
104 + for item in &cart_items {
105 + let sid = item.seller_id.to_string();
106 + if seen.insert(sid.clone()) {
107 + seller_ids.push(sid);
108 + }
109 + }
76 110
77 - if seller.is_suspended() {
78 - return Err(AppError::BadRequest("This creator's account is currently unavailable".to_string()));
111 + if seller_ids.is_empty() {
112 + return Ok(Redirect::to("/cart").into_response());
79 113 }
80 114
81 - // Bulk-check ownership in a single query instead of N sequential roundtrips.
82 - let cart_item_ids: Vec<db::ItemId> = cart_items.iter().map(|c| c.item_id).collect();
83 - let already_owned = db::transactions::purchased_subset(&state.db, user.id, &cart_item_ids)
84 - .await
85 - .context("bulk check existing purchases")?;
115 + // Queue remaining sellers (all except the first) in session
116 + let first_seller = seller_ids.remove(0);
117 + if !seller_ids.is_empty() {
118 + session.insert("cart_queue", seller_ids).await
119 + .map_err(|e| AppError::BadRequest(format!("session error: {e}")))?;
120 + session.insert("cart_share_contact", form.share_contact).await
121 + .map_err(|e| AppError::BadRequest(format!("session error: {e}")))?;
122 + }
86 123
87 - let mut free_items = Vec::new();
88 - let mut paid_items = Vec::new();
89 - for item in &cart_items {
90 - if already_owned.contains(&item.item_id) {
91 - if let Err(e) = db::cart::remove_from_cart(&state.db, user.id, item.item_id).await {
92 - tracing::warn!(
93 - user_id = %user.id, item_id = %item.item_id, error = ?e,
94 - "failed to remove already-purchased item from cart; buyer will see it lingering on /cart"
95 - );
96 - }
97 - continue;
98 - }
99 - if item.is_free() {
100 - free_items.push(item);
101 - } else {
102 - paid_items.push(item);
103 - }
124 + // Process the first seller and chain through the queue until we hit a
125 + // paid seller (return its Stripe URL) or exhaust everything as free.
126 + match drain_to_paid(&state, &user, first_seller, form.share_contact, &session).await? {
127 + Some(url) => Ok(Redirect::to(&url).into_response()),
128 + None => Ok(Redirect::to("/library?purchase=success").into_response()),
104 129 }
130 + }
105 131
106 - // Claim free items immediately. Bundle/license metadata is pulled
107 - // through CartItem so this loop doesn't need a per-item `get_item_by_id`;
108 - // cart rows are bulk-deleted at the end so this loop doesn't fire N
109 - // DELETEs either (Run #8 perf MED).
110 - let mut to_remove: Vec<db::ItemId> = Vec::with_capacity(free_items.len());
111 - for item in &free_items {
132 + /// Claim a set of free (or discount-zeroed) cart items: insert the free
133 + /// transaction, bump the sales count, and grant bundle items + a license key
134 + /// when applicable, then bulk-remove the claimed rows from the cart.
135 + ///
136 + /// Bundle/license fields come from `CartItem`, so this does no per-item
137 + /// `get_item_by_id`, and the cart rows are removed in one bulk DELETE after the
138 + /// loop (Run #8 perf MED). Shared by the free-by-price and discount-zeroed
139 + /// passes so the claim logic exists in exactly one place.
140 + async fn claim_free_cart_items(
141 + state: &AppState,
142 + user_id: UserId,
143 + seller_id: UserId,
144 + items: &[&db::cart::CartItem],
145 + share_contact: bool,
146 + ) -> Result<()> {
147 + if items.is_empty() {
148 + return Ok(());
149 + }
150 + let mut to_remove: Vec<db::ItemId> = Vec::with_capacity(items.len());
151 + for item in items {
112 152 let claim = db::transactions::ClaimParams {
113 - buyer_id: user.id,
153 + buyer_id: user_id,
114 154 item_id: item.item_id,
115 155 seller_id,
116 156 item_title: &item.title,
117 157 seller_username: &item.creator_username,
118 - share_contact: form.share_contact,
158 + share_contact,
119 159 parent_transaction_id: None,
120 160 };
121 161
@@ -132,12 +172,12 @@ pub(in crate::routes::stripe) async fn create_cart_checkout(
132 172
133 173 if claimed {
134 174 if item.item_type == "bundle" {
135 - grant_bundle_items(&state, item.item_id, user.id, seller_id, None).await;
175 + grant_bundle_items(state, item.item_id, user_id, seller_id, None).await;
136 176 }
137 177 if item.enable_license_keys {
138 178 let key_code = helpers::generate_key_code();
139 179 db::license_keys::create_license_key(
140 - &state.db, item.item_id, user.id, None, &key_code,
180 + &state.db, item.item_id, user_id, None, &key_code,
141 181 item.default_max_activations,
142 182 ).await.ok();
143 183 }
@@ -145,199 +185,41 @@ pub(in crate::routes::stripe) async fn create_cart_checkout(
145 185
146 186 to_remove.push(item.item_id);
147 187 }
148 - db::cart::remove_from_cart_bulk(&state.db, user.id, &to_remove).await.ok();
149 -
150 - // Validate optional promo code and compute per-item discounted prices
151 - let mut promo_code_id: Option<PromoCodeId> = None;
152 - let mut discounted_prices: std::collections::HashMap<db::ItemId, i32> = std::collections::HashMap::new();
153 -
154 - if let Some(code_str) = form.promo_code.as_deref().map(str::trim).filter(|s| !s.is_empty())
155 - && let Some(validated) =
156 - db::promo_codes::lookup_and_validate_promo(&state.db, seller_id, Some(user.id), code_str).await?
157 - {
158 - use db::promo_codes::PromoApplication;
159 - // Apply to each eligible paid item; ineligible items (scope/min-price)
160 - // are skipped so the rest of the cart can still qualify.
161 - for item in &paid_items {
162 - if item.pwyw_enabled {
163 - continue; // PWYW items can't take a promo (single-item behavior)
164 - }
165 - if let PromoApplication::Apply(price) = db::promo_codes::apply_promo_to_item(
166 - &validated, item.item_id, item.project_id, item.effective_price_cents(),
167 - )? {
168 - discounted_prices.insert(item.item_id, price);
169 - }
170 - }
171 - promo_code_id = Some(validated.id());
172 - }
173 -
174 - // Re-classify items after discount: some paid items may now be free
175 - let mut newly_free = Vec::new();
176 - let mut still_paid = Vec::new();
177 - for item in &paid_items {
178 - let final_price = discounted_prices.get(&item.item_id).copied()
179 - .unwrap_or_else(|| item.effective_price_cents());
180 - if final_price == 0 {
181 - newly_free.push(item);
182 - } else {
183 - still_paid.push((item, final_price));
184 - }
185 - }
186 -
187 - // Claim discount-zeroed items as free. Same per-item-roundtrip discipline
188 - // as the free-by-price loop above: bundle/license fields come from CartItem,
189 - // cart rows are bulk-deleted after the loop.
190 - let mut to_remove_promo: Vec<db::ItemId> = Vec::with_capacity(newly_free.len());
191 - for item in &newly_free {
192 - let claim = db::transactions::ClaimParams {
193 - buyer_id: user.id,
194 - item_id: item.item_id,
195 - seller_id,
196 - item_title: &item.title,
197 - seller_username: &item.creator_username,
198 - share_contact: form.share_contact,
199 - parent_transaction_id: None,
200 - };
201 - let mut tx = state.db.begin().await.context("begin promo-free claim")?;
202 - let claimed = db::transactions::claim_free_item(&mut *tx, &claim)
203 - .await.context("claim promo-free item")?;
204 - if claimed {
205 - db::items::increment_sales_count(&mut *tx, item.item_id)
206 - .await.context("increment sales count")?;
207 - }
208 - tx.commit().await.context("commit promo-free claim")?;
209 - if claimed {
210 - if item.item_type == "bundle" {
211 - grant_bundle_items(&state, item.item_id, user.id, seller_id, None).await;
212 - }
213 - if item.enable_license_keys {
214 - let key_code = helpers::generate_key_code();
215 - db::license_keys::create_license_key(
216 - &state.db, item.item_id, user.id, None, &key_code,
217 - item.default_max_activations,
218 - ).await.ok();
219 - }
220 - }
221 - to_remove_promo.push(item.item_id);
222 - }
223 - db::cart::remove_from_cart_bulk(&state.db, user.id, &to_remove_promo).await.ok();
224 -
225 - // If no paid items remain after discounts, redirect to library
226 - if still_paid.is_empty() {
227 - if form.share_contact && (!free_items.is_empty() || !newly_free.is_empty()) {
228 - db::transactions::clear_contact_revocation(&state.db, user.id, seller_id)
229 - .await
230 - .context("clear contact revocation")?;
231 - }
232 - return Ok(Redirect::to("/library?purchase=success").into_response());
233 - }
234 -
235 - // Verify Stripe is ready for paid items BEFORE reserving the promo. The
236 - // previous order burned a use of single-use codes against creators with
237 - // no charges_enabled.
238 - let stripe_account_id = seller.stripe_account_id.as_ref()
239 - .ok_or_else(|| AppError::BadRequest("Creator hasn't set up payments yet".to_string()))?;
240 -
241 - if !seller.stripe_charges_enabled {
242 - return Err(AppError::BadRequest("Creator's payment account is not ready".to_string()));
243 - }
244 -
245 - let stripe = state.stripe.as_ref()
246 - .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;
247 -
248 - // Reserve promo code use_count before creating session
249 - if let Some(pc_id) = promo_code_id {
250 - let reserved = db::promo_codes::try_increment_use_count(&state.db, pc_id)
251 - .await
252 - .context("reserve promo code use at cart checkout")?;
253 - if !reserved {
254 - return Err(AppError::BadRequest("This promo code has reached its usage limit".to_string()));
255 - }
256 - }
257 -
258 - // Build Stripe line items with discounted prices
259 - let line_items: Vec<crate::payments::CartLineItem> = still_paid
260 - .iter()
261 - .map(|(item, final_price)| crate::payments::CartLineItem {
262 - title: &item.title,
263 - amount_cents: *final_price as i64,
264 - })
265 - .collect();
266 -
267 - // Reject sub-Stripe-minimum totals before calling Stripe; same rationale
268 - // as the per-seller path further down — chained promo+PWYW combinations
269 - // can land between 1¢ and 49¢, and Stripe's error message for that is
270 - // not user-friendly.
271 - let cart_total: i64 = line_items.iter().map(|li| li.amount_cents).sum();
272 - if cart_total > 0 && cart_total < crate::constants::STRIPE_MINIMUM_CHARGE_CENTS {
273 - if let Some(pc_id) = promo_code_id {
274 - release_promo_quietly(&state, pc_id, user.id).await;
275 - }
276 - return Err(AppError::BadRequest(format!(
277 - "Minimum cart total is ${:.2}",
278 - crate::constants::STRIPE_MINIMUM_CHARGE_CENTS as f64 / 100.0
279 - )));
280 - }
281 -
282 - // Pre-check the partial unique index `(buyer_id, item_id) WHERE status='pending'`
283 - // BEFORE creating the Stripe session. The previous behavior swallowed a 23505
284 - // collision per cart item silently, leaving the buyer charged for items that
285 - // never got a pending row — and therefore never got fulfilled by the webhook.
286 - let paid_item_ids: Vec<db::ItemId> = still_paid.iter().map(|(it, _)| it.item_id).collect();
287 - let pending_collisions = db::transactions::pending_subset(&state.db, user.id, &paid_item_ids)
288 - .await.context("pre-check pending cart purchases")?;
289 - if !pending_collisions.is_empty() {
290 - if let Some(pc_id) = promo_code_id {
291 - release_promo_quietly(&state, pc_id, user.id).await;
292 - }
293 - return Err(AppError::BadRequest(
294 - "You already have a checkout in progress for one or more of these items. \
295 - Complete or cancel that checkout before starting a new one.".to_string(),
296 - ));
297 - }
298 -
299 - let success_url = format!(
300 - "{}/stripe/success?session_id={{CHECKOUT_SESSION_ID}}",
301 - state.config.host_url
302 - );
303 - let cancel_url = format!("{}/cart", state.config.host_url);
304 -
305 - let cart_params = crate::payments::CartCheckoutParams {
306 - connected_account_id: stripe_account_id,
307 - line_items: &line_items,
308 - buyer_id: user.id,
309 - seller_id,
310 - success_url: &success_url,
311 - cancel_url: &cancel_url,
312 - enable_stripe_tax: seller.stripe_tax_enabled,
313 - };
314 -
315 - let result = match stripe.create_cart_checkout_session(&cart_params).await {
316 - Ok(r) => r,
317 - Err(e) => {
318 - if let Some(pc_id) = promo_code_id {
319 - release_promo_quietly(&state, pc_id, user.id).await;
320 - }
321 - return Err(e).context("create cart checkout session");
322 - }
323 - };
188 + db::cart::remove_from_cart_bulk(&state.db, user_id, &to_remove).await.ok();
189 + Ok(())
190 + }
324 191
325 - // Create pending transactions for all paid items atomically so the buyer
326 - // either gets all items or none (prevents partial delivery on mid-loop failure)
192 + /// Create the pending transactions for every paid item in one DB transaction,
193 + /// so the buyer gets all items or none (no partial delivery on a mid-loop
194 + /// failure).
195 + ///
196 + /// A 23505 means another tab raced past the pre-check; abort the whole cart
197 + /// rather than leave a paid Stripe line item with no pending row to fulfill. On
198 + /// any error the promo reservation (if any) is released, since the Stripe
199 + /// session was already created but no fulfilling rows landed.
200 + async fn create_cart_pending_transactions(
201 + state: &AppState,
202 + user_id: UserId,
203 + seller_id: UserId,
204 + session_id: &str,
205 + items: &[(&db::cart::CartItem, i32)],
206 + share_contact: bool,
207 + promo_code_id: Option<PromoCodeId>,
208 + ) -> Result<()> {
327 209 let mut db_tx = state.db.begin().await.context("begin cart transaction creation")?;
328 - for (item, final_price) in &still_paid {
210 + for (item, final_price) in items {
329 211 match db::transactions::create_transaction(
330 212 &mut *db_tx,
331 213 &db::transactions::CreateTransactionParams {
332 - buyer_id: Some(user.id),
214 + buyer_id: Some(user_id),
333 215 seller_id,
334 216 item_id: Some(item.item_id),
335 217 amount_cents: Cents::new(*final_price as i64),
336 218 platform_fee_cents: Cents::ZERO,
337 - stripe_checkout_session_id: &result.id,
219 + stripe_checkout_session_id: session_id,
338 220 item_title: &item.title,
339 221 seller_username: &item.creator_username,
340 - share_contact: form.share_contact,
222 + share_contact,
341 223 project_id: None,
342 224 promo_code_id,
343 225 guest_email: None,
@@ -349,15 +231,12 @@ pub(in crate::routes::stripe) async fn create_cart_checkout(
349 231 Err(AppError::Database(sqlx::Error::Database(ref db_err)))
350 232 if db_err.code().as_deref() == Some("23505") =>
351 233 {
352 - // A 23505 here means another tab raced past the pre-check.
353 - // Abort the whole cart rather than silently leaving a paid
354 - // Stripe line item without a pending DB row to fulfill.
355 234 tracing::warn!(
356 - buyer_id = %user.id, item_id = %item.item_id,
235 + buyer_id = %user_id, item_id = %item.item_id,
357 236 "23505 raced past pre-check during cart pending insert"
358 237 );
359 238 if let Some(pc_id) = promo_code_id {
360 - release_promo_quietly(&state, pc_id, user.id).await;
239 + release_promo_quietly(state, pc_id, user_id).await;
361 240 }
362 241 return Err(AppError::BadRequest(
363 242 "Another checkout for one of these items started while this one was loading. \
@@ -365,132 +244,61 @@ pub(in crate::routes::stripe) async fn create_cart_checkout(
365 244 ));
366 245 }
367 246 Err(e) => {
368 - // Transaction auto-rolls back on drop
247 + // Transaction auto-rolls back on drop.
369 248 if let Some(pc_id) = promo_code_id {
370 - release_promo_quietly(&state, pc_id, user.id).await;
249 + release_promo_quietly(state, pc_id, user_id).await;
371 250 }
372 251 return Err(e).context("create pending transaction for cart item");
373 252 }
374 253 }
375 254 }
376 255 db_tx.commit().await.context("commit cart pending transactions")?;
377 -
378 - // Cart items are removed by the webhook handler on successful payment,
379 - // so users keep their cart if they cancel the Stripe checkout.
380 -
381 - // Redirect to Stripe Checkout
382 - let checkout_url = result
383 - .url
384 - .ok_or_else(|| AppError::BadRequest("No checkout URL returned".to_string()))?;
385 -
386 - Ok(Redirect::to(&checkout_url).into_response())
387 - }
388 -
389 - /// Form data for checkout-all (cross-seller).
390 - #[derive(Debug, Deserialize)]
391 - pub(in crate::routes::stripe) struct CartCheckoutAllForm {
392 - #[serde(default)]
393 - pub share_contact: bool,
394 - }
395 -
396 - /// POST /stripe/checkout/cart/all - Checkout all cart items across all sellers.
397 - ///
398 - /// Queues seller IDs in the session, processes the first seller, then chains
399 - /// through the rest via checkout_success redirects.
400 - #[tracing::instrument(skip_all, name = "stripe::cart_checkout_all", fields(user_id = %user.id))]
401 - pub(in crate::routes::stripe) async fn create_cart_checkout_all(
402 - State(state): State<AppState>,
403 - AuthUser(user): AuthUser,
404 - session: tower_sessions::Session,
405 - Form(form): Form<CartCheckoutAllForm>,
406 - ) -> Result<Response> {
407 - user.check_not_suspended()?;
408 - user.check_not_sandbox()?;
409 -
410 - let cart_items = db::cart::get_cart_items(&state.db, user.id).await
411 - .context("fetch all cart items")?;
412 -
413 - if cart_items.is_empty() {
414 - return Ok(Redirect::to("/cart").into_response());
415 - }
416 -
417 - // Group by seller, collect unique seller IDs in order
418 - let mut seen = std::collections::HashSet::new();
419 - let mut seller_ids: Vec<String> = Vec::new();
420 - for item in &cart_items {
421 - let sid = item.seller_id.to_string();
422 - if seen.insert(sid.clone()) {
423 - seller_ids.push(sid);
424 - }
425 - }
426 -
427 - if seller_ids.is_empty() {
428 - return Ok(Redirect::to("/cart").into_response());
429 - }
430 -
431 - // Queue remaining sellers (all except the first) in session
432 - let first_seller = seller_ids.remove(0);
433 - if !seller_ids.is_empty() {
434 - session.insert("cart_queue", seller_ids).await
435 - .map_err(|e| AppError::BadRequest(format!("session error: {e}")))?;
436 - session.insert("cart_share_contact", form.share_contact).await
437 - .map_err(|e| AppError::BadRequest(format!("session error: {e}")))?;
438 - }
439 -
440 - // Process the first seller and chain through the queue until we hit a
441 - // paid seller (return its Stripe URL) or exhaust everything as free.
442 - match drain_to_paid(&state, &user, first_seller, form.share_contact, &session).await? {
443 - Some(url) => Ok(Redirect::to(&url).into_response()),
444 - None => Ok(Redirect::to("/library?purchase=success").into_response()),
445 - }
256 + Ok(())
446 257 }
447 258
448 - /// Core logic: process cart checkout for one seller.
259 + /// Core per-seller cart checkout, shared by the single-seller form
260 + /// ([`create_cart_checkout`]) and the cross-seller chain ([`drain_to_paid`]).
449 261 ///
450 - /// Returns `Ok(Some(url))` for a Stripe-payable checkout, `Ok(None)` when
451 - /// every item for this seller was free (already claimed inline; no Stripe
452 - /// session needed — caller should advance to the next queued seller or
453 - /// redirect to the library). The chain-break bug fixed in Run #8 was caused
454 - /// by the previous shape returning `Err` on the all-free case, which broke
Lines truncated
@@ -49,6 +49,20 @@ pub(super) async fn handle_purchase_checkout_completed(
49 49 "transaction completed"
50 50 );
51 51
52 + // Defense-in-depth reconciliation: our line items are server-built,
53 + // so Stripe's pre-tax subtotal should equal the amount we credit. A
54 + // mismatch (a future Stripe Tax / price-edit / currency edge) is
55 + // logged loudly rather than silently trusted — we still credit the
56 + // server amount, which is authoritative.
57 + if let Some(subtotal) = session.amount_subtotal
58 + && subtotal != i64::from(tx.amount_cents)
59 + {
60 + tracing::error!(
61 + session_id = %session_id, credited_cents = %tx.amount_cents, stripe_subtotal_cents = %subtotal,
62 + "checkout amount mismatch: credited transaction amount differs from Stripe session subtotal"
63 + );
64 + }
65 +
52 66 // Increment denormalized sales_count (inside transaction)
53 67 if let Some(iid) = item_id {
54 68 db::items::increment_sales_count(&mut *db_tx, iid)
@@ -145,6 +159,20 @@ pub(super) async fn handle_cart_checkout_completed(
145 159 count = completed_txs.len(), "cart transactions completed"
146 160 );
147 161
162 + // Defense-in-depth reconciliation: line items are server-built, so the sum
163 + // of the credited transactions should equal Stripe's pre-tax subtotal. A
164 + // mismatch (future Stripe Tax / price-edit / currency edge) is logged loudly
165 + // rather than silently trusted — the server amounts remain authoritative.
166 + if let Some(subtotal) = session.amount_subtotal {
167 + let credited: i64 = completed_txs.iter().map(|tx| i64::from(tx.amount_cents)).sum();
168 + if credited != subtotal {
169 + tracing::error!(
170 + session_id = %session_id, credited_cents = %credited, stripe_subtotal_cents = %subtotal,
171 + "cart checkout amount mismatch: sum of credited transactions differs from Stripe session subtotal"
172 + );
173 + }
174 + }
175 +
148 176 // Increment sales count for each item
149 177 for tx in &completed_txs {
150 178 if let Some(item_id) = tx.item_id {
@@ -141,6 +141,9 @@ pub struct ScanResult {
141 141 /// Pre-compiled scanning pipeline. Initialized once at startup and shared via Arc.
142 142 pub struct ScanPipeline {
143 143 yara_rules: Option<yara_x::Rules>,
144 + /// Number of YARA rule files that compiled, and the configured health floor.
145 + yara_rule_count: usize,
146 + yara_min_rule_files: usize,
144 147 clamav_socket: Option<String>,
145 148 malwarebazaar_enabled: bool,
146 149 urlhaus_enabled: bool,
@@ -151,10 +154,12 @@ pub struct ScanPipeline {
151 154 impl ScanPipeline {
152 155 /// Create a new pipeline, compiling YARA rules from the configured directory.
153 156 pub fn new(config: &ScanConfig) -> Result<Self, String> {
154 - let yara_rules = yara::compile_rules_from_dir(&config.yara_rules_dir)?;
157 + let (yara_rules, yara_rule_count) = yara::compile_rules_from_dir(&config.yara_rules_dir)?;
155 158
156 159 Ok(ScanPipeline {
157 160 yara_rules,
161 + yara_rule_count,
162 + yara_min_rule_files: config.yara_min_rule_files,
158 163 clamav_socket: config.clamav_socket.clone(),
159 164 malwarebazaar_enabled: config.malwarebazaar_enabled,
160 165 urlhaus_enabled: config.urlhaus_enabled,
@@ -179,6 +184,18 @@ impl ScanPipeline {
179 184 }
180 185 }
181 186 if self.yara_rules.is_some() {
187 + // Expected-rule-count floor: a corpus that quietly dropped below the
188 + // operator-declared size (e.g. a yara-x upgrade made N rules
189 + // uncompilable) is degraded coverage masquerading as a live layer.
190 + // Fail boot loudly when a floor is set and we're under it.
191 + if self.yara_min_rule_files > 0 && self.yara_rule_count < self.yara_min_rule_files {
192 + return Err(format!(
193 + "YARA corpus degraded: {} rule files compiled, below the configured \
194 + floor of {} (YARA_MIN_RULE_FILES). Refusing to boot — a silently \
195 + shrunken rule set is false coverage.",
196 + self.yara_rule_count, self.yara_min_rule_files,
197 + ));
198 + }
182 199 live_layers.push("yara");
183 200 }
184 201 if self.malwarebazaar_enabled {
@@ -492,6 +509,8 @@ mod tests {
492 509 fn make_pipeline() -> std::sync::Arc<ScanPipeline> {
493 510 std::sync::Arc::new(ScanPipeline {
494 511 yara_rules: None,
512 + yara_rule_count: 0,
513 + yara_min_rule_files: 0,
495 514 clamav_socket: None,
496 515 malwarebazaar_enabled: false,
497 516 urlhaus_enabled: false,
@@ -17,11 +17,15 @@ const YARA_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
17 17
18 18 /// Compile all YARA rules from `.yar` files in a directory.
19 19 /// Returns None if the directory doesn't exist or contains no rules.
20 - pub fn compile_rules_from_dir(dir: &str) -> Result<Option<yara_x::Rules>, String> {
20 + /// Compile every `.yar`/`.yara` file under `dir`. Returns the compiled `Rules`
21 + /// (or `None` when the directory is absent / empty) alongside the count of rule
22 + /// files that compiled successfully — the caller uses that count to enforce an
23 + /// optional health floor (see `ScanPipeline::assert_live`).
24 + pub fn compile_rules_from_dir(dir: &str) -> Result<(Option<yara_x::Rules>, usize), String> {
21 25 let path = Path::new(dir);
22 26 if !path.exists() {
23 27 tracing::info!(dir = %dir, "YARA rules directory not found, skipping");
24 - return Ok(None);
28 + return Ok((None, 0));
25 29 }
26 30
27 31 let mut compiler = yara_x::Compiler::new();
@@ -74,14 +78,14 @@ pub fn compile_rules_from_dir(dir: &str) -> Result<Option<yara_x::Rules>, String
74 78
75 79 if rule_count == 0 {
76 80 tracing::info!(dir = %dir, "No YARA rule files found");
77 - return Ok(None);
81 + return Ok((None, 0));
78 82 }
79 83
80 84 let rules = compiler
81 85 .build();
82 86
83 - tracing::info!(rule_count, dir = %dir, "YARA rules compiled");
84 - Ok(Some(rules))
87 + tracing::info!(rule_count, skipped_count, dir = %dir, "YARA rules compiled");
88 + Ok((Some(rules), rule_count))
85 89 }
86 90
87 91 /// Scan file data against compiled YARA rules.
@@ -151,7 +155,7 @@ mod tests {
151 155 fn no_rules_dir_returns_none() {
152 156 let result = compile_rules_from_dir("/nonexistent/path/to/rules");
153 157 assert!(result.is_ok());
154 - assert!(result.unwrap().is_none());
158 + assert!(result.unwrap().0.is_none());
155 159 }
156 160
157 161 #[test]
@@ -366,8 +370,9 @@ mod tests {
366 370
367 371 let result = compile_rules_from_dir(dir.path().to_str().unwrap());
368 372 assert!(result.is_ok());
369 - let rules = result.unwrap();
373 + let (rules, count) = result.unwrap();
370 374 assert!(rules.is_some(), "Should have compiled one rule");
375 + assert_eq!(count, 1, "exactly one rule file compiled");
371 376
372 377 // Verify the rules work
373 378 let rules = rules.unwrap();
@@ -381,7 +386,7 @@ mod tests {
381 386 let dir = tempfile::tempdir().unwrap();
382 387 let result = compile_rules_from_dir(dir.path().to_str().unwrap());
383 388 assert!(result.is_ok());
384 - assert!(result.unwrap().is_none());
389 + assert!(result.unwrap().0.is_none());
385 390 }
386 391
387 392 #[test]
@@ -392,7 +397,7 @@ mod tests {
392 397
393 398 let result = compile_rules_from_dir(dir.path().to_str().unwrap());
394 399 assert!(result.is_ok());
395 - assert!(result.unwrap().is_none());
400 + assert!(result.unwrap().0.is_none());
396 401 }
397 402
398 403 #[test]
@@ -407,7 +412,7 @@ mod tests {
407 412 assert!(result.is_ok(), "skipped-on-error, not aborted");
408 413 // No valid rules in the dir, so the function returns Ok(None) — the
409 414 // pipeline interprets None as "yara not configured" → Skip verdict.
410 - assert!(result.unwrap().is_none());
415 + assert!(result.unwrap().0.is_none());
411 416 }
412 417
413 418 #[test]
@@ -421,7 +426,9 @@ mod tests {
421 426
422 427 let result = compile_rules_from_dir(dir.path().to_str().unwrap());
423 428 assert!(result.is_ok());
424 - assert!(result.unwrap().is_some(), "valid rules still compile when bad files are present");
429 + let (rules, count) = result.unwrap();
430 + assert!(rules.is_some(), "valid rules still compile when bad files are present");
431 + assert_eq!(count, 1, "the one valid file compiled; the bad one was skipped");
425 432 }
426 433
427 434 #[test]
@@ -4,6 +4,36 @@ use crate::db;
4 4 use crate::db::{DbBlogPost, DbItem, DbUser};
5 5 use crate::AppState;
6 6
7 + /// Spawn a bounded email fan-out off the caller's (possibly advisory-lock-held)
8 + /// connection. `recipients` MUST already be bounded by the producing query's
9 + /// LIMIT — this helper owns the off-lock `tokio::spawn` and the every-50
10 + /// Postmark pause, so no scheduler fan-out can re-introduce an inline serial
11 + /// send loop on the lock connection (Run #14 CHRONIC 2b: the shape that drifted
12 + /// between the announcement and onboarding paths). `send_one` is awaited once
13 + /// per recipient and owns its own per-recipient error logging.
14 + ///
15 + /// There is deliberately no non-spawning variant: routing every fan-out through
16 + /// here is what makes "serial sends on the lock connection" unwritable.
17 + fn spawn_bounded_fanout<T, F, Fut>(recipients: Vec<T>, send_one: F)
18 + where
19 + T: Send + 'static,
20 + F: Fn(T) -> Fut + Send + 'static,
21 + Fut: std::future::Future<Output = ()> + Send,
22 + {
23 + if recipients.is_empty() {
24 + return;
25 + }
26 + tokio::spawn(async move {
27 + for (i, recipient) in recipients.into_iter().enumerate() {
28 + // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark.
29 + if i > 0 && i % 50 == 0 {
30 + tokio::time::sleep(std::time::Duration::from_secs(1)).await;
31 + }
32 + send_one(recipient).await;
33 + }
34 + });
35 + }
36 +
7 37 /// Atomically mark an item as release-announced and send subscriber emails
8 38 /// via the project's content mailing list.
9 39 ///
@@ -56,12 +86,15 @@ pub async fn send_release_announcements(state: &AppState, item: &DbItem) {
56 86 let signing_secret = state.config.signing_secret.clone();
57 87 let list_id_str = list.id.to_string();
58 88
59 - tokio::spawn(async move {
60 - for (i, subscriber) in subscribers.iter().enumerate() {
61 - // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark
62 - if i > 0 && i % 50 == 0 {
63 - tokio::time::sleep(std::time::Duration::from_secs(1)).await;
64 - }
89 + spawn_bounded_fanout(subscribers, move |subscriber| {
90 + let email_client = email_client.clone();
91 + let host_url = host_url.clone();
92 + let signing_secret = signing_secret.clone();
93 + let creator_name = creator_name.clone();
94 + let item_title = item_title.clone();
95 + let item_url = item_url.clone();
96 + let list_id_str = list_id_str.clone();
97 + async move {
65 98 let unsub_url = crate::email::generate_unsubscribe_url(
66 99 &host_url,
67 100 subscriber.id,
@@ -80,10 +113,7 @@ pub async fn send_release_announcements(state: &AppState, item: &DbItem) {
80 113 )
81 114 .await
82 115 {
83 - tracing::error!(
84 - error = ?e,
85 - "failed to send release announcement email"
86 - );
116 + tracing::error!(error = ?e, "failed to send release announcement email");
87 117 }
88 118 }
89 119 });
@@ -144,12 +174,15 @@ pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) {
144 174 let signing_secret = state.config.signing_secret.clone();
145 175 let list_id_str = list.id.to_string();
146 176
147 - tokio::spawn(async move {
148 - for (i, subscriber) in subscribers.iter().enumerate() {
149 - // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark
150 - if i > 0 && i % 50 == 0 {
151 - tokio::time::sleep(std::time::Duration::from_secs(1)).await;
152 - }
177 + spawn_bounded_fanout(subscribers, move |subscriber| {
178 + let email_client = email_client.clone();
179 + let host_url = host_url.clone();
180 + let signing_secret = signing_secret.clone();
181 + let creator_name = creator_name.clone();
182 + let post_title = post_title.clone();
183 + let post_url = post_url.clone();
184 + let list_id_str = list_id_str.clone();
185 + async move {
153 186 let unsub_url = crate::email::generate_unsubscribe_url(
154 187 &host_url,
155 188 subscriber.id,
@@ -168,10 +201,7 @@ pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) {
168 201 )
169 202 .await
170 203 {
171 - tracing::error!(
172 - error = ?e,
173 - "failed to send blog post announcement email"
174 - );
204 + tracing::error!(error = ?e, "failed to send blog post announcement email");
175 205 }
176 206 }
177 207 });
@@ -260,12 +290,10 @@ async fn claim_and_spawn_sends(state: &AppState, send: Vec<DbUser>, next: Onboar
260 290
261 291 let email_client = state.email.clone();
262 292 let host_url = state.config.host_url.clone();
263 - tokio::spawn(async move {
264 - for (i, user) in send.into_iter().enumerate() {
265 - // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark
266 - if i > 0 && i % 50 == 0 {
267 - tokio::time::sleep(std::time::Duration::from_secs(1)).await;
268 - }
293 + spawn_bounded_fanout(send, move |user| {
294 + let email_client = email_client.clone();
295 + let host_url = host_url.clone();
296 + async move {
269 297 let res = match next {
270 298 OnboardingStep::StripeGuideSent => {
271 299 email_client
@@ -247,6 +247,7 @@ impl TestHarness {
247 247 urlhaus_enabled: false,
248 248 abuse_ch_auth_key: None,
249 249 metadefender_api_key: None,
250 + yara_min_rule_files: 0,
250 251 };
251 252 ScanPipeline::new(&scan_config).expect("ScanPipeline::new with no-op config")
252 253 }