diff --git a/src/config.h b/src/config.h index 201e421976..844545dee5 100644 --- a/src/config.h +++ b/src/config.h @@ -348,4 +348,20 @@ void setcpuaffinity(const char *cpulist); #endif #endif +/* Check for GCC version >= 4.9 */ +#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9)) +#define HAS_BUILTIN_PREFETCH 1 +/* Check for Clang version >= 3.6 */ +#elif defined(__clang__) && (__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 6)) +#define HAS_BUILTIN_PREFETCH 1 +#else +#define HAS_BUILTIN_PREFETCH 0 +#endif + +#if HAS_BUILTIN_PREFETCH +#define valkey_prefetch(addr) __builtin_prefetch(addr) +#else +#define valkey_prefetch(addr) ((void)(addr)) +#endif + #endif diff --git a/src/dict.c b/src/dict.c index 3f2b547ac1..c7bace846a 100644 --- a/src/dict.c +++ b/src/dict.c @@ -48,6 +48,7 @@ #include "zmalloc.h" #include "serverassert.h" #include "monotonic.h" +#include "config.h" #ifndef static_assert #define static_assert(expr, lit) _Static_assert(expr, lit) @@ -1595,47 +1596,38 @@ typedef struct { size_t keys_done; /* Number of keys that have been processed */ } PrefetchBatch; -static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */ - -static void incrCurIdx(void) { - prefetchBatch.cur_idx++; - if (prefetchBatch.cur_idx >= prefetchBatch.current_batch_size) { - prefetchBatch.cur_idx %= prefetchBatch.current_batch_size; - } -} - /* Prefetches the given pointer and move to the next key in the batch */ -static void prefetch(void *ptr) { - __builtin_prefetch(ptr); +static void prefetch(void *addr, PrefetchBatch *batch) { + valkey_prefetch(addr); /* while the prefetch is in progress, we can continue to the next key */ - incrCurIdx(); + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } -static void markDone(PrefetchInfo *info) { +static void markDone(PrefetchInfo *info, PrefetchBatch *batch) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; } -static PrefetchInfo *getNextPrefetchInfo(void) { - while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) { - incrCurIdx(); +static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) { + while (batch->prefetch_info[batch->cur_idx].state == PrefetchDone) { + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } - return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx]; + return &batch->prefetch_info[batch->cur_idx]; } -static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { +static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) { assert(num_keys <= DictMaxPrefetchSize); - prefetchBatch.current_batch_size = num_keys; - prefetchBatch.cur_idx = 0; - prefetchBatch.keys_done = 0; + batch->current_batch_size = num_keys; + batch->cur_idx = 0; + batch->keys_done = 0; /* Initialize the prefetch info */ - for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) { - PrefetchInfo *info = &prefetchBatch.prefetch_info[i]; + for (size_t i = 0; i < batch->current_batch_size; i++) { + PrefetchInfo *info = &batch->prefetch_info[i]; if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; continue; } info->ht_idx = -1; @@ -1665,11 +1657,12 @@ static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { * dictPrefetch can be invoked with a callback function, get_val_data_func, * to bring the key's value data closer to the L1 cache as well. */ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - initBatch(keys_dicts, num_keys, keys); + PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */ + initBatch(keys_dicts, num_keys, keys, &batch); - while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) { - PrefetchInfo *info = getNextPrefetchInfo(); - size_t i = prefetchBatch.cur_idx; + while (batch.keys_done < batch.current_batch_size) { + PrefetchInfo *info = getNextPrefetchInfo(&batch); + size_t i = batch.cur_idx; switch (info->state) { case PrefetchBucket: /* Determine which hash table to use */ @@ -1679,13 +1672,13 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( info->ht_idx = 1; } else { /* No more tables left - mark as done. */ - markDone(info); + markDone(info, &batch); break; } /* Prefetch the bucket */ info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch); info->current_entry = NULL; info->state = PrefetchEntry; break; @@ -1701,7 +1694,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( } if (info->current_entry) { - prefetch(info->current_entry); + prefetch(info->current_entry, &batch); info->state = PrefetchValue; } else { /* No entry found in the bucket - try the bucket in the next table */ @@ -1715,7 +1708,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { /* If this is the last element we assume a hit and dont compare the keys */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; break; } @@ -1723,7 +1716,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( void *current_entry_key = dictGetKey(info->current_entry); if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { /* If the key is found, prefetch the value */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; } else { /* Move to next entry */ @@ -1736,9 +1729,9 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( /* Prefetch value data if available */ if (get_val_data_func) { void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data); + if (value_data) prefetch(value_data, &batch); } - markDone(info); + markDone(info, &batch); break; } diff --git a/src/networking.c b/src/networking.c index 009729b041..3965325123 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4618,18 +4618,17 @@ int postponeClientRead(client *c) { /* Prefetch multiple commands batch */ typedef struct { client *clients[DictMaxPrefetchSize]; - size_t clients_count; - size_t keys_count; + size_t client_count; + size_t key_count; void *keys[DictMaxPrefetchSize]; kvstore *keys_kvs[DictMaxPrefetchSize]; kvstore *expire_kvs[DictMaxPrefetchSize]; int slots[DictMaxPrefetchSize]; - int client_closed_during_batch_execution; } BatchProcessData; static BatchProcessData batch = {0}; -static void *getValData(const void *val) { +static void *getObjectValuePtr(const void *val) { robj *o = (robj *)val; if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { return o->ptr; @@ -4637,93 +4636,91 @@ static void *getValData(const void *val) { return NULL; } -void processBatchClientsCommands(void) { - if (batch.clients_count == 0) return; +static void batchProcessClientCommands(void) { + for (size_t i = 0; i < batch.client_count; i++) { + client *c = batch.clients[i]; + if (c) { + /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ + batch.clients[i] = NULL; + if (processPendingCommandAndInputBuffer(c) != C_ERR) { + beforeNextClient(c); + } + } + } + memset(&batch, 0, sizeof(batch)); +} + +/*Prefetch the commands' args allocated by the I/O thread and process all the commands in the batch.*/ +static void batchPrefetchArgsAndProcessClientCommands(void) { + if (batch.client_count == 0) return; /* Prefetch argv's for all clients */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */ for (int j = 1; j < c->argc; j++) { - __builtin_prefetch(c->argv[j]); + valkey_prefetch(c->argv[j]); } } /* prefetch the argv->ptr if required */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; for (int j = 1; j < c->argc; j++) { if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { - __builtin_prefetch(c->argv[j]->ptr); + valkey_prefetch(c->argv[j]->ptr); } } } /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ - for (size_t i = 0; i < batch.keys_count; i++) { + for (size_t i = 0; i < batch.key_count; i++) { batch.keys[i] = ((robj *)batch.keys[i])->ptr; } /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ - if (batch.keys_count > 1) { + if (batch.key_count > 1) { server.stat_total_prefetch_batches++; - server.stat_total_prefetch_entries += batch.keys_count; + server.stat_total_prefetch_entries += batch.key_count; /* Keys */ - kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData); + kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr); /* Expires - with expires no values prefetch are required. */ - kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL); + kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL); } /* Process clients' commands */ - for (size_t i = 0; i < batch.clients_count; i++) { - client *c = batch.clients[i]; - if (c) { - /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ - batch.clients[i] = NULL; - if (processPendingCommandAndInputBuffer(c) != C_ERR) { - beforeNextClient(c); - } - } - } - - batch.keys_count = 0; - batch.clients_count = 0; + batchProcessClientCommands(); } void addCommandToBatchAndProcessIfFull(client *c) { - batch.clients[batch.clients_count++] = c; + batch.clients[batch.client_count++] = c; - /* Get command's keys. - * When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */ - if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) { + /* Get command's keys positions */ + if (c->io_parsed_cmd) { getKeysResult result; initGetKeysResult(&result); int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) { - batch.keys[batch.keys_count] = c->argv[result.keys[i].pos]; - batch.slots[batch.keys_count] = c->slot > 0 ? c->slot : 0; - batch.keys_kvs[batch.keys_count] = c->db->keys; - batch.expire_kvs[batch.keys_count] = c->db->expires; - batch.keys_count++; + for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) { + batch.keys[batch.key_count] = c->argv[result.keys[i].pos]; + batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0; + batch.keys_kvs[batch.key_count] = c->db->keys; + batch.expire_kvs[batch.key_count] = c->db->expires; + batch.key_count++; } getKeysFreeResult(&result); } /* If the batch is full, process it. * We also check the client count to handle cases where - * no keys exist for the client's command. */ - if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) { - processBatchClientsCommands(); + * no keys exist for the clients' commands. */ + if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) { + batchPrefetchArgsAndProcessClientCommands(); } } void removeClientFromPendingPrefetchBatch(client *c) { - if (batch.clients_count == 0) return; - - batch.client_closed_during_batch_execution = 1; - - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { if (batch.clients[i] == c) { batch.clients[i] = NULL; return; @@ -4732,9 +4729,14 @@ void removeClientFromPendingPrefetchBatch(client *c) { } int processIOThreadsReadDone(void) { - if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0; + if (ProcessingEventsWhileBlocked) { + /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. + * In this case, there may be some clients left in the batch waiting to be processed. */ + batchProcessClientCommands(); + } + + if (listLength(server.clients_pending_io_read) == 0) return 0; int processed = 0; - batch.client_closed_during_batch_execution = 0; listNode *ln; listNode *next = listFirst(server.clients_pending_io_read); @@ -4751,16 +4753,17 @@ int processIOThreadsReadDone(void) { } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire); - /* Don't post-process-writes to clients that are going to be closed anyway. */ - if (c->flag.close_asap) continue; - /* If a client is protected, don't do anything, - * that may trigger read/write error or recreate handler. */ - if (c->flag.protected) continue; listUnlinkNode(server.clients_pending_io_read, ln); c->flag.pending_read = 0; c->io_read_state = CLIENT_IDLE; + /* Don't post-process-reads to clients that are going to be closed anyway. */ + if (c->flag.close_asap) continue; + /* If a client is protected, don't do anything, + * that may trigger read/write error or recreate handler. */ + if (c->flag.protected) continue; + processed++; server.stat_io_reads_processed++; @@ -4787,17 +4790,15 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } + size_t list_len = listLength(server.clients_pending_io_read); addCommandToBatchAndProcessIfFull(c); - - /* There is a possibility that a client was closed during the latest batch processing. - * In this case the next node may be invalid */ - if (batch.client_closed_during_batch_execution) { + if (list_len != listLength(server.clients_pending_io_read)) { + /* A client was removed from the list - next node may be invalid */ next = listFirst(server.clients_pending_io_read); - batch.client_closed_during_batch_execution = 0; } } - processBatchClientsCommands(); + batchPrefetchArgsAndProcessClientCommands(); return processed; } diff --git a/src/server.c b/src/server.c index 3e7abeab19..4bcbbe4826 100644 --- a/src/server.c +++ b/src/server.c @@ -5623,10 +5623,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0; - long long average_prefetch_batch_size = - (server.stat_total_prefetch_batches - ? server.stat_total_prefetch_entries / server.stat_total_prefetch_batches - : 0); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ @@ -5682,7 +5678,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, - "io_threaded_average_prefetch_batch_size:%lld\r\n", average_prefetch_batch_size, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index a9831085f1..1740436c2b 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -206,8 +206,10 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb # verify the prefetch stats are as expected set info [r info stats] - set prefetch_stats [getInfoProperty $info io_threaded_average_prefetch_batch_size] - assert_range [expr $prefetch_stats] 2 15 ; # we expect max 15 as the the kill command doesn't have any keys. + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher # Verify the final state $rd16 get a diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index 873b8f67f6..dfe8efadcc 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 122 +MAX_ARGS = 200 import os print("/* Everything below this line is automatically generated by")