Skip to main content

max / goingson

6.6 KB · 207 lines History Blame Raw
1 //! Core sync engine: push local changes, pull remote changes, apply to DB.
2 //!
3 //! # SQL Safety: `format!()` for table and column names
4 //!
5 //! Several functions in this module use `format!()` to interpolate table and column
6 //! names into SQL strings (e.g., `apply_upsert`, `apply_delete`, `create_initial_snapshot`).
7 //! This is safe because:
8 //!
9 //! 1. **Table names come from hardcoded constants** (`UPSERT_ORDER`, `DELETE_ORDER`) -- never
10 //! from user input or remote data.
11 //! 2. **Column names come from `table_columns()`**, a compile-time whitelist of `&'static str`
12 //! literals -- also never from user input.
13 //! 3. **All user-supplied values** (row IDs, field data) are passed through `sqlx::query().bind()`,
14 //! which parameterizes them safely.
15 //! 4. **Unknown table names are rejected** before any SQL is constructed: both `apply_upsert` and
16 //! `apply_delete` return an error if `table_columns()` returns `None`.
17 //!
18 //! The `format!()` pattern is used here instead of a procedural macro because the table/column
19 //! sets are dynamic per-row (determined by the sync changelog), but the values themselves are
20 //! drawn exclusively from the static whitelist above.
21
22 mod apply;
23 pub(crate) mod blob_sync;
24 mod pull;
25 mod push;
26 mod state;
27
28 #[cfg(test)]
29 #[path = "tests.rs"]
30 mod tests;
31
32 use std::path::Path;
33
34 use chrono::Utc;
35 use goingson_core::CoreError;
36 use serde::{Deserialize, Serialize};
37 use sqlx::SqlitePool;
38 use synckit_client::SyncKitClient;
39 use tracing::{debug, info, warn};
40
41 // Re-export public API so `crate::sync_service::*` continues to work.
42 pub use self::state::{get_sync_state, get_sync_states_batch, set_sync_state, ensure_device_registered};
43 pub use self::push::push_changes;
44 pub use self::pull::pull_changes;
45
46 /// Maximum changes to push in a single batch.
47 pub(crate) const PUSH_BATCH_LIMIT: i64 = 500;
48
49 /// Email account columns that sync (config only -- credentials stay per-device).
50 pub(crate) const EMAIL_ACCOUNT_SYNC_COLS: &[&str] = &[
51 "id", "user_id", "account_name", "email_address",
52 "imap_server", "imap_port", "smtp_server", "smtp_port",
53 "username", "use_tls", "created_at", "archive_folder_name",
54 "auth_type", "jmap_session_url", "jmap_account_id", "sync_interval_minutes",
55 "email_signature",
56 ];
57
58 /// Tables in FK-safe order for upserts (parents first).
59 pub(crate) const UPSERT_ORDER: &[&str] = &[
60 "projects",
61 "contacts",
62 "email_accounts",
63 "sync_accounts",
64 "milestones",
65 "tasks",
66 "time_sessions",
67 "attachments",
68 "events",
69 "annotations",
70 "subtasks",
71 "contact_emails",
72 "contact_phones",
73 "contact_social_handles",
74 "contact_custom_fields",
75 "daily_notes",
76 ];
77
78 /// Tables in reverse FK-safe order for deletes (children first).
79 pub(crate) const DELETE_ORDER: &[&str] = &[
80 "daily_notes",
81 "contact_custom_fields",
82 "contact_social_handles",
83 "contact_phones",
84 "contact_emails",
85 "subtasks",
86 "annotations",
87 "events",
88 "attachments",
89 "time_sessions",
90 "tasks",
91 "milestones",
92 "sync_accounts",
93 "email_accounts",
94 "contacts",
95 "projects",
96 ];
97
98 /// Result of a sync operation.
99 #[derive(Debug, Serialize, Deserialize)]
100 #[serde(rename_all = "camelCase")]
101 pub struct SyncResult {
102 pub pushed: i64,
103 pub pulled: i64,
104 }
105
106 // -- High-level sync --
107
108 pub async fn perform_sync(pool: &SqlitePool, client: &SyncKitClient) -> Result<SyncResult, CoreError> {
109 perform_sync_with_blobs(pool, client, None).await
110 }
111
112 pub async fn perform_sync_with_blobs(
113 pool: &SqlitePool,
114 client: &SyncKitClient,
115 data_dir: Option<&Path>,
116 ) -> Result<SyncResult, CoreError> {
117 // Clear applying_remote flag in case a previous sync crashed mid-apply.
118 // If the flag is stuck at "1", all local changes silently skip the changelog.
119 set_sync_state(pool, "applying_remote", "0").await?;
120
121 let device_id = ensure_device_registered(pool, client).await?;
122
123 // Push first, then pull
124 let pushed = push_changes(pool, client, device_id).await?;
125 let pulled = pull_changes(pool, client, device_id).await?;
126
127 // Sync blobs after metadata (upload local, download missing)
128 if let Some(dir) = data_dir {
129 if let Err(e) = blob_sync::upload_pending_blobs(pool, dir, client).await {
130 warn!("Blob upload failed (non-fatal): {}", e);
131 }
132 if let Err(e) = blob_sync::download_missing_blobs(pool, dir, client).await {
133 warn!("Blob download failed (non-fatal): {}", e);
134 }
135 }
136
137 // Update last sync timestamp
138 let now = Utc::now().to_rfc3339();
139 set_sync_state(pool, "last_sync_at", &now).await?;
140
141 Ok(SyncResult { pushed, pulled })
142 }
143
144 // -- Initial snapshot --
145
146 /// One-time: snapshot all existing rows into the changelog for the first push.
147 pub async fn create_initial_snapshot(pool: &SqlitePool) -> Result<i64, CoreError> {
148 let tables_and_cols: Vec<(&str, &[&str])> = UPSERT_ORDER
149 .iter()
150 .filter_map(|t| apply::table_columns(t).map(|c| (*t, c)))
151 .collect();
152
153 let mut total: i64 = 0;
154
155 for (table, columns) in &tables_and_cols {
156 let col_list = columns
157 .iter()
158 .map(|c| format!("'{}', {}", c, c))
159 .collect::<Vec<_>>()
160 .join(", ");
161
162 let sql = format!(
163 "INSERT INTO sync_changelog (table_name, op, row_id, data) \
164 SELECT '{table}', 'INSERT', id, json_object({col_list}) FROM {table} \
165 WHERE NOT EXISTS (SELECT 1 FROM sync_changelog sc WHERE sc.table_name = '{table}' AND sc.row_id = {table}.id)",
166 );
167
168 let result = sqlx::query(&sql)
169 .execute(pool)
170 .await
171 .map_err(CoreError::database)?;
172
173 total += result.rows_affected() as i64;
174 }
175
176 set_sync_state(pool, "initial_snapshot_done", "1").await?;
177 info!("Initial sync snapshot created: {} rows", total);
178 Ok(total)
179 }
180
181 // -- Changelog cleanup --
182
183 /// Prune pushed entries older than 7 days.
184 pub async fn cleanup_changelog(pool: &SqlitePool) -> Result<i64, CoreError> {
185 let result = sqlx::query(
186 "DELETE FROM sync_changelog WHERE pushed = 1 AND timestamp < datetime('now', '-7 days')"
187 )
188 .execute(pool)
189 .await
190 .map_err(CoreError::database)?;
191
192 let deleted = result.rows_affected() as i64;
193 if deleted > 0 {
194 debug!("Cleaned up {} old changelog entries", deleted);
195 }
196 Ok(deleted)
197 }
198
199 /// Count unpushed changes.
200 pub async fn count_pending_changes(pool: &SqlitePool) -> Result<i64, CoreError> {
201 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE pushed = 0")
202 .fetch_one(pool)
203 .await
204 .map_err(CoreError::database)?;
205 Ok(row.0)
206 }
207