Skip to main content

max / goingson

6.4 KB · 184 lines History Blame Raw
1 //! Email sync business logic.
2 //!
3 //! Provides the dedup-save-clear-waiting loop shared by IMAP and JMAP sync paths.
4 //! The command layer converts protocol-specific parsed emails into [`FetchedEmail`],
5 //! then calls [`process_fetched_emails`] to handle deduplication, persistence,
6 //! and waiting-status clearing.
7
8 use chrono::{DateTime, Utc};
9
10 use crate::error::CoreError;
11 use crate::id_types::{EmailAccountId, UserId};
12 use crate::models::NewEmailWithTracking;
13 use crate::repository::EmailRepository;
14
15 /// A protocol-agnostic fetched email, ready for dedup and save.
16 ///
17 /// Both IMAP `ParsedEmail` and JMAP `JmapParsedEmail` are converted to this
18 /// before being processed.
19 pub struct FetchedEmail {
20 pub message_id: Option<String>,
21 pub in_reply_to: Option<String>,
22 /// First entry from the RFC 2822 References header (the thread root message-ID).
23 pub references_root: Option<String>,
24 pub from: String,
25 pub to: String,
26 pub subject: String,
27 pub body: String,
28 pub html_body: Option<String>,
29 pub is_read: bool,
30 pub date: DateTime<Utc>,
31 pub source_folder: String,
32 pub imap_uid: Option<i64>,
33 pub is_archived: bool,
34 pub attachment_meta: Option<String>,
35 }
36
37 /// Result of processing a batch of fetched emails.
38 #[derive(Debug, Default)]
39 pub struct SyncProcessResult {
40 /// Number of emails that were new and saved.
41 pub emails_saved: usize,
42 /// Number of waiting statuses cleared because a reply was received.
43 pub waiting_cleared: usize,
44 }
45
46 /// Deduplicates, saves, and clears waiting status for a batch of fetched emails.
47 ///
48 /// This is the shared logic for both IMAP and JMAP sync paths:
49 /// 1. Batch-check which message IDs already exist
50 /// 2. Batch-insert all new emails (single transaction, no post-insert SELECTs)
51 /// 3. For replies to messages we're waiting on, clear the waiting status
52 pub async fn process_fetched_emails(
53 email_repo: &dyn EmailRepository,
54 user_id: UserId,
55 account_id: EmailAccountId,
56 emails: Vec<FetchedEmail>,
57 ) -> Result<SyncProcessResult, CoreError> {
58 let mut result = SyncProcessResult::default();
59
60 // Ensure every email has a message_id for dedup: real if present, else synthetic hash
61 let mut emails = emails;
62 for e in &mut emails {
63 if e.message_id.is_none() {
64 // Use SHA-256 for a stable hash across Rust versions (DefaultHasher is not stable).
65 use sha2::{Sha256, Digest};
66 let mut h = Sha256::new();
67 h.update(e.from.as_bytes());
68 h.update(b"\x00");
69 h.update(e.to.as_bytes());
70 h.update(b"\x00");
71 h.update(e.subject.as_bytes());
72 h.update(b"\x00");
73 h.update(e.date.to_rfc3339().as_bytes());
74 let hash = h.finalize();
75 e.message_id = Some(format!("synth-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
76 hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]));
77 }
78 }
79
80 // Batch check existing message IDs
81 let msg_ids: Vec<&str> = emails.iter()
82 .filter_map(|e| e.message_id.as_deref())
83 .collect();
84
85 let existing_ids = email_repo
86 .exists_by_message_ids(user_id, &msg_ids)
87 .await?;
88
89 // Collect new emails for batch insert, track reply-to IDs for waiting-status clearing
90 let mut new_emails = Vec::new();
91 let mut reply_to_ids: Vec<String> = Vec::new();
92
93 for email in emails {
94 if let Some(ref msg_id) = email.message_id {
95 if existing_ids.contains(msg_id) {
96 continue;
97 }
98 }
99
100 // thread_id groups conversations: use in_reply_to if this is a reply,
101 // otherwise fall back to message_id (starts a new thread). This means
102 // the first email in a thread has thread_id == message_id.
103 let thread_id = email.references_root.clone()
104 .or_else(|| email.in_reply_to.clone())
105 .or_else(|| email.message_id.clone());
106
107 if let Some(ref reply_to) = email.in_reply_to {
108 reply_to_ids.push(reply_to.clone());
109 }
110
111 new_emails.push(NewEmailWithTracking {
112 project_id: None,
113 from_address: email.from,
114 to_address: email.to,
115 subject: email.subject,
116 body: email.body,
117 html_body: email.html_body,
118 is_read: email.is_read,
119 is_archived: email.is_archived,
120 received_at: Some(email.date),
121 message_id: email.message_id,
122 in_reply_to: email.in_reply_to,
123 thread_id,
124 imap_uid: email.imap_uid,
125 source_folder: Some(email.source_folder),
126 email_account_id: Some(account_id),
127 is_outgoing: false,
128 attachment_meta: email.attachment_meta,
129 });
130 }
131
132 // Batch insert — single transaction, no post-insert SELECTs
133 result.emails_saved = email_repo.create_with_tracking_batch(user_id, new_emails).await?;
134
135 // Clear waiting status for emails that received replies
136 for reply_to_msg_id in &reply_to_ids {
137 if let Ok(Some(original)) = email_repo.get_by_message_id(user_id, reply_to_msg_id).await {
138 if original.waiting_for_response {
139 let _ = email_repo.clear_waiting(original.id, user_id).await;
140 result.waiting_cleared += 1;
141 }
142 }
143 }
144
145 Ok(result)
146 }
147
148 #[cfg(test)]
149 mod tests {
150 use super::*;
151
152 #[test]
153 fn sync_process_result_default_all_zeros() {
154 let r = SyncProcessResult::default();
155 assert_eq!(r.emails_saved, 0);
156 assert_eq!(r.waiting_cleared, 0);
157 }
158
159 #[test]
160 fn fetched_email_construction() {
161 let email = FetchedEmail {
162 message_id: Some("msg-1@example.com".to_string()),
163 in_reply_to: None,
164 references_root: None,
165 from: "sender@example.com".to_string(),
166 to: "recipient@example.com".to_string(),
167 subject: "Test".to_string(),
168 body: "Hello".to_string(),
169 html_body: None,
170 is_read: false,
171 date: Utc::now(),
172 source_folder: "INBOX".to_string(),
173 imap_uid: Some(42),
174 is_archived: false,
175 attachment_meta: None,
176 };
177 assert_eq!(email.from, "sender@example.com");
178 assert_eq!(email.imap_uid, Some(42));
179 assert!(!email.is_read);
180 assert!(!email.is_archived);
181 assert!(email.attachment_meta.is_none());
182 }
183 }
184