//! SFTP subsystem handler for file uploads. //! //! Presents a virtual filesystem with a single `/upload/` directory. //! Files written here land in the staging directory on disk, from where //! the TUI publish flow sends them to S3. use std::collections::HashMap; use std::path::PathBuf; use russh_sftp::protocol::{ Attrs, Data, File, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version, }; use crate::staging::{self, is_allowed_extension, sanitize_filename, STAGING_QUOTA_BYTES}; /// SFTP session handler for a single authenticated user. pub struct SftpSession { user_id: String, creator_tier: Option, staging_dir: PathBuf, open_files: HashMap, dir_handles: HashMap, // handle -> already_read next_handle: u64, } struct OpenFile { path: PathBuf, file: tokio::fs::File, } impl SftpSession { pub fn new(user_id: String, creator_tier: Option, staging_dir: PathBuf) -> Self { Self { user_id, creator_tier, staging_dir, open_files: HashMap::new(), dir_handles: HashMap::new(), next_handle: 1, } } fn alloc_handle(&mut self) -> String { let h = self.next_handle; self.next_handle += 1; format!("h{}", h) } fn ok_status(&self, id: u32) -> Status { Status { id, status_code: StatusCode::Ok, error_message: String::new(), language_tag: String::new(), } } fn is_basic_tier(&self) -> bool { self.creator_tier.as_deref() == Some("basic") } fn is_upload_path(path: &str) -> bool { let normalized = path.trim_matches('/'); normalized == "upload" || normalized.is_empty() || normalized == "." } fn extract_filename(path: &str) -> Option<&str> { let normalized = path.trim_start_matches('/'); normalized.strip_prefix("upload/").or_else(|| { // Direct filename without upload/ prefix if !normalized.contains('/') && normalized != "upload" && !normalized.is_empty() { Some(normalized) } else { None } }) } } impl russh_sftp::server::Handler for SftpSession { type Error = StatusCode; fn unimplemented(&self) -> Self::Error { StatusCode::OpUnsupported } async fn init( &mut self, version: u32, _extensions: HashMap, ) -> Result { tracing::debug!(user = %self.user_id, sftp_version = version, "SFTP session initialized"); // Ensure staging dir exists if let Err(e) = tokio::fs::create_dir_all(&self.staging_dir).await { tracing::error!(error = ?e, "failed to create staging dir"); return Err(StatusCode::Failure); } Ok(Version::new()) } async fn realpath(&mut self, id: u32, _path: String) -> Result { Ok(Name { id, files: vec![File::dummy("/upload")], }) } async fn opendir(&mut self, id: u32, path: String) -> Result { if !Self::is_upload_path(&path) { return Err(StatusCode::NoSuchFile); } let handle = self.alloc_handle(); self.dir_handles.insert(handle.clone(), false); Ok(Handle { id, handle }) } async fn readdir(&mut self, id: u32, handle: String) -> Result { let already_read = self .dir_handles .get_mut(&handle) .ok_or(StatusCode::Failure)?; if *already_read { return Err(StatusCode::Eof); } *already_read = true; let staged = staging::list_staged_files(&self.staging_dir).await; let files: Vec = staged .into_iter() .map(|sf| { let mut attrs = FileAttributes::empty(); attrs.set_regular(true); attrs.size = Some(sf.size); if let Ok(dur) = sf .modified .duration_since(std::time::SystemTime::UNIX_EPOCH) { attrs.mtime = Some(dur.as_secs() as u32); } File::new(sf.filename, attrs) }) .collect(); Ok(Name { id, files }) } async fn stat(&mut self, id: u32, path: String) -> Result { if Self::is_upload_path(&path) { let mut attrs = FileAttributes::empty(); attrs.set_dir(true); attrs.permissions = Some(0o755); return Ok(Attrs { id, attrs }); } if let Some(filename) = Self::extract_filename(&path) { let file_path = self.staging_dir.join(sanitize_filename(filename)); if let Ok(metadata) = tokio::fs::metadata(&file_path).await { let attrs = FileAttributes::from(&metadata); return Ok(Attrs { id, attrs }); } } Err(StatusCode::NoSuchFile) } async fn lstat(&mut self, id: u32, path: String) -> Result { self.stat(id, path).await } async fn fstat(&mut self, id: u32, handle: String) -> Result { if self.dir_handles.contains_key(&handle) { let mut attrs = FileAttributes::empty(); attrs.set_dir(true); attrs.permissions = Some(0o755); return Ok(Attrs { id, attrs }); } if let Some(of) = self.open_files.get(&handle) && let Ok(metadata) = of.file.metadata().await { let attrs = FileAttributes::from(&metadata); return Ok(Attrs { id, attrs }); } Err(StatusCode::Failure) } async fn open( &mut self, id: u32, filename: String, pflags: OpenFlags, _attrs: FileAttributes, ) -> Result { // Only allow writing to /upload/ let raw_name = Self::extract_filename(&filename).ok_or(StatusCode::PermissionDenied)?; let safe_name = sanitize_filename(raw_name); if safe_name.is_empty() { return Err(StatusCode::NoSuchFile); } // Check tier — Basic is text-only, no file uploads if self.is_basic_tier() { tracing::warn!(user = %self.user_id, "Basic tier user attempted SFTP upload"); return Err(StatusCode::PermissionDenied); } // Check extension let ext = safe_name.rsplit('.').next().unwrap_or("").to_lowercase(); if !is_allowed_extension(&ext) { tracing::warn!(user = %self.user_id, ext, "unsupported file extension"); return Err(StatusCode::PermissionDenied); } // Check staging quota before opening let current_usage = staging::staging_usage(&self.staging_dir).await; if current_usage >= STAGING_QUOTA_BYTES { tracing::warn!(user = %self.user_id, usage = current_usage, "staging quota exceeded"); return Err(StatusCode::Failure); } let file_path = self.staging_dir.join(&safe_name); if pflags.contains(OpenFlags::WRITE) || pflags.contains(OpenFlags::CREATE) { // Ensure staging dir exists if let Err(e) = tokio::fs::create_dir_all(&self.staging_dir).await { tracing::error!(error = ?e, "failed to create staging dir"); return Err(StatusCode::Failure); } let file = tokio::fs::OpenOptions::new() .write(true) .create(true) .truncate(pflags.contains(OpenFlags::TRUNCATE)) .open(&file_path) .await .map_err(|e| { tracing::error!(error = ?e, "failed to open staging file for write"); StatusCode::Failure })?; let handle = self.alloc_handle(); self.open_files.insert( handle.clone(), OpenFile { path: file_path, file, }, ); tracing::info!(user = %self.user_id, file = %safe_name, "staging file opened for write"); return Ok(Handle { id, handle }); } if pflags.contains(OpenFlags::READ) { let file = tokio::fs::File::open(&file_path).await.map_err(|_| StatusCode::NoSuchFile)?; let handle = self.alloc_handle(); self.open_files.insert( handle.clone(), OpenFile { path: file_path, file, }, ); return Ok(Handle { id, handle }); } Err(StatusCode::PermissionDenied) } async fn write( &mut self, id: u32, handle: String, offset: u64, data: Vec, ) -> Result { use tokio::io::{AsyncSeekExt, AsyncWriteExt}; let of = self .open_files .get_mut(&handle) .ok_or(StatusCode::Failure)?; // Check staging quota (approximate — race-free enforcement at close time) let current_usage = staging::staging_usage(&self.staging_dir).await; if current_usage + data.len() as u64 > STAGING_QUOTA_BYTES { return Err(StatusCode::Failure); } of.file .seek(std::io::SeekFrom::Start(offset)) .await .map_err(|_| StatusCode::Failure)?; of.file .write_all(&data) .await .map_err(|_| StatusCode::Failure)?; Ok(self.ok_status(id)) } async fn read( &mut self, id: u32, handle: String, offset: u64, len: u32, ) -> Result { use tokio::io::{AsyncReadExt, AsyncSeekExt}; let of = self .open_files .get_mut(&handle) .ok_or(StatusCode::Failure)?; of.file .seek(std::io::SeekFrom::Start(offset)) .await .map_err(|_| StatusCode::Failure)?; let mut buf = vec![0u8; len as usize]; let n = of .file .read(&mut buf) .await .map_err(|_| StatusCode::Failure)?; if n == 0 { return Err(StatusCode::Eof); } buf.truncate(n); Ok(Data { id, data: buf }) } async fn close(&mut self, id: u32, handle: String) -> Result { if self.dir_handles.remove(&handle).is_some() { return Ok(self.ok_status(id)); } if let Some(of) = self.open_files.remove(&handle) { drop(of.file); tracing::debug!(user = %self.user_id, path = %of.path.display(), "file handle closed"); return Ok(self.ok_status(id)); } Err(StatusCode::Failure) } async fn remove(&mut self, id: u32, filename: String) -> Result { let raw_name = Self::extract_filename(&filename).ok_or(StatusCode::NoSuchFile)?; let safe_name = sanitize_filename(raw_name); let file_path = self.staging_dir.join(&safe_name); tokio::fs::remove_file(&file_path) .await .map_err(|_| StatusCode::NoSuchFile)?; tracing::info!(user = %self.user_id, file = %safe_name, "staging file removed"); Ok(self.ok_status(id)) } async fn mkdir( &mut self, _id: u32, _path: String, _attrs: FileAttributes, ) -> Result { Err(StatusCode::PermissionDenied) } async fn rmdir(&mut self, _id: u32, _path: String) -> Result { Err(StatusCode::PermissionDenied) } async fn rename( &mut self, _id: u32, _oldpath: String, _newpath: String, ) -> Result { Err(StatusCode::PermissionDenied) } async fn symlink( &mut self, _id: u32, _linkpath: String, _targetpath: String, ) -> Result { Err(StatusCode::OpUnsupported) } async fn readlink(&mut self, _id: u32, _path: String) -> Result { Err(StatusCode::OpUnsupported) } async fn setstat( &mut self, id: u32, _path: String, _attrs: FileAttributes, ) -> Result { // Silently accept — some SFTP clients send setstat after upload Ok(self.ok_status(id)) } async fn fsetstat( &mut self, id: u32, _handle: String, _attrs: FileAttributes, ) -> Result { // Silently accept Ok(self.ok_status(id)) } }