-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_multipart_upload_threaded.py
80 lines (61 loc) · 1.96 KB
/
test_multipart_upload_threaded.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json
import os
import sys
import boto3
def generate_file(file_path, size_in_gb):
    """Create (or extend) a file of ``size_in_gb`` GiB with its space allocated.

    The allocation is done in-process with ``os.posix_fallocate`` instead of
    shelling out to the ``fallocate`` command, so paths containing spaces or
    shell metacharacters are handled correctly and an allocation failure
    raises instead of being silently ignored.

    Args:
        file_path: Path of the file to create. An existing file is not
            truncated; it is extended if it is shorter than the request.
        size_in_gb: Requested size in units of 1024**3 bytes. Fractional
            values are accepted and rounded down to a whole number of bytes.

    Raises:
        ValueError: If the requested size works out to less than one byte.
        OSError: If the file cannot be opened or the space cannot be
            allocated.
    """
    size_in_bytes = int(size_in_gb * 1024 ** 3)
    if size_in_bytes <= 0:
        raise ValueError(
            f"size_in_gb must yield at least one byte, got {size_in_gb!r}"
        )

    # O_CREAT without O_TRUNC: create the file when missing, but keep the
    # contents of an existing one, matching what the fallocate tool does.
    fd = os.open(file_path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        os.posix_fallocate(fd, 0, size_in_bytes)
    finally:
        os.close(fd)
def chunk_file(file_path, chunk_size=8 * 1024 * 1024):
    """Lazily read a file in binary mode, yielding numbered chunks.

    Args:
        file_path: Path of the file to read.
        chunk_size: Number of bytes requested from each read call.

    Yields:
        ``(number, data)`` tuples, where ``number`` counts up from 1 and
        ``data`` is the bytes object returned by the read. Nothing is
        yielded for an empty file.
    """
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel keeps calling read() until it returns b""
        # (end of file), which then terminates the iteration.
        pieces = iter(lambda: stream.read(chunk_size), b"")
        for number, data in enumerate(pieces, start=1):
            yield number, data
def upload_file_multipart(bucket_name, file_path, file_name):
    """Upload a local file to S3 as a multipart upload, one part at a time.

    The file is read through ``chunk_file`` and every chunk is sent with
    ``upload_part``. The collected part list is written to ``parts.json`` and
    the ``complete_multipart_upload`` response to ``result.json``, both in
    the current working directory.

    If anything fails after the multipart upload has been created, the
    upload is aborted so that the parts uploaded so far are not left behind
    in the bucket, and the original exception is re-raised.

    Args:
        bucket_name: Name of the destination bucket.
        file_path: Local path of the file to upload.
        file_name: Object key to upload the file under.

    Raises:
        Exception: Any error raised by the S3 client calls or by the local
            file reads/writes is propagated to the caller.
    """
    s3_client = boto3.client('s3')

    create_response = s3_client.create_multipart_upload(
        Bucket=bucket_name,
        Key=file_name,
        ChecksumAlgorithm='CRC64NVME'
    )
    upload_id = create_response['UploadId']
    part_info = {'Parts': []}

    try:
        for chunk_number, chunk in chunk_file(file_path):
            response = s3_client.upload_part(
                Bucket=bucket_name,
                Body=chunk,
                UploadId=upload_id,
                PartNumber=chunk_number,
                Key=file_name,
                ChecksumAlgorithm='CRC64NVME'
            )
            part = {
                'PartNumber': chunk_number,
                'ETag': response['ETag']
            }
            print(f"Completed part {part}", file=sys.stderr)
            part_info['Parts'].append(part)

        with open('parts.json', 'w', encoding='utf8') as f:
            json.dump(part_info, f)

        result = s3_client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=file_name,
            UploadId=upload_id,
            MultipartUpload=part_info
        )
    except BaseException:
        # Cleanup-and-re-raise: this also covers Ctrl-C, so an interrupted
        # run does not leave an unfinished upload behind.
        try:
            s3_client.abort_multipart_upload(
                Bucket=bucket_name,
                Key=file_name,
                UploadId=upload_id
            )
        except Exception as abort_error:
            # Report the failed cleanup but keep the original error as the
            # one the caller sees.
            print(
                f"Failed to abort multipart upload {upload_id}: {abort_error}",
                file=sys.stderr
            )
        raise

    # The upload is already complete at this point, so a failure to save the
    # response must not trigger an abort.
    with open('result.json', 'w', encoding='utf8') as f:
        json.dump(result, f)
def main():
    """Generate an 8 GiB scratch file, upload it to the given bucket, clean up.

    Expects exactly one command-line argument, the bucket name. With any
    other argument count a usage message is printed to stderr and the
    process exits with status 1.

    The scratch file ``/tmp/file.dat`` is removed when the run ends, whether
    the upload succeeded or raised, so a failed run does not leave it on
    disk.
    """
    if len(sys.argv) != 2:
        print(
            f"Usage: {os.path.basename(sys.argv[0])} BUCKET_NAME",
            file=sys.stderr
        )
        sys.exit(1)

    bucket_name = sys.argv[1]
    file_name = "file.dat"
    file_path = f"/tmp/{file_name}"

    try:
        generate_file(file_path, 8)
        print(f"Uploading {file_name} to S3 bucket {bucket_name}")
        upload_file_multipart(bucket_name, file_path, file_name)
    finally:
        # The file may not exist if generation itself failed; do not let
        # that hide the original error.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass


if __name__ == "__main__":
    main()