Skip to main content

max / makenotwork

Stream git clone packfile and gate clone concurrency

smart_http_upload_pack ran git upload-pack to completion and buffered the whole packfile into heap via Body::from(output.stdout) — concurrent clones of a large repo could OOM, and there was no per-clone permit. Stream the packfile straight from the child's stdout with Body::from_stream + ReaderStream, and add a git_smart_http_semaphore (cap 8) acquired on both info/refs and upload-pack. The owned permit lives in a reaper task that holds it until child.wait() returns, releasing it on completion or client disconnect (SIGPIPE). stdin is fully written then dropped before streaming, and --stateless-rpc drains it first, so there's no stdin/stdout deadlock; inbound stays capped by the 10 MB DefaultBodyLimit. Regression tests: git_smart_http_info_refs_advertises_refs, git_smart_http_upload_pack_streams_packfile.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Author: Max Johnson <me@maxj.phd> · 2026-06-15 22:19 UTC
Commit: 32f7f8a425d142fd98940ed3dcaf8131a2f72758
Parent: e218483
9 files changed, +119 insertions, -5 deletions
@@ -4360,6 +4360,7 @@ dependencies = [
4360 4360 "thiserror 2.0.18",
4361 4361 "tokio",
4362 4362 "tokio-stream",
4363 + "tokio-util",
4363 4364 "totp-rs",
4364 4365 "tower",
4365 4366 "tower-http",
@@ -22,6 +22,7 @@ serde = { version = "1.0.228", features = ["derive"] }
22 22 serde_json = "1.0.149"
23 23 tokio = { version = "1.50.0", features = ["macros", "rt-multi-thread", "net", "signal"] }
24 24 tokio-stream = { version = "0.1", features = ["sync"] }
25 + tokio-util = { version = "0.7", features = ["io"] }
25 26 tower = "0.5.3"
26 27 tower-http = { version = "0.6.8", features = ["trace", "fs", "limit", "request-id", "propagate-header", "set-header"] }
27 28 tracing = "0.1.44"
@@ -246,6 +246,10 @@ pub const GIT_REPOS_PER_PAGE: usize = 30;
246 246 pub const GIT_FILE_LOG_MAX_WALK: usize = 1000; // Max commits to walk for per-file history
247 247 pub const GIT_RAW_MAX_BYTES: usize = 100 * 1024 * 1024; // 100 MB raw download limit
248 248 pub const GIT_UPLOAD_PACK_MAX_BYTES: usize = 10 * 1024 * 1024; // 10 MB upload-pack body limit
249 + // Max concurrent git smart-HTTP clone/fetch responses. Each runs a `git
250 + // upload-pack` child and streams a packfile; this bounds the process fan-out so
251 + // a burst of clones on a large repo can't exhaust processes/memory.
252 + pub const GIT_SMART_HTTP_MAX_CONCURRENT: usize = 8;
249 253
250 254 // -- Webhook security --
251 255 pub const WEBHOOK_TIMESTAMP_TOLERANCE_SECS: u64 = 300; // 5 minutes
@@ -107,6 +107,12 @@ pub struct AppState {
107 107 /// Caps concurrent cache-miss DB lookups in `caddy-ask` so a flood of
108 108 /// unknown-domain queries can't saturate the pool or drive ACME issuance.
109 109 pub caddy_ask_semaphore: Arc<tokio::sync::Semaphore>,
110 + /// Caps concurrent git smart-HTTP clone/fetch responses. Each `git
111 + /// upload-pack` spawns a child process and streams a packfile that can be
112 + /// arbitrarily large for a big repo; without a permit, N concurrent clones
113 + /// fan out unbounded processes + memory. The packfile body is streamed (not
114 + /// buffered), so this bounds the process count, not a heap ceiling.
115 + pub git_smart_http_semaphore: Arc<tokio::sync::Semaphore>,
110 116 /// Unix timestamp when the server will restart (0 = no restart pending).
111 117 /// Set by the deploy script via the internal API before uploading a new binary.
112 118 pub restart_at: Arc<std::sync::atomic::AtomicI64>,
@@ -353,6 +353,7 @@ async fn main() {
353 353 domain_cache,
354 354 scan_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::SCAN_MAX_CONCURRENT)),
355 355 caddy_ask_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::CADDY_ASK_MAX_CONCURRENT)),
356 + git_smart_http_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(makenotwork::constants::GIT_SMART_HTTP_MAX_CONCURRENT)),
356 357 restart_at: std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0)),
357 358 sync_notify: std::sync::Arc::new(dashmap::DashMap::new()),
358 359 sse_connections: std::sync::Arc::new(dashmap::DashMap::new()),
@@ -7,6 +7,7 @@ use axum::{
7 7 response::Response,
8 8 };
9 9 use serde::Deserialize;
10 + use tokio_util::io::ReaderStream;
10 11
11 12 use crate::{
12 13 auth::MaybeUserVerified,
@@ -109,6 +110,14 @@ pub(super) async fn smart_http_info_refs(
109 110 let root = repos_root(&state)?;
110 111 let repo_path = git::repo_disk_path(&root, &owner, repo_name)?;
111 112
113 + // Same per-clone permit as upload-pack; the ref advertisement is a `git`
114 + // child too, so it shares the fan-out budget.
115 + let _permit = state
116 + .git_smart_http_semaphore
117 + .acquire()
118 + .await
119 + .context("acquire git smart-http permit")?;
120 +
112 121 let output = tokio::process::Command::new("git")
113 122 .arg("upload-pack")
114 123 .arg("--stateless-rpc")
@@ -157,6 +166,16 @@ pub(super) async fn smart_http_upload_pack(
157 166 let root = repos_root(&state)?;
158 167 let repo_path = git::repo_disk_path(&root, &owner, repo_name)?;
159 168
169 + // One permit for the whole clone. Held until the child exits (see the reaper
170 + // task below), so the cap bounds concurrent `upload-pack` processes —
171 + // previously a burst of clones fanned out unbounded.
172 + let permit = state
173 + .git_smart_http_semaphore
174 + .clone()
175 + .acquire_owned()
176 + .await
177 + .context("acquire git smart-http permit")?;
178 +
160 179 let mut child = tokio::process::Command::new("git")
161 180 .arg("upload-pack")
162 181 .arg("--stateless-rpc")
@@ -169,16 +188,32 @@ pub(super) async fn smart_http_upload_pack(
169 188
170 189 if let Some(mut stdin) = child.stdin.take() {
171 190 use tokio::io::AsyncWriteExt;
191 + // The inbound negotiation is capped at GIT_UPLOAD_PACK_MAX_BYTES by the
192 + // route's DefaultBodyLimit, and `--stateless-rpc` reads it all before
193 + // emitting the pack, so writing it in full then dropping stdin (EOF)
194 + // can't deadlock against the still-undrained stdout.
172 195 stdin
173 196 .write_all(&body)
174 197 .await
175 198 .context("write to git upload-pack stdin")?;
176 199 }
177 200
178 - let output = child
179 - .wait_with_output()
180 - .await
181 - .context("wait git upload-pack output")?;
201 + // Stream the packfile straight from the child's stdout instead of buffering
202 + // the whole pack into heap. The pack can be arbitrarily large for a big
203 + // repo; `Body::from(output.stdout)` (the old path) OOMs under concurrent
204 + // clones. (Run #20 Performance HIGH.)
205 + let stdout = child
206 + .stdout
207 + .take()
208 + .ok_or_else(|| AppError::Internal(anyhow::anyhow!("git upload-pack stdout missing")))?;
209 +
210 + // Reap the child and release the permit once it exits. If the client
211 + // disconnects, the response body (and `stdout`) drops, git gets SIGPIPE and
212 + // exits, so `wait()` returns and the permit is freed — no leak either way.
213 + tokio::spawn(async move {
214 + let _permit = permit;
215 + let _ = child.wait().await;
216 + });
182 217
183 218 Response::builder()
184 219 .status(StatusCode::OK)
@@ -187,6 +222,6 @@ pub(super) async fn smart_http_upload_pack(
187 222 "application/x-git-upload-pack-result",
188 223 )
189 224 .header(header::CACHE_CONTROL, "no-cache")
190 - .body(Body::from(output.stdout))
225 + .body(Body::from_stream(ReaderStream::new(stdout)))
191 226 .context("build git http response")
192 227 }
@@ -394,6 +394,9 @@ impl TestHarness {
394 394 metrics_handle: None,
395 395 scan_semaphore: Arc::new(tokio::sync::Semaphore::new(4)),
396 396 caddy_ask_semaphore: Arc::new(tokio::sync::Semaphore::new(8)),
397 + git_smart_http_semaphore: Arc::new(tokio::sync::Semaphore::new(
398 + makenotwork::constants::GIT_SMART_HTTP_MAX_CONCURRENT,
399 + )),
397 400 page_view_tx: makenotwork::db::page_views::spawn_batcher(pool.clone()),
398 401 bg: makenotwork::background::spawn_pool(),
399 402 };
@@ -133,6 +133,9 @@ pub async fn run(config: LoadConfig) {
133 133 metrics_handle: None,
134 134 scan_semaphore: Arc::new(tokio::sync::Semaphore::new(4)),
135 135 caddy_ask_semaphore: Arc::new(tokio::sync::Semaphore::new(8)),
136 + git_smart_http_semaphore: Arc::new(tokio::sync::Semaphore::new(
137 + makenotwork::constants::GIT_SMART_HTTP_MAX_CONCURRENT,
138 + )),
136 139 page_view_tx: makenotwork::db::page_views::spawn_batcher(pool.clone()),
137 140 bg: makenotwork::background::spawn_pool(),
138 141 };
@@ -648,3 +648,63 @@ async fn git_tree_no_emoji() {
648 648 assert!(!resp.text.contains("\u{1F4C1}"), "Subdirectory should not have folder emoji");
649 649 assert!(!resp.text.contains("\u{1F4C4}"), "Subdirectory should not have file emoji");
650 650 }
651 +
652 + // ── Smart HTTP (git clone) ──
653 +
654 + /// Tip commit sha of refs/heads/main from the on-disk bare repo.
655 + fn main_tip_sha(dir: &std::path::Path) -> String {
656 + let bare = dir.join("testowner").join("testrepo.git");
657 + let repo = git2::Repository::open_bare(&bare).unwrap();
658 + repo.refname_to_id("refs/heads/main").unwrap().to_string()
659 + }
660 +
661 + #[tokio::test]
662 + async fn git_smart_http_info_refs_advertises_refs() {
663 + let tmp = tempfile::TempDir::new().unwrap();
664 + make_test_repo(tmp.path());
665 + let mut h = setup_git_harness(&tmp).await;
666 +
667 + let resp = h
668 + .client
669 + .get("/git/testowner/testrepo.git/info/refs?service=git-upload-pack")
670 + .await;
671 + assert!(resp.status.is_success(), "info/refs failed: {} {}", resp.status, resp.text);
672 + let ct = resp.headers.get("content-type").and_then(|v| v.to_str().ok()).unwrap_or("");
673 + assert_eq!(ct, "application/x-git-upload-pack-advertisement", "wrong content-type: {ct}");
674 + assert!(resp.text.contains("# service=git-upload-pack"), "missing service banner: {}", resp.text);
675 + assert!(resp.text.contains("refs/heads/main"), "advertisement missing main ref: {}", resp.text);
676 + }
677 +
678 + #[tokio::test]
679 + async fn git_smart_http_upload_pack_streams_packfile() {
680 + // Exercises the streamed (Body::from_stream) upload-pack response added in
681 + // Run #20 (Performance HIGH): a real clone negotiation must still produce a
682 + // valid packfile, proving the stream + concurrency-permit path didn't
683 + // corrupt the protocol framing.
684 + let tmp = tempfile::TempDir::new().unwrap();
685 + make_test_repo(tmp.path());
686 + let sha = main_tip_sha(tmp.path());
687 + let mut h = setup_git_harness(&tmp).await;
688 +
689 + // Minimal upload-pack request: one want line (capabilities ride the first
690 + // line; no side-band so the pack returns raw), flush, then done.
691 + let want = format!("want {sha} ofs-delta agent=git/test\n");
692 + let body = format!("{:04x}{want}00000009done\n", want.len() + 4);
693 +
694 + let resp = h
695 + .client
696 + .request_with_headers(
697 + "POST",
698 + "/git/testowner/testrepo.git/git-upload-pack",
699 + Some(&body),
700 + &[("Content-Type", "application/x-git-upload-pack-request")],
701 + )
702 + .await;
703 +
704 + assert!(resp.status.is_success(), "upload-pack failed: {} {}", resp.status, resp.text);
705 + let ct = resp.headers.get("content-type").and_then(|v| v.to_str().ok()).unwrap_or("");
706 + assert_eq!(ct, "application/x-git-upload-pack-result", "wrong content-type: {ct}");
707 + // The streamed body must carry the packfile magic — ASCII "PACK" survives
708 + // the lossy-UTF8 view the test client exposes.
709 + assert!(resp.text.contains("PACK"), "streamed response carried no packfile ({} bytes)", resp.text.len());
710 + }