Skip to content

Commit

Permalink
fix: JOIN-46521 do not update subscription when no changes (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran authored Sep 11, 2024
1 parent 8ed29fa commit ff81351
Show file tree
Hide file tree
Showing 5 changed files with 866 additions and 989 deletions.
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"prepublishOnly": "yarn lint && yarn build"
},
"dependencies": {
"@google-cloud/pubsub": "^4.3.3",
"@google-cloud/pubsub": "^4.7.1",
"avsc": "^5.7.7"
},
"devDependencies": {
Expand Down
18 changes: 16 additions & 2 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { IAM, Message, PubSub, Subscription, SubscriptionOptions, Topic } from '@google-cloud/pubsub'
import { google } from '@google-cloud/pubsub/build/protos/protos'
import { SchemaServiceClient, SubscriberClient } from '@google-cloud/pubsub/build/src/v1'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
Expand All @@ -8,6 +9,7 @@ import { ILogger } from './ILogger'
import { JOIN_PRESERVE_NULL, JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS } from './Publisher'
import { SchemaCache } from './SchemaCache'
import { replaceNullsWithUndefined } from './util'
import ISubscription = google.pubsub.v1.ISubscription

export interface IParsedMessage<T = unknown> {
dataParsed: T
Expand Down Expand Up @@ -227,8 +229,11 @@ export class Subscriber<T = unknown> {
await subscription.create({ ...options, gaxOpts: createCallOptions })
this.logger?.info(`PubSub: Subscription ${subscriptionName} is created`)
} else if (options) {
await subscription.setMetadata(options)
this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`)
const [existingSubscription] = await subscription.getMetadata()
if (this.isMetadataChanged(existingSubscription, options)) {
await subscription.setMetadata(options)
this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`)
}
}
}

Expand Down Expand Up @@ -318,4 +323,13 @@ export class Subscriber<T = unknown> {
},
}
}

private isMetadataChanged(existingSubscription: ISubscription, options: ISubscriptionInitializationOptions): boolean {
return options.retryPolicy.minimumBackoff?.seconds &&
String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds ||
options.retryPolicy.maximumBackoff?.seconds &&
String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds ||
!!options.deadLetterPolicy?.maxDeliveryAttempts &&
options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts
}
}
44 changes: 38 additions & 6 deletions packages/pubsub/src/__tests__/Subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ describe('Subscriber', () => {
subscriptionMock.exists.mockReset()
subscriptionMock.create.mockReset()
subscriptionMock.setMetadata.mockReset()
subscriptionMock.getMetadata.mockReset()
subscriptionMock.getMetadata.mockReset()
iamTopicMock.setPolicy.mockReset()
iamSubscriptionMock.setPolicy.mockReset()
schemaClientMock.getSchema.mockReset()
Expand All @@ -73,6 +75,7 @@ describe('Subscriber', () => {
topicMock.exists.mockResolvedValue([false])
subscriptionMock.exists.mockResolvedValue([true])
topicMock.getMetadata.mockResolvedValue([])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand All @@ -84,6 +87,7 @@ describe('Subscriber', () => {
it('does not create topic if exists', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand Down Expand Up @@ -120,9 +124,15 @@ describe('Subscriber', () => {
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

it('updates metadata if subscription exists', async () => {
it('updates metadata if backoff has changed', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{
retryPolicy: {
minimumBackoff: { seconds: '45' },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }
},
}])

await subscriber.initialize()

Expand All @@ -136,20 +146,34 @@ describe('Subscriber', () => {
})
})

it('resets retry policy unless backoff values provided', async () => {
it('does not update metadata if subscription exists and did not change', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{
retryPolicy: {
minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }
},
}])

await subscriber.initialize()

expect(subscriptionMock.create).not.toHaveBeenCalled()
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

it('does not update retry policy if no values provided', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

subscriber = new Subscriber({ topicName, subscriptionName }, clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient)

await subscriber.initialize()

expect(subscriptionMock.create).not.toHaveBeenCalled()
expect(subscriptionMock.setMetadata).toHaveBeenCalledWith({
deadLetterPolicy: null,
retryPolicy: {},
})
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

describe('dead letter policy', () => {
Expand All @@ -176,6 +200,7 @@ describe('Subscriber', () => {
describe('deadLetterTopic initialization', () => {
beforeEach(() => {
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
})

it('creates deadLetterTopic unless exists', async () => {
Expand Down Expand Up @@ -262,6 +287,7 @@ describe('Subscriber', () => {

it('does not create deadLetterSubscription if exists', async () => {
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand Down Expand Up @@ -340,6 +366,7 @@ describe('Subscriber', () => {
it('receives avro parsed data', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE)}])

Expand All @@ -362,6 +389,7 @@ describe('Subscriber', () => {
it('receives avro parsed data with null preserve fields', async () => {
topicMock.exists.mockResolvedValue([false])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}])

Expand All @@ -384,6 +412,7 @@ describe('Subscriber', () => {
it('processes avro encoded data without assigned schema from gcloud', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.listSchemaRevisions.mockResolvedValue([[{revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}]])

Expand All @@ -406,6 +435,7 @@ describe('Subscriber', () => {
it('processes avro encoded data with latest schema when schema from message can not be found', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])

schemaClientMock.getSchema.mockRejectedValue(new Error('NOT_FOUND'))
Expand All @@ -429,6 +459,7 @@ describe('Subscriber', () => {
it('receives avro parsed data and replaces empty array with undefined using path from metadata', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}])

Expand All @@ -452,6 +483,7 @@ describe('Subscriber', () => {
it('receives avro parsed data and replaces 2 empty array with undefined using path from metadata', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}])

Expand Down
1 change: 1 addition & 0 deletions packages/pubsub/src/__tests__/support/pubsubMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const getSubscriptionMock = ({ iamMock }: ISubscriptionMockOption = {}) =
exists: jest.fn(),
create: jest.fn(),
setMetadata: jest.fn(),
getMetadata: jest.fn(),
close: jest.fn(),
open: jest.fn(),
iam: iamMock,
Expand Down
Loading

0 comments on commit ff81351

Please sign in to comment.