Skip to content

Commit

Permalink
Add client capa
Browse files Browse the repository at this point in the history
Signed-off-by: hwware <[email protected]>
  • Loading branch information
hwware committed Oct 16, 2024
1 parent 27352ed commit ea343e1
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4037,6 +4037,8 @@ void clientCommand(client *c) {
for (int i = 2; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
c->capa |= CLIENT_CAPA_REDIRECT;
} else if (!strcasecmp(c->argv[i]->ptr, "subv2")) {
c->capa |= CLIENT_CAPA_SUBV2;
}
}
addReply(c, shared.ok);
Expand Down
18 changes: 16 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ void pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position));
incrRefCount(channel);
}

if (!(c->capa & CLIENT_CAPA_SUBV2)) {
/* Notify the client */
addReplyPubsubSubscribed(c, channel, type);
}
}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
Expand Down Expand Up @@ -567,7 +572,12 @@ void subscribeCommand(client *c) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
addPubSubChannel(c, pubSubType);

if (c->capa & CLIENT_CAPA_SUBV2) {
addPubSubChannel(c, pubSubType);
} else {
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType);
}

markClientAsPubSub(c);
}
Expand Down Expand Up @@ -735,7 +745,11 @@ void ssubscribeCommand(client *c) {
return;
}

addPubSubChannel(c, pubSubShardType);
if (c->capa & CLIENT_CAPA_SUBV2) {
addPubSubChannel(c, pubSubShardType);
} else {
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
}
markClientAsPubSub(c);
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];

/* Client capabilities */
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
#define CLIENT_CAPA_SUBV2 (1 << 1) /* Indicate that the client can handle pubsub v2 version */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
17 changes: 16 additions & 1 deletion src/valkey-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ static struct config {
char *test_hint_file;
int prefer_ipv4; /* Prefer IPv4 over IPv6 on DNS lookup. */
int prefer_ipv6; /* Prefer IPv6 over IPv4 on DNS lookup. */
int pubsub_version;
} config;

/* User preferences. */
Expand Down Expand Up @@ -2345,6 +2346,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.output = OUTPUT_RAW;
}

if (!strcasecmp(command, "client") && argc >= 3 && !strcasecmp(argv[1], "capa")) {
for (int index = 2; index < argc; index++) {
if (!strcasecmp(argv[index], "subv2")) {
config.pubsub_version = 2;
break;
}
}
}

/* Setup argument length */
argvlen = zmalloc(argc * sizeof(size_t));
for (j = 0; j < argc; j++) argvlen[j] = sdslen(argv[j]);
Expand Down Expand Up @@ -2375,7 +2385,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
* an in-band message is received, but these commands are confirmed
* using push replies only. There is one push reply per channel if
* channels are specified, otherwise at least one. */
num_expected_pubsub_push = 1;
if (config.pubsub_version == 2) {
num_expected_pubsub_push = 1;
} else {
num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
}
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
}
Expand Down Expand Up @@ -9535,6 +9549,7 @@ int main(int argc, char **argv) {
config.server_version = NULL;
config.prefer_ipv4 = 0;
config.prefer_ipv6 = 0;
config.pubsub_version = 1;
config.cluster_manager_command.name = NULL;
config.cluster_manager_command.argc = 0;
config.cluster_manager_command.argv = NULL;
Expand Down
10 changes: 8 additions & 2 deletions tests/integration/valkey-cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,13 @@ start_server {tags {"cli"}} {

# Subscribe to some channels.
set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n"
set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n"
set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n"
set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n"
assert_equal $sub1$sub2$sub3$reading \
[run_command $fd "subscribe ch1 ch2 ch3"]

# set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n"
# set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n"
# Receive pubsub message.
r publish ch2 hello
set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n"
Expand Down Expand Up @@ -241,6 +243,10 @@ start_server {tags {"cli"}} {
[run_command $fd "subscribe ch1"]
}

test_interactive_cli "Subscribed mode" {

}

test_interactive_nontty_cli "Subscribed mode" {
# Raw output and no "Reading messages..." info message.
# Use RESP3 in this test case.
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/cluster/pubsubshard.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch
set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]
set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]

assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}]
assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}]
assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}]
# assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}]
# assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}]
# assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}]
set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]]
assert_equal [list 1 2 3] $sub_res

sunsubscribe $subscribeclient

Expand Down
30 changes: 18 additions & 12 deletions tests/unit/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ start_server {tags {"pubsub network"}} {
set rd1 [valkey_deferring_client]

# subscribe to two channels
assert_equal {1} [subscribe $rd1 {chan1}]
assert_equal {2} [subscribe $rd1 {chan2}]
#assert_equal {1} [subscribe $rd1 {chan1}]
#assert_equal {2} [subscribe $rd1 {chan2}]
assert_equal {1 2} [subscribe $rd1 {chan1 chan2}]
assert_equal 1 [r publish chan1 hello]
assert_equal 1 [r publish chan2 world]
assert_equal {message chan1 hello} [$rd1 read]
Expand Down Expand Up @@ -84,9 +85,10 @@ start_server {tags {"pubsub network"}} {

test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [valkey_deferring_client]
assert_equal {1} [subscribe $rd1 {chan1}]
assert_equal {2} [subscribe $rd1 {chan2}]
assert_equal {3} [subscribe $rd1 {chan3}]
#assert_equal {1} [subscribe $rd1 {chan1}]
#assert_equal {2} [subscribe $rd1 {chan2}]
#assert_equal {3} [subscribe $rd1 {chan3}]
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
unsubscribe $rd1
# wait for the unsubscribe to take effect
wait_for_condition 50 100 {
Expand All @@ -104,9 +106,10 @@ start_server {tags {"pubsub network"}} {

test "SUBSCRIBE to one channel more than once" {
set rd1 [valkey_deferring_client]
assert_equal {1} [subscribe $rd1 {chan1}]
assert_equal {2} [subscribe $rd1 {chan2}]
assert_equal {3} [subscribe $rd1 {chan3}]
#assert_equal {1} [subscribe $rd1 {chan1}]
#assert_equal {2} [subscribe $rd1 {chan2}]
#assert_equal {3} [subscribe $rd1 {chan3}]
assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r publish chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]

Expand All @@ -128,9 +131,9 @@ start_server {tags {"pubsub network"}} {
set rd1 [valkey_deferring_client]

# subscribe to two patterns
# assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}]
assert_equal {1} [psubscribe $rd1 {foo.*}]
assert_equal {2} [psubscribe $rd1 {bar.*}]
assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}]
#assert_equal {1} [psubscribe $rd1 {foo.*}]
#assert_equal {2} [psubscribe $rd1 {bar.*}]
assert_equal 1 [r publish foo.1 hello]
assert_equal 1 [r publish bar.1 hello]
assert_equal 0 [r publish foo1 hello]
Expand Down Expand Up @@ -488,7 +491,10 @@ start_server {tags {"pubsub network"}} {
# Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command,
# Only one response is returned
# This update matches with Redis response: one command always returns one response
assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz]
#assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz]
assert_equal "subscribe foo 1" [r subscribe foo bar baz]
assert_equal "subscribe bar 2" [r read]
assert_equal "subscribe baz 3" [r read]

r multi
r ping abc
Expand Down
7 changes: 4 additions & 3 deletions tests/unit/pubsubshard.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ start_server {tags {"pubsubshard external:skip"}} {

test "SSUBSCRIBE to one channel more than once" {
set rd1 [valkey_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [ssubscribe $rd1 {chan1}]
#assert_equal {1} [ssubscribe $rd1 {chan1}]
#assert_equal {1} [ssubscribe $rd1 {chan1}]
#assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal {smessage chan1 hello} [$rd1 read]

Expand Down

0 comments on commit ea343e1

Please sign in to comment.