//! SyncKit v2 developer-billing queries. //! //! Operates on the billing columns added to `sync_apps` and the auxiliary //! `sync_app_usage_current` table by migration 117 (`117_synckit_v2_billing.sql`). //! //! Schema reference (kept here for offline-mode reviewers since the migration //! is not yet applied at the time of writing): //! //! ```text //! sync_apps: //! is_internal BOOLEAN NOT NULL DEFAULT FALSE //! stripe_customer_id TEXT //! stripe_subscription_id TEXT UNIQUE //! billing_status TEXT NOT NULL CHECK IN //! ('draft','active','suspended_unpaid','canceled') //! storage_gb_cap INT //! egress_multiple NUMERIC(6,2) //! enforcement_mode TEXT NOT NULL CHECK IN ('per_key','app_wide') //! key_cap INT //! current_period_start TIMESTAMPTZ //! current_period_end TIMESTAMPTZ //! //! sync_app_usage_current (one row per app): //! app_id, bytes_stored, bytes_egress_period, keys_claimed, //! last_warning_pct, period_started_at, updated_at //! ``` use chrono::{DateTime, Utc}; use sqlx::PgPool; use super::id_types::SyncAppId; use super::models::{DbSyncAppBilling, DbSyncAppKey}; use crate::error::Result; /// Outcome of a `claim_key` call. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ClaimResult { /// `true` if this call inserted a new active claim row; /// `false` if the key was already actively claimed (idempotent re-claim). pub newly_claimed: bool, /// Total active claims for this app after the operation. pub total_claimed: i32, } /// Outcome of a `release_key` call. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ReleaseResult { /// `true` if this call transitioned an active row to released; /// `false` if no active row existed (idempotent release). pub newly_released: bool, /// Total active claims for this app after the operation. pub total_claimed: i32, } /// Set the Stripe customer ID on a sync app (idempotent; re-sets the same id /// without error). 
Called by the billing setup route once the developer-side /// Customer object has been created in Stripe. #[tracing::instrument(skip_all)] pub async fn set_stripe_customer( pool: &PgPool, app_id: SyncAppId, stripe_customer_id: &str, ) -> Result<()> { sqlx::query("UPDATE sync_apps SET stripe_customer_id = $2 WHERE id = $1") .bind(app_id) .bind(stripe_customer_id) .execute(pool) .await?; Ok(()) } /// Activate billing on a draft app: stamps subscription id, knobs, status, /// and current period. The `egress_multiple` is passed in as `f64` and cast /// to the column's `NUMERIC(6,2)` type in SQL. #[tracing::instrument(skip_all)] #[allow(clippy::too_many_arguments)] pub async fn activate_billing( pool: &PgPool, app_id: SyncAppId, enforcement_mode: &str, storage_gb_cap: Option, key_cap: Option, gb_per_key: Option, stripe_sub_id: &str, period_start: DateTime, period_end: DateTime, ) -> Result<()> { sqlx::query( r#" UPDATE sync_apps SET billing_status = 'active', stripe_subscription_id = $2, enforcement_mode = $3, storage_gb_cap = $4, key_cap = $5, gb_per_key = $6, current_period_start = $7, current_period_end = $8 WHERE id = $1 "#, ) .bind(app_id) .bind(stripe_sub_id) .bind(enforcement_mode) .bind(storage_gb_cap) .bind(key_cap) .bind(gb_per_key) .bind(period_start) .bind(period_end) .execute(pool) .await?; Ok(()) } /// Update the pricing knobs on an already-active app (no status change). 
#[tracing::instrument(skip_all)]
pub async fn update_knobs(
    pool: &PgPool,
    app_id: SyncAppId,
    enforcement_mode: &str,
    storage_gb_cap: Option<i32>,
    key_cap: Option<i32>,
    gb_per_key: Option<i32>,
) -> Result<()> {
    // Every knob is overwritten unconditionally; a `None` clears the column
    // to NULL rather than leaving the previous value in place.
    sqlx::query(
        r#"
        UPDATE sync_apps
        SET enforcement_mode = $2,
            storage_gb_cap = $3,
            key_cap = $4,
            gb_per_key = $5
        WHERE id = $1
        "#,
    )
    .bind(app_id)
    .bind(enforcement_mode)
    .bind(storage_gb_cap)
    .bind(key_cap)
    .bind(gb_per_key)
    .execute(pool)
    .await?;
    Ok(())
}

/// Apply a Stripe-driven billing update to a sync app: optionally change
/// `billing_status` and/or refresh the current period, in ONE guarded statement.
/// Returns `true` if a live (non-canceled) row was updated.
///
/// `canceled` is terminal: the `AND (billing_status != 'canceled' OR $2 = 'canceled')`
/// guard refuses to move a canceled app back to active/suspended_unpaid — AND,
/// because the period columns are written here under the same guard, a stray
/// `invoice.paid` after a `deleted` can no longer refresh the period (nor, via
/// the returned `false`, reset usage) on a canceled app. This replaces the old
/// split `set_billing_status` (guarded) + `set_period` (UNGUARDED) — the synckit
/// instance of the period-without-guard bug class. The legitimate
/// `suspended_unpaid -> active` recovery on `invoice.paid` is unaffected
/// (suspended_unpaid != canceled). Initial activation from `draft` goes through
/// `activate_billing`, not this path. See `crate::db::subscription_writer` for
/// the cross-family rationale.
#[tracing::instrument(skip_all)] pub async fn apply_billing_update<'e>( executor: impl sqlx::PgExecutor<'e>, app_id: SyncAppId, status: Option<&str>, period: Option<(DateTime, DateTime)>, ) -> Result { let (period_start, period_end) = match period { Some((start, end)) => (Some(start), Some(end)), None => (None, None), }; let result = sqlx::query( r#" UPDATE sync_apps SET billing_status = COALESCE($2, billing_status), current_period_start = COALESCE($3, current_period_start), current_period_end = COALESCE($4, current_period_end) WHERE id = $1 AND (billing_status != 'canceled' OR $2 = 'canceled') "#, ) .bind(app_id) .bind(status) .bind(period_start) .bind(period_end) .execute(executor) .await?; Ok(result.rows_affected() > 0) } /// Reset the per-period usage counters on `sync_app_usage_current`. Called /// from the `invoice.paid` webhook handler at period rollover. #[tracing::instrument(skip_all)] pub async fn reset_period_usage<'e>( executor: impl sqlx::PgExecutor<'e>, app_id: SyncAppId, ) -> Result<()> { sqlx::query( r#" UPDATE sync_app_usage_current SET bytes_egress_period = 0, last_warning_pct = 0, period_started_at = NOW(), updated_at = NOW() WHERE app_id = $1 "#, ) .bind(app_id) .execute(executor) .await?; Ok(()) } /// Look up the sync app that owns a given Stripe subscription. Used by the /// webhook router to distinguish SyncKit v2 subscriptions from /// creator-tier / Fan+ subscriptions. #[tracing::instrument(skip_all)] pub async fn get_app_by_stripe_subscription( pool: &PgPool, stripe_sub_id: &str, ) -> Result> { let row: Option<(SyncAppId,)> = sqlx::query_as( "SELECT id FROM sync_apps WHERE stripe_subscription_id = $1", ) .bind(stripe_sub_id) .fetch_optional(pool) .await?; Ok(row.map(|(id,)| id)) } /// Fetch the combined app+billing+usage view for a single app. `egress_multiple` /// is cast from NUMERIC to DOUBLE PRECISION so it decodes into `f64` without /// the `bigdecimal` sqlx feature. 
`sync_app_usage_current` is LEFT-joined; /// the row is normally inserted on app create (see migration 117), but /// `Option` is used so a missing row doesn't blow up the query. #[tracing::instrument(skip_all)] pub async fn get_app_with_billing( pool: &PgPool, app_id: SyncAppId, ) -> Result> { let app = sqlx::query_as::<_, DbSyncAppBilling>( r#" SELECT sa.id, sa.creator_id, sa.name, sa.is_internal, sa.stripe_customer_id, sa.stripe_subscription_id, sa.billing_status, sa.storage_gb_cap, sa.enforcement_mode, sa.key_cap, sa.gb_per_key, sa.current_period_start, sa.current_period_end, u.bytes_stored, u.bytes_egress_period, u.keys_claimed, u.last_warning_pct, u.period_started_at, p.slug AS project_slug FROM sync_apps sa LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id LEFT JOIN projects p ON p.id = sa.project_id WHERE sa.id = $1 "#, ) .bind(app_id) .fetch_optional(pool) .await?; Ok(app) } /// Batch variant of `get_app_with_billing` that loads every app owned by a /// creator with its billing+usage join in one query. Used by the user-level /// SyncKit dashboard tab. #[tracing::instrument(skip_all)] pub async fn get_apps_with_billing_by_creator( pool: &PgPool, creator_id: super::id_types::UserId, ) -> Result> { let apps = sqlx::query_as::<_, DbSyncAppBilling>( r#" SELECT sa.id, sa.creator_id, sa.name, sa.is_internal, sa.stripe_customer_id, sa.stripe_subscription_id, sa.billing_status, sa.storage_gb_cap, sa.enforcement_mode, sa.key_cap, sa.gb_per_key, sa.current_period_start, sa.current_period_end, u.bytes_stored, u.bytes_egress_period, u.keys_claimed, u.last_warning_pct, u.period_started_at, p.slug AS project_slug FROM sync_apps sa LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id LEFT JOIN projects p ON p.id = sa.project_id WHERE sa.creator_id = $1 "#, ) .bind(creator_id) .fetch_all(pool) .await?; Ok(apps) } /// Batch variant of `get_app_with_billing` for one project. Used by the /// project-level SyncKit dashboard tab. 
#[tracing::instrument(skip_all)] pub async fn get_apps_with_billing_by_project( pool: &PgPool, project_id: super::id_types::ProjectId, ) -> Result> { let apps = sqlx::query_as::<_, DbSyncAppBilling>( r#" SELECT sa.id, sa.creator_id, sa.name, sa.is_internal, sa.stripe_customer_id, sa.stripe_subscription_id, sa.billing_status, sa.storage_gb_cap, sa.enforcement_mode, sa.key_cap, sa.gb_per_key, sa.current_period_start, sa.current_period_end, u.bytes_stored, u.bytes_egress_period, u.keys_claimed, u.last_warning_pct, u.period_started_at, p.slug AS project_slug FROM sync_apps sa LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id LEFT JOIN projects p ON p.id = sa.project_id WHERE sa.project_id = $1 "#, ) .bind(project_id) .fetch_all(pool) .await?; Ok(apps) } /// Claim an SDK encryption key for an app. Idempotent: a re-claim of an /// already-active key returns `newly_claimed = false` without inserting. /// /// The transaction locks `sync_app_usage_current` for this app first, so the /// route handler can read `keys_claimed` and decide on the cap before this /// runs without races. (The handler does its check pre-transaction; this /// function re-checks idempotency inside the lock.) #[tracing::instrument(skip_all)] pub async fn claim_key( pool: &sqlx::PgPool, app_id: SyncAppId, key: &str, ) -> Result { let mut tx = pool.begin().await?; // Lock the usage row for this app. Returns the current keys_claimed. let (mut keys_claimed,): (i32,) = sqlx::query_as( "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE", ) .bind(app_id) .fetch_one(&mut *tx) .await?; // Is there already an active claim for this key? 
let existing: Option<(uuid::Uuid,)> = sqlx::query_as( "SELECT id FROM sync_app_keys WHERE app_id = $1 AND key = $2 AND released_at IS NULL", ) .bind(app_id) .bind(key) .fetch_optional(&mut *tx) .await?; let newly_claimed = if existing.is_some() { false } else { sqlx::query( "INSERT INTO sync_app_keys (app_id, key) VALUES ($1, $2)", ) .bind(app_id) .bind(key) .execute(&mut *tx) .await?; sqlx::query( "UPDATE sync_app_usage_current SET keys_claimed = keys_claimed + 1, updated_at = NOW() WHERE app_id = $1", ) .bind(app_id) .execute(&mut *tx) .await?; keys_claimed += 1; true }; tx.commit().await?; Ok(ClaimResult { newly_claimed, total_claimed: keys_claimed, }) } /// Release an SDK encryption key. Idempotent: releasing a key that is not /// actively claimed returns `newly_released = false`. #[tracing::instrument(skip_all)] pub async fn release_key( pool: &sqlx::PgPool, app_id: SyncAppId, key: &str, ) -> Result { let mut tx = pool.begin().await?; let (mut keys_claimed,): (i32,) = sqlx::query_as( "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE", ) .bind(app_id) .fetch_one(&mut *tx) .await?; let released: Option<(uuid::Uuid,)> = sqlx::query_as( "UPDATE sync_app_keys SET released_at = NOW() WHERE app_id = $1 AND key = $2 AND released_at IS NULL RETURNING id", ) .bind(app_id) .bind(key) .fetch_optional(&mut *tx) .await?; let newly_released = if released.is_some() { sqlx::query( "UPDATE sync_app_usage_current SET keys_claimed = GREATEST(keys_claimed - 1, 0), updated_at = NOW() WHERE app_id = $1", ) .bind(app_id) .execute(&mut *tx) .await?; keys_claimed = (keys_claimed - 1).max(0); true } else { false }; tx.commit().await?; Ok(ReleaseResult { newly_released, total_claimed: keys_claimed, }) } /// List active (un-released) key claims for an app, ordered by `claimed_at DESC`. /// Used by the dashboard "Active keys" view. `bytes_stored` is LEFT-joined /// from `sync_key_usage_current` — `0` when no upload has landed yet. 
#[tracing::instrument(skip_all)] pub async fn list_active_keys( pool: &sqlx::PgPool, app_id: SyncAppId, limit: i64, offset: i64, ) -> Result> { let rows = sqlx::query_as::<_, DbSyncAppKey>( r#" SELECT k.id, k.key, k.claimed_at, COALESCE(u.bytes_stored, 0) AS bytes_stored FROM sync_app_keys k LEFT JOIN sync_key_usage_current u ON u.app_id = k.app_id AND u.key = k.key WHERE k.app_id = $1 AND k.released_at IS NULL ORDER BY k.claimed_at DESC LIMIT $2 OFFSET $3 "#, ) .bind(app_id) .bind(limit) .bind(offset) .fetch_all(pool) .await?; Ok(rows) } /// Per-app top-N key usage for the dashboard panel. Returns the highest-usage /// active keys (sorted by `bytes_stored DESC`) for each app in `app_ids`, /// batched into a single query so the integrations page doesn't N+1 across /// every per_key-mode app. #[tracing::instrument(skip_all)] pub async fn get_top_keys_per_app( pool: &PgPool, app_ids: &[SyncAppId], limit_per_app: i64, ) -> Result> { if app_ids.is_empty() { return Ok(Vec::new()); } let rows: Vec<(SyncAppId, String, i64)> = sqlx::query_as( r#" SELECT app_id, key, bytes_stored FROM ( SELECT u.app_id, u.key, u.bytes_stored, ROW_NUMBER() OVER (PARTITION BY u.app_id ORDER BY u.bytes_stored DESC, u.key) AS rn FROM sync_key_usage_current u JOIN sync_app_keys k ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL WHERE u.app_id = ANY($1) ) ranked WHERE rn <= $2 ORDER BY app_id, rn "#, ) .bind(app_ids) .bind(limit_per_app) .fetch_all(pool) .await?; Ok(rows) } // ── Phase 5: Usage counters and cap enforcement ── /// A breached cap, returned by `would_exceed_*` checks. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ExceededLimit { /// `"storage"`, `"storage_per_key"`, or `"egress"`. (`"billing"` is reserved /// for inactive-billing failure paths so a single 402 response shape can /// carry every reason; the caller decides whether to thread that through /// here or check `billing_status` separately.) 
pub dimension: &'static str, /// Current usage in bytes (before the would-be addition). pub used: i64, /// Configured cap in bytes. pub limit: i64, /// The SDK key that hit its cap, if `dimension == "storage_per_key"`. /// `None` for app-wide dimensions. pub key: Option, } /// A row that may need a warning email sent. /// /// `threshold_pct` is the highest WARNING_THRESHOLDS_PCT band currently /// breached above `last_warning_pct`. The caller is responsible for filtering /// out apps where no new breach has occurred (i.e. usage hasn't crossed the /// next threshold since the previous warning). #[derive(Debug, Clone, PartialEq, Eq)] pub struct WarningCandidate { pub app_id: SyncAppId, pub creator_id: super::id_types::UserId, pub creator_email: String, pub app_name: String, pub threshold_pct: i16, pub dimension: &'static str, pub used: i64, pub limit: i64, /// SDK key that breached the threshold, when `dimension == /// "storage_per_key"`. `None` for app-wide breaches. pub key: Option, } /// Increment (or decrement, for negative `delta`) the rolling `bytes_stored` /// counter on `sync_app_usage_current` and the per-key row on /// `sync_key_usage_current` (upserted if missing). Returns the new app-level /// total. /// /// NOTE: read-then-add elsewhere in the request handler is racy. Acceptable /// overshoot under concurrent uploads in v1. 
#[tracing::instrument(skip_all)] pub async fn add_bytes_stored( pool: &PgPool, app_id: SyncAppId, key: &str, delta: i64, ) -> Result { let mut tx = pool.begin().await?; let (new_val,): (i64,) = sqlx::query_as( r#" UPDATE sync_app_usage_current SET bytes_stored = GREATEST(bytes_stored + $2, 0), updated_at = NOW() WHERE app_id = $1 RETURNING bytes_stored "#, ) .bind(app_id) .bind(delta) .fetch_one(&mut *tx) .await?; sqlx::query( r#" INSERT INTO sync_key_usage_current (app_id, key, bytes_stored) VALUES ($1, $2, GREATEST($3, 0)) ON CONFLICT (app_id, key) DO UPDATE SET bytes_stored = GREATEST(sync_key_usage_current.bytes_stored + $3, 0), updated_at = NOW() "#, ) .bind(app_id) .bind(key) .bind(delta) .execute(&mut *tx) .await?; tx.commit().await?; Ok(new_val) } /// Increment (or decrement) the per-period `bytes_egress_period` counter. /// Returns the new value. #[tracing::instrument(skip_all)] pub async fn add_bytes_egress( pool: &PgPool, app_id: SyncAppId, delta: i64, ) -> Result { let (new_val,): (i64,) = sqlx::query_as( r#" UPDATE sync_app_usage_current SET bytes_egress_period = GREATEST(bytes_egress_period + $2, 0), updated_at = NOW() WHERE app_id = $1 RETURNING bytes_egress_period "#, ) .bind(app_id) .bind(delta) .fetch_one(pool) .await?; Ok(new_val) } /// Check whether storing `additional_bytes` more would exceed the storage cap. /// /// Returns `None` if no cap applies (internal apps, billing not active, or /// the cap is unset). Returns `Some(ExceededLimit)` only when adding /// `additional_bytes` would push usage past the cap. /// /// In `per_key` mode the per-key counter is checked first against /// `gb_per_key × 1GB` — if exceeded, returns `dimension: "storage_per_key"` /// with the offending `key`. The app-aggregate (`key_cap × gb_per_key`) is /// also checked as a defensive hard ceiling; under normal operation this /// can't trip before some key has tripped, but it guards against drift. 
/// /// In `bulk` mode `key` is unused (still passed in for call-site uniformity) /// and the app-wide `storage_gb_cap` is the only check. /// /// Race-condition note: this reads, the caller adds. Concurrent uploads can /// produce small overshoots. Acceptable for v1. /// Row for the storage-cap lookup: /// (is_internal, enforcement_mode, storage_gb_cap, key_cap, gb_per_key, used_bytes). type AppStorageCapRow = (bool, String, Option, Option, Option, Option); #[tracing::instrument(skip_all)] pub async fn would_exceed_storage( pool: &PgPool, app_id: SyncAppId, key: &str, additional_bytes: i64, ) -> Result> { let row: Option = sqlx::query_as( r#" SELECT sa.is_internal, sa.enforcement_mode, sa.storage_gb_cap, sa.key_cap, sa.gb_per_key, u.bytes_stored FROM sync_apps sa LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id WHERE sa.id = $1 "#, ) .bind(app_id) .fetch_optional(pool) .await?; let Some((is_internal, mode, storage_gb, key_cap, gb_per_key, bytes_stored)) = row else { return Ok(None); }; if is_internal { return Ok(None); } let app_used = bytes_stored.unwrap_or(0); match mode.as_str() { "bulk" => { let Some(gb) = storage_gb else { return Ok(None) }; let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); if app_used.saturating_add(additional_bytes) > limit { return Ok(Some(ExceededLimit { dimension: "storage", used: app_used, limit, key: None, })); } } "per_key" => { let (Some(k), Some(g)) = (key_cap, gb_per_key) else { return Ok(None) }; let per_key_limit = crate::synckit_billing::storage_cap_bytes(g as u32); let app_limit = crate::synckit_billing::storage_cap_bytes(k.saturating_mul(g) as u32); let key_used: Option = sqlx::query_scalar( "SELECT bytes_stored FROM sync_key_usage_current WHERE app_id = $1 AND key = $2", ) .bind(app_id) .bind(key) .fetch_optional(pool) .await?; let key_used = key_used.unwrap_or(0); if key_used.saturating_add(additional_bytes) > per_key_limit { return Ok(Some(ExceededLimit { dimension: "storage_per_key", used: key_used, 
limit: per_key_limit, key: Some(key.to_string()), })); } // Defensive app-aggregate ceiling. Tripping this before the per-key // check means the per-key counters have drifted under the app // counter — refuse rather than admit silent over-allocation. if app_used.saturating_add(additional_bytes) > app_limit { return Ok(Some(ExceededLimit { dimension: "storage", used: app_used, limit: app_limit, key: None, })); } } _ => return Ok(None), } Ok(None) } /// Fetch the list of active, non-internal apps that may need a warning email, /// along with the data needed to compute which threshold (if any) has been /// breached since the last notice. The per-app breach computation is done in /// Rust by the caller (see `scheduler::synckit_warnings`). #[tracing::instrument(skip_all)] pub async fn get_apps_needing_warning(pool: &PgPool) -> Result> { use super::id_types::UserId; // Egress is no longer a price input or an enforced cap (see migration 118), // so the only dimension that warrants a usage warning is storage. The // effective storage cap depends on enforcement_mode. 
#[derive(sqlx::FromRow)] struct Row { app_id: SyncAppId, creator_id: UserId, creator_email: String, app_name: String, enforcement_mode: String, storage_gb_cap: Option, key_cap: Option, gb_per_key: Option, bytes_stored: i64, last_warning_pct: i16, } let rows = sqlx::query_as::<_, Row>( r#" SELECT sa.id AS app_id, sa.creator_id AS creator_id, u_user.email AS creator_email, sa.name AS app_name, sa.enforcement_mode AS enforcement_mode, sa.storage_gb_cap AS storage_gb_cap, sa.key_cap AS key_cap, sa.gb_per_key AS gb_per_key, COALESCE(u.bytes_stored, 0) AS bytes_stored, COALESCE(u.last_warning_pct, 0::smallint) AS last_warning_pct FROM sync_apps sa JOIN users u_user ON u_user.id = sa.creator_id LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id WHERE sa.billing_status = 'active' AND sa.is_internal = FALSE "#, ) .fetch_all(pool) .await?; let mut out = Vec::new(); let mut per_key_apps: Vec<(SyncAppId, UserId, String, String, i32)> = Vec::new(); for r in rows { match r.enforcement_mode.as_str() { "bulk" => { let Some(gb) = r.storage_gb_cap else { continue }; let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); if let Some(pct) = highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct) { out.push(WarningCandidate { app_id: r.app_id, creator_id: r.creator_id, creator_email: r.creator_email, app_name: r.app_name, threshold_pct: pct, dimension: "storage", used: r.bytes_stored, limit, key: None, }); } } "per_key" => { // Defer to a second query that fans out per (app, key); the // per-key counter — not the app aggregate — is what we warn on. 
let (Some(_), Some(g)) = (r.key_cap, r.gb_per_key) else { continue }; per_key_apps.push((r.app_id, r.creator_id, r.creator_email, r.app_name, g)); } _ => {} } } if !per_key_apps.is_empty() { #[derive(sqlx::FromRow)] struct KeyRow { app_id: SyncAppId, key: String, bytes_stored: i64, last_warning_pct: i16, } let app_ids: Vec = per_key_apps.iter().map(|t| t.0).collect(); let key_rows = sqlx::query_as::<_, KeyRow>( r#" SELECT u.app_id, u.key, u.bytes_stored, u.last_warning_pct FROM sync_key_usage_current u JOIN sync_app_keys k ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL WHERE u.app_id = ANY($1) "#, ) .bind(&app_ids) .fetch_all(pool) .await?; // Index app metadata by id for cheap join. let meta: std::collections::HashMap = per_key_apps .into_iter() .map(|(a, c, e, n, g)| (a, (c, e, n, g))) .collect(); for r in key_rows { let Some((creator_id, creator_email, app_name, gb_per_key)) = meta.get(&r.app_id).cloned() else { continue; }; let limit = crate::synckit_billing::storage_cap_bytes(gb_per_key as u32); if let Some(pct) = highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct) { out.push(WarningCandidate { app_id: r.app_id, creator_id, creator_email, app_name, threshold_pct: pct, dimension: "storage_per_key", used: r.bytes_stored, limit, key: Some(r.key), }); } } } Ok(out) } /// Compute the highest WARNING_THRESHOLDS_PCT band currently exceeded by /// `used / limit` whose value is strictly above `last_warning_pct`. /// /// Returns `None` if no new band has been breached. pub fn highest_breached_threshold(used: i64, limit: i64, last_warning_pct: i16) -> Option { if limit <= 0 { return None; } // Compute current percentage as integer; saturate at 100+. // Use f64 to avoid 32-bit overflow for very large byte counts. 
let pct_f = (used as f64 / limit as f64) * 100.0; let pct = pct_f.floor() as i64; crate::synckit_billing::WARNING_THRESHOLDS_PCT .iter() .rev() .copied() .find(|&t| pct >= t as i64 && t > last_warning_pct) } /// Stamp `last_warning_pct` to record that a warning at `pct` has fired. #[tracing::instrument(skip_all)] pub async fn update_warning_pct( pool: &PgPool, app_id: SyncAppId, pct: i16, ) -> Result<()> { sqlx::query( r#" UPDATE sync_app_usage_current SET last_warning_pct = $2, updated_at = NOW() WHERE app_id = $1 "#, ) .bind(app_id) .bind(pct) .execute(pool) .await?; Ok(()) } /// Per-key analogue of `update_warning_pct`. Stamps the band on /// `sync_key_usage_current` so subsequent ticks don't re-fire the same band /// for the same key. #[tracing::instrument(skip_all)] pub async fn update_key_warning_pct( pool: &PgPool, app_id: SyncAppId, key: &str, pct: i16, ) -> Result<()> { sqlx::query( r#" UPDATE sync_key_usage_current SET last_warning_pct = $3, updated_at = NOW() WHERE app_id = $1 AND key = $2 "#, ) .bind(app_id) .bind(key) .bind(pct) .execute(pool) .await?; Ok(()) } /// Recalculate `bytes_stored` from the authoritative `sync_blobs` table for /// every app and every per-key counter. Weekly drift correction. Returns /// total count of rows updated across both tables. #[tracing::instrument(skip_all)] pub async fn recalculate_synckit_app_storage(pool: &PgPool) -> Result { let mut tx = pool.begin().await?; let app_res = sqlx::query( r#" UPDATE sync_app_usage_current u SET bytes_stored = COALESCE(s.total, 0), updated_at = NOW() FROM ( SELECT app_id, SUM(size_bytes)::BIGINT AS total FROM sync_blobs GROUP BY app_id ) s WHERE u.app_id = s.app_id AND u.bytes_stored <> COALESCE(s.total, 0) "#, ) .execute(&mut *tx) .await?; // Per-key reconciliation: upsert every (app_id, key) found in sync_blobs. // Keys that disappear (all blobs deleted) aren't pruned here — bytes_stored // would just stay at whatever it last drifted to. 
Blob delete isn't shipped // yet, so this is fine. When it ships, add a step that resets rows whose // sync_blobs total is zero (or just delete them — the upsert recreates). let key_res = sqlx::query( r#" INSERT INTO sync_key_usage_current (app_id, key, bytes_stored, updated_at) SELECT app_id, key, SUM(size_bytes)::BIGINT, NOW() FROM sync_blobs GROUP BY app_id, key ON CONFLICT (app_id, key) DO UPDATE SET bytes_stored = EXCLUDED.bytes_stored, updated_at = NOW() WHERE sync_key_usage_current.bytes_stored <> EXCLUDED.bytes_stored "#, ) .execute(&mut *tx) .await?; tx.commit().await?; Ok(app_res.rows_affected() + key_res.rows_affected()) } /// Check whether a key is currently actively claimed. Used by the claim /// handler to short-circuit the cap check for idempotent re-claims. #[tracing::instrument(skip_all)] pub async fn is_key_actively_claimed( pool: &sqlx::PgPool, app_id: SyncAppId, key: &str, ) -> Result { let row: Option<(uuid::Uuid,)> = sqlx::query_as( "SELECT id FROM sync_app_keys WHERE app_id = $1 AND key = $2 AND released_at IS NULL", ) .bind(app_id) .bind(key) .fetch_optional(pool) .await?; Ok(row.is_some()) }