Skip to main content

max / makenotwork

Run #15: fix 5 SERIOUS findings; seal subscription access gate - Payments S1: route check_item_access through subscriptions::has_access and seal the grant predicate. The "grants access right now" predicate now lives as a private associated const inside a sealed gate module behind a SubscriptionGate witness (private field, no public constructor); the only way to mint one is to run the predicate. Thread Option<SubscriptionGate> into pricing::AccessContext so subscription access cannot be granted at the consumption point without a proof. Drop the bool-batch get_user_subscribed_item_ids in favour of a witness-bearing subscribed_item_gates map. - Security S1: gate private git raw/clone and browse on MaybeUserVerified (revalidates the session against the DB) instead of MaybeUserUnverified, so a revoked owner session loses access immediately. - Storage S1: anchor the is_s3_key_live UrlSuffix LIKE on the path boundary so a queued key can't false-positive against a neighbouring row and suppress a legitimate deletion. - Storage S2: route every single-key orphan delete through one guarded funnel (delete_orphan_key_guarded) so the orphaned-upload reaper can't skip the is_s3_key_live check the retry worker already had. - Performance S1: make the scan spool tempfile path job-unique (include the job UUID) so concurrent/retried scans of the same s3_key can't collide. - test: pin the lapsed-period item-subscription gate (streaming.rs). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-09 06:25 UTC
Commit: 97c0023e6727775f3904b8472e2698f91c4c7adc
Parent: cf0c345
14 files changed, +488 insertions, -182 deletions
@@ -608,18 +608,34 @@ pub async fn get_item_owner(pool: &PgPool, item_id: ItemId) -> Result<Option<Use
608 608 }
609 609
610 610 /// Pre-flight access check for content streaming/download. Returns item data
611 - /// plus ownership, purchase, subscription, and bundle access in a single query.
611 + /// plus ownership, purchase, subscription, and bundle access.
612 612 /// Returns `None` if the item does not exist (or is soft-deleted).
613 - #[derive(Debug, sqlx::FromRow)]
613 + #[derive(Debug)]
614 614 pub struct ItemAccessCheck {
615 615 // Ownership
616 616 pub owner_id: UserId,
617 617 // Access flags (only meaningful when a user_id is provided)
618 618 pub has_purchased: bool,
619 - pub has_subscription: bool,
619 + /// Subscription access as a witness, not a bool: `Some` only when the sealed
620 + /// gate confirmed an active, in-period subscription. Feed it straight into
621 + /// `pricing::AccessContext::subscription`.
622 + pub subscription: Option<crate::db::subscriptions::SubscriptionGate>,
620 623 pub has_bundle_access: bool,
621 624 }
622 625
626 + /// Ownership + the access flags that are pure transaction/bundle lookups.
627 + /// `has_subscription` is deliberately NOT here: it must come from
628 + /// [`crate::db::subscriptions::has_access`], the single sealed access gate, so
629 + /// the grant predicate (incl. the `current_period_end` clause) cannot be
630 + /// hand-written and drift. (Payments S1 / CHRONIC 2: this query previously
631 + /// inlined the predicate and dropped the period clause on the download path.)
632 + #[derive(Debug, sqlx::FromRow)]
633 + struct ItemAccessPartial {
634 + owner_id: UserId,
635 + has_purchased: bool,
636 + has_bundle_access: bool,
637 + }
638 +
623 639 #[tracing::instrument(skip_all)]
624 640 pub async fn check_item_access(
625 641 pool: &PgPool,
@@ -628,7 +644,7 @@ pub async fn check_item_access(
628 644 ) -> Result<Option<ItemAccessCheck>> {
629 645 // When no user is provided, access flags are all false
630 646 let uid = user_id.unwrap_or(UserId::nil());
631 - let row = sqlx::query_as::<_, ItemAccessCheck>(
647 + let partial = sqlx::query_as::<_, ItemAccessPartial>(
632 648 r#"
633 649 SELECT
634 650 p.user_id AS owner_id,
@@ -637,11 +653,6 @@ pub async fn check_item_access(
637 653 WHERE t.item_id = i.id AND t.buyer_id = $2 AND t.status = 'completed'
638 654 ) AS has_purchased,
639 655 EXISTS(
640 - SELECT 1 FROM subscriptions s
641 - WHERE s.subscriber_id = $2 AND s.item_id = i.id
642 - AND s.status = 'active' AND s.paused_at IS NULL
643 - ) AS has_subscription,
644 - EXISTS(
645 656 SELECT 1 FROM bundle_items bi
646 657 JOIN transactions bt ON bt.item_id = bi.bundle_id
647 658 WHERE bi.item_id = i.id AND bt.buyer_id = $2 AND bt.status = 'completed'
@@ -656,7 +667,31 @@ pub async fn check_item_access(
656 667 .fetch_optional(pool)
657 668 .await?;
658 669
659 - Ok(row)
670 + let Some(partial) = partial else {
671 + return Ok(None);
672 + };
673 +
674 + // Subscription access goes through the one sealed gate, which returns a
675 + // witness. Anonymous callers (no user_id) can't hold a subscription, so skip
676 + // the query entirely.
677 + let subscription = match user_id {
678 + Some(real_uid) => {
679 + crate::db::subscriptions::SubscriptionGate::check(
680 + pool,
681 + real_uid,
682 + crate::db::subscriptions::SubscriptionScope::Item(item_id),
683 + )
684 + .await?
685 + }
686 + None => None,
687 + };
688 +
689 + Ok(Some(ItemAccessCheck {
690 + owner_id: partial.owner_id,
691 + has_purchased: partial.has_purchased,
692 + subscription,
693 + has_bundle_access: partial.has_bundle_access,
694 + }))
660 695 }
661 696
662 697 /// Update the text body content for an item (for articles/essays).
@@ -172,7 +172,13 @@ pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result
172 172 // Escape LIKE wildcards so a key with a literal `_`/`%` (from a user
173 173 // folder name) can't false-positive against neighbouring rows.
174 174 let escaped = s3_key.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_");
175 - query = query.bind(format!("%{escaped}"));
175 + // Anchor on the leading path separator. Stored values are always
176 + // `{cdn_base}/{s3_key}`, so the key is preceded by `/` in every live
177 + // URL. Without the anchor, `LIKE '%u/1/cover.png'` also matches an
178 + // unrelated row ending `.../OTHERu/1/cover.png`, falsely marking a dead
179 + // object live and suppressing its deletion (orphan leak). Requiring the
180 + // `/` boundary makes the substring match a true path-suffix match.
181 + query = query.bind(format!("%/{escaped}"));
176 182 }
177 183 Ok(query.fetch_one(pool).await?)
178 184 }
@@ -352,58 +352,143 @@ pub async fn resume_subscriptions_for_creator(
352 352 /// missed/delayed `customer.subscription.deleted` webhook — `status = 'active'`
353 353 /// alone trusts Stripe to push the cancellation promptly.
354 354 ///
355 - /// SINGLE source of truth for "does this subscription grant access right now".
356 - /// Every access gate routes through [`has_access`] / [`get_user_subscribed_item_ids`],
357 - /// which interpolate this constant; do not hand-write the predicate at a call
358 - /// site. Run #14 CHRONIC: the item-level gates had drifted from the
359 - /// project-level gate by omitting the period clause — centralizing it here makes
360 - /// that divergence unwritable. (Compile-time constant, never user input, so the
361 - /// `format!` interpolation is injection-safe; the `$N` placeholders stay bound.)
362 - const GRANTS_ACCESS_PREDICATE: &str = "status = 'active' AND paused_at IS NULL \
363 - AND (current_period_end IS NULL OR current_period_end > NOW())";
364 -
365 - /// What a subscription access check is scoped to. Both arms share
366 - /// [`GRANTS_ACCESS_PREDICATE`] via [`has_access`], so a project gate and an item
367 - /// gate cannot diverge.
355 + /// What a subscription access check is scoped to. Both arms run the SAME sealed
356 + /// predicate inside [`gate`], so a project gate and an item gate cannot diverge.
368 357 #[derive(Debug, Clone, Copy)]
369 358 pub enum SubscriptionScope {
370 359 Project(ProjectId),
371 360 Item(super::ItemId),
372 361 }
373 362
363 + pub use gate::SubscriptionGate;
364 +
365 + /// Sealed home of the "does a subscription grant access right now" predicate.
366 + ///
367 + /// The predicate text lives in exactly ONE place — [`SubscriptionGate`]'s
368 + /// private `PREDICATE` associated const — and is unreachable from the rest of
369 + /// this module, let alone other modules. The only way to learn "this user has
370 + /// access" is [`SubscriptionGate::check`] (or [`SubscriptionGate::accessible_item_ids`]
371 + /// for the batch shape), each of which runs that predicate. A `SubscriptionGate`
372 + /// value is a witness: its field is private and there is no public constructor,
373 + /// so access-granting code can neither fabricate one nor hand-write a divergent
374 + /// gate.
375 + ///
376 + /// Payments S1 / CHRONIC 2: the predicate used to be a shareable `&str` const,
377 + /// and item gates drifted by dropping the `current_period_end` clause. A const
378 + /// is copy-pasteable; a private associated const inside a sealed submodule is
379 + /// not. Sealing it here makes the divergence unwritable, not merely discouraged.
380 + mod gate {
381 + use super::SubscriptionScope;
382 + use crate::db::{ItemId, UserId};
383 + use crate::error::Result;
384 + use sqlx::PgPool;
385 + use std::collections::HashMap;
386 +
387 + /// Proof that a subscription currently grants access. Constructible ONLY via
388 + /// [`SubscriptionGate::check`] — the private `()` field seals the type so no
389 + /// other code can mint one.
390 + #[derive(Debug, Clone, Copy)]
391 + pub struct SubscriptionGate(());
392 +
393 + impl SubscriptionGate {
394 + /// The single source of truth for "grants access right now". Private to
395 + /// this submodule: nothing outside can read it as a string, so it cannot
396 + /// be copy-pasted into a divergent query. (Compile-time constant, never
397 + /// user input, so the `format!` interpolation is injection-safe; the
398 + /// `$N` placeholders stay bound.)
399 + const PREDICATE: &'static str = "status = 'active' AND paused_at IS NULL \
400 + AND (current_period_end IS NULL OR current_period_end > NOW())";
401 +
402 + /// Does `user_id` hold a subscription that currently grants access to
403 + /// `scope`? Returns `Some(gate)` iff so — the sole gate constructor and
404 + /// the single entry point for project- and item-level access checks.
405 + #[tracing::instrument(skip_all)]
406 + pub async fn check(
407 + pool: &PgPool,
408 + user_id: UserId,
409 + scope: SubscriptionScope,
410 + ) -> Result<Option<SubscriptionGate>> {
411 + let exists: bool = match scope {
412 + SubscriptionScope::Project(project_id) => {
413 + sqlx::query_scalar(&format!(
414 + "SELECT EXISTS(SELECT 1 FROM subscriptions \
415 + WHERE subscriber_id = $1 AND project_id = $2 AND {})",
416 + Self::PREDICATE
417 + ))
418 + .bind(user_id)
419 + .bind(project_id)
420 + .fetch_one(pool)
421 + .await?
422 + }
423 + SubscriptionScope::Item(item_id) => {
424 + sqlx::query_scalar(&format!(
425 + "SELECT EXISTS(SELECT 1 FROM subscriptions \
426 + WHERE subscriber_id = $1 AND item_id = $2 AND {})",
427 + Self::PREDICATE
428 + ))
429 + .bind(user_id)
430 + .bind(item_id)
431 + .fetch_one(pool)
432 + .await?
433 + }
434 + };
435 +
436 + Ok(exists.then_some(SubscriptionGate(())))
437 + }
438 +
439 + /// Every item ID `user_id` currently has access to via subscription
440 + /// (batch gate). Runs the same sealed predicate as [`check`], so the
441 + /// batch path cannot drift from the single-item gate.
442 + #[tracing::instrument(skip_all)]
443 + pub async fn accessible_item_ids(pool: &PgPool, user_id: UserId) -> Result<Vec<ItemId>> {
444 + let item_ids: Vec<ItemId> = sqlx::query_scalar(&format!(
445 + "SELECT DISTINCT item_id FROM subscriptions \
446 + WHERE subscriber_id = $1 AND item_id IS NOT NULL AND {}",
447 + Self::PREDICATE
448 + ))
449 + .bind(user_id)
450 + .fetch_all(pool)
451 + .await?;
452 +
453 + Ok(item_ids)
454 + }
455 +
456 + /// Map of every item `user_id` currently has subscription access to →
457 + /// its access proof. The witness-bearing batch shape used by the project
458 + /// page, where each item's [`AccessContext`](crate::pricing::AccessContext)
459 + /// needs its own gate. Runs the sealed predicate once.
460 + #[tracing::instrument(skip_all)]
461 + pub async fn subscribed_item_gates(
462 + pool: &PgPool,
463 + user_id: UserId,
464 + ) -> Result<HashMap<ItemId, SubscriptionGate>> {
465 + let ids = Self::accessible_item_ids(pool, user_id).await?;
466 + Ok(ids.into_iter().map(|id| (id, SubscriptionGate(()))).collect())
467 + }
468 +
469 + /// Test-only constructor. Real gates can only be minted by running the
470 + /// predicate against the DB; unit tests (e.g. `pricing`) need to
471 + /// fabricate the "access granted" state without a database. Gated to
472 + /// test builds so production code still cannot forge a witness.
473 + #[cfg(test)]
474 + pub(crate) fn test_witness() -> Self {
475 + SubscriptionGate(())
476 + }
477 + }
478 + }
479 +
374 480 /// Does `user_id` hold a subscription that currently grants access to `scope`?
375 481 ///
376 - /// The single entry point for project- and item-level subscription access gates.
482 + /// Thin boolean wrapper over the sealed [`SubscriptionGate::check`]; prefer
483 + /// taking the [`SubscriptionGate`] witness directly where a proof of access is
484 + /// useful downstream.
377 485 #[tracing::instrument(skip_all)]
378 486 pub async fn has_access(
379 487 pool: &PgPool,
380 488 user_id: UserId,
381 489 scope: SubscriptionScope,
382 490 ) -> Result<bool> {
383 - let count: i64 = match scope {
384 - SubscriptionScope::Project(project_id) => {
385 - sqlx::query_scalar(&format!(
386 - "SELECT COUNT(*) FROM subscriptions \
387 - WHERE subscriber_id = $1 AND project_id = $2 AND {GRANTS_ACCESS_PREDICATE}"
388 - ))
389 - .bind(user_id)
390 - .bind(project_id)
391 - .fetch_one(pool)
392 - .await?
393 - }
394 - SubscriptionScope::Item(item_id) => {
395 - sqlx::query_scalar(&format!(
396 - "SELECT COUNT(*) FROM subscriptions \
397 - WHERE subscriber_id = $1 AND item_id = $2 AND {GRANTS_ACCESS_PREDICATE}"
398 - ))
399 - .bind(user_id)
400 - .bind(item_id)
401 - .fetch_one(pool)
402 - .await?
403 - }
404 - };
405 -
406 - Ok(count > 0)
491 + Ok(SubscriptionGate::check(pool, user_id, scope).await?.is_some())
407 492 }
408 493
409 494 /// Get user subscriptions joined with project and tier data (for library display).
@@ -451,27 +536,6 @@ pub async fn get_project_subscriber_count(
451 536 Ok(count)
452 537 }
453 538
454 - // ── Item-level subscriptions ──
455 -
456 - /// Get all item IDs that `user_id` currently has access to via subscription
457 - /// (for batch access checks). Shares [`GRANTS_ACCESS_PREDICATE`] with
458 - /// [`has_access`] so the batch path cannot drift from the single-item gate.
459 - #[tracing::instrument(skip_all)]
460 - pub async fn get_user_subscribed_item_ids(
461 - pool: &PgPool,
462 - user_id: UserId,
463 - ) -> Result<Vec<super::ItemId>> {
464 - let item_ids: Vec<super::ItemId> = sqlx::query_scalar(&format!(
465 - "SELECT DISTINCT item_id FROM subscriptions \
466 - WHERE subscriber_id = $1 AND item_id IS NOT NULL AND {GRANTS_ACCESS_PREDICATE}"
467 - ))
468 - .bind(user_id)
469 - .fetch_all(pool)
470 - .await?;
471 -
472 - Ok(item_ids)
473 - }
474 -
475 539 // ── Export ──
476 540
477 541 /// Export all subscribers across a creator's projects.
@@ -130,12 +130,27 @@ mod parse_dollars_tests {
130 130 /// Pre-fetched access state for a user viewing a priced resource.
131 131 ///
132 132 /// Routes populate this from DB lookups, then pass it to `PricingModel::can_access()`.
133 - /// Adding a new payment method means adding a new bool field here.
133 + ///
134 + /// `subscription` holds a [`SubscriptionGate`](db::subscriptions::SubscriptionGate)
135 + /// witness rather than a bool: the only way to set "has an active subscription"
136 + /// is to present a gate, which can only be minted by running the canonical
137 + /// access predicate (see `db::subscriptions::gate`). This makes it impossible to
138 + /// grant subscription access here without having actually checked it — the
139 + /// consumption-point counterpart to the sealed gate.
134 140 #[derive(Debug, Clone, Default)]
135 141 pub struct AccessContext {
136 142 pub is_creator: bool,
137 143 pub has_purchased: bool,
138 - pub has_active_subscription: bool,
144 + pub subscription: Option<db::subscriptions::SubscriptionGate>,
145 + }
146 +
147 + impl AccessContext {
148 + /// True iff a subscription currently grants access — only possible when a
149 + /// real [`SubscriptionGate`](db::subscriptions::SubscriptionGate) proof is
150 + /// present.
151 + pub fn has_active_subscription(&self) -> bool {
152 + self.subscription.is_some()
153 + }
139 154 }
140 155
141 156 /// What kind of checkout flow a pricing model requires.
@@ -233,7 +248,7 @@ impl PricingModel for FixedPricing {
233 248 }
234 249
235 250 fn can_access(&self, ctx: &AccessContext) -> bool {
236 - ctx.is_creator || ctx.has_purchased || ctx.has_active_subscription
251 + ctx.is_creator || ctx.has_purchased || ctx.has_active_subscription()
237 252 }
238 253
239 254 fn price_display(&self) -> String {
@@ -278,7 +293,7 @@ impl PricingModel for PwywPricing {
278 293 }
279 294
280 295 fn can_access(&self, ctx: &AccessContext) -> bool {
281 - ctx.is_creator || ctx.has_purchased || ctx.has_active_subscription
296 + ctx.is_creator || ctx.has_purchased || ctx.has_active_subscription()
282 297 }
283 298
284 299 fn price_display(&self) -> String {
@@ -333,7 +348,7 @@ impl PricingModel for SubscriptionPricing {
333 348 }
334 349
335 350 fn can_access(&self, ctx: &AccessContext) -> bool {
336 - ctx.is_creator || ctx.has_active_subscription
351 + ctx.is_creator || ctx.has_active_subscription()
337 352 }
338 353
339 354 fn price_display(&self) -> String {
@@ -407,13 +422,17 @@ pub async fn build_project_access_context(
407 422 let is_creator = user_id == creator_user_id;
408 423 let has_purchased =
409 424 db::transactions::has_purchased_project(pool, user_id, project_id).await?;
410 - let has_active_subscription =
411 - db::subscriptions::has_access(pool, user_id, db::subscriptions::SubscriptionScope::Project(project_id)).await?;
425 + let subscription = db::subscriptions::SubscriptionGate::check(
426 + pool,
427 + user_id,
428 + db::subscriptions::SubscriptionScope::Project(project_id),
429 + )
430 + .await?;
412 431
413 432 Ok(AccessContext {
414 433 is_creator,
415 434 has_purchased,
416 - has_active_subscription,
435 + subscription,
417 436 })
418 437 }
419 438
@@ -493,7 +512,7 @@ mod tests {
493 512 fn fixed_access_subscribed() {
494 513 let p = FixedPricing { price_cents: 999 };
495 514 assert!(p.can_access(&AccessContext {
496 - has_active_subscription: true,
515 + subscription: Some(crate::db::subscriptions::SubscriptionGate::test_witness()),
497 516 ..Default::default()
498 517 }));
499 518 }
@@ -660,7 +679,7 @@ mod tests {
660 679 #[test]
661 680 fn subscription_access_subscribed() {
662 681 assert!(SubscriptionPricing.can_access(&AccessContext {
663 - has_active_subscription: true,
682 + subscription: Some(crate::db::subscriptions::SubscriptionGate::test_witness()),
664 683 ..Default::default()
665 684 }));
666 685 }
@@ -828,7 +847,7 @@ mod tests {
828 847 // Subscription items don't honor has_purchased (by design)
829 848 assert!(!SubscriptionPricing.can_access(&AccessContext {
830 849 has_purchased: true,
831 - has_active_subscription: false,
850 + subscription: None,
832 851 is_creator: false,
833 852 }));
834 853 }
@@ -878,7 +897,7 @@ mod tests {
878 897 let ctx = AccessContext {
879 898 is_creator: false,
880 899 has_purchased: false,
881 - has_active_subscription: false,
900 + subscription: None,
882 901 };
883 902 // Only FreePricing should grant access with no flags
884 903 assert!(FreePricing.can_access(&ctx));
@@ -892,7 +911,7 @@ mod tests {
892 911 let ctx = AccessContext {
893 912 is_creator: true,
894 913 has_purchased: true,
895 - has_active_subscription: true,
914 + subscription: Some(crate::db::subscriptions::SubscriptionGate::test_witness()),
896 915 };
897 916 // All pricing models should grant access with all flags
898 917 assert!(FreePricing.can_access(&ctx));
@@ -1039,7 +1058,7 @@ mod tests {
1039 1058 min_cents: Some(500),
1040 1059 };
1041 1060 assert!(p.can_access(&AccessContext {
1042 - has_active_subscription: true,
1061 + subscription: Some(crate::db::subscriptions::SubscriptionGate::test_witness()),
1043 1062 ..Default::default()
1044 1063 }));
1045 1064 }
@@ -1053,7 +1072,7 @@ mod tests {
1053 1072 has_purchased in proptest::bool::ANY,
1054 1073 has_active_subscription in proptest::bool::ANY,
1055 1074 ) {
1056 - let ctx = AccessContext { is_creator, has_purchased, has_active_subscription };
1075 + let ctx = AccessContext { is_creator, has_purchased, subscription: has_active_subscription.then(crate::db::subscriptions::SubscriptionGate::test_witness) };
1057 1076 proptest::prop_assert!(FreePricing.can_access(&ctx));
1058 1077 proptest::prop_assert_eq!(FreePricing.price_cents(), 0);
1059 1078 }
@@ -1064,7 +1083,7 @@ mod tests {
1064 1083 has_purchased in proptest::bool::ANY,
1065 1084 has_active_subscription in proptest::bool::ANY,
1066 1085 ) {
1067 - let ctx = AccessContext { is_creator, has_purchased, has_active_subscription };
1086 + let ctx = AccessContext { is_creator, has_purchased, subscription: has_active_subscription.then(crate::db::subscriptions::SubscriptionGate::test_witness) };
1068 1087 let p = FixedPricing { price_cents: 999 };
1069 1088 let expected = is_creator || has_purchased || has_active_subscription;
1070 1089 proptest::prop_assert_eq!(p.can_access(&ctx), expected);
@@ -1076,7 +1095,7 @@ mod tests {
1076 1095 has_purchased in proptest::bool::ANY,
1077 1096 has_active_subscription in proptest::bool::ANY,
1078 1097 ) {
1079 - let ctx = AccessContext { is_creator, has_purchased, has_active_subscription };
1098 + let ctx = AccessContext { is_creator, has_purchased, subscription: has_active_subscription.then(crate::db::subscriptions::SubscriptionGate::test_witness) };
1080 1099 let p = PwywPricing { min_cents: Some(500) };
1081 1100 let expected = is_creator || has_purchased || has_active_subscription;
1082 1101 proptest::prop_assert_eq!(p.can_access(&ctx), expected);
@@ -1088,7 +1107,7 @@ mod tests {
1088 1107 has_purchased in proptest::bool::ANY,
1089 1108 has_active_subscription in proptest::bool::ANY,
1090 1109 ) {
1091 - let ctx = AccessContext { is_creator, has_purchased, has_active_subscription };
1110 + let ctx = AccessContext { is_creator, has_purchased, subscription: has_active_subscription.then(crate::db::subscriptions::SubscriptionGate::test_witness) };
1092 1111 // Subscription only honors is_creator and has_active_subscription
1093 1112 let expected = is_creator || has_active_subscription;
1094 1113 proptest::prop_assert_eq!(SubscriptionPricing.can_access(&ctx), expected);
@@ -9,7 +9,7 @@ use serde::Deserialize;
9 9 use tower_sessions::Session;
10 10
11 11 use crate::{
12 - auth::MaybeUserUnverified,
12 + auth::MaybeUserVerified,
13 13 constants,
14 14 db,
15 15 error::{AppError, Result},
@@ -28,7 +28,7 @@ use super::{
28 28 pub(super) async fn repo_overview(
29 29 State(state): State<AppState>,
30 30 session: Session,
31 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
31 + MaybeUserVerified(maybe_user): MaybeUserVerified,
32 32 Path((owner, repo_name)): Path<(String, String)>,
33 33 ) -> Result<impl IntoResponse> {
34 34 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -79,7 +79,7 @@ pub(super) async fn repo_overview(
79 79 pub(super) async fn tree_root(
80 80 State(state): State<AppState>,
81 81 session: Session,
82 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
82 + MaybeUserVerified(maybe_user): MaybeUserVerified,
83 83 Path((owner, repo_name, git_ref)): Path<(String, String, String)>,
84 84 ) -> Result<impl IntoResponse> {
85 85 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -129,7 +129,7 @@ pub(super) async fn tree_root(
129 129 pub(super) async fn tree_or_file(
130 130 State(state): State<AppState>,
131 131 session: Session,
132 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
132 + MaybeUserVerified(maybe_user): MaybeUserVerified,
133 133 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
134 134 ) -> Result<Response> {
135 135 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -245,7 +245,7 @@ pub(super) struct CommitQuery {
245 245 pub(super) async fn commit_log(
246 246 State(state): State<AppState>,
247 247 session: Session,
248 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
248 + MaybeUserVerified(maybe_user): MaybeUserVerified,
249 249 Path((owner, repo_name, git_ref)): Path<(String, String, String)>,
250 250 Query(query): Query<CommitQuery>,
251 251 ) -> Result<impl IntoResponse> {
@@ -291,7 +291,7 @@ pub(super) async fn commit_log(
291 291 pub(super) async fn commit_detail_page(
292 292 State(state): State<AppState>,
293 293 session: Session,
294 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
294 + MaybeUserVerified(maybe_user): MaybeUserVerified,
295 295 Path((owner, repo_name, oid_str)): Path<(String, String, String)>,
296 296 ) -> Result<impl IntoResponse> {
297 297 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -343,7 +343,7 @@ pub(super) async fn commit_detail_page(
343 343 pub(super) async fn blame_view(
344 344 State(state): State<AppState>,
345 345 session: Session,
346 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
346 + MaybeUserVerified(maybe_user): MaybeUserVerified,
347 347 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
348 348 ) -> Result<impl IntoResponse> {
349 349 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -385,7 +385,7 @@ pub(super) async fn blame_view(
385 385 pub(super) async fn user_repos(
386 386 State(state): State<AppState>,
387 387 session: Session,
388 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
388 + MaybeUserVerified(maybe_user): MaybeUserVerified,
389 389 Path(owner): Path<String>,
390 390 ) -> Result<impl IntoResponse> {
391 391 let username = db::Username::new(&owner).map_err(|_| AppError::NotFound)?;
@@ -423,7 +423,7 @@ pub(super) struct ExploreQuery {
423 423 pub(super) async fn git_landing(
424 424 State(state): State<AppState>,
425 425 session: Session,
426 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
426 + MaybeUserVerified(maybe_user): MaybeUserVerified,
427 427 Query(query): Query<ExploreQuery>,
428 428 ) -> Result<impl IntoResponse> {
429 429 let page = query.page.unwrap_or(1).clamp(1, 10_000);
@@ -452,7 +452,7 @@ pub(super) async fn git_landing(
452 452 pub(super) async fn file_log(
453 453 State(state): State<AppState>,
454 454 session: Session,
455 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
455 + MaybeUserVerified(maybe_user): MaybeUserVerified,
456 456 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
457 457 Query(query): Query<CommitQuery>,
458 458 ) -> Result<impl IntoResponse> {
@@ -9,7 +9,7 @@ use axum::{
9 9 use serde::Deserialize;
10 10
11 11 use crate::{
12 - auth::MaybeUserUnverified,
12 + auth::MaybeUserVerified,
13 13 constants,
14 14 error::{AppError, Result, ResultExt},
15 15 git,
@@ -22,7 +22,7 @@ use super::{repos_root, resolve_repo, resolve_repo_name};
22 22 #[tracing::instrument(skip_all, name = "git::raw_file")]
23 23 pub(super) async fn raw_file(
24 24 State(state): State<AppState>,
25 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
25 + MaybeUserVerified(maybe_user): MaybeUserVerified,
26 26 Path((owner, repo_name, git_ref, path)): Path<(String, String, String, String)>,
27 27 ) -> Result<Response> {
28 28 let resolved = resolve_repo(&state, &owner, &repo_name, maybe_user.as_ref().map(|u| u.id)).await?;
@@ -94,7 +94,7 @@ pub(super) struct InfoRefsQuery {
94 94 #[tracing::instrument(skip_all, name = "git::smart_http_info_refs")]
95 95 pub(super) async fn smart_http_info_refs(
96 96 State(state): State<AppState>,
97 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
97 + MaybeUserVerified(maybe_user): MaybeUserVerified,
98 98 Path((owner, repo_name)): Path<(String, String)>,
99 99 Query(query): Query<InfoRefsQuery>,
100 100 ) -> Result<Response> {
@@ -147,7 +147,7 @@ pub(super) async fn smart_http_info_refs(
147 147 #[tracing::instrument(skip_all, name = "git::smart_http_upload_pack")]
148 148 pub(super) async fn smart_http_upload_pack(
149 149 State(state): State<AppState>,
150 - MaybeUserUnverified(maybe_user): MaybeUserUnverified,
150 + MaybeUserVerified(maybe_user): MaybeUserVerified,
151 151 Path((owner, repo_name)): Path<(String, String)>,
152 152 body: axum::body::Bytes,
153 153 ) -> Result<Response> {
@@ -86,15 +86,15 @@ pub(crate) async fn render_item_page(
86 86 } else {
87 87 false
88 88 };
89 - let has_item_sub = if let Some(ref user) = maybe_user {
90 - db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
89 + let item_sub = if let Some(ref user) = maybe_user {
90 + db::subscriptions::SubscriptionGate::check(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
91 91 } else {
92 - false
92 + None
93 93 };
94 94 let ctx = pricing::AccessContext {
95 95 is_creator: is_owner,
96 96 has_purchased: in_library,
97 - has_active_subscription: has_item_sub,
97 + subscription: item_sub,
98 98 };
99 99 let mut has_access = item_pricing.can_access(&ctx);
100 100 let is_free = item_pricing.is_free();
@@ -448,8 +448,15 @@ pub(super) async fn build_segments_json(
448 448 Ok(j) => j,
449 449 Err(_) => return "null".to_string(),
450 450 };
451 - // Replace </ with <\/ to prevent </script> injection when embedded in a
452 - // <script> tag via Askama's |safe filter.
451 + escape_json_for_script_tag(json)
452 + }
453 +
454 + /// Neutralize `</` so a JSON value can't break out of the surrounding
455 + /// `<script>` block (serde_json does not escape `<`, `>`, or `/` by default, and
456 + /// the result is emitted through Askama's `|safe` filter). Load-bearing: a
457 + /// creator-controlled insertion title containing `</script>` would otherwise
458 + /// inject markup. Pinned by `script_tag_breakout_is_neutralized`.
459 + fn escape_json_for_script_tag(json: String) -> String {
453 460 json.replace("</", "<\\/")
454 461 }
455 462
@@ -481,7 +488,18 @@ fn make_excerpt(body: &str, max_chars: usize) -> String {
481 488
482 489 #[cfg(test)]
483 490 mod tests {
484 - use super::make_excerpt;
491 + use super::{escape_json_for_script_tag, make_excerpt};
492 +
493 + #[test]
494 + fn script_tag_breakout_is_neutralized() {
495 + // A serialized value carrying `</script>` must not survive verbatim into
496 + // the <script type="application/json"> block.
497 + let raw = serde_json::to_string("</script><script>alert(1)</script>").unwrap();
498 + let escaped = escape_json_for_script_tag(raw);
499 + assert!(!escaped.contains("</script>"), "literal </script> leaked: {escaped}");
500 + assert!(!escaped.contains("</"), "no unescaped </ should remain: {escaped}");
501 + assert!(escaped.contains("<\\/script>"), "expected escaped form: {escaped}");
502 + }
485 503
486 504 #[test]
487 505 fn excerpt_short_passes_through() {
@@ -63,15 +63,15 @@ pub(in crate::routes::pages::public) async fn library_page(
63 63 } else {
64 64 false
65 65 };
66 - let has_item_sub = if let Some(ref user) = maybe_user {
67 - db::subscriptions::has_access(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
66 + let item_sub = if let Some(ref user) = maybe_user {
67 + db::subscriptions::SubscriptionGate::check(&state.db, user.id, db::subscriptions::SubscriptionScope::Item(db_item.id)).await?
68 68 } else {
69 - false
69 + None
70 70 };
71 71 let ctx = pricing::AccessContext {
72 72 is_creator: is_owner,
73 73 has_purchased: in_library,
74 - has_active_subscription: has_item_sub,
74 + subscription: item_sub,
75 75 };
76 76 let mut has_access = item_pricing.can_access(&ctx);
77 77 if !has_access
@@ -75,7 +75,7 @@ pub(crate) async fn render_project_page(
75 75 viewer_user_id = ?maybe_user.as_ref().map(|u| u.id),
76 76 is_creator = project_ctx.is_creator,
77 77 has_purchased = project_ctx.has_purchased,
78 - has_active_subscription = project_ctx.has_active_subscription,
78 + has_active_subscription = project_ctx.has_active_subscription(),
79 79 pricing_kind = ?project_pricing.kind(),
80 80 "project paywall gate: showing paywall"
81 81 );
@@ -115,14 +115,12 @@ pub(crate) async fn render_project_page(
115 115 std::collections::HashSet::new()
116 116 };
117 117
118 - let subscribed_item_ids: std::collections::HashSet<ItemId> = if let Some(ref user) = maybe_user
119 - {
120 - db::subscriptions::get_user_subscribed_item_ids(&state.db, user.id)
121 - .await?
122 - .into_iter()
123 - .collect()
118 + // Per-item subscription proofs for this user, so each item's AccessContext
119 + // gets its own gate witness rather than a bare membership bool.
120 + let subscribed_gates = if let Some(ref user) = maybe_user {
121 + db::subscriptions::SubscriptionGate::subscribed_item_gates(&state.db, user.id).await?
124 122 } else {
125 - std::collections::HashSet::new()
123 + std::collections::HashMap::new()
126 124 };
127 125
128 126 let has_subscription = if let Some(ref user) = maybe_user {
@@ -156,7 +154,7 @@ pub(crate) async fn render_project_page(
156 154 let ctx = pricing::AccessContext {
157 155 is_creator,
158 156 has_purchased: purchased_item_ids.contains(&i.id),
159 - has_active_subscription: subscribed_item_ids.contains(&i.id),
157 + subscription: subscribed_gates.get(&i.id).copied(),
160 158 };
161 159 let can_access = item_pricing.can_access(&ctx);
162 160 let is_free = item_pricing.is_free();
@@ -109,7 +109,7 @@ pub(super) async fn stream_url(
109 109 let ctx = pricing::AccessContext {
110 110 is_creator: false,
111 111 has_purchased: access.has_purchased,
112 - has_active_subscription: access.has_subscription,
112 + subscription: access.subscription,
113 113 };
114 114 if !item_pricing.can_access(&ctx) && !access.has_bundle_access {
115 115 return Err(AppError::Forbidden);
@@ -203,7 +203,7 @@ pub(super) async fn version_download(
203 203 let ctx = pricing::AccessContext {
204 204 is_creator: false,
205 205 has_purchased: access.has_purchased,
206 - has_active_subscription: access.has_subscription,
206 + subscription: access.subscription,
207 207 };
208 208 if !item_pricing.can_access(&ctx) && !access.has_bundle_access {
209 209 return Err(AppError::Forbidden);
@@ -89,6 +89,7 @@ pub fn check_free_space(spool_dir: &Path, expected_size: u64) -> Result<(), Stri
89 89 /// presign already bounded the object size).
90 90 pub async fn download_into_tempfile(
91 91 spool_dir: &Path,
92 + unique: &str,
92 93 s3_key: &str,
93 94 expected_size: u64,
94 95 mut stream: ByteStream,
@@ -99,8 +100,14 @@ pub async fn download_into_tempfile(
99 100 .await
100 101 .map_err(|e| format!("create spool dir {}: {e}", spool_dir.display()))?;
101 102
103 + // `unique` (the scan job's UUID) makes the path collision-free even when two
104 + // workers scan the same s3_key concurrently (an item and its version, or a
105 + // re-queued job). Keying solely on pid+s3_key let the second `create_new`
106 + // fail `AlreadyExists` — failing a legitimately-running job — and let one
107 + // scan's `SpoolHandle::drop` unlink the other's live file. The s3_key suffix
108 + // is retained only for human-readable debugging.
102 109 let suffix = s3_key.replace('/', "_");
103 - let path = spool_dir.join(format!("scan-{}-{}.tmp", std::process::id(), suffix));
110 + let path = spool_dir.join(format!("scan-{}-{}-{}.tmp", std::process::id(), unique, suffix));
104 111
105 112 let mut file = OpenOptions::new()
106 113 .create_new(true)
@@ -195,6 +195,7 @@ async fn run_pipeline_and_decide(
195 195 let stream = ctx.s3.download_stream(&job.s3_key).await?;
196 196 let spool = super::spool::download_into_tempfile(
197 197 std::path::Path::new(constants::SCAN_SPOOL_DIR),
198 + &job.id.to_string(),
198 199 &job.s3_key,
199 200 job.file_size_bytes as u64,
200 201 stream,
@@ -222,23 +223,61 @@ async fn run_pipeline_and_decide(
222 223 }
223 224
224 225 // CDN-served image kinds (project/gallery/content-insertion) have no
225 - // scan_status column and no app-proxied download route to gate on, so
226 - // they would keep serving from cdn.makenot.work until an admin acted on
227 - // the ticket above. Purge the object from storage now — that is the only
228 - // enforcement point for these kinds. The verdict + failed layers are
229 - // already persisted in file_scan_results for admin review, and the WAM
230 - // ticket retains the s3_key for context.
226 + // scan_status column and no app-proxied download route to gate on, so a
227 + // verdict has to be enforced by removing the content, not by flipping a
228 + // status. Two-step enforcement:
229 + //
230 + // 1. Delete the DB image row. This stops the app from ever rendering
231 + // the (Cloudflare-served) URL again AND makes the s3_key non-live,
232 + // so the durable-deletion queue below won't park it behind the
233 + // `is_s3_key_live` guard.
234 + // 2. Delete the S3 object; on failure, enqueue it for durable retry
235 + // (the scheduler worker retries to completion and dead-letters on
236 + // exhaustion) rather than a single best-effort attempt that could
237 + // leave the origin object behind forever.
238 + //
239 + // Residual exposure: Cloudflare caches these objects immutably for a
240 + // year, so an already-edge-cached copy survives origin deletion until
241 + // the cache TTL lapses. Purging the edge cache needs the Cloudflare API
242 + // (token + zone), which the app doesn't currently hold — the WAM ticket
243 + // above is the manual trigger for that. The verdict + failed layers stay
244 + // in file_scan_results for admin review.
231 245 if kind.is_cdn_served_without_gate() {
232 - match ctx.s3.delete_object(&job.s3_key).await {
233 - Ok(()) => tracing::warn!(
234 - s3_key = %job.s3_key, target_kind = %kind.as_str(),
235 - "purged quarantined CDN-served image from storage (no serving gate exists for this kind)"
246 + match db::scanning::purge_cdn_image_rows_by_key(&ctx.db, &job.s3_key).await {
247 + Ok(n) => tracing::warn!(
248 + s3_key = %job.s3_key, target_kind = %kind.as_str(), rows_removed = n,
249 + "removed quarantined CDN-served image row(s); URL is no longer rendered"
236 250 ),
237 251 Err(e) => tracing::error!(
238 252 s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e,
239 - "FAILED to purge quarantined CDN-served image; it may still be served from the CDN until manual removal"
253 + "FAILED to remove quarantined image row(s); the URL may still render until manual removal"
240 254 ),
241 255 }
256 +
257 + match ctx.s3.delete_object(&job.s3_key).await {
258 + Ok(()) => tracing::warn!(
259 + s3_key = %job.s3_key, target_kind = %kind.as_str(),
260 + "purged quarantined CDN-served image object from storage"
261 + ),
262 + Err(e) => {
263 + tracing::error!(
264 + s3_key = %job.s3_key, target_kind = %kind.as_str(), error = %e,
265 + "immediate purge of quarantined image object failed; enqueuing for durable retry"
266 + );
267 + if let Err(enqueue_err) = db::pending_s3_deletions::enqueue_deletions(
268 + &ctx.db,
269 + &[(job.s3_key.clone(), "main".to_string())],
270 + "malware_quarantine",
271 + )
272 + .await
273 + {
274 + tracing::error!(
275 + s3_key = %job.s3_key, error = %enqueue_err,
276 + "FAILED to enqueue quarantined image object for durable deletion; object may persist at origin until manual removal"
277 + );
278 + }
279 + }
280 + }
242 281 }
243 282 return Ok(FileScanStatus::Quarantined);
244 283 }
@@ -382,6 +382,52 @@ pub(super) async fn purge_expired_deleted_items(state: &AppState) {
382 382 }
383 383 }
384 384
385 + /// Outcome of a guarded single-key orphan S3 delete.
386 + enum GuardedDelete {
387 + /// Object deleted (S3 returned Ok).
388 + Deleted,
389 + /// Skipped: a live DB row still references the key (delete-then-reupload
390 + /// race). The caller should clear its queue/record without deleting.
391 + SkippedLive,
392 + /// S3 delete failed (already logged). The caller decides retry vs. clear.
393 + Failed,
394 + }
395 +
396 + /// The single funnel for deleting one non-prefix orphan S3 object.
397 + ///
398 + /// Performs the `is_s3_key_live` check FIRST, so an object that a fresh upload
399 + /// reclaimed under the same key is never torpedoed. Both the pending-deletions
400 + /// retry worker and the orphaned-upload reaper route every single-key delete
401 + /// through here — neither can delete a key without the live-check. (Storage S2
402 + /// / CHRONIC 2: the worker had this guard inline; its sibling reaper did not,
403 + /// and could delete a now-live deterministic-key object.)
404 + async fn delete_orphan_key_guarded(
405 + db: &sqlx::PgPool,
406 + s3: &dyn crate::storage::StorageBackend,
407 + bucket: &str,
408 + s3_key: &str,
409 + ) -> GuardedDelete {
410 + match db::pending_s3_deletions::is_s3_key_live(db, bucket, s3_key).await {
411 + Ok(true) => {
412 + tracing::info!(s3_key = %s3_key, bucket = %bucket,
413 + "orphan S3 delete skipped — key reclaimed by a live row (delete-then-reupload)");
414 + return GuardedDelete::SkippedLive;
415 + }
416 + Ok(false) => {}
417 + Err(e) => {
418 + tracing::warn!(s3_key = %s3_key, error = ?e,
419 + "live-key check failed; proceeding with delete attempt");
420 + }
421 + }
422 + match s3.delete_object(s3_key).await {
423 + Ok(()) => GuardedDelete::Deleted,
424 + Err(e) => {
425 + tracing::warn!(s3_key = %s3_key, bucket = %bucket, error = ?e, "orphan S3 delete failed");
426 + GuardedDelete::Failed
427 + }
428 + }
429 + }
430 +
385 431 /// Delete S3 objects from presigned uploads that were never confirmed (>24h old).
386 432 pub(super) async fn cleanup_orphaned_uploads(state: &AppState) {
387 433 let stale = match db::pending_uploads::get_stale_pending_uploads(
@@ -410,16 +456,18 @@ pub(super) async fn cleanup_orphaned_uploads(state: &AppState) {
410 456 _ => state.s3.as_ref(),
411 457 };
412 458 if let Some(s3) = s3_client {
413 - match s3.delete_object(s3_key).await {
414 - Ok(()) => {
459 + // Route through the guarded funnel: if a confirm already reclaimed
460 + // this deterministic key, the live row owns the object and we must
461 + // NOT delete it — just clear the stale pending_uploads record.
462 + match delete_orphan_key_guarded(&state.db, s3.as_ref(), bucket, s3_key).await {
463 + GuardedDelete::Deleted => {
415 464 cleaned += 1;
416 465 keys_to_delete.push(s3_key.clone());
417 466 }
418 - Err(e) => {
419 - tracing::warn!(s3_key = %s3_key, bucket = %bucket, error = ?e, "failed to delete orphaned S3 object");
420 - // Still remove the DB record so we don't retry forever
421 - keys_to_delete.push(s3_key.clone());
422 - }
467 + // Live row owns the key now: clear the stale record, keep object.
468 + GuardedDelete::SkippedLive => keys_to_delete.push(s3_key.clone()),
469 + // Delete failed: still clear the record so we don't retry forever.
470 + GuardedDelete::Failed => keys_to_delete.push(s3_key.clone()),
423 471 }
424 472 } else {
425 473 // S3 not configured for this bucket; remove the DB record anyway
@@ -492,49 +540,36 @@ pub(super) async fn retry_pending_s3_deletions(state: &AppState) {
492 540 };
493 541
494 542 if let Some(s3) = s3 {
495 - // Delete-then-reupload race: a fresh upload may have reclaimed this
496 - // exact s3_key after the deletion was enqueued. Skip and dequeue if
497 - // any live row still references the key. Prefix deletes (user-scoped
498 - // cascade cleanups) bypass this per-key check — see the guard below.
499 - if !row.s3_key.ends_with('/') {
500 - match db::pending_s3_deletions::is_s3_key_live(&state.db, &row.bucket, &row.s3_key).await {
501 - Ok(true) => {
502 - tracing::info!(s3_key = %row.s3_key, bucket = %row.bucket,
503 - "S3 deletion skipped — key reclaimed by a live row (delete-then-reupload)");
504 - completed_ids.push(row.id);
505 - continue;
506 - }
507 - Ok(false) => {}
543 + if row.s3_key.ends_with('/') {
544 + // Prefix delete (user-scoped cascade cleanup). Bypasses the
545 + // per-key live-check by design, but carries its own guard: a bare
546 + // `{user_id}/` prefix wipes a creator's ENTIRE storage and is only
547 + // ever enqueued by account cleanup (which CASCADE-deletes the user
548 + // row in the same pass). A STILL-LIVE user here means something
549 + // wrongly enqueued a prefix delete against a live account — refuse
550 + // and let the row climb toward dead-letter triage rather than
551 + // silently nuking live files.
552 + if row.bucket == "main"
553 + && let Some(uid) = row.s3_key.strip_suffix('/').and_then(|s| s.parse::<db::UserId>().ok())
554 + && matches!(db::users::get_user_by_id(&state.db, uid).await, Ok(Some(_)))
555 + {
556 + tracing::error!(s3_key = %row.s3_key, user_id = %uid,
557 + "refusing user-prefix S3 delete: user still exists — parking for dead-letter triage instead of wiping live storage");
558 + continue;
559 + }
560 + match s3.delete_prefix(&row.s3_key).await {
561 + Ok(()) => completed_ids.push(row.id),
508 562 Err(e) => {
509 - tracing::warn!(s3_key = %row.s3_key, error = ?e,
510 - "live-key check failed; proceeding with delete attempt");
563 + tracing::warn!(s3_key = %row.s3_key, error = ?e, "retry S3 prefix deletion failed");
511 564 }
512 565 }
513 - } else if row.bucket == "main"
514 - && let Some(uid) = row.s3_key.strip_suffix('/').and_then(|s| s.parse::<db::UserId>().ok())
515 - && matches!(db::users::get_user_by_id(&state.db, uid).await, Ok(Some(_)))
516 - {
517 - // A bare `{user_id}/` prefix delete wipes a creator's ENTIRE
518 - // storage. These are only ever enqueued by account cleanup, which
519 - // CASCADE-deletes the user row in the same pass — so a STILL-LIVE
520 - // user here means something wrongly enqueued a prefix delete against
521 - // a live account. Refuse: don't dequeue, let the row climb attempts
522 - // into the dead-letter table for operator triage rather than
523 - // silently nuking a live creator's files.
524 - tracing::error!(s3_key = %row.s3_key, user_id = %uid,
525 - "refusing user-prefix S3 delete: user still exists — parking for dead-letter triage instead of wiping live storage");
526 - continue;
527 - }
528 -
529 - let result = if row.s3_key.ends_with('/') {
530 - s3.delete_prefix(&row.s3_key).await
531 566 } else {
532 - s3.delete_object(&row.s3_key).await
533 - };
534 - match result {
535 - Ok(()) => completed_ids.push(row.id),
536 - Err(e) => {
537 - tracing::warn!(s3_key = %row.s3_key, error = ?e, "retry S3 deletion failed");
567 + // Single-key delete — routed through the one guarded funnel so the
568 + // delete-then-reupload live-check can never be skipped. The same
569 + // funnel backs the orphaned-upload reaper.
570 + match delete_orphan_key_guarded(&state.db, s3.as_ref(), &row.bucket, &row.s3_key).await {
571 + GuardedDelete::Deleted | GuardedDelete::SkippedLive => completed_ids.push(row.id),
572 + GuardedDelete::Failed => {} // leave queued; climbs toward dead-letter
538 573 }
539 574 }
540 575 } else {
@@ -44,6 +44,91 @@ async fn setup_audio_item(h: &mut TestHarness, price_cents: i64) -> (String, Str
44 44 (setup.user_id.to_string(), setup.project_id, setup.item_id, s3_key)
45 45 }
46 46
47 + /// Regression pin for Payments S1 / CHRONIC 2: the download/stream access gate
48 + /// MUST honor `current_period_end`. An item subscription still flagged
49 + /// `status = 'active'` (its cancel/expiry webhook lapsed or never arrived) but
50 + /// whose paid period has EXPIRED must not grant access. Before the seal,
51 + /// `check_item_access` hand-wrote `status = 'active' AND paused_at IS NULL` and
52 + /// dropped the period clause, so a lapsed subscriber could still pull the file
53 + /// on the very route that protects content. The gate now routes through
54 + /// `subscriptions::has_access`, which enforces the clause in one sealed place.
55 + /// If anyone re-inlines the predicate without the period clause, the first
56 + /// assertion (lapsed → 403) flips to 200 and this test fails.
57 + #[tokio::test]
58 + async fn stream_denies_item_subscription_with_lapsed_period() {
59 + let mut h = TestHarness::with_storage().await;
60 + // Paid audio item owned by the creator (price > 0 ⇒ access is gated).
61 + let (_creator, _project, item_id, _s3) = setup_audio_item(&mut h, 500).await;
62 + let item_uuid: uuid::Uuid = item_id.parse().unwrap();
63 +
64 + // Item-level subscription tier (project_id NULL, item_id set).
65 + let tier_id: uuid::Uuid = sqlx::query_scalar(
66 + "INSERT INTO subscription_tiers (project_id, item_id, name, price_cents) \
67 + VALUES (NULL, $1, 'Item Tier', 500) RETURNING id",
68 + )
69 + .bind(item_uuid)
70 + .fetch_one(&h.db)
71 + .await
72 + .unwrap();
73 +
74 + // A fan with a known password (seeded directly; we only need to log in).
75 + let hash = makenotwork::auth::hash_password("password123").unwrap();
76 + let fan_id: makenotwork::db::UserId = sqlx::query_scalar(
77 + "INSERT INTO users (username, email, password_hash, email_verified) \
78 + VALUES ('lapsedfan', 'lapsedfan@example.com', $1, true) RETURNING id",
79 + )
80 + .bind(&hash)
81 + .fetch_one(&h.db)
82 + .await
83 + .unwrap();
84 +
85 + // Active-status item subscription whose paid period ended a day ago.
86 + sqlx::query(
87 + "INSERT INTO subscriptions \
88 + (subscriber_id, tier_id, item_id, project_id, stripe_subscription_id, \
89 + stripe_customer_id, status, paused_at, current_period_end) \
90 + VALUES ($1, $2, $3, NULL, 'sub_lapsed', 'cus_lapsed', 'active', NULL, NOW() - INTERVAL '1 day')",
91 + )
92 + .bind(fan_id)
93 + .bind(tier_id)
94 + .bind(item_uuid)
95 + .execute(&h.db)
96 + .await
97 + .unwrap();
98 +
99 + // Act as the fan (the creator is logged in from setup_audio_item).
100 + h.client.post_form("/logout", "").await;
101 + h.login("lapsedfan", "password123").await;
102 +
103 + // Lapsed period → denied on the download gate.
104 + let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
105 + assert_eq!(
106 + resp.status.as_u16(),
107 + 403,
108 + "lapsed-period subscriber must be denied the download gate, got {} {}",
109 + resp.status,
110 + resp.text
111 + );
112 +
113 + // Same subscriber, period pushed into the future → now passes.
114 + sqlx::query(
115 + "UPDATE subscriptions SET current_period_end = NOW() + INTERVAL '30 days' \
116 + WHERE subscriber_id = $1",
117 + )
118 + .bind(fan_id)
119 + .execute(&h.db)
120 + .await
121 + .unwrap();
122 +
123 + let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
124 + assert!(
125 + resp.status.is_success(),
126 + "subscriber within the paid period must pass the gate, got {} {}",
127 + resp.status,
128 + resp.text
129 + );
130 + }
131 +
47 132 #[tokio::test]
48 133 async fn stream_url_response_has_expected_shape() {
49 134 let mut h = TestHarness::with_storage().await;