Skip to content

Commit

Permalink
Optimize the per slot dictionary by checking for cluster mode earlier (
Browse files Browse the repository at this point in the history
…valkey-io#995)

While doing some profiling, I noticed that getKeySlot() was a fairly
large part (~0.7%) of samples doing perf with high pipeline during
standalone. I think this is because we do a very late check for
server.cluster_mode, we first call getKeySlot() and then call
calculateKeySlot(). (calculateKeySlot was surprisingly not automatically
inlined, we were doing a jump into it and then immediately returning
zero). We then also do useless work in the form of caching zero in
client->slot, which will further mess with cache lines.

So, this PR tries to accomplish a few things things.
1) The usage of the `slot` name made a lot more sense before the
introduction of the kvstore. Now with kvstore, we call this the database
index, so all the references to slot in standalone are no longer really
accurate.
2) Pull the cluster mode check all the way out of getKeySlot(), so
hopefully a bit more performant.
3) Remove calculateKeySlot() as independent from getKeySlot().
calculateKeySlot used to have 3 call sites outside of db.c, which
warranted it's own function. It's now only called in two places,
pubsub.c and networking.c.

I ran some profiling, and saw about ~0.3% improvement, but don't really
trust it because you'll see a much higher (~2%) variance in test runs
just by how the branch predictions will get changed with a new memory
layout. Running perf again showed no samples in getKeySlot() and a
reduction in samples in lookupKey(), so maybe this will help a little
bit.

---------

Signed-off-by: Madelyn Olson <[email protected]>
  • Loading branch information
madolson authored Sep 11, 2024
1 parent 2b207ee commit fa348e2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
62 changes: 32 additions & 30 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef enum {
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags);
int keyIsExpired(serverDb *db, robj *key);
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
static int getKVStoreIndexForKey(sds key);

/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
Expand Down Expand Up @@ -125,7 +126,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
if (!canUseSharedObject() && val->refcount == OBJ_SHARED_REFCOUNT) {
val = dupStringObject(val);
kvstoreDictSetVal(db->keys, getKeySlot(key->ptr), de, val);
kvstoreDictSetVal(db->keys, getKVStoreIndexForKey(key->ptr), de, val);
}
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
Expand Down Expand Up @@ -202,15 +203,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* if the key already exists, otherwise, it can fall back to dbOverwrite. */
static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_existing) {
dictEntry *existing;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing);
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key->ptr, &existing);
if (update_if_existing && existing) {
dbSetValue(db, key, val, 1, existing);
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
signalKeyAsReady(db, key, val->type);
notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
}
Expand All @@ -219,32 +220,33 @@ void dbAdd(serverDb *db, robj *key, robj *val) {
dbAddInternal(db, key, val, 0);
}

/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
* The only difference between this function and getKeySlot, is that it's not using cached key slot from the
* current_client and always calculates CRC hash. This is useful when slot needs to be calculated for a key that user
* didn't request for, such as in case of eviction. */
int calculateKeySlot(sds key) {
return server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0;
/* Returns which dict index should be used with kvstore for a given key. */
static int getKVStoreIndexForKey(sds key) {
return server.cluster_enabled ? getKeySlot(key) : 0;
}

/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
/* Returns the cluster hash slot for a given key, trying to use the cached slot that
* stored on the server.current_client first. If there is no cached value, it will compute the hash slot
* and then cache the value.*/
int getKeySlot(sds key) {
serverAssert(server.cluster_enabled);
/* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash.
*
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
* the key slot would fallback to keyHashSlot.
*
* Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots,
* so we must always recompute the slot for commands coming from the primary.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
debugServerAssertWithInfo(server.current_client, NULL,
(int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot);
return server.current_client->slot;
}
int slot = calculateKeySlot(key);
int slot = keyHashSlot(key, (int)sdslen(key));
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
Expand All @@ -267,11 +269,11 @@ int getKeySlot(sds key) {
* In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed.
*/
int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
int slot = getKeySlot(key);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL);
int dict_index = server.cluster_enabled ? getKeySlot(key) : 0;
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
return 1;
}

Expand All @@ -288,8 +290,8 @@ int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
*
* The program is aborted if the key was not already present. */
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
int slot = getKeySlot(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, de != NULL);
robj *old = dictGetVal(de);

Expand All @@ -309,7 +311,7 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
Expand Down Expand Up @@ -404,8 +406,8 @@ robj *dbRandomKey(serverDb *db) {
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
Expand All @@ -421,13 +423,13 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
kvstoreDictSetVal(db->keys, slot, de, NULL);
kvstoreDictSetVal(db->keys, dict_index, de, NULL);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
kvstoreDictDelete(db->expires, slot, key->ptr);
kvstoreDictDelete(db->expires, dict_index, key->ptr);

kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
kvstoreDictTwoPhaseUnlinkFree(db->keys, dict_index, de, plink, table);
return 1;
} else {
return 0;
Expand Down Expand Up @@ -1664,7 +1666,7 @@ void swapdbCommand(client *c) {
*----------------------------------------------------------------------------*/

int removeExpire(serverDb *db, robj *key) {
return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK;
return kvstoreDictDelete(db->expires, getKVStoreIndexForKey(key->ptr), key->ptr) == DICT_OK;
}

/* Set an expire to the specified key. If the expire is set in the context
Expand All @@ -1675,10 +1677,10 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;

/* Reuse the sds from the main dict in the expire dict */
int slot = getKeySlot(key->ptr);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
kde = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
de = kvstoreDictAddRaw(db->expires, dict_index, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
Expand Down Expand Up @@ -1896,7 +1898,7 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
}

static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, getKeySlot(key), key);
return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key);
}

dictEntry *dbFind(serverDb *db, void *key) {
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4827,7 +4827,7 @@ void ioThreadReadQueryFromClient(void *data) {
int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
if (numkeys) {
robj *first_key = c->argv[result.keys[0].pos];
c->slot = calculateKeySlot(first_key->ptr);
c->slot = keyHashSlot(first_key->ptr, sdslen(first_key->ptr));
}
getKeysFreeResult(&result);
}
Expand Down
3 changes: 2 additions & 1 deletion src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ void pubsubCommand(client *c) {
int j;
addReplyArrayLen(c, (c->argc - 2) * 2);
for (j = 2; j < c->argc; j++) {
unsigned int slot = calculateKeySlot(c->argv[j]->ptr);
sds key = c->argv[j]->ptr;
unsigned int slot = server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0;
dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]);

addReplyBulk(c, c->argv[j]);
Expand Down

0 comments on commit fa348e2

Please sign in to comment.