Skip to main content

max / makenotwork

29.9 KB · 727 lines History Blame Raw
1 //! Webhook handlers for checkout.session.completed events.
2
3 use crate::{
4 db,
5 error::{AppError, Result, ResultExt},
6 helpers::{self, spawn_email},
7 payments::{CheckoutMetadata, CreatorTierCheckoutMetadata, FanPlusCheckoutMetadata, SubscriptionCheckoutMetadata, SynckitAppSubCheckoutMetadata, TipCheckoutMetadata},
8 AppState,
9 };
10
11 use super::checkout_helpers::{
12 check_pending_refund, maybe_generate_license_key, record_tip_splits,
13 record_transaction_splits, send_guest_sale_notification, send_purchase_emails,
14 send_tip_email, subscribe_buyer_to_mailing_list,
15 };
16
17 /// Handle checkout.session.completed for one-time purchases
18 #[tracing::instrument(skip_all, name = "stripe::handle_purchase_checkout")]
19 pub(super) async fn handle_purchase_checkout_completed(
20 state: &AppState,
21 session: &crate::payments::CheckoutSessionView,
22 event_id: &str,
23 ) -> Result<()> {
24 let session_id = session.id.clone();
25
26 tracing::info!(session_id = %session_id, "processing completed purchase checkout");
27
28 // Extract metadata (already typed IDs from CheckoutMetadata)
29 let raw_metadata = CheckoutMetadata::from_metadata(session.metadata.as_ref())?;
30 let buyer_id = raw_metadata.buyer_id;
31 let seller_id = raw_metadata.seller_id;
32 let item_id = raw_metadata.item_id;
33 let _promo_code_id = raw_metadata.promo_code_id;
34
35 let item_id_display = item_id.map(|id| id.to_string()).unwrap_or_else(|| "project".to_string());
36
37 // Get the payment intent ID
38 let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string());
39
40 // Complete the transaction (idempotent - returns None if already completed).
41 // Steps 1-3 (complete_transaction, increment_sales_count, discount code increment)
42 // are wrapped in a single DB transaction to prevent inconsistent state if any step fails.
43 let mut db_tx = state.db.begin().await.context("begin purchase webhook transaction")?;
44
45 match db::transactions::complete_transaction(&mut *db_tx, &session_id, &payment_intent_id).await {
46 Ok(Some(tx)) => {
47 tracing::info!(
48 buyer_id = %buyer_id, seller_id = %seller_id, item_id = %item_id_display, amount_cents = %tx.amount_cents,
49 "transaction completed"
50 );
51
52 // Defense-in-depth reconciliation: our line items are server-built,
53 // so Stripe's pre-tax subtotal should equal the amount we credit. A
54 // mismatch (a future Stripe Tax / price-edit / currency edge) is
55 // logged loudly rather than silently trusted — we still credit the
56 // server amount, which is authoritative.
57 if let Some(subtotal) = session.amount_subtotal
58 && subtotal != i64::from(tx.amount_cents)
59 {
60 tracing::error!(
61 session_id = %session_id, credited_cents = %tx.amount_cents, stripe_subtotal_cents = %subtotal,
62 "checkout amount mismatch: credited transaction amount differs from Stripe session subtotal"
63 );
64 }
65
66 // Increment denormalized sales_count (inside transaction)
67 if let Some(iid) = item_id {
68 db::items::increment_sales_count(&mut *db_tx, iid)
69 .await
70 .with_context(|| format!("increment sales count for item {iid}"))?;
71 }
72
73 // Promo code use_count is reserved at checkout time (not here) to prevent
74 // concurrent checkouts from exceeding max_uses. No increment needed in webhook.
75
76 // Commit the critical data integrity operations
77 db_tx.commit().await.context("commit purchase webhook transaction")?;
78
79 // --- Secondary effects below (outside transaction) ---
80
81 // Grant access to bundle child items (if this is a bundle)
82 if let Some(iid) = item_id
83 && let Ok(Some(purchased_item)) = db::items::get_item_by_id(&state.db, iid).await
84 && purchased_item.item_type == db::ItemType::Bundle
85 {
86 crate::routes::stripe::checkout::grant_bundle_items(state, iid, buyer_id, seller_id, Some(tx.id)).await;
87 }
88
89 if tx.share_contact {
90 db::transactions::clear_contact_revocation(&state.db, buyer_id, seller_id)
91 .await
92 .context("clear contact revocation after purchase")?;
93 }
94
95 // Record revenue splits if the item's project has members
96 if let Some(iid) = item_id {
97 record_transaction_splits(state, tx.id, iid, tx.amount_cents).await;
98 maybe_generate_license_key(state, iid, buyer_id, tx.id).await;
99 subscribe_buyer_to_mailing_list(state, iid, buyer_id);
100 }
101
102 send_purchase_emails(state, &tx, buyer_id, seller_id);
103
104 if let Err(e) = db::subscriptions::log_subscription_event(
105 &state.db, None, event_id, "checkout.session.completed.purchase",
106 &serde_json::json!({"session_id": session_id}),
107 ).await {
108 tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event");
109 }
110
111 // Check for a pending refund that arrived before this payment webhook.
112 // If found, process it now that the transaction is completed.
113 check_pending_refund(state, &payment_intent_id).await;
114 }
115 Ok(None) => {
116 tracing::info!(session_id = %session_id, "transaction already completed, ignoring duplicate webhook");
117 }
118 Err(e) => {
119 tracing::error!(session_id = %session_id, error = ?e, "failed to complete transaction");
120 return Err(e);
121 }
122 }
123
124 Ok(())
125 }
126
127 /// Handle checkout.session.completed for cart (multi-item) purchases
128 #[tracing::instrument(skip_all, name = "stripe::handle_cart_checkout")]
129 pub(super) async fn handle_cart_checkout_completed(
130 state: &AppState,
131 session: &crate::payments::CheckoutSessionView,
132 event_id: &str,
133 ) -> Result<()> {
134 let session_id = session.id.clone();
135 tracing::info!(session_id = %session_id, "processing completed cart checkout");
136
137 let meta = crate::payments::CartCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
138 let buyer_id = meta.buyer_id;
139 let seller_id = meta.seller_id;
140
141 let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string());
142
143 // Complete ALL pending transactions for this session in a single DB transaction
144 let mut db_tx = state.db.begin().await.context("begin cart webhook transaction")?;
145
146 let completed_txs = db::transactions::complete_cart_transactions(
147 &mut *db_tx, &session_id, &payment_intent_id,
148 )
149 .await
150 .context("complete cart transactions")?;
151
152 if completed_txs.is_empty() {
153 tracing::info!(session_id = %session_id, "cart transactions already completed, ignoring duplicate webhook");
154 return Ok(());
155 }
156
157 tracing::info!(
158 session_id = %session_id, buyer_id = %buyer_id, seller_id = %seller_id,
159 count = completed_txs.len(), "cart transactions completed"
160 );
161
162 // Defense-in-depth reconciliation: line items are server-built, so the sum
163 // of the credited transactions should equal Stripe's pre-tax subtotal. A
164 // mismatch (future Stripe Tax / price-edit / currency edge) is logged loudly
165 // rather than silently trusted — the server amounts remain authoritative.
166 if let Some(subtotal) = session.amount_subtotal {
167 let credited: i64 = completed_txs.iter().map(|tx| i64::from(tx.amount_cents)).sum();
168 if credited != subtotal {
169 tracing::error!(
170 session_id = %session_id, credited_cents = %credited, stripe_subtotal_cents = %subtotal,
171 "cart checkout amount mismatch: sum of credited transactions differs from Stripe session subtotal"
172 );
173 }
174 }
175
176 // Increment sales count for each item
177 for tx in &completed_txs {
178 if let Some(item_id) = tx.item_id {
179 db::items::increment_sales_count(&mut *db_tx, item_id)
180 .await
181 .with_context(|| format!("increment sales count for item {item_id}"))?;
182 }
183 }
184
185 db_tx.commit().await.context("commit cart webhook transaction")?;
186
187 // Remove purchased items from cart (items stay in cart until payment succeeds,
188 // so cancelled checkouts don't lose cart contents)
189 db::cart::remove_seller_items_from_cart(&state.db, buyer_id, seller_id)
190 .await
191 .context("remove cart items after successful payment")?;
192
193 // --- Secondary effects (outside transaction) ---
194
195 for tx in &completed_txs {
196 if let Some(item_id) = tx.item_id {
197 // Bundle grants
198 if let Ok(Some(purchased_item)) = db::items::get_item_by_id(&state.db, item_id).await
199 && purchased_item.item_type == db::ItemType::Bundle
200 {
201 crate::routes::stripe::checkout::grant_bundle_items(
202 state, item_id, buyer_id, seller_id, Some(tx.id),
203 )
204 .await;
205 }
206
207 // Revenue splits
208 record_transaction_splits(state, tx.id, item_id, tx.amount_cents).await;
209
210 // License keys
211 maybe_generate_license_key(state, item_id, buyer_id, tx.id).await;
212
213 // Mailing list
214 subscribe_buyer_to_mailing_list(state, item_id, buyer_id);
215 }
216 }
217
218 // Contact sharing (once per seller)
219 if completed_txs.iter().any(|t| t.share_contact) {
220 db::transactions::clear_contact_revocation(&state.db, buyer_id, seller_id)
221 .await
222 .context("clear contact revocation after cart purchase")?;
223 }
224
225 // Send purchase emails for each item (reuse existing per-item emails)
226 for tx in &completed_txs {
227 send_purchase_emails(state, tx, buyer_id, seller_id);
228 }
229
230 // Log event
231 if let Err(e) = db::subscriptions::log_subscription_event(
232 &state.db, None, event_id, "checkout.session.completed.cart",
233 &serde_json::json!({"session_id": session_id, "item_count": completed_txs.len()}),
234 ).await {
235 tracing::warn!(event_id = %event_id, error = ?e, "failed to log cart checkout event");
236 }
237
238 // Check for pending refund
239 check_pending_refund(state, &payment_intent_id).await;
240
241 Ok(())
242 }
243
244 /// Handle checkout.session.completed for subscriptions
245 #[tracing::instrument(skip_all, name = "stripe::handle_subscription_checkout")]
246 pub(super) async fn handle_subscription_checkout_completed(
247 state: &AppState,
248 session: &crate::payments::CheckoutSessionView,
249 event_id: &str,
250 ) -> Result<()> {
251 let session_id = session.id.clone();
252 tracing::info!(session_id = %session_id, "processing completed subscription checkout");
253
254 // Extract subscription-specific metadata (already typed IDs)
255 let raw_metadata = SubscriptionCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
256 let subscriber_id = raw_metadata.subscriber_id;
257 let project_id = raw_metadata.project_id;
258 let tier_id = raw_metadata.tier_id;
259
260 // Get the Stripe subscription ID from the session
261 let stripe_subscription_id = session.subscription.clone()
262 .ok_or_else(|| {
263 tracing::error!("Subscription checkout completed but no subscription ID on session");
264 AppError::BadRequest("Missing subscription ID on session".to_string())
265 })?;
266
267 // Get the Stripe customer ID from the session
268 let stripe_customer_id = session.customer.clone()
269 .ok_or_else(|| {
270 tracing::error!("Subscription checkout completed but no customer ID on session");
271 AppError::BadRequest("Missing customer ID on session".to_string())
272 })?;
273
274 // Create the subscription record + increment promo code in a single transaction.
275 let mut tx = state.db.begin().await.context("begin subscription webhook transaction")?;
276
277 let sub = match db::subscriptions::create_subscription(
278 &mut *tx,
279 subscriber_id,
280 tier_id,
281 project_id,
282 &stripe_subscription_id,
283 &stripe_customer_id,
284 ).await
285 .context("create subscription record")? {
286 Some(sub) => sub,
287 None => {
288 tracing::info!(
289 subscriber_id = %subscriber_id, project_id = %project_id,
290 "subscription already exists, ignoring duplicate"
291 );
292 return Ok(());
293 }
294 };
295
296 // Promo code use_count is reserved at checkout time (not here) to prevent
297 // concurrent checkouts from exceeding max_uses. No increment needed in webhook.
298
299 // Delete the pending promo-hold transaction (created at checkout time so
300 // cleanup_stale_pending_transactions can release the code if abandoned).
301 db::transactions::delete_subscription_pending_transaction(&mut *tx, &session_id)
302 .await
303 .context("delete subscription pending promo-hold transaction")?;
304
305 tx.commit().await.context("commit subscription webhook transaction")?;
306
307 tracing::info!(
308 subscription_id = %sub.id, subscriber_id = %subscriber_id, project_id = %project_id, tier_id = %tier_id,
309 "subscription created"
310 );
311
312 // Send subscription started email (fire-and-forget)
313 if let (Ok(Some(subscriber)), Ok(Some(tier)), Ok(Some(project))) = (
314 db::users::get_user_by_id(&state.db, subscriber_id).await,
315 db::subscriptions::get_subscription_tier_by_id(&state.db, tier_id).await,
316 db::projects::get_project_by_id(&state.db, project_id).await,
317 ) {
318 let price = helpers::format_price(tier.price_cents);
319 let sub_email = subscriber.email.clone();
320 let sub_name = subscriber.display_name.clone();
321 let tier_name = tier.name.clone();
322 let project_title = project.title.clone();
323 spawn_email!(state, "subscription started", |email| {
324 email.send_subscription_started(
325 &sub_email,
326 sub_name.as_deref(),
327 &tier_name,
328 &project_title,
329 &price,
330 )
331 });
332 }
333
334 // Log event
335 if let Err(e) = db::subscriptions::log_subscription_event(
336 &state.db, Some(sub.id), event_id, "checkout.session.completed.subscription",
337 &serde_json::json!({"session_id": session_id, "stripe_subscription_id": stripe_subscription_id}),
338 ).await {
339 tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event");
340 }
341
342 Ok(())
343 }
344
345 /// Handle checkout.session.completed for Fan+ subscriptions
346 #[tracing::instrument(skip_all, name = "stripe::handle_fan_plus_checkout")]
347 pub(super) async fn handle_fan_plus_checkout_completed(
348 state: &AppState,
349 session: &crate::payments::CheckoutSessionView,
350 event_id: &str,
351 ) -> Result<()> {
352 let session_id = session.id.clone();
353 tracing::info!(session_id = %session_id, "processing completed Fan+ checkout");
354
355 let metadata = FanPlusCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
356 let user_id = metadata.user_id;
357
358 // Get the Stripe subscription ID from the session
359 let stripe_subscription_id = session.subscription.clone()
360 .ok_or_else(|| {
361 tracing::error!("Fan+ checkout completed but no subscription ID on session");
362 AppError::BadRequest("Missing subscription ID on session".to_string())
363 })?;
364
365 // Get the Stripe customer ID from the session
366 let stripe_customer_id = session.customer.clone()
367 .ok_or_else(|| {
368 tracing::error!("Fan+ checkout completed but no customer ID on session");
369 AppError::BadRequest("Missing customer ID on session".to_string())
370 })?;
371
372 // Create the subscription record. Idempotent via ON CONFLICT (user_id) DO
373 // UPDATE with a guard WHERE: a duplicate webhook for an unchanged row updates
374 // nothing and RETURNING yields no row (-> None below, "already exists"); a
375 // genuine re-subscribe updates in place.
376 let sub = match db::fan_plus::create_fan_plus_subscription(
377 &state.db, user_id, &stripe_subscription_id, &stripe_customer_id,
378 ).await
379 .with_context(|| format!("create Fan+ subscription for user {user_id}"))? {
380 Some(sub) => sub,
381 None => {
382 tracing::info!(user_id = %user_id, "Fan+ subscription already exists, ignoring duplicate");
383 return Ok(());
384 }
385 };
386
387 tracing::info!(
388 subscription_id = %sub.id, user_id = %user_id,
389 "Fan+ subscription created"
390 );
391
392 // Send welcome email (fire-and-forget)
393 if let Ok(Some(user)) = db::users::get_user_by_id(&state.db, user_id).await {
394 let user_email = user.email.clone();
395 let user_name = user.display_name.clone();
396 spawn_email!(state, "Fan+ welcome", |email| {
397 email.send_fan_plus_welcome(&user_email, user_name.as_deref())
398 });
399 }
400
401 // Log event
402 if let Err(e) = db::subscriptions::log_subscription_event(
403 &state.db, None, event_id, "checkout.session.completed.fan_plus",
404 &serde_json::json!({"session_id": session_id, "stripe_subscription_id": stripe_subscription_id}),
405 ).await {
406 tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event");
407 }
408
409 Ok(())
410 }
411
412 /// Handle checkout.session.completed for creator tier subscriptions
413 #[tracing::instrument(skip_all, name = "stripe::handle_creator_tier_checkout")]
414 pub(super) async fn handle_creator_tier_checkout_completed(
415 state: &AppState,
416 session: &crate::payments::CheckoutSessionView,
417 event_id: &str,
418 ) -> Result<()> {
419 let session_id = session.id.clone();
420 tracing::info!(session_id = %session_id, "processing completed creator tier checkout");
421
422 let metadata = CreatorTierCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
423 let user_id = metadata.user_id;
424 let tier: db::CreatorTier = metadata.tier.parse()
425 .map_err(|_| AppError::BadRequest(format!("Invalid tier: {}", metadata.tier)))?;
426
427 // Get the Stripe subscription ID from the session
428 let stripe_subscription_id = session.subscription.clone()
429 .ok_or_else(|| {
430 tracing::error!("Creator tier checkout completed but no subscription ID on session");
431 AppError::BadRequest("Missing subscription ID on session".to_string())
432 })?;
433
434 // Get the Stripe customer ID from the session
435 let stripe_customer_id = session.customer.clone()
436 .ok_or_else(|| {
437 tracing::error!("Creator tier checkout completed but no customer ID on session");
438 AppError::BadRequest("Missing customer ID on session".to_string())
439 })?;
440
441 // Create the subscription record. Idempotent via ON CONFLICT (user_id) DO
442 // UPDATE with a guard WHERE (`stripe_subscription_id != EXCLUDED OR status
443 // != 'active'`): a duplicate webhook updates nothing and RETURNING yields no
444 // row (-> None below); a genuine tier-switch or re-subscribe overwrites the
445 // row with the new subscription id and flips it active.
446 let sub = match db::creator_tiers::create_creator_subscription(
447 &state.db, user_id, &stripe_subscription_id, &stripe_customer_id, tier,
448 ).await
449 .with_context(|| format!("create creator tier subscription for user {user_id}"))? {
450 Some(sub) => sub,
451 None => {
452 tracing::info!(user_id = %user_id, "Creator tier subscription already exists, ignoring duplicate");
453 return Ok(());
454 }
455 };
456
457 // Sync the denormalized creator_tier column on users
458 db::creator_tiers::sync_user_creator_tier(&state.db, user_id)
459 .await
460 .with_context(|| format!("sync creator tier for user {user_id}"))?;
461
462 // Auto-unhide: restore items hidden by post-grace enforcement
463 match db::items::unhide_all_items_for_user(&state.db, user_id).await {
464 Ok(count) if count > 0 => {
465 tracing::info!(user_id = %user_id, items_unhidden = count, "auto-unhidden items after tier re-subscription");
466 }
467 Err(e) => {
468 tracing::warn!(user_id = %user_id, error = ?e, "failed to unhide items after tier re-subscription");
469 }
470 _ => {}
471 }
472
473 // Auto-unpause: if this creator was paused and just re-subscribed, clear the pause
474 // and un-cancel any fan subscriptions that haven't expired yet.
475 if let Ok(Some(db_user)) = db::users::get_user_by_id(&state.db, user_id).await
476 && db_user.is_creator_paused()
477 {
478 db::users::unpause_creator(&state.db, user_id)
479 .await
480 .with_context(|| format!("unpause creator {user_id}"))?;
481
482 // Un-cancel active fan subscriptions (clear cancel_at_period_end)
483 if let (Some(stripe), Some(stripe_account_id)) = (&state.stripe, &db_user.stripe_account_id) {
484 let fan_subs = db::subscriptions::get_active_subscriptions_by_creator(&state.db, user_id)
485 .await
486 .with_context(|| format!("fetch active fan subs for unpause {user_id}"))?;
487 for fan_sub in &fan_subs {
488 if let Err(e) = stripe.set_cancel_at_period_end(
489 &fan_sub.stripe_subscription_id,
490 stripe_account_id,
491 false,
492 ).await {
493 tracing::warn!(
494 stripe_sub_id = %fan_sub.stripe_subscription_id,
495 error = ?e,
496 "failed to clear cancel_at_period_end on fan sub during unpause"
497 );
498 }
499 }
500 }
501
502 tracing::info!(user_id = %user_id, "creator auto-unpaused after re-subscribing to tier");
503 }
504
505 tracing::info!(
506 user_id = %user_id, tier = %tier,
507 "creator tier subscription created"
508 );
509
510 // Log event
511 if let Err(e) = db::subscriptions::log_subscription_event(
512 &state.db, None, event_id, "checkout.session.completed.creator_tier",
513 &serde_json::json!({
514 "session_id": session_id,
515 "stripe_subscription_id": stripe_subscription_id,
516 "tier": sub.tier,
517 }),
518 ).await {
519 tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event");
520 }
521
522 Ok(())
523 }
524
525 /// Handle checkout.session.completed for tips
526 #[tracing::instrument(skip_all, name = "stripe::handle_tip_checkout")]
527 pub(super) async fn handle_tip_checkout_completed(
528 state: &AppState,
529 session: &crate::payments::CheckoutSessionView,
530 _event_id: &str,
531 ) -> Result<()> {
532 let session_id = session.id.clone();
533 tracing::info!(session_id = %session_id, "processing completed tip checkout");
534
535 let metadata = TipCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
536 let tipper_id = metadata.tipper_id;
537 let recipient_id = metadata.recipient_id;
538
539 let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string());
540
541 // Complete the tip (idempotent)
542 match db::tips::complete_tip(&state.db, &session_id, &payment_intent_id)
543 .await
544 .context("complete tip")? {
545 Some(tip) => {
546 tracing::info!(
547 tip_id = %tip.id, tipper_id = %tipper_id, recipient_id = %recipient_id,
548 amount_cents = %tip.amount_cents, "tip completed"
549 );
550
551 // Record revenue splits if the tip's project has members
552 if let Some(project_id) = tip.project_id {
553 record_tip_splits(state, tip.id, project_id, tip.amount_cents).await;
554 }
555
556 // Send tip notification email (fire-and-forget)
557 send_tip_email(state, &tip, tipper_id, recipient_id);
558 }
559 None => {
560 tracing::info!(session_id = %session_id, "tip already completed, ignoring duplicate webhook");
561 }
562 }
563
564 Ok(())
565 }
566
567 /// Handle checkout.session.completed for guest purchases (no MNW account).
568 ///
569 /// Extracts the buyer's email from Stripe, completes the transaction, and
570 /// auto-attaches to an existing account if the email matches.
571 #[tracing::instrument(skip_all, name = "stripe::handle_guest_checkout")]
572 pub(super) async fn handle_guest_checkout_completed(
573 state: &AppState,
574 session: &crate::payments::CheckoutSessionView,
575 _event_id: &str,
576 ) -> Result<()> {
577 use crate::payments::GuestCheckoutMetadata;
578
579 let session_id = session.id.clone();
580 tracing::info!(session_id = %session_id, "processing completed guest checkout");
581
582 let meta = GuestCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
583
584 // Extract buyer email from Stripe customer_details
585 let guest_email = session.customer_details.as_ref()
586 .and_then(|cd| cd.email.as_deref())
587 .unwrap_or("unknown@guest")
588 .to_string();
589
590 let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string());
591
592 // Complete the guest transaction and increment sales count in a single DB transaction
593 // (matching the non-guest path pattern to prevent counter drift on partial failure)
594 let mut db_tx = state.db.begin().await.context("begin guest checkout webhook transaction")?;
595
596 // Check if email matches an existing user — auto-attach if so.
597 // `FOR SHARE` blocks a concurrent email-change UPDATE from racing the
598 // attach: if someone edits this user's email mid-checkout, the writer
599 // waits for our tx to commit so we either attach the row we matched or
600 // the writer wins and we see no row (treated as guest purchase).
601 let existing_user_id: Option<db::UserId> = sqlx::query_scalar(
602 "SELECT id FROM users WHERE LOWER(email) = LOWER($1) AND email_verified = true FOR SHARE",
603 )
604 .bind(&guest_email)
605 .fetch_optional(&mut *db_tx)
606 .await?;
607
608 match db::transactions::complete_guest_transaction(
609 &mut *db_tx,
610 &session_id,
611 &payment_intent_id,
612 &guest_email,
613 existing_user_id,
614 ).await? {
615 Some(tx) => {
616 tracing::info!(
617 session_id = %session_id,
618 guest_email = %guest_email,
619 item_id = %meta.item_id,
620 auto_attached = tx.buyer_id.is_some(),
621 "guest transaction completed"
622 );
623
624 // Increment sales count inside transaction
625 db::items::increment_sales_count(&mut *db_tx, meta.item_id)
626 .await
627 .with_context(|| format!("increment sales count for guest item {}", meta.item_id))?;
628
629 db_tx.commit().await.context("commit guest checkout webhook transaction")?;
630
631 // --- Secondary effects below (outside transaction) ---
632
633 // Generate license key if applicable and buyer was auto-attached
634 if let Some(buyer_id) = tx.buyer_id {
635 maybe_generate_license_key(state, meta.item_id, buyer_id, tx.id).await;
636 }
637
638 // Record revenue splits
639 record_transaction_splits(state, tx.id, meta.item_id, tx.amount_cents).await;
640
641 // Send guest purchase confirmation email (only if not auto-attached to existing account)
642 if tx.buyer_id.is_none()
643 && let (Some(download_token), Some(claim_token)) = (tx.download_token, tx.claim_token)
644 {
645 let email_client = state.email.clone();
646 let host_url = state.config.host_url.clone();
647 let item_title = tx.item_title.clone().unwrap_or_else(|| "your item".to_string());
648 let price = helpers::format_price(tx.amount_cents);
649 let guest_email_addr = guest_email.clone();
650 let download_url = format!("{}/download/{}", host_url, download_token);
651 let claim_url = format!("{}/claim?token={}", host_url, claim_token);
652
653 state.bg.spawn("guest purchase confirmation", async move {
654 if let Err(e) = email_client.send_guest_purchase_confirmation(
655 &guest_email_addr, &item_title, &price, &download_url, &claim_url,
656 ).await {
657 tracing::error!(error = ?e, "failed to send guest purchase confirmation email");
658 }
659 });
660 }
661
662 // Send sale notification to seller
663 send_guest_sale_notification(state, &tx, &guest_email, meta.seller_id);
664 }
665 None => {
666 db_tx.commit().await.ok();
667 tracing::info!(session_id = %session_id, "guest transaction already completed, ignoring duplicate webhook");
668 }
669 }
670
671 Ok(())
672 }
673
674 /// Handle checkout.session.completed for an end-user SyncKit app subscription.
675 /// Inserts the `app_sync_subscriptions` row; subsequent
676 /// `customer.subscription.updated/.deleted` events keep it in sync.
677 #[tracing::instrument(skip_all, name = "stripe::handle_synckit_app_sub_checkout")]
678 pub(super) async fn handle_synckit_app_sub_checkout_completed(
679 state: &AppState,
680 session: &crate::payments::CheckoutSessionView,
681 event_id: &str,
682 ) -> Result<()> {
683 let session_id = session.id.clone();
684 tracing::info!(session_id = %session_id, "processing completed SyncKit app subscription checkout");
685
686 let meta = SynckitAppSubCheckoutMetadata::from_metadata(session.metadata.as_ref())?;
687
688 let stripe_subscription_id = session
689 .subscription
690 .clone()
691 .ok_or_else(|| AppError::BadRequest("Missing subscription ID on session".to_string()))?;
692 let stripe_customer_id = session
693 .customer
694 .clone()
695 .ok_or_else(|| AppError::BadRequest("Missing customer ID on session".to_string()))?;
696
697 let inserted = db::synckit::create_app_sync_subscription(
698 &state.db,
699 &db::synckit::NewAppSyncSubscription {
700 user_id: meta.user_id,
701 app_id: meta.app_id,
702 stripe_subscription_id: &stripe_subscription_id,
703 stripe_customer_id: &stripe_customer_id,
704 interval: &meta.tier, // metadata "tier" carries the interval string ("monthly"/"annual")
705 storage_limit_bytes: meta.storage_limit_bytes.unwrap_or(0),
706 },
707 )
708 .await
709 .with_context(|| {
710 format!(
711 "create app sync subscription user={} app={}",
712 meta.user_id, meta.app_id
713 )
714 })?;
715
716 if !inserted {
717 tracing::info!(
718 user_id = %meta.user_id,
719 app_id = %meta.app_id,
720 "SyncKit app subscription already exists, ignoring duplicate webhook"
721 );
722 }
723
724 let _ = event_id;
725 Ok(())
726 }
727