Skip to main content

max / goingson

3.7 KB · 114 lines History Blame Raw
1 //! Application-level data migrations that require Rust logic (not expressible in SQL).
2
3 use goingson_core::email_id::deterministic_email_id;
4 use sqlx::{Acquire, SqlitePool};
5 use tracing::{info, warn};
6
7 const MIGRATION_KEY: &str = "migration_deterministic_email_ids";
8
9 /// Rehash existing emails from random v4 IDs to deterministic v5 IDs.
10 ///
11 /// Reads all emails with a message_id, computes UUID v5, then updates:
12 /// 1. The email's PK (emails.id)
13 /// 2. The FK in tasks.source_email_id
14 /// 3. FTS triggers auto-handle the PK change on UPDATE
15 ///
16 /// Tracked via sync_state table — runs once, skipped on subsequent launches.
17 #[tracing::instrument(skip_all)]
18 pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), String> {
19 // Check if already done
20 let done: Option<(String,)> = sqlx::query_as(
21 "SELECT value FROM sync_state WHERE key = ?",
22 )
23 .bind(MIGRATION_KEY)
24 .fetch_optional(pool)
25 .await
26 .map_err(|e| format!("Failed to check migration state: {e}"))?;
27
28 if done.as_ref().map(|r| r.0.as_str()) == Some("1") {
29 return Ok(());
30 }
31
32 info!("Running deterministic email ID migration");
33
34 // Fetch all emails that have a message_id and still use v4 IDs
35 let rows: Vec<(String, String)> = sqlx::query_as(
36 "SELECT id, message_id FROM emails WHERE message_id IS NOT NULL",
37 )
38 .fetch_all(pool)
39 .await
40 .map_err(|e| format!("Failed to fetch emails: {e}"))?;
41
42 let mut updated = 0u64;
43
44 // Use a dedicated connection with FK enforcement off, wrapped in a transaction
45 let mut conn = pool.acquire().await
46 .map_err(|e| format!("Failed to acquire connection: {e}"))?;
47
48 sqlx::query("PRAGMA foreign_keys = OFF")
49 .execute(&mut *conn)
50 .await
51 .map_err(|e| format!("Failed to disable FK: {e}"))?;
52
53 let mut tx = conn.begin().await
54 .map_err(|e| format!("Failed to begin transaction: {e}"))?;
55
56 for (old_id, message_id) in &rows {
57 let new_id = deterministic_email_id(Some(message_id));
58 let new_id_str = new_id.to_string();
59
60 if *old_id == new_id_str {
61 continue; // Already correct
62 }
63
64 // Update the email PK
65 let result = sqlx::query("UPDATE emails SET id = ? WHERE id = ?")
66 .bind(&new_id_str)
67 .bind(old_id)
68 .execute(&mut *tx)
69 .await;
70
71 match result {
72 Ok(_) => {
73 // Update FK references in tasks
74 sqlx::query(
75 "UPDATE tasks SET source_email_id = ? WHERE source_email_id = ?",
76 )
77 .bind(&new_id_str)
78 .bind(old_id)
79 .execute(&mut *tx)
80 .await
81 .map_err(|e| format!("Failed to update task FK for email {old_id}: {e}"))?;
82
83 updated += 1;
84 }
85 Err(e) => {
86 // PK collision = two emails mapped to same v5 UUID (duplicate message_id).
87 // Skip — the first one wins.
88 warn!("Skipping email {old_id}: {e}");
89 }
90 }
91 }
92
93 tx.commit().await
94 .map_err(|e| format!("Failed to commit migration: {e}"))?;
95
96 // Re-enable FK enforcement on the same connection
97 sqlx::query("PRAGMA foreign_keys = ON")
98 .execute(&mut *conn)
99 .await
100 .map_err(|e| format!("Failed to re-enable FK: {e}"))?;
101
102 // Mark migration complete
103 sqlx::query(
104 "INSERT OR REPLACE INTO sync_state (key, value) VALUES (?, '1')",
105 )
106 .bind(MIGRATION_KEY)
107 .execute(pool)
108 .await
109 .map_err(|e| format!("Failed to mark migration done: {e}"))?;
110
111 info!("Deterministic email ID migration complete: {updated} emails updated");
112 Ok(())
113 }
114