Skip to content

Commit

Permalink
feat: rewrite concurrency manager (#8)
Browse files Browse the repository at this point in the history
* feat: rewrite concurrency manager

* fix: missing auth header

* feat: implement abort identifies

* feat: implement server using vanilla http

* fix: POST not DELETE op in acquire

* fix: Return no content as status message

* fix: fix start issues

* fix: fix http dont throw errors now

* fix: address is a function not a prop

* fix: force listen to localhost ipv4

* feat: add debug logs for concurrency server

* fix: make fetch aware of http or https

* fix: make https first over http

* fix: query string parsing

* chore: remove deps once again

* refactor: use event emitter raw

* fix: use NodeJS.EventEmitter for event emitter type
  • Loading branch information
Deivu authored Dec 22, 2024
1 parent 5075657 commit 9188ae3
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 59 deletions.
12 changes: 7 additions & 5 deletions src/Indomitable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Cluster, { ClusterSettings } from 'node:cluster';
import EventEmitter from 'node:events';
import Os from 'node:os';
import { clearTimeout } from 'timers';
import { ConcurrencyManager } from './concurrency/ConcurrencyManager';
import { ConcurrencyServer } from './concurrency/ConcurrencyServer';
import { ShardClient } from './client/ShardClient';
import { ClusterManager } from './manager/ClusterManager.js';
import {
Expand All @@ -20,6 +20,7 @@ import {
Sendable
} from './Util';


/**
* Options to control Indomitable behavior
*/
Expand Down Expand Up @@ -143,7 +144,7 @@ export class Indomitable extends EventEmitter {
public clusterCount: number|'auto';
public shardCount: number|'auto';
public cachedSession?: SessionObject;
public concurrencyManager?: ConcurrencyManager;
public concurrencyServer?: ConcurrencyServer;
public readonly clientOptions: DiscordJsClientOptions;
public readonly clusterSettings: ClusterSettings;
public readonly ipcTimeout: number;
Expand Down Expand Up @@ -187,7 +188,7 @@ export class Indomitable extends EventEmitter {
this.token = options.token;
this.clusters = new Map();
this.spawnQueue = [];
this.concurrencyManager = undefined;
this.concurrencyServer = undefined;
this.cachedSession = undefined;
this.busy = false;
}
Expand Down Expand Up @@ -231,8 +232,9 @@ export class Indomitable extends EventEmitter {
}
if (this.handleConcurrency) {
const sessions = await this.fetchSessions();
this.concurrencyManager = new ConcurrencyManager(sessions.session_start_limit.max_concurrency);
this.emit(LibraryEvents.DEBUG, 'Handle concurrency is currently enabled. Indomitable will automatically handle your identifies that can result to more stable connection');
this.concurrencyServer = new ConcurrencyServer(this, sessions.session_start_limit.max_concurrency);
const info = await this.concurrencyServer.start();
this.emit(LibraryEvents.DEBUG, `Handle concurrency is currently enabled! =>\n Server is currently bound to:\n Address: ${info.address}:${info.port}\n Concurrency: ${sessions.session_start_limit.max_concurrency}`);
}
if (typeof this.clusterCount !== 'number')
this.clusterCount = Os.cpus().length;
Expand Down
37 changes: 27 additions & 10 deletions src/Util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Https, { RequestOptions } from 'node:https';
import Http from 'node:http';
import { WebSocketShardEvents } from '@discordjs/ws';

/**
Expand Down Expand Up @@ -41,8 +42,6 @@ export enum InternalOps {
RESTART = 'restart',
RESTART_ALL = 'restartAll',
DESTROY_CLIENT = 'destroyClient',
REQUEST_IDENTIFY = 'requestIdentify',
CANCEL_IDENTIFY = 'cancelIdentify',
SESSION_INFO = 'sessionInfo',
PING = 'ping'
}
Expand Down Expand Up @@ -212,27 +211,42 @@ export interface SessionObject {
};
}

export interface FetchResponse {
code: number;
message: string;
body?: any;
}

/**
* Wrapper function for fetching data using HTTP
* @param url URL of resource to fetch
* @param options RequestOptions to modify behavior
* @returns A promise containing data fetched, or an error
*/
export function Fetch(url: string|URL, options: RequestOptions): Promise<any> {
export function Fetch(url: string, options: RequestOptions): Promise<FetchResponse> {
return new Promise((resolve, reject) => {
const request = Https.request(url, options, response => {
let client;

if (url.startsWith('https')) {
client = Https.request;
} else if (url.startsWith('http')) {
client = Http.request;
} else {
throw new Error('Unknown url protocol');
}

const request = client(url, options, response => {
const chunks: any[] = [];

response.on('data', chunk => chunks.push(chunk));
response.on('error', reject);
response.on('end', () => {
const code = response.statusCode ?? 500;
const body = chunks.join('');
if (code >= 200 && code <= 299)
resolve(body);
else
reject(new Error(`Response received is not ok, Status Code: ${response.statusCode}, body: ${body}`));
resolve({ code, body, message: response.statusMessage ?? '' });
});
});

request.on('error', reject);
request.end();
});
Expand All @@ -245,11 +259,14 @@ export function Fetch(url: string|URL, options: RequestOptions): Promise<any> {
*/
export async function FetchSessions(token: string): Promise<SessionObject> {
const url = new URL('https://discord.com/api/v10/gateway/bot');
const data = await Fetch(url, {
const response = await Fetch(url.toString(), {
method: 'GET',
headers: { authorization: `Bot ${token}` }
});
return JSON.parse(data);
if (response.code >= 200 && response.code <= 299)
return JSON.parse(response.body);
else
throw new Error(`Response received is not ok, code: ${response.code}`)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/client/ShardClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class ShardClient {
clientOptions.shardCount = EnvProcessData.shardCount;
if (manager.handleConcurrency) {
if (!clientOptions.ws) clientOptions.ws = {};
clientOptions.ws.buildIdentifyThrottler = () => Promise.resolve(new ConcurrencyClient(new BaseWorker()));
clientOptions.ws.buildIdentifyThrottler = () => Promise.resolve(new ConcurrencyClient());
}
this.client = new manager.client(clientOptions);
// @ts-expect-error: Override shard client util with indomitable shard client util
Expand Down
15 changes: 6 additions & 9 deletions src/concurrency/AsyncQueue.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import { EventEmitter, once } from 'events';
import { EventEmitter, once } from 'node:events';

export declare interface AsyncQueueWaitOptions {
signal?: AbortSignal | undefined;
}

export declare interface AsyncQueueEmitter extends EventEmitter {
on(event: 'resolve', listener: (message: string) => void): this;
once(event: 'resolve', listener: (message: string) => void): this;
off(event: 'resolve', listener: (event: unknown) => void): this;
}

export class AsyncQueue {
private queue: AsyncQueueEmitter[];
private readonly queue: NodeJS.EventEmitter[];
constructor() {
this.queue = [];
}
Expand All @@ -21,9 +15,11 @@ export class AsyncQueue {
}

public wait({ signal }: AsyncQueueWaitOptions): Promise<void[]> {

const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', { signal }) : Promise.resolve([]);

const emitter: AsyncQueueEmitter = new EventEmitter();
const emitter = new EventEmitter() as NodeJS.EventEmitter;

this.queue.push(emitter);

if (signal) {
Expand All @@ -39,6 +35,7 @@ export class AsyncQueue {

public shift(): void {
const emitter = this.queue.shift();

if (typeof emitter !== 'undefined') emitter.emit('resolve');
}
}
72 changes: 45 additions & 27 deletions src/concurrency/ConcurrencyClient.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,62 @@
import { InternalOps, InternalOpsData } from '../Util';
import { BaseWorker } from '../ipc/BaseWorker';
import { Delay, Fetch } from '../Util';

/**
* Internal class that is passed to @discordjs/ws to handle concurrency
*/
export class ConcurrencyClient {
private ipc: BaseWorker;
constructor(ipc: BaseWorker) {
this.ipc = ipc;
private readonly address: string;
private readonly port: number;
private readonly password: string;

constructor() {
this.address = process.env.INDOMITABLE_CONCURRENCY_SERVER_ADDRESS!;
this.port = Number(process.env.INDOMITABLE_CONCURRENCY_SERVER_PORT!);
this.password = process.env.INDOMITABLE_CONCURRENCY_SERVER_PASSWORD!;
}

/**
* Method to try and acquire a lock for identify
* Method to try and acquire a lock for identify. This could never error or else it would hang out the whole system.
* Look at (https://github.com/discordjs/discord.js/blob/f1bce54a287eaa431ceb8b1996db87cbc6290317/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts#L321)
* If it errors that isn't anything from websocket shard, this will have issues
*/
public async waitForIdentify(shardId: number, signal: AbortSignal): Promise<void> {
const content: InternalOpsData = {
op: InternalOps.REQUEST_IDENTIFY,
data: { shardId },
internal: true
};
const listener = () => this.abortIdentify(shardId);
const url = new URL(`http://${this.address}:${this.port}/concurrency/acquire`);
url.searchParams.append('shardId', shardId.toString());

const listener = () => {
const url = new URL(`http://${this.address}:${this.port}/concurrency/cancel`);

url.searchParams.append('shardId', shardId.toString());

Fetch(url.toString(), {
method: 'DELETE',
headers: { authorization: this.password }
}).catch(() => null);
}

try {
signal.addEventListener('abort', listener);
await this.ipc.send({ content, repliable: true });

const response = await Fetch(url.toString(), {
method: 'POST',
headers: { authorization: this.password }
});

if (response.code === 202 || response.code === 204) {
// aborted request || ok request
return;
}

if (response.code >= 400 && response.code <= 499) {
// something happened server didn't accept your req
await Delay(1000);
return;
}
} catch (_) {
// this should not happen but we just delay if it happens
await Delay(1000);
} finally {
signal.removeEventListener('abort', listener);
}
}

/**
* Aborts an acquire lock request
*/
private abortIdentify(shardId: number): void {
const content: InternalOpsData = {
op: InternalOps.CANCEL_IDENTIFY,
data: { shardId },
internal: true
};
this.ipc
.send({ content, repliable: false })
.catch(() => null);
}
}
118 changes: 118 additions & 0 deletions src/concurrency/ConcurrencyServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { AddressInfo } from 'node:net';
import { ConcurrencyManager } from './ConcurrencyManager';
import { Indomitable } from '../Indomitable';
import { LibraryEvents } from '../Util';
import Http from 'node:http';
import QueryString from 'node:querystring';

/**
* Server that handles identify locks
*/
export class ConcurrencyServer {
private readonly manager: Indomitable;
/**
* Http server of this instance
* @private
*/
private readonly server: Http.Server;
/**
* Concurrency manager for this server
* @private
*/
private readonly concurrency: ConcurrencyManager;
/**
* Randomly generated password to secure this server
* @private
*/
private readonly password: string;

constructor(manager: Indomitable, concurrency: number) {
this.manager = manager;
this.server = Http.createServer((req, res) => this.handle(req, res));
this.concurrency = new ConcurrencyManager(concurrency);
this.password = Math.random().toString(36).slice(2, 10);
}

/**
* Gets the randomly generated password for this instance
*/
public get key(): string {
return this.password;
}

/**
* Gets the address info assigned for this instance
*/
public get info(): AddressInfo {
return this.server.address() as AddressInfo;
}

/**
* Handles the incoming requests
* @param request
* @param response
* @private
*/
private async handle(request: Http.IncomingMessage, response: Http.ServerResponse): Promise<void> {
if (!request.url || request.method !== 'POST' && request.method !== 'DELETE') {
response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

if (request.headers['authorization'] !== this.password) {
response.statusCode = 401;
response.statusMessage = 'Unauthorized';
return void response.end();
}

if (!request.url.includes('?shardId=')) {
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Missing shardId query string');
}

const shardId = Number(request.url.split('?shardId=')[1]);

if (isNaN(shardId)) {
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Expected shardId to be a number');
}

this.manager.emit(LibraryEvents.DEBUG, `Received a request in concurrency server! =>\n Url: ${request.url}\n Method: ${request.method}\n ShardId: ${shardId}`);

if (request.method === 'DELETE' && request.url.includes('/concurrency/cancel')) {
this.concurrency.abortIdentify(shardId);
response.statusCode = 200;
response.statusMessage = 'OK';
return void response.end();
}

if (request.method === 'POST' && request.url.includes('/concurrency/acquire')) {
try {
await this.concurrency.waitForIdentify(shardId);
response.statusCode = 204;
response.statusMessage = 'No Content';
return void response.end();
} catch (error) {
response.statusCode = 202;
response.statusMessage = 'Accepted';
return void response.end('Acquire lock cancelled');
}
}

response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

/**
* Starts this server
*/
public start(): Promise<AddressInfo> {
return new Promise((resolve) => {
this.server.listen(0 , '127.0.0.1', () => resolve(this.info));
})
}
}
Loading

0 comments on commit 9188ae3

Please sign in to comment.