Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload reading the replication stream to IO threads #1449

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* Currently, replica reads are not offloaded to IO threads. */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
/* For simplicity let the main-thread handle the blocked clients */
Expand All @@ -345,6 +345,7 @@ int trySendReadToIOThreads(client *c) {
c->cur_tid = tid;
c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;
c->read_flags |= c->flag.primary ? READ_FLAGS_PRIMARY : 0;

c->io_read_state = CLIENT_PENDING_IO;
connSetPostponeUpdateState(c->conn, 1);
Expand All @@ -363,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* Currently, replica writes are not offloaded to IO threads. */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand Down
16 changes: 15 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,16 @@ void resetClient(client *c) {
}
}

void resetClientIOState(client *c) {
c->nwritten = 0;
c->nread = 0;
c->io_read_state = c->io_write_state = CLIENT_IDLE;
c->io_parsed_cmd = NULL;
c->flag.pending_command = 0;
c->io_last_bufpos = 0;
c->io_last_reply_block = NULL;
}

/* Initializes the shared query buffer to a new sds with the default capacity.
* Need to ensure the initlen is not less than readlen in readToQueryBuf. */
void initSharedQueryBuf(void) {
Expand Down Expand Up @@ -4954,7 +4964,11 @@ void ioThreadReadQueryFromClient(void *data) {
}

done:
trimClientQueryBuffer(c);
/* Only trim query buffer for non-primary clients
* Primary client's buffer is handled by main thread using repl_applied position */
if (!(c->read_flags & READ_FLAGS_PRIMARY)) {
uriyage marked this conversation as resolved.
Show resolved Hide resolved
trimClientQueryBuffer(c);
}
atomic_thread_fence(memory_order_release);
c->io_read_state = CLIENT_COMPLETED_IO;
}
Expand Down
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4136,6 +4136,8 @@ void replicationCachePrimary(client *c) {
serverAssert(server.primary != NULL && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* Unlink the client from the server structures. */
unlinkClient(c);

Expand All @@ -4153,6 +4155,7 @@ void replicationCachePrimary(client *c) {
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
resetClientIOState(c);

/* Save the primary. Server.primary will be set to null later by
* replicationHandlePrimaryDisconnection(). */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2808,6 +2808,7 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
void beforeNextClient(client *c);
void clearClientConnectionState(client *c);
void resetClient(client *c);
void resetClientIOState(client *c);
void freeClientOriginalArgv(client *c);
void freeClientArgv(client *c);
void sendReplyToClient(connection *conn);
Expand Down
Loading