Skip to content

Commit

Permalink
Refact: Apply strict mode (#16)
Browse files Browse the repository at this point in the history
* Refact: Apply strict mode

modify type errors that occur when the strict is set to true

* Fix: TS strict mode errors.

* Refactor: Updated tests.

* v1.0.5

---------

Co-authored-by: tamimaj <[email protected]>
  • Loading branch information
Luna-Runa and tamimaj authored Apr 9, 2024
1 parent cd53544 commit cc1651d
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 83 deletions.
4 changes: 3 additions & 1 deletion examples/client-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion examples/users-microservice/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 12 additions & 7 deletions lib/redis-stream-client.core-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ export class RedisStreamClientCoreModule {
);
}

if (options.useExisting || options.useFactory) {
return [this.createAsyncClientProvider(options)];
const providers: Provider[] = [this.createAsyncClientProvider(options)];

if (!options.useExisting && !options.useFactory && options.useClass) {
providers.push({ provide: options.useClass, useClass: options.useClass });
}

return [
this.createAsyncClientProvider(options),
{ provide: options.useClass, useClass: options.useClass },
];
return providers
}

/* createAsyncOptionsProvider */
Expand All @@ -86,12 +85,18 @@ export class RedisStreamClientCoreModule {
};
}

const inject = options.useClass
? [options.useClass]
: options.useExisting
? [options.useExisting]
: []

return {
provide: REDIS_STREAM_CLIENT_MODULE_OPTIONS,
useFactory: async (
optionsFactory: RedisStreamClientModuleOptionsFactory,
) => optionsFactory.createRedisStreamClientModuleOptions(),
inject: [options.useClass || options.useExisting],
inject,
};
}
}
30 changes: 19 additions & 11 deletions lib/redis.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import { firstValueFrom, share } from 'rxjs';
export class RedisStreamClient extends ClientProxy {
protected readonly logger = new Logger(RedisStreamClient.name);

private redis: RedisInstance; // server instance for listening on response streams.
private redis: RedisInstance | null = null; // server instance for listening on response streams.

private client: RedisInstance; // client instance for publishing streams.
private client: RedisInstance | null = null; // client instance for publishing streams.

protected connection: Promise<any>; // client connection logic is required by framework.
protected connection: Promise<any> | null = null; // client connection logic is required by framework.

private streamsToListenOn: string[] = []; // response streams to listen on.

Expand All @@ -40,9 +40,9 @@ export class RedisStreamClient extends ClientProxy {
this.logger.log(
'Redis Client Responses Listener connected successfully on ' +
(this.options.connection?.url ??
this.options.connection.host +
this.options.connection?.host +
':' +
this.options.connection.port),
this.options.connection?.port),
);

this.initListener();
Expand Down Expand Up @@ -102,6 +102,8 @@ export class RedisStreamClient extends ClientProxy {

public async handleXadd(stream: string, serializedPayloadArray: any[]) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

let response = await this.client.xadd(
stream,
'*',
Expand Down Expand Up @@ -222,12 +224,14 @@ export class RedisStreamClient extends ClientProxy {

private async createConsumerGroup(stream: string, consumerGroup: string) {
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');

return true;
} catch (error) {
// if group exist for this stream. log debug.
if (error?.message.includes('BUSYGROUP')) {
if (error instanceof Error && error?.message.includes('BUSYGROUP')) {
this.logger.debug(
'Consumer Group "' +
consumerGroup +
Expand All @@ -242,14 +246,16 @@ export class RedisStreamClient extends ClientProxy {
}
}

private async listenOnStreams() {
private async listenOnStreams(): Promise<void> {
try {
if (!this.redis) throw new Error('Redis instance not found.');

let results: any[];

results = await this.redis.xreadgroup(
'GROUP',
this.options?.streams?.consumerGroup || undefined,
this.options?.streams?.consumer || undefined, // need to make it throw an error.
this.options?.streams?.consumerGroup || '',
this.options?.streams?.consumer || '',
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
Expand Down Expand Up @@ -315,8 +321,8 @@ export class RedisStreamClient extends ClientProxy {

// after message
private async deliverToHandler(
correlationId,
parsedPayload,
correlationId: string,
parsedPayload: any,
ctx: RedisStreamContext,
) {
try {
Expand Down Expand Up @@ -369,6 +375,8 @@ export class RedisStreamClient extends ClientProxy {

private async handleAck(inboundContext: RedisStreamContext) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xack(
inboundContext.getStream(),
inboundContext.getConsumerGroup(),
Expand Down
30 changes: 20 additions & 10 deletions lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ export class RedisStreamStrategy
extends Server
implements CustomTransportStrategy
{
private streamHandlerMap = {};
private streamHandlerMap: { [key: string]: any } = {};

private redis: RedisInstance;
private redis: RedisInstance | null = null;

private client: RedisInstance;
private client: RedisInstance | null = null;

constructor(private readonly options: ConstructorOptions) {
super();
Expand All @@ -40,7 +40,9 @@ export class RedisStreamStrategy
this.logger.log(
'Redis connected successfully on ' +
(this.options.connection?.url ??
this.options.connection.host + ':' + this.options.connection.port),
this.options.connection?.host +
':' +
this.options.connection?.port),
);

this.bindHandlers();
Expand Down Expand Up @@ -85,20 +87,22 @@ export class RedisStreamStrategy
return true;
} catch (error) {
// JSON.parse will throw error, if is not parsable.
this.logger.debug(error + '. Handler Pattern is: ' + pattern);
this.logger.debug!(error + '. Handler Pattern is: ' + pattern);
return false;
}
}

private async createConsumerGroup(stream: string, consumerGroup: string) {
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');

return true;
} catch (error) {
// if group exist for this stream. log debug.
if (error?.message.includes('BUSYGROUP')) {
this.logger.debug(
if (error instanceof Error && error.message.includes('BUSYGROUP')) {
this.logger.debug!(
'Consumer Group "' +
consumerGroup +
'" already exists for stream: ' +
Expand Down Expand Up @@ -134,6 +138,8 @@ export class RedisStreamStrategy
);
}

if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xadd(responseObj.stream, '*', ...serializedEntries);
}),
);
Expand All @@ -147,6 +153,8 @@ export class RedisStreamStrategy

private async handleAck(inboundContext: RedisStreamContext) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xack(
inboundContext.getStream(),
inboundContext.getConsumerGroup(),
Expand Down Expand Up @@ -248,14 +256,16 @@ export class RedisStreamStrategy
}
}

private async listenOnStreams() {
private async listenOnStreams(): Promise<void> {
try {
if (!this.redis) throw new Error('Redis instance not found.');

let results: any[];

results = await this.redis.xreadgroup(
'GROUP',
this.options?.streams?.consumerGroup || undefined,
this.options?.streams?.consumer || undefined, // need to make it throw an error.
this.options?.streams?.consumerGroup || '',
this.options?.streams?.consumer || '',
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
Expand Down
3 changes: 1 addition & 2 deletions lib/redis.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ export function createRedisConnection(
connection?: RedisConnectionOptions,
): RedisInstance {
// connection obj is optional, ioredis handle the default connection to localhost:6379

if (connection?.url) {
return new Redis(connection?.url, connection);
} else {
return new Redis(connection);
return new Redis(connection!);
}
}
10 changes: 5 additions & 5 deletions lib/requests-map.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
export class RequestsMap<T, S> {
private map = {};
export class RequestsMap<T extends string | number | symbol, S> {
private map: Record<T, S> = {} as Record<T, S>;

constructor() {}

public addEntry(requestId, handler) {
public addEntry(requestId: T, handler: S) {
this.map[requestId] = handler;
return true;
}

public getEntry(requestId) {
public getEntry(requestId: T) {
return this.map[requestId];
}

public removeEntry(requestId) {
public removeEntry(requestId: T) {
delete this.map[requestId];
return true;
}
Expand Down
12 changes: 6 additions & 6 deletions lib/streams.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export async function serialize(
return stringifiedResponse;
} catch (error) {
logger.error(error);
return null;
return [];
}
}

Expand All @@ -58,10 +58,10 @@ export async function parseJson(data: string): Promise<any> {
}
}

export function parseRawMessage(rawMessage: any): any {
export function parseRawMessage(rawMessage: any): Record<string, any> {
let payload = rawMessage[1];

let obj = {};
let obj: Record<string, any> = {};

for (let i = 0; i < payload.length; i += 2) {
obj[payload[i]] = payload[i + 1];
Expand All @@ -70,9 +70,9 @@ export function parseRawMessage(rawMessage: any): any {
return obj;
}

export function stringifyMessage(messageObj: any): any {
export function stringifyMessage(messageObj: Record<string, string>): string[] {
try {
let finalArray = [];
let finalArray: string[] = [];

for (let key in messageObj) {
finalArray.push(key);
Expand All @@ -82,7 +82,7 @@ export function stringifyMessage(messageObj: any): any {
return finalArray;
} catch (error) {
logger.error(error);
return null;
return [];
}
}

Expand Down
Loading

0 comments on commit cc1651d

Please sign in to comment.