Skip to content

Commit

Permalink
Creating a node with multiple slot ranges only once
Browse files Browse the repository at this point in the history
The `CLUSTER SLOTS` command returns a node that is responsible for multiple slot ranges multiple times. Therefore, I modify it for creating a node only once using dict, even if there are multiple slot ranges.
  • Loading branch information
bluayer committed Dec 11, 2024
1 parent db738c3 commit 76545cb
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions src/valkey-benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,15 @@ static int dictSdsKeyCompare(const void *key1, const void *key2) {
return memcmp(key1, key2, l1) == 0;
}

static dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};

static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket) {
redisContext *ctx = NULL;
redisReply *reply = NULL;
Expand Down Expand Up @@ -1019,7 +1028,6 @@ static clusterNode *createClusterNode(char *ip, int port) {
}

static void freeClusterNode(clusterNode *node) {
int i;
if (node->name) sdsfree(node->name);
if (node->replicate) sdsfree(node->replicate);
/* If the node is not the reference node, that uses the address from
Expand Down Expand Up @@ -1067,6 +1075,7 @@ static int fetchClusterConfiguration(void) {
goto cleanup;
}
assert(reply->type == REDIS_REPLY_ARRAY);
dict *nodes = dictCreate(&dtype);
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
assert(r->type == REDIS_REPLY_ARRAY);
Expand All @@ -1085,15 +1094,18 @@ static int fetchClusterConfiguration(void) {
sds name = sdsnew(nr->element[2]->str);
int port = nr->element[1]->integer;

printf("\nNode %s [%d-%d] %s:%d\n", name, from, to, ip, port);

clusterNode *node = NULL;
node = createClusterNode(sdsnew(ip), port);
if (node == NULL) {
success = 0;
goto cleanup;
dictEntry *entry = dictFind(nodes, name);
if (entry == NULL) {
node = createClusterNode(sdsnew(ip), port);
if (node == NULL) {
success = 0;
goto cleanup;
}
if (name != NULL) node->name = name;
} else {
node = dictGetVal(entry);
}
if (name != NULL) node->name = name;
if (from == to) {
node->slots[node->slots_count++] = from;
} else {
Expand All @@ -1106,9 +1118,12 @@ static int fetchClusterConfiguration(void) {
fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port);
continue;
}
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
if (entry == NULL) {
dictReplace(nodes, node->name, node);
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
}
}
}
}
Expand Down Expand Up @@ -1139,17 +1154,10 @@ static int fetchClusterSlotsConfiguration(client c) {
if (is_fetching_slots) return -1; // TODO: use other codes || errno ?
atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed);
fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n");
fprintf(stderr, "If you are using the --replicas option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");
fprintf(stderr, "If you are using the --rfr option and sending write requests (set type commands),\nthe requests could not be processed properly.\n");

const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};

/* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
dict *nodes = dictCreate(&dtype);
redisContext *ctx = NULL;
Expand Down Expand Up @@ -1377,7 +1385,7 @@ int parseOptions(int argc, char **argv) {
config.num_threads = 0;
} else if (!strcmp(argv[i], "--cluster")) {
config.cluster_mode = 1;
} else if (!strcmp(argv[i], "--replicas")) {
} else if (!strcmp(argv[i], "--rfr")) {
config.read_from_replicas = 1;
} else if (!strcmp(argv[i], "--enable-tracking")) {
config.enable_tracking = 1;
Expand Down Expand Up @@ -1476,7 +1484,7 @@ int parseOptions(int argc, char **argv) {
" If the command is supplied on the command line in cluster\n"
" mode, the key must contain \"{tag}\". Otherwise, the\n"
" command will not be sent to the right cluster node.\n"
" --replicas Enable read from replicas in cluster mode.\n"
" --rfr Enable read from replicas in cluster mode.\n"
" This command must be used with the --cluster option.\n"
" When using this option, it is recommended to use only \n"
" the commands for read requests.\n"
Expand Down

0 comments on commit 76545cb

Please sign in to comment.