//! Application-level data migrations that require Rust logic (not expressible in SQL). use goingson_core::email_id::deterministic_email_id; use sqlx::{Acquire, SqlitePool}; use tracing::{info, warn}; const MIGRATION_KEY: &str = "migration_deterministic_email_ids"; /// Rehash existing emails from random v4 IDs to deterministic v5 IDs. /// /// Reads all emails with a message_id, computes UUID v5, then updates: /// 1. The email's PK (emails.id) /// 2. The FK in tasks.source_email_id /// 3. FTS triggers auto-handle the PK change on UPDATE /// /// Tracked via sync_state table — runs once, skipped on subsequent launches. #[tracing::instrument(skip_all)] pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), String> { // Check if already done let done: Option<(String,)> = sqlx::query_as( "SELECT value FROM sync_state WHERE key = ?", ) .bind(MIGRATION_KEY) .fetch_optional(pool) .await .map_err(|e| format!("Failed to check migration state: {e}"))?; if done.as_ref().map(|r| r.0.as_str()) == Some("1") { return Ok(()); } info!("Running deterministic email ID migration"); // Fetch all emails that have a message_id and still use v4 IDs let rows: Vec<(String, String)> = sqlx::query_as( "SELECT id, message_id FROM emails WHERE message_id IS NOT NULL", ) .fetch_all(pool) .await .map_err(|e| format!("Failed to fetch emails: {e}"))?; let mut updated = 0u64; // Use a dedicated connection with FK enforcement off, wrapped in a transaction let mut conn = pool.acquire().await .map_err(|e| format!("Failed to acquire connection: {e}"))?; sqlx::query("PRAGMA foreign_keys = OFF") .execute(&mut *conn) .await .map_err(|e| format!("Failed to disable FK: {e}"))?; let mut tx = conn.begin().await .map_err(|e| format!("Failed to begin transaction: {e}"))?; for (old_id, message_id) in &rows { let new_id = deterministic_email_id(Some(message_id)); let new_id_str = new_id.to_string(); if *old_id == new_id_str { continue; // Already correct } // Update the email PK let result = sqlx::query("UPDATE emails SET id = ? WHERE id = ?") .bind(&new_id_str) .bind(old_id) .execute(&mut *tx) .await; match result { Ok(_) => { // Update FK references in tasks sqlx::query( "UPDATE tasks SET source_email_id = ? WHERE source_email_id = ?", ) .bind(&new_id_str) .bind(old_id) .execute(&mut *tx) .await .map_err(|e| format!("Failed to update task FK for email {old_id}: {e}"))?; updated += 1; } Err(e) => { // PK collision = two emails mapped to same v5 UUID (duplicate message_id). // Skip — the first one wins. warn!("Skipping email {old_id}: {e}"); } } } tx.commit().await .map_err(|e| format!("Failed to commit migration: {e}"))?; // Re-enable FK enforcement on the same connection sqlx::query("PRAGMA foreign_keys = ON") .execute(&mut *conn) .await .map_err(|e| format!("Failed to re-enable FK: {e}"))?; // Mark migration complete sqlx::query( "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?, '1')", ) .bind(MIGRATION_KEY) .execute(pool) .await .map_err(|e| format!("Failed to mark migration done: {e}"))?; info!("Deterministic email ID migration complete: {updated} emails updated"); Ok(()) }