Skip to content

Commit

Permalink
Replica flush the old data after RDB file is ok in disk-based replica…
Browse files Browse the repository at this point in the history
…tion (valkey-io#926)

Call emptyData right before rdbLoad to prevent errors in the middle
and we drop the replication stream and leaving an empty database.
The real changes is in disk-based part, the rest is just code movement.

Signed-off-by: Binbin <[email protected]>
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
enjoy-binbin authored and PingXie committed Sep 14, 2024
1 parent b7fb262 commit 6ddbaa3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
29 changes: 23 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2180,11 +2180,6 @@ void readSyncBulkPayload(connection *conn) {
temp_functions_lib_ctx = functionsLibCtxCreate();

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
} else {
replicationAttachToNewPrimary();

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}

/* Before loading the DB into memory we need to delete the readable
Expand All @@ -2193,7 +2188,6 @@ void readSyncBulkPayload(connection *conn) {
* time for non blocking loading. */
connSetReadHandler(conn, NULL);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
Expand All @@ -2213,6 +2207,14 @@ void readSyncBulkPayload(connection *conn) {
dbarray = diskless_load_tempDb;
functions_lib_ctx = temp_functions_lib_ctx;
} else {
/* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
Expand All @@ -2224,6 +2226,8 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout * 1000);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

int loadingFailed = 0;
Expand Down Expand Up @@ -2256,6 +2260,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}

Expand Down Expand Up @@ -2332,6 +2337,17 @@ void readSyncBulkPayload(connection *conn) {
return;
}

/* We will soon start loading the RDB from disk, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Empty the databases only after the RDB file is ok, that is, before the RDB file
* is actually loaded, in case we encounter an error and drop the replication stream
* and leave an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
Expand All @@ -2344,6 +2360,7 @@ void readSyncBulkPayload(connection *conn) {
}

/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

/* Note that there's no point in restarting the AOF on sync failure,
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1477,3 +1477,43 @@ start_server {tags {"repl external:skip"}} {
}
}
}

start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
$replica set replica_key replica_value

start_server {} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
$primary set primary_key primary_value

test {Replica keep the old data if RDB file save fails in disk-based replication} {
# Create a folder called 'dump.rdb' to trigger temp-rdb rename failure
# and it will cause RDB file save to fail at the rename.
set dump_rdb [file join [lindex [$replica config get dir] 1] dump.rdb]
if {[file exists $dump_rdb]} { exec rm -f $dump_rdb }
exec mkdir -p $dump_rdb

$replica replicaof $primary_host $primary_port

# Waiting for the rename to fail.
wait_for_log_messages -1 {"*Failed trying to rename the temp DB into dump.rdb*"} 0 1000 10

# Make sure the replica has not completed sync and keep the old data.
assert_equal {} [$replica get primary_key]
assert_equal {replica_value} [$replica get replica_key]

# Remove the test folder and make the rename success
exec rm -rf $dump_rdb
wait_for_condition 500 100 {
[$replica get primary_key] == {primary_value} &&
[$replica get replica_key] == {}
} else {
puts [$primary keys *]
puts [$replica keys *]
fail "Replication failed."
}
}
}
}

0 comments on commit 6ddbaa3

Please sign in to comment.