Skip to content
This repository was archived by the owner on May 31, 2023. It is now read-only.

Commit e208699

Browse files
author
Richard Patel
committed
handler: add PriceEventHandler
1 parent 41feefa commit e208699

File tree

6 files changed

+283
-6
lines changed

6 files changed

+283
-6
lines changed

accounts.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
bin "github.com/gagliardetto/binary"
2222
"github.com/gagliardetto/solana-go"
23+
"github.com/shopspring/decimal"
2324
)
2425

2526
// Magic is the 32-bit number prefixed on each account.
@@ -137,7 +138,7 @@ type Ema struct {
137138
Denom int64
138139
}
139140

140-
// PriceInfo contains a price adn confidence at a specific slot.
141+
// PriceInfo contains a price and confidence at a specific slot.
141142
//
142143
// This struct can represent either a publisher's contribution or the outcome of price aggregation.
143144
type PriceInfo struct {
@@ -148,6 +149,25 @@ type PriceInfo struct {
148149
PubSlot uint64 // valid publishing slot
149150
}
150151

152+
func (p *PriceInfo) IsZero() bool {
153+
return p == nil || *p == PriceInfo{}
154+
}
155+
156+
// Value returns the parsed price and conf values.
157+
//
158+
// If ok is false, the value is invalid.
159+
func (p *PriceInfo) Value(exponent int32) (price decimal.Decimal, conf decimal.Decimal, ok bool) {
160+
price = decimal.New(p.Price, exponent)
161+
conf = decimal.New(int64(p.Conf), exponent)
162+
ok = p.Status == PriceStatusTrading
163+
return
164+
}
165+
166+
// HasChanged returns whether there was a change between this and another price info.
167+
func (p *PriceInfo) HasChanged(other *PriceInfo) bool {
168+
return (p == nil) != (other == nil) || p.Status != other.Status || p.PubSlot != other.PubSlot
169+
}
170+
151171
// Price status.
152172
const (
153173
PriceStatusUnknown = uint32(iota)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/gagliardetto/binary v0.6.1
88
github.com/gagliardetto/solana-go v1.3.1-0.20220222155336-dd0af958252d
99
github.com/prometheus/client_golang v1.12.1
10+
github.com/shopspring/decimal v1.3.1
1011
github.com/stretchr/testify v1.7.0
1112
go.uber.org/zap v1.21.0
1213
)

handler.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2022 Blockdaemon Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pyth
16+
17+
import (
18+
"sync"
19+
20+
"github.com/gagliardetto/solana-go"
21+
"github.com/shopspring/decimal"
22+
)
23+
24+
// PriceEventHandler provides a callback-style interface to Pyth updates.
25+
type PriceEventHandler struct {
26+
stream *PriceAccountStream
27+
28+
callbacksLock sync.Mutex // lock over the callbacks map
29+
regNonce uint64
30+
callbacks map[solana.PublicKey]priceCallbacks
31+
}
32+
33+
// NewPriceEventHandler creates a new event handler over the stream.
34+
//
35+
// A stream must not be re-used between event handlers.
36+
func NewPriceEventHandler(stream *PriceAccountStream) *PriceEventHandler {
37+
handler := &PriceEventHandler{
38+
stream: stream,
39+
callbacks: make(map[solana.PublicKey]priceCallbacks),
40+
}
41+
go handler.consume(stream.Updates())
42+
return handler
43+
}
44+
45+
// Err returns the reason why the underlying price account stream is closed.
46+
//
47+
// Will block until the stream has actually closed.
48+
// Returns nil if closure was expected.
49+
//
50+
// After this function returns the event handler will not send any more callbacks.
51+
// You could use this function as a barrier for any cleanup tasks relating to callbacks.
52+
func (p *PriceEventHandler) Err() error {
53+
return p.stream.Err()
54+
}
55+
56+
// OnPriceChange registers a callback function to be called
57+
// whenever the aggregate price of the provided price account changes.
58+
func (p *PriceEventHandler) OnPriceChange(priceKey solana.PublicKey, callback func(PriceUpdate)) CallbackHandle {
59+
p.callbacksLock.Lock()
60+
defer p.callbacksLock.Unlock()
61+
return p.getPriceCallbacks(priceKey).onPrice.register(p, callback)
62+
}
63+
64+
// OnComponentChange registers a callback function to be called
65+
// whenever the price component of the given (price account, publisher account) pair changes.
66+
func (p *PriceEventHandler) OnComponentChange(priceKey solana.PublicKey, publisher solana.PublicKey, callback func(PriceUpdate)) CallbackHandle {
67+
p.callbacksLock.Lock()
68+
defer p.callbacksLock.Unlock()
69+
return p.getComponentCallbacks(priceKey, publisher).register(p, callback)
70+
}
71+
72+
func (p *PriceEventHandler) getPriceCallbacks(priceKey solana.PublicKey) priceCallbacks {
73+
// requires lock
74+
res, ok := p.callbacks[priceKey]
75+
if !ok {
76+
res.init()
77+
p.callbacks[priceKey] = res
78+
}
79+
return res
80+
}
81+
82+
func (p *PriceEventHandler) getComponentCallbacks(priceKey solana.PublicKey, publisherKey solana.PublicKey) callbackMap {
83+
// requires lock
84+
price := p.getPriceCallbacks(priceKey)
85+
res, ok := price.componentCallbacks[publisherKey]
86+
if !ok {
87+
res = make(callbackMap)
88+
price.componentCallbacks[publisherKey] = res
89+
}
90+
return res
91+
}
92+
93+
func (p *PriceEventHandler) consume(updates <-chan PriceAccountUpdate) {
94+
for update := range updates {
95+
p.processUpdate(update.Pubkey, update.Price)
96+
}
97+
}
98+
99+
func (p *PriceEventHandler) processUpdate(priceKey solana.PublicKey, acc *PriceAccount) {
100+
p.callbacksLock.Lock()
101+
defer p.callbacksLock.Unlock()
102+
103+
callbacks := p.callbacks[priceKey]
104+
for _, onPrice := range callbacks.onPrice {
105+
onPrice.inform(acc, &acc.Agg)
106+
}
107+
for _, comp := range acc.Components {
108+
if comp.Publisher.IsZero() {
109+
continue
110+
}
111+
compCbs := callbacks.componentCallbacks[comp.Publisher]
112+
for _, onPrice := range compCbs {
113+
onPrice.inform(acc, &comp.Latest)
114+
}
115+
}
116+
}
117+
118+
type priceCallbacks struct {
119+
onPrice callbackMap
120+
componentCallbacks map[solana.PublicKey]callbackMap
121+
}
122+
123+
func (p *priceCallbacks) init() {
124+
p.onPrice = make(callbackMap)
125+
p.componentCallbacks = make(map[solana.PublicKey]callbackMap)
126+
}
127+
128+
type callbackMap map[uint64]*callbackRegistration
129+
130+
func (container callbackMap) register(p *PriceEventHandler, callback func(PriceUpdate)) CallbackHandle {
131+
// requires lock
132+
p.regNonce += 1
133+
key := p.regNonce
134+
135+
handle := CallbackHandle{
136+
handler: p,
137+
container: container,
138+
key: key,
139+
}
140+
container[key] = &callbackRegistration{
141+
handle: handle,
142+
callback: callback,
143+
}
144+
return handle
145+
}
146+
147+
type callbackRegistration struct {
148+
previousInfo *PriceInfo
149+
callback func(PriceUpdate)
150+
handle CallbackHandle
151+
}
152+
153+
func (r *callbackRegistration) inform(acc *PriceAccount, newInfo *PriceInfo) {
154+
if r.previousInfo.HasChanged(newInfo) {
155+
r.callback(PriceUpdate{
156+
Account: acc,
157+
PreviousInfo: r.previousInfo,
158+
CurrentInfo: newInfo,
159+
})
160+
}
161+
r.previousInfo = newInfo
162+
}
163+
164+
// PriceUpdate is returned to callbacks when an aggregate or component price has been updated.
165+
type PriceUpdate struct {
166+
Account *PriceAccount
167+
PreviousInfo *PriceInfo
168+
CurrentInfo *PriceInfo
169+
}
170+
171+
// Previous returns the value of the previously seen price update.
172+
//
173+
// If ok is false, the value is invalid.
174+
func (p PriceUpdate) Previous() (price decimal.Decimal, conf decimal.Decimal, ok bool) {
175+
if !p.PreviousInfo.IsZero() && p.Account != nil {
176+
p.PreviousInfo.Value(p.Account.Exponent)
177+
}
178+
return
179+
}
180+
181+
// Current returns the value of the last price update.
182+
//
183+
// If ok is false, the value is invalid.
184+
func (p PriceUpdate) Current() (price decimal.Decimal, conf decimal.Decimal, ok bool) {
185+
if !p.CurrentInfo.IsZero() && p.Account != nil {
186+
return p.CurrentInfo.Value(p.Account.Exponent)
187+
}
188+
return
189+
}
190+
191+
// CallbackHandle tracks the lifetime of a callback registration.
192+
type CallbackHandle struct {
193+
handler *PriceEventHandler
194+
container callbackMap
195+
key uint64
196+
}
197+
198+
// Unsubscribe de-registers a callback from the handler.
199+
//
200+
// Calling Unsubscribe is optional.
201+
// The handler calls it automatically when the underlying stream closes.
202+
func (c CallbackHandle) Unsubscribe() {
203+
lock := &c.handler.callbacksLock
204+
lock.Lock()
205+
defer lock.Unlock()
206+
207+
delete(c.container, c.key)
208+
}

handler_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2022 Blockdaemon Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pyth
16+
17+
import (
18+
"log"
19+
"time"
20+
21+
"github.com/gagliardetto/solana-go"
22+
)
23+
24+
func ExamplePriceEventHandler() {
25+
// Connect to Pyth on Solana devnet.
26+
client := NewClient(Devnet, testRPC, testWS)
27+
28+
// Open new event stream.
29+
stream := client.StreamPriceAccounts()
30+
handler := NewPriceEventHandler(stream)
31+
32+
// Subscribe to price account changes.
33+
priceKey := solana.MustPublicKeyFromBase58("J83w4HKfqxwcq3BEMMkPFSppX3gqekLyLJBexebFVkix")
34+
handler.OnPriceChange(priceKey, func(info PriceUpdate) {
35+
price, conf, ok := info.Current()
36+
if ok {
37+
log.Printf("Price change: $%s ± $%s", price, conf)
38+
}
39+
})
40+
41+
// Close stream after a while.
42+
<-time.After(10 * time.Second)
43+
stream.Close()
44+
}

stream.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
)
2929

3030
// StreamPriceAccounts creates a new stream of price account updates.
31+
//
32+
// It will reconnect automatically if the WebSocket connection breaks or stalls.
3133
func (c *Client) StreamPriceAccounts() *PriceAccountStream {
3234
ctx, cancel := context.WithCancel(context.Background())
3335
stream := &PriceAccountStream{
@@ -42,8 +44,9 @@ func (c *Client) StreamPriceAccounts() *PriceAccountStream {
4244

4345
// PriceAccountUpdate is a real-time update carrying a price account change.
4446
type PriceAccountUpdate struct {
45-
Slot uint64
46-
*PriceAccount
47+
Slot uint64
48+
Pubkey solana.PublicKey
49+
Price *PriceAccount
4750
}
4851

4952
// PriceAccountStream is an ongoing stream of on-chain price account updates.
@@ -175,8 +178,9 @@ func (p *PriceAccountStream) readNextUpdate(ctx context.Context, sub *ws.Program
175178

176179
// Send update to channel.
177180
msg := PriceAccountUpdate{
178-
Slot: update.Context.Slot,
179-
PriceAccount: priceAcc,
181+
Slot: update.Context.Slot,
182+
Pubkey: update.Value.Pubkey,
183+
Price: priceAcc,
180184
}
181185
select {
182186
case <-ctx.Done():

stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@ func ExampleClient_StreamPriceAccounts() {
2929
}()
3030
// Print updates.
3131
for update := range stream.Updates() {
32-
fmt.Println(update.Agg.Price)
32+
fmt.Println(update.Price.Agg.Price)
3333
}
3434
}

0 commit comments

Comments
 (0)