From c3195720e009c92e6d44a00aa0d43eee1460ba5d Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Sun, 25 Aug 2024 07:16:21 +0000 Subject: [PATCH] PR comments Signed-off-by: Uri Yagelnik --- src/config.c | 8 +--- src/dict.c | 1 - src/dict.h | 1 + src/io_threads.c | 6 +++ src/kvstore.h | 1 + src/memory_prefetch.c | 90 ++++++++++++++++++++------------------- src/memory_prefetch.h | 1 - tests/unit/networking.tcl | 58 +++++++++++++------------ 8 files changed, 87 insertions(+), 79 deletions(-) diff --git a/src/config.c b/src/config.c index 364aa38fe1..7ef9d58a23 100644 --- a/src/config.c +++ b/src/config.c @@ -2564,12 +2564,6 @@ static int updateOOMScoreAdj(const char **err) { return 1; } -static int UpdateMaxPrefetchBatchSize(const char **err) { - UNUSED(err); - onMaxBatchSizeChange(); - return 1; -} - int invalidateClusterSlotsResp(const char **err) { UNUSED(err); clearCachedClusterSlotsResponse(); @@ -3170,7 +3164,7 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), - createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, UpdateMaxPrefetchBatchSize), + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/dict.c b/src/dict.c index 9ad3fd0abf..851718626e 100644 --- a/src/dict.c +++ b/src/dict.c @@ -120,7 +120,6 @@ static void _dictExpandIfNeeded(dict *d); static void _dictShrinkIfNeeded(dict *d); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); diff --git a/src/dict.h b/src/dict.h index 97a79910cb..1671533f5c 100644 --- a/src/dict.h +++ b/src/dict.h @@ -229,6 +229,7 @@ void dictInitIterator(dictIterator *iter, dict *d); void dictInitSafeIterator(dictIterator *iter, dict *d); void dictResetIterator(dictIterator *iter); dictEntry *dictNext(dictIterator *iter); +dictEntry *dictGetNext(const dictEntry *de); void dictReleaseIterator(dictIterator *iter); dictEntry *dictGetRandomKey(dict *d); dictEntry *dictGetFairRandomKey(dict *d); diff --git a/src/io_threads.c b/src/io_threads.c index 7a68cfb87f..5b2230f635 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -1,3 +1,9 @@ +/* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + */ + #include "io_threads.h" static __thread int thread_id = 0; /* Thread local var */ diff --git a/src/kvstore.h b/src/kvstore.h index a94f366b6b..202f6a9c25 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -76,5 +76,6 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val); dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index); void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index); int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); +dict *kvstoreGetDict(kvstore *kvs, int didx); #endif /* DICTARRAY_H_ */ diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index a8a7beecff..01c510638a 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -1,4 +1,8 @@ /* + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * * This file utilizes prefetching keys and data for multiple commands in a batch, * to improve performance by amortizing memory access costs across multiple operations. */ @@ -48,38 +52,38 @@ typedef enum { └───────────┬──────────────┘ │ | │ ┌───────-─▼─────────────┐ │ - │ PREFETCH_DONE │◄────────┘ + │ PREFETCH_DONE │◄────────┘ └───────────────────────┘ **********************************************************************************************************************/ typedef void *(*GetValueDataFunc)(const void *val); -typedef struct PrefetchInfo { +typedef struct KeyPrefetchInfo { PrefetchState state; /* Current state of the prefetch operation */ HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ uint64_t bucket_idx; /* Index of the bucket in the current hash table */ uint64_t key_hash; /* Hash value of the key being prefetched */ dictEntry *current_entry; /* Pointer to the current entry being processed */ -} PrefetchInfo; - -/* CommandsBatch structure holds the state of the current batch of client commands being processed. */ -typedef struct CommandsBatch { - size_t cur_idx; /* Index of the current key being processed */ - size_t keys_done; /* Number of keys that have been prefetched */ - size_t key_count; /* Number of keys in the current batch */ - size_t client_count; /* Number of clients in the current batch */ - size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ - size_t executed_commands; /* Number of commands executed in the current batch */ - int *slots; /* Array of slots for each key */ - void **keys; /* Array of keys to prefetch in the current batch */ - client **clients; /* Array of clients in the current batch */ - dict **keys_dicts; /* Main dict for each key */ - dict **expire_dicts; /* Expire dict for each key */ - dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ - PrefetchInfo *prefetch_info; /* Prefetch info for each key */ -} CommandsBatch; - -static CommandsBatch *batch = NULL; +} KeyPrefetchInfo; + +/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */ +typedef struct PrefetchCommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} PrefetchCommandsBatch; + +static PrefetchCommandsBatch *batch = NULL; void freePrefetchCommandsBatch(void) { if (batch == NULL) { @@ -104,14 +108,14 @@ void prefetchCommandsBatchInit(void) { return; } - batch = zcalloc(sizeof(CommandsBatch)); + batch = zcalloc(sizeof(PrefetchCommandsBatch)); batch->max_prefetch_size = max_prefetch_size; batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); batch->slots = zcalloc(max_prefetch_size * sizeof(int)); - batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(PrefetchInfo)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo)); } void onMaxBatchSizeChange(void) { @@ -125,23 +129,23 @@ void onMaxBatchSizeChange(void) { } /* Prefetch the given pointer and move to the next key in the batch. */ -static void prefetch(void *addr) { +static void prefetchAndMoveToNextKey(void *addr) { valkey_prefetch(addr); /* While the prefetch is in progress, we can continue to the next key */ batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; } -static void markDone(PrefetchInfo *info) { +static void markKeyAsdone(KeyPrefetchInfo *info) { info->state = PREFETCH_DONE; server.stat_total_prefetch_entries++; batch->keys_done++; } -/* Returns the next PrefetchInfo structure that needs to be processed. */ -static PrefetchInfo *getNextPrefetchInfo(void) { +/* Returns the next KeyPrefetchInfo structure that needs to be processed. */ +static KeyPrefetchInfo *getNextPrefetchInfo(void) { size_t start_idx = batch->cur_idx; do { - PrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; if (info->state != PREFETCH_DONE) return info; batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; } while (batch->cur_idx != start_idx); @@ -153,7 +157,7 @@ static void initBatchInfo(dict **dicts) { /* Initialize the prefetch info */ for (size_t i = 0; i < batch->key_count; i++) { - PrefetchInfo *info = &batch->prefetch_info[i]; + KeyPrefetchInfo *info = &batch->prefetch_info[i]; if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { info->state = PREFETCH_DONE; batch->keys_done++; @@ -168,7 +172,7 @@ static void initBatchInfo(dict **dicts) { /* Prefetch the bucket of the next hash table index. * If no tables are left, move to the PREFETCH_DONE state. */ -static void prefetchBucket(PrefetchInfo *info) { +static void prefetchBucket(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; /* Determine which hash table to use */ @@ -178,20 +182,20 @@ static void prefetchBucket(PrefetchInfo *info) { info->ht_idx = HT_IDX_SECOND; } else { /* No more tables left - mark as done. */ - markDone(info); + markKeyAsdone(info); return; } /* Prefetch the bucket */ info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); - prefetch(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); info->current_entry = NULL; info->state = PREFETCH_ENTRY; } /* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ -static void prefetchEntry(PrefetchInfo *info) { +static void prefetchEntry(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; if (info->current_entry) { @@ -203,7 +207,7 @@ static void prefetchEntry(PrefetchInfo *info) { } if (info->current_entry) { - prefetch(info->current_entry); + prefetchAndMoveToNextKey(info->current_entry); info->state = PREFETCH_VALUE; } else { /* No entry found in the bucket - try the bucket in the next table */ @@ -213,13 +217,13 @@ static void prefetchEntry(PrefetchInfo *info) { /* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */ -static void prefetchValue(PrefetchInfo *info) { +static void prefetchValue(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; void *value = dictGetVal(info->current_entry); if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { /* If this is the last element, we assume a hit and don't compare the keys */ - prefetch(value); + prefetchAndMoveToNextKey(value); info->state = PREFETCH_VALUE_DATA; return; } @@ -228,7 +232,7 @@ static void prefetchValue(PrefetchInfo *info) { if (batch->keys[i] == current_entry_key || dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { /* If the key is found, prefetch the value */ - prefetch(value); + prefetchAndMoveToNextKey(value); info->state = PREFETCH_VALUE_DATA; } else { /* Move to the next entry */ @@ -237,12 +241,12 @@ static void prefetchValue(PrefetchInfo *info) { } /* Prefetch the value data if available. */ -static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_func) { +static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) { if (get_val_data_func) { void *value_data = get_val_data_func(dictGetVal(info->current_entry)); - if (value_data) prefetch(value_data); + if (value_data) prefetchAndMoveToNextKey(value_data); } - markDone(info); + markKeyAsdone(info); } /* Prefetch dictionary data for an array of keys. @@ -268,7 +272,7 @@ static void prefetchValueData(PrefetchInfo *info, GetValueDataFunc get_val_data_ */ static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { initBatchInfo(dicts); - PrefetchInfo *info; + KeyPrefetchInfo *info; while ((info = getNextPrefetchInfo())) { switch (info->state) { case PREFETCH_BUCKET: prefetchBucket(info); break; @@ -357,7 +361,7 @@ void processClientsCommandsBatch(void) { resetCommandsBatch(); - /* Handle the case where the max prefetch size has been changed during the batch processing */ + /* Handle the case where the max prefetch size has been changed. */ if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { onMaxBatchSizeChange(); } diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h index 428f097e05..5a181cc58d 100644 --- a/src/memory_prefetch.h +++ b/src/memory_prefetch.h @@ -3,7 +3,6 @@ struct client; -void onMaxBatchSizeChange(void); void prefetchCommandsBatchInit(void); void processClientsCommandsBatch(void); int addCommandToBatchAndProcessIfFull(struct client *c); diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 3ddda042b5..9eaf467477 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -172,12 +172,12 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { - + set server_pid [s process_id] # Skip if non io-threads mode - as it is relevant only for io-threads mode if {[r config get io-threads] ne "io-threads 1"} { test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { - # Create 17 (prefetch batch size) +1 clients - for {set i 0} {$i < 17} {incr i} { + # Create 16 (prefetch batch size) +1 clients + for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } @@ -188,22 +188,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb $rd4 client id set rd4_id [$rd4 read] - # Create a batch of commands by making sure the server sleeps for a while + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. + pause_process $server_pid # The first client will kill the fourth client - $rd1 client kill id $rd4_id + $rd0 client kill id $rd4_id # Send set commands for all clients except the first - for {set i 1} {$i < 17} {incr i} { + for {set i 1} {$i < 16} {incr i} { [set rd$i] set a $i [set rd$i] flush } + # Resume the server + resume_process $server_pid + # Read the results - assert_equal {1} [$rd1 read] + assert_equal {1} [$rd0 read] catch {$rd4 read} err assert_match {I/O error reading reply} $err @@ -212,25 +214,24 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] - assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher # Verify the final state - $rd16 get a - assert_equal {OK} [$rd16 read] - assert_equal {16} [$rd16 read] + $rd15 get a + assert_equal {OK} [$rd15 read] + assert_equal {15} [$rd15 read] } - + test {prefetch works as expected when changing the batch size while executing the commands batch} { # Create 16 (default prefetch batch size) clients for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } - - # Create a batch of commands by making sure the server sleeps for a while + + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. - + pause_process $server_pid + # Send set commands for all clients the 5th client will change the prefetch batch size for {set i 0} {$i < 16} {incr i} { if {$i == 4} { @@ -239,14 +240,15 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb [set rd$i] set a $i [set rd$i] flush } - + # Resume the server + resume_process $server_pid # Read the results for {set i 0} {$i < 16} {incr i} { assert_equal {OK} [[set rd$i] read] } # assert the configured prefetch batch size was changed - assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} } test {no prefetch when the batch size is set to 0} { @@ -260,18 +262,20 @@ start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-deb for {set i 0} {$i < 16} {incr i} { set rd$i [valkey_deferring_client] } - - # Create a batch of commands by making sure the server sleeps for a while + + # Create a batch of commands by suspending the server for a while # before responding to the first command - $rd0 debug sleep 2 - after 200 ; # wait a bit to make sure the server is sleeping. - + pause_process $server_pid + # Send set commands for all clients for {set i 0} {$i < 16} {incr i} { [set rd$i] set a $i [set rd$i] flush } - + + # Resume the server + resume_process $server_pid + # Read the results for {set i 0} {$i < 16} {incr i} { assert_equal {OK} [[set rd$i] read]