Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions data_juicer/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,14 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l
"difference before and after a op. Only available when "
"open_tracer is true.",
)
parser.add_argument(
"--trace_keys",
type=List[str],
default=[],
help="List of field names to include in trace output. If set, the "
"specified fields' values will be included in each trace entry. "
"Only available when open_tracer is true.",
)
parser.add_argument(
"--open_insight_mining",
type=bool,
Expand Down
1 change: 1 addition & 0 deletions data_juicer/config/config_all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ temp_dir: null # the path to the te
open_tracer: false # whether to enable the tracer to trace the changes during processing. Enabling the tracer may increase the processing time.
op_list_to_trace: [] # only ops in this list will be traced by tracer. If it's empty, all ops will be traced. Only available when tracer is opened.
trace_num: 10 # number of samples to show the differences between datasets before and after each op. Only available when tracer is opened.
trace_keys: [] # list of field names to include in trace output (e.g. ['sample_id', 'source_file']). If set, the specified fields' values will be included in each trace entry. Only available when tracer is opened.
op_fusion: false # whether to fuse operators that share the same intermediate variables automatically. Op fusion might reduce the memory requirements slightly but speed up the whole process.
fusion_strategy: 'probe' # OP fusion strategy. Currently supports ['greedy', 'probe']. 'greedy' keeps the basic OP order and puts the fused OP at the end of each fused OP group. 'probe' means Data-Juicer will probe the running speed of each OP at the beginning and reorder the OPs and fused OPs according to their probed speed (fast to slow). Defaults to 'probe'.
cache_compress: null # the compression method of the cache file, which can be specified in ['gzip', 'zstd', 'lz4']. If this parameter is None, the cache file will not be compressed. We recommend you turn on this argument when your input dataset is larger than tens of GB and your disk space is not enough.
Expand Down
7 changes: 6 additions & 1 deletion data_juicer/core/executor/default_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ def __init__(self, cfg: Optional[Namespace] = None):
self.open_tracer = self.cfg.open_tracer
if self.open_tracer:
logger.info("Preparing tracer...")
self.tracer = Tracer(self.work_dir, self.cfg.op_list_to_trace, show_num=self.cfg.trace_num)
self.tracer = Tracer(
self.work_dir,
self.cfg.op_list_to_trace,
show_num=self.cfg.trace_num,
trace_keys=self.cfg.trace_keys,
)

def run(
self,
Expand Down
20 changes: 13 additions & 7 deletions data_juicer/core/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Tracer:
The comparison results will be stored in the work directory.
"""

def __init__(self, work_dir, op_list_to_trace=None, show_num=10):
def __init__(self, work_dir, op_list_to_trace=None, show_num=10, trace_keys=None):
"""
Initialization method.

Expand All @@ -25,6 +25,9 @@ def __init__(self, work_dir, op_list_to_trace=None, show_num=10):
:param op_list_to_trace: the OP list to be traced.
:param show_num: the maximum number of samples to show in the
comparison result files.
:param trace_keys: list of field names to include in trace output.
If set, the specified fields' values will be included in each
trace entry.
"""
self.work_dir = os.path.join(work_dir, "trace")
if not os.path.exists(self.work_dir):
Expand All @@ -36,6 +39,7 @@ def __init__(self, work_dir, op_list_to_trace=None, show_num=10):
else:
self.op_list_to_trace = set(op_list_to_trace)
self.show_num = show_num
self.trace_keys = trace_keys or []

def trace_mapper(self, op_name: str, previous_ds: Dataset, processed_ds: Dataset, text_key: str):
"""
Expand Down Expand Up @@ -63,12 +67,14 @@ def trace_mapper(self, op_name: str, previous_ds: Dataset, processed_ds: Dataset
previous_sample = previous_ds[i][text_key]
processed_sample = processed_ds[i][text_key]
if previous_sample != processed_sample:
dif_dict.append(
{
"original_text": previous_sample,
"processed_text": processed_sample,
}
)
entry = {}
# Add specified fields first (appears at start of output)
for key in self.trace_keys:
entry[key] = previous_ds[i].get(key)
# Add trace data (these take precedence over trace_keys)
entry["original_text"] = previous_sample
entry["processed_text"] = processed_sample
dif_dict.append(entry)
num += 1
if num >= self.show_num:
break
Expand Down
117 changes: 117 additions & 0 deletions tests/core/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,123 @@ def test_op_list_to_trace(self):
trace_file_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl')
self.assertFalse(os.path.exists(trace_file_path))

def test_trace_mapper_with_trace_keys_single(self):
    """A single configured trace key shows up in the written trace record."""
    # Only the second sample's text differs between the two datasets,
    # so exactly one trace record is expected.
    before = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001'},
        {'text': 'text 2', 'sample_id': 'id-002'},
        {'text': 'text 3', 'sample_id': 'id-003'},
    ])
    after = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001'},
        {'text': 'processed text 2', 'sample_id': 'id-002'},
        {'text': 'text 3', 'sample_id': 'id-003'},
    ])
    expected = [
        {
            'original_text': 'text 2',
            'processed_text': 'processed text 2',
            'sample_id': 'id-002',
        }
    ]

    Tracer(self.work_dir, trace_keys=['sample_id']).trace_mapper(
        'clean_email_mapper', before, after, 'text'
    )

    output_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl')
    self.assertTrue(os.path.exists(output_path))
    # Load every record of the trace file for comparison.
    with jl.open(output_path, 'r') as reader:
        actual = list(reader)
    self.assertEqual(expected, actual)

def test_trace_mapper_with_trace_keys_multiple(self):
    """Several configured trace keys all show up in the written trace record."""
    # Only the second sample's text differs between the two datasets,
    # so exactly one trace record is expected.
    before = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001', 'source': 'file1.jsonl'},
        {'text': 'text 2', 'sample_id': 'id-002', 'source': 'file1.jsonl'},
        {'text': 'text 3', 'sample_id': 'id-003', 'source': 'file2.jsonl'},
    ])
    after = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001', 'source': 'file1.jsonl'},
        {'text': 'processed text 2', 'sample_id': 'id-002', 'source': 'file1.jsonl'},
        {'text': 'text 3', 'sample_id': 'id-003', 'source': 'file2.jsonl'},
    ])
    expected = [
        {
            'original_text': 'text 2',
            'processed_text': 'processed text 2',
            'sample_id': 'id-002',
            'source': 'file1.jsonl',
        }
    ]

    Tracer(self.work_dir, trace_keys=['sample_id', 'source']).trace_mapper(
        'clean_email_mapper', before, after, 'text'
    )

    output_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl')
    self.assertTrue(os.path.exists(output_path))
    # Load every record of the trace file for comparison.
    with jl.open(output_path, 'r') as reader:
        actual = list(reader)
    self.assertEqual(expected, actual)

def test_trace_mapper_with_trace_keys_missing_field(self):
    """A trace key absent from the samples is recorded as None."""
    # Neither dataset carries a 'sample_id' column; only the second
    # sample's text differs, so exactly one trace record is expected.
    before = Dataset.from_list([
        {'text': 'text 1'},
        {'text': 'text 2'},
        {'text': 'text 3'},
    ])
    after = Dataset.from_list([
        {'text': 'text 1'},
        {'text': 'processed text 2'},
        {'text': 'text 3'},
    ])
    expected = [
        {
            'original_text': 'text 2',
            'processed_text': 'processed text 2',
            'sample_id': None,
        }
    ]

    Tracer(self.work_dir, trace_keys=['sample_id']).trace_mapper(
        'clean_email_mapper', before, after, 'text'
    )

    output_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl')
    self.assertTrue(os.path.exists(output_path))
    # Load every record of the trace file for comparison.
    with jl.open(output_path, 'r') as reader:
        actual = list(reader)
    self.assertEqual(expected, actual)

def test_trace_mapper_without_trace_keys(self):
    """With no trace_keys given, records hold only the two text fields."""
    # The samples carry a 'sample_id' column, but the tracer is built
    # without trace_keys; only the second sample's text differs.
    before = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001'},
        {'text': 'text 2', 'sample_id': 'id-002'},
        {'text': 'text 3', 'sample_id': 'id-003'},
    ])
    after = Dataset.from_list([
        {'text': 'text 1', 'sample_id': 'id-001'},
        {'text': 'processed text 2', 'sample_id': 'id-002'},
        {'text': 'text 3', 'sample_id': 'id-003'},
    ])
    # Expect just original_text and processed_text, no extra fields.
    expected = [
        {
            'original_text': 'text 2',
            'processed_text': 'processed text 2',
        }
    ]

    Tracer(self.work_dir).trace_mapper('clean_email_mapper', before, after, 'text')

    output_path = os.path.join(self.work_dir, 'trace', 'mapper-clean_email_mapper.jsonl')
    self.assertTrue(os.path.exists(output_path))
    # Load every record of the trace file for comparison.
    with jl.open(output_path, 'r') as reader:
        actual = list(reader)
    self.assertEqual(expected, actual)


# Run this module's tests via unittest when the file is executed as a script.
if __name__ == '__main__':
unittest.main()