Skip to main content

max / makenotwork

33.1 KB · 1014 lines History Blame Raw
1 //! SyncKit v2 developer-billing queries.
2 //!
3 //! Operates on the billing columns added to `sync_apps` and the auxiliary
4 //! `sync_app_usage_current` table by migration 117 (`117_synckit_v2_billing.sql`).
5 //!
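//! Schema reference as of migration 117 (kept here for offline-mode reviewers,
//! since the migration was not yet applied at the time of writing). This is a
//! snapshot: the queries in this module also use a `gb_per_key` column and a
//! `'bulk'` enforcement mode that are not listed below, so verify against the
//! current migrations before relying on it.
//!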
9 //! ```text
10 //! sync_apps:
11 //! is_internal BOOLEAN NOT NULL DEFAULT FALSE
12 //! stripe_customer_id TEXT
13 //! stripe_subscription_id TEXT UNIQUE
14 //! billing_status TEXT NOT NULL CHECK IN
15 //! ('draft','active','suspended_unpaid','canceled')
16 //! storage_gb_cap INT
17 //! egress_multiple NUMERIC(6,2)
18 //! enforcement_mode TEXT NOT NULL CHECK IN ('per_key','app_wide')
19 //! key_cap INT
20 //! current_period_start TIMESTAMPTZ
21 //! current_period_end TIMESTAMPTZ
22 //!
23 //! sync_app_usage_current (one row per app):
24 //! app_id, bytes_stored, bytes_egress_period, keys_claimed,
25 //! last_warning_pct, period_started_at, updated_at
26 //! ```
27
28 use chrono::{DateTime, Utc};
29 use sqlx::PgPool;
30
31 use super::id_types::SyncAppId;
32 use super::models::{DbSyncAppBilling, DbSyncAppKey};
33 use crate::error::Result;
34
/// What a `claim_key` call did, plus the resulting claim count.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ClaimResult {
    /// `true` when this call inserted a new active claim row; `false` when
    /// the key already had an active claim (idempotent re-claim).
    pub newly_claimed: bool,
    /// The app's active-claim counter after the operation.
    pub total_claimed: i32,
}
44
/// What a `release_key` call did, plus the resulting claim count.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReleaseResult {
    /// `true` when this call moved an active row to released; `false` when
    /// there was no active row to release (idempotent release).
    pub newly_released: bool,
    /// The app's active-claim counter after the operation.
    pub total_claimed: i32,
}
54
55 /// Set the Stripe customer ID on a sync app (idempotent; re-sets the same id
56 /// without error). Called by the billing setup route once the developer-side
57 /// Customer object has been created in Stripe.
58 #[tracing::instrument(skip_all)]
59 pub async fn set_stripe_customer(
60 pool: &PgPool,
61 app_id: SyncAppId,
62 stripe_customer_id: &str,
63 ) -> Result<()> {
64 sqlx::query("UPDATE sync_apps SET stripe_customer_id = $2 WHERE id = $1")
65 .bind(app_id)
66 .bind(stripe_customer_id)
67 .execute(pool)
68 .await?;
69 Ok(())
70 }
71
72 /// Activate billing on a draft app: stamps subscription id, knobs, status,
73 /// and current period. The `egress_multiple` is passed in as `f64` and cast
74 /// to the column's `NUMERIC(6,2)` type in SQL.
75 #[tracing::instrument(skip_all)]
76 #[allow(clippy::too_many_arguments)]
77 pub async fn activate_billing(
78 pool: &PgPool,
79 app_id: SyncAppId,
80 enforcement_mode: &str,
81 storage_gb_cap: Option<i32>,
82 key_cap: Option<i32>,
83 gb_per_key: Option<i32>,
84 stripe_sub_id: &str,
85 period_start: DateTime<Utc>,
86 period_end: DateTime<Utc>,
87 ) -> Result<()> {
88 sqlx::query(
89 r#"
90 UPDATE sync_apps SET
91 billing_status = 'active',
92 stripe_subscription_id = $2,
93 enforcement_mode = $3,
94 storage_gb_cap = $4,
95 key_cap = $5,
96 gb_per_key = $6,
97 current_period_start = $7,
98 current_period_end = $8
99 WHERE id = $1
100 "#,
101 )
102 .bind(app_id)
103 .bind(stripe_sub_id)
104 .bind(enforcement_mode)
105 .bind(storage_gb_cap)
106 .bind(key_cap)
107 .bind(gb_per_key)
108 .bind(period_start)
109 .bind(period_end)
110 .execute(pool)
111 .await?;
112 Ok(())
113 }
114
115 /// Update the pricing knobs on an already-active app (no status change).
116 #[tracing::instrument(skip_all)]
117 pub async fn update_knobs(
118 pool: &PgPool,
119 app_id: SyncAppId,
120 enforcement_mode: &str,
121 storage_gb_cap: Option<i32>,
122 key_cap: Option<i32>,
123 gb_per_key: Option<i32>,
124 ) -> Result<()> {
125 sqlx::query(
126 r#"
127 UPDATE sync_apps SET
128 enforcement_mode = $2,
129 storage_gb_cap = $3,
130 key_cap = $4,
131 gb_per_key = $5
132 WHERE id = $1
133 "#,
134 )
135 .bind(app_id)
136 .bind(enforcement_mode)
137 .bind(storage_gb_cap)
138 .bind(key_cap)
139 .bind(gb_per_key)
140 .execute(pool)
141 .await?;
142 Ok(())
143 }
144
145 /// Apply a Stripe-driven billing update to a sync app: optionally change
146 /// `billing_status` and/or refresh the current period, in ONE guarded statement.
147 /// Returns `true` if a live (non-canceled) row was updated.
148 ///
149 /// `canceled` is terminal: the `AND (billing_status != 'canceled' OR $2 = 'canceled')`
150 /// guard refuses to move a canceled app back to active/suspended_unpaid — AND,
151 /// because the period columns are written here under the same guard, a stray
152 /// `invoice.paid` after a `deleted` can no longer refresh the period (nor, via
153 /// the returned `false`, reset usage) on a canceled app. This replaces the old
154 /// split `set_billing_status` (guarded) + `set_period` (UNGUARDED) — the synckit
155 /// instance of the period-without-guard bug class. The legitimate
156 /// `suspended_unpaid -> active` recovery on `invoice.paid` is unaffected
157 /// (suspended_unpaid != canceled). Initial activation from `draft` goes through
158 /// `activate_billing`, not this path. See `crate::db::subscription_writer` for
159 /// the cross-family rationale.
160 #[tracing::instrument(skip_all)]
161 pub async fn apply_billing_update<'e>(
162 executor: impl sqlx::PgExecutor<'e>,
163 app_id: SyncAppId,
164 status: Option<&str>,
165 period: Option<(DateTime<Utc>, DateTime<Utc>)>,
166 ) -> Result<bool> {
167 let (period_start, period_end) = match period {
168 Some((start, end)) => (Some(start), Some(end)),
169 None => (None, None),
170 };
171 let result = sqlx::query(
172 r#"
173 UPDATE sync_apps SET
174 billing_status = COALESCE($2, billing_status),
175 current_period_start = COALESCE($3, current_period_start),
176 current_period_end = COALESCE($4, current_period_end)
177 WHERE id = $1
178 AND (billing_status != 'canceled' OR $2 = 'canceled')
179 "#,
180 )
181 .bind(app_id)
182 .bind(status)
183 .bind(period_start)
184 .bind(period_end)
185 .execute(executor)
186 .await?;
187 Ok(result.rows_affected() > 0)
188 }
189
190 /// Reset the per-period usage counters on `sync_app_usage_current`. Called
191 /// from the `invoice.paid` webhook handler at period rollover.
192 #[tracing::instrument(skip_all)]
193 pub async fn reset_period_usage<'e>(
194 executor: impl sqlx::PgExecutor<'e>,
195 app_id: SyncAppId,
196 ) -> Result<()> {
197 sqlx::query(
198 r#"
199 UPDATE sync_app_usage_current SET
200 bytes_egress_period = 0,
201 last_warning_pct = 0,
202 period_started_at = NOW(),
203 updated_at = NOW()
204 WHERE app_id = $1
205 "#,
206 )
207 .bind(app_id)
208 .execute(executor)
209 .await?;
210 Ok(())
211 }
212
213 /// Look up the sync app that owns a given Stripe subscription. Used by the
214 /// webhook router to distinguish SyncKit v2 subscriptions from
215 /// creator-tier / Fan+ subscriptions.
216 #[tracing::instrument(skip_all)]
217 pub async fn get_app_by_stripe_subscription(
218 pool: &PgPool,
219 stripe_sub_id: &str,
220 ) -> Result<Option<SyncAppId>> {
221 let row: Option<(SyncAppId,)> = sqlx::query_as(
222 "SELECT id FROM sync_apps WHERE stripe_subscription_id = $1",
223 )
224 .bind(stripe_sub_id)
225 .fetch_optional(pool)
226 .await?;
227 Ok(row.map(|(id,)| id))
228 }
229
230 /// Fetch the combined app+billing+usage view for a single app. `egress_multiple`
231 /// is cast from NUMERIC to DOUBLE PRECISION so it decodes into `f64` without
232 /// the `bigdecimal` sqlx feature. `sync_app_usage_current` is LEFT-joined; /// the row is normally inserted on app create (see migration 117), but
233 /// `Option` is used so a missing row doesn't blow up the query.
234 #[tracing::instrument(skip_all)]
235 pub async fn get_app_with_billing(
236 pool: &PgPool,
237 app_id: SyncAppId,
238 ) -> Result<Option<DbSyncAppBilling>> {
239 let app = sqlx::query_as::<_, DbSyncAppBilling>(
240 r#"
241 SELECT
242 sa.id,
243 sa.creator_id,
244 sa.name,
245 sa.is_internal,
246 sa.stripe_customer_id,
247 sa.stripe_subscription_id,
248 sa.billing_status,
249 sa.storage_gb_cap,
250 sa.enforcement_mode,
251 sa.key_cap,
252 sa.gb_per_key,
253 sa.current_period_start,
254 sa.current_period_end,
255 u.bytes_stored,
256 u.bytes_egress_period,
257 u.keys_claimed,
258 u.last_warning_pct,
259 u.period_started_at,
260 p.slug AS project_slug
261 FROM sync_apps sa
262 LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
263 LEFT JOIN projects p ON p.id = sa.project_id
264 WHERE sa.id = $1
265 "#,
266 )
267 .bind(app_id)
268 .fetch_optional(pool)
269 .await?;
270 Ok(app)
271 }
272
273 /// Batch variant of `get_app_with_billing` that loads every app owned by a
274 /// creator with its billing+usage join in one query. Used by the user-level
275 /// SyncKit dashboard tab.
276 #[tracing::instrument(skip_all)]
277 pub async fn get_apps_with_billing_by_creator(
278 pool: &PgPool,
279 creator_id: super::id_types::UserId,
280 ) -> Result<Vec<DbSyncAppBilling>> {
281 let apps = sqlx::query_as::<_, DbSyncAppBilling>(
282 r#"
283 SELECT
284 sa.id,
285 sa.creator_id,
286 sa.name,
287 sa.is_internal,
288 sa.stripe_customer_id,
289 sa.stripe_subscription_id,
290 sa.billing_status,
291 sa.storage_gb_cap,
292 sa.enforcement_mode,
293 sa.key_cap,
294 sa.gb_per_key,
295 sa.current_period_start,
296 sa.current_period_end,
297 u.bytes_stored,
298 u.bytes_egress_period,
299 u.keys_claimed,
300 u.last_warning_pct,
301 u.period_started_at,
302 p.slug AS project_slug
303 FROM sync_apps sa
304 LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
305 LEFT JOIN projects p ON p.id = sa.project_id
306 WHERE sa.creator_id = $1
307 "#,
308 )
309 .bind(creator_id)
310 .fetch_all(pool)
311 .await?;
312 Ok(apps)
313 }
314
315 /// Batch variant of `get_app_with_billing` for one project. Used by the
316 /// project-level SyncKit dashboard tab.
317 #[tracing::instrument(skip_all)]
318 pub async fn get_apps_with_billing_by_project(
319 pool: &PgPool,
320 project_id: super::id_types::ProjectId,
321 ) -> Result<Vec<DbSyncAppBilling>> {
322 let apps = sqlx::query_as::<_, DbSyncAppBilling>(
323 r#"
324 SELECT
325 sa.id,
326 sa.creator_id,
327 sa.name,
328 sa.is_internal,
329 sa.stripe_customer_id,
330 sa.stripe_subscription_id,
331 sa.billing_status,
332 sa.storage_gb_cap,
333 sa.enforcement_mode,
334 sa.key_cap,
335 sa.gb_per_key,
336 sa.current_period_start,
337 sa.current_period_end,
338 u.bytes_stored,
339 u.bytes_egress_period,
340 u.keys_claimed,
341 u.last_warning_pct,
342 u.period_started_at,
343 p.slug AS project_slug
344 FROM sync_apps sa
345 LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
346 LEFT JOIN projects p ON p.id = sa.project_id
347 WHERE sa.project_id = $1
348 "#,
349 )
350 .bind(project_id)
351 .fetch_all(pool)
352 .await?;
353 Ok(apps)
354 }
355
356 /// Claim an SDK encryption key for an app. Idempotent: a re-claim of an
357 /// already-active key returns `newly_claimed = false` without inserting.
358 ///
359 /// The transaction locks `sync_app_usage_current` for this app first, so the
360 /// route handler can read `keys_claimed` and decide on the cap before this
361 /// runs without races. (The handler does its check pre-transaction; this
362 /// function re-checks idempotency inside the lock.)
363 #[tracing::instrument(skip_all)]
364 pub async fn claim_key(
365 pool: &sqlx::PgPool,
366 app_id: SyncAppId,
367 key: &str,
368 ) -> Result<ClaimResult> {
369 let mut tx = pool.begin().await?;
370
371 // Lock the usage row for this app. Returns the current keys_claimed.
372 let (mut keys_claimed,): (i32,) = sqlx::query_as(
373 "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE",
374 )
375 .bind(app_id)
376 .fetch_one(&mut *tx)
377 .await?;
378
379 // Is there already an active claim for this key?
380 let existing: Option<(uuid::Uuid,)> = sqlx::query_as(
381 "SELECT id FROM sync_app_keys
382 WHERE app_id = $1 AND key = $2 AND released_at IS NULL",
383 )
384 .bind(app_id)
385 .bind(key)
386 .fetch_optional(&mut *tx)
387 .await?;
388
389 let newly_claimed = if existing.is_some() {
390 false
391 } else {
392 sqlx::query(
393 "INSERT INTO sync_app_keys (app_id, key) VALUES ($1, $2)",
394 )
395 .bind(app_id)
396 .bind(key)
397 .execute(&mut *tx)
398 .await?;
399
400 sqlx::query(
401 "UPDATE sync_app_usage_current
402 SET keys_claimed = keys_claimed + 1, updated_at = NOW()
403 WHERE app_id = $1",
404 )
405 .bind(app_id)
406 .execute(&mut *tx)
407 .await?;
408
409 keys_claimed += 1;
410 true
411 };
412
413 tx.commit().await?;
414 Ok(ClaimResult {
415 newly_claimed,
416 total_claimed: keys_claimed,
417 })
418 }
419
420 /// Release an SDK encryption key. Idempotent: releasing a key that is not
421 /// actively claimed returns `newly_released = false`.
422 #[tracing::instrument(skip_all)]
423 pub async fn release_key(
424 pool: &sqlx::PgPool,
425 app_id: SyncAppId,
426 key: &str,
427 ) -> Result<ReleaseResult> {
428 let mut tx = pool.begin().await?;
429
430 let (mut keys_claimed,): (i32,) = sqlx::query_as(
431 "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE",
432 )
433 .bind(app_id)
434 .fetch_one(&mut *tx)
435 .await?;
436
437 let released: Option<(uuid::Uuid,)> = sqlx::query_as(
438 "UPDATE sync_app_keys SET released_at = NOW()
439 WHERE app_id = $1 AND key = $2 AND released_at IS NULL
440 RETURNING id",
441 )
442 .bind(app_id)
443 .bind(key)
444 .fetch_optional(&mut *tx)
445 .await?;
446
447 let newly_released = if released.is_some() {
448 sqlx::query(
449 "UPDATE sync_app_usage_current
450 SET keys_claimed = GREATEST(keys_claimed - 1, 0), updated_at = NOW()
451 WHERE app_id = $1",
452 )
453 .bind(app_id)
454 .execute(&mut *tx)
455 .await?;
456 keys_claimed = (keys_claimed - 1).max(0);
457 true
458 } else {
459 false
460 };
461
462 tx.commit().await?;
463 Ok(ReleaseResult {
464 newly_released,
465 total_claimed: keys_claimed,
466 })
467 }
468
469 /// List active (un-released) key claims for an app, ordered by `claimed_at DESC`.
470 /// Used by the dashboard "Active keys" view. `bytes_stored` is LEFT-joined
471 /// from `sync_key_usage_current` — `0` when no upload has landed yet.
472 #[tracing::instrument(skip_all)]
473 pub async fn list_active_keys(
474 pool: &sqlx::PgPool,
475 app_id: SyncAppId,
476 limit: i64,
477 offset: i64,
478 ) -> Result<Vec<DbSyncAppKey>> {
479 let rows = sqlx::query_as::<_, DbSyncAppKey>(
480 r#"
481 SELECT k.id, k.key, k.claimed_at,
482 COALESCE(u.bytes_stored, 0) AS bytes_stored
483 FROM sync_app_keys k
484 LEFT JOIN sync_key_usage_current u
485 ON u.app_id = k.app_id AND u.key = k.key
486 WHERE k.app_id = $1 AND k.released_at IS NULL
487 ORDER BY k.claimed_at DESC
488 LIMIT $2 OFFSET $3
489 "#,
490 )
491 .bind(app_id)
492 .bind(limit)
493 .bind(offset)
494 .fetch_all(pool)
495 .await?;
496 Ok(rows)
497 }
498
499 /// Per-app top-N key usage for the dashboard panel. Returns the highest-usage
500 /// active keys (sorted by `bytes_stored DESC`) for each app in `app_ids`,
501 /// batched into a single query so the integrations page doesn't N+1 across
502 /// every per_key-mode app.
503 #[tracing::instrument(skip_all)]
504 pub async fn get_top_keys_per_app(
505 pool: &PgPool,
506 app_ids: &[SyncAppId],
507 limit_per_app: i64,
508 ) -> Result<Vec<(SyncAppId, String, i64)>> {
509 if app_ids.is_empty() {
510 return Ok(Vec::new());
511 }
512 let rows: Vec<(SyncAppId, String, i64)> = sqlx::query_as(
513 r#"
514 SELECT app_id, key, bytes_stored
515 FROM (
516 SELECT u.app_id, u.key, u.bytes_stored,
517 ROW_NUMBER() OVER (PARTITION BY u.app_id ORDER BY u.bytes_stored DESC, u.key) AS rn
518 FROM sync_key_usage_current u
519 JOIN sync_app_keys k
520 ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL
521 WHERE u.app_id = ANY($1)
522 ) ranked
523 WHERE rn <= $2
524 ORDER BY app_id, rn
525 "#,
526 )
527 .bind(app_ids)
528 .bind(limit_per_app)
529 .fetch_all(pool)
530 .await?;
531 Ok(rows)
532 }
533
534 // ── Phase 5: Usage counters and cap enforcement ──
535
/// Describes a cap that a would-be write would breach; returned by the
/// `would_exceed_*` checks.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExceededLimit {
    /// Which cap was hit: `"storage"`, `"storage_per_key"`, or `"egress"`.
    /// (`"billing"` is reserved for inactive-billing failure paths so a single
    /// 402 response shape can carry every reason; the caller decides whether
    /// to thread that through here or check `billing_status` separately.)
    pub dimension: &'static str,
    /// Usage in bytes at check time, i.e. before the would-be addition.
    pub used: i64,
    /// The configured cap, in bytes.
    pub limit: i64,
    /// The SDK key that hit its cap when `dimension == "storage_per_key"`;
    /// `None` for app-wide dimensions.
    pub key: Option<String>,
}
552
553 /// A row that may need a warning email sent.
554 ///
555 /// `threshold_pct` is the highest WARNING_THRESHOLDS_PCT band currently
556 /// breached above `last_warning_pct`. The caller is responsible for filtering
557 /// out apps where no new breach has occurred (i.e. usage hasn't crossed the
558 /// next threshold since the previous warning).
559 #[derive(Debug, Clone, PartialEq, Eq)]
560 pub struct WarningCandidate {
561 pub app_id: SyncAppId,
562 pub creator_id: super::id_types::UserId,
563 pub creator_email: String,
564 pub app_name: String,
565 pub threshold_pct: i16,
566 pub dimension: &'static str,
567 pub used: i64,
568 pub limit: i64,
569 /// SDK key that breached the threshold, when `dimension ==
570 /// "storage_per_key"`. `None` for app-wide breaches.
571 pub key: Option<String>,
572 }
573
574 /// Increment (or decrement, for negative `delta`) the rolling `bytes_stored`
575 /// counter on `sync_app_usage_current` and the per-key row on
576 /// `sync_key_usage_current` (upserted if missing). Returns the new app-level
577 /// total.
578 ///
579 /// NOTE: read-then-add elsewhere in the request handler is racy. Acceptable
580 /// overshoot under concurrent uploads in v1.
581 #[tracing::instrument(skip_all)]
582 pub async fn add_bytes_stored(
583 pool: &PgPool,
584 app_id: SyncAppId,
585 key: &str,
586 delta: i64,
587 ) -> Result<i64> {
588 let mut tx = pool.begin().await?;
589
590 let (new_val,): (i64,) = sqlx::query_as(
591 r#"
592 UPDATE sync_app_usage_current
593 SET bytes_stored = GREATEST(bytes_stored + $2, 0), updated_at = NOW()
594 WHERE app_id = $1
595 RETURNING bytes_stored
596 "#,
597 )
598 .bind(app_id)
599 .bind(delta)
600 .fetch_one(&mut *tx)
601 .await?;
602
603 sqlx::query(
604 r#"
605 INSERT INTO sync_key_usage_current (app_id, key, bytes_stored)
606 VALUES ($1, $2, GREATEST($3, 0))
607 ON CONFLICT (app_id, key)
608 DO UPDATE SET
609 bytes_stored = GREATEST(sync_key_usage_current.bytes_stored + $3, 0),
610 updated_at = NOW()
611 "#,
612 )
613 .bind(app_id)
614 .bind(key)
615 .bind(delta)
616 .execute(&mut *tx)
617 .await?;
618
619 tx.commit().await?;
620 Ok(new_val)
621 }
622
623 /// Increment (or decrement) the per-period `bytes_egress_period` counter.
624 /// Returns the new value.
625 #[tracing::instrument(skip_all)]
626 pub async fn add_bytes_egress(
627 pool: &PgPool,
628 app_id: SyncAppId,
629 delta: i64,
630 ) -> Result<i64> {
631 let (new_val,): (i64,) = sqlx::query_as(
632 r#"
633 UPDATE sync_app_usage_current
634 SET bytes_egress_period = GREATEST(bytes_egress_period + $2, 0), updated_at = NOW()
635 WHERE app_id = $1
636 RETURNING bytes_egress_period
637 "#,
638 )
639 .bind(app_id)
640 .bind(delta)
641 .fetch_one(pool)
642 .await?;
643 Ok(new_val)
644 }
645
646 /// Check whether storing `additional_bytes` more would exceed the storage cap.
647 ///
648 /// Returns `None` if no cap applies (internal apps, billing not active, or
649 /// the cap is unset). Returns `Some(ExceededLimit)` only when adding
650 /// `additional_bytes` would push usage past the cap.
651 ///
652 /// In `per_key` mode the per-key counter is checked first against
653 /// `gb_per_key × 1GB` — if exceeded, returns `dimension: "storage_per_key"`
654 /// with the offending `key`. The app-aggregate (`key_cap × gb_per_key`) is
655 /// also checked as a defensive hard ceiling; under normal operation this
656 /// can't trip before some key has tripped, but it guards against drift.
657 ///
658 /// In `bulk` mode `key` is unused (still passed in for call-site uniformity)
659 /// and the app-wide `storage_gb_cap` is the only check.
660 ///
661 /// Race-condition note: this reads, the caller adds. Concurrent uploads can
662 /// produce small overshoots. Acceptable for v1.
663 /// Row for the storage-cap lookup:
664 /// (is_internal, enforcement_mode, storage_gb_cap, key_cap, gb_per_key, used_bytes).
665 type AppStorageCapRow = (bool, String, Option<i32>, Option<i32>, Option<i32>, Option<i64>);
666
667 #[tracing::instrument(skip_all)]
668 pub async fn would_exceed_storage(
669 pool: &PgPool,
670 app_id: SyncAppId,
671 key: &str,
672 additional_bytes: i64,
673 ) -> Result<Option<ExceededLimit>> {
674 let row: Option<AppStorageCapRow> = sqlx::query_as(
675 r#"
676 SELECT sa.is_internal, sa.enforcement_mode,
677 sa.storage_gb_cap, sa.key_cap, sa.gb_per_key,
678 u.bytes_stored
679 FROM sync_apps sa
680 LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
681 WHERE sa.id = $1
682 "#,
683 )
684 .bind(app_id)
685 .fetch_optional(pool)
686 .await?;
687
688 let Some((is_internal, mode, storage_gb, key_cap, gb_per_key, bytes_stored)) = row else { return Ok(None); };
689 if is_internal { return Ok(None); }
690
691 let app_used = bytes_stored.unwrap_or(0);
692
693 match mode.as_str() {
694 "bulk" => {
695 let Some(gb) = storage_gb else { return Ok(None) };
696 let limit = crate::synckit_billing::storage_cap_bytes(gb as u32);
697 if app_used.saturating_add(additional_bytes) > limit {
698 return Ok(Some(ExceededLimit {
699 dimension: "storage",
700 used: app_used,
701 limit,
702 key: None,
703 }));
704 }
705 }
706 "per_key" => {
707 let (Some(k), Some(g)) = (key_cap, gb_per_key) else { return Ok(None) };
708 let per_key_limit = crate::synckit_billing::storage_cap_bytes(g as u32);
709 let app_limit = crate::synckit_billing::storage_cap_bytes(k.saturating_mul(g) as u32);
710
711 let key_used: Option<i64> = sqlx::query_scalar(
712 "SELECT bytes_stored FROM sync_key_usage_current
713 WHERE app_id = $1 AND key = $2",
714 )
715 .bind(app_id)
716 .bind(key)
717 .fetch_optional(pool)
718 .await?;
719 let key_used = key_used.unwrap_or(0);
720
721 if key_used.saturating_add(additional_bytes) > per_key_limit {
722 return Ok(Some(ExceededLimit {
723 dimension: "storage_per_key",
724 used: key_used,
725 limit: per_key_limit,
726 key: Some(key.to_string()),
727 }));
728 }
729 // Defensive app-aggregate ceiling. Tripping this before the per-key
730 // check means the per-key counters have drifted under the app
731 // counter — refuse rather than admit silent over-allocation.
732 if app_used.saturating_add(additional_bytes) > app_limit {
733 return Ok(Some(ExceededLimit {
734 dimension: "storage",
735 used: app_used,
736 limit: app_limit,
737 key: None,
738 }));
739 }
740 }
741 _ => return Ok(None),
742 }
743
744 Ok(None)
745 }
746
747 /// Fetch the list of active, non-internal apps that may need a warning email,
748 /// along with the data needed to compute which threshold (if any) has been
749 /// breached since the last notice. The per-app breach computation is done in
750 /// Rust by the caller (see `scheduler::synckit_warnings`).
751 #[tracing::instrument(skip_all)]
752 pub async fn get_apps_needing_warning(pool: &PgPool) -> Result<Vec<WarningCandidate>> {
753 use super::id_types::UserId;
754
755 // Egress is no longer a price input or an enforced cap (see migration 118),
756 // so the only dimension that warrants a usage warning is storage. The
757 // effective storage cap depends on enforcement_mode.
758 #[derive(sqlx::FromRow)]
759 struct Row {
760 app_id: SyncAppId,
761 creator_id: UserId,
762 creator_email: String,
763 app_name: String,
764 enforcement_mode: String,
765 storage_gb_cap: Option<i32>,
766 key_cap: Option<i32>,
767 gb_per_key: Option<i32>,
768 bytes_stored: i64,
769 last_warning_pct: i16,
770 }
771
772 let rows = sqlx::query_as::<_, Row>(
773 r#"
774 SELECT
775 sa.id AS app_id,
776 sa.creator_id AS creator_id,
777 u_user.email AS creator_email,
778 sa.name AS app_name,
779 sa.enforcement_mode AS enforcement_mode,
780 sa.storage_gb_cap AS storage_gb_cap,
781 sa.key_cap AS key_cap,
782 sa.gb_per_key AS gb_per_key,
783 COALESCE(u.bytes_stored, 0) AS bytes_stored,
784 COALESCE(u.last_warning_pct, 0::smallint) AS last_warning_pct
785 FROM sync_apps sa
786 JOIN users u_user ON u_user.id = sa.creator_id
787 LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
788 WHERE sa.billing_status = 'active'
789 AND sa.is_internal = FALSE
790 "#,
791 )
792 .fetch_all(pool)
793 .await?;
794
795 let mut out = Vec::new();
796 let mut per_key_apps: Vec<(SyncAppId, UserId, String, String, i32)> = Vec::new();
797 for r in rows {
798 match r.enforcement_mode.as_str() {
799 "bulk" => {
800 let Some(gb) = r.storage_gb_cap else { continue };
801 let limit = crate::synckit_billing::storage_cap_bytes(gb as u32);
802 if let Some(pct) =
803 highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct)
804 {
805 out.push(WarningCandidate {
806 app_id: r.app_id,
807 creator_id: r.creator_id,
808 creator_email: r.creator_email,
809 app_name: r.app_name,
810 threshold_pct: pct,
811 dimension: "storage",
812 used: r.bytes_stored,
813 limit,
814 key: None,
815 });
816 }
817 }
818 "per_key" => {
819 // Defer to a second query that fans out per (app, key); the
820 // per-key counter — not the app aggregate — is what we warn on.
821 let (Some(_), Some(g)) = (r.key_cap, r.gb_per_key) else { continue };
822 per_key_apps.push((r.app_id, r.creator_id, r.creator_email, r.app_name, g));
823 }
824 _ => {}
825 }
826 }
827
828 if !per_key_apps.is_empty() {
829 #[derive(sqlx::FromRow)]
830 struct KeyRow {
831 app_id: SyncAppId,
832 key: String,
833 bytes_stored: i64,
834 last_warning_pct: i16,
835 }
836 let app_ids: Vec<SyncAppId> = per_key_apps.iter().map(|t| t.0).collect();
837 let key_rows = sqlx::query_as::<_, KeyRow>(
838 r#"
839 SELECT u.app_id, u.key, u.bytes_stored, u.last_warning_pct
840 FROM sync_key_usage_current u
841 JOIN sync_app_keys k
842 ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL
843 WHERE u.app_id = ANY($1)
844 "#,
845 )
846 .bind(&app_ids)
847 .fetch_all(pool)
848 .await?;
849
850 // Index app metadata by id for cheap join.
851 let meta: std::collections::HashMap<SyncAppId, (UserId, String, String, i32)> =
852 per_key_apps
853 .into_iter()
854 .map(|(a, c, e, n, g)| (a, (c, e, n, g)))
855 .collect();
856
857 for r in key_rows {
858 let Some((creator_id, creator_email, app_name, gb_per_key)) =
859 meta.get(&r.app_id).cloned()
860 else {
861 continue;
862 };
863 let limit = crate::synckit_billing::storage_cap_bytes(gb_per_key as u32);
864 if let Some(pct) =
865 highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct)
866 {
867 out.push(WarningCandidate {
868 app_id: r.app_id,
869 creator_id,
870 creator_email,
871 app_name,
872 threshold_pct: pct,
873 dimension: "storage_per_key",
874 used: r.bytes_stored,
875 limit,
876 key: Some(r.key),
877 });
878 }
879 }
880 }
881
882 Ok(out)
883 }
884
885 /// Compute the highest WARNING_THRESHOLDS_PCT band currently exceeded by
886 /// `used / limit` whose value is strictly above `last_warning_pct`.
887 ///
888 /// Returns `None` if no new band has been breached.
889 pub fn highest_breached_threshold(used: i64, limit: i64, last_warning_pct: i16) -> Option<i16> {
890 if limit <= 0 { return None; }
891 // Compute current percentage as integer; saturate at 100+.
892 // Use f64 to avoid 32-bit overflow for very large byte counts.
893 let pct_f = (used as f64 / limit as f64) * 100.0;
894 let pct = pct_f.floor() as i64;
895 crate::synckit_billing::WARNING_THRESHOLDS_PCT
896 .iter()
897 .rev()
898 .copied()
899 .find(|&t| pct >= t as i64 && t > last_warning_pct)
900 }
901
902 /// Stamp `last_warning_pct` to record that a warning at `pct` has fired.
903 #[tracing::instrument(skip_all)]
904 pub async fn update_warning_pct(
905 pool: &PgPool,
906 app_id: SyncAppId,
907 pct: i16,
908 ) -> Result<()> {
909 sqlx::query(
910 r#"
911 UPDATE sync_app_usage_current
912 SET last_warning_pct = $2, updated_at = NOW()
913 WHERE app_id = $1
914 "#,
915 )
916 .bind(app_id)
917 .bind(pct)
918 .execute(pool)
919 .await?;
920 Ok(())
921 }
922
923 /// Per-key analogue of `update_warning_pct`. Stamps the band on
924 /// `sync_key_usage_current` so subsequent ticks don't re-fire the same band
925 /// for the same key.
926 #[tracing::instrument(skip_all)]
927 pub async fn update_key_warning_pct(
928 pool: &PgPool,
929 app_id: SyncAppId,
930 key: &str,
931 pct: i16,
932 ) -> Result<()> {
933 sqlx::query(
934 r#"
935 UPDATE sync_key_usage_current
936 SET last_warning_pct = $3, updated_at = NOW()
937 WHERE app_id = $1 AND key = $2
938 "#,
939 )
940 .bind(app_id)
941 .bind(key)
942 .bind(pct)
943 .execute(pool)
944 .await?;
945 Ok(())
946 }
947
948 /// Recalculate `bytes_stored` from the authoritative `sync_blobs` table for
949 /// every app and every per-key counter. Weekly drift correction. Returns
950 /// total count of rows updated across both tables.
951 #[tracing::instrument(skip_all)]
952 pub async fn recalculate_synckit_app_storage(pool: &PgPool) -> Result<u64> {
953 let mut tx = pool.begin().await?;
954
955 let app_res = sqlx::query(
956 r#"
957 UPDATE sync_app_usage_current u
958 SET bytes_stored = COALESCE(s.total, 0), updated_at = NOW()
959 FROM (
960 SELECT app_id, SUM(size_bytes)::BIGINT AS total
961 FROM sync_blobs
962 GROUP BY app_id
963 ) s
964 WHERE u.app_id = s.app_id
965 AND u.bytes_stored <> COALESCE(s.total, 0)
966 "#,
967 )
968 .execute(&mut *tx)
969 .await?;
970
971 // Per-key reconciliation: upsert every (app_id, key) found in sync_blobs.
972 // Keys that disappear (all blobs deleted) aren't pruned here — bytes_stored
973 // would just stay at whatever it last drifted to. Blob delete isn't shipped
974 // yet, so this is fine. When it ships, add a step that resets rows whose
975 // sync_blobs total is zero (or just delete them — the upsert recreates).
976 let key_res = sqlx::query(
977 r#"
978 INSERT INTO sync_key_usage_current (app_id, key, bytes_stored, updated_at)
979 SELECT app_id, key, SUM(size_bytes)::BIGINT, NOW()
980 FROM sync_blobs
981 GROUP BY app_id, key
982 ON CONFLICT (app_id, key)
983 DO UPDATE SET
984 bytes_stored = EXCLUDED.bytes_stored,
985 updated_at = NOW()
986 WHERE sync_key_usage_current.bytes_stored <> EXCLUDED.bytes_stored
987 "#,
988 )
989 .execute(&mut *tx)
990 .await?;
991
992 tx.commit().await?;
993 Ok(app_res.rows_affected() + key_res.rows_affected())
994 }
995
996 /// Check whether a key is currently actively claimed. Used by the claim
997 /// handler to short-circuit the cap check for idempotent re-claims.
998 #[tracing::instrument(skip_all)]
999 pub async fn is_key_actively_claimed(
1000 pool: &sqlx::PgPool,
1001 app_id: SyncAppId,
1002 key: &str,
1003 ) -> Result<bool> {
1004 let row: Option<(uuid::Uuid,)> = sqlx::query_as(
1005 "SELECT id FROM sync_app_keys
1006 WHERE app_id = $1 AND key = $2 AND released_at IS NULL",
1007 )
1008 .bind(app_id)
1009 .bind(key)
1010 .fetch_optional(pool)
1011 .await?;
1012 Ok(row.is_some())
1013 }
1014