//! Email sync business logic. //! //! Provides the dedup-save-clear-waiting loop shared by IMAP and JMAP sync paths. //! The command layer converts protocol-specific parsed emails into [`FetchedEmail`], //! then calls [`process_fetched_emails`] to handle deduplication, persistence, //! and waiting-status clearing. use chrono::{DateTime, Utc}; use crate::error::CoreError; use crate::id_types::{EmailAccountId, UserId}; use crate::models::NewEmailWithTracking; use crate::repository::EmailRepository; /// A protocol-agnostic fetched email, ready for dedup and save. /// /// Both IMAP `ParsedEmail` and JMAP `JmapParsedEmail` are converted to this /// before being processed. pub struct FetchedEmail { pub message_id: Option, pub in_reply_to: Option, /// First entry from the RFC 2822 References header (the thread root message-ID). pub references_root: Option, pub from: String, pub to: String, pub subject: String, pub body: String, pub html_body: Option, pub is_read: bool, pub date: DateTime, pub source_folder: String, pub imap_uid: Option, pub is_archived: bool, pub attachment_meta: Option, } /// Result of processing a batch of fetched emails. #[derive(Debug, Default)] pub struct SyncProcessResult { /// Number of emails that were new and saved. pub emails_saved: usize, /// Number of waiting statuses cleared because a reply was received. pub waiting_cleared: usize, } /// Deduplicates, saves, and clears waiting status for a batch of fetched emails. /// /// This is the shared logic for both IMAP and JMAP sync paths: /// 1. Batch-check which message IDs already exist /// 2. Batch-insert all new emails (single transaction, no post-insert SELECTs) /// 3. For replies to messages we're waiting on, clear the waiting status pub async fn process_fetched_emails( email_repo: &dyn EmailRepository, user_id: UserId, account_id: EmailAccountId, emails: Vec, ) -> Result { let mut result = SyncProcessResult::default(); // Ensure every email has a message_id for dedup: real if present, else synthetic hash let mut emails = emails; for e in &mut emails { if e.message_id.is_none() { // Use SHA-256 for a stable hash across Rust versions (DefaultHasher is not stable). use sha2::{Sha256, Digest}; let mut h = Sha256::new(); h.update(e.from.as_bytes()); h.update(b"\x00"); h.update(e.to.as_bytes()); h.update(b"\x00"); h.update(e.subject.as_bytes()); h.update(b"\x00"); h.update(e.date.to_rfc3339().as_bytes()); let hash = h.finalize(); e.message_id = Some(format!("synth-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}", hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7])); } } // Batch check existing message IDs let msg_ids: Vec<&str> = emails.iter() .filter_map(|e| e.message_id.as_deref()) .collect(); let existing_ids = email_repo .exists_by_message_ids(user_id, &msg_ids) .await?; // Collect new emails for batch insert, track reply-to IDs for waiting-status clearing let mut new_emails = Vec::new(); let mut reply_to_ids: Vec = Vec::new(); for email in emails { if let Some(ref msg_id) = email.message_id { if existing_ids.contains(msg_id) { continue; } } // thread_id groups conversations: use in_reply_to if this is a reply, // otherwise fall back to message_id (starts a new thread). This means // the first email in a thread has thread_id == message_id. let thread_id = email.references_root.clone() .or_else(|| email.in_reply_to.clone()) .or_else(|| email.message_id.clone()); if let Some(ref reply_to) = email.in_reply_to { reply_to_ids.push(reply_to.clone()); } new_emails.push(NewEmailWithTracking { project_id: None, from_address: email.from, to_address: email.to, subject: email.subject, body: email.body, html_body: email.html_body, is_read: email.is_read, is_archived: email.is_archived, received_at: Some(email.date), message_id: email.message_id, in_reply_to: email.in_reply_to, thread_id, imap_uid: email.imap_uid, source_folder: Some(email.source_folder), email_account_id: Some(account_id), is_outgoing: false, attachment_meta: email.attachment_meta, }); } // Batch insert — single transaction, no post-insert SELECTs result.emails_saved = email_repo.create_with_tracking_batch(user_id, new_emails).await?; // Clear waiting status for emails that received replies for reply_to_msg_id in &reply_to_ids { if let Ok(Some(original)) = email_repo.get_by_message_id(user_id, reply_to_msg_id).await { if original.waiting_for_response { let _ = email_repo.clear_waiting(original.id, user_id).await; result.waiting_cleared += 1; } } } Ok(result) } #[cfg(test)] mod tests { use super::*; #[test] fn sync_process_result_default_all_zeros() { let r = SyncProcessResult::default(); assert_eq!(r.emails_saved, 0); assert_eq!(r.waiting_cleared, 0); } #[test] fn fetched_email_construction() { let email = FetchedEmail { message_id: Some("msg-1@example.com".to_string()), in_reply_to: None, references_root: None, from: "sender@example.com".to_string(), to: "recipient@example.com".to_string(), subject: "Test".to_string(), body: "Hello".to_string(), html_body: None, is_read: false, date: Utc::now(), source_folder: "INBOX".to_string(), imap_uid: Some(42), is_archived: false, attachment_meta: None, }; assert_eq!(email.from, "sender@example.com"); assert_eq!(email.imap_uid, Some(42)); assert!(!email.is_read); assert!(!email.is_archived); assert!(email.attachment_meta.is_none()); } }