Skip to content

Commit 9a52da5

Browse files
committed
add sample log for toy_rl/metrics; support sample log for log_stream
1 parent 28547cc commit 9a52da5

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

apps/toy_rl/toy_metrics/main.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,19 @@ async def generate_step(self, step: int, substep: int):
7272
record_metric("policy/count_sequences_completed", 1, Reduce.SUM)
7373
record_metric("policy/avg_tokens_per_sample", value, Reduce.MEAN)
7474

75+
# Sample-level log (e.g. rollout info)
76+
record_metric(
77+
"rollout/samples",
78+
{
79+
"rank": rank,
80+
"step": step,
81+
"substep": substep,
82+
"tokens_generated": value,
83+
"max_tokens": 50,
84+
"timestamp": time.time(),
85+
},
86+
Reduce.SAMPLE,
87+
)
7588
print(f"🎯 Gen rank {rank}: Step {step}.{substep}, tokens={value}")
7689

7790
return value

src/forge/observability/metrics.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# This source code is licensed under the BSD-style license found in the
55
# LICENSE file in the root directory of this source tree.
66

7+
import asyncio
78
import heapq
89
import itertools
910
import logging
@@ -664,7 +665,12 @@ def push(self, metric: Metric) -> None:
664665

665666
# For PER_RANK_NO_REDUCE backends: stream immediately
666667
for backend in self.per_rank_no_reduce_backends:
667-
backend.log_stream(metric=metric, step=self.step)
668+
if metric.reduction == Reduce.SAMPLE:
669+
# Wrap singleton Metric into expected {key: [list_of_dicts]} format
670+
sample = {metric.key: [metric.value]}
671+
asyncio.create_task(backend.log_samples(sample, self.step))
672+
else:
673+
backend.log_stream(metric=metric, step=self.step)
668674

669675
# Always accumulate for reduction and state return
670676
key = metric.key

0 commit comments

Comments
 (0)