Skip to content

Commit

Permalink
Replace dict with hashtable for keys, expires and pubsub channels
Browse files Browse the repository at this point in the history
Instead of a dictEntry with pointers to key and value, the hashtable
has a pointer directly to the value (robj) which can hold an embedded
key and acts as a key-value in the hashtable. This minimizes the number
of pointers to follow and thus the number of memory accesses to lookup
a key-value pair.

        Keys         robj
      hashtable
      +-------+   +-----------------------+
      | 0     |   | type, encoding, LRU   |
      | 1 ------->| refcount, expire      |
      | 2     |   | ptr                   |
      | ...   |   | optional embedded key |
      +-------+   | optional embedded val |
                  +-----------------------+

The expire timestamp (TTL) is also stored in the robj, if any. The expire
hash table points to the same robj.

Overview of changes:

* Replace dict with hashtable in kvstore (kvstore.c)
* Add functions for embedding key and expire in robj (object.c)
  * When there's unused space, reserve an expire field to avoid realloting
    it later if expire is added.
  * Always reserve space for expire for large key names to avoid realloc
    if it's set later.
* Update db functions (db.c)
  * dbAdd, setKey and setExpire reallocate the object when embedding a key
  * setKey does not increment the reference counter, since it would require
    duplicating the object. This responsibility is moved to the caller.
* Remove logic for shared integer objects as values in the database. The keys
  are now embedded in the objects, so all objects in the database need to be
  unique. Thus, we can't use shared objects as values. Also delete test cases
  for shared integers.
* Adjust various commands to the changes mentioned above.
* Adjust defrag code
  * Improvement: Don't access the expires table before defrag has actually
    reallocated the object.
* Adjust test cases that were using hard-coded sizes for dict when realloc
  would happen, and some other adjustments in test cases.
* Adjust memory prefetch for new hash table implementation in IO-threading,
  using new `hashtableIncrementalFind` API
* Adjust offloading of free() to IO threads: Object free to be done in main
  thread while keeping obj->ptr offloading in IO-thread since the DB object is
  now allocated by the main-thread and not by the IO-thread as it used to be.
* Let expireIfNeeded take an optional value, to avoid looking up the expires
  table when possible.

---------

Signed-off-by: Uri Yagelnik <[email protected]>
Signed-off-by: uriyage <[email protected]>
Signed-off-by: Viktor Söderqvist <[email protected]>
Co-authored-by: Uri Yagelnik <[email protected]>
  • Loading branch information
zuiderkwast and uriyage committed Dec 6, 2024
1 parent 1661431 commit 3ed37ea
Show file tree
Hide file tree
Showing 42 changed files with 1,705 additions and 1,436 deletions.
12 changes: 6 additions & 6 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,6 @@ static int rewriteFunctions(rio *aof) {
}

int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
int j;
long key_count = 0;
long long updated_time = 0;
Expand Down Expand Up @@ -2219,17 +2218,18 @@ int rewriteAppendOnlyFileRio(rio *aof) {

kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
robj *o = next;
sds keystr;
robj key, *o;
robj key;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

keystr = dictGetKey(de);
o = dictGetVal(de);
keystr = objectGetKey(o);
initStaticStringObject(key, keystr);

expiretime = getExpire(db, &key);
expiretime = objectGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
Expand Down
5 changes: 2 additions & 3 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {

if (o == NULL) {
o = createObject(OBJ_STRING, sdsnewlen(NULL, byte + 1));
dbAdd(c->db, c->argv[1], o);
dbAdd(c->db, c->argv[1], &o);
if (dirty) *dirty = 1;
} else {
o = dbUnshareStringValue(c->db, c->argv[1], o);
Expand Down Expand Up @@ -772,9 +772,8 @@ void bitopCommand(client *c) {
/* Store the computed value into the target key */
if (maxlen) {
o = createObject(OBJ_STRING, res);
setKey(c, c->db, targetkey, o, 0);
setKey(c, c->db, targetkey, &o, 0);
notifyKeyspaceEvent(NOTIFY_STRING, "set", targetkey, c->db->id);
decrRefCount(o);
server.dirty++;
} else if (dbDelete(c->db, targetkey)) {
signalModifiedKey(c, c->db, targetkey);
Expand Down
20 changes: 10 additions & 10 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ void restoreCommand(client *c) {
}

/* Create the key and set the TTL if any */
dbAdd(c->db, key, obj);
dbAdd(c->db, key, &obj);
if (ttl) {
setExpire(c, c->db, key, ttl);
obj = setExpire(c, c->db, key, ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
Expand Down Expand Up @@ -811,7 +811,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -908,16 +908,16 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
robj *valkey = next;
sds sdskey = objectGetKey(valkey);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
Expand Down
15 changes: 8 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6158,12 +6158,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
server.server_del_keys_in_slot = 1;
unsigned int j = 0;

kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictSafeIterator(server.db->keys, hashslot);
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
Expand All @@ -6178,7 +6179,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);

server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
Expand All @@ -6187,7 +6188,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
return kvstoreDictSize(server.pubsubshard_channels, hashslot);
return kvstoreHashtableSize(server.pubsubshard_channels, hashslot);
}

clusterNode *getMyClusterNode(void) {
Expand Down
Loading

0 comments on commit 3ed37ea

Please sign in to comment.