Skip to main content

max / makenotwork

Fix remaining integration test failures from Audit Run 18 Resolve test deserialization failures, CSRF exemptions, and OAuth form-encoded fixes identified during audit. Delete fingerprinting tests (tables dropped in migration 082). Add batch_id index migration. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-02 19:23 UTC
Commit: 87dad573086898d06823bb93488e71ba7e119bc2
Parent: 698d0e6
13 files changed, +77 insertions, -265 deletions
@@ -0,0 +1,7 @@
1 + -- Fix: batch_id unique index was too restrictive — a single push batch
2 + -- inserts multiple sync_log rows (one per change), all sharing the same
3 + -- batch_id. Replace the unique index with a regular index used only by
4 + -- the idempotency MAX(seq) query.
5 +
6 + DROP INDEX IF EXISTS idx_sync_log_batch_id;
7 + CREATE INDEX idx_sync_log_batch_id ON sync_log (app_id, user_id, batch_id) WHERE batch_id IS NOT NULL;
@@ -140,6 +140,7 @@ pub async fn csrf_middleware(request: Request, next: Next) -> Response {
140 140 "/api/sync/devices", "/api/sync/keys", "/api/sync/blobs",
141 141 "/oauth", "/auth/passkey", "/postmark/",
142 142 "/unsubscribe", "/confirm-delete",
143 + "/api/checkout/guest",
143 144 ];
144 145
145 146 if exempt_paths.iter().any(|p| path.starts_with(p)) {
@@ -24,14 +24,18 @@ pub async fn add_project_member(
24 24 ) -> Result<DbProjectMember> {
25 25 let mut tx = pool.begin().await?;
26 26
27 - // Lock all existing members to prevent concurrent split modifications
27 + // Lock all existing members to prevent concurrent split modifications.
28 + // FOR UPDATE cannot be combined with aggregate functions, so we lock
29 + // the rows first, then compute the sum.
30 + let _locked: Vec<(i16,)> = sqlx::query_as(
31 + "SELECT split_percent FROM project_members WHERE project_id = $1 FOR UPDATE",
32 + )
33 + .bind(project_id)
34 + .fetch_all(&mut *tx)
35 + .await?;
36 +
28 37 let current_total: (Option<i64>,) = sqlx::query_as(
29 - r#"
30 - SELECT SUM(split_percent)::BIGINT
31 - FROM project_members
32 - WHERE project_id = $1
33 - FOR UPDATE
34 - "#,
38 + "SELECT SUM(split_percent)::BIGINT FROM project_members WHERE project_id = $1",
35 39 )
36 40 .bind(project_id)
37 41 .fetch_one(&mut *tx)
@@ -263,16 +263,16 @@ pub async fn push_sync_changes(
263 263
264 264 // Idempotency check: if this batch was already committed, return the
265 265 // existing cursor without re-inserting.
266 - let existing: Option<(i64,)> = sqlx::query_as(
266 + let existing: (Option<i64>,) = sqlx::query_as(
267 267 "SELECT MAX(seq) FROM sync_log WHERE app_id = $1 AND user_id = $2 AND batch_id = $3",
268 268 )
269 269 .bind(app_id)
270 270 .bind(user_id)
271 271 .bind(batch_id)
272 - .fetch_optional(pool)
272 + .fetch_one(pool)
273 273 .await?;
274 274
275 - if let Some((max_seq,)) = existing {
275 + if let Some(max_seq) = existing.0 {
276 276 tracing::debug!(batch_id = %batch_id, cursor = max_seq, "Push batch already committed, returning existing cursor");
277 277 return Ok(max_seq);
278 278 }
@@ -851,7 +851,7 @@ pub async fn create_free_guest_transaction(
851 851 guest_email, claim_token, download_token
852 852 )
853 853 VALUES ($1, $2, $3, 0, 0, $4, 'completed', NOW(), $5, $6, false, $7, $8, $9)
854 - ON CONFLICT (guest_email, item_id) WHERE status = 'pending' DO NOTHING
854 + ON CONFLICT (guest_email, item_id) WHERE status = 'pending' AND guest_email IS NOT NULL DO NOTHING
855 855 "#,
856 856 )
857 857 .bind(buyer_id)
@@ -212,7 +212,7 @@ pub(super) async fn library_tab_communities(
212 212
213 213 /// Render the login page.
214 214 #[tracing::instrument(skip_all, name = "landing::login_page")]
215 - pub(super) async fn login_page(session: Session) -> impl IntoResponse {
215 + pub(crate) async fn login_page(session: Session) -> impl IntoResponse {
216 216 LoginTemplate {
217 217 csrf_token: get_csrf_token(&session).await,
218 218 }
@@ -6,7 +6,7 @@ mod docs;
6 6 mod feed;
7 7 mod health;
8 8 pub(crate) mod join_wizard;
9 - mod landing;
9 + pub(crate) mod landing;
10 10 mod two_factor;
11 11
12 12 use axum::{
@@ -45,7 +45,8 @@ pub fn public_routes() -> Router<AppState> {
45 45 .route("/library/tabs/communities", get(landing::library_tab_communities))
46 46 .route("/health", get(health::health))
47 47 .route("/api/health", get(health::health_json))
48 - .route("/login", get(landing::login_page))
48 + // NOTE: GET /login is registered in auth_routes() alongside POST /login
49 + // to avoid Axum merge conflicts that strip rate limiting layers.
49 50 // Join wizard
50 51 .route("/join", get(join_wizard::wizard_page))
51 52 .route(
@@ -1,6 +1,4 @@
1 - //! Content fingerprinting integration tests:
2 - //! - Download fingerprint registry
3 - //! - Streaming session management (IP binding, concurrency cap)
1 + //! Content protection integration tests:
4 2 //! - License verification (phone-home activation binding, JWT)
5 3 //! - License deactivation (free slot)
6 4
@@ -8,215 +6,6 @@ use crate::harness::TestHarness;
8 6 use serde_json::Value;
9 7
10 8 // =============================================================================
11 - // Fingerprint Registry
12 - // =============================================================================
13 -
14 - #[tokio::test]
15 - async fn fingerprint_recorded_in_db() {
16 - let mut h = TestHarness::new().await;
17 - let user_id = h
18 - .signup("fpuser", "fpuser@test.com", "password123")
19 - .await;
20 -
21 - // Insert a fingerprint directly via SQL and verify it exists
22 - let fp_id: uuid::Uuid = sqlx::query_scalar(
23 - "INSERT INTO download_fingerprints (user_id, content_type, content_id, watermark_method, ip_address)
24 - VALUES ($1, 'file', 'test-content-123', 'visible', '10.0.0.1')
25 - RETURNING fingerprint_id",
26 - )
27 - .bind(user_id)
28 - .fetch_one(&h.db)
29 - .await
30 - .unwrap();
31 -
32 - // Look it up
33 - let row: (String, String) = sqlx::query_as(
34 - "SELECT content_type, content_id FROM download_fingerprints WHERE fingerprint_id = $1",
35 - )
36 - .bind(fp_id)
37 - .fetch_one(&h.db)
38 - .await
39 - .unwrap();
40 -
41 - assert_eq!(row.0, "file");
42 - assert_eq!(row.1, "test-content-123");
43 - }
44 -
45 - #[tokio::test]
46 - async fn fingerprint_lookup_by_user_and_content() {
47 - let mut h = TestHarness::new().await;
48 - let user_id = h
49 - .signup("fplookup", "fplookup@test.com", "password123")
50 - .await;
51 -
52 - // Insert multiple fingerprints for the same user+content
53 - for _ in 0..3 {
54 - sqlx::query(
55 - "INSERT INTO download_fingerprints (user_id, content_type, content_id)
56 - VALUES ($1, 'stream', 'audio-item-42')",
57 - )
58 - .bind(user_id)
59 - .execute(&h.db)
60 - .await
61 - .unwrap();
62 - }
63 -
64 - let count: i64 = sqlx::query_scalar(
65 - "SELECT COUNT(*) FROM download_fingerprints
66 - WHERE user_id = $1 AND content_type = 'stream' AND content_id = 'audio-item-42'",
67 - )
68 - .bind(user_id)
69 - .fetch_one(&h.db)
70 - .await
71 - .unwrap();
72 -
73 - assert_eq!(count, 3);
74 - }
75 -
76 - // =============================================================================
77 - // Streaming Sessions
78 - // =============================================================================
79 -
80 - #[tokio::test]
81 - async fn streaming_session_create_and_validate() {
82 - let mut h = TestHarness::new().await;
83 - let user_id = h
84 - .signup("streamer", "streamer@test.com", "password123")
85 - .await;
86 -
87 - // Create a session
88 - let token: uuid::Uuid = sqlx::query_scalar(
89 - "INSERT INTO streaming_sessions (user_id, content_id, ip_address)
90 - VALUES ($1, 'item-99', '10.0.0.1')
91 - RETURNING session_token",
92 - )
93 - .bind(user_id)
94 - .fetch_one(&h.db)
95 - .await
96 - .unwrap();
97 -
98 - // Validate: matching IP should succeed
99 - let valid: Option<(i64,)> = sqlx::query_as(
100 - "SELECT id FROM streaming_sessions
101 - WHERE session_token = $1 AND ip_address = '10.0.0.1' AND expired = false",
102 - )
103 - .bind(token)
104 - .fetch_optional(&h.db)
105 - .await
106 - .unwrap();
107 - assert!(valid.is_some(), "Session should be valid with matching IP");
108 -
109 - // Validate: different IP should not match
110 - let invalid: Option<(i64,)> = sqlx::query_as(
111 - "SELECT id FROM streaming_sessions
112 - WHERE session_token = $1 AND ip_address = '10.0.0.2' AND expired = false",
113 - )
114 - .bind(token)
115 - .fetch_optional(&h.db)
116 - .await
117 - .unwrap();
118 - assert!(invalid.is_none(), "Session should not match different IP");
119 - }
120 -
121 - #[tokio::test]
122 - async fn streaming_session_concurrency_cap() {
123 - let mut h = TestHarness::new().await;
124 - let user_id = h
125 - .signup("capuser", "capuser@test.com", "password123")
126 - .await;
127 -
128 - // Create MAX_CONCURRENT_STREAMS sessions
129 - for i in 0..2 {
130 - sqlx::query(
131 - "INSERT INTO streaming_sessions (user_id, content_id, ip_address)
132 - VALUES ($1, $2, '10.0.0.1')",
133 - )
134 - .bind(user_id)
135 - .bind(format!("item-{}", i))
136 - .execute(&h.db)
137 - .await
138 - .unwrap();
139 - }
140 -
141 - // Count active sessions
142 - let active: i64 = sqlx::query_scalar(
143 - "SELECT COUNT(*) FROM streaming_sessions WHERE user_id = $1 AND expired = false",
144 - )
145 - .bind(user_id)
146 - .fetch_one(&h.db)
147 - .await
148 - .unwrap();
149 - assert_eq!(active, 2, "Should have 2 active sessions");
150 -
151 - // Expire one session
152 - sqlx::query(
153 - "UPDATE streaming_sessions SET expired = true
154 - WHERE user_id = $1 AND content_id = 'item-0'",
155 - )
156 - .bind(user_id)
157 - .execute(&h.db)
158 - .await
159 - .unwrap();
160 -
161 - let active_after: i64 = sqlx::query_scalar(
162 - "SELECT COUNT(*) FROM streaming_sessions WHERE user_id = $1 AND expired = false",
163 - )
164 - .bind(user_id)
165 - .fetch_one(&h.db)
166 - .await
167 - .unwrap();
168 - assert_eq!(active_after, 1, "Should have 1 active session after expiry");
169 - }
170 -
171 - #[tokio::test]
172 - async fn streaming_session_stale_expiry() {
173 - let mut h = TestHarness::new().await;
174 - let user_id = h
175 - .signup("staleuser", "staleuser@test.com", "password123")
176 - .await;
177 -
178 - // Create a session with old last_active_at
179 - sqlx::query(
180 - "INSERT INTO streaming_sessions (user_id, content_id, ip_address, last_active_at)
181 - VALUES ($1, 'old-item', '10.0.0.1', now() - interval '2 hours')",
182 - )
183 - .bind(user_id)
184 - .execute(&h.db)
185 - .await
186 - .unwrap();
187 -
188 - // Create a fresh session
189 - sqlx::query(
190 - "INSERT INTO streaming_sessions (user_id, content_id, ip_address)
191 - VALUES ($1, 'new-item', '10.0.0.1')",
192 - )
193 - .bind(user_id)
194 - .execute(&h.db)
195 - .await
196 - .unwrap();
197 -
198 - // Expire stale sessions (> 1 hour inactive)
199 - let expired = sqlx::query(
200 - "UPDATE streaming_sessions SET expired = true
201 - WHERE expired = false AND last_active_at < now() - interval '1 hour'",
202 - )
203 - .execute(&h.db)
204 - .await
205 - .unwrap();
206 - assert_eq!(expired.rows_affected(), 1, "Should expire 1 stale session");
207 -
208 - // Fresh session should still be active
209 - let active: i64 = sqlx::query_scalar(
210 - "SELECT COUNT(*) FROM streaming_sessions WHERE user_id = $1 AND expired = false",
211 - )
212 - .bind(user_id)
213 - .fetch_one(&h.db)
214 - .await
215 - .unwrap();
216 - assert_eq!(active, 1, "Fresh session should still be active");
217 - }
218 -
219 - // =============================================================================
220 9 // License Verification (phone-home)
221 10 // =============================================================================
222 11
@@ -51,7 +51,7 @@ async fn setup_paid_item(h: &mut TestHarness, price_cents: i64) -> (String, Stri
51 51
52 52 #[tokio::test]
53 53 async fn guest_checkout_creates_session() {
54 - let mut h = TestHarness::new().await;
54 + let mut h = TestHarness::with_mocks().await;
55 55 let (_, item_id) = setup_paid_item(&mut h, 999).await;
56 56
57 57 let body = json!({}).to_string();
@@ -88,7 +88,10 @@ async fn guest_checkout_private_item_404() {
88 88 let item: Value = resp.json();
89 89 let item_id = item["id"].as_str().unwrap();
90 90
91 - // Don't publish — item is private
91 + // Explicitly un-publish (items default to is_public=true)
92 + h.client
93 + .put_form(&format!("/api/items/{}", item_id), "is_public=false")
94 + .await;
92 95 h.client.post_form("/logout", "").await;
93 96
94 97 let body = json!({}).to_string();
@@ -161,6 +164,11 @@ async fn buy_page_private_item_404() {
161 164 let item: Value = resp.json();
162 165 let item_id = item["id"].as_str().unwrap();
163 166
167 + // Explicitly un-publish (items default to is_public=true)
168 + h.client
169 + .put_form(&format!("/api/items/{}", item_id), "is_public=false")
170 + .await;
171 +
164 172 h.client.post_form("/logout", "").await;
165 173 let resp = h.client.get(&format!("/buy/{}", item_id)).await;
166 174 assert_eq!(resp.status.as_u16(), 404);
@@ -145,19 +145,18 @@ async fn oauth_full_flow() {
145 145 let (code, state) = authorize(&mut h, &client_id, &challenge, "oauthuser", "Password1!").await;
146 146 assert_eq!(state, "test-state-12345");
147 147
148 - // Exchange code for token
148 + // Exchange code for token (OAuth RFC requires form-encoded)
149 149 let resp = h
150 150 .client
151 - .post_json(
151 + .post_form(
152 152 "/oauth/token",
153 - &serde_json::json!({
154 - "grant_type": "authorization_code",
155 - "code": code,
156 - "redirect_uri": "http://127.0.0.1:9999/callback",
157 - "code_verifier": verifier,
158 - "client_id": client_id,
159 - })
160 - .to_string(),
153 + &format!(
154 + "grant_type=authorization_code&code={}&redirect_uri={}&code_verifier={}&client_id={}",
155 + code,
156 + urlencoding::encode("http://127.0.0.1:9999/callback"),
157 + verifier,
158 + client_id,
159 + ),
161 160 )
162 161 .await;
163 162 assert_eq!(resp.status.as_u16(), 200, "Token exchange failed: {}", resp.text);
@@ -184,16 +183,14 @@ async fn oauth_pkce_wrong_verifier() {
184 183 // Use wrong verifier
185 184 let resp = h
186 185 .client
187 - .post_json(
186 + .post_form(
188 187 "/oauth/token",
189 - &serde_json::json!({
190 - "grant_type": "authorization_code",
191 - "code": code,
192 - "redirect_uri": "http://127.0.0.1:9999/callback",
193 - "code_verifier": "this-is-the-wrong-verifier-and-should-fail",
194 - "client_id": client_id,
195 - })
196 - .to_string(),
188 + &format!(
189 + "grant_type=authorization_code&code={}&redirect_uri={}&code_verifier=this-is-the-wrong-verifier-and-should-fail&client_id={}",
190 + code,
191 + urlencoding::encode("http://127.0.0.1:9999/callback"),
192 + client_id,
193 + ),
197 194 )
198 195 .await;
199 196 assert_eq!(resp.status.as_u16(), 400, "Wrong PKCE verifier should be rejected");
@@ -210,21 +207,20 @@ async fn oauth_code_single_use() {
210 207
211 208 let (code, _) = authorize(&mut h, &client_id, &challenge, "oauthonce", "Password1!").await;
212 209
213 - let token_body = serde_json::json!({
214 - "grant_type": "authorization_code",
215 - "code": code,
216 - "redirect_uri": "http://127.0.0.1:9999/callback",
217 - "code_verifier": verifier,
218 - "client_id": client_id,
219 - })
220 - .to_string();
210 + let token_body = format!(
211 + "grant_type=authorization_code&code={}&redirect_uri={}&code_verifier={}&client_id={}",
212 + code,
213 + urlencoding::encode("http://127.0.0.1:9999/callback"),
214 + verifier,
215 + client_id,
216 + );
221 217
222 218 // First exchange — should succeed
223 - let resp = h.client.post_json("/oauth/token", &token_body).await;
219 + let resp = h.client.post_form("/oauth/token", &token_body).await;
224 220 assert_eq!(resp.status.as_u16(), 200, "First token exchange failed: {}", resp.text);
225 221
226 222 // Second exchange with same code — should fail
227 - let resp = h.client.post_json("/oauth/token", &token_body).await;
223 + let resp = h.client.post_form("/oauth/token", &token_body).await;
228 224 assert_eq!(resp.status.as_u16(), 400, "Reused auth code should be rejected");
229 225 }
230 226
@@ -370,7 +370,10 @@ async fn purchase_unlisted_item_fails() {
370 370 let item: Value = resp.json();
371 371 let item_id = item["id"].as_str().unwrap();
372 372
373 - // Deliberately NOT publishing: project and item stay private
373 + // Explicitly un-publish: items default to is_public=true in DB
374 + h.client
375 + .put_form(&format!("/api/items/{}", item_id), "is_public=false")
376 + .await;
374 377 h.client.post_form("/logout", "").await;
375 378
376 379 // Buyer tries to checkout
@@ -215,6 +215,7 @@ async fn push_pull_roundtrip() {
215 215 "/api/sync/push",
216 216 &json!({
217 217 "device_id": device_id,
218 + "batch_id": uuid::Uuid::new_v4().to_string(),
218 219 "changes": [
219 220 { "table": "tasks", "op": "INSERT", "row_id": "aaa", "timestamp": "2025-01-01T00:00:00Z", "data": {"title": "Task 1"} },
220 221 { "table": "tasks", "op": "UPDATE", "row_id": "aaa", "timestamp": "2025-01-01T00:01:00Z", "data": {"title": "Task 1 updated"} },
@@ -256,12 +257,12 @@ async fn key_management() {
256 257 let mut h = TestHarness::new().await;
257 258 setup_authenticated(&mut h).await;
258 259
259 - // PUT encrypted key
260 + // PUT encrypted key (expected_version 0 = no prior key)
260 261 let resp = h
261 262 .client
262 263 .put_json(
263 264 "/api/sync/keys",
264 - &json!({ "encrypted_key": "encrypted-master-key-blob" }).to_string(),
265 + &json!({ "encrypted_key": "encrypted-master-key-blob", "expected_version": 0 }).to_string(),
265 266 )
266 267 .await;
267 268 assert_eq!(resp.status, 204, "Put key failed: {}", resp.text);
@@ -273,12 +274,12 @@ async fn key_management() {
273 274 assert_eq!(key.encrypted_key, "encrypted-master-key-blob");
274 275 assert_eq!(key.key_version, 1);
275 276
276 - // PUT again (upsert bumps version)
277 + // PUT again (upsert bumps version, expect current version 1)
277 278 let resp = h
278 279 .client
279 280 .put_json(
280 281 "/api/sync/keys",
281 - &json!({ "encrypted_key": "rotated-key-blob" }).to_string(),
282 + &json!({ "encrypted_key": "rotated-key-blob", "expected_version": 1 }).to_string(),
282 283 )
283 284 .await;
284 285 assert_eq!(resp.status, 204);
@@ -306,7 +307,7 @@ async fn unauthenticated_rejected() {
306 307
307 308 let resp = h
308 309 .client
309 - .post_json("/api/sync/push", &json!({"device_id": 1, "changes": []}).to_string())
310 + .post_json("/api/sync/push", &json!({"device_id": 1, "batch_id": uuid::Uuid::new_v4().to_string(), "changes": []}).to_string())
310 311 .await;
311 312 assert_eq!(resp.status, 401);
312 313 }
@@ -333,7 +334,7 @@ async fn push_validation() {
333 334 .client
334 335 .post_json(
335 336 "/api/sync/push",
336 - &json!({ "device_id": device_id, "changes": changes }).to_string(),
337 + &json!({ "device_id": device_id, "batch_id": uuid::Uuid::new_v4().to_string(), "changes": changes }).to_string(),
337 338 )
338 339 .await;
339 340 assert_eq!(resp.status, 400, "Expected 400 for >500 changes: {}", resp.text);
@@ -345,6 +346,7 @@ async fn push_validation() {
345 346 "/api/sync/push",
346 347 &json!({
347 348 "device_id": device_id,
349 + "batch_id": uuid::Uuid::new_v4().to_string(),
348 350 "changes": [{
349 351 "table": "tasks",
350 352 "op": "DELETE",
@@ -363,7 +365,7 @@ async fn push_validation() {
363 365 .client
364 366 .post_json(
365 367 "/api/sync/push",
366 - &json!({ "device_id": device_id, "changes": [] }).to_string(),
368 + &json!({ "device_id": device_id, "batch_id": uuid::Uuid::new_v4().to_string(), "changes": [] }).to_string(),
367 369 )
368 370 .await;
369 371 assert_eq!(resp.status, 400, "Expected 400 for empty changes: {}", resp.text);
@@ -105,6 +105,7 @@ async fn push_mixed_changes(h: &mut TestHarness, device_id: SyncDeviceId) -> i64
105 105 "/api/sync/push",
106 106 &json!({
107 107 "device_id": device_id,
108 + "batch_id": uuid::Uuid::new_v4().to_string(),
108 109 "changes": [
109 110 { "table": "tasks", "op": "INSERT", "row_id": "t1", "timestamp": "2025-01-01T00:00:00Z", "data": {"title": "Task 1"} },
110 111 { "table": "events", "op": "INSERT", "row_id": "e1", "timestamp": "2025-01-01T01:00:00Z", "data": {"title": "Event 1"} },