//! SQLite implementation of the EventRepository. //! //! Manages calendar events with support for: //! - All-day and timed events //! - Recurrence patterns //! - Project associations //! - Date range queries for dashboard views use async_trait::async_trait; use chrono::{DateTime, NaiveDate, Utc}; use sqlx::SqlitePool; use goingson_core::{ BlockType, ContactId, CoreError, DbValue, Event, EventId, EventRepository, NewEvent, ParseableEnum, ProjectId, Recurrence, RecurrenceRule, Result, TaskId, UpdateEvent, UserId, }; use crate::utils::{format_datetime, format_datetime_opt, parse_datetime, parse_uuid, parse_uuid_opt}; /// Column list for SELECT queries - avoids duplication across methods. const EVENT_SELECT_COLUMNS: &str = r#"e.id, e.user_id, e.project_id, p.name as project_name, e.title, e.description, e.start_time, e.end_time, e.location, e.linked_task_id, e.recurrence, e.recurrence_rule, e.recurrence_parent_id, e.contact_id, ct.display_name as contact_name, e.block_type, e.external_source, e.external_id, e.is_read_only, e.snoozed_until, e.reminder_offsets_seconds"#; #[derive(Debug, Clone, sqlx::FromRow)] struct EventRow { pub id: String, pub user_id: Option, pub project_id: Option, pub project_name: Option, pub title: String, pub description: String, pub start_time: String, pub end_time: Option, pub location: Option, pub linked_task_id: Option, pub recurrence: String, pub recurrence_rule: Option, pub recurrence_parent_id: Option, pub contact_id: Option, pub contact_name: Option, pub block_type: Option, pub external_source: Option, pub external_id: Option, pub is_read_only: i32, pub snoozed_until: Option, pub reminder_offsets_seconds: Option, } impl TryFrom for Event { type Error = CoreError; fn try_from(row: EventRow) -> std::result::Result { Ok(Event { id: parse_uuid(&row.id)?.into(), user_id: parse_uuid_opt(row.user_id.as_deref())?.map(Into::into), project_id: parse_uuid_opt(row.project_id.as_deref())?.map(Into::into), project_name: row.project_name, title: row.title, description: row.description, start_time: parse_datetime(&row.start_time)?, end_time: row.end_time.as_ref().map(|s| parse_datetime(s)).transpose()?, location: row.location, linked_task_id: parse_uuid_opt(row.linked_task_id.as_deref())?.map(Into::into), recurrence: Recurrence::from_str_or_default(&row.recurrence), recurrence_rule: row.recurrence_rule .as_deref() .and_then(|s| serde_json::from_str::(s).ok()), recurrence_parent_id: parse_uuid_opt(row.recurrence_parent_id.as_deref())?.map(Into::into), is_recurring_instance: false, contact_id: parse_uuid_opt(row.contact_id.as_deref())?.map(Into::into), contact_name: row.contact_name, block_type: row.block_type.as_deref().and_then(BlockType::from_str_opt), external_source: row.external_source, external_id: row.external_id, is_read_only: row.is_read_only != 0, snoozed_until: row.snoozed_until.as_deref().map(parse_datetime).transpose()?, reminder_offsets_seconds: row.reminder_offsets_seconds .as_deref() .and_then(|s| serde_json::from_str::>(s).ok()) .unwrap_or_default(), }) } } /// SQLite-backed implementation of [`EventRepository`]. /// /// Manages calendar events with date range queries optimized for /// dashboard and day planning views. pub struct SqliteEventRepository { pool: SqlitePool, } impl SqliteEventRepository { /// Creates a new repository instance with the given connection pool. #[tracing::instrument(skip_all)] pub fn new(pool: SqlitePool) -> Self { Self { pool } } } #[async_trait] impl EventRepository for SqliteEventRepository { #[tracing::instrument(skip_all)] async fn list_all(&self, user_id: UserId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn list_by_project(&self, user_id: UserId, project_id: ProjectId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.project_id = ? ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(project_id.to_string()) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn list_by_contact(&self, user_id: UserId, contact_id: ContactId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.contact_id = ? ORDER BY e.start_time DESC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(contact_id.to_string()) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn get_by_id(&self, id: EventId, user_id: UserId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.id = ? AND e.user_id = ?", EVENT_SELECT_COLUMNS ); let row = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(id.to_string()) .bind(user_id.to_string()) .fetch_optional(&self.pool) .await .map_err(CoreError::database)?; row.map(Event::try_from).transpose() } #[tracing::instrument(skip_all)] async fn create(&self, user_id: UserId, event: NewEvent) -> Result { let id = EventId::new(); let start_str = format_datetime(&event.start_time); let end_str = format_datetime_opt(event.end_time); let recurrence_rule_json = event.recurrence_rule.as_ref() .map(|r| serde_json::to_string(r).unwrap_or_default()); let reminder_offsets_json = if event.reminder_offsets_seconds.is_empty() { None } else { Some(serde_json::to_string(&event.reminder_offsets_seconds).unwrap_or_default()) }; sqlx::query( "INSERT INTO events (id, user_id, project_id, title, description, start_time, end_time, location, linked_task_id, recurrence, recurrence_rule, contact_id, block_type, reminder_offsets_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(id.to_string()) .bind(user_id.to_string()) .bind(event.project_id.map(|p| p.to_string())) .bind(&event.title) .bind(&event.description) .bind(&start_str) .bind(&end_str) .bind(&event.location) .bind(event.linked_task_id.map(|t| t.to_string())) .bind(event.recurrence.db_value()) .bind(&recurrence_rule_json) .bind(event.contact_id.map(|c| c.to_string())) .bind(event.block_type.as_ref().map(|b| b.db_value())) .bind(&reminder_offsets_json) .execute(&self.pool) .await .map_err(CoreError::database)?; self.get_by_id(id, user_id).await?.ok_or_else(|| CoreError::internal("Failed to retrieve created event")) } #[tracing::instrument(skip_all)] async fn update(&self, id: EventId, user_id: UserId, event: UpdateEvent) -> Result> { let start_str = format_datetime(&event.start_time); let end_str = format_datetime_opt(event.end_time); let recurrence_rule_json = event.recurrence_rule.as_ref() .map(|r| serde_json::to_string(r).unwrap_or_default()); let reminder_offsets_json = if event.reminder_offsets_seconds.is_empty() { None } else { Some(serde_json::to_string(&event.reminder_offsets_seconds).unwrap_or_default()) }; let result = sqlx::query( "UPDATE events SET project_id = ?, title = ?, description = ?, start_time = ?, end_time = ?, location = ?, linked_task_id = ?, recurrence = ?, recurrence_rule = ?, contact_id = ?, block_type = ?, reminder_offsets_seconds = ? WHERE id = ? AND user_id = ?", ) .bind(event.project_id.map(|p| p.to_string())) .bind(&event.title) .bind(&event.description) .bind(&start_str) .bind(&end_str) .bind(&event.location) .bind(event.linked_task_id.map(|t| t.to_string())) .bind(event.recurrence.db_value()) .bind(&recurrence_rule_json) .bind(event.contact_id.map(|c| c.to_string())) .bind(event.block_type.as_ref().map(|b| b.db_value())) .bind(&reminder_offsets_json) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool) .await .map_err(CoreError::database)?; if result.rows_affected() > 0 { self.get_by_id(id, user_id).await } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn delete(&self, id: EventId, user_id: UserId) -> Result { let result = sqlx::query("DELETE FROM events WHERE id = ? AND user_id = ?") .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool) .await .map_err(CoreError::database)?; Ok(result.rows_affected() > 0) } #[tracing::instrument(skip_all)] async fn delete_many(&self, ids: &[EventId], user_id: UserId) -> Result { if ids.is_empty() { return Ok(0); } let user_id_str = user_id.to_string(); let placeholders = vec!["?"; ids.len()].join(","); let sql = format!("DELETE FROM events WHERE user_id = ? AND id IN ({placeholders})"); let mut query = sqlx::query(&sql).bind(&user_id_str); for id in ids { query = query.bind(id.to_string()); } let result = query.execute(&self.pool).await.map_err(CoreError::database)?; Ok(result.rows_affected()) } #[tracing::instrument(skip_all)] async fn get_upcoming(&self, user_id: UserId, days: i64) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) >= datetime('now') AND datetime(e.start_time) <= datetime('now', ? || ' days') ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(format!("+{}", days)) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn get_by_linked_task(&self, user_id: UserId, task_id: TaskId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.linked_task_id = ?", EVENT_SELECT_COLUMNS ); let row = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(task_id.to_string()) .fetch_optional(&self.pool) .await .map_err(CoreError::database)?; row.map(Event::try_from).transpose() } #[tracing::instrument(skip_all)] async fn delete_by_linked_task(&self, user_id: UserId, task_id: TaskId) -> Result { let result = sqlx::query("DELETE FROM events WHERE user_id = ? AND linked_task_id = ?") .bind(user_id.to_string()) .bind(task_id.to_string()) .execute(&self.pool) .await .map_err(CoreError::database)?; Ok(result.rows_affected() > 0) } #[tracing::instrument(skip_all)] async fn list_for_date(&self, user_id: UserId, date: NaiveDate) -> Result> { let date_start = format!("{} 00:00:00", date); let date_end = format!("{} 23:59:59", date); let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) <= datetime(?) AND (e.end_time IS NULL OR datetime(e.end_time) >= datetime(?)) ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(&date_end) .bind(&date_start) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn list_between(&self, user_id: UserId, start: chrono::DateTime, end: chrono::DateTime) -> Result> { let start_str = format_datetime(&start); let end_str = format_datetime(&end); let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND datetime(e.start_time) <= datetime(?) AND (e.end_time IS NULL OR datetime(e.end_time) >= datetime(?)) ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(&end_str) .bind(&start_str) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn list_recurring(&self, user_id: UserId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND (e.recurrence != 'None' OR e.recurrence_rule IS NOT NULL) ORDER BY e.start_time ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } #[tracing::instrument(skip_all)] async fn find_by_external_id(&self, source: &str, ext_id: &str, user_id: UserId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.external_source = ? AND e.external_id = ?", EVENT_SELECT_COLUMNS ); let row = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .bind(source) .bind(ext_id) .fetch_optional(&self.pool) .await .map_err(CoreError::database)?; row.map(Event::try_from).transpose() } #[tracing::instrument(skip_all)] async fn snooze(&self, id: EventId, user_id: UserId, until: DateTime) -> Result> { let until_str = format_datetime(&until); let result = sqlx::query( "UPDATE events SET snoozed_until = ? WHERE id = ? AND user_id = ?" ) .bind(&until_str) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool) .await .map_err(CoreError::database)?; if result.rows_affected() == 0 { return Ok(None); } EventRepository::get_by_id(self, id, user_id).await } #[tracing::instrument(skip_all)] async fn unsnooze(&self, id: EventId, user_id: UserId) -> Result> { let result = sqlx::query( "UPDATE events SET snoozed_until = NULL WHERE id = ? AND user_id = ?" ) .bind(id.to_string()) .bind(user_id.to_string()) .execute(&self.pool) .await .map_err(CoreError::database)?; if result.rows_affected() == 0 { return Ok(None); } EventRepository::get_by_id(self, id, user_id).await } #[tracing::instrument(skip_all)] async fn list_snoozed(&self, user_id: UserId) -> Result> { let query = format!( "SELECT {} FROM events e LEFT JOIN projects p ON e.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = e.contact_id WHERE e.user_id = ? AND e.snoozed_until IS NOT NULL AND datetime(e.snoozed_until) > datetime('now') ORDER BY e.snoozed_until ASC", EVENT_SELECT_COLUMNS ); let rows = sqlx::query_as::<_, EventRow>(&query) .bind(user_id.to_string()) .bind(user_id.to_string()) .fetch_all(&self.pool) .await .map_err(CoreError::database)?; rows.into_iter().map(Event::try_from).collect() } }