| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
use axum::{ |
| 9 |
body::Bytes, |
| 10 |
extract::State, |
| 11 |
http::{header::HeaderMap, StatusCode}, |
| 12 |
response::IntoResponse, |
| 13 |
}; |
| 14 |
|
| 15 |
use crate::{ |
| 16 |
db, |
| 17 |
error::{AppError, Result}, |
| 18 |
payments::{self, ThinEvent}, |
| 19 |
AppState, |
| 20 |
}; |
| 21 |
|
| 22 |
|
| 23 |
#[tracing::instrument(skip_all, name = "stripe::webhook_v2")] |
| 24 |
pub(super) async fn webhook_v2( |
| 25 |
State(state): State<AppState>, |
| 26 |
headers: HeaderMap, |
| 27 |
body: Bytes, |
| 28 |
) -> Result<impl IntoResponse> { |
| 29 |
let stripe = state.stripe.as_ref() |
| 30 |
.ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?; |
| 31 |
|
| 32 |
let signature = headers |
| 33 |
.get("stripe-signature") |
| 34 |
.and_then(|v| v.to_str().ok()) |
| 35 |
.ok_or_else(|| AppError::BadRequest("Missing Stripe signature".to_string()))?; |
| 36 |
|
| 37 |
let payload = std::str::from_utf8(&body) |
| 38 |
.map_err(|_| AppError::BadRequest("Invalid payload encoding".to_string()))?; |
| 39 |
|
| 40 |
|
| 41 |
let body_json = stripe.verify_webhook_v2(payload, signature)?; |
| 42 |
|
| 43 |
|
| 44 |
let thin: ThinEvent = serde_json::from_value(body_json).map_err(|e| { |
| 45 |
tracing::warn!(error = ?e, "failed to parse v2 thin event"); |
| 46 |
AppError::BadRequest("Invalid v2 event format".to_string()) |
| 47 |
})?; |
| 48 |
|
| 49 |
tracing::info!(event_type = %thin.event_type, event_id = %thin.id, "received v2 thin event"); |
| 50 |
|
| 51 |
|
| 52 |
match db::webhook_events::try_mark_event_processed(&state.db, &thin.id).await { |
| 53 |
Ok(true) => {} |
| 54 |
Ok(false) => { |
| 55 |
tracing::debug!(event_id = %thin.id, "v2 event already processed, skipping"); |
| 56 |
return Ok(StatusCode::OK); |
| 57 |
} |
| 58 |
Err(e) => { |
| 59 |
|
| 60 |
tracing::error!(event_id = %thin.id, error = ?e, "v2 dedup check failed, returning 503 for retry"); |
| 61 |
return Ok(StatusCode::SERVICE_UNAVAILABLE); |
| 62 |
} |
| 63 |
} |
| 64 |
|
| 65 |
|
| 66 |
if thin.event_type.starts_with("v2.core.account") { |
| 67 |
if let Err(e) = handle_account_thin_event(&state, stripe.as_ref(), &thin).await { |
| 68 |
|
| 69 |
tracing::warn!(event_id = %thin.id, error = ?e, "v2 event processing failed, unmarking for retry"); |
| 70 |
let _ = db::webhook_events::unmark_event_processed(&state.db, &thin.id).await; |
| 71 |
return Err(e); |
| 72 |
} |
| 73 |
} else { |
| 74 |
tracing::debug!(event_type = %thin.event_type, "unhandled v2 event type"); |
| 75 |
} |
| 76 |
|
| 77 |
Ok(StatusCode::OK) |
| 78 |
} |
| 79 |
|
| 80 |
|
| 81 |
async fn handle_account_thin_event( |
| 82 |
state: &AppState, |
| 83 |
stripe: &dyn payments::PaymentProvider, |
| 84 |
thin: &ThinEvent, |
| 85 |
) -> Result<()> { |
| 86 |
let account_id = match &thin.related_object { |
| 87 |
Some(obj) => &obj.id, |
| 88 |
None => { |
| 89 |
tracing::warn!(event_id = %thin.id, "v2 account event missing related_object"); |
| 90 |
return Ok(()); |
| 91 |
} |
| 92 |
}; |
| 93 |
|
| 94 |
let update = stripe.fetch_account(account_id).await.map_err(|e| { |
| 95 |
tracing::warn!(account_id = %account_id, error = ?e, "failed to fetch account for v2 event"); |
| 96 |
e |
| 97 |
})?; |
| 98 |
|
| 99 |
super::webhook::handle_account_updated_from_v2(state, &update).await.map_err(|e| { |
| 100 |
tracing::warn!(account_id = %account_id, error = ?e, "failed to process account update from v2 event"); |
| 101 |
e |
| 102 |
})?; |
| 103 |
|
| 104 |
Ok(()) |
| 105 |
} |
| 106 |
|