diff --git a/js/hang/src/catalog/captions.ts b/js/hang/src/catalog/audio/captions.ts
similarity index 81%
rename from js/hang/src/catalog/captions.ts
rename to js/hang/src/catalog/audio/captions.ts
index b66504735..a5a028961 100644
--- a/js/hang/src/catalog/captions.ts
+++ b/js/hang/src/catalog/audio/captions.ts
@@ -1,5 +1,5 @@
 import { z } from "zod";
-import { TrackSchema } from "./track";
+import { TrackSchema } from "../track";
 
 export const CaptionsSchema = z.object({
 	// The MoQ track information.
diff --git a/js/hang/src/catalog/audio.ts b/js/hang/src/catalog/audio/index.ts
similarity index 85%
rename from js/hang/src/catalog/audio.ts
rename to js/hang/src/catalog/audio/index.ts
index 3814d6072..0d563bfad 100644
--- a/js/hang/src/catalog/audio.ts
+++ b/js/hang/src/catalog/audio/index.ts
@@ -1,7 +1,8 @@
 import { z } from "zod";
+import { u53Schema } from "../integers";
+import { TrackSchema } from "../track";
 import { CaptionsSchema } from "./captions";
-import { u53Schema } from "./integers";
-import { TrackSchema } from "./track";
+import { SpeakingSchema } from "./speaking";
 
 // Mirrors AudioDecoderConfig
 // https://w3c.github.io/webcodecs/#audio-decoder-config
@@ -34,6 +35,9 @@ export const AudioSchema = z.object({
 
 	// An optional captions track
 	captions: CaptionsSchema.optional(),
+
+	// An optional speaking track
+	speaking: SpeakingSchema.optional(),
 });
 
 export type Audio = z.infer<typeof AudioSchema>;
diff --git a/js/hang/src/catalog/audio/speaking.ts b/js/hang/src/catalog/audio/speaking.ts
new file mode 100644
index 000000000..a29d7fb57
--- /dev/null
+++ b/js/hang/src/catalog/audio/speaking.ts
@@ -0,0 +1,9 @@
+import { z } from "zod";
+import { TrackSchema } from "../track";
+
+export const SpeakingSchema = z.object({
+	// The MoQ track information.
+	track: TrackSchema,
+});
+
+export type Speaking = z.infer<typeof SpeakingSchema>;
diff --git a/js/hang/src/catalog/index.ts b/js/hang/src/catalog/index.ts
index 1369a0ca1..0f2db785a 100644
--- a/js/hang/src/catalog/index.ts
+++ b/js/hang/src/catalog/index.ts
@@ -1,6 +1,7 @@
 export * from "./audio";
+export * from "./audio/captions";
+export * from "./audio/speaking";
 export * from "./capabilities";
-export * from "./captions";
 export * from "./chat";
 export * from "./detection";
 export * from "./integers";
diff --git a/js/hang/src/container/bool.ts b/js/hang/src/container/bool.ts
new file mode 100644
index 000000000..ca0f2481c
--- /dev/null
+++ b/js/hang/src/container/bool.ts
@@ -0,0 +1,70 @@
+import type { GroupConsumer, GroupProducer, TrackConsumer, TrackProducer } from "@kixelated/moq";
+
+// Creates a track that writes a frame on true, and closes the group on false.
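+//
+// Rough usage sketch (illustrative only; `track` is whatever TrackProducer the caller already owns):
+//
+//	const speaking = new BoolProducer(track);
+//	speaking.write(true); // opens a group and writes a single frame
+//	speaking.write(false); // closes the group, signalling false
+//
+//	const reader = new BoolConsumer(track.consume());
+//	const value = await reader.next(); // true, false, or undefined once the track ends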
+export class BoolProducer {
+	track: TrackProducer;
+	#group?: GroupProducer;
+
+	constructor(track: TrackProducer) {
+		this.track = track;
+	}
+
+	write(value: boolean) {
+		if (value) {
+			if (this.#group) return; // noop
+			this.#group = this.track.appendGroup();
+			this.#group.writeFrame(new Uint8Array([1]));
+		} else {
+			if (!this.#group) return; // noop
+			this.#group.close();
+			this.#group = undefined;
+		}
+	}
+
+	clone() {
+		return new BoolProducer(this.track);
+	}
+
+	close() {
+		this.track.close();
+		this.#group?.close();
+		this.#group = undefined;
+	}
+}
+
+export class BoolConsumer {
+	track: TrackConsumer;
+	#group?: GroupConsumer;
+
+	constructor(track: TrackConsumer) {
+		this.track = track;
+	}
+
+	async next(): Promise<boolean | undefined> {
+		for (;;) {
+			if (!this.#group) {
+				const group = await this.track.nextGroup();
+				if (!group) return undefined;
+
+				this.#group = group;
+				return true;
+			}
+
+			const group = await Promise.race([this.track.nextGroup(), this.#group.closed()]);
+			if (group) {
+				this.#group = group;
+				continue;
+			}
+
+			this.#group.close();
+			this.#group = undefined;
+			return false;
+		}
+	}
+
+	close() {
+		this.track.close();
+		this.#group?.close();
+		this.#group = undefined;
+	}
+}
diff --git a/js/hang/src/publish/audio/captions-worker.ts b/js/hang/src/publish/audio/captions-worker.ts
index df619ddfa..d8ec9fc7c 100644
--- a/js/hang/src/publish/audio/captions-worker.ts
+++ b/js/hang/src/publish/audio/captions-worker.ts
@@ -1,28 +1,21 @@
-import {
-	AutoModel,
-	type AutomaticSpeechRecognitionPipeline,
-	type PreTrainedModel,
-	pipeline,
-	Tensor,
-} from "@huggingface/transformers";
+import { type AutomaticSpeechRecognitionPipeline, pipeline } from "@huggingface/transformers";
+import type { AudioFrame } from "./capture";
 
-export type Request = Init;
+export type Request = Init | Speaking;
 
 export interface Init {
 	type: "init";
-
-	// Receive "speaking" audio directly from the VAD worker.
-	// TODO strongly type this, receives Speaking and NotSpeaking.
+	// Captured audio from the worklet.
 	worklet: MessagePort;
 }
 
-export type Result = Speaking | Text | Error;
-
 export interface Speaking {
 	type: "speaking";
 	speaking: boolean;
 }
 
+export type Result = Text | Error;
+
 export interface Text {
 	type: "text";
 	text: string;
@@ -35,121 +28,6 @@ export interface Error {
 
 const SAMPLE_RATE = 16000;
 
-// This VAD model expects 512 samples at a time, or 31ms
-const VAD_CHUNK_SIZE = 512;
-
-// Require 8 silence chunks to be detected before we consider the user done speaking.
-const VAD_SILENCE_PADDING = 8;
-
-class Vad {
-	whisper: Whisper;
-
-	// Simple circular buffer, primarily so we keep the previous buffer around once speaking is detected.
-	#next = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // queued
-	#current = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // being processed
-	#prev = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // already processed
-
-	#processing = false;
-
-	// Initial state for VAD
-	#sr = new Tensor("int64", [SAMPLE_RATE], []);
-	#state = new Tensor("float32", new Float32Array(2 * 1 * 128), [2, 1, 128]);
-	#speaking = false;
-
-	// Count the number of silence results, if we get 3 in a row then we're done.
-	#silence = 0;
-
-	#model: Promise<PreTrainedModel>;
-
-	constructor(whisper: Whisper) {
-		this.whisper = whisper;
-
-		this.#model = AutoModel.from_pretrained("onnx-community/silero-vad", {
-			// @ts-expect-error Not sure why this is needed.
-			config: { model_type: "custom" },
-			dtype: "fp32", // Full-precision
-		});
-	}
-
-	write(samples: Float32Array) {
-		if (this.#next.byteLength >= this.#next.buffer.byteLength) {
-			if (!this.flush()) {
-				// Drop the sample if VAD is still processing.
-				return;
-			}
-		}
-
-		this.#next = new Float32Array(this.#next.buffer, 0, this.#next.length + samples.length);
-		this.#next.set(samples, this.#next.length - samples.length);
-
-		if (this.#next.byteLength === this.#next.buffer.byteLength) {
-			this.flush(); // don't care if it fails
-		}
-	}
-
-	flush(): boolean {
-		if (this.#processing) {
-			return false;
-		}
-
-		this.#processing = true;
-
-		this.#current = this.#next;
-		this.#next = new Float32Array(this.#prev.buffer, 0, 0);
-		this.#prev = this.#current;
-
-		this.#flush().finally(() => {
-			this.#processing = false;
-		});
-
-		return true;
-	}
-
-	async #flush() {
-		const model = await this.#model;
-
-		const input = new Tensor("float32", this.#current, [1, this.#current.length]);
-		const result = await model({ input, sr: this.#sr, state: this.#state });
-		this.#state = result.stateN;
-
-		const wasSpeaking = this.#speaking;
-
-		const isSpeech = result.output.data[0];
-		if (this.#speaking && isSpeech < 0.3) {
-			this.#silence++;
-
-			if (this.#silence > VAD_SILENCE_PADDING) {
-				this.#speaking = false;
-
-				postResult({
-					type: "speaking",
-					speaking: false,
-				});
-			}
-		} else if (!this.#speaking && isSpeech >= 0.1) {
-			this.#speaking = true;
-			this.#silence = 0;
-
-			postResult({
-				type: "speaking",
-				speaking: true,
-			});
-		}
-
-		if (!wasSpeaking && this.#speaking) {
-			this.whisper.write(this.#prev);
-		}
-
-		if (wasSpeaking || this.#speaking) {
-			this.whisper.write(this.#current);
-		}
-
-		if (wasSpeaking && !this.#speaking) {
-			this.whisper.flush();
-		}
-	}
-}
-
 const MAX_WHISPER_BUFFER = 15 * SAMPLE_RATE; // 15 seconds
 
 class Whisper {
@@ -158,6 +36,7 @@ class Whisper {
 
 	#processing = false;
 
+	#speaking = false;
 	#model: Promise<AutomaticSpeechRecognitionPipeline>;
 
 	constructor() {
@@ -188,7 +67,12 @@ class Whisper {
 			}
 		}
 
-		this.#queued = new Float32Array(this.#queued.buffer, 0, this.#queued.length + samples.length);
+		// Determine how many samples to keep.
+		// If we're not speaking, only keep the previous chunk.
+		// TODO add a constant to keep more.
+		const keep = this.#speaking ? this.#queued.length : 0;
+
+		this.#queued = new Float32Array(this.#queued.buffer, 0, keep + samples.length);
 		this.#queued.set(samples, this.#queued.length - samples.length);
 	}
 
@@ -227,16 +111,30 @@ class Whisper {
 			text,
 		});
 	}
+
+	set speaking(speaking: boolean) {
+		if (this.#speaking === speaking) return;
+
+		this.#speaking = speaking;
+
+		if (!speaking) {
+			this.flush();
+		}
+	}
 }
 
+const whisper = new Whisper();
+
 self.addEventListener("message", async (event: MessageEvent<Request>) => {
 	const message = event.data;
 
-	const whisper = new Whisper();
-	const vad = new Vad(whisper);
-	message.worklet.onmessage = ({ data: samples }: MessageEvent<Float32Array>) => {
-		vad.write(samples);
-	};
+	if (message.type === "init") {
+		message.worklet.onmessage = ({ data: { channels } }: MessageEvent<AudioFrame>) => {
+			whisper.write(channels[0]);
+		};
+	} else if (message.type === "speaking") {
+		whisper.speaking = message.speaking;
+	}
 });
 
 function postResult(msg: Result) {
diff --git a/js/hang/src/publish/audio/captions-worklet.ts b/js/hang/src/publish/audio/captions-worklet.ts
deleted file mode 100644
index 389afa76e..000000000
--- a/js/hang/src/publish/audio/captions-worklet.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-class Capture extends AudioWorkletProcessor {
-	process(input: Float32Array[][]) {
-		if (input.length > 1) throw new Error("only one input is supported.");
-
-		const channels = input[0];
-		if (channels.length === 0) return true; // TODO: No input hooked up?
-		if (channels.length !== 1) throw new Error("only one channel is supported.");
-
-		this.port.postMessage(channels[0]);
-		return true;
-	}
-}
-
-registerProcessor("captions", Capture);
diff --git a/js/hang/src/publish/audio/captions.ts b/js/hang/src/publish/audio/captions.ts
index 854f15211..f7c45d44e 100644
--- a/js/hang/src/publish/audio/captions.ts
+++ b/js/hang/src/publish/audio/captions.ts
@@ -4,21 +4,22 @@
 import type * as Catalog from "../../catalog";
 import { u8 } from "../../catalog";
 import type { Audio } from ".";
 import type { Request, Result } from "./captions-worker";
-import CaptionsWorklet from "./captions-worklet?worker&url";
+import CaptureWorklet from "./capture-worklet?worker&url";
 
 export type CaptionsProps = {
 	enabled?: boolean;
-	ttl?: number;
+	transcribe?: boolean;
+
+	// Captions are cleared after this many milliseconds. (10s default)
+	ttl?: DOMHighResTimeStamp;
 };
 
 export class Captions {
 	audio: Audio;
 
 	// Enable caption generation via an on-device model (whisper).
-	// This will publish a "captions" track and set the signals `speaking` and `text`.
 	enabled: Signal<boolean>;
 
-	speaking = new Signal<boolean | undefined>(undefined);
 	text = new Signal<string | undefined>(undefined);
 	catalog = new Signal<Catalog.Captions | undefined>(undefined);
 
@@ -66,20 +67,15 @@
 		// Handle messages from the worker
 		worker.onmessage = ({ data }: MessageEvent<Result>) => {
-			if (data.type === "speaking") {
-				// Use heuristics to determine if we've toggled speaking or not
-				this.speaking.set(data.speaking);
-			} else if (data.type === "text") {
+			if (data.type === "text") {
 				this.text.set(data.text);
 			} else if (data.type === "error") {
 				console.error("VAD worker error:", data.message);
-				this.speaking.set(undefined);
 				this.text.set(undefined);
 			}
 		};
 
 		effect.cleanup(() => {
-			this.speaking.set(undefined);
 			this.text.set(undefined);
 		});
 
@@ -97,10 +93,10 @@
 		// The workload needs to be loaded asynchronously, unfortunately, but it should be instant.
 		effect.spawn(async () => {
-			await ctx.audioWorklet.addModule(CaptionsWorklet);
+			await ctx.audioWorklet.addModule(CaptureWorklet);
 
 			// Create the worklet.
-			const worklet = new AudioWorkletNode(ctx, "captions", {
+			const worklet = new AudioWorkletNode(ctx, "capture", {
 				numberOfInputs: 1,
 				numberOfOutputs: 0,
 				channelCount: 1,
@@ -117,6 +113,15 @@
 			};
 			worker.postMessage(init, [init.worklet]);
 		});
+
+		effect.effect((nested) => {
+			if (!nested.get(this.audio.speaking.enabled)) {
+				console.warn("VAD needs to be enabled to transcribe");
+				return;
+			}
+			const speaking = nested.get(this.audio.speaking.active);
+			worker.postMessage({ type: "speaking", speaking });
+		});
 	}
 
 	close() {
diff --git a/js/hang/src/publish/audio/index.ts b/js/hang/src/publish/audio/index.ts
index 1ef977f07..23cbc7c69 100644
--- a/js/hang/src/publish/audio/index.ts
+++ b/js/hang/src/publish/audio/index.ts
@@ -13,6 +13,7 @@ const FADE_TIME = 0.2;
 
 // Unfortunately, we need to use a Vite-exclusive import for now.
 import CaptureWorklet from "./capture-worklet?worker&url";
+import { Speaking, type SpeakingProps } from "./speaking";
 
 export type AudioConstraints = Omit<
 	MediaTrackConstraints,
@@ -48,6 +49,7 @@ export type AudioProps = {
 	muted?: boolean;
 	volume?: number;
 	captions?: CaptionsProps;
+	speaking?: SpeakingProps;
 
 	// The size of each group. Larger groups mean fewer drops but the viewer can fall further behind.
 	// NOTE: Each frame is always flushed to the network immediately.
@@ -61,6 +63,7 @@ export class Audio {
 	muted: Signal<boolean>;
 	volume: Signal<number>;
 	captions: Captions;
+	speaking: Speaking;
 	maxLatency: DOMHighResTimeStamp;
 
 	media: Signal<MediaStreamTrack | undefined>;
@@ -69,6 +72,9 @@
 
 	#catalog = new Signal<Catalog.Audio | undefined>(undefined);
 	readonly catalog: Getter<Catalog.Audio | undefined> = this.#catalog;
+
+	#config = new Signal<Catalog.AudioConfig | undefined>(undefined);
+	readonly config: Getter<Catalog.AudioConfig | undefined> = this.#config;
 
 	#worklet = new Signal<AudioWorkletNode | undefined>(undefined);
 	#gain = new Signal<GainNode | undefined>(undefined);
@@ -84,6 +90,7 @@
 		this.broadcast = broadcast;
 		this.media = new Signal(props?.media);
 		this.enabled = new Signal(props?.enabled ?? false);
+		this.speaking = new Speaking(this, props?.speaking);
 		this.captions = new Captions(this, props?.captions);
 		this.constraints = new Signal(props?.constraints);
 		this.muted = new Signal(props?.muted ?? false);
@@ -93,6 +100,7 @@
 		this.#signals.effect(this.#runSource.bind(this));
 		this.#signals.effect(this.#runGain.bind(this));
 		this.#signals.effect(this.#runEncoder.bind(this));
+		this.#signals.effect(this.#runCatalog.bind(this));
 	}
 
 	#runSource(effect: Effect): void {
@@ -172,28 +180,16 @@
 		const settings = media.getSettings() as AudioTrackSettings;
 
-		// TODO don't reininalize the encoder just because the captions track changed.
-		const captions = effect.get(this.captions.catalog);
-
-		const catalog: Catalog.Audio = {
-			track: {
-				name: this.#track.name,
-				priority: u8(this.#track.priority),
-			},
-			config: {
-				// TODO get codec and description from decoderConfig
-				codec: "opus",
-				// Firefox doesn't provide the sampleRate in the settings.
-				sampleRate: u53(settings.sampleRate ?? worklet?.context.sampleRate),
-				numberOfChannels: u53(settings.channelCount),
-				// TODO configurable
-				bitrate: u53(settings.channelCount * 32_000),
-			},
-			captions,
+		const config = {
+			// TODO get codec and description from decoderConfig
+			codec: "opus",
+			// Firefox doesn't provide the sampleRate in the settings.
+			sampleRate: u53(settings.sampleRate ?? worklet?.context.sampleRate),
+			numberOfChannels: u53(settings.channelCount),
+			// TODO configurable
+			bitrate: u53(settings.channelCount * 32_000),
 		};
-		effect.set(this.#catalog, catalog);
-
 		const encoder = new AudioEncoder({
 			output: (frame) => {
 				if (frame.type !== "key") {
@@ -224,8 +220,6 @@
 			this.#groupTimestamp = 0;
 		});
 
-		const config = catalog.config;
-
 		encoder.configure({
 			codec: config.codec,
 			numberOfChannels: config.numberOfChannels,
@@ -233,6 +227,8 @@
 			bitrate: config.bitrate,
 		});
 
+		effect.set(this.#config, config);
+
 		worklet.port.onmessage = ({ data }: { data: Capture.AudioFrame }) => {
 			const channels = data.channels.slice(0, settings.channelCount);
 			const joinedLength = channels.reduce((a, b) => a + b.length, 0);
@@ -258,6 +254,26 @@
 		};
 	}
 
+	#runCatalog(effect: Effect): void {
+		const config = effect.get(this.#config);
+		if (!config) return;
+
+		const captions = effect.get(this.captions.catalog);
+		const speaking = effect.get(this.speaking.catalog);
+
+		const catalog: Catalog.Audio = {
+			track: {
+				name: this.#track.name,
+				priority: u8(this.#track.priority),
+			},
+			config,
+			captions,
+			speaking,
+		};
+
+		effect.set(this.#catalog, catalog);
+	}
+
 	close() {
 		this.#signals.close();
 		this.captions.close();
diff --git a/js/hang/src/publish/audio/speaking-worker.ts b/js/hang/src/publish/audio/speaking-worker.ts
new file mode 100644
index 000000000..f573e7228
--- /dev/null
+++ b/js/hang/src/publish/audio/speaking-worker.ts
@@ -0,0 +1,134 @@
+import { AutoModel, type PreTrainedModel, Tensor } from "@huggingface/transformers";
+import type { AudioFrame } from "./capture";
+
+export type Request = Init;
+
+export interface Init {
+	type: "init";
+	// Captured audio from the worklet.
+	worklet: MessagePort;
+}
+
+export type Result = Speaking | Error;
+
+export interface Speaking {
+	type: "speaking";
+	speaking: boolean;
+}
+
+export interface Error {
+	type: "error";
+	message: string;
+}
+
+const SAMPLE_RATE = 16000;
+
+// This VAD model expects 512 samples at a time, or 31ms
+const VAD_CHUNK_SIZE = 512;
+
+// Require 8 silence chunks to be detected before we consider the user done speaking.
+const VAD_SILENCE_PADDING = 8;
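+
+// Combined with the chunk size above, that means speech has to stay quiet for roughly
+// VAD_SILENCE_PADDING * VAD_CHUNK_SIZE / SAMPLE_RATE ≈ 256ms before we flip back to
+// not-speaking, which keeps the flag from flickering between words.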
+
+class Vad {
+	// Simple circular buffer, primarily so we keep the previous buffer around once speaking is detected.
+	#next = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // queued
+	#current = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // being processed
+	#prev = new Float32Array(new ArrayBuffer(Float32Array.BYTES_PER_ELEMENT * VAD_CHUNK_SIZE), 0, 0); // already processed
+
+	#processing = false;
+
+	// Initial state for VAD
+	#sr = new Tensor("int64", [SAMPLE_RATE], []);
+	#state = new Tensor("float32", new Float32Array(2 * 1 * 128), [2, 1, 128]);
+	#speaking = false;
+
+	// Count the number of consecutive silence results; once this exceeds VAD_SILENCE_PADDING we're done.
+	#silence = 0;
+
+	#model: Promise<PreTrainedModel>;
+
+	constructor() {
+		this.#model = AutoModel.from_pretrained("onnx-community/silero-vad", {
+			// @ts-expect-error Not sure why this is needed.
+			config: { model_type: "custom" },
+			dtype: "fp32", // Full-precision
+		});
+	}
+
+	write(samples: Float32Array) {
+		if (this.#next.byteLength >= this.#next.buffer.byteLength) {
+			if (!this.flush()) {
+				// Drop the sample if VAD is still processing.
+				return;
+			}
+		}
+
+		this.#next = new Float32Array(this.#next.buffer, 0, this.#next.length + samples.length);
+		this.#next.set(samples, this.#next.length - samples.length);
+
+		if (this.#next.byteLength === this.#next.buffer.byteLength) {
+			this.flush(); // don't care if it fails
+		}
+	}
+
+	flush(): boolean {
+		if (this.#processing) {
+			return false;
+		}
+
+		this.#processing = true;
+
+		this.#current = this.#next;
+		this.#next = new Float32Array(this.#prev.buffer, 0, 0);
+		this.#prev = this.#current;
+
+		this.#flush().finally(() => {
+			this.#processing = false;
+		});
+
+		return true;
+	}
+
+	async #flush() {
+		const model = await this.#model;
+
+		const input = new Tensor("float32", this.#current, [1, this.#current.length]);
+		const result = await model({ input, sr: this.#sr, state: this.#state });
+		this.#state = result.stateN;
+
+		const isSpeech = result.output.data[0];
+		if (this.#speaking && isSpeech < 0.3) {
+			this.#silence++;
+
+			if (this.#silence > VAD_SILENCE_PADDING) {
+				this.#speaking = false;
+
+				postResult({
+					type: "speaking",
+					speaking: false,
+				});
+			}
+		} else if (!this.#speaking && isSpeech >= 0.1) {
+			this.#speaking = true;
+			this.#silence = 0;
+
+			postResult({
+				type: "speaking",
+				speaking: true,
+			});
+		}
+	}
+}
+
+self.addEventListener("message", async (event: MessageEvent<Request>) => {
+	const message = event.data;
+
+	const vad = new Vad();
+	message.worklet.onmessage = ({ data: { channels } }: MessageEvent<AudioFrame>) => {
+		vad.write(channels[0]);
+	};
+});
+
+function postResult(msg: Result) {
+	self.postMessage(msg);
+}
diff --git a/js/hang/src/publish/audio/speaking.ts b/js/hang/src/publish/audio/speaking.ts
new file mode 100644
index 000000000..b837cce80
--- /dev/null
+++ b/js/hang/src/publish/audio/speaking.ts
@@ -0,0 +1,114 @@
+import * as Moq from "@kixelated/moq";
+import { Effect, Signal } from "@kixelated/signals";
+import type * as Catalog from "../../catalog";
+import { u8 } from "../../catalog";
+import { BoolProducer } from "../../container/bool";
+import type { Audio } from ".";
+import CaptureWorklet from "./capture-worklet?worker&url";
+import type { Request, Result } from "./speaking-worker";
+
+export type SpeakingProps = {
+	enabled?: boolean;
+};
+
+// Detects when the user is speaking.
+export class Speaking {
+	audio: Audio;
+
+	enabled: Signal<boolean>;
+
+	active = new Signal(false);
+	catalog = new Signal<Catalog.Speaking | undefined>(undefined);
+
+	signals = new Effect();
+
+	#bool = new BoolProducer(new Moq.TrackProducer("speaking.bool", 1));
+
+	constructor(audio: Audio, props?: SpeakingProps) {
+		this.audio = audio;
+		this.enabled = new Signal(props?.enabled ?? false);
+		this.signals.effect(this.#run.bind(this));
+	}
+
+	#run(effect: Effect): void {
+		if (!effect.get(this.enabled)) return;
+
+		const media = effect.get(this.audio.media);
+		if (!media) return;
+
+		this.audio.broadcast.insertTrack(this.#bool.track.consume());
+		effect.cleanup(() => this.audio.broadcast.removeTrack(this.#bool.track.name));
+
+		const catalog: Catalog.Speaking = {
+			track: {
+				name: this.#bool.track.name,
+				priority: u8(this.#bool.track.priority),
+			},
+		};
+		effect.set(this.catalog, catalog);
+
+		// Create a nested effect to avoid recreating the track every time the speaking state changes.
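+		// BoolProducer makes repeated writes of the same value a no-op: `true` opens a group and
+		// writes a single frame, `false` just closes the group, so re-running this effect on every
+		// toggle stays cheap.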
+		effect.effect((nested) => {
+			const active = nested.get(this.active);
+			this.#bool.write(active);
+		});
+
+		const worker = new Worker(new URL("./speaking-worker", import.meta.url), { type: "module" });
+		effect.cleanup(() => worker.terminate());
+
+		// Handle messages from the worker
+		worker.onmessage = ({ data }: MessageEvent<Result>) => {
+			if (data.type === "speaking") {
+				// Use heuristics to determine if we've toggled speaking or not
+				this.active.set(data.speaking);
+			} else if (data.type === "error") {
+				console.error("VAD worker error:", data.message);
+				this.active.set(false);
+			}
+		};
+
+		effect.cleanup(() => {
+			this.active.set(false);
+		});
+
+		const ctx = new AudioContext({
+			latencyHint: "interactive",
+			sampleRate: 16000, // required by the model.
+		});
+		effect.cleanup(() => ctx.close());
+
+		// Create the source node.
+		const root = new MediaStreamAudioSourceNode(ctx, {
+			mediaStream: new MediaStream([media]),
+		});
+		effect.cleanup(() => root.disconnect());
+
+		// The worklet needs to be loaded asynchronously, unfortunately, but it should be instant.
+		effect.spawn(async () => {
+			await ctx.audioWorklet.addModule(CaptureWorklet);
+
+			// Create the worklet.
+			const worklet = new AudioWorkletNode(ctx, "capture", {
+				numberOfInputs: 1,
+				numberOfOutputs: 0,
+				channelCount: 1,
+				channelCountMode: "explicit",
+				channelInterpretation: "discrete",
+			});
+			effect.cleanup(() => worklet.disconnect());
+
+			root.connect(worklet);
+
+			const init: Request = {
+				type: "init",
+				worklet: worklet.port,
+			};
+			worker.postMessage(init, [init.worklet]);
+		});
+	}
+
+	close() {
+		this.signals.close();
+		this.#bool.close();
+	}
+}
diff --git a/js/hang/src/publish/element.ts b/js/hang/src/publish/element.ts
index 8a34f0347..08ab05e8a 100644
--- a/js/hang/src/publish/element.ts
+++ b/js/hang/src/publish/element.ts
@@ -4,14 +4,13 @@
 import * as DOM from "@kixelated/signals/dom";
 
 import { Connection } from "../connection";
 import { Broadcast, type Device } from "./broadcast";
 
-const OBSERVED = ["url", "name", "device", "audio", "video", "controls", "transcribe", "captions"] as const;
+const OBSERVED = ["url", "name", "device", "audio", "video", "controls", "captions"] as const;
 type Observed = (typeof OBSERVED)[number];
 
 export default class HangPublish extends HTMLElement {
 	static observedAttributes = OBSERVED;
 
 	#controls = new Signal(false);
-	#captions = new Signal(false);
 
 	connection: Connection;
 	broadcast: Broadcast;
@@ -71,8 +70,6 @@
 			this.video = newValue !== null;
 		} else if (name === "controls") {
 			this.controls = newValue !== null;
-		} else if (name === "transcribe") {
-			this.transcribe = newValue !== null;
 		} else if (name === "captions") {
 			this.captions = newValue !== null;
 		} else {
@@ -129,29 +126,13 @@
 		this.#controls.set(controls);
 	}
 
-	get transcribe(): boolean {
-		return this.broadcast.audio.captions.enabled.peek();
-	}
-
-	set transcribe(transcribe: boolean) {
-		this.broadcast.audio.captions.enabled.set(transcribe);
-
-		if (!transcribe && this.#captions.peek()) {
-			// Disable captions if transcribe is disabled.
-			this.#captions.set(false);
-		}
-	}
-
 	get captions(): boolean {
-		return this.#captions.peek();
+		return this.broadcast.audio.captions.enabled.peek();
 	}
 
 	set captions(captions: boolean) {
-		this.#captions.set(captions);
-		if (captions) {
-			// Enable transcribe if captions are enabled.
-			this.broadcast.audio.captions.enabled.set(true);
-		}
+		this.broadcast.audio.captions.enabled.set(captions);
+		this.broadcast.audio.speaking.enabled.set(captions);
 	}
 
 	#renderControls() {
@@ -192,7 +173,7 @@
 		this.#signals.cleanup(() => this.removeChild(captions));
 
 		this.#signals.effect((effect) => {
-			const show = effect.get(this.#captions);
+			const show = effect.get(this.broadcast.audio.captions.enabled);
 			if (!show) return;
 
 			const leftSpacer = DOM.create("div", {
@@ -209,7 +190,7 @@
 
 			effect.effect((effect) => {
 				const text = effect.get(this.broadcast.audio.captions.text);
-				const speaking = effect.get(this.broadcast.audio.captions.speaking);
+				const speaking = effect.get(this.broadcast.audio.speaking.active);
 
 				captionText.textContent = text ?? "";
 				speakingIcon.textContent = speaking ? "🗣️" : " ";
diff --git a/js/hang/src/watch/audio/index.ts b/js/hang/src/watch/audio/index.ts
index 4d5ac0b84..05df44a15 100644
--- a/js/hang/src/watch/audio/index.ts
+++ b/js/hang/src/watch/audio/index.ts
@@ -8,8 +8,7 @@ import type * as Render from "./render";
 export * from "./emitter";
 
 import { Captions, type CaptionsProps } from "./captions";
-// Unfortunately, we need to use a Vite-exclusive import for now.
-import RenderWorklet from "./render-worklet?worker&url";
+import { Speaking, type SpeakingProps } from "./speaking";
 
 export type AudioProps = {
 	// Enable to download the audio track.
@@ -20,8 +19,14 @@
 	// Enable to download the captions track.
 	captions?: CaptionsProps;
+
+	// Enable to download the speaking track. (boolean)
+	speaking?: SpeakingProps;
 };
 
+// Unfortunately, we need to use a Vite-exclusive import for now.
+import RenderWorklet from "./render-worklet?worker&url";
+
 // Downloads audio from a track and emits it to an AudioContext.
 // The user is responsible for hooking up audio to speakers, an analyzer, etc.
 export class Audio {
@@ -40,6 +45,7 @@
 	readonly sampleRate: Getter<number | undefined> = this.#sampleRate;
 
 	captions: Captions;
+	speaking: Speaking;
 
 	// Not a signal because it updates constantly.
 	#buffered: DOMHighResTimeStamp = 0;
@@ -59,6 +65,7 @@
 		this.enabled = new Signal(props?.enabled ?? false);
 		this.latency = props?.latency ?? 100; // TODO Reduce this once fMP4 stuttering is fixed.
 		this.captions = new Captions(broadcast, this.info, props?.captions);
+		this.speaking = new Speaking(broadcast, this.info, props?.speaking);
 
 		this.#signals.effect((effect) => {
 			this.info.set(effect.get(this.catalog)?.audio?.[0]);
@@ -195,6 +202,7 @@
 	close() {
 		this.#signals.close();
 		this.captions.close();
+		this.speaking.close();
 	}
 
 	get buffered() {
diff --git a/js/hang/src/watch/audio/speaking.ts b/js/hang/src/watch/audio/speaking.ts
new file mode 100644
index 000000000..aa415d2fa
--- /dev/null
+++ b/js/hang/src/watch/audio/speaking.ts
@@ -0,0 +1,64 @@
+import type * as Moq from "@kixelated/moq";
+import { Effect, type Getter, Signal } from "@kixelated/signals";
+import type * as Catalog from "../../catalog";
+import { BoolConsumer } from "../../container/bool";
+
+export type SpeakingProps = {
+	enabled?: boolean;
+};
+
+export class Speaking {
+	broadcast: Getter<Moq.BroadcastConsumer | undefined>;
+	info: Getter<Catalog.Audio | undefined>;
+	enabled: Signal<boolean>;
+
+	// Toggles true when the user is speaking.
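+	// It stays undefined until the speaking track is actually subscribed (or when speaking is
+	// disabled), so UI code can tell "unknown" apart from "not speaking".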
+	#active = new Signal<boolean | undefined>(undefined);
+	readonly active: Getter<boolean | undefined> = this.#active;
+
+	#signals = new Effect();
+
+	constructor(
+		broadcast: Getter<Moq.BroadcastConsumer | undefined>,
+		info: Getter<Catalog.Audio | undefined>,
+		props?: SpeakingProps,
+	) {
+		this.broadcast = broadcast;
+		this.info = info;
+
+		this.enabled = new Signal(props?.enabled ?? false);
+		this.#signals.effect(this.#run.bind(this));
+	}
+
+	#run(effect: Effect): void {
+		const enabled = effect.get(this.enabled);
+		if (!enabled) return;
+
+		const broadcast = effect.get(this.broadcast);
+		if (!broadcast) return;
+
+		const info = effect.get(this.info);
+		if (!info) return;
+
+		if (!info.speaking) return;
+
+		const sub = broadcast.subscribe(info.speaking.track.name, info.speaking.track.priority);
+		effect.cleanup(() => sub.close());
+
+		const bool = new BoolConsumer(sub);
+
+		effect.spawn(async (cancel) => {
+			for (;;) {
+				const speaking = await Promise.race([bool.next(), cancel]);
+				if (speaking === undefined) break;
+
+				this.#active.set(speaking);
+			}
+		});
+		effect.cleanup(() => this.#active.set(undefined));
+	}
+
+	close() {
+		this.#signals.close();
+	}
+}
diff --git a/js/hang/src/watch/element.ts b/js/hang/src/watch/element.ts
index b1fcb8b20..b68040e1d 100644
--- a/js/hang/src/watch/element.ts
+++ b/js/hang/src/watch/element.ts
@@ -183,6 +183,7 @@
 	set captions(captions: boolean) {
 		this.broadcast.audio.captions.enabled.set(captions);
+		this.broadcast.audio.speaking.enabled.set(captions);
 	}
 
 	get reload(): boolean {
@@ -240,11 +241,34 @@
 			const show = effect.get(this.broadcast.audio.captions.enabled);
 			if (!show) return;
 
-			const caption = effect.get(this.broadcast.audio.captions.text);
-			captions.textContent = caption ?? "";
+			const leftSpacer = DOM.create("div", {
+				style: { width: "1.5em" },
+			});
+
+			const captionText = DOM.create("div", {
+				style: { textAlign: "center" },
+			});
+
+			const speakingIcon = DOM.create("div", {
+				style: { width: "1.5em" },
+			});
+
+			effect.effect((effect) => {
+				const text = effect.get(this.broadcast.audio.captions.text);
+				const speaking = effect.get(this.broadcast.audio.speaking.active);
+
+				captionText.textContent = text ?? "";
+				speakingIcon.textContent = speaking ? "🗣️" : " ";
+			});
+
+			captions.appendChild(leftSpacer);
+			captions.appendChild(captionText);
+			captions.appendChild(speakingIcon);
 
 			effect.cleanup(() => {
-				captions.textContent = "";
+				captions.removeChild(leftSpacer);
+				captions.removeChild(captionText);
+				captions.removeChild(speakingIcon);
 			});
 		});
 	}
diff --git a/js/hang/src/watch/video/index.ts b/js/hang/src/watch/video/index.ts
index 990b3d703..5c3fed8fa 100644
--- a/js/hang/src/watch/video/index.ts
+++ b/js/hang/src/watch/video/index.ts
@@ -1,5 +1,5 @@
 import type * as Moq from "@kixelated/moq";
-import { Effect, Getter, Signal } from "@kixelated/signals";
+import { Effect, type Getter, Signal } from "@kixelated/signals";
 import type * as Catalog from "../../catalog";
 import * as Container from "../../container";
 import * as Hex from "../../util/hex";
diff --git a/js/moq/src/group.ts b/js/moq/src/group.ts
index 2419a6d6c..15da0d378 100644
--- a/js/moq/src/group.ts
+++ b/js/moq/src/group.ts
@@ -91,6 +91,14 @@ export class GroupConsumer {
 		return frames?.at(this.#index++);
 	}
 
+	/**
+	 * Returns a promise that resolves when the reader is closed.
+	 * @returns A promise that resolves when closed
+	 */
+	async closed(): Promise<void> {
+		await this.#frames.closed();
+	}
+
 	/**
 	 * Closes the reader.
 	 */