From 47cb49a545a99afea812c5ad866d6b63f78dfb8e Mon Sep 17 00:00:00 2001 From: exAspArk Date: Thu, 21 Mar 2024 13:18:30 -0400 Subject: [PATCH] Ingest changes in real-time with streaming vs batches with intervals --- core/src/ingestion.ts | 52 ++++++++++++++++++++++++------------------- core/src/nats.ts | 22 +++++++++++++++--- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/core/src/ingestion.ts b/core/src/ingestion.ts index 05ec912..9792cf6 100644 --- a/core/src/ingestion.ts +++ b/core/src/ingestion.ts @@ -8,8 +8,10 @@ import { ChangeMessage } from './change-message' import { ChangeMessagesBuffer } from './change-message-buffer' import { stitchChangeMessages } from './stitching' -const sleep = (seconds: number) => ( - new Promise(resolve => setTimeout(resolve, seconds * 1000)) +const INSERT_INTERVAL_MS = 1000 // 1 second to avoid overwhelming the database + +const sleep = (ms: number) => ( + new Promise(resolve => setTimeout(resolve, ms)) ) const chunk = (array: T[], size: number): T[][] => ( @@ -35,12 +37,15 @@ const fetchMessages = async ( { consumer: any, fetchBatchSize: number, lastStreamSequence: number | null } ) => { const messageBySequence: { [sequence: number]: Message } = {} + let pendingMessageCount = 0 const iterator = await consumer.fetch({ max_messages: fetchBatchSize }); for await (const message of iterator) { const { streamSequence, pending } = message.info; - logger.debug(`Stream sequence: ${streamSequence}`) + logger.debug(`Stream sequence: ${streamSequence}, pending: ${pending}`) + + pendingMessageCount = pending // Accumulate the batch if (!lastStreamSequence || lastStreamSequence < streamSequence) { @@ -48,17 +53,10 @@ const fetchMessages = async ( } // Exhausted the batch - if (pending === 0) { - const sequences = Object.keys(messageBySequence).sort((a, b) => Number(b) - Number(a)) // reverse sort - const lastBatchSequence = sequences.length ? Number(sequences[0]) : null - if (lastBatchSequence && (!lastStreamSequence || lastBatchSequence > lastStreamSequence)) { - lastStreamSequence = lastBatchSequence; - } - break; - } + if (pendingMessageCount === 0) break; } - return { messageBySequence, lastStreamSequence } + return { messageBySequence, pendingMessageCount } } export const runIngestionLoop = async ( @@ -67,13 +65,11 @@ export const runIngestionLoop = async ( consumer, fetchBatchSize = 100, insertBatchSize = 100, - refetchEmptyAfterSeconds = 10, useBuffer = false, }: { orm: MikroORM, consumer: any, fetchBatchSize?: number, - refetchEmptyAfterSeconds?: number, insertBatchSize?: number, useBuffer?: boolean, } @@ -84,14 +80,22 @@ export const runIngestionLoop = async ( while (true) { // Fetching logger.info('Fetching...') - const { messageBySequence, lastStreamSequence: newLastStreamSequence } = await fetchMessages({ consumer, fetchBatchSize, lastStreamSequence }) - const messages = Object.values(messageBySequence) - lastStreamSequence = newLastStreamSequence + const fetchedMessages = await fetchMessages({ consumer, fetchBatchSize, lastStreamSequence }) + const { messageBySequence, pendingMessageCount } = fetchedMessages + + // Last sequence tracking + const sequences = Object.keys(messageBySequence).sort((a, b) => Number(b) - Number(a)) // reverse sort + if (sequences.length) { + const lastBatchSequence = Number(sequences[0]) + if (!lastStreamSequence || lastBatchSequence > lastStreamSequence) { + lastStreamSequence = lastBatchSequence; + } + } // Stitching const now = new Date() + const messages = Object.values(messageBySequence) const changeMessages = messages.map((message: Message) => ChangeMessage.fromMessage(message, now)) - const { stitchedChangeMessages, newChangeMessagesBuffer, ackStreamSequence } = stitchChangeMessages({ changeMessagesBuffer: changeMessagesBuffer.addChangeMessages(changeMessages), useBuffer, @@ -101,7 +105,8 @@ export const runIngestionLoop = async ( logger.info([ `Fetched: ${messages.length}`, `Saving: ${stitchedChangeMessages.length}`, - `Pending: ${changeMessagesBuffer.size()}`, + `Pending in buffer: ${changeMessagesBuffer.size()}`, + `Pending in stream: ${pendingMessageCount}`, `Ack sequence: ${ackStreamSequence ? `#${ackStreamSequence}` : 'none'}`, `Last sequence: #${lastStreamSequence}`, ].join('. ')) @@ -112,12 +117,13 @@ export const runIngestionLoop = async ( await orm.em.flush() } if (ackStreamSequence) { - logger.info(`Acking ${ackStreamSequence}...`) + logger.debug(`Acking ${ackStreamSequence}...`) messageBySequence[ackStreamSequence]?.ack() } - // Waiting for the next loop - logger.debug(`Sleeping for ${refetchEmptyAfterSeconds} seconds...`) - await sleep(refetchEmptyAfterSeconds) + if (stitchedChangeMessages.length) { + logger.debug('Sleeping...') + await sleep(INSERT_INTERVAL_MS) + } } } diff --git a/core/src/nats.ts b/core/src/nats.ts index 7555495..1a6c15b 100644 --- a/core/src/nats.ts +++ b/core/src/nats.ts @@ -1,5 +1,7 @@ import { connect, JSONCodec } from "nats"; +import { logger } from './logger' + const JSON_CODEC = JSONCodec() export interface Message { @@ -18,13 +20,27 @@ export const connectJetstream = (host: string) => { export const buildConsumer = async ( { connection, stream, options }: - { connection: any, stream: string, options: object } + { connection: any, stream: string, options: any } ) => { const jetstream = connection.jetstream(); const jetstreamManager = await connection.jetstreamManager(); + let consumer; + + try { + consumer = await jetstream.consumers.get(stream, options.durable_name); + const { config } = await consumer.info(); + if (Object.keys(options).some(key => options[key] !== config[key])) { + logger.info('Updating consumer...'); + await jetstreamManager.consumers.update(stream, options.durable_name, options); + } + } catch (e: any) { + if (e.message !== "consumer not found") throw e; + logger.info('Creating consumer...'); + await jetstreamManager.consumers.add(stream, options); + consumer = await jetstream.consumers.get(stream, options.durable_name); + } - const consumerInfo = await jetstreamManager.consumers.add(stream, options); - return jetstream.consumers.get(stream, consumerInfo.name); + return consumer; } export const decodeData = (data: any) => {