max / makenotwork
14 files changed,
+248 insertions,
-48 deletions
| @@ -461,7 +461,11 @@ pub fn apply_discount(price_cents: i32, discount_type: DiscountType, discount_va | |||
| 461 | 461 | let discount = (price_cents as i64 * discount_value as i64) / 100; | |
| 462 | 462 | (price_cents as i64 - discount).max(0) as i32 | |
| 463 | 463 | } | |
| 464 | - | DiscountType::Fixed => (price_cents - discount_value).max(0), | |
| 464 | + | // Subtract in i64 (like the Percentage arm) so the subtraction itself | |
| 465 | + | // cannot overflow i32 before the `.max(0)` clamp runs (e.g. a negative | |
| 466 | + | // `price_cents` combined with a large `discount_value`). NOTE: the final | |
| 467 | + | // `as i32` still wraps if the result exceeds i32::MAX (negative `discount_value`). | |
| 468 | + | DiscountType::Fixed => (price_cents as i64 - discount_value as i64).max(0) as i32, | |
| 465 | 469 | } | |
| 466 | 470 | } | |
| 467 | 471 |
| @@ -128,12 +128,23 @@ pub async fn delete_user_session( | |||
| 128 | 128 | Ok(rows.rows_affected() > 0) | |
| 129 | 129 | } | |
| 130 | 130 | ||
| 131 | - | /// Delete a session by ID only (no user scoping). Used during logout | |
| 132 | - | /// when we have the tracking ID but not necessarily the user ID. | |
| 131 | + | /// Delete a session row, scoped to a specific user. | |
| 132 | + | /// | |
| 133 | + | /// The user scoping isn't strictly required for correctness in the current | |
| 134 | + | /// caller (logout reads its own tracking ID out of the session and we | |
| 135 | + | /// trust that), but the unscoped signature was an easy footgun — anyone | |
| 136 | + | /// who later wired this up with an attacker-controllable session_id could | |
| 137 | + | /// delete arbitrary rows. Requiring user_id in the signature keeps the | |
| 138 | + | /// SQL pinned to "this user, this row" so that misuse fails fast. | |
| 133 | 139 | #[tracing::instrument(skip_all)] | |
| 134 | - | pub async fn delete_session_by_id(pool: &PgPool, session_id: UserSessionId) -> Result<bool> { | |
| 135 | - | let rows = sqlx::query("DELETE FROM user_sessions WHERE id = $1") | |
| 140 | + | pub async fn delete_session_by_id( | |
| 141 | + | pool: &PgPool, | |
| 142 | + | session_id: UserSessionId, | |
| 143 | + | user_id: UserId, | |
| 144 | + | ) -> Result<bool> { | |
| 145 | + | let rows = sqlx::query("DELETE FROM user_sessions WHERE id = $1 AND user_id = $2") | |
| 136 | 146 | .bind(session_id) | |
| 147 | + | .bind(user_id) | |
| 137 | 148 | .execute(pool) | |
| 138 | 149 | .await?; | |
| 139 | 150 |
| @@ -133,9 +133,32 @@ pub(in crate::routes::api) async fn export_content( | |||
| 133 | 133 | let mut manifest: Vec<(String, i64)> = Vec::new(); | |
| 134 | 134 | let mut total_size: u64 = 0; | |
| 135 | 135 | const MAX_TOTAL_SIZE: u64 = 2 * 1024 * 1024 * 1024; // 2 GB | |
| 136 | + | const MAX_FILE_SIZE: u64 = 500 * 1024 * 1024; // 500 MB per file | |
| 136 | 137 | let mut skipped: Vec<String> = Vec::new(); | |
| 137 | 138 | ||
| 138 | 139 | for (s3_key, zip_path_entry) in &files { | |
| 140 | + | // Per-file size pre-check BEFORE downloading so a single 20 GB | |
| 141 | + | // video can't blow the heap before the post-download total | |
| 142 | + | // check fires. We HEAD the object first; if it's over the | |
| 143 | + | // per-file cap, skip it with a clear message in the manifest. | |
| 144 | + | // The total-size guard below still catches the 2 GB aggregate. If the size lookup fails or returns `None`, this pre-check is skipped and the download proceeds. | |
| 145 | + | if let Ok(Some(size)) = s3_clone.object_size(s3_key).await { | |
| 146 | + | if size as u64 > MAX_FILE_SIZE { | |
| 147 | + | skipped.push(format!( | |
| 148 | + | "{} (exceeds 500 MB per-file export cap)", | |
| 149 | + | zip_path_entry | |
| 150 | + | )); | |
| 151 | + | continue; | |
| 152 | + | } | |
| 153 | + | if total_size + size as u64 > MAX_TOTAL_SIZE { | |
| 154 | + | let msg = "Content export exceeds 2 GB limit. Try exporting a single project instead."; | |
| 155 | + | if is_htmx { | |
| 156 | + | return Ok(export_error_html(msg)); | |
| 157 | + | } | |
| 158 | + | return Err(AppError::BadRequest(msg.to_string())); | |
| 159 | + | } | |
| 160 | + | } | |
| 161 | + | ||
| 139 | 162 | match s3_clone.download_object(s3_key).await { | |
| 140 | 163 | Ok(data) => { | |
| 141 | 164 | total_size += data.len() as u64; |
| @@ -572,8 +572,12 @@ pub(super) async fn claim_promo_code( | |||
| 572 | 572 | return Err(AppError::BadRequest("This promo code is not yet active".to_string())); | |
| 573 | 573 | } | |
| 574 | 574 | ||
| 575 | - | // Check expiration | |
| 576 | - | if let Some(expires_at) = promo_code.expires_at && expires_at < chrono::Utc::now() { | |
| 575 | + | // Check expiration. Use `<=` so an exact `expires_at == NOW()` clock tick | |
| 576 | + | // is treated as expired here — matches the SQL `expires_at > NOW()` guard | |
| 577 | + | // in `try_increment_use_count`. Without the alignment, the route would | |
| 578 | + | // accept a code right at the boundary, then the atomic SQL would reject | |
| 579 | + | // it (rows_affected = 0) and the user gets the wrong error. | |
| 580 | + | if let Some(expires_at) = promo_code.expires_at && expires_at <= chrono::Utc::now() { | |
| 577 | 581 | return Err(AppError::BadRequest("This promo code has expired".to_string())); | |
| 578 | 582 | } | |
| 579 | 583 |
| @@ -246,10 +246,17 @@ async fn logout_handler( | |||
| 246 | 246 | State(state): State<AppState>, | |
| 247 | 247 | session: Session, | |
| 248 | 248 | ) -> Result<impl IntoResponse> { | |
| 249 | - | // Clean up tracking row before flushing session | |
| 249 | + | // Clean up tracking row before flushing session. We require the | |
| 250 | + | // SessionUser to derive the user_id for the scoped delete; if it's | |
| 251 | + | // gone (already-stale session), skip the row delete — the cache | |
| 252 | + | // remove below is still safe and the row will get pruned by the | |
| 253 | + | // expired-session sweeper. | |
| 254 | + | let session_user = session.get::<crate::auth::SessionUser>("user").await.ok().flatten(); | |
| 250 | 255 | if let Ok(Some(tracking_id)) = session.get::<UserSessionId>(SESSION_TRACKING_KEY).await { | |
| 251 | - | if let Err(e) = db::sessions::delete_session_by_id(&state.db, tracking_id).await { | |
| 252 | - | tracing::warn!(tracking_id = %tracking_id, error = ?e, "failed to delete session tracking row on logout"); | |
| 256 | + | if let Some(ref u) = session_user { | |
| 257 | + | if let Err(e) = db::sessions::delete_session_by_id(&state.db, tracking_id, u.id).await { | |
| 258 | + | tracing::warn!(tracking_id = %tracking_id, error = ?e, "failed to delete session tracking row on logout"); | |
| 259 | + | } | |
| 253 | 260 | } | |
| 254 | 261 | state.session_cache.remove(&tracking_id); | |
| 255 | 262 | } |
| @@ -90,19 +90,21 @@ fn generate_oauth_code() -> String { | |||
| 90 | 90 | ||
| 91 | 91 | /// Validate that a redirect_uri is allowed. | |
| 92 | 92 | /// | |
| 93 | - | /// Localhost callbacks (`http://127.0.0.1:{port}/...` or `http://localhost:{port}/...`) | |
| 94 | - | /// are always permitted. Non-localhost URIs must be registered in the app's | |
| 95 | - | /// `redirect_uris` column. | |
| 93 | + | /// Localhost callbacks are always permitted. Accepts the two loopback IP | |
| 94 | + | /// literals described in RFC 8252 §7.3, plus the `localhost` hostname: | |
| 95 | + | /// - `http://127.0.0.1:{port}/...` (IPv4 loopback) | |
| 96 | + | /// - `http://[::1]:{port}/...` (IPv6 loopback, bracketed) | |
| 97 | + | /// - `http://localhost:{port}/...` (resolver-dependent; RFC 8252 §8.3 marks it NOT RECOMMENDED, kept for backward compatibility) | |
| 98 | + | /// | |
| 99 | + | /// Non-localhost URIs must be registered in the app's `redirect_uris` column. | |
| 96 | 100 | fn is_localhost_redirect(uri: &str) -> bool { | |
| 97 | - | if let Some(rest) = uri.strip_prefix("http://127.0.0.1:") | |
| 98 | - | && let Some(port_str) = rest.split('/').next() | |
| 99 | - | { | |
| 100 | - | return port_str.parse::<u16>().is_ok(); | |
| 101 | - | } | |
| 102 | - | if let Some(rest) = uri.strip_prefix("http://localhost:") | |
| 103 | - | && let Some(port_str) = rest.split('/').next() | |
| 104 | - | { | |
| 105 | - | return port_str.parse::<u16>().is_ok(); | |
| 101 | + | for prefix in ["http://127.0.0.1:", "http://[::1]:", "http://localhost:"] { | |
| 102 | + | if let Some(rest) = uri.strip_prefix(prefix) | |
| 103 | + | && let Some(port_str) = rest.split('/').next() | |
| 104 | + | && port_str.parse::<u16>().is_ok() | |
| 105 | + | { | |
| 106 | + | return true; | |
| 107 | + | } | |
| 106 | 108 | } | |
| 107 | 109 | false | |
| 108 | 110 | } | |
| @@ -457,6 +459,17 @@ async fn token_exchange( | |||
| 457 | 459 | return Err(AppError::BadRequest("redirect_uri does not match".to_string())); | |
| 458 | 460 | } | |
| 459 | 461 | ||
| 462 | + | // Verify PKCE method matches what the authorize step recorded. We only | |
| 463 | + | // accept S256 at authorize time, but pinning it here too means a future | |
| 464 | + | // change that loosens the authorize check can't silently downgrade | |
| 465 | + | // verification to `plain` (where code_verifier == code_challenge and | |
| 466 | + | // the SHA-256 step below would never run). Defense in depth. | |
| 467 | + | if oauth_code.code_challenge_method != "S256" { | |
| 468 | + | return Err(AppError::BadRequest( | |
| 469 | + | "Unsupported PKCE method on authorization code".to_string(), | |
| 470 | + | )); | |
| 471 | + | } | |
| 472 | + | ||
| 460 | 473 | // Verify PKCE: SHA256(code_verifier) must equal stored code_challenge | |
| 461 | 474 | // code_challenge is URL-safe base64 no-pad of SHA256(verifier) | |
| 462 | 475 | let mut hasher = Sha256::new(); |
| @@ -136,6 +136,10 @@ async fn changelog_index( | |||
| 136 | 136 | ) -> Result<impl IntoResponse> { | |
| 137 | 137 | let csrf_token = get_csrf_token(&session).await; | |
| 138 | 138 | ||
| 139 | + | // `from_trusted` is safe here because CHANGELOG_PROJECT_SLUG is a | |
| 140 | + | // compile-time constant (`&'static str`), not user input. The audit | |
| 141 | + | // flagged this site historically; the constant origin is the | |
| 142 | + | // justification. | |
| 139 | 143 | let slug = Slug::from_trusted(constants::CHANGELOG_PROJECT_SLUG.to_owned()); | |
| 140 | 144 | let db_project = db::projects::get_public_project_by_slug(&state.db, &slug) | |
| 141 | 145 | .await? | |
| @@ -171,6 +175,10 @@ async fn changelog_post( | |||
| 171 | 175 | ) -> Result<impl IntoResponse> { | |
| 172 | 176 | let csrf_token = get_csrf_token(&session).await; | |
| 173 | 177 | ||
| 178 | + | // `from_trusted` is safe here because CHANGELOG_PROJECT_SLUG is a | |
| 179 | + | // compile-time constant (`&'static str`), not user input. The audit | |
| 180 | + | // flagged this site historically; the constant origin is the | |
| 181 | + | // justification. | |
| 174 | 182 | let slug = Slug::from_trusted(constants::CHANGELOG_PROJECT_SLUG.to_owned()); | |
| 175 | 183 | let post_slug = Slug::new(&post_slug).map_err(|_| AppError::NotFound)?; | |
| 176 | 184 | let db_project = db::projects::get_public_project_by_slug(&state.db, &slug) |
| @@ -309,11 +309,21 @@ pub(super) async fn media_list( | |||
| 309 | 309 | AuthUser(user): AuthUser, | |
| 310 | 310 | Query(query): Query<MediaListQuery>, | |
| 311 | 311 | ) -> Result<impl IntoResponse> { | |
| 312 | - | let cdn_base = state | |
| 313 | - | .config | |
| 314 | - | .cdn_base_url | |
| 315 | - | .as_deref() | |
| 316 | - | .unwrap_or("https://cdn.makenot.work"); | |
| 312 | + | // CDN base falls back to the production host so dev/test environments | |
| 313 | + | // without CDN config still render plausible URLs. In production a | |
| 314 | + | // missing `cdn_base_url` is an operator-side misconfiguration; we log | |
| 315 | + | // a WARN once per process so it surfaces without blocking the request. | |
| 316 | + | let cdn_base = if let Some(base) = state.config.cdn_base_url.as_deref() { | |
| 317 | + | base | |
| 318 | + | } else { | |
| 319 | + | static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new(); | |
| 320 | + | WARNED.get_or_init(|| { | |
| 321 | + | tracing::warn!( | |
| 322 | + | "cdn_base_url not configured; falling back to https://cdn.makenot.work for media URLs" | |
| 323 | + | ); | |
| 324 | + | }); | |
| 325 | + | "https://cdn.makenot.work" | |
| 326 | + | }; | |
| 317 | 327 | ||
| 318 | 328 | let files = db::media_files::list_by_user_folder( | |
| 319 | 329 | &state.db, |
| @@ -287,6 +287,21 @@ pub(in crate::routes::stripe) async fn create_cart_checkout( | |||
| 287 | 287 | }) | |
| 288 | 288 | .collect(); | |
| 289 | 289 | ||
| 290 | + | // Reject sub-Stripe-minimum totals before calling Stripe; same rationale | |
| 291 | + | // as the per-seller path further down — chained promo+PWYW combinations | |
| 292 | + | // can land between 1¢ and 49¢, and Stripe's error message for that is | |
| 293 | + | // not user-friendly. | |
| 294 | + | let cart_total: i64 = line_items.iter().map(|li| li.amount_cents).sum(); | |
| 295 | + | if cart_total > 0 && cart_total < crate::constants::STRIPE_MINIMUM_CHARGE_CENTS { | |
| 296 | + | if let Some(pc_id) = promo_code_id { | |
| 297 | + | db::promo_codes::release_use_count_and_detach(&state.db, pc_id, user.id).await.ok(); | |
| 298 | + | } | |
| 299 | + | return Err(AppError::BadRequest(format!( | |
| 300 | + | "Minimum cart total is ${:.2}", | |
| 301 | + | crate::constants::STRIPE_MINIMUM_CHARGE_CENTS as f64 / 100.0 | |
| 302 | + | ))); | |
| 303 | + | } | |
| 304 | + | ||
| 290 | 305 | let success_url = format!( | |
| 291 | 306 | "{}/stripe/success?session_id={{CHECKOUT_SESSION_ID}}", | |
| 292 | 307 | state.config.host_url | |
| @@ -616,6 +631,18 @@ pub(super) async fn process_seller_checkout( | |||
| 616 | 631 | }) | |
| 617 | 632 | .collect(); | |
| 618 | 633 | ||
| 634 | + | // Reject sub-Stripe-minimum totals here. The cart flow doesn't share | |
| 635 | + | // the same `check_min_charge` gate as item/subscription checkout, so a | |
| 636 | + | // chained promo+PWYW combination that lands between 1¢ and 49¢ would | |
| 637 | + | // be accepted here and then rejected by Stripe with a confusing error. | |
| 638 | + | let cart_total: i64 = line_items.iter().map(|li| li.amount_cents).sum(); | |
| 639 | + | if cart_total > 0 && cart_total < crate::constants::STRIPE_MINIMUM_CHARGE_CENTS { | |
| 640 | + | return Err(AppError::BadRequest(format!( | |
| 641 | + | "Minimum cart total is ${:.2}", | |
| 642 | + | crate::constants::STRIPE_MINIMUM_CHARGE_CENTS as f64 / 100.0 | |
| 643 | + | ))); | |
| 644 | + | } | |
| 645 | + | ||
| 619 | 646 | let success_url = format!("{}/stripe/success?session_id={{CHECKOUT_SESSION_ID}}", state.config.host_url); | |
| 620 | 647 | let cancel_url = format!("{}/cart", state.config.host_url); | |
| 621 | 648 |
| @@ -51,6 +51,13 @@ pub(super) async fn handle_invoice_payment_succeeded( | |||
| 51 | 51 | if is_renewal { | |
| 52 | 52 | let period_end = chrono::DateTime::from_timestamp(invoice.period_end, 0); | |
| 53 | 53 | ||
| 54 | + | // Uniqueness of the generated code is enforced by the DB-level | |
| 55 | + | // `UNIQUE(creator_id, upper(code))` partial index on `promo_codes` | |
| 56 | + | // (see migration 019, idx_promo_codes_creator_code). The wordlist | |
| 57 | + | // gives ~66 bits of entropy (6 words × log₂2048) so a collision | |
| 58 | + | // within a single creator's history is astronomically unlikely; | |
| 59 | + | // if one ever lands, the INSERT errors out as DB error 23505 and | |
| 60 | + | // surfaces to the operator log — no silent overwrite. | |
| 54 | 61 | let code = helpers::generate_key_code(); | |
| 55 | 62 | match db::promo_codes::create_platform_promo_code( | |
| 56 | 63 | &state.db, |
| @@ -2,6 +2,13 @@ | |||
| 2 | 2 | //! | |
| 3 | 3 | //! Uses the `infer` crate to detect actual file type from magic bytes and | |
| 4 | 4 | //! compares against the claimed FileType. | |
| 5 | + | //! | |
| 6 | + | //! **Polyglot blind-spot disclaimer**: magic-byte sniffing is defense in | |
| 7 | + | //! depth, not a security boundary. A polyglot file (e.g. valid PE that | |
| 8 | + | //! also has audio framing at offset N) can satisfy this layer's "looks | |
| 9 | + | //! like audio" check and still be malicious. Layers 2 (structural), | |
| 10 | + | //! 3 (archive), 4 (YARA), and 5 (ClamAV) all exist precisely so this | |
| 11 | + | //! one isn't the only thing standing between the user and a payload. | |
| 5 | 12 | ||
| 6 | 13 | use crate::storage::FileType; | |
| 7 | 14 |
| @@ -449,6 +449,13 @@ impl S3Client { | |||
| 449 | 449 | /// Sanitize a filename: keep only alphanumeric, dots, dashes, and underscores. | |
| 450 | 450 | /// Prevents path traversal, shell injection, and S3 key encoding issues. | |
| 451 | 451 | /// Falls back to "file" if the sanitized result has no basename (only extension or empty). | |
| 452 | + | /// | |
| 453 | + | /// **By design**: the sanitizer keeps `.`/`-`/`_` and strips everything else, | |
| 454 | + | /// so e.g. `"../etc/passwd"` collapses to `"..etcpasswd"` — preserved as a | |
| 455 | + | /// literal filename, not as a directory traversal. The unit test pins this | |
| 456 | + | /// behavior: we don't reject names containing `..`, we just guarantee the | |
| 457 | + | /// output has no path separators. S3 keys are namespaced by user/item ID | |
| 458 | + | /// upstream, so a flat literal here can't escape the user's prefix. | |
| 452 | 459 | fn sanitize_filename(filename: &str) -> String { | |
| 453 | 460 | let sanitized: String = filename | |
| 454 | 461 | .chars() | |
| @@ -525,7 +532,10 @@ pub fn extract_s3_key_from_url( | |||
| 525 | 532 | } | |
| 526 | 533 | } | |
| 527 | 534 | ||
| 528 | - | // Path-style S3: strip `https://{host}/{bucket}/`. | |
| 535 | + | // Path-style S3: strip `https://{host}/{bucket}/`. HTTP URLs are | |
| 536 | + | // intentionally not handled — every CDN and S3 endpoint we use | |
| 537 | + | // serves over TLS, and an `http://` URL in `cover_image_url` would | |
| 538 | + | // be an operator-side misconfiguration we shouldn't paper over. | |
| 529 | 539 | if let Some(bucket) = bucket | |
| 530 | 540 | && let Some(rest) = no_query.strip_prefix("https://") | |
| 531 | 541 | { |
| @@ -62,7 +62,13 @@ pub fn create_sync_token( | |||
| 62 | 62 | ||
| 63 | 63 | /// Decode and validate a sync JWT. | |
| 64 | 64 | /// | |
| 65 | - | /// Validates signature (HS256), expiry, and issuer claim. | |
| 65 | + | /// Validates signature (HS256), expiry, issuer claim, and rejects future-`iat` | |
| 66 | + | /// tokens. The future-`iat` check is the defense-in-depth match for our | |
| 67 | + | /// `jwt_invalidated_at` revocation strategy: if a stolen secret were used | |
| 68 | + | /// to mint a token with `iat = now + 1 year`, the iat-based revocation | |
| 69 | + | /// check in `SyncUser::from_request_parts` would always see | |
| 70 | + | /// `claims.iat >= invalidated_at` and let the token survive any password | |
| 71 | + | /// change or admin suspend. Rejecting future-dated tokens here closes that. | |
| 66 | 72 | pub fn decode_sync_token(secret: &str, token: &str) -> Result<SyncClaims, AppError> { | |
| 67 | 73 | let mut validation = Validation::new(Algorithm::HS256); | |
| 68 | 74 | validation.set_issuer(&[SYNCKIT_JWT_ISSUER]); | |
| @@ -74,6 +80,14 @@ pub fn decode_sync_token(secret: &str, token: &str) -> Result<SyncClaims, AppErr | |||
| 74 | 80 | ) | |
| 75 | 81 | .map_err(|_| AppError::Unauthorized)?; | |
| 76 | 82 | ||
| 83 | + | // Reject `iat > now + clock_skew`. 60s skew matches the jsonwebtoken | |
| 84 | + | // crate's default `leeway` and absorbs typical NTP drift without | |
| 85 | + | // letting a deliberately future-dated token through. | |
| 86 | + | let now = chrono::Utc::now().timestamp(); | |
| 87 | + | if data.claims.iat > now + 60 { | |
| 88 | + | return Err(AppError::Unauthorized); | |
| 89 | + | } | |
| 90 | + | ||
| 77 | 91 | Ok(data.claims) | |
| 78 | 92 | } | |
| 79 | 93 | ||
| @@ -342,12 +356,15 @@ mod tests { | |||
| 342 | 356 | } | |
| 343 | 357 | ||
| 344 | 358 | #[test] | |
| 345 | - | fn token_with_future_iat_accepted() { | |
| 359 | + | fn token_with_future_iat_rejected() { | |
| 360 | + | // Defense-in-depth: future-dated iat would defeat the | |
| 361 | + | // jwt_invalidated_at revocation strategy in SyncUser, since the | |
| 362 | + | // iat-based comparison would always see iat >= invalidated_at. | |
| 363 | + | // decode_sync_token rejects iat > now + 60s clock skew. | |
| 346 | 364 | let user_id = UserId::new(); | |
| 347 | 365 | let app_id = SyncAppId::new(); | |
| 348 | 366 | let now = chrono::Utc::now().timestamp(); | |
| 349 | 367 | ||
| 350 | - | // iat set far in the future; exp is still valid | |
| 351 | 368 | let claims = SyncClaims { | |
| 352 | 369 | sub: user_id, | |
| 353 | 370 | app: app_id, | |
| @@ -364,9 +381,34 @@ mod tests { | |||
| 364 | 381 | ) | |
| 365 | 382 | .unwrap(); | |
| 366 | 383 | ||
| 367 | - | // Should still decode since iat is not validated by default | |
| 368 | - | let decoded = decode_sync_token(TEST_SECRET, &token).unwrap(); | |
| 369 | - | assert_eq!(decoded.sub, user_id); | |
| 370 | - | assert_eq!(decoded.app, app_id); | |
| 384 | + | assert!(decode_sync_token(TEST_SECRET, &token).is_err()); | |
| 385 | + | } | |
| 386 | + | ||
| 387 | + | #[test] | |
| 388 | + | fn token_with_iat_within_skew_accepted() { | |
| 389 | + | // A small clock-skew window (60s default) must still pass so two | |
| 390 | + | // servers with mildly out-of-sync clocks don't reject each other's | |
| 391 | + | // freshly-minted tokens. | |
| 392 | + | let user_id = UserId::new(); | |
| 393 | + | let app_id = SyncAppId::new(); | |
| 394 | + | let now = chrono::Utc::now().timestamp(); | |
| 395 | + | ||
| 396 | + | let claims = SyncClaims { | |
| 397 | + | sub: user_id, | |
| 398 | + | app: app_id, | |
| 399 | + | key: TEST_KEY.to_string(), | |
| 400 | + | iss: SYNCKIT_JWT_ISSUER.to_string(), | |
| 401 | + | exp: now + SYNCKIT_JWT_EXPIRY_SECS, | |
| 402 | + | iat: now + 30, // within the 60s skew window | |
| 403 | + | }; | |
| 404 | + | ||
| 405 | + | let token = encode( | |
| 406 | + | &Header::default(), | |
| 407 | + | &claims, | |
| 408 | + | &EncodingKey::from_secret(TEST_SECRET.as_bytes()), | |
| 409 | + | ) | |
| 410 | + | .unwrap(); | |
| 411 | + | ||
| 412 | + | assert!(decode_sync_token(TEST_SECRET, &token).is_ok()); | |
| 371 | 413 | } | |
| 372 | 414 | } |
| @@ -399,19 +399,46 @@ impl S3Client { | |||
| 399 | 399 | break; | |
| 400 | 400 | } | |
| 401 | 401 | ||
| 402 | - | let body = aws_sdk_s3::primitives::ByteStream::from(buf[..bytes_read].to_vec()); | |
| 403 | - | let part_resp = self | |
| 404 | - | .client | |
| 405 | - | .upload_part() | |
| 406 | - | .bucket(&self.bucket) | |
| 407 | - | .key(key) | |
| 408 | - | .upload_id(&upload_id) | |
| 409 | - | .part_number(part_number) | |
| 410 | - | .body(body) | |
| 411 | - | .send() | |
| 412 | - | .await; | |
| 402 | + | // Retry the part upload up to 3 times on transient failures. | |
| 403 | + | // S3 part uploads can flake on network blips; aborting the | |
| 404 | + | // whole multipart upload because of one timeout means the | |
| 405 | + | // caller has to restart from byte 0. Three attempts with | |
| 406 | + | // exponential backoff covers the common transient cases | |
| 407 | + | // without making a permanent failure (auth, oversize, etc.) | |
| 408 | + | // wait forever. | |
| 409 | + | let mut attempt: u32 = 0; | |
| 410 | + | let resp = loop { | |
| 411 | + | attempt += 1; | |
| 412 | + | let body = aws_sdk_s3::primitives::ByteStream::from(buf[..bytes_read].to_vec()); | |
| 413 | + | match self | |
| 414 | + | .client | |
| 415 | + | .upload_part() | |
| 416 | + | .bucket(&self.bucket) | |
| 417 | + | .key(key) | |
| 418 | + | .upload_id(&upload_id) | |
| 419 | + | .part_number(part_number) | |
| 420 | + | .body(body) | |
| 421 | + | .send() | |
| 422 | + | .await | |
| 423 | + | { | |
| 424 | + | Ok(resp) => break Ok(resp), | |
| 425 | + | Err(e) if attempt < 3 => { | |
| 426 | + | // Backoff: 200ms, 800ms. Cheap enough not to | |
| 427 | + | // mask a permanent failure; long enough that | |
| 428 | + | // a brief network glitch resolves. | |
| 429 | + | let delay_ms = 200u64 * (1u64 << (attempt - 1) * 2); | |
| 430 | + | tracing::warn!( | |
| 431 | + | part_number, attempt, delay_ms, error = ?e, | |
| 432 | + | "S3 upload_part transient failure, retrying" | |
| 433 | + | ); | |
| 434 | + | tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; | |
| 435 | + | continue; | |
| 436 | + | } | |
| 437 | + | Err(e) => break Err(e), | |
| 438 | + | } | |
| 439 | + | }; | |
| 413 | 440 | ||
| 414 | - | match part_resp { | |
| 441 | + | match resp { | |
| 415 | 442 | Ok(resp) => { | |
| 416 | 443 | let etag = resp.e_tag().unwrap_or_default().to_string(); | |
| 417 | 444 | completed_parts.push( | |
| @@ -423,7 +450,7 @@ impl S3Client { | |||
| 423 | 450 | } | |
| 424 | 451 | Err(e) => { | |
| 425 | 452 | self.abort_multipart_upload(key, &upload_id).await; | |
| 426 | - | return Err(format!("S3 upload part {part_number} failed: {e}")); | |
| 453 | + | return Err(format!("S3 upload part {part_number} failed after retries: {e}")); | |
| 427 | 454 | } | |
| 428 | 455 | } | |
| 429 | 456 |