From 384c667c1135859de2793f7c8b74439f171ce751 Mon Sep 17 00:00:00 2001 From: exAspArk Date: Fri, 2 Feb 2024 11:51:47 -0500 Subject: [PATCH] Init worker --- .gitignore | 4 ++++ Makefile | 22 ++++++++++++----- README.md | 16 ++++++++++++- core/package.json | 15 +++++++++--- core/src/ingestion.ts | 12 +++++----- devbox.json | 6 +++++ devbox.lock | 45 +++++++++++++++++++++++++++++++++++ worker/application.properties | 24 +++++++++++++++++++ worker/debezium.sh | 10 ++++++++ worker/mikro-orm.config.ts | 30 +++++++++++++++++++++++ worker/package.json | 20 ++++++++++++++++ worker/src/index.ts | 38 +++++++++++++++++++++++++++++ worker/tsconfig.json | 24 +++++++++++++++++++ 13 files changed, 250 insertions(+), 16 deletions(-) create mode 100644 devbox.json create mode 100644 devbox.lock create mode 100644 worker/application.properties create mode 100755 worker/debezium.sh create mode 100644 worker/mikro-orm.config.ts create mode 100644 worker/package.json create mode 100644 worker/src/index.ts create mode 100644 worker/tsconfig.json diff --git a/.gitignore b/.gitignore index 6bd811c..1ffdcf0 100644 --- a/.gitignore +++ b/.gitignore @@ -152,6 +152,10 @@ dist .dynamodb/ +# Debezium + +debezium-server/ + # TernJS port file .tern-port diff --git a/Makefile b/Makefile index 9e56735..aebff96 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,24 @@ core-install: - devbox run "cd core && pnpm install" + @devbox run "cd core && pnpm install" core-test: - devbox run "cd core && pnpm test" + @devbox run "cd core && pnpm test" + +worker-install: + @devbox run "cd worker && pnpm install" && \ + cd worker && \ + ([ ! -d debezium-server ] && \ + curl -O https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.5.0.Final/debezium-server-dist-2.5.0.Final.tar.gz && \ + tar -xvf debezium-server-dist-2.5.0.Final.tar.gz && \ + rm debezium-server-dist-2.5.0.Final.tar.gz) || true +worker-up: + @devbox run "cd worker && pnpm concurrently -- \"pnpm:up:*\"" docs-install: - devbox run "cd docs && pnpm install" + @devbox run "cd docs && pnpm install" docs-up: - devbox run "cd docs && pnpm start" + @devbox run "cd docs && pnpm start" docs-build: - devbox run "cd docs && pnpm build" + @devbox run "cd docs && pnpm build" sh: - devbox shell + @devbox shell diff --git a/README.md b/README.md index e7b8ebe..5b18af2 100644 --- a/README.md +++ b/README.md @@ -22,4 +22,18 @@ # Bemi -Automatic data change tracking for PostgreSQL. +## Running locally + +```sh +make worker-install + +export DB_HOST=127.0.0.1 DB_PORT=5432 DB_NAME=postgres DB_USER=postgres DB_PASSWORD=postgres +make worker-up +``` + +## Testing + +```sh +make core-install +make core-test +``` diff --git a/core/package.json b/core/package.json index 732c6d5..28db757 100644 --- a/core/package.json +++ b/core/package.json @@ -25,14 +25,23 @@ "dependencies": { "@mikro-orm/core": "^5.9.4", "@mikro-orm/migrations": "^5.9.4", + "@mikro-orm/postgresql": "^5.9.4", "nats": "^2.19.0" }, "jest": { - "moduleFileExtensions": ["js", "json", "ts"], + "moduleFileExtensions": [ + "js", + "json", + "ts" + ], "rootDir": "src", "testRegex": ".*\\.spec\\.ts$", - "transform": { "^.+\\.(t|j)s$": "ts-jest" }, - "collectCoverageFrom": ["**/*.(t|j)s"], + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + }, + "collectCoverageFrom": [ + "**/*.(t|j)s" + ], "coverageDirectory": "../coverage", "testEnvironment": "node" } diff --git a/core/src/ingestion.ts b/core/src/ingestion.ts index 1ea5873..5644748 100644 --- a/core/src/ingestion.ts +++ b/core/src/ingestion.ts @@ -1,12 +1,12 @@ import { MikroORM } from '@mikro-orm/core'; import type { PostgreSqlDriver } from '@mikro-orm/postgresql' -import { logger } from '../core/logger' -import { Message } from '../core/nats' -import { Change } from "../core/entities/Change" -import { ChangeMessage } from '../core/change-message' -import { ChangeMessagesBuffer } from '../core/change-message-buffer' -import { stitchChangeMessages } from '../core/stitching' +import { logger } from './logger' +import { Message } from './nats' +import { Change } from "./entities/Change" +import { ChangeMessage } from './change-message' +import { ChangeMessagesBuffer } from './change-message-buffer' +import { stitchChangeMessages } from './stitching' const sleep = (seconds: number) => ( new Promise(resolve => setTimeout(resolve, seconds * 1000)) diff --git a/devbox.json b/devbox.json new file mode 100644 index 0000000..9abb630 --- /dev/null +++ b/devbox.json @@ -0,0 +1,6 @@ +{ + "packages": { + "nodejs": "latest", + "nats-server": "latest" + } +} diff --git a/devbox.lock b/devbox.lock new file mode 100644 index 0000000..edff0e9 --- /dev/null +++ b/devbox.lock @@ -0,0 +1,45 @@ +{ + "lockfile_version": "1", + "packages": { + "nats-server@latest": { + "last_modified": "2024-01-23T02:13:08Z", + "resolved": "github:NixOS/nixpkgs/4d3d04cfd372a0511f12b885c9822e406b00fe8e#nats-server", + "source": "devbox-search", + "version": "2.10.9", + "systems": { + "aarch64-darwin": { + "store_path": "/nix/store/5yc0acxf8zi0m05cky21zh7689bc21xf-nats-server-2.10.9" + }, + "aarch64-linux": { + "store_path": "/nix/store/f93srmd78zm5a980hp8h59090x6qajyq-nats-server-2.10.9" + }, + "x86_64-darwin": { + "store_path": "/nix/store/p2y7hwn55vivp54qr5r5qr27vq2dnmbd-nats-server-2.10.9" + }, + "x86_64-linux": { + "store_path": "/nix/store/2llnywivgyxf84wmviljmsf6cbfy8k8h-nats-server-2.10.9" + } + } + }, + "nodejs@latest": { + "last_modified": "2023-11-19T17:46:56Z", + "resolved": "github:NixOS/nixpkgs/0bf3f5cf6a98b5d077cdcdb00a6d4b3d92bc78b5#nodejs_21", + "source": "devbox-search", + "version": "21.2.0", + "systems": { + "aarch64-darwin": { + "store_path": "/nix/store/waq8bb12nbqwxxn1lgvdzsgs0gqk0d04-nodejs-21.2.0" + }, + "aarch64-linux": { + "store_path": "/nix/store/v03cn5xw03nxykgix04r1l31jjrgf9d3-nodejs-21.2.0" + }, + "x86_64-darwin": { + "store_path": "/nix/store/sv6nivfsqcl8s1dhqjpjy8dqvbx9mxwl-nodejs-21.2.0" + }, + "x86_64-linux": { + "store_path": "/nix/store/iklfdn5l792z46dl5qq9zqzqsmp2m4m9-nodejs-21.2.0" + } + } + } + } +} diff --git a/worker/application.properties b/worker/application.properties new file mode 100644 index 0000000..3596f25 --- /dev/null +++ b/worker/application.properties @@ -0,0 +1,24 @@ +quarkus.log.level=INFO +quarkus.log.console.json=false +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.slot.name=bemi_worker_local +debezium.source.publication.name=bemi_worker_local +debezium.source.plugin.name=pgoutput +debezium.source.snapshot.mode=never +debezium.source.database.hostname=DB_HOST +debezium.source.database.port=DB_PORT +debezium.source.database.dbname=DB_NAME +debezium.source.database.user=DB_USER +debezium.source.database.password=DB_PASSWORD +debezium.source.schema.include.list=public +debezium.source.table.exclude.list=public.changes +debezium.source.heartbeat.interval.ms=10000 +debezium.source.heartbeat.action.query=INSERT INTO _bemi_migrations (id, name, executed_at) VALUES (0, 'Heartbeat', NOW()) ON CONFLICT (id) DO UPDATE SET executed_at = NOW() +debezium.source.offset.storage.file.filename=/dev/null +debezium.sink.type=nats-jetstream +debezium.sink.nats-jetstream.url=nats://127.0.0.1:4222 +debezium.sink.nats-jetstream.subjects=bemi.*.*,__debezium-heartbeat.bemi +debezium.sink.nats-jetstream.create-stream=true +debezium.source.key.converter.schemas.enable=false +debezium.source.value.converter.schemas.enable=false +debezium.source.topic.prefix=bemi diff --git a/worker/debezium.sh b/worker/debezium.sh new file mode 100755 index 0000000..b65b132 --- /dev/null +++ b/worker/debezium.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# +PROPERTIES=$( ./debezium-server/conf/application.properties && +cd debezium-server && ./run.sh diff --git a/worker/mikro-orm.config.ts b/worker/mikro-orm.config.ts new file mode 100644 index 0000000..3a04061 --- /dev/null +++ b/worker/mikro-orm.config.ts @@ -0,0 +1,30 @@ +import { Options } from "@mikro-orm/core"; +import { SqlHighlighter } from "@mikro-orm/sql-highlighter"; +import type { PostgreSqlDriver } from '@mikro-orm/postgresql' + +const DB_HOST = process.env.DB_HOST +const DB_PORT = parseInt(process.env.DB_PORT as string, 10) +const DB_NAME = process.env.DB_NAME +const DB_USER = process.env.DB_USER +const DB_PASSWORD = process.env.DB_PASSWORD + +const mikroOrmConfig: Options = { + type: 'postgresql' as const, + host: DB_HOST, + port: DB_PORT, + dbName: DB_NAME, + user: DB_USER, + password: DB_PASSWORD, + highlighter: new SqlHighlighter(), + debug: true, + allowGlobalContext: true, + entities: ['../core/src/entities/**/*.js'], + entitiesTs: ['../core/src/entities/**/*.ts'], + migrations: { + path: '../core/src/migrations', + pathTs: '../core/src/migrations', + tableName: '_bemi_migrations', + }, +} + +export default mikroOrmConfig diff --git a/worker/package.json b/worker/package.json new file mode 100644 index 0000000..5947720 --- /dev/null +++ b/worker/package.json @@ -0,0 +1,20 @@ +{ + "name": "Bemi Worker", + "dependencies": { + "@mikro-orm/core": "^5.9.7", + "@mikro-orm/migrations": "^5.9.7", + "@mikro-orm/postgresql": "^5.9.7", + "@mikro-orm/sql-highlighter": "^1.0.1", + "nats": "^2.19.0" + }, + "devDependencies": { + "@types/node": "^20.11.16", + "concurrently": "^8.2.2", + "ts-node": "^10.9.2" + }, + "scripts": { + "up:worker": "sleep 5 && pnpm ts-node src/index.ts", + "up:nats": "nats-server -js", + "up:debezium": "./debezium.sh" + } +} diff --git a/worker/src/index.ts b/worker/src/index.ts new file mode 100644 index 0000000..43c6e6f --- /dev/null +++ b/worker/src/index.ts @@ -0,0 +1,38 @@ +import { AckPolicy, DeliverPolicy } from 'nats'; +import { MikroORM } from '@mikro-orm/core'; +import type { PostgreSqlDriver } from '@mikro-orm/postgresql' + +process.env.LOG_LEVEL = 'DEBUG' + +import { connectJetstream, buildConsumer } from '../../core/src/nats' +import { runIngestionLoop } from '../../core/src/ingestion' + +import mikroOrmConfig from "../mikro-orm.config" + +const FETCH_BATCH_SIZE = 100 +const INSERT_BATCH_SIZE = 100 +const REFETCH_EMPTY_AFTER_SECONDS = 5 +const NATS_JETSTREAM_HOST = 'nats://localhost:4222' +const NATS_JETSTREAM_STREAM = 'DebeziumStream' +const NATS_JETSTREAM_SUBJECT = 'bemi.*.*' +const NATS_JETSTREAM_CONSUMER_OPTIONS = { + ack_policy: AckPolicy.All, + deliver_policy: DeliverPolicy.All, + filter_subject: NATS_JETSTREAM_SUBJECT, +} + +const main = (async () => { + const jetstreamConnection = await connectJetstream(NATS_JETSTREAM_HOST); + const consumer = await buildConsumer({ connection: jetstreamConnection, stream: NATS_JETSTREAM_STREAM, options: NATS_JETSTREAM_CONSUMER_OPTIONS }); + + const orm = await MikroORM.init(mikroOrmConfig) + await orm.getMigrator().up(); + + await runIngestionLoop({ + orm, + consumer, + fetchBatchSize: FETCH_BATCH_SIZE, + refetchEmptyAfterSeconds: REFETCH_EMPTY_AFTER_SECONDS, + insertBatchSize: INSERT_BATCH_SIZE, + }) +})() diff --git a/worker/tsconfig.json b/worker/tsconfig.json new file mode 100644 index 0000000..eaed8ab --- /dev/null +++ b/worker/tsconfig.json @@ -0,0 +1,24 @@ +{ + "compilerOptions": { + "outDir": "./dist", + "lib": ["esnext"], + "module": "commonjs", + "target": "es6", + "moduleResolution": "node", + "strict": true, + "baseUrl": ".", + "allowJs": true, + "sourceMap": true, + + // MikroORM + "experimentalDecorators": true, + + "emitDecoratorMetadata": true, + "esModuleInterop": true, + "inlineSources": true, + + "sourceRoot": "/" + }, + "include": ["mikro-orm.config.ts", "src/index.ts", "core/src"], + "exclude": ["node_modules", "core/node_modules", "core/src/specs"] +}