Skip to main content

max / makenotwork

39.7 KB · 1236 lines History Blame Raw
1 //! Transaction queries: checkout tracking, purchase history, and free claims.
2
3 use chrono::{DateTime, Utc};
4 use sqlx::PgPool;
5
6 use super::models::*;
7 use super::validated_types::KeyCode;
8 use super::{Cents, ClaimToken, DownloadToken, ItemId, ProjectId, PromoCodeId, TransactionId, UserId};
9 use crate::error::Result;
10
11 /// Parameters for creating a pending Stripe checkout transaction.
12 pub struct CreateTransactionParams<'a> {
13 pub buyer_id: Option<UserId>,
14 pub seller_id: UserId,
15 /// `None` for project-level purchases (no specific item).
16 pub item_id: Option<ItemId>,
17 pub amount_cents: Cents,
18 pub platform_fee_cents: Cents,
19 pub stripe_checkout_session_id: &'a str,
20 pub item_title: &'a str,
21 pub seller_username: &'a str,
22 pub share_contact: bool,
23 /// Set for project-level purchases; `None` for item purchases.
24 pub project_id: Option<ProjectId>,
25 /// Promo code used for this checkout (for releasing reservations on cleanup).
26 pub promo_code_id: Option<PromoCodeId>,
27 /// Guest buyer's email (set for guest checkouts, None for logged-in).
28 pub guest_email: Option<&'a str>,
29 }
30
31 /// Common parameters for claiming a free item (direct, discount code, or download code).
32 pub struct ClaimParams<'a> {
33 pub buyer_id: UserId,
34 pub item_id: ItemId,
35 pub seller_id: UserId,
36 pub item_title: &'a str,
37 pub seller_username: &'a str,
38 pub share_contact: bool,
39 /// If this claim was granted via a bundle purchase, the parent transaction ID.
40 pub parent_transaction_id: Option<TransactionId>,
41 }
42
43 /// Record a new pending transaction for a Stripe checkout session.
44 #[tracing::instrument(skip_all)]
45 pub async fn create_transaction<'e>(
46 executor: impl sqlx::PgExecutor<'e>,
47 params: &CreateTransactionParams<'_>,
48 ) -> Result<DbTransaction> {
49 let tx = sqlx::query_as::<_, DbTransaction>(
50 r#"
51 INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents, stripe_checkout_session_id, item_title, seller_username, share_contact, project_id, promo_code_id, guest_email)
52 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
53 RETURNING *
54 "#,
55 )
56 .bind(params.buyer_id)
57 .bind(params.seller_id)
58 .bind(params.item_id)
59 .bind(params.amount_cents)
60 .bind(params.platform_fee_cents)
61 .bind(params.stripe_checkout_session_id)
62 .bind(params.item_title)
63 .bind(params.seller_username)
64 .bind(params.share_contact)
65 .bind(params.project_id)
66 .bind(params.promo_code_id)
67 .bind(params.guest_email)
68 .fetch_one(executor)
69 .await?;
70
71 Ok(tx)
72 }
73
74 /// Complete a guest transaction: set guest_email, generate claim_token, and
75 /// optionally auto-attach to an existing user if the email matches.
76 ///
77 /// Uses a transaction to prevent races between auto-attach and concurrent
78 /// signup/email-verification for the same email.
79 #[tracing::instrument(skip_all)]
80 pub async fn complete_guest_transaction<'e>(
81 executor: impl sqlx::PgExecutor<'e>,
82 stripe_checkout_session_id: &str,
83 stripe_payment_intent_id: &str,
84 guest_email: &str,
85 existing_user_id: Option<UserId>,
86 ) -> Result<Option<DbTransaction>> {
87 let claim_token = ClaimToken::new();
88
89 let tx = sqlx::query_as::<_, DbTransaction>(
90 r#"
91 UPDATE transactions
92 SET status = 'completed',
93 stripe_payment_intent_id = $2,
94 completed_at = NOW(),
95 guest_email = $3,
96 claim_token = CASE WHEN $4::UUID IS NOT NULL THEN NULL ELSE $5 END,
97 buyer_id = $4
98 WHERE stripe_checkout_session_id = $1
99 AND status = 'pending'
100 RETURNING *
101 "#,
102 )
103 .bind(stripe_checkout_session_id)
104 .bind(stripe_payment_intent_id)
105 .bind(guest_email)
106 .bind(existing_user_id)
107 .bind(claim_token)
108 .fetch_optional(executor)
109 .await?;
110
111 Ok(tx)
112 }
113
114 /// Attach all unclaimed guest purchases for an email to a user account.
115 /// Called during signup/email verification to auto-claim prior guest purchases.
116 #[tracing::instrument(skip_all)]
117 pub async fn attach_guest_purchases_by_email(
118 pool: &PgPool,
119 email: &str,
120 user_id: UserId,
121 ) -> Result<u64> {
122 let result = sqlx::query(
123 r#"
124 UPDATE transactions
125 SET buyer_id = $1, claimed_by = $1, claim_token = NULL
126 WHERE LOWER(guest_email) = LOWER($2)
127 AND buyer_id IS NULL
128 AND status = 'completed'
129 "#,
130 )
131 .bind(user_id)
132 .bind(email)
133 .execute(pool)
134 .await?;
135
136 Ok(result.rows_affected())
137 }
138
139 /// Claim a single guest purchase by claim token.
140 #[tracing::instrument(skip_all)]
141 pub async fn claim_guest_purchase(
142 pool: &PgPool,
143 claim_token: ClaimToken,
144 user_id: UserId,
145 ) -> Result<Option<DbTransaction>> {
146 let tx = sqlx::query_as::<_, DbTransaction>(
147 r#"
148 UPDATE transactions
149 SET buyer_id = $2, claimed_by = $2, claim_token = NULL
150 WHERE claim_token = $1
151 AND buyer_id IS NULL
152 AND status = 'completed'
153 RETURNING *
154 "#,
155 )
156 .bind(claim_token)
157 .bind(user_id)
158 .fetch_optional(pool)
159 .await?;
160
161 Ok(tx)
162 }
163
164 /// Look up a completed transaction by download token (for guest download links).
165 #[tracing::instrument(skip_all)]
166 pub async fn get_transaction_by_download_token(
167 pool: &PgPool,
168 download_token: DownloadToken,
169 ) -> Result<Option<DbTransaction>> {
170 let tx = sqlx::query_as::<_, DbTransaction>(
171 "SELECT * FROM transactions WHERE download_token = $1 AND status = 'completed'",
172 )
173 .bind(download_token)
174 .fetch_optional(pool)
175 .await?;
176
177 Ok(tx)
178 }
179
180 /// Mark a pending transaction as completed (idempotent; returns `None` if already completed).
181 ///
182 /// Accepts any sqlx executor (`&PgPool`, `&mut Transaction`, etc.) so callers
183 /// can include this in a larger transaction when needed.
184 #[tracing::instrument(skip_all)]
185 pub async fn complete_transaction<'e>(
186 executor: impl sqlx::PgExecutor<'e>,
187 stripe_checkout_session_id: &str,
188 stripe_payment_intent_id: &str,
189 ) -> Result<Option<DbTransaction>> {
190 // Only update if status is 'pending' for idempotency
191 // Returns None if transaction was already completed (duplicate webhook)
192 let tx = sqlx::query_as::<_, DbTransaction>(
193 r#"
194 UPDATE transactions
195 SET status = 'completed',
196 stripe_payment_intent_id = $2,
197 completed_at = NOW()
198 WHERE stripe_checkout_session_id = $1
199 AND status = 'pending'
200 RETURNING *
201 "#,
202 )
203 .bind(stripe_checkout_session_id)
204 .bind(stripe_payment_intent_id)
205 .fetch_optional(executor)
206 .await?;
207
208 Ok(tx)
209 }
210
211 /// Complete ALL pending transactions for a cart checkout session.
212 /// Returns the list of completed transactions (empty if already processed).
213 #[tracing::instrument(skip_all)]
214 pub async fn complete_cart_transactions<'e>(
215 executor: impl sqlx::PgExecutor<'e>,
216 stripe_checkout_session_id: &str,
217 stripe_payment_intent_id: &str,
218 ) -> Result<Vec<DbTransaction>> {
219 let txs = sqlx::query_as::<_, DbTransaction>(
220 r#"
221 UPDATE transactions
222 SET status = 'completed',
223 stripe_payment_intent_id = $2,
224 completed_at = NOW()
225 WHERE stripe_checkout_session_id = $1
226 AND status = 'pending'
227 RETURNING *
228 "#,
229 )
230 .bind(stripe_checkout_session_id)
231 .bind(stripe_payment_intent_id)
232 .fetch_all(executor)
233 .await?;
234
235 Ok(txs)
236 }
237
238 /// List transactions where the user is the buyer, newest first.
239 ///
240 /// Pass `limit: None` for all rows (exports), or `Some(n)` for dashboard display.
241 #[tracing::instrument(skip_all)]
242 pub async fn get_transactions_by_buyer(
243 pool: &PgPool,
244 buyer_id: UserId,
245 limit: Option<i64>,
246 ) -> Result<Vec<DbTransaction>> {
247 let txs = sqlx::query_as::<_, DbTransaction>(
248 "SELECT * FROM transactions WHERE buyer_id = $1 ORDER BY created_at DESC LIMIT $2",
249 )
250 .bind(buyer_id)
251 .bind(limit)
252 .fetch_all(pool)
253 .await?;
254
255 Ok(txs)
256 }
257
258 /// List transactions where the user is the seller, newest first.
259 ///
260 /// Pass `limit: None` for all rows (exports), or `Some(n)` for dashboard display.
261 #[tracing::instrument(skip_all)]
262 pub async fn get_transactions_by_seller(
263 pool: &PgPool,
264 seller_id: UserId,
265 limit: Option<i64>,
266 ) -> Result<Vec<DbTransaction>> {
267 let txs = sqlx::query_as::<_, DbTransaction>(
268 "SELECT * FROM transactions WHERE seller_id = $1 ORDER BY created_at DESC LIMIT $2",
269 )
270 .bind(seller_id)
271 .bind(limit)
272 .fetch_all(pool)
273 .await?;
274
275 Ok(txs)
276 }
277
278 /// Check whether a user has a completed purchase for a given item.
279 #[tracing::instrument(skip_all)]
280 pub async fn has_purchased_item(pool: &PgPool, user_id: UserId, item_id: ItemId) -> Result<bool> {
281 let count: i64 = sqlx::query_scalar(
282 "SELECT COUNT(*) FROM transactions WHERE buyer_id = $1 AND item_id = $2 AND status = 'completed'",
283 )
284 .bind(user_id)
285 .bind(item_id)
286 .fetch_one(pool)
287 .await?;
288
289 Ok(count > 0)
290 }
291
292 /// Bulk variant of `has_purchased_item`. Returns the subset of `item_ids` that
293 /// the buyer has a completed purchase for. Single DB roundtrip vs. N calls.
294 #[tracing::instrument(skip_all)]
295 pub async fn purchased_subset(
296 pool: &PgPool,
297 user_id: UserId,
298 item_ids: &[ItemId],
299 ) -> Result<std::collections::HashSet<ItemId>> {
300 if item_ids.is_empty() {
301 return Ok(std::collections::HashSet::new());
302 }
303 let rows: Vec<(ItemId,)> = sqlx::query_as(
304 "SELECT DISTINCT item_id FROM transactions
305 WHERE buyer_id = $1 AND status = 'completed' AND item_id = ANY($2)",
306 )
307 .bind(user_id)
308 .bind(item_ids)
309 .fetch_all(pool)
310 .await?;
311 Ok(rows.into_iter().map(|(id,)| id).collect())
312 }
313
314 /// Get all item IDs that a user has purchased (for batch access checks)
315 #[tracing::instrument(skip_all)]
316 pub async fn get_user_purchased_item_ids(pool: &PgPool, user_id: UserId) -> Result<Vec<ItemId>> {
317 let item_ids: Vec<ItemId> = sqlx::query_scalar(
318 "SELECT DISTINCT item_id FROM transactions WHERE buyer_id = $1 AND status = 'completed'",
319 )
320 .bind(user_id)
321 .fetch_all(pool)
322 .await?;
323
324 Ok(item_ids)
325 }
326
327 /// Claims a free item by creating a zero-cost completed transaction.
328 /// Returns true if claimed successfully, false if already in library.
329 ///
330 /// Uses `ON CONFLICT DO NOTHING` against the partial unique index on
331 /// `(buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL` to prevent duplicate
332 /// claims under concurrent requests.
333 #[tracing::instrument(skip_all)]
334 pub async fn claim_free_item<'e>(
335 executor: impl sqlx::PgExecutor<'e>,
336 params: &ClaimParams<'_>,
337 ) -> Result<bool> {
338 let claim_id = format!("free-claim-{}-{}", params.buyer_id, params.item_id);
339 let result = sqlx::query(
340 r#"
341 INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents, stripe_checkout_session_id, status, completed_at, item_title, seller_username, share_contact, parent_transaction_id)
342 VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, $7, $8)
343 ON CONFLICT (buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL DO NOTHING
344 "#,
345 )
346 .bind(params.buyer_id)
347 .bind(params.seller_id)
348 .bind(params.item_id)
349 .bind(&claim_id)
350 .bind(params.item_title)
351 .bind(params.seller_username)
352 .bind(params.share_contact)
353 .bind(params.parent_transaction_id)
354 .execute(executor)
355 .await?;
356
357 Ok(result.rows_affected() > 0)
358 }
359
360 /// Optional parameters for generating a license key inside a claim transaction.
361 pub struct LicenseKeyParams<'a> {
362 pub key_code: &'a KeyCode,
363 pub max_activations: Option<i32>,
364 }
365
366 /// Atomically claim a free item and increment the promo code's use count.
367 ///
368 /// Claims the item FIRST (INSERT transaction), then increments use_count.
369 /// If the user already owns the item (rows_affected == 0), rolls back without
370 /// consuming the code. If the code limit is reached, rolls back the claim too.
371 ///
372 /// When `license_key_params` is `Some`, a license key is created inside the
373 /// same transaction so that the claim and key are always consistent.
374 ///
375 /// Returns `(code_accepted, item_claimed)`:
376 /// - `code_accepted = false` → promo code hit its usage limit (nothing changed)
377 /// - `item_claimed = false` → user already owns the item (code was NOT consumed)
378 #[tracing::instrument(skip_all)]
379 pub async fn claim_free_with_promo_code(
380 pool: &PgPool,
381 promo_code_id: PromoCodeId,
382 params: &ClaimParams<'_>,
383 license_key_params: Option<&LicenseKeyParams<'_>>,
384 ) -> Result<(bool, bool)> {
385 let mut tx = pool.begin().await?;
386
387 // Step 1: Attempt to claim the item first
388 let claim_id = format!("free-claim-{}-{}", params.buyer_id, params.item_id);
389 let result = sqlx::query(
390 r#"
391 INSERT INTO transactions (buyer_id, seller_id, item_id, amount_cents, platform_fee_cents, stripe_checkout_session_id, status, completed_at, item_title, seller_username, share_contact, promo_code_id)
392 VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, $7, $8)
393 ON CONFLICT (buyer_id, item_id) WHERE status = 'completed' AND item_id IS NOT NULL DO NOTHING
394 "#,
395 )
396 .bind(params.buyer_id)
397 .bind(params.seller_id)
398 .bind(params.item_id)
399 .bind(&claim_id)
400 .bind(params.item_title)
401 .bind(params.seller_username)
402 .bind(params.share_contact)
403 .bind(promo_code_id)
404 .execute(&mut *tx)
405 .await?;
406
407 let claimed = result.rows_affected() > 0;
408
409 // Step 2: If the user already owns the item, rollback without consuming the code
410 if !claimed {
411 tx.rollback().await?;
412 return Ok((true, false));
413 }
414
415 // Step 3: Increment the promo code use count
416 let code_result = sqlx::query(
417 "UPDATE promo_codes SET use_count = use_count + 1 WHERE id = $1 AND (max_uses IS NULL OR use_count < max_uses)",
418 )
419 .bind(promo_code_id)
420 .execute(&mut *tx)
421 .await?;
422
423 // Step 4: If the code limit was reached, rollback the claim too
424 if code_result.rows_affected() == 0 {
425 tx.rollback().await?;
426 return Ok((false, false));
427 }
428
429 crate::db::items::increment_sales_count(&mut *tx, params.item_id).await?;
430
431 // Step 5: Create license key inside the same transaction if requested.
432 // Retry once on a unique-violation: the wordlist generator has ~6B-coin-
433 // flip headroom, so an actual collision is vanishingly rare, but the
434 // alternative is surfacing a 500 to a buyer mid-claim — cheap to handle.
435 if let Some(lk) = license_key_params {
436 let attempt = sqlx::query(
437 r#"
438 INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
439 VALUES ($1, $2, NULL, $3, $4)
440 "#,
441 )
442 .bind(params.item_id)
443 .bind(params.buyer_id)
444 .bind(lk.key_code)
445 .bind(lk.max_activations)
446 .execute(&mut *tx)
447 .await;
448
449 if let Err(sqlx::Error::Database(e)) = &attempt
450 && e.code().as_deref() == Some("23505")
451 {
452 let retry_code = crate::helpers::generate_key_code();
453 tracing::warn!(item_id = %params.item_id, "license key 23505 collision; retrying once");
454 sqlx::query(
455 r#"
456 INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
457 VALUES ($1, $2, NULL, $3, $4)
458 "#,
459 )
460 .bind(params.item_id)
461 .bind(params.buyer_id)
462 .bind(&retry_code)
463 .bind(lk.max_activations)
464 .execute(&mut *tx)
465 .await?;
466 } else {
467 attempt?;
468 }
469 }
470
471 tx.commit().await?;
472 Ok((true, true))
473 }
474
475 // ── Project purchases ──
476
477 /// Check whether a user has a completed purchase for a given project.
478 #[tracing::instrument(skip_all)]
479 pub async fn has_purchased_project(pool: &PgPool, user_id: UserId, project_id: ProjectId) -> Result<bool> {
480 let count: i64 = sqlx::query_scalar(
481 "SELECT COUNT(*) FROM transactions WHERE buyer_id = $1 AND project_id = $2 AND status = 'completed'",
482 )
483 .bind(user_id)
484 .bind(project_id)
485 .fetch_one(pool)
486 .await?;
487
488 Ok(count > 0)
489 }
490
491 /// Parameters for creating a pending project purchase transaction.
492 pub struct CreateProjectTransactionParams<'a> {
493 pub buyer_id: UserId,
494 pub seller_id: UserId,
495 pub project_id: ProjectId,
496 pub amount_cents: i32,
497 pub stripe_checkout_session_id: &'a str,
498 pub project_title: &'a str,
499 pub seller_username: &'a str,
500 pub share_contact: bool,
501 }
502
503 /// Record a new pending transaction for a project purchase.
504 #[tracing::instrument(skip_all)]
505 pub async fn create_project_transaction(
506 pool: &PgPool,
507 params: &CreateProjectTransactionParams<'_>,
508 ) -> Result<DbTransaction> {
509 let tx = sqlx::query_as::<_, DbTransaction>(
510 r#"
511 INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents, stripe_checkout_session_id, item_title, seller_username, share_contact)
512 VALUES ($1, $2, $3, $4, 0, $5, $6, $7, $8)
513 RETURNING *
514 "#,
515 )
516 .bind(params.buyer_id)
517 .bind(params.seller_id)
518 .bind(params.project_id)
519 .bind(params.amount_cents)
520 .bind(params.stripe_checkout_session_id)
521 .bind(params.project_title)
522 .bind(params.seller_username)
523 .bind(params.share_contact)
524 .fetch_one(pool)
525 .await?;
526
527 Ok(tx)
528 }
529
530 /// Get items purchased by a user, including any associated license key.
531 ///
532 /// Reads from the `purchases` VIEW (which filters `transactions` to
533 /// `status = 'completed'`), then JOINs through `items → projects → users`
534 /// for display fields. The LEFT JOIN on `license_keys` attaches the most
535 /// recent non-revoked key code so the buyer can see it in their library
536 /// without a separate lookup. Capped at 20 rows for the dashboard summary.
537 #[tracing::instrument(skip_all)]
538 pub async fn get_user_purchases(pool: &PgPool, user_id: UserId) -> Result<Vec<DbPurchaseRow>> {
539 let purchases = sqlx::query_as::<_, DbPurchaseRow>(
540 r#"
541 SELECT * FROM (
542 SELECT DISTINCT ON (p.item_id)
543 p.transaction_id,
544 p.item_id,
545 i.title,
546 u.username as creator,
547 i.item_type,
548 p.purchased_at,
549 (i.price_cents = 0) as is_free,
550 lk.key_code as license_key_code,
551 (vc.total_versions > 0 AND vc.total_versions > COALESCE(dc.downloaded_count, 0)) as has_new_version
552 FROM purchases p
553 JOIN items i ON p.item_id = i.id
554 JOIN projects proj ON i.project_id = proj.id
555 JOIN users u ON proj.user_id = u.id
556 LEFT JOIN license_keys lk ON lk.item_id = p.item_id AND lk.owner_id = p.buyer_id AND lk.revoked_at IS NULL
557 LEFT JOIN LATERAL (
558 SELECT COUNT(*) AS total_versions
559 FROM versions v
560 WHERE v.item_id = i.id AND v.s3_key IS NOT NULL
561 ) vc ON true
562 LEFT JOIN LATERAL (
563 SELECT COUNT(*) AS downloaded_count
564 FROM user_downloads ud
565 WHERE ud.user_id = p.buyer_id AND ud.item_id = i.id
566 ) dc ON true
567 WHERE p.buyer_id = $1
568 ORDER BY p.item_id, p.purchased_at DESC
569 ) deduped
570 ORDER BY purchased_at DESC
571 LIMIT 20
572 "#,
573 )
574 .bind(user_id)
575 .fetch_all(pool)
576 .await?;
577
578 Ok(purchases)
579 }
580
581 /// Sum completed revenue and count sales for all items in a project.
582 ///
583 /// Returns `(total_revenue_cents, total_sales)`. Only completed transactions
584 /// are counted; pending, failed, and refunded are excluded.
585 #[tracing::instrument(skip_all)]
586 pub async fn get_revenue_by_project(pool: &PgPool, project_id: ProjectId) -> Result<(i64, i64)> {
587 let row: (Option<i64>, Option<i64>) = sqlx::query_as(
588 r#"
589 SELECT
590 COALESCE(SUM(t.amount_cents), 0)::BIGINT,
591 COUNT(*)
592 FROM transactions t
593 JOIN items i ON t.item_id = i.id
594 WHERE i.project_id = $1
595 AND t.status = 'completed'
596 "#,
597 )
598 .bind(project_id)
599 .fetch_one(pool)
600 .await?;
601
602 Ok((row.0.unwrap_or(0), row.1.unwrap_or(0)))
603 }
604
605 /// Revenue per project for a given seller, returned as (project_id, title, revenue_cents).
606 /// Single query replaces N+1 loop in dashboard analytics.
607 #[tracing::instrument(skip_all)]
608 pub async fn get_revenue_by_user_projects(
609 pool: &PgPool,
610 user_id: UserId,
611 ) -> Result<Vec<(ProjectId, String, i64)>> {
612 let rows: Vec<(ProjectId, String, i64)> = sqlx::query_as(
613 r#"
614 SELECT p.id, p.title, COALESCE(SUM(t.amount_cents), 0)::BIGINT
615 FROM projects p
616 LEFT JOIN items i ON i.project_id = p.id
617 LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed'
618 WHERE p.user_id = $1
619 GROUP BY p.id, p.title
620 HAVING COALESCE(SUM(t.amount_cents), 0) > 0
621 ORDER BY COALESCE(SUM(t.amount_cents), 0) DESC
622 "#,
623 )
624 .bind(user_id)
625 .fetch_all(pool)
626 .await?;
627
628 Ok(rows)
629 }
630
631 /// Revenue and sales per project for a seller within a time range.
632 ///
633 /// Used for the cross-project comparison table on the user analytics tab.
634 #[tracing::instrument(skip_all)]
635 pub async fn get_revenue_by_user_projects_in_range(
636 pool: &PgPool,
637 user_id: UserId,
638 range: &super::analytics::TimeRange,
639 ) -> Result<Vec<(ProjectId, String, i64, i64)>> {
640 let time_filter = match range.interval_sql() {
641 Some(interval) => format!(
642 " AND t.completed_at >= NOW() - INTERVAL '{interval}'"
643 ),
644 None => String::new(),
645 };
646
647 let sql = format!(
648 r#"
649 SELECT p.id, p.title,
650 COALESCE(SUM(t.amount_cents), 0)::BIGINT,
651 COUNT(t.id)::BIGINT
652 FROM projects p
653 LEFT JOIN items i ON i.project_id = p.id
654 LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed'{time_filter}
655 WHERE p.user_id = $1
656 GROUP BY p.id, p.title
657 ORDER BY COALESCE(SUM(t.amount_cents), 0) DESC
658 "#
659 );
660
661 let rows: Vec<(ProjectId, String, i64, i64)> = sqlx::query_as(&sql)
662 .bind(user_id)
663 .fetch_all(pool)
664 .await?;
665
666 Ok(rows)
667 }
668
669 /// Remove a free item from library (deletes the claim transaction).
670 /// If the claim was via a promo code, decrements the code's use_count.
671 #[tracing::instrument(skip_all)]
672 pub async fn remove_free_item_from_library(
673 pool: &PgPool,
674 user_id: UserId,
675 item_id: ItemId,
676 ) -> Result<bool> {
677 // Delete the free claim and return the promo_code_id if one was used
678 let row: Option<Option<super::PromoCodeId>> = sqlx::query_scalar(
679 r#"
680 DELETE FROM transactions
681 WHERE buyer_id = $1 AND item_id = $2 AND amount_cents = 0 AND status = 'completed'
682 RETURNING promo_code_id
683 "#,
684 )
685 .bind(user_id)
686 .bind(item_id)
687 .fetch_optional(pool)
688 .await?;
689
690 let deleted = row.is_some();
691
692 if let Some(Some(pc_id)) = row {
693 super::promo_codes::release_use_count(pool, pc_id).await.ok();
694 }
695
696 Ok(deleted)
697 }
698
699 /// Fetch a single transaction by ID.
700 #[tracing::instrument(skip_all)]
701 pub async fn get_transaction_by_id(
702 pool: &PgPool,
703 id: TransactionId,
704 ) -> Result<Option<DbTransaction>> {
705 let tx = sqlx::query_as::<_, DbTransaction>("SELECT * FROM transactions WHERE id = $1")
706 .bind(id)
707 .fetch_optional(pool)
708 .await?;
709 Ok(tx)
710 }
711
712 /// Mark a transaction as refunded, returning its ID and item_id for downstream cleanup.
713 ///
714 /// The WHERE clause requires `status = 'completed'` so that already-refunded
715 /// or pending transactions are not double-processed. Returns an empty vec if no
716 /// matching transactions were found (idempotent for webhook retries).
717 ///
718 /// Returns ALL refunded transactions (handles cart checkouts where multiple
719 /// transactions share the same payment_intent_id).
720 #[tracing::instrument(skip_all)]
721 pub async fn refund_transaction_by_payment_intent<'e>(
722 executor: impl sqlx::PgExecutor<'e>,
723 payment_intent_id: &str,
724 ) -> Result<Vec<(super::TransactionId, Option<ItemId>)>> {
725 // item_id is nullable on project-level transactions (routes/stripe/checkout/project.rs);
726 // returning non-Optional ItemId would cause sqlx decode failures and infinite Stripe retries.
727 let rows: Vec<(super::TransactionId, Option<ItemId>)> = sqlx::query_as(
728 r#"
729 UPDATE transactions
730 SET status = 'refunded'
731 WHERE stripe_payment_intent_id = $1 AND status = 'completed'
732 RETURNING id, item_id
733 "#,
734 )
735 .bind(payment_intent_id)
736 .fetch_all(executor)
737 .await?;
738
739 Ok(rows)
740 }
741
742 /// Revoke all child transactions linked to a parent (bundle) transaction.
743 ///
744 /// Returns the item IDs of revoked children so callers can decrement sales counts.
745 #[tracing::instrument(skip_all)]
746 pub async fn revoke_child_transactions<'e>(
747 executor: impl sqlx::PgExecutor<'e>,
748 parent_transaction_id: TransactionId,
749 ) -> Result<Vec<ItemId>> {
750 let item_ids: Vec<(Option<ItemId>,)> = sqlx::query_as(
751 r#"
752 UPDATE transactions
753 SET status = 'refunded'
754 WHERE parent_transaction_id = $1 AND status = 'completed'
755 RETURNING item_id
756 "#,
757 )
758 .bind(parent_transaction_id)
759 .fetch_all(executor)
760 .await?;
761
762 Ok(item_ids.into_iter().filter_map(|(id,)| id).collect())
763 }
764
765 /// A buyer's email and display name for platform notifications (e.g. creator departure).
766 ///
767 /// Unlike [`DbContactRow`], this includes ALL buyers regardless of contact sharing
768 /// preference, because platform-initiated notifications (content removal warnings)
769 /// are distinct from creator-initiated contact.
770 #[derive(sqlx::FromRow)]
771 pub struct BuyerNotificationRow {
772 pub email: String,
773 pub display_name: Option<String>,
774 }
775
776 /// Get unique buyers who purchased from a seller, for platform notifications.
777 ///
778 /// This bypasses `share_contact` and `contact_revocations` because the notification
779 /// is sent by the platform (not the creator) to warn buyers about content removal.
780 ///
781 /// Capped at `limit` rows to bound memory + outbound email volume on creators
782 /// with very large historical buyer pools. The caller is responsible for
783 /// noting when the cap is hit (returned slice length == limit).
784 #[tracing::instrument(skip_all)]
785 pub async fn get_all_buyers_for_seller(
786 pool: &PgPool,
787 seller_id: UserId,
788 limit: i64,
789 ) -> Result<Vec<BuyerNotificationRow>> {
790 let rows = sqlx::query_as::<_, BuyerNotificationRow>(
791 r#"
792 SELECT DISTINCT u.email, u.display_name
793 FROM transactions t
794 JOIN users u ON u.id = t.buyer_id
795 WHERE t.seller_id = $1
796 AND t.status = 'completed'
797 AND t.buyer_id IS NOT NULL
798 LIMIT $2
799 "#,
800 )
801 .bind(seller_id)
802 .bind(limit)
803 .fetch_all(pool)
804 .await?;
805
806 Ok(rows)
807 }
808
809 /// A contact row: a buyer who opted to share their email with the seller.
810 #[derive(sqlx::FromRow)]
811 pub struct DbContactRow {
812 pub username: String,
813 pub email: String,
814 pub total_purchases: i64,
815 pub total_spent_cents: i64,
816 pub last_purchase_at: DateTime<Utc>,
817 }
818
819 /// A creator the fan has actively shared contact info with (no revocation).
820 #[derive(sqlx::FromRow)]
821 pub struct SharedCreatorRow {
822 pub seller_id: UserId,
823 pub username: String,
824 pub display_name: Option<String>,
825 }
826
827 /// Get unique contacts for a seller: buyers who opted in to share their email.
828 ///
829 /// Aggregates across all completed transactions where `share_contact = true`,
830 /// returning one row per buyer with purchase stats. Excludes buyers who have
831 /// revoked contact sharing.
832 #[tracing::instrument(skip_all)]
833 pub async fn get_seller_contacts(
834 pool: &PgPool,
835 seller_id: UserId,
836 ) -> Result<Vec<DbContactRow>> {
837 let rows = sqlx::query_as::<_, DbContactRow>(
838 r#"
839 SELECT
840 u.username,
841 u.email,
842 COUNT(*) as total_purchases,
843 COALESCE(SUM(t.amount_cents), 0)::BIGINT as total_spent_cents,
844 MAX(t.completed_at) as last_purchase_at
845 FROM transactions t
846 JOIN users u ON u.id = t.buyer_id
847 WHERE t.seller_id = $1
848 AND t.status = 'completed'
849 AND t.share_contact = true
850 AND NOT EXISTS (
851 SELECT 1 FROM contact_revocations cr
852 WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id
853 )
854 GROUP BY t.buyer_id, u.username, u.email
855 ORDER BY last_purchase_at DESC
856 LIMIT 500
857 "#,
858 )
859 .bind(seller_id)
860 .fetch_all(pool)
861 .await?;
862
863 Ok(rows)
864 }
865
866 /// Get seller transactions for CSV export, with conditional buyer email.
867 ///
868 /// Respects contact revocations: if a buyer revoked sharing, their email
869 /// is hidden even if `share_contact` was true on the transaction.
870 #[tracing::instrument(skip_all)]
871 pub async fn get_seller_transactions_for_export(
872 pool: &PgPool,
873 seller_id: UserId,
874 ) -> Result<Vec<DbTransactionExportRow>> {
875 let rows = sqlx::query_as::<_, DbTransactionExportRow>(
876 r#"
877 SELECT
878 t.created_at,
879 t.item_id,
880 t.item_title,
881 t.amount_cents,
882 t.status,
883 CASE WHEN t.share_contact AND NOT EXISTS (
884 SELECT 1 FROM contact_revocations cr
885 WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id
886 ) THEN u.email ELSE NULL END as buyer_email
887 FROM transactions t
888 LEFT JOIN users u ON u.id = t.buyer_id
889 WHERE t.seller_id = $1
890 ORDER BY t.created_at DESC
891 "#,
892 )
893 .bind(seller_id)
894 .fetch_all(pool)
895 .await?;
896
897 Ok(rows)
898 }
899
900 /// Platform-wide revenue stats: total completed revenue, completed count, refunded count.
901 #[tracing::instrument(skip_all)]
902 pub async fn get_platform_revenue_stats(pool: &PgPool) -> Result<(i64, i64, i64)> {
903 let row: (Option<i64>, Option<i64>, Option<i64>) = sqlx::query_as(
904 r#"
905 SELECT
906 COALESCE(SUM(CASE WHEN status = 'completed' THEN amount_cents ELSE 0 END), 0)::BIGINT,
907 COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0)::BIGINT,
908 COALESCE(SUM(CASE WHEN status = 'refunded' THEN 1 ELSE 0 END), 0)::BIGINT
909 FROM transactions
910 "#,
911 )
912 .fetch_one(pool)
913 .await?;
914
915 Ok((row.0.unwrap_or(0), row.1.unwrap_or(0), row.2.unwrap_or(0)))
916 }
917
918 /// Record a contact revocation (fan withdraws email sharing from a creator).
919 ///
920 /// Idempotent: does nothing if already revoked.
921 #[tracing::instrument(skip_all)]
922 pub async fn revoke_contact_sharing(
923 pool: &PgPool,
924 buyer_id: UserId,
925 seller_id: UserId,
926 ) -> Result<()> {
927 sqlx::query(
928 "INSERT INTO contact_revocations (buyer_id, seller_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
929 )
930 .bind(buyer_id)
931 .bind(seller_id)
932 .execute(pool)
933 .await?;
934
935 Ok(())
936 }
937
938 /// Clear a contact revocation (fan re-shares on a new purchase).
939 #[tracing::instrument(skip_all)]
940 pub async fn clear_contact_revocation(
941 pool: &PgPool,
942 buyer_id: UserId,
943 seller_id: UserId,
944 ) -> Result<()> {
945 sqlx::query(
946 "DELETE FROM contact_revocations WHERE buyer_id = $1 AND seller_id = $2",
947 )
948 .bind(buyer_id)
949 .bind(seller_id)
950 .execute(pool)
951 .await?;
952
953 Ok(())
954 }
955
956 /// Get creators the fan has actively shared contact info with (excluding revoked).
957 #[tracing::instrument(skip_all)]
958 pub async fn get_shared_creators(
959 pool: &PgPool,
960 buyer_id: UserId,
961 ) -> Result<Vec<SharedCreatorRow>> {
962 let rows = sqlx::query_as::<_, SharedCreatorRow>(
963 r#"
964 SELECT DISTINCT t.seller_id, u.username, u.display_name
965 FROM transactions t
966 JOIN users u ON u.id = t.seller_id
967 WHERE t.buyer_id = $1
968 AND t.status = 'completed'
969 AND t.share_contact = true
970 AND NOT EXISTS (
971 SELECT 1 FROM contact_revocations cr
972 WHERE cr.buyer_id = t.buyer_id AND cr.seller_id = t.seller_id
973 )
974 ORDER BY u.username
975 LIMIT 500
976 "#,
977 )
978 .bind(buyer_id)
979 .fetch_all(pool)
980 .await?;
981
982 Ok(rows)
983 }
984
985 /// Create a pending "placeholder" transaction for a subscription checkout that
986 /// used a promo code. This row exists solely so `cleanup_stale_pending` can
987 /// release the promo code reservation if the buyer abandons the Stripe session.
988 /// It is deleted (not completed) when the subscription webhook fires.
989 #[tracing::instrument(skip_all)]
990 pub async fn create_subscription_pending_transaction(
991 pool: &PgPool,
992 buyer_id: UserId,
993 seller_id: UserId,
994 project_id: ProjectId,
995 stripe_checkout_session_id: &str,
996 promo_code_id: PromoCodeId,
997 ) -> Result<()> {
998 sqlx::query(
999 r#"
1000 INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents,
1001 stripe_checkout_session_id, item_title, seller_username, share_contact, promo_code_id)
1002 VALUES ($1, $2, $3, 0, 0, $4, 'subscription-promo-hold', '', false, $5)
1003 "#,
1004 )
1005 .bind(buyer_id)
1006 .bind(seller_id)
1007 .bind(project_id)
1008 .bind(stripe_checkout_session_id)
1009 .bind(promo_code_id)
1010 .execute(pool)
1011 .await?;
1012
1013 Ok(())
1014 }
1015
1016 /// Delete a pending subscription promo-hold transaction by checkout session ID.
1017 /// Called from the subscription webhook after the subscription is created.
1018 #[tracing::instrument(skip_all)]
1019 pub async fn delete_subscription_pending_transaction<'e>(
1020 executor: impl sqlx::PgExecutor<'e>,
1021 stripe_checkout_session_id: &str,
1022 ) -> Result<()> {
1023 sqlx::query(
1024 "DELETE FROM transactions WHERE stripe_checkout_session_id = $1 AND status = 'pending'",
1025 )
1026 .bind(stripe_checkout_session_id)
1027 .execute(executor)
1028 .await?;
1029
1030 Ok(())
1031 }
1032
1033 /// Delete stale pending transactions (older than the given threshold) and return
1034 /// the promo_code_ids that need their use_count decremented.
1035 ///
1036 /// Stripe checkout sessions expire after 24 hours, so pending transactions older
1037 /// than that will never complete. This releases the pending purchase uniqueness
1038 /// slot and any reserved promo code use_count.
1039 #[tracing::instrument(skip_all)]
1040 pub async fn cleanup_stale_pending(
1041 pool: &PgPool,
1042 older_than: chrono::Duration,
1043 ) -> Result<Vec<Option<super::PromoCodeId>>> {
1044 let cutoff = chrono::Utc::now() - older_than;
1045 let rows: Vec<(Option<super::PromoCodeId>,)> = sqlx::query_as(
1046 r#"
1047 DELETE FROM transactions
1048 WHERE status = 'pending'
1049 AND created_at < $1
1050 RETURNING promo_code_id
1051 "#,
1052 )
1053 .bind(cutoff)
1054 .fetch_all(pool)
1055 .await?;
1056
1057 Ok(rows.into_iter().map(|(id,)| id).collect())
1058 }
1059
1060 /// Bulk variant of `get_pending_item_purchase`. Returns the subset of `item_ids`
1061 /// for which the buyer already has a `pending` transaction. Used by cart
1062 /// checkout to abort early when any line item would collide with the partial
1063 /// unique index on `(buyer_id, item_id) WHERE status = 'pending'`.
1064 #[tracing::instrument(skip_all)]
1065 pub async fn pending_subset(
1066 pool: &PgPool,
1067 buyer_id: UserId,
1068 item_ids: &[ItemId],
1069 ) -> Result<std::collections::HashSet<ItemId>> {
1070 if item_ids.is_empty() {
1071 return Ok(std::collections::HashSet::new());
1072 }
1073 let rows: Vec<(ItemId,)> = sqlx::query_as(
1074 "SELECT DISTINCT item_id FROM transactions
1075 WHERE buyer_id = $1 AND status = 'pending' AND item_id = ANY($2)",
1076 )
1077 .bind(buyer_id)
1078 .bind(item_ids)
1079 .fetch_all(pool)
1080 .await?;
1081 Ok(rows.into_iter().map(|(id,)| id).collect())
1082 }
1083
1084 /// Returns the buyer's pending transaction for a specific item, if any.
1085 /// Used to surface in-progress checkouts on the purchase page.
1086 #[tracing::instrument(skip_all)]
1087 pub async fn get_pending_item_purchase(
1088 pool: &PgPool,
1089 buyer_id: UserId,
1090 item_id: ItemId,
1091 ) -> Result<Option<(TransactionId, chrono::DateTime<chrono::Utc>)>> {
1092 let row: Option<(TransactionId, chrono::DateTime<chrono::Utc>)> = sqlx::query_as(
1093 r#"
1094 SELECT id, created_at FROM transactions
1095 WHERE buyer_id = $1 AND item_id = $2 AND status = 'pending'
1096 LIMIT 1
1097 "#,
1098 )
1099 .bind(buyer_id)
1100 .bind(item_id)
1101 .fetch_optional(pool)
1102 .await?;
1103
1104 Ok(row)
1105 }
1106
1107 /// Delete the buyer's pending transaction for a specific item.
1108 /// Returns any released `promo_code_id` so the caller can release its
1109 /// reservation.
1110 #[tracing::instrument(skip_all)]
1111 pub async fn delete_pending_item_purchase(
1112 pool: &PgPool,
1113 buyer_id: UserId,
1114 item_id: ItemId,
1115 ) -> Result<Option<super::PromoCodeId>> {
1116 let row: Option<(Option<super::PromoCodeId>,)> = sqlx::query_as(
1117 r#"
1118 DELETE FROM transactions
1119 WHERE buyer_id = $1 AND item_id = $2 AND status = 'pending'
1120 RETURNING promo_code_id
1121 "#,
1122 )
1123 .bind(buyer_id)
1124 .bind(item_id)
1125 .fetch_optional(pool)
1126 .await?;
1127
1128 Ok(row.and_then(|(id,)| id))
1129 }
1130
1131 /// Create a completed free guest transaction.
1132 ///
1133 /// Returns the number of rows inserted (0 if already claimed via ON CONFLICT).
1134 #[allow(clippy::too_many_arguments)]
1135 #[tracing::instrument(skip_all)]
1136 pub async fn create_free_guest_transaction(
1137 pool: &PgPool,
1138 buyer_id: Option<UserId>,
1139 seller_id: UserId,
1140 item_id: ItemId,
1141 checkout_session_id: &str,
1142 item_title: &str,
1143 seller_username: &str,
1144 guest_email: &str,
1145 claim_token: Option<ClaimToken>,
1146 download_token: DownloadToken,
1147 ) -> std::result::Result<u64, sqlx::Error> {
1148 let result = sqlx::query(
1149 r#"
1150 INSERT INTO transactions (
1151 buyer_id, seller_id, item_id, amount_cents, platform_fee_cents,
1152 stripe_checkout_session_id, status, completed_at,
1153 item_title, seller_username, share_contact,
1154 guest_email, claim_token, download_token
1155 )
1156 VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, false, $7, $8, $9)
1157 ON CONFLICT (guest_email, item_id) WHERE status = 'completed' AND guest_email IS NOT NULL DO NOTHING
1158 "#,
1159 )
1160 .bind(buyer_id)
1161 .bind(seller_id)
1162 .bind(item_id)
1163 .bind(checkout_session_id)
1164 .bind(item_title)
1165 .bind(seller_username)
1166 .bind(guest_email)
1167 .bind(claim_token)
1168 .bind(download_token)
1169 .execute(pool)
1170 .await?;
1171
1172 Ok(result.rows_affected())
1173 }
1174
1175 /// Record a free project claim (PWYW with $0 min or free project).
1176 ///
1177 /// Returns `true` if the claim was actually inserted, `false` if the buyer
1178 /// already owned the project. Mirrors the `claim_free_item` shape so callers
1179 /// can gate downstream side-effects (contact-revocation clear, sale-notification
1180 /// email, etc.) on the winner of a concurrent-claim race — without this signal,
1181 /// two concurrent `/checkout/project` POSTs both fire those side-effects
1182 /// regardless of which one's INSERT actually landed (Run #7 deferred SERIOUS).
1183 #[tracing::instrument(skip_all)]
1184 pub async fn claim_free_project(
1185 pool: &PgPool,
1186 buyer_id: UserId,
1187 seller_id: UserId,
1188 project_id: ProjectId,
1189 item_title: &str,
1190 seller_username: &str,
1191 share_contact: bool,
1192 ) -> Result<bool> {
1193 let result = sqlx::query(
1194 r#"
1195 INSERT INTO transactions (buyer_id, seller_id, project_id, amount_cents, platform_fee_cents,
1196 status, completed_at, item_title, seller_username, share_contact)
1197 VALUES ($1, $2, $3, 0, 0, 'completed', NOW(), $4, $5, $6)
1198 ON CONFLICT (buyer_id, project_id) WHERE status = 'completed' AND project_id IS NOT NULL DO NOTHING
1199 "#,
1200 )
1201 .bind(buyer_id)
1202 .bind(seller_id)
1203 .bind(project_id)
1204 .bind(item_title)
1205 .bind(seller_username)
1206 .bind(share_contact)
1207 .execute(pool)
1208 .await?;
1209
1210 Ok(result.rows_affected() > 0)
1211 }
1212
1213 /// Completed and refunded sales for a specific item, for the item dashboard Sales tab.
1214 #[tracing::instrument(skip_all)]
1215 pub async fn get_sales_by_item(
1216 pool: &PgPool,
1217 item_id: ItemId,
1218 seller_id: UserId,
1219 ) -> Result<Vec<DbTransaction>> {
1220 let rows = sqlx::query_as::<_, DbTransaction>(
1221 r#"
1222 SELECT * FROM transactions
1223 WHERE item_id = $1 AND seller_id = $2
1224 AND status IN ('completed', 'refunded')
1225 ORDER BY created_at DESC
1226 LIMIT 200
1227 "#,
1228 )
1229 .bind(item_id)
1230 .bind(seller_id)
1231 .fetch_all(pool)
1232 .await?;
1233
1234 Ok(rows)
1235 }
1236