Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 25 additions & 51 deletions s3/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1540,73 +1540,47 @@ impl Bucket {
let msg = self
.initiate_multipart_upload(s3_path, content_type)
.await?;
let path = msg.key;
let path = &msg.key;
let upload_id = &msg.upload_id;

let mut part_number: u32 = 0;
let mut etags = Vec::new();

// Collect request handles
let mut handles = vec![];
let mut total_size = 0;
loop {
let chunk = if part_number == 0 {
first_chunk.clone()
} else {
crate::utils::read_chunk_async(reader).await?
};
total_size += chunk.len();

let done = chunk.len() < CHUNK_SIZE;

// Start chunk upload
// Read chunks one by one making multi part request for each and gathering responses
let mut chunk = first_chunk;
let mut part_number = 0;
let mut parts = vec![];
while !chunk.is_empty() {
total_size += chunk.len();
part_number += 1;
handles.push(self.make_multipart_request(
&path,
chunk,

// Perform chunk upload and reading next chunk in parallel
let part_fut = self.make_multipart_request(
path,
std::mem::take(&mut chunk),
part_number,
upload_id,
content_type,
));

if done {
break;
}
}
);
let chunk_fut = crate::utils::read_chunk_async(reader);

// Wait for all chunks to finish (or fail)
let responses = futures::future::join_all(handles).await;
let (next_chunk, response) = futures::future::try_join(chunk_fut, part_fut).await?;
chunk = next_chunk;

for response in responses {
let response_data = response?;
if !(200..300).contains(&response_data.status_code()) {
// Analyze the response to fast fail bad upload
if !(200..300).contains(&response.status_code()) {
// if chunk upload failed - abort the upload
match self.abort_upload(&path, upload_id).await {
Ok(_) => {
return Err(error_from_response_data(response_data)?);
}
Err(error) => {
return Err(error);
}
}
self.abort_upload(&path, upload_id).await?;
return Err(error_from_response_data(response)?);
}

let etag = response_data.as_str()?;
etags.push(etag.to_string());
parts.push(Part {
etag: response.to_string()?,
Copy link

Copilot AI Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method call response.to_string()? appears incorrect. Based on the original code that used response_data.as_str()?, this should likely be response.as_str()?.to_string() to first convert to string slice then to owned String.

Suggested change
etag: response.to_string()?,
etag: response.as_str()?.to_string(),

Copilot uses AI. Check for mistakes.
part_number,
});
}

// Finish the upload
let inner_data = etags
.clone()
.into_iter()
.enumerate()
.map(|(i, x)| Part {
etag: x,
part_number: i as u32 + 1,
})
.collect::<Vec<Part>>();
let response_data = self
.complete_multipart_upload(&path, &msg.upload_id, inner_data)
.complete_multipart_upload(path, upload_id, parts)
.await?;

Ok(PutStreamResponse::new(
Expand Down