Skip to main content

max / makenotwork

9.0 KB · 258 lines History Blame Raw
1 //! Presigned upload and confirm handlers for version files.
2
3 use axum::{
4 extract::{Path, State},
5 response::IntoResponse,
6 Json,
7 };
8 use serde::Deserialize;
9
10 use crate::{
11 auth::AuthUser,
12 db::{self, VersionId},
13 error::{AppError, Result, ResultExt},
14 storage::{FileType, S3Client, CACHE_CONTROL_IMMUTABLE},
15 AppState,
16 };
17
18 use super::{commit_upload, CommitTarget, ConfirmUploadResponse, PresignUploadResponse};
19
20 /// JSON input for requesting a presigned version upload URL.
21 #[derive(Debug, Deserialize)]
22 pub struct VersionPresignRequest {
23 pub file_name: String,
24 pub content_type: String,
25 }
26
27 /// JSON input for confirming a completed version upload.
28 #[derive(Debug, Deserialize)]
29 pub struct VersionConfirmRequest {
30 pub s3_key: String,
31 }
32
33 /// Generate a presigned URL for uploading a version file to S3
34 ///
35 /// POST /api/versions/{version_id}/upload/presign
36 ///
37 /// Requires authentication. User must own the item (through version -> item -> project chain).
38 #[tracing::instrument(skip_all, name = "storage::version_presign_upload", fields(%version_id, user_id = %user.id))]
39 pub(super) async fn version_presign_upload(
40 State(state): State<AppState>,
41 AuthUser(user): AuthUser,
42 Path(version_id): Path<VersionId>,
43 Json(req): Json<VersionPresignRequest>,
44 ) -> Result<impl IntoResponse> {
45 user.check_not_suspended()?;
46 let s3 = state.require_s3()?;
47
48 let file_type = FileType::Download;
49
50 // Validate content type and extension
51 S3Client::validate_content_type(file_type, &req.content_type)?;
52 S3Client::validate_extension(file_type, &req.file_name)?;
53
54 // Fetch version and verify ownership through version -> item -> project chain
55 let version = db::versions::get_version_by_id(&state.db, version_id)
56 .await?
57 .ok_or(AppError::NotFound)?;
58
59 let owner = db::items::get_item_owner(&state.db, version.item_id)
60 .await?
61 .ok_or(AppError::NotFound)?;
62
63 if owner != user.id {
64 return Err(AppError::Forbidden);
65 }
66
67 // Early quota check
68 db::creator_tiers::check_presign_allowed(&state.db, user.id, file_type).await?;
69
70 let max_file_bytes = db::creator_tiers::get_effective_max_file_bytes(&state.db, user.id, file_type).await?;
71
72 // Generate S3 key using the version's item_id
73 let s3_key = S3Client::generate_key(user.id, version.item_id, file_type, &req.file_name);
74
75 // Track the pending upload so the reaper can clean it up if never confirmed
76 db::pending_uploads::record_pending_upload(&state.db, user.id, &s3_key, "main").await?;
77
78 let expires_in = 3600;
79 let upload_url = s3.presign_upload(&s3_key, &req.content_type, Some(expires_in), Some(CACHE_CONTROL_IMMUTABLE), None)
80 .await
81 .context("presign upload for version file")?;
82
83 Ok(Json(PresignUploadResponse {
84 upload_url,
85 s3_key,
86 expires_in,
87 cache_control: Some(CACHE_CONTROL_IMMUTABLE.to_string()),
88 max_file_bytes,
89 }))
90 }
91
92 /// Confirm that a version file upload has completed and update the database
93 ///
94 /// POST /api/versions/{version_id}/upload/confirm
95 ///
96 /// Requires authentication. User must own the item.
97 #[tracing::instrument(skip_all, name = "storage::version_confirm_upload", fields(%version_id, user_id = %user.id))]
98 pub(super) async fn version_confirm_upload(
99 State(state): State<AppState>,
100 AuthUser(user): AuthUser,
101 Path(version_id): Path<VersionId>,
102 Json(req): Json<VersionConfirmRequest>,
103 ) -> Result<impl IntoResponse> {
104 user.check_not_suspended()?;
105 let s3 = state.require_s3()?;
106
107 // Fetch version and verify ownership
108 let version = db::versions::get_version_by_id(&state.db, version_id)
109 .await?
110 .ok_or(AppError::NotFound)?;
111
112 let owner = db::items::get_item_owner(&state.db, version.item_id)
113 .await?
114 .ok_or(AppError::NotFound)?;
115
116 if owner != user.id {
117 return Err(AppError::Forbidden);
118 }
119
120 // Validate S3 key belongs to this user + item (prevent cross-user file reference)
121 let expected_prefix = format!("{}/{}/", user.id, version.item_id);
122 if !req.s3_key.starts_with(&expected_prefix) {
123 return Err(AppError::BadRequest(
124 "Invalid upload key".to_string(),
125 ));
126 }
127
128 // Verify the object exists in S3
129 if !s3.object_exists(&req.s3_key).await? {
130 return Err(AppError::BadRequest(
131 "Upload not found. Please try uploading again.".to_string(),
132 ));
133 }
134
135 // Enforce file size limit (versions are always downloads)
136 let file_size_bytes = s3.object_size(&req.s3_key).await?.ok_or_else(|| {
137 AppError::BadRequest("Could not determine file size. Please try uploading again.".to_string())
138 })?;
139 if file_size_bytes as u64 > FileType::Download.max_size() {
140 s3.delete_object(&req.s3_key).await.ok();
141 let limit_mb = FileType::Download.max_size() / (1024 * 1024);
142 let file_mb = file_size_bytes as u64 / (1024 * 1024);
143 return Err(AppError::FileTooLarge(format!(
144 "File is {} MB but the maximum for download files is {} MB.",
145 file_mb, limit_mb
146 )));
147 }
148
149 // Enforce tier-based limits (per-file + storage cap)
150 let max_storage = match db::creator_tiers::check_upload_allowed(&state.db, user.id, FileType::Download, file_size_bytes).await {
151 Ok(max) => max,
152 Err(e) => {
153 s3.delete_object(&req.s3_key).await.ok();
154 return Err(e);
155 }
156 };
157
158 // Idempotency: if the version already has this exact s3_key, return success (no-op).
159 // Must come BEFORE scan enqueue / scan_status flip — re-confirming an already-Clean
160 // version must not knock it back to Pending.
161 if version.s3_key.as_deref() == Some(&req.s3_key) {
162 // Still clear pending_uploads — orphan reaper would otherwise delete
163 // the live S3 object 24h later (Run #7 HIGH-1).
164 if let Err(e) = db::pending_uploads::remove_pending_upload(&state.db, user.id, &req.s3_key).await {
165 tracing::warn!(error = ?e, key = %req.s3_key, "remove_pending_upload failed on idempotent re-confirm");
166 }
167 return Ok(Json(ConfirmUploadResponse { success: true, pending_review: None }));
168 }
169
170 let old_s3_key = version.s3_key.clone();
171 let old_size = version.file_size_bytes.unwrap_or(0);
172 let is_replace = old_s3_key.is_some() && old_size > 0;
173
174 // Extract file name from the s3_key (last path segment)
175 let file_name = req.s3_key.rsplit('/').next().map(|s| s.to_string());
176
177 // Storage credit + version UPDATE in ONE transaction. The expected-old guard
178 // (`s3_key IS NOT DISTINCT FROM`) returns no row if another confirm raced
179 // ahead; we leave the tx uncommitted so the rollback undoes the storage
180 // change with no compensating math (the previous swallowed-`.ok()` path).
181 // `commit_upload` stays AFTER the commit (the blessed scan-ordering path).
182 // `Ok(false)` = lost race (rolled back, nothing charged).
183 let committed: Result<bool> = async {
184 let mut tx = state.db.begin().await?;
185 db::creator_tiers::try_apply_storage_on(
186 &mut tx, user.id, is_replace.then_some(old_size), file_size_bytes, max_storage,
187 ).await?;
188 let updated = db::versions::update_version_file(
189 &mut *tx,
190 version_id,
191 old_s3_key.as_deref(),
192 &req.s3_key,
193 Some(file_size_bytes),
194 file_name.as_deref(),
195 )
196 .await?;
197 if updated.is_none() {
198 // Lost race — drop tx to roll back the storage change.
199 return Ok(false);
200 }
201 tx.commit().await?;
202 Ok(true)
203 }
204 .await;
205
206 match committed {
207 Err(e) => {
208 s3.delete_object(&req.s3_key).await.ok();
209 return Err(e);
210 }
211 Ok(false) => {
212 s3.delete_object(&req.s3_key).await.ok();
213 return Err(AppError::BadRequest(
214 "Version was modified concurrently. Please try uploading again.".to_string(),
215 ));
216 }
217 Ok(true) => {}
218 }
219
220 // Clear the pending upload record now that the upload is committed
221 db::pending_uploads::remove_pending_upload(&state.db, user.id, &req.s3_key).await?;
222
223 let scan_status = commit_upload(
224 &state,
225 CommitTarget::Version(version_id),
226 &req.s3_key,
227 FileType::Download,
228 user.id,
229 file_size_bytes,
230 ).await?;
231
232 // Enqueue old S3 key for deletion now that the DB record points to the new key
233 if let Some(old_key) = old_s3_key
234 && let Err(e) = db::pending_s3_deletions::enqueue_deletions(
235 &state.db,
236 &[(old_key, "main".to_string())],
237 "version_replace",
238 ).await
239 {
240 tracing::warn!(error = ?e, "failed to enqueue old version S3 key for deletion");
241 }
242
243 tracing::info!(
244 key = %req.s3_key,
245 size = file_size_bytes,
246 is_replace,
247 ?scan_status,
248 "version upload confirmed"
249 );
250
251 let pending_review = if scan_status == db::FileScanStatus::HeldForReview {
252 Some(true)
253 } else {
254 None
255 };
256 Ok(Json(ConfirmUploadResponse { success: true, pending_review }))
257 }
258