Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permissions #1371

Merged
merged 31 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fb10f53
Remove console log
parzival418 Oct 30, 2023
6023c9d
Move track google AI usage into result block
parzival418 Oct 30, 2023
735a276
Add feathers permissions package
parzival418 Oct 30, 2023
afdf2db
Add sessionId to module
parzival418 Oct 30, 2023
613c5f0
Pass session id into spellrunner
parzival418 Oct 30, 2023
05c43d0
Add session id to remote plugin emit args
parzival418 Oct 30, 2023
4961b2e
Add sessionId to module
parzival418 Oct 30, 2023
bdbb01f
Add session id to module context type
parzival418 Oct 30, 2023
6ecb6cd
Add session id from payload into socket params
parzival418 Oct 30, 2023
d56ee95
Handle sending events to sessionId if it exists in channels
parzival418 Oct 30, 2023
23321eb
Use agent commander in agent run service class function
parzival418 Oct 30, 2023
cd0d92b
Require a role to access the API
parzival418 Oct 30, 2023
a964a71
Modify agent commander to use agent or agentId and sessionid
parzival418 Oct 30, 2023
ec3e0a7
Pass sessionId down into runComponent
parzival418 Oct 30, 2023
8c35e4d
Add permissions to agentImage
parzival418 Oct 30, 2023
ae392cd
Add permissions to the rest of the services
parzival418 Oct 30, 2023
2510347
Fix permissions
parzival418 Oct 30, 2023
4a09403
Add log env to root logger creation
parzival418 Oct 30, 2023
6573556
Pass down spellId propertly to run agent commander
parzival418 Oct 30, 2023
7e9abd0
Add permissions to incoming API calls for permissioning full access c…
parzival418 Oct 30, 2023
b6810b0
Add permissions to agent HTTP service
parzival418 Oct 30, 2023
4101551
Improve logging. Got session ID channel routing to work.
parzival418 Oct 31, 2023
9d8e7d5
Pass in server environment on server initApp
parzival418 Oct 31, 2023
133610e
Add support for environment setter in initApp
parzival418 Oct 31, 2023
f9bf812
Remove embeddings and spell references in event emitting to client
parzival418 Oct 31, 2023
01ea86a
Don't publish events if they arent from the server
parzival418 Oct 31, 2023
e374007
Add a few more points of data from the remote plugin emitter
parzival418 Oct 31, 2023
8a0fb0e
fix eslint errors
Oct 31, 2023
b500d0b
fix eslint errors
Oct 31, 2023
ff50c80
Fix typescripot build errors by moving permissions library into the repo
parzival418 Oct 31, 2023
2934d7f
Merge
parzival418 Oct 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ import { getPinoTransport } from '@hyperdx/node-opentelemetry'
import { PRODUCTION } from '@magickml/config'

if (PRODUCTION) {
initLogger({
name: 'cloud-agent-worker',
transport: {
targets: [
getPinoTransport('info')
]
},
level: 'info',
})
initLogger({
name: 'cloud-agent-worker',
transport: {
targets: [getPinoTransport('info')]
},
level: 'info'
})
} else {
initLogger({ name: 'cloud-agent-worker' })
}
initLogger({ name: 'cloud-agent-worker' })
}
const logger = getLogger()

// log handle errors
Expand Down Expand Up @@ -69,7 +67,7 @@ const routes: Route[] = [...spells, ...apis, ...serverRoutes]
* form and multipart-json requests, and routes.
*/
async function init() {
await initApp()
await initApp('server')
await initAgentCommander()
// load plugins
await (async () => {
Expand Down
25 changes: 25 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
"ethers": "5.7.2",
"expletives": "0.1.5",
"express": "4.18.2",
"feathers-permissions": "^2.1.4",
"feathers-sync": "3.0.3",
"flatted": "3.2.7",
"flexlayout-react": "0.7.7",
Expand Down
3 changes: 3 additions & 0 deletions packages/agents/src/lib/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ export class Agent implements AgentInterface {

async runWorker(job: Job<AgentRunJob>) {
// the job name is the agent id. Only run if the agent id matches.
this.logger.debug('running worker', { id: this.id, data: job.data })
if (this.id !== job.data.agentId) return

const { data } = job
Expand Down Expand Up @@ -235,6 +236,7 @@ export class Agent implements AgentInterface {
...this.secrets,
...data.secrets,
},
sessionId: data?.sessionId,
publicVariables: this.publicVariables,
runSubspell: data.runSubspell,
app,
Expand Down Expand Up @@ -269,6 +271,7 @@ export class Agent implements AgentInterface {

export interface AgentRunJob {
inputs: MagickSpellInput
sessionId?: string
jobId: string
agentId: string
spellId: string
Expand Down
39 changes: 26 additions & 13 deletions packages/agents/src/lib/AgentCommander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import { AgentResult, AgentRunJob } from './Agent'
import { AGENT_RESPONSE_TIMEOUT_MSEC } from '@magickml/config'

export type RunRootSpellArgs = {
agent: Agent
agent?: Agent
agentId?: string
inputs: MagickSpellInput
componentName?: string
runSubspell?: boolean
Expand All @@ -25,6 +26,7 @@ export type RunRootSpellArgs = {
isSubSpell?: boolean
currentJob?: Job<AgentRunJob>
subSpellDepth?: number
sessionId?: string
}

interface AgentCommanderArgs {
Expand All @@ -41,7 +43,10 @@ export class AgentCommander extends EventEmitter {
}

runSpellWithResponse(args: RunRootSpellArgs) {
const { agent } = args
const { agentId, agent } = args
const id = agentId || agent?.id
if (!id) throw new Error('Agent or agent id is required')

return new Promise((resolve, reject) => {
;(async () => {
setTimeout(() => {
Expand All @@ -50,7 +55,7 @@ export class AgentCommander extends EventEmitter {

let jobId: null | string = null

const agentMessageName = AGENT_RUN_RESULT(agent.id)
const agentMessageName = AGENT_RUN_RESULT(id)

this.pubSub.subscribe(agentMessageName, (data: AgentResult) => {
if (data.result.error) {
Expand All @@ -66,7 +71,7 @@ export class AgentCommander extends EventEmitter {
}
})

const agentErrorName = AGENT_RUN_ERROR(agent.id)
const agentErrorName = AGENT_RUN_ERROR(id)
this.pubSub.subscribe(agentErrorName, (data: AgentResult) => {
if (data.jobId === jobId) {
this.pubSub.unsubscribe(agentErrorName)
Expand All @@ -83,46 +88,54 @@ export class AgentCommander extends EventEmitter {
private runRootSpellArgsToString(
jobId: string,
{
agent,
agentId,
inputs,
componentName,
runSubspell,
secrets,
publicVariables,
spellId,
subSpellDepth,
sessionId,
}: RunRootSpellArgs
) {
return JSON.stringify({
jobId,
agentId: agent.id,
spellId: spellId || agent.rootSpellId,
agentId,
spellId: spellId,
inputs,
componentName,
runSubspell,
secrets,
publicVariables,
subSpellDepth,
sessionId,
})
}

async runSubSpell(args: RunRootSpellArgs) {
const { agent } = args
const { agentId, agent } = args
const id = agentId || agent?.id
if (!id) throw new Error('Agent or agent id is required')

const jobId = uuidv4()
await this.pubSub.publish(
AGENT_RUN_JOB(agent.id),
AGENT_RUN_JOB(id),
this.runRootSpellArgsToString(jobId, args)
)
return jobId
}

async runSpell(args: RunRootSpellArgs) {
const { agent } = args
this.logger.debug(`Running Spell on Agent: ${agent.id}`)
this.logger.debug(AGENT_RUN_JOB(agent.id))
const { agent, agentId } = args
const id = agentId || agent?.id
if (!id) throw new Error('Agent or agent id is required')

this.logger.debug(`Running Spell on Agent: ${id}`)
this.logger.debug(AGENT_RUN_JOB(id))
const jobId = uuidv4()
await this.pubSub.publish(
AGENT_RUN_JOB(agent.id),
AGENT_RUN_JOB(id),
this.runRootSpellArgsToString(jobId, args)
)
return jobId
Expand Down
116 changes: 72 additions & 44 deletions packages/cloud-agent-manager/src/lib/CloudAgentManager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import pino from "pino"
import { diff, unique } from "radash"
import { AGENT_DELETE, AGENT_DELETE_JOB, AGENT_UPDATE_JOB, getLogger } from "@magickml/core"
import type { Reporter } from "./Reporters"
import { type PubSub, type MessageQueue, app } from "@magickml/server-core"
import { Agent } from "packages/core/server/src/services/agents/agents.schema"
import { HEARTBEAT_MSEC, MANAGER_WARM_UP_MSEC } from "@magickml/config"
import pino from 'pino'
import { diff, unique } from 'radash'
import {
AGENT_DELETE,
AGENT_DELETE_JOB,
AGENT_UPDATE_JOB,
getLogger,
} from '@magickml/core'
import type { Reporter } from './Reporters'
import { type PubSub, type MessageQueue, app } from '@magickml/server-core'
import { Agent } from 'packages/core/server/src/services/agents/agents.schema'
import { HEARTBEAT_MSEC, MANAGER_WARM_UP_MSEC } from '@magickml/config'

interface CloudAgentManagerConstructor {
pubSub: PubSub
Expand Down Expand Up @@ -42,69 +47,92 @@ export class CloudAgentManager {
this.logger.info(`Agent Updated: ${agent.id}`)

if (agent.enabled) {
this.logger.info(`Agent ${agent.id} enabled, adding to cloud agent worker`)
await this.newQueue.addJob('agent:new', {agentId: agent.id})
this.logger.info(
`Agent ${agent.id} enabled, adding to cloud agent worker`
)
await this.newQueue.addJob('agent:new', { agentId: agent.id })
this.logger.debug(`Agent create job for ${agent.id} added`)
return
}

this.pubSub.publish(AGENT_UPDATE_JOB(agent.id), JSON.stringify({ agentId: agent.id }))
this.pubSub.publish(
AGENT_UPDATE_JOB(agent.id),
JSON.stringify({ agentId: agent.id })
)
})

this.agentStateReporter.on(AGENT_DELETE, async (data: unknown) => {
const agent = data as Agent
this.pubSub.publish(AGENT_DELETE_JOB(agent.id), JSON.stringify({ agentId: agent.id }))
this.pubSub.publish(
AGENT_DELETE_JOB(agent.id),
JSON.stringify({ agentId: agent.id })
)
})
}

async dedupeAgents(agents: string[]) {
const deduped = unique(agents)
const diffAgents = diff(agents, deduped)

this.logger.info("deduping agents %o", diffAgents)
diffAgents.forEach(async (agentId) => {
await this.pubSub.publish(AGENT_DELETE_JOB(agentId), JSON.stringify({ agentId: agentId }))
await this.newQueue.addJob('agent:new', {agentId: agentId})
this.logger.trace('deduping agents %o', diffAgents)
diffAgents.forEach(async agentId => {
await this.pubSub.publish(
AGENT_DELETE_JOB(agentId),
JSON.stringify({ agentId: agentId })
)
await this.newQueue.addJob('agent:new', { agentId: agentId })
})

return deduped
}

// Eventually we'll need this heartbeat to keep track of running agents on workers
async heartbeat() {
this.logger.debug("Started heartbeat")
this.logger.debug('Started heartbeat')
let agentsOfWorkers: string[] = []
this.pubSub.subscribe("heartbeat-pong", async (agents: string[]) => {
this.logger.debug("Got heartbeat pong")
this.pubSub.subscribe('heartbeat-pong', async (agents: string[]) => {
this.logger.trace('Got heartbeat pong')
agents.forEach(a => agentsOfWorkers.push(a))
agentsOfWorkers = await this.dedupeAgents(agentsOfWorkers)
})
await this.pubSub.publish("heartbeat-ping", "{}")

setTimeout(() =>
setInterval(async () => {
this.logger.debug(`Starting Heartbeat update`)
const enabledAgents = await app.service('agents').find({
query: {
enabled: true,
},
})

const agentDiff = diff(enabledAgents.data.map(a => a.id), Array.from(agentsOfWorkers))
const agentsToUpdate = enabledAgents.data.filter(a => agentDiff.includes(a.id))

if (agentDiff.length > 0) {
this.logger.info(`Found ${agentDiff.length} agents to Update`)
const agentPromises: Promise<any>[] = []
for (const agent of agentsToUpdate) {
this.logger.debug(`Adding agent ${agent.id} to cloud agent worker`)
agentPromises.push(this.newQueue.addJob('agent:new', {agentId: agent.id}))
await this.pubSub.publish('heartbeat-ping', '{}')

setTimeout(
() =>
setInterval(async () => {
this.logger.trace(`Starting Heartbeat update`)
const enabledAgents = await app.service('agents').find({
query: {
enabled: true,
},
})

const agentDiff = diff(
enabledAgents.data.map(a => a.id),
Array.from(agentsOfWorkers)
)
const agentsToUpdate = enabledAgents.data.filter(a =>
agentDiff.includes(a.id)
)

if (agentDiff.length > 0) {
this.logger.info(`Found ${agentDiff.length} agents to Update`)
const agentPromises: Promise<any>[] = []
for (const agent of agentsToUpdate) {
this.logger.debug(
`Adding agent ${agent.id} to cloud agent worker`
)
agentPromises.push(
this.newQueue.addJob('agent:new', { agentId: agent.id })
)
}

await Promise.all(agentPromises)
}

await Promise.all(agentPromises)
}
agentsOfWorkers = [];
this.pubSub.publish("heartbeat-ping", "{}")
}, HEARTBEAT_MSEC), MANAGER_WARM_UP_MSEC)
agentsOfWorkers = []
this.pubSub.publish('heartbeat-ping', '{}')
}, HEARTBEAT_MSEC),
MANAGER_WARM_UP_MSEC
)
}
}
2 changes: 1 addition & 1 deletion packages/cloud-agent-worker/src/lib/cloud-agent-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class CloudAgentWorker extends AgentManager {
})

this.pubSub.subscribe('heartbeat-ping', async () => {
this.logger.debug('Got heartbeat ping')
this.logger.trace('Got heartbeat ping')
const agentIds = Object.keys(this.currentAgents)
this.pubSub.publish('heartbeat-pong', JSON.stringify(agentIds))
})
Expand Down
Loading