Skip to content

Commit

Permalink
Consolidate various BLOCKED_WAIT states
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed May 28, 2024
1 parent 5d0f4bc commit 1ce1dbd
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 50 deletions.
32 changes: 7 additions & 25 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ void queueClientForReprocessing(client *c) {
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
} else if (c->bstate.btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -200,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -230,15 +228,15 @@ void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
} else if (c->cmd == shared.wait_cmd) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->cmd == shared.waitaof_cmd) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
} else if (c->cmd == shared.setslot_cmd) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
Expand Down Expand Up @@ -585,29 +583,13 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}

/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* block a client due to pre-replication */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
c->flags |= CLIENT_PENDING_COMMAND;
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
blockClient(c, BLOCKED_WAIT);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
6 changes: 4 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6009,7 +6009,7 @@ void clusterCommandSetSlot(client *c) {
* This ensures that all replicas have the latest topology information, enabling
* a reliable slot ownership transfer even if the primary node went down during
* the process. */
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) {
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) {
forceCommandPropagation(c, PROPAGATE_REPL);
/* We are a primary and this is the first time we see this `SETSLOT`
* command. Force-replicate the command to all of our replicas
Expand All @@ -6022,7 +6022,9 @@ void clusterCommandSetSlot(client *c) {
if (timeout_ms == 0) {
timeout_ms = CLUSTER_OPERATION_TIMEOUT;
}
blockForPreReplication(c, mstime() + timeout_ms, server.master_repl_offset + 1, myself->numslaves);
blockClientForReplicaAck(c, mstime() + timeout_ms, server.master_repl_offset + 1, myself->numslaves, 0);
/* Mark client as pending command for execution after replication to replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
replicationRequestAckFromSlaves();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2062,7 +2062,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
11 changes: 5 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3422,7 +3422,7 @@ void waitCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForReplication(c, timeout, offset, numreplicas);
blockClientForReplicaAck(c, timeout, offset, numreplicas, 0);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3462,7 +3462,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForAofFsync(c, timeout, c->woff, numlocal, numreplicas);
blockClientForReplicaAck(c, timeout, c->woff, numlocal, numreplicas);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3497,8 +3497,7 @@ void processClientsWaitingReplicas(void) {
int numreplicas = 0;

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;
int is_wait_aof = c->cmd == shared.waitaof_cmd;

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3545,8 +3544,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else if (c->flags | CLIENT_PENDING_COMMAND) {
c->flags |= CLIENT_REPLICATION_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1939,6 +1939,11 @@ void createSharedObjects(void) {
* string in string comparisons for the ZRANGEBYLEX command. */
shared.minstring = sdsnew("minstring");
shared.maxstring = sdsnew("maxstring");

/* Shared command object */
shared.wait_cmd = lookupCommandByCString("WAIT");
shared.waitaof_cmd = lookupCommandByCString("WAITAOF");
shared.setslot_cmd = lookupCommandByCString("CLUSTER|SETSLOT");
}

void initServerClientMemUsageBuckets(void) {
Expand Down
29 changes: 13 additions & 16 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,23 +426,21 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL << 51) /* Indicate that pre-replication has been done on the client */
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;

/* Client request types */
Expand Down Expand Up @@ -1375,6 +1373,7 @@ struct sharedObjectsStruct {
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
sds minstring, maxstring;
struct serverCommand *wait_cmd, *waitaof_cmd, *setslot_cmd;
};

/* ZSETs use a specialized version of Skiplists */
Expand Down Expand Up @@ -3492,9 +3491,7 @@ void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
Expand Down

0 comments on commit 1ce1dbd

Please sign in to comment.