
Commit ae3f5cc

increase file limit to 50gb (#182)
1 parent 6dff8a3 commit ae3f5cc

5 files changed, +136 -36 lines changed

src/together/lib/constants.py

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@
 
 # Multipart upload constants
 MIN_PART_SIZE_MB = 5  # Minimum part size (S3 requirement)
-TARGET_PART_SIZE_MB = 100  # Target part size for optimal performance
+TARGET_PART_SIZE_MB = 250  # Target part size for optimal performance
 MAX_MULTIPART_PARTS = 250  # Maximum parts per upload (S3 limit)
 MULTIPART_UPLOAD_TIMEOUT = 300  # Timeout in seconds for uploading each part
 MULTIPART_THRESHOLD_GB = 5.0  # threshold for switching to multipart upload
@@ -32,7 +32,7 @@
 NUM_BYTES_IN_GB = 2**30
 
 # maximum number of GB sized files we support finetuning for
-MAX_FILE_SIZE_GB = 25.0
+MAX_FILE_SIZE_GB = 50.1
 
 # expected columns for Parquet files
 PARQUET_EXPECTED_COLUMNS = ["input_ids", "attention_mask", "labels"]
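For a rough sense of why the target part size grows along with the file cap, here is illustrative arithmetic only, assuming the number of parts is simply the file size divided by TARGET_PART_SIZE_MB (the SDK's actual sizing lives in _calculate_parts and may adjust the part size):

# Sanity-check arithmetic for the new limits; not the SDK's sizing code.
import math

NUM_BYTES_IN_GB = 2**30
TARGET_PART_SIZE_MB = 250
MAX_MULTIPART_PARTS = 250
MAX_FILE_SIZE_GB = 50.1

file_size = int(MAX_FILE_SIZE_GB * NUM_BYTES_IN_GB)
part_size = TARGET_PART_SIZE_MB * 1024 * 1024
num_parts = math.ceil(file_size / part_size)

print(num_parts)                         # ~206 parts at the new 250 MB target
print(num_parts <= MAX_MULTIPART_PARTS)  # True: stays under the 250-part S3 cap
# At the old 100 MB target, a 50.1 GB file would need ~514 parts and exceed the cap.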

src/together/lib/resources/files.py

Lines changed: 73 additions & 29 deletions
@@ -11,7 +11,7 @@
 from typing import IO, Any, Dict, List, Tuple, cast
 from pathlib import Path
 from functools import partial
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 
 import httpx
 from tqdm import tqdm
@@ -463,6 +463,22 @@ def _initiate_upload(
             body=response.text,
         )
 
+    def _submit_part(
+        self,
+        executor: ThreadPoolExecutor,
+        file_handle: IO[bytes],
+        part_info: Dict[str, Any],
+        part_size: int,
+    ) -> Tuple[Future[str], int]:
+        """Submit a single part for upload and return its future and part number."""
+
+        part_number = part_info.get("PartNumber", part_info.get("part_number", 1))
+        file_handle.seek((part_number - 1) * part_size)
+        part_data = file_handle.read(part_size)
+
+        future = executor.submit(self._upload_single_part, part_info, part_data)
+        return future, part_number
+
     def _upload_parts_concurrent(self, file: Path, upload_info: Dict[str, Any], part_size: int) -> List[Dict[str, Any]]:
         """Upload file parts concurrently with progress tracking"""
@@ -471,25 +487,32 @@ def _upload_parts_concurrent(self, file: Path, upload_info: Dict[str, Any], part_size: int) -> List[Dict[str, Any]]:
 
         with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
             with tqdm(total=len(parts), desc="Uploading parts", unit="part", disable=bool(DISABLE_TQDM)) as pbar:
-                future_to_part: Dict[Any, int] = {}
-
                 with open(file, "rb") as f:
-                    for part_info in parts:
-                        part_number = part_info.get("PartNumber", part_info.get("part_number", 1))
-                        f.seek((part_number - 1) * part_size)
-                        part_data = f.read(part_size)
+                    future_to_part: Dict[Future[str], int] = {}
+                    part_index = 0
 
-                        future = executor.submit(self._upload_single_part, part_info, part_data)
+                    while part_index < len(parts) and len(future_to_part) < self.max_concurrent_parts:
+                        part_info = parts[part_index]
+                        future, part_number = self._submit_part(executor, f, part_info, part_size)
                         future_to_part[future] = part_number
+                        part_index += 1
+
+                    while future_to_part:
+                        done_future = next(as_completed(future_to_part))
+                        part_number = future_to_part.pop(done_future)
 
-                    for future in as_completed(future_to_part):
-                        part_number = future_to_part[future]
-                        try:
-                            etag = future.result()
-                            completed_parts.append({"part_number": part_number, "etag": etag})
-                            pbar.update(1)
-                        except Exception as e:
-                            raise Exception(f"Failed to upload part {part_number}: {e}") from e
+                        try:
+                            etag = done_future.result()
+                            completed_parts.append({"part_number": part_number, "etag": etag})
+                            pbar.update(1)
+                        except Exception as e:
+                            raise Exception(f"Failed to upload part {part_number}: {e}") from e
+
+                        if part_index < len(parts):
+                            part_info = parts[part_index]
+                            future, next_part_number = self._submit_part(executor, f, part_info, part_size)
+                            future_to_part[future] = next_part_number
+                            part_index += 1
 
         completed_parts.sort(key=lambda x: x["part_number"])
        return completed_parts
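The refactored sync loop keeps at most max_concurrent_parts futures in flight and only reads a part from disk when it is submitted, so memory stays bounded for multi-gigabyte files instead of buffering every part up front. A minimal standalone sketch of the same bounded-window idea, where upload_part and MAX_IN_FLIGHT are hypothetical stand-ins rather than the SDK's API:

# Sketch: submit a window of tasks, then refill the window as each one finishes.
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Dict, List

MAX_IN_FLIGHT = 4  # hypothetical cap, analogous to max_concurrent_parts

def upload_part(part: int) -> str:
    """Stand-in for the real per-part upload; returns a fake etag."""
    return f"etag-{part}"

def upload_all(parts: List[int]) -> List[str]:
    etags: List[str] = []
    with ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
        in_flight: Dict[Future[str], int] = {}
        index = 0

        # Prime the window.
        while index < len(parts) and len(in_flight) < MAX_IN_FLIGHT:
            in_flight[executor.submit(upload_part, parts[index])] = parts[index]
            index += 1

        # Drain one finished future at a time, refilling the freed slot.
        while in_flight:
            done = next(as_completed(in_flight))
            in_flight.pop(done)
            etags.append(done.result())
            if index < len(parts):
                in_flight[executor.submit(upload_part, parts[index])] = parts[index]
                index += 1
    return etags

print(upload_all(list(range(10))))  # ten "etags", never more than 4 uploads at once

Each completed future frees a slot that is refilled immediately, so throughput stays close to the window size while no more than MAX_IN_FLIGHT parts are ever held in memory.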
@@ -834,25 +857,46 @@ async def _upload_parts_concurrent(
 
         with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
             with tqdm(total=len(parts), desc="Uploading parts", unit="part", disable=bool(DISABLE_TQDM)) as pbar:
-                # Submit all upload tasks
-                futures: List[Tuple[Any, int]] = []
                 with open(file, "rb") as f:
-                    for part_info in parts:
+                    future_to_part: Dict[asyncio.Future[str], int] = {}
+                    part_index = 0
+
+                    while part_index < len(parts) and len(future_to_part) < self.max_concurrent_parts:
+                        part_info = parts[part_index]
                         part_number = part_info.get("PartNumber", part_info.get("part_number", 1))
                         f.seek((part_number - 1) * part_size)
                         part_data = f.read(part_size)
 
                         future = loop.run_in_executor(executor, self._upload_single_part_sync, part_info, part_data)
-                        futures.append((future, part_number))
-
-                # Collect results
-                for future, part_number in futures:
-                    try:
-                        etag = await future
-                        completed_parts.append({"part_number": part_number, "etag": etag})
-                        pbar.update(1)
-                    except Exception as e:
-                        raise Exception(f"Failed to upload part {part_number}: {e}") from e
+                        future_to_part[future] = part_number
+                        part_index += 1
+
+                    while future_to_part:
+                        done, _ = await asyncio.wait(
+                            tuple(future_to_part.keys()),
+                            return_when=asyncio.FIRST_COMPLETED,
+                        )
+
+                        for done_future in done:
+                            part_number = future_to_part.pop(done_future)
+
+                            try:
+                                etag = await done_future
+                                completed_parts.append({"part_number": part_number, "etag": etag})
+                                pbar.update(1)
+                            except Exception as e:
+                                raise Exception(f"Failed to upload part {part_number}: {e}") from e
+
+                            if part_index < len(parts):
+                                part_info = parts[part_index]
+                                next_part_number = part_info.get("PartNumber", part_info.get("part_number", 1))
+                                f.seek((next_part_number - 1) * part_size)
+                                part_data = f.read(part_size)
+                                future = loop.run_in_executor(
+                                    executor, self._upload_single_part_sync, part_info, part_data
+                                )
+                                future_to_part[future] = next_part_number
+                                part_index += 1
 
         completed_parts.sort(key=lambda x: x["part_number"])
         return completed_parts
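The async variant gets the same windowing behavior from asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) over executor-backed futures: await whichever upload finishes first, then top the window back up. A small self-contained sketch of that pattern, where work and MAX_IN_FLIGHT are hypothetical stand-ins rather than the SDK's uploader:

# Sketch: asyncio.wait-based window over run_in_executor futures, refilled as tasks finish.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

MAX_IN_FLIGHT = 4  # hypothetical cap

def work(item: int) -> str:
    """Stand-in for the blocking per-part upload run in the executor."""
    return f"etag-{item}"

async def run_all(items: List[int]) -> List[str]:
    loop = asyncio.get_running_loop()
    results: List[str] = []
    with ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
        in_flight: Dict[asyncio.Future[str], int] = {}
        index = 0

        # Prime the window with at most MAX_IN_FLIGHT tasks.
        while index < len(items) and len(in_flight) < MAX_IN_FLIGHT:
            in_flight[loop.run_in_executor(executor, work, items[index])] = items[index]
            index += 1

        # Wait for any task to finish, collect it, and submit the next item.
        while in_flight:
            done, _ = await asyncio.wait(tuple(in_flight), return_when=asyncio.FIRST_COMPLETED)
            for fut in done:
                in_flight.pop(fut)
                results.append(await fut)
                if index < len(items):
                    in_flight[loop.run_in_executor(executor, work, items[index])] = items[index]
                    index += 1
    return results

print(asyncio.run(run_all(list(range(10)))))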

src/together/lib/utils/files.py

Lines changed: 14 additions & 4 deletions
@@ -7,9 +7,12 @@
 from pathlib import Path
 from traceback import format_exc
 
+from tqdm import tqdm
+
 from together.types import FilePurpose
 from together.lib.constants import (
     MIN_SAMPLES,
+    DISABLE_TQDM,
     NUM_BYTES_IN_GB,
     MAX_FILE_SIZE_GB,
     PARQUET_EXPECTED_COLUMNS,
@@ -356,8 +359,10 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
     """
     report_dict: Dict[str, Any] = {}
     try:
+        # Dry-run UTF-8 decode by iterating through the file to avoid loading it entirely into memory
         with file.open(encoding="utf-8") as f:
-            f.read()
+            for _ in f:
+                pass
         report_dict["utf8"] = True
     except UnicodeDecodeError as e:
         report_dict["utf8"] = False
@@ -453,7 +458,12 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
     with file.open() as f:
         idx = -1
         try:
-            for idx, line in enumerate(f):
+            for idx, line in tqdm(
+                enumerate(f),
+                desc="Validating file",
+                unit=" lines",
+                disable=bool(DISABLE_TQDM),
+            ):
                 json_line = json.loads(line)
 
                 if not isinstance(json_line, dict):
@@ -473,7 +483,7 @@
                 if all(column in json_line for column in JSONL_REQUIRED_COLUMNS_MAP[possible_format]):
                     if current_format is None:
                         current_format = possible_format
-                    elif current_format != possible_format:  # type: ignore[unreachable]
+                    elif current_format != possible_format:  # type: ignore[unreachable]
                         raise InvalidFileFormatError(
                             message="Found multiple dataset formats in the input file. "
                             f"Got {current_format} and {possible_format} on line {idx + 1}.",
@@ -522,7 +532,7 @@
 
         if dataset_format is None:
             dataset_format = current_format
-        elif current_format != dataset_format:  # type: ignore[unreachable]
+        elif current_format != dataset_format:  # type: ignore[unreachable]
             raise InvalidFileFormatError(
                 message="All samples in the dataset must have the same dataset format. "
                 f"Got {dataset_format} for the first line and {current_format} "
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+import math
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from together.lib.constants import NUM_BYTES_IN_GB, MAX_FILE_SIZE_GB, TARGET_PART_SIZE_MB
+from together.lib.types.error import FileTypeError
+from together.lib.resources.files import MultipartUploadManager, _calculate_parts
+
+
+def test_calculate_parts_medium_file():
+    """Ensure 500MB files are split into two 250MB parts."""
+
+    file_size = 500 * 1024 * 1024  # 500MB
+    part_size, num_parts = _calculate_parts(file_size)
+
+    expected_part_size = TARGET_PART_SIZE_MB * 1024 * 1024
+
+    assert num_parts == 2
+    assert part_size == expected_part_size
+
+
+def test_calculate_parts_large_file():
+    """Ensure 50GB files respect the 205-part cap with ~250MB chunks."""
+
+    file_size = 50 * 1024 * 1024 * 1024  # 50GB
+    part_size, num_parts = _calculate_parts(file_size)
+
+    expected_parts = math.ceil(file_size / (TARGET_PART_SIZE_MB * 1024 * 1024))  # 50GB / 250MB ~= 205
+
+    assert num_parts == expected_parts
+    assert part_size >= TARGET_PART_SIZE_MB * 1024 * 1024 - (1 * 1024 * 1024)
+
+
+@patch("together.lib.resources.files.os.stat")
+def test_file_size_exceeds_limit_raises_error(mock_stat: MagicMock):
+    """Uploading a file above 50.1GB should raise FileTypeError."""
+
+    mock_stat.return_value.st_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB)
+    manager = MultipartUploadManager(MagicMock())
+
+    with pytest.raises(FileTypeError) as exc_info:
+        manager.upload("/files", Path("too_large.jsonl"), "fine-tune")
+
+    assert "exceeds maximum supported size" in str(exc_info.value)

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.
