From bbc18296458ac6ef6ce9600a2ec5dd8a80038d78 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Wed, 11 Dec 2024 07:57:24 +0000 Subject: [PATCH 1/2] replication: fix io-threads possible race by moving waitForClientIO earlier Signed-off-by: Uri Yagelnik --- src/replication.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/replication.c b/src/replication.c index 8ff8ad3f0f..22a710c291 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1036,6 +1036,9 @@ void syncCommand(client *c) { /* ignore SYNC if already replica or in monitor mode */ if (c->flag.replica) return; + /* Wait for any IO pending operation to finish before changing the client state to replica */ + waitForClientIO(c); + /* Check if this is a failover request to a replica with the same replid and * become a primary if so. */ if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr, "psync") && !strcasecmp(c->argv[3]->ptr, "failover")) { @@ -1147,8 +1150,6 @@ void syncCommand(client *c) { c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repldbfd = -1; - /* Wait for any IO pending operation to finish before changing the client state */ - waitForClientIO(c); c->flag.replica = 1; listAddNodeTail(server.replicas, c); From 756c3dce45f147314c7c99cc2171375f76602321 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Tue, 24 Dec 2024 10:06:17 +0000 Subject: [PATCH 2/2] Add assert to conn write/read Signed-off-by: Uri Yagelnik --- src/socket.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/socket.c b/src/socket.c index 7344d66ad8..c83ce38234 100644 --- a/src/socket.c +++ b/src/socket.c @@ -29,6 +29,7 @@ #include "server.h" #include "connhelpers.h" +#include "io_threads.h" /* The connections module provides a lean abstraction of network connections * to avoid direct socket and async event management across the server code base. @@ -154,6 +155,10 @@ static void connSocketClose(connection *conn) { } static int connSocketWrite(connection *conn, const void *data, size_t data_len) { + /* Assert the main thread is not writing to a connection that is currently offloaded. */ + debugServerAssert(!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD) || !inMainThread() || + ((client *)connGetPrivateData(conn))->io_write_state != CLIENT_PENDING_IO); + int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; @@ -182,6 +187,11 @@ static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcn } static int connSocketRead(connection *conn, void *buf, size_t buf_len) { + /* Assert the main thread is not reading from a connection that is currently offloaded. */ + debugServerAssert(!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD) || !inMainThread() || + ((client *)connGetPrivateData(conn))->io_read_state != CLIENT_PENDING_IO); + + int ret = read(conn->fd, buf, buf_len); if (!ret) { conn->state = CONN_STATE_CLOSED;