Skip to content

Commit

Permalink
code review from Ping
Browse files Browse the repository at this point in the history
Co-authored-by: Ping Xie <[email protected]>
  • Loading branch information
enjoy-binbin and PingXie committed Aug 12, 2024
1 parent e41044f commit 2a1707e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
28 changes: 14 additions & 14 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync);
static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required);
void clusterHandleReplicaFailover(void);
void clusterHandleReplicaMigration(int max_replicas);
int bitmapTestBit(unsigned char *bitmap, int pos);
Expand Down Expand Up @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1);
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0);
return 1;
}

Expand Down Expand Up @@ -2631,7 +2631,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* just gets promoted to the new primary in the shard.
*
* If the sender and myself are in the same shard, try psync. */
clusterSetPrimary(sender, !are_in_same_shard, are_in_same_shard);
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
} else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
Expand Down Expand Up @@ -3400,7 +3400,7 @@ int clusterProcessPacket(clusterLink *link) {
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1, areInSameShard(myself->replicaof->replicaof, myself));
clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -4704,9 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) {
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
target->human_nodename, target->shard_id);
/* We are migrating to a different primary, a total different replication
* history, so a full sync is required. */
clusterSetPrimary(target, 1, 0);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(target, 1, 1);
}
}

Expand Down Expand Up @@ -5019,7 +5019,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1);
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0);
}

/* Abort a manual failover if the timeout is reached. */
Expand Down Expand Up @@ -5412,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {

/* Set the specified node 'n' as primary for this node.
* If this node is currently a primary, it is turned into a replica. */
void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync) {
static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

Expand All @@ -5426,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), try_psync);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), full_sync_required);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

Expand Down Expand Up @@ -6357,9 +6357,9 @@ void clusterCommandSetSlot(client *c) {
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
/* We are migrating to a different primary, a total different replication
* history, so a full sync is required. */
clusterSetPrimary(n, 1, 0);
/* We are migrating to a different shard that has a completely different
* replication history, so a full sync is required. */
clusterSetPrimary(n, 1, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -6568,7 +6568,7 @@ int clusterCommandSpecial(client *c) {
* If the instance is a primary, it is an empty primary.
* If the instance is a replica, it had a totally different replication history.
* In both of these cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 0);
clusterSetPrimary(n, 1, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
Expand Down
18 changes: 10 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3723,7 +3723,7 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port, int try_psync) {
void replicationSetPrimary(char *ip, int port, int full_sync_required) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
Expand Down Expand Up @@ -3752,14 +3752,16 @@ void replicationSetPrimary(char *ip, int port, int try_psync) {

/* Before destroying our primary state, create a cached primary using
* our own parameters, to later PSYNC with the new primary. */
if (was_primary && try_psync) {
if (was_primary && !full_sync_required) {
replicationDiscardCachedPrimary();
replicationCachePrimaryUsingMyself();
}

/* If try_psync is 0, it means the caller requested that psync not be
* performed, dropping the cached primary. */
if (!try_psync) {
/* If full sync is required, drop the cached primary. Doing so increases
* this replica node's election rank (delay) and reduces its chance of
* winning the election. If a replica requiring a full sync wins the
* election, it will flush valid data in the shard, causing data loss. */
if (full_sync_required) {
replicationDiscardCachedPrimary();
}

Expand Down Expand Up @@ -3900,7 +3902,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port, 1);
replicationSetPrimary(c->argv[1]->ptr, port, 0);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
Expand Down Expand Up @@ -4909,7 +4911,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
return;
} else {
/* Force was not requested, so timeout. */
Expand Down Expand Up @@ -4952,6 +4954,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
}
}
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3020,7 +3020,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
void replicationSetPrimary(char *ip, int port, int try_psync);
void replicationSetPrimary(char *ip, int port, int full_sync_required);
void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
Expand Down
32 changes: 16 additions & 16 deletions tests/unit/cluster/replica_migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
assert_equal {1024} [R 0 get key_991803]
}

test "Write some data to primary 4, slot 0, make a big repl_offset" {
test "Write some data to primary 3, slot 0, make a big repl_offset" {
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]
}

test "If a replica has not completed sync, it can not do the failover" {
# 10s, make sure primary 1 will hang in the save
test "If a replica has not completed full sync, the offset is 0 and the rank is the lowest" {
# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Move the slot 0 from primary 4 to primary 1
# Move the slot 0 from primary 3 to primary 0
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
Expand All @@ -40,15 +40,15 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Let primary 4's primary and replica can convert to replicas when
# they lost the last slot.
# Validate that shard 3's primary and replica can convert to replicas after
# they lose the last slot.
R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration yes

# Shutdown the primary 1
catch {R 0 shutdown nosave}
# Shutdown the primary 0.
catch {R 0 shutdown}

# Wait for the replica to become a primary, and make sure
# the other primary becomes a replica.
Expand All @@ -63,14 +63,14 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
fail "Failover does not happened"
}

# Make sure 3 / 7 get the lower rank and offset is 0.
# Make sure 3 / 7 get the lower rank and the offset is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the right replica gets the higher rank.
verify_log_message -4 "*Start of election*rank #0*" 0

# Make sure the key is exists and consistent.
# Make sure the key exists and is consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
Expand All @@ -94,24 +94,24 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
assert_equal {1024} [R 0 get key_991803]
}

test "Write some data to primary 4, slot 0, make a big repl_offset" {
test "Write some data to primary 3, slot 0, make a big repl_offset" {
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]
}

test "A old replica use CLUSTER REPLICA get a zero offset before the full sync is completed" {
# 10s, make sure primary 1 will hang in the save
# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Let the replica do the replicate with primary 1.
# Let the replica replicate from primary 0.
R 7 config set cluster-replica-validity-factor 0
R 7 config set cluster-allow-replica-migration yes
R 7 cluster replicate [R 0 cluster myid]

# Shutdown the primary 1.
catch {R 0 shutdown nosave}
# Shutdown the primary 0.
catch {R 0 shutdown}

# Wait for the replica to become a primary.
wait_for_condition 1000 50 {
Expand All @@ -127,7 +127,7 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
verify_log_message -4 "*Start of election*rank #0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the key is exists and consistence.
# Make sure the key exists and is consistent.
R 7 readonly
wait_for_condition 1000 50 {
[R 4 get key_991803] == 1024 &&
Expand Down

0 comments on commit 2a1707e

Please sign in to comment.