From 9f0444191f9065e61a214f8d41e6a1a604a137e5 Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Tue, 19 Dec 2023 14:59:54 +0100 Subject: [PATCH] fix: JOIN-38534 fix occasional error during pubsub publishing on startup (#100) --- packages/pubsub/src/Publisher.ts | 7 ++++--- packages/pubsub/src/__tests__/Publisher.test.ts | 4 ++-- packages/pubsub/src/__tests__/support/pubsubMock.ts | 5 +---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/pubsub/src/Publisher.ts b/packages/pubsub/src/Publisher.ts index dad4bf1..b33a70e 100644 --- a/packages/pubsub/src/Publisher.ts +++ b/packages/pubsub/src/Publisher.ts @@ -36,7 +36,8 @@ export class Publisher { private readonly avroMessageMetadata?: Record private readonly fieldsProcessor = new FieldsProcessor() - //TODO: remove flags below, when only avro will be used + //TODO: remove fields below, when only avro will be used, why we use jsonPublisher described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534 + private jsonPublisher: Topic private topicHasAssignedSchema = false private avroSchemasProvided = false @@ -56,6 +57,7 @@ export class Publisher { this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema) } this.topic = client.topic(topicName) + this.jsonPublisher = client.topic(topicName, { gaxOpts: {retry: null}}) this.topicSchemaName = `${this.topicName}-generated-avro` } @@ -204,8 +206,7 @@ export class Publisher { private async sendJsonMessage(message: MessageOptions) { - // why we use this.topic.publisher described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534 - const messageId = await this.topic.publisher.publishMessage(message) + const messageId = await this.jsonPublisher.publishMessage(message) this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, { data: message.json as unknown, messageId }) } diff --git a/packages/pubsub/src/__tests__/Publisher.test.ts b/packages/pubsub/src/__tests__/Publisher.test.ts index 2f8b61e..6c84755 100644 --- a/packages/pubsub/src/__tests__/Publisher.test.ts +++ b/packages/pubsub/src/__tests__/Publisher.test.ts @@ -78,14 +78,14 @@ describe('Publisher', () => { it('publishes json object', async () => { await publisher.publishMsg(message) - expect(topicMock.publisher.publishMessage).toHaveBeenCalledWith({ json: message }) + expect(topicMock.publishMessage).toHaveBeenCalledWith({ json: message }) }) it('publishes json array', async () => { const array = [message, message] await publisher.publishMsg(array) - expect(topicMock.publisher.publishMessage).toHaveBeenCalledWith({ json: array }) + expect(topicMock.publishMessage).toHaveBeenCalledWith({ json: array }) }) }) diff --git a/packages/pubsub/src/__tests__/support/pubsubMock.ts b/packages/pubsub/src/__tests__/support/pubsubMock.ts index 426f3a9..5233a03 100644 --- a/packages/pubsub/src/__tests__/support/pubsubMock.ts +++ b/packages/pubsub/src/__tests__/support/pubsubMock.ts @@ -67,10 +67,7 @@ export const getTopicMock = ({ subscriptionMock, iamMock }: ITopicMockOption = { publishMessage: jest.fn(), subscription: jest.fn(() => subscriptionMock), iam: iamMock, - getMetadata: jest.fn(), - publisher: { - publishMessage: jest.fn() - } + getMetadata: jest.fn() }) export const schemaMock = {