Skip to content

Commit

Permalink
Init worker
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Feb 4, 2024
1 parent 273317e commit 6c05b6f
Show file tree
Hide file tree
Showing 16 changed files with 371 additions and 55 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ dist

.dynamodb/

# Debezium

debezium-server/

# TernJS port file

.tern-port
Expand Down
23 changes: 17 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
core-install:
devbox run "cd core && pnpm install"
@devbox run "cd core && pnpm install"
core-test:
devbox run "cd core && pnpm test"
@devbox run "cd core && pnpm test"

worker-setup:
@cd worker && \
([ ! -d debezium-server ] && \
curl -O https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/2.5.0.Final/debezium-server-dist-2.5.0.Final.tar.gz && \
tar -xvf debezium-server-dist-2.5.0.Final.tar.gz && \
rm debezium-server-dist-2.5.0.Final.tar.gz) || true
worker-install: worker-setup
@devbox run "cd worker && pnpm install"
worker-up:
@devbox run "cd worker && pnpm concurrently -- \"pnpm:up:*\""

docs-install:
devbox run "cd docs && pnpm install"
@devbox run "cd docs && pnpm install"
docs-up:
devbox run "cd docs && pnpm start"
@devbox run "cd docs && pnpm start"
docs-build:
devbox run "cd docs && pnpm build"
@devbox run "cd docs && pnpm build"

sh:
devbox shell
@devbox shell
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@

# Bemi

## Running locally

```sh
make worker-install

export DB_HOST=127.0.0.1 DB_PORT=5432 DB_NAME=postgres DB_USER=postgres DB_PASSWORD=postgres
make worker-up
```

## Testing

```sh
make core-install
make core-test
```

## Architecture

![Bemi Worker Architecture](docs/static/img/worker.png)
Expand Down
15 changes: 12 additions & 3 deletions core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,23 @@
"dependencies": {
"@mikro-orm/core": "^5.9.4",
"@mikro-orm/migrations": "^5.9.4",
"@mikro-orm/postgresql": "^5.9.4",
"nats": "^2.19.0"
},
"jest": {
"moduleFileExtensions": ["js", "json", "ts"],
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*\\.spec\\.ts$",
"transform": { "^.+\\.(t|j)s$": "ts-jest" },
"collectCoverageFrom": ["**/*.(t|j)s"],
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
Expand Down
36 changes: 27 additions & 9 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { MikroORM } from '@mikro-orm/core';
import type { PostgreSqlDriver } from '@mikro-orm/postgresql'

import { logger } from '../core/logger'
import { Message } from '../core/nats'
import { Change } from "../core/entities/Change"
import { ChangeMessage } from '../core/change-message'
import { ChangeMessagesBuffer } from '../core/change-message-buffer'
import { stitchChangeMessages } from '../core/stitching'
import { logger } from './logger'
import { Message } from './nats'
import { Change } from "./entities/Change"
import { ChangeMessage } from './change-message'
import { ChangeMessagesBuffer } from './change-message-buffer'
import { stitchChangeMessages } from './stitching'

const sleep = (seconds: number) => (
new Promise(resolve => setTimeout(resolve, seconds * 1000))
Expand Down Expand Up @@ -62,8 +62,21 @@ const fetchMessages = async (
}

export const runIngestionLoop = async (
{ orm, consumer, fetchBatchSize, refetchEmptyAfterSeconds, insertBatchSize }:
{ orm: MikroORM<PostgreSqlDriver>, consumer: any, fetchBatchSize: number, refetchEmptyAfterSeconds: number, insertBatchSize: number }
{
orm,
consumer,
fetchBatchSize = 100,
insertBatchSize = 100,
refetchEmptyAfterSeconds = 10,
useBuffer = false,
}: {
orm: MikroORM<PostgreSqlDriver>,
consumer: any,
fetchBatchSize?: number,
refetchEmptyAfterSeconds?: number,
insertBatchSize?: number,
useBuffer?: boolean,
}
) => {
let lastStreamSequence: number | null = null
let changeMessagesBuffer = new ChangeMessagesBuffer()
Expand All @@ -78,13 +91,18 @@ export const runIngestionLoop = async (
// Stitching
const now = new Date()
const changeMessages = messages.map((message: Message) => ChangeMessage.fromMessage(message, now))
const { changeMessages: stitchedChangeMessages, changeMessagesBuffer: newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({ changeMessages, changeMessagesBuffer })

const { stitchedChangeMessages, newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({
changeMessagesBuffer: changeMessagesBuffer.addChangeMessages(changeMessages),
useBuffer,
})
changeMessagesBuffer = newChangeMessagesBuffer

logger.info([
`Fetched: ${messages.length}`,
`Saving: ${stitchedChangeMessages.length}`,
`Pending: ${changeMessagesBuffer.size()}`,
`Ack sequence: #${ackStreamSequence}`,
`Last sequence: #${lastStreamSequence}`,
].join('. '))

Expand Down
72 changes: 48 additions & 24 deletions core/src/specs/stitching.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
]

const result = stitchChangeMessages({ changeMessages, changeMessagesBuffer: new ChangeMessagesBuffer() })
const result = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages),
useBuffer: true,
})

expect(result).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages, 2).setContext(findChangeMessage(changeMessages, 1).context()),
],
ackStreamSequence: 2,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
})
})

Expand All @@ -43,15 +46,18 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 4, changeAttributes: CHANGE_ATTRIBUTES.DELETE }),
]

const result = stitchChangeMessages({ changeMessages, changeMessagesBuffer: new ChangeMessagesBuffer() })
const result = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages),
useBuffer: true,
})

expect(result).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages, 1).setContext(findChangeMessage(changeMessages, 2).context()),
findChangeMessage(changeMessages, 3),
],
ackStreamSequence: 3,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.DELETE]: [
findChangeMessage(changeMessages, 4),
Expand All @@ -71,13 +77,16 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.UPDATE }),
]

const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() })
const result1 = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1),
useBuffer: true,
})
expect(result1).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages1, 1).setContext(findChangeMessage(changeMessages1, 2).context()),
],
ackStreamSequence: 1,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.UPDATE]: [
findChangeMessage(changeMessages1, 3),
Expand All @@ -91,13 +100,16 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 5, changeAttributes: CHANGE_ATTRIBUTES.DELETE_MESSAGE, messagePrefix: MESSAGE_PREFIX_CONTEXT }),
]

const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer })
const result2 = stitchChangeMessages({
changeMessagesBuffer: result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2),
useBuffer: true,
})
expect(result2).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages1, 3).setContext(findChangeMessage(changeMessages2, 4).context()),
],
ackStreamSequence: 3,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.DELETE]: [
findChangeMessage(changeMessages2, 5),
Expand All @@ -113,11 +125,14 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }),
]

const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() })
const result1 = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1),
useBuffer: true,
})
expect(result1).toStrictEqual({
changeMessages: [],
stitchedChangeMessages: [],
ackStreamSequence: undefined,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.CREATE]: [
findChangeMessage(changeMessages1, 1),
Expand All @@ -132,14 +147,17 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 4, changeAttributes: CHANGE_ATTRIBUTES.DELETE }),
]

const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer })
const result2 = stitchChangeMessages({
changeMessagesBuffer: result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2),
useBuffer: true,
})
expect(result2).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages1, 1).setContext(findChangeMessage(changeMessages2, 2).context()),
findChangeMessage(changeMessages2, 3),
],
ackStreamSequence: 3,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.DELETE]: [findChangeMessage(changeMessages2, 4)],
},
Expand All @@ -153,11 +171,14 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }),
]

const result1 = stitchChangeMessages({ changeMessages: changeMessages1, changeMessagesBuffer: new ChangeMessagesBuffer() })
const result1 = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages1),
useBuffer: true,
})
expect(result1).toStrictEqual({
changeMessages: [],
stitchedChangeMessages: [],
ackStreamSequence: undefined,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({
[subject]: {
[POSITIONS.CREATE]: [
findChangeMessage(changeMessages1, 1),
Expand All @@ -170,13 +191,16 @@ describe('stitchChangeMessages', () => {
new ChangeMessage({ subject, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
]

const result2 = stitchChangeMessages({ changeMessages: changeMessages2, changeMessagesBuffer: result1.changeMessagesBuffer })
const result2 = stitchChangeMessages({
changeMessagesBuffer: result1.newChangeMessagesBuffer.addChangeMessages(changeMessages2),
useBuffer: true,
})
expect(result2).toStrictEqual({
changeMessages: [
stitchedChangeMessages: [
findChangeMessage(changeMessages1, 1),
],
ackStreamSequence: 1,
changeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
})
})
})
Expand Down
26 changes: 13 additions & 13 deletions core/src/stitching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@ import { ChangeMessage } from './change-message'
import { ChangeMessagesBuffer } from './change-message-buffer'

export const stitchChangeMessages = (
{ changeMessages: initialChangeMessages, changeMessagesBuffer: initialChangeMessagesBuffer }:
{ changeMessages: ChangeMessage[], changeMessagesBuffer: ChangeMessagesBuffer }
{ changeMessagesBuffer, useBuffer = false }:
{ changeMessagesBuffer: ChangeMessagesBuffer, useBuffer: boolean }
) => {
const mergedChangeMessagesBuffer = initialChangeMessagesBuffer.addChangeMessages(initialChangeMessages)
let stitchedChangeMsgs: ChangeMessage[] = []
let stitchedChangeMessages: ChangeMessage[] = []
let ackSequenceBySubject: { [key: string]: string | undefined } = {}
let newChangeMessagesBuffer = new ChangeMessagesBuffer()

mergedChangeMessagesBuffer.forEach((subject, sortedChangeMessages) => {
changeMessagesBuffer.forEach((subject, sortedChangeMessages) => {
let ackSequence: number | undefined = undefined

sortedChangeMessages.forEach((changeMessage) => {
const position = changeMessage.changeAttributes.position.toString()
const changeMessages = mergedChangeMessagesBuffer.changeMessagesByPosition(subject, position)
const changeMessages = changeMessagesBuffer.changeMessagesByPosition(subject, position)
const contextMessageChangeMessage = changeMessages.find(cm => cm.isContextMessage())

// Last message without a pair - skip it
if (
useBuffer &&
changeMessage === sortedChangeMessages[sortedChangeMessages.length - 1] &&
changeMessages.length === 1
) {
Expand All @@ -47,14 +47,14 @@ export const stitchChangeMessages = (

if (contextMessageChangeMessage) {
// Stitch with context message change message if it exists
stitchedChangeMsgs = [
...stitchedChangeMsgs,
stitchedChangeMessages = [
...stitchedChangeMessages,
changeMessage.setContext(contextMessageChangeMessage.context()),
]
} else {
// Return mutation change message as is without stitching
stitchedChangeMsgs = [
...stitchedChangeMsgs,
stitchedChangeMessages = [
...stitchedChangeMessages,
changeMessage,
]
}
Expand All @@ -72,11 +72,11 @@ export const stitchChangeMessages = (
!min || streamSequence! < min ? streamSequence : min
), undefined) as undefined | number

logger.debug({ stitched: stitchedChangeMsgs, buffer: newChangeMessagesBuffer.store, ackStreamSequence })
logger.debug({ stitched: stitchedChangeMessages, buffer: newChangeMessagesBuffer.store, ackStreamSequence })

return {
changeMessages: stitchedChangeMsgs,
changeMessagesBuffer: newChangeMessagesBuffer,
stitchedChangeMessages,
newChangeMessagesBuffer,
ackStreamSequence,
}
}
7 changes: 7 additions & 0 deletions devbox.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"packages": {
"nodejs": "latest",
"nats-server": "latest",
"nodePackages.pnpm": "latest"
}
}
Loading

0 comments on commit 6c05b6f

Please sign in to comment.