//! Item CRUD: creation, listing, text body updates, and ownership lookups. //! //! Bulk and structural operations (move, bulk_*, duplicate) live in the //! `bulk` submodule and are re-exported flat so call sites still see //! `db::items::bulk_publish` etc. mod bulk; mod media; pub use bulk::*; pub use media::*; use sqlx::PgPool; use super::enums::{AiTier, ItemType}; use super::models::*; use super::{ItemId, MtThreadId, PriceCents, ProjectId, UserId}; use crate::error::Result; /// Insert a new item into a project and return the created row. /// /// Auto-generates a URL-safe slug from the title. If the slug collides with /// an existing item in the same project, appends a counter suffix. #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] pub async fn create_item( pool: &PgPool, project_id: ProjectId, title: &str, description: Option<&str>, price_cents: PriceCents, item_type: ItemType, ai_tier: AiTier, ai_disclosure: Option<&str>, ) -> Result { let mut slug = crate::helpers::slugify(title); // Check for collision and append counter if needed if item_slug_exists(pool, project_id, &slug).await? { let base = slug.clone(); let mut counter = 2u32; loop { slug = super::validated_types::Slug::from_trusted(format!("{}-{}", base, counter)); if !item_slug_exists(pool, project_id, &slug).await? { break; } counter += 1; } } // Retry loop for TOCTOU race on slug uniqueness let base_slug = slug.clone(); let mut suffix = 1u32; let item = loop { match sqlx::query_as::<_, DbItem>( r#" INSERT INTO items (project_id, title, description, price_cents, item_type, slug, ai_tier, ai_disclosure) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING * "#, ) .bind(project_id) .bind(title) .bind(description) .bind(price_cents) .bind(item_type) .bind(&slug) .bind(ai_tier) .bind(ai_disclosure) .fetch_one(pool) .await { Ok(item) => break item, Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") && suffix < 100 => { suffix += 1; slug = super::validated_types::Slug::from_trusted( format!("{}-{}", base_slug, suffix), ); continue; } Err(e) => return Err(e.into()), } }; Ok(item) } /// Check whether a slug already exists for a given project. pub(super) async fn item_slug_exists<'e, E: sqlx::Executor<'e, Database = sqlx::Postgres>>( executor: E, project_id: ProjectId, slug: &super::validated_types::Slug, ) -> Result { let exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM items WHERE project_id = $1 AND slug = $2)", ) .bind(project_id) .bind(slug) .fetch_one(executor) .await?; Ok(exists) } /// Fetch an item by primary key. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_item_by_id(pool: &PgPool, id: ItemId) -> Result> { let item = sqlx::query_as::<_, DbItem>("SELECT * FROM items WHERE id = $1") .bind(id) .fetch_optional(pool) .await?; Ok(item) } /// Fetch titles for a batch of item IDs. Returns (item_id, title) pairs. #[tracing::instrument(skip_all)] pub async fn get_item_titles_batch(pool: &PgPool, ids: &[ItemId]) -> Result> { if ids.is_empty() { return Ok(vec![]); } let rows: Vec<(ItemId, String)> = sqlx::query_as( "SELECT id, title FROM items WHERE id = ANY($1)", ) .bind(ids) .fetch_all(pool) .await?; Ok(rows) } /// Fetch project_id for a batch of item IDs. Returns (item_id, project_id) pairs. /// /// Used by bulk operations to verify all items belong to the same project in one query. #[tracing::instrument(skip_all)] pub async fn get_item_project_ids_batch(pool: &PgPool, ids: &[ItemId]) -> Result> { if ids.is_empty() { return Ok(vec![]); } let rows: Vec<(ItemId, super::ProjectId)> = sqlx::query_as( "SELECT id, project_id FROM items WHERE id = ANY($1)", ) .bind(ids) .fetch_all(pool) .await?; Ok(rows) } /// List all items in a project, ordered by sort_order then newest. /// /// Capped at 500 as a safety limit. #[tracing::instrument(skip_all)] pub async fn get_items_by_project(pool: &PgPool, project_id: ProjectId) -> Result> { let items = sqlx::query_as::<_, DbItem>( "SELECT * FROM items WHERE project_id = $1 AND deleted_at IS NULL ORDER BY sort_order, created_at DESC LIMIT 500", ) .bind(project_id) .fetch_all(pool) .await?; Ok(items) } /// List all items across all projects owned by a user, newest first. /// /// Capped at 500 as a safety limit. #[tracing::instrument(skip_all)] pub async fn get_items_by_user(pool: &PgPool, user_id: UserId) -> Result> { let items = sqlx::query_as::<_, DbItem>( r#" SELECT i.* FROM items i JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND i.deleted_at IS NULL ORDER BY i.created_at DESC LIMIT 500 "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(items) } /// Count items per project for all projects owned by a user. /// /// Returns `(project_id, count)` tuples. Used by the CLI to avoid N+1 queries. #[tracing::instrument(skip_all)] pub async fn count_items_by_user_projects( pool: &PgPool, user_id: UserId, ) -> Result> { let rows: Vec<(ProjectId, i64)> = sqlx::query_as( r#" SELECT i.project_id, COUNT(*) AS cnt FROM items i JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND i.deleted_at IS NULL GROUP BY i.project_id "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// List only public items in a project, ordered by sort_order then newest. /// /// Capped at 500 as a safety limit. #[tracing::instrument(skip_all)] pub async fn get_public_items_by_project(pool: &PgPool, project_id: ProjectId) -> Result> { let items = sqlx::query_as::<_, DbItem>( "SELECT * FROM items WHERE project_id = $1 AND is_public = true AND listed = true ORDER BY sort_order, created_at DESC LIMIT 500", ) .bind(project_id) .fetch_all(pool) .await?; Ok(items) } /// Partially update an item's fields (COALESCE keeps existing values when `None`). /// /// `publish_at` uses a double-Option: `None` = no change, `Some(None)` = clear schedule, /// `Some(Some(dt))` = set schedule. #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] pub async fn update_item( pool: &PgPool, id: ItemId, user_id: UserId, title: Option<&str>, description: Option<&str>, price_cents: Option, item_type: Option, is_public: Option, pwyw_enabled: Option, pwyw_min_cents: Option, publish_at: Option>>, web_only: Option, ai_tier: Option, ai_disclosure: Option>, ) -> Result { // Flatten the double-Option: if outer is None, pass current DB value (via SQL CASE). // $10 = whether to update publish_at, $11 = the new value (NULL to clear). let update_publish_at = publish_at.is_some(); let publish_at_value = publish_at.flatten(); // ai_disclosure uses the same double-Option pattern as publish_at: // None = no change, Some(None) = clear, Some(Some(text)) = set. let update_ai_disclosure = ai_disclosure.is_some(); let ai_disclosure_value = ai_disclosure.flatten(); let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET title = COALESCE($3, title), description = COALESCE($4, description), price_cents = COALESCE($5, price_cents), item_type = COALESCE($6, item_type), is_public = CASE WHEN removed_by_admin AND $7 = true THEN false ELSE COALESCE($7, is_public) END, pwyw_enabled = COALESCE($8, pwyw_enabled), pwyw_min_cents = COALESCE($9, pwyw_min_cents), publish_at = CASE WHEN $10 THEN $11 ELSE publish_at END, web_only = COALESCE($12, web_only), ai_tier = COALESCE($13, ai_tier), ai_disclosure = CASE WHEN $14 THEN $15 ELSE ai_disclosure END WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $2) RETURNING * "#, ) .bind(id) .bind(user_id) .bind(title) .bind(description) .bind(price_cents) .bind(item_type) .bind(is_public) .bind(pwyw_enabled) .bind(pwyw_min_cents) .bind(update_publish_at) .bind(publish_at_value) .bind(web_only) .bind(ai_tier) .bind(update_ai_disclosure) .bind(ai_disclosure_value) .fetch_one(pool) .await?; Ok(item) } /// Publish all items whose scheduled publish time has passed. /// /// Atomically sets `is_public = true` and clears `publish_at`, returning the /// newly published items so the caller can send release announcements. #[tracing::instrument(skip_all)] pub async fn publish_scheduled_items(pool: &PgPool) -> Result> { let items = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET is_public = true, publish_at = NULL, updated_at = NOW() WHERE publish_at IS NOT NULL AND publish_at <= NOW() AND is_public = false AND removed_by_admin = false RETURNING * "#, ) .fetch_all(pool) .await?; Ok(items) } /// Soft-delete an item (sets deleted_at, recoverable for 7 days). #[tracing::instrument(skip_all)] pub async fn delete_item(pool: &PgPool, id: ItemId, user_id: UserId) -> Result<()> { sqlx::query( r#" UPDATE items SET deleted_at = NOW(), is_public = false WHERE id = $1 AND deleted_at IS NULL AND project_id IN (SELECT id FROM projects WHERE user_id = $2) "#, ) .bind(id) .bind(user_id) .execute(pool) .await?; Ok(()) } /// Restore a soft-deleted item. #[tracing::instrument(skip_all)] pub async fn restore_item(pool: &PgPool, id: ItemId, user_id: UserId) -> Result { let result = sqlx::query( r#" UPDATE items SET deleted_at = NULL WHERE id = $1 AND deleted_at IS NOT NULL AND project_id IN (SELECT id FROM projects WHERE user_id = $2) "#, ) .bind(id) .bind(user_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Get soft-deleted items for a project (for the "Recently Deleted" section). #[tracing::instrument(skip_all)] pub async fn get_deleted_items_by_project( pool: &PgPool, project_id: ProjectId, ) -> Result> { let items = sqlx::query_as::<_, DbItem>( "SELECT * FROM items WHERE project_id = $1 AND deleted_at IS NOT NULL ORDER BY deleted_at DESC LIMIT 500", ) .bind(project_id) .fetch_all(pool) .await?; Ok(items) } /// Collect S3 keys from items that are about to be purged (soft-deleted >7 days). /// Returns all non-null S3 keys (audio, cover, video) so they can be deleted /// from S3 before the DB rows are removed. #[tracing::instrument(skip_all)] pub async fn get_expired_deleted_item_s3_keys(pool: &PgPool) -> Result> { let keys: Vec = sqlx::query_scalar( r#" SELECT k FROM ( SELECT unnest(ARRAY_REMOVE(ARRAY[audio_s3_key, cover_s3_key, video_s3_key], NULL)) AS k FROM items WHERE deleted_at IS NOT NULL AND deleted_at < NOW() - INTERVAL '7 days' AND (audio_s3_key IS NOT NULL OR cover_s3_key IS NOT NULL OR video_s3_key IS NOT NULL) ) sub "#, ) .fetch_all(pool) .await?; Ok(keys) } /// Collect S3 keys from versions belonging to items about to be purged. /// Must be called before purge since CASCADE delete destroys version rows. #[tracing::instrument(skip_all)] pub async fn get_expired_deleted_item_version_s3_keys(pool: &PgPool) -> Result> { let keys: Vec = sqlx::query_scalar( r#" SELECT v.s3_key FROM versions v JOIN items i ON v.item_id = i.id WHERE i.deleted_at IS NOT NULL AND i.deleted_at < NOW() - INTERVAL '7 days' AND v.s3_key IS NOT NULL "#, ) .fetch_all(pool) .await?; Ok(keys) } /// Sum total file sizes per user for items about to be purged, including version files. /// Returns (user_id, total_bytes) pairs for storage decrement. #[tracing::instrument(skip_all)] pub async fn get_expired_deleted_item_storage_by_user(pool: &PgPool) -> Result> { let rows: Vec<(super::UserId, i64)> = sqlx::query_as( r#" SELECT p.user_id, COALESCE(SUM( COALESCE(i.audio_file_size_bytes, 0) + COALESCE(i.cover_file_size_bytes, 0) + COALESCE(i.video_file_size_bytes, 0) + COALESCE(ver.version_bytes, 0) ), 0)::BIGINT AS total_bytes FROM items i JOIN projects p ON i.project_id = p.id LEFT JOIN LATERAL ( SELECT COALESCE(SUM(v.file_size_bytes), 0)::BIGINT AS version_bytes FROM versions v WHERE v.item_id = i.id AND v.file_size_bytes IS NOT NULL ) ver ON true WHERE i.deleted_at IS NOT NULL AND i.deleted_at < NOW() - INTERVAL '7 days' GROUP BY p.user_id "#, ) .fetch_all(pool) .await?; Ok(rows) } /// Collect all S3 keys from items belonging to a project (audio, cover, video). #[tracing::instrument(skip_all)] pub async fn get_project_item_s3_keys(pool: &PgPool, project_id: super::ProjectId) -> Result> { let keys: Vec = sqlx::query_scalar( r#" SELECT k FROM ( SELECT unnest(ARRAY_REMOVE(ARRAY[audio_s3_key, cover_s3_key, video_s3_key], NULL)) AS k FROM items WHERE project_id = $1 AND (audio_s3_key IS NOT NULL OR cover_s3_key IS NOT NULL OR video_s3_key IS NOT NULL) ) sub "#, ) .bind(project_id) .fetch_all(pool) .await?; Ok(keys) } /// Collect S3 keys from versions belonging to items in a project. #[tracing::instrument(skip_all)] pub async fn get_project_version_s3_keys(pool: &PgPool, project_id: super::ProjectId) -> Result> { let keys: Vec = sqlx::query_scalar( r#" SELECT v.s3_key FROM versions v JOIN items i ON v.item_id = i.id WHERE i.project_id = $1 AND v.s3_key IS NOT NULL "#, ) .bind(project_id) .fetch_all(pool) .await?; Ok(keys) } /// Sum total file sizes for all items and versions in a project. #[tracing::instrument(skip_all)] pub async fn get_project_storage_bytes(pool: &PgPool, project_id: super::ProjectId) -> Result { let total: i64 = sqlx::query_scalar( r#" SELECT COALESCE(SUM( COALESCE(i.audio_file_size_bytes, 0) + COALESCE(i.cover_file_size_bytes, 0) + COALESCE(i.video_file_size_bytes, 0) + COALESCE(ver.version_bytes, 0) ), 0)::BIGINT FROM items i LEFT JOIN LATERAL ( SELECT COALESCE(SUM(v.file_size_bytes), 0)::BIGINT AS version_bytes FROM versions v WHERE v.item_id = i.id AND v.file_size_bytes IS NOT NULL ) ver ON true WHERE i.project_id = $1 "#, ) .bind(project_id) .fetch_one(pool) .await?; Ok(total) } /// Permanently delete items that were soft-deleted more than 7 days ago. #[tracing::instrument(skip_all)] pub async fn purge_expired_deleted_items(pool: &PgPool) -> Result { let result = sqlx::query( "DELETE FROM items WHERE deleted_at IS NOT NULL AND deleted_at < NOW() - INTERVAL '7 days'", ) .execute(pool) .await?; Ok(result.rows_affected()) } /// Update the audio S3 key for an item (defense-in-depth: verifies ownership) #[tracing::instrument(skip_all)] pub async fn update_item_audio_s3_key( pool: &PgPool, item_id: ItemId, user_id: UserId, s3_key: &str, ) -> Result { let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET audio_s3_key = $2, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $3) RETURNING * "#, ) .bind(item_id) .bind(s3_key) .bind(user_id) .fetch_one(pool) .await?; Ok(item) } /// Update the cover image S3 key for an item (defense-in-depth: verifies ownership) #[tracing::instrument(skip_all)] pub async fn update_item_cover_s3_key( pool: &PgPool, item_id: ItemId, user_id: UserId, s3_key: &str, ) -> Result { let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET cover_s3_key = $2, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $3) RETURNING * "#, ) .bind(item_id) .bind(s3_key) .bind(user_id) .fetch_one(pool) .await?; Ok(item) } /// List all public items across all public projects owned by a user, newest first. /// /// Used by the creator RSS feed to avoid O(projects) queries. /// Capped at 50 items (standard RSS feed size). #[tracing::instrument(skip_all)] pub async fn get_public_items_by_user(pool: &PgPool, user_id: UserId) -> Result> { let items = sqlx::query_as::<_, DbItem>( r#" SELECT i.* FROM items i JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND p.is_public = true AND i.is_public = true AND i.listed = true ORDER BY i.created_at DESC LIMIT 50 "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(items) } /// Get the owner (user_id) of an item through its project #[tracing::instrument(skip_all)] pub async fn get_item_owner(pool: &PgPool, item_id: ItemId) -> Result> { let owner: Option = sqlx::query_scalar( r#" SELECT p.user_id FROM items i JOIN projects p ON i.project_id = p.id WHERE i.id = $1 "#, ) .bind(item_id) .fetch_optional(pool) .await?; Ok(owner) } /// Pre-flight access check for content streaming/download. Returns item data /// plus ownership, purchase, subscription, and bundle access. /// Returns `None` if the item does not exist (or is soft-deleted). #[derive(Debug)] pub struct ItemAccessCheck { // Ownership pub owner_id: UserId, // Access flags (only meaningful when a user_id is provided) pub has_purchased: bool, /// Subscription access as a witness, not a bool: `Some` only when the sealed /// gate confirmed an active, in-period subscription. Feed it straight into /// `pricing::AccessContext::subscription`. pub subscription: Option, pub has_bundle_access: bool, } /// Ownership + the access flags that are pure transaction/bundle lookups. /// `has_subscription` is deliberately NOT here: it must come from /// [`crate::db::subscriptions::has_access`], the single sealed access gate, so /// the grant predicate (incl. the `current_period_end` clause) cannot be /// hand-written and drift. (Payments S1 / CHRONIC 2: this query previously /// inlined the predicate and dropped the period clause on the download path.) #[derive(Debug, sqlx::FromRow)] struct ItemAccessPartial { owner_id: UserId, has_purchased: bool, has_bundle_access: bool, } #[tracing::instrument(skip_all)] pub async fn check_item_access( pool: &PgPool, item_id: ItemId, user_id: Option, ) -> Result> { // When no user is provided, access flags are all false let uid = user_id.unwrap_or(UserId::nil()); let partial = sqlx::query_as::<_, ItemAccessPartial>( r#" SELECT p.user_id AS owner_id, EXISTS( SELECT 1 FROM transactions t WHERE t.item_id = i.id AND t.buyer_id = $2 AND t.status = 'completed' ) AS has_purchased, EXISTS( SELECT 1 FROM bundle_items bi JOIN transactions bt ON bt.item_id = bi.bundle_id WHERE bi.item_id = i.id AND bt.buyer_id = $2 AND bt.status = 'completed' ) AS has_bundle_access FROM items i JOIN projects p ON i.project_id = p.id WHERE i.id = $1 AND i.deleted_at IS NULL "#, ) .bind(item_id) .bind(uid) .fetch_optional(pool) .await?; let Some(partial) = partial else { return Ok(None); }; // Subscription access goes through the one sealed gate, which returns a // witness. Anonymous callers (no user_id) can't hold a subscription, so skip // the query entirely. let subscription = match user_id { Some(real_uid) => { crate::db::subscriptions::SubscriptionGate::check( pool, real_uid, crate::db::subscriptions::SubscriptionScope::Item(item_id), ) .await? } None => None, }; Ok(Some(ItemAccessCheck { owner_id: partial.owner_id, has_purchased: partial.has_purchased, subscription, has_bundle_access: partial.has_bundle_access, })) } /// Update the text body content for an item (for articles/essays). /// /// Recomputes `word_count` and `reading_time_minutes` on every save. /// Reading time uses ~200 wpm (average adult reading speed) with floor /// division, clamped to a minimum of 1 minute so the UI never shows "0 min". #[tracing::instrument(skip_all)] pub async fn update_item_text(pool: &PgPool, id: ItemId, user_id: UserId, body: &str) -> Result { let word_count = body.split_whitespace().count() as i32; let reading_time = (word_count / 200).max(1); let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET body = $3, word_count = $4, reading_time_minutes = $5, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $2) RETURNING * "#, ) .bind(id) .bind(user_id) .bind(body) .bind(word_count) .bind(reading_time) .fetch_one(pool) .await?; Ok(item) } /// Update the license key settings on an item. #[tracing::instrument(skip_all)] pub async fn update_item_license_settings( pool: &PgPool, item_id: ItemId, user_id: UserId, enable_license_keys: bool, default_max_activations: Option, ) -> Result<()> { sqlx::query( r#" UPDATE items SET enable_license_keys = $3, default_max_activations = $4, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $2) "#, ) .bind(item_id) .bind(user_id) .bind(enable_license_keys) .bind(default_max_activations) .execute(pool) .await?; Ok(()) } /// Update the license text preset and optional custom text on an item. #[tracing::instrument(skip_all)] pub async fn update_item_license_text( pool: &PgPool, item_id: ItemId, user_id: UserId, license_preset: Option<&str>, custom_license_text: Option<&str>, ) -> Result<()> { sqlx::query( r#" UPDATE items SET license_preset = $3, custom_license_text = $4, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $2) "#, ) .bind(item_id) .bind(user_id) .bind(license_preset) .bind(custom_license_text) .execute(pool) .await?; Ok(()) } /// Increment the denormalized sales_count for an item (called on purchase/claim). /// /// Accepts any sqlx executor (`&PgPool`, `&mut Transaction`, etc.) so callers /// can include this in a larger transaction when needed. #[tracing::instrument(skip_all)] pub async fn increment_sales_count<'e>( executor: impl sqlx::PgExecutor<'e>, item_id: ItemId, ) -> Result<()> { sqlx::query("UPDATE items SET sales_count = sales_count + 1 WHERE id = $1") .bind(item_id) .execute(executor) .await?; Ok(()) } /// Decrement the denormalized sales_count for an item (called on refund/unclaim). #[tracing::instrument(skip_all)] pub async fn decrement_sales_count<'e>( executor: impl sqlx::PgExecutor<'e>, item_id: ItemId, ) -> Result<()> { sqlx::query("UPDATE items SET sales_count = GREATEST(sales_count - 1, 0) WHERE id = $1") .bind(item_id) .execute(executor) .await?; Ok(()) } /// Atomically mark an item as having had its release announced. /// Returns false if already announced (prevents duplicate announcements on unpublish/republish). #[tracing::instrument(skip_all)] pub async fn mark_release_announced(pool: &PgPool, item_id: ItemId) -> Result { let result = sqlx::query( "UPDATE items SET release_announced_at = NOW() WHERE id = $1 AND release_announced_at IS NULL", ) .bind(item_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Set the linked MT thread ID for an item. #[tracing::instrument(skip_all)] pub async fn set_mt_thread_id( pool: &PgPool, item_id: ItemId, thread_id: MtThreadId, ) -> Result<()> { sqlx::query("UPDATE items SET mt_thread_id = $2 WHERE id = $1") .bind(item_id) .bind(thread_id) .execute(pool) .await?; Ok(()) } /// Collect all S3 keys for items owned by a user (audio + cover + video). /// /// Returns item title, project slug, and optional S3 keys for audio/cover/video. /// Only includes items that have at least one S3 key. #[tracing::instrument(skip_all)] pub async fn get_user_s3_keys(pool: &PgPool, user_id: UserId) -> Result> { let rows = sqlx::query_as::<_, ItemS3KeyRow>( r#" SELECT i.title, p.id AS project_id, p.slug AS project_slug, i.audio_s3_key, i.cover_s3_key, i.video_s3_key, i.audio_file_size_bytes, i.cover_file_size_bytes, i.video_file_size_bytes FROM items i JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND (i.audio_s3_key IS NOT NULL OR i.cover_s3_key IS NOT NULL OR i.video_s3_key IS NOT NULL) ORDER BY p.slug, i.sort_order LIMIT 500 "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Check whether a user has at least one public item across all their projects. #[tracing::instrument(skip_all)] pub async fn has_public_item_by_user(pool: &PgPool, user_id: UserId) -> Result { let exists: bool = sqlx::query_scalar( r#" SELECT EXISTS( SELECT 1 FROM items i JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND i.is_public = true ) "#, ) .bind(user_id) .fetch_one(pool) .await?; Ok(exists) } /// Increment the play count for an item (called on audio/video stream request). #[tracing::instrument(skip_all)] pub async fn increment_play_count(pool: &PgPool, item_id: ItemId) -> Result<()> { sqlx::query("UPDATE items SET play_count = play_count + 1 WHERE id = $1") .bind(item_id) .execute(pool) .await?; Ok(()) } /// Record a unique listener and increment unique_play_count if this is the first /// play by this user. Returns true if new unique listener, false if already counted. /// Runs in a transaction so the user_plays INSERT and count UPDATE are atomic. #[tracing::instrument(skip_all)] pub async fn record_unique_play(pool: &PgPool, user_id: super::UserId, item_id: ItemId) -> Result { let mut tx = pool.begin().await?; let result = sqlx::query( "INSERT INTO user_plays (user_id, item_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", ) .bind(user_id) .bind(item_id) .execute(&mut *tx) .await?; if result.rows_affected() > 0 { sqlx::query("UPDATE items SET unique_play_count = unique_play_count + 1 WHERE id = $1") .bind(item_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(true) } else { tx.commit().await?; Ok(false) } } /// Increment the item-level download count (called alongside per-version increment). #[tracing::instrument(skip_all)] pub async fn increment_item_download_count(pool: &PgPool, item_id: ItemId) -> Result<()> { sqlx::query("UPDATE items SET download_count = download_count + 1 WHERE id = $1") .bind(item_id) .execute(pool) .await?; Ok(()) } /// Fetch a public item by project ID and slug (for custom domain routing). #[tracing::instrument(skip_all)] pub async fn get_item_by_project_and_slug( pool: &PgPool, project_id: ProjectId, slug: &str, ) -> Result> { let item = sqlx::query_as::<_, DbItem>( "SELECT * FROM items WHERE project_id = $1 AND slug = $2 AND is_public = true", ) .bind(project_id) .bind(slug) .fetch_optional(pool) .await?; Ok(item) } /// Hide all items for a user (set is_public = false). Used for post-grace enforcement. /// Returns the number of items hidden. #[tracing::instrument(skip_all)] pub async fn hide_all_items_for_user(pool: &PgPool, user_id: UserId) -> Result { let result = sqlx::query( r#" UPDATE items SET is_public = false WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1) AND is_public = true "#, ) .bind(user_id) .execute(pool) .await?; Ok(result.rows_affected()) } /// Unhide all items for a user (set is_public = true). Used when a creator re-subscribes /// after post-grace hiding. Returns the number of items unhidden. #[tracing::instrument(skip_all)] pub async fn unhide_all_items_for_user(pool: &PgPool, user_id: UserId) -> Result { let result = sqlx::query( r#" UPDATE items SET is_public = true WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1) AND is_public = false AND removed_at IS NULL "#, ) .bind(user_id) .execute(pool) .await?; Ok(result.rows_affected()) } /// Admin: remove an item (hide from public, record reason). The item stays in the DB /// and the creator can see it in their dashboard with the removal reason. #[tracing::instrument(skip_all)] pub async fn admin_remove_item( pool: &PgPool, item_id: ItemId, reason: &str, ) -> Result { let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET removed_by_admin = true, removal_reason = $2, removed_at = NOW(), is_public = false WHERE id = $1 RETURNING * "#, ) .bind(item_id) .bind(reason) .fetch_one(pool) .await?; Ok(item) } /// Admin: restore a previously removed item. Clears the removal fields /// but does NOT re-publish (creator must publish manually). #[tracing::instrument(skip_all)] pub async fn admin_restore_item( pool: &PgPool, item_id: ItemId, ) -> Result { let item = sqlx::query_as::<_, DbItem>( r#" UPDATE items SET removed_by_admin = false, removal_reason = NULL, removed_at = NULL WHERE id = $1 RETURNING * "#, ) .bind(item_id) .fetch_one(pool) .await?; Ok(item) } /// Count public, listed items (for landing page stats). #[tracing::instrument(skip_all)] pub async fn count_public_listed(pool: &PgPool) -> Result { let count = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM items WHERE is_public = true AND listed = true", ) .fetch_one(pool) .await?; Ok(count) }