Skip to main content

max / makenotwork

12.7 KB · 427 lines History Blame Raw
1 //! SFTP subsystem handler for file uploads.
2 //!
3 //! Presents a virtual filesystem with a single `/upload/` directory.
4 //! Files written here land in the staging directory on disk, from where
5 //! the TUI publish flow sends them to S3.
6
7 use std::collections::HashMap;
8 use std::path::PathBuf;
9
10 use russh_sftp::protocol::{
11 Attrs, Data, File, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version,
12 };
13
14 use crate::staging::{self, is_allowed_extension, sanitize_filename, STAGING_QUOTA_BYTES};
15
16 /// SFTP session handler for a single authenticated user.
17 pub struct SftpSession {
18 user_id: String,
19 creator_tier: Option<String>,
20 staging_dir: PathBuf,
21 open_files: HashMap<String, OpenFile>,
22 dir_handles: HashMap<String, bool>, // handle -> already_read
23 next_handle: u64,
24 }
25
26 struct OpenFile {
27 path: PathBuf,
28 file: tokio::fs::File,
29 }
30
31 impl SftpSession {
32 pub fn new(user_id: String, creator_tier: Option<String>, staging_dir: PathBuf) -> Self {
33 Self {
34 user_id,
35 creator_tier,
36 staging_dir,
37 open_files: HashMap::new(),
38 dir_handles: HashMap::new(),
39 next_handle: 1,
40 }
41 }
42
43 fn alloc_handle(&mut self) -> String {
44 let h = self.next_handle;
45 self.next_handle += 1;
46 format!("h{}", h)
47 }
48
49 fn ok_status(&self, id: u32) -> Status {
50 Status {
51 id,
52 status_code: StatusCode::Ok,
53 error_message: String::new(),
54 language_tag: String::new(),
55 }
56 }
57
58 fn is_basic_tier(&self) -> bool {
59 self.creator_tier.as_deref() == Some("basic")
60 }
61
/// Returns `true` when `path` refers to the virtual upload directory itself.
///
/// Leading and trailing slashes are ignored; what remains must be empty,
/// `"."` or `"upload"`.
fn is_upload_path(path: &str) -> bool {
    matches!(path.trim_matches('/'), "" | "." | "upload")
}
66
/// Extracts the file part of a client-supplied path.
///
/// Leading slashes are dropped first. Whatever follows an `upload/` prefix is
/// returned as-is (it may be empty or contain further slashes). Without that
/// prefix, the path is accepted only if it is a bare, non-empty name with no
/// `/` that is not the word `upload` itself; anything else yields `None`.
fn extract_filename(path: &str) -> Option<&str> {
    let relative = path.trim_start_matches('/');

    if let Some(rest) = relative.strip_prefix("upload/") {
        return Some(rest);
    }

    // Direct filename without the upload/ prefix.
    let is_bare_name = !(relative.is_empty() || relative == "upload" || relative.contains('/'));
    is_bare_name.then_some(relative)
}
78 }
79
80 impl russh_sftp::server::Handler for SftpSession {
81 type Error = StatusCode;
82
83 fn unimplemented(&self) -> Self::Error {
84 StatusCode::OpUnsupported
85 }
86
87 async fn init(
88 &mut self,
89 version: u32,
90 _extensions: HashMap<String, String>,
91 ) -> Result<Version, Self::Error> {
92 tracing::debug!(user = %self.user_id, sftp_version = version, "SFTP session initialized");
93
94 // Ensure staging dir exists
95 if let Err(e) = tokio::fs::create_dir_all(&self.staging_dir).await {
96 tracing::error!(error = ?e, "failed to create staging dir");
97 return Err(StatusCode::Failure);
98 }
99
100 Ok(Version::new())
101 }
102
103 async fn realpath(&mut self, id: u32, _path: String) -> Result<Name, Self::Error> {
104 Ok(Name {
105 id,
106 files: vec![File::dummy("/upload")],
107 })
108 }
109
110 async fn opendir(&mut self, id: u32, path: String) -> Result<Handle, Self::Error> {
111 if !Self::is_upload_path(&path) {
112 return Err(StatusCode::NoSuchFile);
113 }
114
115 let handle = self.alloc_handle();
116 self.dir_handles.insert(handle.clone(), false);
117
118 Ok(Handle { id, handle })
119 }
120
121 async fn readdir(&mut self, id: u32, handle: String) -> Result<Name, Self::Error> {
122 let already_read = self
123 .dir_handles
124 .get_mut(&handle)
125 .ok_or(StatusCode::Failure)?;
126
127 if *already_read {
128 return Err(StatusCode::Eof);
129 }
130 *already_read = true;
131
132 let staged = staging::list_staged_files(&self.staging_dir).await;
133
134 let files: Vec<File> = staged
135 .into_iter()
136 .map(|sf| {
137 let mut attrs = FileAttributes::empty();
138 attrs.set_regular(true);
139 attrs.size = Some(sf.size);
140 if let Ok(dur) = sf
141 .modified
142 .duration_since(std::time::SystemTime::UNIX_EPOCH)
143 {
144 attrs.mtime = Some(dur.as_secs() as u32);
145 }
146 File::new(sf.filename, attrs)
147 })
148 .collect();
149
150 Ok(Name { id, files })
151 }
152
153 async fn stat(&mut self, id: u32, path: String) -> Result<Attrs, Self::Error> {
154 if Self::is_upload_path(&path) {
155 let mut attrs = FileAttributes::empty();
156 attrs.set_dir(true);
157 attrs.permissions = Some(0o755);
158 return Ok(Attrs { id, attrs });
159 }
160
161 if let Some(filename) = Self::extract_filename(&path) {
162 let file_path = self.staging_dir.join(sanitize_filename(filename));
163 if let Ok(metadata) = tokio::fs::metadata(&file_path).await {
164 let attrs = FileAttributes::from(&metadata);
165 return Ok(Attrs { id, attrs });
166 }
167 }
168
169 Err(StatusCode::NoSuchFile)
170 }
171
172 async fn lstat(&mut self, id: u32, path: String) -> Result<Attrs, Self::Error> {
173 self.stat(id, path).await
174 }
175
176 async fn fstat(&mut self, id: u32, handle: String) -> Result<Attrs, Self::Error> {
177 if self.dir_handles.contains_key(&handle) {
178 let mut attrs = FileAttributes::empty();
179 attrs.set_dir(true);
180 attrs.permissions = Some(0o755);
181 return Ok(Attrs { id, attrs });
182 }
183
184 if let Some(of) = self.open_files.get(&handle)
185 && let Ok(metadata) = of.file.metadata().await
186 {
187 let attrs = FileAttributes::from(&metadata);
188 return Ok(Attrs { id, attrs });
189 }
190
191 Err(StatusCode::Failure)
192 }
193
194 async fn open(
195 &mut self,
196 id: u32,
197 filename: String,
198 pflags: OpenFlags,
199 _attrs: FileAttributes,
200 ) -> Result<Handle, Self::Error> {
201 // Only allow writing to /upload/<filename>
202 let raw_name = Self::extract_filename(&filename).ok_or(StatusCode::PermissionDenied)?;
203 let safe_name = sanitize_filename(raw_name);
204
205 if safe_name.is_empty() {
206 return Err(StatusCode::NoSuchFile);
207 }
208
209 // Check tier — Basic is text-only, no file uploads
210 if self.is_basic_tier() {
211 tracing::warn!(user = %self.user_id, "Basic tier user attempted SFTP upload");
212 return Err(StatusCode::PermissionDenied);
213 }
214
215 // Check extension
216 let ext = safe_name.rsplit('.').next().unwrap_or("").to_lowercase();
217 if !is_allowed_extension(&ext) {
218 tracing::warn!(user = %self.user_id, ext, "unsupported file extension");
219 return Err(StatusCode::PermissionDenied);
220 }
221
222 // Check staging quota before opening
223 let current_usage = staging::staging_usage(&self.staging_dir).await;
224 if current_usage >= STAGING_QUOTA_BYTES {
225 tracing::warn!(user = %self.user_id, usage = current_usage, "staging quota exceeded");
226 return Err(StatusCode::Failure);
227 }
228
229 let file_path = self.staging_dir.join(&safe_name);
230
231 if pflags.contains(OpenFlags::WRITE) || pflags.contains(OpenFlags::CREATE) {
232 // Ensure staging dir exists
233 if let Err(e) = tokio::fs::create_dir_all(&self.staging_dir).await {
234 tracing::error!(error = ?e, "failed to create staging dir");
235 return Err(StatusCode::Failure);
236 }
237
238 let file = tokio::fs::OpenOptions::new()
239 .write(true)
240 .create(true)
241 .truncate(pflags.contains(OpenFlags::TRUNCATE))
242 .open(&file_path)
243 .await
244 .map_err(|e| {
245 tracing::error!(error = ?e, "failed to open staging file for write");
246 StatusCode::Failure
247 })?;
248
249 let handle = self.alloc_handle();
250 self.open_files.insert(
251 handle.clone(),
252 OpenFile {
253 path: file_path,
254 file,
255 },
256 );
257
258 tracing::info!(user = %self.user_id, file = %safe_name, "staging file opened for write");
259 return Ok(Handle { id, handle });
260 }
261
262 if pflags.contains(OpenFlags::READ) {
263 let file = tokio::fs::File::open(&file_path).await.map_err(|_| StatusCode::NoSuchFile)?;
264 let handle = self.alloc_handle();
265 self.open_files.insert(
266 handle.clone(),
267 OpenFile {
268 path: file_path,
269 file,
270 },
271 );
272 return Ok(Handle { id, handle });
273 }
274
275 Err(StatusCode::PermissionDenied)
276 }
277
278 async fn write(
279 &mut self,
280 id: u32,
281 handle: String,
282 offset: u64,
283 data: Vec<u8>,
284 ) -> Result<Status, Self::Error> {
285 use tokio::io::{AsyncSeekExt, AsyncWriteExt};
286
287 let of = self
288 .open_files
289 .get_mut(&handle)
290 .ok_or(StatusCode::Failure)?;
291
292 // Check staging quota (approximate — race-free enforcement at close time)
293 let current_usage = staging::staging_usage(&self.staging_dir).await;
294 if current_usage + data.len() as u64 > STAGING_QUOTA_BYTES {
295 return Err(StatusCode::Failure);
296 }
297
298 of.file
299 .seek(std::io::SeekFrom::Start(offset))
300 .await
301 .map_err(|_| StatusCode::Failure)?;
302
303 of.file
304 .write_all(&data)
305 .await
306 .map_err(|_| StatusCode::Failure)?;
307
308 Ok(self.ok_status(id))
309 }
310
311 async fn read(
312 &mut self,
313 id: u32,
314 handle: String,
315 offset: u64,
316 len: u32,
317 ) -> Result<Data, Self::Error> {
318 use tokio::io::{AsyncReadExt, AsyncSeekExt};
319
320 let of = self
321 .open_files
322 .get_mut(&handle)
323 .ok_or(StatusCode::Failure)?;
324
325 of.file
326 .seek(std::io::SeekFrom::Start(offset))
327 .await
328 .map_err(|_| StatusCode::Failure)?;
329
330 let mut buf = vec![0u8; len as usize];
331 let n = of
332 .file
333 .read(&mut buf)
334 .await
335 .map_err(|_| StatusCode::Failure)?;
336
337 if n == 0 {
338 return Err(StatusCode::Eof);
339 }
340
341 buf.truncate(n);
342 Ok(Data { id, data: buf })
343 }
344
345 async fn close(&mut self, id: u32, handle: String) -> Result<Status, Self::Error> {
346 if self.dir_handles.remove(&handle).is_some() {
347 return Ok(self.ok_status(id));
348 }
349
350 if let Some(of) = self.open_files.remove(&handle) {
351 drop(of.file);
352 tracing::debug!(user = %self.user_id, path = %of.path.display(), "file handle closed");
353 return Ok(self.ok_status(id));
354 }
355
356 Err(StatusCode::Failure)
357 }
358
359 async fn remove(&mut self, id: u32, filename: String) -> Result<Status, Self::Error> {
360 let raw_name = Self::extract_filename(&filename).ok_or(StatusCode::NoSuchFile)?;
361 let safe_name = sanitize_filename(raw_name);
362 let file_path = self.staging_dir.join(&safe_name);
363
364 tokio::fs::remove_file(&file_path)
365 .await
366 .map_err(|_| StatusCode::NoSuchFile)?;
367
368 tracing::info!(user = %self.user_id, file = %safe_name, "staging file removed");
369 Ok(self.ok_status(id))
370 }
371
372 async fn mkdir(
373 &mut self,
374 _id: u32,
375 _path: String,
376 _attrs: FileAttributes,
377 ) -> Result<Status, Self::Error> {
378 Err(StatusCode::PermissionDenied)
379 }
380
381 async fn rmdir(&mut self, _id: u32, _path: String) -> Result<Status, Self::Error> {
382 Err(StatusCode::PermissionDenied)
383 }
384
385 async fn rename(
386 &mut self,
387 _id: u32,
388 _oldpath: String,
389 _newpath: String,
390 ) -> Result<Status, Self::Error> {
391 Err(StatusCode::PermissionDenied)
392 }
393
394 async fn symlink(
395 &mut self,
396 _id: u32,
397 _linkpath: String,
398 _targetpath: String,
399 ) -> Result<Status, Self::Error> {
400 Err(StatusCode::OpUnsupported)
401 }
402
403 async fn readlink(&mut self, _id: u32, _path: String) -> Result<Name, Self::Error> {
404 Err(StatusCode::OpUnsupported)
405 }
406
407 async fn setstat(
408 &mut self,
409 id: u32,
410 _path: String,
411 _attrs: FileAttributes,
412 ) -> Result<Status, Self::Error> {
413 // Silently accept — some SFTP clients send setstat after upload
414 Ok(self.ok_status(id))
415 }
416
417 async fn fsetstat(
418 &mut self,
419 id: u32,
420 _handle: String,
421 _attrs: FileAttributes,
422 ) -> Result<Status, Self::Error> {
423 // Silently accept
424 Ok(self.ok_status(id))
425 }
426 }
427