Skip to main content

max / makenotwork

Ultra Fuzz Run 27: 32 of 33 audit items remediated Closes Run 27 Phase 1 (CRITICAL + SERIOUS) and most of Phase 6 (architectural). Only UX-M3 (per-field AppError::Validation refactor, 187 call sites) remains deferred. CRITICAL - C1 item_image_confirm: scan runs before storage swap; old cover swapped via try_replace_storage and enqueued for durable deletion only after commit. Mirrors the project_image_confirm exemplar. - C2 confirm_upload: UPDATE failure now restores old_bytes via swap-back (try_replace_storage with i64::MAX cap) and deletes the new S3 object. - C3 media_confirm: every INSERT failure (not just unique-conflict) decrements storage and deletes the S3 object. - C4 extract_client_ip: dropped XFF fallback; CF-Connecting-IP only. Defense-in-depth tests added. - H6 idempotency middleware: skips caching cleanly when content-length missing; returns 500 only on actual body-vs-header mismatch. Storage - STO-S1 Content-Length binding: StorageBackend trait + s3-storage inner method + InMemoryStorage take max_bytes: Option<i64>. Item-content presign validates declared file_size_bytes against per-type + tier caps and binds exact size; static JS uploaders send file.size. Other presign endpoints pass None for incremental adoption. - STO-S4/S7 versions: update_version_file takes expected_old_s3_key and returns Ok(None) on lost race; caller rolls back via try_replace_storage swap-back and deletes orphaned S3 object. - STO-S6 hard caps: bumped LIMIT 500 -> 5000 + WARN on hit (cursor pagination deferred to Phase 8). - STO-S8 delete_version: row delete + storage refund in one tx; S3 deletion enqueued. Route handler simplified. - STO-S10 media_delete: DB delete + decrement in single tx, then best-effort S3; pending_s3_deletions queue is the safety net. - STO-S11 record_pending_upload: ON CONFLICT DO UPDATE SET created_at. - STO-S2/S3 extract_s3_key_from_url: explicit cdn_base + bucket prefixes replace find("projects/") heuristic; regression test pins behavior. Security - SEC-S2 git_ssh: SSH-supplied owner string validated via Username::new before DB lookup or shell reconstruction. - SEC-S3: SESSION_TOUCH_CACHE_SECS lowered 30s -> 5s. - SEC-S4: jwt_invalidated_at bumped in suspend_user, deactivate_user, terminate_user, and delete_all_sessions_for_user. - SEC-M3 backup codes: HMAC-SHA256 -> Argon2id (8 MiB, 1 iter, unique salt); verify_and_consume_backup_code iterates unused rows and accepts both Argon2 PHC and legacy HMAC for dual-read. Existing codes remain valid until regenerated. - SEC-M5: 10-minute TTL on pending_2fa_user_id (started_at timestamp + clear_pending_2fa helper; both /auth/2fa GET and verify POST enforce). - SEC-Surprise: constant_time_compare now subtle::ConstantTimeEq + length check; subtle 2.6 added as direct dep. Payments - PAY-S1 compute_splits: total_split_pct.min(100) + expected_total.min(amount). - PAY-S2: atomic promo-use increment now checks starts_at. - PAY-S3: FOR SHARE row lock on guest auto-attach lookup. - PAY-M2: license keys bumped 5 -> 6 words (~55 -> ~66 bits entropy). UX - UX-H1 WishlistItem::price_display() replaces inline cents-to-dollars math in cart.html and the two library tab partials. - UX-H2: verified wizard.css uses words "Done"/"Todo", no glyphs. - UX-H3: login + reset-password re-render their templates with prefilled values + inline error banner instead of bouncing to global error.html. - UX-H4: git/settings.html form gated on CSRF token presence; missing state shows "Log in again" deep link. - UX-H5: all CSRF middleware rejections route through AppError::Forbidden / AppError::Internal so ErrorTemplate renders. Performance - PERF-H1: CPU-bound scan layers (sha256/content-type/structural/archive/ yara) run in spawn_blocking; ClamAV runs concurrently via tokio::join!. - PERF-H2: build runner now upload_multipart(path); temp file removed even on failure. - PERF-H3: tick-duration WARN at 30s + WAM ticket at 50s with 1h cooldown. Parallel-where-safe subsystem refactor deferred. - PERF-H4: item + blog-post announcement fan-outs spawned off the lock-held tick. - PERF-H5: s3-storage gained delete_objects (DeleteObjects, 1000/call); delete_prefix uses it; StorageBackend trait + S3Client wrapper added; cleanup.rs purge_deleted_items uses batched form. - PERF-M2 monitor: cooldown gates both admin email and subscriber fan-out; 100ms pacing between subscriber sends. - PERF-M5: get_build_log_size (SELECT octet_length(log) + right marker compare) replaces full-row get_build in append_log_bounded. Deferred - STO-S1 partial: other presign endpoints pass None pending opt-in size declaration in the corresponding JSON request bodies. - STO-S6: cursor pagination deferred to Phase 8 (8 callers). - PERF-H1: streaming download itself still buffered (needs ranged-GET wrapper). - PERF-H3: subsystem parallelism deferred (needs explicit dep graph). - UX-M3: 187 call sites of AppError::Validation; deferred to dedicated effort. Full audit and remediation log: _private/docs/mnw/server-docs/todo.md
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-22 04:08 UTC
Commit: e36f906b7f579cd88bd24d755117a654a6edac2d
Parent: 6e4f24c
53 files changed, +1114 insertions, -324 deletions
@@ -3599,6 +3599,7 @@ dependencies = [
3599 3599 "sha1 0.10.6",
3600 3600 "sha2 0.10.9",
3601 3601 "sqlx",
3602 + "subtle",
3602 3603 "syntect",
3603 3604 "tagtree",
3604 3605 "tempfile",
@@ -68,6 +68,7 @@ rand = "0.9"
68 68 hmac = "0.12.1"
69 69 sha1 = "0.10.6"
70 70 sha2 = "0.10.9"
71 + subtle = "2.6"
71 72 hex = "0.4.3"
72 73 base64 = "0.22.1"
73 74
@@ -392,22 +392,31 @@ async fn execute_target(
392 392 String::new()
393 393 };
394 394
395 - // Upload to S3
396 - let data = tokio::fs::read(&local_tmp)
397 - .await
398 - .map_err(|e| format!("failed to read artifact: {e}"))?;
399 -
400 - let _ = tokio::fs::remove_file(&local_tmp).await;
401 -
395 + // Upload to S3 via multipart streaming from disk — the previous
396 + // implementation `tokio::fs::read` → `Vec<u8>` → `upload_object` pinned
397 + // the entire artifact (up to ~100 MB per build) in RAM during upload.
398 + // `upload_multipart` reads the file in chunks and lets the S3 SDK do
399 + // parallel part uploads, keeping memory bounded regardless of artifact
400 + // size.
402 401 let synckit_s3 = state
403 402 .synckit_s3
404 403 .as_ref()
405 404 .ok_or("SyncKit storage not configured")?;
406 405
407 - synckit_s3
408 - .upload_object(&s3_key, "application/octet-stream", data, None)
406 + let upload_result = synckit_s3
407 + .upload_multipart(
408 + &s3_key,
409 + "application/octet-stream",
410 + std::path::Path::new(&local_tmp),
411 + )
409 412 .await
410 - .map_err(|e| format!("S3 upload failed: {e}"))?;
413 + .map_err(|e| format!("S3 multipart upload failed: {e}"));
414 +
415 + // Always remove the local temp file, even if the upload failed — leaving
416 + // it on disk fills the build runner's tmp directory across retries.
417 + let _ = tokio::fs::remove_file(&local_tmp).await;
418 +
419 + upload_result?;
411 420
412 421 if !signature.is_empty() {
413 422 let _ = append_log_bounded(
@@ -501,19 +510,22 @@ async fn run_scp_download(
501 510 }
502 511
503 512 /// Append to build log, respecting the max log size.
513 + ///
514 + /// Probes `octet_length(log)` instead of fetching the whole row (the log
515 + /// column tops out at 5 MiB and is read on every line append).
504 516 async fn append_log_bounded(
505 517 state: &AppState,
506 518 build_id: db::BuildId,
507 519 line: &str,
508 520 ) -> crate::error::Result<()> {
509 - // Check current log size (approximate — avoids fetching the full log)
510 - let build = db::builds::get_build(&state.db, build_id).await?;
511 - if let Some(b) = build
512 - && b.log.len() + line.len() > BUILD_MAX_LOG_BYTES
521 + const TRUNCATED: &str = "[log truncated]\n";
522 + if let Some((current_len, already_truncated)) =
523 + db::builds::get_build_log_size(&state.db, build_id, TRUNCATED).await?
524 + && (current_len as usize) + line.len() > BUILD_MAX_LOG_BYTES
513 525 {
514 - if !b.log.ends_with("[log truncated]\n") {
526 + if !already_truncated {
515 527 tracing::warn!(build_id = %build_id, "Build log exceeded {} bytes, truncating", BUILD_MAX_LOG_BYTES);
516 - db::builds::append_build_log(&state.db, build_id, "[log truncated]\n").await?;
528 + db::builds::append_build_log(&state.db, build_id, TRUNCATED).await?;
517 529 }
518 530 return Ok(());
519 531 }
@@ -15,11 +15,18 @@ pub const DB_IDLE_TIMEOUT_SECS: u64 = 600;
15 15
16 16 // -- Sessions --
17 17 pub const SESSION_EXPIRY_DAYS: i64 = 7;
18 - pub const SESSION_TOUCH_CACHE_SECS: u64 = 30; // Skip DB touch if validated within this window
18 + /// Skip DB touch if validated within this window. Doubles as the upper bound on
19 + /// session-revocation lag (admin suspend, logout-everywhere, password change) —
20 + /// shorter = tighter revocation, slightly more DB load on the auth hot path.
21 + pub const SESSION_TOUCH_CACHE_SECS: u64 = 5;
19 22
20 23 // -- Login security --
21 24 pub const MAX_LOGIN_ATTEMPTS: i32 = 5;
22 25 pub const LOCKOUT_MINUTES: i64 = 15;
26 + /// How long a half-completed login (password verified, awaiting 2FA) stays
27 + /// valid before the user must re-enter their password. Defends against the
28 + /// "unattended browser one TOTP from logged in" failure mode.
29 + pub const PENDING_2FA_TTL_SECS: i64 = 600;
23 30
24 31 // -- Email link expiry (seconds) --
25 32 pub const PASSWORD_RESET_EXPIRY_SECS: i64 = 900; // 15 minutes
@@ -1,30 +1,36 @@
1 1 //! Cryptographic utilities: constant-time comparison, key generation, feed signing.
2 2
3 - /// Constant-time string comparison to prevent timing attacks.
3 + /// Constant-time byte comparison for tokens, MACs, and other fixed-shape
4 + /// secrets. Backed by [`subtle::ConstantTimeEq`] (audited reference impl)
5 + /// instead of a hand-rolled XOR loop wrapped in cosmetic SHA-256.
4 6 ///
5 - /// Hashes both inputs with SHA-256 before comparing to avoid leaking
6 - /// the length of the expected value via early return.
7 + /// Length mismatch short-circuits — leaking the length of fixed-format
8 + /// tokens (hex-encoded HMACs, CSRF tokens, PKCE verifiers, base64 secrets)
9 + /// reveals nothing useful to an attacker, since the format already fixes
10 + /// the length. Don't use this for variable-length sensitive payloads
11 + /// where length is itself secret.
7 12 pub fn constant_time_compare(a: &str, b: &str) -> bool {
8 - use sha2::{Sha256, Digest};
9 -
10 - let hash_a = Sha256::digest(a.as_bytes());
11 - let hash_b = Sha256::digest(b.as_bytes());
12 -
13 - let mut result = 0u8;
14 - for (x, y) in hash_a.iter().zip(hash_b.iter()) {
15 - result |= x ^ y;
13 + use subtle::ConstantTimeEq;
14 + let a = a.as_bytes();
15 + let b = b.as_bytes();
16 + if a.len() != b.len() {
17 + return false;
16 18 }
17 - result == 0
19 + a.ct_eq(b).into()
18 20 }
19 21
20 - /// Generate a license key code in word-word-word-word-word format.
22 + /// Generate a license key code in word-word-word-word-word-word format.
21 23 ///
22 - /// Uses 5 random words from the 2048-word list (~55 bits of entropy).
23 - /// Returns a `KeyCode` via `from_trusted` — the wordlist guarantees validity.
24 + /// Six random words from the 2048-word list (~66 bits of entropy). Six was
25 + /// chosen over five (~55 bits) after a birthday-collision review: at five
26 + /// words, ~190M keys gives a coin-flip chance of collision; at six, the
27 + /// equivalent threshold rises to ~6B keys — far past the lifetime cap of
28 + /// any realistic license catalog. Returns a `KeyCode` via `from_trusted` —
29 + /// the wordlist guarantees validity.
24 30 pub fn generate_key_code() -> crate::db::KeyCode {
25 31 use rand::Rng;
26 32 let mut rng = rand::rng();
27 - let words: Vec<&str> = (0..5)
33 + let words: Vec<&str> = (0..6)
28 34 .map(|_| {
29 35 let idx = rng.random_range(0..crate::wordlist::WORDLIST.len());
30 36 crate::wordlist::WORDLIST[idx]
@@ -104,7 +110,7 @@ mod tests {
104 110 fn key_code_format() {
105 111 let code = generate_key_code();
106 112 let parts: Vec<&str> = code.split('-').collect();
107 - assert_eq!(parts.len(), 5, "Key code should have 5 words");
113 + assert_eq!(parts.len(), 6, "Key code should have 6 words");
108 114 for word in &parts {
109 115 assert!(word.len() >= 3, "Each word should be at least 3 chars: {}", word);
110 116 assert!(word.len() <= 6, "Each word should be at most 6 chars: {}", word);
@@ -171,11 +171,11 @@ pub async fn csrf_middleware(request: Request, next: Next) -> Response {
171 171 Ok(true) => next.run(request).await,
172 172 Ok(false) => {
173 173 tracing::warn!(path = %path, "CSRF token mismatch");
174 - (StatusCode::FORBIDDEN, "Invalid CSRF token").into_response()
174 + crate::error::AppError::Forbidden.into_response()
175 175 }
176 176 Err(e) => {
177 177 tracing::error!(error = ?e, "CSRF validation error");
178 - (StatusCode::INTERNAL_SERVER_ERROR, "CSRF validation error").into_response()
178 + crate::error::AppError::Internal(anyhow::anyhow!("CSRF validation error")).into_response()
179 179 }
180 180 };
181 181 }
@@ -223,7 +223,7 @@ pub async fn csrf_middleware(request: Request, next: Next) -> Response {
223 223 Some(t) => t,
224 224 None => {
225 225 tracing::warn!(path = %path, "CSRF token missing from form body");
226 - return (StatusCode::FORBIDDEN, "CSRF token required").into_response();
226 + return crate::error::AppError::Forbidden.into_response();
227 227 }
228 228 };
229 229
@@ -156,6 +156,30 @@ pub async fn get_build(pool: &PgPool, build_id: BuildId) -> Result<Option<DbBuil
156 156 Ok(build)
157 157 }
158 158
159 + /// Return the current `octet_length` of a build's log column, plus whether
160 + /// it already ends with `marker` (server-side string compare so the full
161 + /// log column never travels back over the wire).
162 + ///
163 + /// Used by `append_log_bounded` to make the cap check without pulling the
164 + /// 5 MiB log row into memory on every line.
165 + #[tracing::instrument(skip_all)]
166 + pub async fn get_build_log_size(
167 + pool: &PgPool,
168 + build_id: BuildId,
169 + marker: &str,
170 + ) -> Result<Option<(i64, bool)>> {
171 + let row: Option<(i64, bool)> = sqlx::query_as(
172 + "SELECT octet_length(log)::BIGINT, right(log, char_length($2)) = $2 \
173 + FROM ota_builds WHERE id = $1",
174 + )
175 + .bind(build_id)
176 + .bind(marker)
177 + .fetch_optional(pool)
178 + .await?;
179 +
180 + Ok(row)
181 + }
182 +
159 183 /// List builds for an app, newest first.
160 184 #[tracing::instrument(skip_all)]
161 185 pub async fn list_builds_by_app(
@@ -116,12 +116,15 @@ pub async fn get_by_user_folder_name(
116 116
117 117 /// Delete a media file by ID.
118 118 #[tracing::instrument(skip_all)]
119 - pub async fn delete(pool: &PgPool, id: MediaFileId) -> Result<Option<DbMediaFile>> {
119 + pub async fn delete<'e>(
120 + executor: impl sqlx::PgExecutor<'e>,
121 + id: MediaFileId,
122 + ) -> Result<Option<DbMediaFile>> {
120 123 let row = sqlx::query_as::<_, DbMediaFile>(
121 124 "DELETE FROM media_files WHERE id = $1 RETURNING *",
122 125 )
123 126 .bind(id)
124 - .fetch_optional(pool)
127 + .fetch_optional(executor)
125 128 .await?;
126 129
127 130 Ok(row)
@@ -12,9 +12,12 @@ pub async fn record_pending_upload(
12 12 s3_key: &str,
13 13 bucket: &str,
14 14 ) -> Result<()> {
15 + // Re-presigning the same key (idempotent retry, multi-part flow) must refresh
16 + // `created_at`, otherwise the stale-pending reaper can delete a freshly-pending
17 + // object that's actively being uploaded right now.
15 18 sqlx::query(
16 19 "INSERT INTO pending_uploads (user_id, s3_key, bucket) VALUES ($1, $2, $3)
17 - ON CONFLICT (s3_key) DO NOTHING",
20 + ON CONFLICT (s3_key) DO UPDATE SET created_at = NOW(), user_id = EXCLUDED.user_id",
18 21 )
19 22 .bind(user_id)
20 23 .bind(s3_key)
@@ -186,7 +186,11 @@ pub async fn try_increment_use_count<'e>(
186 186 id: PromoCodeId,
187 187 ) -> Result<bool> {
188 188 let result = sqlx::query(
189 - "UPDATE promo_codes SET use_count = use_count + 1 WHERE id = $1 AND (max_uses IS NULL OR use_count < max_uses) AND (expires_at IS NULL OR expires_at > NOW())",
189 + "UPDATE promo_codes SET use_count = use_count + 1 \
190 + WHERE id = $1 \
191 + AND (max_uses IS NULL OR use_count < max_uses) \
192 + AND (expires_at IS NULL OR expires_at > NOW()) \
193 + AND (starts_at IS NULL OR starts_at <= NOW())",
190 194 )
191 195 .bind(id)
192 196 .execute(executor)
@@ -173,6 +173,11 @@ pub async fn delete_other_sessions(
173 173 }
174 174
175 175 /// Delete ALL sessions for a user. Returns the deleted session IDs (for cache eviction).
176 + ///
177 + /// Also bumps `users.jwt_invalidated_at` — without this, a stolen SyncKit JWT
178 + /// would survive a "log out everywhere" until its natural expiry. The two
179 + /// updates run sequentially (not in a transaction): a session row deleted
180 + /// without a JWT bump is harmless, the converse would leak access.
176 181 #[tracing::instrument(skip_all)]
177 182 pub async fn delete_all_sessions_for_user(
178 183 pool: &PgPool,
@@ -185,5 +190,10 @@ pub async fn delete_all_sessions_for_user(
185 190 .fetch_all(pool)
186 191 .await?;
187 192
193 + sqlx::query("UPDATE users SET jwt_invalidated_at = NOW() WHERE id = $1")
194 + .bind(user_id)
195 + .execute(pool)
196 + .await?;
197 +
188 198 Ok(ids)
189 199 }
@@ -120,25 +120,65 @@ pub async fn create_backup_codes(
120 120 Ok(())
121 121 }
122 122
123 - /// Verify a backup code hash and mark it as used if found.
124 - /// Returns true if the code was valid and consumed.
123 + /// Verify a backup code and mark it as used if found.
124 + ///
125 + /// `code` is the raw 8-char token the user typed; `legacy_hmac` is the
126 + /// HMAC-SHA256 of the same code (passed in pre-computed by the caller so the
127 + /// secret stays in route-layer scope). Returns `Ok(true)` when a matching
128 + /// unused code is consumed.
129 + ///
130 + /// Dual-read window: rows hashed under the old HMAC scheme remain valid
131 + /// until the user regenerates their backup codes (each regeneration writes
132 + /// fresh Argon2 hashes). Argon2 PHC strings begin with `$argon2`; anything
133 + /// else is treated as a legacy 64-char hex HMAC.
125 134 #[tracing::instrument(skip_all)]
126 135 pub async fn verify_and_consume_backup_code(
127 136 pool: &PgPool,
128 137 user_id: UserId,
129 - code_hash: &str,
138 + code: &str,
139 + legacy_hmac: &str,
130 140 ) -> Result<bool> {
131 - let result = sqlx::query(
132 - r#"
133 - UPDATE backup_codes
134 - SET used_at = NOW()
135 - WHERE user_id = $1
136 - AND code_hash = $2
137 - AND used_at IS NULL
138 - "#,
141 + use argon2::{password_hash::PasswordVerifier, Argon2, PasswordHash};
142 +
143 + let rows: Vec<(uuid::Uuid, String)> = sqlx::query_as(
144 + "SELECT id, code_hash FROM backup_codes WHERE user_id = $1 AND used_at IS NULL",
139 145 )
140 146 .bind(user_id)
141 - .bind(code_hash)
147 + .fetch_all(pool)
148 + .await?;
149 +
150 + let mut matched_id: Option<uuid::Uuid> = None;
151 + for (id, stored) in &rows {
152 + let is_match = if stored.starts_with("$argon2") {
153 + match PasswordHash::new(stored) {
154 + Ok(parsed) => Argon2::default()
155 + .verify_password(code.as_bytes(), &parsed)
156 + .is_ok(),
157 + Err(e) => {
158 + tracing::warn!(error = %e, "malformed argon2 backup code hash in DB; skipping");
159 + false
160 + }
161 + }
162 + } else {
163 + // Legacy HMAC-SHA256 hex. Length-equality short-circuits before
164 + // the constant-time compare, matching the existing behavior of
165 + // `crypto::constant_time_compare`.
166 + crate::crypto::constant_time_compare(stored, legacy_hmac)
167 + };
168 + if is_match {
169 + matched_id = Some(*id);
170 + break;
171 + }
172 + }
173 +
174 + let Some(id) = matched_id else {
175 + return Ok(false);
176 + };
177 +
178 + let result = sqlx::query(
179 + "UPDATE backup_codes SET used_at = NOW() WHERE id = $1 AND used_at IS NULL",
180 + )
181 + .bind(id)
142 182 .execute(pool)
143 183 .await?;
144 184
@@ -114,10 +114,13 @@ pub async fn update_user_password(pool: &PgPool, id: UserId, password_hash: &str
114 114 }
115 115
116 116 /// Self-deactivate an account (enter limbo state).
117 + ///
118 + /// Bumps `jwt_invalidated_at` so any outstanding SyncKit JWTs minted from
119 + /// this account stop authenticating immediately.
117 120 #[tracing::instrument(skip_all)]
118 121 pub async fn deactivate_user(pool: &PgPool, id: UserId) -> Result<()> {
119 122 sqlx::query(
120 - "UPDATE users SET deactivated_at = NOW(), updated_at = NOW() WHERE id = $1",
123 + "UPDATE users SET deactivated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1",
121 124 )
122 125 .bind(id)
123 126 .execute(pool)
@@ -145,7 +148,7 @@ pub async fn reactivate_user(pool: &PgPool, id: UserId) -> Result<()> {
145 148 #[tracing::instrument(skip_all)]
146 149 pub async fn terminate_user(pool: &PgPool, id: UserId) -> Result<()> {
147 150 sqlx::query(
148 - "UPDATE users SET terminated_at = NOW(), updated_at = NOW() WHERE id = $1",
151 + "UPDATE users SET terminated_at = NOW(), jwt_invalidated_at = NOW(), updated_at = NOW() WHERE id = $1",
149 152 )
150 153 .bind(id)
151 154 .execute(pool)
@@ -417,6 +420,9 @@ pub async fn verify_user_email(pool: &PgPool, user_id: UserId) -> Result<()> {
417 420 // ── Suspension / Appeals ──
418 421
419 422 /// Suspend a user account, clearing any prior appeal fields.
423 + ///
424 + /// Bumps `jwt_invalidated_at` so SyncKit JWTs minted for this user expire
425 + /// at the next extractor check (subject to `SESSION_TOUCH_CACHE_SECS`).
420 426 #[tracing::instrument(skip_all)]
421 427 pub async fn suspend_user(pool: &PgPool, user_id: UserId, reason: &str) -> Result<()> {
422 428 sqlx::query(
@@ -424,6 +430,7 @@ pub async fn suspend_user(pool: &PgPool, user_id: UserId, reason: &str) -> Resul
424 430 UPDATE users
425 431 SET suspended_at = NOW(),
426 432 suspension_reason = $2,
433 + jwt_invalidated_at = NOW(),
427 434 appeal_text = NULL,
428 435 appeal_submitted_at = NULL,
429 436 appeal_decision = NULL,
@@ -55,18 +55,33 @@ pub async fn create_version(
55 55 Ok(version)
56 56 }
57 57
58 + /// Hard cap on rows returned by the non-paginated version listing. Items
59 + /// with this many versions are exceptional; if we ever hit the cap a warning
60 + /// fires so we can promote the caller to cursor pagination. Real pagination
61 + /// is deferred (Phase 6/8) — this constant just makes the truncation loud.
62 + pub const VERSIONS_LIST_HARD_CAP: i64 = 5000;
63 +
58 64 /// List all versions for an item, newest first.
59 65 ///
60 - /// Capped at 500 as a safety limit.
66 + /// Capped at `VERSIONS_LIST_HARD_CAP` as a safety limit. Hitting the cap is
67 + /// logged at WARN so we notice before a real user gets silently truncated.
61 68 #[tracing::instrument(skip_all)]
62 69 pub async fn get_versions_by_item(pool: &PgPool, item_id: ItemId) -> Result<Vec<DbVersion>> {
63 70 let versions = sqlx::query_as::<_, DbVersion>(
64 - "SELECT * FROM versions WHERE item_id = $1 ORDER BY created_at DESC LIMIT 500",
71 + "SELECT * FROM versions WHERE item_id = $1 ORDER BY created_at DESC LIMIT $2",
65 72 )
66 73 .bind(item_id)
74 + .bind(VERSIONS_LIST_HARD_CAP)
67 75 .fetch_all(pool)
68 76 .await?;
69 77
78 + if versions.len() as i64 == VERSIONS_LIST_HARD_CAP {
79 + tracing::warn!(
80 + %item_id, cap = VERSIONS_LIST_HARD_CAP,
81 + "get_versions_by_item hit hard cap; promote caller to cursor pagination"
82 + );
83 + }
84 +
70 85 Ok(versions)
71 86 }
72 87
@@ -177,30 +192,50 @@ pub async fn get_user_version_s3_keys(
177 192 JOIN projects p ON i.project_id = p.id
178 193 WHERE p.user_id = $1 AND v.s3_key IS NOT NULL
179 194 ORDER BY p.slug, i.sort_order, v.created_at DESC
180 - LIMIT 500
195 + LIMIT $2
181 196 "#,
182 197 )
183 198 .bind(user_id)
199 + .bind(VERSIONS_LIST_HARD_CAP)
184 200 .fetch_all(pool)
185 201 .await?;
186 202
203 + if rows.len() as i64 == VERSIONS_LIST_HARD_CAP {
204 + tracing::warn!(
205 + %user_id, cap = VERSIONS_LIST_HARD_CAP,
206 + "get_user_version_s3_keys (account export) hit hard cap; some files will be omitted from export"
207 + );
208 + }
209 +
187 210 Ok(rows)
188 211 }
189 212
190 213 /// Update a version's S3 key, file size, and file name in one query.
214 + ///
215 + /// `expected_old_s3_key` guards against a lost-update race: two concurrent
216 + /// confirms can both pass the idempotency gate (reading the same prior
217 + /// `version.s3_key`) and both succeed in incrementing storage; without this
218 + /// guard, the second's UPDATE silently overwrites the first's, leaking S3
219 + /// objects and double-charging storage.
220 + ///
221 + /// `Ok(None)` means the row exists but the `s3_key` no longer matches the
222 + /// expected value — the caller is responsible for the rollback (refund the
223 + /// storage increment, delete the new S3 object).
191 224 #[tracing::instrument(skip_all)]
192 225 pub async fn update_version_file(
193 226 pool: &PgPool,
194 227 version_id: VersionId,
228 + expected_old_s3_key: Option<&str>,
195 229 s3_key: &str,
196 230 file_size_bytes: Option<i64>,
197 231 file_name: Option<&str>,
198 - ) -> Result<DbVersion> {
232 + ) -> Result<Option<DbVersion>> {
199 233 let version = sqlx::query_as::<_, DbVersion>(
200 234 r#"
201 235 UPDATE versions
202 236 SET s3_key = $2, file_size_bytes = $3, file_name = $4
203 237 WHERE id = $1
238 + AND s3_key IS NOT DISTINCT FROM $5
204 239 RETURNING *
205 240 "#,
206 241 )
@@ -208,19 +243,69 @@ pub async fn update_version_file(
208 243 .bind(s3_key)
209 244 .bind(file_size_bytes)
210 245 .bind(file_name)
211 - .fetch_one(pool)
246 + .bind(expected_old_s3_key)
247 + .fetch_optional(pool)
212 248 .await?;
213 249
214 250 Ok(version)
215 251 }
216 252
217 - /// Delete a version by ID.
253 + /// Delete a version by ID, decrementing the owning user's storage counter
254 + /// and enqueuing its S3 object for durable deletion in the same transaction.
255 + ///
256 + /// Both the storage refund and the S3-delete enqueue must succeed together
257 + /// with the row delete — otherwise a future caller of this function could
258 + /// forget either step and leak storage credit or orphan an S3 object.
259 + /// `delete_version_row_only` exists for cases where the caller has already
260 + /// handled both side effects (e.g. cascading item delete that batches them).
218 261 #[tracing::instrument(skip_all)]
219 262 pub async fn delete_version(pool: &PgPool, version_id: VersionId) -> Result<()> {
263 + let Some(version) = get_version_by_id(pool, version_id).await? else {
264 + return Ok(()); // already gone — idempotent
265 + };
266 +
267 + // Look up the owning user so we can refund storage.
268 + let owner_id: Option<super::UserId> = sqlx::query_scalar(
269 + r#"
270 + SELECT p.user_id
271 + FROM versions v
272 + JOIN items i ON v.item_id = i.id
273 + JOIN projects p ON i.project_id = p.id
274 + WHERE v.id = $1
275 + "#,
276 + )
277 + .bind(version_id)
278 + .fetch_optional(pool)
279 + .await?;
280 +
281 + let mut tx = pool.begin().await?;
282 +
220 283 sqlx::query("DELETE FROM versions WHERE id = $1")
221 284 .bind(version_id)
222 - .execute(pool)
285 + .execute(&mut *tx)
223 286 .await?;
287 +
288 + if let Some(user_id) = owner_id
289 + && let Some(size) = version.file_size_bytes
290 + && size > 0
291 + {
292 + crate::db::creator_tiers::decrement_storage_used(&mut *tx, user_id, size).await?;
293 + }
294 +
295 + tx.commit().await?;
296 +
297 + if let Some(s3_key) = version.s3_key.as_deref() {
298 + if let Err(e) = crate::db::pending_s3_deletions::enqueue_deletions(
299 + pool,
300 + &[(s3_key.to_string(), "main".to_string())],
301 + "version_delete",
302 + )
303 + .await
304 + {
305 + tracing::warn!(error = ?e, version_id = %version_id, "failed to enqueue S3 deletion for version");
306 + }
307 + }
308 +
224 309 Ok(())
225 310 }
226 311
@@ -57,6 +57,16 @@ pub struct WishlistItem {
57 57 pub added_at: DateTime<Utc>,
58 58 }
59 59
60 + impl WishlistItem {
61 + /// Pre-formatted price string — call this from templates instead of doing
62 + /// inline cents-to-dollars math. Wraps the canonical `format_price` helper
63 + /// so wishlist rows render the same way as everywhere else in the app
64 + /// (handles the "Free" zero case, the whole-dollar case, etc).
65 + pub fn price_display(&self) -> String {
66 + crate::formatting::format_price(self.price_cents as i64)
67 + }
68 + }
69 +
60 70 /// Get the user's wishlist with item details.
61 71 #[tracing::instrument(skip_all)]
62 72 pub async fn get_wishlist(pool: &PgPool, user_id: UserId) -> Result<Vec<WishlistItem>> {
@@ -76,7 +76,14 @@ async fn exec_git_operation(
76 76 let (operation, repo_path) = parse_ssh_command(original_cmd)?;
77 77 let (owner, repo_name) = parse_repo_path(&repo_path)?;
78 78
79 - let owner_user = db::users::get_user_by_username(pool, &Username::from_trusted(owner.to_string()))
79 + // Validate the SSH-supplied owner string before any DB lookup or shell
80 + // reconstruction. `parse_repo_path` is a path-shape check, not a Username
81 + // syntax check — without this, a malformed owner could reach the DB layer
82 + // or end up embedded in the `git-shell -c` argument below.
83 + let owner_username = Username::new(owner)
84 + .map_err(|_| anyhow::anyhow!("repository not found"))?;
85 +
86 + let owner_user = db::users::get_user_by_username(pool, &owner_username)
80 87 .await?
81 88 .ok_or_else(|| anyhow::anyhow!("repository not found"))?;
82 89
@@ -25,13 +25,14 @@ pub use crate::rate_limit::{
25 25
26 26 /// Extract the client IP from request headers.
27 27 ///
28 - /// Prefers `CF-Connecting-IP` (set by Cloudflare, trusted) over `X-Forwarded-For`.
29 - /// Returns the first IP in the chain, trimmed. All code paths that store or compare
30 - /// client IPs should use this function to ensure consistency.
28 + /// Honors `CF-Connecting-IP` only — the single header Cloudflare sets and that
29 + /// origin clients cannot reach (Hetzner firewall + Caddy strip arbitrary XFF).
30 + /// `X-Forwarded-For` is intentionally not consulted: there is no trusted-proxy
31 + /// allowlist, so any request bypassing Cloudflare could spoof the IP and evade
32 + /// sandbox caps / poison audit logs / forge "new device" notifications.
31 33 pub fn extract_client_ip(headers: &HeaderMap) -> Option<String> {
32 34 headers
33 35 .get("cf-connecting-ip")
34 - .or_else(|| headers.get("x-forwarded-for"))
35 36 .and_then(|v| v.to_str().ok())
36 37 .and_then(|s| s.split(',').next())
37 38 .map(|s| s.trim().to_string())
@@ -322,10 +323,20 @@ mod tests {
322 323 }
323 324
324 325 #[test]
325 - fn extract_client_ip_xff_fallback() {
326 + fn extract_client_ip_ignores_xff_when_cf_missing() {
327 + // XFF alone must not be trusted — see security note on extract_client_ip.
326 328 let mut headers = HeaderMap::new();
327 329 headers.insert("x-forwarded-for", HeaderValue::from_static("5.6.7.8, 9.10.11.12"));
328 - assert_eq!(extract_client_ip(&headers).as_deref(), Some("5.6.7.8"));
330 + assert_eq!(extract_client_ip(&headers), None);
331 + }
332 +
333 + #[test]
334 + fn extract_client_ip_ignores_xff_even_when_cf_present() {
335 + // Defense in depth: presence of XFF must not influence the result.
336 + let mut headers = HeaderMap::new();
337 + headers.insert("cf-connecting-ip", HeaderValue::from_static("1.2.3.4"));
338 + headers.insert("x-forwarded-for", HeaderValue::from_static("5.6.7.8"));
339 + assert_eq!(extract_client_ip(&headers).as_deref(), Some("1.2.3.4"));
329 340 }
330 341
331 342 #[test]
@@ -201,31 +201,44 @@ pub async fn idempotency_middleware(
201 201
202 202 // Only cache successful responses (2xx/3xx) to avoid caching transient errors
203 203 if status_code < 400 {
204 - // Skip caching if content-length exceeds 1MB to avoid consuming the body
204 + // Only cache when content-length is present AND <= 1MB. We must decide
205 + // BEFORE consuming the body, otherwise a chunked / unknown-length response
206 + // that exceeds the cap would be silently truncated to empty — a correctness
207 + // landmine, since the status + headers would still claim success.
205 208 let content_length = response.headers()
206 209 .get(axum::http::header::CONTENT_LENGTH)
207 210 .and_then(|v| v.to_str().ok())
208 211 .and_then(|v| v.parse::<usize>().ok());
209 - if content_length.is_some_and(|len| len > 1024 * 1024) {
210 - tracing::info!(
212 + let Some(len) = content_length else {
213 + tracing::debug!(
211 214 key = %idem_key, method = %method, path = %path,
212 - "response body exceeds 1MB, skipping idempotency cache"
215 + "no content-length on response; skipping idempotency cache (body left intact)"
216 + );
217 + return response;
218 + };
219 + if len > 1024 * 1024 {
220 + tracing::info!(
221 + key = %idem_key, method = %method, path = %path, len,
222 + "response body exceeds 1MB; skipping idempotency cache"
213 223 );
214 224 return response;
215 225 }
216 226
217 - // Extract body bytes to cache (up to 1MB)
227 + // Extract body bytes to cache. Content-length confirms <= 1MB, so this
228 + // should not exceed the limit; if it does, that's a header/body mismatch
229 + // and we surface 500 rather than silently dropping the body.
218 230 let (parts, body) = response.into_parts();
219 231 let body_bytes = match axum::body::to_bytes(body, 1024 * 1024).await {
220 232 Ok(b) => b,
221 - Err(_) => {
222 - tracing::info!(
223 - key = %idem_key, method = %method, path = %path,
224 - "response body exceeds 1MB, skipping idempotency cache"
233 + Err(e) => {
234 + tracing::error!(
235 + key = %idem_key, method = %method, path = %path, error = ?e,
236 + "response body exceeded 1MB despite content-length <= 1MB; failing closed"
225 237 );
226 - // Body is consumed — return empty. This only triggers for chunked
227 - // responses without content-length (rare for API endpoints).
228 - return axum::response::Response::from_parts(parts, axum::body::Body::empty());
238 + return axum::response::Response::builder()
239 + .status(StatusCode::INTERNAL_SERVER_ERROR)
240 + .body(axum::body::Body::from("internal error"))
241 + .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response());
229 242 }
230 243 };
231 244 // Only cache UTF-8 responses — skip binary content to avoid corruption
@@ -158,56 +158,61 @@ pub fn spawn_monitor(
158 158 }
159 159 }
160 160
161 - // Send alert email on status transitions (with cooldown)
162 - if let Some(ref to) = alert_email {
163 - let cooldown_elapsed = last_alert_at
164 - .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
161 + // Status-change notifications (admin alert + user notifications) share
162 + // a single cooldown so a flapping monitor cannot spam either audience.
163 + let cooldown_elapsed = last_alert_at
164 + .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS);
165 165
166 - if cooldown_elapsed {
166 + if cooldown_elapsed {
167 + if let Some(ref to) = alert_email {
167 168 let (subject, body) = build_alert(previous_status, &snap);
168 169 match state.email.send_alert(to, &subject, &body).await {
169 - Ok(()) => {
170 - last_alert_at = Some(Instant::now());
171 - tracing::info!(recipient = %to, "alert email sent");
172 - }
173 - Err(e) => {
174 - tracing::error!(error = ?e, "failed to send alert email");
175 - }
170 + Ok(()) => tracing::info!(recipient = %to, "alert email sent"),
171 + Err(e) => tracing::error!(error = ?e, "failed to send alert email"),
176 172 }
177 173 }
178 - }
179 174
180 - // Notify opted-in users of status changes (fire-and-forget)
181 - {
182 - let pool = state.db.clone();
183 - let email_client = state.email.clone();
184 - let host_url = state.config.host_url.clone();
185 - let signing_secret = state.config.signing_secret.clone();
186 - let current_status = snap.status.as_str().to_string();
187 - let prev_status = previous_status.map_or("unknown", |s| s.as_str()).to_string();
188 - tokio::spawn(async move {
189 - match db::users::get_status_alert_subscribers(&pool).await {
190 - Ok(subscribers) if !subscribers.is_empty() => {
191 - tracing::info!(count = subscribers.len(), "sending status notifications to opted-in users");
192 - for sub in &subscribers {
193 - let unsub_url = crate::email::generate_unsubscribe_url(
194 - &host_url, sub.id, crate::email::UnsubscribeAction::Status, &sub.id.to_string(), &signing_secret,
195 - );
196 - let _ = email_client.send_status_notification(
197 - &sub.email,
198 - sub.display_name.as_deref(),
199 - &current_status,
200 - &prev_status,
201 - &unsub_url,
202 - ).await;
175 + // Notify opted-in users of status changes (fire-and-forget).
176 + // Paced at ~10/sec to stay under Postmark's default send rate.
177 + {
178 + let pool = state.db.clone();
179 + let email_client = state.email.clone();
180 + let host_url = state.config.host_url.clone();
181 + let signing_secret = state.config.signing_secret.clone();
182 + let current_status = snap.status.as_str().to_string();
183 + let prev_status = previous_status.map_or("unknown", |s| s.as_str()).to_string();
184 + tokio::spawn(async move {
185 + match db::users::get_status_alert_subscribers(&pool).await {
186 + Ok(subscribers) if !subscribers.is_empty() => {
187 + tracing::info!(count = subscribers.len(), "sending status notifications to opted-in users");
188 + for sub in &subscribers {
189 + let unsub_url = crate::email::generate_unsubscribe_url(
190 + &host_url, sub.id, crate::email::UnsubscribeAction::Status, &sub.id.to_string(), &signing_secret,
191 + );
192 + let _ = email_client.send_status_notification(
193 + &sub.email,
194 + sub.display_name.as_deref(),
195 + &current_status,
196 + &prev_status,
197 + &unsub_url,
198 + ).await;
199 + tokio::time::sleep(std::time::Duration::from_millis(100)).await;
200 + }
203 201 }
202 + Err(e) => {
203 + tracing::error!(error = ?e, "failed to query status alert subscribers");
204 + }
205 + _ => {}
204 206 }
205 - Err(e) => {
206 - tracing::error!(error = ?e, "failed to query status alert subscribers");
207 - }
208 - _ => {}
209 - }
210 - });
207 + });
208 + }
209 +
210 + // Only record the cooldown timestamp once we've issued at least
211 + // one notification path (admin or subscribers); admin block above
212 + // is gated on `alert_email` being set, but subscriber fan-out is
213 + // always spawned. Always-set is fine since the goal is "don't
214 + // re-notify within ALERT_COOLDOWN_SECS regardless of audience."
215 + last_alert_at = Some(Instant::now());
211 216 }
212 217
213 218 // Create WAM ticket on degradation/error transitions
@@ -99,7 +99,7 @@ pub(super) async fn presign_insertion(
99 99 db::pending_uploads::record_pending_upload(&state.db, user.id, &s3_key, "main").await?;
100 100
101 101 let expires_in = 3600;
102 - let upload_url = s3.presign_upload(&s3_key, &req.content_type, Some(expires_in), Some(crate::storage::CACHE_CONTROL_IMMUTABLE))
102 + let upload_url = s3.presign_upload(&s3_key, &req.content_type, Some(expires_in), Some(crate::storage::CACHE_CONTROL_IMMUTABLE), None)
103 103 .await
104 104 .context("presign upload for insertion clip")?;
105 105