//! Database write mutations — inserts, updates, deletes. use chrono::{DateTime, Utc}; use mt_core::types::{BanType, ModAction}; use sqlx::PgPool; use uuid::Uuid; /// Create a new community and return its ID. #[tracing::instrument(skip_all)] pub async fn create_community( pool: &PgPool, name: &str, slug: &str, description: Option<&str>, ) -> Result { let row: (Uuid,) = sqlx::query_as( "INSERT INTO communities (name, slug, description) VALUES ($1, $2, $3) RETURNING id", ) .bind(name) .bind(slug) .bind(description) .fetch_one(pool) .await?; Ok(row.0) } /// Upsert a user from MNW account data. Creates the user if they don't exist, /// updates username/display_name if they do. #[tracing::instrument(skip_all)] pub async fn upsert_user( pool: &PgPool, mnw_account_id: Uuid, username: &str, display_name: Option<&str>, ) -> Result<(), sqlx::Error> { sqlx::query( "INSERT INTO users (mnw_account_id, username, display_name) VALUES ($1, $2, $3) ON CONFLICT (mnw_account_id) DO UPDATE SET username = $2, display_name = $3, updated_at = now()", ) .bind(mnw_account_id) .bind(username) .bind(display_name) .execute(pool) .await?; Ok(()) } /// Ensure a user has a membership in a community with the given role. /// Creates membership if none exists, does nothing if already a member. #[tracing::instrument(skip_all)] pub async fn ensure_membership_with_role( pool: &PgPool, user_id: Uuid, community_id: Uuid, role: &str, ) -> Result<(), sqlx::Error> { sqlx::query( "INSERT INTO memberships (user_id, community_id, role) VALUES ($1, $2, $3) ON CONFLICT (user_id, community_id) DO NOTHING", ) .bind(user_id) .bind(community_id) .bind(role) .execute(pool) .await?; Ok(()) } /// Create a thread with an external reference. Returns the thread ID. 
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the `INSERT` fails.
#[tracing::instrument(skip_all)]
pub async fn create_thread_with_external_ref(
    pool: &PgPool,
    category_id: Uuid,
    author_id: Uuid,
    title: &str,
    external_ref: &str,
) -> Result<Uuid, sqlx::Error> {
    let row: (Uuid,) = sqlx::query_as(
        "INSERT INTO threads (category_id, author_id, title, external_ref) \
         VALUES ($1, $2, $3, $4) RETURNING id",
    )
    .bind(category_id)
    .bind(author_id)
    .bind(title)
    .bind(external_ref)
    .fetch_one(pool)
    .await?;
    Ok(row.0)
}

/// Ensure a user has a membership in a community. Creates a membership with
/// the `'member'` role if none exists; an existing membership is left untouched.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn ensure_membership(
    pool: &PgPool,
    user_id: Uuid,
    community_id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO memberships (user_id, community_id, role) VALUES ($1, $2, 'member') \
         ON CONFLICT (user_id, community_id) DO NOTHING",
    )
    .bind(user_id)
    .bind(community_id)
    .execute(pool)
    .await?;
    Ok(())
}

/// Insert a new thread and return its ID.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the `INSERT` fails.
#[tracing::instrument(skip_all)]
pub async fn create_thread(
    pool: &PgPool,
    category_id: Uuid,
    author_id: Uuid,
    title: &str,
) -> Result<Uuid, sqlx::Error> {
    let row: (Uuid,) = sqlx::query_as(
        "INSERT INTO threads (category_id, author_id, title) VALUES ($1, $2, $3) RETURNING id",
    )
    .bind(category_id)
    .bind(author_id)
    .bind(title)
    .fetch_one(pool)
    .await?;
    Ok(row.0)
}

/// Insert a new post and bump the thread's last_activity_at atomically.
///
/// Both statements run in one transaction, so the post and the activity bump
/// are committed together. Returns the new post's ID.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if beginning the transaction, either statement, or
/// the commit fails. An early return drops the transaction without committing.
#[tracing::instrument(skip_all)]
pub async fn create_post(
    pool: &PgPool,
    thread_id: Uuid,
    author_id: Uuid,
    body_markdown: &str,
    body_html: &str,
) -> Result<Uuid, sqlx::Error> {
    let mut tx = pool.begin().await?;

    let row: (Uuid,) = sqlx::query_as(
        "INSERT INTO posts (thread_id, author_id, body_markdown, body_html) \
         VALUES ($1, $2, $3, $4) RETURNING id",
    )
    .bind(thread_id)
    .bind(author_id)
    .bind(body_markdown)
    .bind(body_html)
    .fetch_one(&mut *tx)
    .await?;

    sqlx::query("UPDATE threads SET last_activity_at = now() WHERE id = $1")
        .bind(thread_id)
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;
    Ok(row.0)
}

/// Insert a footnote on a post. Returns the footnote ID.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the `INSERT` fails.
#[tracing::instrument(skip_all)]
pub async fn insert_footnote(
    pool: &PgPool,
    post_id: Uuid,
    author_id: Uuid,
    body_markdown: &str,
    body_html: &str,
) -> Result<Uuid, sqlx::Error> {
    let row: (Uuid,) = sqlx::query_as(
        "INSERT INTO post_footnotes (post_id, author_id, body_markdown, body_html) \
         VALUES ($1, $2, $3, $4) RETURNING id",
    )
    .bind(post_id)
    .bind(author_id)
    .bind(body_markdown)
    .bind(body_html)
    .fetch_one(pool)
    .await?;
    Ok(row.0)
}

/// Mod-remove a post: set removed_by/removed_at. Content stays intact for audit.
/// Returns true if the post was actually removed (false if already removed).
///
/// `false` is also returned when no post with `post_id` exists, since the
/// result is derived from the number of rows the `UPDATE` touched.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn mod_remove_post(
    pool: &PgPool,
    post_id: Uuid,
    removed_by_id: Uuid,
) -> Result<bool, sqlx::Error> {
    let result = sqlx::query(
        "UPDATE posts SET removed_by = $2, removed_at = now() \
         WHERE id = $1 AND removed_at IS NULL",
    )
    .bind(post_id)
    .bind(removed_by_id)
    .execute(pool)
    .await?;
    Ok(result.rows_affected() > 0)
}

/// Atomically auto-hide a post if pending flag count meets the threshold.
/// Combines count check and removal in a single query to avoid race conditions.
/// Returns true if the post was actually removed.
///
/// Only unresolved flags (`resolved_at IS NULL`) are counted, and a post that
/// is already removed is never touched.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn auto_hide_if_threshold_met(
    pool: &PgPool,
    post_id: Uuid,
    removed_by_id: Uuid,
    threshold: i32,
) -> Result<bool, sqlx::Error> {
    let result = sqlx::query(
        "UPDATE posts SET removed_by = $2, removed_at = now() \
         WHERE id = $1 AND removed_at IS NULL \
         AND (SELECT COUNT(*) FROM post_flags WHERE post_id = $1 AND resolved_at IS NULL) >= $3",
    )
    .bind(post_id)
    .bind(removed_by_id)
    // COUNT(*) is a bigint in Postgres, so widen losslessly before binding.
    .bind(i64::from(threshold))
    .execute(pool)
    .await?;
    Ok(result.rows_affected() > 0)
}

/// Update a thread's title.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails. A `thread_id` that matches no
/// row is not reported as an error.
#[tracing::instrument(skip_all)]
pub async fn update_thread_title(
    pool: &PgPool,
    thread_id: Uuid,
    title: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query("UPDATE threads SET title = $2 WHERE id = $1")
        .bind(thread_id)
        .bind(title)
        .execute(pool)
        .await?;
    Ok(())
}

/// Soft-delete a thread: set deleted_at (hides from listings).
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn soft_delete_thread(pool: &PgPool, thread_id: Uuid) -> Result<(), sqlx::Error> {
    sqlx::query("UPDATE threads SET deleted_at = now() WHERE id = $1")
        .bind(thread_id)
        .execute(pool)
        .await?;
    Ok(())
}

/// Set or unset the pinned flag on a thread.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn set_thread_pinned(
    pool: &PgPool,
    thread_id: Uuid,
    pinned: bool,
) -> Result<(), sqlx::Error> {
    sqlx::query("UPDATE threads SET pinned = $2 WHERE id = $1")
        .bind(thread_id)
        .bind(pinned)
        .execute(pool)
        .await?;
    Ok(())
}

/// Set or unset the locked flag on a thread.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn set_thread_locked(
    pool: &PgPool,
    thread_id: Uuid,
    locked: bool,
) -> Result<(), sqlx::Error> {
    sqlx::query("UPDATE threads SET locked = $2 WHERE id = $1")
        .bind(thread_id)
        .bind(locked)
        .execute(pool)
        .await?;
    Ok(())
}

/// Update a community's name, description, and auto-hide threshold.
///
/// All three columns are overwritten unconditionally: passing `None` for
/// `description` or `auto_hide_threshold` stores `NULL`.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn update_community(
    pool: &PgPool,
    community_id: Uuid,
    name: &str,
    description: Option<&str>,
    // NOTE(review): element type reconstructed as i32 to match the `threshold`
    // parameter of `auto_hide_if_threshold_met` — confirm against the schema.
    auto_hide_threshold: Option<i32>,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "UPDATE communities SET name = $2, description = $3, auto_hide_threshold = $4 \
         WHERE id = $1",
    )
    .bind(community_id)
    .bind(name)
    .bind(description)
    .bind(auto_hide_threshold)
    .execute(pool)
    .await?;
    Ok(())
}

/// Create a new category in a community. Returns the category ID.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the `INSERT` fails.
#[tracing::instrument(skip_all)]
pub async fn create_category(
    pool: &PgPool,
    community_id: Uuid,
    name: &str,
    slug: &str,
    description: Option<&str>,
    sort_order: i32,
) -> Result<Uuid, sqlx::Error> {
    let row: (Uuid,) = sqlx::query_as(
        "INSERT INTO categories (community_id, name, slug, description, sort_order) \
         VALUES ($1, $2, $3, $4, $5) RETURNING id",
    )
    .bind(community_id)
    .bind(name)
    .bind(slug)
    .bind(description)
    .bind(sort_order)
    .fetch_one(pool)
    .await?;
    Ok(row.0)
}

/// Update a category's name and description.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn update_category(
    pool: &PgPool,
    category_id: Uuid,
    name: &str,
    description: Option<&str>,
) -> Result<(), sqlx::Error> {
    sqlx::query("UPDATE categories SET name = $2, description = $3 WHERE id = $1")
        .bind(category_id)
        .bind(name)
        .bind(description)
        .execute(pool)
        .await?;
    Ok(())
}

/// Swap the sort_order of two categories atomically.
///
/// `order_a` / `order_b` are the *current* sort orders of `id_a` / `id_b`;
/// after the call `id_a` holds `order_b` and `id_b` holds `order_a`. Both
/// updates run in a single transaction.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if beginning the transaction, either update, or the
/// commit fails.
#[tracing::instrument(skip_all)]
pub async fn swap_category_order(
    pool: &PgPool,
    id_a: Uuid,
    order_a: i32,
    id_b: Uuid,
    order_b: i32,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
        .bind(id_a)
        .bind(order_b)
        .execute(&mut *tx)
        .await?;

    sqlx::query("UPDATE categories SET sort_order = $2 WHERE id = $1")
        .bind(id_b)
        .bind(order_a)
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;
    Ok(())
}

/// Look up a category ID by community slug + category slug.
///
/// Returns `Ok(None)` when no category matches both slugs.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn get_category_id_by_slugs(
    pool: &PgPool,
    community_slug: &str,
    category_slug: &str,
) -> Result<Option<Uuid>, sqlx::Error> {
    let row: Option<(Uuid,)> = sqlx::query_as(
        "SELECT c.id FROM categories c JOIN communities co ON co.id = c.community_id \
         WHERE co.slug = $1 AND c.slug = $2",
    )
    .bind(community_slug)
    .bind(category_slug)
    .fetch_optional(pool)
    .await?;
    Ok(row.map(|r| r.0))
}

// ============================================================================
// Ban / mute mutations
// ============================================================================

/// Create or update a ban/mute. Returns the ban ID.
#[tracing::instrument(skip_all)] pub async fn create_community_ban( pool: &PgPool, community_id: Uuid, user_id: Uuid, banned_by: Uuid, ban_type: BanType, reason: Option<&str>, expires_at: Option>, ) -> Result { let row: (Uuid,) = sqlx::query_as( "INSERT INTO community_bans (community_id, user_id, banned_by, ban_type, reason, expires_at) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (community_id, user_id, ban_type) DO UPDATE SET banned_by = $3, reason = $5, expires_at = $6, created_at = now() RETURNING id", ) .bind(community_id) .bind(user_id) .bind(banned_by) .bind(ban_type.as_str()) .bind(reason) .bind(expires_at) .fetch_one(pool) .await?; Ok(row.0) } /// Delete all bans/mutes whose expiration has passed. #[tracing::instrument(skip_all)] pub async fn cleanup_expired_bans(pool: &PgPool, community_id: Uuid) -> Result { let result = sqlx::query( "DELETE FROM community_bans WHERE community_id = $1 AND expires_at IS NOT NULL AND expires_at <= now()", ) .bind(community_id) .execute(pool) .await?; Ok(result.rows_affected()) } /// Remove a ban or mute. #[tracing::instrument(skip_all)] pub async fn remove_community_ban( pool: &PgPool, community_id: Uuid, user_id: Uuid, ban_type: BanType, ) -> Result<(), sqlx::Error> { sqlx::query( "DELETE FROM community_bans WHERE community_id = $1 AND user_id = $2 AND ban_type = $3", ) .bind(community_id) .bind(user_id) .bind(ban_type.as_str()) .execute(pool) .await?; Ok(()) } /// Insert a mod log entry. 
#[tracing::instrument(skip_all)] pub async fn insert_mod_log( pool: &PgPool, community_id: Option, actor_id: Uuid, action: ModAction, target_user: Option, target_id: Option, reason: Option<&str>, ) -> Result<(), sqlx::Error> { sqlx::query( "INSERT INTO mod_log (community_id, actor_id, action, target_user, target_id, reason) VALUES ($1, $2, $3, $4, $5, $6)", ) .bind(community_id) .bind(actor_id) .bind(action.as_str()) .bind(target_user) .bind(target_id) .bind(reason) .execute(pool) .await?; Ok(()) } // ============================================================================ // Suspension mutations // ============================================================================ /// Suspend a community. #[tracing::instrument(skip_all)] pub async fn suspend_community( pool: &PgPool, community_id: Uuid, reason: Option<&str>, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE communities SET suspended_at = now(), suspension_reason = $2 WHERE id = $1", ) .bind(community_id) .bind(reason) .execute(pool) .await?; Ok(()) } /// Unsuspend a community. #[tracing::instrument(skip_all)] pub async fn unsuspend_community( pool: &PgPool, community_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE communities SET suspended_at = NULL, suspension_reason = NULL WHERE id = $1", ) .bind(community_id) .execute(pool) .await?; Ok(()) } /// Suspend a user. #[tracing::instrument(skip_all)] pub async fn suspend_user( pool: &PgPool, user_id: Uuid, reason: Option<&str>, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE users SET suspended_at = now(), suspension_reason = $2 WHERE mnw_account_id = $1", ) .bind(user_id) .bind(reason) .execute(pool) .await?; Ok(()) } /// Unsuspend a user. 
#[tracing::instrument(skip_all)] pub async fn unsuspend_user( pool: &PgPool, user_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE users SET suspended_at = NULL, suspension_reason = NULL WHERE mnw_account_id = $1", ) .bind(user_id) .execute(pool) .await?; Ok(()) } // ============================================================================ // Tracked thread mutations // ============================================================================ /// Track a thread (upsert). #[tracing::instrument(skip_all)] pub async fn track_thread( pool: &PgPool, user_id: Uuid, thread_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "INSERT INTO tracked_threads (user_id, thread_id) VALUES ($1, $2) ON CONFLICT (user_id, thread_id) DO NOTHING", ) .bind(user_id) .bind(thread_id) .execute(pool) .await?; Ok(()) } /// Untrack a thread. #[tracing::instrument(skip_all)] pub async fn untrack_thread( pool: &PgPool, user_id: Uuid, thread_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "DELETE FROM tracked_threads WHERE user_id = $1 AND thread_id = $2", ) .bind(user_id) .bind(thread_id) .execute(pool) .await?; Ok(()) } /// Stop tracking all threads for a user. #[tracing::instrument(skip_all)] pub async fn untrack_all( pool: &PgPool, user_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM tracked_threads WHERE user_id = $1") .bind(user_id) .execute(pool) .await?; Ok(()) } /// Update the read position for a tracked thread (set last_read_post_id to the last post). 
#[tracing::instrument(skip_all)] pub async fn update_read_position( pool: &PgPool, user_id: Uuid, thread_id: Uuid, last_post_id: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE tracked_threads SET last_read_post_id = $3 WHERE user_id = $1 AND thread_id = $2", ) .bind(user_id) .bind(thread_id) .bind(last_post_id) .execute(pool) .await?; Ok(()) } // ============================================================================ // Tag mutations // ============================================================================ /// Create a tag in a community. Returns the tag ID. #[tracing::instrument(skip_all)] pub async fn create_tag( pool: &PgPool, community_id: Uuid, name: &str, slug: &str, ) -> Result { let row: (Uuid,) = sqlx::query_as( "INSERT INTO tags (community_id, name, slug) VALUES ($1, $2, $3) RETURNING id", ) .bind(community_id) .bind(name) .bind(slug) .fetch_one(pool) .await?; Ok(row.0) } /// Delete a tag (CASCADE removes thread_tags rows). #[tracing::instrument(skip_all)] pub async fn delete_tag(pool: &PgPool, tag_id: Uuid) -> Result<(), sqlx::Error> { sqlx::query("DELETE FROM tags WHERE id = $1") .bind(tag_id) .execute(pool) .await?; Ok(()) } /// Set the tags for a thread (delete existing + insert new, in a transaction). #[tracing::instrument(skip_all)] pub async fn set_thread_tags( pool: &PgPool, thread_id: Uuid, tag_ids: &[Uuid], ) -> Result<(), sqlx::Error> { let mut tx = pool.begin().await?; sqlx::query("DELETE FROM thread_tags WHERE thread_id = $1") .bind(thread_id) .execute(&mut *tx) .await?; for tag_id in tag_ids { sqlx::query("INSERT INTO thread_tags (thread_id, tag_id) VALUES ($1, $2)") .bind(thread_id) .bind(tag_id) .execute(&mut *tx) .await?; } tx.commit().await?; Ok(()) } // ============================================================================ // Flag mutations // ============================================================================ /// Insert a flag on a post. ON CONFLICT DO NOTHING (idempotent per user+post). 
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn insert_flag(
    pool: &PgPool,
    post_id: Uuid,
    flagger_id: Uuid,
    reason: &str,
    detail: Option<&str>,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO post_flags (post_id, flagger_id, reason, detail) VALUES ($1, $2, $3, $4) \
         ON CONFLICT (post_id, flagger_id) DO NOTHING",
    )
    .bind(post_id)
    .bind(flagger_id)
    .bind(reason)
    .bind(detail)
    .execute(pool)
    .await?;
    Ok(())
}

/// Resolve a single flag.
///
/// A flag that is already resolved is left untouched (`resolved_at IS NULL`
/// guard), so its original resolver and resolution are preserved.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn resolve_flag(
    pool: &PgPool,
    flag_id: Uuid,
    resolved_by: Uuid,
    resolution: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3 \
         WHERE id = $1 AND resolved_at IS NULL",
    )
    .bind(flag_id)
    .bind(resolved_by)
    .bind(resolution)
    .execute(pool)
    .await?;
    Ok(())
}

/// Resolve all unresolved flags for a given post.
///
/// Flags on the post that were already resolved keep their existing
/// resolution.
///
/// # Errors
///
/// Returns [`sqlx::Error`] if the query fails.
#[tracing::instrument(skip_all)]
pub async fn resolve_all_flags_for_post(
    pool: &PgPool,
    post_id: Uuid,
    resolved_by: Uuid,
    resolution: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "UPDATE post_flags SET resolved_at = now(), resolved_by = $2, resolution = $3 \
         WHERE post_id = $1 AND resolved_at IS NULL",
    )
    .bind(post_id)
    .bind(resolved_by)
    .bind(resolution)
    .execute(pool)
    .await?;
    Ok(())
}

// ============================================================================
// Link preview mutations
// ============================================================================

/// Insert a link preview for a post. Ignores duplicates.
#[tracing::instrument(skip_all)] pub async fn insert_link_preview( pool: &PgPool, post_id: Uuid, url: &str, title: Option<&str>, description: Option<&str>, ) -> Result<(), sqlx::Error> { sqlx::query( "INSERT INTO link_previews (post_id, url, title, description) VALUES ($1, $2, $3, $4) ON CONFLICT (post_id, url) DO NOTHING", ) .bind(post_id) .bind(url) .bind(title) .bind(description) .execute(pool) .await?; Ok(()) } // ============================================================================ // Mention mutations // ============================================================================ /// Insert mention rows for a post. Ignores duplicates. #[tracing::instrument(skip_all)] pub async fn insert_mentions( pool: &PgPool, post_id: Uuid, user_ids: &[Uuid], ) -> Result<(), sqlx::Error> { for user_id in user_ids { sqlx::query( "INSERT INTO post_mentions (post_id, mentioned_user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", ) .bind(post_id) .bind(user_id) .execute(pool) .await?; } Ok(()) } // ============================================================================ // Endorsement mutations // ============================================================================ /// Toggle endorsement: insert if missing, delete if exists. Returns true if now endorsed. 
#[tracing::instrument(skip_all)] pub async fn toggle_endorsement( pool: &PgPool, post_id: Uuid, endorser_id: Uuid, ) -> Result { let result = sqlx::query( "INSERT INTO post_endorsements (post_id, endorser_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", ) .bind(post_id) .bind(endorser_id) .execute(pool) .await?; if result.rows_affected() == 0 { // Already existed — remove it sqlx::query("DELETE FROM post_endorsements WHERE post_id = $1 AND endorser_id = $2") .bind(post_id) .bind(endorser_id) .execute(pool) .await?; Ok(false) } else { Ok(true) } } // ============================================================================ // Image uploads // ============================================================================ /// Insert an uploaded image record. #[tracing::instrument(skip_all)] pub async fn insert_image( pool: &PgPool, uploader_id: Uuid, community_id: Uuid, s3_key: &str, filename: &str, content_type: &str, size_bytes: i64, ) -> Result { sqlx::query_scalar( "INSERT INTO images (uploader_id, community_id, s3_key, filename, content_type, size_bytes) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", ) .bind(uploader_id) .bind(community_id) .bind(s3_key) .bind(filename) .bind(content_type) .bind(size_bytes) .fetch_one(pool) .await } /// Mark an image as removed by a moderator. #[tracing::instrument(skip_all)] pub async fn remove_image( pool: &PgPool, image_id: Uuid, removed_by: Uuid, ) -> Result<(), sqlx::Error> { sqlx::query( "UPDATE images SET removed_at = now(), removed_by = $2 WHERE id = $1 AND removed_at IS NULL", ) .bind(image_id) .bind(removed_by) .execute(pool) .await?; Ok(()) }