Skip to content

Commit

Permalink
Consolidate more blocked states
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed May 30, 2024
1 parent feea88e commit 949d017
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 16 deletions.
13 changes: 6 additions & 7 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate.btype == BLOCKED_DATA) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
Expand Down Expand Up @@ -225,7 +225,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate.btype == BLOCKED_DATA) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
Expand Down Expand Up @@ -434,10 +434,10 @@ static void unblockClientWaitingData(client *c) {

static blocking_type getBlockedTypeByType(int type) {
switch (type) {
case OBJ_LIST: return BLOCKED_LIST;
case OBJ_ZSET: return BLOCKED_ZSET;
case OBJ_LIST: return BLOCKED_DATA;
case OBJ_ZSET: return BLOCKED_DATA;
case OBJ_STREAM: return BLOCKED_DATA;
case OBJ_MODULE: return BLOCKED_MODULE;
case OBJ_STREAM: return BLOCKED_STREAM;
default: return BLOCKED_NONE;
}
}
Expand Down Expand Up @@ -625,8 +625,7 @@ static void unblockClientOnKey(client *c, robj *key) {

/* Only in case of blocking API calls, we might be blocked on several keys.
however we should force unblock the entire blocking keys */
serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET);
serverAssert(c->bstate.btype == BLOCKED_DATA);

/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
Expand Down
3 changes: 1 addition & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flags & CLIENT_BLOCKED && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
if (c->flags & CLIENT_BLOCKED && (c->bstate.btype == BLOCKED_DATA || c->bstate.btype == BLOCKED_MODULE)) {
dictEntry *de;
dictIterator *di;

Expand Down
4 changes: 1 addition & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_DATA, /* Block on data: BLPOP & co., XREAD, BZPOP et al. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_NUM, /* Number of blocked states. */
Expand Down
4 changes: 2 additions & 2 deletions src/t_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
}

/* If the keys do not exist we must block */
blockForKeys(c, BLOCKED_LIST, keys, numkeys, timeout, 0);
blockForKeys(c, BLOCKED_DATA, keys, numkeys, timeout, 0);
}

/* BLPOP <key> [<key> ...] <timeout> */
Expand All @@ -1243,7 +1243,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
addReplyNull(c);
} else {
/* The list is empty and the client blocks. */
blockForKeys(c, BLOCKED_LIST, c->argv + 1, 1, timeout, 0);
blockForKeys(c, BLOCKED_DATA, c->argv + 1, 1, timeout, 0);
}
} else {
/* The list exists and has elements, so
Expand Down
2 changes: 1 addition & 1 deletion src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,7 @@ void xreadCommand(client *c) {
decrRefCount(argv_streamid);
}
}
blockForKeys(c, BLOCKED_STREAM, c->argv + streams_arg, streams_count, timeout, xreadgroup);
blockForKeys(c, BLOCKED_DATA, c->argv + streams_arg, streams_count, timeout, xreadgroup);
goto cleanup;
}

Expand Down
2 changes: 1 addition & 1 deletion src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -4045,7 +4045,7 @@ void blockingGenericZpopCommand(client *c,
}

/* If the keys do not exist we must block */
blockForKeys(c, BLOCKED_ZSET, keys, numkeys, timeout, 0);
blockForKeys(c, BLOCKED_DATA, keys, numkeys, timeout, 0);
}

// BZPOPMIN key [key ...] timeout
Expand Down

0 comments on commit 949d017

Please sign in to comment.