//! Page view tracking with daily aggregation. //! //! Each page view UPSERTs into `page_view_daily`, incrementing a counter per //! (target_type, target_id, date). No raw per-request rows; the table stays //! small (365 rows/item/year). use chrono::{DateTime, Utc}; use sqlx::PgPool; use uuid::Uuid; use super::analytics::{format_bucket_label, TimeRange}; use super::{ProjectId, UserId}; use crate::error::Result; /// Background batcher for page-view UPSERTs. Replaces the previous /// `tokio::spawn(record_view(...))` per request pattern, which under any traffic /// spike took a connection per pageview and starved real requests at the pool /// acquire boundary. /// /// `PageViewTx::try_record` is non-blocking (`try_send`); on channel overflow /// the increment is dropped (view counts are already approximate — bot filter, /// no per-user dedupe — so losing a fraction during a burst is acceptable). /// The background drainer flushes a single bulk UPSERT every `FLUSH_INTERVAL`. const CHANNEL_CAPACITY: usize = 4096; const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500); type ViewKey = (&'static str, Uuid); #[derive(Clone)] pub struct PageViewTx(tokio::sync::mpsc::Sender); impl PageViewTx { pub fn try_record(&self, target_type: &'static str, target_id: Uuid) { let _ = self.0.try_send((target_type, target_id)); } } /// Spawn the background drainer and return the sender to install on AppState. pub fn spawn_batcher(pool: PgPool) -> PageViewTx { let (tx, mut rx) = tokio::sync::mpsc::channel::(CHANNEL_CAPACITY); tokio::spawn(async move { let mut pending: std::collections::HashMap = std::collections::HashMap::new(); let mut tick = tokio::time::interval(FLUSH_INTERVAL); tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { Some(key) = rx.recv() => { *pending.entry(key).or_insert(0) += 1; } _ = tick.tick() => { if pending.is_empty() { continue; } let batch = std::mem::take(&mut pending); if let Err(e) = flush_batch(&pool, &batch).await { tracing::warn!(error = ?e, count = batch.len(), "page-view batch flush failed"); } } else => break, } } }); PageViewTx(tx) } async fn flush_batch( pool: &PgPool, batch: &std::collections::HashMap, ) -> Result<()> { // Unzip into parallel arrays for UNNEST-style bulk INSERT — single roundtrip // regardless of batch size. let mut types: Vec<&str> = Vec::with_capacity(batch.len()); let mut ids: Vec = Vec::with_capacity(batch.len()); let mut counts: Vec = Vec::with_capacity(batch.len()); for ((t, id), c) in batch { types.push(t); ids.push(*id); counts.push(*c); } sqlx::query( r#" INSERT INTO page_view_daily (target_type, target_id, view_date, view_count) SELECT t, i, CURRENT_DATE, c FROM UNNEST($1::TEXT[], $2::UUID[], $3::BIGINT[]) AS u(t, i, c) ON CONFLICT (target_type, target_id, view_date) DO UPDATE SET view_count = page_view_daily.view_count + EXCLUDED.view_count "#, ) .bind(&types) .bind(&ids) .bind(&counts) .execute(pool) .await?; Ok(()) } /// Direct UPSERT path. Kept for backfill / admin tooling; the request path /// goes through `PageViewTx::try_record` to avoid pool pressure under burst. #[allow(dead_code)] pub async fn record_view(pool: &PgPool, target_type: &str, target_id: Uuid) -> Result<()> { sqlx::query( r#" INSERT INTO page_view_daily (target_type, target_id, view_date, view_count) VALUES ($1, $2, CURRENT_DATE, 1) ON CONFLICT (target_type, target_id, view_date) DO UPDATE SET view_count = page_view_daily.view_count + 1 "#, ) .bind(target_type) .bind(target_id) .execute(pool) .await?; Ok(()) } /// A single time bucket in a view timeseries. #[allow(dead_code)] pub struct ViewBucket { pub label: String, pub view_count: i64, } /// Fetch time-bucketed view counts for a seller (across all their items and projects). /// /// Optionally scoped to a single project. Uses the same bucketing as revenue charts. #[allow(dead_code)] pub async fn get_view_timeseries( pool: &PgPool, seller_id: UserId, project_id: Option, range: &TimeRange, ) -> Result> { let bucket = range.bucket_sql(); let time_filter = match range.interval_sql() { Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"), None => String::new(), }; let project_filter = if project_id.is_some() { " AND i.project_id = $2" } else { "" }; let sql = format!( r#" SELECT date_trunc('{bucket}', pv.view_date::TIMESTAMPTZ) AS bucket, COALESCE(SUM(pv.view_count), 0)::BIGINT FROM page_view_daily pv JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1{project_filter}{time_filter} GROUP BY bucket ORDER BY bucket LIMIT 500 "# ); let rows: Vec<(DateTime, i64)> = if let Some(pid) = project_id { sqlx::query_as(&sql) .bind(seller_id) .bind(pid) .fetch_all(pool) .await? } else { sqlx::query_as(&sql) .bind(seller_id) .fetch_all(pool) .await? }; let buckets = rows .into_iter() .map(|(dt, count)| ViewBucket { label: format_bucket_label(&dt, range), view_count: count, }) .collect(); Ok(buckets) } /// Period-over-period view comparison for stat cards. /// /// Returns `(current_views, previous_views)`. pub async fn get_view_period_comparison( pool: &PgPool, seller_id: UserId, project_id: Option, range: &TimeRange, ) -> Result<(i64, i64)> { let Some(interval) = range.interval_sql() else { // All time: no comparison possible — return total views with 0 previous. let total = get_total_views(pool, seller_id, project_id, None).await?; return Ok((total, 0)); }; let project_filter = if project_id.is_some() { " AND i.project_id = $2" } else { "" }; let sql = format!( r#" SELECT COALESCE(SUM(pv.view_count) FILTER ( WHERE pv.view_date >= CURRENT_DATE - INTERVAL '{interval}' ), 0)::BIGINT, COALESCE(SUM(pv.view_count) FILTER ( WHERE pv.view_date >= CURRENT_DATE - INTERVAL '{interval}' * 2 AND pv.view_date < CURRENT_DATE - INTERVAL '{interval}' ), 0)::BIGINT FROM page_view_daily pv JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1{project_filter} AND pv.view_date >= CURRENT_DATE - INTERVAL '{interval}' * 2 "# ); let row: (i64, i64) = if let Some(pid) = project_id { sqlx::query_as(&sql) .bind(seller_id) .bind(pid) .fetch_one(pool) .await? } else { sqlx::query_as(&sql) .bind(seller_id) .fetch_one(pool) .await? }; Ok(row) } /// Total views for a seller, optionally scoped to a project and time range. async fn get_total_views( pool: &PgPool, seller_id: UserId, project_id: Option, since: Option>, ) -> Result { let time_filter = if since.is_some() { " AND pv.view_date >= $2::DATE" } else { "" }; let project_filter = if project_id.is_some() { if since.is_some() { " AND i.project_id = $3" } else { " AND i.project_id = $2" } } else { "" }; let sql = format!( r#" SELECT COALESCE(SUM(pv.view_count), 0)::BIGINT FROM page_view_daily pv JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id JOIN projects p ON i.project_id = p.id WHERE p.user_id = $1{time_filter}{project_filter} "# ); let row: (i64,) = match (since, project_id) { (Some(s), Some(pid)) => { sqlx::query_as(&sql) .bind(seller_id) .bind(s) .bind(pid) .fetch_one(pool) .await? } (Some(s), None) => { sqlx::query_as(&sql) .bind(seller_id) .bind(s) .fetch_one(pool) .await? } (None, Some(pid)) => { sqlx::query_as(&sql) .bind(seller_id) .bind(pid) .fetch_one(pool) .await? } (None, None) => { sqlx::query_as(&sql) .bind(seller_id) .fetch_one(pool) .await? } }; Ok(row.0) } /// Per-project view totals for a seller. Used for the cross-project comparison table. pub async fn get_views_by_seller_projects( pool: &PgPool, seller_id: UserId, range: &TimeRange, ) -> Result> { let time_filter = match range.interval_sql() { Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"), None => String::new(), }; let sql = format!( r#" SELECT p.id, COALESCE(SUM(pv.view_count), 0)::BIGINT FROM projects p LEFT JOIN items i ON i.project_id = p.id LEFT JOIN page_view_daily pv ON pv.target_type = 'item' AND pv.target_id = i.id{time_filter} WHERE p.user_id = $1 GROUP BY p.id "# ); let rows: Vec<(ProjectId, i64)> = sqlx::query_as(&sql) .bind(seller_id) .fetch_all(pool) .await?; Ok(rows) } /// Per-item view totals for a project. Used for the project analytics "top items" list. #[allow(dead_code)] pub async fn get_views_by_project_items( pool: &PgPool, project_id: ProjectId, range: &TimeRange, ) -> Result> { let time_filter = match range.interval_sql() { Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"), None => String::new(), }; let sql = format!( r#" SELECT i.id, COALESCE(SUM(pv.view_count), 0)::BIGINT FROM items i LEFT JOIN page_view_daily pv ON pv.target_type = 'item' AND pv.target_id = i.id{time_filter} WHERE i.project_id = $1 GROUP BY i.id "# ); let rows: Vec<(super::ItemId, i64)> = sqlx::query_as(&sql) .bind(project_id) .fetch_all(pool) .await?; Ok(rows) } /// Delete page view rows older than `retain_days`. Called by the daily scheduler. pub async fn prune_old_views(pool: &PgPool, retain_days: i64) -> Result { let result = sqlx::query( "DELETE FROM page_view_daily WHERE view_date < CURRENT_DATE - $1 * INTERVAL '1 day'", ) .bind(retain_days) .execute(pool) .await?; Ok(result.rows_affected()) }