Skip to content

Commit

Permalink
Move the handler logic to replication
Browse files Browse the repository at this point in the history
  • Loading branch information
enjoy-binbin committed Nov 27, 2024
1 parent d88f448 commit 8239112
Show file tree
Hide file tree
Showing 13 changed files with 683 additions and 845 deletions.
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,6 @@ void clusterSlotPendingDelete(void);
int testInjectError(const char *error);
char *getInjectOptionValue(const char *option);
list *createSlotRangeList(void);
void resetSlotSyncLinkForReconnect(void *link);

#endif /* __CLUSTER_H */
24 changes: 12 additions & 12 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ int isSlotInClusterSlotSyncLinkList(int slot);
int isSlotInPendingDelete(int slot);
clusterSlotSyncLink *createSlotSyncLink(void);
void initSlotSyncLink(clusterSlotSyncLink *link, clusterNode *node, list *slot_ranges);
const char *slotSyncStateToString(slotSyncState state);
const char *slotSyncStateToString(int repl_state);
sds formatSlotSyncImportingSlots(void);
void clusterCommandSlotLinkList(client *c);
void clusterCommandSlotLinkKill(client *c, const char *linkname);
Expand Down Expand Up @@ -6995,13 +6995,13 @@ int clusterCommandSpecial(client *c) {
/* CLUSTER SLOTSYNC <start slot> <end slot> [<start slot> <end slot> ...]
*
* This command is sent to the target node, which must be a primary node, and
* it will try to synchronize slot data from the slot owner. */
* it will try to synchronize slot data from the slot primary. */
if (!nodeIsPrimary(myself)) {
addReplyError(c, "Myself should be a primary.");
return 1;
}

clusterNode *n = NULL;
clusterNode *source_node = NULL;

/* Build the slot range list by the command arguments. */
list *slot_ranges = createSlotRangeList();
Expand All @@ -7028,15 +7028,15 @@ int clusterCommandSpecial(client *c) {
listRelease(slot_ranges);
return 1;
}
if (!n) {
n = server.cluster->slots[j];
} else if (n != server.cluster->slots[j]) {
if (!source_node) {
source_node = server.cluster->slots[j];
} else if (source_node != server.cluster->slots[j]) {
addReplyErrorFormat(c, "The slot ranges can not cross nodes, please check slot: %d.", j);
listRelease(slot_ranges);
return 1;
}
if (n == myself) {
addReplyErrorFormat(c, "Slot %d is served by myself.", j);
if (source_node == myself) {
addReplyErrorFormat(c, "Slot %d is already served by myself.", j);
listRelease(slot_ranges);
return 1;
}
Expand All @@ -7057,13 +7057,13 @@ int clusterCommandSpecial(client *c) {
new_range->end_slot = endslot;
listAddNodeTail(slot_ranges, new_range);
serverLog(LL_NOTICE, "Syncing slot range [%d-%d] from node %.40s (%s)",
startslot, endslot, n->name, n->human_nodename);
startslot, endslot, source_node->name, source_node->human_nodename);
}
serverAssert(n);
serverAssert(source_node && source_node != myself);

/* Create and initialize the slot sync link. */
clusterSlotSyncLink *link = createSlotSyncLink();
initSlotSyncLink(link, n, slot_ranges);
initSlotSyncLink(link, source_node, slot_ranges);
listAddNodeTail(server.cluster->slotsync_links, link);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "slotfailover") && (c->argc == 2 || c->argc == 3)) {
Expand Down Expand Up @@ -7091,7 +7091,7 @@ int clusterCommandSpecial(client *c) {
listRewind(server.cluster->slotsync_links, &li);
while ((ln = listNext(&li)) != NULL) {
clusterSlotSyncLink *link = ln->value;
if (link->sync_state != CLUSTER_SLOTSYNC_STATE_CONNECTED) {
if (link->sync_state != REPL_STATE_CONNECTED) {
addReplyErrorFormat(c, "Slot sync link %.40s is not connected, link status: %s", link->linkname,
slotSyncStateToString(link->sync_state));
return 1;
Expand Down
53 changes: 16 additions & 37 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,49 +372,28 @@ typedef struct slotRange {
int end_slot;
} slotRange;

/* Slot sync state. Stored in the sync_state field of the clusterSlotSyncLink
* struct so that each link remembers what to do next.
*
* The enumerators between the "Handshake states" markers must keep their
* relative order. */
typedef enum {
CLUSTER_SLOTSYNC_STATE_NONE = 0,
CLUSTER_SLOTSYNC_STATE_TOCONNECT, /* Need to reconnect with the slot owner */
CLUSTER_SLOTSYNC_STATE_CONNECTING, /* Connecting to the slot owner */
/* --- Handshake states, must be ordered --- */
CLUSTER_SLOTSYNC_STATE_SEND_AUTH, /* Need to send AUTH */
CLUSTER_SLOTSYNC_STATE_RECV_AUTH, /* Wait for AUTH reply */
CLUSTER_SLOTSYNC_STATE_SEND_CAPA, /* Need to send REPLCONF capa */
CLUSTER_SLOTSYNC_STATE_RECV_CAPA, /* Wait for REPLCONF reply */
CLUSTER_SLOTSYNC_STATE_WAIT_SCHED, /* Wait to be scheduled, to avoid a concurrency bug */
CLUSTER_SLOTSYNC_STATE_SEND_SYNC, /* Need to send SYNC */
/* --- End of handshake states --- */
CLUSTER_SLOTSYNC_STATE_RECV_RDB, /* Receiving the filtered RDB */
CLUSTER_SLOTSYNC_STATE_LOADING_RDB, /* Loading the RDB in bio */
CLUSTER_SLOTSYNC_STATE_DONE_LOADING,/* Done loading the RDB in bio */
CLUSTER_SLOTSYNC_STATE_CONNECTED, /* Synced with the slot owner */
CLUSTER_SLOTSYNC_STATE_LOADING_FAIL,/* Loading the RDB failed */
CLUSTER_SLOTSYNC_STATE_FAILED, /* Hit an unexpected error; retrying will not help */
} slotSyncState;

/* Encapsulate everything needed to talk with the slot sync source node. */
typedef struct clusterSlotSyncLink {
mstime_t ctime; /* Link creation time */
connection *sync_conn; /* Connection to slot sync source node */
client* client;
char linkname[CLUSTER_NAMELEN]; /* Name of this link */
char nodename[CLUSTER_NAMELEN]; /* Name of the slot sync source node */
mstime_t ctime; /* Link object creation time. */
char linkname[CLUSTER_NAMELEN]; /* Name of this link, hex string, sha1-size. */
char nodename[CLUSTER_NAMELEN]; /* Name of the slot sync source node, hex string, sha1-size. */

serverDb *db;
functionsLibCtx *functions_lib_ctx;
/* Temporary resources during slot synchronization. */
serverDb *temp_db; /* Temp db stores the keys during the sync process. */
functionsLibCtx *temp_func_ctx; /* Temp function ctx stores functions during the sync process. */

slotSyncState sync_state; /* Sync state */
list* slot_ranges; /* List of the slot ranges we want to sync */
client *client; /* Client to slot sync source node. */
connection *sync_conn; /* Connection to slot sync source node. */
int sync_state; /* State of the slot sync link during slot sync. */
list *slot_ranges; /* List of the slot ranges we want to sync. */

/* The following fields are used by slot sync RDB transfer. */
int transfer_tmpfile_fd; /* Descriptor of the tmpfile to store slot sync RDB */
char* transfer_tmpfile_name; /* Name of the tmpfile to store slot sync RDB */
int64_t transfer_total_size; /* Total size of the slot sync RDB file */
int64_t transfer_read_size; /* Amount of read from the slot sync RDB file */
off_t transfer_last_fsync_off; /* Offset when we fsync-ed last time */
time_t transfer_lastio; /* Unix time of the latest read, for timeout */
int repl_transfer_fd; /* Descriptor of the tmpfile to store slot sync RDB */
char *repl_transfer_tmpfile; /* Name of the tmpfile to store slot sync RDB */
int64_t repl_transfer_size; /* Total size of the slot sync RDB file */
int64_t repl_transfer_read; /* Amount of read from the slot sync RDB file */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */

/* The following fields are used by slot failover. */
int slot_mf_ready; /* If is ready to do slot manual failover */
Expand Down
Loading

0 comments on commit 8239112

Please sign in to comment.