Skip to main content

max / goingson

17.7 KB · 379 lines History Blame Raw
1 //! SQLite implementation of the EmailAccountRepository.
2 //!
3 //! Manages email account configurations for IMAP/SMTP and OAuth2/JMAP connectivity.
4 //! Stores server settings, credentials, OAuth tokens, and sync state.
5
6 use async_trait::async_trait;
7 use chrono::{DateTime, Utc};
8 use sqlx::SqlitePool;
9 use goingson_core::{CoreError, EmailAccount, EmailAccountId, EmailAccountRepository, EmailAuthType, FolderSyncState, Result, UserId};
10
11 use crate::utils::{format_datetime, format_datetime_now, parse_datetime, parse_uuid};
12
13 #[derive(Debug, Clone, sqlx::FromRow)]
14 struct EmailAccountRow {
15 pub id: String,
16 pub user_id: String,
17 pub account_name: String,
18 pub email_address: String,
19 pub imap_server: String,
20 pub imap_port: i32,
21 pub smtp_server: String,
22 pub smtp_port: i32,
23 pub username: String,
24 pub password: String,
25 pub use_tls: i32,
26 pub last_sync_at: Option<String>,
27 pub created_at: String,
28 pub archive_folder_name: Option<String>,
29 pub auth_type: String,
30 pub oauth2_access_token: Option<String>,
31 pub oauth2_refresh_token: Option<String>,
32 pub oauth2_token_expires_at: Option<String>,
33 pub jmap_session_url: Option<String>,
34 pub jmap_account_id: Option<String>,
35 pub sync_interval_minutes: Option<i32>,
36 pub email_signature: Option<String>,
37 pub notify_new_emails: i32,
38 }
39
40 impl TryFrom<EmailAccountRow> for EmailAccount {
41 type Error = CoreError;
42
43 fn try_from(row: EmailAccountRow) -> std::result::Result<Self, Self::Error> {
44 Ok(EmailAccount {
45 id: parse_uuid(&row.id)?.into(),
46 user_id: parse_uuid(&row.user_id)?.into(),
47 account_name: row.account_name,
48 email_address: row.email_address,
49 imap_server: row.imap_server,
50 imap_port: row.imap_port,
51 smtp_server: row.smtp_server,
52 smtp_port: row.smtp_port,
53 username: row.username,
54 password: row.password,
55 use_tls: row.use_tls != 0,
56 last_sync_at: row.last_sync_at.as_ref().map(|s| parse_datetime(s)).transpose()?,
57 created_at: parse_datetime(&row.created_at)?,
58 archive_folder_name: row.archive_folder_name,
59 auth_type: EmailAuthType::from_str_or_default(&row.auth_type),
60 oauth2_access_token: row.oauth2_access_token,
61 oauth2_refresh_token: row.oauth2_refresh_token,
62 oauth2_token_expires_at: row.oauth2_token_expires_at.as_ref().map(|s| parse_datetime(s)).transpose()?,
63 jmap_session_url: row.jmap_session_url,
64 jmap_account_id: row.jmap_account_id,
65 sync_interval_minutes: row.sync_interval_minutes,
66 email_signature: row.email_signature,
67 notify_new_emails: row.notify_new_emails != 0,
68 })
69 }
70 }
71
72 /// SQLite-backed implementation of [`EmailAccountRepository`].
73 ///
74 /// Manages email account configurations for both password-based (IMAP/SMTP)
75 /// and OAuth2 (JMAP, Gmail, Outlook) authentication methods.
76 pub struct SqliteEmailAccountRepository { pool: SqlitePool }
77
78 impl SqliteEmailAccountRepository {
79 /// Creates a new repository instance with the given connection pool.
80 #[tracing::instrument(skip_all)]
81 pub fn new(pool: SqlitePool) -> Self { Self { pool } }
82
83 /// Full SELECT query with all columns.
84 const SELECT_ALL: &'static str = r#"
85 SELECT id, user_id, account_name, email_address, imap_server, imap_port,
86 smtp_server, smtp_port, username, password, use_tls, last_sync_at,
87 created_at, archive_folder_name, auth_type, oauth2_access_token,
88 oauth2_refresh_token, oauth2_token_expires_at, jmap_session_url, jmap_account_id,
89 sync_interval_minutes, email_signature, notify_new_emails
90 FROM email_accounts
91 "#;
92 }
93
94 #[async_trait]
95 impl EmailAccountRepository for SqliteEmailAccountRepository {
96 #[tracing::instrument(skip_all)]
97 async fn list_by_user(&self, user_id: UserId) -> Result<Vec<EmailAccount>> {
98 let query = format!("{} WHERE user_id = ? ORDER BY account_name ASC", Self::SELECT_ALL);
99 let rows = sqlx::query_as::<_, EmailAccountRow>(&query)
100 .bind(user_id.to_string()).fetch_all(&self.pool).await.map_err(CoreError::database)?;
101 rows.into_iter().map(EmailAccount::try_from).collect()
102 }
103
104 #[tracing::instrument(skip_all)]
105 async fn get_by_id(&self, id: EmailAccountId, user_id: UserId) -> Result<Option<EmailAccount>> {
106 let query = format!("{} WHERE id = ? AND user_id = ?", Self::SELECT_ALL);
107 let row = sqlx::query_as::<_, EmailAccountRow>(&query)
108 .bind(id.to_string()).bind(user_id.to_string()).fetch_optional(&self.pool).await.map_err(CoreError::database)?;
109 row.map(EmailAccount::try_from).transpose()
110 }
111
112 #[tracing::instrument(skip_all)]
113 async fn create(&self, user_id: UserId, account_name: &str, email_address: &str, imap_server: &str, imap_port: i32, smtp_server: &str, smtp_port: i32, username: &str, password: &str, use_tls: bool, archive_folder_name: Option<&str>) -> Result<EmailAccount> {
114 let id = EmailAccountId::new();
115 let now = format_datetime_now();
116 sqlx::query(r#"
117 INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port,
118 smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at, auth_type)
119 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'password')
120 "#)
121 .bind(id.to_string()).bind(user_id.to_string()).bind(account_name).bind(email_address)
122 .bind(imap_server).bind(imap_port).bind(smtp_server).bind(smtp_port)
123 .bind(username).bind(password).bind(if use_tls { 1 } else { 0 })
124 .bind(archive_folder_name).bind(&now)
125 .execute(&self.pool).await.map_err(CoreError::database)?;
126 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created account"))
127 }
128
129 #[tracing::instrument(skip_all)]
130 async fn create_oauth(
131 &self,
132 user_id: UserId,
133 account_name: &str,
134 email_address: &str,
135 access_token: &str,
136 refresh_token: &str,
137 expires_at: DateTime<Utc>,
138 jmap_session_url: &str,
139 jmap_account_id: &str,
140 ) -> Result<EmailAccount> {
141 let id = EmailAccountId::new();
142 let now = format_datetime_now();
143 let expires_at_str = format_datetime(&expires_at);
144 sqlx::query(r#"
145 INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port,
146 smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at,
147 auth_type, oauth2_access_token, oauth2_refresh_token, oauth2_token_expires_at,
148 jmap_session_url, jmap_account_id)
149 VALUES (?, ?, ?, ?, '', 0, '', 0, '', '', 0, 'Archive', ?,
150 'oauth2_fastmail', ?, ?, ?, ?, ?)
151 "#)
152 .bind(id.to_string()).bind(user_id.to_string()).bind(account_name).bind(email_address)
153 .bind(&now).bind(access_token).bind(refresh_token).bind(&expires_at_str)
154 .bind(jmap_session_url).bind(jmap_account_id)
155 .execute(&self.pool).await.map_err(CoreError::database)?;
156 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created OAuth account"))
157 }
158
159 #[tracing::instrument(skip_all)]
160 async fn create_oauth_imap(
161 &self,
162 user_id: UserId,
163 account_name: &str,
164 email_address: &str,
165 auth_type: EmailAuthType,
166 access_token: &str,
167 refresh_token: &str,
168 expires_at: DateTime<Utc>,
169 imap_server: &str,
170 imap_port: i32,
171 smtp_server: &str,
172 smtp_port: i32,
173 ) -> Result<EmailAccount> {
174 let id = EmailAccountId::new();
175 let now = format_datetime_now();
176 let expires_at_str = format_datetime(&expires_at);
177 let auth_type_str = auth_type.as_str();
178
179 sqlx::query(r#"
180 INSERT INTO email_accounts (id, user_id, account_name, email_address, imap_server, imap_port,
181 smtp_server, smtp_port, username, password, use_tls, archive_folder_name, created_at,
182 auth_type, oauth2_access_token, oauth2_refresh_token, oauth2_token_expires_at)
183 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, '', 1, 'Archive', ?,
184 ?, ?, ?, ?)
185 "#)
186 .bind(id.to_string())
187 .bind(user_id.to_string())
188 .bind(account_name)
189 .bind(email_address)
190 .bind(imap_server)
191 .bind(imap_port)
192 .bind(smtp_server)
193 .bind(smtp_port)
194 .bind(email_address) // username is email for OAuth
195 .bind(&now)
196 .bind(auth_type_str)
197 .bind(access_token)
198 .bind(refresh_token)
199 .bind(&expires_at_str)
200 .execute(&self.pool).await.map_err(CoreError::database)?;
201
202 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created OAuth IMAP account"))
203 }
204
205 #[tracing::instrument(skip_all)]
206 async fn update(&self, id: EmailAccountId, user_id: UserId, account_name: &str, email_address: &str, imap_server: &str, imap_port: i32, smtp_server: &str, smtp_port: i32, username: &str, password: Option<&str>, use_tls: bool, archive_folder_name: Option<&str>) -> Result<Option<EmailAccount>> {
207 let result = if let Some(pwd) = password {
208 sqlx::query(r#"
209 UPDATE email_accounts SET account_name = ?, email_address = ?, imap_server = ?, imap_port = ?,
210 smtp_server = ?, smtp_port = ?, username = ?, password = ?, use_tls = ?, archive_folder_name = ?
211 WHERE id = ? AND user_id = ?
212 "#)
213 .bind(account_name).bind(email_address).bind(imap_server).bind(imap_port)
214 .bind(smtp_server).bind(smtp_port).bind(username).bind(pwd)
215 .bind(if use_tls { 1 } else { 0 }).bind(archive_folder_name)
216 .bind(id.to_string()).bind(user_id.to_string())
217 .execute(&self.pool).await.map_err(CoreError::database)?
218 } else {
219 sqlx::query(r#"
220 UPDATE email_accounts SET account_name = ?, email_address = ?, imap_server = ?, imap_port = ?,
221 smtp_server = ?, smtp_port = ?, username = ?, use_tls = ?, archive_folder_name = ?
222 WHERE id = ? AND user_id = ?
223 "#)
224 .bind(account_name).bind(email_address).bind(imap_server).bind(imap_port)
225 .bind(smtp_server).bind(smtp_port).bind(username)
226 .bind(if use_tls { 1 } else { 0 }).bind(archive_folder_name)
227 .bind(id.to_string()).bind(user_id.to_string())
228 .execute(&self.pool).await.map_err(CoreError::database)?
229 };
230 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
231 }
232
233 #[tracing::instrument(skip_all)]
234 async fn update_oauth_tokens(
235 &self,
236 id: EmailAccountId,
237 user_id: UserId,
238 access_token: &str,
239 refresh_token: Option<&str>,
240 expires_at: DateTime<Utc>,
241 ) -> Result<Option<EmailAccount>> {
242 let expires_at_str = format_datetime(&expires_at);
243 let result = if let Some(rt) = refresh_token {
244 sqlx::query(r#"
245 UPDATE email_accounts SET oauth2_access_token = ?, oauth2_refresh_token = ?, oauth2_token_expires_at = ?
246 WHERE id = ? AND user_id = ?
247 "#)
248 .bind(access_token).bind(rt).bind(&expires_at_str)
249 .bind(id.to_string()).bind(user_id.to_string())
250 .execute(&self.pool).await.map_err(CoreError::database)?
251 } else {
252 sqlx::query(r#"
253 UPDATE email_accounts SET oauth2_access_token = ?, oauth2_token_expires_at = ?
254 WHERE id = ? AND user_id = ?
255 "#)
256 .bind(access_token).bind(&expires_at_str)
257 .bind(id.to_string()).bind(user_id.to_string())
258 .execute(&self.pool).await.map_err(CoreError::database)?
259 };
260 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
261 }
262
263 #[tracing::instrument(skip_all)]
264 async fn update_jmap_session(
265 &self,
266 id: EmailAccountId,
267 user_id: UserId,
268 session_url: &str,
269 account_id: &str,
270 ) -> Result<Option<EmailAccount>> {
271 let result = sqlx::query(r#"
272 UPDATE email_accounts SET jmap_session_url = ?, jmap_account_id = ?
273 WHERE id = ? AND user_id = ?
274 "#)
275 .bind(session_url).bind(account_id)
276 .bind(id.to_string()).bind(user_id.to_string())
277 .execute(&self.pool).await.map_err(CoreError::database)?;
278 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
279 }
280
281 #[tracing::instrument(skip_all)]
282 async fn delete(&self, id: EmailAccountId, user_id: UserId) -> Result<bool> {
283 let result = sqlx::query("DELETE FROM email_accounts WHERE id = ? AND user_id = ?")
284 .bind(id.to_string()).bind(user_id.to_string())
285 .execute(&self.pool).await.map_err(CoreError::database)?;
286 Ok(result.rows_affected() > 0)
287 }
288
289 #[tracing::instrument(skip_all)]
290 async fn update_last_sync(&self, id: EmailAccountId, user_id: UserId) -> Result<bool> {
291 let now = format_datetime_now();
292 let result = sqlx::query("UPDATE email_accounts SET last_sync_at = ? WHERE id = ? AND user_id = ?")
293 .bind(&now).bind(id.to_string()).bind(user_id.to_string())
294 .execute(&self.pool).await.map_err(CoreError::database)?;
295 Ok(result.rows_affected() > 0)
296 }
297
298 #[tracing::instrument(skip_all)]
299 async fn update_sync_interval(&self, id: EmailAccountId, user_id: UserId, interval_minutes: Option<i32>) -> Result<Option<EmailAccount>> {
300 let result = sqlx::query("UPDATE email_accounts SET sync_interval_minutes = ? WHERE id = ? AND user_id = ?")
301 .bind(interval_minutes)
302 .bind(id.to_string())
303 .bind(user_id.to_string())
304 .execute(&self.pool).await.map_err(CoreError::database)?;
305 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
306 }
307
308 #[tracing::instrument(skip_all)]
309 async fn update_signature(&self, id: EmailAccountId, user_id: UserId, signature: Option<&str>) -> Result<Option<EmailAccount>> {
310 let result = sqlx::query("UPDATE email_accounts SET email_signature = ? WHERE id = ? AND user_id = ?")
311 .bind(signature)
312 .bind(id.to_string())
313 .bind(user_id.to_string())
314 .execute(&self.pool).await.map_err(CoreError::database)?;
315 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
316 }
317
318 #[tracing::instrument(skip_all)]
319 async fn update_notify_new_emails(&self, id: EmailAccountId, user_id: UserId, enabled: bool) -> Result<Option<EmailAccount>> {
320 let result = sqlx::query("UPDATE email_accounts SET notify_new_emails = ? WHERE id = ? AND user_id = ?")
321 .bind(if enabled { 1 } else { 0 })
322 .bind(id.to_string())
323 .bind(user_id.to_string())
324 .execute(&self.pool).await.map_err(CoreError::database)?;
325 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
326 }
327
328 #[tracing::instrument(skip_all)]
329 async fn list_accounts_needing_sync(&self, user_id: UserId) -> Result<Vec<EmailAccount>> {
330 let query = format!(
331 "{} WHERE user_id = ? AND sync_interval_minutes IS NOT NULL \
332 AND (last_sync_at IS NULL \
333 OR datetime(last_sync_at, '+' || sync_interval_minutes || ' minutes') <= datetime('now'))",
334 Self::SELECT_ALL
335 );
336 let rows = sqlx::query_as::<_, EmailAccountRow>(&query)
337 .bind(user_id.to_string())
338 .fetch_all(&self.pool).await.map_err(CoreError::database)?;
339 rows.into_iter().map(EmailAccount::try_from).collect()
340 }
341
342 #[tracing::instrument(skip_all)]
343 async fn get_folder_sync_state(&self, account_id: EmailAccountId, folder: &str) -> Result<Option<FolderSyncState>> {
344 let row = sqlx::query_as::<_, (i64, i64)>(
345 "SELECT uid_validity, last_seen_uid FROM imap_folder_sync_state WHERE email_account_id = ? AND folder_name = ?"
346 )
347 .bind(account_id.to_string())
348 .bind(folder)
349 .fetch_optional(&self.pool).await.map_err(CoreError::database)?;
350 Ok(row.map(|(v, u)| FolderSyncState { uid_validity: v as u32, last_seen_uid: u as u32 }))
351 }
352
353 #[tracing::instrument(skip_all)]
354 async fn upsert_folder_sync_state(&self, account_id: EmailAccountId, folder: &str, uid_validity: u32, last_seen_uid: u32) -> Result<()> {
355 let now = format_datetime_now();
356 sqlx::query(
357 "INSERT INTO imap_folder_sync_state (email_account_id, folder_name, uid_validity, last_seen_uid, updated_at) \
358 VALUES (?, ?, ?, ?, ?) \
359 ON CONFLICT (email_account_id, folder_name) DO UPDATE SET uid_validity = excluded.uid_validity, last_seen_uid = excluded.last_seen_uid, updated_at = excluded.updated_at"
360 )
361 .bind(account_id.to_string())
362 .bind(folder)
363 .bind(uid_validity as i64)
364 .bind(last_seen_uid as i64)
365 .bind(&now)
366 .execute(&self.pool).await.map_err(CoreError::database)?;
367 Ok(())
368 }
369
370 #[tracing::instrument(skip_all)]
371 async fn delete_folder_sync_state(&self, account_id: EmailAccountId, folder: &str) -> Result<()> {
372 sqlx::query("DELETE FROM imap_folder_sync_state WHERE email_account_id = ? AND folder_name = ?")
373 .bind(account_id.to_string())
374 .bind(folder)
375 .execute(&self.pool).await.map_err(CoreError::database)?;
376 Ok(())
377 }
378 }
379