Skip to main content

max / makenotwork

11.4 KB · 378 lines History Blame Raw
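//! Page view tracking with daily aggregation.
//!
//! Views are counted per (target_type, target_id, date) in `page_view_daily`.
//! The request path hands increments to a background batcher, which folds them
//! in memory and UPSERTs them in bulk. No raw per-request rows are stored, so the
//! table stays small (one row per target per day, i.e. ~365 rows/target/year).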
6
7 use chrono::{DateTime, Utc};
8 use sqlx::PgPool;
9 use uuid::Uuid;
10
11 use super::analytics::{format_bucket_label, TimeRange};
12 use super::{ProjectId, UserId};
13 use crate::error::Result;
14
15 /// Background batcher for page-view UPSERTs. Replaces the previous
16 /// `tokio::spawn(record_view(...))` per request pattern, which under any traffic
17 /// spike took a connection per pageview and starved real requests at the pool
18 /// acquire boundary.
19 ///
20 /// `PageViewTx::try_record` is non-blocking (`try_send`); on channel overflow
21 /// the increment is dropped (view counts are already approximate — bot filter,
22 /// no per-user dedupe — so losing a fraction during a burst is acceptable).
23 /// The background drainer flushes a single bulk UPSERT every `FLUSH_INTERVAL`.
24 const CHANNEL_CAPACITY: usize = 4096;
25 const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
26
27 type ViewKey = (&'static str, Uuid);
28
29 #[derive(Clone)]
30 pub struct PageViewTx(tokio::sync::mpsc::Sender<ViewKey>);
31
32 impl PageViewTx {
33 pub fn try_record(&self, target_type: &'static str, target_id: Uuid) {
34 let _ = self.0.try_send((target_type, target_id));
35 }
36 }
37
38 /// Spawn the background drainer and return the sender to install on AppState.
39 pub fn spawn_batcher(pool: PgPool) -> PageViewTx {
40 let (tx, mut rx) = tokio::sync::mpsc::channel::<ViewKey>(CHANNEL_CAPACITY);
41 tokio::spawn(async move {
42 let mut pending: std::collections::HashMap<ViewKey, i64> = std::collections::HashMap::new();
43 let mut tick = tokio::time::interval(FLUSH_INTERVAL);
44 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
45 loop {
46 tokio::select! {
47 Some(key) = rx.recv() => {
48 *pending.entry(key).or_insert(0) += 1;
49 }
50 _ = tick.tick() => {
51 if pending.is_empty() {
52 continue;
53 }
54 let batch = std::mem::take(&mut pending);
55 if let Err(e) = flush_batch(&pool, &batch).await {
56 tracing::warn!(error = ?e, count = batch.len(), "page-view batch flush failed");
57 }
58 }
59 else => break,
60 }
61 }
62 });
63 PageViewTx(tx)
64 }
65
66 async fn flush_batch(
67 pool: &PgPool,
68 batch: &std::collections::HashMap<ViewKey, i64>,
69 ) -> Result<()> {
70 // Unzip into parallel arrays for UNNEST-style bulk INSERT — single roundtrip
71 // regardless of batch size.
72 let mut types: Vec<&str> = Vec::with_capacity(batch.len());
73 let mut ids: Vec<Uuid> = Vec::with_capacity(batch.len());
74 let mut counts: Vec<i64> = Vec::with_capacity(batch.len());
75 for ((t, id), c) in batch {
76 types.push(t);
77 ids.push(*id);
78 counts.push(*c);
79 }
80 sqlx::query(
81 r#"
82 INSERT INTO page_view_daily (target_type, target_id, view_date, view_count)
83 SELECT t, i, CURRENT_DATE, c
84 FROM UNNEST($1::TEXT[], $2::UUID[], $3::BIGINT[]) AS u(t, i, c)
85 ON CONFLICT (target_type, target_id, view_date)
86 DO UPDATE SET view_count = page_view_daily.view_count + EXCLUDED.view_count
87 "#,
88 )
89 .bind(&types)
90 .bind(&ids)
91 .bind(&counts)
92 .execute(pool)
93 .await?;
94 Ok(())
95 }
96
97 /// Direct UPSERT path. Kept for backfill / admin tooling; the request path
98 /// goes through `PageViewTx::try_record` to avoid pool pressure under burst.
99 #[allow(dead_code)]
100 pub async fn record_view(pool: &PgPool, target_type: &str, target_id: Uuid) -> Result<()> {
101 sqlx::query(
102 r#"
103 INSERT INTO page_view_daily (target_type, target_id, view_date, view_count)
104 VALUES ($1, $2, CURRENT_DATE, 1)
105 ON CONFLICT (target_type, target_id, view_date)
106 DO UPDATE SET view_count = page_view_daily.view_count + 1
107 "#,
108 )
109 .bind(target_type)
110 .bind(target_id)
111 .execute(pool)
112 .await?;
113 Ok(())
114 }
115
/// A single time bucket in a view timeseries.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewBucket {
    /// Display label for the bucket's start (formatted with `format_bucket_label`).
    pub label: String,
    /// Total views that fell inside the bucket.
    pub view_count: i64,
}
122
123 /// Fetch time-bucketed view counts for a seller (across all their items and projects).
124 ///
125 /// Optionally scoped to a single project. Uses the same bucketing as revenue charts.
126 #[allow(dead_code)]
127 pub async fn get_view_timeseries(
128 pool: &PgPool,
129 seller_id: UserId,
130 project_id: Option<ProjectId>,
131 range: &TimeRange,
132 ) -> Result<Vec<ViewBucket>> {
133 let bucket = range.bucket_sql();
134 let time_filter = match range.interval_sql() {
135 Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"),
136 None => String::new(),
137 };
138
139 let project_filter = if project_id.is_some() {
140 " AND i.project_id = $2"
141 } else {
142 ""
143 };
144
145 let sql = format!(
146 r#"
147 SELECT
148 date_trunc('{bucket}', pv.view_date::TIMESTAMPTZ) AS bucket,
149 COALESCE(SUM(pv.view_count), 0)::BIGINT
150 FROM page_view_daily pv
151 JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id
152 JOIN projects p ON i.project_id = p.id
153 WHERE p.user_id = $1{project_filter}{time_filter}
154 GROUP BY bucket
155 ORDER BY bucket
156 LIMIT 500
157 "#
158 );
159
160 let rows: Vec<(DateTime<Utc>, i64)> = if let Some(pid) = project_id {
161 sqlx::query_as(&sql)
162 .bind(seller_id)
163 .bind(pid)
164 .fetch_all(pool)
165 .await?
166 } else {
167 sqlx::query_as(&sql)
168 .bind(seller_id)
169 .fetch_all(pool)
170 .await?
171 };
172
173 let buckets = rows
174 .into_iter()
175 .map(|(dt, count)| ViewBucket {
176 label: format_bucket_label(&dt, range),
177 view_count: count,
178 })
179 .collect();
180
181 Ok(buckets)
182 }
183
184 /// Period-over-period view comparison for stat cards.
185 ///
186 /// Returns `(current_views, previous_views)`.
187 pub async fn get_view_period_comparison(
188 pool: &PgPool,
189 seller_id: UserId,
190 project_id: Option<ProjectId>,
191 range: &TimeRange,
192 ) -> Result<(i64, i64)> {
193 let Some(interval) = range.interval_sql() else {
194 // All time: no comparison possible — return total views with 0 previous.
195 let total = get_total_views(pool, seller_id, project_id, None).await?;
196 return Ok((total, 0));
197 };
198
199 let project_filter = if project_id.is_some() {
200 " AND i.project_id = $2"
201 } else {
202 ""
203 };
204
205 let sql = format!(
206 r#"
207 SELECT
208 COALESCE(SUM(pv.view_count) FILTER (
209 WHERE pv.view_date >= CURRENT_DATE - INTERVAL '{interval}'
210 ), 0)::BIGINT,
211 COALESCE(SUM(pv.view_count) FILTER (
212 WHERE pv.view_date >= CURRENT_DATE - INTERVAL '{interval}' * 2
213 AND pv.view_date < CURRENT_DATE - INTERVAL '{interval}'
214 ), 0)::BIGINT
215 FROM page_view_daily pv
216 JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id
217 JOIN projects p ON i.project_id = p.id
218 WHERE p.user_id = $1{project_filter}
219 AND pv.view_date >= CURRENT_DATE - INTERVAL '{interval}' * 2
220 "#
221 );
222
223 let row: (i64, i64) = if let Some(pid) = project_id {
224 sqlx::query_as(&sql)
225 .bind(seller_id)
226 .bind(pid)
227 .fetch_one(pool)
228 .await?
229 } else {
230 sqlx::query_as(&sql)
231 .bind(seller_id)
232 .fetch_one(pool)
233 .await?
234 };
235
236 Ok(row)
237 }
238
239 /// Total views for a seller, optionally scoped to a project and time range.
240 async fn get_total_views(
241 pool: &PgPool,
242 seller_id: UserId,
243 project_id: Option<ProjectId>,
244 since: Option<DateTime<Utc>>,
245 ) -> Result<i64> {
246 let time_filter = if since.is_some() {
247 " AND pv.view_date >= $2::DATE"
248 } else {
249 ""
250 };
251 let project_filter = if project_id.is_some() {
252 if since.is_some() {
253 " AND i.project_id = $3"
254 } else {
255 " AND i.project_id = $2"
256 }
257 } else {
258 ""
259 };
260
261 let sql = format!(
262 r#"
263 SELECT COALESCE(SUM(pv.view_count), 0)::BIGINT
264 FROM page_view_daily pv
265 JOIN items i ON pv.target_type = 'item' AND pv.target_id = i.id
266 JOIN projects p ON i.project_id = p.id
267 WHERE p.user_id = $1{time_filter}{project_filter}
268 "#
269 );
270
271 let row: (i64,) = match (since, project_id) {
272 (Some(s), Some(pid)) => {
273 sqlx::query_as(&sql)
274 .bind(seller_id)
275 .bind(s)
276 .bind(pid)
277 .fetch_one(pool)
278 .await?
279 }
280 (Some(s), None) => {
281 sqlx::query_as(&sql)
282 .bind(seller_id)
283 .bind(s)
284 .fetch_one(pool)
285 .await?
286 }
287 (None, Some(pid)) => {
288 sqlx::query_as(&sql)
289 .bind(seller_id)
290 .bind(pid)
291 .fetch_one(pool)
292 .await?
293 }
294 (None, None) => {
295 sqlx::query_as(&sql)
296 .bind(seller_id)
297 .fetch_one(pool)
298 .await?
299 }
300 };
301
302 Ok(row.0)
303 }
304
305 /// Per-project view totals for a seller. Used for the cross-project comparison table.
306 pub async fn get_views_by_seller_projects(
307 pool: &PgPool,
308 seller_id: UserId,
309 range: &TimeRange,
310 ) -> Result<Vec<(ProjectId, i64)>> {
311 let time_filter = match range.interval_sql() {
312 Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"),
313 None => String::new(),
314 };
315
316 let sql = format!(
317 r#"
318 SELECT p.id, COALESCE(SUM(pv.view_count), 0)::BIGINT
319 FROM projects p
320 LEFT JOIN items i ON i.project_id = p.id
321 LEFT JOIN page_view_daily pv
322 ON pv.target_type = 'item' AND pv.target_id = i.id{time_filter}
323 WHERE p.user_id = $1
324 GROUP BY p.id
325 "#
326 );
327
328 let rows: Vec<(ProjectId, i64)> = sqlx::query_as(&sql)
329 .bind(seller_id)
330 .fetch_all(pool)
331 .await?;
332
333 Ok(rows)
334 }
335
336 /// Per-item view totals for a project. Used for the project analytics "top items" list.
337 #[allow(dead_code)]
338 pub async fn get_views_by_project_items(
339 pool: &PgPool,
340 project_id: ProjectId,
341 range: &TimeRange,
342 ) -> Result<Vec<(super::ItemId, i64)>> {
343 let time_filter = match range.interval_sql() {
344 Some(interval) => format!(" AND pv.view_date >= (CURRENT_DATE - INTERVAL '{interval}')"),
345 None => String::new(),
346 };
347
348 let sql = format!(
349 r#"
350 SELECT i.id, COALESCE(SUM(pv.view_count), 0)::BIGINT
351 FROM items i
352 LEFT JOIN page_view_daily pv
353 ON pv.target_type = 'item' AND pv.target_id = i.id{time_filter}
354 WHERE i.project_id = $1
355 GROUP BY i.id
356 "#
357 );
358
359 let rows: Vec<(super::ItemId, i64)> = sqlx::query_as(&sql)
360 .bind(project_id)
361 .fetch_all(pool)
362 .await?;
363
364 Ok(rows)
365 }
366
367 /// Delete page view rows older than `retain_days`. Called by the daily scheduler.
368 pub async fn prune_old_views(pool: &PgPool, retain_days: i64) -> Result<u64> {
369 let result = sqlx::query(
370 "DELETE FROM page_view_daily WHERE view_date < CURRENT_DATE - $1 * INTERVAL '1 day'",
371 )
372 .bind(retain_days)
373 .execute(pool)
374 .await?;
375
376 Ok(result.rows_affected())
377 }
378