Skip to main content

max / makenotwork

Ultra-fuzz Run #1 remediation: all axes to A-; bump 0.4.0 Adversarial multi-axis audit (Community, Storage, UX, Security, Performance) surfaced 15 serious findings; this remediates all of them. - Storage: authorize image serving (community access check); drop reply_count denorm (migration 027) and compute it live from non-removed posts; delete S3 objects on image removal; atomic create_thread_with_op. - Security: rate-limit /auth/* and the internal API; consume OAuth state/PKCE verifier before validation; HMAC over raw bytes + asymmetric replay window; connect-time SSRF resolver dropping private IPs; remove /auth/ CSRF exemption; default bind to 127.0.0.1. - UX: form-failure no longer dead-ends (mt.js surfaces the error and keeps input); fix negative OFFSET on ?page=0. - Performance: size the DB pool (shared with sessions); DefaultBodyLimit on uploads; fan out thread-view fetches with try_join!; bound background link-preview spawns. reply_count is now computed live (denorm column dropped) and thread+OP creation is transactional. 353 tests green, clippy clean. Token-at-rest (S13) and HMAC method+path+nonce remain cross-repo/post-launch (see oauth-rp-refresh.md). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-15 19:13 UTC
Commit: 9e7cbdec649fe94c62b625bba67473cffa3eb287
Parent: c221672
32 files changed, +496 insertions, -189 deletions
@@ -5,7 +5,11 @@ OAUTH_CLIENT_ID=your-oauth-client-id
5 5 # Optional (defaults shown)
6 6 MNW_BASE_URL=http://127.0.0.1:3000
7 7 OAUTH_REDIRECT_URI=http://127.0.0.1:3400/auth/callback
8 - HOST=0.0.0.0
8 + # Bind loopback by default. Rate limiting trusts X-Forwarded-For, which is only
9 + # safe behind a reverse proxy that OVERWRITES that header (Caddy must set, not
10 + # append, X-Forwarded-For / X-Real-IP). Only set 0.0.0.0 if the port is meant to
11 + # be directly reachable (e.g. a tailnet-direct staging box).
12 + HOST=127.0.0.1
9 13 PORT=3400
10 14 COOKIE_SECURE=true
11 15 RUST_LOG=info
@@ -1080,7 +1080,7 @@ dependencies = [
1080 1080
1081 1081 [[package]]
1082 1082 name = "docengine"
1083 - version = "0.3.4"
1083 + version = "0.3.5"
1084 1084 dependencies = [
1085 1085 "ammonia",
1086 1086 "pulldown-cmark",
@@ -2189,7 +2189,7 @@ dependencies = [
2189 2189
2190 2190 [[package]]
2191 2191 name = "mt-core"
2192 - version = "0.3.5"
2192 + version = "0.4.0"
2193 2193 dependencies = [
2194 2194 "chrono",
2195 2195 "serde",
@@ -2198,7 +2198,7 @@ dependencies = [
2198 2198
2199 2199 [[package]]
2200 2200 name = "mt-db"
2201 - version = "0.3.5"
2201 + version = "0.4.0"
2202 2202 dependencies = [
2203 2203 "chrono",
2204 2204 "mt-core",
@@ -2227,7 +2227,7 @@ dependencies = [
2227 2227
2228 2228 [[package]]
2229 2229 name = "multithreaded"
2230 - version = "0.3.5"
2230 + version = "0.4.0"
2231 2231 dependencies = [
2232 2232 "askama",
2233 2233 "axum",
@@ -7,7 +7,7 @@ members = [
7 7 default-members = ["."]
8 8
9 9 [workspace.package]
10 - version = "0.3.5"
10 + version = "0.4.0"
11 11 edition = "2024"
12 12 license-file = "LICENSE"
13 13
@@ -71,16 +71,25 @@ pub async fn ensure_membership_with_role(
71 71 Ok(())
72 72 }
73 73
74 - /// Create a thread with an external reference. Returns the thread ID.
74 + /// Atomically create an externally-referenced thread and its opening post in
75 + /// one transaction. Returns `(thread_id, op_post_id)`.
76 + ///
77 + /// The atomicity matters most here: the MNW→MT internal path retries, and a
78 + /// committed-but-post-less thread would make the retry collide with the UNIQUE
79 + /// `external_ref` index and 500 while leaking an empty thread.
75 80 #[tracing::instrument(skip_all)]
76 - pub async fn create_thread_with_external_ref(
81 + pub async fn create_thread_with_op_external_ref(
77 82 pool: &PgPool,
78 83 category_id: Uuid,
79 84 author_id: Uuid,
80 85 title: &str,
81 86 external_ref: &str,
82 - ) -> Result<Uuid, sqlx::Error> {
83 - let row: (Uuid,) = sqlx::query_as(
87 + body_markdown: &str,
88 + body_html: &str,
89 + ) -> Result<(Uuid, Uuid), sqlx::Error> {
90 + let mut tx = pool.begin().await?;
91 +
92 + let thread: (Uuid,) = sqlx::query_as(
84 93 "INSERT INTO threads (category_id, author_id, title, external_ref)
85 94 VALUES ($1, $2, $3, $4)
86 95 RETURNING id",
@@ -89,9 +98,23 @@ pub async fn create_thread_with_external_ref(
89 98 .bind(author_id)
90 99 .bind(title)
91 100 .bind(external_ref)
92 - .fetch_one(pool)
101 + .fetch_one(&mut *tx)
93 102 .await?;
94 - Ok(row.0)
103 +
104 + let post: (Uuid,) = sqlx::query_as(
105 + "INSERT INTO posts (thread_id, author_id, body_markdown, body_html)
106 + VALUES ($1, $2, $3, $4)
107 + RETURNING id",
108 + )
109 + .bind(thread.0)
110 + .bind(author_id)
111 + .bind(body_markdown)
112 + .bind(body_html)
113 + .fetch_one(&mut *tx)
114 + .await?;
115 +
116 + tx.commit().await?;
117 + Ok((thread.0, post.0))
95 118 }
96 119
97 120 /// Ensure a user has a membership in a community. Creates a 'member' role if none exists.
@@ -113,7 +136,12 @@ pub async fn ensure_membership(
113 136 Ok(())
114 137 }
115 138
116 - /// Insert a new thread and return its ID.
139 + /// Low-level: insert a bare thread row (no opening post). Returns the thread ID.
140 + ///
141 + /// Handlers must NOT pair this with a separate `create_post` for the OP — that
142 + /// non-atomic sequence can leak a post-less thread. Use [`create_thread_with_op`]
143 + /// for the real "start a thread" operation. This primitive exists for tests and
144 + /// data construction that build posts explicitly.
117 145 #[tracing::instrument(skip_all)]
118 146 pub async fn create_thread(
119 147 pool: &PgPool,
@@ -134,9 +162,54 @@ pub async fn create_thread(
134 162 Ok(row.0)
135 163 }
136 164
137 - /// Insert a new post and bump the thread's last_activity_at atomically.
138 - /// When `is_reply` is true, also increments the denormalized reply_count.
139 - /// Pass false for the opening post (OP), true for all subsequent replies.
165 + /// Atomically create a thread and its opening post in one transaction.
166 + ///
167 + /// Returns `(thread_id, op_post_id)`. Creating the thread and OP separately
168 + /// could leave a titled, post-less thread if the second insert failed; this
169 + /// makes that state unreachable. `threads.last_activity_at` defaults to now().
170 + #[tracing::instrument(skip_all)]
171 + pub async fn create_thread_with_op(
172 + pool: &PgPool,
173 + category_id: Uuid,
174 + author_id: Uuid,
175 + title: &str,
176 + body_markdown: &str,
177 + body_html: &str,
178 + ) -> Result<(Uuid, Uuid), sqlx::Error> {
179 + let mut tx = pool.begin().await?;
180 +
181 + let thread: (Uuid,) = sqlx::query_as(
182 + "INSERT INTO threads (category_id, author_id, title)
183 + VALUES ($1, $2, $3)
184 + RETURNING id",
185 + )
186 + .bind(category_id)
187 + .bind(author_id)
188 + .bind(title)
189 + .fetch_one(&mut *tx)
190 + .await?;
191 +
192 + let post: (Uuid,) = sqlx::query_as(
193 + "INSERT INTO posts (thread_id, author_id, body_markdown, body_html)
194 + VALUES ($1, $2, $3, $4)
195 + RETURNING id",
196 + )
197 + .bind(thread.0)
198 + .bind(author_id)
199 + .bind(body_markdown)
200 + .bind(body_html)
201 + .fetch_one(&mut *tx)
202 + .await?;
203 +
204 + tx.commit().await?;
205 + Ok((thread.0, post.0))
206 + }
207 +
208 + /// Insert a reply and bump the thread's last_activity_at atomically.
209 + ///
210 + /// Opening posts are created with the thread via [`create_thread_with_op`];
211 + /// this is for replies only. Reply counts are computed live from non-removed
212 + /// posts at read time (no denormalized counter to maintain).
140 213 #[tracing::instrument(skip_all)]
141 214 pub async fn create_post(
142 215 pool: &PgPool,
@@ -144,7 +217,6 @@ pub async fn create_post(
144 217 author_id: Uuid,
145 218 body_markdown: &str,
146 219 body_html: &str,
147 - is_reply: bool,
148 220 ) -> Result<Uuid, sqlx::Error> {
149 221 let mut tx = pool.begin().await?;
150 222
@@ -160,19 +232,10 @@ pub async fn create_post(
160 232 .fetch_one(&mut *tx)
161 233 .await?;
162 234
163 - if is_reply {
164 - sqlx::query(
165 - "UPDATE threads SET last_activity_at = now(), reply_count = reply_count + 1 WHERE id = $1",
166 - )
235 + sqlx::query("UPDATE threads SET last_activity_at = now() WHERE id = $1")
167 236 .bind(thread_id)
168 237 .execute(&mut *tx)
169 238 .await?;
170 - } else {
171 - sqlx::query("UPDATE threads SET last_activity_at = now() WHERE id = $1")
172 - .bind(thread_id)
173 - .execute(&mut *tx)
174 - .await?;
175 - }
176 239
177 240 tx.commit().await?;
178 241 Ok(row.0)
@@ -262,6 +262,20 @@ pub async fn get_community_by_slug(
262 262 }
263 263
264 264 #[tracing::instrument(skip_all)]
265 + pub async fn get_community_by_id(
266 + pool: &PgPool,
267 + id: Uuid,
268 + ) -> Result<Option<CommunityRow>, sqlx::Error> {
269 + sqlx::query_as::<_, CommunityRow>(
270 + "SELECT id, name, slug, description, suspended_at, auto_hide_threshold, state
271 + FROM communities WHERE id = $1",
272 + )
273 + .bind(id)
274 + .fetch_optional(pool)
275 + .await
276 + }
277 +
278 + #[tracing::instrument(skip_all)]
265 279 pub async fn list_categories_with_counts(
266 280 pool: &PgPool,
267 281 community_slug: &str,
@@ -311,7 +325,8 @@ pub async fn list_threads_in_category_paginated(
311 325 "SELECT t.id, t.title,
312 326 COALESCE(u.display_name, u.username) AS author_name,
313 327 u.username AS author_username,
314 - t.reply_count::BIGINT AS reply_count,
328 + GREATEST((SELECT COUNT(*) FROM posts p
329 + WHERE p.thread_id = t.id AND p.removed_at IS NULL) - 1, 0)::BIGINT AS reply_count,
315 330 t.last_activity_at,
316 331 t.pinned, t.locked
317 332 FROM threads t
@@ -353,7 +368,8 @@ pub async fn list_threads_in_category_sorted(
353 368 "SELECT t.id, t.title,
354 369 COALESCE(u.display_name, u.username) AS author_name,
355 370 u.username AS author_username,
356 - t.reply_count::BIGINT AS reply_count,
371 + GREATEST((SELECT COUNT(*) FROM posts p
372 + WHERE p.thread_id = t.id AND p.removed_at IS NULL) - 1, 0)::BIGINT AS reply_count,
357 373 t.last_activity_at,
358 374 t.pinned, t.locked
359 375 FROM threads t
@@ -1293,7 +1309,8 @@ pub async fn list_threads_in_category_sorted_filtered(
1293 1309 "SELECT t.id, t.title,
1294 1310 COALESCE(u.display_name, u.username) AS author_name,
1295 1311 u.username AS author_username,
1296 - t.reply_count::BIGINT AS reply_count,
1312 + GREATEST((SELECT COUNT(*) FROM posts p
1313 + WHERE p.thread_id = t.id AND p.removed_at IS NULL) - 1, 0)::BIGINT AS reply_count,
1297 1314 t.last_activity_at,
1298 1315 t.pinned, t.locked
1299 1316 FROM threads t
@@ -1517,19 +1534,21 @@ pub async fn list_endorsements_for_posts(
1517 1534 #[derive(sqlx::FromRow)]
1518 1535 pub struct ImageRow {
1519 1536 pub id: Uuid,
1537 + pub community_id: Uuid,
1520 1538 pub s3_key: String,
1521 1539 pub content_type: String,
1522 1540 pub removed_at: Option<DateTime<Utc>>,
1523 1541 }
1524 1542
1525 - /// Fetch an image by ID (for serving).
1543 + /// Fetch an image by ID (for serving). Carries `community_id` so the serve
1544 + /// handler can enforce the owning community's access policy.
1526 1545 #[tracing::instrument(skip_all)]
1527 1546 pub async fn get_image(
1528 1547 pool: &PgPool,
1529 1548 image_id: Uuid,
1530 1549 ) -> Result<Option<ImageRow>, sqlx::Error> {
1531 1550 sqlx::query_as::<_, ImageRow>(
1532 - "SELECT id, s3_key, content_type, removed_at FROM images WHERE id = $1",
1551 + "SELECT id, community_id, s3_key, content_type, removed_at FROM images WHERE id = $1",
1533 1552 )
1534 1553 .bind(image_id)
1535 1554 .fetch_optional(pool)
@@ -0,0 +1,7 @@
1 + -- Drop the denormalized reply_count (added in 022). It was incremented on reply
2 + -- but never decremented on mod-remove/soft-delete, so it drifted upward
3 + -- permanently and the >= 0 CHECK blocked the naive decrement fix. Reply counts
4 + -- are now computed live at read time from non-removed posts
5 + -- (GREATEST(COUNT(*) FILTER (WHERE removed_at IS NULL) - 1, 0)), which cannot
6 + -- drift. Dropping the column also drops the reply_count_non_negative CHECK.
7 + ALTER TABLE threads DROP COLUMN reply_count;
@@ -355,14 +355,25 @@ pub async fn callback(
355 355 ) -> impl IntoResponse {
356 356 tracing::info!("OAuth callback received");
357 357
358 - // Verify state nonce
358 + // Read and immediately consume the one-time OAuth params. Removing them up
359 + // front makes both single-use, so a failed state check — or a replayed
360 + // callback — cannot leave a reusable PKCE verifier behind in the session.
359 361 let stored_state: Option<String> = session.get(SESSION_OAUTH_STATE).await.unwrap_or(None);
362 + let stored_verifier: Option<String> = session.get(SESSION_PKCE_VERIFIER).await.unwrap_or(None);
363 + if let Err(e) = session.remove::<String>(SESSION_OAUTH_STATE).await {
364 + tracing::warn!(error = %e, "failed to remove OAuth state from session");
365 + }
366 + if let Err(e) = session.remove::<String>(SESSION_PKCE_VERIFIER).await {
367 + tracing::warn!(error = %e, "failed to remove PKCE verifier from session");
368 + }
369 +
370 + // Verify state nonce
360 371 if stored_state.as_deref() != Some(&params.state) {
361 372 tracing::warn!(stored = ?stored_state, received = %params.state, "state mismatch");
362 373 return Redirect::to("/?error=state_mismatch");
363 374 }
364 375
365 - let verifier: String = match session.get(SESSION_PKCE_VERIFIER).await.unwrap_or(None) {
376 + let verifier: String = match stored_verifier {
366 377 Some(v) => v,
367 378 None => {
368 379 tracing::warn!("missing PKCE verifier in session");
@@ -370,14 +381,6 @@ pub async fn callback(
370 381 }
371 382 };
372 383
373 - // Clean up OAuth session data
374 - if let Err(e) = session.remove::<String>(SESSION_OAUTH_STATE).await {
375 - tracing::warn!(error = %e, "failed to remove OAuth state from session");
376 - }
377 - if let Err(e) = session.remove::<String>(SESSION_PKCE_VERIFIER).await {
378 - tracing::warn!(error = %e, "failed to remove PKCE verifier from session");
379 - }
380 -
381 384 // Exchange code for token (retry up to 2 attempts on network/5xx errors)
382 385 let token_url = format!("{}/oauth/token", state.config.mnw_base_url);
383 386 tracing::info!(%token_url, "exchanging code for token");
@@ -45,7 +45,11 @@ pub fn constant_time_compare(a: &str, b: &str) -> bool {
45 45
46 46 /// Middleware: validate X-CSRF-Token header on POST/PUT/PATCH/DELETE.
47 47 ///
48 - /// Exempt paths: `/auth/`, `/api/health`, `/_test/`.
48 + /// `/auth/` is deliberately NOT exempt: its only mutating routes (`logout`,
49 + /// `refresh`) are same-origin forms that carry the token via mt.js, so they
50 + /// get CSRF protection like everything else. `login`/`callback` are GET and so
51 + /// never reach this check. `/_test/` is only ever mounted by the integration
52 + /// harness (never in production); `/api/health` is GET-only.
49 53 pub async fn csrf_middleware(request: Request, next: Next) -> Response {
50 54 let method = request.method().clone();
51 55
@@ -55,7 +59,7 @@ pub async fn csrf_middleware(request: Request, next: Next) -> Response {
55 59
56 60 let path = request.uri().path().to_string();
57 61
58 - let exempt_prefixes = ["/auth/", "/api/health", "/_test/"];
62 + let exempt_prefixes = ["/api/health", "/_test/"];
59 63 if exempt_prefixes.iter().any(|p| path.starts_with(p)) {
60 64 return next.run(request).await;
61 65 }
@@ -18,6 +18,12 @@ use crate::AppState;
18 18 /// Maximum age (in seconds) for an internal request timestamp before it's rejected.
19 19 const MAX_TIMESTAMP_AGE_SECS: i64 = 60;
20 20
21 + /// Maximum tolerated clock skew into the future. The window was previously
22 + /// symmetric (±60s), so a captured signature was replayable across a ~120s
23 + /// band; only a few seconds of skew are legitimate, so future timestamps are
24 + /// held to a tight bound, roughly halving the replay window.
25 + const MAX_FUTURE_SKEW_SECS: i64 = 5;
26 +
21 27 /// Axum extractor that validates HMAC-SHA256 signatures on internal API requests.
22 28 /// Extracts the raw request body as `Bytes` after successful verification.
23 29 pub struct InternalAuth(pub Bytes);
@@ -67,10 +73,16 @@ impl FromRequest<AppState> for InternalAuth {
67 73 /// Compute the hex-encoded HMAC-SHA256 signature for an internal request.
68 74 /// `secret` may be any length (HMAC-SHA256 accepts any key length).
69 75 pub(crate) fn compute_internal_signature(secret: &str, timestamp_str: &str, body: &[u8]) -> String {
70 - let message = format!("{}\n{}", timestamp_str, std::str::from_utf8(body).unwrap_or(""));
76 + // MAC over raw bytes (`timestamp\n` ++ body), not a lossy UTF-8 string. For
77 + // valid-UTF-8 bodies (all our JSON) this is byte-identical to the old
78 + // `format!`-based message, so it stays wire-compatible with MNW's signer;
79 + // it also closes the latent hole where two distinct non-UTF-8 bodies both
80 + // collapsed to the empty string and signed identically.
71 81 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
72 82 .expect("HMAC-SHA256 accepts any key length");
73 - mac.update(message.as_bytes());
83 + mac.update(timestamp_str.as_bytes());
84 + mac.update(b"\n");
85 + mac.update(body);
74 86 hex::encode(mac.finalize().into_bytes())
75 87 }
76 88
@@ -95,8 +107,11 @@ pub(crate) fn verify_internal_signature(
95 107 .parse()
96 108 .map_err(|_| (StatusCode::UNAUTHORIZED, "Invalid timestamp"))?;
97 109
98 - if (now_unix - timestamp).abs() > MAX_TIMESTAMP_AGE_SECS {
99 - return Err((StatusCode::UNAUTHORIZED, "Timestamp too old or too far in the future"));
110 + if now_unix - timestamp > MAX_TIMESTAMP_AGE_SECS {
111 + return Err((StatusCode::UNAUTHORIZED, "Timestamp too old"));
112 + }
113 + if timestamp - now_unix > MAX_FUTURE_SKEW_SECS {
114 + return Err((StatusCode::UNAUTHORIZED, "Timestamp too far in the future"));
100 115 }
101 116
102 117 let expected = compute_internal_signature(secret, timestamp_str, body);
@@ -274,22 +289,22 @@ mod tests {
274 289
275 290 #[test]
276 291 fn verify_at_window_boundary_accepts_inside_rejects_outside() {
277 - // Pins `(now - timestamp).abs() > MAX_TIMESTAMP_AGE_SECS` (60s).
278 - // Exactly at the boundary (abs diff == 60) must be accepted (since `>`
279 - // is strict). One second past must be rejected.
292 + // Asymmetric window: up to MAX_TIMESTAMP_AGE_SECS (60s) old, but only
293 + // MAX_FUTURE_SKEW_SECS (5s) into the future. `>` is strict, so exactly
294 + // at each boundary is accepted.
280 295 let secret = "s";
281 296 let body = b"abc";
282 297 let ts = "1000";
283 298 let sig = valid(secret, ts, body);
284 299
285 - // diff = 60 → accepted
300 + // now - ts = 60 → accepted (at the age boundary)
286 301 assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 1060).is_ok());
287 - // diff = 61 → rejected (too old)
302 + // now - ts = 61 → rejected (too old)
288 303 assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 1061).is_err());
289 - // diff = -60 → accepted
290 - assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 940).is_ok());
291 - // diff = -61 → rejected (too far in future)
292 - assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 939).is_err());
304 + // ts - now = 5 → accepted (at the future-skew boundary)
305 + assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 995).is_ok());
306 + // ts - now = 6 → rejected (too far in the future)
307 + assert!(verify_internal_signature(secret, Some(ts), Some(&sig), body, 994).is_err());
293 308 }
294 309
295 310 #[test]
@@ -30,8 +30,11 @@ fn is_private_ip(ip: std::net::IpAddr) -> bool {
30 30 }
31 31 }
32 32
33 - /// Validate that a URL is safe to fetch (no SSRF to internal networks).
34 - /// Resolves the hostname to catch alternative IP encodings (octal, hex, decimal, IPv6-mapped).
33 + /// Cheap, synchronous pre-check that a URL is safe to fetch: http(s) scheme and
34 + /// not an obvious private/literal-IP target. Hostnames are NOT resolved here —
35 + /// that (and the authoritative anti-rebinding check) happens at connect time in
36 + /// [`SsrfSafeResolver`], so this stays non-blocking and usable from reqwest's
37 + /// synchronous redirect callback.
35 38 fn validate_url(url: &str) -> bool {
36 39 let lower = url.to_ascii_lowercase();
37 40 if !lower.starts_with("http://") && !lower.starts_with("https://") {
@@ -64,15 +67,10 @@ fn validate_url(url: &str) -> bool {
64 67 return !is_private_ip(ip);
65 68 }
66 69
67 - // For hostnames, resolve and check all addresses
68 - if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&(bare_host, 80)) {
69 - for addr in addrs {
70 - if is_private_ip(addr.ip()) {
71 - return false;
72 - }
73 - }
74 - }
75 -
70 + // Hostnames are resolved and re-checked at connect time by SsrfSafeResolver,
71 + // which is what reqwest actually connects through — so a name that resolves
72 + // to a private address (including via DNS rebinding after this check) can
73 + // never be connected to.
76 74 true
77 75 }
78 76
@@ -100,9 +98,37 @@ pub fn extract_urls(input: &str) -> Vec<String> {
100 98 urls
101 99 }
102 100
103 - /// Build a reqwest client for link preview fetching with SSRF-safe redirect policy.
101 + /// Connect-time DNS resolver that drops private/reserved addresses.
102 + ///
103 + /// `validate_url` resolves and checks during validation, but reqwest re-resolves
104 + /// when it actually connects — a DNS-rebinding host could pass validation and
105 + /// then connect internally. Filtering here, at the resolver reqwest uses to
106 + /// connect, closes that TOCTOU: reqwest can only connect to an address this
107 + /// resolver returned, and we never return a private one. Resolution is async
108 + /// (`tokio::net::lookup_host`), so it also avoids blocking the runtime.
109 + struct SsrfSafeResolver;
110 +
111 + impl reqwest::dns::Resolve for SsrfSafeResolver {
112 + fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving {
113 + Box::pin(async move {
114 + let host = name.as_str().to_string();
115 + let addrs = tokio::net::lookup_host((host.as_str(), 0)).await?;
116 + let public: Vec<std::net::SocketAddr> =
117 + addrs.filter(|a| !is_private_ip(a.ip())).collect();
118 + if public.is_empty() {
119 + return Err("no public address for host (SSRF guard)".into());
120 + }
121 + let iter: reqwest::dns::Addrs = Box::new(public.into_iter());
122 + Ok(iter)
123 + })
124 + }
125 + }
126 +
127 + /// Build a reqwest client for link preview fetching with SSRF-safe redirect
128 + /// policy and a connect-time resolver that refuses private addresses.
104 129 pub fn build_preview_client() -> reqwest::Client {
105 130 reqwest::Client::builder()
131 + .dns_resolver(std::sync::Arc::new(SsrfSafeResolver))
106 132 .redirect(reqwest::redirect::Policy::custom(|attempt| {
107 133 if !validate_url(attempt.url().as_str()) || attempt.previous().len() >= 5 {
108 134 attempt.stop()
@@ -1,5 +1,5 @@
1 1 use multithreaded::{config::Config, csrf, AppState};
2 - use sqlx::PgPool;
2 + use sqlx::postgres::PgPoolOptions;
3 3 use tokio::net::TcpListener;
4 4 use tower_http::services::ServeDir;
5 5 use tower_sessions::SessionManagerLayer;
@@ -19,7 +19,14 @@ async fn main() {
19 19 let database_url = std::env::var("DATABASE_URL")
20 20 .expect("DATABASE_URL must be set");
21 21
22 - let pool = PgPool::connect(&database_url)
22 + // Explicit pool sizing. The sqlx default is 10 connections, and this pool
23 + // is shared with the tower-sessions PostgresStore (every authed request
24 + // does session I/O on top of its handler queries), so the default is tight.
25 + // Bound acquisition so a burst fails fast rather than hanging for 30s.
26 + let pool = PgPoolOptions::new()
27 + .max_connections(20)
28 + .acquire_timeout(std::time::Duration::from_secs(10))
29 + .connect(&database_url)
23 30 .await
24 31 .expect("failed to connect to database");
25 32
@@ -113,7 +120,13 @@ async fn main() {
113 120 .merge(multithreaded::routes::internal::internal_routes(state))
114 121 .nest_service("/static", ServeDir::new("static"));
115 122
116 - let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
123 + // Default to loopback. Rate limiting uses SmartIpKeyExtractor, which trusts
124 + // X-Forwarded-For — that is only safe behind a reverse proxy (Caddy) that
125 + // *overwrites* the header. Binding to 127.0.0.1 by default keeps the app
126 + // unreachable except via that proxy, so XFF can't be spoofed by a direct
127 + // client. Environments that intentionally expose the port (e.g. a
128 + // tailnet-direct staging box) set HOST explicitly.
129 + let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
117 130 let port = std::env::var("PORT").unwrap_or_else(|_| "3400".to_string());
118 131 let addr = format!("{host}:{port}");
119 132
@@ -53,7 +53,6 @@ pub(super) async fn account_settings(
53 53 fan_plus: user.perks.fan_plus,
54 54 signature_markdown,
55 55 signature_html,
56 - error: None,
57 56 })
58 57 }
59 58
@@ -55,8 +55,14 @@ pub(super) async fn flag_post_handler(
55 55 return Err((StatusCode::FORBIDDEN, "You cannot flag your own post.").into_response());
56 56 }
57 57
58 - // Check community access
58 + // Check community access. Authorize against the post's *own* community —
59 + // not just the URL slug — so a user banned in the post's community can't
60 + // route the flag through a different slug to evade the ban check and apply
61 + // the wrong community's auto-hide threshold.
59 62 let community = get_community(&state.db, &slug).await?;
63 + if post_data.community_id != community.id {
64 + return Err((StatusCode::NOT_FOUND, "Not found").into_response());
65 + }
60 66 check_community_access(&state.db, &community, Some(user.user_id)).await?;
61 67
62 68 let detail = form.detail.as_deref().filter(|d| !d.trim().is_empty());
@@ -190,8 +190,18 @@ pub(super) async fn resolve_and_render_mentions(
190 190
191 191 /// Spawn link preview fetching as a detached background task.
192 192 /// The HTTP response returns immediately; previews appear asynchronously.
193 + /// Caps concurrent background link-preview fetches process-wide. Without it, a
194 + /// burst of posts spawns unbounded outbound HTTP fetchers, each acquiring a DB
195 + /// connection to store its results — a pool-exhaustion vector. Excess fetches
196 + /// queue on the permit instead of running at once.
197 + static LINK_PREVIEW_FETCHES: std::sync::LazyLock<tokio::sync::Semaphore> =
198 + std::sync::LazyLock::new(|| tokio::sync::Semaphore::new(8));
199 +
193 200 fn spawn_link_preview_fetch(state: AppState, body: String, post_id: Uuid) {
194 201 tokio::spawn(async move {
202 + let Ok(_permit) = LINK_PREVIEW_FETCHES.acquire().await else {
203 + return; // semaphore closed (shutdown) — skip
204 + };
195 205 fetch_and_store_link_previews(&state, &body, post_id).await;
196 206 });
197 207 }
@@ -270,19 +280,13 @@ pub(in crate::routes) async fn create_thread_handler(
270 280 &state.db, body, community.id, &slug, user.user_id, author_plus,
271 281 ).await?;
272 282
273 - let thread_id = mt_db::mutations::create_thread(&state.db, category_id, user.user_id, title)
274 - .await
275 - .map_err(|e| {
276 - tracing::error!(error = ?e, "db error creating thread");
277 - StatusCode::INTERNAL_SERVER_ERROR.into_response()
278 - })?;
279 -
280 - let post_id = mt_db::mutations::create_post(&state.db, thread_id, user.user_id, body, &body_html, false)
281 - .await
282 - .map_err(|e| {
283 - tracing::error!(error = ?e, "db error creating post");
284 - StatusCode::INTERNAL_SERVER_ERROR.into_response()
285 - })?;
283 + let (thread_id, post_id) =
284 + mt_db::mutations::create_thread_with_op(&state.db, category_id, user.user_id, title, body, &body_html)
285 + .await
286 + .map_err(|e| {
287 + tracing::error!(error = ?e, "db error creating thread");
288 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
289 + })?;
286 290
287 291 if !mention_ids.is_empty() {
288 292 mt_db::mutations::insert_mentions(&state.db, post_id, &mention_ids)
@@ -358,7 +362,7 @@ pub(in crate::routes) async fn create_reply_handler(
358 362 ).await?;
359 363
360 364 let thread_id = parse_uuid(&thread_id_str)?;
361 - let post_id = mt_db::mutations::create_post(&state.db, thread_id, user.user_id, body, &body_html, true)
365 + let post_id = mt_db::mutations::create_post(&state.db, thread_id, user.user_id, body, &body_html)
362 366 .await
363 367 .map_err(|e| {
364 368 tracing::error!(error = ?e, "db error creating reply");
@@ -74,21 +74,36 @@ pub(in crate::routes) async fn thread(
74 74 false
75 75 };
76 76
77 - // Batch-fetch footnotes and endorsements for all posts on this page
77 + // Batch-fetch footnotes, endorsements, and link previews for all posts on
78 + // this page. They're independent (all keyed off the same post_ids), so run
79 + // them concurrently rather than as three serial round-trips.
78 80 let post_ids: Vec<uuid::Uuid> = db_posts.iter().map(|p| p.id).collect();
79 - let all_footnotes = mt_db::queries::list_footnotes_for_posts(&state.db, &post_ids)
80 - .await
81 - .map_err(|e| {
82 - tracing::error!(error = ?e, "db error fetching footnotes");
83 - StatusCode::INTERNAL_SERVER_ERROR.into_response()
84 - })?;
85 -
86 - let all_endorsements = mt_db::queries::list_endorsements_for_posts(&state.db, &post_ids)
87 - .await
88 - .map_err(|e| {
89 - tracing::error!(error = ?e, "db error fetching endorsements");
90 - StatusCode::INTERNAL_SERVER_ERROR.into_response()
91 - })?;
81 + let (all_footnotes, all_endorsements, all_link_previews) = tokio::try_join!(
82 + async {
83 + mt_db::queries::list_footnotes_for_posts(&state.db, &post_ids)
84 + .await
85 + .map_err(|e| {
86 + tracing::error!(error = ?e, "db error fetching footnotes");
87 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
88 + })
89 + },
90 + async {
91 + mt_db::queries::list_endorsements_for_posts(&state.db, &post_ids)
92 + .await
93 + .map_err(|e| {
94 + tracing::error!(error = ?e, "db error fetching endorsements");
95 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
96 + })
97 + },
98 + async {
99 + mt_db::queries::list_link_previews_for_posts(&state.db, &post_ids)
100 + .await
101 + .map_err(|e| {
102 + tracing::error!(error = ?e, "db error fetching link previews");
103 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
104 + })
105 + },
106 + )?;
92 107
93 108 // Group endorsements: counts per post + set of posts current user endorsed
94 109 let mut endorsement_counts: HashMap<String, u32> = HashMap::new();
@@ -113,14 +128,6 @@ pub(in crate::routes) async fn thread(
113 128 });
114 129 }
115 130
116 - // Batch-fetch link previews
117 - let all_link_previews = mt_db::queries::list_link_previews_for_posts(&state.db, &post_ids)
118 - .await
119 - .map_err(|e| {
120 - tracing::error!(error = ?e, "db error fetching link previews");
121 - StatusCode::INTERNAL_SERVER_ERROR.into_response()
122 - })?;
123 -
124 131 let mut link_previews_by_post: HashMap<String, Vec<LinkPreviewViewRow>> = HashMap::new();
125 132 for lp in all_link_previews {
126 133 link_previews_by_post
@@ -43,8 +43,8 @@ pub(in crate::routes) async fn forum_directory(
43 43 mt_db::queries::count_communities(&state.db).await
44 44 }
45 45 .unwrap_or(0);
46 - let pagination = Pagination::new(query.page.unwrap_or(1), total, per_page);
47 - let offset = (pagination.current_page as i64 - 1) * per_page;
46 + let pagination = Pagination::new(query.page.unwrap_or(1).max(1), total, per_page);
47 + let offset = pagination.offset(per_page);
48 48
49 49 let rows = if viewing_archived {
50 50 mt_db::queries::list_archived_communities(&state.db, per_page, offset).await
@@ -314,6 +314,7 @@ pub(crate) async fn check_user_post_rate(
314 314
315 315 /// Parse a ban duration string into an optional expiration datetime.
316 316 /// Returns `Err` for unrecognized durations to prevent accidental permanent bans.
317 + #[allow(clippy::result_large_err)]
317 318 pub(crate) fn parse_duration(duration: &str) -> Result<Option<DateTime<Utc>>, Response> {
318 319 match duration {
319 320 "permanent" => Ok(None),
@@ -9,6 +9,7 @@ use axum::{
9 9 Json, Router,
10 10 };
11 11 use serde::{Deserialize, Serialize};
12 + use tower_governor::{GovernorLayer, governor::GovernorConfigBuilder, key_extractor::SmartIpKeyExtractor};
12 13 use uuid::Uuid;
13 14
14 15 use crate::internal_auth::InternalAuth;
@@ -239,26 +240,16 @@ async fn create_thread(
239 240 .await
240 241 .map_err(db_error)?;
241 242
242 - // Create thread with external_ref
243 - let thread_id = mt_db::mutations::create_thread_with_external_ref(
243 + // Create thread + opening post atomically (external_ref retry-safe).
244 + let body_html = super::render_markdown(&req.body_markdown);
245 + let (thread_id, post_id) = mt_db::mutations::create_thread_with_op_external_ref(
244 246 &state.db,
245 247 category.id,
246 248 req.author_mnw_id,
247 249 &req.title,
248 250 &req.external_ref,
249 - )
250 - .await
251 - .map_err(db_error)?;
252 -
253 - // Create opening post
254 - let body_html = super::render_markdown(&req.body_markdown);
255 - let post_id = mt_db::mutations::create_post(
256 - &state.db,
257 - thread_id,
258 - req.author_mnw_id,
259 251 &req.body_markdown,
260 252 &body_html,
261 - false,
262 253 )
263 254 .await
264 255 .map_err(db_error)?;
@@ -353,7 +344,6 @@ async fn create_post(
353 344 req.author_mnw_id,
354 345 &req.body_markdown,
355 346 &body_html,
356 - true,
357 347 )
358 348 .await
359 349 .map_err(db_error)?;
@@ -368,12 +358,28 @@ async fn create_post(
368 358 }
369 359
370 360 /// Build the internal API router. Registered outside CSRF/session middleware.
361 + ///
362 + /// HMAC-authenticated, but still rate-limited per IP so an attacker can't flood
363 + /// it with signature-guessing attempts unbounded. The cap is generous (burst
364 + /// 60, then 20/sec) — well above legitimate MNW→MT traffic.
371 365 pub fn internal_routes(state: AppState) -> Router {
366 + let internal_rate_limit = std::sync::Arc::new(
367 + GovernorConfigBuilder::default()
368 + .key_extractor(SmartIpKeyExtractor)
369 + .per_millisecond(50)
370 + .burst_size(60)
371 + .finish()
372 + .expect("internal rate limiter config"),
373 + );
374 +
372 375 Router::new()
373 376 .route("/internal/communities", post(create_community))
374 377 .route("/internal/threads", post(create_thread))
375 378 .route("/internal/threads/{id}/posts", post(create_post))
376 379 .route("/internal/threads/{id}/stats", get(thread_stats))
380 + .route_layer(GovernorLayer {
381 + config: internal_rate_limit,
382 + })
377 383 .with_state(state)
378 384 }
379 385
@@ -42,6 +42,15 @@ const WRITE_RATE_LIMIT_BURST: u32 = 10;
42 42 const SEARCH_RATE_LIMIT_MS: u64 = 1000;
43 43 const SEARCH_RATE_LIMIT_BURST: u32 = 5;
44 44
45 + /// Upload request body cap: the image size limit plus headroom for multipart
46 + /// framing. Bounds the in-memory buffer before the handler reads the field.
47 + const MAX_UPLOAD_BODY_BYTES: usize = crate::storage::MAX_IMAGE_SIZE + 64 * 1024;
48 +
49 + /// Auth endpoints: burst 10, then 1/sec. Throttles login/callback floods and
50 + /// the `/auth/refresh` → MNW userinfo amplifier.
51 + const AUTH_RATE_LIMIT_MS: u64 = 1000;
52 + const AUTH_RATE_LIMIT_BURST: u32 = 10;
53 +
45 54 /// Build the forum route tree.
46 55 pub fn forum_routes(state: AppState) -> Router {
47 56 let write_rate_limit = std::sync::Arc::new(
@@ -87,7 +96,11 @@ pub fn forum_routes(state: AppState) -> Router {
87 96 .route("/_admin/communities/{slug}/clean-slate", post(admin::admin_community_clean_slate_handler))
88 97 .route("/_admin/users/{id}/suspend", post(admin::suspend_user_handler))
89 98 .route("/_admin/users/{id}/unsuspend", post(admin::unsuspend_user_handler))
90 - .route("/p/{slug}/upload", post(uploads::upload_image_handler))
99 + .route(
100 + "/p/{slug}/upload",
101 + post(uploads::upload_image_handler)
102 + .layer(axum::extract::DefaultBodyLimit::max(MAX_UPLOAD_BODY_BYTES)),
103 + )
91 104 .route("/p/{slug}/uploads/{id}/remove", post(uploads::remove_image_handler))
92 105 .route_layer(GovernorLayer {
93 106 config: write_rate_limit,
@@ -109,7 +122,27 @@ pub fn forum_routes(state: AppState) -> Router {
109 122 config: search_rate_limit,
110 123 });
111 124
112 - // GET routes + auth + health — no rate limiting
125 + // Auth endpoints — rate limited per IP (login/callback flood + refresh
126 + // amplifier against MNW).
127 + let auth_rate_limit = std::sync::Arc::new(
128 + GovernorConfigBuilder::default()
129 + .key_extractor(SmartIpKeyExtractor)
130 + .per_millisecond(AUTH_RATE_LIMIT_MS)
131 + .burst_size(AUTH_RATE_LIMIT_BURST)
132 + .finish()
133 + .expect("auth rate limiter config"),
134 + );
135 +
136 + let auth_routes = Router::new()
137 + .route("/auth/login", get(auth::login))
138 + .route("/auth/callback", get(auth::callback))
139 + .route("/auth/logout", post(auth::logout))
140 + .route("/auth/refresh", post(auth::refresh))
141 + .route_layer(GovernorLayer {
142 + config: auth_rate_limit,
143 + });
144 +
145 + // GET routes + health — no rate limiting
113 146 let read_routes = Router::new()
114 147 .route("/", get(forum::forum_directory))
115 148 .route("/p/{slug}", get(forum::project_forum))
@@ -128,16 +161,13 @@ pub fn forum_routes(state: AppState) -> Router {
128 161 .route("/about/tracking", get(tracking::tracking_info_page))
129 162 .route("/_admin", get(admin::admin_dashboard))
130 163 .route("/_admin/communities/{slug}", get(admin::admin_community_detail))
131 - .route("/auth/login", get(auth::login))
132 - .route("/auth/callback", get(auth::callback))
133 - .route("/auth/logout", post(auth::logout))
134 - .route("/auth/refresh", post(auth::refresh))
135 164 .route("/api/user/{user_id}/summary", get(forum::user_summary_api))
136 165 .route("/uploads/{id}", get(uploads::serve_image_handler))
137 166 .route("/api/health", get(health));
138 167
139 168 read_routes
140 169 .merge(search_routes)
170 + .merge(auth_routes)
141 171 .merge(write_routes)
142 172 .fallback(not_found_handler)
143 173 .with_state(state)
@@ -332,22 +362,6 @@ fn health_body(db_ok: bool) -> serde_json::Value {
332 362 })
333 363 }
334 364
335 - #[cfg(test)]
336 - mod health_tests {
337 - use super::health_body;
338 -
339 - /// Schema-drift guard for the `mt` target. See `shared/pom-contract/`.
340 - #[test]
341 - fn pom_hetzner_health_expectations_resolve() {
342 - let body = health_body(true);
343 - pom_contract::assert_health_expectations_resolve(
344 - "../pom/deploy/pom-hetzner.toml",
345 - "mt",
346 - &body,
347 - );
348 - }
349 - }
350 -
351 365 // ============================================================================
352 366 // 404 fallback
353 367 // ============================================================================
@@ -369,3 +383,19 @@ async fn not_found_handler(
369 383 },
370 384 )
371 385 }
386 +
387 + #[cfg(test)]
388 + mod health_tests {
389 + use super::health_body;
390 +
391 + /// Schema-drift guard for the `mt` target. See `shared/pom-contract/`.
392 + #[test]
393 + fn pom_hetzner_health_expectations_resolve() {
394 + let body = health_body(true);
395 + pom_contract::assert_health_expectations_resolve(
396 + "../pom/deploy/pom-hetzner.toml",
397 + "mt",
398 + &body,
399 + );
400 + }
401 + }
@@ -120,9 +120,15 @@ pub(super) async fn upload_image_handler(
120 120 }
121 121
122 122 /// GET /uploads/{id} — serve an uploaded image (proxied from S3).
123 + ///
124 + /// Enforces the owning community's access policy: a suspended community or a
125 + /// caller banned from it cannot read the bytes. The cache is `private` so the
126 + /// access decision is never stored in a shared cache and replayed to an
127 + /// unauthorized viewer.
123 128 #[tracing::instrument(skip_all)]
124 129 pub(super) async fn serve_image_handler(
125 130 axum::extract::State(state): axum::extract::State<AppState>,
131 + MaybeUser(session_user): MaybeUser,
126 132 Path(image_id_str): Path<String>,
127 133 ) -> Result<Response, Response> {
128 134 let image_id = super::parse_uuid(&image_id_str)?;
@@ -136,15 +142,25 @@ pub(super) async fn serve_image_handler(
136 142 })?
137 143 .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?;
138 144
139 - let s3 = state.s3.as_ref().ok_or_else(|| {
140 - StatusCode::SERVICE_UNAVAILABLE.into_response()
141 - })?;
142 -
143 145 // Don't serve removed images
144 146 if image.removed_at.is_some() {
145 147 return Err(StatusCode::GONE.into_response());
146 148 }
147 149
150 + // Enforce the owning community's access policy before serving any bytes.
151 + let community = mt_db::queries::get_community_by_id(&state.db, image.community_id)
152 + .await
153 + .map_err(|e| {
154 + tracing::error!(error = ?e, "db error fetching image community");
155 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
156 + })?
157 + .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?;
158 + check_community_access(&state.db, &community, session_user.as_ref().map(|u| u.user_id)).await?;
159 +
160 + let s3 = state.s3.as_ref().ok_or_else(|| {
161 + StatusCode::SERVICE_UNAVAILABLE.into_response()
162 + })?;
163 +
148 164 let (data, content_type) = s3.download(&image.s3_key).await.map_err(|e| {
149 165 tracing::error!(error = %e, "S3 download failed");
150 166 StatusCode::INTERNAL_SERVER_ERROR.into_response()
@@ -153,7 +169,7 @@ pub(super) async fn serve_image_handler(
153 169 Ok(Response::builder()
154 170 .status(StatusCode::OK)
155 171 .header(header::CONTENT_TYPE, content_type)
156 - .header(header::CACHE_CONTROL, "public, max-age=86400, immutable")
172 + .header(header::CACHE_CONTROL, "private, max-age=86400, immutable")
157 173 .body(Body::from(data))
158 174 .unwrap())
159 175 }
@@ -177,6 +193,20 @@ pub(super) async fn remove_image_handler(
177 193
178 194 let image_id = super::parse_uuid(&image_id_str)?;
179 195
196 + // Fetch first so we have the S3 key and can confirm the image belongs to
197 + // the community whose mod is acting (authorize against the resource, not
198 + // just the URL slug).
199 + let image = mt_db::queries::get_image(&state.db, image_id)
200 + .await
201 + .map_err(|e| {
202 + tracing::error!(error = ?e, "db error fetching image");
203 + StatusCode::INTERNAL_SERVER_ERROR.into_response()
204 + })?
205 + .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?;
206 + if image.community_id != community.id {
207 + return Err(StatusCode::NOT_FOUND.into_response());
208 + }
209 +
180 210 mt_db::mutations::remove_image(&state.db, image_id, user.user_id)
181 211 .await
182 212 .map_err(|e| {
@@ -184,6 +214,15 @@ pub(super) async fn remove_image_handler(
184 214 StatusCode::INTERNAL_SERVER_ERROR.into_response()
185 215 })?;
186 216
217 + // Delete the backing S3 object so removed images don't accumulate in the
218 + // bucket forever. Best-effort: the DB row is already marked removed (serve
219 + // returns 410), so a transient S3 failure is logged, not surfaced.
220 + if let Some(s3) = state.s3.as_ref()
221 + && let Err(e) = s3.delete(&image.s3_key).await
222 + {
223 + tracing::warn!(error = %e, s3_key = %image.s3_key, "failed to delete removed image from S3");
224 + }
225 +
187 226 // Log mod action
188 227 super::log_mod_action(
189 228 &state.db,