Skip to content

Commit

Permalink
Address more PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Aug 6, 2024
1 parent da8106b commit 2fbf659
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 103 deletions.
16 changes: 16 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,20 @@ void setcpuaffinity(const char *cpulist);
#endif
#endif

/* Check for GCC version >= 4.9 */
#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9))
#define HAS_BUILTIN_PREFETCH 1
/* Check for Clang version >= 3.6 */
#elif defined(__clang__) && (__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 6))
#define HAS_BUILTIN_PREFETCH 1
#else
#define HAS_BUILTIN_PREFETCH 0
#endif

#if HAS_BUILTIN_PREFETCH
#define valkey_prefetch(addr) __builtin_prefetch(addr)
#else
#define valkey_prefetch(addr) ((void)(addr))
#endif

#endif
65 changes: 29 additions & 36 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "zmalloc.h"
#include "serverassert.h"
#include "monotonic.h"
#include "config.h"

#ifndef static_assert
#define static_assert(expr, lit) _Static_assert(expr, lit)
Expand Down Expand Up @@ -1595,47 +1596,38 @@ typedef struct {
size_t keys_done; /* Number of keys that have been processed */
} PrefetchBatch;

static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */

static void incrCurIdx(void) {
prefetchBatch.cur_idx++;
if (prefetchBatch.cur_idx >= prefetchBatch.current_batch_size) {
prefetchBatch.cur_idx %= prefetchBatch.current_batch_size;
}
}

/* Prefetches the given pointer and move to the next key in the batch */
static void prefetch(void *ptr) {
__builtin_prefetch(ptr);
static void prefetch(void *addr, PrefetchBatch *batch) {
valkey_prefetch(addr);
/* while the prefetch is in progress, we can continue to the next key */
incrCurIdx();
batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size;
}

static void markDone(PrefetchInfo *info) {
static void markDone(PrefetchInfo *info, PrefetchBatch *batch) {
info->state = PrefetchDone;
prefetchBatch.keys_done++;
batch->keys_done++;
}

static PrefetchInfo *getNextPrefetchInfo(void) {
while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) {
incrCurIdx();
static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) {
while (batch->prefetch_info[batch->cur_idx].state == PrefetchDone) {
batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size;
}
return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx];
return &batch->prefetch_info[batch->cur_idx];
}

static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) {
static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) {
assert(num_keys <= DictMaxPrefetchSize);

prefetchBatch.current_batch_size = num_keys;
prefetchBatch.cur_idx = 0;
prefetchBatch.keys_done = 0;
batch->current_batch_size = num_keys;
batch->cur_idx = 0;
batch->keys_done = 0;

/* Initialize the prefetch info */
for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) {
PrefetchInfo *info = &prefetchBatch.prefetch_info[i];
for (size_t i = 0; i < batch->current_batch_size; i++) {
PrefetchInfo *info = &batch->prefetch_info[i];
if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) {
info->state = PrefetchDone;
prefetchBatch.keys_done++;
batch->keys_done++;
continue;
}
info->ht_idx = -1;
Expand Down Expand Up @@ -1665,11 +1657,12 @@ static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) {
* dictPrefetch can be invoked with a callback function, get_val_data_func,
* to bring the key's value data closer to the L1 cache as well. */
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
initBatch(keys_dicts, num_keys, keys);
PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */
initBatch(keys_dicts, num_keys, keys, &batch);

while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) {
PrefetchInfo *info = getNextPrefetchInfo();
size_t i = prefetchBatch.cur_idx;
while (batch.keys_done < batch.current_batch_size) {
PrefetchInfo *info = getNextPrefetchInfo(&batch);
size_t i = batch.cur_idx;
switch (info->state) {
case PrefetchBucket:
/* Determine which hash table to use */
Expand All @@ -1679,13 +1672,13 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(
info->ht_idx = 1;
} else {
/* No more tables left - mark as done. */
markDone(info);
markDone(info, &batch);
break;
}

/* Prefetch the bucket */
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch);
info->current_entry = NULL;
info->state = PrefetchEntry;
break;
Expand All @@ -1701,7 +1694,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(
}

if (info->current_entry) {
prefetch(info->current_entry);
prefetch(info->current_entry, &batch);
info->state = PrefetchValue;
} else {
/* No entry found in the bucket - try the bucket in the next table */
Expand All @@ -1715,15 +1708,15 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(

if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) {
/* If this is the last element we assume a hit and dont compare the keys */
prefetch(value);
prefetch(value, &batch);
info->state = PrefetchValueData;
break;
}

void *current_entry_key = dictGetKey(info->current_entry);
if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
prefetch(value);
prefetch(value, &batch);
info->state = PrefetchValueData;
} else {
/* Move to next entry */
Expand All @@ -1736,9 +1729,9 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(
/* Prefetch value data if available */
if (get_val_data_func) {
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) prefetch(value_data);
if (value_data) prefetch(value_data, &batch);
}
markDone(info);
markDone(info, &batch);
break;
}

Expand Down
119 changes: 60 additions & 59 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4618,112 +4618,109 @@ int postponeClientRead(client *c) {
/* Prefetch multiple commands batch */
typedef struct {
client *clients[DictMaxPrefetchSize];
size_t clients_count;
size_t keys_count;
size_t client_count;
size_t key_count;
void *keys[DictMaxPrefetchSize];
kvstore *keys_kvs[DictMaxPrefetchSize];
kvstore *expire_kvs[DictMaxPrefetchSize];
int slots[DictMaxPrefetchSize];
int client_closed_during_batch_execution;
} BatchProcessData;

static BatchProcessData batch = {0};

static void *getValData(const void *val) {
static void *getObjectValuePtr(const void *val) {
robj *o = (robj *)val;
if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) {
return o->ptr;
}
return NULL;
}

void processBatchClientsCommands(void) {
if (batch.clients_count == 0) return;
static void batchProcessClientCommands(void) {
for (size_t i = 0; i < batch.client_count; i++) {
client *c = batch.clients[i];
if (c) {
/* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */
batch.clients[i] = NULL;
if (processPendingCommandAndInputBuffer(c) != C_ERR) {
beforeNextClient(c);
}
}
}
memset(&batch, 0, sizeof(batch));
}

/*Prefetch the commands' args allocated by the I/O thread and process all the commands in the batch.*/
static void batchPrefetchArgsAndProcessClientCommands(void) {
if (batch.client_count == 0) return;
/* Prefetch argv's for all clients */
for (size_t i = 0; i < batch.clients_count; i++) {
for (size_t i = 0; i < batch.client_count; i++) {
client *c = batch.clients[i];
if (!c || c->argc <= 1) continue;
/* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */
for (int j = 1; j < c->argc; j++) {
__builtin_prefetch(c->argv[j]);
valkey_prefetch(c->argv[j]);
}
}

/* prefetch the argv->ptr if required */
for (size_t i = 0; i < batch.clients_count; i++) {
for (size_t i = 0; i < batch.client_count; i++) {
client *c = batch.clients[i];
if (!c || c->argc <= 1) continue;
for (int j = 1; j < c->argc; j++) {
if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
__builtin_prefetch(c->argv[j]->ptr);
valkey_prefetch(c->argv[j]->ptr);
}
}
}

/* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */
for (size_t i = 0; i < batch.keys_count; i++) {
for (size_t i = 0; i < batch.key_count; i++) {
batch.keys[i] = ((robj *)batch.keys[i])->ptr;
}

/* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */
if (batch.keys_count > 1) {
if (batch.key_count > 1) {
server.stat_total_prefetch_batches++;
server.stat_total_prefetch_entries += batch.keys_count;
server.stat_total_prefetch_entries += batch.key_count;
/* Keys */
kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData);
kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr);
/* Expires - with expires no values prefetch are required. */
kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL);
kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL);
}

/* Process clients' commands */
for (size_t i = 0; i < batch.clients_count; i++) {
client *c = batch.clients[i];
if (c) {
/* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */
batch.clients[i] = NULL;
if (processPendingCommandAndInputBuffer(c) != C_ERR) {
beforeNextClient(c);
}
}
}

batch.keys_count = 0;
batch.clients_count = 0;
batchProcessClientCommands();
}

void addCommandToBatchAndProcessIfFull(client *c) {
batch.clients[batch.clients_count++] = c;
batch.clients[batch.client_count++] = c;

/* Get command's keys.
* When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */
if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) {
/* Get command's keys positions */
if (c->io_parsed_cmd) {
getKeysResult result;
initGetKeysResult(&result);
int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) {
batch.keys[batch.keys_count] = c->argv[result.keys[i].pos];
batch.slots[batch.keys_count] = c->slot > 0 ? c->slot : 0;
batch.keys_kvs[batch.keys_count] = c->db->keys;
batch.expire_kvs[batch.keys_count] = c->db->expires;
batch.keys_count++;
for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) {
batch.keys[batch.key_count] = c->argv[result.keys[i].pos];
batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0;
batch.keys_kvs[batch.key_count] = c->db->keys;
batch.expire_kvs[batch.key_count] = c->db->expires;
batch.key_count++;
}
getKeysFreeResult(&result);
}

/* If the batch is full, process it.
* We also check the client count to handle cases where
* no keys exist for the client's command. */
if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) {
processBatchClientsCommands();
* no keys exist for the clients' commands. */
if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) {
batchPrefetchArgsAndProcessClientCommands();
}
}

void removeClientFromPendingPrefetchBatch(client *c) {
if (batch.clients_count == 0) return;

batch.client_closed_during_batch_execution = 1;

for (size_t i = 0; i < batch.clients_count; i++) {
for (size_t i = 0; i < batch.client_count; i++) {
if (batch.clients[i] == c) {
batch.clients[i] = NULL;
return;
Expand All @@ -4732,9 +4729,14 @@ void removeClientFromPendingPrefetchBatch(client *c) {
}

int processIOThreadsReadDone(void) {
if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0;
if (ProcessingEventsWhileBlocked) {
/* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively.
* In this case, there may be some clients left in the batch waiting to be processed. */
batchProcessClientCommands();
}

if (listLength(server.clients_pending_io_read) == 0) return 0;
int processed = 0;
batch.client_closed_during_batch_execution = 0;
listNode *ln;

listNode *next = listFirst(server.clients_pending_io_read);
Expand All @@ -4751,16 +4753,17 @@ int processIOThreadsReadDone(void) {
}
/* memory barrier acquire to get the updated client state */
atomic_thread_fence(memory_order_acquire);
/* Don't post-process-writes to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
/* If a client is protected, don't do anything,
* that may trigger read/write error or recreate handler. */
if (c->flag.protected) continue;

listUnlinkNode(server.clients_pending_io_read, ln);
c->flag.pending_read = 0;
c->io_read_state = CLIENT_IDLE;

/* Don't post-process-reads to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
/* If a client is protected, don't do anything,
* that may trigger read/write error or recreate handler. */
if (c->flag.protected) continue;

processed++;
server.stat_io_reads_processed++;

Expand All @@ -4787,17 +4790,15 @@ int processIOThreadsReadDone(void) {
c->flag.pending_command = 1;
}

size_t list_len = listLength(server.clients_pending_io_read);
addCommandToBatchAndProcessIfFull(c);

/* There is a possibility that a client was closed during the latest batch processing.
* In this case the next node may be invalid */
if (batch.client_closed_during_batch_execution) {
if (list_len != listLength(server.clients_pending_io_read)) {
/* A client was removed from the list - next node may be invalid */
next = listFirst(server.clients_pending_io_read);
batch.client_closed_during_batch_execution = 0;
}
}

processBatchClientsCommands();
batchPrefetchArgsAndProcessClientCommands();

return processed;
}
Expand Down
Loading

0 comments on commit 2fbf659

Please sign in to comment.