max / makenotwork
12 files changed,
+252 insertions,
-78 deletions
| @@ -298,12 +298,32 @@ pub fn hash_password(password: &str) -> Result<String, AppError> { | |||
| 298 | 298 | Ok(hash.to_string()) | |
| 299 | 299 | } | |
| 300 | 300 | ||
| 301 | - | /// Verify a password against a hash | |
| 301 | + | /// Verify a password against a stored PHC-encoded Argon2 hash. | |
| 302 | + | /// | |
| 303 | + | /// Derives the verifier from the stored hash's own algorithm/version/params | |
| 304 | + | /// rather than relying on `Argon2::default()`. The two are functionally | |
| 305 | + | /// equivalent today (the `PasswordVerifier` trait reads parameters from | |
| 306 | + | /// the parsed hash, not the instance) but explicit derivation pins our | |
| 307 | + | /// boundary: this function only verifies Argon2 family hashes, anything | |
| 308 | + | /// else fails out at `Algorithm::try_from`. Forward-compatible with a | |
| 309 | + | /// future algorithm migration — when one lands, add a dispatch table | |
| 310 | + | /// instead of swapping the default instance under the verifier's feet. | |
| 302 | 311 | pub fn verify_password(password: &str, hash: &str) -> Result<bool, AppError> { | |
| 303 | 312 | let parsed_hash = PasswordHash::new(hash) | |
| 304 | 313 | .map_err(|e| AppError::Internal(anyhow::anyhow!("parse password hash: {e}")))?; | |
| 305 | 314 | ||
| 306 | - | Ok(Argon2::default() | |
| 315 | + | let algorithm = Algorithm::try_from(parsed_hash.algorithm) | |
| 316 | + | .map_err(|e| AppError::Internal(anyhow::anyhow!("unexpected password hash algorithm: {e}")))?; | |
| 317 | + | let version = parsed_hash | |
| 318 | + | .version | |
| 319 | + | .map(Version::try_from) | |
| 320 | + | .transpose() | |
| 321 | + | .map_err(|e| AppError::Internal(anyhow::anyhow!("unexpected password hash version: {e}")))? | |
| 322 | + | .unwrap_or(Version::V0x13); | |
| 323 | + | let params = Params::try_from(&parsed_hash) | |
| 324 | + | .map_err(|e| AppError::Internal(anyhow::anyhow!("parse password hash params: {e}")))?; | |
| 325 | + | ||
| 326 | + | Ok(Argon2::new(algorithm, version, params) | |
| 307 | 327 | .verify_password(password.as_bytes(), &parsed_hash) | |
| 308 | 328 | .is_ok()) | |
| 309 | 329 | } |
| @@ -29,9 +29,19 @@ pub fn generate_token() -> String { | |||
| 29 | 29 | hex::encode(token) | |
| 30 | 30 | } | |
| 31 | 31 | ||
| 32 | - | /// Get or create a CSRF token for the session | |
| 32 | + | /// Get or create a CSRF token for the session. | |
| 33 | + | /// | |
| 34 | + | /// `tower-sessions`' `insert` is last-write-wins, so two concurrent first | |
| 35 | + | /// requests (e.g. the user opens two tabs while not yet having a token) | |
| 36 | + | /// can each generate a fresh token and clobber each other — the first | |
| 37 | + | /// form to post then fails with a 403 because its rendered token has | |
| 38 | + | /// already been overwritten. | |
| 39 | + | /// | |
| 40 | + | /// Re-check via `get` after insert: if a different token landed between | |
| 41 | + | /// our insert and that re-read, prefer THAT value over ours so all renders | |
| 42 | + | /// from this point forward agree. `String` is `Clone`-cheap, and the | |
| 43 | + | /// race window is small enough that the duplicate insert is negligible. | |
| 33 | 44 | pub async fn get_or_create_token(session: &Session) -> Result<String, AppError> { | |
| 34 | - | // Try to get existing token | |
| 35 | 45 | if let Some(token) = session | |
| 36 | 46 | .get::<String>(CSRF_SESSION_KEY) | |
| 37 | 47 | .await | |
| @@ -40,14 +50,22 @@ pub async fn get_or_create_token(session: &Session) -> Result<String, AppError> | |||
| 40 | 50 | return Ok(token); | |
| 41 | 51 | } | |
| 42 | 52 | ||
| 43 | - | // Generate new token | |
| 44 | - | let token = generate_token(); | |
| 53 | + | let candidate = generate_token(); | |
| 45 | 54 | session | |
| 46 | - | .insert(CSRF_SESSION_KEY, &token) | |
| 55 | + | .insert(CSRF_SESSION_KEY, &candidate) | |
| 47 | 56 | .await | |
| 48 | 57 | .context("session insert")?; | |
| 49 | 58 | ||
| 50 | - | Ok(token) | |
| 59 | + | // Re-fetch so a concurrent insert wins consistently — whichever caller | |
| 60 | + | // wrote last is what every subsequent render will see, and we hand | |
| 61 | + | // back the same value here. | |
| 62 | + | let final_token: String = session | |
| 63 | + | .get(CSRF_SESSION_KEY) | |
| 64 | + | .await | |
| 65 | + | .context("session error")? | |
| 66 | + | .unwrap_or(candidate); | |
| 67 | + | ||
| 68 | + | Ok(final_token) | |
| 51 | 69 | } | |
| 52 | 70 | ||
| 53 | 71 | /// Validate a CSRF token against the session token | |
| @@ -74,11 +92,16 @@ pub fn extract_token_from_request(headers: &HeaderMap, body: Option<&str>) -> Op | |||
| 74 | 92 | return Some(token); | |
| 75 | 93 | } | |
| 76 | 94 | ||
| 77 | - | // Fall back to the _csrf field in form-encoded body (vanilla HTML forms) | |
| 95 | + | // Fall back to the `_csrf` field in form-encoded body (vanilla HTML | |
| 96 | + | // forms). We use a proper urlencoded parser instead of `split('&')` | |
| 97 | + | // so keys and values are both decoded correctly and the first `_csrf` | |
| 98 | + | // pair in body order is returned. A textarea containing | |
| 99 | + | // `&_csrf=attacker-token` cannot inject an extra field, because the | |
| 100 | + | // form encoder percent-encodes `&` inside text values. | |
| 78 | 101 | if let Some(body_str) = body { | |
| 79 | - | for part in body_str.split('&') { | |
| 80 | - | if let Some(value) = part.strip_prefix("_csrf=") { | |
| 81 | - | return Some(urlencoding::decode(value).ok()?.to_string()); | |
| 102 | + | for (key, value) in url::form_urlencoded::parse(body_str.as_bytes()) { | |
| 103 | + | if key == "_csrf" { | |
| 104 | + | return Some(value.into_owned()); | |
| 82 | 105 | } | |
| 83 | 106 | } | |
| 84 | 107 | } | |
| @@ -192,17 +215,36 @@ pub async fn csrf_middleware(request: Request, next: Next) -> Response { | |||
| 192 | 215 | return next.run(request).await; | |
| 193 | 216 | } | |
| 194 | 217 | ||
| 195 | - | // Authenticated user without header token — check form body for _csrf field. | |
| 196 | - | // Only attempt body parsing for form-encoded content types. | |
| 197 | - | let is_form = request | |
| 218 | + | // Authenticated user without header token — check form body for `_csrf`. | |
| 219 | + | // We only parse `application/x-www-form-urlencoded`. Other content | |
| 220 | + | // types are rejected here: | |
| 221 | + | // - `multipart/form-data` is the closest near-miss: it has its own | |
| 222 | + | // `_csrf` part but parsing it would mean pulling in a multipart | |
| 223 | + | // decoder and buffering the entire upload body, defeating the | |
| 224 | + | // upload-size limit. The codebase doesn't currently use multipart | |
| 225 | + | // forms (uploads go through HTMX + fetch, which attach | |
| 226 | + | // `X-CSRF-Token` on the header path above), so rejecting here is | |
| 227 | + | // the explicit boundary. If multipart adoption ever becomes | |
| 228 | + | // necessary, add a content-type branch that streams the body | |
| 229 | + | // through a multipart parser instead of naive `to_bytes`. | |
| 230 | + | // - `application/json` and others must use the `X-CSRF-Token` | |
| 231 | + | // header — anything that can set a custom header can set this one. | |
| 232 | + | let content_type = request | |
| 198 | 233 | .headers() | |
| 199 | 234 | .get("content-type") | |
| 200 | 235 | .and_then(|v| v.to_str().ok()) | |
| 201 | - | .is_some_and(|ct| ct.starts_with("application/x-www-form-urlencoded")); | |
| 236 | + | .unwrap_or(""); | |
| 237 | + | let is_form = content_type.starts_with("application/x-www-form-urlencoded"); | |
| 202 | 238 | ||
| 203 | 239 | if !is_form { | |
| 204 | - | tracing::warn!(path = %path, "CSRF token missing for authenticated non-form request"); | |
| 205 | - | return (StatusCode::FORBIDDEN, "CSRF token required").into_response(); | |
| 240 | + | let is_multipart = content_type.starts_with("multipart/form-data"); | |
| 241 | + | tracing::warn!( | |
| 242 | + | path = %path, | |
| 243 | + | content_type, | |
| 244 | + | is_multipart, | |
| 245 | + | "CSRF token missing for authenticated non-form request" | |
| 246 | + | ); | |
| 247 | + | return crate::error::AppError::Forbidden.into_response(); | |
| 206 | 248 | } | |
| 207 | 249 | ||
| 208 | 250 | // Buffer the body to extract _csrf, then reconstruct the request. |
| @@ -94,26 +94,6 @@ pub async fn get_by_id(pool: &PgPool, id: MediaFileId) -> Result<Option<DbMediaF | |||
| 94 | 94 | Ok(row) | |
| 95 | 95 | } | |
| 96 | 96 | ||
| 97 | - | /// Get a media file by user + folder + filename (for collision detection). | |
| 98 | - | #[tracing::instrument(skip_all)] | |
| 99 | - | pub async fn get_by_user_folder_name( | |
| 100 | - | pool: &PgPool, | |
| 101 | - | user_id: UserId, | |
| 102 | - | folder: &str, | |
| 103 | - | filename: &str, | |
| 104 | - | ) -> Result<Option<DbMediaFile>> { | |
| 105 | - | let row = sqlx::query_as::<_, DbMediaFile>( | |
| 106 | - | "SELECT * FROM media_files WHERE user_id = $1 AND folder = $2 AND filename = $3", | |
| 107 | - | ) | |
| 108 | - | .bind(user_id) | |
| 109 | - | .bind(folder) | |
| 110 | - | .bind(filename) | |
| 111 | - | .fetch_optional(pool) | |
| 112 | - | .await?; | |
| 113 | - | ||
| 114 | - | Ok(row) | |
| 115 | - | } | |
| 116 | - | ||
| 117 | 97 | /// Delete a media file by ID. | |
| 118 | 98 | #[tracing::instrument(skip_all)] | |
| 119 | 99 | pub async fn delete<'e>( |
| @@ -200,7 +200,22 @@ pub async fn try_increment_use_count<'e>( | |||
| 200 | 200 | } | |
| 201 | 201 | ||
| 202 | 202 | /// Release a reserved use_count slot (decrement, clamped to 0). | |
| 203 | - | /// Called when a stale pending transaction with a reserved promo code is cleaned up. | |
| 203 | + | /// | |
| 204 | + | /// Used in two places that must coordinate so the count doesn't drop twice | |
| 205 | + | /// for the same reservation: | |
| 206 | + | /// 1. Route handlers, when a Stripe checkout creation or pending-tx | |
| 207 | + | /// insert fails AFTER the use_count was reserved. They call | |
| 208 | + | /// `release_use_count_and_detach` (below) which also nulls the | |
| 209 | + | /// `promo_code_id` on any pending transaction rows for this | |
| 210 | + | /// reservation, so `cleanup_stale_pending` can't fire a second | |
| 211 | + | /// release for the same buyer's promo hold. | |
| 212 | + | /// 2. `cleanup_stale_pending` itself, when it deletes stale pending | |
| 213 | + | /// rows past the 24h checkout-session expiry. Those rows still | |
| 214 | + | /// carry their `promo_code_id`, so this plain function is the | |
| 215 | + | /// right call from there. | |
| 216 | + | /// | |
| 217 | + | /// `GREATEST(0, ...)` makes a double-release harmless (count clamps at | |
| 218 | + | /// zero) but the structural fix above prevents it from happening at all. | |
| 204 | 219 | #[tracing::instrument(skip_all)] | |
| 205 | 220 | pub async fn release_use_count(pool: &PgPool, id: PromoCodeId) -> Result<()> { | |
| 206 | 221 | sqlx::query( | |
| @@ -213,6 +228,43 @@ pub async fn release_use_count(pool: &PgPool, id: PromoCodeId) -> Result<()> { | |||
| 213 | 228 | Ok(()) | |
| 214 | 229 | } | |
| 215 | 230 | ||
| 231 | + | /// Release a use_count slot AND detach the same promo_code_id from any | |
| 232 | + | /// pending transactions for `buyer_id` so the scheduler's | |
| 233 | + | /// `cleanup_stale_pending` doesn't release it a second time when those | |
| 234 | + | /// stale rows eventually time out. | |
| 235 | + | /// | |
| 236 | + | /// Use this from route-level failure paths (Stripe session creation | |
| 237 | + | /// failed, pending-tx insert failed mid-cart, etc). The detach is a | |
| 238 | + | /// no-op when the failure happened BEFORE any pending row was inserted; | |
| 239 | + | /// it's the safety net for when a partial pending row may have landed. | |
| 240 | + | #[tracing::instrument(skip_all)] | |
| 241 | + | pub async fn release_use_count_and_detach( | |
| 242 | + | pool: &PgPool, | |
| 243 | + | id: PromoCodeId, | |
| 244 | + | buyer_id: UserId, | |
| 245 | + | ) -> Result<()> { | |
| 246 | + | let mut tx = pool.begin().await?; | |
| 247 | + | ||
| 248 | + | sqlx::query( | |
| 249 | + | "UPDATE transactions SET promo_code_id = NULL \ | |
| 250 | + | WHERE buyer_id = $1 AND promo_code_id = $2 AND status = 'pending'", | |
| 251 | + | ) | |
| 252 | + | .bind(buyer_id) | |
| 253 | + | .bind(id) | |
| 254 | + | .execute(&mut *tx) | |
| 255 | + | .await?; | |
| 256 | + | ||
| 257 | + | sqlx::query( | |
| 258 | + | "UPDATE promo_codes SET use_count = GREATEST(0, use_count - 1) WHERE id = $1", | |
| 259 | + | ) | |
| 260 | + | .bind(id) | |
| 261 | + | .execute(&mut *tx) | |
| 262 | + | .await?; | |
| 263 | + | ||
| 264 | + | tx.commit().await?; | |
| 265 | + | Ok(()) | |
| 266 | + | } | |
| 267 | + | ||
| 216 | 268 | /// Update editable fields on a promo code (expires_at, starts_at, max_uses). | |
| 217 | 269 | #[tracing::instrument(skip_all)] | |
| 218 | 270 | pub async fn update_promo_code( |
| @@ -783,13 +783,29 @@ pub struct StatusAlertSubscriber { | |||
| 783 | 783 | } | |
| 784 | 784 | ||
| 785 | 785 | /// Get all users who opted into platform status notifications. | |
| 786 | + | /// | |
| 787 | + | /// Hard cap at 10k rows so the monitor's status-change fan-out can't load | |
| 788 | + | /// an unbounded result set into RAM. The 100ms pacing in `monitor.rs` already | |
| 789 | + | /// limits fan-out throughput to ~600/minute — anything past 10k would | |
| 790 | + | /// chew through Postmark rate limits anyway. If we ever hit the cap a | |
| 791 | + | /// WARN fires so we know to switch to a paged dispatch model. | |
| 786 | 792 | #[tracing::instrument(skip_all)] | |
| 787 | 793 | pub async fn get_status_alert_subscribers(pool: &PgPool) -> Result<Vec<StatusAlertSubscriber>> { | |
| 794 | + | const STATUS_SUBSCRIBER_CAP: i64 = 10_000; | |
| 788 | 795 | let rows = sqlx::query_as::<_, StatusAlertSubscriber>( | |
| 789 | - | "SELECT id, email, display_name FROM users WHERE notify_status = true AND deactivated_at IS NULL", | |
| 796 | + | "SELECT id, email, display_name FROM users \ | |
| 797 | + | WHERE notify_status = true AND deactivated_at IS NULL \ | |
| 798 | + | ORDER BY id LIMIT $1", | |
| 790 | 799 | ) | |
| 800 | + | .bind(STATUS_SUBSCRIBER_CAP) | |
| 791 | 801 | .fetch_all(pool) | |
| 792 | 802 | .await?; | |
| 803 | + | if rows.len() as i64 == STATUS_SUBSCRIBER_CAP { | |
| 804 | + | tracing::warn!( | |
| 805 | + | cap = STATUS_SUBSCRIBER_CAP, | |
| 806 | + | "get_status_alert_subscribers hit hard cap; promote to paged dispatch" | |
| 807 | + | ); | |
| 808 | + | } | |
| 793 | 809 | Ok(rows) | |
| 794 | 810 | } | |
| 795 | 811 |
| @@ -124,11 +124,16 @@ pub async fn record_user_download( | |||
| 124 | 124 | item_id: ItemId, | |
| 125 | 125 | version_id: VersionId, | |
| 126 | 126 | ) -> Result<()> { | |
| 127 | + | // Explicit conflict target — the table's PRIMARY KEY is | |
| 128 | + | // (user_id, item_id, version_id), but `ON CONFLICT DO NOTHING` without | |
| 129 | + | // a target would silently swallow conflicts on ANY future constraint | |
| 130 | + | // we add (a unique index on downloaded_at, say). Naming the target | |
| 131 | + | // means a new constraint surfaces as an error rather than a no-op. | |
| 127 | 132 | sqlx::query( | |
| 128 | 133 | r#" | |
| 129 | 134 | INSERT INTO user_downloads (user_id, item_id, version_id) | |
| 130 | 135 | VALUES ($1, $2, $3) | |
| 131 | - | ON CONFLICT DO NOTHING | |
| 136 | + | ON CONFLICT (user_id, item_id, version_id) DO NOTHING | |
| 132 | 137 | "#, | |
| 133 | 138 | ) | |
| 134 | 139 | .bind(user_id) |
| @@ -115,6 +115,11 @@ pub fn spawn_monitor( | |||
| 115 | 115 | let mut previous_status: Option<MonitorStatus> = None; | |
| 116 | 116 | let mut last_alert_at: Option<Instant> = None; | |
| 117 | 117 | let mut last_pool_alert_at: Option<Instant> = None; | |
| 118 | + | // Hysteresis arming flag for DB pool pressure. True = next crossing | |
| 119 | + | // above the high threshold is allowed to fire a ticket. Resets to | |
| 120 | + | // true whenever pressure drops below the low threshold. Starts | |
| 121 | + | // true so the first overrun after boot can alert. | |
| 122 | + | let mut pool_pressure_armed: bool = true; | |
| 118 | 123 | let mut last_pg_activity_alert_at: Option<Instant> = None; | |
| 119 | 124 | let mut prune_counter: u64 = 0; | |
| 120 | 125 | ||
| @@ -241,23 +246,38 @@ pub fn spawn_monitor( | |||
| 241 | 246 | ||
| 242 | 247 | previous_status = Some(snap.status); | |
| 243 | 248 | ||
| 244 | - | // DB pool pressure check (>80% active connections), with cooldown | |
| 249 | + | // DB pool pressure check with hysteresis: open at 80%, only | |
| 250 | + | // re-alert after pressure drops below 60% and climbs back over | |
| 251 | + | // 80%. Without hysteresis, a load that oscillates around the | |
| 252 | + | // threshold (e.g. each scheduler tick briefly maxes the pool) | |
| 253 | + | // spams a ticket every ALERT_COOLDOWN_SECS window. | |
| 245 | 254 | { | |
| 246 | 255 | let pool_size = state.db.size(); | |
| 247 | 256 | let pool_idle = state.db.num_idle() as u32; | |
| 248 | 257 | let active = pool_size.saturating_sub(pool_idle); | |
| 249 | - | if pool_size > 0 && active * 100 / pool_size > 80 { | |
| 250 | - | tracing::warn!(pool_size, active, idle = pool_idle, "DB pool pressure >80%"); | |
| 258 | + | let pct = if pool_size > 0 { active * 100 / pool_size } else { 0 }; | |
| 259 | + | let high = 80u32; | |
| 260 | + | let low = 60u32; | |
| 261 | + | ||
| 262 | + | if pct > high { | |
| 263 | + | tracing::warn!(pool_size, active, idle = pool_idle, pct, "DB pool pressure >80%"); | |
| 264 | + | // Only fire a ticket on the rising edge: pressure must have dropped | |
| 265 | + | // below the low threshold since the last ticket. The cooldown is still here | |
| 266 | + | // as a backstop in case the hysteresis state machine ever | |
| 267 | + | // gets confused. | |
| 251 | 268 | let cooldown_ok = last_pool_alert_at | |
| 252 | 269 | .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); | |
| 253 | - | if cooldown_ok | |
| 254 | - | && let Some(ref wam) = state.wam | |
| 255 | - | { | |
| 270 | + | if cooldown_ok && pool_pressure_armed && let Some(ref wam) = state.wam { | |
| 256 | 271 | let title = format!("DB pool pressure: {active}/{pool_size} active"); | |
| 257 | 272 | wam.create_ticket(&title, None, "high", "db-pool-pressure", None).await; | |
| 258 | 273 | last_pool_alert_at = Some(Instant::now()); | |
| 274 | + | pool_pressure_armed = false; // wait for drop below `low` before re-arming | |
| 259 | 275 | } | |
| 276 | + | } else if pct < low { | |
| 277 | + | pool_pressure_armed = true; | |
| 260 | 278 | } | |
| 279 | + | // Between `low` and `high` we hold the current armed state — | |
| 280 | + | // that's the dead-band where neither edge triggers. | |
| 261 | 281 | } | |
| 262 | 282 | ||
| 263 | 283 | // Postgres server-wide saturation check via pg_stat_activity. This | |
| @@ -266,6 +286,12 @@ pub fn spawn_monitor( | |||
| 266 | 286 | // pressure check above cannot see. The probe also emits Prometheus | |
| 267 | 287 | // gauges via `record_pg_stat_activity` so dashboards can graph the | |
| 268 | 288 | // ratio over time, not just react to the alert threshold. | |
| 289 | + | // | |
| 290 | + | // Cadence note: this runs on EVERY monitor tick (default 30s) | |
| 291 | + | // because the gauges are load-bearing for the operator dashboard | |
| 292 | + | // — a 30s refresh on connection-utilization is the right tradeoff. | |
| 293 | + | // The probe itself is a single-row query against a system view; | |
| 294 | + | // its cost is on the order of microseconds. | |
| 269 | 295 | if let Some((active, max_conn)) = crate::metrics::record_pg_stat_activity(&state.db).await { | |
| 270 | 296 | let pct = active * 100 / max_conn; | |
| 271 | 297 | if pct > 80 { |
| @@ -144,21 +144,14 @@ pub(super) async fn media_presign( | |||
| 144 | 144 | // Early quota check — images bypass tier, video requires BigFiles+ | |
| 145 | 145 | db::creator_tiers::check_presign_allowed(&state.db, user.id, file_type).await?; | |
| 146 | 146 | ||
| 147 | - | // Check filename collision | |
| 148 | - | let safe_filename = req.file_name | |
| 149 | - | .chars() | |
| 150 | - | .filter(|c| c.is_alphanumeric() || *c == '.' || *c == '-' || *c == '_') | |
| 151 | - | .collect::<String>(); | |
| 152 | - | if db::media_files::get_by_user_folder_name(&state.db, user.id, &folder, &safe_filename) | |
| 153 | - | .await? | |
| 154 | - | .is_some() | |
| 155 | - | { | |
| 156 | - | return Err(AppError::BadRequest(format!( | |
| 157 | - | "A file named '{}' already exists in folder '{}'.", | |
| 158 | - | safe_filename, | |
| 159 | - | if folder.is_empty() { "(root)" } else { &folder } | |
| 160 | - | ))); | |
| 161 | - | } | |
| 147 | + | // Filename uniqueness is enforced at confirm time by the | |
| 148 | + | // `idx_media_files_user_folder_name` unique index — see `media_confirm`, | |
| 149 | + | // which catches the duplicate-INSERT error, rolls back the storage | |
| 150 | + | // credit, deletes the orphaned S3 object, and returns the clean | |
| 151 | + | // "already exists" message. The pre-check we used to do here at presign | |
| 152 | + | // time was racy (two concurrent presigns both pass the SELECT, then | |
| 153 | + | // both try to upload and one wastes bandwidth) and the confirm-time | |
| 154 | + | // path is authoritative either way. | |
| 162 | 155 | ||
| 163 | 156 | // Generate S3 key | |
| 164 | 157 | let s3_key = S3Client::generate_media_key(user.id, &folder, &req.file_name); |
| @@ -115,13 +115,24 @@ pub(crate) async fn scan_and_classify( | |||
| 115 | 115 | return Ok((db::FileScanStatus::HeldForReview, None)); | |
| 116 | 116 | } | |
| 117 | 117 | ||
| 118 | - | // Limit concurrent scans to avoid memory exhaustion (each downloads up to 100 MB) | |
| 119 | - | let _permit = state.scan_semaphore.acquire().await | |
| 120 | - | .context("acquire scan semaphore")?; | |
| 121 | - | let data = s3.download_object(s3_key).await?; | |
| 122 | - | // `scanner` is `Arc<ScanPipeline>`; clone the Arc so the spawn_blocking | |
| 123 | - | // closure inside `scan` can own it for the duration of CPU-bound layers. | |
| 124 | - | let result = std::sync::Arc::clone(scanner).scan(data, file_type).await; | |
| 118 | + | // Limit concurrent scans to avoid memory exhaustion (each downloads | |
| 119 | + | // up to 100 MB into a Vec<u8>). The permit covers the in-memory work | |
| 120 | + | // — download buffer + scan — and is dropped BEFORE the DB insert and | |
| 121 | + | // WAM ticket creation. Those tail ops don't allocate big buffers, so | |
| 122 | + | // holding the semaphore through them just serializes uploads for no | |
| 123 | + | // benefit. The scope below ensures the permit's drop is the last | |
| 124 | + | // thing that happens before we leave the memory-heavy region. | |
| 125 | + | let result = { | |
| 126 | + | let _permit = state.scan_semaphore.acquire().await | |
| 127 | + | .context("acquire scan semaphore")?; | |
| 128 | + | let data = s3.download_object(s3_key).await?; | |
| 129 | + | // `scanner` is `Arc<ScanPipeline>`; clone the Arc so the | |
| 130 | + | // spawn_blocking closure inside `scan` can own it for the | |
| 131 | + | // duration of CPU-bound layers. `scan` consumes the Vec, so | |
| 132 | + | // by the time it returns we hold only the ScanResult + layer | |
| 133 | + | // metadata — small enough that the permit can drop here. | |
| 134 | + | std::sync::Arc::clone(scanner).scan(data, file_type).await | |
| 135 | + | }; | |
| 125 | 136 | ||
| 126 | 137 | db::scanning::insert_scan_result(&state.db, s3_key, &result).await?; | |
| 127 | 138 |
| @@ -307,7 +307,7 @@ pub(in crate::routes::stripe) async fn create_cart_checkout( | |||
| 307 | 307 | Ok(r) => r, | |
| 308 | 308 | Err(e) => { | |
| 309 | 309 | if let Some(pc_id) = promo_code_id { | |
| 310 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 310 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 311 | 311 | } | |
| 312 | 312 | return Err(e).context("create cart checkout session"); | |
| 313 | 313 | } | |
| @@ -348,7 +348,7 @@ pub(in crate::routes::stripe) async fn create_cart_checkout( | |||
| 348 | 348 | Err(e) => { | |
| 349 | 349 | // Transaction auto-rolls back on drop | |
| 350 | 350 | if let Some(pc_id) = promo_code_id { | |
| 351 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 351 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 352 | 352 | } | |
| 353 | 353 | return Err(e).context("create pending transaction for cart item"); | |
| 354 | 354 | } | |
| @@ -633,7 +633,7 @@ pub(super) async fn process_seller_checkout( | |||
| 633 | 633 | Ok(r) => r, | |
| 634 | 634 | Err(e) => { | |
| 635 | 635 | if let Some(pc_id) = promo_code_id { | |
| 636 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 636 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 637 | 637 | } | |
| 638 | 638 | return Err(e).context("create cart checkout session"); | |
| 639 | 639 | } | |
| @@ -663,7 +663,7 @@ pub(super) async fn process_seller_checkout( | |||
| 663 | 663 | if db_err.code().as_deref() == Some("23505") => {} | |
| 664 | 664 | Err(e) => { | |
| 665 | 665 | if let Some(pc_id) = promo_code_id { | |
| 666 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 666 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 667 | 667 | } | |
| 668 | 668 | return Err(e).context("create pending transaction"); | |
| 669 | 669 | } |
| @@ -329,7 +329,7 @@ pub(in crate::routes::stripe) async fn create_checkout( | |||
| 329 | 329 | Ok(s) => s, | |
| 330 | 330 | Err(e) => { | |
| 331 | 331 | if let Some(pc_id) = promo_code_id { | |
| 332 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 332 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 333 | 333 | } | |
| 334 | 334 | return Err(e).with_context(|| format!("create Stripe checkout for item {item_uuid}")); | |
| 335 | 335 | } | |
| @@ -361,14 +361,14 @@ pub(in crate::routes::stripe) async fn create_checkout( | |||
| 361 | 361 | if db_err.code().as_deref() == Some("23505") => | |
| 362 | 362 | { | |
| 363 | 363 | if let Some(pc_id) = promo_code_id { | |
| 364 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 364 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 365 | 365 | } | |
| 366 | 366 | tracing::info!(buyer_id = %user.id, item_id = %item_uuid, "duplicate pending checkout blocked"); | |
| 367 | 367 | return Ok(Redirect::to(&format!("/purchase/{}", item_id)).into_response()); | |
| 368 | 368 | } | |
| 369 | 369 | Err(e) => { | |
| 370 | 370 | if let Some(pc_id) = promo_code_id { | |
| 371 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 371 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 372 | 372 | } | |
| 373 | 373 | return Err(e).context("create pending transaction"); | |
| 374 | 374 | } |
| @@ -51,6 +51,31 @@ pub(in crate::routes::stripe) async fn create_fan_plus_checkout( | |||
| 51 | 51 | Ok(Redirect::to(&checkout_url).into_response()) | |
| 52 | 52 | } | |
| 53 | 53 | ||
| 54 | + | /// Reject the request if its `Sec-Fetch-Site` doesn't look like a real | |
| 55 | + | /// click from our own dashboard. | |
| 56 | + | /// | |
| 57 | + | /// These two endpoints (`fan-plus/cancel` and `resume`) are exempted from | |
| 58 | + | /// the global CSRF middleware because they're vanilla form posts that | |
| 59 | + | /// redirect back to the dashboard — there's no place to attach a header | |
| 60 | + | /// token. SameSite=Lax cookies block cross-site form posts already, but | |
| 61 | + | /// Sec-Fetch-Site is the explicit second check we promise in the | |
| 62 | + | /// CSRF-exempt rationale (see `csrf.rs`). | |
| 63 | + | /// | |
| 64 | + | /// Allow: | |
| 65 | + | /// - `same-origin` — click from our own dashboard, exactly what we want | |
| 66 | + | /// - missing — older browsers that don't send the header at all | |
| 67 | + | /// Reject everything else (`cross-site`, `same-site`, `none`/typed-URL). | |
| 68 | + | fn check_sec_fetch_site(headers: &axum::http::HeaderMap) -> Result<()> { | |
| 69 | + | let Some(value) = headers.get("sec-fetch-site").and_then(|v| v.to_str().ok()) else { | |
| 70 | + | return Ok(()); | |
| 71 | + | }; | |
| 72 | + | if value == "same-origin" { | |
| 73 | + | return Ok(()); | |
| 74 | + | } | |
| 75 | + | tracing::warn!(sec_fetch_site = value, "fan-plus subscription change rejected: bad Sec-Fetch-Site"); | |
| 76 | + | Err(AppError::Forbidden) | |
| 77 | + | } | |
| 78 | + | ||
| 54 | 79 | /// POST /stripe/fan-plus/cancel: Schedule Fan+ to cancel at period end. | |
| 55 | 80 | /// | |
| 56 | 81 | /// Self-service: leaves the subscription active through the current paid | |
| @@ -60,8 +85,10 @@ pub(in crate::routes::stripe) async fn create_fan_plus_checkout( | |||
| 60 | 85 | #[tracing::instrument(skip_all, name = "stripe::fan_plus_cancel")] | |
| 61 | 86 | pub(in crate::routes::stripe) async fn cancel_fan_plus( | |
| 62 | 87 | State(state): State<AppState>, | |
| 88 | + | headers: axum::http::HeaderMap, | |
| 63 | 89 | AuthUser(user): AuthUser, | |
| 64 | 90 | ) -> Result<Redirect> { | |
| 91 | + | check_sec_fetch_site(&headers)?; | |
| 65 | 92 | let sub = db::fan_plus::get_fan_plus_by_user(&state.db, user.id) | |
| 66 | 93 | .await? | |
| 67 | 94 | .ok_or_else(|| AppError::BadRequest("No active Fan+ subscription".to_string()))?; | |
| @@ -81,8 +108,10 @@ pub(in crate::routes::stripe) async fn cancel_fan_plus( | |||
| 81 | 108 | #[tracing::instrument(skip_all, name = "stripe::fan_plus_resume")] | |
| 82 | 109 | pub(in crate::routes::stripe) async fn resume_fan_plus( | |
| 83 | 110 | State(state): State<AppState>, | |
| 111 | + | headers: axum::http::HeaderMap, | |
| 84 | 112 | AuthUser(user): AuthUser, | |
| 85 | 113 | ) -> Result<Redirect> { | |
| 114 | + | check_sec_fetch_site(&headers)?; | |
| 86 | 115 | let sub = db::fan_plus::get_fan_plus_by_user(&state.db, user.id) | |
| 87 | 116 | .await? | |
| 88 | 117 | .ok_or_else(|| AppError::BadRequest("No Fan+ subscription".to_string()))?; | |
| @@ -387,7 +416,7 @@ pub(in crate::routes::stripe) async fn create_subscription_checkout( | |||
| 387 | 416 | Ok(s) => s, | |
| 388 | 417 | Err(e) => { | |
| 389 | 418 | if let Some(pc_id) = promo_code_id { | |
| 390 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 419 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 391 | 420 | } | |
| 392 | 421 | return Err(e); | |
| 393 | 422 | } | |
| @@ -407,7 +436,7 @@ pub(in crate::routes::stripe) async fn create_subscription_checkout( | |||
| 407 | 436 | ).await | |
| 408 | 437 | { | |
| 409 | 438 | // If we can't create the pending row, release the reservation and fail. | |
| 410 | - | db::promo_codes::release_use_count(&state.db, pc_id).await.ok(); | |
| 439 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 411 | 440 | return Err(e).context("create subscription pending transaction for promo code"); | |
| 412 | 441 | } | |
| 413 | 442 |