//! SQLite implementation of the EmailAccountRepository. //! //! Manages email account configurations for IMAP/SMTP and OAuth2/JMAP connectivity. //! Stores server settings, credentials, OAuth tokens, and sync state. use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::SqlitePool; use goingson_core::{CoreError, EmailAccount, EmailAccountId, EmailAccountRepository, EmailAuthType, FolderSyncState, Result, UserId}; use crate::utils::{format_datetime, format_datetime_now, parse_datetime, parse_uuid}; #[derive(Debug, Clone, sqlx::FromRow)] struct EmailAccountRow { pub id: String, pub user_id: String, pub account_name: String, pub email_address: String, pub imap_server: String, pub imap_port: i32, pub smtp_server: String, pub smtp_port: i32, pub username: String, pub password: String, pub use_tls: i32, pub last_sync_at: Option, pub created_at: String, pub archive_folder_name: Option, pub auth_type: String, pub oauth2_access_token: Option, pub oauth2_refresh_token: Option, pub oauth2_token_expires_at: Option, pub jmap_session_url: Option, pub jmap_account_id: Option, pub sync_interval_minutes: Option, pub email_signature: Option, pub notify_new_emails: i32, } impl TryFrom for EmailAccount { type Error = CoreError; fn try_from(row: EmailAccountRow) -> std::result::Result { Ok(EmailAccount { id: parse_uuid(&row.id)?.into(), user_id: parse_uuid(&row.user_id)?.into(), account_name: row.account_name, email_address: row.email_address, imap_server: row.imap_server, imap_port: row.imap_port, smtp_server: row.smtp_server, smtp_port: row.smtp_port, username: row.username, password: row.password, use_tls: row.use_tls != 0, last_sync_at: row.last_sync_at.as_ref().map(|s| parse_datetime(s)).transpose()?, created_at: parse_datetime(&row.created_at)?, archive_folder_name: row.archive_folder_name, auth_type: EmailAuthType::from_str_or_default(&row.auth_type), oauth2_access_token: row.oauth2_access_token, oauth2_refresh_token: row.oauth2_refresh_token, oauth2_token_expires_at: row.oauth2_token_expires_at.as_ref().map(|s| parse_datetime(s)).transpose()?, jmap_session_url: row.jmap_session_url, jmap_account_id: row.jmap_account_id, sync_interval_minutes: row.sync_interval_minutes, email_signature: row.email_signature, notify_new_emails: row.notify_new_emails != 0, }) } } /// SQLite-backed implementation of [`EmailAccountRepository`]. /// /// Manages email account configurations for both password-based (IMAP/SMTP) /// and OAuth2 (JMAP, Gmail, Outlook) authentication methods. pub struct SqliteEmailAccountRepository { pool: SqlitePool } impl SqliteEmailAccountRepository { /// Creates a new repository instance with the given connection pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } /// Full SELECT query with all columns. const SELECT_ALL: &'static str = r#" SELECT id, user_id, account_name, email_address, imap_server, imap_port, smtp_server, smtp_port, username, password, use_tls, last_sync_at, created_at, archive_folder_name, auth_type, oauth2_access_token, oauth2_refresh_token, oauth2_token_expires_at, jmap_session_url, jmap_account_id, sync_interval_minutes, email_signature, notify_new_emails FROM email_accounts "#; } #[async_trait] impl EmailAccountRepository for SqliteEmailAccountRepository { #[tracing::instrument(skip_all)] async fn list_by_user(&self, user_id: UserId) -> Result> { let query = format!("{} WHERE user_id = ? ORDER BY account_name ASC", Self::SELECT_ALL); let rows = sqlx::query_as::<_, EmailAccountRow>(&query) .bind(user_id.to_string()).fetch_all(&self.pool).await.map_err(CoreError::database)?; rows.into_iter().map(EmailAccount::try_from).collect() } #[tracing::instrument(skip_all)] async fn get_by_id(&self, id: EmailAccountId, user_id: UserId) -> Result> { let query = format!("{} WHERE id = ? AND user_id = ?", Self::SELECT_ALL); let row = sqlx::query_as::<_, EmailAccountRow>(&query) .bind(id.to_string()).bind(user_id.to_string()).fetch_optional(&self.pool).await.map_err(CoreError::database)?; row.map(EmailAccount::try_from).transpose() } #[tracing::instrument(skip_all)] async fn create(&self, user_id: UserId, account_name: &str, email_address: &str, imap_server: &str, imap_port: i32, smtp_server: &str, smtp_port: i32, username: &str, password: &str, use_tls: bool, archive_folder_name: Option<&str>) -> Result { let id = EmailAccountId::new(); let now = format_datetime_now(); sqlx::query(r#" INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port, smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at, auth_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'password') "#) .bind(id.to_string()).bind(user_id.to_string()).bind(account_name).bind(email_address) .bind(imap_server).bind(imap_port).bind(smtp_server).bind(smtp_port) .bind(username).bind(password).bind(if use_tls { 1 } else { 0 }) .bind(archive_folder_name).bind(&now) .execute(&self.pool).await.map_err(CoreError::database)?; self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created account")) } #[tracing::instrument(skip_all)] async fn create_oauth( &self, user_id: UserId, account_name: &str, email_address: &str, access_token: &str, refresh_token: &str, expires_at: DateTime, jmap_session_url: &str, jmap_account_id: &str, ) -> Result { let id = EmailAccountId::new(); let now = format_datetime_now(); let expires_at_str = format_datetime(&expires_at); sqlx::query(r#" INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port, smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, oauth2_token_expires_at, jmap_session_url, jmap_account_id) VALUES (?, ?, ?, ?, '', 0, '', 0, '', '', 0, 'Archive', ?, 'oauth2_fastmail', ?, ?, ?, ?, ?) "#) .bind(id.to_string()).bind(user_id.to_string()).bind(account_name).bind(email_address) .bind(&now).bind(access_token).bind(refresh_token).bind(&expires_at_str) .bind(jmap_session_url).bind(jmap_account_id) .execute(&self.pool).await.map_err(CoreError::database)?; self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created OAuth account")) } #[tracing::instrument(skip_all)] async fn create_oauth_imap( &self, user_id: UserId, account_name: &str, email_address: &str, auth_type: EmailAuthType, access_token: &str, refresh_token: &str, expires_at: DateTime, imap_server: &str, imap_port: i32, smtp_server: &str, smtp_port: i32, ) -> Result { let id = EmailAccountId::new(); let now = format_datetime_now(); let expires_at_str = format_datetime(&expires_at); let auth_type_str = auth_type.as_str(); sqlx::query(r#" INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port, smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at, auth_type, oauth2_access_token, oauth2_refresh_token, oauth2_token_expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, '', 1, 'Archive', ?, ?, ?, ?, ?) "#) .bind(id.to_string()) .bind(user_id.to_string()) .bind(account_name) .bind(email_address) .bind(imap_server) .bind(imap_port) .bind(smtp_server) .bind(smtp_port) .bind(email_address) // username is email for OAuth .bind(&now) .bind(auth_type_str) .bind(access_token) .bind(refresh_token) .bind(&expires_at_str) .execute(&self.pool).await.map_err(CoreError::database)?; self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created OAuth IMAP account")) } #[tracing::instrument(skip_all)] async fn update(&self, id: EmailAccountId, user_id: UserId, account_name: &str, email_address: &str, imap_server: &str, imap_port: i32, smtp_server: &str, smtp_port: i32, username: &str, password: Option<&str>, use_tls: bool, archive_folder_name: Option<&str>) -> Result> { let result = if let Some(pwd) = password { sqlx::query(r#" UPDATE email_accounts SET account_name = ?, email_address = ?, imap_server = ?, imap_port = ?, smtp_server = ?, smtp_port = ?, username = ?, password = ?, use_tls = ?, archive_folder_name = ? WHERE id = ? AND user_id = ? "#) .bind(account_name).bind(email_address).bind(imap_server).bind(imap_port) .bind(smtp_server).bind(smtp_port).bind(username).bind(pwd) .bind(if use_tls { 1 } else { 0 }).bind(archive_folder_name) .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)? } else { sqlx::query(r#" UPDATE email_accounts SET account_name = ?, email_address = ?, imap_server = ?, imap_port = ?, smtp_server = ?, smtp_port = ?, username = ?, use_tls = ?, archive_folder_name = ? WHERE id = ? AND user_id = ? "#) .bind(account_name).bind(email_address).bind(imap_server).bind(imap_port) .bind(smtp_server).bind(smtp_port).bind(username) .bind(if use_tls { 1 } else { 0 }).bind(archive_folder_name) .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)? }; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn update_oauth_tokens( &self, id: EmailAccountId, user_id: UserId, access_token: &str, refresh_token: Option<&str>, expires_at: DateTime, ) -> Result> { let expires_at_str = format_datetime(&expires_at); let result = if let Some(rt) = refresh_token { sqlx::query(r#" UPDATE email_accounts SET oauth2_access_token = ?, oauth2_refresh_token = ?, oauth2_token_expires_at = ? WHERE id = ? AND user_id = ? "#) .bind(access_token).bind(rt).bind(&expires_at_str) .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)? } else { sqlx::query(r#" UPDATE email_accounts SET oauth2_access_token = ?, oauth2_token_expires_at = ? WHERE id = ? AND user_id = ? "#) .bind(access_token).bind(&expires_at_str) .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)? }; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn update_jmap_session( &self, id: EmailAccountId, user_id: UserId, session_url: &str, account_id: &str, ) -> Result> { let result = sqlx::query(r#" UPDATE email_accounts SET jmap_session_url = ?, jmap_account_id = ? WHERE id = ? AND user_id = ? "#) .bind(session_url).bind(account_id) .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn delete(&self, id: EmailAccountId, user_id: UserId) -> Result { let result = sqlx::query("DELETE FROM email_accounts WHERE id = ? AND user_id = ?") .bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; Ok(result.rows_affected() > 0) } #[tracing::instrument(skip_all)] async fn update_last_sync(&self, id: EmailAccountId, user_id: UserId) -> Result { let now = format_datetime_now(); let result = sqlx::query("UPDATE email_accounts SET last_sync_at = ? WHERE id = ? AND user_id = ?") .bind(&now).bind(id.to_string()).bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; Ok(result.rows_affected() > 0) } #[tracing::instrument(skip_all)] async fn update_sync_interval(&self, id: EmailAccountId, user_id: UserId, interval_minutes: Option) -> Result> { let result = sqlx::query("UPDATE email_accounts SET sync_interval_minutes = ? WHERE id = ? AND user_id = ?") .bind(interval_minutes) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn update_signature(&self, id: EmailAccountId, user_id: UserId, signature: Option<&str>) -> Result> { let result = sqlx::query("UPDATE email_accounts SET email_signature = ? WHERE id = ? AND user_id = ?") .bind(signature) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn update_notify_new_emails(&self, id: EmailAccountId, user_id: UserId, enabled: bool) -> Result> { let result = sqlx::query("UPDATE email_accounts SET notify_new_emails = ? WHERE id = ? AND user_id = ?") .bind(if enabled { 1 } else { 0 }) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool).await.map_err(CoreError::database)?; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn list_accounts_needing_sync(&self, user_id: UserId) -> Result> { let query = format!( "{} WHERE user_id = ? AND sync_interval_minutes IS NOT NULL \ AND (last_sync_at IS NULL \ OR datetime(last_sync_at, '+' || sync_interval_minutes || ' minutes') <= datetime('now'))", Self::SELECT_ALL ); let rows = sqlx::query_as::<_, EmailAccountRow>(&query) .bind(user_id.to_string()) .fetch_all(&self.pool).await.map_err(CoreError::database)?; rows.into_iter().map(EmailAccount::try_from).collect() } #[tracing::instrument(skip_all)] async fn get_folder_sync_state(&self, account_id: EmailAccountId, folder: &str) -> Result> { let row = sqlx::query_as::<_, (i64, i64)>( "SELECT uid_validity, last_seen_uid FROM imap_folder_sync_state WHERE email_account_id = ? AND folder_name = ?" ) .bind(account_id.to_string()) .bind(folder) .fetch_optional(&self.pool).await.map_err(CoreError::database)?; Ok(row.map(|(v, u)| FolderSyncState { uid_validity: v as u32, last_seen_uid: u as u32 })) } #[tracing::instrument(skip_all)] async fn upsert_folder_sync_state(&self, account_id: EmailAccountId, folder: &str, uid_validity: u32, last_seen_uid: u32) -> Result<()> { let now = format_datetime_now(); sqlx::query( "INSERT INTO imap_folder_sync_state (email_account_id, folder_name, uid_validity, last_seen_uid, updated_at) \ VALUES (?, ?, ?, ?, ?) \ ON CONFLICT (email_account_id, folder_name) DO UPDATE SET uid_validity = excluded.uid_validity, last_seen_uid = excluded.last_seen_uid, updated_at = excluded.updated_at" ) .bind(account_id.to_string()) .bind(folder) .bind(uid_validity as i64) .bind(last_seen_uid as i64) .bind(&now) .execute(&self.pool).await.map_err(CoreError::database)?; Ok(()) } #[tracing::instrument(skip_all)] async fn delete_folder_sync_state(&self, account_id: EmailAccountId, folder: &str) -> Result<()> { sqlx::query("DELETE FROM imap_folder_sync_state WHERE email_account_id = ? AND folder_name = ?") .bind(account_id.to_string()) .bind(folder) .execute(&self.pool).await.map_err(CoreError::database)?; Ok(()) } }