Skip to main content

max / makenotwork

Split scheduler into submodules, add Phase 1 integration tests Scheduler: split 1329-line monolith into 6 submodules (announcements, cleanup, integrity, webhooks, mt_threads, mod). No file exceeds 253 lines. Public API unchanged. Integration tests: 28 new tests across 4 workflow modules: - sandbox.rs (9): create, restrictions, content isolation, IP cap, email - purchase.rs (+5): paid checkout+webhook, PWYW, unlisted, dedup, library - subscriptions.rs (+6): tier CRUD, visibility, sandbox fake Stripe IDs - revenue_splits.rs (5): member CRUD, split on purchase, CSV export Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-04-29 16:44 UTC
Commit: a8cfed5b31180d7e226063bb9f8e07ac99ec2ee5
Parent: 0ac9738
13 files changed, +2921 insertions, -501 deletions
@@ -123,6 +123,85 @@ Files > 100 MB are now held for review instead of downloaded into RAM. Next step
123 123
124 124 ---
125 125
126 + ## Integration Test Improvement Plan (2026-04-29)
127 +
128 + Current: 986 unit tests + 643 integration tests (74 workflow modules) = ~1,629 total.
129 + Infrastructure: template DBs, mock Stripe/email/S3, cookie-aware in-process client.
130 +
131 + ### Phase 1: Fill thin workflow gaps (high ROI)
132 +
133 + Modules with 1-3 tests that need expansion, plus missing modules for features we just hardened.
134 +
135 + #### Expand `purchase.rs` (1 -> 6 tests)
136 + - [x] Paid item purchase via mock Stripe: checkout + webhook + transaction verification
137 + - [x] PWYW purchase: buyer submits custom amount, verify correct cents
138 + - [x] Purchase of unlisted item: verify failure
139 + - [x] Duplicate free purchase: verify idempotent (one transaction)
140 + - [x] Purchase adds to library: verify item appears in /library
141 + - [ ] Purchase with promo code: apply discount, verify discounted amount in transaction (deferred to Phase 2 promo_codes_checkout)
142 +
143 + #### Expand `subscriptions.rs` (2 -> 8 tests)
144 + - [x] Create subscription tier with mock Stripe IDs
145 + - [x] List subscription tiers (multiple, ordered)
146 + - [x] Update subscription tier (name, description, deactivation)
147 + - [x] Delete subscription tier
148 + - [x] Subscriber tier visibility (anonymous user sees tier on public page)
149 + - [x] Sandbox tier uses fake Stripe IDs (sandbox_prod_/sandbox_price_ prefix)
150 + - [ ] Subscribe via mock checkout + webhook (deferred — requires Stripe subscription event simulation)
151 + - [ ] Subscription cancel + grace period (deferred — requires subscription lifecycle simulation)
152 +
153 + #### New: `sandbox.rs` (9 tests)
154 + - [x] Create sandbox account: POST /sandbox, redirect to dashboard
155 + - [x] Sandbox blocks custom domains (403)
156 + - [x] Sandbox blocks git repos (403)
157 + - [x] Sandbox blocks imports (403)
158 + - [x] Sandbox blocks guest claim (403)
159 + - [x] Sandbox content not visible on item page (404 from second client)
160 + - [x] Sandbox RSS returns 404
161 + - [x] Sandbox per-IP cap: create MAX_PER_IP, verify next rejected
162 + - [x] Sandbox blog publish: no emails sent
163 +
164 + #### New: `revenue_splits.rs` (5 tests)
165 + - [x] Add project member with split percentage
166 + - [x] Update split percentage
167 + - [x] Remove project member
168 + - [x] Split recorded on purchase (full checkout + webhook + DB verification)
169 + - [x] Split export CSV contains correct data (both seller and collaborator perspectives)
170 +
171 + ### Phase 2: Add missing workflow modules
172 +
173 + #### New: `rate_limiting.rs`
174 + - [ ] Auth rate limit: send burst of login attempts, verify 429 returned after burst exceeded
175 + - [ ] API write rate limit: send burst of POST requests, verify 429
176 + - [ ] Sandbox creation rate limit: send burst of POST /sandbox, verify 429
177 + - [ ] Rate limit headers: verify `x-ratelimit-limit`, `x-ratelimit-remaining`, `retry-after` in responses
178 + - [ ] Rate limit per-IP isolation: verify different IPs have independent quotas
179 +
180 + #### New: `promo_codes_checkout.rs` (end-to-end with mock Stripe)
181 + - [ ] Percentage discount checkout: apply 50% code, verify checkout amount is halved
182 + - [ ] Fixed discount checkout: apply $5 off code, verify amount reduced by 500 cents
183 + - [ ] Free access code checkout: apply 100% code, verify $0 transaction (no Stripe session)
184 + - [ ] Expired promo code: apply expired code, verify rejection
185 + - [ ] Max-uses promo code: use code up to max_uses, verify next attempt rejected
186 + - [ ] Promo code reservation: start checkout with code, verify use_count incremented; abandon checkout, verify use_count released after 25h cleanup
187 +
188 + ### Phase 3: Harness optimization
189 +
190 + #### New helper: `create_creator_with_stripe`
191 + - [ ] Combines `create_creator` + mock Stripe Connect setup (stripe_account_id, charges_enabled, onboarding_complete)
192 + - [ ] Returns `CreatorSetup` with stripe account info for webhook tests
193 + - [ ] Eliminates repeated 5-line SQL blocks across stripe_webhooks, mock_payment_flows, fan_plus, tier_enforcement
194 +
195 + #### New helper: `create_buyers(n)`
196 + - [ ] Batch-create n buyer accounts for tests needing multiple purchasers
197 + - [ ] Returns Vec<UserId> for concurrent purchase tests
198 +
199 + #### Harness speedups
200 + - [ ] `TestHarness::minimal()` alias — skips storage/stripe/scanner setup for auth-only tests
201 + - [ ] Document test database requirements in tests/README.md (PostgreSQL version, template DB, TEST_DATABASE_URL)
202 +
203 + ---
204 +
126 205 ## Code Fuzz Findings (2026-04-25)
127 206
128 207 Three rounds of adversarial code review. 51 findings total: 50 fixed, 1 accepted risk, 2 deferred. Fixed items moved to todo_done.md.
@@ -136,6 +215,20 @@ Three rounds of adversarial code review. 51 findings total: 50 fixed, 1 accepted
136 215 - Rate limit IP extraction trusts X-Forwarded-For when traffic bypasses Cloudflare (helpers.rs). Fix requires splitting rate limit extraction by path: CF-Connecting-IP for public web routes, peer socket for internal/CLI/git. Needs careful routing since CLI, git smart HTTP, and SyncKit all hit the same server but some bypass Cloudflare.
137 216 - S3 key/file size UPDATE queries lack ownership in SQL -- defense-in-depth; callers verify ownership (db/items.rs)
138 217
218 + ## SyncKit Fuzz Findings (2026-04-29)
219 +
220 + ### Serious
221 + - [x] Push retry creates duplicate sync log entries — added `batch_id` column + unique index. Migration 083. Client generates UUID per push batch. Server dedup check before INSERT.
222 + - [x] Auth timing oracle in `sync_auth` — moved `verify_password` before suspension/lockout/2FA checks. All failures return 401.
223 +
224 + ### Medium
225 + - [x] `change_password` TOCTOU — added `expected_version` to PUT, server rejects with 409 on stale version. Added `Conflict` error variant.
226 +
227 + ### Minor
228 + - [x] `prune_sync_log` negative `retain_days` guard — added early return for `retain_days <= 0`.
229 + - [x] Blob size mismatch on concurrent confirm — changed `ON CONFLICT DO NOTHING` to `DO UPDATE SET size_bytes = EXCLUDED.size_bytes`.
230 + - [x] `validate-app` uses GET with API key in query string — changed to POST with JSON body.
231 +
139 232 ## Audit Run 16 (2026-04-29)
140 233
141 234 Overall grade: A- -> A (post-remediation). 75.5k LOC, 986 unit tests (13.1 tests/KLOC). 40+ findings resolved.
@@ -1,1329 +0,0 @@
1 - //! Background scheduler — publishes scheduled items and blog posts when their time arrives.
2 - //!
3 - //! Runs on a fixed interval, checking for items and blog posts whose publish date
4 - //! has passed. Also sends release announcements and blog-post notifications to
5 - //! project mailing lists, and spawns Multithreaded discussion threads for new
6 - //! blog posts. Announcement logic is shared with the manual publish handler.
7 -
8 - use tokio::sync::watch;
9 - use tokio::task::JoinHandle;
10 -
11 - use crate::constants;
12 - use crate::db;
13 - use crate::db::{DbBlogPost, DbItem};
14 - use crate::AppState;
15 -
16 - /// Atomically mark an item as release-announced and send subscriber emails
17 - /// via the project's content mailing list.
18 - ///
19 - /// Shared between the manual publish handler (`routes/api/items.rs`) and the
20 - /// scheduler (this module). Safe to call multiple times — `mark_release_announced`
21 - /// is a no-op if the item was already announced.
22 - pub async fn send_release_announcements(state: &AppState, item: &DbItem) {
23 - if !db::items::mark_release_announced(&state.db, item.id)
24 - .await
25 - .unwrap_or(false)
26 - {
27 - return;
28 - }
29 -
30 - // Skip email delivery for web-only items
31 - if item.web_only {
32 - return;
33 - }
34 -
35 - let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, item.project_id).await
36 - else {
37 - return;
38 - };
39 - let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
40 - return;
41 - };
42 - let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
43 - &state.db,
44 - item.project_id,
45 - db::MailingListType::Content,
46 - )
47 - .await
48 - else {
49 - return;
50 - };
51 - let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
52 - else {
53 - return;
54 - };
55 -
56 - let creator_name = creator
57 - .display_name
58 - .as_deref()
59 - .unwrap_or(&creator.username)
60 - .to_string();
61 - let item_title = item.title.clone();
62 - let item_url = format!("{}/i/{}", state.config.host_url, item.id);
63 - let email_client = state.email.clone();
64 - let host_url = state.config.host_url.clone();
65 - let signing_secret = state.config.signing_secret.clone();
66 - let list_id_str = list.id.to_string();
67 -
68 - tokio::spawn(async move {
69 - for subscriber in subscribers {
70 - let unsub_url = crate::email::generate_unsubscribe_url(
71 - &host_url,
72 - subscriber.id,
73 - "mailing_list",
74 - &list_id_str,
75 - &signing_secret,
76 - );
77 - if let Err(e) = email_client
78 - .send_release_announcement(
79 - &subscriber.email,
80 - subscriber.display_name.as_deref(),
81 - &creator_name,
82 - &item_title,
83 - &item_url,
84 - Some(&unsub_url),
85 - )
86 - .await
87 - {
88 - tracing::error!(
89 - error = ?e,
90 - "failed to send release announcement email"
91 - );
92 - }
93 - }
94 - });
95 - }
96 -
97 - /// Atomically mark a blog post as announced and send subscriber emails
98 - /// via the project's content mailing list.
99 - ///
100 - /// Shared between the blog post publish handlers and the scheduler.
101 - /// Safe to call multiple times — `mark_blog_post_announced` is a no-op
102 - /// if the post was already announced.
103 - pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) {
104 - if !db::blog_posts::mark_blog_post_announced(&state.db, post.id)
105 - .await
106 - .unwrap_or(false)
107 - {
108 - return;
109 - }
110 -
111 - // Skip email delivery for web-only posts
112 - if post.web_only {
113 - return;
114 - }
115 -
116 - let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, post.project_id).await
117 - else {
118 - return;
119 - };
120 - let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
121 - return;
122 - };
123 - let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
124 - &state.db,
125 - post.project_id,
126 - db::MailingListType::Content,
127 - )
128 - .await
129 - else {
130 - return;
131 - };
132 - let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
133 - else {
134 - return;
135 - };
136 -
137 - let creator_name = creator
138 - .display_name
139 - .as_deref()
140 - .unwrap_or(&creator.username)
141 - .to_string();
142 - let post_title = post.title.clone();
143 - let post_url = format!(
144 - "{}/{}/blog/{}",
145 - state.config.host_url, project.slug, post.slug
146 - );
147 - let email_client = state.email.clone();
148 - let host_url = state.config.host_url.clone();
149 - let signing_secret = state.config.signing_secret.clone();
150 - let list_id_str = list.id.to_string();
151 -
152 - tokio::spawn(async move {
153 - for subscriber in subscribers {
154 - let unsub_url = crate::email::generate_unsubscribe_url(
155 - &host_url,
156 - subscriber.id,
157 - "mailing_list",
158 - &list_id_str,
159 - &signing_secret,
160 - );
161 - if let Err(e) = email_client
162 - .send_blog_post_announcement(
163 - &subscriber.email,
164 - subscriber.display_name.as_deref(),
165 - &creator_name,
166 - &post_title,
167 - &post_url,
168 - Some(&unsub_url),
169 - )
170 - .await
171 - {
172 - tracing::error!(
173 - error = ?e,
174 - "failed to send blog post announcement email"
175 - );
176 - }
177 - }
178 - });
179 - }
180 -
181 - /// Process the getting-started email drip sequence.
182 - ///
183 - /// Step 1 (welcome) is sent at signup in the auth handler.
184 - /// Step 2 (profile tips) fires 24h after welcome, skipped if display_name is set.
185 - /// Step 3 (Stripe guide) fires 72h after welcome, skipped if Stripe is connected.
186 - async fn send_onboarding_emails(state: &AppState) {
187 - let host_url = &state.config.host_url;
188 -
189 - // Step 1→2: profile tips (24h after welcome)
190 - let next = OnboardingStep::ProfileTipsSent;
191 - if let Ok(users) =
192 - db::users::get_onboarding_candidates(&state.db, OnboardingStep::WelcomeSent.as_i16(), chrono::Duration::hours(24)).await
193 - {
194 - // Batch-advance users who already set a display name (skip email)
195 - let (skip, send): (Vec<_>, Vec<_>) =
196 - users.into_iter().partition(|u| u.display_name.is_some());
197 - if !skip.is_empty() {
198 - let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
199 - if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
200 - tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
201 - }
202 - }
203 - for user in send {
204 - if let Err(e) = state
205 - .email
206 - .send_onboarding_profile(&user.email, user.display_name.as_deref(), host_url)
207 - .await
208 - {
209 - tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding profile email");
210 - continue;
211 - }
212 - if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
213 - tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
214 - }
215 - }
216 - }
217 -
218 - // Step 2→3: Stripe guide (72h after welcome)
219 - let next = OnboardingStep::StripeGuideSent;
220 - if let Ok(users) =
221 - db::users::get_onboarding_candidates(&state.db, OnboardingStep::ProfileTipsSent.as_i16(), chrono::Duration::hours(48)).await
222 - {
223 - // Batch-advance users who already connected Stripe (skip email)
224 - let (skip, send): (Vec<_>, Vec<_>) =
225 - users.into_iter().partition(|u| u.stripe_account_id.is_some());
226 - if !skip.is_empty() {
227 - let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
228 - if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
229 - tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
230 - }
231 - }
232 - for user in send {
233 - if let Err(e) = state
234 - .email
235 - .send_onboarding_stripe(&user.email, user.display_name.as_deref(), host_url)
236 - .await
237 - {
238 - tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding stripe email");
239 - continue;
240 - }
241 - if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
242 - tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
243 - }
244 - }
245 - }
246 - }
247 -
248 - /// Enforce post-grace item hiding for creators whose cancellation grace period has expired.
249 - async fn enforce_post_grace_hiding(state: &AppState) {
250 - match db::creator_tiers::get_expired_grace_creators(&state.db).await {
251 - Ok(user_ids) => {
252 - for user_id in user_ids {
253 - match db::items::hide_all_items_for_user(&state.db, user_id).await {
254 - Ok(count) => {
255 - if count > 0 {
256 - tracing::info!(
257 - user_id = %user_id,
258 - items_hidden = count,
259 - "post-grace enforcement: items hidden"
260 - );
261 - }
262 - }
263 - Err(e) => {
264 - tracing::error!(error = ?e, user_id = %user_id, "failed to hide items for post-grace enforcement");
265 - continue;
266 - }
267 - }
268 - if let Err(e) = db::creator_tiers::mark_grace_enforced(&state.db, user_id).await {
269 - tracing::error!(error = ?e, user_id = %user_id, "failed to mark grace enforced");
270 - }
271 - }
272 - }
273 - Err(e) => {
274 - tracing::error!(error = ?e, "failed to query expired grace creators");
275 - }
276 - }
277 - }
278 -
279 - /// Weekly storage drift correction — recalculates storage_used_bytes for all creators
280 - /// in a single batch query (no N+1 loop).
281 - async fn recalculate_all_storage_used(state: &AppState) {
282 - match db::creator_tiers::recalculate_all_storage_batch(&state.db).await {
283 - Ok(corrected) => {
284 - if corrected > 0 {
285 - tracing::info!(corrected = corrected, "weekly storage drift correction complete");
286 - }
287 - }
288 - Err(e) => {
289 - tracing::error!(error = ?e, "weekly storage drift correction failed");
290 - }
291 - }
292 - }
293 -
294 - /// Weekly drift correction interval in scheduler ticks (10,080 = 7 days at 60s).
295 - const DRIFT_CORRECTION_INTERVAL: u64 = 10_080;
296 -
297 - /// Daily interval in scheduler ticks (1,440 = 24h at 60s).
298 - const DAILY_INTERVAL: u64 = 1440;
299 -
300 - /// Sandbox cleanup interval in scheduler ticks (5 = 5min at 60s).
301 - const SANDBOX_CLEANUP_INTERVAL: u64 = 5;
302 -
303 - /// Maximum webhook retry attempts before marking as dead letter.
304 - const WEBHOOK_MAX_RETRIES: i32 = 5;
305 -
306 - /// Onboarding email drip steps (maps to `onboarding_email_step` i16 column).
307 - #[derive(Debug, Clone, Copy, PartialEq, Eq)]
308 - #[repr(i16)]
309 - enum OnboardingStep {
310 - /// Welcome email sent at signup.
311 - WelcomeSent = 1,
312 - /// Profile tips email (24h after welcome).
313 - ProfileTipsSent = 2,
314 - /// Stripe guide email (72h after welcome).
315 - StripeGuideSent = 3,
316 - }
317 -
318 - impl OnboardingStep {
319 - fn as_i16(self) -> i16 { self as i16 }
320 - }
321 -
322 - /// Determine which scheduled job groups should run for a given tick.
323 - ///
324 - /// Returns `(sandbox_cleanup, daily_jobs, weekly_jobs)`.
325 - /// Extracted for testability — the scheduler loop calls this each tick.
326 - fn jobs_for_tick(tick: u64) -> (bool, bool, bool) {
327 - let sandbox = tick.is_multiple_of(SANDBOX_CLEANUP_INTERVAL);
328 - let daily = tick == 1 || tick.is_multiple_of(DAILY_INTERVAL);
329 - let weekly = tick.is_multiple_of(DRIFT_CORRECTION_INTERVAL);
330 - (sandbox, daily, weekly)
331 - }
332 -
333 - /// Determine whether a webhook retry attempt should be treated as a dead letter.
334 - fn is_webhook_dead(attempt: i32) -> bool {
335 - attempt >= WEBHOOK_MAX_RETRIES
336 - }
337 -
338 - /// Spawn the background scheduler loop. Drop `shutdown_tx` to stop it.
339 - pub fn spawn_scheduler(
340 - state: AppState,
341 - mut shutdown_rx: watch::Receiver<()>,
342 - ) -> JoinHandle<()> {
343 - tokio::spawn(async move {
344 - tracing::info!("Scheduler started (interval={}s)", constants::SCHEDULER_INTERVAL_SECS);
345 -
346 - let mut interval = tokio::time::interval(
347 - std::time::Duration::from_secs(constants::SCHEDULER_INTERVAL_SECS),
348 - );
349 - interval.tick().await; // consume immediate first tick
350 -
351 - let mut tick_count: u64 = 0;
352 -
353 - loop {
354 - tokio::select! {
355 - _ = interval.tick() => {}
356 - _ = shutdown_rx.changed() => {
357 - tracing::info!("Scheduler shutting down");
358 - return;
359 - }
360 - }
361 -
362 - tick_count += 1;
363 -
364 - // Publish scheduled items
365 - match db::items::publish_scheduled_items(&state.db).await {
366 - Ok(items) => {
367 - for item in &items {
368 - tracing::info!(
369 - item_id = %item.id,
370 - title = %item.title,
371 - "scheduler published item"
372 - );
373 - send_release_announcements(&state, item).await;
374 - // Create MT thread for scheduled publishes
375 - if item.mt_thread_id.is_none() {
376 - spawn_mt_thread_for_item_by_lookup(&state, item);
377 - }
378 - }
379 - }
380 - Err(e) => {
381 - tracing::error!(error = ?e, "scheduler failed to publish items");
382 - }
383 - }
384 -
385 - // Send onboarding drip emails
386 - send_onboarding_emails(&state).await;
387 -
388 - // Dispatch pending builds
389 - crate::build_runner::dispatch_pending_build(&state).await;
390 -
391 - // Publish scheduled blog posts
392 - match db::blog_posts::publish_scheduled_blog_posts(&state.db).await {
393 - Ok(posts) => {
394 - for post in &posts {
395 - tracing::info!(
396 - post_id = %post.id,
397 - title = %post.title,
398 - "scheduler published blog post"
399 - );
400 - send_blog_post_announcements(&state, post).await;
401 - // Create MT thread for scheduled publishes
402 - if post.mt_thread_id.is_none() {
403 - spawn_mt_thread_for_blog_post_by_lookup(&state, post);
404 - }
405 - }
406 - }
407 - Err(e) => {
408 - tracing::error!(error = ?e, "scheduler failed to publish blog posts");
409 - }
410 - }
411 -
412 - // Enforce post-grace item hiding (canceled 30+ days ago)
413 - enforce_post_grace_hiding(&state).await;
414 -
415 - // Clean up expired idempotency keys (every tick is fine — cheap DELETE)
416 - if let Err(e) = db::idempotency::cleanup_expired(&state.db).await {
417 - tracing::error!(error = ?e, "failed to clean up expired idempotency keys");
418 - }
419 -
420 - let (run_sandbox, run_daily, run_weekly) = jobs_for_tick(tick_count);
421 -
422 - // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
423 - if run_sandbox {
424 - cleanup_sandbox_accounts(&state).await;
425 - }
426 -
427 - // Retry failed webhook events
428 - retry_failed_webhooks(&state).await;
429 -
430 - // Escalate stale pending refunds (unmatched for >24 hours)
431 - escalate_stale_refunds(&state).await;
432 -
433 - // Clean up stale pending transactions (>24h) and release promo code reservations
434 - cleanup_stale_pending_transactions(&state).await;
435 -
436 - // Weekly storage drift correction + integrity checks
437 - if run_weekly {
438 - recalculate_all_storage_used(&state).await;
439 - check_sales_count_drift(&state).await;
440 - }
441 -
442 - // Daily checks (every 1440 ticks at 60s interval, plus first tick after startup)
443 - if run_daily {
444 - check_stale_subscriptions(&state).await;
445 - check_email_bounce_spike(&state).await;
446 -
447 - // Prune session records inactive for 90+ days
448 - let session_threshold = chrono::Utc::now() - chrono::Duration::days(90);
449 - match db::sessions::prune_expired_sessions(&state.db, session_threshold).await {
450 - Ok(n) => {
451 - if n > 0 { tracing::info!(pruned = n, "pruned expired session records"); }
452 - let _ = db::scheduler_jobs::record_job_run(&state.db, "session_prune", n as i64).await;
453 - }
454 - Err(e) => tracing::error!(error = ?e, "failed to prune expired sessions"),
455 - }
456 -
457 - // Scrub IP addresses older than 30 days (privacy policy commitment)
458 - scrub_stale_ip_addresses(&state).await;
459 -
460 -
461 - // Delete terminated accounts whose 30-day export window has expired
462 - delete_expired_terminated_accounts(&state).await;
463 -
464 - // Delete self-deleted creator accounts whose 90-day content grace period has expired
465 - delete_expired_content_removal_accounts(&state).await;
466 - }
467 - }
468 - })
469 - }
470 -
471 - // ============================================================================
472 - // Sandbox cleanup
473 - // ============================================================================
474 -
475 - /// Delete expired sandbox accounts and their S3 objects.
476 - ///
477 - /// S3 cleanup uses three prefix patterns across two buckets:
478 - /// - Main S3: `{user_id}/` (item audio, downloads, covers, versions, media)
479 - /// - Main S3: `projects/{project_id}/` (project images)
480 - /// - SyncKit S3: `{app_id}/` (SyncKit blobs) and `ota/{app_id}/` (OTA artifacts)
481 - ///
482 - /// All prefixes are deleted before the user row CASCADE wipes DB records.
483 - async fn cleanup_sandbox_accounts(state: &AppState) {
484 - let expired_ids = match db::users::get_expired_sandbox_ids(&state.db).await {
485 - Ok(ids) if ids.is_empty() => return,
486 - Ok(ids) => ids,
487 - Err(e) => {
488 - tracing::error!(error = ?e, "failed to query expired sandbox accounts");
489 - return;
490 - }
491 - };
492 -
493 - for user_id in &expired_ids {
494 - cleanup_user_s3_and_delete(state, *user_id, "sandbox_expired", "sandbox").await;
495 - }
496 - }
497 -
498 - // ============================================================================
499 - // Periodic integrity checks
500 - // ============================================================================
Lines truncated
@@ -0,0 +1,253 @@
1 + //! Release and blog post announcement emails via project mailing lists.
2 +
3 + use crate::db;
4 + use crate::db::{DbBlogPost, DbItem};
5 + use crate::AppState;
6 +
7 + /// Atomically mark an item as release-announced and send subscriber emails
8 + /// via the project's content mailing list.
9 + ///
10 + /// Shared between the manual publish handler (`routes/api/items.rs`) and the
11 + /// scheduler. Safe to call multiple times — `mark_release_announced`
12 + /// is a no-op if the item was already announced.
13 + pub async fn send_release_announcements(state: &AppState, item: &DbItem) {
14 + if !db::items::mark_release_announced(&state.db, item.id)
15 + .await
16 + .unwrap_or(false)
17 + {
18 + return;
19 + }
20 +
21 + // Skip email delivery for web-only items
22 + if item.web_only {
23 + return;
24 + }
25 +
26 + let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, item.project_id).await
27 + else {
28 + return;
29 + };
30 + let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
31 + return;
32 + };
33 + let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
34 + &state.db,
35 + item.project_id,
36 + db::MailingListType::Content,
37 + )
38 + .await
39 + else {
40 + return;
41 + };
42 + let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
43 + else {
44 + return;
45 + };
46 +
47 + let creator_name = creator
48 + .display_name
49 + .as_deref()
50 + .unwrap_or(&creator.username)
51 + .to_string();
52 + let item_title = item.title.clone();
53 + let item_url = format!("{}/i/{}", state.config.host_url, item.id);
54 + let email_client = state.email.clone();
55 + let host_url = state.config.host_url.clone();
56 + let signing_secret = state.config.signing_secret.clone();
57 + let list_id_str = list.id.to_string();
58 +
59 + tokio::spawn(async move {
60 + for subscriber in subscribers {
61 + let unsub_url = crate::email::generate_unsubscribe_url(
62 + &host_url,
63 + subscriber.id,
64 + "mailing_list",
65 + &list_id_str,
66 + &signing_secret,
67 + );
68 + if let Err(e) = email_client
69 + .send_release_announcement(
70 + &subscriber.email,
71 + subscriber.display_name.as_deref(),
72 + &creator_name,
73 + &item_title,
74 + &item_url,
75 + Some(&unsub_url),
76 + )
77 + .await
78 + {
79 + tracing::error!(
80 + error = ?e,
81 + "failed to send release announcement email"
82 + );
83 + }
84 + }
85 + });
86 + }
87 +
88 + /// Atomically mark a blog post as announced and send subscriber emails
89 + /// via the project's content mailing list.
90 + ///
91 + /// Shared between the blog post publish handlers and the scheduler.
92 + /// Safe to call multiple times — `mark_blog_post_announced` is a no-op
93 + /// if the post was already announced.
94 + pub async fn send_blog_post_announcements(state: &AppState, post: &DbBlogPost) {
95 + if !db::blog_posts::mark_blog_post_announced(&state.db, post.id)
96 + .await
97 + .unwrap_or(false)
98 + {
99 + return;
100 + }
101 +
102 + // Skip email delivery for web-only posts
103 + if post.web_only {
104 + return;
105 + }
106 +
107 + let Ok(Some(project)) = db::projects::get_project_by_id(&state.db, post.project_id).await
108 + else {
109 + return;
110 + };
111 + let Ok(Some(creator)) = db::users::get_user_by_id(&state.db, project.user_id).await else {
112 + return;
113 + };
114 + let Ok(Some(list)) = db::mailing_lists::get_list_by_project_and_type(
115 + &state.db,
116 + post.project_id,
117 + db::MailingListType::Content,
118 + )
119 + .await
120 + else {
121 + return;
122 + };
123 + let Ok(subscribers) = db::mailing_lists::get_subscriber_emails(&state.db, list.id).await
124 + else {
125 + return;
126 + };
127 +
128 + let creator_name = creator
129 + .display_name
130 + .as_deref()
131 + .unwrap_or(&creator.username)
132 + .to_string();
133 + let post_title = post.title.clone();
134 + let post_url = format!(
135 + "{}/{}/blog/{}",
136 + state.config.host_url, project.slug, post.slug
137 + );
138 + let email_client = state.email.clone();
139 + let host_url = state.config.host_url.clone();
140 + let signing_secret = state.config.signing_secret.clone();
141 + let list_id_str = list.id.to_string();
142 +
143 + tokio::spawn(async move {
144 + for subscriber in subscribers {
145 + let unsub_url = crate::email::generate_unsubscribe_url(
146 + &host_url,
147 + subscriber.id,
148 + "mailing_list",
149 + &list_id_str,
150 + &signing_secret,
151 + );
152 + if let Err(e) = email_client
153 + .send_blog_post_announcement(
154 + &subscriber.email,
155 + subscriber.display_name.as_deref(),
156 + &creator_name,
157 + &post_title,
158 + &post_url,
159 + Some(&unsub_url),
160 + )
161 + .await
162 + {
163 + tracing::error!(
164 + error = ?e,
165 + "failed to send blog post announcement email"
166 + );
167 + }
168 + }
169 + });
170 + }
171 +
172 + /// Onboarding email drip steps (maps to `onboarding_email_step` i16 column).
173 + #[derive(Debug, Clone, Copy, PartialEq, Eq)]
174 + #[repr(i16)]
175 + enum OnboardingStep {
176 + /// Welcome email sent at signup.
177 + WelcomeSent = 1,
178 + /// Profile tips email (24h after welcome).
179 + ProfileTipsSent = 2,
180 + /// Stripe guide email (72h after welcome).
181 + StripeGuideSent = 3,
182 + }
183 +
184 + impl OnboardingStep {
185 + fn as_i16(self) -> i16 { self as i16 }
186 + }
187 +
188 + /// Process the getting-started email drip sequence.
189 + ///
190 + /// Step 1 (welcome) is sent at signup in the auth handler.
191 + /// Step 2 (profile tips) fires 24h after welcome, skipped if display_name is set.
192 + /// Step 3 (Stripe guide) fires 72h after welcome, skipped if Stripe is connected.
193 + pub(super) async fn send_onboarding_emails(state: &AppState) {
194 + let host_url = &state.config.host_url;
195 +
196 + // Step 1→2: profile tips (24h after welcome)
197 + let next = OnboardingStep::ProfileTipsSent;
198 + if let Ok(users) =
199 + db::users::get_onboarding_candidates(&state.db, OnboardingStep::WelcomeSent.as_i16(), chrono::Duration::hours(24)).await
200 + {
201 + // Batch-advance users who already set a display name (skip email)
202 + let (skip, send): (Vec<_>, Vec<_>) =
203 + users.into_iter().partition(|u| u.display_name.is_some());
204 + if !skip.is_empty() {
205 + let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
206 + if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
207 + tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
208 + }
209 + }
210 + for user in send {
211 + if let Err(e) = state
212 + .email
213 + .send_onboarding_profile(&user.email, user.display_name.as_deref(), host_url)
214 + .await
215 + {
216 + tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding profile email");
217 + continue;
218 + }
219 + if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
220 + tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
221 + }
222 + }
223 + }
224 +
225 + // Step 2→3: Stripe guide (72h after welcome)
226 + let next = OnboardingStep::StripeGuideSent;
227 + if let Ok(users) =
228 + db::users::get_onboarding_candidates(&state.db, OnboardingStep::ProfileTipsSent.as_i16(), chrono::Duration::hours(48)).await
229 + {
230 + // Batch-advance users who already connected Stripe (skip email)
231 + let (skip, send): (Vec<_>, Vec<_>) =
232 + users.into_iter().partition(|u| u.stripe_account_id.is_some());
233 + if !skip.is_empty() {
234 + let skip_ids: Vec<_> = skip.iter().map(|u| u.id).collect();
235 + if let Err(e) = db::users::batch_advance_onboarding_step(&state.db, &skip_ids, next.as_i16()).await {
236 + tracing::warn!(count = skip_ids.len(), step = ?next, error = ?e, "failed to batch advance onboarding step");
237 + }
238 + }
239 + for user in send {
240 + if let Err(e) = state
241 + .email
242 + .send_onboarding_stripe(&user.email, user.display_name.as_deref(), host_url)
243 + .await
244 + {
245 + tracing::error!(error = ?e, user_id = %user.id, "failed to send onboarding stripe email");
246 + continue;
247 + }
248 + if let Err(e) = db::users::advance_onboarding_step(&state.db, user.id, next.as_i16()).await {
249 + tracing::warn!(user_id = %user.id, step = ?next, error = ?e, "failed to advance onboarding step");
250 + }
251 + }
252 + }
253 + }
@@ -0,0 +1,212 @@
1 + //! Account cleanup: sandbox expiry, terminated accounts, content removal, IP scrubbing.
2 +
3 + use crate::db;
4 + use crate::AppState;
5 +
6 + /// Delete expired sandbox accounts and their S3 objects.
7 + pub(super) async fn cleanup_sandbox_accounts(state: &AppState) {
8 + let expired_ids = match db::users::get_expired_sandbox_ids(&state.db).await {
9 + Ok(ids) if ids.is_empty() => return,
10 + Ok(ids) => ids,
11 + Err(e) => {
12 + tracing::error!(error = ?e, "failed to query expired sandbox accounts");
13 + return;
14 + }
15 + };
16 +
17 + for user_id in &expired_ids {
18 + cleanup_user_s3_and_delete(state, *user_id, "sandbox_expired", "sandbox").await;
19 + }
20 + }
21 +
22 + /// Clean up a user's S3 objects, git repos, and CASCADE-delete the user row.
23 + ///
24 + /// Shared between sandbox, terminated, and content-removal account cleanup.
25 + /// S3 objects are deleted first (before CASCADE removes the DB rows that reference them).
26 + async fn cleanup_user_s3_and_delete(state: &AppState, user_id: db::UserId, event: &str, label: &str) -> bool {
27 + // Main S3 bucket: user-level and project-level objects
28 + if let Some(ref s3) = state.s3 {
29 + let user_prefix = format!("{user_id}/");
30 + if let Err(e) = s3.delete_prefix(&user_prefix).await {
31 + tracing::warn!(error = ?e, %user_id, "{label}: failed to delete user S3 objects");
32 + }
33 +
34 + if let Ok(project_ids) = db::projects::get_project_ids_for_user(&state.db, user_id).await {
35 + for pid in project_ids {
36 + let proj_prefix = format!("projects/{pid}/");
37 + if let Err(e) = s3.delete_prefix(&proj_prefix).await {
38 + tracing::warn!(error = ?e, %user_id, %pid, "{label}: failed to delete project S3 objects");
39 + }
40 + }
41 + }
42 + }
43 +
44 + // SyncKit S3 bucket: blobs and OTA artifacts
45 + if let Some(ref synckit_s3) = state.synckit_s3 {
46 + if let Ok(apps) = db::synckit::get_sync_apps_by_creator(&state.db, user_id).await {
47 + for app in &apps {
48 + let blob_prefix = format!("{}/", app.id);
49 + if let Err(e) = synckit_s3.delete_prefix(&blob_prefix).await {
50 + tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete SyncKit blobs");
51 + }
52 + let ota_prefix = format!("ota/{}/", app.id);
53 + if let Err(e) = synckit_s3.delete_prefix(&ota_prefix).await {
54 + tracing::warn!(error = ?e, %user_id, app_id = %app.id, "{label}: failed to delete OTA artifacts");
55 + }
56 + }
57 + }
58 + }
59 +
60 + // Git repos on disk
61 + if let Some(ref git_root) = state.config.git_repos_path {
62 + if let Ok(Some(user)) = db::users::get_user_by_id(&state.db, user_id).await {
63 + cleanup_git_repos_on_disk(git_root, &user.username, &user_id);
64 + }
65 + }
66 +
67 + // CASCADE delete user row
68 + if let Err(e) = db::users::delete_user(&state.db, user_id).await {
69 + tracing::error!(error = ?e, %user_id, "{label}: failed to delete account");
70 + false
71 + } else {
72 + tracing::info!(%user_id, event = event, "{label}: account cleaned up");
73 + true
74 + }
75 + }
76 +
77 + /// Remove a user's bare git repositories from disk.
78 + ///
79 + /// Must be called before `delete_user` (which CASCADE-deletes the git_repos rows).
80 + /// Best-effort: logs warnings on failure but does not block account deletion.
81 + pub(super) fn cleanup_git_repos_on_disk(git_repos_path: &str, username: &str, user_id: &db::UserId) {
82 + let user_git_dir = std::path::Path::new(git_repos_path).join(username);
83 + if user_git_dir.exists() {
84 + match std::fs::remove_dir_all(&user_git_dir) {
85 + Ok(()) => tracing::info!(%user_id, path = %user_git_dir.display(), "deleted git repos from disk"),
86 + Err(e) => tracing::warn!(error = ?e, %user_id, path = %user_git_dir.display(), "failed to delete git repos from disk"),
87 + }
88 + }
89 + }
90 +
91 + /// Delete accounts that were terminated >30 days ago.
92 + pub(super) async fn delete_expired_terminated_accounts(state: &AppState) {
93 + let expired_ids = match db::users::get_expired_terminated_ids(&state.db).await {
94 + Ok(ids) if ids.is_empty() => {
95 + let _ = db::scheduler_jobs::record_job_run(&state.db, "terminated_account_cleanup", 0).await;
96 + return;
97 + }
98 + Ok(ids) => ids,
99 + Err(e) => {
100 + tracing::error!(error = ?e, "failed to query expired terminated accounts");
101 + return;
102 + }
103 + };
104 +
105 + let mut deleted = 0i64;
106 + for user_id in &expired_ids {
107 + if cleanup_user_s3_and_delete(state, *user_id, "termination_expired", "terminated account").await {
108 + deleted += 1;
109 + }
110 + }
111 + let _ = db::scheduler_jobs::record_job_run(&state.db, "terminated_account_cleanup", deleted).await;
112 + }
113 +
114 + /// Delete creator accounts whose 90-day content removal grace period has expired.
115 + pub(super) async fn delete_expired_content_removal_accounts(state: &AppState) {
116 + let expired_ids = match db::users::get_expired_content_removal_ids(&state.db).await {
117 + Ok(ids) if ids.is_empty() => {
118 + let _ = db::scheduler_jobs::record_job_run(&state.db, "content_removal_cleanup", 0).await;
119 + return;
120 + }
121 + Ok(ids) => ids,
122 + Err(e) => {
123 + tracing::error!(error = ?e, "failed to query expired content removal accounts");
124 + return;
125 + }
126 + };
127 +
128 + let mut deleted = 0i64;
129 + for user_id in &expired_ids {
130 + if cleanup_user_s3_and_delete(state, *user_id, "content_removal_expired", "content removal").await {
131 + deleted += 1;
132 + }
133 + }
134 + let _ = db::scheduler_jobs::record_job_run(&state.db, "content_removal_cleanup", deleted).await;
135 + }
136 +
137 + /// Delete pending transactions older than 25 hours and release promo code reservations.
138 + pub(super) async fn cleanup_stale_pending_transactions(state: &AppState) {
139 + let promo_ids = match db::transactions::cleanup_stale_pending(
140 + &state.db,
141 + chrono::Duration::hours(25),
142 + )
143 + .await
144 + {
145 + Ok(ids) if ids.is_empty() => return,
146 + Ok(ids) => ids,
147 + Err(e) => {
148 + tracing::error!(error = ?e, "failed to clean up stale pending transactions");
149 + return;
150 + }
151 + };
152 +
153 + let mut released = 0u64;
154 + for pc_id in promo_ids.into_iter().flatten() {
155 + if let Err(e) = db::promo_codes::release_use_count(&state.db, pc_id).await {
156 + tracing::warn!(promo_code_id = %pc_id, error = ?e, "failed to release promo code use count");
157 + } else {
158 + released += 1;
159 + }
160 + }
161 +
162 + if released > 0 {
163 + tracing::info!(released, "released promo code reservations from stale pending transactions");
164 + }
165 + }
166 +
167 + /// NULL out IP addresses older than 30 days in user_sessions.
168 + pub(super) async fn scrub_stale_ip_addresses(state: &AppState) {
169 + let cutoff = chrono::Utc::now() - chrono::Duration::days(30);
170 +
171 + match sqlx::query(
172 + "UPDATE user_sessions SET ip_address = NULL WHERE ip_address IS NOT NULL AND created_at < $1",
173 + )
174 + .bind(cutoff)
175 + .execute(&state.db)
176 + .await
177 + {
178 + Ok(r) => {
179 + if r.rows_affected() > 0 {
180 + tracing::info!(scrubbed = r.rows_affected(), "scrubbed stale IP addresses (30-day retention)");
181 + }
182 + let _ = db::scheduler_jobs::record_job_run(&state.db, "ip_scrub", r.rows_affected() as i64).await;
183 + }
184 + Err(e) => tracing::error!(error = ?e, "failed to scrub IPs from user_sessions"),
185 + }
186 + }
187 +
188 + #[cfg(test)]
189 + mod tests {
190 + use super::*;
191 +
192 + #[test]
193 + fn cleanup_git_repos_removes_directory() {
194 + let tmp = tempfile::tempdir().unwrap();
195 + let git_root = tmp.path();
196 + let user_dir = git_root.join("testuser");
197 + std::fs::create_dir_all(user_dir.join("repo.git")).unwrap();
198 + std::fs::write(user_dir.join("repo.git/HEAD"), "ref: refs/heads/main\n").unwrap();
199 +
200 + let user_id = db::UserId::nil();
201 + cleanup_git_repos_on_disk(git_root.to_str().unwrap(), "testuser", &user_id);
202 +
203 + assert!(!user_dir.exists(), "user git directory should be deleted");
204 + }
205 +
206 + #[test]
207 + fn cleanup_git_repos_noop_if_missing() {
208 + let tmp = tempfile::tempdir().unwrap();
209 + let user_id = db::UserId::nil();
210 + cleanup_git_repos_on_disk(tmp.path().to_str().unwrap(), "nonexistent", &user_id);
211 + }
212 + }
@@ -0,0 +1,156 @@
1 + //! Periodic integrity checks: sales count drift, stale subscriptions, bounce spikes,
2 + //! storage recalculation, and post-grace enforcement.
3 +
4 + use crate::db;
5 + use crate::AppState;
6 +
7 + /// Weekly storage drift correction — batch recalculates storage_used_bytes for all creators.
8 + pub(super) async fn recalculate_all_storage_used(state: &AppState) {
9 + match db::creator_tiers::recalculate_all_storage_batch(&state.db).await {
10 + Ok(corrected) => {
11 + if corrected > 0 {
12 + tracing::info!(corrected = corrected, "weekly storage drift correction complete");
13 + }
14 + }
15 + Err(e) => {
16 + tracing::error!(error = ?e, "weekly storage drift correction failed");
17 + }
18 + }
19 + }
20 +
21 + /// Enforce post-grace item hiding for creators whose cancellation grace period has expired.
22 + pub(super) async fn enforce_post_grace_hiding(state: &AppState) {
23 + match db::creator_tiers::get_expired_grace_creators(&state.db).await {
24 + Ok(user_ids) => {
25 + for user_id in user_ids {
26 + match db::items::hide_all_items_for_user(&state.db, user_id).await {
27 + Ok(count) => {
28 + if count > 0 {
29 + tracing::info!(
30 + user_id = %user_id,
31 + items_hidden = count,
32 + "post-grace enforcement: items hidden"
33 + );
34 + }
35 + }
36 + Err(e) => {
37 + tracing::error!(error = ?e, user_id = %user_id, "failed to hide items for post-grace enforcement");
38 + continue;
39 + }
40 + }
41 + if let Err(e) = db::creator_tiers::mark_grace_enforced(&state.db, user_id).await {
42 + tracing::error!(error = ?e, user_id = %user_id, "failed to mark grace enforced");
43 + }
44 + }
45 + }
46 + Err(e) => {
47 + tracing::error!(error = ?e, "failed to query expired grace creators");
48 + }
49 + }
50 + }
51 +
52 + /// Detect items where denormalized sales_count has drifted from actual transaction count.
53 + pub(super) async fn check_sales_count_drift(state: &AppState) {
54 + let rows = match sqlx::query_as::<_, (db::ItemId, i32, i64)>(
55 + r#"
56 + SELECT i.id, i.sales_count, COUNT(t.id)
57 + FROM items i
58 + LEFT JOIN transactions t ON t.item_id = i.id AND t.status = 'completed'
59 + GROUP BY i.id
60 + HAVING i.sales_count != COUNT(t.id)
61 + LIMIT 50
62 + "#,
63 + )
64 + .fetch_all(&state.db)
65 + .await
66 + {
67 + Ok(r) if r.is_empty() => return,
68 + Ok(r) => r,
69 + Err(e) => {
70 + tracing::error!(error = ?e, "sales count drift check failed");
71 + return;
72 + }
73 + };
74 +
75 + tracing::warn!(count = rows.len(), "sales count drift detected");
76 +
77 + if let Some(ref wam) = state.wam {
78 + let items: Vec<String> = rows
79 + .iter()
80 + .map(|(id, cached, actual)| format!(" {id}: cached={cached}, actual={actual}"))
81 + .collect();
82 + let body = format!("Items with drifted sales_count:\n{}", items.join("\n"));
83 + wam.create_ticket(
84 + &format!("Sales count drift: {} items", rows.len()),
85 + Some(&body),
86 + "medium",
87 + "sales-count-drift",
88 + None,
89 + )
90 + .await;
91 + }
92 + }
93 +
94 + /// Find subscriptions stuck in past_due for >7 days (possible missed webhook).
95 + pub(super) async fn check_stale_subscriptions(state: &AppState) {
96 + let count: i64 = match sqlx::query_scalar(
97 + r#"
98 + SELECT COUNT(*) FROM (
99 + SELECT 1 FROM creator_subscriptions WHERE status = 'past_due' AND updated_at < NOW() - INTERVAL '7 days'
100 + UNION ALL
101 + SELECT 1 FROM subscriptions WHERE status = 'past_due' AND updated_at < NOW() - INTERVAL '7 days'
102 + ) stale
103 + "#,
104 + )
105 + .fetch_one(&state.db)
106 + .await
107 + {
108 + Ok(c) => c,
109 + Err(e) => {
110 + tracing::error!(error = ?e, "stale subscription check failed");
111 + return;
112 + }
113 + };
114 +
115 + if count > 0 {
116 + tracing::warn!(count, "stale past_due subscriptions detected");
117 + if let Some(ref wam) = state.wam {
118 + wam.create_ticket(
119 + &format!("{count} subscriptions past_due >7 days"),
120 + Some("Subscriptions stuck in past_due for over 7 days. A Stripe webhook may have been missed. Check the Stripe dashboard."),
121 + "medium",
122 + "subscription-stale-past-due",
123 + None,
124 + ).await;
125 + }
126 + }
127 + }
128 +
129 + /// Detect email bounce/complaint spikes (>10 suppressions in 24h).
130 + pub(super) async fn check_email_bounce_spike(state: &AppState) {
131 + let count: i64 = match sqlx::query_scalar(
132 + "SELECT COUNT(*) FROM email_suppressions WHERE created_at > NOW() - INTERVAL '24 hours'",
133 + )
134 + .fetch_one(&state.db)
135 + .await
136 + {
137 + Ok(c) => c,
138 + Err(e) => {
139 + tracing::error!(error = ?e, "email bounce spike check failed");
140 + return;
141 + }
142 + };
143 +
144 + if count > 10 {
145 + tracing::warn!(count, "email bounce/complaint spike");
146 + if let Some(ref wam) = state.wam {
147 + wam.create_ticket(
148 + &format!("Email bounce spike: {count} suppressions in 24h"),
149 + Some("Elevated bounce/complaint rate may indicate a deliverability problem. Check Postmark dashboard."),
150 + "high",
151 + "email-bounce-spike",
152 + None,
153 + ).await;
154 + }
155 + }
156 + }
@@ -0,0 +1,248 @@
1 + //! Background scheduler — runs periodic jobs on a fixed interval.
2 + //!
3 + //! Every tick: publish scheduled items/posts, send onboarding emails, dispatch builds.
4 + //! Every 5 ticks: sandbox cleanup. Every tick: webhook retry, refund escalation, transaction cleanup.
5 + //! Daily: subscription checks, bounce monitoring, session pruning, IP scrubbing, account deletion.
6 + //! Weekly: storage drift correction, sales count integrity.
7 +
8 + mod announcements;
9 + mod cleanup;
10 + mod integrity;
11 + mod mt_threads;
12 + mod webhooks;
13 +
14 + use tokio::sync::watch;
15 + use tokio::task::JoinHandle;
16 +
17 + use crate::constants;
18 + use crate::db;
19 + use crate::AppState;
20 +
21 + // Re-export public API used by route handlers
22 + pub use announcements::{send_blog_post_announcements, send_release_announcements};
23 + pub use mt_threads::{spawn_mt_thread_for_blog_post, spawn_mt_thread_for_item};
24 +
25 + /// Weekly drift correction interval in scheduler ticks (10,080 = 7 days at 60s).
26 + const DRIFT_CORRECTION_INTERVAL: u64 = 10_080;
27 +
28 + /// Daily interval in scheduler ticks (1,440 = 24h at 60s).
29 + const DAILY_INTERVAL: u64 = 1440;
30 +
31 + /// Sandbox cleanup interval in scheduler ticks (5 = 5min at 60s).
32 + const SANDBOX_CLEANUP_INTERVAL: u64 = 5;
33 +
34 + /// Determine which scheduled job groups should run for a given tick.
35 + ///
36 + /// Returns `(sandbox_cleanup, daily_jobs, weekly_jobs)`.
37 + fn jobs_for_tick(tick: u64) -> (bool, bool, bool) {
38 + let sandbox = tick.is_multiple_of(SANDBOX_CLEANUP_INTERVAL);
39 + let daily = tick == 1 || tick.is_multiple_of(DAILY_INTERVAL);
40 + let weekly = tick.is_multiple_of(DRIFT_CORRECTION_INTERVAL);
41 + (sandbox, daily, weekly)
42 + }
43 +
44 + /// Spawn the background scheduler loop. Drop `shutdown_tx` to stop it.
45 + pub fn spawn_scheduler(
46 + state: AppState,
47 + mut shutdown_rx: watch::Receiver<()>,
48 + ) -> JoinHandle<()> {
49 + tokio::spawn(async move {
50 + tracing::info!("Scheduler started (interval={}s)", constants::SCHEDULER_INTERVAL_SECS);
51 +
52 + let mut interval = tokio::time::interval(
53 + std::time::Duration::from_secs(constants::SCHEDULER_INTERVAL_SECS),
54 + );
55 + interval.tick().await; // consume immediate first tick
56 +
57 + let mut tick_count: u64 = 0;
58 +
59 + loop {
60 + tokio::select! {
61 + _ = interval.tick() => {}
62 + _ = shutdown_rx.changed() => {
63 + tracing::info!("Scheduler shutting down");
64 + return;
65 + }
66 + }
67 +
68 + tick_count += 1;
69 +
70 + // Publish scheduled items
71 + match db::items::publish_scheduled_items(&state.db).await {
72 + Ok(items) => {
73 + for item in &items {
74 + tracing::info!(
75 + item_id = %item.id,
76 + title = %item.title,
77 + "scheduler published item"
78 + );
79 + announcements::send_release_announcements(&state, item).await;
80 + if item.mt_thread_id.is_none() {
81 + mt_threads::spawn_mt_thread_for_item_by_lookup(&state, item);
82 + }
83 + }
84 + }
85 + Err(e) => {
86 + tracing::error!(error = ?e, "scheduler failed to publish items");
87 + }
88 + }
89 +
90 + // Send onboarding drip emails
91 + announcements::send_onboarding_emails(&state).await;
92 +
93 + // Dispatch pending builds
94 + crate::build_runner::dispatch_pending_build(&state).await;
95 +
96 + // Publish scheduled blog posts
97 + match db::blog_posts::publish_scheduled_blog_posts(&state.db).await {
98 + Ok(posts) => {
99 + for post in &posts {
100 + tracing::info!(
101 + post_id = %post.id,
102 + title = %post.title,
103 + "scheduler published blog post"
104 + );
105 + announcements::send_blog_post_announcements(&state, post).await;
106 + if post.mt_thread_id.is_none() {
107 + mt_threads::spawn_mt_thread_for_blog_post_by_lookup(&state, post);
108 + }
109 + }
110 + }
111 + Err(e) => {
112 + tracing::error!(error = ?e, "scheduler failed to publish blog posts");
113 + }
114 + }
115 +
116 + // Enforce post-grace item hiding (canceled 30+ days ago)
117 + integrity::enforce_post_grace_hiding(&state).await;
118 +
119 + // Clean up expired idempotency keys (every tick is fine — cheap DELETE)
120 + if let Err(e) = db::idempotency::cleanup_expired(&state.db).await {
121 + tracing::error!(error = ?e, "failed to clean up expired idempotency keys");
122 + }
123 +
124 + let (run_sandbox, run_daily, run_weekly) = jobs_for_tick(tick_count);
125 +
126 + // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
127 + if run_sandbox {
128 + cleanup::cleanup_sandbox_accounts(&state).await;
129 + }
130 +
131 + // Retry failed webhook events
132 + webhooks::retry_failed_webhooks(&state).await;
133 +
134 + // Escalate stale pending refunds (unmatched for >24 hours)
135 + webhooks::escalate_stale_refunds(&state).await;
136 +
137 + // Clean up stale pending transactions (>24h) and release promo code reservations
138 + cleanup::cleanup_stale_pending_transactions(&state).await;
139 +
140 + // Weekly storage drift correction + integrity checks
141 + if run_weekly {
142 + integrity::recalculate_all_storage_used(&state).await;
143 + integrity::check_sales_count_drift(&state).await;
144 + }
145 +
146 + // Daily checks (every 1440 ticks at 60s interval, plus first tick after startup)
147 + if run_daily {
148 + integrity::check_stale_subscriptions(&state).await;
149 + integrity::check_email_bounce_spike(&state).await;
150 +
151 + // Prune session records inactive for 90+ days
152 + let session_threshold = chrono::Utc::now() - chrono::Duration::days(90);
153 + match db::sessions::prune_expired_sessions(&state.db, session_threshold).await {
154 + Ok(n) => {
155 + if n > 0 { tracing::info!(pruned = n, "pruned expired session records"); }
156 + let _ = db::scheduler_jobs::record_job_run(&state.db, "session_prune", n as i64).await;
157 + }
158 + Err(e) => tracing::error!(error = ?e, "failed to prune expired sessions"),
159 + }
160 +
161 + // Scrub IP addresses older than 30 days (privacy policy commitment)
162 + cleanup::scrub_stale_ip_addresses(&state).await;
163 +
164 + // Delete terminated accounts whose 30-day export window has expired
165 + cleanup::delete_expired_terminated_accounts(&state).await;
166 +
167 + // Delete self-deleted creator accounts whose 90-day content grace period has expired
168 + cleanup::delete_expired_content_removal_accounts(&state).await;
169 + }
170 + }
171 + })
172 + }
173 +
174 + #[cfg(test)]
175 + mod tests {
176 + use super::*;
177 +
178 + // ── Tick cadence tests ──
179 +
180 + #[test]
181 + fn tick_1_runs_daily_not_weekly() {
182 + let (sandbox, daily, weekly) = jobs_for_tick(1);
183 + assert!(!sandbox, "tick 1 is not a multiple of 5");
184 + assert!(daily, "tick 1 should trigger daily jobs (first-tick rule)");
185 + assert!(!weekly, "tick 1 should not trigger weekly jobs");
186 + }
187 +
188 + #[test]
189 + fn tick_5_runs_sandbox_cleanup() {
190 + let (sandbox, daily, weekly) = jobs_for_tick(5);
191 + assert!(sandbox);
192 + assert!(!daily);
193 + assert!(!weekly);
194 + }
195 +
196 + #[test]
197 + fn tick_1440_runs_daily_and_sandbox() {
198 + let (sandbox, daily, weekly) = jobs_for_tick(1440);
199 + assert!(sandbox, "1440 is divisible by 5");
200 + assert!(daily, "1440 is the daily interval");
201 + assert!(!weekly);
202 + }
203 +
204 + #[test]
205 + fn tick_10080_runs_all_three() {
206 + let (sandbox, daily, weekly) = jobs_for_tick(10_080);
207 + assert!(sandbox, "10080 is divisible by 5");
208 + assert!(daily, "10080 is divisible by 1440");
209 + assert!(weekly, "10080 is the weekly interval");
210 + }
211 +
212 + #[test]
213 + fn normal_tick_runs_nothing_special() {
214 + let (sandbox, daily, weekly) = jobs_for_tick(7);
215 + assert!(!sandbox);
216 + assert!(!daily);
217 + assert!(!weekly);
218 + }
219 +
220 + #[test]
221 + fn second_daily_tick() {
222 + let (_, daily, _) = jobs_for_tick(2880);
223 + assert!(daily, "2880 = 2 * 1440");
224 + }
225 +
226 + #[test]
227 + fn second_weekly_tick() {
228 + let (_, _, weekly) = jobs_for_tick(20_160);
229 + assert!(weekly, "20160 = 2 * 10080");
230 + }
231 +
232 + // ── Interval constant sanity checks ──
233 +
234 + #[test]
235 + fn drift_correction_interval_is_7_days() {
236 + assert_eq!(DRIFT_CORRECTION_INTERVAL, 7 * 24 * 60);
237 + }
238 +
239 + #[test]
240 + fn daily_interval_is_24_hours() {
241 + assert_eq!(DAILY_INTERVAL, 24 * 60);
242 + }
243 +
244 + #[test]
245 + fn sandbox_cleanup_interval_is_5_minutes() {
246 + assert_eq!(SANDBOX_CLEANUP_INTERVAL, 5);
247 + }
248 + }
@@ -0,0 +1,243 @@
1 + //! Multithreaded forum thread provisioning for published items and blog posts.
2 +
3 + use crate::db;
4 + use crate::db::{DbBlogPost, DbItem};
5 + use crate::AppState;
6 +
7 + /// Fire-and-forget: create an MT discussion thread for a published item.
8 + /// Called from the item update handler where the user is available.
9 + pub fn spawn_mt_thread_for_item(
10 + state: &AppState,
11 + item: &DbItem,
12 + user: &crate::auth::SessionUser,
13 + ) {
14 + let Some(ref mt) = state.mt_client else {
15 + return;
16 + };
17 + let mt = mt.clone();
18 + let db_pool = state.db.clone();
19 + let host_url = state.config.host_url.clone();
20 + let item_id = item.id;
21 + let item_title = item.title.clone();
22 + let project_id = item.project_id;
23 + let user_id = *user.id;
24 + let username = user.username.to_string();
25 + let display_name = user.display_name.clone();
26 +
27 + tokio::spawn(async move {
28 + create_mt_thread_for_item(
29 + &mt,
30 + &db_pool,
31 + &host_url,
32 + item_id,
33 + &item_title,
34 + project_id,
35 + user_id,
36 + &username,
37 + display_name.as_deref(),
38 + )
39 + .await;
40 + });
41 + }
42 +
43 + /// Fire-and-forget: create an MT discussion thread for a published item
44 + /// (scheduler version — looks up the project/user from DB).
45 + pub(super) fn spawn_mt_thread_for_item_by_lookup(state: &AppState, item: &DbItem) {
46 + let Some(ref mt) = state.mt_client else {
47 + return;
48 + };
49 + let mt = mt.clone();
50 + let db_pool = state.db.clone();
51 + let host_url = state.config.host_url.clone();
52 + let item_id = item.id;
53 + let item_title = item.title.clone();
54 + let project_id = item.project_id;
55 +
56 + tokio::spawn(async move {
57 + let Ok(Some(project)) = db::projects::get_project_by_id(&db_pool, project_id).await
58 + else {
59 + return;
60 + };
61 + let Ok(Some(user)) = db::users::get_user_by_id(&db_pool, project.user_id).await else {
62 + return;
63 + };
64 + create_mt_thread_for_item(
65 + &mt,
66 + &db_pool,
67 + &host_url,
68 + item_id,
69 + &item_title,
70 + project_id,
71 + *user.id,
72 + &user.username,
73 + user.display_name.as_deref(),
74 + )
75 + .await;
76 + });
77 + }
78 +
79 + #[allow(clippy::too_many_arguments)]
80 + async fn create_mt_thread_for_item(
81 + mt: &crate::mt_client::MtClient,
82 + db_pool: &sqlx::PgPool,
83 + host_url: &str,
84 + item_id: db::ItemId,
85 + item_title: &str,
86 + project_id: db::ProjectId,
87 + user_id: uuid::Uuid,
88 + username: &str,
89 + display_name: Option<&str>,
90 + ) {
91 + let Ok(Some(project)) = db::projects::get_project_by_id(db_pool, project_id).await else {
92 + return;
93 + };
94 +
95 + let item_url = format!("{}/i/{}", host_url, item_id);
96 + let body = format!("Discussion for [{}]({})", item_title, item_url);
97 + let external_ref = format!("mnw:item:{}", item_id);
98 +
99 + match mt
100 + .create_thread(&crate::mt_client::CreateThreadRequest {
101 + community_slug: project.slug.to_string(),
102 + category_slug: "items".to_string(),
103 + title: item_title.to_string(),
104 + body_markdown: body,
105 + author_mnw_id: user_id,
106 + author_username: username.to_string(),
107 + author_display_name: display_name.map(String::from),
108 + external_ref,
109 + })
110 + .await
111 + {
112 + Ok(resp) => {
113 + if let Err(e) = db::items::set_mt_thread_id(db_pool, item_id, resp.thread_id).await {
114 + tracing::warn!(error = ?e, "failed to store MT thread ID for item");
115 + }
116 + }
117 + Err(e) => tracing::warn!(error = ?e, %item_id, "MT thread creation failed for item"),
118 + }
119 + }
120 +
121 + /// Fire-and-forget: create an MT discussion thread for a published blog post.
122 + /// Called from the blog post handler where the user is available.
123 + pub fn spawn_mt_thread_for_blog_post(
124 + state: &AppState,
125 + post: &DbBlogPost,
126 + user: &crate::auth::SessionUser,
127 + ) {
128 + let Some(ref mt) = state.mt_client else {
129 + return;
130 + };
131 + let mt = mt.clone();
132 + let db_pool = state.db.clone();
133 + let host_url = state.config.host_url.clone();
134 + let post_id = post.id;
135 + let post_title = post.title.clone();
136 + let post_slug = post.slug.to_string();
137 + let project_id = post.project_id;
138 + let user_id = *user.id;
139 + let username = user.username.to_string();
140 + let display_name = user.display_name.clone();
141 +
142 + tokio::spawn(async move {
143 + create_mt_thread_for_blog_post(
144 + &mt,
145 + &db_pool,
146 + &host_url,
147 + post_id,
148 + &post_title,
149 + &post_slug,
150 + project_id,
151 + user_id,
152 + &username,
153 + display_name.as_deref(),
154 + )
155 + .await;
156 + });
157 + }
158 +
159 + /// Fire-and-forget: create an MT discussion thread for a published blog post
160 + /// (scheduler version — looks up the project/user from DB).
161 + pub(super) fn spawn_mt_thread_for_blog_post_by_lookup(state: &AppState, post: &DbBlogPost) {
162 + let Some(ref mt) = state.mt_client else {
163 + return;
164 + };
165 + let mt = mt.clone();
166 + let db_pool = state.db.clone();
167 + let host_url = state.config.host_url.clone();
168 + let post_id = post.id;
169 + let post_title = post.title.clone();
170 + let post_slug = post.slug.to_string();
171 + let project_id = post.project_id;
172 +
173 + tokio::spawn(async move {
174 + let Ok(Some(project)) = db::projects::get_project_by_id(&db_pool, project_id).await
175 + else {
176 + return;
177 + };
178 + let Ok(Some(user)) = db::users::get_user_by_id(&db_pool, project.user_id).await else {
179 + return;
180 + };
181 + create_mt_thread_for_blog_post(
182 + &mt,
183 + &db_pool,
184 + &host_url,
185 + post_id,
186 + &post_title,
187 + &post_slug,
188 + project_id,
189 + *user.id,
190 + &user.username,
191 + user.display_name.as_deref(),
192 + )
193 + .await;
194 + });
195 + }
196 +
197 + #[allow(clippy::too_many_arguments)]
198 + async fn create_mt_thread_for_blog_post(
199 + mt: &crate::mt_client::MtClient,
200 + db_pool: &sqlx::PgPool,
201 + host_url: &str,
202 + post_id: db::BlogPostId,
203 + post_title: &str,
204 + post_slug: &str,
205 + project_id: db::ProjectId,
206 + user_id: uuid::Uuid,
207 + username: &str,
208 + display_name: Option<&str>,
209 + ) {
210 + let Ok(Some(project)) = db::projects::get_project_by_id(db_pool, project_id).await else {
211 + return;
212 + };
213 +
214 + let post_url = format!(
215 + "{}/{}/blog/{}",
216 + host_url, project.slug, post_slug
217 + );
218 + let body = format!("Discussion for [{}]({})", post_title, post_url);
219 + let external_ref = format!("mnw:blog:{}", post_id);
220 +
221 + match mt
222 + .create_thread(&crate::mt_client::CreateThreadRequest {
223 + community_slug: project.slug.to_string(),
224 + category_slug: "blog".to_string(),
225 + title: post_title.to_string(),
226 + body_markdown: body,
227 + author_mnw_id: user_id,
228 + author_username: username.to_string(),
229 + author_display_name: display_name.map(String::from),
230 + external_ref,
231 + })
232 + .await
233 + {
234 + Ok(resp) => {
235 + if let Err(e) =
236 + db::blog_posts::set_mt_thread_id(db_pool, post_id, resp.thread_id).await
237 + {
238 + tracing::warn!(error = ?e, "failed to store MT thread ID for blog post");
239 + }
240 + }
241 + Err(e) => tracing::warn!(error = ?e, %post_id, "MT thread creation failed for blog post"),
242 + }
243 + }
@@ -0,0 +1,190 @@
1 + //! Webhook retry with exponential backoff and stale refund escalation.
2 +
3 + use crate::db;
4 + use crate::AppState;
5 +
6 + /// Maximum webhook retry attempts before marking as dead letter.
7 + const WEBHOOK_MAX_RETRIES: i32 = 5;
8 +
9 + /// Determine whether a webhook retry attempt should be treated as a dead letter.
10 + pub(super) fn is_webhook_dead(attempt: i32) -> bool {
11 + attempt >= WEBHOOK_MAX_RETRIES
12 + }
13 +
14 + /// Retry failed webhook events with exponential backoff.
15 + pub(super) async fn retry_failed_webhooks(state: &AppState) {
16 + let events = match db::webhook_events::get_retryable_events(&state.db).await {
17 + Ok(e) if e.is_empty() => return,
18 + Ok(e) => e,
19 + Err(e) => {
20 + tracing::error!(error = ?e, "failed to fetch retryable webhook events");
21 + return;
22 + }
23 + };
24 +
25 + if state.stripe.is_none() { return; }
26 +
27 + for event in events {
28 + let attempt = event.attempts + 1;
29 + tracing::info!(
30 + event_id = %event.id, source = %event.source, event_type = %event.event_type,
31 + attempt = attempt, "retrying webhook event"
32 + );
33 +
34 + let result = if event.source == "stripe" {
35 + match serde_json::from_str::<stripe::Event>(&event.payload) {
36 + Ok(parsed_event) => {
37 + crate::routes::stripe::process_webhook_event(
38 + state, &parsed_event, &parsed_event.id.to_string(),
39 + ).await
40 + }
41 + Err(e) => {
42 + Err(crate::error::AppError::BadRequest(format!("Failed to parse stored webhook payload: {e}")))
43 + }
44 + }
45 + } else {
46 + Err(crate::error::AppError::BadRequest(format!("Unknown webhook source: {}", event.source)))
47 + };
48 +
49 + match result {
50 + Ok(()) => {
51 + tracing::info!(event_id = %event.id, "webhook retry succeeded");
52 + if let Err(e) = db::webhook_events::mark_processed(&state.db, event.id).await {
53 + tracing::error!(error = ?e, "failed to mark webhook event as processed");
54 + }
55 + }
56 + Err(e) => {
57 + let is_dead = is_webhook_dead(attempt);
58 + tracing::warn!(
59 + event_id = %event.id, attempt = attempt, error = ?e,
60 + dead = is_dead,
61 + "webhook retry failed"
62 + );
63 + if let Err(e) = db::webhook_events::schedule_retry(
64 + &state.db, event.id, attempt, &format!("{:?}", e),
65 + ).await {
66 + tracing::error!(error = ?e, "failed to schedule webhook retry");
67 + }
68 +
69 + if is_dead {
70 + if let Some(ref wam) = state.wam {
71 + let title = format!(
72 + "Dead webhook: {} ({})",
73 + event.event_type, event.id
74 + );
75 + let body = format!(
76 + "Webhook event exhausted all {} retry attempts.\n\
77 + Source: {}\nType: {}\nLast error: {:?}",
78 + attempt, event.source, event.event_type, e,
79 + );
80 + wam.create_ticket(
81 + &title,
82 + Some(&body),
83 + "high",
84 + "webhook-dead-letter",
85 + Some(&event.id.to_string()),
86 + )
87 + .await;
88 + }
89 + }
90 + }
91 + }
92 + }
93 + }
94 +
95 + /// Alert the admin about pending refunds that have gone unmatched for >24 hours.
96 + pub(super) async fn escalate_stale_refunds(state: &AppState) {
97 + let stale = match db::pending_refunds::get_stale_refunds(
98 + &state.db,
99 + chrono::Duration::hours(24),
100 + )
101 + .await
102 + {
103 + Ok(s) if s.is_empty() => return,
104 + Ok(s) => s,
105 + Err(e) => {
106 + tracing::error!(error = ?e, "failed to query stale pending refunds");
107 + return;
108 + }
109 + };
110 +
111 + let alert_email = std::env::var("ALERT_EMAIL").ok();
112 +
113 + for refund in &stale {
114 + tracing::error!(
115 + payment_intent_id = %refund.payment_intent_id,
116 + amount = refund.amount.as_i64(),
117 + amount_refunded = refund.amount_refunded.as_i64(),
118 + created_at = %refund.created_at,
119 + "STALE PENDING REFUND: unmatched for >24h, needs manual investigation"
120 + );
121 +
122 + if let Some(ref to) = alert_email {
123 + let subject = format!(
124 + "Unmatched refund: {} ({}c refunded)",
125 + refund.payment_intent_id, refund.amount_refunded
126 + );
127 + let body = format!(
128 + "A charge.refunded webhook for payment intent {} has been pending for >24 hours \
129 + with no matching completed transaction.\n\n\
130 + Amount: {}c\nAmount refunded: {}c\nReceived: {}\n\n\
131 + This likely means the checkout.session.completed webhook was lost. \
132 + Check the Stripe dashboard and reconcile manually.",
133 + refund.payment_intent_id,
134 + refund.amount,
135 + refund.amount_refunded,
136 + refund.created_at,
137 + );
138 + if let Err(e) = state.email.send_alert(to, &subject, &body).await {
139 + tracing::error!(error = ?e, "failed to send stale refund alert email");
140 + }
141 + }
142 +
143 + if let Some(ref wam) = state.wam {
144 + let title = format!(
145 + "Unmatched refund: {} ({}c)",
146 + refund.payment_intent_id, refund.amount_refunded
147 + );
148 + let body = format!(
149 + "charge.refunded webhook pending >24h with no matching completed transaction.\n\
150 + Amount: {}c\nRefunded: {}c\nReceived: {}\n\
151 + Check Stripe dashboard and reconcile manually.",
152 + refund.amount, refund.amount_refunded, refund.created_at,
153 + );
154 + wam.create_ticket(
155 + &title,
156 + Some(&body),
157 + "critical",
158 + "refund-escalation",
159 + Some(&refund.payment_intent_id),
160 + )
161 + .await;
162 + }
163 +
164 + if let Err(e) = db::pending_refunds::mark_escalated(&state.db, refund.id).await {
165 + tracing::error!(error = ?e, "failed to mark pending refund as escalated");
166 + }
167 + }
168 + }
169 +
170 + #[cfg(test)]
171 + mod tests {
172 + use super::*;
173 +
174 + #[test]
175 + fn webhook_not_dead_under_threshold() {
176 + assert!(!is_webhook_dead(1));
177 + assert!(!is_webhook_dead(4));
178 + }
179 +
180 + #[test]
181 + fn webhook_dead_at_threshold() {
182 + assert!(is_webhook_dead(5));
183 + }
184 +
185 + #[test]
186 + fn webhook_dead_above_threshold() {
187 + assert!(is_webhook_dead(6));
188 + assert!(is_webhook_dead(100));
189 + }
190 + }
@@ -70,5 +70,7 @@ mod imports;
70 70 mod media_library;
71 71 mod synckit_sse;
72 72 mod mock_payment_flows;
73 + mod revenue_splits;
73 74 mod synckit_selective;
74 75 mod guest_checkout;
76 + mod sandbox;
@@ -1,8 +1,14 @@
1 1 //! Purchase workflow: creator publishes free item -> buyer signs up ->
2 - //! add to library -> verify -> remove from library -> verify gone
2 + //! add to library -> verify -> remove from library -> verify gone.
3 + //!
4 + //! Also covers paid purchases via mock Stripe, PWYW, unlisted item rejection,
5 + //! duplicate free claim idempotency, and library verification.
3 6
4 7 use crate::harness::TestHarness;
8 + use makenotwork::db;
5 9 use serde_json::Value;
10 + use std::collections::HashMap;
11 + use stripe::{EventObject, EventType, NotificationEventData};
6 12
7 13 #[tokio::test]
8 14 async fn free_item_library_flow() {
@@ -86,3 +92,464 @@ async fn free_item_library_flow() {
86 92 "Library should no longer contain the removed item"
87 93 );
88 94 }
95 +
96 + // ---------------------------------------------------------------------------
97 + // Helpers (shared by paid-purchase tests below)
98 + // ---------------------------------------------------------------------------
99 +
100 + /// Create a creator with Stripe "connected" and a published paid item.
101 + /// Returns (seller_id, project_id, item_id).
102 + async fn setup_creator_with_paid_item(
103 + h: &mut TestHarness,
104 + price_cents: i32,
105 + ) -> (db::UserId, String, String) {
106 + let seller_id = h.signup("seller", "seller@test.com", "pass1234").await;
107 + h.grant_creator(seller_id).await;
108 +
109 + sqlx::query(
110 + "UPDATE users SET stripe_account_id = 'acct_mock_seller', \
111 + stripe_charges_enabled = true WHERE id = $1",
112 + )
113 + .bind(seller_id)
114 + .execute(&h.db)
115 + .await
116 + .unwrap();
117 +
118 + h.client.post_form("/logout", "").await;
119 + h.login("seller", "pass1234").await;
120 +
121 + let resp = h
122 + .client
123 + .post_form("/api/projects", "slug=shop&title=Shop")
124 + .await;
125 + let project: Value = resp.json();
126 + let project_id = project["id"].as_str().unwrap().to_string();
127 +
128 + let resp = h
129 + .client
130 + .post_form(
131 + &format!("/api/projects/{}/items", project_id),
132 + &format!("title=Track&price_cents={}&item_type=audio", price_cents),
133 + )
134 + .await;
135 + let item: Value = resp.json();
136 + let item_id = item["id"].as_str().unwrap().to_string();
137 +
138 + // Publish both
139 + h.client
140 + .put_form(&format!("/api/projects/{}", project_id), "is_public=true")
141 + .await;
142 + h.client
143 + .put_form(&format!("/api/items/{}", item_id), "is_public=true")
144 + .await;
145 +
146 + h.client.post_form("/logout", "").await;
147 +
148 + (seller_id, project_id, item_id)
149 + }
150 +
151 + /// Post a signed webhook event to the harness.
152 + async fn post_webhook(
153 + h: &mut TestHarness,
154 + event: &stripe::Event,
155 + ) -> crate::harness::client::TestResponse {
156 + let payload = serde_json::to_string(event).expect("Event serialization");
157 + let signature = crate::harness::stripe::sign_webhook_payload(
158 + &payload,
159 + crate::harness::stripe::TEST_WEBHOOK_SECRET,
160 + );
161 + h.client
162 + .request_with_headers(
163 + "POST",
164 + "/stripe/webhook",
165 + Some(&payload),
166 + &[
167 + ("stripe-signature", &signature),
168 + ("content-type", "application/json"),
169 + ],
170 + )
171 + .await
172 + }
173 +
174 + fn make_checkout_event(
175 + event_type: EventType,
176 + object: EventObject,
177 + ) -> stripe::Event {
178 + stripe::Event {
179 + id: "evt_purchase_test".parse().unwrap(),
180 + type_: event_type,
181 + data: NotificationEventData {
182 + object,
183 + ..Default::default()
184 + },
185 + ..Default::default()
186 + }
187 + }
188 +
189 + // ---------------------------------------------------------------------------
190 + // 1. Paid purchase via mock Stripe
191 + // ---------------------------------------------------------------------------
192 +
193 + #[tokio::test]
194 + async fn paid_purchase_via_mock_stripe() {
195 + let mut h = TestHarness::with_mocks().await;
196 + let (seller_id, _project_id, item_id) =
197 + setup_creator_with_paid_item(&mut h, 500).await;
198 +
199 + // Buyer signs up and initiates checkout
200 + let buyer_id = h.signup("buyer", "buyer@test.com", "pass1234").await;
201 + let resp = h
202 + .client
203 + .post_form(
204 + &format!("/stripe/checkout/{}", item_id),
205 + "share_contact=false",
206 + )
207 + .await;
208 + assert!(
209 + resp.status.is_redirection() || resp.status.is_success(),
210 + "Checkout should redirect or succeed, got: {} {}",
211 + resp.status,
212 + resp.text
213 + );
214 +
215 + // Verify mock recorded the checkout
216 + let mock_stripe = h.mock_stripe.as_ref().unwrap();
217 + assert_eq!(
218 + mock_stripe.checkouts().len(),
219 + 1,
220 + "Expected 1 checkout session"
221 + );
222 +
223 + // Verify pending transaction was created with correct amount
224 + let (session_id, amount): (String, i32) = sqlx::query_as(
225 + "SELECT stripe_checkout_session_id, amount_cents \
226 + FROM transactions WHERE buyer_id = $1 AND status = 'pending'",
227 + )
228 + .bind(buyer_id)
229 + .fetch_one(&h.db)
230 + .await
231 + .unwrap();
232 + assert_eq!(amount, 500, "Pending transaction should have 500 cents");
233 +
234 + // Simulate webhook completion
235 + let mut meta = HashMap::new();
236 + meta.insert("buyer_id".to_string(), buyer_id.to_string());
237 + meta.insert("seller_id".to_string(), seller_id.to_string());
238 + meta.insert("item_id".to_string(), item_id.clone());
239 + let session = stripe::CheckoutSession {
240 + id: session_id.parse().unwrap(),
241 + metadata: Some(meta),
242 + payment_intent: Some(
243 + serde_json::from_value(serde_json::json!("pi_paid_001")).unwrap(),
244 + ),
245 + ..Default::default()
246 + };
247 + let event = make_checkout_event(
248 + EventType::CheckoutSessionCompleted,
249 + EventObject::CheckoutSession(session),
250 + );
251 + let resp = post_webhook(&mut h, &event).await;
252 + assert_eq!(resp.status.as_u16(), 200, "Webhook failed: {}", resp.text);
253 +
254 + // Verify transaction completed
255 + let status: String = sqlx::query_scalar(
256 + "SELECT status FROM transactions \
257 + WHERE buyer_id = $1 AND item_id = $2::uuid",
258 + )
259 + .bind(buyer_id)
260 + .bind(&item_id)
261 + .fetch_one(&h.db)
262 + .await
263 + .unwrap();
264 + assert_eq!(status, "completed");
265 +
266 + // Verify sales count
267 + let sales: i32 =
268 + sqlx::query_scalar("SELECT sales_count FROM items WHERE id = $1::uuid")
269 + .bind(&item_id)
270 + .fetch_one(&h.db)
271 + .await
272 + .unwrap();
273 + assert_eq!(sales, 1);
274 + }
275 +
276 + // ---------------------------------------------------------------------------
277 + // 2. PWYW purchase with custom amount
278 + // ---------------------------------------------------------------------------
279 +
280 + #[tokio::test]
281 + async fn pwyw_purchase_custom_amount() {
282 + let mut h = TestHarness::with_mocks().await;
283 + let (_seller_id, _project_id, item_id) =
284 + setup_creator_with_paid_item(&mut h, 300).await;
285 +
286 + // Enable PWYW on the item
287 + h.login("seller", "pass1234").await;
288 + h.client
289 + .put_form(
290 + &format!("/api/items/{}", item_id),
291 + "pwyw_enabled=on&pwyw_min_cents=300",
292 + )
293 + .await;
294 + h.client.post_form("/logout", "").await;
295 +
296 + // Buyer checks out with $10 (1000 cents)
297 + let buyer_id = h
298 + .signup("pwywbuyer", "pwyw@test.com", "pass1234")
299 + .await;
300 + let resp = h
301 + .client
302 + .post_form(
303 + &format!("/stripe/checkout/{}", item_id),
304 + "share_contact=false&amount_cents=1000",
305 + )
306 + .await;
307 + assert!(
308 + resp.status.is_redirection() || resp.status.is_success(),
309 + "PWYW checkout should redirect, got: {} {}",
310 + resp.status,
311 + resp.text
312 + );
313 +
314 + // Verify mock recorded the checkout
315 + let mock_stripe = h.mock_stripe.as_ref().unwrap();
316 + assert_eq!(mock_stripe.checkouts().len(), 1);
317 +
318 + // Verify the pending transaction has the custom amount
319 + let amount: i32 = sqlx::query_scalar(
320 + "SELECT amount_cents FROM transactions \
321 + WHERE buyer_id = $1 AND status = 'pending'",
322 + )
323 + .bind(buyer_id)
324 + .fetch_one(&h.db)
325 + .await
326 + .unwrap();
327 + assert_eq!(
328 + amount, 1000,
329 + "PWYW transaction should have buyer's chosen amount of 1000 cents"
330 + );
331 + }
332 +
333 + // ---------------------------------------------------------------------------
334 + // 3. Purchase unlisted item fails
335 + // ---------------------------------------------------------------------------
336 +
337 + #[tokio::test]
338 + async fn purchase_unlisted_item_fails() {
339 + let mut h = TestHarness::with_mocks().await;
340 +
341 + // Create a creator with an item but do NOT publish it
342 + let seller_id = h.signup("seller", "seller@test.com", "pass1234").await;
343 + h.grant_creator(seller_id).await;
344 + sqlx::query(
345 + "UPDATE users SET stripe_account_id = 'acct_mock_seller', \
346 + stripe_charges_enabled = true WHERE id = $1",
347 + )
348 + .bind(seller_id)
349 + .execute(&h.db)
350 + .await
351 + .unwrap();
352 +
353 + h.client.post_form("/logout", "").await;
354 + h.login("seller", "pass1234").await;
355 +
356 + let resp = h
357 + .client
358 + .post_form("/api/projects", "slug=shop&title=Shop")
359 + .await;
360 + let project: Value = resp.json();
361 + let project_id = project["id"].as_str().unwrap();
362 +
363 + let resp = h
364 + .client
365 + .post_form(
366 + &format!("/api/projects/{}/items", project_id),
367 + "title=Secret+Track&price_cents=500&item_type=audio",
368 + )
369 + .await;
370 + let item: Value = resp.json();
371 + let item_id = item["id"].as_str().unwrap();
372 +
373 + // Deliberately NOT publishing: project and item stay private
374 + h.client.post_form("/logout", "").await;
375 +
376 + // Buyer tries to checkout
377 + h.signup("unlistedbuyer", "unlisted@test.com", "pass1234").await;
378 + let resp = h
379 + .client
380 + .post_form(
381 + &format!("/stripe/checkout/{}", item_id),
382 + "share_contact=false",
383 + )
384 + .await;
385 + assert!(
386 + resp.status.is_client_error(),
387 + "Unpublished item should be rejected, got: {} {}",
388 + resp.status,
389 + resp.text
390 + );
391 +
392 + // No checkout session should have been created
393 + let mock_stripe = h.mock_stripe.as_ref().unwrap();
394 + assert_eq!(
395 + mock_stripe.checkouts().len(),
396 + 0,
397 + "No checkout should be created for unlisted item"
398 + );
399 + }
400 +
401 + // ---------------------------------------------------------------------------
402 + // 4. Duplicate free purchase is idempotent
403 + // ---------------------------------------------------------------------------
404 +
405 + #[tokio::test]
406 + async fn duplicate_free_purchase_idempotent() {
407 + let mut h = TestHarness::new().await;
408 +
409 + // Creator: sign up, create free item, publish
410 + let creator_id = h
411 + .signup("seller", "seller@example.com", "password123")
412 + .await;
413 + h.grant_creator(creator_id).await;
414 + h.client.post_form("/logout", "").await;
415 + h.login("seller", "password123").await;
416 +
417 + let resp = h
418 + .client
419 + .post_form("/api/projects", "slug=shop&title=My+Shop")
420 + .await;
421 + let project: Value = resp.json();
422 + let project_id = project["id"].as_str().unwrap();
423 +
424 + let resp = h
425 + .client
426 + .post_form(
427 + &format!("/api/projects/{}/items", project_id),
428 + "title=Freebie&price_cents=0&item_type=digital",
429 + )
430 + .await;
431 + let item: Value = resp.json();
432 + let item_id = item["id"].as_str().unwrap();
433 +
434 + h.client
435 + .put_json(
436 + &format!("/api/projects/{}", project_id),
437 + r#"{"is_public": true}"#,
438 + )
439 + .await;
440 + h.client
441 + .put_form(&format!("/api/items/{}", item_id), "is_public=true")
442 + .await;
443 +
444 + // Buyer signs up
445 + h.client.post_form("/logout", "").await;
446 + let buyer_id = h
447 + .signup("buyer", "buyer@example.com", "password456")
448 + .await;
449 +
450 + // First claim
451 + let resp = h
452 + .client
453 + .post_form(&format!("/api/library/add/{}", item_id), "")
454 + .await;
455 + assert!(
456 + resp.status.is_success(),
457 + "First claim failed: {} {}",
458 + resp.status,
459 + resp.text
460 + );
461 +
462 + // Second claim (should succeed or be silently idempotent)
463 + let resp = h
464 + .client
465 + .post_form(&format!("/api/library/add/{}", item_id), "")
466 + .await;
467 + assert!(
468 + resp.status.is_success() || resp.status.is_redirection(),
469 + "Second claim should not error, got: {} {}",
470 + resp.status,
471 + resp.text
472 + );
473 +
474 + // Verify only one transaction exists
475 + let count: i64 = sqlx::query_scalar(
476 + "SELECT COUNT(*) FROM transactions \
477 + WHERE buyer_id = $1 AND item_id = $2::uuid",
478 + )
479 + .bind(buyer_id)
480 + .bind(item_id)
481 + .fetch_one(&h.db)
482 + .await
483 + .unwrap();
484 + assert_eq!(
485 + count, 1,
486 + "Duplicate free claim should not create a second transaction"
487 + );
488 + }
489 +
490 + // ---------------------------------------------------------------------------
491 + // 5. Purchase adds to library
492 + // ---------------------------------------------------------------------------
493 +
494 + #[tokio::test]
495 + async fn purchase_adds_to_library() {
496 + let mut h = TestHarness::new().await;
497 +
498 + // Creator: sign up, create free item, publish
499 + let creator_id = h
500 + .signup("seller", "seller@example.com", "password123")
501 + .await;
502 + h.grant_creator(creator_id).await;
503 + h.client.post_form("/logout", "").await;
504 + h.login("seller", "password123").await;
505 +
506 + let resp = h
507 + .client
508 + .post_form("/api/projects", "slug=shop&title=My+Shop")
509 + .await;
510 + let project: Value = resp.json();
511 + let project_id = project["id"].as_str().unwrap();
512 +
513 + let resp = h
514 + .client
515 + .post_form(
516 + &format!("/api/projects/{}/items", project_id),
517 + "title=Library+Item&price_cents=0&item_type=digital",
518 + )
519 + .await;
520 + let item: Value = resp.json();
521 + let item_id = item["id"].as_str().unwrap();
522 +
523 + h.client
524 + .put_json(
525 + &format!("/api/projects/{}", project_id),
526 + r#"{"is_public": true}"#,
527 + )
528 + .await;
529 + h.client
530 + .put_form(&format!("/api/items/{}", item_id), "is_public=true")
531 + .await;
532 +
533 + // Buyer signs up and claims free item
534 + h.client.post_form("/logout", "").await;
535 + h.signup("buyer", "buyer@example.com", "password456").await;
536 +
537 + let resp = h
538 + .client
539 + .post_form(&format!("/api/library/add/{}", item_id), "")
540 + .await;
541 + assert!(
542 + resp.status.is_success(),
543 + "Add to library failed: {} {}",
544 + resp.status,
545 + resp.text
546 + );
547 +
548 + // Verify item appears in GET /library
549 + let resp = h.client.get("/library").await;
550 + assert_eq!(resp.status, 200);
551 + assert!(
552 + resp.text.contains("Library Item"),
553 + "Library page should contain the purchased item title"
554 + );
555 + }
@@ -0,0 +1,396 @@
1 + //! Revenue split integration tests — project members, split recording on purchase,
2 + //! and split CSV export.
3 +
4 + use crate::harness::TestHarness;
5 + use makenotwork::db;
6 + use serde_json::Value;
7 + use std::collections::HashMap;
8 + use stripe::{Event, EventObject, EventType, NotificationEventData};
9 +
10 + // ---------------------------------------------------------------------------
11 + // Helpers (mirrors mock_payment_flows patterns)
12 + // ---------------------------------------------------------------------------
13 +
14 + /// Create a creator with Stripe "connected" (direct DB override) and a published paid item.
15 + /// Returns (seller_id, project_id, item_id).
16 + async fn setup_paid_item(h: &mut TestHarness, price_cents: i32) -> (db::UserId, String, String) {
17 + let seller_id = h.signup("seller", "seller@test.com", "pass1234").await;
18 + h.grant_creator(seller_id).await;
19 +
20 + // Simulate Stripe Connect onboarding complete
21 + sqlx::query("UPDATE users SET stripe_account_id = 'acct_mock_seller', stripe_charges_enabled = true WHERE id = $1")
22 + .bind(seller_id)
23 + .execute(&h.db)
24 + .await
25 + .unwrap();
26 +
27 + h.client.post_form("/logout", "").await;
28 + h.login("seller", "pass1234").await;
29 +
30 + let resp = h.client.post_form("/api/projects", "slug=shop&title=Shop").await;
31 + let project: Value = resp.json();
32 + let project_id = project["id"].as_str().unwrap().to_string();
33 +
34 + let resp = h.client.post_form(
35 + &format!("/api/projects/{}/items", project_id),
36 + &format!("title=Track&price_cents={}&item_type=audio", price_cents),
37 + ).await;
38 + let item: Value = resp.json();
39 + let item_id = item["id"].as_str().unwrap().to_string();
40 +
41 + // Publish
42 + h.client.put_form(&format!("/api/projects/{}", project_id), "is_public=true").await;
43 + h.client.put_form(&format!("/api/items/{}", item_id), "is_public=true").await;
44 +
45 + (seller_id, project_id, item_id)
46 + }
47 +
48 + fn make_event(event_type: EventType, object: EventObject) -> Event {
49 + Event {
50 + id: "evt_split_test".parse().unwrap(),
51 + type_: event_type,
52 + data: NotificationEventData {
53 + object,
54 + ..Default::default()
55 + },
56 + ..Default::default()
57 + }
58 + }
59 +
60 + async fn post_webhook(h: &mut TestHarness, event: &Event) -> crate::harness::client::TestResponse {
61 + let payload = serde_json::to_string(event).expect("Event serialization");
62 + let signature = crate::harness::stripe::sign_webhook_payload(
63 + &payload,
64 + crate::harness::stripe::TEST_WEBHOOK_SECRET,
65 + );
66 + h.client.request_with_headers(
67 + "POST",
68 + "/stripe/webhook",
69 + Some(&payload),
70 + &[
71 + ("stripe-signature", &signature),
72 + ("content-type", "application/json"),
73 + ],
74 + ).await
75 + }
76 +
77 + /// Complete a purchase through the webhook pipeline.
78 + /// Returns the buyer_id.
79 + async fn complete_purchase(
80 + h: &mut TestHarness,
81 + seller_id: db::UserId,
82 + item_id: &str,
83 + buyer_username: &str,
84 + buyer_email: &str,
85 + ) -> db::UserId {
86 + let buyer_id = h.signup(buyer_username, buyer_email, "pass1234").await;
87 +
88 + h.client.post_form(
89 + &format!("/stripe/checkout/{}", item_id),
90 + "share_contact=false",
91 + ).await;
92 +
93 + let session_id: String = sqlx::query_scalar(
94 + "SELECT stripe_checkout_session_id FROM transactions WHERE buyer_id = $1 AND status = 'pending'",
95 + )
96 + .bind(buyer_id)
97 + .fetch_one(&h.db)
98 + .await
99 + .unwrap();
100 +
101 + let mut meta = HashMap::new();
102 + meta.insert("buyer_id".to_string(), buyer_id.to_string());
103 + meta.insert("seller_id".to_string(), seller_id.to_string());
104 + meta.insert("item_id".to_string(), item_id.to_string());
105 + let session = stripe::CheckoutSession {
106 + id: session_id.parse().unwrap(),
107 + metadata: Some(meta),
108 + payment_intent: Some(
109 + serde_json::from_value(serde_json::json!(format!("pi_split_{}", buyer_username)))
110 + .unwrap(),
111 + ),
112 + ..Default::default()
113 + };
114 + let event = make_event(
115 + EventType::CheckoutSessionCompleted,
116 + EventObject::CheckoutSession(session),
117 + );
118 + let resp = post_webhook(h, &event).await;
119 + assert_eq!(resp.status.as_u16(), 200, "Webhook failed: {}", resp.text);
120 +
121 + buyer_id
122 + }
123 +
124 + // ---------------------------------------------------------------------------
125 + // 1. Add project member with split
126 + // ---------------------------------------------------------------------------
127 +
128 + #[tokio::test]
129 + async fn add_project_member_with_split() {
130 + let mut h = TestHarness::with_mocks().await;
131 +
132 + let _seller_id = h.create_creator("creator1").await;
133 +
134 + // Create project
135 + let resp = h.client.post_form("/api/projects", "slug=collab-proj&title=Collab+Project").await;
136 + assert!(resp.status.is_success(), "Create project failed: {}", resp.text);
137 + let project: Value = resp.json();
138 + let project_id = project["id"].as_str().unwrap().to_string();
139 +
140 + // Sign up a collaborator (separate session)
141 + h.client.post_form("/logout", "").await;
142 + let collab_id = h.signup("collaborator", "collab@test.com", "pass1234").await;
143 +
144 + // Log back in as creator
145 + h.client.post_form("/logout", "").await;
146 + h.login("creator1", "password123").await;
147 +
148 + // Add collaborator via API
149 + let resp = h.client.post_form(
150 + &format!("/api/projects/{}/members", project_id),
151 + "username=collaborator&split_percent=30",
152 + ).await;
153 + assert!(
154 + resp.status.is_success(),
155 + "Add member failed: {} {}",
156 + resp.status, resp.text
157 + );
158 +
159 + // Verify member exists in DB
160 + let members: Vec<(db::UserId, i16)> = sqlx::query_as(
161 + "SELECT user_id, split_percent FROM project_members WHERE project_id = $1::uuid",
162 + )
163 + .bind(&project_id)
164 + .fetch_all(&h.db)
165 + .await
166 + .unwrap();
167 +
168 + assert_eq!(members.len(), 1, "Expected 1 project member");
169 + assert_eq!(members[0].0, collab_id);
170 + assert_eq!(members[0].1, 30);
171 + }
172 +
173 + // ---------------------------------------------------------------------------
174 + // 2. Update split percentage
175 + // ---------------------------------------------------------------------------
176 +
177 + #[tokio::test]
178 + async fn update_split_percentage() {
179 + let mut h = TestHarness::with_mocks().await;
180 +
181 + let _seller_id = h.create_creator("creator2").await;
182 +
183 + let resp = h.client.post_form("/api/projects", "slug=split-upd&title=Split+Update").await;
184 + assert!(resp.status.is_success(), "Create project failed: {}", resp.text);
185 + let project: Value = resp.json();
186 + let project_id = project["id"].as_str().unwrap().to_string();
187 +
188 + // Create collaborator
189 + h.client.post_form("/logout", "").await;
190 + let collab_id = h.signup("collab2", "collab2@test.com", "pass1234").await;
191 +
192 + // Log back in as creator
193 + h.client.post_form("/logout", "").await;
194 + h.login("creator2", "password123").await;
195 +
196 + // Add with 30%
197 + let resp = h.client.post_form(
198 + &format!("/api/projects/{}/members", project_id),
199 + "username=collab2&split_percent=30",
200 + ).await;
201 + assert!(resp.status.is_success(), "Add member failed: {} {}", resp.status, resp.text);
202 +
203 + // Update to 50% by re-adding (the API uses ON CONFLICT DO UPDATE)
204 + let resp = h.client.post_form(
205 + &format!("/api/projects/{}/members", project_id),
206 + "username=collab2&split_percent=50",
207 + ).await;
208 + assert!(resp.status.is_success(), "Update member failed: {} {}", resp.status, resp.text);
209 +
210 + // Verify updated split
211 + let split: i16 = sqlx::query_scalar(
212 + "SELECT split_percent FROM project_members WHERE project_id = $1::uuid AND user_id = $2",
213 + )
214 + .bind(&project_id)
215 + .bind(collab_id)
216 + .fetch_one(&h.db)
217 + .await
218 + .unwrap();
219 +
220 + assert_eq!(split, 50, "Split should be updated to 50%");
221 + }
222 +
223 + // ---------------------------------------------------------------------------
224 + // 3. Remove project member
225 + // ---------------------------------------------------------------------------
226 +
227 + #[tokio::test]
228 + async fn remove_project_member() {
229 + let mut h = TestHarness::with_mocks().await;
230 +
231 + let _seller_id = h.create_creator("creator3").await;
232 +
233 + let resp = h.client.post_form("/api/projects", "slug=rm-member&title=Remove+Member").await;
234 + assert!(resp.status.is_success(), "Create project failed: {}", resp.text);
235 + let project: Value = resp.json();
236 + let project_id = project["id"].as_str().unwrap().to_string();
237 +
238 + // Create collaborator
239 + h.client.post_form("/logout", "").await;
240 + let collab_id = h.signup("collab3", "collab3@test.com", "pass1234").await;
241 +
242 + // Log back in as creator
243 + h.client.post_form("/logout", "").await;
244 + h.login("creator3", "password123").await;
245 +
246 + // Add member
247 + let resp = h.client.post_form(
248 + &format!("/api/projects/{}/members", project_id),
249 + "username=collab3&split_percent=25",
250 + ).await;
251 + assert!(resp.status.is_success(), "Add member failed: {} {}", resp.status, resp.text);
252 +
253 + // Remove member
254 + let resp = h.client.delete(
255 + &format!("/api/projects/{}/members/{}", project_id, collab_id),
256 + ).await;
257 + assert!(
258 + resp.status.is_success(),
259 + "Remove member failed: {} {}",
260 + resp.status, resp.text
261 + );
262 +
263 + // Verify member is gone
264 + let count: i64 = sqlx::query_scalar(
265 + "SELECT COUNT(*) FROM project_members WHERE project_id = $1::uuid AND user_id = $2",
266 + )
267 + .bind(&project_id)
268 + .bind(collab_id)
269 + .fetch_one(&h.db)
270 + .await
271 + .unwrap();
272 +
273 + assert_eq!(count, 0, "Member should be removed");
274 + }
275 +
276 + // ---------------------------------------------------------------------------
277 + // 4. Split recorded on purchase
278 + // ---------------------------------------------------------------------------
279 +
280 + #[tokio::test]
281 + async fn split_recorded_on_purchase() {
282 + let mut h = TestHarness::with_mocks().await;
283 + let (seller_id, project_id, item_id) = setup_paid_item(&mut h, 1000).await;
284 +
285 + // Create a collaborator
286 + let collab_id = h.signup("splitcollab", "splitcollab@test.com", "pass1234").await;
287 + h.client.post_form("/logout", "").await;
288 +
289 + // Add collaborator with 50% split (direct SQL, seller is already logged out)
290 + sqlx::query(
291 + "INSERT INTO project_members (project_id, user_id, role, split_percent, added_by) VALUES ($1::uuid, $2, 'member', 50, $3)",
292 + )
293 + .bind(&project_id)
294 + .bind(collab_id)
295 + .bind(seller_id)
296 + .execute(&h.db)
297 + .await
298 + .unwrap();
299 +
300 + // Complete a purchase through the webhook pipeline
301 + let _buyer_id = complete_purchase(&mut h, seller_id, &item_id, "splitbuyer", "splitbuyer@test.com").await;
302 +
303 + // Wait for split recording (runs after transaction commit)
304 + tokio::time::sleep(std::time::Duration::from_millis(200)).await;
305 +
306 + // Verify revenue_splits table has a row for the collaborator
307 + let split_rows: Vec<(i32, i16)> = sqlx::query_as(
308 + "SELECT amount_cents, split_percent FROM revenue_splits WHERE recipient_id = $1",
309 + )
310 + .bind(collab_id)
311 + .fetch_all(&h.db)
312 + .await
313 + .unwrap();
314 +
315 + assert_eq!(split_rows.len(), 1, "Expected 1 revenue split record for collaborator");
316 + // 1000 * 50 / 100 = 500
317 + assert_eq!(split_rows[0].0, 500, "Collaborator should get 50% of 1000 = 500 cents");
318 + assert_eq!(split_rows[0].1, 50, "Split percent should be recorded as 50");
319 +
320 + // Also verify the split is linked to the correct transaction
321 + let has_transaction_id: bool = sqlx::query_scalar(
322 + "SELECT transaction_id IS NOT NULL FROM revenue_splits WHERE recipient_id = $1",
323 + )
324 + .bind(collab_id)
325 + .fetch_one(&h.db)
326 + .await
327 + .unwrap();
328 + assert!(has_transaction_id, "Revenue split should be linked to a transaction");
329 + }
330 +
331 + // ---------------------------------------------------------------------------
332 + // 5. Split export contains data
333 + // ---------------------------------------------------------------------------
334 +
335 + #[tokio::test]
336 + async fn split_export_contains_data() {
337 + let mut h = TestHarness::with_mocks().await;
338 + let (seller_id, project_id, item_id) = setup_paid_item(&mut h, 800).await;
339 +
340 + // Create a collaborator
341 + let collab_id = h.signup("exportcollab", "exportcollab@test.com", "pass1234").await;
342 + h.client.post_form("/logout", "").await;
343 +
344 + // Add collaborator with 40% split
345 + sqlx::query(
346 + "INSERT INTO project_members (project_id, user_id, role, split_percent, added_by) VALUES ($1::uuid, $2, 'member', 40, $3)",
347 + )
348 + .bind(&project_id)
349 + .bind(collab_id)
350 + .bind(seller_id)
351 + .execute(&h.db)
352 + .await
353 + .unwrap();
354 +
355 + // Complete a purchase
356 + let _buyer_id = complete_purchase(&mut h, seller_id, &item_id, "exportbuyer", "exportbuyer@test.com").await;
357 +
358 + // Wait for split recording
359 + tokio::time::sleep(std::time::Duration::from_millis(200)).await;
360 +
361 + // Log in as seller and export splits
362 + h.client.post_form("/logout", "").await;
363 + h.login("seller", "pass1234").await;
364 +
365 + let resp = h.client.post_form("/api/export/splits", "").await;
366 + assert!(
367 + resp.status.is_success(),
368 + "Export splits failed: {} {}",
369 + resp.status, resp.text
370 + );
371 +
372 + // CSV should contain header + at least one data row
373 + let csv = &resp.text;
374 + assert!(csv.contains("Date,Type,Direction,Recipient,Amount,Split %"), "CSV should have header row");
375 + assert!(csv.contains("exportcollab"), "CSV should contain collaborator username");
376 + assert!(csv.contains("sale"), "CSV should contain 'sale' source type");
377 + assert!(csv.contains("outgoing"), "From seller perspective, split should be 'outgoing'");
378 + // 800 * 40 / 100 = 320 cents = 3.20
379 + assert!(csv.contains("3.20"), "CSV should contain split amount of $3.20");
380 + assert!(csv.contains("40"), "CSV should contain split percentage of 40");
381 +
382 + // Also verify from the collaborator's perspective
383 + h.client.post_form("/logout", "").await;
384 + h.login("exportcollab", "pass1234").await;
385 +
386 + let resp = h.client.post_form("/api/export/splits", "").await;
387 + assert!(
388 + resp.status.is_success(),
389 + "Collab export splits failed: {} {}",
390 + resp.status, resp.text
391 + );
392 +
393 + let csv = &resp.text;
394 + assert!(csv.contains("incoming"), "From collaborator perspective, split should be 'incoming'");
395 + assert!(csv.contains("exportcollab"), "CSV should contain collaborator username");
396 + }
@@ -0,0 +1,280 @@
1 + //! Sandbox workflow: ephemeral account creation, feature restrictions, visibility rules.
2 +
3 + use crate::harness::TestHarness;
4 + use makenotwork::constants::SANDBOX_MAX_PER_IP;
5 +
6 + /// Helper: create a sandbox account via POST /sandbox.
7 + /// The client must have a CSRF token (fetched from GET /sandbox).
8 + /// Returns the response from POST /sandbox (should be a 302 redirect).
9 + async fn create_sandbox(h: &mut TestHarness) -> crate::harness::client::TestResponse {
10 + // GET /sandbox to establish session + extract CSRF token
11 + let resp = h.client.get("/sandbox").await;
12 + assert!(
13 + resp.status.is_success(),
14 + "GET /sandbox failed: {} {}",
15 + resp.status,
16 + resp.text
17 + );
18 +
19 + // POST /sandbox to create the account
20 + h.client.post_form("/sandbox", "").await
21 + }
22 +
23 + /// Look up the sandbox username from the DB (most recently created sandbox_ user).
24 + async fn sandbox_username(h: &TestHarness) -> String {
25 + sqlx::query_scalar::<_, String>(
26 + "SELECT username FROM users WHERE username LIKE 'sandbox_%' ORDER BY created_at DESC LIMIT 1",
27 + )
28 + .fetch_one(&h.db)
29 + .await
30 + .expect("No sandbox user found")
31 + }
32 +
33 + #[tokio::test]
34 + async fn create_sandbox_account() {
35 + let mut h = TestHarness::new().await;
36 +
37 + let resp = create_sandbox(&mut h).await;
38 + assert!(
39 + resp.status.is_redirection(),
40 + "POST /sandbox should redirect, got {}",
41 + resp.status
42 + );
43 +
44 + // Should be able to access dashboard as the sandbox user
45 + let resp = h.client.get("/dashboard").await;
46 + assert_eq!(
47 + resp.status, 200,
48 + "Dashboard should be accessible after sandbox creation"
49 + );
50 + }
51 +
52 + #[tokio::test]
53 + async fn sandbox_blocks_custom_domains() {
54 + let mut h = TestHarness::new().await;
55 + create_sandbox(&mut h).await;
56 +
57 + let resp = h
58 + .client
59 + .post_json("/api/domains", r#"{"domain": "sandbox.example.com"}"#)
60 + .await;
61 + assert_eq!(
62 + resp.status, 403,
63 + "Sandbox user should get 403 on POST /api/domains, got {}",
64 + resp.status
65 + );
66 + }
67 +
68 + #[tokio::test]
69 + async fn sandbox_blocks_git_repos() {
70 + let mut h = TestHarness::new().await;
71 + create_sandbox(&mut h).await;
72 +
73 + let resp = h
74 + .client
75 + .post_json("/api/repos", r#"{"name": "test-repo"}"#)
76 + .await;
77 + assert_eq!(
78 + resp.status, 403,
79 + "Sandbox user should get 403 on POST /api/repos, got {}",
80 + resp.status
81 + );
82 + }
83 +
84 + #[tokio::test]
85 + async fn sandbox_blocks_imports() {
86 + let mut h = TestHarness::new().await;
87 + create_sandbox(&mut h).await;
88 +
89 + let resp = h
90 + .client
91 + .post_json(
92 + "/api/users/me/import",
93 + r#"{"project_id": "00000000-0000-0000-0000-000000000000", "source": "bandcamp", "url": "https://example.bandcamp.com"}"#,
94 + )
95 + .await;
96 + assert_eq!(
97 + resp.status, 403,
98 + "Sandbox user should get 403 on POST /api/users/me/import, got {}",
99 + resp.status
100 + );
101 + }
102 +
103 + #[tokio::test]
104 + async fn sandbox_blocks_guest_claim() {
105 + let mut h = TestHarness::new().await;
106 + create_sandbox(&mut h).await;
107 +
108 + let resp = h
109 + .client
110 + .post_json(
111 + "/api/purchases/claim",
112 + r#"{"claim_token": "fake-token-12345"}"#,
113 + )
114 + .await;
115 + assert_eq!(
116 + resp.status, 403,
117 + "Sandbox user should get 403 on POST /api/purchases/claim, got {}",
118 + resp.status
119 + );
120 + }
121 +
122 + #[tokio::test]
123 + async fn sandbox_content_not_visible_on_item_page() {
124 + let mut h = TestHarness::new().await;
125 + create_sandbox(&mut h).await;
126 +
127 + // Create a project
128 + let resp = h
129 + .client
130 + .post_form("/api/projects", "slug=sandbox-proj&title=Sandbox+Project")
131 + .await;
132 + assert!(
133 + resp.status.is_success(),
134 + "Create project failed: {} {}",
135 + resp.status,
136 + resp.text
137 + );
138 + let project: serde_json::Value = resp.json();
139 + let project_id = project["id"].as_str().unwrap();
140 +
141 + // Publish project
142 + h.client
143 + .put_json(
144 + &format!("/api/projects/{}", project_id),
145 + r#"{"is_public": true}"#,
146 + )
147 + .await;
148 +
149 + // Create an item
150 + let resp = h
151 + .client
152 + .post_form(
153 + &format!("/api/projects/{}/items", project_id),
154 + "title=Sandbox+Item&item_type=digital&price_cents=0",
155 + )
156 + .await;
157 + assert!(
158 + resp.status.is_success(),
159 + "Create item failed: {} {}",
160 + resp.status,
161 + resp.text
162 + );
163 + let item: serde_json::Value = resp.json();
164 + let item_id = item["id"].as_str().unwrap();
165 +
166 + // Publish the item
167 + h.client
168 + .put_form(&format!("/api/items/{}", item_id), "is_public=true")
169 + .await;
170 +
171 + // Use a second harness (unauthenticated client) to visit the item page
172 + let mut h2 = TestHarness::new().await;
173 + let resp = h2.client.get(&format!("/i/{}", item_id)).await;
174 + assert_eq!(
175 + resp.status, 404,
176 + "Sandbox item should return 404 to unauthenticated visitor, got {}",
177 + resp.status
178 + );
179 + }
180 +
181 + #[tokio::test]
182 + async fn sandbox_rss_returns_404() {
183 + let mut h = TestHarness::new().await;
184 + create_sandbox(&mut h).await;
185 +
186 + let username = sandbox_username(&h).await;
187 +
188 + let resp = h.client.get(&format!("/u/{}/rss", username)).await;
189 + assert_eq!(
190 + resp.status, 404,
191 + "Sandbox user RSS feed should return 404, got {}",
192 + resp.status
193 + );
194 + }
195 +
196 + #[tokio::test]
197 + async fn sandbox_per_ip_cap() {
198 + let mut h = TestHarness::new().await;
199 +
200 + // Create SANDBOX_MAX_PER_IP sandboxes, each needing a fresh session
201 + for i in 0..SANDBOX_MAX_PER_IP {
202 + // Log out between sandbox creations so we get a fresh session
203 + if i > 0 {
204 + h.client.post_form("/logout", "").await;
205 + }
206 + let resp = create_sandbox(&mut h).await;
207 + assert!(
208 + resp.status.is_redirection(),
209 + "Sandbox {} should succeed, got {}",
210 + i + 1,
211 + resp.status
212 + );
213 + }
214 +
215 + // Log out and try one more — should be rejected
216 + h.client.post_form("/logout", "").await;
217 + let resp = h.client.get("/sandbox").await;
218 + assert!(resp.status.is_success());
219 +
220 + let resp = h.client.post_form("/sandbox", "").await;
221 + assert_eq!(
222 + resp.status, 400,
223 + "Sandbox beyond per-IP cap should return 400, got {}",
224 + resp.status
225 + );
226 + }
227 +
228 + #[tokio::test]
229 + async fn sandbox_blog_no_email() {
230 + let mut h = TestHarness::with_mocks().await;
231 + create_sandbox(&mut h).await;
232 +
233 + // Clear any emails from sandbox creation
234 + h.mock_email.as_ref().unwrap().clear();
235 +
236 + // Create a project
237 + let resp = h
238 + .client
239 + .post_form("/api/projects", "slug=sb-blog&title=Sandbox+Blog")
240 + .await;
241 + assert!(
242 + resp.status.is_success(),
243 + "Create project failed: {} {}",
244 + resp.status,
245 + resp.text
246 + );
247 + let project: serde_json::Value = resp.json();
248 + let project_id = project["id"].as_str().unwrap();
249 +
250 + // Publish project
251 + h.client
252 + .put_json(
253 + &format!("/api/projects/{}", project_id),
254 + r#"{"is_public": true}"#,
255 + )
256 + .await;
257 +
258 + // Create and immediately publish a blog post
259 + let resp = h
260 + .client
261 + .post_json(
262 + &format!("/api/projects/{}/blog", project_id),
263 + r#"{"title": "Sandbox Post", "body_markdown": "Hello from sandbox!", "is_published": true}"#,
264 + )
265 + .await;
266 + assert!(
267 + resp.status.is_success(),
268 + "Create blog post failed: {} {}",
269 + resp.status,
270 + resp.text
271 + );
272 +
273 + // No emails should have been sent (sandbox skips announcements)
274 + let count = h.mock_email.as_ref().unwrap().count();
275 + assert_eq!(
276 + count, 0,
277 + "Sandbox blog publish should send 0 emails, got {}",
278 + count
279 + );
280 + }
@@ -199,6 +199,386 @@ async fn subscription_tier_lifecycle() {
199 199 }
200 200
201 201 #[tokio::test]
202 + async fn create_subscription_tier() {
203 + let mut h = TestHarness::new().await;
204 +
205 + // Setup: creator with project
206 + let _user_id = h.create_creator("tiercreator").await;
207 +
208 + let resp = h
209 + .client
210 + .post_form("/api/projects", "slug=tier-create&title=Tier+Create")
211 + .await;
212 + assert!(resp.status.is_success(), "Create project failed: {}", resp.text);
213 + let project: Value = resp.json();
214 + let project_id = project["id"].as_str().unwrap();
215 + let project_uuid: uuid::Uuid = project_id.parse().unwrap();
216 +
217 + // ── Attempt to create a tier via API (no Stripe configured -> 400) ──
218 + let resp = h
219 + .client
220 + .post_json(
221 + &format!("/api/projects/{}/tiers", project_id),
222 + r#"{"name": "Gold Tier", "description": "Premium access", "price_cents": 1000}"#,
223 + )
224 + .await;
225 + assert_eq!(
226 + resp.status, 400,
227 + "Create tier without Stripe should return 400, got {} {}",
228 + resp.status, resp.text
229 + );
230 +
231 + // The handler inserts the row before calling Stripe, so clean up the orphan
232 + sqlx::query("DELETE FROM subscription_tiers WHERE project_id = $1")
233 + .bind(project_uuid)
234 + .execute(&h.db)
235 + .await
236 + .unwrap();
237 +
238 + // ── Insert tier via SQL (simulating successful Stripe flow) ──
239 + let tier_id = sqlx::query_scalar::<_, uuid::Uuid>(
240 + "INSERT INTO subscription_tiers (project_id, name, description, price_cents, stripe_product_id, stripe_price_id) \
241 + VALUES ($1, 'Gold Tier', 'Premium access', 1000, 'prod_test_123', 'price_test_123') RETURNING id",
242 + )
243 + .bind(project_uuid)
244 + .fetch_one(&h.db)
245 + .await
246 + .unwrap();
247 +
248 + // ── Verify it appears in the list (project settings) ──
249 + let resp = h
250 + .client
251 + .get(&format!("/api/projects/{}/tiers", project_id))
252 + .await;
253 + assert!(resp.status.is_success(), "List tiers failed: {}", resp.text);
254 + let body: Value = resp.json();
255 + let tiers = body["data"].as_array().expect("response should have data array");
256 + assert_eq!(tiers.len(), 1, "Should have exactly 1 tier");
257 + assert_eq!(tiers[0]["name"], "Gold Tier");
258 + assert_eq!(tiers[0]["description"], "Premium access");
259 + assert_eq!(tiers[0]["price_cents"], 1000);
260 + assert_eq!(tiers[0]["is_active"], true);
261 +
262 + // ── Verify Stripe IDs are set in the database ──
263 + let (prod_id, price_id): (Option<String>, Option<String>) = sqlx::query_as(
264 + "SELECT stripe_product_id, stripe_price_id FROM subscription_tiers WHERE id = $1",
265 + )
266 + .bind(tier_id)
267 + .fetch_one(&h.db)
268 + .await
269 + .unwrap();
270 + assert_eq!(prod_id.as_deref(), Some("prod_test_123"));
271 + assert_eq!(price_id.as_deref(), Some("price_test_123"));
272 + }
273 +
274 + #[tokio::test]
275 + async fn list_subscription_tiers() {
276 + let mut h = TestHarness::new().await;
277 +
278 + let _user_id = h.create_creator("tierlist").await;
279 +
280 + let resp = h
281 + .client
282 + .post_form("/api/projects", "slug=tier-list&title=Tier+List")
283 + .await;
284 + assert!(resp.status.is_success());
285 + let project: Value = resp.json();
286 + let project_id = project["id"].as_str().unwrap();
287 + let project_uuid: uuid::Uuid = project_id.parse().unwrap();
288 +
289 + // ── Insert multiple tiers with explicit sort_order ──
290 + for (name, price, order) in [
291 + ("Bronze", 300, 1),
292 + ("Silver", 600, 2),
293 + ("Gold", 1200, 3),
294 + ] {
295 + sqlx::query(
296 + "INSERT INTO subscription_tiers (project_id, name, price_cents, sort_order) \
297 + VALUES ($1, $2, $3, $4)",
298 + )
299 + .bind(project_uuid)
300 + .bind(name)
301 + .bind(price)
302 + .bind(order)
303 + .execute(&h.db)
304 + .await
305 + .unwrap();
306 + }
307 +
308 + // ── List tiers: should return all 3 in order ──
309 + let resp = h
310 + .client
311 + .get(&format!("/api/projects/{}/tiers", project_id))
312 + .await;
313 + assert!(resp.status.is_success(), "List tiers failed: {}", resp.text);
314 + let body: Value = resp.json();
315 + let tiers = body["data"].as_array().expect("response should have data array");
316 + assert_eq!(tiers.len(), 3, "Should have 3 tiers");
317 + assert_eq!(tiers[0]["name"], "Bronze");
318 + assert_eq!(tiers[0]["price_cents"], 300);
319 + assert_eq!(tiers[1]["name"], "Silver");
320 + assert_eq!(tiers[1]["price_cents"], 600);
321 + assert_eq!(tiers[2]["name"], "Gold");
322 + assert_eq!(tiers[2]["price_cents"], 1200);
323 + }
324 +
325 + #[tokio::test]
326 + async fn update_subscription_tier() {
327 + let mut h = TestHarness::new().await;
328 +
329 + let _user_id = h.create_creator("tierupd").await;
330 +
331 + let resp = h
332 + .client
333 + .post_form("/api/projects", "slug=tier-upd&title=Tier+Update")
334 + .await;
335 + assert!(resp.status.is_success());
336 + let project: Value = resp.json();
337 + let project_id = project["id"].as_str().unwrap();
338 + let project_uuid: uuid::Uuid = project_id.parse().unwrap();
339 +
340 + // ── Insert a tier via SQL ──
341 + let tier_id = sqlx::query_scalar::<_, uuid::Uuid>(
342 + "INSERT INTO subscription_tiers (project_id, name, description, price_cents) \
343 + VALUES ($1, 'Starter', 'Basic access', 500) RETURNING id",
344 + )
345 + .bind(project_uuid)
346 + .fetch_one(&h.db)
347 + .await
348 + .unwrap();
349 +
350 + // ── Update name and description via API ──
351 + let resp = h
352 + .client
353 + .put_json(
354 + &format!("/api/tiers/{}", tier_id),
355 + r#"{"name": "Pro", "description": "Full access to everything", "is_active": true}"#,
356 + )
357 + .await;
358 + assert!(
359 + resp.status.is_success(),
360 + "Update tier failed: {} {}",
361 + resp.status,
362 + resp.text
363 + );
364 + let updated: Value = resp.json();
365 + assert_eq!(updated["name"], "Pro");
366 + assert_eq!(updated["description"], "Full access to everything");
367 + assert_eq!(updated["is_active"], true);
368 +
369 + // ── Verify changes persisted via list endpoint ──
370 + let resp = h
371 + .client
372 + .get(&format!("/api/projects/{}/tiers", project_id))
373 + .await;
374 + let body: Value = resp.json();
375 + let tiers = body["data"].as_array().unwrap();
376 + assert_eq!(tiers.len(), 1);
377 + assert_eq!(tiers[0]["name"], "Pro", "Updated name should persist");
378 + assert_eq!(
379 + tiers[0]["description"], "Full access to everything",
380 + "Updated description should persist"
381 + );
382 +
383 + // ── Update to deactivate ──
384 + let resp = h
385 + .client
386 + .put_json(
387 + &format!("/api/tiers/{}", tier_id),
388 + r#"{"name": "Pro", "description": "Full access to everything", "is_active": false}"#,
389 + )
390 + .await;
391 + assert!(resp.status.is_success());
392 + let updated: Value = resp.json();
393 + assert_eq!(updated["is_active"], false, "Tier should be deactivated");
394 + }
395 +
396 + #[tokio::test]
397 + async fn delete_subscription_tier() {
398 + let mut h = TestHarness::new().await;
399 +
400 + let _user_id = h.create_creator("tierdel").await;
401 +
402 + let resp = h
403 + .client
404 + .post_form("/api/projects", "slug=tier-del&title=Tier+Delete")
405 + .await;
406 + assert!(resp.status.is_success());
407 + let project: Value = resp.json();
408 + let project_id = project["id"].as_str().unwrap();
409 + let project_uuid: uuid::Uuid = project_id.parse().unwrap();
410 +
411 + // ── Insert a tier via SQL ──
412 + let tier_id = sqlx::query_scalar::<_, uuid::Uuid>(
413 + "INSERT INTO subscription_tiers (project_id, name, price_cents) \
414 + VALUES ($1, 'Temp Tier', 800) RETURNING id",
415 + )
416 + .bind(project_uuid)
417 + .fetch_one(&h.db)
418 + .await
419 + .unwrap();
420 +
421 + // ── Verify it exists ──
422 + let resp = h
423 + .client
424 + .get(&format!("/api/projects/{}/tiers", project_id))
425 + .await;
426 + let body: Value = resp.json();
427 + let tiers = body["data"].as_array().unwrap();
428 + assert_eq!(tiers.len(), 1, "Tier should exist before deletion");
429 +
430 + // ── Delete the tier ──
431 + let resp = h
432 + .client
433 + .delete(&format!("/api/tiers/{}", tier_id))
434 + .await;
435 + assert_eq!(
436 + resp.status, 204,
437 + "Delete tier should return 204, got {} {}",
438 + resp.status, resp.text
439 + );
440 +
441 + // ── Verify tier is gone from list ──
442 + let resp = h
443 + .client
444 + .get(&format!("/api/projects/{}/tiers", project_id))
445 + .await;
446 + let body: Value = resp.json();
447 + let tiers = body["data"].as_array().unwrap();
448 + assert_eq!(tiers.len(), 0, "Tier list should be empty after deletion");
449 +
450 + // ── Verify hard delete (no subscriptions referenced it) ──
451 + let count = sqlx::query_scalar::<_, i64>(
452 + "SELECT COUNT(*) FROM subscription_tiers WHERE id = $1",
453 + )
454 + .bind(tier_id)
455 + .fetch_one(&h.db)
456 + .await
457 + .unwrap();
458 + assert_eq!(count, 0, "Tier should be hard-deleted from database");
459 + }
460 +
461 + #[tokio::test]
462 + async fn subscriber_tier_visibility() {
463 + let mut h = TestHarness::new().await;
464 +
465 + // Creator sets up a public project with a tier
466 + let _user_id = h.create_creator("tiervis").await;
467 +
468 + let resp = h
469 + .client
470 + .post_form("/api/projects", "slug=tier-vis&title=Visible+Tiers")
471 + .await;
472 + assert!(resp.status.is_success());
473 + let project: Value = resp.json();
474 + let project_id = project["id"].as_str().unwrap();
475 + let project_uuid: uuid::Uuid = project_id.parse().unwrap();
476 +
477 + // Publish the project
478 + h.client
479 + .put_json(
480 + &format!("/api/projects/{}", project_id),
481 + r#"{"is_public": true}"#,
482 + )
483 + .await;
484 +
485 + // Insert an active tier via SQL
486 + sqlx::query(
487 + "INSERT INTO subscription_tiers (project_id, name, description, price_cents) \
488 + VALUES ($1, 'Community', 'Join the community', 500)",
489 + )
490 + .bind(project_uuid)
491 + .execute(&h.db)
492 + .await
493 + .unwrap();
494 +
495 + // ── Log out so we are unauthenticated ──
496 + h.client.post_form("/logout", "").await;
497 +
498 + // ── Visit the public project page as anonymous user ──
499 + let resp = h.client.get("/@tiervis/tier-vis").await;
500 + assert_eq!(resp.status, 200, "Public project page should render, got {}", resp.status);
501 + // The project page template should include the tier name
502 + assert!(
503 + resp.text.contains("Community"),
504 + "Public project page should show the tier name 'Community'"
505 + );
506 + }
507 +
508 + #[tokio::test]
509 + async fn sandbox_tier_uses_fake_stripe_ids() {
510 + let mut h = TestHarness::new().await;
511 +
512 + // ── Create a sandbox account via POST /sandbox ──
513 + h.client.fetch_csrf_token().await;
514 + let resp = h.client.post_form("/sandbox", "").await;
515 + assert!(
516 + resp.status.is_redirection(),
517 + "Sandbox creation should redirect, got {} {}",
518 + resp.status, resp.text
519 + );
520 +
521 + // Fetch CSRF for the new session
522 + h.client.fetch_csrf_token().await;
523 +
524 + // Find the sandbox user
525 + let (sandbox_user_id, is_sandbox): (i32, bool) = sqlx::query_as(
526 + "SELECT id, is_sandbox FROM users WHERE username LIKE 'sandbox_%' ORDER BY created_at DESC LIMIT 1",
527 + )
528 + .fetch_one(&h.db)
529 + .await
530 + .unwrap();
531 + assert!(is_sandbox, "User should be a sandbox account");
532 +
533 + // The sandbox user already has a demo project seeded. Find it.
534 + let project_id: uuid::Uuid = sqlx::query_scalar(
535 + "SELECT id FROM projects WHERE user_id = $1 LIMIT 1",
536 + )
537 + .bind(sandbox_user_id)
538 + .fetch_one(&h.db)
539 + .await
540 + .unwrap();
541 +
542 + // ── Create a tier via the API (sandbox users get fake Stripe IDs) ──
543 + let resp = h
544 + .client
545 + .post_json(
546 + &format!("/api/projects/{}/tiers", project_id),
547 + r#"{"name": "Sandbox Tier", "description": "Test tier", "price_cents": 500}"#,
548 + )
549 + .await;
550 + assert!(
551 + resp.status.is_success(),
552 + "Sandbox tier creation should succeed, got {} {}",
553 + resp.status, resp.text
554 + );
555 + let tier: Value = resp.json();
556 + let tier_id = tier["id"].as_str().unwrap();
557 +
558 + // ── Verify the tier has sandbox_ prefixed Stripe IDs ──
559 + let (prod_id, price_id): (Option<String>, Option<String>) = sqlx::query_as(
560 + "SELECT stripe_product_id, stripe_price_id FROM subscription_tiers WHERE id = $1::uuid",
561 + )
562 + .bind(tier_id)
563 + .fetch_one(&h.db)
564 + .await
565 + .unwrap();
566 +
567 + let prod_id = prod_id.expect("Sandbox tier should have stripe_product_id");
568 + let price_id = price_id.expect("Sandbox tier should have stripe_price_id");
569 + assert!(
570 + prod_id.starts_with("sandbox_prod_"),
571 + "Sandbox product ID should start with 'sandbox_prod_', got: {}",
572 + prod_id
573 + );
574 + assert!(
575 + price_id.starts_with("sandbox_price_"),
576 + "Sandbox price ID should start with 'sandbox_price_', got: {}",
577 + price_id
578 + );
579 + }
580 +
581 + #[tokio::test]
202 582 async fn non_owner_cannot_manage_tiers() {
203 583 let mut h = TestHarness::new().await;
204 584