//! Webhook event retry queue; persist and retry failed webhook deliveries. use sqlx::PgPool; use chrono::{DateTime, Utc}; use crate::error::Result; /// A failed webhook event pending retry. #[derive(Debug, sqlx::FromRow)] #[allow(dead_code)] pub struct DbWebhookEvent { pub id: uuid::Uuid, pub source: String, pub event_type: String, pub payload: String, pub signature: Option, pub status: String, pub attempts: i32, pub last_error: Option, pub next_retry_at: DateTime, pub created_at: DateTime, } /// Try to mark a webhook event ID as processed. Returns `true` if this is the /// first time the event has been seen, `false` if it was already processed. /// Uses INSERT ... ON CONFLICT to atomically deduplicate. #[tracing::instrument(skip_all)] pub async fn try_mark_event_processed(pool: &PgPool, event_id: &str) -> Result { let result = sqlx::query( "INSERT INTO processed_webhook_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING", ) .bind(event_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Remove a webhook event from the processed set, allowing Stripe to retry delivery. #[tracing::instrument(skip_all)] pub async fn unmark_event_processed(pool: &PgPool, event_id: &str) -> Result<()> { sqlx::query("DELETE FROM processed_webhook_events WHERE event_id = $1") .bind(event_id) .execute(pool) .await?; Ok(()) } /// Insert a failed webhook event for later retry. #[tracing::instrument(skip_all)] pub async fn insert_failed_event( pool: &PgPool, source: &str, event_type: &str, payload: &str, signature: Option<&str>, error: &str, ) -> Result<()> { sqlx::query( r#"INSERT INTO webhook_events (source, event_type, payload, signature, last_error) VALUES ($1, $2, $3, $4, $5)"#, ) .bind(source) .bind(event_type) .bind(payload) .bind(signature) .bind(error) .execute(pool) .await?; Ok(()) } /// Fetch events that are due for retry (status = failed/retrying, next_retry_at <= now). 
/// Excludes events that have exhausted retries to prevent retry storms if the
/// `schedule_retry` dead-letter update fails.
/// Returns up to 10 at a time.
///
/// Rows are ordered by `next_retry_at`, so the longest-overdue events come first.
///
/// # Errors
///
/// Propagates any error returned by the database query.
#[tracing::instrument(skip_all)]
pub async fn get_retryable_events(pool: &PgPool) -> Result<Vec<DbWebhookEvent>> {
    let events = sqlx::query_as::<_, DbWebhookEvent>(
        r#"SELECT * FROM webhook_events
           WHERE status IN ('failed', 'retrying')
             AND attempts < $1
             AND next_retry_at <= NOW()
           ORDER BY next_retry_at
           LIMIT 10"#,
    )
    .bind(MAX_RETRY_ATTEMPTS)
    .fetch_all(pool)
    .await?;
    Ok(events)
}

/// Mark an event as successfully processed and delete it.
///
/// # Errors
///
/// Propagates any error returned by the database query.
#[tracing::instrument(skip_all)]
pub async fn mark_processed(pool: &PgPool, id: uuid::Uuid) -> Result<()> {
    sqlx::query("DELETE FROM webhook_events WHERE id = $1")
        .bind(id)
        .execute(pool)
        .await?;
    Ok(())
}

/// Attempt count at which an event is dead-lettered instead of rescheduled.
///
/// Shared by the `get_retryable_events` filter and the `schedule_retry`
/// threshold so the two cannot drift apart.
const MAX_RETRY_ATTEMPTS: i32 = 5;

/// Increment attempts and schedule next retry with exponential backoff.
/// Backoff: 1m, 5m, 30m, 2h, 24h. After 5 attempts, mark as dead.
///
/// `attempt` is stored verbatim in the `attempts` column and also selects the
/// delay, so the caller decides which count to record. When `attempt` has
/// reached the limit the row is set to `status = 'dead'` and `next_retry_at`
/// is left untouched; otherwise it is set to `status = 'retrying'` with
/// `next_retry_at` pushed into the future.
///
/// # Errors
///
/// Propagates any error returned by the database query.
#[tracing::instrument(skip_all)]
pub async fn schedule_retry(
    pool: &PgPool,
    id: uuid::Uuid,
    attempt: i32,
    error: &str,
) -> Result<()> {
    if attempt >= MAX_RETRY_ATTEMPTS {
        // Retries exhausted: dead-letter the event for admin review.
        sqlx::query(
            "UPDATE webhook_events SET status = 'dead', attempts = $2, last_error = $3 WHERE id = $1",
        )
        .bind(id)
        .bind(attempt)
        .bind(error)
        .execute(pool)
        .await?;
        return Ok(());
    }

    // Exponential backoff: 60s, 300s, 1800s, 7200s, 86400s.
    // Kept as f64 because the SQL casts the parameter to double precision.
    let delay_secs: f64 = match attempt {
        0 => 60.0,
        1 => 300.0,
        2 => 1800.0,
        3 => 7200.0,
        _ => 86400.0,
    };

    sqlx::query(
        r#"UPDATE webhook_events
           SET status = 'retrying',
               attempts = $2,
               last_error = $3,
               next_retry_at = NOW() + make_interval(secs => $4::double precision)
           WHERE id = $1"#,
    )
    .bind(id)
    .bind(attempt)
    .bind(error)
    .bind(delay_secs)
    .execute(pool)
    .await?;
    Ok(())
}

/// Get dead events for admin review.
#[allow(dead_code)] #[tracing::instrument(skip_all)] pub async fn get_dead_events(pool: &PgPool) -> Result> { let events = sqlx::query_as::<_, DbWebhookEvent>( "SELECT * FROM webhook_events WHERE status = 'dead' ORDER BY created_at DESC LIMIT 50", ) .fetch_all(pool) .await?; Ok(events) } /// Reset a dead event for retry. #[allow(dead_code)] #[tracing::instrument(skip_all)] pub async fn retry_dead_event(pool: &PgPool, id: uuid::Uuid) -> Result { let result = sqlx::query( "UPDATE webhook_events SET status = 'failed', next_retry_at = NOW() WHERE id = $1 AND status = 'dead'", ) .bind(id) .execute(pool) .await?; Ok(result.rows_affected() > 0) }