//! Webhook handlers for checkout.session.completed events. use crate::{ db, error::{AppError, Result, ResultExt}, helpers::{self, spawn_email}, payments::{CheckoutMetadata, CreatorTierCheckoutMetadata, FanPlusCheckoutMetadata, SubscriptionCheckoutMetadata, SynckitAppSubCheckoutMetadata, TipCheckoutMetadata}, AppState, }; use super::checkout_helpers::{ check_pending_refund, maybe_generate_license_key, record_tip_splits, record_transaction_splits, send_guest_sale_notification, send_purchase_emails, send_tip_email, subscribe_buyer_to_mailing_list, }; /// Handle checkout.session.completed for one-time purchases #[tracing::instrument(skip_all, name = "stripe::handle_purchase_checkout")] pub(super) async fn handle_purchase_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed purchase checkout"); // Extract metadata (already typed IDs from CheckoutMetadata) let raw_metadata = CheckoutMetadata::from_metadata(session.metadata.as_ref())?; let buyer_id = raw_metadata.buyer_id; let seller_id = raw_metadata.seller_id; let item_id = raw_metadata.item_id; let _promo_code_id = raw_metadata.promo_code_id; let item_id_display = item_id.map(|id| id.to_string()).unwrap_or_else(|| "project".to_string()); // Get the payment intent ID let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string()); // Complete the transaction (idempotent - returns None if already completed). // Steps 1-3 (complete_transaction, increment_sales_count, discount code increment) // are wrapped in a single DB transaction to prevent inconsistent state if any step fails. let mut db_tx = state.db.begin().await.context("begin purchase webhook transaction")?; match db::transactions::complete_transaction(&mut *db_tx, &session_id, &payment_intent_id).await { Ok(Some(tx)) => { tracing::info!( buyer_id = %buyer_id, seller_id = %seller_id, item_id = %item_id_display, amount_cents = %tx.amount_cents, "transaction completed" ); // Defense-in-depth reconciliation: our line items are server-built, // so Stripe's pre-tax subtotal should equal the amount we credit. A // mismatch (a future Stripe Tax / price-edit / currency edge) is // logged loudly rather than silently trusted — we still credit the // server amount, which is authoritative. if let Some(subtotal) = session.amount_subtotal && subtotal != i64::from(tx.amount_cents) { tracing::error!( session_id = %session_id, credited_cents = %tx.amount_cents, stripe_subtotal_cents = %subtotal, "checkout amount mismatch: credited transaction amount differs from Stripe session subtotal" ); } // Increment denormalized sales_count (inside transaction) if let Some(iid) = item_id { db::items::increment_sales_count(&mut *db_tx, iid) .await .with_context(|| format!("increment sales count for item {iid}"))?; } // Promo code use_count is reserved at checkout time (not here) to prevent // concurrent checkouts from exceeding max_uses. No increment needed in webhook. // Commit the critical data integrity operations db_tx.commit().await.context("commit purchase webhook transaction")?; // --- Secondary effects below (outside transaction) --- // Grant access to bundle child items (if this is a bundle) if let Some(iid) = item_id && let Ok(Some(purchased_item)) = db::items::get_item_by_id(&state.db, iid).await && purchased_item.item_type == db::ItemType::Bundle { crate::routes::stripe::checkout::grant_bundle_items(state, iid, buyer_id, seller_id, Some(tx.id)).await; } if tx.share_contact { db::transactions::clear_contact_revocation(&state.db, buyer_id, seller_id) .await .context("clear contact revocation after purchase")?; } // Record revenue splits if the item's project has members if let Some(iid) = item_id { record_transaction_splits(state, tx.id, iid, tx.amount_cents).await; maybe_generate_license_key(state, iid, buyer_id, tx.id).await; subscribe_buyer_to_mailing_list(state, iid, buyer_id); } send_purchase_emails(state, &tx, buyer_id, seller_id); if let Err(e) = db::subscriptions::log_subscription_event( &state.db, None, event_id, "checkout.session.completed.purchase", &serde_json::json!({"session_id": session_id}), ).await { tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event"); } // Check for a pending refund that arrived before this payment webhook. // If found, process it now that the transaction is completed. check_pending_refund(state, &payment_intent_id).await; } Ok(None) => { tracing::info!(session_id = %session_id, "transaction already completed, ignoring duplicate webhook"); } Err(e) => { tracing::error!(session_id = %session_id, error = ?e, "failed to complete transaction"); return Err(e); } } Ok(()) } /// Handle checkout.session.completed for cart (multi-item) purchases #[tracing::instrument(skip_all, name = "stripe::handle_cart_checkout")] pub(super) async fn handle_cart_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed cart checkout"); let meta = crate::payments::CartCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let buyer_id = meta.buyer_id; let seller_id = meta.seller_id; let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string()); // Complete ALL pending transactions for this session in a single DB transaction let mut db_tx = state.db.begin().await.context("begin cart webhook transaction")?; let completed_txs = db::transactions::complete_cart_transactions( &mut *db_tx, &session_id, &payment_intent_id, ) .await .context("complete cart transactions")?; if completed_txs.is_empty() { tracing::info!(session_id = %session_id, "cart transactions already completed, ignoring duplicate webhook"); return Ok(()); } tracing::info!( session_id = %session_id, buyer_id = %buyer_id, seller_id = %seller_id, count = completed_txs.len(), "cart transactions completed" ); // Defense-in-depth reconciliation: line items are server-built, so the sum // of the credited transactions should equal Stripe's pre-tax subtotal. A // mismatch (future Stripe Tax / price-edit / currency edge) is logged loudly // rather than silently trusted — the server amounts remain authoritative. if let Some(subtotal) = session.amount_subtotal { let credited: i64 = completed_txs.iter().map(|tx| i64::from(tx.amount_cents)).sum(); if credited != subtotal { tracing::error!( session_id = %session_id, credited_cents = %credited, stripe_subtotal_cents = %subtotal, "cart checkout amount mismatch: sum of credited transactions differs from Stripe session subtotal" ); } } // Increment sales count for each item for tx in &completed_txs { if let Some(item_id) = tx.item_id { db::items::increment_sales_count(&mut *db_tx, item_id) .await .with_context(|| format!("increment sales count for item {item_id}"))?; } } db_tx.commit().await.context("commit cart webhook transaction")?; // Remove purchased items from cart (items stay in cart until payment succeeds, // so cancelled checkouts don't lose cart contents) db::cart::remove_seller_items_from_cart(&state.db, buyer_id, seller_id) .await .context("remove cart items after successful payment")?; // --- Secondary effects (outside transaction) --- for tx in &completed_txs { if let Some(item_id) = tx.item_id { // Bundle grants if let Ok(Some(purchased_item)) = db::items::get_item_by_id(&state.db, item_id).await && purchased_item.item_type == db::ItemType::Bundle { crate::routes::stripe::checkout::grant_bundle_items( state, item_id, buyer_id, seller_id, Some(tx.id), ) .await; } // Revenue splits record_transaction_splits(state, tx.id, item_id, tx.amount_cents).await; // License keys maybe_generate_license_key(state, item_id, buyer_id, tx.id).await; // Mailing list subscribe_buyer_to_mailing_list(state, item_id, buyer_id); } } // Contact sharing (once per seller) if completed_txs.iter().any(|t| t.share_contact) { db::transactions::clear_contact_revocation(&state.db, buyer_id, seller_id) .await .context("clear contact revocation after cart purchase")?; } // Send purchase emails for each item (reuse existing per-item emails) for tx in &completed_txs { send_purchase_emails(state, tx, buyer_id, seller_id); } // Log event if let Err(e) = db::subscriptions::log_subscription_event( &state.db, None, event_id, "checkout.session.completed.cart", &serde_json::json!({"session_id": session_id, "item_count": completed_txs.len()}), ).await { tracing::warn!(event_id = %event_id, error = ?e, "failed to log cart checkout event"); } // Check for pending refund check_pending_refund(state, &payment_intent_id).await; Ok(()) } /// Handle checkout.session.completed for subscriptions #[tracing::instrument(skip_all, name = "stripe::handle_subscription_checkout")] pub(super) async fn handle_subscription_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed subscription checkout"); // Extract subscription-specific metadata (already typed IDs) let raw_metadata = SubscriptionCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let subscriber_id = raw_metadata.subscriber_id; let project_id = raw_metadata.project_id; let tier_id = raw_metadata.tier_id; // Get the Stripe subscription ID from the session let stripe_subscription_id = session.subscription.clone() .ok_or_else(|| { tracing::error!("Subscription checkout completed but no subscription ID on session"); AppError::BadRequest("Missing subscription ID on session".to_string()) })?; // Get the Stripe customer ID from the session let stripe_customer_id = session.customer.clone() .ok_or_else(|| { tracing::error!("Subscription checkout completed but no customer ID on session"); AppError::BadRequest("Missing customer ID on session".to_string()) })?; // Create the subscription record + increment promo code in a single transaction. let mut tx = state.db.begin().await.context("begin subscription webhook transaction")?; let sub = match db::subscriptions::create_subscription( &mut *tx, subscriber_id, tier_id, project_id, &stripe_subscription_id, &stripe_customer_id, ).await .context("create subscription record")? { Some(sub) => sub, None => { tracing::info!( subscriber_id = %subscriber_id, project_id = %project_id, "subscription already exists, ignoring duplicate" ); return Ok(()); } }; // Promo code use_count is reserved at checkout time (not here) to prevent // concurrent checkouts from exceeding max_uses. No increment needed in webhook. // Delete the pending promo-hold transaction (created at checkout time so // cleanup_stale_pending_transactions can release the code if abandoned). db::transactions::delete_subscription_pending_transaction(&mut *tx, &session_id) .await .context("delete subscription pending promo-hold transaction")?; tx.commit().await.context("commit subscription webhook transaction")?; tracing::info!( subscription_id = %sub.id, subscriber_id = %subscriber_id, project_id = %project_id, tier_id = %tier_id, "subscription created" ); // Send subscription started email (fire-and-forget) if let (Ok(Some(subscriber)), Ok(Some(tier)), Ok(Some(project))) = ( db::users::get_user_by_id(&state.db, subscriber_id).await, db::subscriptions::get_subscription_tier_by_id(&state.db, tier_id).await, db::projects::get_project_by_id(&state.db, project_id).await, ) { let price = helpers::format_price(tier.price_cents); let sub_email = subscriber.email.clone(); let sub_name = subscriber.display_name.clone(); let tier_name = tier.name.clone(); let project_title = project.title.clone(); spawn_email!(state, "subscription started", |email| { email.send_subscription_started( &sub_email, sub_name.as_deref(), &tier_name, &project_title, &price, ) }); } // Log event if let Err(e) = db::subscriptions::log_subscription_event( &state.db, Some(sub.id), event_id, "checkout.session.completed.subscription", &serde_json::json!({"session_id": session_id, "stripe_subscription_id": stripe_subscription_id}), ).await { tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event"); } Ok(()) } /// Handle checkout.session.completed for Fan+ subscriptions #[tracing::instrument(skip_all, name = "stripe::handle_fan_plus_checkout")] pub(super) async fn handle_fan_plus_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed Fan+ checkout"); let metadata = FanPlusCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let user_id = metadata.user_id; // Get the Stripe subscription ID from the session let stripe_subscription_id = session.subscription.clone() .ok_or_else(|| { tracing::error!("Fan+ checkout completed but no subscription ID on session"); AppError::BadRequest("Missing subscription ID on session".to_string()) })?; // Get the Stripe customer ID from the session let stripe_customer_id = session.customer.clone() .ok_or_else(|| { tracing::error!("Fan+ checkout completed but no customer ID on session"); AppError::BadRequest("Missing customer ID on session".to_string()) })?; // Create the subscription record. Idempotent via ON CONFLICT (user_id) DO // UPDATE with a guard WHERE: a duplicate webhook for an unchanged row updates // nothing and RETURNING yields no row (-> None below, "already exists"); a // genuine re-subscribe updates in place. let sub = match db::fan_plus::create_fan_plus_subscription( &state.db, user_id, &stripe_subscription_id, &stripe_customer_id, ).await .with_context(|| format!("create Fan+ subscription for user {user_id}"))? { Some(sub) => sub, None => { tracing::info!(user_id = %user_id, "Fan+ subscription already exists, ignoring duplicate"); return Ok(()); } }; tracing::info!( subscription_id = %sub.id, user_id = %user_id, "Fan+ subscription created" ); // Send welcome email (fire-and-forget) if let Ok(Some(user)) = db::users::get_user_by_id(&state.db, user_id).await { let user_email = user.email.clone(); let user_name = user.display_name.clone(); spawn_email!(state, "Fan+ welcome", |email| { email.send_fan_plus_welcome(&user_email, user_name.as_deref()) }); } // Log event if let Err(e) = db::subscriptions::log_subscription_event( &state.db, None, event_id, "checkout.session.completed.fan_plus", &serde_json::json!({"session_id": session_id, "stripe_subscription_id": stripe_subscription_id}), ).await { tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event"); } Ok(()) } /// Handle checkout.session.completed for creator tier subscriptions #[tracing::instrument(skip_all, name = "stripe::handle_creator_tier_checkout")] pub(super) async fn handle_creator_tier_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed creator tier checkout"); let metadata = CreatorTierCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let user_id = metadata.user_id; let tier: db::CreatorTier = metadata.tier.parse() .map_err(|_| AppError::BadRequest(format!("Invalid tier: {}", metadata.tier)))?; // Get the Stripe subscription ID from the session let stripe_subscription_id = session.subscription.clone() .ok_or_else(|| { tracing::error!("Creator tier checkout completed but no subscription ID on session"); AppError::BadRequest("Missing subscription ID on session".to_string()) })?; // Get the Stripe customer ID from the session let stripe_customer_id = session.customer.clone() .ok_or_else(|| { tracing::error!("Creator tier checkout completed but no customer ID on session"); AppError::BadRequest("Missing customer ID on session".to_string()) })?; // Create the subscription record. Idempotent via ON CONFLICT (user_id) DO // UPDATE with a guard WHERE (`stripe_subscription_id != EXCLUDED OR status // != 'active'`): a duplicate webhook updates nothing and RETURNING yields no // row (-> None below); a genuine tier-switch or re-subscribe overwrites the // row with the new subscription id and flips it active. let sub = match db::creator_tiers::create_creator_subscription( &state.db, user_id, &stripe_subscription_id, &stripe_customer_id, tier, ).await .with_context(|| format!("create creator tier subscription for user {user_id}"))? { Some(sub) => sub, None => { tracing::info!(user_id = %user_id, "Creator tier subscription already exists, ignoring duplicate"); return Ok(()); } }; // Sync the denormalized creator_tier column on users db::creator_tiers::sync_user_creator_tier(&state.db, user_id) .await .with_context(|| format!("sync creator tier for user {user_id}"))?; // Auto-unhide: restore items hidden by post-grace enforcement match db::items::unhide_all_items_for_user(&state.db, user_id).await { Ok(count) if count > 0 => { tracing::info!(user_id = %user_id, items_unhidden = count, "auto-unhidden items after tier re-subscription"); } Err(e) => { tracing::warn!(user_id = %user_id, error = ?e, "failed to unhide items after tier re-subscription"); } _ => {} } // Auto-unpause: if this creator was paused and just re-subscribed, clear the pause // and un-cancel any fan subscriptions that haven't expired yet. if let Ok(Some(db_user)) = db::users::get_user_by_id(&state.db, user_id).await && db_user.is_creator_paused() { db::users::unpause_creator(&state.db, user_id) .await .with_context(|| format!("unpause creator {user_id}"))?; // Un-cancel active fan subscriptions (clear cancel_at_period_end) if let (Some(stripe), Some(stripe_account_id)) = (&state.stripe, &db_user.stripe_account_id) { let fan_subs = db::subscriptions::get_active_subscriptions_by_creator(&state.db, user_id) .await .with_context(|| format!("fetch active fan subs for unpause {user_id}"))?; for fan_sub in &fan_subs { if let Err(e) = stripe.set_cancel_at_period_end( &fan_sub.stripe_subscription_id, stripe_account_id, false, ).await { tracing::warn!( stripe_sub_id = %fan_sub.stripe_subscription_id, error = ?e, "failed to clear cancel_at_period_end on fan sub during unpause" ); } } } tracing::info!(user_id = %user_id, "creator auto-unpaused after re-subscribing to tier"); } tracing::info!( user_id = %user_id, tier = %tier, "creator tier subscription created" ); // Log event if let Err(e) = db::subscriptions::log_subscription_event( &state.db, None, event_id, "checkout.session.completed.creator_tier", &serde_json::json!({ "session_id": session_id, "stripe_subscription_id": stripe_subscription_id, "tier": sub.tier, }), ).await { tracing::warn!(event_id = %event_id, error = ?e, "failed to log subscription event"); } Ok(()) } /// Handle checkout.session.completed for tips #[tracing::instrument(skip_all, name = "stripe::handle_tip_checkout")] pub(super) async fn handle_tip_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, _event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed tip checkout"); let metadata = TipCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let tipper_id = metadata.tipper_id; let recipient_id = metadata.recipient_id; let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string()); // Complete the tip (idempotent) match db::tips::complete_tip(&state.db, &session_id, &payment_intent_id) .await .context("complete tip")? { Some(tip) => { tracing::info!( tip_id = %tip.id, tipper_id = %tipper_id, recipient_id = %recipient_id, amount_cents = %tip.amount_cents, "tip completed" ); // Record revenue splits if the tip's project has members if let Some(project_id) = tip.project_id { record_tip_splits(state, tip.id, project_id, tip.amount_cents).await; } // Send tip notification email (fire-and-forget) send_tip_email(state, &tip, tipper_id, recipient_id); } None => { tracing::info!(session_id = %session_id, "tip already completed, ignoring duplicate webhook"); } } Ok(()) } /// Handle checkout.session.completed for guest purchases (no MNW account). /// /// Extracts the buyer's email from Stripe, completes the transaction, and /// auto-attaches to an existing account if the email matches. #[tracing::instrument(skip_all, name = "stripe::handle_guest_checkout")] pub(super) async fn handle_guest_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, _event_id: &str, ) -> Result<()> { use crate::payments::GuestCheckoutMetadata; let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed guest checkout"); let meta = GuestCheckoutMetadata::from_metadata(session.metadata.as_ref())?; // Extract buyer email from Stripe customer_details let guest_email = session.customer_details.as_ref() .and_then(|cd| cd.email.as_deref()) .unwrap_or("unknown@guest") .to_string(); let payment_intent_id = session.payment_intent.clone().unwrap_or_else(|| "unknown".to_string()); // Complete the guest transaction and increment sales count in a single DB transaction // (matching the non-guest path pattern to prevent counter drift on partial failure) let mut db_tx = state.db.begin().await.context("begin guest checkout webhook transaction")?; // Check if email matches an existing user — auto-attach if so. // `FOR SHARE` blocks a concurrent email-change UPDATE from racing the // attach: if someone edits this user's email mid-checkout, the writer // waits for our tx to commit so we either attach the row we matched or // the writer wins and we see no row (treated as guest purchase). let existing_user_id: Option = sqlx::query_scalar( "SELECT id FROM users WHERE LOWER(email) = LOWER($1) AND email_verified = true FOR SHARE", ) .bind(&guest_email) .fetch_optional(&mut *db_tx) .await?; match db::transactions::complete_guest_transaction( &mut *db_tx, &session_id, &payment_intent_id, &guest_email, existing_user_id, ).await? { Some(tx) => { tracing::info!( session_id = %session_id, guest_email = %guest_email, item_id = %meta.item_id, auto_attached = tx.buyer_id.is_some(), "guest transaction completed" ); // Increment sales count inside transaction db::items::increment_sales_count(&mut *db_tx, meta.item_id) .await .with_context(|| format!("increment sales count for guest item {}", meta.item_id))?; db_tx.commit().await.context("commit guest checkout webhook transaction")?; // --- Secondary effects below (outside transaction) --- // Generate license key if applicable and buyer was auto-attached if let Some(buyer_id) = tx.buyer_id { maybe_generate_license_key(state, meta.item_id, buyer_id, tx.id).await; } // Record revenue splits record_transaction_splits(state, tx.id, meta.item_id, tx.amount_cents).await; // Send guest purchase confirmation email (only if not auto-attached to existing account) if tx.buyer_id.is_none() && let (Some(download_token), Some(claim_token)) = (tx.download_token, tx.claim_token) { let email_client = state.email.clone(); let host_url = state.config.host_url.clone(); let item_title = tx.item_title.clone().unwrap_or_else(|| "your item".to_string()); let price = helpers::format_price(tx.amount_cents); let guest_email_addr = guest_email.clone(); let download_url = format!("{}/download/{}", host_url, download_token); let claim_url = format!("{}/claim?token={}", host_url, claim_token); state.bg.spawn("guest purchase confirmation", async move { if let Err(e) = email_client.send_guest_purchase_confirmation( &guest_email_addr, &item_title, &price, &download_url, &claim_url, ).await { tracing::error!(error = ?e, "failed to send guest purchase confirmation email"); } }); } // Send sale notification to seller send_guest_sale_notification(state, &tx, &guest_email, meta.seller_id); } None => { db_tx.commit().await.ok(); tracing::info!(session_id = %session_id, "guest transaction already completed, ignoring duplicate webhook"); } } Ok(()) } /// Handle checkout.session.completed for an end-user SyncKit app subscription. /// Inserts the `app_sync_subscriptions` row; subsequent /// `customer.subscription.updated/.deleted` events keep it in sync. #[tracing::instrument(skip_all, name = "stripe::handle_synckit_app_sub_checkout")] pub(super) async fn handle_synckit_app_sub_checkout_completed( state: &AppState, session: &crate::payments::CheckoutSessionView, event_id: &str, ) -> Result<()> { let session_id = session.id.clone(); tracing::info!(session_id = %session_id, "processing completed SyncKit app subscription checkout"); let meta = SynckitAppSubCheckoutMetadata::from_metadata(session.metadata.as_ref())?; let stripe_subscription_id = session .subscription .clone() .ok_or_else(|| AppError::BadRequest("Missing subscription ID on session".to_string()))?; let stripe_customer_id = session .customer .clone() .ok_or_else(|| AppError::BadRequest("Missing customer ID on session".to_string()))?; let inserted = db::synckit::create_app_sync_subscription( &state.db, &db::synckit::NewAppSyncSubscription { user_id: meta.user_id, app_id: meta.app_id, stripe_subscription_id: &stripe_subscription_id, stripe_customer_id: &stripe_customer_id, interval: &meta.tier, // metadata "tier" carries the interval string ("monthly"/"annual") storage_limit_bytes: meta.storage_limit_bytes.unwrap_or(0), }, ) .await .with_context(|| { format!( "create app sync subscription user={} app={}", meta.user_id, meta.app_id ) })?; if !inserted { tracing::info!( user_id = %meta.user_id, app_id = %meta.app_id, "SyncKit app subscription already exists, ignoring duplicate webhook" ); } let _ = event_id; Ok(()) }