use chrono::{DateTime, Utc}; use sqlx::PgPool; use crate::db::{SyncAppId, UserId}; use crate::error::Result; /// Parameters for inserting a new app sync subscription from a webhook event. pub struct NewAppSyncSubscription<'a> { pub user_id: UserId, pub app_id: SyncAppId, pub stripe_subscription_id: &'a str, pub stripe_customer_id: &'a str, /// Billing interval ("monthly" / "annual"). Persisted in the `tier` column /// (kept lowercase for that legacy name) so cap-change handlers know which /// interval the user is on without round-tripping to Stripe. pub interval: &'a str, pub storage_limit_bytes: i64, } /// End-user subscription to an app's cloud sync (rows in `app_sync_subscriptions`). #[derive(Debug, sqlx::FromRow)] pub struct DbAppSyncSubscription { pub stripe_subscription_id: String, pub interval: String, pub status: String, pub storage_limit_bytes: Option, pub pending_storage_limit_bytes: Option, pub current_period_end: Option>, } /// Look up the active subscription a user has on an app, if any. /// Returns `None` if the user has never subscribed or the subscription was deleted. #[tracing::instrument(skip_all)] pub async fn get_user_app_subscription( pool: &PgPool, user_id: UserId, app_id: SyncAppId, ) -> Result> { let row = sqlx::query_as::<_, DbAppSyncSubscription>( r#" SELECT stripe_subscription_id, tier AS interval, status, storage_limit_bytes, pending_storage_limit_bytes, current_period_end FROM app_sync_subscriptions WHERE user_id = $1 AND app_id = $2 "#, ) .bind(user_id) .bind(app_id) .fetch_optional(pool) .await?; Ok(row) } /// Insert or reactivate an app sync subscription. Returns `Ok(true)` if a row /// was inserted or reactivated, `Ok(false)` if an identical active subscription /// already existed for this (user, app) pair (idempotent webhook replay). /// /// ON CONFLICT DO UPDATE (not DO NOTHING) so a paid re-subscribe after a /// cancellation reactivates the row AT CHECKOUT, deterministically — the old /// DO NOTHING left a re-subscribed user `canceled` until a later /// `customer.subscription.updated`(active) happened to arrive (Run #12 MINOR). /// The guard WHERE makes a duplicate checkout webhook for an unchanged active /// row a no-op, mirroring `creator_tiers::create_creator_subscription`. Pairing /// this with the terminal guard on `update_app_sync_subscription_status` keeps /// reactivation on the checkout path while the status-update webhooks can't /// revive a canceled row. #[tracing::instrument(skip_all)] pub async fn create_app_sync_subscription( pool: &PgPool, sub: &NewAppSyncSubscription<'_>, ) -> Result { let result = sqlx::query( r#" INSERT INTO app_sync_subscriptions (user_id, app_id, stripe_subscription_id, stripe_customer_id, tier, status, storage_limit_bytes) VALUES ($1, $2, $3, $4, $5, 'active', $6) ON CONFLICT (user_id, app_id) DO UPDATE SET stripe_subscription_id = EXCLUDED.stripe_subscription_id, stripe_customer_id = EXCLUDED.stripe_customer_id, tier = EXCLUDED.tier, storage_limit_bytes = EXCLUDED.storage_limit_bytes, status = 'active', canceled_at = NULL WHERE app_sync_subscriptions.stripe_subscription_id != EXCLUDED.stripe_subscription_id OR app_sync_subscriptions.status != 'active' "#, ) .bind(sub.user_id) .bind(sub.app_id) .bind(sub.stripe_subscription_id) .bind(sub.stripe_customer_id) .bind(sub.interval) .bind(sub.storage_limit_bytes) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Update the status of an existing app sync subscription (e.g. on /// `customer.subscription.updated` or `.deleted` webhook events). /// /// `canceled` is terminal: the `AND (status != 'canceled' OR $2 = 'canceled')` /// guard refuses to revive a canceled app-sub via an out-of-order webhook; /// reactivation happens at checkout through `create_app_sync_subscription`'s /// DO UPDATE path. Consistent with every other subscription family's setter. #[tracing::instrument(skip_all)] pub async fn update_app_sync_subscription_status( pool: &PgPool, stripe_subscription_id: &str, status: &str, current_period_end: Option>, ) -> Result<()> { sqlx::query( r#" UPDATE app_sync_subscriptions SET status = $2, current_period_end = COALESCE($3, current_period_end), canceled_at = CASE WHEN $2 = 'canceled' THEN NOW() ELSE canceled_at END WHERE stripe_subscription_id = $1 AND (status != 'canceled' OR $2 = 'canceled') "#, ) .bind(stripe_subscription_id) .bind(status) .bind(current_period_end) .execute(pool) .await?; Ok(()) } /// Look up an app sync subscription by its Stripe subscription ID. Used by /// webhook handlers to find the row to update. #[tracing::instrument(skip_all)] pub async fn get_subscription_by_stripe_id( pool: &PgPool, stripe_subscription_id: &str, ) -> Result> { let row: Option<(UserId, SyncAppId)> = sqlx::query_as( r#" SELECT user_id, app_id FROM app_sync_subscriptions WHERE stripe_subscription_id = $1 "#, ) .bind(stripe_subscription_id) .fetch_optional(pool) .await?; Ok(row) } /// Queue a storage-cap change to apply at the next billing cycle. Stores the /// new cap in `pending_storage_limit_bytes`; the renewal webhook handler /// promotes it to `storage_limit_bytes` once Stripe confirms the period roll. #[tracing::instrument(skip_all)] pub async fn set_pending_storage_cap( pool: &PgPool, user_id: UserId, app_id: SyncAppId, pending_bytes: i64, ) -> Result<()> { sqlx::query( r#" UPDATE app_sync_subscriptions SET pending_storage_limit_bytes = $3 WHERE user_id = $1 AND app_id = $2 "#, ) .bind(user_id) .bind(app_id) .bind(pending_bytes) .execute(pool) .await?; Ok(()) } /// Promote a queued cap change to the active cap. Called from the renewal /// webhook handler when Stripe rolls the subscription to a new period. /// No-op if no pending change is queued. #[tracing::instrument(skip_all)] pub async fn apply_pending_storage_cap( pool: &PgPool, stripe_subscription_id: &str, ) -> Result<()> { sqlx::query( r#" UPDATE app_sync_subscriptions SET storage_limit_bytes = pending_storage_limit_bytes, pending_storage_limit_bytes = NULL WHERE stripe_subscription_id = $1 AND pending_storage_limit_bytes IS NOT NULL "#, ) .bind(stripe_subscription_id) .execute(pool) .await?; Ok(()) }