Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jul 9, 2024
1 parent 5f0ccf1 commit 6c46eef
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
24 changes: 20 additions & 4 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -4097,7 +4097,10 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
*
* The replica rank is used to add a delay to start an election in order to
* get voted and replace a failing primary. Replicas with better replication
* offsets are more likely to win. */
* offsets are more likely to win.
*
* If the replication offsets are the same, the replica whose node id compares
* smaller (byte-wise, via memcmp) gets the better, i.e. lower, rank. */
int clusterGetReplicaRank(void) {
long long myoffset;
int j, rank = 0;
Expand All @@ -4108,10 +4111,17 @@ int clusterGetReplicaRank(void) {
if (primary == NULL) return 0; /* Never called by replicas without primary. */

myoffset = replicationGetReplicaOffset();
for (j = 0; j < primary->num_replicas; j++)
if (primary->replicas[j] != myself && !nodeCantFailover(primary->replicas[j]) &&
primary->replicas[j]->repl_offset > myoffset)
for (j = 0; j < primary->num_replicas; j++) {
if (primary->replicas[j] == myself) continue;
if (nodeCantFailover(primary->replicas[j])) continue;

if (primary->replicas[j]->repl_offset > myoffset) {
rank++;
} else if (primary->replicas[j]->repl_offset == myoffset &&
memcmp(primary->replicas[j]->name, myself->name, CLUSTER_NAMELEN) < 0) {
rank++;
}
}
return rank;
}

Expand Down Expand Up @@ -4327,6 +4337,12 @@ void clusterHandleReplicaFailover(void) {
server.cluster->failover_auth_rank = newrank;
serverLog(LL_NOTICE, "Replica rank updated to #%d, added %lld milliseconds of delay.", newrank,
added_delay);
} else if (newrank < server.cluster->failover_auth_rank) {
long long reduced_delay = (server.cluster->failover_auth_rank - newrank) * 1000;
server.cluster->failover_auth_time -= reduced_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_NOTICE, "Replica rank updated to #%d, reduced %lld milliseconds of delay.", newrank,
reduced_delay);
}
}

Expand Down
61 changes: 61 additions & 0 deletions tests/unit/cluster/failover.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,64 @@ test "Instance #0 gets converted into a slave" {
}

} ;# start_cluster

# Replica allocation strategy that attaches every replica to the first
# primary (node index 0) rather than spreading them over all primaries.
#
# primaries - forwarded unchanged to cluster_find_available_replica
# replicas  - how many replicas to attach; one CLUSTER REPLICATE is issued
#             per replica
proc cluster_allocate_replicas_to_one_primary {primaries replicas} {
    set first_primary 0
    for {set attached 0} {$attached < $replicas} {incr attached} {
        # Pick the next node to turn into a replica, then look up the
        # target primary's node id and point the replica at it.
        set candidate [cluster_find_available_replica $primaries]
        set primary_info [cluster_get_myself $first_primary]
        set primary_node_id [dict get $primary_info id]
        R $candidate cluster replicate $primary_node_id
    }
}

# Failover scenario in which all replicas hang off a single primary.
#
# The cluster is started with 5 primaries and 10 replicas, and
# cluster_allocate_replicas_to_one_primary (passed as the replica allocator
# at the end of this block) attaches every replica to the first primary.
# That primary is then paused, so the replicas under it compete in the same
# election. The test waits for the epoch to advance, checks that the
# remaining nodes report an ok cluster state, then resumes the old primary
# and expects it to come back as a replica.
start_cluster 5 10 {tags {external:skip cluster} overrides {cluster-node-timeout 15000}} {

    test "Cluster is up" {
        wait_for_cluster_state ok
    }

    test "Cluster is writable" {
        cluster_write_test [srv 0 port]
    }

    # Epoch as seen through node 1 before the failure. Node 0 is about to be
    # paused, so it cannot be used as the observer.
    set current_epoch [CI 1 cluster_current_epoch]

    set paused_pid [srv 0 pid]
    test "Killing the first primary node" {
        pause_process $paused_pid
    }

    test "Wait for failover" {
        # A bumped current epoch is taken as the sign that an election ran.
        wait_for_condition 1000 50 {
            [CI 1 cluster_current_epoch] > $current_epoch
        } else {
            fail "No failover detected"
        }
    }

    test "Cluster should eventually be up again" {
        for {set j 0} {$j < [llength $::servers]} {incr j} {
            # Skip only the node whose own process is paused. Testing
            # $paused_pid here would be true on every iteration and would
            # silently skip the whole check.
            # NOTE(review): assumes "srv -$j" addresses the same server that
            # "CI $j" queries - confirm against the test helpers.
            if {[process_is_paused [srv -$j pid]]} continue
            wait_for_condition 1000 50 {
                [CI $j cluster_state] eq "ok"
            } else {
                fail "Cluster node $j cluster_state:[CI $j cluster_state]"
            }
        }
    }

    test "Restarting the previously killed primary node" {
        resume_process $paused_pid
    }

    test "Instance #0 gets converted into a replica" {
        # The role string reported by the server is still "slave".
        wait_for_condition 1000 50 {
            [s 0 role] eq {slave}
        } else {
            fail "Old primary was not converted into replica"
        }
        wait_for_cluster_propagation
    }

} continuous_slot_allocation cluster_allocate_replicas_to_one_primary ;# start_cluster

0 comments on commit 6c46eef

Please sign in to comment.