Skip to main content

max / makenotwork

2.5 KB · 77 lines History Blame Raw
1 //! Tracking for presigned uploads that have not yet been confirmed.
2
3 use sqlx::PgPool;
4
5 use crate::db::UserId;
6 use crate::error::Result;
7
8 /// Record that a presigned upload URL was issued.
9 pub async fn record_pending_upload(
10 pool: &PgPool,
11 user_id: UserId,
12 s3_key: &str,
13 bucket: &str,
14 ) -> Result<()> {
15 // Re-presigning the same key (idempotent retry, multi-part flow) must refresh
16 // `created_at`, otherwise the stale-pending reaper can delete a freshly-pending
17 // object that's actively being uploaded right now.
18 //
19 // Pin the refresh to the original owner: if a different user collides on the
20 // same key, do NOT refresh — let the reaper age the original row out on its
21 // own schedule. Otherwise a re-presign loop by another principal could keep
22 // an orphan object alive indefinitely.
23 sqlx::query(
24 "INSERT INTO pending_uploads (user_id, s3_key, bucket) VALUES ($1, $2, $3)
25 ON CONFLICT (s3_key) DO UPDATE SET created_at = NOW()
26 WHERE pending_uploads.user_id = EXCLUDED.user_id",
27 )
28 .bind(user_id)
29 .bind(s3_key)
30 .bind(bucket)
31 .execute(pool)
32 .await?;
33 Ok(())
34 }
35
36 /// Remove the pending upload record after a successful confirm. Scoped to
37 /// `user_id` so a future caller that accepts a partially user-supplied key
38 /// can't delete another user's pending row — today's per-handler prefix
39 /// validation makes cross-user collision unreachable, but the function
40 /// signature shouldn't be broader than the invariant it protects.
41 pub async fn remove_pending_upload<'e>(
42 executor: impl sqlx::PgExecutor<'e>,
43 user_id: UserId,
44 s3_key: &str,
45 ) -> Result<()> {
46 sqlx::query("DELETE FROM pending_uploads WHERE s3_key = $1 AND user_id = $2")
47 .bind(s3_key)
48 .bind(user_id)
49 .execute(executor)
50 .await?;
51 Ok(())
52 }
53
54 /// Fetch presigned uploads older than `max_age` that were never confirmed.
55 pub async fn get_stale_pending_uploads(
56 pool: &PgPool,
57 max_age: chrono::Duration,
58 ) -> Result<Vec<(String, String)>> {
59 let cutoff = chrono::Utc::now() - max_age;
60 let rows: Vec<(String, String)> = sqlx::query_as(
61 "SELECT s3_key, bucket FROM pending_uploads WHERE created_at < $1",
62 )
63 .bind(cutoff)
64 .fetch_all(pool)
65 .await?;
66 Ok(rows)
67 }
68
69 /// Bulk-delete pending upload records by S3 key.
70 pub async fn delete_pending_uploads(pool: &PgPool, s3_keys: &[String]) -> Result<()> {
71 sqlx::query("DELETE FROM pending_uploads WHERE s3_key = ANY($1)")
72 .bind(s3_keys)
73 .execute(pool)
74 .await?;
75 Ok(())
76 }
77