//! Tracking for presigned uploads that have not yet been confirmed. use sqlx::PgPool; use crate::db::UserId; use crate::error::Result; /// Record that a presigned upload URL was issued. pub async fn record_pending_upload( pool: &PgPool, user_id: UserId, s3_key: &str, bucket: &str, ) -> Result<()> { // Re-presigning the same key (idempotent retry, multi-part flow) must refresh // `created_at`, otherwise the stale-pending reaper can delete a freshly-pending // object that's actively being uploaded right now. // // Pin the refresh to the original owner: if a different user collides on the // same key, do NOT refresh — let the reaper age the original row out on its // own schedule. Otherwise a re-presign loop by another principal could keep // an orphan object alive indefinitely. sqlx::query( "INSERT INTO pending_uploads (user_id, s3_key, bucket) VALUES ($1, $2, $3) ON CONFLICT (s3_key) DO UPDATE SET created_at = NOW() WHERE pending_uploads.user_id = EXCLUDED.user_id", ) .bind(user_id) .bind(s3_key) .bind(bucket) .execute(pool) .await?; Ok(()) } /// Remove the pending upload record after a successful confirm. Scoped to /// `user_id` so a future caller that accepts a partially user-supplied key /// can't delete another user's pending row — today's per-handler prefix /// validation makes cross-user collision unreachable, but the function /// signature shouldn't be broader than the invariant it protects. pub async fn remove_pending_upload<'e>( executor: impl sqlx::PgExecutor<'e>, user_id: UserId, s3_key: &str, ) -> Result<()> { sqlx::query("DELETE FROM pending_uploads WHERE s3_key = $1 AND user_id = $2") .bind(s3_key) .bind(user_id) .execute(executor) .await?; Ok(()) } /// Fetch presigned uploads older than `max_age` that were never confirmed. pub async fn get_stale_pending_uploads( pool: &PgPool, max_age: chrono::Duration, ) -> Result> { let cutoff = chrono::Utc::now() - max_age; let rows: Vec<(String, String)> = sqlx::query_as( "SELECT s3_key, bucket FROM pending_uploads WHERE created_at < $1", ) .bind(cutoff) .fetch_all(pool) .await?; Ok(rows) } /// Bulk-delete pending upload records by S3 key. pub async fn delete_pending_uploads(pool: &PgPool, s3_keys: &[String]) -> Result<()> { sqlx::query("DELETE FROM pending_uploads WHERE s3_key = ANY($1)") .bind(s3_keys) .execute(pool) .await?; Ok(()) }