Skip to main content

max / makenotwork

31.7 KB · 1052 lines History Blame Raw
1 //! Item CRUD: creation, listing, text body updates, and ownership lookups.
2 //!
3 //! Bulk and structural operations (move, bulk_*, duplicate) live in the
4 //! `bulk` submodule and are re-exported flat so call sites still see
5 //! `db::items::bulk_publish` etc.
6
7 mod bulk;
8 mod media;
9
10 pub use bulk::*;
11 pub use media::*;
12
13 use sqlx::PgPool;
14
15 use super::enums::{AiTier, ItemType};
16 use super::models::*;
17 use super::{ItemId, MtThreadId, PriceCents, ProjectId, UserId};
18 use crate::error::Result;
19
20 /// Insert a new item into a project and return the created row.
21 ///
22 /// Auto-generates a URL-safe slug from the title. If the slug collides with
23 /// an existing item in the same project, appends a counter suffix.
24 #[allow(clippy::too_many_arguments)]
25 #[tracing::instrument(skip_all)]
26 pub async fn create_item(
27 pool: &PgPool,
28 project_id: ProjectId,
29 title: &str,
30 description: Option<&str>,
31 price_cents: PriceCents,
32 item_type: ItemType,
33 ai_tier: AiTier,
34 ai_disclosure: Option<&str>,
35 ) -> Result<DbItem> {
36 let mut slug = crate::helpers::slugify(title);
37
38 // Check for collision and append counter if needed
39 if item_slug_exists(pool, project_id, &slug).await? {
40 let base = slug.clone();
41 let mut counter = 2u32;
42 loop {
43 slug = super::validated_types::Slug::from_trusted(format!("{}-{}", base, counter));
44 if !item_slug_exists(pool, project_id, &slug).await? {
45 break;
46 }
47 counter += 1;
48 }
49 }
50
51 // Retry loop for TOCTOU race on slug uniqueness
52 let base_slug = slug.clone();
53 let mut suffix = 1u32;
54 let item = loop {
55 match sqlx::query_as::<_, DbItem>(
56 r#"
57 INSERT INTO items (project_id, title, description, price_cents, item_type, slug, ai_tier, ai_disclosure)
58 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
59 RETURNING *
60 "#,
61 )
62 .bind(project_id)
63 .bind(title)
64 .bind(description)
65 .bind(price_cents)
66 .bind(item_type)
67 .bind(&slug)
68 .bind(ai_tier)
69 .bind(ai_disclosure)
70 .fetch_one(pool)
71 .await
72 {
73 Ok(item) => break item,
74 Err(sqlx::Error::Database(db_err))
75 if db_err.code().as_deref() == Some("23505") && suffix < 100 =>
76 {
77 suffix += 1;
78 slug = super::validated_types::Slug::from_trusted(
79 format!("{}-{}", base_slug, suffix),
80 );
81 continue;
82 }
83 Err(e) => return Err(e.into()),
84 }
85 };
86
87 Ok(item)
88 }
89
90 /// Check whether a slug already exists for a given project.
91 pub(super) async fn item_slug_exists<'e, E: sqlx::Executor<'e, Database = sqlx::Postgres>>(
92 executor: E,
93 project_id: ProjectId,
94 slug: &super::validated_types::Slug,
95 ) -> Result<bool> {
96 let exists: bool = sqlx::query_scalar(
97 "SELECT EXISTS(SELECT 1 FROM items WHERE project_id = $1 AND slug = $2)",
98 )
99 .bind(project_id)
100 .bind(slug)
101 .fetch_one(executor)
102 .await?;
103
104 Ok(exists)
105 }
106
107 /// Fetch an item by primary key. Returns `None` if not found.
108 #[tracing::instrument(skip_all)]
109 pub async fn get_item_by_id(pool: &PgPool, id: ItemId) -> Result<Option<DbItem>> {
110 let item = sqlx::query_as::<_, DbItem>("SELECT * FROM items WHERE id = $1")
111 .bind(id)
112 .fetch_optional(pool)
113 .await?;
114
115 Ok(item)
116 }
117
118 /// Fetch titles for a batch of item IDs. Returns (item_id, title) pairs.
119 #[tracing::instrument(skip_all)]
120 pub async fn get_item_titles_batch(pool: &PgPool, ids: &[ItemId]) -> Result<Vec<(ItemId, String)>> {
121 if ids.is_empty() {
122 return Ok(vec![]);
123 }
124 let rows: Vec<(ItemId, String)> = sqlx::query_as(
125 "SELECT id, title FROM items WHERE id = ANY($1)",
126 )
127 .bind(ids)
128 .fetch_all(pool)
129 .await?;
130
131 Ok(rows)
132 }
133
134 /// Fetch project_id for a batch of item IDs. Returns (item_id, project_id) pairs.
135 ///
136 /// Used by bulk operations to verify all items belong to the same project in one query.
137 #[tracing::instrument(skip_all)]
138 pub async fn get_item_project_ids_batch(pool: &PgPool, ids: &[ItemId]) -> Result<Vec<(ItemId, super::ProjectId)>> {
139 if ids.is_empty() {
140 return Ok(vec![]);
141 }
142 let rows: Vec<(ItemId, super::ProjectId)> = sqlx::query_as(
143 "SELECT id, project_id FROM items WHERE id = ANY($1)",
144 )
145 .bind(ids)
146 .fetch_all(pool)
147 .await?;
148
149 Ok(rows)
150 }
151
152 /// List all items in a project, ordered by sort_order then newest.
153 ///
154 /// Capped at 500 as a safety limit.
155 #[tracing::instrument(skip_all)]
156 pub async fn get_items_by_project(pool: &PgPool, project_id: ProjectId) -> Result<Vec<DbItem>> {
157 let items = sqlx::query_as::<_, DbItem>(
158 "SELECT * FROM items WHERE project_id = $1 AND deleted_at IS NULL ORDER BY sort_order, created_at DESC LIMIT 500",
159 )
160 .bind(project_id)
161 .fetch_all(pool)
162 .await?;
163
164 Ok(items)
165 }
166
167 /// List all items across all projects owned by a user, newest first.
168 ///
169 /// Capped at 500 as a safety limit.
170 #[tracing::instrument(skip_all)]
171 pub async fn get_items_by_user(pool: &PgPool, user_id: UserId) -> Result<Vec<DbItem>> {
172 let items = sqlx::query_as::<_, DbItem>(
173 r#"
174 SELECT i.* FROM items i
175 JOIN projects p ON i.project_id = p.id
176 WHERE p.user_id = $1 AND i.deleted_at IS NULL
177 ORDER BY i.created_at DESC
178 LIMIT 500
179 "#,
180 )
181 .bind(user_id)
182 .fetch_all(pool)
183 .await?;
184
185 Ok(items)
186 }
187
188 /// Count items per project for all projects owned by a user.
189 ///
190 /// Returns `(project_id, count)` tuples. Used by the CLI to avoid N+1 queries.
191 #[tracing::instrument(skip_all)]
192 pub async fn count_items_by_user_projects(
193 pool: &PgPool,
194 user_id: UserId,
195 ) -> Result<Vec<(ProjectId, i64)>> {
196 let rows: Vec<(ProjectId, i64)> = sqlx::query_as(
197 r#"
198 SELECT i.project_id, COUNT(*) AS cnt
199 FROM items i
200 JOIN projects p ON i.project_id = p.id
201 WHERE p.user_id = $1 AND i.deleted_at IS NULL
202 GROUP BY i.project_id
203 "#,
204 )
205 .bind(user_id)
206 .fetch_all(pool)
207 .await?;
208
209 Ok(rows)
210 }
211
212 /// List only public items in a project, ordered by sort_order then newest.
213 ///
214 /// Capped at 500 as a safety limit.
215 #[tracing::instrument(skip_all)]
216 pub async fn get_public_items_by_project(pool: &PgPool, project_id: ProjectId) -> Result<Vec<DbItem>> {
217 let items = sqlx::query_as::<_, DbItem>(
218 "SELECT * FROM items WHERE project_id = $1 AND is_public = true AND listed = true ORDER BY sort_order, created_at DESC LIMIT 500",
219 )
220 .bind(project_id)
221 .fetch_all(pool)
222 .await?;
223
224 Ok(items)
225 }
226
227 /// Partially update an item's fields (COALESCE keeps existing values when `None`).
228 ///
229 /// `publish_at` uses a double-Option: `None` = no change, `Some(None)` = clear schedule,
230 /// `Some(Some(dt))` = set schedule.
231 #[allow(clippy::too_many_arguments)]
232 #[tracing::instrument(skip_all)]
233 pub async fn update_item(
234 pool: &PgPool,
235 id: ItemId,
236 user_id: UserId,
237 title: Option<&str>,
238 description: Option<&str>,
239 price_cents: Option<PriceCents>,
240 item_type: Option<ItemType>,
241 is_public: Option<bool>,
242 pwyw_enabled: Option<bool>,
243 pwyw_min_cents: Option<PriceCents>,
244 publish_at: Option<Option<chrono::DateTime<chrono::Utc>>>,
245 web_only: Option<bool>,
246 ai_tier: Option<AiTier>,
247 ai_disclosure: Option<Option<&str>>,
248 ) -> Result<DbItem> {
249 // Flatten the double-Option: if outer is None, pass current DB value (via SQL CASE).
250 // $10 = whether to update publish_at, $11 = the new value (NULL to clear).
251 let update_publish_at = publish_at.is_some();
252 let publish_at_value = publish_at.flatten();
253
254 // ai_disclosure uses the same double-Option pattern as publish_at:
255 // None = no change, Some(None) = clear, Some(Some(text)) = set.
256 let update_ai_disclosure = ai_disclosure.is_some();
257 let ai_disclosure_value = ai_disclosure.flatten();
258
259 let item = sqlx::query_as::<_, DbItem>(
260 r#"
261 UPDATE items
262 SET title = COALESCE($3, title),
263 description = COALESCE($4, description),
264 price_cents = COALESCE($5, price_cents),
265 item_type = COALESCE($6, item_type),
266 is_public = CASE WHEN removed_by_admin AND $7 = true THEN false ELSE COALESCE($7, is_public) END,
267 pwyw_enabled = COALESCE($8, pwyw_enabled),
268 pwyw_min_cents = COALESCE($9, pwyw_min_cents),
269 publish_at = CASE WHEN $10 THEN $11 ELSE publish_at END,
270 web_only = COALESCE($12, web_only),
271 ai_tier = COALESCE($13, ai_tier),
272 ai_disclosure = CASE WHEN $14 THEN $15 ELSE ai_disclosure END
273 WHERE id = $1
274 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
275 RETURNING *
276 "#,
277 )
278 .bind(id)
279 .bind(user_id)
280 .bind(title)
281 .bind(description)
282 .bind(price_cents)
283 .bind(item_type)
284 .bind(is_public)
285 .bind(pwyw_enabled)
286 .bind(pwyw_min_cents)
287 .bind(update_publish_at)
288 .bind(publish_at_value)
289 .bind(web_only)
290 .bind(ai_tier)
291 .bind(update_ai_disclosure)
292 .bind(ai_disclosure_value)
293 .fetch_one(pool)
294 .await?;
295
296 Ok(item)
297 }
298
299 /// Publish all items whose scheduled publish time has passed.
300 ///
301 /// Atomically sets `is_public = true` and clears `publish_at`, returning the
302 /// newly published items so the caller can send release announcements.
303 #[tracing::instrument(skip_all)]
304 pub async fn publish_scheduled_items(pool: &PgPool) -> Result<Vec<DbItem>> {
305 let items = sqlx::query_as::<_, DbItem>(
306 r#"
307 UPDATE items
308 SET is_public = true, publish_at = NULL, updated_at = NOW()
309 WHERE publish_at IS NOT NULL AND publish_at <= NOW() AND is_public = false AND removed_by_admin = false
310 RETURNING *
311 "#,
312 )
313 .fetch_all(pool)
314 .await?;
315
316 Ok(items)
317 }
318
319 /// Soft-delete an item (sets deleted_at, recoverable for 7 days).
320 #[tracing::instrument(skip_all)]
321 pub async fn delete_item(pool: &PgPool, id: ItemId, user_id: UserId) -> Result<()> {
322 sqlx::query(
323 r#"
324 UPDATE items SET deleted_at = NOW(), is_public = false
325 WHERE id = $1 AND deleted_at IS NULL
326 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
327 "#,
328 )
329 .bind(id)
330 .bind(user_id)
331 .execute(pool)
332 .await?;
333
334 Ok(())
335 }
336
337 /// Restore a soft-deleted item.
338 #[tracing::instrument(skip_all)]
339 pub async fn restore_item(pool: &PgPool, id: ItemId, user_id: UserId) -> Result<bool> {
340 let result = sqlx::query(
341 r#"
342 UPDATE items SET deleted_at = NULL
343 WHERE id = $1 AND deleted_at IS NOT NULL
344 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
345 "#,
346 )
347 .bind(id)
348 .bind(user_id)
349 .execute(pool)
350 .await?;
351
352 Ok(result.rows_affected() > 0)
353 }
354
355 /// Get soft-deleted items for a project (for the "Recently Deleted" section).
356 #[tracing::instrument(skip_all)]
357 pub async fn get_deleted_items_by_project(
358 pool: &PgPool,
359 project_id: ProjectId,
360 ) -> Result<Vec<DbItem>> {
361 let items = sqlx::query_as::<_, DbItem>(
362 "SELECT * FROM items WHERE project_id = $1 AND deleted_at IS NOT NULL ORDER BY deleted_at DESC LIMIT 500",
363 )
364 .bind(project_id)
365 .fetch_all(pool)
366 .await?;
367
368 Ok(items)
369 }
370
371 /// Collect S3 keys from items that are about to be purged (soft-deleted >7 days).
372 /// Returns all non-null S3 keys (audio, cover, video) so they can be deleted
373 /// from S3 before the DB rows are removed.
374 #[tracing::instrument(skip_all)]
375 pub async fn get_expired_deleted_item_s3_keys(pool: &PgPool) -> Result<Vec<String>> {
376 let keys: Vec<String> = sqlx::query_scalar(
377 r#"
378 SELECT k FROM (
379 SELECT unnest(ARRAY_REMOVE(ARRAY[audio_s3_key, cover_s3_key, video_s3_key], NULL)) AS k
380 FROM items
381 WHERE deleted_at IS NOT NULL AND deleted_at < NOW() - INTERVAL '7 days'
382 AND (audio_s3_key IS NOT NULL OR cover_s3_key IS NOT NULL OR video_s3_key IS NOT NULL)
383 ) sub
384 "#,
385 )
386 .fetch_all(pool)
387 .await?;
388
389 Ok(keys)
390 }
391
392 /// Collect S3 keys from versions belonging to items about to be purged.
393 /// Must be called before purge since CASCADE delete destroys version rows.
394 #[tracing::instrument(skip_all)]
395 pub async fn get_expired_deleted_item_version_s3_keys(pool: &PgPool) -> Result<Vec<String>> {
396 let keys: Vec<String> = sqlx::query_scalar(
397 r#"
398 SELECT v.s3_key
399 FROM versions v
400 JOIN items i ON v.item_id = i.id
401 WHERE i.deleted_at IS NOT NULL AND i.deleted_at < NOW() - INTERVAL '7 days'
402 AND v.s3_key IS NOT NULL
403 "#,
404 )
405 .fetch_all(pool)
406 .await?;
407
408 Ok(keys)
409 }
410
411 /// Sum total file sizes per user for items about to be purged, including version files.
412 /// Returns (user_id, total_bytes) pairs for storage decrement.
413 #[tracing::instrument(skip_all)]
414 pub async fn get_expired_deleted_item_storage_by_user(pool: &PgPool) -> Result<Vec<(super::UserId, i64)>> {
415 let rows: Vec<(super::UserId, i64)> = sqlx::query_as(
416 r#"
417 SELECT p.user_id,
418 COALESCE(SUM(
419 COALESCE(i.audio_file_size_bytes, 0) +
420 COALESCE(i.cover_file_size_bytes, 0) +
421 COALESCE(i.video_file_size_bytes, 0) +
422 COALESCE(ver.version_bytes, 0)
423 ), 0)::BIGINT AS total_bytes
424 FROM items i
425 JOIN projects p ON i.project_id = p.id
426 LEFT JOIN LATERAL (
427 SELECT COALESCE(SUM(v.file_size_bytes), 0)::BIGINT AS version_bytes
428 FROM versions v
429 WHERE v.item_id = i.id AND v.file_size_bytes IS NOT NULL
430 ) ver ON true
431 WHERE i.deleted_at IS NOT NULL AND i.deleted_at < NOW() - INTERVAL '7 days'
432 GROUP BY p.user_id
433 "#,
434 )
435 .fetch_all(pool)
436 .await?;
437
438 Ok(rows)
439 }
440
441 /// Collect all S3 keys from items belonging to a project (audio, cover, video).
442 #[tracing::instrument(skip_all)]
443 pub async fn get_project_item_s3_keys(pool: &PgPool, project_id: super::ProjectId) -> Result<Vec<String>> {
444 let keys: Vec<String> = sqlx::query_scalar(
445 r#"
446 SELECT k FROM (
447 SELECT unnest(ARRAY_REMOVE(ARRAY[audio_s3_key, cover_s3_key, video_s3_key], NULL)) AS k
448 FROM items
449 WHERE project_id = $1
450 AND (audio_s3_key IS NOT NULL OR cover_s3_key IS NOT NULL OR video_s3_key IS NOT NULL)
451 ) sub
452 "#,
453 )
454 .bind(project_id)
455 .fetch_all(pool)
456 .await?;
457
458 Ok(keys)
459 }
460
461 /// Collect S3 keys from versions belonging to items in a project.
462 #[tracing::instrument(skip_all)]
463 pub async fn get_project_version_s3_keys(pool: &PgPool, project_id: super::ProjectId) -> Result<Vec<String>> {
464 let keys: Vec<String> = sqlx::query_scalar(
465 r#"
466 SELECT v.s3_key
467 FROM versions v
468 JOIN items i ON v.item_id = i.id
469 WHERE i.project_id = $1 AND v.s3_key IS NOT NULL
470 "#,
471 )
472 .bind(project_id)
473 .fetch_all(pool)
474 .await?;
475
476 Ok(keys)
477 }
478
479 /// Sum total file sizes for all items and versions in a project.
480 #[tracing::instrument(skip_all)]
481 pub async fn get_project_storage_bytes(pool: &PgPool, project_id: super::ProjectId) -> Result<i64> {
482 let total: i64 = sqlx::query_scalar(
483 r#"
484 SELECT COALESCE(SUM(
485 COALESCE(i.audio_file_size_bytes, 0) +
486 COALESCE(i.cover_file_size_bytes, 0) +
487 COALESCE(i.video_file_size_bytes, 0) +
488 COALESCE(ver.version_bytes, 0)
489 ), 0)::BIGINT
490 FROM items i
491 LEFT JOIN LATERAL (
492 SELECT COALESCE(SUM(v.file_size_bytes), 0)::BIGINT AS version_bytes
493 FROM versions v
494 WHERE v.item_id = i.id AND v.file_size_bytes IS NOT NULL
495 ) ver ON true
496 WHERE i.project_id = $1
497 "#,
498 )
499 .bind(project_id)
500 .fetch_one(pool)
501 .await?;
502
503 Ok(total)
504 }
505
506 /// Permanently delete items that were soft-deleted more than 7 days ago.
507 #[tracing::instrument(skip_all)]
508 pub async fn purge_expired_deleted_items(pool: &PgPool) -> Result<u64> {
509 let result = sqlx::query(
510 "DELETE FROM items WHERE deleted_at IS NOT NULL AND deleted_at < NOW() - INTERVAL '7 days'",
511 )
512 .execute(pool)
513 .await?;
514
515 Ok(result.rows_affected())
516 }
517
518 /// Update the audio S3 key for an item (defense-in-depth: verifies ownership)
519 #[tracing::instrument(skip_all)]
520 pub async fn update_item_audio_s3_key(
521 pool: &PgPool,
522 item_id: ItemId,
523 user_id: UserId,
524 s3_key: &str,
525 ) -> Result<DbItem> {
526 let item = sqlx::query_as::<_, DbItem>(
527 r#"
528 UPDATE items
529 SET audio_s3_key = $2, updated_at = NOW()
530 WHERE id = $1
531 AND project_id IN (SELECT id FROM projects WHERE user_id = $3)
532 RETURNING *
533 "#,
534 )
535 .bind(item_id)
536 .bind(s3_key)
537 .bind(user_id)
538 .fetch_one(pool)
539 .await?;
540
541 Ok(item)
542 }
543
544 /// Update the cover image S3 key for an item (defense-in-depth: verifies ownership)
545 #[tracing::instrument(skip_all)]
546 pub async fn update_item_cover_s3_key(
547 pool: &PgPool,
548 item_id: ItemId,
549 user_id: UserId,
550 s3_key: &str,
551 ) -> Result<DbItem> {
552 let item = sqlx::query_as::<_, DbItem>(
553 r#"
554 UPDATE items
555 SET cover_s3_key = $2, updated_at = NOW()
556 WHERE id = $1
557 AND project_id IN (SELECT id FROM projects WHERE user_id = $3)
558 RETURNING *
559 "#,
560 )
561 .bind(item_id)
562 .bind(s3_key)
563 .bind(user_id)
564 .fetch_one(pool)
565 .await?;
566
567 Ok(item)
568 }
569
570 /// List all public items across all public projects owned by a user, newest first.
571 ///
572 /// Used by the creator RSS feed to avoid O(projects) queries.
573 /// Capped at 50 items (standard RSS feed size).
574 #[tracing::instrument(skip_all)]
575 pub async fn get_public_items_by_user(pool: &PgPool, user_id: UserId) -> Result<Vec<DbItem>> {
576 let items = sqlx::query_as::<_, DbItem>(
577 r#"
578 SELECT i.* FROM items i
579 JOIN projects p ON i.project_id = p.id
580 WHERE p.user_id = $1 AND p.is_public = true AND i.is_public = true AND i.listed = true
581 ORDER BY i.created_at DESC
582 LIMIT 50
583 "#,
584 )
585 .bind(user_id)
586 .fetch_all(pool)
587 .await?;
588
589 Ok(items)
590 }
591
592 /// Get the owner (user_id) of an item through its project
593 #[tracing::instrument(skip_all)]
594 pub async fn get_item_owner(pool: &PgPool, item_id: ItemId) -> Result<Option<UserId>> {
595 let owner: Option<UserId> = sqlx::query_scalar(
596 r#"
597 SELECT p.user_id
598 FROM items i
599 JOIN projects p ON i.project_id = p.id
600 WHERE i.id = $1
601 "#,
602 )
603 .bind(item_id)
604 .fetch_optional(pool)
605 .await?;
606
607 Ok(owner)
608 }
609
610 /// Pre-flight access check for content streaming/download. Returns item data
611 /// plus ownership, purchase, subscription, and bundle access.
612 /// Returns `None` if the item does not exist (or is soft-deleted).
613 #[derive(Debug)]
614 pub struct ItemAccessCheck {
615 // Ownership
616 pub owner_id: UserId,
617 // Access flags (only meaningful when a user_id is provided)
618 pub has_purchased: bool,
619 /// Subscription access as a witness, not a bool: `Some` only when the sealed
620 /// gate confirmed an active, in-period subscription. Feed it straight into
621 /// `pricing::AccessContext::subscription`.
622 pub subscription: Option<crate::db::subscriptions::SubscriptionGate>,
623 pub has_bundle_access: bool,
624 }
625
626 /// Ownership + the access flags that are pure transaction/bundle lookups.
627 /// `has_subscription` is deliberately NOT here: it must come from
628 /// [`crate::db::subscriptions::has_access`], the single sealed access gate, so
629 /// the grant predicate (incl. the `current_period_end` clause) cannot be
630 /// hand-written and drift. (Payments S1 / CHRONIC 2: this query previously
631 /// inlined the predicate and dropped the period clause on the download path.)
632 #[derive(Debug, sqlx::FromRow)]
633 struct ItemAccessPartial {
634 owner_id: UserId,
635 has_purchased: bool,
636 has_bundle_access: bool,
637 }
638
639 #[tracing::instrument(skip_all)]
640 pub async fn check_item_access(
641 pool: &PgPool,
642 item_id: ItemId,
643 user_id: Option<UserId>,
644 ) -> Result<Option<ItemAccessCheck>> {
645 // When no user is provided, access flags are all false
646 let uid = user_id.unwrap_or(UserId::nil());
647 let partial = sqlx::query_as::<_, ItemAccessPartial>(
648 r#"
649 SELECT
650 p.user_id AS owner_id,
651 EXISTS(
652 SELECT 1 FROM transactions t
653 WHERE t.item_id = i.id AND t.buyer_id = $2 AND t.status = 'completed'
654 ) AS has_purchased,
655 EXISTS(
656 SELECT 1 FROM bundle_items bi
657 JOIN transactions bt ON bt.item_id = bi.bundle_id
658 WHERE bi.item_id = i.id AND bt.buyer_id = $2 AND bt.status = 'completed'
659 ) AS has_bundle_access
660 FROM items i
661 JOIN projects p ON i.project_id = p.id
662 WHERE i.id = $1 AND i.deleted_at IS NULL
663 "#,
664 )
665 .bind(item_id)
666 .bind(uid)
667 .fetch_optional(pool)
668 .await?;
669
670 let Some(partial) = partial else {
671 return Ok(None);
672 };
673
674 // Subscription access goes through the one sealed gate, which returns a
675 // witness. Anonymous callers (no user_id) can't hold a subscription, so skip
676 // the query entirely.
677 let subscription = match user_id {
678 Some(real_uid) => {
679 crate::db::subscriptions::SubscriptionGate::check(
680 pool,
681 real_uid,
682 crate::db::subscriptions::SubscriptionScope::Item(item_id),
683 )
684 .await?
685 }
686 None => None,
687 };
688
689 Ok(Some(ItemAccessCheck {
690 owner_id: partial.owner_id,
691 has_purchased: partial.has_purchased,
692 subscription,
693 has_bundle_access: partial.has_bundle_access,
694 }))
695 }
696
697 /// Update the text body content for an item (for articles/essays).
698 ///
699 /// Recomputes `word_count` and `reading_time_minutes` on every save.
700 /// Reading time uses ~200 wpm (average adult reading speed) with floor
701 /// division, clamped to a minimum of 1 minute so the UI never shows "0 min".
702 #[tracing::instrument(skip_all)]
703 pub async fn update_item_text(pool: &PgPool, id: ItemId, user_id: UserId, body: &str) -> Result<DbItem> {
704 let word_count = body.split_whitespace().count() as i32;
705 let reading_time = (word_count / 200).max(1);
706
707 let item = sqlx::query_as::<_, DbItem>(
708 r#"
709 UPDATE items
710 SET body = $3, word_count = $4, reading_time_minutes = $5, updated_at = NOW()
711 WHERE id = $1
712 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
713 RETURNING *
714 "#,
715 )
716 .bind(id)
717 .bind(user_id)
718 .bind(body)
719 .bind(word_count)
720 .bind(reading_time)
721 .fetch_one(pool)
722 .await?;
723
724 Ok(item)
725 }
726
727 /// Update the license key settings on an item.
728 #[tracing::instrument(skip_all)]
729 pub async fn update_item_license_settings(
730 pool: &PgPool,
731 item_id: ItemId,
732 user_id: UserId,
733 enable_license_keys: bool,
734 default_max_activations: Option<i32>,
735 ) -> Result<()> {
736 sqlx::query(
737 r#"
738 UPDATE items
739 SET enable_license_keys = $3, default_max_activations = $4, updated_at = NOW()
740 WHERE id = $1
741 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
742 "#,
743 )
744 .bind(item_id)
745 .bind(user_id)
746 .bind(enable_license_keys)
747 .bind(default_max_activations)
748 .execute(pool)
749 .await?;
750
751 Ok(())
752 }
753
754 /// Update the license text preset and optional custom text on an item.
755 #[tracing::instrument(skip_all)]
756 pub async fn update_item_license_text(
757 pool: &PgPool,
758 item_id: ItemId,
759 user_id: UserId,
760 license_preset: Option<&str>,
761 custom_license_text: Option<&str>,
762 ) -> Result<()> {
763 sqlx::query(
764 r#"
765 UPDATE items
766 SET license_preset = $3, custom_license_text = $4, updated_at = NOW()
767 WHERE id = $1
768 AND project_id IN (SELECT id FROM projects WHERE user_id = $2)
769 "#,
770 )
771 .bind(item_id)
772 .bind(user_id)
773 .bind(license_preset)
774 .bind(custom_license_text)
775 .execute(pool)
776 .await?;
777
778 Ok(())
779 }
780
781 /// Increment the denormalized sales_count for an item (called on purchase/claim).
782 ///
783 /// Accepts any sqlx executor (`&PgPool`, `&mut Transaction`, etc.) so callers
784 /// can include this in a larger transaction when needed.
785 #[tracing::instrument(skip_all)]
786 pub async fn increment_sales_count<'e>(
787 executor: impl sqlx::PgExecutor<'e>,
788 item_id: ItemId,
789 ) -> Result<()> {
790 sqlx::query("UPDATE items SET sales_count = sales_count + 1 WHERE id = $1")
791 .bind(item_id)
792 .execute(executor)
793 .await?;
794
795 Ok(())
796 }
797
798 /// Decrement the denormalized sales_count for an item (called on refund/unclaim).
799 #[tracing::instrument(skip_all)]
800 pub async fn decrement_sales_count<'e>(
801 executor: impl sqlx::PgExecutor<'e>,
802 item_id: ItemId,
803 ) -> Result<()> {
804 sqlx::query("UPDATE items SET sales_count = GREATEST(sales_count - 1, 0) WHERE id = $1")
805 .bind(item_id)
806 .execute(executor)
807 .await?;
808
809 Ok(())
810 }
811
812 /// Atomically mark an item as having had its release announced.
813 /// Returns false if already announced (prevents duplicate announcements on unpublish/republish).
814 #[tracing::instrument(skip_all)]
815 pub async fn mark_release_announced(pool: &PgPool, item_id: ItemId) -> Result<bool> {
816 let result = sqlx::query(
817 "UPDATE items SET release_announced_at = NOW() WHERE id = $1 AND release_announced_at IS NULL",
818 )
819 .bind(item_id)
820 .execute(pool)
821 .await?;
822
823 Ok(result.rows_affected() > 0)
824 }
825
826 /// Set the linked MT thread ID for an item.
827 #[tracing::instrument(skip_all)]
828 pub async fn set_mt_thread_id(
829 pool: &PgPool,
830 item_id: ItemId,
831 thread_id: MtThreadId,
832 ) -> Result<()> {
833 sqlx::query("UPDATE items SET mt_thread_id = $2 WHERE id = $1")
834 .bind(item_id)
835 .bind(thread_id)
836 .execute(pool)
837 .await?;
838 Ok(())
839 }
840
841 /// Collect all S3 keys for items owned by a user (audio + cover + video).
842 ///
843 /// Returns item title, project slug, and optional S3 keys for audio/cover/video.
844 /// Only includes items that have at least one S3 key.
845 #[tracing::instrument(skip_all)]
846 pub async fn get_user_s3_keys(pool: &PgPool, user_id: UserId) -> Result<Vec<ItemS3KeyRow>> {
847 let rows = sqlx::query_as::<_, ItemS3KeyRow>(
848 r#"
849 SELECT i.title, p.id AS project_id, p.slug AS project_slug,
850 i.audio_s3_key, i.cover_s3_key, i.video_s3_key,
851 i.audio_file_size_bytes, i.cover_file_size_bytes, i.video_file_size_bytes
852 FROM items i JOIN projects p ON i.project_id = p.id
853 WHERE p.user_id = $1 AND (i.audio_s3_key IS NOT NULL OR i.cover_s3_key IS NOT NULL OR i.video_s3_key IS NOT NULL)
854 ORDER BY p.slug, i.sort_order
855 LIMIT 500
856 "#,
857 )
858 .bind(user_id)
859 .fetch_all(pool)
860 .await?;
861
862 Ok(rows)
863 }
864
865 /// Check whether a user has at least one public item across all their projects.
866 #[tracing::instrument(skip_all)]
867 pub async fn has_public_item_by_user(pool: &PgPool, user_id: UserId) -> Result<bool> {
868 let exists: bool = sqlx::query_scalar(
869 r#"
870 SELECT EXISTS(
871 SELECT 1 FROM items i
872 JOIN projects p ON i.project_id = p.id
873 WHERE p.user_id = $1 AND i.is_public = true
874 )
875 "#,
876 )
877 .bind(user_id)
878 .fetch_one(pool)
879 .await?;
880
881 Ok(exists)
882 }
883
884 /// Increment the play count for an item (called on audio/video stream request).
885 #[tracing::instrument(skip_all)]
886 pub async fn increment_play_count(pool: &PgPool, item_id: ItemId) -> Result<()> {
887 sqlx::query("UPDATE items SET play_count = play_count + 1 WHERE id = $1")
888 .bind(item_id)
889 .execute(pool)
890 .await?;
891
892 Ok(())
893 }
894
895 /// Record a unique listener and increment unique_play_count if this is the first
896 /// play by this user. Returns true if new unique listener, false if already counted.
897 /// Runs in a transaction so the user_plays INSERT and count UPDATE are atomic.
898 #[tracing::instrument(skip_all)]
899 pub async fn record_unique_play(pool: &PgPool, user_id: super::UserId, item_id: ItemId) -> Result<bool> {
900 let mut tx = pool.begin().await?;
901
902 let result = sqlx::query(
903 "INSERT INTO user_plays (user_id, item_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
904 )
905 .bind(user_id)
906 .bind(item_id)
907 .execute(&mut *tx)
908 .await?;
909
910 if result.rows_affected() > 0 {
911 sqlx::query("UPDATE items SET unique_play_count = unique_play_count + 1 WHERE id = $1")
912 .bind(item_id)
913 .execute(&mut *tx)
914 .await?;
915 tx.commit().await?;
916 Ok(true)
917 } else {
918 tx.commit().await?;
919 Ok(false)
920 }
921 }
922
923 /// Increment the item-level download count (called alongside per-version increment).
924 #[tracing::instrument(skip_all)]
925 pub async fn increment_item_download_count(pool: &PgPool, item_id: ItemId) -> Result<()> {
926 sqlx::query("UPDATE items SET download_count = download_count + 1 WHERE id = $1")
927 .bind(item_id)
928 .execute(pool)
929 .await?;
930
931 Ok(())
932 }
933
934
935 /// Fetch a public item by project ID and slug (for custom domain routing).
936 #[tracing::instrument(skip_all)]
937 pub async fn get_item_by_project_and_slug(
938 pool: &PgPool,
939 project_id: ProjectId,
940 slug: &str,
941 ) -> Result<Option<DbItem>> {
942 let item = sqlx::query_as::<_, DbItem>(
943 "SELECT * FROM items WHERE project_id = $1 AND slug = $2 AND is_public = true",
944 )
945 .bind(project_id)
946 .bind(slug)
947 .fetch_optional(pool)
948 .await?;
949
950 Ok(item)
951 }
952
953 /// Hide all items for a user (set is_public = false). Used for post-grace enforcement.
954 /// Returns the number of items hidden.
955 #[tracing::instrument(skip_all)]
956 pub async fn hide_all_items_for_user(pool: &PgPool, user_id: UserId) -> Result<u64> {
957 let result = sqlx::query(
958 r#"
959 UPDATE items SET is_public = false
960 WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1)
961 AND is_public = true
962 "#,
963 )
964 .bind(user_id)
965 .execute(pool)
966 .await?;
967
968 Ok(result.rows_affected())
969 }
970
971 /// Unhide all items for a user (set is_public = true). Used when a creator re-subscribes
972 /// after post-grace hiding. Returns the number of items unhidden.
973 #[tracing::instrument(skip_all)]
974 pub async fn unhide_all_items_for_user(pool: &PgPool, user_id: UserId) -> Result<u64> {
975 let result = sqlx::query(
976 r#"
977 UPDATE items SET is_public = true
978 WHERE project_id IN (SELECT id FROM projects WHERE user_id = $1)
979 AND is_public = false
980 AND removed_at IS NULL
981 "#,
982 )
983 .bind(user_id)
984 .execute(pool)
985 .await?;
986
987 Ok(result.rows_affected())
988 }
989
990 /// Admin: remove an item (hide from public, record reason). The item stays in the DB
991 /// and the creator can see it in their dashboard with the removal reason.
992 #[tracing::instrument(skip_all)]
993 pub async fn admin_remove_item(
994 pool: &PgPool,
995 item_id: ItemId,
996 reason: &str,
997 ) -> Result<DbItem> {
998 let item = sqlx::query_as::<_, DbItem>(
999 r#"
1000 UPDATE items
1001 SET removed_by_admin = true,
1002 removal_reason = $2,
1003 removed_at = NOW(),
1004 is_public = false
1005 WHERE id = $1
1006 RETURNING *
1007 "#,
1008 )
1009 .bind(item_id)
1010 .bind(reason)
1011 .fetch_one(pool)
1012 .await?;
1013
1014 Ok(item)
1015 }
1016
1017 /// Admin: restore a previously removed item. Clears the removal fields
1018 /// but does NOT re-publish (creator must publish manually).
1019 #[tracing::instrument(skip_all)]
1020 pub async fn admin_restore_item(
1021 pool: &PgPool,
1022 item_id: ItemId,
1023 ) -> Result<DbItem> {
1024 let item = sqlx::query_as::<_, DbItem>(
1025 r#"
1026 UPDATE items
1027 SET removed_by_admin = false,
1028 removal_reason = NULL,
1029 removed_at = NULL
1030 WHERE id = $1
1031 RETURNING *
1032 "#,
1033 )
1034 .bind(item_id)
1035 .fetch_one(pool)
1036 .await?;
1037
1038 Ok(item)
1039 }
1040
1041 /// Count public, listed items (for landing page stats).
1042 #[tracing::instrument(skip_all)]
1043 pub async fn count_public_listed(pool: &PgPool) -> Result<i64> {
1044 let count = sqlx::query_scalar::<_, i64>(
1045 "SELECT COUNT(*) FROM items WHERE is_public = true AND listed = true",
1046 )
1047 .fetch_one(pool)
1048 .await?;
1049
1050 Ok(count)
1051 }
1052