//! Release and blog post announcement emails via project mailing lists. use crate::db; use crate::db::{DbBlogPost, DbItem, DbUser}; use crate::AppState; /// Spawn a bounded email fan-out off the caller's (possibly advisory-lock-held) /// connection. `recipients` MUST already be bounded by the producing query's /// LIMIT — this helper owns the off-lock `tokio::spawn` and the every-50 /// Postmark pause, so no scheduler fan-out can re-introduce an inline serial /// send loop on the lock connection (Run #14 CHRONIC 2b: the shape that drifted /// between the announcement and onboarding paths). `send_one` is awaited once /// per recipient and owns its own per-recipient error logging. /// /// There is deliberately no non-spawning variant: routing every fan-out through /// here is what makes "serial sends on the lock connection" unwritable. fn spawn_bounded_fanout(recipients: Vec, send_one: F) where T: Send + 'static, F: Fn(T) -> Fut + Send + 'static, Fut: std::future::Future + Send, { if recipients.is_empty() { return; } tokio::spawn(async move { for (i, recipient) in recipients.into_iter().enumerate() { // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark. if i > 0 && i % 50 == 0 { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } send_one(recipient).await; } }); } /// Atomically mark an item as release-announced and send subscriber emails /// via the project's content mailing list. /// /// Shared between the manual publish handler (`routes/api/items.rs`) and the /// scheduler. Safe to call multiple times — `mark_release_announced` /// is a no-op if the item was already announced. pub async fn send_release_announcements(state: &AppState, item: &DbItem) { if !db::items::mark_release_announced(&state.db, item.id) .await .unwrap_or(false) { return; } // Skip email delivery for web-only items if item.web_only { return; } let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, item.project_id).await else { return; }; let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else { return; }; let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type( &state.db, item.project_id, db::MailingListType::Content, ) .await else { return; }; let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await else { return; }; let creator_name = creator .display_name .as_deref() .unwrap_or(&creator.username) .to_string(); let item_title = item.title.clone(); let item_url = format!("{}/i/{}", state.config.host_url, item.id); let email_client = state.email.clone(); let host_url = state.config.host_url.clone(); let signing_secret = state.config.signing_secret.clone(); let list_id_str = list.id.to_string(); spawn_bounded_fanout(subscribers, move |subscriber| { let email_client = email_client.clone(); let host_url = host_url.clone(); let signing_secret = signing_secret.clone(); let creator_name = creator_name.clone(); let item_title = item_title.clone(); let item_url = item_url.clone(); let list_id_str = list_id_str.clone(); async move { let unsub_url = crate::email::generate_unsubscribe_url( &host_url, subscriber.id, crate::email::UnsubscribeAction::MailingList, &list_id_str, &signing_secret, ); if let Err(e) = email_client .send_release_announcement( &subscriber.email, subscriber.display_name.as_deref(), &creator_name, &item_title, &item_url, Some(&unsub_url), ) .await { tracing::error!(error = ?e, "failed to send release announcement email"); } } }); } /// Atomically mark a blog post as announced and send subscriber emails /// via the project's content mailing list. /// /// Shared between the blog post publish handlers and the scheduler. /// Safe to call multiple times — `mark_blog_post_announced` is a no-op /// if the post was already announced. pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) { if !db::blog_posts::mark_blog_post_announced(&state.db, post.id) .await .unwrap_or(false) { return; } // Skip email delivery for web-only posts if post.web_only { return; } let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, post.project_id).await else { return; }; let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else { return; }; let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type( &state.db, post.project_id, db::MailingListType::Content, ) .await else { return; }; let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await else { return; }; let creator_name = creator .display_name .as_deref() .unwrap_or(&creator.username) .to_string(); let post_title = post.title.clone(); let post_url = format!( "{}/{}/blog/{}", state.config.host_url, project.slug, post.slug ); let email_client = state.email.clone(); let host_url = state.config.host_url.clone(); let signing_secret = state.config.signing_secret.clone(); let list_id_str = list.id.to_string(); spawn_bounded_fanout(subscribers, move |subscriber| { let email_client = email_client.clone(); let host_url = host_url.clone(); let signing_secret = signing_secret.clone(); let creator_name = creator_name.clone(); let post_title = post_title.clone(); let post_url = post_url.clone(); let list_id_str = list_id_str.clone(); async move { let unsub_url = crate::email::generate_unsubscribe_url( &host_url, subscriber.id, crate::email::UnsubscribeAction::MailingList, &list_id_str, &signing_secret, ); if let Err(e) = email_client .send_blog_post_announcement( &subscriber.email, subscriber.display_name.as_deref(), &creator_name, &post_title, &post_url, Some(&unsub_url), ) .await { tracing::error!(error = ?e, "failed to send blog post announcement email"); } } }); } /// Onboarding email drip steps (maps to `onboarding_email_step` i16 column). #[allow(clippy::enum_variant_names)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(i16)] enum OnboardingStep { /// Welcome email sent at signup. WelcomeSent = 1, /// Profile tips email (24h after welcome). ProfileTipsSent = 2, /// Stripe guide email (72h after welcome). StripeGuideSent = 3, } impl OnboardingStep { fn as_i16(self) -> i16 { self as i16 } } /// Process the getting-started email drip sequence. /// /// Step 1 (welcome) is sent at signup in the auth handler. /// Step 2 (profile tips) fires 24h after welcome, skipped if display_name is set. /// Step 3 (Stripe guide) fires 72h after welcome, skipped if Stripe is connected. /// /// Only the candidate fetch + step-advance (fast DB writes) run inline; the /// actual Postmark sends are spawned off the scheduler's lock-held connection /// so a backlog of serial email I/O can't extend the advisory-lock hold time /// (Run #14 MEDIUM, mirrors the release/blog announcement fan-out). pub(super) async fn send_onboarding_emails(state: &AppState) { // Step 1→2: profile tips (24h after welcome) let next = OnboardingStep::ProfileTipsSent; if let Ok(users) = db::users::get_onboarding_candidates(&state.db, OnboardingStep::WelcomeSent.as_i16(), chrono::Duration::hours(24)).await { // Batch-advance users who already set a display name (skip email) let (skip, send): (Vec<_>, Vec<_>) = users.into_iter().partition(|u| u.display_name.is_some()); advance_skipped(state, &skip, next).await; claim_and_spawn_sends(state, send, next).await; } // Step 2→3: Stripe guide (72h after welcome) let next = OnboardingStep::StripeGuideSent; if let Ok(users) = db::users::get_onboarding_candidates(&state.db, OnboardingStep::ProfileTipsSent.as_i16(), chrono::Duration::hours(48)).await { // Batch-advance users who already connected Stripe (skip email) let (skip, send): (Vec<_>, Vec<_>) = users.into_iter().partition(|u| u.stripe_account_id.is_some()); advance_skipped(state, &skip, next).await; claim_and_spawn_sends(state, send, next).await; } } /// Batch-advance users who don't need an email for this step (display name / /// Stripe already set). Inline — a single cheap UPDATE. async fn advance_skipped(state: &AppState, skip: &[DbUser], next: OnboardingStep) { if skip.is_empty() { return; } let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect(); if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await { tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step"); } } /// Claim the send batch by advancing its step BEFORE sending (so concurrent /// instances and the next tick re-exclude these users — preventing duplicate /// sends), then spawn the Postmark I/O off the lock-held connection. A failed /// claim leaves the rows untouched to retry next tick rather than sending /// without a claim. Missing a non-critical onboarding email is better than /// sending it twice. async fn claim_and_spawn_sends(state: &AppState, send: Vec, next: OnboardingStep) { if send.is_empty() { return; } let send_ids: Vec<_> = send.iter().map(|u| u.id).collect(); if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &send_ids, next.as_i16()).await { tracing::warn!(count = send_ids.len(), step = ?next, error = ?e, "failed to claim onboarding batch; retrying next tick"); return; } let email_client = state.email.clone(); let host_url = state.config.host_url.clone(); spawn_bounded_fanout(send, move |user| { let email_client = email_client.clone(); let host_url = host_url.clone(); async move { let res = match next { OnboardingStep::StripeGuideSent => { email_client .send_onboarding_stripe(&user.email, user.display_name.as_deref(), &host_url) .await } // ProfileTipsSent (WelcomeSent is never a "next" step). _ => { email_client .send_onboarding_profile(&user.email, user.display_name.as_deref(), &host_url) .await } }; if let Err(e) = res { tracing::error!(error = ?e, user_id = %user.id, step = ?next, "failed to send onboarding email"); } } }); }