| 1 |
|
| 2 |
|
| 3 |
use chrono::Utc; |
| 4 |
use goingson_core::CoreError; |
| 5 |
use sqlx::SqlitePool; |
| 6 |
use synckit_client::{ChangeEntry, ChangeOp, SyncKitClient}; |
| 7 |
use tracing::{debug, warn}; |
| 8 |
use uuid::Uuid; |
| 9 |
|
| 10 |
use super::PUSH_BATCH_LIMIT; |
| 11 |
|
| 12 |
pub async fn push_changes( |
| 13 |
pool: &SqlitePool, |
| 14 |
client: &SyncKitClient, |
| 15 |
device_id: Uuid, |
| 16 |
) -> Result<i64, CoreError> { |
| 17 |
let mut total_pushed: i64 = 0; |
| 18 |
|
| 19 |
loop { |
| 20 |
let rows: Vec<(i64, String, String, String, String, Option<String>)> = sqlx::query_as( |
| 21 |
"SELECT id, table_name, op, row_id, timestamp, data FROM sync_changelog WHERE pushed = 0 ORDER BY id ASC LIMIT ?" |
| 22 |
) |
| 23 |
.bind(PUSH_BATCH_LIMIT) |
| 24 |
.fetch_all(pool) |
| 25 |
.await |
| 26 |
.map_err(CoreError::database)?; |
| 27 |
|
| 28 |
if rows.is_empty() { |
| 29 |
break; |
| 30 |
} |
| 31 |
|
| 32 |
let row_count = rows.len() as i64; |
| 33 |
let mut pushed_ids: Vec<i64> = Vec::new(); |
| 34 |
let changes: Vec<ChangeEntry> = rows |
| 35 |
.into_iter() |
| 36 |
.filter_map(|(id, table, op, row_id, timestamp, data)| { |
| 37 |
|
| 38 |
pushed_ids.push(id); |
| 39 |
|
| 40 |
let ts = chrono::DateTime::parse_from_rfc3339(×tamp) |
| 41 |
.map(|dt| dt.with_timezone(&Utc)) |
| 42 |
.unwrap_or(chrono::DateTime::UNIX_EPOCH); |
| 43 |
|
| 44 |
let json_data = data.and_then(|d| serde_json::from_str(&d).ok()); |
| 45 |
|
| 46 |
let change_op = match ChangeOp::from_str_opt(&op) { |
| 47 |
Some(o) => o, |
| 48 |
None => { |
| 49 |
warn!("Skipping changelog entry with unknown op: {}", op); |
| 50 |
return None; |
| 51 |
} |
| 52 |
}; |
| 53 |
|
| 54 |
Some(ChangeEntry { |
| 55 |
table, |
| 56 |
op: change_op, |
| 57 |
row_id, |
| 58 |
timestamp: ts, |
| 59 |
data: json_data, |
| 60 |
}) |
| 61 |
}) |
| 62 |
.collect(); |
| 63 |
|
| 64 |
let count = changes.len() as i64; |
| 65 |
let is_last_batch = row_count < PUSH_BATCH_LIMIT; |
| 66 |
|
| 67 |
client |
| 68 |
.push(device_id, changes) |
| 69 |
.await |
| 70 |
.map_err(|e| CoreError::sync(format!("push failed: {}", e)))?; |
| 71 |
|
| 72 |
|
| 73 |
let mut tx = pool.begin().await.map_err(CoreError::database)?; |
| 74 |
for id in &pushed_ids { |
| 75 |
sqlx::query("UPDATE sync_changelog SET pushed = 1 WHERE id = ?") |
| 76 |
.bind(id) |
| 77 |
.execute(&mut *tx) |
| 78 |
.await |
| 79 |
.map_err(CoreError::database)?; |
| 80 |
} |
| 81 |
tx.commit().await.map_err(CoreError::database)?; |
| 82 |
|
| 83 |
total_pushed += count; |
| 84 |
|
| 85 |
if is_last_batch { |
| 86 |
break; |
| 87 |
} |
| 88 |
} |
| 89 |
|
| 90 |
if total_pushed > 0 { |
| 91 |
debug!("Pushed {} changes", total_pushed); |
| 92 |
} |
| 93 |
Ok(total_pushed) |
| 94 |
} |
| 95 |
|