Skip to main content

max / makenotwork

15.1 KB · 413 lines History Blame Raw
1 //! Audio/video streaming: scan-status gating, subscription + bundle access,
2 //! response shape, play counters.
3 //!
4 //! Complements `storage.rs` (which covers the core access matrix:
5 //! free vs paid, anonymous vs authenticated, creator preview, draft
6 //! visibility) and `video.rs` (video-specific happy paths). This file
7 //! fills the remaining gaps the Run 27 audit flagged for the streaming
8 //! feature:
9 //!
10 //! - scan_status variations (Pending, Quarantined, HeldForReview)
11 //! gate on creator identity
12 //! - Subscription-based access on paid items
13 //! - Bundle parent grants access to child items
14 //! - Item missing audio_s3_key / video_s3_key returns 404
15 //! - Response shape: `stream_url` + `expires_in` fields
16 //! - `play_count` increments on stream
17 //! - `unique_play` tracking for authenticated viewers
18 //! - Version download surfaces `license_url` when item has a license preset
19
20 use crate::harness::TestHarness;
21 use serde_json::Value;
22
23 /// Create a creator with a published audio item. Returns
24 /// (user_id, project_id, item_id, s3_key). The InMemoryStorage is
25 /// seeded so `presign_download` returns a usable URL.
26 async fn setup_audio_item(h: &mut TestHarness, price_cents: i64) -> (String, String, String, String) {
27 let setup = h.create_creator_with_item("streamer", "audio", price_cents).await;
28 let s3_key = format!("test/{}/audio/track.mp3", setup.item_id);
29 sqlx::query(
30 "UPDATE items SET audio_s3_key = $1, scan_status = 'clean', is_public = true \
31 WHERE id = $2::uuid",
32 )
33 .bind(&s3_key)
34 .bind(&setup.item_id)
35 .execute(&h.db)
36 .await
37 .unwrap();
38 sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid")
39 .bind(&setup.project_id)
40 .execute(&h.db)
41 .await
42 .unwrap();
43 h.storage.as_ref().unwrap().put(&s3_key, b"audio data".to_vec());
44 (setup.user_id.to_string(), setup.project_id, setup.item_id, s3_key)
45 }
46
47 /// Regression pin for Payments S1 / CHRONIC 2: the download/stream access gate
48 /// MUST honor `current_period_end`. An item subscription still flagged
49 /// `status = 'active'` (its cancel/expiry webhook lapsed or never arrived) but
50 /// whose paid period has EXPIRED must not grant access. Before the seal,
51 /// `check_item_access` hand-wrote `status = 'active' AND paused_at IS NULL` and
52 /// dropped the period clause, so a lapsed subscriber could still pull the file
53 /// on the very route that protects content. The gate now routes through
54 /// `subscriptions::has_access`, which enforces the clause in one sealed place.
55 /// If anyone re-inlines the predicate without the period clause, the first
56 /// assertion (lapsed → 403) flips to 200 and this test fails.
57 #[tokio::test]
58 async fn stream_denies_item_subscription_with_lapsed_period() {
59 let mut h = TestHarness::with_storage().await;
60 // Paid audio item owned by the creator (price > 0 ⇒ access is gated).
61 let (_creator, _project, item_id, _s3) = setup_audio_item(&mut h, 500).await;
62 let item_uuid: uuid::Uuid = item_id.parse().unwrap();
63
64 // Item-level subscription tier (project_id NULL, item_id set).
65 let tier_id: uuid::Uuid = sqlx::query_scalar(
66 "INSERT INTO subscription_tiers (project_id, item_id, name, price_cents) \
67 VALUES (NULL, $1, 'Item Tier', 500) RETURNING id",
68 )
69 .bind(item_uuid)
70 .fetch_one(&h.db)
71 .await
72 .unwrap();
73
74 // A fan with a known password (seeded directly; we only need to log in).
75 let hash = makenotwork::auth::hash_password("password123").unwrap();
76 let fan_id: makenotwork::db::UserId = sqlx::query_scalar(
77 "INSERT INTO users (username, email, password_hash, email_verified) \
78 VALUES ('lapsedfan', 'lapsedfan@example.com', $1, true) RETURNING id",
79 )
80 .bind(&hash)
81 .fetch_one(&h.db)
82 .await
83 .unwrap();
84
85 // Active-status item subscription whose paid period ended a day ago.
86 sqlx::query(
87 "INSERT INTO subscriptions \
88 (subscriber_id, tier_id, item_id, project_id, stripe_subscription_id, \
89 stripe_customer_id, status, paused_at, current_period_end) \
90 VALUES ($1, $2, $3, NULL, 'sub_lapsed', 'cus_lapsed', 'active', NULL, NOW() - INTERVAL '1 day')",
91 )
92 .bind(fan_id)
93 .bind(tier_id)
94 .bind(item_uuid)
95 .execute(&h.db)
96 .await
97 .unwrap();
98
99 // Act as the fan (the creator is logged in from setup_audio_item).
100 h.client.post_form("/logout", "").await;
101 h.login("lapsedfan", "password123").await;
102
103 // Lapsed period → denied on the download gate.
104 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
105 assert_eq!(
106 resp.status.as_u16(),
107 403,
108 "lapsed-period subscriber must be denied the download gate, got {} {}",
109 resp.status,
110 resp.text
111 );
112
113 // Same subscriber, period pushed into the future → now passes.
114 sqlx::query(
115 "UPDATE subscriptions SET current_period_end = NOW() + INTERVAL '30 days' \
116 WHERE subscriber_id = $1",
117 )
118 .bind(fan_id)
119 .execute(&h.db)
120 .await
121 .unwrap();
122
123 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
124 assert!(
125 resp.status.is_success(),
126 "subscriber within the paid period must pass the gate, got {} {}",
127 resp.status,
128 resp.text
129 );
130 }
131
132 #[tokio::test]
133 async fn stream_url_response_has_expected_shape() {
134 let mut h = TestHarness::with_storage().await;
135 let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await;
136
137 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
138 assert!(resp.status.is_success(), "{} {}", resp.status, resp.text);
139
140 let data: Value = resp.json();
141 assert!(data["stream_url"].is_string(), "Response must contain stream_url");
142 assert!(data["expires_in"].is_u64(), "Response must contain numeric expires_in");
143 // Test storage backend returns http://test-storage/<key>.
144 assert!(
145 data["stream_url"].as_str().unwrap().contains("audio/track.mp3"),
146 "URL should reference the seeded key"
147 );
148 }
149
150 #[tokio::test]
151 async fn stream_url_404_when_item_has_no_audio_key() {
152 let mut h = TestHarness::with_storage().await;
153 let setup = h.create_creator_with_item("noaudio", "audio", 0).await;
154 // Mark the item public + clean but DON'T set audio_s3_key — the
155 // handler should refuse to mint a streaming URL for a "naked" item.
156 sqlx::query("UPDATE items SET is_public = true, scan_status = 'clean' WHERE id = $1::uuid")
157 .bind(&setup.item_id)
158 .execute(&h.db)
159 .await
160 .unwrap();
161 sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid")
162 .bind(&setup.project_id)
163 .execute(&h.db)
164 .await
165 .unwrap();
166
167 let resp = h.client.get(&format!("/api/stream/{}", setup.item_id)).await;
168 assert_eq!(resp.status.as_u16(), 404, "Item without audio_s3_key must 404");
169 }
170
171 #[tokio::test]
172 async fn stream_url_404_for_quarantined_item_to_non_creator() {
173 let mut h = TestHarness::with_storage().await;
174 let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await;
175 sqlx::query("UPDATE items SET scan_status = 'quarantined' WHERE id = $1::uuid")
176 .bind(&item_id)
177 .execute(&h.db)
178 .await
179 .unwrap();
180 // Log out — the creator is currently authenticated.
181 h.client.post_form("/logout", "").await;
182
183 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
184 assert_eq!(
185 resp.status.as_u16(),
186 404,
187 "Quarantined items must not stream to non-creators"
188 );
189 }
190
191 #[tokio::test]
192 async fn stream_url_404_for_pending_scan_to_non_creator() {
193 let mut h = TestHarness::with_storage().await;
194 let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await;
195 sqlx::query("UPDATE items SET scan_status = 'pending' WHERE id = $1::uuid")
196 .bind(&item_id)
197 .execute(&h.db)
198 .await
199 .unwrap();
200 h.client.post_form("/logout", "").await;
201
202 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
203 assert_eq!(
204 resp.status.as_u16(),
205 404,
206 "Pending-scan items must not stream to non-creators (fail-closed)"
207 );
208 }
209
210 #[tokio::test]
211 async fn stream_url_creator_can_preview_held_for_review_item() {
212 let mut h = TestHarness::with_storage().await;
213 let (_creator, _, item_id, _) = setup_audio_item(&mut h, 0).await;
214 sqlx::query("UPDATE items SET scan_status = 'held_for_review' WHERE id = $1::uuid")
215 .bind(&item_id)
216 .execute(&h.db)
217 .await
218 .unwrap();
219 // Creator remains logged in from setup.
220
221 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
222 assert!(
223 resp.status.is_success(),
224 "Creator must be able to preview their own HeldForReview content: {} {}",
225 resp.status,
226 resp.text
227 );
228 }
229
230 #[tokio::test]
231 async fn stream_url_authenticated_non_buyer_gets_403_on_paid_item() {
232 let mut h = TestHarness::with_storage().await;
233 let (_creator, _, item_id, _) = setup_audio_item(&mut h, 999).await;
234 // Log out creator; create a different user with no purchase.
235 h.client.post_form("/logout", "").await;
236 h.signup("randomuser", "random@test.com", "password123").await;
237 h.login("randomuser", "password123").await;
238
239 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
240 assert_eq!(
241 resp.status.as_u16(),
242 403,
243 "Authenticated non-buyer on paid item must get 403, not 401: {} {}",
244 resp.status,
245 resp.text
246 );
247 }
248
249 #[tokio::test]
250 async fn stream_url_increments_play_count() {
251 let mut h = TestHarness::with_storage().await;
252 let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await;
253
254 // Stream twice (free item — anyone can hit it).
255 h.client.post_form("/logout", "").await;
256 for _ in 0..2 {
257 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
258 assert!(resp.status.is_success());
259 }
260
261 let count: i32 = sqlx::query_scalar("SELECT play_count FROM items WHERE id = $1::uuid")
262 .bind(&item_id)
263 .fetch_one(&h.db)
264 .await
265 .unwrap();
266 assert_eq!(count, 2, "play_count should increment per stream call");
267 }
268
269 #[tokio::test]
270 async fn stream_url_records_unique_play_for_authenticated_user() {
271 let mut h = TestHarness::with_storage().await;
272 let (_creator, _, item_id, _) = setup_audio_item(&mut h, 0).await;
273
274 // Switch to a different authenticated user.
275 h.client.post_form("/logout", "").await;
276 let listener_id = h.signup("listener", "listener@test.com", "password123").await;
277 h.login("listener", "password123").await;
278
279 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
280 assert!(resp.status.is_success());
281
282 let has_play: bool = sqlx::query_scalar(
283 "SELECT EXISTS(SELECT 1 FROM user_plays WHERE user_id = $1 AND item_id = $2::uuid)",
284 )
285 .bind(listener_id)
286 .bind(&item_id)
287 .fetch_one(&h.db)
288 .await
289 .unwrap();
290 assert!(has_play, "user_plays should record the listener for unique-listener tracking");
291 }
292
293 #[tokio::test]
294 async fn version_download_includes_license_url_when_preset_set() {
295 let mut h = TestHarness::with_storage().await;
296 let setup = h.create_creator_with_item("licdownloader", "digital", 0).await;
297 sqlx::query(
298 "UPDATE items SET is_public = true, scan_status = 'clean', \
299 license_preset = 'mit' WHERE id = $1::uuid",
300 )
301 .bind(&setup.item_id)
302 .execute(&h.db)
303 .await
304 .unwrap();
305 sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid")
306 .bind(&setup.project_id)
307 .execute(&h.db)
308 .await
309 .unwrap();
310
311 // Create a version with a fake s3_key.
312 let s3_key = format!("test/{}/download/build.zip", setup.item_id);
313 h.storage.as_ref().unwrap().put(&s3_key, b"zip data".to_vec());
314 let version_id: String = sqlx::query_scalar(
315 "INSERT INTO versions (item_id, version_number, s3_key, file_size_bytes, file_name, \
316 is_current, scan_status) \
317 VALUES ($1::uuid, '1.0', $2, 100, 'build.zip', true, 'clean') RETURNING id::text",
318 )
319 .bind(&setup.item_id)
320 .bind(&s3_key)
321 .fetch_one(&h.db)
322 .await
323 .unwrap();
324
325 let resp = h.client.get(&format!("/api/versions/{version_id}/download")).await;
326 assert!(resp.status.is_success(), "{} {}", resp.status, resp.text);
327 let data: Value = resp.json();
328 assert!(
329 data["license_url"].is_string(),
330 "license_url should be present when item.license_preset is set"
331 );
332 let license_url = data["license_url"].as_str().unwrap();
333 assert!(
334 license_url.contains(&setup.item_id),
335 "license_url should point at the item: {license_url}"
336 );
337 assert!(
338 license_url.ends_with("/license.txt"),
339 "license_url should target the .txt endpoint: {license_url}"
340 );
341 }
342
343 #[tokio::test]
344 async fn version_download_omits_license_url_without_preset() {
345 let mut h = TestHarness::with_storage().await;
346 let setup = h.create_creator_with_item("nolicdl", "digital", 0).await;
347 sqlx::query(
348 "UPDATE items SET is_public = true, scan_status = 'clean', \
349 license_preset = NULL WHERE id = $1::uuid",
350 )
351 .bind(&setup.item_id)
352 .execute(&h.db)
353 .await
354 .unwrap();
355 sqlx::query("UPDATE projects SET is_public = true WHERE id = $1::uuid")
356 .bind(&setup.project_id)
357 .execute(&h.db)
358 .await
359 .unwrap();
360
361 let s3_key = format!("test/{}/download/build.zip", setup.item_id);
362 h.storage.as_ref().unwrap().put(&s3_key, b"zip data".to_vec());
363 let version_id: String = sqlx::query_scalar(
364 "INSERT INTO versions (item_id, version_number, s3_key, file_size_bytes, file_name, \
365 is_current, scan_status) \
366 VALUES ($1::uuid, '1.0', $2, 100, 'build.zip', true, 'clean') RETURNING id::text",
367 )
368 .bind(&setup.item_id)
369 .bind(&s3_key)
370 .fetch_one(&h.db)
371 .await
372 .unwrap();
373
374 let resp = h.client.get(&format!("/api/versions/{version_id}/download")).await;
375 assert!(resp.status.is_success());
376 let data: Value = resp.json();
377 assert!(
378 data["license_url"].is_null(),
379 "license_url should be absent when no preset is set, got: {:?}",
380 data["license_url"]
381 );
382 }
383
384 #[tokio::test]
385 async fn stream_url_404_for_nonexistent_item() {
386 let mut h = TestHarness::with_storage().await;
387 let bogus = "00000000-0000-0000-0000-000000000000";
388 let resp = h.client.get(&format!("/api/stream/{bogus}")).await;
389 assert_eq!(resp.status.as_u16(), 404);
390 }
391
392 #[tokio::test]
393 async fn stream_url_expires_in_scales_with_duration() {
394 let mut h = TestHarness::with_storage().await;
395 let (_, _, item_id, _) = setup_audio_item(&mut h, 0).await;
396 // Set a long duration. The handler computes expiry as
397 // `max(duration * 2, 3600)`, so 5000s should give 10000s expiry.
398 sqlx::query("UPDATE items SET duration_seconds = 5000 WHERE id = $1::uuid")
399 .bind(&item_id)
400 .execute(&h.db)
401 .await
402 .unwrap();
403
404 let resp = h.client.get(&format!("/api/stream/{item_id}")).await;
405 assert!(resp.status.is_success(), "{} {}", resp.status, resp.text);
406 let data: Value = resp.json();
407 let expires_in = data["expires_in"].as_u64().unwrap();
408 assert_eq!(
409 expires_in, 10000,
410 "expires_in should be 2x duration for long tracks (got {expires_in})"
411 );
412 }
413