Skip to content

Commit

Permalink
refactor: rework node connect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Deivu committed Dec 16, 2024
1 parent 670f320 commit 2207b0e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 117 deletions.
4 changes: 1 addition & 3 deletions src/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ import { NodeOption, ShoukakuOptions } from './Shoukaku';

export enum State {
CONNECTING,
NEARLY,
CONNECTED,
RECONNECTING,
DISCONNECTING,
DISCONNECTED
IDLE
}

export enum VoiceState {
Expand Down
8 changes: 4 additions & 4 deletions src/guild/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class Connection extends EventEmitter {
this.region = null;
this.lastRegion = null;
this.serverUpdate = null;
this.state = State.DISCONNECTED;
this.state = State.IDLE;
}

/**
Expand Down Expand Up @@ -123,13 +123,13 @@ export class Connection extends EventEmitter {
* @internal
*/
public disconnect(): void {
if (this.state === State.DISCONNECTED) return;
if (this.state === State.IDLE) return;
this.channelId = null;
this.deafened = false;
this.muted = false;
this.removeAllListeners();
this.sendVoiceUpdate();
this.state = State.DISCONNECTED;
this.state = State.IDLE;
this.debug(`[Voice] -> [Node] & [Discord] : Connection Destroyed | Guild: ${this.guildId}`);
}

Expand Down Expand Up @@ -185,7 +185,7 @@ export class Connection extends EventEmitter {
}

if (!this.channelId) {
this.state = State.DISCONNECTED;
this.state = State.IDLE;
this.debug(`[Voice] <- [Discord] : Channel Disconnected | Guild: ${this.guildId}`);
}

Expand Down
190 changes: 80 additions & 110 deletions src/node/Node.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { IncomingMessage } from 'http';
import { NodeOption, Shoukaku, ShoukakuEvents } from '../Shoukaku';
import { OpCodes, ShoukakuClientInfo, State, Versions } from '../Constants';
import { TypedEventEmitter, wait } from '../Utils';
import { Rest } from './Rest';
import { PlayerUpdate, TrackEndEvent, TrackExceptionEvent, TrackStartEvent, TrackStuckEvent, WebSocketClosedEvent } from '../guild/Player';
import {IncomingMessage} from 'http';
import {NodeOption, Shoukaku, ShoukakuEvents} from '../Shoukaku';
import {OpCodes, ShoukakuClientInfo, State, Versions} from '../Constants';
import {TypedEventEmitter, wait} from '../Utils';
import {Rest} from './Rest';
import {
PlayerUpdate,
TrackEndEvent,
TrackExceptionEvent,
TrackStartEvent,
TrackStuckEvent,
WebSocketClosedEvent
} from '../guild/Player';
import Websocket from 'ws';

export interface Ready {
Expand Down Expand Up @@ -116,14 +123,14 @@ export class Node extends TypedEventEmitter<NodeEvents> {
*/
private readonly auth: string;
/**
* The number of reconnects to Lavalink
*/
public reconnects: number;
/**
* The state of this connection
*/
public state: State;
/**
* The number of reconnects to Lavalink
*/
public reconnects: number;
/**
* Statistics from Lavalink
*/
public stats: Stats | null;
Expand All @@ -139,14 +146,7 @@ export class Node extends TypedEventEmitter<NodeEvents> {
* SessionId of this Lavalink connection (not to be confused with Discord SessionId)
*/
public sessionId: string | null;
/**
* Boolean that represents if the node has initialized once
*/
protected initialized: boolean;
/**
* Boolean that represents if this connection is destroyed
*/
protected destroyed: boolean;

/**
* @param manager Shoukaku instance
* @param options Options on creating this node
Expand All @@ -164,14 +164,12 @@ export class Node extends TypedEventEmitter<NodeEvents> {
this.group = options.group;
this.auth = options.auth;
this.url = `${options.secure ? 'wss' : 'ws'}://${options.url}/v${Versions.WEBSOCKET_VERSION}/websocket`;
this.state = State.IDLE;
this.reconnects = 0;
this.state = State.DISCONNECTED;
this.stats = null;
this.info = null;
this.ws = null;
this.sessionId = null;
this.initialized = false;
this.destroyed = false;
}

/**
Expand All @@ -194,20 +192,13 @@ export class Node extends TypedEventEmitter<NodeEvents> {
return penalties;
}

/**
* If we should clean this node
* @internal @readonly
*/
private get shouldClean(): boolean {
return this.destroyed || this.reconnects >= this.manager.options.reconnectTries;
}

/**
* Connect to Lavalink
*/
public connect(): void {
if (!this.manager.id) throw new Error('Don\'t connect a node when the library is not yet ready');
if (this.destroyed) throw new Error('You can\'t re-use the same instance of a node once disconnected, please re-add the node again');
if (!this.manager.id) throw new Error('UserId missing, probably your connector is misconfigured?');

if (this.state !== State.IDLE) return;

this.state = State.CONNECTING;

Expand All @@ -218,16 +209,16 @@ export class Node extends TypedEventEmitter<NodeEvents> {
'User-Id': this.manager.id
};

if (this.sessionId)
if (this.sessionId) {
headers['Session-Id'] = this.sessionId;
if (!this.initialized)
this.initialized = true;

this.emit('debug', `[Socket] -> [${this.name}] : Connecting to ${this.url} ...`);
this.emit('debug', `[Socket] -> [${this.name}] : Session-Id is present, attempting to resume`);
}

const url = new URL(this.url);
this.ws = new Websocket(url.toString(), { headers } as Websocket.ClientOptions);

this.emit('debug', `[Socket] -> [${this.name}] : Connecting to ${this.url} ...`);

this.ws.once('upgrade', response => this.open(response));
this.ws.once('close', (...args) => this.close(...args));
this.ws.on('error', error => this.error(error));
Expand All @@ -240,8 +231,14 @@ export class Node extends TypedEventEmitter<NodeEvents> {
* @param reason Reason for disconnect
*/
public disconnect(code: number, reason?: string): void {
this.destroyed = true;
this.internalDisconnect(code, reason);
if (this.state !== State.CONNECTED && this.state !== State.CONNECTING) return;

this.state = State.DISCONNECTING;

if (this.ws)
this.ws.close(code, reason);
else
void this.close(1000, Buffer.from(reason || 'Unknown Reason', 'utf-8'));
}

/**
Expand All @@ -250,11 +247,16 @@ export class Node extends TypedEventEmitter<NodeEvents> {
* @internal
*/
private open(response: IncomingMessage): void {
this.reconnects = 0;

const resumed = response.headers['session-resumed'];

if (!resumed) {
this.sessionId = null;
}

// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this.emit('debug', `[Socket] <-> [${this.name}] : Connection Handshake Done! ${this.url} | Resumed Header Value: ${resumed}`);
this.reconnects = 0;
this.state = State.NEARLY;
}

/**
Expand All @@ -273,17 +275,19 @@ export class Node extends TypedEventEmitter<NodeEvents> {
this.stats = json;
break;
case OpCodes.READY: {
this.state = State.CONNECTED;

if (!json.sessionId) {
this.emit('debug', `[Socket] -> [${this.name}] : No session id found from ready op? disconnecting and reconnecting to avoid issues`);
return this.internalDisconnect(1000);
return this.disconnect(1000);
}

this.sessionId = json.sessionId;

const players = [ ...this.manager.players.values() ].filter(player => player.node.name === this.name);

let resumedByLibrary = false;
if (!json.resumed && Boolean(this.initialized && (players.length && this.manager.options.resumeByLibrary))) {
if (!json.resumed && Boolean(players.length && this.manager.options.resumeByLibrary)) {
try {
await this.resumePlayers();
resumedByLibrary = true;
Expand All @@ -292,7 +296,6 @@ export class Node extends TypedEventEmitter<NodeEvents> {
}
}

this.state = State.CONNECTED;
this.emit('debug', `[Socket] -> [${this.name}] : Lavalink is ready! | Lavalink resume: ${json.resumed} | Lib resume: ${resumedByLibrary}`);
this.emit('ready', json.resumed, resumedByLibrary);

Expand Down Expand Up @@ -323,89 +326,56 @@ export class Node extends TypedEventEmitter<NodeEvents> {
* @param code Status close
* @param reason Reason for connection close
*/
private close(code: number, reason: Buffer): void {
this.emit('debug', `[Socket] <-/-> [${this.name}] : Connection Closed, Code: ${code || 'Unknown Code'}`);
private async close(code: number, reason: Buffer): Promise<void> {
this.emit('close', code, String(reason));
if (this.shouldClean)
void this.clean();
else
void this.reconnect();
}
this.emit('debug', `[Socket] <-/-> [${this.name}] : Connection Closed, Code: ${code || 'Unknown Code'}`);

/**
* To emit error events easily
* @param error error message
*/
public error(error: Error): void {
this.emit('error', error);
}
this.state = State.DISCONNECTING;

/**
* Internal disconnect function
* @internal
*/
private internalDisconnect(code: number, reason?: string): void {
if (this.destroyed) return;
if (this.reconnects >= this.manager.options.reconnectTries) {
this.sessionId = null;

this.state = State.DISCONNECTING;
let count = 0;

if (this.ws)
this.ws.close(code, reason);
else
void this.clean();
}
if (this.manager.options.moveOnDisconnect) {
count = await this.movePlayers();
}

this.ws?.removeAllListeners();
this.ws?.close();
this.ws = null;

if (!this.manager.options.resume) {
this.sessionId = null;
}

this.state = State.IDLE;

/**
* Destroys the websocket connection
* @internal
*/
private destroy(count = 0): void {
this.ws?.removeAllListeners();
this.ws?.close();
this.ws = null;
this.state = State.DISCONNECTED;
if (!this.manager.options.resume) {
this.sessionId = null;
}
if (this.shouldClean) {
this.destroyed = true;
this.sessionId = null;
this.emit('disconnect', count);
}
}

/**
* Cleans and moves players to other nodes if possible
* @internal
*/
private async clean(): Promise<void> {
const move = this.manager.options.moveOnDisconnect;
if (!move) return this.destroy();
let count = 0;
try {
count = await this.movePlayers();
} catch (error) {
this.error(error as Error);
} finally {
this.destroy(count);
return;
}
}

/**
* Reconnect to Lavalink
* @internal
*/
private async reconnect(): Promise<void> {
if (this.state === State.RECONNECTING) return;
if (this.state !== State.DISCONNECTED) this.destroy();
this.state = State.RECONNECTING;
this.reconnects++;
this.state = State.IDLE;

this.emit('reconnecting', this.manager.options.reconnectTries - this.reconnects, this.manager.options.reconnectInterval);
this.emit('debug', `[Socket] -> [${this.name}] : Reconnecting in ${this.manager.options.reconnectInterval} seconds. ${this.manager.options.reconnectTries - this.reconnects} tries left`);

await wait(this.manager.options.reconnectInterval * 1000);

this.reconnects++;

this.connect();
}

/**
* To emit error events easily
* @param error error message
*/
public error(error: Error): void {
this.emit('error', error);
}

/**
* Tries to resume the players internally
* @internal
Expand Down

0 comments on commit 2207b0e

Please sign in to comment.