Skip to content

Commit

Permalink
PubSub handle array of messages for RESP2 (#3811)
Browse files Browse the repository at this point in the history
* PubSub handle array of messages

for RESP2 only

* Modify test and add binary mode test

* Edit
  • Loading branch information
sazzad16 authored Apr 9, 2024
1 parent b617822 commit 44d34a6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/main/java/redis/clients/jedis/JedisPubSubBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,13 @@ private void process() {
onUnsubscribe(enchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) listReply.get(1);
final byte[] bmesg = (byte[]) listReply.get(2);
final Object mesg = listReply.get(2);
final T enchannel = (bchannel == null) ? null : encode(bchannel);
final T enmesg = (bmesg == null) ? null : encode(bmesg);
onMessage(enchannel, enmesg);
if (mesg instanceof List) {
((List<byte[]>) mesg).forEach(bmesg -> onMessage(enchannel, encode(bmesg)));
} else {
onMessage(enchannel, (mesg == null) ? null : encode((byte[]) mesg));
}
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
final byte[] bchannel = (byte[]) listReply.get(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static redis.clients.jedis.Protocol.Command.CLIENT;

import java.io.IOException;
import java.net.UnknownHostException;
Expand All @@ -15,7 +17,9 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -537,4 +541,73 @@ private String makeLargeString(int size) {

return sb.toString();
}

@Test(timeout = 5000)
public void subscribeCacheInvalidateChannel() {
org.junit.Assume.assumeThat(protocol, Matchers.not(RedisProtocol.RESP3));

final String cacheInvalidate = "__redis__:invalidate";
final AtomicBoolean onMessage = new AtomicBoolean(false);
final JedisPubSub pubsub = new JedisPubSub() {
@Override public void onMessage(String channel, String message) {
onMessage.set(true);
assertEquals(cacheInvalidate, channel);
if (message != null) {
assertEquals("foo", message);
consumeJedis(j -> j.flushAll());
} else {
unsubscribe(channel);
}
}

@Override public void onSubscribe(String channel, int subscribedChannels) {
assertEquals(cacheInvalidate, channel);
consumeJedis(j -> j.set("foo", "bar"));
}
};

try (Jedis subscriber = createJedis()) {
long clientId = subscriber.clientId();
subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST");
subscriber.subscribe(pubsub, cacheInvalidate);
assertTrue("Subscriber didn't get any message.", onMessage.get());
}
}

@Test(timeout = 5000)
public void subscribeCacheInvalidateChannelBinary() {
org.junit.Assume.assumeThat(protocol, Matchers.not(RedisProtocol.RESP3));

final byte[] cacheInvalidate = "__redis__:invalidate".getBytes();
final AtomicBoolean onMessage = new AtomicBoolean(false);
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
@Override public void onMessage(byte[] channel, byte[] message) {
onMessage.set(true);
assertArrayEquals(cacheInvalidate, channel);
if (message != null) {
assertArrayEquals("foo".getBytes(), message);
consumeJedis(j -> j.flushAll());
} else {
unsubscribe(channel);
}
}

@Override public void onSubscribe(byte[] channel, int subscribedChannels) {
assertArrayEquals(cacheInvalidate, channel);
consumeJedis(j -> j.set("foo".getBytes(), "bar".getBytes()));
}
};

try (Jedis subscriber = createJedis()) {
long clientId = subscriber.clientId();
subscriber.sendCommand(CLIENT, "TRACKING", "ON", "REDIRECT", Long.toString(clientId), "BCAST");
subscriber.subscribe(pubsub, cacheInvalidate);
assertTrue("Subscriber didn't get any message.", onMessage.get());
}
}

private void consumeJedis(Consumer<Jedis> consumer) {
Thread t = new Thread(() -> consumer.accept(jedis));
t.start();
}
}

0 comments on commit 44d34a6

Please sign in to comment.