From d55cb3e37fd40c9c9d37371ad899daf9d8dfe76a Mon Sep 17 00:00:00 2001 From: Ievgen Kyrpychenko Date: Fri, 20 Dec 2024 10:46:20 +0100 Subject: [PATCH 1/5] feat: TECH skip message processing when the message parsing fails --- packages/pubsub/src/Subscriber.ts | 50 ++- .../pubsub/src/__tests__/Subscriber.test.ts | 303 ++++++++++++------ 2 files changed, 234 insertions(+), 119 deletions(-) diff --git a/packages/pubsub/src/Subscriber.ts b/packages/pubsub/src/Subscriber.ts index f583f55..55b6a25 100644 --- a/packages/pubsub/src/Subscriber.ts +++ b/packages/pubsub/src/Subscriber.ts @@ -38,7 +38,7 @@ export interface ISubscriptionOptions { name: string id: number } - labels?: ({ [k: string]: string } | null) + labels?: { [k: string]: string } | null filter?: string } @@ -61,7 +61,7 @@ interface ISubscriptionDeadLetterPolicy { interface ISubscriptionInitializationOptions { deadLetterPolicy: ISubscriptionDeadLetterPolicy | null retryPolicy: ISubscriptionRetryPolicy - labels?: ({ [k: string]: string } | null); + labels?: { [k: string]: string } | null filter?: string } @@ -152,7 +152,9 @@ export class Subscriber { private async parseData(message: Message): Promise { let schemaId = message.attributes['googclient_schemarevisionid'] if (!schemaId) { - this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, { message }) + this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, { + message, + }) schemaId = await this.schemaCache.getLatestSchemaRevisionId() } @@ -178,7 +180,17 @@ export class Subscriber { asyncCallback: (msg: IParsedMessage, info: IMessageInfo) => Promise, ): (message: Message) => void { const asyncMessageProcessor = async (message: Message) => { - const dataParsed = await this.parseData(message) + const dataParsed = await this.parseData(message).catch(e => { + this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Failed to parse message:`, e) + return undefined + }) + + // If message parsing failed, nack the message and skip processing + if (!dataParsed) { + message.nack() + return + } + const messageParsed = Object.assign(message, { dataParsed }) const info: IMessageInfo = { id: message.id, @@ -229,8 +241,10 @@ export class Subscriber { } else if (options) { const [existingSubscription] = await subscription.getMetadata() if ((options.filter || existingSubscription.filter) && options.filter != existingSubscription.filter) { - throw new Error(`PubSub: Subscriptions filters are immutable, they can't be changed, subscription: ${subscriptionName},` + - ` currentFilter: ${existingSubscription.filter as string}, newFilter: ${options.filter as string}`) + throw new Error( + `PubSub: Subscriptions filters are immutable, they can't be changed, subscription: ${subscriptionName},` + + ` currentFilter: ${existingSubscription.filter as string}, newFilter: ${options.filter as string}`, + ) } if (this.isMetadataChanged(existingSubscription, options)) { await subscription.setMetadata(options) @@ -335,20 +349,28 @@ export class Subscriber { } private isMetadataChanged(existingSubscription: ISubscription, options: ISubscriptionInitializationOptions): boolean { - if (options.retryPolicy.minimumBackoff?.seconds && - String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds) { + if ( + options.retryPolicy.minimumBackoff?.seconds && + String(options.retryPolicy.minimumBackoff.seconds) !== existingSubscription.retryPolicy?.minimumBackoff?.seconds + ) { return true } - if (options.retryPolicy.maximumBackoff?.seconds && - String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds) { + if ( + options.retryPolicy.maximumBackoff?.seconds && + String(options.retryPolicy.maximumBackoff.seconds) !== existingSubscription.retryPolicy?.maximumBackoff?.seconds + ) { return true } - if (!!options.deadLetterPolicy?.maxDeliveryAttempts && - options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts) { + if ( + !!options.deadLetterPolicy?.maxDeliveryAttempts && + options.deadLetterPolicy.maxDeliveryAttempts !== existingSubscription.deadLetterPolicy?.maxDeliveryAttempts + ) { return true } - if (!!options.labels && JSON.stringify(existingSubscription.labels) !== JSON.stringify(options.labels) - || options.labels == null && !!existingSubscription.labels && Object.keys(existingSubscription.labels).length !== 0) { + if ( + (!!options.labels && JSON.stringify(existingSubscription.labels) !== JSON.stringify(options.labels)) || + (options.labels == null && !!existingSubscription.labels && Object.keys(existingSubscription.labels).length !== 0) + ) { return true } diff --git a/packages/pubsub/src/__tests__/Subscriber.test.ts b/packages/pubsub/src/__tests__/Subscriber.test.ts index 976e66c..72d4256 100644 --- a/packages/pubsub/src/__tests__/Subscriber.test.ts +++ b/packages/pubsub/src/__tests__/Subscriber.test.ts @@ -29,10 +29,13 @@ const iamTopicMock = getIamMock() const topicMock = getTopicMock({ subscriptionMock, iamMock: iamTopicMock }) const clientMock = getClientMock({ topicMock }) const schemaClientMock = schemaServiceClientMock -const type = Type.forSchema(SCHEMA_DEFINITION_EXAMPLE as Schema, {logicalTypes: {'timestamp-micros': DateType}}) -const readerTypeWithArrays = Type.forSchema(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE as Schema, - {logicalTypes: {'timestamp-micros': DateType}}) -const typeWithPreserveNull = Type.forSchema(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE as Schema, {logicalTypes: {'timestamp-micros': DateType}}) +const type = Type.forSchema(SCHEMA_DEFINITION_EXAMPLE as Schema, { logicalTypes: { 'timestamp-micros': DateType } }) +const readerTypeWithArrays = Type.forSchema(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE as Schema, { + logicalTypes: { 'timestamp-micros': DateType }, +}) +const typeWithPreserveNull = Type.forSchema(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE as Schema, { + logicalTypes: { 'timestamp-micros': DateType }, +}) const flushPromises = () => new Promise(setImmediate) const subscriptionOptions: ISubscriptionOptions = { @@ -42,7 +45,7 @@ const subscriptionOptions: ISubscriptionOptions = { maxStreams: 1, minBackoffSeconds: 1, maxBackoffSeconds: 10, - labels: { testKey: 'testValue'} + labels: { testKey: 'testValue' }, } const loggerMock = jest.mocked({ @@ -56,8 +59,13 @@ describe('Subscriber', () => { beforeEach(() => { process.env['GCLOUD_PROJECT'] = 'project' - subscriber = new Subscriber({ topicName, subscriptionName, subscriptionOptions }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, new ConsoleLogger()) + subscriber = new Subscriber( + { topicName, subscriptionName, subscriptionOptions }, + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + new ConsoleLogger(), + ) }) afterEach(() => { @@ -137,14 +145,20 @@ describe('Subscriber', () => { it('creates subscription with filter', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([false]) - const subscriberWithFilter = new Subscriber({ - topicName, subscriptionName, + const subscriberWithFilter = new Subscriber( + { + topicName, + subscriptionName, subscriptionOptions: { ...subscriptionOptions, filter: 'attributes.testKey="testValue"', }, - }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, new ConsoleLogger()) + }, + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + new ConsoleLogger(), + ) await subscriberWithFilter.initialize() @@ -156,32 +170,44 @@ describe('Subscriber', () => { }, labels: subscriptionOptions.labels, gaxOpts: createCallOptions, - filter: 'attributes.testKey="testValue"' + filter: 'attributes.testKey="testValue"', }) }) it('throws error if filter has changed', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - filter: 'attributes.testKey="currentValue"', - }]) + subscriptionMock.getMetadata.mockResolvedValue([ + { + filter: 'attributes.testKey="currentValue"', + }, + ]) - const subscriberWithFilter = new Subscriber({ - topicName, subscriptionName, + const subscriberWithFilter = new Subscriber( + { + topicName, + subscriptionName, subscriptionOptions: { ...subscriptionOptions, filter: 'attributes.testKey="newValue"', }, - }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, loggerMock) + }, + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + loggerMock, + ) const processAbortSpy = jest.spyOn(process, 'abort').mockImplementation() await subscriberWithFilter.initialize() - expect(loggerMock.error).toHaveBeenCalledWith('PubSub: Failed to initialize subscriber subscription-name', - new Error('PubSub: Subscriptions filters are immutable, they can\'t be changed, subscription: subscription-name, ' + - 'currentFilter: attributes.testKey="currentValue", newFilter: attributes.testKey="newValue"')) + expect(loggerMock.error).toHaveBeenCalledWith( + 'PubSub: Failed to initialize subscriber subscription-name', + new Error( + "PubSub: Subscriptions filters are immutable, they can't be changed, subscription: subscription-name, " + + 'currentFilter: attributes.testKey="currentValue", newFilter: attributes.testKey="newValue"', + ), + ) processAbortSpy.mockClear() }) @@ -189,17 +215,25 @@ describe('Subscriber', () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) // google cloud returns filter as empty string when no filter is set - subscriptionMock.getMetadata.mockResolvedValue([{ - filter: '', - }]) + subscriptionMock.getMetadata.mockResolvedValue([ + { + filter: '', + }, + ]) - const subscriberWithFilter = new Subscriber({ - topicName, subscriptionName, + const subscriberWithFilter = new Subscriber( + { + topicName, + subscriptionName, subscriptionOptions: { ...subscriptionOptions, }, - }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, loggerMock) + }, + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + loggerMock, + ) await subscriberWithFilter.initialize() @@ -209,12 +243,14 @@ describe('Subscriber', () => { it('updates metadata if backoff has changed', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - retryPolicy: { - minimumBackoff: { seconds: '45' }, - maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) } + subscriptionMock.getMetadata.mockResolvedValue([ + { + retryPolicy: { + minimumBackoff: { seconds: '45' }, + maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }, + }, }, - }]) + ]) await subscriber.initialize() @@ -232,13 +268,15 @@ describe('Subscriber', () => { it('does not update metadata if subscription exists and did not change', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - retryPolicy: { + subscriptionMock.getMetadata.mockResolvedValue([ + { + retryPolicy: { minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) }, - maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) } + maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }, + }, + labels: subscriptionOptions.labels, }, - labels: subscriptionOptions.labels, - }]) + ]) await subscriber.initialize() @@ -251,8 +289,12 @@ describe('Subscriber', () => { subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - subscriber = new Subscriber({ topicName, subscriptionName }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient) + subscriber = new Subscriber( + { topicName, subscriptionName }, + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + ) await subscriber.initialize() @@ -263,9 +305,11 @@ describe('Subscriber', () => { it('updates labels if labels have changed', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - labels: { testKey: 'oldValue' }, - }]) + subscriptionMock.getMetadata.mockResolvedValue([ + { + labels: { testKey: 'oldValue' }, + }, + ]) await subscriber.initialize() @@ -283,17 +327,23 @@ describe('Subscriber', () => { it('updates labels if labels were not empty and now are set to null', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - retryPolicy: { - minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) }, - maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) } + subscriptionMock.getMetadata.mockResolvedValue([ + { + retryPolicy: { + minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) }, + maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }, + }, + labels: { testKey: 'testValue' }, }, - labels: { testKey: 'testValue' }, - }]) + ]) const optionsWithNullLabels = { ...subscriptionOptions, labels: null } - subscriber = new Subscriber({ topicName, subscriptionName, subscriptionOptions: optionsWithNullLabels }, + subscriber = new Subscriber( + { topicName, subscriptionName, subscriptionOptions: optionsWithNullLabels }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, new ConsoleLogger()) + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + new ConsoleLogger(), + ) await subscriber.initialize() @@ -311,17 +361,23 @@ describe('Subscriber', () => { it('does not update labels if labels were empty and now are set to null', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{ - retryPolicy: { - minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) }, - maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) } + subscriptionMock.getMetadata.mockResolvedValue([ + { + retryPolicy: { + minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) }, + maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) }, + }, + labels: {}, }, - labels: {}, - }]) + ]) const optionsWithNullLabels = { ...subscriptionOptions, labels: null } - subscriber = new Subscriber({ topicName, subscriptionName, subscriptionOptions: optionsWithNullLabels }, + subscriber = new Subscriber( + { topicName, subscriptionName, subscriptionOptions: optionsWithNullLabels }, clientMock as unknown as PubSub, - schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, new ConsoleLogger()) + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, + new ConsoleLogger(), + ) await subscriber.initialize() @@ -329,8 +385,6 @@ describe('Subscriber', () => { expect(subscriptionMock.setMetadata).not.toHaveBeenCalled() }) - - describe('dead letter policy', () => { const deadLetterTopicName = 'subscription-name-unack' const deadLetterSubscriptionName = 'subscription-name-unack' @@ -347,8 +401,9 @@ describe('Subscriber', () => { beforeEach(() => { subscriber = new Subscriber( { topicName, subscriptionName, subscriptionOptions: deadLetterOptions }, - clientMock as unknown as PubSub, schemaClientMock as unknown as SchemaServiceClient, - undefined as unknown as SubscriberClient + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, ) }) @@ -396,8 +451,9 @@ describe('Subscriber', () => { const emptyOptions = {} const optionlessSubscriber = new Subscriber( { topicName, subscriptionName, subscriptionOptions: emptyOptions }, - clientMock as unknown as PubSub, schemaClientMock as unknown as SchemaServiceClient, - undefined as unknown as SubscriberClient + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, ) await optionlessSubscriber.initialize() @@ -455,8 +511,9 @@ describe('Subscriber', () => { const emptyOptions = {} const optionlessSubscriber = new Subscriber( { topicName, subscriptionName, subscriptionOptions: emptyOptions }, - clientMock as unknown as PubSub, schemaClientMock as unknown as SchemaServiceClient, - undefined as unknown as SubscriberClient + clientMock as unknown as PubSub, + schemaClientMock as unknown as SchemaServiceClient, + undefined as unknown as SubscriberClient, ) await optionlessSubscriber.initialize() @@ -494,19 +551,26 @@ describe('Subscriber', () => { }) describe('start', () => { - const avroData = { first: 'one', second: 'two', third: undefined, - createdAt: new Date('Thu Nov 05 2015 11:38:05 GMT-0800 (PST)')} - const avroDataPreserveNullMessageFromAvro = { first: 'one', second: 'two', third: undefined, + const avroData = { + first: 'one', + second: 'two', + third: undefined, createdAt: new Date('Thu Nov 05 2015 11:38:05 GMT-0800 (PST)'), - now: { id: null, firstName: null }} - + } + const avroDataPreserveNullMessageFromAvro = { + first: 'one', + second: 'two', + third: undefined, + createdAt: new Date('Thu Nov 05 2015 11:38:05 GMT-0800 (PST)'), + now: { id: null, firstName: null }, + } it('receives avro parsed data', async () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) - schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE)}]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) + schemaClientMock.getSchema.mockResolvedValue([{ definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE) }]) await subscriber.initialize() @@ -524,13 +588,17 @@ describe('Subscriber', () => { topicMock.exists.mockResolvedValue([false]) subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) - schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) + schemaClientMock.getSchema.mockResolvedValue([ + { definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE) }, + ]) await subscriber.initialize() - const messageMock = getMessageMock(Buffer.from(typeWithPreserveNull.toString(avroDataPreserveNullMessageFromAvro))) - messageMock.attributes = {'googclient_schemarevisionid': 'example', 'join_preserve_null': 'now'} + const messageMock = getMessageMock( + Buffer.from(typeWithPreserveNull.toString(avroDataPreserveNullMessageFromAvro)), + ) + messageMock.attributes = { googclient_schemarevisionid: 'example', join_preserve_null: 'now' } let parsedMessage: IParsedMessage | undefined subscriber.start(msg => { @@ -547,13 +615,15 @@ describe('Subscriber', () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) - schemaClientMock.listSchemaRevisions.mockResolvedValue([[{revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}]]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) + schemaClientMock.listSchemaRevisions.mockResolvedValue([ + [{ revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE) }], + ]) await subscriber.initialize() const messageMock = getMessageMock(Buffer.from(type.toString(avroData))) - messageMock.attributes = { } + messageMock.attributes = {} let parsedMessage: IParsedMessage | undefined subscriber.start(msg => { @@ -567,22 +637,24 @@ describe('Subscriber', () => { }) it('processes avro encoded data with latest schema when schema from message can not be found', async () => { - topicMock.exists.mockResolvedValue([true]) - subscriptionMock.exists.mockResolvedValue([true]) - subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) + topicMock.exists.mockResolvedValue([true]) + subscriptionMock.exists.mockResolvedValue([true]) + subscriptionMock.getMetadata.mockResolvedValue([{}]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) - schemaClientMock.getSchema.mockRejectedValue(new Error('NOT_FOUND')) - schemaClientMock.listSchemaRevisions.mockResolvedValue([[{revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE)}]]) - await subscriber.initialize() + schemaClientMock.getSchema.mockRejectedValue(new Error('NOT_FOUND')) + schemaClientMock.listSchemaRevisions.mockResolvedValue([ + [{ revisionId: 'revision', definition: JSON.stringify(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE) }], + ]) + await subscriber.initialize() - const messageMock = getMessageMock(Buffer.from(type.toString(avroData))) + const messageMock = getMessageMock(Buffer.from(type.toString(avroData))) - let parsedMessage: IParsedMessage | undefined - subscriber.start(msg => { - parsedMessage = msg - return Promise.resolve() - }) + let parsedMessage: IParsedMessage | undefined + subscriber.start(msg => { + parsedMessage = msg + return Promise.resolve() + }) await subscriptionMock.receiveMessage(messageMock) await flushPromises() @@ -593,15 +665,20 @@ describe('Subscriber', () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) - schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) + schemaClientMock.getSchema.mockResolvedValue([ + { definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE) }, + ]) await subscriber.initialize() const publishedMessage = { first: 'one', tags: ['tag'] } const receivedMessage = { first: 'one', tags: ['tag'], languages: [] } - const messageMock = getMessageMock(Buffer.from(readerTypeWithArrays.toString(receivedMessage))) - messageMock.attributes = {'googclient_schemarevisionid': 'example', 'join_undefined_or_null_optional_arrays': 'languages' } + const messageMock = getMessageMock(Buffer.from(readerTypeWithArrays.toString(receivedMessage))) + messageMock.attributes = { + googclient_schemarevisionid: 'example', + join_undefined_or_null_optional_arrays: 'languages', + } let parsedMessage: IParsedMessage | undefined subscriber.start(msg => { parsedMessage = msg @@ -617,15 +694,20 @@ describe('Subscriber', () => { topicMock.exists.mockResolvedValue([true]) subscriptionMock.exists.mockResolvedValue([true]) subscriptionMock.getMetadata.mockResolvedValue([{}]) - topicMock.getMetadata.mockResolvedValue([{'schemaSettings': {'schema': 'mock-schema'}}]) - schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE)}]) + topicMock.getMetadata.mockResolvedValue([{ schemaSettings: { schema: 'mock-schema' } }]) + schemaClientMock.getSchema.mockResolvedValue([ + { definition: JSON.stringify(SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE) }, + ]) await subscriber.initialize() - const publishedMessage = { first: 'one'} - const receivedMessage = { first: 'one', tags: [], languages: []} + const publishedMessage = { first: 'one' } + const receivedMessage = { first: 'one', tags: [], languages: [] } const messageMock = getMessageMock(Buffer.from(readerTypeWithArrays.toString(receivedMessage))) - messageMock.attributes = {'googclient_schemarevisionid': 'example', 'join_undefined_or_null_optional_arrays': 'languages,tags'} + messageMock.attributes = { + googclient_schemarevisionid: 'example', + join_undefined_or_null_optional_arrays: 'languages,tags', + } let parsedMessage: IParsedMessage | undefined subscriber.start(msg => { parsedMessage = msg @@ -638,7 +720,7 @@ describe('Subscriber', () => { }) it('unacknowledges message if processing fails', async () => { - schemaClientMock.getSchema.mockResolvedValue([{definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE)}]) + schemaClientMock.getSchema.mockResolvedValue([{ definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE) }]) subscriber.start(() => Promise.reject('Something wrong')) const messageMock = getMessageMock(Buffer.from(type.toString(avroData))) @@ -647,6 +729,17 @@ describe('Subscriber', () => { expect(messageMock.nack).toHaveBeenCalled() }) + + it('unacknowledges message if message parsing fails', async () => { + schemaClientMock.getSchema.mockResolvedValue([{ definition: JSON.stringify(SCHEMA_DEFINITION_EXAMPLE) }]) + subscriber.start(() => Promise.resolve()) + + const messageMock = getMessageMock(Buffer.from('invalid-message')) + await subscriptionMock.receiveMessage(messageMock) + await flushPromises() + + expect(messageMock.nack).toHaveBeenCalled() + }) }) describe('stop', () => { From 2776e749f3c822dbbec73ce00a68d6dd34a931d0 Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Tue, 31 Dec 2024 16:22:46 +0100 Subject: [PATCH 2/5] fix: TECH do not send filter for update --- packages/pubsub/src/Subscriber.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/pubsub/src/Subscriber.ts b/packages/pubsub/src/Subscriber.ts index 55b6a25..284036e 100644 --- a/packages/pubsub/src/Subscriber.ts +++ b/packages/pubsub/src/Subscriber.ts @@ -247,7 +247,11 @@ export class Subscriber { ) } if (this.isMetadataChanged(existingSubscription, options)) { - await subscription.setMetadata(options) + await subscription.setMetadata({ + deadLetterPolicy: options.deadLetterPolicy, + retryPolicy: options.retryPolicy, + labels: options?.labels + }) this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`) } } From 63e2e75c322e686738323dbce5350313d11f1076 Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Tue, 31 Dec 2024 16:27:50 +0100 Subject: [PATCH 3/5] fix: TECH trigger release --- packages/pubsub/src/Subscriber.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pubsub/src/Subscriber.ts b/packages/pubsub/src/Subscriber.ts index 284036e..94ed15d 100644 --- a/packages/pubsub/src/Subscriber.ts +++ b/packages/pubsub/src/Subscriber.ts @@ -248,8 +248,8 @@ export class Subscriber { } if (this.isMetadataChanged(existingSubscription, options)) { await subscription.setMetadata({ - deadLetterPolicy: options.deadLetterPolicy, retryPolicy: options.retryPolicy, + deadLetterPolicy: options.deadLetterPolicy, labels: options?.labels }) this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`) From 1fb49f07fcb00564d1337a6a96ac8bfa78dff92d Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Thu, 9 Jan 2025 13:36:09 +0100 Subject: [PATCH 4/5] fix: JOIN-49692 enable topic exist check in subscription only by env variable --- packages/pubsub/src/Subscriber.ts | 4 +++- packages/pubsub/src/__tests__/Subscriber.test.ts | 10 ++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/pubsub/src/Subscriber.ts b/packages/pubsub/src/Subscriber.ts index 94ed15d..12745f8 100644 --- a/packages/pubsub/src/Subscriber.ts +++ b/packages/pubsub/src/Subscriber.ts @@ -109,7 +109,9 @@ export class Subscriber { public async initialize(): Promise { try { - await this.initializeTopic(this.topicName, this.topic) + if (process.env['PUBSUB_SUBSCRIPTION_TOPIC_INIT'] === 'true') { + await this.initializeTopic(this.topicName, this.topic) + } await this.initializeDeadLetterTopic() diff --git a/packages/pubsub/src/__tests__/Subscriber.test.ts b/packages/pubsub/src/__tests__/Subscriber.test.ts index 72d4256..cdc2ed1 100644 --- a/packages/pubsub/src/__tests__/Subscriber.test.ts +++ b/packages/pubsub/src/__tests__/Subscriber.test.ts @@ -59,6 +59,7 @@ describe('Subscriber', () => { beforeEach(() => { process.env['GCLOUD_PROJECT'] = 'project' + process.env['PUBSUB_SUBSCRIPTION_TOPIC_INIT'] = undefined subscriber = new Subscriber( { topicName, subscriptionName, subscriptionOptions }, clientMock as unknown as PubSub, @@ -88,7 +89,8 @@ describe('Subscriber', () => { }) describe('initialize', () => { - it('creates topic unless exists', async () => { + it('creates topic unless exists if PUBSUB_SUBSCRIPTION_TOPIC_INIT equals true', async () => { + process.env['PUBSUB_SUBSCRIPTION_TOPIC_INIT'] = 'true' topicMock.exists.mockResolvedValue([false]) subscriptionMock.exists.mockResolvedValue([true]) topicMock.getMetadata.mockResolvedValue([]) @@ -418,8 +420,8 @@ describe('Subscriber', () => { await subscriber.initialize() - expect(topicMock.create).toHaveBeenCalledTimes(2) - expect(clientMock.topic).toHaveBeenLastCalledWith(deadLetterTopicName) + expect(topicMock.create).toHaveBeenCalledTimes(1) + expect(clientMock.topic).toHaveBeenCalledWith(deadLetterTopicName) }) it('adds publisher role to pubsub service account', async () => { @@ -458,7 +460,7 @@ describe('Subscriber', () => { await optionlessSubscriber.initialize() - expect(topicMock.create).toHaveBeenCalledTimes(1) + expect(topicMock.create).not.toHaveBeenCalled() expect(clientMock.topic).toHaveBeenLastCalledWith(topicName) expect(iamTopicMock.setPolicy).not.toHaveBeenCalled() }) From f7a6006e77350ac35dd5cf8504e061c0ef837f48 Mon Sep 17 00:00:00 2001 From: Eugene Taran Date: Thu, 9 Jan 2025 13:39:47 +0100 Subject: [PATCH 5/5] trigger commit