Skip to content

Commit

Permalink
enh: improve buildConsumer types (#2)
Browse files Browse the repository at this point in the history
* enh: improve buildConsumer types

* Check if durable is not nullable

* Revert stream

* Use single quote

* Make durable name required rather than null, undefined, and empty string
  • Loading branch information
haecheonlee authored May 30, 2024
1 parent 3ba9a7a commit 357a418
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions core/src/nats.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { connect, JSONCodec } from "nats";
import { connect, ConsumerConfig, JSONCodec, NatsConnection } from "nats";

import { logger } from './logger'

Expand All @@ -20,7 +20,7 @@ export const connectJetstream = (host: string) => {

export const buildConsumer = async (
{ connection, stream, options }:
{ connection: any, stream: string, options: any }
{ connection: NatsConnection, stream: string, options: Partial<ConsumerConfig> }
) => {
const jetstream = connection.jetstream();
const jetstreamManager = await connection.jetstreamManager();
Expand All @@ -29,12 +29,18 @@ export const buildConsumer = async (
try {
consumer = await jetstream.consumers.get(stream, options.durable_name);
const { config } = await consumer.info();
if (Object.keys(options).some(key => options[key] !== config[key])) {
const hasDifferentValue = Object
.keys(options)
.some(key => {
const keyAsOptionsKeyType = key as keyof typeof options;
return options[keyAsOptionsKeyType] !== config[keyAsOptionsKeyType]
});
if (hasDifferentValue && options.durable_name) {
logger.info('Updating consumer...');
await jetstreamManager.consumers.update(stream, options.durable_name, options);
}
} catch (e: any) {
if (e.message !== "consumer not found") throw e;
} catch (e) {
if (e instanceof Error && e.message !== "consumer not found") throw e;
logger.info('Creating consumer...');
await jetstreamManager.consumers.add(stream, options);
consumer = await jetstream.consumers.get(stream, options.durable_name);
Expand Down

0 comments on commit 357a418

Please sign in to comment.