Skip to content

Commit

Permalink
On RDB load, after reading db size opcode forcefully expand main dict
Browse files Browse the repository at this point in the history
This commit is meant to fix performance degradation caused by the replica
having to rehash many times during RDB load.
Testing locally we were able to reproduce up to 50% degredation in
BGSAVE time.

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Oct 21, 2024
1 parent 2743b7e commit 3d51cdd
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 11 deletions.
18 changes: 13 additions & 5 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -1902,28 +1902,36 @@ static int dbExpandSkipSlot(int slot) {
* `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in
* `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
*/
static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) {
static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand, int create_if_missing) {
int ret;
if (server.cluster_enabled) {
/* We don't know exact number of keys that would fall into each slot, but we can
* approximate it, assuming even distribution, divide it by the number of slots. */
int slots = getMyShardSlotCount();
if (slots == 0) return C_OK;
db_size = db_size / slots;
ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot);
ret = kvstoreExpandGeneric(kvs, db_size, try_expand, dbExpandSkipSlot, create_if_missing);
} else {
ret = kvstoreExpand(kvs, db_size, try_expand, NULL);
ret = kvstoreExpandGeneric(kvs, db_size, try_expand, NULL, create_if_missing);
}

return ret ? C_OK : C_ERR;
}

int dbExpand(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->keys, db_size, try_expand);
return dbExpandGeneric(db->keys, db_size, try_expand, 0);
}

int dbExpand_CreateIfMissing(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->keys, db_size, try_expand, 1);
}

int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand);
return dbExpandGeneric(db->expires, db_size, try_expand, 0);
}

int dbExpandExpires_CreateIfMissing(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand, 1);
}

dictEntry *dbFindWithDictIndex(serverDb *db, void *key, int dict_index) {
Expand Down
9 changes: 8 additions & 1 deletion src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,16 @@ unsigned long long kvstoreScan(kvstore *kvs,
*
* Based on the parameter `try_expand`, appropriate dict expand API is invoked.
* if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
* if create_if_missing is set to 1, the dictionary will be created if it doesn't exist before doing the
* expansion.
* The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
* `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in
* `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
*/
int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) {
int kvstoreExpandGeneric(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb, int create_if_missing) {
for (int i = 0; i < kvs->num_dicts; i++) {
dict *d = kvstoreGetDict(kvs, i);
if (!d && create_if_missing) d = createDictIfNeeded(kvs, i);
if (!d || (skip_cb && skip_cb(i))) continue;
int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize);
if (try_expand && result == DICT_ERR) return 0;
Expand All @@ -433,6 +436,10 @@ int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandS
return 1;
}

int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) {
return kvstoreExpandGeneric(kvs, newsize, try_expand, skip_cb, 0);
}

/* Returns fair random dict index, probability of each dict being returned is proportional to the number of elements
* that dictionary holds. This function guarantees that it returns a dict-index of a non-empty dict, unless the entire
* kvstore is empty. Time complexity of this function is O(log(kvs->num_dicts)). */
Expand Down
1 change: 1 addition & 0 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ unsigned long long kvstoreScan(kvstore *kvs,
kvstoreScanShouldSkipDict *skip_cb,
void *privdata);
int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb);
int kvstoreExpandGeneric(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb, int create_if_missing);
int kvstoreGetFairRandomDictIndex(kvstore *kvs);
void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full);

Expand Down
16 changes: 11 additions & 5 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3075,7 +3075,12 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
* selected data base, in order to avoid useless rehashing. */
if ((db_size = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr;
if ((expires_size = rdbLoadLen(rdb, NULL)) == RDB_LENERR) goto eoferr;
should_expand_db = 1;
if (!server.cluster_enabled) {
/* Ignore RDB_OPCODE_RESIZEDB in ValKey-8.0 CME: ValKey 8.0 uses dict per slot. Using resize opcode might
* expand all the replica's dictionaries, including dictionaries of unowned slots. This causes memory
* overhead when syncing from ValKey-7 primaries. */
should_expand_db = 1;
}
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
Expand Down Expand Up @@ -3245,11 +3250,12 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
continue;
}

/* If there is no slot info, it means that it's either not cluster mode or we are trying to load legacy RDB
* file. In this case we want to estimate number of keys per slot and resize accordingly. */
/* If there is no slot info, it means that it's either not cluster mode or we are trying to load a legacy RDB
* file. In either cases we want to resize the db accordingly. */
if (should_expand_db) {
dbExpand(db, db_size, 0);
dbExpandExpires(db, expires_size, 0);
// The db dictionary might not be created yet, so we need to create it if it's missing.
dbExpand_CreateIfMissing(db, db_size, 0);
dbExpandExpires_CreateIfMissing(db, expires_size, 0);
should_expand_db = 0;
}

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3346,7 +3346,9 @@ int calculateKeySlot(sds key);

/* kvstore wrappers */
int dbExpand(serverDb *db, uint64_t db_size, int try_expand);
int dbExpand_CreateIfMissing(serverDb *db, uint64_t db_size, int try_expand);
int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand);
int dbExpandExpires_CreateIfMissing(serverDb *db, uint64_t db_size, int try_expand);
dictEntry *dbFind(serverDb *db, void *key);
dictEntry *dbFindExpires(serverDb *db, void *key);
unsigned long long dbSize(serverDb *db);
Expand Down

0 comments on commit 3d51cdd

Please sign in to comment.