Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ import type { EventEmitter } from 'node:events';
import { initializeLogger, log } from './log.js';
import { Plugin } from './plugin.js';
import { version } from './version.js';
import { Worker, WorkerOptions } from './worker.js';
import { AgentServer, ServerOptions } from './worker.js';

type CliArgs = {
opts: WorkerOptions;
opts: ServerOptions;
production: boolean;
watch: boolean;
event?: EventEmitter;
room?: string;
participantIdentity?: string;
};

const runWorker = async (args: CliArgs) => {
const runServer = async (args: CliArgs) => {
initializeLogger({ pretty: !args.production, level: args.opts.logLevel });
const logger = log();

// though `production` is defined in WorkerOptions, it will always be overridden by CLI.
// though `production` is defined in ServerOptions, it will always be overridden by CLI.
const { production: _, ...opts } = args.opts; // eslint-disable-line @typescript-eslint/no-unused-vars
const worker = new Worker(new WorkerOptions({ production: args.production, ...opts }));
const server = new AgentServer(new ServerOptions({ production: args.production, ...opts }));

if (args.room) {
worker.event.once('worker_registered', () => {
server.event.once('worker_registered', () => {
logger.info(`connecting to room ${args.room}`);
worker.simulateJob(args.room!, args.participantIdentity);
server.simulateJob(args.room!, args.participantIdentity);
});
}

Expand All @@ -40,25 +40,25 @@ const runWorker = async (args: CliArgs) => {
process.exit(130); // SIGINT exit code
});
if (args.production) {
await worker.drain();
await server.drain();
}
await worker.close();
await server.close();
logger.debug('worker closed due to SIGINT.');
process.exit(130); // SIGINT exit code
});

process.once('SIGTERM', async () => {
logger.debug('SIGTERM received in CLI.');
if (args.production) {
await worker.drain();
await server.drain();
}
await worker.close();
await server.close();
logger.debug('worker closed due to SIGTERM.');
process.exit(143); // SIGTERM exit code
});

try {
await worker.run();
await server.run();
} catch {
logger.fatal('closing worker due to error.');
process.exit(1);
Expand All @@ -72,11 +72,11 @@ const runWorker = async (args: CliArgs) => {
* @example
* ```
* if (process.argv[1] === fileURLToPath(import.meta.url)) {
* cli.runApp(new WorkerOptions({ agent: import.meta.filename }));
* cli.runApp(new ServerOptions({ agent: import.meta.filename }));
* }
* ```
*/
export const runApp = (opts: WorkerOptions) => {
export const runApp = (opts: ServerOptions) => {
const program = new Command()
.name('agents')
.description('LiveKit Agents CLI')
Expand Down Expand Up @@ -127,7 +127,7 @@ export const runApp = (opts: WorkerOptions) => {
opts.apiSecret = options.apiSecret || opts.apiSecret;
opts.logLevel = options.logLevel || opts.logLevel;
opts.workerToken = options.workerToken || opts.workerToken;
runWorker({
runServer({
opts,
production: true,
watch: false,
Expand All @@ -150,7 +150,7 @@ export const runApp = (opts: WorkerOptions) => {
opts.apiSecret = options.apiSecret || opts.apiSecret;
opts.logLevel = options.logLevel || opts.logLevel;
opts.workerToken = options.workerToken || opts.workerToken;
runWorker({
runServer({
opts,
production: false,
watch: false,
Expand All @@ -175,7 +175,7 @@ export const runApp = (opts: WorkerOptions) => {
opts.apiSecret = options.apiSecret || opts.apiSecret;
opts.logLevel = options.logLevel || opts.logLevel;
opts.workerToken = options.workerToken || opts.workerToken;
runWorker({
runServer({
opts,
production: false,
watch: false,
Expand Down
9 changes: 7 additions & 2 deletions agents/src/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import type { JobContext, JobProcess } from './job.js';

/** @see {@link defineAgent} */
/** @see {@link defineAgentServer} */
export interface Agent {
entry: (ctx: JobContext) => Promise<void>;
prewarm?: (proc: JobProcess) => unknown;
Expand Down Expand Up @@ -33,6 +33,11 @@ export function isAgent(obj: unknown): obj is Agent {
* })
* ```
*/
export function defineAgent(agent: Agent): Agent {
export function defineAgentServer(agent: Agent): Agent {
return agent;
}

/**
* @deprecated Use {@link defineAgentServer} instead. This alias is provided for backward compatibility.
*/
export const defineAgent = defineAgentServer;
44 changes: 27 additions & 17 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const defaultRequestFunc = async (ctx: JobRequest) => {
};

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const defaultCpuLoad = async (worker: Worker): Promise<number> => {
const defaultCpuLoad = async (worker: AgentServer): Promise<number> => {
return new Promise((resolve) => {
const cpus1 = os.cpus();

Expand Down Expand Up @@ -141,17 +141,17 @@ export class WorkerPermissions {
*
* This class is mostly useful in conjunction with {@link cli.runApp}.
*/
export class WorkerOptions {
export class ServerOptions {
agent: string;
requestFunc: (job: JobRequest) => Promise<void>;
loadFunc: (worker: Worker) => Promise<number>;
loadFunc: (worker: AgentServer) => Promise<number>;
loadThreshold: number;
numIdleProcesses: number;
shutdownProcessTimeout: number;
initializeProcessTimeout: number;
permissions: WorkerPermissions;
agentName: string;
workerType: JobType;
serverType: JobType;
maxRetry: number;
wsURL: string;
apiKey?: string;
Expand All @@ -175,7 +175,7 @@ export class WorkerOptions {
initializeProcessTimeout = 10 * 1000,
permissions = new WorkerPermissions(),
agentName = '',
workerType = JobType.JT_ROOM,
serverType = JobType.JT_ROOM,
maxRetry = MAX_RECONNECT_ATTEMPTS,
wsURL = 'ws://localhost:7880',
apiKey = undefined,
Expand All @@ -195,15 +195,15 @@ export class WorkerOptions {
agent: string;
requestFunc?: (job: JobRequest) => Promise<void>;
/** Called to determine the current load of the worker. Should return a value between 0 and 1. */
loadFunc?: (worker: Worker) => Promise<number>;
loadFunc?: (worker: AgentServer) => Promise<number>;
/** When the load exceeds this threshold, the worker will be marked as unavailable. */
loadThreshold?: number;
numIdleProcesses?: number;
shutdownProcessTimeout?: number;
initializeProcessTimeout?: number;
permissions?: WorkerPermissions;
agentName?: string;
workerType?: JobType;
serverType?: JobType;
maxRetry?: number;
wsURL?: string;
apiKey?: string;
Expand All @@ -228,7 +228,7 @@ export class WorkerOptions {
this.initializeProcessTimeout = initializeProcessTimeout;
this.permissions = permissions;
this.agentName = agentName;
this.workerType = workerType;
this.serverType = serverType;
this.maxRetry = maxRetry;
this.wsURL = wsURL;
this.apiKey = apiKey;
Expand Down Expand Up @@ -261,8 +261,8 @@ class PendingAssignment {
* you don't have access to a command line, such as a headless program, or one that uses Agents
* behind a wrapper.
*/
export class Worker {
#opts: WorkerOptions;
export class AgentServer {
#opts: ServerOptions;
#procPool: ProcPool;

#id = 'unregistered';
Expand All @@ -279,23 +279,23 @@ export class Worker {
#logger = log().child({ version });
#inferenceExecutor?: InferenceProcExecutor;

/** @throws {@link MissingCredentialsError} if URL, API key or API secret are missing */
constructor(opts: WorkerOptions) {
/* @throws {@link MissingCredentialsError} if URL, API key or API secret are missing */
constructor(opts: ServerOptions) {
opts.wsURL = opts.wsURL || process.env.LIVEKIT_URL || '';
opts.apiKey = opts.apiKey || process.env.LIVEKIT_API_KEY || '';
opts.apiSecret = opts.apiSecret || process.env.LIVEKIT_API_SECRET || '';

if (opts.wsURL === '')
throw new MissingCredentialsError(
'URL is required: Set LIVEKIT_URL, run with --url, or pass wsURL in WorkerOptions',
'URL is required: Set LIVEKIT_URL, run with --url, or pass wsURL in ServerOptions',
);
if (opts.apiKey === '')
throw new MissingCredentialsError(
'API Key is required: Set LIVEKIT_API_KEY, run with --api-key, or pass apiKey in WorkerOptions',
'API Key is required: Set LIVEKIT_API_KEY, run with --api-key, or pass apiKey in ServerOptions',
);
if (opts.apiSecret === '')
throw new MissingCredentialsError(
'API Secret is required: Set LIVEKIT_API_SECRET, run with --api-secret, or pass apiSecret in WorkerOptions',
'API Secret is required: Set LIVEKIT_API_SECRET, run with --api-secret, or pass apiSecret in ServerOptions',
);

if (opts.workerToken) {
Expand Down Expand Up @@ -340,7 +340,7 @@ export class Worker {
this.#opts = opts;
this.#httpServer = new HTTPServer(opts.host, opts.port, () => ({
agent_name: opts.agentName,
worker_type: JobType[opts.workerType],
worker_type: JobType[opts.serverType],
active_jobs: this.activeJobs.length,
sdk_version: version,
project_type: PROJECT_TYPE,
Expand Down Expand Up @@ -610,7 +610,7 @@ export class Worker {
message: {
case: 'register',
value: {
type: this.#opts.workerType,
type: this.#opts.serverType,
agentName: this.#opts.agentName,
allowedPermissions: new ParticipantPermission({
canPublish: this.#opts.permissions.canPublish,
Expand Down Expand Up @@ -788,3 +788,13 @@ export class Worker {
await this.#close.await;
}
}

/**
* @deprecated Use {@link AgentServer} instead. This alias is provided for backward compatibility.
*/
export const Worker = AgentServer;

/**
* @deprecated Use {@link ServerOptions} instead. This alias is provided for backward compatibility.
*/
export const WorkerOptions = ServerOptions;