Skip to main content

max / makenotwork

WAM v0.2.0: channels, node identity, peer sync Three ticket channels: - system: machine-to-machine (CI, health, refunds) - request: bidirectional human-system (approvals, investigations) - task: human-to-human async coordination Distributed sync across tailnet: - Each WAM instance has a persistent node_id (UUID in SQLite meta table) - Tickets carry origin node_id and channel - Sync endpoints: GET /sync/pull?since=<rfc3339>, POST /sync/push - Background sync loop pulls from peers every 30s - Last-writer-wins conflict resolution on updated_at - Sync cursors tracked per peer to avoid re-processing - Peers configured via --peer flag: wam serve --peer http://host:7890 Schema migration: adds channel, node_id columns to tickets table, sync_cursors table, meta table. Backwards-compatible with v0.1 DBs. TUI: channel column + filter (t key), node column in list view. CLI: --channel flag on create, channel filter on list. 7 tests (2 new: sync upsert last-writer-wins, node ID persistence). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-04-25 21:24 UTC
Commit: 61ee4ef495229673226b51352a88772ee6c906f1
Parent: 7aac358
8 files changed, +904 insertions, -78 deletions
M wam/Cargo.lock +374 -1
@@ -168,6 +168,12 @@ dependencies = [
168 168 ]
169 169
170 170 [[package]]
171 + name = "base64"
172 + version = "0.22.1"
173 + source = "registry+https://github.com/rust-lang/crates.io-index"
174 + checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
175 +
176 + [[package]]
171 177 name = "bitflags"
172 178 version = "2.11.1"
173 179 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -217,6 +223,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
217 223 checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
218 224
219 225 [[package]]
226 + name = "cfg_aliases"
227 + version = "0.2.1"
228 + source = "registry+https://github.com/rust-lang/crates.io-index"
229 + checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
230 +
231 + [[package]]
220 232 name = "chrono"
221 233 version = "0.4.44"
222 234 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -404,6 +416,17 @@ dependencies = [
404 416 ]
405 417
406 418 [[package]]
419 + name = "displaydoc"
420 + version = "0.2.5"
421 + source = "registry+https://github.com/rust-lang/crates.io-index"
422 + checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
423 + dependencies = [
424 + "proc-macro2",
425 + "quote",
426 + "syn",
427 + ]
428 +
429 + [[package]]
407 430 name = "either"
408 431 version = "1.15.0"
409 432 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -508,8 +531,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
508 531 checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
509 532 dependencies = [
510 533 "cfg-if",
534 + "js-sys",
511 535 "libc",
512 536 "wasi",
537 + "wasm-bindgen",
538 + ]
539 +
540 + [[package]]
541 + name = "getrandom"
542 + version = "0.3.4"
543 + source = "registry+https://github.com/rust-lang/crates.io-index"
544 + checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
545 + dependencies = [
546 + "cfg-if",
547 + "js-sys",
548 + "libc",
549 + "r-efi 5.3.0",
550 + "wasip2",
551 + "wasm-bindgen",
513 552 ]
514 553
515 554 [[package]]
@@ -520,7 +559,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
520 559 dependencies = [
521 560 "cfg-if",
522 561 "libc",
523 - "r-efi",
562 + "r-efi 6.0.0",
524 563 "wasip2",
525 564 "wasip3",
526 565 ]
@@ -626,6 +665,23 @@ dependencies = [
626 665 "pin-project-lite",
627 666 "smallvec",
628 667 "tokio",
668 + "want",
669 + ]
670 +
671 + [[package]]
672 + name = "hyper-rustls"
673 + version = "0.27.9"
674 + source = "registry+https://github.com/rust-lang/crates.io-index"
675 + checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f"
676 + dependencies = [
677 + "http",
678 + "hyper",
679 + "hyper-util",
680 + "rustls",
681 + "tokio",
682 + "tokio-rustls",
683 + "tower-service",
684 + "webpki-roots",
629 685 ]
630 686
631 687 [[package]]
@@ -634,13 +690,21 @@ version = "0.1.20"
634 690 source = "registry+https://github.com/rust-lang/crates.io-index"
635 691 checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
636 692 dependencies = [
693 + "base64",
637 694 "bytes",
695 + "futures-channel",
696 + "futures-util",
638 697 "http",
639 698 "http-body",
640 699 "hyper",
700 + "ipnet",
701 + "libc",
702 + "percent-encoding",
641 703 "pin-project-lite",
704 + "socket2",
642 705 "tokio",
643 706 "tower-service",
707 + "tracing",
644 708 ]
645 709
646 710 [[package]]
@@ -668,6 +732,88 @@ dependencies = [
668 732 ]
669 733
670 734 [[package]]
735 + name = "icu_collections"
736 + version = "2.2.0"
737 + source = "registry+https://github.com/rust-lang/crates.io-index"
738 + checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c"
739 + dependencies = [
740 + "displaydoc",
741 + "potential_utf",
742 + "utf8_iter",
743 + "yoke",
744 + "zerofrom",
745 + "zerovec",
746 + ]
747 +
748 + [[package]]
749 + name = "icu_locale_core"
750 + version = "2.2.0"
751 + source = "registry+https://github.com/rust-lang/crates.io-index"
752 + checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29"
753 + dependencies = [
754 + "displaydoc",
755 + "litemap",
756 + "tinystr",
757 + "writeable",
758 + "zerovec",
759 + ]
760 +
761 + [[package]]
762 + name = "icu_normalizer"
763 + version = "2.2.0"
764 + source = "registry+https://github.com/rust-lang/crates.io-index"
765 + checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4"
766 + dependencies = [
767 + "icu_collections",
768 + "icu_normalizer_data",
769 + "icu_properties",
770 + "icu_provider",
771 + "smallvec",
772 + "zerovec",
773 + ]
774 +
775 + [[package]]
776 + name = "icu_normalizer_data"
777 + version = "2.2.0"
778 + source = "registry+https://github.com/rust-lang/crates.io-index"
779 + checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38"
780 +
781 + [[package]]
782 + name = "icu_properties"
783 + version = "2.2.0"
784 + source = "registry+https://github.com/rust-lang/crates.io-index"
785 + checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de"
786 + dependencies = [
787 + "icu_collections",
788 + "icu_locale_core",
789 + "icu_properties_data",
790 + "icu_provider",
791 + "zerotrie",
792 + "zerovec",
793 + ]
794 +
795 + [[package]]
796 + name = "icu_properties_data"
797 + version = "2.2.0"
798 + source = "registry+https://github.com/rust-lang/crates.io-index"
799 + checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14"
800 +
801 + [[package]]
802 + name = "icu_provider"
803 + version = "2.2.0"
804 + source = "registry+https://github.com/rust-lang/crates.io-index"
805 + checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421"
806 + dependencies = [
807 + "displaydoc",
808 + "icu_locale_core",
809 + "writeable",
810 + "yoke",
811 + "zerofrom",
812 + "zerotrie",
813 + "zerovec",
814 + ]
815 +
816 + [[package]]
671 817 name = "id-arena"
672 818 version = "2.3.0"
673 819 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -680,6 +826,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
680 826 checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
681 827
682 828 [[package]]
829 + name = "idna"
830 + version = "1.1.0"
831 + source = "registry+https://github.com/rust-lang/crates.io-index"
832 + checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
833 + dependencies = [
834 + "idna_adapter",
835 + "smallvec",
836 + "utf8_iter",
837 + ]
838 +
839 + [[package]]
840 + name = "idna_adapter"
841 + version = "1.2.1"
842 + source = "registry+https://github.com/rust-lang/crates.io-index"
843 + checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
844 + dependencies = [
845 + "icu_normalizer",
846 + "icu_properties",
847 + ]
848 +
849 + [[package]]
683 850 name = "indenter"
684 851 version = "0.3.4"
685 852 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -720,6 +887,22 @@ dependencies = [
720 887 ]
721 888
722 889 [[package]]
890 + name = "ipnet"
891 + version = "2.12.0"
892 + source = "registry+https://github.com/rust-lang/crates.io-index"
893 + checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
894 +
895 + [[package]]
896 + name = "iri-string"
897 + version = "0.7.12"
898 + source = "registry+https://github.com/rust-lang/crates.io-index"
899 + checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20"
900 + dependencies = [
901 + "memchr",
902 + "serde",
903 + ]
904 +
905 + [[package]]
723 906 name = "is_terminal_polyfill"
724 907 version = "1.70.2"
725 908 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -746,6 +929,8 @@ version = "0.3.95"
746 929 source = "registry+https://github.com/rust-lang/crates.io-index"
747 930 checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca"
748 931 dependencies = [
932 + "cfg-if",
933 + "futures-util",
749 934 "once_cell",
750 935 "wasm-bindgen",
751 936 ]
@@ -795,6 +980,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
795 980 checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
796 981
797 982 [[package]]
983 + name = "litemap"
984 + version = "0.8.2"
985 + source = "registry+https://github.com/rust-lang/crates.io-index"
986 + checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0"
987 +
988 + [[package]]
798 989 name = "lock_api"
799 990 version = "0.4.14"
800 991 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -819,6 +1010,12 @@ dependencies = [
819 1010 ]
820 1011
821 1012 [[package]]
1013 + name = "lru-slab"
1014 + version = "0.1.2"
1015 + source = "registry+https://github.com/rust-lang/crates.io-index"
1016 + checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
1017 +
1018 + [[package]]
822 1019 name = "matchit"
823 1020 version = "0.8.4"
824 1021 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -947,6 +1144,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
947 1144 checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
948 1145
949 1146 [[package]]
1147 + name = "potential_utf"
1148 + version = "0.1.5"
1149 + source = "registry+https://github.com/rust-lang/crates.io-index"
1150 + checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
1151 + dependencies = [
1152 + "zerovec",
1153 + ]
1154 +
1155 + [[package]]
1156 + name = "ppv-lite86"
1157 + version = "0.2.21"
1158 + source = "registry+https://github.com/rust-lang/crates.io-index"
1159 + checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
1160 + dependencies = [
1161 + "zerocopy",
1162 + ]
1163 +
1164 + [[package]]
950 1165 name = "prettyplease"
951 1166 version = "0.2.37"
952 1167 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -966,6 +1181,61 @@ dependencies = [
966 1181 ]
967 1182
968 1183 [[package]]
1184 + name = "quinn"
1185 + version = "0.11.9"
1186 + source = "registry+https://github.com/rust-lang/crates.io-index"
1187 + checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
1188 + dependencies = [
1189 + "bytes",
1190 + "cfg_aliases",
1191 + "pin-project-lite",
1192 + "quinn-proto",
1193 + "quinn-udp",
1194 + "rustc-hash",
1195 + "rustls",
1196 + "socket2",
1197 + "thiserror",
1198 + "tokio",
1199 + "tracing",
1200 + "web-time",
1201 + ]
1202 +
1203 + [[package]]
1204 + name = "quinn-proto"
1205 + version = "0.11.14"
1206 + source = "registry+https://github.com/rust-lang/crates.io-index"
1207 + checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
1208 + dependencies = [
1209 + "bytes",
1210 + "getrandom 0.3.4",
1211 + "lru-slab",
1212 + "rand",
1213 + "ring",
1214 + "rustc-hash",
1215 + "rustls",
1216 + "rustls-pki-types",
1217 + "slab",
1218 + "thiserror",
1219 + "tinyvec",
1220 + "tracing",
1221 + "web-time",
1222 + ]
1223 +
1224 + [[package]]
1225 + name = "quinn-udp"
1226 + version = "0.5.14"
1227 + source = "registry+https://github.com/rust-lang/crates.io-index"
1228 + checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
1229 + dependencies = [
1230 + "cfg_aliases",
1231 + "libc",
1232 + "once_cell",
1233 + "socket2",
1234 + "tracing",
1235 + "windows-sys 0.59.0",
1236 + ]
1237 +
1238 + [[package]]
969 1239 name = "quote"
970 1240 version = "1.0.45"
971 1241 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -976,11 +1246,46 @@ dependencies = [
976 1246
977 1247 [[package]]
978 1248 name = "r-efi"
1249 + version = "5.3.0"
1250 + source = "registry+https://github.com/rust-lang/crates.io-index"
1251 + checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
1252 +
1253 + [[package]]
1254 + name = "r-efi"
979 1255 version = "6.0.0"
980 1256 source = "registry+https://github.com/rust-lang/crates.io-index"
981 1257 checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
982 1258
983 1259 [[package]]
1260 + name = "rand"
1261 + version = "0.9.4"
1262 + source = "registry+https://github.com/rust-lang/crates.io-index"
1263 + checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea"
1264 + dependencies = [
1265 + "rand_chacha",
1266 + "rand_core",
1267 + ]
1268 +
1269 + [[package]]
1270 + name = "rand_chacha"
1271 + version = "0.9.0"
1272 + source = "registry+https://github.com/rust-lang/crates.io-index"
1273 + checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
1274 + dependencies = [
1275 + "ppv-lite86",
1276 + "rand_core",
1277 + ]
1278 +
1279 + [[package]]
1280 + name = "rand_core"
1281 + version = "0.9.5"
1282 + source = "registry+https://github.com/rust-lang/crates.io-index"
1283 + checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c"
1284 + dependencies = [
1285 + "getrandom 0.3.4",
1286 + ]
1287 +
1288 + [[package]]
984 1289 name = "ratatui"
985 1290 version = "0.29.0"
986 1291 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1022,6 +1327,58 @@ dependencies = [
1022 1327 ]
1023 1328
1024 1329 [[package]]
1330 + name = "reqwest"
1331 + version = "0.12.28"
1332 + source = "registry+https://github.com/rust-lang/crates.io-index"
1333 + checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
1334 + dependencies = [
1335 + "base64",
1336 + "bytes",
1337 + "futures-core",
1338 + "http",
1339 + "http-body",
1340 + "http-body-util",
1341 + "hyper",
1342 + "hyper-rustls",
1343 + "hyper-util",
1344 + "js-sys",
1345 + "log",
1346 + "percent-encoding",
1347 + "pin-project-lite",
1348 + "quinn",
1349 + "rustls",
1350 + "rustls-pki-types",
1351 + "serde",
1352 + "serde_json",
1353 + "serde_urlencoded",
1354 + "sync_wrapper",
1355 + "tokio",
1356 + "tokio-rustls",
1357 + "tower",
1358 + "tower-http",
1359 + "tower-service",
1360 + "url",
1361 + "wasm-bindgen",
1362 + "wasm-bindgen-futures",
1363 + "web-sys",
1364 + "webpki-roots",
1365 + ]
1366 +
1367 + [[package]]
1368 + name = "ring"
1369 + version = "0.17.14"
1370 + source = "registry+https://github.com/rust-lang/crates.io-index"
1371 + checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
1372 + dependencies = [
1373 + "cc",
1374 + "cfg-if",
1375 + "getrandom 0.2.17",
1376 + "libc",
1377 + "untrusted",
1378 + "windows-sys 0.52.0",
1379 + ]
1380 +
1381 + [[package]]
1025 1382 name = "rusqlite"
1026 1383 version = "0.34.0"
1027 1384 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1042,6 +1399,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
1042 1399 checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d"
1043 1400
1044 1401 [[package]]
1402 + name = "rustc-hash"
1403 + version = "2.1.2"
1404 + source = "registry+https://github.com/rust-lang/crates.io-index"
1405 + checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
1406 +
1407 + [[package]]
1045 1408 name = "rustix"
1046 1409 version = "0.38.44"
1047 1410 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1055,6 +1418,41 @@ dependencies = [
1055 1418 ]
1056 1419
1057 1420 [[package]]
1421 + name = "rustls"
1422 + version = "0.23.39"
1423 + source = "registry+https://github.com/rust-lang/crates.io-index"
1424 + checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e"
1425 + dependencies = [
1426 + "once_cell",
1427 + "ring",
1428 + "rustls-pki-types",
1429 + "rustls-webpki",
1430 + "subtle",
Lines truncated
M wam/Cargo.toml +3 -1
@@ -19,5 +19,7 @@ ratatui = "0.29"
19 19 rusqlite = { version = "0.34", features = ["bundled"] }
20 20 serde = { version = "1", features = ["derive"] }
21 21 serde_json = "1"
22 - tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] }
22 + reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
23 + tokio = { version = "1", features = ["macros", "rt-multi-thread", "net", "time"] }
24 + urlencoding = "2"
23 25 uuid = { version = "1", features = ["v4"] }
M wam/src/api.rs +203 -30
@@ -1,9 +1,10 @@
1 - //! HTTP API for programmatic ticket management.
1 + //! HTTP API for programmatic ticket management and peer sync.
2 2 //!
3 - //! Designed to run on the tailnet -- tailnet membership is the auth boundary,
4 - //! so no HMAC or token auth is needed.
3 + //! Designed to run on the tailnet -- tailnet membership is the auth boundary.
5 4
6 - use std::sync::{Arc, Mutex};
5 + use std::sync::Arc;
6 +
7 + use tokio::sync::Mutex;
7 8
8 9 use axum::{
9 10 Json, Router,
@@ -16,41 +17,67 @@ use rusqlite::Connection;
16 17 use serde::Deserialize;
17 18
18 19 use crate::db::{self, ListFilter};
19 - use crate::types::{NewTicket, Priority, Status};
20 + use crate::types::{Channel, NewTicket, Priority, Status, Ticket};
21 +
22 + /// Shared state: SQLite connection + node identity.
23 + #[derive(Clone)]
24 + pub struct AppState {
25 + pub db: Arc<Mutex<Connection>>,
26 + pub node_id: String,
27 + }
28 +
29 + /// Start the HTTP server, optionally syncing with peers.
30 + pub async fn serve(
31 + conn: Connection,
32 + port: u16,
33 + peers: Vec<String>,
34 + ) -> color_eyre::eyre::Result<()> {
35 + let node_id = db::get_or_create_node_id(&conn)?;
36 + eprintln!("node: {}", &node_id[..8]);
20 37
21 - /// Shared state: SQLite connection behind a mutex.
22 - pub type Db = Arc<Mutex<Connection>>;
38 + let app_state = AppState {
39 + db: Arc::new(Mutex::new(conn)),
40 + node_id,
41 + };
42 +
43 + // Spawn sync loop if peers are configured
44 + if !peers.is_empty() {
45 + let sync_state = app_state.clone();
46 + let sync_peers = peers.clone();
47 + tokio::spawn(async move {
48 + sync_loop(sync_state, sync_peers).await;
49 + });
50 + }
23 51
24 - /// Build the axum router.
25 - pub fn router(conn: Connection) -> Router {
26 - let db: Db = Arc::new(Mutex::new(conn));
27 - Router::new()
52 + let app = Router::new()
28 53 .route("/tickets", post(create_ticket))
29 54 .route("/tickets", get(list_tickets))
30 55 .route("/tickets/{id}", get(get_ticket))
31 56 .route("/tickets/{id}", patch(update_ticket))
32 - .with_state(db)
33 - }
57 + .route("/sync/pull", get(sync_pull))
58 + .route("/sync/push", post(sync_push))
59 + .route("/sync/node", get(sync_node_info))
60 + .with_state(app_state);
34 61
35 - /// Start the HTTP server on the given port.
36 - pub async fn serve(conn: Connection, port: u16) -> color_eyre::eyre::Result<()> {
37 - let app = router(conn);
38 62 let addr = format!("0.0.0.0:{port}");
39 63 let listener = tokio::net::TcpListener::bind(&addr).await?;
40 64 eprintln!("wam serving on {addr}");
65 + if !peers.is_empty() {
66 + eprintln!("syncing with {} peer(s)", peers.len());
67 + }
41 68 axum::serve(listener, app).await?;
42 69 Ok(())
43 70 }
44 71
45 - // -- Handlers -----------------------------------------------------------------
72 + // -- Ticket handlers ----------------------------------------------------------
46 73
47 74 /// POST /tickets
48 75 async fn create_ticket(
49 - State(db): State<Db>,
76 + State(state): State<AppState>,
50 77 Json(new): Json<NewTicket>,
51 78 ) -> impl IntoResponse {
52 - let conn = db.lock().unwrap();
53 - match db::create_ticket(&conn, &new) {
79 + let conn = state.db.lock().await;
80 + match db::create_ticket(&conn, &new, &state.node_id) {
54 81 Ok(ticket) => (StatusCode::CREATED, Json(serde_json::json!(ticket))).into_response(),
55 82 Err(e) => (
56 83 StatusCode::INTERNAL_SERVER_ERROR,
@@ -60,27 +87,29 @@ async fn create_ticket(
60 87 }
61 88 }
62 89
63 - /// Query params for GET /tickets.
64 90 #[derive(Debug, Deserialize, Default)]
65 91 pub struct ListQuery {
66 92 pub status: Option<String>,
67 93 pub priority: Option<String>,
94 + pub channel: Option<String>,
68 95 pub source: Option<String>,
69 96 pub search: Option<String>,
70 97 }
71 98
72 99 /// GET /tickets
73 100 async fn list_tickets(
74 - State(db): State<Db>,
101 + State(state): State<AppState>,
75 102 Query(q): Query<ListQuery>,
76 103 ) -> impl IntoResponse {
77 104 let status = q.status.as_deref().and_then(|s| s.parse::<Status>().ok());
78 105 let priority = q.priority.as_deref().and_then(|s| s.parse::<Priority>().ok());
106 + let channel = q.channel.as_deref().and_then(|s| s.parse::<Channel>().ok());
79 107
80 - let conn = db.lock().unwrap();
108 + let conn = state.db.lock().await;
81 109 let filter = ListFilter {
82 110 status,
83 111 priority,
112 + channel,
84 113 source: q.source.as_deref(),
85 114 search: q.search.as_deref(),
86 115 };
@@ -96,10 +125,10 @@ async fn list_tickets(
96 125
97 126 /// GET /tickets/:id
98 127 async fn get_ticket(
99 - State(db): State<Db>,
128 + State(state): State<AppState>,
100 129 Path(id): Path<String>,
101 130 ) -> impl IntoResponse {
102 - let conn = db.lock().unwrap();
131 + let conn = state.db.lock().await;
103 132 match db::get_ticket(&conn, &id) {
104 133 Ok(ticket) => Json(serde_json::json!(ticket)).into_response(),
105 134 Err(_) => (
@@ -110,7 +139,6 @@ async fn get_ticket(
110 139 }
111 140 }
112 141
113 - /// PATCH /tickets/:id body.
114 142 #[derive(Debug, Deserialize)]
115 143 pub struct UpdateBody {
116 144 pub status: Option<String>,
@@ -118,13 +146,12 @@ pub struct UpdateBody {
118 146
119 147 /// PATCH /tickets/:id
120 148 async fn update_ticket(
121 - State(db): State<Db>,
149 + State(state): State<AppState>,
122 150 Path(id): Path<String>,
123 151 Json(body): Json<UpdateBody>,
124 152 ) -> impl IntoResponse {
125 - let conn = db.lock().unwrap();
153 + let conn = state.db.lock().await;
126 154
127 - // Resolve prefix to full ID first
128 155 let ticket = match db::get_ticket(&conn, &id) {
129 156 Ok(t) => t,
130 157 Err(_) => {
@@ -156,7 +183,6 @@ async fn update_ticket(
156 183 }
157 184 }
158 185
159 - // Return updated ticket
160 186 match db::get_ticket(&conn, &ticket.id) {
161 187 Ok(t) => Json(serde_json::json!(t)).into_response(),
162 188 Err(e) => (
@@ -166,3 +192,150 @@ async fn update_ticket(
166 192 .into_response(),
167 193 }
168 194 }
195 +
196 + // -- Sync endpoints -----------------------------------------------------------
197 +
198 + #[derive(Debug, Deserialize)]
199 + pub struct SyncPullQuery {
200 + /// RFC3339 timestamp. Returns tickets updated after this time.
201 + pub since: String,
202 + }
203 +
204 + /// GET /sync/pull?since=<rfc3339> — peer pulls tickets updated after timestamp
205 + async fn sync_pull(
206 + State(state): State<AppState>,
207 + Query(q): Query<SyncPullQuery>,
208 + ) -> impl IntoResponse {
209 + let conn = state.db.lock().await;
210 + match db::tickets_since(&conn, &q.since) {
211 + Ok(tickets) => Json(serde_json::json!({
212 + "tickets": tickets,
213 + "count": tickets.len(),
214 + "node_id": state.node_id,
215 + }))
216 + .into_response(),
217 + Err(e) => (
218 + StatusCode::INTERNAL_SERVER_ERROR,
219 + Json(serde_json::json!({"error": e.to_string()})),
220 + )
221 + .into_response(),
222 + }
223 + }
224 +
225 + /// POST /sync/push — peer pushes tickets to us
226 + async fn sync_push(
227 + State(state): State<AppState>,
228 + Json(tickets): Json<Vec<Ticket>>,
229 + ) -> impl IntoResponse {
230 + let conn = state.db.lock().await;
231 + let mut accepted = 0u32;
232 + let mut rejected = 0u32;
233 +
234 + for ticket in &tickets {
235 + match db::upsert_synced_ticket(&conn, ticket) {
236 + Ok(true) => accepted += 1,
237 + Ok(false) => rejected += 1,
238 + Err(e) => {
239 + eprintln!("sync upsert error for {}: {e}", ticket.short_id());
240 + rejected += 1;
241 + }
242 + }
243 + }
244 +
245 + Json(serde_json::json!({
246 + "accepted": accepted,
247 + "rejected": rejected,
248 + }))
249 + }
250 +
251 + /// GET /sync/node — returns this node's identity
252 + async fn sync_node_info(State(state): State<AppState>) -> impl IntoResponse {
253 + Json(serde_json::json!({
254 + "node_id": state.node_id,
255 + }))
256 + }
257 +
258 + // -- Background sync loop -----------------------------------------------------
259 +
260 + /// Periodically pull from all peers and push local changes.
261 + async fn sync_loop(state: AppState, peers: Vec<String>) {
262 + let client = reqwest::Client::builder()
263 + .timeout(std::time::Duration::from_secs(10))
264 + .connect_timeout(std::time::Duration::from_secs(5))
265 + .build()
266 + .unwrap();
267 +
268 + loop {
269 + tokio::time::sleep(std::time::Duration::from_secs(30)).await;
270 +
271 + for peer in &peers {
272 + if let Err(e) = sync_with_peer(&state, &client, peer).await {
273 + eprintln!("sync with {peer}: {e}");
274 + }
275 + }
276 + }
277 + }
278 +
279 + /// Pull new tickets from a peer, then push our new tickets to them.
280 + async fn sync_with_peer(
281 + state: &AppState,
282 + client: &reqwest::Client,
283 + peer_url: &str,
284 + ) -> Result<(), Box<dyn std::error::Error>> {
285 + let conn = state.db.lock().await;
286 +
287 + // Get our cursor for this peer (default to epoch)
288 + let cursor = db::get_sync_cursor(&conn, peer_url)?
289 + .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
290 +
291 + drop(conn); // Release lock before HTTP
292 +
293 + // Pull from peer
294 + let pull_url = format!("{peer_url}/sync/pull?since={}", urlencoding::encode(&cursor));
295 + let resp: serde_json::Value = client.get(&pull_url).send().await?.json().await?;
296 +
297 + let tickets: Vec<Ticket> = serde_json::from_value(
298 + resp.get("tickets").cloned().unwrap_or(serde_json::json!([])),
299 + )?;
300 +
301 + if !tickets.is_empty() {
302 + let conn = state.db.lock().await;
303 + let mut latest_updated = cursor.clone();
304 +
305 + for ticket in &tickets {
306 + db::upsert_synced_ticket(&conn, ticket)?;
307 + let ts = ticket.updated_at.to_rfc3339();
308 + if ts > latest_updated {
309 + latest_updated = ts;
310 + }
311 + }
312 +
313 + db::set_sync_cursor(&conn, peer_url, &latest_updated)?;
314 + drop(conn);
315 +
316 + eprintln!("sync: pulled {} ticket(s) from {peer_url}", tickets.len());
317 + }
318 +
319 + // Push our changes to peer (tickets updated since their last pull from us)
320 + // We use the same cursor — they'll filter by last-writer-wins
321 + let conn = state.db.lock().await;
322 + let our_tickets = db::tickets_since(&conn, &cursor)?;
323 + drop(conn);
324 +
325 + if !our_tickets.is_empty() {
326 + let push_url = format!("{peer_url}/sync/push");
327 + let resp: serde_json::Value = client
328 + .post(&push_url)
329 + .json(&our_tickets)
330 + .send()
331 + .await?
332 + .json()
333 + .await?;
334 + let accepted = resp.get("accepted").and_then(|v| v.as_u64()).unwrap_or(0);
335 + if accepted > 0 {
336 + eprintln!("sync: pushed {accepted} ticket(s) to {peer_url}");
337 + }
338 + }
339 +
340 + Ok(())
341 + }
M wam/src/cli.rs +12 -3
@@ -2,10 +2,10 @@
2 2
3 3 use clap::{Parser, Subcommand};
4 4
5 - use crate::types::{Priority, Status};
5 + use crate::types::{Channel, Priority, Status};
6 6
7 7 #[derive(Parser)]
8 - #[command(name = "wam", about = "Whack-a-Mole -- ticket manager")]
8 + #[command(name = "wam", about = "Whack-a-Mole -- distributed ticket manager")]
9 9 pub struct Cli {
10 10 #[command(subcommand)]
11 11 pub command: Option<Command>,
@@ -24,6 +24,9 @@ pub enum Command {
24 24 /// Priority (low, medium, high, critical)
25 25 #[arg(short, long, default_value = "medium")]
26 26 priority: Priority,
27 + /// Channel (system, request, task)
28 + #[arg(short, long, default_value = "task")]
29 + channel: Channel,
27 30 /// Source system (e.g. "refund-escalation", "pom")
28 31 #[arg(short, long, default_value = "manual")]
29 32 source: String,
@@ -39,6 +42,9 @@ pub enum Command {
39 42 /// Filter by priority
40 43 #[arg(short, long)]
41 44 priority: Option<Priority>,
45 + /// Filter by channel
46 + #[arg(short, long)]
47 + channel: Option<Channel>,
42 48 /// Filter by source
43 49 #[arg(long)]
44 50 source: Option<String>,
@@ -58,10 +64,13 @@ pub enum Command {
58 64 /// Ticket ID (or unique prefix)
59 65 id: String,
60 66 },
61 - /// Start the HTTP API server (tailnet-only, no auth required)
67 + /// Start the HTTP API server with optional peer sync
62 68 Serve {
63 69 /// Port to listen on
64 70 #[arg(short, long, default_value = "7890")]
65 71 port: u16,
72 + /// Peer WAM URLs to sync with (repeatable)
73 + #[arg(long)]
74 + peer: Vec<String>,
66 75 },
67 76 }
M wam/src/db.rs +203 -23
@@ -1,10 +1,10 @@
1 - //! SQLite data layer for tickets.
1 + //! SQLite data layer for tickets with sync support.
2 2
3 3 use chrono::{DateTime, Utc};
4 4 use color_eyre::eyre::{Result, WrapErr, eyre};
5 5 use rusqlite::{Connection, Row, params};
6 6
7 - use crate::types::{NewTicket, Priority, Status, Ticket};
7 + use crate::types::{Channel, NewTicket, Priority, Status, Ticket};
8 8
9 9 /// Open (or create) the WAM database and run migrations.
10 10 pub fn open_db() -> Result<Connection> {
@@ -39,19 +39,62 @@ fn migrate(conn: &Connection) -> Result<()> {
39 39 body TEXT,
40 40 priority TEXT NOT NULL DEFAULT 'medium',
41 41 status TEXT NOT NULL DEFAULT 'open',
42 + channel TEXT NOT NULL DEFAULT 'system',
43 + node_id TEXT NOT NULL DEFAULT '',
42 44 source TEXT,
43 45 source_ref TEXT,
44 46 created_at TEXT NOT NULL,
45 47 updated_at TEXT NOT NULL,
46 48 resolved_at TEXT
47 - )",
49 + );
50 + CREATE TABLE IF NOT EXISTS meta (
51 + key TEXT PRIMARY KEY,
52 + value TEXT NOT NULL
53 + );
54 + CREATE TABLE IF NOT EXISTS sync_cursors (
55 + peer_url TEXT PRIMARY KEY,
56 + last_synced TEXT NOT NULL
57 + );",
48 58 )?;
59 +
60 + // Add channel/node_id columns if migrating from v0.1 schema
61 + let has_channel: bool = conn
62 + .prepare("SELECT channel FROM tickets LIMIT 0")
63 + .is_ok();
64 + if !has_channel {
65 + conn.execute_batch(
66 + "ALTER TABLE tickets ADD COLUMN channel TEXT NOT NULL DEFAULT 'system';
67 + ALTER TABLE tickets ADD COLUMN node_id TEXT NOT NULL DEFAULT '';",
68 + )?;
69 + }
70 +
49 71 Ok(())
50 72 }
51 73
74 + /// Get or create this node's persistent identity.
75 + pub fn get_or_create_node_id(conn: &Connection) -> Result<String> {
76 + let existing: Option<String> = conn
77 + .query_row("SELECT value FROM meta WHERE key = 'node_id'", [], |row| {
78 + row.get(0)
79 + })
80 + .ok();
81 +
82 + if let Some(id) = existing {
83 + return Ok(id);
84 + }
85 +
86 + let id = uuid::Uuid::new_v4().to_string();
87 + conn.execute(
88 + "INSERT INTO meta (key, value) VALUES ('node_id', ?1)",
89 + params![id],
90 + )?;
91 + Ok(id)
92 + }
93 +
52 94 fn row_to_ticket(row: &Row) -> rusqlite::Result<Ticket> {
53 95 let priority_str: String = row.get("priority")?;
54 96 let status_str: String = row.get("status")?;
97 + let channel_str: String = row.get("channel")?;
55 98 let created_str: String = row.get("created_at")?;
56 99 let updated_str: String = row.get("updated_at")?;
57 100 let resolved_str: Option<String> = row.get("resolved_at")?;
@@ -62,6 +105,8 @@ fn row_to_ticket(row: &Row) -> rusqlite::Result<Ticket> {
62 105 body: row.get("body")?,
63 106 priority: priority_str.parse().unwrap_or(Priority::Medium),
64 107 status: status_str.parse().unwrap_or(Status::Open),
108 + channel: channel_str.parse().unwrap_or(Channel::System),
109 + node_id: row.get("node_id")?,
65 110 source: row.get("source")?,
66 111 source_ref: row.get("source_ref")?,
67 112 created_at: DateTime::parse_from_rfc3339(&created_str)
@@ -79,18 +124,20 @@ fn row_to_ticket(row: &Row) -> rusqlite::Result<Ticket> {
79 124 }
80 125
81 126 /// Create a new ticket. Returns the created ticket.
82 - pub fn create_ticket(conn: &Connection, new: &NewTicket) -> Result<Ticket> {
127 + pub fn create_ticket(conn: &Connection, new: &NewTicket, node_id: &str) -> Result<Ticket> {
83 128 let id = uuid::Uuid::new_v4().to_string();
84 129 let now = Utc::now().to_rfc3339();
85 130
86 131 conn.execute(
87 - "INSERT INTO tickets (id, title, body, priority, status, source, source_ref, created_at, updated_at)
88 - VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, ?7)",
132 + "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at)
133 + VALUES (?1, ?2, ?3, ?4, 'open', ?5, ?6, ?7, ?8, ?9, ?9)",
89 134 params![
90 135 id,
91 136 new.title,
92 137 new.body,
93 138 new.priority.to_string(),
139 + new.channel.to_string(),
140 + node_id,
94 141 new.source,
95 142 new.source_ref,
96 143 now,
@@ -121,6 +168,7 @@ pub fn get_ticket(conn: &Connection, id_prefix: &str) -> Result<Ticket> {
121 168 pub struct ListFilter<'a> {
122 169 pub status: Option<Status>,
123 170 pub priority: Option<Priority>,
171 + pub channel: Option<Channel>,
124 172 pub source: Option<&'a str>,
125 173 pub search: Option<&'a str>,
126 174 }
@@ -138,6 +186,10 @@ pub fn list_tickets(conn: &Connection, filter: &ListFilter) -> Result<Vec<Ticket
138 186 bind_values.push(priority.to_string());
139 187 sql.push_str(&format!(" AND priority = ?{}", bind_values.len()));
140 188 }
189 + if let Some(channel) = filter.channel {
190 + bind_values.push(channel.to_string());
191 + sql.push_str(&format!(" AND channel = ?{}", bind_values.len()));
192 + }
141 193 if let Some(source) = filter.source {
142 194 bind_values.push(source.to_string());
143 195 sql.push_str(&format!(" AND source = ?{}", bind_values.len()));
@@ -187,6 +239,96 @@ pub fn update_status(conn: &Connection, id: &str, status: Status) -> Result<()>
187 239 Ok(())
188 240 }
189 241
242 + // -- Sync operations ----------------------------------------------------------
243 +
244 + /// Get all tickets updated after the given timestamp.
245 + pub fn tickets_since(conn: &Connection, since: &str) -> Result<Vec<Ticket>> {
246 + let mut stmt = conn.prepare(
247 + "SELECT * FROM tickets WHERE updated_at > ?1 ORDER BY updated_at ASC",
248 + )?;
249 + let tickets = stmt
250 + .query_map(params![since], row_to_ticket)?
251 + .collect::<rusqlite::Result<Vec<_>>>()?;
252 + Ok(tickets)
253 + }
254 +
255 + /// Upsert a ticket from a peer. Last-writer-wins based on updated_at.
256 + /// Returns true if the ticket was inserted or updated.
257 + pub fn upsert_synced_ticket(conn: &Connection, ticket: &Ticket) -> Result<bool> {
258 + // Check if we have this ticket and if ours is newer
259 + let existing_updated: Option<String> = conn
260 + .query_row(
261 + "SELECT updated_at FROM tickets WHERE id = ?1",
262 + params![ticket.id],
263 + |row| row.get(0),
264 + )
265 + .ok();
266 +
267 + if let Some(ref existing) = existing_updated {
268 + let existing_dt = DateTime::parse_from_rfc3339(existing)
269 + .map(|dt| dt.with_timezone(&Utc))
270 + .unwrap_or_else(|_| Utc::now());
271 + if existing_dt >= ticket.updated_at {
272 + return Ok(false); // Ours is same or newer
273 + }
274 + }
275 +
276 + let resolved_at = ticket.resolved_at.map(|dt| dt.to_rfc3339());
277 +
278 + conn.execute(
279 + "INSERT INTO tickets (id, title, body, priority, status, channel, node_id, source, source_ref, created_at, updated_at, resolved_at)
280 + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
281 + ON CONFLICT(id) DO UPDATE SET
282 + title = excluded.title,
283 + body = excluded.body,
284 + priority = excluded.priority,
285 + status = excluded.status,
286 + channel = excluded.channel,
287 + source = excluded.source,
288 + source_ref = excluded.source_ref,
289 + updated_at = excluded.updated_at,
290 + resolved_at = excluded.resolved_at",
291 + params![
292 + ticket.id,
293 + ticket.title,
294 + ticket.body,
295 + ticket.priority.to_string(),
296 + ticket.status.to_string(),
297 + ticket.channel.to_string(),
298 + ticket.node_id,
299 + ticket.source,
300 + ticket.source_ref,
301 + ticket.created_at.to_rfc3339(),
302 + ticket.updated_at.to_rfc3339(),
303 + resolved_at,
304 + ],
305 + )?;
306 +
307 + Ok(true)
308 + }
309 +
310 + /// Get the sync cursor for a peer (last synced timestamp).
311 + pub fn get_sync_cursor(conn: &Connection, peer_url: &str) -> Result<Option<String>> {
312 + let cursor: Option<String> = conn
313 + .query_row(
314 + "SELECT last_synced FROM sync_cursors WHERE peer_url = ?1",
315 + params![peer_url],
316 + |row| row.get(0),
317 + )
318 + .ok();
319 + Ok(cursor)
320 + }
321 +
322 + /// Update the sync cursor for a peer.
323 + pub fn set_sync_cursor(conn: &Connection, peer_url: &str, last_synced: &str) -> Result<()> {
324 + conn.execute(
325 + "INSERT INTO sync_cursors (peer_url, last_synced) VALUES (?1, ?2)
326 + ON CONFLICT(peer_url) DO UPDATE SET last_synced = excluded.last_synced",
327 + params![peer_url, last_synced],
328 + )?;
329 + Ok(())
330 + }
331 +
190 332 #[cfg(test)]
191 333 mod tests {
192 334 use super::*;
@@ -196,6 +338,7 @@ mod tests {
196 338 title: title.to_string(),
197 339 body: None,
198 340 priority: Priority::Medium,
341 + channel: Channel::System,
199 342 source: Some("test".to_string()),
200 343 source_ref: None,
201 344 }
@@ -204,9 +347,12 @@ mod tests {
204 347 #[test]
205 348 fn create_and_get() {
206 349 let conn = open_memory().unwrap();
207 - let t = create_ticket(&conn, &test_new_ticket("fix the thing")).unwrap();
350 + let node = get_or_create_node_id(&conn).unwrap();
351 + let t = create_ticket(&conn, &test_new_ticket("fix the thing"), &node).unwrap();
208 352 assert_eq!(t.title, "fix the thing");
209 353 assert_eq!(t.status, Status::Open);
354 + assert_eq!(t.channel, Channel::System);
355 + assert_eq!(t.node_id, node);
210 356
211 357 let fetched = get_ticket(&conn, &t.id).unwrap();
212 358 assert_eq!(fetched.id, t.id);
@@ -215,7 +361,8 @@ mod tests {
215 361 #[test]
216 362 fn prefix_match() {
217 363 let conn = open_memory().unwrap();
218 - let t = create_ticket(&conn, &test_new_ticket("test")).unwrap();
364 + let node = get_or_create_node_id(&conn).unwrap();
365 + let t = create_ticket(&conn, &test_new_ticket("test"), &node).unwrap();
219 366 let fetched = get_ticket(&conn, &t.id[..8]).unwrap();
220 367 assert_eq!(fetched.id, t.id);
221 368 }
@@ -223,37 +370,33 @@ mod tests {
223 370 #[test]
224 371 fn list_with_filter() {
225 372 let conn = open_memory().unwrap();
373 + let node = get_or_create_node_id(&conn).unwrap();
226 374 create_ticket(&conn, &NewTicket {
227 375 title: "urgent".into(),
228 376 body: None,
229 377 priority: Priority::Critical,
378 + channel: Channel::Request,
230 379 source: Some("pom".into()),
231 380 source_ref: None,
232 - }).unwrap();
233 - create_ticket(&conn, &test_new_ticket("normal")).unwrap();
381 + }, &node).unwrap();
382 + create_ticket(&conn, &test_new_ticket("normal"), &node).unwrap();
234 383
235 384 let all = list_tickets(&conn, &ListFilter::default()).unwrap();
236 385 assert_eq!(all.len(), 2);
237 - // Critical should sort first
238 386 assert_eq!(all[0].title, "urgent");
239 387
240 - let critical_only = list_tickets(&conn, &ListFilter {
241 - priority: Some(Priority::Critical),
242 - ..Default::default()
243 - }).unwrap();
244 - assert_eq!(critical_only.len(), 1);
245 -
246 - let pom_only = list_tickets(&conn, &ListFilter {
247 - source: Some("pom"),
388 + let requests = list_tickets(&conn, &ListFilter {
389 + channel: Some(Channel::Request),
248 390 ..Default::default()
249 391 }).unwrap();
250 - assert_eq!(pom_only.len(), 1);
392 + assert_eq!(requests.len(), 1);
251 393 }
252 394
253 395 #[test]
254 396 fn update_status_sets_resolved_at() {
255 397 let conn = open_memory().unwrap();
256 - let t = create_ticket(&conn, &test_new_ticket("resolve me")).unwrap();
398 + let node = get_or_create_node_id(&conn).unwrap();
399 + let t = create_ticket(&conn, &test_new_ticket("resolve me"), &node).unwrap();
257 400 assert!(t.resolved_at.is_none());
258 401
259 402 update_status(&conn, &t.id, Status::Resolved).unwrap();
@@ -263,10 +406,39 @@ mod tests {
263 406 }
264 407
265 408 #[test]
409 + fn sync_upsert_last_writer_wins() {
410 + let conn = open_memory().unwrap();
411 + let node = get_or_create_node_id(&conn).unwrap();
412 + let t = create_ticket(&conn, &test_new_ticket("original"), &node).unwrap();
413 +
414 + // Simulate a peer's version with a newer timestamp
415 + let mut peer_ticket = t.clone();
416 + peer_ticket.title = "updated by peer".to_string();
417 + peer_ticket.updated_at = Utc::now() + chrono::Duration::seconds(10);
418 +
419 + let changed = upsert_synced_ticket(&conn, &peer_ticket).unwrap();
420 + assert!(changed);
421 +
422 + let fetched = get_ticket(&conn, &peer_ticket.id).unwrap();
423 + assert_eq!(fetched.title, "updated by peer");
424 +
425 + // Older update should be rejected
426 + let mut stale = peer_ticket.clone();
427 + stale.title = "stale update".to_string();
428 + stale.updated_at = Utc::now() - chrono::Duration::seconds(100);
429 + let changed = upsert_synced_ticket(&conn, &stale).unwrap();
430 + assert!(!changed);
431 +
432 + let fetched = get_ticket(&conn, &peer_ticket.id).unwrap();
433 + assert_eq!(fetched.title, "updated by peer"); // Not "stale update"
434 + }
435 +
436 + #[test]
266 437 fn search_filter() {
267 438 let conn = open_memory().unwrap();
268 - create_ticket(&conn, &test_new_ticket("refund issue")).unwrap();
269 - create_ticket(&conn, &test_new_ticket("build failure")).unwrap();
439 + let node = get_or_create_node_id(&conn).unwrap();
440 + create_ticket(&conn, &test_new_ticket("refund issue"), &node).unwrap();
441 + create_ticket(&conn, &test_new_ticket("build failure"), &node).unwrap();
270 442
271 443 let results = list_tickets(&conn, &ListFilter {
272 444 search: Some("refund"),
@@ -275,4 +447,12 @@ mod tests {
275 447 assert_eq!(results.len(), 1);
276 448 assert_eq!(results[0].title, "refund issue");
277 449 }
450 +
451 + #[test]
452 + fn node_id_persists() {
453 + let conn = open_memory().unwrap();
454 + let id1 = get_or_create_node_id(&conn).unwrap();
455 + let id2 = get_or_create_node_id(&conn).unwrap();
456 + assert_eq!(id1, id2);
457 + }
278 458 }
M wam/src/main.rs +18 -12
@@ -14,25 +14,28 @@ fn main() -> Result<()> {
14 14 color_eyre::install()?;
15 15 let cli = cli::Cli::parse();
16 16 let conn = db::open_db()?;
17 + let node_id = db::get_or_create_node_id(&conn)?;
17 18
18 19 match cli.command {
19 - None => tui::run(conn)?,
20 + None => tui::run(conn, node_id)?,
20 21
21 - Some(Command::Create { title, body, priority, source, source_ref }) => {
22 + Some(Command::Create { title, body, priority, channel, source, source_ref }) => {
22 23 let ticket = db::create_ticket(&conn, &NewTicket {
23 24 title,
24 25 body,
25 26 priority,
27 + channel,
26 28 source: Some(source),
27 29 source_ref,
28 - })?;
29 - println!("created {} ({})", ticket.short_id(), ticket.title);
30 + }, &node_id)?;
31 + println!("created {} [{}] ({})", ticket.short_id(), ticket.channel, ticket.title);
30 32 }
31 33
32 - Some(Command::List { status, priority, source }) => {
34 + Some(Command::List { status, priority, channel, source }) => {
33 35 let tickets = db::list_tickets(&conn, &ListFilter {
34 36 status,
35 37 priority,
38 + channel,
36 39 source: source.as_deref(),
37 40 ..Default::default()
38 41 })?;
@@ -42,16 +45,17 @@ fn main() -> Result<()> {
42 45 return Ok(());
43 46 }
44 47
45 - println!("{:<10} {:<5} {:<12} {:<36} {}", "ID", "Pri", "Status", "Title", "Source");
46 - println!("{}", "-".repeat(75));
48 + println!("{:<10} {:<8} {:<5} {:<12} {:<30} {}", "ID", "Channel", "Pri", "Status", "Title", "Node");
49 + println!("{}", "-".repeat(80));
47 50 for t in &tickets {
48 51 println!(
49 - "{:<10} {:<5} {:<12} {:<36} {}",
52 + "{:<10} {:<8} {:<5} {:<12} {:<30} {}",
50 53 t.short_id(),
54 + t.channel,
51 55 t.priority,
52 56 t.status,
53 - truncate(&t.title, 36),
54 - t.source.as_deref().unwrap_or("-"),
57 + truncate(&t.title, 30),
58 + t.short_node(),
55 59 );
56 60 }
57 61 println!("\n{} ticket(s)", tickets.len());
@@ -61,8 +65,10 @@ fn main() -> Result<()> {
61 65 let t = db::get_ticket(&conn, &id)?;
62 66 println!("ID: {}", t.id);
63 67 println!("Title: {}", t.title);
68 + println!("Channel: {}", t.channel);
64 69 println!("Priority: {}", t.priority);
65 70 println!("Status: {} {}", t.status.indicator(), t.status);
71 + println!("Node: {}", t.node_id);
66 72 println!("Source: {}", t.source.as_deref().unwrap_or("-"));
67 73 println!("Ref: {}", t.source_ref.as_deref().unwrap_or("-"));
68 74 println!("Created: {}", t.created_at.format("%Y-%m-%d %H:%M UTC"));
@@ -87,9 +93,9 @@ fn main() -> Result<()> {
87 93 println!("closed {} ({})", t.short_id(), t.title);
88 94 }
89 95
90 - Some(Command::Serve { port }) => {
96 + Some(Command::Serve { port, peer }) => {
91 97 let rt = tokio::runtime::Runtime::new()?;
92 - rt.block_on(api::serve(conn, port))?;
98 + rt.block_on(api::serve(conn, port, peer))?;
93 99 }
94 100 }
95 101
M wam/src/tui.rs +39 -6
@@ -12,7 +12,7 @@ use ratatui::{
12 12 use rusqlite::Connection;
13 13
14 14 use crate::db::{self, ListFilter};
15 - use crate::types::{Priority, Status, Ticket};
15 + use crate::types::{Channel, Priority, Status, Ticket};
16 16
17 17 // -- View state ---------------------------------------------------------------
18 18
@@ -31,12 +31,14 @@ enum InputMode {
31 31
32 32 struct App {
33 33 conn: Connection,
34 + node_id: String,
34 35 tickets: Vec<Ticket>,
35 36 table_state: TableState,
36 37 view: View,
37 38 input_mode: InputMode,
38 39 status_filter: Option<Status>,
39 40 priority_filter: Option<Priority>,
41 + channel_filter: Option<Channel>,
40 42 source_filter: Option<String>,
41 43 all_sources: Vec<String>,
42 44 search_query: String,
@@ -45,15 +47,17 @@ struct App {
45 47 }
46 48
47 49 impl App {
48 - fn new(conn: Connection) -> Result<Self> {
50 + fn new(conn: Connection, node_id: String) -> Result<Self> {
49 51 let mut app = Self {
50 52 conn,
53 + node_id,
51 54 tickets: Vec::new(),
52 55 table_state: TableState::default(),
53 56 view: View::List,
54 57 input_mode: InputMode::Normal,
55 58 status_filter: None,
56 59 priority_filter: None,
60 + channel_filter: None,
57 61 source_filter: None,
58 62 all_sources: Vec::new(),
59 63 search_query: String::new(),
@@ -79,6 +83,7 @@ impl App {
79 83 &ListFilter {
80 84 status: self.status_filter,
81 85 priority: self.priority_filter,
86 + channel: self.channel_filter,
82 87 source: self.source_filter.as_deref(),
83 88 search,
84 89 },
@@ -159,6 +164,16 @@ impl App {
159 164 Ok(())
160 165 }
161 166
167 + fn cycle_channel_filter(&mut self) -> Result<()> {
168 + self.channel_filter = match self.channel_filter {
169 + None => Some(Channel::System),
170 + Some(Channel::System) => Some(Channel::Request),
171 + Some(Channel::Request) => Some(Channel::Task),
172 + Some(Channel::Task) => None,
173 + };
174 + self.refresh()
175 + }
176 +
162 177 fn submit_create(&mut self) -> Result<()> {
163 178 let title = self.create_input.trim().to_string();
164 179 if !title.is_empty() {
@@ -168,9 +183,11 @@ impl App {
168 183 title,
169 184 body: None,
170 185 priority: Priority::Medium,
186 + channel: Channel::Task,
171 187 source: Some("manual".into()),
172 188 source_ref: None,
173 189 },
190 + &self.node_id,
174 191 )?;
175 192 self.refresh()?;
176 193 }
@@ -182,9 +199,9 @@ impl App {
182 199
183 200 // -- Entry point --------------------------------------------------------------
184 201
185 - pub fn run(conn: Connection) -> Result<()> {
202 + pub fn run(conn: Connection, node_id: String) -> Result<()> {
186 203 let mut terminal = ratatui::init();
187 - let mut app = App::new(conn)?;
204 + let mut app = App::new(conn, node_id)?;
188 205
189 206 while app.running {
190 207 terminal.draw(|f| render(&mut app, f))?;
@@ -274,6 +291,7 @@ fn handle_key(app: &mut App, key: KeyCode) -> Result<()> {
274 291 KeyCode::Char('f') => app.cycle_status_filter()?,
275 292 KeyCode::Char('p') => app.cycle_priority_filter()?,
276 293 KeyCode::Char('s') => app.cycle_source_filter()?,
294 + KeyCode::Char('t') => app.cycle_channel_filter()?,
277 295 KeyCode::Char('/') => {
278 296 app.search_query.clear();
279 297 app.input_mode = InputMode::Search;
@@ -329,6 +347,9 @@ fn render_title_bar(app: &App, f: &mut Frame, area: Rect) {
329 347 if let Some(priority) = app.priority_filter {
330 348 filters.push(format!("pri:{priority}"));
331 349 }
350 + if let Some(channel) = app.channel_filter {
351 + filters.push(format!("ch:{channel}"));
352 + }
332 353 if let Some(ref source) = app.source_filter {
333 354 filters.push(format!("src:{source}"));
334 355 }
@@ -352,7 +373,7 @@ fn render_title_bar(app: &App, f: &mut Frame, area: Rect) {
352 373 }
353 374
354 375 fn render_list(app: &mut App, f: &mut Frame, area: Rect) {
355 - let header = Row::new(["Pri", "Title", "Source", "Status", "Age"])
376 + let header = Row::new(["Pri", "Title", "Ch", "Source", "Status", "Node", "Age"])
356 377 .style(Style::new().bold().underlined());
357 378
358 379 let rows: Vec<Row> = app
@@ -364,8 +385,10 @@ fn render_list(app: &mut App, f: &mut Frame, area: Rect) {
364 385 Cell::from(format!(" {} ", t.priority.to_string().chars().next().unwrap_or(' ')))
365 386 .style(pri_style),
366 387 Cell::from(t.title.as_str()),
388 + Cell::from(t.channel.to_string()),
367 389 Cell::from(t.source.as_deref().unwrap_or("-")),
368 390 Cell::from(format!("{} {}", t.status.indicator(), t.status)),
391 + Cell::from(t.short_node()),
369 392 Cell::from(t.age()),
370 393 ])
371 394 })
@@ -374,8 +397,10 @@ fn render_list(app: &mut App, f: &mut Frame, area: Rect) {
374 397 let widths = [
375 398 Constraint::Length(5),
376 399 Constraint::Fill(1),
400 + Constraint::Length(8),
377 401 Constraint::Length(14),
378 402 Constraint::Length(14),
403 + Constraint::Length(10),
379 404 Constraint::Length(5),
380 405 ];
381 406
@@ -405,6 +430,10 @@ fn render_detail(app: &App, f: &mut Frame, area: Rect) {
405 430 Span::raw(&ticket.title),
406 431 ]),
407 432 Line::from(vec![
433 + Span::styled("Channel: ", Style::new().bold()),
434 + Span::raw(ticket.channel.to_string()),
435 + ]),
436 + Line::from(vec![
408 437 Span::styled("Priority: ", Style::new().bold()),
409 438 Span::styled(ticket.priority.to_string(), Style::new().fg(ticket.priority.color())),
410 439 ]),
@@ -413,6 +442,10 @@ fn render_detail(app: &App, f: &mut Frame, area: Rect) {
413 442 Span::raw(format!("{} {}", ticket.status.indicator(), ticket.status)),
414 443 ]),
415 444 Line::from(vec![
445 + Span::styled("Node: ", Style::new().bold()),
446 + Span::raw(&ticket.node_id),
447 + ]),
448 + Line::from(vec![
416 449 Span::styled("Source: ", Style::new().bold()),
417 450 Span::raw(ticket.source.as_deref().unwrap_or("-")),
418 451 ]),
@@ -489,7 +522,7 @@ fn render_status_bar(app: &App, f: &mut Frame, area: Rect) {
489 522 let hints = match app.view {
490 523 View::List => match app.input_mode {
491 524 InputMode::Search => "Enter: apply Esc: clear Type to search",
492 - InputMode::Normal => "j/k:nav Enter:open n:new o/i/r/c:status f:status p:pri s:src /:search q:quit",
525 + InputMode::Normal => "j/k:nav Enter:open n:new o/i/r/c:status f:status p:pri t:channel s:src /:search q:quit",
493 526 },
494 527 View::Detail => "o/i/r/c:status Esc:back",
495 528 View::Create => "Enter:create Esc:cancel",
M wam/src/types.rs +52 -2
@@ -1,4 +1,4 @@
1 - //! Core types: Priority, Status, Ticket.
1 + //! Core types: Priority, Status, Channel, Ticket.
2 2
3 3 use std::fmt;
4 4 use std::str::FromStr;
@@ -27,7 +27,6 @@ impl Priority {
27 27 Self::Critical => Color::Red,
28 28 }
29 29 }
30 -
31 30 }
32 31
33 32 impl fmt::Display for Priority {
@@ -100,6 +99,44 @@ impl FromStr for Status {
100 99 }
101 100 }
102 101
102 + // -- Channel ------------------------------------------------------------------
103 +
104 + /// Ticket channel: who is communicating with whom.
105 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
106 + #[serde(rename_all = "lowercase")]
107 + pub enum Channel {
108 + /// System-to-system: automated alerts, CI results, health checks.
109 + /// No human response expected.
110 + System,
111 + /// Human-system bidirectional: approval requests, investigations.
112 + /// A system creates it, a human acts on it (or vice versa).
113 + Request,
114 + /// Human-to-human: async tasks, questions, coordination across machines.
115 + Task,
116 + }
117 +
118 + impl fmt::Display for Channel {
119 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 + f.write_str(match self {
121 + Self::System => "system",
122 + Self::Request => "request",
123 + Self::Task => "task",
124 + })
125 + }
126 + }
127 +
128 + impl FromStr for Channel {
129 + type Err = String;
130 + fn from_str(s: &str) -> Result<Self, Self::Err> {
131 + match s.to_lowercase().as_str() {
132 + "system" => Ok(Self::System),
133 + "request" => Ok(Self::Request),
134 + "task" => Ok(Self::Task),
135 + _ => Err(format!("unknown channel: {s}")),
136 + }
137 + }
138 + }
139 +
103 140 // -- Ticket -------------------------------------------------------------------
104 141
105 142 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -109,6 +146,8 @@ pub struct Ticket {
109 146 pub body: Option<String>,
110 147 pub priority: Priority,
111 148 pub status: Status,
149 + pub channel: Channel,
150 + pub node_id: String,
112 151 pub source: Option<String>,
113 152 pub source_ref: Option<String>,
114 153 pub created_at: DateTime<Utc>,
@@ -133,6 +172,11 @@ impl Ticket {
133 172 pub fn short_id(&self) -> &str {
134 173 &self.id[..8.min(self.id.len())]
135 174 }
175 +
176 + /// Short node ID for display.
177 + pub fn short_node(&self) -> &str {
178 + &self.node_id[..8.min(self.node_id.len())]
179 + }
136 180 }
137 181
138 182 // -- NewTicket (for create) ---------------------------------------------------
@@ -144,6 +188,8 @@ pub struct NewTicket {
144 188 pub body: Option<String>,
145 189 #[serde(default = "default_priority")]
146 190 pub priority: Priority,
191 + #[serde(default = "default_channel")]
192 + pub channel: Channel,
147 193 #[serde(default)]
148 194 pub source: Option<String>,
149 195 #[serde(default)]
@@ -153,3 +199,7 @@ pub struct NewTicket {
153 199 fn default_priority() -> Priority {
154 200 Priority::Medium
155 201 }
202 +
203 + fn default_channel() -> Channel {
204 + Channel::System
205 + }