Skip to main content

max / makenotwork

2.7 KB · 106 lines History Blame Raw
1 use sqlx::PgPool;
2
3 use crate::db::models::*;
4 use crate::db::{SyncAppId, UserId};
5 use crate::error::Result;
6
7 // ── Sync Blobs ──
8
9 /// Get a blob by content hash for a user within an app.
10 #[tracing::instrument(skip_all)]
11 pub async fn get_sync_blob_by_hash(
12 pool: &PgPool,
13 app_id: SyncAppId,
14 user_id: UserId,
15 hash: &str,
16 ) -> Result<Option<DbSyncBlob>> {
17 let blob = sqlx::query_as::<_, DbSyncBlob>(
18 "SELECT * FROM sync_blobs WHERE app_id = $1 AND user_id = $2 AND hash = $3",
19 )
20 .bind(app_id)
21 .bind(user_id)
22 .bind(hash)
23 .fetch_optional(pool)
24 .await?;
25
26 Ok(blob)
27 }
28
29 /// Insert a sync blob, updating size on conflict (idempotent).
30 ///
31 /// Uses `ON CONFLICT DO UPDATE` to keep `size_bytes` consistent with the
32 /// actual S3 object when concurrent uploads race. `key` is the developer-
33 /// defined SDK key from the session JWT; it never changes on re-upload of
34 /// the same hash so the conflict path leaves it alone.
35 #[tracing::instrument(skip_all)]
36 pub async fn create_sync_blob_idempotent(
37 pool: &PgPool,
38 app_id: SyncAppId,
39 user_id: UserId,
40 hash: &str,
41 size_bytes: i64,
42 s3_key: &str,
43 key: &str,
44 ) -> Result<()> {
45 sqlx::query(
46 r#"
47 INSERT INTO sync_blobs (app_id, user_id, hash, size_bytes, s3_key, key)
48 VALUES ($1, $2, $3, $4, $5, $6)
49 ON CONFLICT (app_id, user_id, hash)
50 DO UPDATE SET size_bytes = EXCLUDED.size_bytes
51 "#,
52 )
53 .bind(app_id)
54 .bind(user_id)
55 .bind(hash)
56 .bind(size_bytes)
57 .bind(s3_key)
58 .bind(key)
59 .execute(pool)
60 .await?;
61
62 Ok(())
63 }
64
65 /// Get total blob storage used by a user for an app (in bytes).
66 ///
67 /// Currently unused at the call-site level (Phase 5 moved cap enforcement to
68 /// the app-level counters in `sync_app_usage_current`), but kept around for
69 /// the eventual per-user "what's taking up space" dashboard view.
70 #[allow(dead_code)]
71 #[tracing::instrument(skip_all)]
72 pub async fn get_blob_storage_used(
73 pool: &PgPool,
74 app_id: SyncAppId,
75 user_id: UserId,
76 ) -> Result<i64> {
77 let total: Option<i64> = sqlx::query_scalar(
78 "SELECT SUM(size_bytes)::BIGINT FROM sync_blobs WHERE app_id = $1 AND user_id = $2",
79 )
80 .bind(app_id)
81 .bind(user_id)
82 .fetch_one(pool)
83 .await?;
84
85 Ok(total.unwrap_or(0))
86 }
87
88 /// Count devices registered for a user/app pair.
89 #[tracing::instrument(skip_all)]
90 pub async fn count_sync_devices(
91 pool: &PgPool,
92 app_id: SyncAppId,
93 user_id: UserId,
94 ) -> Result<i64> {
95 let count: i64 = sqlx::query_scalar(
96 "SELECT COUNT(*) FROM sync_devices WHERE app_id = $1 AND user_id = $2",
97 )
98 .bind(app_id)
99 .bind(user_id)
100 .fetch_one(pool)
101 .await?;
102
103 Ok(count)
104 }
105
106