| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
|
| 9 |
|
| 10 |
|
| 11 |
|
| 12 |
|
| 13 |
|
| 14 |
|
| 15 |
|
| 16 |
|
| 17 |
|
| 18 |
|
| 19 |
|
| 20 |
|
| 21 |
|
| 22 |
mod apply; |
| 23 |
pub(crate) mod blob_sync; |
| 24 |
mod pull; |
| 25 |
mod push; |
| 26 |
mod state; |
| 27 |
|
| 28 |
#[cfg(test)] |
| 29 |
#[path = "tests.rs"] |
| 30 |
mod tests; |
| 31 |
|
| 32 |
use std::path::Path; |
| 33 |
|
| 34 |
use chrono::Utc; |
| 35 |
use goingson_core::CoreError; |
| 36 |
use serde::{Deserialize, Serialize}; |
| 37 |
use sqlx::SqlitePool; |
| 38 |
use synckit_client::SyncKitClient; |
| 39 |
use tracing::{debug, info, warn}; |
| 40 |
|
| 41 |
|
| 42 |
pub use self::state::{get_sync_state, get_sync_states_batch, set_sync_state, ensure_device_registered}; |
| 43 |
pub use self::push::push_changes; |
| 44 |
pub use self::pull::pull_changes; |
| 45 |
|
| 46 |
|
| 47 |
/// Upper bound on the size of a push batch.
///
/// NOTE(review): not referenced in this part of the file; presumably the maximum
/// number of changelog rows the `push` module sends per request — confirm there.
pub(crate) const PUSH_BATCH_LIMIT: i64 = 500;
| 48 |
|
| 49 |
|
| 50 |
/// Column names used when syncing email accounts.
///
/// NOTE(review): presumably the subset of `email_accounts` columns that is
/// allowed to leave the device (no secret-bearing column appears in the list) —
/// confirm against the table schema and the code that consumes this constant.
pub(crate) const EMAIL_ACCOUNT_SYNC_COLS: &[&str] = &[
    "id",
    "user_id",
    "account_name",
    "email_address",
    "imap_server",
    "imap_port",
    "smtp_server",
    "smtp_port",
    "username",
    "use_tls",
    "created_at",
    "archive_folder_name",
    "auth_type",
    "jmap_session_url",
    "jmap_account_id",
    "sync_interval_minutes",
    "email_signature",
];
| 57 |
|
| 58 |
|
| 59 |
/// Synced table names in the order upserts are processed.
///
/// `create_initial_snapshot` walks this list to seed the changelog.
///
/// NOTE(review): the ordering looks like "referenced tables before the tables
/// that reference them" (e.g. `projects` before `tasks`, `contacts` before
/// `contact_emails`) so foreign keys resolve — confirm against the schema.
pub(crate) const UPSERT_ORDER: &[&str] = &[
    "projects",
    "contacts",
    "email_accounts",
    "sync_accounts",
    "milestones",
    "tasks",
    "time_sessions",
    "attachments",
    "events",
    "annotations",
    "subtasks",
    "contact_emails",
    "contact_phones",
    "contact_social_handles",
    "contact_custom_fields",
    "daily_notes",
];
| 77 |
|
| 78 |
|
| 79 |
/// Synced table names in the order deletes are processed.
///
/// The list is exactly `UPSERT_ORDER` reversed.
///
/// NOTE(review): presumably so that rows in referencing tables are removed
/// before the rows they point at — confirm against the schema.
pub(crate) const DELETE_ORDER: &[&str] = &[
    "daily_notes",
    "contact_custom_fields",
    "contact_social_handles",
    "contact_phones",
    "contact_emails",
    "subtasks",
    "annotations",
    "events",
    "attachments",
    "time_sessions",
    "tasks",
    "milestones",
    "sync_accounts",
    "email_accounts",
    "contacts",
    "projects",
];
| 97 |
|
| 98 |
|
| 99 |
#[derive(Debug, Serialize, Deserialize)] |
| 100 |
#[serde(rename_all = "camelCase")] |
| 101 |
pub struct SyncResult { |
| 102 |
pub pushed: i64, |
| 103 |
pub pulled: i64, |
| 104 |
} |
| 105 |
|
| 106 |
|
| 107 |
|
| 108 |
pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result<SyncResult, CoreError> { |
| 109 |
perform_sync_with_blobs(pool, client, None).await |
| 110 |
} |
| 111 |
|
| 112 |
pub async fn perform_sync_with_blobs( |
| 113 |
pool: &SqlitePool, |
| 114 |
client: &SyncKitClient, |
| 115 |
data_dir: Option<&Path>, |
| 116 |
) -> Result<SyncResult, CoreError> { |
| 117 |
|
| 118 |
|
| 119 |
set_sync_state(pool, "applying_remote", "0").await?; |
| 120 |
|
| 121 |
let device_id = ensure_device_registered(pool, client).await?; |
| 122 |
|
| 123 |
|
| 124 |
let pushed = push_changes(pool, client, device_id).await?; |
| 125 |
let pulled = pull_changes(pool, client, device_id).await?; |
| 126 |
|
| 127 |
|
| 128 |
if let Some(dir) = data_dir { |
| 129 |
if let Err(e) = blob_sync::upload_pending_blobs(pool, dir, client).await { |
| 130 |
warn!("Blob upload failed (non-fatal): {}", e); |
| 131 |
} |
| 132 |
if let Err(e) = blob_sync::download_missing_blobs(pool, dir, client).await { |
| 133 |
warn!("Blob download failed (non-fatal): {}", e); |
| 134 |
} |
| 135 |
} |
| 136 |
|
| 137 |
|
| 138 |
let now = Utc::now().to_rfc3339(); |
| 139 |
set_sync_state(pool, "last_sync_at", &now).await?; |
| 140 |
|
| 141 |
Ok(SyncResult { pushed, pulled }) |
| 142 |
} |
| 143 |
|
| 144 |
|
| 145 |
|
| 146 |
|
| 147 |
pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, CoreError> { |
| 148 |
let tables_and_cols: Vec<(&str, &[&str])> = UPSERT_ORDER |
| 149 |
.iter() |
| 150 |
.filter_map(|t| apply::table_columns(t).map(|c| (*t, c))) |
| 151 |
.collect(); |
| 152 |
|
| 153 |
let mut total: i64 = 0; |
| 154 |
|
| 155 |
for (table, columns) in &tables_and_cols { |
| 156 |
let col_list = columns |
| 157 |
.iter() |
| 158 |
.map(|c| format!("'{}', {}", c, c)) |
| 159 |
.collect::<Vec<_>>() |
| 160 |
.join(", "); |
| 161 |
|
| 162 |
let sql = format!( |
| 163 |
"INSERT INTO sync_changelog (table_name, op, row_id, data) \ |
| 164 |
SELECT '{table}', 'INSERT', id, json_object({col_list}) FROM {table} \ |
| 165 |
WHERE NOT EXISTS (SELECT 1 FROM sync_changelog sc WHERE sc.table_name = '{table}' AND sc.row_id = {table}.id)", |
| 166 |
); |
| 167 |
|
| 168 |
let result = sqlx::query(&sql) |
| 169 |
.execute(pool) |
| 170 |
.await |
| 171 |
.map_err(CoreError::database)?; |
| 172 |
|
| 173 |
total += result.rows_affected() as i64; |
| 174 |
} |
| 175 |
|
| 176 |
set_sync_state(pool, "initial_snapshot_done", "1").await?; |
| 177 |
info!("Initial sync snapshot created: {} rows", total); |
| 178 |
Ok(total) |
| 179 |
} |
| 180 |
|
| 181 |
|
| 182 |
|
| 183 |
|
| 184 |
pub async fn cleanup_changelog(pool: &SqlitePool) -> Result<i64, CoreError> { |
| 185 |
let result = sqlx::query( |
| 186 |
"DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')" |
| 187 |
) |
| 188 |
.execute(pool) |
| 189 |
.await |
| 190 |
.map_err(CoreError::database)?; |
| 191 |
|
| 192 |
let deleted = result.rows_affected() as i64; |
| 193 |
if deleted > 0 { |
| 194 |
debug!("Cleaned up {} old changelog entries", deleted); |
| 195 |
} |
| 196 |
Ok(deleted) |
| 197 |
} |
| 198 |
|
| 199 |
|
| 200 |
pub async fn count_pending_changes(pool: &SqlitePool) -> Result<i64, CoreError> { |
| 201 |
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") |
| 202 |
.fetch_one(pool) |
| 203 |
.await |
| 204 |
.map_err(CoreError::database)?; |
| 205 |
Ok(row.0) |
| 206 |
} |
| 207 |
|