From 30403a037534efd71a66db8b5562e12c7e4bb234 Mon Sep 17 00:00:00 2001 From: hwware Date: Tue, 10 Sep 2024 22:04:51 +0000 Subject: [PATCH] Add client capa --- src/networking.c | 2 ++ src/pubsub.c | 18 ++++++++++++++++-- src/server.h | 1 + src/valkey-cli.c | 17 ++++++++++++++++- tests/integration/valkey-cli.tcl | 10 ++++++++-- tests/unit/cluster/pubsubshard.tcl | 8 +++++--- tests/unit/pubsub.tcl | 30 ++++++++++++++++++------------ tests/unit/pubsubshard.tcl | 7 ++++--- 8 files changed, 70 insertions(+), 23 deletions(-) diff --git a/src/networking.c b/src/networking.c index 503a85d693..702a8286e5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4016,6 +4016,8 @@ NULL for (int i = 2; i < c->argc; i++) { if (!strcasecmp(c->argv[i]->ptr, "redirect")) { c->capa |= CLIENT_CAPA_REDIRECT; + } else if (!strcasecmp(c->argv[i]->ptr, "subv2")) { + c->capa |= CLIENT_CAPA_SUBV2; } } addReply(c, shared.ok); diff --git a/src/pubsub.c b/src/pubsub.c index 950cb465da..44e5cdc4e5 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -285,6 +285,11 @@ void pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position)); incrRefCount(channel); } + + if (!(c->capa & CLIENT_CAPA_SUBV2)) { + /* Notify the client */ + addReplyPubsubSubscribed(c, channel, type); + } } /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or @@ -567,7 +572,12 @@ void subscribeCommand(client *c) { addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client"); return; } - addPubSubChannel(c, pubSubType); + + if (c->capa & CLIENT_CAPA_SUBV2) { + addPubSubChannel(c, pubSubType); + } else { + for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType); + } markClientAsPubSub(c); } @@ -734,7 +744,11 @@ void ssubscribeCommand(client *c) { return; } - addPubSubChannel(c, pubSubShardType); + if (c->capa & CLIENT_CAPA_SUBV2) { + addPubSubChannel(c, pubSubShardType); + } else { + for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); + } markClientAsPubSub(c); } diff --git a/src/server.h b/src/server.h index fbe57917c8..e85a22a096 100644 --- a/src/server.h +++ b/src/server.h @@ -355,6 +355,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client capabilities */ #define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */ +#define CLIENT_CAPA_SUBV2 (1 << 1) /* Indicate that the client can handle pubsub v2 version */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 7630954734..5f002f10c9 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -272,6 +272,7 @@ static struct config { char *test_hint_file; int prefer_ipv4; /* Prefer IPv4 over IPv6 on DNS lookup. */ int prefer_ipv6; /* Prefer IPv6 over IPv4 on DNS lookup. */ + int pubsub_version; } config; /* User preferences. */ @@ -2345,6 +2346,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) { config.output = OUTPUT_RAW; } + if (!strcasecmp(command, "client") && argc >= 3 && !strcasecmp(argv[1], "capa")) { + for (int index = 2; index < argc; index++) { + if (!strcasecmp(argv[index], "subv2")) { + config.pubsub_version = 2; + break; + } + } + } + /* Setup argument length */ argvlen = zmalloc(argc * sizeof(size_t)); for (j = 0; j < argc; j++) argvlen[j] = sdslen(argv[j]); @@ -2375,7 +2385,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { * an in-band message is received, but these commands are confirmed * using push replies only. There is one push reply per channel if * channels are specified, otherwise at least one. */ - num_expected_pubsub_push = 1; + if (config.pubsub_version == 2) { + num_expected_pubsub_push = 1; + } else { + num_expected_pubsub_push = argc > 1 ? argc - 1 : 1; + } /* Unset our default PUSH handler so this works in RESP2/RESP3 */ redisSetPushCallback(context, NULL); } @@ -9537,6 +9551,7 @@ int main(int argc, char **argv) { config.server_version = NULL; config.prefer_ipv4 = 0; config.prefer_ipv6 = 0; + config.pubsub_version = 1; config.cluster_manager_command.name = NULL; config.cluster_manager_command.argc = 0; config.cluster_manager_command.argv = NULL; diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index 4ee4adbfb8..9052f8fa6a 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -204,11 +204,13 @@ start_server {tags {"cli"}} { # Subscribe to some channels. set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n" - set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n" - set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n" + set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n" + set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n" assert_equal $sub1$sub2$sub3$reading \ [run_command $fd "subscribe ch1 ch2 ch3"] + # set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n" + # set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n" # Receive pubsub message. r publish ch2 hello set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n" @@ -241,6 +243,10 @@ start_server {tags {"cli"}} { [run_command $fd "subscribe ch1"] } + test_interactive_cli "Subscribed mode" { + + } + test_interactive_nontty_cli "Subscribed mode" { # Raw output and no "Reading messages..." info message. # Use RESP3 in this test case. diff --git a/tests/unit/cluster/pubsubshard.tcl b/tests/unit/cluster/pubsubshard.tcl index ca168de1e8..74018b810c 100644 --- a/tests/unit/cluster/pubsubshard.tcl +++ b/tests/unit/cluster/pubsubshard.tcl @@ -59,9 +59,11 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)] set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)] - assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}] - assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}] - assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}] + # assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}] + # assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}] + # assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}] + set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]] + assert_equal [list 1 2 3] $sub_res sunsubscribe $subscribeclient diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 4235b2eaf7..c873d0276d 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -45,8 +45,9 @@ start_server {tags {"pubsub network"}} { set rd1 [valkey_deferring_client] # subscribe to two channels - assert_equal {1} [subscribe $rd1 {chan1}] - assert_equal {2} [subscribe $rd1 {chan2}] + #assert_equal {1} [subscribe $rd1 {chan1}] + #assert_equal {2} [subscribe $rd1 {chan2}] + assert_equal {1 2} [subscribe $rd1 {chan1 chan2}] assert_equal 1 [r publish chan1 hello] assert_equal 1 [r publish chan2 world] assert_equal {message chan1 hello} [$rd1 read] @@ -84,9 +85,10 @@ start_server {tags {"pubsub network"}} { test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" { set rd1 [valkey_deferring_client] - assert_equal {1} [subscribe $rd1 {chan1}] - assert_equal {2} [subscribe $rd1 {chan2}] - assert_equal {3} [subscribe $rd1 {chan3}] + #assert_equal {1} [subscribe $rd1 {chan1}] + #assert_equal {2} [subscribe $rd1 {chan2}] + #assert_equal {3} [subscribe $rd1 {chan3}] + assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] unsubscribe $rd1 # wait for the unsubscribe to take effect wait_for_condition 50 100 { @@ -104,9 +106,10 @@ start_server {tags {"pubsub network"}} { test "SUBSCRIBE to one channel more than once" { set rd1 [valkey_deferring_client] - assert_equal {1} [subscribe $rd1 {chan1}] - assert_equal {2} [subscribe $rd1 {chan2}] - assert_equal {3} [subscribe $rd1 {chan3}] + #assert_equal {1} [subscribe $rd1 {chan1}] + #assert_equal {2} [subscribe $rd1 {chan2}] + #assert_equal {3} [subscribe $rd1 {chan3}] + assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r publish chan1 hello] assert_equal {message chan1 hello} [$rd1 read] @@ -128,9 +131,9 @@ start_server {tags {"pubsub network"}} { set rd1 [valkey_deferring_client] # subscribe to two patterns - # assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}] - assert_equal {1} [psubscribe $rd1 {foo.*}] - assert_equal {2} [psubscribe $rd1 {bar.*}] + assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}] + #assert_equal {1} [psubscribe $rd1 {foo.*}] + #assert_equal {2} [psubscribe $rd1 {bar.*}] assert_equal 1 [r publish foo.1 hello] assert_equal 1 [r publish bar.1 hello] assert_equal 0 [r publish foo1 hello] @@ -488,7 +491,10 @@ start_server {tags {"pubsub network"}} { # Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command, # Only one response is returned # This update matches with Redis response: one command always returns one response - assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz] + #assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz] + assert_equal "subscribe foo 1" [r subscribe foo bar baz] + assert_equal "subscribe bar 2" [r read] + assert_equal "subscribe baz 3" [r read] r multi r ping abc diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl index 92b6cd3d2e..c405773272 100644 --- a/tests/unit/pubsubshard.tcl +++ b/tests/unit/pubsubshard.tcl @@ -64,9 +64,10 @@ start_server {tags {"pubsubshard external:skip"}} { test "SSUBSCRIBE to one channel more than once" { set rd1 [valkey_deferring_client] - assert_equal {1} [ssubscribe $rd1 {chan1}] - assert_equal {1} [ssubscribe $rd1 {chan1}] - assert_equal {1} [ssubscribe $rd1 {chan1}] + #assert_equal {1} [ssubscribe $rd1 {chan1}] + #assert_equal {1} [ssubscribe $rd1 {chan1}] + #assert_equal {1} [ssubscribe $rd1 {chan1}] + assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r SPUBLISH chan1 hello] assert_equal {smessage chan1 hello} [$rd1 read]