//! Audio/video streaming: scan-status gating, subscription + bundle access, //! response shape, play counters. //! //! Complements `storage.rs` (which covers the core access matrix: //! free vs paid, anonymous vs authenticated, creator preview, draft //! visibility) and `video.rs` (video-specific happy paths). This file //! fills the remaining gaps the Run 27 audit flagged for the streaming //! feature: //! //! - scan_status variations (Pending, Quarantined, HeldForReview) //! gate on creator identity //! - Subscription-based access on paid items //! - Bundle parent grants access to child items //! - Item missing audio_s3_key / video_s3_key returns 404 //! - Response shape: `stream_url` + `expires_in` fields //! - `play_count` increments on stream //! - `unique_play` tracking for authenticated viewers //! - Version download surfaces `license_url` when item has a license preset use crate::harness::TestHarness; use serde_json::Value; /// Create a creator with a published audio item. Returns /// (user_id, project_id, item_id, s3_key). The InMemoryStorage is /// seeded so `presign_download` returns a usable URL. async fn setup_audio_item(h: &mut TestHarness, price_cents: i64) -> (String, String, String, String) { let setup = h.create_creator_with_item("streamer", "audio", price_cents).await; let s3_key = format!("test/{}/audio/track.mp3", setup.item_id); sqlx::query( "UPDATE items SET audio_s3_key = $1, scan_status = 'clean', is_public = true \ WHERE id = $2::uuid", ) .bind(&s3_key) .bind(&setup.item_id) .execute(&h.db) .await .unwrap(); sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid") .bind(&setup.project_id) .execute(&h.db) .await .unwrap(); h.storage.as_ref().unwrap().put(&s3_key, b"audio data".to_vec()); (setup.user_id.to_string(), setup.project_id, setup.item_id, s3_key) } /// Regression pin for Payments S1 / CHRONIC 2: the download/stream access gate /// MUST honor `current_period_end`. An item subscription still flagged /// `status = 'active'` (its cancel/expiry webhook lapsed or never arrived) but /// whose paid period has EXPIRED must not grant access. Before the seal, /// `check_item_access` hand-wrote `status = 'active' AND paused_at IS NULL` and /// dropped the period clause, so a lapsed subscriber could still pull the file /// on the very route that protects content. The gate now routes through /// `subscriptions::has_access`, which enforces the clause in one sealed place. /// If anyone re-inlines the predicate without the period clause, the first /// assertion (lapsed → 403) flips to 200 and this test fails. #[tokio::test] async fn stream_denies_item_subscription_with_lapsed_period() { let mut h = TestHarness::with_storage().await; // Paid audio item owned by the creator (price > 0 ⇒ access is gated). let (_creator, _project, item_id, _s3) = setup_audio_item(&mut h, 500).await; let item_uuid: uuid::Uuid = item_id.parse().unwrap(); // Item-level subscription tier (project_id NULL, item_id set). let tier_id: uuid::Uuid = sqlx::query_scalar( "INSERT INTO subscription_tiers (project_id, item_id, name, price_cents) \ VALUES (NULL, $1, 'Item Tier', 500) RETURNING id", ) .bind(item_uuid) .fetch_one(&h.db) .await .unwrap(); // A fan with a known password (seeded directly; we only need to log in). let hash = makenotwork::auth::hash_password("password123").unwrap(); let fan_id: makenotwork::db::UserId = sqlx::query_scalar( "INSERT INTO users (username, email, password_hash, email_verified) \ VALUES ('lapsedfan', 'lapsedfan@example.com', $1, true) RETURNING id", ) .bind(&hash) .fetch_one(&h.db) .await .unwrap(); // Active-status item subscription whose paid period ended a day ago. sqlx::query( "INSERT INTO subscriptions \ (subscriber_id, tier_id, item_id, project_id, stripe_subscription_id, \ stripe_customer_id, status, paused_at, current_period_end) \ VALUES ($1, $2, $3, NULL, 'sub_lapsed', 'cus_lapsed', 'active', NULL, NOW() - INTERVAL '1 day')", ) .bind(fan_id) .bind(tier_id) .bind(item_uuid) .execute(&h.db) .await .unwrap(); // Act as the fan (the creator is logged in from setup_audio_item). h.client.post_form("/logout", "").await; h.login("lapsedfan", "password123").await; // Lapsed period → denied on the download gate. let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert_eq!( resp.status.as_u16(), 403, "lapsed-period subscriber must be denied the download gate, got {} {}", resp.status, resp.text ); // Same subscriber, period pushed into the future → now passes. sqlx::query( "UPDATE subscriptions SET current_period_end = NOW() + INTERVAL '30 days' \ WHERE subscriber_id = $1", ) .bind(fan_id) .execute(&h.db) .await .unwrap(); let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!( resp.status.is_success(), "subscriber within the paid period must pass the gate, got {} {}", resp.status, resp.text ); } #[tokio::test] async fn stream_url_response_has_expected_shape() { let mut h = TestHarness::with_storage().await; let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await; let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!(resp.status.is_success(), "{} {}", resp.status, resp.text); let data: Value = resp.json(); assert!(data["stream_url"].is_string(), "Response must contain stream_url"); assert!(data["expires_in"].is_u64(), "Response must contain numeric expires_in"); // Test storage backend returns http://test-storage/. assert!( data["stream_url"].as_str().unwrap().contains("audio/track.mp3"), "URL should reference the seeded key" ); } #[tokio::test] async fn stream_url_404_when_item_has_no_audio_key() { let mut h = TestHarness::with_storage().await; let setup = h.create_creator_with_item("noaudio", "audio", 0).await; // Mark the item public + clean but DON'T set audio_s3_key — the // handler should refuse to mint a streaming URL for a "naked" item. sqlx::query("UPDATE items SET is_public = true, scan_status = 'clean' WHERE id = $1::uuid") .bind(&setup.item_id) .execute(&h.db) .await .unwrap(); sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid") .bind(&setup.project_id) .execute(&h.db) .await .unwrap(); let resp = h.client.get(&format!("/api/stream/{}", setup.item_id)).await; assert_eq!(resp.status.as_u16(), 404, "Item without audio_s3_key must 404"); } #[tokio::test] async fn stream_url_404_for_quarantined_item_to_non_creator() { let mut h = TestHarness::with_storage().await; let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await; sqlx::query("UPDATE items SET scan_status = 'quarantined' WHERE id = $1::uuid") .bind(&item_id) .execute(&h.db) .await .unwrap(); // Log out — the creator is currently authenticated. h.client.post_form("/logout", "").await; let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert_eq!( resp.status.as_u16(), 404, "Quarantined items must not stream to non-creators" ); } #[tokio::test] async fn stream_url_404_for_pending_scan_to_non_creator() { let mut h = TestHarness::with_storage().await; let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await; sqlx::query("UPDATE items SET scan_status = 'pending' WHERE id = $1::uuid") .bind(&item_id) .execute(&h.db) .await .unwrap(); h.client.post_form("/logout", "").await; let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert_eq!( resp.status.as_u16(), 404, "Pending-scan items must not stream to non-creators (fail-closed)" ); } #[tokio::test] async fn stream_url_creator_can_preview_held_for_review_item() { let mut h = TestHarness::with_storage().await; let (_creator, _, item_id, _) = setup_audio_item(&mut h, 0).await; sqlx::query("UPDATE items SET scan_status = 'held_for_review' WHERE id = $1::uuid") .bind(&item_id) .execute(&h.db) .await .unwrap(); // Creator remains logged in from setup. let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!( resp.status.is_success(), "Creator must be able to preview their own HeldForReview content: {} {}", resp.status, resp.text ); } #[tokio::test] async fn stream_url_authenticated_non_buyer_gets_403_on_paid_item() { let mut h = TestHarness::with_storage().await; let (_creator, _, item_id, _) = setup_audio_item(&mut h, 999).await; // Log out creator; create a different user with no purchase. h.client.post_form("/logout", "").await; h.signup("randomuser", "random@test.com", "password123").await; h.login("randomuser", "password123").await; let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert_eq!( resp.status.as_u16(), 403, "Authenticated non-buyer on paid item must get 403, not 401: {} {}", resp.status, resp.text ); } #[tokio::test] async fn stream_url_increments_play_count() { let mut h = TestHarness::with_storage().await; let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await; // Stream twice (free item — anyone can hit it). h.client.post_form("/logout", "").await; for _ in 0..2 { let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!(resp.status.is_success()); } let count: i32 = sqlx::query_scalar("SELECT play_count FROM items WHERE id = $1::uuid") .bind(&item_id) .fetch_one(&h.db) .await .unwrap(); assert_eq!(count, 2, "play_count should increment per stream call"); } #[tokio::test] async fn stream_url_records_unique_play_for_authenticated_user() { let mut h = TestHarness::with_storage().await; let (_creator, _, item_id, _) = setup_audio_item(&mut h, 0).await; // Switch to a different authenticated user. h.client.post_form("/logout", "").await; let listener_id = h.signup("listener", "listener@test.com", "password123").await; h.login("listener", "password123").await; let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!(resp.status.is_success()); let has_play: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM user_plays WHERE user_id = $1 AND item_id = $2::uuid)", ) .bind(listener_id) .bind(&item_id) .fetch_one(&h.db) .await .unwrap(); assert!(has_play, "user_plays should record the listener for unique-listener tracking"); } #[tokio::test] async fn version_download_includes_license_url_when_preset_set() { let mut h = TestHarness::with_storage().await; let setup = h.create_creator_with_item("licdownloader", "digital", 0).await; sqlx::query( "UPDATE items SET is_public = true, scan_status = 'clean', \ license_preset = 'mit' WHERE id = $1::uuid", ) .bind(&setup.item_id) .execute(&h.db) .await .unwrap(); sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid") .bind(&setup.project_id) .execute(&h.db) .await .unwrap(); // Create a version with a fake s3_key. let s3_key = format!("test/{}/download/build.zip", setup.item_id); h.storage.as_ref().unwrap().put(&s3_key, b"zip data".to_vec()); let version_id: String = sqlx::query_scalar( "INSERT INTO versions (item_id, version_number, s3_key, file_size_bytes, file_name, \ is_current, scan_status) \ VALUES ($1::uuid, '1.0', $2, 100, 'build.zip', true, 'clean') RETURNING id::text", ) .bind(&setup.item_id) .bind(&s3_key) .fetch_one(&h.db) .await .unwrap(); let resp = h.client.get(&format!("/api/versions/{version_id}/download")).await; assert!(resp.status.is_success(), "{} {}", resp.status, resp.text); let data: Value = resp.json(); assert!( data["license_url"].is_string(), "license_url should be present when item.license_preset is set" ); let license_url = data["license_url"].as_str().unwrap(); assert!( license_url.contains(&setup.item_id), "license_url should point at the item: {license_url}" ); assert!( license_url.ends_with("/license.txt"), "license_url should target the .txt endpoint: {license_url}" ); } #[tokio::test] async fn version_download_omits_license_url_without_preset() { let mut h = TestHarness::with_storage().await; let setup = h.create_creator_with_item("nolicdl", "digital", 0).await; sqlx::query( "UPDATE items SET is_public = true, scan_status = 'clean', \ license_preset = NULL WHERE id = $1::uuid", ) .bind(&setup.item_id) .execute(&h.db) .await .unwrap(); sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid") .bind(&setup.project_id) .execute(&h.db) .await .unwrap(); let s3_key = format!("test/{}/download/build.zip", setup.item_id); h.storage.as_ref().unwrap().put(&s3_key, b"zip data".to_vec()); let version_id: String = sqlx::query_scalar( "INSERT INTO versions (item_id, version_number, s3_key, file_size_bytes, file_name, \ is_current, scan_status) \ VALUES ($1::uuid, '1.0', $2, 100, 'build.zip', true, 'clean') RETURNING id::text", ) .bind(&setup.item_id) .bind(&s3_key) .fetch_one(&h.db) .await .unwrap(); let resp = h.client.get(&format!("/api/versions/{version_id}/download")).await; assert!(resp.status.is_success()); let data: Value = resp.json(); assert!( data["license_url"].is_null(), "license_url should be absent when no preset is set, got: {:?}", data["license_url"] ); } #[tokio::test] async fn stream_url_404_for_nonexistent_item() { let mut h = TestHarness::with_storage().await; let bogus = "00000000-0000-0000-0000-000000000000"; let resp = h.client.get(&format!("/api/stream/{bogus}")).await; assert_eq!(resp.status.as_u16(), 404); } #[tokio::test] async fn stream_url_expires_in_scales_with_duration() { let mut h = TestHarness::with_storage().await; let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await; // Set a long duration. The handler computes expiry as // `max(duration * 2, 3600)`, so 5000s should give 10000s expiry. sqlx::query("UPDATE items SET duration_seconds = 5000 WHERE id = $1::uuid") .bind(&item_id) .execute(&h.db) .await .unwrap(); let resp = h.client.get(&format!("/api/stream/{item_id}")).await; assert!(resp.status.is_success(), "{} {}", resp.status, resp.text); let data: Value = resp.json(); let expires_in = data["expires_in"].as_u64().unwrap(); assert_eq!( expires_in, 10000, "expires_in should be 2x duration for long tracks (got {expires_in})" ); }