Skip to content

feat: allow early muxer selection by connection encrypters #3022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/interface-compliance-tests/src/mocks/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import { mockConnection } from './connection.js'
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions, ClearableSignal } from '@libp2p/interface'
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions, ClearableSignal, ConnectionEncrypter, StreamMuxerFactory } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'

export interface MockUpgraderInit {
Expand Down Expand Up @@ -49,6 +49,14 @@ class MockUpgrader implements Upgrader {

return output
}

getConnectionEncrypters (): Map<string, ConnectionEncrypter<unknown>> {
return new Map()
}

getStreamMuxers (): Map<string, StreamMuxerFactory> {
return new Map()
}
}

export function mockUpgrader (init: MockUpgraderInit = {}): Upgrader {
Expand Down
9 changes: 8 additions & 1 deletion packages/interface/src/connection-encrypter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { MultiaddrConnection } from './connection.js'
import type { AbortOptions } from './index.js'
import type { AbortOptions, StreamMuxerFactory } from './index.js'
import type { PeerId } from './peer-id.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
Expand Down Expand Up @@ -39,4 +39,11 @@ export interface SecuredConnection<Stream = any, Extension = unknown> {
conn: Stream
remoteExtensions?: Extension
remotePeer: PeerId

/**
* Some encryption protocols allow negotiating application protocols as part
* of the initial handshake. Where we are able to negotiated a stream muxer
* for the connection it will be returned here.
*/
streamMuxer?: StreamMuxerFactory
}
12 changes: 11 additions & 1 deletion packages/interface/src/transport.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Connection, ConnectionLimits, MultiaddrConnection } from './connection.js'
import type { TypedEventTarget } from './event-target.js'
import type { AbortOptions, ClearableSignal } from './index.js'
import type { AbortOptions, ClearableSignal, ConnectionEncrypter } from './index.js'
import type { StreamMuxerFactory } from './stream-muxer.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'
Expand Down Expand Up @@ -159,4 +159,14 @@ export interface Upgrader {
* controller to `upgradeInbound`.
*/
createInboundAbortSignal (signal: AbortSignal): ClearableSignal

/**
* Returns configured stream muxers
*/
getStreamMuxers (): Map<string, StreamMuxerFactory>

/**
* Returns configured connection encrypters
*/
getConnectionEncrypters (): Map<string, ConnectionEncrypter>
}
19 changes: 14 additions & 5 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function countStreams (protocol: string, direction: 'inbound' | 'outbound', conn
return streamCount
}

export interface DefaultUpgraderComponents {
export interface UpgraderComponents {
peerId: PeerId
metrics?: Metrics
connectionManager: ConnectionManager
Expand All @@ -115,7 +115,7 @@ export interface DefaultUpgraderComponents {
type ConnectionDeniedType = keyof Pick<ConnectionGater, 'denyOutboundConnection' | 'denyInboundEncryptedConnection' | 'denyOutboundEncryptedConnection' | 'denyInboundUpgradedConnection' | 'denyOutboundUpgradedConnection'>

export class Upgrader implements UpgraderInterface {
private readonly components: DefaultUpgraderComponents
private readonly components: UpgraderComponents
private readonly connectionEncrypters: Map<string, ConnectionEncrypter>
private readonly streamMuxers: Map<string, StreamMuxerFactory>
private readonly inboundUpgradeTimeout: number
Expand All @@ -127,7 +127,7 @@ export class Upgrader implements UpgraderInterface {
errors?: CounterGroup<'inbound' | 'outbound'>
}

constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
constructor (components: UpgraderComponents, init: UpgraderInit) {
this.components = components
this.connectionEncrypters = new Map()

Expand Down Expand Up @@ -286,7 +286,8 @@ export class Upgrader implements UpgraderInterface {
({
conn: encryptedConn,
remotePeer,
protocol: cryptoProtocol
protocol: cryptoProtocol,
streamMuxer: muxerFactory
} = await (direction === 'inbound'
? this._encryptInbound(protectedConn, opts)
: this._encryptOutbound(protectedConn, opts)
Expand Down Expand Up @@ -322,7 +323,7 @@ export class Upgrader implements UpgraderInterface {
upgradedConn = encryptedConn
if (opts?.muxerFactory != null) {
muxerFactory = opts.muxerFactory
} else if (this.streamMuxers.size > 0) {
} else if (muxerFactory == null && this.streamMuxers.size > 0) {
opts?.onProgress?.(new CustomProgressEvent(`upgrader:multiplex-${direction}-connection`))

// Multiplex the connection
Expand Down Expand Up @@ -746,4 +747,12 @@ export class Upgrader implements UpgraderInterface {
throw new MuxerUnavailableError(String(err))
}
}

getConnectionEncrypters (): Map<string, ConnectionEncrypter<unknown>> {
return this.connectionEncrypters
}

getStreamMuxers (): Map<string, StreamMuxerFactory> {
return this.streamMuxers
}
}
120 changes: 103 additions & 17 deletions packages/libp2p/test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import { pEvent } from 'p-event'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { Upgrader, type UpgraderInit } from '../../src/upgrader.js'
import { Upgrader } from '../../src/upgrader.js'
import { createDefaultUpgraderComponents } from './utils.js'
import type { UpgraderComponents, UpgraderInit } from '../../src/upgrader.js'
import type { ConnectionEncrypter, StreamMuxerFactory, MultiaddrConnection, StreamMuxer, ConnectionProtector, PeerId, SecuredConnection, Stream, StreamMuxerInit, Connection } from '@libp2p/interface'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

describe('upgrader', () => {
let components: UpgraderComponents
let init: UpgraderInit
const encrypterProtocol = '/test-encrypter'
const muxerProtocol = '/test-muxer'
Expand All @@ -36,6 +38,7 @@ describe('upgrader', () => {
beforeEach(async () => {
remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
remoteAddr = multiaddr(`/ip4/123.123.123.123/tcp/1234/p2p/${remotePeer}`)
components = await createDefaultUpgraderComponents()

init = {
connectionEncrypters: [
Expand Down Expand Up @@ -76,7 +79,7 @@ describe('upgrader', () => {
})

it('should upgrade outbound with valid muxers and crypto', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), init)
const upgrader = new Upgrader(components, init)
const conn = await upgrader.upgradeOutbound(maConn, {
signal: AbortSignal.timeout(5_000)
})
Expand All @@ -85,7 +88,7 @@ describe('upgrader', () => {
})

it('should upgrade outbound with only crypto', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
streamMuxers: []
})
Expand All @@ -102,9 +105,10 @@ describe('upgrader', () => {
const connectionProtector = stubInterface<ConnectionProtector>()
connectionProtector.protect.callsFake(async (conn) => conn)

const upgrader = new Upgrader(await createDefaultUpgraderComponents({
const upgrader = new Upgrader({
...components,
connectionProtector
}), init)
}, init)

await upgrader.upgradeInbound(maConn, {
signal: AbortSignal.timeout(5_000)
Expand All @@ -117,9 +121,10 @@ describe('upgrader', () => {
const connectionProtector = stubInterface<ConnectionProtector>()
connectionProtector.protect.callsFake(async (conn) => conn)

const upgrader = new Upgrader(await createDefaultUpgraderComponents({
const upgrader = new Upgrader({
...components,
connectionProtector
}), init)
}, init)

await upgrader.upgradeOutbound(maConn, {
signal: AbortSignal.timeout(5_000)
Expand All @@ -129,7 +134,7 @@ describe('upgrader', () => {
})

it('should fail inbound if crypto fails', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
connectionEncrypters: [
new BoomCrypto()
Expand All @@ -143,7 +148,7 @@ describe('upgrader', () => {
})

it('should fail outbound if crypto fails', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
connectionEncrypters: [
new BoomCrypto()
Expand All @@ -157,7 +162,7 @@ describe('upgrader', () => {
})

it('should abort if inbound upgrade is slow', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
inboundUpgradeTimeout: 100
})
Expand All @@ -174,7 +179,7 @@ describe('upgrader', () => {
})

it('should abort by signal if inbound upgrade is slow', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
inboundUpgradeTimeout: 10000
})
Expand Down Expand Up @@ -212,7 +217,7 @@ describe('upgrader', () => {
})

it('should not abort if outbound upgrade is successful', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
inboundUpgradeTimeout: 100
})
Expand Down Expand Up @@ -248,7 +253,7 @@ describe('upgrader', () => {
})

it('should not abort by signal if outbound upgrade is successful', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
inboundUpgradeTimeout: 10000
})
Expand All @@ -263,7 +268,7 @@ describe('upgrader', () => {
})

it('should abort protocol selection for slow outbound stream creation', async () => {
const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
streamMuxers: [
stubInterface<StreamMuxerFactory>({
Expand Down Expand Up @@ -298,7 +303,7 @@ describe('upgrader', () => {
it('should abort stream when protocol negotiation fails on outbound stream', async () => {
let stream: Stream | undefined

const upgrader = new Upgrader(await createDefaultUpgraderComponents(), {
const upgrader = new Upgrader(components, {
...init,
streamMuxers: [
stubInterface<StreamMuxerFactory>({
Expand Down Expand Up @@ -376,9 +381,10 @@ describe('upgrader', () => {
protocol: encrypterProtocol
})

const upgrader = new Upgrader(await createDefaultUpgraderComponents({
const upgrader = new Upgrader({
...components,
connectionProtector
}), {
}, {
...init,
connectionEncrypters: [
connectionEncrypter
Expand Down Expand Up @@ -617,4 +623,84 @@ describe('upgrader', () => {
await expect(conn.newStream(protocol, opts)).to.eventually.be.rejected
.with.property('name', 'TooManyOutboundProtocolStreamsError')
})

describe('early muxer selection', () => {
let earlyMuxerProtocol: string
let streamMuxerFactory: StreamMuxerFactory
let upgrader: Upgrader
let maConn: MultiaddrConnection
let encrypterProtocol: string

beforeEach(async () => {
encrypterProtocol = '/test-encrypt-with-early'
earlyMuxerProtocol = '/early-muxer'
streamMuxerFactory = stubInterface<StreamMuxerFactory>({
protocol: earlyMuxerProtocol,
createStreamMuxer: () => stubInterface<StreamMuxer>({
protocol: earlyMuxerProtocol,
sink: async (source) => drain(source),
source: (async function * () {})()
})
})

upgrader = new Upgrader(components, {
connectionEncrypters: [
stubInterface<ConnectionEncrypter>({
protocol: encrypterProtocol,
secureOutbound: async (connection) => ({
conn: connection,
remotePeer,
streamMuxer: streamMuxerFactory
}),
secureInbound: async (connection) => ({
conn: connection,
remotePeer,
streamMuxer: streamMuxerFactory
})
})
],
streamMuxers: [
stubInterface<StreamMuxerFactory>({
protocol: '/late-muxer',
createStreamMuxer: () => stubInterface<StreamMuxer>({
protocol: '/late-muxer',
sink: async (source) => drain(source),
source: (async function * () {})()
})
})
]
})

maConn = stubInterface<MultiaddrConnection>({
remoteAddr,
log: logger('test'),
sink: async (source) => drain(source),
source: map((async function * () {
yield '/multistream/1.0.0\n'
yield `${encrypterProtocol}\n`
})(), str => encode.single(uint8ArrayFromString(str)))
})
})

it('should allow early muxer selection on inbound connection', async () => {
const connectionPromise = pEvent<'connection:open', CustomEvent<Connection>>(components.events, 'connection:open')

await upgrader.upgradeInbound(maConn, {
signal: AbortSignal.timeout(5_000)
})

const event = await connectionPromise
const conn = event.detail

expect(conn.multiplexer).to.equal(earlyMuxerProtocol)
})

it('should allow early muxer selection on outbound connection', async () => {
const conn = await upgrader.upgradeOutbound(maConn, {
signal: AbortSignal.timeout(5_000)
})

expect(conn.multiplexer).to.equal(earlyMuxerProtocol)
})
})
})
6 changes: 3 additions & 3 deletions packages/libp2p/test/upgrading/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import { TypedEventEmitter } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import type { DefaultUpgraderComponents } from '../../src/upgrader.js'
import type { UpgraderComponents } from '../../src/upgrader.js'
import type { ConnectionGater, PeerId, PeerStore, TypedEventTarget, Libp2pEvents, ComponentLogger, Metrics, ConnectionProtector } from '@libp2p/interface'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

export interface StubbedDefaultUpgraderComponents {
export interface StubbedUpgraderComponents {
peerId: PeerId
metrics?: StubbedInstance<Metrics>
connectionManager: StubbedInstance<ConnectionManager>
Expand All @@ -21,7 +21,7 @@ export interface StubbedDefaultUpgraderComponents {
logger: ComponentLogger
}

export async function createDefaultUpgraderComponents (options?: Partial<DefaultUpgraderComponents>): Promise<StubbedDefaultUpgraderComponents> {
export async function createDefaultUpgraderComponents (options?: Partial<UpgraderComponents>): Promise<StubbedUpgraderComponents> {
return {
peerId: peerIdFromPrivateKey(await generateKeyPair('Ed25519')),
connectionManager: stubInterface<ConnectionManager>({
Expand Down