Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 6e5fddb

Browse files
committed
Support receive MediaStream over WebTransport datagrams.
1 parent b80c458 commit 6e5fddb

File tree

2 files changed

+237
-43
lines changed

2 files changed

+237
-43
lines changed

Diff for: src/sdk/conference/webtransport/connection.js

+153-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
/* eslint-disable require-jsdoc */
66
/* global Promise, Map, WebTransport, WebTransportBidirectionalStream,
7-
Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */
7+
Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor,
8+
proto */
89

910
'use strict';
1011

@@ -44,6 +45,10 @@ export class QuicConnection extends EventDispatcher {
4445
this._transportId = this._token.transportId;
4546
this._initReceiveStreamReader();
4647
this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'});
48+
// Key is SSRC, value is a MediaStreamTrackGenerator writer.
49+
this._mstGeneratorWriters = new Map();
50+
this._initRtpModule();
51+
this._initDatagramReader();
4752
}
4853

4954
/**
@@ -77,6 +82,10 @@ export class QuicConnection extends EventDispatcher {
7782
await this._authenticate(this._tokenString);
7883
}
7984

85+
async _initRtpModule() {
86+
this._worker.postMessage(['init-rtp']);
87+
}
88+
8089
async _initReceiveStreamReader() {
8190
const receiveStreamReader =
8291
this._quicTransport.incomingBidirectionalStreams.getReader();
@@ -173,6 +182,20 @@ export class QuicConnection extends EventDispatcher {
173182
}
174183
}
175184

185+
async _initDatagramReader() {
186+
const datagramReader = this._quicTransport.datagrams.readable.getReader();
187+
console.log('Get datagram reader.');
188+
let receivingDone = false;
189+
while (!receivingDone) {
190+
const {value: datagram, done: readingDatagramsDone} =
191+
await datagramReader.read();
192+
this._worker.postMessage(['rtp-packet', datagram]);
193+
if (readingDatagramsDone) {
194+
receivingDone = true;
195+
}
196+
}
197+
}
198+
176199
_createSubscription(id, receiveStream) {
177200
// TODO: Incomplete subscription.
178201
const subscription = new Subscription(id, () => {
@@ -207,6 +230,94 @@ export class QuicConnection extends EventDispatcher {
207230
return quicStream;
208231
}
209232

233+
async bindFeedbackReader(stream, publicationId) {
234+
// The receiver side of a publication stream starts with a UUID of
235+
// publication ID, then each feedback message has a 4 bytes header indicates
236+
// its length, and followed by protobuf encoded body.
237+
const feedbackChunkReader = stream.readable.getReader();
238+
let feedbackChunksDone = false;
239+
let publicationIdOffset = 0;
240+
const headerSize=4;
241+
const header = new Uint8Array(headerSize);
242+
let headerOffset = 0;
243+
let bodySize = 0;
244+
let bodyOffset = 0;
245+
let bodyBytes;
246+
while (!feedbackChunksDone) {
247+
let valueOffset=0;
248+
const {value, done} = await feedbackChunkReader.read();
249+
Logger.debug(value);
250+
while (valueOffset < value.byteLength) {
251+
if (publicationIdOffset < uuidByteLength) {
252+
// TODO: Check publication ID matches. For now, we just skip this ID.
253+
const readLength =
254+
Math.min(uuidByteLength - publicationIdOffset, value.byteLength);
255+
valueOffset += readLength;
256+
publicationIdOffset += readLength;
257+
}
258+
if (headerOffset < headerSize) {
259+
// Read header.
260+
const copyLength = Math.min(
261+
headerSize - headerOffset, value.byteLength - valueOffset);
262+
if (copyLength === 0) {
263+
continue;
264+
}
265+
header.set(
266+
value.subarray(valueOffset, valueOffset + copyLength),
267+
headerOffset);
268+
headerOffset += copyLength;
269+
valueOffset += copyLength;
270+
if (headerOffset < headerSize) {
271+
continue;
272+
}
273+
bodySize = 0;
274+
bodyOffset = 0;
275+
for (let i = 0; i < headerSize; i++) {
276+
bodySize += (header[i] << ((headerSize - 1 - i) * 8));
277+
}
278+
bodyBytes = new Uint8Array(bodySize);
279+
Logger.debug('Body size ' + bodySize);
280+
}
281+
if (bodyOffset < bodySize) {
282+
const copyLength =
283+
Math.min(bodySize - bodyOffset, value.byteLength - valueOffset);
284+
if (copyLength === 0) {
285+
continue;
286+
}
287+
Logger.debug('Bytes for body: '+copyLength);
288+
bodyBytes.set(
289+
value.subarray(valueOffset, valueOffset + copyLength),
290+
bodyOffset);
291+
bodyOffset += copyLength;
292+
valueOffset += copyLength;
293+
if (valueOffset < bodySize) {
294+
continue;
295+
}
296+
// Decode body.
297+
const feedback = proto.owt.protobuf.Feedback.deserializeBinary(bodyBytes);
298+
this.handleFeedback(feedback, publicationId);
299+
}
300+
}
301+
if (done) {
302+
feedbackChunksDone = true;
303+
break;
304+
}
305+
}
306+
}
307+
308+
async handleFeedback(feedback, publicationId) {
309+
Logger.debug(
310+
'Key frame request type: ' +
311+
proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST);
312+
if (feedback.getType() ===
313+
proto.owt.protobuf.Feedback.Type.KEY_FRAME_REQUEST) {
314+
this._worker.postMessage(
315+
['rtcp-feedback', ['key-frame-request', publicationId]]);
316+
} else {
317+
Logger.warning('Unrecognized feedback type ' + feedback.getType());
318+
}
319+
}
320+
210321
async publish(stream, options) {
211322
// TODO: Avoid a stream to be published twice. The first 16 bit data send to
212323
// server must be it's publication ID.
@@ -225,6 +336,7 @@ export class QuicConnection extends EventDispatcher {
225336
for (const track of stream.stream.getTracks()) {
226337
const quicStream =
227338
await this._quicTransport.createBidirectionalStream();
339+
this.bindFeedbackReader(quicStream, publicationId);
228340
this._quicMediaStreamTracks.set(track.id, quicStream);
229341
quicStreams.push(quicStream);
230342
}
@@ -262,6 +374,7 @@ export class QuicConnection extends EventDispatcher {
262374
[
263375
'media-sender',
264376
[
377+
publicationId,
265378
track.id,
266379
track.kind,
267380
processor.readable,
@@ -317,12 +430,12 @@ export class QuicConnection extends EventDispatcher {
317430
if (typeof options !== 'object') {
318431
return Promise.reject(new TypeError('Options should be an object.'));
319432
}
320-
// if (options.audio === undefined) {
321-
// options.audio = !!stream.settings.audio;
322-
// }
323-
// if (options.video === undefined) {
324-
// options.video = !!stream.settings.video;
325-
// }
433+
if (options.audio === undefined) {
434+
options.audio = !!stream.settings.audio;
435+
}
436+
if (options.video === undefined) {
437+
options.video = !!stream.settings.video;
438+
}
326439
let mediaOptions;
327440
let dataOptions;
328441
if (options.audio || options.video) {
@@ -375,19 +488,40 @@ export class QuicConnection extends EventDispatcher {
375488
})
376489
.then((data) => {
377490
this._subscribeOptions.set(data.id, options);
378-
Logger.debug('Subscribe info is set.');
379-
if (this._quicDataStreams.has(data.id)) {
380-
// QUIC stream created before signaling returns.
381-
// TODO: Update subscription to accept list of QUIC streams.
382-
const subscription = this._createSubscription(
383-
data.id, this._quicDataStreams.get(data.id)[0]);
384-
resolve(subscription);
491+
if (dataOptions) {
492+
// A WebTransport stream is associated with a subscription for
493+
// data.
494+
if (this._quicDataStreams.has(data.id)) {
495+
// QUIC stream created before signaling returns.
496+
// TODO: Update subscription to accept list of QUIC streams.
497+
const subscription = this._createSubscription(
498+
data.id, this._quicDataStreams.get(data.id)[0]);
499+
resolve(subscription);
500+
} else {
501+
this._quicDataStreams.set(data.id, null);
502+
// QUIC stream is not created yet, resolve promise after getting
503+
// QUIC stream.
504+
this._subscribePromises.set(
505+
data.id, {resolve: resolve, reject: reject});
506+
}
385507
} else {
386-
this._quicDataStreams.set(data.id, null);
387-
// QUIC stream is not created yet, resolve promise after getting
388-
// QUIC stream.
389-
this._subscribePromises.set(
390-
data.id, {resolve: resolve, reject: reject});
508+
// A MediaStream is associated with a subscription for media.
509+
// Media packets are received over WebTransport datagram.
510+
const ms = new Module.MediaSession();
511+
console.log('Created media session.');
512+
const generators = [];
513+
for (const track of mediaOptions){
514+
const generator =
515+
new MediaStreamTrackGenerator({kind: track.type});
516+
generators.push(generator);
517+
// TODO: Update key with the correct SSRC.
518+
this._mstGeneratorWriters.set(
519+
0, generator.writable.getWriter());
520+
}
521+
const mediaStream = new MediaStream(generators);
522+
const subscription =
523+
this._createSubscription(data.id, mediaStream);
524+
resolve(subscription);
391525
}
392526
if (this._subscriptionInfoReady.has(data.id)) {
393527
this._subscriptionInfoReady.get(data.id)();

0 commit comments

Comments
 (0)