//! Internal upload pipeline: presigned URL generation, upload confirmation, and storage usage. use axum::{ extract::{Query, State}, response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; use std::str::FromStr; use crate::{ auth::ServiceAuth, db::{self, ItemId, UserId}, error::{AppError, Result}, storage::{FileType, S3Client, CACHE_CONTROL_IMMUTABLE}, AppState, }; // ── Presign upload (for CLI upload pipeline) ── #[derive(Deserialize)] pub(super) struct InternalPresignRequest { user_id: UserId, item_id: ItemId, file_type: String, file_name: String, content_type: String, } #[derive(Serialize)] struct InternalPresignResponse { upload_url: String, s3_key: String, expires_in: u64, #[serde(skip_serializing_if = "Option::is_none")] cache_control: Option, } /// POST /api/internal/upload/presign /// /// Generate a presigned S3 upload URL. Used by the CLI upload pipeline. #[tracing::instrument(skip_all, name = "internal::presign_upload")] pub(super) async fn presign_upload( State(state): State, _auth: ServiceAuth, Json(req): Json, ) -> Result { let s3 = state.require_s3()?; let file_type = FileType::from_str(&req.file_type) .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?; S3Client::validate_content_type(file_type, &req.content_type)?; S3Client::validate_extension(file_type, &req.file_name)?; // Verify user owns the item let owner = db::items::get_item_owner(&state.db, req.item_id) .await? .ok_or(AppError::NotFound)?; if owner != req.user_id { return Err(AppError::Forbidden); } // Early quota check db::creator_tiers::check_presign_allowed(&state.db, req.user_id, file_type).await?; let s3_key = S3Client::generate_key(req.user_id, req.item_id, file_type, &req.file_name); // Track the pending upload so the reaper can clean it up if never confirmed db::pending_uploads::record_pending_upload(&state.db, req.user_id, &s3_key, "main").await?; let expires_in = 3600; let upload_url = s3 .presign_upload( &s3_key, &req.content_type, Some(expires_in), Some(CACHE_CONTROL_IMMUTABLE), None, ) .await?; Ok(Json(InternalPresignResponse { upload_url, s3_key, expires_in, cache_control: Some(CACHE_CONTROL_IMMUTABLE.to_string()), })) } // ── Confirm upload (for CLI upload pipeline) ── #[derive(Deserialize)] pub(super) struct InternalConfirmRequest { user_id: UserId, item_id: ItemId, file_type: String, s3_key: String, } #[derive(Serialize)] struct InternalConfirmResponse { success: bool, } /// POST /api/internal/upload/confirm /// /// Confirm a completed S3 upload: verify, scan, update DB. Used by the CLI upload pipeline. #[tracing::instrument(skip_all, name = "internal::confirm_upload")] pub(super) async fn confirm_upload( State(state): State, _auth: ServiceAuth, Json(req): Json, ) -> Result { let s3 = state.require_s3()?; let file_type = FileType::from_str(&req.file_type) .map_err(|_| AppError::BadRequest(format!("Invalid file type: {}", req.file_type)))?; // Verify user owns the item let owner = db::items::get_item_owner(&state.db, req.item_id) .await? .ok_or(AppError::NotFound)?; if owner != req.user_id { return Err(AppError::Forbidden); } // Validate S3 key belongs to this user + item (prevent cross-user file reference) let expected_prefix = format!("{}/{}/", req.user_id, req.item_id); if !req.s3_key.starts_with(&expected_prefix) { return Err(AppError::BadRequest("Invalid upload key".to_string())); } // Verify the object exists in S3 if !s3.object_exists(&req.s3_key).await? { return Err(AppError::BadRequest( "Upload not found. Please try uploading again.".to_string(), )); } // Enforce file size limit let file_size_bytes = s3.object_size(&req.s3_key).await?.ok_or_else(|| { AppError::BadRequest("Could not determine file size. Please try uploading again.".to_string()) })?; if file_size_bytes as u64 > file_type.max_size() { s3.delete_object(&req.s3_key).await.ok(); return Err(AppError::BadRequest(format!( "File exceeds maximum size of {} MB", file_type.max_size() / (1024 * 1024) ))); } // Enforce tier-based limits let max_storage = match db::creator_tiers::check_upload_allowed( &state.db, req.user_id, file_type, file_size_bytes, ) .await { Ok(max) => max, Err(e) => { s3.delete_object(&req.s3_key).await.ok(); return Err(e); } }; // Reject unsupported file types BEFORE any side effect — same ordering rule // as the web upload handlers (see routes/storage/mod.rs::commit_upload). if !matches!(file_type, FileType::Audio | FileType::Download | FileType::Video) { s3.delete_object(&req.s3_key).await.ok(); return Err(AppError::BadRequest( "CLI upload only supports audio, video, and download file types".to_string(), )); } // Increment storage BEFORE writing the DB record (if quota exceeded, the // S3 object is cleaned up and no DB record is created). if let Err(e) = db::creator_tiers::try_increment_storage(&state.db, req.user_id, file_size_bytes, max_storage).await { s3.delete_object(&req.s3_key).await.ok(); return Err(e); } db::pending_uploads::remove_pending_upload(&state.db, req.user_id, &req.s3_key).await?; // Update the database with S3 key and file size. let file_name = req.s3_key.rsplit('/').next().map(|s| s.to_string()); let commit_target = match file_type { FileType::Audio => { db::items::update_item_audio_s3_key(&state.db, req.item_id, req.user_id, &req.s3_key).await?; db::items::update_item_audio_file_size(&state.db, req.item_id, req.user_id, file_size_bytes) .await?; crate::routes::storage::CommitTarget::Item(req.item_id) } FileType::Download => { let version = db::versions::create_version( &state.db, req.item_id, "1.0", None, Some(&req.s3_key), Some(file_size_bytes), file_name.as_deref(), None, ) .await?; crate::routes::storage::CommitTarget::Version(version.id) } FileType::Video => { db::items::update_item_video_s3_key(&state.db, req.item_id, req.user_id, &req.s3_key).await?; db::items::update_item_video_file_size(&state.db, req.item_id, req.user_id, file_size_bytes) .await?; crate::routes::storage::CommitTarget::Item(req.item_id) } _ => unreachable!("guarded above"), }; // Scan enqueue + scan_status flip AFTER the DB writes commit — chronic // ordering invariant enforced via the shared commit_upload helper. let _status = crate::routes::storage::commit_upload( &state, commit_target, &req.s3_key, file_type, req.user_id, file_size_bytes, ).await?; // Bump project cache if let Some(item) = db::items::get_item_by_id(&state.db, req.item_id).await? && let Err(e) = db::projects::bump_cache_generation(&state.db, item.project_id).await { tracing::warn!(project_id = %item.project_id, error = ?e, "failed to bump cache generation after upload"); } tracing::info!( user = %req.user_id, item = %req.item_id, file_type = ?file_type, s3_key = %req.s3_key, size = file_size_bytes, "CLI upload confirmed" ); Ok(Json(InternalConfirmResponse { success: true })) } // ── Storage info ── #[derive(Deserialize)] pub(super) struct UserIdQuery { user_id: UserId, } #[derive(Serialize)] struct StorageInfoResponse { storage_used_bytes: i64, max_storage_bytes: i64, allows_file_uploads: bool, } /// GET /api/internal/creator/storage?user_id={uuid} /// /// Get storage usage and limits for a creator. #[tracing::instrument(skip_all, name = "internal::creator_storage")] pub(super) async fn creator_storage( State(state): State, _auth: ServiceAuth, Query(query): Query, ) -> Result { let used = db::creator_tiers::get_storage_used(&state.db, query.user_id).await?; // Resolve effective tier let tier = db::creator_tiers::get_active_creator_tier(&state.db, query.user_id).await?; let (max_storage, allows_uploads) = match tier { Some(t) => (t.max_storage_bytes(), t.allows_file_uploads()), None => (0, false), }; Ok(Json(StorageInfoResponse { storage_used_bytes: used, max_storage_bytes: max_storage, allows_file_uploads: allows_uploads, })) }