Skip to content

Commit

Permalink
poll offload to io threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Jul 12, 2024
1 parent 9f49742 commit a941909
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 182 deletions.
51 changes: 34 additions & 17 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>

#include "zmalloc.h"
Expand Down Expand Up @@ -81,6 +80,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
Expand Down Expand Up @@ -345,6 +345,13 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
return processed;
}

/* This function provides direct access to the aeApiPoll call.
* It is intended to be called from a custom poll function,
* currently invoked from IOThreadPoll.*/
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
return aeApiPoll(eventLoop, tvp);
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -377,25 +384,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
if (eventLoop->custompoll != NULL) {
numevents = eventLoop->custompoll(eventLoop);
} else {
/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
Expand Down Expand Up @@ -503,3 +514,9 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

/* This function allows setting a custom poll procedure to be used by the event loop.
* The custom poll procedure, if set, will be called instead of the default aeApiPoll */
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}
136 changes: 8 additions & 128 deletions src/ae.h
Original file line number Diff line number Diff line change
@@ -1,139 +1,19 @@
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __AE_H__
#define __AE_H__

#include "monotonic.h"

#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER \
4 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */

#define AE_FILE_EVENTS (1 << 0)
#define AE_TIME_EVENTS (1 << 1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS | AE_TIME_EVENTS)
#define AE_DONT_WAIT (1 << 2)
#define AE_CALL_BEFORE_SLEEP (1 << 3)
#define AE_CALL_AFTER_SLEEP (1 << 4)

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1

/* Macros */
#define AE_NOTUSED(V) ((void)V)

struct aeEventLoop;

/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
#include "ae_eventloop.h"

/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
int flags;
} aeEventLoop;
/* This header file declares functions that are implemented in ae.c.
* These functions may alter the state of the underlying apidata,
* which may require a mutex for modification in the case of the server
* main event loop (`server.el`), as it may be accessed by IO threads.
* In files that may access the main event loop, we will include only the
* eventloop.h header file, which does not include the functions declared here. */

/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
void *aeGetFileClientData(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop,
long long milliseconds,
aeTimeProc *proc,
void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);

#endif
140 changes: 140 additions & 0 deletions src/ae_eventloop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef AE_EVENTLOOP_H
#define AE_EVENTLOOP_H

#include "monotonic.h"
#include <time.h>

#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER \
4 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */

#define AE_FILE_EVENTS (1 << 0)
#define AE_TIME_EVENTS (1 << 1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS | AE_TIME_EVENTS)
#define AE_DONT_WAIT (1 << 2)
#define AE_CALL_BEFORE_SLEEP (1 << 3)
#define AE_CALL_AFTER_SLEEP (1 << 4)

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1

/* Macros */
#define AE_NOTUSED(V) ((void)V)

struct aeEventLoop;

/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
int flags;
} aeEventLoop;

/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
void *aeGetFileClientData(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop,
long long milliseconds,
aeTimeProc *proc,
void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
int aeGetSetSize(aeEventLoop *eventLoop);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

#endif
2 changes: 1 addition & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ static int updateMaxclients(const char **err) {
return 0;
}
if ((unsigned int)aeGetSetSize(server.el) < server.maxclients + CONFIG_FDSET_INCR) {
if (aeResizeSetSize(server.el, server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
if (serverResizeSetSize(server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
*err = "The event loop API is not able to handle the specified number of clients";
return 0;
}
Expand Down
5 changes: 3 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

#include "ae.h"
/* We include just the event loop, not the ae.h header,
* to avoid direct access to the server event loop apidata modifications. */
#include "ae_eventloop.h"

#define CONN_INFO_LEN 32
#define CONN_ADDR_STR_LEN 128 /* Similar to INET6_ADDRSTRLEN, hoping to handle other protocols. */
Expand Down
Loading

0 comments on commit a941909

Please sign in to comment.