From ae70c5459b13d426e8e48d85e7f48120d69eeebb Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Thu, 2 Jan 2025 10:01:55 +0200 Subject: [PATCH] replication: fix io-threads possible race by moving waitForClientIO (#1422) ### Fix race with pending writes in replica state transition #### The Problem In #60 (Dual channel replication) a new `connWrite` call was added before the `waitForClientIO` check. This created a race condition where the main thread may attempt to write to a client that could have pending writes in IO threads. #### The Fix Moved the `waitForClientIO()` call earlier in `syncCommand`, before any `connWrite` call. This ensures all pending IO operations are completed before attempting to write to the client. --------- Signed-off-by: Uri Yagelnik --- src/replication.c | 5 +++-- src/socket.c | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index f907771e71..160b0c4d5e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1036,6 +1036,9 @@ void syncCommand(client *c) { /* ignore SYNC if already replica or in monitor mode */ if (c->flag.replica) return; + /* Wait for any IO pending operation to finish before changing the client state to replica */ + waitForClientIO(c); + /* Check if this is a failover request to a replica with the same replid and * become a primary if so. */ if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr, "psync") && !strcasecmp(c->argv[3]->ptr, "failover")) { @@ -1148,8 +1151,6 @@ void syncCommand(client *c) { c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repldbfd = -1; - /* Wait for any IO pending operation to finish before changing the client state */ - waitForClientIO(c); c->flag.replica = 1; listAddNodeTail(server.replicas, c); diff --git a/src/socket.c b/src/socket.c index d89e6c8767..94869f3f25 100644 --- a/src/socket.c +++ b/src/socket.c @@ -29,6 +29,7 @@ #include "server.h" #include "connhelpers.h" +#include "io_threads.h" /* The connections module provides a lean abstraction of network connections * to avoid direct socket and async event management across the server code base. @@ -154,6 +155,10 @@ static void connSocketClose(connection *conn) { } static int connSocketWrite(connection *conn, const void *data, size_t data_len) { + /* Assert the main thread is not writing to a connection that is currently offloaded. */ + debugServerAssert(!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD) || !inMainThread() || + ((client *)connGetPrivateData(conn))->io_write_state != CLIENT_PENDING_IO); + int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; @@ -182,6 +187,11 @@ static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcn } static int connSocketRead(connection *conn, void *buf, size_t buf_len) { + /* Assert the main thread is not reading from a connection that is currently offloaded. */ + debugServerAssert(!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD) || !inMainThread() || + ((client *)connGetPrivateData(conn))->io_read_state != CLIENT_PENDING_IO); + + int ret = read(conn->fd, buf, buf_len); if (!ret) { conn->state = CONN_STATE_CLOSED;