Skip to content

Commit

Permalink
defrag: eliminate persistent kvstore pointer and edge case fixes (valkey-io#1430)
Browse files Browse the repository at this point in the history

This update addresses several issues in defrag:
1. In the defrag redesign
(valkey-io#1242), a bug was introduced
where `server.cronloops` was no longer being incremented in the
`whileBlockedCron()`. This resulted in some memory statistics not being
updated while blocked.
2. In the test case for AOF loading, we were seeing errors due to defrag
latencies. However, running the math, the latencies are justified given
the extremely high CPU target of the testcase. Adjusted the expected
latency check to allow longer latencies for this case where defrag is
undergoing starvation while AOF loading is in progress.
3. A "stage" is passed a "target". For the main dictionary and expires,
we were passing in a `kvstore*`. However, on flushall or swapdb, the
pointer may change. It's safer and more stable to use an index for the
DB (a DBID). Then if the pointer changes, we can detect the change, and
simply abort the stage. (If there's still fragmentation to deal with,
we'll pick it up again on the next cycle.)
4. We always start a new stage on a new defrag cycle. This gives the new
stage time to run, and prevents latency issues for certain stages which
don't operate incrementally. However, often several stages will require
almost no work, and this will leave a chunk of our CPU allotment unused.
This is mainly an issue in starvation situations (like AOF loading or
LUA script) - where defrag is running infrequently, with a large
duty-cycle. This change allows a new stage to be initiated if we still
have a standard duty-cycle remaining. (This can happen during starvation
situations where the planned duty cycle is larger than the standard
cycle. Most likely this isn't a concern for real scenarios, but it was
observed in testing.)
5. Minor comment correction in `server.h`

Signed-off-by: Jim Brunner <[email protected]>
  • Loading branch information
JimB123 authored Dec 12, 2024
1 parent 3a1043a commit 32f2c73
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 31 deletions.
67 changes: 41 additions & 26 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdat
// Private data for main dictionary keys
typedef struct {
    kvstoreIterState kvstate; // Must be first member (see static_assert below).
    int dbid;                 // DB index. An index stays valid across FLUSHALL/SWAPDB,
                              // unlike a cached serverDb/kvstore pointer.
} defragKeysCtx;
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

Expand Down Expand Up @@ -736,7 +736,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = ctx->db;
serverDb *db = &server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -920,7 +920,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) {
robj *ob = found;

long long key_defragged = server.stat_active_defrag_hits;
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->db->id) == 1);
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1);
if (key_defragged != server.stat_active_defrag_hits) {
server.stat_active_defrag_key_hits++;
} else {
Expand Down Expand Up @@ -963,7 +963,10 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
state.cursor = 0;
return DEFRAG_NOT_DONE;
}
serverAssert(kvs == state.kvs); // Shouldn't change during the stage
if (kvs != state.kvs) {
// There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage.
return DEFRAG_DONE;
}

unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
Expand Down Expand Up @@ -1013,26 +1016,30 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
}


// Target is a DBID
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
    UNUSED(privdata);
    // The stage target is a DB index (DBID) rather than a pointer. On FLUSHALL or
    // SWAPDB the underlying kvstore pointer can change; resolving the index on each
    // call lets the kvstore helper detect the change and abort the stage safely.
    int dbid = (uintptr_t)target;
    serverDb *db = &server.db[dbid];

    static defragKeysCtx ctx; // STATIC - this persists
    if (endtime == 0) {
        ctx.dbid = dbid;
        // Don't return yet. Call the helper with endtime==0 below.
    }
    serverAssert(ctx.dbid == dbid);

    return defragStageKvstoreHelper(endtime, db->keys,
                                    dbKeysScanCallback, defragLaterStep, &ctx);
}


// Target is a DBID
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
    UNUSED(privdata);
    // Resolve the DBID to the current expires kvstore on every call; the pointer
    // itself is not cached because it may change (e.g. FLUSHALL/SWAPDB).
    int dbid = (uintptr_t)target;
    serverDb *db = &server.db[dbid];
    return defragStageKvstoreHelper(endtime, db->expires,
                                    scanHashtableCallbackCountScanned, NULL, NULL);
}

Expand Down Expand Up @@ -1226,29 +1233,38 @@ static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long i
}

monotime starttime = getMonotonicUs();
monotime endtime = starttime + computeDefragCycleUs();
int dutyCycleUs = computeDefragCycleUs();
monotime endtime = starttime + dutyCycleUs;
bool haveMoreWork = true;

mstime_t latency;
latencyStartMonitor(latency);

if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}
do {
if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}

doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}
doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}

haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
/* If we've completed a stage early, and still have a standard time allotment remaining,
* we'll start another stage. This can happen when defrag is running infrequently, and
* starvation protection has increased the duty-cycle. */
} while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us);

latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle", latency);

if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) {
if (haveMoreWork) {
return computeDelayMs(endtime);
} else {
endDefragCycle(true);
Expand Down Expand Up @@ -1287,9 +1303,8 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
serverDb *db = &server.db[dbid];
addDefragStage(defragStageDbKeys, db, NULL);
addDefragStage(defragStageExpiresKvstore, db->expires, NULL);
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}

static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
Expand Down
6 changes: 6 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,12 @@ void whileBlockedCron(void) {
* latency monitor if this function is called too often. */
if (server.blocked_last_cron >= server.mstime) return;

/* Increment server.cronloops so that run_with_period works. */
long hz_ms = 1000 / server.hz;
int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; // rounding up
server.blocked_last_cron += cronloops * hz_ms;
server.cronloops += cronloops;

mstime_t latency;
latencyStartMonitor(latency);

Expand Down
3 changes: 1 addition & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1900,8 +1900,7 @@ struct valkeyServer {
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
int jemalloc_bg_thread; /* Enable jemalloc background thread */
int active_defrag_configuration_changed; /* defrag configuration has been changed and need to reconsider
* active_defrag_running in computeDefragCycles. */
int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ run_solo {defrag} {
# make sure the defragger did enough work to keep the fragmentation low during loading.
# we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete.
assert {$frag < 1.4}
# since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands),
# it'll still not block the loading for long periods of time.
# The AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands).
# Even so, defrag can get starved for periods exceeding 100ms. Using 200ms for test stability, and
# a 75% CPU requirement (as set above), we should allow up to 600ms latency
# (as total time = 200 non duty + 600 duty = 800ms, and 75% of 800ms is 600ms).
if {!$::no_latency} {
assert {$max_latency <= 40}
assert {$max_latency <= 600}
}
}
} ;# Active defrag - AOF loading
Expand Down

0 comments on commit 32f2c73

Please sign in to comment.