Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: JOIN-47413 clean up pubsub lib from schemaless approach #107

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions packages/pubsub/src/DataParser.ts

This file was deleted.

154 changes: 21 additions & 133 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { readFileSync } from 'fs'
import { PubSub, Topic } from '@google-cloud/pubsub'
import { google } from '@google-cloud/pubsub/build/protos/protos'
import { MessageOptions } from '@google-cloud/pubsub/build/src/topic'
import { Schema, Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { FieldsProcessor } from './FieldsProcessor'
Expand Down Expand Up @@ -30,38 +29,29 @@ export class Publisher<T = unknown> {
private readonly topic: Topic
private readonly topicSchemaName: string

private readonly writerAvroType?: Type
private readonly readerAvroType?: Type
private readonly writerAvroType: Type
private readonly readerAvroType: Type
private readonly optionArrayPaths?: string[]

private readonly avroMessageMetadata?: Record<string, string>
private readonly fieldsProcessor = new FieldsProcessor()
//TODO: remove fields below, when only avro will be used, why we use jsonPublisher described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534
private jsonPublisher: Topic
private topicHasAssignedSchema = false
private avroSchemasProvided = false

constructor(
public readonly topicName: string,
public readonly client: PubSub,
avroSchemas: { writer: object; reader: object },
private readonly logger?: ILogger,
avroSchemas?: { writer: object; reader: object },
) {
//TODO: avroSchemas parameter should be mandatory when only avro is used
if (avroSchemas) {
this.avroSchemasProvided = true
const writerAvroSchema: SchemaWithMetadata = avroSchemas.writer as SchemaWithMetadata
this.writerAvroType = Type.forSchema(writerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })
if (writerAvroSchema.OptionalArrayPaths) {
this.optionArrayPaths = writerAvroSchema.OptionalArrayPaths.split(',')
}
const readerAvroSchema: SchemaWithMetadata = avroSchemas.reader as SchemaWithMetadata
this.readerAvroType = Type.forSchema(readerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
const writerAvroSchema: SchemaWithMetadata = avroSchemas.writer as SchemaWithMetadata
this.writerAvroType = Type.forSchema(writerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })
if (writerAvroSchema.OptionalArrayPaths) {
this.optionArrayPaths = writerAvroSchema.OptionalArrayPaths.split(',')
}
const readerAvroSchema: SchemaWithMetadata = avroSchemas.reader as SchemaWithMetadata
this.readerAvroType = Type.forSchema(readerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
this.topic = client.topic(topicName)
this.jsonPublisher = client.topic(topicName, { gaxOpts: { retry: null } })
this.topicSchemaName = `${this.topicName}-generated-avro`
}

Expand All @@ -76,51 +66,7 @@ export class Publisher<T = unknown> {
}

public async publishMsg(data: T): Promise<void> {
if (!this.avroSchemasProvided) {
// old flow, just send message if no avro schemas provided
await this.sendJsonMessage({ json: data })
} else if (!this.topicHasAssignedSchema) {
try {
// on startup we send json message, and we are relying on switching to avro inside try-catch block.
// if json schema matches avro schema, which can only be if we have only one field with mandatory array of primitives
// we will send json message to avro topic, which will work, because JSON payload matches Avro encoded JSON payload
// we are losing some metadata because of this, but we are fine with this, because it's a only couple of events,
// that are not that important. Solution for this should be removing JSON, and using only avro.
// PR that can be used as base for this work: https://github.com/join-com/pubsub/pull/97
await this.sendJsonMessage({ json: data })
this.logWarnIfMessageViolatesSchema(data)
} catch (e) {
//it's a corner case when application started without schema on topic, and then schema was added to the topic
//in this case we are trying to resend message with avro format if schema appeared
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()
if (!this.topicHasAssignedSchema) {
throw e
}
await this.sendAvroMessage(data)
}
} else {
// TODO: remove everything except this call after services will be ready to use only avro
await this.sendAvroMessage(data)
}
}

private logWarnIfMessageViolatesSchema(data: T): void {
if (this.writerAvroType) {
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
}
const invalidPaths: string[] = []
if (!this.writerAvroType.isValid(data, { errorHook: path => invalidPaths.push(path.join('.')) })) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, {
payload: data,
metadata: this.avroMessageMetadata,
invalidPaths,
})
}
}
await this.sendAvroMessage(data)
}

public async flush(): Promise<void> {
Expand All @@ -139,54 +85,17 @@ export class Publisher<T = unknown> {
}

private async initializeTopicSchema(): Promise<void> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && (await this.doesRegistryHaveTopicSchema())) {
// TODO: this.setSchemaToTheTopic() should be replace with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
// once https://github.com/googleapis/nodejs-pubsub/issues/1587 is fixed
this.setSchemaToTheTopic()
if (!await this.doesTopicHaveSchemaAssigned()) {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
}
const schema = `projects/${projectName}/schemas/${this.topicSchemaName}`
await this.topic.setMetadata({ schemaSettings: { schema, encoding: Encoding.JSON }})
this.logger?.info(`PubSub: Schema ${schema} is assigned to the topic: ${this.topicName}`)
}
}

private setSchemaToTheTopic() {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
}

this.topic.request(
{
client: 'PublisherClient',
method: 'updateTopic',
reqOpts: {
topic: {
name: `projects/${projectName}/topics/${this.topicName}`,
schemaSettings: {
schema: `projects/${projectName}/schemas/${this.topicSchemaName}`,
encoding: Encoding.JSON,
},
},
updateMask: {
paths: ['schema_settings'],
},
},
gaxOpts: {},
},
(err, _) => {
if (!err) {
this.topicHasAssignedSchema = true
this.logger?.info(`Schema '${this.topicSchemaName}' set to the topic '${this.topicName}'`)
} else {
this.logger?.error(`Couldn't set schema '${this.topicSchemaName}' to the topic '${this.topicName}'`)
}
},
)
}

private async sendAvroMessage(data: T): Promise<void> {
let currentMessageMetadata = this.avroMessageMetadata
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
Expand All @@ -199,10 +108,7 @@ export class Publisher<T = unknown> {
currentMessageMetadata[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS] = undefinedOrNullOptionalArrays.join(',')
}
}
// TODO: remove non-null assertion and eslint-disable when avroType will be mandatory on every topic
// for now we are checking that avro is enabled before calling sendAvroMessage
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (!this.writerAvroType!.isValid(data)) {
if (!this.writerAvroType.isValid(data)) {
this.logger?.error(
`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`,
Expand All @@ -214,20 +120,11 @@ export class Publisher<T = unknown> {
logWarnWhenUndefinedInNullPreserveFields(data, currentMessageMetadata[JOIN_PRESERVE_NULL], this.logger)
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const buffer = Buffer.from(this.readerAvroType!.toString(data))
const buffer = Buffer.from(this.readerAvroType.toString(data))
const messageId = await this.topic.publishMessage({ data: buffer, attributes: currentMessageMetadata })
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
}

private async sendJsonMessage(message: MessageOptions) {
const messageId = await this.jsonPublisher.publishMessage(message)
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, {
data: message.json as unknown,
messageId,
})
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
const metadata: Record<string, string> = {}

Expand Down Expand Up @@ -257,13 +154,4 @@ export class Publisher<T = unknown> {
const schemaName = metadata?.schemaSettings?.schema
return !!schemaName
}

public async doesRegistryHaveTopicSchema(): Promise<boolean> {
try {
return !!(await this.client.schema(this.topicSchemaName).get())
} catch (e) {
this.logger?.info(`Schema ${this.topicSchemaName} can't be found`, e)
return false
}
}
}
17 changes: 1 addition & 16 deletions packages/pubsub/src/PublisherFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@ export class PublisherFactory<TypeMap> {
}

public getPublisher<Topic extends keyof TypeMap> (topic: Topic): IPublisher<TypeMap[Topic]> {
return new Publisher(topic.toString(), this.client, this.logger, this.avroSchemas[topic])
}
}

/**
* @deprecated should be used only when migration of the events/commands is not possible
*/
export class PublisherFactorySchemaless<TypeMap> {
private readonly client: PubSub

constructor(private readonly logger: ILogger, private readonly avroSchemas?: Record<keyof TypeMap, { writer: object, reader: object }>) {
this.client = new PubSub()
}

public getPublisher<Topic extends keyof TypeMap> (topic: Topic): IPublisher<TypeMap[Topic]> {
return new Publisher(topic.toString(), this.client, this.logger, this.avroSchemas?.[topic])
return new Publisher(topic.toString(), this.client, this.avroSchemas[topic], this.logger)
}
}
30 changes: 10 additions & 20 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { google } from '@google-cloud/pubsub/build/protos/protos'
import { SchemaServiceClient, SubscriberClient } from '@google-cloud/pubsub/build/src/v1'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { DataParser } from './DataParser'
import { FieldsProcessor } from './FieldsProcessor'
import { ILogger } from './ILogger'
import { JOIN_PRESERVE_NULL, JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS } from './Publisher'
Expand Down Expand Up @@ -143,30 +142,21 @@ export class Subscriber<T = unknown> {
}

private async parseData(message: Message): Promise<T> {
let dataParsed: T
let schemaId = message.attributes['googclient_schemarevisionid']

// TODO: fix for the first couple of messages, that don't have "googclient_schemarevisionid" after the schema is assigned
// Ticket for Google Cloud will be created
if (!schemaId && message.attributes['join_avdl_schema_version']) {
if (!schemaId) {
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, { message })
schemaId = await this.schemaCache.getLatestSchemaRevisionId()
}

// TODO: remove if else block as only avro should be used, throw error if there is no schema revision
if (schemaId) {
dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
} else {
const dataParser = new DataParser()
dataParsed = dataParser.parse(message.data) as T
const dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
this.logMessage(message, dataParsed)
return dataParsed
}
Expand Down
Loading
Loading