Skip to content

Commit

Permalink
move client_waiting_acks_list_node to blockingState
Browse files Browse the repository at this point in the history
Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jul 17, 2024
1 parent da630b0 commit 0b09385
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
9 changes: 6 additions & 3 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.numlocal = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.client_waiting_acks_list_node = NULL;
c->bstate.module_blocked_handle = NULL;
c->bstate.async_rm_call_handle = NULL;
}

Expand Down Expand Up @@ -596,8 +599,8 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon
/* Note that we remember the linked list node where the client is stored,
* this way removing the client in unblockClientWaitingReplicas() will not
* require a linear scan, but just a constant time operation. */
serverAssert(c->client_waiting_acks_list_node == NULL);
c->client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
blockClient(c, BLOCKED_WAIT);
}

Expand Down
1 change: 0 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
c->client_waiting_acks_list_node = NULL;
c->postponed_list_node = NULL;
c->io_read_state = CLIENT_IDLE;
c->io_write_state = CLIENT_IDLE;
Expand Down
6 changes: 3 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3513,9 +3513,9 @@ void waitaofCommand(client *c) {
* waiting for replica acks. Never call it directly, call unblockClient()
* instead. */
void unblockClientWaitingReplicas(client *c) {
serverAssert(c->client_waiting_acks_list_node);
listDelNode(server.clients_waiting_acks, c->client_waiting_acks_list_node);
c->client_waiting_acks_list_node = NULL;
serverAssert(c->bstate.client_waiting_acks_list_node);
listDelNode(server.clients_waiting_acks, c->bstate.client_waiting_acks_list_node);
c->bstate.client_waiting_acks_list_node = NULL;
updateStatsOnUnblock(c, 0, 0, 0);
}

Expand Down
19 changes: 10 additions & 9 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -975,21 +975,23 @@ typedef struct multiState {
} multiState;

/* This structure holds the blocking operation state for a client.
* The fields used depend on client->btype. */
* The fields used depend on client->bstate.btype. */
typedef struct blockingState {
/* Generic fields. */
blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
is deleted or does not exist anymore */

/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
dict *keys; /* The keys we are blocked on */

/* BLOCKED_WAIT and BLOCKED_WAITAOF */
int numreplicas; /* Number of replicas we are waiting for ACK. */
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
long long reploffset; /* Replication offset to reach. */
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */

/* BLOCKED_MODULE */
void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.
Expand Down Expand Up @@ -1267,14 +1269,13 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *client_waiting_acks_list_node; /* list node in client waiting acks list. */
listNode *postponed_list_node; /* list node within the postponed list */
void *module_blocked_client; /* Pointer to the ValkeyModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
void *module_auth_ctx; /* Ongoing / attempted module based auth callback's ctx.
* This is only tracked within the context of the command attempting
* authentication. If not NULL, it means module auth is in progress. */
listNode *postponed_list_node; /* list node within the postponed list */
void *module_blocked_client; /* Pointer to the ValkeyModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
void *module_auth_ctx; /* Ongoing / attempted module based auth callback's ctx.
* This is only tracked within the context of the command attempting
* authentication. If not NULL, it means module auth is in progress. */
ValkeyModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
Expand Down

0 comments on commit 0b09385

Please sign in to comment.