Skip to content

Commit

Permalink
enh: replace customized nats message type by js msg (#5)
Browse files Browse the repository at this point in the history
* enh: replace customized nats message type byjs msg

* Fix extra space

* Align import

---------

Co-authored-by: exAspArk <[email protected]>
  • Loading branch information
haecheonlee and exAspArk authored Jun 2, 2024
1 parent 055b3dc commit deec8b1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 18 deletions.
5 changes: 3 additions & 2 deletions core/src/fetched-record.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { RequiredEntityData } from '@mikro-orm/postgresql';
import { JsMsg } from 'nats';

import { Change, Operation } from "./entities/Change"
import { NatsMessage, decodeData } from './nats'
import { decodeData } from './nats'

export const MESSAGE_PREFIX_CONTEXT = '_bemi'
export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat'
Expand Down Expand Up @@ -76,7 +77,7 @@ export class FetchedRecord {
this.messagePrefix = messagePrefix
}

static fromNatsMessage(natsMessage: NatsMessage, now = new Date()) {
static fromNatsMessage(natsMessage: JsMsg, now = new Date()) {
const debeziumData = decodeData(natsMessage.data) as any

const messagePrefix = debeziumData.message?.prefix
Expand Down
7 changes: 3 additions & 4 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { MikroORM } from '@mikro-orm/postgresql';
import { Consumer } from 'nats';
import { Consumer, JsMsg } from 'nats';

import { logger } from './logger'
import { NatsMessage } from './nats'
import { Change } from "./entities/Change"
import { FetchedRecord } from './fetched-record'
import { FetchedRecordBuffer } from './fetched-record-buffer'
Expand Down Expand Up @@ -37,7 +36,7 @@ const fetchNatsMessages = async (
{ consumer, fetchBatchSize, lastStreamSequence }:
{ consumer: Consumer, fetchBatchSize: number, lastStreamSequence: number | null }
) => {
const natsMessageBySequence: { [sequence: number]: NatsMessage } = {}
const natsMessageBySequence: { [sequence: number]: JsMsg } = {}
let pendingMessageCount = 0

const iterator = await consumer.fetch({ max_messages: fetchBatchSize, expires: FETCH_EXPIRES_MS });
Expand Down Expand Up @@ -96,7 +95,7 @@ export const runIngestionLoop = async (
// Stitching
const now = new Date()
const natsMessages = Object.values(natsMessageBySequence)
const fetchedRecords = natsMessages.map((m: NatsMessage) => FetchedRecord.fromNatsMessage(m, now)).filter(r => r) as FetchedRecord[]
const fetchedRecords = natsMessages.map((m: JsMsg) => FetchedRecord.fromNatsMessage(m, now)).filter(r => r) as FetchedRecord[]
const { stitchedFetchedRecords, newFetchedRecordBuffer, ackStreamSequence } = stitchFetchedRecords({
fetchedRecordBuffer: fetchedRecordBuffer.addFetchedRecords(fetchedRecords),
useBuffer,
Expand Down
10 changes: 0 additions & 10 deletions core/src/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@ import { logger } from './logger'

const JSON_CODEC = JSONCodec()

export interface NatsMessage {
subject: string,
info: {
streamSequence: number,
pending: number,
},
data: any,
ack: () => void,
}

export const connectJetstream = (host: string) => {
return connect({ servers: host });
}
Expand Down
24 changes: 22 additions & 2 deletions core/src/specs/fixtures/nats-messages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { NatsMessage, encodeData } from '../../nats'
import { JsMsg } from 'nats'

import { encodeData } from '../../nats'

export const POSITIONS = {
CREATE: 35878528,
Expand Down Expand Up @@ -215,12 +217,30 @@ export const MESSAGE_DATA = {
export const buildNatsMessage = (
{ subject, data, streamSequence }:
{ subject: string, data: any, streamSequence: number }
): NatsMessage => ({
): JsMsg => ({
redelivered: false,
headers: undefined,
seq: 0,
sid: 0,
subject,
info: {
streamSequence,
pending: 0,
domain: "",
stream: "",
consumer: "",
redeliveryCount: 0,
deliverySequence: 0,
timestampNanos: 0,
redelivered: false
},
data: encodeData(data),
ack: () => {},
nak: () => {},
working: () => {},
next: () => {},
term: () => {},
ackAck: async () => false,
json: <T>() => null as T,
string: () => "",
})

0 comments on commit deec8b1

Please sign in to comment.