| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
|
| 9 |
|
| 10 |
|
| 11 |
use async_trait::async_trait; |
| 12 |
use chrono::{DateTime, NaiveDate, Utc}; |
| 13 |
use sqlx::SqlitePool; |
| 14 |
|
| 15 |
use goingson_core::{ |
| 16 |
AnnotationId, Annotation, ContactId, CoreError, DbValue, MilestoneId, NewTask, ParseableEnum, Priority, |
| 17 |
ProjectId, Recurrence, Result, SortDirection, SubtaskId, Subtask, Task, TaskFilterQuery, |
| 18 |
TaskId, TaskRepository, TaskSortColumn, TaskStatus, TimeSession, |
| 19 |
TimeTrackingSummary, UpdateTask, UserId, |
| 20 |
}; |
| 21 |
|
| 22 |
use crate::utils::{format_datetime, format_datetime_now, format_datetime_opt, parse_datetime, parse_tags, parse_uuid, parse_uuid_opt}; |
| 23 |
|
| 24 |
use super::annotation_repo; |
| 25 |
use super::subtask_repo; |
| 26 |
use super::task_repo_state; |
| 27 |
use super::time_session_repo; |
| 28 |
|
| 29 |
|
| 30 |
fn sort_column_sql(col: &TaskSortColumn) -> &'static str { |
| 31 |
match col { |
| 32 |
TaskSortColumn::Description => "t.description", |
| 33 |
TaskSortColumn::Project => "p.name", |
| 34 |
TaskSortColumn::Priority => "CASE t.priority WHEN 'High' THEN 3 WHEN 'Medium' THEN 2 WHEN 'Low' THEN 1 ELSE 0 END", |
| 35 |
TaskSortColumn::Due => "t.due", |
| 36 |
TaskSortColumn::Urgency => "t.urgency", |
| 37 |
} |
| 38 |
} |
| 39 |
|
| 40 |
|
| 41 |
fn sort_column_nulls_last(col: &TaskSortColumn) -> bool { |
| 42 |
matches!(col, TaskSortColumn::Project | TaskSortColumn::Due) |
| 43 |
} |
| 44 |
|
| 45 |
|
| 46 |
|
| 47 |
|
| 48 |
|
| 49 |
pub(crate) const TASK_SELECT_COLUMNS: &str = r#"t.id, t.project_id, p.name as project_name, |
| 50 |
t.contact_id, ct.display_name as contact_name, |
| 51 |
t.milestone_id, |
| 52 |
t.description, t.status, |
| 53 |
t.priority, t.due, t.tags, t.urgency, t.recurrence, t.recurrence_rule, t.recurrence_parent_id, t.source_email_id, |
| 54 |
t.snoozed_until, t.waiting_for_response, t.waiting_since, t.expected_response_date, |
| 55 |
t.scheduled_start, t.scheduled_duration, |
| 56 |
t.estimated_minutes, t.actual_minutes, |
| 57 |
t.created_at, t.completed_at, t.is_focus, t.focus_set_at"#; |
| 58 |
|
| 59 |
|
| 60 |
#[derive(Debug, Clone, sqlx::FromRow)] |
| 61 |
pub(crate) struct TaskRowWithProject { |
| 62 |
pub id: String, |
| 63 |
pub project_id: Option<String>, |
| 64 |
pub project_name: Option<String>, |
| 65 |
pub contact_id: Option<String>, |
| 66 |
pub contact_name: Option<String>, |
| 67 |
pub milestone_id: Option<String>, |
| 68 |
pub description: String, |
| 69 |
pub status: String, |
| 70 |
pub priority: String, |
| 71 |
pub due: Option<String>, |
| 72 |
pub tags: String, |
| 73 |
pub urgency: f64, |
| 74 |
pub recurrence: String, |
| 75 |
pub recurrence_rule: Option<String>, |
| 76 |
pub recurrence_parent_id: Option<String>, |
| 77 |
pub source_email_id: Option<String>, |
| 78 |
pub snoozed_until: Option<String>, |
| 79 |
pub waiting_for_response: i32, |
| 80 |
pub waiting_since: Option<String>, |
| 81 |
pub expected_response_date: Option<String>, |
| 82 |
pub scheduled_start: Option<String>, |
| 83 |
pub scheduled_duration: Option<i32>, |
| 84 |
pub estimated_minutes: Option<i32>, |
| 85 |
pub actual_minutes: i32, |
| 86 |
pub created_at: String, |
| 87 |
pub completed_at: Option<String>, |
| 88 |
pub is_focus: i32, |
| 89 |
pub focus_set_at: Option<String>, |
| 90 |
} |
| 91 |
|
| 92 |
impl TaskRowWithProject { |
| 93 |
fn into_task(self, annotations: Vec<Annotation>, subtasks: Vec<Subtask>) -> Result<Task> { |
| 94 |
Ok(Task { |
| 95 |
id: parse_uuid(&self.id)?.into(), |
| 96 |
project_id: parse_uuid_opt(self.project_id.as_deref())?.map(Into::into), |
| 97 |
project_name: self.project_name, |
| 98 |
contact_id: parse_uuid_opt(self.contact_id.as_deref())?.map(Into::into), |
| 99 |
contact_name: self.contact_name, |
| 100 |
milestone_id: parse_uuid_opt(self.milestone_id.as_deref())?.map(Into::into), |
| 101 |
description: self.description, |
| 102 |
status: TaskStatus::from_str_or_default(&self.status), |
| 103 |
priority: Priority::from_str_or_default(&self.priority), |
| 104 |
due: self.due.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 105 |
tags: parse_tags(&self.tags), |
| 106 |
urgency: self.urgency, |
| 107 |
recurrence: Recurrence::from_str_or_default(&self.recurrence), |
| 108 |
recurrence_rule: self.recurrence_rule |
| 109 |
.as_deref() |
| 110 |
.and_then(|s| serde_json::from_str(s).ok()), |
| 111 |
recurrence_parent_id: parse_uuid_opt(self.recurrence_parent_id.as_deref())?.map(Into::into), |
| 112 |
source_email_id: parse_uuid_opt(self.source_email_id.as_deref())?.map(Into::into), |
| 113 |
snoozed_until: self.snoozed_until.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 114 |
waiting_for_response: self.waiting_for_response != 0, |
| 115 |
waiting_since: self.waiting_since.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 116 |
expected_response_date: self.expected_response_date.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 117 |
scheduled_start: self.scheduled_start.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 118 |
scheduled_duration: self.scheduled_duration, |
| 119 |
estimated_minutes: self.estimated_minutes, |
| 120 |
actual_minutes: self.actual_minutes, |
| 121 |
active_session: None, |
| 122 |
annotations, |
| 123 |
subtasks, |
| 124 |
created_at: parse_datetime(&self.created_at)?, |
| 125 |
completed_at: self.completed_at.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 126 |
is_focus: self.is_focus != 0, |
| 127 |
focus_set_at: self.focus_set_at.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 128 |
}) |
| 129 |
} |
| 130 |
} |
| 131 |
|
| 132 |
|
| 133 |
|
| 134 |
|
| 135 |
|
| 136 |
|
| 137 |
pub struct SqliteTaskRepository { |
| 138 |
pool: SqlitePool, |
| 139 |
} |
| 140 |
|
| 141 |
impl SqliteTaskRepository { |
| 142 |
#[tracing::instrument(skip_all)] |
| 143 |
pub fn new(pool: SqlitePool) -> Self { |
| 144 |
Self { pool } |
| 145 |
} |
| 146 |
} |
| 147 |
|
| 148 |
|
| 149 |
|
| 150 |
|
| 151 |
|
| 152 |
|
| 153 |
|
| 154 |
|
| 155 |
|
| 156 |
pub(crate) async fn rows_to_tasks(pool: &SqlitePool, rows: Vec<TaskRowWithProject>) -> Result<Vec<Task>> { |
| 157 |
if rows.is_empty() { |
| 158 |
return Ok(vec![]); |
| 159 |
} |
| 160 |
|
| 161 |
let task_ids: Vec<String> = rows.iter().map(|r| r.id.clone()).collect(); |
| 162 |
let annotations_map = annotation_repo::get_annotations_for_tasks(pool, &task_ids).await?; |
| 163 |
let subtasks_map = subtask_repo::get_subtasks_for_tasks(pool, &task_ids).await?; |
| 164 |
let active_sessions = time_session_repo::get_active_sessions_for_tasks(pool, &task_ids).await?; |
| 165 |
|
| 166 |
let mut tasks = Vec::with_capacity(rows.len()); |
| 167 |
for row in rows { |
| 168 |
let id: TaskId = parse_uuid(&row.id)?.into(); |
| 169 |
let annotations = annotations_map.get(&id).cloned().unwrap_or_default(); |
| 170 |
let subtasks = subtasks_map.get(&id).cloned().unwrap_or_default(); |
| 171 |
let mut task = row.into_task(annotations, subtasks)?; |
| 172 |
task.active_session = active_sessions.get(&id).cloned(); |
| 173 |
tasks.push(task); |
| 174 |
} |
| 175 |
|
| 176 |
Ok(tasks) |
| 177 |
} |
| 178 |
|
| 179 |
|
| 180 |
pub(crate) async fn get_task_update_context(pool: &SqlitePool, id: TaskId, user_id: UserId) -> Result<Option<goingson_core::models::TaskUpdateContext>> { |
| 181 |
#[derive(sqlx::FromRow)] |
| 182 |
struct Row { |
| 183 |
created_at: String, |
| 184 |
status: String, |
| 185 |
completed_at: Option<String>, |
| 186 |
scheduled_start: Option<String>, |
| 187 |
scheduled_duration: Option<i32>, |
| 188 |
} |
| 189 |
|
| 190 |
let row = sqlx::query_as::<_, Row>( |
| 191 |
"SELECT created_at, status, completed_at, scheduled_start, scheduled_duration FROM tasks WHERE id = ? AND user_id = ?" |
| 192 |
) |
| 193 |
.bind(id.to_string()) |
| 194 |
.bind(user_id.to_string()) |
| 195 |
.fetch_optional(pool) |
| 196 |
.await |
| 197 |
.map_err(CoreError::database)?; |
| 198 |
|
| 199 |
match row { |
| 200 |
Some(r) => Ok(Some(goingson_core::models::TaskUpdateContext { |
| 201 |
created_at: parse_datetime(&r.created_at)?, |
| 202 |
status: TaskStatus::from_str_or_default(&r.status), |
| 203 |
completed_at: r.completed_at.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 204 |
scheduled_start: r.scheduled_start.as_ref().map(|s| parse_datetime(s)).transpose()?, |
| 205 |
scheduled_duration: r.scheduled_duration, |
| 206 |
})), |
| 207 |
None => Ok(None), |
| 208 |
} |
| 209 |
} |
| 210 |
|
| 211 |
|
| 212 |
pub(crate) async fn get_task_by_id(pool: &SqlitePool, id: TaskId, user_id: UserId) -> Result<Option<Task>> { |
| 213 |
let sql = format!( |
| 214 |
r#" |
| 215 |
SELECT {} |
| 216 |
FROM tasks t |
| 217 |
LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? |
| 218 |
LEFT JOIN contacts ct ON ct.id = t.contact_id |
| 219 |
WHERE t.id = ? AND t.user_id = ? |
| 220 |
"#, |
| 221 |
TASK_SELECT_COLUMNS |
| 222 |
); |
| 223 |
let row = sqlx::query_as::<_, TaskRowWithProject>(&sql) |
| 224 |
.bind(user_id.to_string()) |
| 225 |
.bind(id.to_string()) |
| 226 |
.bind(user_id.to_string()) |
| 227 |
.fetch_optional(pool) |
| 228 |
.await |
| 229 |
.map_err(CoreError::database)?; |
| 230 |
|
| 231 |
match row { |
| 232 |
Some(row) => { |
| 233 |
let annotations = annotation_repo::get_annotations_for_task(pool, id).await?; |
| 234 |
let subtasks = subtask_repo::get_subtasks_for_task(pool, id).await?; |
| 235 |
let active_sessions = time_session_repo::get_active_sessions_for_tasks(pool, std::slice::from_ref(&row.id)).await?; |
| 236 |
let mut task = row.into_task(annotations, subtasks)?; |
| 237 |
task.active_session = active_sessions.get(&task.id).cloned(); |
| 238 |
Ok(Some(task)) |
| 239 |
} |
| 240 |
None => Ok(None), |
| 241 |
} |
| 242 |
} |
| 243 |
|
| 244 |
|
| 245 |
|
| 246 |
|
| 247 |
|
| 248 |
pub(crate) async fn query_tasks(pool: &SqlitePool, sql: &str, binds: &[String]) -> Result<Vec<Task>> { |
| 249 |
let mut query = sqlx::query_as::<_, TaskRowWithProject>(sql); |
| 250 |
for b in binds { |
| 251 |
query = query.bind(b); |
| 252 |
} |
| 253 |
let rows = query.fetch_all(pool).await.map_err(CoreError::database)?; |
| 254 |
rows_to_tasks(pool, rows).await |
| 255 |
} |
| 256 |
|
| 257 |
#[async_trait] |
| 258 |
impl TaskRepository for SqliteTaskRepository { |
| 259 |
#[tracing::instrument(skip_all)] |
| 260 |
async fn list_all(&self, user_id: UserId) -> Result<Vec<Task>> { |
| 261 |
let sql = format!( |
| 262 |
r#" |
| 263 |
SELECT {} |
| 264 |
FROM tasks t |
| 265 |
LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? |
| 266 |
LEFT JOIN contacts ct ON ct.id = t.contact_id |
| 267 |
WHERE t.user_id = ? AND t.status != 'Deleted' |
| 268 |
ORDER BY t.urgency DESC, t.created_at DESC |
| 269 |
"#, |
| 270 |
TASK_SELECT_COLUMNS |
| 271 |
); |
| 272 |
query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string()]).await |
| 273 |
} |
| 274 |
|
| 275 |
#[tracing::instrument(skip_all)] |
| 276 |
async fn list_by_project(&self, user_id: UserId, project_id: ProjectId) -> Result<Vec<Task>> { |
| 277 |
let sql = format!( |
| 278 |
r#" |
| 279 |
SELECT {} |
| 280 |
FROM tasks t |
| 281 |
LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? |
| 282 |
LEFT JOIN contacts ct ON ct.id = t.contact_id |
| 283 |
WHERE t.user_id = ? AND t.project_id = ? AND t.status != 'Deleted' |
| 284 |
ORDER BY t.urgency DESC, t.created_at DESC |
| 285 |
"#, |
| 286 |
TASK_SELECT_COLUMNS |
| 287 |
); |
| 288 |
query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string(), project_id.to_string()]).await |
| 289 |
} |
| 290 |
|
| 291 |
#[tracing::instrument(skip_all)] |
| 292 |
async fn list_by_contact(&self, user_id: UserId, contact_id: ContactId) -> Result<Vec<Task>> { |
| 293 |
let sql = format!( |
| 294 |
r#" |
| 295 |
SELECT {} |
| 296 |
FROM tasks t |
| 297 |
LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? |
| 298 |
LEFT JOIN contacts ct ON ct.id = t.contact_id |
| 299 |
WHERE t.user_id = ? AND t.contact_id = ? AND t.status != 'Deleted' |
| 300 |
ORDER BY t.created_at DESC |
| 301 |
"#, |
| 302 |
TASK_SELECT_COLUMNS |
| 303 |
); |
| 304 |
query_tasks(&self.pool, &sql, &[user_id.to_string(), user_id.to_string(), contact_id.to_string()]).await |
| 305 |
} |
| 306 |
|
| 307 |
#[tracing::instrument(skip_all)] |
| 308 |
async fn list_filtered(&self, user_id: UserId, query: TaskFilterQuery) -> Result<(Vec<Task>, i64)> { |
| 309 |
|
| 310 |
let mut conditions = vec!["t.user_id = ?".to_string(), "t.status != 'Deleted'".to_string()]; |
| 311 |
let mut bind_values: Vec<String> = vec![user_id.to_string()]; |
| 312 |
|
| 313 |
|
| 314 |
if let Some(ref status) = query.status { |
| 315 |
conditions.push("t.status = ?".to_string()); |
| 316 |
bind_values.push(status.db_value().to_string()); |
| 317 |
} |
| 318 |
|
| 319 |
|
| 320 |
if let Some(ref project_id) = query.project_id { |
| 321 |
conditions.push("t.project_id = ?".to_string()); |
| 322 |
bind_values.push(project_id.to_string()); |
| 323 |
} |
| 324 |
|
| 325 |
|
| 326 |
if let Some(ref priority) = query.priority { |
| 327 |
conditions.push("t.priority = ?".to_string()); |
| 328 |
bind_values.push(priority.db_value().to_string()); |
| 329 |
} |
| 330 |
|
| 331 |
|
| 332 |
if let Some(ref milestone_id) = query.milestone_id { |
| 333 |
conditions.push("t.milestone_id = ?".to_string()); |
| 334 |
bind_values.push(milestone_id.to_string()); |
| 335 |
} |
| 336 |
|
| 337 |
|
| 338 |
if !query.show_snoozed { |
| 339 |
conditions.push("(t.snoozed_until IS NULL OR datetime(t.snoozed_until) <= datetime('now'))".to_string()); |
| 340 |
} |
| 341 |
|
| 342 |
|
| 343 |
if query.waiting_only { |
| 344 |
conditions.push("t.waiting_for_response = 1".to_string()); |
| 345 |
} |
| 346 |
|
| 347 |
let where_clause = conditions.join(" AND "); |
| 348 |
|
| 349 |
|
| 350 |
let count_sql = format!("SELECT COUNT(*) FROM tasks t WHERE {}", where_clause); |
| 351 |
let mut count_query = sqlx::query_as::<_, (i64,)>(&count_sql); |
| 352 |
for value in &bind_values { |
| 353 |
count_query = count_query.bind(value); |
| 354 |
} |
| 355 |
let (total,): (i64,) = count_query.fetch_one(&self.pool).await.map_err(CoreError::database)?; |
| 356 |
|
| 357 |
if total == 0 { |
| 358 |
return Ok((vec![], 0)); |
| 359 |
} |
| 360 |
|
| 361 |
|
| 362 |
let mut pagination_binds: Vec<i64> = Vec::new(); |
| 363 |
let pagination = match (query.limit, query.offset) { |
| 364 |
(Some(limit), Some(offset)) => { |
| 365 |
pagination_binds.push(limit); |
| 366 |
pagination_binds.push(offset); |
| 367 |
" LIMIT ? OFFSET ?".to_string() |
| 368 |
} |
| 369 |
(Some(limit), None) => { |
| 370 |
pagination_binds.push(limit); |
| 371 |
" LIMIT ?".to_string() |
| 372 |
} |
| 373 |
_ => String::new(), |
| 374 |
}; |
| 375 |
|
| 376 |
|
| 377 |
let sort_column = query.sort_column.unwrap_or(TaskSortColumn::Urgency); |
| 378 |
let sort_direction = query.sort_direction.unwrap_or_else(|| { |
| 379 |
|
| 380 |
if sort_column == TaskSortColumn::Urgency { |
| 381 |
SortDirection::Desc |
| 382 |
} else { |
| 383 |
SortDirection::Asc |
| 384 |
} |
| 385 |
}); |
| 386 |
|
| 387 |
let order_by = if sort_column_nulls_last(&sort_column) { |
| 388 |
|
| 389 |
format!( |
| 390 |
"{} {} NULLS LAST, t.created_at DESC", |
| 391 |
sort_column_sql(&sort_column), |
| 392 |
sort_direction.sql() |
| 393 |
) |
| 394 |
} else { |
| 395 |
format!( |
| 396 |
"{} {}, t.created_at DESC", |
| 397 |
sort_column_sql(&sort_column), |
| 398 |
sort_direction.sql() |
| 399 |
) |
| 400 |
}; |
| 401 |
|
| 402 |
let sql = format!( |
| 403 |
r#" |
| 404 |
SELECT {} |
| 405 |
FROM tasks t |
| 406 |
LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? |
| 407 |
LEFT JOIN contacts ct ON ct.id = t.contact_id |
| 408 |
WHERE {} |
| 409 |
ORDER BY {}{} |
| 410 |
"#, |
| 411 |
TASK_SELECT_COLUMNS, where_clause, order_by, pagination |
| 412 |
); |
| 413 |
|
| 414 |
|
| 415 |
let mut sqlx_query = sqlx::query_as::<_, TaskRowWithProject>(&sql); |
| 416 |
|
| 417 |
|
| 418 |
sqlx_query = sqlx_query.bind(user_id.to_string()); |
| 419 |
|
| 420 |
|
| 421 |
for value in bind_values { |
| 422 |
sqlx_query = sqlx_query.bind(value); |
| 423 |
} |
| 424 |
|
| 425 |
|
| 426 |
for value in pagination_binds { |
| 427 |
sqlx_query = sqlx_query.bind(value); |
| 428 |
} |
| 429 |
|
| 430 |
let rows = sqlx_query |
| 431 |
.fetch_all(&self.pool) |
| 432 |
.await |
| 433 |
.map_err(CoreError::database)?; |
| 434 |
|
| 435 |
let tasks = rows_to_tasks(&self.pool, rows).await?; |
| 436 |
Ok((tasks, total)) |
| 437 |
} |
| 438 |
|
| 439 |
#[tracing::instrument(skip_all)] |
| 440 |
async fn get_by_id(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> { |
| 441 |
get_task_by_id(&self.pool, id, user_id).await |
| 442 |
} |
| 443 |
|
| 444 |
#[tracing::instrument(skip_all)] |
| 445 |
async fn get_update_context(&self, id: TaskId, user_id: UserId) -> Result<Option<goingson_core::models::TaskUpdateContext>> { |
| 446 |
get_task_update_context(&self.pool, id, user_id).await |
| 447 |
} |
| 448 |
|
| 449 |
#[tracing::instrument(skip_all)] |
| 450 |
async fn create(&self, user_id: UserId, task: NewTask) -> Result<Task> { |
| 451 |
let id = TaskId::new(); |
| 452 |
let now = format_datetime_now(); |
| 453 |
let due_str = format_datetime_opt(task.due); |
| 454 |
let scheduled_start_str = format_datetime_opt(task.scheduled_start); |
| 455 |
let tags_json = serde_json::to_string(&task.tags).unwrap_or_else(|_| "[]".to_string()); |
| 456 |
|
| 457 |
sqlx::query( |
| 458 |
r#" |
| 459 |
INSERT INTO tasks (id, user_id, project_id, contact_id, milestone_id, description, priority, due, tags, recurrence, recurrence_rule, urgency, source_email_id, scheduled_start, scheduled_duration, estimated_minutes, created_at) |
| 460 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
| 461 |
"#, |
| 462 |
) |
| 463 |
.bind(id.to_string()) |
| 464 |
.bind(user_id.to_string()) |
| 465 |
.bind(task.project_id.map(|p| p.to_string())) |
| 466 |
.bind(task.contact_id.map(|c| c.to_string())) |
| 467 |
.bind(task.milestone_id.map(|m| m.to_string())) |
| 468 |
.bind(&task.description) |
| 469 |
.bind(task.priority.db_value()) |
| 470 |
.bind(&due_str) |
| 471 |
.bind(&tags_json) |
| 472 |
.bind(task.recurrence.db_value()) |
| 473 |
.bind(task.recurrence_rule.as_ref().map(|r| serde_json::to_string(r).unwrap_or_default())) |
| 474 |
.bind(task.urgency) |
| 475 |
.bind(task.source_email_id.map(|e| e.to_string())) |
| 476 |
.bind(&scheduled_start_str) |
| 477 |
.bind(task.scheduled_duration) |
| 478 |
.bind(task.estimated_minutes) |
| 479 |
.bind(&now) |
| 480 |
.execute(&self.pool) |
| 481 |
.await |
| 482 |
.map_err(CoreError::database)?; |
| 483 |
|
| 484 |
get_task_by_id(&self.pool, id, user_id) |
| 485 |
.await? |
| 486 |
.ok_or_else(|| CoreError::internal("Failed to retrieve created task")) |
| 487 |
} |
| 488 |
|
| 489 |
#[tracing::instrument(skip_all)] |
| 490 |
async fn update(&self, id: TaskId, user_id: UserId, task: UpdateTask) -> Result<Option<Task>> { |
| 491 |
let due_str = format_datetime_opt(task.due); |
| 492 |
let scheduled_start_str = format_datetime_opt(task.scheduled_start); |
| 493 |
let tags_json = serde_json::to_string(&task.tags).unwrap_or_else(|_| "[]".to_string()); |
| 494 |
|
| 495 |
|
| 496 |
|
| 497 |
let completed_at_str: Option<String> = if task.status == TaskStatus::Completed { |
| 498 |
let ctx = get_task_update_context(&self.pool, id, user_id).await?; |
| 499 |
match ctx { |
| 500 |
Some(ref c) if c.status == TaskStatus::Completed => { |
| 501 |
c.completed_at.as_ref().map(format_datetime) |
| 502 |
} |
| 503 |
_ => Some(format_datetime_now()), |
| 504 |
} |
| 505 |
} else { |
| 506 |
None |
| 507 |
}; |
| 508 |
|
| 509 |
let result = sqlx::query( |
| 510 |
r#" |
| 511 |
UPDATE tasks |
| 512 |
SET project_id = ?, contact_id = ?, milestone_id = ?, description = ?, status = ?, priority = ?, due = ?, tags = ?, recurrence = ?, urgency = ?, scheduled_start = ?, scheduled_duration = ?, estimated_minutes = ?, completed_at = ? |
| 513 |
WHERE id = ? AND user_id = ? |
| 514 |
"#, |
| 515 |
) |
| 516 |
.bind(task.project_id.map(|p| p.to_string())) |
| 517 |
.bind(task.contact_id.map(|c| c.to_string())) |
| 518 |
.bind(task.milestone_id.map(|m| m.to_string())) |
| 519 |
.bind(&task.description) |
| 520 |
.bind(task.status.db_value()) |
| 521 |
.bind(task.priority.db_value()) |
| 522 |
.bind(&due_str) |
| 523 |
.bind(&tags_json) |
| 524 |
.bind(task.recurrence.db_value()) |
| 525 |
.bind(task.urgency) |
| 526 |
.bind(&scheduled_start_str) |
| 527 |
.bind(task.scheduled_duration) |
| 528 |
.bind(task.estimated_minutes) |
| 529 |
.bind(&completed_at_str) |
| 530 |
.bind(id.to_string()) |
| 531 |
.bind(user_id.to_string()) |
| 532 |
.execute(&self.pool) |
| 533 |
.await |
| 534 |
.map_err(CoreError::database)?; |
| 535 |
|
| 536 |
if result.rows_affected() > 0 { |
| 537 |
get_task_by_id(&self.pool, id, user_id).await |
| 538 |
} else { |
| 539 |
Ok(None) |
| 540 |
} |
| 541 |
} |
| 542 |
|
| 543 |
#[tracing::instrument(skip_all)] |
| 544 |
async fn delete(&self, id: TaskId, user_id: UserId) -> Result<bool> { |
| 545 |
let result = sqlx::query("UPDATE tasks SET status = 'Deleted' WHERE id = ? AND user_id = ?") |
| 546 |
.bind(id.to_string()) |
| 547 |
.bind(user_id.to_string()) |
| 548 |
.execute(&self.pool) |
| 549 |
.await |
| 550 |
.map_err(CoreError::database)?; |
| 551 |
|
| 552 |
Ok(result.rows_affected() > 0) |
| 553 |
} |
| 554 |
|
| 555 |
#[tracing::instrument(skip_all)] |
| 556 |
async fn start(&self, id: TaskId, user_id: UserId) -> Result<bool> { |
| 557 |
let result = sqlx::query( |
| 558 |
"UPDATE tasks SET status = 'Started' WHERE id = ? AND user_id = ? AND status = 'Pending'" |
| 559 |
) |
| 560 |
.bind(id.to_string()) |
| 561 |
.bind(user_id.to_string()) |
| 562 |
.execute(&self.pool) |
| 563 |
.await |
| 564 |
.map_err(CoreError::database)?; |
| 565 |
|
| 566 |
Ok(result.rows_affected() > 0) |
| 567 |
} |
| 568 |
|
| 569 |
#[tracing::instrument(skip_all)] |
| 570 |
async fn complete(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> { |
| 571 |
let task = match get_task_by_id(&self.pool, id, user_id).await? { |
| 572 |
Some(t) => t, |
| 573 |
None => return Ok(None), |
| 574 |
}; |
| 575 |
|
| 576 |
if task.status == TaskStatus::Completed { |
| 577 |
return Ok(None); |
| 578 |
} |
| 579 |
|
| 580 |
let now = format_datetime_now(); |
| 581 |
|
| 582 |
let result = sqlx::query("UPDATE tasks SET status = 'Completed', completed_at = ? WHERE id = ? AND user_id = ?") |
| 583 |
.bind(&now) |
| 584 |
.bind(id.to_string()) |
| 585 |
.bind(user_id.to_string()) |
| 586 |
.execute(&self.pool) |
| 587 |
.await |
| 588 |
.map_err(CoreError::database)?; |
| 589 |
|
| 590 |
if result.rows_affected() == 0 { |
| 591 |
return Ok(None); |
| 592 |
} |
| 593 |
|
| 594 |
get_task_by_id(&self.pool, id, user_id).await |
| 595 |
} |
| 596 |
|
| 597 |
#[tracing::instrument(skip_all)] |
| 598 |
async fn complete_recurring(&self, id: TaskId, user_id: UserId, next: Option<NewTask>) -> Result<(Option<Task>, Option<Task>)> { |
| 599 |
let task = match get_task_by_id(&self.pool, id, user_id).await? { |
| 600 |
Some(t) => t, |
| 601 |
None => return Ok((None, None)), |
| 602 |
}; |
| 603 |
|
| 604 |
if task.status == TaskStatus::Completed { |
| 605 |
return Ok((None, None)); |
| 606 |
} |
| 607 |
|
| 608 |
let mut tx = self.pool.begin().await.map_err(CoreError::database)?; |
| 609 |
|
| 610 |
|
| 611 |
let now = format_datetime_now(); |
| 612 |
sqlx::query("UPDATE tasks SET status = 'Completed', completed_at = ? WHERE id = ? AND user_id = ?") |
| 613 |
.bind(&now) |
| 614 |
.bind(id.to_string()) |
| 615 |
.bind(user_id.to_string()) |
| 616 |
.execute(&mut *tx) |
| 617 |
.await |
| 618 |
.map_err(CoreError::database)?; |
| 619 |
|
| 620 |
|
| 621 |
let next_id = if let Some(new_task) = &next { |
| 622 |
let nid = TaskId::new(); |
| 623 |
let due_str = format_datetime_opt(new_task.due); |
| 624 |
let scheduled_start_str = format_datetime_opt(new_task.scheduled_start); |
| 625 |
let tags_json = serde_json::to_string(&new_task.tags).unwrap_or_else(|_| "[]".to_string()); |
| 626 |
|
| 627 |
sqlx::query( |
| 628 |
r#" |
| 629 |
INSERT INTO tasks (id, user_id, project_id, contact_id, milestone_id, description, priority, due, tags, recurrence, recurrence_rule, urgency, source_email_id, scheduled_start, scheduled_duration, estimated_minutes, recurrence_parent_id, created_at) |
| 630 |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
| 631 |
"#, |
| 632 |
) |
| 633 |
.bind(nid.to_string()) |
| 634 |
.bind(user_id.to_string()) |
| 635 |
.bind(new_task.project_id.map(|p| p.to_string())) |
| 636 |
.bind(new_task.contact_id.map(|c| c.to_string())) |
| 637 |
.bind(new_task.milestone_id.map(|m| m.to_string())) |
| 638 |
.bind(&new_task.description) |
| 639 |
.bind(new_task.priority.db_value()) |
| 640 |
.bind(&due_str) |
| 641 |
.bind(&tags_json) |
| 642 |
.bind(new_task.recurrence.db_value()) |
| 643 |
.bind(new_task.recurrence_rule.as_ref().map(|r| serde_json::to_string(r).unwrap_or_default())) |
| 644 |
.bind(new_task.urgency) |
| 645 |
.bind(new_task.source_email_id.map(|e| e.to_string())) |
| 646 |
.bind(&scheduled_start_str) |
| 647 |
.bind(new_task.scheduled_duration) |
| 648 |
.bind(new_task.estimated_minutes) |
| 649 |
.bind(new_task.recurrence_parent_id.map(|p| p.to_string())) |
| 650 |
.bind(&now) |
| 651 |
.execute(&mut *tx) |
| 652 |
.await |
| 653 |
.map_err(CoreError::database)?; |
| 654 |
|
| 655 |
Some(nid) |
| 656 |
} else { |
| 657 |
None |
| 658 |
}; |
| 659 |
|
| 660 |
tx.commit().await.map_err(CoreError::database)?; |
| 661 |
|
| 662 |
|
| 663 |
let completed = get_task_by_id(&self.pool, id, user_id).await?; |
| 664 |
let next_task = match next_id { |
| 665 |
Some(nid) => get_task_by_id(&self.pool, nid, user_id).await?, |
| 666 |
None => None, |
| 667 |
}; |
| 668 |
|
| 669 |
Ok((completed, next_task)) |
| 670 |
} |
| 671 |
|
| 672 |
#[tracing::instrument(skip_all)] |
| 673 |
async fn count_incomplete_by_milestone(&self, milestone_id: MilestoneId, user_id: UserId) -> Result<i64> { |
| 674 |
let (count,): (i64,) = sqlx::query_as( |
| 675 |
"SELECT COUNT(*) FROM tasks WHERE milestone_id = ? AND user_id = ? AND status != 'Deleted' AND status != 'Completed'" |
| 676 |
) |
| 677 |
.bind(milestone_id.to_string()) |
| 678 |
.bind(user_id.to_string()) |
| 679 |
.fetch_one(&self.pool) |
| 680 |
.await |
| 681 |
.map_err(CoreError::database)?; |
| 682 |
|
| 683 |
Ok(count) |
| 684 |
} |
| 685 |
|
| 686 |
|
| 687 |
|
| 688 |
#[tracing::instrument(skip_all)] |
| 689 |
async fn get_annotations_for_task(&self, task_id: TaskId) -> Result<Vec<Annotation>> { |
| 690 |
annotation_repo::get_annotations_for_task(&self.pool, task_id).await |
| 691 |
} |
| 692 |
|
| 693 |
#[tracing::instrument(skip_all)] |
| 694 |
async fn add_annotation(&self, task_id: TaskId, user_id: UserId, note: &str) -> Result<Option<Annotation>> { |
| 695 |
annotation_repo::add_annotation(&self.pool, task_id, user_id, note).await |
| 696 |
} |
| 697 |
|
| 698 |
#[tracing::instrument(skip_all)] |
| 699 |
async fn delete_annotation(&self, annotation_id: AnnotationId, user_id: UserId) -> Result<bool> { |
| 700 |
annotation_repo::delete_annotation(&self.pool, annotation_id, user_id).await |
| 701 |
} |
| 702 |
|
| 703 |
|
| 704 |
|
| 705 |
#[tracing::instrument(skip_all)] |
| 706 |
async fn get_subtasks_for_task(&self, task_id: TaskId) -> Result<Vec<Subtask>> { |
| 707 |
subtask_repo::get_subtasks_for_task(&self.pool, task_id).await |
| 708 |
} |
| 709 |
|
| 710 |
#[tracing::instrument(skip_all)] |
| 711 |
async fn add_subtask(&self, task_id: TaskId, user_id: UserId, text: &str) -> Result<Option<Subtask>> { |
| 712 |
subtask_repo::add_subtask(&self.pool, task_id, user_id, text).await |
| 713 |
} |
| 714 |
|
| 715 |
#[tracing::instrument(skip_all)] |
| 716 |
async fn toggle_subtask(&self, subtask_id: SubtaskId, user_id: UserId) -> Result<Option<Subtask>> { |
| 717 |
subtask_repo::toggle_subtask(&self.pool, subtask_id, user_id).await |
| 718 |
} |
| 719 |
|
| 720 |
#[tracing::instrument(skip_all)] |
| 721 |
async fn update_subtask(&self, subtask_id: SubtaskId, user_id: UserId, text: &str) -> Result<Option<Subtask>> { |
| 722 |
subtask_repo::update_subtask(&self.pool, subtask_id, user_id, text).await |
| 723 |
} |
| 724 |
|
| 725 |
#[tracing::instrument(skip_all)] |
| 726 |
async fn delete_subtask(&self, subtask_id: SubtaskId, user_id: UserId) -> Result<bool> { |
| 727 |
subtask_repo::delete_subtask(&self.pool, subtask_id, user_id).await |
| 728 |
} |
| 729 |
|
| 730 |
#[tracing::instrument(skip_all)] |
| 731 |
async fn add_subtask_link(&self, task_id: TaskId, user_id: UserId, linked_task_id: TaskId) -> Result<Option<Subtask>> { |
| 732 |
|
| 733 |
let linked_task = get_task_by_id(&self.pool, linked_task_id, user_id).await? |
| 734 |
.ok_or_else(|| CoreError::not_found("linked task", linked_task_id.to_string()))?; |
| 735 |
|
| 736 |
subtask_repo::add_subtask_link( |
| 737 |
&self.pool, |
| 738 |
task_id, |
| 739 |
user_id, |
| 740 |
linked_task_id, |
| 741 |
&linked_task.description, |
| 742 |
&linked_task.status, |
| 743 |
).await |
| 744 |
} |
| 745 |
|
| 746 |
|
| 747 |
|
| 748 |
#[tracing::instrument(skip_all)] |
| 749 |
async fn snooze(&self, id: TaskId, user_id: UserId, until: DateTime<Utc>) -> Result<Option<Task>> { |
| 750 |
task_repo_state::snooze(&self.pool, id, user_id, until).await |
| 751 |
} |
| 752 |
|
| 753 |
#[tracing::instrument(skip_all)] |
| 754 |
async fn unsnooze(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> { |
| 755 |
task_repo_state::unsnooze(&self.pool, id, user_id).await |
| 756 |
} |
| 757 |
|
| 758 |
#[tracing::instrument(skip_all)] |
| 759 |
async fn list_snoozed(&self, user_id: UserId) -> Result<Vec<Task>> { |
| 760 |
task_repo_state::list_snoozed(&self.pool, user_id).await |
| 761 |
} |
| 762 |
|
| 763 |
|
| 764 |
|
| 765 |
#[tracing::instrument(skip_all)] |
| 766 |
async fn mark_waiting(&self, id: TaskId, user_id: UserId, expected_response: Option<DateTime<Utc>>) -> Result<Option<Task>> { |
| 767 |
task_repo_state::mark_waiting(&self.pool, id, user_id, expected_response).await |
| 768 |
} |
| 769 |
|
| 770 |
#[tracing::instrument(skip_all)] |
| 771 |
async fn clear_waiting(&self, id: TaskId, user_id: UserId) -> Result<Option<Task>> { |
| 772 |
task_repo_state::clear_waiting(&self.pool, id, user_id).await |
| 773 |
} |
| 774 |
|
| 775 |
#[tracing::instrument(skip_all)] |
| 776 |
async fn list_waiting(&self, user_id: UserId) -> Result<Vec<Task>> { |
| 777 |
task_repo_state::list_waiting(&self.pool, user_id).await |
| 778 |
} |
| 779 |
|
| 780 |
|
| 781 |
|
| 782 |
#[tracing::instrument(skip_all)] |
| 783 |
async fn list_scheduled_for_date(&self, user_id: UserId, date: NaiveDate) -> Result<Vec<Task>> { |
| 784 |
task_repo_state::list_scheduled_for_date(&self.pool, user_id, date).await |
| 785 |
} |
| 786 |
|
| 787 |
#[tracing::instrument(skip_all)] |
| 788 |
async fn list_unscheduled_due_on_date(&self, user_id: UserId, date: NaiveDate) -> Result<Vec<Task>> { |
| 789 |
task_repo_state::list_unscheduled_due_on_date(&self.pool, user_id, date).await |
| 790 |
} |
| 791 |
|
| 792 |
#[tracing::instrument(skip_all)] |
| 793 |
async fn update_schedule(&self, id: TaskId, user_id: UserId, start: Option<DateTime<Utc>>, duration: Option<i32>) -> Result<Option<Task>> { |
| 794 |
task_repo_state::update_schedule(&self.pool, id, user_id, start, duration).await |
| 795 |
} |
| 796 |
|
| 797 |
|
| 798 |
|
| 799 |
#[tracing::instrument(skip_all)] |
| 800 |
async fn set_focus(&self, id: TaskId, user_id: UserId, is_focus: bool) -> Result<Option<Task>> { |
| 801 |
task_repo_state::set_focus(&self.pool, id, user_id, is_focus).await |
| 802 |
} |
| 803 |
|
| 804 |
#[tracing::instrument(skip_all)] |
| 805 |
async fn list_focused(&self, user_id: UserId) -> Result<Vec<Task>> { |
| 806 |
task_repo_state::list_focused(&self.pool, user_id).await |
| 807 |
} |
| 808 |
|
| 809 |
#[tracing::instrument(skip_all)] |
| 810 |
async fn clear_all_focus(&self, user_id: UserId) -> Result<u64> { |
| 811 |
task_repo_state::clear_all_focus(&self.pool, user_id).await |
| 812 |
} |
| 813 |
|
| 814 |
|
| 815 |
|
| 816 |
#[tracing::instrument(skip_all)] |
| 817 |
async fn list_completed_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> { |
| 818 |
task_repo_state::list_completed_between(&self.pool, user_id, start, end).await |
| 819 |
} |
| 820 |
|
| 821 |
#[tracing::instrument(skip_all)] |
| 822 |
async fn list_became_overdue_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> { |
| 823 |
task_repo_state::list_became_overdue_between(&self.pool, user_id, start, end).await |
| 824 |
} |
| 825 |
|
| 826 |
#[tracing::instrument(skip_all)] |
| 827 |
async fn list_due_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> { |
| 828 |
task_repo_state::list_due_between(&self.pool, user_id, start, end).await |
| 829 |
} |
| 830 |
|
| 831 |
#[tracing::instrument(skip_all)] |
| 832 |
async fn list_created_between(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Task>> { |
| 833 |
task_repo_state::list_created_between(&self.pool, user_id, start, end).await |
| 834 |
} |
| 835 |
|
| 836 |
#[tracing::instrument(skip_all)] |
| 837 |
async fn list_available_for_focus(&self, user_id: UserId, limit: i64) -> Result<Vec<Task>> { |
| 838 |
task_repo_state::list_available_for_focus(&self.pool, user_id, limit).await |
| 839 |
} |
| 840 |
|
| 841 |
|
| 842 |
|
| 843 |
#[tracing::instrument(skip_all)] |
| 844 |
async fn start_timer(&self, task_id: TaskId, user_id: UserId) -> Result<TimeSession> { |
| 845 |
time_session_repo::start_timer(&self.pool, task_id, user_id).await |
| 846 |
} |
| 847 |
|
| 848 |
#[tracing::instrument(skip_all)] |
| 849 |
async fn stop_timer(&self, task_id: TaskId, user_id: UserId) -> Result<Option<TimeSession>> { |
| 850 |
time_session_repo::stop_timer(&self.pool, task_id, user_id).await |
| 851 |
} |
| 852 |
|
| 853 |
#[tracing::instrument(skip_all)] |
| 854 |
async fn discard_timer(&self, task_id: TaskId, user_id: UserId) -> Result<bool> { |
| 855 |
time_session_repo::discard_timer(&self.pool, task_id, user_id).await |
| 856 |
} |
| 857 |
|
| 858 |
#[tracing::instrument(skip_all)] |
| 859 |
async fn get_active_timer(&self, user_id: UserId) -> Result<Option<(TimeSession, String)>> { |
| 860 |
time_session_repo::get_active_timer(&self.pool, user_id).await |
| 861 |
} |
| 862 |
|
| 863 |
#[tracing::instrument(skip_all)] |
| 864 |
async fn list_time_sessions(&self, task_id: TaskId, user_id: UserId) -> Result<Vec<TimeSession>> { |
| 865 |
time_session_repo::list_time_sessions(&self.pool, task_id, user_id).await |
| 866 |
} |
| 867 |
|
| 868 |
#[tracing::instrument(skip_all)] |
| 869 |
async fn log_manual_time(&self, task_id: TaskId, user_id: UserId, minutes: i32, date: DateTime<Utc>) -> Result<TimeSession> { |
| 870 |
time_session_repo::log_manual_time(&self.pool, task_id, user_id, minutes, date).await |
| 871 |
} |
| 872 |
|
| 873 |
#[tracing::instrument(skip_all)] |
| 874 |
async fn get_time_summary(&self, user_id: UserId, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<TimeTrackingSummary>> { |
| 875 |
time_session_repo::get_time_summary(&self.pool, user_id, start, end).await |
| 876 |
} |
| 877 |
|
| 878 |
#[tracing::instrument(skip_all)] |
| 879 |
async fn list_recurrence_chain(&self, root_id: TaskId, user_id: UserId) -> Result<Vec<Task>> { |
| 880 |
let sql = format!( |
| 881 |
"SELECT {} FROM tasks t LEFT JOIN projects p ON t.project_id = p.id AND p.user_id = ? LEFT JOIN contacts ct ON ct.id = t.contact_id WHERE (t.recurrence_parent_id = ? OR t.id = ?) AND t.user_id = ? ORDER BY t.created_at DESC", |
| 882 |
TASK_SELECT_COLUMNS |
| 883 |
); |
| 884 |
query_tasks(&self.pool, &sql, &[ |
| 885 |
user_id.to_string(), |
| 886 |
root_id.to_string(), |
| 887 |
root_id.to_string(), |
| 888 |
user_id.to_string(), |
| 889 |
]).await |
| 890 |
} |
| 891 |
} |
| 892 |
|