Skip to content

Commit

Permalink
revert: improvement: pub/sub layer now accepts any data type
Browse files Browse the repository at this point in the history
  • Loading branch information
thetutlage committed May 19, 2021
1 parent 3b02000 commit c488b42
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 28 deletions.
8 changes: 4 additions & 4 deletions adonis-typings/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ declare module '@ioc:Adonis/Addons/Redis' {
/**
* Pubsub subscriber
*/
export type PubSubChannelHandler<T extends any = any> = (data: T) => Promise<void> | void
export type PubSubPatternHandler<T extends any = any> = (
export type PubSubChannelHandler<T extends any = string> = (data: T) => Promise<void> | void
export type PubSubPatternHandler<T extends any = string> = (
channel: string,
data: T
) => Promise<void> | void
Expand All @@ -45,10 +45,10 @@ declare module '@ioc:Adonis/Addons/Redis' {
export interface RedisPubSubContract {
publish(
channel: string,
message: any,
message: string,
callback: (error: Error | null, count: number) => void
): void
publish(channel: string, message: any): Promise<number>
publish(channel: string, message: string): Promise<number>
subscribe(channel: string, handler: PubSubChannelHandler | string): void
psubscribe(pattern: string, handler: PubSubPatternHandler | string): void
unsubscribe(channel: string): void
Expand Down
24 changes: 5 additions & 19 deletions src/AbstractConnection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import { EventEmitter } from 'events'
import { Redis, Cluster } from 'ioredis'
import { Exception } from '@poppinss/utils'
import { MessageBuilder } from '@poppinss/utils/build/helpers'
import { ApplicationContract } from '@ioc:Adonis/Core/Application'
import { ContainerBindings, IocResolverContract } from '@ioc:Adonis/Core/Application'

Expand All @@ -22,8 +21,6 @@ import {
PubSubPatternHandler,
} from '@ioc:Adonis/Addons/Redis'

const PUBSUB_PURPOSE = 'adonis-pubsub'

/**
* Helper to sleep
*/
Expand Down Expand Up @@ -181,12 +178,7 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
this.ioSubscriberConnection!.on('message', (channel, message) => {
const handler = this.subscriptions.get(channel)
if (handler) {
/**
* If the message is not originated from the same process and not built using
* the message builder, then should pass it as it is
*/
const verifiedMessage = new MessageBuilder().verify(message, PUBSUB_PURPOSE)
handler(verifiedMessage || message)
handler(message)
}
})

Expand All @@ -196,12 +188,7 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
this.ioSubscriberConnection!.on('pmessage', (pattern, channel, message) => {
const handler = this.psubscriptions.get(pattern)
if (handler) {
/**
* If the message is not originated from the same process and not built using
* the message builder, then should pass it as it is
*/
const verifiedMessage = new MessageBuilder().verify(message, PUBSUB_PURPOSE)
handler(channel, verifiedMessage || message)
handler(channel, message)
}
})

Expand Down Expand Up @@ -414,10 +401,9 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
}
}

public publish(channel: string, message: any, callback?: any) {
const messageString = new MessageBuilder().build(message, undefined, PUBSUB_PURPOSE)
public publish(channel: string, message: string, callback?: any) {
return callback
? this.ioConnection.publish(channel, messageString, callback)
: this.ioConnection.publish(channel, messageString)
? this.ioConnection.publish(channel, message, callback)
: this.ioConnection.publish(channel, message)
}
}
10 changes: 5 additions & 5 deletions test/redis-connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ test.group('Redis factory - PSubscribe', () => {
) as unknown as RedisConnectionContract

factory.on('psubscription:ready', () => {
factory.publish('news:prime', { title: 'breaking news at 9' })
factory.publish('news:prime', JSON.stringify({ title: 'breaking news at 9' }))
})

factory.psubscribe('news:*', (channel, message) => {
assert.equal(channel, 'news:prime')
assert.deepEqual(message, { title: 'breaking news at 9' })
assert.deepEqual(message, JSON.stringify({ title: 'breaking news at 9' }))
factory.punsubscribe('news:*')

factory.publish('news:prime', 'breaking news at 9', (_error, count) => {
Expand All @@ -509,9 +509,9 @@ test.group('Redis factory - PSubscribe', () => {
) as unknown as RedisConnectionContract

class RedisListeners {
public async onNews(channel: string, message: { title: string }) {
public async onNews(channel: string, message: string) {
assert.equal(channel, 'news:prime')
assert.deepEqual(message, { title: 'breaking news at 9' })
assert.equal(message, JSON.stringify({ title: 'breaking news at 9' }))
await factory.quit()
done()
}
Expand All @@ -524,7 +524,7 @@ test.group('Redis factory - PSubscribe', () => {
factory.psubscribe('news:*', 'RedisListeners.onNews')

factory.on('psubscription:ready', () => {
factory.publish('news:prime', { title: 'breaking news at 9' })
factory.publish('news:prime', JSON.stringify({ title: 'breaking news at 9' }))
})
})

Expand Down

0 comments on commit c488b42

Please sign in to comment.