From 5d11ff8e0cdda94036f13df0f458389aad45e68d Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Mon, 6 May 2024 07:37:58 +0000 Subject: [PATCH] poll offload to io threads Signed-off-by: Uri Yagelnik --- src/ae.c | 51 +++++++++++------ src/ae.h | 136 +------------------------------------------ src/ae_apidata.h | 20 +++++++ src/ae_eventloop.h | 140 +++++++++++++++++++++++++++++++++++++++++++++ src/config.c | 2 +- src/connection.h | 5 +- src/io_threads.c | 61 ++++++++++++++++++++ src/io_threads.h | 1 + src/module.c | 4 +- src/rdb.c | 4 +- src/replication.c | 6 +- src/sentinel.c | 1 + src/server.c | 73 ++++++++++++++++++++--- src/server.h | 12 +++- src/socket.c | 14 ++--- src/tls.c | 18 +++--- 16 files changed, 360 insertions(+), 188 deletions(-) create mode 100644 src/ae_apidata.h create mode 100644 src/ae_eventloop.h diff --git a/src/ae.c b/src/ae.c index b6a1ce0b10..34a754eff1 100644 --- a/src/ae.c +++ b/src/ae.c @@ -41,7 +41,6 @@ #include #include #include -#include #include #include "zmalloc.h" @@ -81,6 +80,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->custompoll = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the @@ -345,6 +345,13 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +/* This function provides direct access to the aeApiPoll call. + * It is intended to be called from a custom poll function, + * currently invoked from IOThreadPoll.*/ +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { + return aeApiPoll(eventLoop, tvp); +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -377,25 +384,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); - /* The eventLoop->flags may be changed inside beforesleep. - * So we should check it after beforesleep be called. At the same time, - * the parameter flags always should have the highest priority. - * That is to say, once the parameter flag is set to AE_DONT_WAIT, - * no matter what value eventLoop->flags is set to, we should ignore it. */ - if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else if (flags & AE_TIME_EVENTS) { - usUntilTimer = usUntilEarliestTimer(eventLoop); - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; + if (eventLoop->custompoll != NULL) { + numevents = eventLoop->custompoll(eventLoop); + } else { + /* The eventLoop->flags may be changed inside beforesleep. + * So we should check it after beforesleep be called. At the same time, + * the parameter flags always should have the highest priority. + * That is to say, once the parameter flag is set to AE_DONT_WAIT, + * no matter what value eventLoop->flags is set to, we should ignore it. */ + if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { + tv.tv_sec = tv.tv_usec = 0; tvp = &tv; + } else if (flags & AE_TIME_EVENTS) { + usUntilTimer = usUntilEarliestTimer(eventLoop); + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); } - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); /* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { @@ -503,3 +514,9 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) { eventLoop->aftersleep = aftersleep; } + +/* This function allows setting a custom poll procedure to be used by the event loop. + * The custom poll procedure, if set, will be called instead of the default aeApiPoll */ +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) { + eventLoop->custompoll = custompoll; +} diff --git a/src/ae.h b/src/ae.h index 3b1c96a01d..4515b94a4f 100644 --- a/src/ae.h +++ b/src/ae.h @@ -1,139 +1,7 @@ -/* A simple event-driven programming library. Originally I wrote this code - * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated - * it in form of a library for easy reuse. - * - * Copyright (c) 2006-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - #ifndef __AE_H__ #define __AE_H__ -#include "monotonic.h" - -#define AE_OK 0 -#define AE_ERR -1 - -#define AE_NONE 0 /* No events registered. */ -#define AE_READABLE 1 /* Fire when descriptor is readable. */ -#define AE_WRITABLE 2 /* Fire when descriptor is writable. */ -#define AE_BARRIER \ - 4 /* With WRITABLE, never fire the event if the \ - READABLE event already fired in the same event \ - loop iteration. Useful when you want to persist \ - things to disk before sending replies, and want \ - to do that in a group fashion. */ - -#define AE_FILE_EVENTS (1 << 0) -#define AE_TIME_EVENTS (1 << 1) -#define AE_ALL_EVENTS (AE_FILE_EVENTS | AE_TIME_EVENTS) -#define AE_DONT_WAIT (1 << 2) -#define AE_CALL_BEFORE_SLEEP (1 << 3) -#define AE_CALL_AFTER_SLEEP (1 << 4) - -#define AE_NOMORE -1 -#define AE_DELETED_EVENT_ID -1 - -/* Macros */ -#define AE_NOTUSED(V) ((void)V) - -struct aeEventLoop; - -/* Types and data structures */ -typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); -typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); -typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); -typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); -typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); - -/* File event structure */ -typedef struct aeFileEvent { - int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ - aeFileProc *rfileProc; - aeFileProc *wfileProc; - void *clientData; -} aeFileEvent; - -/* Time event structure */ -typedef struct aeTimeEvent { - long long id; /* time event identifier. */ - monotime when; - aeTimeProc *timeProc; - aeEventFinalizerProc *finalizerProc; - void *clientData; - struct aeTimeEvent *prev; - struct aeTimeEvent *next; - int refcount; /* refcount to prevent timer events from being - * freed in recursive time event calls. */ -} aeTimeEvent; - -/* A fired event */ -typedef struct aeFiredEvent { - int fd; - int mask; -} aeFiredEvent; - -/* State of an event based program */ -typedef struct aeEventLoop { - int maxfd; /* highest file descriptor currently registered */ - int setsize; /* max number of file descriptors tracked */ - long long timeEventNextId; - aeFileEvent *events; /* Registered events */ - aeFiredEvent *fired; /* Fired events */ - aeTimeEvent *timeEventHead; - int stop; - void *apidata; /* This is used for polling API specific data */ - aeBeforeSleepProc *beforesleep; - aeAfterSleepProc *aftersleep; - int flags; -} aeEventLoop; - -/* Prototypes */ -aeEventLoop *aeCreateEventLoop(int setsize); -void aeDeleteEventLoop(aeEventLoop *eventLoop); -void aeStop(aeEventLoop *eventLoop); -int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); -void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); -int aeGetFileEvents(aeEventLoop *eventLoop, int fd); -void *aeGetFileClientData(aeEventLoop *eventLoop, int fd); -long long aeCreateTimeEvent(aeEventLoop *eventLoop, - long long milliseconds, - aeTimeProc *proc, - void *clientData, - aeEventFinalizerProc *finalizerProc); -int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); -int aeProcessEvents(aeEventLoop *eventLoop, int flags); -int aeWait(int fd, int mask, long long milliseconds); -void aeMain(aeEventLoop *eventLoop); -char *aeGetApiName(void); -void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); -void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); -int aeGetSetSize(aeEventLoop *eventLoop); -int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); -void aeSetDontWait(aeEventLoop *eventLoop, int noWait); +#include "ae_eventloop.h" +#include "ae_apidata.h" #endif diff --git a/src/ae_apidata.h b/src/ae_apidata.h new file mode 100644 index 0000000000..8584b21af9 --- /dev/null +++ b/src/ae_apidata.h @@ -0,0 +1,20 @@ +/* + * ae_apidata.h - Header file for API data functions implemented in ae.c + * + * This header file declares functions that are implemented in ae.c. + * These functions may alter the state of the underlying apidata , + * which may require a mutex for modification in the case of the server + * main event loop (`server.el`), as it may be accessed by IO threads. + */ + +#ifndef AE_APIDATA_H +#define AE_APIDATA_H + +#include "ae_eventloop.h" + +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp); + +#endif /* AE_APIDATA_H */ diff --git a/src/ae_eventloop.h b/src/ae_eventloop.h new file mode 100644 index 0000000000..d29dd47581 --- /dev/null +++ b/src/ae_eventloop.h @@ -0,0 +1,140 @@ +/* A simple event-driven programming library. Originally I wrote this code + * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated + * it in form of a library for easy reuse. + * + * Copyright (c) 2006-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef AE_EVENTLOOP_H +#define AE_EVENTLOOP_H + +#include "monotonic.h" +#include + +#define AE_OK 0 +#define AE_ERR -1 + +#define AE_NONE 0 /* No events registered. */ +#define AE_READABLE 1 /* Fire when descriptor is readable. */ +#define AE_WRITABLE 2 /* Fire when descriptor is writable. */ +#define AE_BARRIER \ + 4 /* With WRITABLE, never fire the event if the \ + READABLE event already fired in the same event \ + loop iteration. Useful when you want to persist \ + things to disk before sending replies, and want \ + to do that in a group fashion. */ + +#define AE_FILE_EVENTS (1 << 0) +#define AE_TIME_EVENTS (1 << 1) +#define AE_ALL_EVENTS (AE_FILE_EVENTS | AE_TIME_EVENTS) +#define AE_DONT_WAIT (1 << 2) +#define AE_CALL_BEFORE_SLEEP (1 << 3) +#define AE_CALL_AFTER_SLEEP (1 << 4) + +#define AE_NOMORE -1 +#define AE_DELETED_EVENT_ID -1 + +/* Macros */ +#define AE_NOTUSED(V) ((void)V) + +struct aeEventLoop; + +/* Types and data structures */ +typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); +typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); +typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); +typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); +typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); +typedef int aeCustomPollProc(struct aeEventLoop *eventLoop); + +/* File event structure */ +typedef struct aeFileEvent { + int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ + aeFileProc *rfileProc; + aeFileProc *wfileProc; + void *clientData; +} aeFileEvent; + +/* Time event structure */ +typedef struct aeTimeEvent { + long long id; /* time event identifier. */ + monotime when; + aeTimeProc *timeProc; + aeEventFinalizerProc *finalizerProc; + void *clientData; + struct aeTimeEvent *prev; + struct aeTimeEvent *next; + int refcount; /* refcount to prevent timer events from being + * freed in recursive time event calls. */ +} aeTimeEvent; + +/* A fired event */ +typedef struct aeFiredEvent { + int fd; + int mask; +} aeFiredEvent; + +/* State of an event based program */ +typedef struct aeEventLoop { + int maxfd; /* highest file descriptor currently registered */ + int setsize; /* max number of file descriptors tracked */ + long long timeEventNextId; + aeFileEvent *events; /* Registered events */ + aeFiredEvent *fired; /* Fired events */ + aeTimeEvent *timeEventHead; + int stop; + void *apidata; /* This is used for polling API specific data */ + aeBeforeSleepProc *beforesleep; + aeAfterSleepProc *aftersleep; + aeCustomPollProc *custompoll; + int flags; +} aeEventLoop; + +/* Prototypes */ +aeEventLoop *aeCreateEventLoop(int setsize); +void aeDeleteEventLoop(aeEventLoop *eventLoop); +void aeStop(aeEventLoop *eventLoop); +int aeGetFileEvents(aeEventLoop *eventLoop, int fd); +void *aeGetFileClientData(aeEventLoop *eventLoop, int fd); +long long aeCreateTimeEvent(aeEventLoop *eventLoop, + long long milliseconds, + aeTimeProc *proc, + void *clientData, + aeEventFinalizerProc *finalizerProc); +int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); +int aeProcessEvents(aeEventLoop *eventLoop, int flags); +int aeWait(int fd, int mask, long long milliseconds); +void aeMain(aeEventLoop *eventLoop); +char *aeGetApiName(void); +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll); +int aeGetSetSize(aeEventLoop *eventLoop); +void aeSetDontWait(aeEventLoop *eventLoop, int noWait); + +#endif diff --git a/src/config.c b/src/config.c index 32e6018ff2..bdf4faeff2 100644 --- a/src/config.c +++ b/src/config.c @@ -2527,7 +2527,7 @@ static int updateMaxclients(const char **err) { return 0; } if ((unsigned int)aeGetSetSize(server.el) < server.maxclients + CONFIG_FDSET_INCR) { - if (aeResizeSetSize(server.el, server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) { + if (serverResizeSetSize(server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) { *err = "The event loop API is not able to handle the specified number of clients"; return 0; } diff --git a/src/connection.h b/src/connection.h index c6466c2d4c..56848cad08 100644 --- a/src/connection.h +++ b/src/connection.h @@ -35,8 +35,9 @@ #include #include #include - -#include "ae.h" +/* We include just the event loop, not the ae.h header, + * to avoid direct access to the server event loop apidata modifications. */ +#include "ae_eventloop.h" #define CONN_INFO_LEN 32 #define CONN_ADDR_STR_LEN 128 /* Similar to INET6_ADDRSTRLEN, hoping to handle other protocols. */ diff --git a/src/io_threads.c b/src/io_threads.c index 2547b803fb..d7d02c398d 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -193,6 +193,15 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) { } } +/* This function performs polling on the given event loop and updates the server's + * IO fired events count and poll state. */ +void IOThreadPoll(void *data) { + UNUSED(data); + struct timeval tvp = {0, 0}; + server.io_ae_fired_events = serverPoll(&tvp); + atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); +} + static void *IOThreadMain(void *myid) { /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */ long id = (long)myid; @@ -284,6 +293,10 @@ void killIOThreads(void) { void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ + pthread_mutex_init(&server.io_poll_mutex, NULL); + server.io_poll_state = AE_IO_STATE_NONE; + server.io_ae_fired_events = 0; + /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; @@ -489,3 +502,51 @@ int tryOffloadFreeObjToIOThreads(robj *obj) { server.stat_io_freed_objects++; return C_OK; } + +/* This function retrieves the results of the IO Thread poll. + * returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */ +static int getIOThreadPollResults(aeEventLoop *eventLoop) { + int io_state; + io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire); + if (io_state == AE_IO_STATE_POLL) { + /* IO thread is still processing poll events. */ + return 0; + } + + /* IO thread is done processing poll events. */ + serverAssert(io_state == AE_IO_STATE_DONE); + server.stat_poll_processed_by_io_threads++; + server.io_poll_state = AE_IO_STATE_NONE; + + /* Remove the custom poll proc. */ + aeSetCustomPollProc(eventLoop, NULL); + + return server.io_ae_fired_events; +} + +void trySendPollJobToIOThreads(void) { + if (server.active_io_threads_num <= 1) { + return; + } + + /* If there are no pending jobs, let the main thread do the poll-wait by itself. */ + if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) { + return; + } + + /* If the IO thread is already processing poll events, don't send another job. */ + if (server.io_poll_state != AE_IO_STATE_NONE) { + return; + } + + /* The poll is sent to the last thread. While a random thread could have been selected, + * the last thread has a slightly better chance of being less loaded compared to other threads, + * As we activate the lowest threads first. */ + int tid = server.active_io_threads_num - 1; + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */ + + server.io_poll_state = AE_IO_STATE_POLL; + aeSetCustomPollProc(server.el, getIOThreadPollResults); + IOJobQueue_push(jq, IOThreadPoll, server.el); +} diff --git a/src/io_threads.h b/src/io_threads.h index 238222ee20..7c0d636fdb 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -13,5 +13,6 @@ int tryOffloadFreeObjToIOThreads(robj *o); int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); +void trySendPollJobToIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/module.c b/src/module.c index 90a18e0a38..0d5b041706 100644 --- a/src/module.c +++ b/src/module.c @@ -9337,7 +9337,7 @@ int VM_EventLoopAdd(int fd, int mask, ValkeyModuleEventLoopFunc func, void *user int aeMask = eventLoopToAeMask(mask); - if (aeCreateFileEvent(server.el, fd, aeMask, aeProc, data) != AE_OK) { + if (serverCreateFileEvent(fd, aeMask, aeProc, data) != AE_OK) { if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data); return VALKEYMODULE_ERR; } @@ -9378,7 +9378,7 @@ int VM_EventLoopDel(int fd, int mask) { /* After deleting the event, if fd does not have any registered event * anymore, we can free the EventLoopData object. */ EventLoopData *data = aeGetFileClientData(server.el, fd); - aeDeleteFileEvent(server.el, fd, eventLoopToAeMask(mask)); + serverDeleteFileEvent(fd, eventLoopToAeMask(mask)); if (aeGetFileEvents(server.el, fd) == AE_NONE) zfree(data); errno = 0; diff --git a/src/rdb.c b/src/rdb.c index f9ccd676fd..0c3a7a9b98 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3455,7 +3455,7 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal); } if (server.rdb_child_exit_pipe != -1) close(server.rdb_child_exit_pipe); - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); close(server.rdb_pipe_read); server.rdb_child_exit_pipe = -1; server.rdb_pipe_read = -1; @@ -3609,7 +3609,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } } diff --git a/src/replication.c b/src/replication.c index 21ccb0e92d..e677f494a0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1431,7 +1431,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { server.rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (server.rdb_pipe_numconns_writing == 0) { - if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } } @@ -1488,7 +1488,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, if (server.rdb_pipe_bufflen == 0) { /* EOF - write end was closed. */ int stillUp = 0; - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); for (i = 0; i < server.rdb_pipe_numconns; i++) { connection *conn = server.rdb_pipe_conns[i]; if (!conn) continue; @@ -1542,7 +1542,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, } /* Remove the pipe read handler if at least one write handler was set. */ if (server.rdb_pipe_numconns_writing || stillAlive == 0) { - aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE); + serverDeleteFileEvent(server.rdb_pipe_read, AE_READABLE); break; } } diff --git a/src/sentinel.c b/src/sentinel.c index a095b8f48a..0d061f17b1 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -28,6 +28,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "ae.h" #include "server.h" #include "hiredis.h" #if USE_OPENSSL == 1 /* BUILD_YES */ diff --git a/src/server.c b/src/server.c index dfeeeddad9..142c609cd3 100644 --- a/src/server.c +++ b/src/server.c @@ -26,7 +26,7 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ - +#include "ae.h" #include "server.h" #include "monotonic.h" #include "cluster.h" @@ -98,6 +98,11 @@ int isReadyToShutdown(void); int finishShutdown(void); const char *replstateToString(int replstate); +/*============================ AE event modification functions ============================ */ +int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); +void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); + /*============================ Utility functions ============================ */ /* This macro tells if we are in the context of loading an AOF. */ @@ -1565,6 +1570,9 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ + trySendPollJobToIOThreads(); + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; @@ -1596,10 +1604,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ connTypeProcessPendingData(); - /* If any connection type(typical TLS) still has pending unread data or if there are clients - * with pending IO reads/writes, don't sleep at all. */ - int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 || - listLength(server.clients_pending_io_write) > 0; + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(); /* Call the Cluster before sleep function. Note that this function * may change the state of Cluster (from ok to fail or vice versa), @@ -2375,7 +2381,7 @@ void closeListener(connListener *sfd) { for (j = 0; j < sfd->count; j++) { if (sfd->fd[j] == -1) continue; - aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); + serverDeleteFileEvent(sfd->fd[j], AE_READABLE); close(sfd->fd[j]); } @@ -2388,9 +2394,9 @@ int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { int j; for (j = 0; j < sfd->count; j++) { - if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler, sfd) == AE_ERR) { + if (serverCreateFileEvent(sfd->fd[j], AE_READABLE, accept_handler, sfd) == AE_ERR) { /* Rollback */ - for (j = j - 1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); + for (j = j - 1; j >= 0; j--) serverDeleteFileEvent(sfd->fd[j], AE_READABLE); return C_ERR; } } @@ -2493,6 +2499,7 @@ void resetServerStats(void) { server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; server.stat_io_freed_objects = 0; + server.stat_poll_processed_by_io_threads = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -2698,7 +2705,7 @@ void initServer(void) { /* Register a readable event for the pipe used to awake the event loop * from module threads. */ - if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE, modulePipeReadable, NULL) == AE_ERR) { + if (serverCreateFileEvent(server.module_pipe[0], AE_READABLE, modulePipeReadable, NULL) == AE_ERR) { serverPanic("Error registering the readable event for the module pipe."); } @@ -5704,6 +5711,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, + "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, @@ -6351,6 +6359,53 @@ void dismissClientMemory(client *c) { } } +/* server eventloop functions */ +int serverResizeSetSize(int setsize) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + return aeResizeSetSize(server.el, setsize); + } + /* Take mutex - we need to be sure that the file events are not + * resized while the IO thread is polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + int ret = aeResizeSetSize(server.el, setsize); + pthread_mutex_unlock(&server.io_poll_mutex); + return ret; +} + +int serverCreateFileEvent(int fd, int mask, aeFileProc *proc, void *clientData) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + return aeCreateFileEvent(server.el, fd, mask, proc, clientData); + } + /* Take mutex - we need to be sure that the file event is not + * added while the IO thread is polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + int ret = aeCreateFileEvent(server.el, fd, mask, proc, clientData); + pthread_mutex_unlock(&server.io_poll_mutex); + return ret; +} + +void serverDeleteFileEvent(int fd, int mask) { + if (server.io_poll_state != AE_IO_STATE_POLL) { + aeDeleteFileEvent(server.el, fd, mask); + } + /* Take mutex - we need to be sure that the file event is not + * deleted from the event loop while the IO thread is + * polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + aeDeleteFileEvent(server.el, fd, mask); + pthread_mutex_unlock(&server.io_poll_mutex); +} + +/* Called by the IO thread to poll for events */ +int serverPoll(struct timeval *tvp) { + /* Take mutex - we need to be sure that the file events are not + * resized while the IO thread is polling for events. */ + pthread_mutex_lock(&server.io_poll_mutex); + int num_events = aePoll(server.el, tvp); + pthread_mutex_unlock(&server.io_poll_mutex); + return num_events; +} + /* In the child process, we don't need some buffers anymore, and these are * likely to change in the parent when there's heavy write traffic. * We dismiss them right away, to avoid CoW. diff --git a/src/server.h b/src/server.h index 1ce8db495f..131f8f8fba 100644 --- a/src/server.h +++ b/src/server.h @@ -64,7 +64,6 @@ typedef long long mstime_t; /* millisecond time type. */ typedef long long ustime_t; /* microsecond time type. */ -#include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ #include "dict.h" /* Hash tables */ #include "kvstore.h" /* Slot-based hash table */ @@ -639,6 +638,9 @@ typedef enum { #define BUSY_MODULE_YIELD_EVENTS (1 << 0) #define BUSY_MODULE_YIELD_CLIENTS (1 << 1) +/* IO poll */ +typedef enum { AE_IO_STATE_NONE, AE_IO_STATE_POLL, AE_IO_STATE_DONE } AeIoState; + /*----------------------------------------------------------------------------- * Data types *----------------------------------------------------------------------------*/ @@ -1599,6 +1601,9 @@ struct valkeyServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; + _Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */ + pthread_mutex_t io_poll_mutex; /* Mutex to protect the `server.el` polling access. */ + int io_ae_fired_events; /* Number of poll events received by the IO thread. */ rax *errors; /* Errors table */ int errors_enabled; /* If true, errorstats is enabled, and we will add new errors. */ unsigned int lruclock; /* Clock for LRU eviction */ @@ -1754,6 +1759,7 @@ struct valkeyServer { long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ long long stat_io_freed_objects; /* Number of objects freed by IO threads */ + long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ @@ -3204,6 +3210,10 @@ robj *activeDefragStringOb(robj *ob); void dismissSds(sds s); void dismissMemory(void *ptr, size_t size_hint); void dismissMemoryInChild(void); +void serverDeleteFileEvent(int fd, int mask); +int serverCreateFileEvent(int fd, int mask, aeFileProc *proc, void *clientData); +int serverResizeSetSize(int setsize); +int serverPoll(struct timeval *tvp); #define RESTART_SERVER_NONE 0 #define RESTART_SERVER_GRACEFULLY (1 << 0) /* Do proper shutdown. */ diff --git a/src/socket.c b/src/socket.c index b2f8f1aaec..843cf4b738 100644 --- a/src/socket.c +++ b/src/socket.c @@ -117,7 +117,7 @@ static int connSocketConnect(connection *conn, conn->state = CONN_STATE_CONNECTING; conn->conn_handler = connect_handler; - aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn); + serverCreateFileEvent(conn->fd, AE_WRITABLE, conn->type->ae_handler, conn); return C_OK; } @@ -137,7 +137,7 @@ static void connSocketShutdown(connection *conn) { /* Close the connection and free resources. */ static void connSocketClose(connection *conn) { if (conn->fd != -1) { - aeDeleteFileEvent(server.el, conn->fd, AE_READABLE | AE_WRITABLE); + serverDeleteFileEvent(conn->fd, AE_READABLE | AE_WRITABLE); close(conn->fd); conn->fd = -1; } @@ -227,8 +227,8 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; if (!conn->write_handler) - aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE); - else if (aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn) == AE_ERR) + serverDeleteFileEvent(conn->fd, AE_WRITABLE); + else if (serverCreateFileEvent(conn->fd, AE_WRITABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR; return C_OK; } @@ -241,8 +241,8 @@ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc fun conn->read_handler = func; if (!conn->read_handler) - aeDeleteFileEvent(server.el, conn->fd, AE_READABLE); - else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) + serverDeleteFileEvent(conn->fd, AE_READABLE); + else if (serverCreateFileEvent(conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) return C_ERR; return C_OK; } @@ -265,7 +265,7 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD conn->state = CONN_STATE_CONNECTED; } - if (!conn->write_handler) aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE); + if (!conn->write_handler) serverDeleteFileEvent(conn->fd, AE_WRITABLE); if (!callHandler(conn, conn->conn_handler)) return; conn->conn_handler = NULL; diff --git a/src/tls.c b/src/tls.c index 1913d876fa..1aee4224b3 100644 --- a/src/tls.c +++ b/src/tls.c @@ -586,12 +586,12 @@ static void registerSSLEvent(tls_connection *conn, WantIOType want) { switch (want) { case WANT_READ: - if (mask & AE_WRITABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); - if (!(mask & AE_READABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, tlsEventHandler, conn); + if (mask & AE_WRITABLE) serverDeleteFileEvent(conn->c.fd, AE_WRITABLE); + if (!(mask & AE_READABLE)) serverCreateFileEvent(conn->c.fd, AE_READABLE, tlsEventHandler, conn); break; case WANT_WRITE: - if (mask & AE_READABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); - if (!(mask & AE_WRITABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); + if (mask & AE_READABLE) serverDeleteFileEvent(conn->c.fd, AE_READABLE); + if (!(mask & AE_WRITABLE)) serverCreateFileEvent(conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); break; default: serverAssert(0); break; } @@ -629,13 +629,11 @@ static void updateSSLEvent(tls_connection *conn) { int need_read = conn->c.read_handler || (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ); int need_write = conn->c.write_handler || (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE); - if (need_read && !(mask & AE_READABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, tlsEventHandler, conn); - if (!need_read && (mask & AE_READABLE)) aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); + if (need_read && !(mask & AE_READABLE)) serverCreateFileEvent(conn->c.fd, AE_READABLE, tlsEventHandler, conn); + if (!need_read && (mask & AE_READABLE)) serverDeleteFileEvent(conn->c.fd, AE_READABLE); - if (need_write && !(mask & AE_WRITABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); - if (!need_write && (mask & AE_WRITABLE)) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); + if (need_write && !(mask & AE_WRITABLE)) serverCreateFileEvent(conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); + if (!need_write && (mask & AE_WRITABLE)) serverDeleteFileEvent(conn->c.fd, AE_WRITABLE); } static void updateSSLState(connection *conn_) {