Skip to main content

max / makenotwork

9.1 KB · 288 lines History Blame Raw
1 //! Internal upload pipeline: presigned URL generation, upload confirmation, and storage usage.
2
3 use axum::{
4 extract::{Query, State},
5 response::IntoResponse,
6 Json,
7 };
8 use serde::{Deserialize, Serialize};
9 use std::str::FromStr;
10
11 use crate::{
12 auth::ServiceAuth,
13 db::{self, ItemId, UserId},
14 error::{AppError, Result},
15 storage::{FileType, S3Client, CACHE_CONTROL_IMMUTABLE},
16 AppState,
17 };
18
19 // ── Presign upload (for CLI upload pipeline) ──
20
21 #[derive(Deserialize)]
22 pub(super) struct InternalPresignRequest {
23 user_id: UserId,
24 item_id: ItemId,
25 file_type: String,
26 file_name: String,
27 content_type: String,
28 }
29
30 #[derive(Serialize)]
31 struct InternalPresignResponse {
32 upload_url: String,
33 s3_key: String,
34 expires_in: u64,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 cache_control: Option<String>,
37 }
38
39 /// POST /api/internal/upload/presign
40 ///
41 /// Generate a presigned S3 upload URL. Used by the CLI upload pipeline.
42 #[tracing::instrument(skip_all, name = "internal::presign_upload")]
43 pub(super) async fn presign_upload(
44 State(state): State<AppState>,
45 _auth: ServiceAuth,
46 Json(req): Json<InternalPresignRequest>,
47 ) -> Result<impl IntoResponse> {
48 let s3 = state.require_s3()?;
49
50 let file_type = FileType::from_str(&req.file_type)
51 .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?;
52
53 S3Client::validate_content_type(file_type, &req.content_type)?;
54 S3Client::validate_extension(file_type, &req.file_name)?;
55
56 // Verify user owns the item
57 let owner = db::items::get_item_owner(&state.db, req.item_id)
58 .await?
59 .ok_or(AppError::NotFound)?;
60 if owner != req.user_id {
61 return Err(AppError::Forbidden);
62 }
63
64 // Early quota check
65 db::creator_tiers::check_presign_allowed(&state.db, req.user_id, file_type).await?;
66
67 let s3_key = S3Client::generate_key(req.user_id, req.item_id, file_type, &req.file_name);
68
69 // Track the pending upload so the reaper can clean it up if never confirmed
70 db::pending_uploads::record_pending_upload(&state.db, req.user_id, &s3_key, "main").await?;
71
72 let expires_in = 3600;
73 let upload_url = s3
74 .presign_upload(
75 &s3_key,
76 &req.content_type,
77 Some(expires_in),
78 Some(CACHE_CONTROL_IMMUTABLE),
79 None,
80 )
81 .await?;
82
83 Ok(Json(InternalPresignResponse {
84 upload_url,
85 s3_key,
86 expires_in,
87 cache_control: Some(CACHE_CONTROL_IMMUTABLE.to_string()),
88 }))
89 }
90
91 // ── Confirm upload (for CLI upload pipeline) ──
92
93 #[derive(Deserialize)]
94 pub(super) struct InternalConfirmRequest {
95 user_id: UserId,
96 item_id: ItemId,
97 file_type: String,
98 s3_key: String,
99 }
100
101 #[derive(Serialize)]
102 struct InternalConfirmResponse {
103 success: bool,
104 }
105
106 /// POST /api/internal/upload/confirm
107 ///
108 /// Confirm a completed S3 upload: verify, scan, update DB. Used by the CLI upload pipeline.
109 #[tracing::instrument(skip_all, name = "internal::confirm_upload")]
110 pub(super) async fn confirm_upload(
111 State(state): State<AppState>,
112 _auth: ServiceAuth,
113 Json(req): Json<InternalConfirmRequest>,
114 ) -> Result<impl IntoResponse> {
115 let s3 = state.require_s3()?;
116
117 let file_type = FileType::from_str(&req.file_type)
118 .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?;
119
120 // Verify user owns the item
121 let owner = db::items::get_item_owner(&state.db, req.item_id)
122 .await?
123 .ok_or(AppError::NotFound)?;
124 if owner != req.user_id {
125 return Err(AppError::Forbidden);
126 }
127
128 // Validate S3 key belongs to this user + item (prevent cross-user file reference)
129 let expected_prefix = format!("{}/{}/", req.user_id, req.item_id);
130 if !req.s3_key.starts_with(&expected_prefix) {
131 return Err(AppError::BadRequest("Invalid upload key".to_string()));
132 }
133
134 // Verify the object exists in S3
135 if !s3.object_exists(&req.s3_key).await? {
136 return Err(AppError::BadRequest(
137 "Upload not found. Please try uploading again.".to_string(),
138 ));
139 }
140
141 // Enforce file size limit
142 let file_size_bytes = s3.object_size(&req.s3_key).await?.ok_or_else(|| {
143 AppError::BadRequest("Could not determine file size. Please try uploading again.".to_string())
144 })?;
145 if file_size_bytes as u64 > file_type.max_size() {
146 s3.delete_object(&req.s3_key).await.ok();
147 return Err(AppError::BadRequest(format!(
148 "File exceeds maximum size of {} MB",
149 file_type.max_size() / (1024 * 1024)
150 )));
151 }
152
153 // Enforce tier-based limits
154 let max_storage = match db::creator_tiers::check_upload_allowed(
155 &state.db,
156 req.user_id,
157 file_type,
158 file_size_bytes,
159 )
160 .await
161 {
162 Ok(max) => max,
163 Err(e) => {
164 s3.delete_object(&req.s3_key).await.ok();
165 return Err(e);
166 }
167 };
168
169 // Reject unsupported file types BEFORE any side effect — same ordering rule
170 // as the web upload handlers (see routes/storage/mod.rs::commit_upload).
171 if !matches!(file_type, FileType::Audio | FileType::Download | FileType::Video) {
172 s3.delete_object(&req.s3_key).await.ok();
173 return Err(AppError::BadRequest(
174 "CLI upload only supports audio, video, and download file types".to_string(),
175 ));
176 }
177
178 // Increment storage BEFORE writing the DB record (if quota exceeded, the
179 // S3 object is cleaned up and no DB record is created).
180 if let Err(e) = db::creator_tiers::try_increment_storage(&state.db, req.user_id, file_size_bytes, max_storage).await {
181 s3.delete_object(&req.s3_key).await.ok();
182 return Err(e);
183 }
184
185 db::pending_uploads::remove_pending_upload(&state.db, req.user_id, &req.s3_key).await?;
186
187 // Update the database with S3 key and file size.
188 let file_name = req.s3_key.rsplit('/').next().map(|s| s.to_string());
189 let commit_target = match file_type {
190 FileType::Audio => {
191 db::items::update_item_audio_s3_key(&state.db, req.item_id, req.user_id, &req.s3_key).await?;
192 db::items::update_item_audio_file_size(&state.db, req.item_id, req.user_id, file_size_bytes)
193 .await?;
194 crate::routes::storage::CommitTarget::Item(req.item_id)
195 }
196 FileType::Download => {
197 let version = db::versions::create_version(
198 &state.db,
199 req.item_id,
200 "1.0",
201 None,
202 Some(&req.s3_key),
203 Some(file_size_bytes),
204 file_name.as_deref(),
205 None,
206 )
207 .await?;
208 crate::routes::storage::CommitTarget::Version(version.id)
209 }
210 FileType::Video => {
211 db::items::update_item_video_s3_key(&state.db, req.item_id, req.user_id, &req.s3_key).await?;
212 db::items::update_item_video_file_size(&state.db, req.item_id, req.user_id, file_size_bytes)
213 .await?;
214 crate::routes::storage::CommitTarget::Item(req.item_id)
215 }
216 _ => unreachable!("guarded above"),
217 };
218
219 // Scan enqueue + scan_status flip AFTER the DB writes commit — chronic
220 // ordering invariant enforced via the shared commit_upload helper.
221 let _status = crate::routes::storage::commit_upload(
222 &state,
223 commit_target,
224 &req.s3_key,
225 file_type,
226 req.user_id,
227 file_size_bytes,
228 ).await?;
229
230 // Bump project cache
231 if let Some(item) = db::items::get_item_by_id(&state.db, req.item_id).await?
232 && let Err(e) = db::projects::bump_cache_generation(&state.db, item.project_id).await
233 {
234 tracing::warn!(project_id = %item.project_id, error = ?e, "failed to bump cache generation after upload");
235 }
236
237 tracing::info!(
238 user = %req.user_id,
239 item = %req.item_id,
240 file_type = ?file_type,
241 s3_key = %req.s3_key,
242 size = file_size_bytes,
243 "CLI upload confirmed"
244 );
245
246 Ok(Json(InternalConfirmResponse { success: true }))
247 }
248
249 // ── Storage info ──
250
251 #[derive(Deserialize)]
252 pub(super) struct UserIdQuery {
253 user_id: UserId,
254 }
255
256 #[derive(Serialize)]
257 struct StorageInfoResponse {
258 storage_used_bytes: i64,
259 max_storage_bytes: i64,
260 allows_file_uploads: bool,
261 }
262
263 /// GET /api/internal/creator/storage?user_id={uuid}
264 ///
265 /// Get storage usage and limits for a creator.
266 #[tracing::instrument(skip_all, name = "internal::creator_storage")]
267 pub(super) async fn creator_storage(
268 State(state): State<AppState>,
269 _auth: ServiceAuth,
270 Query(query): Query<UserIdQuery>,
271 ) -> Result<impl IntoResponse> {
272 let used = db::creator_tiers::get_storage_used(&state.db, query.user_id).await?;
273
274 // Resolve effective tier
275 let tier = db::creator_tiers::get_active_creator_tier(&state.db, query.user_id).await?;
276
277 let (max_storage, allows_uploads) = match tier {
278 Some(t) => (t.max_storage_bytes(), t.allows_file_uploads()),
279 None => (0, false),
280 };
281
282 Ok(Json(StorageInfoResponse {
283 storage_used_bytes: used,
284 max_storage_bytes: max_storage,
285 allows_file_uploads: allows_uploads,
286 }))
287 }
288