diff --git a/src/index.ts b/src/index.ts index fc89ff7a..f95f9edd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -963,8 +963,11 @@ export class GossipSub extends TypedEventEmitter implements Pub this.streamsInbound.delete(id) // remove peer from topics map - for (const peers of this.topics.values()) { + for (const [topic, peers] of this.topics) { peers.delete(id) + if (peers.size === 0) { + this.topics.delete(topic) + } } // Remove this peer from the mesh @@ -1191,17 +1194,21 @@ export class GossipSub extends TypedEventEmitter implements Pub this.log('subscription update from %p topic %s', from, topic) let topicSet = this.topics.get(topic) - if (topicSet == null) { - topicSet = new Set() - this.topics.set(topic, topicSet) - } if (subscribe) { + if (topicSet == null) { + topicSet = new Set() + this.topics.set(topic, topicSet) + } + // subscribe peer to new topic topicSet.add(from.toString()) - } else { + } else if (topicSet != null) { // unsubscribe from existing topic topicSet.delete(from.toString()) + if (topicSet.size === 0) { + this.topics.delete(topic) + } } // TODO: rust-libp2p has A LOT more logic here diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index e656f4d1..023f7118 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -269,6 +269,86 @@ describe('gossip', () => { expect(peerInfoB?.tags.get(topic)).to.be.undefined() }) + it('should delete empty topic entries after remote unsubscribe', async function () { + this.timeout(10e4) + const nodeA = nodes[0] + const nodeB = nodes[1] + const topic = 'empty-topic-cleanup' + + await connectAllPubSubNodes([nodeA, nodeB]) + + nodeA.pubsub.subscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + expect((nodeB.pubsub as any).topics.has(topic)).to.be.true() + + nodeA.pubsub.unsubscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty() + expect((nodeB.pubsub as any).topics.has(topic)).to.be.false() + }) + + it('should delete empty topic entries after peer disconnect', async function () { + this.timeout(10e4) + const nodeA = nodes[0] + const nodeB = nodes[1] + const topic = 'disconnect-topic-cleanup' + + await connectAllPubSubNodes([nodeA, nodeB]) + + nodeA.pubsub.subscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + expect((nodeB.pubsub as any).topics.has(topic)).to.be.true() + + ;(nodeB.pubsub as any).onPeerDisconnected(nodeA.components.peerId) + + expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty() + expect((nodeB.pubsub as any).topics.has(topic)).to.be.false() + }) + + it('should not create empty topic entries from unsubscribe-only updates', async function () { + this.timeout(10e4) + const nodeA = nodes[0] + const nodeB = nodes[1] + const topic = 'unsubscribe-only-cleanup' + + await connectAllPubSubNodes([nodeA, nodeB]) + + ;(nodeB.pubsub as any).handleReceivedSubscription(nodeA.components.peerId, topic, false) + + expect((nodeB.pubsub as any).topics.has(topic)).to.be.false() + expect(nodeB.pubsub.getSubscribers(topic)).to.be.empty() + }) + + it('should keep topic entries while other peers remain subscribed', async function () { + this.timeout(10e4) + const nodeA = nodes[0] + const nodeB = nodes[1] + const nodeC = nodes[2] + const topic = 'multi-peer-topic-cleanup' + + await connectAllPubSubNodes([nodeA, nodeB, nodeC]) + + nodeA.pubsub.subscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + nodeC.pubsub.subscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + expect((nodeB.pubsub as any).topics.has(topic)).to.be.true() + expect(nodeB.pubsub.getSubscribers(topic)).to.have.lengthOf(2) + + nodeA.pubsub.unsubscribe(topic) + await pEvent(nodeB.pubsub, 'subscription-change') + + expect((nodeB.pubsub as any).topics.has(topic)).to.be.true() + const subscribers = nodeB.pubsub.getSubscribers(topic).map((p) => p.toString()) + expect(subscribers).to.have.lengthOf(1) + expect(subscribers).to.include(nodeC.components.peerId.toString()) + }) + it('should reject incoming messages bigger than maxInboundDataLength limit', async function () { this.timeout(10e4) const nodeA = nodes[0]