Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: JOIN-46521 do not update subscription when no changes #106

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"prepublishOnly": "yarn lint && yarn build"
},
"dependencies": {
"@google-cloud/pubsub": "^4.3.3",
"@google-cloud/pubsub": "^4.7.1",
"avsc": "^5.7.7"
},
"devDependencies": {
Expand Down
18 changes: 16 additions & 2 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { IAM, Message, PubSub, Subscription, SubscriptionOptions, Topic } from '@google-cloud/pubsub'
import { google } from '@google-cloud/pubsub/build/protos/protos'
import { SchemaServiceClient, SubscriberClient } from '@google-cloud/pubsub/build/src/v1'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
Expand All @@ -8,6 +9,7 @@ import { ILogger } from './ILogger'
import { JOIN_PRESERVE_NULL, JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS } from './Publisher'
import { SchemaCache } from './SchemaCache'
import { replaceNullsWithUndefined } from './util'
import ISubscription = google.pubsub.v1.ISubscription

export interface IParsedMessage<T = unknown> {
dataParsed: T
Expand Down Expand Up @@ -227,8 +229,11 @@ export class Subscriber<T = unknown> {
await subscription.create({ ...options, gaxOpts: createCallOptions })
this.logger?.info(`PubSub: Subscription ${subscriptionName} is created`)
} else if (options) {
await subscription.setMetadata(options)
this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`)
const [existingSubscription] = await subscription.getMetadata()
kirpichenko marked this conversation as resolved.
Show resolved Hide resolved
if (this.isMetadataChanged(existingSubscription, options)) {
await subscription.setMetadata(options)
this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`)
}
}
}

Expand Down Expand Up @@ -318,4 +323,13 @@ export class Subscriber<T = unknown> {
},
}
}

private isMetadataChanged(existingSubscription: ISubscription, options: ISubscriptionInitializationOptions): boolean {
return options.retryPolicy.minimumBackoff?.seconds &&
String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds ||
options.retryPolicy.maximumBackoff?.seconds &&
String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds ||
!!options.deadLetterPolicy?.maxDeliveryAttempts &&
options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts
}
}
44 changes: 38 additions & 6 deletions packages/pubsub/src/__tests__/Subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ describe('Subscriber', () => {
subscriptionMock.exists.mockReset()
subscriptionMock.create.mockReset()
subscriptionMock.setMetadata.mockReset()
subscriptionMock.getMetadata.mockReset()
subscriptionMock.getMetadata.mockReset()
iamTopicMock.setPolicy.mockReset()
iamSubscriptionMock.setPolicy.mockReset()
schemaClientMock.getSchema.mockReset()
Expand All @@ -73,6 +75,7 @@ describe('Subscriber', () => {
topicMock.exists.mockResolvedValue([false])
subscriptionMock.exists.mockResolvedValue([true])
topicMock.getMetadata.mockResolvedValue([])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand All @@ -84,6 +87,7 @@ describe('Subscriber', () => {
it('does not create topic if exists', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand Down Expand Up @@ -120,9 +124,15 @@ describe('Subscriber', () => {
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

it('updates metadata if subscription exists', async () => {
it('updates metadata if backoff has changed', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{
retryPolicy: {
minimumBackoff: { seconds: '45' },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }
},
}])

await subscriber.initialize()

Expand All @@ -136,20 +146,34 @@ describe('Subscriber', () => {
})
})

it('resets retry policy unless backoff values provided', async () => {
it('does not update metadata if subscription exists and did not change', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{
retryPolicy: {
minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }
},
}])

await subscriber.initialize()

expect(subscriptionMock.create).not.toHaveBeenCalled()
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

it('does not update retry policy if no values provided', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

subscriber = new Subscriber({ topicName, subscriptionName }, clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient)

await subscriber.initialize()

expect(subscriptionMock.create).not.toHaveBeenCalled()
expect(subscriptionMock.setMetadata).toHaveBeenCalledWith({
deadLetterPolicy: null,
retryPolicy: {},
})
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

describe('dead letter policy', () => {
Expand All @@ -176,6 +200,7 @@ describe('Subscriber', () => {
describe('deadLetterTopic initialization', () => {
beforeEach(() => {
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
})

it('creates deadLetterTopic unless exists', async () => {
Expand Down Expand Up @@ -262,6 +287,7 @@ describe('Subscriber', () => {

it('does not create deadLetterSubscription if exists', async () => {
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])

await subscriber.initialize()

Expand Down Expand Up @@ -340,6 +366,7 @@ describe('Subscriber', () => {
it('receives avro parsed data', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE)}])

Expand All @@ -362,6 +389,7 @@ describe('Subscriber', () => {
it('receives avro parsed data with null preserve fields', async () => {
topicMock.exists.mockResolvedValue([false])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}])

Expand All @@ -384,6 +412,7 @@ describe('Subscriber', () => {
it('processes avro encoded data without assigned schema from gcloud', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.listSchemaRevisions.mockResolvedValue([[{revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}]])

Expand All @@ -406,6 +435,7 @@ describe('Subscriber', () => {
it('processes avro encoded data with latest schema when schema from message can not be found', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])

schemaClientMock.getSchema.mockRejectedValue(new Error('NOT_FOUND'))
Expand All @@ -429,6 +459,7 @@ describe('Subscriber', () => {
it('receives avro parsed data and replaces empty array with undefined using path from metadata', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}])

Expand All @@ -452,6 +483,7 @@ describe('Subscriber', () => {
it('receives avro parsed data and replaces 2 empty array with undefined using path from metadata', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{}])
topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}])
schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}])

Expand Down
1 change: 1 addition & 0 deletions packages/pubsub/src/__tests__/support/pubsubMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const getSubscriptionMock = ({ iamMock }: ISubscriptionMockOption = {}) =
exists: jest.fn(),
create: jest.fn(),
setMetadata: jest.fn(),
getMetadata: jest.fn(),
close: jest.fn(),
open: jest.fn(),
iam: iamMock,
Expand Down
Loading
Loading