From 35d3de5b0b294510e81b9fceeed059950102f273 Mon Sep 17 00:00:00 2001 From: Jonah Snider Date: Mon, 1 Jul 2024 21:55:55 -0700 Subject: [PATCH] feat: support multiple handlers for a single subscription --- src/connections/abstract_connection.ts | 80 ++++++++++++++++---------- src/errors.ts | 2 + 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/src/connections/abstract_connection.ts b/src/connections/abstract_connection.ts index ebd03be..218154a 100644 --- a/src/connections/abstract_connection.ts +++ b/src/connections/abstract_connection.ts @@ -9,7 +9,6 @@ import Emittery from 'emittery' import type { Redis, Cluster } from 'ioredis' -import * as errors from '../errors.js' import type { PubSubOptions, ConnectionEvents, @@ -38,8 +37,8 @@ export abstract class AbstractConnection< /** * A list of active subscriptions and pattern subscription */ - protected subscriptions: Map = new Map() - protected psubscriptions: Map = new Map() + protected subscriptions: Map> = new Map() + protected psubscriptions: Map> = new Map() /** * The last error emitted by the `error` event. We set it to `null` after @@ -230,9 +229,11 @@ export abstract class AbstractConnection< * Listen for messages */ this.ioSubscriberConnection!.on('message', (channel, message) => { - const handler = this.subscriptions.get(channel) - if (handler) { - handler(message) + const handlers = this.subscriptions.get(channel) + if (handlers) { + for (const handler of handlers) { + handler(message) + } } }) @@ -240,9 +241,11 @@ export abstract class AbstractConnection< * Listen for pattern messages */ this.ioSubscriberConnection!.on('pmessage', (pattern, channel, message) => { - const handler = this.psubscriptions.get(pattern) - if (handler) { - handler(channel, message) + const handlers = this.psubscriptions.get(pattern) + if (handlers) { + for (const handler of handlers) { + handler(channel, message) + } } }) } @@ -278,13 +281,6 @@ export abstract class AbstractConnection< */ this.setupSubscriberConnection() - /** - * Disallow multiple subscriptions to a single channel - */ - if (this.subscriptions.has(channel)) { - throw new errors.E_MULTIPLE_REDIS_SUBSCRIPTIONS([channel]) - } - /** * If the subscriptions map is empty, it means we have no active subscriptions * on the given channel, hence we should make one subscription and also set @@ -296,7 +292,12 @@ export abstract class AbstractConnection< options?.onSubscription(count as number) } this.emit('subscription:ready', { count: count as number, connection: this }) - this.subscriptions.set(channel, handler) + const subscriptions = this.subscriptions.get(channel) + if (subscriptions) { + subscriptions.add(handler) + } else { + this.subscriptions.set(channel, new Set([handler])) + } }) .catch((error) => { if (options?.onError) { @@ -309,8 +310,19 @@ export abstract class AbstractConnection< /** * Unsubscribe from a channel */ - unsubscribe(channel: string) { - this.subscriptions.delete(channel) + unsubscribe(channel: string, handler?: PubSubChannelHandler) { + if (handler) { + const subscriptions = this.subscriptions.get(channel) + if (subscriptions) { + subscriptions.delete(handler) + } + + if (subscriptions && subscriptions.size !== 0) { + return Promise.resolve() + } + } else { + this.subscriptions.delete(channel) + } return this.ioSubscriberConnection!.unsubscribe(channel) } @@ -324,13 +336,6 @@ export abstract class AbstractConnection< */ this.setupSubscriberConnection() - /** - * Disallow multiple subscriptions to a single channel - */ - if (this.psubscriptions.has(pattern)) { - throw new errors.E_MULTIPLE_REDIS_PSUBSCRIPTIONS([pattern]) - } - /** * If the subscriptions map is empty, it means we have no active subscriptions * on the given channel, hence we should make one subscription and also set @@ -342,7 +347,12 @@ export abstract class AbstractConnection< options?.onSubscription(count as number) } this.emit('psubscription:ready', { count: count as number, connection: this }) - this.psubscriptions.set(pattern, handler) + const psubscriptions = this.psubscriptions.get(pattern) + if (psubscriptions) { + psubscriptions.add(handler) + } else { + this.psubscriptions.set(pattern, new Set([handler])) + } }) .catch((error) => { if (options?.onError) { @@ -355,8 +365,20 @@ export abstract class AbstractConnection< /** * Unsubscribe from a given pattern */ - punsubscribe(pattern: string) { - this.psubscriptions.delete(pattern) + punsubscribe(pattern: string, handler?: PubSubPatternHandler) { + if (handler) { + const psubscriptions = this.psubscriptions.get(pattern) + if (psubscriptions) { + psubscriptions.delete(handler) + } + + if (psubscriptions && psubscriptions.size !== 0) { + return Promise.resolve() + } + } else { + this.psubscriptions.delete(pattern) + } + return this.ioSubscriberConnection!.punsubscribe(pattern) } diff --git a/src/errors.ts b/src/errors.ts index 2790681..c1af322 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -9,12 +9,14 @@ import { createError } from '@poppinss/utils' +/** @deprecated */ export const E_MULTIPLE_REDIS_SUBSCRIPTIONS = createError<[string]>( 'Cannot subscribe to "%s" channel. Channel already has an active subscription', 'E_MULTIPLE_REDIS_SUBSCRIPTIONS', 500 ) +/** @deprecated */ export const E_MULTIPLE_REDIS_PSUBSCRIPTIONS = createError<[string]>( 'Cannot subscribe to "%s" pattern. Pattern already has an active subscription', 'E_MULTIPLE_REDIS_PSUBSCRIPTIONS',