Skip to main content

max / makenotwork

29.2 KB · 836 lines History Blame Raw
1 //! SyncKit push/pull, status, device management, and key management endpoints.
2
3 use axum::{
4 extract::{Path, State},
5 response::IntoResponse,
6 Json,
7 };
8
9 use crate::{
10 constants,
11 db::{self, SyncDeviceId},
12 error::{AppError, Result},
13 payments::{self, SyncBillingInterval},
14 synckit_auth::SyncUser,
15 validation,
16 AppState,
17 };
18
19 use super::{
20 AppPricingRequest, AppPricingResponse, BeginRotationRequest, BeginRotationResponse,
21 CompleteRotationErrorResponse, CompleteRotationRequest, GetKeyResponse, PendingKeyInfo,
22 PullChangeEntry, PullRequest, PullResponse, PushRequest, PushResponse, PutKeyRequest,
23 RegisterDeviceRequest, RotationBatchRequest, RotationBatchResponse, RotationEntriesRequest,
24 RotationEntriesResponse, RotationEntry, SyncAccountResponse, SyncCapChangeRequest,
25 SyncCheckoutResponse, SyncDeviceResponse, SyncQuoteRequest, SyncQuoteResponse,
26 SyncStatusResponse, SyncSubscribeRequest, SyncSubscriptionStatusResponse,
27 };
28
29 // ── Sync endpoints (JWT auth) ──
30
31 /// Push encrypted changelog entries from a device.
32 #[utoipa::path(post, path = "/api/v1/sync/push", tag = "SyncKit",
33 request_body = PushRequest,
34 responses((status = 200, description = "New cursor position", body = PushResponse)),
35 security(("bearer" = [])),
36 )]
37 #[tracing::instrument(skip_all, name = "synckit::sync_push", fields(app_id, user_id))]
38 pub(super) async fn sync_push(
39 State(state): State<AppState>,
40 sync_user: SyncUser,
41 Json(req): Json<PushRequest>,
42 ) -> Result<impl IntoResponse> {
43 let app_id = sync_user.app_id;
44 let user_id = sync_user.user_id;
45 tracing::Span::current().record("app_id", tracing::field::display(&app_id));
46 tracing::Span::current().record("user_id", tracing::field::display(&user_id));
47
48 if req.changes.is_empty() {
49 return Err(AppError::BadRequest("No changes provided".to_string()));
50 }
51 if req.changes.len() > constants::SYNCKIT_PUSH_MAX_CHANGES {
52 return Err(AppError::BadRequest(format!(
53 "Maximum {} changes per push",
54 constants::SYNCKIT_PUSH_MAX_CHANGES
55 )));
56 }
57
58 // Validate all changes
59 for change in &req.changes {
60 validation::validate_sync_table_name(&change.table)?;
61 validation::validate_sync_row_id(&change.row_id)?;
62 if change.op == db::SyncOperation::Delete && change.data.is_some() {
63 return Err(AppError::BadRequest(
64 "DELETE operations should not include data".to_string(),
65 ));
66 }
67 }
68
69 // Verify device belongs to this user + app
70 let devices = db::synckit::get_sync_devices(&state.db, app_id, user_id).await?;
71 if !devices.iter().any(|d| d.id == req.device_id) {
72 return Err(AppError::BadRequest("Unknown device".to_string()));
73 }
74
75 // Touch device
76 db::synckit::touch_sync_device(&state.db, req.device_id).await?;
77
78 // Build change tuples (op converted to string for TEXT column)
79 let changes: Vec<_> = req
80 .changes
81 .iter()
82 .map(|c| {
83 (
84 c.table.clone(),
85 c.op.to_string(),
86 c.row_id.clone(),
87 c.timestamp,
88 c.data.clone(),
89 )
90 })
91 .collect();
92
93 let cursor = db::synckit::push_sync_changes(
94 &state.db,
95 app_id,
96 user_id,
97 req.device_id,
98 req.batch_id,
99 &changes,
100 )
101 .await?;
102
103 // Notify SSE subscribers that new changes are available
104 if let Some(sender) = state.sync_notify.get(&(app_id, user_id)) {
105 let _ = sender.send(()); // Ignore errors (no subscribers = ok)
106 }
107
108 Ok(Json(PushResponse { cursor }))
109 }
110
111 /// Pull changelog entries after a given cursor.
112 #[utoipa::path(post, path = "/api/v1/sync/pull", tag = "SyncKit",
113 request_body = PullRequest,
114 responses((status = 200, description = "Changes since cursor", body = PullResponse)),
115 security(("bearer" = [])),
116 )]
117 #[tracing::instrument(skip_all, name = "synckit::sync_pull", fields(app_id, user_id))]
118 pub(super) async fn sync_pull(
119 State(state): State<AppState>,
120 sync_user: SyncUser,
121 Json(req): Json<PullRequest>,
122 ) -> Result<impl IntoResponse> {
123 let app_id = sync_user.app_id;
124 let user_id = sync_user.user_id;
125 tracing::Span::current().record("app_id", tracing::field::display(&app_id));
126 tracing::Span::current().record("user_id", tracing::field::display(&user_id));
127
128 // Verify device belongs to this user + app
129 let devices = db::synckit::get_sync_devices(&state.db, app_id, user_id).await?;
130 if !devices.iter().any(|d| d.id == req.device_id) {
131 return Err(AppError::BadRequest("Unknown device".to_string()));
132 }
133
134 // Touch device
135 db::synckit::touch_sync_device(&state.db, req.device_id).await?;
136
137 // Validate table name filters if provided
138 if let Some(ref tables) = req.tables {
139 if tables.len() > 50 {
140 return Err(AppError::BadRequest(
141 "Maximum 50 table names per filter".to_string(),
142 ));
143 }
144 for table in tables {
145 validation::validate_sync_table_name(table)?;
146 }
147 }
148
149 let page_size = constants::SYNCKIT_PULL_PAGE_SIZE;
150 let entries = db::synckit::pull_sync_changes_filtered(
151 &state.db,
152 app_id,
153 user_id,
154 req.cursor,
155 page_size,
156 req.tables.as_deref(),
157 req.since,
158 )
159 .await?;
160
161 let has_more = entries.len() as i64 == page_size;
162 let new_cursor = entries.last().map(|e| e.seq).unwrap_or(req.cursor);
163
164 // Track this device's cursor position for sync log compaction.
165 // GREATEST in the query ensures we never regress the cursor.
166 if new_cursor > req.cursor {
167 db::synckit::update_device_cursor(&state.db, req.device_id, new_cursor).await?;
168 }
169
170 let changes: Vec<PullChangeEntry> = entries
171 .into_iter()
172 .map(|e| PullChangeEntry {
173 seq: e.seq,
174 device_id: e.device_id,
175 table: e.table_name,
176 op: e.operation.to_string(),
177 row_id: e.row_id,
178 timestamp: e.client_timestamp,
179 data: e.data,
180 key_id: e.key_id,
181 })
182 .collect();
183
184 Ok(Json(PullResponse {
185 changes,
186 cursor: new_cursor,
187 has_more,
188 }))
189 }
190
191 /// Return sync metadata for the authenticated user and app.
192 #[utoipa::path(get, path = "/api/v1/sync/status", tag = "SyncKit",
193 responses((status = 200, description = "Sync status", body = SyncStatusResponse)),
194 security(("bearer" = [])),
195 )]
196 #[tracing::instrument(skip_all, name = "synckit::sync_status")]
197 pub(super) async fn sync_status(
198 State(state): State<AppState>,
199 sync_user: SyncUser,
200 ) -> Result<impl IntoResponse> {
201 let app_id = sync_user.app_id;
202 let user_id = sync_user.user_id;
203
204 let (total_changes, latest_cursor) =
205 db::synckit::get_sync_status(&state.db, app_id, user_id).await?;
206
207 Ok(Json(SyncStatusResponse {
208 total_changes,
209 latest_cursor,
210 }))
211 }
212
213 /// Return the authenticated user's email and username, for the app to display
214 /// "logged in as ..." in its sync UI.
215 #[utoipa::path(get, path = "/api/v1/sync/account", tag = "SyncKit",
216 responses((status = 200, description = "Account info", body = SyncAccountResponse)),
217 security(("bearer" = [])),
218 )]
219 #[tracing::instrument(skip_all, name = "synckit::sync_account")]
220 pub(super) async fn sync_account(
221 State(state): State<AppState>,
222 sync_user: SyncUser,
223 ) -> Result<impl IntoResponse> {
224 let user = db::users::get_user_by_id(&state.db, sync_user.user_id)
225 .await?
226 .ok_or(AppError::NotFound)?;
227
228 Ok(Json(SyncAccountResponse {
229 email: user.email.as_str().to_string(),
230 username: user.username.as_str().to_string(),
231 }))
232 }
233
234 /// Return the authenticated user's subscription status for this app.
235 /// Returns `active: false` and `None` fields when the user has no subscription
236 /// (rather than 404) so clients can render a "subscribe" CTA uniformly.
237 #[utoipa::path(get, path = "/api/v1/sync/subscription", tag = "SyncKit",
238 responses((status = 200, description = "Subscription status", body = SyncSubscriptionStatusResponse)),
239 security(("bearer" = [])),
240 )]
241 #[tracing::instrument(skip_all, name = "synckit::sync_subscription_status")]
242 pub(super) async fn sync_subscription_status(
243 State(state): State<AppState>,
244 sync_user: SyncUser,
245 ) -> Result<impl IntoResponse> {
246 let sub = db::synckit::get_user_app_subscription(
247 &state.db,
248 sync_user.user_id,
249 sync_user.app_id,
250 )
251 .await?;
252
253 let response = match sub {
254 Some(s) => SyncSubscriptionStatusResponse {
255 active: s.status == "active",
256 tier: Some(s.interval),
257 status: Some(s.status),
258 storage_limit_bytes: s.storage_limit_bytes,
259 pending_storage_limit_bytes: s.pending_storage_limit_bytes,
260 storage_used_bytes: None,
261 current_period_end: s.current_period_end.map(|t| t.to_rfc3339()),
262 },
263 None => SyncSubscriptionStatusResponse {
264 active: false,
265 tier: None,
266 status: None,
267 storage_limit_bytes: None,
268 pending_storage_limit_bytes: None,
269 storage_used_bytes: None,
270 current_period_end: None,
271 },
272 };
273
274 Ok(Json(response))
275 }
276
277 /// Return the pricing-formula constants for an app. The client uses these to
278 /// quote a price locally as the user adjusts the cap slider; the same formula
279 /// is enforced server-side at checkout so the client number is only advisory.
280 #[utoipa::path(post, path = "/api/v1/sync/app/pricing", tag = "SyncKit",
281 request_body = AppPricingRequest,
282 responses((status = 200, description = "Pricing formula", body = AppPricingResponse)),
283 )]
284 #[tracing::instrument(skip_all, name = "synckit::get_app_pricing")]
285 pub(super) async fn get_app_pricing(
286 State(state): State<AppState>,
287 Json(req): Json<AppPricingRequest>,
288 ) -> Result<impl IntoResponse> {
289 let app = db::synckit::get_sync_app_by_api_key(&state.db, &req.api_key)
290 .await?
291 .ok_or(AppError::NotFound)?;
292
293 Ok(Json(AppPricingResponse {
294 app_name: app.name,
295 min_charge_cents: payments::MIN_CHARGE_CENTS,
296 per_gb_tenths_of_cent_per_month: payments::synckit_app_pricing::PER_GB_TENTHS_OF_CENT_PER_MONTH,
297 annual_multiplier: payments::ANNUAL_MULTIPLIER,
298 min_cap_bytes: payments::MIN_CAP_BYTES,
299 max_cap_bytes: payments::MAX_CAP_BYTES,
300 }))
301 }
302
303 /// Quote the price for a (cap, interval) pair. Authenticated so clients
304 /// cannot scrape pricing without an account, but otherwise pure: the result
305 /// only depends on the formula constants returned by `app/pricing`.
306 #[utoipa::path(post, path = "/api/v1/sync/subscription/quote", tag = "SyncKit",
307 request_body = SyncQuoteRequest,
308 responses((status = 200, description = "Quoted price", body = SyncQuoteResponse)),
309 security(("bearer" = [])),
310 )]
311 #[tracing::instrument(skip_all, name = "synckit::quote_subscription_price")]
312 pub(super) async fn quote_subscription_price(
313 State(_state): State<AppState>,
314 _sync_user: SyncUser,
315 Json(req): Json<SyncQuoteRequest>,
316 ) -> Result<impl IntoResponse> {
317 let interval = SyncBillingInterval::parse(&req.interval)?;
318 let price_cents = payments::quote_price_cents(req.cap_bytes, interval)?;
319 Ok(Json(SyncQuoteResponse {
320 cap_bytes: req.cap_bytes,
321 interval: interval.as_str().to_string(),
322 price_cents,
323 }))
324 }
325
326 /// Create a Stripe Checkout Session for subscribing this user to the app's
327 /// cloud sync at their chosen storage cap. The `app_sync_subscriptions` row
328 /// is written by the Stripe webhook on `checkout.session.completed`.
329 #[utoipa::path(post, path = "/api/v1/sync/subscription/checkout", tag = "SyncKit",
330 request_body = SyncSubscribeRequest,
331 responses((status = 200, description = "Checkout URL", body = SyncCheckoutResponse)),
332 security(("bearer" = [])),
333 )]
334 #[tracing::instrument(skip_all, name = "synckit::create_subscription_checkout")]
335 pub(super) async fn create_subscription_checkout(
336 State(state): State<AppState>,
337 sync_user: SyncUser,
338 Json(req): Json<SyncSubscribeRequest>,
339 ) -> Result<impl IntoResponse> {
340 if db::synckit::get_user_app_subscription(&state.db, sync_user.user_id, sync_user.app_id)
341 .await?
342 .is_some()
343 {
344 return Err(AppError::BadRequest(
345 "Already subscribed; use the storage-cap endpoint to adjust your cap".to_string(),
346 ));
347 }
348
349 let interval = SyncBillingInterval::parse(&req.interval)?;
350 let amount_cents = payments::quote_price_cents(req.cap_bytes, interval)?;
351
352 let app = db::synckit::get_sync_app_by_id(&state.db, sync_user.app_id)
353 .await?
354 .ok_or(AppError::NotFound)?;
355 let cap_gib = req.cap_bytes / (1024 * 1024 * 1024);
356 let product_name = format!("{} cloud sync — {} GiB", app.name, cap_gib);
357
358 let stripe = state
359 .stripe
360 .as_ref()
361 .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;
362
363 let success_url = format!("{}/sync/subscribed", state.config.host_url);
364 let cancel_url = format!("{}/sync/canceled", state.config.host_url);
365
366 let result = stripe
367 .create_synckit_app_sub_checkout_session(&crate::payments::SynckitAppSubCheckoutParams {
368 product_name: &product_name,
369 amount_cents,
370 interval: interval.as_str(),
371 user_id: sync_user.user_id,
372 app_id: sync_user.app_id,
373 // NOTE: `tier` carries the interval string here, matching the prior
374 // positional call. Looks suspect (tier vs interval) but preserved
375 // verbatim by this refactor — review separately, don't silently change.
376 tier: interval.as_str(),
377 storage_limit_bytes: Some(req.cap_bytes),
378 success_url: &success_url,
379 cancel_url: &cancel_url,
380 })
381 .await?;
382
383 let checkout_url = result
384 .url
385 .ok_or_else(|| AppError::BadRequest("No checkout URL returned".to_string()))?;
386
387 Ok(Json(SyncCheckoutResponse { checkout_url }))
388 }
389
390 /// Queue a storage-cap change to take effect at the next billing cycle.
391 /// Holding the change for the cycle boundary avoids mid-cycle proration
392 /// surprises and keeps the user in control of when their bill changes.
393 ///
394 /// Updates Stripe first (re-price the subscription item with
395 /// `proration_behavior=None`), then records the pending cap in the DB. The
396 /// renewal webhook promotes the pending cap to active when Stripe rolls the
397 /// period — and because the Stripe price is already updated, the new invoice
398 /// is at the new price.
399 #[utoipa::path(post, path = "/api/v1/sync/subscription/storage-cap", tag = "SyncKit",
400 request_body = SyncCapChangeRequest,
401 responses((status = 200, description = "Pending cap recorded", body = SyncSubscriptionStatusResponse)),
402 security(("bearer" = [])),
403 )]
404 #[tracing::instrument(skip_all, name = "synckit::queue_storage_cap_change")]
405 pub(super) async fn queue_storage_cap_change(
406 State(state): State<AppState>,
407 sync_user: SyncUser,
408 Json(req): Json<SyncCapChangeRequest>,
409 ) -> Result<impl IntoResponse> {
410 let sub = db::synckit::get_user_app_subscription(&state.db, sync_user.user_id, sync_user.app_id)
411 .await?
412 .ok_or_else(|| AppError::BadRequest("No active subscription to adjust".to_string()))?;
413
414 if req.cap_bytes < payments::MIN_CAP_BYTES || req.cap_bytes > payments::MAX_CAP_BYTES {
415 return Err(AppError::BadRequest(format!(
416 "Storage cap must be between {} and {} GiB",
417 payments::MIN_CAP_BYTES / (1024 * 1024 * 1024),
418 payments::MAX_CAP_BYTES / (1024 * 1024 * 1024)
419 )));
420 }
421
422 let interval = payments::SyncBillingInterval::parse(&sub.interval)?;
423 let new_price_cents = payments::quote_price_cents(req.cap_bytes, interval)?;
424
425 let stripe = state
426 .stripe
427 .as_ref()
428 .ok_or_else(|| AppError::BadRequest("Stripe is not configured".to_string()))?;
429
430 let app = db::synckit::get_sync_app_by_id(&state.db, sync_user.app_id)
431 .await?
432 .ok_or(AppError::NotFound)?;
433 let cap_gib = req.cap_bytes / (1024 * 1024 * 1024);
434 let product_name = format!("{} cloud sync — {} GiB", app.name, cap_gib);
435
436 // Stripe first: if this fails, we want the DB pending cap to stay
437 // unchanged so the user isn't sitting on an upgrade they never paid for.
438 stripe
439 .update_synckit_app_sub_price(
440 &sub.stripe_subscription_id,
441 new_price_cents,
442 interval,
443 &product_name,
444 )
445 .await?;
446
447 db::synckit::set_pending_storage_cap(
448 &state.db,
449 sync_user.user_id,
450 sync_user.app_id,
451 req.cap_bytes,
452 )
453 .await?;
454
455 Ok(Json(SyncSubscriptionStatusResponse {
456 active: sub.status == "active",
457 tier: Some(sub.interval),
458 status: Some(sub.status),
459 storage_limit_bytes: sub.storage_limit_bytes,
460 pending_storage_limit_bytes: Some(req.cap_bytes),
461 storage_used_bytes: None,
462 current_period_end: sub.current_period_end.map(|t| t.to_rfc3339()),
463 }))
464 }
465
466 // ── Device endpoints (JWT auth) ──
467
468 /// Register a new sync device (or update an existing one by name).
469 #[utoipa::path(post, path = "/api/v1/sync/devices", tag = "SyncKit",
470 request_body = RegisterDeviceRequest,
471 responses((status = 200, description = "Registered device", body = SyncDeviceResponse)),
472 security(("bearer" = [])),
473 )]
474 #[tracing::instrument(skip_all, name = "synckit::register_device")]
475 pub(super) async fn register_device(
476 State(state): State<AppState>,
477 sync_user: SyncUser,
478 Json(req): Json<RegisterDeviceRequest>,
479 ) -> Result<impl IntoResponse> {
480 validation::validate_sync_device_name(&req.device_name)?;
481
482 // Enforce device limit (upsert on existing name is fine, only new names count)
483 let count = db::synckit::count_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;
484 if count >= constants::SYNCKIT_MAX_DEVICES_PER_APP {
485 // Check if this is an existing device (upsert) — allow updates
486 let existing = db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;
487 if !existing.iter().any(|d| d.device_name == req.device_name) {
488 return Err(AppError::BadRequest(format!(
489 "Maximum {} devices per app",
490 constants::SYNCKIT_MAX_DEVICES_PER_APP
491 )));
492 }
493 }
494
495 let device = db::synckit::upsert_sync_device(
496 &state.db,
497 sync_user.app_id,
498 sync_user.user_id,
499 &req.device_name,
500 req.platform,
501 )
502 .await?;
503
504 Ok(Json(SyncDeviceResponse {
505 id: device.id,
506 app_id: device.app_id,
507 user_id: device.user_id,
508 device_name: device.device_name,
509 platform: device.platform.to_string(),
510 last_seen_at: device.last_seen_at,
511 created_at: device.created_at,
512 }))
513 }
514
515 /// List all devices registered for the authenticated user and app.
516 #[utoipa::path(get, path = "/api/v1/sync/devices", tag = "SyncKit",
517 responses((status = 200, description = "List of devices", body = Vec<SyncDeviceResponse>)),
518 security(("bearer" = [])),
519 )]
520 #[tracing::instrument(skip_all, name = "synckit::list_devices")]
521 pub(super) async fn list_devices(
522 State(state): State<AppState>,
523 sync_user: SyncUser,
524 ) -> Result<impl IntoResponse> {
525 let devices =
526 db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;
527
528 let response: Vec<SyncDeviceResponse> = devices.into_iter().map(|d| SyncDeviceResponse {
529 id: d.id,
530 app_id: d.app_id,
531 user_id: d.user_id,
532 device_name: d.device_name,
533 platform: d.platform.to_string(),
534 last_seen_at: d.last_seen_at,
535 created_at: d.created_at,
536 }).collect();
537
538 Ok(Json(response))
539 }
540
541 /// Remove a registered device.
542 #[utoipa::path(delete, path = "/api/v1/sync/devices/{id}", tag = "SyncKit",
543 params(("id" = String, Path, description = "Device ID")),
544 responses((status = 204, description = "Device deleted"), (status = 404, description = "Device not found")),
545 security(("bearer" = [])),
546 )]
547 #[tracing::instrument(skip_all, name = "synckit::delete_device")]
548 pub(super) async fn delete_device(
549 State(state): State<AppState>,
550 sync_user: SyncUser,
551 Path(device_id): Path<SyncDeviceId>,
552 ) -> Result<impl IntoResponse> {
553 let deleted =
554 db::synckit::delete_sync_device(&state.db, device_id, sync_user.app_id, sync_user.user_id)
555 .await?;
556
557 if !deleted {
558 return Err(AppError::NotFound);
559 }
560
561 Ok(axum::http::StatusCode::NO_CONTENT)
562 }
563
564 // ── Key management endpoints (JWT auth) ──
565
566 /// Store or update the user's encrypted master key envelope.
567 #[utoipa::path(put, path = "/api/v1/sync/keys", tag = "SyncKit",
568 request_body = PutKeyRequest,
569 responses((status = 204, description = "Key stored"), (status = 409, description = "Version mismatch")),
570 security(("bearer" = [])),
571 )]
572 #[tracing::instrument(skip_all, name = "synckit::put_sync_key")]
573 pub(super) async fn put_sync_key(
574 State(state): State<AppState>,
575 sync_user: SyncUser,
576 Json(req): Json<PutKeyRequest>,
577 ) -> Result<impl IntoResponse> {
578 // Max 4 KB for the encrypted key envelope
579 if req.encrypted_key.len() > constants::SYNCKIT_MAX_KEY_ENVELOPE_BYTES {
580 return Err(AppError::BadRequest(
581 "Encrypted key exceeds 4KB limit".to_string(),
582 ));
583 }
584
585 let updated = db::synckit::upsert_sync_key(
586 &state.db,
587 sync_user.app_id,
588 sync_user.user_id,
589 &req.encrypted_key,
590 req.expected_version,
591 )
592 .await?;
593
594 if !updated {
595 return Err(AppError::Conflict(
596 "Key version mismatch — another device changed the password. Fetch the latest key and retry.".to_string(),
597 ));
598 }
599
600 Ok(axum::http::StatusCode::NO_CONTENT)
601 }
602
603 /// Retrieve the user's encrypted master key envelope.
604 #[utoipa::path(get, path = "/api/v1/sync/keys", tag = "SyncKit",
605 responses((status = 200, description = "Encrypted key envelope", body = GetKeyResponse), (status = 404, description = "No key stored")),
606 security(("bearer" = [])),
607 )]
608 #[tracing::instrument(skip_all, name = "synckit::get_sync_key")]
609 pub(super) async fn get_sync_key(
610 State(state): State<AppState>,
611 sync_user: SyncUser,
612 ) -> Result<impl IntoResponse> {
613 let info =
614 db::synckit::get_sync_key(&state.db, sync_user.app_id, sync_user.user_id)
615 .await?
616 .ok_or(AppError::NotFound)?;
617
618 let pending_key = info.pending_key.map(|(encrypted_key, key_id)| {
619 PendingKeyInfo { encrypted_key, key_id }
620 });
621
622 Ok(Json(GetKeyResponse {
623 encrypted_key: info.encrypted_key,
624 key_version: info.key_version,
625 key_id: info.key_id,
626 pending_key,
627 }))
628 }
629
630 // ── Key rotation endpoints ──
631
632 /// Begin a key rotation.
633 #[utoipa::path(post, path = "/api/v1/sync/keys/rotate", tag = "SyncKit",
634 request_body = BeginRotationRequest,
635 responses(
636 (status = 200, description = "Rotation started or resumed", body = BeginRotationResponse),
637 (status = 409, description = "Version mismatch or rotation in progress"),
638 ),
639 security(("bearer" = [])),
640 )]
641 #[tracing::instrument(skip_all, name = "synckit::begin_rotation")]
642 pub(super) async fn begin_rotation(
643 State(state): State<AppState>,
644 sync_user: SyncUser,
645 Json(req): Json<BeginRotationRequest>,
646 ) -> Result<impl IntoResponse> {
647 if req.new_encrypted_key.len() > constants::SYNCKIT_MAX_KEY_ENVELOPE_BYTES {
648 return Err(AppError::BadRequest(
649 "Encrypted key exceeds 4KB limit".to_string(),
650 ));
651 }
652
653 // Verify device belongs to this user + app
654 let devices = db::synckit::get_sync_devices(&state.db, sync_user.app_id, sync_user.user_id).await?;
655 if !devices.iter().any(|d| d.id == req.device_id) {
656 return Err(AppError::BadRequest("Unknown device".to_string()));
657 }
658
659 let result = db::synckit::begin_key_rotation(
660 &state.db,
661 sync_user.app_id,
662 sync_user.user_id,
663 req.device_id,
664 &req.new_encrypted_key,
665 req.expected_key_version,
666 )
667 .await?;
668
669 match result {
670 Ok(rotation) => Ok(Json(BeginRotationResponse {
671 rotation_id: rotation.id,
672 target_seq: rotation.target_seq,
673 new_key_id: rotation.new_key_id,
674 }).into_response()),
675 Err(msg) => Err(AppError::Conflict(msg.to_string())),
676 }
677 }
678
679 /// Pull entries that need re-encryption during a rotation.
680 #[utoipa::path(post, path = "/api/v1/sync/keys/rotate/entries", tag = "SyncKit",
681 request_body = RotationEntriesRequest,
682 responses((status = 200, description = "Entries needing re-encryption", body = RotationEntriesResponse)),
683 security(("bearer" = [])),
684 )]
685 #[tracing::instrument(skip_all, name = "synckit::rotation_entries")]
686 pub(super) async fn rotation_entries(
687 State(state): State<AppState>,
688 sync_user: SyncUser,
689 Json(req): Json<RotationEntriesRequest>,
690 ) -> Result<impl IntoResponse> {
691 let rotation = db::synckit::get_key_rotation(
692 &state.db,
693 sync_user.app_id,
694 sync_user.user_id,
695 )
696 .await?
697 .ok_or_else(|| AppError::BadRequest("No active rotation".to_string()))?;
698
699 if rotation.id != req.rotation_id {
700 return Err(AppError::BadRequest("Rotation ID mismatch".to_string()));
701 }
702
703 let page_size = constants::SYNCKIT_ROTATION_BATCH_MAX as i64;
704 let raw_entries = db::synckit::get_rotation_entries(
705 &state.db,
706 sync_user.app_id,
707 sync_user.user_id,
708 rotation.new_key_id,
709 req.after_seq,
710 page_size,
711 )
712 .await?;
713
714 let has_more = raw_entries.len() as i64 == page_size;
715 let entries: Vec<RotationEntry> = raw_entries
716 .into_iter()
717 .map(|(seq, data)| RotationEntry { seq, data })
718 .collect();
719
720 Ok(Json(RotationEntriesResponse { entries, has_more }))
721 }
722
723 /// Submit a batch of re-encrypted entries during rotation.
724 #[utoipa::path(post, path = "/api/v1/sync/keys/rotate/batch", tag = "SyncKit",
725 request_body = RotationBatchRequest,
726 responses((status = 200, description = "Batch processed", body = RotationBatchResponse)),
727 security(("bearer" = [])),
728 )]
729 #[tracing::instrument(skip_all, name = "synckit::rotation_batch")]
730 pub(super) async fn rotation_batch(
731 State(state): State<AppState>,
732 sync_user: SyncUser,
733 Json(req): Json<RotationBatchRequest>,
734 ) -> Result<impl IntoResponse> {
735 if req.entries.is_empty() {
736 return Err(AppError::BadRequest("No entries provided".to_string()));
737 }
738 if req.entries.len() > constants::SYNCKIT_ROTATION_BATCH_MAX {
739 return Err(AppError::BadRequest(format!(
740 "Maximum {} entries per batch",
741 constants::SYNCKIT_ROTATION_BATCH_MAX
742 )));
743 }
744
745 let rotation = db::synckit::get_key_rotation(
746 &state.db,
747 sync_user.app_id,
748 sync_user.user_id,
749 )
750 .await?
751 .ok_or_else(|| AppError::BadRequest("No active rotation".to_string()))?;
752
753 if rotation.id != req.rotation_id {
754 return Err(AppError::BadRequest("Rotation ID mismatch".to_string()));
755 }
756
757 let entries: Vec<(i64, Option<serde_json::Value>)> = req
758 .entries
759 .into_iter()
760 .map(|e| (e.seq, e.data))
761 .collect();
762
763 let updated_count = db::synckit::submit_rotation_batch(
764 &state.db,
765 sync_user.app_id,
766 sync_user.user_id,
767 rotation.id,
768 rotation.new_key_id,
769 &entries,
770 )
771 .await?;
772
773 Ok(Json(RotationBatchResponse { updated_count }))
774 }
775
776 /// Complete a key rotation.
777 #[utoipa::path(post, path = "/api/v1/sync/keys/rotate/complete", tag = "SyncKit",
778 request_body = CompleteRotationRequest,
779 responses(
780 (status = 204, description = "Rotation completed"),
781 (status = 409, description = "Entries still need re-encryption"),
782 ),
783 security(("bearer" = [])),
784 )]
785 #[tracing::instrument(skip_all, name = "synckit::complete_rotation")]
786 pub(super) async fn complete_rotation(
787 State(state): State<AppState>,
788 sync_user: SyncUser,
789 Json(req): Json<CompleteRotationRequest>,
790 ) -> Result<impl IntoResponse> {
791 let result = db::synckit::complete_key_rotation(
792 &state.db,
793 sync_user.app_id,
794 sync_user.user_id,
795 req.rotation_id,
796 )
797 .await?;
798
799 match result {
800 Ok(_new_key_id) => Ok(axum::http::StatusCode::NO_CONTENT.into_response()),
801 Err(0) => Err(AppError::BadRequest("No active rotation".to_string())),
802 Err(remaining) => Ok((
803 axum::http::StatusCode::CONFLICT,
804 Json(CompleteRotationErrorResponse { remaining }),
805 ).into_response()),
806 }
807 }
808
809 /// Cancel a stale rotation (>24h without activity).
810 #[utoipa::path(delete, path = "/api/v1/sync/keys/rotate", tag = "SyncKit",
811 responses(
812 (status = 204, description = "Stale rotation cancelled"),
813 (status = 404, description = "No stale rotation found"),
814 ),
815 security(("bearer" = [])),
816 )]
817 #[tracing::instrument(skip_all, name = "synckit::cancel_rotation")]
818 pub(super) async fn cancel_rotation(
819 State(state): State<AppState>,
820 sync_user: SyncUser,
821 ) -> Result<impl IntoResponse> {
822 let cancelled = db::synckit::cancel_stale_rotation(
823 &state.db,
824 sync_user.app_id,
825 sync_user.user_id,
826 constants::SYNCKIT_ROTATION_STALE_HOURS,
827 )
828 .await?;
829
830 if !cancelled {
831 return Err(AppError::NotFound);
832 }
833
834 Ok(axum::http::StatusCode::NO_CONTENT)
835 }
836