Skip to content

Commit dd71d8a

Browse files
authored
feat: allow early muxer selection by connection encrypters (#3022)
Some encrypters allow sending application protocols along with the initial handshake. We can use these to select muxers and save the round-trips of mss.
1 parent 80fe31a commit dd71d8a

File tree

6 files changed

+148
-28
lines changed

6 files changed

+148
-28
lines changed

packages/interface-compliance-tests/src/mocks/upgrader.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { setMaxListeners } from '@libp2p/interface'
22
import { anySignal } from 'any-signal'
33
import { mockConnection } from './connection.js'
4-
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions, ClearableSignal } from '@libp2p/interface'
4+
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions, ClearableSignal, ConnectionEncrypter, StreamMuxerFactory } from '@libp2p/interface'
55
import type { Registrar } from '@libp2p/interface-internal'
66

77
export interface MockUpgraderInit {
@@ -49,6 +49,14 @@ class MockUpgrader implements Upgrader {
4949

5050
return output
5151
}
52+
53+
getConnectionEncrypters (): Map<string, ConnectionEncrypter<unknown>> {
54+
return new Map()
55+
}
56+
57+
getStreamMuxers (): Map<string, StreamMuxerFactory> {
58+
return new Map()
59+
}
5260
}
5361

5462
export function mockUpgrader (init: MockUpgraderInit = {}): Upgrader {

packages/interface/src/connection-encrypter.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { MultiaddrConnection } from './connection.js'
2-
import type { AbortOptions } from './index.js'
2+
import type { AbortOptions, StreamMuxerFactory } from './index.js'
33
import type { PeerId } from './peer-id.js'
44
import type { Duplex } from 'it-stream-types'
55
import type { Uint8ArrayList } from 'uint8arraylist'
@@ -39,4 +39,11 @@ export interface SecuredConnection<Stream = any, Extension = unknown> {
3939
conn: Stream
4040
remoteExtensions?: Extension
4141
remotePeer: PeerId
42+
43+
/**
44+
* Some encryption protocols allow negotiating application protocols as part
45+
* of the initial handshake. Where we are able to negotiated a stream muxer
46+
* for the connection it will be returned here.
47+
*/
48+
streamMuxer?: StreamMuxerFactory
4249
}

packages/interface/src/transport.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Connection, ConnectionLimits, MultiaddrConnection } from './connection.js'
22
import type { TypedEventTarget } from './event-target.js'
3-
import type { AbortOptions, ClearableSignal } from './index.js'
3+
import type { AbortOptions, ClearableSignal, ConnectionEncrypter } from './index.js'
44
import type { StreamMuxerFactory } from './stream-muxer.js'
55
import type { Multiaddr } from '@multiformats/multiaddr'
66
import type { ProgressOptions, ProgressEvent } from 'progress-events'
@@ -159,4 +159,14 @@ export interface Upgrader {
159159
* controller to `upgradeInbound`.
160160
*/
161161
createInboundAbortSignal (signal: AbortSignal): ClearableSignal
162+
163+
/**
164+
* Returns configured stream muxers
165+
*/
166+
getStreamMuxers (): Map<string, StreamMuxerFactory>
167+
168+
/**
169+
* Returns configured connection encrypters
170+
*/
171+
getConnectionEncrypters (): Map<string, ConnectionEncrypter>
162172
}

packages/libp2p/src/upgrader.ts

+14-5
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ function countStreams (protocol: string, direction: 'inbound' | 'outbound', conn
100100
return streamCount
101101
}
102102

103-
export interface DefaultUpgraderComponents {
103+
export interface UpgraderComponents {
104104
peerId: PeerId
105105
metrics?: Metrics
106106
connectionManager: ConnectionManager
@@ -115,7 +115,7 @@ export interface DefaultUpgraderComponents {
115115
type ConnectionDeniedType = keyof Pick<ConnectionGater, 'denyOutboundConnection' | 'denyInboundEncryptedConnection' | 'denyOutboundEncryptedConnection' | 'denyInboundUpgradedConnection' | 'denyOutboundUpgradedConnection'>
116116

117117
export class Upgrader implements UpgraderInterface {
118-
private readonly components: DefaultUpgraderComponents
118+
private readonly components: UpgraderComponents
119119
private readonly connectionEncrypters: Map<string, ConnectionEncrypter>
120120
private readonly streamMuxers: Map<string, StreamMuxerFactory>
121121
private readonly inboundUpgradeTimeout: number
@@ -127,7 +127,7 @@ export class Upgrader implements UpgraderInterface {
127127
errors?: CounterGroup<'inbound' | 'outbound'>
128128
}
129129

130-
constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
130+
constructor (components: UpgraderComponents, init: UpgraderInit) {
131131
this.components = components
132132
this.connectionEncrypters = new Map()
133133

@@ -286,7 +286,8 @@ export class Upgrader implements UpgraderInterface {
286286
({
287287
conn: encryptedConn,
288288
remotePeer,
289-
protocol: cryptoProtocol
289+
protocol: cryptoProtocol,
290+
streamMuxer: muxerFactory
290291
} = await (direction === 'inbound'
291292
? this._encryptInbound(protectedConn, opts)
292293
: this._encryptOutbound(protectedConn, opts)
@@ -322,7 +323,7 @@ export class Upgrader implements UpgraderInterface {
322323
upgradedConn = encryptedConn
323324
if (opts?.muxerFactory != null) {
324325
muxerFactory = opts.muxerFactory
325-
} else if (this.streamMuxers.size > 0) {
326+
} else if (muxerFactory == null && this.streamMuxers.size > 0) {
326327
opts?.onProgress?.(new CustomProgressEvent(`upgrader:multiplex-${direction}-connection`))
327328

328329
// Multiplex the connection
@@ -746,4 +747,12 @@ export class Upgrader implements UpgraderInterface {
746747
throw new MuxerUnavailableError(String(err))
747748
}
748749
}
750+
751+
getConnectionEncrypters (): Map<string, ConnectionEncrypter<unknown>> {
752+
return this.connectionEncrypters
753+
}
754+
755+
getStreamMuxers (): Map<string, StreamMuxerFactory> {
756+
return this.streamMuxers
757+
}
749758
}

packages/libp2p/test/upgrading/upgrader.spec.ts

+103-17
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import { pEvent } from 'p-event'
1313
import Sinon from 'sinon'
1414
import { stubInterface } from 'sinon-ts'
1515
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
16-
import { Upgrader, type UpgraderInit } from '../../src/upgrader.js'
16+
import { Upgrader } from '../../src/upgrader.js'
1717
import { createDefaultUpgraderComponents } from './utils.js'
18+
import type { UpgraderComponents, UpgraderInit } from '../../src/upgrader.js'
1819
import type { ConnectionEncrypter, StreamMuxerFactory, MultiaddrConnection, StreamMuxer, ConnectionProtector, PeerId, SecuredConnection, Stream, StreamMuxerInit, Connection } from '@libp2p/interface'
1920
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
2021

2122
describe('upgrader', () => {
23+
let components: UpgraderComponents
2224
let init: UpgraderInit
2325
const encrypterProtocol = '/test-encrypter'
2426
const muxerProtocol = '/test-muxer'
@@ -36,6 +38,7 @@ describe('upgrader', () => {
3638
beforeEach(async () => {
3739
remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
3840
remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`)
41+
components = await createDefaultUpgraderComponents()
3942

4043
init = {
4144
connectionEncrypters: [
@@ -76,7 +79,7 @@ describe('upgrader', () => {
7679
})
7780

7881
it('should upgrade outbound with valid muxers and crypto', async () => {
79-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), init)
82+
const upgrader = new Upgrader(components, init)
8083
const conn = await upgrader.upgradeOutbound(maConn, {
8184
signal: AbortSignal.timeout(5_000)
8285
})
@@ -85,7 +88,7 @@ describe('upgrader', () => {
8588
})
8689

8790
it('should upgrade outbound with only crypto', async () => {
88-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
91+
const upgrader = new Upgrader(components, {
8992
...init,
9093
streamMuxers: []
9194
})
@@ -102,9 +105,10 @@ describe('upgrader', () => {
102105
const connectionProtector = stubInterface<ConnectionProtector>()
103106
connectionProtector.protect.callsFake(async (conn) => conn)
104107

105-
const upgrader = new Upgrader(await createDefaultUpgraderComponents({
108+
const upgrader = new Upgrader({
109+
...components,
106110
connectionProtector
107-
}), init)
111+
}, init)
108112

109113
await upgrader.upgradeInbound(maConn, {
110114
signal: AbortSignal.timeout(5_000)
@@ -117,9 +121,10 @@ describe('upgrader', () => {
117121
const connectionProtector = stubInterface<ConnectionProtector>()
118122
connectionProtector.protect.callsFake(async (conn) => conn)
119123

120-
const upgrader = new Upgrader(await createDefaultUpgraderComponents({
124+
const upgrader = new Upgrader({
125+
...components,
121126
connectionProtector
122-
}), init)
127+
}, init)
123128

124129
await upgrader.upgradeOutbound(maConn, {
125130
signal: AbortSignal.timeout(5_000)
@@ -129,7 +134,7 @@ describe('upgrader', () => {
129134
})
130135

131136
it('should fail inbound if crypto fails', async () => {
132-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
137+
const upgrader = new Upgrader(components, {
133138
...init,
134139
connectionEncrypters: [
135140
new BoomCrypto()
@@ -143,7 +148,7 @@ describe('upgrader', () => {
143148
})
144149

145150
it('should fail outbound if crypto fails', async () => {
146-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
151+
const upgrader = new Upgrader(components, {
147152
...init,
148153
connectionEncrypters: [
149154
new BoomCrypto()
@@ -157,7 +162,7 @@ describe('upgrader', () => {
157162
})
158163

159164
it('should abort if inbound upgrade is slow', async () => {
160-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
165+
const upgrader = new Upgrader(components, {
161166
...init,
162167
inboundUpgradeTimeout: 100
163168
})
@@ -174,7 +179,7 @@ describe('upgrader', () => {
174179
})
175180

176181
it('should abort by signal if inbound upgrade is slow', async () => {
177-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
182+
const upgrader = new Upgrader(components, {
178183
...init,
179184
inboundUpgradeTimeout: 10000
180185
})
@@ -212,7 +217,7 @@ describe('upgrader', () => {
212217
})
213218

214219
it('should not abort if outbound upgrade is successful', async () => {
215-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
220+
const upgrader = new Upgrader(components, {
216221
...init,
217222
inboundUpgradeTimeout: 100
218223
})
@@ -248,7 +253,7 @@ describe('upgrader', () => {
248253
})
249254

250255
it('should not abort by signal if outbound upgrade is successful', async () => {
251-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
256+
const upgrader = new Upgrader(components, {
252257
...init,
253258
inboundUpgradeTimeout: 10000
254259
})
@@ -263,7 +268,7 @@ describe('upgrader', () => {
263268
})
264269

265270
it('should abort protocol selection for slow outbound stream creation', async () => {
266-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
271+
const upgrader = new Upgrader(components, {
267272
...init,
268273
streamMuxers: [
269274
stubInterface<StreamMuxerFactory>({
@@ -298,7 +303,7 @@ describe('upgrader', () => {
298303
it('should abort stream when protocol negotiation fails on outbound stream', async () => {
299304
let stream: Stream | undefined
300305

301-
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
306+
const upgrader = new Upgrader(components, {
302307
...init,
303308
streamMuxers: [
304309
stubInterface<StreamMuxerFactory>({
@@ -376,9 +381,10 @@ describe('upgrader', () => {
376381
protocol: encrypterProtocol
377382
})
378383

379-
const upgrader = new Upgrader(await createDefaultUpgraderComponents({
384+
const upgrader = new Upgrader({
385+
...components,
380386
connectionProtector
381-
}), {
387+
}, {
382388
...init,
383389
connectionEncrypters: [
384390
connectionEncrypter
@@ -617,4 +623,84 @@ describe('upgrader', () => {
617623
await expect(conn.newStream(protocol, opts)).to.eventually.be.rejected
618624
.with.property('name', 'TooManyOutboundProtocolStreamsError')
619625
})
626+
627+
describe('early muxer selection', () => {
628+
let earlyMuxerProtocol: string
629+
let streamMuxerFactory: StreamMuxerFactory
630+
let upgrader: Upgrader
631+
let maConn: MultiaddrConnection
632+
let encrypterProtocol: string
633+
634+
beforeEach(async () => {
635+
encrypterProtocol = '/test-encrypt-with-early'
636+
earlyMuxerProtocol = '/early-muxer'
637+
streamMuxerFactory = stubInterface<StreamMuxerFactory>({
638+
protocol: earlyMuxerProtocol,
639+
createStreamMuxer: () => stubInterface<StreamMuxer>({
640+
protocol: earlyMuxerProtocol,
641+
sink: async (source) => drain(source),
642+
source: (async function * () {})()
643+
})
644+
})
645+
646+
upgrader = new Upgrader(components, {
647+
connectionEncrypters: [
648+
stubInterface<ConnectionEncrypter>({
649+
protocol: encrypterProtocol,
650+
secureOutbound: async (connection) => ({
651+
conn: connection,
652+
remotePeer,
653+
streamMuxer: streamMuxerFactory
654+
}),
655+
secureInbound: async (connection) => ({
656+
conn: connection,
657+
remotePeer,
658+
streamMuxer: streamMuxerFactory
659+
})
660+
})
661+
],
662+
streamMuxers: [
663+
stubInterface<StreamMuxerFactory>({
664+
protocol: '/late-muxer',
665+
createStreamMuxer: () => stubInterface<StreamMuxer>({
666+
protocol: '/late-muxer',
667+
sink: async (source) => drain(source),
668+
source: (async function * () {})()
669+
})
670+
})
671+
]
672+
})
673+
674+
maConn = stubInterface<MultiaddrConnection>({
675+
remoteAddr,
676+
log: logger('test'),
677+
sink: async (source) => drain(source),
678+
source: map((async function * () {
679+
yield '/multistream/1.0.0\n'
680+
yield `${encrypterProtocol}\n`
681+
})(), str => encode.single(uint8ArrayFromString(str)))
682+
})
683+
})
684+
685+
it('should allow early muxer selection on inbound connection', async () => {
686+
const connectionPromise = pEvent<'connection:open', CustomEvent<Connection>>(components.events, 'connection:open')
687+
688+
await upgrader.upgradeInbound(maConn, {
689+
signal: AbortSignal.timeout(5_000)
690+
})
691+
692+
const event = await connectionPromise
693+
const conn = event.detail
694+
695+
expect(conn.multiplexer).to.equal(earlyMuxerProtocol)
696+
})
697+
698+
it('should allow early muxer selection on outbound connection', async () => {
699+
const conn = await upgrader.upgradeOutbound(maConn, {
700+
signal: AbortSignal.timeout(5_000)
701+
})
702+
703+
expect(conn.multiplexer).to.equal(earlyMuxerProtocol)
704+
})
705+
})
620706
})

packages/libp2p/test/upgrading/utils.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import { TypedEventEmitter } from '@libp2p/interface'
55
import { defaultLogger } from '@libp2p/logger'
66
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
77
import { stubInterface, type StubbedInstance } from 'sinon-ts'
8-
import type { DefaultUpgraderComponents } from '../../src/upgrader.js'
8+
import type { UpgraderComponents } from '../../src/upgrader.js'
99
import type { ConnectionGater, PeerId, PeerStore, TypedEventTarget, Libp2pEvents, ComponentLogger, Metrics, ConnectionProtector } from '@libp2p/interface'
1010
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
1111

12-
export interface StubbedDefaultUpgraderComponents {
12+
export interface StubbedUpgraderComponents {
1313
peerId: PeerId
1414
metrics?: StubbedInstance<Metrics>
1515
connectionManager: StubbedInstance<ConnectionManager>
@@ -21,7 +21,7 @@ export interface StubbedDefaultUpgraderComponents {
2121
logger: ComponentLogger
2222
}
2323

24-
export async function createDefaultUpgraderComponents (options?: Partial<DefaultUpgraderComponents>): Promise<StubbedDefaultUpgraderComponents> {
24+
export async function createDefaultUpgraderComponents (options?: Partial<UpgraderComponents>): Promise<StubbedUpgraderComponents> {
2525
return {
2626
peerId: peerIdFromPrivateKey(await generateKeyPair('Ed25519')),
2727
connectionManager: stubInterface<ConnectionManager>({

0 commit comments

Comments
 (0)