Skip to main content

max / makenotwork

11.6 KB · 316 lines History Blame Raw
1 //! Release and blog post announcement emails via project mailing lists.
2
3 use crate::db;
4 use crate::db::{DbBlogPost, DbItem, DbUser};
5 use crate::AppState;
6
7 /// Spawn a bounded email fan-out off the caller's (possibly advisory-lock-held)
8 /// connection. `recipients` MUST already be bounded by the producing query's
9 /// LIMIT — this helper owns the off-lock `tokio::spawn` and the every-50
10 /// Postmark pause, so no scheduler fan-out can re-introduce an inline serial
11 /// send loop on the lock connection (Run #14 CHRONIC 2b: the shape that drifted
12 /// between the announcement and onboarding paths). `send_one` is awaited once
13 /// per recipient and owns its own per-recipient error logging.
14 ///
15 /// There is deliberately no non-spawning variant: routing every fan-out through
16 /// here is what makes "serial sends on the lock connection" unwritable.
17 fn spawn_bounded_fanout<T, F, Fut>(recipients: Vec<T>, send_one: F)
18 where
19 T: Send + 'static,
20 F: Fn(T) -> Fut + Send + 'static,
21 Fut: std::future::Future<Output = ()> + Send,
22 {
23 if recipients.is_empty() {
24 return;
25 }
26 tokio::spawn(async move {
27 for (i, recipient) in recipients.into_iter().enumerate() {
28 // Rate-limit: pause briefly every 50 emails to avoid hammering Postmark.
29 if i > 0 && i % 50 == 0 {
30 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
31 }
32 send_one(recipient).await;
33 }
34 });
35 }
36
37 /// Atomically mark an item as release-announced and send subscriber emails
38 /// via the project's content mailing list.
39 ///
40 /// Shared between the manual publish handler (`routes/api/items.rs`) and the
41 /// scheduler. Safe to call multiple times — `mark_release_announced`
42 /// is a no-op if the item was already announced.
43 pub async fn send_release_announcements(state: &AppState, item: &DbItem) {
44 if !db::items::mark_release_announced(&state.db, item.id)
45 .await
46 .unwrap_or(false)
47 {
48 return;
49 }
50
51 // Skip email delivery for web-only items
52 if item.web_only {
53 return;
54 }
55
56 let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, item.project_id).await
57 else {
58 return;
59 };
60 let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
61 return;
62 };
63 let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
64 &state.db,
65 item.project_id,
66 db::MailingListType::Content,
67 )
68 .await
69 else {
70 return;
71 };
72 let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
73 else {
74 return;
75 };
76
77 let creator_name = creator
78 .display_name
79 .as_deref()
80 .unwrap_or(&creator.username)
81 .to_string();
82 let item_title = item.title.clone();
83 let item_url = format!("{}/i/{}", state.config.host_url, item.id);
84 let email_client = state.email.clone();
85 let host_url = state.config.host_url.clone();
86 let signing_secret = state.config.signing_secret.clone();
87 let list_id_str = list.id.to_string();
88
89 spawn_bounded_fanout(subscribers, move |subscriber| {
90 let email_client = email_client.clone();
91 let host_url = host_url.clone();
92 let signing_secret = signing_secret.clone();
93 let creator_name = creator_name.clone();
94 let item_title = item_title.clone();
95 let item_url = item_url.clone();
96 let list_id_str = list_id_str.clone();
97 async move {
98 let unsub_url = crate::email::generate_unsubscribe_url(
99 &host_url,
100 subscriber.id,
101 crate::email::UnsubscribeAction::MailingList,
102 &list_id_str,
103 &signing_secret,
104 );
105 if let Err(e) = email_client
106 .send_release_announcement(
107 &subscriber.email,
108 subscriber.display_name.as_deref(),
109 &creator_name,
110 &item_title,
111 &item_url,
112 Some(&unsub_url),
113 )
114 .await
115 {
116 tracing::error!(error = ?e, "failed to send release announcement email");
117 }
118 }
119 });
120 }
121
122 /// Atomically mark a blog post as announced and send subscriber emails
123 /// via the project's content mailing list.
124 ///
125 /// Shared between the blog post publish handlers and the scheduler.
126 /// Safe to call multiple times — `mark_blog_post_announced` is a no-op
127 /// if the post was already announced.
128 pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) {
129 if !db::blog_posts::mark_blog_post_announced(&state.db, post.id)
130 .await
131 .unwrap_or(false)
132 {
133 return;
134 }
135
136 // Skip email delivery for web-only posts
137 if post.web_only {
138 return;
139 }
140
141 let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, post.project_id).await
142 else {
143 return;
144 };
145 let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
146 return;
147 };
148 let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
149 &state.db,
150 post.project_id,
151 db::MailingListType::Content,
152 )
153 .await
154 else {
155 return;
156 };
157 let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
158 else {
159 return;
160 };
161
162 let creator_name = creator
163 .display_name
164 .as_deref()
165 .unwrap_or(&creator.username)
166 .to_string();
167 let post_title = post.title.clone();
168 let post_url = format!(
169 "{}/{}/blog/{}",
170 state.config.host_url, project.slug, post.slug
171 );
172 let email_client = state.email.clone();
173 let host_url = state.config.host_url.clone();
174 let signing_secret = state.config.signing_secret.clone();
175 let list_id_str = list.id.to_string();
176
177 spawn_bounded_fanout(subscribers, move |subscriber| {
178 let email_client = email_client.clone();
179 let host_url = host_url.clone();
180 let signing_secret = signing_secret.clone();
181 let creator_name = creator_name.clone();
182 let post_title = post_title.clone();
183 let post_url = post_url.clone();
184 let list_id_str = list_id_str.clone();
185 async move {
186 let unsub_url = crate::email::generate_unsubscribe_url(
187 &host_url,
188 subscriber.id,
189 crate::email::UnsubscribeAction::MailingList,
190 &list_id_str,
191 &signing_secret,
192 );
193 if let Err(e) = email_client
194 .send_blog_post_announcement(
195 &subscriber.email,
196 subscriber.display_name.as_deref(),
197 &creator_name,
198 &post_title,
199 &post_url,
200 Some(&unsub_url),
201 )
202 .await
203 {
204 tracing::error!(error = ?e, "failed to send blog post announcement email");
205 }
206 }
207 });
208 }
209
/// Position of a user in the onboarding email drip.
///
/// The discriminants are the values stored in the `onboarding_email_step`
/// i16 column, so they must not be renumbered.
// Every variant ends in `Sent` on purpose (each one names the email that has
// been sent), which is exactly what `enum_variant_names` complains about.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i16)]
enum OnboardingStep {
    /// Welcome email sent at signup.
    WelcomeSent = 1,
    /// Profile tips email (24h after welcome).
    ProfileTipsSent = 2,
    /// Stripe guide email (72h after welcome).
    StripeGuideSent = 3,
}

impl OnboardingStep {
    /// Returns the step's numeric value as stored in the database column.
    fn as_i16(self) -> i16 {
        // `#[repr(i16)]` fixes the discriminant type, so the cast is lossless.
        self as i16
    }
}
226
227 /// Process the getting-started email drip sequence.
228 ///
229 /// Step 1 (welcome) is sent at signup in the auth handler.
230 /// Step 2 (profile tips) fires 24h after welcome, skipped if display_name is set.
231 /// Step 3 (Stripe guide) fires 72h after welcome, skipped if Stripe is connected.
232 ///
233 /// Only the candidate fetch + step-advance (fast DB writes) run inline; the
234 /// actual Postmark sends are spawned off the scheduler's lock-held connection
235 /// so a backlog of serial email I/O can't extend the advisory-lock hold time
236 /// (Run #14 MEDIUM, mirrors the release/blog announcement fan-out).
237 pub(super) async fn send_onboarding_emails(state: &AppState) {
238 // Step 1→2: profile tips (24h after welcome)
239 let next = OnboardingStep::ProfileTipsSent;
240 if let Ok(users) =
241 db::users::get_onboarding_candidates(&state.db, OnboardingStep::WelcomeSent.as_i16(), chrono::Duration::hours(24)).await
242 {
243 // Batch-advance users who already set a display name (skip email)
244 let (skip, send): (Vec<_>, Vec<_>) =
245 users.into_iter().partition(|u| u.display_name.is_some());
246 advance_skipped(state, &skip, next).await;
247 claim_and_spawn_sends(state, send, next).await;
248 }
249
250 // Step 2→3: Stripe guide (72h after welcome)
251 let next = OnboardingStep::StripeGuideSent;
252 if let Ok(users) =
253 db::users::get_onboarding_candidates(&state.db, OnboardingStep::ProfileTipsSent.as_i16(), chrono::Duration::hours(48)).await
254 {
255 // Batch-advance users who already connected Stripe (skip email)
256 let (skip, send): (Vec<_>, Vec<_>) =
257 users.into_iter().partition(|u| u.stripe_account_id.is_some());
258 advance_skipped(state, &skip, next).await;
259 claim_and_spawn_sends(state, send, next).await;
260 }
261 }
262
263 /// Batch-advance users who don't need an email for this step (display name /
264 /// Stripe already set). Inline — a single cheap UPDATE.
265 async fn advance_skipped(state: &AppState, skip: &[DbUser], next: OnboardingStep) {
266 if skip.is_empty() {
267 return;
268 }
269 let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
270 if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
271 tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
272 }
273 }
274
275 /// Claim the send batch by advancing its step BEFORE sending (so concurrent
276 /// instances and the next tick re-exclude these users — preventing duplicate
277 /// sends), then spawn the Postmark I/O off the lock-held connection. A failed
278 /// claim leaves the rows untouched to retry next tick rather than sending
279 /// without a claim. Missing a non-critical onboarding email is better than
280 /// sending it twice.
281 async fn claim_and_spawn_sends(state: &AppState, send: Vec<DbUser>, next: OnboardingStep) {
282 if send.is_empty() {
283 return;
284 }
285 let send_ids: Vec<_> = send.iter().map(|u| u.id).collect();
286 if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &send_ids, next.as_i16()).await {
287 tracing::warn!(count = send_ids.len(), step = ?next, error = ?e, "failed to claim onboarding batch; retrying next tick");
288 return;
289 }
290
291 let email_client = state.email.clone();
292 let host_url = state.config.host_url.clone();
293 spawn_bounded_fanout(send, move |user| {
294 let email_client = email_client.clone();
295 let host_url = host_url.clone();
296 async move {
297 let res = match next {
298 OnboardingStep::StripeGuideSent => {
299 email_client
300 .send_onboarding_stripe(&user.email, user.display_name.as_deref(), &host_url)
301 .await
302 }
303 // ProfileTipsSent (WelcomeSent is never a "next" step).
304 _ => {
305 email_client
306 .send_onboarding_profile(&user.email, user.display_name.as_deref(), &host_url)
307 .await
308 }
309 };
310 if let Err(e) = res {
311 tracing::error!(error = ?e, user_id = %user.id, step = ?next, "failed to send onboarding email");
312 }
313 }
314 });
315 }
316