Skip to main content

max / makenotwork

9.2 KB · 215 lines History Blame Raw
1 //! Stripe webhook event processing.
2
3 mod billing;
4 mod checkout;
5 mod checkout_helpers;
6 mod subscriptions;
7
8 use axum::{
9 body::Bytes,
10 extract::State,
11 http::{header::HeaderMap, StatusCode},
12 response::IntoResponse,
13 };
14
15 use crate::{
16 db,
17 error::{AppError, Result, ResultExt},
18 payments::{
19 self, AccountUpdate, AccountView, ChargeRefundData, ChargeView,
20 CheckoutSessionView, InvoiceView, SubscriptionView, UntypedEvent,
21 },
22 AppState,
23 };
24
25 /// POST /stripe/webhook - Handle Stripe webhook events
26 #[tracing::instrument(skip_all, name = "stripe::webhook")]
27 pub(in crate::routes::stripe) async fn webhook(
28 State(state): State<AppState>,
29 headers: HeaderMap,
30 body: Bytes,
31 ) -> Result<impl IntoResponse> {
32 let stripe = state.stripe.as_ref()
33 .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;
34
35 // Get the signature header
36 let signature = headers
37 .get("stripe-signature")
38 .and_then(|v| v.to_str().ok())
39 .ok_or_else(|| AppError::BadRequest("Missing Stripe signature".to_string()))?;
40
41 // Parse and verify the webhook
42 let payload = std::str::from_utf8(&body)
43 .map_err(|_| AppError::BadRequest("Invalid payload encoding".to_string()))?;
44
45 let event = stripe.verify_webhook(payload, signature)?;
46 tracing::info!(event_type = %event.type_, event_id = %event.id, "received webhook event");
47
48 // Deduplicate: skip if we already processed this event ID
49 match db::webhook_events::try_mark_event_processed(&state.db, &event.id).await {
50 Ok(false) => {
51 tracing::info!(event_id = %event.id, "duplicate webhook event, skipping");
52 return Ok(StatusCode::OK);
53 }
54 Err(e) => {
55 // Dedup check failed — return 503 so Stripe retries later.
56 // Processing without dedup risks double-credit or double-refund.
57 tracing::error!(event_id = %event.id, error = ?e, "webhook dedup check failed, returning 503 for retry");
58 return Ok(StatusCode::SERVICE_UNAVAILABLE);
59 }
60 Ok(true) => {} // First time seeing this event
61 }
62
63 // For retry-queue persistence we need id+type after `event` is consumed.
64 // Move both out without cloning the underlying allocations.
65 let UntypedEvent { id: event_id, type_: event_type_str, data_object } = event;
66 let result = process_webhook_event(&state, &event_type_str, &event_id, data_object).await;
67
68 if let Err(ref e) = result {
69 tracing::error!(
70 event_id = %event_id, event_type = %event_type_str,
71 error = ?e, "webhook handler failed, queueing for retry"
72 );
73 if let Err(queue_err) = db::webhook_events::insert_failed_event(
74 &state.db,
75 "stripe",
76 &event_type_str,
77 payload,
78 Some(signature),
79 &format!("{:?}", e),
80 ).await {
81 // Handler failed AND we couldn't enqueue for retry. The dedup
82 // row was already committed before processing, so a plain 503
83 // wouldn't help: Stripe's redelivery would short-circuit at the
84 // dedup check above and 200 the event without processing it.
85 // Roll back the dedup mark so redelivery actually runs.
86 if let Err(unmark_err) = db::webhook_events::unmark_event_processed(&state.db, &event_id).await {
87 tracing::error!(error = ?unmark_err, "failed to unmark webhook event; redelivery will short-circuit");
88 }
89 tracing::error!(error = ?queue_err, "failed to queue webhook event for retry; returning 503 to trigger Stripe redelivery");
90 return Ok(StatusCode::SERVICE_UNAVAILABLE);
91 }
92 }
93
94 Ok(StatusCode::OK)
95 }
96
97 /// Process a verified Stripe webhook event. Extracted to allow the caller
98 /// to catch errors and persist to the retry queue. Also called by the
99 /// scheduler's webhook retry worker.
100 /// Dispatch a verified Stripe webhook event. Consumes `data_object` exactly
101 /// once into a typed rc.5 struct based on `event_type`. Shared by the live
102 /// webhook handler and the scheduler retry worker.
103 pub(crate) async fn process_webhook_event(
104 state: &AppState,
105 event_type: &str,
106 event_id: &str,
107 data_object: serde_json::Value,
108 ) -> Result<()> {
109 match event_type {
110 "checkout.session.completed" => {
111 let session: CheckoutSessionView = serde_json::from_value(data_object)
112 .map_err(|e| AppError::BadRequest(format!("Failed to parse CheckoutSession: {e}")))?;
113 let meta = session.metadata.as_ref();
114 if payments::is_fan_plus_checkout(meta) {
115 checkout::handle_fan_plus_checkout_completed(state, &session, event_id).await?;
116 } else if payments::is_creator_tier_checkout(meta) {
117 checkout::handle_creator_tier_checkout_completed(state, &session, event_id).await?;
118 } else if payments::is_synckit_app_sub_checkout(meta) {
119 checkout::handle_synckit_app_sub_checkout_completed(state, &session, event_id).await?;
120 } else if payments::is_tip_checkout(meta) {
121 checkout::handle_tip_checkout_completed(state, &session, event_id).await?;
122 } else if payments::is_subscription_checkout(meta) {
123 checkout::handle_subscription_checkout_completed(state, &session, event_id).await?;
124 } else if payments::is_guest_checkout(meta) {
125 checkout::handle_guest_checkout_completed(state, &session, event_id).await?;
126 } else if payments::is_cart_checkout(meta) {
127 checkout::handle_cart_checkout_completed(state, &session, event_id).await?;
128 } else {
129 checkout::handle_purchase_checkout_completed(state, &session, event_id).await?;
130 }
131 }
132 "account.updated" => {
133 let account: AccountView = serde_json::from_value(data_object)
134 .map_err(|e| AppError::BadRequest(format!("Failed to parse Account: {e}")))?;
135 handle_account_updated(state, &AccountUpdate::from(account)).await?;
136 }
137 "charge.refunded" => {
138 let charge: ChargeView = serde_json::from_value(data_object)
139 .map_err(|e| AppError::BadRequest(format!("Failed to parse Charge: {e}")))?;
140 if let Some(refund_data) = ChargeRefundData::from_view(charge) {
141 billing::handle_charge_refunded(state, &refund_data).await?;
142 }
143 }
144 "customer.subscription.updated" => {
145 let sub: SubscriptionView = serde_json::from_value(data_object)
146 .map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?;
147 subscriptions::handle_subscription_updated(state, &sub, event_id).await?;
148 }
149 "customer.subscription.deleted" => {
150 let sub: SubscriptionView = serde_json::from_value(data_object)
151 .map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?;
152 subscriptions::handle_subscription_deleted(state, &sub, event_id).await?;
153 }
154 "invoice.payment_succeeded" => {
155 let invoice: InvoiceView = serde_json::from_value(data_object)
156 .map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?;
157 billing::handle_invoice_payment_succeeded(state, &invoice, event_id).await?;
158 }
159 "invoice.payment_failed" => {
160 let invoice: InvoiceView = serde_json::from_value(data_object)
161 .map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?;
162 billing::handle_invoice_payment_failed(state, &invoice, event_id).await?;
163 }
164 other => {
165 tracing::debug!(event_type = %other, "unhandled webhook event type");
166 }
167 }
168
169 Ok(())
170 }
171
172 /// Handle account.updated from the v2 thin event endpoint.
173 pub(in crate::routes::stripe) async fn handle_account_updated_from_v2(
174 state: &AppState,
175 update: &AccountUpdate,
176 ) -> Result<()> {
177 handle_account_updated(state, update).await
178 }
179
180 /// Handle account.updated webhook
181 async fn handle_account_updated(
182 state: &AppState,
183 update: &AccountUpdate,
184 ) -> Result<()> {
185 tracing::info!(
186 account_id = %update.account_id, charges_enabled = %update.charges_enabled,
187 payouts_enabled = %update.payouts_enabled, details_submitted = %update.details_submitted,
188 "account updated"
189 );
190
191 // Update the user's Stripe status
192 db::users::update_user_stripe_status(
193 &state.db,
194 &update.account_id,
195 update.details_submitted,
196 update.payouts_enabled,
197 update.charges_enabled,
198 ).await
199 .with_context(|| format!("update Stripe status for account {}", update.account_id))?;
200
201 // Alert if charges or payouts became disabled (creator can't receive payments)
202 if (!update.charges_enabled || !update.payouts_enabled)
203 && let Some(ref wam) = state.wam
204 {
205 let title = format!("Stripe Connect degraded: {}", update.account_id);
206 let body = format!(
207 "charges_enabled: {}\npayouts_enabled: {}\ndetails_submitted: {}",
208 update.charges_enabled, update.payouts_enabled, update.details_submitted,
209 );
210 wam.create_ticket(&title, Some(&body), "high", "stripe-connect-degraded", Some(&update.account_id)).await;
211 }
212
213 Ok(())
214 }
215