Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve dual channel replication stability and fix compatibility issues #804

Merged
merged 32 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
81667ef
Deflake test "Psync established after RDB load - beyond grace period"
naglera Jul 18, 2024
2ce7756
Deflake Dual channel replication test batch
naglera Jul 18, 2024
309a83e
Use pause process instead of brute force sleep
naglera Jul 21, 2024
4d3001a
test documentation fix
naglera Jul 21, 2024
81355d0
Deflake- Test dual-channel-replication primary reject set-rdb-client …
naglera Jul 22, 2024
773ad9c
Use smaller RDB for "Test replica's buffer limit reached"
naglera Jul 22, 2024
e0696c5
Update src/server.h
madolson Jul 23, 2024
021975d
Fix race condition in rdb client free
naglera Jul 23, 2024
67dd350
Fix race condition dualChannelSync abort after local replication buff…
naglera Jul 23, 2024
2931ecc
Complete dualChannelSync abort after local buffer stream fix: Handle …
naglera Jul 23, 2024
caa0c1a
Update src/debug.c
madolson Jul 23, 2024
c4f2ecb
Move repl_rdb_channel_state check to abortDualChannelSync
naglera Jul 24, 2024
605ee5f
clang format
naglera Jul 24, 2024
92855ac
Revert "Move repl_rdb_channel_state check to abortDualChannelSync"
naglera Jul 24, 2024
a682319
Fix uint64 on 32bit machine bug
naglera Jul 24, 2024
ab211b4
Verify sync is still in progress when sync aborted during local
naglera Jul 24, 2024
a8ef15d
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
b32505f
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
5d3baf9
Fix freeClientAsync double free
naglera Jul 25, 2024
dbac48b
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
ff5bd68
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
2ad0e0d
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
5b0e447
NOT FOR MERGE - debug logs
naglera Jul 25, 2024
5b9cb0e
fix endoffset format
naglera Jul 25, 2024
4a5ec3c
fix endoffset format 2
naglera Jul 25, 2024
1bf7dbe
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
0d59c7f
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
d3eac4b
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
eebf226
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
08d0b3e
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
b2f739f
Revert "NOT FOR MERGE - debug logs"
naglera Jul 25, 2024
468c969
clang format
naglera Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"SLEEP-AFTER-FORK-SECONDS <seconds>",
" Stop the server's main process for <seconds> after forking.",
"PAUSE-AFTER-FORK <0|1>",
" Stop the server's main process after fork.",
"DELAY-RDB-CLIENT-FREE-SECOND <seconds>",
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
Expand Down Expand Up @@ -995,12 +995,8 @@ void debugCommand(client *c) {
return;
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) {
double sleep_after_fork_seconds;
if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) {
return;
}
server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6);
} else if (!strcasecmp(c->argv[1]->ptr, "pause-after-fork") && c->argc == 3) {
server.debug_pause_after_fork = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) {
server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr);
Expand Down Expand Up @@ -2305,6 +2301,12 @@ void applyWatchdogPeriod(void) {
}
}

void debugPauseProcess(void) {
serverLog(LL_NOTICE, "Process is about to stop.");
raise(SIGSTOP);
serverLog(LL_NOTICE, "Process has been continued.");
}

/* Positive input is sleep time in microseconds. Negative input is fractions
* of microseconds, i.e. -10 means 100 nanoseconds. */
void debugDelay(int usec) {
Expand Down
18 changes: 8 additions & 10 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1896,15 +1896,13 @@ int freeClientsInAsyncFreeQueue(void) {
c->rdb_client_disconnect_time = server.unixtime;
serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id,
replicationGetReplicaName(c), server.wait_before_rdb_client_free);
continue;
}
if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) {
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}
if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue;
serverLog(LL_NOTICE,
"Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
"Freeing RDB client %llu.",
(long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
c->flag.protected_rdb_channel = 0;
}

if (c->flag.protected) continue;
Expand Down Expand Up @@ -4332,9 +4330,9 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(), c);
/* Remove RDB connection protection on COB overrun */
c->flag.protected_rdb_channel = 0;

if (async) {
if (async || c->flag.protected_rdb_channel) {
c->flag.protected_rdb_channel = 0;
freeClientAsync(c);
serverLog(LL_WARNING, "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
client);
Expand Down
17 changes: 11 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ int startBgsaveForReplication(int mincapa, int req) {
/* Keep the page cache since it'll get used soon */
retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE);
}
if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us);
if (server.debug_pause_after_fork) debugPauseProcess();
} else {
serverLog(LL_WARNING, "BGSAVE for replication: replication information not available, can't generate the RDB "
"file right now. Try later.");
Expand Down Expand Up @@ -2654,7 +2654,7 @@ static void fullSyncWithPrimary(connection *conn) {
}
/* Receive end offset response */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) {
int64_t rdb_client_id;
uint64_t rdb_client_id;
err = receiveSynchronousResponse(conn);
if (err == NULL) goto error;
if (err[0] == '\0') {
Expand All @@ -2667,7 +2667,7 @@ static void fullSyncWithPrimary(connection *conn) {
char primary_replid[CONFIG_RUN_ID_SIZE + 1];
int dbid;
/* Parse end offset response */
char *endoff_format = "$ENDOFF:%lld %40s %d %ld";
char *endoff_format = "$ENDOFF:%lld %40s %d %llu";
if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) {
goto error;
}
Expand Down Expand Up @@ -2859,9 +2859,14 @@ void dualChannelSyncSuccess(void) {
server.primary_initial_offset = server.repl_provisional_primary.reploff;
replicationResurrectProvisionalPrimary();
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
if (streamReplDataBufToDb(server.primary) == C_ERR) {
if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) {
/* Sync session aborted during repl data streaming. */
serverLog(LL_WARNING, "Failed to stream local replication buffer into memory");
/* Verify sync is still in progress */
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
replicationAbortDualChannelSyncTransfer();
replicationUnsetPrimary();
}
return;
}
freePendingReplDataBuf();
Expand Down Expand Up @@ -3160,7 +3165,7 @@ void setupMainConnForPsync(connection *conn) {
char *err = NULL;
if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* We already have an initialized connection at primary side, we only need to associate it with RDB connection */
ll2string(llstr, sizeof(llstr), server.rdb_client_id);
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (err) goto error;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
Expand All @@ -3181,7 +3186,7 @@ void setupMainConnForPsync(connection *conn) {
}

if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (server.debug_sleep_after_fork_us) usleep(server.debug_sleep_after_fork_us);
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
cancelReplicationHandshake(1);
Expand Down
5 changes: 3 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2005,8 +2005,8 @@ struct valkeyServer {
* use dual channel replication for full syncs. */
int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel
* to establish psync. */
int debug_sleep_after_fork_us; /* Debug param that force the main process to
* sleep for N microseconds after fork() in repl. */
int debug_pause_after_fork; /* Debug param that pauses the main process
* after a replication fork() (for bgsave). */
size_t repl_buffer_mem; /* The memory of replication buffer. */
list *repl_buffer_blocks; /* Replication buffers blocks list
* (serving replica clients and repl backlog) */
Expand Down Expand Up @@ -3981,6 +3981,7 @@ void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(serverDb *tempDb);
sds getVersion(void);
void debugPauseProcess(void);

/* Use macro for checking log level to avoid evaluating arguments in cases log
* should be ignored due to low level. */
Expand Down
10 changes: 0 additions & 10 deletions tests/helpers/bg_server_sleep.tcl

This file was deleted.

Loading
Loading