From 5e13d77aeae8bbff4ede897c943f6d0251f161cf Mon Sep 17 00:00:00 2001 From: David Cauchi <13139524+davidcauchi@users.noreply.github.com> Date: Tue, 28 Jan 2025 13:41:13 +0100 Subject: [PATCH] [ship-3971] Poll for headers when using http rpc (#1537) * Poll headers when WS is not available. --- lib/.changeset/v1.50.21.md | 2 +- lib/blockchain/blockchain.go | 2 + .../chain_header_polling_manager.go | 250 ++++++++++++++++++ lib/blockchain/ethereum.go | 98 ++++++- 4 files changed, 344 insertions(+), 8 deletions(-) create mode 100644 lib/blockchain/chain_header_polling_manager.go diff --git a/lib/.changeset/v1.50.21.md b/lib/.changeset/v1.50.21.md index 8dbbcba70..e2b1e24cc 100644 --- a/lib/.changeset/v1.50.21.md +++ b/lib/.changeset/v1.50.21.md @@ -2,4 +2,4 @@ - Fix start up of Nethermind 1.30.1+ containers - Fix docker 8080 port mappings - Do not change container name, when restarting it -- Automatically forward `SETH_LOG_LEVEL` to k8s \ No newline at end of file +- Automatically forward `SETH_LOG_LEVEL` to k8s diff --git a/lib/blockchain/blockchain.go b/lib/blockchain/blockchain.go index 45b1af142..e75d2e833 100644 --- a/lib/blockchain/blockchain.go +++ b/lib/blockchain/blockchain.go @@ -118,6 +118,8 @@ type EVMClient interface { RawJsonRPCCall(ctx context.Context, result interface{}, method string, params ...interface{}) error GetEthClient() *ethclient.Client + + InitializeHeaderSubscription() error } // NodeHeader header with the ID of the node that received it diff --git a/lib/blockchain/chain_header_polling_manager.go b/lib/blockchain/chain_header_polling_manager.go new file mode 100644 index 000000000..213e5d3f7 --- /dev/null +++ b/lib/blockchain/chain_header_polling_manager.go @@ -0,0 +1,250 @@ +package blockchain + +import ( + "context" + "math" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" + "github.com/rs/zerolog" +) + +type ChainHeaderManager struct { + chainID int64 + pollInterval time.Duration + networkCfg EVMNetwork + logger zerolog.Logger + + ethClient *ethclient.Client + rpcClient *rpc.Client + + done chan struct{} + wg sync.WaitGroup + + headersChan chan *SafeEVMHeader + + mu sync.RWMutex + subscribers map[*EthereumClient]struct{} + + lastProcessed uint64 + + started bool +} + +var ( + chainManagerRegistry = struct { + sync.Mutex + managers map[int64]*ChainHeaderManager + }{ + managers: make(map[int64]*ChainHeaderManager), + } +) + +// getOrCreateChainManager returns an existing manager if found, otherwise creates one. +func getOrCreateChainManager( + chainID int64, + pollInterval time.Duration, + networkCfg EVMNetwork, + logger zerolog.Logger, + ethClient *ethclient.Client, + rpcClient *rpc.Client, +) *ChainHeaderManager { + chainManagerRegistry.Lock() + defer chainManagerRegistry.Unlock() + + if mgr, exists := chainManagerRegistry.managers[chainID]; exists { + return mgr + } + + mgr := newChainHeaderManager(chainID, pollInterval, networkCfg, logger, ethClient, rpcClient) + chainManagerRegistry.managers[chainID] = mgr + return mgr +} + +func removeChainManager(chainID int64) { + chainManagerRegistry.Lock() + defer chainManagerRegistry.Unlock() + delete(chainManagerRegistry.managers, chainID) +} + +// newChainHeaderManager creates the manager but does not start polling automatically +func newChainHeaderManager( + chainID int64, + pollInterval time.Duration, + networkCfg EVMNetwork, + logger zerolog.Logger, + ethClient *ethclient.Client, + rpcClient *rpc.Client, +) *ChainHeaderManager { + return &ChainHeaderManager{ + chainID: chainID, + pollInterval: pollInterval, + networkCfg: networkCfg, + logger: logger, + ethClient: ethClient, + rpcClient: rpcClient, + subscribers: make(map[*EthereumClient]struct{}), + headersChan: make(chan *SafeEVMHeader, 1000), // Buffer to handle rapid blocks + done: make(chan struct{}), + } +} + +// startPolling initiates the two background goroutines (poll + fan-out). +func (m *ChainHeaderManager) startPolling() { + if m.started { + return + } + m.started = true + + // Attempt an initial fetch of the latest block, so we know where to begin + initCtx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration) + defer cancel() + latestHeader, err := m.ethClient.HeaderByNumber(initCtx, nil) + if err != nil { + m.logger.Error(). + Int64("ChainID", m.chainID). + Err(err). + Msg("Failed initial fetch of the latest header, manager won't start polling") + return + } + safeLatest := convertToSafeEVMHeader(latestHeader) + m.lastProcessed = safeLatest.Number.Uint64() - 1 + + m.logger.Info(). + Int64("ChainID", m.chainID). + Uint64("InitialBlock", m.lastProcessed). + Msg("ChainHeaderManager starting polling") + + m.wg.Add(2) + go m.pollRoutine() + go m.fanOutRoutine() +} + +// pollRoutine fetches new headers at a fixed interval and sends them down m.headersChan +func (m *ChainHeaderManager) pollRoutine() { + defer m.wg.Done() + + ticker := time.NewTicker(m.pollInterval) + defer ticker.Stop() + + for { + select { + case <-m.done: + m.logger.Debug(). + Int64("ChainID", m.chainID). + Msg("pollRoutine: shutting down") + return + case <-ticker.C: + if err := m.fetchAndQueueNewHeaders(); err != nil { + m.logger.Error(). + Int64("ChainID", m.chainID). + Err(err). + Msg("pollRoutine: error fetching new headers") + } + } + } +} + +// fanOutRoutine receives newly fetched headers from m.headersChan and distributes them +func (m *ChainHeaderManager) fanOutRoutine() { + defer m.wg.Done() + + for { + select { + case <-m.done: + m.logger.Debug(). + Int64("ChainID", m.chainID). + Msg("fanOutRoutine: shutting down") + return + case hdr := <-m.headersChan: + m.mu.RLock() + for sub := range m.subscribers { + err := sub.receiveHeader(hdr) + if err != nil { + m.logger.Err(err).Msg("Finalizer received error during HTTP polling") + } + } + m.mu.RUnlock() + } + } +} + +// fetchAndQueueNewHeaders fetches the latest header and then loops over any missing blocks +func (m *ChainHeaderManager) fetchAndQueueNewHeaders() error { + ctx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration) + defer cancel() + + latest, err := m.ethClient.HeaderByNumber(ctx, nil) + if err != nil { + return err + } + latestNum := latest.Number.Uint64() + + // We already processed up to X, we process X+1..latest + for blockNum := m.lastProcessed + 1; blockNum <= latestNum; blockNum++ { + if blockNum > math.MaxInt64 { + m.logger.Error().Int64("ChainID", m.chainID). + Uint64("BlockNumber", blockNum). + Msg("blockNum exceeds int64 max, skipping") + continue + } + blockCtx, blockCancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration) + blockHdr, err := m.ethClient.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum))) + blockCancel() + if err != nil { + m.logger.Error(). + Int64("ChainID", m.chainID). + Err(err). + Uint64("BlockNumber", blockNum). + Msg("Could not fetch block header in range") + continue + } + safeHdr := convertToSafeEVMHeader(blockHdr) + m.headersChan <- safeHdr + m.lastProcessed = blockNum + } + return nil +} + +// subscribe attaches an EthereumClient to our manager +func (m *ChainHeaderManager) subscribe(client *EthereumClient) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscribers[client] = struct{}{} +} + +// unsubscribe removes a subscriber from the manager +func (m *ChainHeaderManager) unsubscribe(client *EthereumClient) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.subscribers, client) +} + +// shutdown stops the goroutines and closes channels. +func (m *ChainHeaderManager) shutdown() { + close(m.done) + m.wg.Wait() + close(m.headersChan) +} + +func convertToSafeEVMHeader(hdr *types.Header) *SafeEVMHeader { + if hdr == nil { + return nil + } + var safeTime int64 + if hdr.Time > math.MaxInt64 { + safeTime = math.MaxInt64 + } else { + safeTime = int64(hdr.Time) + } + return &SafeEVMHeader{ + Hash: hdr.Hash(), + Number: hdr.Number, + BaseFee: hdr.BaseFee, + Timestamp: time.Unix(safeTime, 0), + } +} diff --git a/lib/blockchain/ethereum.go b/lib/blockchain/ethereum.go index b9b545ba3..627b064a4 100644 --- a/lib/blockchain/ethereum.go +++ b/lib/blockchain/ethereum.go @@ -105,12 +105,10 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient, return nil, err } ec.gasStats = NewGasStats(ec.ID) - // Check if subscriptions are supported since HTTP does not support subscriptions. - if ec.Client.Client().SupportsSubscriptions() { - err = ec.subscribeToNewHeaders() - if err != nil { - return nil, err - } + + // Initialize header subscription or polling + if err := ec.InitializeHeaderSubscription(); err != nil { + return nil, err } // Check if the chain supports EIP-1559 // https://eips.ethereum.org/EIPS/eip-1559 @@ -701,6 +699,48 @@ func (e *EthereumClient) WaitForFinalizedTx(txHash common.Hash) (*big.Int, time. key := "txFinalizer-" + txHash.String() e.AddHeaderEventSubscription(key, finalizer) defer e.DeleteHeaderEventSubscription(key) + + if !e.Client.Client().SupportsSubscriptions() { + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-finalizer.context.Done(): + return + case <-ticker.C: + latestHeader, err := e.GetLatestFinalizedBlockHeader(context.Background()) + if err != nil { + e.l.Err(err).Msg("Error fetching latest finalized header via HTTP polling") + } + if latestHeader == nil { + e.l.Error().Msg("Latest finalized header is nil") + continue + } + if latestHeader.Time > math.MaxInt64 { + e.l.Error().Msg("Latest finalized header time is too large") + continue + } + + nodeHeader := NodeHeader{ + // NodeID: 0, // Assign appropriate NodeID if needed + SafeEVMHeader: SafeEVMHeader{ + Hash: latestHeader.Hash(), + Number: latestHeader.Number, + Timestamp: time.Unix(int64(latestHeader.Time), 0), + BaseFee: latestHeader.BaseFee, + }, + } + + err = finalizer.ReceiveHeader(nodeHeader) + if err != nil { + e.l.Err(err).Msg("Finalizer received error during HTTP polling") + } + } + } + }() + } err = finalizer.Wait() if err != nil { return nil, time.Time{}, fmt.Errorf("error waiting for finalization: %w in network %s tx %s", err, e.GetNetworkName(), txHash.Hex()) @@ -885,11 +925,24 @@ func (e *EthereumClient) ParallelTransactions(enabled bool) { e.queueTransactions = enabled } -// Close tears down the current open Ethereum client +// Close tears down the current open Ethereum client and unsubscribes from the manager, if any func (e *EthereumClient) Close() error { // close(e.NonceSettings.doneChan) close(e.doneChan) e.subscriptionWg.Wait() + + chainManagerRegistry.Lock() + defer chainManagerRegistry.Unlock() + + mgr, exists := chainManagerRegistry.managers[e.GetChainID().Int64()] + if exists { + mgr.unsubscribe(e) + // If no more subscribers remain, we can shut down the manager + if len(mgr.subscribers) == 0 { + mgr.shutdown() + removeChainManager(e.GetChainID().Int64()) + } + } return nil } @@ -1220,6 +1273,32 @@ func (e *EthereumClient) AvgBlockTime(ctx context.Context) (time.Duration, error return averageBlockTime, nil } +// InitializeHeaderSubscription initializes either subscription-based or polling-based header processing +func (e *EthereumClient) InitializeHeaderSubscription() error { + if e.Client.Client().SupportsSubscriptions() { + return e.subscribeToNewHeaders() + } + // Fallback to polling if subscriptions are not supported + e.l.Info().Str("Network", e.NetworkConfig.Name).Msg("Subscriptions not supported. Using polling for new headers.") + + // Acquire (or create) a manager for this chain + mgr := getOrCreateChainManager( + e.GetChainID().Int64(), + 15*time.Second, // or e.NetworkConfig.PollInterval, etc. + e.NetworkConfig, + e.l, + e.Client, + e.rawRPC, + ) + + // Subscribe + mgr.subscribe(e) + // Start polling if not started + mgr.startPolling() + + return nil +} + // EthereumMultinodeClient wraps the client and the BlockChain network to interact with an EVM based Blockchain with multiple nodes type EthereumMultinodeClient struct { DefaultClient EVMClient @@ -1755,3 +1834,8 @@ func (e *EthereumMultinodeClient) WaitForEvents() error { func (e *EthereumMultinodeClient) ErrorReason(b ethereum.ContractCaller, tx *types.Transaction, receipt *types.Receipt) (string, error) { return e.DefaultClient.ErrorReason(b, tx, receipt) } + +// InitializeHeaderSubscription initializes either subscription-based or polling-based header processing +func (e *EthereumMultinodeClient) InitializeHeaderSubscription() error { + return e.DefaultClient.InitializeHeaderSubscription() +}