//! SyncKit push/pull, status, device management, and key management endpoints. use axum::{ extract::{Path, State}, response::IntoResponse, Json, }; use crate::{ constants, db::{self, SyncDeviceId}, error::{AppError, Result}, payments::{self, SyncBillingInterval}, synckit_auth::SyncUser, validation, AppState, }; use super::{ AppPricingRequest, AppPricingResponse, BeginRotationRequest, BeginRotationResponse, CompleteRotationErrorResponse, CompleteRotationRequest, GetKeyResponse, PendingKeyInfo, PullChangeEntry, PullRequest, PullResponse, PushRequest, PushResponse, PutKeyRequest, RegisterDeviceRequest, RotationBatchRequest, RotationBatchResponse, RotationEntriesRequest, RotationEntriesResponse, RotationEntry, SyncAccountResponse, SyncCapChangeRequest, SyncCheckoutResponse, SyncDeviceResponse, SyncQuoteRequest, SyncQuoteResponse, SyncStatusResponse, SyncSubscribeRequest, SyncSubscriptionStatusResponse, }; // ── Sync endpoints (JWT auth) ── /// Push encrypted changelog entries from a device. 
#[utoipa::path(post, path = "/api/v1/sync/push", tag = "SyncKit", request_body = PushRequest, responses((status = 200, description = "New cursor position", body = PushResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::sync_push", fields(app_id, user_id))] pub(super) async fn sync_push( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let app_id = sync_user.app_id; let user_id = sync_user.user_id; tracing::Span::current().record("app_id", tracing::field::display(&app_id)); tracing::Span::current().record("user_id", tracing::field::display(&user_id)); if req.changes.is_empty() { return Err(AppError::BadRequest("No changes provided".to_string())); } if req.changes.len() > constants::SYNCKIT_PUSH_MAX_CHANGES { return Err(AppError::BadRequest(format!( "Maximum {} changes per push", constants::SYNCKIT_PUSH_MAX_CHANGES ))); } // Validate all changes for change in &req.changes { validation::validate_sync_table_name(&change.table)?; validation::validate_sync_row_id(&change.row_id)?; if change.op == db::SyncOperation::Delete && change.data.is_some() { return Err(AppError::BadRequest( "DELETE operations should not include data".to_string(), )); } } // Verify device belongs to this user + app let devices = db::synckit::get_sync_devices(&state.db, app_id, user_id).await?; if !devices.iter().any(|d| d.id == req.device_id) { return Err(AppError::BadRequest("Unknown device".to_string())); } // Touch device db::synckit::touch_sync_device(&state.db, req.device_id).await?; // Build change tuples (op converted to string for TEXT column) let changes: Vec<_> = req .changes .iter() .map(|c| { ( c.table.clone(), c.op.to_string(), c.row_id.clone(), c.timestamp, c.data.clone(), ) }) .collect(); let cursor = db::synckit::push_sync_changes( &state.db, app_id, user_id, req.device_id, req.batch_id, &changes, ) .await?; // Notify SSE subscribers that new changes are available if let Some(sender) = state.sync_notify.get(&(app_id, 
user_id)) { let _ = sender.send(()); // Ignore errors (no subscribers = ok) } Ok(Json(PushResponse { cursor })) } /// Pull changelog entries after a given cursor. #[utoipa::path(post, path = "/api/v1/sync/pull", tag = "SyncKit", request_body = PullRequest, responses((status = 200, description = "Changes since cursor", body = PullResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::sync_pull", fields(app_id, user_id))] pub(super) async fn sync_pull( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let app_id = sync_user.app_id; let user_id = sync_user.user_id; tracing::Span::current().record("app_id", tracing::field::display(&app_id)); tracing::Span::current().record("user_id", tracing::field::display(&user_id)); // Verify device belongs to this user + app let devices = db::synckit::get_sync_devices(&state.db, app_id, user_id).await?; if !devices.iter().any(|d| d.id == req.device_id) { return Err(AppError::BadRequest("Unknown device".to_string())); } // Touch device db::synckit::touch_sync_device(&state.db, req.device_id).await?; // Validate table name filters if provided if let Some(ref tables) = req.tables { if tables.len() > 50 { return Err(AppError::BadRequest( "Maximum 50 table names per filter".to_string(), )); } for table in tables { validation::validate_sync_table_name(table)?; } } let page_size = constants::SYNCKIT_PULL_PAGE_SIZE; let entries = db::synckit::pull_sync_changes_filtered( &state.db, app_id, user_id, req.cursor, page_size, req.tables.as_deref(), req.since, ) .await?; let has_more = entries.len() as i64 == page_size; let new_cursor = entries.last().map(|e| e.seq).unwrap_or(req.cursor); // Track this device's cursor position for sync log compaction. // GREATEST in the query ensures we never regress the cursor. 
if new_cursor > req.cursor { db::synckit::update_device_cursor(&state.db, req.device_id, new_cursor).await?; } let changes: Vec = entries .into_iter() .map(|e| PullChangeEntry { seq: e.seq, device_id: e.device_id, table: e.table_name, op: e.operation.to_string(), row_id: e.row_id, timestamp: e.client_timestamp, data: e.data, key_id: e.key_id, }) .collect(); Ok(Json(PullResponse { changes, cursor: new_cursor, has_more, })) } /// Return sync metadata for the authenticated user and app. #[utoipa::path(get, path = "/api/v1/sync/status", tag = "SyncKit", responses((status = 200, description = "Sync status", body = SyncStatusResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::sync_status")] pub(super) async fn sync_status( State(state): State, sync_user: SyncUser, ) -> Result { let app_id = sync_user.app_id; let user_id = sync_user.user_id; let (total_changes, latest_cursor) = db::synckit::get_sync_status(&state.db, app_id, user_id).await?; Ok(Json(SyncStatusResponse { total_changes, latest_cursor, })) } /// Return the authenticated user's email and username, for the app to display /// "logged in as ..." in its sync UI. #[utoipa::path(get, path = "/api/v1/sync/account", tag = "SyncKit", responses((status = 200, description = "Account info", body = SyncAccountResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::sync_account")] pub(super) async fn sync_account( State(state): State, sync_user: SyncUser, ) -> Result { let user = db::users::get_user_by_id(&state.db, sync_user.user_id) .await? .ok_or(AppError::NotFound)?; Ok(Json(SyncAccountResponse { email: user.email.as_str().to_string(), username: user.username.as_str().to_string(), })) } /// Return the authenticated user's subscription status for this app. /// Returns `active: false` and `None` fields when the user has no subscription /// (rather than 404) so clients can render a "subscribe" CTA uniformly. 
#[utoipa::path(get, path = "/api/v1/sync/subscription", tag = "SyncKit", responses((status = 200, description = "Subscription status", body = SyncSubscriptionStatusResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::sync_subscription_status")] pub(super) async fn sync_subscription_status( State(state): State, sync_user: SyncUser, ) -> Result { let sub = db::synckit::get_user_app_subscription( &state.db, sync_user.user_id, sync_user.app_id, ) .await?; let response = match sub { Some(s) => SyncSubscriptionStatusResponse { active: s.status == "active", tier: Some(s.interval), status: Some(s.status), storage_limit_bytes: s.storage_limit_bytes, pending_storage_limit_bytes: s.pending_storage_limit_bytes, storage_used_bytes: None, current_period_end: s.current_period_end.map(|t| t.to_rfc3339()), }, None => SyncSubscriptionStatusResponse { active: false, tier: None, status: None, storage_limit_bytes: None, pending_storage_limit_bytes: None, storage_used_bytes: None, current_period_end: None, }, }; Ok(Json(response)) } /// Return the pricing-formula constants for an app. The client uses these to /// quote a price locally as the user adjusts the cap slider; the same formula /// is enforced server-side at checkout so the client number is only advisory. #[utoipa::path(post, path = "/api/v1/sync/app/pricing", tag = "SyncKit", request_body = AppPricingRequest, responses((status = 200, description = "Pricing formula", body = AppPricingResponse)), )] #[tracing::instrument(skip_all, name = "synckit::get_app_pricing")] pub(super) async fn get_app_pricing( State(state): State, Json(req): Json, ) -> Result { let app = db::synckit::get_sync_app_by_api_key(&state.db, &req.api_key) .await? 
.ok_or(AppError::NotFound)?; Ok(Json(AppPricingResponse { app_name: app.name, min_charge_cents: payments::MIN_CHARGE_CENTS, per_gb_tenths_of_cent_per_month: payments::synckit_app_pricing::PER_GB_TENTHS_OF_CENT_PER_MONTH, annual_multiplier: payments::ANNUAL_MULTIPLIER, min_cap_bytes: payments::MIN_CAP_BYTES, max_cap_bytes: payments::MAX_CAP_BYTES, })) } /// Quote the price for a (cap, interval) pair. Authenticated so clients /// cannot scrape pricing without an account, but otherwise pure: the result /// only depends on the formula constants returned by `app/pricing`. #[utoipa::path(post, path = "/api/v1/sync/subscription/quote", tag = "SyncKit", request_body = SyncQuoteRequest, responses((status = 200, description = "Quoted price", body = SyncQuoteResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::quote_subscription_price")] pub(super) async fn quote_subscription_price( State(_state): State, _sync_user: SyncUser, Json(req): Json, ) -> Result { let interval = SyncBillingInterval::parse(&req.interval)?; let price_cents = payments::quote_price_cents(req.cap_bytes, interval)?; Ok(Json(SyncQuoteResponse { cap_bytes: req.cap_bytes, interval: interval.as_str().to_string(), price_cents, })) } /// Create a Stripe Checkout Session for subscribing this user to the app's /// cloud sync at their chosen storage cap. The `app_sync_subscriptions` row /// is written by the Stripe webhook on `checkout.session.completed`. 
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when a subscription row already exists for
/// this user and app, when Stripe is not configured, or when Stripe returns no
/// checkout URL; [`AppError::NotFound`] when the app cannot be loaded. Interval
/// parsing, price quoting, database and Stripe errors are propagated unchanged.
#[utoipa::path(post, path = "/api/v1/sync/subscription/checkout", tag = "SyncKit",
    request_body = SyncSubscribeRequest,
    responses((status = 200, description = "Checkout URL", body = SyncCheckoutResponse)),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::create_subscription_checkout")]
pub(super) async fn create_subscription_checkout(
    State(state): State,
    sync_user: SyncUser,
    Json(req): Json,
) -> Result {
    // Any existing subscription row blocks a new checkout.
    // NOTE(review): the row's status is not inspected here — confirm that
    // `get_user_app_subscription` only returns subscriptions that should
    // prevent re-subscribing (e.g. not long-cancelled ones).
    if db::synckit::get_user_app_subscription(&state.db, sync_user.user_id, sync_user.app_id)
        .await?
        .is_some()
    {
        return Err(AppError::BadRequest(
            "Already subscribed; use the storage-cap endpoint to adjust your cap".to_string(),
        ));
    }

    // The amount charged is always computed server-side from the requested cap
    // and interval; nothing price-related is taken from the client.
    let interval = SyncBillingInterval::parse(&req.interval)?;
    let amount_cents = payments::quote_price_cents(req.cap_bytes, interval)?;

    let app = db::synckit::get_sync_app_by_id(&state.db, sync_user.app_id)
        .await?
        .ok_or(AppError::NotFound)?;

    // Integer division: the display name shows whole GiB, truncating any
    // remainder.
    let cap_gib = req.cap_bytes / (1024 * 1024 * 1024);
    let product_name = format!("{} cloud sync — {} GiB", app.name, cap_gib);

    let stripe = state
        .stripe
        .as_ref()
        .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;

    // Redirect targets are built from the configured host URL.
    let success_url = format!("{}/sync/subscribed", state.config.host_url);
    let cancel_url = format!("{}/sync/canceled", state.config.host_url);

    let result = stripe
        .create_synckit_app_sub_checkout_session(&crate::payments::SynckitAppSubCheckoutParams {
            product_name: &product_name,
            amount_cents,
            interval: interval.as_str(),
            user_id: sync_user.user_id,
            app_id: sync_user.app_id,
            // NOTE: `tier` carries the interval string here, matching the prior
            // positional call. Looks suspect (tier vs interval) but preserved
            // verbatim by this refactor — review separately, don't silently change.
            tier: interval.as_str(),
            storage_limit_bytes: Some(req.cap_bytes),
            success_url: &success_url,
            cancel_url: &cancel_url,
        })
        .await?;

    // A session without a URL cannot be handed to the client; report it as an
    // error rather than returning an empty response.
    let checkout_url = result
        .url
        .ok_or_else(|| AppError::BadRequest("No checkout URL returned".to_string()))?;

    Ok(Json(SyncCheckoutResponse { checkout_url }))
}

/// Queue a storage-cap change to take effect at the next billing cycle.
/// Holding the change for the cycle boundary avoids mid-cycle proration
/// surprises and keeps the user in control of when their bill changes.
///
/// Updates Stripe first (re-price the subscription item with
/// `proration_behavior=None`), then records the pending cap in the DB. The
/// renewal webhook promotes the pending cap to active when Stripe rolls the
/// period — and because the Stripe price is already updated, the new invoice
/// is at the new price.
///
/// The response echoes the stored subscription with
/// `pending_storage_limit_bytes` set to the requested cap; `storage_used_bytes`
/// is always `None`.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when the user has no subscription row, when
/// the cap is outside `payments::MIN_CAP_BYTES..=payments::MAX_CAP_BYTES`, or
/// when Stripe is not configured; [`AppError::NotFound`] when the app cannot be
/// loaded. Interval parsing, price quoting, Stripe and database errors are
/// propagated unchanged.
#[utoipa::path(post, path = "/api/v1/sync/subscription/storage-cap", tag = "SyncKit",
    request_body = SyncCapChangeRequest,
    responses((status = 200, description = "Pending cap recorded", body = SyncSubscriptionStatusResponse)),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::queue_storage_cap_change")]
pub(super) async fn queue_storage_cap_change(
    State(state): State,
    sync_user: SyncUser,
    Json(req): Json,
) -> Result {
    // NOTE(review): the error text says "active", but only the existence of a
    // row is checked here, not `sub.status` — confirm that is intended.
    let sub =
        db::synckit::get_user_app_subscription(&state.db, sync_user.user_id, sync_user.app_id)
            .await?
            .ok_or_else(|| AppError::BadRequest("No active subscription to adjust".to_string()))?;

    // Bounds are compared in bytes; the message reports them in whole GiB.
    if req.cap_bytes < payments::MIN_CAP_BYTES || req.cap_bytes > payments::MAX_CAP_BYTES {
        return Err(AppError::BadRequest(format!(
            "Storage cap must be between {} and {} GiB",
            payments::MIN_CAP_BYTES / (1024 * 1024 * 1024),
            payments::MAX_CAP_BYTES / (1024 * 1024 * 1024)
        )));
    }

    // The billing interval comes from the stored subscription, not the request:
    // this endpoint changes only the cap.
    let interval = payments::SyncBillingInterval::parse(&sub.interval)?;
    let new_price_cents = payments::quote_price_cents(req.cap_bytes, interval)?;

    let stripe = state
        .stripe
        .as_ref()
        .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;

    let app = db::synckit::get_sync_app_by_id(&state.db, sync_user.app_id)
        .await?
        .ok_or(AppError::NotFound)?;
    // Integer division: the display name shows whole GiB.
    let cap_gib = req.cap_bytes / (1024 * 1024 * 1024);
    let product_name = format!("{} cloud sync — {} GiB", app.name, cap_gib);

    // Stripe first: if this fails, we want the DB pending cap to stay
    // unchanged so the user isn't sitting on an upgrade they never paid for.
    stripe
        .update_synckit_app_sub_price(
            &sub.stripe_subscription_id,
            new_price_cents,
            interval,
            &product_name,
        )
        .await?;

    // NOTE(review): if this write fails after the Stripe update above
    // succeeded, the handler returns an error while Stripe already carries the
    // new price — confirm how that mismatch is reconciled.
    db::synckit::set_pending_storage_cap(
        &state.db,
        sync_user.user_id,
        sync_user.app_id,
        req.cap_bytes,
    )
    .await?;

    // Built from the row fetched at the top plus the newly requested cap; the
    // subscription is not re-read after the write.
    Ok(Json(SyncSubscriptionStatusResponse {
        active: sub.status == "active",
        tier: Some(sub.interval),
        status: Some(sub.status),
        storage_limit_bytes: sub.storage_limit_bytes,
        pending_storage_limit_bytes: Some(req.cap_bytes),
        storage_used_bytes: None,
        current_period_end: sub.current_period_end.map(|t| t.to_rfc3339()),
    }))
}

// ── Device endpoints (JWT auth) ──

/// Register a new sync device (or update an existing one by name).
#[utoipa::path(post, path = "/api/v1/sync/devices", tag = "SyncKit", request_body = RegisterDeviceRequest, responses((status = 200, description = "Registered device", body = SyncDeviceResponse)), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::register_device")] pub(super) async fn register_device( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { validation::validate_sync_device_name(&req.device_name)?; // Enforce device limit (upsert on existing name is fine, only new names count) let count = db::synckit::count_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?; if count >= constants::SYNCKIT_MAX_DEVICES_PER_APP { // Check if this is an existing device (upsert) — allow updates let existing = db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?; if !existing.iter().any(|d| d.device_name == req.device_name) { return Err(AppError::BadRequest(format!( "Maximum {} devices per app", constants::SYNCKIT_MAX_DEVICES_PER_APP ))); } } let device = db::synckit::upsert_sync_device( &state.db, sync_user.app_id, sync_user.user_id, &req.device_name, req.platform, ) .await?; Ok(Json(SyncDeviceResponse { id: device.id, app_id: device.app_id, user_id: device.user_id, device_name: device.device_name, platform: device.platform.to_string(), last_seen_at: device.last_seen_at, created_at: device.created_at, })) } /// List all devices registered for the authenticated user and app. 
///
/// Each stored device is converted to a `SyncDeviceResponse`, with the
/// platform rendered as a string. Database errors are propagated unchanged.
#[utoipa::path(get, path = "/api/v1/sync/devices", tag = "SyncKit",
    responses((status = 200, description = "List of devices", body = Vec)),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::list_devices")]
pub(super) async fn list_devices(
    State(state): State,
    sync_user: SyncUser,
) -> Result {
    let stored =
        db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;

    let mut listing = Vec::with_capacity(stored.len());
    for device in stored {
        listing.push(SyncDeviceResponse {
            id: device.id,
            app_id: device.app_id,
            user_id: device.user_id,
            device_name: device.device_name,
            platform: device.platform.to_string(),
            last_seen_at: device.last_seen_at,
            created_at: device.created_at,
        });
    }

    Ok(Json(listing))
}

/// Delete one of the caller's registered devices.
///
/// The delete is scoped to the authenticated user and app as well as the
/// device id from the path.
///
/// # Errors
///
/// Returns [`AppError::NotFound`] when the database reports that nothing was
/// deleted.
#[utoipa::path(delete, path = "/api/v1/sync/devices/{id}", tag = "SyncKit",
    params(("id" = String, Path, description = "Device ID")),
    responses((status = 204, description = "Device deleted"), (status = 404, description = "Device not found")),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::delete_device")]
pub(super) async fn delete_device(
    State(state): State,
    sync_user: SyncUser,
    Path(device_id): Path,
) -> Result {
    let removed = db::synckit::delete_sync_device(
        &state.db,
        device_id,
        sync_user.app_id,
        sync_user.user_id,
    )
    .await?;

    if removed {
        Ok(axum::http::StatusCode::NO_CONTENT)
    } else {
        Err(AppError::NotFound)
    }
}

// ── Key management endpoints (JWT auth) ──

/// Store or update the user's encrypted master key envelope.
#[utoipa::path(put, path = "/api/v1/sync/keys", tag = "SyncKit", request_body = PutKeyRequest, responses((status = 204, description = "Key stored"), (status = 409, description = "Version mismatch")), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::put_sync_key")] pub(super) async fn put_sync_key( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { // Max 4 KB for the encrypted key envelope if req.encrypted_key.len() > constants::SYNCKIT_MAX_KEY_ENVELOPE_BYTES { return Err(AppError::BadRequest( "Encrypted key exceeds 4KB limit".to_string(), )); } let updated = db::synckit::upsert_sync_key( &state.db, sync_user.app_id, sync_user.user_id, &req.encrypted_key, req.expected_version, ) .await?; if !updated { return Err(AppError::Conflict( "Key version mismatch — another device changed the password. Fetch the latest key and retry.".to_string(), )); } Ok(axum::http::StatusCode::NO_CONTENT) } /// Retrieve the user's encrypted master key envelope. #[utoipa::path(get, path = "/api/v1/sync/keys", tag = "SyncKit", responses((status = 200, description = "Encrypted key envelope", body = GetKeyResponse), (status = 404, description = "No key stored")), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::get_sync_key")] pub(super) async fn get_sync_key( State(state): State, sync_user: SyncUser, ) -> Result { let info = db::synckit::get_sync_key(&state.db, sync_user.app_id, sync_user.user_id) .await? .ok_or(AppError::NotFound)?; let pending_key = info.pending_key.map(|(encrypted_key, key_id)| { PendingKeyInfo { encrypted_key, key_id } }); Ok(Json(GetKeyResponse { encrypted_key: info.encrypted_key, key_version: info.key_version, key_id: info.key_id, pending_key, })) } // ── Key rotation endpoints ── /// Begin a key rotation. 
///
/// The new envelope may be at most `constants::SYNCKIT_MAX_KEY_ENVELOPE_BYTES`
/// long and the initiating device must be registered for this user and app.
/// On success the response carries the rotation id, its target sequence number
/// and the new key id.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] for an oversized envelope or an unknown
/// device, and [`AppError::Conflict`] with the database layer's message when
/// the rotation cannot be started.
#[utoipa::path(post, path = "/api/v1/sync/keys/rotate", tag = "SyncKit",
    request_body = BeginRotationRequest,
    responses(
        (status = 200, description = "Rotation started or resumed", body = BeginRotationResponse),
        (status = 409, description = "Version mismatch or rotation in progress"),
    ),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::begin_rotation")]
pub(super) async fn begin_rotation(
    State(state): State,
    sync_user: SyncUser,
    Json(req): Json,
) -> Result {
    let envelope_len = req.new_encrypted_key.len();
    if envelope_len > constants::SYNCKIT_MAX_KEY_ENVELOPE_BYTES {
        return Err(AppError::BadRequest(
            "Encrypted key exceeds 4KB limit".to_string(),
        ));
    }

    // The rotating device has to be one of the caller's own devices.
    let registered =
        db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;
    let owns_device = registered.iter().any(|device| device.id == req.device_id);
    if !owns_device {
        return Err(AppError::BadRequest("Unknown device".to_string()));
    }

    // The outer `?` handles database failures; the inner result says whether
    // the rotation could actually be started.
    let outcome = db::synckit::begin_key_rotation(
        &state.db,
        sync_user.app_id,
        sync_user.user_id,
        req.device_id,
        &req.new_encrypted_key,
        req.expected_key_version,
    )
    .await?;

    let rotation = match outcome {
        Ok(rotation) => rotation,
        Err(reason) => return Err(AppError::Conflict(reason.to_string())),
    };

    let body = BeginRotationResponse {
        rotation_id: rotation.id,
        target_seq: rotation.target_seq,
        new_key_id: rotation.new_key_id,
    };

    Ok(Json(body).into_response())
}

/// Fetch the next page of entries that still need re-encryption.
///
/// The caller must name the rotation currently stored for this user and app.
/// Up to `constants::SYNCKIT_ROTATION_BATCH_MAX` entries after `after_seq` are
/// requested; `has_more` is set when the page comes back full.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when there is no rotation for this user and
/// app or the supplied rotation id does not match it.
#[utoipa::path(post, path = "/api/v1/sync/keys/rotate/entries", tag = "SyncKit",
    request_body = RotationEntriesRequest,
    responses((status = 200, description = "Entries needing re-encryption", body = RotationEntriesResponse)),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::rotation_entries")]
pub(super) async fn rotation_entries(
    State(state): State,
    sync_user: SyncUser,
    Json(req): Json,
) -> Result {
    let active =
        db::synckit::get_key_rotation(&state.db, sync_user.app_id, sync_user.user_id).await?;
    let rotation = match active {
        Some(rotation) => rotation,
        None => return Err(AppError::BadRequest("No active rotation".to_string())),
    };

    if rotation.id != req.rotation_id {
        return Err(AppError::BadRequest("Rotation ID mismatch".to_string()));
    }

    let page_size = constants::SYNCKIT_ROTATION_BATCH_MAX as i64;
    let page = db::synckit::get_rotation_entries(
        &state.db,
        sync_user.app_id,
        sync_user.user_id,
        rotation.new_key_id,
        req.after_seq,
        page_size,
    )
    .await?;

    // A full page means there may be more entries behind it.
    let has_more = page.len() as i64 == page_size;

    let mut entries = Vec::with_capacity(page.len());
    for (seq, data) in page {
        entries.push(RotationEntry { seq, data });
    }

    Ok(Json(RotationEntriesResponse { entries, has_more }))
}

/// Store a batch of re-encrypted entries for the current rotation.
///
/// The batch must be non-empty, hold at most
/// `constants::SYNCKIT_ROTATION_BATCH_MAX` entries, and name the rotation
/// currently stored for this user and app. The response reports how many
/// entries the database layer updated.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] for an empty or oversized batch, when no
/// rotation exists, or when the rotation id does not match.
#[utoipa::path(post, path = "/api/v1/sync/keys/rotate/batch", tag = "SyncKit",
    request_body = RotationBatchRequest,
    responses((status = 200, description = "Batch processed", body = RotationBatchResponse)),
    security(("bearer" = [])),
)]
#[tracing::instrument(skip_all, name = "synckit::rotation_batch")]
pub(super) async fn rotation_batch(
    State(state): State,
    sync_user: SyncUser,
    Json(req): Json,
) -> Result {
    let submitted = req.entries.len();
    if submitted == 0 {
        return Err(AppError::BadRequest("No entries provided".to_string()));
    }
    if submitted > constants::SYNCKIT_ROTATION_BATCH_MAX {
        return Err(AppError::BadRequest(format!(
            "Maximum {} entries per batch",
            constants::SYNCKIT_ROTATION_BATCH_MAX
        )));
    }

    let active =
        db::synckit::get_key_rotation(&state.db, sync_user.app_id, sync_user.user_id).await?;
    let rotation = match active {
        Some(rotation) => rotation,
        None => return Err(AppError::BadRequest("No active rotation".to_string())),
    };

    if rotation.id != req.rotation_id {
        return Err(AppError::BadRequest("Rotation ID mismatch".to_string()));
    }

    // Flatten the request entries into `(seq, data)` pairs for the DB layer.
    let pairs: Vec<_> = req
        .entries
        .into_iter()
        .map(|entry| (entry.seq, entry.data))
        .collect();

    let updated_count = db::synckit::submit_rotation_batch(
        &state.db,
        sync_user.app_id,
        sync_user.user_id,
        rotation.id,
        rotation.new_key_id,
        &pairs,
    )
    .await?;

    Ok(Json(RotationBatchResponse { updated_count }))
}

/// Complete a key rotation.
#[utoipa::path(post, path = "/api/v1/sync/keys/rotate/complete", tag = "SyncKit", request_body = CompleteRotationRequest, responses( (status = 204, description = "Rotation completed"), (status = 409, description = "Entries still need re-encryption"), ), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::complete_rotation")] pub(super) async fn complete_rotation( State(state): State, sync_user: SyncUser, Json(req): Json, ) -> Result { let result = db::synckit::complete_key_rotation( &state.db, sync_user.app_id, sync_user.user_id, req.rotation_id, ) .await?; match result { Ok(_new_key_id) => Ok(axum::http::StatusCode::NO_CONTENT.into_response()), Err(0) => Err(AppError::BadRequest("No active rotation".to_string())), Err(remaining) => Ok(( axum::http::StatusCode::CONFLICT, Json(CompleteRotationErrorResponse { remaining }), ).into_response()), } } /// Cancel a stale rotation (>24h without activity). #[utoipa::path(delete, path = "/api/v1/sync/keys/rotate", tag = "SyncKit", responses( (status = 204, description = "Stale rotation cancelled"), (status = 404, description = "No stale rotation found"), ), security(("bearer" = [])), )] #[tracing::instrument(skip_all, name = "synckit::cancel_rotation")] pub(super) async fn cancel_rotation( State(state): State, sync_user: SyncUser, ) -> Result { let cancelled = db::synckit::cancel_stale_rotation( &state.db, sync_user.app_id, sync_user.user_id, constants::SYNCKIT_ROTATION_STALE_HOURS, ) .await?; if !cancelled { return Err(AppError::NotFound); } Ok(axum::http::StatusCode::NO_CONTENT) }