Skip to content

Commit

Permalink
Merge branch 'unstable' of https://github.com/pingxie/valkey into rep…
Browse files Browse the repository at this point in the history
…l-slot-states-in-rdb
  • Loading branch information
PingXie committed Jun 7, 2024
2 parents d1799fc + 54c9747 commit 19d3707
Show file tree
Hide file tree
Showing 54 changed files with 3,474 additions and 3,446 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
- name: Check for formatting changes
if: ${{ steps.clang-format.outputs.diff }}
run: |
echo "Code is not formatted correctly. Here is the diff:"
echo "ERROR: Code is not formatted correctly. Here is the diff:"
# Decode the Base64 diff to display it
echo "${{ steps.clang-format.outputs.diff }}" | base64 --decode
exit 1
Expand Down
23 changes: 12 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -904,12 +904,12 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
bioCreateFsyncJob(fd, server.master_repl_offset, 1);
bioCreateFsyncJob(fd, server.primary_repl_offset, 1);
}

/* Close the fd on the basis of aof_background_fsync. */
void aof_background_fsync_and_close(int fd) {
bioCreateCloseAofJob(fd, server.master_repl_offset, 1);
bioCreateCloseAofJob(fd, server.primary_repl_offset, 1);
}

/* Kills an AOFRW child process if exists */
Expand Down Expand Up @@ -1069,11 +1069,12 @@ void flushAppendOnlyFile(int force) {
} else {
/* All data is fsync'd already: Update fsynced_reploff_pending just in case.
* This is needed to avoid a WAITAOF hang in case a module used RM_Call with the NO_AOF flag,
* in which case master_repl_offset will increase but fsynced_reploff_pending won't be updated
* in which case primary_repl_offset will increase but fsynced_reploff_pending won't be updated
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset,
memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1243,7 +1244,7 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always", latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
Expand Down Expand Up @@ -1355,7 +1356,7 @@ struct client *createAOFClient(void) {
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */

/*
* The AOF client should never be blocked (unlike master
* The AOF client should never be blocked (unlike primary
* replication connection).
* This is because blocking the AOF client might cause
* deadlock (because potentially no one will unblock it).
Expand All @@ -1365,9 +1366,9 @@ struct client *createAOFClient(void) {
*/
c->flags = CLIENT_DENY_BLOCKING;

/* We set the fake client as a slave waiting for the synchronization
/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
return c;
}

Expand Down Expand Up @@ -2320,7 +2321,7 @@ int rewriteAppendOnlyFile(char *filename) {

if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(SLAVE_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
errno = error;
goto werr;
}
Expand Down Expand Up @@ -2403,12 +2404,12 @@ int rewriteAppendOnlyFileBackground(void) {
* between updates to `fsynced_reploff_pending` of the worker thread, belonging
* to the previous AOF, and the new one. This concern is specific for a full
* sync scenario where we don't wanna risk the ACKed replication offset
* jumping backwards or forward when switching to a different master. */
* jumping backwards or forward when switching to a different primary. */
bioDrainWorker(BIO_AOF_FSYNC);

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down
6 changes: 3 additions & 3 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void initClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
Expand Down Expand Up @@ -265,8 +265,8 @@ void replyToClientsBlockedOnShutdown(void) {

/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
* is called when a master turns into a slave.
* in an instance which turns from primary to replica is unsafe, so this function
* is called when a primary turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
Expand Down
55 changes: 27 additions & 28 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,6 @@ void clusterCommand(client *c) {
}
kvstoreReleaseDictIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
int j;
Expand All @@ -911,15 +910,15 @@ void clusterCommand(client *c) {
return;
}

if (clusterNodeIsSlave(n)) {
if (clusterNodeIsReplica(n)) {
addReplyError(c, "The specified node is not a master");
return;
}

/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyArrayLen(c, clusterNodeNumSlaves(n));
for (j = 0; j < clusterNodeNumSlaves(n); j++) {
sds ni = clusterGenNodeDescription(c, clusterNodeGetSlave(n, j), shouldReturnTlsInfo());
addReplyArrayLen(c, clusterNodeNumReplicas(n));
for (j = 0; j < clusterNodeNumReplicas(n); j++) {
sds ni = clusterGenNodeDescription(c, clusterNodeGetReplica(n, j), shouldReturnTlsInfo());
addReplyBulkCString(c, ni);
sdsfree(ni);
}
Expand Down Expand Up @@ -1048,8 +1047,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsMaster(myself) || c->flags & CLIENT_READONLY) {
if (n == clusterNodeGetMaster(myself) && getMigratingSlotDest(slot) != NULL) {
if (clusterNodeIsPrimary(myself) || c->flags & CLIENT_READONLY) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
importing_slot = 1;
Expand Down Expand Up @@ -1122,7 +1121,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
/* MIGRATE always works in the context of the local node if the slot
* is open (migrating or importing state). We need to be able to freely
* move keys among instances in this case. */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand && clusterNodeIsMaster(myself)) {
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand && clusterNodeIsPrimary(myself)) {
return myself;
}

Expand Down Expand Up @@ -1152,13 +1151,13 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
}
}

/* Handle the read-only client case reading from a slave: if this
* node is a slave and the request is about a hash slot our master
/* Handle the read-only client case reading from a replica: if this
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) &&
clusterNodeGetMaster(myself) == n) {
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
}

Expand Down Expand Up @@ -1204,7 +1203,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* to detect timeouts, in order to handle the following case:
*
* 1) A client blocks with BLPOP or similar blocking operation.
* 2) The master migrates the hash slot elsewhere or turns into a slave.
* 2) The primary migrates the hash slot elsewhere or turns into a replica.
* 3) The client may remain blocked forever (or up to the max timeout time)
* waiting for a key change that will never happen.
*
Expand Down Expand Up @@ -1240,8 +1239,8 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {

/* if the client is read-only and attempting to access key that our
* replica can handle, allow it. */
if ((c->flags & CLIENT_READONLY) && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsSlave(myself) &&
clusterNodeGetMaster(myself) == node) {
if ((c->flags & CLIENT_READONLY) && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == node) {
node = myself;
}

Expand Down Expand Up @@ -1331,9 +1330,9 @@ int isNodeAvailable(clusterNode *node) {
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
int i, nested_elements = 3; /* slots (2) + primary addr (1) */
for (i = 0; i < clusterNodeNumReplicas(node); i++) {
if (!isNodeAvailable(clusterNodeGetReplica(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
Expand All @@ -1342,11 +1341,11 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
addNodeToNodeReply(c, node);

/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
for (i = 0; i < clusterNodeNumReplicas(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
if (!isNodeAvailable(clusterNodeGetReplica(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetReplica(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
Expand All @@ -1364,7 +1363,7 @@ void clearCachedClusterSlotsResponse(void) {
sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
clusterNode *n = NULL;
int num_masters = 0, start = -1;
int num_primaries = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
Expand All @@ -1380,13 +1379,13 @@ sds generateClusterSlotResponse(void) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(recording_client, n, start, i - 1);
num_masters++;
num_primaries++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_primaries);
sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_response;
Expand All @@ -1405,8 +1404,8 @@ int verifyCachedClusterSlotsResponse(sds cached_response) {
void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) 1) primary IP
* 2) primary port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
Expand Down Expand Up @@ -1446,8 +1445,8 @@ void askingCommand(client *c) {
}

/* The READONLY command is used by clients to enter the read-only mode.
* In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */
* In this mode replicas will not redirect clients as long as clients access
* with read-only commands to keys that are served by the replica's primary. */
void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down
12 changes: 6 additions & 6 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int clusterCommandSpecial(client *c);
const char **clusterCommandExtendedHelp(void);

int clusterAllowFailoverCmd(client *c);
void clusterPromoteSelfToMaster(void);
void clusterPromoteSelfToPrimary(void);
int clusterManualFailoverTimeLimit(void);

void clusterCommandSlots(client *c);
Expand All @@ -83,18 +83,18 @@ int getClusterSize(void);
int getMyShardSlotCount(void);
int handleDebugClusterCommand(client *c);
int clusterNodePending(clusterNode *node);
int clusterNodeIsMaster(clusterNode *n);
int clusterNodeIsPrimary(clusterNode *n);
char **getClusterNodesList(size_t *numnodes);
char *clusterNodeIp(clusterNode *node);
int clusterNodeIsSlave(clusterNode *node);
clusterNode *clusterNodeGetMaster(clusterNode *node);
int clusterNodeIsReplica(clusterNode *node);
clusterNode *clusterNodeGetPrimary(clusterNode *node);
char *clusterNodeGetName(clusterNode *node);
int clusterNodeTimedOut(clusterNode *node);
int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
char *clusterNodeGetShardId(clusterNode *node);
int clusterNodeNumSlaves(clusterNode *node);
clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx);
int clusterNodeNumReplicas(clusterNode *node);
clusterNode *clusterNodeGetReplica(clusterNode *node, int slave_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
Expand Down
Loading

0 comments on commit 19d3707

Please sign in to comment.