From f821c869a6754d9620286c940345c57e88c7c2eb Mon Sep 17 00:00:00 2001
From: Uri Yagelnik
Date: Mon, 6 May 2024 08:01:47 +0000
Subject: [PATCH] Memory Access Amortization

Signed-off-by: Uri Yagelnik
---
 src/dict.c                | 141 ++++++++++++++++++++++++++++++++++++
 src/dict.h                |   4 +-
 src/fmtargs.h             |   6 +-
 src/kvstore.c             |  12 ++++
 src/kvstore.h             |   5 ++
 src/networking.c          | 146 ++++++++++++++++++++++++++++++++++++--
 src/server.c              |   5 ++
 src/server.h              |   2 +
 tests/unit/networking.tcl |  37 ++++++++++
 utils/generate-fmtargs.py |   2 +-
 10 files changed, 349 insertions(+), 11 deletions(-)

diff --git a/src/dict.c b/src/dict.c
index 2eb3dd386f..eb59949f94 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -1541,6 +1541,147 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
     return v;
 }
 
+typedef enum { PrefetchStart, PrefetchBucket, PrefetchEntry, PrefetchValue, PrefetchDone } PrefetchState;
+
+typedef struct {
+    PrefetchState state;
+    int ht_idx;
+    uint64_t idx;
+    uint64_t key_hash;
+    dictEntry *current_entry;
+} PrefetchInfo;
+
+/* dictPrefetch - Prefetches dictionary data for an array of keys
+ *
+ * This function takes an array of dictionaries and keys, attempting to bring
+ * the data that might be needed for dictionary operations on those keys
+ * closer to the L1 cache.
+ *
+ * dictFind Algorithm:
+ * 1. Evaluate the hash of the key
+ * 2. Access the index in the first table
+ * 3. Walk the linked list until the key is found
+ *    If the key hasn't been found and the dictionary is in the middle of rehashing,
+ *    access the index on the second table and repeat step 3
+ *
+ * dictPrefetch executes the same algorithm as dictFind, but one step at a time
+ * for each key. Instead of waiting for data to be read from memory, it prefetches
+ * the data and then moves on to execute the next prefetch for another key.
+ *
+ * dictPrefetch can be invoked with a callback function, get_val_data_func,
+ * to bring the key's value data closer to the L1 cache as well.
+ */
+void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
+    PrefetchInfo prefetchInfo[DictMaxPrefetchSize];
+    size_t done = 0;
+
+    assert(num_keys <= DictMaxPrefetchSize);
+
+    /* Initialize the prefetch info */
+    for (size_t i = 0; i < num_keys; i++) {
+        PrefetchInfo *info = &prefetchInfo[i];
+        if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) {
+            info->state = PrefetchDone;
+            done++;
+            continue;
+        }
+        info->ht_idx = -1;
+        info->current_entry = NULL;
+        info->state = PrefetchStart;
+        info->key_hash = dictHashKey(keys_dicts[i], keys[i]);
+    }
+
+    for (size_t j = 0; done < num_keys; j++) {
+        size_t i = j % num_keys;
+        PrefetchInfo *info = &prefetchInfo[i];
+        switch (info->state) {
+        case PrefetchDone:
+            /* Skip already processed keys */
+            break;
+
+        case PrefetchStart:
+            /* Determine which hash table to use */
+            if (info->ht_idx == -1) {
+                info->ht_idx = 0;
+            } else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) {
+                info->ht_idx = 1;
+            } else {
+                done++;
+                info->state = PrefetchDone;
+                break;
+            }
+
+            /* Prefetch the bucket */
+            info->idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
+            __builtin_prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->idx]);
+            info->state = PrefetchBucket;
+            break;
+
+        case PrefetchBucket:
+            /* Prefetch the first entry in the bucket */
+            info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->idx];
+            if (info->current_entry) {
+                __builtin_prefetch(info->current_entry);
+                info->state = PrefetchEntry;
+            } else {
+                /* No entry found in the bucket - try the next table */
+                info->state = PrefetchStart;
+            }
+            break;
+
+        case PrefetchEntry: {
+            /* Prefetch the entry's value. */
+            void *value = get_val_data_func ? dictGetVal(info->current_entry) : NULL;
+
+            if (info->current_entry->next == NULL && !dictIsRehashing(keys_dicts[i])) {
+                /* If this is the last element, we assume a hit and don't compare the keys */
+                if (value) {
+                    __builtin_prefetch(value);
+                    info->state = PrefetchValue;
+                } else {
+                    done++;
+                    info->state = PrefetchDone;
+                }
+                break;
+            }
+
+            if (value) {
+                void *current_entry_key = dictGetKey(info->current_entry);
+                if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
+                    /* If the key is found, prefetch the value */
+                    __builtin_prefetch(value);
+                    info->state = PrefetchValue;
+                    break;
+                }
+            }
+
+            /* Move to next entry or start over */
+            info->current_entry = dictGetNext(info->current_entry);
+            if (info->current_entry) {
+                __builtin_prefetch(info->current_entry);
+                info->state = PrefetchEntry;
+            } else {
+                info->state = PrefetchStart;
+            }
+
+            break;
+        }
+
+        case PrefetchValue: {
+            /* Prefetch value data if available */
+            void *value_data = get_val_data_func(dictGetVal(info->current_entry));
+            if (value_data) {
+                __builtin_prefetch(value_data);
+            }
+            done++;
+            info->state = PrefetchDone;
+            break;
+        }
+
+        default: assert(0);
+        }
+    }
+}
+
 /* ------------------------- private functions ------------------------------ */
 
 /* Because we may need to allocate huge memory chunk at once when dict
diff --git a/src/dict.h b/src/dict.h
index 97a79910cb..f984023e23 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -45,7 +45,8 @@
 #define DICT_ERR 1
 
 /* Hash table parameters */
-#define HASHTABLE_MIN_FILL 8 /* Minimal hash table fill 12.5%(100/8) */
+#define HASHTABLE_MIN_FILL 8    /* Minimal hash table fill 12.5%(100/8) */
+#define DictMaxPrefetchSize 16  /* Maximum number of dict entries to prefetch */
 
 typedef struct dictEntry dictEntry; /* opaque */
 typedef struct dict dict;
@@ -247,6 +248,7 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn,
                              dictDefragFunctions *defragfns, void *privdata);
 uint64_t dictGetHash(dict *d, const void *key);
 void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
+void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val));
 
 size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
 dictStats *dictGetStatsHt(dict *d, int htidx, int full);
diff --git a/src/fmtargs.h b/src/fmtargs.h
index e52d3b99c5..3fb6cbb479 100644
--- a/src/fmtargs.h
+++ b/src/fmtargs.h
@@ -44,9 +44,9 @@
 /* Everything below this line is automatically generated by
  * generate-fmtargs.py. Do not manually edit. */
 
-#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
+#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) N
 
-#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
+#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
 
 #define COMPACT_FMT_2(fmt, value) fmt
 #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
@@ -108,6 +108,7 @@
 #define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
 #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
 #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
+#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)
 
 #define COMPACT_VALUES_2(fmt, value) value
 #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
@@ -169,5 +170,6 @@
 #define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
 #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
 #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
+#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)
 
 #endif
diff --git a/src/kvstore.c b/src/kvstore.c
index 16cc8e4822..d6f886c95b 100644
--- a/src/kvstore.c
+++ b/src/kvstore.c
@@ -828,3 +828,15 @@ int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
     }
     return ret;
 }
+
+void kvstoreDictPrefetch(kvstore **kvs,
+                         int *slots,
+                         const void **keys,
+                         size_t keys_count,
+                         void *(*get_val_data_func)(const void *val)) {
+    dict *dicts[keys_count];
+    for (size_t i = 0; i < keys_count; i++) {
+        dicts[i] = kvstoreGetDict(kvs[i], slots[i]);
+    }
+    dictPrefetch(dicts, keys_count, keys, get_val_data_func);
+}
diff --git a/src/kvstore.h b/src/kvstore.h
index a94f366b6b..40d1eab15f 100644
--- a/src/kvstore.h
+++ b/src/kvstore.h
@@ -36,6 +36,11 @@ int kvstoreNumNonEmptyDicts(kvstore *kvs);
 int kvstoreNumAllocatedDicts(kvstore *kvs);
 int kvstoreNumDicts(kvstore *kvs);
 uint64_t kvstoreGetHash(kvstore *kvs, const void *key);
+void kvstoreDictPrefetch(kvstore **kvs,
+                         int *slots,
+                         const void **keys,
+                         size_t keys_count,
+                         void *(*get_val_data_func)(const void *val));
 
 /* kvstore iterator specific functions */
 kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
diff --git a/src/networking.c b/src/networking.c
index 915a0b016f..17325c1c54 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -45,6 +45,8 @@ static void setProtocolError(const char *errstr, client *c);
 static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
 int postponeClientRead(client *c);
 char *getClientSockname(client *c);
+void removeClientFromPendingPrefetchBatch(client *c);
+
 int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
 __thread sds thread_shared_qb = NULL;
 
@@ -1504,6 +1506,7 @@ void unlinkClient(client *c) {
         listDelNode(server.clients, c->client_list_node);
         c->client_list_node = NULL;
     }
+    removeClientFromPendingPrefetchBatch(c);
 
     /* Check if this is a replica waiting for diskless replication (rdb pipe),
      * in which case it needs to be cleaned from that list */
@@ -4612,9 +4615,124 @@ int postponeClientRead(client *c) {
     return (trySendReadToIOThreads(c) == C_OK);
 }
 
+/* State for prefetching a batch of commands */
+typedef struct {
+    client *clients[DictMaxPrefetchSize];
+    size_t clients_count;
+    size_t keys_count;
+    void *keys[DictMaxPrefetchSize];
+    kvstore *keys_kvs[DictMaxPrefetchSize];
+    kvstore *expire_kvs[DictMaxPrefetchSize];
+    int slots[DictMaxPrefetchSize];
+    int client_closed_during_batch_execution;
+} BatchProcessData;
+
+static BatchProcessData batch = {0};
+
+static void *getValData(const void *val) {
+    robj *o = (robj *)val;
+    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) {
+        return o->ptr;
+    }
+    return NULL;
+}
+
+void processBatchClientsCommands(void) {
+    if (batch.clients_count == 0) return;
+    /* Prefetch argv's for all clients */
+    for (size_t i = 0; i < batch.clients_count; i++) {
+        client *c = batch.clients[i];
+        if (!c || c->argc <= 1) continue;
+        /* Skip prefetching the first argv (the command name); it was already looked up by the I/O thread. */
+        for (int j = 1; j < c->argc; j++) {
+            __builtin_prefetch(c->argv[j]);
+        }
+    }
+
+    /* Prefetch the argv->ptr if required */
+    for (size_t i = 0; i < batch.clients_count; i++) {
+        client *c = batch.clients[i];
+        if (!c || c->argc <= 1) continue;
+        for (int j = 1; j < c->argc; j++) {
+            if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
+                __builtin_prefetch(c->argv[j]->ptr);
+            }
+        }
+    }
+
+    /* Get the key pointers - done here since we want the argv prefetch above to complete first */
+    for (size_t i = 0; i < batch.keys_count; i++) {
+        batch.keys[i] = ((robj *)batch.keys[i])->ptr;
+    }
+
+    /* Prefetch keys for all commands */
+    if (batch.keys_count > 1) {
+        /* Keys */
+        kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData);
+        /* Expires - no value prefetch is required for the expires dict. */
+        kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL);
+    }
+
+    /* Process clients' commands */
+    for (size_t i = 0; i < batch.clients_count; i++) {
+        client *c = batch.clients[i];
+        if (c) {
+            /* Set the client to NULL immediately so it is not accessed again when ProcessingEventsWhileBlocked */
+            batch.clients[i] = NULL;
+            if (processPendingCommandAndInputBuffer(c) != C_ERR) {
+                beforeNextClient(c);
+            }
+        }
+    }
+
+    batch.keys_count = 0;
+    batch.clients_count = 0;
+}
+
+void addCommandToBatchAndProcessIfFull(client *c) {
+    batch.clients[batch.clients_count++] = c;
+
+    /* Get command's keys.
+     * When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */
+    if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) {
+        getKeysResult result;
+        initGetKeysResult(&result);
+        int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
+        for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) {
+            batch.keys[batch.keys_count] = c->argv[result.keys[i].pos];
+            batch.slots[batch.keys_count] = c->slot > 0 ? c->slot : 0;
+            batch.keys_kvs[batch.keys_count] = c->db->keys;
+            batch.expire_kvs[batch.keys_count] = c->db->expires;
+            batch.keys_count++;
+        }
+        getKeysFreeResult(&result);
+    }
+
+    /* If the batch is full, process it.
+     * We also check the client count to handle cases where
+     * no keys exist for the client's command. */
+    if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) {
+        processBatchClientsCommands();
+    }
+}
+
+void removeClientFromPendingPrefetchBatch(client *c) {
+    if (batch.clients_count == 0) return;
+
+    batch.client_closed_during_batch_execution = 1;
+
+    for (size_t i = 0; i < batch.clients_count; i++) {
+        if (batch.clients[i] == c) {
+            batch.clients[i] = NULL;
+            return;
+        }
+    }
+}
+
 int processIOThreadsReadDone(void) {
-    if (listLength(server.clients_pending_io_read) == 0) return 0;
+    if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0;
     int processed = 0;
+    batch.client_closed_during_batch_execution = 0;
     listNode *ln;
 
     listNode *next = listFirst(server.clients_pending_io_read);
@@ -4667,16 +4785,18 @@ int processIOThreadsReadDone(void) {
             c->flag.pending_command = 1;
         }
 
-        size_t list_length_before_command_execute = listLength(server.clients_pending_io_read);
-        if (processPendingCommandAndInputBuffer(c) == C_OK) {
-            beforeNextClient(c);
-        }
-        if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) {
-            /* A client was unlink from the list possibly making the next node invalid */
+        addCommandToBatchAndProcessIfFull(c);
+
+        /* There is a possibility that a client was closed during the latest batch processing.
+         * In this case, the next node may be invalid */
+        if (batch.client_closed_during_batch_execution) {
             next = listFirst(server.clients_pending_io_read);
+            batch.client_closed_during_batch_execution = 0;
         }
     }
 
+    processBatchClientsCommands();
+
     return processed;
 }
 
@@ -4775,6 +4895,18 @@ void ioThreadReadQueryFromClient(void *data) {
         c->io_parsed_cmd = NULL;
     }
 
+    /* Offload slot calculations to the I/O thread to reduce main-thread load. */
+    if (c->io_parsed_cmd && server.cluster_enabled) {
+        getKeysResult result;
+        initGetKeysResult(&result);
+        int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
+        if (numkeys) {
+            robj *first_key = c->argv[result.keys[0].pos];
+            c->slot = calculateKeySlot(first_key->ptr);
+        }
+        getKeysFreeResult(&result);
+    }
+
 done:
     trimClientQueryBuffer(c);
     atomic_thread_fence(memory_order_release);
diff --git a/src/server.c b/src/server.c
index d332e6989c..3e7abeab19 100644
--- a/src/server.c
+++ b/src/server.c
@@ -5623,6 +5623,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
         server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0;
     long long current_active_defrag_time =
         server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0;
+    long long average_prefetch_batch_size =
+        (server.stat_total_prefetch_batches
+             ? server.stat_total_prefetch_entries / server.stat_total_prefetch_batches
+             : 0);
 
     if (sections++) info = sdscat(info, "\r\n");
     /* clang-format off */
@@ -5678,6 +5682,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
             "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
             "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
             "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
+            "io_threaded_average_prefetch_batch_size:%lld\r\n", average_prefetch_batch_size,
             "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
             "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
             "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
diff --git a/src/server.h b/src/server.h
index ccdece20dd..d1a4d94190 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1828,6 +1828,8 @@ struct valkeyServer {
     long long stat_total_writes_processed;             /* Total number of write events processed */
     long long stat_client_qbuf_limit_disconnections;   /* Total number of clients reached query buf length limit */
    long long stat_client_outbuf_limit_disconnections;  /* Total number of clients reached output buf length limit */
+    long long stat_total_prefetch_entries;             /* Total number of prefetched dict entries */
+    long long stat_total_prefetch_batches;             /* Total number of prefetched batches */
     /* The following two are used to track instantaneous metrics, like
      * number of operations per second, network traffic. */
     struct {
diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl
index 24f8caae9c..7cc11b2ff9 100644
--- a/tests/unit/networking.tcl
+++ b/tests/unit/networking.tcl
@@ -170,3 +170,40 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
         }
     }
 }
+
+start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {
+    test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
+        # Create 17 clients (prefetch batch size + 1)
+        for {set i 0} {$i < 17} {incr i} {
+            set rd$i [valkey_deferring_client]
+        }
+
+        # Get the client ID of rd4
+        $rd4 client id
+        set rd4_id [$rd4 read]
+
+        # Create a batch of commands by making sure the server sleeps for a while
+        # before responding to the first command
+        $rd0 debug sleep 2
+        after 200 ; # wait a bit to make sure the server is sleeping.
+
+        # The first client will kill the fourth client
+        $rd1 client kill id $rd4_id
+
+        # Send set commands for all clients except the first
+        for {set i 1} {$i < 17} {incr i} {
+            [set rd$i] set a $i
+            [set rd$i] flush
+        }
+
+        # Read the results
+        assert_equal {1} [$rd1 read]
+        catch {$rd4 read} err
+        assert_match {I/O error reading reply} $err
+
+        # Verify the final state
+        $rd16 get a
+        assert_equal {OK} [$rd16 read]
+        assert_equal {16} [$rd16 read]
+    }
+}
diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py
index e16cc368fa..873b8f67f6 100755
--- a/utils/generate-fmtargs.py
+++ b/utils/generate-fmtargs.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 # Outputs the generated part of src/fmtargs.h
-MAX_ARGS = 120
+MAX_ARGS = 122
 
 import os
 print("/* Everything below this line is automatically generated by
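
Usage sketch (illustrative only, not part of the patch): the snippet below shows how a caller
might drive the new dictPrefetch() API introduced above - prefetch a whole batch of keys first,
then perform the actual lookups, which should then hit warm cache. The function name lookupBatch
and its surroundings are hypothetical; in the patch itself this role is played by
processBatchClientsCommands() and kvstoreDictPrefetch() in networking.c and kvstore.c.

    /* Hypothetical illustration of the memory-access-amortization idea. */
    static void lookupBatch(dict **dicts, const void **keys, size_t n) {
        if (n > DictMaxPrefetchSize) n = DictMaxPrefetchSize;
        /* Issue interleaved prefetches for all keys in the batch;
         * NULL means no value-data callback in this sketch. */
        dictPrefetch(dicts, n, keys, NULL);
        /* Now do the real lookups - buckets and entries should already be in cache. */
        for (size_t i = 0; i < n; i++) {
            dictEntry *de = dictFind(dicts[i], keys[i]);
            (void)de; /* the caller would use the entry here */
        }
    }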