Skip to content

Commit

Permalink
feat: JOIN-47413 clean up pubsub lib from schemaless approach (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran authored Oct 18, 2024
1 parent ff81351 commit e229315
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 277 deletions.
15 changes: 0 additions & 15 deletions packages/pubsub/src/DataParser.ts

This file was deleted.

154 changes: 21 additions & 133 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { readFileSync } from 'fs'
import { PubSub, Topic } from '@google-cloud/pubsub'
import { google } from '@google-cloud/pubsub/build/protos/protos'
import { MessageOptions } from '@google-cloud/pubsub/build/src/topic'
import { Schema, Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { FieldsProcessor } from './FieldsProcessor'
Expand Down Expand Up @@ -30,38 +29,29 @@ export class Publisher<T = unknown> {
private readonly topic: Topic
private readonly topicSchemaName: string

private readonly writerAvroType?: Type
private readonly readerAvroType?: Type
private readonly writerAvroType: Type
private readonly readerAvroType: Type
private readonly optionArrayPaths?: string[]

private readonly avroMessageMetadata?: Record<string, string>
private readonly fieldsProcessor = new FieldsProcessor()
//TODO: remove the fields below once only avro is used; the reason we use jsonPublisher is described here: https://joinsolutionsag.atlassian.net/browse/JOIN-38534
private jsonPublisher: Topic
private topicHasAssignedSchema = false
private avroSchemasProvided = false

constructor(
public readonly topicName: string,
public readonly client: PubSub,
avroSchemas: { writer: object; reader: object },
private readonly logger?: ILogger,
avroSchemas?: { writer: object; reader: object },
) {
//TODO: avroSchemas parameter should be mandatory when only avro is used
if (avroSchemas) {
this.avroSchemasProvided = true
const writerAvroSchema: SchemaWithMetadata = avroSchemas.writer as SchemaWithMetadata
this.writerAvroType = Type.forSchema(writerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })
if (writerAvroSchema.OptionalArrayPaths) {
this.optionArrayPaths = writerAvroSchema.OptionalArrayPaths.split(',')
}
const readerAvroSchema: SchemaWithMetadata = avroSchemas.reader as SchemaWithMetadata
this.readerAvroType = Type.forSchema(readerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
const writerAvroSchema: SchemaWithMetadata = avroSchemas.writer as SchemaWithMetadata
this.writerAvroType = Type.forSchema(writerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })
if (writerAvroSchema.OptionalArrayPaths) {
this.optionArrayPaths = writerAvroSchema.OptionalArrayPaths.split(',')
}
const readerAvroSchema: SchemaWithMetadata = avroSchemas.reader as SchemaWithMetadata
this.readerAvroType = Type.forSchema(readerAvroSchema, { logicalTypes: { 'timestamp-micros': DateType } })

this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
this.topic = client.topic(topicName)
this.jsonPublisher = client.topic(topicName, { gaxOpts: { retry: null } })
this.topicSchemaName = `${this.topicName}-generated-avro`
}

Expand All @@ -76,51 +66,7 @@ export class Publisher<T = unknown> {
}

public async publishMsg(data: T): Promise<void> {
if (!this.avroSchemasProvided) {
// old flow, just send message if no avro schemas provided
await this.sendJsonMessage({ json: data })
} else if (!this.topicHasAssignedSchema) {
try {
// on startup we send json message, and we are relying on switching to avro inside try-catch block.
// if json schema matches avro schema, which can only be if we have only one field with mandatory array of primitives
// we will send json message to avro topic, which will work, because JSON payload matches Avro encoded JSON payload
// we are losing some metadata because of this, but we are fine with this, because it's only a couple of events,
// that are not that important. Solution for this should be removing JSON, and using only avro.
// PR that can be used as base for this work: https://github.com/join-com/pubsub/pull/97
await this.sendJsonMessage({ json: data })
this.logWarnIfMessageViolatesSchema(data)
} catch (e) {
//it's a corner case when application started without schema on topic, and then schema was added to the topic
//in this case we are trying to resend message with avro format if schema appeared
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()
if (!this.topicHasAssignedSchema) {
throw e
}
await this.sendAvroMessage(data)
}
} else {
// TODO: remove everything except this call after services will be ready to use only avro
await this.sendAvroMessage(data)
}
}

private logWarnIfMessageViolatesSchema(data: T): void {
if (this.writerAvroType) {
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
}
const invalidPaths: string[] = []
if (!this.writerAvroType.isValid(data, { errorHook: path => invalidPaths.push(path.join('.')) })) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, {
payload: data,
metadata: this.avroMessageMetadata,
invalidPaths,
})
}
}
await this.sendAvroMessage(data)
}

public async flush(): Promise<void> {
Expand All @@ -139,54 +85,17 @@ export class Publisher<T = unknown> {
}

private async initializeTopicSchema(): Promise<void> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && (await this.doesRegistryHaveTopicSchema())) {
// TODO: this.setSchemaToTheTopic() should be replaced with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
// once https://github.com/googleapis/nodejs-pubsub/issues/1587 is fixed
this.setSchemaToTheTopic()
if (!await this.doesTopicHaveSchemaAssigned()) {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
}
const schema = `projects/${projectName}/schemas/${this.topicSchemaName}`
await this.topic.setMetadata({ schemaSettings: { schema, encoding: Encoding.JSON }})
this.logger?.info(`PubSub: Schema ${schema} is assigned to the topic: ${this.topicName}`)
}
}

/**
 * Assigns the generated Avro schema (`this.topicSchemaName`, JSON encoding) to the topic
 * by sending a raw `updateTopic` request through the publisher client.
 *
 * The request is callback-based and is NOT awaited: this method returns before the
 * update completes. `topicHasAssignedSchema` is set to `true` only inside the success
 * callback, so callers must not assume the schema is assigned right after this call.
 *
 * @throws Error when the `GCLOUD_PROJECT` environment variable is not defined.
 */
private setSchemaToTheTopic() {
  const projectName = process.env['GCLOUD_PROJECT']
  if (!projectName) {
    throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
  }

  this.topic.request(
    {
      client: 'PublisherClient',
      method: 'updateTopic',
      reqOpts: {
        topic: {
          name: `projects/${projectName}/topics/${this.topicName}`,
          schemaSettings: {
            schema: `projects/${projectName}/schemas/${this.topicSchemaName}`,
            encoding: Encoding.JSON,
          },
        },
        // Restrict the update to schema settings so other topic fields are left as they are
        updateMask: {
          paths: ['schema_settings'],
        },
      },
      gaxOpts: {},
    },
    (err, _) => {
      if (err) {
        // Pass the underlying error along so the failure reason is not lost
        this.logger?.error(`Couldn't set schema '${this.topicSchemaName}' to the topic '${this.topicName}'`, err)
        return
      }
      this.topicHasAssignedSchema = true
      this.logger?.info(`Schema '${this.topicSchemaName}' set to the topic '${this.topicName}'`)
    },
  )
}

private async sendAvroMessage(data: T): Promise<void> {
let currentMessageMetadata = this.avroMessageMetadata
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
Expand All @@ -199,10 +108,7 @@ export class Publisher<T = unknown> {
currentMessageMetadata[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS] = undefinedOrNullOptionalArrays.join(',')
}
}
// TODO: remove non-null assertion and eslint-disable when avroType will be mandatory on every topic
// for now we are checking that avro is enabled before calling sendAvroMessage
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (!this.writerAvroType!.isValid(data)) {
if (!this.writerAvroType.isValid(data)) {
this.logger?.error(
`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`,
Expand All @@ -214,20 +120,11 @@ export class Publisher<T = unknown> {
logWarnWhenUndefinedInNullPreserveFields(data, currentMessageMetadata[JOIN_PRESERVE_NULL], this.logger)
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const buffer = Buffer.from(this.readerAvroType!.toString(data))
const buffer = Buffer.from(this.readerAvroType.toString(data))
const messageId = await this.topic.publishMessage({ data: buffer, attributes: currentMessageMetadata })
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
}

/**
 * Publishes a message through `jsonPublisher` and logs the payload together with
 * the message id returned by the publish call.
 *
 * @param message - Message options handed to `publishMessage` unchanged.
 */
private async sendJsonMessage(message: MessageOptions) {
  const publishedId = await this.jsonPublisher.publishMessage(message)
  const logDetails = {
    data: message.json as unknown,
    messageId: publishedId,
  }
  this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, logDetails)
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
const metadata: Record<string, string> = {}

Expand Down Expand Up @@ -257,13 +154,4 @@ export class Publisher<T = unknown> {
const schemaName = metadata?.schemaSettings?.schema
return !!schemaName
}

/**
 * Checks whether the schema named `this.topicSchemaName` can be fetched via the client.
 *
 * Any error raised by the lookup is logged at info level and reported as "not found".
 *
 * @returns `true` when the lookup resolves to a truthy value, `false` otherwise.
 */
public async doesRegistryHaveTopicSchema(): Promise<boolean> {
  try {
    const registrySchema = await this.client.schema(this.topicSchemaName).get()
    return Boolean(registrySchema)
  } catch (e) {
    this.logger?.info(`Schema ${this.topicSchemaName} can't be found`, e)
    return false
  }
}
}
17 changes: 1 addition & 16 deletions packages/pubsub/src/PublisherFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@ export class PublisherFactory<TypeMap> {
}

public getPublisher<Topic extends keyof TypeMap> (topic: Topic): IPublisher<TypeMap[Topic]> {
return new Publisher(topic.toString(), this.client, this.logger, this.avroSchemas[topic])
}
}

/**
* @deprecated should be used only when migration of the events/commands is not possible
*/
export class PublisherFactorySchemaless<TypeMap> {
private readonly client: PubSub

constructor(private readonly logger: ILogger, private readonly avroSchemas?: Record<keyof TypeMap, { writer: object, reader: object }>) {
this.client = new PubSub()
}

public getPublisher<Topic extends keyof TypeMap> (topic: Topic): IPublisher<TypeMap[Topic]> {
return new Publisher(topic.toString(), this.client, this.logger, this.avroSchemas?.[topic])
return new Publisher(topic.toString(), this.client, this.avroSchemas[topic], this.logger)
}
}
30 changes: 10 additions & 20 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { google } from '@google-cloud/pubsub/build/protos/protos'
import { SchemaServiceClient, SubscriberClient } from '@google-cloud/pubsub/build/src/v1'
import { Type } from 'avsc'
import { createCallOptions } from './createCallOptions'
import { DataParser } from './DataParser'
import { FieldsProcessor } from './FieldsProcessor'
import { ILogger } from './ILogger'
import { JOIN_PRESERVE_NULL, JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS } from './Publisher'
Expand Down Expand Up @@ -143,30 +142,21 @@ export class Subscriber<T = unknown> {
}

private async parseData(message: Message): Promise<T> {
let dataParsed: T
let schemaId = message.attributes['googclient_schemarevisionid']

// TODO: fix for the first couple of messages, that don't have "googclient_schemarevisionid" after the schema is assigned
// Ticket for Google Cloud will be created
if (!schemaId && message.attributes['join_avdl_schema_version']) {
if (!schemaId) {
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Message does not have schema revision id`, { message })
schemaId = await this.schemaCache.getLatestSchemaRevisionId()
}

// TODO: remove if else block as only avro should be used, throw error if there is no schema revision
if (schemaId) {
dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
} else {
const dataParser = new DataParser()
dataParsed = dataParser.parse(message.data) as T
const dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
this.logMessage(message, dataParsed)
return dataParsed
}
Expand Down
Loading

0 comments on commit e229315

Please sign in to comment.