diff --git a/src/db.c b/src/db.c index 312199ec6c..3493e2d863 100644 --- a/src/db.c +++ b/src/db.c @@ -55,6 +55,7 @@ typedef enum { keyStatus expireIfNeeded(serverDb *db, robj *key, int flags); int keyIsExpired(serverDb *db, robj *key); static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de); +static int getKVStoreIndexForKey(sds key); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -125,7 +126,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) { if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) { if (!canUseSharedObject() && val->refcount == OBJ_SHARED_REFCOUNT) { val = dupStringObject(val); - kvstoreDictSetVal(db->keys, getKeySlot(key->ptr), de, val); + kvstoreDictSetVal(db->keys, getKVStoreIndexForKey(key->ptr), de, val); } if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); @@ -202,15 +203,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { * if the key already exists, otherwise, it can fall back to dbOverwrite. */ static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_existing) { dictEntry *existing; - int slot = getKeySlot(key->ptr); - dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing); + int dict_index = getKVStoreIndexForKey(key->ptr); + dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key->ptr, &existing); if (update_if_existing && existing) { dbSetValue(db, key, val, 1, existing); return; } serverAssertWithInfo(NULL, key, de != NULL); initObjectLRUOrLFU(val); - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); signalKeyAsReady(db, key, val->type); notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id); } @@ -219,32 +220,33 @@ void dbAdd(serverDb *db, robj *key, robj *val) { dbAddInternal(db, key, val, 0); } -/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled. - * The only difference between this function and getKeySlot, is that it's not using cached key slot from the - * current_client and always calculates CRC hash. This is useful when slot needs to be calculated for a key that user - * didn't request for, such as in case of eviction. */ -int calculateKeySlot(sds key) { - return server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0; +/* Returns which dict index should be used with kvstore for a given key. */ +static int getKVStoreIndexForKey(sds key) { + return server.cluster_enabled ? getKeySlot(key) : 0; } -/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/ +/* Returns the cluster hash slot for a given key, trying to use the cached slot that + * stored on the server.current_client first. If there is no cached value, it will compute the hash slot + * and then cache the value.*/ int getKeySlot(sds key) { + serverAssert(server.cluster_enabled); /* This is performance optimization that uses pre-set slot id from the current command, * in order to avoid calculation of the key hash. * * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. * It only gets set during the execution of command under `call` method. Other flows requesting - * the key slot would fallback to calculateKeySlot. + * the key slot would fallback to keyHashSlot. * * Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots, * so we must always recompute the slot for commands coming from the primary. */ if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command && !server.current_client->flag.primary) { - debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot); + debugServerAssertWithInfo(server.current_client, NULL, + (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot); return server.current_client->slot; } - int slot = calculateKeySlot(key); + int slot = keyHashSlot(key, (int)sdslen(key)); /* For the case of replicated commands from primary, getNodeByQuery() never gets called, * and thus c->slot never gets populated. That said, if this command ends up accessing a key, * we are able to backfill c->slot here, where the key's hash calculation is made. */ @@ -267,11 +269,11 @@ int getKeySlot(sds key) { * In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed. */ int dbAddRDBLoad(serverDb *db, sds key, robj *val) { - int slot = getKeySlot(key); - dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL); + int dict_index = server.cluster_enabled ? getKeySlot(key) : 0; + dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL); if (de == NULL) return 0; initObjectLRUOrLFU(val); - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); return 1; } @@ -288,8 +290,8 @@ int dbAddRDBLoad(serverDb *db, sds key, robj *val) { * * The program is aborted if the key was not already present. */ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de) { - int slot = getKeySlot(key->ptr); - if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr); + int dict_index = getKVStoreIndexForKey(key->ptr); + if (!de) de = kvstoreDictFind(db->keys, dict_index, key->ptr); serverAssertWithInfo(NULL, key, de != NULL); robj *old = dictGetVal(de); @@ -309,7 +311,7 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn /* Because of RM_StringDMA, old may be changed, so we need get old again */ old = dictGetVal(de); } - kvstoreDictSetVal(db->keys, slot, de, val); + kvstoreDictSetVal(db->keys, dict_index, de, val); /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ if (tryOffloadFreeObjToIOThreads(old) == C_OK) { /* OK */ @@ -404,8 +406,8 @@ robj *dbRandomKey(serverDb *db) { int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { dictEntry **plink; int table; - int slot = getKeySlot(key->ptr); - dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table); + int dict_index = getKVStoreIndexForKey(key->ptr); + dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table); if (de) { robj *val = dictGetVal(de); /* RM_StringDMA may call dbUnshareStringValue which may free val, so we @@ -421,13 +423,13 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { if (async) { /* Because of dbUnshareStringValue, the val in de may change. */ freeObjAsync(key, dictGetVal(de), db->id); - kvstoreDictSetVal(db->keys, slot, de, NULL); + kvstoreDictSetVal(db->keys, dict_index, de, NULL); } /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - kvstoreDictDelete(db->expires, slot, key->ptr); + kvstoreDictDelete(db->expires, dict_index, key->ptr); - kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table); + kvstoreDictTwoPhaseUnlinkFree(db->keys, dict_index, de, plink, table); return 1; } else { return 0; @@ -1664,7 +1666,7 @@ void swapdbCommand(client *c) { *----------------------------------------------------------------------------*/ int removeExpire(serverDb *db, robj *key) { - return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK; + return kvstoreDictDelete(db->expires, getKVStoreIndexForKey(key->ptr), key->ptr) == DICT_OK; } /* Set an expire to the specified key. If the expire is set in the context @@ -1675,10 +1677,10 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) { dictEntry *kde, *de, *existing; /* Reuse the sds from the main dict in the expire dict */ - int slot = getKeySlot(key->ptr); - kde = kvstoreDictFind(db->keys, slot, key->ptr); + int dict_index = getKVStoreIndexForKey(key->ptr); + kde = kvstoreDictFind(db->keys, dict_index, key->ptr); serverAssertWithInfo(NULL, key, kde != NULL); - de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing); + de = kvstoreDictAddRaw(db->expires, dict_index, dictGetKey(kde), &existing); if (existing) { dictSetSignedIntegerVal(existing, when); } else { @@ -1896,7 +1898,7 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) { } static dictEntry *dbFindGeneric(kvstore *kvs, void *key) { - return kvstoreDictFind(kvs, getKeySlot(key), key); + return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key); } dictEntry *dbFind(serverDb *db, void *key) { diff --git a/src/networking.c b/src/networking.c index 0c6716c504..24c68dc8a2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4827,7 +4827,7 @@ void ioThreadReadQueryFromClient(void *data) { int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result); if (numkeys) { robj *first_key = c->argv[result.keys[0].pos]; - c->slot = calculateKeySlot(first_key->ptr); + c->slot = keyHashSlot(first_key->ptr, sdslen(first_key->ptr)); } getKeysFreeResult(&result); } diff --git a/src/pubsub.c b/src/pubsub.c index 047d408621..5b037b5721 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -667,7 +667,8 @@ void pubsubCommand(client *c) { int j; addReplyArrayLen(c, (c->argc - 2) * 2); for (j = 2; j < c->argc; j++) { - unsigned int slot = calculateKeySlot(c->argv[j]->ptr); + sds key = c->argv[j]->ptr; + unsigned int slot = server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0; dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]); addReplyBulk(c, c->argv[j]);