From de31ccdb127f231fa8c8f13603b71ccac76c45dc Mon Sep 17 00:00:00 2001 From: naglera <58042354+naglera@users.noreply.github.com> Date: Wed, 10 Jul 2024 11:32:48 +0300 Subject: [PATCH] Apply suggestions from code review Co-authored-by: Ping Xie Signed-off-by: naglera <58042354+naglera@users.noreply.github.com> --- src/rdb.c | 5 ++-- src/replication.c | 62 +++++++++++++++++++++-------------------------- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 9cb66b5b65..52598c2d9f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3640,14 +3640,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } else { serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid, - dual_conn ? "replica socket" : "pipe"); + dual_conn ? "direct socket to replica" : "pipe through parent process"); server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; if (dual_conn) { /* For dual connection sync, the main process no longer requires these RDB connections. */ zfree(conns); - } - else { + } else { close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); diff --git a/src/replication.c b/src/replication.c index 5eccf876ec..72cde818d0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -199,15 +199,14 @@ void rebaseReplicationBuffer(long long base_repl_offset) { } /* Replication: Primary side - connections association. - * On rdb-connection sync, connection association is used to keep replication data in - * the backlog until the replica requests PSYNC. Association happens in two forms, - * if there's an existing buffer block at the fork time, the replica is attached - * to the tail, if there is no tail, the replica will be attached when a new - * buffer block is created (see the Retrospect function below). - * Replica rdb client id is used as a unique key for the association. - * On COB overrun, association is deleted and the RDB connection - * is dropped. - */ + * During RDB connection sync, association is used to keep replication data + * in the backlog until the replica requests PSYNC. + * Association occurs in two forms: + * 1. If there's an existing buffer block at fork time, the replica is attached to the tail. + * 2. If there's no tail, the replica is attached when a new buffer block is created + * (see the Retrospect function below). + * The replica RDB client ID is used as a unique key for this association. + * If a COB overrun occurs, the association is deleted and the RDB connection is dropped. */ void addReplicaToPsyncWait(client *replica) { listNode *ln = NULL; replBufBlock *tail = NULL; @@ -220,8 +219,8 @@ void addReplicaToPsyncWait(client *replica) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetReplicaName(replica), - (unsigned long long)replica->id, tail ? "with repl-backlog tail" : "repl-backlog is empty"); + serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(replica), + (unsigned long long)replica->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); replica->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ replica->flags |= CLIENT_PROTECTED_RDB_CONN; @@ -244,7 +243,7 @@ void addReplicaToPsyncWaitRetrospect(void) { if (replica->ref_repl_buf_node) continue; replica->ref_repl_buf_node = ln; head->refcount++; - serverLog(LL_DEBUG, "Retrospect attach replica %llu to repl buf block", (long long unsigned int)replica->id); + serverLog(LL_DEBUG, "Attach rdb replica %llu to repl buf block", (long long unsigned int)replica->id); } raxStop(&iter); } @@ -2492,9 +2491,7 @@ sds getReplicaPortString(void) { /* Replication: Replica side. * Free replica's local replication buffer */ void freePendingReplDataBuf(void) { - if (server.pending_repl_data.blocks) { - listRelease(server.pending_repl_data.blocks); - } + listRelease(server.pending_repl_data.blocks); server.pending_repl_data.blocks = NULL; server.pending_repl_data.len = 0; } @@ -2504,15 +2501,13 @@ void freePendingReplDataBuf(void) { * provisional primary struct, and free local replication buffer. */ void abortRdbConnectionSync(void) { serverAssert(server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); - serverLog(LL_WARNING, "Aborting dual connection sync"); + serverLog(LL_NOTICE, "Aborting dual connection sync"); if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; } - if (server.repl_transfer_tmpfile) { - zfree(server.repl_transfer_tmpfile); - server.repl_transfer_tmpfile = NULL; - } + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; if (server.repl_transfer_fd != -1) { close(server.repl_transfer_fd); server.repl_transfer_fd = -1; @@ -2546,14 +2541,14 @@ int sendCurrentOffsetToReplica(client *replica) { /* Replication: Replica side. * This connection handler is used to initialize the RDB connection (dual-conn-sync-enabled sync). - * Once a replica with repl rdb-connection enabled, denied from PSYNC with its primary, - * fullSyncWithPrimary begins its role. The connection handler prepare server.repl_rdb_transfer_s + * Once a replica with repl rdb-connection is enabled and denied from PSYNC with its primary, + * fullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s * for a rdb stream, and server.repl_transfer_s for increamental replication data stream. */ void fullSyncWithPrimary(connection *conn) { char *err = NULL; serverAssert(conn == server.repl_rdb_transfer_s); /* If this event fired after the user turned the instance into a primary - * with SLAVEOF NO ONE we must just return ASAP. */ + * with REPLICAOF NO ONE we must just return ASAP. */ if (server.repl_state == REPL_STATE_NONE) { goto error; } @@ -2568,8 +2563,8 @@ void fullSyncWithPrimary(connection *conn) { serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); /* AUTH with the primary if required. */ if (server.primary_auth) { - char *args[3] = {"AUTH", NULL, NULL}; - size_t lens[3] = {4, 0, 0}; + char *args[] = {"AUTH", NULL, NULL}; + size_t lens[] = {4, 0, 0}; int argc = 1; if (server.primary_user) { args[argc] = server.primary_user; @@ -2611,7 +2606,6 @@ void fullSyncWithPrimary(connection *conn) { goto error; } sdsfree(err); - err = NULL; server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY; return; } @@ -2627,7 +2621,6 @@ void fullSyncWithPrimary(connection *conn) { goto error; } sdsfree(err); - err = NULL; if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); @@ -2726,9 +2719,9 @@ void replDataBufInit(void) { void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progress_callback) { time_t now = mstime(); if (server.loading_process_events_interval_bytes && - (offset + readlen) / server.loading_process_events_interval_bytes > - offset / server.loading_process_events_interval_bytes && - now - *last_progress_callback > server.loading_process_events_interval_milliseconds) { + ((offset + readlen) / server.loading_process_events_interval_bytes > + offset / server.loading_process_events_interval_bytes) && + (now - *last_progress_callback > server.loading_process_events_interval_milliseconds)) { replicationSendNewlineToPrimary(); processEventsWhileBlocked(); *last_progress_callback = now; @@ -2741,7 +2734,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { int nread = connRead(conn, o->buf + o->used, read); if (nread == -1) { if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_VERBOSE, "Error reading from primary: %s", connGetLastError(conn)); + serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn)); cancelReplicationHandshake(1); } return C_ERR; @@ -3193,7 +3186,7 @@ void setupMainConnForPsync(connection *conn) { if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { - serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", + serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a partial re-synchronization%s", server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : "."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { serverCommunicateSystemd("STATUS=Primary <-> REPLICA sync: Partial Resynchronization accepted. Ready to " @@ -3469,9 +3462,8 @@ void syncWithPrimary(connection *conn) { if (server.repl_state == REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY) { err = receiveSynchronousResponse(conn); - if (err == NULL) - goto error; - else if (err[0] == '-') { + if (err == NULL) goto error; + if (err[0] == '-') { serverLog(LL_NOTICE, "(Non critical) Primary is not capable of rdb-connection sync"); } sdsfree(err);