Skip to main content

max / makenotwork

5.9 KB · 163 lines History Blame Raw
1 //! Periodic integrity checks: sales count drift, stale subscriptions, bounce spikes,
2 //! storage recalculation, and post-grace enforcement.
3
4 use crate::db;
5 use crate::AppState;
6
7 /// Weekly storage drift correction — batch recalculates storage_used_bytes for all creators.
8 pub(super) async fn recalculate_all_storage_used(state: &AppState) {
9 match db::creator_tiers::recalculate_all_storage_batch(&state.db).await {
10 Ok(corrected) => {
11 if corrected > 0 {
12 tracing::info!(corrected = corrected, "weekly storage drift correction complete");
13 }
14 }
15 Err(e) => {
16 tracing::error!(error = ?e, "weekly storage drift correction failed");
17 }
18 }
19 }
20
21 /// Enforce post-grace item hiding for creators whose cancellation grace period has expired.
22 pub(super) async fn enforce_post_grace_hiding(state: &AppState) {
23 match db::creator_tiers::get_expired_grace_creators(&state.db).await {
24 Ok(user_ids) => {
25 for user_id in user_ids {
26 match db::items::hide_all_items_for_user(&state.db, user_id).await {
27 Ok(count) => {
28 if count > 0 {
29 tracing::info!(
30 user_id = %user_id,
31 items_hidden = count,
32 "post-grace enforcement: items hidden"
33 );
34 }
35 }
36 Err(e) => {
37 tracing::error!(error = ?e, user_id = %user_id, "failed to hide items for post-grace enforcement");
38 continue;
39 }
40 }
41 if let Err(e) = db::creator_tiers::mark_grace_enforced(&state.db, user_id).await {
42 tracing::error!(error = ?e, user_id = %user_id, "failed to mark grace enforced");
43 }
44 }
45 }
46 Err(e) => {
47 tracing::error!(error = ?e, "failed to query expired grace creators");
48 }
49 }
50 }
51
52 /// Detect items where denormalized sales_count has drifted from actual transaction count.
53 pub(super) async fn check_sales_count_drift(state: &AppState) {
54 // Pre-filter to items that either have a non-zero recorded count OR have
55 // at least one completed transaction. The previous query GROUPed every
56 // item in the platform's history; on a mature DB this multi-minute scan
57 // pins a pool connection. `EXISTS ... LIMIT 1` short-circuits per item.
58 let rows = match sqlx::query_as::<_, (db::ItemId, i32, i64)>(
59 r#"
60 SELECT i.id, i.sales_count, COUNT(t.id)
61 FROM items i
62 LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed'
63 WHERE i.sales_count > 0
64 OR EXISTS (SELECT 1 FROM transactions WHERE item_id = i.id AND status = 'completed' LIMIT 1)
65 GROUP BY i.id
66 HAVING i.sales_count != COUNT(t.id)
67 LIMIT 50
68 "#,
69 )
70 .fetch_all(&state.db)
71 .await
72 {
73 Ok(r) if r.is_empty() => return,
74 Ok(r) => r,
75 Err(e) => {
76 tracing::error!(error = ?e, "sales count drift check failed");
77 return;
78 }
79 };
80
81 tracing::warn!(count = rows.len(), "sales count drift detected");
82
83 if let Some(ref wam) = state.wam {
84 let items: Vec<String> = rows
85 .iter()
86 .map(|(id, cached, actual)| format!(" {id}: cached={cached}, actual={actual}"))
87 .collect();
88 let body = format!("Items with drifted sales_count:\n{}", items.join("\n"));
89 wam.create_ticket(
90 &format!("Sales count drift: {} items", rows.len()),
91 Some(&body),
92 "medium",
93 "sales-count-drift",
94 None,
95 )
96 .await;
97 }
98 }
99
100 /// Find subscriptions stuck in past_due for >7 days (possible missed webhook).
101 pub(super) async fn check_stale_subscriptions(state: &AppState) {
102 let count: i64 = match sqlx::query_scalar(
103 r#"
104 SELECT COUNT(*) FROM (
105 SELECT 1 FROM creator_subscriptions WHERE status = 'past_due' AND current_period_end < NOW() - INTERVAL '7 days'
106 UNION ALL
107 SELECT 1 FROM subscriptions WHERE status = 'past_due' AND current_period_end < NOW() - INTERVAL '7 days'
108 ) stale
109 "#,
110 )
111 .fetch_one(&state.db)
112 .await
113 {
114 Ok(c) => c,
115 Err(e) => {
116 tracing::error!(error = ?e, "stale subscription check failed");
117 return;
118 }
119 };
120
121 if count > 0 {
122 tracing::warn!(count, "stale past_due subscriptions detected");
123 if let Some(ref wam) = state.wam {
124 wam.create_ticket(
125 &format!("{count} subscriptions past_due >7 days"),
126 Some("Subscriptions stuck in past_due for over 7 days. A Stripe webhook may have been missed. Check the Stripe dashboard."),
127 "medium",
128 "subscription-stale-past-due",
129 None,
130 ).await;
131 }
132 }
133 }
134
135 /// Detect email bounce/complaint spikes (>10 suppressions in 24h).
136 pub(super) async fn check_email_bounce_spike(state: &AppState) {
137 let count: i64 = match sqlx::query_scalar(
138 "SELECT COUNT(*) FROM email_suppressions WHERE created_at > NOW() - INTERVAL '24 hours'",
139 )
140 .fetch_one(&state.db)
141 .await
142 {
143 Ok(c) => c,
144 Err(e) => {
145 tracing::error!(error = ?e, "email bounce spike check failed");
146 return;
147 }
148 };
149
150 if count > 10 {
151 tracing::warn!(count, "email bounce/complaint spike");
152 if let Some(ref wam) = state.wam {
153 wam.create_ticket(
154 &format!("Email bounce spike: {count} suppressions in 24h"),
155 Some("Elevated bounce/complaint rate may indicate a deliverability problem. Check Postmark dashboard."),
156 "high",
157 "email-bounce-spike",
158 None,
159 ).await;
160 }
161 }
162 }
163