From d8bb28cdbcb73de00c9840cfc0d18e3d55858af9 Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 4 Dec 2024 16:58:48 +0900 Subject: [PATCH 01/16] Rewrite to use CLUSTER SLOTS 1. The CLUSTER NODES command does not provide replicas' slot range. Therefore, I have changed to use the CLUSTER SLOTS command to implement read from the Replica. 2. If you use the CLUSTER NODES command, the string parsing is so long and complex that it makes the code difficult to read. In comparison, CLUSTER SLOTS makes the code much simpler and more concise. 3. As noted in the TODO comment, the migrating and importing information is not used. Signed-off-by: bluayer Signed-off-by: bluayer --- src/valkey-benchmark.c | 180 +++++++++++------------------------------ 1 file changed, 45 insertions(+), 135 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 57cdd6fc16..155a7f9b82 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1056,156 +1056,66 @@ static clusterNode **addClusterNode(clusterNode *node) { return config.cluster_nodes; } -/* TODO: This should be refactored to use CLUSTER SLOTS, the migrating/importing - * information is anyway not used. - */ static int fetchClusterConfiguration(void) { int success = 1; redisContext *ctx = NULL; redisReply *reply = NULL; + const char *errmsg = "Failed to fetch cluster configuration"; + size_t i, j; ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); if (ctx == NULL) { exit(1); } - clusterNode *firstNode = createClusterNode((char *)config.conn_info.hostip, config.conn_info.hostport); - if (!firstNode) { + + reply = redisCommand(ctx, "CLUSTER SLOTS"); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { success = 0; + if (reply) fprintf(stderr, "%s\nCLUSTER SLOTS ERROR: %s\n", errmsg, reply->str); goto cleanup; } - reply = redisCommand(ctx, "CLUSTER NODES"); - success = (reply != NULL); - if (!success) goto cleanup; - success = (reply->type != REDIS_REPLY_ERROR); - if (!success) { - if (config.hostsocket == NULL) { - fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n", config.conn_info.hostip, - config.conn_info.hostport, reply->str); - } else { - fprintf(stderr, "Cluster node %s replied with error:\n%s\n", config.hostsocket, reply->str); - } - goto cleanup; - } - char *lines = reply->str, *p, *line; - while ((p = strstr(lines, "\n")) != NULL) { - *p = '\0'; - line = lines; - lines = p + 1; - char *name = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL; - int i = 0; - while ((p = strchr(line, ' ')) != NULL) { - *p = '\0'; - char *token = line; - line = p + 1; - switch (i++) { - case 0: name = token; break; - case 1: addr = token; break; - case 2: flags = token; break; - case 3: primary_id = token; break; - } - if (i == 8) break; // Slots - } - if (!flags) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n"); - success = 0; - goto cleanup; - } - int myself = (strstr(flags, "myself") != NULL); - int is_replica = (strstr(flags, "slave") != NULL || (primary_id != NULL && primary_id[0] != '-')); - if (is_replica) continue; - if (addr == NULL) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n"); - success = 0; - goto cleanup; - } - clusterNode *node = NULL; - char *ip = NULL; - int port = 0; - char *paddr = strrchr(addr, ':'); - if (paddr != NULL) { - *paddr = '\0'; - ip = addr; - addr = paddr + 1; - /* If internal bus is specified, then just drop it. */ - if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; - port = atoi(addr); - } - if (myself) { - node = firstNode; - if (ip != NULL && strcmp(node->ip, ip) != 0) { - node->ip = sdsnew(ip); - node->port = port; - } - } else { + assert(reply->type == REDIS_REPLY_ARRAY); + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY); + assert(r->elements >= 3); + int from = r->element[0]->integer; + int to = r->element[1]->integer; + for (j = 2; j < r->elements; j++) { + int is_primary = (j == 2); + if (!is_primary) continue; + redisReply *nr = r->element[j]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); + assert(nr->element[0]->str != NULL); + assert(nr->element[2]->str != NULL); + sds ip = sdsnew(nr->element[0]->str); + sds name = sdsnew(nr->element[2]->str); + int port = nr->element[1]->integer; + + printf("\nNode %s [%d-%d] %s:%d\n", name, from, to, ip, port); + + clusterNode *node = NULL; node = createClusterNode(sdsnew(ip), port); - } - if (node == NULL) { - success = 0; - goto cleanup; - } - if (name != NULL) node->name = sdsnew(name); - if (i == 8) { - int remaining = strlen(line); - while (remaining > 0) { - p = strchr(line, ' '); - if (p == NULL) p = line + remaining; - remaining -= (p - line); - - char *slotsdef = line; - *p = '\0'; - if (remaining) { - line = p + 1; - remaining--; - } else - line = p; - char *dash = NULL; - if (slotsdef[0] == '[') { - slotsdef++; - if ((p = strstr(slotsdef, "->-"))) { // Migrating - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds dst = sdsnew(p); - node->migrating_count += 2; - node->migrating = zrealloc(node->migrating, (node->migrating_count * sizeof(sds))); - node->migrating[node->migrating_count - 2] = slot; - node->migrating[node->migrating_count - 1] = dst; - } else if ((p = strstr(slotsdef, "-<-"))) { // Importing - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds src = sdsnew(p); - node->importing_count += 2; - node->importing = zrealloc(node->importing, (node->importing_count * sizeof(sds))); - node->importing[node->importing_count - 2] = slot; - node->importing[node->importing_count - 1] = src; - } - } else if ((dash = strchr(slotsdef, '-')) != NULL) { - p = dash; - int start, stop; - *p = '\0'; - start = atoi(slotsdef); - stop = atoi(p + 1); - while (start <= stop) { - int slot = start++; - node->slots[node->slots_count++] = slot; - } - } else if (p > slotsdef) { - int slot = atoi(slotsdef); + if (node == NULL) { + success = 0; + goto cleanup; + } + if (name != NULL) node->name = name; + if (from == to) { + node->slots[node->slots_count++] = from; + } else { + while (from <= to) { + int slot = from++; node->slots[node->slots_count++] = slot; } } - } - if (node->slots_count == 0) { - fprintf(stderr, "WARNING: Primary node %s:%d has no slots, skipping...\n", node->ip, node->port); - continue; - } - if (!addClusterNode(node)) { - success = 0; - goto cleanup; + if (node->slots_count == 0) { + fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port); + continue; + } + if (!addClusterNode(node)) { + success = 0; + goto cleanup; + } } } cleanup: From 7cbfa2f077648298cc711b5242224576f117fc5b Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 4 Dec 2024 17:34:56 +0900 Subject: [PATCH 02/16] Add CLI option for reading from replicas 1. Add CLI option and description for reading from replicas with cluster option. 2. If the user uses the replicas option, the READONLY command is automatically used when the client is created. Signed-off-by: bluayer Signed-off-by: bluayer --- src/valkey-benchmark.c | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 155a7f9b82..3855daebd4 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -112,6 +112,7 @@ static struct config { int num_threads; struct benchmarkThread **threads; int cluster_mode; + int read_from_replicas; int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; @@ -710,6 +711,16 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } + if (config.cluster_mode && config.read_from_replicas) { + char *buf = NULL; + int len; + + len = redisFormatCommand(&buf, "READONLY"); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + c->prefixlen = sdslen(c->obuf); /* Append the request itself. */ if (from) { @@ -835,7 +846,8 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - printf(" cluster mode: yes (%d primaries)\n", config.cluster_node_count); + char * node_prefix = config.read_from_replicas ? "replicas" : "primaries"; + printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix); int m; for (m = 0; m < config.cluster_node_count; m++) { clusterNode *node = config.cluster_nodes[m]; @@ -1082,7 +1094,8 @@ static int fetchClusterConfiguration(void) { int to = r->element[1]->integer; for (j = 2; j < r->elements; j++) { int is_primary = (j == 2); - if (!is_primary) continue; + if (config.read_from_replicas == is_primary) continue; + redisReply *nr = r->element[j]; assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); assert(nr->element[0]->str != NULL); @@ -1370,6 +1383,8 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; + } else if (!strcmp(argv[i], "--replicas")) { + config.read_from_replicas = 1; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { @@ -1467,6 +1482,8 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" + " --replicas Enable read from replicas in cluster mode.\n" + " This command must be used with the --cluster option.\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" @@ -1608,6 +1625,7 @@ int main(int argc, char **argv) { config.num_threads = 0; config.threads = NULL; config.cluster_mode = 0; + config.read_from_replicas = 0; config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; @@ -1652,7 +1670,8 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - printf("Cluster has %d primary nodes:\n\n", config.cluster_node_count); + char * node_prefix = config.read_from_replicas ? "replica" : "primary"; + printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix); int i = 0; for (; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1660,7 +1679,7 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } - printf("Primary %d: ", i); + printf("%s %d: ", node_prefix, i); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); node->redis_config = getServerConfig(node->ip, node->port, NULL); From 97ad6152f54f45a62ae2e70a2dd14f790daf8679 Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 5 Dec 2024 16:17:25 +0900 Subject: [PATCH 03/16] Change to support both primary and replica when changing slots 1. Update the slot by distinguishing whether it is a primary or a replica. 2. With the above change, the variable name "primaries" has been changed to "nodes". Signed-off-by: bluayer Signed-off-by: bluayer --- src/valkey-benchmark.c | 54 ++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 3855daebd4..331eabe2e7 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1145,7 +1145,7 @@ static int fetchClusterConfiguration(void) { static int fetchClusterSlotsConfiguration(client c) { UNUSED(c); int success = 1, is_fetching_slots = 0, last_update = 0; - size_t i; + size_t i, j; last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); if (c->slots_last_update < last_update) { @@ -1168,7 +1168,7 @@ static int fetchClusterSlotsConfiguration(client c) { NULL /* allow to expand */ }; /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ - dict *primaries = dictCreate(&dtype); + dict *nodes = dictCreate(&dtype); redisContext *ctx = NULL; for (i = 0; i < (size_t)config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1186,7 +1186,7 @@ static int fetchClusterSlotsConfiguration(client c) { if (node->updated_slots != NULL) zfree(node->updated_slots); node->updated_slots = NULL; node->updated_slots_count = 0; - dictReplace(primaries, node->name, node); + dictReplace(nodes, node->name, node); } reply = redisCommand(ctx, "CLUSTER SLOTS"); if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { @@ -1202,30 +1202,42 @@ static int fetchClusterSlotsConfiguration(client c) { int from, to, slot; from = r->element[0]->integer; to = r->element[1]->integer; - redisReply *nr = r->element[2]; - assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); - assert(nr->element[2]->str != NULL); - sds name = sdsnew(nr->element[2]->str); - dictEntry *entry = dictFind(primaries, name); - if (entry == NULL) { - success = 0; - fprintf(stderr, - "%s: could not find node with ID %s in current " - "configuration.\n", - errmsg, name); - if (name) sdsfree(name); - goto cleanup; + + size_t start, end; + if (config.read_from_replicas) { + start = 3; + end = r->elements; + } else { + start = 2; + end = 3; + } + + for (j = start; j < end; j++) { + redisReply *nr = r->element[j]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); + assert(nr->element[2]->str != NULL); + sds name = sdsnew(nr->element[2]->str); + dictEntry *entry = dictFind(nodes, name); + if (entry == NULL) { + success = 0; + fprintf(stderr, + "%s: could not find node with ID %s in current " + "configuration.\n", + errmsg, name); + if (name) sdsfree(name); + goto cleanup; + } + sdsfree(name); + clusterNode *node = dictGetVal(entry); + if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); + for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot; } - sdsfree(name); - clusterNode *node = dictGetVal(entry); - if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); - for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot; } updateClusterSlotsConfiguration(); cleanup: freeReplyObject(reply); redisFree(ctx); - dictRelease(primaries); + dictRelease(nodes); atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed); return success; } From fcc240ff39e51188329c0f8c7129512789b0413d Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 5 Dec 2024 17:38:03 +0900 Subject: [PATCH 04/16] Add description for CLI Added a warning about writing requests with replicas option Signed-off-by: bluayer Signed-off-by: bluayer --- src/valkey-benchmark.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 331eabe2e7..4061c7ec77 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1158,6 +1158,8 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); + fprintf(stderr, "If you are using the --replicas option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + const char *errmsg = "Failed to update cluster slots configuration"; static dictType dtype = { dictSdsHash, /* hash function */ @@ -1495,7 +1497,9 @@ int parseOptions(int argc, char **argv) { " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" " --replicas Enable read from replicas in cluster mode.\n" - " This command must be used with the --cluster option.\n" + " This command must be used with the --cluster option.\n" + " When using this option, it is recommended to use only \n" + " the commands for read requests." " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" From ebc0fc30580845fec0c8b24f9a3a9a30ad824e46 Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 5 Dec 2024 18:07:24 +0900 Subject: [PATCH 05/16] Formatting according to clang-format Signed-off-by: bluayer Signed-off-by: bluayer --- src/valkey-benchmark.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 4061c7ec77..0e64e49124 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -714,7 +714,6 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { if (config.cluster_mode && config.read_from_replicas) { char *buf = NULL; int len; - len = redisFormatCommand(&buf, "READONLY"); c->obuf = sdscatlen(c->obuf, buf, len); free(buf); @@ -846,7 +845,7 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - char * node_prefix = config.read_from_replicas ? "replicas" : "primaries"; + char *node_prefix = config.read_from_replicas ? "replicas" : "primaries"; printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix); int m; for (m = 0; m < config.cluster_node_count; m++) { @@ -1204,7 +1203,6 @@ static int fetchClusterSlotsConfiguration(client c) { int from, to, slot; from = r->element[0]->integer; to = r->element[1]->integer; - size_t start, end; if (config.read_from_replicas) { start = 3; @@ -1499,7 +1497,7 @@ int parseOptions(int argc, char **argv) { " --replicas Enable read from replicas in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" - " the commands for read requests." + " the commands for read requests.\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" @@ -1686,7 +1684,7 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - char * node_prefix = config.read_from_replicas ? "replica" : "primary"; + char *node_prefix = config.read_from_replicas ? "replica" : "primary"; printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix); int i = 0; for (; i < config.cluster_node_count; i++) { From 136ed28d646bf59c185c28025a95e226cdb181bf Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 11 Dec 2024 21:56:27 +0900 Subject: [PATCH 06/16] Remove migrating, importing attributes No longer managing attributes about slots migration at the cluster node struct, so remove it. Signed-off-by: bluayer --- src/valkey-benchmark.c | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 0e64e49124..e25ba1bed4 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -169,12 +169,6 @@ typedef struct clusterNode { int *updated_slots; /* Used by updateClusterSlotsConfiguration */ int updated_slots_count; /* Used by updateClusterSlotsConfiguration */ int replicas_count; - sds *migrating; /* An array of sds where even strings are slots and odd - * strings are the destination node IDs. */ - sds *importing; /* An array of sds where even strings are slots and odd - * strings are the source node IDs. */ - int migrating_count; /* Length of the migrating array (migrating slots*2) */ - int importing_count; /* Length of the importing array (importing slots*2) */ struct serverConfig *redis_config; } clusterNode; @@ -1020,10 +1014,6 @@ static clusterNode *createClusterNode(char *ip, int port) { node->slots_count = 0; node->updated_slots = NULL; node->updated_slots_count = 0; - node->migrating = NULL; - node->importing = NULL; - node->migrating_count = 0; - node->importing_count = 0; node->redis_config = NULL; return node; } @@ -1032,14 +1022,6 @@ static void freeClusterNode(clusterNode *node) { int i; if (node->name) sdsfree(node->name); if (node->replicate) sdsfree(node->replicate); - if (node->migrating != NULL) { - for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]); - zfree(node->migrating); - } - if (node->importing != NULL) { - for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); - zfree(node->importing); - } /* If the node is not the reference node, that uses the address from * config.conn_info.hostip and config.conn_info.hostport, then the node ip has been * allocated by fetchClusterConfiguration, so it must be freed. */ From 66cd375139409ab8f0cacbbc8d928ea54cfedb4b Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 11 Dec 2024 22:58:27 +0900 Subject: [PATCH 07/16] Creating a node with multiple slot ranges only once The `CLUSTER SLOTS` command returns a node that is responsible for multiple slot ranges multiple times. Therefore, I modify it for creating a node only once using dict, even if there are multiple slot ranges. Signed-off-by: bluayer --- src/valkey-benchmark.c | 52 ++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index e25ba1bed4..ab6ef54e62 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -223,6 +223,15 @@ static int dictSdsKeyCompare(const void *key1, const void *key2) { return memcmp(key1, key2, l1) == 0; } +static dictType dtype = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket) { redisContext *ctx = NULL; redisReply *reply = NULL; @@ -1019,7 +1028,6 @@ static clusterNode *createClusterNode(char *ip, int port) { } static void freeClusterNode(clusterNode *node) { - int i; if (node->name) sdsfree(node->name); if (node->replicate) sdsfree(node->replicate); /* If the node is not the reference node, that uses the address from @@ -1067,6 +1075,7 @@ static int fetchClusterConfiguration(void) { goto cleanup; } assert(reply->type == REDIS_REPLY_ARRAY); + dict *nodes = dictCreate(&dtype); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; assert(r->type == REDIS_REPLY_ARRAY); @@ -1085,15 +1094,18 @@ static int fetchClusterConfiguration(void) { sds name = sdsnew(nr->element[2]->str); int port = nr->element[1]->integer; - printf("\nNode %s [%d-%d] %s:%d\n", name, from, to, ip, port); - clusterNode *node = NULL; - node = createClusterNode(sdsnew(ip), port); - if (node == NULL) { - success = 0; - goto cleanup; + dictEntry *entry = dictFind(nodes, name); + if (entry == NULL) { + node = createClusterNode(sdsnew(ip), port); + if (node == NULL) { + success = 0; + goto cleanup; + } + if (name != NULL) node->name = name; + } else { + node = dictGetVal(entry); } - if (name != NULL) node->name = name; if (from == to) { node->slots[node->slots_count++] = from; } else { @@ -1106,9 +1118,12 @@ static int fetchClusterConfiguration(void) { fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port); continue; } - if (!addClusterNode(node)) { - success = 0; - goto cleanup; + if (entry == NULL) { + dictReplace(nodes, node->name, node); + if (!addClusterNode(node)) { + success = 0; + goto cleanup; + } } } } @@ -1139,17 +1154,10 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --replicas option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); const char *errmsg = "Failed to update cluster slots configuration"; - static dictType dtype = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - NULL, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ - }; + /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ dict *nodes = dictCreate(&dtype); redisContext *ctx = NULL; @@ -1377,7 +1385,7 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; - } else if (!strcmp(argv[i], "--replicas")) { + } else if (!strcmp(argv[i], "--rfr")) { config.read_from_replicas = 1; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; @@ -1476,7 +1484,7 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" - " --replicas Enable read from replicas in cluster mode.\n" + " --rfr Enable read from replicas in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n" From bd440407cc0bb23bb1dd73942756b9e1ed1b9a4d Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 12 Dec 2024 00:50:05 +0900 Subject: [PATCH 08/16] Add --rfa and --rfro option - When users enable rfa(read_from_all) option, they can read from replica and primary. - When users enable rfro(read_from_replicas only) option, they can read from replicas only. - If users don't use any option related to replicas, they can read from primaries only. - Add READONLY when enabling option for reading all nodes Signed-off-by: bluayer --- src/valkey-benchmark.c | 63 +++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index ab6ef54e62..7258b3f8c4 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -112,7 +112,8 @@ static struct config { int num_threads; struct benchmarkThread **threads; int cluster_mode; - int read_from_replicas; + int read_from_all; + int read_from_replicas_only; int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; @@ -714,7 +715,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } - if (config.cluster_mode && config.read_from_replicas) { + if (config.cluster_mode && (config.read_from_replicas_only || config.read_from_all)) { char *buf = NULL; int len; len = redisFormatCommand(&buf, "READONLY"); @@ -848,7 +849,14 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - char *node_prefix = config.read_from_replicas ? "replicas" : "primaries"; + const char *node_prefix = NULL; + if (config.read_from_all) { + node_prefix = "all"; + } else if (config.read_from_replicas_only) { + node_prefix = "replica"; + } else { + node_prefix = "primary"; + } printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix); int m; for (m = 0; m < config.cluster_node_count; m++) { @@ -1061,12 +1069,14 @@ static int fetchClusterConfiguration(void) { int success = 1; redisContext *ctx = NULL; redisReply *reply = NULL; + dict *nodes = NULL; const char *errmsg = "Failed to fetch cluster configuration"; size_t i, j; ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); if (ctx == NULL) { exit(1); } + assert(!(config.read_from_all && config.read_from_replicas_only) && "--rfa and --rfro cannot be enabled simultaneously"); reply = redisCommand(ctx, "CLUSTER SLOTS"); if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { @@ -1075,7 +1085,7 @@ static int fetchClusterConfiguration(void) { goto cleanup; } assert(reply->type == REDIS_REPLY_ARRAY); - dict *nodes = dictCreate(&dtype); + nodes = dictCreate(&dtype); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; assert(r->type == REDIS_REPLY_ARRAY); @@ -1084,7 +1094,8 @@ static int fetchClusterConfiguration(void) { int to = r->element[1]->integer; for (j = 2; j < r->elements; j++) { int is_primary = (j == 2); - if (config.read_from_replicas == is_primary) continue; + int is_cluster_option_only = (!config.read_from_all && !config.read_from_replicas_only); + if ((config.read_from_replicas_only && is_primary) || (is_cluster_option_only && !is_primary)) continue; redisReply *nr = r->element[j]; assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); @@ -1093,6 +1104,8 @@ static int fetchClusterConfiguration(void) { sds ip = sdsnew(nr->element[0]->str); sds name = sdsnew(nr->element[2]->str); int port = nr->element[1]->integer; + int slot_start = from; + int slot_end = to; clusterNode *node = NULL; dictEntry *entry = dictFind(nodes, name); @@ -1106,11 +1119,11 @@ static int fetchClusterConfiguration(void) { } else { node = dictGetVal(entry); } - if (from == to) { - node->slots[node->slots_count++] = from; + if (slot_start == slot_end) { + node->slots[node->slots_count++] = slot_start; } else { - while (from <= to) { - int slot = from++; + while (slot_start <= slot_end) { + int slot = slot_start++; node->slots[node->slots_count++] = slot; } } @@ -1133,6 +1146,7 @@ static int fetchClusterConfiguration(void) { if (config.cluster_nodes) freeClusterNodes(); } if (reply) freeReplyObject(reply); + if (nodes) dictRelease(nodes); return success; } @@ -1154,7 +1168,7 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + fprintf(stderr, "If you are using the --rfa and --rfro option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); const char *errmsg = "Failed to update cluster slots configuration"; @@ -1194,7 +1208,10 @@ static int fetchClusterSlotsConfiguration(client c) { from = r->element[0]->integer; to = r->element[1]->integer; size_t start, end; - if (config.read_from_replicas) { + if (config.read_from_all) { + start = 2; + end = r->elements; + } else if (config.read_from_replicas_only) { start = 3; end = r->elements; } else { @@ -1385,8 +1402,10 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; - } else if (!strcmp(argv[i], "--rfr")) { - config.read_from_replicas = 1; + } else if (!strcmp(argv[i], "--rfa")) { + config.read_from_all = 1; + } else if (!strcmp(argv[i], "--rfro")) { + config.read_from_replicas_only = 1; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { @@ -1484,7 +1503,11 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" - " --rfr Enable read from replicas in cluster mode.\n" + " --rfa Enable read from all nodes(primary and replica) in cluster mode.\n" + " This command must be used with the --cluster option.\n" + " When using this option, it is recommended to use only \n" + " the commands for read requests.\n" + " --rfro Enable read from replicas only in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n" @@ -1629,7 +1652,8 @@ int main(int argc, char **argv) { config.num_threads = 0; config.threads = NULL; config.cluster_mode = 0; - config.read_from_replicas = 0; + config.read_from_all = 0; + config.read_from_replicas_only = 0; config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; @@ -1674,7 +1698,14 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - char *node_prefix = config.read_from_replicas ? "replica" : "primary"; + const char *node_prefix = NULL; + if (config.read_from_all) { + node_prefix = "all"; + } else if (config.read_from_replicas_only) { + node_prefix = "replica"; + } else { + node_prefix = "primary"; + } printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix); int i = 0; for (; i < config.cluster_node_count; i++) { From e1e925f9ceb6a3ff3509e608531157df89f51354 Mon Sep 17 00:00:00 2001 From: bluayer Date: Tue, 17 Dec 2024 22:13:15 +0900 Subject: [PATCH 09/16] Combining two options into single option "--rfr" Refactoring with enum, readFromReplicas, and combining two options(rfa, rfro) int single option(rfr) Signed-off-by: bluayer --- src/valkey-benchmark.c | 51 ++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 7258b3f8c4..e49a9bffb4 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -77,6 +77,13 @@ struct benchmarkThread; struct clusterNode; struct serverConfig; +/* Read from replica options */ +typedef enum readFromReplica { + FROM_PRIMARY_ONLY = 0, /* default option */ + FROM_REPLICA_ONLY, + FROM_ALL +} readFromReplica; + static struct config { aeEventLoop *el; cliConnInfo conn_info; @@ -112,8 +119,7 @@ static struct config { int num_threads; struct benchmarkThread **threads; int cluster_mode; - int read_from_all; - int read_from_replicas_only; + readFromReplica read_from_replica; int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; @@ -715,7 +721,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } - if (config.cluster_mode && (config.read_from_replicas_only || config.read_from_all)) { + if (config.cluster_mode && (config.read_from_replica == FROM_REPLICA_ONLY|| config.read_from_replica == FROM_ALL)) { char *buf = NULL; int len; len = redisFormatCommand(&buf, "READONLY"); @@ -850,9 +856,9 @@ static void showLatencyReport(void) { printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { const char *node_prefix = NULL; - if (config.read_from_all) { + if (config.read_from_replica == FROM_ALL) { node_prefix = "all"; - } else if (config.read_from_replicas_only) { + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { node_prefix = "replica"; } else { node_prefix = "primary"; @@ -1076,7 +1082,6 @@ static int fetchClusterConfiguration(void) { if (ctx == NULL) { exit(1); } - assert(!(config.read_from_all && config.read_from_replicas_only) && "--rfa and --rfro cannot be enabled simultaneously"); reply = redisCommand(ctx, "CLUSTER SLOTS"); if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { @@ -1094,8 +1099,8 @@ static int fetchClusterConfiguration(void) { int to = r->element[1]->integer; for (j = 2; j < r->elements; j++) { int is_primary = (j == 2); - int is_cluster_option_only = (!config.read_from_all && !config.read_from_replicas_only); - if ((config.read_from_replicas_only && is_primary) || (is_cluster_option_only && !is_primary)) continue; + int is_cluster_option_only = (config.read_from_replica == FROM_PRIMARY_ONLY); + if ((config.read_from_replica == FROM_REPLICA_ONLY && is_primary) || (is_cluster_option_only && !is_primary)) continue; redisReply *nr = r->element[j]; assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); @@ -1168,7 +1173,7 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --rfa and --rfro option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); + fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); const char *errmsg = "Failed to update cluster slots configuration"; @@ -1208,10 +1213,10 @@ static int fetchClusterSlotsConfiguration(client c) { from = r->element[0]->integer; to = r->element[1]->integer; size_t start, end; - if (config.read_from_all) { + if (config.read_from_replica == FROM_ALL) { start = 2; end = r->elements; - } else if (config.read_from_replicas_only) { + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { start = 3; end = r->elements; } else { @@ -1402,10 +1407,11 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; - } else if (!strcmp(argv[i], "--rfa")) { - config.read_from_all = 1; - } else if (!strcmp(argv[i], "--rfro")) { - config.read_from_replicas_only = 1; + } else if (!strcmp(argv[i], "--rfr")) { + config.read_from_replica = FROM_REPLICA_ONLY; + if (argv[i + 1] && atoi(argv[++i]) == 2) { + config.read_from_replica = FROM_ALL; + } } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { @@ -1503,14 +1509,12 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" - " --rfa Enable read from all nodes(primary and replica) in cluster mode.\n" - " This command must be used with the --cluster option.\n" - " When using this option, it is recommended to use only \n" - " the commands for read requests.\n" - " --rfro Enable read from replicas only in cluster mode.\n" + " --rfr Enable read from replicas in cluster mode.\n" " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n" + " default=read from replica only\n" + " 2=read from all nodes(primary and replica)\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" @@ -1652,8 +1656,7 @@ int main(int argc, char **argv) { config.num_threads = 0; config.threads = NULL; config.cluster_mode = 0; - config.read_from_all = 0; - config.read_from_replicas_only = 0; + config.read_from_replica = FROM_PRIMARY_ONLY; config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; @@ -1699,9 +1702,9 @@ int main(int argc, char **argv) { exit(1); } const char *node_prefix = NULL; - if (config.read_from_all) { + if (config.read_from_replica == FROM_ALL) { node_prefix = "all"; - } else if (config.read_from_replicas_only) { + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { node_prefix = "replica"; } else { node_prefix = "primary"; From 570f5f643f6dcbad51f3c1ddeffde78678ba47e1 Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 18 Dec 2024 01:51:56 +0900 Subject: [PATCH 10/16] Moidfy mode option to string and allocate replicate name 1. Modify mode option, "replicas" and "all" only 2. Allocate replicate name in replica node and determine whether it is a primary or a replica based on the presence of "replicate" 3. Adjust position of continue statement for --rfr only case 4. Change the i plus timing at option Signed-off-by: bluayer --- src/valkey-benchmark.c | 42 +++++++++++++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index e49a9bffb4..746547705e 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -857,7 +857,7 @@ static void showLatencyReport(void) { if (config.cluster_mode) { const char *node_prefix = NULL; if (config.read_from_replica == FROM_ALL) { - node_prefix = "all"; + node_prefix = "primaries and replicas"; } else if (config.read_from_replica == FROM_REPLICA_ONLY) { node_prefix = "replica"; } else { @@ -1097,15 +1097,18 @@ static int fetchClusterConfiguration(void) { assert(r->elements >= 3); int from = r->element[0]->integer; int to = r->element[1]->integer; + sds primary = NULL; for (j = 2; j < r->elements; j++) { - int is_primary = (j == 2); - int is_cluster_option_only = (config.read_from_replica == FROM_PRIMARY_ONLY); - if ((config.read_from_replica == FROM_REPLICA_ONLY && is_primary) || (is_cluster_option_only && !is_primary)) continue; - redisReply *nr = r->element[j]; assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); assert(nr->element[0]->str != NULL); assert(nr->element[2]->str != NULL); + + int is_primary = (j == 2); + if (is_primary) primary = sdsnew(nr->element[2]->str); + int is_cluster_option_only = (config.read_from_replica == FROM_PRIMARY_ONLY); + if ((config.read_from_replica == FROM_REPLICA_ONLY && is_primary) || (is_cluster_option_only && !is_primary)) continue; + sds ip = sdsnew(nr->element[0]->str); sds name = sdsnew(nr->element[2]->str); int port = nr->element[1]->integer; @@ -1119,8 +1122,10 @@ static int fetchClusterConfiguration(void) { if (node == NULL) { success = 0; goto cleanup; + } else { + node->name = name; + if (!is_primary) node->replicate = sdsdup(primary); } - if (name != NULL) node->name = name; } else { node = dictGetVal(entry); } @@ -1144,6 +1149,7 @@ static int fetchClusterConfiguration(void) { } } } + sdsfree(primary); } cleanup: if (ctx) redisFree(ctx); @@ -1409,8 +1415,16 @@ int parseOptions(int argc, char **argv) { config.cluster_mode = 1; } else if (!strcmp(argv[i], "--rfr")) { config.read_from_replica = FROM_REPLICA_ONLY; - if (argv[i + 1] && atoi(argv[++i]) == 2) { - config.read_from_replica = FROM_ALL; + if (argv[i+1]) { + if (!strcmp(argv[i+1], "all")) { + config.read_from_replica = FROM_ALL; + i++; + } else if (!strcmp(argv[i+1], "replicas")) { + config.read_from_replica = FROM_REPLICA_ONLY; + i++; + } else if (strlen(argv[i+1]) > 0 && argv[i+1][0] != '-') { + goto invalid; + } } } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; @@ -1513,7 +1527,12 @@ int parseOptions(int argc, char **argv) { " This command must be used with the --cluster option.\n" " When using this option, it is recommended to use only \n" " the commands for read requests.\n" - " default=read from replica only\n" + " There are two mode for reading from replicas.\n" + " If nothing is entered or 'replicas' is entered, this\n" + " sends read requests to replicas only (default) \n" + " If 'all' is entered, this sends read requests to all nodes,\n" + " including primaries and replicas. \n" + " If you only want to request the primaries, use --cluster option only.\n" " 2=read from all nodes(primary and replica)\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" @@ -1703,7 +1722,7 @@ int main(int argc, char **argv) { } const char *node_prefix = NULL; if (config.read_from_replica == FROM_ALL) { - node_prefix = "all"; + node_prefix = "primaries and replicas"; } else if (config.read_from_replica == FROM_REPLICA_ONLY) { node_prefix = "replica"; } else { @@ -1717,7 +1736,8 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } - printf("%s %d: ", node_prefix, i); + const char* node_type = (node->replicate == NULL ? "Primary" : "Replica"); + printf("%s %d: ", node_type, i); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); node->redis_config = getServerConfig(node->ip, node->port, NULL); From 18a787041bb0f05df1523d7d5f24cc02cab6a83c Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 18 Dec 2024 02:50:36 +0900 Subject: [PATCH 11/16] Explicit print with Node Signed-off-by: bluayer --- src/valkey-benchmark.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 746547705e..688067a742 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1722,7 +1722,7 @@ int main(int argc, char **argv) { } const char *node_prefix = NULL; if (config.read_from_replica == FROM_ALL) { - node_prefix = "primaries and replicas"; + node_prefix = "(primaries and replicas)"; } else if (config.read_from_replica == FROM_REPLICA_ONLY) { node_prefix = "replica"; } else { @@ -1737,7 +1737,7 @@ int main(int argc, char **argv) { exit(1); } const char* node_type = (node->replicate == NULL ? "Primary" : "Replica"); - printf("%s %d: ", node_type, i); + printf("Node %d(%s): ", i, node_type); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); node->redis_config = getServerConfig(node->ip, node->port, NULL); From 6f804b63ceb6cabfda75916795a8ef952af27501 Mon Sep 17 00:00:00 2001 From: bluayer Date: Wed, 18 Dec 2024 03:01:03 +0900 Subject: [PATCH 12/16] Change node_prefix to node_roles Signed-off-by: bluayer --- src/valkey-benchmark.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 688067a742..20b1237514 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -855,15 +855,15 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - const char *node_prefix = NULL; + const char *node_roles = NULL; if (config.read_from_replica == FROM_ALL) { - node_prefix = "primaries and replicas"; + node_roles = "cluster"; } else if (config.read_from_replica == FROM_REPLICA_ONLY) { - node_prefix = "replica"; + node_roles = "replica"; } else { - node_prefix = "primary"; + node_roles = "primary"; } - printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_prefix); + printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_roles); int m; for (m = 0; m < config.cluster_node_count; m++) { clusterNode *node = config.cluster_nodes[m]; @@ -1720,15 +1720,15 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - const char *node_prefix = NULL; + const char *node_roles = NULL; if (config.read_from_replica == FROM_ALL) { - node_prefix = "(primaries and replicas)"; + node_roles = "cluster"; } else if (config.read_from_replica == FROM_REPLICA_ONLY) { - node_prefix = "replica"; + node_roles = "replica"; } else { - node_prefix = "primary"; + node_roles = "primary"; } - printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_prefix); + printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_roles); int i = 0; for (; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; From 67ae655dd1badc28999415e0f8fa8f69e80fdd8d Mon Sep 17 00:00:00 2001 From: Jungwoo Song <37579681+bluayer@users.noreply.github.com> Date: Wed, 18 Dec 2024 03:02:34 +0900 Subject: [PATCH 13/16] Change description in CLI Based on @ranshid suggestion Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com> Signed-off-by: Jungwoo Song <37579681+bluayer@users.noreply.github.com> --- src/valkey-benchmark.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 20b1237514..06f4a69b08 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1525,15 +1525,12 @@ int parseOptions(int argc, char **argv) { " command will not be sent to the right cluster node.\n" " --rfr Enable read from replicas in cluster mode.\n" " This command must be used with the --cluster option.\n" - " When using this option, it is recommended to use only \n" - " the commands for read requests.\n" - " There are two mode for reading from replicas.\n" - " If nothing is entered or 'replicas' is entered, this\n" - " sends read requests to replicas only (default) \n" - " If 'all' is entered, this sends read requests to all nodes,\n" - " including primaries and replicas. \n" - " If you only want to request the primaries, use --cluster option only.\n" - " 2=read from all nodes(primary and replica)\n" + " There are three modes for reading from replicas:\n" + " 'no' - sends read requests to primaries only (default) \n" + " 'yes' - sends read requests to replicas only.\n" + " 'all' - sends read requests to all nodes.\n" + " Since write commands will not be accepted by replicas,\n" + " it is recommended to enable read from replicas only for read command tests.\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" From 4262232e02a51cc1acc7bb8eede2a0f5365a7bdf Mon Sep 17 00:00:00 2001 From: Jungwoo Song <37579681+bluayer@users.noreply.github.com> Date: Wed, 18 Dec 2024 07:10:25 +0900 Subject: [PATCH 14/16] Modify CLI options Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com> Signed-off-by: Jungwoo Song <37579681+bluayer@users.noreply.github.com> --- src/valkey-benchmark.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 06f4a69b08..916266c779 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1414,18 +1414,18 @@ int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; } else if (!strcmp(argv[i], "--rfr")) { - config.read_from_replica = FROM_REPLICA_ONLY; - if (argv[i+1]) { - if (!strcmp(argv[i+1], "all")) { + if (argv[++i]) { + if (!strcmp(argv[i], "all")) { config.read_from_replica = FROM_ALL; - i++; - } else if (!strcmp(argv[i+1], "replicas")) { + } else if (!strcmp(argv[i], "yes")) { config.read_from_replica = FROM_REPLICA_ONLY; - i++; - } else if (strlen(argv[i+1]) > 0 && argv[i+1][0] != '-') { + } else if (!strcmp(argv[i], "no")) { + config.read_from_replica = FROM_PRIMARY_ONLY; + } else { goto invalid; } - } + } else + goto invalid; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { From 54f84906fc91bb6afa43bbe01f9d6422b1ae2f72 Mon Sep 17 00:00:00 2001 From: Jungwoo Song <37579681+bluayer@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:15:45 +0900 Subject: [PATCH 15/16] Remove unnecessary printing stderr Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com> Signed-off-by: Jungwoo Song <37579681+bluayer@users.noreply.github.com> --- src/valkey-benchmark.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 916266c779..966c9b488a 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -1179,8 +1179,6 @@ static int fetchClusterSlotsConfiguration(client c) { if (is_fetching_slots) return -1; // TODO: use other codes || errno ? atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); - fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n"); - const char *errmsg = "Failed to update cluster slots configuration"; /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ From a739454bae0d6aa592a267e6d5c399772fb8b1e5 Mon Sep 17 00:00:00 2001 From: bluayer Date: Thu, 19 Dec 2024 20:40:46 +0900 Subject: [PATCH 16/16] Formatting with clang Signed-off-by: bluayer --- src/valkey-benchmark.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 966c9b488a..1924203ae7 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -721,7 +721,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } - if (config.cluster_mode && (config.read_from_replica == FROM_REPLICA_ONLY|| config.read_from_replica == FROM_ALL)) { + if (config.cluster_mode && (config.read_from_replica == FROM_REPLICA_ONLY || config.read_from_replica == FROM_ALL)) { char *buf = NULL; int len; len = redisFormatCommand(&buf, "READONLY"); @@ -1731,7 +1731,7 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } - const char* node_type = (node->replicate == NULL ? "Primary" : "Replica"); + const char *node_type = (node->replicate == NULL ? "Primary" : "Replica"); printf("Node %d(%s): ", i, node_type); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port);