//! User account CRUD, profile updates, and lookup queries. use sqlx::PgPool; use super::enums::AppealDecision; use super::models::*; use super::validated_types::{Email, Username}; use super::UserId; use crate::error::Result; /// Insert a new user and return the created row. #[tracing::instrument(skip_all)] pub async fn create_user( pool: &PgPool, username: &Username, email: &Email, password_hash: &str, ) -> Result { let user = sqlx::query_as::<_, DbUser>( r#" INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING * "#, ) .bind(username) .bind(email) .bind(password_hash) .fetch_one(pool) .await?; Ok(user) } /// Fetch a user by primary key. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_user_by_id(pool: &PgPool, id: UserId) -> Result> { let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE id = $1") .bind(id) .fetch_optional(pool) .await?; Ok(user) } /// Fetch multiple users by ID in a single query. #[tracing::instrument(skip_all)] pub async fn get_users_by_ids(pool: &PgPool, ids: &[UserId]) -> Result> { let users = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE id = ANY($1)") .bind(ids) .fetch_all(pool) .await?; Ok(users) } /// Fetch a user by username. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_user_by_username(pool: &PgPool, username: &Username) -> Result> { let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE username = $1") .bind(username) .fetch_optional(pool) .await?; Ok(user) } /// Fetch a user by email address. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_user_by_email(pool: &PgPool, email: &Email) -> Result> { let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE email = $1") .bind(email) .fetch_optional(pool) .await?; Ok(user) } /// Update a user's display name and/or bio (COALESCE keeps existing values when `None`). #[tracing::instrument(skip_all)] pub async fn update_user_profile( pool: &PgPool, id: UserId, display_name: Option<&str>, bio: Option<&str>, ) -> Result { let user = sqlx::query_as::<_, DbUser>( r#" UPDATE users SET display_name = COALESCE($2, display_name), bio = COALESCE($3, bio) WHERE id = $1 RETURNING * "#, ) .bind(id) .bind(display_name) .bind(bio) .fetch_one(pool) .await?; Ok(user) } /// Replace a user's password hash and invalidate outstanding JWTs. #[tracing::instrument(skip_all)] pub async fn update_user_password(pool: &PgPool, id: UserId, password_hash: &str) -> Result<()> { sqlx::query("UPDATE users SET password_hash = $2, jwt_invalidated_at = NOW() WHERE id = $1") .bind(id) .bind(password_hash) .execute(pool) .await?; Ok(()) } /// Increment the user's feed key version, revoking their current personal-feed /// URL. Returns the new version (folded into the next URL's HMAC). #[tracing::instrument(skip_all)] pub async fn bump_feed_key_version(pool: &PgPool, id: UserId) -> Result { let (version,): (i32,) = sqlx::query_as( "UPDATE users SET feed_key_version = feed_key_version + 1, updated_at = NOW() \ WHERE id = $1 RETURNING feed_key_version", ) .bind(id) .fetch_one(pool) .await?; Ok(version) } /// Self-deactivate an account (enter limbo state). /// /// Bumps `jwt_invalidated_at` so any outstanding SyncKit JWTs minted from /// this account stop authenticating immediately. #[tracing::instrument(skip_all)] pub async fn deactivate_user(pool: &PgPool, id: UserId) -> Result<()> { sqlx::query( "UPDATE users SET deactivated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1", ) .bind(id) .execute(pool) .await?; Ok(()) } /// Reactivate a self-deactivated account. #[tracing::instrument(skip_all)] pub async fn reactivate_user(pool: &PgPool, id: UserId) -> Result<()> { sqlx::query( "UPDATE users SET deactivated_at = NULL, updated_at = NOW() WHERE id = $1", ) .bind(id) .execute(pool) .await?; Ok(()) } /// Admin: permanently terminate an account (enforcement ladder step 4). /// The user has 30 days to export data. After that, the scheduler deletes the account. /// The account must already be suspended. #[tracing::instrument(skip_all)] pub async fn terminate_user(pool: &PgPool, id: UserId) -> Result<()> { sqlx::query( "UPDATE users SET terminated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1", ) .bind(id) .execute(pool) .await?; Ok(()) } /// Get user IDs of terminated accounts whose 30-day export window has expired. #[tracing::instrument(skip_all)] pub async fn get_expired_terminated_ids(pool: &PgPool) -> Result> { let ids: Vec = sqlx::query_scalar( r#" SELECT id FROM users WHERE terminated_at IS NOT NULL AND terminated_at < NOW() - INTERVAL '30 days' ORDER BY terminated_at LIMIT 1000 "#, ) .fetch_all(pool) .await?; Ok(ids) } /// Permanently delete a user by ID. #[tracing::instrument(skip_all)] pub async fn delete_user(pool: &PgPool, id: UserId) -> Result<()> { sqlx::query("DELETE FROM users WHERE id = $1") .bind(id) .execute(pool) .await?; Ok(()) } /// Check whether this creator has any completed sales (transactions where they were the seller). #[tracing::instrument(skip_all)] pub async fn has_completed_sales(pool: &PgPool, id: UserId) -> Result { let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM transactions WHERE seller_id = $1 AND status = 'completed'", ) .bind(id) .fetch_one(pool) .await?; Ok(count > 0) } /// Schedule content removal 90 days from now. The user row is hidden from public /// views but items remain accessible to buyers who previously purchased them. /// After 90 days the scheduler deletes S3 objects and the user row. #[tracing::instrument(skip_all)] pub async fn schedule_content_removal(pool: &PgPool, id: UserId) -> Result<()> { sqlx::query( r#" UPDATE users SET content_removal_at = NOW() + INTERVAL '90 days', deactivated_at = NOW(), updated_at = NOW() WHERE id = $1 "#, ) .bind(id) .execute(pool) .await?; Ok(()) } /// Get user IDs whose 90-day content removal grace period has expired. #[tracing::instrument(skip_all)] pub async fn get_expired_content_removal_ids(pool: &PgPool) -> Result> { let ids: Vec = sqlx::query_scalar( r#" SELECT id FROM users WHERE content_removal_at IS NOT NULL AND content_removal_at < NOW() ORDER BY content_removal_at LIMIT 1000 "#, ) .fetch_all(pool) .await?; Ok(ids) } /// Create an ephemeral sandbox user. Returns the created row. /// /// The user gets `can_create_projects = true`, `email_verified = true`, /// a SmallFiles creator tier, and a tight storage cap. The row is /// automatically cleaned up by the scheduler after `sandbox_expires_at`. #[tracing::instrument(skip_all)] pub async fn create_sandbox_user( pool: &PgPool, username: &Username, email: &Email, password_hash: &str, expiry_secs: i64, ) -> Result { let user = sqlx::query_as::<_, DbUser>( r#" INSERT INTO users ( username, email, password_hash, is_sandbox, sandbox_expires_at, can_create_projects, email_verified, creator_tier ) VALUES ( $1, $2, $3, TRUE, NOW() + make_interval(secs => $4::float8), TRUE, TRUE, 'small_files' ) RETURNING * "#, ) .bind(username) .bind(email) .bind(password_hash) .bind(expiry_secs as f64) .fetch_one(pool) .await?; Ok(user) } /// Return IDs of sandbox users whose expiry has passed. #[tracing::instrument(skip_all)] pub async fn get_expired_sandbox_ids(pool: &PgPool) -> Result> { // Per-tick LIMIT bounds the supervisor's input list (the scheduler re-ticks // and the WHERE re-excludes already-deleted rows, so the remainder is picked // up next tick). Run #12 INFO — keeps a pathological mass-expiry from loading // an unbounded id vec even though concurrency is already capped at 4. let ids = sqlx::query_scalar::<_, UserId>( "SELECT id FROM users WHERE is_sandbox = TRUE AND sandbox_expires_at < NOW() \ ORDER BY sandbox_expires_at LIMIT 1000", ) .fetch_all(pool) .await?; Ok(ids) } /// Count active (non-expired) sandbox accounts created from a given IP. /// Used to enforce the per-IP concurrent sandbox cap. #[tracing::instrument(skip_all)] pub async fn count_active_sandboxes_by_ip(pool: &PgPool, ip: &str) -> Result { let count: i64 = sqlx::query_scalar( r#" SELECT COUNT(*) FROM users u JOIN user_sessions us ON us.user_id = u.id WHERE u.is_sandbox = TRUE AND u.sandbox_expires_at > NOW() AND us.ip_address = $1 "#, ) .bind(ip) .fetch_one(pool) .await?; Ok(count) } /// Update user's Stripe Connect account information after OAuth #[tracing::instrument(skip_all)] pub async fn update_user_stripe_account( pool: &PgPool, user_id: UserId, stripe_account_id: &str, onboarding_complete: bool, payouts_enabled: bool, charges_enabled: bool, ) -> Result { let user = sqlx::query_as::<_, DbUser>( r#" UPDATE users SET stripe_account_id = $2, stripe_onboarding_complete = $3, stripe_payouts_enabled = $4, stripe_charges_enabled = $5, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(user_id) .bind(stripe_account_id) .bind(onboarding_complete) .bind(payouts_enabled) .bind(charges_enabled) .fetch_one(pool) .await?; Ok(user) } /// Atomically set a user's Stripe Connect account ID, but only if one is not /// already set. Returns `Some(user)` on success, or `None` if another request /// already claimed the slot (race-condition guard). #[tracing::instrument(skip_all)] pub async fn try_set_stripe_account( pool: &PgPool, user_id: UserId, stripe_account_id: &str, ) -> Result> { let user = sqlx::query_as::<_, DbUser>( r#" UPDATE users SET stripe_account_id = $2, stripe_onboarding_complete = false, stripe_payouts_enabled = false, stripe_charges_enabled = false, updated_at = NOW() WHERE id = $1 AND (stripe_account_id IS NULL OR stripe_account_id = '') RETURNING * "#, ) .bind(user_id) .bind(stripe_account_id) .fetch_optional(pool) .await?; Ok(user) } /// Update user's Stripe status from webhook (finds user by stripe_account_id) #[tracing::instrument(skip_all)] pub async fn update_user_stripe_status( pool: &PgPool, stripe_account_id: &str, onboarding_complete: bool, payouts_enabled: bool, charges_enabled: bool, ) -> Result> { let user = sqlx::query_as::<_, DbUser>( r#" UPDATE users SET stripe_onboarding_complete = $2, stripe_payouts_enabled = $3, stripe_charges_enabled = $4, updated_at = NOW() WHERE stripe_account_id = $1 RETURNING * "#, ) .bind(stripe_account_id) .bind(onboarding_complete) .bind(payouts_enabled) .bind(charges_enabled) .fetch_optional(pool) .await?; Ok(user) } /// Mark a user's email as verified #[tracing::instrument(skip_all)] pub async fn verify_user_email(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query( r#" UPDATE users SET email_verified = true, email_verification_token = NULL, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .execute(pool) .await?; Ok(()) } // ── Suspension / Appeals ── /// Suspend a user account, clearing any prior appeal fields. /// /// Bumps `jwt_invalidated_at` so SyncKit JWTs minted for this user expire /// at the next extractor check (subject to `SESSION_TOUCH_CACHE_SECS`). #[tracing::instrument(skip_all)] pub async fn suspend_user(pool: &PgPool, user_id: UserId, reason: &str) -> Result<()> { sqlx::query( r#" UPDATE users SET suspended_at = NOW(), suspension_reason = $2, jwt_invalidated_at = NOW(), appeal_text = NULL, appeal_submitted_at = NULL, appeal_decision = NULL, appeal_response = NULL, appeal_decided_at = NULL, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .bind(reason) .execute(pool) .await?; Ok(()) } /// Remove suspension and clear all suspension/appeal fields. #[tracing::instrument(skip_all)] pub async fn unsuspend_user(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query( r#" UPDATE users SET suspended_at = NULL, suspension_reason = NULL, appeal_text = NULL, appeal_submitted_at = NULL, appeal_decision = NULL, appeal_response = NULL, appeal_decided_at = NULL, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .execute(pool) .await?; Ok(()) } // ── Creator pause (voluntary) ── /// Set the creator_paused_at timestamp (voluntary pause). #[tracing::instrument(skip_all)] pub async fn pause_creator(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query( "UPDATE users SET creator_paused_at = NOW(), updated_at = NOW() WHERE id = $1", ) .bind(user_id) .execute(pool) .await?; Ok(()) } /// Clear the creator_paused_at timestamp (resume from voluntary pause). #[tracing::instrument(skip_all)] pub async fn unpause_creator(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query( "UPDATE users SET creator_paused_at = NULL, updated_at = NOW() WHERE id = $1", ) .bind(user_id) .execute(pool) .await?; Ok(()) } /// Submit an appeal for a suspended account, clearing any prior decision. #[tracing::instrument(skip_all)] pub async fn submit_appeal(pool: &PgPool, user_id: UserId, appeal_text: &str) -> Result<()> { sqlx::query( r#" UPDATE users SET appeal_text = $2, appeal_submitted_at = NOW(), appeal_decision = NULL, appeal_response = NULL, appeal_decided_at = NULL, updated_at = NOW() WHERE id = $1 AND suspended_at IS NOT NULL "#, ) .bind(user_id) .bind(appeal_text) .execute(pool) .await?; Ok(()) } /// Resolve an appeal. If approved, also clears suspension. #[tracing::instrument(skip_all)] pub async fn resolve_appeal( pool: &PgPool, user_id: UserId, decision: AppealDecision, response: &str, ) -> Result<()> { if decision == AppealDecision::Approved { // Approve: clear suspension entirely sqlx::query( r#" UPDATE users SET appeal_decision = $2, appeal_response = $3, appeal_decided_at = NOW(), suspended_at = NULL, suspension_reason = NULL, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .bind(decision) .bind(response) .execute(pool) .await?; } else { // Deny: keep suspension, record decision sqlx::query( r#" UPDATE users SET appeal_decision = $2, appeal_response = $3, appeal_decided_at = NOW(), updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .bind(decision) .bind(response) .execute(pool) .await?; } Ok(()) } /// Admin query: users with a pending appeal (submitted but not yet decided). #[tracing::instrument(skip_all)] pub async fn get_pending_appeals(pool: &PgPool) -> Result> { let users = sqlx::query_as::<_, DbUser>( r#" SELECT * FROM users WHERE appeal_submitted_at IS NOT NULL AND appeal_decided_at IS NULL ORDER BY appeal_submitted_at ASC LIMIT 500 "#, ) .fetch_all(pool) .await?; Ok(users) } /// Admin query: all users, optionally filtered by suspension status, with pagination. #[tracing::instrument(skip_all)] pub async fn get_all_users( pool: &PgPool, filter: Option<&str>, limit: i64, offset: i64, ) -> Result> { let limit = limit.min(200); let users = match filter { Some("suspended") => { sqlx::query_as::<_, DbUser>( "SELECT * FROM users WHERE suspended_at IS NOT NULL ORDER BY suspended_at DESC LIMIT $1 OFFSET $2", ) .bind(limit) .bind(offset) .fetch_all(pool) .await? } Some("active") => { sqlx::query_as::<_, DbUser>( "SELECT * FROM users WHERE suspended_at IS NULL ORDER BY created_at DESC LIMIT $1 OFFSET $2", ) .bind(limit) .bind(offset) .fetch_all(pool) .await? } _ => { sqlx::query_as::<_, DbUser>( "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2", ) .bind(limit) .bind(offset) .fetch_all(pool) .await? } }; Ok(users) } /// Count users matching a filter (for pagination totals). #[tracing::instrument(skip_all)] pub async fn count_users(pool: &PgPool, filter: Option<&str>) -> Result { let count = match filter { Some("suspended") => { sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM users WHERE suspended_at IS NOT NULL", ) .fetch_one(pool) .await? } Some("active") => { sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM users WHERE suspended_at IS NULL", ) .fetch_one(pool) .await? } _ => { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM users") .fetch_one(pool) .await? } }; Ok(count) } /// Count total and suspended users in a single query. #[tracing::instrument(skip_all)] pub async fn count_users_summary(pool: &PgPool) -> Result<(i64, i64)> { let (total, suspended): (i64, i64) = sqlx::query_as( r#" SELECT COUNT(*), COUNT(*) FILTER (WHERE suspended_at IS NOT NULL) FROM users "#, ) .fetch_one(pool) .await?; Ok((total, suspended)) } /// Get all user emails for bulk notifications (e.g. shutdown notice). #[tracing::instrument(skip_all)] pub async fn get_all_user_emails(pool: &PgPool) -> Result)>> { let rows = sqlx::query_as::<_, (String, Option)>( "SELECT email, display_name FROM users ORDER BY created_at ASC", ) .fetch_all(pool) .await?; Ok(rows) } /// Update a user's email notification preferences. #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] pub async fn update_notification_preferences( pool: &PgPool, id: UserId, notify_sale: bool, notify_follower: bool, notify_release: bool, login_notification_enabled: bool, notify_issues: bool, notify_status: bool, ) -> Result<()> { sqlx::query( r#" UPDATE users SET notify_sale = $2, notify_follower = $3, notify_release = $4, login_notification_enabled = $5, notify_issues = $6, notify_status = $7, updated_at = NOW() WHERE id = $1 "#, ) .bind(id) .bind(notify_sale) .bind(notify_follower) .bind(notify_release) .bind(login_notification_enabled) .bind(notify_issues) .bind(notify_status) .execute(pool) .await?; Ok(()) } /// Update a user's tip preferences (tips_enabled toggle and notification). #[tracing::instrument(skip_all)] pub async fn update_tip_preferences( pool: &PgPool, id: UserId, tips_enabled: bool, notify_tip: bool, ) -> Result<()> { sqlx::query( r#" UPDATE users SET tips_enabled = $2, notify_tip = $3, updated_at = NOW() WHERE id = $1 "#, ) .bind(id) .bind(tips_enabled) .bind(notify_tip) .execute(pool) .await?; Ok(()) } /// Disable a single notification preference by column name. /// /// Used by the email unsubscribe handler. Only accepts known column names /// to prevent SQL injection. #[tracing::instrument(skip_all)] pub async fn disable_notification(pool: &PgPool, user_id: UserId, preference: &str) -> Result { let sql = match preference { "notify_sale" => "UPDATE users SET notify_sale = false, updated_at = NOW() WHERE id = $1", "notify_follower" => "UPDATE users SET notify_follower = false, updated_at = NOW() WHERE id = $1", "notify_release" => "UPDATE users SET notify_release = false, updated_at = NOW() WHERE id = $1", "login_notification_enabled" => "UPDATE users SET login_notification_enabled = false, updated_at = NOW() WHERE id = $1", "notify_issues" => "UPDATE users SET notify_issues = false, updated_at = NOW() WHERE id = $1", "notify_tip" => "UPDATE users SET notify_tip = false, updated_at = NOW() WHERE id = $1", "notify_status" => "UPDATE users SET notify_status = false, updated_at = NOW() WHERE id = $1", _ => return Ok(false), }; let result = sqlx::query(sql).bind(user_id).execute(pool).await?; Ok(result.rows_affected() > 0) } /// A user who opted into platform status notifications. #[derive(sqlx::FromRow)] pub struct StatusAlertSubscriber { pub id: UserId, pub email: Email, pub display_name: Option, } /// Get all users who opted into platform status notifications. /// /// Hard cap at 10k rows so the monitor's status-change fan-out can't unbox /// an unbounded query into RAM. The 100ms pacing in `monitor.rs` already /// limits fan-out throughput to ~600/minute — anything past 10k would /// chew through Postmark rate limits anyway. If we ever hit the cap a /// WARN fires so we know to switch to a paged dispatch model. #[tracing::instrument(skip_all)] pub async fn get_status_alert_subscribers(pool: &PgPool) -> Result> { const STATUS_SUBSCRIBER_CAP: i64 = 10_000; let rows = sqlx::query_as::<_, StatusAlertSubscriber>( "SELECT id, email, display_name FROM users \ WHERE notify_status = true AND deactivated_at IS NULL \ ORDER BY id LIMIT $1", ) .bind(STATUS_SUBSCRIBER_CAP) .fetch_all(pool) .await?; if rows.len() as i64 == STATUS_SUBSCRIBER_CAP { tracing::warn!( cap = STATUS_SUBSCRIBER_CAP, "get_status_alert_subscribers hit hard cap; promote to paged dispatch" ); } Ok(rows) } /// Atomically check-and-set broadcast timestamp. Returns false if already sent within 24 hours. #[tracing::instrument(skip_all)] pub async fn try_set_broadcast_at(pool: &PgPool, user_id: UserId) -> Result { let result = sqlx::query( r#" UPDATE users SET last_broadcast_at = NOW() WHERE id = $1 AND (last_broadcast_at IS NULL OR last_broadcast_at < NOW() - INTERVAL '24 hours') "#, ) .bind(user_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Release the 24h broadcast slot. Used when a broadcast is refused after the /// slot has already been claimed (e.g. recipient cap exceeded) so the creator /// can retry without waiting a day. #[tracing::instrument(skip_all)] pub async fn clear_broadcast_at(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query("UPDATE users SET last_broadcast_at = NULL WHERE id = $1") .bind(user_id) .execute(pool) .await?; Ok(()) } // ── Upload Trust ── /// Check if a user is trusted for uploads (bypasses review queue). #[tracing::instrument(skip_all)] pub async fn is_upload_trusted(pool: &PgPool, user_id: UserId) -> Result { let trusted = sqlx::query_scalar::<_, bool>( "SELECT upload_trusted FROM users WHERE id = $1", ) .bind(user_id) .fetch_one(pool) .await?; Ok(trusted) } /// Set a user's upload trust status. #[tracing::instrument(skip_all)] pub async fn set_upload_trusted(pool: &PgPool, user_id: UserId, trusted: bool) -> Result<()> { sqlx::query( r#" UPDATE users SET upload_trusted = $2, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .bind(trusted) .execute(pool) .await?; Ok(()) } // ── Onboarding email drip ── /// Users who need the next onboarding email. Returns users at a given step /// whose last email was sent more than `min_age` ago (or never). #[tracing::instrument(skip_all)] pub async fn get_onboarding_candidates( pool: &PgPool, step: i16, min_age: chrono::Duration, ) -> Result> { let cutoff = chrono::Utc::now() - min_age; // Per-tick LIMIT bounds the scheduler's input list. The caller advances // each returned user's step (so the WHERE re-excludes them), meaning the // remainder is drained on the next tick — same re-tick pattern as the // sandbox/terminated/content-removal cleanup queries. Run #14 MEDIUM: a // signup surge must not load an unbounded user vec into the lock-held tick. let users = sqlx::query_as::<_, DbUser>( "SELECT * FROM users WHERE onboarding_email_step = $1 AND (onboarding_email_sent_at IS NULL OR onboarding_email_sent_at < $2) AND suspended_at IS NULL ORDER BY onboarding_email_sent_at ASC NULLS FIRST LIMIT 1000", ) .bind(step) .bind(cutoff) .fetch_all(pool) .await?; Ok(users) } /// Advance a user's onboarding email step and record the send time. #[tracing::instrument(skip_all)] pub async fn advance_onboarding_step(pool: &PgPool, user_id: UserId, new_step: i16) -> Result<()> { sqlx::query( "UPDATE users SET onboarding_email_step = $2, onboarding_email_sent_at = NOW() WHERE id = $1", ) .bind(user_id) .bind(new_step) .execute(pool) .await?; Ok(()) } /// Advance onboarding step for multiple users in a single query. #[tracing::instrument(skip_all)] pub async fn batch_advance_onboarding_step( pool: &PgPool, user_ids: &[UserId], new_step: i16, ) -> Result<()> { sqlx::query( "UPDATE users SET onboarding_email_step = $2, onboarding_email_sent_at = NOW() WHERE id = ANY($1)", ) .bind(user_ids) .bind(new_step) .execute(pool) .await?; Ok(()) } /// Mark a user as a founder. Called when they start a creator-tier /// subscription while the founder pricing window is open. Sticky; never /// reset, even on cancellation. Subsequent re-subscriptions during the /// window keep their founder status. After the window closes, eligibility /// is determined by `founder_locked_at` (stamped only for users with an /// active subscription at the close-time snapshot). /// /// **DIY exclusion**: DIY-tier accounts are not full members and must not /// qualify for founder pricing (`project_founder_pricing.md` § decision 5). /// Only call this from creator-tier (Basic/SmallFiles/BigFiles/Everything) /// checkout paths. When DIY ships, its checkout path must NOT invoke this. #[tracing::instrument(skip_all)] pub async fn mark_user_as_founder(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query( r#" UPDATE users SET is_founder = TRUE, updated_at = NOW() WHERE id = $1 AND is_founder = FALSE "#, ) .bind(user_id) .execute(pool) .await?; Ok(()) } /// Close the founder pricing window by stamping `founder_locked_at` on every /// user who is currently flagged `is_founder` AND has an active creator-tier /// subscription. Returns the number of users locked in. Idempotent: skips /// any user already locked. Intended to be called once from an admin tool /// at the moment the founder window closes. #[tracing::instrument(skip_all)] pub async fn lock_in_founders_with_active_subscriptions(pool: &PgPool) -> Result { let result = sqlx::query( r#" UPDATE users u SET founder_locked_at = NOW(), updated_at = NOW() WHERE u.is_founder = TRUE AND u.founder_locked_at IS NULL AND EXISTS ( SELECT 1 FROM creator_subscriptions s WHERE s.user_id = u.id AND s.status = 'active' ) "#, ) .execute(pool) .await?; Ok(result.rows_affected()) } /// Update a user's Stripe Tax toggle. #[tracing::instrument(skip_all)] pub async fn update_stripe_tax_enabled(pool: &PgPool, user_id: UserId, enabled: bool) -> Result<()> { sqlx::query( r#" UPDATE users SET stripe_tax_enabled = $2, updated_at = NOW() WHERE id = $1 "#, ) .bind(user_id) .bind(enabled) .execute(pool) .await?; Ok(()) } /// Disconnect a user's Stripe account #[tracing::instrument(skip_all)] pub async fn disconnect_user_stripe(pool: &PgPool, user_id: UserId) -> Result { let user = sqlx::query_as::<_, DbUser>( r#" UPDATE users SET stripe_account_id = NULL, stripe_onboarding_complete = false, stripe_payouts_enabled = false, stripe_charges_enabled = false, updated_at = NOW() WHERE id = $1 RETURNING * "#, ) .bind(user_id) .fetch_one(pool) .await?; Ok(user) } /// Fetch the current cache generation for a user (cheap, indexed lookup). #[tracing::instrument(skip_all)] pub async fn get_cache_generation(pool: &PgPool, user_id: UserId) -> Result { let generation = sqlx::query_scalar::<_, i64>( "SELECT cache_generation FROM users WHERE id = $1", ) .bind(user_id) .fetch_one(pool) .await?; Ok(generation) } /// Atomically increment the user's cache generation counter. /// Call after any write that changes user-visible dashboard data. #[tracing::instrument(skip_all)] pub async fn bump_cache_generation(pool: &PgPool, user_id: UserId) -> Result<()> { sqlx::query("UPDATE users SET cache_generation = cache_generation + 1 WHERE id = $1") .bind(user_id) .execute(pool) .await?; Ok(()) } /// Look up a verified user by email (case-insensitive). /// Returns the user ID if a verified account exists with that email. #[tracing::instrument(skip_all)] pub async fn get_verified_user_id_by_email(pool: &PgPool, email: &Email) -> Result> { let id = sqlx::query_scalar( "SELECT id FROM users WHERE LOWER(email) = LOWER($1) AND email_verified = true", ) .bind(email) .fetch_optional(pool) .await?; Ok(id) }