diff --git a/src/io_threads.c b/src/io_threads.c index 3865eb77c3..46fa3d48c1 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -321,8 +321,8 @@ int trySendReadToIOThreads(client *c) { if (server.active_io_threads_num <= 1) return C_ERR; /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; - /* Currently, replica/master writes are not offloaded and are processed synchronously. */ - if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; + /* Currently, replica reads are not offloaded to IO threads. */ + if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* With Lua debug client we may call connWrite directly in the main thread */ if (c->flag.lua_debug) return C_ERR; /* For simplicity let the main-thread handle the blocked clients */ @@ -345,6 +345,7 @@ int trySendReadToIOThreads(client *c) { c->cur_tid = tid; c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE; c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0; + c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0; c->io_read_state = CLIENT_PENDING_IO; connSetPostponeUpdateState(c->conn, 1); @@ -363,8 +364,8 @@ int trySendWriteToIOThreads(client *c) { if (c->io_write_state != CLIENT_IDLE) return C_OK; /* Nothing to write */ if (!clientHasPendingReplies(c)) return C_ERR; - /* Currently, replica/master writes are not offloaded and are processed synchronously. */ - if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; + /* Currently, replica writes are not offloaded to IO threads. 
*/ + if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; /* We can't offload debugged clients as the main-thread may read at the same time */ if (c->flag.lua_debug) return C_ERR; diff --git a/src/networking.c b/src/networking.c index 4d386d6dc4..3cd206b175 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2589,6 +2589,16 @@ void resetClient(client *c) { } } +void resetClientIOState(client *c) { /* Return the IO-thread bookkeeping of client c to a clean slate: zero the nread/nwritten counters, set both io_read_state and io_write_state to CLIENT_IDLE, drop the io_parsed_cmd pointer, clear the pending_command flag, and reset the io_last_bufpos/io_last_reply_block markers. NOTE(review): this function does not wait for in-flight IO itself; the caller visible in this patch (replicationCachePrimary) calls waitForClientIO() beforehand - confirm any other caller does the same. */ + c->nwritten = 0; + c->nread = 0; + c->io_read_state = c->io_write_state = CLIENT_IDLE; + c->io_parsed_cmd = NULL; + c->flag.pending_command = 0; + c->io_last_bufpos = 0; + c->io_last_reply_block = NULL; +} + /* Initializes the shared query buffer to a new sds with the default capacity. * Need to ensure the initlen is not less than readlen in readToQueryBuf. */ void initSharedQueryBuf(void) { @@ -4954,7 +4964,11 @@ void ioThreadReadQueryFromClient(void *data) { } done: - trimClientQueryBuffer(c); + /* Only trim the query buffer for non-primary clients. + * The primary client's buffer is handled by the main thread using the repl_applied position. */ + if (!(c->read_flags & READ_FLAGS_PRIMARY)) { + trimClientQueryBuffer(c); + } atomic_thread_fence(memory_order_release); c->io_read_state = CLIENT_COMPLETED_IO; } diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..984d400ae1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) { serverAssert(server.primary != NULL && server.cached_primary == NULL); serverLog(LL_NOTICE, "Caching the disconnected primary state."); + /* Wait for IO operations to be done before proceeding */ + waitForClientIO(c); /* Unlink the client from the server structures. */ unlinkClient(c); @@ -4153,6 +4155,7 @@ void replicationCachePrimary(client *c) { c->reply_bytes = 0; c->bufpos = 0; resetClient(c); + resetClientIOState(c); /* Save the primary. Server.primary will be set to null later by * replicationHandlePrimaryDisconnection(). 
*/ diff --git a/src/server.h b/src/server.h index dc4d2e8808..48a2143226 100644 --- a/src/server.h +++ b/src/server.h @@ -2808,6 +2808,7 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); void beforeNextClient(client *c); void clearClientConnectionState(client *c); void resetClient(client *c); +void resetClientIOState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); void sendReplyToClient(connection *conn);