Skip to main content

max / makenotwork

9.1 KB · 255 lines History Blame Raw
1 //! SyncKit blob storage: presigned upload/download URLs and upload confirmation.
2
3 use axum::{
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8 };
9 use serde_json::json;
10
11 use crate::{
12 constants,
13 db::{self, synckit_billing},
14 error::{AppError, Result, ResultExt},
15 synckit_auth::SyncUser,
16 validation,
17 AppState,
18 };
19
20 use super::{
21 BlobConfirmRequest, BlobDownloadUrlRequest, BlobDownloadUrlResponse, BlobUploadUrlRequest,
22 BlobUploadUrlResponse,
23 };
24
25 /// Request a pre-signed S3 upload URL for a blob.
26 ///
27 /// Content-addressed by hash: if a blob with the same hash already exists
28 /// for this user/app, returns `already_exists: true` and an empty URL,
29 /// skipping the upload.
30 #[utoipa::path(post, path = "/api/v1/sync/blobs/upload", tag = "SyncKit",
31 request_body = BlobUploadUrlRequest,
32 responses((status = 200, description = "Pre-signed upload URL", body = BlobUploadUrlResponse)),
33 security(("bearer" = [])),
34 )]
35 #[tracing::instrument(skip_all, name = "synckit::blob_upload_url")]
36 pub(super) async fn blob_upload_url(
37 State(state): State<AppState>,
38 sync_user: SyncUser,
39 Json(req): Json<BlobUploadUrlRequest>,
40 ) -> Result<impl IntoResponse> {
41 let synckit_s3 = state
42 .synckit_s3
43 .as_ref()
44 .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?;
45
46 validation::validate_sync_blob_hash(&req.hash)?;
47 if req.size_bytes <= 0 || req.size_bytes > constants::SYNCKIT_MAX_BLOB_SIZE_BYTES {
48 return Err(AppError::BadRequest(format!(
49 "Blob size must be between 1 and {} bytes",
50 constants::SYNCKIT_MAX_BLOB_SIZE_BYTES
51 )));
52 }
53
54 // Cap enforcement happens at confirm time (when we know the upload
55 // succeeded). Presign returns a URL even when the upload would later be
56 // rejected — cheap, and avoids leaking cap state to unauthenticated
57 // S3 calls. See blob_confirm_upload for the gate.
58
59 // Check dedup — if this hash already exists, skip upload
60 if let Some(_existing) = db::synckit::get_sync_blob_by_hash(
61 &state.db,
62 sync_user.app_id,
63 sync_user.user_id,
64 &req.hash,
65 )
66 .await?
67 {
68 return Ok(Json(BlobUploadUrlResponse {
69 upload_url: String::new(),
70 already_exists: true,
71 }));
72 }
73
74 let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash);
75
76 // Track the pending upload so the reaper can clean it up if never confirmed
77 db::pending_uploads::record_pending_upload(&state.db, sync_user.user_id, &s3_key, "synckit").await?;
78
79 let upload_url = synckit_s3
80 .presign_upload(
81 &s3_key,
82 "application/octet-stream",
83 Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS),
84 None,
85 None,
86 )
87 .await
88 .context("presign upload for sync blob")?;
89
90 Ok(Json(BlobUploadUrlResponse {
91 upload_url,
92 already_exists: false,
93 }))
94 }
95
96 /// Confirm that a blob upload to S3 completed successfully.
97 ///
98 /// Verifies the object exists in S3, then records it in the database.
99 /// Idempotent: returns success without creating a duplicate.
100 #[utoipa::path(post, path = "/api/v1/sync/blobs/confirm", tag = "SyncKit",
101 request_body = BlobConfirmRequest,
102 responses((status = 204, description = "Upload confirmed")),
103 security(("bearer" = [])),
104 )]
105 #[tracing::instrument(skip_all, name = "synckit::blob_confirm_upload")]
106 pub(super) async fn blob_confirm_upload(
107 State(state): State<AppState>,
108 sync_user: SyncUser,
109 Json(req): Json<BlobConfirmRequest>,
110 ) -> Result<Response> {
111 let synckit_s3 = state
112 .synckit_s3
113 .as_ref()
114 .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?;
115
116 validation::validate_sync_blob_hash(&req.hash)?;
117
118 // Billing + cap enforcement (skipped entirely for internal apps).
119 let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id)
120 .await?
121 .ok_or(AppError::NotFound)?;
122
123 if !billing.is_internal {
124 if billing.billing_status != "active" {
125 return Ok((
126 StatusCode::PAYMENT_REQUIRED,
127 Json(json!({ "reason": "billing_inactive" })),
128 )
129 .into_response());
130 }
131 if let Some(exceeded) = synckit_billing::would_exceed_storage(
132 &state.db, sync_user.app_id, &sync_user.key, req.size_bytes,
133 ).await? {
134 let mut body = json!({
135 "reason": "storage_limit_reached",
136 "dimension": exceeded.dimension,
137 "used": exceeded.used,
138 "limit": exceeded.limit,
139 });
140 if let Some(k) = &exceeded.key {
141 body["key"] = json!(k);
142 }
143 return Ok((StatusCode::PAYMENT_REQUIRED, Json(body)).into_response());
144 }
145 }
146
147 let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash);
148
149 // Verify the object actually exists in S3
150 if !synckit_s3.object_exists(&s3_key).await? {
151 return Err(AppError::BadRequest(
152 "Blob not found in storage — upload before confirming".to_string(),
153 ));
154 }
155
156 // Clear the pending upload record now that the upload is confirmed
157 db::pending_uploads::remove_pending_upload(&state.db, sync_user.user_id, &s3_key).await?;
158
159 // Record in DB (idempotent — ON CONFLICT DO NOTHING if already exists)
160 db::synckit::create_sync_blob_idempotent(
161 &state.db,
162 sync_user.app_id,
163 sync_user.user_id,
164 &req.hash,
165 req.size_bytes,
166 &s3_key,
167 &sync_user.key,
168 )
169 .await?;
170
171 // Update the rolling storage counter (app-level + per-key). We don't fail
172 // the request if this breaks — the weekly drift correction job will
173 // reconcile from sync_blobs. Skip for internal apps to keep the counter
174 // at 0 there.
175 if !billing.is_internal
176 && let Err(e) = synckit_billing::add_bytes_stored(
177 &state.db, sync_user.app_id, &sync_user.key, req.size_bytes,
178 ).await {
179 tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_stored");
180 }
181
182 Ok(StatusCode::NO_CONTENT.into_response())
183 }
184
185 /// Request a pre-signed S3 download URL for a blob by hash.
186 #[utoipa::path(post, path = "/api/v1/sync/blobs/download", tag = "SyncKit",
187 request_body = BlobDownloadUrlRequest,
188 responses((status = 200, description = "Pre-signed download URL", body = BlobDownloadUrlResponse), (status = 404, description = "Blob not found")),
189 security(("bearer" = [])),
190 )]
191 #[tracing::instrument(skip_all, name = "synckit::blob_download_url")]
192 pub(super) async fn blob_download_url(
193 State(state): State<AppState>,
194 sync_user: SyncUser,
195 Json(req): Json<BlobDownloadUrlRequest>,
196 ) -> Result<Response> {
197 let synckit_s3 = state
198 .synckit_s3
199 .as_ref()
200 .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?;
201
202 validation::validate_sync_blob_hash(&req.hash)?;
203
204 let blob = db::synckit::get_sync_blob_by_hash(
205 &state.db,
206 sync_user.app_id,
207 sync_user.user_id,
208 &req.hash,
209 )
210 .await?
211 .ok_or(AppError::NotFound)?;
212
213 // Billing check (internal apps bypass). Egress is NOT enforced — it's a
214 // free metric for the developer's dashboard, absorbed in the storage rate
215 // margin. We still count it at presign time so devs see the stat.
216 let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id)
217 .await?
218 .ok_or(AppError::NotFound)?;
219 if !billing.is_internal {
220 if billing.billing_status != "active" {
221 return Ok((
222 StatusCode::PAYMENT_REQUIRED,
223 Json(json!({ "reason": "billing_inactive" })),
224 )
225 .into_response());
226 }
227 // Count egress optimistically at presign time. The client may not
228 // actually download (retries that hit dedup-cached content, for
229 // example), so this overcounts slightly. Acceptable for a free
230 // dashboard metric.
231 if let Err(e) = synckit_billing::add_bytes_egress(
232 &state.db, sync_user.app_id, blob.size_bytes,
233 ).await {
234 tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_egress_period");
235 }
236 }
237
238 let download_url = synckit_s3
239 .presign_download(
240 &blob.s3_key,
241 Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS),
242 )
243 .await
244 .context("presign download for sync blob")?;
245
246 Ok(Json(BlobDownloadUrlResponse { download_url }).into_response())
247 }
248
249 // NOTE: No active blob-delete API path was found in v1. Storage counters can
250 // only grow (until period rollover or manual reset via reset_period_usage,
251 // which only resets egress, not storage). The weekly drift correction job in
252 // `db::synckit_billing::recalculate_synckit_app_storage` reconciles
253 // `bytes_stored` against `sync_blobs` and is the only way storage can shrink
254 // without DB intervention. Known limitation; revisit when blob deletion ships.
255