Skip to main content

max / makenotwork

11.1 KB · 366 lines History Blame Raw
1 //! Project member and revenue split operations.
2
3 use sqlx::PgPool;
4
5 use super::id_types::*;
6 use super::models::*;
7 use super::ProjectRole;
8 use crate::error::{AppError, Result};
9
10 // ── Project Members ──
11
12 /// Add a member to a project with a revenue split percentage (0-100).
13 ///
14 /// The sum of all member splits (excluding the owner's implicit remainder)
15 /// must not exceed 100.
16 #[tracing::instrument(skip(pool))]
17 pub async fn add_project_member(
18 pool: &PgPool,
19 project_id: ProjectId,
20 user_id: UserId,
21 role: ProjectRole,
22 split_percent: i16,
23 added_by: UserId,
24 ) -> Result<DbProjectMember> {
25 let mut tx = pool.begin().await?;
26
27 // Lock all existing members to prevent concurrent split modifications.
28 // FOR UPDATE cannot be combined with aggregate functions, so we lock
29 // the rows first, then compute the sum.
30 // Reject negative and out-of-range splits before any DB write. The Run #6
31 // audit caught the missing lower bound — negative percentages flow through
32 // `compute_splits` and record negative obligations.
33 if !(0..=100).contains(&split_percent) {
34 return Err(AppError::BadRequest(format!(
35 "split_percent must be between 0 and 100 (got {split_percent})"
36 )));
37 }
38
39 let _locked: Vec<(i16,)> = sqlx::query_as(
40 "SELECT split_percent FROM project_members WHERE project_id = $1 FOR UPDATE",
41 )
42 .bind(project_id)
43 .fetch_all(&mut *tx)
44 .await?;
45
46 // Subtract the existing row's split (if this is an upsert) before the cap
47 // check; otherwise a legitimate update is rejected as "> 100%" whenever
48 // current_split + new_split crosses 100 even if the post-update total wouldn't.
49 let existing_split: (Option<i16>,) = sqlx::query_as(
50 "SELECT split_percent FROM project_members WHERE project_id = $1 AND user_id = $2",
51 )
52 .bind(project_id)
53 .bind(user_id)
54 .fetch_optional(&mut *tx)
55 .await?
56 .map(|r: (i16,)| (Some(r.0),))
57 .unwrap_or((None,));
58 let existing = existing_split.0.unwrap_or(0) as i64;
59
60 let current_total: (Option<i64>,) = sqlx::query_as(
61 "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1",
62 )
63 .bind(project_id)
64 .fetch_one(&mut *tx)
65 .await?;
66
67 let total = current_total.0.unwrap_or(0);
68 let new_total = total - existing + split_percent as i64;
69 if new_total > 100 {
70 return Err(AppError::BadRequest(format!(
71 "Total split would be {new_total}%, exceeding 100%"
72 )));
73 }
74
75 let member = sqlx::query_as::<_, DbProjectMember>(
76 r#"
77 INSERT INTO project_members (project_id, user_id, role, split_percent, added_by)
78 VALUES ($1, $2, $3, $4, $5)
79 ON CONFLICT (project_id, user_id) DO UPDATE
80 SET role = EXCLUDED.role,
81 split_percent = EXCLUDED.split_percent
82 RETURNING *
83 "#,
84 )
85 .bind(project_id)
86 .bind(user_id)
87 .bind(role)
88 .bind(split_percent)
89 .bind(added_by)
90 .fetch_one(&mut *tx)
91 .await?;
92
93 tx.commit().await?;
94 Ok(member)
95 }
96
97 /// Remove a member from a project.
98 #[tracing::instrument(skip(pool))]
99 pub async fn remove_project_member(
100 pool: &PgPool,
101 project_id: ProjectId,
102 user_id: UserId,
103 ) -> Result<bool> {
104 let result = sqlx::query(
105 "DELETE FROM project_members WHERE project_id = $1 AND user_id = $2",
106 )
107 .bind(project_id)
108 .bind(user_id)
109 .execute(pool)
110 .await?;
111
112 Ok(result.rows_affected() > 0)
113 }
114
115 /// Get all members of a project with user info, ordered by split descending.
116 #[tracing::instrument(skip(pool))]
117 pub async fn get_project_members(
118 pool: &PgPool,
119 project_id: ProjectId,
120 ) -> Result<Vec<DbProjectMemberWithUser>> {
121 let members = sqlx::query_as::<_, DbProjectMemberWithUser>(
122 r#"
123 SELECT pm.id, pm.project_id, pm.user_id, pm.role, pm.split_percent, pm.added_at,
124 u.username, u.display_name, u.stripe_account_id, u.stripe_charges_enabled
125 FROM project_members pm
126 JOIN users u ON u.id = pm.user_id
127 WHERE pm.project_id = $1
128 ORDER BY pm.split_percent DESC
129 "#,
130 )
131 .bind(project_id)
132 .fetch_all(pool)
133 .await?;
134
135 Ok(members)
136 }
137
138 /// Get the total split percentage allocated to members (excluding the owner).
139 #[tracing::instrument(skip(pool))]
140 pub async fn get_total_split_percent(pool: &PgPool, project_id: ProjectId) -> Result<i64> {
141 let row: (Option<i64>,) = sqlx::query_as(
142 "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1",
143 )
144 .bind(project_id)
145 .fetch_one(pool)
146 .await?;
147
148 Ok(row.0.unwrap_or(0))
149 }
150
151 /// Update a member's split percentage.
152 #[allow(dead_code)]
153 #[tracing::instrument(skip(pool))]
154 pub async fn update_member_split(
155 pool: &PgPool,
156 project_id: ProjectId,
157 user_id: UserId,
158 new_split_percent: i16,
159 ) -> Result<()> {
160 if !(0..=100).contains(&new_split_percent) {
161 return Err(AppError::BadRequest(format!(
162 "split_percent must be between 0 and 100 (got {new_split_percent})"
163 )));
164 }
165 let mut tx = pool.begin().await?;
166
167 // Lock all members for this project to prevent concurrent split modifications
168 let current: Option<DbProjectMember> = sqlx::query_as(
169 "SELECT * FROM project_members WHERE project_id = $1 AND user_id = $2 FOR UPDATE",
170 )
171 .bind(project_id)
172 .bind(user_id)
173 .fetch_optional(&mut *tx)
174 .await?;
175
176 let current_member_split = current.map(|m| m.split_percent as i64).unwrap_or(0);
177
178 let total_row: (Option<i64>,) = sqlx::query_as(
179 r#"
180 SELECT SUM(split_percent)::BIGINT
181 FROM project_members
182 WHERE project_id = $1
183 FOR UPDATE
184 "#,
185 )
186 .bind(project_id)
187 .fetch_one(&mut *tx)
188 .await?;
189
190 let total = total_row.0.unwrap_or(0);
191 let new_total = total - current_member_split + new_split_percent as i64;
192 if new_total > 100 {
193 return Err(AppError::BadRequest(format!(
194 "Total split would be {}%, exceeding 100%",
195 new_total
196 )));
197 }
198
199 sqlx::query(
200 "UPDATE project_members SET split_percent = $3 WHERE project_id = $1 AND user_id = $2",
201 )
202 .bind(project_id)
203 .bind(user_id)
204 .bind(new_split_percent)
205 .execute(&mut *tx)
206 .await?;
207
208 tx.commit().await?;
209 Ok(())
210 }
211
212 // ── Revenue Splits ──
213
214 /// Record revenue splits for a completed tip.
215 #[tracing::instrument(skip(pool))]
216 pub async fn create_tip_splits(
217 pool: &PgPool,
218 tip_id: TipId,
219 splits: &[(UserId, i64, i16)], // (recipient_id, amount_cents, split_percent)
220 ) -> Result<()> {
221 if splits.is_empty() { return Ok(()); }
222 let recipient_ids: Vec<UserId> = splits.iter().map(|(id, _, _)| *id).collect();
223 let amounts: Vec<i32> = splits.iter().map(|(_, a, _)| *a as i32).collect();
224 let percents: Vec<i16> = splits.iter().map(|(_, _, p)| *p).collect();
225 sqlx::query(
226 r#"
227 INSERT INTO revenue_splits (tip_id, recipient_id, amount_cents, split_percent, status)
228 SELECT $1, UNNEST($2::uuid[]), UNNEST($3::int[]), UNNEST($4::smallint[]), 'pending'
229 "#,
230 )
231 .bind(tip_id)
232 .bind(&recipient_ids)
233 .bind(&amounts)
234 .bind(&percents)
235 .execute(pool)
236 .await?;
237 Ok(())
238 }
239
240 /// Record revenue splits for a completed transaction (item purchase).
241 #[tracing::instrument(skip(pool))]
242 pub async fn create_transaction_splits(
243 pool: &PgPool,
244 transaction_id: TransactionId,
245 splits: &[(UserId, i64, i16)], // (recipient_id, amount_cents, split_percent)
246 ) -> Result<()> {
247 if splits.is_empty() { return Ok(()); }
248 let recipient_ids: Vec<UserId> = splits.iter().map(|(id, _, _)| *id).collect();
249 let amounts: Vec<i32> = splits.iter().map(|(_, a, _)| *a as i32).collect();
250 let percents: Vec<i16> = splits.iter().map(|(_, _, p)| *p).collect();
251 sqlx::query(
252 r#"
253 INSERT INTO revenue_splits (transaction_id, recipient_id, amount_cents, split_percent, status)
254 SELECT $1, UNNEST($2::uuid[]), UNNEST($3::int[]), UNNEST($4::smallint[]), 'pending'
255 "#,
256 )
257 .bind(transaction_id)
258 .bind(&recipient_ids)
259 .bind(&amounts)
260 .bind(&percents)
261 .execute(pool)
262 .await?;
263 Ok(())
264 }
265
266 /// Get all revenue splits for a recipient, most recent first.
267 #[allow(dead_code)]
268 #[tracing::instrument(skip(pool))]
269 pub async fn get_splits_for_recipient(
270 pool: &PgPool,
271 recipient_id: UserId,
272 limit: i64,
273 offset: i64,
274 ) -> Result<Vec<DbRevenueSplit>> {
275 let splits = sqlx::query_as::<_, DbRevenueSplit>(
276 r#"
277 SELECT * FROM revenue_splits
278 WHERE recipient_id = $1
279 ORDER BY created_at DESC
280 LIMIT $2 OFFSET $3
281 "#,
282 )
283 .bind(recipient_id)
284 .bind(limit)
285 .bind(offset)
286 .fetch_all(pool)
287 .await?;
288
289 Ok(splits)
290 }
291
292 /// Total split revenue owed to a recipient (all completed splits).
293 #[tracing::instrument(skip(pool))]
294 pub async fn total_split_revenue(pool: &PgPool, recipient_id: UserId) -> Result<i64> {
295 let row: (Option<i64>,) = sqlx::query_as(
296 "SELECT SUM(amount_cents)::BIGINT FROM revenue_splits WHERE recipient_id = $1",
297 )
298 .bind(recipient_id)
299 .fetch_one(pool)
300 .await?;
301
302 Ok(row.0.unwrap_or(0))
303 }
304
305 /// Count of split records for a recipient.
306 #[tracing::instrument(skip(pool))]
307 pub async fn count_splits_for_recipient(pool: &PgPool, recipient_id: UserId) -> Result<i64> {
308 let row: (i64,) = sqlx::query_as(
309 "SELECT COUNT(*) FROM revenue_splits WHERE recipient_id = $1",
310 )
311 .bind(recipient_id)
312 .fetch_one(pool)
313 .await?;
314
315 Ok(row.0)
316 }
317
318 /// Get all splits involving a user (as owner or recipient) for CSV export.
319 /// Returns splits where the user is either:
320 /// - The recipient (collaborator receiving a share), or
321 /// - The seller/tip recipient (owner who owes collaborators)
322 #[tracing::instrument(skip(pool))]
323 pub async fn get_splits_for_export(
324 pool: &PgPool,
325 user_id: UserId,
326 ) -> Result<Vec<DbSplitExportRow>> {
327 let rows = sqlx::query_as::<_, DbSplitExportRow>(
328 r#"
329 SELECT rs.id, rs.recipient_id, rs.amount_cents, rs.split_percent, rs.created_at,
330 CASE WHEN rs.transaction_id IS NOT NULL THEN 'sale' ELSE 'tip' END AS source_type,
331 u.username AS recipient_username
332 FROM revenue_splits rs
333 JOIN users u ON u.id = rs.recipient_id
334 LEFT JOIN transactions t ON t.id = rs.transaction_id
335 LEFT JOIN tips tip ON tip.id = rs.tip_id
336 WHERE rs.recipient_id = $1
337 OR COALESCE(t.seller_id, tip.recipient_id) = $1
338 ORDER BY rs.created_at DESC
339 "#,
340 )
341 .bind(user_id)
342 .fetch_all(pool)
343 .await?;
344
345 Ok(rows)
346 }
347
348 /// Total split obligations owed by a project owner (splits on their transactions/tips).
349 #[tracing::instrument(skip(pool))]
350 pub async fn total_split_obligations(pool: &PgPool, owner_id: UserId) -> Result<i64> {
351 let row: (Option<i64>,) = sqlx::query_as(
352 r#"
353 SELECT SUM(rs.amount_cents)::BIGINT
354 FROM revenue_splits rs
355 LEFT JOIN transactions t ON t.id = rs.transaction_id
356 LEFT JOIN tips tip ON tip.id = rs.tip_id
357 WHERE COALESCE(t.seller_id, tip.recipient_id) = $1
358 "#,
359 )
360 .bind(owner_id)
361 .fetch_one(pool)
362 .await?;
363
364 Ok(row.0.unwrap_or(0))
365 }
366