//! SQLite data layer for tickets with sync support. use chrono::{DateTime, Utc}; use color_eyre::eyre::{Result, WrapErr, eyre}; use rusqlite::{Connection, Row, params}; use crate::types::{Channel, NewTicket, Priority, Status, Ticket}; /// Open (or create) the WAM database and run migrations. pub fn open_db() -> Result { let dirs = directories::ProjectDirs::from("work", "makenot", "wam") .ok_or_else(|| eyre!("cannot determine data directory"))?; let data_dir = dirs.data_dir(); std::fs::create_dir_all(data_dir) .wrap_err_with(|| format!("create data dir: {}", data_dir.display()))?; let db_path = data_dir.join("wam.db"); let conn = Connection::open(&db_path) .wrap_err_with(|| format!("open database: {}", db_path.display()))?; conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?; migrate(&conn)?; Ok(conn) } /// Open an in-memory database for testing. #[cfg(test)] pub fn open_memory() -> Result { let conn = Connection::open_in_memory()?; migrate(&conn)?; Ok(conn) } fn migrate(conn: &Connection) -> Result<()> { conn.execute_batch( "CREATE TABLE IF NOT EXISTS tickets ( id TEXT PRIMARY KEY, title TEXT NOT NULL, body TEXT, priority TEXT NOT NULL DEFAULT 'medium', status TEXT NOT NULL DEFAULT 'open', channel TEXT NOT NULL DEFAULT 'system', node_id TEXT NOT NULL DEFAULT '', source TEXT, source_ref TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, resolved_at TEXT ); CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS sync_cursors ( peer_url TEXT PRIMARY KEY, last_synced TEXT NOT NULL );", )?; // Add channel/node_id columns if migrating from v0.1 schema let has_channel: bool = conn .prepare("SELECT channel FROM tickets LIMIT 0") .is_ok(); if !has_channel { conn.execute_batch( "ALTER TABLE tickets ADD COLUMN channel TEXT NOT NULL DEFAULT 'system'; ALTER TABLE tickets ADD COLUMN node_id TEXT NOT NULL DEFAULT '';", )?; } Ok(()) } /// Get or create this node's persistent identity. pub fn get_or_create_node_id(conn: &Connection) -> Result { let existing: Option = conn .query_row("SELECT value FROM meta WHERE key = 'node_id'", [], |row| { row.get(0) }) .ok(); if let Some(id) = existing { return Ok(id); } let id = uuid::Uuid::new_v4().to_string(); conn.execute( "INSERT INTO meta (key, value) VALUES ('node_id', ?1)", params![id], )?; Ok(id) } fn row_to_ticket(row: &Row) -> rusqlite::Result { let priority_str: String = row.get("priority")?; let status_str: String = row.get("status")?; let channel_str: String = row.get("channel")?; let created_str: String = row.get("created_at")?; let updated_str: String = row.get("updated_at")?; let resolved_str: Option = row.get("resolved_at")?; Ok(Ticket { id: row.get("id")?, title: row.get("title")?, body: row.get("body")?, priority: priority_str.parse().unwrap_or(Priority::Medium), status: status_str.parse().unwrap_or(Status::Open), channel: channel_str.parse().unwrap_or(Channel::System), node_id: row.get("node_id")?, source: row.get("source")?, source_ref: row.get("source_ref")?, created_at: DateTime::parse_from_rfc3339(&created_str) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()), updated_at: DateTime::parse_from_rfc3339(&updated_str) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()), resolved_at: resolved_str.and_then(|s| { DateTime::parse_from_rfc3339(&s) .map(|dt| dt.with_timezone(&Utc)) .ok() }), }) } /// Create a new ticket. Returns the created ticket. pub fn create_ticket(conn: &Connection, new: &NewTicket, node_id: &str) -> Result { let id = uuid::Uuid::new_v4().to_string(); let now = Utc::now().to_rfc3339(); conn.execute( "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, ?8, ?9, ?9)", params![ id, new.title, new.body, new.priority.to_string(), new.channel.to_string(), node_id, new.source, new.source_ref, now, ], )?; get_ticket(conn, &id) } /// Get a ticket by exact ID or unique prefix match. pub fn get_ticket(conn: &Connection, id_prefix: &str) -> Result { let mut stmt = conn.prepare( "SELECT * FROM tickets WHERE id LIKE ?1 || '%'", )?; let tickets: Vec = stmt .query_map(params![id_prefix], row_to_ticket)? .collect::>>()?; match tickets.len() { 0 => Err(eyre!("no ticket matching '{id_prefix}'")), 1 => Ok(tickets.into_iter().next().unwrap()), n => Err(eyre!("ambiguous prefix '{id_prefix}' matches {n} tickets")), } } /// Filter criteria for listing tickets. #[derive(Default)] pub struct ListFilter<'a> { pub status: Option, pub priority: Option, pub channel: Option, pub source: Option<&'a str>, pub search: Option<&'a str>, } /// List tickets with optional filters, ordered by priority (desc) then date (desc). pub fn list_tickets(conn: &Connection, filter: &ListFilter) -> Result> { let mut sql = String::from("SELECT * FROM tickets WHERE 1=1"); let mut bind_values: Vec = Vec::new(); if let Some(status) = filter.status { bind_values.push(status.to_string()); sql.push_str(&format!(" AND status = ?{}", bind_values.len())); } if let Some(priority) = filter.priority { bind_values.push(priority.to_string()); sql.push_str(&format!(" AND priority = ?{}", bind_values.len())); } if let Some(channel) = filter.channel { bind_values.push(channel.to_string()); sql.push_str(&format!(" AND channel = ?{}", bind_values.len())); } if let Some(source) = filter.source { bind_values.push(source.to_string()); sql.push_str(&format!(" AND source = ?{}", bind_values.len())); } if let Some(search) = filter.search { bind_values.push(format!("%{search}%")); sql.push_str(&format!(" AND title LIKE ?{}", bind_values.len())); } sql.push_str( " ORDER BY CASE priority WHEN 'critical' THEN 3 WHEN 'high' THEN 2 WHEN 'medium' THEN 1 WHEN 'low' THEN 0 ELSE 1 END DESC, created_at DESC", ); let mut stmt = conn.prepare(&sql)?; let params_refs: Vec<&dyn rusqlite::types::ToSql> = bind_values.iter().map(|v| v as &dyn rusqlite::types::ToSql).collect(); let tickets = stmt .query_map(params_refs.as_slice(), row_to_ticket)? .collect::>>()?; Ok(tickets) } /// Update a ticket's status. Sets resolved_at when moving to Resolved. pub fn update_status(conn: &Connection, id: &str, status: Status) -> Result<()> { let now = Utc::now().to_rfc3339(); let resolved_at = if status == Status::Resolved { Some(now.clone()) } else { None }; let rows = conn.execute( "UPDATE tickets SET status = ?1, updated_at = ?2, resolved_at = ?3 WHERE id = ?4", params![status.to_string(), now, resolved_at, id], )?; if rows == 0 { return Err(eyre!("no ticket with id '{id}'")); } Ok(()) } /// Update a ticket's title and body. Bumps `updated_at`. pub fn update_ticket( conn: &Connection, id: &str, title: &str, body: Option<&str>, ) -> Result<()> { let now = Utc::now().to_rfc3339(); let rows = conn.execute( "UPDATE tickets SET title = ?1, body = ?2, updated_at = ?3 WHERE id = ?4", params![title, body, now, id], )?; if rows == 0 { return Err(eyre!("no ticket with id '{id}'")); } Ok(()) } /// Aggregate stats across all tickets. #[derive(Debug, Default)] pub struct Stats { pub total: usize, pub by_status: Vec<(Status, usize)>, pub open_by_priority: Vec<(Priority, usize)>, pub open_by_source: Vec<(String, usize)>, /// Average seconds between `created_at` and `resolved_at` for tickets that /// were resolved. `None` when nothing has been resolved yet. pub avg_resolution_seconds: Option, } pub fn stats(conn: &Connection) -> Result { let tickets = list_tickets(conn, &ListFilter::default())?; let mut s = Stats { total: tickets.len(), ..Default::default() }; let order_status = [Status::Open, Status::InProgress, Status::Resolved, Status::Closed]; for status in order_status { let count = tickets.iter().filter(|t| t.status == status).count(); if count > 0 { s.by_status.push((status, count)); } } let order_pri = [Priority::Critical, Priority::High, Priority::Medium, Priority::Low]; for pri in order_pri { let count = tickets .iter() .filter(|t| t.status == Status::Open && t.priority == pri) .count(); if count > 0 { s.open_by_priority.push((pri, count)); } } let mut by_source: std::collections::HashMap = std::collections::HashMap::new(); for t in tickets.iter().filter(|t| t.status == Status::Open) { let key = t.source.clone().unwrap_or_else(|| "-".to_string()); *by_source.entry(key).or_insert(0) += 1; } let mut by_source: Vec<(String, usize)> = by_source.into_iter().collect(); by_source.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0))); s.open_by_source = by_source; let resolved: Vec<&Ticket> = tickets .iter() .filter(|t| t.resolved_at.is_some()) .collect(); if !resolved.is_empty() { let total: i64 = resolved .iter() .map(|t| { let resolved_at = t.resolved_at.unwrap(); (resolved_at - t.created_at).num_seconds().max(0) }) .sum(); s.avg_resolution_seconds = Some(total / resolved.len() as i64); } Ok(s) } /// Delete tickets older than `older_than` with the given status. /// Returns the number of rows deleted. pub fn prune_tickets( conn: &Connection, older_than: chrono::Duration, status: Status, ) -> Result { let cutoff = (Utc::now() - older_than).to_rfc3339(); let rows = conn.execute( "DELETE FROM tickets WHERE status = ?1 AND updated_at < ?2", params![status.to_string(), cutoff], )?; Ok(rows) } // -- Sync operations ---------------------------------------------------------- /// Get all tickets updated after the given timestamp. pub fn tickets_since(conn: &Connection, since: &str) -> Result> { let mut stmt = conn.prepare( "SELECT * FROM tickets WHERE updated_at > ?1 ORDER BY updated_at ASC", )?; let tickets = stmt .query_map(params![since], row_to_ticket)? .collect::>>()?; Ok(tickets) } /// Upsert a ticket from a peer. Last-writer-wins based on updated_at. /// Returns true if the ticket was inserted or updated. pub fn upsert_synced_ticket(conn: &Connection, ticket: &Ticket) -> Result { // Check if we have this ticket and if ours is newer let existing_updated: Option = conn .query_row( "SELECT updated_at FROM tickets WHERE id = ?1", params![ticket.id], |row| row.get(0), ) .ok(); if let Some(ref existing) = existing_updated { let existing_dt = DateTime::parse_from_rfc3339(existing) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); if existing_dt >= ticket.updated_at { return Ok(false); // Ours is same or newer } } let resolved_at = ticket.resolved_at.map(|dt| dt.to_rfc3339()); conn.execute( "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at, resolved_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) ON CONFLICT(id) DO UPDATE SET title = excluded.title, body = excluded.body, priority = excluded.priority, status = excluded.status, channel = excluded.channel, source = excluded.source, source_ref = excluded.source_ref, updated_at = excluded.updated_at, resolved_at = excluded.resolved_at", params![ ticket.id, ticket.title, ticket.body, ticket.priority.to_string(), ticket.status.to_string(), ticket.channel.to_string(), ticket.node_id, ticket.source, ticket.source_ref, ticket.created_at.to_rfc3339(), ticket.updated_at.to_rfc3339(), resolved_at, ], )?; Ok(true) } /// Get the sync cursor for a peer (last synced timestamp). pub fn get_sync_cursor(conn: &Connection, peer_url: &str) -> Result> { let cursor: Option = conn .query_row( "SELECT last_synced FROM sync_cursors WHERE peer_url = ?1", params![peer_url], |row| row.get(0), ) .ok(); Ok(cursor) } /// Update the sync cursor for a peer. pub fn set_sync_cursor(conn: &Connection, peer_url: &str, last_synced: &str) -> Result<()> { conn.execute( "INSERT INTO sync_cursors (peer_url, last_synced) VALUES (?1, ?2) ON CONFLICT(peer_url) DO UPDATE SET last_synced = excluded.last_synced", params![peer_url, last_synced], )?; Ok(()) } #[cfg(test)] mod tests { use super::*; fn test_new_ticket(title: &str) -> NewTicket { NewTicket { title: title.to_string(), body: None, priority: Priority::Medium, channel: Channel::System, source: Some("test".to_string()), source_ref: None, } } #[test] fn create_and_get() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let t = create_ticket(&conn, &test_new_ticket("fix the thing"), &node).unwrap(); assert_eq!(t.title, "fix the thing"); assert_eq!(t.status, Status::Open); assert_eq!(t.channel, Channel::System); assert_eq!(t.node_id, node); let fetched = get_ticket(&conn, &t.id).unwrap(); assert_eq!(fetched.id, t.id); } #[test] fn prefix_match() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let t = create_ticket(&conn, &test_new_ticket("test"), &node).unwrap(); let fetched = get_ticket(&conn, &t.id[..8]).unwrap(); assert_eq!(fetched.id, t.id); } #[test] fn list_with_filter() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); create_ticket(&conn, &NewTicket { title: "urgent".into(), body: None, priority: Priority::Critical, channel: Channel::Request, source: Some("pom".into()), source_ref: None, }, &node).unwrap(); create_ticket(&conn, &test_new_ticket("normal"), &node).unwrap(); let all = list_tickets(&conn, &ListFilter::default()).unwrap(); assert_eq!(all.len(), 2); assert_eq!(all[0].title, "urgent"); let requests = list_tickets(&conn, &ListFilter { channel: Some(Channel::Request), ..Default::default() }).unwrap(); assert_eq!(requests.len(), 1); } #[test] fn update_status_sets_resolved_at() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let t = create_ticket(&conn, &test_new_ticket("resolve me"), &node).unwrap(); assert!(t.resolved_at.is_none()); update_status(&conn, &t.id, Status::Resolved).unwrap(); let updated = get_ticket(&conn, &t.id).unwrap(); assert_eq!(updated.status, Status::Resolved); assert!(updated.resolved_at.is_some()); } #[test] fn sync_upsert_last_writer_wins() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let t = create_ticket(&conn, &test_new_ticket("original"), &node).unwrap(); // Simulate a peer's version with a newer timestamp let mut peer_ticket = t.clone(); peer_ticket.title = "updated by peer".to_string(); peer_ticket.updated_at = Utc::now() + chrono::Duration::seconds(10); let changed = upsert_synced_ticket(&conn, &peer_ticket).unwrap(); assert!(changed); let fetched = get_ticket(&conn, &peer_ticket.id).unwrap(); assert_eq!(fetched.title, "updated by peer"); // Older update should be rejected let mut stale = peer_ticket.clone(); stale.title = "stale update".to_string(); stale.updated_at = Utc::now() - chrono::Duration::seconds(100); let changed = upsert_synced_ticket(&conn, &stale).unwrap(); assert!(!changed); let fetched = get_ticket(&conn, &peer_ticket.id).unwrap(); assert_eq!(fetched.title, "updated by peer"); // Not "stale update" } #[test] fn search_filter() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); create_ticket(&conn, &test_new_ticket("refund issue"), &node).unwrap(); create_ticket(&conn, &test_new_ticket("build failure"), &node).unwrap(); let results = list_tickets(&conn, &ListFilter { search: Some("refund"), ..Default::default() }).unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0].title, "refund issue"); } #[test] fn update_ticket_changes_fields() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let t = create_ticket(&conn, &test_new_ticket("orig"), &node).unwrap(); update_ticket(&conn, &t.id, "renamed", Some("with body")).unwrap(); let fetched = get_ticket(&conn, &t.id).unwrap(); assert_eq!(fetched.title, "renamed"); assert_eq!(fetched.body.as_deref(), Some("with body")); } #[test] fn update_ticket_missing_errors() { let conn = open_memory().unwrap(); assert!(update_ticket(&conn, "no-such-id", "x", None).is_err()); } #[test] fn stats_aggregates() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let a = create_ticket(&conn, &NewTicket { title: "a".into(), body: None, priority: Priority::Critical, channel: Channel::System, source: Some("pom".into()), source_ref: None, }, &node).unwrap(); create_ticket(&conn, &NewTicket { title: "b".into(), body: None, priority: Priority::Low, channel: Channel::Task, source: Some("manual".into()), source_ref: None, }, &node).unwrap(); let c = create_ticket(&conn, &test_new_ticket("c"), &node).unwrap(); update_status(&conn, &c.id, Status::Resolved).unwrap(); let s = stats(&conn).unwrap(); assert_eq!(s.total, 3); assert!(s.by_status.iter().any(|(st, n)| *st == Status::Open && *n == 2)); assert!(s.by_status.iter().any(|(st, n)| *st == Status::Resolved && *n == 1)); // Two open tickets: Critical and Low assert!(s.open_by_priority.iter().any(|(p, n)| *p == Priority::Critical && *n == 1)); assert!(s.open_by_priority.iter().any(|(p, n)| *p == Priority::Low && *n == 1)); // Source counts only the open tickets let pom = s.open_by_source.iter().find(|(k, _)| k == "pom").map(|(_, n)| *n); assert_eq!(pom, Some(1)); assert!(s.avg_resolution_seconds.is_some()); // First ticket is still open let _ = a; } #[test] fn prune_deletes_matching() { let conn = open_memory().unwrap(); let node = get_or_create_node_id(&conn).unwrap(); let closed = create_ticket(&conn, &test_new_ticket("old closed"), &node).unwrap(); update_status(&conn, &closed.id, Status::Closed).unwrap(); // Backdate updated_at by 100 days let backdate = (Utc::now() - chrono::Duration::days(100)).to_rfc3339(); conn.execute( "UPDATE tickets SET updated_at = ?1 WHERE id = ?2", params![backdate, closed.id], ).unwrap(); // A recent closed ticket should not be pruned let recent = create_ticket(&conn, &test_new_ticket("recent closed"), &node).unwrap(); update_status(&conn, &recent.id, Status::Closed).unwrap(); // An old but still-open ticket should not be pruned let open = create_ticket(&conn, &test_new_ticket("old open"), &node).unwrap(); conn.execute( "UPDATE tickets SET updated_at = ?1 WHERE id = ?2", params![backdate, open.id], ).unwrap(); let n = prune_tickets(&conn, chrono::Duration::days(90), Status::Closed).unwrap(); assert_eq!(n, 1); assert!(get_ticket(&conn, &closed.id).is_err()); assert!(get_ticket(&conn, &recent.id).is_ok()); assert!(get_ticket(&conn, &open.id).is_ok()); } #[test] fn node_id_persists() { let conn = open_memory().unwrap(); let id1 = get_or_create_node_id(&conn).unwrap(); let id2 = get_or_create_node_id(&conn).unwrap(); assert_eq!(id1, id2); } }