From 9a41120506d551eb5a95a0dc01d396f0493e3b68 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 6 Aug 2024 22:56:43 +0000 Subject: [PATCH] Address more PR comments Signed-off-by: Uri Yagelnik --- src/config.h | 16 +++++ src/dict.c | 65 ++++++++++----------- src/fmtargs.h | 82 +++++++++++++++++++++++++- src/networking.c | 119 +++++++++++++++++++------------------- src/server.c | 7 +-- tests/unit/networking.tcl | 6 +- utils/generate-fmtargs.py | 2 +- 7 files changed, 192 insertions(+), 105 deletions(-) diff --git a/src/config.h b/src/config.h index 201e421976..844545dee5 100644 --- a/src/config.h +++ b/src/config.h @@ -348,4 +348,20 @@ void setcpuaffinity(const char *cpulist); #endif #endif +/* Check for GCC version >= 4.9 */ +#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9)) +#define HAS_BUILTIN_PREFETCH 1 +/* Check for Clang version >= 3.6 */ +#elif defined(__clang__) && (__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 6)) +#define HAS_BUILTIN_PREFETCH 1 +#else +#define HAS_BUILTIN_PREFETCH 0 +#endif + +#if HAS_BUILTIN_PREFETCH +#define valkey_prefetch(addr) __builtin_prefetch(addr) +#else +#define valkey_prefetch(addr) ((void)(addr)) +#endif + #endif diff --git a/src/dict.c b/src/dict.c index 3f2b547ac1..c7bace846a 100644 --- a/src/dict.c +++ b/src/dict.c @@ -48,6 +48,7 @@ #include "zmalloc.h" #include "serverassert.h" #include "monotonic.h" +#include "config.h" #ifndef static_assert #define static_assert(expr, lit) _Static_assert(expr, lit) @@ -1595,47 +1596,38 @@ typedef struct { size_t keys_done; /* Number of keys that have been processed */ } PrefetchBatch; -static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */ - -static void incrCurIdx(void) { - prefetchBatch.cur_idx++; - if (prefetchBatch.cur_idx >= prefetchBatch.current_batch_size) { - prefetchBatch.cur_idx %= prefetchBatch.current_batch_size; - } -} - /* Prefetches the given pointer and move to the next key in the batch */ -static void prefetch(void *ptr) { - __builtin_prefetch(ptr); +static void prefetch(void *addr, PrefetchBatch *batch) { + valkey_prefetch(addr); /* while the prefetch is in progress, we can continue to the next key */ - incrCurIdx(); + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } -static void markDone(PrefetchInfo *info) { +static void markDone(PrefetchInfo *info, PrefetchBatch *batch) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; } -static PrefetchInfo *getNextPrefetchInfo(void) { - while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) { - incrCurIdx(); +static PrefetchInfo *getNextPrefetchInfo(PrefetchBatch *batch) { + while (batch->prefetch_info[batch->cur_idx].state == PrefetchDone) { + batch->cur_idx = (batch->cur_idx + 1) % batch->current_batch_size; } - return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx]; + return &batch->prefetch_info[batch->cur_idx]; } -static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { +static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys, PrefetchBatch *batch) { assert(num_keys <= DictMaxPrefetchSize); - prefetchBatch.current_batch_size = num_keys; - prefetchBatch.cur_idx = 0; - prefetchBatch.keys_done = 0; + batch->current_batch_size = num_keys; + batch->cur_idx = 0; + batch->keys_done = 0; /* Initialize the prefetch info */ - for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) { - PrefetchInfo *info = &prefetchBatch.prefetch_info[i]; + for (size_t i = 0; i < batch->current_batch_size; i++) { + PrefetchInfo *info = &batch->prefetch_info[i]; if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) { info->state = PrefetchDone; - prefetchBatch.keys_done++; + batch->keys_done++; continue; } info->ht_idx = -1; @@ -1665,11 +1657,12 @@ static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) { * dictPrefetch can be invoked with a callback function, get_val_data_func, * to bring the key's value data closer to the L1 cache as well. */ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) { - initBatch(keys_dicts, num_keys, keys); + PrefetchBatch batch; /* prefetch batch - holds the current batch of keys being prefetched */ + initBatch(keys_dicts, num_keys, keys, &batch); - while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) { - PrefetchInfo *info = getNextPrefetchInfo(); - size_t i = prefetchBatch.cur_idx; + while (batch.keys_done < batch.current_batch_size) { + PrefetchInfo *info = getNextPrefetchInfo(&batch); + size_t i = batch.cur_idx; switch (info->state) { case PrefetchBucket: /* Determine which hash table to use */ @@ -1679,13 +1672,13 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( info->ht_idx = 1; } else { /* No more tables left - mark as done. */ - markDone(info); + markDone(info, &batch); break; } /* Prefetch the bucket */ info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx], &batch); info->current_entry = NULL; info->state = PrefetchEntry; break; @@ -1701,7 +1694,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( } if (info->current_entry) { - prefetch(info->current_entry); + prefetch(info->current_entry, &batch); info->state = PrefetchValue; } else { /* No entry found in the bucket - try the bucket in the next table */ @@ -1715,7 +1708,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) { /* If this is the last element we assume a hit and dont compare the keys */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; break; } @@ -1723,7 +1716,7 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( void *current_entry_key = dictGetKey(info->current_entry); if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) { /* If the key is found, prefetch the value */ - prefetch(value); + prefetch(value, &batch); info->state = PrefetchValueData; } else { /* Move to next entry */ @@ -1736,9 +1729,9 @@ void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *( /* Prefetch value data if available */ if (get_val_data_func) { void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data); + if (value_data) prefetch(value_data, &batch); } - markDone(info); + markDone(info, &batch); break; } diff --git a/src/fmtargs.h b/src/fmtargs.h index 3fb6cbb479..1fbd02ed82 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, _161, _162, _163, _164, _165, _166, _167, _168, _169, _170, _171, _172, _173, _174, _175, _176, _177, _178, _179, _180, _181, _182, _183, _184, _185, _186, _187, _188, _189, _190, _191, _192, _193, _194, _195, _196, _197, _198, _199, _200, N, ...) N -#define RSEQ_N() 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 200, 199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 189, 188, 187, 186, 185, 184, 183, 182, 181, 180, 179, 178, 177, 176, 175, 174, 173, 172, 171, 170, 169, 168, 167, 166, 165, 164, 163, 162, 161, 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -109,6 +109,45 @@ #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) #define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) +#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__) +#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__) +#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__) +#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__) +#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__) +#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__) +#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__) +#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__) +#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__) +#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__) +#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__) +#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__) +#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__) +#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__) +#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__) +#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__) +#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__) +#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__) +#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__) +#define COMPACT_FMT_162(fmt, value, ...) fmt COMPACT_FMT_160(__VA_ARGS__) +#define COMPACT_FMT_164(fmt, value, ...) fmt COMPACT_FMT_162(__VA_ARGS__) +#define COMPACT_FMT_166(fmt, value, ...) fmt COMPACT_FMT_164(__VA_ARGS__) +#define COMPACT_FMT_168(fmt, value, ...) fmt COMPACT_FMT_166(__VA_ARGS__) +#define COMPACT_FMT_170(fmt, value, ...) fmt COMPACT_FMT_168(__VA_ARGS__) +#define COMPACT_FMT_172(fmt, value, ...) fmt COMPACT_FMT_170(__VA_ARGS__) +#define COMPACT_FMT_174(fmt, value, ...) fmt COMPACT_FMT_172(__VA_ARGS__) +#define COMPACT_FMT_176(fmt, value, ...) fmt COMPACT_FMT_174(__VA_ARGS__) +#define COMPACT_FMT_178(fmt, value, ...) fmt COMPACT_FMT_176(__VA_ARGS__) +#define COMPACT_FMT_180(fmt, value, ...) fmt COMPACT_FMT_178(__VA_ARGS__) +#define COMPACT_FMT_182(fmt, value, ...) fmt COMPACT_FMT_180(__VA_ARGS__) +#define COMPACT_FMT_184(fmt, value, ...) fmt COMPACT_FMT_182(__VA_ARGS__) +#define COMPACT_FMT_186(fmt, value, ...) fmt COMPACT_FMT_184(__VA_ARGS__) +#define COMPACT_FMT_188(fmt, value, ...) fmt COMPACT_FMT_186(__VA_ARGS__) +#define COMPACT_FMT_190(fmt, value, ...) fmt COMPACT_FMT_188(__VA_ARGS__) +#define COMPACT_FMT_192(fmt, value, ...) fmt COMPACT_FMT_190(__VA_ARGS__) +#define COMPACT_FMT_194(fmt, value, ...) fmt COMPACT_FMT_192(__VA_ARGS__) +#define COMPACT_FMT_196(fmt, value, ...) fmt COMPACT_FMT_194(__VA_ARGS__) +#define COMPACT_FMT_198(fmt, value, ...) fmt COMPACT_FMT_196(__VA_ARGS__) +#define COMPACT_FMT_200(fmt, value, ...) fmt COMPACT_FMT_198(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -171,5 +210,44 @@ #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) #define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__) +#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__) +#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__) +#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__) +#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__) +#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__) +#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__) +#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__) +#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__) +#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__) +#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__) +#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__) +#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__) +#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__) +#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__) +#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__) +#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__) +#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__) +#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__) +#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__) +#define COMPACT_VALUES_162(fmt, value, ...) value, COMPACT_VALUES_160(__VA_ARGS__) +#define COMPACT_VALUES_164(fmt, value, ...) value, COMPACT_VALUES_162(__VA_ARGS__) +#define COMPACT_VALUES_166(fmt, value, ...) value, COMPACT_VALUES_164(__VA_ARGS__) +#define COMPACT_VALUES_168(fmt, value, ...) value, COMPACT_VALUES_166(__VA_ARGS__) +#define COMPACT_VALUES_170(fmt, value, ...) value, COMPACT_VALUES_168(__VA_ARGS__) +#define COMPACT_VALUES_172(fmt, value, ...) value, COMPACT_VALUES_170(__VA_ARGS__) +#define COMPACT_VALUES_174(fmt, value, ...) value, COMPACT_VALUES_172(__VA_ARGS__) +#define COMPACT_VALUES_176(fmt, value, ...) value, COMPACT_VALUES_174(__VA_ARGS__) +#define COMPACT_VALUES_178(fmt, value, ...) value, COMPACT_VALUES_176(__VA_ARGS__) +#define COMPACT_VALUES_180(fmt, value, ...) value, COMPACT_VALUES_178(__VA_ARGS__) +#define COMPACT_VALUES_182(fmt, value, ...) value, COMPACT_VALUES_180(__VA_ARGS__) +#define COMPACT_VALUES_184(fmt, value, ...) value, COMPACT_VALUES_182(__VA_ARGS__) +#define COMPACT_VALUES_186(fmt, value, ...) value, COMPACT_VALUES_184(__VA_ARGS__) +#define COMPACT_VALUES_188(fmt, value, ...) value, COMPACT_VALUES_186(__VA_ARGS__) +#define COMPACT_VALUES_190(fmt, value, ...) value, COMPACT_VALUES_188(__VA_ARGS__) +#define COMPACT_VALUES_192(fmt, value, ...) value, COMPACT_VALUES_190(__VA_ARGS__) +#define COMPACT_VALUES_194(fmt, value, ...) value, COMPACT_VALUES_192(__VA_ARGS__) +#define COMPACT_VALUES_196(fmt, value, ...) value, COMPACT_VALUES_194(__VA_ARGS__) +#define COMPACT_VALUES_198(fmt, value, ...) value, COMPACT_VALUES_196(__VA_ARGS__) +#define COMPACT_VALUES_200(fmt, value, ...) value, COMPACT_VALUES_198(__VA_ARGS__) #endif diff --git a/src/networking.c b/src/networking.c index 009729b041..3965325123 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4618,18 +4618,17 @@ int postponeClientRead(client *c) { /* Prefetch multiple commands batch */ typedef struct { client *clients[DictMaxPrefetchSize]; - size_t clients_count; - size_t keys_count; + size_t client_count; + size_t key_count; void *keys[DictMaxPrefetchSize]; kvstore *keys_kvs[DictMaxPrefetchSize]; kvstore *expire_kvs[DictMaxPrefetchSize]; int slots[DictMaxPrefetchSize]; - int client_closed_during_batch_execution; } BatchProcessData; static BatchProcessData batch = {0}; -static void *getValData(const void *val) { +static void *getObjectValuePtr(const void *val) { robj *o = (robj *)val; if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) { return o->ptr; @@ -4637,93 +4636,91 @@ static void *getValData(const void *val) { return NULL; } -void processBatchClientsCommands(void) { - if (batch.clients_count == 0) return; +static void batchProcessClientCommands(void) { + for (size_t i = 0; i < batch.client_count; i++) { + client *c = batch.clients[i]; + if (c) { + /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ + batch.clients[i] = NULL; + if (processPendingCommandAndInputBuffer(c) != C_ERR) { + beforeNextClient(c); + } + } + } + memset(&batch, 0, sizeof(batch)); +} + +/*Prefetch the commands' args allocated by the I/O thread and process all the commands in the batch.*/ +static void batchPrefetchArgsAndProcessClientCommands(void) { + if (batch.client_count == 0) return; /* Prefetch argv's for all clients */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */ for (int j = 1; j < c->argc; j++) { - __builtin_prefetch(c->argv[j]); + valkey_prefetch(c->argv[j]); } } /* prefetch the argv->ptr if required */ - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { client *c = batch.clients[i]; if (!c || c->argc <= 1) continue; for (int j = 1; j < c->argc; j++) { if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { - __builtin_prefetch(c->argv[j]->ptr); + valkey_prefetch(c->argv[j]->ptr); } } } /* Get the keys ptrs - we do it here since we wanted to wait for the arg prefetch */ - for (size_t i = 0; i < batch.keys_count; i++) { + for (size_t i = 0; i < batch.key_count; i++) { batch.keys[i] = ((robj *)batch.keys[i])->ptr; } /* Prefetch keys for all commands, prefetch is beneficial only if there are more than one key */ - if (batch.keys_count > 1) { + if (batch.key_count > 1) { server.stat_total_prefetch_batches++; - server.stat_total_prefetch_entries += batch.keys_count; + server.stat_total_prefetch_entries += batch.key_count; /* Keys */ - kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData); + kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **) batch.keys, batch.key_count, getObjectValuePtr); /* Expires - with expires no values prefetch are required. */ - kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, NULL); + kvstoreDictPrefetch(batch.expire_kvs, batch.slots, (const void **)batch.keys, batch.key_count, NULL); } /* Process clients' commands */ - for (size_t i = 0; i < batch.clients_count; i++) { - client *c = batch.clients[i]; - if (c) { - /* Set immediately the client to null - in order to not access it again when ProcessingEventsWhileBlocked */ - batch.clients[i] = NULL; - if (processPendingCommandAndInputBuffer(c) != C_ERR) { - beforeNextClient(c); - } - } - } - - batch.keys_count = 0; - batch.clients_count = 0; + batchProcessClientCommands(); } void addCommandToBatchAndProcessIfFull(client *c) { - batch.clients[batch.clients_count++] = c; + batch.clients[batch.client_count++] = c; - /* Get command's keys. - * When ProcessingEventsWhileBlocked is set, we don't want to prefetch keys, as no commands will be executed. */ - if (c->io_parsed_cmd && !ProcessingEventsWhileBlocked) { + /* Get command's keys positions */ + if (c->io_parsed_cmd) { getKeysResult result; initGetKeysResult(&result); int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch.keys_count < DictMaxPrefetchSize; i++) { - batch.keys[batch.keys_count] = c->argv[result.keys[i].pos]; - batch.slots[batch.keys_count] = c->slot > 0 ? c->slot : 0; - batch.keys_kvs[batch.keys_count] = c->db->keys; - batch.expire_kvs[batch.keys_count] = c->db->expires; - batch.keys_count++; + for (int i = 0; i < num_keys && batch.key_count < DictMaxPrefetchSize; i++) { + batch.keys[batch.key_count] = c->argv[result.keys[i].pos]; + batch.slots[batch.key_count] = c->slot > 0 ? c->slot : 0; + batch.keys_kvs[batch.key_count] = c->db->keys; + batch.expire_kvs[batch.key_count] = c->db->expires; + batch.key_count++; } getKeysFreeResult(&result); } /* If the batch is full, process it. * We also check the client count to handle cases where - * no keys exist for the client's command. */ - if (batch.clients_count == DictMaxPrefetchSize || batch.keys_count == DictMaxPrefetchSize) { - processBatchClientsCommands(); + * no keys exist for the clients' commands. */ + if (batch.client_count == DictMaxPrefetchSize || batch.key_count == DictMaxPrefetchSize) { + batchPrefetchArgsAndProcessClientCommands(); } } void removeClientFromPendingPrefetchBatch(client *c) { - if (batch.clients_count == 0) return; - - batch.client_closed_during_batch_execution = 1; - - for (size_t i = 0; i < batch.clients_count; i++) { + for (size_t i = 0; i < batch.client_count; i++) { if (batch.clients[i] == c) { batch.clients[i] = NULL; return; @@ -4732,9 +4729,14 @@ void removeClientFromPendingPrefetchBatch(client *c) { } int processIOThreadsReadDone(void) { - if (listLength(server.clients_pending_io_read) == 0 && batch.clients_count == 0) return 0; + if (ProcessingEventsWhileBlocked) { + /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. + * In this case, there may be some clients left in the batch waiting to be processed. */ + batchProcessClientCommands(); + } + + if (listLength(server.clients_pending_io_read) == 0) return 0; int processed = 0; - batch.client_closed_during_batch_execution = 0; listNode *ln; listNode *next = listFirst(server.clients_pending_io_read); @@ -4751,16 +4753,17 @@ int processIOThreadsReadDone(void) { } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire); - /* Don't post-process-writes to clients that are going to be closed anyway. */ - if (c->flag.close_asap) continue; - /* If a client is protected, don't do anything, - * that may trigger read/write error or recreate handler. */ - if (c->flag.protected) continue; listUnlinkNode(server.clients_pending_io_read, ln); c->flag.pending_read = 0; c->io_read_state = CLIENT_IDLE; + /* Don't post-process-reads to clients that are going to be closed anyway. */ + if (c->flag.close_asap) continue; + /* If a client is protected, don't do anything, + * that may trigger read/write error or recreate handler. */ + if (c->flag.protected) continue; + processed++; server.stat_io_reads_processed++; @@ -4787,17 +4790,15 @@ int processIOThreadsReadDone(void) { c->flag.pending_command = 1; } + size_t list_len = listLength(server.clients_pending_io_read); addCommandToBatchAndProcessIfFull(c); - - /* There is a possibility that a client was closed during the latest batch processing. - * In this case the next node may be invalid */ - if (batch.client_closed_during_batch_execution) { + if (list_len != listLength(server.clients_pending_io_read)) { + /* A client was removed from the list - next node may be invalid */ next = listFirst(server.clients_pending_io_read); - batch.client_closed_during_batch_execution = 0; } } - processBatchClientsCommands(); + batchPrefetchArgsAndProcessClientCommands(); return processed; } diff --git a/src/server.c b/src/server.c index 3e7abeab19..4bcbbe4826 100644 --- a/src/server.c +++ b/src/server.c @@ -5623,10 +5623,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0; - long long average_prefetch_batch_size = - (server.stat_total_prefetch_batches - ? server.stat_total_prefetch_entries / server.stat_total_prefetch_batches - : 0); if (sections++) info = sdscat(info, "\r\n"); /* clang-format off */ @@ -5682,7 +5678,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, - "io_threaded_average_prefetch_batch_size:%lld\r\n", average_prefetch_batch_size, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index a9831085f1..1740436c2b 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -206,8 +206,10 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb # verify the prefetch stats are as expected set info [r info stats] - set prefetch_stats [getInfoProperty $info io_threaded_average_prefetch_batch_size] - assert_range [expr $prefetch_stats] 2 15 ; # we expect max 15 as the the kill command doesn't have any keys. + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher # Verify the final state $rd16 get a diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index 873b8f67f6..dfe8efadcc 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 122 +MAX_ARGS = 200 import os print("/* Everything below this line is automatically generated by")