Skip to content

Commit

Permalink
poll offload to io threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Jul 17, 2024
1 parent bbf53de commit 2753365
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 30 deletions.
119 changes: 93 additions & 26 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>

#include "zmalloc.h"
Expand All @@ -63,6 +62,15 @@
#endif
#endif

#define AE_LOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_lock(&(eventLoop)->poll_mutex) == 0); \
}

#define AE_UNLOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_unlock(&(eventLoop)->poll_mutex) == 0); \
}

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
Expand All @@ -81,7 +89,14 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->flags = 0;
/* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (pthread_mutex_init(&eventLoop->poll_mutex, &attr) != 0) goto err;

if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
Expand Down Expand Up @@ -122,11 +137,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
*
* Otherwise AE_OK is returned and the operation is successful. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
AE_LOCK(eventLoop);
int ret = AE_OK;
int i;

if (setsize == eventLoop->setsize) return AE_OK;
if (eventLoop->maxfd >= setsize) return AE_ERR;
if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR;
if (setsize == eventLoop->setsize) goto done;
if (eventLoop->maxfd >= setsize) goto err;
if (aeApiResize(eventLoop, setsize) == -1) goto err;

eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize);
eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize);
Expand All @@ -135,10 +152,17 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
/* Make sure that if we created new slots, they are initialized with
* an AE_NONE mask. */
for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE;
return AE_OK;
goto done;

err:
ret = AE_ERR;
done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
AE_LOCK(eventLoop);
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
Expand All @@ -152,32 +176,43 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) {
te = next_te;
}
zfree(eventLoop);
AE_UNLOCK(eventLoop);
}

void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
AE_LOCK(eventLoop);
int ret = AE_ERR;

if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
goto done;
}
aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;
if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;
return AE_OK;

ret = AE_OK;

done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (fd >= eventLoop->setsize) return;
AE_LOCK(eventLoop);
if (fd >= eventLoop->setsize) goto done;

aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
if (fe->mask == AE_NONE) goto done;

/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
Expand All @@ -204,6 +239,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
* which is required by evport and epoll */
aeApiDelEvent(eventLoop, fd, mask);
}

done:
AE_UNLOCK(eventLoop);
}

void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
Expand Down Expand Up @@ -345,6 +383,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
return processed;
}

/* This function provides direct access to the aeApiPoll call.
* It is intended to be called from a custom poll function.*/
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
AE_LOCK(eventLoop);

int ret = aeApiPoll(eventLoop, tvp);

AE_UNLOCK(eventLoop);
return ret;
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -377,25 +426,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
if (eventLoop->custompoll != NULL) {
numevents = eventLoop->custompoll(eventLoop);
} else {
/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
Expand Down Expand Up @@ -503,3 +556,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

/* This function allows setting a custom poll procedure to be used by the event loop.
* The custom poll procedure, if set, will be called instead of the default aeApiPoll */
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}

void aeSetPollProtect(aeEventLoop *eventLoop, int protect) {
if (protect) {
eventLoop->flags |= AE_PROTECT_POLL;
} else {
eventLoop->flags &= ~AE_PROTECT_POLL;
}
}
9 changes: 9 additions & 0 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#define __AE_H__

#include "monotonic.h"
#include <time.h>
#include <pthread.h>

#define AE_OK 0
#define AE_ERR -1
Expand All @@ -54,6 +56,7 @@
#define AE_DONT_WAIT (1 << 2)
#define AE_CALL_BEFORE_SLEEP (1 << 3)
#define AE_CALL_AFTER_SLEEP (1 << 4)
#define AE_PROTECT_POLL (1 << 5)

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
Expand All @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -109,6 +113,8 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
pthread_mutex_t poll_mutex;
int flags;
} aeEventLoop;

Expand All @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
void aeSetPollProtect(aeEventLoop *eventLoop, int protect);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
62 changes: 62 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) {
}
}

/* This function performs polling on the given event loop and updates the server's
* IO fired events count and poll state. */
void IOThreadPoll(void *data) {
aeEventLoop *el = (aeEventLoop *)data;
struct timeval tvp = {0, 0};
int num_events = aePoll(el, &tvp);

server.io_ae_fired_events = num_events;
atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release);
}

static void *IOThreadMain(void *myid) {
/* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */
long id = (long)myid;
Expand Down Expand Up @@ -283,6 +294,8 @@ void killIOThreads(void) {
/* Initialize the data structures needed for I/O threads. */
void initIOThreads(void) {
server.active_io_threads_num = 1; /* We start with threads not active. */
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;

/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
Expand Down Expand Up @@ -485,3 +498,52 @@ int tryOffloadFreeObjToIOThreads(robj *obj) {
server.stat_io_freed_objects++;
return C_OK;
}

/* This function retrieves the results of the IO Thread poll.
* returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */
static int getIOThreadPollResults(aeEventLoop *eventLoop) {
int io_state;
io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire);
if (io_state == AE_IO_STATE_POLL) {
/* IO thread is still processing poll events. */
return 0;
}

/* IO thread is done processing poll events. */
serverAssert(io_state == AE_IO_STATE_DONE);
server.stat_poll_processed_by_io_threads++;
server.io_poll_state = AE_IO_STATE_NONE;

/* Remove the custom poll proc. */
aeSetCustomPollProc(eventLoop, NULL);
aeSetPollProtect(eventLoop, 0);
return server.io_ae_fired_events;
}

void trySendPollJobToIOThreads(void) {
if (server.active_io_threads_num <= 1) {
return;
}

/* If there are no pending jobs, let the main thread do the poll-wait by itself. */
if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) {
return;
}

/* If the IO thread is already processing poll events, don't send another job. */
if (server.io_poll_state != AE_IO_STATE_NONE) {
return;
}

/* The poll is sent to the last thread. While a random thread could have been selected,
* the last thread has a slightly better chance of being less loaded compared to other threads,
* As we activate the lowest threads first. */
int tid = server.active_io_threads_num - 1;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */

server.io_poll_state = AE_IO_STATE_POLL;
aeSetCustomPollProc(server.el, getIOThreadPollResults);
aeSetPollProtect(server.el, 1);
IOJobQueue_push(jq, IOThreadPoll, server.el);
}
1 change: 1 addition & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);

#endif /* IO_THREADS_H */
11 changes: 7 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,9 @@ extern int ProcessingEventsWhileBlocked;
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);

/* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */
trySendPollJobToIOThreads();

size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used;

Expand Down Expand Up @@ -1596,10 +1599,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */
connTypeProcessPendingData();

/* If any connection type(typical TLS) still has pending unread data or if there are clients
* with pending IO reads/writes, don't sleep at all. */
int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 ||
listLength(server.clients_pending_io_write) > 0;
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
int dont_sleep = connTypeHasPendingData();

/* Call the Cluster before sleep function. Note that this function
* may change the state of Cluster (from ok to fail or vice versa),
Expand Down Expand Up @@ -2493,6 +2494,7 @@ void resetServerStats(void) {
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_poll_processed_by_io_threads = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
server.stat_client_outbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5704,6 +5706,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
Expand Down
Loading

0 comments on commit 2753365

Please sign in to comment.