diff --git a/src/ae.c b/src/ae.c index 9bf8619902..643ff17070 100644 --- a/src/ae.c +++ b/src/ae.c @@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL; - eventLoop->timeEventNextId = 0; + eventLoop->timeEventNextId = 1; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; diff --git a/src/config.c b/src/config.c index 6c03cbb476..d05ea83611 100644 --- a/src/config.c +++ b/src/config.c @@ -3201,10 +3201,11 @@ standardConfig static_configs[] = { createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL), - createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */ - createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */ + createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */ + createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */ createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */ createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */ + createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration), createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL), createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL), diff --git a/src/defrag.c b/src/defrag.c index 4d34009f8b..1aeb6968f6 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -38,27 +38,124 @@ #ifdef HAVE_DEFRAG -typedef struct defragCtx { - void *privdata; +typedef enum { DEFRAG_NOT_DONE = 0, + DEFRAG_DONE = 1 } doneStatus; // Clearly not a "bool" + + +/* + * Defragmentation is performed in stages. Each stage is serviced by a stage function + * (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that + * target are unique to the particular stage - and may even be NULL for some stage functions. The + * same stage function can be used multiple times (for different stages) each having a different + * target. + * + * The stage function is required to maintain an internal static state. This allows the stage + * function to continue when invoked in an iterative manner. When invoked with a 0 endtime, the + * stage function is required to clear it's internal state and prepare to begin a new stage. It + * should return false (more work to do) as it should NOT perform any real "work" during init. + * + * Parameters: + * endtime - This is the monotonic time that the function should end and return. This ensures + * a bounded latency due to defrag. When endtime is 0, the internal state should be + * cleared, preparing to begin the stage with a new target. + * target - This is the "thing" that should be defragged. It's type is dependent on the + * type of the stage function. This might be a dict, a kvstore, a DB, or other. + * privdata - A pointer to arbitrary private data which is unique to the stage function. + * + * Returns: + * - DEFRAG_DONE if the stage is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*defragStageFn)(monotime endtime, void *target, void *privdata); + +typedef struct { + defragStageFn stage_fn; // The function to be invoked for the stage + void *target; // The target that the function will defrag + void *privdata; // Private data, unique to the stage function +} StageDescriptor; + +/* Globals needed for the main defrag processing logic. + * Doesn't include variables specific to a stage or type of data. */ +struct DefragContext { + monotime start_cycle; // Time of beginning of defrag cycle + long long start_defrag_hits; // server.stat_active_defrag_hits captured at beginning of cycle + list *remaining_stages; // List of stages which remain to be processed + StageDescriptor *current_stage; // The stage that's currently being processed + + long long timeproc_id; // Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) + monotime timeproc_end_time; // Ending time of previous timerproc execution + long timeproc_overage_us; // A correction value if over/under target CPU percent +}; +static struct DefragContext defrag; + + +/* There are a number of stages which process a kvstore. To simplify this, a stage helper function + * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It + * uses these definitions. + */ +/* State of the kvstore helper. The private data (privdata) passed to the kvstore helper MUST BEGIN + * with a kvstoreIterState (or be passed as NULL). */ +#define KVS_SLOT_DEFRAG_LUT -2 +#define KVS_SLOT_UNASSIGNED -1 +typedef struct { + kvstore *kvs; int slot; - void *aux; -} defragCtx; + unsigned long cursor; +} kvstoreIterState; +/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the + * main dictionary, large items are set aside and processed by this function before continuing with + * iteration over the kvstore. + * endtime - This is the monotonic time that the function should end and return. + * privdata - Private data for functions invoked by the helper. If provided in the call to + * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) + * will be updated with the current kvstore iteration status. + * + * Returns: + * - DEFRAG_DONE if the pre-continue work is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdata); + -typedef struct defragPubSubCtx { - kvstore *pubsub_channels; - dict *(*clientPubSubChannels)(client *); +// Private data for main dictionary keys +typedef struct { + kvstoreIterState kvstate; + serverDb *db; + dictEntry *saved_expire_de; +} defragKeysCtx; + +// Private data for pubsub kvstores +typedef dict *(*getClientChannelsFn)(client *); +typedef struct { + getClientChannelsFn fn; +} getClientChannelsFnWrapper; + +typedef struct { + kvstoreIterState kvstate; + getClientChannelsFn getPubSubChannels; } defragPubSubCtx; + +/* When scanning a main kvstore, large elements are queued for later handling rather than + * causing a large latency spike while processing a hash table bucket. This list is only used + * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being + * defragged. + * Note that this is a list of key names. It's possible that the key may be deleted or modified + * before "later" and we will search by key name to find the entry when we defrag the item later. + */ +static list *defrag_later; +static unsigned long defrag_later_cursor; + + /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void *ptr); -/* Defrag helper for generic allocations. - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -void *activeDefragAlloc(void *ptr) { +/* Defrag function which allocates and copies memory if needed, but DOESN'T free the old block. + * It is the responsibility of the caller to free the old block if a non-NULL value (new block) + * is returned. (Returns NULL if no relocation was needed.) + */ +static void *activeDefragAllocWithoutFree(void *ptr) { size_t size; void *newptr; if (!je_get_defrag_hint(ptr)) { @@ -71,28 +168,40 @@ void *activeDefragAlloc(void *ptr) { size = zmalloc_size(ptr); newptr = zmalloc_no_tcache(size); memcpy(newptr, ptr, size); - zfree_no_tcache(ptr); server.stat_active_defrag_hits++; return newptr; } +/* Defrag helper for generic allocations. + * + * Returns NULL in case the allocation wasn't moved. + * When it returns a non-null value, the old pointer was already released + * and should NOT be accessed. */ +void *activeDefragAlloc(void *ptr) { + void *newptr = activeDefragAllocWithoutFree(ptr); + if (newptr) zfree_no_tcache(ptr); + return newptr; +} + /* This method captures the expiry db dict entry which refers to data stored in keys db dict entry. */ -void defragEntryStartCbForKeys(void *ctx, void *oldptr) { - defragCtx *defragctx = (defragCtx *)ctx; - serverDb *db = defragctx->privdata; +static void defragEntryStartCbForKeys(void *ctx, void *oldptr) { + defragKeysCtx *defragctx = (defragKeysCtx *)ctx; + serverDb *db = defragctx->db; sds oldsds = (sds)dictGetKey((dictEntry *)oldptr); - int slot = defragctx->slot; + int slot = defragctx->kvstate.slot; if (kvstoreDictSize(db->expires, slot)) { dictEntry *expire_de = kvstoreDictFind(db->expires, slot, oldsds); - defragctx->aux = expire_de; + defragctx->saved_expire_de = expire_de; + } else { + defragctx->saved_expire_de = NULL; } } /* This method updates the key of expiry db dict entry. The key might be no longer valid * as it could have been cleaned up during the defrag-realloc of the main dictionary. */ -void defragEntryFinishCbForKeys(void *ctx, void *newptr) { - defragCtx *defragctx = (defragCtx *)ctx; - dictEntry *expire_de = (dictEntry *)defragctx->aux; +static void defragEntryFinishCbForKeys(void *ctx, void *newptr) { + defragKeysCtx *defragctx = (defragKeysCtx *)ctx; + dictEntry *expire_de = defragctx->saved_expire_de; /* Item doesn't have TTL associated to it. */ if (!expire_de) return; /* No reallocation happened. */ @@ -100,18 +209,18 @@ void defragEntryFinishCbForKeys(void *ctx, void *newptr) { expire_de = NULL; return; } - serverDb *db = defragctx->privdata; + serverDb *db = defragctx->db; sds newsds = (sds)dictGetKey((dictEntry *)newptr); - int slot = defragctx->slot; + int slot = defragctx->kvstate.slot; kvstoreDictSetKey(db->expires, slot, expire_de, newsds); } -/*Defrag helper for sds strings +/* Defrag helper for sds strings * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released + * Returns NULL in case the allocation wasn't moved. + * When it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -sds activeDefragSds(sds sdsptr) { +static sds activeDefragSds(sds sdsptr) { void *ptr = sdsAllocPtr(sdsptr); void *newptr = activeDefragAlloc(ptr); if (newptr) { @@ -122,60 +231,47 @@ sds activeDefragSds(sds sdsptr) { return NULL; } -/* Defrag helper for robj and/or string objects with expected refcount. - * - * Like activeDefragStringOb, but it requires the caller to pass in the expected - * reference count. In some cases, the caller needs to update a robj whose - * reference count is not 1, in these cases, the caller must explicitly pass - * in the reference count, otherwise defragmentation will not be performed. - * Note that the caller is responsible for updating any other references to the robj. */ -robj *activeDefragStringObEx(robj *ob, int expected_refcount) { - robj *ret = NULL; - if (ob->refcount != expected_refcount) return NULL; - - /* try to defrag robj (only if not an EMBSTR type (handled below). */ - if (ob->type != OBJ_STRING || ob->encoding != OBJ_ENCODING_EMBSTR) { - if ((ret = activeDefragAlloc(ob))) { - ob = ret; - } +/* Performs defrag on a string-type (or generic) robj, but does not free the old robj. This is the + * caller's responsibility. This is necessary for string objects with multiple references. In this + * case the caller can fix the references before freeing the original object. + */ +static robj *activeDefragStringObWithoutFree(robj *ob) { + if (ob->type == OBJ_STRING && ob->encoding == OBJ_ENCODING_RAW) { + // Try to defrag the linked sds, regardless of if robj will be moved + sds newsds = activeDefragSds((sds)ob->ptr); + if (newsds) ob->ptr = newsds; } - /* try to defrag string object */ - if (ob->type == OBJ_STRING) { - if (ob->encoding == OBJ_ENCODING_RAW) { - sds newsds = activeDefragSds((sds)ob->ptr); - if (newsds) { - ob->ptr = newsds; - } - } else if (ob->encoding == OBJ_ENCODING_EMBSTR) { - /* The sds is embedded in the object allocation, calculate the - * offset and update the pointer in the new allocation. */ - long ofs = (intptr_t)ob->ptr - (intptr_t)ob; - if ((ret = activeDefragAlloc(ob))) { - ret->ptr = (void *)((intptr_t)ret + ofs); - } - } else if (ob->encoding != OBJ_ENCODING_INT) { - serverPanic("Unknown string encoding"); - } + robj *new_robj = activeDefragAllocWithoutFree(ob); + + if (new_robj && ob->type == OBJ_STRING && ob->encoding == OBJ_ENCODING_EMBSTR) { + // If the robj is moved, correct the internal pointer + long embstr_offset = (intptr_t)ob->ptr - (intptr_t)ob; + new_robj->ptr = (void *)((intptr_t)new_robj + embstr_offset); } - return ret; + return new_robj; } + /* Defrag helper for robj and/or string objects * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released + * Returns NULL in case the allocation wasn't moved. + * When it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ robj *activeDefragStringOb(robj *ob) { - return activeDefragStringObEx(ob, 1); + if (ob->refcount != 1) return NULL; // Unsafe to defrag if multiple refs + robj *new_robj = activeDefragStringObWithoutFree(ob); + if (new_robj) zfree_no_tcache(ob); + return new_robj; } + /* Defrag helper for lua scripts * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released + * Returns NULL in case the allocation wasn't moved. + * When it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -luaScript *activeDefragLuaScript(luaScript *script) { +static luaScript *activeDefragLuaScript(luaScript *script) { luaScript *ret = NULL; /* try to defrag script struct */ @@ -197,7 +293,7 @@ luaScript *activeDefragLuaScript(luaScript *script) { * Returns NULL in case the allocation wasn't moved. * When it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -dict *dictDefragTables(dict *d) { +static dict *dictDefragTables(dict *d) { dict *ret = NULL; dictEntry **newtable; /* handle the dict struct */ @@ -215,7 +311,7 @@ dict *dictDefragTables(dict *d) { } /* Internal function used by zslDefrag */ -void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) { +static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == oldnode) update[i]->level[i].forward = newnode; @@ -237,7 +333,7 @@ void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnod * only need to defrag the skiplist, but not update the obj pointer. * When return value is non-NULL, it is the score reference that must be updated * in the dict record. */ -double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { +static double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx; int i; sds ele = newele ? newele : oldele; @@ -271,7 +367,7 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { /* Defrag helper for sorted set. * Defrag a single dict entry key name, and corresponding skiplist struct */ -void activeDefragZsetEntry(zset *zs, dictEntry *de) { +static void activeDefragZsetEntry(zset *zs, dictEntry *de) { sds newsds; double *newscore; sds sdsele = dictGetKey(de); @@ -288,13 +384,13 @@ void activeDefragZsetEntry(zset *zs, dictEntry *de) { #define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 #define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4 -void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) { +static void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) { UNUSED(privdata); UNUSED(de); } /* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ -void activeDefragSdsDict(dict *d, int val_type) { +static void activeDefragSdsDict(dict *d, int val_type) { unsigned long cursor = 0; dictDefragFunctions defragfns = { .defragAlloc = activeDefragAlloc, @@ -310,34 +406,7 @@ void activeDefragSdsDict(dict *d, int val_type) { } /* Defrag a list of ptr, sds or robj string values */ -void activeDefragList(list *l, int val_type) { - listNode *ln, *newln; - for (ln = l->head; ln; ln = ln->next) { - if ((newln = activeDefragAlloc(ln))) { - if (newln->prev) - newln->prev->next = newln; - else - l->head = newln; - if (newln->next) - newln->next->prev = newln; - else - l->tail = newln; - ln = newln; - } - if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sds newsds, sdsele = ln->value; - if ((newsds = activeDefragSds(sdsele))) ln->value = newsds; - } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = ln->value; - if ((newele = activeDefragStringOb(ele))) ln->value = newele; - } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = ln->value; - if ((newptr = activeDefragAlloc(ptr))) ln->value = newptr; - } - } -} - -void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { +static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; unsigned char *newzl; if ((newnode = activeDefragAlloc(node))) { @@ -354,7 +423,7 @@ void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { if ((newzl = activeDefragAlloc(node->entry))) node->entry = newzl; } -void activeDefragQuickListNodes(quicklist *ql) { +static void activeDefragQuickListNodes(quicklist *ql) { quicklistNode *node = ql->head; while (node) { activeDefragQuickListNode(ql, &node); @@ -365,13 +434,18 @@ void activeDefragQuickListNodes(quicklist *ql) { /* when the value has lots of elements, we want to handle it later and not as * part of the main dictionary scan. this is needed in order to prevent latency * spikes when handling large items */ -void defragLater(serverDb *db, dictEntry *kde) { +static void defragLater(dictEntry *kde) { + if (!defrag_later) { + defrag_later = listCreate(); + listSetFreeMethod(defrag_later, (void (*)(void *))sdsfree); + defrag_later_cursor = 0; + } sds key = sdsdup(dictGetKey(kde)); - listAddNodeTail(db->defrag_later, key); + listAddNodeTail(defrag_later, key); } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { +static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) { quicklist *ql = ob->ptr; quicklistNode *node; long iterations = 0; @@ -396,7 +470,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { activeDefragQuickListNode(ql, &node); server.stat_active_defrag_scanned++; if (++iterations > 128 && !bookmark_failed) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { if (!quicklistBookmarkCreate(&ql, "_AD", node)) { bookmark_failed = 1; } else { @@ -417,14 +491,14 @@ typedef struct { zset *zs; } scanLaterZsetData; -void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { +static void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { dictEntry *de = (dictEntry *)_de; scanLaterZsetData *data = privdata; activeDefragZsetEntry(data->zs, de); server.stat_active_defrag_scanned++; } -void scanLaterZset(robj *ob, unsigned long *cursor) { +static void scanLaterZset(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) return; zset *zs = (zset *)ob->ptr; dict *d = zs->dict; @@ -434,13 +508,13 @@ void scanLaterZset(robj *ob, unsigned long *cursor) { } /* Used as scan callback when all the work is done in the dictDefragFunctions. */ -void scanCallbackCountScanned(void *privdata, const dictEntry *de) { +static void scanCallbackCountScanned(void *privdata, const dictEntry *de) { UNUSED(privdata); UNUSED(de); server.stat_active_defrag_scanned++; } -void scanLaterSet(robj *ob, unsigned long *cursor) { +static void scanLaterSet(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) return; dict *d = ob->ptr; dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc, @@ -448,7 +522,7 @@ void scanLaterSet(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL); } -void scanLaterHash(robj *ob, unsigned long *cursor) { +static void scanLaterHash(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return; dict *d = ob->ptr; dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc, @@ -457,18 +531,18 @@ void scanLaterHash(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL); } -void defragQuicklist(serverDb *db, dictEntry *kde) { +static void defragQuicklist(dictEntry *kde) { robj *ob = dictGetVal(kde); quicklist *ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragQuickListNodes(ql); } -void defragZsetSkiplist(serverDb *db, dictEntry *kde) { +static void defragZsetSkiplist(dictEntry *kde) { robj *ob = dictGetVal(kde); zset *zs = (zset *)ob->ptr; zset *newzs; @@ -481,7 +555,7 @@ void defragZsetSkiplist(serverDb *db, dictEntry *kde) { if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl; if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else { dictIterator *di = dictGetIterator(zs->dict); while ((de = dictNext(di)) != NULL) { @@ -493,26 +567,26 @@ void defragZsetSkiplist(serverDb *db, dictEntry *kde) { if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict; } -void defragHash(serverDb *db, dictEntry *kde) { +static void defragHash(dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); /* defrag the dict struct and tables */ if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd; } -void defragSet(serverDb *db, dictEntry *kde) { +static void defragSet(dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* defrag the dict struct and tables */ @@ -521,7 +595,7 @@ void defragSet(serverDb *db, dictEntry *kde) { /* Defrag callback for radix tree iterator, called for each node, * used in order to defrag the nodes allocations. */ -int defragRaxNode(raxNode **noderef) { +static int defragRaxNode(raxNode **noderef) { raxNode *newnode = activeDefragAlloc(*noderef); if (newnode) { *noderef = newnode; @@ -531,7 +605,7 @@ int defragRaxNode(raxNode **noderef) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) { +static int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) { static unsigned char last[sizeof(streamID)]; raxIterator ri; long iterations = 0; @@ -567,7 +641,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) if (newdata) raxSetData(ri.node, ri.data = newdata); server.stat_active_defrag_scanned++; if (++iterations > 128) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { serverAssert(ri.key_len == sizeof(last)); memcpy(last, ri.key, ri.key_len); raxStop(&ri); @@ -589,7 +663,7 @@ typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata); * 2) rax nodes * 3) rax entry data (only if defrag_data is specified) * 4) call a callback per element, and allow the callback to return a new pointer for the element */ -void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { +static void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { raxIterator ri; rax *rax; if ((rax = activeDefragAlloc(*raxref))) *raxref = rax; @@ -612,7 +686,7 @@ typedef struct { streamConsumer *c; } PendingEntryContext; -void *defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { +static void *defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { PendingEntryContext *ctx = privdata; streamNACK *nack = ri->data, *newnack; nack->consumer = ctx->c; /* update nack pointer to consumer */ @@ -626,7 +700,7 @@ void *defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { return newnack; } -void *defragStreamConsumer(raxIterator *ri, void *privdata) { +static void *defragStreamConsumer(raxIterator *ri, void *privdata) { streamConsumer *c = ri->data; streamCG *cg = privdata; void *newc = activeDefragAlloc(c); @@ -642,7 +716,7 @@ void *defragStreamConsumer(raxIterator *ri, void *privdata) { return newc; /* returns NULL if c was not defragged */ } -void *defragStreamConsumerGroup(raxIterator *ri, void *privdata) { +static void *defragStreamConsumerGroup(raxIterator *ri, void *privdata) { streamCG *cg = ri->data; UNUSED(privdata); if (cg->consumers) defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); @@ -650,7 +724,7 @@ void *defragStreamConsumerGroup(raxIterator *ri, void *privdata) { return NULL; } -void defragStream(serverDb *db, dictEntry *kde) { +static void defragStream(dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; @@ -661,7 +735,7 @@ void defragStream(serverDb *db, dictEntry *kde) { if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) s->rax = newrax; - defragLater(db, kde); + defragLater(kde); } else defragRadixTree(&s->rax, 1, NULL, NULL); @@ -671,25 +745,25 @@ void defragStream(serverDb *db, dictEntry *kde) { /* Defrag a module key. This is either done immediately or scheduled * for later. Returns then number of pointers defragged. */ -void defragModule(serverDb *db, dictEntry *kde) { +static void defragModule(serverDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); - if (!moduleDefragValue(dictGetKey(kde), obj, db->id)) defragLater(db, kde); + if (!moduleDefragValue(dictGetKey(kde), obj, db->id)) defragLater(kde); } /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ -void defragKey(defragCtx *ctx, dictEntry *de) { - serverDb *db = ctx->privdata; - int slot = ctx->slot; +static void defragKey(defragKeysCtx *ctx, dictEntry *de) { + serverDb *db = ctx->db; + int slot = ctx->kvstate.slot; robj *newob, *ob; unsigned char *newzl; /* Try to defrag robj and / or string value. */ ob = dictGetVal(de); if ((newob = activeDefragStringOb(ob))) { - kvstoreDictSetVal(db->keys, slot, de, newob); + kvstoreDictSetVal(ctx->kvstate.kvs, slot, de, newob); ob = newob; } @@ -697,7 +771,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. */ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(db, de); + defragQuicklist(de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else { @@ -705,7 +779,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(db, de); + defragSet(de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { void *newptr, *ptr = ob->ptr; if ((newptr = activeDefragAlloc(ptr))) ob->ptr = newptr; @@ -716,7 +790,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(db, de); + defragZsetSkiplist(de); } else { serverPanic("Unknown sorted set encoding"); } @@ -724,12 +798,12 @@ void defragKey(defragCtx *ctx, dictEntry *de) { if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(db, de); + defragHash(de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(db, de); + defragStream(de); } else if (ob->type == OBJ_MODULE) { defragModule(db, de); } else { @@ -738,9 +812,9 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } /* Defrag scan callback for the main db dictionary. */ -void defragScanCallback(void *privdata, const dictEntry *de) { +static void dbKeysScanCallback(void *privdata, const dictEntry *de) { long long hits_before = server.stat_active_defrag_hits; - defragKey((defragCtx *)privdata, (dictEntry *)de); + defragKey((defragKeysCtx *)privdata, (dictEntry *)de); if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else @@ -754,7 +828,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { * fragmentation ratio in order to decide if a defrag action should be taken * or not, a false detection can cause the defragmenter to waste a lot of CPU * without the possibility of getting any results. */ -float getAllocatorFragmentation(size_t *out_frag_bytes) { +static float getAllocatorFragmentation(size_t *out_frag_bytes) { size_t resident, active, allocated, frag_smallbins_bytes; zmalloc_get_allocator_info(&allocated, &active, &resident, NULL, NULL, &frag_smallbins_bytes); @@ -772,18 +846,17 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { } /* Defrag scan callback for the pubsub dictionary. */ -void defragPubsubScanCallback(void *privdata, const dictEntry *de) { - defragCtx *ctx = privdata; - defragPubSubCtx *pubsub_ctx = ctx->privdata; - kvstore *pubsub_channels = pubsub_ctx->pubsub_channels; +static void defragPubsubScanCallback(void *privdata, const dictEntry *de) { + defragPubSubCtx *ctx = privdata; + kvstore *pubsub_channels = ctx->kvstate.kvs; robj *newchannel, *channel = dictGetKey(de); dict *newclients, *clients = dictGetVal(de); /* Try to defrag the channel name. */ serverAssert(channel->refcount == (int)dictSize(clients) + 1); - newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); + newchannel = activeDefragStringObWithoutFree(channel); if (newchannel) { - kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry *)de, newchannel); + kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newchannel); /* The channel name is shared by the client's pubsub(shard) and server's * pubsub(shard), after defraging the channel name, we need to update @@ -792,35 +865,26 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { dictEntry *clientde; while ((clientde = dictNext(di)) != NULL) { client *c = dictGetKey(clientde); - dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel); + dict *client_channels = ctx->getPubSubChannels(c); + dictEntry *pubsub_channel = dictFind(client_channels, newchannel); serverAssert(pubsub_channel); - dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel); + dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel); } dictReleaseIterator(di); + // Now that we're done correcting the references, we can safely free the old channel robj + zfree_no_tcache(channel); } /* Try to defrag the dictionary of clients that is stored as the value part. */ if ((newclients = dictDefragTables(clients))) - kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry *)de, newclients); + kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients); server.stat_active_defrag_scanned++; } -/* We may need to defrag other globals, one small allocation can hold a full allocator run. - * so although small, it is still important to defrag these */ -void defragOtherGlobals(void) { - /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. - * but we assume most of these are short lived, we only need to defrag allocations - * that remain static for a long time */ - activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - moduleDefragGlobals(); - kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables); - kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables); -} - /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { +static int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { @@ -834,7 +898,8 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { - return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid); + long long endtimeWallClock = ustime() + (endtime - getMonotonicUs()); + return moduleLateDefrag(dictGetKey(de), ob, cursor, endtimeWallClock, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -844,299 +909,450 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int return 0; } -/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */ -static sds defrag_later_current_key = NULL; -static unsigned long defrag_later_cursor = 0; -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int defragLaterStep(serverDb *db, int slot, long long endtime) { +// A kvstoreHelperPreContinueFn +static doneStatus defragLaterStep(monotime endtime, void *privdata) { + defragKeysCtx *ctx = privdata; + unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long key_defragged; - - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_later_cursor) { - listNode *head = listFirst(db->defrag_later); - - /* Move on to next key */ - if (defrag_later_current_key) { - serverAssert(defrag_later_current_key == head->value); - listDelNode(db->defrag_later, head); - defrag_later_cursor = 0; - defrag_later_current_key = NULL; - } - /* stop if we reached the last one. */ - head = listFirst(db->defrag_later); - if (!head) return 0; + while (defrag_later && listLength(defrag_later) > 0) { + listNode *head = listFirst(defrag_later); + sds key = head->value; + dictEntry *de = kvstoreDictFind(ctx->kvstate.kvs, ctx->kvstate.slot, key); - /* start a new key */ - defrag_later_current_key = head->value; - defrag_later_cursor = 0; - } - - /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key); - key_defragged = server.stat_active_defrag_hits; - do { - int quit = 0; - if (defragLaterItem(de, &defrag_later_cursor, endtime, db->id)) - quit = 1; /* time is up, we didn't finish all the work */ - - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields - * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. */ - if (quit || (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (quit || ustime() > endtime) { - if (key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - return 1; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while (defrag_later_cursor); - if (key_defragged != server.stat_active_defrag_hits) + long long key_defragged = server.stat_active_defrag_hits; + bool timeout = (defragLaterItem(de, &defrag_later_cursor, endtime, ctx->db->id) == 1); + if (key_defragged != server.stat_active_defrag_hits) { server.stat_active_defrag_key_hits++; - else + } else { server.stat_active_defrag_key_misses++; - } while (1); -} + } -#define INTERPOLATE(x, x1, x2, y1, y2) ((y1) + ((x) - (x1)) * ((y2) - (y1)) / ((x2) - (x1))) -#define LIMIT(y, min, max) ((y) < (min) ? min : ((y) > (max) ? max : (y))) + if (timeout) break; -/* decide if defrag is needed, and at what CPU effort to invest in it */ -void computeDefragCycles(void) { - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - /* If we're not already running, and below the threshold, exit. */ - if (!server.active_defrag_running) { - if (frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) return; + if (defrag_later_cursor == 0) { + // the item is finished, move on + listDelNode(defrag_later, head); + } + + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() > endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } } - /* Calculate the adaptive aggressiveness of the defrag based on the current - * fragmentation and configurations. */ - int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper, - server.active_defrag_cycle_min, server.active_defrag_cycle_max); - cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max); + return (!defrag_later || listLength(defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; +} - /* Normally we allow increasing the aggressiveness during a scan, but don't - * reduce it, since we should not lower the aggressiveness when fragmentation - * drops. But when a configuration is made, we should reconsider it. */ - if (cpu_pct > server.active_defrag_running || server.active_defrag_configuration_changed) { - server.active_defrag_running = cpu_pct; - server.active_defrag_configuration_changed = 0; - serverLog(LL_VERBOSE, "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", frag_pct, frag_bytes, - cpu_pct); + +/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if + * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this + * function during the iteration. */ +static doneStatus defragStageKvstoreHelper(monotime endtime, + kvstore *kvs, + dictScanFunction scan_fn, + kvstoreHelperPreContinueFn precontinue_fn, + const dictDefragFunctions *defragfns, + void *privdata) { + static kvstoreIterState state; // STATIC - this persists + if (endtime == 0) { + // Starting the stage, set up the state information for this stage + state.kvs = kvs; + state.slot = KVS_SLOT_DEFRAG_LUT; + state.cursor = 0; + return DEFRAG_NOT_DONE; } -} + serverAssert(kvs == state.kvs); // Shouldn't change during the stage -/* Perform incremental defragmentation work from the serverCron. - * This works in a similar way to activeExpireCycle, in the sense that - * we do incremental work across calls. */ -void activeDefragCycle(void) { - static int slot = -1; - static int current_db = -1; - static int defrag_later_item_in_progress = 0; - static int defrag_stage = 0; - static unsigned long defrag_cursor = 0; - static serverDb *db = NULL; - static long long start_scan, start_stat; unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long start, timelimit, endtime; - mstime_t latency; - int all_stages_finished = 0; - int quit = 0; - if (!server.active_defrag_enabled) { - if (server.active_defrag_running) { - /* if active defrag was disabled mid-run, start from fresh next time. */ - server.active_defrag_running = 0; - server.active_defrag_configuration_changed = 0; - if (db) listEmpty(db->defrag_later); - defrag_later_current_key = NULL; - defrag_later_cursor = 0; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - goto update_metrics; + if (state.slot == KVS_SLOT_DEFRAG_LUT) { + // Before we start scanning the kvstore, handle the main structures + do { + state.cursor = kvstoreDictLUTDefrag(kvs, state.cursor, dictDefragTables); + if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; + } while (state.cursor != 0); + state.slot = KVS_SLOT_UNASSIGNED; + } + + while (true) { + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() >= endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; } - return; + + if (precontinue_fn) { + if (privdata) *(kvstoreIterState *)privdata = state; + if (precontinue_fn(endtime, privdata) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; + } + + if (!state.cursor) { + // If there's no cursor, we're ready to begin a new kvstore slot. + if (state.slot == KVS_SLOT_UNASSIGNED) { + state.slot = kvstoreGetFirstNonEmptyDictIndex(kvs); + } else { + state.slot = kvstoreGetNextNonEmptyDictIndex(kvs, state.slot); + } + + if (state.slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; + } + + // Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. + if (privdata) *(kvstoreIterState *)privdata = state; + state.cursor = kvstoreDictScanDefrag(kvs, state.slot, state.cursor, + scan_fn, defragfns, privdata); } - if (hasActiveChildProcess()) return; /* Defragging memory while there's a fork will just do damage. */ + return DEFRAG_NOT_DONE; +} + - /* Once a second, check if the fragmentation justfies starting a scan - * or making it more aggressive. */ - run_with_period(1000) { - computeDefragCycles(); +// Note: target is a DB, (not a KVS like most stages) +static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) { + UNUSED(privdata); + serverDb *db = (serverDb *)target; + + static defragKeysCtx ctx; // STATIC - this persists + if (endtime == 0) { + ctx.db = db; + // Don't return yet. Call the helper with endtime==0 below. } + serverAssert(ctx.db == db); - /* Normally it is checked once a second, but when there is a configuration - * change, we want to check it as soon as possible. */ - if (server.active_defrag_configuration_changed) { - computeDefragCycles(); - server.active_defrag_configuration_changed = 0; + /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if + * the main DB entry has been moved. */ + static const dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, // Handled by dbKeysScanCallback + .defragVal = NULL, // Handled by dbKeysScanCallback + .defragEntryStartCb = defragEntryStartCbForKeys, + .defragEntryFinishCb = defragEntryFinishCbForKeys}; + + return defragStageKvstoreHelper(endtime, db->keys, + dbKeysScanCallback, defragLaterStep, &defragfns, &ctx); +} + + +static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) { + UNUSED(privdata); + static const dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, // Not needed for expires (just a ref) + .defragVal = NULL, // Not needed for expires (no value) + }; + return defragStageKvstoreHelper(endtime, (kvstore *)target, + scanCallbackCountScanned, NULL, &defragfns, NULL); +} + + +static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) { + // target is server.pubsub_channels or server.pubsubshard_channels + getClientChannelsFnWrapper *fnWrapper = privdata; + + static const dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, // Handled by defragPubsubScanCallback + .defragVal = NULL, // Not needed for expires (no value) + }; + defragPubSubCtx ctx; + ctx.getPubSubChannels = fnWrapper->fn; + return defragStageKvstoreHelper(endtime, (kvstore *)target, + defragPubsubScanCallback, NULL, &defragfns, &ctx); +} + + +static doneStatus defragLuaScripts(monotime endtime, void *target, void *privdata) { + UNUSED(target); + UNUSED(privdata); + if (endtime == 0) return DEFRAG_NOT_DONE; // required initialization + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + return DEFRAG_DONE; +} + + +static doneStatus defragModuleGlobals(monotime endtime, void *target, void *privdata) { + UNUSED(target); + UNUSED(privdata); + if (endtime == 0) return DEFRAG_NOT_DONE; // required initialization + moduleDefragGlobals(); + return DEFRAG_DONE; +} + + +static bool defragIsRunning(void) { + return (defrag.timeproc_id > 0); +} + + +static void addDefragStage(defragStageFn stage_fn, void *target, void *privdata) { + StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); + stage->stage_fn = stage_fn; + stage->target = target; + stage->privdata = privdata; + listAddNodeTail(defrag.remaining_stages, stage); +} + + +// Called at the end of a complete defrag cycle, or when defrag is terminated +static void endDefragCycle(bool normal_termination) { + if (normal_termination) { + // For normal termination, we expect... + serverAssert(!defrag.current_stage); + serverAssert(listLength(defrag.remaining_stages) == 0); + serverAssert(!defrag_later || listLength(defrag_later) == 0); + } else { + // Defrag is being terminated abnormally + aeDeleteTimeEvent(server.el, defrag.timeproc_id); + + if (defrag.current_stage) { + zfree(defrag.current_stage); + defrag.current_stage = NULL; + } + listSetFreeMethod(defrag.remaining_stages, zfree); } + defrag.timeproc_id = AE_DELETED_EVENT_ID; - if (!server.active_defrag_running) return; + listRelease(defrag.remaining_stages); + defrag.remaining_stages = NULL; - /* See activeExpireCycle for how timelimit is handled. */ - start = ustime(); - timelimit = 1000000 * server.active_defrag_running / server.hz / 100; - if (timelimit <= 0) timelimit = 1; - endtime = start + timelimit; - latencyStartMonitor(latency); + if (defrag_later) { + listRelease(defrag_later); + defrag_later = NULL; + } + defrag_later_cursor = 0; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc, - .defragEntryStartCb = defragEntryStartCbForKeys, - .defragEntryFinishCb = defragEntryFinishCbForKeys}; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_stage && !defrag_cursor && (slot < 0)) { - /* finish any leftovers from previous db before moving to the next one */ - if (db && defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", + (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits), + frag_pct, frag_bytes); - /* Move on to next database, and stop if we reached the last one. */ - if (++current_db >= server.dbnum) { - /* defrag other items not part of the db / keys */ - defragOtherGlobals(); - - long long now = ustime(); - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", - (int)((now - start_scan) / 1000), (int)(server.stat_active_defrag_hits - start_stat), - frag_pct, frag_bytes); - - start_scan = now; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - server.active_defrag_running = 0; - - computeDefragCycles(); /* if another scan is needed, start it right away */ - if (server.active_defrag_running != 0 && ustime() < endtime) continue; - break; - } else if (current_db == 0) { - /* Start a scan from the first database. */ - start_scan = ustime(); - start_stat = server.stat_active_defrag_hits; - } + server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); + server.stat_last_active_defrag_time = 0; + server.active_defrag_cpu_percent = 0; +} + + +/* Must be called at the start of the timeProc as it measures the delay from the end of the previous + * timeProc invocation when performing the computation. */ +static int computeDefragCycleUs(void) { + long dutyCycleUs; + + int targetCpuPercent = server.active_defrag_cpu_percent; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + static int prevCpuPercent = 0; // STATIC - this persists + if (targetCpuPercent != prevCpuPercent) { + /* If the targetCpuPercent changes, the value might be different from when the last wait + * time was computed. In this case, don't consider wait time. (This is really only an + * issue in crazy tests that dramatically increase CPU while defrag is running.) */ + defrag.timeproc_end_time = 0; + prevCpuPercent = targetCpuPercent; + } - db = &server.db[current_db]; - kvstoreDictLUTDefrag(db->keys, dictDefragTables); - kvstoreDictLUTDefrag(db->expires, dictDefragTables); - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; + // Given when the last duty cycle ended, compute time needed to achieve the desired percentage. + if (defrag.timeproc_end_time == 0) { + // Either the first call to the timeProc, or we were paused for some reason. + defrag.timeproc_overage_us = 0; + dutyCycleUs = server.active_defrag_cycle_us; + } else { + long waitedUs = getMonotonicUs() - defrag.timeproc_end_time; + /* Given the elapsed wait time between calls, compute the necessary duty time needed to + * achieve the desired CPU percentage. + * With: D = duty time, W = wait time, P = percent + * Solve: D P + * ----- = ----- + * D + W 100 + * Solving for D: + * D = P * W / (100 - P) + * + * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate + * with a proportionately long duty-cycle. This won't significantly affect perceived + * latency, because clients are already being impacted by the long cycle time which caused + * the starvation of the timer. */ + dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent); + + // Also adjust for any accumulated overage(underage). + dutyCycleUs -= defrag.timeproc_overage_us; + defrag.timeproc_overage_us = 0; + + if (dutyCycleUs < server.active_defrag_cycle_us) { + /* We never reduce our cycle time, that would increase overhead. Instead, we track this + * as part of the overage, and increase wait time between cycles. */ + defrag.timeproc_overage_us = server.active_defrag_cycle_us - dutyCycleUs; + dutyCycleUs = server.active_defrag_cycle_us; } + } + return dutyCycleUs; +} - /* This array of structures holds the parameters for all defragmentation stages. */ - typedef struct defragStage { - kvstore *kvs; - dictScanFunction *scanfn; - void *privdata; - } defragStage; - defragStage defrag_stages[] = { - {db->keys, defragScanCallback, db}, - {db->expires, scanCallbackCountScanned, NULL}, - {server.pubsub_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsub_channels, getClientPubSubChannels}}, - {server.pubsubshard_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsubshard_channels, getClientPubSubShardChannels}}, - }; - do { - int num_stages = sizeof(defrag_stages) / sizeof(defrag_stages[0]); - serverAssert(defrag_stage < num_stages); - defragStage *current_stage = &defrag_stages[defrag_stage]; - - /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ - if (defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } - if (!defrag_later_item_in_progress) { - /* Continue defragmentation from the previous stage. - * If slot is -1, it means this stage starts from the first non-empty slot. */ - if (slot == -1) slot = kvstoreGetFirstNonEmptyDictIndex(current_stage->kvs); - defrag_cursor = kvstoreDictScanDefrag(current_stage->kvs, slot, defrag_cursor, current_stage->scanfn, - &defragfns, &(defragCtx){current_stage->privdata, slot}); - } +/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next + * computeDefragCycleUs computation. */ +static int computeDelayMs(monotime intendedEndtime) { + defrag.timeproc_end_time = getMonotonicUs(); + int overage = defrag.timeproc_end_time - intendedEndtime; + defrag.timeproc_overage_us += overage; // track over/under desired CPU + + int targetCpuPercent = server.active_defrag_cpu_percent; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + // Given the desired duty cycle, what inter-cycle delay do we need to achieve that? + long totalCycleTimeUs = server.active_defrag_cycle_us * 100 / targetCpuPercent; + long delayUs = totalCycleTimeUs - server.active_defrag_cycle_us; + // Only increase delay by the fraction of the overage that would be non-duty-cycle + delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100; // "overage" might be negative + if (delayUs < 0) delayUs = 0; + long delayMs = delayUs / 1000; // round down + return delayMs; +} - if (!defrag_cursor) { - /* Move to the next slot only if regular and large item scanning has been completed. */ - if (listLength(db->defrag_later) > 0) { - defrag_later_item_in_progress = 1; - continue; - } - /* Move to the next slot in the current stage. If we've reached the end, move to the next stage. */ - if ((slot = kvstoreGetNextNonEmptyDictIndex(current_stage->kvs, slot)) == -1) defrag_stage++; - defrag_later_item_in_progress = 0; - } +/* An independent time proc for defrag. While defrag is running, this is called much more often + * than the server cron. Frequent short calls provides low latency impact. */ +static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { + UNUSED(eventLoop); + UNUSED(id); + UNUSED(clientData); - /* Check if all defragmentation stages have been processed. - * If so, mark as finished and reset the stage counter to move on to next database. */ - if (defrag_stage == num_stages) { - all_stages_finished = 1; - defrag_stage = 0; - } + // This timer shouldn't be registered unless there's work to do. + serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0); - /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys - * (if we have a lot of pointers in one hash bucket or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new db in this loop, this is because after - * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (all_stages_finished || ++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64) { - /* Quit if all stages were finished or timeout. */ - if (all_stages_finished || ustime() > endtime) { - quit = 1; - break; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while (!all_stages_finished && !quit); - } while (!quit); + if (!server.active_defrag_enabled) { + // Defrag has been disabled while running + endDefragCycle(false); + return AE_NOMORE; + } + + if (hasActiveChildProcess()) { + // If there's a child process, pause the defrag, polling until the child completes. + defrag.timeproc_end_time = 0; // prevent starvation recovery + return 100; + } + + monotime starttime = getMonotonicUs(); + monotime endtime = starttime + computeDefragCycleUs(); + + mstime_t latency; + latencyStartMonitor(latency); + + if (!defrag.current_stage) { + defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); + listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); + // Initialize the stage with endtime==0 + doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata); + serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE + } + + doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata); + if (status == DEFRAG_DONE) { + zfree(defrag.current_stage); + defrag.current_stage = NULL; + } latencyEndMonitor(latency); latencyAddSampleIfNeeded("active-defrag-cycle", latency); -update_metrics: - if (server.active_defrag_running > 0) { - if (server.stat_last_active_defrag_time == 0) elapsedStart(&server.stat_last_active_defrag_time); - } else if (server.stat_last_active_defrag_time != 0) { - server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); - server.stat_last_active_defrag_time = 0; + if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) { + return computeDelayMs(endtime); + } else { + endDefragCycle(true); + return AE_NOMORE; // Ends the timer proc + } +} + + +static void beginDefragCycle(void) { + serverAssert(!defragIsRunning()); + + serverAssert(defrag.remaining_stages == NULL); + defrag.remaining_stages = listCreate(); + + for (int dbid = 0; dbid < server.dbnum; dbid++) { + serverDb *db = &server.db[dbid]; + addDefragStage(defragStageDbKeys, db, NULL); + addDefragStage(defragStageExpiresKvstore, db->expires, NULL); } + + static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels}; + static getClientChannelsFnWrapper getClientPubSubShardChannelsFn = {getClientPubSubShardChannels}; + addDefragStage(defragStagePubsubKvstore, server.pubsub_channels, &getClientPubSubChannelsFn); + addDefragStage(defragStagePubsubKvstore, server.pubsubshard_channels, &getClientPubSubShardChannelsFn); + + addDefragStage(defragLuaScripts, NULL, NULL); + addDefragStage(defragModuleGlobals, NULL, NULL); + + defrag.current_stage = NULL; + defrag.start_cycle = getMonotonicUs(); + defrag.start_defrag_hits = server.stat_active_defrag_hits; + defrag.timeproc_end_time = 0; + defrag.timeproc_overage_us = 0; + defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL); + + elapsedStart(&server.stat_last_active_defrag_time); +} + + +#define INTERPOLATE(x, x1, x2, y1, y2) ((y1) + ((x) - (x1)) * ((y2) - (y1)) / ((x2) - (x1))) +#define LIMIT(y, min, max) ((y) < (min) ? min : ((y) > (max) ? max : (y))) + +/* decide if defrag is needed, and at what CPU effort to invest in it */ +static void updateDefragCpuPercent(void) { + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + if (server.active_defrag_cpu_percent == 0) { + if (frag_pct < server.active_defrag_threshold_lower || + frag_bytes < server.active_defrag_ignore_bytes) return; + } + + /* Calculate the adaptive aggressiveness of the defrag based on the current + * fragmentation and configurations. */ + int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper, + server.active_defrag_cpu_min, server.active_defrag_cpu_max); + cpu_pct = LIMIT(cpu_pct, server.active_defrag_cpu_min, server.active_defrag_cpu_max); + + /* Normally we allow increasing the aggressiveness during a scan, but don't + * reduce it, since we should not lower the aggressiveness when fragmentation + * drops. But when a configuration is made, we should reconsider it. */ + if (cpu_pct > server.active_defrag_cpu_percent || server.active_defrag_configuration_changed) { + server.active_defrag_configuration_changed = 0; + if (defragIsRunning()) { + serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } else { + serverLog(LL_VERBOSE, "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } + server.active_defrag_cpu_percent = cpu_pct; + } +} + + +void monitorActiveDefrag(void) { + if (!server.active_defrag_enabled) return; + + /* Defrag gets paused while a child process is active. So there's no point in starting a new + * cycle or adjusting the CPU percentage for an existing cycle. */ + if (hasActiveChildProcess()) return; + + updateDefragCpuPercent(); + + if (server.active_defrag_cpu_percent > 0 && !defragIsRunning()) beginDefragCycle(); } #else /* HAVE_DEFRAG */ -void activeDefragCycle(void) { +void monitorActiveDefrag(void) { /* Not implemented yet. */ } diff --git a/src/dict.c b/src/dict.c index f164820584..e37c01cf11 100644 --- a/src/dict.c +++ b/src/dict.c @@ -1309,7 +1309,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) { /* Reallocate the dictEntry, key and value allocations in a bucket using the * provided allocation functions in order to defrag them. */ -static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) { +static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) { dictDefragAllocFunction *defragalloc = defragfns->defragAlloc; dictDefragAllocFunction *defragkey = defragfns->defragKey; dictDefragAllocFunction *defragval = defragfns->defragVal; @@ -1487,7 +1487,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri * where NULL means that no reallocation happened and the old memory is still * valid. */ unsigned long -dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) { +dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) { int htidx0, htidx1; const dictEntry *de, *next; unsigned long m0, m1; diff --git a/src/dict.h b/src/dict.h index 1c9e059baa..a494501c35 100644 --- a/src/dict.h +++ b/src/dict.h @@ -241,7 +241,7 @@ void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata); unsigned long -dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); +dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata); uint64_t dictGetHash(dict *d, const void *key); void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size); diff --git a/src/kvstore.c b/src/kvstore.c index e92af03784..ec0445c2ea 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -737,7 +737,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, - dictDefragFunctions *defragfns, + const dictDefragFunctions *defragfns, void *privdata) { dict *d = kvstoreGetDict(kvs, didx); if (!d) return 0; @@ -748,14 +748,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, * within dict, it only reallocates the memory used by the dict structure itself using * the provided allocation function. This feature was added for the active defrag feature. * - * The 'defragfn' callback is called with a reference to the dict - * that callback can reallocate. */ -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { + * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time + * to execute. A "cursor" is used to perform the operation iteratively. When first called, a + * cursor value of 0 should be provided. The return value is an updated cursor which should be + * provided on the next iteration. The operation is complete when 0 is returned. + * + * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */ +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) { + for (int didx = cursor; didx < kvs->num_dicts; didx++) { dict **d = kvstoreGetDictRef(kvs, didx), *newd; if (!*d) continue; + + listNode *rehashing_node = NULL; + if (listLength(kvs->rehashing) > 0) { + rehashing_node = ((kvstoreDictMetadata *)dictMetadata(*d))->rehashing_node; + } + if ((newd = defragfn(*d))) *d = newd; + if (rehashing_node) listNodeValue(rehashing_node) = *d; + return (didx + 1); } + return 0; } uint64_t kvstoreGetHash(kvstore *kvs, const void *key) { diff --git a/src/kvstore.h b/src/kvstore.h index 81a0d9a96e..00ec472e73 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, - dictDefragFunctions *defragfns, + const dictDefragFunctions *defragfns, void *privdata); typedef dict *(kvstoreDictLUTDefragFunction)(dict *d); -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn); +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn); void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing); diff --git a/src/server.c b/src/server.c index 866023a455..694f934074 100644 --- a/src/server.c +++ b/src/server.c @@ -1062,8 +1062,8 @@ void databasesCron(void) { } } - /* Defrag keys gradually. */ - activeDefragCycle(); + /* Start active defrag cycle or adjust defrag CPU if needed. */ + monitorActiveDefrag(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad @@ -1532,22 +1532,6 @@ void whileBlockedCron(void) { mstime_t latency; latencyStartMonitor(latency); - /* In some cases we may be called with big intervals, so we may need to do - * extra work here. This is because some of the functions in serverCron rely - * on the fact that it is performed every 10 ms or so. For instance, if - * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we - * need to call it multiple times. */ - long hz_ms = 1000 / server.hz; - while (server.blocked_last_cron < server.mstime) { - /* Defrag keys gradually. */ - activeDefragCycle(); - - server.blocked_last_cron += hz_ms; - - /* Increment cronloop so that run_with_period works. */ - server.cronloops++; - } - /* Other cron jobs do not need to be done in a loop. No need to check * server.blocked_last_cron since we have an early exit at the top. */ @@ -2041,7 +2025,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_last_incr_size = 0; server.aof_last_incr_fsync_offset = 0; - server.active_defrag_running = 0; + server.active_defrag_cpu_percent = 0; server.active_defrag_configuration_changed = 0; server.notify_keyspace_events = 0; server.blocked_clients = 0; @@ -2655,8 +2639,6 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0; - server.db[j].defrag_later = listCreate(); - listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which @@ -5610,7 +5592,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "mem_aof_buffer:%zu\r\n", mh->aof_buffer, "mem_allocator:%s\r\n", ZMALLOC_LIB, "mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing, - "active_defrag_running:%d\r\n", server.active_defrag_running, + "active_defrag_running:%d\r\n", server.active_defrag_cpu_percent, "lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(), "lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount())); freeMemoryOverheadData(mh); diff --git a/src/server.h b/src/server.h index 1b8f08833f..421068bc9a 100644 --- a/src/server.h +++ b/src/server.h @@ -950,7 +950,6 @@ typedef struct serverDb { int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ - list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } serverDb; /* forward declaration for functions ctx */ @@ -1669,7 +1668,7 @@ struct valkeyServer { int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */ int shutdown_flags; /* Flags passed to prepareForShutdown(). */ int activerehashing; /* Incremental rehash in serverCron() */ - int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ + int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */ char *pidfile; /* PID file path */ int arch_bits; /* 32 or 64 depending on sizeof(long) */ int cronloops; /* Number of times the cron function run */ @@ -1868,8 +1867,9 @@ struct valkeyServer { size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ - int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ - int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ + int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */ + int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */ + int active_defrag_cycle_us; /* standard duration of defrag cycle */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ @@ -3312,7 +3312,7 @@ void bytesToHuman(char *s, size_t size, unsigned long long n); void enterExecutionUnit(int update_cached_time, long long us); void exitExecutionUnit(void); void resetServerStats(void); -void activeDefragCycle(void); +void monitorActiveDefrag(void); unsigned int getLRUClock(void); unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index d5a6a6efe2..745f59f47f 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -89,6 +89,8 @@ run_solo {defrag} { r config set active-defrag-cycle-min 65 r config set active-defrag-cycle-max 75 + after 1000 ;# Give defrag time to work (might be multiple cycles) + # Wait for the active defrag to stop working. wait_for_condition 2000 100 { [s active_defrag_running] eq 0 @@ -138,12 +140,13 @@ run_solo {defrag} { r config resetstat r config set key-load-delay -25 ;# sleep on average 1/25 usec r debug loadaof + after 1000 ;# give defrag a chance to work before turning it off r config set activedefrag no + # measure hits and misses right after aof loading set misses [s active_defrag_misses] set hits [s active_defrag_hits] - after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] set max_latency 0 foreach event [r latency latest] { @@ -203,7 +206,7 @@ run_solo {defrag} { $rd read ; # Discard script load replies $rd read ; # Discard set replies } - after 120 ;# serverCron only updates the info once in 100ms + after 1000 ;# give defrag some time to work if {$::verbose} { puts "used [s allocator_allocated]" puts "rss [s allocator_active]" @@ -239,6 +242,8 @@ run_solo {defrag} { fail "defrag not started." } + after 1000 ;# Give defrag time to work (might be multiple cycles) + # wait for the active defrag to stop working wait_for_condition 500 100 { [s active_defrag_running] eq 0 @@ -361,6 +366,8 @@ run_solo {defrag} { fail "defrag not started." } + after 1000 ;# Give defrag some time to work (it may run several cycles) + # wait for the active defrag to stop working wait_for_condition 500 100 { [s active_defrag_running] eq 0 @@ -430,7 +437,6 @@ run_solo {defrag} { $rd read ; # Discard set replies } - after 120 ;# serverCron only updates the info once in 100ms if {$::verbose} { puts "used [s allocator_allocated]" puts "rss [s allocator_active]" @@ -466,6 +472,8 @@ run_solo {defrag} { fail "defrag not started." } + after 1000 ;# Give defrag some time to work (it may run several cycles) + # wait for the active defrag to stop working wait_for_condition 500 100 { [s active_defrag_running] eq 0 @@ -475,6 +483,7 @@ run_solo {defrag} { puts [r memory malloc-stats] fail "defrag didn't stop." } + r config set activedefrag no ;# disable before we accidentally create more frag # test the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms @@ -561,6 +570,8 @@ run_solo {defrag} { fail "defrag not started." } + after 1000 ;# Give defrag some time to work (it may run several cycles) + # wait for the active defrag to stop working wait_for_condition 500 100 { [s active_defrag_running] eq 0 @@ -685,6 +696,8 @@ run_solo {defrag} { fail "defrag not started." } + after 1000 ;# Give defrag some time to work (it may run several cycles) + # wait for the active defrag to stop working wait_for_condition 500 100 { [s active_defrag_running] eq 0 diff --git a/valkey.conf b/valkey.conf index f9d102a95d..6935013057 100644 --- a/valkey.conf +++ b/valkey.conf @@ -2300,9 +2300,8 @@ rdb-save-incremental-fsync yes # Fragmentation is a natural process that happens with every allocator (but # less so with Jemalloc, fortunately) and certain workloads. Normally a server # restart is needed in order to lower the fragmentation, or at least to flush -# away all the data and create it again. However thanks to this feature -# implemented by Oran Agra, this process can happen at runtime -# in a "hot" way, while the server is running. +# away all the data and create it again. However thanks to this feature, this +# process can happen at runtime in a "hot" way, while the server is running. # # Basically when the fragmentation is over a certain level (see the # configuration options below) the server will start to create new copies of the @@ -2341,17 +2340,24 @@ rdb-save-incremental-fsync yes # active-defrag-threshold-upper 100 # Minimal effort for defrag in CPU percentage, to be used when the lower -# threshold is reached +# threshold is reached. +# Note: this is not actually a cycle time, but is an overall CPU percentage # active-defrag-cycle-min 1 # Maximal effort for defrag in CPU percentage, to be used when the upper -# threshold is reached +# threshold is reached. +# Note: this is not actually a cycle time, but is an overall CPU percentage # active-defrag-cycle-max 25 # Maximum number of set/hash/zset/list fields that will be processed from # the main dictionary scan # active-defrag-max-scan-fields 1000 +# The time spent (in microseconds) of the periodic active defrag process. This +# affects the latency impact of active defrag on client commands. Smaller numbers +# will result in less latency impact at the cost of increased defrag overhead. +# active-defrag-cycle-us 500 + # Jemalloc background thread for purging will be enabled by default jemalloc-bg-thread yes