| 1 |
|
| 2 |
|
| 3 |
use axum::{ |
| 4 |
extract::State, |
| 5 |
http::StatusCode, |
| 6 |
response::{IntoResponse, Response}, |
| 7 |
Json, |
| 8 |
}; |
| 9 |
use serde_json::json; |
| 10 |
|
| 11 |
use crate::{ |
| 12 |
constants, |
| 13 |
db::{self, synckit_billing}, |
| 14 |
error::{AppError, Result, ResultExt}, |
| 15 |
synckit_auth::SyncUser, |
| 16 |
validation, |
| 17 |
AppState, |
| 18 |
}; |
| 19 |
|
| 20 |
use super::{ |
| 21 |
BlobConfirmRequest, BlobDownloadUrlRequest, BlobDownloadUrlResponse, BlobUploadUrlRequest, |
| 22 |
BlobUploadUrlResponse, |
| 23 |
}; |
| 24 |
|
| 25 |
|
| 26 |
|
| 27 |
|
| 28 |
|
| 29 |
|
| 30 |
#[utoipa::path(post, path = "/api/v1/sync/blobs/upload", tag = "SyncKit", |
| 31 |
request_body = BlobUploadUrlRequest, |
| 32 |
responses((status = 200, description = "Pre-signed upload URL", body = BlobUploadUrlResponse)), |
| 33 |
security(("bearer" = [])), |
| 34 |
)] |
| 35 |
#[tracing::instrument(skip_all, name = "synckit::blob_upload_url")] |
| 36 |
pub(super) async fn blob_upload_url( |
| 37 |
State(state): State<AppState>, |
| 38 |
sync_user: SyncUser, |
| 39 |
Json(req): Json<BlobUploadUrlRequest>, |
| 40 |
) -> Result<impl IntoResponse> { |
| 41 |
let synckit_s3 = state |
| 42 |
.synckit_s3 |
| 43 |
.as_ref() |
| 44 |
.ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; |
| 45 |
|
| 46 |
validation::validate_sync_blob_hash(&req.hash)?; |
| 47 |
if req.size_bytes <= 0 || req.size_bytes > constants::SYNCKIT_MAX_BLOB_SIZE_BYTES { |
| 48 |
return Err(AppError::BadRequest(format!( |
| 49 |
"Blob size must be between 1 and {} bytes", |
| 50 |
constants::SYNCKIT_MAX_BLOB_SIZE_BYTES |
| 51 |
))); |
| 52 |
} |
| 53 |
|
| 54 |
|
| 55 |
|
| 56 |
|
| 57 |
|
| 58 |
|
| 59 |
|
| 60 |
if let Some(_existing) = db::synckit::get_sync_blob_by_hash( |
| 61 |
&state.db, |
| 62 |
sync_user.app_id, |
| 63 |
sync_user.user_id, |
| 64 |
&req.hash, |
| 65 |
) |
| 66 |
.await? |
| 67 |
{ |
| 68 |
return Ok(Json(BlobUploadUrlResponse { |
| 69 |
upload_url: String::new(), |
| 70 |
already_exists: true, |
| 71 |
})); |
| 72 |
} |
| 73 |
|
| 74 |
let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash); |
| 75 |
|
| 76 |
|
| 77 |
db::pending_uploads::record_pending_upload(&state.db, sync_user.user_id, &s3_key, "synckit").await?; |
| 78 |
|
| 79 |
let upload_url = synckit_s3 |
| 80 |
.presign_upload( |
| 81 |
&s3_key, |
| 82 |
"application/octet-stream", |
| 83 |
Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS), |
| 84 |
None, |
| 85 |
None, |
| 86 |
) |
| 87 |
.await |
| 88 |
.context("presign upload for sync blob")?; |
| 89 |
|
| 90 |
Ok(Json(BlobUploadUrlResponse { |
| 91 |
upload_url, |
| 92 |
already_exists: false, |
| 93 |
})) |
| 94 |
} |
| 95 |
|
| 96 |
|
| 97 |
|
| 98 |
|
| 99 |
|
| 100 |
#[utoipa::path(post, path = "/api/v1/sync/blobs/confirm", tag = "SyncKit", |
| 101 |
request_body = BlobConfirmRequest, |
| 102 |
responses((status = 204, description = "Upload confirmed")), |
| 103 |
security(("bearer" = [])), |
| 104 |
)] |
| 105 |
#[tracing::instrument(skip_all, name = "synckit::blob_confirm_upload")] |
| 106 |
pub(super) async fn blob_confirm_upload( |
| 107 |
State(state): State<AppState>, |
| 108 |
sync_user: SyncUser, |
| 109 |
Json(req): Json<BlobConfirmRequest>, |
| 110 |
) -> Result<Response> { |
| 111 |
let synckit_s3 = state |
| 112 |
.synckit_s3 |
| 113 |
.as_ref() |
| 114 |
.ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; |
| 115 |
|
| 116 |
validation::validate_sync_blob_hash(&req.hash)?; |
| 117 |
|
| 118 |
|
| 119 |
let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id) |
| 120 |
.await? |
| 121 |
.ok_or(AppError::NotFound)?; |
| 122 |
|
| 123 |
if !billing.is_internal { |
| 124 |
if billing.billing_status != "active" { |
| 125 |
return Ok(( |
| 126 |
StatusCode::PAYMENT_REQUIRED, |
| 127 |
Json(json!({ "reason": "billing_inactive" })), |
| 128 |
) |
| 129 |
.into_response()); |
| 130 |
} |
| 131 |
if let Some(exceeded) = synckit_billing::would_exceed_storage( |
| 132 |
&state.db, sync_user.app_id, &sync_user.key, req.size_bytes, |
| 133 |
).await? { |
| 134 |
let mut body = json!({ |
| 135 |
"reason": "storage_limit_reached", |
| 136 |
"dimension": exceeded.dimension, |
| 137 |
"used": exceeded.used, |
| 138 |
"limit": exceeded.limit, |
| 139 |
}); |
| 140 |
if let Some(k) = &exceeded.key { |
| 141 |
body["key"] = json!(k); |
| 142 |
} |
| 143 |
return Ok((StatusCode::PAYMENT_REQUIRED, Json(body)).into_response()); |
| 144 |
} |
| 145 |
} |
| 146 |
|
| 147 |
let s3_key = format!("{}/{}/{}", sync_user.app_id, sync_user.user_id, req.hash); |
| 148 |
|
| 149 |
|
| 150 |
if !synckit_s3.object_exists(&s3_key).await? { |
| 151 |
return Err(AppError::BadRequest( |
| 152 |
"Blob not found in storage — upload before confirming".to_string(), |
| 153 |
)); |
| 154 |
} |
| 155 |
|
| 156 |
|
| 157 |
db::pending_uploads::remove_pending_upload(&state.db, sync_user.user_id, &s3_key).await?; |
| 158 |
|
| 159 |
|
| 160 |
db::synckit::create_sync_blob_idempotent( |
| 161 |
&state.db, |
| 162 |
sync_user.app_id, |
| 163 |
sync_user.user_id, |
| 164 |
&req.hash, |
| 165 |
req.size_bytes, |
| 166 |
&s3_key, |
| 167 |
&sync_user.key, |
| 168 |
) |
| 169 |
.await?; |
| 170 |
|
| 171 |
|
| 172 |
|
| 173 |
|
| 174 |
|
| 175 |
if !billing.is_internal |
| 176 |
&& let Err(e) = synckit_billing::add_bytes_stored( |
| 177 |
&state.db, sync_user.app_id, &sync_user.key, req.size_bytes, |
| 178 |
).await { |
| 179 |
tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_stored"); |
| 180 |
} |
| 181 |
|
| 182 |
Ok(StatusCode::NO_CONTENT.into_response()) |
| 183 |
} |
| 184 |
|
| 185 |
|
| 186 |
#[utoipa::path(post, path = "/api/v1/sync/blobs/download", tag = "SyncKit", |
| 187 |
request_body = BlobDownloadUrlRequest, |
| 188 |
responses((status = 200, description = "Pre-signed download URL", body = BlobDownloadUrlResponse), (status = 404, description = "Blob not found")), |
| 189 |
security(("bearer" = [])), |
| 190 |
)] |
| 191 |
#[tracing::instrument(skip_all, name = "synckit::blob_download_url")] |
| 192 |
pub(super) async fn blob_download_url( |
| 193 |
State(state): State<AppState>, |
| 194 |
sync_user: SyncUser, |
| 195 |
Json(req): Json<BlobDownloadUrlRequest>, |
| 196 |
) -> Result<Response> { |
| 197 |
let synckit_s3 = state |
| 198 |
.synckit_s3 |
| 199 |
.as_ref() |
| 200 |
.ok_or_else(|| AppError::ServiceUnavailable("SyncKit blob storage is not configured".to_string()))?; |
| 201 |
|
| 202 |
validation::validate_sync_blob_hash(&req.hash)?; |
| 203 |
|
| 204 |
let blob = db::synckit::get_sync_blob_by_hash( |
| 205 |
&state.db, |
| 206 |
sync_user.app_id, |
| 207 |
sync_user.user_id, |
| 208 |
&req.hash, |
| 209 |
) |
| 210 |
.await? |
| 211 |
.ok_or(AppError::NotFound)?; |
| 212 |
|
| 213 |
|
| 214 |
|
| 215 |
|
| 216 |
let billing = synckit_billing::get_app_with_billing(&state.db, sync_user.app_id) |
| 217 |
.await? |
| 218 |
.ok_or(AppError::NotFound)?; |
| 219 |
if !billing.is_internal { |
| 220 |
if billing.billing_status != "active" { |
| 221 |
return Ok(( |
| 222 |
StatusCode::PAYMENT_REQUIRED, |
| 223 |
Json(json!({ "reason": "billing_inactive" })), |
| 224 |
) |
| 225 |
.into_response()); |
| 226 |
} |
| 227 |
|
| 228 |
|
| 229 |
|
| 230 |
|
| 231 |
if let Err(e) = synckit_billing::add_bytes_egress( |
| 232 |
&state.db, sync_user.app_id, blob.size_bytes, |
| 233 |
).await { |
| 234 |
tracing::error!(error = ?e, app_id = %sync_user.app_id, "failed to bump bytes_egress_period"); |
| 235 |
} |
| 236 |
} |
| 237 |
|
| 238 |
let download_url = synckit_s3 |
| 239 |
.presign_download( |
| 240 |
&blob.s3_key, |
| 241 |
Some(constants::SYNCKIT_BLOB_PRESIGN_EXPIRY_SECS), |
| 242 |
) |
| 243 |
.await |
| 244 |
.context("presign download for sync blob")?; |
| 245 |
|
| 246 |
Ok(Json(BlobDownloadUrlResponse { download_url }).into_response()) |
| 247 |
} |
| 248 |
|
| 249 |
|
| 250 |
|
| 251 |
|
| 252 |
|
| 253 |
|
| 254 |
|
| 255 |
|