Skip to main content

max / makenotwork

SyncKit v2 billing: open-platform developer billing Replace per-end-user app_sync subscriptions with a developer-pays-MNW model. Each SyncKit app's developer is billed directly via two knobs (storage_gb_cap and egress_multiple, optional key_cap in per_key mode); price is computed in src/synckit_billing.rs at 2x Hetzner Object Storage marginal cost with a pro-rated $5 floor. The end-user no longer pays anything for sync. First-party apps (GO/BB/AF) keep working via an is_internal flag that bypasses billing entirely. Phase 1: migration 117 adds the billing columns to sync_apps plus the sync_app_keys and sync_app_usage_current tables; drops app_sync_subscriptions. Phase 2-3: rip out the old end-user billing (AppSyncTier, AppSyncCheckoutParams, routes/synckit/subscription.rs) and add developer-facing routes setup/ activate/patch/cancel/get on /api/sync/apps/{id}/billing, plus a Stripe billing portal link. Webhook dispatch in routes/stripe/webhook/billing.rs routes the new subscription class by DB lookup on stripe_subscription_id. Phase 4: server-to-server SDK key claim/release/list with per_key cap enforcement returning 402 key_limit_reached. Phase 5: storage + egress byte counters on blob confirm/download presign with 402 enforcement, hourly warning email job at 75/90/100% thresholds (scheduler/synckit_warnings.rs), and a weekly storage drift recalc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-21 21:49 UTC
Commit: a913484881d75fb0c79a554817f9aeaa5164c8bd
Parent: 1c3cd28
32 files changed, +2401 insertions, -602 deletions
@@ -126,7 +126,7 @@ async-stripe-shared = { version = "1.0.0-rc.5", features = ["deserialize"] }
126 126 async-stripe-billing = { version = "1.0.0-rc.5", features = ["deserialize", "subscription", "billing_portal_session"] }
127 127 async-stripe-checkout = { version = "1.0.0-rc.5", features = ["deserialize", "checkout_session"] }
128 128 async-stripe-connect = { version = "1.0.0-rc.5", features = ["deserialize", "account", "account_link"] }
129 - async-stripe-core = { version = "1.0.0-rc.5", features = ["deserialize", "balance", "refund"] }
129 + async-stripe-core = { version = "1.0.0-rc.5", features = ["deserialize", "balance", "refund", "customer"] }
130 130 async-stripe-payment = { version = "1.0.0-rc.5", features = ["deserialize"] }
131 131 async-stripe-product = { version = "1.0.0-rc.5", features = ["deserialize", "product", "price"] }
132 132 async-stripe-types = { version = "1.0.0-rc.5", features = ["deserialize"] }
@@ -0,0 +1,122 @@
1 + -- SyncKit v2 billing: open-platform model.
2 + --
3 + -- Replaces the old per-end-user subscription system (`app_sync_subscriptions`)
4 + -- with a developer-pays-MNW model. The developer of a SyncKit app is the
5 + -- existing `sync_apps.creator_id`. MNW charges the developer at the start of
6 + -- each period using a parameterized two-knob pricing model:
7 + --
8 + -- monthly_price = max($5 base, storage_gb × $0.013
9 + -- + storage_gb × egress_multiple × $0.0022
10 + -- + key_cap × $0.02 (per_key mode only))
11 + --
12 + -- Knobs the developer sets:
13 + -- - storage_gb_cap — total GB of blob storage available to the app
14 + -- - egress_multiple — monthly egress quota = storage_gb_cap × this
15 + -- - key_cap — only meaningful in per_key enforcement mode
16 + --
17 + -- The base fee is pro-rated: developers always pay at least $5/mo, but usage
18 + -- fees count against it. Once usage exceeds $5, the floor disappears.
19 + --
20 + -- Enforcement modes (orthogonal to pricing):
21 + -- - per_key — when key_cap reached, new key claims refused; existing keys
22 + -- keep working until storage/egress caps hit.
23 + -- - app_wide — any cap hit returns quota_exceeded for the whole app.
24 + --
25 + -- "Key" is a developer-defined opaque unit. The dev's backend calls
26 + -- claim_key/release_key as their app's logic dictates (user, workspace,
27 + -- server, channel, whatever).
28 + --
29 + -- First-party apps (GO/BB/AF) are marked `is_internal` and bypass billing
30 + -- entirely. Their end-user billing (formerly app_sync_subscriptions) is
31 + -- removed — those apps will grow their own subscription code outside SyncKit.
32 +
33 + -- ── sync_apps: billing columns ──
34 + ALTER TABLE sync_apps ADD COLUMN is_internal BOOLEAN NOT NULL DEFAULT FALSE;
35 + ALTER TABLE sync_apps ADD COLUMN stripe_customer_id TEXT;
36 + ALTER TABLE sync_apps ADD COLUMN stripe_subscription_id TEXT UNIQUE;
37 + ALTER TABLE sync_apps ADD COLUMN billing_status TEXT NOT NULL DEFAULT 'draft'
38 + CHECK (billing_status IN ('draft', 'active', 'suspended_unpaid', 'canceled'));
39 +
40 + -- Pricing knobs. NULL while in draft status; required once active (unless internal).
41 + ALTER TABLE sync_apps ADD COLUMN storage_gb_cap INT
42 + CHECK (storage_gb_cap IS NULL OR storage_gb_cap > 0);
43 + ALTER TABLE sync_apps ADD COLUMN egress_multiple NUMERIC(6, 2)
44 + CHECK (egress_multiple IS NULL OR egress_multiple > 0);
45 +
46 + -- Enforcement mode and the optional key cap that goes with per_key mode.
47 + ALTER TABLE sync_apps ADD COLUMN enforcement_mode TEXT NOT NULL DEFAULT 'app_wide'
48 + CHECK (enforcement_mode IN ('per_key', 'app_wide'));
49 + ALTER TABLE sync_apps ADD COLUMN key_cap INT
50 + CHECK (key_cap IS NULL OR key_cap > 0);
51 +
52 + ALTER TABLE sync_apps ADD COLUMN current_period_start TIMESTAMPTZ;
53 + ALTER TABLE sync_apps ADD COLUMN current_period_end TIMESTAMPTZ;
54 +
55 + -- An active non-internal app must have both knobs set. A per_key app must
56 + -- have key_cap set; an app_wide app must NOT.
57 + ALTER TABLE sync_apps ADD CONSTRAINT sync_apps_billing_shape CHECK (
58 + is_internal
59 + OR billing_status = 'draft'
60 + OR billing_status = 'canceled'
61 + OR (storage_gb_cap IS NOT NULL AND egress_multiple IS NOT NULL)
62 + );
63 + ALTER TABLE sync_apps ADD CONSTRAINT sync_apps_key_cap_shape CHECK (
64 + (enforcement_mode = 'per_key' AND key_cap IS NOT NULL)
65 + OR (enforcement_mode = 'app_wide' AND key_cap IS NULL)
66 + );
67 +
68 + CREATE INDEX idx_sync_apps_stripe_customer ON sync_apps(stripe_customer_id)
69 + WHERE stripe_customer_id IS NOT NULL;
70 + CREATE INDEX idx_sync_apps_billing_status ON sync_apps(billing_status);
71 +
72 + -- ── Backfill first-party apps ──
73 + -- All currently-existing apps are first-party. Mark internal, set active.
74 + UPDATE sync_apps SET
75 + is_internal = TRUE,
76 + billing_status = 'active',
77 + current_period_start = NOW(),
78 + current_period_end = NOW() + INTERVAL '100 years';
79 +
80 + -- ── Per-app claimed keys ──
81 + -- A key is an opaque developer-defined string. Counted against key_cap
82 + -- (per_key mode only). released_at NULL ⇒ currently active.
83 + CREATE TABLE sync_app_keys (
84 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
85 + app_id UUID NOT NULL REFERENCES sync_apps(id) ON DELETE CASCADE,
86 + key TEXT NOT NULL,
87 + claimed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
88 + released_at TIMESTAMPTZ,
89 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
90 + );
91 +
92 + CREATE UNIQUE INDEX idx_sync_app_keys_active
93 + ON sync_app_keys(app_id, key)
94 + WHERE released_at IS NULL;
95 + CREATE INDEX idx_sync_app_keys_app_active
96 + ON sync_app_keys(app_id)
97 + WHERE released_at IS NULL;
98 +
99 + -- ── Live usage counters (one row per app, updated in place) ──
100 + -- bytes_stored is a running total maintained by the storage layer;
101 + -- bytes_egress_period and keys_claimed reset on period rollover.
102 + -- last_warning_pct tracks which warning email has been sent (0/75/90/100)
103 + -- so we don't spam the developer.
104 + CREATE TABLE sync_app_usage_current (
105 + app_id UUID PRIMARY KEY REFERENCES sync_apps(id) ON DELETE CASCADE,
106 + bytes_stored BIGINT NOT NULL DEFAULT 0,
107 + bytes_egress_period BIGINT NOT NULL DEFAULT 0,
108 + keys_claimed INT NOT NULL DEFAULT 0,
109 + last_warning_pct SMALLINT NOT NULL DEFAULT 0
110 + CHECK (last_warning_pct IN (0, 75, 90, 100)),
111 + period_started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
112 + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
113 + );
114 +
115 + INSERT INTO sync_app_usage_current (app_id)
116 + SELECT id FROM sync_apps
117 + ON CONFLICT (app_id) DO NOTHING;
118 +
119 + -- ── Drop the old end-user subscription system ──
120 + -- Active Stripe subscriptions linked to this table will be canceled by the
121 + -- Phase 2 application code; this migration only removes the persistence.
122 + DROP TABLE app_sync_subscriptions;
@@ -1,213 +0,0 @@
1 - //! App-level sync subscription queries (GO, BB, AF cloud sync).
2 -
3 - use chrono::{DateTime, Utc};
4 - use sqlx::PgPool;
5 -
6 - use super::enums::{AppSyncTier, SubscriptionStatus};
7 - use super::id_types::*;
8 - use super::models::DbAppSyncSubscription;
9 - use crate::error::Result;
10 -
11 - /// Create a new app sync subscription.
12 - ///
13 - /// Uses ON CONFLICT DO NOTHING on (user_id, app_id) to handle duplicate
14 - /// webhook deliveries and concurrent subscription attempts.
15 - #[tracing::instrument(skip_all)]
16 - pub async fn create_app_sync_subscription<'e>(
17 - executor: impl sqlx::PgExecutor<'e>,
18 - user_id: UserId,
19 - app_id: SyncAppId,
20 - stripe_subscription_id: &str,
21 - stripe_customer_id: &str,
22 - tier: AppSyncTier,
23 - storage_limit_bytes: Option<i64>,
24 - ) -> Result<Option<DbAppSyncSubscription>> {
25 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
26 - r#"
27 - INSERT INTO app_sync_subscriptions
28 - (user_id, app_id, stripe_subscription_id, stripe_customer_id, tier, storage_limit_bytes)
29 - VALUES ($1, $2, $3, $4, $5, $6)
30 - ON CONFLICT (user_id, app_id) DO NOTHING
31 - RETURNING *
32 - "#,
33 - )
34 - .bind(user_id)
35 - .bind(app_id)
36 - .bind(stripe_subscription_id)
37 - .bind(stripe_customer_id)
38 - .bind(tier)
39 - .bind(storage_limit_bytes)
40 - .fetch_optional(executor)
41 - .await?;
42 -
43 - Ok(sub)
44 - }
45 -
46 - /// Look up an app sync subscription by its Stripe subscription ID.
47 - #[tracing::instrument(skip_all)]
48 - pub async fn get_app_sync_sub_by_stripe_id(
49 - pool: &PgPool,
50 - stripe_subscription_id: &str,
51 - ) -> Result<Option<DbAppSyncSubscription>> {
52 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
53 - "SELECT * FROM app_sync_subscriptions WHERE stripe_subscription_id = $1",
54 - )
55 - .bind(stripe_subscription_id)
56 - .fetch_optional(pool)
57 - .await?;
58 -
59 - Ok(sub)
60 - }
61 -
62 - /// Get a user's app sync subscription for a specific app (any status).
63 - #[tracing::instrument(skip_all)]
64 - pub async fn get_app_sync_sub(
65 - pool: &PgPool,
66 - user_id: UserId,
67 - app_id: SyncAppId,
68 - ) -> Result<Option<DbAppSyncSubscription>> {
69 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
70 - "SELECT * FROM app_sync_subscriptions WHERE user_id = $1 AND app_id = $2",
71 - )
72 - .bind(user_id)
73 - .bind(app_id)
74 - .fetch_optional(pool)
75 - .await?;
76 -
77 - Ok(sub)
78 - }
79 -
80 - /// Check whether a user has an active sync subscription for an app.
81 - #[tracing::instrument(skip_all)]
82 - pub async fn has_active_app_sync_sub(
83 - pool: &PgPool,
84 - user_id: UserId,
85 - app_id: SyncAppId,
86 - ) -> Result<bool> {
87 - let exists = sqlx::query_scalar::<_, bool>(
88 - "SELECT EXISTS(SELECT 1 FROM app_sync_subscriptions WHERE user_id = $1 AND app_id = $2 AND status = 'active')",
89 - )
90 - .bind(user_id)
91 - .bind(app_id)
92 - .fetch_one(pool)
93 - .await?;
94 -
95 - Ok(exists)
96 - }
97 -
98 - /// Get the blob storage limit for a user's active app sync subscription.
99 - /// Returns None if no active subscription or no storage limit set.
100 - #[tracing::instrument(skip_all)]
101 - pub async fn get_blob_storage_limit(
102 - pool: &PgPool,
103 - user_id: UserId,
104 - app_id: SyncAppId,
105 - ) -> Result<Option<i64>> {
106 - let limit = sqlx::query_scalar::<_, Option<i64>>(
107 - "SELECT storage_limit_bytes FROM app_sync_subscriptions WHERE user_id = $1 AND app_id = $2 AND status = 'active'",
108 - )
109 - .bind(user_id)
110 - .bind(app_id)
111 - .fetch_optional(pool)
112 - .await?;
113 -
114 - Ok(limit.flatten())
115 - }
116 -
117 - /// Update the status of an app sync subscription.
118 - /// Sets canceled_at when transitioning to canceled, preserving existing value.
119 - #[tracing::instrument(skip_all)]
120 - pub async fn update_app_sync_sub_status<'e>(
121 - executor: impl sqlx::PgExecutor<'e>,
122 - stripe_subscription_id: &str,
123 - status: SubscriptionStatus,
124 - ) -> Result<Option<DbAppSyncSubscription>> {
125 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
126 - r#"
127 - UPDATE app_sync_subscriptions
128 - SET status = $2,
129 - canceled_at = CASE
130 - WHEN $2 = 'canceled' THEN COALESCE(canceled_at, NOW())
131 - ELSE canceled_at
132 - END
133 - WHERE stripe_subscription_id = $1
134 - RETURNING *
135 - "#,
136 - )
137 - .bind(stripe_subscription_id)
138 - .bind(status)
139 - .fetch_optional(executor)
140 - .await?;
141 -
142 - Ok(sub)
143 - }
144 -
145 - /// Update the billing period of an app sync subscription.
146 - #[tracing::instrument(skip_all)]
147 - pub async fn update_app_sync_sub_period<'e>(
148 - executor: impl sqlx::PgExecutor<'e>,
149 - stripe_subscription_id: &str,
150 - start: DateTime<Utc>,
151 - end: DateTime<Utc>,
152 - ) -> Result<()> {
153 - sqlx::query(
154 - r#"
155 - UPDATE app_sync_subscriptions
156 - SET current_period_start = $2, current_period_end = $3
157 - WHERE stripe_subscription_id = $1
158 - "#,
159 - )
160 - .bind(stripe_subscription_id)
161 - .bind(start)
162 - .bind(end)
163 - .execute(executor)
164 - .await?;
165 -
166 - Ok(())
167 - }
168 -
169 - /// Update the tier and storage limit of an app sync subscription.
170 - #[tracing::instrument(skip_all)]
171 - pub async fn update_app_sync_sub_tier<'e>(
172 - executor: impl sqlx::PgExecutor<'e>,
173 - stripe_subscription_id: &str,
174 - tier: AppSyncTier,
175 - storage_limit_bytes: Option<i64>,
176 - ) -> Result<Option<DbAppSyncSubscription>> {
177 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
178 - r#"
179 - UPDATE app_sync_subscriptions
180 - SET tier = $2, storage_limit_bytes = $3
181 - WHERE stripe_subscription_id = $1
182 - RETURNING *
183 - "#,
184 - )
185 - .bind(stripe_subscription_id)
186 - .bind(tier)
187 - .bind(storage_limit_bytes)
188 - .fetch_optional(executor)
189 - .await?;
190 -
191 - Ok(sub)
192 - }
193 -
194 - /// Cancel an app sync subscription (set status + canceled_at).
195 - #[tracing::instrument(skip_all)]
196 - pub async fn cancel_app_sync_sub(
197 - pool: &PgPool,
198 - stripe_subscription_id: &str,
199 - ) -> Result<Option<DbAppSyncSubscription>> {
200 - let sub = sqlx::query_as::<_, DbAppSyncSubscription>(
201 - r#"
202 - UPDATE app_sync_subscriptions
203 - SET status = 'canceled', canceled_at = COALESCE(canceled_at, NOW())
204 - WHERE stripe_subscription_id = $1
205 - RETURNING *
206 - "#,
207 - )
208 - .bind(stripe_subscription_id)
209 - .fetch_optional(pool)
210 - .await?;
211 -
212 - Ok(sub)
213 - }
@@ -552,95 +552,6 @@ impl CreatorTier {
552 552 }
553 553 }
554 554
555 - // ── App Sync Tiers ──
556 -
557 - /// Subscription tier for app-level cloud sync (GO, BB, AF).
558 - /// GO and BB use `Standard` (single tier). AF uses Light/Standard/Large for blob storage.
559 - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
560 - #[serde(rename_all = "snake_case")]
561 - pub enum AppSyncTier {
562 - /// GO/BB single tier, or AF metadata-only (no blob storage)
563 - Standard,
564 - /// AF blob: 10 GB
565 - Light,
566 - /// AF blob: 50 GB (also the default for GO/BB)
567 - Large,
568 - }
569 -
570 - impl_str_enum!(AppSyncTier {
571 - Standard => "standard",
572 - Light => "light",
573 - Large => "large",
574 - });
575 -
576 - impl AppSyncTier {
577 - /// Human-readable label for display.
578 - pub fn label(&self) -> &'static str {
579 - match self {
580 - Self::Standard => "Standard",
581 - Self::Light => "Light",
582 - Self::Large => "Large",
583 - }
584 - }
585 -
586 - /// Blob storage limit in bytes for this tier.
587 - pub fn blob_storage_bytes(&self) -> Option<i64> {
588 - match self {
589 - Self::Light => Some(10 * 1024 * 1024 * 1024), // 10 GB
590 - Self::Standard => Some(50 * 1024 * 1024 * 1024), // 50 GB
591 - Self::Large => Some(200 * 1024 * 1024 * 1024), // 200 GB
592 - }
593 - }
594 -
595 - /// Monthly price in cents for a given app. Returns None if tier is not valid for the app.
596 - pub fn monthly_price_cents(&self, app_name: &str) -> Option<i64> {
597 - match app_name.to_lowercase().as_str() {
598 - "goingson" => match self {
599 - Self::Standard => Some(200), // $2/mo
600 - _ => None,
601 - },
602 - "balanced_breakfast" | "balanced breakfast" => match self {
603 - Self::Standard => Some(100), // $1/mo
604 - _ => None,
605 - },
606 - "audiofiles" => match self {
607 - Self::Light => Some(100), // $1/mo
608 - Self::Standard => Some(300), // $3/mo
609 - Self::Large => Some(800), // $8/mo
610 - },
611 - _ => None,
612 - }
613 - }
614 -
615 - /// Annual price in cents for a given app. Returns None if tier is not valid for the app.
616 - pub fn annual_price_cents(&self, app_name: &str) -> Option<i64> {
617 - match app_name.to_lowercase().as_str() {
618 - "goingson" => match self {
619 - Self::Standard => Some(1500), // $15/yr
620 - _ => None,
621 - },
622 - "balanced_breakfast" | "balanced breakfast" => match self {
623 - Self::Standard => Some(800), // $8/yr
624 - _ => None,
625 - },
626 - "audiofiles" => match self {
627 - Self::Light => Some(1000), // $10/yr
628 - Self::Standard => Some(3000), // $30/yr
629 - Self::Large => Some(8000), // $80/yr
630 - },
631 - _ => None,
632 - }
633 - }
634 -
635 - /// Product name for Stripe checkout display.
636 - pub fn product_name(&self, app_name: &str) -> String {
637 - match app_name.to_lowercase().as_str() {
638 - "audiofiles" => format!("audiofiles Cloud Sync — {}", self.label()),
639 - _ => format!("{app_name} Cloud Sync"),
640 - }
641 - }
642 - }
643 -
644 555 // ── AI Tiers ──
645 556
646 557 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -1032,7 +943,6 @@ pub enum CheckoutType {
1032 943 FanPlus,
1033 944 CreatorTier,
1034 945 Cart,
1035 - AppSync,
1036 946 }
1037 947
1038 948 impl_str_enum!(CheckoutType {
@@ -1042,7 +952,6 @@ impl_str_enum!(CheckoutType {
1042 952 FanPlus => "fan_plus",
1043 953 CreatorTier => "creator_tier",
1044 954 Cart => "cart",
1045 - AppSync => "app_sync",
1046 955 });
1047 956
1048 957 impl ModerationActionType {
@@ -1255,27 +1164,6 @@ mod tests {
1255 1164 }
1256 1165
1257 1166 #[test]
1258 - fn app_sync_tier_round_trip() {
1259 - assert_eq!(AppSyncTier::Standard.to_string(), "standard");
1260 - assert_eq!("light".parse::<AppSyncTier>().unwrap(), AppSyncTier::Light);
1261 - assert_eq!("large".parse::<AppSyncTier>().unwrap(), AppSyncTier::Large);
1262 - assert!("bogus".parse::<AppSyncTier>().is_err());
1263 - }
1264 -
1265 - #[test]
1266 - fn app_sync_tier_storage() {
1267 - assert_eq!(AppSyncTier::Light.blob_storage_bytes(), Some(10 * 1024 * 1024 * 1024));
1268 - assert_eq!(AppSyncTier::Standard.blob_storage_bytes(), Some(50 * 1024 * 1024 * 1024));
1269 - assert_eq!(AppSyncTier::Large.blob_storage_bytes(), Some(200 * 1024 * 1024 * 1024));
1270 - }
1271 -
1272 - #[test]
1273 - fn checkout_type_app_sync() {
1274 - assert_eq!(CheckoutType::AppSync.to_string(), "app_sync");
1275 - assert_eq!("app_sync".parse::<CheckoutType>().unwrap(), CheckoutType::AppSync);
1276 - }
1277 -
1278 - #[test]
1279 1167 fn project_feature_round_trip() {
1280 1168 assert_eq!(ProjectFeature::Audio.to_string(), "audio");
1281 1169 assert_eq!("downloads".parse::<ProjectFeature>().unwrap(), ProjectFeature::Downloads);
@@ -23,6 +23,7 @@ pub mod waitlist;
23 23 pub(crate) mod blog_posts;
24 24 pub(crate) mod license_keys;
25 25 pub(crate) mod synckit;
26 + pub(crate) mod synckit_billing;
26 27 pub(crate) mod oauth;
27 28 pub(crate) mod promo_codes;
28 29 pub(crate) mod follows;
@@ -68,7 +69,6 @@ pub(crate) mod cart;
68 69 pub(crate) mod page_views;
69 70 pub(crate) mod pending_s3_deletions;
70 71 pub(crate) mod pending_uploads;
71 - pub(crate) mod app_sync;
72 72
73 73 pub use id_types::*;
74 74 pub use validated_types::*;
@@ -80,13 +80,12 @@ use sqlx::PgPool;
80 80
81 81 /// Check the sandbox per-IP cap under an advisory lock on a single connection.
82 82 ///
83 - /// Acquires a session-level advisory lock, runs the count query, and unlocks —
84 - /// all on the same connection. Returns the active sandbox count.
83 + /// Acquires a session-level advisory lock, runs the count query, and unlocks; /// all on the same connection. Returns the active sandbox count.
85 84 ///
86 85 /// This avoids the bug where `advisory_lock` + `advisory_unlock` through a pool
87 86 /// use different connections, leaving locks permanently held.
88 87 ///
89 - /// Uses `pg_try_advisory_lock` to avoid blocking under burst load — if the lock
88 + /// Uses `pg_try_advisory_lock` to avoid blocking under burst load; if the lock
90 89 /// is already held, returns an error rather than waiting.
91 90 pub async fn check_sandbox_cap(pool: &PgPool, lock_key: i64, ip: &str) -> Result<i64> {
92 91 let mut conn = pool.acquire().await.map_err(|e| {
@@ -112,35 +112,58 @@ pub struct DbSyncBlob {
112 112 pub uploaded_at: DateTime<Utc>,
113 113 }
114 114
115 - // ── App sync subscription ──
116 -
117 - /// An app-level sync subscription (GO, BB, AF cloud sync).
115 + /// Sync app + billing columns + live usage counters (joined view).
116 + ///
117 + /// Mirrors columns added in migration 117 (`117_synckit_v2_billing.sql`).
118 + /// Built by joining `sync_apps` against `sync_app_usage_current` (LEFT JOIN; /// the usage row is inserted on app create, but guards against the row going
119 + /// missing).
118 120 #[derive(Debug, Clone, FromRow, Serialize)]
119 - pub struct DbAppSyncSubscription {
120 - /// Database primary key.
121 - pub id: uuid::Uuid,
122 - /// Subscribing user.
123 - pub user_id: UserId,
124 - /// Sync app this subscription covers.
125 - pub app_id: SyncAppId,
126 - /// Stripe subscription ID (e.g. `sub_...`).
127 - pub stripe_subscription_id: String,
128 - /// Stripe customer ID (e.g. `cus_...`).
129 - pub stripe_customer_id: String,
130 - /// Tier: "standard" for GO/BB, "light"/"standard"/"large" for AF blob tiers.
131 - pub tier: super::super::AppSyncTier,
132 - /// Subscription status (active, past_due, canceled, etc.).
133 - pub status: super::super::SubscriptionStatus,
134 - /// Blob storage limit in bytes (AF only, NULL for GO/BB).
135 - pub storage_limit_bytes: Option<i64>,
136 - /// Start of current billing period.
121 + pub struct DbSyncAppBilling {
122 + // sync_apps base
123 + pub id: SyncAppId,
124 + pub creator_id: UserId,
125 + pub name: String,
126 + /// First-party app; bypasses all billing logic.
127 + pub is_internal: bool,
128 + /// Stripe Customer ID for this app's developer (one customer per app).
129 + pub stripe_customer_id: Option<String>,
130 + /// Stripe Subscription ID; set once billing activates.
131 + pub stripe_subscription_id: Option<String>,
132 + /// 'draft' | 'active' | 'suspended_unpaid' | 'canceled'
133 + pub billing_status: String,
134 + /// Storage cap in GB (NULL while in draft).
135 + pub storage_gb_cap: Option<i32>,
136 + /// Egress quota = storage_gb_cap × egress_multiple (per month).
137 + /// Decoded as `f64` via explicit `CAST(... AS DOUBLE PRECISION)` in queries
138 + /// (sqlx doesn't support PostgreSQL NUMERIC → f64 directly without the
139 + /// `bigdecimal` or `rust_decimal` feature, neither of which is enabled).
140 + pub egress_multiple: Option<f64>,
141 + /// 'per_key' | 'app_wide'
142 + pub enforcement_mode: String,
143 + /// Required iff enforcement_mode = 'per_key'.
144 + pub key_cap: Option<i32>,
137 145 pub current_period_start: Option<DateTime<Utc>>,
138 - /// End of current billing period.
139 146 pub current_period_end: Option<DateTime<Utc>>,
140 - /// When the subscription was canceled.
141 - pub canceled_at: Option<DateTime<Utc>>,
142 - /// When the subscription was created.
143 - pub created_at: DateTime<Utc>,
147 + // sync_app_usage_current (LEFT-joined, may be missing if row absent)
148 + pub bytes_stored: Option<i64>,
149 + pub bytes_egress_period: Option<i64>,
150 + pub keys_claimed: Option<i32>,
151 + pub last_warning_pct: Option<i16>,
152 + pub period_started_at: Option<DateTime<Utc>>,
153 + // projects (LEFT-joined). Some when the app is linked to a project — used
154 + // to route the Stripe billing portal back to the project dashboard.
155 + pub project_slug: Option<String>,
156 + }
157 +
158 + /// A single active key claim (row in `sync_app_keys` with `released_at IS NULL`).
159 + ///
160 + /// See migration 117 for the full table definition. This is the projection
161 + /// used by the dashboard "Active keys" list; only the columns the UI needs.
162 + #[derive(Debug, Clone, FromRow, Serialize)]
163 + pub struct DbSyncAppKey {
164 + pub id: uuid::Uuid,
165 + pub key: String,
166 + pub claimed_at: DateTime<Utc>,
144 167 }
145 168
146 169 // ── OTA models ──
@@ -59,6 +59,11 @@ pub async fn create_sync_blob_idempotent(
59 59 }
60 60
61 61 /// Get total blob storage used by a user for an app (in bytes).
62 + ///
63 + /// Currently unused at the call-site level (Phase 5 moved cap enforcement to
64 + /// the app-level counters in `sync_app_usage_current`), but kept around for
65 + /// the eventual per-user "what's taking up space" dashboard view.
66 + #[allow(dead_code)]
62 67 #[tracing::instrument(skip_all)]
63 68 pub async fn get_blob_storage_used(
64 69 pool: &PgPool,
@@ -1,7 +1,7 @@
1 1 //! SyncKit: device registration, change log, blob storage, and encryption key
2 2 //! management.
3 3 //!
4 - //! Split by domain into submodules — all functions are re-exported flat so
4 + //! Split by domain into submodules; all functions are re-exported flat so
5 5 //! call sites keep using `db::synckit::<fn_name>(...)`.
6 6
7 7 mod apps;
@@ -0,0 +1,806 @@
1 + //! SyncKit v2 developer-billing queries.
2 + //!
3 + //! Operates on the billing columns added to `sync_apps` and the auxiliary
4 + //! `sync_app_usage_current` table by migration 117 (`117_synckit_v2_billing.sql`).
5 + //!
6 + //! Schema reference (kept here for offline-mode reviewers since the migration
7 + //! is not yet applied at the time of writing):
8 + //!
9 + //! ```text
10 + //! sync_apps:
11 + //! is_internal BOOLEAN NOT NULL DEFAULT FALSE
12 + //! stripe_customer_id TEXT
13 + //! stripe_subscription_id TEXT UNIQUE
14 + //! billing_status TEXT NOT NULL CHECK IN
15 + //! ('draft','active','suspended_unpaid','canceled')
16 + //! storage_gb_cap INT
17 + //! egress_multiple NUMERIC(6,2)
18 + //! enforcement_mode TEXT NOT NULL CHECK IN ('per_key','app_wide')
19 + //! key_cap INT
20 + //! current_period_start TIMESTAMPTZ
21 + //! current_period_end TIMESTAMPTZ
22 + //!
23 + //! sync_app_usage_current (one row per app):
24 + //! app_id, bytes_stored, bytes_egress_period, keys_claimed,
25 + //! last_warning_pct, period_started_at, updated_at
26 + //! ```
27 +
28 + use chrono::{DateTime, Utc};
29 + use sqlx::PgPool;
30 +
31 + use super::id_types::SyncAppId;
32 + use super::models::{DbSyncAppBilling, DbSyncAppKey};
33 + use crate::error::Result;
34 +
35 + /// Outcome of a `claim_key` call.
36 + #[derive(Debug, Clone, Copy, PartialEq, Eq)]
37 + pub struct ClaimResult {
38 + /// `true` if this call inserted a new active claim row;
39 + /// `false` if the key was already actively claimed (idempotent re-claim).
40 + pub newly_claimed: bool,
41 + /// Total active claims for this app after the operation.
42 + pub total_claimed: i32,
43 + }
44 +
45 + /// Outcome of a `release_key` call.
46 + #[derive(Debug, Clone, Copy, PartialEq, Eq)]
47 + pub struct ReleaseResult {
48 + /// `true` if this call transitioned an active row to released;
49 + /// `false` if no active row existed (idempotent release).
50 + pub newly_released: bool,
51 + /// Total active claims for this app after the operation.
52 + pub total_claimed: i32,
53 + }
54 +
55 + /// Set the Stripe customer ID on a sync app (idempotent; re-sets the same id
56 + /// without error). Called by the billing setup route once the developer-side
57 + /// Customer object has been created in Stripe.
58 + #[tracing::instrument(skip_all)]
59 + pub async fn set_stripe_customer(
60 + pool: &PgPool,
61 + app_id: SyncAppId,
62 + stripe_customer_id: &str,
63 + ) -> Result<()> {
64 + sqlx::query("UPDATE sync_apps SET stripe_customer_id = $2 WHERE id = $1")
65 + .bind(app_id)
66 + .bind(stripe_customer_id)
67 + .execute(pool)
68 + .await?;
69 + Ok(())
70 + }
71 +
72 + /// Activate billing on a draft app: stamps subscription id, knobs, status,
73 + /// and current period. The `egress_multiple` is passed in as `f64` and cast
74 + /// to the column's `NUMERIC(6,2)` type in SQL.
75 + #[tracing::instrument(skip_all)]
76 + #[allow(clippy::too_many_arguments)]
77 + pub async fn activate_billing(
78 + pool: &PgPool,
79 + app_id: SyncAppId,
80 + storage_gb_cap: i32,
81 + egress_multiple: f64,
82 + enforcement_mode: &str,
83 + key_cap: Option<i32>,
84 + stripe_sub_id: &str,
85 + period_start: DateTime<Utc>,
86 + period_end: DateTime<Utc>,
87 + ) -> Result<()> {
88 + sqlx::query(
89 + r#"
90 + UPDATE sync_apps SET
91 + billing_status = 'active',
92 + stripe_subscription_id = $2,
93 + storage_gb_cap = $3,
94 + egress_multiple = CAST($4 AS NUMERIC(6, 2)),
95 + enforcement_mode = $5,
96 + key_cap = $6,
97 + current_period_start = $7,
98 + current_period_end = $8
99 + WHERE id = $1
100 + "#,
101 + )
102 + .bind(app_id)
103 + .bind(stripe_sub_id)
104 + .bind(storage_gb_cap)
105 + .bind(egress_multiple)
106 + .bind(enforcement_mode)
107 + .bind(key_cap)
108 + .bind(period_start)
109 + .bind(period_end)
110 + .execute(pool)
111 + .await?;
112 + Ok(())
113 + }
114 +
115 + /// Update the pricing knobs on an already-active app (no status change).
116 + #[tracing::instrument(skip_all)]
117 + pub async fn update_knobs(
118 + pool: &PgPool,
119 + app_id: SyncAppId,
120 + storage_gb_cap: i32,
121 + egress_multiple: f64,
122 + enforcement_mode: &str,
123 + key_cap: Option<i32>,
124 + ) -> Result<()> {
125 + sqlx::query(
126 + r#"
127 + UPDATE sync_apps SET
128 + storage_gb_cap = $2,
129 + egress_multiple = CAST($3 AS NUMERIC(6, 2)),
130 + enforcement_mode = $4,
131 + key_cap = $5
132 + WHERE id = $1
133 + "#,
134 + )
135 + .bind(app_id)
136 + .bind(storage_gb_cap)
137 + .bind(egress_multiple)
138 + .bind(enforcement_mode)
139 + .bind(key_cap)
140 + .execute(pool)
141 + .await?;
142 + Ok(())
143 + }
144 +
145 + /// Set `billing_status` directly (used by webhook handlers for
146 + /// suspended_unpaid / canceled transitions).
147 + #[tracing::instrument(skip_all)]
148 + pub async fn set_billing_status<'e>(
149 + executor: impl sqlx::PgExecutor<'e>,
150 + app_id: SyncAppId,
151 + status: &str,
152 + ) -> Result<()> {
153 + sqlx::query("UPDATE sync_apps SET billing_status = $2 WHERE id = $1")
154 + .bind(app_id)
155 + .bind(status)
156 + .execute(executor)
157 + .await?;
158 + Ok(())
159 + }
160 +
161 + /// Set the current period bounds (called by `invoice.paid` handler).
162 + #[tracing::instrument(skip_all)]
163 + pub async fn set_period<'e>(
164 + executor: impl sqlx::PgExecutor<'e>,
165 + app_id: SyncAppId,
166 + start: DateTime<Utc>,
167 + end: DateTime<Utc>,
168 + ) -> Result<()> {
169 + sqlx::query(
170 + r#"
171 + UPDATE sync_apps SET
172 + current_period_start = $2,
173 + current_period_end = $3
174 + WHERE id = $1
175 + "#,
176 + )
177 + .bind(app_id)
178 + .bind(start)
179 + .bind(end)
180 + .execute(executor)
181 + .await?;
182 + Ok(())
183 + }
184 +
185 + /// Reset the per-period usage counters on `sync_app_usage_current`. Called
186 + /// from the `invoice.paid` webhook handler at period rollover.
187 + #[tracing::instrument(skip_all)]
188 + pub async fn reset_period_usage<'e>(
189 + executor: impl sqlx::PgExecutor<'e>,
190 + app_id: SyncAppId,
191 + ) -> Result<()> {
192 + sqlx::query(
193 + r#"
194 + UPDATE sync_app_usage_current SET
195 + bytes_egress_period = 0,
196 + last_warning_pct = 0,
197 + period_started_at = NOW(),
198 + updated_at = NOW()
199 + WHERE app_id = $1
200 + "#,
201 + )
202 + .bind(app_id)
203 + .execute(executor)
204 + .await?;
205 + Ok(())
206 + }
207 +
208 + /// Look up the sync app that owns a given Stripe subscription. Used by the
209 + /// webhook router to distinguish SyncKit v2 subscriptions from
210 + /// creator-tier / Fan+ subscriptions.
211 + #[tracing::instrument(skip_all)]
212 + pub async fn get_app_by_stripe_subscription(
213 + pool: &PgPool,
214 + stripe_sub_id: &str,
215 + ) -> Result<Option<SyncAppId>> {
216 + let row: Option<(SyncAppId,)> = sqlx::query_as(
217 + "SELECT id FROM sync_apps WHERE stripe_subscription_id = $1",
218 + )
219 + .bind(stripe_sub_id)
220 + .fetch_optional(pool)
221 + .await?;
222 + Ok(row.map(|(id,)| id))
223 + }
224 +
225 + /// Fetch the combined app+billing+usage view for a single app. `egress_multiple`
226 + /// is cast from NUMERIC to DOUBLE PRECISION so it decodes into `f64` without
227 + /// the `bigdecimal` sqlx feature. `sync_app_usage_current` is LEFT-joined; /// the row is normally inserted on app create (see migration 117), but
228 + /// `Option` is used so a missing row doesn't blow up the query.
229 + #[tracing::instrument(skip_all)]
230 + pub async fn get_app_with_billing(
231 + pool: &PgPool,
232 + app_id: SyncAppId,
233 + ) -> Result<Option<DbSyncAppBilling>> {
234 + let app = sqlx::query_as::<_, DbSyncAppBilling>(
235 + r#"
236 + SELECT
237 + sa.id,
238 + sa.creator_id,
239 + sa.name,
240 + sa.is_internal,
241 + sa.stripe_customer_id,
242 + sa.stripe_subscription_id,
243 + sa.billing_status,
244 + sa.storage_gb_cap,
245 + CAST(sa.egress_multiple AS DOUBLE PRECISION) AS egress_multiple,
246 + sa.enforcement_mode,
247 + sa.key_cap,
248 + sa.current_period_start,
249 + sa.current_period_end,
250 + u.bytes_stored,
251 + u.bytes_egress_period,
252 + u.keys_claimed,
253 + u.last_warning_pct,
254 + u.period_started_at,
255 + p.slug AS project_slug
256 + FROM sync_apps sa
257 + LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
258 + LEFT JOIN projects p ON p.id = sa.project_id
259 + WHERE sa.id = $1
260 + "#,
261 + )
262 + .bind(app_id)
263 + .fetch_optional(pool)
264 + .await?;
265 + Ok(app)
266 + }
267 +
268 + /// Batch variant of `get_app_with_billing` that loads every app owned by a
269 + /// creator with its billing+usage join in one query. Used by the user-level
270 + /// SyncKit dashboard tab.
271 + #[tracing::instrument(skip_all)]
272 + pub async fn get_apps_with_billing_by_creator(
273 + pool: &PgPool,
274 + creator_id: super::id_types::UserId,
275 + ) -> Result<Vec<DbSyncAppBilling>> {
276 + let apps = sqlx::query_as::<_, DbSyncAppBilling>(
277 + r#"
278 + SELECT
279 + sa.id,
280 + sa.creator_id,
281 + sa.name,
282 + sa.is_internal,
283 + sa.stripe_customer_id,
284 + sa.stripe_subscription_id,
285 + sa.billing_status,
286 + sa.storage_gb_cap,
287 + CAST(sa.egress_multiple AS DOUBLE PRECISION) AS egress_multiple,
288 + sa.enforcement_mode,
289 + sa.key_cap,
290 + sa.current_period_start,
291 + sa.current_period_end,
292 + u.bytes_stored,
293 + u.bytes_egress_period,
294 + u.keys_claimed,
295 + u.last_warning_pct,
296 + u.period_started_at,
297 + p.slug AS project_slug
298 + FROM sync_apps sa
299 + LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
300 + LEFT JOIN projects p ON p.id = sa.project_id
301 + WHERE sa.creator_id = $1
302 + "#,
303 + )
304 + .bind(creator_id)
305 + .fetch_all(pool)
306 + .await?;
307 + Ok(apps)
308 + }
309 +
310 + /// Batch variant of `get_app_with_billing` for one project. Used by the
311 + /// project-level SyncKit dashboard tab.
312 + #[tracing::instrument(skip_all)]
313 + pub async fn get_apps_with_billing_by_project(
314 + pool: &PgPool,
315 + project_id: super::id_types::ProjectId,
316 + ) -> Result<Vec<DbSyncAppBilling>> {
317 + let apps = sqlx::query_as::<_, DbSyncAppBilling>(
318 + r#"
319 + SELECT
320 + sa.id,
321 + sa.creator_id,
322 + sa.name,
323 + sa.is_internal,
324 + sa.stripe_customer_id,
325 + sa.stripe_subscription_id,
326 + sa.billing_status,
327 + sa.storage_gb_cap,
328 + CAST(sa.egress_multiple AS DOUBLE PRECISION) AS egress_multiple,
329 + sa.enforcement_mode,
330 + sa.key_cap,
331 + sa.current_period_start,
332 + sa.current_period_end,
333 + u.bytes_stored,
334 + u.bytes_egress_period,
335 + u.keys_claimed,
336 + u.last_warning_pct,
337 + u.period_started_at,
338 + p.slug AS project_slug
339 + FROM sync_apps sa
340 + LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id
341 + LEFT JOIN projects p ON p.id = sa.project_id
342 + WHERE sa.project_id = $1
343 + "#,
344 + )
345 + .bind(project_id)
346 + .fetch_all(pool)
347 + .await?;
348 + Ok(apps)
349 + }
350 +
351 + /// Claim an SDK encryption key for an app. Idempotent: a re-claim of an
352 + /// already-active key returns `newly_claimed = false` without inserting.
353 + ///
354 + /// The transaction locks `sync_app_usage_current` for this app first, so the
355 + /// route handler can read `keys_claimed` and decide on the cap before this
356 + /// runs without races. (The handler does its check pre-transaction; this
357 + /// function re-checks idempotency inside the lock.)
358 + #[tracing::instrument(skip_all)]
359 + pub async fn claim_key(
360 + pool: &sqlx::PgPool,
361 + app_id: SyncAppId,
362 + key: &str,
363 + ) -> Result<ClaimResult> {
364 + let mut tx = pool.begin().await?;
365 +
366 + // Lock the usage row for this app. Returns the current keys_claimed.
367 + let (mut keys_claimed,): (i32,) = sqlx::query_as(
368 + "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE",
369 + )
370 + .bind(app_id)
371 + .fetch_one(&mut *tx)
372 + .await?;
373 +
374 + // Is there already an active claim for this key?
375 + let existing: Option<(uuid::Uuid,)> = sqlx::query_as(
376 + "SELECT id FROM sync_app_keys
377 + WHERE app_id = $1 AND key = $2 AND released_at IS NULL",
378 + )
379 + .bind(app_id)
380 + .bind(key)
381 + .fetch_optional(&mut *tx)
382 + .await?;
383 +
384 + let newly_claimed = if existing.is_some() {
385 + false
386 + } else {
387 + sqlx::query(
388 + "INSERT INTO sync_app_keys (app_id, key) VALUES ($1, $2)",
389 + )
390 + .bind(app_id)
391 + .bind(key)
392 + .execute(&mut *tx)
393 + .await?;
394 +
395 + sqlx::query(
396 + "UPDATE sync_app_usage_current
397 + SET keys_claimed = keys_claimed + 1, updated_at = NOW()
398 + WHERE app_id = $1",
399 + )
400 + .bind(app_id)
401 + .execute(&mut *tx)
402 + .await?;
403 +
404 + keys_claimed += 1;
405 + true
406 + };
407 +
408 + tx.commit().await?;
409 + Ok(ClaimResult {
410 + newly_claimed,
411 + total_claimed: keys_claimed,
412 + })
413 + }
414 +
415 + /// Release an SDK encryption key. Idempotent: releasing a key that is not
416 + /// actively claimed returns `newly_released = false`.
417 + #[tracing::instrument(skip_all)]
418 + pub async fn release_key(
419 + pool: &sqlx::PgPool,
420 + app_id: SyncAppId,
421 + key: &str,
422 + ) -> Result<ReleaseResult> {
423 + let mut tx = pool.begin().await?;
424 +
425 + let (mut keys_claimed,): (i32,) = sqlx::query_as(
426 + "SELECT keys_claimed FROM sync_app_usage_current WHERE app_id = $1 FOR UPDATE",
427 + )
428 + .bind(app_id)
429 + .fetch_one(&mut *tx)
430 + .await?;
431 +
432 + let released: Option<(uuid::Uuid,)> = sqlx::query_as(
433 + "UPDATE sync_app_keys SET released_at = NOW()
434 + WHERE app_id = $1 AND key = $2 AND released_at IS NULL
435 + RETURNING id",
436 + )
437 + .bind(app_id)
438 + .bind(key)
439 + .fetch_optional(&mut *tx)
440 + .await?;
441 +
442 + let newly_released = if released.is_some() {
443 + sqlx::query(
444 + "UPDATE sync_app_usage_current
445 + SET keys_claimed = GREATEST(keys_claimed - 1, 0), updated_at = NOW()
446 + WHERE app_id = $1",
447 + )
448 + .bind(app_id)
449 + .execute(&mut *tx)
450 + .await?;
451 + keys_claimed = (keys_claimed - 1).max(0);
452 + true
453 + } else {
454 + false
455 + };
456 +
457 + tx.commit().await?;
458 + Ok(ReleaseResult {
459 + newly_released,
460 + total_claimed: keys_claimed,
461 + })
462 + }
463 +
464 + /// List active (un-released) key claims for an app, ordered by `claimed_at DESC`.
465 + /// Used by the dashboard "Active keys" view.
466 + #[tracing::instrument(skip_all)]
467 + pub async fn list_active_keys(
468 + pool: &sqlx::PgPool,
469 + app_id: SyncAppId,
470 + limit: i64,
471 + offset: i64,
472 + ) -> Result<Vec<DbSyncAppKey>> {
473 + let rows = sqlx::query_as::<_, DbSyncAppKey>(
474 + "SELECT id, key, claimed_at FROM sync_app_keys
475 + WHERE app_id = $1 AND released_at IS NULL
476 + ORDER BY claimed_at DESC
477 + LIMIT $2 OFFSET $3",
478 + )
479 + .bind(app_id)
480 + .bind(limit)
481 + .bind(offset)
482 + .fetch_all(pool)
483 + .await?;
484 + Ok(rows)
485 + }
486 +
487 + // ── Phase 5: Usage counters and cap enforcement ──
488 +
489 + /// A breached cap, returned by `would_exceed_*` checks.
490 + #[derive(Debug, Clone, PartialEq, Eq)]
491 + pub struct ExceededLimit {
492 + /// `"storage"` or `"egress"`. (`"billing"` is reserved for inactive-billing
493 + /// failure paths so a single 402 response shape can carry every reason; /// the caller decides whether to thread that through here or check
494 + /// `billing_status` separately.)
495 + pub dimension: &'static str,
496 + /// Current usage in bytes (before the would-be addition).
497 + pub used: i64,
498 + /// Configured cap in bytes.
499 + pub limit: i64,
500 + }
Lines truncated
@@ -580,6 +580,67 @@ Your content remains accessible to fans. If you experience issues, they should r
580 580 pub async fn send_alert(&self, to: &str, subject: &str, body: &str) -> Result<()> {
581 581 self.transport.send_email(to, subject, body).await
582 582 }
583 +
584 + /// Warn an app owner that they're approaching (or have hit) a SyncKit cap.
585 + ///
586 + /// `dimension` is `"storage"` or `"egress"`. `pct` is 75/90/100. At 100%
587 + /// the next request to that dimension will be hard-blocked with a 402.
588 + pub async fn send_synckit_usage_warning(
589 + &self,
590 + to_email: &str,
591 + app_name: &str,
592 + dimension: &str,
593 + pct: i16,
594 + used_bytes: i64,
595 + limit_bytes: i64,
596 + billing_url: &str,
597 + ) -> Result<()> {
598 + fn fmt_gb(bytes: i64) -> String {
599 + let gb = bytes as f64 / (1024.0 * 1024.0 * 1024.0);
600 + if gb >= 10.0 { format!("{gb:.0} GB") } else { format!("{gb:.2} GB") }
601 + }
602 +
603 + let dim_human = match dimension {
604 + "storage" => "storage",
605 + "egress" => "monthly egress",
606 + other => other,
607 + };
608 + let subject = if pct >= 100 {
609 + format!("{app_name}: {dim_human} cap reached")
610 + } else {
611 + format!("{app_name}: {pct}% of {dim_human} cap used")
612 + };
613 +
614 + let pct_msg = if pct >= 100 {
615 + format!(
616 + "Your SyncKit app \"{app_name}\" has reached its {dim_human} cap.\n\
617 + Further {dim_human} requests will be rejected (HTTP 402) until the\n\
618 + cap is raised or the period rolls over."
619 + )
620 + } else {
621 + format!(
622 + "Your SyncKit app \"{app_name}\" has used {pct}% of its {dim_human}\n\
623 + cap. At 100% further requests are hard-blocked (HTTP 402).",
624 + )
625 + };
626 +
627 + let body = format!(
628 + r#"{pct_msg}
629 +
630 + Used: {used}
631 + Limit: {limit}
632 +
633 + Adjust caps or review usage:
634 + {url}
635 +
636 + - Makenotwork"#,
637 + used = fmt_gb(used_bytes),
638 + limit = fmt_gb(limit_bytes),
639 + url = billing_url,
640 + );
641 +
642 + self.transport.send_email(to_email, &subject, &body).await
643 + }
583 644 }
584 645
585 646 #[cfg(test)]
@@ -24,6 +24,7 @@ pub mod mt_client;
24 24 pub mod wam_client;
25 25 pub mod payments;
26 26 pub mod pricing;
27 + pub mod synckit_billing;
27 28 pub mod scheduler;
28 29 pub mod routes;
29 30 pub mod rss;
@@ -1,22 +1,21 @@
1 1 //! Checkout session creation.
2 2 //!
3 3 //! Direct Charges pattern: payment goes directly to the connected account.
4 - //! No `application_fee_amount` is set — the 0% platform fee promise.
4 + //! No `application_fee_amount` is set: the 0% platform fee promise.
5 5
6 6 use std::collections::HashMap;
7 7
8 8 use stripe::StripeRequest;
9 9 use stripe_checkout::checkout_session::{
10 10 CreateCheckoutSession, CreateCheckoutSessionAutomaticTax, CreateCheckoutSessionLineItems,
11 - CreateCheckoutSessionLineItemsPriceData, CreateCheckoutSessionLineItemsPriceDataRecurring,
12 - CreateCheckoutSessionLineItemsPriceDataRecurringInterval,
11 + CreateCheckoutSessionLineItemsPriceData,
13 12 CreateCheckoutSessionSubscriptionData, ProductData,
14 13 };
15 14 use stripe_shared::CheckoutSessionMode;
16 15 use stripe_types::Currency;
17 16
18 17 use crate::constants;
19 - use crate::db::{Cents, CheckoutType, ItemId, ProjectId, PromoCodeId, SubscriptionTierId, SyncAppId, UserId};
18 + use crate::db::{Cents, CheckoutType, ItemId, ProjectId, PromoCodeId, SubscriptionTierId, UserId};
20 19 use crate::error::{AppError, Result};
21 20 use super::StripeClient;
22 21
@@ -92,20 +91,6 @@ pub struct GuestCheckoutParams<'a> {
92 91 pub enable_stripe_tax: bool,
93 92 }
94 93
95 - /// Parameters for an app sync subscription Checkout Session (inline pricing).
96 - pub struct AppSyncCheckoutParams<'a> {
97 - pub product_name: &'a str,
98 - pub price_cents: i64,
99 - /// "month" or "year"
100 - pub interval: &'a str,
101 - pub user_id: UserId,
102 - pub app_id: SyncAppId,
103 - pub app_name: &'a str,
104 - pub tier: &'a str,
105 - pub success_url: &'a str,
106 - pub cancel_url: &'a str,
107 - }
108 -
109 94 fn check_min_charge(amount_cents: i64) -> Result<()> {
110 95 if amount_cents > 0 && amount_cents < constants::STRIPE_MINIMUM_CHARGE_CENTS {
111 96 return Err(AppError::BadRequest(format!(
@@ -386,44 +371,4 @@ impl StripeClient {
386 371 self.send_on_platform(builder, "creator_tier_checkout").await
387 372 }
388 373
389 - /// Build a Checkout Session for an app sync subscription on MNW's own Stripe account.
390 - /// Uses inline price_data with recurring — no pre-created Stripe products needed.
391 - #[tracing::instrument(skip_all, name = "payments::create_app_sync_checkout_session")]
392 - pub async fn create_app_sync_checkout_session(
393 - &self,
394 - params: &AppSyncCheckoutParams<'_>,
395 - ) -> Result<stripe_shared::CheckoutSession> {
396 - let recurring_interval = match params.interval {
397 - "year" => CreateCheckoutSessionLineItemsPriceDataRecurringInterval::Year,
398 - _ => CreateCheckoutSessionLineItemsPriceDataRecurringInterval::Month,
399 - };
400 -
401 - let line_item = CreateCheckoutSessionLineItems {
402 - price_data: Some(CreateCheckoutSessionLineItemsPriceData {
403 - currency: Currency::USD,
404 - product_data: Some(ProductData::new(params.product_name.to_string())),
405 - unit_amount: Some(params.price_cents),
406 - recurring: Some(CreateCheckoutSessionLineItemsPriceDataRecurring::new(recurring_interval)),
407 - ..CreateCheckoutSessionLineItemsPriceData::new(Currency::USD)
408 - }),
409 - quantity: Some(1),
410 - ..CreateCheckoutSessionLineItems::new()
411 - };
412 -
413 - let mut metadata = HashMap::new();
414 - metadata.insert("checkout_type".to_string(), CheckoutType::AppSync.to_string());
415 - metadata.insert("user_id".to_string(), params.user_id.to_string());
416 - metadata.insert("app_id".to_string(), params.app_id.to_string());
417 - metadata.insert("tier".to_string(), params.tier.to_string());
418 - metadata.insert("app_name".to_string(), params.app_name.to_string());
419 -
420 - let builder = CreateCheckoutSession::new()
421 - .mode(CheckoutSessionMode::Subscription)
422 - .success_url(params.success_url.to_string())
423 - .cancel_url(params.cancel_url.to_string())
424 - .line_items(vec![line_item])
425 - .metadata(metadata);
426 -
427 - self.send_on_platform(builder, "app_sync_checkout").await
428 - }
429 374 }
@@ -7,10 +7,10 @@
7 7
8 8 use std::collections::HashMap;
9 9
10 - use crate::db::{CheckoutType, ItemId, ProjectId, PromoCodeId, SubscriptionTierId, SyncAppId, UserId};
10 + use crate::db::{CheckoutType, ItemId, ProjectId, PromoCodeId, SubscriptionTierId, UserId};
11 11 use crate::error::{AppError, Result};
12 12
13 - /// Convenience alias — the metadata pulled off a Stripe `CheckoutSession`.
13 + /// Convenience alias for the metadata pulled off a Stripe `CheckoutSession`.
14 14 pub type CheckoutMetaMap = HashMap<String, String>;
15 15
16 16 fn require<'a>(meta: Option<&'a CheckoutMetaMap>, key: &str) -> Result<&'a String> {
@@ -113,25 +113,6 @@ impl TipCheckoutMetadata {
113 113 }
114 114 }
115 115
116 - /// Parsed metadata from an app sync checkout session.
117 - #[derive(Debug)]
118 - pub struct AppSyncCheckoutMetadata {
119 - pub user_id: UserId,
120 - pub app_id: SyncAppId,
121 - pub tier: String,
122 - pub app_name: String,
123 - }
124 -
125 - impl AppSyncCheckoutMetadata {
126 - pub fn from_metadata(meta: Option<&CheckoutMetaMap>) -> Result<Self> {
127 - let user_id: UserId = parse_uuid_to(require(meta, "user_id")?, "user_id")?;
128 - let app_id: SyncAppId = parse_uuid_to(require(meta, "app_id")?, "app_id")?;
129 - let tier = require(meta, "tier")?.clone();
130 - let app_name = require(meta, "app_name")?.clone();
131 - Ok(AppSyncCheckoutMetadata { user_id, app_id, tier, app_name })
132 - }
133 - }
134 -
135 116 /// Parsed metadata from a cart (multi-item) checkout session.
136 117 #[derive(Debug)]
137 118 pub struct CartCheckoutMetadata {
@@ -191,10 +172,6 @@ pub fn is_guest_checkout(meta: Option<&CheckoutMetaMap>) -> bool {
191 172 get_checkout_type(meta) == Some(CheckoutType::Guest)
192 173 }
193 174
194 - pub fn is_app_sync_checkout(meta: Option<&CheckoutMetaMap>) -> bool {
195 - get_checkout_type(meta) == Some(CheckoutType::AppSync)
196 - }
197 -
198 175 pub fn is_cart_checkout(meta: Option<&CheckoutMetaMap>) -> bool {
199 176 get_checkout_type(meta) == Some(CheckoutType::Cart)
200 177 }
@@ -430,12 +407,6 @@ mod tests {
430 407 }
431 408
432 409 #[test]
433 - fn is_app_sync_checkout_true() {
434 - let m = meta_of(&[("checkout_type", "app_sync")]);
435 - assert!(is_app_sync_checkout(Some(&m)));
436 - }
437 -
438 - #[test]
439 410 fn is_tip_checkout_true() {
440 411 let m = meta_of(&[("checkout_type", "tip")]);
441 412 assert!(is_tip_checkout(Some(&m)));
@@ -500,36 +471,6 @@ mod tests {
500 471 assert!(format!("{err:?}").contains("tier"));
501 472 }
502 473
503 - // --- AppSyncCheckoutMetadata ---
504 -
505 - #[test]
506 - fn app_sync_metadata_valid() {
507 - let u = UserId::new(); let a = SyncAppId::new();
508 - let m = meta_of(&[
509 - ("checkout_type", "app_sync"),
510 - ("user_id", &u.to_string()),
511 - ("app_id", &a.to_string()),
512 - ("tier", "basic"),
513 - ("app_name", "GoingsOn"),
514 - ]);
515 - let r = AppSyncCheckoutMetadata::from_metadata(Some(&m)).unwrap();
516 - assert_eq!(r.user_id, u);
517 - assert_eq!(r.app_id, a);
518 - assert_eq!(r.tier, "basic");
519 - assert_eq!(r.app_name, "GoingsOn");
520 - }
521 -
522 - #[test]
523 - fn app_sync_metadata_missing_app_name() {
524 - let m = meta_of(&[
525 - ("user_id", &UserId::new().to_string()),
526 - ("app_id", &SyncAppId::new().to_string()),
527 - ("tier", "basic"),
528 - ]);
529 - let err = AppSyncCheckoutMetadata::from_metadata(Some(&m)).unwrap_err();
530 - assert!(format!("{err:?}").contains("app_name"));
531 - }
532 -
533 474 // --- get_checkout_type ---
534 475
535 476 #[test]
@@ -540,7 +481,6 @@ mod tests {
540 481 ("creator_tier", CheckoutType::CreatorTier),
541 482 ("subscription", CheckoutType::Subscription),
542 483 ("guest", CheckoutType::Guest),
543 - ("app_sync", CheckoutType::AppSync),
544 484 ("cart", CheckoutType::Cart),
545 485 ] {
546 486 let m = meta_of(&[("checkout_type", s)]);
@@ -569,7 +509,6 @@ mod tests {
569 509 assert!(!is_creator_tier_checkout(meta));
570 510 assert!(!is_subscription_checkout(meta));
571 511 assert!(!is_guest_checkout(meta));
572 - assert!(!is_app_sync_checkout(meta));
573 512 assert!(!is_cart_checkout(meta));
574 513 }
575 514 }
@@ -1,13 +1,11 @@
1 - //! Connected account operations — onboarding, balance, product/price creation,
1 + //! Connected account operations: onboarding, balance, product/price creation,
2 2 //! subscription lifecycle, refunds, and billing portal.
3 3
4 4 use stripe::StripeRequest;
5 5 use stripe_billing::subscription::{
6 - CancelSubscription, ResumeSubscription, RetrieveSubscription,
7 - UpdateSubscription, UpdateSubscriptionItems, UpdateSubscriptionItemsPriceData,
8 - UpdateSubscriptionItemsPriceDataRecurring, UpdateSubscriptionItemsPriceDataRecurringInterval,
6 + CancelSubscription, ResumeSubscription,
7 + UpdateSubscription,
9 8 UpdateSubscriptionPauseCollection, UpdateSubscriptionPauseCollectionBehavior,
10 - UpdateSubscriptionProrationBehavior,
11 9 };
12 10 use stripe_billing::billing_portal_session::CreateBillingPortalSession;
13 11 use stripe_connect::account::{CreateAccount, CreateAccountType, RetrieveAccount};
@@ -225,75 +223,6 @@ impl StripeClient {
225 223 Ok(())
226 224 }
227 225
228 - /// Change the tier of a platform-level app sync subscription.
229 - ///
230 - /// rc.5's `UpdateSubscriptionItemsPriceData` requires an existing `product`
231 - /// id (no inline `product_data`), so we create a Product first and then
232 - /// swap the subscription item's price to a new inline Price referencing it.
233 - #[tracing::instrument(skip_all, name = "payments::update_app_sync_subscription_tier")]
234 - pub async fn update_app_sync_subscription_tier(
235 - &self,
236 - stripe_sub_id: &str,
237 - product_name: &str,
238 - price_cents: i64,
239 - interval: &str,
240 - ) -> Result<()> {
241 - let sub_id = parse_subscription_id(stripe_sub_id)?;
242 -
243 - // 1. Retrieve subscription to find the existing item ID.
244 - let sub = RetrieveSubscription::new(sub_id.clone())
245 - .send(&self.client)
246 - .await
247 - .map_err(|e| {
248 - tracing::error!(stripe_sub_id = %stripe_sub_id, error = ?e, "failed to retrieve subscription");
249 - AppError::Internal(anyhow::anyhow!("Failed to retrieve subscription"))
250 - })?;
251 -
252 - let item_id = sub.items.data.first().map(|it| it.id.to_string()).ok_or_else(|| {
253 - tracing::error!(stripe_sub_id = %stripe_sub_id, "subscription has no items");
254 - AppError::Internal(anyhow::anyhow!("Subscription has no items"))
255 - })?;
256 -
257 - // 2. Create a Product (no `Stripe-Account` — platform-mode subscription).
258 - let product = CreateProduct::new(product_name.to_string())
259 - .send(&self.client)
260 - .await
261 - .map_err(|e| {
262 - tracing::error!(stripe_sub_id = %stripe_sub_id, error = ?e, "failed to create product for tier change");
263 - AppError::Internal(anyhow::anyhow!("Failed to create product"))
264 - })?;
265 -
266 - // 3. Update the subscription item with inline price_data on the new product.
267 - let recurring_interval = match interval {
268 - "year" => UpdateSubscriptionItemsPriceDataRecurringInterval::Year,
269 - _ => UpdateSubscriptionItemsPriceDataRecurringInterval::Month,
270 - };
271 - let mut price_data = UpdateSubscriptionItemsPriceData::new(
272 - Currency::USD,
273 - product.id.to_string(),
274 - UpdateSubscriptionItemsPriceDataRecurring::new(recurring_interval),
275 - );
276 - price_data.unit_amount = Some(price_cents);
277 -
278 - let item = UpdateSubscriptionItems {
279 - id: Some(item_id),
280 - price_data: Some(price_data),
281 - ..UpdateSubscriptionItems::new()
282 - };
283 -
284 - UpdateSubscription::new(sub_id)
285 - .items(vec![item])
286 - .proration_behavior(UpdateSubscriptionProrationBehavior::CreateProrations)
287 - .send(&self.client)
288 - .await
289 - .map_err(|e| {
290 - tracing::error!(stripe_sub_id = %stripe_sub_id, error = ?e, "failed to update subscription tier");
291 - AppError::Internal(anyhow::anyhow!("Failed to update subscription tier"))
292 - })?;
293 -
294 - Ok(())
295 - }
296 -
297 226 /// Cancel a platform-level subscription (creator tier, Fan+).
298 227 #[tracing::instrument(skip_all, name = "payments::cancel_platform_subscription")]
299 228 pub async fn cancel_platform_subscription(&self, stripe_sub_id: &str) -> Result<()> {
@@ -17,10 +17,12 @@
17 17 mod checkout;
18 18 mod checkout_metadata;
19 19 mod connect;
20 + pub mod synckit_billing;
20 21 mod webhooks;
21 22
22 23 pub use checkout::*;
23 24 pub use checkout_metadata::*;
25 + pub use synckit_billing::SynckitSubResult;
24 26 pub use webhooks::*;
25 27
26 28 use stripe::Client;
@@ -53,13 +55,13 @@ impl StripeClient {
53 55
54 56 use crate::error::{AppError, Result};
55 57
56 - /// Simplified checkout result — what handlers actually need from Stripe sessions.
58 + /// Simplified checkout result: what handlers need from Stripe sessions.
57 59 pub struct CheckoutResult {
58 60 pub id: String,
59 61 pub url: Option<String>,
60 62 }
61 63
62 - /// Simplified balance — what handlers actually need from Stripe balance.
64 + /// Simplified balance: what handlers need from Stripe balance.
63 65 pub struct BalanceSummary {
64 66 pub available_cents: i64,
65 67 pub pending_cents: i64,
@@ -75,7 +77,6 @@ pub trait PaymentProvider: Send + Sync {
75 77 async fn create_tip_checkout_session(&self, params: &TipCheckoutParams<'_>) -> crate::error::Result<CheckoutResult>;
76 78 async fn create_fan_plus_checkout_session(&self, price_id: &str, user_id: crate::db::UserId, success_url: &str, cancel_url: &str) -> crate::error::Result<CheckoutResult>;
77 79 async fn create_creator_tier_checkout_session(&self, price_id: &str, user_id: crate::db::UserId, tier: &str, success_url: &str, cancel_url: &str) -> crate::error::Result<CheckoutResult>;
78 - async fn create_app_sync_checkout_session(&self, params: &AppSyncCheckoutParams<'_>) -> crate::error::Result<CheckoutResult>;
79 80 async fn create_cart_checkout_session(&self, params: &CartCheckoutParams<'_>) -> crate::error::Result<CheckoutResult>;
80 81
81 82 // Connect
@@ -91,8 +92,6 @@ pub trait PaymentProvider: Send + Sync {
91 92 async fn cancel_subscription(&self, stripe_sub_id: &str, connected_account_id: &str) -> crate::error::Result<()>;
92 93 /// Set or clear `cancel_at_period_end` on a fan subscription (for creator pause/resume).
93 94 async fn set_cancel_at_period_end(&self, stripe_sub_id: &str, connected_account_id: &str, cancel: bool) -> crate::error::Result<()>;
94 - /// Update the tier of an app sync subscription (swap price with proration).
95 - async fn update_app_sync_subscription_tier(&self, stripe_sub_id: &str, product_name: &str, price_cents: i64, interval: &str) -> crate::error::Result<()>;
96 95 /// Cancel a platform-level subscription (creator tier, Fan+). Not on a connected account.
97 96 async fn cancel_platform_subscription(&self, stripe_sub_id: &str) -> crate::error::Result<()>;
98 97 /// Set or clear `cancel_at_period_end` on a platform subscription (Fan+, creator tier).
@@ -106,6 +105,15 @@ pub trait PaymentProvider: Send + Sync {
106 105 // Webhooks
107 106 fn verify_webhook(&self, payload: &str, signature: &str) -> crate::error::Result<UntypedEvent>;
108 107 fn verify_webhook_v2(&self, payload: &str, signature: &str) -> crate::error::Result<serde_json::Value>;
108 +
109 + // SyncKit v2 developer billing — one customer + subscription per app,
110 + // separate from creator-tier and Fan+ subscriptions. See
111 + // `synckit_billing.rs` for the rationale on per-app customers.
112 + async fn create_synckit_customer(&self, developer_user_id: crate::db::UserId, email: &str, app_name: &str) -> crate::error::Result<String>;
113 + async fn create_synckit_subscription(&self, customer_id: &str, app_id: crate::db::SyncAppId, app_name: &str, price_cents: i64) -> crate::error::Result<SynckitSubResult>;
114 + async fn update_synckit_subscription_price(&self, subscription_id: &str, new_price_cents: i64, app_name: &str) -> crate::error::Result<()>;
115 + async fn cancel_synckit_subscription(&self, subscription_id: &str) -> crate::error::Result<()>;
116 + async fn create_synckit_billing_portal(&self, customer_id: &str, return_url: &str) -> crate::error::Result<String>;
109 117 }
110 118
111 119 #[async_trait::async_trait]
@@ -140,11 +148,6 @@ impl PaymentProvider for StripeClient {
140 148 Ok(CheckoutResult { id: session.id.to_string(), url: session.url })
141 149 }
142 150
143 - async fn create_app_sync_checkout_session(&self, params: &AppSyncCheckoutParams<'_>) -> crate::error::Result<CheckoutResult> {
144 - let session = StripeClient::create_app_sync_checkout_session(self, params).await?;
145 - Ok(CheckoutResult { id: session.id.to_string(), url: session.url })
146 - }
147 -
148 151 async fn create_cart_checkout_session(&self, params: &CartCheckoutParams<'_>) -> crate::error::Result<CheckoutResult> {
149 152 let session = StripeClient::create_cart_checkout_session(self, params).await?;
150 153 Ok(CheckoutResult { id: session.id.to_string(), url: session.url })
@@ -199,10 +202,6 @@ impl PaymentProvider for StripeClient {
199 202 StripeClient::set_cancel_at_period_end(self, stripe_sub_id, connected_account_id, cancel).await
200 203 }
201 204
202 - async fn update_app_sync_subscription_tier(&self, stripe_sub_id: &str, product_name: &str, price_cents: i64, interval: &str) -> crate::error::Result<()> {
203 - StripeClient::update_app_sync_subscription_tier(self, stripe_sub_id, product_name, price_cents, interval).await
204 - }
205 -
206 205 async fn cancel_platform_subscription(&self, stripe_sub_id: &str) -> crate::error::Result<()> {
207 206 StripeClient::cancel_platform_subscription(self, stripe_sub_id).await
208 207 }
@@ -226,4 +225,24 @@ impl PaymentProvider for StripeClient {
226 225 fn verify_webhook_v2(&self, payload: &str, signature: &str) -> crate::error::Result<serde_json::Value> {
227 226 StripeClient::verify_webhook_v2(self, payload, signature)
228 227 }
228 +
229 + async fn create_synckit_customer(&self, developer_user_id: crate::db::UserId, email: &str, app_name: &str) -> crate::error::Result<String> {
230 + StripeClient::create_synckit_customer(self, developer_user_id, email, app_name).await
231 + }
232 +
233 + async fn create_synckit_subscription(&self, customer_id: &str, app_id: crate::db::SyncAppId, app_name: &str, price_cents: i64) -> crate::error::Result<SynckitSubResult> {
234 + StripeClient::create_synckit_subscription(self, customer_id, app_id, app_name, price_cents).await
235 + }
236 +
237 + async fn update_synckit_subscription_price(&self, subscription_id: &str, new_price_cents: i64, app_name: &str) -> crate::error::Result<()> {
238 + StripeClient::update_synckit_subscription_price(self, subscription_id, new_price_cents, app_name).await
239 + }
240 +
241 + async fn cancel_synckit_subscription(&self, subscription_id: &str) -> crate::error::Result<()> {
242 + StripeClient::cancel_synckit_subscription(self, subscription_id).await
243 + }
244 +
245 + async fn create_synckit_billing_portal(&self, customer_id: &str, return_url: &str) -> crate::error::Result<String> {
246 + StripeClient::create_synckit_billing_portal(self, customer_id, return_url).await
247 + }
229 248 }
@@ -0,0 +1,256 @@
1 + //! Stripe wiring for SyncKit v2 developer billing.
2 + //!
3 + //! One Stripe Customer is created per sync app (not per developer's MNW
4 + //! account), because each app is billed independently. The subscription's
5 + //! metadata carries `synckit_app_id` so the webhook dispatcher can route
6 + //! events to the SyncKit billing path.
7 + //!
8 + //! Prices are created inline on the subscription via `price_data` rather than
9 + //! by pre-creating Stripe Price objects; this keeps the dashboard tidy and
10 + //! lets us re-price freely on every knob change. We do create a Stripe
11 + //! `Product` per app once (the SDK requires a product id even for inline
12 + //! price_data); the product is reused for subsequent re-prices.
13 +
14 + use std::collections::HashMap;
15 +
16 + use stripe_billing::subscription::{
17 + CancelSubscription, CreateSubscription, CreateSubscriptionItems,
18 + CreateSubscriptionItemsPriceData, CreateSubscriptionItemsPriceDataRecurring,
19 + CreateSubscriptionItemsPriceDataRecurringInterval, RetrieveSubscription, UpdateSubscription,
20 + UpdateSubscriptionItems, UpdateSubscriptionItemsPriceData, UpdateSubscriptionProrationBehavior,
21 + };
22 + use stripe_core::customer::CreateCustomer;
23 + use stripe_product::product::CreateProduct;
24 + use stripe_types::Currency;
25 +
26 + use super::StripeClient;
27 + use crate::db::{SyncAppId, UserId};
28 + use crate::error::{AppError, Result};
29 +
30 + /// Result of creating a SyncKit subscription. Carries enough information for
31 + /// the route handler to stamp the local `sync_apps` row in one go.
32 + pub struct SynckitSubResult {
33 + pub subscription_id: String,
34 + pub current_period_start: i64,
35 + pub current_period_end: i64,
36 + }
37 +
38 + fn parse_subscription_id(id: &str) -> Result<stripe_shared::SubscriptionId> {
39 + id.parse().map_err(|e| {
40 + AppError::Internal(anyhow::anyhow!(
41 + "Invalid Stripe subscription ID '{}': {}",
42 + id,
43 + e
44 + ))
45 + })
46 + }
47 +
48 + impl StripeClient {
49 + /// Create a Stripe Customer for a SyncKit app. The customer represents
50 + /// one app, not the developer's MNW account, because each app is billed
51 + /// independently. Metadata pins the customer to both developer and app
52 + /// for audit-trail visibility in the Stripe dashboard.
53 + #[tracing::instrument(skip_all, name = "payments::create_synckit_customer")]
54 + pub async fn create_synckit_customer(
55 + &self,
56 + developer_user_id: UserId,
57 + email: &str,
58 + app_name: &str,
59 + ) -> Result<String> {
60 + let mut metadata = HashMap::new();
61 + metadata.insert("mnw_user_id".to_string(), developer_user_id.to_string());
62 + metadata.insert("synckit_app_name".to_string(), app_name.to_string());
63 +
64 + let customer = CreateCustomer::new()
65 + .email(email.to_string())
66 + .name(format!("SyncKit — {app_name}"))
67 + .metadata(metadata)
68 + .send(&self.client)
69 + .await
70 + .map_err(|e| {
71 + tracing::error!(error = ?e, "failed to create SyncKit Stripe customer");
72 + AppError::Internal(anyhow::anyhow!("Failed to create Stripe customer"))
73 + })?;
74 +
75 + Ok(customer.id.to_string())
76 + }
77 +
78 + /// Create a Stripe Product for a SyncKit app. Called once during billing
79 + /// activation; the same product is reused on re-price.
80 + async fn create_synckit_product(&self, app_name: &str) -> Result<String> {
81 + let product = CreateProduct::new(format!("SyncKit — {app_name}"))
82 + .send(&self.client)
83 + .await
84 + .map_err(|e| {
85 + tracing::error!(error = ?e, "failed to create SyncKit Stripe product");
86 + AppError::Internal(anyhow::anyhow!("Failed to create Stripe product"))
87 + })?;
88 + Ok(product.id.to_string())
89 + }
90 +
91 + /// Create a monthly recurring subscription for a SyncKit app. The price
92 + /// is created inline via `price_data` (`unit_amount = price_cents`,
93 + /// `interval = month`, `currency = usd`). Metadata `synckit_app_id` lets
94 + /// the webhook dispatcher distinguish these from creator-tier / Fan+
95 + /// subscriptions.
96 + #[tracing::instrument(skip_all, name = "payments::create_synckit_subscription")]
97 + pub async fn create_synckit_subscription(
98 + &self,
99 + customer_id: &str,
100 + app_id: SyncAppId,
101 + app_name: &str,
102 + price_cents: i64,
103 + ) -> Result<SynckitSubResult> {
104 + if price_cents <= 0 {
105 + return Err(AppError::BadRequest(
106 + "Subscription price must be positive".to_string(),
107 + ));
108 + }
109 +
110 + // We need a Product id to use inline price_data; create one per app.
111 + let product_id = self.create_synckit_product(app_name).await?;
112 +
113 + let price_data = CreateSubscriptionItemsPriceData {
114 + currency: Currency::USD,
115 + product: product_id,
116 + recurring: CreateSubscriptionItemsPriceDataRecurring::new(
117 + CreateSubscriptionItemsPriceDataRecurringInterval::Month,
118 + ),
119 + tax_behavior: None,
120 + unit_amount: Some(price_cents),
121 + unit_amount_decimal: None,
122 + };
123 +
124 + let mut item = CreateSubscriptionItems::new();
125 + item.price_data = Some(price_data);
126 +
127 + let mut metadata = HashMap::new();
128 + metadata.insert("synckit_app_id".to_string(), app_id.to_string());
129 +
130 + let subscription = CreateSubscription::new()
131 + .customer(customer_id.to_string())
132 + .items(vec![item])
133 + .metadata(metadata)
134 + .send(&self.client)
135 + .await
136 + .map_err(|e| {
137 + tracing::error!(error = ?e, app_id = %app_id, "failed to create SyncKit subscription");
138 + AppError::Internal(anyhow::anyhow!("Failed to create Stripe subscription"))
139 + })?;
140 +
141 + let first_item = subscription.items.data.first().ok_or_else(|| {
142 + AppError::Internal(anyhow::anyhow!("Stripe subscription has no items"))
143 + })?;
144 +
145 + Ok(SynckitSubResult {
146 + subscription_id: subscription.id.to_string(),
147 + current_period_start: first_item.current_period_start,
148 + current_period_end: first_item.current_period_end,
149 + })
150 + }
151 +
152 + /// Re-price a SyncKit subscription. Fetches the existing subscription to
153 + /// learn its item id, then attaches a new inline `price_data` with the
154 + /// new amount. Prorations are turned on (`create_prorations`) so the
155 + /// developer is credited / charged the difference on the next invoice.
156 + #[tracing::instrument(skip_all, name = "payments::update_synckit_subscription_price")]
157 + pub async fn update_synckit_subscription_price(
158 + &self,
159 + subscription_id: &str,
160 + new_price_cents: i64,
161 + app_name: &str,
162 + ) -> Result<()> {
163 + if new_price_cents <= 0 {
164 + return Err(AppError::BadRequest(
165 + "Subscription price must be positive".to_string(),
166 + ));
167 + }
168 +
169 + let sub_id = parse_subscription_id(subscription_id)?;
170 +
171 + // Need the existing subscription item id to update its price.
172 + let existing = RetrieveSubscription::new(sub_id.clone())
173 + .send(&self.client)
174 + .await
175 + .map_err(|e| {
176 + tracing::error!(error = ?e, subscription_id = %subscription_id, "failed to retrieve SyncKit subscription");
177 + AppError::Internal(anyhow::anyhow!("Failed to retrieve Stripe subscription"))
178 + })?;
179 +
180 + let existing_item = existing.items.data.first().ok_or_else(|| {
181 + AppError::Internal(anyhow::anyhow!(
182 + "Stripe subscription {} has no items",
183 + subscription_id
184 + ))
185 + })?;
186 +
187 + // Reuse the existing item's product so we don't accumulate orphans.
188 + let product_id = existing_item.price.product.id().to_string();
189 +
190 + let new_price_data = UpdateSubscriptionItemsPriceData {
191 + currency: Currency::USD,
192 + product: product_id,
193 + recurring: stripe_billing::subscription::UpdateSubscriptionItemsPriceDataRecurring::new(
194 + stripe_billing::subscription::UpdateSubscriptionItemsPriceDataRecurringInterval::Month,
195 + ),
196 + tax_behavior: None,
197 + unit_amount: Some(new_price_cents),
198 + unit_amount_decimal: None,
199 + };
200 +
201 + let mut item = UpdateSubscriptionItems::default();
202 + item.id = Some(existing_item.id.to_string());
203 + item.price_data = Some(new_price_data);
204 +
205 + // The product name (which surfaces on the Stripe dashboard for this
206 + // product) is set once at create-time. Re-naming is a separate Stripe
207 + // call we currently don't need — record the param so future re-naming
208 + // hooks have it without changing the trait signature.
209 + let _ = app_name;
210 +
211 + UpdateSubscription::new(sub_id)
212 + .items(vec![item])
213 + .proration_behavior(UpdateSubscriptionProrationBehavior::CreateProrations)
214 + .send(&self.client)
215 + .await
216 + .map_err(|e| {
217 + tracing::error!(error = ?e, subscription_id = %subscription_id, "failed to update SyncKit subscription price");
218 + AppError::Internal(anyhow::anyhow!("Failed to update Stripe subscription"))
219 + })?;
220 +
221 + Ok(())
222 + }
223 +
224 + /// Cancel a SyncKit subscription immediately.
225 + ///
226 + /// We cancel immediately (rather than at_period_end=true) because the
227 + /// developer is paying for cloud resources we'll stop providing the
228 + /// moment the app is canceled. Holding the subscription open for a few
229 + /// extra weeks would let the developer keep billing accruing against a
230 + /// dead app, worse for everyone.
231 + #[tracing::instrument(skip_all, name = "payments::cancel_synckit_subscription")]
232 + pub async fn cancel_synckit_subscription(&self, subscription_id: &str) -> Result<()> {
233 + let sub_id = parse_subscription_id(subscription_id)?;
234 + CancelSubscription::new(sub_id)
235 + .send(&self.client)
236 + .await
237 + .map_err(|e| {
238 + tracing::error!(error = ?e, subscription_id = %subscription_id, "failed to cancel SyncKit subscription");
239 + AppError::Internal(anyhow::anyhow!("Failed to cancel Stripe subscription"))
240 + })?;
241 + Ok(())
242 + }
243 +
244 + /// Open a Stripe billing portal session for the SyncKit app's customer.
245 + /// Reuses the platform-level billing portal pattern.
246 + #[tracing::instrument(skip_all, name = "payments::create_synckit_billing_portal")]
247 + pub async fn create_synckit_billing_portal(
248 + &self,
249 + customer_id: &str,
250 + return_url: &str,
251 + ) -> Result<String> {
252 + // Identical to the creator-tier / Fan+ billing portal path — kept as
253 + // a separate method so the trait surface mirrors the SyncKit domain.
254 + self.create_billing_portal_session(customer_id, return_url).await
255 + }
256 + }
@@ -68,11 +68,11 @@ impl StripeClient {
68 68 /// every secret is the only option.
69 69 ///
70 70 /// On failure the returned `AppError::BadRequest` body is specific enough
71 - /// to distinguish signature failures ("Invalid webhook signature: …") from
72 - /// payload-shape failures ("Webhook envelope JSON parse failed: …",
73 - /// "Webhook envelope missing required field: …"). The Stripe Dashboard
74 - /// surfaces these bodies for failed webhook deliveries, so wording matters
75 - /// — past incidents (Stripe API version mismatch producing serde
71 + /// to distinguish signature failures ("Invalid webhook signature: ...") from
72 + /// payload-shape failures ("Webhook envelope JSON parse failed: ...",
73 + /// "Webhook envelope missing required field: ..."). The Stripe Dashboard
74 + /// surfaces these bodies for failed webhook deliveries, so wording matters.
75 + /// Past incidents (Stripe API version mismatch producing serde
76 76 /// `missing field` errors) were initially misread as signature failures.
77 77 #[tracing::instrument(skip_all, name = "payments::verify_webhook")]
78 78 pub fn verify_webhook(&self, payload: &str, signature: &str) -> Result<UntypedEvent> {
@@ -109,7 +109,7 @@ impl StripeClient {
109 109 }
110 110 }
111 111
112 - /// Narrow view of a CheckoutSession — only the fields any handler reads.
112 + /// Narrow view of a CheckoutSession: only the fields any handler reads.
113 113 ///
114 114 /// Built ad-hoc rather than via `stripe_shared::CheckoutSession` to stay
115 115 /// resilient against new required fields Stripe adds. The original migration
@@ -134,7 +134,7 @@ pub struct CheckoutCustomerDetailsView {
134 134 pub email: Option<String>,
135 135 }
136 136
137 - /// Narrow view of a Subscription — id, status, cancellation flag, and the
137 + /// Narrow view of a Subscription: id, status, cancellation flag, and the
138 138 /// item-level period fields rc.5 promoted from the top level.
139 139 #[derive(Debug, serde::Deserialize)]
140 140 pub struct SubscriptionView {
@@ -167,7 +167,7 @@ pub struct SubscriptionItemView {
167 167 pub current_period_end: i64,
168 168 }
169 169
170 - /// Narrow view of an Invoice — subscription id (via legacy `subscription` or
170 + /// Narrow view of an Invoice: subscription id (via legacy `subscription` or
171 171 /// the rc.5 `parent.subscription_details.subscription` path), period bounds,
172 172 /// and billing reason.
173 173 #[derive(Debug, serde::Deserialize)]
@@ -249,7 +249,7 @@ impl From<stripe_shared::Account> for AccountUpdate {
249 249 }
250 250 }
251 251
252 - /// Narrow view of an Account — only the fields we react to.
252 + /// Narrow view of an Account: only the fields we react to.
253 253 #[derive(Debug, serde::Deserialize)]
254 254 pub struct AccountView {
255 255 pub id: String,
@@ -297,7 +297,7 @@ impl ChargeRefundData {
297 297 }
298 298
299 299 /// Build from a parsed charge view. Returns None when there is no
300 - /// payment_intent — these events are out of scope here.
300 + /// payment_intent; these events are out of scope here.
301 301 pub fn from_view(charge: ChargeView) -> Option<Self> {
302 302 Some(ChargeRefundData {
303 303 payment_intent_id: charge.payment_intent?,
@@ -311,7 +311,7 @@ impl ChargeRefundData {
311 311 // v2 thin event types
312 312 // ---------------------------------------------------------------------------
313 313
314 - /// A Stripe v2 "thin" event — contains only the event type and a reference to
314 + /// A Stripe v2 "thin" event: contains only the event type and a reference to
315 315 /// the related object, not the full object snapshot.
316 316 #[derive(Debug, serde::Deserialize)]
317 317 pub struct ThinEvent {
@@ -9,6 +9,7 @@ mod announcements;
9 9 mod cleanup;
10 10 mod integrity;
11 11 mod mt_threads;
12 + mod synckit_warnings;
12 13 mod webhooks;
13 14
14 15 use tokio::sync::watch;
@@ -35,14 +36,19 @@ const DAILY_INTERVAL: u64 = 1440;
35 36 /// Sandbox cleanup interval in scheduler ticks (5 = 5min at 60s).
36 37 const SANDBOX_CLEANUP_INTERVAL: u64 = 5;
37 38
39 + /// Hourly interval in scheduler ticks (60 = 1h at 60s). Used by the SyncKit
40 + /// usage-warning job.
41 + const HOURLY_INTERVAL: u64 = 60;
42 +
38 43 /// Determine which scheduled job groups should run for a given tick.
39 44 ///
40 - /// Returns `(sandbox_cleanup, daily_jobs, weekly_jobs)`.
41 - fn jobs_for_tick(tick: u64) -> (bool, bool, bool) {
45 + /// Returns `(sandbox_cleanup, hourly_jobs, daily_jobs, weekly_jobs)`.
46 + fn jobs_for_tick(tick: u64) -> (bool, bool, bool, bool) {
42 47 let sandbox = tick.is_multiple_of(SANDBOX_CLEANUP_INTERVAL);
48 + let hourly = tick.is_multiple_of(HOURLY_INTERVAL);
43 49 let daily = tick == 1 || tick.is_multiple_of(DAILY_INTERVAL);
44 50 let weekly = tick.is_multiple_of(DRIFT_CORRECTION_INTERVAL);
45 - (sandbox, daily, weekly)
51 + (sandbox, hourly, daily, weekly)
46 52 }
47 53
48 54 /// Spawn the background scheduler loop. Drop `shutdown_tx` to stop it.
@@ -151,7 +157,7 @@ pub fn spawn_scheduler(
151 157 tracing::error!(error = ?e, "failed to clean up expired idempotency keys");
152 158 }
153 159
154 - let (run_sandbox, run_daily, run_weekly) = jobs_for_tick(tick_count);
160 + let (run_sandbox, run_hourly, run_daily, run_weekly) = jobs_for_tick(tick_count);
155 161
156 162 // Clean up expired sandbox accounts (every 5 ticks = 5 min at 60s interval)
157 163 if run_sandbox {
@@ -171,10 +177,24 @@ pub fn spawn_scheduler(
171 177 // Delete S3 objects from presigned uploads that were never confirmed (>24h)
172 178 cleanup::cleanup_orphaned_uploads(&state).await;
173 179
180 + // Hourly: scan SyncKit apps for 75/90/100% cap breaches and email
181 + // the app owner. Cheap query — single JOIN on a small table.
182 + if run_hourly {
183 + synckit_warnings::check_and_send_warnings(&state).await;
184 + }
185 +
174 186 // Weekly storage drift correction + integrity checks
175 187 if run_weekly {
176 188 integrity::recalculate_all_storage_used(&state).await;
177 189 integrity::check_sales_count_drift(&state).await;
190 + match db::synckit_billing::recalculate_synckit_app_storage(&state.db).await {
191 + Ok(n) => {
192 + if n > 0 {
193 + tracing::info!(corrected = n, "synckit app storage drift corrected");
194 + }
195 + }
196 + Err(e) => tracing::error!(error = ?e, "synckit storage drift correction failed"),
197 + }
178 198 }
179 199
180 200 // Daily checks (every 1440 ticks at 60s interval, plus first tick after startup)
@@ -235,7 +255,7 @@ mod tests {
235 255
236 256 #[test]
237 257 fn tick_1_runs_daily_not_weekly() {
238 - let (sandbox, daily, weekly) = jobs_for_tick(1);
258 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(1);
239 259 assert!(!sandbox, "tick 1 is not a multiple of 5");
240 260 assert!(daily, "tick 1 should trigger daily jobs (first-tick rule)");
241 261 assert!(!weekly, "tick 1 should not trigger weekly jobs");
@@ -243,7 +263,7 @@ mod tests {
243 263
244 264 #[test]
245 265 fn tick_5_runs_sandbox_cleanup() {
246 - let (sandbox, daily, weekly) = jobs_for_tick(5);
266 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(5);
247 267 assert!(sandbox);
248 268 assert!(!daily);
249 269 assert!(!weekly);
@@ -251,7 +271,7 @@ mod tests {
251 271
252 272 #[test]
253 273 fn tick_1440_runs_daily_and_sandbox() {
254 - let (sandbox, daily, weekly) = jobs_for_tick(1440);
274 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(1440);
255 275 assert!(sandbox, "1440 is divisible by 5");
256 276 assert!(daily, "1440 is the daily interval");
257 277 assert!(!weekly);
@@ -259,7 +279,7 @@ mod tests {
259 279
260 280 #[test]
261 281 fn tick_10080_runs_all_three() {
262 - let (sandbox, daily, weekly) = jobs_for_tick(10_080);
282 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(10_080);
263 283 assert!(sandbox, "10080 is divisible by 5");
264 284 assert!(daily, "10080 is divisible by 1440");
265 285 assert!(weekly, "10080 is the weekly interval");
@@ -267,7 +287,7 @@ mod tests {
267 287
268 288 #[test]
269 289 fn normal_tick_runs_nothing_special() {
270 - let (sandbox, daily, weekly) = jobs_for_tick(7);
290 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(7);
271 291 assert!(!sandbox);
272 292 assert!(!daily);
273 293 assert!(!weekly);
@@ -275,13 +295,13 @@ mod tests {
275 295
276 296 #[test]
277 297 fn second_daily_tick() {
278 - let (_, daily, _) = jobs_for_tick(2880);
298 + let (_, _, daily, _) = jobs_for_tick(2880);
279 299 assert!(daily, "2880 = 2 * 1440");
280 300 }
281 301
282 302 #[test]
283 303 fn second_weekly_tick() {
284 - let (_, _, weekly) = jobs_for_tick(20_160);
304 + let (_, _, _, weekly) = jobs_for_tick(20_160);
285 305 assert!(weekly, "20160 = 2 * 10080");
286 306 }
287 307
@@ -309,7 +329,7 @@ mod tests {
309 329 // Tick 0 should not run anything: 0 % N == 0 for all N,
310 330 // but tick 0 never happens in practice (counter starts at 0, increments before use).
311 331 // Test what would happen if it did.
312 - let (sandbox, daily, weekly) = jobs_for_tick(0);
332 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(0);
313 333 // 0.is_multiple_of(N) is true for all N, so these all fire
314 334 assert!(sandbox, "0 is a multiple of 5");
315 335 assert!(daily, "0 is a multiple of 1440");
@@ -320,13 +340,13 @@ mod tests {
320 340 fn large_tick_values() {
321 341 // 52 weeks of ticks — exact multiple of weekly interval
322 342 let fifty_two_weeks = 52 * DRIFT_CORRECTION_INTERVAL;
323 - let (sandbox, daily, weekly) = jobs_for_tick(fifty_two_weeks);
343 + let (sandbox, _hourly, daily, weekly) = jobs_for_tick(fifty_two_weeks);
324 344 assert!(sandbox);
325 345 assert!(daily, "{} should be divisible by 1440", fifty_two_weeks);
326 346 assert!(weekly, "{} should be divisible by 10080", fifty_two_weeks);
327 347
328 348 // Large non-aligned tick
329 - let (_, daily, weekly) = jobs_for_tick(999_999);
349 + let (_, _, daily, weekly) = jobs_for_tick(999_999);
330 350 assert!(!daily, "999999 is not divisible by 1440");
331 351 assert!(!weekly, "999999 is not divisible by 10080");
332 352 }
@@ -334,21 +354,21 @@ mod tests {
334 354 #[test]
335 355 fn daily_not_on_partial_day() {
336 356 // Tick 720 = half a day, should not trigger daily
337 - let (_, daily, _) = jobs_for_tick(720);
357 + let (_, _, daily, _) = jobs_for_tick(720);
338 358 assert!(!daily, "720 ticks is only half a day");
339 359 }
340 360
341 361 #[test]
342 362 fn weekly_not_on_partial_week() {
343 363 // 5040 = 3.5 days, should not trigger weekly
344 - let (_, _, weekly) = jobs_for_tick(5040);
364 + let (_, _, _, weekly) = jobs_for_tick(5040);
345 365 assert!(!weekly, "5040 is only 3.5 days");
346 366 }
347 367
348 368 #[test]
349 369 fn sandbox_every_5_ticks_consecutively() {
350 370 for tick in 1..=25 {
351 - let (sandbox, _, _) = jobs_for_tick(tick);
371 + let (sandbox, _, _, _) = jobs_for_tick(tick);
352 372 if tick % 5 == 0 {
353 373 assert!(sandbox, "tick {} should run sandbox cleanup", tick);
354 374 } else {
@@ -0,0 +1,143 @@
1 + //! Hourly SyncKit usage warning emails.
2 + //!
3 + //! Walks every `active`, non-internal sync app and computes the highest
4 + //! warning band (75/90/100%) currently breached above the previously-stamped
5 + //! `last_warning_pct`. For each breach, emails the app owner and stamps the
6 + //! threshold so we don't re-fire on subsequent ticks.
7 + //!
8 + //! Thresholds reset at period rollover via `reset_period_usage` (called from
9 + //! the `invoice.paid` webhook handler), so each billing cycle can fire warnings
10 + //! again from scratch.
11 + //!
12 + //! Storage breaches do NOT reset at period rollover (the cap is absolute, not
13 + //! per-period), but `last_warning_pct` is shared across both dimensions for
14 + //! simplicity. A creator who silenced storage warnings at 75% and then crosses
15 + //! egress 75% in the same cycle won't get re-notified for egress 75% — they'll
16 + //! get the next band (90%/100%) instead. This is an acceptable v1 quirk; if
17 + //! it bites we'll split `last_warning_pct` into two columns.
18 +
19 + use crate::db::synckit_billing;
20 + use crate::AppState;
21 +
22 + /// Run one pass: fetch breach candidates, send warning emails, update
23 + /// `last_warning_pct`. Errors on individual apps are logged and skipped.
24 + #[tracing::instrument(skip_all)]
25 + pub(super) async fn check_and_send_warnings(state: &AppState) {
26 + let candidates = match synckit_billing::get_apps_needing_warning(&state.db).await {
27 + Ok(c) => c,
28 + Err(e) => {
29 + tracing::error!(error = ?e, "synckit warnings: query failed");
30 + return;
31 + }
32 + };
33 +
34 + if candidates.is_empty() { return; }
35 + tracing::info!(count = candidates.len(), "synckit warnings: candidates");
36 +
37 + for c in candidates {
38 + let url = format!(
39 + "{}/sync/apps/{}/billing",
40 + state.config.host_url, c.app_id,
41 + );
42 + if let Err(e) = state.email.send_synckit_usage_warning(
43 + &c.creator_email,
44 + &c.app_name,
45 + c.dimension,
46 + c.threshold_pct,
47 + c.used,
48 + c.limit,
49 + &url,
50 + ).await {
51 + tracing::error!(
52 + error = ?e,
53 + app_id = %c.app_id,
54 + dimension = c.dimension,
55 + pct = c.threshold_pct,
56 + "synckit warnings: send failed",
57 + );
58 + continue;
59 + }
60 + if let Err(e) = synckit_billing::update_warning_pct(
61 + &state.db, c.app_id, c.threshold_pct,
62 + ).await {
63 + tracing::error!(
64 + error = ?e,
65 + app_id = %c.app_id,
66 + "synckit warnings: stamp failed (will re-fire next tick)",
67 + );
68 + }
69 + }
70 + }
71 +
72 + #[cfg(test)]
73 + mod tests {
74 + use crate::db::synckit_billing::highest_breached_threshold;
75 +
76 + #[test]
77 + fn no_breach_below_first_threshold() {
78 + // 50% used, no warning yet
79 + assert!(highest_breached_threshold(50, 100, 0).is_none());
80 + }
81 +
82 + #[test]
83 + fn first_breach_at_75() {
84 + assert_eq!(highest_breached_threshold(75, 100, 0), Some(75));
85 + assert_eq!(highest_breached_threshold(80, 100, 0), Some(75));
86 + }
87 +
88 + #[test]
89 + fn no_re_fire_at_same_band() {
90 + // 80% used, already warned at 75% — don't fire again at 75%.
91 + assert!(highest_breached_threshold(80, 100, 75).is_none());
92 + }
93 +
94 + #[test]
95 + fn next_band_fires_after_previous() {
96 + // 90% used, last fired at 75% — fire 90%.
97 + assert_eq!(highest_breached_threshold(90, 100, 75), Some(90));
98 + }
99 +
100 + #[test]
101 + fn jumps_to_highest_band_only() {
102 + // 100% used after only firing 75% — fire 100%, skip 90%.
103 + assert_eq!(highest_breached_threshold(100, 100, 75), Some(100));
104 + }
105 +
106 + #[test]
107 + fn overshoot_caps_at_100() {
108 + // 150% used, none fired — still goes straight to 100% (highest band).
109 + assert_eq!(highest_breached_threshold(150, 100, 0), Some(100));
110 + }
111 +
112 + #[test]
113 + fn nothing_above_100_fires() {
114 + // Already at 100%; further growth doesn't trigger anything new.
115 + assert!(highest_breached_threshold(200, 100, 100).is_none());
116 + }
117 +
118 + #[test]
119 + fn zero_limit_is_safe() {
120 + // Defensive: don't divide by zero.
121 + assert!(highest_breached_threshold(50, 0, 0).is_none());
122 + }
123 +
124 + #[test]
125 + fn exactly_75_percent() {
126 + // Floor of 75.0 is 75, should breach the 75 band.
127 + assert_eq!(highest_breached_threshold(75, 100, 0), Some(75));
128 + }
129 +
130 + #[test]
131 + fn just_under_75_does_not_fire() {
132 + // 74.99% — floor is 74, below the 75 threshold.
133 + assert!(highest_breached_threshold(74, 100, 0).is_none());
134 + }
135 +
136 + #[test]
137 + fn large_bytes_no_overflow() {
138 + // 800 GiB of 1 TiB, no warning yet.
139 + let used: i64 = 800 * 1024 * 1024 * 1024;
140 + let limit: i64 = 1024 * 1024 * 1024 * 1024;
141 + assert_eq!(highest_breached_threshold(used, limit, 0), Some(75));
142 + }
143 + }
@@ -0,0 +1,120 @@
1 + //! SyncKit developer billing: pricing formula and constants.
2 + //!
3 + //! See migration 117 for the full pricing model. Summary:
4 + //!
5 + //! price = max($5 floor, storage_gb × $0.013
6 + //! + storage_gb × egress_multiple × $0.0022
7 + //! + key_cap × $0.02 (per_key mode only))
8 + //!
9 + //! Constants live here, not in the DB — tuning them is a code change, not a
10 + //! migration. Costs are pinned to Hetzner Object Storage marginal rates at
11 + //! roughly 2× margin (see CLAUDE memory `project_synckit_pricing.md` if/when
12 + //! it gets written).
13 +
14 + /// Pro-rated minimum monthly charge per app, in cents.
15 + pub const BASE_FLOOR_CENTS: i64 = 500;
16 +
17 + /// Storage rate in cents per GB per month (2× Hetzner Object Storage cost).
18 + pub const STORAGE_RATE_CENTS_PER_GB: f64 = 1.3;
19 +
20 + /// Egress rate in cents per GB of monthly egress quota (2× Hetzner overage).
21 + pub const EGRESS_RATE_CENTS_PER_GB: f64 = 0.22;
22 +
23 + /// Per-key rate in cents per month (per_key enforcement mode only).
24 + pub const KEY_RATE_CENTS: f64 = 2.0;
25 +
26 + /// Warning thresholds (percent of any cap). Matches CHECK constraint on
27 + /// `sync_app_usage_current.last_warning_pct`.
28 + pub const WARNING_THRESHOLDS_PCT: &[i16] = &[75, 90, 100];
29 +
30 + /// Compute the monthly Stripe invoice amount in cents for a given knob set.
31 + ///
32 + /// `key_cap` is `Some(n)` only when the app's `enforcement_mode = 'per_key'`.
33 + /// The base floor is pro-rated: small accounts pay $5, larger accounts pay
34 + /// only their accrued usage with no separate base line.
35 + pub fn monthly_price_cents(
36 + storage_gb: u32,
37 + egress_multiple: f64,
38 + key_cap: Option<u32>,
39 + ) -> i64 {
40 + let storage = f64::from(storage_gb) * STORAGE_RATE_CENTS_PER_GB;
41 + let egress_quota_gb = f64::from(storage_gb) * egress_multiple;
42 + let egress = egress_quota_gb * EGRESS_RATE_CENTS_PER_GB;
43 + let keys = key_cap.map_or(0.0, |k| f64::from(k) * KEY_RATE_CENTS);
44 + let usage = storage + egress + keys;
45 + let floored = usage.max(BASE_FLOOR_CENTS as f64);
46 + floored.ceil() as i64
47 + }
48 +
49 + /// Egress quota in bytes for the given knobs.
50 + pub fn egress_quota_bytes(storage_gb: u32, egress_multiple: f64) -> i64 {
51 + let gb = f64::from(storage_gb) * egress_multiple;
52 + (gb * 1024.0 * 1024.0 * 1024.0).ceil() as i64
53 + }
54 +
55 + /// Storage cap in bytes for the given GB cap.
56 + pub fn storage_cap_bytes(storage_gb: u32) -> i64 {
57 + i64::from(storage_gb) * 1024 * 1024 * 1024
58 + }
59 +
60 + #[cfg(test)]
61 + mod tests {
62 + use super::*;
63 +
64 + fn approx(a: i64, b: i64) -> bool {
65 + (a - b).abs() <= 1
66 + }
67 +
68 + #[test]
69 + fn floor_kicks_in_for_small_accounts() {
70 + // 10 GB, 3× egress → usage ≈ $0.20, hits floor.
71 + assert_eq!(monthly_price_cents(10, 3.0, None), 500);
72 + // 100 GB, 10× egress → usage ≈ $3.50, still hits floor.
73 + assert_eq!(monthly_price_cents(100, 10.0, None), 500);
74 + }
75 +
76 + #[test]
77 + fn floor_disappears_when_usage_exceeds() {
78 + // 500 GB, 5× egress → storage $6.50 + egress $5.50 = $12.00.
79 + assert!(approx(monthly_price_cents(500, 5.0, None), 1200));
80 + // 1 TB, 10× egress → storage $13 + egress $22 = $35.
81 + assert!(approx(monthly_price_cents(1024, 10.0, None), 3585));
82 + }
83 +
84 + #[test]
85 + fn heavy_workload_pricing() {
86 + // 10 TB, 30× egress.
87 + // storage = 10240 × 1.3 = 13312 ¢
88 + // egress = 10240 × 30 × 0.22 = 67584 ¢
89 + // total ≈ $808
90 + let p = monthly_price_cents(10240, 30.0, None);
91 + assert!(p > 80_000 && p < 82_000, "got {p}");
92 + }
93 +
94 + #[test]
95 + fn key_cap_adds_to_usage_in_per_key_mode() {
96 + // 100 GB, 10× egress, 1000 keys → usage $3.50 + $20 = $23.50.
97 + let p = monthly_price_cents(100, 10.0, Some(1000));
98 + assert!(approx(p, 2350));
99 + }
100 +
101 + #[test]
102 + fn key_cap_can_push_through_floor() {
103 + // 10 GB, 1× egress, 300 keys → keys alone = $6, exceeds floor.
104 + let p = monthly_price_cents(10, 1.0, Some(300));
105 + assert!(p > 500, "got {p}");
106 + }
107 +
108 + #[test]
109 + fn fractional_multiple_works() {
110 + // 100 GB, 2.5× egress: storage $1.30 + egress $0.55 = $1.85 → floor.
111 + assert_eq!(monthly_price_cents(100, 2.5, None), 500);
112 + }
113 +
114 + #[test]
115 + fn quota_calc_in_bytes() {
116 + // 10 GB at 3× = 30 GB egress quota.
117 + assert_eq!(egress_quota_bytes(10, 3.0), 30 * 1024 * 1024 * 1024);
118 + assert_eq!(storage_cap_bytes(10), 10 * 1024 * 1024 * 1024);
119 + }
120 + }