Skip to main content

max / mnw-cli

11.7 KB · 373 lines History Blame Raw
1 //! SSH session handler: authentication, PTY, shell, SFTP, and input dispatch.
2
3 use std::collections::HashMap;
4 use std::net::SocketAddr;
5 use std::path::PathBuf;
6 use std::sync::Arc;
7
8 use russh::keys::{HashAlg, PublicKey};
9 use russh::server::{Auth, Msg, Session};
10 use russh::{Channel, ChannelId};
11 use tokio::sync::Mutex;
12
13 use crate::api::{MnwApiClient, UserInfo};
14 use crate::ssh::git;
15 use crate::ssh::sftp::SftpSession;
16 use crate::ssh::terminal::TerminalHandle;
17 use crate::staging;
18 use crate::tui;
19
20 /// Per-connection handler. Created by `MnwServer::new_client()`.
21 pub struct MnwHandler {
22 api: MnwApiClient,
23 peer_addr: Option<SocketAddr>,
24 staging_dir: Arc<PathBuf>,
25 git_user: Arc<str>,
26 /// Populated after successful auth.
27 user: Option<UserInfo>,
28 /// Terminal dimensions (cols, rows).
29 term_size: (u16, u16),
30 /// TUI application handle for forwarding keypresses.
31 app: Option<tui::AppHandle>,
32 /// Channels stored between open and shell/subsystem request.
33 channels: Arc<Mutex<HashMap<ChannelId, Channel<Msg>>>>,
34 /// Active git subprocess stdin handles, keyed by channel.
35 git_processes: HashMap<ChannelId, tokio::process::ChildStdin>,
36 }
37
38 impl MnwHandler {
39 pub fn new(
40 api: MnwApiClient,
41 peer_addr: Option<SocketAddr>,
42 staging_dir: Arc<PathBuf>,
43 git_user: Arc<str>,
44 ) -> Self {
45 Self {
46 api,
47 peer_addr,
48 staging_dir,
49 git_user,
50 user: None,
51 term_size: (80, 24),
52 app: None,
53 channels: Arc::new(Mutex::new(HashMap::new())),
54 git_processes: HashMap::new(),
55 }
56 }
57 }
58
59 impl russh::server::Handler for MnwHandler {
60 type Error = anyhow::Error;
61
62 async fn auth_publickey_offered(
63 &mut self,
64 _user: &str,
65 key: &PublicKey,
66 ) -> Result<Auth, Self::Error> {
67 let fingerprint = key.fingerprint(HashAlg::Sha256).to_string();
68 tracing::debug!(%fingerprint, peer = ?self.peer_addr, "key offered");
69
70 match self.api.lookup_ssh_key(&fingerprint).await {
71 Ok(Some(info)) => {
72 if info.suspended {
73 tracing::warn!(user = %info.username, "suspended user attempted SSH login");
74 return Ok(Auth::Reject {
75 proceed_with_methods: None,
76 partial_success: false,
77 });
78 }
79 self.user = Some(info);
80 Ok(Auth::Accept)
81 }
82 Ok(None) => {
83 tracing::debug!(%fingerprint, "key not found");
84 Ok(Auth::Reject {
85 proceed_with_methods: None,
86 partial_success: false,
87 })
88 }
89 Err(e) => {
90 tracing::error!(error = ?e, "SSH key lookup failed");
91 Ok(Auth::Reject {
92 proceed_with_methods: None,
93 partial_success: false,
94 })
95 }
96 }
97 }
98
99 async fn auth_publickey(
100 &mut self,
101 _user: &str,
102 _key: &PublicKey,
103 ) -> Result<Auth, Self::Error> {
104 // If auth_publickey_offered accepted, the user is already stored.
105 if self.user.is_some() {
106 Ok(Auth::Accept)
107 } else {
108 Ok(Auth::Reject {
109 proceed_with_methods: None,
110 partial_success: false,
111 })
112 }
113 }
114
115 async fn channel_open_session(
116 &mut self,
117 channel: Channel<Msg>,
118 _session: &mut Session,
119 ) -> Result<bool, Self::Error> {
120 let channel_id = channel.id();
121 tracing::debug!(channel = %channel_id, "session channel opened");
122 // Store channel for later consumption by shell_request or subsystem_request
123 self.channels.lock().await.insert(channel_id, channel);
124 Ok(true)
125 }
126
127 async fn pty_request(
128 &mut self,
129 _channel: ChannelId,
130 _term: &str,
131 col_width: u32,
132 row_height: u32,
133 _pix_width: u32,
134 _pix_height: u32,
135 _modes: &[(russh::Pty, u32)],
136 _session: &mut Session,
137 ) -> Result<(), Self::Error> {
138 self.term_size = (col_width as u16, row_height as u16);
139 tracing::debug!(cols = col_width, rows = row_height, "PTY requested");
140 Ok(())
141 }
142
143 async fn shell_request(
144 &mut self,
145 channel: ChannelId,
146 session: &mut Session,
147 ) -> Result<(), Self::Error> {
148 let Some(ref user) = self.user else {
149 tracing::warn!("shell_request without authenticated user");
150 session.close(channel)?;
151 return Ok(());
152 };
153
154 tracing::info!(user = %user.username, "launching TUI");
155
156 let handle = session.handle();
157 let terminal_handle = TerminalHandle::new(handle.clone(), channel);
158 let (cols, rows) = self.term_size;
159
160 let user_clone = user.clone();
161 let staging_dir = staging::user_staging_dir(&self.staging_dir, &user.user_id);
162 let app_handle = tui::launch(
163 terminal_handle,
164 user_clone,
165 cols,
166 rows,
167 handle,
168 channel,
169 self.api.clone(),
170 staging_dir,
171 )?;
172 self.app = Some(app_handle);
173
174 Ok(())
175 }
176
177 async fn subsystem_request(
178 &mut self,
179 channel_id: ChannelId,
180 name: &str,
181 session: &mut Session,
182 ) -> Result<(), Self::Error> {
183 if name != "sftp" {
184 tracing::debug!(subsystem = name, "unsupported subsystem requested");
185 session.close(channel_id)?;
186 return Ok(());
187 }
188
189 let Some(ref user) = self.user else {
190 tracing::warn!("subsystem_request without authenticated user");
191 session.close(channel_id)?;
192 return Ok(());
193 };
194
195 // Take the stored channel for this ID
196 let channel = self
197 .channels
198 .lock()
199 .await
200 .remove(&channel_id);
201
202 let Some(channel) = channel else {
203 tracing::error!("no stored channel for SFTP subsystem");
204 session.close(channel_id)?;
205 return Ok(());
206 };
207
208 let user_staging = staging::user_staging_dir(&self.staging_dir, &user.user_id);
209 let sftp_session = SftpSession::new(
210 user.user_id.clone(),
211 user.creator_tier.clone(),
212 user_staging,
213 );
214
215 tracing::info!(user = %user.username, "starting SFTP session");
216
217 let stream = channel.into_stream();
218 tokio::spawn(async move {
219 russh_sftp::server::run(stream, sftp_session).await;
220 });
221
222 Ok(())
223 }
224
225 async fn exec_request(
226 &mut self,
227 channel: ChannelId,
228 data: &[u8],
229 session: &mut Session,
230 ) -> Result<(), Self::Error> {
231 let command_line = String::from_utf8_lossy(data);
232 let handle = session.handle();
233
234 let Some(ref user) = self.user else {
235 let _ = handle.close(channel).await;
236 return Ok(());
237 };
238
239 // Git operations: bidirectional streaming via subprocess proxy
240 if let Some((operation, raw_path)) = git::parse_git_command(&command_line)
241 && let Some((owner, repo_name)) = git::parse_repo_path(raw_path)
242 {
243 tracing::info!(
244 user = %user.username,
245 %operation,
246 %owner,
247 %repo_name,
248 "git operation"
249 );
250
251 match self
252 .api
253 .git_authorize(&user.user_id, operation, owner, repo_name)
254 .await
255 {
256 Ok(auth) => {
257 match git::spawn_git_process(
258 &self.git_user,
259 operation,
260 &auth.repo_path,
261 channel,
262 handle.clone(),
263 )
264 .await
265 {
266 Ok(stdin) => {
267 self.git_processes.insert(channel, stdin);
268 session.channel_success(channel)?;
269 }
270 Err(e) => {
271 tracing::error!(error = ?e, "failed to spawn git process");
272 let msg = bytes::Bytes::from(
273 "fatal: internal error\r\n".to_string(),
274 );
275 let _ = handle.data(channel, msg).await;
276 let _ = handle.exit_status_request(channel, 1).await;
277 let _ = handle.eof(channel).await;
278 let _ = handle.close(channel).await;
279 }
280 }
281 }
282 Err(e) => {
283 let msg =
284 bytes::Bytes::from(format!("fatal: {e}\r\n"));
285 let _ = handle.extended_data(channel, 1, msg).await;
286 let _ = handle.exit_status_request(channel, 1).await;
287 let _ = handle.eof(channel).await;
288 let _ = handle.close(channel).await;
289 }
290 }
291 return Ok(());
292 }
293
294 // Check if this looks like a legacy SCP transfer
295 if command_line.starts_with("scp ") {
296 let msg: &[u8] =
297 b"Use scp (not scp -O) or sftp to upload files to cli.makenot.work\r\n";
298 let bytes = bytes::Bytes::copy_from_slice(msg);
299 tokio::spawn(async move {
300 let _ = handle.data(channel, bytes).await;
301 let _ = handle.close(channel).await;
302 });
303 return Ok(());
304 }
305
306 // Execute the command
307 let user = user.clone();
308 let api = self.api.clone();
309 let cmd = command_line.to_string();
310 tracing::info!(user = %user.username, command = %cmd, "exec command");
311
312 tokio::spawn(async move {
313 let output = crate::commands::execute(&cmd, &user, &api).await;
314 let bytes = bytes::Bytes::from(output);
315 let _ = handle.data(channel, bytes).await;
316 let _ = handle.exit_status_request(channel, 0).await;
317 let _ = handle.eof(channel).await;
318 let _ = handle.close(channel).await;
319 });
320
321 Ok(())
322 }
323
324 async fn data(
325 &mut self,
326 channel: ChannelId,
327 data: &[u8],
328 _session: &mut Session,
329 ) -> Result<(), Self::Error> {
330 if let Some(stdin) = self.git_processes.get_mut(&channel) {
331 use tokio::io::AsyncWriteExt;
332 let _ = stdin.write_all(data).await;
333 } else if let Some(ref app) = self.app {
334 app.send_input(data).await;
335 }
336 Ok(())
337 }
338
339 async fn channel_eof(
340 &mut self,
341 channel: ChannelId,
342 _session: &mut Session,
343 ) -> Result<(), Self::Error> {
344 if let Some(stdin) = self.git_processes.remove(&channel) {
345 drop(stdin); // Closes pipe → subprocess sees EOF
346 }
347 Ok(())
348 }
349
350 async fn window_change_request(
351 &mut self,
352 _channel: ChannelId,
353 col_width: u32,
354 row_height: u32,
355 _pix_width: u32,
356 _pix_height: u32,
357 _session: &mut Session,
358 ) -> Result<(), Self::Error> {
359 self.term_size = (col_width as u16, row_height as u16);
360 if let Some(ref app) = self.app {
361 app.send_resize(col_width as u16, row_height as u16).await;
362 }
363 Ok(())
364 }
365
366 async fn auth_succeeded(&mut self, _session: &mut Session) -> Result<(), Self::Error> {
367 if let Some(ref user) = self.user {
368 tracing::info!(user = %user.username, peer = ?self.peer_addr, "authenticated");
369 }
370 Ok(())
371 }
372 }
373