Skip to content

Commit 1201470

Browse files
authored
feat: Increase file size limit from 25GB to 50.1GB (#396)
1 parent 4f3647d commit 1201470

File tree

4 files changed

+66
-36
lines changed

4 files changed

+66
-36
lines changed

src/together/constants.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020

2121
# Multipart upload constants
2222
MIN_PART_SIZE_MB = 5 # Minimum part size (S3 requirement)
23-
TARGET_PART_SIZE_MB = 100 # Target part size for optimal performance
24-
MAX_MULTIPART_PARTS = 250 # Maximum parts per upload (S3 limit)
23+
TARGET_PART_SIZE_MB = 250 # Target part size
24+
MAX_MULTIPART_PARTS = 250 # Maximum parts per upload
2525
MULTIPART_UPLOAD_TIMEOUT = 300 # Timeout in seconds for uploading each part
2626
MULTIPART_THRESHOLD_GB = 5.0 # threshold for switching to multipart upload
2727

2828
# maximum number of GB sized files we support finetuning for
29-
MAX_FILE_SIZE_GB = 25.0
29+
MAX_FILE_SIZE_GB = 50.1
3030

3131

3232
# Messages

src/together/filemanager.py

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import stat
77
import tempfile
88
import uuid
9-
from concurrent.futures import ThreadPoolExecutor, as_completed
9+
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
1010
from functools import partial
1111
from pathlib import Path
12-
from typing import Any, Dict, List, Tuple
12+
from typing import Any, BinaryIO, Dict, List, Tuple
1313

1414
import requests
1515
from filelock import FileLock
@@ -212,6 +212,7 @@ def download(
212212
),
213213
remaining_retries=MAX_RETRIES,
214214
stream=True,
215+
request_timeout=3600,
215216
)
216217

217218
try:
@@ -512,6 +513,18 @@ def _initiate_upload(
512513

513514
return response.data
514515

516+
def _submit_part(
517+
self,
518+
executor: ThreadPoolExecutor,
519+
f: BinaryIO,
520+
part_info: Dict[str, Any],
521+
part_size: int,
522+
) -> Future[str]:
523+
"""Submit a single part for upload and return the future"""
524+
f.seek((part_info["PartNumber"] - 1) * part_size)
525+
part_data = f.read(part_size)
526+
return executor.submit(self._upload_single_part, part_info, part_data)
527+
515528
def _upload_parts_concurrent(
516529
self, file: Path, upload_info: Dict[str, Any], part_size: int
517530
) -> List[Dict[str, Any]]:
@@ -522,29 +535,39 @@ def _upload_parts_concurrent(
522535

523536
with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
524537
with tqdm(total=len(parts), desc="Uploading parts", unit="part") as pbar:
525-
future_to_part = {}
526-
527538
with open(file, "rb") as f:
528-
for part_info in parts:
529-
f.seek((part_info["PartNumber"] - 1) * part_size)
530-
part_data = f.read(part_size)
539+
future_to_part = {}
540+
part_index = 0
531541

532-
future = executor.submit(
533-
self._upload_single_part, part_info, part_data
534-
)
542+
# Submit initial batch limited by max_concurrent_parts
543+
for _ in range(min(self.max_concurrent_parts, len(parts))):
544+
part_info = parts[part_index]
545+
future = self._submit_part(executor, f, part_info, part_size)
535546
future_to_part[future] = part_info["PartNumber"]
536-
537-
# Collect results
538-
for future in as_completed(future_to_part):
539-
part_number = future_to_part[future]
540-
try:
541-
etag = future.result()
542-
completed_parts.append(
543-
{"part_number": part_number, "etag": etag}
544-
)
545-
pbar.update(1)
546-
except Exception as e:
547-
raise Exception(f"Failed to upload part {part_number}: {e}")
547+
part_index += 1
548+
549+
# Process completions and submit new parts (sliding window)
550+
while future_to_part:
551+
done_future = next(as_completed(future_to_part))
552+
part_number = future_to_part.pop(done_future)
553+
554+
try:
555+
etag = done_future.result()
556+
completed_parts.append(
557+
{"part_number": part_number, "etag": etag}
558+
)
559+
pbar.update(1)
560+
except Exception as e:
561+
raise Exception(f"Failed to upload part {part_number}: {e}")
562+
563+
# Submit next part if available
564+
if part_index < len(parts):
565+
part_info = parts[part_index]
566+
future = self._submit_part(
567+
executor, f, part_info, part_size
568+
)
569+
future_to_part[future] = part_info["PartNumber"]
570+
part_index += 1
548571

549572
completed_parts.sort(key=lambda x: x["part_number"])
550573
return completed_parts

src/together/utils/files.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from traceback import format_exc
88
from typing import Any, Dict, List
99

10+
from tqdm import tqdm
1011

1112
from together.constants import (
1213
MAX_FILE_SIZE_GB,
@@ -363,14 +364,19 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
363364
Dict[str, Any]: A dictionary with the results of the check.
364365
"""
365366
report_dict: Dict[str, Any] = {}
367+
366368
try:
369+
# Dry-run UTF-8 decode: iterate through file to validate encoding
367370
with file.open(encoding="utf-8") as f:
368-
f.read()
371+
for _ in f:
372+
pass
373+
369374
report_dict["utf8"] = True
370375
except UnicodeDecodeError as e:
371376
report_dict["utf8"] = False
372377
report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
373378
report_dict["is_check_passed"] = False
379+
374380
return report_dict
375381

376382

@@ -470,7 +476,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
470476
with file.open() as f:
471477
idx = -1
472478
try:
473-
for idx, line in enumerate(f):
479+
for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
474480
json_line = json.loads(line)
475481

476482
if not isinstance(json_line, dict):

tests/unit/test_multipart_upload_manager.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ def test_calculate_parts_medium_file(self, manager):
7474
file_size = 500 * 1024 * 1024 # 500MB
7575
part_size, num_parts = manager._calculate_parts(file_size)
7676

77-
expected_parts = 5 # 500MB / 100MB = 5 parts
78-
expected_part_size = 100 * 1024 * 1024 # 100MB
77+
expected_parts = 2 # 500MB / 250MB = 2 parts
78+
expected_part_size = 250 * 1024 * 1024 # 250MB
7979

8080
assert num_parts == expected_parts
8181
assert part_size == expected_part_size
@@ -85,10 +85,11 @@ def test_calculate_parts_large_file(self, manager):
8585
file_size = 50 * 1024 * 1024 * 1024 # 50GB
8686
part_size, num_parts = manager._calculate_parts(file_size)
8787

88-
# Should use maximum parts and scale part size
89-
assert num_parts == MAX_MULTIPART_PARTS # 250
90-
expected_part_size = file_size // MAX_MULTIPART_PARTS
91-
assert part_size >= expected_part_size
88+
# With 250MB target part size, 50GB should use ~205 parts
89+
expected_parts = 205 # 50GB / 250MB ≈ 205 parts
90+
assert num_parts == expected_parts
91+
# Part size should be close to target (within rounding)
92+
assert part_size >= 249 * 1024 * 1024 # At least 249MB (allowing for rounding)
9293

9394
def test_calculate_parts_respects_minimum_part_size(self, manager):
9495
"""Test that minimum part size is respected"""
@@ -275,7 +276,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
275276
# Mock as_completed
276277
with patch(
277278
"together.filemanager.as_completed",
278-
return_value=[mock_future1, mock_future2],
279+
side_effect=[iter([mock_future1]), iter([mock_future2])],
279280
):
280281
upload_info = {
281282
"parts": [
@@ -303,15 +304,15 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
303304
def test_file_size_exceeds_limit_raises_error(self, mock_stat, manager):
304305
"""Test that files exceeding size limit raise FileTypeError with clear message"""
305306
# Setup - file size over limit
306-
file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB) # 26GB
307+
file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB) # 51.1GB
307308
mock_stat.return_value.st_size = file_size
308309

309310
# Should raise FileTypeError with descriptive message
310311
with pytest.raises(FileTypeError) as exc_info:
311312
manager.upload("test-url", Path("test.jsonl"), FilePurpose.FineTune)
312313

313314
error_message = str(exc_info.value)
314-
assert "26.0GB exceeds maximum supported size of 25.0GB" in error_message
315+
assert "51.1GB exceeds maximum supported size of 50.1GB" in error_message
315316

316317
@patch.object(MultipartUploadManager, "_initiate_upload")
317318
@patch.object(MultipartUploadManager, "_upload_parts_concurrent")

0 commit comments

Comments
 (0)