Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1 prefix #24

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
418 changes: 206 additions & 212 deletions examples/client-app/package-lock.json

Large diffs are not rendered by default.

429 changes: 207 additions & 222 deletions examples/users-microservice/package-lock.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion examples/users-microservice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"@nestjs/schematics": "^9.2.0",
"@nestjs/testing": "^9.4.3",
"@types/express": "^4.17.21",
"@types/ioredis": "^4.28.10",
"@types/jest": "28.1.8",
"@types/node": "^16.18.101",
"@types/supertest": "^2.0.16",
Expand Down
1 change: 1 addition & 0 deletions examples/users-microservice/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async function bootstrap() {
strategy: new RedisStreamStrategy({
connection: {
url: '0.0.0.0:6379',
// keyPrefix: 'my-key-prefix:',
},
streams: {
block: 5000,
Expand Down
51 changes: 43 additions & 8 deletions lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ export class RedisStreamStrategy
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
const modifiedStreamKey = this.prependPrefix(stream);

await this.redis.xgroup(
'CREATE',
modifiedStreamKey,
consumerGroup,
'$',
'MKSTREAM',
);

return true;
} catch (error) {
Expand All @@ -106,7 +114,7 @@ export class RedisStreamStrategy
'Consumer Group "' +
consumerGroup +
'" already exists for stream: ' +
stream,
this.prependPrefix(stream),
);
return true;
} else {
Expand Down Expand Up @@ -214,12 +222,13 @@ export class RedisStreamStrategy

private async notifyHandlers(stream: string, messages: any[]) {
try {
const handler = this.streamHandlerMap[stream];
const modifiedStream = this.stripPrefix(stream);
const handler = this.streamHandlerMap[modifiedStream];

await Promise.all(
messages.map(async (message) => {
let ctx = new RedisStreamContext([
stream,
modifiedStream,
message[0], // message id needed for ACK.
this.options?.streams?.consumerGroup,
this.options?.streams?.consumer,
Expand Down Expand Up @@ -269,10 +278,14 @@ export class RedisStreamStrategy
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
...(Object.keys(this.streamHandlerMap) as string[]), // streams keys
...(Object.keys(this.streamHandlerMap) as string[]).map(
(stream: string) => '>',
), // '>', this is needed for xreadgroup as id.
...(Object.keys(this.streamHandlerMap).map((s) =>
this.stripPrefix(s),
) as string[]), // streams keys
...(
Object.keys(this.streamHandlerMap).map((s) =>
this.stripPrefix(s),
) as string[]
).map((stream: string) => '>'), // '>', this is needed for xreadgroup as id.
);

// if BLOCK time ended, and results are null, listen again.
Expand All @@ -285,10 +298,32 @@ export class RedisStreamStrategy

return this.listenOnStreams();
} catch (error) {
console.log('Error in listenOnStreams: ', error);
this.logger.error(error);
}
}

// When the stream handler name is stored in streamHandlerMap, its stored WITH the key prefix, so sending additional redis commands when using the prefix with the existing key will cause a duplicate prefix. This ensures to strip the first occurrence of the prefix when binding listeners.
private stripPrefix(streamHandlerName: string) {
const keyPrefix = this?.redis?.options?.keyPrefix;
if (!keyPrefix || !streamHandlerName.startsWith(keyPrefix)) {
return streamHandlerName;
}
// Replace just the first instance of the substring
return streamHandlerName.replace(keyPrefix, '');
}

// xgroup CREATE command with ioredis does not automatically prefix the keyPrefix, though many other commands do, such as xreadgroup.
// https://github.com/redis/ioredis/issues/1659
private prependPrefix(key: string) {
const keyPrefix = this?.redis?.options?.keyPrefix;
if (keyPrefix && !key.startsWith(keyPrefix)) {
return `${keyPrefix}${key}`;
} else {
return key;
}
}

// for redis instances. need to add mechanism to try to connect back.
public handleError(stream: any) {
stream.on(ERROR_EVENT, (err: any) => {
Expand Down
Loading
Loading