Skip to main content

max / makenotwork

server: gallery/carousel, promo-validator, fuzz #11/#12 remediations, embed port, observability The accumulated 2026-06-05 launch-plan code wave. These changes are interleaved across shared core files (db/mod.rs, scan_jobs.rs, pending_s3_deletions.rs, scanning/worker.rs, storage.rs) and cannot be cleanly separated into per-feature commits without hunk-splitting. - Gallery + carousel (launch plan S.1 / S.7): item_images/project_images tables (migration 135), db/gallery_images.rs, routes/storage/gallery.rs, composable carousel macro + static/carousel.js, gallery manager UI, item/project page render, sealed S3_KEY_REFS entries. - Promo-validator extraction (R Phase 1): db/promo_codes.rs lookup_and_validate_promo + apply_promo_to_item collapse the 4 hand-copied promo blocks (item.rs, cart.rs x2, guest_checkout.rs) into thin call sites. - Ultra Fuzz Run #11/#12 remediations (P/Q): media-confirm data-loss, cleanup pool-exhaustion, subscription-revival terminal guard across all families, min-charge gate, embed CSP, deploy_lint host-agnostic. - Embed Askama port (R Phase 1): src/templates/embed.rs + templates/embed/. - Observability to A+ (R Phase 4): #[instrument] fields + structured logs across git/mod, storage routes, project page, cart, bundles; cart promo releases now warn on failure via release_promo_quietly. - Feed key versioning (migration 134), deploy_lint.rs. Strict clippy (--features fast-tests --all-targets -D warnings) green; 1600 lib + affected integration tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-07 15:10 UTC
Commit: 9cceb187a4e8e752d3aefd4f75a1aa50928b96c8
Parent: 9b7f2bb
133 files changed, +5780 insertions, -1805 deletions
@@ -181,7 +181,18 @@ www.makenot.work {
181 181 on_demand
182 182 }
183 183
184 - reverse_proxy localhost:3000
184 + # Custom domains connect directly to the origin (no Cloudflare mTLS in front),
185 + # so any client-supplied CF-Connecting-IP / X-Forwarded-For is forgeable. The
186 + # app trusts CF-Connecting-IP for rate-limiting, lockouts, and audit logs, so
187 + # overwrite it with the real TCP peer and strip XFF before proxying — a client
188 + # can no longer mint fake source IPs to evade per-IP throttles or poison logs.
189 + reverse_proxy localhost:3000 {
190 + # Set (replace) CF-Connecting-IP to the real TCP peer — overwrites any
191 + # value the client sent. Strip X-Forwarded-For so no forged value reaches
192 + # the app (the app ignores XFF anyway; this is hygiene).
193 + header_up CF-Connecting-IP {http.request.remote.host}
194 + header_up -X-Forwarded-For
195 + }
185 196
186 197 header {
187 198 X-Content-Type-Options "nosniff"
@@ -7,10 +7,15 @@
7 7 # - Allow HTTP/HTTPS (80/443) from anywhere (custom domains need direct access)
8 8 # - Drop everything else
9 9 #
10 - # HTTP/HTTPS is open to all because custom domains bypass Cloudflare.
11 - # makenot.work subdomains remain protected by Caddy mTLS (Authenticated Origin Pulls):
12 - # requests without a valid Cloudflare client cert are rejected by Caddy before
13 - # reaching the application.
10 + # HTTP/HTTPS is open to all because custom domains connect directly (on-demand
11 + # Let's Encrypt TLS, not behind Cloudflare) and arrive from arbitrary client IPs,
12 + # so 443 cannot be CIDR-locked to Cloudflare without breaking that paid feature.
13 + # The IP-spoofing risk this would otherwise create is closed in Caddy instead:
14 + # - makenot.work subdomains: Caddy mTLS (Authenticated Origin Pulls) rejects any
15 + # request without a valid Cloudflare client cert before it reaches the app.
16 + # - custom domains (:443 block): Caddy overwrites CF-Connecting-IP with the real
17 + # TCP peer and strips X-Forwarded-For, so a client cannot forge the source IP
18 + # the app uses for rate-limiting, lockouts, and audit logs.
14 19
15 20 set -e
16 21
@@ -0,0 +1,11 @@
1 + -- Per-user feed key version, folded into the personal-feed URL HMAC.
2 + --
3 + -- The personal RSS feed URL (`/feed/{user_id}?v={version}&sig={hmac}`) is
4 + -- authenticated by an HMAC over `feed:{user_id}:{version}`. Bumping this column
5 + -- changes the signed message, so the user's previously-issued feed URL stops
6 + -- verifying — a leaked or compromised feed link can be revoked in isolation
7 + -- without rotating the global signing secret (which would invalidate EVERY
8 + -- user's feed at once). Starts at 0; the "Regenerate feed URL" action in
9 + -- dashboard settings increments it.
10 + ALTER TABLE users
11 + ADD COLUMN IF NOT EXISTS feed_key_version INTEGER NOT NULL DEFAULT 0;
@@ -0,0 +1,51 @@
1 + -- Ordered image galleries for items and projects (launchplan S.1).
2 + --
3 + -- The single `cover_image_url` on items/projects stays as-is (it remains the
4 + -- OG/Twitter card image). These tables are additive: zero or more extra images
5 + -- per entity, each with an alt string and an explicit display position, render
6 + -- through the shared carousel widget.
7 + --
8 + -- s3_key is stored directly (not just the URL) so the cleanup/garbage paths can
9 + -- match it exactly via the S3_KEY_REFS registry, and so a delete can decrement
10 + -- storage from the recorded file_size_bytes without an S3 HEAD probe (the same
11 + -- drift-avoidance reason items.cover_file_size_bytes exists, migration 126).
12 + --
13 + -- Scanning follows the project-image precedent: gallery uploads enqueue a scan
14 + -- job (target_kind 'gallery_image'); the worker scans + quarantines a malicious
15 + -- object but writes no per-row status column (no display gating), so no
16 + -- scan_status column here.
17 +
18 + CREATE TABLE item_images (
19 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
20 + item_id UUID NOT NULL REFERENCES items(id) ON DELETE CASCADE,
21 + s3_key TEXT NOT NULL,
22 + image_url TEXT NOT NULL,
23 + alt TEXT NOT NULL DEFAULT '',
24 + position INT NOT NULL DEFAULT 0,
25 + file_size_bytes BIGINT NOT NULL DEFAULT 0,
26 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
27 + );
28 +
29 + CREATE INDEX idx_item_images_item ON item_images (item_id, position);
30 +
31 + CREATE TABLE project_images (
32 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
33 + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
34 + s3_key TEXT NOT NULL,
35 + image_url TEXT NOT NULL,
36 + alt TEXT NOT NULL DEFAULT '',
37 + position INT NOT NULL DEFAULT 0,
38 + file_size_bytes BIGINT NOT NULL DEFAULT 0,
39 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
40 + );
41 +
42 + CREATE INDEX idx_project_images_project ON project_images (project_id, position);
43 +
44 + -- Allow the new scan target kind. Drop-and-re-add is order-independent of any
45 + -- prior alteration to this constraint.
46 + ALTER TABLE scan_jobs DROP CONSTRAINT IF EXISTS scan_jobs_target_kind_check;
47 + ALTER TABLE scan_jobs ADD CONSTRAINT scan_jobs_target_kind_check
48 + CHECK (target_kind IN (
49 + 'version', 'item', 'project_image', 'item_image', 'media',
50 + 'content_insertion', 'gallery_image'
51 + ));
@@ -214,6 +214,7 @@ impl FromRequestParts<crate::AppState> for AuthUser {
214 214 /// - gates paid content or downloads,
215 215 /// - issues OAuth tokens / grants,
216 216 /// - exposes account-private information,
217 + ///
217 218 /// use [`AuthUser`] (required login) or [`MaybeUserVerified`] (optional login
218 219 /// with revocation check) instead.
219 220 pub struct MaybeUserUnverified(pub Option<SessionUser>);
@@ -403,7 +403,15 @@ async fn execute_target(
403 403 // Cleanup remote build dir
404 404 let _ = run_ssh_command(host, &format!("rm -rf {}", shell_escape(&build_dir))).await;
405 405
406 - scp_result.map_err(|e| format!("SCP download failed: {e}"))?;
406 + if let Err(e) = scp_result {
407 + // The main artifact failed, but the .sig sidecar may already be on disk
408 + // from its own scp above. Clean it up before bailing so a retry loop
409 + // doesn't accumulate orphaned .sig temp files (the main temp is removed
410 + // unconditionally further down, but on this early return it was never
411 + // created).
412 + let _ = tokio::fs::remove_file(&local_sig_tmp).await;
413 + return Err(format!("SCP download failed: {e}"));
414 + }
407 415
408 416 // Read signature from .sig file if it was downloaded
409 417 let signature = if scp_sig_result.is_ok() {
@@ -291,423 +291,141 @@ pub const SANDBOX_RATE_LIMIT_BURST: u32 = 10;
291 291 /// Max concurrent active sandboxes per IP.
292 292 pub const SANDBOX_MAX_PER_IP: i64 = 3;
293 293
294 + // ── Compile-time invariants on the constants above ───────────────────────────
295 + //
296 + // Encoded as `const _: () = assert!(...)` rather than `#[test]` functions: these
297 + // are checked when the crate is COMPILED, so a bad constant fails the build
298 + // (not just a test run), and the whole invariant set sits next to the values.
299 +
300 + // Price constants
301 + const _: () = assert!(MAX_PRICE_CENTS > 0);
302 + const _: () = assert!(MAX_PRICE_CENTS <= 10_000_000); // <= $100,000
303 + const _: () = assert!(MIN_SUBSCRIPTION_PRICE_CENTS > 0);
304 + const _: () = assert!(MIN_SUBSCRIPTION_PRICE_CENTS < MAX_PRICE_CENTS);
305 +
306 + // Stripe fee constants
307 + const _: () = assert!(STRIPE_FEE_PERCENTAGE > 0.0 && STRIPE_FEE_PERCENTAGE < 0.5);
308 + const _: () = assert!(STRIPE_FEE_FIXED_CENTS > 0.0);
309 +
310 + // Database pool
311 + const _: () = assert!(DB_POOL_MAX_CONNECTIONS > DB_POOL_MIN_CONNECTIONS);
312 + const _: () = assert!(DB_POOL_MIN_CONNECTIONS > 0);
313 + const _: () = assert!(DB_ACQUIRE_TIMEOUT_SECS > 0);
314 + const _: () = assert!(DB_MAX_LIFETIME_SECS > DB_IDLE_TIMEOUT_SECS);
315 +
316 + // Session constants
317 + const _: () = assert!(SESSION_EXPIRY_DAYS > 0 && SESSION_EXPIRY_DAYS <= 365);
318 + const _: () = assert!(SESSION_TOUCH_CACHE_SECS > 0 && SESSION_TOUCH_CACHE_SECS < 86400);
319 +
320 + // Login security
321 + const _: () = assert!(MAX_LOGIN_ATTEMPTS > 0);
322 + const _: () = assert!(LOCKOUT_MINUTES > 0);
323 +
324 + // Email link expiry ordering
325 + const _: () = assert!(PASSWORD_RESET_EXPIRY_SECS > 0);
326 + const _: () = assert!(EMAIL_VERIFICATION_EXPIRY_SECS > PASSWORD_RESET_EXPIRY_SECS);
327 + const _: () = assert!(ACCOUNT_DELETION_EXPIRY_SECS > 0);
328 +
329 + // Scheduler
330 + const _: () = assert!(SCHEDULER_INTERVAL_SECS > 0);
331 +
332 + // Rate-limit bursts all positive
333 + const _: () = assert!(AUTH_RATE_LIMIT_BURST > 0);
334 + const _: () = assert!(VALIDATE_RATE_LIMIT_BURST > 0);
335 + const _: () = assert!(API_WRITE_RATE_LIMIT_BURST > 0);
336 + const _: () = assert!(API_READ_RATE_LIMIT_BURST > 0);
337 + const _: () = assert!(API_EXPORT_RATE_LIMIT_BURST > 0);
338 + const _: () = assert!(LICENSE_KEY_RATE_LIMIT_BURST > 0);
339 + const _: () = assert!(UPLOAD_RATE_LIMIT_BURST > 0);
340 + const _: () = assert!(OAUTH_RATE_LIMIT_BURST > 0);
341 + const _: () = assert!(OAUTH_TOKEN_RATE_LIMIT_BURST > 0);
342 +
343 + // Rate-limit burst ordering: read > write > auth
344 + const _: () = assert!(API_READ_RATE_LIMIT_BURST > API_WRITE_RATE_LIMIT_BURST);
345 + const _: () = assert!(API_WRITE_RATE_LIMIT_BURST > AUTH_RATE_LIMIT_BURST);
346 +
347 + // Rate-limit intervals positive
348 + const _: () = assert!(AUTH_RATE_LIMIT_MS > 0);
349 + const _: () = assert!(API_WRITE_RATE_LIMIT_MS > 0);
350 + const _: () = assert!(API_READ_RATE_LIMIT_MS > 0);
351 +
352 + // File size limits
353 + const _: () = assert!(SCAN_MAX_MEMORY_BYTES > 0);
354 + const _: () = assert!(SCAN_SPOOL_FREE_RESERVE_BYTES < SCAN_SPOOL_MAX_BYTES);
355 + const _: () = assert!(SCAN_SPOOL_MAX_BYTES > SCAN_MAX_MEMORY_BYTES as u64);
356 + const _: () = assert!(SCAN_JOB_RETENTION_DAYS >= 7); // no same-day purge race
357 + const _: () = assert!(BROADCAST_PARALLELISM > 0 && BROADCAST_PARALLELISM <= 64);
358 + const _: () = assert!(SCAN_ZIP_MAX_UNCOMPRESSED > SCAN_MAX_MEMORY_BYTES as u64);
359 + const _: () = assert!(GIT_RAW_MAX_BYTES > GIT_MAX_FILE_SIZE_BYTES);
360 + const _: () = assert!(SCAN_ZIP_MAX_RATIO > 0.0);
361 + const _: () = assert!(SCAN_ZIP_MAX_DEPTH > 0);
362 +
363 + // SyncKit
364 + const _: () = assert!(SYNCKIT_PUSH_MAX_CHANGES > 0);
365 + const _: () = assert!(SYNCKIT_PULL_PAGE_SIZE > 0);
366 + const _: () = assert!(SYNCKIT_MAX_BLOB_SIZE_BYTES > 0);
367 + const _: () = assert!(SYNCKIT_JWT_EXPIRY_SECS > 0);
368 +
369 + // TOTP
370 + const _: () = assert!(TOTP_DIGITS == 6);
371 + const _: () = assert!(TOTP_STEP == 30);
372 + const _: () = assert!(BACKUP_CODE_COUNT > 0);
373 + const _: () = assert!(BACKUP_CODE_LENGTH > 0);
374 +
375 + // Pagination
376 + const _: () = assert!(DISCOVER_PAGE_SIZE > 0);
377 + const _: () = assert!(FEED_PAGE_SIZE > 0);
378 + const _: () = assert!(PAGINATION_WINDOW_SIZE > 0);
379 +
380 + // String constants non-empty
381 + const _: () = assert!(!DATE_FMT_SHORT.is_empty());
382 + const _: () = assert!(!DATE_FMT_FULL.is_empty());
383 + const _: () = assert!(!DATE_FMT_ISO.is_empty());
384 + const _: () = assert!(!DATE_FMT_DATETIME.is_empty());
385 + const _: () = assert!(!DATE_FMT_DATETIME_UTC.is_empty());
386 + const _: () = assert!(!CHANGELOG_PROJECT_SLUG.is_empty());
387 + const _: () = assert!(!BUILD_ALLOWED_TARGETS.is_empty());
388 +
389 + // Collections
390 + const _: () = assert!(MAX_COLLECTIONS_PER_USER > 0);
391 + const _: () = assert!(MAX_ITEMS_PER_COLLECTION > 0);
392 +
393 + // Build pipeline
394 + const _: () = assert!(BUILD_TIMEOUT_SECS > 0);
395 + const _: () = assert!(BUILD_MAX_LOG_BYTES > 0);
396 +
397 + // Health monitoring
398 + const _: () = assert!(HEALTH_CHECK_INTERVAL_SECS > 0);
399 + const _: () = assert!(ALERT_COOLDOWN_SECS > HEALTH_CHECK_INTERVAL_SECS);
400 +
401 + // Sandbox
402 + const _: () = assert!(SANDBOX_EXPIRY_SECS > 0);
403 + const _: () = assert!(SANDBOX_CLEANUP_INTERVAL_SECS > 0);
404 + const _: () = assert!(SANDBOX_CLEANUP_INTERVAL_SECS < SANDBOX_EXPIRY_SECS as u64);
405 + const _: () = assert!(SANDBOX_MAX_PER_IP > 0);
406 +
407 + // Webhook
408 + const _: () = assert!(WEBHOOK_TIMESTAMP_TOLERANCE_SECS > 0);
409 +
410 + // OAuth
411 + const _: () = assert!(OAUTH_CODE_EXPIRY_SECS > 0);
412 + const _: () = assert!(OAUTH_CODE_LENGTH > 0);
413 +
414 + // Buffer limits
415 + const _: () = assert!(USER_AGENT_MAX_LENGTH > 0);
416 + const _: () = assert!(SYNCKIT_MAX_KEY_ENVELOPE_BYTES > 0);
417 +
294 418 #[cfg(test)]
295 419 mod tests {
296 420 use super::*;
297 421
298 - // -- Price constants --
299 -
300 - #[test]
301 - fn max_price_cents_is_positive() {
302 - assert!(MAX_PRICE_CENTS > 0);
303 - }
304 -
305 - #[test]
306 - fn max_price_cents_sane_upper_bound() {
307 - // Should not exceed $100,000
308 - assert!(MAX_PRICE_CENTS <= 10_000_000);
309 - }
310 -
311 - #[test]
312 - fn min_subscription_price_positive() {
313 - assert!(MIN_SUBSCRIPTION_PRICE_CENTS > 0);
314 - }
315 -
316 - #[test]
317 - fn min_subscription_price_below_max() {
318 - assert!(MIN_SUBSCRIPTION_PRICE_CENTS < MAX_PRICE_CENTS);
319 - }
320 -
321 - // -- Stripe fee constants --
322 -
323 - #[test]
324 - fn stripe_fee_percentage_reasonable() {
325 - assert!(STRIPE_FEE_PERCENTAGE > 0.0);
326 - assert!(STRIPE_FEE_PERCENTAGE < 0.5); // less than 50%
327 - }
328 -
329 - #[test]
330 - fn stripe_fee_fixed_positive() {
331 - assert!(STRIPE_FEE_FIXED_CENTS > 0.0);
332 - }
333 -
334 - // -- Database pool --
335 -
336 - #[test]
337 - fn db_pool_max_exceeds_min() {
338 - assert!(DB_POOL_MAX_CONNECTIONS > DB_POOL_MIN_CONNECTIONS);
339 - }
340 -
341 - #[test]
342 - fn db_pool_min_positive() {
343 - assert!(DB_POOL_MIN_CONNECTIONS > 0);
344 - }
345 -
346 - #[test]
347 - fn db_acquire_timeout_positive() {
348 - assert!(DB_ACQUIRE_TIMEOUT_SECS > 0);
349 - }
350 -
351 - #[test]
352 - fn db_max_lifetime_exceeds_idle_timeout() {
353 - assert!(DB_MAX_LIFETIME_SECS > DB_IDLE_TIMEOUT_SECS);
354 - }
355 -
356 - // -- Session constants --
357 -
358 - #[test]
359 - fn session_expiry_positive() {
360 - assert!(SESSION_EXPIRY_DAYS > 0);
361 - }
362 -
363 - #[test]
364 - fn session_expiry_not_absurd() {
365 - assert!(SESSION_EXPIRY_DAYS <= 365);
366 - }
367 -
368 - #[test]
369 - fn session_touch_cache_positive() {
370 - assert!(SESSION_TOUCH_CACHE_SECS > 0);
371 - }
372 -
373 - #[test]
374 - fn session_touch_cache_less_than_one_day() {
375 - assert!(SESSION_TOUCH_CACHE_SECS < 86400);
376 - }
377 -
378 - // -- Login security --
379 -
380 - #[test]
381 - fn max_login_attempts_positive() {
382 - assert!(MAX_LOGIN_ATTEMPTS > 0);
383 - }
384 -
385 - #[test]
386 - fn lockout_minutes_positive() {
387 - assert!(LOCKOUT_MINUTES > 0);
388 - }
389 -
390 - // -- Email link expiry ordering --
391 -
392 - #[test]
393 - fn password_reset_expiry_positive() {
394 - assert!(PASSWORD_RESET_EXPIRY_SECS > 0);
395 - }
396 -
397 - #[test]
398 - fn email_verification_longer_than_password_reset() {
399 - assert!(EMAIL_VERIFICATION_EXPIRY_SECS > PASSWORD_RESET_EXPIRY_SECS);
400 - }
401 -
402 - #[test]
403 - fn account_deletion_expiry_positive() {
404 - assert!(ACCOUNT_DELETION_EXPIRY_SECS > 0);
405 - }
406 -
407 - // -- Scheduler --
408 -
409 - #[test]
410 - fn scheduler_interval_positive() {
411 - assert!(SCHEDULER_INTERVAL_SECS > 0);
412 - }
413 -
414 - // -- Rate limit bursts all positive --
415 -
416 - #[test]
417 - fn auth_rate_limit_burst_positive() {
418 - assert!(AUTH_RATE_LIMIT_BURST > 0);
419 - }
420 -
421 - #[test]
422 - fn validate_rate_limit_burst_positive() {
423 - assert!(VALIDATE_RATE_LIMIT_BURST > 0);
424 - }
425 -
426 - #[test]
427 - fn api_write_rate_limit_burst_positive() {
428 - assert!(API_WRITE_RATE_LIMIT_BURST > 0);
429 - }
430 -
431 - #[test]
432 - fn api_read_rate_limit_burst_positive() {
433 - assert!(API_READ_RATE_LIMIT_BURST > 0);
434 - }
435 -
436 - #[test]
437 - fn api_export_rate_limit_burst_positive() {
438 - assert!(API_EXPORT_RATE_LIMIT_BURST > 0);
439 - }
440 -
441 - #[test]
442 - fn license_key_rate_limit_burst_positive() {
443 - assert!(LICENSE_KEY_RATE_LIMIT_BURST > 0);
444 - }
445 -
446 - #[test]
447 - fn upload_rate_limit_burst_positive() {
448 - assert!(UPLOAD_RATE_LIMIT_BURST > 0);
449 - }
450 -
451 - #[test]
452 - fn oauth_rate_limit_burst_positive() {
453 - assert!(OAUTH_RATE_LIMIT_BURST > 0);
454 - }
455 -
456 - #[test]
457 - fn oauth_token_rate_limit_burst_positive() {
458 - assert!(OAUTH_TOKEN_RATE_LIMIT_BURST > 0);
459 - }
460 -
461 - // -- Rate limit burst ordering: read > write > auth --
462 -
463 - #[test]
464 - fn api_read_burst_exceeds_write_burst() {
465 - assert!(API_READ_RATE_LIMIT_BURST > API_WRITE_RATE_LIMIT_BURST);
466 - }
467 -
468 - #[test]
469 - fn api_write_burst_exceeds_auth_burst() {
470 - assert!(API_WRITE_RATE_LIMIT_BURST > AUTH_RATE_LIMIT_BURST);
471 - }
472 -
473 - // -- Rate limit intervals positive --
474 -
475 - #[test]
476 - fn auth_rate_limit_ms_positive() {
477 - assert!(AUTH_RATE_LIMIT_MS > 0);
478 - }
479 -
480 - #[test]
481 - fn api_write_rate_limit_ms_positive() {
482 - assert!(API_WRITE_RATE_LIMIT_MS > 0);
483 - }
484 -
485 - #[test]
486 - fn api_read_rate_limit_ms_positive() {
487 - assert!(API_READ_RATE_LIMIT_MS > 0);
488 - }
489 -
490 - // -- File size limits --
491 -
492 - #[test]
493 - fn scan_max_memory_positive() {
494 - assert!(SCAN_MAX_MEMORY_BYTES > 0);
495 - }
496 -
497 - #[test]
498 - fn scan_spool_reserve_below_max() {
499 - assert!(SCAN_SPOOL_FREE_RESERVE_BYTES < SCAN_SPOOL_MAX_BYTES);
500 - }
501 -
502 - #[test]
503 - fn scan_spool_max_exceeds_memory_threshold() {
504 - assert!(SCAN_SPOOL_MAX_BYTES > SCAN_MAX_MEMORY_BYTES as u64);
505 - }
506 -
507 - #[test]
508 - fn scan_job_retention_days_safe_floor() {
509 - // Guards against an accidental same-day purge that would race the
510 - // worker stamping completed_at.
511 - assert!(SCAN_JOB_RETENTION_DAYS >= 7);
512 - }
513 -
514 - #[test]
515 - fn broadcast_parallelism_sane() {
516 - assert!(BROADCAST_PARALLELISM > 0 && BROADCAST_PARALLELISM <= 64);
517 - }
518 -
519 - #[test]
520 - fn scan_zip_max_uncompressed_exceeds_memory_threshold() {
521 - assert!(SCAN_ZIP_MAX_UNCOMPRESSED > SCAN_MAX_MEMORY_BYTES as u64);
522 - }
523 -
524 - #[test]
525 - fn git_raw_max_exceeds_file_display_limit() {
526 - assert!(GIT_RAW_MAX_BYTES > GIT_MAX_FILE_SIZE_BYTES);
527 - }
528 -
529 - #[test]
530 - fn scan_zip_max_ratio_positive() {
531 - assert!(SCAN_ZIP_MAX_RATIO > 0.0);
532 - }
533 -
534 - #[test]
535 - fn scan_zip_max_depth_positive() {
536 - assert!(SCAN_ZIP_MAX_DEPTH > 0);
537 - }
538 -
539 - // -- SyncKit --
540 -
541 - #[test]
542 - fn synckit_push_max_changes_positive() {
543 - assert!(SYNCKIT_PUSH_MAX_CHANGES > 0);
544 - }
545 -
546 - #[test]
547 - fn synckit_pull_page_size_positive() {
548 - assert!(SYNCKIT_PULL_PAGE_SIZE > 0);
549 - }
550 -
551 - #[test]
552 - fn synckit_max_blob_size_positive() {
553 - assert!(SYNCKIT_MAX_BLOB_SIZE_BYTES > 0);
554 - }
555 -
556 - #[test]
557 - fn synckit_jwt_expiry_positive() {
558 - assert!(SYNCKIT_JWT_EXPIRY_SECS > 0);
559 - }
560 -
561 - // -- TOTP --
562 -
563 - #[test]
564 - fn totp_digits_is_six() {
565 - assert_eq!(TOTP_DIGITS, 6);
566 - }
567 -
568 - #[test]
569 - fn totp_step_is_30() {
570 - assert_eq!(TOTP_STEP, 30);
571 - }
572 -
573 - #[test]
574 - fn backup_code_count_positive() {
575 - assert!(BACKUP_CODE_COUNT > 0);
576 - }
577 -
578 - #[test]
579 - fn backup_code_length_positive() {
580 - assert!(BACKUP_CODE_LENGTH > 0);
581 - }
582 -
583 - // -- Pagination --
584 -
585 - #[test]
586 - fn discover_page_size_positive() {
587 - assert!(DISCOVER_PAGE_SIZE > 0);
588 - }
589 -
590 - #[test]
591 - fn feed_page_size_positive() {
592 - assert!(FEED_PAGE_SIZE > 0);
593 - }
594 -
595 - #[test]
596 - fn pagination_window_size_positive() {
597 - assert!(PAGINATION_WINDOW_SIZE > 0);
598 - }
599 -
600 - // -- String constants non-empty --
601 -
602 - #[test]
603 - fn date_formats_non_empty() {
604 - assert!(!DATE_FMT_SHORT.is_empty());
605 - assert!(!DATE_FMT_FULL.is_empty());
606 - assert!(!DATE_FMT_ISO.is_empty());
607 - assert!(!DATE_FMT_DATETIME.is_empty());
608 - assert!(!DATE_FMT_DATETIME_UTC.is_empty());
609 - }
610 -
422 + /// The build-target FORMAT check uses `str::contains`, which isn't const —
423 + /// so this invariant stays a runtime test (the rest are compile-time above).
611 424 #[test]
612 - fn changelog_project_slug_non_empty() {
613 - assert!(!CHANGELOG_PROJECT_SLUG.is_empty());
614 - }
615 -
616 - #[test]
617 - fn build_allowed_targets_non_empty() {
618 - assert!(!BUILD_ALLOWED_TARGETS.is_empty());
425 + fn build_allowed_targets_are_os_slash_arch() {
619 426 for target in BUILD_ALLOWED_TARGETS {
620 427 assert!(!target.is_empty());
621 - assert!(target.contains('/'), "target should be os/arch format: {}", target);
428 + assert!(target.contains('/'), "target should be os/arch format: {target}");
622 429 }
623 430 }
624 -
625 - // -- Collections --
626 -
627 - #[test]
628 - fn max_collections_per_user_positive() {
629 - assert!(MAX_COLLECTIONS_PER_USER > 0);
630 - }
631 -
632 - #[test]
633 - fn max_items_per_collection_positive() {
634 - assert!(MAX_ITEMS_PER_COLLECTION > 0);
635 - }
636 -
637 - // -- Build pipeline --
638 -
639 - #[test]
640 - fn build_timeout_positive() {
641 - assert!(BUILD_TIMEOUT_SECS > 0);
642 - }
643 -
644 - #[test]
645 - fn build_max_log_bytes_positive() {
646 - assert!(BUILD_MAX_LOG_BYTES > 0);
647 - }
648 -
649 - // -- Health monitoring --
650 -
651 - #[test]
652 - fn health_check_interval_positive() {
653 - assert!(HEALTH_CHECK_INTERVAL_SECS > 0);
654 - }
655 -
656 - #[test]
657 - fn alert_cooldown_exceeds_health_check() {
658 - assert!(ALERT_COOLDOWN_SECS > HEALTH_CHECK_INTERVAL_SECS);
659 - }
660 -
661 - // -- Sandbox --
662 -
Lines truncated
@@ -39,34 +39,46 @@ pub fn generate_key_code() -> crate::db::KeyCode {
39 39 crate::db::KeyCode::from_trusted(words.join("-"))
40 40 }
41 41
42 - /// Generate an HMAC-signed personal RSS feed URL for a user.
43 - ///
44 - /// The URL is permanent (no expiry) and tied to the signing secret.
45 - /// If the secret rotates, old URLs become invalid.
46 - pub fn generate_feed_url(host_url: &str, user_id: crate::db::UserId, secret: &str) -> String {
42 + /// Compute the hex HMAC-SHA256 over `feed:{user_id}:{version}` with `secret`.
43 + fn feed_signature(user_id: crate::db::UserId, version: i32, secret: &str) -> String {
47 44 use hmac::{Hmac, Mac};
48 45 use sha2::Sha256;
49 46
50 - let message = format!("feed:{}", user_id);
47 + let message = format!("feed:{user_id}:{version}");
51 48 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
52 49 .expect("HMAC-SHA256 accepts any key length");
53 50 mac.update(message.as_bytes());
54 - let sig = hex::encode(mac.finalize().into_bytes());
55 -
56 - format!("{}/feed/{}?sig={}", host_url, user_id, sig)
51 + hex::encode(mac.finalize().into_bytes())
57 52 }
58 53
59 - /// Verify a personal feed URL signature.
60 - pub fn verify_feed_signature(user_id: crate::db::UserId, signature: &str, secret: &str) -> bool {
61 - use hmac::{Hmac, Mac};
62 - use sha2::Sha256;
63 -
64 - let message = format!("feed:{}", user_id);
65 - let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
66 - .expect("HMAC-SHA256 accepts any key length");
67 - mac.update(message.as_bytes());
68 - let expected = hex::encode(mac.finalize().into_bytes());
54 + /// Generate an HMAC-signed personal RSS feed URL for a user.
55 + ///
56 + /// The signature covers `feed:{user_id}:{version}`. `version` is the user's
57 + /// `feed_key_version`: bumping it (via the dashboard "Regenerate feed URL"
58 + /// action) changes the signed message and revokes the previously-issued URL
59 + /// for that one user, without rotating the global signing secret (which would
60 + /// invalidate every user's feed at once). The URL is otherwise permanent.
61 + pub fn generate_feed_url(
62 + host_url: &str,
63 + user_id: crate::db::UserId,
64 + version: i32,
65 + secret: &str,
66 + ) -> String {
67 + let sig = feed_signature(user_id, version, secret);
68 + format!("{}/feed/{}?v={}&sig={}", host_url, user_id, version, sig)
69 + }
69 70
71 + /// Verify a personal feed URL signature for a given `(user_id, version)`.
72 + ///
73 + /// The caller MUST additionally check that `version` equals the user's current
74 + /// `feed_key_version` — a valid signature for a stale version is a revoked URL.
75 + pub fn verify_feed_signature(
76 + user_id: crate::db::UserId,
77 + version: i32,
78 + signature: &str,
79 + secret: &str,
80 + ) -> bool {
81 + let expected = feed_signature(user_id, version, secret);
70 82 constant_time_compare(&expected, signature)
71 83 }
72 84
@@ -126,47 +138,60 @@ mod tests {
126 138
127 139 // ── feed URL signing ──
128 140
141 + /// Extract the `sig=` value from a generated feed URL.
142 + fn sig_of(url: &str) -> &str {
143 + url.split("sig=").nth(1).unwrap()
144 + }
145 +
129 146 #[test]
130 147 fn feed_url_round_trip() {
131 148 let user_id = crate::db::UserId::new();
132 - let url = generate_feed_url("https://makenot.work", user_id, "secret");
149 + let url = generate_feed_url("https://makenot.work", user_id, 0, "secret");
133 150 assert!(url.contains(&user_id.to_string()));
151 + assert!(url.contains("v=0"));
134 152 assert!(url.contains("sig="));
135 - let sig = url.split("sig=").nth(1).unwrap();
136 - assert!(verify_feed_signature(user_id, sig, "secret"));
153 + assert!(verify_feed_signature(user_id, 0, sig_of(&url), "secret"));
137 154 }
138 155
139 156 #[test]
140 157 fn feed_url_wrong_secret_rejected() {
141 158 let user_id = crate::db::UserId::new();
142 - let url = generate_feed_url("https://makenot.work", user_id, "secret");
143 - let sig = url.split("sig=").nth(1).unwrap();
144 - assert!(!verify_feed_signature(user_id, sig, "wrong-secret"));
159 + let url = generate_feed_url("https://makenot.work", user_id, 0, "secret");
160 + assert!(!verify_feed_signature(user_id, 0, sig_of(&url), "wrong-secret"));
145 161 }
146 162
147 163 #[test]
148 164 fn feed_url_wrong_user_rejected() {
149 165 let user_id = crate::db::UserId::new();
150 166 let other_id = crate::db::UserId::new();
151 - let url = generate_feed_url("https://makenot.work", user_id, "secret");
152 - let sig = url.split("sig=").nth(1).unwrap();
153 - assert!(!verify_feed_signature(other_id, sig, "secret"));
167 + let url = generate_feed_url("https://makenot.work", user_id, 0, "secret");
168 + assert!(!verify_feed_signature(other_id, 0, sig_of(&url), "secret"));
169 + }
170 +
171 + #[test]
172 + fn feed_url_stale_version_rejected() {
173 + // A signature minted for version 0 must not verify against version 1 —
174 + // this is what makes "Regenerate feed URL" revoke the old link.
175 + let user_id = crate::db::UserId::new();
176 + let url = generate_feed_url("https://makenot.work", user_id, 0, "secret");
177 + assert!(verify_feed_signature(user_id, 0, sig_of(&url), "secret"));
178 + assert!(!verify_feed_signature(user_id, 1, sig_of(&url), "secret"));
154 179 }
155 180
156 181 #[test]
157 182 fn feed_signature_empty_string_rejected() {
158 183 let user_id = crate::db::UserId::new();
159 - assert!(!verify_feed_signature(user_id, "", "secret"));
184 + assert!(!verify_feed_signature(user_id, 0, "", "secret"));
160 185 }
161 186
162 187 #[test]
163 188 fn feed_signature_tampered_rejected() {
164 189 let user_id = crate::db::UserId::new();
165 - let url = generate_feed_url("https://makenot.work", user_id, "secret");
166 - let sig = url.split("sig=").nth(1).unwrap();
190 + let url = generate_feed_url("https://makenot.work", user_id, 0, "secret");
191 + let sig = sig_of(&url);
167 192 let mut tampered = sig.to_string();
168 193 let first = tampered.remove(0);
169 194 tampered.insert(0, if first == '0' { '1' } else { '0' });
170 - assert!(!verify_feed_signature(user_id, &tampered, "secret"));
195 + assert!(!verify_feed_signature(user_id, 0, &tampered, "secret"));
171 196 }
172 197 }
@@ -9,7 +9,7 @@ use crate::error::Result;
9 9 /// Add an item to a bundle at the given sort position.
10 10 ///
11 11 /// Uses `ON CONFLICT DO UPDATE` so re-adding updates the sort position.
12 - #[tracing::instrument(skip_all)]
12 + #[tracing::instrument(skip_all, fields(%bundle_id, %item_id, sort_order))]
13 13 pub async fn add_item_to_bundle(
14 14 pool: &PgPool,
15 15 bundle_id: ItemId,
@@ -33,7 +33,7 @@ pub async fn add_item_to_bundle(
33 33 }
34 34
35 35 /// Remove an item from a bundle.
36 - #[tracing::instrument(skip_all)]
36 + #[tracing::instrument(skip_all, fields(%bundle_id, %item_id))]
37 37 pub async fn remove_item_from_bundle(
38 38 pool: &PgPool,
39 39 bundle_id: ItemId,
@@ -49,7 +49,7 @@ pub async fn remove_item_from_bundle(
49 49 }
50 50
51 51 /// Get all items included in a bundle, ordered by sort_order.
52 - #[tracing::instrument(skip_all)]
52 + #[tracing::instrument(skip_all, fields(%bundle_id))]
53 53 pub async fn get_bundle_items(pool: &PgPool, bundle_id: ItemId) -> Result<Vec<DbItem>> {
54 54 let items = sqlx::query_as::<_, DbItem>(
55 55 r#"
@@ -68,7 +68,7 @@ pub async fn get_bundle_items(pool: &PgPool, bundle_id: ItemId) -> Result<Vec<Db
68 68 }
69 69
70 70 /// Get the IDs of all bundles that contain a given item.
71 - #[tracing::instrument(skip_all)]
71 + #[tracing::instrument(skip_all, fields(%item_id))]
72 72 pub async fn get_bundles_containing_item(
73 73 pool: &PgPool,
74 74 item_id: ItemId,
@@ -87,7 +87,7 @@ pub async fn get_bundles_containing_item(
87 87 ///
88 88 /// Returns true if the user has a completed transaction for any bundle
89 89 /// that contains this item.
90 - #[tracing::instrument(skip_all)]
90 + #[tracing::instrument(skip_all, fields(%user_id, %item_id))]
91 91 pub async fn has_access_via_bundle(
92 92 pool: &PgPool,
93 93 user_id: UserId,
@@ -116,7 +116,7 @@ pub async fn has_access_via_bundle(
116 116 ///
117 117 /// Excludes the bundle itself (by `exclude_bundle_id`) and any items that are
118 118 /// already bundles (to prevent nesting).
119 - #[tracing::instrument(skip_all)]
119 + #[tracing::instrument(skip_all, fields(%project_id, ?exclude_bundle_id))]
120 120 pub async fn get_bundleable_items(
121 121 pool: &PgPool,
122 122 project_id: ProjectId,
@@ -146,7 +146,7 @@ pub async fn get_bundleable_items(
146 146 /// Deletes all existing bundle_items rows for the bundle and inserts the new set.
147 147 /// `item_ids` is an ordered list; sort_order is derived from position.
148 148 /// Validates that both the bundle and all items belong to `owner_id`.
149 - #[tracing::instrument(skip_all)]
149 + #[tracing::instrument(skip_all, fields(%bundle_id, %owner_id, item_count = item_ids.len()))]
150 150 pub async fn set_bundle_items(
151 151 pool: &PgPool,
152 152 bundle_id: ItemId,
@@ -210,7 +210,7 @@ pub async fn set_bundle_items(
210 210 }
211 211
212 212 /// Count how many items are in a bundle.
213 - #[tracing::instrument(skip_all)]
213 + #[tracing::instrument(skip_all, fields(%bundle_id))]
214 214 pub async fn get_bundle_item_count(pool: &PgPool, bundle_id: ItemId) -> Result<i64> {
215 215 let count: i64 = sqlx::query_scalar(
216 216 "SELECT COUNT(*) FROM bundle_items WHERE bundle_id = $1",
@@ -222,10 +222,27 @@ pub async fn get_bundle_item_count(pool: &PgPool, bundle_id: ItemId) -> Result<i
222 222 Ok(count)
223 223 }
224 224
225 + /// Batch child-item counts for several bundles in one query, keyed by bundle.
226 + /// Bundles with no children are absent from the map (callers default to 0).
227 + #[tracing::instrument(skip_all, fields(bundle_count = bundle_ids.len()))]
228 + pub async fn get_bundle_item_counts(
229 + pool: &PgPool,
230 + bundle_ids: &[ItemId],
231 + ) -> Result<std::collections::HashMap<ItemId, i64>> {
232 + let rows: Vec<(ItemId, i64)> = sqlx::query_as(
233 + "SELECT bundle_id, COUNT(*) FROM bundle_items WHERE bundle_id = ANY($1) GROUP BY bundle_id",
234 + )
235 + .bind(bundle_ids)
236 + .fetch_all(pool)
237 + .await?;
238 +
239 + Ok(rows.into_iter().collect())
240 + }
241 +
225 242 /// Get all bundle→child relationships for items within a project.
226 243 ///
227 244 /// Returns `(bundle_id, child_item_id)` pairs ordered by bundle then sort order.
228 - #[tracing::instrument(skip_all)]
245 + #[tracing::instrument(skip_all, fields(%project_id))]
229 246 pub async fn get_project_bundle_map(
230 247 pool: &PgPool,
231 248 project_id: ProjectId,
@@ -249,7 +266,7 @@ pub async fn get_project_bundle_map(
249 266 /// Batch-load bundle maps for multiple projects at once.
250 267 ///
251 268 /// Returns (bundle_id, child_item_id) pairs for all bundles across the given projects.
252 - #[tracing::instrument(skip_all)]
269 + #[tracing::instrument(skip_all, fields(project_count = project_ids.len()))]
253 270 pub async fn get_bundle_maps_by_projects(
254 271 pool: &PgPool,
255 272 project_ids: &[super::ProjectId],
@@ -271,7 +288,7 @@ pub async fn get_bundle_maps_by_projects(
271 288 }
272 289
273 290 /// Check if an item is a member of a bundle.
274 - #[tracing::instrument(skip_all)]
291 + #[tracing::instrument(skip_all, fields(%bundle_id, %child_id))]
275 292 pub async fn is_bundle_member(pool: &PgPool, bundle_id: ItemId, child_id: ItemId) -> Result<bool> {
276 293 let exists: bool = sqlx::query_scalar(
277 294 "SELECT EXISTS(SELECT 1 FROM bundle_items WHERE bundle_id = $1 AND item_id = $2)",
@@ -285,7 +302,7 @@ pub async fn is_bundle_member(pool: &PgPool, bundle_id: ItemId, child_id: ItemId
285 302 }
286 303
287 304 /// Set the `listed` flag on an item.
288 - #[tracing::instrument(skip_all)]
305 + #[tracing::instrument(skip_all, fields(%item_id, listed))]
289 306 pub async fn set_item_listed(pool: &PgPool, item_id: ItemId, listed: bool) -> Result<()> {
290 307 sqlx::query("UPDATE items SET listed = $2 WHERE id = $1")
291 308 .bind(item_id)
@@ -337,7 +354,7 @@ mod tests {
337 354 fn sort_order_vector_generation() {
338 355 // Mirrors the sort_order logic in set_bundle_items
339 356 let item_count = 5;
340 - let orders: Vec<i32> = (0..item_count as i32).collect();
357 + let orders: Vec<i32> = (0..item_count).collect();
341 358 assert_eq!(orders, vec![0, 1, 2, 3, 4]);
342 359 }
343 360
@@ -351,7 +368,7 @@ mod tests {
351 368 fn bundle_id_replication_for_insert() {
352 369 // Mirrors the bundle_ids vector in set_bundle_items
353 370 let bundle_id = ItemId::nil();
354 - let item_ids = vec![ItemId::new(); 3];
371 + let item_ids = [ItemId::new(); 3];
355 372 let bundle_ids: Vec<ItemId> = vec![bundle_id; item_ids.len()];
356 373 assert_eq!(bundle_ids.len(), 3);
357 374 assert!(bundle_ids.iter().all(|id| *id == bundle_id));
@@ -3,7 +3,7 @@
3 3 use chrono::{DateTime, Duration, Utc};
4 4 use sqlx::PgPool;
5 5
6 - use super::{ItemId, UserId};
6 + use super::{ItemId, ProjectId, UserId};
7 7 use crate::error::Result;
8 8
9 9 /// Check if an item is in the user's cart.
@@ -144,6 +144,9 @@ pub async fn toggle_cart_preflight(
144 144 #[derive(Debug, Clone, sqlx::FromRow)]
145 145 pub struct CartItem {
146 146 pub item_id: ItemId,
147 + /// Owning project. Pulled through so project-scoped promo checks don't need
148 + /// a `get_item_by_id` per item at checkout.
149 + pub project_id: ProjectId,
147 150 pub title: String,
148 151 pub item_type: String,
149 152 pub price_cents: i32,
@@ -207,7 +210,7 @@ impl CartItem {
207 210 pub async fn get_cart_items(pool: &PgPool, user_id: UserId) -> Result<Vec<CartItem>> {
208 211 let items = sqlx::query_as::<_, CartItem>(
209 212 r#"
210 - SELECT c.item_id, i.title, i.item_type::TEXT as item_type,
213 + SELECT c.item_id, i.project_id, i.title, i.item_type::TEXT as item_type,
211 214 i.price_cents, i.pwyw_enabled, i.pwyw_min_cents,
212 215 c.amount_cents,
213 216 u.username AS creator_username, p.user_id AS seller_id,
@@ -244,7 +247,7 @@ pub async fn get_cart_items_for_seller(
244 247 ) -> Result<Vec<CartItem>> {
245 248 let items = sqlx::query_as::<_, CartItem>(
246 249 r#"
247 - SELECT c.item_id, i.title, i.item_type::TEXT as item_type,
250 + SELECT c.item_id, i.project_id, i.title, i.item_type::TEXT as item_type,
248 251 i.price_cents, i.pwyw_enabled, i.pwyw_min_cents,
249 252 c.amount_cents,
250 253 u.username AS creator_username, p.user_id AS seller_id,
@@ -341,6 +344,7 @@ mod tests {
341 344 ) -> CartItem {
342 345 CartItem {
343 346 item_id: ItemId::nil(),
347 + project_id: ProjectId::nil(),
344 348 title: String::new(),
345 349 item_type: String::from("audio"),
346 350 price_cents,
@@ -104,6 +104,14 @@ pub async fn get_active_creator_tier(
104 104
105 105 /// Update the status of a creator subscription.
106 106 /// Sets canceled_at when transitioning to canceled, preserving existing value.
107 + /// Returns None if not found OR already canceled.
108 + ///
109 + /// `canceled` is terminal here too (see `subscriptions::update_subscription_status`
110 + /// for the out-of-order-retry revival this guard prevents). Re-subscribing runs
111 + /// through `create_creator_subscription`'s `ON CONFLICT (user_id) DO UPDATE`
112 + /// path, which overwrites the row with the new stripe_subscription_id and flips
113 + /// it active — it does NOT go through this function — so the guard never blocks
114 + /// a legitimate reactivation.
107 115 #[tracing::instrument(skip_all)]
108 116 pub async fn update_creator_sub_status<'e>(
109 117 executor: impl sqlx::PgExecutor<'e>,
@@ -119,6 +127,7 @@ pub async fn update_creator_sub_status<'e>(
119 127 ELSE canceled_at
120 128 END
121 129 WHERE stripe_subscription_id = $1
130 + AND (status != 'canceled' OR $2 = 'canceled')
122 131 RETURNING *
123 132 "#,
124 133 )
@@ -212,6 +221,26 @@ pub async fn get_storage_used(pool: &PgPool, user_id: UserId) -> Result<i64> {
212 221 Ok(used)
213 222 }
214 223
224 + /// Build the "storage cap exceeded" error, reading current usage on `executor`
225 + /// for the message. Shared by the cap-checked UPDATE helpers below so the
226 + /// wording (and the usage read) can't drift between them. Returns the error in
227 + /// `Ok`; a failed usage read propagates as `Err`, matching the prior inline code.
228 + async fn storage_cap_exceeded<'e>(
229 + executor: impl sqlx::PgExecutor<'e>,
230 + user_id: UserId,
231 + max_storage_bytes: i64,
232 + ) -> Result<AppError> {
233 + let used: i64 = sqlx::query_scalar("SELECT storage_used_bytes FROM users WHERE id = $1")
234 + .bind(user_id)
235 + .fetch_one(executor)
236 + .await?;
237 + Ok(AppError::BadRequest(format!(
238 + "You've used {} of {} storage. Delete files or upgrade your tier.",
239 + format_bytes(used),
240 + format_bytes(max_storage_bytes),
241 + )))
242 + }
243 +
215 244 /// Atomically check storage cap and increment the user's storage counter.
216 245 /// Returns an error if the increment would exceed `max_storage_bytes`.
217 246 #[tracing::instrument(skip_all)]
@@ -232,12 +261,7 @@ pub async fn try_increment_storage(
232 261 .await?;
233 262
234 263 if result.rows_affected() == 0 {
235 - let used = get_storage_used(pool, user_id).await?;
236 - return Err(AppError::BadRequest(format!(
237 - "You've used {} of {} storage. Delete files or upgrade your tier.",
238 - format_bytes(used),
239 - format_bytes(max_storage_bytes),
240 - )));
264 + return Err(storage_cap_exceeded(pool, user_id, max_storage_bytes).await?);
241 265 }
242 266
243 267 Ok(())
@@ -266,28 +290,19 @@ pub async fn try_increment_storage_on(
266 290 .await?;
267 291
268 292 if result.rows_affected() == 0 {
269 - let used: i64 = sqlx::query_scalar(
270 - "SELECT storage_used_bytes FROM users WHERE id = $1",
271 - )
272 - .bind(user_id)
273 - .fetch_one(&mut *conn)
274 - .await?;
275 - return Err(AppError::BadRequest(format!(
276 - "You've used {} of {} storage. Delete files or upgrade your tier.",
277 - format_bytes(used),
278 - format_bytes(max_storage_bytes),
279 - )));
293 + return Err(storage_cap_exceeded(&mut *conn, user_id, max_storage_bytes).await?);
280 294 }
281 295
282 296 Ok(())
283 297 }
284 298
285 - /// Atomically replace storage: decrement old file size and increment new file size
286 - /// in a single UPDATE. Prevents storage drift on file replacement by avoiding a
287 - /// window where decrement and increment are separate operations.
299 + /// Atomically replace storage: decrement old file size and increment new file
300 + /// size in a single cap-checked UPDATE, on a caller-supplied connection so it can
301 + /// be bundled with the row UPDATE in one transaction (no storage-drift window on
302 + /// a mid-write failure). Prevents drift by avoiding a separate decrement/increment.
288 303 #[tracing::instrument(skip_all)]
289 - pub async fn try_replace_storage(
290 - pool: &PgPool,
304 + pub async fn try_replace_storage_on(
305 + conn: &mut sqlx::PgConnection,
291 306 user_id: UserId,
292 307 old_bytes: i64,
293 308 new_bytes: i64,
@@ -301,21 +316,37 @@ pub async fn try_replace_storage(
301 316 .bind(old_bytes)
302 317 .bind(new_bytes)
303 318 .bind(max_storage_bytes)
304 - .execute(pool)
319 + .execute(&mut *conn)
305 320 .await?;
306 321
307 322 if result.rows_affected() == 0 {
308 - let used = get_storage_used(pool, user_id).await?;
309 - return Err(AppError::BadRequest(format!(
310 - "You've used {} of {} storage. Delete files or upgrade your tier.",
311 - format_bytes(used),
312 - format_bytes(max_storage_bytes),
313 - )));
323 + return Err(storage_cap_exceeded(&mut *conn, user_id, max_storage_bytes).await?);
314 324 }
315 325
316 326 Ok(())
317 327 }
318 328
329 + /// Apply a confirmed upload's storage credit on a transaction connection:
330 + /// `replace_old_size = Some(old)` swaps an existing file's size for the new one
331 + /// (atomic decrement-old + increment-new); `None` is a fresh increment. Lets the
332 + /// four upload-confirm handlers share one call instead of each re-deriving the
333 + /// replace-vs-increment branch.
334 + #[tracing::instrument(skip_all)]
335 + pub async fn try_apply_storage_on(
336 + conn: &mut sqlx::PgConnection,
337 + user_id: UserId,
338 + replace_old_size: Option<i64>,
339 + new_bytes: i64,
340 + max_storage_bytes: i64,
341 + ) -> Result<()> {
342 + match replace_old_size {
343 + Some(old_bytes) => {
344 + try_replace_storage_on(conn, user_id, old_bytes, new_bytes, max_storage_bytes).await
345 + }
346 + None => try_increment_storage_on(conn, user_id, new_bytes, max_storage_bytes).await,
347 + }
348 + }
349 +
319 350 /// Atomically decrement the user's storage counter (clamped to 0).
320 351 #[tracing::instrument(skip_all)]
321 352 pub async fn decrement_storage_used<'e>(
@@ -58,6 +58,18 @@ pub async fn get_fan_plus_by_stripe_id(
58 58
59 59 /// Update the status of a Fan+ subscription.
60 60 /// Sets canceled_at when transitioning to canceled, preserving existing value.
61 + /// Returns None if not found OR already canceled.
62 + ///
63 + /// `canceled` is terminal: the `AND (status != 'canceled' OR $2 = 'canceled')`
64 + /// guard refuses to transition a canceled Fan+ sub back to active/past_due, so an
65 + /// out-of-order/retried `customer.subscription.updated`(active) landing after a
66 + /// `deleted` can't revive it (Run #12 SERIOUS — the sibling the Run #11 revival
67 + /// guard missed). Re-subscribe runs through `create_fan_plus_subscription`'s
68 + /// ON CONFLICT DO UPDATE, not this setter, so legitimate reactivation is
69 + /// unaffected. Every subscription status setter now enforces this invariant at
70 + /// the data layer (see also `subscriptions::update_subscription_status`,
71 + /// `creator_tiers::update_creator_sub_status`, `synckit_billing::set_billing_status`,
72 + /// `synckit::subscriptions::update_app_sync_subscription_status`).
61 73 #[tracing::instrument(skip_all)]
62 74 pub async fn update_fan_plus_status<'e>(
63 75 executor: impl sqlx::PgExecutor<'e>,
@@ -73,6 +85,7 @@ pub async fn update_fan_plus_status<'e>(
73 85 ELSE canceled_at
74 86 END
75 87 WHERE stripe_subscription_id = $1
88 + AND (status != 'canceled' OR $2 = 'canceled')
76 89 RETURNING *
77 90 "#,
78 91 )
@@ -0,0 +1,214 @@
1 + //! Ordered image galleries for items and projects (launchplan S.1).
2 + //!
3 + //! Two near-identical tables (`item_images` / `project_images`) keyed by parent,
4 + //! each row an image with an alt string and an explicit `position`. The single
5 + //! `cover_image_url` on items/projects is unaffected — it stays the OG/card
6 + //! image; these rows are the additive carousel gallery.
7 + //!
8 + //! `s3_key` + `file_size_bytes` are stored so a delete decrements storage from
9 + //! the recorded size (no S3 HEAD) and the cleanup garbage-collector recognizes
10 + //! live gallery objects via the S3_KEY_REFS registry.
11 +
12 + use sqlx::{PgExecutor, PgPool};
13 + use uuid::Uuid;
14 +
15 + use super::{ItemId, ProjectId};
16 + use crate::error::Result;
17 +
18 + /// One gallery image row (shared shape for item and project galleries).
19 + #[derive(Debug, Clone, sqlx::FromRow)]
20 + pub struct GalleryImage {
21 + pub id: Uuid,
22 + pub s3_key: String,
23 + pub image_url: String,
24 + pub alt: String,
25 + pub position: i32,
26 + pub file_size_bytes: i64,
27 + }
28 +
29 + /// Maximum gallery images per entity. Keeps a single creator from ballooning
30 + /// storage with one listing and bounds the carousel length.
31 + pub const MAX_GALLERY_IMAGES: i64 = 8;
32 +
33 + // ---------------------------------------------------------------------------
34 + // Item galleries
35 + // ---------------------------------------------------------------------------
36 +
37 + /// List an item's gallery images in display order.
38 + #[tracing::instrument(skip_all)]
39 + pub async fn list_for_item<'e>(executor: impl PgExecutor<'e>, item_id: ItemId) -> Result<Vec<GalleryImage>> {
40 + let rows = sqlx::query_as::<_, GalleryImage>(
41 + "SELECT id, s3_key, image_url, alt, position, file_size_bytes \
42 + FROM item_images WHERE item_id = $1 ORDER BY position, created_at",
43 + )
44 + .bind(item_id)
45 + .fetch_all(executor)
46 + .await?;
47 + Ok(rows)
48 + }
49 +
50 + /// Count an item's gallery images (for the per-entity cap check).
51 + #[tracing::instrument(skip_all)]
52 + pub async fn count_for_item<'e>(executor: impl PgExecutor<'e>, item_id: ItemId) -> Result<i64> {
53 + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM item_images WHERE item_id = $1")
54 + .bind(item_id)
55 + .fetch_one(executor)
56 + .await?;
57 + Ok(count)
58 + }
59 +
60 + /// Insert a gallery image at the end (position = current max + 1). Returns the
61 + /// new row id. Takes an executor so it can run inside the confirm transaction.
62 + #[tracing::instrument(skip_all)]
63 + pub async fn insert_for_item<'e>(
64 + executor: impl PgExecutor<'e>,
65 + item_id: ItemId,
66 + s3_key: &str,
67 + image_url: &str,
68 + alt: &str,
69 + file_size_bytes: i64,
70 + ) -> Result<Uuid> {
71 + let id: Uuid = sqlx::query_scalar(
72 + "INSERT INTO item_images (item_id, s3_key, image_url, alt, position, file_size_bytes) \
73 + VALUES ($1, $2, $3, $4, \
74 + COALESCE((SELECT MAX(position) + 1 FROM item_images WHERE item_id = $1), 0), \
75 + $5) \
76 + RETURNING id",
77 + )
78 + .bind(item_id)
79 + .bind(s3_key)
80 + .bind(image_url)
81 + .bind(alt)
82 + .bind(file_size_bytes)
83 + .fetch_one(executor)
84 + .await?;
85 + Ok(id)
86 + }
87 +
88 + /// Delete one item gallery image IF it belongs to an item owned by `user_id`.
89 + /// Returns the deleted row (for storage decrement + S3 cleanup), or None if it
90 + /// did not exist or the caller does not own it.
91 + #[tracing::instrument(skip_all)]
92 + pub async fn delete_for_item<'e>(
93 + executor: impl PgExecutor<'e>,
94 + image_id: Uuid,
95 + user_id: super::UserId,
96 + ) -> Result<Option<GalleryImage>> {
97 + let row = sqlx::query_as::<_, GalleryImage>(
98 + "DELETE FROM item_images WHERE id = $1 AND item_id IN ( \
99 + SELECT i.id FROM items i JOIN projects p ON p.id = i.project_id WHERE p.user_id = $2 \
100 + ) RETURNING id, s3_key, image_url, alt, position, file_size_bytes",
101 + )
102 + .bind(image_id)
103 + .bind(user_id)
104 + .fetch_optional(executor)
105 + .await?;
106 + Ok(row)
107 + }
108 +
109 + /// Reorder an item's gallery to match `ordered_ids` (ids not belonging to the
110 + /// item are ignored). Positions are assigned by list order.
111 + #[tracing::instrument(skip_all)]
112 + pub async fn reorder_item(pool: &PgPool, item_id: ItemId, ordered_ids: &[Uuid]) -> Result<()> {
113 + let mut tx = pool.begin().await?;
114 + for (pos, id) in ordered_ids.iter().enumerate() {
115 + sqlx::query("UPDATE item_images SET position = $1 WHERE id = $2 AND item_id = $3")
116 + .bind(pos as i32)
117 + .bind(id)
118 + .bind(item_id)
119 + .execute(&mut *tx)
120 + .await?;
121 + }
122 + tx.commit().await?;
123 + Ok(())
124 + }
125 +
126 + // ---------------------------------------------------------------------------
127 + // Project galleries
128 + // ---------------------------------------------------------------------------
129 +
130 + /// List a project's gallery images in display order.
131 + #[tracing::instrument(skip_all)]
132 + pub async fn list_for_project<'e>(executor: impl PgExecutor<'e>, project_id: ProjectId) -> Result<Vec<GalleryImage>> {
133 + let rows = sqlx::query_as::<_, GalleryImage>(
134 + "SELECT id, s3_key, image_url, alt, position, file_size_bytes \
135 + FROM project_images WHERE project_id = $1 ORDER BY position, created_at",
136 + )
137 + .bind(project_id)
138 + .fetch_all(executor)
139 + .await?;
140 + Ok(rows)
141 + }
142 +
143 + /// Count a project's gallery images (for the per-entity cap check).
144 + #[tracing::instrument(skip_all)]
145 + pub async fn count_for_project<'e>(executor: impl PgExecutor<'e>, project_id: ProjectId) -> Result<i64> {
146 + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM project_images WHERE project_id = $1")
147 + .bind(project_id)
148 + .fetch_one(executor)
149 + .await?;
150 + Ok(count)
151 + }
152 +
153 + /// Insert a project gallery image at the end. Returns the new row id.
154 + #[tracing::instrument(skip_all)]
155 + pub async fn insert_for_project<'e>(
156 + executor: impl PgExecutor<'e>,
157 + project_id: ProjectId,
158 + s3_key: &str,
159 + image_url: &str,
160 + alt: &str,
161 + file_size_bytes: i64,
162 + ) -> Result<Uuid> {
163 + let id: Uuid = sqlx::query_scalar(
164 + "INSERT INTO project_images (project_id, s3_key, image_url, alt, position, file_size_bytes) \
165 + VALUES ($1, $2, $3, $4, \
166 + COALESCE((SELECT MAX(position) + 1 FROM project_images WHERE project_id = $1), 0), \
167 + $5) \
168 + RETURNING id",
169 + )
170 + .bind(project_id)
171 + .bind(s3_key)
172 + .bind(image_url)
173 + .bind(alt)
174 + .bind(file_size_bytes)
175 + .fetch_one(executor)
176 + .await?;
177 + Ok(id)
178 + }
179 +
180 + /// Delete one project gallery image IF the project is owned by `user_id`.
181 + /// Returns the deleted row, or None if missing / not owned.
182 + #[tracing::instrument(skip_all)]
183 + pub async fn delete_for_project<'e>(
184 + executor: impl PgExecutor<'e>,
185 + image_id: Uuid,
186 + user_id: super::UserId,
187 + ) -> Result<Option<GalleryImage>> {
188 + let row = sqlx::query_as::<_, GalleryImage>(
189 + "DELETE FROM project_images WHERE id = $1 AND project_id IN ( \
190 + SELECT id FROM projects WHERE user_id = $2 \
191 + ) RETURNING id, s3_key, image_url, alt, position, file_size_bytes",
192 + )
193 + .bind(image_id)
194 + .bind(user_id)
195 + .fetch_optional(executor)
196 + .await?;
197 + Ok(row)
198 + }
199 +
200 + /// Reorder a project's gallery to match `ordered_ids`.
201 + #[tracing::instrument(skip_all)]
202 + pub async fn reorder_project(pool: &PgPool, project_id: ProjectId, ordered_ids: &[Uuid]) -> Result<()> {
203 + let mut tx = pool.begin().await?;
204 + for (pos, id) in ordered_ids.iter().enumerate() {
205 + sqlx::query("UPDATE project_images SET position = $1 WHERE id = $2 AND project_id = $3")
206 + .bind(pos as i32)
207 + .bind(id)
208 + .bind(project_id)
209 + .execute(&mut *tx)
210 + .await?;
211 + }
212 + tx.commit().await?;
213 + Ok(())
214 + }
@@ -83,8 +83,8 @@ pub async fn update_item_cover_image_url(
83 83 /// storage credit, scan enqueue, S3 orphan queueing — must check the bool
84 84 /// and roll back on false.
85 85 #[tracing::instrument(skip_all)]
86 - pub async fn update_item_cover(
87 - pool: &PgPool,
86 + pub async fn update_item_cover<'e>(
87 + executor: impl sqlx::PgExecutor<'e>,
88 88 item_id: ItemId,
89 89 user_id: UserId,
90 90 url: &str,
@@ -102,7 +102,7 @@ pub async fn update_item_cover(
102 102 .bind(s3_key)
103 103 .bind(file_size_bytes)
104 104 .bind(user_id)
105 - .execute(pool)
105 + .execute(executor)
106 106 .await?;
107 107
108 108 Ok(result.rows_affected() > 0)
@@ -811,7 +811,9 @@ pub async fn set_mt_thread_id(
811 811 pub async fn get_user_s3_keys(pool: &PgPool, user_id: UserId) -> Result<Vec<ItemS3KeyRow>> {
812 812 let rows = sqlx::query_as::<_, ItemS3KeyRow>(
813 813 r#"
814 - SELECT i.title, p.id AS project_id, p.slug AS project_slug, i.audio_s3_key, i.cover_s3_key, i.video_s3_key
814 + SELECT i.title, p.id AS project_id, p.slug AS project_slug,
815 + i.audio_s3_key, i.cover_s3_key, i.video_s3_key,
816 + i.audio_file_size_bytes, i.cover_file_size_bytes, i.video_file_size_bytes
815 817 FROM items i JOIN projects p ON i.project_id = p.id
816 818 WHERE p.user_id = $1 AND (i.audio_s3_key IS NOT NULL OR i.cover_s3_key IS NOT NULL OR i.video_s3_key IS NOT NULL)
817 819 ORDER BY p.slug, i.sort_order
@@ -63,13 +63,14 @@ pub(crate) mod tips;
63 63 pub(crate) mod project_members;
64 64 pub(crate) mod idempotency;
65 65 pub(crate) mod pending_refunds;
66 - pub(crate) mod webhook_events;
66 + pub mod webhook_events;
67 67 pub(crate) mod scheduler_jobs;
68 68 pub(crate) mod moderation;
69 69 pub(crate) mod wishlists;
70 70 pub(crate) mod cart;
71 + pub mod gallery_images;
71 72 pub mod page_views;
72 - pub(crate) mod pending_s3_deletions;
73 + pub mod pending_s3_deletions;
73 74 pub(crate) mod pending_uploads;
74 75
75 76 pub use id_types::*;
@@ -106,7 +107,7 @@ pub async fn check_sandbox_cap(pool: &PgPool, lock_key: i64, ip: &str) -> Result
106 107 ));
107 108 }
108 109
109 - let count: i64 = sqlx::query_scalar(
110 + let count_result: Result<i64> = sqlx::query_scalar(
110 111 r#"
111 112 SELECT COUNT(*) FROM users u
112 113 JOIN user_sessions us ON us.user_id = u.id
@@ -117,12 +118,19 @@ pub async fn check_sandbox_cap(pool: &PgPool, lock_key: i64, ip: &str) -> Result
117 118 )
118 119 .bind(ip)
119 120 .fetch_one(&mut *conn)
120 - .await?;
121 + .await
122 + .map_err(Into::into);
121 123
122 - sqlx::query("SELECT pg_advisory_unlock($1)")
124 + // Release the advisory lock on EVERY exit path, not just the success one.
125 + // If the COUNT above errored, an early `?` would return the connection to
126 + // the pool with the session-level lock still held — it would only clear
127 + // when `max_lifetime` rotates the connection out (up to 30 min later),
128 + // silently wedging the per-IP lock key in the meantime. Best-effort unlock
129 + // (a failed unlock is itself cleared by connection rotation).
130 + let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
123 131 .bind(lock_key)
124 132 .execute(&mut *conn)
125 - .await?;
133 + .await;
126 134
127 - Ok(count)
135 + count_result
128 136 }
@@ -114,7 +114,7 @@ pub struct DbImportJob {
114 114 pub completed_at: Option<DateTime<Utc>>,
115 115 }
116 116
117 - /// An item's S3 keys for content export.
117 + /// An item's S3 keys (and DB-known file sizes) for content export.
118 118 #[derive(Debug, Clone, FromRow)]
119 119 pub struct ItemS3KeyRow {
120 120 pub title: String,
@@ -123,9 +123,12 @@ pub struct ItemS3KeyRow {
123 123 pub audio_s3_key: Option<String>,
124 124 pub cover_s3_key: Option<String>,
125 125 pub video_s3_key: Option<String>,
126 + pub audio_file_size_bytes: Option<i64>,
127 + pub cover_file_size_bytes: Option<i64>,
128 + pub video_file_size_bytes: Option<i64>,
126 129 }
127 130
128 - /// A version's S3 key for content export.
131 + /// A version's S3 key (and DB-known file size) for content export.
129 132 #[derive(Debug, Clone, FromRow)]
130 133 pub struct VersionS3KeyRow {
131 134 pub s3_key: Option<String>,
@@ -134,4 +137,5 @@ pub struct VersionS3KeyRow {
134 137 pub item_title: String,
135 138 pub project_id: super::super::ProjectId,
136 139 pub project_slug: Slug,
140 + pub file_size_bytes: Option<i64>,
137 141 }
@@ -190,6 +190,10 @@ pub struct DbUser {
190 190 /// subscriptions on this account. NULL after the close means "lost
191 191 /// eligibility"; they pay sticker prices on any future subscription.
192 192 pub founder_locked_at: Option<DateTime<Utc>>,
193 + /// Version counter folded into the personal-feed URL HMAC. Bumping it (via
194 + /// the "Regenerate feed URL" dashboard action) revokes the user's existing
195 + /// feed link without rotating the global signing secret. Starts at 0.
196 + pub feed_key_version: i32,
193 197 }
194 198
195 199 impl DbUser {
@@ -320,6 +324,7 @@ mod tests {
320 324 jwt_invalidated_at: None,
321 325 is_founder: false,
322 326 founder_locked_at: None,
327 + feed_key_version: 0,
323 328 }
324 329 }
325 330
@@ -50,51 +50,131 @@ pub async fn remove_completed(pool: &PgPool, ids: &[Uuid]) -> Result<()> {
50 50 Ok(())
51 51 }
52 52
53 - /// Returns true if any live row in the given bucket's storage tables still
54 - /// references `s3_key`. Used by the deletion worker to detect the
55 - /// delete-then-reupload race: a freshly-uploaded object reusing a queued key
56 - /// must not be torpedoed by the worker draining the queue.
53 + /// Move permanently-failing deletions off the hot queue into the operator-visible
54 + /// dead-letter table (see migration 129). Atomic per the single-statement CTE:
55 + /// the rows are deleted from `pending_s3_deletions` and inserted into
56 + /// `pending_s3_deletions_dead_letter` in one operation, preserving their
57 + /// `created_at`/`attempts`/`last_attempted_at`. Returns the number moved.
57 58 ///
58 - /// `projects.cover_image_url` stores a full URL (not a bare s3_key) so we
59 - /// check via `LIKE %s3_key`. Project image keys live under the
60 - /// `projects/{id}/image/` prefix and don't collide with item/media keys, so
61 - /// a positive match here is genuine. Without this clause a project cover
62 - /// could be deleted out from under a still-live row.
59 + /// Without this the worker only logged and DELETEd such rows, so a malformed
60 + /// key / gone bucket / ACL gap left an S3 object orphaned with no durable
61 + /// record — the exact leak the dead-letter table was added to prevent.
63 62 #[tracing::instrument(skip_all)]
64 - pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result<bool> {
65 - let live = if bucket == "synckit" {
66 - sqlx::query_scalar::<_, bool>(
67 - "SELECT EXISTS(SELECT 1 FROM sync_blobs WHERE s3_key = $1)",
63 + pub async fn move_to_dead_letter(pool: &PgPool, ids: &[Uuid]) -> Result<u64> {
64 + if ids.is_empty() {
65 + return Ok(0);
66 + }
67 + let result = sqlx::query(
68 + r#"
69 + WITH moved AS (
70 + DELETE FROM pending_s3_deletions
71 + WHERE id = ANY($1)
72 + RETURNING id, s3_key, bucket, source, created_at, attempts, last_attempted_at
68 73 )
69 - .bind(s3_key)
70 - .fetch_one(pool)
71 - .await?
72 - } else {
73 - // Escape the SQL LIKE wildcards (`_` matches one char, `%` matches
74 - // any) so an s3_key containing literal underscores from a user-
75 - // supplied media folder name (e.g. `cool_stuff/file.png`) doesn't
76 - // false-positive against neighbouring rows.
74 + INSERT INTO pending_s3_deletions_dead_letter
75 + (id, s3_key, bucket, source, created_at, attempts, last_attempted_at)
76 + SELECT id, s3_key, bucket, source, created_at, attempts, last_attempted_at
77 + FROM moved
78 + ON CONFLICT (id) DO NOTHING
79 + "#,
80 + )
81 + .bind(ids)
82 + .execute(pool)
83 + .await?;
84 + Ok(result.rows_affected())
85 + }
86 +
87 + /// How a table column references an S3 object key.
88 + #[derive(Clone, Copy)]
89 + enum S3KeyMatch {
90 + /// `column = $1` — the column stores the bare s3_key.
91 + Exact,
92 + /// `column LIKE $2 ESCAPE '\'` — the column stores a full URL ending in the
93 + /// s3_key (e.g. a CDN cover-image URL). The bound value is `%{escaped_key}`.
94 + UrlSuffix,
95 + }
96 +
97 + /// One place a live S3 object key can still be referenced. [`is_s3_key_live`] is
98 + /// generated from this registry, so a new s3_key-bearing table is added in
99 + /// exactly one place AND declares its own bucket — it can't be checked against
100 + /// the wrong bucket (the OTA-key-checked-in-the-main-branch bug this replaced)
101 + /// or silently forgotten by one branch of a hand-written query.
102 + struct S3KeyRef {
103 + bucket: &'static str,
104 + table: &'static str,
105 + column: &'static str,
106 + matching: S3KeyMatch,
107 + }
108 +
109 + /// The registry. To add a table that stores an S3 key, add a row here.
110 + const S3_KEY_REFS: &[S3KeyRef] = &[
111 + // ── main bucket: creator content ──
112 + S3KeyRef { bucket: "main", table: "media_files", column: "s3_key", matching: S3KeyMatch::Exact },
113 + S3KeyRef { bucket: "main", table: "versions", column: "s3_key", matching: S3KeyMatch::Exact },
114 + S3KeyRef { bucket: "main", table: "items", column: "audio_s3_key", matching: S3KeyMatch::Exact },
115 + S3KeyRef { bucket: "main", table: "items", column: "cover_s3_key", matching: S3KeyMatch::Exact },
116 + S3KeyRef { bucket: "main", table: "items", column: "video_s3_key", matching: S3KeyMatch::Exact },
117 + // cover_image_url stores a full CDN URL, not a bare key; image keys live
118 + // under their own `.../image/` prefix so a suffix match is unambiguous.
119 + S3KeyRef { bucket: "main", table: "items", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix },
120 + S3KeyRef { bucket: "main", table: "projects", column: "cover_image_url", matching: S3KeyMatch::UrlSuffix },
121 + // gallery images store the bare s3_key directly (not a URL) → exact match.
122 + S3KeyRef { bucket: "main", table: "item_images", column: "s3_key", matching: S3KeyMatch::Exact },
123 + S3KeyRef { bucket: "main", table: "project_images", column: "s3_key", matching: S3KeyMatch::Exact },
124 + S3KeyRef { bucket: "main", table: "content_insertions", column: "storage_key", matching: S3KeyMatch::Exact },
125 + // ── synckit bucket: SyncKit blobs + OTA artifacts (both deterministic keys) ──
126 + S3KeyRef { bucket: "synckit", table: "sync_blobs", column: "s3_key", matching: S3KeyMatch::Exact },
127 + S3KeyRef { bucket: "synckit", table: "ota_artifacts", column: "s3_key", matching: S3KeyMatch::Exact },
128 + ];
129 +
130 + /// Returns true if any live row in `bucket` still references `s3_key`. Used by
131 + /// the deletion worker to detect the delete-then-reupload race: a freshly
132 + /// uploaded object reusing a queued key must not be torpedoed by the worker
133 + /// draining the queue. The tables checked are [`S3_KEY_REFS`] filtered to
134 + /// `bucket` — add a new key-bearing table there.
135 + ///
136 + /// Note: this deliberately does NOT consult `pending_uploads`. An in-flight
137 + /// upload whose S3 PUT finished but whose durable row hasn't committed is
138 + /// invisible here. That window is safe in practice because the durable row is
139 + /// written early for every key class (OTA writes `ota_artifacts` at presign
140 + /// time; same-name item/version/media replaces are caught by their confirm
141 + /// handlers' idempotency guards before any delete is queued), and a queued
142 + /// delete only fires after a >=10-minute staleness check. Adding
143 + /// `pending_uploads` would broaden the live-set to ephemeral rows the reaper is
144 + /// meant to clean, so it is intentionally excluded.
145 + #[tracing::instrument(skip_all)]
146 + pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result<bool> {
147 + let refs: Vec<&S3KeyRef> = S3_KEY_REFS.iter().filter(|r| r.bucket == bucket).collect();
148 + if refs.is_empty() {
149 + // Only "main"/"synckit" exist, so an unregistered bucket is unexpected.
150 + // Refuse to declare the key dead — don't let the worker delete an object
151 + // we have no way to verify.
152 + tracing::warn!(bucket, "is_s3_key_live: no registered tables for bucket; treating key as live");
153 + return Ok(true);
154 + }
155 +
156 + // Table/column names are compile-time constants from S3_KEY_REFS (never user
157 + // input); the key value is always a bound parameter ($1 exact, $2 url-suffix).
158 + let needs_url_suffix = refs.iter().any(|r| matches!(r.matching, S3KeyMatch::UrlSuffix));
159 + let clauses: Vec<String> = refs
160 + .iter()
161 + .map(|r| match r.matching {
162 + S3KeyMatch::Exact => format!("EXISTS(SELECT 1 FROM {} WHERE {} = $1)", r.table, r.column),
163 + S3KeyMatch::UrlSuffix => {
164 + format!("EXISTS(SELECT 1 FROM {} WHERE {} LIKE $2 ESCAPE '\\')", r.table, r.column)
165 + }
166 + })
167 + .collect();
168 + let sql = format!("SELECT {}", clauses.join(" OR "));
169 +
170 + let mut query = sqlx::query_scalar::<_, bool>(&sql).bind(s3_key);
171 + if needs_url_suffix {
172 + // Escape LIKE wildcards so a key with a literal `_`/`%` (from a user
173 + // folder name) can't false-positive against neighbouring rows.
77 174 let escaped = s3_key.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_");
78 - let url_suffix = format!("%{escaped}");
79 - sqlx::query_scalar::<_, bool>(
80 - r#"
81 - SELECT
82 - EXISTS(SELECT 1 FROM media_files WHERE s3_key = $1)
83 - OR EXISTS(SELECT 1 FROM versions WHERE s3_key = $1)
84 - OR EXISTS(SELECT 1 FROM ota_artifacts WHERE s3_key = $1)
85 - OR EXISTS(SELECT 1 FROM items
86 - WHERE audio_s3_key = $1 OR cover_s3_key = $1 OR video_s3_key = $1)
87 - OR EXISTS(SELECT 1 FROM items WHERE cover_image_url LIKE $2 ESCAPE '\')
88 - OR EXISTS(SELECT 1 FROM projects WHERE cover_image_url LIKE $2 ESCAPE '\')
89 - OR EXISTS(SELECT 1 FROM content_insertions WHERE storage_key = $1)
90 - "#,
91 - )
92 - .bind(s3_key)
93 - .bind(&url_suffix)
94 - .fetch_one(pool)
95 - .await?
96 - };
97 - Ok(live)
175 + query = query.bind(format!("%{escaped}"));
176 + }
177 + Ok(query.fetch_one(pool).await?)
98 178 }
99 179
100 180 /// Fetch stale pending deletions (older than min_age, up to limit).
@@ -284,8 +284,8 @@ pub async fn get_projects_without_mt_community(pool: &PgPool) -> Result<Vec<DbPr
284 284 /// Callers that fire side-effects after the write — storage credit, scan
285 285 /// enqueue, S3 orphan queueing — must check the bool and roll back on false.
286 286 #[tracing::instrument(skip_all)]
287 - pub async fn update_project_image_url(
288 - pool: &PgPool,
287 + pub async fn update_project_image_url<'e>(
288 + executor: impl sqlx::PgExecutor<'e>,
289 289 id: ProjectId,
290 290 user_id: UserId,
291 291 url: &str,
@@ -294,7 +294,7 @@ pub async fn update_project_image_url(
294 294 .bind(url)
295 295 .bind(id)
296 296 .bind(user_id)
297 - .execute(pool)
297 + .execute(executor)
298 298 .await?;
299 299
300 300 Ok(result.rows_affected() > 0)