diff --git a/decontamination/index.py b/decontamination/index.py index fbdb475ba1..44b5336894 100644 --- a/decontamination/index.py +++ b/decontamination/index.py @@ -6,6 +6,8 @@ from elasticsearch import Elasticsearch, helpers from tqdm import tqdm +import open_instruct.utils as open_instruct_utils + def create_text_index(es, index_name): mappings = { @@ -45,7 +47,7 @@ def create_vector_index(es, index_name): def read_dataset(dataset_name, split, messages_field, query_filter, query_field): - dataset = load_dataset(dataset_name, split=split) + dataset = load_dataset(dataset_name, split=split, num_proc=open_instruct_utils.max_num_processes()) data_to_index = [] query_filter_key, query_filter_value = query_filter.split(":") diff --git a/decontamination/search.py b/decontamination/search.py index ef491e3636..bab4a4f6bc 100644 --- a/decontamination/search.py +++ b/decontamination/search.py @@ -11,6 +11,8 @@ from tqdm import tqdm from transformers import AutoModel, AutoTokenizer +import open_instruct.utils as open_instruct_utils + SPACY_MODEL = spacy.load("en_core_web_lg") @@ -286,14 +288,14 @@ def main(): for dataset, subset, split, fields, limit in eval_sets: print(f"Querying {index_name} for {dataset}.") try: - query_dataset = list(load_dataset(dataset, subset, split=split))[:limit] + query_dataset = list(load_dataset(dataset, subset, split=split, num_proc=open_instruct_utils.max_num_processes()))[:limit] except ValueError: query_dataset = [] if args.subset is None: # Dataset has multiple subsets. We want to concatenate all of them. from datasets import get_dataset_config_names for subset in get_dataset_config_names(dataset): - query_dataset.extend(list(load_dataset(dataset, subset, split=split))[:limit]) + query_dataset.extend(list(load_dataset(dataset, subset, split=split, num_proc=open_instruct_utils.max_num_processes()))[:limit]) else: raise @@ -337,7 +339,7 @@ def main(): for dataset_name, contaminated_ids in zip(dataset_names, all_index_contaminated_ids): print(f"Decontaminating {dataset_name}") # Assuming dataset has no subsets and we want the train split. - train_dataset = load_dataset(dataset_name, split="train") + train_dataset = load_dataset(dataset_name, split="train", num_proc=open_instruct_utils.max_num_processes()) decontaminated_dataset = [] num_kept = 0 num_total = 0 diff --git a/open_instruct/code_utils/test_code_utils.py b/open_instruct/code_utils/test_code_utils.py index 4a094d5264..a257357ed6 100644 --- a/open_instruct/code_utils/test_code_utils.py +++ b/open_instruct/code_utils/test_code_utils.py @@ -7,6 +7,7 @@ import datasets import parameterized +import open_instruct.utils as open_instruct_utils from open_instruct.code_utils import code_utils SIMPLE_PROGRAM = "a = 1" @@ -45,7 +46,9 @@ def test_all_fail_or_timeout(self): def test_tiger_lab_acecode_sample(self): """Tests the script against an actual AceCode record.""" - ds = datasets.load_dataset("TIGER-Lab/AceCode-87K", split="train") + ds = datasets.load_dataset( + "TIGER-Lab/AceCode-87K", split="train", num_proc=open_instruct_utils.max_num_processes() + ) # Choose the same sample index used in the original snippet. 
i = 1 diff --git a/open_instruct/dataset_transformation.py b/open_instruct/dataset_transformation.py index f88da72109..db51b227c2 100644 --- a/open_instruct/dataset_transformation.py +++ b/open_instruct/dataset_transformation.py @@ -66,7 +66,7 @@ ) from transformers.utils.hub import _CACHED_NO_EXIST, TRANSFORMERS_CACHE, extract_commit_hash, try_to_load_from_cache -from open_instruct.utils import hf_whoami +from open_instruct.utils import hf_whoami, max_num_processes # ---------------------------------------------------------------------------- @@ -1379,16 +1379,25 @@ def __post_init__(self): # if the file exists locally, use the local file if os.path.exists(self.dataset_name) and self.dataset_name.endswith(".jsonl"): assert self.dataset_split == "train", "Only train split is supported for local jsonl files." - self.dataset = load_dataset("json", data_files=self.dataset_name, split=self.dataset_split) + self.dataset = load_dataset( + "json", data_files=self.dataset_name, split=self.dataset_split, num_proc=max_num_processes() + ) elif os.path.exists(self.dataset_name) and self.dataset_name.endswith(".parquet"): assert self.dataset_split == "train", "Only train split is supported for local parquet files." - self.dataset = load_dataset("parquet", data_files=self.dataset_name, split=self.dataset_split) + self.dataset = load_dataset( + "parquet", data_files=self.dataset_name, split=self.dataset_split, num_proc=max_num_processes() + ) else: # commit hash only works for hf datasets self.dataset_commit_hash = get_commit_hash( self.dataset_name, self.dataset_revision, "README.md", "dataset" ) - self.dataset = load_dataset(self.dataset_name, split=self.dataset_split, revision=self.dataset_revision) + self.dataset = load_dataset( + self.dataset_name, + split=self.dataset_split, + revision=self.dataset_revision, + num_proc=max_num_processes(), + ) if self.dataset_range is None: dataset_range = len(self.dataset) self.update_range(dataset_range) @@ -1512,7 +1521,12 @@ def load_or_transform_dataset( print("dataset_skip_cache is True, so we will not load the dataset from cache") else: # Use the split from the first dataset config as default - return load_dataset(repo_name, split=DEFAULT_SPLIT_FOR_CACHED_DATASET, revision=self.config_hash) + return load_dataset( + repo_name, + split=DEFAULT_SPLIT_FOR_CACHED_DATASET, + revision=self.config_hash, + num_proc=max_num_processes(), + ) print(f"Cache not found, transforming datasets...") @@ -1565,7 +1579,9 @@ def load_or_transform_dataset( # NOTE: Load the dataset again to make sure it's downloaded to the HF cache print(f"✅ Found cached dataset at https://huggingface.co/datasets/{repo_name}/tree/{self.config_hash}") - return load_dataset(repo_name, split=DEFAULT_SPLIT_FOR_CACHED_DATASET, revision=self.config_hash) + return load_dataset( + repo_name, split=DEFAULT_SPLIT_FOR_CACHED_DATASET, revision=self.config_hash, num_proc=max_num_processes() + ) class LocalDatasetTransformationCache: @@ -1931,7 +1947,9 @@ def test_get_cached_dataset_tulu_sft(): dataset_skip_cache=True, ) - gold_tokenized_dataset = load_dataset("allenai/dataset-mix-cached", split="train", revision="61ac38e052") + gold_tokenized_dataset = load_dataset( + "allenai/dataset-mix-cached", split="train", revision="61ac38e052", num_proc=max_num_processes() + ) assert len(dataset) == len(gold_tokenized_dataset) for i in range(len(dataset)): assert dataset[i]["input_ids"] == gold_tokenized_dataset[i]["input_ids"] @@ -1959,7 +1977,9 @@ def test_get_cached_dataset_tulu_preference(): 
TOKENIZED_PREFERENCE_DATASET_KEYS, dataset_skip_cache=True, ) - gold_tokenized_dataset = load_dataset("allenai/dataset-mix-cached", split="train", revision="9415479293") + gold_tokenized_dataset = load_dataset( + "allenai/dataset-mix-cached", split="train", revision="9415479293", num_proc=max_num_processes() + ) assert len(dataset) == len(gold_tokenized_dataset) for i in range(len(dataset)): assert dataset[i]["chosen_input_ids"] == gold_tokenized_dataset[i]["chosen_input_ids"] @@ -1987,7 +2007,9 @@ def test_get_cached_dataset_tulu_rlvr(): transform_fn_args, dataset_skip_cache=True, ) - gold_tokenized_dataset = load_dataset("allenai/dataset-mix-cached", split="train", revision="0ff0043e56") + gold_tokenized_dataset = load_dataset( + "allenai/dataset-mix-cached", split="train", revision="0ff0043e56", num_proc=max_num_processes() + ) assert len(dataset) == len(gold_tokenized_dataset) for i in range(len(dataset)): assert dataset[i][INPUT_IDS_PROMPT_KEY] == gold_tokenized_dataset[i][INPUT_IDS_PROMPT_KEY] diff --git a/open_instruct/utils.py b/open_instruct/utils.py index 42db119110..a43fdb3ae2 100644 --- a/open_instruct/utils.py +++ b/open_instruct/utils.py @@ -75,6 +75,14 @@ DataClassType = NewType("DataClassType", Any) +def max_num_processes() -> int: + """Returns a reasonable default number of processes to run for multiprocessing.""" + if hasattr(os, "sched_getaffinity"): + return len(os.sched_getaffinity(0)) + else: + return os.cpu_count() or 1 + + def repeat_each(seq, k): """Repeat each element in a sequence k times.""" return [item for item in seq for _ in range(k)] @@ -315,13 +323,13 @@ def get_datasets( for split in splits: # if dataset ends with .json or .jsonl, load from file if ds.endswith(".json") or ds.endswith(".jsonl"): - dataset = load_dataset("json", data_files=ds, split=split) + dataset = load_dataset("json", data_files=ds, split=split, num_proc=max_num_processes()) elif ds.endswith(".parquet"): - dataset = load_dataset("parquet", data_files=ds, split=split) + dataset = load_dataset("parquet", data_files=ds, split=split, num_proc=max_num_processes()) else: try: # Try first if dataset on a Hub repo - dataset = load_dataset(ds, ds_config, split=split) + dataset = load_dataset(ds, ds_config, split=split, num_proc=max_num_processes()) except DatasetGenerationError: # If not, check local dataset dataset = load_from_disk(os.path.join(ds, split)) @@ -529,11 +537,11 @@ def combine_dataset( for (ds, frac_or_samples), ds_config, split in zip(dataset_mixer.items(), configs, splits): # if dataset ends with .json or .jsonl, load from file if ds.endswith(".json") or ds.endswith(".jsonl"): - dataset = load_dataset("json", data_files=ds, split=split) + dataset = load_dataset("json", data_files=ds, split=split, num_proc=max_num_processes()) else: try: # Try first if dataset on a Hub repo - dataset = load_dataset(ds, ds_config, split=split) + dataset = load_dataset(ds, ds_config, split=split, num_proc=max_num_processes()) except DatasetGenerationError: # If not, check local dataset dataset = load_from_disk(os.path.join(ds, split)) diff --git a/quantize/quantize_autogptq_wikitext.py b/quantize/quantize_autogptq_wikitext.py index b552fb5f8d..12cc1911e0 100644 --- a/quantize/quantize_autogptq_wikitext.py +++ b/quantize/quantize_autogptq_wikitext.py @@ -18,10 +18,12 @@ from datasets import load_dataset from transformers import AutoTokenizer +import open_instruct.utils as open_instruct_utils + def get_wikitext2(nsamples, seed, seqlen, model): - traindata = load_dataset("wikitext", 
"wikitext-2-raw-v1", split="train") - testdata = load_dataset("wikitext", "wikitext-2-raw-v1", split="test") + traindata = load_dataset("wikitext", "wikitext-2-raw-v1", split="train", num_proc=open_instruct_utils.max_num_processes()) + testdata = load_dataset("wikitext", "wikitext-2-raw-v1", split="test", num_proc=open_instruct_utils.max_num_processes()) tokenizer = AutoTokenizer.from_pretrained(model, use_fast=False) trainenc = tokenizer("\n\n".join(traindata["text"]), return_tensors="pt") diff --git a/scripts/create_ground_truth_data.py b/scripts/create_ground_truth_data.py index 48422e7bf0..bb0cd71df8 100644 --- a/scripts/create_ground_truth_data.py +++ b/scripts/create_ground_truth_data.py @@ -4,6 +4,7 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from tqdm import tqdm from open_instruct.math_utils import last_boxed_only_string, remove_boxed @@ -86,7 +87,7 @@ for sample in GSM8K_EXEMPLARS: gsm8k_prompt += f"Question: {sample['question'].strip()}\nAnswer:{sample['cot_answer'].strip()}\n\n" -gsm8k_dataset = load_dataset("gsm8k", "main", split="train") +gsm8k_dataset = load_dataset("gsm8k", "main", split="train", num_proc=open_instruct_utils.max_num_processes()) new_data = [] for sample in gsm8k_dataset: answer = sample["answer"].split("####")[-1].strip() @@ -97,7 +98,7 @@ }) # also make a test split for eval -gsm8k_dataset = load_dataset("gsm8k", "main", split="test") +gsm8k_dataset = load_dataset("gsm8k", "main", split="test", num_proc=open_instruct_utils.max_num_processes()) test_data = [] for sample in gsm8k_dataset: answer = sample["answer"].split("####")[-1].strip() @@ -111,7 +112,7 @@ math_prompt = "" for sample in MATH_EXAMPLARS: math_prompt += f"Question: {sample['question'].strip()}\nAnswer:{sample['cot_answer'].strip()}\n\n" -math_dataset = load_dataset("lighteval/MATH", "all", split="train") +math_dataset = load_dataset("lighteval/MATH", "all", split="train", num_proc=open_instruct_utils.max_num_processes()) for sample in math_dataset: # same code used to extract answer for eval answer = remove_boxed(last_boxed_only_string(sample["solution"])) @@ -132,7 +133,7 @@ # dataset.push_to_hub("ai2-adapt-dev/gsm8k_math_ground_truth") # # alternate dataset: metamathqa! -# metamathqa_dataset = load_dataset("meta-math/MetaMathQA", "main", split="train") +# metamathqa_dataset = load_dataset("meta-math/MetaMathQA", "main", split="train", num_proc=open_instruct_utils.max_num_processes()) # # let's re-use the MATH prompt. # new_data = [] # def extract_answer(text): @@ -158,7 +159,7 @@ # dataset.push_to_hub("ai2-adapt-dev/metamathqa_ground_truth") # alternate dataset: numina-tir -metamathqa_dataset = load_dataset("AI-MO/NuminaMath-TIR", split="train") +metamathqa_dataset = load_dataset("AI-MO/NuminaMath-TIR", split="train", num_proc=open_instruct_utils.max_num_processes()) # let's re-use the MATH prompt. new_data = [] def find_last_outermost_boxed(string): @@ -209,7 +210,7 @@ def find_last_outermost_boxed(string): dataset.push_to_hub("ai2-adapt-dev/numinamath_tir_ground_truth_one_turn") # alternate dataset: numina-cot (much, much larger) -metamathqa_dataset = load_dataset("AI-MO/NuminaMath-CoT", split="train") +metamathqa_dataset = load_dataset("AI-MO/NuminaMath-CoT", split="train", num_proc=open_instruct_utils.max_num_processes()) # let's re-use the MATH prompt. 
new_data = [] for sample in tqdm(metamathqa_dataset): diff --git a/scripts/data/azure_batch/process_azure_batch_results.py b/scripts/data/azure_batch/process_azure_batch_results.py index beefdf68b3..a21c1ecdb2 100644 --- a/scripts/data/azure_batch/process_azure_batch_results.py +++ b/scripts/data/azure_batch/process_azure_batch_results.py @@ -277,7 +277,7 @@ def process_batch_results( max_rows: Optional[int] = None, ): # Load the original dataset first so we can look up failed prompts - original_ds = datasets.load_dataset(input_dataset, split=split) + original_ds = datasets.load_dataset(input_dataset, split=split, num_proc=max_num_processes()) id_lookup = {row["id"]: row for row in original_ds} all_batch_results = {} diff --git a/scripts/data/azure_batch/regenerate_dataset_completions.py b/scripts/data/azure_batch/regenerate_dataset_completions.py index 12df0eb691..a0fe82438a 100755 --- a/scripts/data/azure_batch/regenerate_dataset_completions.py +++ b/scripts/data/azure_batch/regenerate_dataset_completions.py @@ -188,7 +188,7 @@ def main(sample_limit: int | None = None, os.makedirs(f"{current_dir}/batch_files", exist_ok=True) print(f"Loading dataset {input_dataset_name} with split {split}") - input_dataset = datasets.load_dataset(input_dataset_name, split=split) + input_dataset = datasets.load_dataset(input_dataset_name, split=split, num_proc=max_num_processes()) # First get all unique IDs print(f'Processing dataset with {len(input_dataset)} rows') diff --git a/scripts/data/build_hardcoded.py b/scripts/data/build_hardcoded.py index 700df1ef12..3cbfc51f1c 100644 --- a/scripts/data/build_hardcoded.py +++ b/scripts/data/build_hardcoded.py @@ -3,6 +3,7 @@ from functools import partial from datasets import DatasetDict, load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import HfApi from open_instruct import logger_utils @@ -281,7 +282,7 @@ def main(): # --- Load Source Dataset --- try: logger.info(f"Loading source dataset '{args.source_repo}'...") - original_dataset = load_dataset(args.source_repo) + original_dataset = load_dataset(args.source_repo, num_proc=open_instruct_utils.max_num_processes()) logger.info(f"Dataset loaded successfully. 
Splits: {list(original_dataset.keys())}") except Exception as e: logger.error(f"Failed to load source dataset '{args.source_repo}': {e}") diff --git a/scripts/data/convert_general_thought_to_tulu_thinker.py b/scripts/data/convert_general_thought_to_tulu_thinker.py index 85f1b7e1f0..b5d5478615 100644 --- a/scripts/data/convert_general_thought_to_tulu_thinker.py +++ b/scripts/data/convert_general_thought_to_tulu_thinker.py @@ -9,10 +9,11 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils random_gen = random.Random(42) -ds = load_dataset("natolambert/GeneralThought-430K-filtered", split="train") +ds = load_dataset("natolambert/GeneralThought-430K-filtered", split="train", num_proc=open_instruct_utils.max_num_processes()) new_data = [] for sample in ds: diff --git a/scripts/data/create_deepscaler_data.py b/scripts/data/create_deepscaler_data.py index bcc8e6bda0..f88141e9a6 100644 --- a/scripts/data/create_deepscaler_data.py +++ b/scripts/data/create_deepscaler_data.py @@ -1,11 +1,12 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from tqdm import tqdm random_gen = random.Random(42) -dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train") +dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train", num_proc=open_instruct_utils.max_num_processes()) # reqular dataset new_data = [] @@ -19,7 +20,7 @@ dataset = Dataset.from_list(new_data) dataset.push_to_hub("ai2-adapt-dev/deepscaler-gt") -dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train") +dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train", num_proc=open_instruct_utils.max_num_processes()) # 4k length only new_data = [] for sample in tqdm(dataset): @@ -33,7 +34,7 @@ dataset = Dataset.from_list(new_data) dataset.push_to_hub("ai2-adapt-dev/deepscaler_gt_random_max_length_only") -dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train") +dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train", num_proc=open_instruct_utils.max_num_processes()) # 4k length and solution new_data = [] for sample in tqdm(dataset): @@ -47,7 +48,7 @@ dataset = Dataset.from_list(new_data) dataset.push_to_hub("ai2-adapt-dev/deepscaler_gt_with_random_max_length") -dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train") +dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train", num_proc=open_instruct_utils.max_num_processes()) # 8k length only new_data = [] for sample in tqdm(dataset): @@ -61,7 +62,7 @@ dataset = Dataset.from_list(new_data) dataset.push_to_hub("ai2-adapt-dev/deepscaler_gt_random_max_length_only_8192") -dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train") +dataset = load_dataset("agentica-org/DeepScaleR-Preview-Dataset", split="train", num_proc=open_instruct_utils.max_num_processes()) # 8k length and solution new_data = [] for sample in tqdm(dataset): diff --git a/scripts/data/create_gpqa_data.py b/scripts/data/create_gpqa_data.py index 0763496ce3..f36d34b29a 100644 --- a/scripts/data/create_gpqa_data.py +++ b/scripts/data/create_gpqa_data.py @@ -1,7 +1,8 @@ from datasets import Dataset, DatasetDict, load_dataset +import open_instruct.utils as open_instruct_utils # load dataset -dataset = load_dataset("rulins/gpqa_extended_decontamination", split="train") +dataset = load_dataset("rulins/gpqa_extended_decontamination", 
split="train", num_proc=open_instruct_utils.max_num_processes()) # construct rlvr-style dataset new_train_data = [] for data in dataset: @@ -34,7 +35,7 @@ }) # create eval set (gpqa diamond) -dataset = load_dataset("Idavidrein/gpqa", "gpqa_diamond", split="train") +dataset = load_dataset("Idavidrein/gpqa", "gpqa_diamond", split="train", num_proc=open_instruct_utils.max_num_processes()) # construct rlvr-style dataset new_val_data = [] for data in dataset: diff --git a/scripts/data/create_long_form_qa_data.py b/scripts/data/create_long_form_qa_data.py index 8e4bf88088..c8413b34c2 100644 --- a/scripts/data/create_long_form_qa_data.py +++ b/scripts/data/create_long_form_qa_data.py @@ -21,8 +21,7 @@ def convert_deepseek_long_form_reasoning_to_rl_format(): dataset = datasets.load_dataset( "glaiveai/reasoning-v1-20m", # cache_dir="/checkpoint/comem/rulin/cache/huggingface", - # download_timeout=1000 - )["train"].select(range(1000000)) + # download_timeout=1000, num_proc=max_num_processes())["train"].select(range(1000000)) dataset = dataset.train_test_split(test_size=0.1, seed=42) train_data = dataset["train"] test_data = dataset["test"] @@ -59,7 +58,7 @@ def convert_deepseek_long_form_reasoning_to_rl_format(): def convert_os_to_rl_format(): # Load the OS dataset - dataset = datasets.load_dataset("OpenSciLM/OS_Train_Data", split="train") + dataset = datasets.load_dataset("OpenSciLM/OS_Train_Data", split="train", num_proc=max_num_processes()) # Remove rows with dataset=="hamishivi/rs-llama-3-stuff" dataset = dataset.filter(lambda x: x["dataset"] != "hamishivi/rs-llama-3-stuff") dataset = dataset.train_test_split(test_size=0.1, seed=42) diff --git a/scripts/data/create_nq_tqa_hotpotqa_2wiki_simplqa_data.py b/scripts/data/create_nq_tqa_hotpotqa_2wiki_simplqa_data.py index 398010c5f1..41a14f828d 100644 --- a/scripts/data/create_nq_tqa_hotpotqa_2wiki_simplqa_data.py +++ b/scripts/data/create_nq_tqa_hotpotqa_2wiki_simplqa_data.py @@ -15,7 +15,7 @@ def convert_hotpotqa_to_rlvr_format(no_prompt: bool): # Load the HotpotQA dataset - hotpotqa_dataset = datasets.load_dataset("hotpot_qa", "fullwiki") + hotpotqa_dataset = datasets.load_dataset("hotpot_qa", "fullwiki", num_proc=max_num_processes()) hotpotqa_data = hotpotqa_dataset["train"] if no_prompt: @@ -54,7 +54,7 @@ def convert_hotpotqa_to_rlvr_format(no_prompt: bool): def convert_nq_to_rlvr_format_with_context(no_prompt: bool): # Load the NQ dataset - nq_dataset = datasets.load_dataset("google-research-datasets/nq_open") + nq_dataset = datasets.load_dataset("google-research-datasets/nq_open", num_proc=max_num_processes()) nq_data = nq_dataset["train"] if no_prompt: @@ -93,14 +93,14 @@ def convert_nq_to_rlvr_format_with_context(no_prompt: bool): def check_nq_rlvr_dataset(): # Load the NQ dataset - nq_dataset = datasets.load_dataset("hamishivi/nq_rlvr") + nq_dataset = datasets.load_dataset("hamishivi/nq_rlvr", num_proc=max_num_processes()) nq_data = nq_dataset["train"] import pdb; pdb.set_trace() def convert_tqa_to_rlvr_format(no_prompt: bool): # Load the TQA dataset - tqa_dataset = datasets.load_dataset("mandarjoshi/trivia_qa", "rc.nocontext") + tqa_dataset = datasets.load_dataset("mandarjoshi/trivia_qa", "rc.nocontext", num_proc=max_num_processes()) tqa_data = tqa_dataset["train"] if no_prompt: @@ -139,7 +139,7 @@ def convert_tqa_to_rlvr_format(no_prompt: bool): def convert_2wiki_to_rlvr_format(no_prompt: bool): # Load the 2Wiki dataset - two_wiki_dataset = datasets.load_dataset("hzy/kr1_2wiki") + two_wiki_dataset = datasets.load_dataset("hzy/kr1_2wiki", 
num_proc=max_num_processes()) two_wiki_data = two_wiki_dataset["train"] if no_prompt: @@ -177,7 +177,7 @@ def convert_2wiki_to_rlvr_format(no_prompt: bool): def convert_simple_qa_to_rlvr_format(no_prompt: bool): # Load the SimpleQA dataset - simple_qa_dataset = datasets.load_dataset("basicv8vc/SimpleQA", split="test") + simple_qa_dataset = datasets.load_dataset("basicv8vc/SimpleQA", split="test", num_proc=max_num_processes()) simple_qa_dataset = simple_qa_dataset.train_test_split(test_size=0.1, seed=42) simple_qa_data = simple_qa_dataset["train"] simple_qa_test_data = simple_qa_dataset["test"] diff --git a/scripts/data/create_thinker_tulu_data.py b/scripts/data/create_thinker_tulu_data.py index 88cd2758fa..3aefe18db2 100644 --- a/scripts/data/create_thinker_tulu_data.py +++ b/scripts/data/create_thinker_tulu_data.py @@ -1,10 +1,11 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils random_gen = random.Random(42) -ds = load_dataset("jacobmorrison/tulu-3-sft-t3-70b-thoughts", split="train") +ds = load_dataset("jacobmorrison/tulu-3-sft-t3-70b-thoughts", split="train", num_proc=open_instruct_utils.max_num_processes()) new_data = [] for sample in ds: diff --git a/scripts/data/filtering_and_updates/filter_chinese.py b/scripts/data/filtering_and_updates/filter_chinese.py index d9459741c4..0c43ca64f0 100644 --- a/scripts/data/filtering_and_updates/filter_chinese.py +++ b/scripts/data/filtering_and_updates/filter_chinese.py @@ -6,6 +6,7 @@ from pathlib import Path from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import hf_hub_download, list_repo_files import pyarrow.parquet as pq import pandas as pd @@ -189,7 +190,7 @@ def main(): try: # Try standard loading first - dataset = load_dataset(input_dataset, split="train") + dataset = load_dataset(input_dataset, split="train", num_proc=open_instruct_utils.max_num_processes()) except: try: # Fallback to direct parquet loading diff --git a/scripts/data/filtering_and_updates/filter_cots.py b/scripts/data/filtering_and_updates/filter_cots.py index 49ed7cae24..b57ab8c559 100644 --- a/scripts/data/filtering_and_updates/filter_cots.py +++ b/scripts/data/filtering_and_updates/filter_cots.py @@ -8,6 +8,7 @@ import argparse import re from datasets import load_dataset, Features, Sequence, Value +import open_instruct.utils as open_instruct_utils # ----------------------- filter functions ----------------------- # def is_think_answer(elem): @@ -87,7 +88,7 @@ def main() -> None: args = parser.parse_args() # Load dataset (Dataset or DatasetDict) - ds = load_dataset(args.input_dataset_name) + ds = load_dataset(args.input_dataset_name, num_proc=open_instruct_utils.max_num_processes()) # Helper: count rows across splits def count_rows(dset): diff --git a/scripts/data/filtering_and_updates/filter_cutoff_date.py b/scripts/data/filtering_and_updates/filter_cutoff_date.py index bc8755515b..2e81b1c2d7 100644 --- a/scripts/data/filtering_and_updates/filter_cutoff_date.py +++ b/scripts/data/filtering_and_updates/filter_cutoff_date.py @@ -3,6 +3,7 @@ from typing import Dict from datasets import Sequence, Value, load_dataset +import open_instruct.utils as open_instruct_utils """ Script to remove mentions of a date cutoff from post-training datasets. @@ -79,7 +80,7 @@ def main(): parser.add_argument("--output-entity", type=str, help="Output entity (org/user) for the filtered dataset. 
If not provided, uses the same entity as the input dataset.") args = parser.parse_args() - dataset = load_dataset(args.dataset) + dataset = load_dataset(args.dataset, num_proc=open_instruct_utils.max_num_processes()) split_name = args.split split_data = dataset[split_name] diff --git a/scripts/data/filtering_and_updates/filter_dataset_by_keywords.py b/scripts/data/filtering_and_updates/filter_dataset_by_keywords.py index 74ff77d086..53f7d9dede 100644 --- a/scripts/data/filtering_and_updates/filter_dataset_by_keywords.py +++ b/scripts/data/filtering_and_updates/filter_dataset_by_keywords.py @@ -6,6 +6,7 @@ from pathlib import Path from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import hf_hub_download, list_repo_files import pyarrow.parquet as pq import pandas as pd @@ -149,7 +150,7 @@ def main(): try: # Try standard loading first - dataset = load_dataset(input_dataset, split="train") + dataset = load_dataset(input_dataset, split="train", num_proc=open_instruct_utils.max_num_processes()) except: try: # Fallback to direct parquet loading diff --git a/scripts/data/filtering_and_updates/filter_ngram_repetitions.py b/scripts/data/filtering_and_updates/filter_ngram_repetitions.py index 68f7bea294..dbad86d22f 100644 --- a/scripts/data/filtering_and_updates/filter_ngram_repetitions.py +++ b/scripts/data/filtering_and_updates/filter_ngram_repetitions.py @@ -7,6 +7,7 @@ import multiprocessing as mp from datasets import Sequence, Value, load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import hf_hub_download, list_repo_files import pyarrow.parquet as pq import pandas as pd @@ -622,7 +623,7 @@ def main(): parser.error("Either dataset_name positional argument or --input-dataset must be provided") print(f"Loading dataset: {dataset_name}") - dataset = load_dataset(dataset_name, split=args.split) + dataset = load_dataset(dataset_name, split=args.split, num_proc=open_instruct_utils.max_num_processes()) print(f"Dataset loaded with {len(dataset)} examples") if args.verbose: diff --git a/scripts/data/filtering_and_updates/filter_special_tokens.py b/scripts/data/filtering_and_updates/filter_special_tokens.py index e97c5f69fb..5ee70dfa60 100644 --- a/scripts/data/filtering_and_updates/filter_special_tokens.py +++ b/scripts/data/filtering_and_updates/filter_special_tokens.py @@ -1,6 +1,7 @@ import argparse import logging from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import HfApi from open_instruct import logger_utils @@ -124,9 +125,9 @@ def main(): # Load dataset if args.debug: logger.info("Debug mode: Loading first 100 samples") - dataset = load_dataset(args.dataset_name, split="train[:100]") + dataset = load_dataset(args.dataset_name, split="train[:100]", num_proc=open_instruct_utils.max_num_processes()) else: - dataset = load_dataset(args.dataset_name, split="train") + dataset = load_dataset(args.dataset_name, split="train", num_proc=open_instruct_utils.max_num_processes()) logger.info(f"Original dataset size: {len(dataset)}") diff --git a/scripts/data/filtering_and_updates/filter_wildchat.py b/scripts/data/filtering_and_updates/filter_wildchat.py index 5f5f501270..ffc23e4a8d 100644 --- a/scripts/data/filtering_and_updates/filter_wildchat.py +++ b/scripts/data/filtering_and_updates/filter_wildchat.py @@ -18,6 +18,7 @@ import logging import os from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from open_instruct import 
logger_utils @@ -60,7 +61,7 @@ def main(): args = parser.parse_args() logger.info("Loading dataset %s…", args.input_dataset) - ds = load_dataset(args.input_dataset, split="train") + ds = load_dataset(args.input_dataset, split="train", num_proc=open_instruct_utils.max_num_processes()) logger.info("Loaded dataset with %d examples.", ds.num_rows) # 1) Filter out toxic examples diff --git a/scripts/data/filtering_and_updates/update_subsets.py b/scripts/data/filtering_and_updates/update_subsets.py index 642e981cf5..9e7ee1bdbc 100644 --- a/scripts/data/filtering_and_updates/update_subsets.py +++ b/scripts/data/filtering_and_updates/update_subsets.py @@ -21,6 +21,7 @@ import argparse from datasets import Dataset, concatenate_datasets, load_dataset +import open_instruct.utils as open_instruct_utils def remove_specified_columns(dataset: Dataset, columns_to_remove: list) -> Dataset: @@ -87,7 +88,7 @@ def main(): # 1. Load the base dataset print(f"Loading base dataset: {args.base_ds}") - base_ds = load_dataset(args.base_ds, split="train") + base_ds = load_dataset(args.base_ds, split="train", num_proc=open_instruct_utils.max_num_processes()) # 2. Filter out unwanted sources, if any if len(args.remove_sources) > 0: @@ -107,7 +108,7 @@ def keep_example(example): combined_ds = filtered_ds for ds_name in args.add_ds: - add_ds = load_dataset(ds_name, split="train") + add_ds = load_dataset(ds_name, split="train", num_proc=open_instruct_utils.max_num_processes()) # add column to add_ds where 'source' is the name of the column (it shouldnt be in it for subset) source = [ds_name] * len(add_ds) add_ds = add_ds.add_column("source", source) diff --git a/scripts/data/get_statistics.py b/scripts/data/get_statistics.py index 938efc8801..7470041bb6 100644 --- a/scripts/data/get_statistics.py +++ b/scripts/data/get_statistics.py @@ -20,6 +20,7 @@ import pandas as pd import tqdm from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import repo_exists from transformers import AutoTokenizer @@ -33,7 +34,7 @@ def get_statistics_for_messages_data( ): if dataset is None: # load dataset - dataset = load_dataset("json", data_files={split: data_path}) + dataset = load_dataset("json", data_files={split: data_path}, num_proc=open_instruct_utils.max_num_processes()) # tokenize dataset tokenizer = AutoTokenizer.from_pretrained(tokenizer, use_fast=False) # get statistics @@ -107,7 +108,7 @@ def get_statistics_for_prompt_completion_data( ): if dataset is None: # load dataset - dataset = load_dataset("json", data_files={split: data_path}) + dataset = load_dataset("json", data_files={split: data_path}, num_proc=open_instruct_utils.max_num_processes()) prompts = [instance["prompt"] for instance in dataset[split]] completions = [instance[response_key] for instance in dataset[split]] # tokenize dataset @@ -162,7 +163,7 @@ def get_statistics_for_prompt_completion_data( elif repo_exists(args.data_path, repo_type="dataset"): - dataset = load_dataset(args.data_path) + dataset = load_dataset(args.data_path, num_proc=open_instruct_utils.max_num_processes()) sample = dataset[args.split][0] else: raise ValueError("Invalid data path - the data path should be either a dataset id or a path to a json file.") diff --git a/scripts/data/get_token_distribution.py b/scripts/data/get_token_distribution.py index f706cf3676..f54fe11a00 100644 --- a/scripts/data/get_token_distribution.py +++ b/scripts/data/get_token_distribution.py @@ -4,6 +4,7 @@ import matplotlib.pyplot as plt import numpy as np from 
datasets import load_dataset +import open_instruct.utils as open_instruct_utils from matplotlib.patches import Patch from transformers import AutoTokenizer @@ -100,7 +101,7 @@ def plot_token_length_histogram(dataset_name, print("Running analytics...") # Load the dataset - dataset = load_dataset(dataset_name) + dataset = load_dataset(dataset_name, num_proc=open_instruct_utils.max_num_processes()) # Convert "from"/"value" format to "role"/"content" if needed def convert_to_messages(sample, column_name=column_name): diff --git a/scripts/data/gsm8k.py b/scripts/data/gsm8k.py index 5411b8acd5..f3658ec0ff 100644 --- a/scripts/data/gsm8k.py +++ b/scripts/data/gsm8k.py @@ -23,7 +23,7 @@ class Args: hf_entity: Optional[str] = None def main(args: Args): - dataset = datasets.load_dataset("gsm8k", "main") + dataset = datasets.load_dataset("gsm8k", "main", num_proc=max_num_processes()) print(dataset) def process(example): diff --git a/scripts/data/make_eurus2_data.py b/scripts/data/make_eurus2_data.py index 3f41b0f787..134b8af736 100644 --- a/scripts/data/make_eurus2_data.py +++ b/scripts/data/make_eurus2_data.py @@ -1,11 +1,12 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from tqdm import tqdm random_gen = random.Random(42) -dataset = load_dataset("ai2-adapt-dev/eurus2_ground_truth", split="train") +dataset = load_dataset("ai2-adapt-dev/eurus2_ground_truth", split="train", num_proc=open_instruct_utils.max_num_processes()) new_data = [] for sample in tqdm(dataset): diff --git a/scripts/data/preferences/helpsteer2.py b/scripts/data/preferences/helpsteer2.py index f1ba746390..51d0ff9583 100644 --- a/scripts/data/preferences/helpsteer2.py +++ b/scripts/data/preferences/helpsteer2.py @@ -61,7 +61,7 @@ def main( hf_entity: Optional[str], ): # Load the HelpSteer 2 dataset - ds = datasets.load_dataset("nvidia/HelpSteer2") + ds = datasets.load_dataset("nvidia/HelpSteer2", num_proc=max_num_processes()) # Binarize the dataset binarized_ds = binarize_dataset(ds["train"], aspects_to_ignore, min_score, margin) diff --git a/scripts/data/preferences/helpsteer2_nvidia.py b/scripts/data/preferences/helpsteer2_nvidia.py index 5ec17fe086..ecdbb76384 100644 --- a/scripts/data/preferences/helpsteer2_nvidia.py +++ b/scripts/data/preferences/helpsteer2_nvidia.py @@ -60,7 +60,7 @@ def binarize_dataset(dataset, min_score: Optional[float]): def main(min_score: Optional[float], push_to_hub: bool, hf_entity: Optional[str]): # Load the HelpSteer 2 dataset - ds = datasets.load_dataset("nvidia/HelpSteer2") + ds = datasets.load_dataset("nvidia/HelpSteer2", num_proc=max_num_processes()) # Binarize the dataset binarized_ds = binarize_dataset(ds["train"], min_score) diff --git a/scripts/data/preferences/hh-harmless.py b/scripts/data/preferences/hh-harmless.py index d29e2784f7..67b6708249 100644 --- a/scripts/data/preferences/hh-harmless.py +++ b/scripts/data/preferences/hh-harmless.py @@ -57,7 +57,7 @@ def filter_fn(data): def main(push_to_hub: bool, hf_entity: str | None): api = HfApi() - ds = datasets.load_dataset("Anthropic/hh-rlhf", data_dir="harmless-base") + ds = datasets.load_dataset("Anthropic/hh-rlhf", data_dir="harmless-base", num_proc=max_num_processes()) ds = ds.map( extract, diff --git a/scripts/data/preferences/hh-helpful.py b/scripts/data/preferences/hh-helpful.py index 31def6d6e5..c0f6934cfd 100644 --- a/scripts/data/preferences/hh-helpful.py +++ b/scripts/data/preferences/hh-helpful.py @@ -57,7 +57,7 @@ def filter_fn(data): def main(push_to_hub: bool, 
hf_entity: str | None): api = HfApi() - ds = datasets.load_dataset("Anthropic/hh-rlhf", data_dir="helpful-base") + ds = datasets.load_dataset("Anthropic/hh-rlhf", data_dir="helpful-base", num_proc=max_num_processes()) ds = ds.map( extract, diff --git a/scripts/data/preferences/nectar.py b/scripts/data/preferences/nectar.py index 57a93f746f..a03fd4795a 100644 --- a/scripts/data/preferences/nectar.py +++ b/scripts/data/preferences/nectar.py @@ -2,6 +2,7 @@ import random from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from tqdm import tqdm @@ -19,7 +20,7 @@ def is_valid_response(response): def load_nectar_dataset(subset="lmsys-chat-1m", deduplication=False): # Load the Nectar dataset - nectar_dataset = load_dataset("berkeley-nest/Nectar", split="train") + nectar_dataset = load_dataset("berkeley-nest/Nectar", split="train", num_proc=open_instruct_utils.max_num_processes()) print(f"Original Nectar dataset size: {len(nectar_dataset)}") if deduplication: @@ -27,7 +28,7 @@ def load_nectar_dataset(subset="lmsys-chat-1m", deduplication=False): # very popular and sourcing many of the same prompts (FLAN, ShareGPT, evol instruct, etc) # we handle LMSYS and Anthropic HH separately because UltraFeedback does not use these print("Deduplication enabled. Loading UltraFeedback dataset...") - ultra_feedback = load_dataset("openbmb/UltraFeedback", split="train") + ultra_feedback = load_dataset("openbmb/UltraFeedback", split="train", num_proc=open_instruct_utils.max_num_processes()) print(f"UltraFeedback dataset size: {len(ultra_feedback)}") # Create a set of UltraFeedback instructions for faster lookup diff --git a/scripts/data/preferences/split_tulu2.5_prefs.py b/scripts/data/preferences/split_tulu2.5_prefs.py index c8ecbf8b51..f13469146a 100644 --- a/scripts/data/preferences/split_tulu2.5_prefs.py +++ b/scripts/data/preferences/split_tulu2.5_prefs.py @@ -4,7 +4,7 @@ def main(push_to_hub: bool, hf_entity: str | None): - ds = datasets.load_dataset("allenai/tulu-2.5-preference-data") + ds = datasets.load_dataset("allenai/tulu-2.5-preference-data", num_proc=max_num_processes()) # for each config in the above dict, upload a new private dataset # with the corresponding subsets, for easy mixing diff --git a/scripts/data/preferences/ultrafeedback_replications.py b/scripts/data/preferences/ultrafeedback_replications.py index 2c8d586285..a16ccabce4 100644 --- a/scripts/data/preferences/ultrafeedback_replications.py +++ b/scripts/data/preferences/ultrafeedback_replications.py @@ -14,7 +14,7 @@ def main(push_to_hub: bool, hf_entity: str | None): for key in setups: json_path = f"setup_{key}" data_file = json_path + "/" + f"uf_setup_{key}_dpo_9000.jsonl" - dataset = datasets.load_dataset(load_repo, data_files=data_file) + dataset = datasets.load_dataset(load_repo, data_files=data_file, num_proc=max_num_processes()) dataset_name = f"{dataset_name_base}{key}" if push_to_hub: diff --git a/scripts/data/preferences/ultrainteract.py b/scripts/data/preferences/ultrainteract.py index 1e9dcadad8..e48126ad44 100644 --- a/scripts/data/preferences/ultrainteract.py +++ b/scripts/data/preferences/ultrainteract.py @@ -2,6 +2,7 @@ import numpy as np from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from utils import convert_message_keys @@ -35,7 +36,7 @@ def create_chosen_and_rejected(row): def process_and_upload(task, push_to_hub, hf_entity): - dataset = load_dataset("openbmb/UltraInteract_pair", split="train") + dataset = 
load_dataset("openbmb/UltraInteract_pair", split="train", num_proc=open_instruct_utils.max_num_processes()) dataset_task = dataset.filter(lambda x: x["task"] == task) # convert to pandas dataframe diff --git a/scripts/data/preferences/webgpt.py b/scripts/data/preferences/webgpt.py index aaae97fc18..38276e4f26 100644 --- a/scripts/data/preferences/webgpt.py +++ b/scripts/data/preferences/webgpt.py @@ -68,7 +68,7 @@ def format_quotes(quotes): def main(push_to_hub: bool, hf_entity: str | None): api = HfApi() - ds = datasets.load_dataset("openai/webgpt_comparisons") + ds = datasets.load_dataset("openai/webgpt_comparisons", num_proc=max_num_processes()) # filter out the ties ds = ds["train"].filter( diff --git a/scripts/data/rlvr/acecoder.py b/scripts/data/rlvr/acecoder.py index 8e47330b69..52f1120d51 100644 --- a/scripts/data/rlvr/acecoder.py +++ b/scripts/data/rlvr/acecoder.py @@ -20,7 +20,7 @@ class Args: hf_entity: Optional[str] = None def main(args: Args): - dataset = datasets.load_dataset("TIGER-Lab/AceCode-87K", split="train") + dataset = datasets.load_dataset("TIGER-Lab/AceCode-87K", split="train", num_proc=max_num_processes()) def process(example): example["messages"] = [ diff --git a/scripts/data/rlvr/filtering_vllm.py b/scripts/data/rlvr/filtering_vllm.py index 160dca91f5..d7554db1ce 100644 --- a/scripts/data/rlvr/filtering_vllm.py +++ b/scripts/data/rlvr/filtering_vllm.py @@ -20,6 +20,7 @@ import json from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from transformers import AutoTokenizer from vllm import LLM, SamplingParams @@ -89,7 +90,7 @@ def main(): args = parser.parse_args() # 1. Load and slice dataset - ds = load_dataset(args.dataset, split=args.split) + ds = load_dataset(args.dataset, split=args.split, num_proc=open_instruct_utils.max_num_processes()) ds = ds.shuffle(seed=42) # so we dont just take first n samples up_to = min(args.offset + args.size, len(ds)) subset = ds.select(range(args.offset, up_to)) @@ -133,7 +134,7 @@ def main(): out_f.write(json.dumps(enriched, ensure_ascii=False) + "\n") if args.push_to_hub is not None: - dataset = load_dataset(args.dataset, split=args.split) + dataset = load_dataset(args.dataset, split=args.split, num_proc=open_instruct_utils.max_num_processes()) dataset.push_to_hub(args.push_to_hub) diff --git a/scripts/data/rlvr/gsm8k_rlvr.py b/scripts/data/rlvr/gsm8k_rlvr.py index c16148e131..f612d7b826 100644 --- a/scripts/data/rlvr/gsm8k_rlvr.py +++ b/scripts/data/rlvr/gsm8k_rlvr.py @@ -23,7 +23,7 @@ class Args: hf_entity: Optional[str] = None def main(args: Args): - dataset = datasets.load_dataset("gsm8k", "main") + dataset = datasets.load_dataset("gsm8k", "main", num_proc=max_num_processes()) def process(example): # extract answer; it's after #### in the answer diff --git a/scripts/data/rlvr_code/code_create_batch.py b/scripts/data/rlvr_code/code_create_batch.py index fb5ab52f60..abfd3faf6a 100644 --- a/scripts/data/rlvr_code/code_create_batch.py +++ b/scripts/data/rlvr_code/code_create_batch.py @@ -97,6 +97,7 @@ from typing import List from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel @@ -118,10 +119,10 @@ SPLIT = "python" hf_datasets = { - "taco": load_dataset("BAAI/TACO", trust_remote_code=True), - "apps": load_dataset("codeparrot/apps", trust_remote_code=True), - "code_contests": load_dataset("deepmind/code_contests"), - "open-r1/codeforces": load_dataset("open-r1/codeforces") + "taco": 
load_dataset("BAAI/TACO", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "apps": load_dataset("codeparrot/apps", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "code_contests": load_dataset("deepmind/code_contests", num_proc=open_instruct_utils.max_num_processes()), + "open-r1/codeforces": load_dataset("open-r1/codeforces", num_proc=open_instruct_utils.max_num_processes()) } def extract_python_code(model_output: str) -> str: @@ -258,7 +259,7 @@ def find_cached_results(id: str): def main(): global SAMPLE_LIMIT - input_dataset = load_dataset(INPUT_HF_DATASET, "train", split=SPLIT) + input_dataset = load_dataset(INPUT_HF_DATASET, "train", split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) # First get all unique IDs unique_ids = set() diff --git a/scripts/data/rlvr_code/code_create_batch_solution.py b/scripts/data/rlvr_code/code_create_batch_solution.py index 97ccd26a6b..e0304840ad 100644 --- a/scripts/data/rlvr_code/code_create_batch_solution.py +++ b/scripts/data/rlvr_code/code_create_batch_solution.py @@ -34,6 +34,7 @@ import time from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI client = AzureOpenAI( @@ -106,7 +107,7 @@ def create_batch_file(prompts): def main(): global SAMPLE_LIMIT - input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT) + input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) print(f"Found {len(input_dataset)} total rows") diff --git a/scripts/data/rlvr_code/code_create_batch_stdio.py b/scripts/data/rlvr_code/code_create_batch_stdio.py index ee345a3a05..8cb7afa1e3 100644 --- a/scripts/data/rlvr_code/code_create_batch_stdio.py +++ b/scripts/data/rlvr_code/code_create_batch_stdio.py @@ -46,6 +46,7 @@ from typing import List from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel, ConfigDict @@ -79,10 +80,10 @@ def extract_python_code(model_output: str) -> str: hf_datasets = { - "taco": load_dataset("BAAI/TACO", trust_remote_code=True), - "apps": load_dataset("codeparrot/apps", trust_remote_code=True), - "code_contests": load_dataset("deepmind/code_contests"), - "open-r1/codeforces": load_dataset("open-r1/codeforces") + "taco": load_dataset("BAAI/TACO", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "apps": load_dataset("codeparrot/apps", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "code_contests": load_dataset("deepmind/code_contests", num_proc=open_instruct_utils.max_num_processes()), + "open-r1/codeforces": load_dataset("open-r1/codeforces", num_proc=open_instruct_utils.max_num_processes()) } def get_question(ds_name, split, index): @@ -199,7 +200,7 @@ def find_cached_results(id: str): def main(): global SAMPLE_LIMIT - input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT) + input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) # First get all unique IDs unique_ids = set() diff --git a/scripts/data/rlvr_code/code_create_batch_translate.py b/scripts/data/rlvr_code/code_create_batch_translate.py index c743c4f560..9820fba948 100644 --- a/scripts/data/rlvr_code/code_create_batch_translate.py +++ b/scripts/data/rlvr_code/code_create_batch_translate.py @@ -45,6 +45,7 @@ from typing import List from datasets import Dataset, load_dataset +import open_instruct.utils as 
open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel @@ -67,10 +68,10 @@ TARGET_LANGUAGES = ["JavaScript", "bash", "C++", "Go", "Java", "Rust", "Swift", "Kotlin", "Haskell", "Lean", "TypeScript"] hf_datasets = { - "taco": load_dataset("BAAI/TACO", trust_remote_code=True), - "apps": load_dataset("codeparrot/apps", trust_remote_code=True), - "code_contests": load_dataset("deepmind/code_contests"), - "open-r1/codeforces": load_dataset("open-r1/codeforces") + "taco": load_dataset("BAAI/TACO", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "apps": load_dataset("codeparrot/apps", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "code_contests": load_dataset("deepmind/code_contests", num_proc=open_instruct_utils.max_num_processes()), + "open-r1/codeforces": load_dataset("open-r1/codeforces", num_proc=open_instruct_utils.max_num_processes()) } def extract_python_code(model_output: str) -> str: @@ -224,7 +225,7 @@ def main(target_language): input_rows = [] for INPUT_HF_DATASET in INPUT_HF_DATASETS: - input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT) + input_dataset = load_dataset(INPUT_HF_DATASET, split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) input_rows.extend(input_dataset) # First get all unique IDs diff --git a/scripts/data/rlvr_code/code_upload_batch.py b/scripts/data/rlvr_code/code_upload_batch.py index 787fb0395c..915fae60f9 100644 --- a/scripts/data/rlvr_code/code_upload_batch.py +++ b/scripts/data/rlvr_code/code_upload_batch.py @@ -88,6 +88,7 @@ import requests from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel @@ -102,10 +103,10 @@ SPLIT = "python" hf_datasets = { - "taco": load_dataset("BAAI/TACO", trust_remote_code=True), - "apps": load_dataset("codeparrot/apps", trust_remote_code=True), - "code_contests": load_dataset("deepmind/code_contests"), - "open-r1/codeforces": load_dataset("open-r1/codeforces") + "taco": load_dataset("BAAI/TACO", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "apps": load_dataset("codeparrot/apps", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "code_contests": load_dataset("deepmind/code_contests", num_proc=open_instruct_utils.max_num_processes()), + "open-r1/codeforces": load_dataset("open-r1/codeforces", num_proc=open_instruct_utils.max_num_processes()) } def get_question(ds_name, split, index): @@ -219,7 +220,7 @@ def process_batch_results(batch_id: str): # Filter and validate results url = "http://localhost:1234/test_program" new_results = [] - original_dataset = load_dataset(INPUT_HF_DATASET, SPLIT, split=SPLIT) + original_dataset = load_dataset(INPUT_HF_DATASET, SPLIT, split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) # Create a lookup dictionary for O(1) access id_to_row = {row['id']: row for row in original_dataset} diff --git a/scripts/data/rlvr_code/code_upload_batch_difficulty.py b/scripts/data/rlvr_code/code_upload_batch_difficulty.py index 7e0ca2587b..8d7a710f4a 100644 --- a/scripts/data/rlvr_code/code_upload_batch_difficulty.py +++ b/scripts/data/rlvr_code/code_upload_batch_difficulty.py @@ -33,6 +33,7 @@ import os from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel, ConfigDict @@ -114,7 +115,7 @@ def process_batch_results(dataset: str, batch_id: str): 
print("No results found in batch") return - ds = load_dataset(dataset, split='train') + ds = load_dataset(dataset, split='train', num_proc=open_instruct_utils.max_num_processes()) print("building id to row") id_to_row = {get_id(row): row for row in ds} new_results = [] diff --git a/scripts/data/rlvr_code/code_upload_batch_stdio.py b/scripts/data/rlvr_code/code_upload_batch_stdio.py index 1df77633ad..420097452f 100644 --- a/scripts/data/rlvr_code/code_upload_batch_stdio.py +++ b/scripts/data/rlvr_code/code_upload_batch_stdio.py @@ -57,6 +57,7 @@ import requests from datasets import Dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel @@ -164,7 +165,7 @@ def process_batch_results(batch_ids: List[str]): # Filter and validate results url = "http://localhost:1234/test_program_stdio" new_results = [] - #original_dataset = load_dataset(INPUT_HF_DATASET, "SFT", split=SPLIT) + #original_dataset = load_dataset(INPUT_HF_DATASET, "SFT", split=SPLIT, num_proc=open_instruct_utils.max_num_processes()) # Create a lookup dictionary for O(1) access print('here') diff --git a/scripts/data/rlvr_code/code_upload_batch_translate.py b/scripts/data/rlvr_code/code_upload_batch_translate.py index 7813fc5150..00158be585 100644 --- a/scripts/data/rlvr_code/code_upload_batch_translate.py +++ b/scripts/data/rlvr_code/code_upload_batch_translate.py @@ -35,6 +35,7 @@ from typing import List from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel @@ -53,10 +54,10 @@ # needed for open-code-reasoning-2 hf_datasets = { - "taco": load_dataset("BAAI/TACO", trust_remote_code=True), - "apps": load_dataset("codeparrot/apps", trust_remote_code=True), - "code_contests": load_dataset("deepmind/code_contests"), - "open-r1/codeforces": load_dataset("open-r1/codeforces") + "taco": load_dataset("BAAI/TACO", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "apps": load_dataset("codeparrot/apps", trust_remote_code=True, num_proc=open_instruct_utils.max_num_processes()), + "code_contests": load_dataset("deepmind/code_contests", num_proc=open_instruct_utils.max_num_processes()), + "open-r1/codeforces": load_dataset("open-r1/codeforces", num_proc=open_instruct_utils.max_num_processes()) } def get_question(ds_name, split, index): diff --git a/scripts/data/rlvr_code/filter_seq_len.py b/scripts/data/rlvr_code/filter_seq_len.py index 749c5aa741..e84237847e 100644 --- a/scripts/data/rlvr_code/filter_seq_len.py +++ b/scripts/data/rlvr_code/filter_seq_len.py @@ -133,7 +133,7 @@ def main(args): logger.info(f"Loading dataset: {args.dataset_name}, split: {args.split}, streaming={args.streaming}") try: - dataset = datasets.load_dataset(args.dataset_name, split=args.split, streaming=args.streaming) + dataset = datasets.load_dataset(args.dataset_name, split=args.split, streaming=args.streaming, num_proc=max_num_processes()) except FileNotFoundError: logger.error(f"Dataset '{args.dataset_name}' not found.") sys.exit(1) diff --git a/scripts/data/rlvr_code/grade_difficulty.py b/scripts/data/rlvr_code/grade_difficulty.py index b80835ebed..e11a0be2f6 100644 --- a/scripts/data/rlvr_code/grade_difficulty.py +++ b/scripts/data/rlvr_code/grade_difficulty.py @@ -15,6 +15,7 @@ import time from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from openai import AzureOpenAI from pydantic import BaseModel, ConfigDict @@ -118,7 +119,7 @@ def 
create_batch_file(prompts): f.write(json.dumps(batch_request) + "\n") def grade_difficulty(dataset): - ds = load_dataset(dataset, split='train') + ds = load_dataset(dataset, split='train', num_proc=open_instruct_utils.max_num_processes()) prompts = [] for row in ds: problem = get_input(row) diff --git a/scripts/data/rlvr_code/plot_seq_len.py b/scripts/data/rlvr_code/plot_seq_len.py index 3990b68e34..5044491b5a 100644 --- a/scripts/data/rlvr_code/plot_seq_len.py +++ b/scripts/data/rlvr_code/plot_seq_len.py @@ -86,7 +86,7 @@ def main(args): logger.info(f"Loading dataset: {args.dataset_name}, split: {args.split}") dataset = None try: - dataset = datasets.load_dataset(args.dataset_name, split=args.split, streaming=args.streaming) + dataset = datasets.load_dataset(args.dataset_name, split=args.split, streaming=args.streaming, num_proc=max_num_processes()) if args.streaming: logger.info("Processing in streaming mode.") if args.max_samples_streaming > 0: diff --git a/scripts/data/rlvr_code/rlvr_to_sft.py b/scripts/data/rlvr_code/rlvr_to_sft.py index 7c983e113f..c4f172884d 100644 --- a/scripts/data/rlvr_code/rlvr_to_sft.py +++ b/scripts/data/rlvr_code/rlvr_to_sft.py @@ -53,7 +53,7 @@ def get_original_input(row): def main(): try: logger.info(f"Loading dataset: {INPUT_HF_DATASET}") - input_ds = datasets.load_dataset(INPUT_HF_DATASET, split="train") + input_ds = datasets.load_dataset(INPUT_HF_DATASET, split="train", num_proc=max_num_processes()) logger.info(f"Loaded {len(input_ds)} rows") stdin_rows = [] diff --git a/scripts/data/rlvr_code/sft_to_rlvr_azure.py b/scripts/data/rlvr_code/sft_to_rlvr_azure.py index 155391ce61..b8280a5d12 100644 --- a/scripts/data/rlvr_code/sft_to_rlvr_azure.py +++ b/scripts/data/rlvr_code/sft_to_rlvr_azure.py @@ -80,6 +80,7 @@ import boto3 # Added for S3 import requests from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from openai import AsyncAzureOpenAI, RateLimitError # Modified to import RateLimitError from tqdm.asyncio import tqdm_asyncio @@ -290,7 +291,7 @@ async def main(): if READ_FROM_CACHE: # Only initialize if cache is enabled initialize_cache_index() - dataset = load_dataset(INPUT_DATASET_NAME, split="train") + dataset = load_dataset(INPUT_DATASET_NAME, split="train", num_proc=open_instruct_utils.max_num_processes()) dataset = dataset.filter(lambda x: x['solution'] != "NO SOLUTION" and x['generator'] == 'DeepSeek-R1') print(f"Samples after filtering: {len(dataset)}") diff --git a/scripts/data/rlvr_code/verify_qwq.py b/scripts/data/rlvr_code/verify_qwq.py index 8cbce00694..668bbbe399 100644 --- a/scripts/data/rlvr_code/verify_qwq.py +++ b/scripts/data/rlvr_code/verify_qwq.py @@ -14,6 +14,7 @@ from tqdm import tqdm from datasets import load_dataset, Dataset +import open_instruct.utils as open_instruct_utils import hashlib import os import re @@ -38,7 +39,7 @@ def load_dataset_with_retries(dataset_name, split="train", max_retries=5, initia backoff = initial_backoff while retries < max_retries: try: - return load_dataset(dataset_name, split=split) + return load_dataset(dataset_name, split=split, num_proc=open_instruct_utils.max_num_processes()) except Exception as e: if "429" in str(e): logger.warning(f"Rate limit exceeded for {dataset_name}. 
diff --git a/scripts/data/scripts/data/create_asqa_data.py b/scripts/data/scripts/data/create_asqa_data.py
index 2ab0534b57..67b3a93825 100644
--- a/scripts/data/scripts/data/create_asqa_data.py
+++ b/scripts/data/scripts/data/create_asqa_data.py
@@ -1,5 +1,6 @@
 from datasets import Dataset, load_dataset
+import open_instruct.utils as open_instruct_utils
 
 """
 Example input:
@@ -19,7 +20,7 @@ def load_asqa_dataset():
     Returns the train, validation, and test splits.
     """
     # Load the dataset from Hugging Face
-    dataset = load_dataset("din0s/asqa")
+    dataset = load_dataset("din0s/asqa", num_proc=open_instruct_utils.max_num_processes())
 
     # Get the different splits
     train_data = dataset["train"]
diff --git a/scripts/data/sft/coconot.py b/scripts/data/sft/coconot.py
index be10dc5ce2..85ecdad62f 100644
--- a/scripts/data/sft/coconot.py
+++ b/scripts/data/sft/coconot.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -68,7 +69,7 @@
         "for more information about this dataset and the license."
     )
 
-    ds = load_dataset("allenai/coconot", "original")
+    ds = load_dataset("allenai/coconot", "original", num_proc=open_instruct_utils.max_num_processes())
 
     convert_sft_dataset(
         ds=ds,
         hf_dataset_id=None,
diff --git a/scripts/data/sft/daring_anteater.py b/scripts/data/sft/daring_anteater.py
index 170a437b8f..f90b3f427d 100644
--- a/scripts/data/sft/daring_anteater.py
+++ b/scripts/data/sft/daring_anteater.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -81,7 +82,7 @@ def conversion_func(example):
         "for more information about this dataset and the license."
     )
 
-    ds = load_dataset("nvidia/Daring-Anteater")
+    ds = load_dataset("nvidia/Daring-Anteater", num_proc=open_instruct_utils.max_num_processes())
 
     if args.remove_subsets:
         ds = ds.filter(lambda x: x["dataset"] not in args.remove_subsets)
diff --git a/scripts/data/sft/flan.py b/scripts/data/sft/flan.py
index afcf77fe3b..c4ee582d70 100644
--- a/scripts/data/sft/flan.py
+++ b/scripts/data/sft/flan.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import concatenate_datasets, load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -80,7 +81,7 @@
 
     subsets = []
     for subset, sampling_size in sampling_sizes.items():
-        ds = load_dataset("Open-Orca/FLAN", data_files=f"{subset}/*")["train"]
+        ds = load_dataset("Open-Orca/FLAN", data_files=f"{subset}/*", num_proc=open_instruct_utils.max_num_processes())["train"]
         if len(ds) > sampling_size:
             ds = ds.shuffle(seed=42).select(range(sampling_size))
         subsets.append(ds)
diff --git a/scripts/data/sft/lima.py b/scripts/data/sft/lima.py
index 56a630c41b..7f0a021d61 100644
--- a/scripts/data/sft/lima.py
+++ b/scripts/data/sft/lima.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -73,7 +74,7 @@ def conversion_func(example):
         "for more information about this dataset and the license."
) - ds = load_dataset("GAIR/lima")["train"] + ds = load_dataset("GAIR/lima", num_proc=open_instruct_utils.max_num_processes())["train"] convert_sft_dataset( ds=ds, hf_dataset_id=None, diff --git a/scripts/data/sft/llama_nemotron.py b/scripts/data/sft/llama_nemotron.py index aec520b58f..25642eadb3 100644 --- a/scripts/data/sft/llama_nemotron.py +++ b/scripts/data/sft/llama_nemotron.py @@ -2,6 +2,7 @@ import uuid from datasets import load_dataset +import open_instruct.utils as open_instruct_utils LLAMA_NEMOTRON_REPO = "nvidia/Llama-Nemotron-Post-Training-Dataset" SFT_DEST_REPO = "saurabh5/llama-nemotron-sft" @@ -21,7 +22,7 @@ def extract_python_code(model_output: str) -> str: def process_and_upload_dataset(): print(f"Loading dataset {LLAMA_NEMOTRON_REPO}, subset SFT...") - dataset = load_dataset(LLAMA_NEMOTRON_REPO, "SFT", split="code") + dataset = load_dataset(LLAMA_NEMOTRON_REPO, "SFT", split="code", num_proc=open_instruct_utils.max_num_processes()) print(f"Dataset loaded with {len(dataset)} examples, proceeed?") breakpoint() diff --git a/scripts/data/sft/lmsys_chat.py b/scripts/data/sft/lmsys_chat.py index efec1ec8f4..ed23d0bdb8 100644 --- a/scripts/data/sft/lmsys_chat.py +++ b/scripts/data/sft/lmsys_chat.py @@ -2,6 +2,7 @@ import re from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from scripts.data.sft.utils import convert_sft_dataset @@ -69,7 +70,7 @@ "for more information about this dataset and the license." ) - ds = load_dataset("lmsys/lmsys-chat-1m") + ds = load_dataset("lmsys/lmsys-chat-1m", num_proc=open_instruct_utils.max_num_processes()) if args.model_name_regex: ds = ds.filter(lambda example: re.search(args.model_name_regex, example["model"])) print(f"Dataset size after model name regex: {ds.num_rows}") diff --git a/scripts/data/sft/open_assistant.py b/scripts/data/sft/open_assistant.py index b4576de8e0..7c44defbdf 100644 --- a/scripts/data/sft/open_assistant.py +++ b/scripts/data/sft/open_assistant.py @@ -3,6 +3,7 @@ from typing import Optional from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from scripts.data.sft.utils import convert_sft_dataset @@ -143,7 +144,7 @@ def dfs(node, stack, valid_sequences): "for more information about this dataset and the license." ) - v1_ds = load_dataset("OpenAssistant/oasst1")["train"] + v1_ds = load_dataset("OpenAssistant/oasst1", num_proc=open_instruct_utils.max_num_processes())["train"] v1_sequences = convert_oasst_dataset(v1_ds, top_k=args.top_k) v1_instances = [] for i, sequence in enumerate(v1_sequences): @@ -188,7 +189,7 @@ def dfs(node, stack, valid_sequences): "for more information about this dataset and the license." 
     )
 
-    v2_ds = load_dataset("OpenAssistant/oasst2")["train"]
+    v2_ds = load_dataset("OpenAssistant/oasst2", num_proc=open_instruct_utils.max_num_processes())["train"]
 
     v2_sequences = convert_oasst_dataset(v2_ds, top_k=args.top_k)
 
     v2_instances = []
     for i, sequence in enumerate(v2_sequences):
diff --git a/scripts/data/sft/open_code_reasoner.py b/scripts/data/sft/open_code_reasoner.py
index cd7ab9e2c4..906704bdd5 100644
--- a/scripts/data/sft/open_code_reasoner.py
+++ b/scripts/data/sft/open_code_reasoner.py
@@ -2,6 +2,7 @@
 from typing import List
 
 from datasets import Dataset, load_dataset
+import open_instruct.utils as open_instruct_utils
 from tqdm import tqdm
 
 NVIDIA_OCR = "nvidia/OpenCodeReasoning"
@@ -12,8 +13,8 @@
 
 def main():
-    ocr_split0 = load_dataset(NVIDIA_OCR, 'split_0', split="split_0")
-    my_ocr = load_dataset(MY_OCR, split="train")
+    ocr_split0 = load_dataset(NVIDIA_OCR, 'split_0', split="split_0", num_proc=open_instruct_utils.max_num_processes())
+    my_ocr = load_dataset(MY_OCR, split="train", num_proc=open_instruct_utils.max_num_processes())
 
     id_to_data = {}
     for row in ocr_split0:
@@ -43,7 +44,7 @@ def main():
 
 def create_sft_dataset_with_n_outputs(n_outputs: List[int]):
-    sft_base = load_dataset(SFT_REPO, split="train")
+    sft_base = load_dataset(SFT_REPO, split="train", num_proc=open_instruct_utils.max_num_processes())
     # for each id, sample exactly n_outputs datapoints
     id_to_data = {}
     for row in tqdm(sft_base, desc="Mapping id's in SFT data"):
diff --git a/scripts/data/sft/open_math_instruct.py b/scripts/data/sft/open_math_instruct.py
index c433524529..4d5993f051 100644
--- a/scripts/data/sft/open_math_instruct.py
+++ b/scripts/data/sft/open_math_instruct.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -53,7 +54,7 @@
         ]
     }
 
-    ds = load_dataset("nvidia/OpenMathInstruct-2")["train"]
+    ds = load_dataset("nvidia/OpenMathInstruct-2", num_proc=open_instruct_utils.max_num_processes())["train"]
 
     gsm_subset = ds.filter(lambda x: "gsm8k" in x["problem_source"])
     math_subset = ds.filter(lambda x: "math" in x["problem_source"])
diff --git a/scripts/data/sft/sciriff.py b/scripts/data/sft/sciriff.py
index 897a302af9..407fb18a55 100644
--- a/scripts/data/sft/sciriff.py
+++ b/scripts/data/sft/sciriff.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -46,7 +47,7 @@
     )
     args = parser.parse_args()
 
-    sciriff_data = load_dataset("allenai/SciRIFF-train-mix")
+    sciriff_data = load_dataset("allenai/SciRIFF-train-mix", num_proc=open_instruct_utils.max_num_processes())
 
     # filter the dataset to only include science conversations
     sciriff_data = sciriff_data.filter(lambda example: example["dataset"].startswith("science"), num_proc=16)
diff --git a/scripts/data/sft/table_gpt.py b/scripts/data/sft/table_gpt.py
index 81d8b38c53..c2413c2bbe 100644
--- a/scripts/data/sft/table_gpt.py
+++ b/scripts/data/sft/table_gpt.py
@@ -1,6 +1,7 @@
 import argparse
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 
 from scripts.data.sft.utils import convert_sft_dataset
 
@@ -68,7 +69,7 @@
         "for more information about this dataset and the license."
) - ds = load_dataset("LipengCS/Table-GPT", "All") + ds = load_dataset("LipengCS/Table-GPT", "All", num_proc=open_instruct_utils.max_num_processes()) convert_sft_dataset( ds=ds, hf_dataset_id=None, diff --git a/scripts/data/sft/tulu_hard_coded.py b/scripts/data/sft/tulu_hard_coded.py index 31383c911e..c072dfd3f4 100644 --- a/scripts/data/sft/tulu_hard_coded.py +++ b/scripts/data/sft/tulu_hard_coded.py @@ -1,6 +1,7 @@ import argparse from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from scripts.data.sft.utils import convert_sft_dataset @@ -39,7 +40,7 @@ ) args = parser.parse_args() - ds = load_dataset("allenai/tulu-3-hardcoded-prompts")["train"] + ds = load_dataset("allenai/tulu-3-hardcoded-prompts", num_proc=open_instruct_utils.max_num_processes())["train"] readme_content = ( "This dataset contains a set of hard-coded examples for Tulu, " diff --git a/scripts/data/sft/utils.py b/scripts/data/sft/utils.py index 0512697e6b..720ebb3461 100644 --- a/scripts/data/sft/utils.py +++ b/scripts/data/sft/utils.py @@ -5,6 +5,7 @@ import datasets from datasets import Dataset, load_dataset +import open_instruct.utils as open_instruct_utils from huggingface_hub import HfApi @@ -92,7 +93,7 @@ def convert_sft_dataset( "Either ds or hf_dataset_id must be provided." if ds is None: - ds = load_dataset(hf_dataset_id) + ds = load_dataset(hf_dataset_id, num_proc=open_instruct_utils.max_num_processes()) if convert_fn: print("Converting dataset to messages format...") diff --git a/scripts/data/sft/web_instruct.py b/scripts/data/sft/web_instruct.py index 90de04bc5d..d0e7b211bf 100644 --- a/scripts/data/sft/web_instruct.py +++ b/scripts/data/sft/web_instruct.py @@ -1,6 +1,7 @@ import argparse from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from scripts.data.sft.utils import convert_sft_dataset @@ -46,7 +47,7 @@ ) args = parser.parse_args() - ds = load_dataset("TIGER-Lab/WebInstructSub") + ds = load_dataset("TIGER-Lab/WebInstructSub", num_proc=open_instruct_utils.max_num_processes()) stack_exchange_sub = ds.filter(lambda example: example["source"] in ["mathstackexchange", "stackexchange"]) socratic_sub = ds.filter(lambda example: example["source"] == "socratic") assert len(stack_exchange_sub["train"]) + len(socratic_sub["train"]) == len(ds["train"]), \ diff --git a/scripts/data/sft/wildchat.py b/scripts/data/sft/wildchat.py index 85cb6efcd7..b0fe9a09f5 100644 --- a/scripts/data/sft/wildchat.py +++ b/scripts/data/sft/wildchat.py @@ -1,6 +1,7 @@ import argparse from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from scripts.data.sft.utils import convert_sft_dataset @@ -46,7 +47,7 @@ ) args = parser.parse_args() - ds = load_dataset("allenai/WildChat-1M-Full") + ds = load_dataset("allenai/WildChat-1M-Full", num_proc=open_instruct_utils.max_num_processes()) gpt4_subset = ds.filter(lambda example: "gpt-4" in example["model"], num_proc=16) gpt35_subset = ds.filter(lambda example: "gpt-3.5" in example["model"], num_proc=16) assert len(gpt4_subset["train"]) + len(gpt35_subset["train"]) == len(ds["train"]), \ diff --git a/scripts/does_prompt_make_sense.py b/scripts/does_prompt_make_sense.py index 1881796e49..49f2aaf2eb 100644 --- a/scripts/does_prompt_make_sense.py +++ b/scripts/does_prompt_make_sense.py @@ -6,6 +6,7 @@ import pandas as pd from datasets import load_dataset +import open_instruct.utils as open_instruct_utils from openai import AsyncOpenAI from tqdm.asyncio import tqdm_asyncio from transformers 
@@ -110,7 +111,7 @@ async def main(ljc: LLMJudgeConfig, df: pd.DataFrame):
 
 if __name__ == "__main__":
     args, ljc = HfArgumentParser((Args, LLMJudgeConfig)).parse_args_into_dataclasses()
-    raw_dataset = load_dataset("allenai/tulu-v2-sft-mixture", split="train")
+    raw_dataset = load_dataset("allenai/tulu-v2-sft-mixture", split="train", num_proc=open_instruct_utils.max_num_processes())
     raw_dataset = raw_dataset.select(range(64))
     tokenizer = AutoTokenizer.from_pretrained("allenai/llama-3-tulu-2-8b")
     ds = raw_dataset.map(
diff --git a/scripts/eval/dummy_length_scorer.py b/scripts/eval/dummy_length_scorer.py
index 34ab982b47..1176ceca7c 100644
--- a/scripts/eval/dummy_length_scorer.py
+++ b/scripts/eval/dummy_length_scorer.py
@@ -5,6 +5,7 @@
 import json
 
 from datasets import load_dataset
+import open_instruct.utils as open_instruct_utils
 from transformers import AutoTokenizer
 
 parser = argparse.ArgumentParser()
@@ -28,7 +29,7 @@ def count_token_length(text):
 
 if __name__ == "__main__":
     # load reference data
-    reference_dataset = load_dataset("hamishivi/alpaca-farm-davinci-003-2048-token")
+    reference_dataset = load_dataset("hamishivi/alpaca-farm-davinci-003-2048-token", num_proc=open_instruct_utils.max_num_processes())
     reference_dataset = [x["output"] for x in reference_dataset["train"]]
     # load candidate data
     with open(args.candidate_file, "r") as f:
diff --git a/scripts/persona_driven_data_gen/persona_driven_generate_ifdata.py b/scripts/persona_driven_data_gen/persona_driven_generate_ifdata.py
index 24de45ae8c..bf20122388 100755
--- a/scripts/persona_driven_data_gen/persona_driven_generate_ifdata.py
+++ b/scripts/persona_driven_data_gen/persona_driven_generate_ifdata.py
@@ -16,8 +16,6 @@
 import openai
 from datasets import load_dataset
-
-# from openai import OpenAI
 from prompt_templates import (
     instruction_following,
     instruction_following_solution,
@@ -31,6 +29,8 @@
 from tenacity import retry, stop_after_attempt, wait_random_exponential  # for exponential backofff
 from tqdm import tqdm
 
+import open_instruct.utils as open_instruct_utils
+
 
 @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
 def completion_with_backoff(**kwargs):
@@ -87,9 +87,9 @@ def main(args):
 
     # Load the dataset
     if args.dataset.endswith(".jsonl"):
-        persona_dataset = load_dataset("json", data_files=args.dataset)['train']
+        persona_dataset = load_dataset("json", data_files=args.dataset, num_proc=open_instruct_utils.max_num_processes())['train']
     else:
-        persona_dataset = load_dataset(args.dataset)['train']
+        persona_dataset = load_dataset(args.dataset, num_proc=open_instruct_utils.max_num_processes())['train']
 
     if args.sanity_check > 0:
         persona_dataset = persona_dataset.select(range(0, args.sanity_check))
diff --git a/scripts/persona_driven_data_gen/persona_driven_generate_math_code.py b/scripts/persona_driven_data_gen/persona_driven_generate_math_code.py
index b1f1e5a968..cc8bd86441 100755
--- a/scripts/persona_driven_data_gen/persona_driven_generate_math_code.py
+++ b/scripts/persona_driven_data_gen/persona_driven_generate_math_code.py
@@ -18,8 +18,6 @@
 import anthropic
 import openai
 from datasets import load_dataset
-
-# from openai import OpenAI
 from prompt_templates import (
     code_solution_template,
     code_template,
@@ -35,6 +33,8 @@
 from tenacity import retry, stop_after_attempt, wait_random_exponential  # for exponential backoff
 from tqdm import tqdm
 
+import open_instruct.utils as open_instruct_utils
+
 
 @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
 def completion_with_backoff(**kwargs):
@@ -117,9 +117,9 @@ def main(args):
 
     # Load the dataset
     if args.dataset.endswith(".jsonl"):
-        persona_dataset = load_dataset("json", data_files=args.dataset)['train']
+        persona_dataset = load_dataset("json", data_files=args.dataset, num_proc=open_instruct_utils.max_num_processes())['train']
     else:
-        persona_dataset = load_dataset(args.dataset)['train']
+        persona_dataset = load_dataset(args.dataset, num_proc=open_instruct_utils.max_num_processes())['train']
 
     if args.sanity_check > 0:
         persona_dataset = persona_dataset.select(range(0, args.sanity_check))
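
Note: every hunk above applies the same pattern: each `load_dataset(...)` call gains a `num_proc=...` argument so that dataset download and preparation can use multiple worker processes. The sketch below only illustrates that pattern; the body of `open_instruct.utils.max_num_processes()` is not shown in this diff, so the helper here is a hypothetical stand-in that simply caps the worker count at the host's CPU count, and the cap value is arbitrary.

# Illustrative sketch only; `max_num_processes` is a hypothetical stand-in,
# not the implementation from open_instruct/utils.py.
import os

from datasets import load_dataset


def max_num_processes(cap: int = 16) -> int:
    # Use the available cores, but never fewer than 1 or more than `cap` workers.
    return max(1, min(cap, os.cpu_count() or 1))


# The recurring change in this patch: pass num_proc so that a non-streaming
# load prepares the dataset with multiple processes.
dataset = load_dataset("din0s/asqa", num_proc=max_num_processes())
train_data = dataset["train"]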