6 changes: 3 additions & 3 deletions src/together/constants.py
@@ -20,13 +20,13 @@

# Multipart upload constants
MIN_PART_SIZE_MB = 5 # Minimum part size (S3 requirement)
TARGET_PART_SIZE_MB = 100 # Target part size for optimal performance
MAX_MULTIPART_PARTS = 250 # Maximum parts per upload (S3 limit)
TARGET_PART_SIZE_MB = 250 # Target part size
MAX_MULTIPART_PARTS = 250 # Maximum parts per upload
MULTIPART_UPLOAD_TIMEOUT = 300 # Timeout in seconds for uploading each part
MULTIPART_THRESHOLD_GB = 5.0 # threshold for switching to multipart upload

# maximum number of GB sized files we support finetuning for
MAX_FILE_SIZE_GB = 25.0
MAX_FILE_SIZE_GB = 50.1


# Messages
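For context, here is a minimal sketch of the part-sizing these constants imply, assuming (as the updated tests further down suggest) that the 250 MB target is used until the part count would exceed the 250-part cap, with the 5 MB S3 minimum always enforced; the SDK's actual `_calculate_parts` may differ in details:

```python
import math

MIN_PART_SIZE_MB = 5
TARGET_PART_SIZE_MB = 250
MAX_MULTIPART_PARTS = 250


def calculate_parts(file_size: int) -> tuple:
    """Sketch: choose (part_size_bytes, num_parts) for a multipart upload."""
    target = TARGET_PART_SIZE_MB * 1024 * 1024
    minimum = MIN_PART_SIZE_MB * 1024 * 1024

    part_size = target
    num_parts = math.ceil(file_size / part_size)

    # If the target size would need more than the allowed number of parts,
    # grow the part size so the count stays within MAX_MULTIPART_PARTS.
    if num_parts > MAX_MULTIPART_PARTS:
        part_size = math.ceil(file_size / MAX_MULTIPART_PARTS)
        num_parts = math.ceil(file_size / part_size)

    return max(part_size, minimum), num_parts


# A 50 GB file: ceil(50 * 1024 / 250) = 205 parts of 250 MB each,
# comfortably under the 250-part cap.
print(calculate_parts(50 * 1024 * 1024 * 1024))  # -> (262144000, 205)
```

With the old 100 MB target, a 50 GB file would have needed more than 500 parts, forcing the part size to scale up anyway; the larger target keeps the math simple and the part count low, which is what the test updates below reflect.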
67 changes: 45 additions & 22 deletions src/together/filemanager.py
@@ -6,10 +6,10 @@
import stat
import tempfile
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Tuple
from typing import Any, BinaryIO, Dict, List, Tuple

import requests
from filelock import FileLock
@@ -212,6 +212,7 @@ def download(
),
remaining_retries=MAX_RETRIES,
stream=True,
request_timeout=3600,
)

try:
@@ -512,6 +513,18 @@ def _initiate_upload(

return response.data

def _submit_part(
self,
executor: ThreadPoolExecutor,
f: BinaryIO,
part_info: Dict[str, Any],
part_size: int,
) -> Future[str]:
"""Submit a single part for upload and return the future"""
f.seek((part_info["PartNumber"] - 1) * part_size)
part_data = f.read(part_size)
return executor.submit(self._upload_single_part, part_info, part_data)

def _upload_parts_concurrent(
self, file: Path, upload_info: Dict[str, Any], part_size: int
) -> List[Dict[str, Any]]:
@@ -522,29 +535,39 @@ def _upload_parts_concurrent(

with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
with tqdm(total=len(parts), desc="Uploading parts", unit="part") as pbar:
future_to_part = {}

with open(file, "rb") as f:
for part_info in parts:
f.seek((part_info["PartNumber"] - 1) * part_size)
part_data = f.read(part_size)
future_to_part = {}
part_index = 0

future = executor.submit(
self._upload_single_part, part_info, part_data
)
# Submit initial batch limited by max_concurrent_parts
for _ in range(min(self.max_concurrent_parts, len(parts))):
part_info = parts[part_index]
future = self._submit_part(executor, f, part_info, part_size)
future_to_part[future] = part_info["PartNumber"]

# Collect results
for future in as_completed(future_to_part):
part_number = future_to_part[future]
try:
etag = future.result()
completed_parts.append(
{"part_number": part_number, "etag": etag}
)
pbar.update(1)
except Exception as e:
raise Exception(f"Failed to upload part {part_number}: {e}")
part_index += 1

# Process completions and submit new parts (sliding window)
while future_to_part:
done_future = next(as_completed(future_to_part))
part_number = future_to_part.pop(done_future)

try:
etag = done_future.result()
completed_parts.append(
{"part_number": part_number, "etag": etag}
)
pbar.update(1)
except Exception as e:
raise Exception(f"Failed to upload part {part_number}: {e}")

# Submit next part if available
if part_index < len(parts):
part_info = parts[part_index]

Review comment: Would be great to rewrite this to deduplicate this code with the piece above. You could either use a for loop that submits tasks and waits on a result once enough are already in flight, or use executor.map with buffersize to limit the number of concurrent tasks.

future = self._submit_part(
executor, f, part_info, part_size
)
future_to_part[future] = part_info["PartNumber"]
part_index += 1

completed_parts.sort(key=lambda x: x["part_number"])
return completed_parts
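Following up on the review comment above, one way to collapse the two submission paths into a single loop is sketched below. This is a hedged illustration, not the SDK's actual code: `upload_single_part` stands in for the manager's `_upload_single_part`, and the tqdm progress bar is omitted for brevity. The idea is the same sliding window — at most `max_concurrent_parts` futures in flight — but with one submission loop instead of an initial batch plus a refill branch.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List


def upload_parts_bounded(
    file: Path,
    parts: List[Dict[str, Any]],
    part_size: int,
    max_concurrent_parts: int,
    upload_single_part: Callable[[Dict[str, Any], bytes], str],  # stand-in
) -> List[Dict[str, Any]]:
    """Sliding-window multipart upload with a single submission loop (sketch)."""
    completed_parts: List[Dict[str, Any]] = []
    future_to_part: Dict[Future, int] = {}

    def drain_one() -> None:
        # Wait for any in-flight part to finish and record its ETag.
        done = next(as_completed(future_to_part))
        part_number = future_to_part.pop(done)
        try:
            etag = done.result()
        except Exception as e:
            raise Exception(f"Failed to upload part {part_number}: {e}")
        completed_parts.append({"part_number": part_number, "etag": etag})

    with ThreadPoolExecutor(max_workers=max_concurrent_parts) as executor:
        with open(file, "rb") as f:
            for part_info in parts:
                # One loop for all parts: free a slot first if at capacity.
                if len(future_to_part) >= max_concurrent_parts:
                    drain_one()
                f.seek((part_info["PartNumber"] - 1) * part_size)
                part_data = f.read(part_size)
                future = executor.submit(upload_single_part, part_info, part_data)
                future_to_part[future] = part_info["PartNumber"]
            # Drain whatever is still in flight.
            while future_to_part:
                drain_one()

    completed_parts.sort(key=lambda x: x["part_number"])
    return completed_parts
```

The reviewer's alternative, `executor.map(..., buffersize=...)`, would be even shorter, but the `buffersize` argument only exists on newer Python versions, so the explicit window above is the more portable option.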
10 changes: 8 additions & 2 deletions src/together/utils/files.py
@@ -7,6 +7,7 @@
from traceback import format_exc
from typing import Any, Dict, List

from tqdm import tqdm

from together.constants import (
MAX_FILE_SIZE_GB,
@@ -363,14 +364,19 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
Dict[str, Any]: A dictionary with the results of the check.
"""
report_dict: Dict[str, Any] = {}

try:
# Dry-run UTF-8 decode: iterate through file to validate encoding
with file.open(encoding="utf-8") as f:
f.read()
for _ in f:
pass

report_dict["utf8"] = True
except UnicodeDecodeError as e:
report_dict["utf8"] = False
report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
report_dict["is_check_passed"] = False

return report_dict


@@ -470,7 +476,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
with file.open() as f:
idx = -1
try:
for idx, line in enumerate(f):
for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
json_line = json.loads(line)

if not isinstance(json_line, dict):
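Both validation changes above share one theme: iterate over the file instead of calling `f.read()`, so memory use stays flat now that uploads can approach 50 GB, and show progress while doing it. A minimal hedged sketch of that pattern (the real `_check_utf8` / `_check_jsonl` helpers do more, e.g. purpose-specific schema checks):

```python
import json
from pathlib import Path

from tqdm import tqdm


def validate_jsonl_streaming(file: Path) -> dict:
    """Sketch: line-by-line UTF-8 and JSON validation without loading the whole file."""
    report: dict = {"utf8": True, "is_check_passed": True}
    line_number = 0
    try:
        # Iterating line by line keeps memory flat even for very large files.
        with file.open(encoding="utf-8") as f:
            for line in tqdm(f, desc="Validating file", unit=" lines"):
                line_number += 1
                if line.strip():
                    json.loads(line)
    except UnicodeDecodeError as e:
        report["utf8"] = False
        report["is_check_passed"] = False
        report["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
    except json.JSONDecodeError as e:
        report["is_check_passed"] = False
        report["message"] = f"Line {line_number} is not valid JSON: {e}."
    return report
```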
19 changes: 10 additions & 9 deletions tests/unit/test_multipart_upload_manager.py
@@ -74,8 +74,8 @@ def test_calculate_parts_medium_file(self, manager):
file_size = 500 * 1024 * 1024 # 500MB
part_size, num_parts = manager._calculate_parts(file_size)

expected_parts = 5 # 500MB / 100MB = 5 parts
expected_part_size = 100 * 1024 * 1024 # 100MB
expected_parts = 2 # 500MB / 250MB = 2 parts
expected_part_size = 250 * 1024 * 1024 # 250MB

assert num_parts == expected_parts
assert part_size == expected_part_size
@@ -85,10 +85,11 @@ def test_calculate_parts_large_file(self, manager):
file_size = 50 * 1024 * 1024 * 1024 # 50GB
part_size, num_parts = manager._calculate_parts(file_size)

# Should use maximum parts and scale part size
assert num_parts == MAX_MULTIPART_PARTS # 250
expected_part_size = file_size // MAX_MULTIPART_PARTS
assert part_size >= expected_part_size
# With 250MB target part size, 50GB should use ~205 parts
expected_parts = 205 # 50GB / 250MB ≈ 205 parts
assert num_parts == expected_parts
# Part size should be close to target (within rounding)
assert part_size >= 249 * 1024 * 1024 # At least 249MB (allowing for rounding)

def test_calculate_parts_respects_minimum_part_size(self, manager):
"""Test that minimum part size is respected"""
@@ -275,7 +276,7 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
# Mock as_completed
with patch(
"together.filemanager.as_completed",
return_value=[mock_future1, mock_future2],
side_effect=[iter([mock_future1]), iter([mock_future2])],
):
upload_info = {
"parts": [
@@ -303,15 +304,15 @@ def test_upload_parts_concurrent(self, mock_open, mock_executor_class, manager):
def test_file_size_exceeds_limit_raises_error(self, mock_stat, manager):
"""Test that files exceeding size limit raise FileTypeError with clear message"""
# Setup - file size over limit
file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB) # 26GB
file_size = int((MAX_FILE_SIZE_GB + 1) * NUM_BYTES_IN_GB) # 51.1GB
mock_stat.return_value.st_size = file_size

# Should raise FileTypeError with descriptive message
with pytest.raises(FileTypeError) as exc_info:
manager.upload("test-url", Path("test.jsonl"), FilePurpose.FineTune)

error_message = str(exc_info.value)
assert "26.0GB exceeds maximum supported size of 25.0GB" in error_message
assert "51.1GB exceeds maximum supported size of 50.1GB" in error_message

@patch.object(MultipartUploadManager, "_initiate_upload")
@patch.object(MultipartUploadManager, "_upload_parts_concurrent")