Generate correct slot information in cluster shards command on primary failure (#790)

Fix #784

Prior to this change, `CLUSTER SHARDS` command processing could pick a
failed primary node, which no longer has the slot coverage information,
so the `slots` array in the output would be empty. This change instead
finds a node in the shard that does have the slot coverage information
and displays it correctly in the `CLUSTER SHARDS` output.

 Before:

 ```
 1) 1) "slots"
   2)  (empty array)
   3) "nodes"
   4) 1)  1) "id"
          2) "2936f22a490095a0a851b7956b0a88f2b67a5d44"
          ...
          9) "role"
         10) "master"
         ...
         13) "health"
         14) "fail"
 ```

 After:

 ```
 1) 1) "slots"
   2) 1) 0
       2) 5461
   3) "nodes"
   4) 1)  1) "id"
          2) "2936f22a490095a0a851b7956b0a88f2b67a5d44"
          ...
          9) "role"
         10) "master"
         ...
         13) "health"
         14) "fail"
 ```
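
To make the idea behind the fix concrete, here is a minimal, standalone C sketch (not the server code): it scans every node of a shard and reports the slot ranges from the first node that still carries them, rather than relying only on the possibly failed primary. The `shard_node` type and `node_with_slot_info` helper are simplified stand-ins invented for illustration; the actual implementation is in the `clusterReplyShards()` diff below.

```c
#include <stdio.h>
#include <stddef.h>

/* Simplified stand-in for a cluster node: slot_info_pairs holds
 * (start, end) slot ranges, or NULL when the node has no slot info
 * (e.g. a primary that has already been marked as failed). */
typedef struct {
    const char *name;
    const int *slot_info_pairs;
    int slot_info_pairs_count; /* always an even number of entries */
} shard_node;

/* Return the first node in the shard that has slot info, or NULL. */
static const shard_node *node_with_slot_info(const shard_node *nodes, size_t count) {
    for (size_t i = 0; i < count; i++) {
        if (nodes[i].slot_info_pairs != NULL) return &nodes[i];
    }
    return NULL;
}

int main(void) {
    const int replica_slots[] = {0, 5461};
    const shard_node shard[] = {
        {"failed-primary", NULL, 0},            /* no slot info after failure */
        {"promoted-replica", replica_slots, 2}, /* still knows the slot range */
    };

    const shard_node *n = node_with_slot_info(shard, 2);
    if (n != NULL) {
        for (int i = 0; i < n->slot_info_pairs_count; i += 2)
            printf("slots %d-%d (reported via %s)\n",
                   n->slot_info_pairs[i], n->slot_info_pairs[i + 1], n->name);
    } else {
        printf("(empty array)\n"); /* shard owns no slots */
    }
    return 0;
}
```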

---------

Signed-off-by: Harkrishn Patro <[email protected]>
hpatro authored and PingXie committed Jul 23, 2024
1 parent 6df023f commit 3aaa129
Showing 2 changed files with 94 additions and 36 deletions.
75 changes: 39 additions & 36 deletions src/cluster.c
@@ -2591,7 +2591,7 @@ uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
        clusterMsgPingExtHumanNodename *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, getHumanNodenamePingExtSize());
        memcpy(ext->human_nodename, myself->human_nodename, sdslen(myself->human_nodename));

        /* Move the write cursor */
        cursor = nextPingExt(cursor);
    }

@@ -2655,7 +2655,7 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
        if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
            clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
            ext_hostname = hostname_ext->hostname;
        } else if (type == CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME) {
            clusterMsgPingExtHumanNodename *humannodename_ext = (clusterMsgPingExtHumanNodename *) &(ext->ext[0].human_nodename);
            ext_humannodename = humannodename_ext->human_nodename;
        } else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
@@ -4303,7 +4303,7 @@ void clusterHandleSlaveFailover(void) {
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_NOTICE,
                  "Start of election delayed for %lld milliseconds "
@@ -4842,7 +4842,7 @@ void clusterCron(void) {
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
            server.cluster_allow_replica_migration)
            clusterHandleSlaveMigration(max_slaves);
    }

@@ -5759,37 +5759,6 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
    setDeferredMapLen(c, node_replylen, reply_count);
}

/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, list *nodes) {
    serverAssert(listLength(nodes) > 0);
    clusterNode *n = listNodeValue(listFirst(nodes));
    addReplyMapLen(c, 2);
    addReplyBulkCString(c, "slots");

    /* Use slot_info_pairs from the primary only */
    n = clusterNodeGetMaster(n);

    if (n->slot_info_pairs != NULL) {
        serverAssert((n->slot_info_pairs_count % 2) == 0);
        addReplyArrayLen(c, n->slot_info_pairs_count);
        for (int i = 0; i < n->slot_info_pairs_count; i++)
            addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
    } else {
        /* If no slot info pair is provided, the node owns no slots */
        addReplyArrayLen(c, 0);
    }

    addReplyBulkCString(c, "nodes");
    addReplyArrayLen(c, listLength(nodes));
    listIter li;
    listRewind(nodes, &li);
    for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
        clusterNode *n = listNodeValue(ln);
        addNodeDetailsToShardReply(c, n);
        clusterFreeNodesSlotsInfo(n);
    }
}

/* Add to the output buffer of the given client, an array of slot (start, end)
 * pair owned by the shard, also the primary and set of replica(s) along with
 * information about each node. */
@@ -5799,7 +5768,41 @@ void clusterReplyShards(client *c) {
    clusterGenNodesSlotsInfo(0);
    dictIterator *di = dictGetSafeIterator(server.cluster->shards);
    for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
        addShardReplyForClusterShards(c, dictGetVal(de));
        list *nodes = dictGetVal(de);
        serverAssert(listLength(nodes) > 0);
        addReplyMapLen(c, 2);
        addReplyBulkCString(c, "slots");

        /* Find a node which has the slot information served by this shard. */
        clusterNode *n = NULL;
        listIter li;
        listRewind(nodes, &li);
        for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
            n = listNodeValue(ln);
            if (n->slot_info_pairs) {
                break;
            }
        }

        if (n && n->slot_info_pairs != NULL) {
            serverAssert((n->slot_info_pairs_count % 2) == 0);
            addReplyArrayLen(c, n->slot_info_pairs_count);
            for (int i = 0; i < n->slot_info_pairs_count; i++) {
                addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
            }
        } else {
            /* If no slot info pair is provided, the node owns no slots */
            addReplyArrayLen(c, 0);
        }

        addReplyBulkCString(c, "nodes");
        addReplyArrayLen(c, listLength(nodes));
        listRewind(nodes, &li);
        for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
            clusterNode *n = listNodeValue(ln);
            addNodeDetailsToShardReply(c, n);
            clusterFreeNodesSlotsInfo(n);
        }
    }
    dictReleaseIterator(di);
}
55 changes: 55 additions & 0 deletions tests/unit/cluster/cluster-shards.tcl
@@ -0,0 +1,55 @@
# Get the node info with the specific node_id from the
# given reference node. Valid type options are "node" and "shard"
proc get_node_info_from_shard {id reference {type node}} {
    set shards_response [R $reference CLUSTER SHARDS]
    foreach shard_response $shards_response {
        set nodes [dict get $shard_response nodes]
        foreach node $nodes {
            if {[dict get $node id] eq $id} {
                if {$type eq "node"} {
                    return $node
                } elseif {$type eq "shard"} {
                    return $shard_response
                } else {
                    return {}
                }
            }
        }
    }
    # No shard found, return nothing
    return {}
}

start_cluster 3 3 {tags {external:skip cluster}} {
    set primary_node 0
    set replica_node 3
    set validation_node 4
    set node_0_id ""
    set shard_0_slot_coverage {0 5461}

    test "Cluster should start ok" {
        wait_for_cluster_state ok
    }

    test "Cluster shards response is ok for shard 0" {
        set node_0_id [R $primary_node CLUSTER MYID]
        assert_equal $shard_0_slot_coverage [dict get [get_node_info_from_shard $node_0_id $validation_node "shard"] "slots"]
    }

    test "Kill a node and tell the replica to immediately takeover" {
        pause_process [srv $primary_node pid]
        R $replica_node CLUSTER failover force
    }

    test "Verify health as fail for killed node" {
        wait_for_condition 50 100 {
            "fail" eq [dict get [get_node_info_from_shard $node_0_id $validation_node "node"] "health"]
        } else {
            fail "New primary never detected the node failed"
        }
    }

    test "CLUSTER SHARDS slot response is non-empty when primary node fails" {
        assert_equal $shard_0_slot_coverage [dict get [get_node_info_from_shard $node_0_id $validation_node "shard"] "slots"]
    }
}
