From b1cd95396866206caa72fa77a8d2f62c484a3134 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Sun, 23 Nov 2025 14:42:35 +0000 Subject: [PATCH 1/4] feat(gossip): make `BloomFilter` threadsafe and function a method --- network/p2p/gossip/bloom.go | 87 ++++++++++++++++++++------------ network/p2p/gossip/bloom_test.go | 31 ++++++++++++ 2 files changed, 87 insertions(+), 31 deletions(-) diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 9c05c8db78e0..8b105f6fc758 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -5,6 +5,7 @@ package gossip import ( "crypto/rand" + "sync" "github.com/prometheus/client_golang/prometheus" @@ -16,8 +17,7 @@ import ( // anticipated at any moment, and a false positive probability of [targetFalsePositiveProbability]. If the // false positive probability exceeds [resetFalsePositiveProbability], the bloom filter will be reset. // -// Invariant: The returned bloom filter is not safe to reset concurrently with -// other operations. However, it is otherwise safe to access concurrently. +// The returned bloom filter is safe for concurrent usage. func NewBloomFilter( registerer prometheus.Registerer, namespace string, @@ -36,12 +36,8 @@ func NewBloomFilter( metrics: metrics, } - err = resetBloomFilter( - filter, - minTargetElements, - targetFalsePositiveProbability, - resetFalsePositiveProbability, - ) + // A lock is unnecessary as no other goroutine could have access. + err = filter.resetWhenLocked(minTargetElements) return filter, err } @@ -52,6 +48,11 @@ type BloomFilter struct { metrics *bloom.Metrics + // [bloom.Filter] itself is threadsafe, but resetting requires replacing it + // entirely. This mutex protects the [BloomFilter] fields, not the + // [bloom.Filter], so resetting is a write while everything else is a read. + resetMu sync.RWMutex + maxCount int bloom *bloom.Filter // salt is provided to eventually unblock collisions in Bloom. It's possible @@ -61,17 +62,26 @@ type BloomFilter struct { } func (b *BloomFilter) Add(gossipable Gossipable) { + b.resetMu.RLock() + defer b.resetMu.RUnlock() + h := gossipable.GossipID() bloom.Add(b.bloom, h[:], b.salt[:]) b.metrics.Count.Inc() } func (b *BloomFilter) Has(gossipable Gossipable) bool { + b.resetMu.RLock() + defer b.resetMu.RUnlock() + h := gossipable.GossipID() return bloom.Contains(b.bloom, h[:], b.salt[:]) } func (b *BloomFilter) Marshal() ([]byte, []byte) { + b.resetMu.RLock() + defer b.resetMu.RUnlock() + bloomBytes := b.bloom.Marshal() // salt must be copied here to ensure the bytes aren't overwritten if salt // is later modified. @@ -81,37 +91,52 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) { // ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability]. // -// If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain -// the same [targetFalsePositiveProbability]. -// -// Returns true if the bloom filter was reset. +// Deprecated: use [BloomFilter.ResetIfNeeded]. func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, ) (bool, error) { - if bloomFilter.bloom.Count() <= bloomFilter.maxCount { + return bloomFilter.ResetIfNeeded(targetElements) +} + +// ResetIfNeeded resets the bloom filter if it breaches [targetFalsePositiveProbability]. +// +// If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain +// the same [targetFalsePositiveProbability]. +// +// Returns true if the bloom filter was reset. +func (b *BloomFilter) ResetIfNeeded(targetElements int) (bool, error) { + mu := &b.resetMu + + // Although this pattern requires a double checking of the same property, + // it's cheap and avoids unnecessarily locking out all other goroutines on + // every call to this method. + isResetNeeded := func() bool { + return b.bloom.Count() > b.maxCount + } + mu.RLock() + reset := isResetNeeded() + mu.RUnlock() + if !reset { return false, nil } - targetElements = max(bloomFilter.minTargetElements, targetElements) - err := resetBloomFilter( - bloomFilter, - targetElements, - bloomFilter.targetFalsePositiveProbability, - bloomFilter.resetFalsePositiveProbability, - ) + mu.Lock() + defer mu.Unlock() + // Another thread may have beaten us to acquire the write lock. + if !isResetNeeded() { + return false, nil + } + + targetElements = max(b.minTargetElements, targetElements) + err := b.resetWhenLocked(targetElements) return err == nil, err } -func resetBloomFilter( - bloomFilter *BloomFilter, - targetElements int, - targetFalsePositiveProbability, - resetFalsePositiveProbability float64, -) error { +func (b *BloomFilter) resetWhenLocked(targetElements int) error { numHashes, numEntries := bloom.OptimalParameters( targetElements, - targetFalsePositiveProbability, + b.targetFalsePositiveProbability, ) newBloom, err := bloom.New(numHashes, numEntries) if err != nil { @@ -122,10 +147,10 @@ func resetBloomFilter( return err } - bloomFilter.maxCount = bloom.EstimateCount(numHashes, numEntries, resetFalsePositiveProbability) - bloomFilter.bloom = newBloom - bloomFilter.salt = newSalt + b.maxCount = bloom.EstimateCount(numHashes, numEntries, b.resetFalsePositiveProbability) + b.bloom = newBloom + b.salt = newSalt - bloomFilter.metrics.Reset(newBloom, bloomFilter.maxCount) + b.metrics.Reset(newBloom, b.maxCount) return nil } diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 61fe94e5bc60..ef9c310c5257 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -5,6 +5,7 @@ package gossip import ( "slices" + "sync" "testing" "github.com/prometheus/client_golang/prometheus" @@ -106,3 +107,33 @@ func TestBloomFilterRefresh(t *testing.T) { }) } } + +func TesetBloomFilterClobber(t *testing.T) { + b, err := NewBloomFilter(prometheus.NewRegistry(), "", 1, 0.5, 0.5) + require.NoError(t, err, "NewBloomFilter()") + + start := make(chan struct{}) + var wg sync.WaitGroup + + for _, fn := range []func(){ + func() { b.Add(&testTx{}) }, + func() { b.Has(&testTx{}) }, + func() { b.Marshal() }, + func() { + _, err := b.ResetIfNeeded(1) + require.NoErrorf(t, err, "%T.ResetIfNeeded()", b) + }, + } { + for range 10_000 { + wg.Add(1) + go func() { + <-start + fn() + wg.Done() + }() + } + } + + close(start) + wg.Wait() +} From dbd23ec2f1793cbed1344bec5cd7dac88826c6b9 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Sun, 23 Nov 2025 14:59:20 +0000 Subject: [PATCH 2/4] fix: typo in test name --- network/p2p/gossip/bloom_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index ef9c310c5257..680d177edb79 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -108,7 +108,7 @@ func TestBloomFilterRefresh(t *testing.T) { } } -func TesetBloomFilterClobber(t *testing.T) { +func TestBloomFilterClobber(t *testing.T) { b, err := NewBloomFilter(prometheus.NewRegistry(), "", 1, 0.5, 0.5) require.NoError(t, err, "NewBloomFilter()") From 7118ad65123872d59d7cc7f89c02da8a626624b5 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Mon, 24 Nov 2025 18:30:57 +0000 Subject: [PATCH 3/4] feat: `afterReset` callback arg --- network/p2p/gossip/bloom.go | 18 +++++++++++++----- network/p2p/gossip/bloom_test.go | 7 ++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 8b105f6fc758..6454170f2dae 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -96,7 +96,7 @@ func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, ) (bool, error) { - return bloomFilter.ResetIfNeeded(targetElements) + return bloomFilter.ResetIfNeeded(targetElements, nil) } // ResetIfNeeded resets the bloom filter if it breaches [targetFalsePositiveProbability]. @@ -104,8 +104,11 @@ func ResetBloomFilterIfNeeded( // If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain // the same [targetFalsePositiveProbability]. // -// Returns true if the bloom filter was reset. -func (b *BloomFilter) ResetIfNeeded(targetElements int) (bool, error) { +// Returns true if the bloom filter was reset, in which case the `afterReset` +// function is also called (if non-nil) while still holding a mutex excluding +// all other access. This callback is typically used to refill the Bloom filter +// with known elements. +func (b *BloomFilter) ResetIfNeeded(targetElements int, afterReset func() error) (bool, error) { mu := &b.resetMu // Although this pattern requires a double checking of the same property, @@ -129,8 +132,13 @@ func (b *BloomFilter) ResetIfNeeded(targetElements int) (bool, error) { } targetElements = max(b.minTargetElements, targetElements) - err := b.resetWhenLocked(targetElements) - return err == nil, err + if err := b.resetWhenLocked(targetElements); err != nil { + return false, err + } + if afterReset == nil { + return true, nil + } + return true, afterReset() } func (b *BloomFilter) resetWhenLocked(targetElements int) error { diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 680d177edb79..756a70f8d67e 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -120,8 +120,13 @@ func TestBloomFilterClobber(t *testing.T) { func() { b.Has(&testTx{}) }, func() { b.Marshal() }, func() { - _, err := b.ResetIfNeeded(1) + var called bool + reset, err := b.ResetIfNeeded(1, func() error { + called = true + return nil + }) require.NoErrorf(t, err, "%T.ResetIfNeeded()", b) + require.Equalf(t, reset, called, "%T.ResetIfNeeded(..., [callback]) callback called i.f.f. reset", b) }, } { for range 10_000 { From ceaaf8e2d16e16b2d3b8e03720f3972b1c620c40 Mon Sep 17 00:00:00 2001 From: Arran Schlosberg Date: Tue, 25 Nov 2025 10:56:24 +0000 Subject: [PATCH 4/4] refactor!: `ResetIfNeeded()` accepts iterator for refilling --- network/p2p/gossip/bloom.go | 23 ++++++++----- network/p2p/gossip/bloom_test.go | 58 ++++++++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 15 deletions(-) diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 6454170f2dae..4b4d8e6af3cc 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -5,6 +5,7 @@ package gossip import ( "crypto/rand" + "iter" "sync" "github.com/prometheus/client_golang/prometheus" @@ -63,8 +64,11 @@ type BloomFilter struct { func (b *BloomFilter) Add(gossipable Gossipable) { b.resetMu.RLock() - defer b.resetMu.RUnlock() + b.addWhenLocked(gossipable) + b.resetMu.RUnlock() +} +func (b *BloomFilter) addWhenLocked(gossipable Gossipable) { h := gossipable.GossipID() bloom.Add(b.bloom, h[:], b.salt[:]) b.metrics.Count.Inc() @@ -104,11 +108,9 @@ func ResetBloomFilterIfNeeded( // If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain // the same [targetFalsePositiveProbability]. // -// Returns true if the bloom filter was reset, in which case the `afterReset` -// function is also called (if non-nil) while still holding a mutex excluding -// all other access. This callback is typically used to refill the Bloom filter -// with known elements. -func (b *BloomFilter) ResetIfNeeded(targetElements int, afterReset func() error) (bool, error) { +// Returns true if the bloom filter was reset, in which case the elements +// yielded by `refillWith` are added to the filter. +func (b *BloomFilter) ResetIfNeeded(targetElements int, refillWith iter.Seq[Gossipable]) (bool, error) { mu := &b.resetMu // Although this pattern requires a double checking of the same property, @@ -135,10 +137,13 @@ func (b *BloomFilter) ResetIfNeeded(targetElements int, afterReset func() error) if err := b.resetWhenLocked(targetElements); err != nil { return false, err } - if afterReset == nil { - return true, nil + + if refillWith != nil { + for g := range refillWith { + b.addWhenLocked(g) + } } - return true, afterReset() + return true, nil } func (b *BloomFilter) resetWhenLocked(targetElements int) error { diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 756a70f8d67e..497319c2ca79 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -120,13 +120,8 @@ func TestBloomFilterClobber(t *testing.T) { func() { b.Has(&testTx{}) }, func() { b.Marshal() }, func() { - var called bool - reset, err := b.ResetIfNeeded(1, func() error { - called = true - return nil - }) + _, err := b.ResetIfNeeded(1, nil) require.NoErrorf(t, err, "%T.ResetIfNeeded()", b) - require.Equalf(t, reset, called, "%T.ResetIfNeeded(..., [callback]) callback called i.f.f. reset", b) }, } { for range 10_000 { @@ -142,3 +137,54 @@ func TestBloomFilterClobber(t *testing.T) { close(start) wg.Wait() } + +func TestBloomFilterRefillAfterReset(t *testing.T) { + b, err := NewBloomFilter(prometheus.NewRegistry(), "", 1, 0.5, 0.5) + require.NoError(t, err, "NewBloomFilter()") + + var before []*testTx + for i := range byte(10) { + before = append(before, &testTx{ids.ID{1, i}}) + } + + after := &testTx{ids.ID{2, 0}} + refill := func(yield func(Gossipable) bool) { + yield(after) + } + + steps := []struct { + setup func() + targetEls int + // Although we assert if resetting occurred, this is just to confirm + // proper test setup. The real test is via [BloomFilter.Has] of the + // before / after elements. + wantReset bool + }{ + { + setup: func() { + b.Add(before[0]) + }, + targetEls: 1e6, + wantReset: false, + }, + { + setup: func() { + for _, g := range before { + b.Add(g) + } + }, + targetEls: 1, + wantReset: true, + }, + } + + for _, s := range steps { + s.setup() + reset, err := b.ResetIfNeeded(s.targetEls, refill) + require.NoError(t, err, "ResetIfNeeded()") + require.Equal(t, s.wantReset, reset, "ResetIfNeeded()") + + require.Equalf(t, !reset, b.Has(before[0]), "Has([existing element]) when ResetIfNeeded() returned %t", reset) + require.Equalf(t, reset, b.Has(after), "Has([iterator element]) when ResetIfNeeded() returned %t", reset) + } +}