diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 57cdd6fc16..1924203ae7 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -77,6 +77,13 @@ struct benchmarkThread; struct clusterNode; struct serverConfig; +/* Read from replica options */ +typedef enum readFromReplica { + FROM_PRIMARY_ONLY = 0, /* default option */ + FROM_REPLICA_ONLY, + FROM_ALL +} readFromReplica; + static struct config { aeEventLoop *el; cliConnInfo conn_info; @@ -112,6 +119,7 @@ static struct config { int num_threads; struct benchmarkThread **threads; int cluster_mode; + readFromReplica read_from_replica; int cluster_node_count; struct clusterNode **cluster_nodes; struct serverConfig *redis_config; @@ -168,12 +176,6 @@ typedef struct clusterNode { int *updated_slots; /* Used by updateClusterSlotsConfiguration */ int updated_slots_count; /* Used by updateClusterSlotsConfiguration */ int replicas_count; - sds *migrating; /* An array of sds where even strings are slots and odd - * strings are the destination node IDs. */ - sds *importing; /* An array of sds where even strings are slots and odd - * strings are the source node IDs. */ - int migrating_count; /* Length of the migrating array (migrating slots*2) */ - int importing_count; /* Length of the importing array (importing slots*2) */ struct serverConfig *redis_config; } clusterNode; @@ -228,6 +230,15 @@ static int dictSdsKeyCompare(const void *key1, const void *key2) { return memcmp(key1, key2, l1) == 0; } +static dictType dtype = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket) { redisContext *ctx = NULL; redisReply *reply = NULL; @@ -710,6 +721,15 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } + if (config.cluster_mode && (config.read_from_replica == FROM_REPLICA_ONLY || config.read_from_replica == FROM_ALL)) { + char *buf = NULL; + int len; + len = redisFormatCommand(&buf, "READONLY"); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + c->prefixlen = sdslen(c->obuf); /* Append the request itself. */ if (from) { @@ -835,7 +855,15 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - printf(" cluster mode: yes (%d primaries)\n", config.cluster_node_count); + const char *node_roles = NULL; + if (config.read_from_replica == FROM_ALL) { + node_roles = "cluster"; + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { + node_roles = "replica"; + } else { + node_roles = "primary"; + } + printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_roles); int m; for (m = 0; m < config.cluster_node_count; m++) { clusterNode *node = config.cluster_nodes[m]; @@ -1009,26 +1037,13 @@ static clusterNode *createClusterNode(char *ip, int port) { node->slots_count = 0; node->updated_slots = NULL; node->updated_slots_count = 0; - node->migrating = NULL; - node->importing = NULL; - node->migrating_count = 0; - node->importing_count = 0; node->redis_config = NULL; return node; } static void freeClusterNode(clusterNode *node) { - int i; if (node->name) sdsfree(node->name); if (node->replicate) sdsfree(node->replicate); - if (node->migrating != NULL) { - for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]); - zfree(node->migrating); - } - if (node->importing != NULL) { - for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); - zfree(node->importing); - } /* If the node is not the reference node, that uses the address from * config.conn_info.hostip and config.conn_info.hostport, then the node ip has been * allocated by fetchClusterConfiguration, so it must be freed. */ @@ -1056,157 +1071,85 @@ static clusterNode **addClusterNode(clusterNode *node) { return config.cluster_nodes; } -/* TODO: This should be refactored to use CLUSTER SLOTS, the migrating/importing - * information is anyway not used. - */ static int fetchClusterConfiguration(void) { int success = 1; redisContext *ctx = NULL; redisReply *reply = NULL; + dict *nodes = NULL; + const char *errmsg = "Failed to fetch cluster configuration"; + size_t i, j; ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket); if (ctx == NULL) { exit(1); } - clusterNode *firstNode = createClusterNode((char *)config.conn_info.hostip, config.conn_info.hostport); - if (!firstNode) { + + reply = redisCommand(ctx, "CLUSTER SLOTS"); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { success = 0; + if (reply) fprintf(stderr, "%s\nCLUSTER SLOTS ERROR: %s\n", errmsg, reply->str); goto cleanup; } - reply = redisCommand(ctx, "CLUSTER NODES"); - success = (reply != NULL); - if (!success) goto cleanup; - success = (reply->type != REDIS_REPLY_ERROR); - if (!success) { - if (config.hostsocket == NULL) { - fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n", config.conn_info.hostip, - config.conn_info.hostport, reply->str); - } else { - fprintf(stderr, "Cluster node %s replied with error:\n%s\n", config.hostsocket, reply->str); - } - goto cleanup; - } - char *lines = reply->str, *p, *line; - while ((p = strstr(lines, "\n")) != NULL) { - *p = '\0'; - line = lines; - lines = p + 1; - char *name = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL; - int i = 0; - while ((p = strchr(line, ' ')) != NULL) { - *p = '\0'; - char *token = line; - line = p + 1; - switch (i++) { - case 0: name = token; break; - case 1: addr = token; break; - case 2: flags = token; break; - case 3: primary_id = token; break; - } - if (i == 8) break; // Slots - } - if (!flags) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n"); - success = 0; - goto cleanup; - } - int myself = (strstr(flags, "myself") != NULL); - int is_replica = (strstr(flags, "slave") != NULL || (primary_id != NULL && primary_id[0] != '-')); - if (is_replica) continue; - if (addr == NULL) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n"); - success = 0; - goto cleanup; - } - clusterNode *node = NULL; - char *ip = NULL; - int port = 0; - char *paddr = strrchr(addr, ':'); - if (paddr != NULL) { - *paddr = '\0'; - ip = addr; - addr = paddr + 1; - /* If internal bus is specified, then just drop it. */ - if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; - port = atoi(addr); - } - if (myself) { - node = firstNode; - if (ip != NULL && strcmp(node->ip, ip) != 0) { - node->ip = sdsnew(ip); - node->port = port; + assert(reply->type == REDIS_REPLY_ARRAY); + nodes = dictCreate(&dtype); + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY); + assert(r->elements >= 3); + int from = r->element[0]->integer; + int to = r->element[1]->integer; + sds primary = NULL; + for (j = 2; j < r->elements; j++) { + redisReply *nr = r->element[j]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); + assert(nr->element[0]->str != NULL); + assert(nr->element[2]->str != NULL); + + int is_primary = (j == 2); + if (is_primary) primary = sdsnew(nr->element[2]->str); + int is_cluster_option_only = (config.read_from_replica == FROM_PRIMARY_ONLY); + if ((config.read_from_replica == FROM_REPLICA_ONLY && is_primary) || (is_cluster_option_only && !is_primary)) continue; + + sds ip = sdsnew(nr->element[0]->str); + sds name = sdsnew(nr->element[2]->str); + int port = nr->element[1]->integer; + int slot_start = from; + int slot_end = to; + + clusterNode *node = NULL; + dictEntry *entry = dictFind(nodes, name); + if (entry == NULL) { + node = createClusterNode(sdsnew(ip), port); + if (node == NULL) { + success = 0; + goto cleanup; + } else { + node->name = name; + if (!is_primary) node->replicate = sdsdup(primary); + } + } else { + node = dictGetVal(entry); } - } else { - node = createClusterNode(sdsnew(ip), port); - } - if (node == NULL) { - success = 0; - goto cleanup; - } - if (name != NULL) node->name = sdsnew(name); - if (i == 8) { - int remaining = strlen(line); - while (remaining > 0) { - p = strchr(line, ' '); - if (p == NULL) p = line + remaining; - remaining -= (p - line); - - char *slotsdef = line; - *p = '\0'; - if (remaining) { - line = p + 1; - remaining--; - } else - line = p; - char *dash = NULL; - if (slotsdef[0] == '[') { - slotsdef++; - if ((p = strstr(slotsdef, "->-"))) { // Migrating - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds dst = sdsnew(p); - node->migrating_count += 2; - node->migrating = zrealloc(node->migrating, (node->migrating_count * sizeof(sds))); - node->migrating[node->migrating_count - 2] = slot; - node->migrating[node->migrating_count - 1] = dst; - } else if ((p = strstr(slotsdef, "-<-"))) { // Importing - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds src = sdsnew(p); - node->importing_count += 2; - node->importing = zrealloc(node->importing, (node->importing_count * sizeof(sds))); - node->importing[node->importing_count - 2] = slot; - node->importing[node->importing_count - 1] = src; - } - } else if ((dash = strchr(slotsdef, '-')) != NULL) { - p = dash; - int start, stop; - *p = '\0'; - start = atoi(slotsdef); - stop = atoi(p + 1); - while (start <= stop) { - int slot = start++; - node->slots[node->slots_count++] = slot; - } - } else if (p > slotsdef) { - int slot = atoi(slotsdef); + if (slot_start == slot_end) { + node->slots[node->slots_count++] = slot_start; + } else { + while (slot_start <= slot_end) { + int slot = slot_start++; node->slots[node->slots_count++] = slot; } } + if (node->slots_count == 0) { + fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port); + continue; + } + if (entry == NULL) { + dictReplace(nodes, node->name, node); + if (!addClusterNode(node)) { + success = 0; + goto cleanup; + } + } } - if (node->slots_count == 0) { - fprintf(stderr, "WARNING: Primary node %s:%d has no slots, skipping...\n", node->ip, node->port); - continue; - } - if (!addClusterNode(node)) { - success = 0; - goto cleanup; - } + sdsfree(primary); } cleanup: if (ctx) redisFree(ctx); @@ -1214,6 +1157,7 @@ static int fetchClusterConfiguration(void) { if (config.cluster_nodes) freeClusterNodes(); } if (reply) freeReplyObject(reply); + if (nodes) dictRelease(nodes); return success; } @@ -1222,7 +1166,7 @@ static int fetchClusterConfiguration(void) { static int fetchClusterSlotsConfiguration(client c) { UNUSED(c); int success = 1, is_fetching_slots = 0, last_update = 0; - size_t i; + size_t i, j; last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed); if (c->slots_last_update < last_update) { @@ -1236,16 +1180,9 @@ static int fetchClusterSlotsConfiguration(client c) { atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed); fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; - static dictType dtype = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - NULL, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ - }; + /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ - dict *primaries = dictCreate(&dtype); + dict *nodes = dictCreate(&dtype); redisContext *ctx = NULL; for (i = 0; i < (size_t)config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1263,7 +1200,7 @@ static int fetchClusterSlotsConfiguration(client c) { if (node->updated_slots != NULL) zfree(node->updated_slots); node->updated_slots = NULL; node->updated_slots_count = 0; - dictReplace(primaries, node->name, node); + dictReplace(nodes, node->name, node); } reply = redisCommand(ctx, "CLUSTER SLOTS"); if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { @@ -1279,30 +1216,44 @@ static int fetchClusterSlotsConfiguration(client c) { int from, to, slot; from = r->element[0]->integer; to = r->element[1]->integer; - redisReply *nr = r->element[2]; - assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); - assert(nr->element[2]->str != NULL); - sds name = sdsnew(nr->element[2]->str); - dictEntry *entry = dictFind(primaries, name); - if (entry == NULL) { - success = 0; - fprintf(stderr, - "%s: could not find node with ID %s in current " - "configuration.\n", - errmsg, name); - if (name) sdsfree(name); - goto cleanup; + size_t start, end; + if (config.read_from_replica == FROM_ALL) { + start = 2; + end = r->elements; + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { + start = 3; + end = r->elements; + } else { + start = 2; + end = 3; + } + + for (j = start; j < end; j++) { + redisReply *nr = r->element[j]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); + assert(nr->element[2]->str != NULL); + sds name = sdsnew(nr->element[2]->str); + dictEntry *entry = dictFind(nodes, name); + if (entry == NULL) { + success = 0; + fprintf(stderr, + "%s: could not find node with ID %s in current " + "configuration.\n", + errmsg, name); + if (name) sdsfree(name); + goto cleanup; + } + sdsfree(name); + clusterNode *node = dictGetVal(entry); + if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); + for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot; } - sdsfree(name); - clusterNode *node = dictGetVal(entry); - if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); - for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot; } updateClusterSlotsConfiguration(); cleanup: freeReplyObject(reply); redisFree(ctx); - dictRelease(primaries); + dictRelease(nodes); atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed); return success; } @@ -1460,6 +1411,19 @@ int parseOptions(int argc, char **argv) { config.num_threads = 0; } else if (!strcmp(argv[i], "--cluster")) { config.cluster_mode = 1; + } else if (!strcmp(argv[i], "--rfr")) { + if (argv[++i]) { + if (!strcmp(argv[i], "all")) { + config.read_from_replica = FROM_ALL; + } else if (!strcmp(argv[i], "yes")) { + config.read_from_replica = FROM_REPLICA_ONLY; + } else if (!strcmp(argv[i], "no")) { + config.read_from_replica = FROM_PRIMARY_ONLY; + } else { + goto invalid; + } + } else + goto invalid; } else if (!strcmp(argv[i], "--enable-tracking")) { config.enable_tracking = 1; } else if (!strcmp(argv[i], "--help")) { @@ -1557,6 +1521,14 @@ int parseOptions(int argc, char **argv) { " If the command is supplied on the command line in cluster\n" " mode, the key must contain \"{tag}\". Otherwise, the\n" " command will not be sent to the right cluster node.\n" + " --rfr Enable read from replicas in cluster mode.\n" + " This command must be used with the --cluster option.\n" + " There are three modes for reading from replicas:\n" + " 'no' - sends read requests to primaries only (default) \n" + " 'yes' - sends read requests to replicas only.\n" + " 'all' - sends read requests to all nodes.\n" + " Since write commands will not be accepted by replicas,\n" + " it is recommended to enable read from replicas only for read command tests.\n" " --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD,\n" @@ -1698,6 +1670,7 @@ int main(int argc, char **argv) { config.num_threads = 0; config.threads = NULL; config.cluster_mode = 0; + config.read_from_replica = FROM_PRIMARY_ONLY; config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; @@ -1742,7 +1715,15 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - printf("Cluster has %d primary nodes:\n\n", config.cluster_node_count); + const char *node_roles = NULL; + if (config.read_from_replica == FROM_ALL) { + node_roles = "cluster"; + } else if (config.read_from_replica == FROM_REPLICA_ONLY) { + node_roles = "replica"; + } else { + node_roles = "primary"; + } + printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_roles); int i = 0; for (; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1750,7 +1731,8 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } - printf("Primary %d: ", i); + const char *node_type = (node->replicate == NULL ? "Primary" : "Replica"); + printf("Node %d(%s): ", i, node_type); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); node->redis_config = getServerConfig(node->ip, node->port, NULL);