Skip to main content

max / goingson

34.6 KB · 1026 lines History Blame Raw
1 //! SQLite implementation of the ContactRepository.
2 //!
3 //! Manages contacts with sub-collections (emails, phones, social handles).
4 //! Sub-collections are stored in separate tables and batch-loaded for list operations.
5
6 use async_trait::async_trait;
7 use sqlx::SqlitePool;
8 use std::collections::{HashMap, HashSet};
9 use goingson_core::{
10 Contact, ContactCustomField, ContactEmail, ContactEmailId, ContactId, ContactPhone,
11 ContactPhoneId, ContactRepository, CoreError, CustomFieldId, NewContact,
12 NewContactCustomField, NewContactEmail, NewContactPhone, NewSocialHandle, Result,
13 SocialHandle, SocialHandleId, UpdateContact, UserId,
14 };
15
16 use crate::utils::{escape_like, format_datetime_now, parse_datetime, parse_tags, parse_uuid};
17
18 // ============ Row Structs ============
19
20 #[derive(Debug, Clone, sqlx::FromRow)]
21 struct ContactRow {
22 pub id: String,
23 pub display_name: String,
24 pub nickname: Option<String>,
25 pub company: Option<String>,
26 pub title: Option<String>,
27 pub notes: String,
28 pub tags: String,
29 pub birthday: Option<String>,
30 pub timezone: Option<String>,
31 pub external_source: Option<String>,
32 pub external_id: Option<String>,
33 pub is_implicit: i32,
34 pub created_at: String,
35 pub updated_at: String,
36 }
37
38 #[derive(Debug, Clone, sqlx::FromRow)]
39 struct ContactEmailRow {
40 pub id: String,
41 pub contact_id: String,
42 pub address: String,
43 pub label: String,
44 pub is_primary: i32,
45 }
46
47 #[derive(Debug, Clone, sqlx::FromRow)]
48 struct ContactPhoneRow {
49 pub id: String,
50 pub contact_id: String,
51 pub number: String,
52 pub label: String,
53 pub is_primary: i32,
54 }
55
56 #[derive(Debug, Clone, sqlx::FromRow)]
57 struct SocialHandleRow {
58 pub id: String,
59 pub contact_id: String,
60 pub platform: String,
61 pub handle: String,
62 pub url: Option<String>,
63 }
64
65 #[derive(Debug, Clone, sqlx::FromRow)]
66 struct CustomFieldRow {
67 pub id: String,
68 pub contact_id: String,
69 pub label: String,
70 pub value: String,
71 pub url: Option<String>,
72 }
73
74 // ============ Row Conversions ============
75
76 fn contact_email_from_row(row: ContactEmailRow) -> Result<ContactEmail> {
77 Ok(ContactEmail {
78 id: parse_uuid(&row.id)?.into(),
79 contact_id: parse_uuid(&row.contact_id)?.into(),
80 address: row.address,
81 label: row.label,
82 is_primary: row.is_primary != 0,
83 })
84 }
85
86 fn contact_phone_from_row(row: ContactPhoneRow) -> Result<ContactPhone> {
87 Ok(ContactPhone {
88 id: parse_uuid(&row.id)?.into(),
89 contact_id: parse_uuid(&row.contact_id)?.into(),
90 number: row.number,
91 label: row.label,
92 is_primary: row.is_primary != 0,
93 })
94 }
95
96 fn social_handle_from_row(row: SocialHandleRow) -> Result<SocialHandle> {
97 Ok(SocialHandle {
98 id: parse_uuid(&row.id)?.into(),
99 contact_id: parse_uuid(&row.contact_id)?.into(),
100 platform: row.platform,
101 handle: row.handle,
102 url: row.url,
103 })
104 }
105
106 fn custom_field_from_row(row: CustomFieldRow) -> Result<ContactCustomField> {
107 Ok(ContactCustomField {
108 id: parse_uuid(&row.id)?.into(),
109 contact_id: parse_uuid(&row.contact_id)?.into(),
110 label: row.label,
111 value: row.value,
112 url: row.url,
113 })
114 }
115
116 fn contact_from_row(
117 row: ContactRow,
118 emails: Vec<ContactEmail>,
119 phones: Vec<ContactPhone>,
120 social_handles: Vec<SocialHandle>,
121 custom_fields: Vec<ContactCustomField>,
122 ) -> Result<Contact> {
123 let birthday = row
124 .birthday
125 .as_deref()
126 .map(|s| {
127 chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
128 .map_err(|e| CoreError::database_msg(format!("Invalid birthday: {}", e)))
129 })
130 .transpose()?;
131
132 Ok(Contact {
133 id: parse_uuid(&row.id)?.into(),
134 display_name: row.display_name,
135 nickname: row.nickname,
136 company: row.company,
137 title: row.title,
138 notes: row.notes,
139 tags: parse_tags(&row.tags),
140 birthday,
141 timezone: row.timezone,
142 external_source: row.external_source,
143 external_id: row.external_id,
144 is_implicit: row.is_implicit != 0,
145 emails,
146 phones,
147 social_handles,
148 custom_fields,
149 created_at: parse_datetime(&row.created_at)?,
150 updated_at: parse_datetime(&row.updated_at)?,
151 })
152 }
153
154 // ============ Repository ============
155
156 /// SQLite-backed implementation of [`ContactRepository`].
157 pub struct SqliteContactRepository {
158 pool: SqlitePool,
159 }
160
161 impl SqliteContactRepository {
162 /// Creates a new repository instance with the given connection pool.
163 #[tracing::instrument(skip_all)]
164 pub fn new(pool: SqlitePool) -> Self {
165 Self { pool }
166 }
167
168 /// Batch-load emails for a set of contact IDs.
169 async fn load_emails_for_contacts(
170 &self,
171 ids: &[String],
172 ) -> Result<HashMap<ContactId, Vec<ContactEmail>>> {
173 if ids.is_empty() {
174 return Ok(HashMap::new());
175 }
176
177 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
178 let sql = format!(
179 "SELECT id, contact_id, address, label, is_primary FROM contact_emails WHERE contact_id IN ({}) ORDER BY is_primary DESC, rowid ASC",
180 placeholders
181 );
182
183 let mut query = sqlx::query_as::<_, ContactEmailRow>(&sql);
184 for id in ids {
185 query = query.bind(id);
186 }
187
188 let rows = query.fetch_all(&self.pool).await.map_err(CoreError::database)?;
189 let mut map: HashMap<ContactId, Vec<ContactEmail>> = HashMap::new();
190 for row in rows {
191 let contact_id: ContactId = parse_uuid(&row.contact_id)?.into();
192 let email = contact_email_from_row(row)?;
193 map.entry(contact_id).or_default().push(email);
194 }
195 Ok(map)
196 }
197
198 /// Batch-load phones for a set of contact IDs.
199 async fn load_phones_for_contacts(
200 &self,
201 ids: &[String],
202 ) -> Result<HashMap<ContactId, Vec<ContactPhone>>> {
203 if ids.is_empty() {
204 return Ok(HashMap::new());
205 }
206
207 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
208 let sql = format!(
209 "SELECT id, contact_id, number, label, is_primary FROM contact_phones WHERE contact_id IN ({}) ORDER BY is_primary DESC, rowid ASC",
210 placeholders
211 );
212
213 let mut query = sqlx::query_as::<_, ContactPhoneRow>(&sql);
214 for id in ids {
215 query = query.bind(id);
216 }
217
218 let rows = query.fetch_all(&self.pool).await.map_err(CoreError::database)?;
219 let mut map: HashMap<ContactId, Vec<ContactPhone>> = HashMap::new();
220 for row in rows {
221 let contact_id: ContactId = parse_uuid(&row.contact_id)?.into();
222 let phone = contact_phone_from_row(row)?;
223 map.entry(contact_id).or_default().push(phone);
224 }
225 Ok(map)
226 }
227
228 /// Batch-load social handles for a set of contact IDs.
229 async fn load_social_handles_for_contacts(
230 &self,
231 ids: &[String],
232 ) -> Result<HashMap<ContactId, Vec<SocialHandle>>> {
233 if ids.is_empty() {
234 return Ok(HashMap::new());
235 }
236
237 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
238 let sql = format!(
239 "SELECT id, contact_id, platform, handle, url FROM contact_social_handles WHERE contact_id IN ({}) ORDER BY rowid ASC",
240 placeholders
241 );
242
243 let mut query = sqlx::query_as::<_, SocialHandleRow>(&sql);
244 for id in ids {
245 query = query.bind(id);
246 }
247
248 let rows = query.fetch_all(&self.pool).await.map_err(CoreError::database)?;
249 let mut map: HashMap<ContactId, Vec<SocialHandle>> = HashMap::new();
250 for row in rows {
251 let contact_id: ContactId = parse_uuid(&row.contact_id)?.into();
252 let handle = social_handle_from_row(row)?;
253 map.entry(contact_id).or_default().push(handle);
254 }
255 Ok(map)
256 }
257
258 /// Batch-load custom fields for a set of contact IDs.
259 async fn load_custom_fields_for_contacts(
260 &self,
261 ids: &[String],
262 ) -> Result<HashMap<ContactId, Vec<ContactCustomField>>> {
263 if ids.is_empty() {
264 return Ok(HashMap::new());
265 }
266
267 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
268 let sql = format!(
269 "SELECT id, contact_id, label, value, url FROM contact_custom_fields WHERE contact_id IN ({}) ORDER BY rowid ASC",
270 placeholders
271 );
272
273 let mut query = sqlx::query_as::<_, CustomFieldRow>(&sql);
274 for id in ids {
275 query = query.bind(id);
276 }
277
278 let rows = query.fetch_all(&self.pool).await.map_err(CoreError::database)?;
279 let mut map: HashMap<ContactId, Vec<ContactCustomField>> = HashMap::new();
280 for row in rows {
281 let contact_id: ContactId = parse_uuid(&row.contact_id)?.into();
282 let field = custom_field_from_row(row)?;
283 map.entry(contact_id).or_default().push(field);
284 }
285 Ok(map)
286 }
287
288 /// Hydrate a list of contact rows with their sub-collections.
289 async fn hydrate_contacts(&self, rows: Vec<ContactRow>) -> Result<Vec<Contact>> {
290 if rows.is_empty() {
291 return Ok(vec![]);
292 }
293
294 let ids: Vec<String> = rows.iter().map(|r| r.id.clone()).collect();
295 let mut emails_map = self.load_emails_for_contacts(&ids).await?;
296 let mut phones_map = self.load_phones_for_contacts(&ids).await?;
297 let mut social_map = self.load_social_handles_for_contacts(&ids).await?;
298 let mut custom_map = self.load_custom_fields_for_contacts(&ids).await?;
299
300 let mut contacts = Vec::with_capacity(rows.len());
301 for row in rows {
302 let id: ContactId = parse_uuid(&row.id)?.into();
303 let emails = emails_map.remove(&id).unwrap_or_default();
304 let phones = phones_map.remove(&id).unwrap_or_default();
305 let social_handles = social_map.remove(&id).unwrap_or_default();
306 let custom_fields = custom_map.remove(&id).unwrap_or_default();
307 contacts.push(contact_from_row(row, emails, phones, social_handles, custom_fields)?);
308 }
309 Ok(contacts)
310 }
311 }
312
313 #[async_trait]
314 impl ContactRepository for SqliteContactRepository {
315 #[tracing::instrument(skip_all)]
316 async fn list_all(&self, user_id: UserId) -> Result<Vec<Contact>> {
317 let rows = sqlx::query_as::<_, ContactRow>(
318 r#"
319 SELECT id, display_name, nickname, company, title, notes, tags, birthday, timezone, external_source, external_id, is_implicit, created_at, updated_at
320 FROM contacts
321 WHERE user_id = ? AND is_implicit = 0
322 ORDER BY display_name ASC
323 "#,
324 )
325 .bind(user_id.to_string())
326 .fetch_all(&self.pool)
327 .await
328 .map_err(CoreError::database)?;
329
330 self.hydrate_contacts(rows).await
331 }
332
333 #[tracing::instrument(skip_all)]
334 async fn get_by_id(&self, id: ContactId, user_id: UserId) -> Result<Option<Contact>> {
335 let row = sqlx::query_as::<_, ContactRow>(
336 r#"
337 SELECT id, display_name, nickname, company, title, notes, tags, birthday, timezone, external_source, external_id, is_implicit, created_at, updated_at
338 FROM contacts
339 WHERE id = ? AND user_id = ?
340 "#,
341 )
342 .bind(id.to_string())
343 .bind(user_id.to_string())
344 .fetch_optional(&self.pool)
345 .await
346 .map_err(CoreError::database)?;
347
348 match row {
349 Some(r) => {
350 let contacts = self.hydrate_contacts(vec![r]).await?;
351 Ok(contacts.into_iter().next())
352 }
353 None => Ok(None),
354 }
355 }
356
357 #[tracing::instrument(skip_all)]
358 async fn create(&self, user_id: UserId, contact: NewContact) -> Result<Contact> {
359 let id = ContactId::new();
360 let now = format_datetime_now();
361 let tags_json = serde_json::to_string(&contact.tags).unwrap_or_else(|_| "[]".to_string());
362 let birthday_str = contact.birthday.map(|d| d.format("%Y-%m-%d").to_string());
363
364 sqlx::query(
365 r#"
366 INSERT INTO contacts (id, user_id, display_name, nickname, company, title, notes, tags, birthday, timezone, is_implicit, created_at, updated_at)
367 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
368 "#,
369 )
370 .bind(id.to_string())
371 .bind(user_id.to_string())
372 .bind(&contact.display_name)
373 .bind(&contact.nickname)
374 .bind(&contact.company)
375 .bind(&contact.title)
376 .bind(&contact.notes)
377 .bind(&tags_json)
378 .bind(&birthday_str)
379 .bind(&contact.timezone)
380 .bind(if contact.is_implicit { 1 } else { 0 })
381 .bind(&now)
382 .bind(&now)
383 .execute(&self.pool)
384 .await
385 .map_err(CoreError::database)?;
386
387 self.get_by_id(id, user_id)
388 .await?
389 .ok_or_else(|| CoreError::internal("Failed to retrieve created contact"))
390 }
391
392 #[tracing::instrument(skip_all)]
393 async fn update(&self, id: ContactId, user_id: UserId, contact: UpdateContact) -> Result<Option<Contact>> {
394 let now = format_datetime_now();
395 let tags_json = serde_json::to_string(&contact.tags).unwrap_or_else(|_| "[]".to_string());
396 let birthday_str = contact.birthday.map(|d| d.format("%Y-%m-%d").to_string());
397
398 let result = sqlx::query(
399 r#"
400 UPDATE contacts
401 SET display_name = ?, nickname = ?, company = ?, title = ?, notes = ?, tags = ?, birthday = ?, timezone = ?, updated_at = ?
402 WHERE id = ? AND user_id = ?
403 "#,
404 )
405 .bind(&contact.display_name)
406 .bind(&contact.nickname)
407 .bind(&contact.company)
408 .bind(&contact.title)
409 .bind(&contact.notes)
410 .bind(&tags_json)
411 .bind(&birthday_str)
412 .bind(&contact.timezone)
413 .bind(&now)
414 .bind(id.to_string())
415 .bind(user_id.to_string())
416 .execute(&self.pool)
417 .await
418 .map_err(CoreError::database)?;
419
420 if result.rows_affected() > 0 {
421 self.get_by_id(id, user_id).await
422 } else {
423 Ok(None)
424 }
425 }
426
427 #[tracing::instrument(skip_all)]
428 async fn delete(&self, id: ContactId, user_id: UserId) -> Result<bool> {
429 let result = sqlx::query("DELETE FROM contacts WHERE id = ? AND user_id = ?")
430 .bind(id.to_string())
431 .bind(user_id.to_string())
432 .execute(&self.pool)
433 .await
434 .map_err(CoreError::database)?;
435
436 Ok(result.rows_affected() > 0)
437 }
438
439 #[tracing::instrument(skip_all)]
440 async fn delete_many(&self, ids: &[ContactId], user_id: UserId) -> Result<u64> {
441 if ids.is_empty() {
442 return Ok(0);
443 }
444 let user_id_str = user_id.to_string();
445 let placeholders = vec!["?"; ids.len()].join(",");
446 let sql = format!("DELETE FROM contacts WHERE user_id = ? AND id IN ({placeholders})");
447 let mut query = sqlx::query(&sql).bind(&user_id_str);
448 for id in ids {
449 query = query.bind(id.to_string());
450 }
451 let result = query.execute(&self.pool).await.map_err(CoreError::database)?;
452 Ok(result.rows_affected())
453 }
454
455 #[tracing::instrument(skip_all)]
456 async fn tag_many(&self, ids: &[ContactId], user_id: UserId, tag: &str) -> Result<u64> {
457 if ids.is_empty() || tag.is_empty() {
458 return Ok(0);
459 }
460 let user_id_str = user_id.to_string();
461 let like_pattern = format!("%\"{}\"%" , escape_like(tag));
462 let placeholders = vec!["?"; ids.len()].join(",");
463 // Append tag to JSON array where not already present.
464 let sql = format!(
465 r#"UPDATE contacts
466 SET tags = CASE
467 WHEN tags IS NULL OR tags = '' OR tags = '[]'
468 THEN json_array(?)
469 ELSE json_insert(tags, '$[#]', ?)
470 END,
471 updated_at = datetime('now')
472 WHERE user_id = ? AND id IN ({placeholders})
473 AND (tags IS NULL OR tags NOT LIKE ? ESCAPE '\')"#,
474 );
475 let mut query = sqlx::query(&sql)
476 .bind(tag)
477 .bind(tag)
478 .bind(&user_id_str);
479 for id in ids {
480 query = query.bind(id.to_string());
481 }
482 query = query.bind(&like_pattern);
483 let result = query.execute(&self.pool).await.map_err(CoreError::database)?;
484 Ok(result.rows_affected())
485 }
486
487 #[tracing::instrument(skip_all)]
488 async fn list_by_tag(&self, user_id: UserId, tag: &str) -> Result<Vec<Contact>> {
489 // Tags stored as JSON array, use LIKE for matching
490 let pattern = format!("%\"{}\"%" , escape_like(tag));
491 let rows = sqlx::query_as::<_, ContactRow>(
492 r#"
493 SELECT id, display_name, nickname, company, title, notes, tags, birthday, timezone, external_source, external_id, is_implicit, created_at, updated_at
494 FROM contacts
495 WHERE user_id = ? AND tags LIKE ? ESCAPE '\'
496 ORDER BY display_name ASC
497 "#,
498 )
499 .bind(user_id.to_string())
500 .bind(&pattern)
501 .fetch_all(&self.pool)
502 .await
503 .map_err(CoreError::database)?;
504
505 self.hydrate_contacts(rows).await
506 }
507
508 #[tracing::instrument(skip_all)]
509 async fn list_filtered(&self, user_id: UserId, search: Option<&str>, tag: Option<&str>, include_implicit: bool) -> Result<Vec<Contact>> {
510 let has_search = search.is_some_and(|s| !s.is_empty());
511 let has_tag = tag.is_some_and(|t| !t.is_empty());
512
513 if !has_search && !has_tag && !include_implicit {
514 return self.list_all(user_id).await;
515 }
516
517 let mut conditions = vec!["c.user_id = ?".to_string()];
518 if !include_implicit {
519 conditions.push("c.is_implicit = 0".to_string());
520 }
521 let mut binds: Vec<String> = vec![user_id.to_string()];
522
523 if let Some(t) = tag.filter(|t| !t.is_empty()) {
524 conditions.push("c.tags LIKE ? ESCAPE '\\'".to_string());
525 binds.push(format!("%\"{}\"%" , escape_like(t)));
526 }
527
528 if let Some(s) = search.filter(|s| !s.is_empty()) {
529 let search_pattern = format!("%{}%", escape_like(&s.to_lowercase()));
530 // Search across contact fields and email addresses using a subquery
531 conditions.push(
532 "(LOWER(c.display_name) LIKE ? ESCAPE '\\' OR LOWER(COALESCE(c.nickname, '')) LIKE ? ESCAPE '\\' OR LOWER(COALESCE(c.company, '')) LIKE ? ESCAPE '\\' OR LOWER(COALESCE(c.title, '')) LIKE ? ESCAPE '\\' OR LOWER(c.notes) LIKE ? ESCAPE '\\' OR EXISTS (SELECT 1 FROM contact_emails ce WHERE ce.contact_id = c.id AND LOWER(ce.address) LIKE ? ESCAPE '\\'))".to_string()
533 );
534 // 6 binds for the search pattern
535 for _ in 0..6 {
536 binds.push(search_pattern.clone());
537 }
538 }
539
540 let sql = format!(
541 "SELECT c.id, c.display_name, c.nickname, c.company, c.title, c.notes, c.tags, c.birthday, c.timezone, c.external_source, c.external_id, c.is_implicit, c.created_at, c.updated_at FROM contacts c WHERE {} ORDER BY c.display_name ASC",
542 conditions.join(" AND ")
543 );
544
545 let mut query = sqlx::query_as::<_, ContactRow>(&sql);
546 for bind in &binds {
547 query = query.bind(bind);
548 }
549
550 let rows = query.fetch_all(&self.pool).await.map_err(CoreError::database)?;
551 self.hydrate_contacts(rows).await
552 }
553
554 #[tracing::instrument(skip_all)]
555 async fn find_by_email(&self, user_id: UserId, email: &str) -> Result<Option<Contact>> {
556 let row = sqlx::query_as::<_, ContactRow>(
557 r#"
558 SELECT c.id, c.display_name, c.nickname, c.company, c.title, c.notes, c.tags, c.birthday, c.timezone, c.external_source, c.external_id, c.is_implicit, c.created_at, c.updated_at
559 FROM contacts c
560 JOIN contact_emails ce ON ce.contact_id = c.id
561 WHERE c.user_id = ? AND LOWER(ce.address) = LOWER(?)
562 LIMIT 1
563 "#,
564 )
565 .bind(user_id.to_string())
566 .bind(email)
567 .fetch_optional(&self.pool)
568 .await
569 .map_err(CoreError::database)?;
570
571 match row {
572 Some(r) => {
573 let contacts = self.hydrate_contacts(vec![r]).await?;
574 Ok(contacts.into_iter().next())
575 }
576 None => Ok(None),
577 }
578 }
579
580 #[tracing::instrument(skip_all)]
581 async fn find_emails_in_contacts(&self, user_id: UserId, addresses: &[&str]) -> Result<HashSet<String>> {
582 if addresses.is_empty() {
583 return Ok(HashSet::new());
584 }
585
586 let placeholders = addresses.iter().map(|_| "?").collect::<Vec<_>>().join(",");
587 let query = format!(
588 "SELECT DISTINCT LOWER(ce.address) FROM contact_emails ce JOIN contacts c ON ce.contact_id = c.id WHERE c.user_id = ? AND LOWER(ce.address) IN ({})",
589 placeholders
590 );
591
592 let mut q = sqlx::query_as::<_, (String,)>(&query).bind(user_id.to_string());
593 for addr in addresses {
594 q = q.bind(addr.to_lowercase());
595 }
596
597 let rows = q.fetch_all(&self.pool).await
598 .map_err(CoreError::database)?;
599
600 Ok(rows.into_iter().map(|(a,)| a).collect())
601 }
602
603 #[tracing::instrument(skip_all)]
604 async fn promote_contact(&self, id: ContactId, user_id: UserId) -> Result<Option<Contact>> {
605 let now = format_datetime_now();
606 let result = sqlx::query(
607 "UPDATE contacts SET is_implicit = 0, updated_at = ? WHERE id = ? AND user_id = ?"
608 )
609 .bind(&now)
610 .bind(id.to_string())
611 .bind(user_id.to_string())
612 .execute(&self.pool)
613 .await
614 .map_err(CoreError::database)?;
615
616 if result.rows_affected() > 0 {
617 self.get_by_id(id, user_id).await
618 } else {
619 Ok(None)
620 }
621 }
622
623 #[tracing::instrument(skip_all)]
624 async fn find_by_external_id(&self, source: &str, ext_id: &str, user_id: UserId) -> Result<Option<Contact>> {
625 let row = sqlx::query_as::<_, ContactRow>(
626 r#"
627 SELECT id, display_name, nickname, company, title, notes, tags, birthday, timezone, external_source, external_id, is_implicit, created_at, updated_at
628 FROM contacts
629 WHERE user_id = ? AND external_source = ? AND external_id = ?
630 LIMIT 1
631 "#,
632 )
633 .bind(user_id.to_string())
634 .bind(source)
635 .bind(ext_id)
636 .fetch_optional(&self.pool)
637 .await
638 .map_err(CoreError::database)?;
639
640 match row {
641 Some(r) => {
642 let contacts = self.hydrate_contacts(vec![r]).await?;
643 Ok(contacts.into_iter().next())
644 }
645 None => Ok(None),
646 }
647 }
648
649 #[tracing::instrument(skip_all)]
650 async fn add_email(&self, contact_id: ContactId, user_id: UserId, email: NewContactEmail) -> Result<ContactEmail> {
651 // Verify contact ownership
652 let exists = sqlx::query_scalar::<_, i32>(
653 "SELECT COUNT(*) FROM contacts WHERE id = ? AND user_id = ?"
654 )
655 .bind(contact_id.to_string())
656 .bind(user_id.to_string())
657 .fetch_one(&self.pool)
658 .await
659 .map_err(CoreError::database)?;
660
661 if exists == 0 {
662 return Err(CoreError::not_found("contact", contact_id));
663 }
664
665 let id = ContactEmailId::new();
666 sqlx::query(
667 "INSERT INTO contact_emails (id, contact_id, address, label, is_primary) VALUES (?, ?, ?, ?, ?)"
668 )
669 .bind(id.to_string())
670 .bind(contact_id.to_string())
671 .bind(&email.address)
672 .bind(&email.label)
673 .bind(email.is_primary as i32)
674 .execute(&self.pool)
675 .await
676 .map_err(CoreError::database)?;
677
678 Ok(ContactEmail {
679 id,
680 contact_id,
681 address: email.address,
682 label: email.label,
683 is_primary: email.is_primary,
684 })
685 }
686
687 #[tracing::instrument(skip_all)]
688 async fn remove_email(&self, email_id: ContactEmailId, user_id: UserId) -> Result<bool> {
689 // Verify ownership via JOIN
690 let result = sqlx::query(
691 r#"
692 DELETE FROM contact_emails
693 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
694 "#
695 )
696 .bind(email_id.to_string())
697 .bind(user_id.to_string())
698 .execute(&self.pool)
699 .await
700 .map_err(CoreError::database)?;
701
702 Ok(result.rows_affected() > 0)
703 }
704
705 #[tracing::instrument(skip_all)]
706 async fn add_phone(&self, contact_id: ContactId, user_id: UserId, phone: NewContactPhone) -> Result<ContactPhone> {
707 // Verify contact ownership
708 let exists = sqlx::query_scalar::<_, i32>(
709 "SELECT COUNT(*) FROM contacts WHERE id = ? AND user_id = ?"
710 )
711 .bind(contact_id.to_string())
712 .bind(user_id.to_string())
713 .fetch_one(&self.pool)
714 .await
715 .map_err(CoreError::database)?;
716
717 if exists == 0 {
718 return Err(CoreError::not_found("contact", contact_id));
719 }
720
721 let id = ContactPhoneId::new();
722 sqlx::query(
723 "INSERT INTO contact_phones (id, contact_id, number, label, is_primary) VALUES (?, ?, ?, ?, ?)"
724 )
725 .bind(id.to_string())
726 .bind(contact_id.to_string())
727 .bind(&phone.number)
728 .bind(&phone.label)
729 .bind(phone.is_primary as i32)
730 .execute(&self.pool)
731 .await
732 .map_err(CoreError::database)?;
733
734 Ok(ContactPhone {
735 id,
736 contact_id,
737 number: phone.number,
738 label: phone.label,
739 is_primary: phone.is_primary,
740 })
741 }
742
743 #[tracing::instrument(skip_all)]
744 async fn remove_phone(&self, phone_id: ContactPhoneId, user_id: UserId) -> Result<bool> {
745 let result = sqlx::query(
746 r#"
747 DELETE FROM contact_phones
748 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
749 "#
750 )
751 .bind(phone_id.to_string())
752 .bind(user_id.to_string())
753 .execute(&self.pool)
754 .await
755 .map_err(CoreError::database)?;
756
757 Ok(result.rows_affected() > 0)
758 }
759
760 #[tracing::instrument(skip_all)]
761 async fn add_social_handle(&self, contact_id: ContactId, user_id: UserId, handle: NewSocialHandle) -> Result<SocialHandle> {
762 // Verify contact ownership
763 let exists = sqlx::query_scalar::<_, i32>(
764 "SELECT COUNT(*) FROM contacts WHERE id = ? AND user_id = ?"
765 )
766 .bind(contact_id.to_string())
767 .bind(user_id.to_string())
768 .fetch_one(&self.pool)
769 .await
770 .map_err(CoreError::database)?;
771
772 if exists == 0 {
773 return Err(CoreError::not_found("contact", contact_id));
774 }
775
776 let id = SocialHandleId::new();
777 sqlx::query(
778 "INSERT INTO contact_social_handles (id, contact_id, platform, handle, url) VALUES (?, ?, ?, ?, ?)"
779 )
780 .bind(id.to_string())
781 .bind(contact_id.to_string())
782 .bind(&handle.platform)
783 .bind(&handle.handle)
784 .bind(&handle.url)
785 .execute(&self.pool)
786 .await
787 .map_err(CoreError::database)?;
788
789 Ok(SocialHandle {
790 id,
791 contact_id,
792 platform: handle.platform,
793 handle: handle.handle,
794 url: handle.url,
795 })
796 }
797
798 #[tracing::instrument(skip_all)]
799 async fn remove_social_handle(&self, handle_id: SocialHandleId, user_id: UserId) -> Result<bool> {
800 let result = sqlx::query(
801 r#"
802 DELETE FROM contact_social_handles
803 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
804 "#
805 )
806 .bind(handle_id.to_string())
807 .bind(user_id.to_string())
808 .execute(&self.pool)
809 .await
810 .map_err(CoreError::database)?;
811
812 Ok(result.rows_affected() > 0)
813 }
814
815 #[tracing::instrument(skip_all)]
816 async fn add_custom_field(&self, contact_id: ContactId, user_id: UserId, field: NewContactCustomField) -> Result<ContactCustomField> {
817 // Verify contact ownership
818 let exists = sqlx::query_scalar::<_, i32>(
819 "SELECT COUNT(*) FROM contacts WHERE id = ? AND user_id = ?"
820 )
821 .bind(contact_id.to_string())
822 .bind(user_id.to_string())
823 .fetch_one(&self.pool)
824 .await
825 .map_err(CoreError::database)?;
826
827 if exists == 0 {
828 return Err(CoreError::not_found("contact", contact_id));
829 }
830
831 let id = CustomFieldId::new();
832 sqlx::query(
833 "INSERT INTO contact_custom_fields (id, contact_id, label, value, url) VALUES (?, ?, ?, ?, ?)"
834 )
835 .bind(id.to_string())
836 .bind(contact_id.to_string())
837 .bind(&field.label)
838 .bind(&field.value)
839 .bind(&field.url)
840 .execute(&self.pool)
841 .await
842 .map_err(CoreError::database)?;
843
844 Ok(ContactCustomField {
845 id,
846 contact_id,
847 label: field.label,
848 value: field.value,
849 url: field.url,
850 })
851 }
852
853 #[tracing::instrument(skip_all)]
854 async fn remove_custom_field(&self, field_id: CustomFieldId, user_id: UserId) -> Result<bool> {
855 let result = sqlx::query(
856 r#"
857 DELETE FROM contact_custom_fields
858 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
859 "#
860 )
861 .bind(field_id.to_string())
862 .bind(user_id.to_string())
863 .execute(&self.pool)
864 .await
865 .map_err(CoreError::database)?;
866
867 Ok(result.rows_affected() > 0)
868 }
869
870 #[tracing::instrument(skip_all)]
871 async fn update_email(&self, email_id: ContactEmailId, user_id: UserId, email: NewContactEmail) -> Result<Option<ContactEmail>> {
872 let result = sqlx::query(
873 r#"
874 UPDATE contact_emails
875 SET address = ?, label = ?, is_primary = ?
876 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
877 "#
878 )
879 .bind(&email.address)
880 .bind(&email.label)
881 .bind(email.is_primary as i32)
882 .bind(email_id.to_string())
883 .bind(user_id.to_string())
884 .execute(&self.pool)
885 .await
886 .map_err(CoreError::database)?;
887
888 if result.rows_affected() == 0 {
889 return Ok(None);
890 }
891
892 let row = sqlx::query_as::<_, (String, String, String, i32)>(
893 "SELECT contact_id, address, label, is_primary FROM contact_emails WHERE id = ?"
894 )
895 .bind(email_id.to_string())
896 .fetch_one(&self.pool)
897 .await
898 .map_err(CoreError::database)?;
899
900 Ok(Some(ContactEmail {
901 id: email_id,
902 contact_id: crate::utils::parse_uuid(&row.0)?.into(),
903 address: row.1,
904 label: row.2,
905 is_primary: row.3 != 0,
906 }))
907 }
908
909 #[tracing::instrument(skip_all)]
910 async fn update_phone(&self, phone_id: ContactPhoneId, user_id: UserId, phone: NewContactPhone) -> Result<Option<ContactPhone>> {
911 let result = sqlx::query(
912 r#"
913 UPDATE contact_phones
914 SET number = ?, label = ?, is_primary = ?
915 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
916 "#
917 )
918 .bind(&phone.number)
919 .bind(&phone.label)
920 .bind(phone.is_primary as i32)
921 .bind(phone_id.to_string())
922 .bind(user_id.to_string())
923 .execute(&self.pool)
924 .await
925 .map_err(CoreError::database)?;
926
927 if result.rows_affected() == 0 {
928 return Ok(None);
929 }
930
931 let row = sqlx::query_as::<_, (String, String, String, i32)>(
932 "SELECT contact_id, number, label, is_primary FROM contact_phones WHERE id = ?"
933 )
934 .bind(phone_id.to_string())
935 .fetch_one(&self.pool)
936 .await
937 .map_err(CoreError::database)?;
938
939 Ok(Some(ContactPhone {
940 id: phone_id,
941 contact_id: crate::utils::parse_uuid(&row.0)?.into(),
942 number: row.1,
943 label: row.2,
944 is_primary: row.3 != 0,
945 }))
946 }
947
948 #[tracing::instrument(skip_all)]
949 async fn update_social_handle(&self, handle_id: SocialHandleId, user_id: UserId, handle: NewSocialHandle) -> Result<Option<SocialHandle>> {
950 let result = sqlx::query(
951 r#"
952 UPDATE contact_social_handles
953 SET platform = ?, handle = ?, url = ?
954 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
955 "#
956 )
957 .bind(&handle.platform)
958 .bind(&handle.handle)
959 .bind(&handle.url)
960 .bind(handle_id.to_string())
961 .bind(user_id.to_string())
962 .execute(&self.pool)
963 .await
964 .map_err(CoreError::database)?;
965
966 if result.rows_affected() == 0 {
967 return Ok(None);
968 }
969
970 let row = sqlx::query_as::<_, (String, String, String, Option<String>)>(
971 "SELECT contact_id, platform, handle, url FROM contact_social_handles WHERE id = ?"
972 )
973 .bind(handle_id.to_string())
974 .fetch_one(&self.pool)
975 .await
976 .map_err(CoreError::database)?;
977
978 Ok(Some(SocialHandle {
979 id: handle_id,
980 contact_id: crate::utils::parse_uuid(&row.0)?.into(),
981 platform: row.1,
982 handle: row.2,
983 url: row.3,
984 }))
985 }
986
987 #[tracing::instrument(skip_all)]
988 async fn update_custom_field(&self, field_id: CustomFieldId, user_id: UserId, field: NewContactCustomField) -> Result<Option<ContactCustomField>> {
989 let result = sqlx::query(
990 r#"
991 UPDATE contact_custom_fields
992 SET label = ?, value = ?, url = ?
993 WHERE id = ? AND contact_id IN (SELECT id FROM contacts WHERE user_id = ?)
994 "#
995 )
996 .bind(&field.label)
997 .bind(&field.value)
998 .bind(&field.url)
999 .bind(field_id.to_string())
1000 .bind(user_id.to_string())
1001 .execute(&self.pool)
1002 .await
1003 .map_err(CoreError::database)?;
1004
1005 if result.rows_affected() == 0 {
1006 return Ok(None);
1007 }
1008
1009 let row = sqlx::query_as::<_, (String, String, String, Option<String>)>(
1010 "SELECT contact_id, label, value, url FROM contact_custom_fields WHERE id = ?"
1011 )
1012 .bind(field_id.to_string())
1013 .fetch_one(&self.pool)
1014 .await
1015 .map_err(CoreError::database)?;
1016
1017 Ok(Some(ContactCustomField {
1018 id: field_id,
1019 contact_id: crate::utils::parse_uuid(&row.0)?.into(),
1020 label: row.1,
1021 value: row.2,
1022 url: row.3,
1023 }))
1024 }
1025 }
1026