use crate::sync_service::*; use crate::sync_service::{apply, pull, UPSERT_ORDER, DELETE_ORDER}; use goingson_db_sqlite::{init_pool, run_migrations}; use serde_json::json; use sqlx::SqlitePool; /// Format the current UTC time as a SQL-compatible timestamp string. fn now_sql() -> String { chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string() } /// Creates an in-memory test database with all migrations applied. async fn setup_test_db() -> SqlitePool { let pool = init_pool(Some(":memory:")) .await .expect("Failed to create in-memory pool"); run_migrations(&pool) .await .expect("Failed to run migrations"); pool } /// Creates a test user and returns their UUID string. async fn create_test_user(pool: &SqlitePool) -> String { let user_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); sqlx::query( "INSERT INTO users (id, email, password_hash, display_name, created_at) VALUES (?, ?, ?, ?, ?)" ) .bind(&user_id) .bind(format!("test-{}@example.com", user_id)) .bind("test-password-hash") .bind("Test User") .bind(&now) .execute(pool) .await .expect("Failed to create test user"); user_id } /// Helper: build a ChangeEntry for tests. fn change(table: &str, op: synckit_client::ChangeOp, row_id: &str, data: Option) -> synckit_client::ChangeEntry { synckit_client::ChangeEntry { table: table.to_string(), op, row_id: row_id.to_string(), timestamp: chrono::Utc::now(), data, } } // -- FK ordering tests -- #[test] fn upsert_order_has_parents_before_children() { let pos = |table: &str| UPSERT_ORDER.iter().position(|t| *t == table); // projects before tasks, events, milestones assert!(pos("projects").unwrap() < pos("tasks").unwrap()); assert!(pos("projects").unwrap() < pos("events").unwrap()); assert!(pos("projects").unwrap() < pos("milestones").unwrap()); // contacts before contact_emails, contact_phones, etc. 
assert!(pos("contacts").unwrap() < pos("contact_emails").unwrap()); assert!(pos("contacts").unwrap() < pos("contact_phones").unwrap()); assert!(pos("contacts").unwrap() < pos("contact_social_handles").unwrap()); assert!(pos("contacts").unwrap() < pos("contact_custom_fields").unwrap()); // tasks before annotations, subtasks assert!(pos("tasks").unwrap() < pos("annotations").unwrap()); assert!(pos("tasks").unwrap() < pos("subtasks").unwrap()); // milestones before tasks (tasks.milestone_id FK) assert!(pos("milestones").unwrap() < pos("tasks").unwrap()); } #[test] fn delete_order_has_children_before_parents() { let pos = |table: &str| DELETE_ORDER.iter().position(|t| *t == table); // children before parents (reverse of upsert) assert!(pos("tasks").unwrap() < pos("projects").unwrap()); assert!(pos("events").unwrap() < pos("projects").unwrap()); assert!(pos("milestones").unwrap() < pos("projects").unwrap()); assert!(pos("contact_emails").unwrap() < pos("contacts").unwrap()); assert!(pos("contact_phones").unwrap() < pos("contacts").unwrap()); assert!(pos("contact_social_handles").unwrap() < pos("contacts").unwrap()); assert!(pos("contact_custom_fields").unwrap() < pos("contacts").unwrap()); assert!(pos("annotations").unwrap() < pos("tasks").unwrap()); assert!(pos("subtasks").unwrap() < pos("tasks").unwrap()); assert!(pos("tasks").unwrap() < pos("milestones").unwrap()); } #[test] fn upsert_and_delete_orders_are_exact_reverses() { let reversed: Vec<&str> = UPSERT_ORDER.iter().rev().copied().collect(); assert_eq!(reversed, DELETE_ORDER); } #[test] fn all_upsert_tables_have_column_whitelists() { for table in UPSERT_ORDER { assert!( apply::table_columns(table).is_some(), "table_columns missing for syncable table: {}", table ); } } #[test] fn unknown_table_returns_none() { assert!(apply::table_columns("nonexistent").is_none()); assert!(apply::table_columns("users").is_none()); assert!(apply::table_columns("emails").is_none()); } #[test] fn 
every_table_whitelist_starts_with_id() { for table in UPSERT_ORDER { let cols = apply::table_columns(table).unwrap(); assert_eq!( cols[0], "id", "table {} column whitelist should start with 'id'", table ); } } // -- sync_state helpers -- #[tokio::test] async fn get_sync_state_returns_empty_for_missing_key() { let pool = setup_test_db().await; let val = get_sync_state(&pool, "nonexistent_key").await.unwrap(); assert_eq!(val, ""); } #[tokio::test] async fn set_and_get_sync_state() { let pool = setup_test_db().await; set_sync_state(&pool, "test_key", "test_value").await.unwrap(); let val = get_sync_state(&pool, "test_key").await.unwrap(); assert_eq!(val, "test_value"); } #[tokio::test] async fn set_sync_state_overwrites_existing() { let pool = setup_test_db().await; set_sync_state(&pool, "key", "first").await.unwrap(); set_sync_state(&pool, "key", "second").await.unwrap(); let val = get_sync_state(&pool, "key").await.unwrap(); assert_eq!(val, "second"); } #[tokio::test] async fn migration_seeds_default_sync_state() { let pool = setup_test_db().await; assert_eq!(get_sync_state(&pool, "device_id").await.unwrap(), ""); assert_eq!(get_sync_state(&pool, "pull_cursor").await.unwrap(), "0"); assert_eq!(get_sync_state(&pool, "applying_remote").await.unwrap(), "0"); assert_eq!(get_sync_state(&pool, "initial_snapshot_done").await.unwrap(), "0"); } // -- apply_upsert -- #[tokio::test] async fn apply_upsert_inserts_project() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let data = json!({ "id": project_id, "name": "Synced Project", "description": "From remote", "project_type": "SideProject", "status": "Active", "created_at": now, "user_id": user_id, }); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "projects", &project_id, &data).await.unwrap(); let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?") .bind(&project_id) 
.fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "Synced Project"); } #[tokio::test] async fn apply_upsert_replaces_existing_row() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let data1 = json!({ "id": project_id, "name": "Original", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, }); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "projects", &project_id, &data1).await.unwrap(); let data2 = json!({ "id": project_id, "name": "Updated", "description": "changed", "project_type": "Job", "status": "OnHold", "created_at": now, "user_id": user_id, }); apply::apply_upsert(&mut conn, "projects", &project_id, &data2).await.unwrap(); let row: (String, String) = sqlx::query_as("SELECT name, status FROM projects WHERE id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "Updated"); assert_eq!(row.1, "OnHold"); } #[tokio::test] async fn apply_upsert_rejects_unknown_table() { let pool = setup_test_db().await; let data = json!({"id": "abc"}); let mut conn = pool.acquire().await.unwrap(); let result = apply::apply_upsert(&mut conn, "nonexistent", "abc", &data).await; assert!(result.is_err()); let err_msg = result.unwrap_err().to_string(); assert!(err_msg.contains("unknown syncable table")); } #[tokio::test] async fn apply_upsert_handles_null_fields() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); // Insert a project to be a FK parent for the task let data_project = json!({ "id": project_id, "name": "Parent", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, }); apply::apply_upsert(&mut conn, "projects", &project_id, &data_project).await.unwrap(); let task_id = 
uuid::Uuid::new_v4().to_string(); let data = json!({ "id": task_id, "project_id": project_id, "description": "Null test", "status": "Pending", "priority": "Medium", "due": null, "tags": null, "urgency": 50, "recurrence": "None", "created_at": now, "user_id": user_id, "recurrence_parent_id": null, "source_email_id": null, "snoozed_until": null, "waiting_for_response": false, "waiting_since": null, "expected_response_date": null, "scheduled_start": null, "scheduled_duration": null, "is_focus": false, "focus_set_at": null, "contact_id": null, "milestone_id": null, }); apply::apply_upsert(&mut conn, "tasks", &task_id, &data).await.unwrap(); let row: (String,) = sqlx::query_as("SELECT description FROM tasks WHERE id = ?") .bind(&task_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "Null test"); } // -- apply_delete -- #[tokio::test] async fn apply_delete_removes_row() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); let data = json!({ "id": project_id, "name": "To Delete", "description": "", "project_type": "Other", "status": "Active", "created_at": now, "user_id": user_id, }); apply::apply_upsert(&mut conn, "projects", &project_id, &data).await.unwrap(); apply::apply_delete(&mut conn, "projects", &project_id).await.unwrap(); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); } #[tokio::test] async fn apply_delete_rejects_unknown_table() { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); let result = apply::apply_delete(&mut conn, "not_a_table", "abc").await; assert!(result.is_err()); let err_msg = result.unwrap_err().to_string(); assert!(err_msg.contains("unknown syncable table")); } #[tokio::test] async fn apply_delete_is_idempotent() { let pool = setup_test_db().await; let 
mut conn = pool.acquire().await.unwrap(); // Deleting a non-existent row should succeed silently. let result = apply::apply_delete(&mut conn, "projects", "nonexistent-id").await; assert!(result.is_ok()); } // -- apply_changes_inner ordering -- #[tokio::test] async fn apply_changes_inner_processes_upserts_in_fk_order() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let task_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); // Provide changes in WRONG order (child before parent) to prove re-ordering works. let changes = vec![ change("tasks", synckit_client::ChangeOp::Insert, &task_id, Some(json!({ "id": task_id, "project_id": project_id, "description": "Child task", "status": "Pending", "priority": "Low", "due": null, "tags": null, "urgency": 10, "recurrence": "None", "created_at": now, "user_id": user_id, "recurrence_parent_id": null, "source_email_id": null, "snoozed_until": null, "waiting_for_response": false, "waiting_since": null, "expected_response_date": null, "scheduled_start": null, "scheduled_duration": null, "is_focus": false, "focus_set_at": null, "contact_id": null, "milestone_id": null, }))), change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({ "id": project_id, "name": "Parent project", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, }))), ]; // Should succeed despite task coming before project in the input -- // apply_changes_inner iterates UPSERT_ORDER so projects is applied first. 
let mut conn = pool.acquire().await.unwrap(); pull::apply_changes_inner(&mut conn, changes).await.unwrap(); let project: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(project.0, "Parent project"); let task: (String,) = sqlx::query_as("SELECT description FROM tasks WHERE id = ?") .bind(&task_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(task.0, "Child task"); } #[tokio::test] async fn apply_changes_inner_processes_deletes_in_child_first_order() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let task_id = uuid::Uuid::new_v4().to_string(); let subtask_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); // Insert parent -> child -> grandchild apply::apply_upsert(&mut conn, "projects", &project_id, &json!({ "id": project_id, "name": "P", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, })).await.unwrap(); apply::apply_upsert(&mut conn, "tasks", &task_id, &json!({ "id": task_id, "project_id": project_id, "description": "T", "status": "Pending", "priority": "Low", "due": null, "tags": null, "urgency": 10, "recurrence": "None", "created_at": now, "user_id": user_id, "recurrence_parent_id": null, "source_email_id": null, "snoozed_until": null, "waiting_for_response": false, "waiting_since": null, "expected_response_date": null, "scheduled_start": null, "scheduled_duration": null, "is_focus": false, "focus_set_at": null, "contact_id": null, "milestone_id": null, })).await.unwrap(); apply::apply_upsert(&mut conn, "subtasks", &subtask_id, &json!({ "id": subtask_id, "task_id": task_id, "text": "Sub", "is_completed": false, "position": 0, "created_at": now, "linked_task_id": null, })).await.unwrap(); // Delete in WRONG input order (parent first) -- apply_changes_inner // should re-order to subtask 
-> task -> project. let changes = vec![ change("projects", synckit_client::ChangeOp::Delete, &project_id, None), change("tasks", synckit_client::ChangeOp::Delete, &task_id, None), change("subtasks", synckit_client::ChangeOp::Delete, &subtask_id, None), ]; pull::apply_changes_inner(&mut conn, changes).await.unwrap(); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE id = ?") .bind(&task_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); } #[tokio::test] async fn apply_changes_inner_handles_mixed_ops() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); // Create a project to delete later let delete_id = uuid::Uuid::new_v4().to_string(); apply::apply_upsert(&mut conn, "projects", &delete_id, &json!({ "id": delete_id, "name": "Will Delete", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, })).await.unwrap(); // Mix upsert and delete in a single batch let insert_id = uuid::Uuid::new_v4().to_string(); let changes = vec![ change("projects", synckit_client::ChangeOp::Delete, &delete_id, None), change("projects", synckit_client::ChangeOp::Insert, &insert_id, Some(json!({ "id": insert_id, "name": "New One", "description": "", "project_type": "Other", "status": "Active", "created_at": now, "user_id": user_id, }))), ]; pull::apply_changes_inner(&mut conn, changes).await.unwrap(); // The old project was deleted let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?") .bind(&delete_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); // The new project was inserted let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?") .bind(&insert_id) .fetch_one(&pool) .await .unwrap(); 
assert_eq!(row.0, "New One"); } #[tokio::test] async fn apply_changes_inner_skips_upsert_without_data() { let pool = setup_test_db().await; // INSERT with no data should be skipped (data is None) let changes = vec![change("projects", synckit_client::ChangeOp::Insert, "abc", None)]; let mut conn = pool.acquire().await.unwrap(); let result = pull::apply_changes_inner(&mut conn, changes).await; assert!(result.is_ok()); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM projects WHERE id = ?") .bind("abc") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0); } // -- Trigger suppression -- #[tokio::test] async fn apply_remote_changes_suppresses_triggers() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); // Clear any existing changelog entries sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); let changes = vec![ change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({ "id": project_id, "name": "Remote Project", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, }))), ]; pull::apply_remote_changes(&pool, changes).await.unwrap(); // The project should exist let row: (String,) = sqlx::query_as("SELECT name FROM projects WHERE id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "Remote Project"); // But the changelog should be empty -- triggers were suppressed let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 0, "trigger should not fire during remote apply"); // The applying_remote flag should be cleared let flag = get_sync_state(&pool, "applying_remote").await.unwrap(); assert_eq!(flag, "0", "applying_remote should be reset after apply"); } #[tokio::test] async fn apply_remote_changes_clears_flag_after_success() { let pool = setup_test_db().await; // 
Verify the flag is set to "1" during apply and cleared to "0" after. // We can test this by calling apply_remote_changes with an empty batch. let changes = vec![]; pull::apply_remote_changes(&pool, changes).await.unwrap(); let flag = get_sync_state(&pool, "applying_remote").await.unwrap(); assert_eq!(flag, "0", "applying_remote flag should be cleared after apply"); } #[tokio::test] async fn apply_remote_changes_clears_flag_on_error() { let pool = setup_test_db().await; // Test: apply_upsert on unknown table returns BadRequest let mut conn = pool.acquire().await.unwrap(); let result = apply::apply_upsert(&mut conn, "fake_table", "id", &json!({"id": "x"})).await; assert!(result.is_err()); match result.unwrap_err() { goingson_core::CoreError::BadRequest(msg) => assert!(msg.contains("unknown syncable table")), other => panic!("Expected BadRequest, got: {:?}", other), } } #[tokio::test] async fn local_insert_fires_trigger_normally() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); // Clear changelog sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Insert directly (not via apply_remote_changes) -- trigger should fire sqlx::query( "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)" ) .bind(&project_id) .bind("Local Project") .bind("") .bind("Job") .bind("Active") .bind(&now) .bind(&user_id) .execute(&pool) .await .unwrap(); // Trigger should have created a changelog entry let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'projects' AND row_id = ?") .bind(&project_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1, "local insert should fire sync trigger"); } // -- Initial snapshot -- #[tokio::test] async fn create_initial_snapshot_captures_all_rows() { let pool = setup_test_db().await; let user_id = 
create_test_user(&pool).await; let now = now_sql(); // Suppress triggers during setup so only snapshot entries end up in changelog set_sync_state(&pool, "applying_remote", "1").await.unwrap(); // Insert 2 projects for i in 1..=2 { let pid = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)" ) .bind(&pid) .bind(format!("Project {}", i)) .bind("") .bind("Job") .bind("Active") .bind(&now) .bind(&user_id) .execute(&pool) .await .unwrap(); } set_sync_state(&pool, "applying_remote", "0").await.unwrap(); // Clear any stray changelog entries sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); let total = create_initial_snapshot(&pool).await.unwrap(); assert_eq!(total, 2); // Verify changelog entries let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'projects'") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 2); // Verify snapshot flag was set let flag = get_sync_state(&pool, "initial_snapshot_done").await.unwrap(); assert_eq!(flag, "1"); } #[tokio::test] async fn create_initial_snapshot_empty_db() { let pool = setup_test_db().await; let _user_id = create_test_user(&pool).await; // No data rows -- snapshot should report 0 sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); let total = create_initial_snapshot(&pool).await.unwrap(); assert_eq!(total, 0); } // -- Changelog cleanup -- #[tokio::test] async fn cleanup_changelog_removes_old_pushed_entries() { let pool = setup_test_db().await; // Insert a pushed entry with an old timestamp sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) VALUES (?, ?, ?, datetime('now', '-10 days'), 1)" ) .bind("projects") .bind("INSERT") .bind("old-id") .execute(&pool) .await .unwrap(); // Insert a recent pushed entry sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) 
VALUES (?, ?, ?, datetime('now'), 1)" ) .bind("projects") .bind("INSERT") .bind("recent-id") .execute(&pool) .await .unwrap(); // Insert an unpushed entry (should never be cleaned) sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, timestamp, pushed) VALUES (?, ?, ?, datetime('now', '-10 days'), 0)" ) .bind("projects") .bind("INSERT") .bind("unpushed-id") .execute(&pool) .await .unwrap(); let deleted = cleanup_changelog(&pool).await.unwrap(); assert_eq!(deleted, 1, "should only delete old pushed entries"); // Verify the remaining entries let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 2, "recent pushed + unpushed should remain"); } // -- Count pending changes -- #[tokio::test] async fn count_pending_changes_counts_unpushed() { let pool = setup_test_db().await; // Clear any existing entries sqlx::query("DELETE FROM sync_changelog") .execute(&pool) .await .unwrap(); // Insert 3 unpushed for i in 0..3 { sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES (?, ?, ?, 0)" ) .bind("projects") .bind("INSERT") .bind(format!("id-{}", i)) .execute(&pool) .await .unwrap(); } // Insert 2 pushed for i in 0..2 { sqlx::query( "INSERT INTO sync_changelog (table_name, op, row_id, pushed) VALUES (?, ?, ?, 1)" ) .bind("projects") .bind("INSERT") .bind(format!("pushed-{}", i)) .execute(&pool) .await .unwrap(); } let count = count_pending_changes(&pool).await.unwrap(); assert_eq!(count, 3); } // -- Contact child table upserts -- #[tokio::test] async fn apply_upsert_contact_with_children() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let contact_id = uuid::Uuid::new_v4().to_string(); let email_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); // Insert contact (parent) apply::apply_upsert(&mut conn, "contacts", &contact_id, &json!({ "id": contact_id, "user_id": user_id, 
"display_name": "Test Contact", "nickname": null, "company": null, "title": null, "notes": null, "tags": null, "birthday": null, "timezone": null, "created_at": now, "updated_at": now, })).await.unwrap(); // Insert contact_email (child) apply::apply_upsert(&mut conn, "contact_emails", &email_id, &json!({ "id": email_id, "contact_id": contact_id, "address": "test@example.com", "label": "work", "is_primary": true, })).await.unwrap(); let row: (String,) = sqlx::query_as("SELECT address FROM contact_emails WHERE id = ?") .bind(&email_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "test@example.com"); } // -- Boolean handling -- #[tokio::test] async fn apply_upsert_handles_booleans_as_integers() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let contact_id = uuid::Uuid::new_v4().to_string(); let email_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "contacts", &contact_id, &json!({ "id": contact_id, "user_id": user_id, "display_name": "Bool Test", "nickname": null, "company": null, "title": null, "notes": null, "tags": null, "birthday": null, "timezone": null, "created_at": now, "updated_at": now, })).await.unwrap(); // is_primary as a JSON boolean -- should be stored as integer apply::apply_upsert(&mut conn, "contact_emails", &email_id, &json!({ "id": email_id, "contact_id": contact_id, "address": "bool@test.com", "label": "home", "is_primary": true, })).await.unwrap(); let row: (i32,) = sqlx::query_as("SELECT is_primary FROM contact_emails WHERE id = ?") .bind(&email_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, 1); } // -- email_accounts sync triggers -- #[tokio::test] async fn email_account_insert_fires_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let account_id = uuid::Uuid::new_v4().to_string(); 
sqlx::query( "INSERT INTO email_accounts (id, user_id, account_name, email_address, \ imap_server, imap_port, smtp_server, smtp_port, username, password, \ use_tls, created_at) \ VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \ 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))" ) .bind(&account_id) .bind(&user_id) .execute(&pool) .await .unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'email_accounts' AND op = 'INSERT'" ) .fetch_one(&pool) .await .unwrap(); assert_eq!(count.0, 1); let data: (String,) = sqlx::query_as( "SELECT data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?" ) .bind(&account_id) .fetch_one(&pool) .await .unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); let obj = parsed.as_object().unwrap(); assert_eq!(obj.len(), 17, "trigger data should have 17 keys (config only)"); assert!(!obj.contains_key("password"), "password should not be in trigger data"); assert!(!obj.contains_key("oauth2_access_token"), "oauth2_access_token should not be in trigger data"); assert!(!obj.contains_key("oauth2_refresh_token"), "oauth2_refresh_token should not be in trigger data"); assert!(!obj.contains_key("oauth2_token_expires_at"), "oauth2_token_expires_at should not be in trigger data"); assert!(!obj.contains_key("last_sync_at"), "last_sync_at should not be in trigger data"); assert_eq!(parsed["account_name"], "Work"); } #[tokio::test] async fn email_account_update_fires_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; // Insert with triggers suppressed set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let account_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO email_accounts (id, user_id, account_name, email_address, \ imap_server, imap_port, smtp_server, smtp_port, username, password, \ use_tls, created_at) \ VALUES (?, ?, 'Work', 'test@example.com', 
'imap.example.com', 993, \ 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))" ) .bind(&account_id) .bind(&user_id) .execute(&pool) .await .unwrap(); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); // Update account_name sqlx::query("UPDATE email_accounts SET account_name = 'Personal' WHERE id = ?") .bind(&account_id) .execute(&pool) .await .unwrap(); let row: (String, String) = sqlx::query_as( "SELECT op, data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?" ) .bind(&account_id) .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "UPDATE"); let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); assert_eq!(parsed["account_name"], "Personal"); assert!(!parsed.as_object().unwrap().contains_key("password"), "update trigger should not include password"); } #[tokio::test] async fn email_account_delete_fires_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; // Insert with triggers suppressed set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let account_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO email_accounts (id, user_id, account_name, email_address, \ imap_server, imap_port, smtp_server, smtp_port, username, password, \ use_tls, created_at) \ VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \ 'smtp.example.com', 587, 'user', 'pass', 1, datetime('now'))" ) .bind(&account_id) .bind(&user_id) .execute(&pool) .await .unwrap(); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); // Delete the account sqlx::query("DELETE FROM email_accounts WHERE id = ?") .bind(&account_id) .execute(&pool) .await .unwrap(); let row: (String,) = sqlx::query_as( "SELECT op FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?" 
)
    .bind(&account_id)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(row.0, "DELETE");
}

/// The `email_accounts` whitelist has exactly 17 columns and contains none of
/// the credential columns nor `last_sync_at`.
#[test]
fn email_account_table_columns_has_17_entries() {
    let cols = apply::table_columns("email_accounts").unwrap();
    assert_eq!(cols.len(), 17, "email_accounts should sync 17 config columns");

    // Columns that must stay out of the synced set.
    for excluded in [
        "password",
        "oauth2_access_token",
        "oauth2_refresh_token",
        "oauth2_token_expires_at",
        "last_sync_at",
    ] {
        assert!(
            !cols.contains(&excluded),
            "{} should not be in synced columns",
            excluded
        );
    }
}

// -- email_accounts credential preservation --

/// Applying an upsert for an account that does not exist locally creates the
/// row, and its `password` column ends up as the empty string.
#[tokio::test]
async fn apply_email_account_upsert_creates_with_empty_password() {
    let pool = setup_test_db().await;
    let user_id = create_test_user(&pool).await;
    let account_id = uuid::Uuid::new_v4().to_string();
    let now = now_sql();

    // Payload carries config fields only -- no password or token keys.
    let data = json!({
        "id": account_id,
        "user_id": user_id,
        "account_name": "Remote Work",
        "email_address": "work@example.com",
        "imap_server": "imap.example.com",
        "imap_port": 993,
        "smtp_server": "smtp.example.com",
        "smtp_port": 587,
        "username": "user",
        "use_tls": true,
        "created_at": now,
        "archive_folder_name": null,
        "auth_type": "password",
        "jmap_session_url": null,
        "jmap_account_id": null,
        "sync_interval_minutes": 5,
    });

    let mut conn = pool.acquire().await.unwrap();
    apply::apply_upsert(&mut conn, "email_accounts", &account_id, &data)
        .await
        .unwrap();

    let row: (String, String) = sqlx::query_as(
        "SELECT account_name, password FROM email_accounts WHERE id = ?",
    )
    .bind(&account_id)
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(row.0, "Remote Work");
    assert_eq!(row.1, "", "new remote account should have empty password");
}

/// Applying an upsert over an existing local account updates the config
/// fields while the locally stored password and OAuth2 token columns keep
/// their previous values.
#[tokio::test]
async fn apply_email_account_upsert_preserves_local_credentials() {
    let pool = setup_test_db().await;
    let user_id = create_test_user(&pool).await;
    let account_id = uuid::Uuid::new_v4().to_string();
    let now = now_sql();

    // Insert a local account with real credentials
    sqlx::query(
        "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
         imap_server, imap_port, smtp_server, smtp_port, username, password, \
         use_tls, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, \
         oauth2_token_expires_at, sync_interval_minutes) \
         VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
         'smtp.example.com', 587, 'user', 'secret-password', 1, ?, 'oauth2', \
         'access-tok-123', 'refresh-tok-456', '2026-12-31T00:00:00Z', 5)",
    )
    .bind(&account_id)
    .bind(&user_id)
    .bind(&now)
    .execute(&pool)
    .await
    .unwrap();

    // Remote update changes account_name but has no credential fields
    let remote_data = json!({
        "id": account_id,
        "user_id": user_id,
        "account_name": "Personal",
        "email_address": "test@example.com",
        "imap_server": "imap.example.com",
        "imap_port": 993,
        "smtp_server": "smtp.example.com",
        "smtp_port": 587,
        "username": "user",
        "use_tls": true,
        "created_at": now,
        "archive_folder_name": null,
        "auth_type": "oauth2",
        "jmap_session_url": null,
        "jmap_account_id": null,
        "sync_interval_minutes": 5,
    });

    let mut conn = pool.acquire().await.unwrap();
    apply::apply_upsert(&mut conn, "email_accounts", &account_id, &remote_data)
        .await
        .unwrap();

    // Config should be updated. The three OAuth2 columns are read as
    // `Option<String>` so the assertions below can compare via `as_deref()`.
    let row: (String, String, Option<String>, Option<String>, Option<String>) = sqlx::query_as(
        "SELECT account_name, password, oauth2_access_token, oauth2_refresh_token, \
         oauth2_token_expires_at FROM email_accounts WHERE id = ?",
    )
    .bind(&account_id)
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(row.0, "Personal", "account_name should be updated");
    assert_eq!(row.1, "secret-password", "password should be preserved");
    assert_eq!(
        row.2.as_deref(),
        Some("access-tok-123"),
        "oauth2_access_token should be preserved"
    );
    assert_eq!(
        row.3.as_deref(),
        Some("refresh-tok-456"),
        "oauth2_refresh_token should be preserved"
    );
    assert_eq!(
        row.4.as_deref(),
        Some("2026-12-31T00:00:00Z"),
        "oauth2_token_expires_at should be preserved"
    );
}

/// A task whose `source_email_id` points at an email that does not exist
/// locally is still stored when it arrives through `apply_remote_changes`.
#[tokio::test]
async fn apply_upsert_task_with_dangling_source_email_id() {
    let pool = setup_test_db().await;
    let user_id = create_test_user(&pool).await;
    let project_id = uuid::Uuid::new_v4().to_string();
    let task_id = uuid::Uuid::new_v4().to_string();
    // Never inserted anywhere: this id intentionally references nothing.
    let fake_email_id = uuid::Uuid::new_v4().to_string();
    let now = now_sql();

    // Apply via apply_remote_changes which disables FK enforcement
    let changes = vec![
        change(
            "projects",
            synckit_client::ChangeOp::Insert,
            &project_id,
            Some(json!({
                "id": project_id,
                "name": "P",
                "description": "",
                "project_type": "Job",
                "status": "Active",
                "created_at": now,
                "user_id": user_id,
            })),
        ),
        change(
            "tasks",
            synckit_client::ChangeOp::Insert,
            &task_id,
            Some(json!({
                "id": task_id,
                "project_id": project_id,
                "description": "Task from email",
                "status": "Pending",
                "priority": "Medium",
                "due": null,
                "tags": null,
                "urgency": 50,
                "recurrence": "None",
                "created_at": now,
                "user_id": user_id,
                "recurrence_parent_id": null,
                "source_email_id": fake_email_id,
                "snoozed_until": null,
                "waiting_for_response": false,
                "waiting_since": null,
                "expected_response_date": null,
                "scheduled_start": null,
                "scheduled_duration": null,
                "is_focus": false,
                "focus_set_at": null,
                "contact_id": null,
                "milestone_id": null,
            })),
        ),
    ];

    // Should succeed -- FK enforcement is OFF during remote apply
    pull::apply_remote_changes(&pool, changes).await.unwrap();

    let row: (String, Option<String>) = sqlx::query_as(
        "SELECT description, source_email_id FROM tasks WHERE id = ?",
    )
    .bind(&task_id)
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(row.0, "Task from email");
    assert_eq!(
        row.1.as_deref(),
        Some(fake_email_id.as_str()),
        "dangling source_email_id should be stored"
    );
}

/// The changelog row written when an `email_accounts` row is inserted holds
/// 17 keys and none of the credential columns.
#[tokio::test]
async fn email_account_trigger_excludes_credentials() {
    let pool = setup_test_db().await;
    let user_id = create_test_user(&pool).await;

    // Clear the changelog so the lookup below matches only this insert.
    sqlx::query("DELETE FROM sync_changelog")
        .execute(&pool)
        .await
        .unwrap();

    let account_id = uuid::Uuid::new_v4().to_string();
    sqlx::query(
        "INSERT INTO email_accounts (id, user_id, account_name, email_address, \
         imap_server, imap_port, smtp_server, smtp_port, username, password, \
         use_tls, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, \
         oauth2_token_expires_at, sync_interval_minutes) \
         VALUES (?, ?, 'Work', 'test@example.com', 'imap.example.com', 993, \
         'smtp.example.com', 587, 'user', 'secret', 1, datetime('now'), 'oauth2', \
         'at-123', 'rt-456', '2026-12-31', 5)",
    )
    .bind(&account_id)
    .bind(&user_id)
    .execute(&pool)
    .await
    .unwrap();

    let data: (String,) = sqlx::query_as(
        "SELECT data FROM sync_changelog WHERE table_name = 'email_accounts' AND row_id = ?",
    )
    .bind(&account_id)
    .fetch_one(&pool)
    .await
    .unwrap();

    let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap();
    let obj = parsed.as_object().unwrap();

    // Should have exactly the 17 config columns
    assert_eq!(obj.len(), 17);

    // Credential fields must NOT be present
    for key in [
        "password",
        "oauth2_access_token",
        "oauth2_refresh_token",
        "oauth2_token_expires_at",
    ] {
        assert!(
            !obj.contains_key(key),
            "{} must not appear in changelog data",
            key
        );
    }

    // Config fields should be present
    assert_eq!(parsed["account_name"], "Work");
    assert_eq!(parsed["auth_type"], "oauth2");
    assert_eq!(parsed["sync_interval_minutes"], 5);
}

// -- Attachment sync tests --

/// Helper: insert a project and task, return (project_id, task_id).
async fn setup_project_and_task(pool: &SqlitePool, user_id: &str) -> (String, String) { let project_id = uuid::Uuid::new_v4().to_string(); let task_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); sqlx::query( "INSERT INTO projects (id, name, description, project_type, status, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?)" ) .bind(&project_id).bind("Test Project").bind("").bind("Job").bind("Active").bind(&now).bind(user_id) .execute(pool).await.unwrap(); sqlx::query( "INSERT INTO tasks (id, project_id, description, status, priority, urgency, recurrence, created_at, user_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" ) .bind(&task_id).bind(&project_id).bind("Test Task").bind("Pending").bind("Medium").bind(50).bind("None").bind(&now).bind(user_id) .execute(pool).await.unwrap(); (project_id, task_id) } #[test] fn attachment_table_columns_whitelist_has_10_entries() { let cols = apply::table_columns("attachments").unwrap(); assert_eq!(cols.len(), 10); assert_eq!(cols[0], "id"); assert!(cols.contains(&"blob_hash"), "blob_hash must be synced for blob resolution"); assert!(cols.contains(&"filename")); assert!(cols.contains(&"file_size")); assert!(cols.contains(&"mime_type")); } #[test] fn time_sessions_table_columns_whitelist_has_7_entries() { let cols = apply::table_columns("time_sessions").unwrap(); assert_eq!(cols.len(), 7); assert_eq!(cols[0], "id"); assert!(cols.contains(&"task_id")); assert!(cols.contains(&"duration_minutes")); } #[test] fn upsert_order_has_attachments_after_tasks() { let pos = |table: &str| UPSERT_ORDER.iter().position(|t| *t == table); // attachments reference tasks.id assert!(pos("tasks").unwrap() < pos("attachments").unwrap()); } #[test] fn delete_order_has_attachments_before_tasks() { let pos = |table: &str| DELETE_ORDER.iter().position(|t| *t == table); assert!(pos("attachments").unwrap() < pos("tasks").unwrap()); } #[test] fn upsert_order_has_time_sessions_after_tasks() { let pos = |table: &str| UPSERT_ORDER.iter().position(|t| *t == 
table); assert!(pos("tasks").unwrap() < pos("time_sessions").unwrap()); } #[test] fn delete_order_has_time_sessions_before_tasks() { let pos = |table: &str| DELETE_ORDER.iter().position(|t| *t == table); assert!(pos("time_sessions").unwrap() < pos("tasks").unwrap()); } #[tokio::test] async fn attachment_insert_fires_sync_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let att_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \ VALUES (?, ?, ?, 'report.pdf', 12345, 'application/pdf', 'abc123hash', datetime('now'))" ) .bind(&att_id).bind(&user_id).bind(&task_id) .execute(&pool).await.unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?" ).bind(&att_id).fetch_one(&pool).await.unwrap(); assert_eq!(count.0, 1, "attachment insert should fire sync trigger"); // Verify trigger data contains blob_hash (critical for blob resolution) let data: (String,) = sqlx::query_as( "SELECT data FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?" 
).bind(&att_id).fetch_one(&pool).await.unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); assert_eq!(parsed["blob_hash"], "abc123hash"); assert_eq!(parsed["filename"], "report.pdf"); assert_eq!(parsed["file_size"], 12345); } #[tokio::test] async fn attachment_delete_fires_sync_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; let att_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \ VALUES (?, ?, ?, 'file.txt', 100, 'text/plain', 'hash1', datetime('now'))" ) .bind(&att_id).bind(&user_id).bind(&task_id) .execute(&pool).await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); sqlx::query("DELETE FROM attachments WHERE id = ?") .bind(&att_id).execute(&pool).await.unwrap(); let row: (String,) = sqlx::query_as( "SELECT op FROM sync_changelog WHERE table_name = 'attachments' AND row_id = ?" ).bind(&att_id).fetch_one(&pool).await.unwrap(); assert_eq!(row.0, "DELETE"); } #[tokio::test] async fn apply_upsert_attachment_round_trip() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (project_id, task_id) = setup_project_and_task(&pool, &user_id).await; let att_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let data = json!({ "id": att_id, "user_id": user_id, "task_id": task_id, "project_id": project_id, "filename": "slides.pptx", "file_size": 98765, "mime_type": "application/vnd.ms-powerpoint", "blob_hash": "sha256-deadbeef", "source_email_id": null, "created_at": now, }); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "attachments", &att_id, &data).await.unwrap(); let row: (String, i64, String) = sqlx::query_as( "SELECT filename, file_size, blob_hash FROM attachments WHERE id = ?" 
).bind(&att_id).fetch_one(&pool).await.unwrap(); assert_eq!(row.0, "slides.pptx"); assert_eq!(row.1, 98765); assert_eq!(row.2, "sha256-deadbeef"); } #[tokio::test] async fn apply_delete_attachment() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; let att_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "attachments", &att_id, &json!({ "id": att_id, "user_id": user_id, "task_id": task_id, "project_id": null, "filename": "delete-me.txt", "file_size": 10, "mime_type": "text/plain", "blob_hash": "h", "source_email_id": null, "created_at": now, })).await.unwrap(); apply::apply_delete(&mut conn, "attachments", &att_id).await.unwrap(); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM attachments WHERE id = ?") .bind(&att_id).fetch_one(&pool).await.unwrap(); assert_eq!(count.0, 0); } #[tokio::test] async fn time_session_insert_fires_sync_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let session_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO time_sessions (id, task_id, user_id, started_at, created_at) \ VALUES (?, ?, ?, datetime('now'), datetime('now'))" ) .bind(&session_id).bind(&task_id).bind(&user_id) .execute(&pool).await.unwrap(); let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?" ).bind(&session_id).fetch_one(&pool).await.unwrap(); assert_eq!(count.0, 1, "time_session insert should fire sync trigger"); let data: (String,) = sqlx::query_as( "SELECT data FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?" 
).bind(&session_id).fetch_one(&pool).await.unwrap(); let parsed: serde_json::Value = serde_json::from_str(&data.0).unwrap(); assert_eq!(parsed["task_id"], task_id); } #[tokio::test] async fn time_session_update_fires_sync_trigger() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; let session_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO time_sessions (id, task_id, user_id, started_at, created_at) \ VALUES (?, ?, ?, datetime('now'), datetime('now'))" ) .bind(&session_id).bind(&task_id).bind(&user_id) .execute(&pool).await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); // Stop the timer sqlx::query("UPDATE time_sessions SET ended_at = datetime('now'), duration_minutes = 25 WHERE id = ?") .bind(&session_id).execute(&pool).await.unwrap(); let row: (String, String) = sqlx::query_as( "SELECT op, data FROM sync_changelog WHERE table_name = 'time_sessions' AND row_id = ?" ).bind(&session_id).fetch_one(&pool).await.unwrap(); assert_eq!(row.0, "UPDATE"); let parsed: serde_json::Value = serde_json::from_str(&row.1).unwrap(); assert_eq!(parsed["duration_minutes"], 25); } #[tokio::test] async fn apply_upsert_time_session_round_trip() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; let session_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); let data = json!({ "id": session_id, "task_id": task_id, "user_id": user_id, "started_at": now, "ended_at": now, "duration_minutes": 42, "created_at": now, }); let mut conn = pool.acquire().await.unwrap(); apply::apply_upsert(&mut conn, "time_sessions", &session_id, &data).await.unwrap(); let row: (String, i32) = sqlx::query_as( "SELECT task_id, duration_minutes FROM time_sessions WHERE id = ?" 
).bind(&session_id).fetch_one(&pool).await.unwrap(); assert_eq!(row.0, task_id); assert_eq!(row.1, 42); } #[tokio::test] async fn apply_remote_changes_attachment_with_task_in_same_batch() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let project_id = uuid::Uuid::new_v4().to_string(); let task_id = uuid::Uuid::new_v4().to_string(); let att_id = uuid::Uuid::new_v4().to_string(); let now = now_sql(); // Send task + attachment in same batch (attachment references task via FK) // apply_remote_changes must handle FK ordering correctly let changes = vec![ change("attachments", synckit_client::ChangeOp::Insert, &att_id, Some(json!({ "id": att_id, "user_id": user_id, "task_id": task_id, "project_id": project_id, "filename": "batch.pdf", "file_size": 500, "mime_type": "application/pdf", "blob_hash": "batchhash", "source_email_id": null, "created_at": now, }))), change("tasks", synckit_client::ChangeOp::Insert, &task_id, Some(json!({ "id": task_id, "project_id": project_id, "description": "Batch task", "status": "Pending", "priority": "Low", "due": null, "tags": null, "urgency": 10, "recurrence": "None", "created_at": now, "user_id": user_id, "recurrence_parent_id": null, "source_email_id": null, "snoozed_until": null, "waiting_for_response": false, "waiting_since": null, "expected_response_date": null, "scheduled_start": null, "scheduled_duration": null, "is_focus": false, "focus_set_at": null, "contact_id": null, "milestone_id": null, "completed_at": null, "estimated_minutes": null, "actual_minutes": 0, }))), change("projects", synckit_client::ChangeOp::Insert, &project_id, Some(json!({ "id": project_id, "name": "Batch project", "description": "", "project_type": "Job", "status": "Active", "created_at": now, "user_id": user_id, }))), ]; // Should succeed despite wrong input order (attachment before task before project) pull::apply_remote_changes(&pool, changes).await.unwrap(); let att: (String,) = sqlx::query_as("SELECT filename FROM 
attachments WHERE id = ?") .bind(&att_id).fetch_one(&pool).await.unwrap(); assert_eq!(att.0, "batch.pdf"); // Triggers should be suppressed -- no changelog entries sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sync_changelog") .fetch_one(&pool).await.unwrap(); assert_eq!(count.0, 0); } #[tokio::test] async fn initial_snapshot_captures_attachments_and_time_sessions() { let pool = setup_test_db().await; let user_id = create_test_user(&pool).await; let (_project_id, task_id) = setup_project_and_task(&pool, &user_id).await; // Insert attachment and time_session with triggers suppressed set_sync_state(&pool, "applying_remote", "1").await.unwrap(); let att_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO attachments (id, user_id, task_id, filename, file_size, mime_type, blob_hash, created_at) \ VALUES (?, ?, ?, 'snap.pdf', 100, 'application/pdf', 'snaphash', datetime('now'))" ) .bind(&att_id).bind(&user_id).bind(&task_id) .execute(&pool).await.unwrap(); let session_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO time_sessions (id, task_id, user_id, started_at, ended_at, duration_minutes, created_at) \ VALUES (?, ?, ?, datetime('now', '-1 hour'), datetime('now'), 60, datetime('now'))" ) .bind(&session_id).bind(&task_id).bind(&user_id) .execute(&pool).await.unwrap(); set_sync_state(&pool, "applying_remote", "0").await.unwrap(); sqlx::query("DELETE FROM sync_changelog").execute(&pool).await.unwrap(); let total = create_initial_snapshot(&pool).await.unwrap(); // Should capture: 1 project + 1 task + 1 attachment + 1 time_session = 4 assert_eq!(total, 4, "snapshot should capture projects, tasks, attachments, and time_sessions"); let att_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM sync_changelog WHERE table_name = 'attachments'" ).fetch_one(&pool).await.unwrap(); assert_eq!(att_count.0, 1); let ts_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM 
sync_changelog WHERE table_name = 'time_sessions'" ).fetch_one(&pool).await.unwrap(); assert_eq!(ts_count.0, 1); }