Skip to content

Commit

Permalink
Init worker
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Feb 3, 2024
1 parent 273317e commit 274d8ea
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ dist

.dynamodb/

# Debezium

debezium-server/

# TernJS port file

.tern-port
Expand Down
22 changes: 16 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
core-install:
devbox run "cd core && pnpm install"
@devbox run "cd core && pnpm install"
core-test:
devbox run "cd core && pnpm test"
@devbox run "cd core && pnpm test"

worker-install:
@devbox run "cd worker && pnpm install" && \
cd worker && \
([ ! -d debezium-server ] && \
curl -O https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.5.0.Final/debezium-server-dist-2.5.0.Final.tar.gz && \
tar -xvf debezium-server-dist-2.5.0.Final.tar.gz && \
rm debezium-server-dist-2.5.0.Final.tar.gz) || true
worker-up:
@devbox run "cd worker && pnpm concurrently -- \"pnpm:up:*\""

docs-install:
devbox run "cd docs && pnpm install"
@devbox run "cd docs && pnpm install"
docs-up:
devbox run "cd docs && pnpm start"
@devbox run "cd docs && pnpm start"
docs-build:
devbox run "cd docs && pnpm build"
@devbox run "cd docs && pnpm build"

sh:
devbox shell
@devbox shell
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@

# Bemi

## Running locally

```sh
make worker-install

export DB_HOST=127.0.0.1 DB_PORT=5432 DB_NAME=postgres DB_USER=postgres DB_PASSWORD=postgres
make worker-up
```

## Testing

```sh
make core-install
make core-test
```

## Architecture

![Bemi Worker Architecture](docs/static/img/worker.png)
Expand Down
15 changes: 12 additions & 3 deletions core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,23 @@
"dependencies": {
"@mikro-orm/core": "^5.9.4",
"@mikro-orm/migrations": "^5.9.4",
"@mikro-orm/postgresql": "^5.9.4",
"nats": "^2.19.0"
},
"jest": {
"moduleFileExtensions": ["js", "json", "ts"],
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*\\.spec\\.ts$",
"transform": { "^.+\\.(t|j)s$": "ts-jest" },
"collectCoverageFrom": ["**/*.(t|j)s"],
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { MikroORM } from '@mikro-orm/core';
import type { PostgreSqlDriver } from '@mikro-orm/postgresql'

import { logger } from '../core/logger'
import { Message } from '../core/nats'
import { Change } from "../core/entities/Change"
import { ChangeMessage } from '../core/change-message'
import { ChangeMessagesBuffer } from '../core/change-message-buffer'
import { stitchChangeMessages } from '../core/stitching'
import { logger } from './logger'
import { Message } from './nats'
import { Change } from "./entities/Change"
import { ChangeMessage } from './change-message'
import { ChangeMessagesBuffer } from './change-message-buffer'
import { stitchChangeMessages } from './stitching'

const sleep = (seconds: number) => (
new Promise(resolve => setTimeout(resolve, seconds * 1000))
Expand Down
6 changes: 6 additions & 0 deletions devbox.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"packages": {
"nodejs": "latest",
"nats-server": "latest"
}
}
45 changes: 45 additions & 0 deletions devbox.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"lockfile_version": "1",
"packages": {
"nats-server@latest": {
"last_modified": "2024-01-23T02:13:08Z",
"resolved": "github:NixOS/nixpkgs/4d3d04cfd372a0511f12b885c9822e406b00fe8e#nats-server",
"source": "devbox-search",
"version": "2.10.9",
"systems": {
"aarch64-darwin": {
"store_path": "/nix/store/5yc0acxf8zi0m05cky21zh7689bc21xf-nats-server-2.10.9"
},
"aarch64-linux": {
"store_path": "/nix/store/f93srmd78zm5a980hp8h59090x6qajyq-nats-server-2.10.9"
},
"x86_64-darwin": {
"store_path": "/nix/store/p2y7hwn55vivp54qr5r5qr27vq2dnmbd-nats-server-2.10.9"
},
"x86_64-linux": {
"store_path": "/nix/store/2llnywivgyxf84wmviljmsf6cbfy8k8h-nats-server-2.10.9"
}
}
},
"nodejs@latest": {
"last_modified": "2023-11-19T17:46:56Z",
"resolved": "github:NixOS/nixpkgs/0bf3f5cf6a98b5d077cdcdb00a6d4b3d92bc78b5#nodejs_21",
"source": "devbox-search",
"version": "21.2.0",
"systems": {
"aarch64-darwin": {
"store_path": "/nix/store/waq8bb12nbqwxxn1lgvdzsgs0gqk0d04-nodejs-21.2.0"
},
"aarch64-linux": {
"store_path": "/nix/store/v03cn5xw03nxykgix04r1l31jjrgf9d3-nodejs-21.2.0"
},
"x86_64-darwin": {
"store_path": "/nix/store/sv6nivfsqcl8s1dhqjpjy8dqvbx9mxwl-nodejs-21.2.0"
},
"x86_64-linux": {
"store_path": "/nix/store/iklfdn5l792z46dl5qq9zqzqsmp2m4m9-nodejs-21.2.0"
}
}
}
}
}
26 changes: 26 additions & 0 deletions worker/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
quarkus.log.level=INFO
quarkus.log.console.json=false
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.plugin.name=pgoutput
debezium.source.snapshot.mode=never
debezium.source.database.hostname=DB_HOST
debezium.source.database.port=DB_PORT
debezium.source.database.dbname=DB_NAME
debezium.source.database.user=DB_USER
debezium.source.database.password=DB_PASSWORD
debezium.source.schema.include.list=public
debezium.source.table.exclude.list=public.changes
debezium.source.heartbeat.interval.ms=15000
debezium.source.heartbeat.action.query=INSERT INTO _bemi_migrations (id, name, executed_at) VALUES (0, 'Heartbeat', NOW()) ON CONFLICT (id) DO UPDATE SET executed_at = NOW()
debezium.source.offset.storage.file.filename=offsets.dat
debezium.source.topic.prefix=bemi
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.Reroute.topic.regex=bemi\..*
debezium.transforms.Reroute.topic.replacement=bemi
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://127.0.0.1:4222
debezium.sink.nats-jetstream.subjects=bemi,*.*
debezium.sink.nats-jetstream.create-stream=true
10 changes: 10 additions & 0 deletions worker/debezium.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
#
PROPERTIES=$(<application.properties) &&
PROPERTIES="${PROPERTIES//DB_HOST/$DB_HOST}" &&
PROPERTIES="${PROPERTIES//DB_PORT/$DB_PORT}" &&
PROPERTIES="${PROPERTIES//DB_NAME/$DB_NAME}" &&
PROPERTIES="${PROPERTIES//DB_USER/$DB_USER}" &&
PROPERTIES="${PROPERTIES//DB_PASSWORD/$DB_PASSWORD}" &&
echo "${PROPERTIES}" > ./debezium-server/conf/application.properties &&
cd debezium-server && ./run.sh
30 changes: 30 additions & 0 deletions worker/mikro-orm.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Options } from "@mikro-orm/core";
import { SqlHighlighter } from "@mikro-orm/sql-highlighter";
import type { PostgreSqlDriver } from '@mikro-orm/postgresql'

const DB_HOST = process.env.DB_HOST
const DB_PORT = parseInt(process.env.DB_PORT as string, 10)
const DB_NAME = process.env.DB_NAME
const DB_USER = process.env.DB_USER
const DB_PASSWORD = process.env.DB_PASSWORD

const mikroOrmConfig: Options<PostgreSqlDriver> = {
type: 'postgresql' as const,
host: DB_HOST,
port: DB_PORT,
dbName: DB_NAME,
user: DB_USER,
password: DB_PASSWORD,
highlighter: new SqlHighlighter(),
debug: true,
allowGlobalContext: true,
entities: ['../core/src/entities/**/*.js'],
entitiesTs: ['../core/src/entities/**/*.ts'],
migrations: {
path: '../core/src/migrations',
pathTs: '../core/src/migrations',
tableName: '_bemi_migrations',
},
}

export default mikroOrmConfig
20 changes: 20 additions & 0 deletions worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "Bemi Worker",
"dependencies": {
"@mikro-orm/core": "^5.9.7",
"@mikro-orm/migrations": "^5.9.7",
"@mikro-orm/postgresql": "^5.9.7",
"@mikro-orm/sql-highlighter": "^1.0.1",
"nats": "^2.19.0"
},
"devDependencies": {
"@types/node": "^20.11.16",
"concurrently": "^8.2.2",
"ts-node": "^10.9.2"
},
"scripts": {
"up:worker": "sleep 5 && pnpm ts-node src/index.ts",
"up:nats": "nats-server -js",
"up:debezium": "pnpm ts-node src/reset.ts && ./debezium.sh"
}
}
34 changes: 34 additions & 0 deletions worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { AckPolicy, DeliverPolicy } from 'nats';
import { MikroORM } from '@mikro-orm/core';
import type { PostgreSqlDriver } from '@mikro-orm/postgresql'

import { connectJetstream, buildConsumer } from '../../core/src/nats'
import { runIngestionLoop } from '../../core/src/ingestion'

import mikroOrmConfig from "../mikro-orm.config"

const main = (async () => {
const jetstreamConnection = await connectJetstream('nats://localhost:4222');

const consumer = await buildConsumer({
connection: jetstreamConnection,
stream: 'DebeziumStream',
options: {
durable_name: 'bemi-worker-local',
ack_policy: AckPolicy.All,
deliver_policy: DeliverPolicy.All,
filter_subject: 'bemi',
},
});

const orm = await MikroORM.init<PostgreSqlDriver>(mikroOrmConfig)
await orm.getMigrator().up();

await runIngestionLoop({
orm,
consumer,
fetchBatchSize: 100,
insertBatchSize: 100,
refetchEmptyAfterSeconds: 5,
})
})()
17 changes: 17 additions & 0 deletions worker/src/reset.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MikroORM } from '@mikro-orm/core';
import type { PostgreSqlDriver } from '@mikro-orm/postgresql'

import mikroOrmConfig from "../mikro-orm.config"

const main = (async () => {
const orm = await MikroORM.init<PostgreSqlDriver>(mikroOrmConfig)
await orm.em.getConnection().execute(`
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'debezium') THEN
PERFORM pg_drop_replication_slot('debezium');
END IF;
END $$;
`)
await orm.close()
})()
24 changes: 24 additions & 0 deletions worker/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"compilerOptions": {
"outDir": "./dist",
"lib": ["esnext"],
"module": "commonjs",
"target": "es6",
"moduleResolution": "node",
"strict": true,
"baseUrl": ".",
"allowJs": true,
"sourceMap": true,

// MikroORM
"experimentalDecorators": true,

"emitDecoratorMetadata": true,
"esModuleInterop": true,
"inlineSources": true,

"sourceRoot": "/"
},
"include": ["mikro-orm.config.ts", "src/index.ts", "core/src"],
"exclude": ["node_modules", "core/node_modules", "core/src/specs"]
}

0 comments on commit 274d8ea

Please sign in to comment.