Skip to main content

max / makenotwork

server: close Ultra Fuzz Run #4 phases 1-5 + 41-test integration triage Phases 1-4 remediation (audit items in server/todo.md): - Phase 1: creator-tier CSRF tokens, git_ssh repo-name validation, login lockout email-flood, promo use_count over-release on cart cleanup, scanner streaming, scan_jobs retention, scan pool permit, broadcast bounded fan-out (most landed previously; gaps closed). - Phase 2: cart price math helpers, media delete-then-reupload pre-check via is_s3_key_live, pending_uploads owner pinning, TOTP step-replay cleared on enable/disable, delete_other_sessions returns ids for cache eviction, /login Manual CSRF, redirect_uri fetch_optional. - Phase 3: ota_builds partial unique index for single-running invariant with 23505-swallow in claim, build terminal write gated on status IN ('pending','running'), extract_s3_key_from_url path branch pinned to configured endpoint host, pending_2fa user_sessions tracking row visible to delete_all_sessions sweep. - Phase 4: MaybeUserUnverified short-circuits legacy sessions, sum_file_sizes_for_item two-sided clamp, license-key 23505 retry, Stripe multi-v1 sig parsing, has_active_subscription period-end defense, content_type Download text-sniff for HTML/SVG/XML/script, profile-link rel="ugc nofollow noopener", thousands separators in format_price/format_revenue. Integration test triage (41 failing -> 803 passing, 0 failing): Real code bugs surfaced during triage: - validate_key_code pinned at exactly 5 hyphenated words but the generator emits 6 (raised after a birthday-collision review). Now accepts both for backward compat with already-issued keys. - update_build_status terminal-write gate inadvertently blocked cancelling pending builds; loosened to IN ('pending','running'). - validate_auto on multi-method `with_csrf` routes rejected GETs because it never checked the request method; now short-circuits on safe methods per RFC 9110. - media_confirm duplicate-filename detection used a substring match on the wrapper Display; switched to structured SQLSTATE 23505 matching on AppError::Database(sqlx::Error::Database). Harness changes: - CF-Connecting-IP header default alongside X-Forwarded-For so extract_client_ip (production reads only the Cloudflare header) finds a value in tests. - failed_login_attempt(login, password) helper refreshes the CSRF token before posting — required by the new Manual /login posture. - drain_scan_jobs() drives the worker pool synchronously via a new public process_next_for_test entry; mirrors production flow honestly instead of forcing scanning back inline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-27 22:13 UTC
Commit: 46ae1095e22e34d1f5154890b93f105f0171f27b
Parent: db9d50f
47 files changed, +963 insertions, -175 deletions
@@ -0,0 +1,9 @@
1 + -- At most one running build globally. Enforces what
2 + -- `claim_pending_build`'s NOT EXISTS subquery checks at app level — without
3 + -- this index, two replicas racing through the claim path can each pass the
4 + -- check and end up with two concurrent running builds. The losing INSERT/
5 + -- UPDATE surfaces as a 23505 unique violation; `claim_pending_build` swallows
6 + -- it and returns None.
7 + CREATE UNIQUE INDEX ota_builds_single_running
8 + ON ota_builds ((status = 'running'))
9 + WHERE status = 'running';
@@ -0,0 +1,10 @@
1 + -- Mark a session row's role so the "log out everywhere" sweep
2 + -- (`delete_all_sessions_for_user`) can also catch sessions stuck in the
3 + -- 2FA-pending intermediate state — a phisher who has the password but not
4 + -- the TOTP code holds an authenticated-pending session that previously was
5 + -- session-storage-only and invisible to user_sessions sweeps.
6 + ALTER TABLE user_sessions
7 + ADD COLUMN kind TEXT NOT NULL DEFAULT 'active'
8 + CHECK (kind IN ('active', 'pending_2fa'));
9 +
10 + CREATE INDEX idx_user_sessions_kind ON user_sessions (kind) WHERE kind <> 'active';
@@ -235,6 +235,23 @@ where
235 235 .await
236 236 .context("session error")?;
237 237
238 + // Short-circuit legacy sessions (USER_SESSION_KEY present without a
239 + // SESSION_TRACKING_KEY) to anonymous. Without this, a pre-tracking
240 + // session quietly survives `/logout-everywhere` — that sweep deletes
241 + // user_sessions rows, but a legacy session has no row to delete and
242 + // would keep rendering as logged-in on every Unverified extractor
243 + // until the cookie naturally expires.
244 + if user.is_some() {
245 + let tracking: Option<UserSessionId> = session
246 + .get(SESSION_TRACKING_KEY)
247 + .await
248 + .ok()
249 + .flatten();
250 + if tracking.is_none() {
251 + return Ok(MaybeUserUnverified(None));
252 + }
253 + }
254 +
238 255 Ok(MaybeUserUnverified(user))
239 256 }
240 257 }
@@ -184,6 +184,15 @@ pub async fn validate_token_consuming(
184 184 }
185 185 }
186 186
187 + // Manual-posture runtime assertion (dev/test only): attempted via a tokio
188 + // task-local flag set in `validate_token_consuming` and checked in a per-
189 + // route layer. Backed out 2026-05-27 — false-positive density was too high:
190 + // rendered error pages return 200, rate-limit and form-extraction
191 + // short-circuit before the handler, and the audit explicitly marked this
192 + // follow-up as "not blocking — only matters if Manual grows beyond one
193 + // route". Compile-time discipline (the `CsrfManuallyValidated` witness type
194 + // bound as `_validated`) stays the convention.
195 +
187 196 /// Wrap a method-router with the Auto-posture validation layer.
188 197 /// Runs `validate_auto` on every request that reaches the route.
189 198 fn attach_auto_layer<S>(method_router: MethodRouter<S>) -> MethodRouter<S>
@@ -418,6 +427,18 @@ where
418 427 /// `_csrf` for authenticated users. Used by `CsrfPosture::Auto` routes
419 428 /// and by the path-allowlist fallback during the L2 migration.
420 429 async fn validate_auto(request: Request, next: Next, path: &str) -> Response {
430 + // Safe methods (RFC 9110 §9.2.1) are read-only by definition — never
431 + // CSRF-check them. This matters for multi-method routes wrapped by
432 + // `with_csrf(get(load).post(save))`: a bare GET should not require a
433 + // token (and the harness doesn't send one for GETs).
434 + if !matches!(*request.method(), axum::http::Method::POST
435 + | axum::http::Method::PUT
436 + | axum::http::Method::PATCH
437 + | axum::http::Method::DELETE)
438 + {
439 + return next.run(request).await;
440 + }
441 +
421 442 // Get session from extensions
422 443 let session = match request.extensions().get::<Session>() {
423 444 Some(s) => s.clone(),
@@ -230,14 +230,24 @@ pub async fn update_build_status(
230 230 .await?;
231 231 }
232 232 BuildStatus::Succeeded | BuildStatus::Failed | BuildStatus::Cancelled => {
233 - sqlx::query(
234 - "UPDATE ota_builds SET status = $2, finished_at = now(), error_message = $3 WHERE id = $1",
233 + // Gate on a non-terminal source status so the stale-build reaper
234 + // (`fail_stale_running_builds`) and a real terminal write can't
235 + // race: if the reaper just flipped the row to 'failed', the
236 + // successful builder's write must no-op rather than clobber it.
237 + // Pending IS a legitimate source — cancelling a build that never
238 + // started must still transition pending → cancelled.
239 + let result = sqlx::query(
240 + "UPDATE ota_builds SET status = $2, finished_at = now(), error_message = $3 WHERE id = $1 AND status IN ('pending', 'running')",
235 241 )
236 242 .bind(build_id)
237 243 .bind(status)
238 244 .bind(error_message)
239 245 .execute(pool)
240 246 .await?;
247 + if result.rows_affected() == 0 {
248 + tracing::warn!(build_id = %build_id, target_status = ?status,
249 + "build status terminal write skipped — row already terminal (likely reaper-set)");
250 + }
241 251 }
242 252 BuildStatus::Pending => {
243 253 sqlx::query("UPDATE ota_builds SET status = $2 WHERE id = $1")
@@ -259,7 +269,7 @@ pub async fn update_build_status(
259 269 ///; the loser simply gets no row.
260 270 #[tracing::instrument(skip_all)]
261 271 pub async fn claim_pending_build(pool: &PgPool) -> Result<Option<DbBuild>> {
262 - let build = sqlx::query_as::<_, DbBuild>(
272 + let result = sqlx::query_as::<_, DbBuild>(
263 273 r#"
264 274 UPDATE ota_builds
265 275 SET status = 'running', started_at = now()
@@ -275,9 +285,20 @@ pub async fn claim_pending_build(pool: &PgPool) -> Result<Option<DbBuild>> {
275 285 "#,
276 286 )
277 287 .fetch_optional(pool)
278 - .await?;
279 -
280 - Ok(build)
288 + .await;
289 +
290 + // Multi-replica: the NOT EXISTS subquery races between replicas. The
291 + // `ota_builds_single_running` partial unique index is the backstop —
292 + // the loser's UPDATE surfaces as a 23505 unique violation, which means
293 + // a peer claimed first. Treat as "nothing to claim this tick".
294 + match result {
295 + Ok(build) => Ok(build),
296 + Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("23505") => {
297 + tracing::info!("claim_pending_build lost the running-slot race; another replica claimed");
298 + Ok(None)
299 + }
300 + Err(e) => Err(e.into()),
301 + }
281 302 }
282 303
283 304 /// Mark any builds that have been "running" longer than the timeout as failed.
@@ -166,6 +166,17 @@ impl CartItem {
166 166 let min = self.pwyw_min_cents.unwrap_or(0).max(0);
167 167 format!("{}.{:02}", min / 100, min % 100)
168 168 }
169 +
170 + /// Bare "X.YY" dollars string for use as a numeric `<input>` value.
171 + pub fn effective_price_input_value(&self) -> String {
172 + let cents = self.effective_price_cents().max(0);
173 + format!("{}.{:02}", cents / 100, cents % 100)
174 + }
175 +
176 + /// "$X.YY" display string. Always shows decimals; caller branches on `is_free()`.
177 + pub fn effective_price_display(&self) -> String {
178 + crate::formatting::format_revenue(self.effective_price_cents() as i64)
179 + }
169 180 }
170 181
171 182 /// Get all cart items for a user with joined item, project, and seller data.
@@ -47,6 +47,7 @@ pub async fn create_repo_with_visibility(
47 47 name: &str,
48 48 visibility: Visibility,
49 49 ) -> Result<DbGitRepo> {
50 + crate::validation::validate_git_repo_name(name)?;
50 51 let repo = sqlx::query_as::<_, DbGitRepo>(
51 52 r#"
52 53 INSERT INTO git_repos (user_id, name, visibility)
@@ -8,6 +8,11 @@ use super::{ItemId, LicenseActivationId, LicenseKeyId, TransactionId, UserId};
8 8 use crate::error::Result;
9 9
10 10 /// Create a new license key for an item.
11 + ///
12 + /// Retries once on a 23505 unique-violation with a freshly-generated code.
13 + /// A real collision out of the wordlist generator is vanishingly rare (the
14 + /// six-word space gives ~6B coin-flip headroom), but the alternative is
15 + /// surfacing a 500 to whatever flow is creating the key.
11 16 #[tracing::instrument(skip_all)]
12 17 pub async fn create_license_key(
13 18 pool: &PgPool,
@@ -17,22 +22,38 @@ pub async fn create_license_key(
17 22 key_code: &KeyCode,
18 23 max_activations: Option<i32>,
19 24 ) -> Result<DbLicenseKey> {
20 - let key = sqlx::query_as::<_, DbLicenseKey>(
21 - r#"
25 + const SQL: &str = r#"
22 26 INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
23 27 VALUES ($1, $2, $3, $4, $5)
24 28 RETURNING *
25 - "#,
26 - )
27 - .bind(item_id)
28 - .bind(owner_id)
29 - .bind(transaction_id)
30 - .bind(key_code)
31 - .bind(max_activations)
32 - .fetch_one(pool)
33 - .await?;
34 -
35 - Ok(key)
29 + "#;
30 +
31 + let first = sqlx::query_as::<_, DbLicenseKey>(SQL)
32 + .bind(item_id)
33 + .bind(owner_id)
34 + .bind(transaction_id)
35 + .bind(key_code)
36 + .bind(max_activations)
37 + .fetch_one(pool)
38 + .await;
39 +
40 + match first {
41 + Ok(key) => Ok(key),
42 + Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("23505") => {
43 + let retry_code = crate::helpers::generate_key_code();
44 + tracing::warn!(item_id = %item_id, "license key 23505 collision; retrying once");
45 + let key = sqlx::query_as::<_, DbLicenseKey>(SQL)
46 + .bind(item_id)
47 + .bind(owner_id)
48 + .bind(transaction_id)
49 + .bind(&retry_code)
50 + .bind(max_activations)
51 + .fetch_one(pool)
52 + .await?;
53 + Ok(key)
54 + }
55 + Err(e) => Err(e.into()),
56 + }
36 57 }
37 58
38 59 /// Look up a license key by its code.
@@ -80,19 +80,25 @@ pub async fn cleanup_expired_oauth_codes(pool: &PgPool) -> Result<u64> {
80 80 }
81 81
82 82 /// Check if a redirect URI is registered for a given sync app.
83 + ///
84 + /// Returns `Ok(false)` when the app row doesn't exist or is inactive — never
85 + /// surfaces a "no rows" error to the caller. Matching is **exact-string** on
86 + /// the registered `redirect_uris` array; trailing slashes are significant
87 + /// (`https://x/cb` and `https://x/cb/` are distinct registrations), so apps
88 + /// must register every variant they intend to redirect to.
83 89 #[tracing::instrument(skip_all)]
84 90 pub async fn is_registered_redirect_uri(
85 91 pool: &PgPool,
86 92 app_id: SyncAppId,
87 93 uri: &str,
88 94 ) -> Result<bool> {
89 - let row: (bool,) = sqlx::query_as(
95 + let row: Option<(bool,)> = sqlx::query_as(
90 96 "SELECT $2 = ANY(redirect_uris) FROM sync_apps WHERE id = $1 AND is_active = true",
91 97 )
92 98 .bind(app_id)
93 99 .bind(uri)
94 - .fetch_one(pool)
100 + .fetch_optional(pool)
95 101 .await?;
96 102
97 - Ok(row.0)
103 + Ok(row.map(|r| r.0).unwrap_or(false))
98 104 }
@@ -50,6 +50,38 @@ pub async fn remove_completed(pool: &PgPool, ids: &[Uuid]) -> Result<()> {
50 50 Ok(())
51 51 }
52 52
53 + /// Returns true if any live row in the given bucket's storage tables still
54 + /// references `s3_key`. Used by the deletion worker to detect the
55 + /// delete-then-reupload race: a freshly-uploaded object reusing a queued key
56 + /// must not be torpedoed by the worker draining the queue.
57 + #[tracing::instrument(skip_all)]
58 + pub async fn is_s3_key_live(pool: &PgPool, bucket: &str, s3_key: &str) -> Result<bool> {
59 + let live = if bucket == "synckit" {
60 + sqlx::query_scalar::<_, bool>(
61 + "SELECT EXISTS(SELECT 1 FROM sync_blobs WHERE s3_key = $1)",
62 + )
63 + .bind(s3_key)
64 + .fetch_one(pool)
65 + .await?
66 + } else {
67 + sqlx::query_scalar::<_, bool>(
68 + r#"
69 + SELECT
70 + EXISTS(SELECT 1 FROM media_files WHERE s3_key = $1)
71 + OR EXISTS(SELECT 1 FROM versions WHERE s3_key = $1)
72 + OR EXISTS(SELECT 1 FROM ota_artifacts WHERE s3_key = $1)
73 + OR EXISTS(SELECT 1 FROM items
74 + WHERE audio_s3_key = $1 OR cover_s3_key = $1 OR video_s3_key = $1)
75 + OR EXISTS(SELECT 1 FROM content_insertions WHERE storage_key = $1)
76 + "#,
77 + )
78 + .bind(s3_key)
79 + .fetch_one(pool)
80 + .await?
81 + };
82 + Ok(live)
83 + }
84 +
53 85 /// Fetch stale pending deletions (older than min_age, up to limit).
54 86 /// Atomically increments attempt count.
55 87 #[tracing::instrument(skip_all)]
@@ -15,9 +15,15 @@ pub async fn record_pending_upload(
15 15 // Re-presigning the same key (idempotent retry, multi-part flow) must refresh
16 16 // `created_at`, otherwise the stale-pending reaper can delete a freshly-pending
17 17 // object that's actively being uploaded right now.
18 + //
19 + // Pin the refresh to the original owner: if a different user collides on the
20 + // same key, do NOT refresh — let the reaper age the original row out on its
21 + // own schedule. Otherwise a re-presign loop by another principal could keep
22 + // an orphan object alive indefinitely.
18 23 sqlx::query(
19 24 "INSERT INTO pending_uploads (user_id, s3_key, bucket) VALUES ($1, $2, $3)
20 - ON CONFLICT (s3_key) DO UPDATE SET created_at = NOW(), user_id = EXCLUDED.user_id",
25 + ON CONFLICT (s3_key) DO UPDATE SET created_at = NOW()
26 + WHERE pending_uploads.user_id = EXCLUDED.user_id",
21 27 )
22 28 .bind(user_id)
23 29 .bind(s3_key)
@@ -25,6 +25,60 @@ pub async fn create_user_session(
25 25 Ok(row)
26 26 }
27 27
28 + /// Insert a `kind='pending_2fa'` row for an intermediate session held between
29 + /// the password step and the TOTP/backup-code step. Exposed to
30 + /// `delete_all_sessions_for_user` so "log out everywhere" sweeps a phisher
31 + /// who has the password but not the second factor.
32 + #[tracing::instrument(skip_all)]
33 + pub async fn create_pending_2fa_session(
34 + pool: &PgPool,
35 + user_id: UserId,
36 + user_agent: Option<&str>,
37 + ip_address: Option<&str>,
38 + ) -> Result<UserSessionId> {
39 + let row = sqlx::query_scalar::<_, UserSessionId>(
40 + "INSERT INTO user_sessions (user_id, user_agent, ip_address, kind)
41 + VALUES ($1, $2, $3, 'pending_2fa') RETURNING id",
42 + )
43 + .bind(user_id)
44 + .bind(user_agent)
45 + .bind(ip_address)
46 + .fetch_one(pool)
47 + .await?;
48 +
49 + Ok(row)
50 + }
51 +
52 + /// Confirm the pending_2fa tracking row is still present (i.e. wasn't swept
53 + /// by `delete_all_sessions_for_user` while the user was at the TOTP prompt).
54 + #[tracing::instrument(skip_all)]
55 + pub async fn pending_2fa_session_exists(
56 + pool: &PgPool,
57 + id: UserSessionId,
58 + user_id: UserId,
59 + ) -> Result<bool> {
60 + let exists: bool = sqlx::query_scalar(
61 + "SELECT EXISTS(SELECT 1 FROM user_sessions WHERE id = $1 AND user_id = $2 AND kind = 'pending_2fa')",
62 + )
63 + .bind(id)
64 + .bind(user_id)
65 + .fetch_one(pool)
66 + .await?;
67 + Ok(exists)
68 + }
69 +
70 + /// Delete a pending_2fa tracking row. Called when 2FA succeeds (the caller
71 + /// then `track_session`s a fresh 'active' row) or when the pending state is
72 + /// cleared (expiry, account lockout, navigation away).
73 + #[tracing::instrument(skip_all)]
74 + pub async fn delete_pending_2fa_session(pool: &PgPool, id: UserSessionId) -> Result<()> {
75 + sqlx::query("DELETE FROM user_sessions WHERE id = $1 AND kind = 'pending_2fa'")
76 + .bind(id)
77 + .execute(pool)
78 + .await?;
79 + Ok(())
80 + }
81 +
28 82 /// Result of touching a session: whether it exists and the user's current
29 83 /// suspended status (live from the `users` table, not cached in the session).
30 84 pub struct TouchResult {
@@ -171,16 +225,16 @@ pub async fn delete_other_sessions(
171 225 pool: &PgPool,
172 226 current_session_id: UserSessionId,
173 227 user_id: UserId,
174 - ) -> Result<u64> {
175 - let rows = sqlx::query(
176 - "DELETE FROM user_sessions WHERE user_id = $1 AND id != $2",
228 + ) -> Result<Vec<UserSessionId>> {
229 + let ids: Vec<UserSessionId> = sqlx::query_scalar(
230 + "DELETE FROM user_sessions WHERE user_id = $1 AND id != $2 RETURNING id",
177 231 )
178 232 .bind(user_id)
179 233 .bind(current_session_id)
180 - .execute(pool)
234 + .fetch_all(pool)
181 235 .await?;
182 236
183 - Ok(rows.rows_affected())
237 + Ok(ids)
184 238 }
185 239
186 240 /// Delete ALL sessions for a user. Returns the deleted session IDs (for cache eviction).
@@ -396,8 +396,13 @@ pub async fn has_active_subscription_to_project(
396 396 user_id: UserId,
397 397 project_id: ProjectId,
398 398 ) -> Result<bool> {
399 + // Defense-in-depth on a missed/delayed `customer.subscription.deleted`
400 + // webhook: also reject when the current period has ended. status='active'
401 + // alone trusts Stripe to push the cancellation event promptly.
399 402 let count: i64 = sqlx::query_scalar(
400 - "SELECT COUNT(*) FROM subscriptions WHERE subscriber_id = $1 AND project_id = $2 AND status = 'active' AND paused_at IS NULL",
403 + "SELECT COUNT(*) FROM subscriptions \
404 + WHERE subscriber_id = $1 AND project_id = $2 AND status = 'active' AND paused_at IS NULL \
405 + AND (current_period_end IS NULL OR current_period_end > NOW())",
401 406 )
402 407 .bind(user_id)
403 408 .bind(project_id)
@@ -20,7 +20,11 @@ pub async fn get_totp_secret(pool: &PgPool, user_id: UserId) -> Result<Option<St
20 20 /// Store a TOTP secret for a user (does not enable 2FA yet).
21 21 #[tracing::instrument(skip_all)]
22 22 pub async fn set_totp_secret(pool: &PgPool, user_id: UserId, secret: &str) -> Result<()> {
23 - sqlx::query("UPDATE users SET totp_secret = $2 WHERE id = $1")
23 + // Clear the replay step alongside the secret. Without this, a user who
24 + // disables and re-enables TOTP (potentially with a new secret) inherits a
25 + // stale `totp_last_used_step` and any first-attempt code in a lower step
26 + // window is false-rejected as a replay.
27 + sqlx::query("UPDATE users SET totp_secret = $2, totp_last_used_step = NULL WHERE id = $1")
24 28 .bind(user_id)
25 29 .bind(secret)
26 30 .execute(pool)
@@ -43,7 +47,7 @@ pub async fn enable_totp(pool: &PgPool, user_id: UserId) -> Result<()> {
43 47 /// Disable TOTP 2FA: clear the secret, set enabled to false, delete backup codes.
44 48 #[tracing::instrument(skip_all)]
45 49 pub async fn disable_totp(pool: &PgPool, user_id: UserId) -> Result<()> {
46 - sqlx::query("UPDATE users SET totp_secret = NULL, totp_enabled = false WHERE id = $1")
50 + sqlx::query("UPDATE users SET totp_secret = NULL, totp_enabled = false, totp_last_used_step = NULL WHERE id = $1")
47 51 .bind(user_id)
48 52 .execute(pool)
49 53 .await?;
@@ -406,9 +406,12 @@ pub async fn claim_free_with_promo_code(
406 406
407 407 crate::db::items::increment_sales_count(&mut *tx, params.item_id).await?;
408 408
409 - // Step 5: Create license key inside the same transaction if requested
409 + // Step 5: Create license key inside the same transaction if requested.
410 + // Retry once on a unique-violation: the wordlist generator has ~6B-coin-
411 + // flip headroom, so an actual collision is vanishingly rare, but the
412 + // alternative is surfacing a 500 to a buyer mid-claim — cheap to handle.
410 413 if let Some(lk) = license_key_params {
411 - sqlx::query(
414 + let attempt = sqlx::query(
412 415 r#"
413 416 INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
414 417 VALUES ($1, $2, NULL, $3, $4)
@@ -419,7 +422,28 @@ pub async fn claim_free_with_promo_code(
419 422 .bind(lk.key_code)
420 423 .bind(lk.max_activations)
421 424 .execute(&mut *tx)
422 - .await?;
425 + .await;
426 +
427 + if let Err(sqlx::Error::Database(e)) = &attempt
428 + && e.code().as_deref() == Some("23505")
429 + {
430 + let retry_code = crate::helpers::generate_key_code();
431 + tracing::warn!(item_id = %params.item_id, "license key 23505 collision; retrying once");
432 + sqlx::query(
433 + r#"
434 + INSERT INTO license_keys (item_id, owner_id, transaction_id, key_code, max_activations)
435 + VALUES ($1, $2, NULL, $3, $4)
436 + "#,
437 + )
438 + .bind(params.item_id)
439 + .bind(params.buyer_id)
440 + .bind(&retry_code)
441 + .bind(lk.max_activations)
442 + .execute(&mut *tx)
443 + .await?;
444 + } else {
445 + attempt?;
446 + }
423 447 }
424 448
425 449 tx.commit().await?;
@@ -317,11 +317,12 @@ pub async fn delete_version(pool: &PgPool, version_id: VersionId) -> Result<()>
317 317 /// Sum all version file sizes for a given item (for storage decrement on item delete).
318 318 #[tracing::instrument(skip_all)]
319 319 pub async fn sum_file_sizes_for_item(pool: &PgPool, item_id: super::ItemId) -> Result<i64> {
320 - // SUM over many bigints widens to NUMERIC in Postgres; cap at i64::MAX
321 - // before casting back to BIGINT so a pathological row count can't overflow
322 - // the i64 the rest of the codebase passes around.
320 + // SUM over many bigints widens to NUMERIC in Postgres; clamp on both
321 + // sides (>=0 and <=i64::MAX) before casting back to BIGINT — without
322 + // GREATEST(0, ...), a corrupt-negative row could propagate a negative
323 + // total that later under-flows storage accounting.
323 324 let total: i64 = sqlx::query_scalar(
324 - "SELECT COALESCE(LEAST(SUM(file_size_bytes), 9223372036854775807)::BIGINT, 0) FROM versions WHERE item_id = $1 AND file_size_bytes IS NOT NULL",
325 + "SELECT COALESCE(GREATEST(0, LEAST(SUM(file_size_bytes), 9223372036854775807))::BIGINT, 0) FROM versions WHERE item_id = $1 AND file_size_bytes IS NOT NULL",
325 326 )
326 327 .bind(item_id)
327 328 .fetch_one(pool)
@@ -1,21 +1,36 @@
1 1 //! Formatting utilities: prices, file sizes, initials, slugs, CSV cells.
2 2
3 + /// Group thousands with commas (US locale). Returns the input string unchanged
4 + /// for values ≤999. Operates on a digit-only string so callers stay in i64
5 + /// arithmetic territory and don't need `f64` formatting tricks.
6 + fn group_thousands(n: u64) -> String {
7 + let s = n.to_string();
8 + let bytes = s.as_bytes();
9 + let mut out = String::with_capacity(bytes.len() + bytes.len() / 3);
10 + for (i, &b) in bytes.iter().enumerate() {
11 + if i > 0 && (bytes.len() - i) % 3 == 0 {
12 + out.push(',');
13 + }
14 + out.push(b as char);
15 + }
16 + out
17 + }
18 +
3 19 /// Format a price in cents as a human-readable dollar string or "Free".
4 20 pub fn format_price(cents: impl Into<i64>) -> String {
5 21 let cents: i64 = cents.into();
6 22 if cents == 0 {
7 - "Free".to_string()
8 - } else if cents < 0 {
9 - let abs = (cents as f64).abs();
10 - if cents % 100 == 0 {
11 - format!("-${}", (abs / 100.0) as u64)
12 - } else {
13 - format!("-${:.2}", abs / 100.0)
14 - }
15 - } else if cents % 100 == 0 {
16 - format!("${}", cents / 100)
23 + return "Free".to_string();
24 + }
25 + let neg = cents < 0;
26 + let abs = cents.unsigned_abs();
27 + let dollars = group_thousands(abs / 100);
28 + let frac = (abs % 100) as u32;
29 + let sign = if neg { "-" } else { "" };
30 + if frac == 0 {
31 + format!("{sign}${dollars}")
17 32 } else {
18 - format!("${:.2}", cents as f64 / 100.0)
33 + format!("{sign}${dollars}.{frac:02}")
19 34 }
20 35 }
21 36
@@ -23,11 +38,12 @@ pub fn format_price(cents: impl Into<i64>) -> String {
23 38 ///
24 39 /// Unlike [`format_price`], this never returns "Free" -- zero revenue is "$0.00".
25 40 pub fn format_revenue(cents: i64) -> String {
26 - if cents < 0 {
27 - format!("-${:.2}", (cents as f64).abs() / 100.0)
28 - } else {
29 - format!("${:.2}", cents as f64 / 100.0)
30 - }
41 + let neg = cents < 0;
42 + let abs = cents.unsigned_abs();
43 + let dollars = group_thousands(abs / 100);
44 + let frac = (abs % 100) as u32;
45 + let sign = if neg { "-" } else { "" };
46 + format!("{sign}${dollars}.{frac:02}")
31 47 }
32 48
33 49 /// Format a byte count as a human-readable file size string.
@@ -192,7 +208,23 @@ mod tests {
192 208
193 209 #[test]
194 210 fn format_revenue_large_amount() {
195 - assert_eq!(format_revenue(1_000_000), "$10000.00");
211 + assert_eq!(format_revenue(1_000_000), "$10,000.00");
212 + }
213 +
214 + #[test]
215 + fn format_revenue_million_dollars() {
216 + assert_eq!(format_revenue(100_000_000), "$1,000,000.00");
217 + }
218 +
219 + #[test]
220 + fn format_price_thousands() {
221 + assert_eq!(format_price(1_234_500), "$12,345");
222 + assert_eq!(format_price(1_234_567), "$12,345.67");
223 + }
224 +
225 + #[test]
226 + fn format_price_negative_thousands() {
227 + assert_eq!(format_price(-1_234_567i64), "-$12,345.67");
196 228 }
197 229
198 230 #[test]
@@ -449,7 +481,7 @@ mod tests {
449 481 #[test]
450 482 fn format_price_large_value() {
451 483 // $1 billion in cents
452 - assert_eq!(format_price(100_000_000_000i64), "$1000000000");
484 + assert_eq!(format_price(100_000_000_000i64), "$1,000,000,000");
453 485 }
454 486
455 487 #[test]
@@ -336,17 +336,22 @@ pub struct RelatedObject {
336 336 /// tolerance to prevent replay attacks.
337 337 pub fn verify_signature(payload: &str, header: &str, secret: &str) -> std::result::Result<(), String> {
338 338 let mut timestamp = None;
339 - let mut signature = None;
339 + // Stripe emits a `v1=` value per active secret during rotation; collect
340 + // them all and accept if any matches. The previous single-Option only
341 + // kept the last value parsed, which silently broke rotation.
342 + let mut signatures: Vec<&str> = Vec::new();
340 343 for part in header.split(',') {
341 344 if let Some(t) = part.strip_prefix("t=") {
342 345 timestamp = Some(t);
343 346 } else if let Some(s) = part.strip_prefix("v1=") {
344 - signature = Some(s);
347 + signatures.push(s);
345 348 }
346 349 }
347 350
348 351 let timestamp = timestamp.ok_or("missing timestamp in signature header")?;
349 - let expected_sig = signature.ok_or("missing v1 signature in header")?;
352 + if signatures.is_empty() {
353 + return Err("missing v1 signature in header".to_string());
354 + }
350 355
351 356 let ts_secs: u64 = timestamp.parse().map_err(|_| "invalid timestamp")?;
352 357 let now_secs = std::time::SystemTime::now()
@@ -362,18 +367,25 @@ pub fn verify_signature(payload: &str, header: &str, secret: &str) -> std::resul
362 367 }
363 368
364 369 let signed_payload = format!("{}.{}", timestamp, payload);
370 + let mut last_err = "signature mismatch".to_string();
371 +
372 + for expected_sig in &signatures {
373 + let expected_bytes = match hex::decode(expected_sig) {
374 + Ok(b) => b,
375 + Err(_) => {
376 + last_err = "invalid hex in v1 signature".to_string();
377 + continue;
378 + }
379 + };
380 + let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
381 + .map_err(|_| "invalid HMAC key")?;
382 + mac.update(signed_payload.as_bytes());
383 + if mac.verify_slice(&expected_bytes).is_ok() {
384 + return Ok(());
385 + }
386 + }
365 387
366 - let expected_bytes = hex::decode(expected_sig)
367 - .map_err(|_| "invalid hex in v1 signature")?;
368 -
369 - let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
370 - .map_err(|_| "invalid HMAC key")?;
371 - mac.update(signed_payload.as_bytes());
372 -
373 - mac.verify_slice(&expected_bytes)
374 - .map_err(|_| "signature mismatch".to_string())?;
375 -
376 - Ok(())
388 + Err(last_err)
377 389 }
378 390
379 391 #[cfg(test)]
@@ -343,6 +343,7 @@ pub(super) async fn delete_project(
343 343 url,
344 344 state.config.cdn_base_url.as_deref(),
345 345 state.s3.as_deref().map(|s| s.bucket()),
346 + state.config.storage.as_ref().map(|c| c.endpoint.as_str()),
346 347 )
347 348 {
348 349 all_keys.push((key, "main".to_string()));
@@ -157,10 +157,12 @@ pub(in crate::routes::api) async fn update_password(
157 157 .ok()
158 158 .flatten();
159 159 if let Some(current_id) = current_tracking_id {
160 - let count = db::sessions::delete_other_sessions(&state.db, current_id, user.id).await?;
161 - state.session_cache.retain(|id, _| *id == current_id);
162 - if count > 0 {
163 - tracing::info!(user_id = %user.id, revoked = count, event = "password_change_revoke_sessions", "Revoked other sessions on password change");
160 + let revoked_ids = db::sessions::delete_other_sessions(&state.db, current_id, user.id).await?;
161 + for id in &revoked_ids {
162 + state.session_cache.remove(id);
163 + }
164 + if !revoked_ids.is_empty() {
165 + tracing::info!(user_id = %user.id, revoked = revoked_ids.len(), event = "password_change_revoke_sessions", "Revoked other sessions on password change");
164 166 }
165 167 }
166 168