Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Ping Xie <[email protected]>
Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera and PingXie authored Jul 10, 2024
1 parent a56e3d0 commit de31ccd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 38 deletions.
5 changes: 2 additions & 3 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3640,14 +3640,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
dual_conn ? "replica socket" : "pipe");
dual_conn ? "direct socket to replica" : "pipe through parent process");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
if (dual_conn) {
/* For dual connection sync, the main process no longer requires these RDB connections. */
zfree(conns);
}
else {
} else {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
Expand Down
62 changes: 27 additions & 35 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,14 @@ void rebaseReplicationBuffer(long long base_repl_offset) {
}

/* Replication: Primary side - connections association.
* On rdb-connection sync, connection association is used to keep replication data in
* the backlog until the replica requests PSYNC. Association happens in two forms,
* if there's an existing buffer block at the fork time, the replica is attached
* to the tail, if there is no tail, the replica will be attached when a new
* buffer block is created (see the Retrospect function below).
* Replica rdb client id is used as a unique key for the association.
* On COB overrun, association is deleted and the RDB connection
* is dropped.
*/
* During RDB connection sync, association is used to keep replication data
* in the backlog until the replica requests PSYNC.
* Association occurs in two forms:
* 1. If there's an existing buffer block at fork time, the replica is attached to the tail.
* 2. If there's no tail, the replica is attached when a new buffer block is created
* (see the Retrospect function below).
* The replica RDB client ID is used as a unique key for this association.
* If a COB overrun occurs, the association is deleted and the RDB connection is dropped. */
void addReplicaToPsyncWait(client *replica) {
listNode *ln = NULL;
replBufBlock *tail = NULL;
Expand All @@ -220,8 +219,8 @@ void addReplicaToPsyncWait(client *replica) {
tail->refcount++;
}
}
serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetReplicaName(replica),
(unsigned long long)replica->id, tail ? "with repl-backlog tail" : "repl-backlog is empty");
serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(replica),
(unsigned long long)replica->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track");
replica->ref_repl_buf_node = tail ? ln : NULL;
/* Prevent rdb client from being freed before psync is established. */
replica->flags |= CLIENT_PROTECTED_RDB_CONN;
Expand All @@ -244,7 +243,7 @@ void addReplicaToPsyncWaitRetrospect(void) {
if (replica->ref_repl_buf_node) continue;
replica->ref_repl_buf_node = ln;
head->refcount++;
serverLog(LL_DEBUG, "Retrospect attach replica %llu to repl buf block", (long long unsigned int)replica->id);
serverLog(LL_DEBUG, "Attach rdb replica %llu to repl buf block", (long long unsigned int)replica->id);
}
raxStop(&iter);
}
Expand Down Expand Up @@ -2492,9 +2491,7 @@ sds getReplicaPortString(void) {
/* Replication: Replica side.
* Free replica's local replication buffer */
void freePendingReplDataBuf(void) {
if (server.pending_repl_data.blocks) {
listRelease(server.pending_repl_data.blocks);
}
listRelease(server.pending_repl_data.blocks);
server.pending_repl_data.blocks = NULL;
server.pending_repl_data.len = 0;
}
Expand All @@ -2504,15 +2501,13 @@ void freePendingReplDataBuf(void) {
* provisional primary struct, and free local replication buffer. */
void abortRdbConnectionSync(void) {
serverAssert(server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE);
serverLog(LL_WARNING, "Aborting dual connection sync");
serverLog(LL_NOTICE, "Aborting dual connection sync");
if (server.repl_rdb_transfer_s) {
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
}
if (server.repl_transfer_tmpfile) {
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
}
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
if (server.repl_transfer_fd != -1) {
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
Expand Down Expand Up @@ -2546,14 +2541,14 @@ int sendCurrentOffsetToReplica(client *replica) {

/* Replication: Replica side.
* This connection handler is used to initialize the RDB connection (dual-conn-sync-enabled sync).
* Once a replica with repl rdb-connection enabled, denied from PSYNC with its primary,
* fullSyncWithPrimary begins its role. The connection handler prepare server.repl_rdb_transfer_s
* Once a replica with repl rdb-connection is enabled and denied from PSYNC with its primary,
* fullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s
* for a rdb stream, and server.repl_transfer_s for increamental replication data stream. */
void fullSyncWithPrimary(connection *conn) {
char *err = NULL;
serverAssert(conn == server.repl_rdb_transfer_s);
/* If this event fired after the user turned the instance into a primary
* with SLAVEOF NO ONE we must just return ASAP. */
* with REPLICAOF NO ONE we must just return ASAP. */
if (server.repl_state == REPL_STATE_NONE) {
goto error;
}
Expand All @@ -2568,8 +2563,8 @@ void fullSyncWithPrimary(connection *conn) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[3] = {"AUTH", NULL, NULL};
size_t lens[3] = {4, 0, 0};
char *args[] = {"AUTH", NULL, NULL};
size_t lens[] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
Expand Down Expand Up @@ -2611,7 +2606,6 @@ void fullSyncWithPrimary(connection *conn) {
goto error;
}
sdsfree(err);
err = NULL;
server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY;
return;
}
Expand All @@ -2627,7 +2621,6 @@ void fullSyncWithPrimary(connection *conn) {
goto error;
}
sdsfree(err);
err = NULL;

if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) {
serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn));
Expand Down Expand Up @@ -2726,9 +2719,9 @@ void replDataBufInit(void) {
void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progress_callback) {
time_t now = mstime();
if (server.loading_process_events_interval_bytes &&
(offset + readlen) / server.loading_process_events_interval_bytes >
offset / server.loading_process_events_interval_bytes &&
now - *last_progress_callback > server.loading_process_events_interval_milliseconds) {
((offset + readlen) / server.loading_process_events_interval_bytes >
offset / server.loading_process_events_interval_bytes) &&
(now - *last_progress_callback > server.loading_process_events_interval_milliseconds)) {
replicationSendNewlineToPrimary();
processEventsWhileBlocked();
*last_progress_callback = now;
Expand All @@ -2741,7 +2734,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) {
int nread = connRead(conn, o->buf + o->used, read);
if (nread == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Error reading from primary: %s", connGetLastError(conn));
serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));
cancelReplicationHandshake(1);
}
return C_ERR;
Expand Down Expand Up @@ -3193,7 +3186,7 @@ void setupMainConnForPsync(connection *conn) {
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a Partial Resynchronization%s",
serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a partial re-synchronization%s",
server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : ".");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
serverCommunicateSystemd("STATUS=Primary <-> REPLICA sync: Partial Resynchronization accepted. Ready to "
Expand Down Expand Up @@ -3469,9 +3462,8 @@ void syncWithPrimary(connection *conn) {

if (server.repl_state == REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL)
goto error;
else if (err[0] == '-') {
if (err == NULL) goto error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Primary is not capable of rdb-connection sync");
}
sdsfree(err);
Expand Down

0 comments on commit de31ccd

Please sign in to comment.