//! Creator tier subscription queries and storage enforcement.
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use super::enums::CreatorTier;
use super::id_types::*;
use super::models::{DbCreatorSubscription, StorageBreakdown};
use crate::error::{AppError, Result};
use crate::helpers::format_bytes;
use crate::storage::FileType;
/// Create or reactivate a creator tier subscription record.
///
/// Uses ON CONFLICT DO UPDATE on the user_id unique index to handle
/// both duplicate webhooks and re-subscription after cancellation.
/// Returns `None` if the row already existed with the same stripe_subscription_id
/// (duplicate webhook), `Some` if this was a fresh insert or a re-subscription
/// with a different subscription ID.
#[tracing::instrument(skip_all)]
pub async fn create_creator_subscription<'e>(
executor: impl sqlx::PgExecutor<'e>,
user_id: UserId,
stripe_subscription_id: &str,
stripe_customer_id: &str,
tier: CreatorTier,
) -> Result> {
// Use WHERE clause on the DO UPDATE to only update if the subscription_id
// is different (new subscription) or status is not already active.
// When the WHERE fails, DO UPDATE becomes a no-op and RETURNING yields no row.
let sub = sqlx::query_as::<_, DbCreatorSubscription>(
r#"
INSERT INTO creator_subscriptions (user_id, stripe_subscription_id, stripe_customer_id, tier)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id) DO UPDATE
SET stripe_subscription_id = EXCLUDED.stripe_subscription_id,
stripe_customer_id = EXCLUDED.stripe_customer_id,
tier = EXCLUDED.tier,
status = 'active',
canceled_at = NULL,
grace_enforced_at = NULL
WHERE creator_subscriptions.stripe_subscription_id != EXCLUDED.stripe_subscription_id
OR creator_subscriptions.status != 'active'
RETURNING *
"#,
)
.bind(user_id)
.bind(stripe_subscription_id)
.bind(stripe_customer_id)
.bind(tier)
.fetch_optional(executor)
.await?;
Ok(sub)
}
/// Look up a creator subscription by its Stripe subscription ID.
#[tracing::instrument(skip_all)]
pub async fn get_creator_sub_by_stripe_id(
pool: &PgPool,
stripe_subscription_id: &str,
) -> Result > {
let sub = sqlx::query_as::<_, DbCreatorSubscription>(
"SELECT * FROM creator_subscriptions WHERE stripe_subscription_id = $1",
)
.bind(stripe_subscription_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}
/// Get a user's creator subscription (any status).
#[tracing::instrument(skip_all)]
pub async fn get_creator_sub_by_user(
pool: &PgPool,
user_id: UserId,
) -> Result > {
let sub = sqlx::query_as::<_, DbCreatorSubscription>(
"SELECT * FROM creator_subscriptions WHERE user_id = $1",
)
.bind(user_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}
/// Get the active creator tier for a user (None if no active subscription).
#[tracing::instrument(skip_all)]
pub async fn get_active_creator_tier(
pool: &PgPool,
user_id: UserId,
) -> Result > {
let tier = sqlx::query_scalar::<_, String>(
"SELECT tier FROM creator_subscriptions WHERE user_id = $1 AND status = 'active'",
)
.bind(user_id)
.fetch_optional(pool)
.await?;
Ok(tier.and_then(|t| t.parse().ok()))
}
// Apply a Stripe-driven status and/or period update in one guarded statement.
// `canceled` is terminal; reactivation runs through `create_creator_subscription`'s
// `ON CONFLICT (user_id) DO UPDATE` at checkout, never through this path. Replaces
// the old split status/period setters (the period half lacked the guard). See
// `crate::db::subscription_writer`.
//
// NOTE(review): the generated item is defined entirely by the macro, whose body is
// not visible in this file. The arguments below look like (function name, table
// name, row type) — confirm against the macro definition.
crate::db::subscription_writer::define_stripe_subscription_writer!(
apply_stripe_update,
"creator_subscriptions",
DbCreatorSubscription
);
/// Cancel a creator subscription (set status + canceled_at).
#[tracing::instrument(skip_all)]
pub async fn cancel_creator_sub(
pool: &PgPool,
stripe_subscription_id: &str,
) -> Result > {
let sub = sqlx::query_as::<_, DbCreatorSubscription>(
r#"
UPDATE creator_subscriptions
SET status = 'canceled', canceled_at = COALESCE(canceled_at, NOW())
WHERE stripe_subscription_id = $1
RETURNING *
"#,
)
.bind(stripe_subscription_id)
.fetch_optional(pool)
.await?;
Ok(sub)
}
/// Refresh the denormalized `users.creator_tier` column for one user.
///
/// The column is set to the `tier` of the user's subscription row with
/// `status = 'active'`; when no such row exists the scalar subquery yields
/// no value and the column becomes NULL. Intended to run after
/// checkout/update/cancel so the column follows the subscription.
///
/// Database errors propagate as `Err`.
#[tracing::instrument(skip_all)]
pub async fn sync_user_creator_tier(pool: &PgPool, user_id: UserId) -> Result<()> {
    const SYNC_SQL: &str = r#"
UPDATE users SET creator_tier = (
SELECT tier FROM creator_subscriptions
WHERE user_id = $1 AND status = 'active'
)
WHERE id = $1
"#;

    let statement = sqlx::query(SYNC_SQL).bind(user_id);
    statement.execute(pool).await?;
    Ok(())
}
// ============================================================================
// Storage tracking
// ============================================================================
/// Get the current storage_used_bytes for a user.
#[tracing::instrument(skip_all)]
pub async fn get_storage_used(pool: &PgPool, user_id: UserId) -> Result {
let used: i64 = sqlx::query_scalar(
"SELECT storage_used_bytes FROM users WHERE id = $1",
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(used)
}
/// Build the "storage cap exceeded" error, reading current usage on `executor`
/// for the message. Shared by the cap-checked UPDATE helpers below so the
/// wording (and the usage read) can't drift between them. Returns the error in
/// `Ok`; a failed usage read propagates as `Err`, matching the prior inline code.
async fn storage_cap_exceeded<'e>(
executor: impl sqlx::PgExecutor<'e>,
user_id: UserId,
max_storage_bytes: i64,
) -> Result {
let used: i64 = sqlx::query_scalar("SELECT storage_used_bytes FROM users WHERE id = $1")
.bind(user_id)
.fetch_one(executor)
.await?;
Ok(AppError::BadRequest(format!(
"You've used {} of {} storage. Delete files or upgrade your tier.",
format_bytes(used),
format_bytes(max_storage_bytes),
)))
}
/// Add `bytes` to the user's storage counter, but only if the new total stays
/// within `max_storage_bytes`.
///
/// The cap test and the increment are one `UPDATE`, so the check cannot race
/// with the write. When the statement touches no row, the "storage cap
/// exceeded" error is built from a fresh usage read and returned.
///
/// # Errors
///
/// * The cap error from `storage_cap_exceeded` when no row was updated.
/// * Any database error from either statement.
#[tracing::instrument(skip_all)]
pub async fn try_increment_storage(
    pool: &PgPool,
    user_id: UserId,
    bytes: i64,
    max_storage_bytes: i64,
) -> Result<()> {
    const CAPPED_INCREMENT_SQL: &str =
        "UPDATE users SET storage_used_bytes = storage_used_bytes + $2 \
         WHERE id = $1 AND storage_used_bytes + $2 <= $3";

    let outcome = sqlx::query(CAPPED_INCREMENT_SQL)
        .bind(user_id)
        .bind(bytes)
        .bind(max_storage_bytes)
        .execute(pool)
        .await?;

    match outcome.rows_affected() {
        0 => Err(storage_cap_exceeded(pool, user_id, max_storage_bytes).await?),
        _ => Ok(()),
    }
}
/// Connection-based variant of [`try_increment_storage`].
///
/// Both the cap-checked `UPDATE` and the usage read on the error path run on
/// the supplied connection, so a caller can make the increment part of the
/// same transaction as a follow-up write. Nothing is rolled back here: on a
/// cap miss the function only returns `Err` and leaves the connection's
/// transaction state to the caller.
///
/// # Errors
///
/// * The cap error from `storage_cap_exceeded` when no row was updated.
/// * Any database error from either statement.
#[tracing::instrument(skip_all)]
pub async fn try_increment_storage_on(
    conn: &mut sqlx::PgConnection,
    user_id: UserId,
    bytes: i64,
    max_storage_bytes: i64,
) -> Result<()> {
    const CAPPED_INCREMENT_SQL: &str =
        "UPDATE users SET storage_used_bytes = storage_used_bytes + $2 \
         WHERE id = $1 AND storage_used_bytes + $2 <= $3";

    let outcome = sqlx::query(CAPPED_INCREMENT_SQL)
        .bind(user_id)
        .bind(bytes)
        .bind(max_storage_bytes)
        .execute(&mut *conn)
        .await?;

    match outcome.rows_affected() {
        0 => Err(storage_cap_exceeded(&mut *conn, user_id, max_storage_bytes).await?),
        _ => Ok(()),
    }
}
/// Swap one file's size for another in the user's storage counter with a
/// single cap-checked `UPDATE`.
///
/// The counter becomes `GREATEST(0, used - old_bytes) + new_bytes`, and the
/// row is only written when that value is at most `max_storage_bytes`. Doing
/// the decrement and increment in one statement leaves no window in which
/// only half of the replacement has been applied. Runs on the caller's
/// connection so it can share a transaction with the related row update.
///
/// # Errors
///
/// * The cap error from `storage_cap_exceeded` when no row was updated.
/// * Any database error from either statement.
#[tracing::instrument(skip_all)]
pub async fn try_replace_storage_on(
    conn: &mut sqlx::PgConnection,
    user_id: UserId,
    old_bytes: i64,
    new_bytes: i64,
    max_storage_bytes: i64,
) -> Result<()> {
    const CAPPED_REPLACE_SQL: &str =
        "UPDATE users SET storage_used_bytes = GREATEST(0, storage_used_bytes - $2) + $3 \
         WHERE id = $1 AND GREATEST(0, storage_used_bytes - $2) + $3 <= $4";

    let outcome = sqlx::query(CAPPED_REPLACE_SQL)
        .bind(user_id)
        .bind(old_bytes)
        .bind(new_bytes)
        .bind(max_storage_bytes)
        .execute(&mut *conn)
        .await?;

    match outcome.rows_affected() {
        0 => Err(storage_cap_exceeded(&mut *conn, user_id, max_storage_bytes).await?),
        _ => Ok(()),
    }
}
/// Apply a confirmed upload's storage credit on the caller's connection.
///
/// * `replace_old_size = Some(old)` — the upload replaces an existing file:
///   delegates to [`try_replace_storage_on`], which swaps `old` for
///   `new_bytes` in one statement.
/// * `replace_old_size = None` — a fresh upload: delegates to
///   [`try_increment_storage_on`].
///
/// Gives upload-confirm handlers one entry point instead of each re-deriving
/// the replace-vs-increment branch.
///
/// # Errors
///
/// Whatever the delegated helper returns (cap exceeded or a database error).
#[tracing::instrument(skip_all)]
pub async fn try_apply_storage_on(
    conn: &mut sqlx::PgConnection,
    user_id: UserId,
    replace_old_size: Option<i64>,
    new_bytes: i64,
    max_storage_bytes: i64,
) -> Result<()> {
    if let Some(old_bytes) = replace_old_size {
        return try_replace_storage_on(conn, user_id, old_bytes, new_bytes, max_storage_bytes)
            .await;
    }
    try_increment_storage_on(conn, user_id, new_bytes, max_storage_bytes).await
}
/// Subtract `bytes` from the user's storage counter in a single `UPDATE`.
///
/// The result is clamped with `GREATEST(0, …)`, so the counter never drops
/// below zero. Updating no row (unknown `user_id`) is not treated as an error.
///
/// Database errors propagate as `Err`.
#[tracing::instrument(skip_all)]
pub async fn decrement_storage_used<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    user_id: UserId,
    bytes: i64,
) -> Result<()> {
    const CLAMPED_DECREMENT_SQL: &str =
        "UPDATE users SET storage_used_bytes = GREATEST(0, storage_used_bytes - $2) WHERE id = $1";

    let statement = sqlx::query(CLAMPED_DECREMENT_SQL).bind(user_id).bind(bytes);
    statement.execute(executor).await?;
    Ok(())
}
/// Get the admin-set per-file size override for a user.
#[tracing::instrument(skip_all)]
pub async fn get_max_file_override(pool: &PgPool, user_id: UserId) -> Result> {
let val: Option = sqlx::query_scalar(
"SELECT max_file_override_bytes FROM users WHERE id = $1",
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(val)
}
/// Set or clear the per-file size override in `users.max_file_override_bytes`.
///
/// `Some(n)` stores `n`; `None` writes NULL, clearing the override. Updating
/// no row (unknown `user_id`) is not treated as an error.
///
/// Database errors propagate as `Err`.
#[tracing::instrument(skip_all)]
pub async fn set_max_file_override(
    pool: &PgPool,
    user_id: UserId,
    bytes: Option<i64>,
) -> Result<()> {
    const SET_OVERRIDE_SQL: &str = "UPDATE users SET max_file_override_bytes = $2 WHERE id = $1";

    let statement = sqlx::query(SET_OVERRIDE_SQL).bind(user_id).bind(bytes);
    statement.execute(pool).await?;
    Ok(())
}
/// Get the grandfathering deadline for a user.
#[tracing::instrument(skip_all)]
pub async fn get_grandfathered_until(
pool: &PgPool,
user_id: UserId,
) -> Result>> {
let val: Option> = sqlx::query_scalar(
"SELECT grandfathered_until FROM users WHERE id = $1",
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(val)
}
/// Get a per-category storage breakdown for the creator dashboard (single query).
#[tracing::instrument(skip_all)]
pub async fn get_storage_breakdown(pool: &PgPool, user_id: UserId) -> Result {
let row: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(
r#"
WITH audio_bytes AS (
SELECT COALESCE(SUM(i.audio_file_size_bytes)::BIGINT, 0) AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = $1 AND i.audio_file_size_bytes IS NOT NULL
),
cover_bytes AS (
SELECT COALESCE(SUM(i.cover_file_size_bytes)::BIGINT, 0) AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = $1 AND i.cover_file_size_bytes IS NOT NULL
),
version_bytes AS (
SELECT COALESCE(SUM(v.file_size_bytes)::BIGINT, 0) AS total
FROM versions v
JOIN items i ON v.item_id = i.id
JOIN projects p ON i.project_id = p.id
WHERE p.user_id = $1 AND v.file_size_bytes IS NOT NULL
),
insertion_bytes AS (
SELECT COALESCE(SUM(file_size)::BIGINT, 0) AS total
FROM content_insertions WHERE user_id = $1
),
video_bytes AS (
SELECT COALESCE(SUM(i.video_file_size_bytes)::BIGINT, 0) AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = $1 AND i.video_file_size_bytes IS NOT NULL
),
media_bytes AS (
SELECT COALESCE(SUM(file_size_bytes)::BIGINT, 0) AS total
FROM media_files WHERE user_id = $1
)
SELECT
(SELECT total FROM audio_bytes),
(SELECT total FROM cover_bytes),
(SELECT total FROM version_bytes),
(SELECT total FROM insertion_bytes),
(SELECT total FROM video_bytes),
(SELECT total FROM media_bytes)
"#,
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(StorageBreakdown {
audio_bytes: row.0,
cover_bytes: row.1,
download_bytes: row.2,
insertion_bytes: row.3,
video_bytes: row.4,
media_bytes: row.5,
total_bytes: row.0 + row.1 + row.2 + row.3 + row.4 + row.5,
})
}
/// Get user IDs of creators with canceled subscriptions 30+ days ago
/// whose items have not yet been hidden.
#[tracing::instrument(skip_all)]
pub async fn get_expired_grace_creators(pool: &PgPool) -> Result> {
let ids: Vec = sqlx::query_scalar(
r#"
SELECT user_id FROM creator_subscriptions
WHERE status = 'canceled'
AND canceled_at IS NOT NULL
AND canceled_at < NOW() - INTERVAL '30 days'
AND grace_enforced_at IS NULL
"#,
)
.fetch_all(pool)
.await?;
Ok(ids)
}
/// Record that post-grace enforcement has been applied for this user by
/// stamping `creator_subscriptions.grace_enforced_at` with `NOW()`.
///
/// Updating no row (no subscription for `user_id`) is not treated as an error.
/// Database errors propagate as `Err`.
#[tracing::instrument(skip_all)]
pub async fn mark_grace_enforced(pool: &PgPool, user_id: UserId) -> Result<()> {
    const MARK_ENFORCED_SQL: &str =
        "UPDATE creator_subscriptions SET grace_enforced_at = NOW() WHERE user_id = $1";

    let statement = sqlx::query(MARK_ENFORCED_SQL).bind(user_id);
    statement.execute(pool).await?;
    Ok(())
}
/// Count fully-paying creators — `status = 'active'` only.
///
/// Excludes trialing (free trial), past_due (payment failed but not yet
/// canceled), canceled-in-grace (winding down), and incomplete states.
/// This is the number that goes on the runway disclosure as "paying
/// creators today": revenue-bearing seats, no fudge.
#[tracing::instrument(skip_all)]
pub async fn count_active_paying(pool: &PgPool) -> Result {
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM creator_subscriptions WHERE status = 'active'",
)
.fetch_one(pool)
.await?;
Ok(count.0)
}
/// Count creators in a trial or 30-day cancellation grace period.
///
/// These are not revenue-bearing today but represent the near-term
/// pipeline: trialing seats may convert, grace seats may resubscribe
/// before enforcement. Disclosed as a secondary number on the runway
/// surface so the headline `count_active_paying` stays strict.
#[tracing::instrument(skip_all)]
pub async fn count_trialing_or_grace(pool: &PgPool) -> Result {
let count: (i64,) = sqlx::query_as(
r#"
SELECT COUNT(*) FROM creator_subscriptions
WHERE status = 'trialing'
OR (
status = 'canceled'
AND canceled_at IS NOT NULL
AND canceled_at > NOW() - INTERVAL '30 days'
AND grace_enforced_at IS NULL
)
"#,
)
.fetch_one(pool)
.await?;
Ok(count.0)
}
/// Check whether a user is in the 30-day cancellation grace period.
///
/// Returns `true` if the subscription is canceled but within 30 days of cancellation
/// and enforcement has not yet been applied.
#[tracing::instrument(skip_all)]
pub async fn is_in_grace_period(pool: &PgPool, user_id: UserId) -> Result {
let in_grace: bool = sqlx::query_scalar(
r#"
SELECT EXISTS(
SELECT 1 FROM creator_subscriptions
WHERE user_id = $1
AND status = 'canceled'
AND canceled_at IS NOT NULL
AND canceled_at > NOW() - INTERVAL '30 days'
AND grace_enforced_at IS NULL
)
"#,
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(in_grace)
}
/// Batch-recalculate storage_used_bytes for ALL creators in a single query.
///
/// Uses the same CTE logic as `recalculate_storage_used` but operates on all
/// creator users at once, avoiding the N+1 loop. Returns the number of rows updated.
#[tracing::instrument(skip_all)]
pub async fn recalculate_all_storage_batch(pool: &PgPool) -> Result {
let result = sqlx::query(
r#"
UPDATE users SET storage_used_bytes = totals.total
FROM (
SELECT u.id AS user_id,
COALESCE(audio.total, 0)
+ COALESCE(cover.total, 0)
+ COALESCE(video.total, 0)
+ COALESCE(versions.total, 0)
+ COALESCE(insertions.total, 0)
+ COALESCE(media.total, 0) AS total
FROM users u
LEFT JOIN LATERAL (
SELECT SUM(i.audio_file_size_bytes)::BIGINT AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = u.id AND i.audio_file_size_bytes IS NOT NULL
) audio ON true
LEFT JOIN LATERAL (
SELECT SUM(i.cover_file_size_bytes)::BIGINT AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = u.id AND i.cover_file_size_bytes IS NOT NULL
) cover ON true
LEFT JOIN LATERAL (
SELECT SUM(i.video_file_size_bytes)::BIGINT AS total
FROM items i JOIN projects p ON i.project_id = p.id
WHERE p.user_id = u.id AND i.video_file_size_bytes IS NOT NULL
) video ON true
LEFT JOIN LATERAL (
SELECT SUM(v.file_size_bytes)::BIGINT AS total
FROM versions v JOIN items i ON v.item_id = i.id JOIN projects p ON i.project_id = p.id
WHERE p.user_id = u.id AND v.file_size_bytes IS NOT NULL
) versions ON true
LEFT JOIN LATERAL (
SELECT SUM(ci.file_size)::BIGINT AS total
FROM content_insertions ci WHERE ci.user_id = u.id
) insertions ON true
LEFT JOIN LATERAL (
SELECT SUM(mf.file_size_bytes)::BIGINT AS total
FROM media_files mf WHERE mf.user_id = u.id
) media ON true
WHERE u.can_create_projects = true
) totals
WHERE users.id = totals.user_id AND users.storage_used_bytes IS DISTINCT FROM totals.total
"#,
)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
// ============================================================================
// Enforcement
// ============================================================================
/// Check whether a file upload is allowed based on the user's tier, storage,
/// and grandfathering status. Returns the tier's `max_storage_bytes` on success
/// (for use with `try_increment_storage`), or an appropriate `AppError` if rejected.
///
/// Covers and media images bypass tier checks but respect their size limits
/// enforced separately in the storage module.
#[tracing::instrument(skip_all)]
pub async fn check_upload_allowed(
pool: &PgPool,
user_id: UserId,
file_type: FileType,
file_size_bytes: i64,
) -> Result {
// Covers and media images bypass per-file tier checks but still respect
// the storage cap. Look up the active tier (fallback to Basic cap).
if file_type == FileType::Cover || file_type == FileType::MediaImage {
let active_tier = get_active_creator_tier(pool, user_id).await?;
let max_storage = active_tier
.map(|t| t.max_storage_bytes())
.unwrap_or_else(|| CreatorTier::Basic.max_storage_bytes());
return Ok(max_storage);
}
// Resolve effective tier
let active_tier = get_active_creator_tier(pool, user_id).await?;
let grandfathered = get_grandfathered_until(pool, user_id).await?;
let effective_tier = match active_tier {
Some(tier) => Some(tier),
None => {
// Check grandfathering
if let Some(until) = grandfathered {
if Utc::now() < until {
Some(CreatorTier::SmallFiles) // grandfathered as SmallFiles-equivalent
} else {
None
}
} else {
None
}
}
};
// Grace period check (canceled sub, within 30 days)
// Use effective_tier so grandfathered users aren't blocked
if effective_tier.is_none() {
let in_grace = is_in_grace_period(pool, user_id).await?;
if in_grace {
return Err(AppError::BadRequest(
"Your creator subscription has been canceled. Re-subscribe to upload files.".to_string(),
));
}
}
// No tier and not grandfathered → reject
let tier = match effective_tier {
Some(t) => t,
None => {
return Err(AppError::BadRequest(
"A creator tier subscription is required to upload files.".to_string(),
));
}
};
// Basic tier is text-only (no non-cover uploads)
if !tier.allows_file_uploads() {
return Err(AppError::BadRequest(
"Basic tier is text-only. Upgrade to Small Files or higher to upload files.".to_string(),
));
}
// Per-file size check
let max_override = get_max_file_override(pool, user_id).await?;
let max_file = max_override.unwrap_or(tier.max_file_bytes());
if file_size_bytes > max_file {
return Err(AppError::FileTooLarge(format!(
"File size ({}) exceeds the {} per-file limit of {}.",
format_bytes(file_size_bytes),
tier.label(),
format_bytes(max_file),
)));
}
// Storage cap pre-check (non-atomic fast-fail; the atomic enforcement
// happens in try_increment_storage after scanning completes)
let used = get_storage_used(pool, user_id).await?;
let max_storage = tier.max_storage_bytes();
if used + file_size_bytes > max_storage {
return Err(AppError::BadRequest(format!(
"You've used {} of {} storage. Delete files or upgrade your tier.",
format_bytes(used),
format_bytes(max_storage),
)));
}
Ok(max_storage)
}
/// Early presign-time check: reject if the user has no tier or is already at/over
/// their storage cap. This prevents generating presigned URLs that would always
/// fail at confirm time. Does NOT check file size (unknown at presign).
#[tracing::instrument(skip_all)]
pub async fn check_presign_allowed(
pool: &PgPool,
user_id: UserId,
file_type: FileType,
) -> Result<()> {
// Covers and media images bypass tier checks
if file_type == FileType::Cover || file_type == FileType::MediaImage {
return Ok(());
}
let active_tier = get_active_creator_tier(pool, user_id).await?;
let grandfathered = get_grandfathered_until(pool, user_id).await?;
let effective_tier = match active_tier {
Some(tier) => Some(tier),
None => {
if let Some(until) = grandfathered {
if Utc::now() < until {
Some(CreatorTier::SmallFiles)
} else {
None
}
} else {
None
}
}
};
if effective_tier.is_none() {
let in_grace = is_in_grace_period(pool, user_id).await?;
if in_grace {
return Err(AppError::BadRequest(
"Your creator subscription has been canceled. Re-subscribe to upload files.".to_string(),
));
}
return Err(AppError::BadRequest(
"A creator tier subscription is required to upload files.".to_string(),
));
}
let tier = effective_tier.expect("guarded by is_none check above");
if !tier.allows_file_uploads() {
return Err(AppError::BadRequest(
"Basic tier is text-only. Upgrade to Small Files or higher to upload files.".to_string(),
));
}
// Reject if already at/over storage cap
let used = get_storage_used(pool, user_id).await?;
let max_storage = tier.max_storage_bytes();
if used >= max_storage {
return Err(AppError::BadRequest(format!(
"You've used {} of {} storage. Delete files or upgrade your tier.",
format_bytes(used),
format_bytes(max_storage),
)));
}
Ok(())
}
/// Return the effective per-file size limit in bytes for this user, accounting
/// for their active tier and any admin override. Returns `None` for file types
/// that bypass tier checks (covers, media images).
#[tracing::instrument(skip_all)]
pub async fn get_effective_max_file_bytes(
pool: &PgPool,
user_id: UserId,
file_type: FileType,
) -> Result> {
if file_type == FileType::Cover || file_type == FileType::MediaImage {
return Ok(None);
}
let active_tier = get_active_creator_tier(pool, user_id).await?;
let grandfathered = get_grandfathered_until(pool, user_id).await?;
let effective_tier = match active_tier {
Some(tier) => tier,
None => {
if let Some(until) = grandfathered {
if Utc::now() < until {
CreatorTier::SmallFiles
} else {
return Ok(Some(file_type.max_size()));
}
} else {
return Ok(Some(file_type.max_size()));
}
}
};
let max_override = get_max_file_override(pool, user_id).await?;
let tier_limit = max_override.unwrap_or(effective_tier.max_file_bytes()) as u64;
Ok(Some(std::cmp::min(tier_limit, file_type.max_size())))
}
/// Get total known file sizes for a user (versions + content insertions).
/// Used by the account deletion form to show how much data will be removed.
#[tracing::instrument(skip_all)]
pub async fn get_user_content_size(pool: &PgPool, user_id: UserId) -> Result {
let version_size: i64 = sqlx::query_scalar(
r#"
SELECT COALESCE(SUM(v.file_size_bytes)::BIGINT, 0)
FROM versions v
JOIN items i ON v.item_id = i.id
JOIN projects p ON i.project_id = p.id
WHERE p.user_id = $1 AND v.s3_key IS NOT NULL
"#,
)
.bind(user_id)
.fetch_one(pool)
.await?;
let insertion_size: i64 = sqlx::query_scalar(
"SELECT COALESCE(SUM(file_size)::BIGINT, 0) FROM content_insertions WHERE user_id = $1",
)
.bind(user_id)
.fetch_one(pool)
.await?;
Ok(version_size + insertion_size)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Binary size units used to spell out the expected tier limits.
    const MIB: i64 = 1024 * 1024;
    const GIB: i64 = 1024 * MIB;

    /// All four tiers in the order the monotonicity tests walk them.
    const TIER_LADDER: [CreatorTier; 4] = [
        CreatorTier::Basic,
        CreatorTier::SmallFiles,
        CreatorTier::BigFiles,
        CreatorTier::Everything,
    ];

    // ── CreatorTier::label ───────────────────────────────────────────────

    #[test]
    fn label_basic() {
        let label = CreatorTier::Basic.label();
        assert_eq!(label, "Basic");
    }

    #[test]
    fn label_small_files() {
        let label = CreatorTier::SmallFiles.label();
        assert_eq!(label, "Small Files");
    }

    #[test]
    fn label_big_files() {
        let label = CreatorTier::BigFiles.label();
        assert_eq!(label, "Big Files");
    }

    #[test]
    fn label_everything() {
        let label = CreatorTier::Everything.label();
        assert_eq!(label, "Everything");
    }

    // ── CreatorTier::price_cents ────────────────────────────────────────

    #[test]
    fn price_basic_is_sixteen_dollars() {
        let cents = CreatorTier::Basic.price_cents();
        assert_eq!(cents, 1600);
    }

    #[test]
    fn price_small_files_is_twenty_four_dollars() {
        let cents = CreatorTier::SmallFiles.price_cents();
        assert_eq!(cents, 2400);
    }

    #[test]
    fn price_big_files_is_thirty_six_dollars() {
        let cents = CreatorTier::BigFiles.price_cents();
        assert_eq!(cents, 3600);
    }

    #[test]
    fn price_everything_is_sixty_dollars() {
        let cents = CreatorTier::Everything.price_cents();
        assert_eq!(cents, 6000);
    }

    #[test]
    fn prices_are_strictly_increasing() {
        for step in TIER_LADDER.windows(2) {
            let (cheaper, pricier) = (&step[0], &step[1]);
            assert!(
                cheaper.price_cents() < pricier.price_cents(),
                "{:?} should cost less than {:?}",
                cheaper,
                pricier,
            );
        }
    }

    // ── CreatorTier::max_file_bytes ─────────────────────────────────────

    #[test]
    fn max_file_basic_is_10mb() {
        assert_eq!(CreatorTier::Basic.max_file_bytes(), 10 * MIB);
    }

    #[test]
    fn max_file_small_files_is_500mb() {
        assert_eq!(CreatorTier::SmallFiles.max_file_bytes(), 500 * MIB);
    }

    #[test]
    fn max_file_big_files_is_20gb() {
        assert_eq!(CreatorTier::BigFiles.max_file_bytes(), 20 * GIB);
    }

    #[test]
    fn max_file_everything_matches_big_files() {
        let everything = CreatorTier::Everything.max_file_bytes();
        let big_files = CreatorTier::BigFiles.max_file_bytes();
        assert_eq!(everything, big_files);
    }

    #[test]
    fn max_file_bytes_non_decreasing() {
        for step in TIER_LADDER.windows(2) {
            let (lower, higher) = (&step[0], &step[1]);
            assert!(
                lower.max_file_bytes() <= higher.max_file_bytes(),
                "{:?} file limit should not exceed {:?}",
                lower,
                higher,
            );
        }
    }

    // ── CreatorTier::max_storage_bytes ───────────────────────────────────

    #[test]
    fn max_storage_basic_is_50gb() {
        assert_eq!(CreatorTier::Basic.max_storage_bytes(), 50 * GIB);
    }

    #[test]
    fn max_storage_small_files_is_250gb() {
        assert_eq!(CreatorTier::SmallFiles.max_storage_bytes(), 250 * GIB);
    }

    #[test]
    fn max_storage_big_files_is_500gb() {
        assert_eq!(CreatorTier::BigFiles.max_storage_bytes(), 500 * GIB);
    }

    #[test]
    fn max_storage_everything_matches_big_files() {
        let everything = CreatorTier::Everything.max_storage_bytes();
        let big_files = CreatorTier::BigFiles.max_storage_bytes();
        assert_eq!(everything, big_files);
    }

    #[test]
    fn max_storage_non_decreasing() {
        for step in TIER_LADDER.windows(2) {
            let (lower, higher) = (&step[0], &step[1]);
            assert!(
                lower.max_storage_bytes() <= higher.max_storage_bytes(),
                "{:?} storage limit should not exceed {:?}",
                lower,
                higher,
            );
        }
    }

    // ── CreatorTier::allows_file_uploads ─────────────────────────────────

    #[test]
    fn basic_tier_disallows_file_uploads() {
        let allowed = CreatorTier::Basic.allows_file_uploads();
        assert!(!allowed);
    }

    #[test]
    fn small_files_allows_file_uploads() {
        let allowed = CreatorTier::SmallFiles.allows_file_uploads();
        assert!(allowed);
    }

    #[test]
    fn big_files_allows_file_uploads() {
        let allowed = CreatorTier::BigFiles.allows_file_uploads();
        assert!(allowed);
    }

    #[test]
    fn everything_allows_file_uploads() {
        let allowed = CreatorTier::Everything.allows_file_uploads();
        assert!(allowed);
    }

    // ── format_bytes helper ─────────────────────────────────────────────

    #[test]
    fn format_bytes_zero() {
        assert_eq!(format_bytes(0), "0 B");
    }

    #[test]
    fn format_bytes_one_byte() {
        assert_eq!(format_bytes(1), "1 B");
    }

    #[test]
    fn format_bytes_below_kb() {
        assert_eq!(format_bytes(1023), "1023 B");
    }

    #[test]
    fn format_bytes_exactly_1kb() {
        assert_eq!(format_bytes(1024), "1.0 KB");
    }

    #[test]
    fn format_bytes_exactly_1mb() {
        assert_eq!(format_bytes(MIB), "1.0 MB");
    }

    #[test]
    fn format_bytes_exactly_1gb() {
        assert_eq!(format_bytes(GIB), "1.0 GB");
    }

    #[test]
    fn format_bytes_negative_clamped_to_zero() {
        assert_eq!(format_bytes(-999), "0 B");
    }

    #[test]
    fn format_bytes_large_storage_cap() {
        // 500 GB (Everything tier cap)
        assert_eq!(format_bytes(500 * GIB), "500.0 GB");
    }

    // ── StorageBreakdown ────────────────────────────────────────────────

    #[test]
    fn storage_breakdown_default_is_all_zeros() {
        let empty = StorageBreakdown::default();
        assert_eq!(empty.audio_bytes, 0);
        assert_eq!(empty.cover_bytes, 0);
        assert_eq!(empty.download_bytes, 0);
        assert_eq!(empty.insertion_bytes, 0);
        assert_eq!(empty.video_bytes, 0);
        assert_eq!(empty.media_bytes, 0);
        assert_eq!(empty.total_bytes, 0);
    }

    #[test]
    fn storage_breakdown_total_is_sum_of_categories() {
        let breakdown = StorageBreakdown {
            audio_bytes: 100,
            cover_bytes: 200,
            download_bytes: 300,
            insertion_bytes: 400,
            video_bytes: 500,
            media_bytes: 600,
            total_bytes: 100 + 200 + 300 + 400 + 500 + 600,
        };
        let category_sum = breakdown.audio_bytes
            + breakdown.cover_bytes
            + breakdown.download_bytes
            + breakdown.insertion_bytes
            + breakdown.video_bytes
            + breakdown.media_bytes;
        assert_eq!(breakdown.total_bytes, category_sum);
    }

    #[test]
    fn storage_breakdown_single_category() {
        let audio_only = StorageBreakdown {
            audio_bytes: 1_000_000,
            total_bytes: 1_000_000,
            ..Default::default()
        };
        assert_eq!(audio_only.total_bytes, 1_000_000);
        assert_eq!(audio_only.cover_bytes, 0);
    }

    // ── Boundary / cross-cutting ────────────────────────────────────────

    #[test]
    fn basic_file_limit_less_than_storage_limit() {
        let per_file = CreatorTier::Basic.max_file_bytes();
        let per_account = CreatorTier::Basic.max_storage_bytes();
        assert!(per_file < per_account);
    }

    #[test]
    fn every_tier_file_limit_within_storage_limit() {
        for tier in TIER_LADDER {
            assert!(
                tier.max_file_bytes() <= tier.max_storage_bytes(),
                "{:?} file limit exceeds its own storage limit",
                tier,
            );
        }
    }
}