//! Fan+ consumer subscription queries. use sqlx::PgPool; use super::id_types::*; use super::models::DbFanPlusSubscription; use crate::error::Result; /// Create or reactivate a Fan+ subscription record. /// /// Uses ON CONFLICT DO UPDATE on the user_id unique constraint to handle /// both duplicate webhooks and re-subscription after cancellation. #[tracing::instrument(skip_all)] pub async fn create_fan_plus_subscription<'e>( executor: impl sqlx::PgExecutor<'e>, user_id: UserId, stripe_subscription_id: &str, stripe_customer_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbFanPlusSubscription>( r#" INSERT INTO fan_plus_subscriptions (user_id, stripe_subscription_id, stripe_customer_id) VALUES ($1, $2, $3) ON CONFLICT (user_id) DO UPDATE SET stripe_subscription_id = EXCLUDED.stripe_subscription_id, stripe_customer_id = EXCLUDED.stripe_customer_id, status = 'active', canceled_at = NULL RETURNING * "#, ) .bind(user_id) .bind(stripe_subscription_id) .bind(stripe_customer_id) .fetch_optional(executor) .await?; Ok(sub) } /// Look up a Fan+ subscription by its Stripe subscription ID. #[tracing::instrument(skip_all)] pub async fn get_fan_plus_by_stripe_id( pool: &PgPool, stripe_subscription_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbFanPlusSubscription>( "SELECT * FROM fan_plus_subscriptions WHERE stripe_subscription_id = $1", ) .bind(stripe_subscription_id) .fetch_optional(pool) .await?; Ok(sub) } // Apply a Stripe-driven status and/or period update in one guarded statement. // Fan+ was the sibling the Run #11 revival guard missed (Run #12 SERIOUS), and // its period setter then lacked the guard too. Writing status + period together // here closes both: a canceled Fan+ sub can be neither revived nor period- // refreshed by an out-of-order webhook. Reactivation runs through // `create_fan_plus_subscription`'s ON CONFLICT DO UPDATE at checkout, never this // path. See `crate::db::subscription_writer`. crate::db::subscription_writer::define_stripe_subscription_writer!( apply_stripe_update, "fan_plus_subscriptions", DbFanPlusSubscription ); /// Cancel a Fan+ subscription (set status + canceled_at). #[tracing::instrument(skip_all)] pub async fn cancel_fan_plus( pool: &PgPool, stripe_subscription_id: &str, ) -> Result> { let sub = sqlx::query_as::<_, DbFanPlusSubscription>( r#" UPDATE fan_plus_subscriptions SET status = 'canceled', canceled_at = COALESCE(canceled_at, NOW()) WHERE stripe_subscription_id = $1 RETURNING * "#, ) .bind(stripe_subscription_id) .fetch_optional(pool) .await?; Ok(sub) } /// Check whether a user has an active Fan+ subscription. #[tracing::instrument(skip_all)] pub async fn is_fan_plus_active(pool: &PgPool, user_id: UserId) -> Result { let exists = sqlx::query_scalar::<_, bool>( "SELECT EXISTS(SELECT 1 FROM fan_plus_subscriptions WHERE user_id = $1 AND status = 'active')", ) .bind(user_id) .fetch_one(pool) .await?; Ok(exists) } /// Mark a Fan+ subscription as scheduled to cancel at period end (or undo). /// /// Sets the local flag; Stripe is the source of truth and re-asserts it via /// the `customer.subscription.updated` webhook. Called from the dashboard /// Cancel/Resume buttons and from the webhook handler. #[tracing::instrument(skip_all)] pub async fn set_cancel_at_period_end( pool: &PgPool, stripe_subscription_id: &str, cancel: bool, ) -> Result> { let sub = sqlx::query_as::<_, DbFanPlusSubscription>( "UPDATE fan_plus_subscriptions SET cancel_at_period_end = $2 WHERE stripe_subscription_id = $1 RETURNING *", ) .bind(stripe_subscription_id) .bind(cancel) .fetch_optional(pool) .await?; Ok(sub) } /// Get a user's Fan+ subscription (any status). #[tracing::instrument(skip_all)] pub async fn get_fan_plus_by_user( pool: &PgPool, user_id: UserId, ) -> Result> { let sub = sqlx::query_as::<_, DbFanPlusSubscription>( "SELECT * FROM fan_plus_subscriptions WHERE user_id = $1", ) .bind(user_id) .fetch_optional(pool) .await?; Ok(sub) }