From 76545cbd15f300742f2747e9b0107f7c9149c635 Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 11 Dec 2024 22:58:27 +0900 Subject: [PATCH] Creating a node with multiple slot ranges only once The `CLUSTER SLOTS` command returns a node that is responsible for multiple slot ranges multiple times. Therefore, I modify it for creating a node only once using dict, even if there are multiple slot ranges. --- src/valkey-benchmark.c | 52 ++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index e25ba1bed4..ab6ef54e62 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -223,6 +223,15 @@ static int dictSdsKeyCompare(const void *key1, const void *key2) { return memcmp(key1, key2, l1) == 0; } +static dictType dtype = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket) { redisContext *ctx = NULL; redisReply *reply = NULL; @@ -1019,7 +1028,6 @@ static clusterNode *createClusterNode(char *ip, int port) { } static void freeClusterNode(clusterNode *node) { - int i; if (node->name) sdsfree(node->name); if (node->replicate) sdsfree(node->replicate); /* If the node is not the reference node, that uses the address from @@ -1067,6 +1075,7 @@ static int fetchClusterConfiguration(void) { goto cleanup; } assert(reply->type == REDIS_REPLY_ARRAY); + dict *nodes = dictCreate(&dtype); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; assert(r->type == REDIS_REPLY_ARRAY); @@ -1085,15 +1094,18 @@ static int fetchClusterConfiguration(void) { sds name = sdsnew(nr->element[2]->str); int port = nr->element[1]->integer; - printf("\nNode %s [%d-%d] %s:%d\n", name, from, to, ip, port); - clusterNode *node = NULL; - node = createClusterNode(sdsnew(ip), port); - if (node == NULL) { - success = 0; - goto cleanup; + dictEntry *entry = dictFind(nodes, name); + if (entry == NULL) { + node = createClusterNode(sdsnew(ip), port); + if (node == NULL) { + success = 0; + goto cleanup; + } + if (name != NULL) node->name = name; + } else { + node = dictGetVal(entry); } - if (name != NULL) node->name = name; if (from == to) { node->slots[node->slots_count++] = from; } else { @@ -1106,9 +1118,12 @@ static int fetchClusterConfiguration(void) { fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port); continue; } - if (!addClusterNode(node)) { - success = 0; - goto cleanup; + if (entry == NULL) { + dictReplace(nodes, node->name, node); + if (!addClusterNode(node)) { + success = 0; + goto cleanup; + } } } } @@ -1139,17 +1154,10 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --replicas option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); const char *errmsg = "Failed to update cluster slots configuration"; - static dictType dtype = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - NULL, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ - }; + /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ dict *nodes = dictCreate(&dtype); redisContext *ctx = NULL; @@ -1377,7 +1385,7 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; - } else if (!strcmp(argv[i], "--replicas")) { + } else if (!strcmp(argv[i], "--rfr")) { config.read_from_replicas = 1; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; @@ -1476,7 +1484,7 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" - " --replicas Enable read from replicas in cluster mode.\n" + " --rfr Enable read from replicas in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n"