//! Core sync engine: push local changes, pull remote changes, apply to DB. //! //! # SQL Safety: `format!()` for table and column names //! //! Several functions in this module use `format!()` to interpolate table and column //! names into SQL strings (e.g., `apply_upsert`, `apply_delete`, `create_initial_snapshot`). //! This is safe because: //! //! 1. **Table names come from hardcoded constants** (`UPSERT_ORDER`, `DELETE_ORDER`) -- never //! from user input or remote data. //! 2. **Column names come from `table_columns()`**, a compile-time whitelist of `&'static str` //! literals -- also never from user input. //! 3. **All user-supplied values** (row IDs, field data) are passed through `sqlx::query().bind()`, //! which parameterizes them safely. //! 4. **Unknown table names are rejected** before any SQL is constructed: both `apply_upsert` and //! `apply_delete` return an error if `table_columns()` returns `None`. //! //! The `format!()` pattern is used here instead of a procedural macro because the table/column //! sets are dynamic per-row (determined by the sync changelog), but the values themselves are //! drawn exclusively from the static whitelist above. mod apply; pub(crate) mod blob_sync; mod pull; mod push; mod state; #[cfg(test)] #[path = "tests.rs"] mod tests; use std::path::Path; use chrono::Utc; use goingson_core::CoreError; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; use synckit_client::SyncKitClient; use tracing::{debug, info, warn}; // Re-export public API so `crate::sync_service::*` continues to work. pub use self::state::{get_sync_state, get_sync_states_batch, set_sync_state, ensure_device_registered}; pub use self::push::push_changes; pub use self::pull::pull_changes; /// Maximum changes to push in a single batch. pub(crate) const PUSH_BATCH_LIMIT: i64 = 500; /// Email account columns that sync (config only -- credentials stay per-device). pub(crate) const EMAIL_ACCOUNT_SYNC_COLS: &[&str] = &[ "id", "user_id", "account_name", "email_address", "imap_server", "imap_port", "smtp_server", "smtp_port", "username", "use_tls", "created_at", "archive_folder_name", "auth_type", "jmap_session_url", "jmap_account_id", "sync_interval_minutes", "email_signature", ]; /// Tables in FK-safe order for upserts (parents first). pub(crate) const UPSERT_ORDER: &[&str] = &[ "projects", "contacts", "email_accounts", "sync_accounts", "milestones", "tasks", "time_sessions", "attachments", "events", "annotations", "subtasks", "contact_emails", "contact_phones", "contact_social_handles", "contact_custom_fields", "daily_notes", ]; /// Tables in reverse FK-safe order for deletes (children first). pub(crate) const DELETE_ORDER: &[&str] = &[ "daily_notes", "contact_custom_fields", "contact_social_handles", "contact_phones", "contact_emails", "subtasks", "annotations", "events", "attachments", "time_sessions", "tasks", "milestones", "sync_accounts", "email_accounts", "contacts", "projects", ]; /// Result of a sync operation. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SyncResult { pub pushed: i64, pub pulled: i64, } // -- High-level sync -- pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result { perform_sync_with_blobs(pool, client, None).await } pub async fn perform_sync_with_blobs( pool: &SqlitePool, client: &SyncKitClient, data_dir: Option<&Path>, ) -> Result { // Clear applying_remote flag in case a previous sync crashed mid-apply. // If the flag is stuck at "1", all local changes silently skip the changelog. set_sync_state(pool, "applying_remote", "0").await?; let device_id = ensure_device_registered(pool, client).await?; // Push first, then pull let pushed = push_changes(pool, client, device_id).await?; let pulled = pull_changes(pool, client, device_id).await?; // Sync blobs after metadata (upload local, download missing) if let Some(dir) = data_dir { if let Err(e) = blob_sync::upload_pending_blobs(pool, dir, client).await { warn!("Blob upload failed (non-fatal): {}", e); } if let Err(e) = blob_sync::download_missing_blobs(pool, dir, client).await { warn!("Blob download failed (non-fatal): {}", e); } } // Update last sync timestamp let now = Utc::now().to_rfc3339(); set_sync_state(pool, "last_sync_at", &now).await?; Ok(SyncResult { pushed, pulled }) } // -- Initial snapshot -- /// One-time: snapshot all existing rows into the changelog for the first push. pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result { let tables_and_cols: Vec<(&str, &[&str])> = UPSERT_ORDER .iter() .filter_map(|t| apply::table_columns(t).map(|c| (*t, c))) .collect(); let mut total: i64 = 0; for (table, columns) in &tables_and_cols { let col_list = columns .iter() .map(|c| format!("'{}', {}", c, c)) .collect::>() .join(", "); let sql = format!( "INSERT INTO sync_changelog (table_name, op, row_id, data) \ SELECT '{table}', 'INSERT', id, json_object({col_list}) FROM {table} \ WHERE NOT EXISTS (SELECT 1 FROM sync_changelog sc WHERE sc.table_name = '{table}' AND sc.row_id = {table}.id)", ); let result = sqlx::query(&sql) .execute(pool) .await .map_err(CoreError::database)?; total += result.rows_affected() as i64; } set_sync_state(pool, "initial_snapshot_done", "1").await?; info!("Initial sync snapshot created: {} rows", total); Ok(total) } // -- Changelog cleanup -- /// Prune pushed entries older than 7 days. pub async fn cleanup_changelog(pool: &SqlitePool) -> Result { let result = sqlx::query( "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')" ) .execute(pool) .await .map_err(CoreError::database)?; let deleted = result.rows_affected() as i64; if deleted > 0 { debug!("Cleaned up {} old changelog entries", deleted); } Ok(deleted) } /// Count unpushed changes. pub async fn count_pending_changes(pool: &SqlitePool) -> Result { let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0") .fetch_one(pool) .await .map_err(CoreError::database)?; Ok(row.0) }