//! Presigned upload and confirm handlers for item content. use axum::{ extract::State, response::IntoResponse, Json, }; use serde::Deserialize; use std::str::FromStr; use crate::{ auth::AuthUser, db::{self, ItemId}, error::{AppError, Result, ResultExt}, storage::{FileType, S3Client, CACHE_CONTROL_IMMUTABLE}, AppState, }; use super::{commit_upload, CommitTarget, ConfirmUploadResponse, PresignUploadResponse}; /// JSON input for requesting a presigned S3 upload URL. /// /// `file_size_bytes` is optional for compatibility with older clients that /// don't know the size ahead of time, but when supplied it is signed into /// the presigned URL's `Content-Length` and S3 will reject any PUT whose /// actual body length differs — protocol-level enforcement of the per-file /// cap, no bandwidth wasted on oversized uploads that the confirm step /// would have rejected anyway. #[derive(Debug, Deserialize)] pub struct PresignUploadRequest { pub item_id: ItemId, pub file_type: String, pub file_name: String, pub content_type: String, #[serde(default)] pub file_size_bytes: Option, } /// JSON input for confirming a completed S3 upload. #[derive(Debug, Deserialize)] pub struct ConfirmUploadRequest { pub item_id: ItemId, pub file_type: String, pub s3_key: String, } /// Generate a presigned URL for uploading a file to S3 /// /// POST /api/upload/presign /// /// Requires authentication. User must own the item. #[tracing::instrument(skip_all, name = "storage::presign_upload", fields(user_id = %user.id))] pub(super) async fn presign_upload( State(state): State, AuthUser(user): AuthUser, Json(req): Json, ) -> Result { user.check_not_suspended()?; // Check if S3 is configured let s3 = state.require_s3()?; // Parse file type let file_type = FileType::from_str(&req.file_type) .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?; // Validate content type S3Client::validate_content_type(file_type, &req.content_type)?; // Validate file extension S3Client::validate_extension(file_type, &req.file_name)?; // Verify user owns the item let owner = db::items::get_item_owner(&state.db, req.item_id) .await? .ok_or(AppError::NotFound)?; if owner != user.id { return Err(AppError::Forbidden); } // Early quota check (reject before generating presigned URL) db::creator_tiers::check_presign_allowed(&state.db, user.id, file_type).await?; // Get the effective max file size for client-side pre-validation let max_file_bytes = db::creator_tiers::get_effective_max_file_bytes(&state.db, user.id, file_type).await?; // If the client declared the file size, validate it before signing — // both against the static per-type cap and the tier-effective cap. We // bind it as Content-Length so S3 rejects oversized PUTs at the protocol // level. Clients omitting `file_size_bytes` fall back to the old behavior // (no protocol-level enforcement; confirm step still validates). if let Some(size) = req.file_size_bytes { if size <= 0 { return Err(AppError::BadRequest("file_size_bytes must be positive".to_string())); } if size as u64 > file_type.max_size() { let limit_mb = file_type.max_size() / (1024 * 1024); let file_mb = size as u64 / (1024 * 1024); return Err(AppError::FileTooLarge(format!( "File is {} MB but the maximum for {} files is {} MB.", file_mb, file_type.as_str(), limit_mb ))); } if let Some(tier_cap) = max_file_bytes && (size as u64) > tier_cap { let limit_mb = tier_cap / (1024 * 1024); return Err(AppError::FileTooLarge(format!( "File exceeds your tier's per-file limit of {} MB.", limit_mb ))); } } // Generate S3 key let s3_key = S3Client::generate_key(user.id, req.item_id, file_type, &req.file_name); // Track the pending upload so the reaper can clean it up if never confirmed db::pending_uploads::record_pending_upload(&state.db, user.id, &s3_key, "main").await?; // Generate presigned upload URL with immutable cache headers let expires_in = 3600; // 1 hour let upload_url = s3.presign_upload(&s3_key, &req.content_type, Some(expires_in), Some(CACHE_CONTROL_IMMUTABLE), req.file_size_bytes) .await .context("presign upload for item content")?; Ok(Json(PresignUploadResponse { upload_url, s3_key, expires_in, cache_control: Some(CACHE_CONTROL_IMMUTABLE.to_string()), max_file_bytes, })) } /// Confirm that an upload has completed and update the database /// /// POST /api/upload/confirm /// /// Requires authentication. User must own the item. #[tracing::instrument(skip_all, name = "storage::confirm_upload", fields(user_id = %user.id))] pub(super) async fn confirm_upload( State(state): State, AuthUser(user): AuthUser, Json(req): Json, ) -> Result { user.check_not_suspended()?; // Check if S3 is configured let s3 = state.require_s3()?; // Parse file type let file_type = FileType::from_str(&req.file_type) .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?; // Verify user owns the item let owner = db::items::get_item_owner(&state.db, req.item_id) .await? .ok_or(AppError::NotFound)?; if owner != user.id { return Err(AppError::Forbidden); } // Validate S3 key belongs to this user + item (prevent cross-user file reference) let expected_prefix = format!("{}/{}/", user.id, req.item_id); if !req.s3_key.starts_with(&expected_prefix) { return Err(AppError::BadRequest( "Invalid upload key".to_string(), )); } // Verify the object exists in S3 if !s3.object_exists(&req.s3_key).await? { return Err(AppError::BadRequest( "Upload not found. Please try uploading again.".to_string(), )); } // Enforce file size limit (static per-type limit) let file_size_bytes = s3.object_size(&req.s3_key).await?.ok_or_else(|| { AppError::BadRequest("Could not determine file size. Please try uploading again.".to_string()) })?; if file_size_bytes as u64 > file_type.max_size() { s3.delete_object(&req.s3_key).await.ok(); let limit_mb = file_type.max_size() / (1024 * 1024); let file_mb = file_size_bytes as u64 / (1024 * 1024); return Err(AppError::FileTooLarge(format!( "File is {} MB but the maximum for {} files is {} MB.", file_mb, file_type.as_str(), limit_mb ))); } // Enforce tier-based limits (per-file + storage cap) let max_storage = match db::creator_tiers::check_upload_allowed(&state.db, user.id, file_type, file_size_bytes).await { Ok(max) => max, Err(e) => { s3.delete_object(&req.s3_key).await.ok(); return Err(e); } }; // Resolve, from the single exhaustive declaration on `FileType`, how this // type is confirmed on an `items` row — and reject types that belong to a // dedicated route BEFORE any scan enqueue or scan_status flip. (A misrouted // but valid item_id would otherwise flip scan_status to Pending, block every // fan's download, and leak a scan_jobs row for an S3 key we're about to // delete.) Cover is rejected here now: it also needs `cover_image_url`, // which only /api/items/image/confirm writes — the old generic two-column // path left it NULL and rendered an invisible cover (Run #13 SERIOUS). let (s3_col, size_col) = match file_type.generic_item_confirm() { crate::storage::GenericItemConfirm::Columns { s3_key, size } => (s3_key, size), crate::storage::GenericItemConfirm::UseRoute(route) => { s3.delete_object(&req.s3_key).await.ok(); return Err(AppError::BadRequest(format!( "This file type isn't confirmed here — use {route}." ))); } }; // Idempotency + replace detection. We read the item ONCE here and reuse its // project_id for the cache bump below (previously a second get_item_by_id). let mut old_s3_key: Option = None; let mut replaced_old_size: i64 = 0; let mut item_project_id: Option = None; if let Some(item) = db::items::get_item_by_id(&state.db, req.item_id).await? { item_project_id = Some(item.project_id); let existing_key = match file_type { FileType::Audio => item.audio_s3_key.as_deref(), FileType::Cover => item.cover_s3_key.as_deref(), FileType::Video => item.video_s3_key.as_deref(), _ => None, }; if existing_key == Some(&req.s3_key) { // Idempotent re-confirm: the entity already references this s3_key. // Still clear the pending_uploads row — otherwise the orphan reaper // will fire 24h later and delete the live S3 object out from under // a perfectly happy DB row (Run #7 HIGH-1). if let Err(e) = db::pending_uploads::remove_pending_upload(&state.db, user.id, &req.s3_key).await { tracing::warn!(error = ?e, key = %req.s3_key, "remove_pending_upload failed on idempotent re-confirm"); } return Ok(Json(ConfirmUploadResponse { success: true, pending_review: None })); } if let Some(old_key) = existing_key { old_s3_key = Some(old_key.to_string()); replaced_old_size = match file_type { FileType::Audio => item.audio_file_size_bytes.unwrap_or(0), FileType::Cover => item.cover_file_size_bytes.unwrap_or(0), FileType::Video => item.video_file_size_bytes.unwrap_or(0), _ => 0, }; } } let is_replace = old_s3_key.is_some(); let update_sql = format!( "UPDATE items SET {} = $2, {} = $3, updated_at = NOW() WHERE id = $1 AND project_id IN (SELECT id FROM projects WHERE user_id = $4)", s3_col, size_col, ); // Storage credit + item UPDATE in ONE transaction. A rollback restores both, // so a mid-write failure can't leave the storage counter inflated against a // row that never got the key (the previous compensating-action path with // swallowed `.ok()` errors). `commit_upload` (scan enqueue + scan_status // flip) stays AFTER the commit — that ordering is the blessed path; see // `routes/storage/mod.rs::commit_upload`. `Ok(0)` signals the item vanished // between the owner check and the UPDATE (tx rolled back, nothing charged). let tx_result: Result = async { let mut tx = state.db.begin().await?; db::creator_tiers::try_apply_storage_on( &mut tx, user.id, is_replace.then_some(replaced_old_size), file_size_bytes, max_storage, ).await?; let res = sqlx::query(&update_sql) .bind(req.item_id) .bind(&req.s3_key) .bind(file_size_bytes) .bind(user.id) .execute(&mut *tx) .await?; if res.rows_affected() == 0 { // Leave the tx uncommitted — drop rolls it back, undoing the storage // change with no manual compensation. return Ok(0); } tx.commit().await?; Ok(res.rows_affected()) } .await; match tx_result { Err(e) => { // tx rolled back — storage counter unchanged. Just clean up S3. s3.delete_object(&req.s3_key).await.ok(); return Err(e); } Ok(0) => { // Item was deleted (or transferred out from under the ownership // filter) between our earlier `get_item_owner` check and this UPDATE. // The tx rolled back, so nothing was charged; route the now-unreferenced // object through the orphan queue so the reaper still cleans it. super::enqueue_s3_orphan(&state.db, &req.s3_key, "item_upload_target_missing").await; return Err(AppError::BadRequest( "Item was modified concurrently. Please try uploading again.".to_string(), )); } Ok(_) => {} } // Scan enqueue + scan_status flip happens AFTER the DB UPDATE commits via // the shared `commit_upload` helper, which is the only blessed path for // this ordering. See `routes/storage/mod.rs::commit_upload` for the bug // shapes this prevents. let scan_status = commit_upload( &state, CommitTarget::Item(req.item_id), &req.s3_key, file_type, user.id, file_size_bytes, ).await?; // Delete the old S3 object now that the replacement is committed. Route // through the orphan queue so a transient S3 failure here doesn't leak the // object forever — the worker retries until the delete succeeds (or 404s). if let Some(old_key) = &old_s3_key { super::enqueue_s3_orphan(&state.db, old_key, "item_upload_replace").await; } // Clear the pending upload record now that the upload is confirmed db::pending_uploads::remove_pending_upload(&state.db, user.id, &req.s3_key).await?; // Bump project cache generation so dashboard tabs reflect the new upload. // Reuses the project_id read during idempotency above (no second fetch). if let Some(project_id) = item_project_id && let Err(e) = db::projects::bump_cache_generation(&state.db, project_id).await { tracing::warn!(%project_id, error = ?e, "failed to bump cache generation after upload"); } tracing::info!( "Upload confirmed: item={}, type={:?}, key={}, size={}", req.item_id, file_type, req.s3_key, file_size_bytes ); let pending_review = if scan_status == db::FileScanStatus::HeldForReview { Some(true) } else { None }; Ok(Json(ConfirmUploadResponse { success: true, pending_review })) }