Skip to main content

max / makenotwork

19.3 KB · 529 lines History Blame Raw
1 //! SSH session handler: authentication, PTY, shell, SFTP, and input dispatch.
2
3 use std::collections::HashMap;
4 use std::net::SocketAddr;
5 use std::path::PathBuf;
6 use std::sync::Arc;
7
8 use russh::keys::{HashAlg, PublicKey};
9 use russh::server::{Auth, Msg, Session};
10 use russh::{Channel, ChannelId};
11 use tokio::sync::Mutex;
12
13 use crate::api::{MnwApiClient, UserInfo};
14 use crate::rate_limit::AuthRateLimiter;
15 use crate::ssh::git;
16 use crate::ssh::sftp::SftpSession;
17 use crate::ssh::terminal::TerminalHandle;
18 use crate::staging;
19 use crate::tui;
20
21 /// Per-connection handler. Created by `MnwServer::new_client()`.
22 pub struct MnwHandler {
23 api: MnwApiClient,
24 peer_addr: Option<SocketAddr>,
25 staging_dir: Arc<PathBuf>,
26 git_user: Arc<str>,
27 rate_limiter: Arc<AuthRateLimiter>,
28 /// Populated after successful auth.
29 user: Option<UserInfo>,
30 /// Terminal dimensions (cols, rows).
31 term_size: (u16, u16),
32 /// TUI application handle for forwarding keypresses.
33 app: Option<tui::AppHandle>,
34 /// Channels stored between open and shell/subsystem request.
35 channels: Arc<Mutex<HashMap<ChannelId, Channel<Msg>>>>,
36 /// Active git subprocess stdin handles, keyed by channel.
37 git_processes: HashMap<ChannelId, tokio::process::ChildStdin>,
38 /// Pending pipe upload: buffered stdin data + parsed args.
39 pipe_uploads: HashMap<ChannelId, PipeUpload>,
40 }
41
42 /// State for a pipe-mode upload (`cat file | ssh ... upload ...`).
43 pub struct PipeUpload {
44 pub user: UserInfo,
45 pub filename: String,
46 pub project_slug: String,
47 pub title: String,
48 pub price_cents: i32,
49 pub data: Vec<u8>,
50 }
51
52 impl MnwHandler {
53 pub fn new(
54 api: MnwApiClient,
55 peer_addr: Option<SocketAddr>,
56 staging_dir: Arc<PathBuf>,
57 git_user: Arc<str>,
58 rate_limiter: Arc<AuthRateLimiter>,
59 ) -> Self {
60 Self {
61 api,
62 peer_addr,
63 staging_dir,
64 git_user,
65 rate_limiter,
66 user: None,
67 term_size: (80, 24),
68 app: None,
69 channels: Arc::new(Mutex::new(HashMap::new())),
70 git_processes: HashMap::new(),
71 pipe_uploads: HashMap::new(),
72 }
73 }
74 }
75
76 impl russh::server::Handler for MnwHandler {
77 type Error = anyhow::Error;
78
79 async fn auth_publickey_offered(
80 &mut self,
81 _user: &str,
82 key: &PublicKey,
83 ) -> Result<Auth, Self::Error> {
84 // Per-IP rate limiting: reject early if threshold exceeded
85 if let Some(addr) = self.peer_addr {
86 if !self.rate_limiter.check(addr.ip()) {
87 tracing::warn!(peer = %addr, "auth rate limit exceeded");
88 return Ok(Auth::Reject {
89 proceed_with_methods: None,
90 partial_success: false,
91 });
92 }
93 }
94
95 let fingerprint = key.fingerprint(HashAlg::Sha256).to_string();
96 tracing::debug!(%fingerprint, peer = ?self.peer_addr, "key offered");
97
98 match self.api.lookup_ssh_key(&fingerprint).await {
99 Ok(Some(info)) => {
100 if info.suspended {
101 tracing::warn!(user = %info.username, "suspended user attempted SSH login");
102 if let Some(addr) = self.peer_addr {
103 self.rate_limiter.record_failure(addr.ip());
104 }
105 return Ok(Auth::Reject {
106 proceed_with_methods: None,
107 partial_success: false,
108 });
109 }
110 self.user = Some(info);
111 Ok(Auth::Accept)
112 }
113 Ok(None) => {
114 tracing::debug!(%fingerprint, "key not found");
115 if let Some(addr) = self.peer_addr {
116 self.rate_limiter.record_failure(addr.ip());
117 }
118 Ok(Auth::Reject {
119 proceed_with_methods: None,
120 partial_success: false,
121 })
122 }
123 Err(e) => {
124 tracing::error!(error = ?e, "SSH key lookup failed");
125 Ok(Auth::Reject {
126 proceed_with_methods: None,
127 partial_success: false,
128 })
129 }
130 }
131 }
132
133 async fn auth_publickey(
134 &mut self,
135 _user: &str,
136 _key: &PublicKey,
137 ) -> Result<Auth, Self::Error> {
138 // If auth_publickey_offered accepted, the user is already stored.
139 if self.user.is_some() {
140 Ok(Auth::Accept)
141 } else {
142 Ok(Auth::Reject {
143 proceed_with_methods: None,
144 partial_success: false,
145 })
146 }
147 }
148
149 async fn channel_open_session(
150 &mut self,
151 channel: Channel<Msg>,
152 _session: &mut Session,
153 ) -> Result<bool, Self::Error> {
154 let channel_id = channel.id();
155 tracing::debug!(channel = %channel_id, "session channel opened");
156 // Store channel for later consumption by shell_request or subsystem_request
157 self.channels.lock().await.insert(channel_id, channel);
158 Ok(true)
159 }
160
161 async fn pty_request(
162 &mut self,
163 _channel: ChannelId,
164 _term: &str,
165 col_width: u32,
166 row_height: u32,
167 _pix_width: u32,
168 _pix_height: u32,
169 _modes: &[(russh::Pty, u32)],
170 _session: &mut Session,
171 ) -> Result<(), Self::Error> {
172 self.term_size = (col_width as u16, row_height as u16);
173 tracing::debug!(cols = col_width, rows = row_height, "PTY requested");
174 Ok(())
175 }
176
177 async fn shell_request(
178 &mut self,
179 channel: ChannelId,
180 session: &mut Session,
181 ) -> Result<(), Self::Error> {
182 let Some(ref user) = self.user else {
183 tracing::warn!("shell_request without authenticated user");
184 session.close(channel)?;
185 return Ok(());
186 };
187
188 tracing::info!(user = %user.username, "launching TUI");
189
190 let handle = session.handle();
191 let terminal_handle = TerminalHandle::new(handle.clone(), channel);
192 let (cols, rows) = self.term_size;
193
194 let user_clone = user.clone();
195 let staging_dir = staging::user_staging_dir(&self.staging_dir, &user.user_id);
196
197 match tui::launch(
198 terminal_handle,
199 user_clone,
200 cols,
201 rows,
202 handle,
203 channel,
204 self.api.clone(),
205 staging_dir,
206 ) {
207 Ok(app_handle) => {
208 self.app = Some(app_handle);
209 session.channel_success(channel)?;
210 }
211 Err(e) => {
212 tracing::error!(error = ?e, "TUI launch failed");
213 session.close(channel)?;
214 }
215 }
216
217 Ok(())
218 }
219
220 async fn subsystem_request(
221 &mut self,
222 channel_id: ChannelId,
223 name: &str,
224 session: &mut Session,
225 ) -> Result<(), Self::Error> {
226 if name != "sftp" {
227 tracing::debug!(subsystem = name, "unsupported subsystem requested");
228 session.close(channel_id)?;
229 return Ok(());
230 }
231
232 let Some(ref user) = self.user else {
233 tracing::warn!("subsystem_request without authenticated user");
234 session.close(channel_id)?;
235 return Ok(());
236 };
237
238 // Take the stored channel for this ID
239 let channel = self
240 .channels
241 .lock()
242 .await
243 .remove(&channel_id);
244
245 let Some(channel) = channel else {
246 tracing::error!("no stored channel for SFTP subsystem");
247 session.close(channel_id)?;
248 return Ok(());
249 };
250
251 let user_staging = staging::user_staging_dir(&self.staging_dir, &user.user_id);
252 let sftp_session = SftpSession::new(
253 user.user_id.clone(),
254 user.creator_tier.clone(),
255 user_staging,
256 );
257
258 tracing::info!(user = %user.username, "starting SFTP session");
259
260 let stream = channel.into_stream();
261 tokio::spawn(async move {
262 russh_sftp::server::run(stream, sftp_session).await;
263 });
264
265 Ok(())
266 }
267
268 async fn exec_request(
269 &mut self,
270 channel: ChannelId,
271 data: &[u8],
272 session: &mut Session,
273 ) -> Result<(), Self::Error> {
274 let command_line = String::from_utf8_lossy(data);
275 let handle = session.handle();
276
277 let Some(ref user) = self.user else {
278 let _ = handle.close(channel).await;
279 return Ok(());
280 };
281
282 // Git operations: bidirectional streaming via subprocess proxy
283 if let Some((operation, raw_path)) = git::parse_git_command(&command_line)
284 && let Some((owner, repo_name)) = git::parse_repo_path(raw_path)
285 {
286 tracing::info!(
287 user = %user.username,
288 %operation,
289 %owner,
290 %repo_name,
291 "git operation"
292 );
293
294 match self
295 .api
296 .git_authorize(&user.user_id, operation, owner, repo_name)
297 .await
298 {
299 Ok(auth) => {
300 // Auto-create bare repo on disk if it doesn't exist yet.
301 // The server only registers the repo in the DB — we create
302 // it here as the git user so ownership is correct.
303 if !std::path::Path::new(&auth.repo_path).exists() {
304 // Run git init directly (not via sudo) — mnw-cli is in the
305 // git group and the parent dir has setgid, so the repo
306 // gets git group ownership. Avoids systemd security
307 // restrictions that block sudo child processes.
308 match tokio::process::Command::new("git")
309 .args(["init", "--bare", "--shared=group", "-b", "main", &auth.repo_path])
310 .stdout(std::process::Stdio::null())
311 .stderr(std::process::Stdio::null())
312 .status()
313 .await
314 {
315 Ok(s) if s.success() => {
316 tracing::info!(path = %auth.repo_path, "auto-created bare repository");
317 // Install post-receive hook if build trigger token is configured
318 if let Ok(token) = std::env::var("BUILD_TRIGGER_TOKEN") {
319 let _ = git::install_post_receive_hook(
320 &self.git_user,
321 &auth.repo_path,
322 &token,
323 )
324 .await;
325 }
326 }
327 Ok(s) => {
328 tracing::error!(path = %auth.repo_path, code = ?s.code(), "git init --bare failed");
329 let msg = bytes::Bytes::from("fatal: failed to create repository\r\n");
330 let _ = handle.extended_data(channel, 1, msg).await;
331 let _ = handle.exit_status_request(channel, 1).await;
332 let _ = handle.eof(channel).await;
333 let _ = handle.close(channel).await;
334 return Ok(());
335 }
336 Err(e) => {
337 tracing::error!(error = ?e, "failed to spawn git init");
338 let msg = bytes::Bytes::from("fatal: internal error\r\n");
339 let _ = handle.extended_data(channel, 1, msg).await;
340 let _ = handle.exit_status_request(channel, 1).await;
341 let _ = handle.eof(channel).await;
342 let _ = handle.close(channel).await;
343 return Ok(());
344 }
345 }
346 }
347
348 match git::spawn_git_process(
349 &self.git_user,
350 operation,
351 &auth.repo_path,
352 channel,
353 handle.clone(),
354 )
355 .await
356 {
357 Ok(stdin) => {
358 self.git_processes.insert(channel, stdin);
359 session.channel_success(channel)?;
360 }
361 Err(e) => {
362 tracing::error!(error = ?e, "failed to spawn git process");
363 let msg = bytes::Bytes::from(
364 "fatal: internal error\r\n".to_string(),
365 );
366 let _ = handle.data(channel, msg).await;
367 let _ = handle.exit_status_request(channel, 1).await;
368 let _ = handle.eof(channel).await;
369 let _ = handle.close(channel).await;
370 }
371 }
372 }
373 Err(e) => {
374 let msg = bytes::Bytes::from(format!(
375 "fatal: {}\r\n",
376 crate::commands::sanitize_api_error(&e)
377 ));
378 let _ = handle.extended_data(channel, 1, msg).await;
379 let _ = handle.exit_status_request(channel, 1).await;
380 let _ = handle.eof(channel).await;
381 let _ = handle.close(channel).await;
382 }
383 }
384 return Ok(());
385 }
386
387 // Pipe upload: `cat file | ssh ... upload --filename X --project SLUG ...`
388 if command_line.starts_with("upload ") || command_line.as_ref() == "upload" {
389 let parts: Vec<&str> = command_line.split_whitespace().collect();
390 let mut filename = String::new();
391 let mut project_slug = String::new();
392 let mut title = String::new();
393 let mut price_cents: i32 = 0;
394 let mut i = 1;
395 while i < parts.len() {
396 match parts[i] {
397 "--filename" | "-f" => { if i + 1 < parts.len() { filename = parts[i + 1].to_string(); i += 1; } }
398 "--project" | "-p" => { if i + 1 < parts.len() { project_slug = parts[i + 1].to_string(); i += 1; } }
399 "--title" | "-t" => { if i + 1 < parts.len() { title = parts[i + 1].to_string(); i += 1; } }
400 "--price" => { if i + 1 < parts.len() { price_cents = parts[i + 1].parse().unwrap_or(0); i += 1; } }
401 _ => {}
402 }
403 i += 1;
404 }
405
406 if filename.is_empty() || project_slug.is_empty() {
407 let msg = b"Usage: upload --filename NAME.ext --project SLUG [--title TITLE] [--price CENTS]\r\nPipe file data via stdin: cat file.wav | ssh cli.makenot.work upload ...\r\n";
408 let bytes = bytes::Bytes::copy_from_slice(msg);
409 tokio::spawn(async move {
410 let _ = handle.data(channel, bytes).await;
411 let _ = handle.exit_status_request(channel, 1).await;
412 let _ = handle.eof(channel).await;
413 let _ = handle.close(channel).await;
414 });
415 return Ok(());
416 }
417
418 if title.is_empty() {
419 title = staging::derive_title(&filename);
420 }
421
422 self.pipe_uploads.insert(channel, PipeUpload {
423 user: user.clone(),
424 filename,
425 project_slug,
426 title,
427 price_cents,
428 data: Vec::new(),
429 });
430 session.channel_success(channel)?;
431 return Ok(());
432 }
433
434 // Check if this looks like a legacy SCP transfer
435 if command_line.starts_with("scp ") {
436 let msg: &[u8] =
437 b"Use scp (not scp -O) or sftp to upload files to cli.makenot.work\r\n";
438 let bytes = bytes::Bytes::copy_from_slice(msg);
439 tokio::spawn(async move {
440 let _ = handle.data(channel, bytes).await;
441 let _ = handle.close(channel).await;
442 });
443 return Ok(());
444 }
445
446 // Execute the command
447 let user = user.clone();
448 let api = self.api.clone();
449 let cmd = command_line.to_string();
450 tracing::info!(user = %user.username, command = %cmd, "exec command");
451
452 tokio::spawn(async move {
453 let output = crate::commands::execute(&cmd, &user, &api).await;
454 let bytes = bytes::Bytes::from(output);
455 let _ = handle.data(channel, bytes).await;
456 let _ = handle.exit_status_request(channel, 0).await;
457 let _ = handle.eof(channel).await;
458 let _ = handle.close(channel).await;
459 });
460
461 Ok(())
462 }
463
464 async fn data(
465 &mut self,
466 channel: ChannelId,
467 data: &[u8],
468 _session: &mut Session,
469 ) -> Result<(), Self::Error> {
470 if let Some(stdin) = self.git_processes.get_mut(&channel) {
471 use tokio::io::AsyncWriteExt;
472 let _ = stdin.write_all(data).await;
473 } else if let Some(upload) = self.pipe_uploads.get_mut(&channel) {
474 upload.data.extend_from_slice(data);
475 } else if let Some(ref app) = self.app {
476 app.send_input(data).await;
477 }
478 Ok(())
479 }
480
481 async fn channel_eof(
482 &mut self,
483 channel: ChannelId,
484 session: &mut Session,
485 ) -> Result<(), Self::Error> {
486 if let Some(stdin) = self.git_processes.remove(&channel) {
487 drop(stdin); // Closes pipe → subprocess sees EOF
488 } else if let Some(upload) = self.pipe_uploads.remove(&channel) {
489 let handle = session.handle();
490 let api = self.api.clone();
491 tokio::spawn(async move {
492 let result = crate::commands::execute_pipe_upload(&api, upload).await;
493 let (msg, exit_code) = match result {
494 Ok(msg) => (msg, 0),
495 Err(e) => (format!("Error: {}\r\n", crate::commands::sanitize_api_error(&e)), 1),
496 };
497 let _ = handle.data(channel, bytes::Bytes::from(msg)).await;
498 let _ = handle.exit_status_request(channel, exit_code).await;
499 let _ = handle.eof(channel).await;
500 let _ = handle.close(channel).await;
501 });
502 }
503 Ok(())
504 }
505
506 async fn window_change_request(
507 &mut self,
508 _channel: ChannelId,
509 col_width: u32,
510 row_height: u32,
511 _pix_width: u32,
512 _pix_height: u32,
513 _session: &mut Session,
514 ) -> Result<(), Self::Error> {
515 self.term_size = (col_width as u16, row_height as u16);
516 if let Some(ref app) = self.app {
517 app.send_resize(col_width as u16, row_height as u16).await;
518 }
519 Ok(())
520 }
521
522 async fn auth_succeeded(&mut self, _session: &mut Session) -> Result<(), Self::Error> {
523 if let Some(ref user) = self.user {
524 tracing::info!(user = %user.username, peer = ?self.peer_addr, "authenticated");
525 }
526 Ok(())
527 }
528 }
529