max / makenotwork
3 files changed,
+145 insertions,
-52 deletions
| @@ -133,6 +133,96 @@ pub fn record_db_pool_stats(pool: &sqlx::PgPool) { | |||
| 133 | 133 | gauge!("db_pool_connections_active").set(size - idle); | |
| 134 | 134 | } | |
| 135 | 135 | ||
| 136 | + | /// Record server-wide Postgres saturation as Prometheus gauges. Sibling of | |
| 137 | + | /// the local-pool gauges — this looks at the SHARED Postgres (MNW + MT + | |
| 138 | + | /// ad hoc clients) so a dashboard can see the global ceiling, not just our | |
| 139 | + | /// pool's share. Cheap query (single row from `pg_stat_activity`). | |
| 140 | + | /// | |
| 141 | + | /// Returns `(active_backends, max_connections)` so the caller can also fire | |
| 142 | + | /// the existing WAM alert when utilization climbs past the threshold. | |
| 143 | + | #[tracing::instrument(skip_all)] | |
| 144 | + | pub async fn record_pg_stat_activity(pool: &sqlx::PgPool) -> Option<(i64, i64)> { | |
| 145 | + | let row: Result<(i64, i64), _> = sqlx::query_as( | |
| 146 | + | "SELECT \ | |
| 147 | + | (SELECT count(*) FROM pg_stat_activity \ | |
| 148 | + | WHERE state IS NOT NULL AND backend_type = 'client backend')::bigint, \ | |
| 149 | + | current_setting('max_connections')::bigint", | |
| 150 | + | ) | |
| 151 | + | .fetch_one(pool) | |
| 152 | + | .await; | |
| 153 | + | ||
| 154 | + | match row { | |
| 155 | + | Ok((active, max_conn)) if max_conn > 0 => { | |
| 156 | + | gauge!("pg_stat_activity_active_backends").set(active as f64); | |
| 157 | + | gauge!("pg_stat_activity_max_connections").set(max_conn as f64); | |
| 158 | + | gauge!("pg_stat_activity_utilization_ratio").set(active as f64 / max_conn as f64); | |
| 159 | + | Some((active, max_conn)) | |
| 160 | + | } | |
| 161 | + | Ok(_) => None, | |
| 162 | + | Err(e) => { | |
| 163 | + | tracing::debug!(error = ?e, "pg_stat_activity gauge update failed"); | |
| 164 | + | None | |
| 165 | + | } | |
| 166 | + | } | |
| 167 | + | } | |
| 168 | + | ||
| 169 | + | /// Aggregated storage fill metrics across all paying creators. | |
| 170 | + | /// | |
| 171 | + | /// Emits three gauges so the dashboard can compute fill ratio without | |
| 172 | + | /// joining queries client-side: | |
| 173 | + | /// - `creator_storage_used_bytes_total` — sum of `users.storage_used_bytes` | |
| 174 | + | /// for users with an active creator subscription. | |
| 175 | + | /// - `creator_storage_cap_bytes_total` — sum of the corresponding tier caps. | |
| 176 | + | /// - `creator_storage_fill_ratio` — used / cap. | |
| 177 | + | /// | |
| 178 | + | /// Pricing economics assume ~20% fill; 60%+ is 3× projection and warrants | |
| 179 | + | /// re-pricing. This gauge is the canonical input for that threshold. | |
| 180 | + | #[tracing::instrument(skip_all)] | |
| 181 | + | pub async fn record_storage_fill_stats(pool: &sqlx::PgPool) { | |
| 182 | + | // Tier cap table: kept in SQL because the values rarely change and | |
| 183 | + | // mirror `CreatorTier::max_storage_bytes`. Update both sites if a tier | |
| 184 | + | // cap moves. Tiers without a creator_subscriptions row are excluded — | |
| 185 | + | // a user without an active subscription has no committed storage cap. | |
| 186 | + | let row: Result<(i64, i64), _> = sqlx::query_as( | |
| 187 | + | r#" | |
| 188 | + | WITH tier_caps(tier, cap_bytes) AS ( | |
| 189 | + | VALUES | |
| 190 | + | ('basic', 50::bigint * 1024 * 1024 * 1024), | |
| 191 | + | ('small_files', 250::bigint * 1024 * 1024 * 1024), | |
| 192 | + | ('big_files', 1024::bigint * 1024 * 1024 * 1024), | |
| 193 | + | ('everything', 5120::bigint * 1024 * 1024 * 1024) | |
| 194 | + | ) | |
| 195 | + | SELECT | |
| 196 | + | COALESCE(SUM(u.storage_used_bytes), 0)::bigint AS used, | |
| 197 | + | COALESCE(SUM(tc.cap_bytes), 0)::bigint AS cap | |
| 198 | + | FROM users u | |
| 199 | + | JOIN creator_subscriptions cs | |
| 200 | + | ON cs.user_id = u.id AND cs.status = 'active' | |
| 201 | + | JOIN tier_caps tc ON tc.tier = cs.tier | |
| 202 | + | "#, | |
| 203 | + | ) | |
| 204 | + | .fetch_one(pool) | |
| 205 | + | .await; | |
| 206 | + | ||
| 207 | + | match row { | |
| 208 | + | Ok((used, cap)) => { | |
| 209 | + | gauge!("creator_storage_used_bytes_total").set(used as f64); | |
| 210 | + | gauge!("creator_storage_cap_bytes_total").set(cap as f64); | |
| 211 | + | let ratio = if cap > 0 { used as f64 / cap as f64 } else { 0.0 }; | |
| 212 | + | gauge!("creator_storage_fill_ratio").set(ratio); | |
| 213 | + | } | |
| 214 | + | Err(e) => { | |
| 215 | + | tracing::debug!(error = ?e, "storage fill stats query failed"); | |
| 216 | + | } | |
| 217 | + | } | |
| 218 | + | } | |
| 219 | + | ||
| 220 | + | /// Emit the current `domain_cache` size as a gauge so dashboards can track | |
| 221 | + | /// cache growth + correlate with `caddy_ask_total{outcome="cache_hit"}`. | |
| 222 | + | pub fn record_domain_cache_size(size: usize) { | |
| 223 | + | gauge!("domain_cache_entries").set(size as f64); | |
| 224 | + | } | |
| 225 | + | ||
| 136 | 226 | /// Axum middleware that implements idempotency keys for POST endpoints. | |
| 137 | 227 | /// | |
| 138 | 228 | /// If the request includes an `Idempotency-Key` header and the user is |
| @@ -129,8 +129,14 @@ pub fn spawn_monitor( | |||
| 129 | 129 | ||
| 130 | 130 | let snap = run_health_check(&state).await; | |
| 131 | 131 | ||
| 132 | - | // Update Prometheus DB pool gauges on every tick | |
| 132 | + | // Update Prometheus gauges on every tick. DB pool + domain cache | |
| 133 | + | // are local and cheap; the pg_stat_activity probe is one fast | |
| 134 | + | // query against the shared Postgres. Storage fill aggregates | |
| 135 | + | // across all paying creators — also cheap (single JOIN over | |
| 136 | + | // creator_subscriptions, sub-second on any plausible row count). | |
| 133 | 137 | crate::metrics::record_db_pool_stats(&state.db); | |
| 138 | + | crate::metrics::record_domain_cache_size(state.domain_cache.len()); | |
| 139 | + | crate::metrics::record_storage_fill_stats(&state.db).await; | |
| 134 | 140 | ||
| 135 | 141 | // Log status changes. Skip the bootstrap None->Operational transition | |
| 136 | 142 | // so clean restarts don't fire a spurious "recovered" alert. | |
| @@ -257,58 +263,45 @@ pub fn spawn_monitor( | |||
| 257 | 263 | // Postgres server-wide saturation check via pg_stat_activity. This | |
| 258 | 264 | // catches cases where the *shared* Postgres (MNW + MT pools + ad hoc | |
| 259 | 265 | // clients) is approaching `max_connections`, which the local pool | |
| 260 | - | // pressure check above cannot see. | |
| 261 | - | { | |
| 262 | - | let row: Result<(i64, i64), _> = sqlx::query_as( | |
| 263 | - | "SELECT \ | |
| 264 | - | (SELECT count(*) FROM pg_stat_activity \ | |
| 265 | - | WHERE state IS NOT NULL AND backend_type = 'client backend')::bigint, \ | |
| 266 | - | current_setting('max_connections')::bigint", | |
| 267 | - | ) | |
| 268 | - | .fetch_one(&state.db) | |
| 269 | - | .await; | |
| 270 | - | ||
| 271 | - | if let Ok((active, max_conn)) = row | |
| 272 | - | && max_conn > 0 | |
| 273 | - | { | |
| 274 | - | let pct = active * 100 / max_conn; | |
| 275 | - | if pct > 80 { | |
| 276 | - | tracing::warn!( | |
| 277 | - | active, | |
| 278 | - | max_conn, | |
| 279 | - | pct, | |
| 280 | - | "Postgres pg_stat_activity saturation >80%" | |
| 266 | + | // pressure check above cannot see. The probe also emits Prometheus | |
| 267 | + | // gauges via `record_pg_stat_activity` so dashboards can graph the | |
| 268 | + | // ratio over time, not just react to the alert threshold. | |
| 269 | + | if let Some((active, max_conn)) = crate::metrics::record_pg_stat_activity(&state.db).await { | |
| 270 | + | let pct = active * 100 / max_conn; | |
| 271 | + | if pct > 80 { | |
| 272 | + | tracing::warn!( | |
| 273 | + | active, | |
| 274 | + | max_conn, | |
| 275 | + | pct, | |
| 276 | + | "Postgres pg_stat_activity saturation >80%" | |
| 277 | + | ); | |
| 278 | + | let cooldown_ok = last_pg_activity_alert_at | |
| 279 | + | .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); | |
| 280 | + | if cooldown_ok | |
| 281 | + | && let Some(ref wam) = state.wam | |
| 282 | + | { | |
| 283 | + | let title = format!( | |
| 284 | + | "Postgres saturation: {active}/{max_conn} client backends ({pct}%)" | |
| 281 | 285 | ); | |
| 282 | - | let cooldown_ok = last_pg_activity_alert_at | |
| 283 | - | .is_none_or(|t| t.elapsed().as_secs() >= constants::ALERT_COOLDOWN_SECS); | |
| 284 | - | if cooldown_ok | |
| 285 | - | && let Some(ref wam) = state.wam | |
| 286 | - | { | |
| 287 | - | let title = format!( | |
| 288 | - | "Postgres saturation: {active}/{max_conn} client backends ({pct}%)" | |
| 289 | - | ); | |
| 290 | - | let body = format!( | |
| 291 | - | "pg_stat_activity client-backend count is at {pct}% of \ | |
| 292 | - | max_connections ({active}/{max_conn}). Shared Postgres serves \ | |
| 293 | - | MNW + MT + ad hoc clients; exhaustion will fail new connections \ | |
| 294 | - | for all of them. Investigate which role/application is holding \ | |
| 295 | - | connections via:\n\n \ | |
| 296 | - | SELECT usename, application_name, state, count(*) \ | |
| 297 | - | FROM pg_stat_activity GROUP BY 1,2,3 ORDER BY 4 DESC;" | |
| 298 | - | ); | |
| 299 | - | wam.create_ticket( | |
| 300 | - | &title, | |
| 301 | - | Some(&body), | |
| 302 | - | "high", | |
| 303 | - | "pg-stat-activity-saturation", | |
| 304 | - | None, | |
| 305 | - | ) | |
| 306 | - | .await; | |
| 307 | - | last_pg_activity_alert_at = Some(Instant::now()); | |
| 308 | - | } | |
| 286 | + | let body = format!( | |
| 287 | + | "pg_stat_activity client-backend count is at {pct}% of \ | |
| 288 | + | max_connections ({active}/{max_conn}). Shared Postgres serves \ | |
| 289 | + | MNW + MT + ad hoc clients; exhaustion will fail new connections \ | |
| 290 | + | for all of them. Investigate which role/application is holding \ | |
| 291 | + | connections via:\n\n \ | |
| 292 | + | SELECT usename, application_name, state, count(*) \ | |
| 293 | + | FROM pg_stat_activity GROUP BY 1,2,3 ORDER BY 4 DESC;" | |
| 294 | + | ); | |
| 295 | + | wam.create_ticket( | |
| 296 | + | &title, | |
| 297 | + | Some(&body), | |
| 298 | + | "high", | |
| 299 | + | "pg-stat-activity-saturation", | |
| 300 | + | None, | |
| 301 | + | ) | |
| 302 | + | .await; | |
| 303 | + | last_pg_activity_alert_at = Some(Instant::now()); | |
| 309 | 304 | } | |
| 310 | - | } else if let Err(e) = row { | |
| 311 | - | tracing::debug!(error = ?e, "pg_stat_activity probe failed"); | |
| 312 | 305 | } | |
| 313 | 306 | } | |
| 314 | 307 |
| @@ -176,6 +176,7 @@ pub(super) async fn caddy_ask( | |||
| 176 | 176 | State(state): State<AppState>, | |
| 177 | 177 | Query(q): Query<CaddyAskQuery>, | |
| 178 | 178 | ) -> impl IntoResponse { | |
| 179 | + | use metrics::{counter, gauge}; | |
| 179 | 180 | let domain = q.domain.to_lowercase(); | |
| 180 | 181 | ||
| 181 | 182 | // Cheap structural reject: hostnames with no dot, spaces, control chars, | |
| @@ -186,11 +187,13 @@ pub(super) async fn caddy_ask( | |||
| 186 | 187 | || !domain.contains('.') | |
| 187 | 188 | || domain.contains(|c: char| c.is_whitespace() || c.is_control()) | |
| 188 | 189 | { | |
| 190 | + | counter!("caddy_ask_total", "outcome" => "rejected_invalid").increment(1); | |
| 189 | 191 | return axum::http::StatusCode::NOT_FOUND; | |
| 190 | 192 | } | |
| 191 | 193 | ||
| 192 | 194 | // Fast path: cache hit, no DB, no semaphore. | |
| 193 | 195 | if state.domain_cache.contains_key(&domain) { | |
| 196 | + | counter!("caddy_ask_total", "outcome" => "cache_hit").increment(1); | |
| 194 | 197 | return axum::http::StatusCode::OK; | |
| 195 | 198 | } | |
| 196 | 199 | ||
| @@ -199,15 +202,22 @@ pub(super) async fn caddy_ask( | |||
| 199 | 202 | // retry rather than queue. | |
| 200 | 203 | let Ok(_permit) = state.caddy_ask_semaphore.try_acquire() else { | |
| 201 | 204 | tracing::warn!(domain = %domain, "caddy-ask: cache-miss concurrency cap reached"); | |
| 205 | + | counter!("caddy_ask_total", "outcome" => "rejected_at_cap").increment(1); | |
| 202 | 206 | return axum::http::StatusCode::SERVICE_UNAVAILABLE; | |
| 203 | 207 | }; | |
| 204 | 208 | ||
| 205 | 209 | match db::custom_domains::get_verified_domain(&state.db, &domain).await { | |
| 206 | 210 | Ok(Some(d)) => { | |
| 207 | 211 | state.domain_cache.insert(d.domain, d.user_id); | |
| 212 | + | // Update cache-size gauge after the insert so it reflects current state. | |
| 213 | + | gauge!("domain_cache_entries").set(state.domain_cache.len() as f64); | |
| 214 | + | counter!("caddy_ask_total", "outcome" => "miss_found").increment(1); | |
| 208 | 215 | axum::http::StatusCode::OK | |
| 209 | 216 | } | |
| 210 | - | _ => axum::http::StatusCode::NOT_FOUND, | |
| 217 | + | _ => { | |
| 218 | + | counter!("caddy_ask_total", "outcome" => "miss_notfound").increment(1); | |
| 219 | + | axum::http::StatusCode::NOT_FOUND | |
| 220 | + | } | |
| 211 | 221 | } | |
| 212 | 222 | } | |
| 213 | 223 |