Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve disconnection handling #26

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@11labs/client",
"version": "0.0.6-beta.1",
"version": "0.0.6-beta.2",
"description": "ElevenLabs JavaScript Client Library",
"main": "./dist/lib.umd.js",
"module": "./dist/lib.module.js",
Expand Down
75 changes: 72 additions & 3 deletions packages/client/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Conversation, Mode, Status } from ".";
import { Client, Server } from "mock-socket";
import chunk from "./__tests__/chunk";

const AGENT_ID = "TEST_AGENT_ID";
const CONVERSATION_ID = "TEST_CONVERSATION_ID";
const OUTPUT_AUDIO_FORMAT = "pcm_16000";
const AGENT_RESPONSE = "Hello, how can I help you?";
Expand All @@ -16,7 +15,7 @@ const CUSTOM_LLM_EXTRA_BODY = "CUSTOM_LLM_EXTRA_BODY";

describe("Conversation", () => {
it("invokes respective callbacks", async () => {
const server = new Server("wss://api.elevenlabs.io/v1/convai/conversation");
const server = new Server("wss://api.elevenlabs.io/1");
const clientPromise = new Promise<Client>((resolve, reject) => {
server.on("connection", socket => {
resolve(socket);
Expand All @@ -34,7 +33,7 @@ describe("Conversation", () => {
let mode: Mode | null = null;

const conversationPromise = Conversation.startSession({
agentId: AGENT_ID,
signedUrl: "wss://api.elevenlabs.io/1",
overrides: {
agent: {
prompt: {
Expand Down Expand Up @@ -172,6 +171,76 @@ describe("Conversation", () => {

server.close();
});

it("throws upon immediate cancellation", async () => {
const server = new Server("wss://api.elevenlabs.io/2");
const clientPromise = new Promise<Client>((resolve, reject) => {
server.on("connection", socket => {
socket.close({
code: 3000,
reason: "Test cancellation reason",
wasClean: true,
});
resolve(socket);
});
server.on("error", reject);
setTimeout(() => reject(new Error("timeout")), 5000);
});

await expect(async () => {
await Conversation.startSession({
signedUrl: "wss://api.elevenlabs.io/2",
});
await clientPromise;
}).rejects.toThrowError(
expect.objectContaining({
code: 3000,
reason: "Test cancellation reason",
})
);
});

it("terminates when server closes connection", async () => {
const server = new Server("wss://api.elevenlabs.io/3");
const clientPromise = new Promise<Client>((resolve, reject) => {
server.on("connection", socket => resolve(socket));
server.on("error", reject);
setTimeout(() => reject(new Error("timeout")), 5000);
});

const disconnectionPromise = new Promise((resolve, reject) => {
Conversation.startSession({
signedUrl: "wss://api.elevenlabs.io/3",
onDisconnect: resolve,
});
setTimeout(() => reject(new Error("timeout")), 5000);
});

const client = await clientPromise;
client.send(
JSON.stringify({
type: "conversation_initiation_metadata",
conversation_initiation_metadata_event: {
conversation_id: CONVERSATION_ID,
agent_output_audio_format: OUTPUT_AUDIO_FORMAT,
},
})
);

client.close({
code: 3000,
reason: "Test cancellation reason",
wasClean: true,
});

const details = await disconnectionPromise;
expect(details).toEqual(
expect.objectContaining({
reason: "error",
message: "Test cancellation reason",
})
);
});
});

async function sleep(ms: number) {
Expand Down
30 changes: 16 additions & 14 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { arrayBufferToBase64, base64ToArrayBuffer } from "./utils/audio";
import { Input, InputConfig } from "./utils/input";
import { Output } from "./utils/output";
import { Connection, SessionConfig } from "./utils/connection";
import {
Connection,
DisconnectionDetails,
OnDisconnectCallback,
SessionConfig,
} from "./utils/connection";
import {
ClientToolCallEvent,
isValidSocketEvent,
PingEvent,
} from "./utils/events";

export type { IncomingSocketEvent } from "./utils/events";
export type { SessionConfig } from "./utils/connection";

export type { SessionConfig, DisconnectionDetails } from "./utils/connection";
export type Role = "user" | "ai";
export type Mode = "speaking" | "listening";
export type Status =
Expand All @@ -34,7 +38,7 @@ export type Callbacks = {
onConnect: (props: { conversationId: string }) => void;
// internal debug events, not to be used
onDebug: (props: any) => void;
onDisconnect: () => void;
onDisconnect: OnDisconnectCallback;
onError: (message: string, context?: any) => void;
onMessage: (props: { message: string; source: Role }) => void;
onModeChange: (prop: { mode: Mode }) => void;
Expand Down Expand Up @@ -117,32 +121,28 @@ export class Conversation {
) {
this.options.onConnect({ conversationId: connection.conversationId });

this.connection.onDisconnect(this.endSessionWithDetails);
this.connection.socket.addEventListener("message", event => {
this.onEvent(event);
});
this.connection.socket.addEventListener("error", event => {
this.updateStatus("disconnected");
this.onError("Socket error", event);
});
this.connection.socket.addEventListener("close", () => {
this.updateStatus("disconnected");
this.options.onDisconnect();
});

this.input.worklet.port.onmessage = this.onInputWorkletMessage;
this.output.worklet.port.onmessage = this.onOutputWorkletMessage;
this.updateStatus("connected");
}

public endSession = async () => {
if (this.status !== "connected") return;
public endSession = () => this.endSessionWithDetails({ reason: "user" });

private endSessionWithDetails = async (details: DisconnectionDetails) => {
if (this.status !== "connected" && this.status !== "connecting") return;
this.updateStatus("disconnecting");

this.connection.close();
await this.input.close();
await this.output.close();

this.updateStatus("disconnected");
this.options.onDisconnect(details);
};

private updateMode = (mode: Mode) => {
Expand Down Expand Up @@ -376,6 +376,8 @@ export class Conversation {

public getId = () => this.connection.conversationId;

public isOpen = () => this.status === "connected";

public setVolume = ({ volume }: { volume: number }) => {
this.volume = volume;
};
Expand Down
70 changes: 68 additions & 2 deletions packages/client/src/utils/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ export type FormatConfig = {
format: "pcm" | "ulaw";
sampleRate: number;
};
export type DisconnectionDetails =
| {
reason: "error";
message: string;
context: Event;
}
| {
reason: "agent";
context: CloseEvent;
}
| {
reason: "user";
};
export type OnDisconnectCallback = (details: DisconnectionDetails) => void;

const WSS_API_ORIGIN = "wss://api.elevenlabs.io";
const WSS_API_PATHNAME = "/v1/convai/conversation?agent_id=";
Expand Down Expand Up @@ -116,7 +130,12 @@ export class Connection {
},
{ once: true }
);
socket!.addEventListener("error", reject);
socket!.addEventListener("error", event => {
// In case the error event is followed by a close event, we want the
// latter to be the one that rejects the promise as it contains more
// useful information.
setTimeout(() => reject(event), 0);
});
socket!.addEventListener("close", reject);
socket!.addEventListener(
"message",
Expand Down Expand Up @@ -155,12 +174,45 @@ export class Connection {
}
}

private disconnectionDetails: DisconnectionDetails | null = null;
private onDisconnectCallback: OnDisconnectCallback | null = null;

private constructor(
public readonly socket: WebSocket,
public readonly conversationId: string,
public readonly inputFormat: FormatConfig,
public readonly outputFormat: FormatConfig
) {}
) {
this.socket.addEventListener("error", event => {
// In case the error event is followed by a close event, we want the
// latter to be the one that disconnects the session as it contains more
// useful information.
setTimeout(
() =>
this.disconnect({
reason: "error",
message: "The connection was closed due to a socket error.",
context: event,
}),
0
);
});
this.socket.addEventListener("close", event => {
this.disconnect(
event.code === 1000
? {
reason: "agent",
context: event,
}
: {
reason: "error",
message:
event.reason || "The connection was closed by the server.",
context: event,
}
);
});
}

public close() {
this.socket.close();
Expand All @@ -169,6 +221,20 @@ export class Connection {
public sendMessage(message: OutgoingSocketEvent) {
this.socket.send(JSON.stringify(message));
}

public onDisconnect(callback: OnDisconnectCallback) {
this.onDisconnectCallback = callback;
if (this.disconnectionDetails) {
callback(this.disconnectionDetails);
}
}

private disconnect(details: DisconnectionDetails) {
if (!this.disconnectionDetails) {
this.disconnectionDetails = details;
this.onDisconnectCallback?.(details);
}
}
}

function parseFormat(format: string): FormatConfig {
Expand Down
2 changes: 1 addition & 1 deletion packages/react/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@11labs/react",
"version": "0.0.6-beta.1",
"version": "0.0.6-beta.2",
"description": "ElevenLabs React Library",
"main": "./dist/lib.umd.js",
"module": "./dist/lib.module.js",
Expand Down
10 changes: 8 additions & 2 deletions packages/react/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import {
ClientToolsConfig,
} from "@11labs/client";
import { InputConfig } from "@11labs/client/dist/utils/input";
export type { Role, Mode, Status, SessionConfig } from "@11labs/client";
export type {
Role,
Mode,
Status,
SessionConfig,
DisconnectionDetails,
} from "@11labs/client";
export { postOverallFeedback } from "@11labs/client";

export type HookOptions = Partial<
Expand Down Expand Up @@ -40,7 +46,7 @@ export function useConversation<T extends HookOptions>(defaultOptions?: T) {

return {
startSession: (async (options?: HookOptions) => {
if (conversationRef.current) {
if (conversationRef.current?.isOpen()) {
return conversationRef.current.getId();
}

Expand Down
Loading