From 32f2c73cb5c93516ac5abc5259023caf75326b6a Mon Sep 17 00:00:00 2001
From: Jim Brunner
Date: Thu, 12 Dec 2024 14:55:57 -0800
Subject: [PATCH] defrag: eliminate persistent kvstore pointer and edge case
 fixes (#1430)

This update addresses several issues in defrag:

1. In the defrag redesign (https://github.com/valkey-io/valkey/pull/1242),
   a bug was introduced where `server.cronloops` was no longer being
   incremented in `whileBlockedCron()`. This resulted in some memory
   statistics not being updated while blocked.
2. In the test case for AOF loading, we were seeing errors due to defrag
   latencies. However, running the math, the latencies are justified given
   the extremely high CPU target of the test case. Adjusted the expected
   latency check to allow longer latencies for this case, where defrag is
   starved while AOF loading is in progress.
3. A "stage" is passed a "target". For the main dictionary and expires, we
   were passing in a `kvstore*`. However, on flushall or swapdb, the
   pointer may change. It's safer and more stable to use an index for the
   DB (a DBID). Then if the pointer changes, we can detect the change and
   simply abort the stage. (If there's still fragmentation to deal with,
   we'll pick it up again on the next cycle.)
4. We always start a new stage on a new defrag cycle. This gives the new
   stage time to run and prevents latency issues for certain stages which
   don't operate incrementally. However, often several stages will require
   almost no work, and this will leave a chunk of our CPU allotment unused.
   This is mainly an issue in starvation situations (like AOF loading or a
   long-running Lua script), where defrag runs infrequently with a large
   duty cycle. This change allows a new stage to be initiated if we still
   have a standard duty cycle remaining. (This can happen during starvation
   situations where the planned duty cycle is larger than the standard
   cycle. Most likely this isn't a concern for real scenarios, but it was
   observed in testing.)
5. Minor comment correction in `server.h`.

Signed-off-by: Jim Brunner
---
 src/defrag.c                 | 67 ++++++++++++++++++++++--------------
 src/server.c                 |  6 ++++
 src/server.h                 |  3 +-
 tests/unit/memefficiency.tcl |  8 +++--
 4 files changed, 53 insertions(+), 31 deletions(-)

diff --git a/src/defrag.c b/src/defrag.c
index be7ff07510..8c1ad29de2 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -121,7 +121,7 @@ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdat
 // Private data for main dictionary keys
 typedef struct {
     kvstoreIterState kvstate;
-    serverDb *db;
+    int dbid;
 } defragKeysCtx;
 static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
 
@@ -736,7 +736,7 @@ static void defragModule(serverDb *db, robj *obj) {
 /* for each key we scan in the main dict, this function will attempt to defrag
  * all the various pointers it has. */
 static void defragKey(defragKeysCtx *ctx, robj **elemref) {
-    serverDb *db = ctx->db;
+    serverDb *db = &server.db[ctx->dbid];
     int slot = ctx->kvstate.slot;
     robj *newob, *ob;
     unsigned char *newzl;
@@ -920,7 +920,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) {
         robj *ob = found;
 
         long long key_defragged = server.stat_active_defrag_hits;
-        bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->db->id) == 1);
+        bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1);
         if (key_defragged != server.stat_active_defrag_hits) {
             server.stat_active_defrag_key_hits++;
         } else {
@@ -963,7 +963,10 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
         state.cursor = 0;
         return DEFRAG_NOT_DONE;
     }
-    serverAssert(kvs == state.kvs); // Shouldn't change during the stage
+    if (kvs != state.kvs) {
+        // There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage.
+        return DEFRAG_DONE;
+    }
 
     unsigned int iterations = 0;
     unsigned long long prev_defragged = server.stat_active_defrag_hits;
@@ -1013,26 +1016,30 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
 }
 
 
-// Note: target is a DB, (not a KVS like most stages)
+// Target is a DBID
 static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
     UNUSED(privdata);
-    serverDb *db = (serverDb *)target;
+    int dbid = (uintptr_t)target;
+    serverDb *db = &server.db[dbid];
 
     static defragKeysCtx ctx; // STATIC - this persists
     if (endtime == 0) {
-        ctx.db = db;
+        ctx.dbid = dbid;
         // Don't return yet. Call the helper with endtime==0 below.
     }
-    serverAssert(ctx.db == db);
+    serverAssert(ctx.dbid == dbid);
 
     return defragStageKvstoreHelper(endtime, db->keys,
                                     dbKeysScanCallback, defragLaterStep, &ctx);
 }
 
 
+// Target is a DBID
 static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
     UNUSED(privdata);
-    return defragStageKvstoreHelper(endtime, (kvstore *)target,
+    int dbid = (uintptr_t)target;
+    serverDb *db = &server.db[dbid];
+    return defragStageKvstoreHelper(endtime, db->expires,
                                     scanHashtableCallbackCountScanned, NULL, NULL);
 }
 
@@ -1226,29 +1233,38 @@ static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long i
     }
 
     monotime starttime = getMonotonicUs();
-    monotime endtime = starttime + computeDefragCycleUs();
+    int dutyCycleUs = computeDefragCycleUs();
+    monotime endtime = starttime + dutyCycleUs;
+    bool haveMoreWork = true;
 
     mstime_t latency;
     latencyStartMonitor(latency);
 
-    if (!defrag.current_stage) {
-        defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
-        listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
-        // Initialize the stage with endtime==0
-        doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
-        serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
-    }
+    do {
+        if (!defrag.current_stage) {
+            defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
+            listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
+            // Initialize the stage with endtime==0
+            doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
+            serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
+        }
 
-    doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
-    if (status == DEFRAG_DONE) {
-        zfree(defrag.current_stage);
-        defrag.current_stage = NULL;
-    }
+        doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
+        if (status == DEFRAG_DONE) {
+            zfree(defrag.current_stage);
+            defrag.current_stage = NULL;
+        }
+
+        haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+        /* If we've completed a stage early, and still have a standard time allotment remaining,
+         * we'll start another stage. This can happen when defrag is running infrequently, and
+         * starvation protection has increased the duty-cycle. */
+    } while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us);
 
     latencyEndMonitor(latency);
     latencyAddSampleIfNeeded("active-defrag-cycle", latency);
 
-    if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) {
+    if (haveMoreWork) {
         return computeDelayMs(endtime);
     } else {
         endDefragCycle(true);
@@ -1287,9 +1303,8 @@ static void beginDefragCycle(void) {
     defrag.remaining_stages = listCreate();
 
     for (int dbid = 0; dbid < server.dbnum; dbid++) {
-        serverDb *db = &server.db[dbid];
-        addDefragStage(defragStageDbKeys, db, NULL);
-        addDefragStage(defragStageExpiresKvstore, db->expires, NULL);
+        addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
+        addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
     }
 
     static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
diff --git a/src/server.c b/src/server.c
index 1e38b5ac69..8e65b1f5cd 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1669,6 +1669,12 @@ void whileBlockedCron(void) {
      * latency monitor if this function is called too often. */
     if (server.blocked_last_cron >= server.mstime) return;
 
+    /* Increment server.cronloops so that run_with_period works. */
+    long hz_ms = 1000 / server.hz;
+    int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; // rounding up
+    server.blocked_last_cron += cronloops * hz_ms;
+    server.cronloops += cronloops;
+
     mstime_t latency;
     latencyStartMonitor(latency);
diff --git a/src/server.h b/src/server.h
index 14a16593b0..e9332233aa 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1900,8 +1900,7 @@ struct valkeyServer {
     int sanitize_dump_payload;    /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
     int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
     int jemalloc_bg_thread;       /* Enable jemalloc background thread */
-    int active_defrag_configuration_changed; /* defrag configuration has been changed and need to reconsider
-                                              * active_defrag_running in computeDefragCycles. */
+    int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
     size_t active_defrag_ignore_bytes;       /* minimum amount of fragmentation waste to start active defrag */
     int active_defrag_threshold_lower;       /* minimum percentage of fragmentation to start active defrag */
     int active_defrag_threshold_upper;       /* maximum percentage of fragmentation at which we use maximum effort */
diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index ce74b7c618..78a68a682d 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -172,10 +172,12 @@ run_solo {defrag} {
                 # make sure the defragger did enough work to keep the fragmentation low during loading.
                 # we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete.
                 assert {$frag < 1.4}
-                # since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands),
-                # it'll still not block the loading for long periods of time.
+                # The AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands).
+                # Even so, defrag can get starved for periods exceeding 100ms. Using 200ms for test stability, and
+                # a 75% CPU requirement (as set above), we should allow up to 600ms latency
+                # (total time = 200ms non-duty + 600ms duty = 800ms, and 75% of 800ms is 600ms).
                 if {!$::no_latency} {
-                    assert {$max_latency <= 40}
+                    assert {$max_latency <= 600}
                 }
             }
         } ;# Active defrag - AOF loading
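
For reference, the arithmetic behind item 1 can be exercised in isolation. The sketch below is not part of the patch; the helper name (missed_cronloops) and its parameters are stand-ins for illustration, and only the round-up division mirrors what the patch adds to whileBlockedCron():

#include <stdio.h>

/* Count how many cron iterations were missed while the server was blocked,
 * given the cron frequency (hz) and the elapsed time in milliseconds.
 * Adding (hz_ms - 1) before dividing rounds up, so any partial period
 * still credits one iteration - the same (elapsed + hz_ms - 1) / hz_ms
 * computation the patch uses to advance server.cronloops. */
static long missed_cronloops(long long now_ms, long long last_cron_ms, int hz) {
    long hz_ms = 1000 / hz; /* duration of one cron period, in ms */
    return (now_ms - last_cron_ms + (hz_ms - 1)) / hz_ms;
}

int main(void) {
    /* At hz=10 the cron runs every 100ms; being blocked for 250ms
     * credits 3 loops (250/100, rounded up). */
    printf("%ld\n", missed_cronloops(1250, 1000, 10));
    return 0;
}

Rounding up (rather than truncating) matters because run_with_period-style checks key off cronloops; truncation could leave a short blocking interval credited as zero iterations and again skip the periodic work this patch is restoring.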