Skip to main content

max / goingson

18.6 KB · 437 lines History Blame Raw
1 //! SQLite implementation of the EventRepository.
2 //!
3 //! Manages calendar events with support for:
4 //! - All-day and timed events
5 //! - Recurrence patterns
6 //! - Project associations
7 //! - Date range queries for dashboard views
8
9 use async_trait::async_trait;
10 use chrono::{DateTime, NaiveDate, Utc};
11 use sqlx::SqlitePool;
12 use goingson_core::{
13 BlockType, ContactId, CoreError, DbValue, Event, EventId, EventRepository, NewEvent, ParseableEnum,
14 ProjectId, Recurrence, RecurrenceRule, Result, TaskId, UpdateEvent, UserId,
15 };
16
17 use crate::utils::{format_datetime, format_datetime_opt, parse_datetime, parse_uuid, parse_uuid_opt};
18
/// Projection shared by every `SELECT` in this module, so the column list is written once.
///
/// The list relies on three table aliases that the embedding query must introduce itself:
/// `e` (events), `p` (projects) and `ct` (contacts). The selected names line up with the
/// fields declared on `EventRow`.
const EVENT_SELECT_COLUMNS: &str = concat!(
    "e.id, e.user_id, e.project_id, p.name as project_name,\n",
    "e.title, e.description, e.start_time, e.end_time, e.location,\n",
    "e.linked_task_id, e.recurrence, e.recurrence_rule, e.recurrence_parent_id,\n",
    "e.contact_id, ct.display_name as contact_name, e.block_type,\n",
    "e.external_source, e.external_id, e.is_read_only, e.snoozed_until,\n",
    "e.reminder_offsets_seconds",
);
26
27 #[derive(Debug, Clone, sqlx::FromRow)]
28 struct EventRow {
29 pub id: String,
30 pub user_id: Option<String>,
31 pub project_id: Option<String>,
32 pub project_name: Option<String>,
33 pub title: String,
34 pub description: String,
35 pub start_time: String,
36 pub end_time: Option<String>,
37 pub location: Option<String>,
38 pub linked_task_id: Option<String>,
39 pub recurrence: String,
40 pub recurrence_rule: Option<String>,
41 pub recurrence_parent_id: Option<String>,
42 pub contact_id: Option<String>,
43 pub contact_name: Option<String>,
44 pub block_type: Option<String>,
45 pub external_source: Option<String>,
46 pub external_id: Option<String>,
47 pub is_read_only: i32,
48 pub snoozed_until: Option<String>,
49 pub reminder_offsets_seconds: Option<String>,
50 }
51
52 impl TryFrom<EventRow> for Event {
53 type Error = CoreError;
54
55 fn try_from(row: EventRow) -> std::result::Result<Self, Self::Error> {
56 Ok(Event {
57 id: parse_uuid(&row.id)?.into(),
58 user_id: parse_uuid_opt(row.user_id.as_deref())?.map(Into::into),
59 project_id: parse_uuid_opt(row.project_id.as_deref())?.map(Into::into),
60 project_name: row.project_name,
61 title: row.title,
62 description: row.description,
63 start_time: parse_datetime(&row.start_time)?,
64 end_time: row.end_time.as_ref().map(|s| parse_datetime(s)).transpose()?,
65 location: row.location,
66 linked_task_id: parse_uuid_opt(row.linked_task_id.as_deref())?.map(Into::into),
67 recurrence: Recurrence::from_str_or_default(&row.recurrence),
68 recurrence_rule: row.recurrence_rule
69 .as_deref()
70 .and_then(|s| serde_json::from_str::<RecurrenceRule>(s).ok()),
71 recurrence_parent_id: parse_uuid_opt(row.recurrence_parent_id.as_deref())?.map(Into::into),
72 is_recurring_instance: false,
73 contact_id: parse_uuid_opt(row.contact_id.as_deref())?.map(Into::into),
74 contact_name: row.contact_name,
75 block_type: row.block_type.as_deref().and_then(BlockType::from_str_opt),
76 external_source: row.external_source,
77 external_id: row.external_id,
78 is_read_only: row.is_read_only != 0,
79 snoozed_until: row.snoozed_until.as_deref().map(parse_datetime).transpose()?,
80 reminder_offsets_seconds: row.reminder_offsets_seconds
81 .as_deref()
82 .and_then(|s| serde_json::from_str::<Vec<i64>>(s).ok())
83 .unwrap_or_default(),
84 })
85 }
86 }
87
88 /// SQLite-backed implementation of [`EventRepository`].
89 ///
90 /// Manages calendar events with date range queries optimized for
91 /// dashboard and day planning views.
92 pub struct SqliteEventRepository {
93 pool: SqlitePool,
94 }
95
96 impl SqliteEventRepository {
97 /// Creates a new repository instance with the given connection pool.
98 #[tracing::instrument(skip_all)]
99 pub fn new(pool: SqlitePool) -> Self {
100 Self { pool }
101 }
102 }
103
104 #[async_trait]
105 impl EventRepository for SqliteEventRepository {
106 #[tracing::instrument(skip_all)]
107 async fn list_all(&self, user_id: UserId) -> Result<Vec<Event>> {
108 let query = format!(
109 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? ORDER BY e.start_time ASC",
110 EVENT_SELECT_COLUMNS
111 );
112 let rows = sqlx::query_as::<_, EventRow>(&query)
113 .bind(user_id.to_string())
114 .bind(user_id.to_string())
115 .fetch_all(&self.pool)
116 .await
117 .map_err(CoreError::database)?;
118 rows.into_iter().map(Event::try_from).collect()
119 }
120
121 #[tracing::instrument(skip_all)]
122 async fn list_by_project(&self, user_id: UserId, project_id: ProjectId) -> Result<Vec<Event>> {
123 let query = format!(
124 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.project_id = ? ORDER BY e.start_time ASC",
125 EVENT_SELECT_COLUMNS
126 );
127 let rows = sqlx::query_as::<_, EventRow>(&query)
128 .bind(user_id.to_string())
129 .bind(user_id.to_string())
130 .bind(project_id.to_string())
131 .fetch_all(&self.pool)
132 .await
133 .map_err(CoreError::database)?;
134 rows.into_iter().map(Event::try_from).collect()
135 }
136
137 #[tracing::instrument(skip_all)]
138 async fn list_by_contact(&self, user_id: UserId, contact_id: ContactId) -> Result<Vec<Event>> {
139 let query = format!(
140 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.contact_id = ? ORDER BY e.start_time DESC",
141 EVENT_SELECT_COLUMNS
142 );
143 let rows = sqlx::query_as::<_, EventRow>(&query)
144 .bind(user_id.to_string())
145 .bind(user_id.to_string())
146 .bind(contact_id.to_string())
147 .fetch_all(&self.pool)
148 .await
149 .map_err(CoreError::database)?;
150 rows.into_iter().map(Event::try_from).collect()
151 }
152
153 #[tracing::instrument(skip_all)]
154 async fn get_by_id(&self, id: EventId, user_id: UserId) -> Result<Option<Event>> {
155 let query = format!(
156 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.id = ? AND e.user_id = ?",
157 EVENT_SELECT_COLUMNS
158 );
159 let row = sqlx::query_as::<_, EventRow>(&query)
160 .bind(user_id.to_string())
161 .bind(id.to_string())
162 .bind(user_id.to_string())
163 .fetch_optional(&self.pool)
164 .await
165 .map_err(CoreError::database)?;
166 row.map(Event::try_from).transpose()
167 }
168
169 #[tracing::instrument(skip_all)]
170 async fn create(&self, user_id: UserId, event: NewEvent) -> Result<Event> {
171 let id = EventId::new();
172 let start_str = format_datetime(&event.start_time);
173 let end_str = format_datetime_opt(event.end_time);
174
175 let recurrence_rule_json = event.recurrence_rule.as_ref()
176 .map(|r| serde_json::to_string(r).unwrap_or_default());
177 let reminder_offsets_json = if event.reminder_offsets_seconds.is_empty() {
178 None
179 } else {
180 Some(serde_json::to_string(&event.reminder_offsets_seconds).unwrap_or_default())
181 };
182
183 sqlx::query(
184 "INSERT INTO events (id, user_id, project_id, title, description, start_time, end_time, location, linked_task_id, recurrence, recurrence_rule, contact_id, block_type, reminder_offsets_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
185 )
186 .bind(id.to_string())
187 .bind(user_id.to_string())
188 .bind(event.project_id.map(|p| p.to_string()))
189 .bind(&event.title)
190 .bind(&event.description)
191 .bind(&start_str)
192 .bind(&end_str)
193 .bind(&event.location)
194 .bind(event.linked_task_id.map(|t| t.to_string()))
195 .bind(event.recurrence.db_value())
196 .bind(&recurrence_rule_json)
197 .bind(event.contact_id.map(|c| c.to_string()))
198 .bind(event.block_type.as_ref().map(|b| b.db_value()))
199 .bind(&reminder_offsets_json)
200 .execute(&self.pool)
201 .await
202 .map_err(CoreError::database)?;
203
204 self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created event"))
205 }
206
207 #[tracing::instrument(skip_all)]
208 async fn update(&self, id: EventId, user_id: UserId, event: UpdateEvent) -> Result<Option<Event>> {
209 let start_str = format_datetime(&event.start_time);
210 let end_str = format_datetime_opt(event.end_time);
211
212 let recurrence_rule_json = event.recurrence_rule.as_ref()
213 .map(|r| serde_json::to_string(r).unwrap_or_default());
214 let reminder_offsets_json = if event.reminder_offsets_seconds.is_empty() {
215 None
216 } else {
217 Some(serde_json::to_string(&event.reminder_offsets_seconds).unwrap_or_default())
218 };
219
220 let result = sqlx::query(
221 "UPDATE events SET project_id = ?, title = ?, description = ?, start_time = ?, end_time = ?, location = ?, linked_task_id = ?, recurrence = ?, recurrence_rule = ?, contact_id = ?, block_type = ?, reminder_offsets_seconds = ? WHERE id = ? AND user_id = ?",
222 )
223 .bind(event.project_id.map(|p| p.to_string()))
224 .bind(&event.title)
225 .bind(&event.description)
226 .bind(&start_str)
227 .bind(&end_str)
228 .bind(&event.location)
229 .bind(event.linked_task_id.map(|t| t.to_string()))
230 .bind(event.recurrence.db_value())
231 .bind(&recurrence_rule_json)
232 .bind(event.contact_id.map(|c| c.to_string()))
233 .bind(event.block_type.as_ref().map(|b| b.db_value()))
234 .bind(&reminder_offsets_json)
235 .bind(id.to_string())
236 .bind(user_id.to_string())
237 .execute(&self.pool)
238 .await
239 .map_err(CoreError::database)?;
240
241 if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) }
242 }
243
244 #[tracing::instrument(skip_all)]
245 async fn delete(&self, id: EventId, user_id: UserId) -> Result<bool> {
246 let result = sqlx::query("DELETE FROM events WHERE id = ? AND user_id = ?")
247 .bind(id.to_string())
248 .bind(user_id.to_string())
249 .execute(&self.pool)
250 .await
251 .map_err(CoreError::database)?;
252
253 Ok(result.rows_affected() > 0)
254 }
255
256 #[tracing::instrument(skip_all)]
257 async fn delete_many(&self, ids: &[EventId], user_id: UserId) -> Result<u64> {
258 if ids.is_empty() {
259 return Ok(0);
260 }
261 let user_id_str = user_id.to_string();
262 let placeholders = vec!["?"; ids.len()].join(",");
263 let sql = format!("DELETE FROM events WHERE user_id = ? AND id IN ({placeholders})");
264 let mut query = sqlx::query(&sql).bind(&user_id_str);
265 for id in ids {
266 query = query.bind(id.to_string());
267 }
268 let result = query.execute(&self.pool).await.map_err(CoreError::database)?;
269 Ok(result.rows_affected())
270 }
271
272 #[tracing::instrument(skip_all)]
273 async fn get_upcoming(&self, user_id: UserId, days: i64) -> Result<Vec<Event>> {
274 let query = format!(
275 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) >= datetime('now') AND datetime(e.start_time) <= datetime('now', ? || ' days') ORDER BY e.start_time ASC",
276 EVENT_SELECT_COLUMNS
277 );
278 let rows = sqlx::query_as::<_, EventRow>(&query)
279 .bind(user_id.to_string())
280 .bind(user_id.to_string())
281 .bind(format!("+{}", days))
282 .fetch_all(&self.pool)
283 .await
284 .map_err(CoreError::database)?;
285 rows.into_iter().map(Event::try_from).collect()
286 }
287
288 #[tracing::instrument(skip_all)]
289 async fn get_by_linked_task(&self, user_id: UserId, task_id: TaskId) -> Result<Option<Event>> {
290 let query = format!(
291 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.linked_task_id = ?",
292 EVENT_SELECT_COLUMNS
293 );
294 let row = sqlx::query_as::<_, EventRow>(&query)
295 .bind(user_id.to_string())
296 .bind(user_id.to_string())
297 .bind(task_id.to_string())
298 .fetch_optional(&self.pool)
299 .await
300 .map_err(CoreError::database)?;
301 row.map(Event::try_from).transpose()
302 }
303
304 #[tracing::instrument(skip_all)]
305 async fn delete_by_linked_task(&self, user_id: UserId, task_id: TaskId) -> Result<bool> {
306 let result = sqlx::query("DELETE FROM events WHERE user_id = ? AND linked_task_id = ?")
307 .bind(user_id.to_string())
308 .bind(task_id.to_string())
309 .execute(&self.pool)
310 .await
311 .map_err(CoreError::database)?;
312
313 Ok(result.rows_affected() > 0)
314 }
315
316 #[tracing::instrument(skip_all)]
317 async fn list_for_date(&self, user_id: UserId, date: NaiveDate) -> Result<Vec<Event>> {
318 let date_start = format!("{} 00:00:00", date);
319 let date_end = format!("{} 23:59:59", date);
320 let query = format!(
321 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) <= datetime(?) AND (e.end_time IS NULL OR datetime(e.end_time) >= datetime(?)) ORDER BY e.start_time ASC",
322 EVENT_SELECT_COLUMNS
323 );
324 let rows = sqlx::query_as::<_, EventRow>(&query)
325 .bind(user_id.to_string())
326 .bind(user_id.to_string())
327 .bind(&date_end)
328 .bind(&date_start)
329 .fetch_all(&self.pool)
330 .await
331 .map_err(CoreError::database)?;
332 rows.into_iter().map(Event::try_from).collect()
333 }
334
335 #[tracing::instrument(skip_all)]
336 async fn list_between(&self, user_id: UserId, start: chrono::DateTime<chrono::Utc>, end: chrono::DateTime<chrono::Utc>) -> Result<Vec<Event>> {
337 let start_str = format_datetime(&start);
338 let end_str = format_datetime(&end);
339 let query = format!(
340 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) <= datetime(?) AND (e.end_time IS NULL OR datetime(e.end_time) >= datetime(?)) ORDER BY e.start_time ASC",
341 EVENT_SELECT_COLUMNS
342 );
343 let rows = sqlx::query_as::<_, EventRow>(&query)
344 .bind(user_id.to_string())
345 .bind(user_id.to_string())
346 .bind(&end_str)
347 .bind(&start_str)
348 .fetch_all(&self.pool)
349 .await
350 .map_err(CoreError::database)?;
351 rows.into_iter().map(Event::try_from).collect()
352 }
353
354 #[tracing::instrument(skip_all)]
355 async fn list_recurring(&self, user_id: UserId) -> Result<Vec<Event>> {
356 let query = format!(
357 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND (e.recurrence != 'None' OR e.recurrence_rule IS NOT NULL) ORDER BY e.start_time ASC",
358 EVENT_SELECT_COLUMNS
359 );
360 let rows = sqlx::query_as::<_, EventRow>(&query)
361 .bind(user_id.to_string())
362 .bind(user_id.to_string())
363 .fetch_all(&self.pool)
364 .await
365 .map_err(CoreError::database)?;
366 rows.into_iter().map(Event::try_from).collect()
367 }
368
369 #[tracing::instrument(skip_all)]
370 async fn find_by_external_id(&self, source: &str, ext_id: &str, user_id: UserId) -> Result<Option<Event>> {
371 let query = format!(
372 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.external_source = ? AND e.external_id = ?",
373 EVENT_SELECT_COLUMNS
374 );
375 let row = sqlx::query_as::<_, EventRow>(&query)
376 .bind(user_id.to_string())
377 .bind(user_id.to_string())
378 .bind(source)
379 .bind(ext_id)
380 .fetch_optional(&self.pool)
381 .await
382 .map_err(CoreError::database)?;
383 row.map(Event::try_from).transpose()
384 }
385
386 #[tracing::instrument(skip_all)]
387 async fn snooze(&self, id: EventId, user_id: UserId, until: DateTime<Utc>) -> Result<Option<Event>> {
388 let until_str = format_datetime(&until);
389 let result = sqlx::query(
390 "UPDATE events SET snoozed_until = ? WHERE id = ? AND user_id = ?"
391 )
392 .bind(&until_str)
393 .bind(id.to_string())
394 .bind(user_id.to_string())
395 .execute(&self.pool)
396 .await
397 .map_err(CoreError::database)?;
398
399 if result.rows_affected() == 0 {
400 return Ok(None);
401 }
402 EventRepository::get_by_id(self, id, user_id).await
403 }
404
405 #[tracing::instrument(skip_all)]
406 async fn unsnooze(&self, id: EventId, user_id: UserId) -> Result<Option<Event>> {
407 let result = sqlx::query(
408 "UPDATE events SET snoozed_until = NULL WHERE id = ? AND user_id = ?"
409 )
410 .bind(id.to_string())
411 .bind(user_id.to_string())
412 .execute(&self.pool)
413 .await
414 .map_err(CoreError::database)?;
415
416 if result.rows_affected() == 0 {
417 return Ok(None);
418 }
419 EventRepository::get_by_id(self, id, user_id).await
420 }
421
422 #[tracing::instrument(skip_all)]
423 async fn list_snoozed(&self, user_id: UserId) -> Result<Vec<Event>> {
424 let query = format!(
425 "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.snoozed_until IS NOT NULL AND datetime(e.snoozed_until) > datetime('now') ORDER BY e.snoozed_until ASC",
426 EVENT_SELECT_COLUMNS
427 );
428 let rows = sqlx::query_as::<_, EventRow>(&query)
429 .bind(user_id.to_string())
430 .bind(user_id.to_string())
431 .fetch_all(&self.pool)
432 .await
433 .map_err(CoreError::database)?;
434 rows.into_iter().map(Event::try_from).collect()
435 }
436 }
437