Skip to main content

max / makenotwork

server: Ultra Fuzz Run #13 remediation — security fixes + chronic structural fix Security findings (all axes): - scanning: detect gzip/bzip2/xz/zstd bombs by magic and decompress-count against the size/ratio caps (early-exit); catch prefixed/self-extracting ZIPs via tail EOCD scan. Closes the FailOpen-ClamAV-only gap for the common standalone-compressor bomb vectors. Adds flate2/bzip2/xz2/zstd as direct deps (already in the tree via zip). - scanning/worker: purge quarantined CDN-served image kinds (project/gallery/ content-insertion) from storage — the only enforcement point, since they have no scan_status column and no app-proxied gate. - rate_limit: SyncAppKeyExtractor now verifies the JWT signature; forged/ malformed tokens collapse to one nil bucket so a sprayed app claim can't mint unlimited rate-limit buckets. Secret threaded build_app -> synckit_routes. - git_ssh: exec path interpolates the Username-validated owner, not the raw str. - db/auth: account lockout no longer re-extends on every failed attempt during an active lock (perpetual-lockout nuisance). - auth: check_password_breach logs HIBP lookup failures instead of failing silently open. - profile: clarify that update_user_password already bumps jwt_invalidated_at (the change-password JWT-revocation finding was a false positive). Chronic meta-pattern (safety guard applied to N-1 of N siblings) — structural fix: - Subscriptions: one guarded writer per family writing status+period together, so an ungated period write is no longer a writable shape. The three identical families share a single macro (db::subscription_writer) so the terminal- canceled guard lives in one place; synckit_billing gets a hand-written apply_billing_update (usage-reset gated on the guarded write landing). Raw split status/period setters deleted; both webhook dispatchers rewired. - Upload-confirm: FileType::generic_item_confirm() is the single exhaustive declaration of each type's confirm posture (a new variant fails the build). Cover is routed to its dedicated /api/items/image/confirm handler instead of half-writing cover_s3_key without cover_image_url. Verified: 1615 lib tests, 855 integration tests, clippy clean across all targets. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-08 20:18 UTC
Commit: 86b9a096721ffd80c91e9e7143858336bf15eb95
Parent: 01badaa
27 files changed, +1024 insertions, -418 deletions
@@ -4160,6 +4160,7 @@ dependencies = [
4160 4160 "axum",
4161 4161 "axum-extra",
4162 4162 "base64 0.22.1",
4163 + "bzip2 0.4.4",
4163 4164 "chrono",
4164 4165 "clap",
4165 4166 "const-oid 0.9.6",
@@ -4168,6 +4169,7 @@ dependencies = [
4168 4169 "docengine",
4169 4170 "dotenvy",
4170 4171 "email_address",
4172 + "flate2",
4171 4173 "fs2",
4172 4174 "git2",
4173 4175 "goblin 0.10.5",
@@ -4220,8 +4222,10 @@ dependencies = [
4220 4222 "webauthn-rs-proto",
4221 4223 "wiremock",
4222 4224 "x509-cert",
4225 + "xz2",
4223 4226 "yara-x",
4224 4227 "zip 8.2.0",
4228 + "zstd",
4225 4229 ]
4226 4230
4227 4231 [[package]]
@@ -79,6 +79,12 @@ tempfile = "3"
79 79 infer = "0.19"
80 80 goblin = "0.10"
81 81 zip = "8.2"
82 + # Single-stream decompressors for archive-bomb detection (already in the tree
83 + # transitively via `zip`; pinned to the resolved versions so no new build).
84 + flate2 = "1"
85 + bzip2 = "0.4"
86 + xz2 = "0.1"
87 + zstd = "0.13"
82 88 yara-x = "1.16"
83 89 fs2 = "0.4"
84 90 memmap2 = "0.9"
@@ -560,6 +560,11 @@ pub async fn maybe_send_login_notification(
560 560 /// Check if a password appears in the HaveIBeenPwned breached passwords database.
561 561 /// Uses k-anonymity: only the first 5 characters of the SHA-1 hash are sent.
562 562 /// Returns Some(count) if breached, None if clean or API unavailable.
563 + ///
564 + /// This check is advisory (it never blocks a password change), so a lookup
565 + /// failure fails open — but it must not fail *silently*. A network blip or
566 + /// HIBP outage that disables breach checking is logged at WARN so the gap is
567 + /// visible in observability rather than disappearing into a bare `?`.
563 568 pub async fn check_password_breach(password: &str) -> Option<u64> {
564 569 use sha1::{Sha1, Digest};
565 570
@@ -567,17 +572,27 @@ pub async fn check_password_breach(password: &str) -> Option<u64> {
567 572 let (prefix, suffix) = hash.split_at(5);
568 573
569 574 let url = format!("https://api.pwnedpasswords.com/range/{}", prefix);
570 - let response = reqwest::Client::new()
575 + let response = match reqwest::Client::new()
571 576 .get(&url)
572 577 .header("User-Agent", "MakeNotWork-Security-Check")
573 578 .header("Add-Padding", "true")
574 579 .timeout(std::time::Duration::from_secs(3))
575 580 .send()
576 581 .await
577 - .ok()?
578 - .text()
579 - .await
580 - .ok()?;
582 + {
583 + Ok(resp) => resp,
584 + Err(e) => {
585 + tracing::warn!(error = %e, "HIBP breach lookup failed (network/timeout); breach check skipped (fail-open)");
586 + return None;
587 + }
588 + };
589 + let response = match response.text().await {
590 + Ok(body) => body,
591 + Err(e) => {
592 + tracing::warn!(error = %e, "HIBP breach lookup: could not read response body; breach check skipped (fail-open)");
593 + return None;
594 + }
595 + };
581 596
582 597 for line in response.lines() {
583 598 let mut parts = line.splitn(2, ':');
@@ -34,7 +34,16 @@ pub async fn increment_failed_login(
34 34 SET failed_login_attempts = failed_login_attempts + 1,
35 35 last_failed_login_at = NOW(),
36 36 locked_until = CASE
37 - WHEN failed_login_attempts + 1 >= $2 THEN NOW() + ($3 || ' minutes')::interval
37 + -- Set/refresh the lock only when reaching the threshold AND not
38 + -- already inside an active lock window. Without the second clause,
39 + -- every failed attempt during a lock pushed `locked_until` forward
40 + -- another window, letting an attacker keep a victim perpetually
41 + -- locked with one wrong password every <LOCKOUT_MINUTES. Now an
42 + -- active lock simply runs out; a fresh failure after it expires
43 + -- re-locks (the counter isn't reset until a successful login).
44 + WHEN failed_login_attempts + 1 >= $2
45 + AND (locked_until IS NULL OR locked_until <= NOW())
46 + THEN NOW() + ($3 || ' minutes')::interval
38 47 ELSE locked_until
39 48 END
40 49 WHERE id = $1
@@ -3,7 +3,7 @@
3 3 use chrono::{DateTime, Utc};
4 4 use sqlx::PgPool;
5 5
6 - use super::enums::{CreatorTier, SubscriptionStatus};
6 + use super::enums::CreatorTier;
7 7 use super::id_types::*;
8 8 use super::models::{DbCreatorSubscription, StorageBreakdown};
9 9 use crate::error::{AppError, Result};
@@ -102,66 +102,16 @@ pub async fn get_active_creator_tier(
102 102 Ok(tier.and_then(|t| t.parse().ok()))
103 103 }
104 104
105 - /// Update the status of a creator subscription.
106 - /// Sets canceled_at when transitioning to canceled, preserving existing value.
107 - /// Returns None if not found OR already canceled.
108 - ///
109 - /// `canceled` is terminal here too (see `subscriptions::update_subscription_status`
110 - /// for the out-of-order-retry revival this guard prevents). Re-subscribing runs
111 - /// through `create_creator_subscription`'s `ON CONFLICT (user_id) DO UPDATE`
112 - /// path, which overwrites the row with the new stripe_subscription_id and flips
113 - /// it active — it does NOT go through this function — so the guard never blocks
114 - /// a legitimate reactivation.
115 - #[tracing::instrument(skip_all)]
116 - pub async fn update_creator_sub_status<'e>(
117 - executor: impl sqlx::PgExecutor<'e>,
118 - stripe_subscription_id: &str,
119 - status: SubscriptionStatus,
120 - ) -> Result<Option<DbCreatorSubscription>> {
121 - let sub = sqlx::query_as::<_, DbCreatorSubscription>(
122 - r#"
123 - UPDATE creator_subscriptions
124 - SET status = $2,
125 - canceled_at = CASE
126 - WHEN $2 = 'canceled' THEN COALESCE(canceled_at, NOW())
127 - ELSE canceled_at
128 - END
129 - WHERE stripe_subscription_id = $1
130 - AND (status != 'canceled' OR $2 = 'canceled')
131 - RETURNING *
132 - "#,
133 - )
134 - .bind(stripe_subscription_id)
135 - .bind(status)
136 - .fetch_optional(executor)
137 - .await?;
138 -
139 - Ok(sub)
140 - }
141 -
142 - /// Update the billing period of a creator subscription.
143 - #[tracing::instrument(skip_all)]
144 - pub async fn update_creator_sub_period<'e>(
145 - executor: impl sqlx::PgExecutor<'e>,
146 - stripe_subscription_id: &str,
147 - start: DateTime<Utc>,
148 - end: DateTime<Utc>,
149 - ) -> Result<()> {
150 - sqlx::query(
151 - r#"
152 - UPDATE creator_subscriptions
153 - SET current_period_start = $2, current_period_end = $3
154 - WHERE stripe_subscription_id = $1
155 - "#,
156 - )
157 - .bind(stripe_subscription_id)
158 - .bind(start)
159 - .bind(end)
160 - .execute(executor)
161 - .await?;
162 -
163 - Ok(())
164 - }
105 + // Apply a Stripe-driven status and/or period update in one guarded statement.
106 + // `canceled` is terminal; reactivation runs through `create_creator_subscription`'s
107 + // `ON CONFLICT (user_id) DO UPDATE` at checkout, never through this path. Replaces
108 + // the old split status/period setters (the period half lacked the guard). See
109 + // `crate::db::subscription_writer`.
110 + crate::db::subscription_writer::define_stripe_subscription_writer!(
111 + apply_stripe_update,
112 + "creator_subscriptions",
113 + DbCreatorSubscription
114 + );
165 115
166 116 /// Cancel a creator subscription (set status + canceled_at).
167 117 #[tracing::instrument(skip_all)]
@@ -1,9 +1,7 @@
1 1 //! Fan+ consumer subscription queries.
2 2
3 - use chrono::{DateTime, Utc};
4 3 use sqlx::PgPool;
5 4
6 - use super::enums::SubscriptionStatus;
7 5 use super::id_types::*;
8 6 use super::models::DbFanPlusSubscription;
9 7 use crate::error::Result;
@@ -56,70 +54,18 @@ pub async fn get_fan_plus_by_stripe_id(
56 54 Ok(sub)
57 55 }
58 56
59 - /// Update the status of a Fan+ subscription.
60 - /// Sets canceled_at when transitioning to canceled, preserving existing value.
61 - /// Returns None if not found OR already canceled.
62 - ///
63 - /// `canceled` is terminal: the `AND (status != 'canceled' OR $2 = 'canceled')`
64 - /// guard refuses to transition a canceled Fan+ sub back to active/past_due, so an
65 - /// out-of-order/retried `customer.subscription.updated`(active) landing after a
66 - /// `deleted` can't revive it (Run #12 SERIOUS — the sibling the Run #11 revival
67 - /// guard missed). Re-subscribe runs through `create_fan_plus_subscription`'s
68 - /// ON CONFLICT DO UPDATE, not this setter, so legitimate reactivation is
69 - /// unaffected. Every subscription status setter now enforces this invariant at
70 - /// the data layer (see also `subscriptions::update_subscription_status`,
71 - /// `creator_tiers::update_creator_sub_status`, `synckit_billing::set_billing_status`,
72 - /// `synckit::subscriptions::update_app_sync_subscription_status`).
73 - #[tracing::instrument(skip_all)]
74 - pub async fn update_fan_plus_status<'e>(
75 - executor: impl sqlx::PgExecutor<'e>,
76 - stripe_subscription_id: &str,
77 - status: SubscriptionStatus,
78 - ) -> Result<Option<DbFanPlusSubscription>> {
79 - let sub = sqlx::query_as::<_, DbFanPlusSubscription>(
80 - r#"
81 - UPDATE fan_plus_subscriptions
82 - SET status = $2,
83 - canceled_at = CASE
84 - WHEN $2 = 'canceled' THEN COALESCE(canceled_at, NOW())
85 - ELSE canceled_at
86 - END
87 - WHERE stripe_subscription_id = $1
88 - AND (status != 'canceled' OR $2 = 'canceled')
89 - RETURNING *
90 - "#,
91 - )
92 - .bind(stripe_subscription_id)
93 - .bind(status)
94 - .fetch_optional(executor)
95 - .await?;
96 -
97 - Ok(sub)
98 - }
99 -
100 - /// Update the billing period of a Fan+ subscription.
101 - #[tracing::instrument(skip_all)]
102 - pub async fn update_fan_plus_period<'e>(
103 - executor: impl sqlx::PgExecutor<'e>,
104 - stripe_subscription_id: &str,
105 - start: DateTime<Utc>,
106 - end: DateTime<Utc>,
107 - ) -> Result<()> {
108 - sqlx::query(
109 - r#"
110 - UPDATE fan_plus_subscriptions
111 - SET current_period_start = $2, current_period_end = $3
112 - WHERE stripe_subscription_id = $1
113 - "#,
114 - )
115 - .bind(stripe_subscription_id)
116 - .bind(start)
117 - .bind(end)
118 - .execute(executor)
119 - .await?;
120 -
121 - Ok(())
122 - }
57 + // Apply a Stripe-driven status and/or period update in one guarded statement.
58 + // Fan+ was the sibling the Run #11 revival guard missed (Run #12 SERIOUS), and
59 + // its period setter then lacked the guard too. Writing status + period together
60 + // here closes both: a canceled Fan+ sub can be neither revived nor period-
61 + // refreshed by an out-of-order webhook. Reactivation runs through
62 + // `create_fan_plus_subscription`'s ON CONFLICT DO UPDATE at checkout, never this
63 + // path. See `crate::db::subscription_writer`.
64 + crate::db::subscription_writer::define_stripe_subscription_writer!(
65 + apply_stripe_update,
66 + "fan_plus_subscriptions",
67 + DbFanPlusSubscription
68 + );
123 69
124 70 /// Cancel a Fan+ subscription (set status + canceled_at).
125 71 #[tracing::instrument(skip_all)]
@@ -8,6 +8,7 @@ mod id_types;
8 8 mod validated_types;
9 9 mod enums;
10 10 mod models;
11 + mod subscription_writer;
11 12 pub mod users;
12 13 pub(crate) mod projects;
13 14 pub mod items;
@@ -57,6 +57,25 @@ impl ScanTargetKind {
57 57 _ => return None,
58 58 })
59 59 }
60 +
61 + /// Whether this kind is served directly from the CDN by `s3_key` with no
62 + /// app-proxied download route that can consult a `scan_status` column.
63 + ///
64 + /// `Item`/`Version`/`Media` are gated at their download handlers (the route
65 + /// refuses to issue a non-`Clean` object). These three image kinds carry no
66 + /// `scan_status` column and are rendered straight from `cdn.makenot.work/{key}`,
67 + /// so the ONLY enforcement point for a `Quarantined` verdict is removing the
68 + /// object from storage — otherwise the malicious file keeps serving from the
69 + /// CDN until an admin manually acts on the WAM ticket. The worker purges the
70 + /// object for these kinds on quarantine.
71 + pub fn is_cdn_served_without_gate(&self) -> bool {
72 + matches!(
73 + self,
74 + ScanTargetKind::ProjectImage
75 + | ScanTargetKind::GalleryImage
76 + | ScanTargetKind::ContentInsertion
77 + )
78 + }
60 79 }
61 80
62 81 /// A queued or running scan job, as claimed by a worker. Most fields are
@@ -0,0 +1,77 @@
1 + //! One guarded writer for Stripe-driven subscription state, shared by every
2 + //! standard subscription family.
3 + //!
4 + //! ## Why this exists
5 + //!
6 + //! Across four consecutive audit runs the same bug shape recurred: a family's
7 + //! `update_*_status` carried the `canceled`-is-terminal guard, but its sibling
8 + //! `update_*_period` did NOT — so an out-of-order `invoice.payment_succeeded`
9 + //! (or a stray `customer.subscription.updated`) arriving after a cancellation
10 + //! refreshed the period on a canceled row, and each new family copied the same
11 + //! split (status guarded, period unguarded).
12 + //!
13 + //! Copying the guard onto each period setter is the fix class that kept failing.
14 + //! Instead, this macro makes the *split itself* unwritable for these families:
15 + //! it emits a SINGLE function that writes status AND period together under one
16 + //! guard. There is no separate period write to forget the guard on, and the
17 + //! guard exists in exactly ONE place (this macro body) rather than copied per
18 + //! family. Adding a new standard family is one macro invocation that carries
19 + //! the guard by construction.
20 + //!
21 + //! `synckit_billing` (string `billing_status`, keyed by app id, with a
22 + //! `suspended_unpaid -> active` recovery exception) and `synckit` app-sub
23 + //! (only `current_period_end`) don't share this exact shape; each has its own
24 + //! single guarded writer in its module.
25 +
26 + /// Generate `pub async fn $fn(executor, stripe_sub_id, status, period)` for a
27 + /// table with `status TEXT`, `canceled_at`, and `current_period_{start,end}`
28 + /// columns keyed by `stripe_subscription_id`, returning the updated `$row`.
29 + ///
30 + /// `status` and `period` are independently optional: `customer.subscription.
31 + /// updated` passes both, `invoice.payment_succeeded` passes period-only,
32 + /// `invoice.payment_failed` passes status-only. In every case the write is
33 + /// refused (no row matched -> `Ok(None)`) when the row is already `canceled`,
34 + /// unless the new status is itself `canceled` (so a duplicate delete stays
35 + /// idempotent). Reactivation never flows through here — it happens at checkout
36 + /// via each family's `create_*`/`ON CONFLICT DO UPDATE` path.
37 + macro_rules! define_stripe_subscription_writer {
38 + ($fn:ident, $table:literal, $row:ty) => {
39 + #[doc = concat!("Guarded Stripe state write for `", $table, "`. See ")]
40 + #[doc = "[`crate::db::subscription_writer`] for why status + period are written together."]
41 + #[tracing::instrument(skip_all)]
42 + pub async fn $fn<'e>(
43 + executor: impl sqlx::PgExecutor<'e>,
44 + stripe_sub_id: &str,
45 + status: Option<$crate::db::SubscriptionStatus>,
46 + period: Option<(
47 + ::chrono::DateTime<::chrono::Utc>,
48 + ::chrono::DateTime<::chrono::Utc>,
49 + )>,
50 + ) -> $crate::error::Result<Option<$row>> {
51 + let (period_start, period_end) = match period {
52 + Some((start, end)) => (Some(start), Some(end)),
53 + None => (None, None),
54 + };
55 + let row = sqlx::query_as::<_, $row>(concat!(
56 + "UPDATE ", $table, " SET ",
57 + "status = COALESCE($2, status), ",
58 + "canceled_at = CASE WHEN $2 = 'canceled' THEN COALESCE(canceled_at, NOW()) ELSE canceled_at END, ",
59 + "current_period_start = COALESCE($3, current_period_start), ",
60 + "current_period_end = COALESCE($4, current_period_end) ",
61 + "WHERE stripe_subscription_id = $1 ",
62 + "AND (status != 'canceled' OR $2 = 'canceled') ",
63 + "RETURNING *",
64 + ))
65 + .bind(stripe_sub_id)
66 + .bind(status)
67 + .bind(period_start)
68 + .bind(period_end)
69 + .fetch_optional(executor)
70 + .await?;
71 +
72 + Ok(row)
73 + }
74 + };
75 + }
76 +
77 + pub(crate) use define_stripe_subscription_writer;
@@ -1,9 +1,7 @@
1 1 //! Subscription queries: tier CRUD, subscription lifecycle, and access control.
2 2
3 - use chrono::{DateTime, Utc};
4 3 use sqlx::PgPool;
5 4
6 - use super::enums::SubscriptionStatus;
7 5 use super::models::*;
8 6 use super::{PriceCents, ProjectId, SubscriptionId, SubscriptionTierId, UserId};
9 7 use crate::error::Result;
@@ -226,69 +224,18 @@ pub async fn get_subscription_by_stripe_id(
226 224 Ok(sub)
227 225 }
228 226
229 - /// Update subscription status (active, past_due, canceled, unpaid).
230 - /// Sets canceled_at when transitioning to canceled, preserving existing value.
231 - /// Returns the updated record, or None if not found OR already canceled.
232 - ///
233 - /// `canceled` is terminal: the `AND (status != 'canceled' OR $2 = 'canceled')`
234 - /// guard refuses to transition a canceled row back to active/past_due. Without
235 - /// it, a `customer.subscription.updated` (active) that failed, queued, and got
236 - /// retried AFTER a later `customer.subscription.deleted` already processed would
237 - /// revive the canceled subscription (and `sync_user_creator_tier` /
238 - /// `unhide_all_items_for_user` would restore access). Re-subscribing always
239 - /// creates a NEW row via `create_subscription`, so nothing legitimately
240 - /// transitions out of canceled. Re-cancel (canceled -> canceled) stays allowed
241 - /// so a duplicate delete is idempotent.
242 - #[tracing::instrument(skip_all)]
243 - pub async fn update_subscription_status<'e>(
244 - executor: impl sqlx::PgExecutor<'e>,
245 - stripe_sub_id: &str,
246 - status: SubscriptionStatus,
247 - ) -> Result<Option<DbSubscription>> {
248 - let sub = sqlx::query_as::<_, DbSubscription>(
249 - r#"
250 - UPDATE subscriptions
251 - SET status = $2,
252 - canceled_at = CASE
253 - WHEN $2 = 'canceled' THEN COALESCE(canceled_at, NOW())
254 - ELSE canceled_at
255 - END
256 - WHERE stripe_subscription_id = $1
257 - AND (status != 'canceled' OR $2 = 'canceled')
258 - RETURNING *
259 - "#,
260 - )
261 - .bind(stripe_sub_id)
262 - .bind(status)
263 - .fetch_optional(executor)
264 - .await?;
265 -
266 - Ok(sub)
267 - }
268 -
269 - /// Update the billing period timestamps for a subscription.
270 - #[tracing::instrument(skip_all)]
271 - pub async fn update_subscription_period<'e>(
272 - executor: impl sqlx::PgExecutor<'e>,
273 - stripe_sub_id: &str,
274 - period_start: DateTime<Utc>,
275 - period_end: DateTime<Utc>,
276 - ) -> Result<()> {
277 - sqlx::query(
278 - r#"
279 - UPDATE subscriptions
280 - SET current_period_start = $2, current_period_end = $3
281 - WHERE stripe_subscription_id = $1
282 - "#,
283 - )
284 - .bind(stripe_sub_id)
285 - .bind(period_start)
286 - .bind(period_end)
287 - .execute(executor)
288 - .await?;
289 -
290 - Ok(())
291 - }
227 + // Apply a Stripe-driven status and/or period update in one guarded statement.
228 + // `canceled` is terminal — an out-of-order `updated`(active) or `invoice.paid`
229 + // landing after a `deleted` cannot revive the row (status) nor refresh its
230 + // period, because both columns are written here under the single guard. The
231 + // old split `update_subscription_status` + `update_subscription_period` (whose
232 + // period half lacked the guard) are replaced by this; reactivation only ever
233 + // happens at checkout via `create_subscription`, never through this path.
234 + crate::db::subscription_writer::define_stripe_subscription_writer!(
235 + apply_stripe_update,
236 + "subscriptions",
237 + DbSubscription
238 + );
292 239
293 240 /// Mark a subscription as canceled.
294 241 #[tracing::instrument(skip_all)]
@@ -142,57 +142,49 @@ pub async fn update_knobs(
142 142 Ok(())
143 143 }
144 144
145 - /// Set `billing_status` directly (used by webhook handlers for
146 - /// suspended_unpaid / canceled / active transitions).
145 + /// Apply a Stripe-driven billing update to a sync app: optionally change
146 + /// `billing_status` and/or refresh the current period, in ONE guarded statement.
147 + /// Returns `true` if a live (non-canceled) row was updated.
147 148 ///
148 149 /// `canceled` is terminal: the `AND (billing_status != 'canceled' OR $2 = 'canceled')`
149 - /// guard refuses to move a canceled app back to active/suspended_unpaid, so an
150 - /// out-of-order/retried `customer.subscription.updated`(active) — or a stray
151 - /// `invoice.paid` — landing after a `deleted` can't revive blob upload + key
152 - /// issuance for a non-paying developer (Run #12 SERIOUS). The legitimate
150 + /// guard refuses to move a canceled app back to active/suspended_unpaid — AND,
151 + /// because the period columns are written here under the same guard, a stray
152 + /// `invoice.paid` after a `deleted` can no longer refresh the period (nor, via
153 + /// the returned `false`, reset usage) on a canceled app. This replaces the old
154 + /// split `set_billing_status` (guarded) + `set_period` (UNGUARDED) — the synckit
155 + /// instance of the period-without-guard bug class. The legitimate
153 156 /// `suspended_unpaid -> active` recovery on `invoice.paid` is unaffected
154 - /// (suspended_unpaid != canceled). Re-activation from `draft` goes through
155 - /// `activate_billing`, not this setter. Every subscription status setter enforces
156 - /// this invariant at the data layer (see `fan_plus::update_fan_plus_status`).
157 + /// (suspended_unpaid != canceled). Initial activation from `draft` goes through
158 + /// `activate_billing`, not this path. See `crate::db::subscription_writer` for
159 + /// the cross-family rationale.
157 160 #[tracing::instrument(skip_all)]
158 - pub async fn set_billing_status<'e>(
161 + pub async fn apply_billing_update<'e>(
159 162 executor: impl sqlx::PgExecutor<'e>,
160 163 app_id: SyncAppId,
161 - status: &str,
162 - ) -> Result<()> {
163 - sqlx::query(
164 - "UPDATE sync_apps SET billing_status = $2 \
165 - WHERE id = $1 AND (billing_status != 'canceled' OR $2 = 'canceled')",
166 - )
167 - .bind(app_id)
168 - .bind(status)
169 - .execute(executor)
170 - .await?;
171 - Ok(())
172 - }
173 -
174 - /// Set the current period bounds (called by `invoice.paid` handler).
175 - #[tracing::instrument(skip_all)]
176 - pub async fn set_period<'e>(
177 - executor: impl sqlx::PgExecutor<'e>,
178 - app_id: SyncAppId,
179 - start: DateTime<Utc>,
180 - end: DateTime<Utc>,
181 - ) -> Result<()> {
182 - sqlx::query(
164 + status: Option<&str>,
165 + period: Option<(DateTime<Utc>, DateTime<Utc>)>,
166 + ) -> Result<bool> {
167 + let (period_start, period_end) = match period {
168 + Some((start, end)) => (Some(start), Some(end)),
169 + None => (None, None),
170 + };
171 + let result = sqlx::query(
183 172 r#"
184 173 UPDATE sync_apps SET
185 - current_period_start = $2,
186 - current_period_end = $3
174 + billing_status = COALESCE($2, billing_status),
175 + current_period_start = COALESCE($3, current_period_start),
176 + current_period_end = COALESCE($4, current_period_end)
187 177 WHERE id = $1
178 + AND (billing_status != 'canceled' OR $2 = 'canceled')
188 179 "#,
189 180 )
190 181 .bind(app_id)
191 - .bind(start)
192 - .bind(end)
182 + .bind(status)
183 + .bind(period_start)
184 + .bind(period_end)
193 185 .execute(executor)
194 186 .await?;
195 - Ok(())
187 + Ok(result.rows_affected() > 0)
196 188 }
197 189
198 190 /// Reset the per-period usage counters on `sync_app_usage_current`. Called
@@ -130,8 +130,12 @@ async fn exec_git_operation(
130 130 }
131 131
132 132 // Authorized — exec git-shell with a sanitized command reconstructed
133 - // from validated components (prevents argument injection via the original command)
134 - let sanitized_cmd = format!("{} '/{}/{}.git'", operation.command(), owner, repo_name);
133 + // from validated components (prevents argument injection via the original
134 + // command). Use `owner_username` (the `Username`-validated value), not the
135 + // raw `owner` &str: `Username::new` preserves the string but constrains the
136 + // charset, so the value flowing into the `git-shell -c` argument stays
137 + // load-bearing on the validated type even if `parse_repo_path` ever loosens.
138 + let sanitized_cmd = format!("{} '/{}/{}.git'", operation.command(), owner_username.as_ref(), repo_name);
135 139 let err = exec_git_shell(&sanitized_cmd);
136 140 anyhow::bail!("failed to exec git-shell: {}", err);
137 141 }
@@ -158,7 +158,7 @@ pub fn build_app(
158 158 .merge(storage_routes())
159 159 .merge(stripe_routes())
160 160 .merge(admin_routes())
161 - .merge(synckit_routes())
161 + .merge(synckit_routes(state.config.synckit_jwt_secret.clone().map(std::sync::Arc::new)))
162 162 .merge(oauth_routes())
163 163 .merge(postmark_routes())
164 164 .merge(git_issue_routes())
@@ -45,15 +45,72 @@ impl KeyExtractor for CloudflareIpKeyExtractor {
45 45 }
46 46 }
47 47
48 - /// Per-SyncKit-app key extractor. Decodes the JWT payload from the
49 - /// `Authorization: Bearer <token>` header to extract the `app` claim.
48 + /// Per-SyncKit-app key extractor. Reads the `app` claim from the
49 + /// `Authorization: Bearer <token>` header to bucket requests per app.
50 50 ///
51 - /// This does NOT verify the JWT signature — that's the handler's job via
52 - /// `SyncUser`. The rate limiter only needs a consistent key to bucket
53 - /// requests. A forged JWT gets a rate-limit bucket but is rejected by the
54 - /// handler; the IP-based layer provides defense-in-depth against abuse.
55 - #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56 - pub struct SyncAppKeyExtractor;
51 + /// The signature IS verified here (HS256, against the SyncKit JWT secret).
52 + /// Verification matters for rate limiting specifically: without it, an attacker
53 + /// can mint unsigned tokens carrying arbitrary `app` UUIDs and spray a fresh
54 + /// claim per request, landing each in its own fresh bucket and defeating the
55 + /// per-app limiter entirely. By verifying, only validly-signed tokens earn
56 + /// their real per-app bucket; everything else (missing/forged/malformed token)
57 + /// collapses into ONE shared sentinel bucket, so a spray cannot manufacture
58 + /// unlimited buckets. Auth itself is still enforced downstream by `SyncUser`,
59 + /// and an IP-layer limiter backstops volume regardless.
60 + ///
61 + /// Expiry/issuer are intentionally NOT enforced here — an expired-but-genuine
62 + /// token should still bucket by its real app (the handler will reject it). We
63 + /// only need proof the `app` claim wasn't fabricated.
64 + #[derive(Debug, Clone)]
65 + pub struct SyncAppKeyExtractor {
66 + /// SyncKit JWT signing secret. `None` when SyncKit isn't configured — in
67 + /// that case there is no auth and these routes are inert, so we fall back
68 + /// to the unverified payload parse to preserve prior behavior.
69 + secret: Option<std::sync::Arc<String>>,
70 + }
71 +
72 + impl SyncAppKeyExtractor {
73 + pub fn new(secret: Option<std::sync::Arc<String>>) -> Self {
74 + Self { secret }
75 + }
76 +
77 + /// Verify the HS256 signature and pull out the `app` claim. Returns `None`
78 + /// on any failure (bad signature, malformed token, missing claim).
79 + fn verify_app(secret: &str, token: &str) -> Option<SyncAppId> {
80 + use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
81 +
82 + #[derive(serde::Deserialize)]
83 + struct AppClaim {
84 + app: SyncAppId,
85 + }
86 +
87 + let mut validation = Validation::new(Algorithm::HS256);
88 + // Bucket genuine-but-expired tokens by their real app; only the
89 + // signature must hold. Clearing required claims + disabling exp means
90 + // the decode succeeds purely on a valid signature carrying `app`.
91 + validation.validate_exp = false;
92 + validation.required_spec_claims.clear();
93 +
94 + decode::<AppClaim>(token, &DecodingKey::from_secret(secret.as_bytes()), &validation)
95 + .ok()
96 + .map(|data| data.claims.app)
97 + }
98 +
99 + /// Unverified payload parse — only used when no secret is configured.
100 + fn parse_app_unverified(token: &str) -> Option<SyncAppId> {
101 + let payload_b64 = token.split('.').nth(1)?;
102 + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
103 + .decode(payload_b64)
104 + .or_else(|_| base64::engine::general_purpose::STANDARD.decode(payload_b64))
105 + .ok()?;
106 +
107 + #[derive(serde::Deserialize)]
108 + struct AppClaim {
109 + app: SyncAppId,
110 + }
111 + serde_json::from_slice::<AppClaim>(&payload_bytes).ok().map(|c| c.app)
112 + }
113 + }
57 114
58 115 impl KeyExtractor for SyncAppKeyExtractor {
59 116 type Key = SyncAppId;
@@ -71,27 +128,14 @@ impl KeyExtractor for SyncAppKeyExtractor {
71 128 None => return Ok(SyncAppId::nil()),
72 129 };
73 130
74 - // JWT is header.payload.signature — decode the payload (middle segment)
75 - // without verifying the signature. We only need the `app` field.
76 - let payload_b64 = token
77 - .split('.')
78 - .nth(1)
79 - .ok_or(GovernorError::UnableToExtractKey)?;
80 -
81 - let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
82 - .decode(payload_b64)
83 - .or_else(|_| base64::engine::general_purpose::STANDARD.decode(payload_b64))
84 - .map_err(|_| GovernorError::UnableToExtractKey)?;
85 -
86 - #[derive(serde::Deserialize)]
87 - struct AppClaim {
88 - app: SyncAppId,
89 - }
90 -
91 - let claims: AppClaim =
92 - serde_json::from_slice(&payload_bytes).map_err(|_| GovernorError::UnableToExtractKey)?;
131 + let app = match &self.secret {
132 + Some(secret) => Self::verify_app(secret, token),
133 + None => Self::parse_app_unverified(token),
134 + };
93 135
94 - Ok(claims.app)
136 + // A verified app gets its own bucket; anything that fails verification
137 + // collapses to the nil sentinel so forged tokens can't spray buckets.
138 + Ok(app.unwrap_or_else(SyncAppId::nil))
95 139 }
96 140 }
97 141
@@ -140,7 +184,11 @@ pub fn rate_limiter_per_sec(
140 184 }
141 185
142 186 /// Build a per-SyncKit-app rate limiter from a per-millisecond interval and burst size.
187 + ///
188 + /// `secret` is the SyncKit JWT signing secret; when present the extractor
189 + /// verifies token signatures (so forged `app` claims can't mint fresh buckets).
143 190 pub fn synckit_app_rate_limiter_ms(
191 + secret: Option<std::sync::Arc<String>>,
144 192 ms: u64,
145 193 burst: u32,
146 194 ) -> std::sync::Arc<
@@ -151,7 +199,7 @@ pub fn synckit_app_rate_limiter_ms(
151 199 > {
152 200 std::sync::Arc::new(
153 201 tower_governor::governor::GovernorConfigBuilder::default()
154 - .key_extractor(SyncAppKeyExtractor)
202 + .key_extractor(SyncAppKeyExtractor::new(secret))
155 203 .per_millisecond(ms)
156 204 .burst_size(burst)
157 205 .use_headers()
@@ -183,8 +231,27 @@ mod tests {
183 231 format!("{header}.{payload}.{sig}")
184 232 }
185 233
234 + /// Sign a real HS256 token carrying the given app id, using `secret`.
235 + fn signed_jwt(secret: &str, app_id: &SyncAppId) -> String {
236 + use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
237 + let claims = serde_json::json!({
238 + "sub": "00000000-0000-0000-0000-000000000001",
239 + "app": app_id,
240 + "iss": "makenotwork-synckit",
241 + "exp": 9999999999_i64,
242 + "iat": 1000000000_i64,
243 + });
244 + encode(
245 + &Header::new(Algorithm::HS256),
246 + &claims,
247 + &EncodingKey::from_secret(secret.as_bytes()),
248 + )
249 + .unwrap()
250 + }
251 +
186 252 #[test]
187 - fn extracts_app_id_from_jwt() {
253 + fn unverified_extracts_app_id_from_jwt_when_no_secret() {
254 + // No secret configured (SyncKit off) → fall back to unverified parse.
188 255 let app_id = SyncAppId::new();
189 256 let token = fake_jwt(&app_id);
190 257
@@ -193,14 +260,64 @@ mod tests {
193 260 .body(())
194 261 .unwrap();
195 262
196 - let extracted = SyncAppKeyExtractor.extract(&req).unwrap();
263 + let extracted = SyncAppKeyExtractor::new(None).extract(&req).unwrap();
197 264 assert_eq!(extracted, app_id);
198 265 }
199 266
200 267 #[test]
268 + fn verified_extracts_app_id_from_validly_signed_jwt() {
269 + let secret = "test-secret-key-for-synckit-jwt".to_string();
270 + let app_id = SyncAppId::new();
271 + let token = signed_jwt(&secret, &app_id);
272 +
273 + let req = Request::builder()
274 + .header("authorization", format!("Bearer {token}"))
275 + .body(())
276 + .unwrap();
277 +
278 + let extractor = SyncAppKeyExtractor::new(Some(std::sync::Arc::new(secret)));
279 + assert_eq!(extractor.extract(&req).unwrap(), app_id);
280 + }
281 +
282 + #[test]
283 + fn verified_forged_token_collapses_to_nil_bucket() {
284 + // A token whose `app` claim is attacker-chosen but whose signature does
285 + // NOT match the configured secret must NOT earn its own bucket — it
286 + // collapses to the nil sentinel so a spray of fresh `app` claims can't
287 + // manufacture unlimited buckets.
288 + let secret = "test-secret-key-for-synckit-jwt".to_string();
289 + let attacker_app = SyncAppId::new();
290 + let forged = fake_jwt(&attacker_app); // signed with "fakesig", not `secret`
291 +
292 + let req = Request::builder()
293 + .header("authorization", format!("Bearer {forged}"))
294 + .body(())
295 + .unwrap();
296 +
297 + let extractor = SyncAppKeyExtractor::new(Some(std::sync::Arc::new(secret)));
298 + assert_eq!(extractor.extract(&req).unwrap(), SyncAppId::nil());
299 + }
300 +
301 + #[test]
302 + fn verified_spray_of_forged_apps_all_share_one_bucket() {
303 + // The core anti-spray property: many DISTINCT forged app claims must all
304 + // map to the SAME (nil) key, not N distinct keys.
305 + let secret = std::sync::Arc::new("test-secret-key-for-synckit-jwt".to_string());
306 + let extractor = SyncAppKeyExtractor::new(Some(secret));
307 + for _ in 0..5 {
308 + let forged = fake_jwt(&SyncAppId::new());
309 + let req = Request::builder()
310 + .header("authorization", format!("Bearer {forged}"))
311 + .body(())
312 + .unwrap();
313 + assert_eq!(extractor.extract(&req).unwrap(), SyncAppId::nil());
314 + }
315 + }
316 +
317 + #[test]
201 318 fn missing_auth_header_returns_nil_sentinel() {
202 319 let req = Request::builder().body(()).unwrap();
203 - let key = SyncAppKeyExtractor.extract(&req).unwrap();
320 + let key = SyncAppKeyExtractor::new(None).extract(&req).unwrap();
204 321 assert_eq!(key, SyncAppId::nil());
205 322 }
206 323
@@ -210,21 +327,25 @@ mod tests {
210 327 .header("authorization", "Basic dXNlcjpwYXNz")
211 328 .body(())
212 329 .unwrap();
213 - let key = SyncAppKeyExtractor.extract(&req).unwrap();
330 + let key = SyncAppKeyExtractor::new(None).extract(&req).unwrap();
214 331 assert_eq!(key, SyncAppId::nil());
215 332 }
216 333
217 334 #[test]
218 - fn malformed_jwt_returns_error() {
335 + fn malformed_jwt_collapses_to_nil_sentinel() {
336 + // Previously this returned an extractor error; now a malformed token
337 + // collapses to the shared nil bucket (same anti-spray treatment as a
338 + // forged one) rather than erroring the request out of the limiter.
219 339 let req = Request::builder()
220 340 .header("authorization", "Bearer not-a-jwt")
221 341 .body(())
222 342 .unwrap();
223 - assert!(SyncAppKeyExtractor.extract(&req).is_err());
343 + let key = SyncAppKeyExtractor::new(None).extract(&req).unwrap();
344 + assert_eq!(key, SyncAppId::nil());
224 345 }
225 346
226 347 #[test]
227 - fn jwt_missing_app_claim_returns_error() {
348 + fn jwt_missing_app_claim_collapses_to_nil_sentinel() {
228 349 let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
229 350 .encode(r#"{"alg":"HS256"}"#);
230 351 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD
@@ -236,7 +357,8 @@ mod tests {
236 357 .header("authorization", format!("Bearer {token}"))
237 358 .body(())
238 359 .unwrap();
239 - assert!(SyncAppKeyExtractor.extract(&req).is_err());
360 + let key = SyncAppKeyExtractor::new(None).extract(&req).unwrap();
361 + assert_eq!(key, SyncAppId::nil());
240 362 }
241 363
242 364 #[test]
@@ -282,8 +404,9 @@ mod tests {
282 404 .body(())
283 405 .unwrap();
284 406
285 - let key1 = SyncAppKeyExtractor.extract(&req1).unwrap();
286 - let key2 = SyncAppKeyExtractor.extract(&req2).unwrap();
407 + let extractor = SyncAppKeyExtractor::new(None);
408 + let key1 = extractor.extract(&req1).unwrap();
409 + let key2 = extractor.extract(&req2).unwrap();
287 410 assert_ne!(key1, key2);
288 411 }
289 412 }
@@ -146,7 +146,13 @@ pub(in crate::routes::api) async fn update_password(
146 146 )).await.ok();
147 147 }
148 148
149 - // Hash and update
149 + // Hash and update. NOTE: `update_user_password` bumps `users.jwt_invalidated_at`
150 + // in the SAME UPDATE as the password hash, which is what revokes outstanding
151 + // SyncKit/OAuth bearer tokens (the `SyncUser` extractor rejects any JWT whose
152 + // `iat <= jwt_invalidated_at`). The session sweep below only clears web session
153 + // ROWS — it is deliberately NOT the JWT-revocation mechanism. Do not "fix" this
154 + // by swapping in `delete_all_sessions_for_user`: that would also log the user
155 + // out of their current web session, and the JWT bump already happened here.
150 156 let new_hash = crate::auth::hash_password(&req.new_password)?;
151 157 db::users::update_user_password(&state.db, user.id, &new_hash).await?;
152 158
@@ -199,31 +199,23 @@ pub(super) async fn confirm_upload(
199 199 }
200 200 };
201 201
202 - // Reject file types that have their own dedicated confirm routes BEFORE
203 - // any scan enqueue or scan_status flip — otherwise a misrouted-but-valid
204 - // item_id flips that item's scan_status to Pending, blocks every fan's
205 - // download, and leaks a scan_jobs row for an S3 key we're about to delete.
206 - match file_type {
207 - FileType::Audio | FileType::Cover | FileType::Video => {}
208 - FileType::Download => {
202 + // Resolve, from the single exhaustive declaration on `FileType`, how this
203 + // type is confirmed on an `items` row — and reject types that belong to a
204 + // dedicated route BEFORE any scan enqueue or scan_status flip. (A misrouted
205 + // but valid item_id would otherwise flip scan_status to Pending, block every
206 + // fan's download, and leak a scan_jobs row for an S3 key we're about to
207 + // delete.) Cover is rejected here now: it also needs `cover_image_url`,
208 + // which only /api/items/image/confirm writes — the old generic two-column
209 + // path left it NULL and rendered an invisible cover (Run #13 SERIOUS).
210 + let (s3_col, size_col) = match file_type.generic_item_confirm() {
211 + crate::storage::GenericItemConfirm::Columns { s3_key, size } => (s3_key, size),
212 + crate::storage::GenericItemConfirm::UseRoute(route) => {
209 213 s3.delete_object(&req.s3_key).await.ok();
210 - return Err(AppError::BadRequest(
211 - "Use /api/versions/{version_id}/upload/* routes for download files".to_string(),
212 - ));
213 - }
214 - FileType::Insertion => {
215 - s3.delete_object(&req.s3_key).await.ok();
216 - return Err(AppError::BadRequest(
217 - "Use /api/users/me/insertions/* routes for insertion clips".to_string(),
218 - ));
219 - }
220 - FileType::MediaImage | FileType::MediaVideo => {
221 - s3.delete_object(&req.s3_key).await.ok();
222 - return Err(AppError::BadRequest(
223 - "Use /api/media/* routes for media library uploads".to_string(),
224 - ));
214 + return Err(AppError::BadRequest(format!(
215 + "This file type isn't confirmed here — use {route}."
216 + )));
225 217 }
226 - }
218 + };
227 219
228 220 // Idempotency + replace detection. We read the item ONCE here and reuse its
229 221 // project_id for the cache bump below (previously a second get_item_by_id).
@@ -260,15 +252,6 @@ pub(super) async fn confirm_upload(
260 252 }
261 253 let is_replace = old_s3_key.is_some();
262 254
263 - let (s3_col, size_col) = match file_type {
264 - FileType::Audio => ("audio_s3_key", "audio_file_size_bytes"),
265 - FileType::Cover => ("cover_s3_key", "cover_file_size_bytes"),
266 - FileType::Video => ("video_s3_key", "video_file_size_bytes"),
267 - // Already rejected above — defensive error instead of panic
268 - FileType::Download | FileType::Insertion | FileType::MediaImage | FileType::MediaVideo => {
269 - return Err(AppError::BadRequest(format!("File type {:?} not supported for item uploads", file_type)));
270 - }
271 - };
272 255 let update_sql = format!(
273 256 "UPDATE items SET {} = $2, {} = $3, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $4)",
274 257 s3_col, size_col,
@@ -55,9 +55,13 @@ pub(super) async fn handle_invoice_payment_succeeded(
55 55 let period_start = stripe_timestamp(invoice.period_start);
56 56 let period_end = stripe_timestamp(invoice.period_end);
57 57 let mut tx = state.db.begin().await.context("begin synckit invoice.paid transaction")?;
58 - db::synckit_billing::set_billing_status(&mut *tx, app_id, "active").await.context("synckit billing -> active")?;
59 - db::synckit_billing::set_period(&mut *tx, app_id, period_start, period_end).await.context("synckit set_period")?;
60 - db::synckit_billing::reset_period_usage(&mut *tx, app_id).await.context("synckit reset_period_usage")?;
58 + // One guarded write for status + period; only reset usage if the app was
59 + // live (a canceled app is refused, so a stray invoice.paid can't refresh
60 + // period or usage on it).
61 + let applied = db::synckit_billing::apply_billing_update(&mut *tx, app_id, Some("active"), Some((period_start, period_end))).await.context("synckit apply_billing_update")?;
62 + if applied {
63 + db::synckit_billing::reset_period_usage(&mut *tx, app_id).await.context("synckit reset_period_usage")?;
64 + }
61 65 tx.commit().await.context("commit synckit invoice.paid")?;
62 66 if let Err(e) = db::subscriptions::log_subscription_event(
63 67 &state.db, None, event_id, "invoice.payment_succeeded.synckit",
@@ -70,10 +74,10 @@ pub(super) async fn handle_invoice_payment_succeeded(
70 74
71 75 // Check if this is a Fan+ subscription
72 76 if let Some(fan_sub) = db::fan_plus::get_fan_plus_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch fan+ by stripe id")? {
73 - // Update period
77 + // Refresh period (guarded: a canceled Fan+ sub is left untouched).
74 78 let period_start = stripe_timestamp(invoice.period_start);
75 79 let period_end = stripe_timestamp(invoice.period_end);
76 - db::fan_plus::update_fan_plus_period(&state.db, &stripe_sub_id, period_start, period_end).await.context("update fan+ period")?;
80 + db::fan_plus::apply_stripe_update(&state.db, &stripe_sub_id, None, Some((period_start, period_end))).await.context("refresh fan+ period")?;
77 81
78 82 // On renewal, generate a $5 platform-wide promo code and email it
79 83 if is_renewal {
@@ -152,7 +156,7 @@ pub(super) async fn handle_invoice_payment_succeeded(
152 156 if let Some(_ct_sub) = db::creator_tiers::get_creator_sub_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch creator sub by stripe id")? {
153 157 let period_start = stripe_timestamp(invoice.period_start);
154 158 let period_end = stripe_timestamp(invoice.period_end);
155 - db::creator_tiers::update_creator_sub_period(&state.db, &stripe_sub_id, period_start, period_end).await.context("update creator sub period")?;
159 + db::creator_tiers::apply_stripe_update(&state.db, &stripe_sub_id, None, Some((period_start, period_end))).await.context("refresh creator sub period")?;
156 160
157 161 if let Err(e) = db::subscriptions::log_subscription_event(
158 162 &state.db, None, event_id, "invoice.payment_succeeded.creator_tier",
@@ -163,10 +167,10 @@ pub(super) async fn handle_invoice_payment_succeeded(
163 167 return Ok(());
164 168 }
165 169
166 - // Update period for creator subscriptions
170 + // Refresh period for fan subscriptions (guarded: canceled rows untouched).
167 171 let period_start = stripe_timestamp(invoice.period_start);
168 172 let period_end = stripe_timestamp(invoice.period_end);
169 - db::subscriptions::update_subscription_period(&state.db, &stripe_sub_id, period_start, period_end).await.context("update subscription period")?;
173 + db::subscriptions::apply_stripe_update(&state.db, &stripe_sub_id, None, Some((period_start, period_end))).await.context("refresh subscription period")?;
170 174
171 175 // Send renewal email only for renewals (not the first invoice)
172 176 let db_sub = db::subscriptions::get_subscription_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch subscription by stripe id")?;
@@ -219,7 +223,7 @@ pub(super) async fn handle_invoice_payment_failed(
219 223
220 224 // SyncKit v2 developer subscription? Mark suspended_unpaid.
221 225 if let Some(app_id) = db::synckit_billing::get_app_by_stripe_subscription(&state.db, &stripe_sub_id).await.context("fetch synckit app by stripe sub id")? {
222 - db::synckit_billing::set_billing_status(&state.db, app_id, "suspended_unpaid").await.context("synckit billing -> suspended_unpaid")?;
226 + db::synckit_billing::apply_billing_update(&state.db, app_id, Some("suspended_unpaid"), None).await.context("synckit billing -> suspended_unpaid")?;
223 227 if let Err(e) = db::subscriptions::log_subscription_event(
224 228 &state.db, None, event_id, "invoice.payment_failed.synckit",
225 229 &serde_json::json!({"stripe_sub_id": stripe_sub_id, "synckit_app_id": app_id.to_string()}),
@@ -235,7 +239,7 @@ pub(super) async fn handle_invoice_payment_failed(
235 239
236 240 // Check if this is a Fan+ subscription
237 241 if let Some(_fan_sub) = db::fan_plus::get_fan_plus_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch fan+ by stripe id")? {
238 - db::fan_plus::update_fan_plus_status(&state.db, &stripe_sub_id, SubscriptionStatus::PastDue).await.context("update fan+ status to past_due")?;
242 + db::fan_plus::apply_stripe_update(&state.db, &stripe_sub_id, Some(SubscriptionStatus::PastDue), None).await.context("fan+ status -> past_due")?;
239 243
240 244 if let Err(e) = db::subscriptions::log_subscription_event(
241 245 &state.db, None, event_id, "invoice.payment_failed.fan_plus",
@@ -248,7 +252,7 @@ pub(super) async fn handle_invoice_payment_failed(
248 252
249 253 // Check if this is a creator tier subscription
250 254 if let Some(ct_sub) = db::creator_tiers::get_creator_sub_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch creator sub by stripe id")? {
251 - db::creator_tiers::update_creator_sub_status(&state.db, &stripe_sub_id, SubscriptionStatus::PastDue).await.context("update creator sub status to past_due")?;
255 + db::creator_tiers::apply_stripe_update(&state.db, &stripe_sub_id, Some(SubscriptionStatus::PastDue), None).await.context("creator sub status -> past_due")?;
252 256 db::creator_tiers::sync_user_creator_tier(&state.db, ct_sub.user_id).await.context("sync user creator tier")?;
253 257
254 258 if let Err(e) = db::subscriptions::log_subscription_event(
@@ -260,7 +264,7 @@ pub(super) async fn handle_invoice_payment_failed(
260 264 return Ok(());
261 265 }
262 266
263 - let updated = db::subscriptions::update_subscription_status(&state.db, &stripe_sub_id, SubscriptionStatus::PastDue).await.context("update subscription status to past_due")?;
267 + let updated = db::subscriptions::apply_stripe_update(&state.db, &stripe_sub_id, Some(SubscriptionStatus::PastDue), None).await.context("subscription status -> past_due")?;
264 268
265 269 // Log event
266 270 let sub_id = updated.as_ref().map(|s| s.id);
@@ -47,7 +47,7 @@ pub(super) async fn handle_subscription_updated(
47 47 _ => None,
48 48 };
49 49 if let Some(s) = new_status {
50 - db::synckit_billing::set_billing_status(&state.db, app_id, s).await.context("synckit set_billing_status")?;
50 + db::synckit_billing::apply_billing_update(&state.db, app_id, Some(s), None).await.context("synckit apply_billing_update")?;
51 51 }
52 52 if let Err(e) = db::subscriptions::log_subscription_event(
53 53 &state.db, None, event_id, "customer.subscription.updated.synckit",
@@ -87,12 +87,12 @@ pub(super) async fn handle_subscription_updated(
87 87 if let Some(_fan_sub) = db::fan_plus::get_fan_plus_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch fan plus by stripe id")? {
88 88 let status_str = sub.status.as_str();
89 89 let Some(status) = parse_status_or_log(status_str, event_id, &stripe_sub_id) else { return Ok(()); };
90 - db::fan_plus::update_fan_plus_status(&state.db, &stripe_sub_id, status).await.context("update fan plus status")?;
91 90
91 + // Status + period in one guarded write: a canceled Fan+ sub is neither
92 + // revived nor period-refreshed by an out-of-order update.
92 93 let (start_ts, end_ts) = sub.current_period().unwrap_or((0, 0));
93 - let period_start = stripe_timestamp(start_ts);
94 - let period_end = stripe_timestamp(end_ts);
95 - db::fan_plus::update_fan_plus_period(&state.db, &stripe_sub_id, period_start, period_end).await.context("update fan plus period")?;
94 + let period = Some((stripe_timestamp(start_ts), stripe_timestamp(end_ts)));
95 + db::fan_plus::apply_stripe_update(&state.db, &stripe_sub_id, Some(status), period).await.context("apply fan plus update")?;
96 96
97 97 // Keep the dashboard flag in sync with Stripe — covers cancellation
98 98 // initiated via the customer portal as well as our dashboard route.
@@ -113,12 +113,11 @@ pub(super) async fn handle_subscription_updated(
113 113 if let Some(ct_sub) = db::creator_tiers::get_creator_sub_by_stripe_id(&state.db, &stripe_sub_id).await.context("fetch creator sub by stripe id")? {
114 114 let status_str = sub.status.as_str();
115 115 let Some(status) = parse_status_or_log(status_str, event_id, &stripe_sub_id) else { return Ok(()); };
116 - db::creator_tiers::update_creator_sub_status(&state.db, &stripe_sub_id, status).await.context("update creator sub status")?;
117 116
117 + // Status + period in one guarded write (canceled is terminal for both).
118 118 let (start_ts, end_ts) = sub.current_period().unwrap_or((0, 0));
119 - let period_start = stripe_timestamp(start_ts);
120 - let period_end = stripe_timestamp(end_ts);
121 - db::creator_tiers::update_creator_sub_period(&state.db, &stripe_sub_id, period_start, period_end).await.context("update creator sub period")?;
119 + let period = Some((stripe_timestamp(start_ts), stripe_timestamp(end_ts)));
120 + db::creator_tiers::apply_stripe_update(&state.db, &stripe_sub_id, Some(status), period).await.context("apply creator sub update")?;
122 121
123 122 // Sync the denormalized creator_tier column on users
124 123 db::creator_tiers::sync_user_creator_tier(&state.db, ct_sub.user_id).await.context("sync user creator tier")?;
@@ -135,15 +134,11 @@ pub(super) async fn handle_subscription_updated(
135 134 let status_str = sub.status.as_str();
136 135 let Some(status) = parse_status_or_log(status_str, event_id, &stripe_sub_id) else { return Ok(()); };
137 136
138 - // Update status + period atomically in a single transaction
139 - let mut tx = state.db.begin().await.context("begin subscription update transaction")?;
140 - let updated = db::subscriptions::update_subscription_status(&mut *tx, &stripe_sub_id, status).await.context("update subscription status")?;
141 -
137 + // Status + period in one guarded statement — `canceled` is terminal for both,
138 + // so a late `updated`(active) can neither revive the row nor refresh its period.
142 139 let (start_ts, end_ts) = sub.current_period().unwrap_or((0, 0));
143 - let period_start = stripe_timestamp(start_ts);
144 - let period_end = stripe_timestamp(end_ts);
145 - db::subscriptions::update_subscription_period(&mut *tx, &stripe_sub_id, period_start, period_end).await.context("update subscription period")?;
146 - tx.commit().await.context("commit subscription update")?;
140 + let period = Some((stripe_timestamp(start_ts), stripe_timestamp(end_ts)));
141 + let updated = db::subscriptions::apply_stripe_update(&state.db, &stripe_sub_id, Some(status), period).await.context("apply subscription update")?;
147 142
148 143 // Log event
149 144 let sub_id = updated.as_ref().map(|s| s.id);
@@ -168,7 +163,7 @@ pub(super) async fn handle_subscription_deleted(
168 163
169 164 // SyncKit v2 developer subscription? Flip to 'canceled'.
170 165 if let Some(app_id) = db::synckit_billing::get_app_by_stripe_subscription(&state.db, &stripe_sub_id).await.context("fetch synckit app by stripe sub id")? {
171 - db::synckit_billing::set_billing_status(&state.db, app_id, "canceled").await.context("synckit billing -> canceled")?;
166 + db::synckit_billing::apply_billing_update(&state.db, app_id, Some("canceled"), None).await.context("synckit billing -> canceled")?;
172 167 if let Err(e) = db::subscriptions::log_subscription_event(
173 168 &state.db, None, event_id, "customer.subscription.deleted.synckit",
174 169 &serde_json::json!({"stripe_sub_id": stripe_sub_id, "synckit_app_id": app_id.to_string()}),
@@ -252,7 +252,7 @@ pub(super) async fn cancel(
252 252 stripe.cancel_synckit_subscription(sub_id).await?;
253 253 }
254 254
255 - db::synckit_billing::set_billing_status(&state.db, app_id, "canceled").await?;
255 + db::synckit_billing::apply_billing_update(&state.db, app_id, Some("canceled"), None).await?;
256 256
257 257 Ok(axum::http::StatusCode::NO_CONTENT)
258 258 }
@@ -530,7 +530,11 @@ pub(super) fn generate_api_key() -> String {
530 530 /// - **App management routes** (`/api/sync/apps/...`): Session-based auth
531 531 /// via `AuthUser` extractor (accessed from the MNW dashboard), no extra
532 532 /// rate limit beyond the global middleware.
533 - pub fn synckit_routes() -> CsrfRouter<AppState> {
533 + ///
534 + /// `synckit_jwt_secret` is threaded in (rather than read from a global) so the
535 + /// per-app rate limiter's key extractor can verify token signatures; see
536 + /// [`crate::rate_limit::SyncAppKeyExtractor`].
537 + pub fn synckit_routes(synckit_jwt_secret: Option<std::sync::Arc<String>>) -> CsrfRouter<AppState> {
534 538 let auth_rate_limit = crate::helpers::rate_limiter_per_sec(constants::SYNCKIT_AUTH_RATE_LIMIT_PER_SEC, constants::SYNCKIT_AUTH_RATE_LIMIT_BURST);
535 539
536 540 let auth_routes = CsrfRouter::new()
@@ -552,7 +556,7 @@ pub fn synckit_routes() -> CsrfRouter<AppState> {
552 556 });
553 557
554 558 let sync_ip_rate_limit = crate::helpers::rate_limiter_ms(constants::SYNCKIT_SYNC_RATE_LIMIT_MS, constants::SYNCKIT_SYNC_RATE_LIMIT_BURST);
555 - let sync_app_rate_limit = crate::helpers::synckit_app_rate_limiter_ms(constants::SYNCKIT_APP_RATE_LIMIT_MS, constants::SYNCKIT_APP_RATE_LIMIT_BURST);
559 + let sync_app_rate_limit = crate::helpers::synckit_app_rate_limiter_ms(synckit_jwt_secret, constants::SYNCKIT_APP_RATE_LIMIT_MS, constants::SYNCKIT_APP_RATE_LIMIT_BURST);
556 560
557 561 let sync_routes = CsrfRouter::new()
558 562 .route("/api/sync/push", post_csrf_skip(SYNCKIT_JWT_SKIP, sync::sync_push))