diff --git a/go.mod b/go.mod index 1ae9b6401d26..a610d8369767 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/antithesishq/antithesis-sdk-go v0.3.8 github.com/ava-labs/avalanchego/graft/coreth v0.16.0-rc.0 github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 - github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cespare/xxhash/v2 v2.3.0 github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 @@ -86,6 +85,8 @@ require ( k8s.io/utils v0.0.0-20230726121419-3b25d923346b ) +require github.com/golang-jwt/jwt/v4 v4.5.2 // indirect + require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect diff --git a/go.sum b/go.sum index c0068716ca84..7a7f0682bce3 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,6 @@ github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 h1:hQ15IJxY7WO github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2/go.mod h1:DqSotSn4Dx/UJV+d3svfW8raR+cH7+Ohl9BpsQ5HlGU= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19 h1:S6oFasZsplNmw8B2S8cMJQMa62nT5ZKGzZRdCpd+5qQ= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d h1:7pjEE0BXLjzQlq5uKP5B2BTl9jTgDKaOfJx2Qfb01Jo= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d/go.mod h1:JTvIe8YbCjHpy8vy9uZBSpDXxawNXSnIe/Wlf3I09Tk= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -274,7 +272,6 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= -github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool.go b/graft/coreth/plugin/evm/atomic/txpool/mempool.go index a28cbdd6f8bd..ca00f07b0c03 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool.go @@ -9,10 +9,8 @@ import ( "github.com/ava-labs/libevm/log" "github.com/holiman/uint256" - "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p/gossip" ) @@ -39,30 +37,17 @@ var ( // Mempool is a simple mempool for atomic transactions type Mempool struct { *Txs - // bloom is a bloom filter containing the txs in the mempool - bloom *gossip.BloomFilter verify func(tx *atomic.Tx) error } func NewMempool( txs *Txs, - registerer prometheus.Registerer, verify func(tx *atomic.Tx) error, -) (*Mempool, error) { - bloom, err := gossip.NewBloomFilter(registerer, "atomic_mempool_bloom_filter", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, - ) - if err != nil { - return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) - } - +) *Mempool { return &Mempool{ Txs: txs, - bloom: bloom, verify: verify, - }, nil + } } // Add attempts to add tx to the mempool as a Remote transaction. It is assumed @@ -269,26 +254,6 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error { m.utxoSpenders[utxoID] = tx } - m.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded(m.bloom, m.length()*config.TxGossipBloomChurnMultiplier) - if err != nil { - return err - } - - if reset { - log.Debug("resetting bloom filter", "reason", "reached max filled ratio") - - for _, pendingTx := range m.pendingTxs.minHeap.items { - m.bloom.Add(pendingTx.tx) - } - // Current transactions must be added to the bloom filter as well - // because they could be added back into the pending set without going - // through addTx again. - for _, currentTx := range m.currentTxs { - m.bloom.Add(currentTx) - } - } - // When adding a transaction to the mempool, we make sure that there is an // item in Pending to signal the VM to produce a block. select { @@ -297,10 +262,3 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error { } return nil } - -func (m *Mempool) GetFilter() ([]byte, []byte) { - m.lock.RLock() - defer m.lock.RUnlock() - - return m.bloom.Marshal() -} diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go b/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go index 65f4872ba8d8..b6408b68ba35 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool_test.go @@ -4,29 +4,23 @@ package txpool import ( - "math" "testing" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic/atomictest" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/snow/snowtest" - "github.com/ava-labs/avalanchego/utils/bloom" ) func TestMempoolAddTx(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - m, err := NewMempool( + m := NewMempool( NewTxs(ctx, 5_000), - prometheus.NewRegistry(), nil, ) - require.NoError(err) txs := make([]*atomic.Tx, 0) for i := 0; i < 3_000; i++ { @@ -34,10 +28,6 @@ func TestMempoolAddTx(t *testing.T) { txs = append(txs, tx) require.NoError(m.Add(tx)) } - - for _, tx := range txs { - require.True(m.bloom.Has(tx)) - } } // Add should return an error if a tx is already known @@ -45,16 +35,14 @@ func TestMempoolAdd(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - m, err := NewMempool( + m := NewMempool( NewTxs(ctx, 5_000), - prometheus.NewRegistry(), nil, ) - require.NoError(err) tx := atomictest.GenerateTestImportTxWithGas(1, 1) require.NoError(m.Add(tx)) - err = m.Add(tx) + err := m.Add(tx) require.ErrorIs(err, ErrAlreadyKnown) } @@ -63,62 +51,16 @@ func TestMempoolAddNoGas(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - m, err := NewMempool( + m := NewMempool( NewTxs(ctx, 5_000), - prometheus.NewRegistry(), nil, ) - require.NoError(err) tx := atomictest.GenerateTestImportTxWithGas(0, 1) - err = m.Add(tx) + err := m.Add(tx) require.ErrorIs(err, atomic.ErrNoGasUsed) } -// Add should return an error if a tx doesn't consume any gas -func TestMempoolAddBloomReset(t *testing.T) { - require := require.New(t) - - ctx := snowtest.Context(t, snowtest.CChainID) - m, err := NewMempool( - NewTxs(ctx, 2), - prometheus.NewRegistry(), - nil, - ) - require.NoError(err) - - maxFeeTx := atomictest.GenerateTestImportTxWithGas(1, math.MaxUint64) - require.NoError(m.Add(maxFeeTx)) - - // Mark maxFeeTx as Current - tx, ok := m.NextTx() - require.True(ok) - require.Equal(maxFeeTx, tx) - - numHashes, numEntries := bloom.OptimalParameters( - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - ) - txsToAdd := bloom.EstimateCount( - numHashes, - numEntries, - config.TxGossipBloomResetFalsePositiveRate, - ) - for fee := range txsToAdd { - // Keep increasing the fee to evict older transactions - tx := atomictest.GenerateTestImportTxWithGas(1, uint64(fee)) - require.NoError(m.Add(tx)) - } - - // Mark maxFeeTx as Pending - m.CancelCurrentTxs() - - m.Iterate(func(tx *atomic.Tx) bool { - require.True(m.bloom.Has(tx)) - return true // Iterate over the whole mempool - }) -} - func TestAtomicMempoolIterate(t *testing.T) { txs := []*atomic.Tx{ atomictest.GenerateTestImportTxWithGas(1, 1), @@ -162,12 +104,10 @@ func TestAtomicMempoolIterate(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - m, err := NewMempool( + m := NewMempool( NewTxs(ctx, 10), - prometheus.NewRegistry(), nil, ) - require.NoError(err) for _, add := range tt.add { require.NoError(m.Add(add)) @@ -197,12 +137,10 @@ func TestMempoolMaxSizeHandling(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - mempool, err := NewMempool( + mempool := NewMempool( NewTxs(ctx, 1), - prometheus.NewRegistry(), nil, ) - require.NoError(err) lowFeeTx := atomictest.GenerateTestImportTxWithGas(1, 1) highFeeTx := atomictest.GenerateTestImportTxWithGas(1, 2) @@ -216,7 +154,7 @@ func TestMempoolMaxSizeHandling(t *testing.T) { // Because Current transactions can not be evicted, the mempool should // report full. - err = mempool.Add(highFeeTx) + err := mempool.Add(highFeeTx) require.ErrorIs(err, ErrMempoolFull) // Mark the lowFeeTx as Issued @@ -236,19 +174,17 @@ func TestMempoolPriorityDrop(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - mempool, err := NewMempool( + mempool := NewMempool( NewTxs(ctx, 1), - prometheus.NewRegistry(), nil, ) - require.NoError(err) tx1 := atomictest.GenerateTestImportTxWithGas(1, 2) // lower fee require.NoError(mempool.AddRemoteTx(tx1)) require.True(mempool.Has(tx1.ID())) tx2 := atomictest.GenerateTestImportTxWithGas(1, 2) // lower fee - err = mempool.AddRemoteTx(tx2) + err := mempool.AddRemoteTx(tx2) require.ErrorIs(err, ErrInsufficientFee) require.True(mempool.Has(tx1.ID())) require.False(mempool.Has(tx2.ID())) @@ -266,12 +202,10 @@ func TestMempoolPendingLen(t *testing.T) { require := require.New(t) ctx := snowtest.Context(t, snowtest.CChainID) - mempool, err := NewMempool( + mempool := NewMempool( NewTxs(ctx, 2), - prometheus.NewRegistry(), nil, ) - require.NoError(err) tx1 := atomictest.GenerateTestImportTxWithGas(1, 1) tx2 := atomictest.GenerateTestImportTxWithGas(1, 2) diff --git a/graft/coreth/plugin/evm/atomic/txpool/txs.go b/graft/coreth/plugin/evm/atomic/txpool/txs.go index 7e18dd5f3cb5..4c9150a4370c 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/txs.go +++ b/graft/coreth/plugin/evm/atomic/txpool/txs.go @@ -75,7 +75,7 @@ func NewTxs(ctx *snow.Context, maxSize int) *Txs { } } -// PendingLen returns the number of pending transactions. +// PendingLen returns the number of Pending transactions. func (t *Txs) PendingLen() int { t.lock.RLock() defer t.lock.RUnlock() @@ -83,8 +83,16 @@ func (t *Txs) PendingLen() int { return t.pendingTxs.Len() } -// Iterate applies f to all Pending transactions. If f returns false, the -// iteration stops early. +// Len returns the number of Current and Pending transactions. +func (t *Txs) Len() int { + t.lock.RLock() + defer t.lock.RUnlock() + + return len(t.currentTxs) + t.pendingTxs.Len() +} + +// Iterate applies f to all Current and Pending transactions. If f returns +// false, the iteration stops early. func (t *Txs) Iterate(f func(tx *atomic.Tx) bool) { t.lock.RLock() defer t.lock.RUnlock() @@ -94,6 +102,11 @@ func (t *Txs) Iterate(f func(tx *atomic.Tx) bool) { return } } + for _, tx := range t.currentTxs { + if !f(tx) { + return + } + } } // NextTx returns the highest paying Pending transaction from the mempool and diff --git a/graft/coreth/plugin/evm/atomic/vm/api.go b/graft/coreth/plugin/evm/atomic/vm/api.go index 5337f5f987da..6a2015cfa057 100644 --- a/graft/coreth/plugin/evm/atomic/vm/api.go +++ b/graft/coreth/plugin/evm/atomic/vm/api.go @@ -141,7 +141,8 @@ func (service *AvaxAPI) IssueTx(_ *http.Request, args *api.FormattedTx, response return fmt.Errorf("problem initializing transaction: %w", err) } - response.TxID = tx.ID() + txID := tx.ID() + response.TxID = txID service.vm.Ctx.Lock.Lock() defer service.vm.Ctx.Lock.Unlock() @@ -156,7 +157,13 @@ func (service *AvaxAPI) IssueTx(_ *http.Request, args *api.FormattedTx, response // to the mempool through p2p gossip, this will ensure this node also pushes // it to the network. service.vm.AtomicTxPushGossiper.Add(tx) - return nil + if err != nil { + return nil + } + + // If we just added the tx to the mempool, add it to the gossip bloom + // filter. + return service.vm.atomicGossipSet.AddToBloom(txID) } // GetAtomicTxStatus returns the status of the specified transaction diff --git a/graft/coreth/plugin/evm/atomic/vm/block_extension.go b/graft/coreth/plugin/evm/atomic/vm/block_extension.go index 74a30a11522f..e6ef0f7cfcbe 100644 --- a/graft/coreth/plugin/evm/atomic/vm/block_extension.go +++ b/graft/coreth/plugin/evm/atomic/vm/block_extension.go @@ -214,8 +214,21 @@ func (be *blockExtension) Reject() error { for _, tx := range be.atomicTxs { // Re-issue the transaction in the mempool, continue even if it fails vm.AtomicMempool.RemoveTx(tx) + + txID := tx.ID() if err := vm.AtomicMempool.AddRemoteTx(tx); err != nil { - log.Debug("Failed to re-issue transaction in rejected block", "txID", tx.ID(), "err", err) + log.Debug("Failed to re-issue transaction in rejected block", + "txID", txID, + "err", err, + ) + continue + } + + if err := vm.atomicGossipSet.AddToBloom(txID); err != nil { + log.Debug("Failed to add transaction to gossip bloom in rejected block", + "txID", txID, + "err", err, + ) } } atomicState, err := vm.AtomicBackend.GetVerifiedAtomicState(common.Hash(be.block.ID())) diff --git a/graft/coreth/plugin/evm/atomic/vm/vm.go b/graft/coreth/plugin/evm/atomic/vm/vm.go index 53e53918634c..506d4a924881 100644 --- a/graft/coreth/plugin/evm/atomic/vm/vm.go +++ b/graft/coreth/plugin/evm/atomic/vm/vm.go @@ -82,10 +82,13 @@ type VM struct { Ctx *snow.Context // TODO: unexport these fields - SecpCache *secp256k1.RecoverCache - Fx secp256k1fx.Fx - baseCodec codec.Registry - AtomicMempool *txpool.Mempool + SecpCache *secp256k1.RecoverCache + Fx secp256k1fx.Fx + baseCodec codec.Registry + + AtomicMempool *txpool.Mempool + atomicGossipSet *avalanchegossip.SetWithBloomFilter[*atomic.Tx] + AtomicTxPushGossiper *avalanchegossip.PushGossiper[*atomic.Tx] // AtomicTxRepository maintains two indexes on accepted atomic txs. // - txID to accepted atomic tx @@ -95,8 +98,6 @@ type VM struct { // AtomicBackend abstracts verification and processing of atomic transactions AtomicBackend *atomicstate.AtomicBackend - AtomicTxPushGossiper *avalanchegossip.PushGossiper[*atomic.Tx] - // cancel may be nil until [snow.NormalOp] starts cancel context.CancelFunc shutdownWg sync.WaitGroup @@ -177,11 +178,19 @@ func (vm *VM) Initialize( return fmt.Errorf("failed to initialize inner VM: %w", err) } - atomicMempool, err := txpool.NewMempool(atomicTxs, vm.InnerVM.MetricRegistry(), vm.verifyTxAtTip) + vm.AtomicMempool = txpool.NewMempool(atomicTxs, vm.verifyTxAtTip) + atomicGossipSet, err := avalanchegossip.NewSetWithBloomFilter[*atomic.Tx]( + vm.AtomicMempool, + vm.InnerVM.MetricRegistry(), + "atomic_mempool_bloom_filter", + config.TxGossipBloomMinTargetElements, + config.TxGossipBloomTargetFalsePositiveRate, + config.TxGossipBloomResetFalsePositiveRate, + ) if err != nil { - return fmt.Errorf("failed to initialize mempool: %w", err) + return fmt.Errorf("failed to initialize atomic gossip set: %w", err) } - vm.AtomicMempool = atomicMempool + vm.atomicGossipSet = atomicGossipSet // initialize bonus blocks on mainnet var ( @@ -280,7 +289,7 @@ func (vm *VM) onNormalOperationsStarted() error { vm.AtomicTxPushGossiper, err = avalanchegossip.NewPushGossiper[*atomic.Tx]( &atomicTxGossipMarshaller, - vm.AtomicMempool, + vm.atomicGossipSet, vm.InnerVM.P2PValidators(), atomicTxGossipClient, atomicTxGossipMetrics, @@ -297,7 +306,7 @@ func (vm *VM) onNormalOperationsStarted() error { atomicTxGossipHandler, err := gossip.NewTxGossipHandler[*atomic.Tx]( vm.Ctx.Log, &atomicTxGossipMarshaller, - vm.AtomicMempool, + vm.atomicGossipSet, atomicTxGossipMetrics, config.TxGossipTargetMessageSize, config.TxGossipThrottlingPeriod, @@ -317,7 +326,7 @@ func (vm *VM) onNormalOperationsStarted() error { atomicTxPullGossiper := avalanchegossip.NewPullGossiper[*atomic.Tx]( vm.Ctx.Log, &atomicTxGossipMarshaller, - vm.AtomicMempool, + vm.atomicGossipSet, atomicTxGossipClient, atomicTxGossipMetrics, config.TxGossipPollSize, diff --git a/graft/coreth/plugin/evm/config/constants.go b/graft/coreth/plugin/evm/config/constants.go index 4fe953def797..e7bedeaa6a7a 100644 --- a/graft/coreth/plugin/evm/config/constants.go +++ b/graft/coreth/plugin/evm/config/constants.go @@ -13,7 +13,6 @@ const ( TxGossipBloomMinTargetElements = 8 * 1024 TxGossipBloomTargetFalsePositiveRate = 0.01 TxGossipBloomResetFalsePositiveRate = 0.05 - TxGossipBloomChurnMultiplier = 3 PushGossipDiscardedElements = 16_384 TxGossipTargetMessageSize = 20 * units.KiB TxGossipThrottlingPeriod = time.Hour diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go deleted file mode 100644 index b6129670e012..000000000000 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -// TODO: move to network - -package evm - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - - "github.com/ava-labs/libevm/core/types" - "github.com/ava-labs/libevm/log" - "github.com/prometheus/client_golang/prometheus" - - "github.com/ava-labs/avalanchego/graft/coreth/core" - "github.com/ava-labs/avalanchego/graft/coreth/core/txpool" - "github.com/ava-labs/avalanchego/graft/coreth/eth" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/network/p2p/gossip" - - ethcommon "github.com/ava-labs/libevm/common" -) - -const pendingTxsBuffer = 10 - -var ( - _ gossip.Gossipable = (*GossipEthTx)(nil) - _ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil) - _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) - - _ eth.PushGossiper = (*EthPushGossiper)(nil) -) - -func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer) (*GossipEthTxPool, error) { - bloom, err := gossip.NewBloomFilter( - registerer, - "eth_tx_bloom_filter", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, - ) - if err != nil { - return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) - } - - return &GossipEthTxPool{ - mempool: mempool, - pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer), - bloom: bloom, - }, nil -} - -type GossipEthTxPool struct { - mempool *txpool.TxPool - pendingTxs chan core.NewTxsEvent - - bloom *gossip.BloomFilter - lock sync.RWMutex - - // subscribed is set to true when the gossip subscription is active - // mostly used for testing - subscribed atomic.Bool -} - -// IsSubscribed returns whether or not the gossip subscription is active. -func (g *GossipEthTxPool) IsSubscribed() bool { - return g.subscribed.Load() -} - -func (g *GossipEthTxPool) Subscribe(ctx context.Context) { - sub := g.mempool.SubscribeTransactions(g.pendingTxs, false) - if sub == nil { - log.Warn("failed to subscribe to new txs event") - return - } - g.subscribed.CompareAndSwap(false, true) - defer func() { - sub.Unsubscribe() - g.subscribed.CompareAndSwap(true, false) - }() - - for { - select { - case <-ctx.Done(): - log.Debug("shutting down subscription") - return - case pendingTxs := <-g.pendingTxs: - g.lock.Lock() - optimalElements := (g.mempool.PendingSize(txpool.PendingFilter{}) + len(pendingTxs.Txs)) * config.TxGossipBloomChurnMultiplier - for _, pendingTx := range pendingTxs.Txs { - tx := &GossipEthTx{Tx: pendingTx} - g.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, optimalElements) - if err != nil { - log.Error("failed to reset bloom filter", "err", err) - continue - } - - if reset { - log.Debug("resetting bloom filter", "reason", "reached max filled ratio") - - g.mempool.IteratePending(func(tx *types.Transaction) bool { - g.bloom.Add(&GossipEthTx{Tx: tx}) - return true - }) - } - } - g.lock.Unlock() - } - } -} - -// Add enqueues the transaction to the mempool. Subscribe should be called -// to receive an event if tx is actually added to the mempool or not. -func (g *GossipEthTxPool) Add(tx *GossipEthTx) error { - return g.mempool.Add([]*types.Transaction{tx.Tx}, false, false)[0] -} - -// Has should just return whether or not the [txID] is still in the mempool, -// not whether it is in the mempool AND pending. -func (g *GossipEthTxPool) Has(txID ids.ID) bool { - return g.mempool.Has(ethcommon.Hash(txID)) -} - -func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) { - g.mempool.IteratePending(func(tx *types.Transaction) bool { - return f(&GossipEthTx{Tx: tx}) - }) -} - -func (g *GossipEthTxPool) GetFilter() ([]byte, []byte) { - g.lock.RLock() - defer g.lock.RUnlock() - - return g.bloom.Marshal() -} - -type GossipEthTxMarshaller struct{} - -func (GossipEthTxMarshaller) MarshalGossip(tx *GossipEthTx) ([]byte, error) { - return tx.Tx.MarshalBinary() -} - -func (GossipEthTxMarshaller) UnmarshalGossip(bytes []byte) (*GossipEthTx, error) { - tx := &GossipEthTx{ - Tx: &types.Transaction{}, - } - - return tx, tx.Tx.UnmarshalBinary(bytes) -} - -type GossipEthTx struct { - Tx *types.Transaction -} - -func (tx *GossipEthTx) GossipID() ids.ID { - return ids.ID(tx.Tx.Hash()) -} - -// EthPushGossiper is used by the ETH backend to push transactions issued over -// the RPC and added to the mempool to peers. -type EthPushGossiper struct { - vm *VM -} - -func (e *EthPushGossiper) Add(tx *types.Transaction) { - // eth.Backend is initialized before the [ethTxPushGossiper] is created, so - // we just ignore any gossip requests until it is set. - ethTxPushGossiper := e.vm.ethTxPushGossiper.Get() - if ethTxPushGossiper == nil { - return - } - ethTxPushGossiper.Add(&GossipEthTx{tx}) -} diff --git a/graft/coreth/plugin/evm/gossip.go b/graft/coreth/plugin/evm/gossip.go new file mode 100644 index 000000000000..a0def62a3614 --- /dev/null +++ b/graft/coreth/plugin/evm/gossip.go @@ -0,0 +1,97 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +// TODO: move to network + +package evm + +import ( + "github.com/ava-labs/libevm/core/types" + + "github.com/ava-labs/avalanchego/graft/coreth/core/txpool" + "github.com/ava-labs/avalanchego/graft/coreth/eth" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils" + + ethcommon "github.com/ava-labs/libevm/common" +) + +var ( + _ gossip.Gossipable = (*gossipTx)(nil) + _ gossip.Marshaller[*gossipTx] = (*gossipTxMarshaller)(nil) + _ gossip.Set[*gossipTx] = (*gossipTxPool)(nil) + + _ eth.PushGossiper = (*atomicPushGossiper)(nil) +) + +func newGossipTxPool(mempool *txpool.TxPool) *gossipTxPool { + return &gossipTxPool{ + mempool: mempool, + } +} + +type gossipTxPool struct { + mempool *txpool.TxPool +} + +// Add enqueues the transaction to the mempool. +func (g *gossipTxPool) Add(tx *gossipTx) error { + return g.mempool.Add([]*types.Transaction{tx.tx}, false, false)[0] +} + +// Has should just return whether or not the [txID] is still in the mempool, +// not whether it is in the mempool AND pending. +func (g *gossipTxPool) Has(txID ids.ID) bool { + return g.mempool.Has(ethcommon.Hash(txID)) +} + +func (g *gossipTxPool) Iterate(f func(tx *gossipTx) bool) { + g.mempool.IteratePending(func(tx *types.Transaction) bool { + return f(&gossipTx{tx: tx}) + }) +} + +func (g *gossipTxPool) Len() int { + return g.mempool.PendingSize(txpool.PendingFilter{}) +} + +type gossipTxMarshaller struct{} + +func (gossipTxMarshaller) MarshalGossip(tx *gossipTx) ([]byte, error) { + return tx.tx.MarshalBinary() +} + +func (gossipTxMarshaller) UnmarshalGossip(bytes []byte) (*gossipTx, error) { + tx := &gossipTx{ + tx: &types.Transaction{}, + } + + return tx, tx.tx.UnmarshalBinary(bytes) +} + +type gossipTx struct { + tx *types.Transaction +} + +func (tx *gossipTx) GossipID() ids.ID { + return ids.ID(tx.tx.Hash()) +} + +// atomicPushGossiper is used by the ETH backend to push transactions issued +// over the RPC and added to the mempool to peers. +type atomicPushGossiper struct { + pusher *utils.Atomic[*gossip.PushGossiper[*gossipTx]] + set **gossip.SetWithBloomFilter[*gossipTx] +} + +func (e *atomicPushGossiper) Add(tx *types.Transaction) { + // eth.Backend is initialized before the pusher is created, so we just + // ignore any gossip requests until it is set. + pusher := e.pusher.Get() + if pusher == nil { + return + } + _ = (*e.set).AddToBloom(ids.ID(tx.Hash())) + pusher.Add(&gossipTx{tx}) +} diff --git a/graft/coreth/plugin/evm/gossip_test.go b/graft/coreth/plugin/evm/gossip_test.go index 15523adc24b6..4a0c7f711d2f 100644 --- a/graft/coreth/plugin/evm/gossip_test.go +++ b/graft/coreth/plugin/evm/gossip_test.go @@ -4,7 +4,6 @@ package evm import ( - "context" "crypto/ecdsa" "math/big" "strings" @@ -15,8 +14,6 @@ import ( "github.com/ava-labs/libevm/core/rawdb" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/core/vm" - "github.com/ava-labs/libevm/crypto" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/graft/coreth/consensus/dummy" @@ -24,16 +21,14 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/core/txpool" "github.com/ava-labs/avalanchego/graft/coreth/core/txpool/legacypool" "github.com/ava-labs/avalanchego/graft/coreth/params" - "github.com/ava-labs/avalanchego/graft/coreth/utils" - "github.com/ava-labs/avalanchego/network/p2p/gossip" ) func TestGossipEthTxMarshaller(t *testing.T) { require := require.New(t) blobTx := &types.BlobTx{} - want := &GossipEthTx{Tx: types.NewTx(blobTx)} - marshaller := GossipEthTxMarshaller{} + want := &gossipTx{tx: types.NewTx(blobTx)} + marshaller := gossipTxMarshaller{} bytes, err := marshaller.MarshalGossip(want) require.NoError(err) @@ -43,54 +38,6 @@ func TestGossipEthTxMarshaller(t *testing.T) { require.Equal(want.GossipID(), got.GossipID()) } -func TestGossipSubscribe(t *testing.T) { - require := require.New(t) - key, err := crypto.GenerateKey() - require.NoError(err) - addr := crypto.PubkeyToAddress(key.PublicKey) - - require.NoError(err) - txPool := setupPoolWithConfig(t, params.TestChainConfig, addr) - defer txPool.Close() - txPool.SetGasTip(common.Big1) - txPool.SetMinFee(common.Big0) - - gossipTxPool, err := NewGossipEthTxPool(txPool, prometheus.NewRegistry()) - require.NoError(err) - - // use a custom bloom filter to test the bloom filter reset - gossipTxPool.bloom, err = gossip.NewBloomFilter(prometheus.NewRegistry(), "", 1, 0.01, 0.0000000000000001) // maxCount =1 - require.NoError(err) - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - go gossipTxPool.Subscribe(ctx) - - require.Eventually(func() bool { - return gossipTxPool.IsSubscribed() - }, 10*time.Second, 500*time.Millisecond, "expected gossipTxPool to be subscribed") - - // create eth txs - ethTxs := getValidEthTxs(key, 10, big.NewInt(226*utils.GWei)) - - // Notify mempool about txs - errs := txPool.AddRemotesSync(ethTxs) - for _, err := range errs { - require.NoError(err, "failed adding tx to remote mempool") - } - - require.Eventually(func() bool { - gossipTxPool.lock.RLock() - defer gossipTxPool.lock.RUnlock() - - for _, tx := range ethTxs { - if !gossipTxPool.bloom.Has(&GossipEthTx{Tx: tx}) { - return false - } - } - return true - }, 30*time.Second, 500*time.Millisecond, "expected all transactions to eventually be in the bloom filter") -} - func setupPoolWithConfig(t *testing.T, config *params.ChainConfig, fundedAddress common.Address) *txpool.TxPool { diskdb := rawdb.NewMemoryDatabase() engine := dummy.NewETHFaker() diff --git a/graft/coreth/plugin/evm/tx_gossip_test.go b/graft/coreth/plugin/evm/tx_gossip_test.go index 7f08d501969f..cd5bbe7bd000 100644 --- a/graft/coreth/plugin/evm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/tx_gossip_test.go @@ -151,7 +151,7 @@ func TestEthTxGossip(t *testing.T) { // wait so we aren't throttled by the vm time.Sleep(5 * time.Second) - marshaller := GossipEthTxMarshaller{} + marshaller := gossipTxMarshaller{} // Ask the VM for new transactions. We should get the newly issued tx. wg.Add(1) onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { @@ -163,7 +163,7 @@ func TestEthTxGossip(t *testing.T) { gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0]) require.NoError(err) - require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) + require.Equal(signedTx.Hash(), gotTx.tx.Hash()) wg.Done() } @@ -213,7 +213,7 @@ func TestEthTxPushGossipOutbound(t *testing.T) { // issue a tx require.NoError(vm.txPool.Add([]*types.Transaction{signedTx}, true, true)[0]) - vm.ethTxPushGossiper.Get().Add(&GossipEthTx{signedTx}) + vm.gossipPusher.Get().Add(&gossipTx{signedTx}) sent := <-sender.SentAppGossip got := &sdk.PushGossip{} @@ -223,7 +223,7 @@ func TestEthTxPushGossipOutbound(t *testing.T) { require.Equal(byte(p2p.TxGossipHandlerID), sent[0]) require.NoError(proto.Unmarshal(sent[1:], got)) - marshaller := GossipEthTxMarshaller{} + marshaller := gossipTxMarshaller{} require.Len(got.Gossip, 1) gossipedTx, err := marshaller.UnmarshalGossip(got.Gossip[0]) require.NoError(err) @@ -266,9 +266,9 @@ func TestEthTxPushGossipInbound(t *testing.T) { signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), pk.ToECDSA()) require.NoError(err) - marshaller := GossipEthTxMarshaller{} - gossipedTx := &GossipEthTx{ - Tx: signedTx, + marshaller := gossipTxMarshaller{} + gossipedTx := &gossipTx{ + tx: signedTx, } gossipedTxBytes, err := marshaller.MarshalGossip(gossipedTx) require.NoError(err) diff --git a/graft/coreth/plugin/evm/vm.go b/graft/coreth/plugin/evm/vm.go index 44fac1fa0d31..4a65cde9bb35 100644 --- a/graft/coreth/plugin/evm/vm.go +++ b/graft/coreth/plugin/evm/vm.go @@ -200,6 +200,11 @@ type VM struct { blockChain *core.BlockChain miner *miner.Miner + gossipClient *p2p.Client + gossipMetrics avalanchegossip.Metrics + gossipSet *avalanchegossip.SetWithBloomFilter[*gossipTx] // gossipSet must be initialized before gossipPusher + gossipPusher avalancheUtils.Atomic[*avalanchegossip.PushGossiper[*gossipTx]] + // [versiondb] is the VM's current versioned database versiondb *versiondb.Database @@ -250,8 +255,6 @@ type VM struct { // Used to serve BLS signatures of warp messages over RPC warpBackend warp.Backend - ethTxPushGossiper avalancheUtils.Atomic[*avalanchegossip.PushGossiper[*GossipEthTx]] - chainAlias string // RPC handlers (should be stopped before closing chaindb) rpcHandlers []interface{ Stop() } @@ -462,6 +465,53 @@ func (vm *VM) Initialize( return err } + { + vm.gossipClient = vm.Network.NewClient(p2p.TxGossipHandlerID) + vm.gossipMetrics, err = avalanchegossip.NewMetrics(vm.sdkMetrics, ethTxGossipNamespace) + if err != nil { + return fmt.Errorf("failed to initialize eth tx gossip metrics: %w", err) + } + + vm.gossipSet, err = avalanchegossip.NewSetWithBloomFilter[*gossipTx]( + newGossipTxPool(vm.txPool), + vm.sdkMetrics, + "eth_tx_bloom_filter", + config.TxGossipBloomMinTargetElements, + config.TxGossipBloomTargetFalsePositiveRate, + config.TxGossipBloomResetFalsePositiveRate, + ) + if err != nil { + return fmt.Errorf("failed to create eth tx gossip set: %w", err) + } + + pushGossipParams := avalanchegossip.BranchingFactor{ + StakePercentage: vm.config.PushGossipPercentStake, + Validators: vm.config.PushGossipNumValidators, + Peers: vm.config.PushGossipNumPeers, + } + pushRegossipParams := avalanchegossip.BranchingFactor{ + Validators: vm.config.PushRegossipNumValidators, + Peers: vm.config.PushRegossipNumPeers, + } + + gossipPusher, err := avalanchegossip.NewPushGossiper[*gossipTx]( + gossipTxMarshaller{}, + vm.gossipSet, + vm.P2PValidators(), + vm.gossipClient, + vm.gossipMetrics, + pushGossipParams, + pushRegossipParams, + config.PushGossipDiscardedElements, + config.TxGossipTargetMessageSize, + vm.config.RegossipFrequency.Duration, + ) + if err != nil { + return fmt.Errorf("failed to initialize eth tx push gossiper: %w", err) + } + vm.gossipPusher.Set(gossipPusher) + } + go vm.ctx.Log.RecoverAndPanic(vm.startContinuousProfiler) // Add p2p warp message warpHandler @@ -553,7 +603,10 @@ func (vm *VM) initializeChain(lastAcceptedHash common.Hash) error { vm.eth, err = eth.New( node, &vm.ethConfig, - &EthPushGossiper{vm: vm}, + &atomicPushGossiper{ + pusher: &vm.gossipPusher, + set: &vm.gossipSet, + }, vm.chaindb, eth.Settings{MaxBlocksPerRequest: vm.config.MaxBlocksPerRequest}, lastAcceptedHash, @@ -763,48 +816,6 @@ func (vm *VM) initBlockBuilding() error { ctx, cancel := context.WithCancel(context.TODO()) vm.cancel = cancel - ethTxGossipMarshaller := GossipEthTxMarshaller{} - ethTxGossipClient := vm.Network.NewClient(p2p.TxGossipHandlerID) - ethTxGossipMetrics, err := avalanchegossip.NewMetrics(vm.sdkMetrics, ethTxGossipNamespace) - if err != nil { - return fmt.Errorf("failed to initialize eth tx gossip metrics: %w", err) - } - ethTxPool, err := NewGossipEthTxPool(vm.txPool, vm.sdkMetrics) - if err != nil { - return fmt.Errorf("failed to initialize gossip eth tx pool: %w", err) - } - vm.shutdownWg.Add(1) - go func() { - ethTxPool.Subscribe(ctx) - vm.shutdownWg.Done() - }() - pushGossipParams := avalanchegossip.BranchingFactor{ - StakePercentage: vm.config.PushGossipPercentStake, - Validators: vm.config.PushGossipNumValidators, - Peers: vm.config.PushGossipNumPeers, - } - pushRegossipParams := avalanchegossip.BranchingFactor{ - Validators: vm.config.PushRegossipNumValidators, - Peers: vm.config.PushRegossipNumPeers, - } - - ethTxPushGossiper, err := avalanchegossip.NewPushGossiper[*GossipEthTx]( - ethTxGossipMarshaller, - ethTxPool, - vm.P2PValidators(), - ethTxGossipClient, - ethTxGossipMetrics, - pushGossipParams, - pushRegossipParams, - config.PushGossipDiscardedElements, - config.TxGossipTargetMessageSize, - vm.config.RegossipFrequency.Duration, - ) - if err != nil { - return fmt.Errorf("failed to initialize eth tx push gossiper: %w", err) - } - vm.ethTxPushGossiper.Set(ethTxPushGossiper) - // NOTE: gossip network must be initialized first otherwise ETH tx gossip will not work. vm.builderLock.Lock() vm.builder = vm.NewBlockBuilder(vm.extensionConfig.ExtraMempool) @@ -813,11 +824,11 @@ func (vm *VM) initBlockBuilding() error { vm.builder.awaitSubmittedTxs() vm.builderLock.Unlock() - ethTxGossipHandler, err := gossip.NewTxGossipHandler[*GossipEthTx]( + ethTxGossipHandler, err := gossip.NewTxGossipHandler[*gossipTx]( vm.ctx.Log, - ethTxGossipMarshaller, - ethTxPool, - ethTxGossipMetrics, + gossipTxMarshaller{}, + vm.gossipSet, + vm.gossipMetrics, config.TxGossipTargetMessageSize, config.TxGossipThrottlingPeriod, config.TxGossipRequestsPerPeer, @@ -833,12 +844,12 @@ func (vm *VM) initBlockBuilding() error { return fmt.Errorf("failed to add eth tx gossip handler: %w", err) } - ethTxPullGossiper := avalanchegossip.NewPullGossiper[*GossipEthTx]( + ethTxPullGossiper := avalanchegossip.NewPullGossiper[*gossipTx]( vm.ctx.Log, - ethTxGossipMarshaller, - ethTxPool, - ethTxGossipClient, - ethTxGossipMetrics, + gossipTxMarshaller{}, + vm.gossipSet, + vm.gossipClient, + vm.gossipMetrics, config.TxGossipPollSize, ) @@ -850,7 +861,7 @@ func (vm *VM) initBlockBuilding() error { vm.shutdownWg.Add(1) go func() { - avalanchegossip.Every(ctx, vm.ctx.Log, ethTxPushGossiper, vm.config.PushGossipFrequency.Duration) + avalanchegossip.Every(ctx, vm.ctx.Log, vm.gossipPusher.Get(), vm.config.PushGossipFrequency.Duration) vm.shutdownWg.Done() }() vm.shutdownWg.Add(1) diff --git a/network/ip_tracker.go b/network/ip_tracker.go index a660fabf25de..6dfd25a831fc 100644 --- a/network/ip_tracker.go +++ b/network/ip_tracker.go @@ -552,9 +552,10 @@ func (i *ipTracker) updateMostRecentTrackedIP(node *trackedNode, ip *ips.Claimed return } - i.bloomAdditions[ip.NodeID] = oldCount + 1 - bloom.Add(i.bloom, ip.GossipID[:], i.bloomSalt) - i.bloomMetrics.Count.Inc() + if bloom.Add(i.bloom, ip.GossipID[:], i.bloomSalt) { + i.bloomAdditions[ip.NodeID] = oldCount + 1 + i.bloomMetrics.Count.Inc() + } } // ResetBloom prunes the current bloom filter. This must be called periodically diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 9c05c8db78e0..bccf01c0debd 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -18,6 +18,8 @@ import ( // // Invariant: The returned bloom filter is not safe to reset concurrently with // other operations. However, it is otherwise safe to access concurrently. +// +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. func NewBloomFilter( registerer prometheus.Registerer, namespace string, @@ -45,6 +47,7 @@ func NewBloomFilter( return filter, err } +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. type BloomFilter struct { minTargetElements int targetFalsePositiveProbability float64 @@ -62,8 +65,9 @@ type BloomFilter struct { func (b *BloomFilter) Add(gossipable Gossipable) { h := gossipable.GossipID() - bloom.Add(b.bloom, h[:], b.salt[:]) - b.metrics.Count.Inc() + if bloom.Add(b.bloom, h[:], b.salt[:]) { + b.metrics.Count.Inc() + } } func (b *BloomFilter) Has(gossipable Gossipable) bool { @@ -85,6 +89,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) { // the same [targetFalsePositiveProbability]. // // Returns true if the bloom filter was reset. +// +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index c03e513854df..ade88ee61684 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -18,6 +18,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/buffer" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -40,6 +41,7 @@ const ( var ( _ Gossiper = (*ValidatorGossiper)(nil) _ Gossiper = (*PullGossiper[Gossipable])(nil) + _ Gossiper = (*PushGossiper[Gossipable])(nil) ioTypeLabels = []string{ioLabel, typeLabel} sentPushLabels = prometheus.Labels{ @@ -75,6 +77,17 @@ var ( ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative") ) +// Gossipable is an item that can be gossiped across the network +type Gossipable interface { + GossipID() ids.ID +} + +// Marshaller handles parsing logic for a concrete Gossipable type +type Marshaller[T Gossipable] interface { + MarshalGossip(T) ([]byte, error) + UnmarshalGossip([]byte) (T, error) +} + // Gossiper gossips Gossipables to other nodes type Gossiper interface { // Gossip runs a cycle of gossip. Returns an error if we failed to gossip. @@ -191,7 +204,7 @@ func (v ValidatorGossiper) Gossip(ctx context.Context) error { func NewPullGossiper[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set PullGossiperSet[T], client *p2p.Client, metrics Metrics, pollSize int, @@ -206,17 +219,27 @@ func NewPullGossiper[T Gossipable]( } } +// PullGossiperSet exposes the current bloom filter and allows adding new items +// that were not included in the filter. +type PullGossiperSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // BloomFilter returns the bloom filter and its corresponding salt. + BloomFilter() (bloom *bloom.Filter, salt ids.ID) +} + type PullGossiper[T Gossipable] struct { log logging.Logger marshaller Marshaller[T] - set Set[T] + set PullGossiperSet[T] client *p2p.Client metrics Metrics pollSize int } func (p *PullGossiper[_]) Gossip(ctx context.Context) error { - msgBytes, err := MarshalAppRequest(p.set.GetFilter()) + bf, salt := p.set.BloomFilter() + msgBytes, err := MarshalAppRequest(bf.Marshal(), salt[:]) if err != nil { return err } @@ -293,7 +316,7 @@ func (p *PullGossiper[_]) handleResponse( // NewPushGossiper returns an instance of PushGossiper func NewPushGossiper[T Gossipable]( marshaller Marshaller[T], - mempool Set[T], + set PushGossiperSet, validators p2p.ValidatorSubset, client *p2p.Client, metrics Metrics, @@ -320,7 +343,7 @@ func NewPushGossiper[T Gossipable]( return &PushGossiper[T]{ marshaller: marshaller, - set: mempool, + set: set, validators: validators, client: client, metrics: metrics, @@ -336,10 +359,16 @@ func NewPushGossiper[T Gossipable]( }, nil } +// PushGossiperSet exposes whether hashes are still included in a set. +type PushGossiperSet interface { + // Has returns true if the hash is in the set. + Has(h ids.ID) bool +} + // PushGossiper broadcasts gossip to peers randomly in the network type PushGossiper[T Gossipable] struct { marshaller Marshaller[T] - set Set[T] + set PushGossiperSet validators p2p.ValidatorSubset client *p2p.Client metrics Metrics diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index cb8222ceff4f..5dbcc8e70958 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -20,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" - "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -69,29 +68,26 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) { type setDouble struct { txs set.Set[tx] - bloom *BloomFilter onAdd func(tx tx) } -func (s *setDouble) Add(gossipable tx) error { - if s.txs.Contains(gossipable) { - return fmt.Errorf("%s already present", ids.ID(gossipable)) +func (s *setDouble) Add(t tx) error { + if s.txs.Contains(t) { + return fmt.Errorf("%s already present", t) } - s.txs.Add(gossipable) - s.bloom.Add(gossipable) + s.txs.Add(t) if s.onAdd != nil { - s.onAdd(gossipable) + s.onAdd(t) } - return nil } -func (s *setDouble) Has(gossipID ids.ID) bool { - return s.txs.Contains(tx(gossipID)) +func (s *setDouble) Has(h ids.ID) bool { + return s.txs.Contains(tx(h)) } -func (s *setDouble) Iterate(f func(gossipable tx) bool) { +func (s *setDouble) Iterate(f func(t tx) bool) { for tx := range s.txs { if !f(tx) { return @@ -99,8 +95,8 @@ func (s *setDouble) Iterate(f func(gossipable tx) bool) { } } -func (s *setDouble) GetFilter() ([]byte, []byte) { - return s.bloom.Marshal() +func (s *setDouble) Len() int { + return s.txs.Len() } func TestGossiperGossip(t *testing.T) { @@ -175,13 +171,17 @@ func TestGossiperGossip(t *testing.T) { ) require.NoError(err) - responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + responseSetWithBloom, err := NewSetWithBloomFilter( + &setDouble{}, + prometheus.NewRegistry(), + "", + 1000, + 0.01, + 0.05, + ) require.NoError(err) - responseSet := &setDouble{ - bloom: responseBloom, - } for _, item := range tt.responder { - require.NoError(responseSet.Add(item)) + require.NoError(responseSetWithBloom.Add(item)) } metrics, err := NewMetrics(prometheus.NewRegistry(), "") @@ -196,7 +196,7 @@ func TestGossiperGossip(t *testing.T) { handler := NewHandler[tx]( logging.NoLog{}, marshaller, - responseSet, + responseSetWithBloom, metrics, tt.targetResponseSize, ) @@ -218,13 +218,18 @@ func TestGossiperGossip(t *testing.T) { require.NoError(err) require.NoError(requestNetwork.Connected(t.Context(), ids.EmptyNodeID, nil)) - bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + var requestSet setDouble + requestSetWithBloom, err := NewSetWithBloomFilter( + &requestSet, + prometheus.NewRegistry(), + "", + 1000, + 0.01, + 0.05, + ) require.NoError(err) - requestSet := &setDouble{ - bloom: bloom, - } for _, item := range tt.requester { - require.NoError(requestSet.Add(item)) + require.NoError(requestSetWithBloom.Add(item)) } requestClient := requestNetwork.NewClient( @@ -236,7 +241,7 @@ func TestGossiperGossip(t *testing.T) { gossiper := NewPullGossiper[tx]( logging.NoLog{}, marshaller, - requestSet, + requestSetWithBloom, requestClient, metrics, 1, @@ -457,20 +462,10 @@ func TestPushGossiperNew(t *testing.T) { } } -type fullSet[T Gossipable] struct{} - -func (fullSet[T]) Add(T) error { - return nil -} - -func (fullSet[T]) Has(ids.ID) bool { - return true -} - -func (fullSet[T]) Iterate(func(gossipable T) bool) {} +type hasFunc func(id ids.ID) bool -func (fullSet[_]) GetFilter() ([]byte, []byte) { - return bloom.FullFilter.Marshal(), ids.Empty[:] +func (h hasFunc) Has(id ids.ID) bool { + return h(id) } // Tests that the outgoing gossip is equivalent to what was accumulated @@ -593,7 +588,9 @@ func TestPushGossiper(t *testing.T) { gossiper, err := NewPushGossiper[tx]( marshaller, - fullSet[tx]{}, + hasFunc(func(ids.ID) bool { + return true // Never remove the items from the set + }), validators, client, metrics, diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go deleted file mode 100644 index b535609a7a48..000000000000 --- a/network/p2p/gossip/gossipable.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package gossip - -import "github.com/ava-labs/avalanchego/ids" - -// Gossipable is an item that can be gossiped across the network -type Gossipable interface { - GossipID() ids.ID -} - -// Marshaller handles parsing logic for a concrete Gossipable type -type Marshaller[T Gossipable] interface { - MarshalGossip(T) ([]byte, error) - UnmarshalGossip([]byte) (T, error) -} - -// Set holds a set of known Gossipable items -type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not - // added. - Add(gossipable T) error - // Has returns true if the gossipable is in the set. - Has(gossipID ids.ID) bool - // Iterate iterates over elements until [f] returns false - Iterate(f func(gossipable T) bool) - // GetFilter returns the byte representation of bloom filter and its - // corresponding salt. - GetFilter() (bloom []byte, salt []byte) -} diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 7c016030c913..20cc31520bf3 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -18,10 +18,19 @@ import ( var _ p2p.Handler = (*Handler[Gossipable])(nil) +// HandlerSet exposes the ability to add new values to the set in response to +// pushed information and for responding to pull requests. +type HandlerSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // Iterate iterates over elements until f returns false. + Iterate(f func(v T) bool) +} + func NewHandler[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set HandlerSet[T], metrics Metrics, targetResponseSize int, ) *Handler[T] { @@ -39,7 +48,7 @@ type Handler[T Gossipable] struct { p2p.Handler marshaller Marshaller[T] log logging.Logger - set Set[T] + set HandlerSet[T] metrics Metrics targetResponseSize int } diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go new file mode 100644 index 000000000000..187b13454fcf --- /dev/null +++ b/network/p2p/gossip/set.go @@ -0,0 +1,159 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "crypto/rand" + "fmt" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/bloom" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ Set[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) + _ PullGossiperSet[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) +) + +type Set[T Gossipable] interface { + HandlerSet[T] + PushGossiperSet + // Len returns the number of items in the set. + Len() int +} + +// NewSetWithBloomFilter wraps set with a bloom filter. It is expected for all +// future additions to the provided set to go through the returned set, or be +// followed by a call to AddToBloom. +func NewSetWithBloomFilter[T Gossipable]( + set Set[T], + registerer prometheus.Registerer, + namespace string, + minTargetElements int, + targetFalsePositiveProbability, + resetFalsePositiveProbability float64, +) (*SetWithBloomFilter[T], error) { + metrics, err := bloom.NewMetrics(namespace, registerer) + if err != nil { + return nil, err + } + m := &SetWithBloomFilter[T]{ + set: set, + + minTargetElements: minTargetElements, + targetFalsePositiveProbability: targetFalsePositiveProbability, + resetFalsePositiveProbability: resetFalsePositiveProbability, + + metrics: metrics, + } + return m, m.resetBloomFilter() +} + +type SetWithBloomFilter[T Gossipable] struct { + set Set[T] + + minTargetElements int + targetFalsePositiveProbability float64 + resetFalsePositiveProbability float64 + + metrics *bloom.Metrics + + l sync.RWMutex + maxCount int + bloom *bloom.Filter + // salt is provided to eventually unblock collisions in Bloom. It's possible + // that conflicting Gossipable items collide in the bloom filter, so a salt + // is generated to eventually resolve collisions. + salt ids.ID +} + +func (s *SetWithBloomFilter[T]) Add(v T) error { + if err := s.set.Add(v); err != nil { + return err + } + return s.AddToBloom(v.GossipID()) +} + +// AddToBloom adds the provided ID to the bloom filter. This function is exposed +// to allow modifications to the inner set without going through Add. +// +// Even if an error is returned, the ID has still been added to the bloom +// filter. +func (s *SetWithBloomFilter[T]) AddToBloom(h ids.ID) error { + s.l.RLock() + bloom.Add(s.bloom, h[:], s.salt[:]) + shouldReset := s.shouldReset() + s.l.RUnlock() + s.metrics.Count.Inc() + + if !shouldReset { + return nil + } + + s.l.Lock() + defer s.l.Unlock() + + // Bloom filter was already reset by another thread + if !s.shouldReset() { + return nil + } + return s.resetBloomFilter() +} + +func (s *SetWithBloomFilter[T]) shouldReset() bool { + return s.bloom.Count() > s.maxCount +} + +// resetBloomFilter attempts to generate a new bloom filter and fill it with the +// current entries in the set. +// +// If an error is returned, the bloom filter and salt are unchanged. +func (s *SetWithBloomFilter[T]) resetBloomFilter() error { + targetElements := max(2*s.set.Len(), s.minTargetElements) + numHashes, numEntries := bloom.OptimalParameters( + targetElements, + s.targetFalsePositiveProbability, + ) + newBloom, err := bloom.New(numHashes, numEntries) + if err != nil { + return fmt.Errorf("creating new bloom: %w", err) + } + var newSalt ids.ID + if _, err := rand.Read(newSalt[:]); err != nil { + return fmt.Errorf("generating new salt: %w", err) + } + s.set.Iterate(func(v T) bool { + h := v.GossipID() + bloom.Add(newBloom, h[:], newSalt[:]) + return true + }) + + s.maxCount = bloom.EstimateCount(numHashes, numEntries, s.resetFalsePositiveProbability) + s.bloom = newBloom + s.salt = newSalt + + s.metrics.Reset(newBloom, s.maxCount) + return nil +} + +func (s *SetWithBloomFilter[T]) Has(h ids.ID) bool { + return s.set.Has(h) +} + +func (s *SetWithBloomFilter[T]) Iterate(f func(T) bool) { + s.set.Iterate(f) +} + +func (s *SetWithBloomFilter[T]) Len() int { + return s.set.Len() +} + +func (s *SetWithBloomFilter[_]) BloomFilter() (*bloom.Filter, ids.ID) { + s.l.RLock() + defer s.l.RUnlock() + + return s.bloom, s.salt +} diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go new file mode 100644 index 000000000000..add3436ccfc2 --- /dev/null +++ b/network/p2p/gossip/set_test.go @@ -0,0 +1,130 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "testing" + + "github.com/ava-labs/avalanchego/utils/bloom" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestSetWithBloomFilter_Refresh(t *testing.T) { + type ( + op struct { + add *tx + remove *tx + } + test struct { + name string + resetFalsePositiveProbability float64 + ops []op + expected []tx + expectedResetCount float64 + } + ) + tests := []test{ + { + name: "no refresh", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + }, + expected: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "no refresh - with removals", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + {remove: &tx{0}}, + {remove: &tx{1}}, + {remove: &tx{2}}, + }, + expected: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + }, + expected: []tx{ + {1}, + }, + expectedResetCount: 1, + }, + { + name: "multiple refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + {remove: &tx{1}}, + {add: &tx{2}}, // reset + }, + expected: []tx{ + {2}, + }, + expectedResetCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + const ( + minTargetElements = 1 + targetFalsePositiveProbability = 0.0001 + ) + var s setDouble + bs, err := NewSetWithBloomFilter( + &s, + prometheus.NewRegistry(), + "", + minTargetElements, + targetFalsePositiveProbability, + tt.resetFalsePositiveProbability, + ) + require.NoError(err) + + for _, op := range tt.ops { + if op.add != nil { + require.NoError(bs.Add(*op.add)) + } + if op.remove != nil { + s.txs.Remove(*op.remove) + } + } + + // Add one to expectedResetCount to account for the initial creation + // of the bloom filter. + require.Equal(tt.expectedResetCount+1, testutil.ToFloat64(bs.metrics.ResetCount)) + b, h := bs.BloomFilter() + for _, expected := range tt.expected { + require.True(bloom.Contains(b, expected[:], h[:])) + } + }) + } +} diff --git a/utils/bloom/filter.go b/utils/bloom/filter.go index d139caa52d69..487db11edbe6 100644 --- a/utils/bloom/filter.go +++ b/utils/bloom/filter.go @@ -59,19 +59,29 @@ func New(numHashes, numEntries int) (*Filter, error) { }, nil } -func (f *Filter) Add(hash uint64) { +// Add adds the provided hash to the bloom filter. It returns true if the hash +// was not already present in the bloom filter. +func (f *Filter) Add(hash uint64) bool { f.lock.Lock() defer f.lock.Unlock() - _ = 1 % f.numBits // hint to the compiler that numBits is not 0 + var ( + _ = 1 % f.numBits // hint to the compiler that numBits is not 0 + accumulator byte = 1 + ) for _, seed := range f.hashSeeds { hash = bits.RotateLeft64(hash, hashRotation) ^ seed index := hash % f.numBits byteIndex := index / bitsPerByte bitIndex := index % bitsPerByte + accumulator &= f.entries[byteIndex] >> bitIndex f.entries[byteIndex] |= 1 << bitIndex } - f.count++ + added := accumulator == 0 + if added { + f.count++ + } + return added } // Count returns the number of elements that have been added to the bloom diff --git a/utils/bloom/filter_test.go b/utils/bloom/filter_test.go index f8816e2cb89d..44937f66b549 100644 --- a/utils/bloom/filter_test.go +++ b/utils/bloom/filter_test.go @@ -42,6 +42,19 @@ func TestNewErrors(t *testing.T) { } } +func TestAdd(t *testing.T) { + require := require.New(t) + + initialNumHashes, initialNumBytes := OptimalParameters(1024, 0.01) + filter, err := New(initialNumHashes, initialNumBytes) + require.NoError(err) + + require.True(filter.Add(1)) + require.Equal(1, filter.Count()) + require.False(filter.Add(1)) + require.Equal(1, filter.Count()) +} + func TestNormalUsage(t *testing.T) { require := require.New(t) @@ -61,7 +74,7 @@ func TestNormalUsage(t *testing.T) { } } - require.Equal(len(toAdd), filter.Count()) + require.LessOrEqual(filter.Count(), len(toAdd)) filterBytes := filter.Marshal() parsedFilter, err := Parse(filterBytes) diff --git a/utils/bloom/hasher.go b/utils/bloom/hasher.go index 7b6441472c92..fca81a372b14 100644 --- a/utils/bloom/hasher.go +++ b/utils/bloom/hasher.go @@ -8,8 +8,8 @@ import ( "encoding/binary" ) -func Add(f *Filter, key, salt []byte) { - f.Add(Hash(key, salt)) +func Add(f *Filter, key, salt []byte) bool { + return f.Add(Hash(key, salt)) } func Contains(c Checker, key, salt []byte) bool { diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index fb2077449dae..e9d92736dc15 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -6,23 +6,19 @@ package network import ( "context" "fmt" - "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" - "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/txs/mempool" ) var ( _ p2p.Handler = (*txGossipHandler)(nil) - _ gossip.Set[*txs.Tx] = (*gossipMempool)(nil) + _ gossip.Set[*txs.Tx] = (*mempoolWithVerification)(nil) _ gossip.Marshaller[*txs.Tx] = (*txParser)(nil) ) @@ -66,38 +62,26 @@ func (g *txParser) UnmarshalGossip(bytes []byte) (*txs.Tx, error) { return g.parser.ParseTx(bytes) } -func newGossipMempool( +func newMempoolWithVerification( mempool mempool.Mempool[*txs.Tx], - registerer prometheus.Registerer, - log logging.Logger, txVerifier TxVerifier, - minTargetElements int, - targetFalsePositiveProbability, - resetFalsePositiveProbability float64, -) (*gossipMempool, error) { - bloom, err := gossip.NewBloomFilter(registerer, "mempool_bloom_filter", minTargetElements, targetFalsePositiveProbability, resetFalsePositiveProbability) - return &gossipMempool{ +) *mempoolWithVerification { + return &mempoolWithVerification{ Mempool: mempool, - log: log, txVerifier: txVerifier, - bloom: bloom, - }, err + } } -type gossipMempool struct { +type mempoolWithVerification struct { mempool.Mempool[*txs.Tx] - log logging.Logger txVerifier TxVerifier - - lock sync.RWMutex - bloom *gossip.BloomFilter } // Add is called by the p2p SDK when handling transactions that were pushed to // us and when handling transactions that were pulled from a peer. If this // returns a nil error while handling push gossip, the p2p SDK will queue the // transaction to push gossip as well. -func (g *gossipMempool) Add(tx *txs.Tx) error { +func (g *mempoolWithVerification) Add(tx *txs.Tx) error { txID := tx.ID() if _, ok := g.Mempool.Get(txID); ok { return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) @@ -120,43 +104,15 @@ func (g *gossipMempool) Add(tx *txs.Tx) error { return g.AddWithoutVerification(tx) } -func (g *gossipMempool) Has(txID ids.ID) bool { +func (g *mempoolWithVerification) Has(txID ids.ID) bool { _, ok := g.Mempool.Get(txID) return ok } -func (g *gossipMempool) AddWithoutVerification(tx *txs.Tx) error { +func (g *mempoolWithVerification) AddWithoutVerification(tx *txs.Tx) error { if err := g.Mempool.Add(tx); err != nil { g.Mempool.MarkDropped(tx.ID(), err) return err } - - g.lock.Lock() - defer g.lock.Unlock() - - g.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier) - if err != nil { - return err - } - - if reset { - g.log.Debug("resetting bloom filter") - g.Mempool.Iterate(func(tx *txs.Tx) bool { - g.bloom.Add(tx) - return true - }) - } return nil } - -func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) { - g.Mempool.Iterate(f) -} - -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { - g.lock.RLock() - defer g.lock.RUnlock() - - return g.bloom.Marshal() -} diff --git a/vms/avm/network/gossip_test.go b/vms/avm/network/gossip_test.go index 3059e22304e3..2c782f5fccf7 100644 --- a/vms/avm/network/gossip_test.go +++ b/vms/avm/network/gossip_test.go @@ -4,13 +4,13 @@ package network import ( + "errors" "testing" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm/fxs" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/avm/txs/mempool" @@ -53,7 +53,7 @@ func TestMarshaller(t *testing.T) { require.Equal(want.GossipID(), got.GossipID()) } -func TestGossipMempoolAdd(t *testing.T) { +func TestMempoolWithVerificationAdd(t *testing.T) { require := require.New(t) metrics := prometheus.NewRegistry() @@ -61,17 +61,7 @@ func TestGossipMempoolAdd(t *testing.T) { baseMempool, err := mempool.New("", metrics) require.NoError(err) - mempool, err := newGossipMempool( - baseMempool, - metrics, - logging.NoLog{}, - testVerifier{}, - DefaultConfig.ExpectedBloomFilterElements, - DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, - DefaultConfig.MaxBloomFilterFalsePositiveProbability, - ) - require.NoError(err) - + mempool := newMempoolWithVerification(baseMempool, testVerifier{}) tx := &txs.Tx{ Unsigned: &txs.BaseTx{ BaseTx: avax.BaseTx{ @@ -82,10 +72,10 @@ func TestGossipMempoolAdd(t *testing.T) { } require.NoError(mempool.Add(tx)) - require.True(mempool.bloom.Has(tx)) + require.True(mempool.Has(tx.ID())) } -func TestGossipMempoolAddVerified(t *testing.T) { +func TestMempoolWithVerificationAddWithoutVerification(t *testing.T) { require := require.New(t) metrics := prometheus.NewRegistry() @@ -93,19 +83,9 @@ func TestGossipMempoolAddVerified(t *testing.T) { baseMempool, err := mempool.New("", metrics) require.NoError(err) - mempool, err := newGossipMempool( - baseMempool, - metrics, - logging.NoLog{}, - testVerifier{ - err: errTest, // We shouldn't be attempting to verify the tx in this flow - }, - DefaultConfig.ExpectedBloomFilterElements, - DefaultConfig.ExpectedBloomFilterFalsePositiveProbability, - DefaultConfig.MaxBloomFilterFalsePositiveProbability, - ) - require.NoError(err) - + mempool := newMempoolWithVerification(baseMempool, testVerifier{ + err: errors.New("verification failed"), + }) tx := &txs.Tx{ Unsigned: &txs.BaseTx{ BaseTx: avax.BaseTx{ @@ -116,5 +96,5 @@ func TestGossipMempoolAddVerified(t *testing.T) { } require.NoError(mempool.AddWithoutVerification(tx)) - require.True(mempool.bloom.Has(tx)) + require.True(mempool.Has(tx.ID())) } diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index 45a873acb60e..90ada1db58cc 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" @@ -28,8 +29,10 @@ var ( type Network struct { *p2p.Network - log logging.Logger - mempool *gossipMempool + log logging.Logger + + mempoolWithVerification *mempoolWithVerification + set *gossip.SetWithBloomFilter[*txs.Tx] txPushGossiper *gossip.PushGossiper[*txs.Tx] txPushGossipFrequency time.Duration @@ -76,11 +79,11 @@ func New( return nil, err } - gossipMempool, err := newGossipMempool( - mempool, + mempoolWithVerification := newMempoolWithVerification(mempool, txVerifier) + set, err := gossip.NewSetWithBloomFilter( + mempoolWithVerification, registerer, - log, - txVerifier, + "mempool_bloom_filter", config.ExpectedBloomFilterElements, config.ExpectedBloomFilterFalsePositiveProbability, config.MaxBloomFilterFalsePositiveProbability, @@ -91,7 +94,7 @@ func New( txPushGossiper, err := gossip.NewPushGossiper[*txs.Tx]( marshaller, - gossipMempool, + set, validators, txGossipClient, txGossipMetrics, @@ -115,7 +118,7 @@ func New( var txPullGossiper gossip.Gossiper = gossip.NewPullGossiper[*txs.Tx]( log, marshaller, - gossipMempool, + set, txGossipClient, txGossipMetrics, config.PullGossipPollSize, @@ -131,7 +134,7 @@ func New( handler := gossip.NewHandler[*txs.Tx]( log, marshaller, - gossipMempool, + set, txGossipMetrics, config.TargetGossipSize, ) @@ -167,13 +170,14 @@ func New( } return &Network{ - Network: p2pNetwork, - log: log, - mempool: gossipMempool, - txPushGossiper: txPushGossiper, - txPushGossipFrequency: config.PushGossipFrequency, - txPullGossiper: txPullGossiper, - txPullGossipFrequency: config.PullGossipFrequency, + Network: p2pNetwork, + log: log, + mempoolWithVerification: mempoolWithVerification, + set: set, + txPushGossiper: txPushGossiper, + txPushGossipFrequency: config.PushGossipFrequency, + txPullGossiper: txPullGossiper, + txPullGossipFrequency: config.PullGossipFrequency, }, nil } @@ -193,7 +197,7 @@ func (n *Network) PullGossip(ctx context.Context) { // returned. // If the tx is not added to the mempool, an error will be returned. func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { - if err := n.mempool.Add(tx); err != nil { + if err := n.set.Add(tx); err != nil { return err } n.txPushGossiper.Add(tx) @@ -208,9 +212,14 @@ func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { // returned. // If the tx is not added to the mempool, an error will be returned. func (n *Network) IssueTxFromRPCWithoutVerification(tx *txs.Tx) error { - if err := n.mempool.AddWithoutVerification(tx); err != nil { + if err := n.mempoolWithVerification.AddWithoutVerification(tx); err != nil { return err } + if err := n.set.AddToBloom(tx.ID()); err != nil { + n.log.Warn("adding tx to bloom filter", + zap.Error(err), + ) + } n.txPushGossiper.Add(tx) return nil } diff --git a/vms/evm/emulate/emulate.go b/vms/evm/emulate/emulate.go deleted file mode 100644 index bcd3a526f263..000000000000 --- a/vms/evm/emulate/emulate.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -// Package emulate provides temporary emulation of coreth (C-Chain) and -// subnet-evm (EVM L1) behaviours. All functions are safe for concurrent use -// with each other, but all hold the same mutex so their execution SHOULD be -// short-lived. -package emulate - -import ( - cchain "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm" - subnet "github.com/ava-labs/subnet-evm/plugin/evm" -) - -// CChain executes `fn` as if running in a `coreth` node. -func CChain(fn func() error) error { - return cchain.WithTempRegisteredLibEVMExtras(fn) -} - -// SubnetEVM executes `fn` as if running in a `subnet-evm` node. -func SubnetEVM(fn func() error) error { - return subnet.WithTempRegisteredLibEVMExtras(fn) -} - -// CChainVal executes `fn` as if running in a `coreth` node. -func CChainVal[T any](fn func() (T, error)) (T, error) { - return val(CChain, fn) -} - -// SubnetEVMVal executes `fn` as if running in a `subnet-evm` node. -func SubnetEVMVal[T any](fn func() (T, error)) (T, error) { - return val(SubnetEVM, fn) -} - -func val[T any]( - wrap func(func() error) error, - fn func() (T, error), -) (T, error) { - var v T - err := wrap(func() error { - var err error - v, err = fn() - return err - }) - return v, err -} diff --git a/vms/evm/emulate/emulate_test.go b/vms/evm/emulate/emulate_test.go deleted file mode 100644 index a3f0e9d080d3..000000000000 --- a/vms/evm/emulate/emulate_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package emulate - -import ( - "errors" - "sync/atomic" - "testing" - - "github.com/ava-labs/libevm/core/types" - "github.com/stretchr/testify/require" - - cchain "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/customtypes" - subnet "github.com/ava-labs/subnet-evm/plugin/evm/customtypes" -) - -// setAndGetMillis is an arbitrary function that can be run if and only if -// emulating either `coreth` or `subnet-evm`. If the respective emulation isn't -// active then it will cause `libevm` to panic. In addition to the panicking -// behaviour, it asserts that it is the only active emulation. -func setAndGetMillis[T *cchain.HeaderExtra | *subnet.HeaderExtra]( - t *testing.T, - active *atomic.Int64, - withExtra func(*types.Header, T) *types.Header, - extra T, - blockMillis func(*types.Block) *uint64, - retErr error, -) func() (*uint64, error) { - return func() (*uint64, error) { - require.True(t, active.CompareAndSwap(0, 1)) - defer func() { - require.True(t, active.CompareAndSwap(1, 0)) - }() - - b := types.NewBlockWithHeader(withExtra( - &types.Header{}, - extra, - )) - return blockMillis(b), retErr - } -} - -func TestEmulation(t *testing.T) { - const milli = uint64(1234) - newUint64 := func(u uint64) *uint64 { return &u } - sentinel := errors.New("uh oh") - - var active atomic.Int64 - onCChain := setAndGetMillis( - t, &active, - cchain.WithHeaderExtra, - &cchain.HeaderExtra{TimeMilliseconds: newUint64(milli)}, - cchain.BlockTimeMilliseconds, - sentinel, - ) - onSubnetEVM := setAndGetMillis( - t, &active, - subnet.WithHeaderExtra, - &subnet.HeaderExtra{TimeMilliseconds: newUint64(milli)}, - subnet.BlockTimeMilliseconds, - sentinel, - ) - - start := make(chan struct{}) - - t.Run("coreth", func(t *testing.T) { - t.Parallel() - <-start - for range 1000 { - got, err := CChainVal(onCChain) - require.ErrorIs(t, err, sentinel) - require.Equal(t, milli, *got) - } - }) - - t.Run("subnet-evm", func(t *testing.T) { - t.Parallel() - <-start - for range 1000 { - got, err := SubnetEVMVal(onSubnetEVM) - require.ErrorIs(t, err, sentinel) - require.Equal(t, milli, *got) - } - }) - - close(start) -} diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index 006c1740c57f..9e37fa9a657a 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -6,10 +6,8 @@ package network import ( "context" "fmt" - "sync" "time" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" @@ -27,6 +25,7 @@ var ( _ p2p.Handler = (*txGossipHandler)(nil) _ gossip.Marshaller[*txs.Tx] = (*txMarshaller)(nil) _ gossip.Gossipable = (*txs.Tx)(nil) + _ gossip.Set[*txs.Tx] = (*mempoolWithVerification)(nil) ) // bloomChurnMultiplier is the number used to multiply the size of the mempool @@ -67,34 +66,25 @@ func (txMarshaller) UnmarshalGossip(bytes []byte) (*txs.Tx, error) { return txs.Parse(txs.Codec, bytes) } -func newGossipMempool( +func newMempoolWithVerification( mempool *mempool.Mempool, - registerer prometheus.Registerer, log logging.Logger, txVerifier TxVerifier, - minTargetElements int, - targetFalsePositiveProbability, - resetFalsePositiveProbability float64, -) (*gossipMempool, error) { - bloom, err := gossip.NewBloomFilter(registerer, "mempool_bloom_filter", minTargetElements, targetFalsePositiveProbability, resetFalsePositiveProbability) - return &gossipMempool{ +) *mempoolWithVerification { + return &mempoolWithVerification{ Mempool: mempool, log: log, txVerifier: txVerifier, - bloom: bloom, - }, err + } } -type gossipMempool struct { +type mempoolWithVerification struct { *mempool.Mempool log logging.Logger txVerifier TxVerifier - - lock sync.RWMutex - bloom *gossip.BloomFilter } -func (g *gossipMempool) Add(tx *txs.Tx) error { +func (g *mempoolWithVerification) Add(tx *txs.Tx) error { txID := tx.ID() if _, ok := g.Mempool.Get(txID); ok { return fmt.Errorf("tx %s dropped: %w", txID, txmempool.ErrDuplicateTx) @@ -122,37 +112,10 @@ func (g *gossipMempool) Add(tx *txs.Tx) error { g.Mempool.MarkDropped(txID, err) return err } - - g.lock.Lock() - defer g.lock.Unlock() - - g.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded( - g.bloom, - g.Mempool.Len()*bloomChurnMultiplier, - ) - if err != nil { - return err - } - - if reset { - g.log.Debug("resetting bloom filter") - g.Mempool.Iterate(func(tx *txs.Tx) bool { - g.bloom.Add(tx) - return true - }) - } return nil } -func (g *gossipMempool) Has(txID ids.ID) bool { +func (g *mempoolWithVerification) Has(txID ids.ID) bool { _, ok := g.Mempool.Get(txID) return ok } - -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { - g.lock.RLock() - defer g.lock.RUnlock() - - return g.bloom.Marshal() -} diff --git a/vms/platformvm/network/gossip_test.go b/vms/platformvm/network/gossip_test.go index 87121f26961b..28abf6d7f5b8 100644 --- a/vms/platformvm/network/gossip_test.go +++ b/vms/platformvm/network/gossip_test.go @@ -25,7 +25,7 @@ import ( var errFoo = errors.New("foo") // Add should error if verification errors -func TestGossipMempoolAddVerificationError(t *testing.T) { +func TestMempoolWithVerificationAddVerificationError(t *testing.T) { require := require.New(t) txID := ids.GenerateTestID() @@ -43,24 +43,18 @@ func TestGossipMempoolAddVerificationError(t *testing.T) { require.NoError(err) txVerifier := testTxVerifier{err: errFoo} - gossipMempool, err := newGossipMempool( + mempoolWithVerification := newMempoolWithVerification( mempool, - prometheus.NewRegistry(), logging.NoLog{}, txVerifier, - testConfig.ExpectedBloomFilterElements, - testConfig.ExpectedBloomFilterFalsePositiveProbability, - testConfig.MaxBloomFilterFalsePositiveProbability, ) - require.NoError(err) - err = gossipMempool.Add(tx) - require.ErrorIs(err, errFoo) - require.False(gossipMempool.bloom.Has(tx)) + require.ErrorIs(mempoolWithVerification.Add(tx), errFoo) + require.ErrorIs(mempoolWithVerification.GetDropReason(txID), errFoo) } // Adding a duplicate to the mempool should return an error -func TestMempoolDuplicate(t *testing.T) { +func TestMempoolWithVerificationAddDuplicate(t *testing.T) { require := require.New(t) testMempool, err := pmempool.New( @@ -96,69 +90,10 @@ func TestMempoolDuplicate(t *testing.T) { } require.NoError(testMempool.Add(tx)) - gossipMempool, err := newGossipMempool( + mempoolWithVerification := newMempoolWithVerification( testMempool, - prometheus.NewRegistry(), - logging.NoLog{}, - txVerifier, - testConfig.ExpectedBloomFilterElements, - testConfig.ExpectedBloomFilterFalsePositiveProbability, - testConfig.MaxBloomFilterFalsePositiveProbability, - ) - require.NoError(err) - - err = gossipMempool.Add(tx) - require.ErrorIs(err, mempool.ErrDuplicateTx) - require.False(gossipMempool.bloom.Has(tx)) -} - -// Adding a tx to the mempool should add it to the bloom filter -func TestGossipAddBloomFilter(t *testing.T) { - require := require.New(t) - - txID := ids.GenerateTestID() - tx := &txs.Tx{ - Unsigned: &txs.BaseTx{ - BaseTx: avax.BaseTx{ - Ins: []*avax.TransferableInput{ - { - UTXOID: avax.UTXOID{ - TxID: ids.GenerateTestID(), - }, - Asset: avax.Asset{ - ID: snowtest.AVAXAssetID, - }, - In: &secp256k1fx.TransferInput{ - Amt: 1, - }, - }, - }, - }, - }, - TxID: txID, - } - - txVerifier := testTxVerifier{} - mempool, err := pmempool.New( - "", - gas.Dimensions{1, 1, 1, 1}, - 1_000_000, - snowtest.AVAXAssetID, - prometheus.NewRegistry(), - ) - require.NoError(err) - - gossipMempool, err := newGossipMempool( - mempool, - prometheus.NewRegistry(), logging.NoLog{}, txVerifier, - testConfig.ExpectedBloomFilterElements, - testConfig.ExpectedBloomFilterFalsePositiveProbability, - testConfig.MaxBloomFilterFalsePositiveProbability, ) - require.NoError(err) - - require.NoError(gossipMempool.Add(tx)) - require.True(gossipMempool.bloom.Has(tx)) + require.ErrorIs(mempoolWithVerification.Add(tx), mempool.ErrDuplicateTx) } diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index c93c8f009aed..3f03e99b6cb4 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -30,7 +30,7 @@ type Network struct { *p2p.Network log logging.Logger - mempool *gossipMempool + gossipMempool *gossip.SetWithBloomFilter[*txs.Tx] partialSyncPrimaryNetwork bool txPushGossiper *gossip.PushGossiper[*txs.Tx] @@ -81,11 +81,16 @@ func New( return nil, err } - gossipMempool, err := newGossipMempool( + mempoolWithVerification := newMempoolWithVerification( mempool, - registerer, log, txVerifier, + ) + + gossipMempool, err := gossip.NewSetWithBloomFilter( + mempoolWithVerification, + registerer, + "mempool_bloom_filter", config.ExpectedBloomFilterElements, config.ExpectedBloomFilterFalsePositiveProbability, config.MaxBloomFilterFalsePositiveProbability, @@ -185,7 +190,7 @@ func New( return &Network{ Network: p2pNetwork, log: log, - mempool: gossipMempool, + gossipMempool: gossipMempool, partialSyncPrimaryNetwork: partialSyncPrimaryNetwork, txPushGossiper: txPushGossiper, txPushGossipFrequency: config.PushGossipFrequency, @@ -221,7 +226,7 @@ func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msgBytes []b } func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { - if err := n.mempool.Add(tx); err != nil { + if err := n.gossipMempool.Add(tx); err != nil { return err } n.txPushGossiper.Add(tx)