Skip to main content

max / makenotwork

5.0 KB · 179 lines History Blame Raw
1 //! Webhook event retry queue; persist and retry failed webhook deliveries.
2
3 use sqlx::PgPool;
4 use chrono::{DateTime, Utc};
5 use crate::error::Result;
6
7 /// A failed webhook event pending retry.
8 #[derive(Debug, sqlx::FromRow)]
9 #[allow(dead_code)]
10 pub struct DbWebhookEvent {
11 pub id: uuid::Uuid,
12 pub source: String,
13 pub event_type: String,
14 pub payload: String,
15 pub signature: Option<String>,
16 pub status: String,
17 pub attempts: i32,
18 pub last_error: Option<String>,
19 pub next_retry_at: DateTime<Utc>,
20 pub created_at: DateTime<Utc>,
21 }
22
23 /// Try to mark a webhook event ID as processed. Returns `true` if this is the
24 /// first time the event has been seen, `false` if it was already processed.
25 /// Uses INSERT ... ON CONFLICT to atomically deduplicate.
26 #[tracing::instrument(skip_all)]
27 pub async fn try_mark_event_processed(pool: &PgPool, event_id: &str) -> Result<bool> {
28 let result = sqlx::query(
29 "INSERT INTO processed_webhook_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING",
30 )
31 .bind(event_id)
32 .execute(pool)
33 .await?;
34
35 Ok(result.rows_affected() > 0)
36 }
37
38 /// Remove a webhook event from the processed set, allowing Stripe to retry delivery.
39 #[tracing::instrument(skip_all)]
40 pub async fn unmark_event_processed(pool: &PgPool, event_id: &str) -> Result<()> {
41 sqlx::query("DELETE FROM processed_webhook_events WHERE event_id = $1")
42 .bind(event_id)
43 .execute(pool)
44 .await?;
45 Ok(())
46 }
47
48 /// Insert a failed webhook event for later retry.
49 #[tracing::instrument(skip_all)]
50 pub async fn insert_failed_event(
51 pool: &PgPool,
52 source: &str,
53 event_type: &str,
54 payload: &str,
55 signature: Option<&str>,
56 error: &str,
57 ) -> Result<()> {
58 sqlx::query(
59 r#"INSERT INTO webhook_events (source, event_type, payload, signature, last_error)
60 VALUES ($1, $2, $3, $4, $5)"#,
61 )
62 .bind(source)
63 .bind(event_type)
64 .bind(payload)
65 .bind(signature)
66 .bind(error)
67 .execute(pool)
68 .await?;
69
70 Ok(())
71 }
72
73 /// Fetch events that are due for retry (status = failed/retrying, next_retry_at <= now).
74 /// Excludes events that have exhausted retries to prevent retry storms if the
75 /// `schedule_retry` dead-letter update fails.
76 /// Returns up to 10 at a time.
77 #[tracing::instrument(skip_all)]
78 pub async fn get_retryable_events(pool: &PgPool) -> Result<Vec<DbWebhookEvent>> {
79 let events = sqlx::query_as::<_, DbWebhookEvent>(
80 r#"SELECT * FROM webhook_events
81 WHERE status IN ('failed', 'retrying')
82 AND attempts < 5
83 AND next_retry_at <= NOW()
84 ORDER BY next_retry_at
85 LIMIT 10"#,
86 )
87 .fetch_all(pool)
88 .await?;
89
90 Ok(events)
91 }
92
93 /// Mark an event as successfully processed and delete it.
94 #[tracing::instrument(skip_all)]
95 pub async fn mark_processed(pool: &PgPool, id: uuid::Uuid) -> Result<()> {
96 sqlx::query("DELETE FROM webhook_events WHERE id = $1")
97 .bind(id)
98 .execute(pool)
99 .await?;
100
101 Ok(())
102 }
103
104 /// Increment attempts and schedule next retry with exponential backoff.
105 /// Backoff: 1m, 5m, 30m, 2h, 24h. After 5 attempts, mark as dead.
106 #[tracing::instrument(skip_all)]
107 pub async fn schedule_retry(
108 pool: &PgPool,
109 id: uuid::Uuid,
110 attempt: i32,
111 error: &str,
112 ) -> Result<()> {
113 let max_attempts = 5;
114
115 if attempt >= max_attempts {
116 sqlx::query(
117 "UPDATE webhook_events SET status = 'dead', attempts = $2, last_error = $3 WHERE id = $1",
118 )
119 .bind(id)
120 .bind(attempt)
121 .bind(error)
122 .execute(pool)
123 .await?;
124 } else {
125 // Exponential backoff: 60s, 300s, 1800s, 7200s, 86400s
126 let delay_secs: i64 = match attempt {
127 0 => 60,
128 1 => 300,
129 2 => 1800,
130 3 => 7200,
131 _ => 86400,
132 };
133
134 sqlx::query(
135 r#"UPDATE webhook_events
136 SET status = 'retrying',
137 attempts = $2,
138 last_error = $3,
139 next_retry_at = NOW() + make_interval(secs => $4::double precision)
140 WHERE id = $1"#,
141 )
142 .bind(id)
143 .bind(attempt)
144 .bind(error)
145 .bind(delay_secs as f64)
146 .execute(pool)
147 .await?;
148 }
149
150 Ok(())
151 }
152
153 /// Get dead events for admin review.
154 #[allow(dead_code)]
155 #[tracing::instrument(skip_all)]
156 pub async fn get_dead_events(pool: &PgPool) -> Result<Vec<DbWebhookEvent>> {
157 let events = sqlx::query_as::<_, DbWebhookEvent>(
158 "SELECT * FROM webhook_events WHERE status = 'dead' ORDER BY created_at DESC LIMIT 50",
159 )
160 .fetch_all(pool)
161 .await?;
162
163 Ok(events)
164 }
165
166 /// Reset a dead event for retry.
167 #[allow(dead_code)]
168 #[tracing::instrument(skip_all)]
169 pub async fn retry_dead_event(pool: &PgPool, id: uuid::Uuid) -> Result<bool> {
170 let result = sqlx::query(
171 "UPDATE webhook_events SET status = 'failed', next_retry_at = NOW() WHERE id = $1 AND status = 'dead'",
172 )
173 .bind(id)
174 .execute(pool)
175 .await?;
176
177 Ok(result.rows_affected() > 0)
178 }
179