//! SSH session handler: authentication, PTY, shell, SFTP, and input dispatch. use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use russh::keys::{HashAlg, PublicKey}; use russh::server::{Auth, Msg, Session}; use russh::{Channel, ChannelId}; use tokio::sync::Mutex; use crate::api::{MnwApiClient, UserInfo}; use crate::rate_limit::AuthRateLimiter; use crate::ssh::git; use crate::ssh::sftp::SftpSession; use crate::ssh::terminal::TerminalHandle; use crate::staging; use crate::tui; /// Per-connection handler. Created by `MnwServer::new_client()`. pub struct MnwHandler { api: MnwApiClient, peer_addr: Option, staging_dir: Arc, git_user: Arc, rate_limiter: Arc, /// Populated after successful auth. user: Option, /// Terminal dimensions (cols, rows). term_size: (u16, u16), /// TUI application handle for forwarding keypresses. app: Option, /// Channels stored between open and shell/subsystem request. channels: Arc>>>, /// Active git subprocess stdin handles, keyed by channel. git_processes: HashMap, /// Pending pipe upload: buffered stdin data + parsed args. pipe_uploads: HashMap, } /// State for a pipe-mode upload (`cat file | ssh ... upload ...`). pub struct PipeUpload { pub user: UserInfo, pub filename: String, pub project_slug: String, pub title: String, pub price_cents: i32, pub data: Vec, } impl MnwHandler { pub fn new( api: MnwApiClient, peer_addr: Option, staging_dir: Arc, git_user: Arc, rate_limiter: Arc, ) -> Self { Self { api, peer_addr, staging_dir, git_user, rate_limiter, user: None, term_size: (80, 24), app: None, channels: Arc::new(Mutex::new(HashMap::new())), git_processes: HashMap::new(), pipe_uploads: HashMap::new(), } } } impl russh::server::Handler for MnwHandler { type Error = anyhow::Error; async fn auth_publickey_offered( &mut self, _user: &str, key: &PublicKey, ) -> Result { // Per-IP rate limiting: reject early if threshold exceeded if let Some(addr) = self.peer_addr { if !self.rate_limiter.check(addr.ip()) { tracing::warn!(peer = %addr, "auth rate limit exceeded"); return Ok(Auth::Reject { proceed_with_methods: None, partial_success: false, }); } } let fingerprint = key.fingerprint(HashAlg::Sha256).to_string(); tracing::debug!(%fingerprint, peer = ?self.peer_addr, "key offered"); match self.api.lookup_ssh_key(&fingerprint).await { Ok(Some(info)) => { if info.suspended { tracing::warn!(user = %info.username, "suspended user attempted SSH login"); if let Some(addr) = self.peer_addr { self.rate_limiter.record_failure(addr.ip()); } return Ok(Auth::Reject { proceed_with_methods: None, partial_success: false, }); } self.user = Some(info); Ok(Auth::Accept) } Ok(None) => { tracing::debug!(%fingerprint, "key not found"); if let Some(addr) = self.peer_addr { self.rate_limiter.record_failure(addr.ip()); } Ok(Auth::Reject { proceed_with_methods: None, partial_success: false, }) } Err(e) => { tracing::error!(error = ?e, "SSH key lookup failed"); Ok(Auth::Reject { proceed_with_methods: None, partial_success: false, }) } } } async fn auth_publickey( &mut self, _user: &str, _key: &PublicKey, ) -> Result { // If auth_publickey_offered accepted, the user is already stored. if self.user.is_some() { Ok(Auth::Accept) } else { Ok(Auth::Reject { proceed_with_methods: None, partial_success: false, }) } } async fn channel_open_session( &mut self, channel: Channel, _session: &mut Session, ) -> Result { let channel_id = channel.id(); tracing::debug!(channel = %channel_id, "session channel opened"); // Store channel for later consumption by shell_request or subsystem_request self.channels.lock().await.insert(channel_id, channel); Ok(true) } async fn pty_request( &mut self, _channel: ChannelId, _term: &str, col_width: u32, row_height: u32, _pix_width: u32, _pix_height: u32, _modes: &[(russh::Pty, u32)], _session: &mut Session, ) -> Result<(), Self::Error> { self.term_size = (col_width as u16, row_height as u16); tracing::debug!(cols = col_width, rows = row_height, "PTY requested"); Ok(()) } async fn shell_request( &mut self, channel: ChannelId, session: &mut Session, ) -> Result<(), Self::Error> { let Some(ref user) = self.user else { tracing::warn!("shell_request without authenticated user"); session.close(channel)?; return Ok(()); }; tracing::info!(user = %user.username, "launching TUI"); let handle = session.handle(); let terminal_handle = TerminalHandle::new(handle.clone(), channel); let (cols, rows) = self.term_size; let user_clone = user.clone(); let staging_dir = staging::user_staging_dir(&self.staging_dir, &user.user_id); match tui::launch( terminal_handle, user_clone, cols, rows, handle, channel, self.api.clone(), staging_dir, ) { Ok(app_handle) => { self.app = Some(app_handle); session.channel_success(channel)?; } Err(e) => { tracing::error!(error = ?e, "TUI launch failed"); session.close(channel)?; } } Ok(()) } async fn subsystem_request( &mut self, channel_id: ChannelId, name: &str, session: &mut Session, ) -> Result<(), Self::Error> { if name != "sftp" { tracing::debug!(subsystem = name, "unsupported subsystem requested"); session.close(channel_id)?; return Ok(()); } let Some(ref user) = self.user else { tracing::warn!("subsystem_request without authenticated user"); session.close(channel_id)?; return Ok(()); }; // Take the stored channel for this ID let channel = self .channels .lock() .await .remove(&channel_id); let Some(channel) = channel else { tracing::error!("no stored channel for SFTP subsystem"); session.close(channel_id)?; return Ok(()); }; let user_staging = staging::user_staging_dir(&self.staging_dir, &user.user_id); let sftp_session = SftpSession::new( user.user_id.clone(), user.creator_tier.clone(), user_staging, ); tracing::info!(user = %user.username, "starting SFTP session"); let stream = channel.into_stream(); tokio::spawn(async move { russh_sftp::server::run(stream, sftp_session).await; }); Ok(()) } async fn exec_request( &mut self, channel: ChannelId, data: &[u8], session: &mut Session, ) -> Result<(), Self::Error> { let command_line = String::from_utf8_lossy(data); let handle = session.handle(); let Some(ref user) = self.user else { let _ = handle.close(channel).await; return Ok(()); }; // Git operations: bidirectional streaming via subprocess proxy if let Some((operation, raw_path)) = git::parse_git_command(&command_line) && let Some((owner, repo_name)) = git::parse_repo_path(raw_path) { tracing::info!( user = %user.username, %operation, %owner, %repo_name, "git operation" ); match self .api .git_authorize(&user.user_id, operation, owner, repo_name) .await { Ok(auth) => { // Auto-create bare repo on disk if it doesn't exist yet. // The server only registers the repo in the DB — we create // it here as the git user so ownership is correct. if !std::path::Path::new(&auth.repo_path).exists() { // Run git init directly (not via sudo) — mnw-cli is in the // git group and the parent dir has setgid, so the repo // gets git group ownership. Avoids systemd security // restrictions that block sudo child processes. match tokio::process::Command::new("git") .args(["init", "--bare", "--shared=group", "-b", "main", &auth.repo_path]) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .status() .await { Ok(s) if s.success() => { tracing::info!(path = %auth.repo_path, "auto-created bare repository"); // Install post-receive hook if build trigger token is configured if let Ok(token) = std::env::var("BUILD_TRIGGER_TOKEN") { let _ = git::install_post_receive_hook( &self.git_user, &auth.repo_path, &token, ) .await; } } Ok(s) => { tracing::error!(path = %auth.repo_path, code = ?s.code(), "git init --bare failed"); let msg = bytes::Bytes::from("fatal: failed to create repository\r\n"); let _ = handle.extended_data(channel, 1, msg).await; let _ = handle.exit_status_request(channel, 1).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; return Ok(()); } Err(e) => { tracing::error!(error = ?e, "failed to spawn git init"); let msg = bytes::Bytes::from("fatal: internal error\r\n"); let _ = handle.extended_data(channel, 1, msg).await; let _ = handle.exit_status_request(channel, 1).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; return Ok(()); } } } match git::spawn_git_process( &self.git_user, operation, &auth.repo_path, channel, handle.clone(), ) .await { Ok(stdin) => { self.git_processes.insert(channel, stdin); session.channel_success(channel)?; } Err(e) => { tracing::error!(error = ?e, "failed to spawn git process"); let msg = bytes::Bytes::from( "fatal: internal error\r\n".to_string(), ); let _ = handle.data(channel, msg).await; let _ = handle.exit_status_request(channel, 1).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; } } } Err(e) => { let msg = bytes::Bytes::from(format!( "fatal: {}\r\n", crate::commands::sanitize_api_error(&e) )); let _ = handle.extended_data(channel, 1, msg).await; let _ = handle.exit_status_request(channel, 1).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; } } return Ok(()); } // Pipe upload: `cat file | ssh ... upload --filename X --project SLUG ...` if command_line.starts_with("upload ") || command_line.as_ref() == "upload" { let parts: Vec<&str> = command_line.split_whitespace().collect(); let mut filename = String::new(); let mut project_slug = String::new(); let mut title = String::new(); let mut price_cents: i32 = 0; let mut i = 1; while i < parts.len() { match parts[i] { "--filename" | "-f" => { if i + 1 < parts.len() { filename = parts[i + 1].to_string(); i += 1; } } "--project" | "-p" => { if i + 1 < parts.len() { project_slug = parts[i + 1].to_string(); i += 1; } } "--title" | "-t" => { if i + 1 < parts.len() { title = parts[i + 1].to_string(); i += 1; } } "--price" => { if i + 1 < parts.len() { price_cents = parts[i + 1].parse().unwrap_or(0); i += 1; } } _ => {} } i += 1; } if filename.is_empty() || project_slug.is_empty() { let msg = b"Usage: upload --filename NAME.ext --project SLUG [--title TITLE] [--price CENTS]\r\nPipe file data via stdin: cat file.wav | ssh cli.makenot.work upload ...\r\n"; let bytes = bytes::Bytes::copy_from_slice(msg); tokio::spawn(async move { let _ = handle.data(channel, bytes).await; let _ = handle.exit_status_request(channel, 1).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; }); return Ok(()); } if title.is_empty() { title = staging::derive_title(&filename); } self.pipe_uploads.insert(channel, PipeUpload { user: user.clone(), filename, project_slug, title, price_cents, data: Vec::new(), }); session.channel_success(channel)?; return Ok(()); } // Check if this looks like a legacy SCP transfer if command_line.starts_with("scp ") { let msg: &[u8] = b"Use scp (not scp -O) or sftp to upload files to cli.makenot.work\r\n"; let bytes = bytes::Bytes::copy_from_slice(msg); tokio::spawn(async move { let _ = handle.data(channel, bytes).await; let _ = handle.close(channel).await; }); return Ok(()); } // Execute the command let user = user.clone(); let api = self.api.clone(); let cmd = command_line.to_string(); tracing::info!(user = %user.username, command = %cmd, "exec command"); tokio::spawn(async move { let output = crate::commands::execute(&cmd, &user, &api).await; let bytes = bytes::Bytes::from(output); let _ = handle.data(channel, bytes).await; let _ = handle.exit_status_request(channel, 0).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; }); Ok(()) } async fn data( &mut self, channel: ChannelId, data: &[u8], _session: &mut Session, ) -> Result<(), Self::Error> { if let Some(stdin) = self.git_processes.get_mut(&channel) { use tokio::io::AsyncWriteExt; let _ = stdin.write_all(data).await; } else if let Some(upload) = self.pipe_uploads.get_mut(&channel) { upload.data.extend_from_slice(data); } else if let Some(ref app) = self.app { app.send_input(data).await; } Ok(()) } async fn channel_eof( &mut self, channel: ChannelId, session: &mut Session, ) -> Result<(), Self::Error> { if let Some(stdin) = self.git_processes.remove(&channel) { drop(stdin); // Closes pipe → subprocess sees EOF } else if let Some(upload) = self.pipe_uploads.remove(&channel) { let handle = session.handle(); let api = self.api.clone(); tokio::spawn(async move { let result = crate::commands::execute_pipe_upload(&api, upload).await; let (msg, exit_code) = match result { Ok(msg) => (msg, 0), Err(e) => (format!("Error: {}\r\n", crate::commands::sanitize_api_error(&e)), 1), }; let _ = handle.data(channel, bytes::Bytes::from(msg)).await; let _ = handle.exit_status_request(channel, exit_code).await; let _ = handle.eof(channel).await; let _ = handle.close(channel).await; }); } Ok(()) } async fn window_change_request( &mut self, _channel: ChannelId, col_width: u32, row_height: u32, _pix_width: u32, _pix_height: u32, _session: &mut Session, ) -> Result<(), Self::Error> { self.term_size = (col_width as u16, row_height as u16); if let Some(ref app) = self.app { app.send_resize(col_width as u16, row_height as u16).await; } Ok(()) } async fn auth_succeeded(&mut self, _session: &mut Session) -> Result<(), Self::Error> { if let Some(ref user) = self.user { tracing::info!(user = %user.username, peer = ?self.peer_addr, "authenticated"); } Ok(()) } }