| 1 |
|
| 2 |
|
| 3 |
mod billing; |
| 4 |
mod checkout; |
| 5 |
mod checkout_helpers; |
| 6 |
mod subscriptions; |
| 7 |
|
| 8 |
use axum::{ |
| 9 |
body::Bytes, |
| 10 |
extract::State, |
| 11 |
http::{header::HeaderMap, StatusCode}, |
| 12 |
response::IntoResponse, |
| 13 |
}; |
| 14 |
|
| 15 |
use crate::{ |
| 16 |
db, |
| 17 |
error::{AppError, Result, ResultExt}, |
| 18 |
payments::{ |
| 19 |
self, AccountUpdate, AccountView, ChargeRefundData, ChargeView, |
| 20 |
CheckoutSessionView, InvoiceView, SubscriptionView, UntypedEvent, |
| 21 |
}, |
| 22 |
AppState, |
| 23 |
}; |
| 24 |
|
| 25 |
|
| 26 |
#[tracing::instrument(skip_all, name = "stripe::webhook")] |
| 27 |
pub(in crate::routes::stripe) async fn webhook( |
| 28 |
State(state): State<AppState>, |
| 29 |
headers: HeaderMap, |
| 30 |
body: Bytes, |
| 31 |
) -> Result<impl IntoResponse> { |
| 32 |
let stripe = state.stripe.as_ref() |
| 33 |
.ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?; |
| 34 |
|
| 35 |
|
| 36 |
let signature = headers |
| 37 |
.get("stripe-signature") |
| 38 |
.and_then(|v| v.to_str().ok()) |
| 39 |
.ok_or_else(|| AppError::BadRequest("Missing Stripe signature".to_string()))?; |
| 40 |
|
| 41 |
|
| 42 |
let payload = std::str::from_utf8(&body) |
| 43 |
.map_err(|_| AppError::BadRequest("Invalid payload encoding".to_string()))?; |
| 44 |
|
| 45 |
let event = stripe.verify_webhook(payload, signature)?; |
| 46 |
tracing::info!(event_type = %event.type_, event_id = %event.id, "received webhook event"); |
| 47 |
|
| 48 |
|
| 49 |
match db::webhook_events::try_mark_event_processed(&state.db, &event.id).await { |
| 50 |
Ok(false) => { |
| 51 |
tracing::info!(event_id = %event.id, "duplicate webhook event, skipping"); |
| 52 |
return Ok(StatusCode::OK); |
| 53 |
} |
| 54 |
Err(e) => { |
| 55 |
|
| 56 |
|
| 57 |
tracing::error!(event_id = %event.id, error = ?e, "webhook dedup check failed, returning 503 for retry"); |
| 58 |
return Ok(StatusCode::SERVICE_UNAVAILABLE); |
| 59 |
} |
| 60 |
Ok(true) => {} |
| 61 |
} |
| 62 |
|
| 63 |
|
| 64 |
|
| 65 |
let UntypedEvent { id: event_id, type_: event_type_str, data_object } = event; |
| 66 |
let result = process_webhook_event(&state, &event_type_str, &event_id, data_object).await; |
| 67 |
|
| 68 |
if let Err(ref e) = result { |
| 69 |
tracing::error!( |
| 70 |
event_id = %event_id, event_type = %event_type_str, |
| 71 |
error = ?e, "webhook handler failed, queueing for retry" |
| 72 |
); |
| 73 |
if let Err(queue_err) = db::webhook_events::insert_failed_event( |
| 74 |
&state.db, |
| 75 |
"stripe", |
| 76 |
&event_type_str, |
| 77 |
payload, |
| 78 |
Some(signature), |
| 79 |
&format!("{:?}", e), |
| 80 |
).await { |
| 81 |
|
| 82 |
|
| 83 |
|
| 84 |
|
| 85 |
|
| 86 |
if let Err(unmark_err) = db::webhook_events::unmark_event_processed(&state.db, &event_id).await { |
| 87 |
tracing::error!(error = ?unmark_err, "failed to unmark webhook event; redelivery will short-circuit"); |
| 88 |
} |
| 89 |
tracing::error!(error = ?queue_err, "failed to queue webhook event for retry; returning 503 to trigger Stripe redelivery"); |
| 90 |
return Ok(StatusCode::SERVICE_UNAVAILABLE); |
| 91 |
} |
| 92 |
} |
| 93 |
|
| 94 |
Ok(StatusCode::OK) |
| 95 |
} |
| 96 |
|
| 97 |
|
| 98 |
|
| 99 |
|
| 100 |
|
| 101 |
|
| 102 |
|
| 103 |
pub(crate) async fn process_webhook_event( |
| 104 |
state: &AppState, |
| 105 |
event_type: &str, |
| 106 |
event_id: &str, |
| 107 |
data_object: serde_json::Value, |
| 108 |
) -> Result<()> { |
| 109 |
match event_type { |
| 110 |
"checkout.session.completed" => { |
| 111 |
let session: CheckoutSessionView = serde_json::from_value(data_object) |
| 112 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse CheckoutSession: {e}")))?; |
| 113 |
let meta = session.metadata.as_ref(); |
| 114 |
if payments::is_fan_plus_checkout(meta) { |
| 115 |
checkout::handle_fan_plus_checkout_completed(state, &session, event_id).await?; |
| 116 |
} else if payments::is_creator_tier_checkout(meta) { |
| 117 |
checkout::handle_creator_tier_checkout_completed(state, &session, event_id).await?; |
| 118 |
} else if payments::is_synckit_app_sub_checkout(meta) { |
| 119 |
checkout::handle_synckit_app_sub_checkout_completed(state, &session, event_id).await?; |
| 120 |
} else if payments::is_tip_checkout(meta) { |
| 121 |
checkout::handle_tip_checkout_completed(state, &session, event_id).await?; |
| 122 |
} else if payments::is_subscription_checkout(meta) { |
| 123 |
checkout::handle_subscription_checkout_completed(state, &session, event_id).await?; |
| 124 |
} else if payments::is_guest_checkout(meta) { |
| 125 |
checkout::handle_guest_checkout_completed(state, &session, event_id).await?; |
| 126 |
} else if payments::is_cart_checkout(meta) { |
| 127 |
checkout::handle_cart_checkout_completed(state, &session, event_id).await?; |
| 128 |
} else { |
| 129 |
checkout::handle_purchase_checkout_completed(state, &session, event_id).await?; |
| 130 |
} |
| 131 |
} |
| 132 |
"account.updated" => { |
| 133 |
let account: AccountView = serde_json::from_value(data_object) |
| 134 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Account: {e}")))?; |
| 135 |
handle_account_updated(state, &AccountUpdate::from(account)).await?; |
| 136 |
} |
| 137 |
"charge.refunded" => { |
| 138 |
let charge: ChargeView = serde_json::from_value(data_object) |
| 139 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Charge: {e}")))?; |
| 140 |
if let Some(refund_data) = ChargeRefundData::from_view(charge) { |
| 141 |
billing::handle_charge_refunded(state, &refund_data).await?; |
| 142 |
} |
| 143 |
} |
| 144 |
"customer.subscription.updated" => { |
| 145 |
let sub: SubscriptionView = serde_json::from_value(data_object) |
| 146 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?; |
| 147 |
subscriptions::handle_subscription_updated(state, &sub, event_id).await?; |
| 148 |
} |
| 149 |
"customer.subscription.deleted" => { |
| 150 |
let sub: SubscriptionView = serde_json::from_value(data_object) |
| 151 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Subscription: {e}")))?; |
| 152 |
subscriptions::handle_subscription_deleted(state, &sub, event_id).await?; |
| 153 |
} |
| 154 |
"invoice.payment_succeeded" => { |
| 155 |
let invoice: InvoiceView = serde_json::from_value(data_object) |
| 156 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?; |
| 157 |
billing::handle_invoice_payment_succeeded(state, &invoice, event_id).await?; |
| 158 |
} |
| 159 |
"invoice.payment_failed" => { |
| 160 |
let invoice: InvoiceView = serde_json::from_value(data_object) |
| 161 |
.map_err(|e| AppError::BadRequest(format!("Failed to parse Invoice: {e}")))?; |
| 162 |
billing::handle_invoice_payment_failed(state, &invoice, event_id).await?; |
| 163 |
} |
| 164 |
other => { |
| 165 |
tracing::debug!(event_type = %other, "unhandled webhook event type"); |
| 166 |
} |
| 167 |
} |
| 168 |
|
| 169 |
Ok(()) |
| 170 |
} |
| 171 |
|
| 172 |
|
| 173 |
pub(in crate::routes::stripe) async fn handle_account_updated_from_v2( |
| 174 |
state: &AppState, |
| 175 |
update: &AccountUpdate, |
| 176 |
) -> Result<()> { |
| 177 |
handle_account_updated(state, update).await |
| 178 |
} |
| 179 |
|
| 180 |
|
| 181 |
async fn handle_account_updated( |
| 182 |
state: &AppState, |
| 183 |
update: &AccountUpdate, |
| 184 |
) -> Result<()> { |
| 185 |
tracing::info!( |
| 186 |
account_id = %update.account_id, charges_enabled = %update.charges_enabled, |
| 187 |
payouts_enabled = %update.payouts_enabled, details_submitted = %update.details_submitted, |
| 188 |
"account updated" |
| 189 |
); |
| 190 |
|
| 191 |
|
| 192 |
db::users::update_user_stripe_status( |
| 193 |
&state.db, |
| 194 |
&update.account_id, |
| 195 |
update.details_submitted, |
| 196 |
update.payouts_enabled, |
| 197 |
update.charges_enabled, |
| 198 |
).await |
| 199 |
.with_context(|| format!("update Stripe status for account {}", update.account_id))?; |
| 200 |
|
| 201 |
|
| 202 |
if (!update.charges_enabled || !update.payouts_enabled) |
| 203 |
&& let Some(ref wam) = state.wam |
| 204 |
{ |
| 205 |
let title = format!("Stripe Connect degraded: {}", update.account_id); |
| 206 |
let body = format!( |
| 207 |
"charges_enabled: {}\npayouts_enabled: {}\ndetails_submitted: {}", |
| 208 |
update.charges_enabled, update.payouts_enabled, update.details_submitted, |
| 209 |
); |
| 210 |
wam.create_ticket(&title, Some(&body), "high", "stripe-connect-degraded", Some(&update.account_id)).await; |
| 211 |
} |
| 212 |
|
| 213 |
Ok(()) |
| 214 |
} |
| 215 |
|