Skip to content

Commit

Permalink
Refactor of ActiveDefrag to reduce latencies
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Brunner <[email protected]>
  • Loading branch information
JimB123 committed Nov 19, 2024
1 parent a37dee4 commit a943d5e
Show file tree
Hide file tree
Showing 11 changed files with 691 additions and 460 deletions.
2 changes: 1 addition & 1 deletion src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->timeEventNextId = 1;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
Expand Down
5 changes: 3 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3201,10 +3201,11 @@ standardConfig static_configs[] = {
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL),
Expand Down
1,040 changes: 628 additions & 412 deletions src/defrag.c

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand Down Expand Up @@ -1487,7 +1487,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
* where NULL means that no reallocation happened and the old memory is still
* valid. */
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) {
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) {
int htidx0, htidx1;
const dictEntry *de, *next;
unsigned long m0, m1;
Expand Down
2 changes: 1 addition & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

Expand Down
23 changes: 18 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d) return 0;
Expand All @@ -748,14 +748,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
* within dict, it only reallocates the memory used by the dict structure itself using
* the provided allocation function. This feature was added for the active defrag feature.
*
* The 'defragfn' callback is called with a reference to the dict
* that callback can reallocate. */
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
* With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
* to execute. A "cursor" is used to perform the operation iteratively. When first called, a
* cursor value of 0 should be provided. The return value is an updated cursor which should be
* provided on the next iteration. The operation is complete when 0 is returned.
*
* The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = cursor; didx < kvs->num_dicts; didx++) {
dict **d = kvstoreGetDictRef(kvs, didx), *newd;
if (!*d) continue;

listNode *rehashing_node = NULL;
if (listLength(kvs->rehashing) > 0) {
rehashing_node = ((kvstoreDictMetadata *)dictMetadata(*d))->rehashing_node;
}

if ((newd = defragfn(*d))) *d = newd;
if (rehashing_node) listNodeValue(rehashing_node) = *d;
return (didx + 1);
}
return 0;
}

uint64_t kvstoreGetHash(kvstore *kvs, const void *key) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata);
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);
Expand Down
26 changes: 4 additions & 22 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,8 @@ void databasesCron(void) {
}
}

/* Defrag keys gradually. */
activeDefragCycle();
/* Start active defrag cycle or adjust defrag CPU if needed. */
monitorActiveDefrag();

/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
Expand Down Expand Up @@ -1532,22 +1532,6 @@ void whileBlockedCron(void) {
mstime_t latency;
latencyStartMonitor(latency);

/* In some cases we may be called with big intervals, so we may need to do
* extra work here. This is because some of the functions in serverCron rely
* on the fact that it is performed every 10 ms or so. For instance, if
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
* need to call it multiple times. */
long hz_ms = 1000 / server.hz;
while (server.blocked_last_cron < server.mstime) {
/* Defrag keys gradually. */
activeDefragCycle();

server.blocked_last_cron += hz_ms;

/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}

/* Other cron jobs do not need to be done in a loop. No need to check
* server.blocked_last_cron since we have an early exit at the top. */

Expand Down Expand Up @@ -2041,7 +2025,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.active_defrag_running = 0;
server.active_defrag_cpu_percent = 0;
server.active_defrag_configuration_changed = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
Expand Down Expand Up @@ -2655,8 +2639,6 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
Expand Down Expand Up @@ -5610,7 +5592,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_aof_buffer:%zu\r\n", mh->aof_buffer,
"mem_allocator:%s\r\n", ZMALLOC_LIB,
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"active_defrag_running:%d\r\n", server.active_defrag_cpu_percent,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
freeMemoryOverheadData(mh);
Expand Down
10 changes: 5 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,6 @@ typedef struct serverDb {
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} serverDb;

/* forward declaration for functions ctx */
Expand Down Expand Up @@ -1669,7 +1668,7 @@ struct valkeyServer {
int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
Expand Down Expand Up @@ -1868,8 +1867,9 @@ struct valkeyServer {
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cycle_us; /* standard duration of defrag cycle */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
Expand Down Expand Up @@ -3312,7 +3312,7 @@ void bytesToHuman(char *s, size_t size, unsigned long long n);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit(void);
void resetServerStats(void);
void activeDefragCycle(void);
void monitorActiveDefrag(void);
unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
Expand Down
19 changes: 16 additions & 3 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ run_solo {defrag} {
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75

after 1000 ;# Give defrag time to work (might be multiple cycles)

# Wait for the active defrag to stop working.
wait_for_condition 2000 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -138,12 +140,13 @@ run_solo {defrag} {
r config resetstat
r config set key-load-delay -25 ;# sleep on average 1/25 usec
r debug loadaof
after 1000 ;# give defrag a chance to work before turning it off
r config set activedefrag no

# measure hits and misses right after aof loading
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]

after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
Expand Down Expand Up @@ -203,7 +206,7 @@ run_solo {defrag} {
$rd read ; # Discard script load replies
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
after 1000 ;# give defrag some time to work
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -239,6 +242,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag time to work (might be multiple cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -361,6 +366,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -430,7 +437,6 @@ run_solo {defrag} {
$rd read ; # Discard set replies
}

after 120 ;# serverCron only updates the info once in 100ms
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -466,6 +472,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand All @@ -475,6 +483,7 @@ run_solo {defrag} {
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
r config set activedefrag no ;# disable before we accidentally create more frag

# test the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
Expand Down Expand Up @@ -561,6 +570,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -685,6 +696,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down
16 changes: 11 additions & 5 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2300,9 +2300,8 @@ rdb-save-incremental-fsync yes
# Fragmentation is a natural process that happens with every allocator (but
# less so with Jemalloc, fortunately) and certain workloads. Normally a server
# restart is needed in order to lower the fragmentation, or at least to flush
# away all the data and create it again. However thanks to this feature
# implemented by Oran Agra, this process can happen at runtime
# in a "hot" way, while the server is running.
# away all the data and create it again. However thanks to this feature, this
# process can happen at runtime in a "hot" way, while the server is running.
#
# Basically when the fragmentation is over a certain level (see the
# configuration options below) the server will start to create new copies of the
Expand Down Expand Up @@ -2341,17 +2340,24 @@ rdb-save-incremental-fsync yes
# active-defrag-threshold-upper 100

# Minimal effort for defrag in CPU percentage, to be used when the lower
# threshold is reached
# threshold is reached.
# Note: this is not actually a cycle time, but is an overall CPU percentage
# active-defrag-cycle-min 1

# Maximal effort for defrag in CPU percentage, to be used when the upper
# threshold is reached
# threshold is reached.
# Note: this is not actually a cycle time, but is an overall CPU percentage
# active-defrag-cycle-max 25

# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
# active-defrag-max-scan-fields 1000

# The time spent (in microseconds) of the periodic active defrag process. This
# affects the latency impact of active defrag on client commands. Smaller numbers
# will result in less latency impact at the cost of increased defrag overhead.
# active-defrag-cycle-us 500

# Jemalloc background thread for purging will be enabled by default
jemalloc-bg-thread yes

Expand Down

0 comments on commit a943d5e

Please sign in to comment.