//! SyncKit blob storage: presigned upload/download URLs and upload confirmation. use axum::{ extract::State, http::StatusCode, response::{IntoResponse, Response}, Json, }; use serde_json::json; use crate::{ constants, db::{self, synckit_billing}, error::{AppError, Result, ResultExt}, synckit_auth::SyncUser, validation, AppState, }; use super::{ BlobConfirmRequest, BlobDownloadUrlRequest, BlobDownloadUrlResponse, BlobUploadUrlRequest, BlobUploadUrlResponse, }; /// Request a pre-signed S3 upload URL for a blob. /// /// Content-addressed by hash: if a blob with the same hash already exists /// for this user/app, returns `already_exists: true` and an empty URL, /// skipping the upload. #[utoipa::path(post, path = "/api/v1/sync/blobs/upload", tag = "SyncKit", request_body = BlobUploadUrlRequest, responses((status = 200, description = "Pre-signed upload URL", body = BlobUploadUrlResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::blob_upload_url")] pub(super) async fn blob_upload_url( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let synckit_s3 = state .synckit_s3 .as_ref() .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; validation::validate_sync_blob_hash(&req.hash)?; if req.size_bytes <= 0 || req.size_bytes > constants::SYNCKIT_MAX_BLOB_SIZE_BYTES { return Err(AppError::BadRequest(format!( "Blob size must be between 1 and {} bytes", constants::SYNCKIT_MAX_BLOB_SIZE_BYTES ))); } // Cap enforcement happens at confirm time (when we know the upload // succeeded). Presign returns a URL even when the upload would later be // rejected — cheap, and avoids leaking cap state to unauthenticated // S3 calls. See blob_confirm_upload for the gate. // Check dedup — if this hash already exists, skip upload if let Some(_existing) = db::synckit::get_sync_blob_by_hash( &state.db, sync_user.app_id, sync_user.user_id, &req.hash, ) .await? { return Ok(Json(BlobUploadUrlResponse { upload_url: String::new(), already_exists: true, })); } let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash); // Track the pending upload so the reaper can clean it up if never confirmed db::pending_uploads::record_pending_upload(&state.db, sync_user.user_id, &s3_key, "synckit").await?; let upload_url = synckit_s3 .presign_upload( &s3_key, "application/octet-stream", Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS), None, None, ) .await .context("presign upload for sync blob")?; Ok(Json(BlobUploadUrlResponse { upload_url, already_exists: false, })) } /// Confirm that a blob upload to S3 completed successfully. /// /// Verifies the object exists in S3, then records it in the database. /// Idempotent: returns success without creating a duplicate. #[utoipa::path(post, path = "/api/v1/sync/blobs/confirm", tag = "SyncKit", request_body = BlobConfirmRequest, responses((status = 204, description = "Upload confirmed")), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::blob_confirm_upload")] pub(super) async fn blob_confirm_upload( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let synckit_s3 = state .synckit_s3 .as_ref() .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; validation::validate_sync_blob_hash(&req.hash)?; // Billing + cap enforcement (skipped entirely for internal apps). let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id) .await? .ok_or(AppError::NotFound)?; if !billing.is_internal { if billing.billing_status != "active" { return Ok(( StatusCode::PAYMENT_REQUIRED, Json(json!({ "reason": "billing_inactive" })), ) .into_response()); } if let Some(exceeded) = synckit_billing::would_exceed_storage( &state.db, sync_user.app_id, &sync_user.key, req.size_bytes, ).await? { let mut body = json!({ "reason": "storage_limit_reached", "dimension": exceeded.dimension, "used": exceeded.used, "limit": exceeded.limit, }); if let Some(k) = &exceeded.key { body["key"] = json!(k); } return Ok((StatusCode::PAYMENT_REQUIRED, Json(body)).into_response()); } } let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash); // Verify the object actually exists in S3 if !synckit_s3.object_exists(&s3_key).await? { return Err(AppError::BadRequest( "Blob not found in storage — upload before confirming".to_string(), )); } // Clear the pending upload record now that the upload is confirmed db::pending_uploads::remove_pending_upload(&state.db, sync_user.user_id, &s3_key).await?; // Record in DB (idempotent — ON CONFLICT DO NOTHING if already exists) db::synckit::create_sync_blob_idempotent( &state.db, sync_user.app_id, sync_user.user_id, &req.hash, req.size_bytes, &s3_key, &sync_user.key, ) .await?; // Update the rolling storage counter (app-level + per-key). We don't fail // the request if this breaks — the weekly drift correction job will // reconcile from sync_blobs. Skip for internal apps to keep the counter // at 0 there. if !billing.is_internal && let Err(e) = synckit_billing::add_bytes_stored( &state.db, sync_user.app_id, &sync_user.key, req.size_bytes, ).await { tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_stored"); } Ok(StatusCode::NO_CONTENT.into_response()) } /// Request a pre-signed S3 download URL for a blob by hash. #[utoipa::path(post, path = "/api/v1/sync/blobs/download", tag = "SyncKit", request_body = BlobDownloadUrlRequest, responses((status = 200, description = "Pre-signed download URL", body = BlobDownloadUrlResponse), (status = 404, description = "Blob not found")), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::blob_download_url")] pub(super) async fn blob_download_url( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let synckit_s3 = state .synckit_s3 .as_ref() .ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; validation::validate_sync_blob_hash(&req.hash)?; let blob = db::synckit::get_sync_blob_by_hash( &state.db, sync_user.app_id, sync_user.user_id, &req.hash, ) .await? .ok_or(AppError::NotFound)?; // Billing check (internal apps bypass). Egress is NOT enforced — it's a // free metric for the developer's dashboard, absorbed in the storage rate // margin. We still count it at presign time so devs see the stat. let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id) .await? .ok_or(AppError::NotFound)?; if !billing.is_internal { if billing.billing_status != "active" { return Ok(( StatusCode::PAYMENT_REQUIRED, Json(json!({ "reason": "billing_inactive" })), ) .into_response()); } // Count egress optimistically at presign time. The client may not // actually download (retries that hit dedup-cached content, for // example), so this overcounts slightly. Acceptable for a free // dashboard metric. if let Err(e) = synckit_billing::add_bytes_egress( &state.db, sync_user.app_id, blob.size_bytes, ).await { tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_egress_period"); } } let download_url = synckit_s3 .presign_download( &blob.s3_key, Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS), ) .await .context("presign download for sync blob")?; Ok(Json(BlobDownloadUrlResponse { download_url }).into_response()) } // NOTE: No active blob-delete API path was found in v1. Storage counters can // only grow (until period rollover or manual reset via reset_period_usage, // which only resets egress, not storage). The weekly drift correction job in // `db::synckit_billing::recalculate_synckit_app_storage` reconciles // `bytes_stored` against `sync_blobs` and is the only way storage can shrink // without DB intervention. Known limitation; revisit when blob deletion ships.