//! Email synchronization commands. //! //! Provides IMAP and JMAP email sync, using `goingson_core::email_sync` for the //! shared dedup-save-clear-waiting loop. use serde::Serialize; use sha2::{Sha256, Digest}; use std::path::Path; use std::sync::Arc; use tauri::State; use tracing::{instrument, warn}; use uuid::Uuid; use goingson_core::email_sync::{FetchedEmail, process_fetched_emails}; use goingson_core::{AttachmentMeta, EmailAccount, EmailAccountId}; use crate::email::{AttachmentPart, ParsedEmail}; use crate::email::{uses_jmap, ImapClient}; use crate::jmap::JmapClient; use crate::oauth::{CredentialStore, TokenManager}; use crate::state::{AppState, DESKTOP_USER_ID}; use super::{ApiError, OptionApiError, OptionNotFound, ResultApiError}; use super::email_account::{uses_oauth_imap, get_account_password, get_valid_access_token}; // ============ Types ============ #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct SyncResponse { pub emails_fetched: usize, pub emails_saved: usize, pub inbox_fetched: usize, pub archive_fetched: usize, pub message: String, pub debug_info: Option, } // ============ Commands ============ /// Inner sync function callable from both the command and the scheduler. #[instrument(skip_all)] pub async fn sync_email_account_inner( state: &Arc, id: EmailAccountId, full_sync: Option, ) -> Result { // Acquire per-account lock to prevent concurrent syncs on the same account if !state.email_sync_locks.lock().unwrap_or_else(|e| e.into_inner()).insert(id) { return Err(ApiError::bad_request("Sync already in progress for this account")); } let result = async { let account = state.email_accounts .get_by_id(id, DESKTOP_USER_ID) .await? .or_not_found("emailAccount", id)?; let result = if uses_jmap(&account) { sync_jmap_account_inner(state, &account, id, full_sync).await? } else { sync_imap_account_inner(state, &account, id, full_sync).await? }; state.email_accounts.update_last_sync(id, DESKTOP_USER_ID).await?; Ok(result) }.await; // Always release the lock. The lock is not held across the await, so it cannot be poisoned // by an async-block panic; unwrap_or_else handles the unreachable poison case defensively. state.email_sync_locks .lock() .unwrap_or_else(|p| p.into_inner()) .remove(&id); result } /// Synchronizes emails for an account. #[tauri::command] #[instrument(skip_all)] pub async fn sync_email_account(state: State<'_, Arc>, id: EmailAccountId, full_sync: Option) -> Result { sync_email_account_inner(&state, id, full_sync).await } /// Syncs an IMAP email account. /// /// Fetches from INBOX and Archive sequentially (single IMAP connection), then /// processes both folders concurrently — hashing attachments, saving to DB, and /// writing blobs in parallel for each folder. #[instrument(skip_all)] async fn sync_imap_account_inner( state: &Arc, account: &EmailAccount, id: EmailAccountId, full_sync: Option, ) -> Result { let imap_client = if uses_oauth_imap(account) { let access_token = get_valid_access_token(state, account).await?; ImapClient::with_oauth( &account.imap_server, account.imap_port as u16, &account.email_address, &access_token, ) } else { let password = get_account_password(account)?; ImapClient::with_password(account, &password) }; let archive_folder = account.archive_folder_name.clone().unwrap_or_else(|| "Archive".to_string()); let is_full_sync = full_sync.unwrap_or(false); let mut debug_parts = Vec::new(); let since = if is_full_sync { None } else { account.last_sync_at }; debug_parts.push(format!("since filter: {:?}, full_sync: {:?}", since.map(|d| d.to_rfc3339()), full_sync)); let blobs_dir = state.data_dir.join("blobs"); // --- Fetch sync states --- let inbox_sync_state = if is_full_sync { None } else { state.email_accounts.get_folder_sync_state(id, "INBOX").await.ok().flatten() }; let archive_sync_state = if is_full_sync { None } else { state.email_accounts.get_folder_sync_state(id, &archive_folder).await.ok().flatten() }; // --- IMAP fetches (sequential, single connection) --- let inbox_result = imap_client .fetch_emails_incremental("INBOX", inbox_sync_state.as_ref(), since) .await .map_api_err("Failed to fetch INBOX", ApiError::external_service)?; let inbox_fetched = inbox_result.emails.len(); debug_parts.push(format!("INBOX: {}", inbox_result.debug_info)); let archive_result = match imap_client .fetch_emails_incremental(&archive_folder, archive_sync_state.as_ref(), since) .await { Ok(result) => { debug_parts.push(format!("Archive: {}", result.debug_info)); Some(result) } Err(e) => { warn!("Could not sync archive folder '{}': {}", archive_folder, e); None } }; let archive_fetched = archive_result.as_ref().map_or(0, |r| r.emails.len()); let emails_fetched = inbox_fetched + archive_fetched; // Extract sync metadata before consuming emails let archive_uid_validity = archive_result.as_ref().and_then(|r| r.uid_validity); let archive_max_uid = archive_result.as_ref().and_then(|r| r.max_uid_fetched); let archive_emails = archive_result.map(|r| r.emails).unwrap_or_default(); // --- Process both folders concurrently (hash + DB save + blob write) --- let inbox_process = process_folder_emails(state, &blobs_dir, id, inbox_result.emails, false); let archive_process = process_folder_emails(state, &blobs_dir, id, archive_emails, true); let (inbox_saved, archive_saved) = tokio::try_join!(inbox_process, archive_process)?; let total_saved = inbox_saved + archive_saved; // --- Update sync states --- if let Some(validity) = inbox_result.uid_validity { if let Some(ref prev) = inbox_sync_state { if prev.uid_validity != validity { state.email_accounts.delete_folder_sync_state(id, "INBOX").await.ok(); } } let new_max = inbox_result.max_uid_fetched .unwrap_or_else(|| inbox_sync_state.as_ref().map_or(0, |s| s.last_seen_uid)); if new_max > 0 { state.email_accounts.upsert_folder_sync_state(id, "INBOX", validity, new_max).await.ok(); } } if let Some(validity) = archive_uid_validity { if let Some(ref prev) = archive_sync_state { if prev.uid_validity != validity { state.email_accounts.delete_folder_sync_state(id, &archive_folder).await.ok(); } } let new_max = archive_max_uid .unwrap_or_else(|| archive_sync_state.as_ref().map_or(0, |s| s.last_seen_uid)); if new_max > 0 { state.email_accounts.upsert_folder_sync_state(id, &archive_folder, validity, new_max).await.ok(); } } Ok(SyncResponse { emails_fetched, emails_saved: total_saved, inbox_fetched, archive_fetched, message: format!("Found {} in INBOX, {} in Archive. Saved {} new emails.", inbox_fetched, archive_fetched, total_saved), debug_info: Some(debug_parts.join(" | ")), }) } /// Process fetched emails for a single folder: hash attachments, save to DB, /// then write blobs to disk. Blob I/O is deferred until after the DB save so /// large attachments don't block the sync pipeline. async fn process_folder_emails( state: &Arc, blobs_dir: &Path, account_id: EmailAccountId, emails: Vec, is_archived: bool, ) -> Result { if emails.is_empty() { return Ok(0); } // Phase 1: Hash attachments and build FetchedEmail structs (CPU-bound, no disk I/O) let (fetched_emails, pending_blobs) = tokio::task::spawn_blocking(move || { let mut all_pending: Vec<(String, Vec)> = Vec::new(); let fetched: Vec = emails.into_iter().map(|p| { let (attachment_meta, pending) = build_attachment_meta_deferred(p.attachments); all_pending.extend(pending); FetchedEmail { message_id: p.message_id, in_reply_to: p.in_reply_to, references_root: p.references_root, from: p.from, to: p.to, subject: p.subject, body: p.body, html_body: p.html_body, is_read: p.is_read, date: p.date, source_folder: p.source_folder, imap_uid: Some(p.imap_uid as i64), is_archived, attachment_meta, } }).collect(); (fetched, all_pending) }).await.map_err(|e| ApiError::internal(format!("Attachment processing failed: {e}")))?; // Phase 2: Save to DB immediately (metadata is ready) let save_result = process_fetched_emails( state.emails.as_ref(), DESKTOP_USER_ID, account_id, fetched_emails, ).await?; // Phase 3: Write blobs to disk (deferred — large file I/O after DB commit) if !pending_blobs.is_empty() { let blobs_dir = blobs_dir.to_path_buf(); tokio::task::spawn_blocking(move || { write_pending_blobs(&blobs_dir, pending_blobs); }).await.ok(); } Ok(save_result.emails_saved) } /// Syncs a JMAP email account. #[instrument(skip_all)] async fn sync_jmap_account_inner( state: &Arc, account: &EmailAccount, id: EmailAccountId, full_sync: Option, ) -> Result { let session_url = account.jmap_session_url.as_ref() .or_api_err(|| ApiError::bad_request("No JMAP session URL configured"))?; // Token resolution order: (1) in-memory keychain cache, (2) DB-stored token, // (3) account field. The keychain is preferred because token refreshes update // it first, and the DB may lag behind during the same session. let raw_id: Uuid = account.id.into(); let mut access_token = CredentialStore::get_oauth(raw_id) .map(|c| c.access_token) .or_else(|| account.oauth2_access_token.clone()) .or_api_err(|| ApiError::auth("No access token available"))?; // Check if token needs refresh (serialized per-account to prevent thundering herd) if account.needs_token_refresh() { let refresh_lock = state.token_refresh_lock(raw_id); let _guard = refresh_lock.lock().await; // Re-check after acquiring lock — another task may have refreshed already let token_manager = TokenManager::from_env(); if let Ok(Some((new_token, new_refresh, expires_at))) = token_manager.refresh_if_needed(account).await { state.email_accounts .update_oauth_tokens(id, DESKTOP_USER_ID, &new_token, new_refresh.as_deref(), expires_at) .await?; let _ = CredentialStore::update_oauth_tokens(raw_id, &new_token, new_refresh.as_deref()); access_token = new_token; } } let mut client = JmapClient::new(session_url, &access_token) .map_err(ApiError::external_service)?; let mut debug_parts = Vec::new(); let since = if full_sync.unwrap_or(false) { None } else { account.last_sync_at }; debug_parts.push(format!("JMAP sync, since: {:?}, full_sync: {:?}", since.map(|d| d.to_rfc3339()), full_sync)); let limit = if full_sync.unwrap_or(false) { 100 } else { 50 }; let mut emails_fetched = 0; let mut total_saved = 0; // Sync inbox let inbox_emails = client.fetch_inbox(since, limit).await .map_api_err("Failed to fetch JMAP inbox", ApiError::external_service)?; let inbox_fetched = inbox_emails.len(); emails_fetched += inbox_emails.len(); debug_parts.push(format!("JMAP Inbox: {} emails", inbox_fetched)); // JMAP returns plain-text body only — html_body is not yet extracted from // the JMAP Email/get response (would require requesting bodyValues with // text/html type). IMAP UIDs don't apply to JMAP (uses opaque blob IDs). let fetched_inbox: Vec = inbox_emails.into_iter().map(|p| FetchedEmail { message_id: p.message_id, in_reply_to: p.in_reply_to, references_root: p.references_root, from: p.from, to: p.to, subject: p.subject, body: p.body, html_body: None, is_read: p.is_read, date: p.date, source_folder: p.source_folder, imap_uid: None, is_archived: false, attachment_meta: None, }).collect(); let inbox_result = process_fetched_emails( state.emails.as_ref(), DESKTOP_USER_ID, id, fetched_inbox, ).await?; total_saved += inbox_result.emails_saved; // Sync archive let archive_emails = match client.fetch_archive(since, limit).await { Ok(emails) => emails, Err(e) => { debug_parts.push(format!("Archive error: {}", e)); Vec::new() } }; let archive_fetched = archive_emails.len(); emails_fetched += archive_emails.len(); debug_parts.push(format!("JMAP Archive: {} emails", archive_fetched)); let fetched_archive: Vec = archive_emails.into_iter().map(|p| FetchedEmail { message_id: p.message_id, in_reply_to: p.in_reply_to, references_root: p.references_root, from: p.from, to: p.to, subject: p.subject, body: p.body, html_body: None, is_read: true, date: p.date, source_folder: p.source_folder, imap_uid: None, is_archived: true, attachment_meta: None, }).collect(); let archive_result = process_fetched_emails( state.emails.as_ref(), DESKTOP_USER_ID, id, fetched_archive, ).await?; total_saved += archive_result.emails_saved; Ok(SyncResponse { emails_fetched, emails_saved: total_saved, inbox_fetched, archive_fetched, message: format!("JMAP: Found {} in Inbox, {} in Archive. Saved {} new emails.", inbox_fetched, archive_fetched, total_saved), debug_info: Some(debug_parts.join(" | ")), }) } // ============ Attachment Helpers ============ /// Hash attachments and build metadata JSON without writing blobs to disk. /// /// Returns (metadata JSON string, pending blob writes as (hash, data) pairs). /// The caller is responsible for writing blobs via [`write_pending_blobs`]. fn build_attachment_meta_deferred( attachments: Vec, ) -> (Option, Vec<(String, Vec)>) { if attachments.is_empty() { return (None, Vec::new()); } let mut pending_writes = Vec::new(); let metas: Vec = attachments.into_iter().map(|part| { let hash = { let mut hasher = Sha256::new(); hasher.update(&part.data); format!("{:x}", hasher.finalize()) }; let size = part.data.len(); pending_writes.push((hash.clone(), part.data)); AttachmentMeta { filename: part.filename, mime_type: part.mime_type, size, blob_hash: hash, } }).collect(); if metas.is_empty() { return (None, Vec::new()); } (serde_json::to_string(&metas).ok(), pending_writes) } /// Write pending blob data to disk with content-addressed dedup. fn write_pending_blobs(blobs_dir: &Path, pending: Vec<(String, Vec)>) { if let Err(e) = std::fs::create_dir_all(blobs_dir) { warn!("Failed to create blobs dir: {}", e); return; } for (hash, data) in pending { let blob_path = blobs_dir.join(&hash); if !blob_path.exists() { if let Err(e) = std::fs::write(&blob_path, &data) { warn!(blob_hash = %hash, "Failed to write attachment blob: {}", e); } } } }