diff --git a/.gitignore b/.gitignore index 6bd811c..1ffdcf0 100644 --- a/.gitignore +++ b/.gitignore @@ -152,6 +152,10 @@ dist .dynamodb/ +# Debezium + +debezium-server/ + # TernJS port file .tern-port diff --git a/Makefile b/Makefile index 9e56735..3cc8e63 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,25 @@ core-install: - devbox run "cd core && pnpm install" + @devbox run "cd core && pnpm install" core-test: - devbox run "cd core && pnpm test" + @devbox run "cd core && pnpm test" + +worker-setup: + @cd worker && \ + ([ ! -d debezium-server ] && \ + curl -O https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.5.0.Final/debezium-server-dist-2.5.0.Final.tar.gz && \ + tar -xvf debezium-server-dist-2.5.0.Final.tar.gz && \ + rm debezium-server-dist-2.5.0.Final.tar.gz) || true +worker-install: worker-setup + @devbox run "cd worker && pnpm install" +worker-up: + @devbox run "cd worker && pnpm concurrently -- \"pnpm:up:*\"" docs-install: - devbox run "cd docs && pnpm install" + @devbox run "cd docs && pnpm install" docs-up: - devbox run "cd docs && pnpm start" + @devbox run "cd docs && pnpm start" docs-build: - devbox run "cd docs && pnpm build" + @devbox run "cd docs && pnpm build" sh: - devbox shell + @devbox shell diff --git a/README.md b/README.md index 0d3c170..6b6756e 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,22 @@ # Bemi +## Running locally + +```sh +make worker-install + +export DB_HOST=127.0.0.1 DB_PORT=5432 DB_NAME=postgres DB_USER=postgres DB_PASSWORD=postgres +make worker-up +``` + +## Testing + +```sh +make core-install +make core-test +``` + ## Architecture ![Bemi Worker Architecture](docs/static/img/worker.png) diff --git a/core/package.json b/core/package.json index 732c6d5..28db757 100644 --- a/core/package.json +++ b/core/package.json @@ -25,14 +25,23 @@ "dependencies": { "@mikro-orm/core": "^5.9.4", "@mikro-orm/migrations": "^5.9.4", + "@mikro-orm/postgresql": "^5.9.4", "nats": "^2.19.0" }, "jest": { - "moduleFileExtensions": 
["js", "json", "ts"], + "moduleFileExtensions": [ + "js", + "json", + "ts" + ], "rootDir": "src", "testRegex": ".*\\.spec\\.ts$", - "transform": { "^.+\\.(t|j)s$": "ts-jest" }, - "collectCoverageFrom": ["**/*.(t|j)s"], + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + }, + "collectCoverageFrom": [ + "**/*.(t|j)s" + ], "coverageDirectory": "../coverage", "testEnvironment": "node" } diff --git a/core/src/ingestion.ts b/core/src/ingestion.ts index 100b626..0997a27 100644 --- a/core/src/ingestion.ts +++ b/core/src/ingestion.ts @@ -1,12 +1,12 @@ import { MikroORM } from '@mikro-orm/core'; import type { PostgreSqlDriver } from '@mikro-orm/postgresql' -import { logger } from '../core/logger' -import { Message } from '../core/nats' -import { Change } from "../core/entities/Change" -import { ChangeMessage } from '../core/change-message' -import { ChangeMessagesBuffer } from '../core/change-message-buffer' -import { stitchChangeMessages } from '../core/stitching' +import { logger } from './logger' +import { Message } from './nats' +import { Change } from "./entities/Change" +import { ChangeMessage } from './change-message' +import { ChangeMessagesBuffer } from './change-message-buffer' +import { stitchChangeMessages } from './stitching' const sleep = (seconds: number) => ( new Promise(resolve => setTimeout(resolve, seconds * 1000)) @@ -62,8 +62,21 @@ const fetchMessages = async ( } export const runIngestionLoop = async ( - { orm, consumer, fetchBatchSize, refetchEmptyAfterSeconds, insertBatchSize }: - { orm: MikroORM, consumer: any, fetchBatchSize: number, refetchEmptyAfterSeconds: number, insertBatchSize: number } + { + orm, + consumer, + fetchBatchSize = 100, + insertBatchSize = 100, + refetchEmptyAfterSeconds = 10, + useBuffer = false, + }: { + orm: MikroORM, + consumer: any, + fetchBatchSize?: number, + refetchEmptyAfterSeconds?: number, + insertBatchSize?: number, + useBuffer?: boolean, + } ) => { let lastStreamSequence: number | null = null let changeMessagesBuffer = new 
ChangeMessagesBuffer() @@ -78,13 +91,18 @@ export const runIngestionLoop = async ( // Stitching const now = new Date() const changeMessages = messages.map((message: Message) => ChangeMessage.fromMessage(message, now)) - const { changeMessages: stitchedChangeMessages, changeMessagesBuffer: newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({ changeMessages, changeMessagesBuffer }) + + const { stitchedChangeMessages, newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({ + changeMessagesBuffer: changeMessagesBuffer.addChangeMessages(changeMessages), + useBuffer, + }) changeMessagesBuffer = newChangeMessagesBuffer logger.info([ `Fetched: ${messages.length}`, `Saving: ${stitchedChangeMessages.length}`, `Pending: ${changeMessagesBuffer.size()}`, + `Ack sequence: #${ackStreamSequence}`, `Last sequence: #${lastStreamSequence}`, ].join('. ')) diff --git a/core/src/specs/stitching.spec.ts b/core/src/specs/stitching.spec.ts index 82914f0..2acc2d0 100644 --- a/core/src/specs/stitching.spec.ts +++ b/core/src/specs/stitching.spec.ts @@ -23,14 +23,17 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }), ] - const result = stitchChangeMessages({ changeMessages, changeMessagesBuffer: new ChangeMessagesBuffer() }) + const result = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages), + useBuffer: true, + }) expect(result).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages, 2).setContext(findChangeMessage(changeMessages, 1).context()), ], ackStreamSequence: 2, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), }) }) @@ -43,15 +46,18 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 4, changeAttributes: 
CHANGE_ATTRIBUTES.DELETE }), ] - const result = stitchChangeMessages({ changeMessages, changeMessagesBuffer: new ChangeMessagesBuffer() }) + const result = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages), + useBuffer: true, + }) expect(result).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages, 1).setContext(findChangeMessage(changeMessages, 2).context()), findChangeMessage(changeMessages, 3), ], ackStreamSequence: 3, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.DELETE]: [ findChangeMessage(changeMessages, 4), @@ -71,13 +77,16 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.UPDATE }), ] - const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() }) + const result1 = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1), + useBuffer: true, + }) expect(result1).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages1, 1).setContext(findChangeMessage(changeMessages1, 2).context()), ], ackStreamSequence: 1, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.UPDATE]: [ findChangeMessage(changeMessages1, 3), @@ -91,13 +100,16 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 5, changeAttributes: CHANGE_ATTRIBUTES.DELETE_MESSAGE, messagePrefix: MESSAGE_PREFIX_CONTEXT }), ] - const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer }) + const result2 = stitchChangeMessages({ + changeMessagesBuffer: 
result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2), + useBuffer: true, + }) expect(result2).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages1, 3).setContext(findChangeMessage(changeMessages2, 4).context()), ], ackStreamSequence: 3, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.DELETE]: [ findChangeMessage(changeMessages2, 5), @@ -113,11 +125,14 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), ] - const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() }) + const result1 = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1), + useBuffer: true, + }) expect(result1).toStrictEqual({ - changeMessages: [], + stitchedChangeMessages: [], ackStreamSequence: undefined, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.CREATE]: [ findChangeMessage(changeMessages1, 1), @@ -132,14 +147,17 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 4, changeAttributes: CHANGE_ATTRIBUTES.DELETE }), ] - const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer }) + const result2 = stitchChangeMessages({ + changeMessagesBuffer: result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2), + useBuffer: true, + }) expect(result2).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages1, 1).setContext(findChangeMessage(changeMessages2, 2).context()), findChangeMessage(changeMessages2, 3), ], ackStreamSequence: 3, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + 
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.DELETE]: [findChangeMessage(changeMessages2, 4)], }, @@ -153,11 +171,14 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), ] - const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() }) + const result1 = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1), + useBuffer: true, + }) expect(result1).toStrictEqual({ - changeMessages: [], + stitchedChangeMessages: [], ackStreamSequence: undefined, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({ + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({ [subject]: { [POSITIONS.CREATE]: [ findChangeMessage(changeMessages1, 1), @@ -170,13 +191,16 @@ describe('stitchChangeMessages', () => { new ChangeMessage({ subject, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }), ] - const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer }) + const result2 = stitchChangeMessages({ + changeMessagesBuffer: result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2), + useBuffer: true, + }) expect(result2).toStrictEqual({ - changeMessages: [ + stitchedChangeMessages: [ findChangeMessage(changeMessages1, 1), ], ackStreamSequence: 1, - changeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), }) }) }) diff --git a/core/src/stitching.ts b/core/src/stitching.ts index c233f1a..a910f0d 100644 --- a/core/src/stitching.ts +++ b/core/src/stitching.ts @@ -3,24 +3,24 @@ import { ChangeMessage } from './change-message' import { ChangeMessagesBuffer } from './change-message-buffer' export const stitchChangeMessages = ( - { changeMessages: 
initialChangeMessages, changeMessagesBuffer: initialChangeMessagesBuffer }: - { changeMessages: ChangeMessage[], changeMessagesBuffer: ChangeMessagesBuffer } + { changeMessagesBuffer, useBuffer = false }: + { changeMessagesBuffer: ChangeMessagesBuffer, useBuffer: boolean } ) => { - const mergedChangeMessagesBuffer = initialChangeMessagesBuffer.addChangeMessages(initialChangeMessages) - let stitchedChangeMsgs: ChangeMessage[] = [] + let stitchedChangeMessages: ChangeMessage[] = [] let ackSequenceBySubject: { [key: string]: string | undefined } = {} let newChangeMessagesBuffer = new ChangeMessagesBuffer() - mergedChangeMessagesBuffer.forEach((subject, sortedChangeMessages) => { + changeMessagesBuffer.forEach((subject, sortedChangeMessages) => { let ackSequence: number | undefined = undefined sortedChangeMessages.forEach((changeMessage) => { const position = changeMessage.changeAttributes.position.toString() - const changeMessages = mergedChangeMessagesBuffer.changeMessagesByPosition(subject, position) + const changeMessages = changeMessagesBuffer.changeMessagesByPosition(subject, position) const contextMessageChangeMessage = changeMessages.find(cm => cm.isContextMessage()) // Last message without a pair - skip it if ( + useBuffer && changeMessage === sortedChangeMessages[sortedChangeMessages.length - 1] && changeMessages.length === 1 ) { @@ -47,14 +47,14 @@ export const stitchChangeMessages = ( if (contextMessageChangeMessage) { // Stitch with context message change message if it exists - stitchedChangeMsgs = [ - ...stitchedChangeMsgs, + stitchedChangeMessages = [ + ...stitchedChangeMessages, changeMessage.setContext(contextMessageChangeMessage.context()), ] } else { // Return mutation change message as is without stitching - stitchedChangeMsgs = [ - ...stitchedChangeMsgs, + stitchedChangeMessages = [ + ...stitchedChangeMessages, changeMessage, ] } @@ -72,11 +72,11 @@ export const stitchChangeMessages = ( !min || streamSequence! < min ? 
streamSequence : min ), undefined) as undefined | number - logger.debug({ stitched: stitchedChangeMsgs, buffer: newChangeMessagesBuffer.store, ackStreamSequence }) + logger.debug({ stitched: stitchedChangeMessages, buffer: newChangeMessagesBuffer.store, ackStreamSequence }) return { - changeMessages: stitchedChangeMsgs, - changeMessagesBuffer: newChangeMessagesBuffer, + stitchedChangeMessages, + newChangeMessagesBuffer, ackStreamSequence, } } diff --git a/devbox.json b/devbox.json new file mode 100644 index 0000000..d80d6cb --- /dev/null +++ b/devbox.json @@ -0,0 +1,7 @@ +{ + "packages": { + "nodejs": "latest", + "nats-server": "latest", + "nodePackages.pnpm": "latest" + } +} diff --git a/devbox.lock b/devbox.lock new file mode 100644 index 0000000..bd069ca --- /dev/null +++ b/devbox.lock @@ -0,0 +1,65 @@ +{ + "lockfile_version": "1", + "packages": { + "nats-server@latest": { + "last_modified": "2024-01-23T02:13:08Z", + "resolved": "github:NixOS/nixpkgs/4d3d04cfd372a0511f12b885c9822e406b00fe8e#nats-server", + "source": "devbox-search", + "version": "2.10.9", + "systems": { + "aarch64-darwin": { + "store_path": "/nix/store/5yc0acxf8zi0m05cky21zh7689bc21xf-nats-server-2.10.9" + }, + "aarch64-linux": { + "store_path": "/nix/store/f93srmd78zm5a980hp8h59090x6qajyq-nats-server-2.10.9" + }, + "x86_64-darwin": { + "store_path": "/nix/store/p2y7hwn55vivp54qr5r5qr27vq2dnmbd-nats-server-2.10.9" + }, + "x86_64-linux": { + "store_path": "/nix/store/2llnywivgyxf84wmviljmsf6cbfy8k8h-nats-server-2.10.9" + } + } + }, + "nodePackages.pnpm@latest": { + "last_modified": "2024-01-23T02:13:08Z", + "resolved": "github:NixOS/nixpkgs/4d3d04cfd372a0511f12b885c9822e406b00fe8e#nodePackages.pnpm", + "source": "devbox-search", + "version": "8.14.0", + "systems": { + "aarch64-darwin": { + "store_path": "/nix/store/rw37gcc3m93kh2hq3pi8891g41p54518-pnpm-8.14.0" + }, + "aarch64-linux": { + "store_path": "/nix/store/ab961l4fcs1n5ldfy4463kqr4pln1cxc-pnpm-8.14.0" + }, + "x86_64-darwin": { + 
"store_path": "/nix/store/06qmz8k1gi9s68mcynlyis35w2pj6fl6-pnpm-8.14.0" + }, + "x86_64-linux": { + "store_path": "/nix/store/pgzxiapcydn1mmqh1w4jkl9pcw7ybbj4-pnpm-8.14.0" + } + } + }, + "nodejs@latest": { + "last_modified": "2023-11-19T17:46:56Z", + "resolved": "github:NixOS/nixpkgs/0bf3f5cf6a98b5d077cdcdb00a6d4b3d92bc78b5#nodejs_21", + "source": "devbox-search", + "version": "21.2.0", + "systems": { + "aarch64-darwin": { + "store_path": "/nix/store/waq8bb12nbqwxxn1lgvdzsgs0gqk0d04-nodejs-21.2.0" + }, + "aarch64-linux": { + "store_path": "/nix/store/v03cn5xw03nxykgix04r1l31jjrgf9d3-nodejs-21.2.0" + }, + "x86_64-darwin": { + "store_path": "/nix/store/sv6nivfsqcl8s1dhqjpjy8dqvbx9mxwl-nodejs-21.2.0" + }, + "x86_64-linux": { + "store_path": "/nix/store/iklfdn5l792z46dl5qq9zqzqsmp2m4m9-nodejs-21.2.0" + } + } + } + } +} diff --git a/worker/application.properties b/worker/application.properties new file mode 100644 index 0000000..0be3025 --- /dev/null +++ b/worker/application.properties @@ -0,0 +1,24 @@ +quarkus.log.level=INFO +quarkus.log.console.json=false +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.plugin.name=pgoutput +debezium.source.snapshot.mode=never +debezium.source.database.hostname=DB_HOST +debezium.source.database.port=DB_PORT +debezium.source.database.dbname=DB_NAME +debezium.source.database.user=DB_USER +debezium.source.database.password=DB_PASSWORD +debezium.source.schema.include.list=public +debezium.source.table.exclude.list=public.changes +debezium.source.offset.storage.file.filename=offsets.dat +debezium.source.topic.prefix=bemi +debezium.source.key.converter.schemas.enable=false +debezium.source.value.converter.schemas.enable=false +debezium.transforms=Reroute +debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter +debezium.transforms.Reroute.topic.regex=bemi\..* +debezium.transforms.Reroute.topic.replacement=bemi +debezium.sink.type=nats-jetstream 
+debezium.sink.nats-jetstream.url=nats://127.0.0.1:4222 +debezium.sink.nats-jetstream.subjects=bemi,__debezium-heartbeat.* +debezium.sink.nats-jetstream.create-stream=true diff --git a/worker/debezium.sh b/worker/debezium.sh new file mode 100755 index 0000000..b65b132 --- /dev/null +++ b/worker/debezium.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# +PROPERTIES=$(<application.properties) + +for NAME in DB_HOST DB_PORT DB_NAME DB_USER DB_PASSWORD; do + PROPERTIES=${PROPERTIES//$NAME/${!NAME}} +done + +echo "$PROPERTIES" > ./debezium-server/conf/application.properties && +cd debezium-server && ./run.sh diff --git a/worker/mikro-orm.config.ts b/worker/mikro-orm.config.ts new file mode 100644 index 0000000..3a04061 --- /dev/null +++ b/worker/mikro-orm.config.ts @@ -0,0 +1,30 @@ +import { Options } from "@mikro-orm/core"; +import { SqlHighlighter } from "@mikro-orm/sql-highlighter"; +import type { PostgreSqlDriver } from '@mikro-orm/postgresql' + +const DB_HOST = process.env.DB_HOST +const DB_PORT = parseInt(process.env.DB_PORT as string, 10) +const DB_NAME = process.env.DB_NAME +const DB_USER = process.env.DB_USER +const DB_PASSWORD = process.env.DB_PASSWORD + +const mikroOrmConfig: Options<PostgreSqlDriver> = { + type: 'postgresql' as const, + host: DB_HOST, + port: DB_PORT, + dbName: DB_NAME, + user: DB_USER, + password: DB_PASSWORD, + highlighter: new SqlHighlighter(), + debug: true, + allowGlobalContext: true, + entities: ['../core/src/entities/**/*.js'], + entitiesTs: ['../core/src/entities/**/*.ts'], + migrations: { + path: '../core/src/migrations', + pathTs: '../core/src/migrations', + tableName: '_bemi_migrations', + }, +} + +export default mikroOrmConfig diff --git a/worker/package.json b/worker/package.json new file mode 100644 index 0000000..a2c3cd2 --- /dev/null +++ b/worker/package.json @@ -0,0 +1,20 @@ +{ + "name": "bemi-worker", + "dependencies": { + "@mikro-orm/core": "^5.9.7", + "@mikro-orm/migrations": "^5.9.7", + "@mikro-orm/postgresql": "^5.9.7", + "@mikro-orm/sql-highlighter": "^1.0.1", + "nats": "^2.19.0" + }, + "devDependencies": { + "@types/node": "^20.11.16", + "concurrently": "^8.2.2", + "ts-node": "^10.9.2" + }, + "scripts": {
+ "up:worker": "sleep 5 && LOG_LEVEL=DEBUG pnpm ts-node src/index.ts", + "up:nats": "nats-server -js", + "up:debezium": "sleep 1 && pnpm ts-node src/reset.ts && ./debezium.sh" + } +} diff --git a/worker/src/index.ts b/worker/src/index.ts new file mode 100644 index 0000000..112a42b --- /dev/null +++ b/worker/src/index.ts @@ -0,0 +1,28 @@ +import { AckPolicy, DeliverPolicy } from 'nats'; +import { MikroORM } from '@mikro-orm/core'; +import type { PostgreSqlDriver } from '@mikro-orm/postgresql' + +import { connectJetstream, buildConsumer } from '../../core/src/nats' +import { runIngestionLoop } from '../../core/src/ingestion' + +import mikroOrmConfig from "../mikro-orm.config" + +const main = (async () => { + const jetstreamConnection = await connectJetstream('nats://localhost:4222'); + + const consumer = await buildConsumer({ + connection: jetstreamConnection, + stream: 'DebeziumStream', + options: { + durable_name: 'bemi-worker-local', + filter_subject: 'bemi', + ack_policy: AckPolicy.All, + deliver_policy: DeliverPolicy.All, + }, + }); + + const orm = await MikroORM.init(mikroOrmConfig) + await orm.getMigrator().up(); + + await runIngestionLoop({ orm, consumer }) +})() diff --git a/worker/src/reset.ts b/worker/src/reset.ts new file mode 100644 index 0000000..18ec479 --- /dev/null +++ b/worker/src/reset.ts @@ -0,0 +1,26 @@ +import fs from 'fs' +import { MikroORM } from '@mikro-orm/core'; +import type { PostgreSqlDriver } from '@mikro-orm/postgresql' + +import mikroOrmConfig from "../mikro-orm.config" + +const main = (async () => { + const orm = await MikroORM.init(mikroOrmConfig) + await orm.em.getConnection().execute(` + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'debezium') THEN + PERFORM pg_drop_replication_slot('debezium'); + END IF; + END $$; + `) + await orm.close() + + try { + fs.unlinkSync('./debezium-server/offsets.dat') + } catch (e: any) { + if (e.code !== 'ENOENT') { + throw e + } + } +})() diff --git 
a/worker/tsconfig.json b/worker/tsconfig.json new file mode 100644 index 0000000..eaed8ab --- /dev/null +++ b/worker/tsconfig.json @@ -0,0 +1,24 @@ +{ + "compilerOptions": { + "outDir": "./dist", + "lib": ["esnext"], + "module": "commonjs", + "target": "es6", + "moduleResolution": "node", + "strict": true, + "baseUrl": ".", + "allowJs": true, + "sourceMap": true, + + // MikroORM + "experimentalDecorators": true, + + "emitDecoratorMetadata": true, + "esModuleInterop": true, + "inlineSources": true, + + "sourceRoot": "/" + }, + "include": ["mikro-orm.config.ts", "src/index.ts", "../core/src"], + "exclude": ["node_modules", "../core/node_modules", "../core/src/specs"] +}