diff --git a/src/replication.c b/src/replication.c index 864df51791..e0beb1b1bd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3242,50 +3242,50 @@ void setupMainConnForPsync(connection *conn) { * - Once the replica completes loading the rdb, it drops the rdb channel and streams the accumulated incremental * changes into memory. Repl steady state continues normally. * - * * Replica state machine * - * ┌───────────────────┐ Dual channel sync - * │RECEIVE_PING_REPLY │ ┌──────────────────────────────────────────────────────────────┐ - * └────────┬──────────┘ │ RDB channel states Main channel state │ - * │+PONG │ ┌────────────────────────────┐ ┌───────────────────┐ │ - * ┌────────▼──────────┐ ┌─┼─────►DUAL_CHANNEL_SEND_HANDSHAKE │ ┌─►SEND_HANDSHAKE │ │ - * │SEND_HANDSHAKE │ │ │ └────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │ │ │REPLCONF set-rdb-client-id - * │ │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ - * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_AUTH_REPLY│ │ │RECEIVE_CAPA_REPLY │ │ - * │RECEIVE_AUTH_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │+OK │ │+OK │ - * │+OK │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ - * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_REPLCONF_.│ │ │SEND_PSYNC │ │ - * │RECEIVE_PORT_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │+OK │ │PSYNC use snapshot │ - * │+OK │ │ ┌───────▼────────────────┐ │ │end-offset provided │ - * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_EN.│ │ │by the primary │ - * │RECEIVE_IP_REPLY │ │ │ └───────┬────────────────┘ │ ┌──▼────────────────┐ │ - * └────────┬──────────┘ │ │ │$ENDOFF │ │RECEIVE_PSYNC_REPLY│ │ - * │+OK │ │ ├─────────────────────────┘ └──┬────────────────┘ │ - * ┌────────▼──────────┐ │ │ │ │+CONTINUE │ - * │RECEIVE_IP_REPLY │ │ │ ┌───────▼───────────────┐ ┌──▼────────────────┐ │ - * └────────┬──────────┘ │ │ │DUAL_CHANNEL_RDB_LOAD │ │TRANSFER │ │ - * │+OK │ │ └───────┬───────────────┘ └─────┬─────────────┘ │ - * ┌────────▼──────────┐ │ │ │Done loading │ │ - * │RECEIVE_CAPA_REPLY │ │ │ ┌───────▼───────────────┐ │ │ - * └────────┬──────────┘ │ │ │DUAL_CHANNEL_RDB_LOADE.│ │ │ - * │ │ │ └───────┬───────────────┘ │ │ - * ┌────────▼───┐ │ │ │ │ │ - * │SEND_PSYNC │ │ │ │Replica loads local replication │ │ - * └─┬──────────┘ │ │ │buffer into memory │ │ - * │PSYNC (use cached-primary)│ │ └─────────┬───────────────────────┘ │ - * ┌─▼─────────────────┐ │ │ │ │ - * │RECEIVE_PSYNC_REPLY│ │ └────────────────────┼─────────────────────────────────────────┘ - * └────────┬─┬────────┘ │ │ - * +CONTINUE│ │+DUALCHANNELSYNC │ │ - * │ │ └─────────────────┘ │ - * │ │+FULLRESYNC │ - * │ ┌─▼─────────────────┐ ┌────▼──────────────┐ - * │ │TRANSFER ├───────────────────►CONNECTED │ - * │ └───────────────────┘ └────▲──────────────┘ - * │ │ - * └─────────────────────────────────────────────────┘ + * * Replica state machine * + * ┌───────────────────┐ Dual channel sync + * │RECEIVE_PING_REPLY │ ┌──────────────────────────────────────────────────────────────┐ + * └────────┬──────────┘ │ RDB channel states Main channel state │ + * │+PONG │ ┌────────────────────────────┐ ┌───────────────────┐ │ + * ┌────────▼──────────┐ ┌─┼─────►DUAL_CHANNEL_SEND_HANDSHAKE │ ┌─►SEND_HANDSHAKE │ │ + * │SEND_HANDSHAKE │ │ │ └────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │ │ │REPLCONF set-rdb-client-id + * │ │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_AUTH_REPLY│ │ │RECEIVE_CAPA_REPLY │ │ + * │RECEIVE_AUTH_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │+OK │ │+OK │ + * │+OK │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_REPLCONF_.│ │ │SEND_PSYNC │ │ + * │RECEIVE_PORT_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │+OK │ │PSYNC use snapshot │ + * │+OK │ │ ┌───────▼────────────────┐ │ │end-offset provided │ + * ┌────────▼──────────┐ │ │ │DUAL_CHANNEL_RECEIVE_EN.│ │ │by the primary │ + * │RECEIVE_IP_REPLY │ │ │ └───────┬────────────────┘ │ ┌──▼────────────────┐ │ + * └────────┬──────────┘ │ │ │$ENDOFF │ │RECEIVE_PSYNC_REPLY│ │ + * │+OK │ │ ├─────────────────────────┘ └──┬────────────────┘ │ + * ┌────────▼──────────┐ │ │ │ │+CONTINUE │ + * │RECEIVE_IP_REPLY │ │ │ ┌───────▼───────────────┐ ┌──▼────────────────┐ │ + * └────────┬──────────┘ │ │ │DUAL_CHANNEL_RDB_LOAD │ │TRANSFER │ │ + * │+OK │ │ └───────┬───────────────┘ └─────┬─────────────┘ │ + * ┌────────▼──────────┐ │ │ │Done loading │ │ + * │RECEIVE_CAPA_REPLY │ │ │ ┌───────▼───────────────┐ │ │ + * └────────┬──────────┘ │ │ │DUAL_CHANNEL_RDB_LOADE.│ │ │ + * │ │ │ └───────┬───────────────┘ │ │ + * ┌────────▼───┐ │ │ │ │ │ + * │SEND_PSYNC │ │ │ │Replica loads local replication │ │ + * └─┬──────────┘ │ │ │buffer into memory │ │ + * │PSYNC (use cached-primary)│ │ └─────────┬───────────────────────┘ │ + * ┌─▼─────────────────┐ │ │ │ │ + * │RECEIVE_PSYNC_REPLY│ │ └────────────────────┼─────────────────────────────────────────┘ + * └────────┬─┬────────┘ │ │ + * +CONTINUE│ │+DUALCHANNELSYNC │ │ + * │ │ └─────────────────┘ │ + * │ │+FULLRESYNC │ + * │ ┌─▼─────────────────┐ ┌────▼──────────────┐ + * │ │TRANSFER ├───────────────────►CONNECTED │ + * │ └───────────────────┘ └────▲──────────────┘ + * │ │ + * └─────────────────────────────────────────────────┘ */ /* This handler fires when the non blocking connect was able to * establish a connection with the primary. */ @@ -3389,7 +3389,9 @@ void syncWithPrimary(connection *conn) { * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * * The primary will ignore capabilities it does not understand. */ - err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", server.dual_channel_replication ? "capa" : NULL, server.dual_channel_replication ? "dual-channel" : NULL, NULL); + err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); if (err) goto write_error; /* Inform the primary of our (replica) version. */ diff --git a/src/server.h b/src/server.h index 058ca43b54..182d492d7c 100644 --- a/src/server.h +++ b/src/server.h @@ -131,7 +131,8 @@ struct hdr_histogram; #define CONFIG_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */ #define CONFIG_DEFAULT_PID_FILE "/var/run/valkey.pid" #define CONFIG_DEFAULT_BINDADDR_COUNT 2 -#define CONFIG_DEFAULT_BINDADDR {"*", "-::*"} +#define CONFIG_DEFAULT_BINDADDR \ + { "*", "-::*" } #define NET_HOST_STR_LEN 256 /* Longest valid hostname */ #define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */ #define NET_ADDR_STR_LEN (NET_IP_STR_LEN + 32) /* Must be enough for ip:port */ @@ -388,15 +389,15 @@ typedef enum { REPL_STATE_CONNECT, /* Must connect to primary */ REPL_STATE_CONNECTING, /* Connecting to primary */ /* --- Handshake states, must be ordered --- */ - REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ - REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to primary */ - REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ - REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_SEND_PSYNC, /* Send PSYNC */ - REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ + REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to primary */ + REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_SEND_PSYNC, /* Send PSYNC */ + REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from primary */ REPL_STATE_CONNECTED, /* Connected to primary */ @@ -689,7 +690,7 @@ typedef enum { /* Extract encver / signature from a module type ID. */ #define VALKEYMODULE_TYPE_ENCVER_BITS 10 #define VALKEYMODULE_TYPE_ENCVER_MASK ((1 << VALKEYMODULE_TYPE_ENCVER_BITS) - 1) -#define VALKEYMODULE_TYPE_ENCVER(id) ((id) & VALKEYMODULE_TYPE_ENCVER_MASK) +#define VALKEYMODULE_TYPE_ENCVER(id) ((id)&VALKEYMODULE_TYPE_ENCVER_MASK) #define VALKEYMODULE_TYPE_SIGN(id) \ (((id) & ~((uint64_t)VALKEYMODULE_TYPE_ENCVER_MASK)) >> VALKEYMODULE_TYPE_ENCVER_BITS) @@ -1220,8 +1221,9 @@ typedef struct ClientFlags { * repl_backlog. * By using this flag, we ensure that the RDB client remains intact until the replica * \ has successfully initiated PSYNC. */ - uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ - uint64_t reserved : 7; /* Reserved for future use */ + uint64_t + repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ + uint64_t reserved : 7; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -1506,7 +1508,7 @@ struct serverMemOverhead { size_t dbid; size_t overhead_ht_main; size_t overhead_ht_expires; - } *db; + } * db; }; /* Replication error behavior determines the replica behavior @@ -1536,7 +1538,8 @@ typedef struct rdbSaveInfo { long long repl_offset; /* Replication offset. */ } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1, 0, "0000000000000000000000000000000000000000", -1} +#define RDB_SAVE_INFO_INIT \ + { -1, 0, "0000000000000000000000000000000000000000", -1 } struct malloc_stats { size_t zmalloc_used; @@ -1860,11 +1863,12 @@ struct valkeyServer { int set_proc_title; /* True if change proc title */ char *proc_title_template; /* Process title template format */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; - int extended_redis_compat; /* True if extended Redis OSS compatibility is enabled */ - int pause_cron; /* Don't run cron tasks (debug) */ - int dict_resizing; /* Whether to allow main dict and expired dict to be resized (debug) */ - int latency_tracking_enabled; /* 1 if extended latency tracking is enabled, 0 otherwise. */ - double *latency_tracking_info_percentiles; /* Extended latency tracking info output percentile list configuration. */ + int extended_redis_compat; /* True if extended Redis OSS compatibility is enabled */ + int pause_cron; /* Don't run cron tasks (debug) */ + int dict_resizing; /* Whether to allow main dict and expired dict to be resized (debug) */ + int latency_tracking_enabled; /* 1 if extended latency tracking is enabled, 0 otherwise. */ + double + *latency_tracking_info_percentiles; /* Extended latency tracking info output percentile list configuration. */ int latency_tracking_info_percentiles_len; unsigned int max_new_tls_conns_per_cycle; /* The maximum number of tls connections that will be accepted during each invocation of the event loop. */ @@ -3966,7 +3970,7 @@ sds getVersion(void); * should be ignored due to low level. */ #define serverLog(level, ...) \ do { \ - if (((level) & 0xff) < server.verbosity) break; \ + if (((level)&0xff) < server.verbosity) break; \ _serverLog(level, __VA_ARGS__); \ } while (0)