//! HTTP API for programmatic ticket management and peer sync. //! //! Designed to run on the tailnet -- tailnet membership is the auth boundary. use std::sync::Arc; use tokio::sync::Mutex; use axum::{ Json, Router, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, patch, post}, }; use rusqlite::Connection; use serde::Deserialize; use crate::db::{self, ListFilter}; use crate::types::{Channel, NewTicket, Priority, Status, Ticket}; /// Shared state: SQLite connection + node identity. #[derive(Clone)] pub struct AppState { pub db: Arc>, pub node_id: String, } /// Start the HTTP server, optionally syncing with peers. pub async fn serve( conn: Connection, port: u16, peers: Vec, ) -> color_eyre::eyre::Result<()> { let node_id = db::get_or_create_node_id(&conn)?; eprintln!("node: {}", &node_id[..8]); let app_state = AppState { db: Arc::new(Mutex::new(conn)), node_id, }; // Spawn sync loop if peers are configured if !peers.is_empty() { let sync_state = app_state.clone(); let sync_peers = peers.clone(); tokio::spawn(async move { sync_loop(sync_state, sync_peers).await; }); } let app = Router::new() .route("/tickets", post(create_ticket)) .route("/tickets", get(list_tickets)) .route("/tickets/{id}", get(get_ticket)) .route("/tickets/{id}", patch(update_ticket)) .route("/sync/pull", get(sync_pull)) .route("/sync/push", post(sync_push)) .route("/sync/node", get(sync_node_info)) .with_state(app_state); let addr = format!("0.0.0.0:{port}"); let listener = tokio::net::TcpListener::bind(&addr).await?; eprintln!("wam serving on {addr}"); if !peers.is_empty() { eprintln!("syncing with {} peer(s)", peers.len()); } axum::serve(listener, app).await?; Ok(()) } // -- Ticket handlers ---------------------------------------------------------- /// POST /tickets async fn create_ticket( State(state): State, Json(new): Json, ) -> impl IntoResponse { let conn = state.db.lock().await; match db::create_ticket(&conn, &new, &state.node_id) { Ok(ticket) => (StatusCode::CREATED, Json(serde_json::json!(ticket))).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } #[derive(Debug, Deserialize, Default)] pub struct ListQuery { pub status: Option, pub priority: Option, pub channel: Option, pub source: Option, pub search: Option, } /// GET /tickets async fn list_tickets( State(state): State, Query(q): Query, ) -> impl IntoResponse { let status = q.status.as_deref().and_then(|s| s.parse::().ok()); let priority = q.priority.as_deref().and_then(|s| s.parse::().ok()); let channel = q.channel.as_deref().and_then(|s| s.parse::().ok()); let conn = state.db.lock().await; let filter = ListFilter { status, priority, channel, source: q.source.as_deref(), search: q.search.as_deref(), }; match db::list_tickets(&conn, &filter) { Ok(tickets) => Json(serde_json::json!({"data": tickets, "count": tickets.len()})).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } /// GET /tickets/:id async fn get_ticket( State(state): State, Path(id): Path, ) -> impl IntoResponse { let conn = state.db.lock().await; match db::get_ticket(&conn, &id) { Ok(ticket) => Json(serde_json::json!(ticket)).into_response(), Err(_) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "ticket not found"})), ) .into_response(), } } #[derive(Debug, Deserialize)] pub struct UpdateBody { pub status: Option, } /// PATCH /tickets/:id async fn update_ticket( State(state): State, Path(id): Path, Json(body): Json, ) -> impl IntoResponse { let conn = state.db.lock().await; let ticket = match db::get_ticket(&conn, &id) { Ok(t) => t, Err(_) => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "ticket not found"})), ) .into_response(); } }; if let Some(status_str) = body.status { let status = match status_str.parse::() { Ok(s) => s, Err(_) => { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("invalid status: {status_str}")})), ) .into_response(); } }; if let Err(e) = db::update_status(&conn, &ticket.id, status) { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(); } } match db::get_ticket(&conn, &ticket.id) { Ok(t) => Json(serde_json::json!(t)).into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } // -- Sync endpoints ----------------------------------------------------------- #[derive(Debug, Deserialize)] pub struct SyncPullQuery { /// RFC3339 timestamp. Returns tickets updated after this time. pub since: String, } /// GET /sync/pull?since= — peer pulls tickets updated after timestamp async fn sync_pull( State(state): State, Query(q): Query, ) -> impl IntoResponse { let conn = state.db.lock().await; match db::tickets_since(&conn, &q.since) { Ok(tickets) => Json(serde_json::json!({ "tickets": tickets, "count": tickets.len(), "node_id": state.node_id, })) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) .into_response(), } } /// POST /sync/push — peer pushes tickets to us async fn sync_push( State(state): State, Json(tickets): Json>, ) -> impl IntoResponse { let conn = state.db.lock().await; let mut accepted = 0u32; let mut rejected = 0u32; for ticket in &tickets { match db::upsert_synced_ticket(&conn, ticket) { Ok(true) => accepted += 1, Ok(false) => rejected += 1, Err(e) => { eprintln!("sync upsert error for {}: {e}", ticket.short_id()); rejected += 1; } } } Json(serde_json::json!({ "accepted": accepted, "rejected": rejected, })) } /// GET /sync/node — returns this node's identity async fn sync_node_info(State(state): State) -> impl IntoResponse { Json(serde_json::json!({ "node_id": state.node_id, })) } // -- Background sync loop ----------------------------------------------------- /// Periodically pull from all peers and push local changes. async fn sync_loop(state: AppState, peers: Vec) { let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(10)) .connect_timeout(std::time::Duration::from_secs(5)) .build() .unwrap(); loop { tokio::time::sleep(std::time::Duration::from_secs(30)).await; for peer in &peers { if let Err(e) = sync_with_peer(&state, &client, peer).await { eprintln!("sync with {peer}: {e}"); } } } } /// Pull new tickets from a peer, then push our new tickets to them. async fn sync_with_peer( state: &AppState, client: &reqwest::Client, peer_url: &str, ) -> Result<(), Box> { let conn = state.db.lock().await; // Get our cursor for this peer (default to epoch) let cursor = db::get_sync_cursor(&conn, peer_url)? .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); drop(conn); // Release lock before HTTP // Pull from peer let pull_url = format!("{peer_url}/sync/pull?since={}", urlencoding::encode(&cursor)); let resp: serde_json::Value = client.get(&pull_url).send().await?.json().await?; let tickets: Vec = serde_json::from_value( resp.get("tickets").cloned().unwrap_or(serde_json::json!([])), )?; if !tickets.is_empty() { let conn = state.db.lock().await; let mut latest_updated = cursor.clone(); for ticket in &tickets { db::upsert_synced_ticket(&conn, ticket)?; let ts = ticket.updated_at.to_rfc3339(); if ts > latest_updated { latest_updated = ts; } } db::set_sync_cursor(&conn, peer_url, &latest_updated)?; drop(conn); eprintln!("sync: pulled {} ticket(s) from {peer_url}", tickets.len()); } // Push our changes to peer (tickets updated since their last pull from us) // We use the same cursor — they'll filter by last-writer-wins let conn = state.db.lock().await; let our_tickets = db::tickets_since(&conn, &cursor)?; drop(conn); if !our_tickets.is_empty() { let push_url = format!("{peer_url}/sync/push"); let resp: serde_json::Value = client .post(&push_url) .json(&our_tickets) .send() .await? .json() .await?; let accepted = resp.get("accepted").and_then(|v| v.as_u64()).unwrap_or(0); if accepted > 0 { eprintln!("sync: pushed {accepted} ticket(s) to {peer_url}"); } } Ok(()) }