Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve multithreaded performance with memory prefetching #861

Merged
merged 7 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,147 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
return v;
}

typedef enum { PrefetchStart, PrefetchBucket, PrefetchEntry, PrefetchValue, PrefetchDone } PrefetchState;
uriyage marked this conversation as resolved.
Show resolved Hide resolved

typedef struct {
PrefetchState state;
int ht_idx;
uint64_t idx;
uint64_t key_hash;
dictEntry *current_entry;
uriyage marked this conversation as resolved.
Show resolved Hide resolved
} PrefetchInfo;
madolson marked this conversation as resolved.
Show resolved Hide resolved

/* dictPrefetch - Prefetches dictionary data for an array of keys
*
* This function takes an array of dictionaries and keys, attempting to bring
* data closer to the L1 cache that might be needed for dictionary operations
* on those keys.
*
* dictFind Algorithm:
* 1. Evaluate the hash of the key
* 2. Access the index in the first table
* 3. Walk the linked list until the key is found
* If the key hasn't been found and the dictionary is in the middle of rehashing,
* access the index on the second table and repeat step 3
*
* dictPrefetch executes the same algorithm as dictFind, but one step at a time
* for each key. Instead of waiting for data to be read from memory, it prefetches
* the data and then moves on to execute the next prefetch for another key.
*
* dictPrefetch can be invoked with a callback function, get_val_data_func,
* to bring the key's value data closer to the L1 cache as well. */
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
PrefetchInfo prefetchInfo[DictMaxPrefetchSize];
size_t done = 0;

assert(num_keys <= DictMaxPrefetchSize);

/* Initialize the prefetch info */
for (size_t i = 0; i < num_keys; i++) {
PrefetchInfo *info = &prefetchInfo[i];
if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) {
info->state = PrefetchDone;
done++;
continue;
}
info->ht_idx = -1;
info->current_entry = NULL;
info->state = PrefetchStart;
info->key_hash = dictHashKey(keys_dicts[i], keys[i]);
}

for (size_t j = 0; done < num_keys; j++) {
size_t i = j % num_keys;
PrefetchInfo *info = &prefetchInfo[i];
switch (info->state) {
case PrefetchDone:
/* Skip already processed keys */
break;

case PrefetchStart:
/* Determine which hash table to use */
if (info->ht_idx == -1) {
info->ht_idx = 0;
} else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) {
info->ht_idx = 1;
uriyage marked this conversation as resolved.
Show resolved Hide resolved
} else {
done++;
info->state = PrefetchDone;
break;
}

/* Prefetch the bucket */
info->idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
__builtin_prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->idx]);
info->state = PrefetchBucket;
break;

case PrefetchBucket:
/* Prefetch the first entry in the bucket */
info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->idx];
if (info->current_entry) {
__builtin_prefetch(info->current_entry);
info->state = PrefetchEntry;
} else {
/* No entry found in the bucket - try the next table */
info->state = PrefetchStart;
}
break;

case PrefetchEntry: {
/* Prefetch the entry's value. */
void *value = get_val_data_func ? dictGetVal(info->current_entry) : NULL;

if (info->current_entry->next == NULL && !dictIsRehashing(keys_dicts[i])) {
/* If this is the last element we assume a hit and dont compare the keys */
uriyage marked this conversation as resolved.
Show resolved Hide resolved
if (value) {
__builtin_prefetch(value);
info->state = PrefetchValue;
} else {
done++;
info->state = PrefetchDone;
}
break;
}

if (value) {
void *current_entry_key = dictGetKey(info->current_entry);
if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
__builtin_prefetch(value);
info->state = PrefetchValue;
break;
}
}

/* Move to next entry or start over */
info->current_entry = dictGetNext(info->current_entry);
if (info->current_entry) {
__builtin_prefetch(info->current_entry);
info->state = PrefetchEntry;
} else {
info->state = PrefetchStart;
}

break;
}

case PrefetchValue: {
/* Prefetch value data if available */
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) {
__builtin_prefetch(value_data);
}
done++;
info->state = PrefetchDone;
break;
}

default: assert(0);
}
}
}

/* ------------------------- private functions ------------------------------ */

/* Because we may need to allocate huge memory chunk at once when dict
Expand Down
4 changes: 3 additions & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
#define DICT_ERR 1

/* Hash table parameters */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
#define DictMaxPrefetchSize 16 /* Limit of maximum number of dict entries to prefetch */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did we reach to this number? Should we provide a config for this to modify (mutable/immutable) based on the workload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a heuristic, as is the entire feature. We found it to be optimal for GET commands with a value size of 512 bytes. However, the optimal setting may vary depending on the client load, command execution duration, and number of keys per command. I'm not sure about making this configuration modifiable, as determining an optimal value is complex.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general concern I have is how much this might vary between different architectures. I know some of this comes from us fitting to the AWS graviton instances, but it could be very different from intel/amd/etc in addition to the workload specifics you mentioned. I guess I would like to see us at least set up a benchmark and test to see how widely it can vary, If the difference between most workloads is 2x, i.e. the optimal value is between 8-32, picking 16 is probably okay. If the optimal value goes between like 2 - 100, then maybe we should reconsider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will run some benchmarks, but I believe the optimal batch size depends much more on the specific workload. For example, if we run expensive commands against one big hashmap with expensive operations, we may want to decrease the batch size, as the prefetched items for the next commands may be pushed out of memory. Conversely, with MSET involving many small keys, the batch size can be larger. Maybe we should consider giving users the option to fine-tune this parameter and set the default to 16?"


typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
Expand Down Expand Up @@ -247,6 +248,7 @@ unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val));

size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats *dictGetStatsHt(dict *d, int htidx, int full);
Expand Down
6 changes: 4 additions & 2 deletions src/fmtargs.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
/* Everything below this line is automatically generated by
* generate-fmtargs.py. Do not manually edit. */

#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) N

#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0

#define COMPACT_FMT_2(fmt, value) fmt
#define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
Expand Down Expand Up @@ -108,6 +108,7 @@
#define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
#define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
#define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)

#define COMPACT_VALUES_2(fmt, value) value
#define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
Expand Down Expand Up @@ -169,5 +170,6 @@
#define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
#define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
#define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)

#endif
12 changes: 12 additions & 0 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,3 +828,15 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
}
return ret;
}

void kvstoreDictPrefetch(kvstore **kvs,
int *slots,
const void **keys,
size_t keys_count,
void *(*get_val_data_func)(const void *val)) {
dict *dicts[keys_count];
for (size_t i = 0; i < keys_count; i++) {
dicts[i] = kvstoreGetDict(kvs[i], slots[i]);
}
dictPrefetch(dicts, keys_count, keys, get_val_data_func);
}
5 changes: 5 additions & 0 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs);
int kvstoreNumAllocatedDicts(kvstore *kvs);
int kvstoreNumDicts(kvstore *kvs);
uint64_t kvstoreGetHash(kvstore *kvs, const void *key);
void kvstoreDictPrefetch(kvstore **kvs,
int *slots,
const void **keys,
size_t keys_count,
void *(*get_val_data_func)(const void *val));

/* kvstore iterator specific functions */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
Expand Down
Loading
Loading