Skip to content

Commit

Permalink
build(deps): TECH update dependencies (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
kirpichenko authored Jun 10, 2024
1 parent 9f04441 commit 8ed29fa
Show file tree
Hide file tree
Showing 9 changed files with 1,471 additions and 1,039 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defaults: &defaults
environment:
NODE_OPTIONS: '--max_old_space_size=1536'
docker:
- image: cimg/node:16.14.1
- image: cimg/node:20.11.0
<<: *docker-auth

commands:
Expand Down
2 changes: 1 addition & 1 deletion .node-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16.14.1
20.11.0
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"postinstall": "if [ -d .git ]; then git config core.hooksPath .hooks; fi"
},
"engines": {
"node": ">=14.0.0"
"node": "^20.9.0"
},
"devDependencies": {
"conventional-changelog-conventionalcommits": "^5.0.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub-legacy-factories/src/SubscriberFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class SubscriberFactory {
private readonly client: PubSub
private readonly schemaServiceClient: SchemaServiceClient

constructor(readonly options: ISubscriptionOptions, private readonly logger: ILogger) {
constructor(public readonly options: ISubscriptionOptions, private readonly logger: ILogger) {
this.client = new PubSub()
this.schemaServiceClient = new SchemaServiceClient()
}
Expand Down
22 changes: 11 additions & 11 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@
"prepublishOnly": "yarn lint && yarn build"
},
"dependencies": {
"@google-cloud/pubsub": "^3.5.1",
"@google-cloud/pubsub": "^4.3.3",
"avsc": "^5.7.7"
},
"devDependencies": {
"@join-private/eslint-config-backend": "^1.3.0",
"@join-private/eslint-config-backend": "^1.5.0",
"@side/jest-runtime": "^1.1.0",
"@swc/core": "1.3.90",
"@swc/jest": "0.2.29",
"@types/jest": "^29.1.1",
"@types/node": "^18.8.2",
"eslint": "^8.24.0",
"@swc/core": "1.4.16",
"@swc/jest": "0.2.36",
"@types/jest": "^29.5.12",
"@types/node": "^20.12.7",
"eslint": "^8.57.0",
"jest": "^29.7.0",
"jest-extended": "^3.1.0",
"jest-watch-typeahead": "^2.2.0",
"prettier": "^2.7.1",
"typescript": "^4.8.4"
"jest-extended": "^4.0.2",
"jest-watch-typeahead": "^2.2.2",
"prettier": "^3.2.5",
"typescript": "^5.4.5"
},
"publishConfig": {
"access": "public"
Expand Down
60 changes: 39 additions & 21 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import { logWarnWhenUndefinedInNullPreserveFields } from './util'
import Encoding = google.pubsub.v1.Encoding

interface IMessageMetadata {
Event: string,
GeneratorVersion: string,
GeneratorGitRemoteOriginUrl: string,
Event: string
GeneratorVersion: string
GeneratorGitRemoteOriginUrl: string
SchemaType: string
AvdlSchemaPathInGitRepo: string,
AvdlSchemaGitRemoteOriginUrl: string,
AvdlSchemaVersion: string,
AvdlSchemaPathInGitRepo: string
AvdlSchemaGitRemoteOriginUrl: string
AvdlSchemaVersion: string
PreserveNull?: string
OptionalArrayPaths?: string
}
Expand All @@ -41,8 +41,12 @@ export class Publisher<T = unknown> {
private topicHasAssignedSchema = false
private avroSchemasProvided = false

constructor(readonly topicName: string, readonly client: PubSub, private readonly logger?: ILogger,
avroSchemas?: { writer: object, reader: object }) {
constructor(
public readonly topicName: string,
public readonly client: PubSub,
private readonly logger?: ILogger,
avroSchemas?: { writer: object; reader: object },
) {
//TODO: avroSchemas parameter should be mandatory when only avro is used
if (avroSchemas) {
this.avroSchemasProvided = true
Expand All @@ -57,7 +61,7 @@ export class Publisher<T = unknown> {
this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
}
this.topic = client.topic(topicName)
this.jsonPublisher = client.topic(topicName, { gaxOpts: {retry: null}})
this.jsonPublisher = client.topic(topicName, { gaxOpts: { retry: null } })
this.topicSchemaName = `${this.topicName}-generated-avro`
}

Expand Down Expand Up @@ -103,11 +107,18 @@ export class Publisher<T = unknown> {
private logWarnIfMessageViolatesSchema(data: T): void {
if (this.writerAvroType) {
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(data as Record<string, unknown>, this.optionArrayPaths)
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
}
const invalidPaths: string[] = []
if (!this.writerAvroType.isValid(data, {errorHook: path => invalidPaths.push(path.join('.'))} )) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, { payload: data, metadata: this.avroMessageMetadata, invalidPaths })
if (!this.writerAvroType.isValid(data, { errorHook: path => invalidPaths.push(path.join('.')) })) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, {
payload: data,
metadata: this.avroMessageMetadata,
invalidPaths,
})
}
}
}
Expand All @@ -131,7 +142,7 @@ export class Publisher<T = unknown> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && await this.doesRegistryHaveTopicSchema()) {
if (!this.topicHasAssignedSchema && (await this.doesRegistryHaveTopicSchema())) {
// TODO: this.setSchemaToTheTopic() should be replace with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
Expand All @@ -144,7 +155,7 @@ export class Publisher<T = unknown> {
private setSchemaToTheTopic() {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
}

this.topic.request(
Expand Down Expand Up @@ -179,7 +190,10 @@ export class Publisher<T = unknown> {
private async sendAvroMessage(data: T): Promise<void> {
let currentMessageMetadata = this.avroMessageMetadata
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
const undefinedOrNullOptionalArrays = this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(data as Record<string, unknown>, this.optionArrayPaths)
const undefinedOrNullOptionalArrays = this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
if (undefinedOrNullOptionalArrays.length > 0) {
currentMessageMetadata = { ...currentMessageMetadata }
currentMessageMetadata[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS] = undefinedOrNullOptionalArrays.join(',')
Expand All @@ -189,8 +203,11 @@ export class Publisher<T = unknown> {
// for now we are checking that avro is enabled before calling sendAvroMessage
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (!this.writerAvroType!.isValid(data)) {
this.logger?.error(`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`, {payload: data, schemaMetadata: currentMessageMetadata})
this.logger?.error(
`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`,
{ payload: data, schemaMetadata: currentMessageMetadata },
)
throw new Error(`[${this.topicName}] Can't encode the avro message for the topic`)
}
if (currentMessageMetadata && currentMessageMetadata[JOIN_PRESERVE_NULL]) {
Expand All @@ -203,11 +220,12 @@ export class Publisher<T = unknown> {
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
}



private async sendJsonMessage(message: MessageOptions) {
const messageId = await this.jsonPublisher.publishMessage(message)
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, { data: message.json as unknown, messageId })
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, {
data: message.json as unknown,
messageId,
})
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
Expand All @@ -230,7 +248,7 @@ export class Publisher<T = unknown> {

private getLibraryVersion(): string {
const libPackageJsonPath = `${__dirname}/../package.json`
const packageJson = JSON.parse(readFileSync(libPackageJsonPath, 'utf8')) as { version: string}
const packageJson = JSON.parse(readFileSync(libPackageJsonPath, 'utf8')) as { version: string }
return packageJson.version
}

Expand Down
38 changes: 22 additions & 16 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface IParsedMessage<T = unknown> {
nack: () => void
}

export interface IMessageInfo{
export interface IMessageInfo {
id: string
receivedAt: Date
}
Expand Down Expand Up @@ -57,8 +57,8 @@ interface ISubscriptionInitializationOptions {
}

export class Subscriber<T = unknown> {
readonly topicName: string
readonly subscriptionName: string
public readonly topicName: string
public readonly subscriptionName: string

private readonly topic: Topic
private readonly subscription: Subscription
Expand Down Expand Up @@ -112,7 +112,7 @@ export class Subscriber<T = unknown> {
}
}

public start(asyncCallback: (msg: IParsedMessage<T>, info : IMessageInfo) => Promise<void>): void {
public start(asyncCallback: (msg: IParsedMessage<T>, info: IMessageInfo) => Promise<void>): void {
this.subscription.on('error', this.processError)
this.subscription.on('message', this.processMsg(asyncCallback))

Expand All @@ -131,7 +131,7 @@ export class Subscriber<T = unknown> {
attributes: message.attributes,
publishTime: message.publishTime?.toISOString(),
received: message.received,
deliveryAttempt: message.deliveryAttempt
deliveryAttempt: message.deliveryAttempt,
}

this.logger?.info(
Expand All @@ -155,7 +155,10 @@ export class Subscriber<T = unknown> {
dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(dataParsed as Record<string, unknown>, undefinedOrNullOptionalArrays.split(','))
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
} else {
Expand All @@ -171,27 +174,30 @@ export class Subscriber<T = unknown> {
return type.fromString(message.data.toString()) as T
}


private processMsg(asyncCallback: (msg: IParsedMessage<T> , info: IMessageInfo) => Promise<void>): (message: Message) => void {
private processMsg(
asyncCallback: (msg: IParsedMessage<T>, info: IMessageInfo) => Promise<void>,
): (message: Message) => void {
const asyncMessageProcessor = async (message: Message) => {
const dataParsed = await this.parseData(message)
const messageParsed = Object.assign(message, { dataParsed })
const info : IMessageInfo = {
const info: IMessageInfo = {
id: message.id,
receivedAt: new Date(message.received)
receivedAt: new Date(message.received),
}
asyncCallback(messageParsed , info).catch(e => {
asyncCallback(messageParsed, info).catch(e => {
message.nack()
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Failed to process message:`, e)
})
}

return (message: Message) => {
asyncMessageProcessor(message).then(_ => {
return
}).catch(e => {
throw e
})
asyncMessageProcessor(message)
.then(_ => {
return
})
.catch(e => {
throw e
})
}
}

Expand Down
26 changes: 13 additions & 13 deletions packages/pubsub/src/__tests__/support/pubsubMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ type EventHandler = (attrs: unknown) => Promise<unknown>
type EventHandlerMap = { [key: string]: EventHandler }

export interface IMessageType {
first?: string,
second?: string,
createdAt?: Date,
third?: string,
fourth?: {flag: boolean}
first?: string
second?: string
createdAt?: Date
third?: string
fourth?: { flag: boolean }
}

export const getIamMock = () => ({
Expand Down Expand Up @@ -67,7 +67,7 @@ export const getTopicMock = ({ subscriptionMock, iamMock }: ITopicMockOption = {
publishMessage: jest.fn(),
subscription: jest.fn(() => subscriptionMock),
iam: iamMock,
getMetadata: jest.fn()
getMetadata: jest.fn(),
})

export const schemaMock = {
Expand All @@ -76,11 +76,11 @@ export const schemaMock = {

export const schemaServiceClientMock = {
getSchema: jest.fn(),
listSchemaRevisions: jest.fn()
listSchemaRevisions: jest.fn(),
}

export const subscriberServiceClientMock = {
getSubscription: jest.fn()
getSubscription: jest.fn(),
}

export interface IClientMockOption {
Expand All @@ -89,7 +89,7 @@ export interface IClientMockOption {

export const getClientMock = ({ topicMock }: IClientMockOption = {}) => ({
topic: jest.fn(() => topicMock),
schema: jest.fn(() => schemaMock)
schema: jest.fn(() => schemaMock),
})

export interface IMessageMock {
Expand All @@ -105,22 +105,22 @@ export const getMessageMock = (data: unknown): IMessageMock => {
data: buffer,
ack: jest.fn(),
nack: jest.fn(),
attributes: { }
attributes: {},
}
}

export class ConsoleLogger implements ILogger {
error(message: string, payload: unknown | undefined): void {
public error(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}

info(message: string, payload: unknown | undefined): void {
public info(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}

warn(message: string, payload: unknown | undefined): void {
public warn(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}
Expand Down
Loading

0 comments on commit 8ed29fa

Please sign in to comment.