From b1c1d4344b1413e6db8cac64fc1d0eb583ff57bc Mon Sep 17 00:00:00 2001 From: haecheonlee Date: Wed, 29 May 2024 19:58:57 -0400 Subject: [PATCH 1/2] fix: remove unused code --- core/src/change-message.ts | 51 -------------------------------------- 1 file changed, 51 deletions(-) diff --git a/core/src/change-message.ts b/core/src/change-message.ts index f225be0..cb63c97 100644 --- a/core/src/change-message.ts +++ b/core/src/change-message.ts @@ -1,50 +1,10 @@ import { RequiredEntityData } from '@mikro-orm/postgresql'; import { Change, Operation } from "./entities/Change" -import { Message, decodeData } from './nats' export const MESSAGE_PREFIX_CONTEXT = '_bemi' export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat' -const parseDebeziumData = (debeziumChange: any, now: Date) => { - const { - op, - before, - after, - ts_ms: queueAtMs, - message, - source: { db: database, schema, table, txId: transactionId, lsn: position, ts_ms: committedAtMs }, - } = debeziumChange - - let operation - if (op === 'c') operation = Operation.CREATE - else if (op === 'u') operation = Operation.UPDATE - else if (op === 'd') operation = Operation.DELETE - else if (op === 't') operation = Operation.TRUNCATE - else if (op === 'm') operation = Operation.MESSAGE - else throw new Error(`Unknown operation: ${op}`) - - const context = message?.prefix === MESSAGE_PREFIX_CONTEXT ? - JSON.parse(Buffer.from(message?.content, 'base64').toString('utf-8')) : - {} - - return { - primaryKey: (operation === Operation.DELETE ? before : after)?.id?.toString(), - before: before || {}, - after: after || {}, - context, - database, - schema, - table, - operation, - committedAt: new Date(committedAtMs), - queuedAt: new Date(queueAtMs), - transactionId, - position: parseInt(position, 10), - createdAt: now, - } -} - export class ChangeMessage { changeAttributes: RequiredEntityData subject: string @@ -61,17 +21,6 @@ export class ChangeMessage { this.messagePrefix = messagePrefix } - static fromMessage(message: Message, now = new Date()) { - const debeziumData = decodeData(message.data) as any - - return new ChangeMessage({ - changeAttributes: parseDebeziumData(debeziumData, now), - subject: message.subject, - streamSequence: message.info.streamSequence, - messagePrefix: debeziumData.message?.prefix, - }) - } - isMessage() { return this.changeAttributes.operation === Operation.MESSAGE } From a0b0fb219c796c42ba9ab295569082304fd878cb Mon Sep 17 00:00:00 2001 From: haecheonlee Date: Wed, 29 May 2024 20:07:25 -0400 Subject: [PATCH 2/2] Remove unused files --- core/src/change-message-buffer.ts | 67 ----------- core/src/change-message.ts | 45 ------- core/src/specs/fixtures/change-messages.ts | 129 --------------------- 3 files changed, 241 deletions(-) delete mode 100644 core/src/change-message-buffer.ts delete mode 100644 core/src/change-message.ts delete mode 100644 core/src/specs/fixtures/change-messages.ts diff --git a/core/src/change-message-buffer.ts b/core/src/change-message-buffer.ts deleted file mode 100644 index e573a42..0000000 --- a/core/src/change-message-buffer.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { ChangeMessage } from './change-message' - -export class ChangeMessagesBuffer { - store: { [subject: string]: { [position: string]: ChangeMessage[] } } - - constructor() { - this.store = {} - } - - static fromStore(store: { [subject: string]: { [position: string]: ChangeMessage[] } }) { - const buffer = new ChangeMessagesBuffer() - buffer.store = store - return buffer - } - - addChangeMessage(changeMessage: ChangeMessage) { - const newBuffer = Object.assign(Object.create(this), this) - const { subject, changeAttributes } = changeMessage - const position = changeAttributes.position.toString() - const existingChangeMessages = this.store[subject]?.[position] - - if (existingChangeMessages) { - newBuffer.store = { - ...this.store, - [subject]: { ...this.store[subject], [position]: [...existingChangeMessages, changeMessage] }, - } - return newBuffer - } - - newBuffer.store = { - ...this.store, - [subject]: { ...(this.store[subject] || []), [position]: [changeMessage] }, - } - return newBuffer - } - - addChangeMessages(changeMessages: ChangeMessage[]) { - let newBuffer = this - - changeMessages.forEach((changeMessage: ChangeMessage) => { - newBuffer = newBuffer.addChangeMessage(changeMessage) - }) - - return newBuffer - } - - forEach(callback: (subject: string, changeMessages: ChangeMessage[]) => void) { - Object.keys(this.store).forEach((subject) => { - const changeMessages = Object.values(this.store[subject]).flat() - const sortedChangeMessages = changeMessages.sort((a, b) => ( - parseInt(a.changeAttributes.position.toString(), 10) - parseInt(b.changeAttributes.position.toString(), 10) - )) - - callback(subject, sortedChangeMessages) - }) - } - - changeMessagesByPosition(subject: string, position: string) { - return this.store[subject]?.[position] || [] - } - - size() { - return Object.values(this.store).map(changeMsgsByPos => ( - Object.values(changeMsgsByPos).length - )).reduce((acc, l) => acc + l, 0) - } -} diff --git a/core/src/change-message.ts b/core/src/change-message.ts deleted file mode 100644 index cb63c97..0000000 --- a/core/src/change-message.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { RequiredEntityData } from '@mikro-orm/postgresql'; - -import { Change, Operation } from "./entities/Change" - -export const MESSAGE_PREFIX_CONTEXT = '_bemi' -export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat' - -export class ChangeMessage { - changeAttributes: RequiredEntityData - subject: string - streamSequence: number - messagePrefix?: string - - constructor( - { changeAttributes, subject, streamSequence, messagePrefix }: - { changeAttributes: RequiredEntityData, subject: string, streamSequence: number, messagePrefix?: string } - ) { - this.changeAttributes = changeAttributes - this.subject = subject - this.streamSequence = streamSequence - this.messagePrefix = messagePrefix - } - - isMessage() { - return this.changeAttributes.operation === Operation.MESSAGE - } - - isContextMessage() { - return this.isMessage() && this.messagePrefix === MESSAGE_PREFIX_CONTEXT - } - - isHeartbeatMessage() { - return this.isMessage() && this.messagePrefix === MESSAGE_PREFIX_HEARTBEAT - } - - context() { - return this.changeAttributes.context as object - } - - setContext(context: object) { - const newChangeMessage = Object.assign(Object.create(this), this) - newChangeMessage.changeAttributes = { ...this.changeAttributes, context } - return newChangeMessage - } -} diff --git a/core/src/specs/fixtures/change-messages.ts b/core/src/specs/fixtures/change-messages.ts deleted file mode 100644 index b769eb7..0000000 --- a/core/src/specs/fixtures/change-messages.ts +++ /dev/null @@ -1,129 +0,0 @@ -import { ChangeMessage, MESSAGE_PREFIX_CONTEXT } from '../../change-message' -import { Operation } from '../../entities/Change' - -import { POSITIONS } from './nats-messages' - -export const MOCKED_DATE = new Date(1466424490000) - -export const CHANGE_ATTRIBUTES = { - CREATE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": {}, - "operation": Operation.CREATE, - "position": POSITIONS.CREATE, - "primaryKey": "2", - "queuedAt": MOCKED_DATE, - "schema": "public", - "table": "todo", - "transactionId": 768, - "before": {}, - "after": { "id": 2, "isCompleted": false, "task": "Test" }, - }, - CREATE_MESSAGE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": { "op": "c" }, - "operation": Operation.MESSAGE, - "position": POSITIONS.CREATE, - "primaryKey": undefined, - "queuedAt": MOCKED_DATE, - "schema": "", - "table": "", - "transactionId": 768, - "before": {}, - "after": {}, - }, - UPDATE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": {}, - "operation": Operation.UPDATE, - "position": POSITIONS.UPDATE, - "primaryKey": "2", - "queuedAt": MOCKED_DATE, - "schema": "public", - "table": "todo", - "transactionId": 769, - "before": { "id": 2, "isCompleted": false, "task": "Test" }, - "after": { "id": 2, "isCompleted": true, "task": "Test" }, - }, - UPDATE_MESSAGE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": { "op": "u" }, - "operation": Operation.MESSAGE, - "position": POSITIONS.UPDATE, - "primaryKey": undefined, - "queuedAt": MOCKED_DATE, - "schema": "", - "table": "", - "transactionId": 769, - "before": {}, - "after": {}, - }, - DELETE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": {}, - "operation": Operation.DELETE, - "position": POSITIONS.DELETE, - "primaryKey": "2", - "queuedAt": MOCKED_DATE, - "schema": "public", - "table": "todo", - "transactionId": 767, - "before": { "id": 2, "isCompleted": true, "task": "Test" }, - "after": {}, - }, - DELETE_MESSAGE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": { "op": "d" }, - "operation": Operation.MESSAGE, - "position": POSITIONS.DELETE, - "primaryKey": undefined, - "queuedAt": MOCKED_DATE, - "schema": "", - "table": "", - "transactionId": 767, - "before": {}, - "after": {}, - }, - HEARTBEAT_MESSAGE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": {}, - "operation": Operation.MESSAGE, - "position": POSITIONS.HEARTBEAT_MESSAGE, - "primaryKey": undefined, - "queuedAt": MOCKED_DATE, - "schema": "", - "table": "", - "transactionId": 769, - "before": {}, - "after": {}, - }, - TRUNCATE: { - "committedAt": MOCKED_DATE, - "createdAt": MOCKED_DATE, - "database": "bemi_dev_source", - "context": {}, - "operation": Operation.TRUNCATE, - "position": POSITIONS.TRUNCATE, - "primaryKey": undefined, - "queuedAt": MOCKED_DATE, - "schema": "public", - "table": "todo", - "transactionId": 770, - "before": {}, - "after": {}, - }, -}