Skip to main content

max / makenotwork

10.0 KB · 342 lines History Blame Raw
1 //! HTTP API for programmatic ticket management and peer sync.
2 //!
3 //! Designed to run on the tailnet -- tailnet membership is the auth boundary.
4
5 use std::sync::Arc;
6
7 use tokio::sync::Mutex;
8
9 use axum::{
10 Json, Router,
11 extract::{Path, Query, State},
12 http::StatusCode,
13 response::IntoResponse,
14 routing::{get, patch, post},
15 };
16 use rusqlite::Connection;
17 use serde::Deserialize;
18
19 use crate::db::{self, ListFilter};
20 use crate::types::{Channel, NewTicket, Priority, Status, Ticket};
21
22 /// Shared state: SQLite connection + node identity.
23 #[derive(Clone)]
24 pub struct AppState {
25 pub db: Arc<Mutex<Connection>>,
26 pub node_id: String,
27 }
28
29 /// Start the HTTP server, optionally syncing with peers.
30 pub async fn serve(
31 conn: Connection,
32 port: u16,
33 peers: Vec<String>,
34 ) -> color_eyre::eyre::Result<()> {
35 let node_id = db::get_or_create_node_id(&conn)?;
36 eprintln!("node: {}", &node_id[..8]);
37
38 let app_state = AppState {
39 db: Arc::new(Mutex::new(conn)),
40 node_id,
41 };
42
43 // Spawn sync loop if peers are configured
44 if !peers.is_empty() {
45 let sync_state = app_state.clone();
46 let sync_peers = peers.clone();
47 tokio::spawn(async move {
48 sync_loop(sync_state, sync_peers).await;
49 });
50 }
51
52 let app = Router::new()
53 .route("/tickets", post(create_ticket))
54 .route("/tickets", get(list_tickets))
55 .route("/tickets/{id}", get(get_ticket))
56 .route("/tickets/{id}", patch(update_ticket))
57 .route("/sync/pull", get(sync_pull))
58 .route("/sync/push", post(sync_push))
59 .route("/sync/node", get(sync_node_info))
60 .with_state(app_state);
61
62 let addr = format!("0.0.0.0:{port}");
63 let listener = tokio::net::TcpListener::bind(&addr).await?;
64 eprintln!("wam serving on {addr}");
65 if !peers.is_empty() {
66 eprintln!("syncing with {} peer(s)", peers.len());
67 }
68 axum::serve(listener, app).await?;
69 Ok(())
70 }
71
72 // -- Ticket handlers ----------------------------------------------------------
73
74 /// POST /tickets
75 async fn create_ticket(
76 State(state): State<AppState>,
77 Json(new): Json<NewTicket>,
78 ) -> impl IntoResponse {
79 let conn = state.db.lock().await;
80 match db::create_ticket(&conn, &new, &state.node_id) {
81 Ok(ticket) => (StatusCode::CREATED, Json(serde_json::json!(ticket))).into_response(),
82 Err(e) => (
83 StatusCode::INTERNAL_SERVER_ERROR,
84 Json(serde_json::json!({"error": e.to_string()})),
85 )
86 .into_response(),
87 }
88 }
89
90 #[derive(Debug, Deserialize, Default)]
91 pub struct ListQuery {
92 pub status: Option<String>,
93 pub priority: Option<String>,
94 pub channel: Option<String>,
95 pub source: Option<String>,
96 pub search: Option<String>,
97 }
98
99 /// GET /tickets
100 async fn list_tickets(
101 State(state): State<AppState>,
102 Query(q): Query<ListQuery>,
103 ) -> impl IntoResponse {
104 let status = q.status.as_deref().and_then(|s| s.parse::<Status>().ok());
105 let priority = q.priority.as_deref().and_then(|s| s.parse::<Priority>().ok());
106 let channel = q.channel.as_deref().and_then(|s| s.parse::<Channel>().ok());
107
108 let conn = state.db.lock().await;
109 let filter = ListFilter {
110 status,
111 priority,
112 channel,
113 source: q.source.as_deref(),
114 search: q.search.as_deref(),
115 };
116 match db::list_tickets(&conn, &filter) {
117 Ok(tickets) => Json(serde_json::json!({"data": tickets, "count": tickets.len()})).into_response(),
118 Err(e) => (
119 StatusCode::INTERNAL_SERVER_ERROR,
120 Json(serde_json::json!({"error": e.to_string()})),
121 )
122 .into_response(),
123 }
124 }
125
126 /// GET /tickets/:id
127 async fn get_ticket(
128 State(state): State<AppState>,
129 Path(id): Path<String>,
130 ) -> impl IntoResponse {
131 let conn = state.db.lock().await;
132 match db::get_ticket(&conn, &id) {
133 Ok(ticket) => Json(serde_json::json!(ticket)).into_response(),
134 Err(_) => (
135 StatusCode::NOT_FOUND,
136 Json(serde_json::json!({"error": "ticket not found"})),
137 )
138 .into_response(),
139 }
140 }
141
142 #[derive(Debug, Deserialize)]
143 pub struct UpdateBody {
144 pub status: Option<String>,
145 }
146
147 /// PATCH /tickets/:id
148 async fn update_ticket(
149 State(state): State<AppState>,
150 Path(id): Path<String>,
151 Json(body): Json<UpdateBody>,
152 ) -> impl IntoResponse {
153 let conn = state.db.lock().await;
154
155 let ticket = match db::get_ticket(&conn, &id) {
156 Ok(t) => t,
157 Err(_) => {
158 return (
159 StatusCode::NOT_FOUND,
160 Json(serde_json::json!({"error": "ticket not found"})),
161 )
162 .into_response();
163 }
164 };
165
166 if let Some(status_str) = body.status {
167 let status = match status_str.parse::<Status>() {
168 Ok(s) => s,
169 Err(_) => {
170 return (
171 StatusCode::BAD_REQUEST,
172 Json(serde_json::json!({"error": format!("invalid status: {status_str}")})),
173 )
174 .into_response();
175 }
176 };
177 if let Err(e) = db::update_status(&conn, &ticket.id, status) {
178 return (
179 StatusCode::INTERNAL_SERVER_ERROR,
180 Json(serde_json::json!({"error": e.to_string()})),
181 )
182 .into_response();
183 }
184 }
185
186 match db::get_ticket(&conn, &ticket.id) {
187 Ok(t) => Json(serde_json::json!(t)).into_response(),
188 Err(e) => (
189 StatusCode::INTERNAL_SERVER_ERROR,
190 Json(serde_json::json!({"error": e.to_string()})),
191 )
192 .into_response(),
193 }
194 }
195
196 // -- Sync endpoints -----------------------------------------------------------
197
198 #[derive(Debug, Deserialize)]
199 pub struct SyncPullQuery {
200 /// RFC3339 timestamp. Returns tickets updated after this time.
201 pub since: String,
202 }
203
204 /// GET /sync/pull?since=<rfc3339> — peer pulls tickets updated after timestamp
205 async fn sync_pull(
206 State(state): State<AppState>,
207 Query(q): Query<SyncPullQuery>,
208 ) -> impl IntoResponse {
209 let conn = state.db.lock().await;
210 match db::tickets_since(&conn, &q.since) {
211 Ok(tickets) => Json(serde_json::json!({
212 "tickets": tickets,
213 "count": tickets.len(),
214 "node_id": state.node_id,
215 }))
216 .into_response(),
217 Err(e) => (
218 StatusCode::INTERNAL_SERVER_ERROR,
219 Json(serde_json::json!({"error": e.to_string()})),
220 )
221 .into_response(),
222 }
223 }
224
225 /// POST /sync/push — peer pushes tickets to us
226 async fn sync_push(
227 State(state): State<AppState>,
228 Json(tickets): Json<Vec<Ticket>>,
229 ) -> impl IntoResponse {
230 let conn = state.db.lock().await;
231 let mut accepted = 0u32;
232 let mut rejected = 0u32;
233
234 for ticket in &tickets {
235 match db::upsert_synced_ticket(&conn, ticket) {
236 Ok(true) => accepted += 1,
237 Ok(false) => rejected += 1,
238 Err(e) => {
239 eprintln!("sync upsert error for {}: {e}", ticket.short_id());
240 rejected += 1;
241 }
242 }
243 }
244
245 Json(serde_json::json!({
246 "accepted": accepted,
247 "rejected": rejected,
248 }))
249 }
250
251 /// GET /sync/node — returns this node's identity
252 async fn sync_node_info(State(state): State<AppState>) -> impl IntoResponse {
253 Json(serde_json::json!({
254 "node_id": state.node_id,
255 }))
256 }
257
258 // -- Background sync loop -----------------------------------------------------
259
260 /// Periodically pull from all peers and push local changes.
261 async fn sync_loop(state: AppState, peers: Vec<String>) {
262 let client = reqwest::Client::builder()
263 .timeout(std::time::Duration::from_secs(10))
264 .connect_timeout(std::time::Duration::from_secs(5))
265 .build()
266 .unwrap();
267
268 loop {
269 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
270
271 for peer in &peers {
272 if let Err(e) = sync_with_peer(&state, &client, peer).await {
273 eprintln!("sync with {peer}: {e}");
274 }
275 }
276 }
277 }
278
279 /// Pull new tickets from a peer, then push our new tickets to them.
280 async fn sync_with_peer(
281 state: &AppState,
282 client: &reqwest::Client,
283 peer_url: &str,
284 ) -> Result<(), Box<dyn std::error::Error>> {
285 let conn = state.db.lock().await;
286
287 // Get our cursor for this peer (default to epoch)
288 let cursor = db::get_sync_cursor(&conn, peer_url)?
289 .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
290
291 drop(conn); // Release lock before HTTP
292
293 // Pull from peer
294 let pull_url = format!("{peer_url}/sync/pull?since={}", urlencoding::encode(&cursor));
295 let resp: serde_json::Value = client.get(&pull_url).send().await?.json().await?;
296
297 let tickets: Vec<Ticket> = serde_json::from_value(
298 resp.get("tickets").cloned().unwrap_or(serde_json::json!([])),
299 )?;
300
301 if !tickets.is_empty() {
302 let conn = state.db.lock().await;
303 let mut latest_updated = cursor.clone();
304
305 for ticket in &tickets {
306 db::upsert_synced_ticket(&conn, ticket)?;
307 let ts = ticket.updated_at.to_rfc3339();
308 if ts > latest_updated {
309 latest_updated = ts;
310 }
311 }
312
313 db::set_sync_cursor(&conn, peer_url, &latest_updated)?;
314 drop(conn);
315
316 eprintln!("sync: pulled {} ticket(s) from {peer_url}", tickets.len());
317 }
318
319 // Push our changes to peer (tickets updated since their last pull from us)
320 // We use the same cursor — they'll filter by last-writer-wins
321 let conn = state.db.lock().await;
322 let our_tickets = db::tickets_since(&conn, &cursor)?;
323 drop(conn);
324
325 if !our_tickets.is_empty() {
326 let push_url = format!("{peer_url}/sync/push");
327 let resp: serde_json::Value = client
328 .post(&push_url)
329 .json(&our_tickets)
330 .send()
331 .await?
332 .json()
333 .await?;
334 let accepted = resp.get("accepted").and_then(|v| v.as_u64()).unwrap_or(0);
335 if accepted > 0 {
336 eprintln!("sync: pushed {accepted} ticket(s) to {peer_url}");
337 }
338 }
339
340 Ok(())
341 }
342