Skip to content

Commit

Permalink
Async IO threads
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Jul 8, 2024
1 parent 5f0ccf1 commit c387e1f
Show file tree
Hide file tree
Showing 38 changed files with 1,551 additions and 969 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ jobs:
run: sudo apt-get install tcl8.6 tclx
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: ./runtest --config io-threads 4 --config io-threads-do-reads yes --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
run: ./runtest --config io-threads 2 --config events-per-io-thread 0 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --config io-threads 4 --config io-threads-do-reads yes ${{github.event.inputs.cluster_test_args}}
run: ./runtest-cluster --config io-threads 2 --config events-per-io-thread 0 ${{github.event.inputs.cluster_test_args}}

test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
4 changes: 2 additions & 2 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
}

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
Expand Down Expand Up @@ -489,6 +489,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
eventLoop->beforesleep = beforesleep;
}

void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}
5 changes: 3 additions & 2 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData,
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -107,7 +108,7 @@ typedef struct aeEventLoop {
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
aeAfterSleepProc *aftersleep;
int flags;
} aeEventLoop;

Expand All @@ -130,7 +131,7 @@ int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void processUnblockedClients(void) {
if (!c->flag.blocked) {
/* If we have a queued command, execute it now. */
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
continue;
}
}
beforeNextClient(c);
Expand Down
6 changes: 5 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;

/* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */
if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM;

sdsfreesplitres(lines, totlines);
reading_config_file = 0;
return;
Expand Down Expand Up @@ -3023,7 +3026,7 @@ standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 0, NULL, NULL), /* Read + parse from threads? */
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 1, NULL, NULL), /* Read + parse from threads */
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
Expand Down Expand Up @@ -3124,6 +3127,7 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
Expand Down
11 changes: 10 additions & 1 deletion src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ void setproctitle(const char *fmt, ...);
#error "Undefined or invalid BYTE_ORDER"
#endif

/* Cache line alignment */
#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
#define CACHE_LINE_SIZE 128
#else
#define CACHE_LINE_SIZE 64
#endif /* __aarch64__ && __APPLE__ */
#endif /* CACHE_LINE_SIZE */

#if (__i386 || __amd64 || __powerpc__) && __GNUC__
#define GNUC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__)
#if defined(__clang__)
Expand Down Expand Up @@ -329,7 +338,7 @@ void setcpuaffinity(const char *cpulist);
#define HAVE_FADVISE
#endif

#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_MAX_NUM 16

#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
Expand Down
18 changes: 18 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ typedef struct ConnectionType {
int (*has_pending_data)(void);
int (*process_pending_data)(void);

/* Postpone update state - with IO threads & TLS we don't want the IO threads to update the event loop events - let
* the main-thread do it */
void (*postpone_update_state)(struct connection *conn, int);
/* Called by the main-thread */
void (*update_state)(struct connection *conn);

/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;
Expand Down Expand Up @@ -456,4 +462,16 @@ static inline int connIsTLS(connection *conn) {
return conn && conn->type == connectionTypeTls();
}

static inline void connUpdateState(connection *conn) {
if (conn->type->update_state) {
conn->type->update_state(conn);
}
}

static inline void connSetPostponeUpdateState(connection *conn, int on) {
if (conn->type->postpone_update_state) {
conn->type->postpone_update_state(conn, on);
}
}

#endif /* __REDIS_CONNECTION_H */
2 changes: 2 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "fpconv_dtoa.h"
#include "cluster.h"
#include "threads_mngr.h"
#include "io_threads.h"

#include <arpa/inet.h>
#include <signal.h>
Expand Down Expand Up @@ -2159,6 +2160,7 @@ void removeSigSegvHandlers(void) {
}

void printCrashReport(void) {
server.crashed = 1;
/* Log INFO and CLIENT LIST */
logServerInfo();

Expand Down
2 changes: 1 addition & 1 deletion src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ void ldbEndSession(client *c) {

/* If it's a fork()ed session, we just exit. */
if (ldb.forked) {
writeToClient(c, 0);
writeToClient(c);
serverLog(LL_NOTICE, "Lua debugging session child exiting");
exitFromChild(0);
} else {
Expand Down
Loading

0 comments on commit c387e1f

Please sign in to comment.