Skip to content

Commit

Permalink
Free offload to IO threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Jul 10, 2024
1 parent 385e552 commit 9f49742
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "latency.h"
#include "script.h"
#include "functions.h"
#include "io_threads.h"

#include <signal.h>
#include <ctype.h>
Expand Down Expand Up @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
} else if (server.lazyfree_lazy_server_del) {
freeObjAsync(key, old, db->id);
} else {
decrRefCount(old);
Expand Down
102 changes: 102 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ int inMainThread(void) {
return thread_id == 0;
}

int getIOThreadID(void) {
return thread_id;
}

/* Drains the I/O threads queue by waiting for all jobs to be processed.
* This function must be called from the main thread. */
void drainIOThreadsQueue(void) {
Expand Down Expand Up @@ -387,3 +391,101 @@ int trySendWriteToIOThreads(client *c) {
IOJobQueue_push(jq, ioThreadWriteToClient, c);
return C_OK;
}

/* Internal function to free the client's argv in an IO thread. */
void IOThreadFreeArgv(void *data) {
robj **argv = (robj **)data;

for (int i = 0;; i++) {
robj *o = argv[i];
if (o == NULL) {
continue;
}

int allocator_id = o->allocator_id;
decrRefCount(o);

/* The allocator_id is set to 0 to indicate that this is the last argument to free */
if (allocator_id == 0) {
break;
}
}

zfree(argv);
}

/* This function attempts to offload the client's argv to an IO thread.
* Returns C_OK if the client's argv were successfully offloaded to an IO thread,
* C_ERR otherwise. */
int tryOffloadFreeArgvToIOThreads(client *c) {
if (server.active_io_threads_num <= 1 || c->argc == 0) {
return C_ERR;
}

size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;

IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}

int last_arg_to_free = -1;

/* Prepare the argv */
for (int j = 0; j < c->argc; j++) {
if (c->argv[j]->refcount > 1 || tid != c->argv[j]->allocator_id) {
decrRefCount(c->argv[j]);
/* Set argv[j] to NULL to avoid double free */
c->argv[j] = NULL;
} else {
last_arg_to_free = j;
}
}

/* If no argv to free, free the argv array at the main thread */
if (last_arg_to_free == -1) {
zfree(c->argv);
return C_OK;
}

/* We set the allocator_id of the last arg to free to 0 to indicate that
* this is the last argument to free. With this approach, we don't need to
* send the argc to the IO thread and we can send just the argv ptr. */
c->argv[last_arg_to_free]->allocator_id = 0;

/* Must succeed as we checked the free space before. */
IOJobQueue_push(jq, IOThreadFreeArgv, c->argv);

return C_OK;
}

/* This function attempts to offload the free of an object to an IO thread.
* Returns C_OK if the object was successfully offloaded to an IO thread,
* C_ERR otherwise.*/
int tryOffloadFreeObjToIOThreads(robj *obj) {
if (server.active_io_threads_num <= 1) {
return C_ERR;
}
/* the object was not allocated by the IO threads */
if (obj->allocator_id == 0) return C_ERR;

if (obj->refcount > 1) return C_ERR;

int tid = obj->allocator_id;
/* If the thread that allocated the object is no longer active,
* reassign the object to a different thread. This is necessary
* because we dynamically adjust the number of active threads
* based on I/O load.*/
if (tid >= server.active_io_threads_num) {
tid = (tid % (server.active_io_threads_num - 1)) + 1;
}

IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}

IOJobQueue_push(jq, decrRefCountVoid, obj);
server.stat_io_freed_objects++;
return C_OK;
}
3 changes: 3 additions & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
void initIOThreads(void);
void killIOThreads(void);
int inMainThread(void);
int getIOThreadID(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);

Expand Down
10 changes: 7 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1424,14 +1424,15 @@ void freeClientOriginalArgv(client *c) {
}

void freeClientArgv(client *c) {
int j;
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) {
for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
}
c->argc = 0;
c->cmd = NULL;
c->io_parsed_cmd = NULL;
c->argv_len_sum = 0;
c->argv_len = 0;
zfree(c->argv);
c->argv = NULL;
}

Expand Down Expand Up @@ -4646,6 +4647,9 @@ void ioThreadReadQueryFromClient(void *data) {
goto done;
}

/* set the allocator ID for the objects */
for (int i = 0; i < c->argc; i++) c->argv[i]->allocator_id = getIOThreadID();

/* Lookup command offload */
c->io_parsed_cmd = lookupCommand(c->argv, c->argc);
if (c->io_parsed_cmd && commandCheckArity(c->io_parsed_cmd, c->argc, NULL) == 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ robj *createObject(int type, void *ptr) {
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
o->allocator_id = 0;
o->lru = 0;
return o;
}
Expand Down Expand Up @@ -96,6 +97,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh + 1;
o->refcount = 1;
o->allocator_id = 0;
o->lru = 0;

sh->len = len;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2492,6 +2492,7 @@ void resetServerStats(void) {
server.stat_io_reads_processed = 0;
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
server.stat_client_outbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5702,6 +5703,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"total_writes_processed:%lld\r\n", server.stat_total_writes_processed,
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
Expand Down
9 changes: 6 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -853,16 +853,18 @@ struct ValkeyModuleDigest {
#define LRU_CLOCK_MAX ((1 << LRU_BITS) - 1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */

#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX - 1) /* Object allocated in the stack. */
#define UINT28_MAX 0x0FFFFFFF
#define OBJ_SHARED_REFCOUNT UINT28_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (UINT28_MAX - 1) /* Object allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
struct serverObject {
unsigned type : 4;
unsigned encoding : 4;
unsigned lru : LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
unsigned refcount : 28;
unsigned allocator_id : 4;
void *ptr;
};

Expand Down Expand Up @@ -1751,6 +1753,7 @@ struct valkeyServer {
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
Expand Down

0 comments on commit 9f49742

Please sign in to comment.