Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 111 additions & 33 deletions bchain/coins/eth/ethrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type EthereumRPC struct {
alternativeFeeProvider alternativeFeeProviderInterface
alternativeSendTxProvider *AlternativeSendTxProvider
InternalDataProvider bchain.EthereumInternalDataProvider
consensusMonitor *consensusVersionMonitor
}

// NewEthereumRPC returns new EthRPC instance.
Expand Down Expand Up @@ -400,11 +401,119 @@ func (b *EthereumRPC) Initialize() error {

b.InitAlternativeProviders()

b.consensusMonitor = newConsensusVersionMonitor(b.ChainConfig.ConsensusNodeVersionURL)
b.consensusMonitor.start()

glog.Info("rpc: block chain ", b.Network)

return nil
}

const (
consensusVersionUnreachable = "unreachable-locally"
consensusVersionPollPeriod = 60 * time.Second
)

// consensusVersionMonitor probes the configured consensus node /eth/v1/node/version
// endpoint and caches the latest result. The cached value (real version or
// "unreachable-locally") is the signal exposed via getInfo and the Prometheus
// backend_subversion label; periodic re-probes are silent so a node being
// down does not spam the log.
type consensusVersionMonitor struct {
url string
mu sync.RWMutex
version string
stop chan struct{}
stopOnce sync.Once
}

func newConsensusVersionMonitor(url string) *consensusVersionMonitor {
if url == "" {
return nil
}
return &consensusVersionMonitor{url: url, stop: make(chan struct{})}
}

// start performs an initial synchronous probe (logging one WARN if it fails)
// and then launches a background goroutine that re-probes every
// consensusVersionPollPeriod. Safe to call on a nil receiver.
func (m *consensusVersionMonitor) start() {
if m == nil {
return
}
v, err := m.fetch()
if err != nil {
glog.Warningf("consensus node version probe failed for %s: %v", m.url, err)
v = consensusVersionUnreachable
}
m.set(v)
go m.run()
}

func (m *consensusVersionMonitor) run() {
ticker := time.NewTicker(consensusVersionPollPeriod)
defer ticker.Stop()
for {
select {
case <-m.stop:
return
case <-ticker.C:
v, err := m.fetch()
if err != nil {
v = consensusVersionUnreachable
}
m.set(v)
}
}
}

func (m *consensusVersionMonitor) fetch() (string, error) {
httpClient := &http.Client{Timeout: 2 * time.Second}
resp, err := httpClient.Get(m.url)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("status %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
var v struct {
Data struct {
Version string `json:"version"`
} `json:"data"`
}
if err := json.Unmarshal(body, &v); err != nil {
return "", err
}
return v.Data.Version, nil
}

func (m *consensusVersionMonitor) set(v string) {
m.mu.Lock()
m.version = v
m.mu.Unlock()
}

func (m *consensusVersionMonitor) get() string {
if m == nil {
return ""
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.version
}

func (m *consensusVersionMonitor) shutdown() {
if m == nil {
return
}
m.stopOnce.Do(func() { close(m.stop) })
}

// InitAlternativeProviders initializes alternative providers
func (b *EthereumRPC) InitAlternativeProviders() {
b.initAlternativeFeeProvider()
Expand Down Expand Up @@ -626,6 +735,7 @@ func (b *EthereumRPC) Shutdown(ctx context.Context) error {
b.closeRPC()
b.NewBlock.Close()
b.NewTx.Close()
b.consensusMonitor.shutdown()
glog.Info("rpc: shutdown")
return nil
}
Expand All @@ -640,37 +750,6 @@ func (b *EthereumRPC) GetSubversion() string {
return ""
}

func (b *EthereumRPC) getConsensusVersion() string {
if b.ChainConfig.ConsensusNodeVersionURL == "" {
return ""
}
httpClient := &http.Client{
Timeout: 2 * time.Second,
}
resp, err := httpClient.Get(b.ChainConfig.ConsensusNodeVersionURL)
if err != nil || resp.StatusCode != http.StatusOK {
glog.Error("getConsensusVersion ", err)
return ""
}
body, err := io.ReadAll(resp.Body)
if err != nil {
glog.Error("getConsensusVersion ", err)
return ""
}
type consensusVersion struct {
Data struct {
Version string `json:"version"`
} `json:"data"`
}
var v consensusVersion
err = json.Unmarshal(body, &v)
if err != nil {
glog.Error("getConsensusVersion ", err)
return ""
}
return v.Data.Version
}

// GetChainInfo returns information about the connected backend
func (b *EthereumRPC) GetChainInfo() (*bchain.ChainInfo, error) {
h, err := b.getBestHeader()
Expand All @@ -687,13 +766,12 @@ func (b *EthereumRPC) GetChainInfo() (*bchain.ChainInfo, error) {
if err := b.RPC.CallContext(ctx, &ver, "web3_clientVersion"); err != nil {
return nil, err
}
consensusVersion := b.getConsensusVersion()
rv := &bchain.ChainInfo{
Blocks: int(h.Number().Int64()),
Bestblockhash: h.Hash(),
Difficulty: h.Difficulty().String(),
Version: ver,
ConsensusVersion: consensusVersion,
ConsensusVersion: b.consensusMonitor.get(),
}
idi := int(id.Uint64())
if idi == int(b.MainNetChainID) {
Expand Down
Loading