//! Storage API routes for S3 file uploads and streaming mod downloads; mod gallery; mod images; pub(crate) mod media; mod uploads; mod versions; use axum::routing::get; use serde::Serialize; use tower_governor::GovernorLayer; use uuid::Uuid; use crate::{ constants, csrf::{delete_csrf, post_csrf, CsrfRouter}, db, db::scan_jobs::ScanTargetKind, error::Result, storage::FileType, AppState, }; /// Enqueue an orphaned S3 key for the pending-deletion worker. /// /// Use this when an upload has already crossed the durability boundary — /// storage credit committed, DB row inserted, or an old object queued for /// replacement — and a downstream step then failed. The queue worker retries /// until the delete succeeds (or 404s), so a transient S3 failure doesn't /// leak the object permanently. /// /// Pre-credit / pre-DB rejection paths (size cap, type-mismatch, tier check /// fail) may still use `s3.delete_object(...).await.ok()` directly: nothing /// in the DB references the key yet, so a swallowed error is at worst a 24h /// orphan that the pending_uploads reaper will collect on its own schedule. pub(crate) async fn enqueue_s3_orphan(pool: &sqlx::PgPool, s3_key: &str, source: &'static str) { if let Err(e) = db::pending_s3_deletions::enqueue_deletions( pool, &[(s3_key.to_string(), "main".to_string())], source, ) .await { tracing::warn!(error = ?e, key = %s3_key, source = %source, "failed to enqueue orphan S3 key"); } } /// Register S3 upload and streaming routes. /// /// Upload routes (presign + confirm) are rate limited per IP (see `constants::UPLOAD_RATE_LIMIT_*`). /// Stream/download endpoints are unlimited (presigned URLs already expire in 1 hour). pub fn storage_routes() -> CsrfRouter { let upload_rate_limit = crate::helpers::rate_limiter_ms(constants::UPLOAD_RATE_LIMIT_MS, constants::UPLOAD_RATE_LIMIT_BURST); let upload_routes = CsrfRouter::new() .route("/api/upload/presign", post_csrf(uploads::presign_upload)) .route("/api/upload/confirm", post_csrf(uploads::confirm_upload)) .route("/api/versions/{version_id}/upload/presign", post_csrf(versions::version_presign_upload)) .route("/api/versions/{version_id}/upload/confirm", post_csrf(versions::version_confirm_upload)) .route("/api/projects/image/presign", post_csrf(images::project_image_presign)) .route("/api/projects/image/confirm", post_csrf(images::project_image_confirm)) .route("/api/items/image/presign", post_csrf(images::item_image_presign)) .route("/api/items/image/confirm", post_csrf(images::item_image_confirm)) .route("/api/gallery/presign", post_csrf(gallery::gallery_presign)) .route("/api/gallery/confirm", post_csrf(gallery::gallery_confirm)) .route("/api/gallery/reorder", post_csrf(gallery::gallery_reorder)) .route_get("/api/gallery/list/{target_type}/{target_id}", get(gallery::gallery_list)) .route("/api/gallery/image/{target_type}/{image_id}", delete_csrf(gallery::gallery_delete)) .route("/api/media/presign", post_csrf(media::media_presign)) .route("/api/media/confirm", post_csrf(media::media_confirm)) .route_get("/api/media", get(media::media_list)) .route_get("/api/media/folders", get(media::media_folders)) .route("/api/media/{id}", delete_csrf(media::media_delete)) .route_layer(GovernorLayer { config: upload_rate_limit, }); let stream_rate_limit = crate::helpers::rate_limiter_ms(constants::STREAM_RATE_LIMIT_MS, constants::STREAM_RATE_LIMIT_BURST); let stream_routes = CsrfRouter::new() .route_get("/api/stream/{item_id}", get(downloads::stream_url)) .route_get("/api/versions/{version_id}/download", get(downloads::version_download)) .route_layer(GovernorLayer { config: stream_rate_limit, }); upload_routes.merge(stream_routes) } // ============================================================================= // Shared Request/Response Types // ============================================================================= /// JSON response containing the presigned upload URL and S3 key. #[derive(Debug, Serialize)] pub struct PresignUploadResponse { pub upload_url: String, pub s3_key: String, pub expires_in: u64, /// Cache-Control header the client must send with the S3 PUT (part of the presigned signature). #[serde(skip_serializing_if = "Option::is_none")] pub cache_control: Option, /// Maximum file size in bytes for this upload (for client-side pre-validation). #[serde(skip_serializing_if = "Option::is_none")] pub max_file_bytes: Option, } /// JSON response confirming a successful upload. #[derive(Debug, Serialize)] pub struct ConfirmUploadResponse { pub success: bool, /// When true, the file was uploaded but is pending manual review before /// it becomes available to fans. The creator should see a "pending review" /// indicator instead of assuming the file is live. #[serde(skip_serializing_if = "Option::is_none")] pub pending_review: Option, } // ============================================================================= // Helpers // ============================================================================= /// Discriminates which entity an upload commit applies to, and carries the /// per-target ID and the corresponding scan_status update. /// /// Construct one of these in your handler AFTER the entity's DB write has /// committed, then pass it to [`commit_upload`]. Order matters — see /// [`commit_upload`] docs. pub(crate) enum CommitTarget { /// An item (Audio/Cover/Video s3_key column on `items`). Item(db::ItemId), /// A version (`versions` table). Version(db::VersionId), /// A media library file (`media_files` table). Media(db::MediaFileId), /// A project cover image (URL stored on `projects.cover_image_url`; /// project images do not carry a per-row scan_status column — the worker /// only logs and creates a WAM ticket on quarantine). ProjectImage(db::ProjectId), /// An item image (`items.cover_s3_key`); shares the item's scan_status. ItemImage(db::ItemId), /// A gallery image row (`item_images`/`project_images`); like ProjectImage, /// no per-row scan_status column — the worker logs + tickets on quarantine. /// Carries the gallery row's own id (for log correlation only; unused by the /// worker, which acts on the s3_key). GalleryImage(Uuid), /// A content insertion clip; no per-row scan_status column. ContentInsertion(db::ContentInsertionId), } impl CommitTarget { fn kind(&self) -> ScanTargetKind { match self { CommitTarget::Item(_) => ScanTargetKind::Item, CommitTarget::ItemImage(_) => ScanTargetKind::ItemImage, CommitTarget::Version(_) => ScanTargetKind::Version, CommitTarget::Media(_) => ScanTargetKind::Media, CommitTarget::ProjectImage(_) => ScanTargetKind::ProjectImage, CommitTarget::GalleryImage(_) => ScanTargetKind::GalleryImage, CommitTarget::ContentInsertion(_) => ScanTargetKind::ContentInsertion, } } fn target_uuid(&self) -> Uuid { match self { CommitTarget::Item(id) | CommitTarget::ItemImage(id) => (*id).into(), CommitTarget::Version(id) => (*id).into(), CommitTarget::Media(id) => (*id).into(), CommitTarget::ProjectImage(id) => (*id).into(), CommitTarget::GalleryImage(id) => *id, CommitTarget::ContentInsertion(id) => (*id).into(), } } } /// Enqueue a scan job and write the resulting status onto the target entity. /// /// **Call this AFTER the DB write that commits the upload has succeeded.** /// Calling it earlier produces three known bug shapes — chronic across four /// audit runs — which is why the lower-level pieces (`enqueue_scan_for`, /// `update_*_scan_status`) are gated behind this single entry point: /// /// 1. A handler that early-returns (idempotent re-confirm, route mismatch, /// quota rejection) leaks a `scan_jobs` row and flips a Clean status back /// to Pending, blocking every fan's download until a rescan. /// 2. A failed DB write leaves a dangling scan_jobs row pointing at an S3 /// key that's about to be deleted. /// 3. The worker can race the still-uncommitted entity row. /// /// Use [`CommitTarget`] to bind the target id + per-target status updater. /// The function returns the `FileScanStatus` that was written (callers use /// this to populate the `pending_review` field of `ConfirmUploadResponse`). #[tracing::instrument( skip_all, name = "storage::commit_upload", fields(kind = ?target.kind(), target_id = %target.target_uuid(), %user_id, file_size_bytes, scan_status = tracing::field::Empty) )] pub(crate) async fn commit_upload( state: &AppState, target: CommitTarget, s3_key: &str, file_type: FileType, user_id: db::UserId, file_size_bytes: i64, ) -> Result { let scan_status = enqueue_scan_for( state, target.kind(), target.target_uuid(), s3_key, file_type, user_id, file_size_bytes, ) .await?; match target { CommitTarget::Item(id) | CommitTarget::ItemImage(id) => { db::scanning::update_item_scan_status(&state.db, id, scan_status).await?; } CommitTarget::Version(id) => { db::scanning::update_version_scan_status(&state.db, id, scan_status).await?; } CommitTarget::Media(id) => { db::scanning::update_media_file_scan_status(&state.db, id, scan_status).await?; } CommitTarget::ProjectImage(_) | CommitTarget::GalleryImage(_) | CommitTarget::ContentInsertion(_) => { // No per-row scan_status column; worker logs + creates WAM ticket. } } tracing::Span::current().record("scan_status", tracing::field::debug(&scan_status)); Ok(scan_status) } /// Admin-rescan entry point. The entity already exists; we just need to /// re-run the scan pipeline against its existing `s3_key`. Enqueues the /// scan job then flips the per-row `scan_status` to Pending in the same /// order `commit_upload` uses for first-scan, so admin handlers can't /// invert it (the chronic disease the seal was built to prevent). #[tracing::instrument( skip_all, name = "storage::commit_rescan", fields(kind = ?target.kind(), target_id = %target.target_uuid(), %user_id, file_size_bytes) )] pub(crate) async fn commit_rescan( state: &AppState, target: CommitTarget, s3_key: &str, file_type: FileType, user_id: db::UserId, file_size_bytes: i64, ) -> Result { enqueue_scan_for( state, target.kind(), target.target_uuid(), s3_key, file_type, user_id, file_size_bytes, ) .await?; let pending = db::FileScanStatus::Pending; match target { CommitTarget::Item(id) | CommitTarget::ItemImage(id) => { db::scanning::update_item_scan_status(&state.db, id, pending).await?; } CommitTarget::Version(id) => { db::scanning::update_version_scan_status(&state.db, id, pending).await?; } CommitTarget::Media(id) => { db::scanning::update_media_file_scan_status(&state.db, id, pending).await?; } CommitTarget::ProjectImage(_) | CommitTarget::GalleryImage(_) | CommitTarget::ContentInsertion(_) => { // No per-row scan_status column. } } Ok(pending) } /// Enqueue an async scan job for an uploaded file and return the initial /// `scan_status` to write onto the target entity. /// /// **Storage handlers should not call this directly** — use [`commit_upload`] /// so the ordering invariant (scan-after-DB-commit) cannot be inverted by a /// future sibling handler. This function remains `pub(super)`-equivalent for /// the `commit_upload` implementation and for the worker / admin tooling /// that legitimately needs the lower-level op. #[tracing::instrument( skip_all, name = "storage::enqueue_scan_for", fields(kind = ?target_kind, %target_id, %user_id, file_size_bytes) )] async fn enqueue_scan_for( state: &AppState, target_kind: ScanTargetKind, target_id: Uuid, s3_key: &str, file_type: FileType, user_id: db::UserId, file_size_bytes: i64, ) -> Result { if state.scanner.is_none() { let is_trusted = db::users::is_upload_trusted(&state.db, user_id).await?; let status = if is_trusted { db::FileScanStatus::Clean } else { db::FileScanStatus::HeldForReview }; tracing::info!(scanner = "disabled", is_trusted, ?status, "scanner unavailable; status assigned without enqueue"); return Ok(status); } db::scan_jobs::enqueue( &state.db, target_kind, target_id, s3_key, file_type, user_id, file_size_bytes, ) .await?; Ok(db::FileScanStatus::Pending) }