Skip to main content

max / makenotwork

13.0 KB · 331 lines History Blame Raw
1 //! Storage API routes for S3 file uploads and streaming
2
3 mod downloads;
4 mod gallery;
5 mod images;
6 pub(crate) mod media;
7 mod uploads;
8 mod versions;
9
10 use axum::routing::get;
11 use serde::Serialize;
12 use tower_governor::GovernorLayer;
13 use uuid::Uuid;
14
15 use crate::{
16 constants,
17 csrf::{delete_csrf, post_csrf, CsrfRouter},
18 db,
19 db::scan_jobs::ScanTargetKind,
20 error::Result,
21 storage::FileType,
22 AppState,
23 };
24
25 /// Enqueue an orphaned S3 key for the pending-deletion worker.
26 ///
27 /// Use this when an upload has already crossed the durability boundary —
28 /// storage credit committed, DB row inserted, or an old object queued for
29 /// replacement — and a downstream step then failed. The queue worker retries
30 /// until the delete succeeds (or 404s), so a transient S3 failure doesn't
31 /// leak the object permanently.
32 ///
33 /// Pre-credit / pre-DB rejection paths (size cap, type-mismatch, tier check
34 /// fail) may still use `s3.delete_object(...).await.ok()` directly: nothing
35 /// in the DB references the key yet, so a swallowed error is at worst a 24h
36 /// orphan that the pending_uploads reaper will collect on its own schedule.
37 pub(crate) async fn enqueue_s3_orphan(pool: &sqlx::PgPool, s3_key: &str, source: &'static str) {
38 if let Err(e) = db::pending_s3_deletions::enqueue_deletions(
39 pool,
40 &[(s3_key.to_string(), "main".to_string())],
41 source,
42 )
43 .await
44 {
45 tracing::warn!(error = ?e, key = %s3_key, source = %source, "failed to enqueue orphan S3 key");
46 }
47 }
48
49 /// Register S3 upload and streaming routes.
50 ///
51 /// Upload routes (presign + confirm) are rate limited per IP (see `constants::UPLOAD_RATE_LIMIT_*`).
52 /// Stream/download endpoints are unlimited (presigned URLs already expire in 1 hour).
53 pub fn storage_routes() -> CsrfRouter<AppState> {
54 let upload_rate_limit = crate::helpers::rate_limiter_ms(constants::UPLOAD_RATE_LIMIT_MS, constants::UPLOAD_RATE_LIMIT_BURST);
55
56 let upload_routes = CsrfRouter::new()
57 .route("/api/upload/presign", post_csrf(uploads::presign_upload))
58 .route("/api/upload/confirm", post_csrf(uploads::confirm_upload))
59 .route("/api/versions/{version_id}/upload/presign", post_csrf(versions::version_presign_upload))
60 .route("/api/versions/{version_id}/upload/confirm", post_csrf(versions::version_confirm_upload))
61 .route("/api/projects/image/presign", post_csrf(images::project_image_presign))
62 .route("/api/projects/image/confirm", post_csrf(images::project_image_confirm))
63 .route("/api/items/image/presign", post_csrf(images::item_image_presign))
64 .route("/api/items/image/confirm", post_csrf(images::item_image_confirm))
65 .route("/api/gallery/presign", post_csrf(gallery::gallery_presign))
66 .route("/api/gallery/confirm", post_csrf(gallery::gallery_confirm))
67 .route("/api/gallery/reorder", post_csrf(gallery::gallery_reorder))
68 .route_get("/api/gallery/list/{target_type}/{target_id}", get(gallery::gallery_list))
69 .route("/api/gallery/image/{target_type}/{image_id}", delete_csrf(gallery::gallery_delete))
70 .route("/api/media/presign", post_csrf(media::media_presign))
71 .route("/api/media/confirm", post_csrf(media::media_confirm))
72 .route_get("/api/media", get(media::media_list))
73 .route_get("/api/media/folders", get(media::media_folders))
74 .route("/api/media/{id}", delete_csrf(media::media_delete))
75 .route_layer(GovernorLayer {
76 config: upload_rate_limit,
77 });
78
79 let stream_rate_limit = crate::helpers::rate_limiter_ms(constants::STREAM_RATE_LIMIT_MS, constants::STREAM_RATE_LIMIT_BURST);
80
81 let stream_routes = CsrfRouter::new()
82 .route_get("/api/stream/{item_id}", get(downloads::stream_url))
83 .route_get("/api/versions/{version_id}/download", get(downloads::version_download))
84 .route_layer(GovernorLayer {
85 config: stream_rate_limit,
86 });
87
88 upload_routes.merge(stream_routes)
89 }
90
91 // =============================================================================
92 // Shared Request/Response Types
93 // =============================================================================
94
95 /// JSON response containing the presigned upload URL and S3 key.
96 #[derive(Debug, Serialize)]
97 pub struct PresignUploadResponse {
98 pub upload_url: String,
99 pub s3_key: String,
100 pub expires_in: u64,
101 /// Cache-Control header the client must send with the S3 PUT (part of the presigned signature).
102 #[serde(skip_serializing_if = "Option::is_none")]
103 pub cache_control: Option<String>,
104 /// Maximum file size in bytes for this upload (for client-side pre-validation).
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub max_file_bytes: Option<u64>,
107 }
108
109 /// JSON response confirming a successful upload.
110 #[derive(Debug, Serialize)]
111 pub struct ConfirmUploadResponse {
112 pub success: bool,
113 /// When true, the file was uploaded but is pending manual review before
114 /// it becomes available to fans. The creator should see a "pending review"
115 /// indicator instead of assuming the file is live.
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub pending_review: Option<bool>,
118 }
119
120 // =============================================================================
121 // Helpers
122 // =============================================================================
123
124 /// Discriminates which entity an upload commit applies to, and carries the
125 /// per-target ID and the corresponding scan_status update.
126 ///
127 /// Construct one of these in your handler AFTER the entity's DB write has
128 /// committed, then pass it to [`commit_upload`]. Order matters — see
129 /// [`commit_upload`] docs.
130 pub(crate) enum CommitTarget {
131 /// An item (Audio/Cover/Video s3_key column on `items`).
132 Item(db::ItemId),
133 /// A version (`versions` table).
134 Version(db::VersionId),
135 /// A media library file (`media_files` table).
136 Media(db::MediaFileId),
137 /// A project cover image (URL stored on `projects.cover_image_url`;
138 /// project images do not carry a per-row scan_status column — the worker
139 /// only logs and creates a WAM ticket on quarantine).
140 ProjectImage(db::ProjectId),
141 /// An item image (`items.cover_s3_key`); shares the item's scan_status.
142 ItemImage(db::ItemId),
143 /// A gallery image row (`item_images`/`project_images`); like ProjectImage,
144 /// no per-row scan_status column — the worker logs + tickets on quarantine.
145 /// Carries the gallery row's own id (for log correlation only; unused by the
146 /// worker, which acts on the s3_key).
147 GalleryImage(Uuid),
148 /// A content insertion clip; no per-row scan_status column.
149 ContentInsertion(db::ContentInsertionId),
150 }
151
152 impl CommitTarget {
153 fn kind(&self) -> ScanTargetKind {
154 match self {
155 CommitTarget::Item(_) => ScanTargetKind::Item,
156 CommitTarget::ItemImage(_) => ScanTargetKind::ItemImage,
157 CommitTarget::Version(_) => ScanTargetKind::Version,
158 CommitTarget::Media(_) => ScanTargetKind::Media,
159 CommitTarget::ProjectImage(_) => ScanTargetKind::ProjectImage,
160 CommitTarget::GalleryImage(_) => ScanTargetKind::GalleryImage,
161 CommitTarget::ContentInsertion(_) => ScanTargetKind::ContentInsertion,
162 }
163 }
164
165 fn target_uuid(&self) -> Uuid {
166 match self {
167 CommitTarget::Item(id) | CommitTarget::ItemImage(id) => (*id).into(),
168 CommitTarget::Version(id) => (*id).into(),
169 CommitTarget::Media(id) => (*id).into(),
170 CommitTarget::ProjectImage(id) => (*id).into(),
171 CommitTarget::GalleryImage(id) => *id,
172 CommitTarget::ContentInsertion(id) => (*id).into(),
173 }
174 }
175 }
176
177 /// Enqueue a scan job and write the resulting status onto the target entity.
178 ///
179 /// **Call this AFTER the DB write that commits the upload has succeeded.**
180 /// Calling it earlier produces three known bug shapes — chronic across four
181 /// audit runs — which is why the lower-level pieces (`enqueue_scan_for`,
182 /// `update_*_scan_status`) are gated behind this single entry point:
183 ///
184 /// 1. A handler that early-returns (idempotent re-confirm, route mismatch,
185 /// quota rejection) leaks a `scan_jobs` row and flips a Clean status back
186 /// to Pending, blocking every fan's download until a rescan.
187 /// 2. A failed DB write leaves a dangling scan_jobs row pointing at an S3
188 /// key that's about to be deleted.
189 /// 3. The worker can race the still-uncommitted entity row.
190 ///
191 /// Use [`CommitTarget`] to bind the target id + per-target status updater.
192 /// The function returns the `FileScanStatus` that was written (callers use
193 /// this to populate the `pending_review` field of `ConfirmUploadResponse`).
194 #[tracing::instrument(
195 skip_all,
196 name = "storage::commit_upload",
197 fields(kind = ?target.kind(), target_id = %target.target_uuid(), %user_id, file_size_bytes, scan_status = tracing::field::Empty)
198 )]
199 pub(crate) async fn commit_upload(
200 state: &AppState,
201 target: CommitTarget,
202 s3_key: &str,
203 file_type: FileType,
204 user_id: db::UserId,
205 file_size_bytes: i64,
206 ) -> Result<db::FileScanStatus> {
207 let scan_status = enqueue_scan_for(
208 state,
209 target.kind(),
210 target.target_uuid(),
211 s3_key,
212 file_type,
213 user_id,
214 file_size_bytes,
215 )
216 .await?;
217 match target {
218 CommitTarget::Item(id) | CommitTarget::ItemImage(id) => {
219 db::scanning::update_item_scan_status(&state.db, id, scan_status).await?;
220 }
221 CommitTarget::Version(id) => {
222 db::scanning::update_version_scan_status(&state.db, id, scan_status).await?;
223 }
224 CommitTarget::Media(id) => {
225 db::scanning::update_media_file_scan_status(&state.db, id, scan_status).await?;
226 }
227 CommitTarget::ProjectImage(_)
228 | CommitTarget::GalleryImage(_)
229 | CommitTarget::ContentInsertion(_) => {
230 // No per-row scan_status column; worker logs + creates WAM ticket.
231 }
232 }
233 tracing::Span::current().record("scan_status", tracing::field::debug(&scan_status));
234 Ok(scan_status)
235 }
236
237 /// Admin-rescan entry point. The entity already exists; we just need to
238 /// re-run the scan pipeline against its existing `s3_key`. Enqueues the
239 /// scan job then flips the per-row `scan_status` to Pending in the same
240 /// order `commit_upload` uses for first-scan, so admin handlers can't
241 /// invert it (the chronic disease the seal was built to prevent).
242 #[tracing::instrument(
243 skip_all,
244 name = "storage::commit_rescan",
245 fields(kind = ?target.kind(), target_id = %target.target_uuid(), %user_id, file_size_bytes)
246 )]
247 pub(crate) async fn commit_rescan(
248 state: &AppState,
249 target: CommitTarget,
250 s3_key: &str,
251 file_type: FileType,
252 user_id: db::UserId,
253 file_size_bytes: i64,
254 ) -> Result<db::FileScanStatus> {
255 enqueue_scan_for(
256 state,
257 target.kind(),
258 target.target_uuid(),
259 s3_key,
260 file_type,
261 user_id,
262 file_size_bytes,
263 )
264 .await?;
265 let pending = db::FileScanStatus::Pending;
266 match target {
267 CommitTarget::Item(id) | CommitTarget::ItemImage(id) => {
268 db::scanning::update_item_scan_status(&state.db, id, pending).await?;
269 }
270 CommitTarget::Version(id) => {
271 db::scanning::update_version_scan_status(&state.db, id, pending).await?;
272 }
273 CommitTarget::Media(id) => {
274 db::scanning::update_media_file_scan_status(&state.db, id, pending).await?;
275 }
276 CommitTarget::ProjectImage(_)
277 | CommitTarget::GalleryImage(_)
278 | CommitTarget::ContentInsertion(_) => {
279 // No per-row scan_status column.
280 }
281 }
282 Ok(pending)
283 }
284
285 /// Enqueue an async scan job for an uploaded file and return the initial
286 /// `scan_status` to write onto the target entity.
287 ///
288 /// **Storage handlers should not call this directly** — use [`commit_upload`]
289 /// so the ordering invariant (scan-after-DB-commit) cannot be inverted by a
290 /// future sibling handler. This function remains `pub(super)`-equivalent for
291 /// the `commit_upload` implementation and for the worker / admin tooling
292 /// that legitimately needs the lower-level op.
293 #[tracing::instrument(
294 skip_all,
295 name = "storage::enqueue_scan_for",
296 fields(kind = ?target_kind, %target_id, %user_id, file_size_bytes)
297 )]
298 async fn enqueue_scan_for(
299 state: &AppState,
300 target_kind: ScanTargetKind,
301 target_id: Uuid,
302 s3_key: &str,
303 file_type: FileType,
304 user_id: db::UserId,
305 file_size_bytes: i64,
306 ) -> Result<db::FileScanStatus> {
307 if state.scanner.is_none() {
308 let is_trusted = db::users::is_upload_trusted(&state.db, user_id).await?;
309 let status = if is_trusted {
310 db::FileScanStatus::Clean
311 } else {
312 db::FileScanStatus::HeldForReview
313 };
314 tracing::info!(scanner = "disabled", is_trusted, ?status, "scanner unavailable; status assigned without enqueue");
315 return Ok(status);
316 }
317
318 db::scan_jobs::enqueue(
319 &state.db,
320 target_kind,
321 target_id,
322 s3_key,
323 file_type,
324 user_id,
325 file_size_bytes,
326 )
327 .await?;
328
329 Ok(db::FileScanStatus::Pending)
330 }
331