Skip to main content

max / multithreaded

22.9 KB · 880 lines History Blame Raw
1 //! Database write mutations — inserts, updates, deletes.
2
3 use chrono::{DateTime, Utc};
4 use mt_core::types::{BanType, ModAction};
5 use sqlx::PgPool;
6 use uuid::Uuid;
7
8 /// Create a new community and return its ID.
9 #[tracing::instrument(skip_all)]
10 pub async fn create_community(
11 pool: &PgPool,
12 name: &str,
13 slug: &str,
14 description: Option<&str>,
15 ) -> Result<Uuid, sqlx::Error> {
16 let row: (Uuid,) = sqlx::query_as(
17 "INSERT INTO communities (name, slug, description)
18 VALUES ($1, $2, $3)
19 RETURNING id",
20 )
21 .bind(name)
22 .bind(slug)
23 .bind(description)
24 .fetch_one(pool)
25 .await?;
26 Ok(row.0)
27 }
28
29 /// Upsert a user from MNW account data. Creates the user if they don't exist,
30 /// updates username/display_name if they do.
31 #[tracing::instrument(skip_all)]
32 pub async fn upsert_user(
33 pool: &PgPool,
34 mnw_account_id: Uuid,
35 username: &str,
36 display_name: Option<&str>,
37 ) -> Result<(), sqlx::Error> {
38 sqlx::query(
39 "INSERT INTO users (mnw_account_id, username, display_name)
40 VALUES ($1, $2, $3)
41 ON CONFLICT (mnw_account_id) DO UPDATE
42 SET username = $2, display_name = $3, updated_at = now()",
43 )
44 .bind(mnw_account_id)
45 .bind(username)
46 .bind(display_name)
47 .execute(pool)
48 .await?;
49 Ok(())
50 }
51
52 /// Ensure a user has a membership in a community with the given role.
53 /// Creates membership if none exists, does nothing if already a member.
54 #[tracing::instrument(skip_all)]
55 pub async fn ensure_membership_with_role(
56 pool: &PgPool,
57 user_id: Uuid,
58 community_id: Uuid,
59 role: &str,
60 ) -> Result<(), sqlx::Error> {
61 sqlx::query(
62 "INSERT INTO memberships (user_id, community_id, role)
63 VALUES ($1, $2, $3)
64 ON CONFLICT (user_id, community_id) DO NOTHING",
65 )
66 .bind(user_id)
67 .bind(community_id)
68 .bind(role)
69 .execute(pool)
70 .await?;
71 Ok(())
72 }
73
74 /// Create a thread with an external reference. Returns the thread ID.
75 #[tracing::instrument(skip_all)]
76 pub async fn create_thread_with_external_ref(
77 pool: &PgPool,
78 category_id: Uuid,
79 author_id: Uuid,
80 title: &str,
81 external_ref: &str,
82 ) -> Result<Uuid, sqlx::Error> {
83 let row: (Uuid,) = sqlx::query_as(
84 "INSERT INTO threads (category_id, author_id, title, external_ref)
85 VALUES ($1, $2, $3, $4)
86 RETURNING id",
87 )
88 .bind(category_id)
89 .bind(author_id)
90 .bind(title)
91 .bind(external_ref)
92 .fetch_one(pool)
93 .await?;
94 Ok(row.0)
95 }
96
97 /// Ensure a user has a membership in a community. Creates a 'member' role if none exists.
98 #[tracing::instrument(skip_all)]
99 pub async fn ensure_membership(
100 pool: &PgPool,
101 user_id: Uuid,
102 community_id: Uuid,
103 ) -> Result<(), sqlx::Error> {
104 sqlx::query(
105 "INSERT INTO memberships (user_id, community_id, role)
106 VALUES ($1, $2, 'member')
107 ON CONFLICT (user_id, community_id) DO NOTHING",
108 )
109 .bind(user_id)
110 .bind(community_id)
111 .execute(pool)
112 .await?;
113 Ok(())
114 }
115
116 /// Insert a new thread and return its ID.
117 #[tracing::instrument(skip_all)]
118 pub async fn create_thread(
119 pool: &PgPool,
120 category_id: Uuid,
121 author_id: Uuid,
122 title: &str,
123 ) -> Result<Uuid, sqlx::Error> {
124 let row: (Uuid,) = sqlx::query_as(
125 "INSERT INTO threads (category_id, author_id, title)
126 VALUES ($1, $2, $3)
127 RETURNING id",
128 )
129 .bind(category_id)
130 .bind(author_id)
131 .bind(title)
132 .fetch_one(pool)
133 .await?;
134 Ok(row.0)
135 }
136
137 /// Insert a new post and bump the thread's last_activity_at atomically.
138 #[tracing::instrument(skip_all)]
139 pub async fn create_post(
140 pool: &PgPool,
141 thread_id: Uuid,
142 author_id: Uuid,
143 body_markdown: &str,
144 body_html: &str,
145 ) -> Result<Uuid, sqlx::Error> {
146 let mut tx = pool.begin().await?;
147
148 let row: (Uuid,) = sqlx::query_as(
149 "INSERT INTO posts (thread_id, author_id, body_markdown, body_html)
150 VALUES ($1, $2, $3, $4)
151 RETURNING id",
152 )
153 .bind(thread_id)
154 .bind(author_id)
155 .bind(body_markdown)
156 .bind(body_html)
157 .fetch_one(&mut *tx)
158 .await?;
159
160 sqlx::query("UPDATE threads SET last_activity_at = now() WHERE id = $1")
161 .bind(thread_id)
162 .execute(&mut *tx)
163 .await?;
164
165 tx.commit().await?;
166 Ok(row.0)
167 }
168
169 /// Insert a footnote on a post. Returns the footnote ID.
170 #[tracing::instrument(skip_all)]
171 pub async fn insert_footnote(
172 pool: &PgPool,
173 post_id: Uuid,
174 author_id: Uuid,
175 body_markdown: &str,
176 body_html: &str,
177 ) -> Result<Uuid, sqlx::Error> {
178 let row: (Uuid,) = sqlx::query_as(
179 "INSERT INTO post_footnotes (post_id, author_id, body_markdown, body_html)
180 VALUES ($1, $2, $3, $4)
181 RETURNING id",
182 )
183 .bind(post_id)
184 .bind(author_id)
185 .bind(body_markdown)
186 .bind(body_html)
187 .fetch_one(pool)
188 .await?;
189 Ok(row.0)
190 }
191
192 /// Mod-remove a post: set removed_by/removed_at. Content stays intact for audit.
193 /// Returns true if the post was actually removed (false if already removed).
194 #[tracing::instrument(skip_all)]
195 pub async fn mod_remove_post(
196 pool: &PgPool,
197 post_id: Uuid,
198 removed_by_id: Uuid,
199 ) -> Result<bool, sqlx::Error> {
200 let result = sqlx::query(
201 "UPDATE posts SET removed_by = $2, removed_at = now()
202 WHERE id = $1 AND removed_at IS NULL",
203 )
204 .bind(post_id)
205 .bind(removed_by_id)
206 .execute(pool)
207 .await?;
208 Ok(result.rows_affected() > 0)
209 }
210
211 /// Atomically auto-hide a post if pending flag count meets the threshold.
212 /// Combines count check and removal in a single query to avoid race conditions.
213 /// Returns true if the post was actually removed.
214 #[tracing::instrument(skip_all)]
215 pub async fn auto_hide_if_threshold_met(
216 pool: &PgPool,
217 post_id: Uuid,
218 removed_by_id: Uuid,
219 threshold: i32,
220 ) -> Result<bool, sqlx::Error> {
221 let result = sqlx::query(
222 "UPDATE posts SET removed_by = $2, removed_at = now()
223 WHERE id = $1 AND removed_at IS NULL
224 AND (SELECT COUNT(*) FROM post_flags WHERE post_id = $1 AND resolved_at IS NULL) >= $3",
225 )
226 .bind(post_id)
227 .bind(removed_by_id)
228 .bind(threshold as i64)
229 .execute(pool)
230 .await?;
231 Ok(result.rows_affected() > 0)
232 }
233
234 /// Update a thread's title.
235 #[tracing::instrument(skip_all)]
236 pub async fn update_thread_title(
237 pool: &PgPool,
238 thread_id: Uuid,
239 title: &str,
240 ) -> Result<(), sqlx::Error> {
241 sqlx::query("UPDATE threads SET title = $2 WHERE id = $1")
242 .bind(thread_id)
243 .bind(title)
244 .execute(pool)
245 .await?;
246 Ok(())
247 }
248
249 /// Soft-delete a thread: set deleted_at (hides from listings).
250 #[tracing::instrument(skip_all)]
251 pub async fn soft_delete_thread(pool: &PgPool, thread_id: Uuid) -> Result<(), sqlx::Error> {
252 sqlx::query("UPDATE threads SET deleted_at = now() WHERE id = $1")
253 .bind(thread_id)
254 .execute(pool)
255 .await?;
256 Ok(())
257 }
258
259 /// Set or unset the pinned flag on a thread.
260 #[tracing::instrument(skip_all)]
261 pub async fn set_thread_pinned(
262 pool: &PgPool,
263 thread_id: Uuid,
264 pinned: bool,
265 ) -> Result<(), sqlx::Error> {
266 sqlx::query("UPDATE threads SET pinned = $2 WHERE id = $1")
267 .bind(thread_id)
268 .bind(pinned)
269 .execute(pool)
270 .await?;
271 Ok(())
272 }
273
274 /// Set or unset the locked flag on a thread.
275 #[tracing::instrument(skip_all)]
276 pub async fn set_thread_locked(
277 pool: &PgPool,
278 thread_id: Uuid,
279 locked: bool,
280 ) -> Result<(), sqlx::Error> {
281 sqlx::query("UPDATE threads SET locked = $2 WHERE id = $1")
282 .bind(thread_id)
283 .bind(locked)
284 .execute(pool)
285 .await?;
286 Ok(())
287 }
288
289 /// Update a community's name and description.
290 #[tracing::instrument(skip_all)]
291 pub async fn update_community(
292 pool: &PgPool,
293 community_id: Uuid,
294 name: &str,
295 description: Option<&str>,
296 auto_hide_threshold: Option<i32>,
297 ) -> Result<(), sqlx::Error> {
298 sqlx::query(
299 "UPDATE communities SET name = $2, description = $3, auto_hide_threshold = $4 WHERE id = $1",
300 )
301 .bind(community_id)
302 .bind(name)
303 .bind(description)
304 .bind(auto_hide_threshold)
305 .execute(pool)
306 .await?;
307 Ok(())
308 }
309
310 /// Create a new category in a community.
311 #[tracing::instrument(skip_all)]
312 pub async fn create_category(
313 pool: &PgPool,
314 community_id: Uuid,
315 name: &str,
316 slug: &str,
317 description: Option<&str>,
318 sort_order: i32,
319 ) -> Result<Uuid, sqlx::Error> {
320 let row: (Uuid,) = sqlx::query_as(
321 "INSERT INTO categories (community_id, name, slug, description, sort_order)
322 VALUES ($1, $2, $3, $4, $5)
323 RETURNING id",
324 )
325 .bind(community_id)
326 .bind(name)
327 .bind(slug)
328 .bind(description)
329 .bind(sort_order)
330 .fetch_one(pool)
331 .await?;
332 Ok(row.0)
333 }
334
335 /// Update a category's name and description.
336 #[tracing::instrument(skip_all)]
337 pub async fn update_category(
338 pool: &PgPool,
339 category_id: Uuid,
340 name: &str,
341 description: Option<&str>,
342 ) -> Result<(), sqlx::Error> {
343 sqlx::query("UPDATE categories SET name = $2, description = $3 WHERE id = $1")
344 .bind(category_id)
345 .bind(name)
346 .bind(description)
347 .execute(pool)
348 .await?;
349 Ok(())
350 }
351
352 /// Swap the sort_order of two categories atomically.
353 #[tracing::instrument(skip_all)]
354 pub async fn swap_category_order(
355 pool: &PgPool,
356 id_a: Uuid,
357 order_a: i32,
358 id_b: Uuid,
359 order_b: i32,
360 ) -> Result<(), sqlx::Error> {
361 let mut tx = pool.begin().await?;
362 sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
363 .bind(id_a)
364 .bind(order_b)
365 .execute(&mut *tx)
366 .await?;
367 sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
368 .bind(id_b)
369 .bind(order_a)
370 .execute(&mut *tx)
371 .await?;
372 tx.commit().await?;
373 Ok(())
374 }
375
376 /// Look up a category ID by community slug + category slug.
377 #[tracing::instrument(skip_all)]
378 pub async fn get_category_id_by_slugs(
379 pool: &PgPool,
380 community_slug: &str,
381 category_slug: &str,
382 ) -> Result<Option<Uuid>, sqlx::Error> {
383 let row: Option<(Uuid,)> = sqlx::query_as(
384 "SELECT c.id
385 FROM categories c
386 JOIN communities co ON co.id = c.community_id
387 WHERE co.slug = $1 AND c.slug = $2",
388 )
389 .bind(community_slug)
390 .bind(category_slug)
391 .fetch_optional(pool)
392 .await?;
393 Ok(row.map(|r| r.0))
394 }
395
396 // ============================================================================
397 // Ban / mute mutations
398 // ============================================================================
399
400 /// Create or update a ban/mute. Returns the ban ID.
401 #[tracing::instrument(skip_all)]
402 pub async fn create_community_ban(
403 pool: &PgPool,
404 community_id: Uuid,
405 user_id: Uuid,
406 banned_by: Uuid,
407 ban_type: BanType,
408 reason: Option<&str>,
409 expires_at: Option<DateTime<Utc>>,
410 ) -> Result<Uuid, sqlx::Error> {
411 let row: (Uuid,) = sqlx::query_as(
412 "INSERT INTO community_bans (community_id, user_id, banned_by, ban_type, reason, expires_at)
413 VALUES ($1, $2, $3, $4, $5, $6)
414 ON CONFLICT (community_id, user_id, ban_type) DO UPDATE
415 SET banned_by = $3, reason = $5, expires_at = $6, created_at = now()
416 RETURNING id",
417 )
418 .bind(community_id)
419 .bind(user_id)
420 .bind(banned_by)
421 .bind(ban_type.as_str())
422 .bind(reason)
423 .bind(expires_at)
424 .fetch_one(pool)
425 .await?;
426 Ok(row.0)
427 }
428
429 /// Delete all bans/mutes whose expiration has passed.
430 #[tracing::instrument(skip_all)]
431 pub async fn cleanup_expired_bans(pool: &PgPool, community_id: Uuid) -> Result<u64, sqlx::Error> {
432 let result = sqlx::query(
433 "DELETE FROM community_bans
434 WHERE community_id = $1 AND expires_at IS NOT NULL AND expires_at <= now()",
435 )
436 .bind(community_id)
437 .execute(pool)
438 .await?;
439 Ok(result.rows_affected())
440 }
441
442 /// Remove a ban or mute.
443 #[tracing::instrument(skip_all)]
444 pub async fn remove_community_ban(
445 pool: &PgPool,
446 community_id: Uuid,
447 user_id: Uuid,
448 ban_type: BanType,
449 ) -> Result<(), sqlx::Error> {
450 sqlx::query(
451 "DELETE FROM community_bans
452 WHERE community_id = $1 AND user_id = $2 AND ban_type = $3",
453 )
454 .bind(community_id)
455 .bind(user_id)
456 .bind(ban_type.as_str())
457 .execute(pool)
458 .await?;
459 Ok(())
460 }
461
462 /// Insert a mod log entry.
463 #[tracing::instrument(skip_all)]
464 pub async fn insert_mod_log(
465 pool: &PgPool,
466 community_id: Option<Uuid>,
467 actor_id: Uuid,
468 action: ModAction,
469 target_user: Option<Uuid>,
470 target_id: Option<Uuid>,
471 reason: Option<&str>,
472 ) -> Result<(), sqlx::Error> {
473 sqlx::query(
474 "INSERT INTO mod_log (community_id, actor_id, action, target_user, target_id, reason)
475 VALUES ($1, $2, $3, $4, $5, $6)",
476 )
477 .bind(community_id)
478 .bind(actor_id)
479 .bind(action.as_str())
480 .bind(target_user)
481 .bind(target_id)
482 .bind(reason)
483 .execute(pool)
484 .await?;
485 Ok(())
486 }
487
488 // ============================================================================
489 // Suspension mutations
490 // ============================================================================
491
492 /// Suspend a community.
493 #[tracing::instrument(skip_all)]
494 pub async fn suspend_community(
495 pool: &PgPool,
496 community_id: Uuid,
497 reason: Option<&str>,
498 ) -> Result<(), sqlx::Error> {
499 sqlx::query(
500 "UPDATE communities SET suspended_at = now(), suspension_reason = $2 WHERE id = $1",
501 )
502 .bind(community_id)
503 .bind(reason)
504 .execute(pool)
505 .await?;
506 Ok(())
507 }
508
509 /// Unsuspend a community.
510 #[tracing::instrument(skip_all)]
511 pub async fn unsuspend_community(
512 pool: &PgPool,
513 community_id: Uuid,
514 ) -> Result<(), sqlx::Error> {
515 sqlx::query(
516 "UPDATE communities SET suspended_at = NULL, suspension_reason = NULL WHERE id = $1",
517 )
518 .bind(community_id)
519 .execute(pool)
520 .await?;
521 Ok(())
522 }
523
524 /// Suspend a user.
525 #[tracing::instrument(skip_all)]
526 pub async fn suspend_user(
527 pool: &PgPool,
528 user_id: Uuid,
529 reason: Option<&str>,
530 ) -> Result<(), sqlx::Error> {
531 sqlx::query(
532 "UPDATE users SET suspended_at = now(), suspension_reason = $2 WHERE mnw_account_id = $1",
533 )
534 .bind(user_id)
535 .bind(reason)
536 .execute(pool)
537 .await?;
538 Ok(())
539 }
540
541 /// Unsuspend a user.
542 #[tracing::instrument(skip_all)]
543 pub async fn unsuspend_user(
544 pool: &PgPool,
545 user_id: Uuid,
546 ) -> Result<(), sqlx::Error> {
547 sqlx::query(
548 "UPDATE users SET suspended_at = NULL, suspension_reason = NULL WHERE mnw_account_id = $1",
549 )
550 .bind(user_id)
551 .execute(pool)
552 .await?;
553 Ok(())
554 }
555
556 // ============================================================================
557 // Tracked thread mutations
558 // ============================================================================
559
560 /// Track a thread (upsert).
561 #[tracing::instrument(skip_all)]
562 pub async fn track_thread(
563 pool: &PgPool,
564 user_id: Uuid,
565 thread_id: Uuid,
566 ) -> Result<(), sqlx::Error> {
567 sqlx::query(
568 "INSERT INTO tracked_threads (user_id, thread_id)
569 VALUES ($1, $2)
570 ON CONFLICT (user_id, thread_id) DO NOTHING",
571 )
572 .bind(user_id)
573 .bind(thread_id)
574 .execute(pool)
575 .await?;
576 Ok(())
577 }
578
579 /// Untrack a thread.
580 #[tracing::instrument(skip_all)]
581 pub async fn untrack_thread(
582 pool: &PgPool,
583 user_id: Uuid,
584 thread_id: Uuid,
585 ) -> Result<(), sqlx::Error> {
586 sqlx::query(
587 "DELETE FROM tracked_threads WHERE user_id = $1 AND thread_id = $2",
588 )
589 .bind(user_id)
590 .bind(thread_id)
591 .execute(pool)
592 .await?;
593 Ok(())
594 }
595
596 /// Stop tracking all threads for a user.
597 #[tracing::instrument(skip_all)]
598 pub async fn untrack_all(
599 pool: &PgPool,
600 user_id: Uuid,
601 ) -> Result<(), sqlx::Error> {
602 sqlx::query("DELETE FROM tracked_threads WHERE user_id = $1")
603 .bind(user_id)
604 .execute(pool)
605 .await?;
606 Ok(())
607 }
608
609 /// Update the read position for a tracked thread (set last_read_post_id to the last post).
610 #[tracing::instrument(skip_all)]
611 pub async fn update_read_position(
612 pool: &PgPool,
613 user_id: Uuid,
614 thread_id: Uuid,
615 last_post_id: Uuid,
616 ) -> Result<(), sqlx::Error> {
617 sqlx::query(
618 "UPDATE tracked_threads SET last_read_post_id = $3
619 WHERE user_id = $1 AND thread_id = $2",
620 )
621 .bind(user_id)
622 .bind(thread_id)
623 .bind(last_post_id)
624 .execute(pool)
625 .await?;
626 Ok(())
627 }
628
629 // ============================================================================
630 // Tag mutations
631 // ============================================================================
632
633 /// Create a tag in a community. Returns the tag ID.
634 #[tracing::instrument(skip_all)]
635 pub async fn create_tag(
636 pool: &PgPool,
637 community_id: Uuid,
638 name: &str,
639 slug: &str,
640 ) -> Result<Uuid, sqlx::Error> {
641 let row: (Uuid,) = sqlx::query_as(
642 "INSERT INTO tags (community_id, name, slug) VALUES ($1, $2, $3) RETURNING id",
643 )
644 .bind(community_id)
645 .bind(name)
646 .bind(slug)
647 .fetch_one(pool)
648 .await?;
649 Ok(row.0)
650 }
651
652 /// Delete a tag (CASCADE removes thread_tags rows).
653 #[tracing::instrument(skip_all)]
654 pub async fn delete_tag(pool: &PgPool, tag_id: Uuid) -> Result<(), sqlx::Error> {
655 sqlx::query("DELETE FROM tags WHERE id = $1")
656 .bind(tag_id)
657 .execute(pool)
658 .await?;
659 Ok(())
660 }
661
662 /// Set the tags for a thread (delete existing + insert new, in a transaction).
663 #[tracing::instrument(skip_all)]
664 pub async fn set_thread_tags(
665 pool: &PgPool,
666 thread_id: Uuid,
667 tag_ids: &[Uuid],
668 ) -> Result<(), sqlx::Error> {
669 let mut tx = pool.begin().await?;
670 sqlx::query("DELETE FROM thread_tags WHERE thread_id = $1")
671 .bind(thread_id)
672 .execute(&mut *tx)
673 .await?;
674 for tag_id in tag_ids {
675 sqlx::query("INSERT INTO thread_tags (thread_id, tag_id) VALUES ($1, $2)")
676 .bind(thread_id)
677 .bind(tag_id)
678 .execute(&mut *tx)
679 .await?;
680 }
681 tx.commit().await?;
682 Ok(())
683 }
684
685 // ============================================================================
686 // Flag mutations
687 // ============================================================================
688
689 /// Insert a flag on a post. ON CONFLICT DO NOTHING (idempotent per user+post).
690 #[tracing::instrument(skip_all)]
691 pub async fn insert_flag(
692 pool: &PgPool,
693 post_id: Uuid,
694 flagger_id: Uuid,
695 reason: &str,
696 detail: Option<&str>,
697 ) -> Result<(), sqlx::Error> {
698 sqlx::query(
699 "INSERT INTO post_flags (post_id, flagger_id, reason, detail)
700 VALUES ($1, $2, $3, $4)
701 ON CONFLICT (post_id, flagger_id) DO NOTHING",
702 )
703 .bind(post_id)
704 .bind(flagger_id)
705 .bind(reason)
706 .bind(detail)
707 .execute(pool)
708 .await?;
709 Ok(())
710 }
711
712 /// Resolve a single flag.
713 #[tracing::instrument(skip_all)]
714 pub async fn resolve_flag(
715 pool: &PgPool,
716 flag_id: Uuid,
717 resolved_by: Uuid,
718 resolution: &str,
719 ) -> Result<(), sqlx::Error> {
720 sqlx::query(
721 "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3
722 WHERE id = $1 AND resolved_at IS NULL",
723 )
724 .bind(flag_id)
725 .bind(resolved_by)
726 .bind(resolution)
727 .execute(pool)
728 .await?;
729 Ok(())
730 }
731
732 /// Resolve all unresolved flags for a given post.
733 #[tracing::instrument(skip_all)]
734 pub async fn resolve_all_flags_for_post(
735 pool: &PgPool,
736 post_id: Uuid,
737 resolved_by: Uuid,
738 resolution: &str,
739 ) -> Result<(), sqlx::Error> {
740 sqlx::query(
741 "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3
742 WHERE post_id = $1 AND resolved_at IS NULL",
743 )
744 .bind(post_id)
745 .bind(resolved_by)
746 .bind(resolution)
747 .execute(pool)
748 .await?;
749 Ok(())
750 }
751
752 // ============================================================================
753 // Link preview mutations
754 // ============================================================================
755
756 /// Insert a link preview for a post. Ignores duplicates.
757 #[tracing::instrument(skip_all)]
758 pub async fn insert_link_preview(
759 pool: &PgPool,
760 post_id: Uuid,
761 url: &str,
762 title: Option<&str>,
763 description: Option<&str>,
764 ) -> Result<(), sqlx::Error> {
765 sqlx::query(
766 "INSERT INTO link_previews (post_id, url, title, description)
767 VALUES ($1, $2, $3, $4)
768 ON CONFLICT (post_id, url) DO NOTHING",
769 )
770 .bind(post_id)
771 .bind(url)
772 .bind(title)
773 .bind(description)
774 .execute(pool)
775 .await?;
776 Ok(())
777 }
778
779 // ============================================================================
780 // Mention mutations
781 // ============================================================================
782
783 /// Insert mention rows for a post. Ignores duplicates.
784 #[tracing::instrument(skip_all)]
785 pub async fn insert_mentions(
786 pool: &PgPool,
787 post_id: Uuid,
788 user_ids: &[Uuid],
789 ) -> Result<(), sqlx::Error> {
790 for user_id in user_ids {
791 sqlx::query(
792 "INSERT INTO post_mentions (post_id, mentioned_user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
793 )
794 .bind(post_id)
795 .bind(user_id)
796 .execute(pool)
797 .await?;
798 }
799 Ok(())
800 }
801
802 // ============================================================================
803 // Endorsement mutations
804 // ============================================================================
805
806 /// Toggle endorsement: insert if missing, delete if exists. Returns true if now endorsed.
807 #[tracing::instrument(skip_all)]
808 pub async fn toggle_endorsement(
809 pool: &PgPool,
810 post_id: Uuid,
811 endorser_id: Uuid,
812 ) -> Result<bool, sqlx::Error> {
813 let result = sqlx::query(
814 "INSERT INTO post_endorsements (post_id, endorser_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
815 )
816 .bind(post_id)
817 .bind(endorser_id)
818 .execute(pool)
819 .await?;
820
821 if result.rows_affected() == 0 {
822 // Already existed — remove it
823 sqlx::query("DELETE FROM post_endorsements WHERE post_id = $1 AND endorser_id = $2")
824 .bind(post_id)
825 .bind(endorser_id)
826 .execute(pool)
827 .await?;
828 Ok(false)
829 } else {
830 Ok(true)
831 }
832 }
833
834 // ============================================================================
835 // Image uploads
836 // ============================================================================
837
838 /// Insert an uploaded image record.
839 #[tracing::instrument(skip_all)]
840 pub async fn insert_image(
841 pool: &PgPool,
842 uploader_id: Uuid,
843 community_id: Uuid,
844 s3_key: &str,
845 filename: &str,
846 content_type: &str,
847 size_bytes: i64,
848 ) -> Result<Uuid, sqlx::Error> {
849 sqlx::query_scalar(
850 "INSERT INTO images (uploader_id, community_id, s3_key, filename, content_type, size_bytes)
851 VALUES ($1, $2, $3, $4, $5, $6)
852 RETURNING id",
853 )
854 .bind(uploader_id)
855 .bind(community_id)
856 .bind(s3_key)
857 .bind(filename)
858 .bind(content_type)
859 .bind(size_bytes)
860 .fetch_one(pool)
861 .await
862 }
863
864 /// Mark an image as removed by a moderator.
865 #[tracing::instrument(skip_all)]
866 pub async fn remove_image(
867 pool: &PgPool,
868 image_id: Uuid,
869 removed_by: Uuid,
870 ) -> Result<(), sqlx::Error> {
871 sqlx::query(
872 "UPDATE images SET removed_at = now(), removed_by = $2 WHERE id = $1 AND removed_at IS NULL",
873 )
874 .bind(image_id)
875 .bind(removed_by)
876 .execute(pool)
877 .await?;
878 Ok(())
879 }
880