max / makenotwork
37 files changed,
+1146 insertions,
-114 deletions
| @@ -3551,7 +3551,7 @@ dependencies = [ | |||
| 3551 | 3551 | ||
| 3552 | 3552 | [[package]] | |
| 3553 | 3553 | name = "makenotwork" | |
| 3554 | - | version = "0.7.3" | |
| 3554 | + | version = "0.8.0" | |
| 3555 | 3555 | dependencies = [ | |
| 3556 | 3556 | "anyhow", | |
| 3557 | 3557 | "argon2", |
| @@ -1,6 +1,6 @@ | |||
| 1 | 1 | [package] | |
| 2 | 2 | name = "makenotwork" | |
| 3 | - | version = "0.7.3" | |
| 3 | + | version = "0.8.0" | |
| 4 | 4 | edition = "2024" | |
| 5 | 5 | license-file = "LICENSE" | |
| 6 | 6 |
| @@ -0,0 +1,40 @@ | |||
| 1 | + | -- SyncKit per-key storage attribution. | |
| 2 | + | -- | |
| 3 | + | -- Migration 117 introduced enforcement_mode = 'per_key' but tracked storage | |
| 4 | + | -- only at the app level. As a result, a full key in per_key mode would | |
| 5 | + | -- degrade the whole app instead of just that key. This migration adds the | |
| 6 | + | -- per-key counter table and the `key` column on sync_blobs that lets the | |
| 7 | + | -- weekly drift job reconcile per-key totals from the authoritative blob set. | |
| 8 | + | -- | |
| 9 | + | -- JWT minting now requires a `key` claim (see `synckit_auth.rs`), so every | |
| 10 | + | -- new blob upload knows its key. Existing sync_blobs rows are first-party | |
| 11 | + | -- internal app data (GO/BB/AF) — we don't need to preserve them and a | |
| 12 | + | -- backfill default would not be meaningful, so we drop and recreate. | |
| 13 | + | ||
| 14 | + | -- ── Per-key live counters ── | |
| 15 | + | -- One row per (app_id, key). bytes_stored is the running total maintained | |
| 16 | + | -- by the storage layer; the weekly drift job reconciles from sync_blobs. | |
| 17 | + | CREATE TABLE sync_key_usage_current ( | |
| 18 | + | app_id UUID NOT NULL REFERENCES sync_apps(id) ON DELETE CASCADE, | |
| 19 | + | key TEXT NOT NULL, | |
| 20 | + | bytes_stored BIGINT NOT NULL DEFAULT 0, | |
| 21 | + | bytes_egress_period BIGINT NOT NULL DEFAULT 0, | |
| 22 | + | -- Highest 75/90/100 warning band already emailed for this key under the | |
| 23 | + | -- current cap. Mirrors `sync_app_usage_current.last_warning_pct` so the | |
| 24 | + | -- per-key warning loop can stamp progress without re-firing every tick. | |
| 25 | + | last_warning_pct SMALLINT NOT NULL DEFAULT 0, | |
| 26 | + | period_started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | |
| 27 | + | updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), | |
| 28 | + | PRIMARY KEY (app_id, key) | |
| 29 | + | ); | |
| 30 | + | ||
| 31 | + | CREATE INDEX idx_sync_key_usage_current_app ON sync_key_usage_current(app_id); | |
| 32 | + | ||
| 33 | + | -- ── sync_blobs: attribute each blob to a key ── | |
| 34 | + | -- Drop existing rows (internal-app data only; not preserved across this | |
| 35 | + | -- schema change). The `key` column is NOT NULL with no default — every | |
| 36 | + | -- future write must supply it. | |
| 37 | + | DELETE FROM sync_blobs; | |
| 38 | + | ALTER TABLE sync_blobs ADD COLUMN key TEXT NOT NULL; | |
| 39 | + | ||
| 40 | + | CREATE INDEX idx_sync_blobs_app_key ON sync_blobs(app_id, key); |
| @@ -23,7 +23,7 @@ pub mod waitlist; | |||
| 23 | 23 | pub(crate) mod blog_posts; | |
| 24 | 24 | pub(crate) mod license_keys; | |
| 25 | 25 | pub(crate) mod synckit; | |
| 26 | - | pub(crate) mod synckit_billing; | |
| 26 | + | pub mod synckit_billing; | |
| 27 | 27 | pub(crate) mod oauth; | |
| 28 | 28 | pub(crate) mod promo_codes; | |
| 29 | 29 | pub(crate) mod follows; |
| @@ -108,6 +108,9 @@ pub struct DbSyncBlob { | |||
| 108 | 108 | pub s3_key: String, | |
| 109 | 109 | /// Size of the blob in bytes. | |
| 110 | 110 | pub size_bytes: i64, | |
| 111 | + | /// Developer-defined SDK key this blob was uploaded under. Used to | |
| 112 | + | /// attribute storage against the right per-key counter (per_key mode). | |
| 113 | + | pub key: String, | |
| 111 | 114 | /// When the blob upload was confirmed. | |
| 112 | 115 | pub uploaded_at: DateTime<Utc>, | |
| 113 | 116 | } | |
| @@ -163,6 +166,9 @@ pub struct DbSyncAppKey { | |||
| 163 | 166 | pub id: uuid::Uuid, | |
| 164 | 167 | pub key: String, | |
| 165 | 168 | pub claimed_at: DateTime<Utc>, | |
| 169 | + | /// Bytes stored under this key (LEFT JOIN against | |
| 170 | + | /// `sync_key_usage_current` — `0` when no upload has landed yet). | |
| 171 | + | pub bytes_stored: i64, | |
| 166 | 172 | } | |
| 167 | 173 | ||
| 168 | 174 | // ── OTA models ── |
| @@ -29,7 +29,9 @@ pub async fn get_sync_blob_by_hash( | |||
| 29 | 29 | /// Insert a sync blob, updating size on conflict (idempotent). | |
| 30 | 30 | /// | |
| 31 | 31 | /// Uses `ON CONFLICT DO UPDATE` to keep `size_bytes` consistent with the | |
| 32 | - | /// actual S3 object when concurrent uploads race. | |
| 32 | + | /// actual S3 object when concurrent uploads race. `key` is the developer- | |
| 33 | + | /// defined SDK key from the session JWT; it never changes on re-upload of | |
| 34 | + | /// the same hash so the conflict path leaves it alone. | |
| 33 | 35 | #[tracing::instrument(skip_all)] | |
| 34 | 36 | pub async fn create_sync_blob_idempotent( | |
| 35 | 37 | pool: &PgPool, | |
| @@ -38,11 +40,12 @@ pub async fn create_sync_blob_idempotent( | |||
| 38 | 40 | hash: &str, | |
| 39 | 41 | size_bytes: i64, | |
| 40 | 42 | s3_key: &str, | |
| 43 | + | key: &str, | |
| 41 | 44 | ) -> Result<()> { | |
| 42 | 45 | sqlx::query( | |
| 43 | 46 | r#" | |
| 44 | - | INSERT INTO sync_blobs (app_id, user_id, hash, size_bytes, s3_key) | |
| 45 | - | VALUES ($1, $2, $3, $4, $5) | |
| 47 | + | INSERT INTO sync_blobs (app_id, user_id, hash, size_bytes, s3_key, key) | |
| 48 | + | VALUES ($1, $2, $3, $4, $5, $6) | |
| 46 | 49 | ON CONFLICT (app_id, user_id, hash) | |
| 47 | 50 | DO UPDATE SET size_bytes = EXCLUDED.size_bytes | |
| 48 | 51 | "#, | |
| @@ -52,6 +55,7 @@ pub async fn create_sync_blob_idempotent( | |||
| 52 | 55 | .bind(hash) | |
| 53 | 56 | .bind(size_bytes) | |
| 54 | 57 | .bind(s3_key) | |
| 58 | + | .bind(key) | |
| 55 | 59 | .execute(pool) | |
| 56 | 60 | .await?; | |
| 57 | 61 |
| @@ -462,7 +462,8 @@ pub async fn release_key( | |||
| 462 | 462 | } | |
| 463 | 463 | ||
| 464 | 464 | /// List active (un-released) key claims for an app, ordered by `claimed_at DESC`. | |
| 465 | - | /// Used by the dashboard "Active keys" view. | |
| 465 | + | /// Used by the dashboard "Active keys" view. `bytes_stored` is LEFT-joined | |
| 466 | + | /// from `sync_key_usage_current` — `0` when no upload has landed yet. | |
| 466 | 467 | #[tracing::instrument(skip_all)] | |
| 467 | 468 | pub async fn list_active_keys( | |
| 468 | 469 | pool: &sqlx::PgPool, | |
| @@ -471,10 +472,16 @@ pub async fn list_active_keys( | |||
| 471 | 472 | offset: i64, | |
| 472 | 473 | ) -> Result<Vec<DbSyncAppKey>> { | |
| 473 | 474 | let rows = sqlx::query_as::<_, DbSyncAppKey>( | |
| 474 | - | "SELECT id, key, claimed_at FROM sync_app_keys | |
| 475 | - | WHERE app_id = $1 AND released_at IS NULL | |
| 476 | - | ORDER BY claimed_at DESC | |
| 477 | - | LIMIT $2 OFFSET $3", | |
| 475 | + | r#" | |
| 476 | + | SELECT k.id, k.key, k.claimed_at, | |
| 477 | + | COALESCE(u.bytes_stored, 0) AS bytes_stored | |
| 478 | + | FROM sync_app_keys k | |
| 479 | + | LEFT JOIN sync_key_usage_current u | |
| 480 | + | ON u.app_id = k.app_id AND u.key = k.key | |
| 481 | + | WHERE k.app_id = $1 AND k.released_at IS NULL | |
| 482 | + | ORDER BY k.claimed_at DESC | |
| 483 | + | LIMIT $2 OFFSET $3 | |
| 484 | + | "#, | |
| 478 | 485 | ) | |
| 479 | 486 | .bind(app_id) | |
| 480 | 487 | .bind(limit) | |
| @@ -484,19 +491,58 @@ pub async fn list_active_keys( | |||
| 484 | 491 | Ok(rows) | |
| 485 | 492 | } | |
| 486 | 493 | ||
| 494 | + | /// Per-app top-N key usage for the dashboard panel. Returns the highest-usage | |
| 495 | + | /// active keys (sorted by `bytes_stored DESC`) for each app in `app_ids`, | |
| 496 | + | /// batched into a single query so the integrations page doesn't N+1 across | |
| 497 | + | /// every per_key-mode app. | |
| 498 | + | #[tracing::instrument(skip_all)] | |
| 499 | + | pub async fn get_top_keys_per_app( | |
| 500 | + | pool: &PgPool, | |
| 501 | + | app_ids: &[SyncAppId], | |
| 502 | + | limit_per_app: i64, | |
| 503 | + | ) -> Result<Vec<(SyncAppId, String, i64)>> { | |
| 504 | + | if app_ids.is_empty() { | |
| 505 | + | return Ok(Vec::new()); | |
| 506 | + | } | |
| 507 | + | let rows: Vec<(SyncAppId, String, i64)> = sqlx::query_as( | |
| 508 | + | r#" | |
| 509 | + | SELECT app_id, key, bytes_stored | |
| 510 | + | FROM ( | |
| 511 | + | SELECT u.app_id, u.key, u.bytes_stored, | |
| 512 | + | ROW_NUMBER() OVER (PARTITION BY u.app_id ORDER BY u.bytes_stored DESC, u.key) AS rn | |
| 513 | + | FROM sync_key_usage_current u | |
| 514 | + | JOIN sync_app_keys k | |
| 515 | + | ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL | |
| 516 | + | WHERE u.app_id = ANY($1) | |
| 517 | + | ) ranked | |
| 518 | + | WHERE rn <= $2 | |
| 519 | + | ORDER BY app_id, rn | |
| 520 | + | "#, | |
| 521 | + | ) | |
| 522 | + | .bind(app_ids) | |
| 523 | + | .bind(limit_per_app) | |
| 524 | + | .fetch_all(pool) | |
| 525 | + | .await?; | |
| 526 | + | Ok(rows) | |
| 527 | + | } | |
| 528 | + | ||
| 487 | 529 | // ── Phase 5: Usage counters and cap enforcement ── | |
| 488 | 530 | ||
| 489 | 531 | /// A breached cap, returned by `would_exceed_*` checks. | |
| 490 | 532 | #[derive(Debug, Clone, PartialEq, Eq)] | |
| 491 | 533 | pub struct ExceededLimit { | |
| 492 | - | /// `"storage"` or `"egress"`. (`"billing"` is reserved for inactive-billing | |
| 493 | - | /// failure paths so a single 402 response shape can carry every reason; /// the caller decides whether to thread that through here or check | |
| 494 | - | /// `billing_status` separately.) | |
| 534 | + | /// `"storage"`, `"storage_per_key"`, or `"egress"`. (`"billing"` is reserved | |
| 535 | + | /// for inactive-billing failure paths so a single 402 response shape can | |
| 536 | + | /// carry every reason; the caller decides whether to thread that through | |
| 537 | + | /// here or check `billing_status` separately.) | |
| 495 | 538 | pub dimension: &'static str, | |
| 496 | 539 | /// Current usage in bytes (before the would-be addition). | |
| 497 | 540 | pub used: i64, | |
| 498 | 541 | /// Configured cap in bytes. | |
| 499 | 542 | pub limit: i64, | |
| 543 | + | /// The SDK key that hit its cap, if `dimension == "storage_per_key"`. | |
| 544 | + | /// `None` for app-wide dimensions. | |
| 545 | + | pub key: Option<String>, | |
| 500 | 546 | } | |
| 501 | 547 | ||
| 502 | 548 | /// A row that may need a warning email sent. | |
| @@ -515,10 +561,15 @@ pub struct WarningCandidate { | |||
| 515 | 561 | pub dimension: &'static str, | |
| 516 | 562 | pub used: i64, | |
| 517 | 563 | pub limit: i64, | |
| 564 | + | /// SDK key that breached the threshold, when `dimension == | |
| 565 | + | /// "storage_per_key"`. `None` for app-wide breaches. | |
| 566 | + | pub key: Option<String>, | |
| 518 | 567 | } | |
| 519 | 568 | ||
| 520 | 569 | /// Increment (or decrement, for negative `delta`) the rolling `bytes_stored` | |
| 521 | - | /// counter on `sync_app_usage_current`. Returns the new value. | |
| 570 | + | /// counter on `sync_app_usage_current` and the per-key row on | |
| 571 | + | /// `sync_key_usage_current` (upserted if missing). Returns the new app-level | |
| 572 | + | /// total. | |
| 522 | 573 | /// | |
| 523 | 574 | /// NOTE: read-then-add elsewhere in the request handler is racy. Acceptable | |
| 524 | 575 | /// overshoot under concurrent uploads in v1. | |
| @@ -526,8 +577,11 @@ pub struct WarningCandidate { | |||
| 526 | 577 | pub async fn add_bytes_stored( | |
| 527 | 578 | pool: &PgPool, | |
| 528 | 579 | app_id: SyncAppId, | |
| 580 | + | key: &str, | |
| 529 | 581 | delta: i64, | |
| 530 | 582 | ) -> Result<i64> { | |
| 583 | + | let mut tx = pool.begin().await?; | |
| 584 | + | ||
| 531 | 585 | let (new_val,): (i64,) = sqlx::query_as( | |
| 532 | 586 | r#" | |
| 533 | 587 | UPDATE sync_app_usage_current | |
| @@ -538,8 +592,26 @@ pub async fn add_bytes_stored( | |||
| 538 | 592 | ) | |
| 539 | 593 | .bind(app_id) | |
| 540 | 594 | .bind(delta) | |
| 541 | - | .fetch_one(pool) | |
| 595 | + | .fetch_one(&mut *tx) | |
| 542 | 596 | .await?; | |
| 597 | + | ||
| 598 | + | sqlx::query( | |
| 599 | + | r#" | |
| 600 | + | INSERT INTO sync_key_usage_current (app_id, key, bytes_stored) | |
| 601 | + | VALUES ($1, $2, GREATEST($3, 0)) | |
| 602 | + | ON CONFLICT (app_id, key) | |
| 603 | + | DO UPDATE SET | |
| 604 | + | bytes_stored = GREATEST(sync_key_usage_current.bytes_stored + $3, 0), | |
| 605 | + | updated_at = NOW() | |
| 606 | + | "#, | |
| 607 | + | ) | |
| 608 | + | .bind(app_id) | |
| 609 | + | .bind(key) | |
| 610 | + | .bind(delta) | |
| 611 | + | .execute(&mut *tx) | |
| 612 | + | .await?; | |
| 613 | + | ||
| 614 | + | tx.commit().await?; | |
| 543 | 615 | Ok(new_val) | |
| 544 | 616 | } | |
| 545 | 617 | ||
| @@ -570,7 +642,16 @@ pub async fn add_bytes_egress( | |||
| 570 | 642 | /// | |
| 571 | 643 | /// Returns `None` if no cap applies (internal apps, billing not active, or | |
| 572 | 644 | /// the cap is unset). Returns `Some(ExceededLimit)` only when adding | |
| 573 | - | /// `additional_bytes` would push `bytes_stored` past the cap. | |
| 645 | + | /// `additional_bytes` would push usage past the cap. | |
| 646 | + | /// | |
| 647 | + | /// In `per_key` mode the per-key counter is checked first against | |
| 648 | + | /// `gb_per_key × 1GB` — if exceeded, returns `dimension: "storage_per_key"` | |
| 649 | + | /// with the offending `key`. The app-aggregate (`key_cap × gb_per_key`) is | |
| 650 | + | /// also checked as a defensive hard ceiling; under normal operation this | |
| 651 | + | /// can't trip before some key has tripped, but it guards against drift. | |
| 652 | + | /// | |
| 653 | + | /// In `bulk` mode `key` is unused (still passed in for call-site uniformity) | |
| 654 | + | /// and the app-wide `storage_gb_cap` is the only check. | |
| 574 | 655 | /// | |
| 575 | 656 | /// Race-condition note: this reads, the caller adds. Concurrent uploads can | |
| 576 | 657 | /// produce small overshoots. Acceptable for v1. | |
| @@ -578,6 +659,7 @@ pub async fn add_bytes_egress( | |||
| 578 | 659 | pub async fn would_exceed_storage( | |
| 579 | 660 | pool: &PgPool, | |
| 580 | 661 | app_id: SyncAppId, | |
| 662 | + | key: &str, | |
| 581 | 663 | additional_bytes: i64, | |
| 582 | 664 | ) -> Result<Option<ExceededLimit>> { | |
| 583 | 665 | let row: Option<(bool, String, Option<i32>, Option<i32>, Option<i32>, Option<i64>)> = sqlx::query_as( | |
| @@ -597,25 +679,60 @@ pub async fn would_exceed_storage( | |||
| 597 | 679 | let Some((is_internal, mode, storage_gb, key_cap, gb_per_key, bytes_stored)) = row else { return Ok(None); }; | |
| 598 | 680 | if is_internal { return Ok(None); } | |
| 599 | 681 | ||
| 600 | - | // Effective storage cap depends on mode. Per-key degradation is currently | |
| 601 | - | // app-wide at the enforcement layer (we cap on key_cap × gb_per_key); the | |
| 602 | - | // dev-side UX can present per-key feedback using their own claim tracking. | |
| 603 | - | let gb = match mode.as_str() { | |
| 604 | - | "bulk" => match storage_gb { Some(g) => g, None => return Ok(None) }, | |
| 605 | - | "per_key" => match (key_cap, gb_per_key) { | |
| 606 | - | (Some(k), Some(g)) => k.saturating_mul(g), | |
| 607 | - | _ => return Ok(None), | |
| 608 | - | }, | |
| 682 | + | let app_used = bytes_stored.unwrap_or(0); | |
| 683 | + | ||
| 684 | + | match mode.as_str() { | |
| 685 | + | "bulk" => { | |
| 686 | + | let Some(gb) = storage_gb else { return Ok(None) }; | |
| 687 | + | let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); | |
| 688 | + | if app_used.saturating_add(additional_bytes) > limit { | |
| 689 | + | return Ok(Some(ExceededLimit { | |
| 690 | + | dimension: "storage", | |
| 691 | + | used: app_used, | |
| 692 | + | limit, | |
| 693 | + | key: None, | |
| 694 | + | })); | |
| 695 | + | } | |
| 696 | + | } | |
| 697 | + | "per_key" => { | |
| 698 | + | let (Some(k), Some(g)) = (key_cap, gb_per_key) else { return Ok(None) }; | |
| 699 | + | let per_key_limit = crate::synckit_billing::storage_cap_bytes(g as u32); | |
| 700 | + | let app_limit = crate::synckit_billing::storage_cap_bytes(k.saturating_mul(g) as u32); | |
| 701 | + | ||
| 702 | + | let key_used: Option<i64> = sqlx::query_scalar( | |
| 703 | + | "SELECT bytes_stored FROM sync_key_usage_current | |
| 704 | + | WHERE app_id = $1 AND key = $2", | |
| 705 | + | ) | |
| 706 | + | .bind(app_id) | |
| 707 | + | .bind(key) | |
| 708 | + | .fetch_optional(pool) | |
| 709 | + | .await?; | |
| 710 | + | let key_used = key_used.unwrap_or(0); | |
| 711 | + | ||
| 712 | + | if key_used.saturating_add(additional_bytes) > per_key_limit { | |
| 713 | + | return Ok(Some(ExceededLimit { | |
| 714 | + | dimension: "storage_per_key", | |
| 715 | + | used: key_used, | |
| 716 | + | limit: per_key_limit, | |
| 717 | + | key: Some(key.to_string()), | |
| 718 | + | })); | |
| 719 | + | } | |
| 720 | + | // Defensive app-aggregate ceiling. Tripping this before the per-key | |
| 721 | + | // check means the per-key counters have drifted under the app | |
| 722 | + | // counter — refuse rather than admit silent over-allocation. | |
| 723 | + | if app_used.saturating_add(additional_bytes) > app_limit { | |
| 724 | + | return Ok(Some(ExceededLimit { | |
| 725 | + | dimension: "storage", | |
| 726 | + | used: app_used, | |
| 727 | + | limit: app_limit, | |
| 728 | + | key: None, | |
| 729 | + | })); | |
| 730 | + | } | |
| 731 | + | } | |
| 609 | 732 | _ => return Ok(None), | |
| 610 | - | }; | |
| 611 | - | ||
| 612 | - | let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); | |
| 613 | - | let used = bytes_stored.unwrap_or(0); | |
| 614 | - | if used.saturating_add(additional_bytes) > limit { | |
| 615 | - | Ok(Some(ExceededLimit { dimension: "storage", used, limit })) | |
| 616 | - | } else { | |
| 617 | - | Ok(None) | |
| 618 | 733 | } | |
| 734 | + | ||
| 735 | + | Ok(None) | |
| 619 | 736 | } | |
| 620 | 737 | ||
| 621 | 738 | /// Fetch the list of active, non-internal apps that may need a warning email, | |
| @@ -655,7 +772,7 @@ pub async fn get_apps_needing_warning(pool: &PgPool) -> Result<Vec<WarningCandid | |||
| 655 | 772 | sa.key_cap AS key_cap, | |
| 656 | 773 | sa.gb_per_key AS gb_per_key, | |
| 657 | 774 | COALESCE(u.bytes_stored, 0) AS bytes_stored, | |
| 658 | - | COALESCE(u.last_warning_pct, 0) AS last_warning_pct | |
| 775 | + | COALESCE(u.last_warning_pct, 0::smallint) AS last_warning_pct | |
| 659 | 776 | FROM sync_apps sa | |
| 660 | 777 | JOIN users u_user ON u_user.id = sa.creator_id | |
| 661 | 778 | LEFT JOIN sync_app_usage_current u ON u.app_id = sa.id | |
| @@ -667,32 +784,92 @@ pub async fn get_apps_needing_warning(pool: &PgPool) -> Result<Vec<WarningCandid | |||
| 667 | 784 | .await?; | |
| 668 | 785 | ||
| 669 | 786 | let mut out = Vec::new(); | |
| 787 | + | let mut per_key_apps: Vec<(SyncAppId, UserId, String, String, i32)> = Vec::new(); | |
| 670 | 788 | for r in rows { | |
| 671 | - | let gb = match r.enforcement_mode.as_str() { | |
| 672 | - | "bulk" => r.storage_gb_cap, | |
| 673 | - | "per_key" => match (r.key_cap, r.gb_per_key) { | |
| 674 | - | (Some(k), Some(g)) => Some(k.saturating_mul(g)), | |
| 675 | - | _ => None, | |
| 676 | - | }, | |
| 677 | - | _ => None, | |
| 678 | - | }; | |
| 679 | - | let Some(gb) = gb else { continue }; | |
| 680 | - | let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); | |
| 681 | - | if let Some(pct) = highest_breached_threshold( | |
| 682 | - | r.bytes_stored, limit, r.last_warning_pct, | |
| 683 | - | ) { | |
| 684 | - | out.push(WarningCandidate { | |
| 685 | - | app_id: r.app_id, | |
| 686 | - | creator_id: r.creator_id, | |
| 687 | - | creator_email: r.creator_email, | |
| 688 | - | app_name: r.app_name, | |
| 689 | - | threshold_pct: pct, | |
| 690 | - | dimension: "storage", | |
| 691 | - | used: r.bytes_stored, | |
| 692 | - | limit, | |
| 693 | - | }); | |
| 789 | + | match r.enforcement_mode.as_str() { | |
| 790 | + | "bulk" => { | |
| 791 | + | let Some(gb) = r.storage_gb_cap else { continue }; | |
| 792 | + | let limit = crate::synckit_billing::storage_cap_bytes(gb as u32); | |
| 793 | + | if let Some(pct) = | |
| 794 | + | highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct) | |
| 795 | + | { | |
| 796 | + | out.push(WarningCandidate { | |
| 797 | + | app_id: r.app_id, | |
| 798 | + | creator_id: r.creator_id, | |
| 799 | + | creator_email: r.creator_email, | |
| 800 | + | app_name: r.app_name, | |
| 801 | + | threshold_pct: pct, | |
| 802 | + | dimension: "storage", | |
| 803 | + | used: r.bytes_stored, | |
| 804 | + | limit, | |
| 805 | + | key: None, | |
| 806 | + | }); | |
| 807 | + | } | |
| 808 | + | } | |
| 809 | + | "per_key" => { | |
| 810 | + | // Defer to a second query that fans out per (app, key); the | |
| 811 | + | // per-key counter — not the app aggregate — is what we warn on. | |
| 812 | + | let (Some(_), Some(g)) = (r.key_cap, r.gb_per_key) else { continue }; | |
| 813 | + | per_key_apps.push((r.app_id, r.creator_id, r.creator_email, r.app_name, g)); | |
| 814 | + | } | |
| 815 | + | _ => {} | |
| 816 | + | } | |
| 817 | + | } | |
| 818 | + | ||
| 819 | + | if !per_key_apps.is_empty() { | |
| 820 | + | #[derive(sqlx::FromRow)] | |
| 821 | + | struct KeyRow { | |
| 822 | + | app_id: SyncAppId, | |
| 823 | + | key: String, | |
| 824 | + | bytes_stored: i64, | |
| 825 | + | last_warning_pct: i16, | |
| 826 | + | } | |
| 827 | + | let app_ids: Vec<SyncAppId> = per_key_apps.iter().map(|t| t.0).collect(); | |
| 828 | + | let key_rows = sqlx::query_as::<_, KeyRow>( | |
| 829 | + | r#" | |
| 830 | + | SELECT u.app_id, u.key, u.bytes_stored, u.last_warning_pct | |
| 831 | + | FROM sync_key_usage_current u | |
| 832 | + | JOIN sync_app_keys k | |
| 833 | + | ON k.app_id = u.app_id AND k.key = u.key AND k.released_at IS NULL | |
| 834 | + | WHERE u.app_id = ANY($1) | |
| 835 | + | "#, | |
| 836 | + | ) | |
| 837 | + | .bind(&app_ids) | |
| 838 | + | .fetch_all(pool) | |
| 839 | + | .await?; | |
| 840 | + | ||
| 841 | + | // Index app metadata by id for cheap join. | |
| 842 | + | let meta: std::collections::HashMap<SyncAppId, (UserId, String, String, i32)> = | |
| 843 | + | per_key_apps | |
| 844 | + | .into_iter() | |
| 845 | + | .map(|(a, c, e, n, g)| (a, (c, e, n, g))) | |
| 846 | + | .collect(); | |
| 847 | + | ||
| 848 | + | for r in key_rows { | |
| 849 | + | let Some((creator_id, creator_email, app_name, gb_per_key)) = | |
| 850 | + | meta.get(&r.app_id).cloned() | |
| 851 | + | else { | |
| 852 | + | continue; | |
| 853 | + | }; | |
| 854 | + | let limit = crate::synckit_billing::storage_cap_bytes(gb_per_key as u32); | |
| 855 | + | if let Some(pct) = | |
| 856 | + | highest_breached_threshold(r.bytes_stored, limit, r.last_warning_pct) | |
| 857 | + | { | |
| 858 | + | out.push(WarningCandidate { | |
| 859 | + | app_id: r.app_id, | |
| 860 | + | creator_id, | |
| 861 | + | creator_email, | |
| 862 | + | app_name, | |
| 863 | + | threshold_pct: pct, | |
| 864 | + | dimension: "storage_per_key", | |
| 865 | + | used: r.bytes_stored, | |
| 866 | + | limit, | |
| 867 | + | key: Some(r.key), | |
| 868 | + | }); | |
| 869 | + | } | |
| 694 | 870 | } | |
| 695 | 871 | } | |
| 872 | + | ||
| 696 | 873 | Ok(out) | |
| 697 | 874 | } | |
| 698 | 875 | ||
| @@ -734,11 +911,39 @@ pub async fn update_warning_pct( | |||
| 734 | 911 | Ok(()) | |
| 735 | 912 | } | |
| 736 | 913 | ||
| 914 | + | /// Per-key analogue of `update_warning_pct`. Stamps the band on | |
| 915 | + | /// `sync_key_usage_current` so subsequent ticks don't re-fire the same band | |
| 916 | + | /// for the same key. | |
| 917 | + | #[tracing::instrument(skip_all)] | |
| 918 | + | pub async fn update_key_warning_pct( | |
| 919 | + | pool: &PgPool, | |
| 920 | + | app_id: SyncAppId, | |
| 921 | + | key: &str, | |
| 922 | + | pct: i16, | |
| 923 | + | ) -> Result<()> { | |
| 924 | + | sqlx::query( | |
| 925 | + | r#" | |
| 926 | + | UPDATE sync_key_usage_current | |
| 927 | + | SET last_warning_pct = $3, updated_at = NOW() | |
| 928 | + | WHERE app_id = $1 AND key = $2 | |
| 929 | + | "#, | |
| 930 | + | ) | |
| 931 | + | .bind(app_id) | |
| 932 | + | .bind(key) | |
| 933 | + | .bind(pct) | |
| 934 | + | .execute(pool) | |
| 935 | + | .await?; | |
| 936 | + | Ok(()) | |
| 937 | + | } | |
| 938 | + | ||
| 737 | 939 | /// Recalculate `bytes_stored` from the authoritative `sync_blobs` table for | |
| 738 | - | /// every app. Weekly drift correction. Returns count of rows updated. | |
| 940 | + | /// every app and every per-key counter. Weekly drift correction. Returns | |
| 941 | + | /// total count of rows updated across both tables. | |
| 739 | 942 | #[tracing::instrument(skip_all)] | |
| 740 | 943 | pub async fn recalculate_synckit_app_storage(pool: &PgPool) -> Result<u64> { | |
| 741 | - | let res = sqlx::query( | |
| 944 | + | let mut tx = pool.begin().await?; | |
| 945 | + | ||
| 946 | + | let app_res = sqlx::query( | |
| 742 | 947 | r#" | |
| 743 | 948 | UPDATE sync_app_usage_current u | |
| 744 | 949 | SET bytes_stored = COALESCE(s.total, 0), updated_at = NOW() | |
| @@ -751,9 +956,32 @@ pub async fn recalculate_synckit_app_storage(pool: &PgPool) -> Result<u64> { | |||
| 751 | 956 | AND u.bytes_stored <> COALESCE(s.total, 0) | |
| 752 | 957 | "#, | |
| 753 | 958 | ) | |
| 754 | - | .execute(pool) | |
| 959 | + | .execute(&mut *tx) | |
| 755 | 960 | .await?; | |
| 756 | - | Ok(res.rows_affected()) | |
| 961 | + | ||
| 962 | + | // Per-key reconciliation: upsert every (app_id, key) found in sync_blobs. | |
| 963 | + | // Keys that disappear (all blobs deleted) aren't pruned here — bytes_stored | |
| 964 | + | // would just stay at whatever it last drifted to. Blob delete isn't shipped | |
| 965 | + | // yet, so this is fine. When it ships, add a step that resets rows whose | |
| 966 | + | // sync_blobs total is zero (or just delete them — the upsert recreates). | |
| 967 | + | let key_res = sqlx::query( | |
| 968 | + | r#" | |
| 969 | + | INSERT INTO sync_key_usage_current (app_id, key, bytes_stored, updated_at) | |
| 970 | + | SELECT app_id, key, SUM(size_bytes)::BIGINT, NOW() | |
| 971 | + | FROM sync_blobs | |
| 972 | + | GROUP BY app_id, key | |
| 973 | + | ON CONFLICT (app_id, key) | |
| 974 | + | DO UPDATE SET | |
| 975 | + | bytes_stored = EXCLUDED.bytes_stored, | |
| 976 | + | updated_at = NOW() | |
| 977 | + | WHERE sync_key_usage_current.bytes_stored <> EXCLUDED.bytes_stored | |
| 978 | + | "#, | |
| 979 | + | ) | |
| 980 | + | .execute(&mut *tx) | |
| 981 | + | .await?; | |
| 982 | + | ||
| 983 | + | tx.commit().await?; | |
| 984 | + | Ok(app_res.rows_affected() + key_res.rows_affected()) | |
| 757 | 985 | } | |
| 758 | 986 | ||
| 759 | 987 | /// Check whether a key is currently actively claimed. Used by the claim |
| @@ -583,13 +583,17 @@ Your content remains accessible to fans. If you experience issues, they should r | |||
| 583 | 583 | ||
| 584 | 584 | /// Warn an app owner that they're approaching (or have hit) a SyncKit cap. | |
| 585 | 585 | /// | |
| 586 | - | /// `dimension` is `"storage"` or `"egress"`. `pct` is 75/90/100. At 100% | |
| 587 | - | /// the next request to that dimension will be hard-blocked with a 402. | |
| 586 | + | /// `dimension` is `"storage"`, `"storage_per_key"`, or `"egress"`. `pct` | |
| 587 | + | /// is 75/90/100. At 100% the next request to that dimension will be | |
| 588 | + | /// hard-blocked with a 402. `key` is `Some(_)` only when | |
| 589 | + | /// `dimension == "storage_per_key"` — it names the SDK key whose | |
| 590 | + | /// allotment tripped, so the developer knows which workspace to nudge. | |
| 588 | 591 | pub async fn send_synckit_usage_warning( | |
| 589 | 592 | &self, | |
| 590 | 593 | to_email: &str, | |
| 591 | 594 | app_name: &str, | |
| 592 | 595 | dimension: &str, | |
| 596 | + | key: Option<&str>, | |
| 593 | 597 | pct: i16, | |
| 594 | 598 | used_bytes: i64, | |
| 595 | 599 | limit_bytes: i64, | |
| @@ -601,9 +605,13 @@ Your content remains accessible to fans. If you experience issues, they should r | |||
| 601 | 605 | } | |
| 602 | 606 | ||
| 603 | 607 | let dim_human = match dimension { | |
| 604 | - | "storage" => "storage", | |
| 605 | - | "egress" => "monthly egress", | |
| 606 | - | other => other, | |
| 608 | + | "storage" => "storage".to_string(), | |
| 609 | + | "storage_per_key" => match key { | |
| 610 | + | Some(k) => format!("storage for key \"{k}\""), | |
| 611 | + | None => "per-key storage".to_string(), | |
| 612 | + | }, | |
| 613 | + | "egress" => "monthly egress".to_string(), | |
| 614 | + | other => other.to_string(), | |
| 607 | 615 | }; | |
| 608 | 616 | let subject = if pct >= 100 { | |
| 609 | 617 | format!("{app_name}: {dim_human} cap reached") |
| @@ -66,6 +66,9 @@ pub struct TokenRequest { | |||
| 66 | 66 | pub redirect_uri: String, | |
| 67 | 67 | pub code_verifier: String, | |
| 68 | 68 | pub client_id: String, | |
| 69 | + | /// Developer-defined SDK key. Identifies which billing slot this session's | |
| 70 | + | /// uploads count against. Required. | |
| 71 | + | pub key: String, | |
| 69 | 72 | } | |
| 70 | 73 | ||
| 71 | 74 | #[derive(Serialize)] | |
| @@ -426,6 +429,8 @@ async fn token_exchange( | |||
| 426 | 429 | return Err(AppError::BadRequest("grant_type must be 'authorization_code'".to_string())); | |
| 427 | 430 | } | |
| 428 | 431 | ||
| 432 | + | crate::validation::validate_synckit_key(&req.key)?; | |
| 433 | + | ||
| 429 | 434 | let secret = state | |
| 430 | 435 | .config | |
| 431 | 436 | .synckit_jwt_secret | |
| @@ -467,6 +472,7 @@ async fn token_exchange( | |||
| 467 | 472 | secret, | |
| 468 | 473 | oauth_code.user_id, | |
| 469 | 474 | oauth_code.app_id, | |
| 475 | + | &req.key, | |
| 470 | 476 | )?; | |
| 471 | 477 | ||
| 472 | 478 | Ok(Json(TokenResponse { |
| @@ -629,6 +629,7 @@ pub(super) async fn project_tab_synckit( | |||
| 629 | 629 | .into_iter() | |
| 630 | 630 | .map(|b| (b.id, b)) | |
| 631 | 631 | .collect(); | |
| 632 | + | let top_keys_map = crate::types::build_top_keys_map(&state.db, &billing_map).await?; | |
| 632 | 633 | ||
| 633 | 634 | let mut apps = Vec::with_capacity(db_apps.len()); | |
| 634 | 635 | for app in &db_apps { | |
| @@ -639,9 +640,11 @@ pub(super) async fn project_tab_synckit( | |||
| 639 | 640 | ||
| 640 | 641 | let api_key_masked = format!("{}...", &app.api_key_prefix); | |
| 641 | 642 | ||
| 642 | - | let billing = billing_map | |
| 643 | - | .get(&app.id) | |
| 644 | - | .map(crate::types::SyncAppBillingView::from_db); | |
| 643 | + | let billing = billing_map.get(&app.id).map(|b| { | |
| 644 | + | let mut view = crate::types::SyncAppBillingView::from_db(b); | |
| 645 | + | crate::types::apply_top_keys(&mut view, b, top_keys_map.get(&b.id)); | |
| 646 | + | view | |
| 647 | + | }); | |
| 645 | 648 | ||
| 646 | 649 | apps.push(SyncAppRow { | |
| 647 | 650 | id: app.id.to_string(), |
| @@ -116,6 +116,10 @@ pub(in crate::routes::pages::dashboard) async fn dashboard_tab_synckit( | |||
| 116 | 116 | .map(|b| (b.id, b)) | |
| 117 | 117 | .collect(); | |
| 118 | 118 | ||
| 119 | + | // For per_key-mode apps, fetch the most-loaded keys to render mini-gauges | |
| 120 | + | // under the app-aggregate gauge. One batched query covers every app. | |
| 121 | + | let top_keys_map = build_top_keys_map(&state.db, &billing_map).await?; | |
| 122 | + | ||
| 119 | 123 | let mut apps = Vec::with_capacity(db_apps.len()); | |
| 120 | 124 | for app in db_apps { | |
| 121 | 125 | let (device_count, log_entry_count) = stats_map | |
| @@ -136,9 +140,11 @@ pub(in crate::routes::pages::dashboard) async fn dashboard_tab_synckit( | |||
| 136 | 140 | let item_title = | |
| 137 | 141 | app.item_id.and_then(|iid| item_title_map.get(&iid).cloned()); | |
| 138 | 142 | ||
| 139 | - | let billing = billing_map | |
| 140 | - | .get(&app.id) | |
| 141 | - | .map(crate::types::SyncAppBillingView::from_db); | |
| 143 | + | let billing = billing_map.get(&app.id).map(|b| { | |
| 144 | + | let mut view = crate::types::SyncAppBillingView::from_db(b); | |
| 145 | + | apply_top_keys(&mut view, b, top_keys_map.get(&b.id)); | |
| 146 | + | view | |
| 147 | + | }); | |
| 142 | 148 | ||
| 143 | 149 | apps.push(SyncAppRow { | |
| 144 | 150 | id: app.id.to_string(), |
| @@ -11,6 +11,7 @@ use crate::{ | |||
| 11 | 11 | db, | |
| 12 | 12 | error::{AppError, Result}, | |
| 13 | 13 | synckit_auth, | |
| 14 | + | validation, | |
| 14 | 15 | AppState, | |
| 15 | 16 | }; | |
| 16 | 17 | ||
| @@ -48,6 +49,8 @@ pub(super) async fn sync_auth( | |||
| 48 | 49 | .as_deref() | |
| 49 | 50 | .ok_or_else(|| AppError::ServiceUnavailable("SyncKit is not configured".to_string()))?; | |
| 50 | 51 | ||
| 52 | + | validation::validate_synckit_key(&req.key)?; | |
| 53 | + | ||
| 51 | 54 | // Verify app exists and is active | |
| 52 | 55 | let app = db::synckit::get_sync_app_by_api_key(&state.db, &req.api_key) | |
| 53 | 56 | .await? | |
| @@ -113,7 +116,7 @@ pub(super) async fn sync_auth( | |||
| 113 | 116 | // Successful auth — reset failed login counter | |
| 114 | 117 | db::auth::reset_failed_login(&state.db, user.id).await?; | |
| 115 | 118 | ||
| 116 | - | let token = synckit_auth::create_sync_token(secret, user.id, app.id)?; | |
| 119 | + | let token = synckit_auth::create_sync_token(secret, user.id, app.id, &req.key)?; | |
| 117 | 120 | ||
| 118 | 121 | Ok(Json(SyncAuthResponse { | |
| 119 | 122 | token, |
| @@ -128,17 +128,18 @@ pub(super) async fn blob_confirm_upload( | |||
| 128 | 128 | .into_response()); | |
| 129 | 129 | } | |
| 130 | 130 | if let Some(exceeded) = synckit_billing::would_exceed_storage( | |
| 131 | - | &state.db, sync_user.app_id, req.size_bytes, | |
| 131 | + | &state.db, sync_user.app_id, &sync_user.key, req.size_bytes, | |
| 132 | 132 | ).await? { | |
| 133 | - | return Ok(( | |
| 134 | - | StatusCode::PAYMENT_REQUIRED, | |
| 135 | - | Json(json!({ | |
| 136 | - | "reason": "storage_limit_reached", | |
| 137 | - | "used": exceeded.used, | |
| 138 | - | "limit": exceeded.limit, | |
| 139 | - | })), | |
| 140 | - | ) | |
| 141 | - | .into_response()); | |
| 133 | + | let mut body = json!({ | |
| 134 | + | "reason": "storage_limit_reached", | |
| 135 | + | "dimension": exceeded.dimension, | |
| 136 | + | "used": exceeded.used, | |
| 137 | + | "limit": exceeded.limit, | |
| 138 | + | }); | |
| 139 | + | if let Some(k) = &exceeded.key { | |
| 140 | + | body["key"] = json!(k); | |
| 141 | + | } | |
| 142 | + | return Ok((StatusCode::PAYMENT_REQUIRED, Json(body)).into_response()); | |
| 142 | 143 | } | |
| 143 | 144 | } | |
| 144 | 145 | ||
| @@ -162,15 +163,17 @@ pub(super) async fn blob_confirm_upload( | |||
| 162 | 163 | &req.hash, | |
| 163 | 164 | req.size_bytes, | |
| 164 | 165 | &s3_key, | |
| 166 | + | &sync_user.key, | |
| 165 | 167 | ) | |
| 166 | 168 | .await?; | |
| 167 | 169 | ||
| 168 | - | // Update the rolling storage counter. We don't fail the request if this | |
| 169 | - | // breaks — the weekly drift correction job will reconcile from sync_blobs. | |
| 170 | - | // Skip for internal apps to keep the counter at 0 there. | |
| 170 | + | // Update the rolling storage counter (app-level + per-key). We don't fail | |
| 171 | + | // the request if this breaks — the weekly drift correction job will | |
| 172 | + | // reconcile from sync_blobs. Skip for internal apps to keep the counter | |
| 173 | + | // at 0 there. | |
| 171 | 174 | if !billing.is_internal { | |
| 172 | 175 | if let Err(e) = synckit_billing::add_bytes_stored( | |
| 173 | - | &state.db, sync_user.app_id, req.size_bytes, | |
| 176 | + | &state.db, sync_user.app_id, &sync_user.key, req.size_bytes, | |
| 174 | 177 | ).await { | |
| 175 | 178 | tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_stored"); | |
| 176 | 179 | } |
| @@ -137,6 +137,7 @@ pub(super) async fn list( | |||
| 137 | 137 | id: r.id, | |
| 138 | 138 | key: r.key, | |
| 139 | 139 | claimed_at: r.claimed_at, | |
| 140 | + | bytes_stored: r.bytes_stored, | |
| 140 | 141 | }) | |
| 141 | 142 | .collect(); | |
| 142 | 143 |
| @@ -43,6 +43,9 @@ pub(crate) struct SyncAuthRequest { | |||
| 43 | 43 | pub email: String, | |
| 44 | 44 | pub password: String, | |
| 45 | 45 | pub api_key: String, | |
| 46 | + | /// Developer-defined SDK key. Identifies which billing slot this session's | |
| 47 | + | /// uploads count against. Required. | |
| 48 | + | pub key: String, | |
| 46 | 49 | } | |
| 47 | 50 | ||
| 48 | 51 | #[derive(Serialize, utoipa::ToSchema)] | |
| @@ -404,6 +407,9 @@ pub(crate) struct KeyInfo { | |||
| 404 | 407 | pub id: uuid::Uuid, | |
| 405 | 408 | pub key: String, | |
| 406 | 409 | pub claimed_at: DateTime<Utc>, | |
| 410 | + | /// Bytes stored under this key (rolling counter, reconciled weekly by | |
| 411 | + | /// the drift job). `0` if no upload has confirmed yet for this key. | |
| 412 | + | pub bytes_stored: i64, | |
| 407 | 413 | } | |
| 408 | 414 | ||
| 409 | 415 | /// Response body for `POST /api/sync/keys/list`. |
| @@ -43,6 +43,7 @@ pub(super) async fn check_and_send_warnings(state: &AppState) { | |||
| 43 | 43 | &c.creator_email, | |
| 44 | 44 | &c.app_name, | |
| 45 | 45 | c.dimension, | |
| 46 | + | c.key.as_deref(), | |
| 46 | 47 | c.threshold_pct, | |
| 47 | 48 | c.used, | |
| 48 | 49 | c.limit, | |
| @@ -52,17 +53,33 @@ pub(super) async fn check_and_send_warnings(state: &AppState) { | |||
| 52 | 53 | error = ?e, | |
| 53 | 54 | app_id = %c.app_id, | |
| 54 | 55 | dimension = c.dimension, | |
| 56 | + | key = c.key.as_deref().unwrap_or(""), | |
| 55 | 57 | pct = c.threshold_pct, | |
| 56 | 58 | "synckit warnings: send failed", | |
| 57 | 59 | ); | |
| 58 | 60 | continue; | |
| 59 | 61 | } | |
| 60 | - | if let Err(e) = synckit_billing::update_warning_pct( | |
| 61 | - | &state.db, c.app_id, c.threshold_pct, | |
| 62 | - | ).await { | |
| 62 | + | // Stamp the band on the right table: per-key warnings stamp the | |
| 63 | + | // per-key row, app-wide stamps the app row. Failure to stamp logs | |
| 64 | + | // and continues — next tick will re-fire, which is preferable to | |
| 65 | + | // silently losing the notification. | |
| 66 | + | let stamp = match c.key.as_deref() { | |
| 67 | + | Some(k) => { | |
| 68 | + | synckit_billing::update_key_warning_pct( | |
| 69 | + | &state.db, c.app_id, k, c.threshold_pct, | |
| 70 | + | ).await | |
| 71 | + | } | |
| 72 | + | None => { | |
| 73 | + | synckit_billing::update_warning_pct( | |
| 74 | + | &state.db, c.app_id, c.threshold_pct, | |
| 75 | + | ).await | |
| 76 | + | } | |
| 77 | + | }; | |
| 78 | + | if let Err(e) = stamp { | |
| 63 | 79 | tracing::error!( | |
| 64 | 80 | error = ?e, | |
| 65 | 81 | app_id = %c.app_id, | |
| 82 | + | key = c.key.as_deref().unwrap_or(""), | |
| 66 | 83 | "synckit warnings: stamp failed (will re-fire next tick)", | |
| 67 | 84 | ); | |
| 68 | 85 | } |
| @@ -21,6 +21,10 @@ pub struct SyncClaims { | |||
| 21 | 21 | pub sub: UserId, | |
| 22 | 22 | /// App ID | |
| 23 | 23 | pub app: SyncAppId, | |
| 24 | + | /// Developer-defined SDK key this session belongs to. Required for | |
| 25 | + | /// per-key storage attribution. The dev's backend picks the key when | |
| 26 | + | /// minting the session — typically one key per workspace/org/end-user. | |
| 27 | + | pub key: String, | |
| 24 | 28 | /// Issuer | |
| 25 | 29 | pub iss: String, | |
| 26 | 30 | /// Expiration (Unix timestamp) | |
| @@ -30,11 +34,17 @@ pub struct SyncClaims { | |||
| 30 | 34 | } | |
| 31 | 35 | ||
| 32 | 36 | /// Create a signed JWT for a sync user. | |
| 33 | - | pub fn create_sync_token(secret: &str, user_id: UserId, app_id: SyncAppId) -> Result<String, AppError> { | |
| 37 | + | pub fn create_sync_token( | |
| 38 | + | secret: &str, | |
| 39 | + | user_id: UserId, | |
| 40 | + | app_id: SyncAppId, | |
| 41 | + | key: &str, | |
| 42 | + | ) -> Result<String, AppError> { | |
| 34 | 43 | let now = chrono::Utc::now().timestamp(); | |
| 35 | 44 | let claims = SyncClaims { | |
| 36 | 45 | sub: user_id, | |
| 37 | 46 | app: app_id, | |
| 47 | + | key: key.to_string(), | |
| 38 | 48 | iss: SYNCKIT_JWT_ISSUER.to_string(), | |
| 39 | 49 | exp: now + SYNCKIT_JWT_EXPIRY_SECS, | |
| 40 | 50 | iat: now, | |
| @@ -71,6 +81,8 @@ pub fn decode_sync_token(secret: &str, token: &str) -> Result<SyncClaims, AppErr | |||
| 71 | 81 | pub struct SyncUser { | |
| 72 | 82 | pub user_id: UserId, | |
| 73 | 83 | pub app_id: SyncAppId, | |
| 84 | + | /// SDK key this session was minted under. All writes attributed here. | |
| 85 | + | pub key: String, | |
| 74 | 86 | } | |
| 75 | 87 | ||
| 76 | 88 | impl FromRequestParts<AppState> for SyncUser { | |
| @@ -123,9 +135,14 @@ impl FromRequestParts<AppState> for SyncUser { | |||
| 123 | 135 | return Err(AppError::Unauthorized); | |
| 124 | 136 | } | |
| 125 | 137 | ||
| 138 | + | if claims.key.is_empty() { | |
| 139 | + | return Err(AppError::Unauthorized); | |
| 140 | + | } | |
| 141 | + | ||
| 126 | 142 | Ok(SyncUser { | |
| 127 | 143 | user_id: claims.sub, | |
| 128 | 144 | app_id: claims.app, | |
| 145 | + | key: claims.key, | |
| 129 | 146 | }) | |
| 130 | 147 | } | |
| 131 | 148 | } | |
| @@ -135,17 +152,19 @@ mod tests { | |||
| 135 | 152 | use super::*; | |
| 136 | 153 | ||
| 137 | 154 | const TEST_SECRET: &str = "test-secret-key-for-synckit-jwt"; | |
| 155 | + | const TEST_KEY: &str = "test-key"; | |
| 138 | 156 | ||
| 139 | 157 | #[test] | |
| 140 | 158 | fn jwt_round_trip() { | |
| 141 | 159 | let user_id = UserId::new(); | |
| 142 | 160 | let app_id = SyncAppId::new(); | |
| 143 | 161 | ||
| 144 | - | let token = create_sync_token(TEST_SECRET, user_id, app_id).unwrap(); | |
| 162 | + | let token = create_sync_token(TEST_SECRET, user_id, app_id, TEST_KEY).unwrap(); | |
| 145 | 163 | let claims = decode_sync_token(TEST_SECRET, &token).unwrap(); | |
| 146 | 164 | ||
| 147 | 165 | assert_eq!(claims.sub, user_id); | |
| 148 | 166 | assert_eq!(claims.app, app_id); | |
| 167 | + | assert_eq!(claims.key, TEST_KEY); | |
| 149 | 168 | } | |
| 150 | 169 | ||
| 151 | 170 | #[test] | |
| @@ -157,6 +176,7 @@ mod tests { | |||
| 157 | 176 | let claims = SyncClaims { | |
| 158 | 177 | sub: user_id, | |
| 159 | 178 | app: app_id, | |
| 179 | + | key: TEST_KEY.to_string(), | |
| 160 | 180 | iss: SYNCKIT_JWT_ISSUER.to_string(), | |
| 161 | 181 | exp: now - 3600, // expired 1 hour ago | |
| 162 | 182 | iat: now - 7200, | |
| @@ -182,7 +202,7 @@ mod tests { | |||
| 182 | 202 | let user_id = UserId::new(); | |
| 183 | 203 | let app_id = SyncAppId::new(); | |
| 184 | 204 | ||
| 185 | - | let token = create_sync_token(TEST_SECRET, user_id, app_id).unwrap(); | |
| 205 | + | let token = create_sync_token(TEST_SECRET, user_id, app_id, TEST_KEY).unwrap(); | |
| 186 | 206 | assert!(decode_sync_token("wrong-secret", &token).is_err()); | |
| 187 | 207 | } | |
| 188 | 208 | ||
| @@ -212,6 +232,7 @@ mod tests { | |||
| 212 | 232 | let claims = SyncClaims { | |
| 213 | 233 | sub: user_id, | |
| 214 | 234 | app: app_id, | |
| 235 | + | key: TEST_KEY.to_string(), | |
| 215 | 236 | iss: "wrong-issuer".to_string(), | |
| 216 | 237 | exp: now + SYNCKIT_JWT_EXPIRY_SECS, | |
| 217 | 238 | iat: now, | |
| @@ -261,7 +282,7 @@ mod tests { | |||
| 261 | 282 | let user_id = UserId::new(); | |
| 262 | 283 | let app_id = SyncAppId::new(); | |
| 263 | 284 | ||
| 264 | - | let token = create_sync_token(TEST_SECRET, user_id, app_id).unwrap(); | |
| 285 | + | let token = create_sync_token(TEST_SECRET, user_id, app_id, TEST_KEY).unwrap(); | |
| 265 | 286 | let parts: Vec<&str> = token.split('.').collect(); | |
| 266 | 287 | assert_eq!(parts.len(), 3); | |
| 267 | 288 | ||
| @@ -291,6 +312,7 @@ mod tests { | |||
| 291 | 312 | let claims = SyncClaims { | |
| 292 | 313 | sub: user_id, | |
| 293 | 314 | app: app_id, | |
| 315 | + | key: TEST_KEY.to_string(), | |
| 294 | 316 | iss: SYNCKIT_JWT_ISSUER.to_string(), | |
| 295 | 317 | exp: now + SYNCKIT_JWT_EXPIRY_SECS, | |
| 296 | 318 | iat: now + 86400 * 365, // 1 year in the future |
| @@ -490,6 +490,91 @@ fn gauge_tier(pct: i32) -> &'static str { | |||
| 490 | 490 | if pct >= 90 { "danger" } else if pct >= 75 { "warn" } else { "" } | |
| 491 | 491 | } | |
| 492 | 492 | ||
| 493 | + | /// How many keys to surface on the dashboard before truncating. The full | |
| 494 | + | /// list is still reachable via `POST /api/sync/keys/list`; the cap keeps a | |
| 495 | + | /// 1,000-key app from rendering 1,000 progress bars. | |
| 496 | + | pub const DASHBOARD_TOP_KEYS: usize = 5; | |
| 497 | + | ||
| 498 | + | /// Batched fetch + group: returns a map of `app_id → Vec<(key, bytes_stored)>`, | |
| 499 | + | /// limited to `DASHBOARD_TOP_KEYS + 1` per app (the extra row tells the caller | |
| 500 | + | /// whether to render a "more" indicator). Only `per_key`-mode active apps are | |
| 501 | + | /// queried; everything else is omitted from the map. | |
| 502 | + | pub async fn build_top_keys_map( | |
| 503 | + | pool: &sqlx::PgPool, | |
| 504 | + | billing_map: &std::collections::HashMap<db::SyncAppId, db::DbSyncAppBilling>, | |
| 505 | + | ) -> Result<std::collections::HashMap<db::SyncAppId, Vec<(String, i64)>>, crate::error::AppError> { | |
| 506 | + | let per_key_app_ids: Vec<db::SyncAppId> = billing_map | |
| 507 | + | .values() | |
| 508 | + | .filter(|b| b.enforcement_mode == "per_key" && b.billing_status == "active") | |
| 509 | + | .map(|b| b.id) | |
| 510 | + | .collect(); | |
| 511 | + | if per_key_app_ids.is_empty() { | |
| 512 | + | return Ok(std::collections::HashMap::new()); | |
| 513 | + | } | |
| 514 | + | let rows = db::synckit_billing::get_top_keys_per_app( | |
| 515 | + | pool, | |
| 516 | + | &per_key_app_ids, | |
| 517 | + | (DASHBOARD_TOP_KEYS + 1) as i64, | |
| 518 | + | ) | |
| 519 | + | .await?; | |
| 520 | + | let mut map: std::collections::HashMap<db::SyncAppId, Vec<(String, i64)>> = | |
| 521 | + | std::collections::HashMap::new(); | |
| 522 | + | for (app_id, key, bytes) in rows { | |
| 523 | + | map.entry(app_id).or_default().push((key, bytes)); | |
| 524 | + | } | |
| 525 | + | Ok(map) | |
| 526 | + | } | |
| 527 | + | ||
| 528 | + | /// Populate `view.top_keys` and `view.more_keys` from a row batch fetched by | |
| 529 | + | /// `build_top_keys_map`. Safe to call for bulk-mode apps too (no-op when | |
| 530 | + | /// `rows` is `None` or `gb_per_key` is missing). | |
| 531 | + | pub fn apply_top_keys( | |
| 532 | + | view: &mut super::dashboard::SyncAppBillingView, | |
| 533 | + | b: &db::DbSyncAppBilling, | |
| 534 | + | rows: Option<&Vec<(String, i64)>>, | |
| 535 | + | ) { | |
| 536 | + | if b.enforcement_mode != "per_key" { | |
| 537 | + | return; | |
| 538 | + | } | |
| 539 | + | let Some(gb_per_key) = b.gb_per_key else { return }; | |
| 540 | + | let Some(rows) = rows else { return }; | |
| 541 | + | if rows.is_empty() { | |
| 542 | + | return; | |
| 543 | + | } | |
| 544 | + | let take = DASHBOARD_TOP_KEYS.min(rows.len()); | |
| 545 | + | view.more_keys = rows.len() > DASHBOARD_TOP_KEYS; | |
| 546 | + | view.top_keys = rows | |
| 547 | + | .iter() | |
| 548 | + | .take(take) | |
| 549 | + | .map(|(k, bytes)| { | |
| 550 | + | super::dashboard::KeyUsageRow::from_usage(k.clone(), *bytes, gb_per_key as u32) | |
| 551 | + | }) | |
| 552 | + | .collect(); | |
| 553 | + | } | |
| 554 | + | ||
| 555 | + | impl super::dashboard::KeyUsageRow { | |
| 556 | + | /// Build a row given the per-key allotment in GB. `pct` is computed | |
| 557 | + | /// against `gb_per_key × 1 GiB` and clamped to `[0, 200]` for display. | |
| 558 | + | pub fn from_usage(key: String, bytes_stored: i64, gb_per_key: u32) -> Self { | |
| 559 | + | use crate::helpers::format_bytes; | |
| 560 | + | use crate::synckit_billing::storage_cap_bytes; | |
| 561 | + | let limit = storage_cap_bytes(gb_per_key); | |
| 562 | + | let pct = if limit > 0 { | |
| 563 | + | ((bytes_stored as f64 / limit as f64) * 100.0).round() as i32 | |
| 564 | + | } else { | |
| 565 | + | 0 | |
| 566 | + | }; | |
| 567 | + | let pct = pct.clamp(0, 200); | |
| 568 | + | Self { | |
| 569 | + | key, | |
| 570 | + | bytes_stored, | |
| 571 | + | pct, | |
| 572 | + | tier: gauge_tier(pct), | |
| 573 | + | display: format!("{} / {} GB", format_bytes(bytes_stored), gb_per_key), | |
| 574 | + | } | |
| 575 | + | } | |
| 576 | + | } | |
| 577 | + | ||
| 493 | 578 | impl super::dashboard::SyncAppBillingView { | |
| 494 | 579 | pub fn from_db(b: &db::DbSyncAppBilling) -> Self { | |
| 495 | 580 | use crate::helpers::format_bytes; | |
| @@ -587,6 +672,11 @@ impl super::dashboard::SyncAppBillingView { | |||
| 587 | 672 | default_storage_gb: b.storage_gb_cap.unwrap_or(10), | |
| 588 | 673 | default_key_cap: b.key_cap.unwrap_or(100), | |
| 589 | 674 | default_gb_per_key: b.gb_per_key.unwrap_or(1), | |
| 675 | + | // Populated by the dashboard route for `per_key` mode after this | |
| 676 | + | // call; default is empty so `from_db` stays usable in contexts | |
| 677 | + | // that don't need the breakdown (API responses, tests). | |
| 678 | + | top_keys: Vec::new(), | |
| 679 | + | more_keys: false, | |
| 590 | 680 | } | |
| 591 | 681 | } | |
| 592 | 682 | } |
| @@ -138,6 +138,30 @@ pub struct SyncAppBillingView { | |||
| 138 | 138 | pub default_storage_gb: i32, | |
| 139 | 139 | pub default_key_cap: i32, | |
| 140 | 140 | pub default_gb_per_key: i32, | |
| 141 | + | /// Top per-key gauges, shown in `per_key` mode under the app-aggregate | |
| 142 | + | /// storage gauge. Empty for `bulk` mode and for per_key apps that have | |
| 143 | + | /// not yet recorded any storage. Pre-sorted descending by usage; capped | |
| 144 | + | /// at the top-N most loaded keys so a 1000-key app doesn't render 1000 | |
| 145 | + | /// gauges. | |
| 146 | + | pub top_keys: Vec<KeyUsageRow>, | |
| 147 | + | /// `true` when `top_keys` was truncated — drives the "Show all" hint | |
| 148 | + | /// in the dashboard partial. The full list is still available via | |
| 149 | + | /// `POST /api/sync/keys/list`. | |
| 150 | + | pub more_keys: bool, | |
| 151 | + | } | |
| 152 | + | ||
| 153 | + | /// One row in the per-key usage list rendered in the SyncKit billing panel. | |
| 154 | + | #[derive(Clone)] | |
| 155 | + | #[allow(dead_code)] // Fields used by Askama template | |
| 156 | + | pub struct KeyUsageRow { | |
| 157 | + | pub key: String, | |
| 158 | + | pub bytes_stored: i64, | |
| 159 | + | /// `bytes_stored / per_key_limit × 100`, clamped to `[0, 200]`. | |
| 160 | + | pub pct: i32, | |
| 161 | + | /// `""`, `"warn"`, `"danger"` — same gauge tiering as the app gauge. | |
| 162 | + | pub tier: &'static str, | |
| 163 | + | /// e.g. "320 MB / 1 GB". | |
| 164 | + | pub display: String, | |
| 141 | 165 | } | |
| 142 | 166 | ||
| 143 | 167 | /// A contact (buyer who shared their email) for the contacts dashboard tab. |
| @@ -22,6 +22,7 @@ pub use dashboard::*; | |||
| 22 | 22 | pub use discover::*; | |
| 23 | 23 | pub use payments::*; | |
| 24 | 24 | pub use user::*; | |
| 25 | + | pub use conversions::{apply_top_keys, build_top_keys_map}; | |
| 25 | 26 | ||
| 26 | 27 | use serde::Serialize; | |
| 27 | 28 |