Skip to content

Commit

Permalink
Replace dict with hashtable in kvstore and more
Browse files Browse the repository at this point in the history
* Add functions for embedding key and expire in robj (object.c)
  * Always embed key in robj
  * Embed expire in robj, only if needed or if there is unused space in the allocation
* Update db functions (db.c)
  * dbAdd, setKey and setExpire return the reallocated object with embedded key
  * setKey does not increment the reference counter, since it would require
    duplicating the object
  * Handle dbAdd and dbReplaceValue return value
* Replace dict in kvstore
* defrag
* Fix some things including MOVE, RENAME commands
* Fix test case "expire scan should skip dictionaries with lot's of empty buckets"
* Fix resize allowed maxmemory check
* Fix kvstore unit tests
* Fix maxmemory test (increase size)
* Delete test cases about shared integers
* Scan single step in active expire
* Seed hashtable hash function on startup
* Set resize policy when a fork is active, and fix test case
* Fix rehashing test case in unit/info suite
* Fix more resize test cases in unit/other
* Fix scan testcases not expecting duplicates
* Another run at the unit/expire test suite
* Fix typo in defrag pubsub channels
* Make dbAddRDBLoad return the (maybe reallocated) object
* Add unit test file test_object.c
* Account for scan duplicates in valkey-cli test
* Optimize INCR (can't use shared object)
* Remove code for shared integer objects (When key is embedded in object, all
  objects in the database need to be unique so we can't use shared objects as
  values.)
  • Loading branch information
zuiderkwast committed Nov 18, 2024
1 parent dfbfda5 commit 40f6624
Show file tree
Hide file tree
Showing 39 changed files with 1,525 additions and 1,552 deletions.
11 changes: 5 additions & 6 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,7 @@ static int rewriteFunctions(rio *aof) {
}

int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
valkey *o;
int j;
long key_count = 0;
long long updated_time = 0;
Expand Down Expand Up @@ -2219,17 +2219,16 @@ int rewriteAppendOnlyFileRio(rio *aof) {

kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
while (kvstoreIteratorNext(kvs_it, (void **)&o)) {
sds keystr;
robj key, *o;
robj key;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

keystr = dictGetKey(de);
o = dictGetVal(de);
keystr = objectGetKey(o);
initStaticStringObject(key, keystr);

expiretime = getExpire(db, &key);
expiretime = objectGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
Expand Down
5 changes: 2 additions & 3 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {

if (o == NULL) {
o = createObject(OBJ_STRING, sdsnewlen(NULL, byte + 1));
dbAdd(c->db, c->argv[1], o);
o = dbAdd(c->db, c->argv[1], o);
if (dirty) *dirty = 1;
} else {
o = dbUnshareStringValue(c->db, c->argv[1], o);
Expand Down Expand Up @@ -772,9 +772,8 @@ void bitopCommand(client *c) {
/* Store the computed value into the target key */
if (maxlen) {
o = createObject(OBJ_STRING, res);
setKey(c, c->db, targetkey, o, 0);
o = setKey(c, c->db, targetkey, o, 0);
notifyKeyspaceEvent(NOTIFY_STRING, "set", targetkey, c->db->id);
decrRefCount(o);
server.dirty++;
} else if (dbDelete(c->db, targetkey)) {
signalModifiedKey(c, c->db, targetkey);
Expand Down
19 changes: 9 additions & 10 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ void restoreCommand(client *c) {
}

/* Create the key and set the TTL if any */
dbAdd(c->db, key, obj);
obj = dbAdd(c->db, key, obj);
if (ttl) {
setExpire(c, c->db, key, ttl);
obj = setExpire(c, c->db, key, ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
Expand Down Expand Up @@ -811,7 +811,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -908,16 +908,15 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
kvstoreHashtableIterator *kvs_di = NULL;
valkey *valkey = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
serverAssert(kvstoreHashtableIteratorNext(kvs_di, (void **)&valkey));
sds sdskey = objectGetKey(valkey);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
Expand Down
14 changes: 7 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6089,12 +6089,12 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
server.server_del_keys_in_slot = 1;
unsigned int j = 0;

kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictSafeIterator(server.db->keys, hashslot);
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreHashtableIterator *kvs_di = NULL;
valkey *valkey = NULL;
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
while (kvstoreHashtableIteratorNext(kvs_di, (void **)&valkey)) {
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
Expand All @@ -6109,7 +6109,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashtableIterator(kvs_di);

server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
Expand All @@ -6118,7 +6118,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
return kvstoreDictSize(server.pubsubshard_channels, hashslot);
return kvstoreHashtableSize(server.pubsubshard_channels, hashslot);
}

clusterNode *getMyClusterNode(void) {
Expand Down
Loading

0 comments on commit 40f6624

Please sign in to comment.