Skip to main content

max / goingson

16.0 KB · 444 lines History Blame Raw
1 //! Email synchronization commands.
2 //!
3 //! Provides IMAP and JMAP email sync, using `goingson_core::email_sync` for the
4 //! shared dedup-save-clear-waiting loop.
5
6 use serde::Serialize;
7 use sha2::{Sha256, Digest};
8 use std::path::Path;
9 use std::sync::Arc;
10 use tauri::State;
11 use tracing::{instrument, warn};
12 use uuid::Uuid;
13
14 use goingson_core::email_sync::{FetchedEmail, process_fetched_emails};
15 use goingson_core::{AttachmentMeta, EmailAccount, EmailAccountId};
16
17 use crate::email::{AttachmentPart, ParsedEmail};
18
19 use crate::email::{uses_jmap, ImapClient};
20 use crate::jmap::JmapClient;
21 use crate::oauth::{CredentialStore, TokenManager};
22 use crate::state::{AppState, DESKTOP_USER_ID};
23 use super::{ApiError, OptionApiError, OptionNotFound, ResultApiError};
24 use super::email_account::{uses_oauth_imap, get_account_password, get_valid_access_token};
25
26 // ============ Types ============
27
28 #[derive(Debug, Serialize)]
29 #[serde(rename_all = "camelCase")]
30 pub struct SyncResponse {
31 pub emails_fetched: usize,
32 pub emails_saved: usize,
33 pub inbox_fetched: usize,
34 pub archive_fetched: usize,
35 pub message: String,
36 pub debug_info: Option<String>,
37 }
38
39 // ============ Commands ============
40
41 /// Inner sync function callable from both the command and the scheduler.
42 #[instrument(skip_all)]
43 pub async fn sync_email_account_inner(
44 state: &Arc<AppState>,
45 id: EmailAccountId,
46 full_sync: Option<bool>,
47 ) -> Result<SyncResponse, ApiError> {
48 // Acquire per-account lock to prevent concurrent syncs on the same account
49 if !state.email_sync_locks.lock().unwrap_or_else(|e| e.into_inner()).insert(id) {
50 return Err(ApiError::bad_request("Sync already in progress for this account"));
51 }
52
53 let result = async {
54 let account = state.email_accounts
55 .get_by_id(id, DESKTOP_USER_ID)
56 .await?
57 .or_not_found("emailAccount", id)?;
58
59 let result = if uses_jmap(&account) {
60 sync_jmap_account_inner(state, &account, id, full_sync).await?
61 } else {
62 sync_imap_account_inner(state, &account, id, full_sync).await?
63 };
64
65 state.email_accounts.update_last_sync(id, DESKTOP_USER_ID).await?;
66
67 Ok(result)
68 }.await;
69
70 // Always release the lock. The lock is not held across the await, so it cannot be poisoned
71 // by an async-block panic; unwrap_or_else handles the unreachable poison case defensively.
72 state.email_sync_locks
73 .lock()
74 .unwrap_or_else(|p| p.into_inner())
75 .remove(&id);
76
77 result
78 }
79
80 /// Synchronizes emails for an account.
81 #[tauri::command]
82 #[instrument(skip_all)]
83 pub async fn sync_email_account(state: State<'_, Arc<AppState>>, id: EmailAccountId, full_sync: Option<bool>) -> Result<SyncResponse, ApiError> {
84 sync_email_account_inner(&state, id, full_sync).await
85 }
86
87 /// Syncs an IMAP email account.
88 ///
89 /// Fetches from INBOX and Archive sequentially (single IMAP connection), then
90 /// processes both folders concurrently — hashing attachments, saving to DB, and
91 /// writing blobs in parallel for each folder.
92 #[instrument(skip_all)]
93 async fn sync_imap_account_inner(
94 state: &Arc<AppState>,
95 account: &EmailAccount,
96 id: EmailAccountId,
97 full_sync: Option<bool>,
98 ) -> Result<SyncResponse, ApiError> {
99 let imap_client = if uses_oauth_imap(account) {
100 let access_token = get_valid_access_token(state, account).await?;
101 ImapClient::with_oauth(
102 &account.imap_server,
103 account.imap_port as u16,
104 &account.email_address,
105 &access_token,
106 )
107 } else {
108 let password = get_account_password(account)?;
109 ImapClient::with_password(account, &password)
110 };
111
112 let archive_folder = account.archive_folder_name.clone().unwrap_or_else(|| "Archive".to_string());
113 let is_full_sync = full_sync.unwrap_or(false);
114
115 let mut debug_parts = Vec::new();
116 let since = if is_full_sync { None } else { account.last_sync_at };
117 debug_parts.push(format!("since filter: {:?}, full_sync: {:?}", since.map(|d| d.to_rfc3339()), full_sync));
118
119 let blobs_dir = state.data_dir.join("blobs");
120
121 // --- Fetch sync states ---
122 let inbox_sync_state = if is_full_sync {
123 None
124 } else {
125 state.email_accounts.get_folder_sync_state(id, "INBOX").await.ok().flatten()
126 };
127
128 let archive_sync_state = if is_full_sync {
129 None
130 } else {
131 state.email_accounts.get_folder_sync_state(id, &archive_folder).await.ok().flatten()
132 };
133
134 // --- IMAP fetches (sequential, single connection) ---
135 let inbox_result = imap_client
136 .fetch_emails_incremental("INBOX", inbox_sync_state.as_ref(), since)
137 .await
138 .map_api_err("Failed to fetch INBOX", ApiError::external_service)?;
139
140 let inbox_fetched = inbox_result.emails.len();
141 debug_parts.push(format!("INBOX: {}", inbox_result.debug_info));
142
143 let archive_result = match imap_client
144 .fetch_emails_incremental(&archive_folder, archive_sync_state.as_ref(), since)
145 .await
146 {
147 Ok(result) => {
148 debug_parts.push(format!("Archive: {}", result.debug_info));
149 Some(result)
150 }
151 Err(e) => {
152 warn!("Could not sync archive folder '{}': {}", archive_folder, e);
153 None
154 }
155 };
156
157 let archive_fetched = archive_result.as_ref().map_or(0, |r| r.emails.len());
158 let emails_fetched = inbox_fetched + archive_fetched;
159
160 // Extract sync metadata before consuming emails
161 let archive_uid_validity = archive_result.as_ref().and_then(|r| r.uid_validity);
162 let archive_max_uid = archive_result.as_ref().and_then(|r| r.max_uid_fetched);
163 let archive_emails = archive_result.map(|r| r.emails).unwrap_or_default();
164
165 // --- Process both folders concurrently (hash + DB save + blob write) ---
166 let inbox_process = process_folder_emails(state, &blobs_dir, id, inbox_result.emails, false);
167 let archive_process = process_folder_emails(state, &blobs_dir, id, archive_emails, true);
168
169 let (inbox_saved, archive_saved) = tokio::try_join!(inbox_process, archive_process)?;
170 let total_saved = inbox_saved + archive_saved;
171
172 // --- Update sync states ---
173 if let Some(validity) = inbox_result.uid_validity {
174 if let Some(ref prev) = inbox_sync_state {
175 if prev.uid_validity != validity {
176 state.email_accounts.delete_folder_sync_state(id, "INBOX").await.ok();
177 }
178 }
179 let new_max = inbox_result.max_uid_fetched
180 .unwrap_or_else(|| inbox_sync_state.as_ref().map_or(0, |s| s.last_seen_uid));
181 if new_max > 0 {
182 state.email_accounts.upsert_folder_sync_state(id, "INBOX", validity, new_max).await.ok();
183 }
184 }
185
186 if let Some(validity) = archive_uid_validity {
187 if let Some(ref prev) = archive_sync_state {
188 if prev.uid_validity != validity {
189 state.email_accounts.delete_folder_sync_state(id, &archive_folder).await.ok();
190 }
191 }
192 let new_max = archive_max_uid
193 .unwrap_or_else(|| archive_sync_state.as_ref().map_or(0, |s| s.last_seen_uid));
194 if new_max > 0 {
195 state.email_accounts.upsert_folder_sync_state(id, &archive_folder, validity, new_max).await.ok();
196 }
197 }
198
199 Ok(SyncResponse {
200 emails_fetched,
201 emails_saved: total_saved,
202 inbox_fetched,
203 archive_fetched,
204 message: format!("Found {} in INBOX, {} in Archive. Saved {} new emails.", inbox_fetched, archive_fetched, total_saved),
205 debug_info: Some(debug_parts.join(" | ")),
206 })
207 }
208
209 /// Process fetched emails for a single folder: hash attachments, save to DB,
210 /// then write blobs to disk. Blob I/O is deferred until after the DB save so
211 /// large attachments don't block the sync pipeline.
212 async fn process_folder_emails(
213 state: &Arc<AppState>,
214 blobs_dir: &Path,
215 account_id: EmailAccountId,
216 emails: Vec<ParsedEmail>,
217 is_archived: bool,
218 ) -> Result<usize, ApiError> {
219 if emails.is_empty() {
220 return Ok(0);
221 }
222
223 // Phase 1: Hash attachments and build FetchedEmail structs (CPU-bound, no disk I/O)
224 let (fetched_emails, pending_blobs) = tokio::task::spawn_blocking(move || {
225 let mut all_pending: Vec<(String, Vec<u8>)> = Vec::new();
226 let fetched: Vec<FetchedEmail> = emails.into_iter().map(|p| {
227 let (attachment_meta, pending) = build_attachment_meta_deferred(p.attachments);
228 all_pending.extend(pending);
229 FetchedEmail {
230 message_id: p.message_id,
231 in_reply_to: p.in_reply_to,
232 references_root: p.references_root,
233 from: p.from,
234 to: p.to,
235 subject: p.subject,
236 body: p.body,
237 html_body: p.html_body,
238 is_read: p.is_read,
239 date: p.date,
240 source_folder: p.source_folder,
241 imap_uid: Some(p.imap_uid as i64),
242 is_archived,
243 attachment_meta,
244 }
245 }).collect();
246 (fetched, all_pending)
247 }).await.map_err(|e| ApiError::internal(format!("Attachment processing failed: {e}")))?;
248
249 // Phase 2: Save to DB immediately (metadata is ready)
250 let save_result = process_fetched_emails(
251 state.emails.as_ref(), DESKTOP_USER_ID, account_id, fetched_emails,
252 ).await?;
253
254 // Phase 3: Write blobs to disk (deferred — large file I/O after DB commit)
255 if !pending_blobs.is_empty() {
256 let blobs_dir = blobs_dir.to_path_buf();
257 tokio::task::spawn_blocking(move || {
258 write_pending_blobs(&blobs_dir, pending_blobs);
259 }).await.ok();
260 }
261
262 Ok(save_result.emails_saved)
263 }
264
265 /// Syncs a JMAP email account.
266 #[instrument(skip_all)]
267 async fn sync_jmap_account_inner(
268 state: &Arc<AppState>,
269 account: &EmailAccount,
270 id: EmailAccountId,
271 full_sync: Option<bool>,
272 ) -> Result<SyncResponse, ApiError> {
273 let session_url = account.jmap_session_url.as_ref()
274 .or_api_err(|| ApiError::bad_request("No JMAP session URL configured"))?;
275
276 // Token resolution order: (1) in-memory keychain cache, (2) DB-stored token,
277 // (3) account field. The keychain is preferred because token refreshes update
278 // it first, and the DB may lag behind during the same session.
279 let raw_id: Uuid = account.id.into();
280 let mut access_token = CredentialStore::get_oauth(raw_id)
281 .map(|c| c.access_token)
282 .or_else(|| account.oauth2_access_token.clone())
283 .or_api_err(|| ApiError::auth("No access token available"))?;
284
285 // Check if token needs refresh (serialized per-account to prevent thundering herd)
286 if account.needs_token_refresh() {
287 let refresh_lock = state.token_refresh_lock(raw_id);
288 let _guard = refresh_lock.lock().await;
289 // Re-check after acquiring lock — another task may have refreshed already
290 let token_manager = TokenManager::from_env();
291 if let Ok(Some((new_token, new_refresh, expires_at))) = token_manager.refresh_if_needed(account).await {
292 state.email_accounts
293 .update_oauth_tokens(id, DESKTOP_USER_ID, &new_token, new_refresh.as_deref(), expires_at)
294 .await?;
295 let _ = CredentialStore::update_oauth_tokens(raw_id, &new_token, new_refresh.as_deref());
296 access_token = new_token;
297 }
298 }
299
300 let mut client = JmapClient::new(session_url, &access_token)
301 .map_err(ApiError::external_service)?;
302 let mut debug_parts = Vec::new();
303
304 let since = if full_sync.unwrap_or(false) { None } else { account.last_sync_at };
305 debug_parts.push(format!("JMAP sync, since: {:?}, full_sync: {:?}", since.map(|d| d.to_rfc3339()), full_sync));
306
307 let limit = if full_sync.unwrap_or(false) { 100 } else { 50 };
308
309 let mut emails_fetched = 0;
310 let mut total_saved = 0;
311
312 // Sync inbox
313 let inbox_emails = client.fetch_inbox(since, limit).await
314 .map_api_err("Failed to fetch JMAP inbox", ApiError::external_service)?;
315
316 let inbox_fetched = inbox_emails.len();
317 emails_fetched += inbox_emails.len();
318 debug_parts.push(format!("JMAP Inbox: {} emails", inbox_fetched));
319
320 // JMAP returns plain-text body only — html_body is not yet extracted from
321 // the JMAP Email/get response (would require requesting bodyValues with
322 // text/html type). IMAP UIDs don't apply to JMAP (uses opaque blob IDs).
323 let fetched_inbox: Vec<FetchedEmail> = inbox_emails.into_iter().map(|p| FetchedEmail {
324 message_id: p.message_id,
325 in_reply_to: p.in_reply_to,
326 references_root: p.references_root,
327 from: p.from,
328 to: p.to,
329 subject: p.subject,
330 body: p.body,
331 html_body: None,
332 is_read: p.is_read,
333 date: p.date,
334 source_folder: p.source_folder,
335 imap_uid: None,
336 is_archived: false,
337 attachment_meta: None,
338 }).collect();
339
340 let inbox_result = process_fetched_emails(
341 state.emails.as_ref(), DESKTOP_USER_ID, id, fetched_inbox,
342 ).await?;
343 total_saved += inbox_result.emails_saved;
344
345 // Sync archive
346 let archive_emails = match client.fetch_archive(since, limit).await {
347 Ok(emails) => emails,
348 Err(e) => {
349 debug_parts.push(format!("Archive error: {}", e));
350 Vec::new()
351 }
352 };
353
354 let archive_fetched = archive_emails.len();
355 emails_fetched += archive_emails.len();
356 debug_parts.push(format!("JMAP Archive: {} emails", archive_fetched));
357
358 let fetched_archive: Vec<FetchedEmail> = archive_emails.into_iter().map(|p| FetchedEmail {
359 message_id: p.message_id,
360 in_reply_to: p.in_reply_to,
361 references_root: p.references_root,
362 from: p.from,
363 to: p.to,
364 subject: p.subject,
365 body: p.body,
366 html_body: None,
367 is_read: true,
368 date: p.date,
369 source_folder: p.source_folder,
370 imap_uid: None,
371 is_archived: true,
372 attachment_meta: None,
373 }).collect();
374
375 let archive_result = process_fetched_emails(
376 state.emails.as_ref(), DESKTOP_USER_ID, id, fetched_archive,
377 ).await?;
378 total_saved += archive_result.emails_saved;
379
380 Ok(SyncResponse {
381 emails_fetched,
382 emails_saved: total_saved,
383 inbox_fetched,
384 archive_fetched,
385 message: format!("JMAP: Found {} in Inbox, {} in Archive. Saved {} new emails.", inbox_fetched, archive_fetched, total_saved),
386 debug_info: Some(debug_parts.join(" | ")),
387 })
388 }
389
390 // ============ Attachment Helpers ============
391
392 /// Hash attachments and build metadata JSON without writing blobs to disk.
393 ///
394 /// Returns (metadata JSON string, pending blob writes as (hash, data) pairs).
395 /// The caller is responsible for writing blobs via [`write_pending_blobs`].
396 fn build_attachment_meta_deferred(
397 attachments: Vec<AttachmentPart>,
398 ) -> (Option<String>, Vec<(String, Vec<u8>)>) {
399 if attachments.is_empty() {
400 return (None, Vec::new());
401 }
402
403 let mut pending_writes = Vec::new();
404 let metas: Vec<AttachmentMeta> = attachments.into_iter().map(|part| {
405 let hash = {
406 let mut hasher = Sha256::new();
407 hasher.update(&part.data);
408 format!("{:x}", hasher.finalize())
409 };
410
411 let size = part.data.len();
412 pending_writes.push((hash.clone(), part.data));
413
414 AttachmentMeta {
415 filename: part.filename,
416 mime_type: part.mime_type,
417 size,
418 blob_hash: hash,
419 }
420 }).collect();
421
422 if metas.is_empty() {
423 return (None, Vec::new());
424 }
425
426 (serde_json::to_string(&metas).ok(), pending_writes)
427 }
428
429 /// Write pending blob data to disk with content-addressed dedup.
430 fn write_pending_blobs(blobs_dir: &Path, pending: Vec<(String, Vec<u8>)>) {
431 if let Err(e) = std::fs::create_dir_all(blobs_dir) {
432 warn!("Failed to create blobs dir: {}", e);
433 return;
434 }
435 for (hash, data) in pending {
436 let blob_path = blobs_dir.join(&hash);
437 if !blob_path.exists() {
438 if let Err(e) = std::fs::write(&blob_path, &data) {
439 warn!(blob_hash = %hash, "Failed to write attachment blob: {}", e);
440 }
441 }
442 }
443 }
444