diff --git a/src/networking.c b/src/networking.c index dbf80f5b91..5f4a5e640c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -120,8 +120,8 @@ int authRequired(client *c) { } static inline int isReplicaReadyForReplData(client *replica) { - return (replica->repl_state == REPLICA_STATE_ONLINE || - replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !(replica->flag.close_asap); + return (replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && + !(replica->flag.close_asap); } client *createClient(connection *conn) { @@ -262,8 +262,8 @@ void putClientInPendingWriteQueue(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for replicas, if the replica can actually receive * writes at this stage. */ - if (!c->flag.pending_write && (c->repl_state == REPL_STATE_NONE || - (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) { + if (!c->flag.pending_write && + (c->repl_state == REPL_STATE_NONE || (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. This way before re-entering the event @@ -1646,7 +1646,7 @@ void freeClient(client *c) { if (getClientType(c) == CLIENT_TYPE_REPLICA) { serverLog(LL_NOTICE, c->flag.repl_rdb_conn ? "Replica %s rdb connection disconnected." - : "Connection with replica %s lost.", + : "Connection with replica %s lost.", replicationGetReplicaName(c)); } @@ -4366,9 +4366,8 @@ void flushReplicasOutputBuffers(void) { * * 3. Obviously if the replica is not ONLINE. */ - if (isReplicaReadyForReplData(replica) && - !(replica->flag.close_asap) && can_receive_writes && !replica->repl_start_cmd_stream_on_ack && - clientHasPendingReplies(replica)) { + if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes && + !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) { writeToClient(replica); } } diff --git a/src/rdb.c b/src/rdb.c index 66a0bbbba6..63f471f86c 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3529,7 +3529,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { rdb_pipe_write = pipefds[1]; /* write end */ /* create another pipe that is used by the parent to signal to the child - * that it can exit. */ + * that it can exit. */ if (anetPipe(pipefds, 0, 0) == -1) { close(rdb_pipe_write); close(server.rdb_pipe_read); @@ -3652,7 +3652,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { zfree(conns); } else { close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { + if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == + AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } } diff --git a/src/replication.c b/src/replication.c index a949b57fb6..4deafdd54d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -209,10 +209,10 @@ static inline client *lookupRdbClientByID(uint64_t id) { /* Replication: Primary side - connections association. * During RDB connection sync, association is used to keep replication data - * in the backlog until the replica requests PSYNC. + * in the backlog until the replica requests PSYNC. * Association occurs in two forms: * 1. If there's an existing buffer block at fork time, the replica is attached to the tail. - * 2. If there's no tail, the replica is attached when a new buffer block is created + * 2. If there's no tail, the replica is attached when a new buffer block is created * (see the Retrospect function below). * The replica RDB client ID is used as a unique key for this association. * If a COB overrun occurs, the association is deleted and the RDB connection is dropped. */ @@ -228,8 +228,9 @@ void addRdbReplicaToPsyncWait(client *rdb_replica) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(rdb_replica), - (unsigned long long)rdb_replica->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); + serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", + replicationGetReplicaName(rdb_replica), (unsigned long long)rdb_replica->id, + tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); rdb_replica->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ rdb_replica->flag.protected_rdb_conn = 1; @@ -2730,7 +2731,7 @@ void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progres time_t now = mstime(); if (server.loading_process_events_interval_bytes && ((offset + readlen) / server.loading_process_events_interval_bytes > - offset / server.loading_process_events_interval_bytes) && + offset / server.loading_process_events_interval_bytes) && (now - *last_progress_callback > server.loading_process_events_interval_ms)) { replicationSendNewlineToPrimary(); processEventsWhileBlocked(); diff --git a/src/server.h b/src/server.h index 98aa91e943..b36d838abc 100644 --- a/src/server.h +++ b/src/server.h @@ -1214,20 +1214,21 @@ typedef struct ClientFlags { uint64_t reprocessing_command : 1; /* The client is re-processing the command. */ uint64_t replication_done : 1; /* Indicate that replication has been done on the client */ uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */ - uint64_t protected_rdb_conn : 1; /* Dual connection sync: Protects the RDB client from premature \ - * release during full sync. This flag is used to ensure that the RDB client, which \ - * references the first replication data block required by the replica, is not \ - * released prematurely. Protecting the client is crucial for prevention of \ - * synchronization failures: \ - * If the RDB client is released before the replica initiates PSYNC, the primary \ - * will reduce the reference count (o->refcount) of the block needed by the replica. \ - * This could potentially lead to the removal of the required data block, resulting \ - * in synchronization failures. Such failures could occur even in scenarios where \ - * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. \ - * By using this flag, we ensure that the RDB client remains intact until the replica \ - * has successfully initiated PSYNC. */ - uint64_t repl_rdb_conn : 1; /* Dual connection sync: track a connection which is used for rdb snapshot */ - uint64_t reserved : 7; /* Reserved for future use */ + uint64_t + protected_rdb_conn : 1; /* Dual connection sync: Protects the RDB client from premature \ + * release during full sync. This flag is used to ensure that the RDB client, which \ + * references the first replication data block required by the replica, is not \ + * released prematurely. Protecting the client is crucial for prevention of \ + * synchronization failures: \ + * If the RDB client is released before the replica initiates PSYNC, the primary \ + * will reduce the reference count (o->refcount) of the block needed by the replica. \ + * This could potentially lead to the removal of the required data block, resulting \ + * in synchronization failures. Such failures could occur even in scenarios where \ + * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. + * By using this flag, we ensure that the RDB client remains intact until the replica \ + * has successfully initiated PSYNC. */ + uint64_t repl_rdb_conn : 1; /* Dual connection sync: track a connection which is used for rdb snapshot */ + uint64_t reserved : 7; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -1917,39 +1918,39 @@ struct valkeyServer { default no. (for testings). */ /* RDB persistence */ - long long dirty; /* Changes to DB from the last save */ - long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ - long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ - long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ - struct saveparam *saveparams; /* Save points array for RDB */ - int saveparamslen; /* Number of saving points */ - char *rdb_filename; /* Name of RDB file */ - int rdb_compression; /* Use compression in RDB? */ - int rdb_checksum; /* Use RDB checksum? */ - int rdb_del_sync_files; /* Remove RDB files used only for SYNC if - the instance does not use persistence. */ - time_t lastsave; /* Unix time of last successful save */ - time_t lastbgsave_try; /* Unix time of last attempted bgsave */ - time_t rdb_save_time_last; /* Time used by last RDB save run. */ - time_t rdb_save_time_start; /* Current RDB save start time. */ - int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ - int rdb_child_type; /* Type of save by active child. */ - int lastbgsave_status; /* C_OK or C_ERR */ - int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ - int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ - /* to the parent process in diskless repl. */ - int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */ - connection **rdb_pipe_conns; /* Connections which are currently the */ - int rdb_pipe_numconns; /* target of diskless rdb fork child. */ - int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */ - char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ - int rdb_pipe_bufflen; /* that was read from the rdb pipe. */ - int rdb_key_save_delay; /* Delay in microseconds between keys while - * writing aof or rdb. (for testings). negative - * value means fractions of microseconds (on average). */ - int key_load_delay; /* Delay in microseconds between keys while - * loading aof or rdb. (for testings). negative - * value means fractions of microseconds (on average). */ + long long dirty; /* Changes to DB from the last save */ + long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ + long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ + long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ + struct saveparam *saveparams; /* Save points array for RDB */ + int saveparamslen; /* Number of saving points */ + char *rdb_filename; /* Name of RDB file */ + int rdb_compression; /* Use compression in RDB? */ + int rdb_checksum; /* Use RDB checksum? */ + int rdb_del_sync_files; /* Remove RDB files used only for SYNC if + the instance does not use persistence. */ + time_t lastsave; /* Unix time of last successful save */ + time_t lastbgsave_try; /* Unix time of last attempted bgsave */ + time_t rdb_save_time_last; /* Time used by last RDB save run. */ + time_t rdb_save_time_start; /* Current RDB save start time. */ + int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ + int rdb_child_type; /* Type of save by active child. */ + int lastbgsave_status; /* C_OK or C_ERR */ + int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ + int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ + /* to the parent process in diskless repl. */ + int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */ + connection **rdb_pipe_conns; /* Connections which are currently the */ + int rdb_pipe_numconns; /* target of diskless rdb fork child. */ + int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */ + char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ + int rdb_pipe_bufflen; /* that was read from the rdb pipe. */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing aof or rdb. (for testings). negative + * value means fractions of microseconds (on average). */ + int key_load_delay; /* Delay in microseconds between keys while + * loading aof or rdb. (for testings). negative + * value means fractions of microseconds (on average). */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ int child_info_nread; /* Num of bytes of the last read from pipe */