Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions src/robot/client.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// @vitest-environment happy-dom

import { beforeEach, afterEach, describe, expect, it, vi } from 'vitest';
import type { Transport } from '@connectrpc/connect';
import { createRouterTransport } from '@connectrpc/connect';
import { RobotService } from '../gen/robot/v1/robot_connect';
import { RobotClient } from './client';
import * as rpcModule from '../rpc';

vi.mock('../rpc', async () => {
const actual = await vi.importActual('../rpc');
return {
...actual,
dialWebRTC: vi.fn(),
dialDirect: vi.fn(),
};
});

describe('RobotClient', () => {
describe('event listeners', () => {
let mockTransport: Transport;

let mockPeerConnection: RTCPeerConnection;
let pcAddEventListenerSpy: ReturnType<typeof vi.fn>;
let pcRemoveEventListenerSpy: ReturnType<typeof vi.fn>;

let mockDataChannel: RTCDataChannel;
let dcAddEventListenerSpy: ReturnType<typeof vi.fn>;
let dcRemoveEventListenerSpy: ReturnType<typeof vi.fn>;

let client: RobotClient;

beforeEach(() => {
pcAddEventListenerSpy = vi.fn();
pcRemoveEventListenerSpy = vi.fn();
dcAddEventListenerSpy = vi.fn();
dcRemoveEventListenerSpy = vi.fn();

mockPeerConnection = {
close: vi.fn(),
addEventListener: pcAddEventListenerSpy,
removeEventListener: pcRemoveEventListenerSpy,
iceConnectionState: 'connected',
} as unknown as RTCPeerConnection;

mockDataChannel = {
addEventListener: dcAddEventListenerSpy,
removeEventListener: dcRemoveEventListenerSpy,
readyState: 'open',
} as unknown as RTCDataChannel;

mockTransport = createRouterTransport(({ service }) => {
service(RobotService, {
resourceNames: () => ({ resources: [] }),
getOperations: () => ({ operations: [] }),
});
});

vi.mocked(rpcModule.dialWebRTC).mockResolvedValue({
transport: mockTransport,
peerConnection: mockPeerConnection,
dataChannel: mockDataChannel,
});

client = new RobotClient();
});

afterEach(() => {
vi.clearAllMocks();
});

it.each([
{
eventType: 'iceconnectionstatechange',
addSpy: () => pcAddEventListenerSpy,
removeSpy: () => pcRemoveEventListenerSpy,
description: 'peer connection iceconnectionstatechange',
},
{
eventType: 'close',
addSpy: () => dcAddEventListenerSpy,
removeSpy: () => dcRemoveEventListenerSpy,
description: 'data channel close',
},
{
eventType: 'track',
addSpy: () => pcAddEventListenerSpy,
removeSpy: () => pcRemoveEventListenerSpy,
description: 'peer connection track',
},
])(
'should remove old $description handler before adding new one',
async ({ eventType, addSpy, removeSpy }) => {
await client.dial({
host: 'test-host',
signalingAddress: 'https://test.local',
disableSessions: true,
noReconnect: true,
});

const firstCallArgs = addSpy().mock.calls.find(
(call) => call[0] === eventType
);

expect(firstCallArgs).toBeDefined();

const firstHandler = firstCallArgs?.[1];

addSpy().mockClear();
removeSpy().mockClear();

// simulate reconnection
await client.connect();

const removeCallArgs = removeSpy().mock.calls.find(
(call) => call[0] === eventType
);

const secondCallArgs = addSpy().mock.calls.find(
(call) => call[0] === eventType
);

expect(removeCallArgs).toBeDefined();
expect(removeCallArgs?.[1]).toBe(firstHandler);
expect(secondCallArgs).toBeDefined();
}
);

it.each([
{
eventType: 'iceconnectionstatechange',
addSpy: () => pcAddEventListenerSpy,
removeSpy: () => pcRemoveEventListenerSpy,
description: 'iceconnectionstatechange',
},
{
eventType: 'close',
addSpy: () => dcAddEventListenerSpy,
removeSpy: () => dcRemoveEventListenerSpy,
description: 'data channel close',
},
{
eventType: 'track',
addSpy: () => pcAddEventListenerSpy,
removeSpy: () => pcRemoveEventListenerSpy,
description: 'track',
},
])(
'should only have one $description handler at a time',
async ({ eventType, addSpy, removeSpy }) => {
await client.dial({
host: 'test-host',
signalingAddress: 'https://test.local',
disableSessions: true,
noReconnect: true,
});

const firstConnectionCalls = addSpy().mock.calls.filter(
(call) => call[0] === eventType
);

expect(firstConnectionCalls).toHaveLength(1);

// simulate reconnection
await client.connect();

const totalCalls = addSpy().mock.calls.filter(
(call) => call[0] === eventType
);
const removeCalls = removeSpy().mock.calls.filter(
(call) => call[0] === eventType
);

expect(totalCalls).toHaveLength(2);
expect(removeCalls).toHaveLength(1);
}
);

it('should not accumulate handlers over multiple reconnections', async () => {
await client.dial({
host: 'test-host',
signalingAddress: 'https://test.local',
disableSessions: true,
noReconnect: true,
});

for (let i = 0; i < 5; i += 1) {
// eslint-disable-next-line no-await-in-loop
await client.connect();
}

const iceAddCalls = pcAddEventListenerSpy.mock.calls.filter(
(call) => call[0] === 'iceconnectionstatechange'
);
const iceRemoveCalls = pcRemoveEventListenerSpy.mock.calls.filter(
(call) => call[0] === 'iceconnectionstatechange'
);

expect(iceAddCalls).toHaveLength(6);
expect(iceRemoveCalls).toHaveLength(5);
expect(iceAddCalls.length - iceRemoveCalls.length).toBe(1);
});

it('should clean up all event handlers when disconnecting', async () => {
await client.dial({
host: 'test-host',
signalingAddress: 'https://test.local',
disableSessions: true,
noReconnect: true,
});

pcRemoveEventListenerSpy.mockClear();
dcRemoveEventListenerSpy.mockClear();

await client.disconnect();

const iceRemoveCalls = pcRemoveEventListenerSpy.mock.calls.filter(
(call) => call[0] === 'iceconnectionstatechange'
);
const trackRemoveCalls = pcRemoveEventListenerSpy.mock.calls.filter(
(call) => call[0] === 'track'
);

const dcRemoveCalls = dcRemoveEventListenerSpy.mock.calls.filter(
(call) => call[0] === 'close'
);

expect(iceRemoveCalls.length).toBeGreaterThanOrEqual(1);
expect(trackRemoveCalls.length).toBeGreaterThanOrEqual(1);
expect(dcRemoveCalls.length).toBeGreaterThanOrEqual(1);
});
});
});
71 changes: 53 additions & 18 deletions src/robot/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export class RobotClient extends EventDispatcher implements Robot {
private sessionManager: SessionManager;

private peerConn: RTCPeerConnection | undefined;
private dataChannel: RTCDataChannel | undefined;

private transport: Transport | undefined;

Expand Down Expand Up @@ -245,6 +246,10 @@ export class RobotClient extends EventDispatcher implements Robot {

private currentRetryAttempt = 0;

private onICEConnectionStateChange?: () => void;
private onDataChannelClose?: (event: Event) => void;
private onTrack?: (event: RTCTrackEvent) => void;

constructor(
serviceHost?: string,
webrtcOptions?: WebRTCOptions,
Expand Down Expand Up @@ -306,6 +311,27 @@ export class RobotClient extends EventDispatcher implements Robot {
this.closed = false;
}

private cleanupEventListeners() {
if (this.peerConn && this.onICEConnectionStateChange) {
this.peerConn.removeEventListener(
'iceconnectionstatechange',
this.onICEConnectionStateChange
);

this.onICEConnectionStateChange = undefined;
}

if (this.peerConn && this.onTrack) {
this.peerConn.removeEventListener('track', this.onTrack);
this.onTrack = undefined;
}

if (this.dataChannel && this.onDataChannelClose) {
this.dataChannel.removeEventListener('close', this.onDataChannelClose);
this.onDataChannelClose = undefined;
}
}

private onDisconnect(event?: Event) {
this.emit(MachineConnectionEvent.DISCONNECTED, event ?? {});

Expand Down Expand Up @@ -638,10 +664,17 @@ export class RobotClient extends EventDispatcher implements Robot {
await this.connecting;
}

this.cleanupEventListeners();

if (this.peerConn) {
this.peerConn.close();
this.peerConn = undefined;
}

if (this.dataChannel) {
this.dataChannel = undefined;
}

this.sessionManager.reset();
this.closed = true;
this.emit(MachineConnectionEvent.DISCONNECTED, {});
Expand Down Expand Up @@ -727,12 +760,12 @@ export class RobotClient extends EventDispatcher implements Robot {
this.serviceHost !== '' && signalingAddress !== this.serviceHost
);

/*
* Lint disabled because we know that we are the only code to
* read and then write to 'peerConn', even after we have awaited/paused.
*/
this.peerConn = webRTCConn.peerConnection; // eslint-disable-line require-atomic-updates
this.peerConn.addEventListener('iceconnectionstatechange', () => {
this.peerConn = webRTCConn.peerConnection;
this.dataChannel = webRTCConn.dataChannel;

this.cleanupEventListeners();

this.onICEConnectionStateChange = () => {
/*
* TODO: are there any disconnection scenarios where we can reuse the
* same connection and restart ice?
Expand All @@ -746,17 +779,22 @@ export class RobotClient extends EventDispatcher implements Robot {
} else if (this.peerConn?.iceConnectionState === 'closed') {
this.onDisconnect();
}
});
};

this.peerConn.addEventListener(
'iceconnectionstatechange',
this.onICEConnectionStateChange
);

// There is not an iceconnectionstatechange nor connectionstatechange
// event when the peerConn closes. Instead, listen to the data channel
// closing and emit disconnect when that occurs.
webRTCConn.dataChannel.addEventListener('close', (event) => {
this.onDisconnect(event);
});
this.onDataChannelClose = (event: Event) => this.onDisconnect(event);
this.dataChannel.addEventListener('close', this.onDataChannelClose);

this.transport = webRTCConn.transport;

webRTCConn.peerConnection.addEventListener('track', (event) => {
this.onTrack = (event: RTCTrackEvent) => {
const [eventStream] = event.streams;
if (!eventStream) {
this.emit('track', event);
Expand All @@ -773,7 +811,9 @@ export class RobotClient extends EventDispatcher implements Robot {
value: resName,
});
this.emit('track', event);
});
};

this.peerConn.addEventListener('track', this.onTrack);
} else {
this.transport = await dialDirect(this.serviceHost, opts);
await this.gRPCConnectionManager.start();
Expand All @@ -795,12 +835,7 @@ export class RobotClient extends EventDispatcher implements Robot {
} finally {
this.connectResolve?.();
this.connectResolve = undefined;

/*
* Lint disabled because we know that we are the only code to
* read and then write to 'connecting', even after we have awaited/paused.
*/
this.connecting = undefined; // eslint-disable-line require-atomic-updates
this.connecting = undefined;
}
}

Expand Down