Skip to content

Commit

Permalink
clang format
Browse files Browse the repository at this point in the history
Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Jul 10, 2024
1 parent c02a509 commit e8200bc
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 62 deletions.
15 changes: 7 additions & 8 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ int authRequired(client *c) {
}

static inline int isReplicaReadyForReplData(client *replica) {
return (replica->repl_state == REPLICA_STATE_ONLINE ||
replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !(replica->flag.close_asap);
return (replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) &&
!(replica->flag.close_asap);
}

client *createClient(connection *conn) {
Expand Down Expand Up @@ -262,8 +262,8 @@ void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for replicas, if the replica can actually receive
* writes at this stage. */
if (!c->flag.pending_write && (c->repl_state == REPL_STATE_NONE ||
(isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) {
if (!c->flag.pending_write &&
(c->repl_state == REPL_STATE_NONE || (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) {
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
Expand Down Expand Up @@ -1646,7 +1646,7 @@ void freeClient(client *c) {
if (getClientType(c) == CLIENT_TYPE_REPLICA) {
serverLog(LL_NOTICE,
c->flag.repl_rdb_conn ? "Replica %s rdb connection disconnected."
: "Connection with replica %s lost.",
: "Connection with replica %s lost.",
replicationGetReplicaName(c));
}

Expand Down Expand Up @@ -4366,9 +4366,8 @@ void flushReplicasOutputBuffers(void) {
*
* 3. Obviously if the replica is not ONLINE.
*/
if (isReplicaReadyForReplData(replica) &&
!(replica->flag.close_asap) && can_receive_writes && !replica->repl_start_cmd_stream_on_ack &&
clientHasPendingReplies(replica)) {
if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes &&
!replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) {
writeToClient(replica);
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3529,7 +3529,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
rdb_pipe_write = pipefds[1]; /* write end */

/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
Expand Down Expand Up @@ -3652,7 +3652,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
zfree(conns);
} else {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) ==
AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ static inline client *lookupRdbClientByID(uint64_t id) {

/* Replication: Primary side - connections association.
* During RDB connection sync, association is used to keep replication data
* in the backlog until the replica requests PSYNC.
* in the backlog until the replica requests PSYNC.
* Association occurs in two forms:
* 1. If there's an existing buffer block at fork time, the replica is attached to the tail.
* 2. If there's no tail, the replica is attached when a new buffer block is created
* 2. If there's no tail, the replica is attached when a new buffer block is created
* (see the Retrospect function below).
* The replica RDB client ID is used as a unique key for this association.
* If a COB overrun occurs, the association is deleted and the RDB connection is dropped. */
Expand All @@ -228,8 +228,9 @@ void addRdbReplicaToPsyncWait(client *rdb_replica) {
tail->refcount++;
}
}
serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(rdb_replica),
(unsigned long long)rdb_replica->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track");
serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ",
replicationGetReplicaName(rdb_replica), (unsigned long long)rdb_replica->id,
tail ? "tracking repl-backlog tail" : "no repl-backlog to track");
rdb_replica->ref_repl_buf_node = tail ? ln : NULL;
/* Prevent rdb client from being freed before psync is established. */
rdb_replica->flag.protected_rdb_conn = 1;
Expand Down Expand Up @@ -2730,7 +2731,7 @@ void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progres
time_t now = mstime();
if (server.loading_process_events_interval_bytes &&
((offset + readlen) / server.loading_process_events_interval_bytes >
offset / server.loading_process_events_interval_bytes) &&
offset / server.loading_process_events_interval_bytes) &&
(now - *last_progress_callback > server.loading_process_events_interval_ms)) {
replicationSendNewlineToPrimary();
processEventsWhileBlocked();
Expand Down
95 changes: 48 additions & 47 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1214,20 +1214,21 @@ typedef struct ClientFlags {
uint64_t reprocessing_command : 1; /* The client is re-processing the command. */
uint64_t replication_done : 1; /* Indicate that replication has been done on the client */
uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */
uint64_t protected_rdb_conn : 1; /* Dual connection sync: Protects the RDB client from premature \
* release during full sync. This flag is used to ensure that the RDB client, which \
* references the first replication data block required by the replica, is not \
* released prematurely. Protecting the client is crucial for prevention of \
* synchronization failures: \
* If the RDB client is released before the replica initiates PSYNC, the primary \
* will reduce the reference count (o->refcount) of the block needed by the replica. \
* This could potentially lead to the removal of the required data block, resulting \
* in synchronization failures. Such failures could occur even in scenarios where \
* the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. \
* By using this flag, we ensure that the RDB client remains intact until the replica \
* has successfully initiated PSYNC. */
uint64_t repl_rdb_conn : 1; /* Dual connection sync: track a connection which is used for rdb snapshot */
uint64_t reserved : 7; /* Reserved for future use */
uint64_t
protected_rdb_conn : 1; /* Dual connection sync: Protects the RDB client from premature \
* release during full sync. This flag is used to ensure that the RDB client, which \
* references the first replication data block required by the replica, is not \
* released prematurely. Protecting the client is crucial for prevention of \
* synchronization failures: \
* If the RDB client is released before the replica initiates PSYNC, the primary \
* will reduce the reference count (o->refcount) of the block needed by the replica. \
* This could potentially lead to the removal of the required data block, resulting \
* in synchronization failures. Such failures could occur even in scenarios where \
* the replica only needs an additional 4KB beyond the minimum size of the repl_backlog.
* By using this flag, we ensure that the RDB client remains intact until the replica \
* has successfully initiated PSYNC. */
uint64_t repl_rdb_conn : 1; /* Dual connection sync: track a connection which is used for rdb snapshot */
uint64_t reserved : 7; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -1917,39 +1918,39 @@ struct valkeyServer {
default no. (for testings). */

/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */
int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */
/* to the parent process in diskless repl. */
int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */
connection **rdb_pipe_conns; /* Connections which are currently the */
int rdb_pipe_numconns; /* target of diskless rdb fork child. */
int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */
char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */
int rdb_pipe_bufflen; /* that was read from the rdb pipe. */
int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing aof or rdb. (for testings). negative
* value means fractions of microseconds (on average). */
int key_load_delay; /* Delay in microseconds between keys while
* loading aof or rdb. (for testings). negative
* value means fractions of microseconds (on average). */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */
int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */
/* to the parent process in diskless repl. */
int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */
connection **rdb_pipe_conns; /* Connections which are currently the */
int rdb_pipe_numconns; /* target of diskless rdb fork child. */
int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */
char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */
int rdb_pipe_bufflen; /* that was read from the rdb pipe. */
int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing aof or rdb. (for testings). negative
* value means fractions of microseconds (on average). */
int key_load_delay; /* Delay in microseconds between keys while
* loading aof or rdb. (for testings). negative
* value means fractions of microseconds (on average). */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
int child_info_nread; /* Num of bytes of the last read from pipe */
Expand Down

0 comments on commit e8200bc

Please sign in to comment.