Skip to content

Commit 984ec85

Browse files
committed
Merge branch 'dev' into dev-merge
# Conflicts: # app/events-api/main.go # go.mod # go.sum # processor/processor.go
2 parents c54ef5a + ed642f7 commit 984ec85

7 files changed

Lines changed: 184 additions & 52 deletions

File tree

app/events-api/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"fmt"
55
"github.com/cockroachdb/pebble"
6+
"github.com/qubic/go-events/metrics"
67
"github.com/qubic/go-events/processor"
78
"github.com/qubic/go-events/pubsub"
89
"github.com/qubic/go-events/server"
@@ -35,6 +36,7 @@ func run() error {
3536
ShutdownTimeout time.Duration `conf:"default:5s"`
3637
HttpHost string `conf:"default:0.0.0.0:8000"`
3738
GrpcHost string `conf:"default:0.0.0.0:8001"`
39+
MetricsHost string `conf:"default:0.0.0.0:2112"`
3840
NodeSyncThreshold int `conf:"default:3"`
3941
}
4042
Pool struct {
@@ -149,6 +151,10 @@ func run() error {
149151

150152
proc := processor.NewProcessor(pConn, pubSubClient, cfg.PubSub.Enabled, eventsStore, cfg.Qubic.ProcessTickTimeout, passcodes)
151153

154+
log.Printf("Starting metrics service...")
155+
metricsService := metrics.NewMetricsService(cfg.Server.MetricsHost, eventsStore)
156+
metricsService.Start()
157+
152158
srv := server.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, eventsStore)
153159
err = srv.Start()
154160
if err != nil {

go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ require (
66
github.com/ardanlabs/conf v1.5.0
77
github.com/cockroachdb/pebble v1.1.2
88
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0
9+
github.com/jellydator/ttlcache/v3 v3.3.0
910
github.com/pkg/errors v0.9.1
10-
github.com/qubic/go-qubic v0.3.1
11+
github.com/prometheus/client_golang v1.12.0
12+
github.com/qubic/go-qubic v0.3.2
1113
github.com/redis/go-redis/v9 v9.7.0
14+
golang.org/x/sync v0.10.0
1215
google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf
1316
google.golang.org/grpc v1.65.0
1417
google.golang.org/protobuf v1.34.2
@@ -33,7 +36,6 @@ require (
3336
github.com/kr/pretty v0.3.1 // indirect
3437
github.com/kr/text v0.2.0 // indirect
3538
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
36-
github.com/prometheus/client_golang v1.12.0 // indirect
3739
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
3840
github.com/prometheus/common v0.32.1 // indirect
3941
github.com/prometheus/procfs v0.7.3 // indirect

go.sum

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLm
167167
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
168168
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
169169
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
170+
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
171+
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
170172
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
171173
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
172174
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -232,8 +234,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
232234
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
233235
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
234236
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
235-
github.com/qubic/go-qubic v0.3.1 h1:P6XZZA44lzmQtdaz+AK5UZGz6IBeqk8WplCKxUBN9hg=
236-
github.com/qubic/go-qubic v0.3.1/go.mod h1:OqqByAtABECupBpf9pmtG6N+uskGVJwZjhDgyPpHyRc=
237+
github.com/qubic/go-qubic v0.3.2 h1:Z/eYsZgZpYAsEt2zMwW+zqmoV3u7RTLTpLJhkB4048I=
238+
github.com/qubic/go-qubic v0.3.2/go.mod h1:OqqByAtABECupBpf9pmtG6N+uskGVJwZjhDgyPpHyRc=
237239
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
238240
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
239241
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -264,6 +266,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
264266
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
265267
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
266268
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
269+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
270+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
267271
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
268272
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
269273
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

metrics/service.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package metrics
2+
3+
import (
4+
"github.com/jellydator/ttlcache/v3"
5+
"github.com/pkg/errors"
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
"github.com/prometheus/client_golang/prometheus/promhttp"
9+
eventspb "github.com/qubic/go-events/proto"
10+
"log"
11+
"net/http"
12+
"time"
13+
)
14+
15+
const lptCacheKey = "LPT"
16+
const epochCacheKey = "EPOCH"
17+
18+
type Store interface {
19+
FetchLastProcessedTick() (*eventspb.ProcessedTick, error)
20+
}
21+
22+
type Service struct {
23+
address string
24+
cache *ttlcache.Cache[string, uint32]
25+
store Store
26+
27+
lastProcessedTickGauge prometheus.Gauge
28+
currentEpochGauge prometheus.Gauge
29+
}
30+
31+
func NewMetricsService(address string, store Store) *Service {
32+
return &Service{
33+
address: address,
34+
cache: ttlcache.New[string, uint32](ttlcache.WithTTL[string, uint32](5 * time.Second)),
35+
store: store,
36+
lastProcessedTickGauge: promauto.NewGauge(prometheus.GaugeOpts{
37+
Name: "qubic_events_last_processed_tick",
38+
Help: "The last tick processed by the events service.",
39+
}),
40+
currentEpochGauge: promauto.NewGauge(prometheus.GaugeOpts{
41+
Name: "qubic_events_current_epoch",
42+
Help: "The current epoch of the last processed tick.",
43+
}),
44+
}
45+
}
46+
47+
func (s *Service) Start() {
48+
49+
go s.cache.Start()
50+
51+
go func() {
52+
53+
serverMux := http.NewServeMux()
54+
serverMux.Handle("/metrics", s.metricsEndpointHandler())
55+
56+
var server = &http.Server{
57+
Addr: s.address,
58+
Handler: serverMux,
59+
ReadTimeout: 15 * time.Second,
60+
ReadHeaderTimeout: 15 * time.Second,
61+
WriteTimeout: 15 * time.Second,
62+
}
63+
64+
if err := server.ListenAndServe(); err != nil {
65+
panic(err)
66+
}
67+
}()
68+
}
69+
70+
func (s *Service) refreshCache() error {
71+
72+
lpt, err := s.store.FetchLastProcessedTick()
73+
if err != nil {
74+
return errors.Wrap(err, "fetching last processed tick")
75+
}
76+
77+
s.cache.Set(lptCacheKey, lpt.TickNumber, ttlcache.DefaultTTL)
78+
s.cache.Set(epochCacheKey, lpt.Epoch, ttlcache.DefaultTTL)
79+
80+
return nil
81+
}
82+
83+
func (s *Service) metricsEndpointHandler() http.Handler {
84+
85+
if !s.cache.Has(lptCacheKey) || !s.cache.Has(epochCacheKey) {
86+
err := s.refreshCache()
87+
if err != nil {
88+
log.Printf("Failed to refresh metrics cache: %s\n", err)
89+
}
90+
}
91+
92+
lastProcessedTick := s.cache.Get(lptCacheKey).Value()
93+
epoch := s.cache.Get(epochCacheKey).Value()
94+
95+
s.lastProcessedTickGauge.Set(float64(lastProcessedTick))
96+
s.currentEpochGauge.Set(float64(epoch))
97+
return promhttp.Handler()
98+
}

processor/processor.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -58,59 +58,68 @@ func (p *Processor) Start() error {
5858
}
5959

6060
func (p *Processor) processOneByOne() error {
61-
ctx, cancel := context.WithTimeout(context.Background(), p.processTickTimeout)
62-
defer cancel()
61+
f := func(rp connector.RequestPerformer) error {
62+
ctx, cancel := context.WithTimeout(context.Background(), p.processTickTimeout)
63+
defer cancel()
6364

64-
coreClient := core.NewClient(p.qubicConnector)
65-
tickInfo, err := coreClient.GetTickInfo(ctx)
66-
if err != nil {
67-
return errors.Wrap(err, "getting tick info")
68-
}
69-
70-
lastTick, err := p.getLastProcessedTick(ctx, tickInfo)
71-
if err != nil {
72-
return errors.Wrap(err, "getting last processed tick")
73-
}
65+
coreClient := core.NewClient(rp)
66+
tickInfo, err := coreClient.GetTickInfo(ctx)
67+
if err != nil {
68+
return errors.Wrap(err, "getting tick info")
69+
}
7470

75-
nextTick, err := p.getNextProcessingTick(ctx, lastTick, tickInfo)
76-
if err != nil {
77-
return errors.Wrap(err, "getting next processing tick")
78-
}
79-
log.Printf("Next tick to process: %d\n", nextTick.TickNumber)
71+
lastTick, err := p.getLastProcessedTick(ctx, tickInfo)
72+
if err != nil {
73+
return errors.Wrap(err, "getting last processed tick")
74+
}
8075

81-
if tickInfo.Tick < nextTick.TickNumber {
82-
err = newTickInTheFutureError(nextTick.TickNumber, tickInfo.Tick)
83-
return err
84-
}
76+
nextTick, err := p.getNextProcessingTick(ctx, lastTick, tickInfo)
77+
if err != nil {
78+
return errors.Wrap(err, "getting next processing tick")
79+
}
80+
log.Printf("Next tick to process: %d\n", nextTick.TickNumber)
8581

86-
eventsClient := events.NewClient(p.qubicConnector, p.passcodes)
87-
start := time.Now()
88-
tickEvents, err := eventsClient.GetTickEvents(context.Background(), nextTick.TickNumber)
89-
if err != nil {
90-
return errors.Wrap(err, "getting tick events")
91-
}
92-
end := time.Now()
82+
if tickInfo.Tick < nextTick.TickNumber {
83+
err = newTickInTheFutureError(nextTick.TickNumber, tickInfo.Tick)
84+
return err
85+
}
9386

94-
err = p.eventsStore.SetTickEvents(nextTick.TickNumber, tickEvents)
95-
if err != nil {
96-
return errors.Wrap(err, "setting tick events")
97-
}
87+
eventsClient := events.NewClient(rp, p.passcodes)
88+
start := time.Now()
89+
tickEvents, err := eventsClient.GetTickEvents(context.Background(), nextTick.TickNumber)
90+
if err != nil {
91+
return errors.Wrap(err, "getting tick events")
92+
}
93+
end := time.Now()
9894

99-
err = p.eventsStore.SetTickProcessTime(nextTick.TickNumber, uint64(end.Sub(start).Seconds()))
100-
if err != nil {
101-
return errors.Wrap(err, "setting tick process time")
102-
}
95+
err = p.eventsStore.SetTickEvents(nextTick.TickNumber, tickEvents)
96+
if err != nil {
97+
return errors.Wrap(err, "setting tick events")
98+
}
10399

104-
err = p.processStatus(ctx, lastTick, nextTick)
105-
if err != nil {
106-
return errors.Wrapf(err, "processing status for lastTick %+v and nextTick %+v", lastTick, nextTick)
107-
}
100+
err = p.eventsStore.SetTickProcessTime(nextTick.TickNumber, uint64(end.Sub(start).Seconds()))
101+
if err != nil {
102+
return errors.Wrap(err, "setting tick process time")
103+
}
108104

109-
if p.isPubSubEnabled {
110-
err = p.redisPubSubClient.PublishTickEvents(ctx, tickEvents)
105+
err = p.processStatus(ctx, lastTick, nextTick)
111106
if err != nil {
112-
return errors.Wrap(err, "publishing tick events")
107+
return errors.Wrapf(err, "processing status for lastTick %+v and nextTick %+v", lastTick, nextTick)
113108
}
109+
110+
if p.isPubSubEnabled {
111+
err = p.redisPubSubClient.PublishTickEvents(ctx, tickEvents)
112+
if err != nil {
113+
return errors.Wrap(err, "publishing tick events")
114+
}
115+
}
116+
117+
return nil
118+
}
119+
120+
err := p.qubicConnector.WithConnection(f)
121+
if err != nil {
122+
return errors.Wrap(err, "performing WithConnection logic")
114123
}
115124

116125
return nil
@@ -142,7 +151,7 @@ func (p *Processor) getNextProcessingTick(ctx context.Context, lastTick *eventsp
142151
}
143152

144153
func (p *Processor) getLastProcessedTick(ctx context.Context, currentTickInfo *qubicpb.TickInfo) (*eventspb.ProcessedTick, error) {
145-
lastTick, err := p.eventsStore.GetLastProcessedTick(ctx)
154+
lastTick, err := p.eventsStore.FetchLastProcessedTick()
146155
if err != nil {
147156
//handles first run of the events processor where there is nothing in storage
148157
// in this case last tick is 0 and epoch is current tick info epoch

server/events.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewEventsService(eventsStore *store.Store) *EventsService {
2727
}
2828

2929
func (s *EventsService) GetTickEvents(ctx context.Context, req *eventspb.GetTickEventsRequest) (*eventspb.TickEvents, error) {
30-
lastProcessedTick, err := s.eventsStore.GetLastProcessedTick(ctx)
30+
lastProcessedTick, err := s.eventsStore.FetchLastProcessedTick()
3131
if err != nil {
3232
return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err)
3333
}
@@ -72,7 +72,7 @@ func (s *EventsService) GetTickEvents(ctx context.Context, req *eventspb.GetTick
7272
}
7373

7474
func (s *EventsService) GetStatus(ctx context.Context, _ *emptypb.Empty) (*eventspb.GetStatusResponse, error) {
75-
tick, err := s.eventsStore.GetLastProcessedTick(ctx)
75+
tick, err := s.eventsStore.FetchLastProcessedTick()
7676
if err != nil {
7777
return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err)
7878
}
@@ -110,7 +110,7 @@ func (s *EventsService) GetStatus(ctx context.Context, _ *emptypb.Empty) (*event
110110
}
111111

112112
func (s *EventsService) GetTickProcessTime(ctx context.Context, req *eventspb.GetTickProcessTimeRequest) (*eventspb.GetTickProcessTimeResponse, error) {
113-
lastProcessedTick, err := s.eventsStore.GetLastProcessedTick(ctx)
113+
lastProcessedTick, err := s.eventsStore.FetchLastProcessedTick()
114114
if err != nil {
115115
return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err)
116116
}

store/store.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/pkg/errors"
88
eventspb "github.com/qubic/go-events/proto"
99
qubicpb "github.com/qubic/go-qubic/proto/v1"
10+
"golang.org/x/sync/singleflight"
1011
"google.golang.org/protobuf/proto"
1112
"strconv"
1213
)
@@ -15,6 +16,8 @@ const maxTickNumber = ^uint64(0)
1516

1617
var ErrNotFound = errors.New("store resource not found")
1718

19+
var group singleflight.Group
20+
1821
type Store struct {
1922
db *pebble.DB
2023
}
@@ -136,7 +139,17 @@ func (s *Store) SetLastProcessedTick(ctx context.Context, lastProcessedTick *eve
136139
return nil
137140
}
138141

139-
func (s *Store) GetLastProcessedTick(ctx context.Context) (*eventspb.ProcessedTick, error) {
142+
func (s *Store) FetchLastProcessedTick() (*eventspb.ProcessedTick, error) {
143+
144+
value, err, _ := group.Do("key-fetch-lpt", s.GetLastProcessedTick)
145+
if err != nil {
146+
return nil, errors.Wrap(err, "fetching last processed tick")
147+
}
148+
149+
return value.(*eventspb.ProcessedTick), err
150+
}
151+
152+
func (s *Store) GetLastProcessedTick() (interface{}, error) {
140153
key := lastProcessedTickKey()
141154
value, closer, err := s.db.Get(key)
142155
if err != nil {

0 commit comments

Comments
 (0)