Skip to main content

max / makenotwork

5.3 KB · 172 lines History Blame Raw
1 use serde_json::Value as JsonValue;
2 use sqlx::PgPool;
3
4 use crate::db::models::*;
5 use crate::db::{SyncAppId, SyncDeviceId, UserId};
6 use crate::error::Result;
7
8 // ── Sync Log ──
9
10 /// Push a batch of changes to the sync log. Returns the highest seq assigned.
11 ///
12 /// `batch_id` is a client-generated UUID for idempotent push. If a batch with
13 /// the same ID has already been committed for this app+user, the existing max
14 /// seq is returned without re-inserting (at-most-once semantics).
15 ///
16 /// All changes are inserted within a single transaction for atomicity and
17 /// performance (one fsync instead of N).
18 #[allow(clippy::type_complexity)]
19 #[tracing::instrument(skip_all)]
20 pub async fn push_sync_changes(
21 pool: &PgPool,
22 app_id: SyncAppId,
23 user_id: UserId,
24 device_id: SyncDeviceId,
25 batch_id: uuid::Uuid,
26 changes: &[(String, String, String, chrono::DateTime<chrono::Utc>, Option<JsonValue>)],
27 ) -> Result<i64> {
28 if changes.is_empty() {
29 return Ok(0);
30 }
31
32 // Decompose into parallel Vecs for UNNEST-based batch INSERT
33 let mut table_names: Vec<String> = Vec::with_capacity(changes.len());
34 let mut operations: Vec<String> = Vec::with_capacity(changes.len());
35 let mut row_ids: Vec<String> = Vec::with_capacity(changes.len());
36 let mut client_timestamps: Vec<chrono::DateTime<chrono::Utc>> = Vec::with_capacity(changes.len());
37 let mut data_values: Vec<JsonValue> = Vec::with_capacity(changes.len());
38
39 for (table_name, operation, row_id, client_timestamp, data) in changes {
40 table_names.push(table_name.clone());
41 operations.push(operation.clone());
42 row_ids.push(row_id.clone());
43 client_timestamps.push(*client_timestamp);
44 data_values.push(data.clone().unwrap_or(JsonValue::Null));
45 }
46
47 let mut tx = pool.begin().await?;
48
49 // Idempotency check inside the transaction to prevent TOCTOU race.
50 let existing: (Option<i64>,) = sqlx::query_as(
51 "SELECT MAX(seq) FROM sync_log WHERE app_id = $1 AND user_id = $2 AND batch_id = $3",
52 )
53 .bind(app_id)
54 .bind(user_id)
55 .bind(batch_id)
56 .fetch_one(&mut *tx)
57 .await?;
58
59 if let Some(max_seq) = existing.0 {
60 tracing::debug!(batch_id = %batch_id, cursor = max_seq, "Push batch already committed, returning existing cursor");
61 tx.rollback().await.ok();
62 return Ok(max_seq);
63 }
64
65 // Read the current key_id from sync_keys so pushed entries are stamped
66 // with the active encryption key. Falls back to NULL if no key exists yet
67 // (pre-encryption setup — entries have no encrypted data anyway).
68 let current_key_id: Option<i32> = sqlx::query_scalar(
69 "SELECT key_id FROM sync_keys WHERE app_id = $1 AND user_id = $2",
70 )
71 .bind(app_id)
72 .bind(user_id)
73 .fetch_optional(&mut *tx)
74 .await?;
75
76 let seqs: Vec<i64> = sqlx::query_scalar(
77 r#"
78 INSERT INTO sync_log (app_id, user_id, device_id, batch_id, table_name, operation, row_id, client_timestamp, data, key_id)
79 SELECT $1, $2, $3, $4, t.*, $10
80 FROM UNNEST($5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::jsonb[]) AS t
81 RETURNING seq
82 "#,
83 )
84 .bind(app_id)
85 .bind(user_id)
86 .bind(device_id)
87 .bind(batch_id)
88 .bind(&table_names)
89 .bind(&operations)
90 .bind(&row_ids)
91 .bind(&client_timestamps)
92 .bind(&data_values)
93 .bind(current_key_id)
94 .fetch_all(&mut *tx)
95 .await?;
96
97 tx.commit().await?;
98
99 let max_seq = seqs.iter().copied().max().unwrap_or(0);
100
101 Ok(max_seq)
102 }
103
104 /// Pull changes since a cursor for a user within an app.
105 ///
106 /// Prefer `pull_sync_changes_filtered` for new code; it supports optional
107 /// table and timestamp filters. This function is kept for backward compatibility.
108 #[allow(dead_code)]
109 #[tracing::instrument(skip_all)]
110 pub async fn pull_sync_changes(
111 pool: &PgPool,
112 app_id: SyncAppId,
113 user_id: UserId,
114 cursor: i64,
115 limit: i64,
116 ) -> Result<Vec<DbSyncLogEntry>> {
117 let entries = sqlx::query_as::<_, DbSyncLogEntry>(
118 r#"
119 SELECT * FROM sync_log
120 WHERE app_id = $1 AND user_id = $2 AND seq > $3
121 ORDER BY seq ASC
122 LIMIT $4
123 "#,
124 )
125 .bind(app_id)
126 .bind(user_id)
127 .bind(cursor)
128 .bind(limit)
129 .fetch_all(pool)
130 .await?;
131
132 Ok(entries)
133 }
134
135 /// Pull changes since a cursor with optional table and timestamp filters.
136 ///
137 /// When `tables` is `Some`, only entries whose `table_name` is in the list are returned.
138 /// When `since` is `Some`, only entries with `client_timestamp >= since` are returned.
139 /// Both filters compose (AND). Passing `None` for both is identical to `pull_sync_changes`.
140 #[tracing::instrument(skip_all)]
141 pub async fn pull_sync_changes_filtered(
142 pool: &PgPool,
143 app_id: SyncAppId,
144 user_id: UserId,
145 cursor: i64,
146 limit: i64,
147 tables: Option<&[String]>,
148 since: Option<chrono::DateTime<chrono::Utc>>,
149 ) -> Result<Vec<DbSyncLogEntry>> {
150 let entries = sqlx::query_as::<_, DbSyncLogEntry>(
151 r#"
152 SELECT * FROM sync_log
153 WHERE app_id = $1 AND user_id = $2 AND seq > $3
154 AND ($5::text[] IS NULL OR table_name = ANY($5))
155 AND ($6::timestamptz IS NULL OR client_timestamp >= $6)
156 ORDER BY seq ASC
157 LIMIT $4
158 "#,
159 )
160 .bind(app_id)
161 .bind(user_id)
162 .bind(cursor)
163 .bind(limit)
164 .bind(tables)
165 .bind(since)
166 .fetch_all(pool)
167 .await?;
168
169 Ok(entries)
170 }
171
172