Skip to main content

max / goingson

61.5 KB · 1609 lines History Blame Raw
1 use crate::sync_service::*;
2 use crate::sync_service::{apply, pull, UPSERT_ORDER, DELETE_ORDER};
3 use goingson_db_sqlite::{init_pool, run_migrations};
4 use serde_json::json;
5 use sqlx::SqlitePool;
6
7 /// Format the current UTC time as a SQL-compatible timestamp string.
8 fn now_sql() -> String {
9 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string()
10 }
11
12 /// Creates an in-memory test database with all migrations applied.
13 async fn setup_test_db() -> SqlitePool {
14 let pool = init_pool(Some(":memory:"))
15 .await
16 .expect("Failed to create in-memory pool");
17 run_migrations(&pool)
18 .await
19 .expect("Failed to run migrations");
20 pool
21 }
22
23 /// Creates a test user and returns their UUID string.
24 async fn create_test_user(pool: &SqlitePool) -> String {
25 let user_id = uuid::Uuid::new_v4().to_string();
26 let now = now_sql();
27 sqlx::query(
28 "INSERT INTO users (id, email, password_hash, display_name, created_at) VALUES (?, ?, ?, ?, ?)"
29 )
30 .bind(&user_id)
31 .bind(format!("test-{}@example.com", user_id))
32 .bind("test-password-hash")
33 .bind("Test User")
34 .bind(&now)
35 .execute(pool)
36 .await
37 .expect("Failed to create test user");
38 user_id
39 }
40
41 /// Helper: build a ChangeEntry for tests.
42 fn change(table: &str, op: synckit_client::ChangeOp, row_id: &str, data: Option<serde_json::Value>) -> synckit_client::ChangeEntry {
43 synckit_client::ChangeEntry {
44 table: table.to_string(),
45 op,
46 row_id: row_id.to_string(),
47 timestamp: chrono::Utc::now(),
48 data,
49 }
50 }
51
52 // -- FK ordering tests --
53
/// Checks that `UPSERT_ORDER` lists each parent table ahead of the tables
/// that reference it.
#[test]
fn upsert_order_has_parents_before_children() {
    // Index of a table inside UPSERT_ORDER; panics when the table is absent.
    let index_of = |name: &str| UPSERT_ORDER.iter().position(|t| *t == name).unwrap();

    let projects = index_of("projects");
    let tasks = index_of("tasks");
    let events = index_of("events");
    let milestones = index_of("milestones");

    // projects come ahead of tasks, events, and milestones
    assert!(projects < tasks);
    assert!(projects < events);
    assert!(projects < milestones);

    // contacts come ahead of every contact_* child table
    let contacts = index_of("contacts");
    assert!(contacts < index_of("contact_emails"));
    assert!(contacts < index_of("contact_phones"));
    assert!(contacts < index_of("contact_social_handles"));
    assert!(contacts < index_of("contact_custom_fields"));

    // tasks come ahead of annotations and subtasks
    assert!(tasks < index_of("annotations"));
    assert!(tasks < index_of("subtasks"));

    // milestones come ahead of tasks (tasks.milestone_id FK)
    assert!(milestones < tasks);
}
76
/// Checks that `DELETE_ORDER` lists each child table ahead of the parent
/// table it references (the mirror image of the upsert ordering).
#[test]
fn delete_order_has_children_before_parents() {
    // Index of a table inside DELETE_ORDER; panics when the table is absent.
    let index_of = |name: &str| DELETE_ORDER.iter().position(|t| *t == name).unwrap();

    let tasks = index_of("tasks");
    let events = index_of("events");
    let milestones = index_of("milestones");
    let projects = index_of("projects");

    // project children are removed before projects
    assert!(tasks < projects);
    assert!(events < projects);
    assert!(milestones < projects);

    // contact_* children are removed before contacts
    let contacts = index_of("contacts");
    assert!(index_of("contact_emails") < contacts);
    assert!(index_of("contact_phones") < contacts);
    assert!(index_of("contact_social_handles") < contacts);
    assert!(index_of("contact_custom_fields") < contacts);

    // task children are removed before tasks
    assert!(index_of("annotations") < tasks);
    assert!(index_of("subtasks") < tasks);

    // tasks are removed before milestones
    assert!(tasks < milestones);
}
96
/// Checks that `DELETE_ORDER` is exactly `UPSERT_ORDER` read back to front.
#[test]
fn upsert_and_delete_orders_are_exact_reverses() {
    let mut upsert_backwards: Vec<&str> = UPSERT_ORDER.iter().copied().collect();
    upsert_backwards.reverse();
    assert_eq!(upsert_backwards, DELETE_ORDER);
}
102
/// Checks that `apply::table_columns` knows every table in `UPSERT_ORDER`.
#[test]
fn all_upsert_tables_have_column_whitelists() {
    for table in UPSERT_ORDER {
        let whitelist = apply::table_columns(table);
        assert!(
            whitelist.is_some(),
            "table_columns missing for syncable table: {}",
            table
        );
    }
}
113
/// Checks that `apply::table_columns` yields `None` for a made-up table
/// name as well as for `users` and `emails`.
#[test]
fn unknown_table_returns_none() {
    let made_up = apply::table_columns("nonexistent");
    assert!(made_up.is_none());

    let users = apply::table_columns("users");
    assert!(users.is_none());

    let emails = apply::table_columns("emails");
    assert!(emails.is_none());
}
120
/// Checks that the first whitelisted column of every table in
/// `UPSERT_ORDER` is `id`.
#[test]
fn every_table_whitelist_starts_with_id() {
    for table in UPSERT_ORDER {
        let columns = apply::table_columns(table).unwrap();
        assert_eq!(
            columns[0], "id",
            "table {} column whitelist should start with 'id'",
            table
        );
    }
}
132
133 // -- sync_state helpers --
134
135 #[tokio::test]
136 async fn get_sync_state_returns_empty_for_missing_key() {
137 let pool = setup_test_db().await;
138 let val = get_sync_state(&pool, "nonexistent_key").await.unwrap();
139 assert_eq!(val, "");
140 }
141
142 #[tokio::test]
143 async fn set_and_get_sync_state() {
144 let pool = setup_test_db().await;
145 set_sync_state(&pool, "test_key", "test_value").await.unwrap();
146 let val = get_sync_state(&pool, "test_key").await.unwrap();
147 assert_eq!(val, "test_value");
148 }
149
150 #[tokio::test]
151 async fn set_sync_state_overwrites_existing() {
152 let pool = setup_test_db().await;
153 set_sync_state(&pool, "key", "first").await.unwrap();
154 set_sync_state(&pool, "key", "second").await.unwrap();
155 let val = get_sync_state(&pool, "key").await.unwrap();
156 assert_eq!(val, "second");
157 }
158
159 #[tokio::test]
160 async fn migration_seeds_default_sync_state() {
161 let pool = setup_test_db().await;
162
163 assert_eq!(get_sync_state(&pool, "device_id").await.unwrap(), "");
164 assert_eq!(get_sync_state(&pool, "pull_cursor").await.unwrap(), "0");
165 assert_eq!(get_sync_state(&pool, "applying_remote").await.unwrap(), "0");
166 assert_eq!(get_sync_state(&pool, "initial_snapshot_done").await.unwrap(), "0");
167 }
168
169 // -- apply_upsert --
170
171 #[tokio::test]
172 async fn apply_upsert_inserts_project() {
173 let pool = setup_test_db().await;
174 let user_id = create_test_user(&pool).await;
175 let project_id = uuid::Uuid::new_v4().to_string();
176 let now = now_sql();
177
178 let data = json!({
179 "id": project_id,
180 "name": "Synced Project",
181 "description": "From remote",
182 "project_type": "SideProject",
183 "status": "Active",
184 "created_at": now,
185 "user_id": user_id,
186 });
187
188 let mut conn = pool.acquire().await.unwrap();
189 apply::apply_upsert(&mut conn, "projects", &project_id, &data).await.unwrap();
190
191 let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?")
192 .bind(&project_id)
193 .fetch_one(&pool)
194 .await
195 .unwrap();
196 assert_eq!(row.0, "Synced Project");
197 }
198
199 #[tokio::test]
200 async fn apply_upsert_replaces_existing_row() {
201 let pool = setup_test_db().await;
202 let user_id = create_test_user(&pool).await;
203 let project_id = uuid::Uuid::new_v4().to_string();
204 let now = now_sql();
205
206 let data1 = json!({
207 "id": project_id,
208 "name": "Original",
209 "description": "",
210 "project_type": "Job",
211 "status": "Active",
212 "created_at": now,
213 "user_id": user_id,
214 });
215 let mut conn = pool.acquire().await.unwrap();
216 apply::apply_upsert(&mut conn, "projects", &project_id, &data1).await.unwrap();
217
218 let data2 = json!({
219 "id": project_id,
220 "name": "Updated",
221 "description": "changed",
222 "project_type": "Job",
223 "status": "OnHold",
224 "created_at": now,
225 "user_id": user_id,
226 });
227 apply::apply_upsert(&mut conn, "projects", &project_id, &data2).await.unwrap();
228
229 let row: (String, String) =
230 sqlx::query_as("SELECT name, status FROM projects WHERE id = ?")
231 .bind(&project_id)
232 .fetch_one(&pool)
233 .await
234 .unwrap();
235 assert_eq!(row.0, "Updated");
236 assert_eq!(row.1, "OnHold");
237 }
238
239 #[tokio::test]
240 async fn apply_upsert_rejects_unknown_table() {
241 let pool = setup_test_db().await;
242 let data = json!({"id": "abc"});
243 let mut conn = pool.acquire().await.unwrap();
244 let result = apply::apply_upsert(&mut conn, "nonexistent", "abc", &data).await;
245 assert!(result.is_err());
246 let err_msg = result.unwrap_err().to_string();
247 assert!(err_msg.contains("unknown syncable table"));
248 }
249
250 #[tokio::test]
251 async fn apply_upsert_handles_null_fields() {
252 let pool = setup_test_db().await;
253 let user_id = create_test_user(&pool).await;
254 let project_id = uuid::Uuid::new_v4().to_string();
255 let now = now_sql();
256
257 let mut conn = pool.acquire().await.unwrap();
258
259 // Insert a project to be a FK parent for the task
260 let data_project = json!({
261 "id": project_id,
262 "name": "Parent",
263 "description": "",
264 "project_type": "Job",
265 "status": "Active",
266 "created_at": now,
267 "user_id": user_id,
268 });
269 apply::apply_upsert(&mut conn, "projects", &project_id, &data_project).await.unwrap();
270
271 let task_id = uuid::Uuid::new_v4().to_string();
272 let data = json!({
273 "id": task_id,
274 "project_id": project_id,
275 "description": "Null test",
276 "status": "Pending",
277 "priority": "Medium",
278 "due": null,
279 "tags": null,
280 "urgency": 50,
281 "recurrence": "None",
282 "created_at": now,
283 "user_id": user_id,
284 "recurrence_parent_id": null,
285 "source_email_id": null,
286 "snoozed_until": null,
287 "waiting_for_response": false,
288 "waiting_since": null,
289 "expected_response_date": null,
290 "scheduled_start": null,
291 "scheduled_duration": null,
292 "is_focus": false,
293 "focus_set_at": null,
294 "contact_id": null,
295 "milestone_id": null,
296 });
297
298 apply::apply_upsert(&mut conn, "tasks", &task_id, &data).await.unwrap();
299
300 let row: (String,) = sqlx::query_as("SELECT description FROM tasks WHERE id = ?")
301 .bind(&task_id)
302 .fetch_one(&pool)
303 .await
304 .unwrap();
305 assert_eq!(row.0, "Null test");
306 }
307
308 // -- apply_delete --
309
310 #[tokio::test]
311 async fn apply_delete_removes_row() {
312 let pool = setup_test_db().await;
313 let user_id = create_test_user(&pool).await;
314 let project_id = uuid::Uuid::new_v4().to_string();
315 let now = now_sql();
316
317 let mut conn = pool.acquire().await.unwrap();
318
319 let data = json!({
320 "id": project_id,
321 "name": "To Delete",
322 "description": "",
323 "project_type": "Other",
324 "status": "Active",
325 "created_at": now,
326 "user_id": user_id,
327 });
328 apply::apply_upsert(&mut conn, "projects", &project_id, &data).await.unwrap();
329
330 apply::apply_delete(&mut conn, "projects", &project_id).await.unwrap();
331
332 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?")
333 .bind(&project_id)
334 .fetch_one(&pool)
335 .await
336 .unwrap();
337 assert_eq!(count.0, 0);
338 }
339
340 #[tokio::test]
341 async fn apply_delete_rejects_unknown_table() {
342 let pool = setup_test_db().await;
343 let mut conn = pool.acquire().await.unwrap();
344 let result = apply::apply_delete(&mut conn, "not_a_table", "abc").await;
345 assert!(result.is_err());
346 let err_msg = result.unwrap_err().to_string();
347 assert!(err_msg.contains("unknown syncable table"));
348 }
349
350 #[tokio::test]
351 async fn apply_delete_is_idempotent() {
352 let pool = setup_test_db().await;
353 let mut conn = pool.acquire().await.unwrap();
354 // Deleting a non-existent row should succeed silently.
355 let result = apply::apply_delete(&mut conn, "projects", "nonexistent-id").await;
356 assert!(result.is_ok());
357 }
358
359 // -- apply_changes_inner ordering --
360
361 #[tokio::test]
362 async fn apply_changes_inner_processes_upserts_in_fk_order() {
363 let pool = setup_test_db().await;
364 let user_id = create_test_user(&pool).await;
365 let project_id = uuid::Uuid::new_v4().to_string();
366 let task_id = uuid::Uuid::new_v4().to_string();
367 let now = now_sql();
368
369 // Provide changes in WRONG order (child before parent) to prove re-ordering works.
370 let changes = vec![
371 change("tasks", synckit_client::ChangeOp::Insert, &task_id, Some(json!({
372 "id": task_id,
373 "project_id": project_id,
374 "description": "Child task",
375 "status": "Pending",
376 "priority": "Low",
377 "due": null,
378 "tags": null,
379 "urgency": 10,
380 "recurrence": "None",
381 "created_at": now,
382 "user_id": user_id,
383 "recurrence_parent_id": null,
384 "source_email_id": null,
385 "snoozed_until": null,
386 "waiting_for_response": false,
387 "waiting_since": null,
388 "expected_response_date": null,
389 "scheduled_start": null,
390 "scheduled_duration": null,
391 "is_focus": false,
392 "focus_set_at": null,
393 "contact_id": null,
394 "milestone_id": null,
395 }))),
396 change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({
397 "id": project_id,
398 "name": "Parent project",
399 "description": "",
400 "project_type": "Job",
401 "status": "Active",
402 "created_at": now,
403 "user_id": user_id,
404 }))),
405 ];
406
407 // Should succeed despite task coming before project in the input --
408 // apply_changes_inner iterates UPSERT_ORDER so projects is applied first.
409 let mut conn = pool.acquire().await.unwrap();
410 pull::apply_changes_inner(&mut conn, changes).await.unwrap();
411
412 let project: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?")
413 .bind(&project_id)
414 .fetch_one(&pool)
415 .await
416 .unwrap();
417 assert_eq!(project.0, "Parent project");
418
419 let task: (String,) = sqlx::query_as("SELECT description FROM tasks WHERE id = ?")
420 .bind(&task_id)
421 .fetch_one(&pool)
422 .await
423 .unwrap();
424 assert_eq!(task.0, "Child task");
425 }
426
427 #[tokio::test]
428 async fn apply_changes_inner_processes_deletes_in_child_first_order() {
429 let pool = setup_test_db().await;
430 let user_id = create_test_user(&pool).await;
431 let project_id = uuid::Uuid::new_v4().to_string();
432 let task_id = uuid::Uuid::new_v4().to_string();
433 let subtask_id = uuid::Uuid::new_v4().to_string();
434 let now = now_sql();
435
436 let mut conn = pool.acquire().await.unwrap();
437
438 // Insert parent -> child -> grandchild
439 apply::apply_upsert(&mut conn, "projects", &project_id, &json!({
440 "id": project_id, "name": "P", "description": "",
441 "project_type": "Job", "status": "Active",
442 "created_at": now, "user_id": user_id,
443 })).await.unwrap();
444
445 apply::apply_upsert(&mut conn, "tasks", &task_id, &json!({
446 "id": task_id, "project_id": project_id, "description": "T",
447 "status": "Pending", "priority": "Low", "due": null,
448 "tags": null, "urgency": 10, "recurrence": "None",
449 "created_at": now, "user_id": user_id,
450 "recurrence_parent_id": null, "source_email_id": null,
451 "snoozed_until": null, "waiting_for_response": false,
452 "waiting_since": null, "expected_response_date": null,
453 "scheduled_start": null, "scheduled_duration": null,
454 "is_focus": false, "focus_set_at": null,
455 "contact_id": null, "milestone_id": null,
456 })).await.unwrap();
457
458 apply::apply_upsert(&mut conn, "subtasks", &subtask_id, &json!({
459 "id": subtask_id, "task_id": task_id, "text": "Sub",
460 "is_completed": false, "position": 0,
461 "created_at": now, "linked_task_id": null,
462 })).await.unwrap();
463
464 // Delete in WRONG input order (parent first) -- apply_changes_inner
465 // should re-order to subtask -> task -> project.
466 let changes = vec![
467 change("projects", synckit_client::ChangeOp::Delete, &project_id, None),
468 change("tasks", synckit_client::ChangeOp::Delete, &task_id, None),
469 change("subtasks", synckit_client::ChangeOp::Delete, &subtask_id, None),
470 ];
471
472 pull::apply_changes_inner(&mut conn, changes).await.unwrap();
473
474 let count: (i64,) =
475 sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?")
476 .bind(&project_id)
477 .fetch_one(&pool)
478 .await
479 .unwrap();
480 assert_eq!(count.0, 0);
481
482 let count: (i64,) =
483 sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE id = ?")
484 .bind(&task_id)
485 .fetch_one(&pool)
486 .await
487 .unwrap();
488 assert_eq!(count.0, 0);
489 }
490
491 #[tokio::test]
492 async fn apply_changes_inner_handles_mixed_ops() {
493 let pool = setup_test_db().await;
494 let user_id = create_test_user(&pool).await;
495 let now = now_sql();
496
497 let mut conn = pool.acquire().await.unwrap();
498
499 // Create a project to delete later
500 let delete_id = uuid::Uuid::new_v4().to_string();
501 apply::apply_upsert(&mut conn, "projects", &delete_id, &json!({
502 "id": delete_id, "name": "Will Delete", "description": "",
503 "project_type": "Job", "status": "Active",
504 "created_at": now, "user_id": user_id,
505 })).await.unwrap();
506
507 // Mix upsert and delete in a single batch
508 let insert_id = uuid::Uuid::new_v4().to_string();
509 let changes = vec![
510 change("projects", synckit_client::ChangeOp::Delete, &delete_id, None),
511 change("projects", synckit_client::ChangeOp::Insert, &insert_id, Some(json!({
512 "id": insert_id, "name": "New One", "description": "",
513 "project_type": "Other", "status": "Active",
514 "created_at": now, "user_id": user_id,
515 }))),
516 ];
517
518 pull::apply_changes_inner(&mut conn, changes).await.unwrap();
519
520 // The old project was deleted
521 let count: (i64,) =
522 sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?")
523 .bind(&delete_id)
524 .fetch_one(&pool)
525 .await
526 .unwrap();
527 assert_eq!(count.0, 0);
528
529 // The new project was inserted
530 let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?")
531 .bind(&insert_id)
532 .fetch_one(&pool)
533 .await
534 .unwrap();
535 assert_eq!(row.0, "New One");
536 }
537
538 #[tokio::test]
539 async fn apply_changes_inner_skips_upsert_without_data() {
540 let pool = setup_test_db().await;
541
542 // INSERT with no data should be skipped (data is None)
543 let changes = vec![change("projects", synckit_client::ChangeOp::Insert, "abc", None)];
544 let mut conn = pool.acquire().await.unwrap();
545 let result = pull::apply_changes_inner(&mut conn, changes).await;
546 assert!(result.is_ok());
547
548 let count: (i64,) =
549 sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?")
550 .bind("abc")
551 .fetch_one(&pool)
552 .await
553 .unwrap();
554 assert_eq!(count.0, 0);
555 }
556
557 // -- Trigger suppression --
558
559 #[tokio::test]
560 async fn apply_remote_changes_suppresses_triggers() {
561 let pool = setup_test_db().await;
562 let user_id = create_test_user(&pool).await;
563 let project_id = uuid::Uuid::new_v4().to_string();
564 let now = now_sql();
565
566 // Clear any existing changelog entries
567 sqlx::query("DELETE FROM sync_changelog")
568 .execute(&pool)
569 .await
570 .unwrap();
571
572 let changes = vec![
573 change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({
574 "id": project_id, "name": "Remote Project", "description": "",
575 "project_type": "Job", "status": "Active",
576 "created_at": now, "user_id": user_id,
577 }))),
578 ];
579
580 pull::apply_remote_changes(&pool, changes).await.unwrap();
581
582 // The project should exist
583 let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?")
584 .bind(&project_id)
585 .fetch_one(&pool)
586 .await
587 .unwrap();
588 assert_eq!(row.0, "Remote Project");
589
590 // But the changelog should be empty -- triggers were suppressed
591 let count: (i64,) =
592 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
593 .fetch_one(&pool)
594 .await
595 .unwrap();
596 assert_eq!(count.0, 0, "trigger should not fire during remote apply");
597
598 // The applying_remote flag should be cleared
599 let flag = get_sync_state(&pool, "applying_remote").await.unwrap();
600 assert_eq!(flag, "0", "applying_remote should be reset after apply");
601 }
602
603 #[tokio::test]
604 async fn apply_remote_changes_clears_flag_after_success() {
605 let pool = setup_test_db().await;
606
607 // Verify the flag is set to "1" during apply and cleared to "0" after.
608 // We can test this by calling apply_remote_changes with an empty batch.
609 let changes = vec![];
610 pull::apply_remote_changes(&pool, changes).await.unwrap();
611
612 let flag = get_sync_state(&pool, "applying_remote").await.unwrap();
613 assert_eq!(flag, "0", "applying_remote flag should be cleared after apply");
614 }
615
616 #[tokio::test]
617 async fn apply_remote_changes_clears_flag_on_error() {
618 let pool = setup_test_db().await;
619
620 // Test: apply_upsert on unknown table returns BadRequest
621 let mut conn = pool.acquire().await.unwrap();
622 let result = apply::apply_upsert(&mut conn, "fake_table", "id", &json!({"id": "x"})).await;
623 assert!(result.is_err());
624 match result.unwrap_err() {
625 goingson_core::CoreError::BadRequest(msg) => assert!(msg.contains("unknown syncable table")),
626 other => panic!("Expected BadRequest, got: {:?}", other),
627 }
628 }
629
630 #[tokio::test]
631 async fn local_insert_fires_trigger_normally() {
632 let pool = setup_test_db().await;
633 let user_id = create_test_user(&pool).await;
634 let project_id = uuid::Uuid::new_v4().to_string();
635 let now = now_sql();
636
637 // Clear changelog
638 sqlx::query("DELETE FROM sync_changelog")
639 .execute(&pool)
640 .await
641 .unwrap();
642
643 // Insert directly (not via apply_remote_changes) -- trigger should fire
644 sqlx::query(
645 "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)"
646 )
647 .bind(&project_id)
648 .bind("Local Project")
649 .bind("")
650 .bind("Job")
651 .bind("Active")
652 .bind(&now)
653 .bind(&user_id)
654 .execute(&pool)
655 .await
656 .unwrap();
657
658 // Trigger should have created a changelog entry
659 let count: (i64,) =
660 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'projects' AND row_id = ?")
661 .bind(&project_id)
662 .fetch_one(&pool)
663 .await
664 .unwrap();
665 assert_eq!(count.0, 1, "local insert should fire sync trigger");
666 }
667
668 // -- Initial snapshot --
669
670 #[tokio::test]
671 async fn create_initial_snapshot_captures_all_rows() {
672 let pool = setup_test_db().await;
673 let user_id = create_test_user(&pool).await;
674 let now = now_sql();
675
676 // Suppress triggers during setup so only snapshot entries end up in changelog
677 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
678
679 // Insert 2 projects
680 for i in 1..=2 {
681 let pid = uuid::Uuid::new_v4().to_string();
682 sqlx::query(
683 "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)"
684 )
685 .bind(&pid)
686 .bind(format!("Project {}", i))
687 .bind("")
688 .bind("Job")
689 .bind("Active")
690 .bind(&now)
691 .bind(&user_id)
692 .execute(&pool)
693 .await
694 .unwrap();
695 }
696
697 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
698
699 // Clear any stray changelog entries
700 sqlx::query("DELETE FROM sync_changelog")
701 .execute(&pool)
702 .await
703 .unwrap();
704
705 let total = create_initial_snapshot(&pool).await.unwrap();
706 assert_eq!(total, 2);
707
708 // Verify changelog entries
709 let count: (i64,) =
710 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'projects'")
711 .fetch_one(&pool)
712 .await
713 .unwrap();
714 assert_eq!(count.0, 2);
715
716 // Verify snapshot flag was set
717 let flag = get_sync_state(&pool, "initial_snapshot_done").await.unwrap();
718 assert_eq!(flag, "1");
719 }
720
721 #[tokio::test]
722 async fn create_initial_snapshot_empty_db() {
723 let pool = setup_test_db().await;
724 let _user_id = create_test_user(&pool).await;
725
726 // No data rows -- snapshot should report 0
727 sqlx::query("DELETE FROM sync_changelog")
728 .execute(&pool)
729 .await
730 .unwrap();
731
732 let total = create_initial_snapshot(&pool).await.unwrap();
733 assert_eq!(total, 0);
734 }
735
736 // -- Changelog cleanup --
737
738 #[tokio::test]
739 async fn cleanup_changelog_removes_old_pushed_entries() {
740 let pool = setup_test_db().await;
741
742 // Insert a pushed entry with an old timestamp
743 sqlx::query(
744 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) VALUES (?, ?, ?, datetime('now', '-10 days'), 1)"
745 )
746 .bind("projects")
747 .bind("INSERT")
748 .bind("old-id")
749 .execute(&pool)
750 .await
751 .unwrap();
752
753 // Insert a recent pushed entry
754 sqlx::query(
755 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) VALUES (?, ?, ?, datetime('now'), 1)"
756 )
757 .bind("projects")
758 .bind("INSERT")
759 .bind("recent-id")
760 .execute(&pool)
761 .await
762 .unwrap();
763
764 // Insert an unpushed entry (should never be cleaned)
765 sqlx::query(
766 "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) VALUES (?, ?, ?, datetime('now', '-10 days'), 0)"
767 )
768 .bind("projects")
769 .bind("INSERT")
770 .bind("unpushed-id")
771 .execute(&pool)
772 .await
773 .unwrap();
774
775 let deleted = cleanup_changelog(&pool).await.unwrap();
776 assert_eq!(deleted, 1, "should only delete old pushed entries");
777
778 // Verify the remaining entries
779 let count: (i64,) =
780 sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
781 .fetch_one(&pool)
782 .await
783 .unwrap();
784 assert_eq!(count.0, 2, "recent pushed + unpushed should remain");
785 }
786
787 // -- Count pending changes --
788
789 #[tokio::test]
790 async fn count_pending_changes_counts_unpushed() {
791 let pool = setup_test_db().await;
792
793 // Clear any existing entries
794 sqlx::query("DELETE FROM sync_changelog")
795 .execute(&pool)
796 .await
797 .unwrap();
798
799 // Insert 3 unpushed
800 for i in 0..3 {
801 sqlx::query(
802 "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES (?, ?, ?, 0)"
803 )
804 .bind("projects")
805 .bind("INSERT")
806 .bind(format!("id-{}", i))
807 .execute(&pool)
808 .await
809 .unwrap();
810 }
811
812 // Insert 2 pushed
813 for i in 0..2 {
814 sqlx::query(
815 "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES (?, ?, ?, 1)"
816 )
817 .bind("projects")
818 .bind("INSERT")
819 .bind(format!("pushed-{}", i))
820 .execute(&pool)
821 .await
822 .unwrap();
823 }
824
825 let count = count_pending_changes(&pool).await.unwrap();
826 assert_eq!(count, 3);
827 }
828
829 // -- Contact child table upserts --
830
831 #[tokio::test]
832 async fn apply_upsert_contact_with_children() {
833 let pool = setup_test_db().await;
834 let user_id = create_test_user(&pool).await;
835 let contact_id = uuid::Uuid::new_v4().to_string();
836 let email_id = uuid::Uuid::new_v4().to_string();
837 let now = now_sql();
838
839 let mut conn = pool.acquire().await.unwrap();
840
841 // Insert contact (parent)
842 apply::apply_upsert(&mut conn, "contacts", &contact_id, &json!({
843 "id": contact_id, "user_id": user_id,
844 "display_name": "Test Contact", "nickname": null,
845 "company": null, "title": null, "notes": null,
846 "tags": null, "birthday": null, "timezone": null,
847 "created_at": now, "updated_at": now,
848 })).await.unwrap();
849
850 // Insert contact_email (child)
851 apply::apply_upsert(&mut conn, "contact_emails", &email_id, &json!({
852 "id": email_id, "contact_id": contact_id,
853 "address": "test@example.com", "label": "work",
854 "is_primary": true,
855 })).await.unwrap();
856
857 let row: (String,) = sqlx::query_as("SELECT address FROM contact_emails WHERE id = ?")
858 .bind(&email_id)
859 .fetch_one(&pool)
860 .await
861 .unwrap();
862 assert_eq!(row.0, "test@example.com");
863 }
864
865 // -- Boolean handling --
866
867 #[tokio::test]
868 async fn apply_upsert_handles_booleans_as_integers() {
869 let pool = setup_test_db().await;
870 let user_id = create_test_user(&pool).await;
871 let contact_id = uuid::Uuid::new_v4().to_string();
872 let email_id = uuid::Uuid::new_v4().to_string();
873 let now = now_sql();
874
875 let mut conn = pool.acquire().await.unwrap();
876
877 apply::apply_upsert(&mut conn, "contacts", &contact_id, &json!({
878 "id": contact_id, "user_id": user_id,
879 "display_name": "Bool Test", "nickname": null,
880 "company": null, "title": null, "notes": null,
881 "tags": null, "birthday": null, "timezone": null,
882 "created_at": now, "updated_at": now,
883 })).await.unwrap();
884
885 // is_primary as a JSON boolean -- should be stored as integer
886 apply::apply_upsert(&mut conn, "contact_emails", &email_id, &json!({
887 "id": email_id, "contact_id": contact_id,
888 "address": "bool@test.com", "label": "home",
889 "is_primary": true,
890 })).await.unwrap();
891
892 let row: (i32,) = sqlx::query_as("SELECT is_primary FROM contact_emails WHERE id = ?")
893 .bind(&email_id)
894 .fetch_one(&pool)
895 .await
896 .unwrap();
897 assert_eq!(row.0, 1);
898 }
899
900 // -- email_accounts sync triggers --
901
902 #[tokio::test]
903 async fn email_account_insert_fires_trigger() {
904 let pool = setup_test_db().await;
905 let user_id = create_test_user(&pool).await;
906 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
907
908 let account_id = uuid::Uuid::new_v4().to_string();
909 sqlx::query(
910 "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
911 imap_server, imap_port, smtp_server, smtp_port, username, password, \
912 use_tls, created_at) \
913 VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
914 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))"
915 )
916 .bind(&account_id)
917 .bind(&user_id)
918 .execute(&pool)
919 .await
920 .unwrap();
921
922 let count: (i64,) = sqlx::query_as(
923 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'email_accounts' AND op = 'INSERT'"
924 )
925 .fetch_one(&pool)
926 .await
927 .unwrap();
928 assert_eq!(count.0, 1);
929
930 let data: (String,) = sqlx::query_as(
931 "SELECT data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?"
932 )
933 .bind(&account_id)
934 .fetch_one(&pool)
935 .await
936 .unwrap();
937 let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap();
938 let obj = parsed.as_object().unwrap();
939 assert_eq!(obj.len(), 17, "trigger data should have 17 keys (config only)");
940 assert!(!obj.contains_key("password"), "password should not be in trigger data");
941 assert!(!obj.contains_key("oauth2_access_token"), "oauth2_access_token should not be in trigger data");
942 assert!(!obj.contains_key("oauth2_refresh_token"), "oauth2_refresh_token should not be in trigger data");
943 assert!(!obj.contains_key("oauth2_token_expires_at"), "oauth2_token_expires_at should not be in trigger data");
944 assert!(!obj.contains_key("last_sync_at"), "last_sync_at should not be in trigger data");
945 assert_eq!(parsed["account_name"], "Work");
946 }
947
948 #[tokio::test]
949 async fn email_account_update_fires_trigger() {
950 let pool = setup_test_db().await;
951 let user_id = create_test_user(&pool).await;
952
953 // Insert with triggers suppressed
954 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
955 let account_id = uuid::Uuid::new_v4().to_string();
956 sqlx::query(
957 "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
958 imap_server, imap_port, smtp_server, smtp_port, username, password, \
959 use_tls, created_at) \
960 VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
961 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))"
962 )
963 .bind(&account_id)
964 .bind(&user_id)
965 .execute(&pool)
966 .await
967 .unwrap();
968 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
969 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
970
971 // Update account_name
972 sqlx::query("UPDATE email_accounts SET account_name = 'Personal' WHERE id = ?")
973 .bind(&account_id)
974 .execute(&pool)
975 .await
976 .unwrap();
977
978 let row: (String, String) = sqlx::query_as(
979 "SELECT op, data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?"
980 )
981 .bind(&account_id)
982 .fetch_one(&pool)
983 .await
984 .unwrap();
985 assert_eq!(row.0, "UPDATE");
986
987 let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap();
988 assert_eq!(parsed["account_name"], "Personal");
989 assert!(!parsed.as_object().unwrap().contains_key("password"), "update trigger should not include password");
990 }
991
992 #[tokio::test]
993 async fn email_account_delete_fires_trigger() {
994 let pool = setup_test_db().await;
995 let user_id = create_test_user(&pool).await;
996
997 // Insert with triggers suppressed
998 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
999 let account_id = uuid::Uuid::new_v4().to_string();
1000 sqlx::query(
1001 "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
1002 imap_server, imap_port, smtp_server, smtp_port, username, password, \
1003 use_tls, created_at) \
1004 VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
1005 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))"
1006 )
1007 .bind(&account_id)
1008 .bind(&user_id)
1009 .execute(&pool)
1010 .await
1011 .unwrap();
1012 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
1013 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1014
1015 // Delete the account
1016 sqlx::query("DELETE FROM email_accounts WHERE id = ?")
1017 .bind(&account_id)
1018 .execute(&pool)
1019 .await
1020 .unwrap();
1021
1022 let row: (String,) = sqlx::query_as(
1023 "SELECT op FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?"
1024 )
1025 .bind(&account_id)
1026 .fetch_one(&pool)
1027 .await
1028 .unwrap();
1029 assert_eq!(row.0, "DELETE");
1030 }
1031
/// The `email_accounts` column whitelist has exactly 17 entries and contains
/// none of the credential / local-state columns.
#[test]
fn email_account_table_columns_has_17_entries() {
    let synced = apply::table_columns("email_accounts").unwrap();
    assert_eq!(synced.len(), 17, "email_accounts should sync 17 config columns");

    // Columns that must stay local; checked in the same order as before.
    let local_only: [&'static str; 5] = [
        "password",
        "oauth2_access_token",
        "oauth2_refresh_token",
        "oauth2_token_expires_at",
        "last_sync_at",
    ];
    for column in local_only {
        assert!(!synced.contains(&column), "{} should not be in synced columns", column);
    }
}
1042
1043 // -- email_accounts credential preservation --
1044
1045 #[tokio::test]
1046 async fn apply_email_account_upsert_creates_with_empty_password() {
1047 let pool = setup_test_db().await;
1048 let user_id = create_test_user(&pool).await;
1049 let account_id = uuid::Uuid::new_v4().to_string();
1050 let now = now_sql();
1051
1052 let data = json!({
1053 "id": account_id,
1054 "user_id": user_id,
1055 "account_name": "Remote Work",
1056 "email_address": "work@example.com",
1057 "imap_server": "imap.example.com",
1058 "imap_port": 993,
1059 "smtp_server": "smtp.example.com",
1060 "smtp_port": 587,
1061 "username": "user",
1062 "use_tls": true,
1063 "created_at": now,
1064 "archive_folder_name": null,
1065 "auth_type": "password",
1066 "jmap_session_url": null,
1067 "jmap_account_id": null,
1068 "sync_interval_minutes": 5,
1069 });
1070
1071 let mut conn = pool.acquire().await.unwrap();
1072 apply::apply_upsert(&mut conn, "email_accounts", &account_id, &data).await.unwrap();
1073
1074 let row: (String, String) = sqlx::query_as(
1075 "SELECT account_name, password FROM email_accounts WHERE id = ?"
1076 )
1077 .bind(&account_id)
1078 .fetch_one(&pool)
1079 .await
1080 .unwrap();
1081 assert_eq!(row.0, "Remote Work");
1082 assert_eq!(row.1, "", "new remote account should have empty password");
1083 }
1084
1085 #[tokio::test]
1086 async fn apply_email_account_upsert_preserves_local_credentials() {
1087 let pool = setup_test_db().await;
1088 let user_id = create_test_user(&pool).await;
1089 let account_id = uuid::Uuid::new_v4().to_string();
1090 let now = now_sql();
1091
1092 // Insert a local account with real credentials
1093 sqlx::query(
1094 "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
1095 imap_server, imap_port, smtp_server, smtp_port, username, password, \
1096 use_tls, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, \
1097 oauth2_token_expires_at, sync_interval_minutes) \
1098 VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
1099 'smtp.example.com', 587, 'user', 'secret-password', 1, ?, 'oauth2', \
1100 'access-tok-123', 'refresh-tok-456', '2026-12-31T00:00:00Z', 5)"
1101 )
1102 .bind(&account_id)
1103 .bind(&user_id)
1104 .bind(&now)
1105 .execute(&pool)
1106 .await
1107 .unwrap();
1108
1109 // Remote update changes account_name but has no credential fields
1110 let remote_data = json!({
1111 "id": account_id,
1112 "user_id": user_id,
1113 "account_name": "Personal",
1114 "email_address": "test@example.com",
1115 "imap_server": "imap.example.com",
1116 "imap_port": 993,
1117 "smtp_server": "smtp.example.com",
1118 "smtp_port": 587,
1119 "username": "user",
1120 "use_tls": true,
1121 "created_at": now,
1122 "archive_folder_name": null,
1123 "auth_type": "oauth2",
1124 "jmap_session_url": null,
1125 "jmap_account_id": null,
1126 "sync_interval_minutes": 5,
1127 });
1128
1129 let mut conn = pool.acquire().await.unwrap();
1130 apply::apply_upsert(&mut conn, "email_accounts", &account_id, &remote_data).await.unwrap();
1131
1132 // Config should be updated
1133 let row: (String, String, Option<String>, Option<String>, Option<String>) = sqlx::query_as(
1134 "SELECT account_name, password, oauth2_access_token, oauth2_refresh_token, \
1135 oauth2_token_expires_at FROM email_accounts WHERE id = ?"
1136 )
1137 .bind(&account_id)
1138 .fetch_one(&pool)
1139 .await
1140 .unwrap();
1141
1142 assert_eq!(row.0, "Personal", "account_name should be updated");
1143 assert_eq!(row.1, "secret-password", "password should be preserved");
1144 assert_eq!(row.2.as_deref(), Some("access-tok-123"), "oauth2_access_token should be preserved");
1145 assert_eq!(row.3.as_deref(), Some("refresh-tok-456"), "oauth2_refresh_token should be preserved");
1146 assert_eq!(row.4.as_deref(), Some("2026-12-31T00:00:00Z"), "oauth2_token_expires_at should be preserved");
1147 }
1148
1149 #[tokio::test]
1150 async fn apply_upsert_task_with_dangling_source_email_id() {
1151 let pool = setup_test_db().await;
1152 let user_id = create_test_user(&pool).await;
1153 let project_id = uuid::Uuid::new_v4().to_string();
1154 let task_id = uuid::Uuid::new_v4().to_string();
1155 let fake_email_id = uuid::Uuid::new_v4().to_string();
1156 let now = now_sql();
1157
1158 // Apply via apply_remote_changes which disables FK enforcement
1159 let changes = vec![
1160 change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({
1161 "id": project_id, "name": "P", "description": "",
1162 "project_type": "Job", "status": "Active",
1163 "created_at": now, "user_id": user_id,
1164 }))),
1165 change("tasks", synckit_client::ChangeOp::Insert, &task_id, Some(json!({
1166 "id": task_id,
1167 "project_id": project_id,
1168 "description": "Task from email",
1169 "status": "Pending",
1170 "priority": "Medium",
1171 "due": null,
1172 "tags": null,
1173 "urgency": 50,
1174 "recurrence": "None",
1175 "created_at": now,
1176 "user_id": user_id,
1177 "recurrence_parent_id": null,
1178 "source_email_id": fake_email_id,
1179 "snoozed_until": null,
1180 "waiting_for_response": false,
1181 "waiting_since": null,
1182 "expected_response_date": null,
1183 "scheduled_start": null,
1184 "scheduled_duration": null,
1185 "is_focus": false,
1186 "focus_set_at": null,
1187 "contact_id": null,
1188 "milestone_id": null,
1189 }))),
1190 ];
1191
1192 // Should succeed -- FK enforcement is OFF during remote apply
1193 pull::apply_remote_changes(&pool, changes).await.unwrap();
1194
1195 let row: (String, Option<String>) = sqlx::query_as(
1196 "SELECT description, source_email_id FROM tasks WHERE id = ?"
1197 )
1198 .bind(&task_id)
1199 .fetch_one(&pool)
1200 .await
1201 .unwrap();
1202 assert_eq!(row.0, "Task from email");
1203 assert_eq!(row.1.as_deref(), Some(fake_email_id.as_str()), "dangling source_email_id should be stored");
1204 }
1205
1206 #[tokio::test]
1207 async fn email_account_trigger_excludes_credentials() {
1208 let pool = setup_test_db().await;
1209 let user_id = create_test_user(&pool).await;
1210 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1211
1212 let account_id = uuid::Uuid::new_v4().to_string();
1213 sqlx::query(
1214 "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
1215 imap_server, imap_port, smtp_server, smtp_port, username, password, \
1216 use_tls, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, \
1217 oauth2_token_expires_at, sync_interval_minutes) \
1218 VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
1219 'smtp.example.com', 587, 'user', 'secret', 1, datetime('now'), 'oauth2', \
1220 'at-123', 'rt-456', '2026-12-31', 5)"
1221 )
1222 .bind(&account_id)
1223 .bind(&user_id)
1224 .execute(&pool)
1225 .await
1226 .unwrap();
1227
1228 let data: (String,) = sqlx::query_as(
1229 "SELECT data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?"
1230 )
1231 .bind(&account_id)
1232 .fetch_one(&pool)
1233 .await
1234 .unwrap();
1235
1236 let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap();
1237 let obj = parsed.as_object().unwrap();
1238
1239 // Should have exactly the 17 config columns
1240 assert_eq!(obj.len(), 17);
1241
1242 // Credential fields must NOT be present
1243 assert!(!obj.contains_key("password"));
1244 assert!(!obj.contains_key("oauth2_access_token"));
1245 assert!(!obj.contains_key("oauth2_refresh_token"));
1246 assert!(!obj.contains_key("oauth2_token_expires_at"));
1247
1248 // Config fields should be present
1249 assert_eq!(parsed["account_name"], "Work");
1250 assert_eq!(parsed["auth_type"], "oauth2");
1251 assert_eq!(parsed["sync_interval_minutes"], 5);
1252 }
1253
1254 // -- Attachment sync tests --
1255
1256 /// Helper: insert a project and task, return (project_id, task_id).
1257 async fn setup_project_and_task(pool: &SqlitePool, user_id: &str) -> (String, String) {
1258 let project_id = uuid::Uuid::new_v4().to_string();
1259 let task_id = uuid::Uuid::new_v4().to_string();
1260 let now = now_sql();
1261
1262 sqlx::query(
1263 "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)"
1264 )
1265 .bind(&project_id).bind("Test Project").bind("").bind("Job").bind("Active").bind(&now).bind(user_id)
1266 .execute(pool).await.unwrap();
1267
1268 sqlx::query(
1269 "INSERT INTO tasks (id, project_id, description, status, priority, urgency, recurrence, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
1270 )
1271 .bind(&task_id).bind(&project_id).bind("Test Task").bind("Pending").bind("Medium").bind(50).bind("None").bind(&now).bind(user_id)
1272 .execute(pool).await.unwrap();
1273
1274 (project_id, task_id)
1275 }
1276
/// The `attachments` whitelist has 10 columns, starts with `id`, and includes
/// the blob and file metadata columns.
#[test]
fn attachment_table_columns_whitelist_has_10_entries() {
    let whitelist = apply::table_columns("attachments").unwrap();
    let listed = |column: &'static str| whitelist.contains(&column);

    assert_eq!(whitelist.len(), 10);
    assert_eq!(whitelist[0], "id");
    assert!(listed("blob_hash"), "blob_hash must be synced for blob resolution");
    assert!(listed("filename"));
    assert!(listed("file_size"));
    assert!(listed("mime_type"));
}
1287
/// The `time_sessions` whitelist has 7 columns, starts with `id`, and
/// includes `task_id` and `duration_minutes`.
#[test]
fn time_sessions_table_columns_whitelist_has_7_entries() {
    let whitelist = apply::table_columns("time_sessions").unwrap();
    let listed = |column: &'static str| whitelist.contains(&column);

    assert_eq!(whitelist.len(), 7);
    assert_eq!(whitelist[0], "id");
    assert!(listed("task_id"));
    assert!(listed("duration_minutes"));
}
1296
/// `tasks` must come before `attachments` in `UPSERT_ORDER`
/// (attachments reference tasks.id).
#[test]
fn upsert_order_has_attachments_after_tasks() {
    let index_of = |name: &str| UPSERT_ORDER.iter().position(|entry| *entry == name).unwrap();
    let parent = index_of("tasks");
    let child = index_of("attachments");
    assert!(parent < child);
}
1303
/// `attachments` must come before `tasks` in `DELETE_ORDER`.
#[test]
fn delete_order_has_attachments_before_tasks() {
    let index_of = |name: &str| DELETE_ORDER.iter().position(|entry| *entry == name).unwrap();
    let child = index_of("attachments");
    let parent = index_of("tasks");
    assert!(child < parent);
}
1309
/// `tasks` must come before `time_sessions` in `UPSERT_ORDER`.
#[test]
fn upsert_order_has_time_sessions_after_tasks() {
    let index_of = |name: &str| UPSERT_ORDER.iter().position(|entry| *entry == name).unwrap();
    let parent = index_of("tasks");
    let child = index_of("time_sessions");
    assert!(parent < child);
}
1315
/// `time_sessions` must come before `tasks` in `DELETE_ORDER`.
#[test]
fn delete_order_has_time_sessions_before_tasks() {
    let index_of = |name: &str| DELETE_ORDER.iter().position(|entry| *entry == name).unwrap();
    let child = index_of("time_sessions");
    let parent = index_of("tasks");
    assert!(child < parent);
}
1321
1322 #[tokio::test]
1323 async fn attachment_insert_fires_sync_trigger() {
1324 let pool = setup_test_db().await;
1325 let user_id = create_test_user(&pool).await;
1326 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1327 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1328
1329 let att_id = uuid::Uuid::new_v4().to_string();
1330 sqlx::query(
1331 "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \
1332 VALUES (?, ?, ?, 'report.pdf', 12345, 'application/pdf', 'abc123hash', datetime('now'))"
1333 )
1334 .bind(&att_id).bind(&user_id).bind(&task_id)
1335 .execute(&pool).await.unwrap();
1336
1337 let count: (i64,) = sqlx::query_as(
1338 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?"
1339 ).bind(&att_id).fetch_one(&pool).await.unwrap();
1340 assert_eq!(count.0, 1, "attachment insert should fire sync trigger");
1341
1342 // Verify trigger data contains blob_hash (critical for blob resolution)
1343 let data: (String,) = sqlx::query_as(
1344 "SELECT data FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?"
1345 ).bind(&att_id).fetch_one(&pool).await.unwrap();
1346 let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap();
1347 assert_eq!(parsed["blob_hash"], "abc123hash");
1348 assert_eq!(parsed["filename"], "report.pdf");
1349 assert_eq!(parsed["file_size"], 12345);
1350 }
1351
1352 #[tokio::test]
1353 async fn attachment_delete_fires_sync_trigger() {
1354 let pool = setup_test_db().await;
1355 let user_id = create_test_user(&pool).await;
1356 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1357
1358 let att_id = uuid::Uuid::new_v4().to_string();
1359 sqlx::query(
1360 "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \
1361 VALUES (?, ?, ?, 'file.txt', 100, 'text/plain', 'hash1', datetime('now'))"
1362 )
1363 .bind(&att_id).bind(&user_id).bind(&task_id)
1364 .execute(&pool).await.unwrap();
1365 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1366
1367 sqlx::query("DELETE FROM attachments WHERE id = ?")
1368 .bind(&att_id).execute(&pool).await.unwrap();
1369
1370 let row: (String,) = sqlx::query_as(
1371 "SELECT op FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?"
1372 ).bind(&att_id).fetch_one(&pool).await.unwrap();
1373 assert_eq!(row.0, "DELETE");
1374 }
1375
1376 #[tokio::test]
1377 async fn apply_upsert_attachment_round_trip() {
1378 let pool = setup_test_db().await;
1379 let user_id = create_test_user(&pool).await;
1380 let (project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1381 let att_id = uuid::Uuid::new_v4().to_string();
1382 let now = now_sql();
1383
1384 let data = json!({
1385 "id": att_id,
1386 "user_id": user_id,
1387 "task_id": task_id,
1388 "project_id": project_id,
1389 "filename": "slides.pptx",
1390 "file_size": 98765,
1391 "mime_type": "application/vnd.ms-powerpoint",
1392 "blob_hash": "sha256-deadbeef",
1393 "source_email_id": null,
1394 "created_at": now,
1395 });
1396
1397 let mut conn = pool.acquire().await.unwrap();
1398 apply::apply_upsert(&mut conn, "attachments", &att_id, &data).await.unwrap();
1399
1400 let row: (String, i64, String) = sqlx::query_as(
1401 "SELECT filename, file_size, blob_hash FROM attachments WHERE id = ?"
1402 ).bind(&att_id).fetch_one(&pool).await.unwrap();
1403 assert_eq!(row.0, "slides.pptx");
1404 assert_eq!(row.1, 98765);
1405 assert_eq!(row.2, "sha256-deadbeef");
1406 }
1407
1408 #[tokio::test]
1409 async fn apply_delete_attachment() {
1410 let pool = setup_test_db().await;
1411 let user_id = create_test_user(&pool).await;
1412 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1413 let att_id = uuid::Uuid::new_v4().to_string();
1414 let now = now_sql();
1415
1416 let mut conn = pool.acquire().await.unwrap();
1417 apply::apply_upsert(&mut conn, "attachments", &att_id, &json!({
1418 "id": att_id, "user_id": user_id, "task_id": task_id,
1419 "project_id": null, "filename": "delete-me.txt",
1420 "file_size": 10, "mime_type": "text/plain", "blob_hash": "h",
1421 "source_email_id": null, "created_at": now,
1422 })).await.unwrap();
1423
1424 apply::apply_delete(&mut conn, "attachments", &att_id).await.unwrap();
1425
1426 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM attachments WHERE id = ?")
1427 .bind(&att_id).fetch_one(&pool).await.unwrap();
1428 assert_eq!(count.0, 0);
1429 }
1430
1431 #[tokio::test]
1432 async fn time_session_insert_fires_sync_trigger() {
1433 let pool = setup_test_db().await;
1434 let user_id = create_test_user(&pool).await;
1435 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1436 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1437
1438 let session_id = uuid::Uuid::new_v4().to_string();
1439 sqlx::query(
1440 "INSERT INTO time_sessions (id, task_id, user_id, started_at, created_at) \
1441 VALUES (?, ?, ?, datetime('now'), datetime('now'))"
1442 )
1443 .bind(&session_id).bind(&task_id).bind(&user_id)
1444 .execute(&pool).await.unwrap();
1445
1446 let count: (i64,) = sqlx::query_as(
1447 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?"
1448 ).bind(&session_id).fetch_one(&pool).await.unwrap();
1449 assert_eq!(count.0, 1, "time_session insert should fire sync trigger");
1450
1451 let data: (String,) = sqlx::query_as(
1452 "SELECT data FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?"
1453 ).bind(&session_id).fetch_one(&pool).await.unwrap();
1454 let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap();
1455 assert_eq!(parsed["task_id"], task_id);
1456 }
1457
1458 #[tokio::test]
1459 async fn time_session_update_fires_sync_trigger() {
1460 let pool = setup_test_db().await;
1461 let user_id = create_test_user(&pool).await;
1462 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1463
1464 let session_id = uuid::Uuid::new_v4().to_string();
1465 sqlx::query(
1466 "INSERT INTO time_sessions (id, task_id, user_id, started_at, created_at) \
1467 VALUES (?, ?, ?, datetime('now'), datetime('now'))"
1468 )
1469 .bind(&session_id).bind(&task_id).bind(&user_id)
1470 .execute(&pool).await.unwrap();
1471 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1472
1473 // Stop the timer
1474 sqlx::query("UPDATE time_sessions SET ended_at = datetime('now'), duration_minutes = 25 WHERE id = ?")
1475 .bind(&session_id).execute(&pool).await.unwrap();
1476
1477 let row: (String, String) = sqlx::query_as(
1478 "SELECT op, data FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?"
1479 ).bind(&session_id).fetch_one(&pool).await.unwrap();
1480 assert_eq!(row.0, "UPDATE");
1481
1482 let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap();
1483 assert_eq!(parsed["duration_minutes"], 25);
1484 }
1485
1486 #[tokio::test]
1487 async fn apply_upsert_time_session_round_trip() {
1488 let pool = setup_test_db().await;
1489 let user_id = create_test_user(&pool).await;
1490 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1491 let session_id = uuid::Uuid::new_v4().to_string();
1492 let now = now_sql();
1493
1494 let data = json!({
1495 "id": session_id,
1496 "task_id": task_id,
1497 "user_id": user_id,
1498 "started_at": now,
1499 "ended_at": now,
1500 "duration_minutes": 42,
1501 "created_at": now,
1502 });
1503
1504 let mut conn = pool.acquire().await.unwrap();
1505 apply::apply_upsert(&mut conn, "time_sessions", &session_id, &data).await.unwrap();
1506
1507 let row: (String, i32) = sqlx::query_as(
1508 "SELECT task_id, duration_minutes FROM time_sessions WHERE id = ?"
1509 ).bind(&session_id).fetch_one(&pool).await.unwrap();
1510 assert_eq!(row.0, task_id);
1511 assert_eq!(row.1, 42);
1512 }
1513
1514 #[tokio::test]
1515 async fn apply_remote_changes_attachment_with_task_in_same_batch() {
1516 let pool = setup_test_db().await;
1517 let user_id = create_test_user(&pool).await;
1518 let project_id = uuid::Uuid::new_v4().to_string();
1519 let task_id = uuid::Uuid::new_v4().to_string();
1520 let att_id = uuid::Uuid::new_v4().to_string();
1521 let now = now_sql();
1522
1523 // Send task + attachment in same batch (attachment references task via FK)
1524 // apply_remote_changes must handle FK ordering correctly
1525 let changes = vec![
1526 change("attachments", synckit_client::ChangeOp::Insert, &att_id, Some(json!({
1527 "id": att_id, "user_id": user_id, "task_id": task_id,
1528 "project_id": project_id, "filename": "batch.pdf",
1529 "file_size": 500, "mime_type": "application/pdf",
1530 "blob_hash": "batchhash", "source_email_id": null, "created_at": now,
1531 }))),
1532 change("tasks", synckit_client::ChangeOp::Insert, &task_id, Some(json!({
1533 "id": task_id, "project_id": project_id, "description": "Batch task",
1534 "status": "Pending", "priority": "Low", "due": null,
1535 "tags": null, "urgency": 10, "recurrence": "None",
1536 "created_at": now, "user_id": user_id,
1537 "recurrence_parent_id": null, "source_email_id": null,
1538 "snoozed_until": null, "waiting_for_response": false,
1539 "waiting_since": null, "expected_response_date": null,
1540 "scheduled_start": null, "scheduled_duration": null,
1541 "is_focus": false, "focus_set_at": null,
1542 "contact_id": null, "milestone_id": null,
1543 "completed_at": null, "estimated_minutes": null, "actual_minutes": 0,
1544 }))),
1545 change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({
1546 "id": project_id, "name": "Batch project", "description": "",
1547 "project_type": "Job", "status": "Active",
1548 "created_at": now, "user_id": user_id,
1549 }))),
1550 ];
1551
1552 // Should succeed despite wrong input order (attachment before task before project)
1553 pull::apply_remote_changes(&pool, changes).await.unwrap();
1554
1555 let att: (String,) = sqlx::query_as("SELECT filename FROM attachments WHERE id = ?")
1556 .bind(&att_id).fetch_one(&pool).await.unwrap();
1557 assert_eq!(att.0, "batch.pdf");
1558
1559 // Triggers should be suppressed -- no changelog entries
1560 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1561 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog")
1562 .fetch_one(&pool).await.unwrap();
1563 assert_eq!(count.0, 0);
1564 }
1565
1566 #[tokio::test]
1567 async fn initial_snapshot_captures_attachments_and_time_sessions() {
1568 let pool = setup_test_db().await;
1569 let user_id = create_test_user(&pool).await;
1570 let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await;
1571
1572 // Insert attachment and time_session with triggers suppressed
1573 set_sync_state(&pool, "applying_remote", "1").await.unwrap();
1574
1575 let att_id = uuid::Uuid::new_v4().to_string();
1576 sqlx::query(
1577 "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \
1578 VALUES (?, ?, ?, 'snap.pdf', 100, 'application/pdf', 'snaphash', datetime('now'))"
1579 )
1580 .bind(&att_id).bind(&user_id).bind(&task_id)
1581 .execute(&pool).await.unwrap();
1582
1583 let session_id = uuid::Uuid::new_v4().to_string();
1584 sqlx::query(
1585 "INSERT INTO time_sessions (id, task_id, user_id, started_at, ended_at, duration_minutes, created_at) \
1586 VALUES (?, ?, ?, datetime('now', '-1 hour'), datetime('now'), 60, datetime('now'))"
1587 )
1588 .bind(&session_id).bind(&task_id).bind(&user_id)
1589 .execute(&pool).await.unwrap();
1590
1591 set_sync_state(&pool, "applying_remote", "0").await.unwrap();
1592 sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap();
1593
1594 let total = create_initial_snapshot(&pool).await.unwrap();
1595
1596 // Should capture: 1 project + 1 task + 1 attachment + 1 time_session = 4
1597 assert_eq!(total, 4, "snapshot should capture projects, tasks, attachments, and time_sessions");
1598
1599 let att_count: (i64,) = sqlx::query_as(
1600 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'attachments'"
1601 ).fetch_one(&pool).await.unwrap();
1602 assert_eq!(att_count.0, 1);
1603
1604 let ts_count: (i64,) = sqlx::query_as(
1605 "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'time_sessions'"
1606 ).fetch_one(&pool).await.unwrap();
1607 assert_eq!(ts_count.0, 1);
1608 }
1609