//! Transaction queries: checkout tracking, purchase history, and free claims. use chrono::{DateTime, Utc}; use sqlx::PgPool; use super::models::*; use super::validated_types::KeyCode; use super::{Cents, ClaimToken, DownloadToken, ItemId, ProjectId, PromoCodeId, TransactionId, UserId}; use crate::error::Result; /// Parameters for creating a pending Stripe checkout transaction. pub struct CreateTransactionParams<'a> { pub buyer_id: Option, pub seller_id: UserId, /// `None` for project-level purchases (no specific item). pub item_id: Option, pub amount_cents: Cents, pub platform_fee_cents: Cents, pub stripe_checkout_session_id: &'a str, pub item_title: &'a str, pub seller_username: &'a str, pub share_contact: bool, /// Set for project-level purchases; `None` for item purchases. pub project_id: Option, /// Promo code used for this checkout (for releasing reservations on cleanup). pub promo_code_id: Option, /// Guest buyer's email (set for guest checkouts, None for logged-in). pub guest_email: Option<&'a str>, } /// Common parameters for claiming a free item (direct, discount code, or download code). pub struct ClaimParams<'a> { pub buyer_id: UserId, pub item_id: ItemId, pub seller_id: UserId, pub item_title: &'a str, pub seller_username: &'a str, pub share_contact: bool, /// If this claim was granted via a bundle purchase, the parent transaction ID. pub parent_transaction_id: Option, } /// Record a new pending transaction for a Stripe checkout session. 
#[tracing::instrument(skip_all)]
pub async fn create_transaction<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    params: &CreateTransactionParams<'_>,
) -> Result<DbTransaction> {
    // Bind order must match the column list / placeholder numbering below.
    let tx = sqlx::query_as::<_, DbTransaction>(
        r#"
        INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents,
                                  stripe_checkout_session_id, item_title, seller_username,
                                  share_contact, project_id, promo_code_id, guest_email)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
        RETURNING *
        "#,
    )
    .bind(params.buyer_id)
    .bind(params.seller_id)
    .bind(params.item_id)
    .bind(params.amount_cents)
    .bind(params.platform_fee_cents)
    .bind(params.stripe_checkout_session_id)
    .bind(params.item_title)
    .bind(params.seller_username)
    .bind(params.share_contact)
    .bind(params.project_id)
    .bind(params.promo_code_id)
    .bind(params.guest_email)
    .fetch_one(executor)
    .await?;

    Ok(tx)
}

/// Complete a guest transaction: set guest_email, generate claim_token, and
/// optionally auto-attach to an existing user if the email matches.
///
/// This function issues a single `UPDATE` on whatever executor it is given; it
/// does not open a transaction itself. NOTE(review): to prevent races between
/// auto-attach and concurrent signup/email-verification for the same email,
/// callers presumably pass a transaction here — confirm at the call sites.
///
/// When `existing_user_id` is `Some`, the row's `buyer_id` is set to it and no
/// claim token is stored; otherwise a freshly generated claim token is stored.
/// Returns `None` when no pending row matches the checkout session.
#[tracing::instrument(skip_all)]
pub async fn complete_guest_transaction<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    stripe_checkout_session_id: &str,
    stripe_payment_intent_id: &str,
    guest_email: &str,
    existing_user_id: Option<UserId>,
) -> Result<Option<DbTransaction>> {
    let claim_token = ClaimToken::new();

    // `status = 'pending'` in the WHERE clause makes a repeated call a no-op.
    let tx = sqlx::query_as::<_, DbTransaction>(
        r#"
        UPDATE transactions
        SET status = 'completed',
            stripe_payment_intent_id = $2,
            completed_at = NOW(),
            guest_email = $3,
            claim_token = CASE WHEN $4::UUID IS NOT NULL THEN NULL ELSE $5 END,
            buyer_id = $4
        WHERE stripe_checkout_session_id = $1 AND status = 'pending'
        RETURNING *
        "#,
    )
    .bind(stripe_checkout_session_id)
    .bind(stripe_payment_intent_id)
    .bind(guest_email)
    .bind(existing_user_id)
    .bind(claim_token)
    .fetch_optional(executor)
    .await?;

    Ok(tx)
}

/// Attach all unclaimed guest purchases for an email to a user account.
/// Called during signup/email verification to auto-claim prior guest purchases.
///
/// The email match is case-insensitive. Returns the number of transactions
/// that were attached.
#[tracing::instrument(skip_all)]
pub async fn attach_guest_purchases_by_email(
    pool: &PgPool,
    email: &str,
    user_id: UserId,
) -> Result<u64> {
    let result = sqlx::query(
        r#"
        UPDATE transactions
        SET buyer_id = $1, claimed_by = $1, claim_token = NULL
        WHERE LOWER(guest_email) = LOWER($2)
          AND buyer_id IS NULL
          AND status = 'completed'
        "#,
    )
    .bind(user_id)
    .bind(email)
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}

/// Claim a single guest purchase by claim token.
///
/// Returns the updated transaction, or `None` if no completed, still-unowned
/// transaction carries this token. The token is cleared on success.
#[tracing::instrument(skip_all)]
pub async fn claim_guest_purchase(
    pool: &PgPool,
    claim_token: ClaimToken,
    user_id: UserId,
) -> Result<Option<DbTransaction>> {
    let tx = sqlx::query_as::<_, DbTransaction>(
        r#"
        UPDATE transactions
        SET buyer_id = $2, claimed_by = $2, claim_token = NULL
        WHERE claim_token = $1
          AND buyer_id IS NULL
          AND status = 'completed'
        RETURNING *
        "#,
    )
    .bind(claim_token)
    .bind(user_id)
    .fetch_optional(pool)
    .await?;

    Ok(tx)
}

/// Look up a completed transaction by download token (for guest download links).
#[tracing::instrument(skip_all)]
pub async fn get_transaction_by_download_token(
    pool: &PgPool,
    download_token: DownloadToken,
) -> Result<Option<DbTransaction>> {
    let tx = sqlx::query_as::<_, DbTransaction>(
        "SELECT * FROM transactions WHERE download_token = $1 AND status = 'completed'",
    )
    .bind(download_token)
    .fetch_optional(pool)
    .await?;

    Ok(tx)
}

/// Mark a pending transaction as completed (idempotent; returns `None` if already completed).
///
/// Accepts any sqlx executor (`&PgPool`, `&mut Transaction`, etc.) so callers
/// can include this in a larger transaction when needed.
#[tracing::instrument(skip_all)] pub async fn complete_transaction<'e>( executor: impl sqlx::PgExecutor<'e>, stripe_checkout_session_id: &str, stripe_payment_intent_id: &str, ) -> Result> { // Only update if status is 'pending' for idempotency // Returns None if transaction was already completed (duplicate webhook) let tx = sqlx::query_as::<_, DbTransaction>( r#" UPDATE transactions SET status = 'completed', stripe_payment_intent_id = $2, completed_at = NOW() WHERE stripe_checkout_session_id = $1 AND status = 'pending' RETURNING * "#, ) .bind(stripe_checkout_session_id) .bind(stripe_payment_intent_id) .fetch_optional(executor) .await?; Ok(tx) } /// Complete ALL pending transactions for a cart checkout session. /// Returns the list of completed transactions (empty if already processed). #[tracing::instrument(skip_all)] pub async fn complete_cart_transactions<'e>( executor: impl sqlx::PgExecutor<'e>, stripe_checkout_session_id: &str, stripe_payment_intent_id: &str, ) -> Result> { let txs = sqlx::query_as::<_, DbTransaction>( r#" UPDATE transactions SET status = 'completed', stripe_payment_intent_id = $2, completed_at = NOW() WHERE stripe_checkout_session_id = $1 AND status = 'pending' RETURNING * "#, ) .bind(stripe_checkout_session_id) .bind(stripe_payment_intent_id) .fetch_all(executor) .await?; Ok(txs) } /// List transactions where the user is the buyer, newest first. /// /// Pass `limit: None` for all rows (exports), or `Some(n)` for dashboard display. #[tracing::instrument(skip_all)] pub async fn get_transactions_by_buyer( pool: &PgPool, buyer_id: UserId, limit: Option, ) -> Result> { let txs = sqlx::query_as::<_, DbTransaction>( "SELECT * FROM transactions WHERE buyer_id = $1 ORDER BY created_at DESC LIMIT $2", ) .bind(buyer_id) .bind(limit) .fetch_all(pool) .await?; Ok(txs) } /// List transactions where the user is the seller, newest first. /// /// Pass `limit: None` for all rows (exports), or `Some(n)` for dashboard display. 
#[tracing::instrument(skip_all)] pub async fn get_transactions_by_seller( pool: &PgPool, seller_id: UserId, limit: Option, ) -> Result> { let txs = sqlx::query_as::<_, DbTransaction>( "SELECT * FROM transactions WHERE seller_id = $1 ORDER BY created_at DESC LIMIT $2", ) .bind(seller_id) .bind(limit) .fetch_all(pool) .await?; Ok(txs) } /// Check whether a user has a completed purchase for a given item. #[tracing::instrument(skip_all)] pub async fn has_purchased_item(pool: &PgPool, user_id: UserId, item_id: ItemId) -> Result { let count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM transactions WHERE buyer_id = $1 AND item_id = $2 AND status = 'completed'", ) .bind(user_id) .bind(item_id) .fetch_one(pool) .await?; Ok(count > 0) } /// Bulk variant of `has_purchased_item`. Returns the subset of `item_ids` that /// the buyer has a completed purchase for. Single DB roundtrip vs. N calls. #[tracing::instrument(skip_all)] pub async fn purchased_subset( pool: &PgPool, user_id: UserId, item_ids: &[ItemId], ) -> Result> { if item_ids.is_empty() { return Ok(std::collections::HashSet::new()); } let rows: Vec<(ItemId,)> = sqlx::query_as( "SELECT DISTINCT item_id FROM transactions WHERE buyer_id = $1 AND status = 'completed' AND item_id = ANY($2)", ) .bind(user_id) .bind(item_ids) .fetch_all(pool) .await?; Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Get all item IDs that a user has purchased (for batch access checks) #[tracing::instrument(skip_all)] pub async fn get_user_purchased_item_ids(pool: &PgPool, user_id: UserId) -> Result> { let item_ids: Vec = sqlx::query_scalar( "SELECT DISTINCT item_id FROM transactions WHERE buyer_id = $1 AND status = 'completed'", ) .bind(user_id) .fetch_all(pool) .await?; Ok(item_ids) } /// Claims a free item by creating a zero-cost completed transaction. /// Returns true if claimed successfully, false if already in library. 
///
/// Uses `ON CONFLICT DO NOTHING` against the partial unique index on
/// `(buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL` to prevent duplicate
/// claims under concurrent requests.
#[tracing::instrument(skip_all)]
pub async fn claim_free_item<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    params: &ClaimParams<'_>,
) -> Result<bool> {
    // Free claims have no Stripe session; a synthetic, deterministic ID fills
    // the `stripe_checkout_session_id` column instead.
    let claim_id = format!("free-claim-{}-{}", params.buyer_id, params.item_id);

    let result = sqlx::query(
        r#"
        INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents,
                                  stripe_checkout_session_id, status, completed_at, item_title,
                                  seller_username, share_contact, parent_transaction_id)
        VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, $7, $8)
        ON CONFLICT (buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL DO NOTHING
        "#,
    )
    .bind(params.buyer_id)
    .bind(params.seller_id)
    .bind(params.item_id)
    .bind(&claim_id)
    .bind(params.item_title)
    .bind(params.seller_username)
    .bind(params.share_contact)
    .bind(params.parent_transaction_id)
    .execute(executor)
    .await?;

    // Zero rows affected means the conflict clause fired: already in library.
    Ok(result.rows_affected() > 0)
}

/// Optional parameters for generating a license key inside a claim transaction.
pub struct LicenseKeyParams<'a> {
    pub key_code: &'a KeyCode,
    /// Activation cap stored with the key; `None` is stored as NULL.
    pub max_activations: Option<i32>,
}

/// Atomically claim a free item and increment the promo code's use count.
///
/// Claims the item FIRST (INSERT transaction), then increments use_count.
/// If the user already owns the item (rows_affected == 0), rolls back without
/// consuming the code. If the code limit is reached, rolls back the claim too.
///
/// When `license_key_params` is `Some`, a license key is created inside the
/// same transaction so that the claim and key are always consistent.
///
/// Returns `(code_accepted, item_claimed)`:
/// - `code_accepted = false` → promo code hit its usage limit (nothing changed)
/// - `item_claimed = false` → user already owns the item (code was NOT consumed)
#[tracing::instrument(skip_all)]
pub async fn claim_free_with_promo_code(
    pool: &PgPool,
    promo_code_id: PromoCodeId,
    params: &ClaimParams<'_>,
    license_key_params: Option<&LicenseKeyParams<'_>>,
) -> Result<(bool, bool)> {
    const INSERT_LICENSE_KEY_SQL: &str = r#"
        INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
        VALUES ($1, $2, NULL, $3, $4)
        "#;

    let mut tx = pool.begin().await?;

    // Step 1: Attempt to claim the item first
    let claim_id = format!("free-claim-{}-{}", params.buyer_id, params.item_id);
    let result = sqlx::query(
        r#"
        INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents,
                                  stripe_checkout_session_id, status, completed_at, item_title,
                                  seller_username, share_contact, promo_code_id)
        VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, $7, $8)
        ON CONFLICT (buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL DO NOTHING
        "#,
    )
    .bind(params.buyer_id)
    .bind(params.seller_id)
    .bind(params.item_id)
    .bind(&claim_id)
    .bind(params.item_title)
    .bind(params.seller_username)
    .bind(params.share_contact)
    .bind(promo_code_id)
    .execute(&mut *tx)
    .await?;

    let claimed = result.rows_affected() > 0;

    // Step 2: If the user already owns the item, rollback without consuming the code
    if !claimed {
        tx.rollback().await?;
        return Ok((true, false));
    }

    // Step 3: Increment the promo code use count
    let code_result = sqlx::query(
        "UPDATE promo_codes SET use_count = use_count + 1 WHERE id = $1 AND (max_uses IS NULL OR use_count < max_uses)",
    )
    .bind(promo_code_id)
    .execute(&mut *tx)
    .await?;

    // Step 4: If the code limit was reached, rollback the claim too
    if code_result.rows_affected() == 0 {
        tx.rollback().await?;
        return Ok((false, false));
    }

    crate::db::items::increment_sales_count(&mut *tx, params.item_id).await?;

    // Step 5: Create license key inside the same transaction if requested.
    // Retry once on a unique-violation: the wordlist generator has ~6B-coin-
    // flip headroom, so an actual collision is vanishingly rare, but the
    // alternative is surfacing a 500 to a buyer mid-claim — cheap to handle.
    //
    // The first attempt runs under a SAVEPOINT. Postgres marks the whole
    // transaction as aborted after any failed statement, so without rolling
    // back to a savepoint the retry (and the final COMMIT) could never succeed.
    if let Some(lk) = license_key_params {
        sqlx::query("SAVEPOINT license_key_insert")
            .execute(&mut *tx)
            .await?;

        let attempt = sqlx::query(INSERT_LICENSE_KEY_SQL)
            .bind(params.item_id)
            .bind(params.buyer_id)
            .bind(lk.key_code)
            .bind(lk.max_activations)
            .execute(&mut *tx)
            .await;

        match attempt {
            Ok(_) => {
                sqlx::query("RELEASE SAVEPOINT license_key_insert")
                    .execute(&mut *tx)
                    .await?;
            }
            // 23505 = unique_violation
            Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {
                // Undo only the failed INSERT; the claim and use_count bump stay intact.
                sqlx::query("ROLLBACK TO SAVEPOINT license_key_insert")
                    .execute(&mut *tx)
                    .await?;

                let retry_code = crate::helpers::generate_key_code();
                tracing::warn!(item_id = %params.item_id, "license key 23505 collision; retrying once");
                sqlx::query(INSERT_LICENSE_KEY_SQL)
                    .bind(params.item_id)
                    .bind(params.buyer_id)
                    .bind(&retry_code)
                    .bind(lk.max_activations)
                    .execute(&mut *tx)
                    .await?;
            }
            // Any other failure propagates; dropping `tx` rolls everything back.
            Err(other) => return Err(other.into()),
        }
    }

    tx.commit().await?;
    Ok((true, true))
}

// ── Project purchases ──

/// Check whether a user has a completed purchase for a given project.
#[tracing::instrument(skip_all)]
pub async fn has_purchased_project(
    pool: &PgPool,
    user_id: UserId,
    project_id: ProjectId,
) -> Result<bool> {
    // EXISTS lets Postgres stop at the first matching row instead of counting all of them.
    let purchased: bool = sqlx::query_scalar(
        "SELECT EXISTS(SELECT 1 FROM transactions WHERE buyer_id = $1 AND project_id = $2 AND status = 'completed')",
    )
    .bind(user_id)
    .bind(project_id)
    .fetch_one(pool)
    .await?;

    Ok(purchased)
}

/// Parameters for creating a pending project purchase transaction.
pub struct CreateProjectTransactionParams<'a> {
    pub buyer_id: UserId,
    pub seller_id: UserId,
    pub project_id: ProjectId,
    pub amount_cents: i32,
    pub stripe_checkout_session_id: &'a str,
    /// Stored in the `item_title` column of the transaction row.
    pub project_title: &'a str,
    pub seller_username: &'a str,
    pub share_contact: bool,
}

/// Record a new pending transaction for a project purchase.
#[tracing::instrument(skip_all)]
pub async fn create_project_transaction(
    pool: &PgPool,
    params: &CreateProjectTransactionParams<'_>,
) -> Result<DbTransaction> {
    // No `item_id` is written (project-level row) and the platform fee is
    // recorded as 0; the project title goes into the `item_title` column.
    let tx = sqlx::query_as::<_, DbTransaction>(
        r#"
        INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents,
                                  stripe_checkout_session_id, item_title, seller_username,
                                  share_contact)
        VALUES ($1, $2, $3, $4, 0, $5, $6, $7, $8)
        RETURNING *
        "#,
    )
    .bind(params.buyer_id)
    .bind(params.seller_id)
    .bind(params.project_id)
    .bind(params.amount_cents)
    .bind(params.stripe_checkout_session_id)
    .bind(params.project_title)
    .bind(params.seller_username)
    .bind(params.share_contact)
    .fetch_one(pool)
    .await?;

    Ok(tx)
}

/// Get items purchased by a user, including any associated license key.
///
/// Reads from the `purchases` VIEW (which filters `transactions` to
/// `status = 'completed'`), then JOINs through `items → projects → users`
/// for display fields. The LEFT JOIN on `license_keys` attaches a
/// non-revoked key code so the buyer can see it in their library
/// without a separate lookup. The query does not order by any key column, so
/// which key is shown when an item has several non-revoked keys is unspecified.
/// Capped at 20 rows for the dashboard summary.
#[tracing::instrument(skip_all)] pub async fn get_user_purchases(pool: &PgPool, user_id: UserId) -> Result> { let purchases = sqlx::query_as::<_, DbPurchaseRow>( r#" SELECT * FROM ( SELECT DISTINCT ON (p.item_id) p.transaction_id, p.item_id, i.title, u.username as creator, i.item_type, p.purchased_at, (i.price_cents = 0) as is_free, lk.key_code as license_key_code, (vc.total_versions > 0 AND vc.total_versions > COALESCE(dc.downloaded_count, 0)) as has_new_version FROM purchases p JOIN items i ON p.item_id = i.id JOIN projects proj ON i.project_id = proj.id JOIN users u ON proj.user_id = u.id LEFT JOIN license_keys lk ON lk.item_id = p.item_id AND lk.owner_id = p.buyer_id AND lk.revoked_at IS NULL LEFT JOIN LATERAL ( SELECT COUNT(*) AS total_versions FROM versions v WHERE v.item_id = i.id AND v.s3_key IS NOT NULL ) vc ON true LEFT JOIN LATERAL ( SELECT COUNT(*) AS downloaded_count FROM user_downloads ud WHERE ud.user_id = p.buyer_id AND ud.item_id = i.id ) dc ON true WHERE p.buyer_id = $1 ORDER BY p.item_id, p.purchased_at DESC ) deduped ORDER BY purchased_at DESC LIMIT 20 "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(purchases) } /// Sum completed revenue and count sales for all items in a project. /// /// Returns `(total_revenue_cents, total_sales)`. Only completed transactions /// are counted; pending, failed, and refunded are excluded. #[tracing::instrument(skip_all)] pub async fn get_revenue_by_project(pool: &PgPool, project_id: ProjectId) -> Result<(i64, i64)> { let row: (Option, Option) = sqlx::query_as( r#" SELECT COALESCE(SUM(t.amount_cents), 0)::BIGINT, COUNT(*) FROM transactions t JOIN items i ON t.item_id = i.id WHERE i.project_id = $1 AND t.status = 'completed' "#, ) .bind(project_id) .fetch_one(pool) .await?; Ok((row.0.unwrap_or(0), row.1.unwrap_or(0))) } /// Revenue per project for a given seller, returned as (project_id, title, revenue_cents). /// Single query replaces N+1 loop in dashboard analytics. 
#[tracing::instrument(skip_all)] pub async fn get_revenue_by_user_projects( pool: &PgPool, user_id: UserId, ) -> Result> { let rows: Vec<(ProjectId, String, i64)> = sqlx::query_as( r#" SELECT p.id, p.title, COALESCE(SUM(t.amount_cents), 0)::BIGINT FROM projects p LEFT JOIN items i ON i.project_id = p.id LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed' WHERE p.user_id = $1 GROUP BY p.id, p.title HAVING COALESCE(SUM(t.amount_cents), 0) > 0 ORDER BY COALESCE(SUM(t.amount_cents), 0) DESC "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Revenue and sales per project for a seller within a time range. /// /// Used for the cross-project comparison table on the user analytics tab. #[tracing::instrument(skip_all)] pub async fn get_revenue_by_user_projects_in_range( pool: &PgPool, user_id: UserId, range: &super::analytics::TimeRange, ) -> Result> { let time_filter = match range.interval_sql() { Some(interval) => format!( " AND t.completed_at >= NOW() - INTERVAL '{interval}'" ), None => String::new(), }; let sql = format!( r#" SELECT p.id, p.title, COALESCE(SUM(t.amount_cents), 0)::BIGINT, COUNT(t.id)::BIGINT FROM projects p LEFT JOIN items i ON i.project_id = p.id LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed'{time_filter} WHERE p.user_id = $1 GROUP BY p.id, p.title ORDER BY COALESCE(SUM(t.amount_cents), 0) DESC "# ); let rows: Vec<(ProjectId, String, i64, i64)> = sqlx::query_as(&sql) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Remove a free item from library (deletes the claim transaction). /// If the claim was via a promo code, decrements the code's use_count. 
#[tracing::instrument(skip_all)]
pub async fn remove_free_item_from_library(
    pool: &PgPool,
    user_id: UserId,
    item_id: ItemId,
) -> Result<bool> {
    // Delete the free claim and return the promo_code_id if one was used.
    // Outer Option: was a row deleted? Inner Option: did it carry a promo code?
    let row: Option<Option<PromoCodeId>> = sqlx::query_scalar(
        r#"
        DELETE FROM transactions
        WHERE buyer_id = $1 AND item_id = $2 AND amount_cents = 0 AND status = 'completed'
        RETURNING promo_code_id
        "#,
    )
    .bind(user_id)
    .bind(item_id)
    .fetch_optional(pool)
    .await?;

    let deleted = row.is_some();

    if let Some(Some(pc_id)) = row {
        // Best-effort: the claim is already gone, so a failed release must not
        // fail the removal — but it should not vanish silently either.
        if let Err(err) = super::promo_codes::release_use_count(pool, pc_id).await {
            tracing::warn!(
                item_id = %item_id,
                error = ?err,
                "failed to release promo code use_count after removing free claim"
            );
        }
    }

    Ok(deleted)
}

/// Fetch a single transaction by ID.
#[tracing::instrument(skip_all)]
pub async fn get_transaction_by_id(
    pool: &PgPool,
    id: TransactionId,
) -> Result<Option<DbTransaction>> {
    let tx = sqlx::query_as::<_, DbTransaction>("SELECT * FROM transactions WHERE id = $1")
        .bind(id)
        .fetch_optional(pool)
        .await?;

    Ok(tx)
}

/// Mark a transaction as refunded, returning its ID and item_id for downstream cleanup.
///
/// The WHERE clause requires `status = 'completed'` so that already-refunded
/// or pending transactions are not double-processed. Returns an empty vec if no
/// matching transactions were found (idempotent for webhook retries).
///
/// Returns ALL refunded transactions (handles cart checkouts where multiple
/// transactions share the same payment_intent_id).
#[tracing::instrument(skip_all)]
pub async fn refund_transaction_by_payment_intent<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    payment_intent_id: &str,
) -> Result<Vec<(TransactionId, Option<ItemId>)>> {
    // item_id is nullable on project-level transactions (routes/stripe/checkout/project.rs);
    // returning non-Optional ItemId would cause sqlx decode failures and infinite Stripe retries.
    let rows: Vec<(super::TransactionId, Option<ItemId>)> = sqlx::query_as(
        r#"
        UPDATE transactions
        SET status = 'refunded'
        WHERE stripe_payment_intent_id = $1 AND status = 'completed'
        RETURNING id, item_id
        "#,
    )
    .bind(payment_intent_id)
    .fetch_all(executor)
    .await?;

    Ok(rows)
}

/// Revoke all child transactions linked to a parent (bundle) transaction.
///
/// Returns the item IDs of revoked children so callers can decrement sales counts.
/// Children without an `item_id` are still marked refunded but are omitted
/// from the returned list.
#[tracing::instrument(skip_all)]
pub async fn revoke_child_transactions<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    parent_transaction_id: TransactionId,
) -> Result<Vec<ItemId>> {
    let item_ids: Vec<(Option<ItemId>,)> = sqlx::query_as(
        r#"
        UPDATE transactions
        SET status = 'refunded'
        WHERE parent_transaction_id = $1 AND status = 'completed'
        RETURNING item_id
        "#,
    )
    .bind(parent_transaction_id)
    .fetch_all(executor)
    .await?;

    Ok(item_ids.into_iter().filter_map(|(id,)| id).collect())
}

/// A buyer's email and display name for platform notifications (e.g. creator departure).
///
/// Unlike [`DbContactRow`], this includes ALL buyers regardless of contact sharing
/// preference, because platform-initiated notifications (content removal warnings)
/// are distinct from creator-initiated contact.
#[derive(sqlx::FromRow)]
pub struct BuyerNotificationRow {
    pub email: String,
    pub display_name: Option<String>,
}

/// Get unique buyers who purchased from a seller, for platform notifications.
///
/// This bypasses `share_contact` and `contact_revocations` because the notification
/// is sent by the platform (not the creator) to warn buyers about content removal.
///
/// Capped at `limit` rows to bound memory + outbound email volume on creators
/// with very large historical buyer pools. The caller is responsible for
/// noting when the cap is hit (returned slice length == limit).
#[tracing::instrument(skip_all)] pub async fn get_all_buyers_for_seller( pool: &PgPool, seller_id: UserId, limit: i64, ) -> Result> { let rows = sqlx::query_as::<_, BuyerNotificationRow>( r#" SELECT DISTINCT u.email, u.display_name FROM transactions t JOIN users u ON u.id = t.buyer_id WHERE t.seller_id = $1 AND t.status = 'completed' AND t.buyer_id IS NOT NULL LIMIT $2 "#, ) .bind(seller_id) .bind(limit) .fetch_all(pool) .await?; Ok(rows) } /// A contact row: a buyer who opted to share their email with the seller. #[derive(sqlx::FromRow)] pub struct DbContactRow { pub username: String, pub email: String, pub total_purchases: i64, pub total_spent_cents: i64, pub last_purchase_at: DateTime, } /// A creator the fan has actively shared contact info with (no revocation). #[derive(sqlx::FromRow)] pub struct SharedCreatorRow { pub seller_id: UserId, pub username: String, pub display_name: Option, } /// Get unique contacts for a seller: buyers who opted in to share their email. /// /// Aggregates across all completed transactions where `share_contact = true`, /// returning one row per buyer with purchase stats. Excludes buyers who have /// revoked contact sharing. #[tracing::instrument(skip_all)] pub async fn get_seller_contacts( pool: &PgPool, seller_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, DbContactRow>( r#" SELECT u.username, u.email, COUNT(*) as total_purchases, COALESCE(SUM(t.amount_cents), 0)::BIGINT as total_spent_cents, MAX(t.completed_at) as last_purchase_at FROM transactions t JOIN users u ON u.id = t.buyer_id WHERE t.seller_id = $1 AND t.status = 'completed' AND t.share_contact = true AND NOT EXISTS ( SELECT 1 FROM contact_revocations cr WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id ) GROUP BY t.buyer_id, u.username, u.email ORDER BY last_purchase_at DESC LIMIT 500 "#, ) .bind(seller_id) .fetch_all(pool) .await?; Ok(rows) } /// Get seller transactions for CSV export, with conditional buyer email. 
///
/// Respects contact revocations: if a buyer revoked sharing, their email
/// is hidden even if `share_contact` was true on the transaction.
#[tracing::instrument(skip_all)]
pub async fn get_seller_transactions_for_export(
    pool: &PgPool,
    seller_id: UserId,
) -> Result<Vec<DbTransactionExportRow>> {
    // LEFT JOIN keeps transactions that have no buyer account; their
    // `buyer_email` comes back NULL.
    let rows = sqlx::query_as::<_, DbTransactionExportRow>(
        r#"
        SELECT
            t.created_at,
            t.item_id,
            t.item_title,
            t.amount_cents,
            t.status,
            CASE
                WHEN t.share_contact AND NOT EXISTS (
                    SELECT 1 FROM contact_revocations cr
                    WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id
                ) THEN u.email
                ELSE NULL
            END as buyer_email
        FROM transactions t
        LEFT JOIN users u ON u.id = t.buyer_id
        WHERE t.seller_id = $1
        ORDER BY t.created_at DESC
        "#,
    )
    .bind(seller_id)
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Platform-wide revenue stats: total completed revenue, completed count, refunded count.
#[tracing::instrument(skip_all)]
pub async fn get_platform_revenue_stats(pool: &PgPool) -> Result<(i64, i64, i64)> {
    let row: (Option<i64>, Option<i64>, Option<i64>) = sqlx::query_as(
        r#"
        SELECT
            COALESCE(SUM(CASE WHEN status = 'completed' THEN amount_cents ELSE 0 END), 0)::BIGINT,
            COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0)::BIGINT,
            COALESCE(SUM(CASE WHEN status = 'refunded' THEN 1 ELSE 0 END), 0)::BIGINT
        FROM transactions
        "#,
    )
    .fetch_one(pool)
    .await?;

    Ok((row.0.unwrap_or(0), row.1.unwrap_or(0), row.2.unwrap_or(0)))
}

/// Record a contact revocation (fan withdraws email sharing from a creator).
///
/// Idempotent: does nothing if already revoked.
#[tracing::instrument(skip_all)]
pub async fn revoke_contact_sharing(
    pool: &PgPool,
    buyer_id: UserId,
    seller_id: UserId,
) -> Result<()> {
    sqlx::query(
        "INSERT INTO contact_revocations (buyer_id, seller_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
    )
    .bind(buyer_id)
    .bind(seller_id)
    .execute(pool)
    .await?;

    Ok(())
}

/// Clear a contact revocation (fan re-shares on a new purchase).
#[tracing::instrument(skip_all)]
pub async fn clear_contact_revocation(
    pool: &PgPool,
    buyer_id: UserId,
    seller_id: UserId,
) -> Result<()> {
    // Deleting a non-existent revocation affects zero rows and is not an error.
    sqlx::query("DELETE FROM contact_revocations WHERE buyer_id = $1 AND seller_id = $2")
        .bind(buyer_id)
        .bind(seller_id)
        .execute(pool)
        .await?;

    Ok(())
}

/// Get creators the fan has actively shared contact info with (excluding revoked).
///
/// Sorted by creator username and capped at 500 rows.
#[tracing::instrument(skip_all)]
pub async fn get_shared_creators(
    pool: &PgPool,
    buyer_id: UserId,
) -> Result<Vec<SharedCreatorRow>> {
    let rows = sqlx::query_as::<_, SharedCreatorRow>(
        r#"
        SELECT DISTINCT t.seller_id, u.username, u.display_name
        FROM transactions t
        JOIN users u ON u.id = t.seller_id
        WHERE t.buyer_id = $1
          AND t.status = 'completed'
          AND t.share_contact = true
          AND NOT EXISTS (
              SELECT 1 FROM contact_revocations cr
              WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id
          )
        ORDER BY u.username
        LIMIT 500
        "#,
    )
    .bind(buyer_id)
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Create a pending "placeholder" transaction for a subscription checkout that
/// used a promo code. This row exists solely so `cleanup_stale_pending` can
/// release the promo code reservation if the buyer abandons the Stripe session.
/// It is deleted (not completed) when the subscription webhook fires.
#[tracing::instrument(skip_all)]
pub async fn create_subscription_pending_transaction(
    pool: &PgPool,
    buyer_id: UserId,
    seller_id: UserId,
    project_id: ProjectId,
    stripe_checkout_session_id: &str,
    promo_code_id: PromoCodeId,
) -> Result<()> {
    // Zero amounts, a fixed marker title and an empty seller username: the row
    // is a hold, not a sale.
    sqlx::query(
        r#"
        INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents,
                                  stripe_checkout_session_id, item_title, seller_username,
                                  share_contact, promo_code_id)
        VALUES ($1, $2, $3, 0, 0, $4, 'subscription-promo-hold', '', false, $5)
        "#,
    )
    .bind(buyer_id)
    .bind(seller_id)
    .bind(project_id)
    .bind(stripe_checkout_session_id)
    .bind(promo_code_id)
    .execute(pool)
    .await?;

    Ok(())
}

/// Delete a pending subscription promo-hold transaction by checkout session ID.
/// Called from the subscription webhook after the subscription is created.
///
/// Removes every `pending` row for the session; a missing row is not an error.
#[tracing::instrument(skip_all)]
pub async fn delete_subscription_pending_transaction<'e>(
    executor: impl sqlx::PgExecutor<'e>,
    stripe_checkout_session_id: &str,
) -> Result<()> {
    sqlx::query(
        "DELETE FROM transactions WHERE stripe_checkout_session_id = $1 AND status = 'pending'",
    )
    .bind(stripe_checkout_session_id)
    .execute(executor)
    .await?;

    Ok(())
}

/// Delete stale pending transactions (older than the given threshold) and return
/// the promo_code_ids that need their use_count decremented.
///
/// Stripe checkout sessions expire after 24 hours, so pending transactions older
/// than that will never complete. This releases the pending purchase uniqueness
/// slot and any reserved promo code use_count.
///
/// The returned vec has one entry per deleted row; rows without a promo code
/// contribute `None`.
#[tracing::instrument(skip_all)]
pub async fn cleanup_stale_pending(
    pool: &PgPool,
    older_than: chrono::Duration,
) -> Result<Vec<Option<PromoCodeId>>> {
    // The cutoff is computed from the application clock, not the database's NOW().
    let cutoff = chrono::Utc::now() - older_than;

    let rows: Vec<(Option<PromoCodeId>,)> = sqlx::query_as(
        r#"
        DELETE FROM transactions
        WHERE status = 'pending' AND created_at < $1
        RETURNING promo_code_id
        "#,
    )
    .bind(cutoff)
    .fetch_all(pool)
    .await?;

    Ok(rows.into_iter().map(|(id,)| id).collect())
}

/// Bulk variant of `get_pending_item_purchase`. Returns the subset of `item_ids`
/// for which the buyer already has a `pending` transaction. Used by cart
/// checkout to abort early when any line item would collide with the partial
/// unique index on `(buyer_id, item_id) WHERE status = 'pending'`.
#[tracing::instrument(skip_all)] pub async fn pending_subset( pool: &PgPool, buyer_id: UserId, item_ids: &[ItemId], ) -> Result> { if item_ids.is_empty() { return Ok(std::collections::HashSet::new()); } let rows: Vec<(ItemId,)> = sqlx::query_as( "SELECT DISTINCT item_id FROM transactions WHERE buyer_id = $1 AND status = 'pending' AND item_id = ANY($2)", ) .bind(buyer_id) .bind(item_ids) .fetch_all(pool) .await?; Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Returns the buyer's pending transaction for a specific item, if any. /// Used to surface in-progress checkouts on the purchase page. #[tracing::instrument(skip_all)] pub async fn get_pending_item_purchase( pool: &PgPool, buyer_id: UserId, item_id: ItemId, ) -> Result)>> { let row: Option<(TransactionId, chrono::DateTime)> = sqlx::query_as( r#" SELECT id, created_at FROM transactions WHERE buyer_id = $1 AND item_id = $2 AND status = 'pending' LIMIT 1 "#, ) .bind(buyer_id) .bind(item_id) .fetch_optional(pool) .await?; Ok(row) } /// Delete the buyer's pending transaction for a specific item. /// Returns any released `promo_code_id` so the caller can release its /// reservation. #[tracing::instrument(skip_all)] pub async fn delete_pending_item_purchase( pool: &PgPool, buyer_id: UserId, item_id: ItemId, ) -> Result> { let row: Option<(Option,)> = sqlx::query_as( r#" DELETE FROM transactions WHERE buyer_id = $1 AND item_id = $2 AND status = 'pending' RETURNING promo_code_id "#, ) .bind(buyer_id) .bind(item_id) .fetch_optional(pool) .await?; Ok(row.and_then(|(id,)| id)) } /// Create a completed free guest transaction. /// /// Returns the number of rows inserted (0 if already claimed via ON CONFLICT). 
// Each argument maps to one column of the inserted row.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all)]
pub async fn create_free_guest_transaction(
    pool: &PgPool,
    buyer_id: Option<UserId>,
    seller_id: UserId,
    item_id: ItemId,
    checkout_session_id: &str,
    item_title: &str,
    seller_username: &str,
    guest_email: &str,
    claim_token: Option<ClaimToken>,
    download_token: DownloadToken,
) -> std::result::Result<u64, sqlx::Error> {
    // The conflict target is the partial unique index on (guest_email, item_id)
    // for completed rows, so a repeat claim by the same email is a no-op.
    let result = sqlx::query(
        r#"
        INSERT INTO transactions (
            buyer_id, seller_id, item_id, amount_cents, platform_fee_cents,
            stripe_checkout_session_id, status, completed_at,
            item_title, seller_username, share_contact,
            guest_email, claim_token, download_token
        )
        VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, false, $7, $8, $9)
        ON CONFLICT (guest_email, item_id) WHERE status = 'completed' AND guest_email IS NOT NULL DO NOTHING
        "#,
    )
    .bind(buyer_id)
    .bind(seller_id)
    .bind(item_id)
    .bind(checkout_session_id)
    .bind(item_title)
    .bind(seller_username)
    .bind(guest_email)
    .bind(claim_token)
    .bind(download_token)
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}

/// Record a free project claim (PWYW with $0 min or free project).
///
/// Returns `true` if the claim was actually inserted, `false` if the buyer
/// already owned the project. Mirrors the `claim_free_item` shape so callers
/// can gate downstream side-effects (contact-revocation clear, sale-notification
/// email, etc.) on the winner of a concurrent-claim race — without this signal,
/// two concurrent `/checkout/project` POSTs both fire those side-effects
/// regardless of which one's INSERT actually landed (Run #7 deferred SERIOUS).
#[tracing::instrument(skip_all)] pub async fn claim_free_project( pool: &PgPool, buyer_id: UserId, seller_id: UserId, project_id: ProjectId, item_title: &str, seller_username: &str, share_contact: bool, ) -> Result { let result = sqlx::query( r#" INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents, status, completed_at, item_title, seller_username, share_contact) VALUES ($1, $2, $3, 0, 0, 'completed', NOW(), $4, $5, $6) ON CONFLICT (buyer_id, project_id) WHERE status = 'completed' AND project_id IS NOT NULL DO NOTHING "#, ) .bind(buyer_id) .bind(seller_id) .bind(project_id) .bind(item_title) .bind(seller_username) .bind(share_contact) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Completed and refunded sales for a specific item, for the item dashboard Sales tab. #[tracing::instrument(skip_all)] pub async fn get_sales_by_item( pool: &PgPool, item_id: ItemId, seller_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, DbTransaction>( r#" SELECT * FROM transactions WHERE item_id = $1 AND seller_id = $2 AND status IN ('completed', 'refunded') ORDER BY created_at DESC LIMIT 200 "#, ) .bind(item_id) .bind(seller_id) .fetch_all(pool) .await?; Ok(rows) }