Skip to main content

max / makenotwork

6.9 KB · 203 lines History Blame Raw
1 use chrono::{DateTime, Utc};
2 use sqlx::PgPool;
3
4 use crate::db::{SyncAppId, UserId};
5 use crate::error::Result;
6
/// Parameters for inserting a new app sync subscription from a webhook event.
///
/// The string fields are borrowed (`&'a str`), so building one of these does
/// not copy the underlying Stripe identifiers. Consumed by
/// `create_app_sync_subscription`, which binds each field to the column noted
/// on it below.
pub struct NewAppSyncSubscription<'a> {
    /// Subscribing user; written to the `user_id` column. Together with
    /// `app_id` it forms the `ON CONFLICT (user_id, app_id)` target on insert.
    pub user_id: UserId,
    /// App whose cloud sync is being subscribed to; written to `app_id`.
    pub app_id: SyncAppId,
    /// Stripe subscription identifier; written to `stripe_subscription_id`.
    pub stripe_subscription_id: &'a str,
    /// Stripe customer identifier; written to `stripe_customer_id`.
    pub stripe_customer_id: &'a str,
    /// Billing interval ("monthly" / "annual"). Persisted in the `tier` column
    /// (kept lowercase for that legacy name) so cap-change handlers know which
    /// interval the user is on without round-tripping to Stripe.
    pub interval: &'a str,
    /// Storage cap in bytes; written to `storage_limit_bytes`.
    pub storage_limit_bytes: i64,
}
19
/// End-user subscription to an app's cloud sync (rows in `app_sync_subscriptions`).
///
/// Only the columns selected by `get_user_app_subscription` are mapped here;
/// the row's key columns (`user_id`, `app_id`) are not part of this struct.
#[derive(Debug, sqlx::FromRow)]
pub struct DbAppSyncSubscription {
    /// Stripe subscription identifier stored on the row.
    pub stripe_subscription_id: String,
    /// Billing interval, read from the legacy-named `tier` column
    /// (selected as `tier AS interval`).
    pub interval: String,
    /// Subscription status string. This module writes `'active'` on
    /// insert/reactivation and treats `'canceled'` as terminal; any other
    /// value is whatever was passed to `update_app_sync_subscription_status`.
    pub status: String,
    /// Storage cap currently in force, in bytes. `None` if the column is NULL.
    pub storage_limit_bytes: Option<i64>,
    /// Cap queued by `set_pending_storage_cap` and promoted by
    /// `apply_pending_storage_cap`; `None` when nothing is queued.
    pub pending_storage_limit_bytes: Option<i64>,
    /// End of the current billing period as last recorded on the row.
    /// `None` if the column is NULL.
    pub current_period_end: Option<DateTime<Utc>>,
}
30
31 /// Look up the active subscription a user has on an app, if any.
32 /// Returns `None` if the user has never subscribed or the subscription was deleted.
33 #[tracing::instrument(skip_all)]
34 pub async fn get_user_app_subscription(
35 pool: &PgPool,
36 user_id: UserId,
37 app_id: SyncAppId,
38 ) -> Result<Option<DbAppSyncSubscription>> {
39 let row = sqlx::query_as::<_, DbAppSyncSubscription>(
40 r#"
41 SELECT stripe_subscription_id,
42 tier AS interval,
43 status,
44 storage_limit_bytes,
45 pending_storage_limit_bytes,
46 current_period_end
47 FROM app_sync_subscriptions
48 WHERE user_id = $1 AND app_id = $2
49 "#,
50 )
51 .bind(user_id)
52 .bind(app_id)
53 .fetch_optional(pool)
54 .await?;
55 Ok(row)
56 }
57
58 /// Insert or reactivate an app sync subscription. Returns `Ok(true)` if a row
59 /// was inserted or reactivated, `Ok(false)` if an identical active subscription
60 /// already existed for this (user, app) pair (idempotent webhook replay).
61 ///
62 /// ON CONFLICT DO UPDATE (not DO NOTHING) so a paid re-subscribe after a
63 /// cancellation reactivates the row AT CHECKOUT, deterministically — the old
64 /// DO NOTHING left a re-subscribed user `canceled` until a later
65 /// `customer.subscription.updated`(active) happened to arrive (Run #12 MINOR).
66 /// The guard WHERE makes a duplicate checkout webhook for an unchanged active
67 /// row a no-op, mirroring `creator_tiers::create_creator_subscription`. Pairing
68 /// this with the terminal guard on `update_app_sync_subscription_status` keeps
69 /// reactivation on the checkout path while the status-update webhooks can't
70 /// revive a canceled row.
71 #[tracing::instrument(skip_all)]
72 pub async fn create_app_sync_subscription(
73 pool: &PgPool,
74 sub: &NewAppSyncSubscription<'_>,
75 ) -> Result<bool> {
76 let result = sqlx::query(
77 r#"
78 INSERT INTO app_sync_subscriptions
79 (user_id, app_id, stripe_subscription_id, stripe_customer_id,
80 tier, status, storage_limit_bytes)
81 VALUES ($1, $2, $3, $4, $5, 'active', $6)
82 ON CONFLICT (user_id, app_id) DO UPDATE
83 SET stripe_subscription_id = EXCLUDED.stripe_subscription_id,
84 stripe_customer_id = EXCLUDED.stripe_customer_id,
85 tier = EXCLUDED.tier,
86 storage_limit_bytes = EXCLUDED.storage_limit_bytes,
87 status = 'active',
88 canceled_at = NULL
89 WHERE app_sync_subscriptions.stripe_subscription_id != EXCLUDED.stripe_subscription_id
90 OR app_sync_subscriptions.status != 'active'
91 "#,
92 )
93 .bind(sub.user_id)
94 .bind(sub.app_id)
95 .bind(sub.stripe_subscription_id)
96 .bind(sub.stripe_customer_id)
97 .bind(sub.interval)
98 .bind(sub.storage_limit_bytes)
99 .execute(pool)
100 .await?;
101 Ok(result.rows_affected() > 0)
102 }
103
104 /// Update the status of an existing app sync subscription (e.g. on
105 /// `customer.subscription.updated` or `.deleted` webhook events).
106 ///
107 /// `canceled` is terminal: the `AND (status != 'canceled' OR $2 = 'canceled')`
108 /// guard refuses to revive a canceled app-sub via an out-of-order webhook;
109 /// reactivation happens at checkout through `create_app_sync_subscription`'s
110 /// DO UPDATE path. Consistent with every other subscription family's setter.
111 #[tracing::instrument(skip_all)]
112 pub async fn update_app_sync_subscription_status(
113 pool: &PgPool,
114 stripe_subscription_id: &str,
115 status: &str,
116 current_period_end: Option<DateTime<Utc>>,
117 ) -> Result<()> {
118 sqlx::query(
119 r#"
120 UPDATE app_sync_subscriptions
121 SET status = $2,
122 current_period_end = COALESCE($3, current_period_end),
123 canceled_at = CASE WHEN $2 = 'canceled' THEN NOW() ELSE canceled_at END
124 WHERE stripe_subscription_id = $1
125 AND (status != 'canceled' OR $2 = 'canceled')
126 "#,
127 )
128 .bind(stripe_subscription_id)
129 .bind(status)
130 .bind(current_period_end)
131 .execute(pool)
132 .await?;
133 Ok(())
134 }
135
136 /// Look up an app sync subscription by its Stripe subscription ID. Used by
137 /// webhook handlers to find the row to update.
138 #[tracing::instrument(skip_all)]
139 pub async fn get_subscription_by_stripe_id(
140 pool: &PgPool,
141 stripe_subscription_id: &str,
142 ) -> Result<Option<(UserId, SyncAppId)>> {
143 let row: Option<(UserId, SyncAppId)> = sqlx::query_as(
144 r#"
145 SELECT user_id, app_id
146 FROM app_sync_subscriptions
147 WHERE stripe_subscription_id = $1
148 "#,
149 )
150 .bind(stripe_subscription_id)
151 .fetch_optional(pool)
152 .await?;
153 Ok(row)
154 }
155
156 /// Queue a storage-cap change to apply at the next billing cycle. Stores the
157 /// new cap in `pending_storage_limit_bytes`; the renewal webhook handler
158 /// promotes it to `storage_limit_bytes` once Stripe confirms the period roll.
159 #[tracing::instrument(skip_all)]
160 pub async fn set_pending_storage_cap(
161 pool: &PgPool,
162 user_id: UserId,
163 app_id: SyncAppId,
164 pending_bytes: i64,
165 ) -> Result<()> {
166 sqlx::query(
167 r#"
168 UPDATE app_sync_subscriptions
169 SET pending_storage_limit_bytes = $3
170 WHERE user_id = $1 AND app_id = $2
171 "#,
172 )
173 .bind(user_id)
174 .bind(app_id)
175 .bind(pending_bytes)
176 .execute(pool)
177 .await?;
178 Ok(())
179 }
180
181 /// Promote a queued cap change to the active cap. Called from the renewal
182 /// webhook handler when Stripe rolls the subscription to a new period.
183 /// No-op if no pending change is queued.
184 #[tracing::instrument(skip_all)]
185 pub async fn apply_pending_storage_cap(
186 pool: &PgPool,
187 stripe_subscription_id: &str,
188 ) -> Result<()> {
189 sqlx::query(
190 r#"
191 UPDATE app_sync_subscriptions
192 SET storage_limit_bytes = pending_storage_limit_bytes,
193 pending_storage_limit_bytes = NULL
194 WHERE stripe_subscription_id = $1
195 AND pending_storage_limit_bytes IS NOT NULL
196 "#,
197 )
198 .bind(stripe_subscription_id)
199 .execute(pool)
200 .await?;
201 Ok(())
202 }
203