diff --git a/go.mod b/go.mod index a151fc149145..3bff67f5e91b 100644 --- a/go.mod +++ b/go.mod @@ -21,9 +21,9 @@ require ( github.com/DataDog/zstd v1.5.2 github.com/StephenButtolph/canoto v0.17.3 github.com/antithesishq/antithesis-sdk-go v0.3.8 - github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201152440-a263a7cedcd0 + github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201173339-98b2978e465a github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 - github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d + github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cespare/xxhash/v2 v2.3.0 github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 diff --git a/go.sum b/go.sum index c0068716ca84..9453b5e0cad6 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 h1:hQ15IJxY7WO github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2/go.mod h1:DqSotSn4Dx/UJV+d3svfW8raR+cH7+Ohl9BpsQ5HlGU= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19 h1:S6oFasZsplNmw8B2S8cMJQMa62nT5ZKGzZRdCpd+5qQ= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d h1:7pjEE0BXLjzQlq5uKP5B2BTl9jTgDKaOfJx2Qfb01Jo= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d/go.mod h1:JTvIe8YbCjHpy8vy9uZBSpDXxawNXSnIe/Wlf3I09Tk= +github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d h1:IlhCuTqhPEfpW+q/8ZlhmjflB/Onn9AhtXuRCRYa+oo= +github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d/go.mod h1:Hvl0SeW3Y/ZUgVQrfjzumterrF5T898YtkhDguq+pQA= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool.go b/graft/coreth/plugin/evm/atomic/txpool/mempool.go index a28cbdd6f8bd..3674aaa86e2d 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool.go @@ -15,10 +15,13 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/bloom" ) var ( - _ gossip.Set[*atomic.Tx] = (*Mempool)(nil) + _ gossip.HandlerSet[*atomic.Tx] = (*Mempool)(nil) + _ gossip.PullGossiperSet[*atomic.Tx] = (*Mempool)(nil) + _ gossip.PushGossiperSet = (*Mempool)(nil) ErrAlreadyKnown = errors.New("already known") ErrConflict = errors.New("conflict present") @@ -298,9 +301,9 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error { return nil } -func (m *Mempool) GetFilter() ([]byte, []byte) { +func (m *Mempool) BloomFilter() (*bloom.Filter, ids.ID) { m.lock.RLock() defer m.lock.RUnlock() - return m.bloom.Marshal() + return m.bloom.BloomFilter() } diff --git a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go index 05569bb87738..a17e6294e6e1 100644 --- a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go @@ -16,7 +16,6 @@ import ( "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest" "github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest" "github.com/ava-labs/avalanchego/ids" @@ -27,6 +26,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -113,31 +113,20 @@ func TestAtomicTxGossip(t *testing.T) { } // Ask the VM for any new transactions. We should get nothing at first. - emptyBloomFilter, err := gossip.NewBloomFilter( - prometheus.NewRegistry(), - "", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, + requestBytes, err := gossip.MarshalAppRequest( + bloom.EmptyFilter.Marshal(), + agoUtils.RandomBytes(32), ) require.NoError(err) - emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal() - request := &sdk.PullGossipRequest{ - Filter: emptyBloomFilterBytes, - Salt: agoUtils.RandomBytes(32), - } - - requestBytes, err := proto.Marshal(request) - require.NoError(err) wg := &sync.WaitGroup{} wg.Add(1) onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Empty(response.Gossip) + responseGossip, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) + require.Empty(responseGossip) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse)) @@ -166,18 +155,17 @@ func TestAtomicTxGossip(t *testing.T) { // Ask the VM for new transactions. We should get the newly issued tx. wg.Add(1) - marshaller := atomic.TxMarshaller{} onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Len(response.Gossip, 1) - - gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0]) + responseGossip, err := gossip.ParseAppResponse(responseBytes) require.NoError(err) - require.Equal(tx.ID(), gotTx.GossipID()) - + require.Equal( + [][]byte{ + tx.SignedBytes(), + }, + responseGossip, + ) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse)) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index b6129670e012..438b15d2a4cd 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -21,6 +21,7 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/bloom" ethcommon "github.com/ava-labs/libevm/common" ) @@ -28,9 +29,11 @@ import ( const pendingTxsBuffer = 10 var ( - _ gossip.Gossipable = (*GossipEthTx)(nil) - _ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil) - _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.Gossipable = (*GossipEthTx)(nil) + _ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil) + _ gossip.HandlerSet[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.PullGossiperSet[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.PushGossiperSet = (*GossipEthTxPool)(nil) _ eth.PushGossiper = (*EthPushGossiper)(nil) ) @@ -132,11 +135,11 @@ func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) { }) } -func (g *GossipEthTxPool) GetFilter() ([]byte, []byte) { +func (g *GossipEthTxPool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() } type GossipEthTxMarshaller struct{} diff --git a/graft/coreth/plugin/evm/gossip/handler.go b/graft/coreth/plugin/evm/gossip/handler.go index 69c376957e1a..99529ddea120 100644 --- a/graft/coreth/plugin/evm/gossip/handler.go +++ b/graft/coreth/plugin/evm/gossip/handler.go @@ -22,7 +22,7 @@ var _ p2p.Handler = (*txGossipHandler)(nil) func NewTxGossipHandler[T gossip.Gossipable]( log logging.Logger, marshaller gossip.Marshaller[T], - mempool gossip.Set[T], + mempool gossip.HandlerSet[T], metrics gossip.Metrics, maxMessageSize int, throttlingPeriod time.Duration, diff --git a/graft/coreth/plugin/evm/tx_gossip_test.go b/graft/coreth/plugin/evm/tx_gossip_test.go index 7f08d501969f..5b4159eeb708 100644 --- a/graft/coreth/plugin/evm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/tx_gossip_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/database/memdb" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/upgrade/ap0" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest" "github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest" @@ -29,6 +28,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -107,31 +107,20 @@ func TestEthTxGossip(t *testing.T) { } // Ask the VM for any new transactions. We should get nothing at first. - emptyBloomFilter, err := gossip.NewBloomFilter( - prometheus.NewRegistry(), - "", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, + requestBytes, err := gossip.MarshalAppRequest( + bloom.EmptyFilter.Marshal(), + agoUtils.RandomBytes(32), ) require.NoError(err) - emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal() - request := &sdk.PullGossipRequest{ - Filter: emptyBloomFilterBytes, - Salt: agoUtils.RandomBytes(32), - } - - requestBytes, err := proto.Marshal(request) - require.NoError(err) wg := &sync.WaitGroup{} wg.Add(1) onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Empty(response.Gossip) + response, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) + require.Empty(response) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) @@ -151,19 +140,22 @@ func TestEthTxGossip(t *testing.T) { // wait so we aren't throttled by the vm time.Sleep(5 * time.Second) - marshaller := GossipEthTxMarshaller{} // Ask the VM for new transactions. We should get the newly issued tx. wg.Add(1) onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Len(response.Gossip, 1) + response, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) - gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0]) + txBytes, err := signedTx.MarshalBinary() require.NoError(err) - require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) + require.Equal( + [][]byte{ + txBytes, + }, + response, + ) wg.Done() } diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index c46e0776215c..341284f257ca 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -18,6 +18,8 @@ import ( // // Invariant: The returned bloom filter is not safe to reset concurrently with // other operations. However, it is otherwise safe to access concurrently. +// +// Deprecated: [BloomSet] should be used to manage bloom filters. func NewBloomFilter( registerer prometheus.Registerer, namespace string, @@ -45,6 +47,7 @@ func NewBloomFilter( return filter, err } +// Deprecated: [BloomSet] should be used to manage bloom filters. type BloomFilter struct { minTargetElements int targetFalsePositiveProbability float64 @@ -72,12 +75,8 @@ func (b *BloomFilter) Has(gossipable Gossipable) bool { return bloom.Contains(b.bloom, h[:], b.salt[:]) } -func (b *BloomFilter) Marshal() ([]byte, []byte) { - bloomBytes := b.bloom.Marshal() - // salt must be copied here to ensure the bytes aren't overwritten if salt - // is later modified. - salt := b.salt - return bloomBytes, salt[:] +func (b *BloomFilter) BloomFilter() (*bloom.Filter, ids.ID) { + return b.bloom, b.salt } // ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability]. @@ -86,6 +85,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) { // the same [targetFalsePositiveProbability]. // // Returns true if the bloom filter was reset. +// +// Deprecated: [BloomSet] should be used to manage bloom filters. func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 0649972d4afc..03485e3d6024 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -81,19 +81,22 @@ func TestBloomFilterRefresh(t *testing.T) { var resetCount uint64 for _, item := range tt.add { - bloomBytes, saltBytes := bloom.Marshal() - initialBloomBytes := slices.Clone(bloomBytes) - initialSaltBytes := slices.Clone(saltBytes) + bloomFilter, salt := bloom.BloomFilter() + initialBloomBytes := slices.Clone(bloomFilter.Marshal()) + initialSaltBytes := slices.Clone(salt[:]) reset, err := ResetBloomFilterIfNeeded(bloom, len(tt.add)) require.NoError(err) if reset { resetCount++ } - bloom.Add(item) + require.Equal(initialBloomBytes, bloomFilter.Marshal()) + require.Equal(initialSaltBytes, salt[:]) - require.Equal(initialBloomBytes, bloomBytes) - require.Equal(initialSaltBytes, saltBytes) + // If the bloom filter wasn't reset, adding an item may modify + // the returned bloom filter, so this must be done after the + // checks. + bloom.Add(item) } require.Equal(tt.resetCount, resetCount) diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index c03e513854df..35867588a29f 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -18,6 +18,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/buffer" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -40,6 +41,7 @@ const ( var ( _ Gossiper = (*ValidatorGossiper)(nil) _ Gossiper = (*PullGossiper[Gossipable])(nil) + _ Gossiper = (*PushGossiper[Gossipable])(nil) ioTypeLabels = []string{ioLabel, typeLabel} sentPushLabels = prometheus.Labels{ @@ -75,6 +77,17 @@ var ( ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative") ) +// Gossipable is an item that can be gossiped across the network +type Gossipable interface { + GossipID() ids.ID +} + +// Marshaller handles parsing logic for a concrete Gossipable type +type Marshaller[T Gossipable] interface { + MarshalGossip(T) ([]byte, error) + UnmarshalGossip([]byte) (T, error) +} + // Gossiper gossips Gossipables to other nodes type Gossiper interface { // Gossip runs a cycle of gossip. Returns an error if we failed to gossip. @@ -191,7 +204,7 @@ func (v ValidatorGossiper) Gossip(ctx context.Context) error { func NewPullGossiper[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set PullGossiperSet[T], client *p2p.Client, metrics Metrics, pollSize int, @@ -206,17 +219,30 @@ func NewPullGossiper[T Gossipable]( } } +// PullGossiperSet exposes the current bloom filter and allows adding new items +// that were not included in the filter. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. +type PullGossiperSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // BloomFilter returns the bloom filter and its corresponding salt. + BloomFilter() (bloom *bloom.Filter, salt ids.ID) +} + type PullGossiper[T Gossipable] struct { log logging.Logger marshaller Marshaller[T] - set Set[T] + set PullGossiperSet[T] client *p2p.Client metrics Metrics pollSize int } func (p *PullGossiper[_]) Gossip(ctx context.Context) error { - msgBytes, err := MarshalAppRequest(p.set.GetFilter()) + bf, salt := p.set.BloomFilter() + msgBytes, err := MarshalAppRequest(bf.Marshal(), salt[:]) if err != nil { return err } @@ -293,7 +319,7 @@ func (p *PullGossiper[_]) handleResponse( // NewPushGossiper returns an instance of PushGossiper func NewPushGossiper[T Gossipable]( marshaller Marshaller[T], - mempool Set[T], + set PushGossiperSet, validators p2p.ValidatorSubset, client *p2p.Client, metrics Metrics, @@ -320,7 +346,7 @@ func NewPushGossiper[T Gossipable]( return &PushGossiper[T]{ marshaller: marshaller, - set: mempool, + set: set, validators: validators, client: client, metrics: metrics, @@ -336,10 +362,19 @@ func NewPushGossiper[T Gossipable]( }, nil } +// PushGossiperSet exposes whether hashes are still included in a set. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. +type PushGossiperSet interface { + // Has returns true if the hash is in the set. + Has(h ids.ID) bool +} + // PushGossiper broadcasts gossip to peers randomly in the network type PushGossiper[T Gossipable] struct { marshaller Marshaller[T] - set Set[T] + set PushGossiperSet validators p2p.ValidatorSubset client *p2p.Client metrics Metrics diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index cb8222ceff4f..deb060a01707 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -20,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" - "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -68,30 +67,44 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) { } type setDouble struct { + lock sync.RWMutex txs set.Set[tx] - bloom *BloomFilter onAdd func(tx tx) } -func (s *setDouble) Add(gossipable tx) error { - if s.txs.Contains(gossipable) { - return fmt.Errorf("%s already present", ids.ID(gossipable)) +func (s *setDouble) Add(t tx) error { + s.lock.Lock() + defer s.lock.Unlock() + + if s.txs.Contains(t) { + return fmt.Errorf("%s already present", t) } - s.txs.Add(gossipable) - s.bloom.Add(gossipable) + s.txs.Add(t) if s.onAdd != nil { - s.onAdd(gossipable) + s.onAdd(t) } - return nil } -func (s *setDouble) Has(gossipID ids.ID) bool { - return s.txs.Contains(tx(gossipID)) +func (s *setDouble) Remove(t tx) { + s.lock.Lock() + defer s.lock.Unlock() + + s.txs.Remove(t) +} + +func (s *setDouble) Has(h ids.ID) bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.txs.Contains(tx(h)) } -func (s *setDouble) Iterate(f func(gossipable tx) bool) { +func (s *setDouble) Iterate(f func(t tx) bool) { + s.lock.RLock() + defer s.lock.RUnlock() + for tx := range s.txs { if !f(tx) { return @@ -99,8 +112,11 @@ func (s *setDouble) Iterate(f func(gossipable tx) bool) { } } -func (s *setDouble) GetFilter() ([]byte, []byte) { - return s.bloom.Marshal() +func (s *setDouble) Len() int { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.txs.Len() } func TestGossiperGossip(t *testing.T) { @@ -175,13 +191,10 @@ func TestGossiperGossip(t *testing.T) { ) require.NoError(err) - responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + responseBloomSet, err := NewBloomSet(&setDouble{}, BloomSetConfig{}) require.NoError(err) - responseSet := &setDouble{ - bloom: responseBloom, - } for _, item := range tt.responder { - require.NoError(responseSet.Add(item)) + require.NoError(responseBloomSet.Add(item)) } metrics, err := NewMetrics(prometheus.NewRegistry(), "") @@ -196,7 +209,7 @@ func TestGossiperGossip(t *testing.T) { handler := NewHandler[tx]( logging.NoLog{}, marshaller, - responseSet, + responseBloomSet, metrics, tt.targetResponseSize, ) @@ -218,13 +231,11 @@ func TestGossiperGossip(t *testing.T) { require.NoError(err) require.NoError(requestNetwork.Connected(t.Context(), ids.EmptyNodeID, nil)) - bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + var requestSet setDouble + requestBloomSet, err := NewBloomSet(&requestSet, BloomSetConfig{}) require.NoError(err) - requestSet := &setDouble{ - bloom: bloom, - } for _, item := range tt.requester { - require.NoError(requestSet.Add(item)) + require.NoError(requestBloomSet.Add(item)) } requestClient := requestNetwork.NewClient( @@ -236,7 +247,7 @@ func TestGossiperGossip(t *testing.T) { gossiper := NewPullGossiper[tx]( logging.NoLog{}, marshaller, - requestSet, + requestBloomSet, requestClient, metrics, 1, @@ -457,20 +468,10 @@ func TestPushGossiperNew(t *testing.T) { } } -type fullSet[T Gossipable] struct{} - -func (fullSet[T]) Add(T) error { - return nil -} - -func (fullSet[T]) Has(ids.ID) bool { - return true -} - -func (fullSet[T]) Iterate(func(gossipable T) bool) {} +type hasFunc func(id ids.ID) bool -func (fullSet[_]) GetFilter() ([]byte, []byte) { - return bloom.FullFilter.Marshal(), ids.Empty[:] +func (h hasFunc) Has(id ids.ID) bool { + return h(id) } // Tests that the outgoing gossip is equivalent to what was accumulated @@ -593,7 +594,9 @@ func TestPushGossiper(t *testing.T) { gossiper, err := NewPushGossiper[tx]( marshaller, - fullSet[tx]{}, + hasFunc(func(ids.ID) bool { + return true // Never remove the items from the set + }), validators, client, metrics, diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go deleted file mode 100644 index b535609a7a48..000000000000 --- a/network/p2p/gossip/gossipable.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package gossip - -import "github.com/ava-labs/avalanchego/ids" - -// Gossipable is an item that can be gossiped across the network -type Gossipable interface { - GossipID() ids.ID -} - -// Marshaller handles parsing logic for a concrete Gossipable type -type Marshaller[T Gossipable] interface { - MarshalGossip(T) ([]byte, error) - UnmarshalGossip([]byte) (T, error) -} - -// Set holds a set of known Gossipable items -type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not - // added. - Add(gossipable T) error - // Has returns true if the gossipable is in the set. - Has(gossipID ids.ID) bool - // Iterate iterates over elements until [f] returns false - Iterate(f func(gossipable T) bool) - // GetFilter returns the byte representation of bloom filter and its - // corresponding salt. - GetFilter() (bloom []byte, salt []byte) -} diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 7c016030c913..de405fc40893 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -18,10 +18,22 @@ import ( var _ p2p.Handler = (*Handler[Gossipable])(nil) +// HandlerSet exposes the ability to add new values to the set in response to +// pushed information and for responding to pull requests. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. +type HandlerSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // Iterate iterates over elements until f returns false. + Iterate(f func(v T) bool) +} + func NewHandler[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set HandlerSet[T], metrics Metrics, targetResponseSize int, ) *Handler[T] { @@ -39,7 +51,7 @@ type Handler[T Gossipable] struct { p2p.Handler marshaller Marshaller[T] log logging.Logger - set Set[T] + set HandlerSet[T] metrics Metrics targetResponseSize int } diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go new file mode 100644 index 000000000000..b5b28a7f6ee9 --- /dev/null +++ b/network/p2p/gossip/set.go @@ -0,0 +1,191 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "crypto/rand" + "errors" + "fmt" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/bloom" +) + +const ( + DefaultMinTargetElements = 1000 + DefaultTargetFalsePositiveProbability = .01 + DefaultResetFalsePositiveProbability = .05 +) + +var ( + _ Set[Gossipable] = (*BloomSet[Gossipable])(nil) + _ PullGossiperSet[Gossipable] = (*BloomSet[Gossipable])(nil) + + ErrBloomReset = errors.New("bloom reset") +) + +type BloomSetConfig struct { + // Metrics allows exposing the current state of the bloom filter across + // additions and resets. If nil, no metrics are recorded. + Metrics *bloom.Metrics + // MinTargetElements is the minimum number of elements to target when + // creating a new bloom filter. If zero, [DefaultMinTargetElements] is used. + MinTargetElements int + // TargetFalsePositiveProbability is the target false positive probability + // when creating a new bloom filter. If zero, + // [DefaultTargetFalsePositiveProbability] is used. + TargetFalsePositiveProbability float64 + // ResetFalsePositiveProbability is the false positive probability at + // which the bloom filter is reset. If zero, + // [DefaultResetFalsePositiveProbability] is used. + ResetFalsePositiveProbability float64 +} + +func (c *BloomSetConfig) fillDefaults() { + if c.MinTargetElements == 0 { + c.MinTargetElements = DefaultMinTargetElements + } + if c.TargetFalsePositiveProbability == 0 { + c.TargetFalsePositiveProbability = DefaultTargetFalsePositiveProbability + } + if c.ResetFalsePositiveProbability == 0 { + c.ResetFalsePositiveProbability = DefaultResetFalsePositiveProbability + } +} + +type Set[T Gossipable] interface { + HandlerSet[T] + PushGossiperSet + // Len returns the number of items in the set. + // + // This value should match the number of items that can be iterated over + // with a call to Iterate. + Len() int +} + +// NewBloomSet wraps the [Set] with a bloom filter. It is expected for all +// future additions to the provided set to go through the returned [BloomSet]. +func NewBloomSet[T Gossipable]( + set Set[T], + c BloomSetConfig, +) (*BloomSet[T], error) { + c.fillDefaults() + m := &BloomSet[T]{ + set: set, + c: c, + } + return m, m.resetBloom() +} + +type BloomSet[T Gossipable] struct { + set Set[T] + c BloomSetConfig + + lock sync.RWMutex + maxCount int + bloom *bloom.Filter + // salt is provided to eventually unblock collisions in Bloom. It's possible + // that conflicting Gossipable items collide in the bloom filter, so a salt + // is generated to eventually resolve collisions. + salt ids.ID +} + +// Add adds v to the set and bloom filter. +// +// If adding the inner set succeeds and resetting the bloom filter fails, +// [ErrBloomReset] is returned. However, it is still guaranteed that v has +// been added to the bloom filter. +func (s *BloomSet[T]) Add(v T) error { + if err := s.set.Add(v); err != nil { + return err + } + return s.addToBloom(v.GossipID()) +} + +// addToBloom adds the provided ID to the bloom filter. +// +// Even if an error is returned, the ID has still been added to the bloom +// filter. +func (s *BloomSet[T]) addToBloom(h ids.ID) error { + s.lock.RLock() + if bloom.Add(s.bloom, h[:], s.salt[:]) && s.c.Metrics != nil { + s.c.Metrics.Count.Inc() + } + shouldReset := s.shouldReset() + s.lock.RUnlock() + + if !shouldReset { + return nil + } + + s.lock.Lock() + defer s.lock.Unlock() + + // Bloom filter was already reset by another thread + if !s.shouldReset() { + return nil + } + return s.resetBloom() +} + +// shouldReset expects either a read lock or a write lock to be held. +func (s *BloomSet[T]) shouldReset() bool { + return s.bloom.Count() > s.maxCount +} + +// resetBloom attempts to generate a new bloom filter and fill it with the +// current entries in the set. +// +// If an error is returned, the bloom filter and salt are unchanged. +// +// resetBloom expects a write lock to be held. +func (s *BloomSet[T]) resetBloom() error { + targetElements := max(2*s.set.Len(), s.c.MinTargetElements) + numHashes, numEntries := bloom.OptimalParameters( + targetElements, + s.c.TargetFalsePositiveProbability, + ) + newBloom, err := bloom.New(numHashes, numEntries) + if err != nil { + return fmt.Errorf("%w: creating new bloom: %w", ErrBloomReset, err) + } + var newSalt ids.ID + if _, err := rand.Read(newSalt[:]); err != nil { + return fmt.Errorf("%w: generating new salt: %w", ErrBloomReset, err) + } + s.set.Iterate(func(v T) bool { + h := v.GossipID() + bloom.Add(newBloom, h[:], newSalt[:]) + return true + }) + + s.maxCount = bloom.EstimateCount(numHashes, numEntries, s.c.ResetFalsePositiveProbability) + s.bloom = newBloom + s.salt = newSalt + + if s.c.Metrics != nil { + s.c.Metrics.Reset(newBloom, s.maxCount) + } + return nil +} + +func (s *BloomSet[T]) Has(h ids.ID) bool { + return s.set.Has(h) +} + +func (s *BloomSet[T]) Iterate(f func(T) bool) { + s.set.Iterate(f) +} + +func (s *BloomSet[T]) Len() int { + return s.set.Len() +} + +func (s *BloomSet[_]) BloomFilter() (*bloom.Filter, ids.ID) { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.bloom, s.salt +} diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go new file mode 100644 index 000000000000..bbdb90a7c675 --- /dev/null +++ b/network/p2p/gossip/set_test.go @@ -0,0 +1,173 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/bloom" +) + +func TestBloomSet_Refresh(t *testing.T) { + type ( + op struct { + add *tx + remove *tx + } + test struct { + name string + resetFalsePositiveProbability float64 + ops []op + expectedInFilter []tx + expectedResetCount float64 + } + ) + tests := []test{ + { + name: "no refresh", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + }, + expectedInFilter: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "no refresh - with removals", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + {remove: &tx{0}}, + {remove: &tx{1}}, + {remove: &tx{2}}, + }, + expectedInFilter: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + }, + expectedInFilter: []tx{ + {1}, + }, + expectedResetCount: 1, + }, + { + name: "multiple refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + {remove: &tx{1}}, + {add: &tx{2}}, // reset + }, + expectedInFilter: []tx{ + {2}, + }, + expectedResetCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + const ( + minTargetElements = 1 + targetFalsePositiveProbability = 0.000001 + ) + var s setDouble + m, err := bloom.NewMetrics("", prometheus.NewRegistry()) + require.NoError(t, err, "NewMetrics()") + bs, err := NewBloomSet( + &s, + BloomSetConfig{ + Metrics: m, + MinTargetElements: minTargetElements, + TargetFalsePositiveProbability: targetFalsePositiveProbability, + ResetFalsePositiveProbability: tt.resetFalsePositiveProbability, + }, + ) + require.NoError(t, err, "NewBloomSet()") + + for _, op := range tt.ops { + if op.add != nil { + require.NoErrorf(t, bs.Add(*op.add), "%T.Add(...)", bs) + } + if op.remove != nil { + s.txs.Remove(*op.remove) + } + } + + // Add one to expectedResetCount to account for the initial creation + // of the bloom filter. + require.Equal(t, tt.expectedResetCount+1, testutil.ToFloat64(m.ResetCount), "number of resets") + b, h := bs.BloomFilter() + for _, expected := range tt.expectedInFilter { + require.Truef(t, bloom.Contains(b, expected[:], h[:]), "%T.Contains(%s)", b, expected.GossipID()) + } + }) + } +} + +// TestBloomSet_Concurrent tests that BloomSet ensures that the returned bloom +// filter is a super set of the items in the Set at the time it is called, even +// under concurrent resets, because resets of the filter are accompanied by +// refilling from the Set. +func TestBloomSet_Concurrent(t *testing.T) { + var s setDouble + bs, err := NewBloomSet( + &s, + BloomSetConfig{ + MinTargetElements: 1, + TargetFalsePositiveProbability: 0.01, + ResetFalsePositiveProbability: 0.0000000001, // Forces frequent resets + }, + ) + require.NoError(t, err) + + var eg errgroup.Group + for range 10 { + eg.Go(func() error { + for range 1000 { + tx := tx(ids.GenerateTestID()) + if err := bs.Add(tx); err != nil { + return err + } + + bf, salt := bs.BloomFilter() + if !bloom.Contains(bf, tx[:], salt[:]) { + return fmt.Errorf("expected to find %s in bloom filter", tx) + } + + s.Remove(tx) + } + return nil + }) + } + require.NoError(t, eg.Wait()) +} diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index fb2077449dae..4b78a3d9ba98 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -15,6 +15,7 @@ import ( "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/txs/mempool" @@ -154,9 +155,9 @@ func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) { g.Mempool.Iterate(f) } -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { +func (g *gossipMempool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() } diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index 006c1740c57f..575f9bdc6bf7 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" @@ -150,9 +151,9 @@ func (g *gossipMempool) Has(txID ids.ID) bool { return ok } -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { +func (g *gossipMempool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() }