Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MSG_ZEROCOPY for plaintext replication traffic #1543

Open
wants to merge 24 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
855a059
Initial draft of zerocopy for replication streams
murphyjacob4 Nov 26, 2024
0a33aaa
Enforce minimum zero copy write size
murphyjacob4 Dec 4, 2024
7199031
Incremental improvements to zerocopy
murphyjacob4 Dec 10, 2024
756ab02
Add tests and support for non-graceful termination
murphyjacob4 Jan 10, 2025
b9a8086
Merge remote-tracking branch 'origin/unstable' into zerocopy
murphyjacob4 Jan 10, 2025
35562ce
Remove debug logs and fix a merge error
murphyjacob4 Jan 10, 2025
6caf217
Fix typos and cmake
murphyjacob4 Jan 10, 2025
bbedd5b
Allow debug command to pause error queue to make tests deterministic …
murphyjacob4 Jan 10, 2025
bec3ff2
Make tests more deterministic
murphyjacob4 Jan 10, 2025
a04d132
Fix mac build
murphyjacob4 Jan 10, 2025
929bc75
Disable zerocopy test suite on unsupported builds
murphyjacob4 Jan 10, 2025
f39f7d3
Typo fix
murphyjacob4 Jan 10, 2025
a47b959
More test stability fixes
murphyjacob4 Jan 10, 2025
82f7303
Log when zerocopy tests are skipped
murphyjacob4 Jan 10, 2025
a1c3e52
Fix wait for offset sync behavior in tests
murphyjacob4 Jan 10, 2025
469fd9b
Cleanup unused functions and fix parameter issue in test
murphyjacob4 Jan 10, 2025
3a11ce9
Make event loop code more readable
murphyjacob4 Jan 11, 2025
63a583e
Remove draining force close logic
murphyjacob4 Jan 11, 2025
0d5a2c7
Fix build
murphyjacob4 Jan 11, 2025
d81949c
Restore gitignore
murphyjacob4 Jan 11, 2025
425366f
Add additional resiliency to zerocopy tests
murphyjacob4 Jan 11, 2025
22d3258
Cleanup zerocopy messsage processing
murphyjacob4 Jan 11, 2025
359728f
Disable zerocopy for non-TCP connections
murphyjacob4 Jan 11, 2025
89e614e
Apply clang-format
murphyjacob4 Jan 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/connection.c
${CMAKE_SOURCE_DIR}/src/unix.c
${CMAKE_SOURCE_DIR}/src/server.c
${CMAKE_SOURCE_DIR}/src/logreqres.c)
${CMAKE_SOURCE_DIR}/src/logreqres.c
${CMAKE_SOURCE_DIR}/src/zerocopy.c)

# valkey-cli
set(VALKEY_CLI_SRCS
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o zerocopy.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
76 changes: 47 additions & 29 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
if (mask & AE_ERROR_QUEUE) fe->errfileproc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;

Expand Down Expand Up @@ -233,7 +234,7 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
/* Check whether there are events to be removed.
* Note: user may remove the AE_BARRIER without
* touching the actual events. */
if (mask & (AE_READABLE | AE_WRITABLE)) {
if (mask & (AE_READABLE | AE_WRITABLE | AE_ERROR_QUEUE)) {
/* Must be invoked after the eventLoop mask is modified,
* which is required by evport and epoll */
aeApiDelEvent(eventLoop, fd, mask);
Expand Down Expand Up @@ -461,7 +462,8 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */
int event_order[AE_NUM_EVENT_TYPES];
aeFileProc *prev_fired[AE_NUM_EVENT_TYPES - 1] = {0};

/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
Expand All @@ -475,34 +477,49 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;

/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (invert) {
event_order[0] = AE_ERROR_QUEUE;
event_order[1] = AE_WRITABLE;
event_order[2] = AE_READABLE;
} else {
event_order[0] = AE_ERROR_QUEUE;
event_order[1] = AE_READABLE;
event_order[2] = AE_WRITABLE;
}
Comment on lines +480 to 488
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (invert) {
event_order[0] = AE_ERROR_QUEUE;
event_order[1] = AE_WRITABLE;
event_order[2] = AE_READABLE;
} else {
event_order[0] = AE_ERROR_QUEUE;
event_order[1] = AE_READABLE;
event_order[2] = AE_WRITABLE;
}
event_order[0] = AE_ERROR_QUEUE;
event_order[1] = invert ? AE_WRITABLE : AE_READABLE;
event_order[0] = invert ? AE_READABLE : AE_WRITABLE;


/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
/* Check if each event in our ordering should be fired based on the
* event mask. Also deduplicate event handlers, and refresh the
* events after each call. */
for (int i = 0; i < AE_NUM_EVENT_TYPES; i++) {
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid. */
if (fe->mask & mask & event_order[i]) {
aeFileProc *to_fire = NULL;
switch (event_order[i]) {
case AE_READABLE:
to_fire = fe->rfileProc;
break;
case AE_WRITABLE:
to_fire = fe->wfileProc;
break;
case AE_ERROR_QUEUE:
to_fire = fe->errfileproc;
break;
}
int already_fired = 0;
for (int j = 0; j < i; j++) {
if (prev_fired[j] == to_fire) {
already_fired = 1;
}
}
if (!already_fired) {
to_fire(eventLoop, fd, fe->clientData, mask);
if (i != AE_NUM_EVENT_TYPES - 1) {
prev_fired[i] = to_fire;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
}
}
}

Expand All @@ -525,11 +542,12 @@ int aeWait(int fd, int mask, long long milliseconds) {
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
if (mask & AE_ERROR_QUEUE) pfd.events |= POLLERR;

if ((retval = poll(&pfd, 1, milliseconds)) == 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_ERROR_QUEUE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
Expand Down
22 changes: 13 additions & 9 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@
#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_NONE 0 /* No events registered. */
#define AE_READABLE 1 << 0 /* Fire when descriptor is readable. */
#define AE_WRITABLE 1 << 1 /* Fire when descriptor is writable. */
#define AE_BARRIER 1 << 2 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_ERROR_QUEUE 1 << 3 /* Fire when descriptor has a message on the \
Comment on lines +43 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#define AE_READABLE 1 << 0 /* Fire when descriptor is readable. */
#define AE_WRITABLE 1 << 1 /* Fire when descriptor is writable. */
#define AE_BARRIER 1 << 2 /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_ERROR_QUEUE 1 << 3 /* Fire when descriptor has a message on the \
#define AE_READABLE (1 << 0) /* Fire when descriptor is readable. */
#define AE_WRITABLE (1 << 1) /* Fire when descriptor is writable. */
#define AE_BARRIER (1 << 2) /* With WRITABLE, never fire the event if the \
READABLE event already fired in the same event \
loop iteration. Useful when you want to persist \
things to disk before sending replies, and want \
to do that in a group fashion. */
#define AE_ERROR_QUEUE (1 << 3) /* Fire when descriptor has a message on the \

message queue. */
#define AE_NUM_EVENT_TYPES 3 /* Total number of events we can fire in one pass */

#define AE_FILE_EVENTS (1 << 0)
#define AE_TIME_EVENTS (1 << 1)
Expand Down Expand Up @@ -75,9 +78,10 @@ typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER|ERROR_QUEUE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
aeFileProc *errfileproc;
void *clientData;
} aeFileEvent;

Expand Down
6 changes: 5 additions & 1 deletion src/ae_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
if (mask & AE_ERROR_QUEUE) ee.events |= EPOLLERR;

ee.data.fd = fd;
if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
return 0;
Expand All @@ -97,6 +99,8 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
if (mask & AE_ERROR_QUEUE) ee.events |= EPOLLERR;

ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
Expand All @@ -123,7 +127,7 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE;
if (e->events & EPOLLERR) mask |= AE_ERROR_QUEUE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
Expand Down
16 changes: 16 additions & 0 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,22 @@ int anetRecvTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}


int anetSetZeroCopy(char *err, int fd, int setting) {
#ifdef HAVE_MSG_ZEROCOPY
if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &setting, sizeof(setting)) < 0) {
anetSetError(err, "setsockopt SO_ZEROCOPY: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
#else
UNUSED(fd);
UNUSED(setting);
anetSetError(err, "anetSetZeroCopy unsupported on this platform");
return ANET_OK;
#endif
}

/* Resolve the hostname "host" and set the string representation of the
* IP address into the buffer pointed by "ipbuf".
*
Expand Down
1 change: 1 addition & 0 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);
int anetSetZeroCopy(char *err, int fd, int setting);
int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int remote);
int anetKeepAlive(char *err, int fd, int interval);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
Expand Down
15 changes: 15 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,19 @@ static int updateExtendedRedisCompat(const char **err) {
return 1;
}

static int applyTcpTxZerocopy(const char **err) {
#ifndef HAVE_MSG_ZEROCOPY
if (server.tcp_tx_zerocopy) {
*err = "TCP zerocopy is not supported by this system";
return 0;
}
return 1;
#else
UNUSED(err);
return 1;
#endif
}

static int updateSighandlerEnabled(const char **err) {
UNUSED(err);
if (server.crashlog_enabled)
Expand Down Expand Up @@ -3188,6 +3201,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("tcp-tx-zerocopy", NULL, MODIFIABLE_CONFIG, server.tcp_tx_zerocopy, CONFIG_DEFAULT_TCP_TX_ZEROCOPY, NULL, applyTcpTxZerocopy),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down Expand Up @@ -3289,6 +3303,7 @@ standardConfig static_configs[] = {
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-zerocopy-min-write-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcp_zerocopy_min_write_size, CONFIG_DEFAULT_ZERO_COPY_MIN_WRITE_SIZE, INTEGER_CONFIG, NULL, NULL),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this falls under the "configurations we do not want users to mess with". agree 10K is the recommended sweet spot, but not sure if this needs any kind of tunability option ATM.
Consider dropping this config for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also didn't have it at first - but in tests it was much easier to deterministically trigger zero copy writes if I can play with the minimum write size.

configurations we do not want users to mess with

I can move it to a DEBUG command if we would prefer, if we have concerns about maintaining new configs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why it makes the test much easier? I thiknk DEBUG command is fine and also a HIDDEN config which is named prefixed/suffixed with 'debug' is fine IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why it makes the test much easier?

Ultimately we have no control over if a call to send will return <= the number of bytes we want it to send. So this leads to nondeterministic behavior - we can call SET with a key of length 10 KiB, and sometimes see that this doesn't trigger a zero-copy write since it is pieced into two send syscalls.

I think this falls under the "configurations we do not want users to mess with".

At a higher level - can I ask why it would be bad to expose such a config to the user? I can imagine that a combination of network hardware and kernel version may impact what setting makes sense here. I don't expect many folks to play with it - but it isn't a confusing setting and it isn't like maintenance cost will be high.


/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
9 changes: 9 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#ifdef __linux__
#include <features.h>
#include <fcntl.h>
#include <sys/socket.h>
#endif

#if defined(__APPLE__) && defined(MAC_OS_X_VERSION_MAX_ALLOWED) && MAC_OS_X_VERSION_MAX_ALLOWED >= 1060
Expand Down Expand Up @@ -92,6 +93,14 @@
#endif
#endif


/* MSG_ZEROCOPY */
#ifdef __linux__
#if defined(SO_ZEROCOPY)
#define HAVE_MSG_ZEROCOPY 1
#endif
#endif

/* Test for polling API */
#ifdef __linux__
#define HAVE_EPOLL 1
Expand Down
19 changes: 19 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/socket.h>

#include "ae.h"

Expand All @@ -50,6 +51,7 @@ typedef enum {
CONN_STATE_CONNECTING,
CONN_STATE_ACCEPTING,
CONN_STATE_CONNECTED,
CONN_STATE_SHUTDOWN,
CONN_STATE_CLOSED,
CONN_STATE_ERROR
} ConnectionState;
Expand Down Expand Up @@ -104,10 +106,13 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
int (*set_error_queue_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*send)(struct connection *conn, const void *data, size_t data_len, int flags);
ssize_t (*recvmsg)(struct connection *conn, struct msghdr *msg, int flags);

/* pending data */
int (*has_pending_data)(void);
Expand Down Expand Up @@ -135,6 +140,7 @@ struct connection {
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
ConnectionCallbackFunc error_queue_handler;
};

#define CONFIG_BINDADDR_MAX 16
Expand Down Expand Up @@ -258,6 +264,10 @@ static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCal
return conn->type->set_write_handler(conn, func, barrier);
}

static inline int connSetErrorQueueHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_error_queue_handler(conn, func);
}

static inline void connShutdown(connection *conn) {
conn->type->shutdown(conn);
}
Expand Down Expand Up @@ -285,6 +295,14 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size
return conn->type->sync_readline(conn, ptr, size, timeout);
}

static inline ssize_t connSend(connection *conn, const void *data, size_t data_len, int flags) {
return conn->type->send(conn, data, data_len, flags);
}

static inline ssize_t connRecvMsg(connection *conn, struct msghdr *msg, int flags) {
return conn->type->recvmsg(conn, msg, flags);
}

/* Return CONN_TYPE_* for the specified connection */
static inline const char *connGetType(connection *conn) {
return conn->type->get_type(conn);
Expand Down Expand Up @@ -382,6 +400,7 @@ int connDisableTcpNoDelay(connection *conn);
int connKeepAlive(connection *conn, int interval);
int connSendTimeout(connection *conn, long long ms);
int connRecvTimeout(connection *conn, long long ms);
int connSetZeroCopy(connection *conn, int setting);

/* Get cert for the secure connection */
static inline sds connGetPeerCert(connection *conn) {
Expand Down
10 changes: 10 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ void debugCommand(client *c) {
" Grace period in seconds for replica main channel to establish psync.",
"DICT-RESIZING <0|1>",
" Enable or disable the main dict and expire dict resizing.",
"ZEROCOPY-FOR-LOOPBACK <0|1>",
" Enable or disable zerocopy IO on loopback connections.",
"PAUSE-ERRQUEUE-EVENTS <0|1>",
" Pause or unpause error queue events handling (i.e. for zero copy tracking notifications).",
NULL};
addExtendedReplyHelp(c, help, clusterDebugCommandExtendedHelp());
} else if (!strcasecmp(c->argv[1]->ptr, "segfault")) {
Expand Down Expand Up @@ -1022,6 +1026,12 @@ void debugCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
server.dict_resizing = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "zerocopy-for-loopback") && c->argc == 3) {
server.debug_zerocopy_bypass_loopback_check = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "pause-errqueue-events") && c->argc == 3) {
server.debug_pause_errqueue_events = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
Loading
Loading