Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Aug 25, 2024
1 parent d300b1b commit c319572
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 79 deletions.
8 changes: 1 addition & 7 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2564,12 +2564,6 @@ static int updateOOMScoreAdj(const char **err) {
return 1;
}

static int UpdateMaxPrefetchBatchSize(const char **err) {
UNUSED(err);
onMaxBatchSizeChange();
return 1;
}

int invalidateClusterSlotsResp(const char **err) {
UNUSED(err);
clearCachedClusterSlotsResponse();
Expand Down Expand Up @@ -3170,7 +3164,7 @@ standardConfig static_configs[] = {
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
Expand Down
1 change: 0 additions & 1 deletion src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ static void _dictExpandIfNeeded(dict *d);
static void _dictShrinkIfNeeded(dict *d);
static signed char _dictNextExp(unsigned long size);
static int _dictInit(dict *d, dictType *type);
dictEntry *dictGetNext(const dictEntry *de);
static dictEntry **dictGetNextRef(dictEntry *de);
static void dictSetNext(dictEntry *de, dictEntry *next);

Expand Down
1 change: 1 addition & 0 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ void dictInitIterator(dictIterator *iter, dict *d);
void dictInitSafeIterator(dictIterator *iter, dict *d);
void dictResetIterator(dictIterator *iter);
dictEntry *dictNext(dictIterator *iter);
dictEntry *dictGetNext(const dictEntry *de);
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
dictEntry *dictGetFairRandomKey(dict *d);
Expand Down
6 changes: 6 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "io_threads.h"

static __thread int thread_id = 0; /* Thread local var */
Expand Down
1 change: 1 addition & 0 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val);
dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index);
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index);
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
dict *kvstoreGetDict(kvstore *kvs, int didx);

#endif /* DICTARRAY_H_ */
90 changes: 47 additions & 43 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
* This file utilizes prefetching keys and data for multiple commands in a batch,
* to improve performance by amortizing memory access costs across multiple operations.
*/
Expand Down Expand Up @@ -48,38 +52,38 @@ typedef enum {
└───────────┬──────────────┘ │
| │
┌───────-─▼─────────────┐ │
│ PREFETCH_DONE │◄────────┘
│ PREFETCH_DONE │◄────────┘
└───────────────────────┘
**********************************************************************************************************************/

typedef void *(*GetValueDataFunc)(const void *val);

typedef struct PrefetchInfo {
typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
uint64_t bucket_idx; /* Index of the bucket in the current hash table */
uint64_t key_hash; /* Hash value of the key being prefetched */
dictEntry *current_entry; /* Pointer to the current entry being processed */
} PrefetchInfo;

/* CommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct CommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
size_t keys_done; /* Number of keys that have been prefetched */
size_t key_count; /* Number of keys in the current batch */
size_t client_count; /* Number of clients in the current batch */
size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
size_t executed_commands; /* Number of commands executed in the current batch */
int *slots; /* Array of slots for each key */
void **keys; /* Array of keys to prefetch in the current batch */
client **clients; /* Array of clients in the current batch */
dict **keys_dicts; /* Main dict for each key */
dict **expire_dicts; /* Expire dict for each key */
dict **current_dicts; /* Points to either keys_dicts or expire_dicts */
PrefetchInfo *prefetch_info; /* Prefetch info for each key */
} CommandsBatch;

static CommandsBatch *batch = NULL;
} KeyPrefetchInfo;

/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
size_t keys_done; /* Number of keys that have been prefetched */
size_t key_count; /* Number of keys in the current batch */
size_t client_count; /* Number of clients in the current batch */
size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
size_t executed_commands; /* Number of commands executed in the current batch */
int *slots; /* Array of slots for each key */
void **keys; /* Array of keys to prefetch in the current batch */
client **clients; /* Array of clients in the current batch */
dict **keys_dicts; /* Main dict for each key */
dict **expire_dicts; /* Expire dict for each key */
dict **current_dicts; /* Points to either keys_dicts or expire_dicts */
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
} PrefetchCommandsBatch;

static PrefetchCommandsBatch *batch = NULL;

void freePrefetchCommandsBatch(void) {
if (batch == NULL) {
Expand All @@ -104,14 +108,14 @@ void prefetchCommandsBatchInit(void) {
return;
}

batch = zcalloc(sizeof(CommandsBatch));
batch = zcalloc(sizeof(PrefetchCommandsBatch));
batch->max_prefetch_size = max_prefetch_size;
batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
batch->slots = zcalloc(max_prefetch_size * sizeof(int));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo));
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
}

void onMaxBatchSizeChange(void) {
Expand All @@ -125,23 +129,23 @@ void onMaxBatchSizeChange(void) {
}

/* Prefetch the given pointer and move to the next key in the batch. */
static void prefetch(void *addr) {
static void prefetchAndMoveToNextKey(void *addr) {
valkey_prefetch(addr);
/* While the prefetch is in progress, we can continue to the next key */
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}

static void markDone(PrefetchInfo *info) {
static void markKeyAsdone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
batch->keys_done++;
}

/* Returns the next PrefetchInfo structure that needs to be processed. */
static PrefetchInfo *getNextPrefetchInfo(void) {
/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
size_t start_idx = batch->cur_idx;
do {
PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
} while (batch->cur_idx != start_idx);
Expand All @@ -153,7 +157,7 @@ static void initBatchInfo(dict **dicts) {

/* Initialize the prefetch info */
for (size_t i = 0; i < batch->key_count; i++) {
PrefetchInfo *info = &batch->prefetch_info[i];
KeyPrefetchInfo *info = &batch->prefetch_info[i];
if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
info->state = PREFETCH_DONE;
batch->keys_done++;
Expand All @@ -168,7 +172,7 @@ static void initBatchInfo(dict **dicts) {

/* Prefetch the bucket of the next hash table index.
* If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(PrefetchInfo *info) {
static void prefetchBucket(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;

/* Determine which hash table to use */
Expand All @@ -178,20 +182,20 @@ static void prefetchBucket(PrefetchInfo *info) {
info->ht_idx = HT_IDX_SECOND;
} else {
/* No more tables left - mark as done. */
markDone(info);
markKeyAsdone(info);
return;
}

/* Prefetch the bucket */
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->current_entry = NULL;
info->state = PREFETCH_ENTRY;
}

/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state.
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(PrefetchInfo *info) {
static void prefetchEntry(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;

if (info->current_entry) {
Expand All @@ -203,7 +207,7 @@ static void prefetchEntry(PrefetchInfo *info) {
}

if (info->current_entry) {
prefetch(info->current_entry);
prefetchAndMoveToNextKey(info->current_entry);
info->state = PREFETCH_VALUE;
} else {
/* No entry found in the bucket - try the bucket in the next table */
Expand All @@ -213,13 +217,13 @@ static void prefetchEntry(PrefetchInfo *info) {

/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state.
* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */
static void prefetchValue(PrefetchInfo *info) {
static void prefetchValue(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
void *value = dictGetVal(info->current_entry);

if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) {
/* If this is the last element, we assume a hit and don't compare the keys */
prefetch(value);
prefetchAndMoveToNextKey(value);
info->state = PREFETCH_VALUE_DATA;
return;
}
Expand All @@ -228,7 +232,7 @@ static void prefetchValue(PrefetchInfo *info) {
if (batch->keys[i] == current_entry_key ||
dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
prefetch(value);
prefetchAndMoveToNextKey(value);
info->state = PREFETCH_VALUE_DATA;
} else {
/* Move to the next entry */
Expand All @@ -237,12 +241,12 @@ static void prefetchValue(PrefetchInfo *info) {
}

/* Prefetch the value data if available. */
static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) {
static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) {
if (get_val_data_func) {
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) prefetch(value_data);
if (value_data) prefetchAndMoveToNextKey(value_data);
}
markDone(info);
markKeyAsdone(info);
}

/* Prefetch dictionary data for an array of keys.
Expand All @@ -268,7 +272,7 @@ static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_
*/
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
initBatchInfo(dicts);
PrefetchInfo *info;
KeyPrefetchInfo *info;
while ((info = getNextPrefetchInfo())) {
switch (info->state) {
case PREFETCH_BUCKET: prefetchBucket(info); break;
Expand Down Expand Up @@ -357,7 +361,7 @@ void processClientsCommandsBatch(void) {

resetCommandsBatch();

/* Handle the case where the max prefetch size has been changed during the batch processing */
/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
onMaxBatchSizeChange();
}
Expand Down
1 change: 0 additions & 1 deletion src/memory_prefetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

struct client;

void onMaxBatchSizeChange(void);
void prefetchCommandsBatchInit(void);
void processClientsCommandsBatch(void);
int addCommandToBatchAndProcessIfFull(struct client *c);
Expand Down
Loading

0 comments on commit c319572

Please sign in to comment.