Skip to main content

max / makenotwork

11.3 KB · 340 lines History Blame Raw
1 //! Item version management: release creation, listing, and download tracking.
2
3 use sqlx::PgPool;
4
5 use super::models::*;
6 use super::{ItemId, UserId, VersionId};
7 use crate::error::Result;
8
9 /// Create a new version for an item, marking it as the current release.
10 ///
11 /// Wrapped in a transaction so the UPDATE (clearing the old `is_current`
12 /// flag) and INSERT (setting the new version as current) either both
13 /// succeed or both roll back.
14 #[tracing::instrument(skip_all)]
15 #[allow(clippy::too_many_arguments)]
16 pub async fn create_version(
17 pool: &PgPool,
18 item_id: ItemId,
19 version_number: &str,
20 changelog: Option<&str>,
21 file_url: Option<&str>,
22 file_size_bytes: Option<i64>,
23 file_name: Option<&str>,
24 label: Option<&str>,
25 ) -> Result<DbVersion> {
26 let mut tx = pool.begin().await?;
27
28 // Unset current on older version numbers (versions with the same number stay current)
29 sqlx::query("UPDATE versions SET is_current = false WHERE item_id = $1 AND version_number != $2")
30 .bind(item_id)
31 .bind(version_number)
32 .execute(&mut *tx)
33 .await?;
34
35 // Create new version as current
36 let version = sqlx::query_as::<_, DbVersion>(
37 r#"
38 INSERT INTO versions (item_id, version_number, changelog, file_url, file_size_bytes, file_name, is_current, label)
39 VALUES ($1, $2, $3, $4, $5, $6, true, $7)
40 RETURNING *
41 "#,
42 )
43 .bind(item_id)
44 .bind(version_number)
45 .bind(changelog)
46 .bind(file_url)
47 .bind(file_size_bytes)
48 .bind(file_name)
49 .bind(label)
50 .fetch_one(&mut *tx)
51 .await?;
52
53 tx.commit().await?;
54
55 Ok(version)
56 }
57
58 /// Hard cap on rows returned by the non-paginated version listing. Items
59 /// with this many versions are exceptional; if we ever hit the cap a warning
60 /// fires so we can promote the caller to cursor pagination. Real pagination
61 /// is deferred (Phase 6/8) — this constant just makes the truncation loud.
62 pub const VERSIONS_LIST_HARD_CAP: i64 = 5000;
63
64 /// List all versions for an item, newest first.
65 ///
66 /// Capped at `VERSIONS_LIST_HARD_CAP` as a safety limit. Hitting the cap is
67 /// logged at WARN so we notice before a real user gets silently truncated.
68 #[tracing::instrument(skip_all)]
69 pub async fn get_versions_by_item(pool: &PgPool, item_id: ItemId) -> Result<Vec<DbVersion>> {
70 let versions = sqlx::query_as::<_, DbVersion>(
71 "SELECT * FROM versions WHERE item_id = $1 ORDER BY created_at DESC LIMIT $2",
72 )
73 .bind(item_id)
74 .bind(VERSIONS_LIST_HARD_CAP)
75 .fetch_all(pool)
76 .await?;
77
78 if versions.len() as i64 == VERSIONS_LIST_HARD_CAP {
79 tracing::warn!(
80 %item_id, cap = VERSIONS_LIST_HARD_CAP,
81 "get_versions_by_item hit hard cap; promote caller to cursor pagination"
82 );
83 }
84
85 Ok(versions)
86 }
87
88 /// Batch-load versions for multiple items, grouped by item_id.
89 #[tracing::instrument(skip_all)]
90 pub async fn get_versions_by_items(
91 pool: &PgPool,
92 item_ids: &[ItemId],
93 ) -> Result<std::collections::HashMap<ItemId, Vec<DbVersion>>> {
94 let versions = sqlx::query_as::<_, DbVersion>(
95 "SELECT * FROM versions WHERE item_id = ANY($1) ORDER BY item_id, created_at DESC",
96 )
97 .bind(item_ids)
98 .fetch_all(pool)
99 .await?;
100
101 let mut map: std::collections::HashMap<ItemId, Vec<DbVersion>> = std::collections::HashMap::new();
102 for v in versions {
103 map.entry(v.item_id).or_default().push(v);
104 }
105 Ok(map)
106 }
107
108 /// Atomically increment the download counter for a version.
109 #[tracing::instrument(skip_all)]
110 pub async fn increment_download_count(pool: &PgPool, version_id: VersionId) -> Result<()> {
111 sqlx::query("UPDATE versions SET download_count = download_count + 1 WHERE id = $1")
112 .bind(version_id)
113 .execute(pool)
114 .await?;
115
116 Ok(())
117 }
118
119 /// Record that a user downloaded a specific version (idempotent).
120 #[tracing::instrument(skip_all)]
121 pub async fn record_user_download(
122 pool: &PgPool,
123 user_id: UserId,
124 item_id: ItemId,
125 version_id: VersionId,
126 ) -> Result<()> {
127 // Explicit conflict target — the table's PRIMARY KEY is
128 // (user_id, item_id, version_id), but `ON CONFLICT DO NOTHING` without
129 // a target would silently swallow conflicts on ANY future constraint
130 // we add (a unique index on downloaded_at, say). Naming the target
131 // means a new constraint surfaces as an error rather than a no-op.
132 sqlx::query(
133 r#"
134 INSERT INTO user_downloads (user_id, item_id, version_id)
135 VALUES ($1, $2, $3)
136 ON CONFLICT (user_id, item_id, version_id) DO NOTHING
137 "#,
138 )
139 .bind(user_id)
140 .bind(item_id)
141 .bind(version_id)
142 .execute(pool)
143 .await?;
144
145 Ok(())
146 }
147
148 /// Get the latest version ID a user has downloaded for an item, if any.
149 #[tracing::instrument(skip_all)]
150 pub async fn get_user_latest_download(
151 pool: &PgPool,
152 user_id: UserId,
153 item_id: ItemId,
154 ) -> Result<Option<VersionId>> {
155 let row: Option<(VersionId,)> = sqlx::query_as(
156 r#"
157 SELECT ud.version_id FROM user_downloads ud
158 JOIN versions v ON v.id = ud.version_id
159 WHERE ud.user_id = $1 AND ud.item_id = $2
160 ORDER BY v.created_at DESC
161 LIMIT 1
162 "#,
163 )
164 .bind(user_id)
165 .bind(item_id)
166 .fetch_optional(pool)
167 .await?;
168
169 Ok(row.map(|(id,)| id))
170 }
171
172 /// Fetch a version by primary key. Returns `None` if not found.
173 #[tracing::instrument(skip_all)]
174 pub async fn get_version_by_id(pool: &PgPool, version_id: VersionId) -> Result<Option<DbVersion>> {
175 let version = sqlx::query_as::<_, DbVersion>("SELECT * FROM versions WHERE id = $1")
176 .bind(version_id)
177 .fetch_optional(pool)
178 .await?;
179
180 Ok(version)
181 }
182
183 /// Collect all S3 keys for versions owned by a user.
184 ///
185 /// Returns s3_key, file_name, version_number, item title, and project slug.
186 /// Only includes versions that have an S3 key.
187 #[tracing::instrument(skip_all)]
188 pub async fn get_user_version_s3_keys(
189 pool: &PgPool,
190 user_id: super::UserId,
191 ) -> Result<Vec<VersionS3KeyRow>> {
192 let rows = sqlx::query_as::<_, VersionS3KeyRow>(
193 r#"
194 SELECT v.s3_key, v.file_name, v.version_number, i.title AS item_title,
195 p.id AS project_id, p.slug AS project_slug, v.file_size_bytes
196 FROM versions v
197 JOIN items i ON v.item_id = i.id
198 JOIN projects p ON i.project_id = p.id
199 WHERE p.user_id = $1 AND v.s3_key IS NOT NULL
200 ORDER BY p.slug, i.sort_order, v.created_at DESC
201 LIMIT $2
202 "#,
203 )
204 .bind(user_id)
205 .bind(VERSIONS_LIST_HARD_CAP)
206 .fetch_all(pool)
207 .await?;
208
209 if rows.len() as i64 == VERSIONS_LIST_HARD_CAP {
210 tracing::warn!(
211 %user_id, cap = VERSIONS_LIST_HARD_CAP,
212 "get_user_version_s3_keys (account export) hit hard cap; some files will be omitted from export"
213 );
214 }
215
216 Ok(rows)
217 }
218
219 /// Update a version's S3 key, file size, and file name in one query.
220 ///
221 /// `expected_old_s3_key` guards against a lost-update race: two concurrent
222 /// confirms can both pass the idempotency gate (reading the same prior
223 /// `version.s3_key`) and both succeed in incrementing storage; without this
224 /// guard, the second's UPDATE silently overwrites the first's, leaking S3
225 /// objects and double-charging storage.
226 ///
227 /// `Ok(None)` means the row exists but the `s3_key` no longer matches the
228 /// expected value — the caller is responsible for the rollback (refund the
229 /// storage increment, delete the new S3 object).
230 #[tracing::instrument(skip_all)]
231 pub async fn update_version_file<'e>(
232 executor: impl sqlx::PgExecutor<'e>,
233 version_id: VersionId,
234 expected_old_s3_key: Option<&str>,
235 s3_key: &str,
236 file_size_bytes: Option<i64>,
237 file_name: Option<&str>,
238 ) -> Result<Option<DbVersion>> {
239 let version = sqlx::query_as::<_, DbVersion>(
240 r#"
241 UPDATE versions
242 SET s3_key = $2, file_size_bytes = $3, file_name = $4
243 WHERE id = $1
244 AND s3_key IS NOT DISTINCT FROM $5
245 RETURNING *
246 "#,
247 )
248 .bind(version_id)
249 .bind(s3_key)
250 .bind(file_size_bytes)
251 .bind(file_name)
252 .bind(expected_old_s3_key)
253 .fetch_optional(executor)
254 .await?;
255
256 Ok(version)
257 }
258
259 /// Delete a version by ID, decrementing the owning user's storage counter
260 /// and enqueuing its S3 object for durable deletion in the same transaction.
261 ///
262 /// Both the storage refund and the S3-delete enqueue must succeed together
263 /// with the row delete — otherwise a future caller of this function could
264 /// forget either step and leak storage credit or orphan an S3 object.
265 /// `delete_version_row_only` exists for cases where the caller has already
266 /// handled both side effects (e.g. cascading item delete that batches them).
267 #[tracing::instrument(skip_all)]
268 pub async fn delete_version(pool: &PgPool, version_id: VersionId) -> Result<()> {
269 let Some(version) = get_version_by_id(pool, version_id).await? else {
270 return Ok(()); // already gone — idempotent
271 };
272
273 // Look up the owning user so we can refund storage.
274 let owner_id: Option<super::UserId> = sqlx::query_scalar(
275 r#"
276 SELECT p.user_id
277 FROM versions v
278 JOIN items i ON v.item_id = i.id
279 JOIN projects p ON i.project_id = p.id
280 WHERE v.id = $1
281 "#,
282 )
283 .bind(version_id)
284 .fetch_optional(pool)
285 .await?;
286
287 let mut tx = pool.begin().await?;
288
289 // Refund storage ONLY when this DELETE actually removed the row. The
290 // get_version_by_id / owner lookup above run outside the tx, so a concurrent
291 // double-delete (double-click / retry) can let both requests past them;
292 // gating the decrement on rows_affected stops the second from refunding a
293 // second time and under-counting storage in the creator's favor (Run #12 LOW,
294 // the delete-side mirror of the confirm handlers' rows-affected discipline).
295 let deleted = sqlx::query("DELETE FROM versions WHERE id = $1")
296 .bind(version_id)
297 .execute(&mut *tx)
298 .await?;
299
300 if deleted.rows_affected() > 0
301 && let Some(user_id) = owner_id
302 && let Some(size) = version.file_size_bytes
303 && size > 0
304 {
305 crate::db::creator_tiers::decrement_storage_used(&mut *tx, user_id, size).await?;
306 }
307
308 tx.commit().await?;
309
310 if let Some(s3_key) = version.s3_key.as_deref()
311 && let Err(e) = crate::db::pending_s3_deletions::enqueue_deletions(
312 pool,
313 &[(s3_key.to_string(), "main".to_string())],
314 "version_delete",
315 )
316 .await
317 {
318 tracing::warn!(error = ?e, version_id = %version_id, "failed to enqueue S3 deletion for version");
319 }
320
321 Ok(())
322 }
323
324 /// Sum all version file sizes for a given item (for storage decrement on item delete).
325 #[tracing::instrument(skip_all)]
326 pub async fn sum_file_sizes_for_item(pool: &PgPool, item_id: super::ItemId) -> Result<i64> {
327 // SUM over many bigints widens to NUMERIC in Postgres; clamp on both
328 // sides (>=0 and <=i64::MAX) before casting back to BIGINT — without
329 // GREATEST(0, ...), a corrupt-negative row could propagate a negative
330 // total that later under-flows storage accounting.
331 let total: i64 = sqlx::query_scalar(
332 "SELECT COALESCE(GREATEST(0, LEAST(SUM(file_size_bytes), 9223372036854775807))::BIGINT, 0) FROM versions WHERE item_id = $1 AND file_size_bytes IS NOT NULL",
333 )
334 .bind(item_id)
335 .fetch_one(pool)
336 .await?;
337
338 Ok(total)
339 }
340