[Valkey 7.2.6] Generate correct slot information in cluster shards command on primary failure #809

Merged · 1 commit merged on Jul 23, 2024
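In brief, as the diff below shows: CLUSTER SHARDS used to build each shard's slots array from the primary's slot_info_pairs only, so while a primary was failing the command could report an empty slot list for a shard that still owned slots. The fix scans the shard's node list and takes the slot information from whichever node carries it. A minimal before/after sketch of one shard entry in Tcl, with hypothetical node ids and the slot range used by the test below:

    # Hypothetical shard entry while the primary is down, before the fix:
    set before {slots {} nodes {{id n1 health fail} {id n2 health online}}}
    # The same shard after the fix: slot info comes from any node that has it.
    set after {slots {0 5461} nodes {{id n1 health fail} {id n2 health online}}}
    puts "before: [dict get $before slots]" ;# empty list
    puts "after:  [dict get $after slots]"  ;# prints: 0 5461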
75 changes: 39 additions & 36 deletions src/cluster.c
@@ -2591,7 +2591,7 @@ uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
clusterMsgPingExtHumanNodename *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, getHumanNodenamePingExtSize());
memcpy(ext->human_nodename, myself->human_nodename, sdslen(myself->human_nodename));

/* Move the write cursor */
cursor = nextPingExt(cursor);
}

@@ -2655,7 +2655,7 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
ext_hostname = hostname_ext->hostname;
} else if (type == CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME) {
clusterMsgPingExtHumanNodename *humannodename_ext = (clusterMsgPingExtHumanNodename *) &(ext->ext[0].human_nodename);
ext_humannodename = humannodename_ext->human_nodename;
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
@@ -4303,7 +4303,7 @@ void clusterHandleSlaveFailover(void) {
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
serverLog(LL_NOTICE,
"Start of election delayed for %lld milliseconds "
@@ -4842,7 +4842,7 @@ void clusterCron(void) {
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
server.cluster_allow_replica_migration)
clusterHandleSlaveMigration(max_slaves);
}

@@ -5759,37 +5759,6 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
setDeferredMapLen(c, node_replylen, reply_count);
}

/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, list *nodes) {
serverAssert(listLength(nodes) > 0);
clusterNode *n = listNodeValue(listFirst(nodes));
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

/* Use slot_info_pairs from the primary only */
n = clusterNodeGetMaster(n);

if (n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++)
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
}

/* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
@@ -5799,7 +5768,41 @@ void clusterReplyShards(client *c) {
clusterGenNodesSlotsInfo(0);
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
addShardReplyForClusterShards(c, dictGetVal(de));
list *nodes = dictGetVal(de);
serverAssert(listLength(nodes) > 0);
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

/* Find a node which has the slot information served by this shard. */
clusterNode *n = NULL;
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
n = listNodeValue(ln);
if (n->slot_info_pairs) {
break;
}
}

if (n && n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++) {
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
}
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
}
dictReleaseIterator(di);
}
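As an aside on the reply shape the code above writes (a two-key map per shard), here is a self-contained Tcl sketch of walking one entry the way a client would; the sample data is hypothetical, but the slots and nodes keys and the flat (start, end) pairing mirror the replies emitted above:

    set shard {
        slots {0 2000 2001 5461}
        nodes {
            {id aaa role master}
            {id bbb role replica}
        }
    }
    # Slot ranges arrive as flat (start, end) pairs.
    foreach {start end} [dict get $shard slots] {
        puts "slot range: $start-$end"
    }
    # Each node entry is itself a dict of node details.
    foreach node [dict get $shard nodes] {
        puts "node [dict get $node id] has role [dict get $node role]"
    }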
55 changes: 55 additions & 0 deletions tests/unit/cluster/cluster-shards.tcl
@@ -0,0 +1,55 @@
# Get the node info with the specific node_id from the
# given reference node. Valid type options are "node" and "shard"
proc get_node_info_from_shard {id reference {type node}} {
set shards_response [R $reference CLUSTER SHARDS]
foreach shard_response $shards_response {
set nodes [dict get $shard_response nodes]
foreach node $nodes {
if {[dict get $node id] eq $id} {
if {$type eq "node"} {
return $node
} elseif {$type eq "shard"} {
return $shard_response
} else {
return {}
}
}
}
}
# No shard found, return nothing
return {}
}
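
# Usage sketch for the helper above (hedged: R and the node indices come
# from the cluster test framework, as in the tests that follow):
#   set node_0_id [R 0 CLUSTER MYID]
#   set shard [get_node_info_from_shard $node_0_id 4 "shard"]
#   set slots [dict get $shard "slots"]   ;# e.g. {0 5461}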

start_cluster 3 3 {tags {external:skip cluster}} {
set primary_node 0
set replica_node 3
set validation_node 4
set node_0_id ""
set shard_0_slot_coverage {0 5461}

test "Cluster should start ok" {
wait_for_cluster_state ok
}

test "Cluster shards response is ok for shard 0" {
set node_0_id [R $primary_node CLUSTER MYID]
assert_equal $shard_0_slot_coverage [dict get [get_node_info_from_shard $node_0_id $validation_node "shard"] "slots"]
}

test "Kill a node and tell the replica to immediately takeover" {
pause_process [srv $primary_node pid]
R $replica_node CLUSTER failover force
}

test "Verify health as fail for killed node" {
wait_for_condition 50 100 {
"fail" eq [dict get [get_node_info_from_shard $node_0_id $validation_node "node"] "health"]
} else {
fail "New primary never detected the node failed"
}
}

test "CLUSTER SHARDS slot response is non-empty when primary node fails" {
assert_equal $shard_0_slot_coverage [dict get [get_node_info_from_shard $node_0_id $validation_node "shard"] "slots"]
}
}