From ea5ce9cd9a8a881c8d68e186a3820e2468b01e88 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 17 Dec 2024 15:55:58 +0000 Subject: [PATCH 1/2] Support primary IO offload to IO threads Signed-off-by: Uri Yagelnik --- src/io_threads.c | 5 +++-- src/networking.c | 4 +++- src/replication.c | 9 +++++++++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/io_threads.c b/src/io_threads.c index 3865eb77c3..edd214c14a 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -322,7 +322,7 @@ int trySendReadToIOThreads(client *c) { /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; /* Currently, replica/master writes are not offloaded and are processed synchronously. */ - if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; + if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* With Lua debug client we may call connWrite directly in the main thread */ if (c->flag.lua_debug) return C_ERR; /* For simplicity let the main-thread handle the blocked clients */ @@ -345,6 +345,7 @@ int trySendReadToIOThreads(client *c) { c->cur_tid = tid; c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE; c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0; + c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0; c->io_read_state = CLIENT_PENDING_IO; connSetPostponeUpdateState(c->conn, 1); @@ -364,7 +365,7 @@ int trySendWriteToIOThreads(client *c) { /* Nothing to write */ if (!clientHasPendingReplies(c)) return C_ERR; /* Currently, replica/master writes are not offloaded and are processed synchronously. 
*/ - if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; + if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* We can't offload debugged clients as the main-thread may read at the same time */ if (c->flag.lua_debug) return C_ERR; diff --git a/src/networking.c b/src/networking.c index 4d386d6dc4..a217a92db2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4954,7 +4954,9 @@ void ioThreadReadQueryFromClient(void *data) { } done: - trimClientQueryBuffer(c); + if (!(c->read_flags & READ_FLAGS_PRIMARY)) { + trimClientQueryBuffer(c); + } atomic_thread_fence(memory_order_release); c->io_read_state = CLIENT_COMPLETED_IO; } diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..eafc898309 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) { serverAssert(server.primary != NULL && server.cached_primary == NULL); serverLog(LL_NOTICE, "Caching the disconnected primary state."); + /* Wait for IO operations to be done before proceeding */ + waitForClientIO(c); /* Unlink the client from the server structures. */ unlinkClient(c); @@ -4154,6 +4156,13 @@ void replicationCachePrimary(client *c) { c->bufpos = 0; resetClient(c); + /* Reset the primary IO state. */ + c->nwritten = 0; + c->nread = 0; + c->io_read_state = c->io_write_state = CLIENT_IDLE; + c->io_parsed_cmd = NULL; + c->flag.pending_command = 0; + /* Save the primary. Server.primary will be set to null later by * replicationHandlePrimaryDisconnection(). 
*/ server.cached_primary = server.primary; From 39f0a483ca60ace21f38b3154d70723c8f869ff8 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Wed, 18 Dec 2024 11:41:53 +0000 Subject: [PATCH 2/2] Addressed PR comments Signed-off-by: Uri Yagelnik --- src/io_threads.c | 4 ++-- src/networking.c | 12 ++++++++++++ src/replication.c | 8 +------- src/server.h | 1 + 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/io_threads.c b/src/io_threads.c index edd214c14a..46fa3d48c1 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) { if (server.active_io_threads_num <= 1) return C_ERR; /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; - /* Currently, replica/master writes are not offloaded and are processed synchronously. */ + /* Currently, replica reads are not offloaded to IO threads. */ if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* With Lua debug client we may call connWrite directly in the main thread */ if (c->flag.lua_debug) return C_ERR; @@ -364,7 +364,7 @@ int trySendWriteToIOThreads(client *c) { if (c->io_write_state != CLIENT_IDLE) return C_OK; /* Nothing to write */ if (!clientHasPendingReplies(c)) return C_ERR; - /* Currently, replica/master writes are not offloaded and are processed synchronously. */ + /* Currently, replica writes are not offloaded to IO threads. 
*/ if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* We can't offload debugged clients as the main-thread may read at the same time */ if (c->flag.lua_debug) return C_ERR; diff --git a/src/networking.c b/src/networking.c index a217a92db2..3cd206b175 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2589,6 +2589,16 @@ void resetClient(client *c) { } } +void resetClientIOState(client *c) { + c->nwritten = 0; + c->nread = 0; + c->io_read_state = c->io_write_state = CLIENT_IDLE; + c->io_parsed_cmd = NULL; + c->flag.pending_command = 0; + c->io_last_bufpos = 0; + c->io_last_reply_block = NULL; +} + /* Initializes the shared query buffer to a new sds with the default capacity. * Need to ensure the initlen is not less than readlen in readToQueryBuf. */ void initSharedQueryBuf(void) { @@ -4954,6 +4964,8 @@ void ioThreadReadQueryFromClient(void *data) { } done: + /* Only trim the query buffer for non-primary clients. + * The primary client's buffer is handled by the main thread using the repl_applied position. */ if (!(c->read_flags & READ_FLAGS_PRIMARY)) { trimClientQueryBuffer(c); } diff --git a/src/replication.c b/src/replication.c index eafc898309..984d400ae1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4155,13 +4155,7 @@ void replicationCachePrimary(client *c) { c->reply_bytes = 0; c->bufpos = 0; resetClient(c); - - /* Reset the primary IO state. */ - c->nwritten = 0; - c->nread = 0; - c->io_read_state = c->io_write_state = CLIENT_IDLE; - c->io_parsed_cmd = NULL; - c->flag.pending_command = 0; + resetClientIOState(c); /* Save the primary. Server.primary will be set to null later by * replicationHandlePrimaryDisconnection(). 
*/ diff --git a/src/server.h b/src/server.h index dc4d2e8808..48a2143226 100644 --- a/src/server.h +++ b/src/server.h @@ -2808,6 +2808,7 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); void beforeNextClient(client *c); void clearClientConnectionState(client *c); void resetClient(client *c); +void resetClientIOState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); void sendReplyToClient(connection *conn);