forked from verl-project/verl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
149 lines (126 loc) · 5.16 KB
/
utils.py
File metadata and controls
149 lines (126 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# Copyright 2025 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Metrics utils.
"""
from enum import Enum
from typing import Any, Optional, Union
import numpy as np
import torch
def reduce_metrics(metrics: dict[str, Union["Metric", list[Any]]]) -> dict[str, Any]:
"""
Reduces a dictionary of metric lists by computing the mean, max, or min of each list.
The reduce operation is determined by the key name:
- If the key contains "max", np.max is used
- If the key contains "min", np.min is used
- Otherwise, np.mean is used
Args:
metrics: A dictionary mapping metric names to lists of metric values.
Returns:
A dictionary with the same keys but with each list replaced by its reduced value.
Example:
>>> metrics = {
... "loss": [1.0, 2.0, 3.0],
... "accuracy": [0.8, 0.9, 0.7],
... "max_reward": [5.0, 8.0, 6.0],
... "min_error": [0.1, 0.05, 0.2]
... }
>>> reduce_metrics(metrics)
{"loss": 2.0, "accuracy": 0.8, "max_reward": 8.0, "min_error": 0.05}
"""
for key, val in metrics.items():
if isinstance(val, Metric):
metrics[key] = val.aggregate()
elif "max" in key:
metrics[key] = np.max(val)
elif "min" in key:
metrics[key] = np.min(val)
else:
metrics[key] = np.mean(val)
return metrics
class AggregationType(Enum):
MEAN = "mean"
SUM = "sum"
MIN = "min"
MAX = "max"
NumericType = int, float, torch.Tensor
Numeric = int | float | torch.Tensor
class Metric:
"""
A metric aggregator for collecting and aggregating numeric values.
This class accumulates numeric values (int, float, or scalar tensors) and computes
an aggregate statistic based on the specified aggregation type (MEAN, SUM, MIN, or MAX).
Args:
aggregation: The aggregation method to use. Can be a string ("mean", "sum", "min", "max")
or an AggregationType enum value.
value: Optional initial value(s) to add. Can be a single numeric value or a list of values.
Example:
>>> metric = Metric(aggregation="mean", value=1.0)
>>> metric.append(2.0)
>>> metric.append(3.0)
>>> metric.aggregate()
2.0
"""
def __init__(self, aggregation: str | AggregationType, value: Optional[Numeric | list[Numeric]] = None) -> None:
if isinstance(aggregation, str):
self.aggregation = AggregationType(aggregation)
else:
self.aggregation = aggregation
if not isinstance(self.aggregation, AggregationType):
raise ValueError(f"Unsupported aggregation type: {aggregation}")
self.values = []
if value is not None:
self.append(value)
def append(self, value: Union[Numeric, "Metric"]) -> None:
if isinstance(value, Metric):
self.extend(value)
return
if isinstance(value, torch.Tensor):
if value.numel() != 1:
raise ValueError("Only scalar tensors can be converted to float")
value = value.detach().item()
if not isinstance(value, NumericType):
raise ValueError(f"Unsupported value type: {type(value)}")
self.values.append(value)
def extend(self, values: Union["Metric", list[Numeric]]) -> None:
if isinstance(values, Metric):
if values.aggregation != self.aggregation:
raise ValueError(f"Aggregation type mismatch: {self.aggregation} != {values.aggregation}")
values = values.values
for value in values:
self.append(value)
def aggregate(self) -> float:
match self.aggregation:
case AggregationType.MEAN:
return np.mean(self.values)
case AggregationType.SUM:
return np.sum(self.values)
case AggregationType.MIN:
return np.min(self.values)
case AggregationType.MAX:
return np.max(self.values)
@classmethod
def chain(cls, metric_lists: list["Metric"]) -> "Metric":
if len(metric_lists) == 0:
return cls(aggregation=AggregationType.MEAN)
aggregation = metric_lists[0].aggregation
chained = cls(aggregation=aggregation)
for ml in metric_lists:
chained.extend(ml)
return chained
@classmethod
def from_dict(cls, data: dict[str, Numeric], aggregation: str | AggregationType) -> dict[str, "Metric"]:
return {key: cls(value=value, aggregation=aggregation) for key, value in data.items()}
def init_list(self) -> "Metric":
return Metric(aggregation=self.aggregation)