From 9f49742336badf8b02a7d37ca09ead8edd031d3c Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 2 Jul 2024 07:52:32 +0000 Subject: [PATCH] Free offload to IO threads Signed-off-by: Uri Yagelnik --- src/db.c | 6 ++- src/io_threads.c | 102 +++++++++++++++++++++++++++++++++++++++++++++++ src/io_threads.h | 3 ++ src/networking.c | 10 +++-- src/object.c | 2 + src/server.c | 2 + src/server.h | 9 +++-- 7 files changed, 127 insertions(+), 7 deletions(-) diff --git a/src/db.c b/src/db.c index 5a6562a1e2..073bec6ca3 100644 --- a/src/db.c +++ b/src/db.c @@ -32,6 +32,7 @@ #include "latency.h" #include "script.h" #include "functions.h" +#include "io_threads.h" #include #include @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); - if (server.lazyfree_lazy_server_del) { + /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ + if (tryOffloadFreeObjToIOThreads(old) == C_OK) { + /* OK */ + } else if (server.lazyfree_lazy_server_del) { freeObjAsync(key, old, db->id); } else { decrRefCount(old); diff --git a/src/io_threads.c b/src/io_threads.c index 89e2686cd9..2547b803fb 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -124,6 +124,10 @@ int inMainThread(void) { return thread_id == 0; } +int getIOThreadID(void) { + return thread_id; +} + /* Drains the I/O threads queue by waiting for all jobs to be processed. * This function must be called from the main thread. */ void drainIOThreadsQueue(void) { @@ -387,3 +391,101 @@ int trySendWriteToIOThreads(client *c) { IOJobQueue_push(jq, ioThreadWriteToClient, c); return C_OK; } + +/* Internal function to free the client's argv in an IO thread. */ +void IOThreadFreeArgv(void *data) { + robj **argv = (robj **)data; + + for (int i = 0;; i++) { + robj *o = argv[i]; + if (o == NULL) { + continue; + } + + int allocator_id = o->allocator_id; + decrRefCount(o); + + /* The allocator_id is set to 0 to indicate that this is the last argument to free */ + if (allocator_id == 0) { + break; + } + } + + zfree(argv); +} + +/* This function attempts to offload the client's argv to an IO thread. + * Returns C_OK if the client's argv were successfully offloaded to an IO thread, + * C_ERR otherwise. */ +int tryOffloadFreeArgvToIOThreads(client *c) { + if (server.active_io_threads_num <= 1 || c->argc == 0) { + return C_ERR; + } + + size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + int last_arg_to_free = -1; + + /* Prepare the argv */ + for (int j = 0; j < c->argc; j++) { + if (c->argv[j]->refcount > 1 || tid != c->argv[j]->allocator_id) { + decrRefCount(c->argv[j]); + /* Set argv[j] to NULL to avoid double free */ + c->argv[j] = NULL; + } else { + last_arg_to_free = j; + } + } + + /* If no argv to free, free the argv array at the main thread */ + if (last_arg_to_free == -1) { + zfree(c->argv); + return C_OK; + } + + /* We set the allocator_id of the last arg to free to 0 to indicate that + * this is the last argument to free. With this approach, we don't need to + * send the argc to the IO thread and we can send just the argv ptr. */ + c->argv[last_arg_to_free]->allocator_id = 0; + + /* Must succeed as we checked the free space before. */ + IOJobQueue_push(jq, IOThreadFreeArgv, c->argv); + + return C_OK; +} + +/* This function attempts to offload the free of an object to an IO thread. + * Returns C_OK if the object was successfully offloaded to an IO thread, + * C_ERR otherwise.*/ +int tryOffloadFreeObjToIOThreads(robj *obj) { + if (server.active_io_threads_num <= 1) { + return C_ERR; + } + /* the object was not allocated by the IO threads */ + if (obj->allocator_id == 0) return C_ERR; + + if (obj->refcount > 1) return C_ERR; + + int tid = obj->allocator_id; + /* If the thread that allocated the object is no longer active, + * reassign the object to a different thread. This is necessary + * because we dynamically adjust the number of active threads + * based on I/O load.*/ + if (tid >= server.active_io_threads_num) { + tid = (tid % (server.active_io_threads_num - 1)) + 1; + } + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + IOJobQueue_push(jq, decrRefCountVoid, obj); + server.stat_io_freed_objects++; + return C_OK; +} diff --git a/src/io_threads.h b/src/io_threads.h index 9fb23c190e..238222ee20 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -6,8 +6,11 @@ void initIOThreads(void); void killIOThreads(void); int inMainThread(void); +int getIOThreadID(void); int trySendReadToIOThreads(client *c); int trySendWriteToIOThreads(client *c); +int tryOffloadFreeObjToIOThreads(robj *o); +int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); diff --git a/src/networking.c b/src/networking.c index a8cdd4e4cf..6fa330649a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1424,14 +1424,15 @@ void freeClientOriginalArgv(client *c) { } void freeClientArgv(client *c) { - int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) { + for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + } c->argc = 0; c->cmd = NULL; c->io_parsed_cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; - zfree(c->argv); c->argv = NULL; } @@ -4646,6 +4647,9 @@ void ioThreadReadQueryFromClient(void *data) { goto done; } + /* set the allocator ID for the objects */ + for (int i = 0; i < c->argc; i++) c->argv[i]->allocator_id = getIOThreadID(); + /* Lookup command offload */ c->io_parsed_cmd = lookupCommand(c->argv, c->argc); if (c->io_parsed_cmd && commandCheckArity(c->io_parsed_cmd, c->argc, NULL) == 0) { diff --git a/src/object.c b/src/object.c index 6e5d1f460b..521557fb0c 100644 --- a/src/object.c +++ b/src/object.c @@ -46,6 +46,7 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->ptr = ptr; o->refcount = 1; + o->allocator_id = 0; o->lru = 0; return o; } @@ -96,6 +97,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->encoding = OBJ_ENCODING_EMBSTR; o->ptr = sh + 1; o->refcount = 1; + o->allocator_id = 0; o->lru = 0; sh->len = len; diff --git a/src/server.c b/src/server.c index 61d6022dfa..dfeeeddad9 100644 --- a/src/server.c +++ b/src/server.c @@ -2492,6 +2492,7 @@ void resetServerStats(void) { server.stat_io_reads_processed = 0; server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; + server.stat_io_freed_objects = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -5702,6 +5703,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", server.stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, + "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index a5cf2d08aa..1ce8db495f 100644 --- a/src/server.h +++ b/src/server.h @@ -853,8 +853,9 @@ struct ValkeyModuleDigest { #define LRU_CLOCK_MAX ((1 << LRU_BITS) - 1) /* Max value of obj->lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ -#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */ -#define OBJ_STATIC_REFCOUNT (INT_MAX - 1) /* Object allocated in the stack. */ +#define UINT28_MAX 0x0FFFFFFF +#define OBJ_SHARED_REFCOUNT UINT28_MAX /* Global object never destroyed. */ +#define OBJ_STATIC_REFCOUNT (UINT28_MAX - 1) /* Object allocated in the stack. */ #define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT struct serverObject { unsigned type : 4; @@ -862,7 +863,8 @@ struct serverObject { unsigned lru : LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ - int refcount; + unsigned refcount : 28; + unsigned allocator_id : 4; void *ptr; }; @@ -1751,6 +1753,7 @@ struct valkeyServer { long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ + long long stat_io_freed_objects; /* Number of objects freed by IO threads */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */