Skip to content

Commit

Permalink
Persist AOF file by io_uring
Browse files Browse the repository at this point in the history
Signed-off-by: Wenwen Chen <[email protected]>
  • Loading branch information
Wenwen-Chen committed Jul 5, 2024
1 parent f2bbd1f commit 6ec4e08
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ ifeq ($(MALLOC),jemalloc)
FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS)
endif

# io_uring support is only probed for on Linux.
# The probe writes a one-line foo.c that includes <liburing.h> and runs it
# through the preprocessor ($(CC) -E). HAS_LIBURING is set to "yes" only when
# that succeeds, i.e. when the header can be found. foo.c is removed afterwards.
ifeq ($(uname_S),Linux)
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
# Header found: define HAVE_IO_URING for the C sources and link against liburing.
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_IO_URING
FINAL_LIBS+= -luring
endif
endif

# LIBSSL & LIBCRYPTO
LIBSSL_LIBS=
LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?)
Expand Down Expand Up @@ -401,7 +412,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
8 changes: 7 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "bio.h"
#include "rio.h"
#include "functions.h"
#include "io_uring.h"

#include <signal.h>
#include <fcntl.h>
Expand Down Expand Up @@ -1007,8 +1008,13 @@ int startAppendOnly(void) {
* true, and in general it looks just more resilient to retry the write. If
* there is an actual error condition we'll get it at the next try. */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
ssize_t nwritten = 0, totwritten = 0;
ssize_t totwritten = 0;
if (server.io_uring_enabled && getAofIOUring()) {
totwritten = aofWriteByIOUring(fd, buf, len);
return totwritten;
}

ssize_t nwritten = 0;
while (len) {
nwritten = write(fd, buf, len);

Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3069,6 +3069,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
5 changes: 5 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
#define HAVE_EPOLL 1
#endif

/* Test for liburing API.
 * HAVE_IO_URING is supplied by the build (-DHAVE_IO_URING) when liburing is
 * found, and the code tests it with #ifdef. A definition of 0 would still
 * satisfy #ifdef, so on non-Linux targets make sure the macro is left
 * undefined instead of defining it to 0. */
#ifndef __linux__
#undef HAVE_IO_URING
#endif

/* Test for accept4() */
#if defined(__linux__) || defined(__FreeBSD__) || defined(OpenBSD5_7) || \
(defined(__DragonFly__) && __DragonFly_version >= 400305) || \
Expand Down
167 changes: 167 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "io_uring.h"

#ifdef HAVE_IO_URING
#include <liburing.h>
#include "zmalloc.h"

/* Limits for AOF writes through io_uring:
 * AOF_IOURING_MAX_ENTRIES   - queue depth: the size passed to
 *                             io_uring_queue_init() and the cap on requests
 *                             kept in flight by aofWriteByIOUring().
 * AOF_IOURING_MAX_BLOCKSIZE - largest number of bytes placed in one write
 *                             request. */
#define AOF_IOURING_MAX_ENTRIES (64)
#define AOF_IOURING_MAX_BLOCKSIZE (32 * 1024)

/* The ring used for AOF writes. NULL until initAofIOUring() succeeds, and NULL
 * again after freeAofIOUring(). */
static struct io_uring *_aof_io_uring;
/* One iovec per possible in-flight request; aofWriteByIOUring() fills slot
 * [inflight] for each request it prepares. */
static struct iovec iov[AOF_IOURING_MAX_ENTRIES];
/* Number of write requests prepared by aofWriteByIOUring() whose completions
 * have not been reaped yet by reapCompletions(). */
static int inflight = 0;

int initAofIOUring(void) {
_aof_io_uring = NULL;
struct io_uring *ring = zmalloc(sizeof(struct io_uring));
if (!ring) {
fprintf(stderr, "failed to allocate memory for aof io_uring...\n");
return -1;
}

int ret = io_uring_queue_init(AOF_IOURING_MAX_ENTRIES, ring, 0);
if (ret != 0) {
fprintf(stderr, "failed to init queue of aof io_uring...\n");
zfree(ring);
return -1;
}

_aof_io_uring = ring;
return 0;
}

void freeAofIOUring(void) {
if (_aof_io_uring) {
io_uring_queue_exit(_aof_io_uring);
zfree(_aof_io_uring);
_aof_io_uring = NULL;
}
}

/* Return the AOF io_uring instance, trying to create it first if it does not
 * exist yet.
 *
 * The result of initAofIOUring() is not inspected here: when creation fails
 * the global stays NULL, and that NULL is what the caller gets back. */
struct io_uring *getAofIOUring(void) {
    if (!_aof_io_uring) {
        initAofIOUring();
    }
    return _aof_io_uring;
}

/* Queue one vectored write on the AOF ring.
 *
 * fd      - file descriptor to write to.
 * vecs    - array of nr_vecs iovecs describing the data.
 * offset  - file offset handed to io_uring_prep_writev(); kept 64-bit wide so
 *           large offsets are not truncated.
 *
 * Returns 0 when a submission entry was prepared, -1 when no entry was
 * available. The request is only prepared here; the caller is responsible for
 * calling io_uring_submit(). */
static int prepWrite(int fd, struct iovec *vecs, unsigned nr_vecs, unsigned long long offset) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(_aof_io_uring);
    if (sqe == NULL) return -1;

    io_uring_prep_writev(sqe, fd, vecs, nr_vecs, offset);
    io_uring_sqe_set_data(sqe, &_aof_io_uring);
    return 0;
}

static int reapCompletions(void) {
struct io_uring_cqe *cqes[inflight];
int cqecnt = io_uring_peek_batch_cqe(_aof_io_uring, cqes, inflight);
if (cqecnt < 0) {
return -1;
}
io_uring_cq_advance(_aof_io_uring, cqecnt);
inflight -= cqecnt;
return 0;
}

int aofWriteByIOUring(int fd, const char *buf, size_t len) {
ssize_t writing = 0;
ssize_t totwritten = 0;

while (len) {
ssize_t offset = 0;
ssize_t this_size = 0;
int has_inflight = inflight;

while (len && (inflight < AOF_IOURING_MAX_ENTRIES)) {
this_size = len;
if (this_size > AOF_IOURING_MAX_BLOCKSIZE) this_size = AOF_IOURING_MAX_BLOCKSIZE;

iov[inflight].iov_base = ((char *)buf) + offset;
iov[inflight].iov_len = this_size;
if (0 != prepWrite(fd, &iov[inflight], 1, 0)) {
fprintf(stderr, "## prepWrite failed when persist AOF file by io_uring...\n");
}

len -= this_size;
offset += this_size;
writing += this_size;
inflight++;
}

if (has_inflight != inflight) io_uring_submit(_aof_io_uring);

int depth;
if (len)
depth = AOF_IOURING_MAX_ENTRIES;
else
depth = 1;

while (inflight >= depth) {
if (0 != reapCompletions()) {
fprintf(stderr, "## reapCompletions failed when persist AOF file by io_uring...\n");
return totwritten;
}
}
totwritten = writing;
}

return totwritten;
}

#else

#ifndef UNUSED
#define UNUSED(V) ((void)V)
#endif

/* Fallback implementations used when HAVE_IO_URING is not defined. They keep
 * the same interface so callers need no conditional compilation of their own. */

/* Nothing to set up; always reports success. */
int initAofIOUring(void) {
    return 0;
}

/* Nothing to release. */
void freeAofIOUring(void) {
    return;
}

/* There is never a ring in this build, so callers always get NULL. */
struct io_uring *getAofIOUring(void) {
    return NULL;
}

/* Writes nothing and reports 0 bytes written. */
int aofWriteByIOUring(int fd, const char *buf, size_t len) {
    UNUSED(fd);
    UNUSED(buf);
    UNUSED(len);
    return 0;
}
#endif /* end of HAVE_IO_URING */
46 changes: 46 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef IO_URING_H
#define IO_URING_H
#include <stddef.h>

/* Create the io_uring instance used for AOF persistence.
 * Returns 0 on success and -1 on failure. In a build without HAVE_IO_URING
 * this does nothing and returns 0. */
int initAofIOUring(void);

/* Release the AOF io_uring instance. Does nothing when no ring exists. */
void freeAofIOUring(void);

/* Return the AOF io_uring instance, attempting to create it first if it does
 * not exist yet. Returns NULL when no ring is available (creation failed, or
 * the build has no HAVE_IO_URING). */
struct io_uring *getAofIOUring(void);

/* Write len bytes from buf to fd using the AOF io_uring and return the number
 * of bytes accounted as written. The ring is used without being checked, so
 * callers are expected to test getAofIOUring() first. In a build without
 * HAVE_IO_URING this writes nothing and returns 0. */
int aofWriteByIOUring(int fd, const char *buf, size_t len);

#endif /* IO_URING_H */
17 changes: 17 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "syscheck.h"
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_uring.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -2797,6 +2798,18 @@ void initListeners(void) {
/* Last stage of server initialization.
 * Runs bioInit() and initThreadedIO(), then, when io-uring-enabled is set,
 * either creates the AOF io_uring (exiting the process if that fails) or, in a
 * build without HAVE_IO_URING, logs that io_uring is unavailable. Finally it
 * applies the jemalloc background-thread setting and records the memory in use
 * at this point as the initial memory usage. */
void InitServerLast(void) {
    bioInit();
    initThreadedIO();

#ifdef HAVE_IO_URING
    if (server.io_uring_enabled) {
        if (initAofIOUring() != 0) {
            serverLog(LL_WARNING, "aof io_uring init failed.");
            exit(1);
        }
        serverLog(LL_NOTICE, "aof io_uring init successfully.");
    }
#else
    if (server.io_uring_enabled) {
        serverLog(LL_WARNING, "System doesn't support io_uring, not init aof io_uring.");
    }
#endif

    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -6984,6 +6997,10 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
if (server.io_uring_enabled) {
freeAofIOUring();
serverLog(LL_NOTICE, "aof io_uring free successfully.");
}
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,7 @@ struct valkeyServer {
aofManifest *aof_manifest; /* Used to track AOFs. */
int aof_disable_auto_gc; /* If disable automatically deleting HISTORY type AOFs?
default no. (for testings). */
int io_uring_enabled;

/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ start_server {tags {"introspection"}} {
socket-mark-id
req-res-logfile
client-default-resp
io-uring-enabled
}

if {!$::tls} {
Expand Down
9 changes: 8 additions & 1 deletion valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2324,4 +2324,11 @@ jemalloc-bg-thread yes
# this is only exposed via the info command for clients to use, but in the future we
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"
# availability-zone "zone-name"

# Enable/disable the use of io_uring to persist the AOF file.
# The preconditions for enabling io_uring are:
# 1. Valkey was built with liburing.
# 2. The machine that hosts the Valkey server has liburing installed.
#
# io-uring-enabled no

0 comments on commit 6ec4e08

Please sign in to comment.