Skip to main content

max / makenotwork

31.3 KB · 1093 lines History Blame Raw
1 //! User account CRUD, profile updates, and lookup queries.
2
3 use sqlx::PgPool;
4
5 use super::enums::AppealDecision;
6 use super::models::*;
7 use super::validated_types::{Email, Username};
8 use super::UserId;
9 use crate::error::Result;
10
11 /// Insert a new user and return the created row.
12 #[tracing::instrument(skip_all)]
13 pub async fn create_user(
14 pool: &PgPool,
15 username: &Username,
16 email: &Email,
17 password_hash: &str,
18 ) -> Result<DbUser> {
19 let user = sqlx::query_as::<_, DbUser>(
20 r#"
21 INSERT INTO users (username, email, password_hash)
22 VALUES ($1, $2, $3)
23 RETURNING *
24 "#,
25 )
26 .bind(username)
27 .bind(email)
28 .bind(password_hash)
29 .fetch_one(pool)
30 .await?;
31
32 Ok(user)
33 }
34
35 /// Fetch a user by primary key. Returns `None` if not found.
36 #[tracing::instrument(skip_all)]
37 pub async fn get_user_by_id(pool: &PgPool, id: UserId) -> Result<Option<DbUser>> {
38 let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE id = $1")
39 .bind(id)
40 .fetch_optional(pool)
41 .await?;
42
43 Ok(user)
44 }
45
46 /// Fetch multiple users by ID in a single query.
47 #[tracing::instrument(skip_all)]
48 pub async fn get_users_by_ids(pool: &PgPool, ids: &[UserId]) -> Result<Vec<DbUser>> {
49 let users = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE id = ANY($1)")
50 .bind(ids)
51 .fetch_all(pool)
52 .await?;
53 Ok(users)
54 }
55
56 /// Fetch a user by username. Returns `None` if not found.
57 #[tracing::instrument(skip_all)]
58 pub async fn get_user_by_username(pool: &PgPool, username: &Username) -> Result<Option<DbUser>> {
59 let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE username = $1")
60 .bind(username)
61 .fetch_optional(pool)
62 .await?;
63
64 Ok(user)
65 }
66
67 /// Fetch a user by email address. Returns `None` if not found.
68 #[tracing::instrument(skip_all)]
69 pub async fn get_user_by_email(pool: &PgPool, email: &Email) -> Result<Option<DbUser>> {
70 let user = sqlx::query_as::<_, DbUser>("SELECT * FROM users WHERE email = $1")
71 .bind(email)
72 .fetch_optional(pool)
73 .await?;
74
75 Ok(user)
76 }
77
78 /// Update a user's display name and/or bio (COALESCE keeps existing values when `None`).
79 #[tracing::instrument(skip_all)]
80 pub async fn update_user_profile(
81 pool: &PgPool,
82 id: UserId,
83 display_name: Option<&str>,
84 bio: Option<&str>,
85 ) -> Result<DbUser> {
86 let user = sqlx::query_as::<_, DbUser>(
87 r#"
88 UPDATE users
89 SET display_name = COALESCE($2, display_name),
90 bio = COALESCE($3, bio)
91 WHERE id = $1
92 RETURNING *
93 "#,
94 )
95 .bind(id)
96 .bind(display_name)
97 .bind(bio)
98 .fetch_one(pool)
99 .await?;
100
101 Ok(user)
102 }
103
104 /// Replace a user's password hash and invalidate outstanding JWTs.
105 #[tracing::instrument(skip_all)]
106 pub async fn update_user_password(pool: &PgPool, id: UserId, password_hash: &str) -> Result<()> {
107 sqlx::query("UPDATE users SET password_hash = $2, jwt_invalidated_at = NOW() WHERE id = $1")
108 .bind(id)
109 .bind(password_hash)
110 .execute(pool)
111 .await?;
112
113 Ok(())
114 }
115
116 /// Increment the user's feed key version, revoking their current personal-feed
117 /// URL. Returns the new version (folded into the next URL's HMAC).
118 #[tracing::instrument(skip_all)]
119 pub async fn bump_feed_key_version(pool: &PgPool, id: UserId) -> Result<i32> {
120 let (version,): (i32,) = sqlx::query_as(
121 "UPDATE users SET feed_key_version = feed_key_version + 1, updated_at = NOW() \
122 WHERE id = $1 RETURNING feed_key_version",
123 )
124 .bind(id)
125 .fetch_one(pool)
126 .await?;
127
128 Ok(version)
129 }
130
131 /// Self-deactivate an account (enter limbo state).
132 ///
133 /// Bumps `jwt_invalidated_at` so any outstanding SyncKit JWTs minted from
134 /// this account stop authenticating immediately.
135 #[tracing::instrument(skip_all)]
136 pub async fn deactivate_user(pool: &PgPool, id: UserId) -> Result<()> {
137 sqlx::query(
138 "UPDATE users SET deactivated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1",
139 )
140 .bind(id)
141 .execute(pool)
142 .await?;
143
144 Ok(())
145 }
146
147 /// Reactivate a self-deactivated account.
148 #[tracing::instrument(skip_all)]
149 pub async fn reactivate_user(pool: &PgPool, id: UserId) -> Result<()> {
150 sqlx::query(
151 "UPDATE users SET deactivated_at = NULL, updated_at = NOW() WHERE id = $1",
152 )
153 .bind(id)
154 .execute(pool)
155 .await?;
156
157 Ok(())
158 }
159
160 /// Admin: permanently terminate an account (enforcement ladder step 4).
161 /// The user has 30 days to export data. After that, the scheduler deletes the account.
162 /// The account must already be suspended.
163 #[tracing::instrument(skip_all)]
164 pub async fn terminate_user(pool: &PgPool, id: UserId) -> Result<()> {
165 sqlx::query(
166 "UPDATE users SET terminated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1",
167 )
168 .bind(id)
169 .execute(pool)
170 .await?;
171
172 Ok(())
173 }
174
175 /// Get user IDs of terminated accounts whose 30-day export window has expired.
176 #[tracing::instrument(skip_all)]
177 pub async fn get_expired_terminated_ids(pool: &PgPool) -> Result<Vec<UserId>> {
178 let ids: Vec<UserId> = sqlx::query_scalar(
179 r#"
180 SELECT id FROM users
181 WHERE terminated_at IS NOT NULL
182 AND terminated_at < NOW() - INTERVAL '30 days'
183 ORDER BY terminated_at
184 LIMIT 1000
185 "#,
186 )
187 .fetch_all(pool)
188 .await?;
189
190 Ok(ids)
191 }
192
193 /// Permanently delete a user by ID.
194 #[tracing::instrument(skip_all)]
195 pub async fn delete_user(pool: &PgPool, id: UserId) -> Result<()> {
196 sqlx::query("DELETE FROM users WHERE id = $1")
197 .bind(id)
198 .execute(pool)
199 .await?;
200
201 Ok(())
202 }
203
204 /// Check whether this creator has any completed sales (transactions where they were the seller).
205 #[tracing::instrument(skip_all)]
206 pub async fn has_completed_sales(pool: &PgPool, id: UserId) -> Result<bool> {
207 let count: i64 = sqlx::query_scalar(
208 "SELECT COUNT(*) FROM transactions WHERE seller_id = $1 AND status = 'completed'",
209 )
210 .bind(id)
211 .fetch_one(pool)
212 .await?;
213
214 Ok(count > 0)
215 }
216
217 /// Schedule content removal 90 days from now. The user row is hidden from public
218 /// views but items remain accessible to buyers who previously purchased them.
219 /// After 90 days the scheduler deletes S3 objects and the user row.
220 #[tracing::instrument(skip_all)]
221 pub async fn schedule_content_removal(pool: &PgPool, id: UserId) -> Result<()> {
222 sqlx::query(
223 r#"
224 UPDATE users
225 SET content_removal_at = NOW() + INTERVAL '90 days',
226 deactivated_at = NOW(),
227 updated_at = NOW()
228 WHERE id = $1
229 "#,
230 )
231 .bind(id)
232 .execute(pool)
233 .await?;
234
235 Ok(())
236 }
237
238 /// Get user IDs whose 90-day content removal grace period has expired.
239 #[tracing::instrument(skip_all)]
240 pub async fn get_expired_content_removal_ids(pool: &PgPool) -> Result<Vec<UserId>> {
241 let ids: Vec<UserId> = sqlx::query_scalar(
242 r#"
243 SELECT id FROM users
244 WHERE content_removal_at IS NOT NULL
245 AND content_removal_at < NOW()
246 ORDER BY content_removal_at
247 LIMIT 1000
248 "#,
249 )
250 .fetch_all(pool)
251 .await?;
252
253 Ok(ids)
254 }
255
256 /// Create an ephemeral sandbox user. Returns the created row.
257 ///
258 /// The user gets `can_create_projects = true`, `email_verified = true`,
259 /// a SmallFiles creator tier, and a tight storage cap. The row is
260 /// automatically cleaned up by the scheduler after `sandbox_expires_at`.
261 #[tracing::instrument(skip_all)]
262 pub async fn create_sandbox_user(
263 pool: &PgPool,
264 username: &Username,
265 email: &Email,
266 password_hash: &str,
267 expiry_secs: i64,
268 ) -> Result<DbUser> {
269 let user = sqlx::query_as::<_, DbUser>(
270 r#"
271 INSERT INTO users (
272 username, email, password_hash,
273 is_sandbox, sandbox_expires_at,
274 can_create_projects, email_verified,
275 creator_tier
276 )
277 VALUES (
278 $1, $2, $3,
279 TRUE, NOW() + make_interval(secs => $4::float8),
280 TRUE, TRUE,
281 'small_files'
282 )
283 RETURNING *
284 "#,
285 )
286 .bind(username)
287 .bind(email)
288 .bind(password_hash)
289 .bind(expiry_secs as f64)
290 .fetch_one(pool)
291 .await?;
292
293 Ok(user)
294 }
295
296 /// Return IDs of sandbox users whose expiry has passed.
297 #[tracing::instrument(skip_all)]
298 pub async fn get_expired_sandbox_ids(pool: &PgPool) -> Result<Vec<UserId>> {
299 // Per-tick LIMIT bounds the supervisor's input list (the scheduler re-ticks
300 // and the WHERE re-excludes already-deleted rows, so the remainder is picked
301 // up next tick). Run #12 INFO — keeps a pathological mass-expiry from loading
302 // an unbounded id vec even though concurrency is already capped at 4.
303 let ids = sqlx::query_scalar::<_, UserId>(
304 "SELECT id FROM users WHERE is_sandbox = TRUE AND sandbox_expires_at < NOW() \
305 ORDER BY sandbox_expires_at LIMIT 1000",
306 )
307 .fetch_all(pool)
308 .await?;
309
310 Ok(ids)
311 }
312
313 /// Count active (non-expired) sandbox accounts created from a given IP.
314 /// Used to enforce the per-IP concurrent sandbox cap.
315 #[tracing::instrument(skip_all)]
316 pub async fn count_active_sandboxes_by_ip(pool: &PgPool, ip: &str) -> Result<i64> {
317 let count: i64 = sqlx::query_scalar(
318 r#"
319 SELECT COUNT(*) FROM users u
320 JOIN user_sessions us ON us.user_id = u.id
321 WHERE u.is_sandbox = TRUE
322 AND u.sandbox_expires_at > NOW()
323 AND us.ip_address = $1
324 "#,
325 )
326 .bind(ip)
327 .fetch_one(pool)
328 .await?;
329
330 Ok(count)
331 }
332
333 /// Update user's Stripe Connect account information after OAuth
334 #[tracing::instrument(skip_all)]
335 pub async fn update_user_stripe_account(
336 pool: &PgPool,
337 user_id: UserId,
338 stripe_account_id: &str,
339 onboarding_complete: bool,
340 payouts_enabled: bool,
341 charges_enabled: bool,
342 ) -> Result<DbUser> {
343 let user = sqlx::query_as::<_, DbUser>(
344 r#"
345 UPDATE users
346 SET stripe_account_id = $2,
347 stripe_onboarding_complete = $3,
348 stripe_payouts_enabled = $4,
349 stripe_charges_enabled = $5,
350 updated_at = NOW()
351 WHERE id = $1
352 RETURNING *
353 "#,
354 )
355 .bind(user_id)
356 .bind(stripe_account_id)
357 .bind(onboarding_complete)
358 .bind(payouts_enabled)
359 .bind(charges_enabled)
360 .fetch_one(pool)
361 .await?;
362
363 Ok(user)
364 }
365
366 /// Atomically set a user's Stripe Connect account ID, but only if one is not
367 /// already set. Returns `Some(user)` on success, or `None` if another request
368 /// already claimed the slot (race-condition guard).
369 #[tracing::instrument(skip_all)]
370 pub async fn try_set_stripe_account(
371 pool: &PgPool,
372 user_id: UserId,
373 stripe_account_id: &str,
374 ) -> Result<Option<DbUser>> {
375 let user = sqlx::query_as::<_, DbUser>(
376 r#"
377 UPDATE users
378 SET stripe_account_id = $2,
379 stripe_onboarding_complete = false,
380 stripe_payouts_enabled = false,
381 stripe_charges_enabled = false,
382 updated_at = NOW()
383 WHERE id = $1 AND (stripe_account_id IS NULL OR stripe_account_id = '')
384 RETURNING *
385 "#,
386 )
387 .bind(user_id)
388 .bind(stripe_account_id)
389 .fetch_optional(pool)
390 .await?;
391
392 Ok(user)
393 }
394
395 /// Update user's Stripe status from webhook (finds user by stripe_account_id)
396 #[tracing::instrument(skip_all)]
397 pub async fn update_user_stripe_status(
398 pool: &PgPool,
399 stripe_account_id: &str,
400 onboarding_complete: bool,
401 payouts_enabled: bool,
402 charges_enabled: bool,
403 ) -> Result<Option<DbUser>> {
404 let user = sqlx::query_as::<_, DbUser>(
405 r#"
406 UPDATE users
407 SET stripe_onboarding_complete = $2,
408 stripe_payouts_enabled = $3,
409 stripe_charges_enabled = $4,
410 updated_at = NOW()
411 WHERE stripe_account_id = $1
412 RETURNING *
413 "#,
414 )
415 .bind(stripe_account_id)
416 .bind(onboarding_complete)
417 .bind(payouts_enabled)
418 .bind(charges_enabled)
419 .fetch_optional(pool)
420 .await?;
421
422 Ok(user)
423 }
424
425 /// Mark a user's email as verified
426 #[tracing::instrument(skip_all)]
427 pub async fn verify_user_email(pool: &PgPool, user_id: UserId) -> Result<()> {
428 sqlx::query(
429 r#"
430 UPDATE users
431 SET email_verified = true,
432 email_verification_token = NULL,
433 updated_at = NOW()
434 WHERE id = $1
435 "#,
436 )
437 .bind(user_id)
438 .execute(pool)
439 .await?;
440
441 Ok(())
442 }
443
444 // ── Suspension / Appeals ──
445
446 /// Suspend a user account, clearing any prior appeal fields.
447 ///
448 /// Bumps `jwt_invalidated_at` so SyncKit JWTs minted for this user expire
449 /// at the next extractor check (subject to `SESSION_TOUCH_CACHE_SECS`).
450 #[tracing::instrument(skip_all)]
451 pub async fn suspend_user(pool: &PgPool, user_id: UserId, reason: &str) -> Result<()> {
452 sqlx::query(
453 r#"
454 UPDATE users
455 SET suspended_at = NOW(),
456 suspension_reason = $2,
457 jwt_invalidated_at = NOW(),
458 appeal_text = NULL,
459 appeal_submitted_at = NULL,
460 appeal_decision = NULL,
461 appeal_response = NULL,
462 appeal_decided_at = NULL,
463 updated_at = NOW()
464 WHERE id = $1
465 "#,
466 )
467 .bind(user_id)
468 .bind(reason)
469 .execute(pool)
470 .await?;
471
472 Ok(())
473 }
474
475 /// Remove suspension and clear all suspension/appeal fields.
476 #[tracing::instrument(skip_all)]
477 pub async fn unsuspend_user(pool: &PgPool, user_id: UserId) -> Result<()> {
478 sqlx::query(
479 r#"
480 UPDATE users
481 SET suspended_at = NULL,
482 suspension_reason = NULL,
483 appeal_text = NULL,
484 appeal_submitted_at = NULL,
485 appeal_decision = NULL,
486 appeal_response = NULL,
487 appeal_decided_at = NULL,
488 updated_at = NOW()
489 WHERE id = $1
490 "#,
491 )
492 .bind(user_id)
493 .execute(pool)
494 .await?;
495
496 Ok(())
497 }
498
499 // ── Creator pause (voluntary) ──
500
501 /// Set the creator_paused_at timestamp (voluntary pause).
502 #[tracing::instrument(skip_all)]
503 pub async fn pause_creator(pool: &PgPool, user_id: UserId) -> Result<()> {
504 sqlx::query(
505 "UPDATE users SET creator_paused_at = NOW(), updated_at = NOW() WHERE id = $1",
506 )
507 .bind(user_id)
508 .execute(pool)
509 .await?;
510
511 Ok(())
512 }
513
514 /// Clear the creator_paused_at timestamp (resume from voluntary pause).
515 #[tracing::instrument(skip_all)]
516 pub async fn unpause_creator(pool: &PgPool, user_id: UserId) -> Result<()> {
517 sqlx::query(
518 "UPDATE users SET creator_paused_at = NULL, updated_at = NOW() WHERE id = $1",
519 )
520 .bind(user_id)
521 .execute(pool)
522 .await?;
523
524 Ok(())
525 }
526
527 /// Submit an appeal for a suspended account, clearing any prior decision.
528 #[tracing::instrument(skip_all)]
529 pub async fn submit_appeal(pool: &PgPool, user_id: UserId, appeal_text: &str) -> Result<()> {
530 sqlx::query(
531 r#"
532 UPDATE users
533 SET appeal_text = $2,
534 appeal_submitted_at = NOW(),
535 appeal_decision = NULL,
536 appeal_response = NULL,
537 appeal_decided_at = NULL,
538 updated_at = NOW()
539 WHERE id = $1 AND suspended_at IS NOT NULL
540 "#,
541 )
542 .bind(user_id)
543 .bind(appeal_text)
544 .execute(pool)
545 .await?;
546
547 Ok(())
548 }
549
550 /// Resolve an appeal. If approved, also clears suspension.
551 #[tracing::instrument(skip_all)]
552 pub async fn resolve_appeal(
553 pool: &PgPool,
554 user_id: UserId,
555 decision: AppealDecision,
556 response: &str,
557 ) -> Result<()> {
558 if decision == AppealDecision::Approved {
559 // Approve: clear suspension entirely
560 sqlx::query(
561 r#"
562 UPDATE users
563 SET appeal_decision = $2,
564 appeal_response = $3,
565 appeal_decided_at = NOW(),
566 suspended_at = NULL,
567 suspension_reason = NULL,
568 updated_at = NOW()
569 WHERE id = $1
570 "#,
571 )
572 .bind(user_id)
573 .bind(decision)
574 .bind(response)
575 .execute(pool)
576 .await?;
577 } else {
578 // Deny: keep suspension, record decision
579 sqlx::query(
580 r#"
581 UPDATE users
582 SET appeal_decision = $2,
583 appeal_response = $3,
584 appeal_decided_at = NOW(),
585 updated_at = NOW()
586 WHERE id = $1
587 "#,
588 )
589 .bind(user_id)
590 .bind(decision)
591 .bind(response)
592 .execute(pool)
593 .await?;
594 }
595
596 Ok(())
597 }
598
599 /// Admin query: users with a pending appeal (submitted but not yet decided).
600 #[tracing::instrument(skip_all)]
601 pub async fn get_pending_appeals(pool: &PgPool) -> Result<Vec<DbUser>> {
602 let users = sqlx::query_as::<_, DbUser>(
603 r#"
604 SELECT * FROM users
605 WHERE appeal_submitted_at IS NOT NULL
606 AND appeal_decided_at IS NULL
607 ORDER BY appeal_submitted_at ASC
608 LIMIT 500
609 "#,
610 )
611 .fetch_all(pool)
612 .await?;
613
614 Ok(users)
615 }
616
617 /// Admin query: all users, optionally filtered by suspension status, with pagination.
618 #[tracing::instrument(skip_all)]
619 pub async fn get_all_users(
620 pool: &PgPool,
621 filter: Option<&str>,
622 limit: i64,
623 offset: i64,
624 ) -> Result<Vec<DbUser>> {
625 let limit = limit.min(200);
626 let users = match filter {
627 Some("suspended") => {
628 sqlx::query_as::<_, DbUser>(
629 "SELECT * FROM users WHERE suspended_at IS NOT NULL ORDER BY suspended_at DESC LIMIT $1 OFFSET $2",
630 )
631 .bind(limit)
632 .bind(offset)
633 .fetch_all(pool)
634 .await?
635 }
636 Some("active") => {
637 sqlx::query_as::<_, DbUser>(
638 "SELECT * FROM users WHERE suspended_at IS NULL ORDER BY created_at DESC LIMIT $1 OFFSET $2",
639 )
640 .bind(limit)
641 .bind(offset)
642 .fetch_all(pool)
643 .await?
644 }
645 _ => {
646 sqlx::query_as::<_, DbUser>(
647 "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
648 )
649 .bind(limit)
650 .bind(offset)
651 .fetch_all(pool)
652 .await?
653 }
654 };
655
656 Ok(users)
657 }
658
659 /// Count users matching a filter (for pagination totals).
660 #[tracing::instrument(skip_all)]
661 pub async fn count_users(pool: &PgPool, filter: Option<&str>) -> Result<i64> {
662 let count = match filter {
663 Some("suspended") => {
664 sqlx::query_scalar::<_, i64>(
665 "SELECT COUNT(*) FROM users WHERE suspended_at IS NOT NULL",
666 )
667 .fetch_one(pool)
668 .await?
669 }
670 Some("active") => {
671 sqlx::query_scalar::<_, i64>(
672 "SELECT COUNT(*) FROM users WHERE suspended_at IS NULL",
673 )
674 .fetch_one(pool)
675 .await?
676 }
677 _ => {
678 sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM users")
679 .fetch_one(pool)
680 .await?
681 }
682 };
683
684 Ok(count)
685 }
686
687 /// Count total and suspended users in a single query.
688 #[tracing::instrument(skip_all)]
689 pub async fn count_users_summary(pool: &PgPool) -> Result<(i64, i64)> {
690 let (total, suspended): (i64, i64) = sqlx::query_as(
691 r#"
692 SELECT
693 COUNT(*),
694 COUNT(*) FILTER (WHERE suspended_at IS NOT NULL)
695 FROM users
696 "#,
697 )
698 .fetch_one(pool)
699 .await?;
700
701 Ok((total, suspended))
702 }
703
704 /// Get all user emails for bulk notifications (e.g. shutdown notice).
705 #[tracing::instrument(skip_all)]
706 pub async fn get_all_user_emails(pool: &PgPool) -> Result<Vec<(String, Option<String>)>> {
707 let rows = sqlx::query_as::<_, (String, Option<String>)>(
708 "SELECT email, display_name FROM users ORDER BY created_at ASC",
709 )
710 .fetch_all(pool)
711 .await?;
712
713 Ok(rows)
714 }
715
716 /// Update a user's email notification preferences.
717 #[allow(clippy::too_many_arguments)]
718 #[tracing::instrument(skip_all)]
719 pub async fn update_notification_preferences(
720 pool: &PgPool,
721 id: UserId,
722 notify_sale: bool,
723 notify_follower: bool,
724 notify_release: bool,
725 login_notification_enabled: bool,
726 notify_issues: bool,
727 notify_status: bool,
728 ) -> Result<()> {
729 sqlx::query(
730 r#"
731 UPDATE users
732 SET notify_sale = $2,
733 notify_follower = $3,
734 notify_release = $4,
735 login_notification_enabled = $5,
736 notify_issues = $6,
737 notify_status = $7,
738 updated_at = NOW()
739 WHERE id = $1
740 "#,
741 )
742 .bind(id)
743 .bind(notify_sale)
744 .bind(notify_follower)
745 .bind(notify_release)
746 .bind(login_notification_enabled)
747 .bind(notify_issues)
748 .bind(notify_status)
749 .execute(pool)
750 .await?;
751
752 Ok(())
753 }
754
755 /// Update a user's tip preferences (tips_enabled toggle and notification).
756 #[tracing::instrument(skip_all)]
757 pub async fn update_tip_preferences(
758 pool: &PgPool,
759 id: UserId,
760 tips_enabled: bool,
761 notify_tip: bool,
762 ) -> Result<()> {
763 sqlx::query(
764 r#"
765 UPDATE users
766 SET tips_enabled = $2,
767 notify_tip = $3,
768 updated_at = NOW()
769 WHERE id = $1
770 "#,
771 )
772 .bind(id)
773 .bind(tips_enabled)
774 .bind(notify_tip)
775 .execute(pool)
776 .await?;
777
778 Ok(())
779 }
780
781 /// Disable a single notification preference by column name.
782 ///
783 /// Used by the email unsubscribe handler. Only accepts known column names
784 /// to prevent SQL injection.
785 #[tracing::instrument(skip_all)]
786 pub async fn disable_notification(pool: &PgPool, user_id: UserId, preference: &str) -> Result<bool> {
787 let sql = match preference {
788 "notify_sale" => "UPDATE users SET notify_sale = false, updated_at = NOW() WHERE id = $1",
789 "notify_follower" => "UPDATE users SET notify_follower = false, updated_at = NOW() WHERE id = $1",
790 "notify_release" => "UPDATE users SET notify_release = false, updated_at = NOW() WHERE id = $1",
791 "login_notification_enabled" => "UPDATE users SET login_notification_enabled = false, updated_at = NOW() WHERE id = $1",
792 "notify_issues" => "UPDATE users SET notify_issues = false, updated_at = NOW() WHERE id = $1",
793 "notify_tip" => "UPDATE users SET notify_tip = false, updated_at = NOW() WHERE id = $1",
794 "notify_status" => "UPDATE users SET notify_status = false, updated_at = NOW() WHERE id = $1",
795 _ => return Ok(false),
796 };
797 let result = sqlx::query(sql).bind(user_id).execute(pool).await?;
798 Ok(result.rows_affected() > 0)
799 }
800
801 /// A user who opted into platform status notifications.
802 #[derive(sqlx::FromRow)]
803 pub struct StatusAlertSubscriber {
804 pub id: UserId,
805 pub email: Email,
806 pub display_name: Option<String>,
807 }
808
809 /// Get all users who opted into platform status notifications.
810 ///
811 /// Hard cap at 10k rows so the monitor's status-change fan-out can't unbox
812 /// an unbounded query into RAM. The 100ms pacing in `monitor.rs` already
813 /// limits fan-out throughput to ~600/minute — anything past 10k would
814 /// chew through Postmark rate limits anyway. If we ever hit the cap a
815 /// WARN fires so we know to switch to a paged dispatch model.
816 #[tracing::instrument(skip_all)]
817 pub async fn get_status_alert_subscribers(pool: &PgPool) -> Result<Vec<StatusAlertSubscriber>> {
818 const STATUS_SUBSCRIBER_CAP: i64 = 10_000;
819 let rows = sqlx::query_as::<_, StatusAlertSubscriber>(
820 "SELECT id, email, display_name FROM users \
821 WHERE notify_status = true AND deactivated_at IS NULL \
822 ORDER BY id LIMIT $1",
823 )
824 .bind(STATUS_SUBSCRIBER_CAP)
825 .fetch_all(pool)
826 .await?;
827 if rows.len() as i64 == STATUS_SUBSCRIBER_CAP {
828 tracing::warn!(
829 cap = STATUS_SUBSCRIBER_CAP,
830 "get_status_alert_subscribers hit hard cap; promote to paged dispatch"
831 );
832 }
833 Ok(rows)
834 }
835
836 /// Atomically check-and-set broadcast timestamp. Returns false if already sent within 24 hours.
837 #[tracing::instrument(skip_all)]
838 pub async fn try_set_broadcast_at(pool: &PgPool, user_id: UserId) -> Result<bool> {
839 let result = sqlx::query(
840 r#"
841 UPDATE users
842 SET last_broadcast_at = NOW()
843 WHERE id = $1
844 AND (last_broadcast_at IS NULL OR last_broadcast_at < NOW() - INTERVAL '24 hours')
845 "#,
846 )
847 .bind(user_id)
848 .execute(pool)
849 .await?;
850
851 Ok(result.rows_affected() > 0)
852 }
853
854 /// Release the 24h broadcast slot. Used when a broadcast is refused after the
855 /// slot has already been claimed (e.g. recipient cap exceeded) so the creator
856 /// can retry without waiting a day.
857 #[tracing::instrument(skip_all)]
858 pub async fn clear_broadcast_at(pool: &PgPool, user_id: UserId) -> Result<()> {
859 sqlx::query("UPDATE users SET last_broadcast_at = NULL WHERE id = $1")
860 .bind(user_id)
861 .execute(pool)
862 .await?;
863 Ok(())
864 }
865
866 // ── Upload Trust ──
867
868 /// Check if a user is trusted for uploads (bypasses review queue).
869 #[tracing::instrument(skip_all)]
870 pub async fn is_upload_trusted(pool: &PgPool, user_id: UserId) -> Result<bool> {
871 let trusted = sqlx::query_scalar::<_, bool>(
872 "SELECT upload_trusted FROM users WHERE id = $1",
873 )
874 .bind(user_id)
875 .fetch_one(pool)
876 .await?;
877
878 Ok(trusted)
879 }
880
881 /// Set a user's upload trust status.
882 #[tracing::instrument(skip_all)]
883 pub async fn set_upload_trusted(pool: &PgPool, user_id: UserId, trusted: bool) -> Result<()> {
884 sqlx::query(
885 r#"
886 UPDATE users
887 SET upload_trusted = $2,
888 updated_at = NOW()
889 WHERE id = $1
890 "#,
891 )
892 .bind(user_id)
893 .bind(trusted)
894 .execute(pool)
895 .await?;
896
897 Ok(())
898 }
899
900 // ── Onboarding email drip ──
901
902 /// Users who need the next onboarding email. Returns users at a given step
903 /// whose last email was sent more than `min_age` ago (or never).
904 #[tracing::instrument(skip_all)]
905 pub async fn get_onboarding_candidates(
906 pool: &PgPool,
907 step: i16,
908 min_age: chrono::Duration,
909 ) -> Result<Vec<DbUser>> {
910 let cutoff = chrono::Utc::now() - min_age;
911 // Per-tick LIMIT bounds the scheduler's input list. The caller advances
912 // each returned user's step (so the WHERE re-excludes them), meaning the
913 // remainder is drained on the next tick — same re-tick pattern as the
914 // sandbox/terminated/content-removal cleanup queries. Run #14 MEDIUM: a
915 // signup surge must not load an unbounded user vec into the lock-held tick.
916 let users = sqlx::query_as::<_, DbUser>(
917 "SELECT * FROM users
918 WHERE onboarding_email_step = $1
919 AND (onboarding_email_sent_at IS NULL OR onboarding_email_sent_at < $2)
920 AND suspended_at IS NULL
921 ORDER BY onboarding_email_sent_at ASC NULLS FIRST
922 LIMIT 1000",
923 )
924 .bind(step)
925 .bind(cutoff)
926 .fetch_all(pool)
927 .await?;
928 Ok(users)
929 }
930
931 /// Advance a user's onboarding email step and record the send time.
932 #[tracing::instrument(skip_all)]
933 pub async fn advance_onboarding_step(pool: &PgPool, user_id: UserId, new_step: i16) -> Result<()> {
934 sqlx::query(
935 "UPDATE users SET onboarding_email_step = $2, onboarding_email_sent_at = NOW() WHERE id = $1",
936 )
937 .bind(user_id)
938 .bind(new_step)
939 .execute(pool)
940 .await?;
941 Ok(())
942 }
943
944 /// Advance onboarding step for multiple users in a single query.
945 #[tracing::instrument(skip_all)]
946 pub async fn batch_advance_onboarding_step(
947 pool: &PgPool,
948 user_ids: &[UserId],
949 new_step: i16,
950 ) -> Result<()> {
951 sqlx::query(
952 "UPDATE users SET onboarding_email_step = $2, onboarding_email_sent_at = NOW() WHERE id = ANY($1)",
953 )
954 .bind(user_ids)
955 .bind(new_step)
956 .execute(pool)
957 .await?;
958 Ok(())
959 }
960
961 /// Mark a user as a founder. Called when they start a creator-tier
962 /// subscription while the founder pricing window is open. Sticky; never
963 /// reset, even on cancellation. Subsequent re-subscriptions during the
964 /// window keep their founder status. After the window closes, eligibility
965 /// is determined by `founder_locked_at` (stamped only for users with an
966 /// active subscription at the close-time snapshot).
967 ///
968 /// **DIY exclusion**: DIY-tier accounts are not full members and must not
969 /// qualify for founder pricing (`project_founder_pricing.md` § decision 5).
970 /// Only call this from creator-tier (Basic/SmallFiles/BigFiles/Everything)
971 /// checkout paths. When DIY ships, its checkout path must NOT invoke this.
972 #[tracing::instrument(skip_all)]
973 pub async fn mark_user_as_founder(pool: &PgPool, user_id: UserId) -> Result<()> {
974 sqlx::query(
975 r#"
976 UPDATE users
977 SET is_founder = TRUE,
978 updated_at = NOW()
979 WHERE id = $1 AND is_founder = FALSE
980 "#,
981 )
982 .bind(user_id)
983 .execute(pool)
984 .await?;
985 Ok(())
986 }
987
988 /// Close the founder pricing window by stamping `founder_locked_at` on every
989 /// user who is currently flagged `is_founder` AND has an active creator-tier
990 /// subscription. Returns the number of users locked in. Idempotent: skips
991 /// any user already locked. Intended to be called once from an admin tool
992 /// at the moment the founder window closes.
993 #[tracing::instrument(skip_all)]
994 pub async fn lock_in_founders_with_active_subscriptions(pool: &PgPool) -> Result<u64> {
995 let result = sqlx::query(
996 r#"
997 UPDATE users u
998 SET founder_locked_at = NOW(),
999 updated_at = NOW()
1000 WHERE u.is_founder = TRUE
1001 AND u.founder_locked_at IS NULL
1002 AND EXISTS (
1003 SELECT 1 FROM creator_subscriptions s
1004 WHERE s.user_id = u.id
1005 AND s.status = 'active'
1006 )
1007 "#,
1008 )
1009 .execute(pool)
1010 .await?;
1011 Ok(result.rows_affected())
1012 }
1013
1014 /// Update a user's Stripe Tax toggle.
1015 #[tracing::instrument(skip_all)]
1016 pub async fn update_stripe_tax_enabled(pool: &PgPool, user_id: UserId, enabled: bool) -> Result<()> {
1017 sqlx::query(
1018 r#"
1019 UPDATE users
1020 SET stripe_tax_enabled = $2,
1021 updated_at = NOW()
1022 WHERE id = $1
1023 "#,
1024 )
1025 .bind(user_id)
1026 .bind(enabled)
1027 .execute(pool)
1028 .await?;
1029
1030 Ok(())
1031 }
1032
1033 /// Disconnect a user's Stripe account
1034 #[tracing::instrument(skip_all)]
1035 pub async fn disconnect_user_stripe(pool: &PgPool, user_id: UserId) -> Result<DbUser> {
1036 let user = sqlx::query_as::<_, DbUser>(
1037 r#"
1038 UPDATE users
1039 SET stripe_account_id = NULL,
1040 stripe_onboarding_complete = false,
1041 stripe_payouts_enabled = false,
1042 stripe_charges_enabled = false,
1043 updated_at = NOW()
1044 WHERE id = $1
1045 RETURNING *
1046 "#,
1047 )
1048 .bind(user_id)
1049 .fetch_one(pool)
1050 .await?;
1051
1052 Ok(user)
1053 }
1054
1055 /// Fetch the current cache generation for a user (cheap, indexed lookup).
1056 #[tracing::instrument(skip_all)]
1057 pub async fn get_cache_generation(pool: &PgPool, user_id: UserId) -> Result<i64> {
1058 let generation = sqlx::query_scalar::<_, i64>(
1059 "SELECT cache_generation FROM users WHERE id = $1",
1060 )
1061 .bind(user_id)
1062 .fetch_one(pool)
1063 .await?;
1064
1065 Ok(generation)
1066 }
1067
1068 /// Atomically increment the user's cache generation counter.
1069 /// Call after any write that changes user-visible dashboard data.
1070 #[tracing::instrument(skip_all)]
1071 pub async fn bump_cache_generation(pool: &PgPool, user_id: UserId) -> Result<()> {
1072 sqlx::query("UPDATE users SET cache_generation = cache_generation + 1 WHERE id = $1")
1073 .bind(user_id)
1074 .execute(pool)
1075 .await?;
1076
1077 Ok(())
1078 }
1079
1080 /// Look up a verified user by email (case-insensitive).
1081 /// Returns the user ID if a verified account exists with that email.
1082 #[tracing::instrument(skip_all)]
1083 pub async fn get_verified_user_id_by_email(pool: &PgPool, email: &Email) -> Result<Option<UserId>> {
1084 let id = sqlx::query_scalar(
1085 "SELECT id FROM users WHERE LOWER(email) = LOWER($1) AND email_verified = true",
1086 )
1087 .bind(email)
1088 .fetch_optional(pool)
1089 .await?;
1090
1091 Ok(id)
1092 }
1093