From d93cb02385e83dab571d9cb1595b14344008fdf4 Mon Sep 17 00:00:00 2001 From: naglera Date: Mon, 10 Jun 2024 08:19:28 +0000 Subject: [PATCH] Fix use after free replication data block During replStreamProgressCallback replica may recive replicaof command and hance free replica side replicaiton buffer. If replication buffer is freed we should abort the streaming Signed-off-by: naglera --- src/replication.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/replication.c b/src/replication.c index 7c402d668b..9597f73ca7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2928,24 +2928,31 @@ void bufferReplData(connection *conn) { /* Replication: Replica side. * Streams accumulated replication data into the database while freeing read nodes */ -void streamReplDataBufToDb(client *c) { +int streamReplDataBufToDb(client *c) { serverAssert(c->flags & CLIENT_MASTER); blockingOperationStarts(); size_t offset = 0; listNode *cur = NULL; time_t last_progress_callback = mstime(); - while ((cur = listFirst(server.pending_repl_data.blocks))) { + while (server.pending_repl_data.blocks && (cur = listFirst(server.pending_repl_data.blocks))) { /* Read and process repl data block */ replDataBufBlock *o = listNodeValue(cur); c->querybuf = sdscatlen(c->querybuf, o->buf, o->used); c->read_reploff += o->used; processInputBuffer(c); server.pending_repl_data.len -= o->used; - replStreamProgressCallback(offset, o->used, &last_progress_callback); offset += o->used; listDelNode(server.pending_repl_data.blocks, cur); + replStreamProgressCallback(offset, o->used, &last_progress_callback); } blockingOperationEnds(); + if (!server.pending_repl_data.blocks) { + /* If we encounter a `replicaof` command during the replStreamProgressCallback, + * pending_repl_data.blocks will be NULL, and we should return an error and + * abort the current sync session. */ + return C_ERR; + } + return C_OK; } /* Replication: Replica side. @@ -2955,13 +2962,17 @@ void rdbChannelSyncSuccess(void) { server.master_initial_offset = server.repl_provisional_master.reploff; replicationResurrectProvisionalMaster(); /* Wait for the accumulated buffer to be processed before reading any more replication updates */ - if (server.pending_repl_data.blocks) streamReplDataBufToDb(server.master); + if (streamReplDataBufToDb(server.master)) { + /* Sync session aborted during repl data streaming. */ + return; + } freePendingReplDataBuf(); serverLog(LL_NOTICE, "Successfully streamed replication data into memory"); /* We can resume reading from the master connection once the local replication buffer has been loaded. */ replicationSteadyStateInit(); replicationSendAck(); /* Send ACK to notify primary that replica is synced */ server.rdb_client_id = -1; + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; } /* Replication: Replica side. @@ -3002,7 +3013,6 @@ void completeTaskRDBChannelSyncRdbConn(connection *conn) { if (server.repl_state == REPL_STATE_TRANSFER) { connSetReadHandler(server.repl_transfer_s, NULL); rdbChannelSyncSuccess(); - server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; return; } serverPanic("Unrecognized replication state %d using rdb connection", server.repl_state);