Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io thread work offload #763

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 91 additions & 25 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@
#endif
#endif

#define AE_LOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_lock(&(eventLoop)->poll_mutex) == 0); \
}

#define AE_UNLOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_unlock(&(eventLoop)->poll_mutex) == 0); \
}

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
Expand All @@ -81,7 +90,14 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->flags = 0;
/* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (pthread_mutex_init(&eventLoop->poll_mutex, &attr) != 0) goto err;

if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
Expand Down Expand Up @@ -122,11 +138,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
*
* Otherwise AE_OK is returned and the operation is successful. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
AE_LOCK(eventLoop);
int ret = AE_OK;
int i;

if (setsize == eventLoop->setsize) return AE_OK;
if (eventLoop->maxfd >= setsize) return AE_ERR;
if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR;
if (setsize == eventLoop->setsize) goto done;
if (eventLoop->maxfd >= setsize) goto err;
if (aeApiResize(eventLoop, setsize) == -1) goto err;

eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize);
eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize);
Expand All @@ -135,7 +153,13 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
/* Make sure that if we created new slots, they are initialized with
* an AE_NONE mask. */
for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE;
return AE_OK;
goto done;

err:
ret = AE_ERR;
done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
Expand All @@ -159,25 +183,35 @@ void aeStop(aeEventLoop *eventLoop) {
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
AE_LOCK(eventLoop);
int ret = AE_ERR;

if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
goto done;
}
aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;
if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;
return AE_OK;

ret = AE_OK;

done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (fd >= eventLoop->setsize) return;
AE_LOCK(eventLoop);
if (fd >= eventLoop->setsize) goto done;

aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
if (fe->mask == AE_NONE) goto done;

/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
Expand All @@ -204,6 +238,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
* which is required by evport and epoll */
aeApiDelEvent(eventLoop, fd, mask);
}

done:
AE_UNLOCK(eventLoop);
}

void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
Expand Down Expand Up @@ -345,6 +382,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
return processed;
}

/* This function provides direct access to the aeApiPoll call.
* It is intended to be called from a custom poll function.*/
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
AE_LOCK(eventLoop);

int ret = aeApiPoll(eventLoop, tvp);

AE_UNLOCK(eventLoop);
return ret;
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -377,25 +425,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
if (eventLoop->custompoll != NULL) {
numevents = eventLoop->custompoll(eventLoop);
} else {
/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
Expand Down Expand Up @@ -503,3 +555,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

/* This function allows setting a custom poll procedure to be used by the event loop.
* The custom poll procedure, if set, will be called instead of the default aeApiPoll */
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}

void aeSetPollProtect(aeEventLoop *eventLoop, int protect) {
if (protect) {
uriyage marked this conversation as resolved.
Show resolved Hide resolved
eventLoop->flags |= AE_PROTECT_POLL;
} else {
eventLoop->flags &= ~AE_PROTECT_POLL;
}
}
9 changes: 9 additions & 0 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#define __AE_H__

#include "monotonic.h"
#include <pthread.h>

#define AE_OK 0
#define AE_ERR -1
Expand All @@ -54,13 +55,15 @@
#define AE_DONT_WAIT (1 << 2)
#define AE_CALL_BEFORE_SLEEP (1 << 3)
#define AE_CALL_AFTER_SLEEP (1 << 4)
#define AE_PROTECT_POLL (1 << 5)

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1

/* Macros */
#define AE_NOTUSED(V) ((void)V)

struct timeval; /* forward declaration */
struct aeEventLoop;

/* Types and data structures */
Expand All @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -109,6 +113,8 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
pthread_mutex_t poll_mutex;
int flags;
} aeEventLoop;

Expand All @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
void aeSetPollProtect(aeEventLoop *eventLoop, int protect);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
6 changes: 5 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "latency.h"
#include "script.h"
#include "functions.h"
#include "io_threads.h"

#include <signal.h>
#include <ctype.h>
Expand Down Expand Up @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
} else if (server.lazyfree_lazy_server_del) {
freeObjAsync(key, old, db->id);
} else {
decrRefCount(old);
Expand Down
6 changes: 6 additions & 0 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ int _dictResize(dict *d, unsigned long size, int *malloc_failed) {
return DICT_OK;
}

if (d->type->no_incremental_rehash) {
/* If the dict type does not support incremental rehashing, we need to
* rehash the whole table immediately. */
while (dictRehash(d, 1000));
}

return DICT_OK;
}

Expand Down
2 changes: 2 additions & 0 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ typedef struct dictType {
/* If embedded_entry flag is set, it indicates that a copy of the key is created and the key is embedded
* as part of the dict entry. */
unsigned int embedded_entry : 1;
/* Perform rehashing during resizing instead of incrementally rehashing across multiple steps */
unsigned int no_incremental_rehash : 1;
} dictType;

#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp))
Expand Down
Loading
Loading