Skip to main content

max / makenotwork

21.9 KB · 634 lines History Blame Raw
1 //! SQLite data layer for tickets with sync support.
2
3 use chrono::{DateTime, Utc};
4 use color_eyre::eyre::{Result, WrapErr, eyre};
5 use rusqlite::{Connection, Row, params};
6
7 use crate::types::{Channel, NewTicket, Priority, Status, Ticket};
8
9 /// Open (or create) the WAM database and run migrations.
10 pub fn open_db() -> Result<Connection> {
11 let dirs = directories::ProjectDirs::from("work", "makenot", "wam")
12 .ok_or_else(|| eyre!("cannot determine data directory"))?;
13 let data_dir = dirs.data_dir();
14 std::fs::create_dir_all(data_dir)
15 .wrap_err_with(|| format!("create data dir: {}", data_dir.display()))?;
16
17 let db_path = data_dir.join("wam.db");
18 let conn = Connection::open(&db_path)
19 .wrap_err_with(|| format!("open database: {}", db_path.display()))?;
20
21 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
22 migrate(&conn)?;
23 Ok(conn)
24 }
25
/// Build a throwaway in-memory database with the schema applied (tests only).
///
/// Unlike `open_db`, no pragmas are set on this connection.
#[cfg(test)]
pub fn open_memory() -> Result<Connection> {
    let db = Connection::open_in_memory()?;
    migrate(&db).map(|()| db)
}
33
34 fn migrate(conn: &Connection) -> Result<()> {
35 conn.execute_batch(
36 "CREATE TABLE IF NOT EXISTS tickets (
37 id TEXT PRIMARY KEY,
38 title TEXT NOT NULL,
39 body TEXT,
40 priority TEXT NOT NULL DEFAULT 'medium',
41 status TEXT NOT NULL DEFAULT 'open',
42 channel TEXT NOT NULL DEFAULT 'system',
43 node_id TEXT NOT NULL DEFAULT '',
44 source TEXT,
45 source_ref TEXT,
46 created_at TEXT NOT NULL,
47 updated_at TEXT NOT NULL,
48 resolved_at TEXT
49 );
50 CREATE TABLE IF NOT EXISTS meta (
51 key TEXT PRIMARY KEY,
52 value TEXT NOT NULL
53 );
54 CREATE TABLE IF NOT EXISTS sync_cursors (
55 peer_url TEXT PRIMARY KEY,
56 last_synced TEXT NOT NULL
57 );",
58 )?;
59
60 // Add channel/node_id columns if migrating from v0.1 schema
61 let has_channel: bool = conn
62 .prepare("SELECT channel FROM tickets LIMIT 0")
63 .is_ok();
64 if !has_channel {
65 conn.execute_batch(
66 "ALTER TABLE tickets ADD COLUMN channel TEXT NOT NULL DEFAULT 'system';
67 ALTER TABLE tickets ADD COLUMN node_id TEXT NOT NULL DEFAULT '';",
68 )?;
69 }
70
71 Ok(())
72 }
73
74 /// Get or create this node's persistent identity.
75 pub fn get_or_create_node_id(conn: &Connection) -> Result<String> {
76 let existing: Option<String> = conn
77 .query_row("SELECT value FROM meta WHERE key = 'node_id'", [], |row| {
78 row.get(0)
79 })
80 .ok();
81
82 if let Some(id) = existing {
83 return Ok(id);
84 }
85
86 let id = uuid::Uuid::new_v4().to_string();
87 conn.execute(
88 "INSERT INTO meta (key, value) VALUES ('node_id', ?1)",
89 params![id],
90 )?;
91 Ok(id)
92 }
93
94 fn row_to_ticket(row: &Row) -> rusqlite::Result<Ticket> {
95 let priority_str: String = row.get("priority")?;
96 let status_str: String = row.get("status")?;
97 let channel_str: String = row.get("channel")?;
98 let created_str: String = row.get("created_at")?;
99 let updated_str: String = row.get("updated_at")?;
100 let resolved_str: Option<String> = row.get("resolved_at")?;
101
102 Ok(Ticket {
103 id: row.get("id")?,
104 title: row.get("title")?,
105 body: row.get("body")?,
106 priority: priority_str.parse().unwrap_or(Priority::Medium),
107 status: status_str.parse().unwrap_or(Status::Open),
108 channel: channel_str.parse().unwrap_or(Channel::System),
109 node_id: row.get("node_id")?,
110 source: row.get("source")?,
111 source_ref: row.get("source_ref")?,
112 created_at: DateTime::parse_from_rfc3339(&created_str)
113 .map(|dt| dt.with_timezone(&Utc))
114 .unwrap_or_else(|_| Utc::now()),
115 updated_at: DateTime::parse_from_rfc3339(&updated_str)
116 .map(|dt| dt.with_timezone(&Utc))
117 .unwrap_or_else(|_| Utc::now()),
118 resolved_at: resolved_str.and_then(|s| {
119 DateTime::parse_from_rfc3339(&s)
120 .map(|dt| dt.with_timezone(&Utc))
121 .ok()
122 }),
123 })
124 }
125
126 /// Create a new ticket. Returns the created ticket.
127 pub fn create_ticket(conn: &Connection, new: &NewTicket, node_id: &str) -> Result<Ticket> {
128 let id = uuid::Uuid::new_v4().to_string();
129 let now = Utc::now().to_rfc3339();
130
131 conn.execute(
132 "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at)
133 VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, ?8, ?9, ?9)",
134 params![
135 id,
136 new.title,
137 new.body,
138 new.priority.to_string(),
139 new.channel.to_string(),
140 node_id,
141 new.source,
142 new.source_ref,
143 now,
144 ],
145 )?;
146
147 get_ticket(conn, &id)
148 }
149
150 /// Get a ticket by exact ID or unique prefix match.
151 pub fn get_ticket(conn: &Connection, id_prefix: &str) -> Result<Ticket> {
152 let mut stmt = conn.prepare(
153 "SELECT * FROM tickets WHERE id LIKE ?1 || '%'",
154 )?;
155 let tickets: Vec<Ticket> = stmt
156 .query_map(params![id_prefix], row_to_ticket)?
157 .collect::<rusqlite::Result<Vec<_>>>()?;
158
159 match tickets.len() {
160 0 => Err(eyre!("no ticket matching '{id_prefix}'")),
161 1 => Ok(tickets.into_iter().next().unwrap()),
162 n => Err(eyre!("ambiguous prefix '{id_prefix}' matches {n} tickets")),
163 }
164 }
165
166 /// Filter criteria for listing tickets.
167 #[derive(Default)]
168 pub struct ListFilter<'a> {
169 pub status: Option<Status>,
170 pub priority: Option<Priority>,
171 pub channel: Option<Channel>,
172 pub source: Option<&'a str>,
173 pub search: Option<&'a str>,
174 }
175
176 /// List tickets with optional filters, ordered by priority (desc) then date (desc).
177 pub fn list_tickets(conn: &Connection, filter: &ListFilter) -> Result<Vec<Ticket>> {
178 let mut sql = String::from("SELECT * FROM tickets WHERE 1=1");
179 let mut bind_values: Vec<String> = Vec::new();
180
181 if let Some(status) = filter.status {
182 bind_values.push(status.to_string());
183 sql.push_str(&format!(" AND status = ?{}", bind_values.len()));
184 }
185 if let Some(priority) = filter.priority {
186 bind_values.push(priority.to_string());
187 sql.push_str(&format!(" AND priority = ?{}", bind_values.len()));
188 }
189 if let Some(channel) = filter.channel {
190 bind_values.push(channel.to_string());
191 sql.push_str(&format!(" AND channel = ?{}", bind_values.len()));
192 }
193 if let Some(source) = filter.source {
194 bind_values.push(source.to_string());
195 sql.push_str(&format!(" AND source = ?{}", bind_values.len()));
196 }
197 if let Some(search) = filter.search {
198 bind_values.push(format!("%{search}%"));
199 sql.push_str(&format!(" AND title LIKE ?{}", bind_values.len()));
200 }
201
202 sql.push_str(
203 " ORDER BY CASE priority
204 WHEN 'critical' THEN 3
205 WHEN 'high' THEN 2
206 WHEN 'medium' THEN 1
207 WHEN 'low' THEN 0
208 ELSE 1
209 END DESC, created_at DESC",
210 );
211
212 let mut stmt = conn.prepare(&sql)?;
213 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
214 bind_values.iter().map(|v| v as &dyn rusqlite::types::ToSql).collect();
215 let tickets = stmt
216 .query_map(params_refs.as_slice(), row_to_ticket)?
217 .collect::<rusqlite::Result<Vec<_>>>()?;
218
219 Ok(tickets)
220 }
221
222 /// Update a ticket's status. Sets resolved_at when moving to Resolved.
223 pub fn update_status(conn: &Connection, id: &str, status: Status) -> Result<()> {
224 let now = Utc::now().to_rfc3339();
225 let resolved_at = if status == Status::Resolved {
226 Some(now.clone())
227 } else {
228 None
229 };
230
231 let rows = conn.execute(
232 "UPDATE tickets SET status = ?1, updated_at = ?2, resolved_at = ?3 WHERE id = ?4",
233 params![status.to_string(), now, resolved_at, id],
234 )?;
235
236 if rows == 0 {
237 return Err(eyre!("no ticket with id '{id}'"));
238 }
239 Ok(())
240 }
241
242 /// Update a ticket's title and body. Bumps `updated_at`.
243 pub fn update_ticket(
244 conn: &Connection,
245 id: &str,
246 title: &str,
247 body: Option<&str>,
248 ) -> Result<()> {
249 let now = Utc::now().to_rfc3339();
250 let rows = conn.execute(
251 "UPDATE tickets SET title = ?1, body = ?2, updated_at = ?3 WHERE id = ?4",
252 params![title, body, now, id],
253 )?;
254 if rows == 0 {
255 return Err(eyre!("no ticket with id '{id}'"));
256 }
257 Ok(())
258 }
259
260 /// Aggregate stats across all tickets.
261 #[derive(Debug, Default)]
262 pub struct Stats {
263 pub total: usize,
264 pub by_status: Vec<(Status, usize)>,
265 pub open_by_priority: Vec<(Priority, usize)>,
266 pub open_by_source: Vec<(String, usize)>,
267 /// Average seconds between `created_at` and `resolved_at` for tickets that
268 /// were resolved. `None` when nothing has been resolved yet.
269 pub avg_resolution_seconds: Option<i64>,
270 }
271
272 pub fn stats(conn: &Connection) -> Result<Stats> {
273 let tickets = list_tickets(conn, &ListFilter::default())?;
274 let mut s = Stats { total: tickets.len(), ..Default::default() };
275
276 let order_status = [Status::Open, Status::InProgress, Status::Resolved, Status::Closed];
277 for status in order_status {
278 let count = tickets.iter().filter(|t| t.status == status).count();
279 if count > 0 {
280 s.by_status.push((status, count));
281 }
282 }
283
284 let order_pri = [Priority::Critical, Priority::High, Priority::Medium, Priority::Low];
285 for pri in order_pri {
286 let count = tickets
287 .iter()
288 .filter(|t| t.status == Status::Open && t.priority == pri)
289 .count();
290 if count > 0 {
291 s.open_by_priority.push((pri, count));
292 }
293 }
294
295 let mut by_source: std::collections::HashMap<String, usize> =
296 std::collections::HashMap::new();
297 for t in tickets.iter().filter(|t| t.status == Status::Open) {
298 let key = t.source.clone().unwrap_or_else(|| "-".to_string());
299 *by_source.entry(key).or_insert(0) += 1;
300 }
301 let mut by_source: Vec<(String, usize)> = by_source.into_iter().collect();
302 by_source.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
303 s.open_by_source = by_source;
304
305 let resolved: Vec<&Ticket> = tickets
306 .iter()
307 .filter(|t| t.resolved_at.is_some())
308 .collect();
309 if !resolved.is_empty() {
310 let total: i64 = resolved
311 .iter()
312 .map(|t| {
313 let resolved_at = t.resolved_at.unwrap();
314 (resolved_at - t.created_at).num_seconds().max(0)
315 })
316 .sum();
317 s.avg_resolution_seconds = Some(total / resolved.len() as i64);
318 }
319
320 Ok(s)
321 }
322
323 /// Delete tickets older than `older_than` with the given status.
324 /// Returns the number of rows deleted.
325 pub fn prune_tickets(
326 conn: &Connection,
327 older_than: chrono::Duration,
328 status: Status,
329 ) -> Result<usize> {
330 let cutoff = (Utc::now() - older_than).to_rfc3339();
331 let rows = conn.execute(
332 "DELETE FROM tickets WHERE status = ?1 AND updated_at < ?2",
333 params![status.to_string(), cutoff],
334 )?;
335 Ok(rows)
336 }
337
338 // -- Sync operations ----------------------------------------------------------
339
340 /// Get all tickets updated after the given timestamp.
341 pub fn tickets_since(conn: &Connection, since: &str) -> Result<Vec<Ticket>> {
342 let mut stmt = conn.prepare(
343 "SELECT * FROM tickets WHERE updated_at > ?1 ORDER BY updated_at ASC",
344 )?;
345 let tickets = stmt
346 .query_map(params![since], row_to_ticket)?
347 .collect::<rusqlite::Result<Vec<_>>>()?;
348 Ok(tickets)
349 }
350
351 /// Upsert a ticket from a peer. Last-writer-wins based on updated_at.
352 /// Returns true if the ticket was inserted or updated.
353 pub fn upsert_synced_ticket(conn: &Connection, ticket: &Ticket) -> Result<bool> {
354 // Check if we have this ticket and if ours is newer
355 let existing_updated: Option<String> = conn
356 .query_row(
357 "SELECT updated_at FROM tickets WHERE id = ?1",
358 params![ticket.id],
359 |row| row.get(0),
360 )
361 .ok();
362
363 if let Some(ref existing) = existing_updated {
364 let existing_dt = DateTime::parse_from_rfc3339(existing)
365 .map(|dt| dt.with_timezone(&Utc))
366 .unwrap_or_else(|_| Utc::now());
367 if existing_dt >= ticket.updated_at {
368 return Ok(false); // Ours is same or newer
369 }
370 }
371
372 let resolved_at = ticket.resolved_at.map(|dt| dt.to_rfc3339());
373
374 conn.execute(
375 "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at, resolved_at)
376 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
377 ON CONFLICT(id) DO UPDATE SET
378 title = excluded.title,
379 body = excluded.body,
380 priority = excluded.priority,
381 status = excluded.status,
382 channel = excluded.channel,
383 source = excluded.source,
384 source_ref = excluded.source_ref,
385 updated_at = excluded.updated_at,
386 resolved_at = excluded.resolved_at",
387 params![
388 ticket.id,
389 ticket.title,
390 ticket.body,
391 ticket.priority.to_string(),
392 ticket.status.to_string(),
393 ticket.channel.to_string(),
394 ticket.node_id,
395 ticket.source,
396 ticket.source_ref,
397 ticket.created_at.to_rfc3339(),
398 ticket.updated_at.to_rfc3339(),
399 resolved_at,
400 ],
401 )?;
402
403 Ok(true)
404 }
405
406 /// Get the sync cursor for a peer (last synced timestamp).
407 pub fn get_sync_cursor(conn: &Connection, peer_url: &str) -> Result<Option<String>> {
408 let cursor: Option<String> = conn
409 .query_row(
410 "SELECT last_synced FROM sync_cursors WHERE peer_url = ?1",
411 params![peer_url],
412 |row| row.get(0),
413 )
414 .ok();
415 Ok(cursor)
416 }
417
418 /// Update the sync cursor for a peer.
419 pub fn set_sync_cursor(conn: &Connection, peer_url: &str, last_synced: &str) -> Result<()> {
420 conn.execute(
421 "INSERT INTO sync_cursors (peer_url, last_synced) VALUES (?1, ?2)
422 ON CONFLICT(peer_url) DO UPDATE SET last_synced = excluded.last_synced",
423 params![peer_url, last_synced],
424 )?;
425 Ok(())
426 }
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory database together with its node id.
    fn fresh() -> (Connection, String) {
        let conn = open_memory().unwrap();
        let node = get_or_create_node_id(&conn).unwrap();
        (conn, node)
    }

    /// Build a body-less ticket draft with the given attributes.
    fn draft(title: &str, priority: Priority, channel: Channel, source: &str) -> NewTicket {
        NewTicket {
            title: title.to_string(),
            body: None,
            priority,
            channel,
            source: Some(source.to_string()),
            source_ref: None,
        }
    }

    /// Default draft: medium priority, system channel, source "test".
    fn test_new_ticket(title: &str) -> NewTicket {
        draft(title, Priority::Medium, Channel::System, "test")
    }

    /// Overwrite a ticket's `updated_at` directly in the table.
    fn set_updated_at(conn: &Connection, id: &str, stamp: &str) {
        conn.execute(
            "UPDATE tickets SET updated_at = ?1 WHERE id = ?2",
            params![stamp, id],
        )
        .unwrap();
    }

    #[test]
    fn create_and_get() {
        let (conn, node) = fresh();
        let created = create_ticket(&conn, &test_new_ticket("fix the thing"), &node).unwrap();
        assert_eq!(created.title, "fix the thing");
        assert_eq!(created.status, Status::Open);
        assert_eq!(created.channel, Channel::System);
        assert_eq!(created.node_id, node);

        let fetched = get_ticket(&conn, &created.id).unwrap();
        assert_eq!(fetched.id, created.id);
    }

    #[test]
    fn prefix_match() {
        let (conn, node) = fresh();
        let created = create_ticket(&conn, &test_new_ticket("test"), &node).unwrap();
        let fetched = get_ticket(&conn, &created.id[..8]).unwrap();
        assert_eq!(fetched.id, created.id);
    }

    #[test]
    fn list_with_filter() {
        let (conn, node) = fresh();
        let urgent = draft("urgent", Priority::Critical, Channel::Request, "pom");
        create_ticket(&conn, &urgent, &node).unwrap();
        create_ticket(&conn, &test_new_ticket("normal"), &node).unwrap();

        let everything = list_tickets(&conn, &ListFilter::default()).unwrap();
        assert_eq!(everything.len(), 2);
        assert_eq!(everything[0].title, "urgent");

        let only_requests = ListFilter {
            channel: Some(Channel::Request),
            ..Default::default()
        };
        let requests = list_tickets(&conn, &only_requests).unwrap();
        assert_eq!(requests.len(), 1);
    }

    #[test]
    fn update_status_sets_resolved_at() {
        let (conn, node) = fresh();
        let created = create_ticket(&conn, &test_new_ticket("resolve me"), &node).unwrap();
        assert!(created.resolved_at.is_none());

        update_status(&conn, &created.id, Status::Resolved).unwrap();
        let after = get_ticket(&conn, &created.id).unwrap();
        assert_eq!(after.status, Status::Resolved);
        assert!(after.resolved_at.is_some());
    }

    #[test]
    fn sync_upsert_last_writer_wins() {
        let (conn, node) = fresh();
        let local = create_ticket(&conn, &test_new_ticket("original"), &node).unwrap();

        // A peer's copy carrying a newer timestamp replaces ours.
        let mut from_peer = local.clone();
        from_peer.title = "updated by peer".to_string();
        from_peer.updated_at = Utc::now() + chrono::Duration::seconds(10);
        assert!(upsert_synced_ticket(&conn, &from_peer).unwrap());

        let fetched = get_ticket(&conn, &from_peer.id).unwrap();
        assert_eq!(fetched.title, "updated by peer");

        // A copy carrying an older timestamp is ignored.
        let mut stale = from_peer.clone();
        stale.title = "stale update".to_string();
        stale.updated_at = Utc::now() - chrono::Duration::seconds(100);
        assert!(!upsert_synced_ticket(&conn, &stale).unwrap());

        let fetched = get_ticket(&conn, &from_peer.id).unwrap();
        assert_eq!(fetched.title, "updated by peer"); // Not "stale update"
    }

    #[test]
    fn search_filter() {
        let (conn, node) = fresh();
        for title in ["refund issue", "build failure"] {
            create_ticket(&conn, &test_new_ticket(title), &node).unwrap();
        }

        let by_text = ListFilter {
            search: Some("refund"),
            ..Default::default()
        };
        let hits = list_tickets(&conn, &by_text).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].title, "refund issue");
    }

    #[test]
    fn update_ticket_changes_fields() {
        let (conn, node) = fresh();
        let created = create_ticket(&conn, &test_new_ticket("orig"), &node).unwrap();

        update_ticket(&conn, &created.id, "renamed", Some("with body")).unwrap();
        let fetched = get_ticket(&conn, &created.id).unwrap();
        assert_eq!(fetched.title, "renamed");
        assert_eq!(fetched.body.as_deref(), Some("with body"));
    }

    #[test]
    fn update_ticket_missing_errors() {
        let conn = open_memory().unwrap();
        let outcome = update_ticket(&conn, "no-such-id", "x", None);
        assert!(outcome.is_err());
    }

    #[test]
    fn stats_aggregates() {
        let (conn, node) = fresh();
        // "a" and "b" stay open; "c" gets resolved.
        let a = draft("a", Priority::Critical, Channel::System, "pom");
        create_ticket(&conn, &a, &node).unwrap();
        let b = draft("b", Priority::Low, Channel::Task, "manual");
        create_ticket(&conn, &b, &node).unwrap();
        let c = create_ticket(&conn, &test_new_ticket("c"), &node).unwrap();
        update_status(&conn, &c.id, Status::Resolved).unwrap();

        let s = stats(&conn).unwrap();
        assert_eq!(s.total, 3);
        assert!(s.by_status.iter().any(|(st, n)| *st == Status::Open && *n == 2));
        assert!(s.by_status.iter().any(|(st, n)| *st == Status::Resolved && *n == 1));
        // Two open tickets: Critical and Low
        assert!(s.open_by_priority.iter().any(|(p, n)| *p == Priority::Critical && *n == 1));
        assert!(s.open_by_priority.iter().any(|(p, n)| *p == Priority::Low && *n == 1));
        // Source counts only the open tickets
        let pom = s
            .open_by_source
            .iter()
            .find(|(name, _)| name == "pom")
            .map(|(_, n)| *n);
        assert_eq!(pom, Some(1));
        assert!(s.avg_resolution_seconds.is_some());
    }

    #[test]
    fn prune_deletes_matching() {
        let (conn, node) = fresh();
        let long_ago = (Utc::now() - chrono::Duration::days(100)).to_rfc3339();

        // Closed and last touched 100 days ago: the only prune candidate.
        let closed = create_ticket(&conn, &test_new_ticket("old closed"), &node).unwrap();
        update_status(&conn, &closed.id, Status::Closed).unwrap();
        set_updated_at(&conn, &closed.id, &long_ago);

        // Closed just now: too recent to prune.
        let recent = create_ticket(&conn, &test_new_ticket("recent closed"), &node).unwrap();
        update_status(&conn, &recent.id, Status::Closed).unwrap();

        // Old but still open: wrong status to prune.
        let open = create_ticket(&conn, &test_new_ticket("old open"), &node).unwrap();
        set_updated_at(&conn, &open.id, &long_ago);

        let removed = prune_tickets(&conn, chrono::Duration::days(90), Status::Closed).unwrap();
        assert_eq!(removed, 1);
        assert!(get_ticket(&conn, &closed.id).is_err());
        assert!(get_ticket(&conn, &recent.id).is_ok());
        assert!(get_ticket(&conn, &open.id).is_ok());
    }

    #[test]
    fn node_id_persists() {
        let conn = open_memory().unwrap();
        let first = get_or_create_node_id(&conn).unwrap();
        let second = get_or_create_node_id(&conn).unwrap();
        assert_eq!(first, second);
    }
}
634