Skip to main content

max / makenotwork

18.2 KB · 572 lines History Blame Raw
1 //! Shared S3-compatible storage client.
2 //!
3 //! Thin wrapper around the AWS SDK providing upload, download, delete,
4 //! presigned URL generation, and bucket management. Used by MNW and
5 //! Multithreaded to avoid duplicating S3 initialization and operations.
6
7 use aws_config::BehaviorVersion;
8 use aws_sdk_s3::config::{Credentials, Region};
9 use aws_sdk_s3::presigning::PresigningConfig;
10 use aws_sdk_s3::types::{
11 CompletedMultipartUpload, CompletedPart, CorsConfiguration, CorsRule, Delete,
12 ObjectIdentifier,
13 };
14 use aws_sdk_s3::Client;
15 use std::time::Duration;
16
17 pub use aws_sdk_s3::primitives::ByteStream;
18
/// S3 connection configuration.
///
/// `Debug` is implemented by hand (not derived) so that `secret_key` is never
/// written to logs or panic messages when the config is printed with `{:?}`.
#[derive(Clone)]
pub struct S3Config {
    /// Endpoint URL (e.g., `https://fsn1.your-objectstorage.com`)
    pub endpoint: String,
    /// Bucket name
    pub bucket: String,
    /// Access key ID
    pub access_key: String,
    /// Secret access key (shown as `<redacted>` in `Debug` output)
    pub secret_key: String,
    /// Region (e.g., `fsn1`)
    pub region: String,
}

impl std::fmt::Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("endpoint", &self.endpoint)
            .field("bucket", &self.bucket)
            .field("access_key", &self.access_key)
            // Never print the secret itself.
            .field("secret_key", &"<redacted>")
            .field("region", &self.region)
            .finish()
    }
}
33
34 /// S3 client wrapper.
35 #[derive(Clone)]
36 pub struct S3Client {
37 client: Client,
38 bucket: String,
39 }
40
41 impl S3Client {
42 /// Create a new S3 client from configuration.
43 pub async fn new(config: &S3Config) -> Result<Self, String> {
44 let credentials = Credentials::new(
45 &config.access_key,
46 &config.secret_key,
47 None,
48 None,
49 "s3-storage",
50 );
51
52 let s3_config = aws_sdk_s3::Config::builder()
53 .behavior_version(BehaviorVersion::latest())
54 .region(Region::new(config.region.clone()))
55 .endpoint_url(&config.endpoint)
56 .credentials_provider(credentials)
57 .force_path_style(true)
58 .build();
59
60 let client = Client::from_conf(s3_config);
61
62 Ok(Self {
63 client,
64 bucket: config.bucket.clone(),
65 })
66 }
67
68 /// Bucket name accessor.
69 pub fn bucket(&self) -> &str {
70 &self.bucket
71 }
72
73 /// Upload bytes to S3.
74 pub async fn upload(
75 &self,
76 key: &str,
77 content_type: &str,
78 data: Vec<u8>,
79 cache_control: Option<&str>,
80 ) -> Result<(), String> {
81 let mut req = self
82 .client
83 .put_object()
84 .bucket(&self.bucket)
85 .key(key)
86 .content_type(content_type)
87 .body(data.into());
88
89 if let Some(cc) = cache_control {
90 req = req.cache_control(cc);
91 }
92
93 req.send()
94 .await
95 .map_err(|e| format!("S3 upload failed: {e}"))?;
96
97 Ok(())
98 }
99
100 /// Download bytes from S3. Returns `(data, content_type)`.
101 pub async fn download(&self, key: &str) -> Result<(Vec<u8>, String), String> {
102 let resp = self
103 .client
104 .get_object()
105 .bucket(&self.bucket)
106 .key(key)
107 .send()
108 .await
109 .map_err(|e| format!("S3 download failed: {e}"))?;
110
111 let content_type = resp
112 .content_type()
113 .unwrap_or("application/octet-stream")
114 .to_string();
115
116 let bytes = resp
117 .body
118 .collect()
119 .await
120 .map_err(|e| format!("S3 read body failed: {e}"))?;
121
122 Ok((bytes.into_bytes().to_vec(), content_type))
123 }
124
125 /// Stream an object's body from S3 without buffering. Caller drives the
126 /// `ByteStream` to disk or hands it to a layer that wants chunks.
127 pub async fn download_stream(
128 &self,
129 key: &str,
130 ) -> Result<aws_sdk_s3::primitives::ByteStream, String> {
131 let resp = self
132 .client
133 .get_object()
134 .bucket(&self.bucket)
135 .key(key)
136 .send()
137 .await
138 .map_err(|e| format!("S3 download failed: {e}"))?;
139
140 Ok(resp.body)
141 }
142
143 /// Delete an object from S3.
144 pub async fn delete(&self, key: &str) -> Result<(), String> {
145 self.client
146 .delete_object()
147 .bucket(&self.bucket)
148 .key(key)
149 .send()
150 .await
151 .map_err(|e| format!("S3 delete failed: {e}"))?;
152
153 Ok(())
154 }
155
156 /// Delete a batch of objects in a single S3 `DeleteObjects` request.
157 ///
158 /// S3 accepts up to 1000 keys per call; the caller is responsible for
159 /// chunking. Returns the keys that failed (if any) along with their
160 /// per-object error message. A successful response with `Errors` is
161 /// not a hard error — partial success is normal for batched deletes.
162 pub async fn delete_objects(&self, keys: &[String]) -> Result<Vec<(String, String)>, String> {
163 if keys.is_empty() {
164 return Ok(Vec::new());
165 }
166 let objects: Vec<ObjectIdentifier> = keys
167 .iter()
168 .filter_map(|k| ObjectIdentifier::builder().key(k).build().ok())
169 .collect();
170 let delete = Delete::builder()
171 .set_objects(Some(objects))
172 .quiet(true)
173 .build()
174 .map_err(|e| format!("S3 delete_objects build failed: {e}"))?;
175 let resp = self
176 .client
177 .delete_objects()
178 .bucket(&self.bucket)
179 .delete(delete)
180 .send()
181 .await
182 .map_err(|e| format!("S3 delete_objects failed: {e}"))?;
183 let failures = resp
184 .errors
185 .unwrap_or_default()
186 .into_iter()
187 .filter_map(|err| {
188 let key = err.key?;
189 let msg = err.message.unwrap_or_else(|| "<no message>".into());
190 Some((key, msg))
191 })
192 .collect();
193 Ok(failures)
194 }
195
196 /// Delete all objects under a given key prefix.
197 ///
198 /// Lists in pages of 1000 and deletes each page in a single batched
199 /// `DeleteObjects` call (S3's max). For a 50k-key prefix this is 50
200 /// round-trips instead of 50,000.
201 pub async fn delete_prefix(&self, prefix: &str) -> Result<(), String> {
202 let mut continuation_token: Option<String> = None;
203 loop {
204 let mut req = self
205 .client
206 .list_objects_v2()
207 .bucket(&self.bucket)
208 .prefix(prefix)
209 .max_keys(1000);
210 if let Some(ref token) = continuation_token {
211 req = req.continuation_token(token);
212 }
213 let resp = req
214 .send()
215 .await
216 .map_err(|e| format!("S3 list objects failed: {e}"))?;
217
218 let keys: Vec<String> = resp
219 .contents
220 .unwrap_or_default()
221 .into_iter()
222 .filter_map(|obj| obj.key)
223 .collect();
224
225 if !keys.is_empty() {
226 let failures = self.delete_objects(&keys).await?;
227 if !failures.is_empty() {
228 // Don't bubble — partial-success is normal. Log first few.
229 let preview: Vec<String> = failures
230 .iter()
231 .take(5)
232 .map(|(k, e)| format!("{k}: {e}"))
233 .collect();
234 return Err(format!(
235 "S3 delete_prefix partial failure: {} keys failed (first 5: {})",
236 failures.len(),
237 preview.join(", ")
238 ));
239 }
240 }
241
242 if resp.is_truncated.unwrap_or(false) {
243 continuation_token = resp.next_continuation_token;
244 } else {
245 break;
246 }
247 }
248 Ok(())
249 }
250
251 /// Check if an object exists in S3.
252 pub async fn object_exists(&self, key: &str) -> Result<bool, String> {
253 match self
254 .client
255 .head_object()
256 .bucket(&self.bucket)
257 .key(key)
258 .send()
259 .await
260 {
261 Ok(_) => Ok(true),
262 Err(e) => {
263 let service_error = e.into_service_error();
264 if service_error.is_not_found() {
265 Ok(false)
266 } else {
267 Err(format!("S3 head_object failed: {service_error}"))
268 }
269 }
270 }
271 }
272
273 /// Get the size of an object in bytes, or `None` if not found.
274 pub async fn object_size(&self, key: &str) -> Result<Option<i64>, String> {
275 match self
276 .client
277 .head_object()
278 .bucket(&self.bucket)
279 .key(key)
280 .send()
281 .await
282 {
283 Ok(resp) => Ok(resp.content_length()),
284 Err(e) => {
285 let service_error = e.into_service_error();
286 if service_error.is_not_found() {
287 Ok(None)
288 } else {
289 Err(format!("S3 head_object failed: {service_error}"))
290 }
291 }
292 }
293 }
294
295 /// Generate a presigned URL for uploading.
296 ///
297 /// When `max_bytes` is set, the value is signed into the request as
298 /// `Content-Length`, so S3 rejects PUTs that don't match the exact length
299 /// at the protocol level — prevents an attacker (or a confused client)
300 /// from burning tier storage by uploading past the per-file cap. The
301 /// uploading client MUST send a `Content-Length` header equal to
302 /// `max_bytes` for the PUT to succeed (browsers/fetch/XHR/curl do this
303 /// automatically when given a sized body).
304 pub async fn presign_upload(
305 &self,
306 key: &str,
307 content_type: &str,
308 expiry_secs: u64,
309 cache_control: Option<&str>,
310 max_bytes: Option<i64>,
311 ) -> Result<String, String> {
312 let presigning_config = PresigningConfig::builder()
313 .expires_in(Duration::from_secs(expiry_secs))
314 .build()
315 .map_err(|e| format!("Presigning config error: {e}"))?;
316
317 let mut req = self
318 .client
319 .put_object()
320 .bucket(&self.bucket)
321 .key(key)
322 .content_type(content_type);
323
324 if let Some(cc) = cache_control {
325 req = req.cache_control(cc);
326 }
327
328 if let Some(n) = max_bytes {
329 req = req.content_length(n);
330 }
331
332 let presigned = req
333 .presigned(presigning_config)
334 .await
335 .map_err(|e| format!("Failed to generate upload URL: {e}"))?;
336
337 Ok(presigned.uri().to_string())
338 }
339
340 /// Generate a presigned URL for downloading.
341 pub async fn presign_download(
342 &self,
343 key: &str,
344 expiry_secs: u64,
345 ) -> Result<String, String> {
346 let presigning_config = PresigningConfig::builder()
347 .expires_in(Duration::from_secs(expiry_secs))
348 .build()
349 .map_err(|e| format!("Presigning config error: {e}"))?;
350
351 let presigned = self
352 .client
353 .get_object()
354 .bucket(&self.bucket)
355 .key(key)
356 .presigned(presigning_config)
357 .await
358 .map_err(|e| format!("Failed to generate download URL: {e}"))?;
359
360 Ok(presigned.uri().to_string())
361 }
362
363 /// Upload a file to S3 using multipart upload.
364 ///
365 /// Reads the file in `part_size` chunks (default 10 MB, minimum 5 MB) and
366 /// uploads each as a part. Aborts the multipart upload on failure.
367 pub async fn upload_multipart(
368 &self,
369 key: &str,
370 content_type: &str,
371 file_path: &std::path::Path,
372 part_size: Option<usize>,
373 ) -> Result<(), String> {
374 use tokio::io::AsyncReadExt;
375
376 let part_size = part_size.unwrap_or(10 * 1024 * 1024); // 10 MB default
377 if part_size < 5 * 1024 * 1024 {
378 return Err("Multipart part size must be at least 5 MB".to_string());
379 }
380
381 let create = self
382 .client
383 .create_multipart_upload()
384 .bucket(&self.bucket)
385 .key(key)
386 .content_type(content_type)
387 .send()
388 .await
389 .map_err(|e| format!("S3 create multipart upload failed: {e}"))?;
390
391 let upload_id = create
392 .upload_id()
393 .ok_or("S3 create multipart upload returned no upload_id")?
394 .to_string();
395
396 let mut file = tokio::fs::File::open(file_path)
397 .await
398 .map_err(|e| format!("Failed to open file for multipart upload: {e}"))?;
399
400 let mut part_number: i32 = 1;
401 let mut completed_parts: Vec<CompletedPart> = Vec::new();
402 let mut buf = vec![0u8; part_size];
403
404 loop {
405 let mut bytes_read = 0;
406 // Fill the buffer completely (or until EOF)
407 while bytes_read < part_size {
408 match file.read(&mut buf[bytes_read..]).await {
409 Ok(0) => break,
410 Ok(n) => bytes_read += n,
411 Err(e) => {
412 self.abort_multipart_upload(key, &upload_id).await;
413 return Err(format!("Failed to read file: {e}"));
414 }
415 }
416 }
417
418 if bytes_read == 0 {
419 break;
420 }
421
422 // Retry the part upload up to 3 times on transient failures.
423 // S3 part uploads can flake on network blips; aborting the
424 // whole multipart upload because of one timeout means the
425 // caller has to restart from byte 0. Three attempts with
426 // exponential backoff covers the common transient cases
427 // without making a permanent failure (auth, oversize, etc.)
428 // wait forever.
429 let mut attempt: u32 = 0;
430 let resp = loop {
431 attempt += 1;
432 let body = aws_sdk_s3::primitives::ByteStream::from(buf[..bytes_read].to_vec());
433 match self
434 .client
435 .upload_part()
436 .bucket(&self.bucket)
437 .key(key)
438 .upload_id(&upload_id)
439 .part_number(part_number)
440 .body(body)
441 .send()
442 .await
443 {
444 Ok(resp) => break Ok(resp),
445 Err(e) if attempt < 3 => {
446 // Backoff: 200ms, 800ms. Cheap enough not to
447 // mask a permanent failure; long enough that
448 // a brief network glitch resolves.
449 let delay_ms = 200u64 * (1u64 << (attempt - 1) * 2);
450 tracing::warn!(
451 part_number, attempt, delay_ms, error = ?e,
452 "S3 upload_part transient failure, retrying"
453 );
454 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
455 continue;
456 }
457 Err(e) => break Err(e),
458 }
459 };
460
461 match resp {
462 Ok(resp) => {
463 let etag = resp.e_tag().unwrap_or_default().to_string();
464 completed_parts.push(
465 CompletedPart::builder()
466 .e_tag(etag)
467 .part_number(part_number)
468 .build(),
469 );
470 }
471 Err(e) => {
472 self.abort_multipart_upload(key, &upload_id).await;
473 return Err(format!("S3 upload part {part_number} failed after retries: {e}"));
474 }
475 }
476
477 part_number += 1;
478 }
479
480 if completed_parts.is_empty() {
481 self.abort_multipart_upload(key, &upload_id).await;
482 return Err("No parts uploaded (empty file)".to_string());
483 }
484
485 let completed = CompletedMultipartUpload::builder()
486 .set_parts(Some(completed_parts))
487 .build();
488
489 self.client
490 .complete_multipart_upload()
491 .bucket(&self.bucket)
492 .key(key)
493 .upload_id(&upload_id)
494 .multipart_upload(completed)
495 .send()
496 .await
497 .map_err(|e| format!("S3 complete multipart upload failed: {e}"))?;
498
499 Ok(())
500 }
501
502 /// Abort a multipart upload (best-effort, logs on failure).
503 async fn abort_multipart_upload(&self, key: &str, upload_id: &str) {
504 if let Err(e) = self
505 .client
506 .abort_multipart_upload()
507 .bucket(&self.bucket)
508 .key(key)
509 .upload_id(upload_id)
510 .send()
511 .await
512 {
513 tracing::warn!("Failed to abort multipart upload for {key}: {e}");
514 }
515 }
516
517 /// Configure CORS on the bucket for browser uploads.
518 pub async fn configure_cors(&self, allowed_origin: &str) {
519 let origin = allowed_origin.trim_end_matches('/').to_string();
520 let rule = match CorsRule::builder()
521 .allowed_origins(&origin)
522 .allowed_methods("PUT")
523 .allowed_methods("GET")
524 .allowed_methods("HEAD")
525 .allowed_headers("Content-Type")
526 .allowed_headers("Cache-Control")
527 .allowed_headers("Content-Disposition")
528 .expose_headers("ETag")
529 .max_age_seconds(3600)
530 .build()
531 {
532 Ok(r) => r,
533 Err(e) => {
534 tracing::warn!("Failed to build CORS rule: {}", e);
535 return;
536 }
537 };
538
539 let cors_config = match CorsConfiguration::builder().cors_rules(rule).build() {
540 Ok(c) => c,
541 Err(e) => {
542 tracing::warn!("Failed to build CORS config: {}", e);
543 return;
544 }
545 };
546
547 match self
548 .client
549 .put_bucket_cors()
550 .bucket(&self.bucket)
551 .cors_configuration(cors_config)
552 .send()
553 .await
554 {
555 Ok(_) => tracing::info!("S3 bucket CORS configured for {}", origin),
556 Err(e) => tracing::warn!("Failed to configure S3 CORS: {}", e),
557 }
558 }
559
560 /// Lightweight connectivity check — `list_objects_v2` with `max_keys(0)`.
561 pub async fn check_connectivity(&self) -> Result<(), String> {
562 self.client
563 .list_objects_v2()
564 .bucket(&self.bucket)
565 .max_keys(0)
566 .send()
567 .await
568 .map(|_| ())
569 .map_err(|e| format!("{e}"))
570 }
571 }
572