From a81c32079cfd63ae022a3d1a330904f8d9fd1858 Mon Sep 17 00:00:00 2001 From: Sankar <1890648+srgsanky@users.noreply.github.com> Date: Sun, 16 Jun 2024 20:37:09 -0700 Subject: [PATCH 01/53] Make cluster meet reliable under link failures (#461) When there is a link failure while an ongoing MEET request is sent, the sending node stops sending any more MEETs and starts sending PINGs. Since every node responds to PINGs from unknown nodes with a PONG, the receiving node never adds the sending node. But the sending node adds the receiving node when it sees a PONG. This can lead to asymmetry in cluster membership. This change makes the sender keep sending MEET until it sees a PONG, avoiding the asymmetry. --------- Signed-off-by: Sankar <1890648+srgsanky@users.noreply.github.com> --- src/cluster_legacy.c | 35 ++++++-- src/debug.c | 6 ++ src/server.h | 2 + tests/unit/cluster/cluster-multiple-meets.tcl | 83 +++++++++++++++++++ tests/unit/cluster/cluster-reliable-meet.tcl | 71 ++++++++++++++++ 5 files changed, 190 insertions(+), 7 deletions(-) create mode 100644 tests/unit/cluster/cluster-multiple-meets.tcl create mode 100644 tests/unit/cluster/cluster-reliable-meet.tcl diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index f566cf5a35..9104a76d87 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2844,7 +2844,16 @@ int clusterIsValidPacket(clusterLink *link) { * received from the wrong sender ID).
*/ int clusterProcessPacket(clusterLink *link) { /* Validate that the packet is well-formed */ - if (!clusterIsValidPacket(link)) return 1; + if (!clusterIsValidPacket(link)) { + clusterMsg *hdr = (clusterMsg *)link->rcvbuf; + uint16_t type = ntohs(hdr->type); + if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) { + freeClusterLink(link); + serverLog(LL_WARNING, "Closing link for matching packet type %hu", type); + return 0; + } + return 1; + } clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); @@ -2942,6 +2951,13 @@ int clusterProcessPacket(clusterLink *link) { if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type), link->node ? link->node->name : "NULL"); + + if (sender && (sender->flags & CLUSTER_NODE_MEET)) { + /* Once we get a response for MEET from the sender, we can stop sending more MEET. */ + sender->flags &= ~CLUSTER_NODE_MEET; + serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name, + sender->human_nodename); + } if (!link->inbound) { if (nodeInHandshake(link->node)) { /* If we already have this node, try to change the @@ -3376,12 +3392,17 @@ void clusterLinkConnectHandler(connection *conn) { * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; } - /* We can clear the flag after the first packet is sent. - * If we'll never receive a PONG, we'll never send new packets - * to this node. Instead after the PONG is received and we - * are no longer in meet/handshake status, we want to send - * normal PING packets. */ - node->flags &= ~CLUSTER_NODE_MEET; + /* NOTE: Assume the current node is A and is asked to MEET another node B. + * Once A sends MEET to B, it cannot clear the MEET flag for B until it + * gets a response from B. 
If the MEET packet is not accepted by B due to + * link failure, A must continue sending MEET. If A doesn't continue sending + * MEET, A will know about B, but B will never add A. Every node always + * responds to PINGs from unknown nodes with a PONG, so A will know about B + * and continue sending PINGs. But B won't add A until it sees a MEET (or it + * gets to know about A from a trusted third node C). In this case, clearing + * the MEET flag here leads to asymmetry in the cluster membership. So, we + * clear the MEET flag in clusterProcessPacket. + */ serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); } diff --git a/src/debug.c b/src/debug.c index 6394e3f0f4..d9fe93c7d4 100644 --- a/src/debug.c +++ b/src/debug.c @@ -429,6 +429,9 @@ void debugCommand(client *c) { " Show low level info about `key` and associated value.", "DROP-CLUSTER-PACKET-FILTER ", " Drop all packets that match the filtered type. Set to -1 allow all packets.", + "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>", + " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type." + " When set to 1, the cluster link is closed after dropping a packet based on the filter." 
"OOM", " Crash the server simulating an out-of-memory error.", "PANIC", @@ -593,6 +596,9 @@ void debugCommand(client *c) { if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK) return; server.cluster_drop_packet_filter = packet_type; addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) { + server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr); + addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "object") && c->argc == 3) { dictEntry *de; robj *val; diff --git a/src/server.h b/src/server.h index ae2d23b99f..c4ce6f655e 100644 --- a/src/server.h +++ b/src/server.h @@ -2069,6 +2069,8 @@ struct valkeyServer { unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */ int cluster_drop_packet_filter; /* Debug config that allows tactically * dropping packets of a specific type */ + /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */ + uint32_t debug_cluster_close_link_on_packet_drop : 1; sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Scripting */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ diff --git a/tests/unit/cluster/cluster-multiple-meets.tcl b/tests/unit/cluster/cluster-multiple-meets.tcl new file mode 100644 index 0000000000..07a2582133 --- /dev/null +++ b/tests/unit/cluster/cluster-multiple-meets.tcl @@ -0,0 +1,83 @@ +# make sure the test infra won't use SELECT +set old_singledb $::singledb +set ::singledb 1 + +tags {tls:skip external:skip cluster} { + set base_conf [list cluster-enabled yes] + start_multiple_servers 2 [list overrides $base_conf] { + test "Cluster nodes are reachable" { + for {set id 0} {$id < [llength $::servers]} {incr id} { + # Every node should be reachable. 
+ wait_for_condition 1000 50 { + ([catch {R $id ping} ping_reply] == 0) && + ($ping_reply eq {PONG}) + } else { + catch {R $id ping} err + fail "Node #$id keeps replying '$err' to PING." + } + } + } + + test "Before slots allocation, all nodes report cluster failure" { + wait_for_cluster_state fail + } + + set CLUSTER_PACKET_TYPE_PONG 1 + set CLUSTER_PACKET_TYPE_NONE -1 + + test "Cluster nodes haven't met each other" { + assert {[llength [get_cluster_nodes 1]] == 1} + assert {[llength [get_cluster_nodes 0]] == 1} + } + + test "Allocate slots" { + cluster_allocate_slots 2 0;# primaries replicas + } + + test "Multiple MEETs from Node 1 to Node 0 should work" { + # Make 1 drop the PONG responses to MEET + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG + # It is important to close the connection on drop, otherwise a subsequent MEET won't be sent + R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1 + + R 1 CLUSTER MEET 127.0.0.1 [srv 0 port] + + # Wait for at least a few MEETs to be sent so that we are sure that 1 is dropping the response to MEET. 
+ wait_for_condition 1000 50 { + [CI 0 cluster_stats_messages_meet_received] > 1 && + [CI 1 cluster_state] eq {fail} && [CI 0 cluster_state] eq {ok} + } else { + fail "Cluster node 1 never sent multiple MEETs to 0" + } + + # 0 will be connected to 1, but 1 won't see that 0 is connected + assert {[llength [get_cluster_nodes 1 connected]] == 1} + assert {[llength [get_cluster_nodes 0 connected]] == 2} + + # Drop incoming and outgoing links from/to 1 + R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID] + + # Wait for 0 to know about 1 again after 1 sends a MEET + wait_for_condition 1000 50 { + [llength [get_cluster_nodes 0 connected]] == 2 + } else { + fail "Cluster node 1 never sent multiple MEETs to 0" + } + + # Undo packet drop + R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0 + + # Both a and b will turn to cluster state ok + wait_for_condition 1000 50 { + [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received] + } else { + fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]" + } + } + } ;# stop servers +} ;# tags + +set ::singledb $old_singledb + diff --git a/tests/unit/cluster/cluster-reliable-meet.tcl b/tests/unit/cluster/cluster-reliable-meet.tcl new file mode 100644 index 0000000000..41da97ab9b --- /dev/null +++ b/tests/unit/cluster/cluster-reliable-meet.tcl @@ -0,0 +1,71 @@ +# make sure the test infra won't use SELECT +set old_singledb $::singledb +set ::singledb 1 + +tags {tls:skip external:skip cluster} { + set base_conf [list cluster-enabled yes] + start_multiple_servers 2 [list overrides $base_conf] { + test "Cluster nodes are reachable" { + for {set id 0} {$id < [llength $::servers]} {incr id} { + # Every node should be reachable. 
+ wait_for_condition 1000 50 { + ([catch {R $id ping} ping_reply] == 0) && + ($ping_reply eq {PONG}) + } else { + catch {R $id ping} err + fail "Node #$id keeps replying '$err' to PING." + } + } + } + + test "Before slots allocation, all nodes report cluster failure" { + wait_for_cluster_state fail + } + + set CLUSTER_PACKET_TYPE_MEET 2 + set CLUSTER_PACKET_TYPE_NONE -1 + + test "Cluster nodes haven't met each other" { + assert {[llength [get_cluster_nodes 1]] == 1} + assert {[llength [get_cluster_nodes 0]] == 1} + } + + test "Allocate slots" { + cluster_allocate_slots 2 0 + } + + test "MEET is reliable when target drops the initial MEETs" { + # Make 0 drop the initial MEET messages due to link failure + R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET + R 0 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1 + + R 1 CLUSTER MEET 127.0.0.1 [srv 0 port] + + # Wait for at least a few MEETs to be sent so that we are sure that 0 is + # dropping them. + wait_for_condition 1000 50 { + [CI 0 cluster_stats_messages_meet_received] >= 3 + } else { + fail "Cluster node 1 never sent multiple MEETs to 0" + } + + # Make sure the nodes still don't know about each other + assert {[llength [get_cluster_nodes 1 connected]] == 1} + assert {[llength [get_cluster_nodes 0 connected]] == 1} + + R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE + + # If the MEET is reliable, both a and b will turn to cluster state ok + wait_for_condition 1000 50 { + [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} && + [CI 0 cluster_stats_messages_meet_received] >= 4 && + [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received] + } else { + fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]" + } + } + } ;# stop servers +} ;# tags + +set ::singledb $old_singledb + From db6d3c1138695947412a745146ca29dbdf2e91c6 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 17 Jun 2024 11:46:08 +0800 Subject: [PATCH 02/53] Only 
primary with slots has the right to mark a node as failed (#634) In markNodeAsFailingIfNeeded we count needed_quorum and failures. needed_quorum is half of cluster->size plus one, and cluster->size is the number of primary nodes that own slots, but when counting failures we did not check whether the primary has slots. Only a primary that owns slots has the right to vote, so add a new clusterNodeIsVotingPrimary to formalize this concept. Release notes: bugfix where nodes not in the quorum group might spuriously mark nodes as failed --------- Signed-off-by: Binbin Co-authored-by: Ping Xie --- src/cluster_legacy.c | 23 +++++---- tests/unit/cluster/failure-marking.tcl | 68 ++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 9 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 9104a76d87..21aa620dd9 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -116,6 +116,12 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); +/* Only primaries that own slots have voting rights. + * Returns 1 if the node has voting rights, otherwise returns 0. */ +static inline int clusterNodeIsVotingPrimary(clusterNode *n) { + return (n->flags & CLUSTER_NODE_PRIMARY) && n->numslots; +} + int getNodeDefaultClientPort(clusterNode *n) { return server.tls_cluster ? n->tls_port : n->tcp_port; } @@ -1867,8 +1873,8 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { if (nodeFailed(node)) return; /* Already FAILing. */ failures = clusterNodeFailureReportsCount(node); - /* Also count myself as a voter if I'm a primary. */ - if (clusterNodeIsPrimary(myself)) failures++; + /* Also count myself as a voter if I'm a voting primary. */ + if (clusterNodeIsVotingPrimary(myself)) failures++; if (failures < needed_quorum) return; /* No weak agreement from primaries.
*/ serverLog(LL_NOTICE, "Marking node %.40s (%s) as failing (quorum reached).", node->name, node->human_nodename); @@ -1908,7 +1914,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * 1) The FAIL state is old enough. * 2) It is yet serving slots from our point of view (not failed over). * Apparently no one is going to fix these slots, clear the FAIL flag. */ - if (clusterNodeIsPrimary(node) && node->numslots > 0 && + if (clusterNodeIsVotingPrimary(node) && (now - node->fail_time) > (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT)) { serverLog( LL_NOTICE, @@ -2090,8 +2096,8 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { /* Ignore gossips about self. */ if (node && node != myself) { /* We already know this node. - Handle failure reports, only when the sender is a primary. */ - if (sender && clusterNodeIsPrimary(sender)) { + Handle failure reports, only when the sender is a voting primary. */ + if (sender && clusterNodeIsVotingPrimary(sender)) { if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s (%s) reported node %.40s (%s) as not reachable.", @@ -3250,8 +3256,7 @@ int clusterProcessPacket(clusterLink *link) { /* We consider this vote only if the sender is a primary serving * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ - if (clusterNodeIsPrimary(sender) && sender->numslots > 0 && - senderCurrentEpoch >= server.cluster->failover_auth_epoch) { + if (clusterNodeIsVotingPrimary(sender) && senderCurrentEpoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. 
*/ @@ -4768,7 +4773,7 @@ void clusterCron(void) { if (!(node->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) { node->flags |= CLUSTER_NODE_PFAIL; update_state = 1; - if (clusterNodeIsPrimary(myself) && server.cluster->size == 1) { + if (server.cluster->size == 1 && clusterNodeIsVotingPrimary(myself)) { markNodeAsFailingIfNeeded(node); } else { serverLog(LL_DEBUG, "*** NODE %.40s possibly failing", node->name); @@ -5038,7 +5043,7 @@ void clusterUpdateState(void) { while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - if (clusterNodeIsPrimary(node) && node->numslots) { + if (clusterNodeIsVotingPrimary(node)) { server.cluster->size++; if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; } diff --git a/tests/unit/cluster/failure-marking.tcl b/tests/unit/cluster/failure-marking.tcl index c4746c8264..cfed7fff0f 100644 --- a/tests/unit/cluster/failure-marking.tcl +++ b/tests/unit/cluster/failure-marking.tcl @@ -16,6 +16,8 @@ start_cluster 1 1 {tags {external:skip cluster}} { pause_process $replica1_pid wait_node_marked_fail 0 $replica1_instance_id + + resume_process $replica1_pid } } @@ -49,5 +51,71 @@ start_cluster 2 1 {tags {external:skip cluster}} { resume_process $primary2_pid wait_node_marked_fail 0 $replica1_instance_id + + resume_process $replica1_pid + } +} + +set old_singledb $::singledb +set ::singledb 1 + +tags {external:skip tls:skip cluster} { + set base_conf [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 save ""] + start_multiple_servers 5 [list overrides $base_conf] { + test "Only primary with slots has the right to mark a node as failed" { + set primary_host [srv 0 host] + set primary_port [srv 0 port] + set primary_pid [srv 0 pid] + set primary_id [R 0 CLUSTER MYID] + set replica_id [R 1 CLUSTER MYID] + set replica_pid [srv -1 pid] + + # Meet others nodes. 
+ R 1 CLUSTER MEET $primary_host $primary_port + R 2 CLUSTER MEET $primary_host $primary_port + R 3 CLUSTER MEET $primary_host $primary_port + R 4 CLUSTER MEET $primary_host $primary_port + + # Build a single primary cluster. + cluster_allocate_slots 1 1 + wait_for_cluster_propagation + R 1 CLUSTER REPLICATE $primary_id + wait_for_cluster_propagation + wait_for_cluster_state "ok" + + # Pause the primary, marking the primary as pfail. + pause_process $primary_pid + wait_node_marked_pfail 1 $primary_id + wait_node_marked_pfail 2 $primary_id + wait_node_marked_pfail 3 $primary_id + wait_node_marked_pfail 4 $primary_id + + # Pause the replica, marking the replica as pfail. + pause_process $replica_pid + wait_node_marked_pfail 2 $replica_id + wait_node_marked_pfail 3 $replica_id + wait_node_marked_pfail 4 $replica_id + + # Resume the primary, marking the replica as fail. + resume_process $primary_pid + wait_node_marked_fail 0 $replica_id + wait_node_marked_fail 2 $replica_id + wait_node_marked_fail 3 $replica_id + wait_node_marked_fail 4 $replica_id + + # Check if we got the right failure reports. + wait_for_condition 1000 50 { + [R 0 CLUSTER COUNT-FAILURE-REPORTS $replica_id] == 0 && + [R 2 CLUSTER COUNT-FAILURE-REPORTS $replica_id] == 1 && + [R 3 CLUSTER COUNT-FAILURE-REPORTS $replica_id] == 1 && + [R 4 CLUSTER COUNT-FAILURE-REPORTS $replica_id] == 1 + } else { + fail "Cluster COUNT-FAILURE-REPORTS is not right." + } + + resume_process $replica_pid + } } } + +set ::singledb $old_singledb From 5a51bf5045d1713f5ebf58011dcb8900805be164 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 17 Jun 2024 12:18:20 +0800 Subject: [PATCH 03/53] Combine events to eliminate redundant kevent(2) calls (#638) Combine events to eliminate redundant kevent(2) calls to improve performance. 
--------- Signed-off-by: Andy Pan --- src/ae_kqueue.c | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/src/ae_kqueue.c b/src/ae_kqueue.c index 3cb6fbae4a..4159f25744 100644 --- a/src/ae_kqueue.c +++ b/src/ae_kqueue.c @@ -101,31 +101,24 @@ static void aeApiFree(aeEventLoop *eventLoop) { static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; - struct kevent ke; + struct kevent evs[2]; + int nch = 0; - if (mask & AE_READABLE) { - EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; - } - if (mask & AE_WRITABLE) { - EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); - if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1; - } - return 0; + if (mask & AE_READABLE) EV_SET(evs + nch++, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + if (mask & AE_WRITABLE) EV_SET(evs + nch++, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + + return kevent(state->kqfd, evs, nch, NULL, 0, NULL); } static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; - struct kevent ke; + struct kevent evs[2]; + int nch = 0; - if (mask & AE_READABLE) { - EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(state->kqfd, &ke, 1, NULL, 0, NULL); - } - if (mask & AE_WRITABLE) { - EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(state->kqfd, &ke, 1, NULL, 0, NULL); - } + if (mask & AE_READABLE) EV_SET(evs + nch++, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (mask & AE_WRITABLE) EV_SET(evs + nch++, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + + kevent(state->kqfd, evs, nch, NULL, 0, NULL); } static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { From 495a121f1938ccba6a249bd44df7d963fd32139a Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 18 Jun 2024 10:46:56 +0800 Subject: [PATCH 04/53] Adjust the log level of some logs in the cluster (#633) I think the log of 
pfail status changes will be very useful. The other parts were scanned and found that it can be modified. Changes: 1. Changing pfail status releated logs from VERBOSE to NOTICE. 2. Changing configEpoch collision log from VERBOSE(warning) to NOTICE. 3. Changing some logs from DEBUG to NOTICE. Signed-off-by: Binbin Co-authored-by: Madelyn Olson --- src/cluster_legacy.c | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 21aa620dd9..0803e5039a 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1761,10 +1761,8 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) { server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; clusterSaveConfigOrDie(1); - serverLog(LL_VERBOSE, - "WARNING: configEpoch collision with node %.40s (%s)." - " configEpoch set to %llu", - sender->name, sender->human_nodename, (unsigned long long)myself->configEpoch); + serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). 
configEpoch set to %llu", sender->name, + sender->human_nodename, (unsigned long long)myself->configEpoch); } /* ----------------------------------------------------------------------------- @@ -2100,13 +2098,13 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (sender && clusterNodeIsVotingPrimary(sender)) { if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node, sender)) { - serverLog(LL_VERBOSE, "Node %.40s (%s) reported node %.40s (%s) as not reachable.", - sender->name, sender->human_nodename, node->name, node->human_nodename); + serverLog(LL_NOTICE, "Node %.40s (%s) reported node %.40s (%s) as not reachable.", sender->name, + sender->human_nodename, node->name, node->human_nodename); } markNodeAsFailingIfNeeded(node); } else { if (clusterNodeDelFailureReport(node, sender)) { - serverLog(LL_VERBOSE, "Node %.40s (%s) reported node %.40s (%s) is back online.", sender->name, + serverLog(LL_NOTICE, "Node %.40s (%s) reported node %.40s (%s) is back online.", sender->name, sender->human_nodename, node->name, node->human_nodename); } } @@ -2993,7 +2991,7 @@ int clusterProcessPacket(clusterLink *link) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ - serverLog(LL_DEBUG, + serverLog(LL_NOTICE, "PONG contains mismatching sender ID. 
About node %.40s (%s) in shard %.40s added %d ms ago, " "having flags %d", link->node->name, link->node->human_nodename, link->node->shard_id, @@ -4776,7 +4774,7 @@ void clusterCron(void) { if (server.cluster->size == 1 && clusterNodeIsVotingPrimary(myself)) { markNodeAsFailingIfNeeded(node); } else { - serverLog(LL_DEBUG, "*** NODE %.40s possibly failing", node->name); + serverLog(LL_NOTICE, "NODE %.40s (%s) possibly failing.", node->name, node->human_nodename); } } } From 4135894a5d40f7270a2127d601fbf4f81ca4ab26 Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Mon, 17 Jun 2024 20:31:15 -0700 Subject: [PATCH 05/53] Update remaining `master` references to `primary` (#660) Signed-off-by: Ping Xie --- src/blocked.c | 6 +- src/cluster.c | 4 +- src/cluster_legacy.c | 8 +- src/commands.def | 19 +-- src/commands/client-kill.json | 10 ++ src/commands/client-list.json | 4 + src/commands/cluster-failover.json | 2 +- src/commands/cluster-replicas.json | 4 +- src/commands/cluster-replicate.json | 2 +- src/commands/cluster-slaves.json | 4 +- src/commands/cluster-slots.json | 2 +- src/commands/replicaof.json | 2 +- src/commands/role.json | 20 +-- src/config.c | 2 +- src/networking.c | 13 +- src/object.c | 2 +- src/replication.c | 182 ++++++++++++++-------------- src/rio.c | 2 +- src/valkey-benchmark.c | 8 +- src/valkey-cli.c | 73 ++++++----- src/valkeymodule.h | 2 +- tests/integration/replication.tcl | 4 +- tests/unit/auth.tcl | 2 +- tests/unit/introspection.tcl | 4 +- 24 files changed, 199 insertions(+), 182 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 6d8d4fbc7c..08abac15e3 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -86,7 +86,7 @@ void initClientBlockingState(client *c) { * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. 
*/ void blockClient(client *c, int btype) { - /* Master client should never be blocked unless pause or module */ + /* Primary client should never be blocked unless pause or module */ serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); c->flags |= CLIENT_BLOCKED; @@ -265,8 +265,8 @@ void replyToClientsBlockedOnShutdown(void) { /* Mass-unblock clients because something changed in the instance that makes * blocking no longer safe. For example clients blocked in list operations - * in an instance which turns from master to replica is unsafe, so this function - * is called when a master turns into a replica. + * in an instance which turns from primary to replica is unsafe, so this function + * is called when a primary turns into a replica. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting * it at the same time. */ diff --git a/src/cluster.c b/src/cluster.c index 00f3c2d889..8aa6793ba8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -813,12 +813,12 @@ void clusterCommandHelp(client *c) { " Return the node's shard id.", "NODES", " Return cluster configuration seen by node. Output format:", - " ...", + " ...", "REPLICAS ", " Return replicas.", "SLOTS", " Return information about slots range mappings. 
Each range is made of:", - " start, end, master and replicas IP addresses, ports and ids", + " start, end, primary and replicas IP addresses, ports and ids", "SHARDS", " Return information about slot range mappings and the nodes associated with them.", NULL}; diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 0803e5039a..e9816d52a1 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -533,9 +533,9 @@ int clusterLoadConfig(char *filename) { serverAssert(server.cluster->myself == NULL); myself = server.cluster->myself = n; n->flags |= CLUSTER_NODE_MYSELF; - } else if (!strcasecmp(s, "master")) { + } else if (!strcasecmp(s, "master") || !strcasecmp(s, "primary")) { n->flags |= CLUSTER_NODE_PRIMARY; - } else if (!strcasecmp(s, "slave")) { + } else if (!strcasecmp(s, "slave") || !strcasecmp(s, "replica")) { n->flags |= CLUSTER_NODE_REPLICA; } else if (!strcasecmp(s, "fail?")) { n->flags |= CLUSTER_NODE_PFAIL; @@ -1903,7 +1903,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * node again. */ if (nodeIsReplica(node) || node->numslots == 0) { serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s):%s is reachable again.", node->name, - node->human_nodename, nodeIsReplica(node) ? "replica" : "master without slots"); + node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots"); node->flags &= ~CLUSTER_NODE_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); } @@ -4154,7 +4154,7 @@ void clusterLogCantFailover(int reason) { switch (reason) { case CLUSTER_CANT_FAILOVER_DATA_AGE: - msg = "Disconnected from master for longer than allowed. " + msg = "Disconnected from primary for longer than allowed. 
" "Please check the 'cluster-replica-validity-factor' configuration " "option."; break; diff --git a/src/commands.def b/src/commands.def index 06cdb4b87e..cb7fd73cc5 100644 --- a/src/commands.def +++ b/src/commands.def @@ -961,7 +961,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("countkeysinslot","Returns the number of keys in a hash slot.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_COUNTKEYSINSLOT_History,0,CLUSTER_COUNTKEYSINSLOT_Tips,0,clusterCommand,3,CMD_STALE,0,CLUSTER_COUNTKEYSINSLOT_Keyspecs,0,NULL,1),.args=CLUSTER_COUNTKEYSINSLOT_Args}, {MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args}, {MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args}, -{MAKE_CMD("failover","Forces a replica to perform a manual failover of its master.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args}, +{MAKE_CMD("failover","Forces a replica to perform a manual failover of its 
primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args}, {MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)}, {MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args}, {MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args}, @@ -973,14 +973,14 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("myid","Returns the ID of a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,0,CLUSTER_MYID_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYID_Keyspecs,0,NULL,0)}, {MAKE_CMD("myshardid","Returns the shard ID of a node.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,0,CLUSTER_MYSHARDID_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYSHARDID_Keyspecs,0,NULL,0)}, {MAKE_CMD("nodes","Returns the cluster configuration for a node.","O(N) where N is the total number of Cluster 
nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,0,CLUSTER_NODES_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_NODES_Keyspecs,0,NULL,0)}, -{MAKE_CMD("replicas","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","5.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,0,CLUSTER_REPLICAS_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICAS_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICAS_Args}, -{MAKE_CMD("replicate","Configure a node as replica of a master node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,0,CLUSTER_REPLICATE_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args}, +{MAKE_CMD("replicas","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","5.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,0,CLUSTER_REPLICAS_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICAS_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICAS_Args}, +{MAKE_CMD("replicate","Configure a node as replica of a primary node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,0,CLUSTER_REPLICATE_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args}, {MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. 
The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args}, {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, {MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, -{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, +{MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of 
replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, {0} }; @@ -1187,6 +1187,7 @@ commandHistory CLIENT_KILL_History[] = { {"5.0.0","Replaced `slave` `TYPE` with `replica`. `slave` still supported for backward compatibility."}, {"6.2.0","`LADDR` option."}, {"8.0.0","`MAXAGE` option."}, +{"8.0.0","Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility."}, }; #endif @@ -1204,6 +1205,7 @@ commandHistory CLIENT_KILL_History[] = { struct COMMAND_ARG CLIENT_KILL_filter_new_format_client_type_Subargs[] = { {MAKE_ARG("normal",ARG_TYPE_PURE_TOKEN,-1,"NORMAL",NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("master",ARG_TYPE_PURE_TOKEN,-1,"MASTER",NULL,"3.2.0",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("primary",ARG_TYPE_PURE_TOKEN,-1,"PRIMARY",NULL,"8.0.0",CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("slave",ARG_TYPE_PURE_TOKEN,-1,"SLAVE",NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("replica",ARG_TYPE_PURE_TOKEN,-1,"REPLICA",NULL,"5.0.0",CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("pubsub",ARG_TYPE_PURE_TOKEN,-1,"PUBSUB",NULL,NULL,CMD_ARG_NONE,0,NULL)}, @@ -1218,7 +1220,7 @@ struct COMMAND_ARG CLIENT_KILL_filter_new_format_skipme_Subargs[] = { /* CLIENT KILL filter new_format argument table */ struct COMMAND_ARG CLIENT_KILL_filter_new_format_Subargs[] = { {MAKE_ARG("client-id",ARG_TYPE_INTEGER,-1,"ID",NULL,"2.8.12",CMD_ARG_OPTIONAL,0,NULL)}, -{MAKE_ARG("client-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"2.8.12",CMD_ARG_OPTIONAL,5,NULL),.subargs=CLIENT_KILL_filter_new_format_client_type_Subargs}, 
+{MAKE_ARG("client-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"2.8.12",CMD_ARG_OPTIONAL,6,NULL),.subargs=CLIENT_KILL_filter_new_format_client_type_Subargs}, {MAKE_ARG("username",ARG_TYPE_STRING,-1,"USER",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, {MAKE_ARG("addr",ARG_TYPE_STRING,-1,"ADDR",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL),.display_text="ip:port"}, {MAKE_ARG("laddr",ARG_TYPE_STRING,-1,"LADDR",NULL,"6.2.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="ip:port"}, @@ -1248,6 +1250,7 @@ commandHistory CLIENT_LIST_History[] = { {"6.2.0","Added `argv-mem`, `tot-mem`, `laddr` and `redir` fields and the optional `ID` filter."}, {"7.0.0","Added `resp`, `multi-mem`, `rbs` and `rbp` fields."}, {"7.0.3","Added `ssub` field."}, +{"8.0.0","Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility."}, }; #endif @@ -1554,8 +1557,8 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, -{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client 
connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,6,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, -{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,6,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, +{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, +{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, {MAKE_CMD("no-evict","Sets the client eviction mode of the connection.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_EVICT_History,0,CLIENT_NO_EVICT_Tips,0,clientCommand,3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_NO_EVICT_Keyspecs,0,NULL,1),.args=CLIENT_NO_EVICT_Args}, {MAKE_CMD("no-touch","Controls whether commands sent by the client affect the LRU/LFU of accessed 
keys.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_TOUCH_History,0,CLIENT_NO_TOUCH_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_NO_TOUCH_Keyspecs,0,NULL,1),.args=CLIENT_NO_TOUCH_Args}, {MAKE_CMD("pause","Suspends commands processing.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_PAUSE_History,1,CLIENT_PAUSE_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_PAUSE_Keyspecs,0,NULL,2),.args=CLIENT_PAUSE_Args}, @@ -10816,7 +10819,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { {MAKE_CMD("monitor","Listens for all requests received by the server in real-time.",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,MONITOR_History,0,MONITOR_Tips,0,monitorCommand,1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,MONITOR_Keyspecs,0,NULL,0)}, {MAKE_CMD("psync","An internal command used in replication.",NULL,"2.8.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,PSYNC_History,0,PSYNC_Tips,0,syncCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NO_MULTI|CMD_NOSCRIPT,0,PSYNC_Keyspecs,0,NULL,2),.args=PSYNC_Args}, {MAKE_CMD("replconf","An internal command for configuring the replication stream.","O(1)","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,"server",COMMAND_GROUP_SERVER,REPLCONF_History,0,REPLCONF_Tips,0,replconfCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_ALLOW_BUSY,0,REPLCONF_Keyspecs,0,NULL,0)}, -{MAKE_CMD("replicaof","Configures a server as replica of another, or promotes it to a master.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,REPLICAOF_History,0,REPLICAOF_Tips,0,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,REPLICAOF_Keyspecs,0,NULL,1),.args=REPLICAOF_Args}, +{MAKE_CMD("replicaof","Configures a server as replica of another, or promotes it to a 
primary.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,REPLICAOF_History,0,REPLICAOF_Tips,0,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,REPLICAOF_Keyspecs,0,NULL,1),.args=REPLICAOF_Args}, {MAKE_CMD("restore-asking","An internal command for migrating keys in a cluster.","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,"server",COMMAND_GROUP_SERVER,RESTORE_ASKING_History,3,RESTORE_ASKING_Tips,0,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,RESTORE_ASKING_Keyspecs,1,NULL,7),.args=RESTORE_ASKING_Args}, {MAKE_CMD("role","Returns the replication role.","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ROLE_History,0,ROLE_Tips,0,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS,ROLE_Keyspecs,0,NULL,0)}, {MAKE_CMD("save","Synchronously saves the database(s) to disk.","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SAVE_History,0,SAVE_Tips,0,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0,SAVE_Keyspecs,0,NULL,0)}, diff --git a/src/commands/client-kill.json b/src/commands/client-kill.json index 01079ad993..97fa932cd8 100644 --- a/src/commands/client-kill.json +++ b/src/commands/client-kill.json @@ -31,6 +31,10 @@ [ "8.0.0", "`MAXAGE` option." + ], + [ + "8.0.0", + "Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility." 
] ], "command_flags": [ @@ -84,6 +88,12 @@ "token": "master", "since": "3.2.0" }, + { + "name": "primary", + "type": "pure-token", + "token": "primary", + "since": "8.0.0" + }, { "name": "slave", "type": "pure-token", diff --git a/src/commands/client-list.json b/src/commands/client-list.json index f72ffaf40a..d9c0054e60 100644 --- a/src/commands/client-list.json +++ b/src/commands/client-list.json @@ -31,6 +31,10 @@ [ "7.0.3", "Added `ssub` field." + ], + [ + "8.0.0", + "Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility." ] ], "command_flags": [ diff --git a/src/commands/cluster-failover.json b/src/commands/cluster-failover.json index f58fd562a7..9b31e310eb 100644 --- a/src/commands/cluster-failover.json +++ b/src/commands/cluster-failover.json @@ -1,6 +1,6 @@ { "FAILOVER": { - "summary": "Forces a replica to perform a manual failover of its master.", + "summary": "Forces a replica to perform a manual failover of its primary.", "complexity": "O(1)", "group": "cluster", "since": "3.0.0", diff --git a/src/commands/cluster-replicas.json b/src/commands/cluster-replicas.json index 4e8bd4204c..2fb47afea4 100644 --- a/src/commands/cluster-replicas.json +++ b/src/commands/cluster-replicas.json @@ -1,6 +1,6 @@ { "REPLICAS": { - "summary": "Lists the replica nodes of a master node.", + "summary": "Lists the replica nodes of a primary node.", "complexity": "O(N) where N is the number of replicas.", "group": "cluster", "since": "5.0.0", @@ -21,7 +21,7 @@ } ], "reply_schema": { - "description": "A list of replica nodes replicating from the specified master node provided in the same format used by CLUSTER NODES.", + "description": "A list of replica nodes replicating from the specified primary node provided in the same format used by CLUSTER NODES.", "type": "array", "items": { "type": "string", diff --git a/src/commands/cluster-replicate.json b/src/commands/cluster-replicate.json index 060d4af190..857a8022b8 100644 --- 
a/src/commands/cluster-replicate.json +++ b/src/commands/cluster-replicate.json @@ -1,6 +1,6 @@ { "REPLICATE": { - "summary": "Configure a node as replica of a master node.", + "summary": "Configure a node as replica of a primary node.", "complexity": "O(1)", "group": "cluster", "since": "3.0.0", diff --git a/src/commands/cluster-slaves.json b/src/commands/cluster-slaves.json index db66a1c1db..7059e544bb 100644 --- a/src/commands/cluster-slaves.json +++ b/src/commands/cluster-slaves.json @@ -1,6 +1,6 @@ { "SLAVES": { - "summary": "Lists the replica nodes of a master node.", + "summary": "Lists the replica nodes of a primary node.", "complexity": "O(N) where N is the number of replicas.", "group": "cluster", "since": "3.0.0", @@ -26,7 +26,7 @@ } ], "reply_schema": { - "description": "A list of replica nodes replicating from the specified master node provided in the same format used by CLUSTER NODES.", + "description": "A list of replica nodes replicating from the specified primary node provided in the same format used by CLUSTER NODES.", "type": "array", "items": { "type": "string", diff --git a/src/commands/cluster-slots.json b/src/commands/cluster-slots.json index ca48f371ea..5d00280f15 100644 --- a/src/commands/cluster-slots.json +++ b/src/commands/cluster-slots.json @@ -42,7 +42,7 @@ }, { "type": "array", - "description": "Master node for the slot range.", + "description": "Primary node for the slot range.", "minItems": 4, "maxItems": 4, "items": [ diff --git a/src/commands/replicaof.json b/src/commands/replicaof.json index 6ddedf2d68..cd5102171c 100644 --- a/src/commands/replicaof.json +++ b/src/commands/replicaof.json @@ -1,6 +1,6 @@ { "REPLICAOF": { - "summary": "Configures a server as replica of another, or promotes it to a master.", + "summary": "Configures a server as replica of another, or promotes it to a primary.", "complexity": "O(1)", "group": "server", "since": "5.0.0", diff --git a/src/commands/role.json b/src/commands/role.json index 
1c3a4490ca..d31396faf6 100644 --- a/src/commands/role.json +++ b/src/commands/role.json @@ -28,7 +28,7 @@ "const": "master" }, { - "description": "Current replication master offset.", + "description": "Current replication primary offset.", "type": "integer" }, { @@ -65,18 +65,18 @@ "const": "slave" }, { - "description": "IP of master.", + "description": "IP of primary.", "type": "string" }, { - "description": "Port number of master.", + "description": "Port number of primary.", "type": "integer" }, { - "description": "State of the replication from the point of view of the master.", + "description": "State of the replication from the point of view of the primary.", "oneOf": [ { - "description": "The instance is in handshake with its master.", + "description": "The instance is in handshake with its primary.", "const": "handshake" }, { @@ -84,15 +84,15 @@ "const": "none" }, { - "description": "The instance needs to connect to its master.", + "description": "The instance needs to connect to its primary.", "const": "connect" }, { - "description": "The master-replica connection is in progress.", + "description": "The primary-replica connection is in progress.", "const": "connecting" }, { - "description": "The master and replica are trying to perform the synchronization.", + "description": "The primary and replica are trying to perform the synchronization.", "const": "sync" }, { @@ -106,7 +106,7 @@ ] }, { - "description": "The amount of data received from the replica so far in terms of master replication offset.", + "description": "The amount of data received from the replica so far in terms of primary replication offset.", "type": "integer" } ] @@ -120,7 +120,7 @@ "const": "sentinel" }, { - "description": "List of master names monitored by this sentinel instance.", + "description": "List of primary names monitored by this sentinel instance.", "type": "array", "items": { "type": "string" diff --git a/src/config.c b/src/config.c index 83e2a51db1..2a692ac8fa 100644 --- 
a/src/config.c +++ b/src/config.c @@ -2891,7 +2891,7 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, char *ptr; server.primary_port = strtol(argv[1], &ptr, 10); if (server.primary_port < 0 || server.primary_port > 65535 || *ptr != '\0') { - *err = "Invalid master port"; + *err = "Invalid primary port"; return 0; } server.primary_host = sdsnew(argv[0]); diff --git a/src/networking.c b/src/networking.c index ecdeeb6588..d6d3d4fece 100644 --- a/src/networking.c +++ b/src/networking.c @@ -32,6 +32,7 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" +#include #include #include #include @@ -586,11 +587,11 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { to = "AOF-loading-client"; from = "server"; } else if (ctype == CLIENT_TYPE_PRIMARY) { - to = "master"; + to = "primary"; from = "replica"; } else { to = "replica"; - from = "master"; + from = "primary"; } if (len > 4096) len = 4096; @@ -2232,7 +2233,7 @@ int processInlineBuffer(client *c) { sdsfreesplitres(argv, argc); serverLog(LL_WARNING, "WARNING: Receiving inline protocol from primary, primary stream corruption? Closing the " "primary connection and discarding the cached primary."); - setProtocolError("Master using the inline protocol. Desync?", c); + setProtocolError("Primary using the inline protocol. Desync?", c); return C_ERR; } @@ -3075,7 +3076,7 @@ void clientCommand(client *c) { " Kill connections made from the specified address", " * LADDR (|:0)", " Kill connections made to specified local address", -" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)", +" * TYPE (NORMAL|PRIMARY|REPLICA|PUBSUB)", " Kill connections by type.", " * USER ", " Kill connections authenticated by .", @@ -3087,7 +3088,7 @@ void clientCommand(client *c) { " Kill connections older than the specified age.", "LIST [options ...]", " Return information about client connections. 
Options:", -" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)", +" * TYPE (NORMAL|PRIMARY|REPLICA|PUBSUB)", " Return clients of specified type.", "UNPAUSE", " Stop the current client pause, resuming traffic.", @@ -3898,7 +3899,7 @@ int getClientTypeByName(char *name) { return CLIENT_TYPE_REPLICA; else if (!strcasecmp(name, "pubsub")) return CLIENT_TYPE_PUBSUB; - else if (!strcasecmp(name, "master")) + else if (!strcasecmp(name, "master") || !strcasecmp(name, "primary")) return CLIENT_TYPE_PRIMARY; else return -1; diff --git a/src/object.c b/src/object.c index 7f93c3768d..73c3de55dd 100644 --- a/src/object.c +++ b/src/object.c @@ -1385,7 +1385,7 @@ sds getMemoryDoctorReport(void) { " * Big replica buffers: The replica output buffers in this instance are greater than 10MB for " "each replica (on average). This likely means that there is some replica instance that is " "struggling receiving data, either because it is too slow or because of networking issues. As a " - "result, data piles on the master output buffers. Please try to identify what replica is not " + "result, data piles on the primary output buffers. Please try to identify what replica is not " "receiving data correctly and why. 
You can use the INFO output in order to check the replicas " "delays and the CLIENT LIST command to check the output buffers of each replica.\n\n"); } diff --git a/src/replication.c b/src/replication.c index 4fe8470371..e74f66a67c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -43,8 +43,8 @@ #include #include -void replicationDiscardCachedMaster(void); -void replicationResurrectCachedMaster(connection *conn); +void replicationDiscardCachedPrimary(void); +void replicationResurrectCachedPrimary(connection *conn); void replicationSendAck(void); int replicaPutOnline(client *replica); void replicaStartCommandStream(client *replica); @@ -114,7 +114,7 @@ int bg_unlink(const char *filename) { } } -/* ---------------------------------- MASTER -------------------------------- */ +/* ---------------------------------- PRIMARY -------------------------------- */ void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); @@ -420,7 +420,7 @@ void feedReplicationBuffer(char *s, size_t len) { * This function is used if the instance is a primary: we use the commands * received by our clients in order to create the replication stream. 
* Instead if the instance is a replica and has sub-replicas attached, we use - * replicationFeedStreamFromMasterStream() */ + * replicationFeedStreamFromPrimaryStream() */ void replicationFeedReplicas(int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; @@ -925,7 +925,7 @@ void syncCommand(client *c) { replicationUnsetPrimary(); } sds client = catClientInfoString(sdsempty(), c); - serverLog(LL_NOTICE, "MASTER MODE enabled (failover request from '%s')", client); + serverLog(LL_NOTICE, "PRIMARY MODE enabled (failover request from '%s')", client); sdsfree(client); } else { addReplyError(c, "PSYNC FAILOVER replid must match my replid."); @@ -1706,7 +1706,7 @@ void replicationEmptyDbCallback(dict *d) { /* Once we have a link with the primary and the synchronization was * performed, this function materializes the primary client we store * at server.primary, starting from the specified file descriptor. */ -void replicationCreateMasterClient(connection *conn, int dbid) { +void replicationCreatePrimaryClient(connection *conn, int dbid) { server.primary = createClient(conn); if (conn) connSetReadHandler(server.primary->conn, readQueryFromClient); @@ -1793,11 +1793,11 @@ void disklessLoadDiscardTempDb(serverDb *tempDb) { * we have no way to incrementally feed our replicas after that. * We want our replicas to resync with us as well, if we have any sub-replicas. * This is useful on readSyncBulkPayload in places where we just finished transferring db. */ -void replicationAttachToNewMaster(void) { +void replicationAttachToNewPrimary(void) { /* Replica starts to apply data from new primary, we must discard the cached * primary structure. */ serverAssert(server.primary == NULL); - replicationDiscardCachedMaster(); + replicationDiscardCachedPrimary(); disconnectReplicas(); /* Force our replicas to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. 
*/ @@ -1825,7 +1825,7 @@ void readSyncBulkPayload(connection *conn) { if (server.repl_transfer_size == -1) { nread = connSyncReadLine(conn, buf, 1024, server.repl_syncio_timeout * 1000); if (nread == -1) { - serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", connGetLastError(conn)); + serverLog(LL_WARNING, "I/O error reading bulk count from PRIMARY: %s", connGetLastError(conn)); goto error; } else { /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and @@ -1834,7 +1834,7 @@ void readSyncBulkPayload(connection *conn) { } if (buf[0] == '-') { - serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf + 1); + serverLog(LL_WARNING, "PRIMARY aborted replication with an error: %s", buf + 1); goto error; } else if (buf[0] == '\0') { /* At this stage just a newline works as a PING in order to take @@ -1844,7 +1844,7 @@ void readSyncBulkPayload(connection *conn) { return; } else if (buf[0] != '$') { serverLog(LL_WARNING, - "Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host " + "Bad protocol from PRIMARY, the first byte is not '$' (we received '%s'), are you sure the host " "and port are right?", buf); goto error; @@ -1867,12 +1867,12 @@ void readSyncBulkPayload(connection *conn) { /* Set any repl_transfer_size to avoid entering this code path * at the next call. */ server.repl_transfer_size = 0; - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from primary with EOF %s", + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF %s", use_diskless_load ? "to parser" : "to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf + 1, NULL, 10); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving %lld bytes from primary %s", + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", (long long)server.repl_transfer_size, use_diskless_load ? 
"to parser" : "to disk"); } return; @@ -1894,7 +1894,7 @@ void readSyncBulkPayload(connection *conn) { /* equivalent to EAGAIN */ return; } - serverLog(LL_WARNING, "I/O error trying to sync with MASTER: %s", + serverLog(LL_WARNING, "I/O error trying to sync with PRIMARY: %s", (nread == -1) ? connGetLastError(conn) : "connection lost"); cancelReplicationHandshake(1); return; @@ -1925,7 +1925,7 @@ void readSyncBulkPayload(connection *conn) { if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) { serverLog(LL_WARNING, "Write error or short write writing to the DB dump file " - "needed for MASTER <-> REPLICA synchronization: %s", + "needed for PRIMARY <-> REPLICA synchronization: %s", (nwritten == -1) ? strerror(errno) : "short write"); goto error; } @@ -1996,9 +1996,9 @@ void readSyncBulkPayload(connection *conn) { moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL); } else { - replicationAttachToNewMaster(); + replicationAttachToNewPrimary(); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } @@ -2008,7 +2008,7 @@ void readSyncBulkPayload(connection *conn) { * time for non blocking loading. */ connSetReadHandler(conn, NULL); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (use_diskless_load) { rio rdb; @@ -2045,7 +2045,7 @@ void readSyncBulkPayload(connection *conn) { rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { /* RDB loading failed. 
*/ - serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " + serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); loadingFailed = 1; } else if (usemark) { @@ -2068,7 +2068,7 @@ void readSyncBulkPayload(connection *conn) { disklessLoadDiscardTempDb(diskless_load_tempDb); functionsLibCtxFree(temp_functions_lib_ctx); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { /* Remove the half-loaded data in case we started with an empty replica. */ emptyData(-1, empty_db_flags, replicationEmptyDbCallback); @@ -2085,9 +2085,9 @@ void readSyncBulkPayload(connection *conn) { /* We will soon swap main db with tempDb and replicas will start * to apply data from new primary, we must discard the cached * primary structure and force resync of sub-replicas. */ - replicationAttachToNewMaster(); + replicationAttachToNewPrimary(); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Swapping active DB with loaded DB"); swapMainDbWithTempDb(diskless_load_tempDb); /* swap existing functions ctx with the temporary one */ @@ -2098,7 +2098,7 @@ void readSyncBulkPayload(connection *conn) { /* Delete the old db as it's useless now. */ disklessLoadDiscardTempDb(diskless_load_tempDb); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding old DB in background"); } /* Inform about db change, as replication was diskless and didn't cause a save. 
*/ @@ -2117,7 +2117,7 @@ void readSyncBulkPayload(connection *conn) { if (fsync(server.repl_transfer_fd) == -1) { serverLog(LL_WARNING, "Failed trying to sync the temp DB to disk in " - "MASTER <-> REPLICA synchronization: %s", + "PRIMARY <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(1); return; @@ -2128,7 +2128,7 @@ void readSyncBulkPayload(connection *conn) { if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) { serverLog(LL_WARNING, "Failed trying to rename the temp DB into %s in " - "MASTER <-> REPLICA synchronization: %s", + "PRIMARY <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); cancelReplicationHandshake(1); if (old_rdb_fd != -1) close(old_rdb_fd); @@ -2141,14 +2141,14 @@ void readSyncBulkPayload(connection *conn) { if (fsyncFileDir(server.rdb_filename) == -1) { serverLog(LL_WARNING, "Failed trying to sync DB directory %s in " - "MASTER <-> REPLICA synchronization: %s", + "PRIMARY <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); cancelReplicationHandshake(1); return; } if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) { - serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " + serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " "DB from disk, check server logs."); cancelReplicationHandshake(1); if (server.rdb_del_sync_files && allPersistenceDisabled()) { @@ -2181,7 +2181,7 @@ void readSyncBulkPayload(connection *conn) { } /* Final setup of the connected replica <- primary link */ - replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db); + replicationCreatePrimaryClient(server.repl_transfer_s, rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; @@ -2200,11 +2200,11 @@ void readSyncBulkPayload(connection *conn) { * or not, in order to behave correctly if they are promoted to * primaries after a failover. 
*/ if (server.repl_backlog == NULL) createReplicationBacklog(); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Finished with success"); if (server.supervised_mode == SUPERVISED_SYSTEMD) { - serverCommunicateSystemd( - "STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n"); + serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Finished with success. Ready to accept connections " + "in read-write mode.\n"); } /* Send the initial ACK immediately to put this replica in online state. */ @@ -2314,7 +2314,7 @@ char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens * command in order to obtain the primary replid and the primary replication * global offset. * - * This function is designed to be called from syncWithMaster(), so the + * This function is designed to be called from syncWithPrimary(), so the * following assumptions are made: * * 1) We pass the function an already connected socket "fd". @@ -2345,7 +2345,7 @@ char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens * the caller should fall back to SYNC. * PSYNC_WRITE_ERROR: There was an error writing the command to the socket. * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. - * PSYNC_TRY_LATER: Master is currently in a transient error condition. + * PSYNC_TRY_LATER: Primary is currently in a transient error condition. 
* * Notable side effects: * @@ -2406,10 +2406,10 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { /* Reading half */ reply = receiveSynchronousResponse(conn); - /* Master did not reply to PSYNC */ + /* Primary did not reply to PSYNC */ if (reply == NULL) { connSetReadHandler(conn, NULL); - serverLog(LL_WARNING, "Master did not reply to PSYNC, will try later"); + serverLog(LL_WARNING, "Primary did not reply to PSYNC, will try later"); return PSYNC_TRY_LATER; } @@ -2434,7 +2434,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { if (offset) offset++; } if (!replid || !offset || (offset - replid - 1) != CONFIG_RUN_ID_SIZE) { - serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); + serverLog(LL_WARNING, "Primary replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the primary supports PSYNC, but the reply * format seems wrong. To stay safe we blank the primary @@ -2469,8 +2469,8 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { new[CONFIG_RUN_ID_SIZE] = '\0'; if (strcmp(new, server.cached_primary->replid)) { - /* Master ID changed. */ - serverLog(LL_NOTICE, "Master replication ID changed to %s", new); + /* Primary ID changed. */ + serverLog(LL_NOTICE, "Primary replication ID changed to %s", new); /* Set the old ID as our ID2, up to the current offset+1. */ memcpy(server.replid2, server.cached_primary->replid, sizeof(server.replid2)); @@ -2488,7 +2488,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { /* Setup the replication to continue. 
*/ sdsfree(reply); - replicationResurrectCachedMaster(conn); + replicationResurrectCachedPrimary(conn); /* If this instance was restarted and we read the metadata to * PSYNC from the persistence file, our replication backlog could @@ -2506,7 +2506,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { if (!strncmp(reply, "-NOMASTERLINK", 13) || !strncmp(reply, "-LOADING", 8)) { serverLog(LL_NOTICE, - "Master is currently unable to PSYNC " + "Primary is currently unable to PSYNC " "but should be in the future: %s", reply); sdsfree(reply); @@ -2518,7 +2518,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { serverLog(LL_WARNING, "Unexpected reply to PSYNC from primary: %s", reply); } else { serverLog(LL_NOTICE, - "Master does not support PSYNC or is in " + "Primary does not support PSYNC or is in " "error state (reply: %s)", reply); } @@ -2528,7 +2528,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { /* This handler fires when the non blocking connect was able to * establish a connection with the primary. */ -void syncWithMaster(connection *conn) { +void syncWithPrimary(connection *conn) { char tmpfile[256], *err = NULL; int dfd = -1, maxtries = 5; int psync_result; @@ -2552,7 +2552,7 @@ void syncWithMaster(connection *conn) { serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. 
*/ - connSetReadHandler(conn, syncWithMaster); + connSetReadHandler(conn, syncWithPrimary); connSetWriteHandler(conn, NULL); server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; /* Send the PING, don't check for errors at all, we have the timeout @@ -2580,7 +2580,7 @@ void syncWithMaster(connection *conn) { sdsfree(err); goto error; } else { - serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); + serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); } sdsfree(err); err = NULL; @@ -2605,7 +2605,7 @@ void syncWithMaster(connection *conn) { if (err) goto write_error; } - /* Set the replica port, so that Master's INFO command can list the + /* Set the replica port, so that primary's INFO command can list the * replica listening port correctly. */ { int port; @@ -2621,7 +2621,7 @@ void syncWithMaster(connection *conn) { if (err) goto write_error; } - /* Set the replica ip, so that Master's INFO command can list the + /* Set the replica ip, so that primary's INFO command can list the * replica IP address port correctly in case of port forwarding or NAT. * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ if (server.replica_announce_ip) { @@ -2654,7 +2654,7 @@ void syncWithMaster(connection *conn) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to MASTER: %s", err); + serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); sdsfree(err); goto error; } @@ -2672,7 +2672,7 @@ void syncWithMaster(connection *conn) { * REPLCONF listening-port. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Master does not understand " + "(Non critical) Primary does not understand " "REPLCONF listening-port: %s", err); } @@ -2692,7 +2692,7 @@ void syncWithMaster(connection *conn) { * REPLCONF ip-address. 
*/ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Master does not understand " + "(Non critical) Primary does not understand " "REPLCONF ip-address: %s", err); } @@ -2709,7 +2709,7 @@ void syncWithMaster(connection *conn) { * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Master does not understand " + "(Non critical) Primary does not understand " "REPLCONF capa: %s", err); } @@ -2752,7 +2752,7 @@ void syncWithMaster(connection *conn) { /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING, - "syncWithMaster(): state machine error, " + "syncWithPrimary(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); goto error; @@ -2783,9 +2783,9 @@ void syncWithMaster(connection *conn) { * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { - serverCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to " + serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. 
Ready to " "accept connections in read-write mode.\n"); } return; @@ -2797,7 +2797,7 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { - serverLog(LL_WARNING, "I/O error writing to MASTER: %s", connGetLastError(conn)); + serverLog(LL_WARNING, "I/O error writing to PRIMARY: %s", connGetLastError(conn)); goto error; } } @@ -2811,7 +2811,7 @@ void syncWithMaster(connection *conn) { sleep(1); } if (dfd == -1) { - serverLog(LL_WARNING, "Opening the temp file needed for MASTER <-> REPLICA synchronization: %s", + serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", strerror(errno)); goto error; } @@ -2835,7 +2835,7 @@ void syncWithMaster(connection *conn) { return; no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */ - serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake"); + serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); /* Fall through to regular error handling */ error: @@ -2855,11 +2855,11 @@ void syncWithMaster(connection *conn) { goto error; } -int connectWithMaster(void) { +int connectWithPrimary(void) { server.repl_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr, - syncWithMaster) == C_ERR) { - serverLog(LL_WARNING, "Unable to connect to MASTER: %s", connGetLastError(server.repl_transfer_s)); + syncWithPrimary) == C_ERR) { + serverLog(LL_WARNING, "Unable to connect to PRIMARY: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_transfer_s); server.repl_transfer_s = NULL; return C_ERR; @@ -2868,7 +2868,7 @@ int connectWithMaster(void) { server.repl_transfer_lastio = server.unixtime; server.repl_state = REPL_STATE_CONNECTING; - 
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync started"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync started"); return C_OK; } @@ -2876,7 +2876,7 @@ int connectWithMaster(void) { * in progress to undo it. * Never call this function directly, use cancelReplicationHandshake() instead. */ -void undoConnectWithMaster(void) { +void undoConnectWithPrimary(void) { connClose(server.repl_transfer_s); server.repl_transfer_s = NULL; } @@ -2886,7 +2886,7 @@ void undoConnectWithMaster(void) { */ void replicationAbortSyncTransfer(void) { serverAssert(server.repl_state == REPL_STATE_TRANSFER); - undoConnectWithMaster(); + undoConnectWithPrimary(); if (server.repl_transfer_fd != -1) { close(server.repl_transfer_fd); bg_unlink(server.repl_transfer_tmpfile); @@ -2909,7 +2909,7 @@ int cancelReplicationHandshake(int reconnect) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; } else if (server.repl_state == REPL_STATE_CONNECTING || replicaIsInHandshakeState()) { - undoConnectWithMaster(); + undoConnectWithPrimary(); server.repl_state = REPL_STATE_CONNECT; } else { return 0; @@ -2919,8 +2919,8 @@ int cancelReplicationHandshake(int reconnect) { /* try to re-connect without waiting for replicationCron, this is needed * for the "diskless loading short read" test. */ - serverLog(LL_NOTICE, "Reconnecting to MASTER %s:%d after failure", server.primary_host, server.primary_port); - connectWithMaster(); + serverLog(LL_NOTICE, "Reconnecting to PRIMARY %s:%d after failure", server.primary_host, server.primary_port); + connectWithPrimary(); return 1; } @@ -2937,7 +2937,7 @@ void replicationSetPrimary(char *ip, int port) { disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ /* Setting primary_host only after the call to freeClient since it calls - * replicationHandleMasterDisconnection which can trigger a re-connect + * replicationHandlePrimaryDisconnection which can trigger a re-connect * directly from within that call. 
*/ server.primary_host = sdsnew(ip); server.primary_port = port; @@ -2955,7 +2955,7 @@ void replicationSetPrimary(char *ip, int port) { /* Before destroying our primary state, create a cached primary using * our own parameters, to later PSYNC with the new primary. */ if (was_primary) { - replicationDiscardCachedMaster(); + replicationDiscardCachedPrimary(); replicationCachePrimaryUsingMyself(); } @@ -2968,8 +2968,8 @@ void replicationSetPrimary(char *ip, int port) { moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL); server.repl_state = REPL_STATE_CONNECT; - serverLog(LL_NOTICE, "Connecting to MASTER %s:%d", server.primary_host, server.primary_port); - connectWithMaster(); + serverLog(LL_NOTICE, "Connecting to PRIMARY %s:%d", server.primary_host, server.primary_port); + connectWithPrimary(); } /* Cancel replication, setting the instance as a primary itself. */ @@ -2981,11 +2981,11 @@ void replicationUnsetPrimary(void) { moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL); /* Clear primary_host first, since the freeClient calls - * replicationHandleMasterDisconnection which can attempt to re-connect. */ + * replicationHandlePrimaryDisconnection which can attempt to re-connect. */ sdsfree(server.primary_host); server.primary_host = NULL; if (server.primary) freeClient(server.primary); - replicationDiscardCachedMaster(); + replicationDiscardCachedPrimary(); cancelReplicationHandshake(0); /* When a replica is turned into a primary, the current replication ID * (that was inherited from the primary at synchronization time) is @@ -3043,8 +3043,8 @@ void replicationHandlePrimaryDisconnection(void) { /* Try to re-connect immediately rather than wait for replicationCron * waiting 1 second may risk backlog being recycled. 
*/ if (server.primary_host) { - serverLog(LL_NOTICE, "Reconnecting to MASTER %s:%d", server.primary_host, server.primary_port); - connectWithMaster(); + serverLog(LL_NOTICE, "Reconnecting to PRIMARY %s:%d", server.primary_host, server.primary_port); + connectWithPrimary(); } } @@ -3067,7 +3067,7 @@ void replicaofCommand(client *c) { if (server.primary_host) { replicationUnsetPrimary(); sds client = catClientInfoString(sdsempty(), c); - serverLog(LL_NOTICE, "MASTER MODE enabled (user request from '%s')", client); + serverLog(LL_NOTICE, "PRIMARY MODE enabled (user request from '%s')", client); sdsfree(client); } } else { @@ -3184,7 +3184,7 @@ void replicationSendAck(void) { } } -/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ +/* ---------------------- PRIMARY CACHING FOR PSYNC -------------------------- */ /* In order to implement partial synchronization we need to be able to cache * our primary's client structure after a transient disconnection. @@ -3198,10 +3198,10 @@ void replicationSendAck(void) { * * The other functions that will deal with the cached primary are: * - * replicationDiscardCachedMaster() that will make sure to kill the client + * replicationDiscardCachedPrimary() that will make sure to kill the client * as for some reason we don't want to use it in the future. * - * replicationResurrectCachedMaster() that is used after a successful PSYNC + * replicationResurrectCachedPrimary() that is used after a successful PSYNC * handshake in order to reactivate the cached primary. */ void replicationCachePrimary(client *c) { @@ -3227,7 +3227,7 @@ void replicationCachePrimary(client *c) { resetClient(c); /* Save the primary. Server.primary will be set to null later by - * replicationHandleMasterDisconnection(). */ + * replicationHandlePrimaryDisconnection(). */ server.cached_primary = server.primary; /* Invalidate the Peer ID cache. 
*/ @@ -3262,14 +3262,14 @@ void replicationCachePrimaryUsingMyself(void) { "the new primary with just a partial transfer."); /* This will be used to populate the field server.primary->reploff - * by replicationCreateMasterClient(). We'll later set the created + * by replicationCreatePrimaryClient(). We'll later set the created * primary as server.cached_primary, so the replica will use such * offset for PSYNC. */ server.primary_initial_offset = server.primary_repl_offset; /* The primary client we create can be set to any DBID, because * the new primary will start its replication stream with SELECT. */ - replicationCreateMasterClient(NULL, -1); + replicationCreatePrimaryClient(NULL, -1); /* Use our own ID / offset. */ memcpy(server.primary->replid, server.replid, sizeof(server.replid)); @@ -3282,7 +3282,7 @@ void replicationCachePrimaryUsingMyself(void) { /* Free a cached primary, called when there are no longer the conditions for * a partial resync on reconnection. */ -void replicationDiscardCachedMaster(void) { +void replicationDiscardCachedPrimary(void) { if (server.cached_primary == NULL) return; serverLog(LL_NOTICE, "Discarding previously cached primary state."); @@ -3297,7 +3297,7 @@ void replicationDiscardCachedMaster(void) { * This function is called when successfully setup a partial resynchronization * so the stream of data that we'll receive will start from where this * primary left. */ -void replicationResurrectCachedMaster(connection *conn) { +void replicationResurrectCachedPrimary(connection *conn) { server.primary = server.cached_primary; server.cached_primary = NULL; server.primary->conn = conn; @@ -3363,8 +3363,8 @@ int checkGoodReplicasStatus(void) { /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- * Synchronous replication design can be summarized in points: * - * - Masters have a global replication offset, used by PSYNC. - * - Master increment the offset every time new commands are sent to replicas. 
+ * - Primaries have a global replication offset, used by PSYNC. + * - Primaries increment the offset every time new commands are sent to replicas. * - Replicas ping back primary with the offset processed so far. * * So synchronous replication adds a new WAIT command in the form: @@ -3620,14 +3620,14 @@ void replicationCron(void) { /* Non blocking connection timeout? */ if (server.primary_host && (server.repl_state == REPL_STATE_CONNECTING || replicaIsInHandshakeState()) && (time(NULL) - server.repl_transfer_lastio) > server.repl_timeout) { - serverLog(LL_WARNING, "Timeout connecting to the MASTER..."); + serverLog(LL_WARNING, "Timeout connecting to the PRIMARY..."); cancelReplicationHandshake(1); } /* Bulk transfer I/O timeout? */ if (server.primary_host && server.repl_state == REPL_STATE_TRANSFER && (time(NULL) - server.repl_transfer_lastio) > server.repl_timeout) { - serverLog(LL_WARNING, "Timeout receiving bulk data from MASTER... If the problem persists try to set the " + serverLog(LL_WARNING, "Timeout receiving bulk data from PRIMARY... If the problem persists try to set the " "'repl-timeout' parameter in redis.conf to a larger value."); cancelReplicationHandshake(1); } @@ -3635,14 +3635,14 @@ void replicationCron(void) { /* Timed out primary when we are an already connected replica? 
*/ if (server.primary_host && server.repl_state == REPL_STATE_CONNECTED && (time(NULL) - server.primary->last_interaction) > server.repl_timeout) { - serverLog(LL_WARNING, "MASTER timeout: no data nor PING received..."); + serverLog(LL_WARNING, "PRIMARY timeout: no data nor PING received..."); freeClient(server.primary); } - /* Check if we should connect to a MASTER */ + /* Check if we should connect to a PRIMARY */ if (server.repl_state == REPL_STATE_CONNECT) { - serverLog(LL_NOTICE, "Connecting to MASTER %s:%d", server.primary_host, server.primary_port); - connectWithMaster(); + serverLog(LL_NOTICE, "Connecting to PRIMARY %s:%d", server.primary_host, server.primary_port); + connectWithPrimary(); } /* Send ACK to primary from time to time. @@ -4039,7 +4039,7 @@ void failoverCommand(client *c) { /* Failover cron function, checks coordinated failover state. * - * Implementation note: The current implementation calls replicationSetMaster() + * Implementation note: The current implementation calls replicationSetPrimary() * to start the failover request, this has some unintended side effects if the * failover doesn't work like blocked clients will be unblocked and replicas will * be disconnected. This could be optimized further. diff --git a/src/rio.c b/src/rio.c index 569d5ddcf0..408a931d17 100644 --- a/src/rio.c +++ b/src/rio.c @@ -309,7 +309,7 @@ void rioFreeConn(rio *r, sds *remaining) { } /* ------------------- File descriptor implementation ------------------ - * This target is used to write the RDB file to pipe, when the master just + * This target is used to write the RDB file to pipe, when the primary just * streams the data to the replicas without creating an RDB on-disk image * (diskless replication option). * It only implements writes. 
*/ diff --git a/src/valkey-benchmark.c b/src/valkey-benchmark.c index 802b4c5735..5fe707510f 100644 --- a/src/valkey-benchmark.c +++ b/src/valkey-benchmark.c @@ -837,7 +837,7 @@ static void showLatencyReport(void) { printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); if (config.cluster_mode) { - printf(" cluster mode: yes (%d masters)\n", config.cluster_node_count); + printf(" cluster mode: yes (%d primaries)\n", config.cluster_node_count); int m; for (m = 0; m < config.cluster_node_count; m++) { clusterNode *node = config.cluster_nodes[m]; @@ -1202,7 +1202,7 @@ static int fetchClusterConfiguration(void) { } } if (node->slots_count == 0) { - fprintf(stderr, "WARNING: Master node %s:%d has no slots, skipping...\n", node->ip, node->port); + fprintf(stderr, "WARNING: Primary node %s:%d has no slots, skipping...\n", node->ip, node->port); continue; } if (!addClusterNode(node)) { @@ -1747,7 +1747,7 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count); exit(1); } - printf("Cluster has %d master nodes:\n\n", config.cluster_node_count); + printf("Cluster has %d primary nodes:\n\n", config.cluster_node_count); int i = 0; for (; i < config.cluster_node_count; i++) { clusterNode *node = config.cluster_nodes[i]; @@ -1755,7 +1755,7 @@ int main(int argc, char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } - printf("Master %d: ", i); + printf("Primary %d: ", i); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); node->redis_config = getServerConfig(node->ip, node->port, NULL); diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 0f92a5fee7..5d506383c0 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2603,9 +2603,7 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i], "--lru-test") && !lastarg) { config.lru_test_mode = 1; config.lru_test_sample_size = strtoll(argv[++i], NULL, 10); - } 
else if (!strcmp(argv[i], "--slave")) { - config.replica_mode = 1; - } else if (!strcmp(argv[i], "--replica")) { + } else if (!strcmp(argv[i], "--slave") || !strcmp(argv[i], "--replica")) { config.replica_mode = 1; } else if (!strcmp(argv[i], "--stat")) { config.stat_mode = 1; @@ -3148,11 +3146,11 @@ void cliLoadPreferences(void) { * history file. Currently these commands are include: * - AUTH * - ACL DELUSER, ACL SETUSER, ACL GETUSER - * - CONFIG SET masterauth/masteruser/tls-key-file-pass/tls-client-key-file-pass/requirepass + * - CONFIG SET primaryauth/primaryuser/tls-key-file-pass/tls-client-key-file-pass/requirepass * - HELLO with [AUTH username password] * - MIGRATE with [AUTH password] or [AUTH2 username password] * - SENTINEL CONFIG SET sentinel-pass password, SENTINEL CONFIG SET sentinel-user username - * - SENTINEL SET auth-pass password, SENTINEL SET auth-user username */ + * - SENTINEL SET auth-pass password, SENTINEL SET auth-user username */ static int isSensitiveCommand(int argc, char **argv) { if (!strcasecmp(argv[0], "auth")) { return 1; @@ -3202,8 +3200,8 @@ static int isSensitiveCommand(int argc, char **argv) { (!strcasecmp(argv[3], "sentinel-pass") || !strcasecmp(argv[3], "sentinel-user"))) { return 1; } - /* SENTINEL SET auth-pass password - * SENTINEL SET auth-user username */ + /* SENTINEL SET auth-pass password + * SENTINEL SET auth-user username */ if (!strcasecmp(argv[1], "set") && (!strcasecmp(argv[3], "auth-pass") || !strcasecmp(argv[3], "auth-user"))) { return 1; } @@ -4165,7 +4163,7 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes, clusterManagerNode **offenders = NULL; int score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count, NULL, NULL); if (score == 0) goto cleanup; - clusterManagerLogInfo(">>> Trying to optimize slaves allocation " + clusterManagerLogInfo(">>> Trying to optimize replicas allocation " "for anti-affinity\n"); int node_len = cluster_manager.nodes->len; int maxiter = 500 * 
node_len; // Effort is proportional to cluster size... @@ -4219,9 +4217,9 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes, if (perfect) msg = "[OK] Perfect anti-affinity obtained!"; else if (score >= 10000) - msg = ("[WARNING] Some slaves are in the same host as their master"); + msg = ("[WARNING] Some replicas are in the same host as their primary"); else - msg = ("[WARNING] Some slaves of the same master are in the same host"); + msg = ("[WARNING] Some replicas of the same primary are in the same host"); clusterManagerLog(log_level, "%s\n", msg); cleanup: zfree(offenders); @@ -4447,13 +4445,13 @@ static void clusterManagerShowClusterInfo(void) { return; }; if (reply != NULL) freeReplyObject(reply); - printf("%s:%d (%s...) -> %lld keys | %d slots | %d slaves.\n", node->ip, node->port, name, dbsize, + printf("%s:%d (%s...) -> %lld keys | %d slots | %d replicas.\n", node->ip, node->port, name, dbsize, node->slots_count, replicas); primaries++; keys += dbsize; } } - clusterManagerLogOk("[OK] %lld keys in %d masters.\n", keys, primaries); + clusterManagerLogOk("[OK] %lld keys in %d primaries.\n", keys, primaries); float keys_per_slot = keys / (float)CLUSTER_MANAGER_SLOTS; printf("%.2f keys per slot on average.\n", keys_per_slot); } @@ -4993,7 +4991,8 @@ clusterManagerMoveSlot(clusterManagerNode *source, clusterManagerNode *target, i * unblocked with the role change error. 
*/ success = clusterManagerSetSlot(source, target, slot, "node", err); if (!success && err) { - const char *acceptable[] = {"ERR Please use SETSLOT only with masters.", "UNBLOCKED"}; + const char *acceptable[] = {"ERR Please use SETSLOT only with masters.", + "ERR Please use SETSLOT only with primaries.", "UNBLOCKED"}; for (size_t i = 0; i < sizeof(acceptable) / sizeof(acceptable[0]); i++) { if (!strncmp(*err, acceptable[i], strlen(acceptable[i]))) { zfree(*err); @@ -5264,7 +5263,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, char * currentNode->flags |= CLUSTER_MANAGER_FLAG_DISCONNECT; else if (strcmp(flag, "fail") == 0) currentNode->flags |= CLUSTER_MANAGER_FLAG_FAIL; - else if (strcmp(flag, "slave") == 0) { + else if ((strcmp(flag, "slave") == 0) || (strcmp(flag, "replica") == 0)) { currentNode->flags |= CLUSTER_MANAGER_FLAG_REPLICA; if (primary_id != NULL) { if (currentNode->replicate) sdsfree(currentNode->replicate); @@ -5352,7 +5351,7 @@ static int clusterManagerLoadInfoFromNode(clusterManagerNode *node) { clusterManagerNode *primary = clusterManagerNodeByName(n->replicate); if (primary == NULL) { clusterManagerLogWarn("*** WARNING: %s:%d claims to be " - "slave of unknown node ID %s.\n", + "replica of unknown node ID %s.\n", n->ip, n->port, n->replicate); } else primary->replicas_count++; @@ -5712,10 +5711,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { if (cluster_manager.unreachable_primaries > 0 && !force_fix) { clusterManagerLogWarn( - "*** Fixing slots coverage with %d unreachable masters is dangerous: valkey-cli will assume that slots " - "about masters that are not reachable are not covered, and will try to reassign them to the reachable " + "*** Fixing slots coverage with %d unreachable primaries is dangerous: valkey-cli will assume that slots " + "about primaries that are not reachable are not covered, and will try to reassign them to the reachable " "nodes. 
This can cause data loss and is rarely what you want to do. If you really want to proceed use the " - "--cluster-fix-with-unreachable-masters option.\n", + "--cluster-fix-with-unreachable-primaries option.\n", cluster_manager.unreachable_primaries); exit(1); } @@ -5906,10 +5905,10 @@ static int clusterManagerFixOpenSlot(int slot) { if (cluster_manager.unreachable_primaries > 0 && !force_fix) { clusterManagerLogWarn( - "*** Fixing open slots with %d unreachable masters is dangerous: valkey-cli will assume that slots about " - "masters that are not reachable are not covered, and will try to reassign them to the reachable nodes. " + "*** Fixing open slots with %d unreachable primaries is dangerous: valkey-cli will assume that slots about " + "primaries that are not reachable are not covered, and will try to reassign them to the reachable nodes. " "This can cause data loss and is rarely what you want to do. If you really want to proceed use the " - "--cluster-fix-with-unreachable-masters option.\n", + "--cluster-fix-with-unreachable-primaries option.\n", cluster_manager.unreachable_primaries); exit(1); } @@ -6420,7 +6419,7 @@ static int clusterManagerCheckCluster(int quiet) { static clusterManagerNode *clusterNodeForResharding(char *id, clusterManagerNode *target, int *raise_err) { clusterManagerNode *node = NULL; const char *invalid_node_msg = "*** The specified node (%s) is not known " - "or not a master, please retry.\n"; + "or not a primary, please retry.\n"; node = clusterManagerNodeByName(id); *raise_err = 0; if (!node || node->flags & CLUSTER_MANAGER_FLAG_REPLICA) { @@ -6642,7 +6641,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) { int primaries_count = CLUSTER_MANAGER_PRIMARIES_COUNT(node_len, replicas); if (primaries_count < 3) { clusterManagerLogErr("*** ERROR: Invalid configuration for cluster creation.\n" - "*** Valkey Cluster requires at least 3 master nodes.\n" + "*** Valkey Cluster requires at least 3 primary nodes.\n" "*** This is not 
possible with %d nodes and %d replicas per node.", node_len, replicas); clusterManagerLogErr("\n*** At least %d nodes are required.\n", 3 * (replicas + 1)); @@ -6696,7 +6695,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) { long last = lround(cursor + slots_per_node - 1); if (last > CLUSTER_MANAGER_SLOTS || i == (primaries_count - 1)) last = CLUSTER_MANAGER_SLOTS - 1; if (last < first) last = first; - printf("Master[%d] -> Slots %ld - %ld\n", i, first, last); + printf("Primary[%d] -> Slots %ld - %ld\n", i, first, last); primary->slots_count = 0; for (j = first; j <= last; j++) { primary->slots[j] = 1; @@ -6907,13 +6906,13 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { if (primary_id != NULL) { primary_node = clusterManagerNodeByName(primary_id); if (primary_node == NULL) { - clusterManagerLogErr("[ERR] No such master ID %s\n", primary_id); + clusterManagerLogErr("[ERR] No such primary ID %s\n", primary_id); return 0; } } else { primary_node = clusterManagerNodeWithLeastReplicas(); assert(primary_node != NULL); - printf("Automatically selected master %s:%d\n", primary_node->ip, primary_node->port); + printf("Automatically selected primary %s:%d\n", primary_node->ip, primary_node->port); } } @@ -7336,7 +7335,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) { float w = atof(++p); clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name); if (n == NULL) { - clusterManagerLogErr("*** No such master node %s\n", name); + clusterManagerLogErr("*** No such primary node %s\n", name); result = 0; goto cleanup; } @@ -8130,13 +8129,13 @@ unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int * if (send_sync) { /* Send the SYNC command. */ if (cliWriteConn(c, "SYNC\r\n", 6) != 6) { - fprintf(stderr, "Error writing to master\n"); + fprintf(stderr, "Error writing to primary\n"); exit(1); } } else { /* We have written the command into c->obuf before. 
*/ if (cliWriteConn(c, "", 0) != 0) { - fprintf(stderr, "Error writing to master\n"); + fprintf(stderr, "Error writing to primary\n"); exit(1); } } @@ -8155,7 +8154,7 @@ unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int * } *p = '\0'; if (buf[0] == '-') { - fprintf(stderr, "SYNC with master failed: %s\n", buf); + fprintf(stderr, "SYNC with primary failed: %s\n", buf); exit(1); } @@ -8207,18 +8206,18 @@ static void replicaMode(int send_sync) { memset(lastbytes, 0, RDB_EOF_MARK_SIZE); usemark = 1; fprintf(stderr, - "%s with master, discarding " + "%s with primary, discarding " "bytes of bulk transfer until EOF marker...\n", info); } else if (out_full_mode == 1 && payload != 0) { /* SYNC without EOF marker or PSYNC +FULLRESYNC. */ fprintf(stderr, - "%s with master, discarding %llu " + "%s with primary, discarding %llu " "bytes of bulk transfer...\n", info, payload); } else if (out_full_mode == 0 && payload == 0) { /* PSYNC +CONTINUE (no RDB payload). */ - fprintf(stderr, "%s with master...\n", info); + fprintf(stderr, "%s with primary...\n", info); } /* Discard the payload. */ @@ -8247,12 +8246,12 @@ static void replicaMode(int send_sync) { if (usemark) { unsigned long long offset = ULLONG_MAX - payload; - fprintf(stderr, "%s done after %llu bytes. Logging commands from master.\n", info, offset); + fprintf(stderr, "%s done after %llu bytes. Logging commands from primary.\n", info, offset); /* put the replica online */ sleep(1); sendReplconf("ACK", "0"); } else - fprintf(stderr, "%s done. Logging commands from master.\n", info); + fprintf(stderr, "%s done. Logging commands from primary.\n", info); /* Now we can use hiredis to read the incoming protocol. 
*/ config.output = OUTPUT_CSV; @@ -8289,11 +8288,11 @@ static void getRDB(clusterManagerNode *node) { memset(lastbytes, 0, RDB_EOF_MARK_SIZE); usemark = 1; fprintf(stderr, - "SYNC sent to master, writing bytes of bulk transfer " + "SYNC sent to primary, writing bytes of bulk transfer " "until EOF marker to '%s'\n", filename); } else { - fprintf(stderr, "SYNC sent to master, writing %llu bytes to '%s'\n", payload, filename); + fprintf(stderr, "SYNC sent to primary, writing %llu bytes to '%s'\n", payload, filename); } int write_to_stdout = !strcmp(filename, "-"); diff --git a/src/valkeymodule.h b/src/valkeymodule.h index a49f83b766..16f7929081 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -1469,7 +1469,7 @@ VALKEYMODULE_API int (*ValkeyModule_SendClusterMessage)(ValkeyModuleCtx *ctx, VALKEYMODULE_API int (*ValkeyModule_GetClusterNodeInfo)(ValkeyModuleCtx *ctx, const char *id, char *ip, - char *master_id, + char *primary_id, int *port, int *flags) VALKEYMODULE_ATTR; VALKEYMODULE_API char **(*ValkeyModule_GetClusterNodesList)(ValkeyModuleCtx *ctx, size_t *numnodes)VALKEYMODULE_ATTR; diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index e64394ad1b..f56fe0a1dc 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -51,7 +51,7 @@ start_server {tags {"repl network external:skip"}} { test {Slave is able to detect timeout during handshake} { wait_for_condition 50 1000 { - [log_file_matches $slave_log "*Timeout connecting to the MASTER*"] + [log_file_matches $slave_log "*Timeout connecting to the PRIMARY*"] } else { fail "Replica is not able to detect timeout" } @@ -1390,7 +1390,7 @@ start_server {tags {"repl" "external:skip"}} { # Check we got the warning logs about the GET command. 
verify_log_message 0 "*Replica generated a reply to command 'get', disconnecting it: *" $lines - verify_log_message 0 "*== CRITICAL == This master is sending an error to its replica: *" $lines + verify_log_message 0 "*== CRITICAL == This primary is sending an error to its replica: *" $lines verify_log_message 0 "*Replica can't interact with the keyspace*" $lines $rd close diff --git a/tests/unit/auth.tcl b/tests/unit/auth.tcl index ee5d2db0fc..5c2071c176 100644 --- a/tests/unit/auth.tcl +++ b/tests/unit/auth.tcl @@ -74,7 +74,7 @@ start_server {tags {"auth_binary_password external:skip"}} { $slave slaveof $master_host $master_port # Verify replica is not able to sync with master - wait_for_log_messages 0 {"*Unable to AUTH to MASTER*"} $loglines 1000 10 + wait_for_log_messages 0 {"*Unable to AUTH to PRIMARY*"} $loglines 1000 10 assert_equal {down} [s 0 master_link_status] # Test replica with the correct primaryauth diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 8ceb03b502..a12a3ba23d 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -827,9 +827,9 @@ start_server {tags {"introspection"}} { # Something like `valkey-server --some-config --config-value1 --config-value2 --loglevel debug` would break, # because if you want to pass a value to a config starting with `--`, it can only be a single value. 
catch {exec src/valkey-server --replicaof 127.0.0.1 abc} err - assert_match {*'replicaof "127.0.0.1" "abc"'*Invalid master port*} $err + assert_match {*'replicaof "127.0.0.1" "abc"'*Invalid primary port*} $err catch {exec src/valkey-server --replicaof --127.0.0.1 abc} err - assert_match {*'replicaof "--127.0.0.1" "abc"'*Invalid master port*} $err + assert_match {*'replicaof "--127.0.0.1" "abc"'*Invalid primary port*} $err catch {exec src/valkey-server --replicaof --127.0.0.1 --abc} err assert_match {*'replicaof "--127.0.0.1"'*wrong number of arguments*} $err } {} {external:skip} From b33f932c5670749bea67933c10578336c67f16e6 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Mon, 17 Jun 2024 21:08:08 -0700 Subject: [PATCH 06/53] Add missing commas from debug command (#662) The missing commas caused the `DEBUG HELP` to be compressed onto a single line. Signed-off-by: Madelyn Olson --- src/debug.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/debug.c b/src/debug.c index d9fe93c7d4..c625ab5150 100644 --- a/src/debug.c +++ b/src/debug.c @@ -430,8 +430,8 @@ void debugCommand(client *c) { "DROP-CLUSTER-PACKET-FILTER ", " Drop all packets that match the filtered type. Set to -1 allow all packets.", "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>", - " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type." - " When set to 1, the cluster link is closed after dropping a packet based on the filter." 
+ " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.", + " When set to 1, the cluster link is closed after dropping a packet based on the filter.", "OOM", " Crash the server simulating an out-of-memory error.", "PANIC", From a2cc2fe26ddf5fac46476ec1f958dea56e35a513 Mon Sep 17 00:00:00 2001 From: Binbin Date: Tue, 18 Jun 2024 12:18:57 +0800 Subject: [PATCH 07/53] Fix memory leak when loading slot migrations states fails (#658) When we goto eoferr, we need to release the auxkey and auxval, this is a cleanup, also explicitly check that decoder return value is C_ERR. Introduced in #586. Signed-off-by: Binbin --- src/rdb.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index ad7da17ea1..6ce7871031 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3149,7 +3149,11 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (de != NULL) { handled = 1; rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de); - if (codec->decoder(rdbflags, auxval->ptr) < 0) goto eoferr; + if (codec->decoder(rdbflags, auxval->ptr) == C_ERR) { + decrRefCount(auxkey); + decrRefCount(auxval); + goto eoferr; + } } } From be2c3216824207613cf00b1e5579ee510b7fadc2 Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Tue, 18 Jun 2024 22:04:06 +0300 Subject: [PATCH 08/53] Support RDB compatibility with Redis 7.2.4 RDB format (#665) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR makes our current RDB format compatible with the Redis 7.2.4 RDB format. There are 2 changes introduced in this PR: 1. Move back the RDB version to 11 2. Make slot info section persist as AUX data instead of a dedicated section. We have introduced slot-info as part of the work to replace cluster metadata with slot specific dictionaries.
This caused us to bump the RDB version and thus we prevent downgrade (which is conceptually O.K but better be prevented). We do not require the slot-info section to exist, so making it an AUX section will help support version downgrade from Valkey 8. fixes: [#645](https://github.com/valkey-io/valkey/issues/645) NOTE: tested manually by: 1. connecting Redis 7.2.4 replica to a Valkey 8(RC) 2. upgrade/downgrade Redis 7.2.4 cluster and Valkey 8(RC) cluster --------- Signed-off-by: ranshid Co-authored-by: Viktor Söderqvist --- src/rdb.c | 47 ++++++++++++++++++++++-------------------- src/rdb.h | 3 +-- src/valkey-check-rdb.c | 6 ------ 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 6ce7871031..07fc70c16d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1349,15 +1349,14 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it); /* Save slot info. */ if (server.cluster_enabled && curr_slot != last_slot) { - if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr; - written += res; - if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr; - written += res; + sds slot_info = sdscatprintf(sdsempty(), "%i,%lu,%lu", curr_slot, kvstoreDictSize(db->keys, curr_slot), + kvstoreDictSize(db->expires, curr_slot)); + if ((res = rdbSaveAuxFieldStrStr(rdb, "slot-info", slot_info)) < 0) { + sdsfree(slot_info); + goto werr; + } last_slot = curr_slot; + sdsfree(slot_info); } sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); @@ -3078,20 +3077,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if ((expires_size = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr; should_expand_db = 1; continue; /* Read next opcode.
*/ - } else if (type == RDB_OPCODE_SLOT_INFO) { - uint64_t slot_id, slot_size, expires_slot_size; - if ((slot_id = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr; - if ((slot_size = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr; - if ((expires_slot_size = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr; - if (!server.cluster_enabled) { - continue; /* Ignore gracefully. */ - } - /* In cluster mode we resize individual slot specific dictionaries based on the number of keys that slot - * holds. */ - kvstoreDictExpand(db->keys, slot_id, slot_size); - kvstoreDictExpand(db->expires, slot_id, expires_slot_size); - should_expand_db = 0; - continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. Implementations of RDB loading @@ -3141,6 +3126,24 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (isbase) serverLog(LL_NOTICE, "RDB is base AOF"); } else if (!strcasecmp(auxkey->ptr, "redis-bits")) { /* Just ignored. */ + } else if (!strcasecmp(auxkey->ptr, "slot-info")) { + int slot_id; + unsigned long slot_size, expires_slot_size; + /* Try to parse the slot information. In case the number of parsed arguments is smaller than expected + * we'll fail the RDB load. */ + if (sscanf(auxval->ptr, "%i,%lu,%lu", &slot_id, &slot_size, &expires_slot_size) < 3) { + decrRefCount(auxkey); + decrRefCount(auxval); + goto eoferr; + } + + if (server.cluster_enabled) { + /* In cluster mode we resize individual slot specific dictionaries based on the number of keys that + * slot holds. */ + kvstoreDictExpand(db->keys, slot_id, slot_size); + kvstoreDictExpand(db->expires, slot_id, expires_slot_size); + should_expand_db = 0; + } } else { /* Check if this is a dynamic aux field */ int handled = 0; diff --git a/src/rdb.h b/src/rdb.h index 393d2f658a..3b17cbe9de 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -38,7 +38,7 @@ /* The current RDB version. 
When the format changes in a way that is no longer * backward compatible this number gets incremented. */ -#define RDB_VERSION 12 +#define RDB_VERSION 11 /* Defines related to the dump file format. To store 32 bits lengths for short * keys requires a lot of space, so we check the most significant 2 bits of @@ -101,7 +101,6 @@ #define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ -#define RDB_OPCODE_SLOT_INFO 244 /* Individual slot info, such as slot id and size (cluster mode only). */ #define RDB_OPCODE_FUNCTION2 245 /* function library data */ #define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/valkey-check-rdb.c b/src/valkey-check-rdb.c index 7e93f70360..0b2fdbb666 100644 --- a/src/valkey-check-rdb.c +++ b/src/valkey-check-rdb.c @@ -256,12 +256,6 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { if ((db_size = rdbLoadLen(&rdb, NULL)) == RDB_LENERR) goto eoferr; if ((expires_size = rdbLoadLen(&rdb, NULL)) == RDB_LENERR) goto eoferr; continue; /* Read type again. */ - } else if (type == RDB_OPCODE_SLOT_INFO) { - uint64_t slot_id, slot_size, expires_slot_size; - if ((slot_id = rdbLoadLen(&rdb, NULL)) == RDB_LENERR) goto eoferr; - if ((slot_size = rdbLoadLen(&rdb, NULL)) == RDB_LENERR) goto eoferr; - if ((expires_slot_size = rdbLoadLen(&rdb, NULL)) == RDB_LENERR) goto eoferr; - continue; /* Read type again. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. 
Implementations of RDB loading From ae2d4217e147996bd6c546f559aa564f873f9203 Mon Sep 17 00:00:00 2001 From: kukey Date: Wed, 19 Jun 2024 08:48:58 +0800 Subject: [PATCH 09/53] Add new SCRIPT SHOW subcommand to dump script via sha1 (#617) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In some scenarios, the business may not be able to find the previously used Lua script and only have a SHA signature. Or there are multiple identical evalsha's args in monitor/slowlog, and admin is not able to distinguish the script body. Add a new script subcommand to show the contents of a script given the script's sha1. Returns a NOSCRIPT error if the script is not present in the cache. Usage: `SCRIPT SHOW sha1` Complexity: `O(1)` Closes #604. Doc PR: https://github.com/valkey-io/valkey-doc/pull/143 --------- Signed-off-by: wei.kukey Signed-off-by: Madelyn Olson Co-authored-by: Madelyn Olson Co-authored-by: Binbin Co-authored-by: Viktor Söderqvist --- src/commands.def | 23 +++++++++++++++++++++++ src/commands/script-show.json | 27 +++++++++++++++++++++++++++ src/eval.c | 12 ++++++++++++ src/server.c | 2 +- tests/unit/scripting.tcl | 15 +++++++++++++++ 5 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/commands/script-show.json diff --git a/src/commands.def b/src/commands.def index cb7fd73cc5..989dd1864d 100644 --- a/src/commands.def +++ b/src/commands.def @@ -5333,6 +5333,28 @@ struct COMMAND_ARG SCRIPT_LOAD_Args[] = { {MAKE_ARG("script",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, }; +/********** SCRIPT SHOW ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* SCRIPT SHOW history */ +#define SCRIPT_SHOW_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* SCRIPT SHOW tips */ +#define SCRIPT_SHOW_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* SCRIPT SHOW key specs */ +#define SCRIPT_SHOW_Keyspecs NULL +#endif + +/* SCRIPT SHOW argument table */ +struct COMMAND_ARG SCRIPT_SHOW_Args[] = {
+{MAKE_ARG("sha1",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + /* SCRIPT command table */ struct COMMAND_STRUCT SCRIPT_Subcommands[] = { {MAKE_CMD("debug","Sets the debug mode of server-side Lua scripts.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,SCRIPT_DEBUG_History,0,SCRIPT_DEBUG_Tips,0,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,SCRIPT_DEBUG_Keyspecs,0,NULL,1),.args=SCRIPT_DEBUG_Args}, @@ -5341,6 +5363,7 @@ struct COMMAND_STRUCT SCRIPT_Subcommands[] = { {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,SCRIPT_HELP_History,0,SCRIPT_HELP_Tips,0,scriptCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING,SCRIPT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates a server-side Lua script during execution.","O(1)","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,SCRIPT_KILL_History,0,SCRIPT_KILL_Tips,2,scriptCommand,2,CMD_NOSCRIPT|CMD_ALLOW_BUSY,ACL_CATEGORY_SCRIPTING,SCRIPT_KILL_Keyspecs,0,NULL,0)}, {MAKE_CMD("load","Loads a server-side Lua script to the script cache.","O(N) with N being the length in bytes of the script body.","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,SCRIPT_LOAD_History,0,SCRIPT_LOAD_Tips,2,scriptCommand,3,CMD_NOSCRIPT|CMD_STALE,ACL_CATEGORY_SCRIPTING,SCRIPT_LOAD_Keyspecs,0,NULL,1),.args=SCRIPT_LOAD_Args}, +{MAKE_CMD("show","Show server-side Lua script in the script cache.","O(1).","8.0.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,SCRIPT_SHOW_History,0,SCRIPT_SHOW_Tips,0,scriptCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,SCRIPT_SHOW_Keyspecs,0,NULL,1),.args=SCRIPT_SHOW_Args}, {0} }; diff --git a/src/commands/script-show.json b/src/commands/script-show.json new file mode 100644 index 0000000000..f22fa29675 --- /dev/null +++ b/src/commands/script-show.json @@ -0,0 +1,27 @@ +{ + "SHOW": { + "summary": "Show server-side Lua script in 
the script cache.", + "complexity": "O(1).", + "group": "scripting", + "since": "8.0.0", + "arity": 3, + "container": "SCRIPT", + "function": "scriptCommand", + "command_flags": [ + "NOSCRIPT" + ], + "acl_categories": [ + "SCRIPTING" + ], + "arguments": [ + { + "name": "sha1", + "type": "string" + } + ], + "reply_schema": { + "description": "Lua script if sha1 hash exists in script cache.", + "type": "string" + } + } +} diff --git a/src/eval.c b/src/eval.c index e747c233e8..f4d09a5aa6 100644 --- a/src/eval.c +++ b/src/eval.c @@ -682,6 +682,8 @@ void scriptCommand(client *c) { " Kill the currently executing Lua script.", "LOAD