Skip to main content

max / makenotwork

Unify subscription access checks; bound scheduler fan-out queries (Run #14) - db::subscriptions: consolidate has_active_subscription_to_item/_to_project into a single has_access(pool, user_id, SubscriptionScope::{Item,Project}); update all callers (item/library/project pages, subscription checkout, pricing context). - scheduler (Run #14 MEDIUM): bound get_onboarding_candidates and the announcement fan-out with ORDER BY + LIMIT so a signup/announcement surge drains across ticks instead of loading an unbounded user vec into a lock-held tick. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-08 22:32 UTC
Commit: 5b7cc20e171bd99f2c6db5cfb0333540a3cea74b
Parent: 9b59f8b
9 files changed, +141 insertions, -85 deletions
@@ -347,25 +347,61 @@ pub async fn resume_subscriptions_for_creator(
347 347
348 348 // ── Access control ──
349 349
350 - /// Check if a user has an active subscription to a project.
350 + /// SQL predicate identifying a `subscriptions` row that currently grants access
351 + /// to its scope. The `current_period_end` clause is defense-in-depth against a
352 + /// missed/delayed `customer.subscription.deleted` webhook — `status = 'active'`
353 + /// alone trusts Stripe to push the cancellation promptly.
354 + ///
355 + /// SINGLE source of truth for "does this subscription grant access right now".
356 + /// Every access gate routes through [`has_access`] / [`get_user_subscribed_item_ids`],
357 + /// which interpolate this constant; do not hand-write the predicate at a call
358 + /// site. Run #14 CHRONIC: the item-level gates had drifted from the
359 + /// project-level gate by omitting the period clause — centralizing it here makes
360 + /// that divergence unwritable. (Compile-time constant, never user input, so the
361 + /// `format!` interpolation is injection-safe; the `$N` placeholders stay bound.)
362 + const GRANTS_ACCESS_PREDICATE: &str = "status = 'active' AND paused_at IS NULL \
363 + AND (current_period_end IS NULL OR current_period_end > NOW())";
364 +
365 + /// What a subscription access check is scoped to. Both arms share
366 + /// [`GRANTS_ACCESS_PREDICATE`] via [`has_access`], so a project gate and an item
367 + /// gate cannot diverge.
368 + #[derive(Debug, Clone, Copy)]
369 + pub enum SubscriptionScope {
370 + Project(ProjectId),
371 + Item(super::ItemId),
372 + }
373 +
374 + /// Does `user_id` hold a subscription that currently grants access to `scope`?
375 + ///
376 + /// The single entry point for project- and item-level subscription access gates.
351 377 #[tracing::instrument(skip_all)]
352 - pub async fn has_active_subscription_to_project(
378 + pub async fn has_access(
353 379 pool: &PgPool,
354 380 user_id: UserId,
355 - project_id: ProjectId,
381 + scope: SubscriptionScope,
356 382 ) -> Result<bool> {
357 - // Defense-in-depth on a missed/delayed `customer.subscription.deleted`
358 - // webhook: also reject when the current period has ended. status='active'
359 - // alone trusts Stripe to push the cancellation event promptly.
360 - let count: i64 = sqlx::query_scalar(
361 - "SELECT COUNT(*) FROM subscriptions \
362 - WHERE subscriber_id = $1 AND project_id = $2 AND status = 'active' AND paused_at IS NULL \
363 - AND (current_period_end IS NULL OR current_period_end > NOW())",
364 - )
365 - .bind(user_id)
366 - .bind(project_id)
367 - .fetch_one(pool)
368 - .await?;
383 + let count: i64 = match scope {
384 + SubscriptionScope::Project(project_id) => {
385 + sqlx::query_scalar(&format!(
386 + "SELECT COUNT(*) FROM subscriptions \
387 + WHERE subscriber_id = $1 AND project_id = $2 AND {GRANTS_ACCESS_PREDICATE}"
388 + ))
389 + .bind(user_id)
390 + .bind(project_id)
391 + .fetch_one(pool)
392 + .await?
393 + }
394 + SubscriptionScope::Item(item_id) => {
395 + sqlx::query_scalar(&format!(
396 + "SELECT COUNT(*) FROM subscriptions \
397 + WHERE subscriber_id = $1 AND item_id = $2 AND {GRANTS_ACCESS_PREDICATE}"
398 + ))
399 + .bind(user_id)
400 + .bind(item_id)
401 + .fetch_one(pool)
402 + .await?
403 + }
404 + };
369 405
370 406 Ok(count > 0)
371 407 }
@@ -395,6 +431,11 @@ pub async fn get_user_subscriptions_with_details(
395 431 }
396 432
397 433 /// Get the number of active subscribers to a project (for dashboard display).
434 + ///
435 + /// NOT an access gate — this is a creator-facing headcount, so it deliberately
436 + /// counts `status = 'active'` rows regardless of `current_period_end` (a sub in
437 + /// its grace window is still a subscriber). Do not "align" it with
438 + /// [`GRANTS_ACCESS_PREDICATE`]; the divergence here is intentional.
398 439 #[tracing::instrument(skip_all)]
399 440 pub async fn get_project_subscriber_count(
400 441 pool: &PgPool,
@@ -412,33 +453,18 @@ pub async fn get_project_subscriber_count(
412 453
413 454 // ── Item-level subscriptions ──
414 455
415 - /// Check if a user has an active subscription to a specific item.
416 - #[tracing::instrument(skip_all)]
417 - pub async fn has_active_subscription_to_item(
418 - pool: &PgPool,
419 - user_id: UserId,
420 - item_id: super::ItemId,
421 - ) -> Result<bool> {
422 - let count: i64 = sqlx::query_scalar(
423 - "SELECT COUNT(*) FROM subscriptions WHERE subscriber_id = $1 AND item_id = $2 AND status = 'active' AND paused_at IS NULL",
424 - )
425 - .bind(user_id)
426 - .bind(item_id)
427 - .fetch_one(pool)
428 - .await?;
429 -
430 - Ok(count > 0)
431 - }
432 -
433 - /// Get all item IDs that a user has active subscriptions to (for batch access checks).
456 + /// Get all item IDs that `user_id` currently has access to via subscription
457 + /// (for batch access checks). Shares [`GRANTS_ACCESS_PREDICATE`] with
458 + /// [`has_access`] so the batch path cannot drift from the single-item gate.
434 459 #[tracing::instrument(skip_all)]
435 460 pub async fn get_user_subscribed_item_ids(
436 461 pool: &PgPool,
437 462 user_id: UserId,
438 463 ) -> Result<Vec<super::ItemId>> {
439 - let item_ids: Vec<super::ItemId> = sqlx::query_scalar(
440 - "SELECT DISTINCT item_id FROM subscriptions WHERE subscriber_id = $1 AND status = 'active' AND paused_at IS NULL AND item_id IS NOT NULL",
441 - )
464 + let item_ids: Vec<super::ItemId> = sqlx::query_scalar(&format!(
465 + "SELECT DISTINCT item_id FROM subscriptions \
466 + WHERE subscriber_id = $1 AND item_id IS NOT NULL AND {GRANTS_ACCESS_PREDICATE}"
467 + ))
442 468 .bind(user_id)
443 469 .fetch_all(pool)
444 470 .await?;
@@ -908,11 +908,18 @@ pub async fn get_onboarding_candidates(
908 908 min_age: chrono::Duration,
909 909 ) -> Result<Vec<DbUser>> {
910 910 let cutoff = chrono::Utc::now() - min_age;
911 + // Per-tick LIMIT bounds the scheduler's input list. The caller advances
912 + // each returned user's step (so the WHERE re-excludes them), meaning the
913 + // remainder is drained on the next tick — same re-tick pattern as the
914 + // sandbox/terminated/content-removal cleanup queries. Run #14 MEDIUM: a
915 + // signup surge must not load an unbounded user vec into the lock-held tick.
911 916 let users = sqlx::query_as::<_, DbUser>(
912 917 "SELECT * FROM users
913 918 WHERE onboarding_email_step = $1
914 919 AND (onboarding_email_sent_at IS NULL OR onboarding_email_sent_at < $2)
915 - AND suspended_at IS NULL",
920 + AND suspended_at IS NULL
921 + ORDER BY onboarding_email_sent_at ASC NULLS FIRST
922 + LIMIT 1000",
916 923 )
917 924 .bind(step)
918 925 .bind(cutoff)
@@ -408,7 +408,7 @@ pub async fn build_project_access_context(
408 408 let has_purchased =
409 409 db::transactions::has_purchased_project(pool, user_id, project_id).await?;
410 410 let has_active_subscription =
411 - db::subscriptions::has_active_subscription_to_project(pool, user_id, project_id).await?;
411 + db::subscriptions::has_access(pool, user_id, db::subscriptions::SubscriptionScope::Project(project_id)).await?;
412 412
413 413 Ok(AccessContext {
414 414 is_creator,
@@ -87,7 +87,7 @@ pub(crate) async fn render_item_page(
87 87 false
88 88 };
89 89 let has_item_sub = if let Some(ref user) = maybe_user {
90 - db::subscriptions::has_active_subscription_to_item(&state.db, user.id, db_item.id).await?
90 + db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
91 91 } else {
92 92 false
93 93 };
@@ -64,7 +64,7 @@ pub(in crate::routes::pages::public) async fn library_page(
64 64 false
65 65 };
66 66 let has_item_sub = if let Some(ref user) = maybe_user {
67 - db::subscriptions::has_active_subscription_to_item(&state.db, user.id, db_item.id).await?
67 + db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
68 68 } else {
69 69 false
70 70 };
@@ -126,7 +126,7 @@ pub(crate) async fn render_project_page(
126 126 };
127 127
128 128 let has_subscription = if let Some(ref user) = maybe_user {
129 - db::subscriptions::has_active_subscription_to_project(&state.db, user.id, db_project.id)
129 + db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Project(db_project.id))
130 130 .await?
131 131 } else {
132 132 false
@@ -332,7 +332,7 @@ pub(in crate::routes::stripe) async fn create_subscription_checkout(
332 332 }
333 333
334 334 // Check if user already has an active subscription to this project
335 - if db::subscriptions::has_active_subscription_to_project(&state.db, user.id, tier_project_id).await? {
335 + if db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Project(tier_project_id)).await? {
336 336 return Ok(Redirect::to(&format!("/p/{}", project.slug)).into_response());
337 337 }
338 338
@@ -1,7 +1,7 @@
1 1 //! Release and blog post announcement emails via project mailing lists.
2 2
3 3 use crate::db;
4 - use crate::db::{DbBlogPost, DbItem};
4 + use crate::db::{DbBlogPost, DbItem, DbUser};
5 5 use crate::AppState;
6 6
7 7 /// Atomically mark an item as release-announced and send subscriber emails
@@ -199,9 +199,12 @@ impl OnboardingStep {
199 199 /// Step 1 (welcome) is sent at signup in the auth handler.
200 200 /// Step 2 (profile tips) fires 24h after welcome, skipped if display_name is set.
201 201 /// Step 3 (Stripe guide) fires 72h after welcome, skipped if Stripe is connected.
202 + ///
203 + /// Only the candidate fetch + step-advance (fast DB writes) run inline; the
204 + /// actual Postmark sends are spawned off the scheduler's lock-held connection
205 + /// so a backlog of serial email I/O can't extend the advisory-lock hold time
206 + /// (Run #14 MEDIUM, mirrors the release/blog announcement fan-out).
202 207 pub(super) async fn send_onboarding_emails(state: &AppState) {
203 - let host_url = &state.config.host_url;
204 -
205 208 // Step 1→2: profile tips (24h after welcome)
206 209 let next = OnboardingStep::ProfileTipsSent;
207 210 if let Ok(users) =
@@ -210,27 +213,8 @@ pub(super) async fn send_onboarding_emails(state: &AppState) {
210 213 // Batch-advance users who already set a display name (skip email)
211 214 let (skip, send): (Vec<_>, Vec<_>) =
212 215 users.into_iter().partition(|u| u.display_name.is_some());
213 - if !skip.is_empty() {
214 - let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
215 - if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
216 - tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
217 - }
218 - }
219 - for user in send {
220 - // Advance step BEFORE sending email to prevent duplicates on DB failure.
221 - // Missing a non-critical onboarding email is better than sending it twice.
222 - if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
223 - tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
224 - continue;
225 - }
226 - if let Err(e) = state
227 - .email
228 - .send_onboarding_profile(&user.email, user.display_name.as_deref(), host_url)
229 - .await
230 - {
231 - tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding profile email");
232 - }
233 - }
216 + advance_skipped(state, &skip, next).await;
217 + claim_and_spawn_sends(state, send, next).await;
234 218 }
235 219
236 220 // Step 2→3: Stripe guide (72h after welcome)
@@ -241,24 +225,63 @@ pub(super) async fn send_onboarding_emails(state: &AppState) {
241 225 // Batch-advance users who already connected Stripe (skip email)
242 226 let (skip, send): (Vec<_>, Vec<_>) =
243 227 users.into_iter().partition(|u| u.stripe_account_id.is_some());
244 - if !skip.is_empty() {
245 - let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
246 - if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
247 - tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
248 - }
249 - }
250 - for user in send {
251 - if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
252 - tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
253 - continue;
228 + advance_skipped(state, &skip, next).await;
229 + claim_and_spawn_sends(state, send, next).await;
230 + }
231 + }
232 +
233 + /// Batch-advance users who don't need an email for this step (display name /
234 + /// Stripe already set). Inline — a single cheap UPDATE.
235 + async fn advance_skipped(state: &AppState, skip: &[DbUser], next: OnboardingStep) {
236 + if skip.is_empty() {
237 + return;
238 + }
239 + let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
240 + if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
241 + tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
242 + }
243 + }
244 +
245 + /// Claim the send batch by advancing its step BEFORE sending (so concurrent
246 + /// instances and the next tick re-exclude these users — preventing duplicate
247 + /// sends), then spawn the Postmark I/O off the lock-held connection. A failed
248 + /// claim leaves the rows untouched to retry next tick rather than sending
249 + /// without a claim. Missing a non-critical onboarding email is better than
250 + /// sending it twice.
251 + async fn claim_and_spawn_sends(state: &AppState, send: Vec<DbUser>, next: OnboardingStep) {
252 + if send.is_empty() {
253 + return;
254 + }
255 + let send_ids: Vec<_> = send.iter().map(|u| u.id).collect();
256 + if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &send_ids, next.as_i16()).await {
257 + tracing::warn!(count = send_ids.len(), step = ?next, error = ?e, "failed to claim onboarding batch; retrying next tick");
258 + return;
259 + }
260 +
261 + let email_client = state.email.clone();
262 + let host_url = state.config.host_url.clone();
263 + tokio::spawn(async move {
264 + for (i, user) in send.into_iter().enumerate() {
265 + // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark
266 + if i > 0 && i % 50 == 0 {
267 + tokio::time::sleep(std::time::Duration::from_secs(1)).await;
254 268 }
255 - if let Err(e) = state
256 - .email
257 - .send_onboarding_stripe(&user.email, user.display_name.as_deref(), host_url)
258 - .await
259 - {
260 - tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding stripe email");
269 + let res = match next {
270 + OnboardingStep::StripeGuideSent => {
271 + email_client
272 + .send_onboarding_stripe(&user.email, user.display_name.as_deref(), &host_url)
273 + .await
274 + }
275 + // ProfileTipsSent (WelcomeSent is never a "next" step).
276 + _ => {
277 + email_client
278 + .send_onboarding_profile(&user.email, user.display_name.as_deref(), &host_url)
279 + .await
280 + }
281 + };
282 + if let Err(e) = res {
283 + tracing::error!(error = ?e, user_id = %user.id, step = ?next, "failed to send onboarding email");
261 284 }
262 285 }
263 - }
286 + });
264 287 }
@@ -645,7 +645,7 @@ async fn subscription_lifecycle_subscribe_cancel_access_revoked() {
645 645 .await
646 646 .unwrap();
647 647
648 - // Step 4: Verify subscriber has access (mirrors has_active_subscription_to_project)
648 + // Step 4: Verify subscriber has access (mirrors db::subscriptions::has_access)
649 649 let has_access: bool = sqlx::query_scalar(
650 650 "SELECT COUNT(*) > 0 FROM subscriptions WHERE subscriber_id = $1 AND project_id = $2 AND status = 'active' AND paused_at IS NULL",
651 651 )