Skip to content

Commit

Permalink
Merge pull request #1446 from Oneirocom/redis-optimisations
Browse files Browse the repository at this point in the history
Redis optimisations
  • Loading branch information
benbot authored Dec 22, 2023
2 parents 4bb0940 + 475d7b6 commit d7593dc
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 289 deletions.
4 changes: 2 additions & 2 deletions apps/cloud-agent-worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if (PRODUCTION) {
}
const logger = getLogger()

await initApp()
const app = await initApp()

await initAgentCommander()

Expand All @@ -45,5 +45,5 @@ if (PRODUCTION || DONT_CRASH_ON_ERROR) {
await loadPlugins()

logger.info('Starting worker')
const worker = new CloudAgentWorker()
const worker = new CloudAgentWorker(app)
worker.startWork()
1 change: 0 additions & 1 deletion packages/server/agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { app } from 'server/core'
import { AgentCommander } from './lib/AgentCommander'

export * from './lib/Agent'
export * from './lib/AgentManager'
export * from './lib/AgentCommander'
export * from './lib/AgentLogger'

Expand Down
13 changes: 4 additions & 9 deletions packages/server/agents/src/lib/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import {
} from 'shared/core'

import { getLogger } from 'server/logger'

import { AgentManager } from './AgentManager'
import { Application } from 'server/core'

import {
Expand All @@ -28,6 +26,7 @@ import { CommandHub } from './CommandHub'
import { Spellbook } from 'server/grimoire'
import { AgentInterface } from 'server/schemas'
import { RedisPubSub } from 'server/redis-pubsub'
import { CloudAgentWorker } from 'server/cloud-agent-worker'

/**
* The Agent class that implements AgentInterface.
Expand All @@ -42,7 +41,6 @@ export class Agent implements AgentInterface {
spellManager: SpellManager
projectId!: string
rootSpellId!: string
agentManager: AgentManager
spellRunner?: SpellRunner
logger: pino.Logger = getLogger()
worker: Worker
Expand All @@ -52,9 +50,9 @@ export class Agent implements AgentInterface {
ready = false
app: Application
spellbook: Spellbook<Agent, Application>
agentManager: CloudAgentWorker

outputTypes: any[] = []
updateInterval: any

/**
* Agent constructor initializes properties and sets intervals for updating agents
Expand All @@ -63,14 +61,14 @@ export class Agent implements AgentInterface {
*/
constructor(
agentData: AgentInterface,
agentManager: AgentManager,
agentManager: CloudAgentWorker,
worker: Worker,
pubsub: RedisPubSub,
app: Application
) {
this.id = agentData.id
this.agentManager = agentManager
this.app = app
this.agentManager = agentManager

this.update(agentData)
this.logger.info('Creating new agent named: %s | %s', this.name, this.id)
Expand Down Expand Up @@ -243,9 +241,6 @@ export class Agent implements AgentInterface {
* Clean up resources when the instance is destroyed.
*/
async onDestroy() {
if (this.updateInterval) {
clearInterval(this.updateInterval)
}
this.removePlugins()
this.log('destroyed agent', { id: this.id })
}
Expand Down
184 changes: 0 additions & 184 deletions packages/server/agents/src/lib/AgentManager.ts

This file was deleted.

43 changes: 34 additions & 9 deletions packages/server/cloud-agent-worker/src/lib/cloud-agent-worker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Worker, Job } from 'bullmq'

import { app } from 'server/core'
import { getLogger } from 'server/logger'
import { Application, app } from 'server/core'

import { BullMQWorker, BullQueue } from 'server/communication'

import { Agent, AgentManager, type AgentRunJob } from 'server/agents'
import { Agent, type AgentRunJob } from 'server/agents'
import { v4 as uuidv4 } from 'uuid'
import {
AGENT_DELETE,
Expand All @@ -13,6 +13,7 @@ import {
AGENT_UPDATE_JOB,
} from 'shared/core'
import { type RedisPubSub } from 'server/redis-pubsub'
import pino from 'pino'

export interface AgentListRecord {
id: string
Expand All @@ -22,13 +23,17 @@ export interface AgentListRecord {
// I get that it's confusing extending AgentManager, but it's the best way to
// get the functionality I want without having to rewrite a bunch of stuff.
// Agent Managers just managed agents for a single instance of the server anyway
export class CloudAgentWorker extends AgentManager {
export class CloudAgentWorker {
pubSub: RedisPubSub
subscriptions: Record<string, Function> = {}

constructor() {
super(app, false)

logger: pino.Logger = getLogger()
currentAgents: string[] = []
addHandlers: any = []
removeHandlers: any = []
app: Application

constructor(app: Application) {
this.app = app
this.pubSub = app.get('pubsub')

this.pubSub.subscribe(AGENT_DELETE, async (agentId: string) => {
Expand All @@ -45,7 +50,6 @@ export class CloudAgentWorker extends AgentManager {
})

this.addAgent = this.addAgent.bind(this)
this.updateAgents = this.updateAgents.bind(this)
}

heartbeat() {
Expand All @@ -60,6 +64,10 @@ export class CloudAgentWorker extends AgentManager {
})
}

getAgent(agentId: string) {
return this.currentAgents[agentId]
}

async addAgent(agentId: string) {
this.logger.info(`Creating agent ${agentId}...`)
const agentDBRes = await app.service('agents').find({
Expand Down Expand Up @@ -111,6 +119,23 @@ export class CloudAgentWorker extends AgentManager {
this.logger.info(`Updated agent ${agentId}`)
}

/**
* Register a handler to be called when an agent is added.
* @param handler - The handler function to be called.
*/
registerAddAgentHandler(handler) {
this.logger.debug('Registering add agent handler')
this.addHandlers.push(handler)
}
/**
* Register a handler to be called when an agent is removed.
* @param handler - The handler function to be called.
*/
registerRemoveAgentHandler(handler) {
this.logger.debug('Registering remove agent handler')
this.removeHandlers.push(handler)
}

async removeAgent(agentId: string) {
this.logger.info(`Removing agent ${agentId}...`)

Expand Down
4 changes: 3 additions & 1 deletion packages/server/core/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { RedisPubSub } from 'server/redis-pubsub'
import sync from 'feathers-sync'
import { configureManager, globalsManager } from 'shared/core'

import { REDISCLOUD_URL, API_ACCESS_KEY, bullMQConnection } from 'shared/config'
import { REDISCLOUD_URL, API_ACCESS_KEY } from 'shared/config'
import { createPosthogClient } from 'server/event-tracker'

import { dbClient } from './dbClient'
Expand Down Expand Up @@ -270,4 +270,6 @@ export async function initApp(environment: Environment = 'default') {
teardown: [],
})
logger.info('Feathers app initialized')

return app
}
Loading

0 comments on commit d7593dc

Please sign in to comment.