//! Item version management: release creation, listing, and download tracking. use sqlx::PgPool; use super::models::*; use super::{ItemId, UserId, VersionId}; use crate::error::Result; /// Create a new version for an item, marking it as the current release. /// /// Wrapped in a transaction so the UPDATE (clearing the old `is_current` /// flag) and INSERT (setting the new version as current) either both /// succeed or both roll back. #[tracing::instrument(skip_all)] #[allow(clippy::too_many_arguments)] pub async fn create_version( pool: &PgPool, item_id: ItemId, version_number: &str, changelog: Option<&str>, file_url: Option<&str>, file_size_bytes: Option, file_name: Option<&str>, label: Option<&str>, ) -> Result { let mut tx = pool.begin().await?; // Unset current on older version numbers (versions with the same number stay current) sqlx::query("UPDATE versions SET is_current = false WHERE item_id = $1 AND version_number != $2") .bind(item_id) .bind(version_number) .execute(&mut *tx) .await?; // Create new version as current let version = sqlx::query_as::<_, DbVersion>( r#" INSERT INTO versions (item_id, version_number, changelog, file_url, file_size_bytes, file_name, is_current, label) VALUES ($1, $2, $3, $4, $5, $6, true, $7) RETURNING * "#, ) .bind(item_id) .bind(version_number) .bind(changelog) .bind(file_url) .bind(file_size_bytes) .bind(file_name) .bind(label) .fetch_one(&mut *tx) .await?; tx.commit().await?; Ok(version) } /// Hard cap on rows returned by the non-paginated version listing. Items /// with this many versions are exceptional; if we ever hit the cap a warning /// fires so we can promote the caller to cursor pagination. Real pagination /// is deferred (Phase 6/8) — this constant just makes the truncation loud. pub const VERSIONS_LIST_HARD_CAP: i64 = 5000; /// List all versions for an item, newest first. /// /// Capped at `VERSIONS_LIST_HARD_CAP` as a safety limit. Hitting the cap is /// logged at WARN so we notice before a real user gets silently truncated. #[tracing::instrument(skip_all)] pub async fn get_versions_by_item(pool: &PgPool, item_id: ItemId) -> Result> { let versions = sqlx::query_as::<_, DbVersion>( "SELECT * FROM versions WHERE item_id = $1 ORDER BY created_at DESC LIMIT $2", ) .bind(item_id) .bind(VERSIONS_LIST_HARD_CAP) .fetch_all(pool) .await?; if versions.len() as i64 == VERSIONS_LIST_HARD_CAP { tracing::warn!( %item_id, cap = VERSIONS_LIST_HARD_CAP, "get_versions_by_item hit hard cap; promote caller to cursor pagination" ); } Ok(versions) } /// Batch-load versions for multiple items, grouped by item_id. #[tracing::instrument(skip_all)] pub async fn get_versions_by_items( pool: &PgPool, item_ids: &[ItemId], ) -> Result>> { let versions = sqlx::query_as::<_, DbVersion>( "SELECT * FROM versions WHERE item_id = ANY($1) ORDER BY item_id, created_at DESC", ) .bind(item_ids) .fetch_all(pool) .await?; let mut map: std::collections::HashMap> = std::collections::HashMap::new(); for v in versions { map.entry(v.item_id).or_default().push(v); } Ok(map) } /// Atomically increment the download counter for a version. #[tracing::instrument(skip_all)] pub async fn increment_download_count(pool: &PgPool, version_id: VersionId) -> Result<()> { sqlx::query("UPDATE versions SET download_count = download_count + 1 WHERE id = $1") .bind(version_id) .execute(pool) .await?; Ok(()) } /// Record that a user downloaded a specific version (idempotent). #[tracing::instrument(skip_all)] pub async fn record_user_download( pool: &PgPool, user_id: UserId, item_id: ItemId, version_id: VersionId, ) -> Result<()> { // Explicit conflict target — the table's PRIMARY KEY is // (user_id, item_id, version_id), but `ON CONFLICT DO NOTHING` without // a target would silently swallow conflicts on ANY future constraint // we add (a unique index on downloaded_at, say). Naming the target // means a new constraint surfaces as an error rather than a no-op. sqlx::query( r#" INSERT INTO user_downloads (user_id, item_id, version_id) VALUES ($1, $2, $3) ON CONFLICT (user_id, item_id, version_id) DO NOTHING "#, ) .bind(user_id) .bind(item_id) .bind(version_id) .execute(pool) .await?; Ok(()) } /// Get the latest version ID a user has downloaded for an item, if any. #[tracing::instrument(skip_all)] pub async fn get_user_latest_download( pool: &PgPool, user_id: UserId, item_id: ItemId, ) -> Result> { let row: Option<(VersionId,)> = sqlx::query_as( r#" SELECT ud.version_id FROM user_downloads ud JOIN versions v ON v.id = ud.version_id WHERE ud.user_id = $1 AND ud.item_id = $2 ORDER BY v.created_at DESC LIMIT 1 "#, ) .bind(user_id) .bind(item_id) .fetch_optional(pool) .await?; Ok(row.map(|(id,)| id)) } /// Fetch a version by primary key. Returns `None` if not found. #[tracing::instrument(skip_all)] pub async fn get_version_by_id(pool: &PgPool, version_id: VersionId) -> Result> { let version = sqlx::query_as::<_, DbVersion>("SELECT * FROM versions WHERE id = $1") .bind(version_id) .fetch_optional(pool) .await?; Ok(version) } /// Collect all S3 keys for versions owned by a user. /// /// Returns s3_key, file_name, version_number, item title, and project slug. /// Only includes versions that have an S3 key. #[tracing::instrument(skip_all)] pub async fn get_user_version_s3_keys( pool: &PgPool, user_id: super::UserId, ) -> Result> { let rows = sqlx::query_as::<_, VersionS3KeyRow>( r#" SELECT v.s3_key, v.file_name, v.version_number, i.title AS item_title, p.id AS project_id, p.slug AS project_slug, v.file_size_bytes FROM versions v JOIN items i ON v.item_id = i.id JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1 AND v.s3_key IS NOT NULL ORDER BY p.slug, i.sort_order, v.created_at DESC LIMIT $2 "#, ) .bind(user_id) .bind(VERSIONS_LIST_HARD_CAP) .fetch_all(pool) .await?; if rows.len() as i64 == VERSIONS_LIST_HARD_CAP { tracing::warn!( %user_id, cap = VERSIONS_LIST_HARD_CAP, "get_user_version_s3_keys (account export) hit hard cap; some files will be omitted from export" ); } Ok(rows) } /// Update a version's S3 key, file size, and file name in one query. /// /// `expected_old_s3_key` guards against a lost-update race: two concurrent /// confirms can both pass the idempotency gate (reading the same prior /// `version.s3_key`) and both succeed in incrementing storage; without this /// guard, the second's UPDATE silently overwrites the first's, leaking S3 /// objects and double-charging storage. /// /// `Ok(None)` means the row exists but the `s3_key` no longer matches the /// expected value — the caller is responsible for the rollback (refund the /// storage increment, delete the new S3 object). #[tracing::instrument(skip_all)] pub async fn update_version_file<'e>( executor: impl sqlx::PgExecutor<'e>, version_id: VersionId, expected_old_s3_key: Option<&str>, s3_key: &str, file_size_bytes: Option, file_name: Option<&str>, ) -> Result> { let version = sqlx::query_as::<_, DbVersion>( r#" UPDATE versions SET s3_key = $2, file_size_bytes = $3, file_name = $4 WHERE id = $1 AND s3_key IS NOT DISTINCT FROM $5 RETURNING * "#, ) .bind(version_id) .bind(s3_key) .bind(file_size_bytes) .bind(file_name) .bind(expected_old_s3_key) .fetch_optional(executor) .await?; Ok(version) } /// Delete a version by ID, decrementing the owning user's storage counter /// and enqueuing its S3 object for durable deletion in the same transaction. /// /// Both the storage refund and the S3-delete enqueue must succeed together /// with the row delete — otherwise a future caller of this function could /// forget either step and leak storage credit or orphan an S3 object. /// `delete_version_row_only` exists for cases where the caller has already /// handled both side effects (e.g. cascading item delete that batches them). #[tracing::instrument(skip_all)] pub async fn delete_version(pool: &PgPool, version_id: VersionId) -> Result<()> { let Some(version) = get_version_by_id(pool, version_id).await? else { return Ok(()); // already gone — idempotent }; // Look up the owning user so we can refund storage. let owner_id: Option = sqlx::query_scalar( r#" SELECT p.user_id FROM versions v JOIN items i ON v.item_id = i.id JOIN projects p ON i.project_id = p.id WHERE v.id = $1 "#, ) .bind(version_id) .fetch_optional(pool) .await?; let mut tx = pool.begin().await?; // Refund storage ONLY when this DELETE actually removed the row. The // get_version_by_id / owner lookup above run outside the tx, so a concurrent // double-delete (double-click / retry) can let both requests past them; // gating the decrement on rows_affected stops the second from refunding a // second time and under-counting storage in the creator's favor (Run #12 LOW, // the delete-side mirror of the confirm handlers' rows-affected discipline). let deleted = sqlx::query("DELETE FROM versions WHERE id = $1") .bind(version_id) .execute(&mut *tx) .await?; if deleted.rows_affected() > 0 && let Some(user_id) = owner_id && let Some(size) = version.file_size_bytes && size > 0 { crate::db::creator_tiers::decrement_storage_used(&mut *tx, user_id, size).await?; } tx.commit().await?; if let Some(s3_key) = version.s3_key.as_deref() && let Err(e) = crate::db::pending_s3_deletions::enqueue_deletions( pool, &[(s3_key.to_string(), "main".to_string())], "version_delete", ) .await { tracing::warn!(error = ?e, version_id = %version_id, "failed to enqueue S3 deletion for version"); } Ok(()) } /// Sum all version file sizes for a given item (for storage decrement on item delete). #[tracing::instrument(skip_all)] pub async fn sum_file_sizes_for_item(pool: &PgPool, item_id: super::ItemId) -> Result { // SUM over many bigints widens to NUMERIC in Postgres; clamp on both // sides (>=0 and <=i64::MAX) before casting back to BIGINT — without // GREATEST(0, ...), a corrupt-negative row could propagate a negative // total that later under-flows storage accounting. let total: i64 = sqlx::query_scalar( "SELECT COALESCE(GREATEST(0, LEAST(SUM(file_size_bytes), 9223372036854775807))::BIGINT, 0) FROM versions WHERE item_id = $1 AND file_size_bytes IS NOT NULL", ) .bind(item_id) .fetch_one(pool) .await?; Ok(total) }