Skip to main content

max / makenotwork

28.9 KB · 1044 lines History Blame Raw
1 //! Database write mutations — inserts, updates, deletes.
2
3 use chrono::{DateTime, Utc};
4 use mt_core::types::{BanType, CommunityState, ModAction};
5 use sqlx::PgPool;
6 use uuid::Uuid;
7
8 /// Create a new community and return its ID.
9 #[tracing::instrument(skip_all)]
10 pub async fn create_community(
11 pool: &PgPool,
12 name: &str,
13 slug: &str,
14 description: Option<&str>,
15 ) -> Result<Uuid, sqlx::Error> {
16 let row: (Uuid,) = sqlx::query_as(
17 "INSERT INTO communities (name, slug, description)
18 VALUES ($1, $2, $3)
19 RETURNING id",
20 )
21 .bind(name)
22 .bind(slug)
23 .bind(description)
24 .fetch_one(pool)
25 .await?;
26 Ok(row.0)
27 }
28
29 /// Upsert a user from MNW account data. Creates the user if they don't exist,
30 /// updates username/display_name if they do.
31 #[tracing::instrument(skip_all)]
32 pub async fn upsert_user(
33 pool: &PgPool,
34 mnw_account_id: Uuid,
35 username: &str,
36 display_name: Option<&str>,
37 ) -> Result<(), sqlx::Error> {
38 sqlx::query(
39 "INSERT INTO users (mnw_account_id, username, display_name)
40 VALUES ($1, $2, $3)
41 ON CONFLICT (mnw_account_id) DO UPDATE
42 SET username = $2, display_name = $3, updated_at = now()",
43 )
44 .bind(mnw_account_id)
45 .bind(username)
46 .bind(display_name)
47 .execute(pool)
48 .await?;
49 Ok(())
50 }
51
52 /// Ensure a user has a membership in a community with the given role.
53 /// Creates membership if none exists, does nothing if already a member.
54 #[tracing::instrument(skip_all)]
55 pub async fn ensure_membership_with_role(
56 pool: &PgPool,
57 user_id: Uuid,
58 community_id: Uuid,
59 role: &str,
60 ) -> Result<(), sqlx::Error> {
61 sqlx::query(
62 "INSERT INTO memberships (user_id, community_id, role)
63 VALUES ($1, $2, $3)
64 ON CONFLICT (user_id, community_id) DO NOTHING",
65 )
66 .bind(user_id)
67 .bind(community_id)
68 .bind(role)
69 .execute(pool)
70 .await?;
71 Ok(())
72 }
73
74 /// Create a thread with an external reference. Returns the thread ID.
75 #[tracing::instrument(skip_all)]
76 pub async fn create_thread_with_external_ref(
77 pool: &PgPool,
78 category_id: Uuid,
79 author_id: Uuid,
80 title: &str,
81 external_ref: &str,
82 ) -> Result<Uuid, sqlx::Error> {
83 let row: (Uuid,) = sqlx::query_as(
84 "INSERT INTO threads (category_id, author_id, title, external_ref)
85 VALUES ($1, $2, $3, $4)
86 RETURNING id",
87 )
88 .bind(category_id)
89 .bind(author_id)
90 .bind(title)
91 .bind(external_ref)
92 .fetch_one(pool)
93 .await?;
94 Ok(row.0)
95 }
96
97 /// Ensure a user has a membership in a community. Creates a 'member' role if none exists.
98 #[tracing::instrument(skip_all)]
99 pub async fn ensure_membership(
100 pool: &PgPool,
101 user_id: Uuid,
102 community_id: Uuid,
103 ) -> Result<(), sqlx::Error> {
104 sqlx::query(
105 "INSERT INTO memberships (user_id, community_id, role)
106 VALUES ($1, $2, 'member')
107 ON CONFLICT (user_id, community_id) DO NOTHING",
108 )
109 .bind(user_id)
110 .bind(community_id)
111 .execute(pool)
112 .await?;
113 Ok(())
114 }
115
116 /// Insert a new thread and return its ID.
117 #[tracing::instrument(skip_all)]
118 pub async fn create_thread(
119 pool: &PgPool,
120 category_id: Uuid,
121 author_id: Uuid,
122 title: &str,
123 ) -> Result<Uuid, sqlx::Error> {
124 let row: (Uuid,) = sqlx::query_as(
125 "INSERT INTO threads (category_id, author_id, title)
126 VALUES ($1, $2, $3)
127 RETURNING id",
128 )
129 .bind(category_id)
130 .bind(author_id)
131 .bind(title)
132 .fetch_one(pool)
133 .await?;
134 Ok(row.0)
135 }
136
137 /// Insert a new post and bump the thread's last_activity_at atomically.
138 /// When `is_reply` is true, also increments the denormalized reply_count.
139 /// Pass false for the opening post (OP), true for all subsequent replies.
140 #[tracing::instrument(skip_all)]
141 pub async fn create_post(
142 pool: &PgPool,
143 thread_id: Uuid,
144 author_id: Uuid,
145 body_markdown: &str,
146 body_html: &str,
147 is_reply: bool,
148 ) -> Result<Uuid, sqlx::Error> {
149 let mut tx = pool.begin().await?;
150
151 let row: (Uuid,) = sqlx::query_as(
152 "INSERT INTO posts (thread_id, author_id, body_markdown, body_html)
153 VALUES ($1, $2, $3, $4)
154 RETURNING id",
155 )
156 .bind(thread_id)
157 .bind(author_id)
158 .bind(body_markdown)
159 .bind(body_html)
160 .fetch_one(&mut *tx)
161 .await?;
162
163 if is_reply {
164 sqlx::query(
165 "UPDATE threads SET last_activity_at = now(), reply_count = reply_count + 1 WHERE id = $1",
166 )
167 .bind(thread_id)
168 .execute(&mut *tx)
169 .await?;
170 } else {
171 sqlx::query("UPDATE threads SET last_activity_at = now() WHERE id = $1")
172 .bind(thread_id)
173 .execute(&mut *tx)
174 .await?;
175 }
176
177 tx.commit().await?;
178 Ok(row.0)
179 }
180
181 /// Insert a footnote on a post. Returns the footnote ID.
182 #[tracing::instrument(skip_all)]
183 pub async fn insert_footnote(
184 pool: &PgPool,
185 post_id: Uuid,
186 author_id: Uuid,
187 body_markdown: &str,
188 body_html: &str,
189 ) -> Result<Uuid, sqlx::Error> {
190 let row: (Uuid,) = sqlx::query_as(
191 "INSERT INTO post_footnotes (post_id, author_id, body_markdown, body_html)
192 VALUES ($1, $2, $3, $4)
193 RETURNING id",
194 )
195 .bind(post_id)
196 .bind(author_id)
197 .bind(body_markdown)
198 .bind(body_html)
199 .fetch_one(pool)
200 .await?;
201 Ok(row.0)
202 }
203
204 /// Mod-remove a post: set removed_by/removed_at. Content stays intact for audit.
205 /// Returns true if the post was actually removed (false if already removed).
206 #[tracing::instrument(skip_all)]
207 pub async fn mod_remove_post(
208 pool: &PgPool,
209 post_id: Uuid,
210 removed_by_id: Uuid,
211 ) -> Result<bool, sqlx::Error> {
212 let result = sqlx::query(
213 "UPDATE posts SET removed_by = $2, removed_at = now()
214 WHERE id = $1 AND removed_at IS NULL",
215 )
216 .bind(post_id)
217 .bind(removed_by_id)
218 .execute(pool)
219 .await?;
220 Ok(result.rows_affected() > 0)
221 }
222
223 /// Atomically auto-hide a post if pending flag count meets the threshold.
224 /// Combines count check and removal in a single query to avoid race conditions.
225 /// Returns true if the post was actually removed.
226 /// Sets removed_by to NULL (system action) — the mod log records the event.
227 #[tracing::instrument(skip_all)]
228 pub async fn auto_hide_if_threshold_met(
229 pool: &PgPool,
230 post_id: Uuid,
231 threshold: i32,
232 ) -> Result<bool, sqlx::Error> {
233 let result = sqlx::query(
234 "UPDATE posts SET removed_by = NULL, removed_at = now()
235 WHERE id = $1 AND removed_at IS NULL
236 AND (SELECT COUNT(*) FROM post_flags WHERE post_id = $1 AND resolved_at IS NULL) >= $2",
237 )
238 .bind(post_id)
239 .bind(threshold as i64)
240 .execute(pool)
241 .await?;
242 Ok(result.rows_affected() > 0)
243 }
244
245 /// Update a thread's title.
246 #[tracing::instrument(skip_all)]
247 pub async fn update_thread_title(
248 pool: &PgPool,
249 thread_id: Uuid,
250 title: &str,
251 ) -> Result<(), sqlx::Error> {
252 sqlx::query("UPDATE threads SET title = $2 WHERE id = $1")
253 .bind(thread_id)
254 .bind(title)
255 .execute(pool)
256 .await?;
257 Ok(())
258 }
259
260 /// Soft-delete a thread: set deleted_at (hides from listings).
261 #[tracing::instrument(skip_all)]
262 pub async fn soft_delete_thread(pool: &PgPool, thread_id: Uuid) -> Result<(), sqlx::Error> {
263 sqlx::query("UPDATE threads SET deleted_at = now() WHERE id = $1")
264 .bind(thread_id)
265 .execute(pool)
266 .await?;
267 Ok(())
268 }
269
270 /// Set or unset the pinned flag on a thread.
271 #[tracing::instrument(skip_all)]
272 pub async fn set_thread_pinned(
273 pool: &PgPool,
274 thread_id: Uuid,
275 pinned: bool,
276 ) -> Result<(), sqlx::Error> {
277 sqlx::query("UPDATE threads SET pinned = $2 WHERE id = $1")
278 .bind(thread_id)
279 .bind(pinned)
280 .execute(pool)
281 .await?;
282 Ok(())
283 }
284
285 /// Set or unset the locked flag on a thread.
286 #[tracing::instrument(skip_all)]
287 pub async fn set_thread_locked(
288 pool: &PgPool,
289 thread_id: Uuid,
290 locked: bool,
291 ) -> Result<(), sqlx::Error> {
292 sqlx::query("UPDATE threads SET locked = $2 WHERE id = $1")
293 .bind(thread_id)
294 .bind(locked)
295 .execute(pool)
296 .await?;
297 Ok(())
298 }
299
300 /// Update a community's name and description.
301 #[tracing::instrument(skip_all)]
302 pub async fn update_community(
303 pool: &PgPool,
304 community_id: Uuid,
305 name: &str,
306 description: Option<&str>,
307 auto_hide_threshold: Option<i32>,
308 ) -> Result<(), sqlx::Error> {
309 sqlx::query(
310 "UPDATE communities SET name = $2, description = $3, auto_hide_threshold = $4 WHERE id = $1",
311 )
312 .bind(community_id)
313 .bind(name)
314 .bind(description)
315 .bind(auto_hide_threshold)
316 .execute(pool)
317 .await?;
318 Ok(())
319 }
320
321 /// Create a new category in a community.
322 #[tracing::instrument(skip_all)]
323 pub async fn create_category(
324 pool: &PgPool,
325 community_id: Uuid,
326 name: &str,
327 slug: &str,
328 description: Option<&str>,
329 sort_order: i32,
330 ) -> Result<Uuid, sqlx::Error> {
331 let row: (Uuid,) = sqlx::query_as(
332 "INSERT INTO categories (community_id, name, slug, description, sort_order)
333 VALUES ($1, $2, $3, $4, $5)
334 RETURNING id",
335 )
336 .bind(community_id)
337 .bind(name)
338 .bind(slug)
339 .bind(description)
340 .bind(sort_order)
341 .fetch_one(pool)
342 .await?;
343 Ok(row.0)
344 }
345
346 /// Update a category's name and description (scoped to community).
347 #[tracing::instrument(skip_all)]
348 pub async fn update_category(
349 pool: &PgPool,
350 category_id: Uuid,
351 community_id: Uuid,
352 name: &str,
353 description: Option<&str>,
354 ) -> Result<bool, sqlx::Error> {
355 let result = sqlx::query(
356 "UPDATE categories SET name = $2, description = $3 WHERE id = $1 AND community_id = $4",
357 )
358 .bind(category_id)
359 .bind(name)
360 .bind(description)
361 .bind(community_id)
362 .execute(pool)
363 .await?;
364 Ok(result.rows_affected() > 0)
365 }
366
367 /// Swap the sort_order of two categories atomically.
368 #[tracing::instrument(skip_all)]
369 pub async fn swap_category_order(
370 pool: &PgPool,
371 id_a: Uuid,
372 order_a: i32,
373 id_b: Uuid,
374 order_b: i32,
375 ) -> Result<(), sqlx::Error> {
376 let mut tx = pool.begin().await?;
377 sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
378 .bind(id_a)
379 .bind(order_b)
380 .execute(&mut *tx)
381 .await?;
382 sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
383 .bind(id_b)
384 .bind(order_a)
385 .execute(&mut *tx)
386 .await?;
387 tx.commit().await?;
388 Ok(())
389 }
390
391 /// Look up a category ID by community slug + category slug.
392 #[tracing::instrument(skip_all)]
393 pub async fn get_category_id_by_slugs(
394 pool: &PgPool,
395 community_slug: &str,
396 category_slug: &str,
397 ) -> Result<Option<Uuid>, sqlx::Error> {
398 let row: Option<(Uuid,)> = sqlx::query_as(
399 "SELECT c.id
400 FROM categories c
401 JOIN communities co ON co.id = c.community_id
402 WHERE co.slug = $1 AND c.slug = $2",
403 )
404 .bind(community_slug)
405 .bind(category_slug)
406 .fetch_optional(pool)
407 .await?;
408 Ok(row.map(|r| r.0))
409 }
410
411 // ============================================================================
412 // Ban / mute mutations
413 // ============================================================================
414
415 /// Create or update a ban/mute. Returns the ban ID.
416 #[tracing::instrument(skip_all)]
417 pub async fn create_community_ban(
418 pool: &PgPool,
419 community_id: Uuid,
420 user_id: Uuid,
421 banned_by: Uuid,
422 ban_type: BanType,
423 reason: Option<&str>,
424 expires_at: Option<DateTime<Utc>>,
425 ) -> Result<Uuid, sqlx::Error> {
426 let row: (Uuid,) = sqlx::query_as(
427 "INSERT INTO community_bans (community_id, user_id, banned_by, ban_type, reason, expires_at)
428 VALUES ($1, $2, $3, $4, $5, $6)
429 ON CONFLICT (community_id, user_id, ban_type) DO UPDATE
430 SET banned_by = $3, reason = $5, expires_at = $6, created_at = now()
431 RETURNING id",
432 )
433 .bind(community_id)
434 .bind(user_id)
435 .bind(banned_by)
436 .bind(ban_type.as_str())
437 .bind(reason)
438 .bind(expires_at)
439 .fetch_one(pool)
440 .await?;
441 Ok(row.0)
442 }
443
444 /// Delete all bans/mutes whose expiration has passed.
445 #[tracing::instrument(skip_all)]
446 pub async fn cleanup_expired_bans(pool: &PgPool, community_id: Uuid) -> Result<u64, sqlx::Error> {
447 let result = sqlx::query(
448 "DELETE FROM community_bans
449 WHERE community_id = $1 AND expires_at IS NOT NULL AND expires_at <= now()",
450 )
451 .bind(community_id)
452 .execute(pool)
453 .await?;
454 Ok(result.rows_affected())
455 }
456
457 /// Remove a ban or mute.
458 #[tracing::instrument(skip_all)]
459 pub async fn remove_community_ban(
460 pool: &PgPool,
461 community_id: Uuid,
462 user_id: Uuid,
463 ban_type: BanType,
464 ) -> Result<(), sqlx::Error> {
465 sqlx::query(
466 "DELETE FROM community_bans
467 WHERE community_id = $1 AND user_id = $2 AND ban_type = $3",
468 )
469 .bind(community_id)
470 .bind(user_id)
471 .bind(ban_type.as_str())
472 .execute(pool)
473 .await?;
474 Ok(())
475 }
476
477 /// Insert a mod log entry.
478 #[tracing::instrument(skip_all)]
479 pub async fn insert_mod_log(
480 pool: &PgPool,
481 community_id: Option<Uuid>,
482 actor_id: Uuid,
483 action: ModAction,
484 target_user: Option<Uuid>,
485 target_id: Option<Uuid>,
486 reason: Option<&str>,
487 ) -> Result<(), sqlx::Error> {
488 sqlx::query(
489 "INSERT INTO mod_log (community_id, actor_id, action, target_user, target_id, reason)
490 VALUES ($1, $2, $3, $4, $5, $6)",
491 )
492 .bind(community_id)
493 .bind(actor_id)
494 .bind(action.as_str())
495 .bind(target_user)
496 .bind(target_id)
497 .bind(reason)
498 .execute(pool)
499 .await?;
500 Ok(())
501 }
502
503 // ============================================================================
504 // Suspension mutations
505 // ============================================================================
506
507 /// Suspend a community.
508 #[tracing::instrument(skip_all)]
509 pub async fn suspend_community(
510 pool: &PgPool,
511 community_id: Uuid,
512 reason: Option<&str>,
513 ) -> Result<(), sqlx::Error> {
514 sqlx::query(
515 "UPDATE communities SET suspended_at = now(), suspension_reason = $2 WHERE id = $1",
516 )
517 .bind(community_id)
518 .bind(reason)
519 .execute(pool)
520 .await?;
521 Ok(())
522 }
523
524 /// Unsuspend a community.
525 #[tracing::instrument(skip_all)]
526 pub async fn unsuspend_community(
527 pool: &PgPool,
528 community_id: Uuid,
529 ) -> Result<(), sqlx::Error> {
530 sqlx::query(
531 "UPDATE communities SET suspended_at = NULL, suspension_reason = NULL WHERE id = $1",
532 )
533 .bind(community_id)
534 .execute(pool)
535 .await?;
536 Ok(())
537 }
538
539 /// Set the community moderation state. See [`CommunityState`] for semantics.
540 #[tracing::instrument(skip_all)]
541 pub async fn set_community_state(
542 pool: &PgPool,
543 community_id: Uuid,
544 state: CommunityState,
545 ) -> Result<(), sqlx::Error> {
546 sqlx::query("UPDATE communities SET state = $2 WHERE id = $1")
547 .bind(community_id)
548 .bind(state.as_str())
549 .execute(pool)
550 .await?;
551 Ok(())
552 }
553
554 /// Result of a clean-slate operation.
555 pub struct CleanSlateResult {
556 /// Number of threads deleted (excluding the system reset thread that's
557 /// then inserted). Useful for the success toast.
558 pub deleted_thread_count: i64,
559 /// ID of the system "Community reset" thread created in the first
560 /// category. `None` if the community has no categories — clean-slate
561 /// still deletes threads but has nowhere to post the notice.
562 pub system_thread_id: Option<Uuid>,
563 }
564
565 /// Clean-slate a community: delete all threads (and the posts / footnotes /
566 /// endorsements / flags / read-positions that cascade from them) while
567 /// preserving the community row, categories, memberships, bans, mutes, and
568 /// tags. Posts a system thread "Community reset by &lt;actor&gt; on &lt;date&gt;"
569 /// in the first category by `sort_order`.
570 ///
571 /// Authorization is the caller's responsibility (see `routes/admin.rs`); this
572 /// mutation only enforces atomicity.
573 #[tracing::instrument(skip_all)]
574 pub async fn clean_slate_community(
575 pool: &PgPool,
576 community_id: Uuid,
577 actor_id: Uuid,
578 actor_display: &str,
579 ) -> Result<CleanSlateResult, sqlx::Error> {
580 let mut tx = pool.begin().await?;
581
582 // Delete every thread whose category belongs to this community. Cascades
583 // reap posts, footnotes, endorsements, flags, read-positions, link
584 // previews, mentions, and tag joins.
585 let deleted: i64 = sqlx::query_scalar(
586 "WITH d AS (
587 DELETE FROM threads
588 WHERE category_id IN (SELECT id FROM categories WHERE community_id = $1)
589 RETURNING 1
590 )
591 SELECT COUNT(*) FROM d",
592 )
593 .bind(community_id)
594 .fetch_one(&mut *tx)
595 .await?;
596
597 // Pick the first category by sort_order to host the reset notice. None
598 // means the community has no categories — nothing to post into.
599 let first_category: Option<Uuid> = sqlx::query_scalar(
600 "SELECT id FROM categories
601 WHERE community_id = $1
602 ORDER BY sort_order
603 LIMIT 1",
604 )
605 .bind(community_id)
606 .fetch_optional(&mut *tx)
607 .await?;
608
609 let system_thread_id = if let Some(cat_id) = first_category {
610 let now = Utc::now();
611 let date = now.format("%Y-%m-%d").to_string();
612 let title = format!("Community reset by {actor_display} on {date}");
613 let body_md = format!(
614 "This community was reset by **{actor_display}** on {date}. All previous threads have been cleared. Settings, categories, members, and bans are preserved.",
615 );
616 let body_html = format!(
617 "<p>This community was reset by <strong>{}</strong> on {}. All previous threads have been cleared. Settings, categories, members, and bans are preserved.</p>",
618 html_escape(actor_display),
619 date,
620 );
621
622 let thread_id: (Uuid,) = sqlx::query_as(
623 "INSERT INTO threads (category_id, author_id, title, pinned, locked)
624 VALUES ($1, $2, $3, TRUE, TRUE)
625 RETURNING id",
626 )
627 .bind(cat_id)
628 .bind(actor_id)
629 .bind(&title)
630 .fetch_one(&mut *tx)
631 .await?;
632
633 sqlx::query(
634 "INSERT INTO posts (thread_id, author_id, body_markdown, body_html)
635 VALUES ($1, $2, $3, $4)",
636 )
637 .bind(thread_id.0)
638 .bind(actor_id)
639 .bind(&body_md)
640 .bind(&body_html)
641 .execute(&mut *tx)
642 .await?;
643
644 Some(thread_id.0)
645 } else {
646 None
647 };
648
649 tx.commit().await?;
650 Ok(CleanSlateResult { deleted_thread_count: deleted, system_thread_id })
651 }
652
/// Minimal HTML escaping for text embedded in a hand-built HTML string
/// (used for actor display names in the reset notice, which bypasses the
/// markdown pipeline).
///
/// Replaces `&`, `<`, `>`, `"`, and `'` with `&amp;`, `&lt;`, `&gt;`,
/// `&quot;`, and `&#39;` respectively; every other character is copied
/// through unchanged.
fn html_escape(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&#39;"),
            other => escaped.push(other),
        }
    }
    escaped
}
664
665 /// Suspend a user.
666 #[tracing::instrument(skip_all)]
667 pub async fn suspend_user(
668 pool: &PgPool,
669 user_id: Uuid,
670 reason: Option<&str>,
671 ) -> Result<(), sqlx::Error> {
672 sqlx::query(
673 "UPDATE users SET suspended_at = now(), suspension_reason = $2 WHERE mnw_account_id = $1",
674 )
675 .bind(user_id)
676 .bind(reason)
677 .execute(pool)
678 .await?;
679 Ok(())
680 }
681
682 /// Unsuspend a user.
683 #[tracing::instrument(skip_all)]
684 pub async fn unsuspend_user(
685 pool: &PgPool,
686 user_id: Uuid,
687 ) -> Result<(), sqlx::Error> {
688 sqlx::query(
689 "UPDATE users SET suspended_at = NULL, suspension_reason = NULL WHERE mnw_account_id = $1",
690 )
691 .bind(user_id)
692 .execute(pool)
693 .await?;
694 Ok(())
695 }
696
697 // ============================================================================
698 // Tracked thread mutations
699 // ============================================================================
700
701 /// Track a thread (upsert).
702 #[tracing::instrument(skip_all)]
703 pub async fn track_thread(
704 pool: &PgPool,
705 user_id: Uuid,
706 thread_id: Uuid,
707 ) -> Result<(), sqlx::Error> {
708 sqlx::query(
709 "INSERT INTO tracked_threads (user_id, thread_id)
710 VALUES ($1, $2)
711 ON CONFLICT (user_id, thread_id) DO NOTHING",
712 )
713 .bind(user_id)
714 .bind(thread_id)
715 .execute(pool)
716 .await?;
717 Ok(())
718 }
719
720 /// Untrack a thread.
721 #[tracing::instrument(skip_all)]
722 pub async fn untrack_thread(
723 pool: &PgPool,
724 user_id: Uuid,
725 thread_id: Uuid,
726 ) -> Result<(), sqlx::Error> {
727 sqlx::query(
728 "DELETE FROM tracked_threads WHERE user_id = $1 AND thread_id = $2",
729 )
730 .bind(user_id)
731 .bind(thread_id)
732 .execute(pool)
733 .await?;
734 Ok(())
735 }
736
737 /// Stop tracking all threads for a user.
738 #[tracing::instrument(skip_all)]
739 pub async fn untrack_all(
740 pool: &PgPool,
741 user_id: Uuid,
742 ) -> Result<(), sqlx::Error> {
743 sqlx::query("DELETE FROM tracked_threads WHERE user_id = $1")
744 .bind(user_id)
745 .execute(pool)
746 .await?;
747 Ok(())
748 }
749
750 /// Update the read position for a tracked thread (set last_read_post_id to the last post).
751 #[tracing::instrument(skip_all)]
752 pub async fn update_read_position(
753 pool: &PgPool,
754 user_id: Uuid,
755 thread_id: Uuid,
756 last_post_id: Uuid,
757 ) -> Result<(), sqlx::Error> {
758 sqlx::query(
759 "UPDATE tracked_threads SET last_read_post_id = $3
760 WHERE user_id = $1 AND thread_id = $2",
761 )
762 .bind(user_id)
763 .bind(thread_id)
764 .bind(last_post_id)
765 .execute(pool)
766 .await?;
767 Ok(())
768 }
769
770 // ============================================================================
771 // Tag mutations
772 // ============================================================================
773
774 /// Create a tag in a community. Returns the tag ID.
775 #[tracing::instrument(skip_all)]
776 pub async fn create_tag(
777 pool: &PgPool,
778 community_id: Uuid,
779 name: &str,
780 slug: &str,
781 ) -> Result<Uuid, sqlx::Error> {
782 let row: (Uuid,) = sqlx::query_as(
783 "INSERT INTO tags (community_id, name, slug) VALUES ($1, $2, $3) RETURNING id",
784 )
785 .bind(community_id)
786 .bind(name)
787 .bind(slug)
788 .fetch_one(pool)
789 .await?;
790 Ok(row.0)
791 }
792
793 /// Delete a tag (CASCADE removes thread_tags rows), scoped to community.
794 #[tracing::instrument(skip_all)]
795 pub async fn delete_tag(pool: &PgPool, tag_id: Uuid, community_id: Uuid) -> Result<bool, sqlx::Error> {
796 let result = sqlx::query("DELETE FROM tags WHERE id = $1 AND community_id = $2")
797 .bind(tag_id)
798 .bind(community_id)
799 .execute(pool)
800 .await?;
801 Ok(result.rows_affected() > 0)
802 }
803
804 /// Set the tags for a thread (delete existing + batch insert new, in a transaction).
805 #[tracing::instrument(skip_all)]
806 pub async fn set_thread_tags(
807 pool: &PgPool,
808 thread_id: Uuid,
809 tag_ids: &[Uuid],
810 ) -> Result<(), sqlx::Error> {
811 let mut tx = pool.begin().await?;
812 sqlx::query("DELETE FROM thread_tags WHERE thread_id = $1")
813 .bind(thread_id)
814 .execute(&mut *tx)
815 .await?;
816 if !tag_ids.is_empty() {
817 // Build multi-row INSERT: VALUES ($1, $2), ($1, $3), ...
818 let mut sql = String::from("INSERT INTO thread_tags (thread_id, tag_id) VALUES ");
819 for (i, _) in tag_ids.iter().enumerate() {
820 if i > 0 {
821 sql.push_str(", ");
822 }
823 // $1 = thread_id, $2.. = tag_ids
824 sql.push_str(&format!("($1, ${})", i + 2));
825 }
826 let mut query = sqlx::query(&sql).bind(thread_id);
827 for tag_id in tag_ids {
828 query = query.bind(tag_id);
829 }
830 query.execute(&mut *tx).await?;
831 }
832 tx.commit().await?;
833 Ok(())
834 }
835
836 // ============================================================================
837 // Flag mutations
838 // ============================================================================
839
840 /// Insert a flag on a post. ON CONFLICT DO NOTHING (idempotent per user+post).
841 #[tracing::instrument(skip_all)]
842 pub async fn insert_flag(
843 pool: &PgPool,
844 post_id: Uuid,
845 flagger_id: Uuid,
846 reason: &str,
847 detail: Option<&str>,
848 ) -> Result<(), sqlx::Error> {
849 sqlx::query(
850 "INSERT INTO post_flags (post_id, flagger_id, reason, detail)
851 VALUES ($1, $2, $3, $4)
852 ON CONFLICT (post_id, flagger_id) DO NOTHING",
853 )
854 .bind(post_id)
855 .bind(flagger_id)
856 .bind(reason)
857 .bind(detail)
858 .execute(pool)
859 .await?;
860 Ok(())
861 }
862
863 /// Resolve a single flag.
864 #[tracing::instrument(skip_all)]
865 pub async fn resolve_flag(
866 pool: &PgPool,
867 flag_id: Uuid,
868 resolved_by: Uuid,
869 resolution: &str,
870 ) -> Result<(), sqlx::Error> {
871 sqlx::query(
872 "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3
873 WHERE id = $1 AND resolved_at IS NULL",
874 )
875 .bind(flag_id)
876 .bind(resolved_by)
877 .bind(resolution)
878 .execute(pool)
879 .await?;
880 Ok(())
881 }
882
883 /// Resolve all unresolved flags for a given post.
884 #[tracing::instrument(skip_all)]
885 pub async fn resolve_all_flags_for_post(
886 pool: &PgPool,
887 post_id: Uuid,
888 resolved_by: Uuid,
889 resolution: &str,
890 ) -> Result<(), sqlx::Error> {
891 sqlx::query(
892 "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3
893 WHERE post_id = $1 AND resolved_at IS NULL",
894 )
895 .bind(post_id)
896 .bind(resolved_by)
897 .bind(resolution)
898 .execute(pool)
899 .await?;
900 Ok(())
901 }
902
903 // ============================================================================
904 // Link preview mutations
905 // ============================================================================
906
907 /// Insert a link preview for a post. Ignores duplicates.
908 #[tracing::instrument(skip_all)]
909 pub async fn insert_link_preview(
910 pool: &PgPool,
911 post_id: Uuid,
912 url: &str,
913 title: Option<&str>,
914 description: Option<&str>,
915 ) -> Result<(), sqlx::Error> {
916 sqlx::query(
917 "INSERT INTO link_previews (post_id, url, title, description)
918 VALUES ($1, $2, $3, $4)
919 ON CONFLICT (post_id, url) DO NOTHING",
920 )
921 .bind(post_id)
922 .bind(url)
923 .bind(title)
924 .bind(description)
925 .execute(pool)
926 .await?;
927 Ok(())
928 }
929
930 // ============================================================================
931 // Mention mutations
932 // ============================================================================
933
934 /// Insert mention rows for a post (batch insert). Ignores duplicates.
935 #[tracing::instrument(skip_all)]
936 pub async fn insert_mentions(
937 pool: &PgPool,
938 post_id: Uuid,
939 user_ids: &[Uuid],
940 ) -> Result<(), sqlx::Error> {
941 if user_ids.is_empty() {
942 return Ok(());
943 }
944 // Build multi-row INSERT: VALUES ($1, $2), ($1, $3), ... ON CONFLICT DO NOTHING
945 let mut sql = String::from("INSERT INTO post_mentions (post_id, mentioned_user_id) VALUES ");
946 for (i, _) in user_ids.iter().enumerate() {
947 if i > 0 {
948 sql.push_str(", ");
949 }
950 sql.push_str(&format!("($1, ${})", i + 2));
951 }
952 sql.push_str(" ON CONFLICT DO NOTHING");
953 let mut query = sqlx::query(&sql).bind(post_id);
954 for user_id in user_ids {
955 query = query.bind(user_id);
956 }
957 query.execute(pool).await?;
958 Ok(())
959 }
960
961 // ============================================================================
962 // Endorsement mutations
963 // ============================================================================
964
965 /// Toggle endorsement: insert if missing, delete if exists. Returns true if now endorsed.
966 /// Uses a transaction to prevent race conditions between concurrent toggle requests.
967 #[tracing::instrument(skip_all)]
968 pub async fn toggle_endorsement(
969 pool: &PgPool,
970 post_id: Uuid,
971 endorser_id: Uuid,
972 ) -> Result<bool, sqlx::Error> {
973 let mut tx = pool.begin().await?;
974
975 let result = sqlx::query(
976 "INSERT INTO post_endorsements (post_id, endorser_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
977 )
978 .bind(post_id)
979 .bind(endorser_id)
980 .execute(&mut *tx)
981 .await?;
982
983 if result.rows_affected() == 0 {
984 // Already existed — remove it
985 sqlx::query("DELETE FROM post_endorsements WHERE post_id = $1 AND endorser_id = $2")
986 .bind(post_id)
987 .bind(endorser_id)
988 .execute(&mut *tx)
989 .await?;
990 tx.commit().await?;
991 Ok(false)
992 } else {
993 tx.commit().await?;
994 Ok(true)
995 }
996 }
997
998 // ============================================================================
999 // Image uploads
1000 // ============================================================================
1001
1002 /// Insert an uploaded image record.
1003 #[tracing::instrument(skip_all)]
1004 pub async fn insert_image(
1005 pool: &PgPool,
1006 uploader_id: Uuid,
1007 community_id: Uuid,
1008 s3_key: &str,
1009 filename: &str,
1010 content_type: &str,
1011 size_bytes: i64,
1012 ) -> Result<Uuid, sqlx::Error> {
1013 sqlx::query_scalar(
1014 "INSERT INTO images (uploader_id, community_id, s3_key, filename, content_type, size_bytes)
1015 VALUES ($1, $2, $3, $4, $5, $6)
1016 RETURNING id",
1017 )
1018 .bind(uploader_id)
1019 .bind(community_id)
1020 .bind(s3_key)
1021 .bind(filename)
1022 .bind(content_type)
1023 .bind(size_bytes)
1024 .fetch_one(pool)
1025 .await
1026 }
1027
1028 /// Mark an image as removed by a moderator.
1029 #[tracing::instrument(skip_all)]
1030 pub async fn remove_image(
1031 pool: &PgPool,
1032 image_id: Uuid,
1033 removed_by: Uuid,
1034 ) -> Result<(), sqlx::Error> {
1035 sqlx::query(
1036 "UPDATE images SET removed_at = now(), removed_by = $2 WHERE id = $1 AND removed_at IS NULL",
1037 )
1038 .bind(image_id)
1039 .bind(removed_by)
1040 .execute(pool)
1041 .await?;
1042 Ok(())
1043 }
1044