Skip to main content

max / goingson

2.8 KB · 95 lines History Blame Raw
1 //! Push local changes to the remote sync server.
2
3 use chrono::Utc;
4 use goingson_core::CoreError;
5 use sqlx::SqlitePool;
6 use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient};
7 use tracing::{debug, warn};
8 use uuid::Uuid;
9
10 use super::PUSH_BATCH_LIMIT;
11
12 pub async fn push_changes(
13 pool: &SqlitePool,
14 client: &SyncKitClient,
15 device_id: Uuid,
16 ) -> Result<i64, CoreError> {
17 let mut total_pushed: i64 = 0;
18
19 loop {
20 let rows: Vec<(i64, String, String, String, String, Option<String>)> = sqlx::query_as(
21 "SELECT id, table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?"
22 )
23 .bind(PUSH_BATCH_LIMIT)
24 .fetch_all(pool)
25 .await
26 .map_err(CoreError::database)?;
27
28 if rows.is_empty() {
29 break;
30 }
31
32 let row_count = rows.len() as i64;
33 let mut pushed_ids: Vec<i64> = Vec::new();
34 let changes: Vec<ChangeEntry> = rows
35 .into_iter()
36 .filter_map(|(id, table, op, row_id, timestamp, data)| {
37 // Always mark as pushed (even unknown ops) to avoid infinite re-fetch
38 pushed_ids.push(id);
39
40 let ts = chrono::DateTime::parse_from_rfc3339(&timestamp)
41 .map(|dt| dt.with_timezone(&Utc))
42 .unwrap_or(chrono::DateTime::UNIX_EPOCH);
43
44 let json_data = data.and_then(|d| serde_json::from_str(&d).ok());
45
46 let change_op = match ChangeOp::from_str_opt(&op) {
47 Some(o) => o,
48 None => {
49 warn!("Skipping changelog entry with unknown op: {}", op);
50 return None;
51 }
52 };
53
54 Some(ChangeEntry {
55 table,
56 op: change_op,
57 row_id,
58 timestamp: ts,
59 data: json_data,
60 })
61 })
62 .collect();
63
64 let count = changes.len() as i64;
65 let is_last_batch = row_count < PUSH_BATCH_LIMIT;
66
67 client
68 .push(device_id, changes)
69 .await
70 .map_err(|e| CoreError::sync(format!("push failed: {}", e)))?;
71
72 // Mark pushed entries in a single transaction
73 let mut tx = pool.begin().await.map_err(CoreError::database)?;
74 for id in &pushed_ids {
75 sqlx::query("UPDATE sync_changelog SET pushed = 1 WHERE id = ?")
76 .bind(id)
77 .execute(&mut *tx)
78 .await
79 .map_err(CoreError::database)?;
80 }
81 tx.commit().await.map_err(CoreError::database)?;
82
83 total_pushed += count;
84
85 if is_last_batch {
86 break;
87 }
88 }
89
90 if total_pushed > 0 {
91 debug!("Pushed {} changes", total_pushed);
92 }
93 Ok(total_pushed)
94 }
95