diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index c5840f8462..132c607863 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -28,18 +28,16 @@ import { getPinoTransport } from '@hyperdx/node-opentelemetry' import { PRODUCTION } from '@magickml/config' if (PRODUCTION) { - initLogger({ - name: 'cloud-agent-worker', - transport: { - targets: [ - getPinoTransport('info') - ] - }, - level: 'info', - }) + initLogger({ + name: 'cloud-agent-worker', + transport: { + targets: [getPinoTransport('info')] + }, + level: 'info' + }) } else { - initLogger({ name: 'cloud-agent-worker' }) -} + initLogger({ name: 'cloud-agent-worker' }) +} const logger = getLogger() // log handle errors @@ -69,7 +67,7 @@ const routes: Route[] = [...spells, ...apis, ...serverRoutes] * form and multipart-json requests, and routes. */ async function init() { - await initApp() + await initApp('server') await initAgentCommander() // load plugins await (async () => { diff --git a/packages/core/server/src/app.ts b/packages/core/server/src/app.ts index a01ffeb853..86a2eb4b23 100644 --- a/packages/core/server/src/app.ts +++ b/packages/core/server/src/app.ts @@ -43,6 +43,8 @@ import { authenticateApiKey } from './hooks/authenticateApiKey' // Initialize the Feathers Koa app export const app: Application = koa(feathers()) +export type Environment = 'default' | 'server' | 'agent' + declare module './declarations' { interface Configuration { vectordb: PostgresVectorStoreCustom | any @@ -51,10 +53,11 @@ declare module './declarations' { redis: Redis isAgent?: boolean agentCommander: AgentCommander + environment: Environment } } -export async function initApp() { +export async function initApp(environment: Environment = 'default') { const logger = getLogger() logger.info('Initializing feathers app...') globalsManager.register('feathers', app) @@ -70,6 +73,7 @@ export async function initApp() { max: paginateMax, } app.set('paginate', paginate) + app.set('environment', environment) // Koa middleware app.use(cors({ origin: '*' })) diff --git a/packages/core/server/src/services/documents/documents.class.ts b/packages/core/server/src/services/documents/documents.class.ts index e9356ba671..100a7da848 100644 --- a/packages/core/server/src/services/documents/documents.class.ts +++ b/packages/core/server/src/services/documents/documents.class.ts @@ -2,6 +2,16 @@ // This module provides a document service for managing documents with embedding and pagination support // For more information about this file see https://dove.feathersjs.com/guides/cli/service.class.html#database-services +import AWS from 'aws-sdk' +import * as BullMQ from 'bullmq' +import { + AWS_BUCKET_NAME, + AWS_ACCESS_KEY, + AWS_REGION, + AWS_SECRET_KEY, + AWS_BUCKET_ENDPOINT, +} from '@magickml/config' + import type { Params } from '@feathersjs/feathers' import type { KnexAdapterOptions, KnexAdapterParams } from '@feathersjs/knex' import { KnexService } from '@feathersjs/knex' @@ -23,6 +33,24 @@ export type DocumentParams = KnexAdapterParams const embeddingSize = 1000 +export const DOCUMENT_QUEUE = 'document:process' + +type Element = { + date: string + type: string + projectId: string + id: string + metadata: { + fileName: string + fileType: string + } + embeddings: { + documentId: string + index: string + content: string + }[] +} + /** * DocumentService class * Implements the custom document service extending the base Knex service @@ -32,6 +60,34 @@ const embeddingSize = 1000 export class DocumentService< ServiceParams extends Params = DocumentParams > extends KnexService { + s3: AWS.S3 + uploader: any + bucketName: string = AWS_BUCKET_NAME + documentQueue: BullMQ.Queue = new BullMQ.Queue(DOCUMENT_QUEUE) + + constructor(args) { + super(args) + // Set up AWS S3 + AWS.config.update({ + accessKeyId: AWS_ACCESS_KEY, + secretAccessKey: AWS_SECRET_KEY, + region: AWS_REGION, + }) + this.s3 = new AWS.S3({ + endpoint: AWS_BUCKET_ENDPOINT, + s3ForcePathStyle: true, + }) + } + + // Not the best typing here + async create(data: DocumentData[] | any): Promise { + console.log('Adding document to queue', data) + const job = this.documentQueue.add('create', data) + return { + job, + } + } + /** * Creates a new document * @param data {DocumentData} The document data to create @@ -40,14 +96,15 @@ export class DocumentService< // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore - async create(data: DocumentData): Promise { + async createWorker(data: DocumentData, job: BullMQ.Job): Promise { + console.log('Work received in create worker') const embeddingdb = app.get('embeddingdb') const { modelName, secrets, files, ...docData } = data as DocumentData & { modelName: string secrets: string } - let elements = [] as any[] + let elements = [] as Element[] if (docData.content) { elements = [ ...elements, @@ -68,11 +125,14 @@ export class DocumentService< } for (const element of elements) { + // report the progress of the jpb + const { embeddings, ...document } = element embeddings //remove linting error lmao await embeddingdb.from('documents').insert(document) //create embeddings - for (const embedding of element.embeddings) { + for (let i = 0; i < element.embeddings.length; i++) { + const embedding = element.embeddings[i] if (!embedding.content || embedding.content?.length === 0) continue if (data.hasOwnProperty('secrets')) { await embeddingdb.fromString(embedding.content, embedding, { @@ -83,6 +143,7 @@ export class DocumentService< } else { await embeddingdb.from('embeddings').insert(embedding) } + await job.updateProgress((i / element.embeddings.length) * 100) } } @@ -126,8 +187,6 @@ export class DocumentService< ) { const param = params.query - console.log('param!!!!!!!', param) - const querys = await db('documents') .joinRaw( 'inner join embeddings on documents.id = embeddings."documentId" and embeddings.index = 0' @@ -292,7 +351,7 @@ const getUnstructuredData = async (files, docData) => { return elements } -const createElement = (element, docData) => { +const createElement = (element, docData): Element => { const documentId = uuidv4() const embeddings: any[] = [] for (const i in element) { diff --git a/packages/core/server/src/services/documents/documents.ts b/packages/core/server/src/services/documents/documents.ts index 6d32d85043..51ee23a3ad 100644 --- a/packages/core/server/src/services/documents/documents.ts +++ b/packages/core/server/src/services/documents/documents.ts @@ -1,14 +1,16 @@ // DOCUMENTED +import * as BullMQ from 'bullmq' import { hooks as schemaHooks } from '@feathersjs/schema' import pgvector from 'pgvector/pg' import { Application, HookContext } from '../../declarations' -import { DocumentService, getOptions } from './documents.class' +import { DOCUMENT_QUEUE, DocumentService, getOptions } from './documents.class' import { documentPatchResolver, documentPatchValidator, documentQueryResolver, documentQueryValidator, } from './documents.schema' +import { event } from '../events/events' // Array with 1536 elements containing 0 const nullArray = new Array(1536).fill(0) @@ -24,9 +26,32 @@ export const document = (app: Application) => { // Register our service on the Feathers application app.use('documents', new DocumentService(getOptions(app)), { methods: ['find', 'get', 'create', 'patch', 'remove'], - events: [], + events: ['finished', 'progress'], }) + if (app.get('environment') === 'server') { + // Set up document queue to process document jobs + const worker = new BullMQ.Worker(DOCUMENT_QUEUE, async job => { + const { data } = await app + .service('documents') + .createWorker(job.data, job) + return data + }) + + // event queues to send events to the client + const eventQueue = new BullMQ.QueueEvents(DOCUMENT_QUEUE) + + eventQueue.on('completed', (jobId, returnvalue) => { + console.log('DOCUMENT COMPLETED', jobId, returnvalue) + app.service('documents').emit('finished', { jobId, returnvalue }) + }) + + eventQueue.on('progress', (jobId, progress) => { + console.log('progress', jobId, progress) + app.service('documents').emit('progress', { jobId, progress }) + }) + } + // Initialize hooks app.service('documents').hooks({ around: {