diff --git a/data_juicer/config/config.py b/data_juicer/config/config.py index eb86b0021c..1bb396dacb 100644 --- a/data_juicer/config/config.py +++ b/data_juicer/config/config.py @@ -452,6 +452,14 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l "difference before and after a op. Only available when " "open_tracer is true.", ) + parser.add_argument( + "--trace_keys", + type=List[str], + default=[], + help="List of field names to include in trace output. If set, the " + "specified fields' values will be included in each trace entry. " + "Only available when open_tracer is true.", + ) parser.add_argument( "--open_insight_mining", type=bool, diff --git a/data_juicer/config/config_all.yaml b/data_juicer/config/config_all.yaml index 5e525f8674..52e31c5840 100644 --- a/data_juicer/config/config_all.yaml +++ b/data_juicer/config/config_all.yaml @@ -48,6 +48,7 @@ temp_dir: null # the path to the te open_tracer: false # whether to open the tracer to trace the changes during process. It might take more time when opening tracer op_list_to_trace: [] # only ops in this list will be traced by tracer. If it's empty, all ops will be traced. Only available when tracer is opened. trace_num: 10 # number of samples to show the differences between datasets before and after each op. Only available when tracer is opened. +trace_keys: [] # list of field names to include in trace output (e.g. ['sample_id', 'source_file']). If set, the specified fields' values will be included in each trace entry. Only available when tracer is opened. op_fusion: false # whether to fuse operators that share the same intermediate variables automatically. Op fusion might reduce the memory requirements slightly but speed up the whole process. fusion_strategy: 'probe' # OP fusion strategy. Support ['greedy', 'probe'] now. 'greedy' means keep the basic OP order and put the fused OP to the last of each fused OP group. 'probe' means Data-Juicer will probe the running speed for each OP at the beginning and reorder the OPs and fused OPs according to their probed speed (fast to slow). It's 'probe' in default. cache_compress: null # the compression method of the cache file, which can be specified in ['gzip', 'zstd', 'lz4']. If this parameter is None, the cache file will not be compressed. We recommend you turn on this argument when your input dataset is larger than tens of GB and your disk space is not enough. diff --git a/data_juicer/core/executor/default_executor.py b/data_juicer/core/executor/default_executor.py index ecde56778a..cefe914c0c 100644 --- a/data_juicer/core/executor/default_executor.py +++ b/data_juicer/core/executor/default_executor.py @@ -111,7 +111,12 @@ def __init__(self, cfg: Optional[Namespace] = None): self.open_tracer = self.cfg.open_tracer if self.open_tracer: logger.info("Preparing tracer...") - self.tracer = Tracer(self.work_dir, self.cfg.op_list_to_trace, show_num=self.cfg.trace_num) + self.tracer = Tracer( + self.work_dir, + self.cfg.op_list_to_trace, + show_num=self.cfg.trace_num, + trace_keys=self.cfg.trace_keys, + ) def run( self, diff --git a/data_juicer/core/tracer.py b/data_juicer/core/tracer.py index c0bbd18d1b..a4480f8ddc 100644 --- a/data_juicer/core/tracer.py +++ b/data_juicer/core/tracer.py @@ -16,7 +16,7 @@ class Tracer: The comparison results will be stored in the work directory. """ - def __init__(self, work_dir, op_list_to_trace=None, show_num=10): + def __init__(self, work_dir, op_list_to_trace=None, show_num=10, trace_keys=None): """ Initialization method. @@ -25,6 +25,9 @@ def __init__(self, work_dir, op_list_to_trace=None, show_num=10): :param op_list_to_trace: the OP list to be traced. :param show_num: the maximum number of samples to show in the comparison result files. + :param trace_keys: list of field names to include in trace output. + If set, the specified fields' values will be included in each + trace entry. """ self.work_dir = os.path.join(work_dir, "trace") if not os.path.exists(self.work_dir): @@ -36,6 +39,7 @@ def __init__(self, work_dir, op_list_to_trace=None, show_num=10): else: self.op_list_to_trace = set(op_list_to_trace) self.show_num = show_num + self.trace_keys = trace_keys or [] def trace_mapper(self, op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str): """ @@ -63,12 +67,14 @@ def trace_mapper(self, op_name: str, previous_ds: Dataset, processed_ds: Dataset previous_sample = previous_ds[i][text_key] processed_sample = processed_ds[i][text_key] if previous_sample != processed_sample: - dif_dict.append( - { - "original_text": previous_sample, - "processed_text": processed_sample, - } - ) + entry = {} + # Add specified fields first (appears at start of output) + for key in self.trace_keys: + entry[key] = previous_ds[i].get(key) + # Add trace data (these take precedence over trace_keys) + entry["original_text"] = previous_sample + entry["processed_text"] = processed_sample + dif_dict.append(entry) num += 1 if num >= self.show_num: break diff --git a/tests/core/test_tracer.py b/tests/core/test_tracer.py index 58bce2a03a..8b37070cad 100644 --- a/tests/core/test_tracer.py +++ b/tests/core/test_tracer.py @@ -290,6 +290,123 @@ def test_op_list_to_trace(self): trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl') self.assertFalse(os.path.exists(trace_file_path)) + def test_trace_mapper_with_trace_keys_single(self): + """Test that trace_keys includes specified fields in trace output.""" + prev_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001'}, + {'text': 'text 2', 'sample_id': 'id-002'}, + {'text': 'text 3', 'sample_id': 'id-003'}, + ]) + done_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001'}, + {'text': 'processed text 2', 'sample_id': 'id-002'}, + {'text': 'text 3', 'sample_id': 'id-003'}, + ]) + dif_list = [ + { + 'original_text': 'text 2', + 'processed_text': 'processed text 2', + 'sample_id': 'id-002', + } + ] + tracer = Tracer(self.work_dir, trace_keys=['sample_id']) + tracer.trace_mapper('clean_email_mapper', prev_ds, done_ds, 'text') + trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl') + self.assertTrue(os.path.exists(trace_file_path)) + trace_records = [] + with jl.open(trace_file_path, 'r') as reader: + for s in reader: + trace_records.append(s) + self.assertEqual(dif_list, trace_records) + + def test_trace_mapper_with_trace_keys_multiple(self): + """Test that trace_keys includes multiple fields in trace output.""" + prev_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001', 'source': 'file1.jsonl'}, + {'text': 'text 2', 'sample_id': 'id-002', 'source': 'file1.jsonl'}, + {'text': 'text 3', 'sample_id': 'id-003', 'source': 'file2.jsonl'}, + ]) + done_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001', 'source': 'file1.jsonl'}, + {'text': 'processed text 2', 'sample_id': 'id-002', 'source': 'file1.jsonl'}, + {'text': 'text 3', 'sample_id': 'id-003', 'source': 'file2.jsonl'}, + ]) + dif_list = [ + { + 'original_text': 'text 2', + 'processed_text': 'processed text 2', + 'sample_id': 'id-002', + 'source': 'file1.jsonl', + } + ] + tracer = Tracer(self.work_dir, trace_keys=['sample_id', 'source']) + tracer.trace_mapper('clean_email_mapper', prev_ds, done_ds, 'text') + trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl') + self.assertTrue(os.path.exists(trace_file_path)) + trace_records = [] + with jl.open(trace_file_path, 'r') as reader: + for s in reader: + trace_records.append(s) + self.assertEqual(dif_list, trace_records) + + def test_trace_mapper_with_trace_keys_missing_field(self): + """Test that trace_keys handles missing field gracefully.""" + prev_ds = Dataset.from_list([ + {'text': 'text 1'}, + {'text': 'text 2'}, + {'text': 'text 3'}, + ]) + done_ds = Dataset.from_list([ + {'text': 'text 1'}, + {'text': 'processed text 2'}, + {'text': 'text 3'}, + ]) + dif_list = [ + { + 'original_text': 'text 2', + 'processed_text': 'processed text 2', + 'sample_id': None, + } + ] + tracer = Tracer(self.work_dir, trace_keys=['sample_id']) + tracer.trace_mapper('clean_email_mapper', prev_ds, done_ds, 'text') + trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl') + self.assertTrue(os.path.exists(trace_file_path)) + trace_records = [] + with jl.open(trace_file_path, 'r') as reader: + for s in reader: + trace_records.append(s) + self.assertEqual(dif_list, trace_records) + + def test_trace_mapper_without_trace_keys(self): + """Test that without trace_keys, output is unchanged (default behavior).""" + prev_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001'}, + {'text': 'text 2', 'sample_id': 'id-002'}, + {'text': 'text 3', 'sample_id': 'id-003'}, + ]) + done_ds = Dataset.from_list([ + {'text': 'text 1', 'sample_id': 'id-001'}, + {'text': 'processed text 2', 'sample_id': 'id-002'}, + {'text': 'text 3', 'sample_id': 'id-003'}, + ]) + # Without trace_keys, only original_text and processed_text are included + dif_list = [ + { + 'original_text': 'text 2', + 'processed_text': 'processed text 2', + } + ] + tracer = Tracer(self.work_dir) # No trace_keys + tracer.trace_mapper('clean_email_mapper', prev_ds, done_ds, 'text') + trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl') + self.assertTrue(os.path.exists(trace_file_path)) + trace_records = [] + with jl.open(trace_file_path, 'r') as reader: + for s in reader: + trace_records.append(s) + self.assertEqual(dif_list, trace_records) + if __name__ == '__main__': unittest.main()