From 9087f064d1f9dc1717d9b9b661028c8dcb4a4dc4 Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Tue, 27 Aug 2024 07:10:44 +0300 Subject: [PATCH] Improve multithreaded performance with memory prefetching (#861) This PR utilizes the IO threads to execute commands in batches, allowing us to prefetch the dictionary data in advance. After making the IO threads asynchronous and offloading more work to them in the first 2 PRs, the `lookupKey` function becomes a main bottle-neck and it takes about 50% of the main-thread time (Tested with SET command). This is because the Valkey dictionary is a straightforward but inefficient chained hash implementation. While traversing the hash linked lists, every access to either a dictEntry structure, pointer to key, or a value object requires, with high probability, an expensive external memory access. ### Memory Access Amortization Memory Access Amortization (MAA) is a technique designed to optimize the performance of dynamic data structures by reducing the impact of memory access latency. It is applicable when multiple operations need to be executed concurrently. The principle behind it is that for certain dynamic data structures, executing operations in a batch is more efficient than executing each one separately. Rather than executing operations sequentially, this approach interleaves the execution of all operations. This is done in such a way that whenever a memory access is required during an operation, the program prefetches the necessary memory and transitions to another operation. This ensures that when one operation is blocked awaiting memory access, other memory accesses are executed in parallel, thereby reducing the average access latency. We applied this method in the development of `dictPrefetch`, which takes as parameters a vector of keys and dictionaries. It ensures that all memory addresses required to execute dictionary operations for these keys are loaded into the L1-L3 caches when executing commands. Essentially, `dictPrefetch` is an interleaved execution of dictFind for all the keys. **Implementation details** When the main thread iterates over the `clients-pending-io-read`, for clients with ready-to-execute commands (i.e., clients for which the IO thread has parsed the commands), a batch of up to 16 commands is created. Initially, the command's argv, which were allocated by the IO thread, is prefetched to the main thread's L1 cache. Subsequently, all the dict entries and values required for the commands are prefetched from the dictionary before the command execution. Only then will the commands be executed. --------- Signed-off-by: Uri Yagelnik --- src/Makefile | 2 +- src/config.c | 1 + src/config.h | 16 ++ src/dict.c | 4 +- src/dict.h | 1 + src/fmtargs.h | 84 +++++++- src/io_threads.c | 8 + src/kvstore.c | 2 +- src/kvstore.h | 1 + src/memory_prefetch.c | 414 ++++++++++++++++++++++++++++++++++++++ src/memory_prefetch.h | 11 + src/networking.c | 43 +++- src/server.c | 2 + src/server.h | 4 + tests/unit/networking.tcl | 118 +++++++++++ utils/generate-fmtargs.py | 2 +- valkey.conf | 14 +- 17 files changed, 711 insertions(+), 16 deletions(-) create mode 100644 src/memory_prefetch.c create mode 100644 src/memory_prefetch.h diff --git a/src/Makefile b/src/Makefile index d69bd915dc..13fa1c0275 100644 --- a/src/Makefile +++ b/src/Makefile @@ -423,7 +423,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index f8e0bffdae..a4896d6cb6 100644 --- a/src/config.c +++ b/src/config.c @@ -3166,6 +3166,7 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/config.h b/src/config.h index 8d19fa9f7f..7ae69da46f 100644 --- a/src/config.h +++ b/src/config.h @@ -348,4 +348,20 @@ void setcpuaffinity(const char *cpulist); #endif #endif +/* Check for GCC version >= 4.9 */ +#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9)) +#define HAS_BUILTIN_PREFETCH 1 +/* Check for Clang version >= 3.6 */ +#elif defined(__clang__) && (__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 6)) +#define HAS_BUILTIN_PREFETCH 1 +#else +#define HAS_BUILTIN_PREFETCH 0 +#endif + +#if HAS_BUILTIN_PREFETCH +#define valkey_prefetch(addr) __builtin_prefetch(addr) +#else +#define valkey_prefetch(addr) ((void)(addr)) +#endif + #endif diff --git a/src/dict.c b/src/dict.c index 1df03f6546..cf49d5975d 100644 --- a/src/dict.c +++ b/src/dict.c @@ -48,6 +48,7 @@ #include "zmalloc.h" #include "serverassert.h" #include "monotonic.h" +#include "config.h" #ifndef static_assert #define static_assert(expr, lit) _Static_assert(expr, lit) @@ -119,7 +120,6 @@ static void _dictExpandIfNeeded(dict *d); static void _dictShrinkIfNeeded(dict *d); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -static dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); @@ -962,7 +962,7 @@ double *dictGetDoubleValPtr(dictEntry *de) { /* Returns the 'next' field of the entry or NULL if the entry doesn't have a * 'next' field. */ -static dictEntry *dictGetNext(const dictEntry *de) { +dictEntry *dictGetNext(const dictEntry *de) { if (entryIsKey(de)) return NULL; /* there's no next */ if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next; if (entryIsEmbedded(de)) return decodeEmbeddedEntry(de)->next; diff --git a/src/dict.h b/src/dict.h index 5fd15004b8..40fa284d2b 100644 --- a/src/dict.h +++ b/src/dict.h @@ -229,6 +229,7 @@ void dictInitIterator(dictIterator *iter, dict *d); void dictInitSafeIterator(dictIterator *iter, dict *d); void dictResetIterator(dictIterator *iter); dictEntry *dictNext(dictIterator *iter); +dictEntry *dictGetNext(const dictEntry *de); void dictReleaseIterator(dictIterator *iter); dictEntry *dictGetRandomKey(dict *d); dictEntry *dictGetFairRandomKey(dict *d); diff --git a/src/fmtargs.h b/src/fmtargs.h index e52d3b99c5..1fbd02ed82 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, _161, _162, _163, _164, _165, _166, _167, _168, _169, _170, _171, _172, _173, _174, _175, _176, _177, _178, _179, _180, _181, _182, _183, _184, _185, _186, _187, _188, _189, _190, _191, _192, _193, _194, _195, _196, _197, _198, _199, _200, N, ...) N -#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 200, 199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 189, 188, 187, 186, 185, 184, 183, 182, 181, 180, 179, 178, 177, 176, 175, 174, 173, 172, 171, 170, 169, 168, 167, 166, 165, 164, 163, 162, 161, 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -108,6 +108,46 @@ #define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__) #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) +#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) +#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__) +#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__) +#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__) +#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__) +#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__) +#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__) +#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__) +#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__) +#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__) +#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__) +#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__) +#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__) +#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__) +#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__) +#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__) +#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__) +#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__) +#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__) +#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__) +#define COMPACT_FMT_162(fmt, value, ...) fmt COMPACT_FMT_160(__VA_ARGS__) +#define COMPACT_FMT_164(fmt, value, ...) fmt COMPACT_FMT_162(__VA_ARGS__) +#define COMPACT_FMT_166(fmt, value, ...) fmt COMPACT_FMT_164(__VA_ARGS__) +#define COMPACT_FMT_168(fmt, value, ...) fmt COMPACT_FMT_166(__VA_ARGS__) +#define COMPACT_FMT_170(fmt, value, ...) fmt COMPACT_FMT_168(__VA_ARGS__) +#define COMPACT_FMT_172(fmt, value, ...) fmt COMPACT_FMT_170(__VA_ARGS__) +#define COMPACT_FMT_174(fmt, value, ...) fmt COMPACT_FMT_172(__VA_ARGS__) +#define COMPACT_FMT_176(fmt, value, ...) fmt COMPACT_FMT_174(__VA_ARGS__) +#define COMPACT_FMT_178(fmt, value, ...) fmt COMPACT_FMT_176(__VA_ARGS__) +#define COMPACT_FMT_180(fmt, value, ...) fmt COMPACT_FMT_178(__VA_ARGS__) +#define COMPACT_FMT_182(fmt, value, ...) fmt COMPACT_FMT_180(__VA_ARGS__) +#define COMPACT_FMT_184(fmt, value, ...) fmt COMPACT_FMT_182(__VA_ARGS__) +#define COMPACT_FMT_186(fmt, value, ...) fmt COMPACT_FMT_184(__VA_ARGS__) +#define COMPACT_FMT_188(fmt, value, ...) fmt COMPACT_FMT_186(__VA_ARGS__) +#define COMPACT_FMT_190(fmt, value, ...) fmt COMPACT_FMT_188(__VA_ARGS__) +#define COMPACT_FMT_192(fmt, value, ...) fmt COMPACT_FMT_190(__VA_ARGS__) +#define COMPACT_FMT_194(fmt, value, ...) fmt COMPACT_FMT_192(__VA_ARGS__) +#define COMPACT_FMT_196(fmt, value, ...) fmt COMPACT_FMT_194(__VA_ARGS__) +#define COMPACT_FMT_198(fmt, value, ...) fmt COMPACT_FMT_196(__VA_ARGS__) +#define COMPACT_FMT_200(fmt, value, ...) fmt COMPACT_FMT_198(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -169,5 +209,45 @@ #define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__) #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) +#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__) +#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__) +#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__) +#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__) +#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__) +#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__) +#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__) +#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__) +#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__) +#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__) +#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__) +#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__) +#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__) +#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__) +#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__) +#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__) +#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__) +#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__) +#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__) +#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__) +#define COMPACT_VALUES_162(fmt, value, ...) value, COMPACT_VALUES_160(__VA_ARGS__) +#define COMPACT_VALUES_164(fmt, value, ...) value, COMPACT_VALUES_162(__VA_ARGS__) +#define COMPACT_VALUES_166(fmt, value, ...) value, COMPACT_VALUES_164(__VA_ARGS__) +#define COMPACT_VALUES_168(fmt, value, ...) value, COMPACT_VALUES_166(__VA_ARGS__) +#define COMPACT_VALUES_170(fmt, value, ...) value, COMPACT_VALUES_168(__VA_ARGS__) +#define COMPACT_VALUES_172(fmt, value, ...) value, COMPACT_VALUES_170(__VA_ARGS__) +#define COMPACT_VALUES_174(fmt, value, ...) value, COMPACT_VALUES_172(__VA_ARGS__) +#define COMPACT_VALUES_176(fmt, value, ...) value, COMPACT_VALUES_174(__VA_ARGS__) +#define COMPACT_VALUES_178(fmt, value, ...) value, COMPACT_VALUES_176(__VA_ARGS__) +#define COMPACT_VALUES_180(fmt, value, ...) value, COMPACT_VALUES_178(__VA_ARGS__) +#define COMPACT_VALUES_182(fmt, value, ...) value, COMPACT_VALUES_180(__VA_ARGS__) +#define COMPACT_VALUES_184(fmt, value, ...) value, COMPACT_VALUES_182(__VA_ARGS__) +#define COMPACT_VALUES_186(fmt, value, ...) value, COMPACT_VALUES_184(__VA_ARGS__) +#define COMPACT_VALUES_188(fmt, value, ...) value, COMPACT_VALUES_186(__VA_ARGS__) +#define COMPACT_VALUES_190(fmt, value, ...) value, COMPACT_VALUES_188(__VA_ARGS__) +#define COMPACT_VALUES_192(fmt, value, ...) value, COMPACT_VALUES_190(__VA_ARGS__) +#define COMPACT_VALUES_194(fmt, value, ...) value, COMPACT_VALUES_192(__VA_ARGS__) +#define COMPACT_VALUES_196(fmt, value, ...) value, COMPACT_VALUES_194(__VA_ARGS__) +#define COMPACT_VALUES_198(fmt, value, ...) value, COMPACT_VALUES_196(__VA_ARGS__) +#define COMPACT_VALUES_200(fmt, value, ...) value, COMPACT_VALUES_198(__VA_ARGS__) #endif diff --git a/src/io_threads.c b/src/io_threads.c index c9345d72e0..5b2230f635 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -1,3 +1,9 @@ +/* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + */ + #include "io_threads.h" static __thread int thread_id = 0; /* Thread local var */ @@ -303,6 +309,8 @@ void initIOThreads(void) { serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM); + prefetchCommandsBatchInit(); + /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { createIOThread(i); diff --git a/src/kvstore.c b/src/kvstore.c index 16cc8e4822..b7fa7359ab 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -93,7 +93,7 @@ typedef struct { /**********************************/ /* Get the dictionary pointer based on dict-index. */ -static dict *kvstoreGetDict(kvstore *kvs, int didx) { +dict *kvstoreGetDict(kvstore *kvs, int didx) { return kvs->dicts[didx]; } diff --git a/src/kvstore.h b/src/kvstore.h index a94f366b6b..202f6a9c25 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -76,5 +76,6 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val); dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index); void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index); int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); +dict *kvstoreGetDict(kvstore *kvs, int didx); #endif /* DICTARRAY_H_ */ diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c new file mode 100644 index 0000000000..01c510638a --- /dev/null +++ b/src/memory_prefetch.c @@ -0,0 +1,414 @@ +/* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * + * This file utilizes prefetching keys and data for multiple commands in a batch, + * to improve performance by amortizing memory access costs across multiple operations. + */ + +#include "memory_prefetch.h" +#include "server.h" +#include "dict.h" + +/* Forward declarations of dict.c functions */ +dictEntry *dictGetNext(const dictEntry *de); + +/* Forward declarations of kvstore.c functions */ +dict *kvstoreGetDict(kvstore *kvs, int didx); + +typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex; + +typedef enum { + PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ + PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */ + PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */ + PREFETCH_VALUE_DATA, /* prefetch the value object's data (if applicable) */ + PREFETCH_DONE /* Indicates that prefetching for this key is complete */ +} PrefetchState; + + +/************************************ State machine diagram for the prefetch operation. ******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PREFETCH_ENTRY | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + value not found - goto next entry ┌───────▼────────┐ | + └───────◄──────┤ PREFETCH_VALUE | ▼ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PREFETCH_VALUE_DATA │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ + +typedef void *(*GetValueDataFunc)(const void *val); + +typedef struct KeyPrefetchInfo { + PrefetchState state; /* Current state of the prefetch operation */ + HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ +} KeyPrefetchInfo; + +/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */ +typedef struct PrefetchCommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} PrefetchCommandsBatch; + +static PrefetchCommandsBatch *batch = NULL; + +void freePrefetchCommandsBatch(void) { + if (batch == NULL) { + return; + } + + zfree(batch->clients); + zfree(batch->keys); + zfree(batch->keys_dicts); + zfree(batch->expire_dicts); + zfree(batch->slots); + zfree(batch->prefetch_info); + zfree(batch); + batch = NULL; +} + +void prefetchCommandsBatchInit(void) { + serverAssert(!batch); + size_t max_prefetch_size = server.prefetch_batch_max_size; + + if (max_prefetch_size == 0) { + return; + } + + batch = zcalloc(sizeof(PrefetchCommandsBatch)); + batch->max_prefetch_size = max_prefetch_size; + batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); + batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); + batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->slots = zcalloc(max_prefetch_size * sizeof(int)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo)); +} + +void onMaxBatchSizeChange(void) { + if (batch && batch->client_count > 0) { + /* We need to process the current batch before updating the size */ + return; + } + + freePrefetchCommandsBatch(); + prefetchCommandsBatchInit(); +} + +/* Prefetch the given pointer and move to the next key in the batch. */ +static void prefetchAndMoveToNextKey(void *addr) { + valkey_prefetch(addr); + /* While the prefetch is in progress, we can continue to the next key */ + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; +} + +static void markKeyAsdone(KeyPrefetchInfo *info) { + info->state = PREFETCH_DONE; + server.stat_total_prefetch_entries++; + batch->keys_done++; +} + +/* Returns the next KeyPrefetchInfo structure that needs to be processed. */ +static KeyPrefetchInfo *getNextPrefetchInfo(void) { + size_t start_idx = batch->cur_idx; + do { + KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + if (info->state != PREFETCH_DONE) return info; + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; + } while (batch->cur_idx != start_idx); + return NULL; +} + +static void initBatchInfo(dict **dicts) { + batch->current_dicts = dicts; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < batch->key_count; i++) { + KeyPrefetchInfo *info = &batch->prefetch_info[i]; + if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { + info->state = PREFETCH_DONE; + batch->keys_done++; + continue; + } + info->ht_idx = HT_IDX_INVALID; + info->current_entry = NULL; + info->state = PREFETCH_BUCKET; + info->key_hash = dictHashKey(batch->current_dicts[i], batch->keys[i]); + } +} + +/* Prefetch the bucket of the next hash table index. + * If no tables are left, move to the PREFETCH_DONE state. */ +static void prefetchBucket(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + /* Determine which hash table to use */ + if (info->ht_idx == HT_IDX_INVALID) { + info->ht_idx = HT_IDX_FIRST; + } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) { + info->ht_idx = HT_IDX_SECOND; + } else { + /* No more tables left - mark as done. */ + markKeyAsdone(info); + return; + } + + /* Prefetch the bucket */ + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); + prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PREFETCH_ENTRY; +} + +/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. + * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ +static void prefetchEntry(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + if (info->current_entry) { + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Go to the first entry in the bucket */ + info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetchAndMoveToNextKey(info->current_entry); + info->state = PREFETCH_VALUE; + } else { + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PREFETCH_BUCKET; + } +} + +/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. + * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */ +static void prefetchValue(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + void *value = dictGetVal(info->current_entry); + + if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { + /* If this is the last element, we assume a hit and don't compare the keys */ + prefetchAndMoveToNextKey(value); + info->state = PREFETCH_VALUE_DATA; + return; + } + + void *current_entry_key = dictGetKey(info->current_entry); + if (batch->keys[i] == current_entry_key || + dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + prefetchAndMoveToNextKey(value); + info->state = PREFETCH_VALUE_DATA; + } else { + /* Move to the next entry */ + info->state = PREFETCH_ENTRY; + } +} + +/* Prefetch the value data if available. */ +static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) { + if (get_val_data_func) { + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) prefetchAndMoveToNextKey(value_data); + } + markKeyAsdone(info); +} + +/* Prefetch dictionary data for an array of keys. + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * The dictFind algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the entries linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dicts - An array of dictionaries to prefetch data from. + * get_val_data_func - A callback function that dictPrefetch can invoke + * to bring the key's value data closer to the L1 cache as well. + */ +static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { + initBatchInfo(dicts); + KeyPrefetchInfo *info; + while ((info = getNextPrefetchInfo())) { + switch (info->state) { + case PREFETCH_BUCKET: prefetchBucket(info); break; + case PREFETCH_ENTRY: prefetchEntry(info); break; + case PREFETCH_VALUE: prefetchValue(info); break; + case PREFETCH_VALUE_DATA: prefetchValueData(info, get_val_data_func); break; + default: serverPanic("Unknown prefetch state %d", info->state); + } + } +} + +/* Helper function to get the value pointer of an object. */ +static void *getObjectValuePtr(const void *val) { + robj *o = (robj *)val; + return (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) ? o->ptr : NULL; +} + +static void resetCommandsBatch(void) { + batch->cur_idx = 0; + batch->keys_done = 0; + batch->key_count = 0; + batch->client_count = 0; + batch->executed_commands = 0; +} + +/* Prefetch command-related data: + * 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. + * 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ +static void prefetchCommands(void) { + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. */ + for (int j = 1; j < c->argc; j++) { + valkey_prefetch(c->argv[j]); + } + } + + /* Prefetch the argv->ptr if required */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + valkey_prefetch(c->argv[j]->ptr); + } + } + } + + /* Get the keys ptrs - we do it here after the key obj was prefetched. */ + for (size_t i = 0; i < batch->key_count; i++) { + batch->keys[i] = ((robj *)batch->keys[i])->ptr; + } + + /* Prefetch dict keys for all commands. Prefetching is beneficial only if there are more than one key. */ + if (batch->key_count > 1) { + server.stat_total_prefetch_batches++; + /* Prefetch keys from the main dict */ + dictPrefetch(batch->keys_dicts, getObjectValuePtr); + /* Prefetch keys from the expires dict - no value data to prefetch */ + dictPrefetch(batch->expire_dicts, NULL); + } +} + +/* Processes all the prefetched commands in the current batch. */ +void processClientsCommandsBatch(void) { + if (!batch || batch->client_count == 0) return; + + /* If executed_commands is not 0, + * it means that we are in the middle of processing a batch and this is a recursive call */ + if (batch->executed_commands == 0) { + prefetchCommands(); + } + + /* Process the commands */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (c == NULL) continue; + + /* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */ + batch->clients[i] = NULL; + batch->executed_commands++; + if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c); + } + + resetCommandsBatch(); + + /* Handle the case where the max prefetch size has been changed. */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { + onMaxBatchSizeChange(); + } +} + +/* Adds the client's command to the current batch and processes the batch + * if it becomes full. + * + * Returns C_OK if the command was added successfully, C_ERR otherwise. */ +int addCommandToBatchAndProcessIfFull(client *c) { + if (!batch) return C_ERR; + + batch->clients[batch->client_count++] = c; + + /* Get command's keys positions */ + if (c->io_parsed_cmd) { + getKeysResult result; + initGetKeysResult(&result); + int num_keys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0; + batch->keys_dicts[batch->key_count] = kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]); + batch->expire_dicts[batch->key_count] = kvstoreGetDict(c->db->expires, batch->slots[batch->key_count]); + batch->key_count++; + } + getKeysFreeResult(&result); + } + + /* If the batch is full, process it. + * We also check the client count to handle cases where + * no keys exist for the clients' commands. */ + if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) { + processClientsCommandsBatch(); + } + + return C_OK; +} + +/* Removes the given client from the pending prefetch batch, if present. */ +void removeClientFromPendingCommandsBatch(client *c) { + if (!batch) return; + + for (size_t i = 0; i < batch->client_count; i++) { + if (batch->clients[i] == c) { + batch->clients[i] = NULL; + return; + } + } +} diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h new file mode 100644 index 0000000000..5a181cc58d --- /dev/null +++ b/src/memory_prefetch.h @@ -0,0 +1,11 @@ +#ifndef MEMORY_PREFETCH_H +#define MEMORY_PREFETCH_H + +struct client; + +void prefetchCommandsBatchInit(void); +void processClientsCommandsBatch(void); +int addCommandToBatchAndProcessIfFull(struct client *c); +void removeClientFromPendingCommandsBatch(struct client *c); + +#endif /* MEMORY_PREFETCH_H */ diff --git a/src/networking.c b/src/networking.c index bcefe6054a..3e0a186fde 100644 --- a/src/networking.c +++ b/src/networking.c @@ -33,8 +33,8 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" -#include #include "io_threads.h" +#include #include #include #include @@ -45,6 +45,7 @@ static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); + int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; @@ -1500,6 +1501,7 @@ void unlinkClient(client *c) { listDelNode(server.clients, c->client_list_node); c->client_list_node = NULL; } + removeClientFromPendingCommandsBatch(c); /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ @@ -4621,6 +4623,12 @@ int postponeClientRead(client *c) { } int processIOThreadsReadDone(void) { + if (ProcessingEventsWhileBlocked) { + /* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively. + * In this case, there may be some clients left in the batch waiting to be processed. */ + processClientsCommandsBatch(); + } + if (listLength(server.clients_pending_io_read) == 0) return 0; int processed = 0; listNode *ln; @@ -4639,16 +4647,18 @@ int processIOThreadsReadDone(void) { } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire); - /* Don't post-process-writes to clients that are going to be closed anyway. */ - if (c->flag.close_asap) continue; - /* If a client is protected, don't do anything, - * that may trigger read/write error or recreate handler. */ - if (c->flag.protected) continue; listUnlinkNode(server.clients_pending_io_read, ln); c->flag.pending_read = 0; c->io_read_state = CLIENT_IDLE; + /* Don't post-process-reads from clients that are going to be closed anyway. */ + if (c->flag.close_asap) continue; + + /* If a client is protected, don't do anything, + * that may trigger read/write error or recreate handler. */ + if (c->flag.protected) continue; + processed++; server.stat_io_reads_processed++; @@ -4676,8 +4686,11 @@ int processIOThreadsReadDone(void) { } size_t list_length_before_command_execute = listLength(server.clients_pending_io_read); - if (processPendingCommandAndInputBuffer(c) == C_OK) { - beforeNextClient(c); + /* try to add the command to the batch */ + int ret = addCommandToBatchAndProcessIfFull(c); + /* If the command was not added to the commands batch, process it immediately */ + if (ret == C_ERR) { + if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c); } if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) { /* A client was unlink from the list possibly making the next node invalid */ @@ -4685,6 +4698,8 @@ int processIOThreadsReadDone(void) { } } + processClientsCommandsBatch(); + return processed; } @@ -4783,6 +4798,18 @@ void ioThreadReadQueryFromClient(void *data) { c->io_parsed_cmd = NULL; } + /* Offload slot calculations to the I/O thread to reduce main-thread load. */ + if (c->io_parsed_cmd && server.cluster_enabled) { + getKeysResult result; + initGetKeysResult(&result); + int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); + if (numkeys) { + robj *first_key = c->argv[result.keys[0].pos]; + c->slot = calculateKeySlot(first_key->ptr); + } + getKeysFreeResult(&result); + } + done: trimClientQueryBuffer(c); atomic_thread_fence(memory_order_release); diff --git a/src/server.c b/src/server.c index aaee462739..0ab3d87dab 100644 --- a/src/server.c +++ b/src/server.c @@ -5720,6 +5720,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index 3530f42718..549e7eb6cb 100644 --- a/src/server.h +++ b/src/server.h @@ -79,6 +79,7 @@ typedef long long ustime_t; /* microsecond time type. */ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ +#include "memory_prefetch.h" #define VALKEYMODULE_CORE 1 typedef struct serverObject robj; @@ -1756,6 +1757,7 @@ struct valkeyServer { int io_threads_do_reads; /* Read and parse from IO threads? */ int active_io_threads_num; /* Current number of active IO threads, includes main thread. */ int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ + int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ @@ -1837,6 +1839,8 @@ struct valkeyServer { long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ + long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ + long long stat_total_prefetch_batches; /* Total number of prefetched batches */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 24f8caae9c..9eaf467477 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -170,3 +170,121 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } } } + +start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { + set server_pid [s process_id] + # Skip if non io-threads mode - as it is relevant only for io-threads mode + if {[r config get io-threads] ne "io-threads 1"} { + test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { + # Create 16 (prefetch batch size) +1 clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] + } + + # set a key that will be later be prefetch + r set a 0 + + # Get the client ID of rd4 + $rd4 client id + set rd4_id [$rd4 read] + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # The first client will kill the fourth client + $rd0 client kill id $rd4_id + + # Send set commands for all clients except the first + for {set i 1} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server + resume_process $server_pid + + # Read the results + assert_equal {1} [$rd0 read] + catch {$rd4 read} err + assert_match {I/O error reading reply} $err + + # verify the prefetch stats are as expected + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher + + # Verify the final state + $rd15 get a + assert_equal {OK} [$rd15 read] + assert_equal {15} [$rd15 read] + } + + test {prefetch works as expected when changing the batch size while executing the commands batch} { + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] + } + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # Send set commands for all clients the 5th client will change the prefetch batch size + for {set i 0} {$i < 16} {incr i} { + if {$i == 4} { + [set rd$i] config set prefetch-batch-max-size 1 + } + [set rd$i] set a $i + [set rd$i] flush + } + # Resume the server + resume_process $server_pid + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the configured prefetch batch size was changed + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + } + + test {no prefetch when the batch size is set to 0} { + # set the batch size to 0 + r config set prefetch-batch-max-size 0 + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [valkey_deferring_client] + } + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # Send set commands for all clients + for {set i 0} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server + resume_process $server_pid + + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + } + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_equal $prefetch_entries $new_prefetch_entries + } + } +} diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index e16cc368fa..dfe8efadcc 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 120 +MAX_ARGS = 200 import os print("/* Everything below this line is automatically generated by") diff --git a/valkey.conf b/valkey.conf index 8465facb50..fef152a290 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1326,7 +1326,19 @@ lazyfree-lazy-user-flush no # to thread the write and read syscall and transfer the client buffers to the # socket and to enable threading of reads and protocol parsing. # -# NOTE 2: If you want to test the server speedup using valkey-benchmark, make +# When multiple commands are parsed by the I/O threads and ready for execution, +# we take advantage of knowing the next set of commands and prefetch their +# required dictionary entries in a batch. This reduces memory access costs. +# +# The optimal batch size depends on the specific workflow of the user. +# The default batch size is 16, which can be modified using the +# 'prefetch-batch-max-size' config. +# +# When the config is set to 0, prefetching is disabled. +# +# prefetch-batch-max-size 16 +# +# NOTE: If you want to test the server speedup using valkey-benchmark, make # sure you also run the benchmark itself in threaded mode, using the # --threads option to match the number of server threads, otherwise you'll not # be able to notice the improvements.