Skip to content

Commit

Permalink
Fix dual-channel-replication related issues (#837)
Browse files Browse the repository at this point in the history
- Fix TLS bug where connection were shutdown by primary's main process
while the child process was still writing- causing main process to be
blocked.
- TLS connection fix -file descriptors are set to blocking mode in the
main thread, followed by a blocking write. This sets the file
descriptors to non-blocking if TLS is used (see `connTLSSyncWrite()`)
(@xbasel).
- Improve the reliability of dual-channel tests. Modify the pause
mechanism to verify process status directly, rather than relying on log.
- Ensure that `server.repl_offset` and `server.replid` are updated
correctly when dual channel synchronization completes successfully.
Thist led to failures in replication tests that validate replication IDs
or compare replication offsets.

---------

Signed-off-by: naglera <[email protected]>
Signed-off-by: naglera <[email protected]>
Signed-off-by: xbasel <[email protected]>
Signed-off-by: Madelyn Olson <[email protected]>
Signed-off-by: Binbin <[email protected]>
Co-authored-by: ranshid <[email protected]>
Co-authored-by: xbasel <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
Co-authored-by: Binbin <[email protected]>
  • Loading branch information
5 people committed Sep 2, 2024
1 parent 914abb0 commit 9034e84
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 41 deletions.
6 changes: 5 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,11 @@ void unlinkClient(client *c) {
}
}
/* Only use shutdown when the fork is active and we are the parent. */
if (server.child_type) connShutdown(c->conn);
if (server.child_type && !c->flag.repl_rdb_channel) {
connShutdown(c->conn);
} else if (c->flag.repl_rdb_channel) {
shutdown(c->conn->fd, SHUT_RDWR);
}
connClose(c->conn);
c->conn = NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3557,14 +3557,14 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {

conns[connsnum++] = replica->conn;
if (dual_channel) {
/* Put the socket in blocking mode to simplify RDB transfer. */
connBlock(replica->conn);
connSendTimeout(replica->conn, server.repl_timeout * 1000);
/* This replica uses diskless dual channel sync, hence we need
* to inform it with the save end offset.*/
sendCurrentOffsetToReplica(replica);
/* Make sure repl traffic is appended to the replication backlog */
addRdbReplicaToPsyncWait(replica);
/* Put the socket in blocking mode to simplify RDB transfer. */
connBlock(replica->conn);
} else {
server.rdb_pipe_numconns++;
}
Expand Down
8 changes: 6 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ void freeReplicaReferencedReplBuffer(client *replica) {
uint64_t rdb_cid = htonu64(replica->id);
if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) {
serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.",
replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id);
replicationGetReplicaName(replica), (long long unsigned int)replica->id);
}
}
if (replica->ref_repl_buf_node != NULL) {
Expand Down Expand Up @@ -956,7 +956,9 @@ int startBgsaveForReplication(int mincapa, int req) {
/* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK));

serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk");
serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s using: %s",
socket_target ? "replicas sockets" : "disk",
(req & REPLICA_REQ_RDB_CHANNEL) ? "dual-channel" : "normal sync");

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
Expand Down Expand Up @@ -4159,6 +4161,8 @@ void replicationResurrectProvisionalPrimary(void) {
memcpy(server.primary->replid, server.repl_provisional_primary.replid, CONFIG_RUN_ID_SIZE);
server.primary->reploff = server.repl_provisional_primary.reploff;
server.primary->read_reploff = server.repl_provisional_primary.read_reploff;
server.primary_repl_offset = server.primary->reploff;
memcpy(server.replid, server.primary->replid, sizeof(server.primary->replid));
establishPrimaryConnection();
}

Expand Down
81 changes: 45 additions & 36 deletions tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ proc get_client_id_by_last_cmd {r cmd} {
return $client_id
}

# Wait until the process enters a paused state, then resume the process.
proc wait_and_resume_process idx {
set pid [srv $idx pid]
wait_for_log_messages $idx {"*Process is about to stop.*"} 0 2000 1
wait_for_condition 50 1000 {
[string match "T*" [exec ps -o state= -p $pid]]
} else {
fail "Process $pid didn't stop, current state is [exec ps -o state= -p $pid]"
}
resume_process $pid
}

Expand Down Expand Up @@ -315,13 +320,12 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}

$replica1 replicaof no one
$primary set key3 val3


test "Test replica's buffer limit reached" {
$primary config set repl-diskless-sync-delay 0
$primary config set rdb-key-save-delay 500
$primary config set rdb-key-save-delay 10000
# At this point we have about 10k keys in the db,
# We expect that the next full sync will take 5 seconds (10k*500)ms
# We expect that the next full sync will take 100 seconds (10k*10000)ms
# It will give us enough time to fill the replica buffer.
$replica1 config set dual-channel-replication-enabled yes
$replica1 config set client-output-buffer-limit "replica 16383 16383 0"
Expand All @@ -343,19 +347,25 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}
assert {[s -2 replicas_replication_buffer_size] <= 16385*2}

# Wait for sync to succeed
# Primary replication buffer should grow
wait_for_condition 50 1000 {
[status $replica1 master_link_status] == "up"
[status $primary mem_total_replication_buffers] >= 81915
} else {
fail "Replica is not synced"
fail "Primary should take the load"
}
wait_for_value_to_propegate_to_replica $primary $replica1 "key3"
}

$replica1 replicaof no one
$replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation
$primary config set rdb-key-save-delay 0

wait_for_condition 500 1000 {
[s 0 rdb_bgsave_in_progress] eq 0
} else {
fail "can't kill rdb child"
}

$primary set key4 val4
$primary set key3 val3

test "dual-channel-replication fails when primary diskless disabled" {
set cur_psync [status $primary sync_partial_ok]
Expand All @@ -370,7 +380,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
} else {
fail "Replica is not synced"
}
wait_for_value_to_propegate_to_replica $primary $replica1 "key4"
wait_for_value_to_propegate_to_replica $primary $replica1 "key3"

# Verify that we did not use dual-channel-replication sync
assert {[status $primary sync_partial_ok] == $cur_psync}
Expand Down Expand Up @@ -921,8 +931,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
fail "replica didn't start sync session in time"
}

$primary debug log "killing replica rdb connection"
set replica_rdb_channel_id [get_client_id_by_last_cmd $primary "sync"]
$primary debug log "killing replica rdb connection $replica_rdb_channel_id"
assert {$replica_rdb_channel_id != ""}
set loglines [count_log_lines -1]
$primary client kill id $replica_rdb_channel_id
Expand Down Expand Up @@ -956,6 +966,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary debug log "killing replica rdb connection $replica_rdb_channel_id"
$primary client kill id $replica_rdb_channel_id
# Wait for primary to abort the sync
wait_and_resume_process 0
wait_for_condition 10000000 10 {
[s -1 rdb_bgsave_in_progress] eq 0 &&
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
Expand All @@ -965,7 +976,6 @@ start_server {tags {"dual-channel-replication external:skip"}} {
# Verify primary reject replconf set-rdb-client-id
set res [catch {$primary replconf set-rdb-client-id $replica_rdb_channel_id} err]
assert [string match *ERR* $err]
wait_and_resume_process 0
}
stop_write_load $load_handle
}
Expand All @@ -982,9 +992,9 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary config set loglevel debug
$primary config set repl-diskless-sync-delay 0; # don't wait for other replicas

# Generating RDB will cost 5s(10000 * 0.0001s)
# Generating RDB will cost 100s
$primary debug populate 10000 primary 1
$primary config set rdb-key-save-delay 100
$primary config set rdb-key-save-delay 10000

start_server {} {
set replica_1 [srv 0 client]
Expand Down Expand Up @@ -1016,11 +1026,6 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}
$replica_2 replicaof $primary_host $primary_port
wait_for_log_messages -2 {"*Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC*"} $loglines 100 1000
$primary config set rdb-key-save-delay 0
# Verify second replica needed new session
wait_for_sync $replica_2
assert {[s -2 sync_partial_ok] eq 2}
assert {[s -2 sync_full] eq 2}
}
stop_write_load $load_handle
}
Expand All @@ -1038,9 +1043,9 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary config set loglevel debug
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will cost 5s(10000 * 0.0001s)
# Generating RDB will cost 100 sec to generate
$primary debug populate 10000 primary 1
$primary config set rdb-key-save-delay 100
$primary config set rdb-key-save-delay 10000

start_server {} {
set replica [srv 0 client]
Expand All @@ -1051,8 +1056,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$replica config set dual-channel-replication-enabled yes
$replica config set loglevel debug
$replica config set repl-timeout 10
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
test "Replica recover rdb-connection killed" {
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
Expand All @@ -1076,18 +1081,21 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}
wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10
# Replica should retry
verify_replica_online $primary 0 500
stop_write_load $load_handle
wait_for_condition 1000 100 {
[s -1 master_repl_offset] eq [s master_repl_offset]
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "Replica offset didn't catch up with the primary after too long time"
fail "replica didn't retry after connection close"
}
}
$replica replicaof no one

wait_for_condition 500 1000 {
[s -1 rdb_bgsave_in_progress] eq 0
} else {
fail "Primary should abort sync"
}
test "Replica recover main-connection killed" {
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
Expand All @@ -1111,13 +1119,14 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}
wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10
# Replica should retry
verify_replica_online $primary 0 500
stop_write_load $load_handle
wait_for_condition 1000 100 {
[s -1 master_repl_offset] eq [s master_repl_offset]
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "Replica offset didn't catch up with the primary after too long time"
}
fail "replica didn't retry after connection close"
}
}
stop_write_load $load_handle
}
}

0 comments on commit 9034e84

Please sign in to comment.