//! Pending refunds queue for out-of-order webhook delivery. //! //! When a `charge.refunded` webhook arrives before its matching //! `checkout.session.completed`, the refund data is stored here. //! The scheduler and checkout handler both check for pending matches. use sqlx::PgPool; use super::validated_types::Cents; use crate::error::Result; /// Insert a pending refund for later matching. Deduplicates on /// `payment_intent_id`; if a pending (unmatched) refund already exists /// for this payment intent, the insert is silently skipped. pub async fn insert_pending_refund( pool: &PgPool, payment_intent_id: &str, amount: i64, amount_refunded: i64, ) -> Result<()> { sqlx::query( r#" INSERT INTO pending_refunds (payment_intent_id, amount, amount_refunded) VALUES ($1, $2, $3) ON CONFLICT (payment_intent_id) WHERE matched_at IS NULL DO NOTHING "#, ) .bind(payment_intent_id) .bind(amount) .bind(amount_refunded) .execute(pool) .await?; Ok(()) } /// Row from the pending_refunds table. #[derive(Debug, sqlx::FromRow)] pub struct PendingRefund { pub id: uuid::Uuid, pub payment_intent_id: String, pub amount: Cents, pub amount_refunded: Cents, } /// Claim a pending refund matching a payment intent ID. /// /// Atomically marks it as matched (so it is only processed once). /// Returns `None` if no unmatched pending refund exists. pub async fn claim_pending_refund( pool: &PgPool, payment_intent_id: &str, ) -> Result> { let row = sqlx::query_as::<_, PendingRefund>( r#" UPDATE pending_refunds SET matched_at = NOW() WHERE id = ( SELECT id FROM pending_refunds WHERE payment_intent_id = $1 AND matched_at IS NULL LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id, payment_intent_id, amount, amount_refunded "#, ) .bind(payment_intent_id) .fetch_optional(pool) .await?; Ok(row) } /// Row for stale pending refunds that need escalation. #[derive(Debug, sqlx::FromRow)] pub struct StaleRefund { pub id: uuid::Uuid, pub payment_intent_id: String, pub amount: Cents, pub amount_refunded: Cents, pub created_at: chrono::DateTime, } /// Get pending refunds older than `age` that have not been matched or escalated. pub async fn get_stale_refunds( pool: &PgPool, age: chrono::Duration, ) -> Result> { let cutoff = chrono::Utc::now() - age; let rows = sqlx::query_as::<_, StaleRefund>( r#" SELECT id, payment_intent_id, amount, amount_refunded, created_at FROM pending_refunds WHERE matched_at IS NULL AND escalated_at IS NULL AND created_at < $1 ORDER BY created_at "#, ) .bind(cutoff) .fetch_all(pool) .await?; Ok(rows) } /// Mark a pending refund as escalated (alert sent, won't be re-alerted). pub async fn mark_escalated(pool: &PgPool, id: uuid::Uuid) -> Result<()> { sqlx::query("UPDATE pending_refunds SET escalated_at = NOW() WHERE id = $1") .bind(id) .execute(pool) .await?; Ok(()) }