Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(deps): TECH update dependencies #102

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defaults: &defaults
environment:
NODE_OPTIONS: '--max_old_space_size=1536'
docker:
- image: cimg/node:16.14.1
- image: cimg/node:20.11.0
<<: *docker-auth

commands:
Expand Down
2 changes: 1 addition & 1 deletion .node-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16.14.1
20.11.0
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"postinstall": "if [ -d .git ]; then git config core.hooksPath .hooks; fi"
},
"engines": {
"node": ">=14.0.0"
"node": "^20.9.0"
},
"devDependencies": {
"conventional-changelog-conventionalcommits": "^5.0.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub-legacy-factories/src/SubscriberFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class SubscriberFactory {
private readonly client: PubSub
private readonly schemaServiceClient: SchemaServiceClient

constructor(readonly options: ISubscriptionOptions, private readonly logger: ILogger) {
constructor(public readonly options: ISubscriptionOptions, private readonly logger: ILogger) {
this.client = new PubSub()
this.schemaServiceClient = new SchemaServiceClient()
}
Expand Down
22 changes: 11 additions & 11 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@
"prepublishOnly": "yarn lint && yarn build"
},
"dependencies": {
"@google-cloud/pubsub": "^3.5.1",
"@google-cloud/pubsub": "^4.3.3",
"avsc": "^5.7.7"
},
"devDependencies": {
"@join-private/eslint-config-backend": "^1.3.0",
"@join-private/eslint-config-backend": "^1.5.0",
"@side/jest-runtime": "^1.1.0",
"@swc/core": "1.3.90",
"@swc/jest": "0.2.29",
"@types/jest": "^29.1.1",
"@types/node": "^18.8.2",
"eslint": "^8.24.0",
"@swc/core": "1.4.16",
"@swc/jest": "0.2.36",
"@types/jest": "^29.5.12",
"@types/node": "^20.12.7",
"eslint": "^8.57.0",
"jest": "^29.7.0",
"jest-extended": "^3.1.0",
"jest-watch-typeahead": "^2.2.0",
"prettier": "^2.7.1",
"typescript": "^4.8.4"
"jest-extended": "^4.0.2",
"jest-watch-typeahead": "^2.2.2",
"prettier": "^3.2.5",
"typescript": "^5.4.5"
},
"publishConfig": {
"access": "public"
Expand Down
60 changes: 39 additions & 21 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import { logWarnWhenUndefinedInNullPreserveFields } from './util'
import Encoding = google.pubsub.v1.Encoding

interface IMessageMetadata {
Event: string,
GeneratorVersion: string,
GeneratorGitRemoteOriginUrl: string,
Event: string
GeneratorVersion: string
GeneratorGitRemoteOriginUrl: string
SchemaType: string
AvdlSchemaPathInGitRepo: string,
AvdlSchemaGitRemoteOriginUrl: string,
AvdlSchemaVersion: string,
AvdlSchemaPathInGitRepo: string
AvdlSchemaGitRemoteOriginUrl: string
AvdlSchemaVersion: string
PreserveNull?: string
OptionalArrayPaths?: string
}
Expand All @@ -41,8 +41,12 @@ export class Publisher<T = unknown> {
private topicHasAssignedSchema = false
private avroSchemasProvided = false

constructor(readonly topicName: string, readonly client: PubSub, private readonly logger?: ILogger,
avroSchemas?: { writer: object, reader: object }) {
constructor(
public readonly topicName: string,
public readonly client: PubSub,
private readonly logger?: ILogger,
avroSchemas?: { writer: object; reader: object },
) {
//TODO: avroSchemas parameter should be mandatory when only avro is used
if (avroSchemas) {
this.avroSchemasProvided = true
Expand All @@ -57,7 +61,7 @@ export class Publisher<T = unknown> {
this.avroMessageMetadata = this.prepareAvroMessageMetadata(readerAvroSchema)
}
this.topic = client.topic(topicName)
this.jsonPublisher = client.topic(topicName, { gaxOpts: {retry: null}})
this.jsonPublisher = client.topic(topicName, { gaxOpts: { retry: null } })
this.topicSchemaName = `${this.topicName}-generated-avro`
}

Expand Down Expand Up @@ -103,11 +107,18 @@ export class Publisher<T = unknown> {
private logWarnIfMessageViolatesSchema(data: T): void {
if (this.writerAvroType) {
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(data as Record<string, unknown>, this.optionArrayPaths)
this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
}
const invalidPaths: string[] = []
if (!this.writerAvroType.isValid(data, {errorHook: path => invalidPaths.push(path.join('.'))} )) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, { payload: data, metadata: this.avroMessageMetadata, invalidPaths })
if (!this.writerAvroType.isValid(data, { errorHook: path => invalidPaths.push(path.join('.')) })) {
this.logger?.warn(`[schema-violation] [${this.topicName}] Message violates writer avro schema`, {
payload: data,
metadata: this.avroMessageMetadata,
invalidPaths,
})
}
}
}
Expand All @@ -131,7 +142,7 @@ export class Publisher<T = unknown> {
if (this.avroSchemasProvided) {
this.topicHasAssignedSchema = await this.doesTopicHaveSchemaAssigned()

if (!this.topicHasAssignedSchema && await this.doesRegistryHaveTopicSchema()) {
if (!this.topicHasAssignedSchema && (await this.doesRegistryHaveTopicSchema())) {
// TODO: this.setSchemaToTheTopic() should be replace with
// ```await this.topic.setMetadata({ schemaSettings: { schema: this.topicSchemaName, encoding: Encoding.JSON }})
// this.topicHasAssignedSchema = true```
Expand All @@ -144,7 +155,7 @@ export class Publisher<T = unknown> {
private setSchemaToTheTopic() {
const projectName = process.env['GCLOUD_PROJECT']
if (!projectName) {
throw new Error('Can\'t find GCLOUD_PROJECT env variable, please define it')
throw new Error("Can't find GCLOUD_PROJECT env variable, please define it")
}

this.topic.request(
Expand Down Expand Up @@ -179,7 +190,10 @@ export class Publisher<T = unknown> {
private async sendAvroMessage(data: T): Promise<void> {
let currentMessageMetadata = this.avroMessageMetadata
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
const undefinedOrNullOptionalArrays = this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(data as Record<string, unknown>, this.optionArrayPaths)
const undefinedOrNullOptionalArrays = this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
this.optionArrayPaths,
)
if (undefinedOrNullOptionalArrays.length > 0) {
currentMessageMetadata = { ...currentMessageMetadata }
currentMessageMetadata[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS] = undefinedOrNullOptionalArrays.join(',')
Expand All @@ -189,8 +203,11 @@ export class Publisher<T = unknown> {
// for now we are checking that avro is enabled before calling sendAvroMessage
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
if (!this.writerAvroType!.isValid(data)) {
this.logger?.error(`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`, {payload: data, schemaMetadata: currentMessageMetadata})
this.logger?.error(
`[${this.topicName}] Invalid payload for the specified writer schema, please check that the schema is correct ' +
'and payload can be encoded with it`,
{ payload: data, schemaMetadata: currentMessageMetadata },
)
throw new Error(`[${this.topicName}] Can't encode the avro message for the topic`)
}
if (currentMessageMetadata && currentMessageMetadata[JOIN_PRESERVE_NULL]) {
Expand All @@ -203,11 +220,12 @@ export class Publisher<T = unknown> {
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
}



private async sendJsonMessage(message: MessageOptions) {
const messageId = await this.jsonPublisher.publishMessage(message)
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, { data: message.json as unknown, messageId })
this.logger?.info(`PubSub: JSON Message sent for topic: ${this.topicName}:`, {
data: message.json as unknown,
messageId,
})
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
Expand All @@ -230,7 +248,7 @@ export class Publisher<T = unknown> {

private getLibraryVersion(): string {
const libPackageJsonPath = `${__dirname}/../package.json`
const packageJson = JSON.parse(readFileSync(libPackageJsonPath, 'utf8')) as { version: string}
const packageJson = JSON.parse(readFileSync(libPackageJsonPath, 'utf8')) as { version: string }
return packageJson.version
}

Expand Down
38 changes: 22 additions & 16 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface IParsedMessage<T = unknown> {
nack: () => void
}

export interface IMessageInfo{
export interface IMessageInfo {
id: string
receivedAt: Date
}
Expand Down Expand Up @@ -57,8 +57,8 @@ interface ISubscriptionInitializationOptions {
}

export class Subscriber<T = unknown> {
readonly topicName: string
readonly subscriptionName: string
public readonly topicName: string
public readonly subscriptionName: string

private readonly topic: Topic
private readonly subscription: Subscription
Expand Down Expand Up @@ -112,7 +112,7 @@ export class Subscriber<T = unknown> {
}
}

public start(asyncCallback: (msg: IParsedMessage<T>, info : IMessageInfo) => Promise<void>): void {
public start(asyncCallback: (msg: IParsedMessage<T>, info: IMessageInfo) => Promise<void>): void {
this.subscription.on('error', this.processError)
this.subscription.on('message', this.processMsg(asyncCallback))

Expand All @@ -131,7 +131,7 @@ export class Subscriber<T = unknown> {
attributes: message.attributes,
publishTime: message.publishTime?.toISOString(),
received: message.received,
deliveryAttempt: message.deliveryAttempt
deliveryAttempt: message.deliveryAttempt,
}

this.logger?.info(
Expand All @@ -155,7 +155,10 @@ export class Subscriber<T = unknown> {
dataParsed = await this.parseAvroMessage(message, schemaId)
const undefinedOrNullOptionalArrays = message.attributes[JOIN_UNDEFINED_OR_NULL_OPTIONAL_ARRAYS]
if (undefinedOrNullOptionalArrays) {
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(dataParsed as Record<string, unknown>, undefinedOrNullOptionalArrays.split(','))
this.fieldsProcessor.setEmptyArrayFieldsToUndefined(
dataParsed as Record<string, unknown>,
undefinedOrNullOptionalArrays.split(','),
)
}
replaceNullsWithUndefined(dataParsed, message.attributes[JOIN_PRESERVE_NULL])
} else {
Expand All @@ -171,27 +174,30 @@ export class Subscriber<T = unknown> {
return type.fromString(message.data.toString()) as T
}


private processMsg(asyncCallback: (msg: IParsedMessage<T> , info: IMessageInfo) => Promise<void>): (message: Message) => void {
private processMsg(
asyncCallback: (msg: IParsedMessage<T>, info: IMessageInfo) => Promise<void>,
): (message: Message) => void {
const asyncMessageProcessor = async (message: Message) => {
const dataParsed = await this.parseData(message)
const messageParsed = Object.assign(message, { dataParsed })
const info : IMessageInfo = {
const info: IMessageInfo = {
id: message.id,
receivedAt: new Date(message.received)
receivedAt: new Date(message.received),
}
asyncCallback(messageParsed , info).catch(e => {
asyncCallback(messageParsed, info).catch(e => {
message.nack()
this.logger?.error(`PubSub: Subscription: ${this.subscriptionName} Failed to process message:`, e)
})
}

return (message: Message) => {
asyncMessageProcessor(message).then(_ => {
return
}).catch(e => {
throw e
})
asyncMessageProcessor(message)
.then(_ => {
return
})
.catch(e => {
throw e
})
}
}

Expand Down
26 changes: 13 additions & 13 deletions packages/pubsub/src/__tests__/support/pubsubMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ type EventHandler = (attrs: unknown) => Promise<unknown>
type EventHandlerMap = { [key: string]: EventHandler }

export interface IMessageType {
first?: string,
second?: string,
createdAt?: Date,
third?: string,
fourth?: {flag: boolean}
first?: string
second?: string
createdAt?: Date
third?: string
fourth?: { flag: boolean }
}

export const getIamMock = () => ({
Expand Down Expand Up @@ -67,7 +67,7 @@ export const getTopicMock = ({ subscriptionMock, iamMock }: ITopicMockOption = {
publishMessage: jest.fn(),
subscription: jest.fn(() => subscriptionMock),
iam: iamMock,
getMetadata: jest.fn()
getMetadata: jest.fn(),
})

export const schemaMock = {
Expand All @@ -76,11 +76,11 @@ export const schemaMock = {

export const schemaServiceClientMock = {
getSchema: jest.fn(),
listSchemaRevisions: jest.fn()
listSchemaRevisions: jest.fn(),
}

export const subscriberServiceClientMock = {
getSubscription: jest.fn()
getSubscription: jest.fn(),
}

export interface IClientMockOption {
Expand All @@ -89,7 +89,7 @@ export interface IClientMockOption {

export const getClientMock = ({ topicMock }: IClientMockOption = {}) => ({
topic: jest.fn(() => topicMock),
schema: jest.fn(() => schemaMock)
schema: jest.fn(() => schemaMock),
})

export interface IMessageMock {
Expand All @@ -105,22 +105,22 @@ export const getMessageMock = (data: unknown): IMessageMock => {
data: buffer,
ack: jest.fn(),
nack: jest.fn(),
attributes: { }
attributes: {},
}
}

export class ConsoleLogger implements ILogger {
error(message: string, payload: unknown | undefined): void {
public error(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}

info(message: string, payload: unknown | undefined): void {
public info(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}

warn(message: string, payload: unknown | undefined): void {
public warn(message: string, payload: unknown | undefined): void {
// eslint-disable-next-line no-console
console.log(message, payload)
}
Expand Down
Loading
Loading