//! Project member and revenue split operations. use sqlx::PgPool; use super::id_types::*; use super::models::*; use super::ProjectRole; use crate::error::{AppError, Result}; // ── Project Members ── /// Add a member to a project with a revenue split percentage (0-100). /// /// The sum of all member splits (excluding the owner's implicit remainder) /// must not exceed 100. #[tracing::instrument(skip(pool))] pub async fn add_project_member( pool: &PgPool, project_id: ProjectId, user_id: UserId, role: ProjectRole, split_percent: i16, added_by: UserId, ) -> Result { let mut tx = pool.begin().await?; // Lock all existing members to prevent concurrent split modifications. // FOR UPDATE cannot be combined with aggregate functions, so we lock // the rows first, then compute the sum. // Reject negative and out-of-range splits before any DB write. The Run #6 // audit caught the missing lower bound — negative percentages flow through // `compute_splits` and record negative obligations. if !(0..=100).contains(&split_percent) { return Err(AppError::BadRequest(format!( "split_percent must be between 0 and 100 (got {split_percent})" ))); } let _locked: Vec<(i16,)> = sqlx::query_as( "SELECT split_percent FROM project_members WHERE project_id = $1 FOR UPDATE", ) .bind(project_id) .fetch_all(&mut *tx) .await?; // Subtract the existing row's split (if this is an upsert) before the cap // check; otherwise a legitimate update is rejected as "> 100%" whenever // current_split + new_split crosses 100 even if the post-update total wouldn't. let existing_split: (Option,) = sqlx::query_as( "SELECT split_percent FROM project_members WHERE project_id = $1 AND user_id = $2", ) .bind(project_id) .bind(user_id) .fetch_optional(&mut *tx) .await? 
.map(|r: (i16,)| (Some(r.0),)) .unwrap_or((None,)); let existing = existing_split.0.unwrap_or(0) as i64; let current_total: (Option,) = sqlx::query_as( "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1", ) .bind(project_id) .fetch_one(&mut *tx) .await?; let total = current_total.0.unwrap_or(0); let new_total = total - existing + split_percent as i64; if new_total > 100 { return Err(AppError::BadRequest(format!( "Total split would be {new_total}%, exceeding 100%" ))); } let member = sqlx::query_as::<_, DbProjectMember>( r#" INSERT INTO project_members (project_id, user_id, role, split_percent, added_by) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (project_id, user_id) DO UPDATE SET role = EXCLUDED.role, split_percent = EXCLUDED.split_percent RETURNING * "#, ) .bind(project_id) .bind(user_id) .bind(role) .bind(split_percent) .bind(added_by) .fetch_one(&mut *tx) .await?; tx.commit().await?; Ok(member) } /// Remove a member from a project. #[tracing::instrument(skip(pool))] pub async fn remove_project_member( pool: &PgPool, project_id: ProjectId, user_id: UserId, ) -> Result { let result = sqlx::query( "DELETE FROM project_members WHERE project_id = $1 AND user_id = $2", ) .bind(project_id) .bind(user_id) .execute(pool) .await?; Ok(result.rows_affected() > 0) } /// Get all members of a project with user info, ordered by split descending. #[tracing::instrument(skip(pool))] pub async fn get_project_members( pool: &PgPool, project_id: ProjectId, ) -> Result> { let members = sqlx::query_as::<_, DbProjectMemberWithUser>( r#" SELECT pm.id, pm.project_id, pm.user_id, pm.role, pm.split_percent, pm.added_at, u.username, u.display_name, u.stripe_account_id, u.stripe_charges_enabled FROM project_members pm JOIN users u ON u.id = pm.user_id WHERE pm.project_id = $1 ORDER BY pm.split_percent DESC "#, ) .bind(project_id) .fetch_all(pool) .await?; Ok(members) } /// Get the total split percentage allocated to members (excluding the owner). 
///
/// Returns 0 when the project has no member rows (`SUM` over zero rows is NULL).
#[tracing::instrument(skip(pool))]
pub async fn get_total_split_percent(pool: &PgPool, project_id: ProjectId) -> Result<i64> {
    let row: (Option<i64>,) = sqlx::query_as(
        "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1",
    )
    .bind(project_id)
    .fetch_one(pool)
    .await?;

    Ok(row.0.unwrap_or(0))
}

/// Update a member's split percentage.
///
/// The project's total split, with this member's current split replaced by
/// `new_split_percent`, must not exceed 100.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when `new_split_percent` is outside
/// `0..=100` or when the resulting total would exceed 100%. Database failures
/// are propagated through `?`.
#[allow(dead_code)]
#[tracing::instrument(skip(pool))]
pub async fn update_member_split(
    pool: &PgPool,
    project_id: ProjectId,
    user_id: UserId,
    new_split_percent: i16,
) -> Result<()> {
    if !(0..=100).contains(&new_split_percent) {
        return Err(AppError::BadRequest(format!(
            "split_percent must be between 0 and 100 (got {new_split_percent})"
        )));
    }

    let mut tx = pool.begin().await?;

    // Lock all members for this project to prevent concurrent split
    // modifications. PostgreSQL rejects FOR UPDATE on a query that uses an
    // aggregate, so the rows are locked with a plain SELECT here and the SUM
    // below runs without a locking clause.
    let _locked: Vec<(i16,)> = sqlx::query_as(
        "SELECT split_percent FROM project_members WHERE project_id = $1 FOR UPDATE",
    )
    .bind(project_id)
    .fetch_all(&mut *tx)
    .await?;

    // The member's current split is subtracted from the total before the cap
    // check so that only the post-update total is compared against 100.
    // NOTE(review): when no row matches (project_id, user_id) this is 0, the
    // UPDATE below touches no rows and the function still returns Ok(()) —
    // confirm whether callers expect an error for an unknown member.
    let current_member_split = sqlx::query_as::<_, (i16,)>(
        "SELECT split_percent FROM project_members WHERE project_id = $1 AND user_id = $2",
    )
    .bind(project_id)
    .bind(user_id)
    .fetch_optional(&mut *tx)
    .await?
    .map_or(0, |(percent,)| i64::from(percent));

    // SUM over zero rows is NULL, hence the Option.
    let (summed,): (Option<i64>,) = sqlx::query_as(
        "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1",
    )
    .bind(project_id)
    .fetch_one(&mut *tx)
    .await?;
    let total = summed.unwrap_or(0);

    let new_total = total - current_member_split + i64::from(new_split_percent);
    if new_total > 100 {
        // `tx` is dropped without commit on this path, so nothing is written.
        return Err(AppError::BadRequest(format!(
            "Total split would be {new_total}%, exceeding 100%"
        )));
    }

    sqlx::query(
        "UPDATE project_members SET split_percent = $3 WHERE project_id = $1 AND user_id = $2",
    )
    .bind(project_id)
    .bind(user_id)
    .bind(new_split_percent)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(())
}

// ── Revenue Splits ──

/// Record revenue splits for a completed tip.
#[tracing::instrument(skip(pool))] pub async fn create_tip_splits( pool: &PgPool, tip_id: TipId, splits: &[(UserId, i64, i16)], // (recipient_id, amount_cents, split_percent) ) -> Result<()> { if splits.is_empty() { return Ok(()); } let recipient_ids: Vec = splits.iter().map(|(id, _, _)| *id).collect(); let amounts: Vec = splits.iter().map(|(_, a, _)| *a as i32).collect(); let percents: Vec = splits.iter().map(|(_, _, p)| *p).collect(); sqlx::query( r#" INSERT INTO revenue_splits (tip_id, recipient_id, amount_cents, split_percent, status) SELECT $1, UNNEST($2::uuid[]), UNNEST($3::int[]), UNNEST($4::smallint[]), 'pending' "#, ) .bind(tip_id) .bind(&recipient_ids) .bind(&amounts) .bind(&percents) .execute(pool) .await?; Ok(()) } /// Record revenue splits for a completed transaction (item purchase). #[tracing::instrument(skip(pool))] pub async fn create_transaction_splits( pool: &PgPool, transaction_id: TransactionId, splits: &[(UserId, i64, i16)], // (recipient_id, amount_cents, split_percent) ) -> Result<()> { if splits.is_empty() { return Ok(()); } let recipient_ids: Vec = splits.iter().map(|(id, _, _)| *id).collect(); let amounts: Vec = splits.iter().map(|(_, a, _)| *a as i32).collect(); let percents: Vec = splits.iter().map(|(_, _, p)| *p).collect(); sqlx::query( r#" INSERT INTO revenue_splits (transaction_id, recipient_id, amount_cents, split_percent, status) SELECT $1, UNNEST($2::uuid[]), UNNEST($3::int[]), UNNEST($4::smallint[]), 'pending' "#, ) .bind(transaction_id) .bind(&recipient_ids) .bind(&amounts) .bind(&percents) .execute(pool) .await?; Ok(()) } /// Get all revenue splits for a recipient, most recent first. 
#[allow(dead_code)] #[tracing::instrument(skip(pool))] pub async fn get_splits_for_recipient( pool: &PgPool, recipient_id: UserId, limit: i64, offset: i64, ) -> Result> { let splits = sqlx::query_as::<_, DbRevenueSplit>( r#" SELECT * FROM revenue_splits WHERE recipient_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 "#, ) .bind(recipient_id) .bind(limit) .bind(offset) .fetch_all(pool) .await?; Ok(splits) } /// Total split revenue owed to a recipient (all completed splits). #[tracing::instrument(skip(pool))] pub async fn total_split_revenue(pool: &PgPool, recipient_id: UserId) -> Result { let row: (Option,) = sqlx::query_as( "SELECT SUM(amount_cents)::BIGINT FROM revenue_splits WHERE recipient_id = $1", ) .bind(recipient_id) .fetch_one(pool) .await?; Ok(row.0.unwrap_or(0)) } /// Count of split records for a recipient. #[tracing::instrument(skip(pool))] pub async fn count_splits_for_recipient(pool: &PgPool, recipient_id: UserId) -> Result { let row: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM revenue_splits WHERE recipient_id = $1", ) .bind(recipient_id) .fetch_one(pool) .await?; Ok(row.0) } /// Get all splits involving a user (as owner or recipient) for CSV export. 
/// Returns splits where the user is either: /// - The recipient (collaborator receiving a share), or /// - The seller/tip recipient (owner who owes collaborators) #[tracing::instrument(skip(pool))] pub async fn get_splits_for_export( pool: &PgPool, user_id: UserId, ) -> Result> { let rows = sqlx::query_as::<_, DbSplitExportRow>( r#" SELECT rs.id, rs.recipient_id, rs.amount_cents, rs.split_percent, rs.created_at, CASE WHEN rs.transaction_id IS NOT NULL THEN 'sale' ELSE 'tip' END AS source_type, u.username AS recipient_username FROM revenue_splits rs JOIN users u ON u.id = rs.recipient_id LEFT JOIN transactions t ON t.id = rs.transaction_id LEFT JOIN tips tip ON tip.id = rs.tip_id WHERE rs.recipient_id = $1 OR COALESCE(t.seller_id, tip.recipient_id) = $1 ORDER BY rs.created_at DESC "#, ) .bind(user_id) .fetch_all(pool) .await?; Ok(rows) } /// Total split obligations owed by a project owner (splits on their transactions/tips). #[tracing::instrument(skip(pool))] pub async fn total_split_obligations(pool: &PgPool, owner_id: UserId) -> Result { let row: (Option,) = sqlx::query_as( r#" SELECT SUM(rs.amount_cents)::BIGINT FROM revenue_splits rs LEFT JOIN transactions t ON t.id = rs.transaction_id LEFT JOIN tips tip ON tip.id = rs.tip_id WHERE COALESCE(t.seller_id, tip.recipient_id) = $1 "#, ) .bind(owner_id) .fetch_one(pool) .await?; Ok(row.0.unwrap_or(0)) }