//! Fan+ consumer subscription queries.
use sqlx::PgPool;
use super::id_types::*;
use super::models::DbFanPlusSubscription;
use crate::error::Result;
/// Create or reactivate a Fan+ subscription record.
///
/// Uses ON CONFLICT DO UPDATE on the user_id unique constraint to handle
/// both duplicate webhooks and re-subscription after cancellation.
#[tracing::instrument(skip_all)]
pub async fn create_fan_plus_subscription<'e>(
executor: impl sqlx::PgExecutor<'e>,
user_id: UserId,
stripe_subscription_id: &str,
stripe_customer_id: &str,
) -> Result> {
let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
r#"
INSERT INTO fan_plus_subscriptions (user_id, stripe_subscription_id, stripe_customer_id)
VALUES ($1, $2, $3)
ON CONFLICT (user_id) DO UPDATE
SET stripe_subscription_id = EXCLUDED.stripe_subscription_id,
stripe_customer_id = EXCLUDED.stripe_customer_id,
status = 'active',
canceled_at = NULL
RETURNING *
"#,
)
.bind(user_id)
.bind(stripe_subscription_id)
.bind(stripe_customer_id)
.fetch_optional(executor)
.await?;
Ok(sub)
}
/// Look up a Fan+ subscription by its Stripe subscription ID.
#[tracing::instrument(skip_all)]
pub async fn get_fan_plus_by_stripe_id(
pool: &PgPool,
stripe_subscription_id: &str,
) -> Result > {
let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
"SELECT * FROM fan_plus_subscriptions WHERE stripe_subscription_id = $1",
)
.bind(stripe_subscription_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}
// Apply a Stripe-driven status and/or period update in one guarded statement.
// Fan+ was the sibling the Run #11 revival guard missed (Run #12 SERIOUS), and
// its period setter then lacked the guard too. Writing status + period together
// here closes both: a canceled Fan+ sub can be neither revived nor period-
// refreshed by an out-of-order webhook. Reactivation runs through
// `create_fan_plus_subscription`'s ON CONFLICT DO UPDATE at checkout, never this
// path. See `crate::db::subscription_writer`.
crate::db::subscription_writer::define_stripe_subscription_writer!(
apply_stripe_update,
"fan_plus_subscriptions",
DbFanPlusSubscription
);
/// Cancel a Fan+ subscription (set status + canceled_at).
#[tracing::instrument(skip_all)]
pub async fn cancel_fan_plus(
pool: &PgPool,
stripe_subscription_id: &str,
) -> Result > {
let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
r#"
UPDATE fan_plus_subscriptions
SET status = 'canceled', canceled_at = COALESCE(canceled_at, NOW())
WHERE stripe_subscription_id = $1
RETURNING *
"#,
)
.bind(stripe_subscription_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}
/// Check whether a user has an active Fan+ subscription.
#[tracing::instrument(skip_all)]
pub async fn is_fan_plus_active(pool: &PgPool, user_id: UserId) -> Result {
let exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM fan_plus_subscriptions WHERE user_id = $1 AND status = 'active')",
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(exists)
}
/// Mark a Fan+ subscription as scheduled to cancel at period end (or undo).
///
/// Sets the local flag; Stripe is the source of truth and re-asserts it via
/// the `customer.subscription.updated` webhook. Called from the dashboard
/// Cancel/Resume buttons and from the webhook handler.
#[tracing::instrument(skip_all)]
pub async fn set_cancel_at_period_end(
pool: &PgPool,
stripe_subscription_id: &str,
cancel: bool,
) -> Result> {
let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
"UPDATE fan_plus_subscriptions
SET cancel_at_period_end = $2
WHERE stripe_subscription_id = $1
RETURNING *",
)
.bind(stripe_subscription_id)
.bind(cancel)
.fetch_optional(pool)
.await?;
Ok(sub)
}
/// Get a user's Fan+ subscription (any status).
#[tracing::instrument(skip_all)]
pub async fn get_fan_plus_by_user(
pool: &PgPool,
user_id: UserId,
) -> Result > {
let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
"SELECT * FROM fan_plus_subscriptions WHERE user_id = $1",
)
.bind(user_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}