//! Stripe webhook event processing. mod billing; mod checkout; mod checkout_helpers; mod subscriptions; use axum::{ body::Bytes, extract::State, http::{header::HeaderMap, StatusCode}, response::IntoResponse, }; use crate::{ db, error::{AppError, Result, ResultExt}, payments::{ self, AccountUpdate, AccountView, ChargeRefundData, ChargeView, CheckoutSessionView, InvoiceView, SubscriptionView, UntypedEvent, }, AppState, }; /// POST /stripe/webhook - Handle Stripe webhook events #[tracing::instrument(skip_all, name = "stripe::webhook")] pub(in crate::routes::stripe) async fn webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> Result { let stripe = state.stripe.as_ref() .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?; // Get the signature header let signature = headers .get("stripe-signature") .and_then(|v| v.to_str().ok()) .ok_or_else(|| AppError::BadRequest("Missing Stripe signature".to_string()))?; // Parse and verify the webhook let payload = std::str::from_utf8(&body) .map_err(|_| AppError::BadRequest("Invalid payload encoding".to_string()))?; let event = stripe.verify_webhook(payload, signature)?; tracing::info!(event_type = %event.type_, event_id = %event.id, "received webhook event"); // Deduplicate: skip if we already processed this event ID match db::webhook_events::try_mark_event_processed(&state.db, &event.id).await { Ok(false) => { tracing::info!(event_id = %event.id, "duplicate webhook event, skipping"); return Ok(StatusCode::OK); } Err(e) => { // Dedup check failed — return 503 so Stripe retries later. // Processing without dedup risks double-credit or double-refund. tracing::error!(event_id = %event.id, error = ?e, "webhook dedup check failed, returning 503 for retry"); return Ok(StatusCode::SERVICE_UNAVAILABLE); } Ok(true) => {} // First time seeing this event } // For retry-queue persistence we need id+type after `event` is consumed. // Move both out without cloning the underlying allocations. let UntypedEvent { id: event_id, type_: event_type_str, data_object } = event; let result = process_webhook_event(&state, &event_type_str, &event_id, data_object).await; if let Err(ref e) = result { tracing::error!( event_id = %event_id, event_type = %event_type_str, error = ?e, "webhook handler failed, queueing for retry" ); if let Err(queue_err) = db::webhook_events::insert_failed_event( &state.db, "stripe", &event_type_str, payload, Some(signature), &format!("{:?}", e), ).await { // Handler failed AND we couldn't enqueue for retry. The dedup // row was already committed before processing, so a plain 503 // wouldn't help: Stripe's redelivery would short-circuit at the // dedup check above and 200 the event without processing it. // Roll back the dedup mark so redelivery actually runs. if let Err(unmark_err) = db::webhook_events::unmark_event_processed(&state.db, &event_id).await { tracing::error!(error = ?unmark_err, "failed to unmark webhook event; redelivery will short-circuit"); } tracing::error!(error = ?queue_err, "failed to queue webhook event for retry; returning 503 to trigger Stripe redelivery"); return Ok(StatusCode::SERVICE_UNAVAILABLE); } } Ok(StatusCode::OK) } /// Process a verified Stripe webhook event. Extracted to allow the caller /// to catch errors and persist to the retry queue. Also called by the /// scheduler's webhook retry worker. /// Dispatch a verified Stripe webhook event. Consumes `data_object` exactly /// once into a typed rc.5 struct based on `event_type`. Shared by the live /// webhook handler and the scheduler retry worker. pub(crate) async fn process_webhook_event( state: &AppState, event_type: &str, event_id: &str, data_object: serde_json::Value, ) -> Result<()> { match event_type { "checkout.session.completed" => { let session: CheckoutSessionView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse CheckoutSession: {e}")))?; let meta = session.metadata.as_ref(); if payments::is_fan_plus_checkout(meta) { checkout::handle_fan_plus_checkout_completed(state, &session, event_id).await?; } else if payments::is_creator_tier_checkout(meta) { checkout::handle_creator_tier_checkout_completed(state, &session, event_id).await?; } else if payments::is_synckit_app_sub_checkout(meta) { checkout::handle_synckit_app_sub_checkout_completed(state, &session, event_id).await?; } else if payments::is_tip_checkout(meta) { checkout::handle_tip_checkout_completed(state, &session, event_id).await?; } else if payments::is_subscription_checkout(meta) { checkout::handle_subscription_checkout_completed(state, &session, event_id).await?; } else if payments::is_guest_checkout(meta) { checkout::handle_guest_checkout_completed(state, &session, event_id).await?; } else if payments::is_cart_checkout(meta) { checkout::handle_cart_checkout_completed(state, &session, event_id).await?; } else { checkout::handle_purchase_checkout_completed(state, &session, event_id).await?; } } "account.updated" => { let account: AccountView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Account: {e}")))?; handle_account_updated(state, &AccountUpdate::from(account)).await?; } "charge.refunded" => { let charge: ChargeView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Charge: {e}")))?; if let Some(refund_data) = ChargeRefundData::from_view(charge) { billing::handle_charge_refunded(state, &refund_data).await?; } } "customer.subscription.updated" => { let sub: SubscriptionView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?; subscriptions::handle_subscription_updated(state, &sub, event_id).await?; } "customer.subscription.deleted" => { let sub: SubscriptionView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?; subscriptions::handle_subscription_deleted(state, &sub, event_id).await?; } "invoice.payment_succeeded" => { let invoice: InvoiceView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?; billing::handle_invoice_payment_succeeded(state, &invoice, event_id).await?; } "invoice.payment_failed" => { let invoice: InvoiceView = serde_json::from_value(data_object) .map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?; billing::handle_invoice_payment_failed(state, &invoice, event_id).await?; } other => { tracing::debug!(event_type = %other, "unhandled webhook event type"); } } Ok(()) } /// Handle account.updated from the v2 thin event endpoint. pub(in crate::routes::stripe) async fn handle_account_updated_from_v2( state: &AppState, update: &AccountUpdate, ) -> Result<()> { handle_account_updated(state, update).await } /// Handle account.updated webhook async fn handle_account_updated( state: &AppState, update: &AccountUpdate, ) -> Result<()> { tracing::info!( account_id = %update.account_id, charges_enabled = %update.charges_enabled, payouts_enabled = %update.payouts_enabled, details_submitted = %update.details_submitted, "account updated" ); // Update the user's Stripe status db::users::update_user_stripe_status( &state.db, &update.account_id, update.details_submitted, update.payouts_enabled, update.charges_enabled, ).await .with_context(|| format!("update Stripe status for account {}", update.account_id))?; // Alert if charges or payouts became disabled (creator can't receive payments) if (!update.charges_enabled || !update.payouts_enabled) && let Some(ref wam) = state.wam { let title = format!("Stripe Connect degraded: {}", update.account_id); let body = format!( "charges_enabled: {}\npayouts_enabled: {}\ndetails_submitted: {}", update.charges_enabled, update.payouts_enabled, update.details_submitted, ); wam.create_ticket(&title, Some(&body), "high", "stripe-connect-degraded", Some(&update.account_id)).await; } Ok(()) }