//! Shared S3-compatible storage client. //! //! Thin wrapper around the AWS SDK providing upload, download, delete, //! presigned URL generation, and bucket management. Used by MNW and //! Multithreaded to avoid duplicating S3 initialization and operations. use aws_config::BehaviorVersion; use aws_sdk_s3::config::{Credentials, Region}; use aws_sdk_s3::presigning::PresigningConfig; use aws_sdk_s3::types::{ CompletedMultipartUpload, CompletedPart, CorsConfiguration, CorsRule, Delete, ObjectIdentifier, }; use aws_sdk_s3::Client; use std::time::Duration; pub use aws_sdk_s3::primitives::ByteStream; /// S3 connection configuration. #[derive(Debug, Clone)] pub struct S3Config { /// Endpoint URL (e.g., `https://fsn1.your-objectstorage.com`) pub endpoint: String, /// Bucket name pub bucket: String, /// Access key ID pub access_key: String, /// Secret access key pub secret_key: String, /// Region (e.g., `fsn1`) pub region: String, } /// S3 client wrapper. #[derive(Clone)] pub struct S3Client { client: Client, bucket: String, } impl S3Client { /// Create a new S3 client from configuration. pub async fn new(config: &S3Config) -> Result { let credentials = Credentials::new( &config.access_key, &config.secret_key, None, None, "s3-storage", ); let s3_config = aws_sdk_s3::Config::builder() .behavior_version(BehaviorVersion::latest()) .region(Region::new(config.region.clone())) .endpoint_url(&config.endpoint) .credentials_provider(credentials) .force_path_style(true) .build(); let client = Client::from_conf(s3_config); Ok(Self { client, bucket: config.bucket.clone(), }) } /// Bucket name accessor. pub fn bucket(&self) -> &str { &self.bucket } /// Upload bytes to S3. 
///
/// `data` becomes the request body and `content_type` the object's
/// `Content-Type`; `cache_control`, when `Some`, sets its `Cache-Control`.
pub async fn upload(
    &self,
    key: &str,
    content_type: &str,
    data: Vec<u8>,
    cache_control: Option<&str>,
) -> Result<(), String> {
    self.client
        .put_object()
        .bucket(&self.bucket)
        .key(key)
        .content_type(content_type)
        .body(ByteStream::from(data))
        // A `None` here leaves the header unset, same as never calling the setter.
        .set_cache_control(cache_control.map(str::to_owned))
        .send()
        .await
        .map(|_| ())
        .map_err(|e| format!("S3 upload failed: {e}"))
}

/// Fetch an object fully into memory, returning `(data, content_type)`.
///
/// When the object carries no `Content-Type`, `application/octet-stream`
/// is reported instead.
pub async fn download(&self, key: &str) -> Result<(Vec<u8>, String), String> {
    let output = self
        .client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await
        .map_err(|e| format!("S3 download failed: {e}"))?;

    // Read the header before `output.body` is moved out below.
    let content_type = match output.content_type() {
        Some(ct) => ct.to_string(),
        None => String::from("application/octet-stream"),
    };

    let collected = output
        .body
        .collect()
        .await
        .map_err(|e| format!("S3 read body failed: {e}"))?;
    Ok((collected.into_bytes().to_vec(), content_type))
}

/// Return an object's body as an unbuffered `ByteStream`; the caller
/// decides how to consume the chunks (write to disk, forward, ...).
pub async fn download_stream(&self, key: &str) -> Result<ByteStream, String> {
    self.client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await
        .map(|output| output.body)
        .map_err(|e| format!("S3 download failed: {e}"))
}

/// Remove a single object.
pub async fn delete(&self, key: &str) -> Result<(), String> {
    self.client
        .delete_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await
        .map(|_| ())
        .map_err(|e| format!("S3 delete failed: {e}"))
}

/// Delete several objects with a single S3 `DeleteObjects` request.
///
/// S3 takes at most 1000 keys per request; splitting larger inputs is the
/// caller's job. The `Ok` value lists `(key, message)` for each object S3
/// reported as not deleted — a response carrying such errors is partial
/// success, not a hard failure.
pub async fn delete_objects(&self, keys: &[String]) -> Result, String> { if keys.is_empty() { return Ok(Vec::new()); } let objects: Vec = keys .iter() .filter_map(|k| ObjectIdentifier::builder().key(k).build().ok()) .collect(); let delete = Delete::builder() .set_objects(Some(objects)) .quiet(true) .build() .map_err(|e| format!("S3 delete_objects build failed: {e}"))?; let resp = self .client .delete_objects() .bucket(&self.bucket) .delete(delete) .send() .await .map_err(|e| format!("S3 delete_objects failed: {e}"))?; let failures = resp .errors .unwrap_or_default() .into_iter() .filter_map(|err| { let key = err.key?; let msg = err.message.unwrap_or_else(|| "".into()); Some((key, msg)) }) .collect(); Ok(failures) } /// Delete all objects under a given key prefix. /// /// Lists in pages of 1000 and deletes each page in a single batched /// `DeleteObjects` call (S3's max). For a 50k-key prefix this is 50 /// round-trips instead of 50,000. pub async fn delete_prefix(&self, prefix: &str) -> Result<(), String> { let mut continuation_token: Option = None; loop { let mut req = self .client .list_objects_v2() .bucket(&self.bucket) .prefix(prefix) .max_keys(1000); if let Some(ref token) = continuation_token { req = req.continuation_token(token); } let resp = req .send() .await .map_err(|e| format!("S3 list objects failed: {e}"))?; let keys: Vec = resp .contents .unwrap_or_default() .into_iter() .filter_map(|obj| obj.key) .collect(); if !keys.is_empty() { let failures = self.delete_objects(&keys).await?; if !failures.is_empty() { // Don't bubble — partial-success is normal. Log first few. let preview: Vec = failures .iter() .take(5) .map(|(k, e)| format!("{k}: {e}")) .collect(); return Err(format!( "S3 delete_prefix partial failure: {} keys failed (first 5: {})", failures.len(), preview.join(", ") )); } } if resp.is_truncated.unwrap_or(false) { continuation_token = resp.next_continuation_token; } else { break; } } Ok(()) } /// Check if an object exists in S3. 
///
/// Sends a `HeadObject` request: a not-found error yields `Ok(false)`, any
/// other failure is returned as `Err`.
pub async fn object_exists(&self, key: &str) -> Result<bool, String> {
    let head = self
        .client
        .head_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await;
    match head {
        Ok(_) => Ok(true),
        Err(sdk_error) => match sdk_error.into_service_error() {
            missing if missing.is_not_found() => Ok(false),
            other => Err(format!("S3 head_object failed: {other}")),
        },
    }
}

/// Return an object's size in bytes via `HeadObject`.
///
/// `Ok(None)` when the object is not found, and also when the response
/// carries no content length. Any other failure is an `Err`.
pub async fn object_size(&self, key: &str) -> Result<Option<i64>, String> {
    let head = self
        .client
        .head_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await;
    match head {
        Ok(output) => Ok(output.content_length()),
        Err(sdk_error) => match sdk_error.into_service_error() {
            missing if missing.is_not_found() => Ok(None),
            other => Err(format!("S3 head_object failed: {other}")),
        },
    }
}

/// Generate a presigned PUT URL for uploading `key`.
///
/// When `max_bytes` is set, it is signed into the request as
/// `Content-Length`. S3 then rejects PUTs whose length differs. This stops
/// an attacker (or a confused client) from burning tier storage by
/// uploading past the per-file cap.
///
/// The uploading client MUST send a `Content-Length` header equal to
/// `max_bytes` for the PUT to succeed. Browsers, fetch/XHR and curl do this
/// automatically when given a sized body.
pub async fn presign_upload(
    &self,
    key: &str,
    content_type: &str,
    expiry_secs: u64,
    cache_control: Option<&str>,
    max_bytes: Option<i64>,
) -> Result<String, String> {
    let config = Self::presigning_config(expiry_secs)?;
    // NOTE(review): the length guarantee in this method's doc comment relies
    // on the SDK listing `content-length` among the signed headers; confirm
    // against `X-Amz-SignedHeaders` in a generated URL.
    self.client
        .put_object()
        .bucket(&self.bucket)
        .key(key)
        .content_type(content_type)
        // `set_*(None)` leaves the field unset, same as skipping the setter.
        .set_cache_control(cache_control.map(str::to_owned))
        .set_content_length(max_bytes)
        .presigned(config)
        .await
        .map(|presigned| presigned.uri().to_string())
        .map_err(|e| format!("Failed to generate upload URL: {e}"))
}

/// Generate a presigned GET URL for `key`, valid for `expiry_secs` seconds.
pub async fn presign_download(&self, key: &str, expiry_secs: u64) -> Result<String, String> {
    let config = Self::presigning_config(expiry_secs)?;
    self.client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .presigned(config)
        .await
        .map(|presigned| presigned.uri().to_string())
        .map_err(|e| format!("Failed to generate download URL: {e}"))
}

/// Build the presigning config shared by the `presign_*` methods.
///
/// A config the SDK rejects is reported as `"Presigning config error: …"`.
fn presigning_config(expiry_secs: u64) -> Result<PresigningConfig, String> {
    PresigningConfig::builder()
        .expires_in(Duration::from_secs(expiry_secs))
        .build()
        .map_err(|e| format!("Presigning config error: {e}"))
}

/// Upload a local file to S3 using multipart upload.
///
/// The file is read in `part_size` chunks (10 MB when `None`; values below
/// 5 MB are rejected), and each chunk is sent as one part. If reading the
/// file or uploading a part fails, the multipart upload is aborted.
pub async fn upload_multipart( &self, key: &str, content_type: &str, file_path: &std::path::Path, part_size: Option, ) -> Result<(), String> { use tokio::io::AsyncReadExt; let part_size = part_size.unwrap_or(10 * 1024 * 1024); // 10 MB default if part_size < 5 * 1024 * 1024 { return Err("Multipart part size must be at least 5 MB".to_string()); } let create = self .client .create_multipart_upload() .bucket(&self.bucket) .key(key) .content_type(content_type) .send() .await .map_err(|e| format!("S3 create multipart upload failed: {e}"))?; let upload_id = create .upload_id() .ok_or("S3 create multipart upload returned no upload_id")? .to_string(); let mut file = tokio::fs::File::open(file_path) .await .map_err(|e| format!("Failed to open file for multipart upload: {e}"))?; let mut part_number: i32 = 1; let mut completed_parts: Vec = Vec::new(); let mut buf = vec![0u8; part_size]; loop { let mut bytes_read = 0; // Fill the buffer completely (or until EOF) while bytes_read < part_size { match file.read(&mut buf[bytes_read..]).await { Ok(0) => break, Ok(n) => bytes_read += n, Err(e) => { self.abort_multipart_upload(key, &upload_id).await; return Err(format!("Failed to read file: {e}")); } } } if bytes_read == 0 { break; } // Retry the part upload up to 3 times on transient failures. // S3 part uploads can flake on network blips; aborting the // whole multipart upload because of one timeout means the // caller has to restart from byte 0. Three attempts with // exponential backoff covers the common transient cases // without making a permanent failure (auth, oversize, etc.) // wait forever. let mut attempt: u32 = 0; let resp = loop { attempt += 1; let body = aws_sdk_s3::primitives::ByteStream::from(buf[..bytes_read].to_vec()); match self .client .upload_part() .bucket(&self.bucket) .key(key) .upload_id(&upload_id) .part_number(part_number) .body(body) .send() .await { Ok(resp) => break Ok(resp), Err(e) if attempt < 3 => { // Backoff: 200ms, 800ms. 
Cheap enough not to // mask a permanent failure; long enough that // a brief network glitch resolves. let delay_ms = 200u64 * (1u64 << (attempt - 1) * 2); tracing::warn!( part_number, attempt, delay_ms, error = ?e, "S3 upload_part transient failure, retrying" ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; continue; } Err(e) => break Err(e), } }; match resp { Ok(resp) => { let etag = resp.e_tag().unwrap_or_default().to_string(); completed_parts.push( CompletedPart::builder() .e_tag(etag) .part_number(part_number) .build(), ); } Err(e) => { self.abort_multipart_upload(key, &upload_id).await; return Err(format!("S3 upload part {part_number} failed after retries: {e}")); } } part_number += 1; } if completed_parts.is_empty() { self.abort_multipart_upload(key, &upload_id).await; return Err("No parts uploaded (empty file)".to_string()); } let completed = CompletedMultipartUpload::builder() .set_parts(Some(completed_parts)) .build(); self.client .complete_multipart_upload() .bucket(&self.bucket) .key(key) .upload_id(&upload_id) .multipart_upload(completed) .send() .await .map_err(|e| format!("S3 complete multipart upload failed: {e}"))?; Ok(()) } /// Abort a multipart upload (best-effort, logs on failure). async fn abort_multipart_upload(&self, key: &str, upload_id: &str) { if let Err(e) = self .client .abort_multipart_upload() .bucket(&self.bucket) .key(key) .upload_id(upload_id) .send() .await { tracing::warn!("Failed to abort multipart upload for {key}: {e}"); } } /// Configure CORS on the bucket for browser uploads. 
pub async fn configure_cors(&self, allowed_origin: &str) { let origin = allowed_origin.trim_end_matches('/').to_string(); let rule = match CorsRule::builder() .allowed_origins(&origin) .allowed_methods("PUT") .allowed_methods("GET") .allowed_methods("HEAD") .allowed_headers("Content-Type") .allowed_headers("Cache-Control") .allowed_headers("Content-Disposition") .expose_headers("ETag") .max_age_seconds(3600) .build() { Ok(r) => r, Err(e) => { tracing::warn!("Failed to build CORS rule: {}", e); return; } }; let cors_config = match CorsConfiguration::builder().cors_rules(rule).build() { Ok(c) => c, Err(e) => { tracing::warn!("Failed to build CORS config: {}", e); return; } }; match self .client .put_bucket_cors() .bucket(&self.bucket) .cors_configuration(cors_config) .send() .await { Ok(_) => tracing::info!("S3 bucket CORS configured for {}", origin), Err(e) => tracing::warn!("Failed to configure S3 CORS: {}", e), } } /// Lightweight connectivity check — `list_objects_v2` with `max_keys(0)`. pub async fn check_connectivity(&self) -> Result<(), String> { self.client .list_objects_v2() .bucket(&self.bucket) .max_keys(0) .send() .await .map(|_| ()) .map_err(|e| format!("{e}")) } }