Skip to main content

max / balanced_breakfast

11.7 KB · 363 lines History Blame Raw
1 //! Push local changes to the remote sync server.
2
3 use chrono::Utc;
4 use sqlx::SqlitePool;
5 use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient};
6 use tracing::{debug, instrument, warn};
7 use uuid::Uuid;
8
9 use crate::commands::error::ApiError;
10
11 use super::PUSH_BATCH_LIMIT;
12
13 #[instrument(skip_all)]
14 pub async fn push_changes(
15 pool: &SqlitePool,
16 client: &SyncKitClient,
17 device_id: Uuid,
18 ) -> Result<i64, ApiError> {
19 let rows: Vec<(i64, String, String, String, String, Option<String>)> = sqlx::query_as(
20 "SELECT id, table_name, op, row_id, timestamp, data \
21 FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?",
22 )
23 .bind(PUSH_BATCH_LIMIT)
24 .fetch_all(pool)
25 .await
26 .map_err(super::db_err)?;
27
28 if rows.is_empty() {
29 return Ok(0);
30 }
31
32 // Track which changelog IDs are actually included in the push.
33 // Entries with corrupt JSON or unknown ops are skipped — they must NOT
34 // be marked as pushed, or their data is silently lost.
35 let mut pushed_ids: Vec<i64> = Vec::with_capacity(rows.len());
36 let mut skipped_ids: Vec<i64> = Vec::new();
37
38 let changes: Vec<ChangeEntry> = rows
39 .into_iter()
40 .filter_map(|(id, table, op, row_id, timestamp, data)| {
41 let ts = chrono::DateTime::parse_from_rfc3339(&timestamp)
42 .map(|dt| dt.with_timezone(&Utc))
43 .unwrap_or_else(|e| {
44 warn!(raw = %timestamp, error = %e, "Malformed changelog timestamp, using now");
45 Utc::now()
46 });
47
48 let json_data = data.and_then(|d| match serde_json::from_str(&d) {
49 Ok(v) => Some(v),
50 Err(e) => {
51 warn!(id, error = %e, raw = %d, "Corrupt changelog JSON, skipping entry");
52 None
53 }
54 });
55
56 let change_op = match ChangeOp::from_str_opt(&op) {
57 Some(o) => o,
58 None => {
59 warn!(id, op = %op, "Skipping changelog entry with unknown op");
60 skipped_ids.push(id);
61 return None;
62 }
63 };
64
65 pushed_ids.push(id);
66 Some(ChangeEntry {
67 table,
68 op: change_op,
69 row_id,
70 timestamp: ts,
71 data: json_data,
72 })
73 })
74 .collect();
75
76 if !skipped_ids.is_empty() {
77 warn!(count = skipped_ids.len(), "Skipped unpushable changelog entries (will retry)");
78 }
79
80 let count = changes.len() as i64;
81
82 if !changes.is_empty() {
83 client
84 .push(device_id, changes)
85 .await
86 .map_err(|e| ApiError::internal(format!("push failed: {}", e)))?;
87 }
88
89 // Mark only successfully-pushed IDs (not skipped entries).
90 // Batch into groups of 999 to stay under SQLite's variable limit.
91 for chunk in pushed_ids.chunks(999) {
92 let placeholders = chunk.iter().map(|_| "?").collect::<Vec<_>>().join(",");
93 let sql = format!(
94 "UPDATE sync_changelog SET pushed = 1 WHERE id IN ({})",
95 placeholders
96 );
97 let mut query = sqlx::query(&sql);
98 for id in chunk {
99 query = query.bind(id);
100 }
101 query
102 .execute(pool)
103 .await
104 .map_err(super::db_err)?;
105 }
106
107 debug!(count, "Pushed changes");
108 Ok(count)
109 }
110
111 #[cfg(test)]
112 mod tests {
113 use super::super::tests::*;
114 use super::*;
115 use synckit_client::{SyncKitClient, SyncKitConfig};
116 use wiremock::matchers::{method, path};
117 use wiremock::{Mock, MockServer, ResponseTemplate};
118
119 /// Create a SyncKitClient pointing at the mock server with auth and key set.
120 fn mock_client(server_url: &str) -> SyncKitClient {
121 let client = SyncKitClient::new(SyncKitConfig {
122 server_url: server_url.to_string(),
123 api_key: "test-key".to_string(),
124 });
125 // Use a plain token (no JWT exp check when jwt_exp returns None)
126 client.restore_session(
127 "fake-token",
128 Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
129 Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(),
130 );
131 client.set_master_key_raw([42u8; 32]);
132 client
133 }
134
135 // ── Empty changelog ──
136
137 #[tokio::test]
138 async fn push_changes_empty_changelog_returns_zero() {
139 let pool = setup_test_db().await;
140 sqlx::query("DELETE FROM sync_changelog")
141 .execute(&pool)
142 .await
143 .unwrap();
144
145 // No mock server needed -- push_changes returns early before HTTP
146 let server = MockServer::start().await;
147 let client = mock_client(&server.uri());
148 let device_id = Uuid::new_v4();
149
150 let result = push_changes(&pool, &client, device_id).await.unwrap();
151 assert_eq!(result, 0);
152 }
153
154 // ── Successful push ──
155
156 #[tokio::test]
157 async fn push_changes_sends_and_marks_pushed() {
158 let pool = setup_test_db().await;
159 sqlx::query("DELETE FROM sync_changelog")
160 .execute(&pool)
161 .await
162 .unwrap();
163
164 // Insert changelog entries directly (bypass triggers)
165 sqlx::query(
166 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
167 VALUES ('feeds', 'INSERT', 'feed-1', datetime('now'), '{\"id\":\"feed-1\",\"name\":\"Test\"}', 0)"
168 )
169 .execute(&pool)
170 .await
171 .unwrap();
172
173 sqlx::query(
174 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
175 VALUES ('user_config', 'INSERT', 'theme', datetime('now'), '{\"key\":\"theme\",\"value\":\"dark\"}', 0)"
176 )
177 .execute(&pool)
178 .await
179 .unwrap();
180
181 let server = MockServer::start().await;
182 Mock::given(method("POST"))
183 .and(path("/api/v1/sync/push"))
184 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
185 "cursor": 42
186 })))
187 .expect(1)
188 .mount(&server)
189 .await;
190
191 let client = mock_client(&server.uri());
192 let device_id = Uuid::new_v4();
193
194 let result = push_changes(&pool, &client, device_id).await.unwrap();
195 assert_eq!(result, 2);
196
197 // Verify entries are marked as pushed
198 let (unpushed,): (i64,) =
199 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0")
200 .fetch_one(&pool)
201 .await
202 .unwrap();
203 assert_eq!(unpushed, 0);
204
205 let (pushed,): (i64,) =
206 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 1")
207 .fetch_one(&pool)
208 .await
209 .unwrap();
210 assert_eq!(pushed, 2);
211 }
212
213 // ── Only unpushed entries are sent ──
214
215 #[tokio::test]
216 async fn push_changes_skips_already_pushed() {
217 let pool = setup_test_db().await;
218 sqlx::query("DELETE FROM sync_changelog")
219 .execute(&pool)
220 .await
221 .unwrap();
222
223 // One already pushed, one pending
224 sqlx::query(
225 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
226 VALUES ('feeds', 'INSERT', 'old', datetime('now'), '{\"id\":\"old\"}', 1)"
227 )
228 .execute(&pool)
229 .await
230 .unwrap();
231
232 sqlx::query(
233 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
234 VALUES ('feeds', 'UPDATE', 'new', datetime('now'), '{\"id\":\"new\"}', 0)"
235 )
236 .execute(&pool)
237 .await
238 .unwrap();
239
240 let server = MockServer::start().await;
241 Mock::given(method("POST"))
242 .and(path("/api/v1/sync/push"))
243 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
244 "cursor": 10
245 })))
246 .expect(1)
247 .mount(&server)
248 .await;
249
250 let client = mock_client(&server.uri());
251 let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
252 assert_eq!(result, 1);
253 }
254
255 // ── Delete entries (no data) ──
256
257 #[tokio::test]
258 async fn push_changes_handles_delete_with_null_data() {
259 let pool = setup_test_db().await;
260 sqlx::query("DELETE FROM sync_changelog")
261 .execute(&pool)
262 .await
263 .unwrap();
264
265 sqlx::query(
266 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
267 VALUES ('feeds', 'DELETE', 'deleted-feed', datetime('now'), NULL, 0)"
268 )
269 .execute(&pool)
270 .await
271 .unwrap();
272
273 let server = MockServer::start().await;
274 Mock::given(method("POST"))
275 .and(path("/api/v1/sync/push"))
276 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
277 "cursor": 1
278 })))
279 .expect(1)
280 .mount(&server)
281 .await;
282
283 let client = mock_client(&server.uri());
284 let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
285 assert_eq!(result, 1);
286 }
287
288 // ── Trigger-produced entries push correctly ──
289
290 #[tokio::test]
291 async fn push_changes_from_trigger_produced_entries() {
292 let pool = setup_test_db().await;
293 sqlx::query("DELETE FROM sync_changelog")
294 .execute(&pool)
295 .await
296 .unwrap();
297
298 // Create a feed via the normal path (triggers fire, producing changelog entries)
299 let _feed_id = create_test_feed(&pool, "Trigger Feed").await;
300
301 let server = MockServer::start().await;
302 Mock::given(method("POST"))
303 .and(path("/api/v1/sync/push"))
304 .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
305 "cursor": 5
306 })))
307 .expect(1)
308 .mount(&server)
309 .await;
310
311 let client = mock_client(&server.uri());
312 let result = push_changes(&pool, &client, Uuid::new_v4()).await.unwrap();
313 assert_eq!(result, 1); // One INSERT from the trigger
314
315 // Verify marked as pushed
316 let (unpushed,): (i64,) =
317 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0")
318 .fetch_one(&pool)
319 .await
320 .unwrap();
321 assert_eq!(unpushed, 0);
322 }
323
324 // ── Server error propagates ──
325
326 #[tokio::test]
327 async fn push_changes_propagates_server_error() {
328 let pool = setup_test_db().await;
329 sqlx::query("DELETE FROM sync_changelog")
330 .execute(&pool)
331 .await
332 .unwrap();
333
334 sqlx::query(
335 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, data, pushed) \
336 VALUES ('feeds', 'INSERT', 'fail', datetime('now'), '{\"id\":\"fail\"}', 0)"
337 )
338 .execute(&pool)
339 .await
340 .unwrap();
341
342 let server = MockServer::start().await;
343 Mock::given(method("POST"))
344 .and(path("/api/v1/sync/push"))
345 .respond_with(ResponseTemplate::new(400).set_body_string("bad request"))
346 .expect(1)
347 .mount(&server)
348 .await;
349
350 let client = mock_client(&server.uri());
351 let result = push_changes(&pool, &client, Uuid::new_v4()).await;
352 assert!(result.is_err());
353
354 // Entries should NOT be marked as pushed on failure
355 let (unpushed,): (i64,) =
356 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0")
357 .fetch_one(&pool)
358 .await
359 .unwrap();
360 assert_eq!(unpushed, 1, "entries should remain unpushed after server error");
361 }
362 }
363