Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: WebTransport stream now extends abstract stream #2514

Merged
merged 9 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions packages/transport-webtransport/.aegir.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
/* eslint-disable no-console */
import { spawn, exec } from 'child_process'
import { existsSync } from 'fs'
import { existsSync } from 'node:fs'
import os from 'node:os'
import defer from 'p-defer'

/** @type {import('aegir/types').PartialOptions} */
export default {
test: {
async before() {
async before () {
const main = os.platform() === 'win32' ? 'main.exe' : 'main'

if (!existsSync('./go-libp2p-webtransport-server/main')) {
await new Promise((resolve, reject) => {
exec('go build -o main main.go',
exec(`go build -o ${main} main.go`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these build files are being cleaned by npm run clean, which they probably should be

Copy link
Member Author

@achingbrain achingbrain Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I want to remove this transport server entirely and just rely on precomplied binaries in the interop tests because they exercise more functionality which gives us more certainty and we don't need to be recompiling this unchanging go script over and over again.

{ cwd: './go-libp2p-webtransport-server' },
(error, stdout, stderr) => {
if (error) {
Expand All @@ -21,7 +25,7 @@ export default {
})
}

const server = spawn('./main', [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
const server = spawn(`./${main}`, [], { cwd: './go-libp2p-webtransport-server', killSignal: 'SIGINT' })
server.stderr.on('data', (data) => {
console.log('stderr:', data.toString())
})
Expand Down Expand Up @@ -53,7 +57,7 @@ export default {
}
}
},
async after(_, { server }) {
async after (_, { server }) {
server.kill('SIGINT')
}
},
Expand Down
15 changes: 12 additions & 3 deletions packages/transport-webtransport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,34 @@
"@chainsafe/libp2p-noise": "^15.0.0",
"@libp2p/interface": "^1.3.0",
"@libp2p/peer-id": "^4.1.0",
"@libp2p/utils": "^5.3.2",
"@multiformats/multiaddr": "^12.2.1",
"@multiformats/multiaddr-matcher": "^1.2.0",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.3"
},
"devDependencies": {
"@libp2p/logger": "^4.0.11",
"@libp2p/peer-id-factory": "^4.1.0",
"@noble/hashes": "^1.4.0",
"aegir": "^42.2.5",
"it-map": "^3.1.0",
"it-to-buffer": "^4.0.7",
"libp2p": "^1.4.3",
"p-defer": "^4.0.1"
"p-defer": "^4.0.1",
"p-wait-for": "^5.0.2"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js"
},
"react-native": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js",
"./dist/src/utils/generate-certificates.js": "./dist/src/utils/generate-certificates.browser.js"
},
"sideEffects": false
}
175 changes: 58 additions & 117 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,38 @@
*/

import { noise } from '@chainsafe/libp2p-noise'
import { type Transport, transportSymbol, type CreateListenerOptions, type DialOptions, type Listener, type ComponentLogger, type Logger, type Connection, type MultiaddrConnection, type Stream, type CounterGroup, type Metrics, type PeerId, type StreamMuxerFactory, type StreamMuxerInit, type StreamMuxer } from '@libp2p/interface'
import { type Multiaddr, type AbortOptions } from '@multiformats/multiaddr'
import { AbortError, CodeError, transportSymbol } from '@libp2p/interface'
import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher'
import { webtransportBiDiStreamToStream } from './stream.js'
import { raceSignal } from 'race-signal'
import createListener from './listener.js'
import { webtransportMuxer } from './muxer.js'
import { inertDuplex } from './utils/inert-duplex.js'
import { isSubset } from './utils/is-subset.js'
import { parseMultiaddr } from './utils/parse-multiaddr.js'
import WebTransport from './webtransport.js'
import type { Transport, CreateListenerOptions, DialOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Source } from 'it-stream-types'
import type { MultihashDigest } from 'multiformats/hashes/interface'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* PEM format server certificate and private key
*/
export interface WebTransportCertificate {
privateKey: string
pem: string
hash: MultihashDigest<number>
secret: string
}

interface WebTransportSessionCleanup {
(metric: string): void
}

export interface WebTransportInit {
maxInboundStreams?: number
certificates?: WebTransportCertificate[]
}

export interface WebTransportComponents {
Expand All @@ -69,7 +84,9 @@
this.log = components.logger.forComponent('libp2p:webtransport')
this.components = components
this.config = {
maxInboundStreams: init.maxInboundStreams ?? 1000
...init,
maxInboundStreams: init.maxInboundStreams ?? 1000,
certificates: init.certificates ?? []
}

if (components.metrics != null) {
Expand All @@ -87,24 +104,26 @@
readonly [transportSymbol] = true

async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
options?.signal?.throwIfAborted()
if (options?.signal?.aborted === true) {
throw new AbortError()
}

Check warning on line 109 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L108-L109

Added lines #L108 - L109 were not covered by tests

this.log('dialing %s', ma)
const localPeer = this.components.peerId
if (localPeer === undefined) {
throw new Error('Need a local peerid')
throw new CodeError('Need a local peerid', 'ERR_INVALID_PARAMETERS')

Check warning on line 114 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L114

Added line #L114 was not covered by tests
}

options = options ?? {}

const { url, certhashes, remotePeer } = parseMultiaddr(ma)

if (remotePeer == null) {
throw new Error('Need a target peerid')
throw new CodeError('Need a target peerid', 'ERR_INVALID_PARAMETERS')

Check warning on line 122 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L122

Added line #L122 was not covered by tests
}

if (certhashes.length === 0) {
throw new Error('Expected multiaddr to contain certhashes')
throw new CodeError('Expected multiaddr to contain certhashes', 'ERR_INVALID_PARAMETERS')
}

let abortListener: (() => void) | undefined
Expand Down Expand Up @@ -159,10 +178,12 @@
once: true
})

this.log('wait for session to be ready')
await Promise.race([
wt.closed,
wt.ready
])
this.log('session became ready')

ready = true
this.metrics?.dialerEvents.increment({ ready: true })
Expand All @@ -175,15 +196,19 @@
cleanUpWTSession('remote_close')
})

if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
if (!await raceSignal(this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes), options.signal)) {
throw new Error('Failed to authenticate webtransport')
}

if (options?.signal?.aborted === true) {
throw new AbortError()

Check warning on line 204 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L204

Added line #L204 was not covered by tests
}

this.metrics?.dialerEvents.increment({ open: true })

maConn = {
close: async () => {
this.log('Closing webtransport')
this.log('closing webtransport')
cleanUpWTSession('close')
},
abort: (err: Error) => {
Expand All @@ -201,7 +226,11 @@

authenticated = true

return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true })
return await options.upgrader.upgradeOutbound(maConn, {
skipEncryption: true,
muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.components.logger, this.config),
skipProtection: true
})
} catch (err: any) {
this.log.error('caught wt session err', err)

Expand All @@ -221,11 +250,14 @@
}
}

async authenticateWebTransport (wt: InstanceType<typeof WebTransport>, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>): Promise<boolean> {
async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>, signal?: AbortSignal): Promise<boolean> {
if (signal?.aborted === true) {
throw new AbortError()
}

Check warning on line 256 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L255-L256

Added lines #L255 - L256 were not covered by tests

const stream = await wt.createBidirectionalStream()
const writer = stream.writable.getWriter()
const reader = stream.readable.getReader()
await writer.ready

const duplex = {
source: (async function * () {
Expand All @@ -241,13 +273,15 @@
}
}
})(),
sink: async function (source: Source<Uint8Array | Uint8ArrayList>) {
sink: async (source: Source<Uint8Array | Uint8ArrayList>) => {
for await (const chunk of source) {
if (chunk instanceof Uint8Array) {
await writer.write(chunk)
} else {
await writer.write(chunk.subarray())
}
await raceSignal(writer.ready, signal)

const buf = chunk instanceof Uint8Array ? chunk : chunk.subarray()

writer.write(buf).catch(err => {
this.log.error('could not write chunk during authentication of WebTransport stream', err)

Check warning on line 283 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L283

Added line #L283 was not covered by tests
})
}
}
}
Expand All @@ -273,105 +307,12 @@
return true
}

webtransportMuxer (wt: WebTransport): StreamMuxerFactory {
let streamIDCounter = 0
const config = this.config
const self = this
return {
protocol: 'webtransport',
createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => {
// !TODO handle abort signal when WebTransport supports this.

if (typeof init === 'function') {
// The api docs say that init may be a function
init = { onIncomingStream: init }
}

const activeStreams: Stream[] = [];

(async function () {
//! TODO unclear how to add backpressure here?

const reader = wt.incomingBidirectionalStreams.getReader()
while (true) {
const { done, value: wtStream } = await reader.read()

if (done) {
break
}

if (activeStreams.length >= config.maxInboundStreams) {
// We've reached our limit, close this stream.
wtStream.writable.close().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
wtStream.readable.cancel().catch((err: Error) => {
self.log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
})
} else {
const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
'inbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)
init?.onIncomingStream?.(stream)
}
}
})().catch(() => {
this.log.error('WebTransport failed to receive incoming stream')
})

const muxer: StreamMuxer = {
protocol: 'webtransport',
streams: activeStreams,
newStream: async (name?: string): Promise<Stream> => {
const wtStream = await wt.createBidirectionalStream()

const stream = await webtransportBiDiStreamToStream(
wtStream,
String(streamIDCounter++),
init?.direction ?? 'outbound',
activeStreams,
init?.onStreamEnd,
self.components.logger
)
activeStreams.push(stream)

return stream
},

/**
* Close or abort all tracked streams and stop the muxer
*/
close: async (options?: AbortOptions) => {
this.log('Closing webtransport muxer')

await Promise.all(
activeStreams.map(async s => s.close(options))
)
},
abort: (err: Error) => {
this.log('Aborting webtransport muxer with err:', err)

for (const stream of activeStreams) {
stream.abort(err)
}
},
// This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
...inertDuplex()
}

return muxer
}
}
}

createListener (options: CreateListenerOptions): Listener {
throw new Error('Webtransport servers are not supported in Node or the browser')
return createListener(this.components, {
...options,
certificates: this.config.certificates,
maxInboundStreams: this.config.maxInboundStreams
})

Check warning on line 315 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L311-L315

Added lines #L311 - L315 were not covered by tests
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/transport-webtransport/src/listener.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { CreateListenerOptions, Listener } from '@libp2p/interface'

export default function createListener (options: CreateListenerOptions): Listener {
throw new Error('Not implemented')
}

Check warning on line 5 in packages/transport-webtransport/src/listener.browser.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/listener.browser.ts#L4-L5

Added lines #L4 - L5 were not covered by tests
19 changes: 19 additions & 0 deletions packages/transport-webtransport/src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { WebTransportCertificate } from './index.js'
import type { Connection, Upgrader, Listener, CreateListenerOptions, PeerId, ComponentLogger, Metrics } from '@libp2p/interface'

export interface WebTransportListenerComponents {
peerId: PeerId
logger: ComponentLogger
metrics?: Metrics
}

export interface WebTransportListenerInit extends CreateListenerOptions {
handler?(conn: Connection): void
upgrader: Upgrader
certificates?: WebTransportCertificate[]
maxInboundStreams?: number
}

export default function createListener (components: WebTransportListenerComponents, options: WebTransportListenerInit): Listener {
throw new Error('Only supported in browsers')
}
Loading
Loading