forked from streamingfast/substreams-sink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstats.go
93 lines (72 loc) · 2.48 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package sink
import (
	"sync"
	"time"

	"github.com/streamingfast/bstream"
	"github.com/streamingfast/dmetrics"
	"github.com/streamingfast/shutter"
	"go.uber.org/zap"
)
// Stats gathers rate figures for a stream and logs them, either on demand
// through LogNow or periodically once Start has been called. The embedded
// shutter controls the lifetime of the periodic logging goroutine.
type Stats struct {
	*shutter.Shutter

	dataMsgRate       *dmetrics.AvgRatePromCounter
	progressBlockRate *dmetrics.AvgRatePromGauge
	undoMsgRate       *dmetrics.AvgRatePromCounter

	// lastBlockLock guards lastBlock: RecordBlock writes it from the caller's
	// goroutine while LogNow reads it from the goroutine spawned by Start.
	lastBlockLock sync.RWMutex
	lastBlock     bstream.BlockRef

	logger *zap.Logger
}

// newStats returns a Stats that logs through logger. The three rates are
// derived from the package-level metrics DataMessageCount,
// ProgressMessageTotalProcessedBlocks and UndoMessageCount, and the last
// block starts out as the unsetBlockRef placeholder.
//
// NOTE(review): the 1s/30s durations are presumably the sampling window and
// the averaging interval respectively — confirm against dmetrics.
func newStats(logger *zap.Logger) *Stats {
	return &Stats{
		Shutter: shutter.New(),

		dataMsgRate:       dmetrics.MustNewAvgRateFromPromCounter(DataMessageCount, 1*time.Second, 30*time.Second, "msg"),
		progressBlockRate: dmetrics.MustNewAvgRateFromPromGauge(ProgressMessageTotalProcessedBlocks, 1*time.Second, 30*time.Second, "block"),
		undoMsgRate:       dmetrics.MustNewAvgRateFromPromCounter(UndoMessageCount, 1*time.Second, 30*time.Second, "msg"),
		lastBlock:         unsetBlockRef{},

		logger: logger,
	}
}

// RecordBlock stores block as the most recent block seen; LogNow reports it
// under the "last_block" field. It may be called concurrently with the
// periodic logging started by Start.
func (s *Stats) RecordBlock(block bstream.BlockRef) {
	s.lastBlockLock.Lock()
	s.lastBlock = block
	s.lastBlockLock.Unlock()
}

// Start launches a goroutine that calls LogNow every each until the shutter
// starts terminating.
//
// It panics if the shutter is already terminating or terminated. each must be
// positive: time.NewTicker panics on a non-positive duration, and here that
// panic would be raised inside the spawned goroutine.
func (s *Stats) Start(each time.Duration) {
	if s.IsTerminating() || s.IsTerminated() {
		panic("already shutdown, refusing to start again")
	}

	go func() {
		ticker := time.NewTicker(each)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				s.LogNow()
			case <-s.Terminating():
				return
			}
		}
	}()
}

// LogNow emits a single "substreams stream stats" info log line with the
// current rates, the progress metric values and the last recorded block.
func (s *Stats) LogNow() {
	// Snapshot the last block under the lock so the logger call itself runs
	// without holding it.
	s.lastBlockLock.RLock()
	lastBlock := s.lastBlock
	s.lastBlockLock.RUnlock()

	// Logging fields order is important as it affects the final rendering, we carefully ordered
	// them so the development logs looks nicer.
	s.logger.Info("substreams stream stats",
		zap.Stringer("data_msg_rate", s.dataMsgRate),
		zap.Any("progress_block_rate", s.progressBlockRate),
		zap.Stringer("undo_msg_rate", s.undoMsgRate),

		zap.Any("progress_last_block", dmetrics.NewValuesFromMetric(ProgressMessageLastBlock).Uints("stage")),
		zap.Any("progress_running_jobs", dmetrics.NewValuesFromMetric(ProgressMessageRunningJobs).Uints("stage")),
		zap.Uint64("progress_total_processed_blocks", dmetrics.NewValueFromMetric(ProgressMessageTotalProcessedBlocks, "blocks").ValueUint()),
		zap.Any("progress_last_contiguous_block", dmetrics.NewValuesFromMetric(ProgressMessageLastContiguousBlock).Uints("stage")),
		zap.Stringer("last_block", lastBlock),
	)
}
// Close syncs the data and undo message rates, logs one final stats line,
// shuts the shutter down with a nil error and then stops both rates. The
// order matters: the sync happens before the final log, and the rates are
// only stopped once shutdown has been requested.
//
// NOTE(review): progressBlockRate is neither synced nor stopped here —
// confirm whether dmetrics.AvgRatePromGauge needs the same treatment.
func (s *Stats) Close() {
	dataRate, undoRate := s.dataMsgRate, s.undoMsgRate

	dataRate.SyncNow()
	undoRate.SyncNow()
	s.LogNow()

	s.Shutdown(nil)

	dataRate.Stop()
	undoRate.Stop()
}
// unsetBlockRef is the placeholder block reference held by a Stats until a
// real block is recorded. It carries no data.
type unsetBlockRef struct{}

// ID returns the empty string since there is no block to identify.
func (unsetBlockRef) ID() string {
	return ""
}

// Num returns 0 since there is no block number to report.
func (unsetBlockRef) Num() uint64 {
	return 0
}

// String renders the placeholder as "None" in logs.
func (unsetBlockRef) String() string {
	return "None"
}