Skip to main content

max / goingson

31.3 KB · 666 lines History Blame Raw
1 //! SQLite implementation of the EmailRepository.
2 //!
3 //! Manages email messages with support for:
4 //! - IMAP synchronization tracking (message_id, imap_uid)
5 //! - Threading via in_reply_to and thread_id
6 //! - Read/archived status
7 //! - Project associations
8 //! - Snoozing and waiting-for-response tracking
9
10 use async_trait::async_trait;
11 use chrono::{DateTime, Utc};
12 use sqlx::SqlitePool;
13 use std::collections::HashSet;
14 use goingson_core::{
15 CoreError, Email, EmailAccountId, EmailId, EmailRepository, EmailThread, NewEmail,
16 NewEmailWithTracking, ProjectId, Result, UserId,
17 };
18 use std::collections::HashMap;
19
20 use crate::utils::{format_datetime, format_datetime_now, format_datetime_opt, parse_datetime, parse_uuid, parse_uuid_opt};
21
22 /// Column list for SELECT queries - avoids duplication across methods.
23 const EMAIL_SELECT_COLUMNS: &str = r#"e.id, e.project_id, p.name as project_name, e.from_address, e.to_address,
24 e.subject, e.body, e.html_body, e.is_read, e.is_archived, e.received_at, e.message_id,
25 e.in_reply_to, e.thread_id, e.email_account_id, e.is_outgoing, e.imap_uid, e.source_folder,
26 e.attachment_meta, e.labels, e.is_draft, e.cc_address, e.bcc_address, e.draft_account_id,
27 e.snoozed_until, e.waiting_for_response, e.waiting_since, e.expected_response_date"#;
28
29 #[derive(Debug, Clone, sqlx::FromRow)]
30 struct EmailRow {
31 pub id: String,
32 pub project_id: Option<String>,
33 pub project_name: Option<String>,
34 pub from_address: String,
35 pub to_address: String,
36 pub subject: String,
37 pub body: String,
38 pub html_body: Option<String>,
39 pub is_read: i32,
40 pub is_archived: i32,
41 pub received_at: String,
42 pub message_id: Option<String>,
43 pub in_reply_to: Option<String>,
44 pub thread_id: Option<String>,
45 pub email_account_id: Option<String>,
46 pub is_outgoing: i32,
47 pub imap_uid: Option<i64>,
48 pub source_folder: Option<String>,
49 pub attachment_meta: Option<String>,
50 pub labels: String,
51 pub is_draft: i32,
52 pub cc_address: Option<String>,
53 pub bcc_address: Option<String>,
54 pub draft_account_id: Option<String>,
55 pub snoozed_until: Option<String>,
56 pub waiting_for_response: i32,
57 pub waiting_since: Option<String>,
58 pub expected_response_date: Option<String>,
59 }
60
61 impl TryFrom<EmailRow> for Email {
62 type Error = CoreError;
63
64 fn try_from(row: EmailRow) -> std::result::Result<Self, Self::Error> {
65 Ok(Email {
66 id: parse_uuid(&row.id)?.into(),
67 project_id: parse_uuid_opt(row.project_id.as_deref())?.map(Into::into),
68 project_name: row.project_name,
69 from: row.from_address,
70 to: row.to_address,
71 subject: row.subject,
72 body: row.body,
73 html_body: row.html_body,
74 is_read: row.is_read != 0,
75 is_archived: row.is_archived != 0,
76 received_at: parse_datetime(&row.received_at)?,
77 message_id: row.message_id,
78 in_reply_to: row.in_reply_to,
79 thread_id: row.thread_id,
80 email_account_id: parse_uuid_opt(row.email_account_id.as_deref())?.map(Into::into),
81 is_outgoing: row.is_outgoing != 0,
82 imap_uid: row.imap_uid,
83 source_folder: row.source_folder,
84 attachment_meta: row.attachment_meta,
85 labels: serde_json::from_str(&row.labels).unwrap_or_default(),
86 is_draft: row.is_draft != 0,
87 cc_address: row.cc_address,
88 bcc_address: row.bcc_address,
89 draft_account_id: parse_uuid_opt(row.draft_account_id.as_deref())?.map(Into::into),
90 snoozed_until: row.snoozed_until.as_ref().map(|s| parse_datetime(s)).transpose()?,
91 waiting_for_response: row.waiting_for_response != 0,
92 waiting_since: row.waiting_since.as_ref().map(|s| parse_datetime(s)).transpose()?,
93 expected_response_date: row.expected_response_date.as_ref().map(|s| parse_datetime(s)).transpose()?,
94 })
95 }
96 }
97
98 /// SQLite-backed implementation of [`EmailRepository`].
99 ///
100 /// Manages email messages with threading support, snoozing, and
101 /// waiting-for-response tracking. Integrates with IMAP sync via message_id.
102 pub struct SqliteEmailRepository { pool: SqlitePool }
103
104 impl SqliteEmailRepository {
105 /// Creates a new repository instance with the given connection pool.
106 #[tracing::instrument(skip_all)]
107 pub fn new(pool: SqlitePool) -> Self { Self { pool } }
108 }
109
110 #[async_trait]
111 impl EmailRepository for SqliteEmailRepository {
112 #[tracing::instrument(skip_all)]
113 async fn list_all(&self, user_id: UserId, include_archived: bool) -> Result<Vec<Email>> {
114 let archived_filter = if include_archived { "" } else { "AND e.is_archived = 0" };
115 let query = format!(
116 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.is_draft = 0 {} ORDER BY e.received_at DESC",
117 EMAIL_SELECT_COLUMNS, archived_filter
118 );
119 let rows = sqlx::query_as::<_, EmailRow>(&query).bind(user_id.to_string()).bind(user_id.to_string()).fetch_all(&self.pool).await.map_err(CoreError::database)?;
120 rows.into_iter().map(Email::try_from).collect()
121 }
122
123 #[tracing::instrument(skip_all)]
124 async fn list_threaded(&self, user_id: UserId, include_archived: bool, offset: Option<i64>, limit: Option<i64>, folder: Option<&str>, label: Option<&str>) -> Result<(Vec<EmailThread>, i64)> {
125 let uid = user_id.to_string();
126 let archived_filter = if include_archived { "" } else { "AND e.is_archived = 0" };
127 let folder_filter = folder.map(|_| "AND e.source_folder = ?").unwrap_or("");
128 let label_filter = label.map(|_| "AND EXISTS (SELECT 1 FROM json_each(e.labels) j WHERE j.value = ?)").unwrap_or("");
129 let offset_val = offset.unwrap_or(0);
130 let limit_val = limit.unwrap_or(50);
131
132 // Query 1: Get total thread count
133 let count_sql = format!(
134 "SELECT COUNT(DISTINCT COALESCE(e.thread_id, e.id)) FROM emails e WHERE e.user_id = ? AND e.is_draft = 0 {} {} {}",
135 archived_filter, folder_filter, label_filter
136 );
137 let mut count_q = sqlx::query_as::<_, (i64,)>(&count_sql).bind(&uid);
138 if let Some(f) = folder { count_q = count_q.bind(f); }
139 if let Some(l) = label { count_q = count_q.bind(l); }
140 let (total,) = count_q.fetch_one(&self.pool).await.map_err(CoreError::database)?;
141
142 if total == 0 {
143 return Ok((vec![], 0));
144 }
145
146 // Query 2: Thread summary — group by thread, get latest received_at, count, unread status
147 #[derive(sqlx::FromRow)]
148 #[allow(dead_code)]
149 struct ThreadSummary {
150 thread_key: String,
151 latest_received_at: String, // needed for SQL ORDER BY
152 thread_count: i64,
153 unread_count: i64,
154 latest_email_id: String,
155 }
156
157 let summary_sql = format!(
158 r#"SELECT
159 COALESCE(e.thread_id, e.id) AS thread_key,
160 MAX(e.received_at) AS latest_received_at,
161 COUNT(*) AS thread_count,
162 SUM(CASE WHEN e.is_read = 0 THEN 1 ELSE 0 END) AS unread_count,
163 (SELECT e2.id FROM emails e2
164 WHERE COALESCE(e2.thread_id, e2.id) = COALESCE(e.thread_id, e.id)
165 AND e2.user_id = ? AND e2.is_draft = 0 {} {} {}
166 ORDER BY e2.received_at DESC LIMIT 1) AS latest_email_id
167 FROM emails e
168 WHERE e.user_id = ? AND e.is_draft = 0 {} {} {}
169 GROUP BY COALESCE(e.thread_id, e.id)
170 ORDER BY latest_received_at DESC
171 LIMIT ? OFFSET ?"#,
172 archived_filter, folder_filter, label_filter,
173 archived_filter, folder_filter, label_filter,
174 );
175
176 let mut summary_q = sqlx::query_as::<_, ThreadSummary>(&summary_sql).bind(&uid);
177 if let Some(f) = folder { summary_q = summary_q.bind(f); }
178 if let Some(l) = label { summary_q = summary_q.bind(l); }
179 summary_q = summary_q.bind(&uid);
180 if let Some(f) = folder { summary_q = summary_q.bind(f); }
181 if let Some(l) = label { summary_q = summary_q.bind(l); }
182 let summaries = summary_q
183 .bind(limit_val)
184 .bind(offset_val)
185 .fetch_all(&self.pool)
186 .await
187 .map_err(CoreError::database)?;
188
189 if summaries.is_empty() {
190 return Ok((vec![], total));
191 }
192
193 // Query 3: Fetch full emails for the page's most-recent-email IDs
194 let email_ids: Vec<String> = summaries.iter().map(|s| s.latest_email_id.clone()).collect();
195 let placeholders = email_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
196 let emails_sql = format!(
197 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.id IN ({}) AND e.user_id = ?",
198 EMAIL_SELECT_COLUMNS, placeholders
199 );
200
201 let mut q = sqlx::query_as::<_, EmailRow>(&emails_sql).bind(&uid);
202 for id in &email_ids {
203 q = q.bind(id);
204 }
205 q = q.bind(&uid);
206
207 let rows = q.fetch_all(&self.pool).await.map_err(CoreError::database)?;
208
209 let email_map: HashMap<String, Email> = rows
210 .into_iter()
211 .filter_map(|row| {
212 let id_str = row.id.clone();
213 Email::try_from(row).ok().map(|e| (id_str, e))
214 })
215 .collect();
216
217 // Assemble threads in summary order
218 let threads: Vec<EmailThread> = summaries
219 .into_iter()
220 .filter_map(|s| {
221 let email = email_map.get(&s.latest_email_id)?.clone();
222 Some(EmailThread {
223 thread_id: s.thread_key,
224 most_recent_email: email,
225 thread_count: s.thread_count as usize,
226 has_unread: s.unread_count > 0,
227 })
228 })
229 .collect();
230
231 Ok((threads, total))
232 }
233
234 #[tracing::instrument(skip_all)]
235 async fn list_by_project(&self, user_id: UserId, project_id: ProjectId) -> Result<Vec<Email>> {
236 let query = format!(
237 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.project_id = ? ORDER BY e.received_at DESC",
238 EMAIL_SELECT_COLUMNS
239 );
240 let rows = sqlx::query_as::<_, EmailRow>(&query).bind(user_id.to_string()).bind(user_id.to_string()).bind(project_id.to_string()).fetch_all(&self.pool).await.map_err(CoreError::database)?;
241 rows.into_iter().map(Email::try_from).collect()
242 }
243
244 #[tracing::instrument(skip_all)]
245 async fn list_by_addresses(&self, user_id: UserId, addresses: &[&str]) -> Result<Vec<Email>> {
246 if addresses.is_empty() {
247 return Ok(Vec::new());
248 }
249 let placeholders = addresses.iter().map(|_| "?").collect::<Vec<_>>().join(",");
250 let query = format!(
251 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND (LOWER(e.from_address) IN ({placeholders}) OR LOWER(e.to_address) IN ({placeholders})) ORDER BY e.received_at DESC LIMIT 200",
252 EMAIL_SELECT_COLUMNS
253 );
254 let mut q = sqlx::query_as::<_, EmailRow>(&query)
255 .bind(user_id.to_string())
256 .bind(user_id.to_string());
257 // Bind addresses twice (once for from_address IN, once for to_address IN)
258 for _ in 0..2 {
259 for addr in addresses {
260 q = q.bind(addr.to_lowercase());
261 }
262 }
263 let rows = q.fetch_all(&self.pool).await.map_err(CoreError::database)?;
264 rows.into_iter().map(Email::try_from).collect()
265 }
266
267 #[tracing::instrument(skip_all)]
268 async fn list_unlinked(&self, user_id: UserId) -> Result<Vec<Email>> {
269 let query = format!(
270 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.project_id IS NULL AND e.is_archived = 0 ORDER BY e.received_at DESC",
271 EMAIL_SELECT_COLUMNS
272 );
273 let rows = sqlx::query_as::<_, EmailRow>(&query).bind(user_id.to_string()).bind(user_id.to_string()).fetch_all(&self.pool).await.map_err(CoreError::database)?;
274 rows.into_iter().map(Email::try_from).collect()
275 }
276
277 #[tracing::instrument(skip_all)]
278 async fn get_by_id(&self, id: EmailId, user_id: UserId) -> Result<Option<Email>> {
279 let query = format!(
280 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.id = ? AND e.user_id = ?",
281 EMAIL_SELECT_COLUMNS
282 );
283 let row = sqlx::query_as::<_, EmailRow>(&query).bind(user_id.to_string()).bind(id.to_string()).bind(user_id.to_string()).fetch_optional(&self.pool).await.map_err(CoreError::database)?;
284 row.map(Email::try_from).transpose()
285 }
286
287 #[tracing::instrument(skip_all)]
288 async fn create(&self, user_id: UserId, email: NewEmail) -> Result<Email> {
289 let id = EmailId::new();
290 let received_at = format_datetime(&email.received_at.unwrap_or_else(Utc::now));
291 sqlx::query("INSERT INTO emails (id, user_id, project_id, from_address, to_address, subject, body, is_read, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
292 .bind(id.to_string()).bind(user_id.to_string()).bind(email.project_id.map(|p| p.to_string()))
293 .bind(&email.from_address).bind(&email.to_address).bind(&email.subject).bind(&email.body)
294 .bind(if email.is_read { 1 } else { 0 }).bind(&received_at)
295 .execute(&self.pool).await.map_err(CoreError::database)?;
296 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created email"))
297 }
298
299 #[tracing::instrument(skip_all)]
300 async fn create_with_tracking(&self, user_id: UserId, email: NewEmailWithTracking) -> Result<Email> {
301 let id = goingson_core::deterministic_email_id(email.message_id.as_deref());
302 let received_at = format_datetime(&email.received_at.unwrap_or_else(Utc::now));
303 sqlx::query("INSERT INTO emails (id, user_id, project_id, from_address, to_address, subject, body, html_body, is_read, is_archived, received_at, message_id, in_reply_to, thread_id, email_account_id, is_outgoing, imap_uid, source_folder, attachment_meta) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
304 .bind(id.to_string()).bind(user_id.to_string()).bind(email.project_id.map(|p| p.to_string()))
305 .bind(&email.from_address).bind(&email.to_address).bind(&email.subject).bind(&email.body).bind(&email.html_body)
306 .bind(if email.is_read { 1 } else { 0 }).bind(if email.is_archived { 1 } else { 0 }).bind(&received_at)
307 .bind(&email.message_id).bind(&email.in_reply_to).bind(&email.thread_id)
308 .bind(email.email_account_id.map(|a| a.to_string()))
309 .bind(if email.is_outgoing { 1 } else { 0 }).bind(email.imap_uid).bind(&email.source_folder)
310 .bind(&email.attachment_meta)
311 .execute(&self.pool).await.map_err(CoreError::database)?;
312 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created email"))
313 }
314
315 #[tracing::instrument(skip_all)]
316 async fn create_with_tracking_batch(&self, user_id: UserId, emails: Vec<NewEmailWithTracking>) -> Result<usize> {
317 if emails.is_empty() {
318 return Ok(0);
319 }
320
321 let mut count = 0usize;
322 let uid = user_id.to_string();
323
324 let mut tx = self.pool.begin().await.map_err(CoreError::database)?;
325
326 for email in emails {
327 let id = goingson_core::deterministic_email_id(email.message_id.as_deref());
328 let received_at = format_datetime(&email.received_at.unwrap_or_else(Utc::now));
329 let result = sqlx::query("INSERT OR IGNORE INTO emails (id, user_id, project_id, from_address, to_address, subject, body, html_body, is_read, is_archived, received_at, message_id, in_reply_to, thread_id, email_account_id, is_outgoing, imap_uid, source_folder, attachment_meta) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
330 .bind(id.to_string()).bind(&uid).bind(email.project_id.map(|p| p.to_string()))
331 .bind(&email.from_address).bind(&email.to_address).bind(&email.subject).bind(&email.body).bind(&email.html_body)
332 .bind(if email.is_read { 1 } else { 0 }).bind(if email.is_archived { 1 } else { 0 }).bind(&received_at)
333 .bind(&email.message_id).bind(&email.in_reply_to).bind(&email.thread_id)
334 .bind(email.email_account_id.map(|a| a.to_string()))
335 .bind(if email.is_outgoing { 1 } else { 0 }).bind(email.imap_uid).bind(&email.source_folder)
336 .bind(&email.attachment_meta)
337 .execute(&mut *tx).await.map_err(CoreError::database)?;
338 if result.rows_affected() > 0 {
339 count += 1;
340 }
341 }
342
343 tx.commit().await.map_err(CoreError::database)?;
344 Ok(count)
345 }
346
347 #[tracing::instrument(skip_all)]
348 async fn delete(&self, id: EmailId, user_id: UserId) -> Result<bool> {
349 let result = sqlx::query("DELETE FROM emails WHERE id = ? AND user_id = ?").bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
350 Ok(result.rows_affected() > 0)
351 }
352
353 #[tracing::instrument(skip_all)]
354 async fn mark_read(&self, id: EmailId, user_id: UserId) -> Result<bool> {
355 let result = sqlx::query("UPDATE emails SET is_read = 1 WHERE id = ? AND user_id = ?").bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
356 Ok(result.rows_affected() > 0)
357 }
358
359 #[tracing::instrument(skip_all)]
360 async fn mark_unread(&self, id: EmailId, user_id: UserId) -> Result<bool> {
361 let result = sqlx::query("UPDATE emails SET is_read = 0 WHERE id = ? AND user_id = ?").bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
362 Ok(result.rows_affected() > 0)
363 }
364
365 #[tracing::instrument(skip_all)]
366 async fn archive(&self, id: EmailId, user_id: UserId) -> Result<bool> {
367 let result = sqlx::query("UPDATE emails SET is_archived = 1 WHERE id = ? AND user_id = ?").bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
368 Ok(result.rows_affected() > 0)
369 }
370
371 #[tracing::instrument(skip_all)]
372 async fn unarchive(&self, id: EmailId, user_id: UserId) -> Result<bool> {
373 let result = sqlx::query("UPDATE emails SET is_archived = 0 WHERE id = ? AND user_id = ?").bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
374 Ok(result.rows_affected() > 0)
375 }
376
377 #[tracing::instrument(skip_all)]
378 async fn update_source_folder(&self, id: EmailId, user_id: UserId, new_folder: &str) -> Result<bool> {
379 let result = sqlx::query("UPDATE emails SET source_folder = ? WHERE id = ? AND user_id = ?").bind(new_folder).bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
380 Ok(result.rows_affected() > 0)
381 }
382
383 #[tracing::instrument(skip_all)]
384 async fn mark_all_read(&self, user_id: UserId) -> Result<u64> {
385 let result = sqlx::query("UPDATE emails SET is_read = 1 WHERE user_id = ? AND is_read = 0").bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
386 Ok(result.rows_affected())
387 }
388
389 #[tracing::instrument(skip_all)]
390 async fn link_to_project(&self, id: EmailId, user_id: UserId, project_id: Option<ProjectId>) -> Result<bool> {
391 let result = sqlx::query("UPDATE emails SET project_id = ? WHERE id = ? AND user_id = ?").bind(project_id.map(|p| p.to_string())).bind(id.to_string()).bind(user_id.to_string()).execute(&self.pool).await.map_err(CoreError::database)?;
392 Ok(result.rows_affected() > 0)
393 }
394
395 #[tracing::instrument(skip_all)]
396 async fn count_unread(&self, user_id: UserId) -> Result<i64> {
397 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM emails WHERE user_id = ? AND is_read = 0").bind(user_id.to_string()).fetch_one(&self.pool).await.map_err(CoreError::database)?;
398 Ok(row.0)
399 }
400
401 #[tracing::instrument(skip_all)]
402 async fn exists_by_message_id(&self, user_id: UserId, message_id: &str) -> Result<bool> {
403 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM emails WHERE user_id = ? AND message_id = ?").bind(user_id.to_string()).bind(message_id).fetch_one(&self.pool).await.map_err(CoreError::database)?;
404 Ok(row.0 > 0)
405 }
406
407 #[tracing::instrument(skip_all)]
408 async fn exists_by_message_ids(&self, user_id: UserId, message_ids: &[&str]) -> Result<HashSet<String>> {
409 if message_ids.is_empty() {
410 return Ok(HashSet::new());
411 }
412
413 let placeholders = message_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
414 let query = format!(
415 "SELECT message_id FROM emails WHERE user_id = ? AND message_id IN ({})",
416 placeholders
417 );
418
419 let mut q = sqlx::query_as::<_, (String,)>(&query).bind(user_id.to_string());
420 for msg_id in message_ids {
421 q = q.bind(*msg_id);
422 }
423
424 let rows = q.fetch_all(&self.pool).await
425 .map_err(CoreError::database)?;
426
427 Ok(rows.into_iter().map(|(id,)| id).collect())
428 }
429
430 #[tracing::instrument(skip_all)]
431 async fn exists_as_senders(&self, user_id: UserId, addresses: &[&str]) -> Result<HashSet<String>> {
432 if addresses.is_empty() {
433 return Ok(HashSet::new());
434 }
435
436 let placeholders = addresses.iter().map(|_| "?").collect::<Vec<_>>().join(",");
437 let query = format!(
438 "SELECT DISTINCT LOWER(from_address) FROM emails WHERE user_id = ? AND LOWER(from_address) IN ({})",
439 placeholders
440 );
441
442 let mut q = sqlx::query_as::<_, (String,)>(&query).bind(user_id.to_string());
443 for addr in addresses {
444 q = q.bind(addr.to_lowercase());
445 }
446
447 let rows = q.fetch_all(&self.pool).await
448 .map_err(CoreError::database)?;
449
450 Ok(rows.into_iter().map(|(a,)| a).collect())
451 }
452
453 #[tracing::instrument(skip_all)]
454 async fn snooze(&self, id: EmailId, user_id: UserId, until: DateTime<Utc>) -> Result<Option<Email>> {
455 let until_str = format_datetime(&until);
456 let result = sqlx::query("UPDATE emails SET snoozed_until = ? WHERE id = ? AND user_id = ?")
457 .bind(&until_str)
458 .bind(id.to_string())
459 .bind(user_id.to_string())
460 .execute(&self.pool)
461 .await
462 .map_err(CoreError::database)?;
463 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
464 }
465
466 #[tracing::instrument(skip_all)]
467 async fn unsnooze(&self, id: EmailId, user_id: UserId) -> Result<Option<Email>> {
468 let result = sqlx::query("UPDATE emails SET snoozed_until = NULL WHERE id = ? AND user_id = ?")
469 .bind(id.to_string())
470 .bind(user_id.to_string())
471 .execute(&self.pool)
472 .await
473 .map_err(CoreError::database)?;
474 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
475 }
476
477 #[tracing::instrument(skip_all)]
478 async fn list_snoozed(&self, user_id: UserId) -> Result<Vec<Email>> {
479 let query = format!(
480 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.snoozed_until IS NOT NULL AND datetime(e.snoozed_until) > datetime('now') ORDER BY e.snoozed_until ASC",
481 EMAIL_SELECT_COLUMNS
482 );
483 let rows = sqlx::query_as::<_, EmailRow>(&query)
484 .bind(user_id.to_string())
485 .bind(user_id.to_string())
486 .fetch_all(&self.pool)
487 .await
488 .map_err(CoreError::database)?;
489 rows.into_iter().map(Email::try_from).collect()
490 }
491
492 #[tracing::instrument(skip_all)]
493 async fn mark_waiting(&self, id: EmailId, user_id: UserId, expected_response: Option<DateTime<Utc>>) -> Result<Option<Email>> {
494 let now = format_datetime_now();
495 let expected = format_datetime_opt(expected_response);
496
497 let result = sqlx::query(
498 "UPDATE emails SET waiting_for_response = 1, waiting_since = ?, expected_response_date = ? WHERE id = ? AND user_id = ?"
499 )
500 .bind(&now)
501 .bind(&expected)
502 .bind(id.to_string())
503 .bind(user_id.to_string())
504 .execute(&self.pool)
505 .await
506 .map_err(CoreError::database)?;
507
508 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
509 }
510
511 #[tracing::instrument(skip_all)]
512 async fn clear_waiting(&self, id: EmailId, user_id: UserId) -> Result<Option<Email>> {
513 let result = sqlx::query(
514 "UPDATE emails SET waiting_for_response = 0, waiting_since = NULL, expected_response_date = NULL WHERE id = ? AND user_id = ?"
515 )
516 .bind(id.to_string())
517 .bind(user_id.to_string())
518 .execute(&self.pool)
519 .await
520 .map_err(CoreError::database)?;
521
522 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
523 }
524
525 #[tracing::instrument(skip_all)]
526 async fn list_waiting(&self, user_id: UserId) -> Result<Vec<Email>> {
527 let query = format!(
528 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.waiting_for_response = 1 ORDER BY e.expected_response_date ASC",
529 EMAIL_SELECT_COLUMNS
530 );
531 let rows = sqlx::query_as::<_, EmailRow>(&query)
532 .bind(user_id.to_string())
533 .bind(user_id.to_string())
534 .fetch_all(&self.pool)
535 .await
536 .map_err(CoreError::database)?;
537 rows.into_iter().map(Email::try_from).collect()
538 }
539
540 #[tracing::instrument(skip_all)]
541 async fn list_by_thread(&self, user_id: UserId, thread_id: &str) -> Result<Vec<Email>> {
542 let query = format!(
543 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.thread_id = ? ORDER BY e.received_at ASC",
544 EMAIL_SELECT_COLUMNS
545 );
546 let rows = sqlx::query_as::<_, EmailRow>(&query)
547 .bind(user_id.to_string())
548 .bind(user_id.to_string())
549 .bind(thread_id)
550 .fetch_all(&self.pool)
551 .await
552 .map_err(CoreError::database)?;
553 rows.into_iter().map(Email::try_from).collect()
554 }
555
556 #[tracing::instrument(skip_all)]
557 async fn list_drafts(&self, user_id: UserId) -> Result<Vec<Email>> {
558 let query = format!(
559 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.is_draft = 1 ORDER BY e.received_at DESC",
560 EMAIL_SELECT_COLUMNS
561 );
562 let rows = sqlx::query_as::<_, EmailRow>(&query)
563 .bind(user_id.to_string())
564 .bind(user_id.to_string())
565 .fetch_all(&self.pool)
566 .await
567 .map_err(CoreError::database)?;
568 rows.into_iter().map(Email::try_from).collect()
569 }
570
571 #[tracing::instrument(skip_all)]
572 async fn save_draft(&self, id: EmailId, user_id: UserId, from: &str, to: &str, cc: Option<&str>, bcc: Option<&str>, subject: &str, body: &str, account_id: Option<EmailAccountId>, in_reply_to: Option<&str>, _references: Option<&str>, thread_id: Option<&str>) -> Result<Email> {
573 let now = format_datetime_now();
574 let account_id_str = account_id.map(|a: EmailAccountId| a.to_string());
575
576 // Upsert: update if exists, insert if not
577 sqlx::query(r#"
578 INSERT INTO emails (id, user_id, from_address, to_address, cc_address, bcc_address, subject, body,
579 is_read, is_archived, is_draft, is_outgoing, received_at, draft_account_id, in_reply_to, thread_id)
580 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, 0, 1, 1, ?, ?, ?, ?)
581 ON CONFLICT(id) DO UPDATE SET
582 from_address = excluded.from_address,
583 to_address = excluded.to_address,
584 cc_address = excluded.cc_address,
585 bcc_address = excluded.bcc_address,
586 subject = excluded.subject,
587 body = excluded.body,
588 received_at = excluded.received_at,
589 draft_account_id = excluded.draft_account_id,
590 in_reply_to = excluded.in_reply_to,
591 thread_id = excluded.thread_id
592 "#)
593 .bind(id.to_string())
594 .bind(user_id.to_string())
595 .bind(from)
596 .bind(to)
597 .bind(cc)
598 .bind(bcc)
599 .bind(subject)
600 .bind(body)
601 .bind(&now)
602 .bind(&account_id_str)
603 .bind(in_reply_to)
604 .bind(thread_id)
605 .execute(&self.pool)
606 .await
607 .map_err(CoreError::database)?;
608
609 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve saved draft"))
610 }
611
612 #[tracing::instrument(skip_all)]
613 async fn get_by_message_id(&self, user_id: UserId, message_id: &str) -> Result<Option<Email>> {
614 let query = format!(
615 "SELECT {} FROM emails e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? WHERE e.user_id = ? AND e.message_id = ?",
616 EMAIL_SELECT_COLUMNS
617 );
618 let row = sqlx::query_as::<_, EmailRow>(&query)
619 .bind(user_id.to_string())
620 .bind(user_id.to_string())
621 .bind(message_id)
622 .fetch_optional(&self.pool)
623 .await
624 .map_err(CoreError::database)?;
625 row.map(Email::try_from).transpose()
626 }
627
628 #[tracing::instrument(skip_all)]
629 async fn update_labels(&self, id: EmailId, user_id: UserId, labels: &[String]) -> Result<Option<Email>> {
630 let labels_json = serde_json::to_string(labels).unwrap_or_else(|_| "[]".to_string());
631 let result = sqlx::query("UPDATE emails SET labels = ? WHERE id = ? AND user_id = ?")
632 .bind(&labels_json)
633 .bind(id.to_string())
634 .bind(user_id.to_string())
635 .execute(&self.pool)
636 .await
637 .map_err(CoreError::database)?;
638 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
639 }
640
641 #[tracing::instrument(skip_all)]
642 async fn list_folders(&self, user_id: UserId) -> Result<Vec<String>> {
643 let rows: Vec<(String,)> = sqlx::query_as(
644 "SELECT DISTINCT source_folder FROM emails WHERE user_id = ? AND source_folder IS NOT NULL AND is_draft = 0 ORDER BY source_folder ASC"
645 )
646 .bind(user_id.to_string())
647 .fetch_all(&self.pool)
648 .await
649 .map_err(CoreError::database)?;
650 Ok(rows.into_iter().map(|r| r.0).collect())
651 }
652
653 #[tracing::instrument(skip_all)]
654 async fn list_labels(&self, user_id: UserId) -> Result<Vec<String>> {
655 // Extract all unique labels across all emails via JSON parsing
656 let rows: Vec<(String,)> = sqlx::query_as(
657 "SELECT DISTINCT j.value FROM emails e, json_each(e.labels) j WHERE e.user_id = ? AND e.is_draft = 0 ORDER BY j.value ASC"
658 )
659 .bind(user_id.to_string())
660 .fetch_all(&self.pool)
661 .await
662 .map_err(CoreError::database)?;
663 Ok(rows.into_iter().map(|r| r.0).collect())
664 }
665 }
666