Skip to content

Commit

Permalink
poll offload to io threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Jul 9, 2024
1 parent d8468b7 commit a47dbee
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 55 deletions.
48 changes: 31 additions & 17 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>

#include "zmalloc.h"
Expand Down Expand Up @@ -81,6 +80,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
Expand Down Expand Up @@ -345,6 +345,10 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
return processed;
}

int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
return aeApiPoll(eventLoop, tvp);
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -377,25 +381,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
if (eventLoop->custompoll != NULL) {
numevents = eventLoop->custompoll(eventLoop);
} else {
/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
Expand Down Expand Up @@ -503,3 +511,9 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

/*This function allows setting a custom poll procedure to be used by the event loop
* The custom poll procedure, if set, will be called instead of the default aeApiPoll */
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}
13 changes: 10 additions & 3 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#define __AE_H__

#include "monotonic.h"
#include <time.h>

#define AE_OK 0
#define AE_ERR -1
Expand Down Expand Up @@ -69,6 +70,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -109,15 +111,14 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
int flags;
} aeEventLoop;

/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
void *aeGetFileClientData(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop,
Expand All @@ -132,8 +133,14 @@ void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);
#ifndef AE_EXCLUDE_EVENT_MODIFICATION
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
#endif /* AE_EXCLUDE_EVENT_MODIFICATION */

#endif
2 changes: 1 addition & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ static int updateMaxclients(const char **err) {
return 0;
}
if ((unsigned int)aeGetSetSize(server.el) < server.maxclients + CONFIG_FDSET_INCR) {
if (aeResizeSetSize(server.el, server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
if (serverResizeSetSize(server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
*err = "The event loop API is not able to handle the specified number of clients";
return 0;
}
Expand Down
3 changes: 2 additions & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

#define AE_EXCLUDE_EVENT_MODIFICATION
#include "ae.h"
#undef AE_EXCLUDE_EVENT_MODIFICATION

#define CONN_INFO_LEN 32
#define CONN_ADDR_STR_LEN 128 /* Similar to INET6_ADDRSTRLEN, hoping to handle other protocols. */
Expand Down
66 changes: 66 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) {
}
}

/* This function performs polling on the given event loop and updates the server's
* IO fired events count and poll state. */
void IOThreadPoll(void *data) {
aeEventLoop *el = (aeEventLoop *)data;
struct timeval tvp = {0, 0};

pthread_mutex_lock(&server.io_poll_mutex);
int num_events = aePoll(el, &tvp);
pthread_mutex_unlock(&server.io_poll_mutex);

server.io_ae_fired_events = num_events;
atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release);
}

static void *IOThreadMain(void *myid) {
/* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */
long id = (long)myid;
Expand Down Expand Up @@ -284,6 +298,10 @@ void killIOThreads(void) {
void initIOThreads(void) {
server.active_io_threads_num = 1; /* We start with threads not active. */

pthread_mutex_init(&server.io_poll_mutex, NULL);
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;

/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
Expand Down Expand Up @@ -489,3 +507,51 @@ int tryOffloadFreeObjToIOThreads(robj *obj) {
server.stat_io_freed_objects++;
return C_OK;
}

/* This function retrieves the results of the IO Thread poll.
* returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */
static int getIOThreadPollResults(aeEventLoop *eventLoop) {
int io_state;
io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire);
if (io_state == AE_IO_STATE_POLL) {
/* IO thread is still processing poll events. */
return 0;
}

/* IO thread is done processing poll events. */
serverAssert(io_state == AE_IO_STATE_DONE);
server.stat_poll_processed_by_io_threads++;
server.io_poll_state = AE_IO_STATE_NONE;

/* Remove the custom poll proc. */
aeSetCustomPollProc(eventLoop, NULL);

return server.io_ae_fired_events;
}

void trySendPollJobToIOThreads(void) {
if (server.active_io_threads_num <= 1) {
return;
}

/* If there are no pending jobs, let the main thread do the poll-wait by itself. */
if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) {
return;
}

/* If the IO thread is already processing poll events, don't send another job. */
if (server.io_poll_state != AE_IO_STATE_NONE) {
return;
}

/* The poll is sent to the last thread. While a random thread could have been selected,
* the last thread has a slightly better chance of being less loaded compared to other threads,
* As we activate the lowest threads first. */
int tid = server.active_io_threads_num - 1;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */

server.io_poll_state = AE_IO_STATE_POLL;
aeSetCustomPollProc(server.el, getIOThreadPollResults);
IOJobQueue_push(jq, IOThreadPoll, server.el);
}
1 change: 1 addition & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);

#endif /* IO_THREADS_H */
4 changes: 2 additions & 2 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -9337,7 +9337,7 @@ int VM_EventLoopAdd(int fd, int mask, ValkeyModuleEventLoopFunc func, void *user

int aeMask = eventLoopToAeMask(mask);

if (aeCreateFileEvent(server.el, fd, aeMask, aeProc, data) != AE_OK) {
if (serverCreateFileEvent(fd, aeMask, aeProc, data) != AE_OK) {
if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data);
return VALKEYMODULE_ERR;
}
Expand Down Expand Up @@ -9378,7 +9378,7 @@ int VM_EventLoopDel(int fd, int mask) {
/* After deleting the event, if fd does not have any registered event
* anymore, we can free the EventLoopData object. */
EventLoopData *data = aeGetFileClientData(server.el, fd);
aeDeleteFileEvent(server.el, fd, eventLoopToAeMask(mask));
serverDeleteFileEvent(fd, eventLoopToAeMask(mask));
if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data);

errno = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3455,7 +3455,7 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal);
}
if (server.rdb_child_exit_pipe != -1) close(server.rdb_child_exit_pipe);
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE);
close(server.rdb_pipe_read);
server.rdb_child_exit_pipe = -1;
server.rdb_pipe_read = -1;
Expand Down Expand Up @@ -3609,7 +3609,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
server.rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */
if (server.rdb_pipe_numconns_writing == 0) {
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
Expand Down Expand Up @@ -1488,7 +1488,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
if (server.rdb_pipe_bufflen == 0) {
/* EOF - write end was closed. */
int stillUp = 0;
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE);
for (i = 0; i < server.rdb_pipe_numconns; i++) {
connection *conn = server.rdb_pipe_conns[i];
if (!conn) continue;
Expand Down Expand Up @@ -1542,7 +1542,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
}
/* Remove the pipe read handler if at least one write handler was set. */
if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE);
break;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "ae.h"
#include "server.h"
#include "hiredis.h"
#if USE_OPENSSL == 1 /* BUILD_YES */
Expand Down
Loading

0 comments on commit a47dbee

Please sign in to comment.