Skip to content

Commit

Permalink
fix: JOIN-38534 fix occasional error during pubsub publishing on star…
Browse files Browse the repository at this point in the history
…tup (#100)
  • Loading branch information
eugene-taran authored Dec 19, 2023
1 parent 99bcb97 commit 9f04441
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
7 changes: 4 additions & 3 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ export class Publisher<T = unknown> {

private readonly avroMessageMetadata?: Record<string, string>
private readonly fieldsProcessor = new FieldsProcessor()
//TODO: remove flags below, when only avro will be used
//TODO: remove fields below, when only avro will be used, why we use jsonPublisher described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534
private jsonPublisher: Topic
private topicHasAssignedSchema = false
private avroSchemasProvided = false

Expand All @@ -56,6 +57,7 @@ export class Publisher<T = unknown> {
this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
}
this.topic = client.topic(topicName)
this.jsonPublisher = client.topic(topicName, { gaxOpts: {retry: null}})
this.topicSchemaName = `${this.topicName}-generated-avro`
}

Expand Down Expand Up @@ -204,8 +206,7 @@ export class Publisher<T = unknown> {


private async sendJsonMessage(message: MessageOptions) {
// why we use this.topic.publisher described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534
const messageId = await this.topic.publisher.publishMessage(message)
const messageId = await this.jsonPublisher.publishMessage(message)
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, { data: message.json as unknown, messageId })
}

Expand Down
4 changes: 2 additions & 2 deletions packages/pubsub/src/__tests__/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ describe('Publisher', () => {
it('publishes json object', async () => {
await publisher.publishMsg(message)

expect(topicMock.publisher.publishMessage).toHaveBeenCalledWith({ json: message })
expect(topicMock.publishMessage).toHaveBeenCalledWith({ json: message })
})

it('publishes json array', async () => {
const array = [message, message]
await publisher.publishMsg(array)

expect(topicMock.publisher.publishMessage).toHaveBeenCalledWith({ json: array })
expect(topicMock.publishMessage).toHaveBeenCalledWith({ json: array })
})
})

Expand Down
5 changes: 1 addition & 4 deletions packages/pubsub/src/__tests__/support/pubsubMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ export const getTopicMock = ({ subscriptionMock, iamMock }: ITopicMockOption = {
publishMessage: jest.fn(),
subscription: jest.fn(() => subscriptionMock),
iam: iamMock,
getMetadata: jest.fn(),
publisher: {
publishMessage: jest.fn()
}
getMetadata: jest.fn()
})

export const schemaMock = {
Expand Down

0 comments on commit 9f04441

Please sign in to comment.