Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Dec 23, 2024
1 parent 731a5a3 commit 8fd22ee
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 14 deletions.
2 changes: 1 addition & 1 deletion agents/src/inference_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ export abstract class InferenceRunner {
}

abstract initialize(): Promise<void>;
abstract run(data: any): Promise<any>;
abstract run(data: unknown): Promise<unknown>;
}
7 changes: 7 additions & 0 deletions agents/src/ipc/inference_executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

export abstract class InferenceExecutor {
abstract doInference(method: string, data: unknown): Promise<unknown>;
}
3 changes: 3 additions & 0 deletions agents/src/ipc/inference_proc_executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
9 changes: 5 additions & 4 deletions agents/src/ipc/job_executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ export interface ProcOpts {
agent: string;
initializeTimeout: number;
closeTimeout: number;
memoryWarnMB: number;
memoryLimitMB: number;
pingInterval: number;
pingTimeout: number;
highPingThreshold: number;
}

export abstract class JobExecutor {
PING_INTERVAL = 2.5 * 1000;
PING_TIMEOUT = 90 * 1000;
HIGH_PING_THRESHOLD = 0.5 * 1000;

abstract get started(): boolean;
abstract get runningJob(): RunningJobInfo | undefined;

Expand Down
15 changes: 14 additions & 1 deletion agents/src/ipc/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import type { LoggerOptions } from '../log.js';
export type IPCMessage =
| {
case: 'initializeRequest';
value: { loggerOptions: LoggerOptions };
value: {
loggerOptions: LoggerOptions;
pingInterval?: number;
pingTimeout?: number;
highPingThreshold?: number;
};
}
| {
case: 'initializeResponse';
Expand All @@ -29,6 +34,14 @@ export type IPCMessage =
case: 'shutdownRequest';
value: { reason?: string };
}
| {
case: 'inferenceRequest';
value: { method: string; requestId: string; data: unknown };
}
| {
case: 'inferenceResponse';
value: { requestId: string; data: unknown; error?: Error };
}
| {
case: 'exiting';
value: { reason?: string };
Expand Down
22 changes: 21 additions & 1 deletion agents/src/ipc/proc_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { MultiMutex, Mutex } from '@livekit/mutex';
import type { RunningJobInfo } from '../job.js';
import { Queue } from '../utils.js';
import { InferenceExecutor } from './inference_executor.js';
import type { JobExecutor } from './job_executor.js';
import { ProcJobExecutor } from './proc_job_executor.js';

Expand All @@ -20,19 +21,28 @@ export class ProcPool {
procMutex?: MultiMutex;
procUnlock?: () => void;
warmedProcQueue = new Queue<JobExecutor>();
inferenceExecutor: InferenceExecutor;
memoryWarnMB: number;
memoryLimitMB: number;

constructor(
agent: string,
numIdleProcesses: number,
initializeTimeout: number,
closeTimeout: number,
inferenceExecutor: InferenceExecutor,
memoryWarnMB: number,
memoryLimitMB: number,
) {
this.agent = agent;
if (numIdleProcesses > 0) {
this.procMutex = new MultiMutex(numIdleProcesses);
}
this.initializeTimeout = initializeTimeout;
this.closeTimeout = closeTimeout;
this.inferenceExecutor = inferenceExecutor;
this.memoryWarnMB = memoryWarnMB;
this.memoryLimitMB = memoryLimitMB;
}

get processes(): JobExecutor[] {
Expand All @@ -52,7 +62,17 @@ export class ProcPool {
this.procUnlock = undefined;
}
} else {
proc = new ProcJobExecutor(this.agent, this.initializeTimeout, this.closeTimeout);
proc = new ProcJobExecutor(
this.agent,
this.initializeTimeout,
this.closeTimeout,
this.inferenceExecutor,
2500,
60000,
500,
this.memoryWarnMB,
this.memoryLimitMB,
);
this.executors.push(proc);
await proc.start();
await proc.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { ProcOpts } from './job_executor.js';
import { JobExecutor } from './job_executor.js';
import type { IPCMessage } from './message.js';

export class ProcJobExecutor extends JobExecutor {
export class SupervisedProc {
#opts: ProcOpts;
#started = false;
#closing = false;
Expand All @@ -22,12 +22,26 @@ export class ProcJobExecutor extends JobExecutor {
#join = new Future();
#logger = log().child({ runningJob: this.#runningJob });

constructor(agent: string, initializeTimeout: number, closeTimeout: number) {
super();
constructor(
agent: string,
initializeTimeout: number,
closeTimeout: number,
// XXX(nbsp): memoryWarnMB and memoryLimitMB are currently stubbed and serve no use
memoryWarnMB: number,
memoryLimitMB: number,
pingInterval: number,
pingTimeout: number,
highPingThreshold: number,
) {
this.#opts = {
agent,
initializeTimeout,
closeTimeout,
memoryWarnMB,
memoryLimitMB,
pingInterval,
pingTimeout,
highPingThreshold,
};
}

Expand Down Expand Up @@ -61,21 +75,21 @@ export class ProcJobExecutor extends JobExecutor {

this.#pingInterval = setInterval(() => {
this.#proc!.send({ case: 'pingRequest', value: { timestamp: Date.now() } });
}, this.PING_INTERVAL);
}, this.#opts.pingInterval);

this.#pongTimeout = setTimeout(() => {
this.#logger.warn('job is unresponsive');
clearTimeout(this.#pongTimeout);
clearInterval(this.#pingInterval);
this.#proc!.kill();
this.#join.resolve();
}, this.PING_TIMEOUT);
}, this.#opts.pingTimeout);

const listener = (msg: IPCMessage) => {
switch (msg.case) {
case 'pongResponse': {
const delay = Date.now() - msg.value.timestamp;
if (delay > this.HIGH_PING_THRESHOLD) {
if (delay > this.#opts.highPingThreshold) {
this.#logger.child({ delay }).warn('job executor is unresponsive');
}
this.#pongTimeout?.refresh();
Expand Down Expand Up @@ -119,7 +133,15 @@ export class ProcJobExecutor extends JobExecutor {
this.#init.reject(err);
throw err;
}, this.#opts.initializeTimeout);
this.#proc!.send({ case: 'initializeRequest', value: { loggerOptions } });
this.#proc!.send({
case: 'initializeRequest',
value: {
loggerOptions,
pingInterval: this.#opts.pingInterval,
pingTimeout: this.#opts.pingTimeout,
highPingThreshold: this.#opts.highPingThreshold,
},
});
await once(this.#proc!, 'message').then(([msg]: IPCMessage[]) => {
clearTimeout(timer);
if (msg!.case !== 'initializeResponse') {
Expand Down
22 changes: 22 additions & 0 deletions agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,22 @@ import type {
} from '@livekit/rtc-node';
import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node';
import type { Logger } from 'pino';
import { InferenceExecutor } from './ipc/inference_executor.js';
import { log } from './log.js';

// TODO(nbsp): what is this thing
class JobCallContext {
static #current: JobCallContext;

constructor() {
JobCallContext.#current = this;
}

static getCurrent(): JobCallContext {
return JobCallContext.#current;
}
}

/** Which tracks, if any, should the agent automatically subscribe to? */
export enum AutoSubscribe {
SUBSCRIBE_ALL,
Expand Down Expand Up @@ -60,13 +74,15 @@ export class JobContext {
};
} = {};
#logger: Logger;
#inferenceExecutor: InferenceExecutor;

constructor(
proc: JobProcess,
info: RunningJobInfo,
room: Room,
onConnect: () => void,
onShutdown: (s: string) => void,
inferenceExecutor: InferenceExecutor,
) {
this.#proc = proc;
this.#info = info;
Expand All @@ -76,6 +92,7 @@ export class JobContext {
this.onParticipantConnected = this.onParticipantConnected.bind(this);
this.#room.on(RoomEvent.ParticipantConnected, this.onParticipantConnected);
this.#logger = log().child({ info: this.#info });
this.#inferenceExecutor = inferenceExecutor;
}

get proc(): JobProcess {
Expand All @@ -96,6 +113,11 @@ export class JobContext {
return this.#room.localParticipant;
}

/** @returns The global inference executor */
get inferenceExecutor(): InferenceExecutor {
return this.#inferenceExecutor;
}

/** Adds a promise to be awaited when {@link JobContext.shutdown | shutdown} is called. */
addShutdownCallback(callback: () => Promise<void>) {
this.shutdownCallbacks.push(callback);
Expand Down

0 comments on commit 8fd22ee

Please sign in to comment.