diff --git a/agents/src/inference_runner.ts b/agents/src/inference_runner.ts index 005eb0cd..3a2cd04a 100644 --- a/agents/src/inference_runner.ts +++ b/agents/src/inference_runner.ts @@ -15,5 +15,5 @@ export abstract class InferenceRunner { } abstract initialize(): Promise; - abstract run(data: any): Promise; + abstract run(data: unknown): Promise; } diff --git a/agents/src/ipc/inference_executor.ts b/agents/src/ipc/inference_executor.ts new file mode 100644 index 00000000..c01d82f1 --- /dev/null +++ b/agents/src/ipc/inference_executor.ts @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export abstract class InferenceExecutor { + abstract doInference(method: string, data: unknown): Promise; +} diff --git a/agents/src/ipc/inference_proc_executor.ts b/agents/src/ipc/inference_proc_executor.ts new file mode 100644 index 00000000..9724a803 --- /dev/null +++ b/agents/src/ipc/inference_proc_executor.ts @@ -0,0 +1,3 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 diff --git a/agents/src/ipc/job_executor.ts b/agents/src/ipc/job_executor.ts index 10080b99..a3838cf9 100644 --- a/agents/src/ipc/job_executor.ts +++ b/agents/src/ipc/job_executor.ts @@ -7,13 +7,14 @@ export interface ProcOpts { agent: string; initializeTimeout: number; closeTimeout: number; + memoryWarnMB: number; + memoryLimitMB: number; + pingInterval: number; + pingTimeout: number; + highPingThreshold: number; } export abstract class JobExecutor { - PING_INTERVAL = 2.5 * 1000; - PING_TIMEOUT = 90 * 1000; - HIGH_PING_THRESHOLD = 0.5 * 1000; - abstract get started(): boolean; abstract get runningJob(): RunningJobInfo | undefined; diff --git a/agents/src/ipc/message.ts b/agents/src/ipc/message.ts index 5eba3e18..d1dbcd0e 100644 --- a/agents/src/ipc/message.ts +++ b/agents/src/ipc/message.ts @@ -7,7 +7,12 @@ import type { LoggerOptions } from '../log.js'; export type IPCMessage = | { case: 'initializeRequest'; - value: { loggerOptions: LoggerOptions }; + value: { + loggerOptions: LoggerOptions; + pingInterval?: number; + pingTimeout?: number; + highPingThreshold?: number; + }; } | { case: 'initializeResponse'; @@ -29,6 +34,14 @@ export type IPCMessage = case: 'shutdownRequest'; value: { reason?: string }; } + | { + case: 'inferenceRequest'; + value: { method: string; requestId: string; data: unknown }; + } + | { + case: 'inferenceResponse'; + value: { requestId: string; data: unknown; error?: Error }; + } | { case: 'exiting'; value: { reason?: string }; diff --git a/agents/src/ipc/proc_pool.ts b/agents/src/ipc/proc_pool.ts index 33cb9936..72cd7be0 100644 --- a/agents/src/ipc/proc_pool.ts +++ b/agents/src/ipc/proc_pool.ts @@ -4,6 +4,7 @@ import { MultiMutex, Mutex } from '@livekit/mutex'; import type { RunningJobInfo } from '../job.js'; import { Queue } from '../utils.js'; +import { InferenceExecutor } from './inference_executor.js'; import type { JobExecutor } from './job_executor.js'; import { ProcJobExecutor } from './proc_job_executor.js'; @@ -20,12 +21,18 @@ export class ProcPool { procMutex?: MultiMutex; procUnlock?: () => void; warmedProcQueue = new Queue(); + inferenceExecutor: InferenceExecutor; + memoryWarnMB: number; + memoryLimitMB: number; constructor( agent: string, numIdleProcesses: number, initializeTimeout: number, closeTimeout: number, + inferenceExecutor: InferenceExecutor, + memoryWarnMB: number, + memoryLimitMB: number, ) { this.agent = agent; if (numIdleProcesses > 0) { @@ -33,6 +40,9 @@ export class ProcPool { } this.initializeTimeout = initializeTimeout; this.closeTimeout = closeTimeout; + this.inferenceExecutor = inferenceExecutor; + this.memoryWarnMB = memoryWarnMB; + this.memoryLimitMB = memoryLimitMB; } get processes(): JobExecutor[] { @@ -52,7 +62,17 @@ export class ProcPool { this.procUnlock = undefined; } } else { - proc = new ProcJobExecutor(this.agent, this.initializeTimeout, this.closeTimeout); + proc = new ProcJobExecutor( + this.agent, + this.initializeTimeout, + this.closeTimeout, + this.inferenceExecutor, + 2500, + 60000, + 500, + this.memoryWarnMB, + this.memoryLimitMB, + ); this.executors.push(proc); await proc.start(); await proc.initialize(); diff --git a/agents/src/ipc/proc_job_executor.ts b/agents/src/ipc/supervised_proc.ts similarity index 83% rename from agents/src/ipc/proc_job_executor.ts rename to agents/src/ipc/supervised_proc.ts index 980a35b5..626625c7 100644 --- a/agents/src/ipc/proc_job_executor.ts +++ b/agents/src/ipc/supervised_proc.ts @@ -10,7 +10,7 @@ import type { ProcOpts } from './job_executor.js'; import { JobExecutor } from './job_executor.js'; import type { IPCMessage } from './message.js'; -export class ProcJobExecutor extends JobExecutor { +export class SupervisedProc { #opts: ProcOpts; #started = false; #closing = false; @@ -22,12 +22,26 @@ export class ProcJobExecutor extends JobExecutor { #join = new Future(); #logger = log().child({ runningJob: this.#runningJob }); - constructor(agent: string, initializeTimeout: number, closeTimeout: number) { - super(); + constructor( + agent: string, + initializeTimeout: number, + closeTimeout: number, + // XXX(nbsp): memoryWarnMB and memoryLimitMB are currently stubbed and serve no use + memoryWarnMB: number, + memoryLimitMB: number, + pingInterval: number, + pingTimeout: number, + highPingThreshold: number, + ) { this.#opts = { agent, initializeTimeout, closeTimeout, + memoryWarnMB, + memoryLimitMB, + pingInterval, + pingTimeout, + highPingThreshold, }; } @@ -61,7 +75,7 @@ export class ProcJobExecutor extends JobExecutor { this.#pingInterval = setInterval(() => { this.#proc!.send({ case: 'pingRequest', value: { timestamp: Date.now() } }); - }, this.PING_INTERVAL); + }, this.#opts.pingInterval); this.#pongTimeout = setTimeout(() => { this.#logger.warn('job is unresponsive'); @@ -69,13 +83,13 @@ export class ProcJobExecutor extends JobExecutor { clearInterval(this.#pingInterval); this.#proc!.kill(); this.#join.resolve(); - }, this.PING_TIMEOUT); + }, this.#opts.pingTimeout); const listener = (msg: IPCMessage) => { switch (msg.case) { case 'pongResponse': { const delay = Date.now() - msg.value.timestamp; - if (delay > this.HIGH_PING_THRESHOLD) { + if (delay > this.#opts.highPingThreshold) { this.#logger.child({ delay }).warn('job executor is unresponsive'); } this.#pongTimeout?.refresh(); @@ -119,7 +133,15 @@ export class ProcJobExecutor extends JobExecutor { this.#init.reject(err); throw err; }, this.#opts.initializeTimeout); - this.#proc!.send({ case: 'initializeRequest', value: { loggerOptions } }); + this.#proc!.send({ + case: 'initializeRequest', + value: { + loggerOptions, + pingInterval: this.#opts.pingInterval, + pingTimeout: this.#opts.pingTimeout, + highPingThreshold: this.#opts.highPingThreshold, + }, + }); await once(this.#proc!, 'message').then(([msg]: IPCMessage[]) => { clearTimeout(timer); if (msg!.case !== 'initializeResponse') { diff --git a/agents/src/job.ts b/agents/src/job.ts index f4e9aa59..aa66adfb 100644 --- a/agents/src/job.ts +++ b/agents/src/job.ts @@ -11,8 +11,22 @@ import type { } from '@livekit/rtc-node'; import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node'; import type { Logger } from 'pino'; +import { InferenceExecutor } from './ipc/inference_executor.js'; import { log } from './log.js'; +// TODO(nbsp): what is this thing +class JobCallContext { + static #current: JobCallContext; + + constructor() { + JobCallContext.#current = this; + } + + static getCurrent(): JobCallContext { + return JobCallContext.#current; + } +} + /** Which tracks, if any, should the agent automatically subscribe to? */ export enum AutoSubscribe { SUBSCRIBE_ALL, @@ -60,6 +74,7 @@ export class JobContext { }; } = {}; #logger: Logger; + #inferenceExecutor: InferenceExecutor; constructor( proc: JobProcess, @@ -67,6 +82,7 @@ export class JobContext { room: Room, onConnect: () => void, onShutdown: (s: string) => void, + inferenceExecutor: InferenceExecutor, ) { this.#proc = proc; this.#info = info; @@ -76,6 +92,7 @@ export class JobContext { this.onParticipantConnected = this.onParticipantConnected.bind(this); this.#room.on(RoomEvent.ParticipantConnected, this.onParticipantConnected); this.#logger = log().child({ info: this.#info }); + this.#inferenceExecutor = inferenceExecutor; } get proc(): JobProcess { @@ -96,6 +113,11 @@ export class JobContext { return this.#room.localParticipant; } + /** @returns The global inference executor */ + get inferenceExecutor(): InferenceExecutor { + return this.#inferenceExecutor; + } + /** Adds a promise to be awaited when {@link JobContext.shutdown | shutdown} is called. */ addShutdownCallback(callback: () => Promise) { this.shutdownCallbacks.push(callback);