diff --git a/src/server.h b/src/server.h index 6e04f24e2c..f0c2709ec4 100644 --- a/src/server.h +++ b/src/server.h @@ -131,7 +131,8 @@ struct hdr_histogram; #define CONFIG_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */ #define CONFIG_DEFAULT_PID_FILE "/var/run/valkey.pid" #define CONFIG_DEFAULT_BINDADDR_COUNT 2 -#define CONFIG_DEFAULT_BINDADDR {"*", "-::*"} +#define CONFIG_DEFAULT_BINDADDR \ + { "*", "-::*" } #define NET_HOST_STR_LEN 256 /* Longest valid hostname */ #define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */ #define NET_ADDR_STR_LEN (NET_IP_STR_LEN + 32) /* Must be enough for ip:port */ @@ -139,8 +140,9 @@ struct hdr_histogram; #define CONFIG_BINDADDR_MAX 16 #define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}" -#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE 60 /* Grace period in seconds for replica main - connection to establish psync. */ +#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE \ + 60 /* Grace period in seconds for replica main \ + connection to establish psync. */ #define INCREMENTAL_REHASHING_THRESHOLD_US 1000 /* Bucket sizes for client eviction pools. Each bucket stores clients with @@ -431,22 +433,25 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */ #define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */ -#define CLIENT_REPL_MAIN_CONN (1ULL<<52) /* RDB connection: track a connection - which is used for online replication data */ -#define CLIENT_REPL_RDB_CONN (1ULL<<53) /* RDB connection: track a connection - which is used for rdb snapshot */ -#define CLIENT_PROTECTED_RDB_CONN (1ULL<<54) /* RDB connection: Protects the RDB client from premature - * release during full sync. This flag is used to ensure that the RDB client, which - * references the first replication data block required by the replica, is not - * released prematurely. Protecting the client is crucial for prevention of - * synchronization failures: - * If the RDB client is released before the replica initiates PSYNC, the primary - * will reduce the reference count (o->refcount) of the block needed by the replica. - * This could potentially lead to the removal of the required data block, resulting - * in synchronization failures. Such failures could occur even in scenarios where - * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. - * By using this flag, we ensure that the RDB client remains intact until the replica - * has successfully initiated PSYNC. */ +#define CLIENT_REPL_MAIN_CONN \ + (1ULL << 52) /* RDB connection: track a connection \ + which is used for online replication data */ +#define CLIENT_REPL_RDB_CONN \ + (1ULL << 53) /* RDB connection: track a connection \ + which is used for rdb snapshot */ +#define CLIENT_PROTECTED_RDB_CONN \ + (1ULL << 54) /* RDB connection: Protects the RDB client from premature \ + * release during full sync. This flag is used to ensure that the RDB client, which \ + * references the first replication data block required by the replica, is not \ + * released prematurely. Protecting the client is crucial for prevention of \ + * synchronization failures: \ + * If the RDB client is released before the replica initiates PSYNC, the primary \ + * will reduce the reference count (o->refcount) of the block needed by the replica. \ + * This could potentially lead to the removal of the required data block, resulting \ + * in synchronization failures. Such failures could occur even in scenarios where \ + * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. \ + * By using this flag, we ensure that the RDB client remains intact until the replica \ + * has successfully initiated PSYNC. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -486,30 +491,30 @@ typedef enum { REPL_STATE_CONNECT, /* Must connect to primary */ REPL_STATE_CONNECTING, /* Connecting to primary */ /* --- Handshake states, must be ordered --- */ - REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ - REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to primary */ - REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ - REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ + REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to primary */ + REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY, /* If using rdb-connection for sync, mark main connection as psync conn */ - REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_SEND_PSYNC, /* Send PSYNC */ - REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_SEND_PSYNC, /* Send PSYNC */ + REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from primary */ REPL_STATE_CONNECTED, /* Connected to primary */ } repl_state; -/* Replica rdb-connection replication state. Used in server.repl_rdb_conn_state for +/* Replica rdb-connection replication state. Used in server.repl_rdb_conn_state for * replicas to remember what to do next. */ typedef enum { - REPL_RDB_CONN_STATE_NONE = 0, /* No active replication */ - REPL_RDB_CONN_SEND_HANDSHAKE, /* Send handshake sequence to primary */ - REPL_RDB_CONN_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ - REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */ - REPL_RDB_CONN_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */ - REPL_RDB_CONN_RDB_LOAD, /* Loading rdb using rdb connection */ + REPL_RDB_CONN_STATE_NONE = 0, /* No active replication */ + REPL_RDB_CONN_SEND_HANDSHAKE, /* Send handshake sequence to primary */ + REPL_RDB_CONN_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */ + REPL_RDB_CONN_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */ + REPL_RDB_CONN_RDB_LOAD, /* Loading rdb using rdb connection */ REPL_RDB_CONN_RDB_LOADED, } repl_rdb_conn_state; @@ -530,21 +535,21 @@ typedef enum { #define REPLICA_STATE_SEND_BULK 8 /* Sending RDB file to replica. */ #define REPLICA_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ #define REPLICA_STATE_RDB_TRANSMITTED \ - 10 /* RDB file transmitted - This state is used only for \ - * a replica that only wants RDB without replication buffer */ + 10 /* RDB file transmitted - This state is used only for \ + * a replica that only wants RDB without replication buffer */ #define REPLICA_STATE_BG_RDB_LOAD 11 /* Main connection of a replica which uses rdb-connection-sync. */ /* Replica capabilities. */ #define REPLICA_CAPA_NONE 0 -#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ -#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ -#define REPLICA_CAPA_RDB_CONN (1<<2) /* Supports RDB connection sync */ +#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ +#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ +#define REPLICA_CAPA_RDB_CONN (1 << 2) /* Supports RDB connection sync */ /* Replica requirements */ #define REPLICA_REQ_NONE 0 #define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */ #define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */ -#define REPLICA_REQ_RDB_CONN (1 << 2) /* Use rdb-connection sync */ +#define REPLICA_REQ_RDB_CONN (1 << 2) /* Use rdb-connection sync */ /* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */ #define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS) @@ -788,7 +793,7 @@ typedef enum { /* Extract encver / signature from a module type ID. */ #define VALKEYMODULE_TYPE_ENCVER_BITS 10 #define VALKEYMODULE_TYPE_ENCVER_MASK ((1 << VALKEYMODULE_TYPE_ENCVER_BITS) - 1) -#define VALKEYMODULE_TYPE_ENCVER(id) ((id) & VALKEYMODULE_TYPE_ENCVER_MASK) +#define VALKEYMODULE_TYPE_ENCVER(id) ((id)&VALKEYMODULE_TYPE_ENCVER_MASK) #define VALKEYMODULE_TYPE_SIGN(id) \ (((id) & ~((uint64_t)VALKEYMODULE_TYPE_ENCVER_MASK)) >> VALKEYMODULE_TYPE_ENCVER_BITS) @@ -1037,7 +1042,7 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; -/* Link list block, used by replDataBuf during rdb-connection sync to store +/* Link list block, used by replDataBuf during rdb-connection sync to store * replication data */ typedef struct replDataBufBlock { size_t size, used; @@ -1310,8 +1315,8 @@ typedef struct client { int replica_version; /* Version on the form 0xMMmmpp. */ short replica_capa; /* Replica capabilities: REPLICA_CAPA_* bitwise OR. */ short replica_req; /* Replica requirements: REPLICA_REQ_* */ - uint64_t associated_rdb_client_id; /* The client id of this replica's rdb connection */ - time_t rdb_client_disconnect_time; /* Time of the first freeClient call on this client. Used for delaying free. */ + uint64_t associated_rdb_client_id; /* The client id of this replica's rdb connection */ + time_t rdb_client_disconnect_time; /* Time of the first freeClient call on this client. Used for delaying free. */ multiState mstate; /* MULTI/EXEC state */ blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. */ @@ -1517,7 +1522,7 @@ struct serverMemOverhead { size_t dbid; size_t overhead_ht_main; size_t overhead_ht_expires; - } *db; + } * db; }; /* Replication error behavior determines the replica behavior @@ -1547,7 +1552,8 @@ typedef struct rdbSaveInfo { long long repl_offset; /* Replication offset. */ } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1, 0, "0000000000000000000000000000000000000000", -1} +#define RDB_SAVE_INFO_INIT \ + { -1, 0, "0000000000000000000000000000000000000000", -1 } struct malloc_stats { size_t zmalloc_used; @@ -1701,10 +1707,10 @@ struct valkeyServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *replicas, *monitors; /* List of replicas and MONITORs */ - rax *replicas_waiting_psync;/* Radix tree using rdb-client id as keys and rdb-client as values. - * This rax contains replicas for the period from the beginning of - * their RDB connection to the end of their main connection's - * partial synchronization. */ + rax *replicas_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values. + * This rax contains replicas for the period from the beginning of + * their RDB connection to the end of their main connection's + * partial synchronization. */ client *current_client; /* The client that triggered the command execution (External or AOF). */ client *executing_client; /* The client executing the current command (possibly script or module). */ @@ -1805,12 +1811,13 @@ struct valkeyServer { long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to primary, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ - long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ - long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ - long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ - _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ - _Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ + long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ + long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ + long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ + _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ + _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ + _Atomic long long + stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ @@ -1860,11 +1867,12 @@ struct valkeyServer { int set_proc_title; /* True if change proc title */ char *proc_title_template; /* Process title template format */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; - int extended_redis_compat; /* True if extended Redis OSS compatibility is enabled */ - int pause_cron; /* Don't run cron tasks (debug) */ - int dict_resizing; /* Whether to allow main dict and expired dict to be resized (debug) */ - int latency_tracking_enabled; /* 1 if extended latency tracking is enabled, 0 otherwise. */ - double *latency_tracking_info_percentiles; /* Extended latency tracking info output percentile list configuration. */ + int extended_redis_compat; /* True if extended Redis OSS compatibility is enabled */ + int pause_cron; /* Don't run cron tasks (debug) */ + int dict_resizing; /* Whether to allow main dict and expired dict to be resized (debug) */ + int latency_tracking_enabled; /* 1 if extended latency tracking is enabled, 0 otherwise. */ + double + *latency_tracking_info_percentiles; /* Extended latency tracking info output percentile list configuration. */ int latency_tracking_info_percentiles_len; unsigned int max_new_tls_conns_per_cycle; /* The maximum number of tls connections that will be accepted during each invocation of the event loop. */ @@ -1928,8 +1936,8 @@ struct valkeyServer { int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* C_OK or C_ERR */ - int primary_supports_rdb_connection;/* Track whether the primary is able to sync using rdb connection. - * -1 = unknown, 0 = no, 1 = yes. */ + int primary_supports_rdb_connection; /* Track whether the primary is able to sync using rdb connection. + * -1 = unknown, 0 = no, 1 = yes. */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ /* to the parent process in diskless repl. */ @@ -1980,7 +1988,7 @@ struct valkeyServer { int repl_ping_replica_period; /* Primary pings the replica every N seconds */ replBacklog *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ - replDataBuf pending_repl_data; /* Replication data buffer for rdb-connection sync */ + replDataBuf pending_repl_data; /* Replication data buffer for rdb-connection sync */ time_t repl_backlog_time_limit; /* Time without replicas after the backlog gets released. */ time_t repl_no_replicas_since; /* We have no replicas since that time. @@ -1994,26 +2002,26 @@ struct valkeyServer { int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ int repl_diskless_sync_max_replicas; /* Max replicas for diskless repl BGSAVE * delay (start sooner if they all connect). */ - int rdb_conn_enabled; /* Config used to determine if the replica should - * use rdb connection for full syncs. */ - int wait_before_rdb_client_free;/* Grace period in seconds for replica main connection - * to establish psync. */ - int debug_sleep_after_fork_seconds; /* Debug param that force the main connection to - * sleep for N seconds after fork() in repl. */ + int rdb_conn_enabled; /* Config used to determine if the replica should + * use rdb connection for full syncs. */ + int wait_before_rdb_client_free; /* Grace period in seconds for replica main connection + * to establish psync. */ + int debug_sleep_after_fork_seconds; /* Debug param that force the main connection to + * sleep for N seconds after fork() in repl. */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list * (serving replica clients and repl backlog) */ /* Replication (replica) */ - char *primary_user; /* AUTH with this user and primary_auth with primary */ - sds primary_auth; /* AUTH with this password with primary */ - char *primary_host; /* Hostname of primary */ - int primary_port; /* Port of primary */ - int repl_timeout; /* Timeout after N seconds of primary idle */ - client *primary; /* Client that is primary for this replica */ - uint64_t rdb_client_id; /* Rdb client id as it defined at primary side */ + char *primary_user; /* AUTH with this user and primary_auth with primary */ + sds primary_auth; /* AUTH with this password with primary */ + char *primary_host; /* Hostname of primary */ + int primary_port; /* Port of primary */ + int repl_timeout; /* Timeout after N seconds of primary idle */ + client *primary; /* Client that is primary for this replica */ + uint64_t rdb_client_id; /* Rdb client id as it defined at primary side */ struct { - connection* conn; - char replid[CONFIG_RUN_ID_SIZE+1]; + connection *conn; + char replid[CONFIG_RUN_ID_SIZE + 1]; long long reploff; long long read_reploff; int dbid; @@ -2021,7 +2029,7 @@ struct valkeyServer { client *cached_primary; /* Cached primary to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a replica */ - int repl_rdb_conn_state; /* State of the replica's rdb connection during rdb-connection sync */ + int repl_rdb_conn_state; /* State of the replica's rdb connection during rdb-connection sync */ off_t repl_transfer_size; /* Size of RDB to read from primary during sync. */ off_t repl_transfer_read; /* Amount of RDB read from primary during sync. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ @@ -2996,8 +3004,8 @@ void updateFailoverStatus(void); void abortFailover(const char *err); const char *getFailoverStateString(void); void abortRdbConnectionSync(void); -int sendCurrentOffsetToReplica(client* replica); -void addReplicaToPsyncWait(client* replica); +int sendCurrentOffsetToReplica(client *replica); +void addReplicaToPsyncWait(client *replica); /* Generic persistence functions */ void startLoadingFile(size_t size, char *filename, int rdbflags); @@ -3941,7 +3949,7 @@ sds getVersion(void); * should be ignored due to low level. */ #define serverLog(level, ...) \ do { \ - if (((level) & 0xff) < server.verbosity) break; \ + if (((level)&0xff) < server.verbosity) break; \ _serverLog(level, __VA_ARGS__); \ } while (0)