use serde_json::Value as JsonValue; use sqlx::PgPool; use crate::db::models::*; use crate::db::{SyncAppId, SyncDeviceId, UserId}; use crate::error::Result; // ── Sync Log ── /// Push a batch of changes to the sync log. Returns the highest seq assigned. /// /// `batch_id` is a client-generated UUID for idempotent push. If a batch with /// the same ID has already been committed for this app+user, the existing max /// seq is returned without re-inserting (at-most-once semantics). /// /// All changes are inserted within a single transaction for atomicity and /// performance (one fsync instead of N). #[allow(clippy::type_complexity)] #[tracing::instrument(skip_all)] pub async fn push_sync_changes( pool: &PgPool, app_id: SyncAppId, user_id: UserId, device_id: SyncDeviceId, batch_id: uuid::Uuid, changes: &[(String, String, String, chrono::DateTime, Option)], ) -> Result { if changes.is_empty() { return Ok(0); } // Decompose into parallel Vecs for UNNEST-based batch INSERT let mut table_names: Vec = Vec::with_capacity(changes.len()); let mut operations: Vec = Vec::with_capacity(changes.len()); let mut row_ids: Vec = Vec::with_capacity(changes.len()); let mut client_timestamps: Vec> = Vec::with_capacity(changes.len()); let mut data_values: Vec = Vec::with_capacity(changes.len()); for (table_name, operation, row_id, client_timestamp, data) in changes { table_names.push(table_name.clone()); operations.push(operation.clone()); row_ids.push(row_id.clone()); client_timestamps.push(*client_timestamp); data_values.push(data.clone().unwrap_or(JsonValue::Null)); } let mut tx = pool.begin().await?; // Idempotency check inside the transaction to prevent TOCTOU race. let existing: (Option,) = sqlx::query_as( "SELECT MAX(seq) FROM sync_log WHERE app_id = $1 AND user_id = $2 AND batch_id = $3", ) .bind(app_id) .bind(user_id) .bind(batch_id) .fetch_one(&mut *tx) .await?; if let Some(max_seq) = existing.0 { tracing::debug!(batch_id = %batch_id, cursor = max_seq, "Push batch already committed, returning existing cursor"); tx.rollback().await.ok(); return Ok(max_seq); } // Read the current key_id from sync_keys so pushed entries are stamped // with the active encryption key. Falls back to NULL if no key exists yet // (pre-encryption setup — entries have no encrypted data anyway). let current_key_id: Option = sqlx::query_scalar( "SELECT key_id FROM sync_keys WHERE app_id = $1 AND user_id = $2", ) .bind(app_id) .bind(user_id) .fetch_optional(&mut *tx) .await?; let seqs: Vec = sqlx::query_scalar( r#" INSERT INTO sync_log (app_id, user_id, device_id, batch_id, table_name, operation, row_id, client_timestamp, data, key_id) SELECT $1, $2, $3, $4, t.*, $10 FROM UNNEST($5::text[], $6::text[], $7::text[], $8::timestamptz[], $9::jsonb[]) AS t RETURNING seq "#, ) .bind(app_id) .bind(user_id) .bind(device_id) .bind(batch_id) .bind(&table_names) .bind(&operations) .bind(&row_ids) .bind(&client_timestamps) .bind(&data_values) .bind(current_key_id) .fetch_all(&mut *tx) .await?; tx.commit().await?; let max_seq = seqs.iter().copied().max().unwrap_or(0); Ok(max_seq) } /// Pull changes since a cursor for a user within an app. /// /// Prefer `pull_sync_changes_filtered` for new code; it supports optional /// table and timestamp filters. This function is kept for backward compatibility. #[allow(dead_code)] #[tracing::instrument(skip_all)] pub async fn pull_sync_changes( pool: &PgPool, app_id: SyncAppId, user_id: UserId, cursor: i64, limit: i64, ) -> Result> { let entries = sqlx::query_as::<_, DbSyncLogEntry>( r#" SELECT * FROM sync_log WHERE app_id = $1 AND user_id = $2 AND seq > $3 ORDER BY seq ASC LIMIT $4 "#, ) .bind(app_id) .bind(user_id) .bind(cursor) .bind(limit) .fetch_all(pool) .await?; Ok(entries) } /// Pull changes since a cursor with optional table and timestamp filters. /// /// When `tables` is `Some`, only entries whose `table_name` is in the list are returned. /// When `since` is `Some`, only entries with `client_timestamp >= since` are returned. /// Both filters compose (AND). Passing `None` for both is identical to `pull_sync_changes`. #[tracing::instrument(skip_all)] pub async fn pull_sync_changes_filtered( pool: &PgPool, app_id: SyncAppId, user_id: UserId, cursor: i64, limit: i64, tables: Option<&[String]>, since: Option>, ) -> Result> { let entries = sqlx::query_as::<_, DbSyncLogEntry>( r#" SELECT * FROM sync_log WHERE app_id = $1 AND user_id = $2 AND seq > $3 AND ($5::text[] IS NULL OR table_name = ANY($5)) AND ($6::timestamptz IS NULL OR client_timestamp >= $6) ORDER BY seq ASC LIMIT $4 "#, ) .bind(app_id) .bind(user_id) .bind(cursor) .bind(limit) .bind(tables) .bind(since) .fetch_all(pool) .await?; Ok(entries) }