Skip to main content

max / goingson

35.5 KB · 892 lines History Blame Raw
//! SQLite implementation of the TaskRepository.
//!
//! Manages tasks with full support for:
//! - Status tracking (Pending, Started, Completed, Deleted)
//! - Priority and urgency calculations
//! - Due dates and recurrence patterns
//! - Annotations and subtasks (delegated to annotation_repo and subtask_repo)
//! - Snoozing and waiting-for-response states (delegated to task_repo_state)
//! - Day planning with scheduled time blocks
10
11 use async_trait::async_trait;
12 use chrono::{DateTime, NaiveDate, Utc};
13 use sqlx::SqlitePool;
14
15 use goingson_core::{
16 AnnotationId, Annotation, ContactId, CoreError, DbValue, MilestoneId, NewTask, ParseableEnum, Priority,
17 ProjectId, Recurrence, Result, SortDirection, SubtaskId, Subtask, Task, TaskFilterQuery,
18 TaskId, TaskRepository, TaskSortColumn, TaskStatus, TimeSession,
19 TimeTrackingSummary, UpdateTask, UserId,
20 };
21
22 use crate::utils::{format_datetime, format_datetime_now, format_datetime_opt, parse_datetime, parse_tags, parse_uuid, parse_uuid_opt};
23
24 use super::annotation_repo;
25 use super::subtask_repo;
26 use super::task_repo_state;
27 use super::time_session_repo;
28
29 /// Returns the SQL column expression for a [`TaskSortColumn`].
30 fn sort_column_sql(col: &TaskSortColumn) -> &'static str {
31 match col {
32 TaskSortColumn::Description => "t.description",
33 TaskSortColumn::Project => "p.name",
34 TaskSortColumn::Priority => "CASE t.priority WHEN 'High' THEN 3 WHEN 'Medium' THEN 2 WHEN 'Low' THEN 1 ELSE 0 END",
35 TaskSortColumn::Due => "t.due",
36 TaskSortColumn::Urgency => "t.urgency",
37 }
38 }
39
40 /// Returns whether NULLs should sort last for the given column.
41 fn sort_column_nulls_last(col: &TaskSortColumn) -> bool {
42 matches!(col, TaskSortColumn::Project | TaskSortColumn::Due)
43 }
44
/// Common SELECT column list for task queries.
///
/// The list references three table aliases, so every query that uses it must
/// provide all of them: `t` (tasks), `p` (projects, for `project_name`) and
/// `ct` (contacts, for `contact_name`). The column names and aliases line up
/// with the fields of [`TaskRowWithProject`].
///
/// Usage:
/// `format!("SELECT {} FROM tasks t LEFT JOIN projects p ON ... LEFT JOIN contacts ct ON ...", TASK_SELECT_COLUMNS)`
pub(crate) const TASK_SELECT_COLUMNS: &str = r#"t.id, t.project_id, p.name as project_name,
t.contact_id, ct.display_name as contact_name,
t.milestone_id,
t.description, t.status,
t.priority, t.due, t.tags, t.urgency, t.recurrence, t.recurrence_rule, t.recurrence_parent_id, t.source_email_id,
t.snoozed_until, t.waiting_for_response, t.waiting_since, t.expected_response_date,
t.scheduled_start, t.scheduled_duration,
t.estimated_minutes, t.actual_minutes,
t.created_at, t.completed_at, t.is_focus, t.focus_set_at"#;
58
/// Raw database row for a task, including the joined project and contact names.
///
/// Fields mirror the columns listed in `TASK_SELECT_COLUMNS`. Values are kept
/// in their stored form (text / integers); `into_task` performs the parsing
/// into domain types.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct TaskRowWithProject {
    // Identifiers are stored as text and parsed as UUIDs in `into_task`.
    pub id: String,
    pub project_id: Option<String>,
    // Selected as `p.name as project_name`.
    pub project_name: Option<String>,
    pub contact_id: Option<String>,
    // Selected as `ct.display_name as contact_name`.
    pub contact_name: Option<String>,
    pub milestone_id: Option<String>,
    pub description: String,
    // Status, priority and recurrence are text; converted with `from_str_or_default`.
    pub status: String,
    pub priority: String,
    // Timestamp columns are text and go through `parse_datetime`.
    pub due: Option<String>,
    // Serialized tag list; decoded with `parse_tags`.
    pub tags: String,
    pub urgency: f64,
    pub recurrence: String,
    // JSON text; deserialized with serde_json in `into_task`.
    pub recurrence_rule: Option<String>,
    pub recurrence_parent_id: Option<String>,
    pub source_email_id: Option<String>,
    pub snoozed_until: Option<String>,
    // Integer flag: any non-zero value is read as `true`.
    pub waiting_for_response: i32,
    pub waiting_since: Option<String>,
    pub expected_response_date: Option<String>,
    pub scheduled_start: Option<String>,
    // NOTE(review): unit is presumably minutes, like the `*_minutes` fields — confirm.
    pub scheduled_duration: Option<i32>,
    pub estimated_minutes: Option<i32>,
    pub actual_minutes: i32,
    pub created_at: String,
    pub completed_at: Option<String>,
    // Integer flag: any non-zero value is read as `true`.
    pub is_focus: i32,
    pub focus_set_at: Option<String>,
}
91
92 impl TaskRowWithProject {
93 fn into_task(self, annotations: Vec<Annotation>, subtasks: Vec<Subtask>) -> Result<Task> {
94 Ok(Task {
95 id: parse_uuid(&self.id)?.into(),
96 project_id: parse_uuid_opt(self.project_id.as_deref())?.map(Into::into),
97 project_name: self.project_name,
98 contact_id: parse_uuid_opt(self.contact_id.as_deref())?.map(Into::into),
99 contact_name: self.contact_name,
100 milestone_id: parse_uuid_opt(self.milestone_id.as_deref())?.map(Into::into),
101 description: self.description,
102 status: TaskStatus::from_str_or_default(&self.status),
103 priority: Priority::from_str_or_default(&self.priority),
104 due: self.due.as_ref().map(|s| parse_datetime(s)).transpose()?,
105 tags: parse_tags(&self.tags),
106 urgency: self.urgency,
107 recurrence: Recurrence::from_str_or_default(&self.recurrence),
108 recurrence_rule: self.recurrence_rule
109 .as_deref()
110 .and_then(|s| serde_json::from_str(s).ok()),
111 recurrence_parent_id: parse_uuid_opt(self.recurrence_parent_id.as_deref())?.map(Into::into),
112 source_email_id: parse_uuid_opt(self.source_email_id.as_deref())?.map(Into::into),
113 snoozed_until: self.snoozed_until.as_ref().map(|s| parse_datetime(s)).transpose()?,
114 waiting_for_response: self.waiting_for_response != 0,
115 waiting_since: self.waiting_since.as_ref().map(|s| parse_datetime(s)).transpose()?,
116 expected_response_date: self.expected_response_date.as_ref().map(|s| parse_datetime(s)).transpose()?,
117 scheduled_start: self.scheduled_start.as_ref().map(|s| parse_datetime(s)).transpose()?,
118 scheduled_duration: self.scheduled_duration,
119 estimated_minutes: self.estimated_minutes,
120 actual_minutes: self.actual_minutes,
121 active_session: None,
122 annotations,
123 subtasks,
124 created_at: parse_datetime(&self.created_at)?,
125 completed_at: self.completed_at.as_ref().map(|s| parse_datetime(s)).transpose()?,
126 is_focus: self.is_focus != 0,
127 focus_set_at: self.focus_set_at.as_ref().map(|s| parse_datetime(s)).transpose()?,
128 })
129 }
130 }
131
/// SQLite-backed implementation of [`TaskRepository`].
///
/// The most complex repository in the system, handling tasks with all their
/// related data (annotations, subtasks) and supporting advanced filtering,
/// sorting, and recurrence logic.
pub struct SqliteTaskRepository {
    /// Connection pool that every query in this repository runs against.
    pool: SqlitePool,
}
140
impl SqliteTaskRepository {
    /// Creates a repository that runs its queries on `pool`.
    #[tracing::instrument(skip_all)]
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
}
147
148 /// Converts task rows to Task objects with annotations, subtasks, and active sessions.
149 ///
150 /// This helper encapsulates the common pattern of:
151 /// 1. Extracting task IDs from rows
152 /// 2. Batch-fetching annotations, subtasks, and active sessions for all tasks
153 /// 3. Converting each row to a Task with its related data
154 ///
155 /// Returns an empty vec if rows is empty (no database calls made).
156 pub(crate) async fn rows_to_tasks(pool: &SqlitePool, rows: Vec<TaskRowWithProject>) -> Result<Vec<Task>> {
157 if rows.is_empty() {
158 return Ok(vec![]);
159 }
160
161 let task_ids: Vec<String> = rows.iter().map(|r| r.id.clone()).collect();
162 let annotations_map = annotation_repo::get_annotations_for_tasks(pool, &task_ids).await?;
163 let subtasks_map = subtask_repo::get_subtasks_for_tasks(pool, &task_ids).await?;
164 let active_sessions = time_session_repo::get_active_sessions_for_tasks(pool, &task_ids).await?;
165
166 let mut tasks = Vec::with_capacity(rows.len());
167 for row in rows {
168 let id: TaskId = parse_uuid(&row.id)?.into();
169 let annotations = annotations_map.get(&id).cloned().unwrap_or_default();
170 let subtasks = subtasks_map.get(&id).cloned().unwrap_or_default();
171 let mut task = row.into_task(annotations, subtasks)?;
172 task.active_session = active_sessions.get(&id).cloned();
173 tasks.push(task);
174 }
175
176 Ok(tasks)
177 }
178
179 /// Fetch only the fields needed for update logic — avoids annotation/subtask/session sub-queries.
180 pub(crate) async fn get_task_update_context(pool: &SqlitePool, id: TaskId, user_id: UserId) -> Result<Option<goingson_core::models::TaskUpdateContext>> {
181 #[derive(sqlx::FromRow)]
182 struct Row {
183 created_at: String,
184 status: String,
185 completed_at: Option<String>,
186 scheduled_start: Option<String>,
187 scheduled_duration: Option<i32>,
188 }
189
190 let row = sqlx::query_as::<_, Row>(
191 "SELECT created_at, status, completed_at, scheduled_start, scheduled_duration FROM tasks WHERE id = ? AND user_id = ?"
192 )
193 .bind(id.to_string())
194 .bind(user_id.to_string())
195 .fetch_optional(pool)
196 .await
197 .map_err(CoreError::database)?;
198
199 match row {
200 Some(r) => Ok(Some(goingson_core::models::TaskUpdateContext {
201 created_at: parse_datetime(&r.created_at)?,
202 status: TaskStatus::from_str_or_default(&r.status),
203 completed_at: r.completed_at.as_ref().map(|s| parse_datetime(s)).transpose()?,
204 scheduled_start: r.scheduled_start.as_ref().map(|s| parse_datetime(s)).transpose()?,
205 scheduled_duration: r.scheduled_duration,
206 })),
207 None => Ok(None),
208 }
209 }
210
211 /// Fetch a single task by ID and user, with annotations and subtasks.
212 pub(crate) async fn get_task_by_id(pool: &SqlitePool, id: TaskId, user_id: UserId) -> Result<Option<Task>> {
213 let sql = format!(
214 r#"
215 SELECT {}
216 FROM tasks t
217 LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ?
218 LEFT JOIN contacts ct ON ct.id = t.contact_id
219 WHERE t.id = ? AND t.user_id = ?
220 "#,
221 TASK_SELECT_COLUMNS
222 );
223 let row = sqlx::query_as::<_, TaskRowWithProject>(&sql)
224 .bind(user_id.to_string())
225 .bind(id.to_string())
226 .bind(user_id.to_string())
227 .fetch_optional(pool)
228 .await
229 .map_err(CoreError::database)?;
230
231 match row {
232 Some(row) => {
233 let annotations = annotation_repo::get_annotations_for_task(pool, id).await?;
234 let subtasks = subtask_repo::get_subtasks_for_task(pool, id).await?;
235 let active_sessions = time_session_repo::get_active_sessions_for_tasks(pool, std::slice::from_ref(&row.id)).await?;
236 let mut task = row.into_task(annotations, subtasks)?;
237 task.active_session = active_sessions.get(&task.id).cloned();
238 Ok(Some(task))
239 }
240 None => Ok(None),
241 }
242 }
243
244 /// Run a task query with string bind parameters and convert rows to tasks.
245 ///
246 /// Handles the common pattern of: format SQL with TASK_SELECT_COLUMNS,
247 /// bind string params in order, fetch rows, convert via rows_to_tasks.
248 pub(crate) async fn query_tasks(pool: &SqlitePool, sql: &str, binds: &[String]) -> Result<Vec<Task>> {
249 let mut query = sqlx::query_as::<_, TaskRowWithProject>(sql);
250 for b in binds {
251 query = query.bind(b);
252 }
253 let rows = query.fetch_all(pool).await.map_err(CoreError::database)?;
254 rows_to_tasks(pool, rows).await
255 }
256
257 #[async_trait]
258 impl TaskRepository for SqliteTaskRepository {
259 #[tracing::instrument(skip_all)]
260 async fn list_all(&self, user_id: UserId) -> Result<Vec<Task>> {
261 let sql = format!(
262 r#"
263 SELECT {}
264 FROM tasks t
265 LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ?
266 LEFT JOIN contacts ct ON ct.id = t.contact_id
267 WHERE t.user_id = ? AND t.status != 'Deleted'
268 ORDER BY t.urgency DESC, t.created_at DESC
269 "#,
270 TASK_SELECT_COLUMNS
271 );
272 query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string()]).await
273 }
274
275 #[tracing::instrument(skip_all)]
276 async fn list_by_project(&self, user_id: UserId, project_id: ProjectId) -> Result<Vec<Task>> {
277 let sql = format!(
278 r#"
279 SELECT {}
280 FROM tasks t
281 LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ?
282 LEFT JOIN contacts ct ON ct.id = t.contact_id
283 WHERE t.user_id = ? AND t.project_id = ? AND t.status != 'Deleted'
284 ORDER BY t.urgency DESC, t.created_at DESC
285 "#,
286 TASK_SELECT_COLUMNS
287 );
288 query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string(), project_id.to_string()]).await
289 }
290
291 #[tracing::instrument(skip_all)]
292 async fn list_by_contact(&self, user_id: UserId, contact_id: ContactId) -> Result<Vec<Task>> {
293 let sql = format!(
294 r#"
295 SELECT {}
296 FROM tasks t
297 LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ?
298 LEFT JOIN contacts ct ON ct.id = t.contact_id
299 WHERE t.user_id = ? AND t.contact_id = ? AND t.status != 'Deleted'
300 ORDER BY t.created_at DESC
301 "#,
302 TASK_SELECT_COLUMNS
303 );
304 query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string(), contact_id.to_string()]).await
305 }
306
307 #[tracing::instrument(skip_all)]
308 async fn list_filtered(&self, user_id: UserId, query: TaskFilterQuery) -> Result<(Vec<Task>, i64)> {
309 // Build dynamic WHERE clause
310 let mut conditions = vec!["t.user_id = ?".to_string(), "t.status != 'Deleted'".to_string()];
311 let mut bind_values: Vec<String> = vec![user_id.to_string()];
312
313 // Status filter
314 if let Some(ref status) = query.status {
315 conditions.push("t.status = ?".to_string());
316 bind_values.push(status.db_value().to_string());
317 }
318
319 // Project filter
320 if let Some(ref project_id) = query.project_id {
321 conditions.push("t.project_id = ?".to_string());
322 bind_values.push(project_id.to_string());
323 }
324
325 // Priority filter
326 if let Some(ref priority) = query.priority {
327 conditions.push("t.priority = ?".to_string());
328 bind_values.push(priority.db_value().to_string());
329 }
330
331 // Milestone filter
332 if let Some(ref milestone_id) = query.milestone_id {
333 conditions.push("t.milestone_id = ?".to_string());
334 bind_values.push(milestone_id.to_string());
335 }
336
337 // Snoozed filter - hide snoozed tasks unless explicitly requested
338 if !query.show_snoozed {
339 conditions.push("(t.snoozed_until IS NULL OR datetime(t.snoozed_until) <= datetime('now'))".to_string());
340 }
341
342 // Waiting only filter
343 if query.waiting_only {
344 conditions.push("t.waiting_for_response = 1".to_string());
345 }
346
347 let where_clause = conditions.join(" AND ");
348
349 // First, get total count for pagination
350 let count_sql = format!("SELECT COUNT(*) FROM tasks t WHERE {}", where_clause);
351 let mut count_query = sqlx::query_as::<_, (i64,)>(&count_sql);
352 for value in &bind_values {
353 count_query = count_query.bind(value);
354 }
355 let (total,): (i64,) = count_query.fetch_one(&self.pool).await.map_err(CoreError::database)?;
356
357 if total == 0 {
358 return Ok((vec![], 0));
359 }
360
361 // Build paginated query with parameterized LIMIT/OFFSET
362 let mut pagination_binds: Vec<i64> = Vec::new();
363 let pagination = match (query.limit, query.offset) {
364 (Some(limit), Some(offset)) => {
365 pagination_binds.push(limit);
366 pagination_binds.push(offset);
367 " LIMIT ? OFFSET ?".to_string()
368 }
369 (Some(limit), None) => {
370 pagination_binds.push(limit);
371 " LIMIT ?".to_string()
372 }
373 _ => String::new(),
374 };
375
376 // Build dynamic ORDER BY clause
377 let sort_column = query.sort_column.unwrap_or(TaskSortColumn::Urgency);
378 let sort_direction = query.sort_direction.unwrap_or_else(|| {
379 // Default to DESC for urgency (highest first), ASC for others
380 if sort_column == TaskSortColumn::Urgency {
381 SortDirection::Desc
382 } else {
383 SortDirection::Asc
384 }
385 });
386
387 let order_by = if sort_column_nulls_last(&sort_column) {
388 // For nullable columns (project, due), put NULLs last regardless of sort direction
389 format!(
390 "{} {} NULLS LAST, t.created_at DESC",
391 sort_column_sql(&sort_column),
392 sort_direction.sql()
393 )
394 } else {
395 format!(
396 "{} {}, t.created_at DESC",
397 sort_column_sql(&sort_column),
398 sort_direction.sql()
399 )
400 };
401
402 let sql = format!(
403 r#"
404 SELECT {}
405 FROM tasks t
406 LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ?
407 LEFT JOIN contacts ct ON ct.id = t.contact_id
408 WHERE {}
409 ORDER BY {}{}
410 "#,
411 TASK_SELECT_COLUMNS, where_clause, order_by, pagination
412 );
413
414 // Build query with dynamic bindings
415 let mut sqlx_query = sqlx::query_as::<_, TaskRowWithProject>(&sql);
416
417 // Bind user_id for the JOIN
418 sqlx_query = sqlx_query.bind(user_id.to_string());
419
420 // Bind all WHERE clause values
421 for value in bind_values {
422 sqlx_query = sqlx_query.bind(value);
423 }
424
425 // Bind LIMIT/OFFSET values
426 for value in pagination_binds {
427 sqlx_query = sqlx_query.bind(value);
428 }
429
430 let rows = sqlx_query
431 .fetch_all(&self.pool)
432 .await
433 .map_err(CoreError::database)?;
434
435 let tasks = rows_to_tasks(&self.pool, rows).await?;
436 Ok((tasks, total))
437 }
438
/// Loads one task owned by `user_id`; delegates to `get_task_by_id` with
/// this repository's pool.
#[tracing::instrument(skip_all)]
async fn get_by_id(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> {
    get_task_by_id(&self.pool, id, user_id).await
}
443
/// Loads the lightweight update context for a task; delegates to
/// `get_task_update_context` with this repository's pool.
#[tracing::instrument(skip_all)]
async fn get_update_context(&self, id: TaskId, user_id: UserId) -> Result<Option<goingson_core::models::TaskUpdateContext>> {
    get_task_update_context(&self.pool, id, user_id).await
}
448
449 #[tracing::instrument(skip_all)]
450 async fn create(&self, user_id: UserId, task: NewTask) -> Result<Task> {
451 let id = TaskId::new();
452 let now = format_datetime_now();
453 let due_str = format_datetime_opt(task.due);
454 let scheduled_start_str = format_datetime_opt(task.scheduled_start);
455 let tags_json = serde_json::to_string(&task.tags).unwrap_or_else(|_| "[]".to_string());
456
457 sqlx::query(
458 r#"
459 INSERT INTO tasks (id, user_id, project_id, contact_id, milestone_id, description, priority, due, tags, recurrence, recurrence_rule, urgency, source_email_id, scheduled_start, scheduled_duration, estimated_minutes, created_at)
460 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
461 "#,
462 )
463 .bind(id.to_string())
464 .bind(user_id.to_string())
465 .bind(task.project_id.map(|p| p.to_string()))
466 .bind(task.contact_id.map(|c| c.to_string()))
467 .bind(task.milestone_id.map(|m| m.to_string()))
468 .bind(&task.description)
469 .bind(task.priority.db_value())
470 .bind(&due_str)
471 .bind(&tags_json)
472 .bind(task.recurrence.db_value())
473 .bind(task.recurrence_rule.as_ref().map(|r| serde_json::to_string(r).unwrap_or_default()))
474 .bind(task.urgency)
475 .bind(task.source_email_id.map(|e| e.to_string()))
476 .bind(&scheduled_start_str)
477 .bind(task.scheduled_duration)
478 .bind(task.estimated_minutes)
479 .bind(&now)
480 .execute(&self.pool)
481 .await
482 .map_err(CoreError::database)?;
483
484 get_task_by_id(&self.pool, id, user_id)
485 .await?
486 .ok_or_else(|| CoreError::internal("Failed to retrieve created task"))
487 }
488
489 #[tracing::instrument(skip_all)]
490 async fn update(&self, id: TaskId, user_id: UserId, task: UpdateTask) -> Result<Option<Task>> {
491 let due_str = format_datetime_opt(task.due);
492 let scheduled_start_str = format_datetime_opt(task.scheduled_start);
493 let tags_json = serde_json::to_string(&task.tags).unwrap_or_else(|_| "[]".to_string());
494
495 // Set completed_at when transitioning to Completed, clear when leaving Completed.
496 // Uses lightweight query (no annotation/subtask/session sub-queries).
497 let completed_at_str: Option<String> = if task.status == TaskStatus::Completed {
498 let ctx = get_task_update_context(&self.pool, id, user_id).await?;
499 match ctx {
500 Some(ref c) if c.status == TaskStatus::Completed => {
501 c.completed_at.as_ref().map(format_datetime)
502 }
503 _ => Some(format_datetime_now()),
504 }
505 } else {
506 None
507 };
508
509 let result = sqlx::query(
510 r#"
511 UPDATE tasks
512 SET project_id = ?, contact_id = ?, milestone_id = ?, description = ?, status = ?, priority = ?, due = ?, tags = ?, recurrence = ?, urgency = ?, scheduled_start = ?, scheduled_duration = ?, estimated_minutes = ?, completed_at = ?
513 WHERE id = ? AND user_id = ?
514 "#,
515 )
516 .bind(task.project_id.map(|p| p.to_string()))
517 .bind(task.contact_id.map(|c| c.to_string()))
518 .bind(task.milestone_id.map(|m| m.to_string()))
519 .bind(&task.description)
520 .bind(task.status.db_value())
521 .bind(task.priority.db_value())
522 .bind(&due_str)
523 .bind(&tags_json)
524 .bind(task.recurrence.db_value())
525 .bind(task.urgency)
526 .bind(&scheduled_start_str)
527 .bind(task.scheduled_duration)
528 .bind(task.estimated_minutes)
529 .bind(&completed_at_str)
530 .bind(id.to_string())
531 .bind(user_id.to_string())
532 .execute(&self.pool)
533 .await
534 .map_err(CoreError::database)?;
535
536 if result.rows_affected() > 0 {
537 get_task_by_id(&self.pool, id, user_id).await
538 } else {
539 Ok(None)
540 }
541 }
542
543 #[tracing::instrument(skip_all)]
544 async fn delete(&self, id: TaskId, user_id: UserId) -> Result<bool> {
545 let result = sqlx::query("UPDATE tasks SET status = 'Deleted' WHERE id = ? AND user_id = ?")
546 .bind(id.to_string())
547 .bind(user_id.to_string())
548 .execute(&self.pool)
549 .await
550 .map_err(CoreError::database)?;
551
552 Ok(result.rows_affected() > 0)
553 }
554
555 #[tracing::instrument(skip_all)]
556 async fn start(&self, id: TaskId, user_id: UserId) -> Result<bool> {
557 let result = sqlx::query(
558 "UPDATE tasks SET status = 'Started' WHERE id = ? AND user_id = ? AND status = 'Pending'"
559 )
560 .bind(id.to_string())
561 .bind(user_id.to_string())
562 .execute(&self.pool)
563 .await
564 .map_err(CoreError::database)?;
565
566 Ok(result.rows_affected() > 0)
567 }
568
569 #[tracing::instrument(skip_all)]
570 async fn complete(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> {
571 let task = match get_task_by_id(&self.pool, id, user_id).await? {
572 Some(t) => t,
573 None => return Ok(None),
574 };
575
576 if task.status == TaskStatus::Completed {
577 return Ok(None);
578 }
579
580 let now = format_datetime_now();
581
582 let result = sqlx::query("UPDATE tasks SET status = 'Completed', completed_at = ? WHERE id = ? AND user_id = ?")
583 .bind(&now)
584 .bind(id.to_string())
585 .bind(user_id.to_string())
586 .execute(&self.pool)
587 .await
588 .map_err(CoreError::database)?;
589
590 if result.rows_affected() == 0 {
591 return Ok(None);
592 }
593
594 get_task_by_id(&self.pool, id, user_id).await
595 }
596
597 #[tracing::instrument(skip_all)]
598 async fn complete_recurring(&self, id: TaskId, user_id: UserId, next: Option<NewTask>) -> Result<(Option<Task>, Option<Task>)> {
599 let task = match get_task_by_id(&self.pool, id, user_id).await? {
600 Some(t) => t,
601 None => return Ok((None, None)),
602 };
603
604 if task.status == TaskStatus::Completed {
605 return Ok((None, None));
606 }
607
608 let mut tx = self.pool.begin().await.map_err(CoreError::database)?;
609
610 // Mark complete
611 let now = format_datetime_now();
612 sqlx::query("UPDATE tasks SET status = 'Completed', completed_at = ? WHERE id = ? AND user_id = ?")
613 .bind(&now)
614 .bind(id.to_string())
615 .bind(user_id.to_string())
616 .execute(&mut *tx)
617 .await
618 .map_err(CoreError::database)?;
619
620 // Create next recurring instance if provided
621 let next_id = if let Some(new_task) = &next {
622 let nid = TaskId::new();
623 let due_str = format_datetime_opt(new_task.due);
624 let scheduled_start_str = format_datetime_opt(new_task.scheduled_start);
625 let tags_json = serde_json::to_string(&new_task.tags).unwrap_or_else(|_| "[]".to_string());
626
627 sqlx::query(
628 r#"
629 INSERT INTO tasks (id, user_id, project_id, contact_id, milestone_id, description, priority, due, tags, recurrence, recurrence_rule, urgency, source_email_id, scheduled_start, scheduled_duration, estimated_minutes, recurrence_parent_id, created_at)
630 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
631 "#,
632 )
633 .bind(nid.to_string())
634 .bind(user_id.to_string())
635 .bind(new_task.project_id.map(|p| p.to_string()))
636 .bind(new_task.contact_id.map(|c| c.to_string()))
637 .bind(new_task.milestone_id.map(|m| m.to_string()))
638 .bind(&new_task.description)
639 .bind(new_task.priority.db_value())
640 .bind(&due_str)
641 .bind(&tags_json)
642 .bind(new_task.recurrence.db_value())
643 .bind(new_task.recurrence_rule.as_ref().map(|r| serde_json::to_string(r).unwrap_or_default()))
644 .bind(new_task.urgency)
645 .bind(new_task.source_email_id.map(|e| e.to_string()))
646 .bind(&scheduled_start_str)
647 .bind(new_task.scheduled_duration)
648 .bind(new_task.estimated_minutes)
649 .bind(new_task.recurrence_parent_id.map(|p| p.to_string()))
650 .bind(&now)
651 .execute(&mut *tx)
652 .await
653 .map_err(CoreError::database)?;
654
655 Some(nid)
656 } else {
657 None
658 };
659
660 tx.commit().await.map_err(CoreError::database)?;
661
662 // Fetch the completed task and new task (outside transaction, committed)
663 let completed = get_task_by_id(&self.pool, id, user_id).await?;
664 let next_task = match next_id {
665 Some(nid) => get_task_by_id(&self.pool, nid, user_id).await?,
666 None => None,
667 };
668
669 Ok((completed, next_task))
670 }
671
672 #[tracing::instrument(skip_all)]
673 async fn count_incomplete_by_milestone(&self, milestone_id: MilestoneId, user_id: UserId) -> Result<i64> {
674 let (count,): (i64,) = sqlx::query_as(
675 "SELECT COUNT(*) FROM tasks WHERE milestone_id = ? AND user_id = ? AND status != 'Deleted' AND status != 'Completed'"
676 )
677 .bind(milestone_id.to_string())
678 .bind(user_id.to_string())
679 .fetch_one(&self.pool)
680 .await
681 .map_err(CoreError::database)?;
682
683 Ok(count)
684 }
685
686 // ---- Annotations (delegated to annotation_repo) ----
687
/// Returns the annotations of a task; delegates to
/// `annotation_repo::get_annotations_for_task`.
///
/// NOTE(review): no `user_id` is passed, so ownership is not checked in
/// this call — confirm that callers verify it.
#[tracing::instrument(skip_all)]
async fn get_annotations_for_task(&self, task_id: TaskId) -> Result<Vec<Annotation>> {
    annotation_repo::get_annotations_for_task(&self.pool, task_id).await
}
692
/// Adds a note to a task; delegates to `annotation_repo::add_annotation`.
#[tracing::instrument(skip_all)]
async fn add_annotation(&self, task_id: TaskId, user_id: UserId, note: &str) -> Result<Option<Annotation>> {
    annotation_repo::add_annotation(&self.pool, task_id, user_id, note).await
}
697
/// Deletes an annotation; delegates to `annotation_repo::delete_annotation`.
#[tracing::instrument(skip_all)]
async fn delete_annotation(&self, annotation_id: AnnotationId, user_id: UserId) -> Result<bool> {
    annotation_repo::delete_annotation(&self.pool, annotation_id, user_id).await
}
702
703 // ---- Subtasks (delegated to subtask_repo) ----
704
/// Returns the subtasks of a task; delegates to
/// `subtask_repo::get_subtasks_for_task`.
///
/// NOTE(review): no `user_id` is passed, so ownership is not checked in
/// this call — confirm that callers verify it.
#[tracing::instrument(skip_all)]
async fn get_subtasks_for_task(&self, task_id: TaskId) -> Result<Vec<Subtask>> {
    subtask_repo::get_subtasks_for_task(&self.pool, task_id).await
}
709
/// Adds a text subtask to a task; delegates to `subtask_repo::add_subtask`.
#[tracing::instrument(skip_all)]
async fn add_subtask(&self, task_id: TaskId, user_id: UserId, text: &str) -> Result<Option<Subtask>> {
    subtask_repo::add_subtask(&self.pool, task_id, user_id, text).await
}
714
/// Toggles a subtask; delegates to `subtask_repo::toggle_subtask`.
#[tracing::instrument(skip_all)]
async fn toggle_subtask(&self, subtask_id: SubtaskId, user_id: UserId) -> Result<Option<Subtask>> {
    subtask_repo::toggle_subtask(&self.pool, subtask_id, user_id).await
}
719
/// Replaces a subtask's text; delegates to `subtask_repo::update_subtask`.
#[tracing::instrument(skip_all)]
async fn update_subtask(&self, subtask_id: SubtaskId, user_id: UserId, text: &str) -> Result<Option<Subtask>> {
    subtask_repo::update_subtask(&self.pool, subtask_id, user_id, text).await
}
724
/// Deletes a subtask; delegates to `subtask_repo::delete_subtask`.
#[tracing::instrument(skip_all)]
async fn delete_subtask(&self, subtask_id: SubtaskId, user_id: UserId) -> Result<bool> {
    subtask_repo::delete_subtask(&self.pool, subtask_id, user_id).await
}
729
730 #[tracing::instrument(skip_all)]
731 async fn add_subtask_link(&self, task_id: TaskId, user_id: UserId, linked_task_id: TaskId) -> Result<Option<Subtask>> {
732 // Verify linked task exists and belongs to user
733 let linked_task = get_task_by_id(&self.pool, linked_task_id, user_id).await?
734 .ok_or_else(|| CoreError::not_found("linked task", linked_task_id.to_string()))?;
735
736 subtask_repo::add_subtask_link(
737 &self.pool,
738 task_id,
739 user_id,
740 linked_task_id,
741 &linked_task.description,
742 &linked_task.status,
743 ).await
744 }
745
746 // ---- Snooze (delegated to task_repo_state) ----
747
/// Snoozes a task until `until`; delegates to `task_repo_state::snooze`.
#[tracing::instrument(skip_all)]
async fn snooze(&self, id: TaskId, user_id: UserId, until: DateTime<Utc>) -> Result<Option<Task>> {
    task_repo_state::snooze(&self.pool, id, user_id, until).await
}
752
/// Removes a task's snooze; delegates to `task_repo_state::unsnooze`.
#[tracing::instrument(skip_all)]
async fn unsnooze(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> {
    task_repo_state::unsnooze(&self.pool, id, user_id).await
}
757
/// Lists the user's snoozed tasks; delegates to `task_repo_state::list_snoozed`.
#[tracing::instrument(skip_all)]
async fn list_snoozed(&self, user_id: UserId) -> Result<Vec<Task>> {
    task_repo_state::list_snoozed(&self.pool, user_id).await
}
762
763 // ---- Waiting (delegated to task_repo_state) ----
764
/// Marks a task as waiting for a response, with an optional expected
/// response date; delegates to `task_repo_state::mark_waiting`.
#[tracing::instrument(skip_all)]
async fn mark_waiting(&self, id: TaskId, user_id: UserId, expected_response: Option<DateTime<Utc>>) -> Result<Option<Task>> {
    task_repo_state::mark_waiting(&self.pool, id, user_id, expected_response).await
}
769
/// Clears a task's waiting state; delegates to `task_repo_state::clear_waiting`.
#[tracing::instrument(skip_all)]
async fn clear_waiting(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> {
    task_repo_state::clear_waiting(&self.pool, id, user_id).await
}
774
/// Lists the user's waiting tasks; delegates to `task_repo_state::list_waiting`.
#[tracing::instrument(skip_all)]
async fn list_waiting(&self, user_id: UserId) -> Result<Vec<Task>> {
    task_repo_state::list_waiting(&self.pool, user_id).await
}
779
780 // ---- Scheduling (delegated to task_repo_state) ----
781
/// Lists the user's tasks scheduled on `date`; delegates to
/// `task_repo_state::list_scheduled_for_date`.
#[tracing::instrument(skip_all)]
async fn list_scheduled_for_date(&self, user_id: UserId, date: NaiveDate) -> Result<Vec<Task>> {
    task_repo_state::list_scheduled_for_date(&self.pool, user_id, date).await
}
786
/// Lists the user's unscheduled tasks due on `date`; delegates to
/// `task_repo_state::list_unscheduled_due_on_date`.
#[tracing::instrument(skip_all)]
async fn list_unscheduled_due_on_date(&self, user_id: UserId, date: NaiveDate) -> Result<Vec<Task>> {
    task_repo_state::list_unscheduled_due_on_date(&self.pool, user_id, date).await
}
791
/// Sets a task's scheduled start and duration; delegates to
/// `task_repo_state::update_schedule`.
///
/// NOTE(review): passing `None` presumably clears the value — confirm in
/// `task_repo_state`.
#[tracing::instrument(skip_all)]
async fn update_schedule(&self, id: TaskId, user_id: UserId, start: Option<DateTime<Utc>>, duration: Option<i32>) -> Result<Option<Task>> {
    task_repo_state::update_schedule(&self.pool, id, user_id, start, duration).await
}
796
797 // ---- Focus (delegated to task_repo_state) ----
798
/// Sets or clears a task's focus flag; delegates to `task_repo_state::set_focus`.
#[tracing::instrument(skip_all)]
async fn set_focus(&self, id: TaskId, user_id: UserId, is_focus: bool) -> Result<Option<Task>> {
    task_repo_state::set_focus(&self.pool, id, user_id, is_focus).await
}
803
/// Lists the user's focused tasks; delegates to `task_repo_state::list_focused`.
#[tracing::instrument(skip_all)]
async fn list_focused(&self, user_id: UserId) -> Result<Vec<Task>> {
    task_repo_state::list_focused(&self.pool, user_id).await
}
808
/// Clears the focus flag on the user's tasks; delegates to
/// `task_repo_state::clear_all_focus`.
///
/// NOTE(review): the returned `u64` is presumably the number of rows
/// changed — confirm in `task_repo_state`.
#[tracing::instrument(skip_all)]
async fn clear_all_focus(&self, user_id: UserId) -> Result<u64> {
    task_repo_state::clear_all_focus(&self.pool, user_id).await
}
813
814 // ---- Reporting (delegated to task_repo_state) ----
815
/// Lists the user's tasks completed between `start` and `end`; delegates to
/// `task_repo_state::list_completed_between`.
#[tracing::instrument(skip_all)]
async fn list_completed_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> {
    task_repo_state::list_completed_between(&self.pool, user_id, start, end).await
}
820
/// Lists the user's tasks that became overdue between `start` and `end`;
/// delegates to `task_repo_state::list_became_overdue_between`.
#[tracing::instrument(skip_all)]
async fn list_became_overdue_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> {
    task_repo_state::list_became_overdue_between(&self.pool, user_id, start, end).await
}
825
/// Lists the user's tasks due between `start` and `end`; delegates to
/// `task_repo_state::list_due_between`.
#[tracing::instrument(skip_all)]
async fn list_due_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> {
    task_repo_state::list_due_between(&self.pool, user_id, start, end).await
}
830
/// Lists the user's tasks created between `start` and `end`; delegates to
/// `task_repo_state::list_created_between`.
#[tracing::instrument(skip_all)]
async fn list_created_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> {
    task_repo_state::list_created_between(&self.pool, user_id, start, end).await
}
835
/// Lists up to `limit` of the user's tasks that can be focused; delegates
/// to `task_repo_state::list_available_for_focus`.
#[tracing::instrument(skip_all)]
async fn list_available_for_focus(&self, user_id: UserId, limit: i64) -> Result<Vec<Task>> {
    task_repo_state::list_available_for_focus(&self.pool, user_id, limit).await
}
840
841 // ---- Time Tracking (delegated to time_session_repo) ----
842
/// Starts a time session for a task; delegates to `time_session_repo::start_timer`.
#[tracing::instrument(skip_all)]
async fn start_timer(&self, task_id: TaskId, user_id: UserId) -> Result<TimeSession> {
    time_session_repo::start_timer(&self.pool, task_id, user_id).await
}
847
/// Stops the time session of a task; delegates to `time_session_repo::stop_timer`.
#[tracing::instrument(skip_all)]
async fn stop_timer(&self, task_id: TaskId, user_id: UserId) -> Result<Option<TimeSession>> {
    time_session_repo::stop_timer(&self.pool, task_id, user_id).await
}
852
/// Discards the time session of a task; delegates to
/// `time_session_repo::discard_timer`.
#[tracing::instrument(skip_all)]
async fn discard_timer(&self, task_id: TaskId, user_id: UserId) -> Result<bool> {
    time_session_repo::discard_timer(&self.pool, task_id, user_id).await
}
857
/// Returns the user's active time session, if any; delegates to
/// `time_session_repo::get_active_timer`.
///
/// NOTE(review): the `String` in the tuple is presumably the task
/// description — confirm in `time_session_repo`.
#[tracing::instrument(skip_all)]
async fn get_active_timer(&self, user_id: UserId) -> Result<Option<(TimeSession, String)>> {
    time_session_repo::get_active_timer(&self.pool, user_id).await
}
862
/// Lists the time sessions of a task; delegates to
/// `time_session_repo::list_time_sessions`.
#[tracing::instrument(skip_all)]
async fn list_time_sessions(&self, task_id: TaskId, user_id: UserId) -> Result<Vec<TimeSession>> {
    time_session_repo::list_time_sessions(&self.pool, task_id, user_id).await
}
867
/// Records `minutes` of manually entered time against a task for `date`;
/// delegates to `time_session_repo::log_manual_time`.
#[tracing::instrument(skip_all)]
async fn log_manual_time(&self, task_id: TaskId, user_id: UserId, minutes: i32, date: DateTime<Utc>) -> Result<TimeSession> {
    time_session_repo::log_manual_time(&self.pool, task_id, user_id, minutes, date).await
}
872
/// Returns time-tracking summaries for the user between `start` and `end`;
/// delegates to `time_session_repo::get_time_summary`.
#[tracing::instrument(skip_all)]
async fn get_time_summary(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<TimeTrackingSummary>> {
    time_session_repo::get_time_summary(&self.pool, user_id, start, end).await
}
877
878 #[tracing::instrument(skip_all)]
879 async fn list_recurrence_chain(&self, root_id: TaskId, user_id: UserId) -> Result<Vec<Task>> {
880 let sql = format!(
881 "SELECT {} FROM tasks t LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = t.contact_id WHERE (t.recurrence_parent_id = ? OR t.id = ?) AND t.user_id = ? ORDER BY t.created_at DESC",
882 TASK_SELECT_COLUMNS
883 );
884 query_tasks(&self.pool, &sql, &[
885 user_id.to_string(),
886 root_id.to_string(),
887 root_id.to_string(),
888 user_id.to_string(),
889 ]).await
890 }
891 }
892