Skip to main content

max / makenotwork

3.2 KB · 116 lines History Blame Raw
1 //! Pending refunds queue for out-of-order webhook delivery.
2 //!
3 //! When a `charge.refunded` webhook arrives before its matching
4 //! `checkout.session.completed`, the refund data is stored here.
5 //! The scheduler and checkout handler both check for pending matches.
6
7 use sqlx::PgPool;
8
9 use super::validated_types::Cents;
10
11 use crate::error::Result;
12
13 /// Insert a pending refund for later matching. Deduplicates on
14 /// `payment_intent_id`; if a pending (unmatched) refund already exists
15 /// for this payment intent, the insert is silently skipped.
16 pub async fn insert_pending_refund(
17 pool: &PgPool,
18 payment_intent_id: &str,
19 amount: i64,
20 amount_refunded: i64,
21 ) -> Result<()> {
22 sqlx::query(
23 r#"
24 INSERT INTO pending_refunds (payment_intent_id, amount, amount_refunded)
25 VALUES ($1, $2, $3)
26 ON CONFLICT (payment_intent_id) WHERE matched_at IS NULL DO NOTHING
27 "#,
28 )
29 .bind(payment_intent_id)
30 .bind(amount)
31 .bind(amount_refunded)
32 .execute(pool)
33 .await?;
34
35 Ok(())
36 }
37
38 /// Row from the pending_refunds table.
39 #[derive(Debug, sqlx::FromRow)]
40 pub struct PendingRefund {
41 pub id: uuid::Uuid,
42 pub payment_intent_id: String,
43 pub amount: Cents,
44 pub amount_refunded: Cents,
45 }
46
47 /// Claim a pending refund matching a payment intent ID.
48 ///
49 /// Atomically marks it as matched (so it is only processed once).
50 /// Returns `None` if no unmatched pending refund exists.
51 pub async fn claim_pending_refund(
52 pool: &PgPool,
53 payment_intent_id: &str,
54 ) -> Result<Option<PendingRefund>> {
55 let row = sqlx::query_as::<_, PendingRefund>(
56 r#"
57 UPDATE pending_refunds
58 SET matched_at = NOW()
59 WHERE id = (
60 SELECT id FROM pending_refunds
61 WHERE payment_intent_id = $1 AND matched_at IS NULL
62 LIMIT 1
63 FOR UPDATE SKIP LOCKED
64 )
65 RETURNING id, payment_intent_id, amount, amount_refunded
66 "#,
67 )
68 .bind(payment_intent_id)
69 .fetch_optional(pool)
70 .await?;
71
72 Ok(row)
73 }
74
75 /// Row for stale pending refunds that need escalation.
76 #[derive(Debug, sqlx::FromRow)]
77 pub struct StaleRefund {
78 pub id: uuid::Uuid,
79 pub payment_intent_id: String,
80 pub amount: Cents,
81 pub amount_refunded: Cents,
82 pub created_at: chrono::DateTime<chrono::Utc>,
83 }
84
85 /// Get pending refunds older than `age` that have not been matched or escalated.
86 pub async fn get_stale_refunds(
87 pool: &PgPool,
88 age: chrono::Duration,
89 ) -> Result<Vec<StaleRefund>> {
90 let cutoff = chrono::Utc::now() - age;
91 let rows = sqlx::query_as::<_, StaleRefund>(
92 r#"
93 SELECT id, payment_intent_id, amount, amount_refunded, created_at
94 FROM pending_refunds
95 WHERE matched_at IS NULL
96 AND escalated_at IS NULL
97 AND created_at < $1
98 ORDER BY created_at
99 "#,
100 )
101 .bind(cutoff)
102 .fetch_all(pool)
103 .await?;
104
105 Ok(rows)
106 }
107
108 /// Mark a pending refund as escalated (alert sent, won't be re-alerted).
109 pub async fn mark_escalated(pool: &PgPool, id: uuid::Uuid) -> Result<()> {
110 sqlx::query("UPDATE pending_refunds SET escalated_at = NOW() WHERE id = $1")
111 .bind(id)
112 .execute(pool)
113 .await?;
114 Ok(())
115 }
116