Ingest changes in real-time with streaming vs batches with intervals
exAspArk committed Mar 21, 2024
1 parent 72dc7df commit 47cb49a
Showing 2 changed files with 48 additions and 26 deletions.
core/src/ingestion.ts (52 changes: 29 additions & 23 deletions)
@@ -8,8 +8,10 @@ import { ChangeMessage } from './change-message'
 import { ChangeMessagesBuffer } from './change-message-buffer'
 import { stitchChangeMessages } from './stitching'
 
-const sleep = (seconds: number) => (
-  new Promise(resolve => setTimeout(resolve, seconds * 1000))
+const INSERT_INTERVAL_MS = 1000 // 1 second to avoid overwhelming the database
+
+const sleep = (ms: number) => (
+  new Promise(resolve => setTimeout(resolve, ms))
 )
 
 const chunk = <T>(array: T[], size: number): T[][] => (
@@ -35,30 +37,26 @@ const fetchMessages = async (
   { consumer: any, fetchBatchSize: number, lastStreamSequence: number | null }
 ) => {
   const messageBySequence: { [sequence: number]: Message } = {}
+  let pendingMessageCount = 0
 
   const iterator = await consumer.fetch({ max_messages: fetchBatchSize });
 
   for await (const message of iterator) {
     const { streamSequence, pending } = message.info;
-    logger.debug(`Stream sequence: ${streamSequence}`)
+    logger.debug(`Stream sequence: ${streamSequence}, pending: ${pending}`)
+
+    pendingMessageCount = pending
 
     // Accumulate the batch
     if (!lastStreamSequence || lastStreamSequence < streamSequence) {
       messageBySequence[streamSequence] = message
     }
 
-    // Exhausted the batch
-    if (pending === 0) {
-      const sequences = Object.keys(messageBySequence).sort((a, b) => Number(b) - Number(a)) // reverse sort
-      const lastBatchSequence = sequences.length ? Number(sequences[0]) : null
-      if (lastBatchSequence && (!lastStreamSequence || lastBatchSequence > lastStreamSequence)) {
-        lastStreamSequence = lastBatchSequence;
-      }
-      break;
-    }
+    if (pendingMessageCount === 0) break;
   }
 
-  return { messageBySequence, lastStreamSequence }
+  return { messageBySequence, pendingMessageCount }
 }
 
 export const runIngestionLoop = async (
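
Note on the simplified exhaustion check above: with a JetStream fetch() iterator, message.info.pending reports how many matching messages remain undelivered after the current one, so breaking at zero ends the batch as soon as the server has nothing left instead of idling until the fetch request expires. A minimal sketch of that contract (assuming a nats.js-style consumer; drainBatch is a hypothetical helper, not part of this commit):

const drainBatch = async (consumer: any, max: number): Promise<number> => {
  let processed = 0
  const iterator = await consumer.fetch({ max_messages: max })
  for await (const message of iterator) {
    processed += 1 // a real handler would decode and accumulate the message here
    if (message.info.pending === 0) break // server has nothing left: stop early
  }
  return processed
}
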
@@ -67,13 +65,11 @@ export const runIngestionLoop = async (
   orm,
   consumer,
   fetchBatchSize = 100,
   insertBatchSize = 100,
-  refetchEmptyAfterSeconds = 10,
   useBuffer = false,
 }: {
   orm: MikroORM<PostgreSqlDriver>,
   consumer: any,
   fetchBatchSize?: number,
-  refetchEmptyAfterSeconds?: number,
   insertBatchSize?: number,
   useBuffer?: boolean,
 }
@@ -84,14 +80,22 @@
   while (true) {
     // Fetching
     logger.info('Fetching...')
-    const { messageBySequence, lastStreamSequence: newLastStreamSequence } = await fetchMessages({ consumer, fetchBatchSize, lastStreamSequence })
-    const messages = Object.values(messageBySequence)
-    lastStreamSequence = newLastStreamSequence
+    const fetchedMessages = await fetchMessages({ consumer, fetchBatchSize, lastStreamSequence })
+    const { messageBySequence, pendingMessageCount } = fetchedMessages
+
+    // Last sequence tracking
+    const sequences = Object.keys(messageBySequence).sort((a, b) => Number(b) - Number(a)) // reverse sort
+    if (sequences.length) {
+      const lastBatchSequence = Number(sequences[0])
+      if (!lastStreamSequence || lastBatchSequence > lastStreamSequence) {
+        lastStreamSequence = lastBatchSequence;
+      }
+    }
 
     // Stitching
     const now = new Date()
+    const messages = Object.values(messageBySequence)
     const changeMessages = messages.map((message: Message) => ChangeMessage.fromMessage(message, now))
 
     const { stitchedChangeMessages, newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({
       changeMessagesBuffer: changeMessagesBuffer.addChangeMessages(changeMessages),
       useBuffer,
@@ -101,7 +105,8 @@
     logger.info([
       `Fetched: ${messages.length}`,
       `Saving: ${stitchedChangeMessages.length}`,
-      `Pending: ${changeMessagesBuffer.size()}`,
+      `Pending in buffer: ${changeMessagesBuffer.size()}`,
+      `Pending in stream: ${pendingMessageCount}`,
       `Ack sequence: ${ackStreamSequence ? `#${ackStreamSequence}` : 'none'}`,
       `Last sequence: #${lastStreamSequence}`,
     ].join('. '))
@@ -112,12 +117,13 @@
       await orm.em.flush()
     }
     if (ackStreamSequence) {
-      logger.info(`Acking ${ackStreamSequence}...`)
+      logger.debug(`Acking ${ackStreamSequence}...`)
       messageBySequence[ackStreamSequence]?.ack()
     }
 
     // Waiting for the next loop
-    logger.debug(`Sleeping for ${refetchEmptyAfterSeconds} seconds...`)
-    await sleep(refetchEmptyAfterSeconds)
+    if (stitchedChangeMessages.length) {
+      logger.debug('Sleeping...')
+      await sleep(INSERT_INTERVAL_MS)
+    }
   }
 }
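
Net effect of the changes above: the loop no longer sleeps for refetchEmptyAfterSeconds between polls; it re-fetches immediately and pauses for INSERT_INTERVAL_MS only after rows were actually written. A minimal standalone sketch of the new pacing (fetchBatch and insertRows are hypothetical stand-ins, not part of this repo):

const INSERT_INTERVAL_MS = 1000
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical stand-ins: the real loop long-polls a JetStream consumer and
// flushes stitched change messages through MikroORM.
const fetchBatch = async (): Promise<string[]> => (Math.random() < 0.5 ? ['row'] : [])
const insertRows = async (rows: string[]): Promise<void> => {
  console.log(`inserted ${rows.length} rows`)
}

const streamingLoop = async (iterations = 5): Promise<void> => {
  for (let i = 0; i < iterations; i++) { // while (true) in the real loop
    const rows = await fetchBatch() // returns as soon as messages arrive
    if (rows.length === 0) continue // stream idle: poll again immediately
    await insertRows(rows)
    await sleep(INSERT_INTERVAL_MS) // throttle writes, not reads
  }
}

streamingLoop()
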
core/src/nats.ts (22 changes: 19 additions & 3 deletions)
@@ -1,5 +1,7 @@
 import { connect, JSONCodec } from "nats";
 
+import { logger } from './logger'
+
 const JSON_CODEC = JSONCodec()
 
 export interface Message {
@@ -18,13 +20,27 @@ export const connectJetstream = (host: string) => {
 
 export const buildConsumer = async (
   { connection, stream, options }:
-  { connection: any, stream: string, options: object }
+  { connection: any, stream: string, options: any }
 ) => {
   const jetstream = connection.jetstream();
   const jetstreamManager = await connection.jetstreamManager();
+  let consumer;
+
+  try {
+    consumer = await jetstream.consumers.get(stream, options.durable_name);
+    const { config } = await consumer.info();
+    if (Object.keys(options).some(key => options[key] !== config[key])) {
+      logger.info('Updating consumer...');
+      await jetstreamManager.consumers.update(stream, options.durable_name, options);
+    }
+  } catch (e: any) {
+    if (e.message !== "consumer not found") throw e;
+    logger.info('Creating consumer...');
+    await jetstreamManager.consumers.add(stream, options);
+    consumer = await jetstream.consumers.get(stream, options.durable_name);
+  }
+
-  const consumerInfo = await jetstreamManager.consumers.add(stream, options);
-  return jetstream.consumers.get(stream, consumerInfo.name);
+  return consumer;
 }
 
 export const decodeData = (data: any) => {
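
With this change, buildConsumer becomes idempotent across restarts: it looks up the existing durable consumer, updates it in place when any supplied option drifts from the server-side config, and creates it only when missing, instead of erroring once the durable already exists. A hedged usage sketch (hypothetical stream, subject, and durable names; assumes connectJetstream resolves to a NATS connection):

import { connectJetstream, buildConsumer } from './nats'

const main = async () => {
  const connection = await connectJetstream('nats://localhost:4222')

  // Re-running this after tweaking an option updates the durable consumer
  // in place rather than failing because it already exists.
  const consumer = await buildConsumer({
    connection,
    stream: 'postgres-changes',       // hypothetical stream name
    options: {
      durable_name: 'ingestion',      // hypothetical durable name
      filter_subject: 'changes.>',    // hypothetical subject filter
      ack_policy: 'explicit',
    },
  })

  console.log(`Consumer ready: ${(await consumer.info()).name}`)
}

main()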
