//! Subscription queries: tier CRUD, subscription lifecycle, and access control. use sqlx::PgPool; use super::models::*; use super::{PriceCents, ProjectId, SubscriptionId, SubscriptionTierId, UserId}; use crate::error::Result; // ── Tier CRUD ── /// Create a new subscription tier for a project. #[tracing::instrument(skip_all)] pub async fn create_subscription_tier( pool: &PgPool, project_id: ProjectId, name: &str, description: Option<&str>, price_cents: PriceCents, ) -> Result { let tier = sqlx::query_as::<_, DbSubscriptionTier>( r#" INSERT INTO subscription_tiers (project_id, name, description, price_cents) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(project_id) .bind(name) .bind(description) .bind(price_cents.as_i32()) .fetch_one(pool) .await?; Ok(tier) } /// Get a subscription tier by ID. #[tracing::instrument(skip_all)] pub async fn get_subscription_tier_by_id( pool: &PgPool, id: SubscriptionTierId, ) -> Result> { let tier = sqlx::query_as::<_, DbSubscriptionTier>( "SELECT * FROM subscription_tiers WHERE id = $1", ) .bind(id) .fetch_optional(pool) .await?; Ok(tier) } /// Get all active tiers for a project, ordered by sort_order. #[tracing::instrument(skip_all)] pub async fn get_active_tiers_by_project( pool: &PgPool, project_id: ProjectId, ) -> Result> { let tiers = sqlx::query_as::<_, DbSubscriptionTier>( "SELECT * FROM subscription_tiers WHERE project_id = $1 AND is_active = true ORDER BY sort_order, created_at", ) .bind(project_id) .fetch_all(pool) .await?; Ok(tiers) } /// Get all tiers for a project (active and inactive), for dashboard management. #[tracing::instrument(skip_all)] pub async fn get_all_tiers_by_project( pool: &PgPool, project_id: ProjectId, ) -> Result> { let tiers = sqlx::query_as::<_, DbSubscriptionTier>( "SELECT * FROM subscription_tiers WHERE project_id = $1 ORDER BY sort_order, created_at", ) .bind(project_id) .fetch_all(pool) .await?; Ok(tiers) } /// Update a subscription tier's name, description, and active status. #[tracing::instrument(skip_all)] pub async fn update_subscription_tier( pool: &PgPool, id: SubscriptionTierId, name: &str, description: Option<&str>, is_active: bool, ) -> Result { let tier = sqlx::query_as::<_, DbSubscriptionTier>( r#" UPDATE subscription_tiers SET name = $2, description = $3, is_active = $4 WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(name) .bind(description) .bind(is_active) .fetch_one(pool) .await?; Ok(tier) } /// Store Stripe product and price IDs on a tier after creating them on connected account. #[tracing::instrument(skip_all)] pub async fn update_tier_stripe_ids( pool: &PgPool, tier_id: SubscriptionTierId, product_id: &str, price_id: &str, ) -> Result<()> { sqlx::query( r#" UPDATE subscription_tiers SET stripe_product_id = $2, stripe_price_id = $3 WHERE id = $1 "#, ) .bind(tier_id) .bind(product_id) .bind(price_id) .execute(pool) .await?; Ok(()) } /// Delete a subscription tier. Soft-deletes (sets is_active=false) if any /// subscriptions reference it; hard-deletes otherwise. /// /// Uses a transaction with FOR UPDATE to prevent a TOCTOU race where a /// subscription could be created between the existence check and the delete. #[tracing::instrument(skip_all)] pub async fn delete_subscription_tier(pool: &PgPool, id: SubscriptionTierId) -> Result<()> { let mut tx = pool.begin().await?; // Lock the tier row to serialize against concurrent subscription creation sqlx::query("SELECT id FROM subscription_tiers WHERE id = $1 FOR UPDATE") .bind(id) .fetch_optional(&mut *tx) .await? .ok_or(sqlx::Error::RowNotFound)?; let has_subscriptions: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM subscriptions WHERE tier_id = $1)", ) .bind(id) .fetch_one(&mut *tx) .await?; if has_subscriptions { sqlx::query("UPDATE subscription_tiers SET is_active = false WHERE id = $1") .bind(id) .execute(&mut *tx) .await?; } else { sqlx::query("DELETE FROM subscription_tiers WHERE id = $1") .bind(id) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(()) } // ── Subscription lifecycle ── /// Create a new subscription record after successful checkout. /// /// Returns `None` if the subscription already exists (duplicate webhook /// or concurrent active subscription for the same user+project). /// The partial UNIQUE index on `(subscriber_id, project_id) WHERE status = 'active'` /// prevents multiple active subscriptions at the DB level. #[tracing::instrument(skip_all)] pub async fn create_subscription<'e>( executor: impl sqlx::PgExecutor<'e>, subscriber_id: UserId, tier_id: SubscriptionTierId, project_id: ProjectId, stripe_subscription_id: &str, stripe_customer_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbSubscription>( r#" INSERT INTO subscriptions (subscriber_id, tier_id, project_id, stripe_subscription_id, stripe_customer_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING RETURNING * "#, ) .bind(subscriber_id) .bind(tier_id) .bind(project_id) .bind(stripe_subscription_id) .bind(stripe_customer_id) .fetch_optional(executor) .await?; Ok(sub) } /// Look up a subscription by its Stripe subscription ID. #[tracing::instrument(skip_all)] pub async fn get_subscription_by_stripe_id( pool: &PgPool, stripe_sub_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbSubscription>( "SELECT * FROM subscriptions WHERE stripe_subscription_id = $1", ) .bind(stripe_sub_id) .fetch_optional(pool) .await?; Ok(sub) } // Apply a Stripe-driven status and/or period update in one guarded statement. // `canceled` is terminal — an out-of-order `updated`(active) or `invoice.paid` // landing after a `deleted` cannot revive the row (status) nor refresh its // period, because both columns are written here under the single guard. The // old split `update_subscription_status` + `update_subscription_period` (whose // period half lacked the guard) are replaced by this; reactivation only ever // happens at checkout via `create_subscription`, never through this path. crate::db::subscription_writer::define_stripe_subscription_writer!( apply_stripe_update, "subscriptions", DbSubscription ); /// Mark a subscription as canceled. #[tracing::instrument(skip_all)] pub async fn cancel_subscription( pool: &PgPool, stripe_sub_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbSubscription>( r#" UPDATE subscriptions SET status = 'canceled', canceled_at = COALESCE(canceled_at, NOW()) WHERE stripe_subscription_id = $1 RETURNING * "#, ) .bind(stripe_sub_id) .fetch_optional(pool) .await?; Ok(sub) } // ── Suspension pause/resume ── /// Get all active subscriptions to a creator's projects (for pausing on suspension). #[tracing::instrument(skip_all)] pub async fn get_active_subscriptions_by_creator( pool: &PgPool, creator_id: UserId, ) -> Result> { let subs = sqlx::query_as::<_, DbSubscription>( r#" SELECT s.* FROM subscriptions s WHERE s.project_id IN (SELECT id FROM projects WHERE user_id = $1) AND s.status = 'active' AND s.paused_at IS NULL "#, ) .bind(creator_id) .fetch_all(pool) .await?; Ok(subs) } /// Mark all active subscriptions to a creator's projects as paused. #[tracing::instrument(skip_all)] pub async fn pause_subscriptions_for_creator( pool: &PgPool, creator_id: UserId, ) -> Result { let result = sqlx::query( r#" UPDATE subscriptions SET paused_at = NOW() WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1) AND status = 'active' AND paused_at IS NULL "#, ) .bind(creator_id) .execute(pool) .await?; Ok(result.rows_affected()) } /// Get all paused subscriptions to a creator's projects (for cancelling on termination). #[tracing::instrument(skip_all)] pub async fn get_paused_subscriptions_by_creator( pool: &PgPool, creator_id: UserId, ) -> Result> { let subs = sqlx::query_as::<_, DbSubscription>( r#" SELECT s.* FROM subscriptions s WHERE s.project_id IN (SELECT id FROM projects WHERE user_id = $1) AND s.status = 'active' AND s.paused_at IS NOT NULL "#, ) .bind(creator_id) .fetch_all(pool) .await?; Ok(subs) } /// Resume all paused subscriptions for a creator's projects. #[tracing::instrument(skip_all)] pub async fn resume_subscriptions_for_creator( pool: &PgPool, creator_id: UserId, ) -> Result> { let subs = sqlx::query_as::<_, DbSubscription>( r#" UPDATE subscriptions SET paused_at = NULL WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1) AND status = 'active' AND paused_at IS NOT NULL RETURNING * "#, ) .bind(creator_id) .fetch_all(pool) .await?; Ok(subs) } // ── Access control ── /// SQL predicate identifying a `subscriptions` row that currently grants access /// to its scope. The `current_period_end` clause is defense-in-depth against a /// missed/delayed `customer.subscription.deleted` webhook — `status = 'active'` /// alone trusts Stripe to push the cancellation promptly. /// /// What a subscription access check is scoped to. Both arms run the SAME sealed /// predicate inside [`gate`], so a project gate and an item gate cannot diverge. #[derive(Debug, Clone, Copy)] pub enum SubscriptionScope { Project(ProjectId), Item(super::ItemId), } pub use gate::SubscriptionGate; /// Sealed home of the "does a subscription grant access right now" predicate. /// /// The predicate text lives in exactly ONE place — [`SubscriptionGate`]'s /// private `PREDICATE` associated const — and is unreachable from the rest of /// this module, let alone other modules. The only way to learn "this user has /// access" is [`SubscriptionGate::check`] (or [`SubscriptionGate::accessible_item_ids`] /// for the batch shape), each of which runs that predicate. A `SubscriptionGate` /// value is a witness: its field is private and there is no public constructor, /// so access-granting code can neither fabricate one nor hand-write a divergent /// gate. /// /// Payments S1 / CHRONIC 2: the predicate used to be a shareable `&str` const, /// and item gates drifted by dropping the `current_period_end` clause. A const /// is copy-pasteable; a private associated const inside a sealed submodule is /// not. Sealing it here makes the divergence unwritable, not merely discouraged. mod gate { use super::SubscriptionScope; use crate::db::{ItemId, UserId}; use crate::error::Result; use sqlx::PgPool; use std::collections::HashMap; /// Proof that a subscription currently grants access. Constructible ONLY via /// [`SubscriptionGate::check`] — the private `()` field seals the type so no /// other code can mint one. #[derive(Debug, Clone, Copy)] pub struct SubscriptionGate(()); impl SubscriptionGate { /// The single source of truth for "grants access right now". Private to /// this submodule: nothing outside can read it as a string, so it cannot /// be copy-pasted into a divergent query. (Compile-time constant, never /// user input, so the `format!` interpolation is injection-safe; the /// `$N` placeholders stay bound.) const PREDICATE: &'static str = "status = 'active' AND paused_at IS NULL \ AND (current_period_end IS NULL OR current_period_end > NOW())"; /// Does `user_id` hold a subscription that currently grants access to /// `scope`? Returns `Some(gate)` iff so — the sole gate constructor and /// the single entry point for project- and item-level access checks. #[tracing::instrument(skip_all)] pub async fn check( pool: &PgPool, user_id: UserId, scope: SubscriptionScope, ) -> Result> { let exists: bool = match scope { SubscriptionScope::Project(project_id) => { sqlx::query_scalar(&format!( "SELECT EXISTS(SELECT 1 FROM subscriptions \ WHERE subscriber_id = $1 AND project_id = $2 AND {})", Self::PREDICATE )) .bind(user_id) .bind(project_id) .fetch_one(pool) .await? } SubscriptionScope::Item(item_id) => { sqlx::query_scalar(&format!( "SELECT EXISTS(SELECT 1 FROM subscriptions \ WHERE subscriber_id = $1 AND item_id = $2 AND {})", Self::PREDICATE )) .bind(user_id) .bind(item_id) .fetch_one(pool) .await? } }; Ok(exists.then_some(SubscriptionGate(()))) } /// Every item ID `user_id` currently has access to via subscription /// (batch gate). Runs the same sealed predicate as [`check`], so the /// batch path cannot drift from the single-item gate. #[tracing::instrument(skip_all)] pub async fn accessible_item_ids(pool: &PgPool, user_id: UserId) -> Result> { let item_ids: Vec = sqlx::query_scalar(&format!( "SELECT DISTINCT item_id FROM subscriptions \ WHERE subscriber_id = $1 AND item_id IS NOT NULL AND {}", Self::PREDICATE )) .bind(user_id) .fetch_all(pool) .await?; Ok(item_ids) } /// Map of every item `user_id` currently has subscription access to → /// its access proof. The witness-bearing batch shape used by the project /// page, where each item's [`AccessContext`](crate::pricing::AccessContext) /// needs its own gate. Runs the sealed predicate once. #[tracing::instrument(skip_all)] pub async fn subscribed_item_gates( pool: &PgPool, user_id: UserId, ) -> Result> { let ids = Self::accessible_item_ids(pool, user_id).await?; Ok(ids.into_iter().map(|id| (id, SubscriptionGate(()))).collect()) } /// Test-only constructor. Real gates can only be minted by running the /// predicate against the DB; unit tests (e.g. `pricing`) need to /// fabricate the "access granted" state without a database. Gated to /// test builds so production code still cannot forge a witness. #[cfg(test)] pub(crate) fn test_witness() -> Self { SubscriptionGate(()) } } } /// Does `user_id` hold a subscription that currently grants access to `scope`? /// /// Thin boolean wrapper over the sealed [`SubscriptionGate::check`]; prefer /// taking the [`SubscriptionGate`] witness directly where a proof of access is /// useful downstream. #[tracing::instrument(skip_all)] pub async fn has_access( pool: &PgPool, user_id: UserId, scope: SubscriptionScope, ) -> Result { Ok(SubscriptionGate::check(pool, user_id, scope).await?.is_some()) } /// Get user subscriptions joined with project and tier data (for library display). #[tracing::instrument(skip_all)] pub async fn get_user_subscriptions_with_details( pool: &PgPool, user_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, DbUserSubscriptionRow>( "SELECT s.id, s.project_id, p.title AS project_title, p.slug AS project_slug, t.name AS tier_name, t.price_cents, s.status, s.current_period_end, s.stripe_subscription_id FROM subscriptions s JOIN projects p ON p.id = s.project_id JOIN subscription_tiers t ON t.id = s.tier_id WHERE s.subscriber_id = $1 ORDER BY s.created_at DESC LIMIT 1000", ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Get the number of active subscribers to a project (for dashboard display). /// /// NOT an access gate — this is a creator-facing headcount, so it deliberately /// counts `status = 'active'` rows regardless of `current_period_end` (a sub in /// its grace window is still a subscriber). Do not "align" it with /// [`GRANTS_ACCESS_PREDICATE`]; the divergence here is intentional. #[tracing::instrument(skip_all)] pub async fn get_project_subscriber_count( pool: &PgPool, project_id: ProjectId, ) -> Result { let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM subscriptions WHERE project_id = $1 AND status = 'active' AND paused_at IS NULL", ) .bind(project_id) .fetch_one(pool) .await?; Ok(count) } // ── Export ── /// Export all subscribers across a creator's projects. /// /// Returns username, display_name, tier name, subscription status, and when. #[tracing::instrument(skip_all)] pub async fn get_project_subscribers_for_export( pool: &PgPool, user_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, SubscriberExportRow>( r#" SELECT u.username, u.display_name, t.name AS tier_name, s.status, s.created_at FROM subscriptions s JOIN users u ON u.id = s.subscriber_id JOIN subscription_tiers t ON t.id = s.tier_id WHERE s.project_id IN (SELECT id FROM projects WHERE user_id = $1) ORDER BY s.created_at DESC "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Export all subscriptions across a creator's projects with full detail. /// /// Returns project name, tier name, price, subscriber username, status, /// billing period dates, and cancellation date. #[tracing::instrument(skip_all)] pub async fn get_subscriptions_for_export( pool: &PgPool, user_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, SubscriptionExportRow>( r#" SELECT p.title AS project_title, t.name AS tier_name, t.price_cents, u.username, s.status, s.current_period_start, s.current_period_end, s.canceled_at, s.created_at FROM subscriptions s JOIN users u ON u.id = s.subscriber_id JOIN subscription_tiers t ON t.id = s.tier_id JOIN projects p ON p.id = s.project_id WHERE s.project_id IN (SELECT id FROM projects WHERE user_id = $1) ORDER BY s.created_at DESC "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } // ── Event log ── /// Log a subscription webhook event for debugging and idempotency. /// The UNIQUE index on stripe_event_id makes duplicate events a no-op. #[tracing::instrument(skip_all)] pub async fn log_subscription_event( pool: &PgPool, subscription_id: Option, stripe_event_id: &str, event_type: &str, payload: &serde_json::Value, ) -> Result<()> { sqlx::query( r#" INSERT INTO subscription_events (subscription_id, stripe_event_id, event_type, payload) VALUES ($1, $2, $3, $4) ON CONFLICT (stripe_event_id) DO NOTHING "#, ) .bind(subscription_id) .bind(stripe_event_id) .bind(event_type) .bind(payload) .execute(pool) .await?; Ok(()) }