Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.streamsInbound.delete(id)

// remove peer from topics map
for (const peers of this.topics.values()) {
for (const [topic, peers] of this.topics) {
peers.delete(id)
if (peers.size === 0) {
this.topics.delete(topic)
}
}

// Remove this peer from the mesh
Expand Down Expand Up @@ -1191,17 +1194,21 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.log('subscription update from %p topic %s', from, topic)

let topicSet = this.topics.get(topic)
if (topicSet == null) {
topicSet = new Set()
this.topics.set(topic, topicSet)
}

if (subscribe) {
if (topicSet == null) {
topicSet = new Set()
this.topics.set(topic, topicSet)
}

// subscribe peer to new topic
topicSet.add(from.toString())
} else {
} else if (topicSet != null) {
// unsubscribe from existing topic
topicSet.delete(from.toString())
if (topicSet.size === 0) {
this.topics.delete(topic)
}
}

// TODO: rust-libp2p has A LOT more logic here
Expand Down
80 changes: 80 additions & 0 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,86 @@ describe('gossip', () => {
expect(peerInfoB?.tags.get(topic)).to.be.undefined()
})

it('should delete empty topic entries after remote unsubscribe', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const topic = 'empty-topic-cleanup'

await connectAllPubSubNodes([nodeA, nodeB])

nodeA.pubsub.subscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

expect((nodeB.pubsub as any).topics.has(topic)).to.be.true()

nodeA.pubsub.unsubscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty()
expect((nodeB.pubsub as any).topics.has(topic)).to.be.false()
})

it('should delete empty topic entries after peer disconnect', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const topic = 'disconnect-topic-cleanup'

await connectAllPubSubNodes([nodeA, nodeB])

nodeA.pubsub.subscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

expect((nodeB.pubsub as any).topics.has(topic)).to.be.true()

;(nodeB.pubsub as any).onPeerDisconnected(nodeA.components.peerId)

expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty()
expect((nodeB.pubsub as any).topics.has(topic)).to.be.false()
})

it('should not create empty topic entries from unsubscribe-only updates', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const topic = 'unsubscribe-only-cleanup'

await connectAllPubSubNodes([nodeA, nodeB])

;(nodeB.pubsub as any).handleReceivedSubscription(nodeA.components.peerId, topic, false)

expect((nodeB.pubsub as any).topics.has(topic)).to.be.false()
expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty()
})

it('should keep topic entries while other peers remain subscribed', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]
const nodeC = nodes[2]
const topic = 'multi-peer-topic-cleanup'

await connectAllPubSubNodes([nodeA, nodeB, nodeC])

nodeA.pubsub.subscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

nodeC.pubsub.subscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

expect((nodeB.pubsub as any).topics.has(topic)).to.be.true()
expect(nodeB.pubsub.getSubscribers(topic)).to.have.lengthOf(2)

nodeA.pubsub.unsubscribe(topic)
await pEvent(nodeB.pubsub, 'subscription-change')

expect((nodeB.pubsub as any).topics.has(topic)).to.be.true()
const subscribers = nodeB.pubsub.getSubscribers(topic).map((p) => p.toString())
expect(subscribers).to.have.lengthOf(1)
expect(subscribers).to.include(nodeC.components.peerId.toString())
})

it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
Expand Down
Loading