Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple handlers for a single subscription #66

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions src/connections/abstract_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import Emittery from 'emittery'
import type { Redis, Cluster } from 'ioredis'
import * as errors from '../errors.js'
import type {
PubSubOptions,
ConnectionEvents,
Expand Down Expand Up @@ -38,8 +37,8 @@ export abstract class AbstractConnection<
/**
* A list of active subscriptions and pattern subscription
*/
protected subscriptions: Map<string, PubSubChannelHandler> = new Map()
protected psubscriptions: Map<string, PubSubPatternHandler> = new Map()
protected subscriptions: Map<string, Set<PubSubChannelHandler>> = new Map()
protected psubscriptions: Map<string, Set<PubSubPatternHandler>> = new Map()

/**
* The last error emitted by the `error` event. We set it to `null` after
Expand Down Expand Up @@ -230,19 +229,23 @@ export abstract class AbstractConnection<
* Listen for messages
*/
this.ioSubscriberConnection!.on('message', (channel, message) => {
const handler = this.subscriptions.get(channel)
if (handler) {
handler(message)
const handlers = this.subscriptions.get(channel)
if (handlers) {
for (const handler of handlers) {
handler(message)
}
}
})

/**
* Listen for pattern messages
*/
this.ioSubscriberConnection!.on('pmessage', (pattern, channel, message) => {
const handler = this.psubscriptions.get(pattern)
if (handler) {
handler(channel, message)
const handlers = this.psubscriptions.get(pattern)
if (handlers) {
for (const handler of handlers) {
handler(channel, message)
}
}
})
}
Expand Down Expand Up @@ -278,13 +281,6 @@ export abstract class AbstractConnection<
*/
this.setupSubscriberConnection()

/**
* Disallow multiple subscriptions to a single channel
*/
if (this.subscriptions.has(channel)) {
throw new errors.E_MULTIPLE_REDIS_SUBSCRIPTIONS([channel])
}

/**
* If the subscriptions map is empty, it means we have no active subscriptions
* on the given channel, hence we should make one subscription and also set
Expand All @@ -296,7 +292,12 @@ export abstract class AbstractConnection<
options?.onSubscription(count as number)
}
this.emit('subscription:ready', { count: count as number, connection: this })
this.subscriptions.set(channel, handler)
const subscriptions = this.subscriptions.get(channel)
if (subscriptions) {
subscriptions.add(handler)
} else {
this.subscriptions.set(channel, new Set([handler]))
}
})
.catch((error) => {
if (options?.onError) {
Expand All @@ -309,8 +310,19 @@ export abstract class AbstractConnection<
/**
* Unsubscribe from a channel
*/
unsubscribe(channel: string) {
this.subscriptions.delete(channel)
unsubscribe(channel: string, handler?: PubSubChannelHandler) {
if (handler) {
const subscriptions = this.subscriptions.get(channel)
if (subscriptions) {
subscriptions.delete(handler)
}

if (subscriptions && subscriptions.size !== 0) {
return Promise.resolve()
}
} else {
this.subscriptions.delete(channel)
}
return this.ioSubscriberConnection!.unsubscribe(channel)
}

Expand All @@ -324,13 +336,6 @@ export abstract class AbstractConnection<
*/
this.setupSubscriberConnection()

/**
* Disallow multiple subscriptions to a single channel
*/
if (this.psubscriptions.has(pattern)) {
throw new errors.E_MULTIPLE_REDIS_PSUBSCRIPTIONS([pattern])
}

/**
* If the subscriptions map is empty, it means we have no active subscriptions
* on the given channel, hence we should make one subscription and also set
Expand All @@ -342,7 +347,12 @@ export abstract class AbstractConnection<
options?.onSubscription(count as number)
}
this.emit('psubscription:ready', { count: count as number, connection: this })
this.psubscriptions.set(pattern, handler)
const psubscriptions = this.psubscriptions.get(pattern)
if (psubscriptions) {
psubscriptions.add(handler)
} else {
this.psubscriptions.set(pattern, new Set([handler]))
}
})
.catch((error) => {
if (options?.onError) {
Expand All @@ -355,8 +365,20 @@ export abstract class AbstractConnection<
/**
* Unsubscribe from a given pattern
*/
punsubscribe(pattern: string) {
this.psubscriptions.delete(pattern)
punsubscribe(pattern: string, handler?: PubSubPatternHandler) {
if (handler) {
const psubscriptions = this.psubscriptions.get(pattern)
if (psubscriptions) {
psubscriptions.delete(handler)
}

if (psubscriptions && psubscriptions.size !== 0) {
return Promise.resolve()
}
} else {
this.psubscriptions.delete(pattern)
}

return this.ioSubscriberConnection!.punsubscribe(pattern)
}

Expand Down
2 changes: 2 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

import { createError } from '@poppinss/utils'

/** @deprecated */
export const E_MULTIPLE_REDIS_SUBSCRIPTIONS = createError<[string]>(
'Cannot subscribe to "%s" channel. Channel already has an active subscription',
'E_MULTIPLE_REDIS_SUBSCRIPTIONS',
500
)

/** @deprecated */
export const E_MULTIPLE_REDIS_PSUBSCRIPTIONS = createError<[string]>(
'Cannot subscribe to "%s" pattern. Pattern already has an active subscription',
'E_MULTIPLE_REDIS_PSUBSCRIPTIONS',
Expand Down