From 3357ea3db6530b0750d655c84258c67bb9e69d23 Mon Sep 17 00:00:00 2001 From: Amit Nagler <58042354+naglera@users.noreply.github.com> Date: Tue, 15 Oct 2024 19:26:42 +0300 Subject: [PATCH] Refactor return and goto statements (#945) Consolidate the cleanup of local variables to a single point within the method, ensuring proper resource management and p reventing memory leaks or double-free issues. Previoslly descused here: - https://github.com/valkey-io/valkey/pull/60#discussion_r1667872633 - https://github.com/valkey-io/valkey/pull/60#discussion_r1668045666 --------- Signed-off-by: naglera Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com> Co-authored-by: Ping Xie --- src/replication.c | 422 +++++++++++++++++++++++++++------------------- src/server.h | 1 + 2 files changed, 246 insertions(+), 177 deletions(-) diff --git a/src/replication.c b/src/replication.c index 948a2762bc..63433de865 100644 --- a/src/replication.c +++ b/src/replication.c @@ -53,8 +53,9 @@ int replicaPutOnline(client *replica); void replicaStartCommandStream(client *replica); int cancelReplicationHandshake(int reconnect); void replicationSteadyStateInit(void); -void setupMainConnForPsync(connection *conn); +void dualChannelSetupMainConnForPsync(connection *conn); void dualChannelSyncHandleRdbLoadCompletion(void); +static void dualChannelFullSyncWithPrimary(connection *conn); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -2588,13 +2589,135 @@ int sendCurrentOffsetToReplica(client *replica) { return C_OK; } +static int dualChannelReplHandleHandshake(connection *conn, sds *err) { + serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); + /* AUTH with the primary if required. */ + if (server.primary_auth) { + char *args[] = {"AUTH", NULL, NULL}; + size_t lens[] = {4, 0, 0}; + int argc = 1; + if (server.primary_user) { + args[argc] = server.primary_user; + lens[argc] = strlen(server.primary_user); + argc++; + } + args[argc] = server.primary_auth; + lens[argc] = sdslen(server.primary_auth); + argc++; + *err = sendCommandArgv(conn, argc, args, lens); + if (*err) { + serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); + return C_ERR; + } + } + /* Send replica listening port to primary for clarification */ + sds portstr = getReplicaPortString(); + *err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port", portstr, + NULL); + sdsfree(portstr); + if (*err) { + serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); + return C_ERR; + } + + if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); + return C_ERR; + } + return C_OK; +} + +static int dualChannelReplHandleAuthReply(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) { + serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake"); + return C_ERR; + } + if ((*err)[0] == '-') { + serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err); + return C_ERR; + } + server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; + return C_OK; +} + +static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) { + serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake"); + return C_ERR; + } + + if (*err[0] == '-') { + serverLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s", + *err); + return C_ERR; + } + if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { + serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); + return C_ERR; + } + return C_OK; +} + +static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { + uint64_t rdb_client_id; + *err = receiveSynchronousResponse(conn); + if (*err == NULL) { + return C_ERR; + } + if (*err[0] == '\0') { + /* Retry again later */ + serverLog(LL_DEBUG, "Received empty $ENDOFF response"); + return C_RETRY; + } + long long reploffset; + char primary_replid[CONFIG_RUN_ID_SIZE + 1]; + int dbid; + /* Parse end offset response */ + char *endoff_format = "$ENDOFF:%lld %40s %d %llu"; + if (sscanf(*err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { + serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err); + return C_ERR; + } + server.rdb_client_id = rdb_client_id; + server.primary_initial_offset = reploffset; + + /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ + server.repl_provisional_primary.conn = server.repl_transfer_s; + memcpy(server.repl_provisional_primary.replid, primary_replid, CONFIG_RUN_ID_SIZE); + server.repl_provisional_primary.reploff = reploffset; + server.repl_provisional_primary.read_reploff = reploffset; + server.repl_provisional_primary.dbid = dbid; + + /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the + * main connection accordingly.*/ + server.repl_transfer_s->state = CONN_STATE_CONNECTED; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; + serverAssert(connSetReadHandler(server.repl_transfer_s, dualChannelSetupMainConnForPsync) != C_ERR); + dualChannelSetupMainConnForPsync(server.repl_transfer_s); + + /* As the next block we will receive using this connection is the rdb, we need to prepare + * the connection accordingly */ + serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR); + server.repl_transfer_size = -1; + server.repl_transfer_read = 0; + server.repl_transfer_last_fsync_off = 0; + server.repl_transfer_lastio = server.unixtime; + + return C_OK; +} + /* Replication: Replica side. * This connection handler is used to initialize the RDB connection (dual-channel-replication). * Once a replica with dual-channel-replication enabled, denied from PSYNC with its primary, - * fullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s + * dualChannelFullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s * for a rdb stream, and server.repl_transfer_s for incremental replication data stream. */ -static void fullSyncWithPrimary(connection *conn) { +static void dualChannelFullSyncWithPrimary(connection *conn) { char *err = NULL; + int ret = 0; serverAssert(conn == server.repl_rdb_transfer_s); /* If this event fired after the user turned the instance into a primary * with REPLICAOF NO ONE we must just return ASAP. */ @@ -2607,138 +2730,40 @@ static void fullSyncWithPrimary(connection *conn) { serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn)); goto error; } - /* Send replica capabilities */ - if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_SEND_HANDSHAKE) { - serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); - /* AUTH with the primary if required. */ + switch (server.repl_rdb_channel_state) { + case REPL_DUAL_CHANNEL_SEND_HANDSHAKE: + ret = dualChannelReplHandleHandshake(conn, &err); + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY; + break; + case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY: if (server.primary_auth) { - char *args[] = {"AUTH", NULL, NULL}; - size_t lens[] = {4, 0, 0}; - int argc = 1; - if (server.primary_user) { - args[argc] = server.primary_user; - lens[argc] = strlen(server.primary_user); - argc++; - } - args[argc] = server.primary_auth; - lens[argc] = sdslen(server.primary_auth); - argc++; - err = sendCommandArgv(conn, argc, args, lens); - if (err) { - serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err); - return; - } - } - /* Send replica listening port to primary for clarification */ - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port", - portstr, NULL); - sdsfree(portstr); - if (err) { - serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err); - return; - } - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY; - - if (connSetReadHandler(conn, fullSyncWithPrimary) == C_ERR) { - char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); - goto error; - } - return; - } - if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY && !server.primary_auth) { - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; - } - /* Receive AUTH reply. */ - if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) { - serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake"); - goto error; - } - if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", err); - goto error; + ret = dualChannelReplHandleAuthReply(conn, &err); + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; + /* Wait for next bulk before trying to read replconf reply. */ + break; } - sdsfree(err); server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; - return; - } - /* Receive replconf response */ - if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) { - serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake"); - goto error; - } + /* fall through */ + case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY: + ret = dualChannelReplHandleReplconfReply(conn, &err); + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF; + break; + case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF: + ret = dualChannelReplHandleEndOffsetResponse(conn, &err); + if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD; + break; + default: + serverPanic("Unexpected dual replication state: %d", server.repl_rdb_channel_state); + } + if (ret == C_ERR) goto error; + sdsfree(err); + return; - if (err[0] == '-') { - serverLog(LL_NOTICE, - "Server does not support sync with offset, dual channel sync approach cannot be used: %s", err); - goto error; - } - if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { - serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); - goto error; - } - sdsfree(err); - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF; - return; - } - /* Receive end offset response */ - if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) { - uint64_t rdb_client_id; - err = receiveSynchronousResponse(conn); - if (err == NULL) goto error; - if (err[0] == '\0') { - /* Retry again later */ - serverLog(LL_DEBUG, "Received empty $ENDOFF response"); - sdsfree(err); - return; - } - long long reploffset; - char primary_replid[CONFIG_RUN_ID_SIZE + 1]; - int dbid; - /* Parse end offset response */ - char *endoff_format = "$ENDOFF:%lld %40s %d %llu"; - if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { - serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", err); - goto error; - } +error: + if (err) { + serverLog(LL_WARNING, "Dual channel sync failed with error %s", err); sdsfree(err); - server.rdb_client_id = rdb_client_id; - server.primary_initial_offset = reploffset; - - /* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */ - server.repl_provisional_primary.conn = server.repl_transfer_s; - memcpy(server.repl_provisional_primary.replid, primary_replid, CONFIG_RUN_ID_SIZE); - server.repl_provisional_primary.reploff = reploffset; - server.repl_provisional_primary.read_reploff = reploffset; - server.repl_provisional_primary.dbid = dbid; - - /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the - * main connection accordingly.*/ - server.repl_transfer_s->state = CONN_STATE_CONNECTED; - server.repl_state = REPL_STATE_SEND_HANDSHAKE; - serverAssert(connSetReadHandler(server.repl_transfer_s, setupMainConnForPsync) != C_ERR); - setupMainConnForPsync(server.repl_transfer_s); - - /* As the next block we will receive using this connection is the rdb, we need to prepare - * the connection accordingly */ - serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR); - server.repl_transfer_size = -1; - server.repl_transfer_read = 0; - server.repl_transfer_last_fsync_off = 0; - server.repl_transfer_lastio = server.unixtime; - - server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD; - return; } - -error: - sdsfree(err); if (server.repl_transfer_s) { connClose(server.repl_transfer_s); server.repl_transfer_s = NULL; @@ -2751,7 +2776,6 @@ static void fullSyncWithPrimary(connection *conn) { server.repl_transfer_fd = -1; server.repl_state = REPL_STATE_CONNECT; replicationAbortDualChannelSyncTransfer(); - return; } /* Replication: Replica side. @@ -2920,24 +2944,23 @@ void dualChannelSyncSuccess(void) { /* Replication: Replica side. * Main channel successfully established psync with primary. Check whether the rdb channel * has completed its part and act accordingly. */ -void dualChannelSyncHandlePsync(void) { +int dualChannelSyncHandlePsync(void) { serverAssert(server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY); if (server.repl_rdb_channel_state < REPL_DUAL_CHANNEL_RDB_LOADED) { /* RDB is still loading */ if (connSetReadHandler(server.repl_provisional_primary.conn, bufferReplData) == C_ERR) { serverLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno)); cancelReplicationHandshake(1); - return; + return C_ERR; } replDataBufInit(); - server.repl_state = REPL_STATE_TRANSFER; - return; + return C_OK; } serverAssert(server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOADED); /* RDB is loaded */ serverLog(LL_DEBUG, "Dual channel sync - psync established after rdb load"); dualChannelSyncSuccess(); - return; + return C_OK; } /* Replication: Replica side. @@ -3195,46 +3218,54 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { return PSYNC_NOT_SUPPORTED; } -/* Replication: Replica side. - * This connection handler fires after rdb-connection was initialized. We use it - * to adjust the replica main for loading incremental changes into the local buffer. */ -void setupMainConnForPsync(connection *conn) { - int psync_result = -1; - char llstr[LONG_STR_SIZE]; - char *err = NULL; - if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { - /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ull2string(llstr, sizeof(llstr), server.rdb_client_id); - err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); - if (err) goto error; - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - sdsfree(err); - return; + +sds getTryPsyncString(int result) { + switch (result) { + case PSYNC_WRITE_ERROR: return sdsnew("PSYNC_WRITE_ERROR"); + case PSYNC_WAIT_REPLY: return sdsnew("PSYNC_WAIT_REPLY"); + case PSYNC_CONTINUE: return sdsnew("PSYNC_CONTINUE"); + case PSYNC_FULLRESYNC: return sdsnew("PSYNC_FULLRESYNC"); + case PSYNC_NOT_SUPPORTED: return sdsnew("PSYNC_NOT_SUPPORTED"); + case PSYNC_TRY_LATER: return sdsnew("PSYNC_TRY_LATER"); + case PSYNC_FULLRESYNC_DUAL_CHANNEL: return sdsnew("PSYNC_FULLRESYNC_DUAL_CHANNEL"); + default: return sdsnew("Unknown result"); } +} - if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto error; - if (err[0] == '-') { - serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", err); - goto error; - } - sdsfree(err); - err = NULL; - server.repl_state = REPL_STATE_SEND_PSYNC; +int dualChannelReplMainConnSendHandshake(connection *conn, sds *err) { + char llstr[LONG_STR_SIZE]; + ull2string(llstr, sizeof(llstr), server.rdb_client_id); + *err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); + if (*err) return C_ERR; + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + return C_OK; +} + +int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + if ((*err)[0] == '-') { + serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err); + return C_ERR; } + server.repl_state = REPL_STATE_SEND_PSYNC; + return C_OK; +} - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (server.debug_pause_after_fork) debugPauseProcess(); - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { - serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); - cancelReplicationHandshake(1); - } - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - return; +int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { + if (server.debug_pause_after_fork) debugPauseProcess(); + if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); + *err = sdsnew(connGetLastError(conn)); + return C_ERR; } - psync_result = replicaTryPartialResynchronization(conn, 1); - if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + return C_OK; +} + +int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) { + int psync_result = replicaTryPartialResynchronization(conn, 1); + if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", @@ -3244,15 +3275,52 @@ void setupMainConnForPsync(connection *conn) { "accept connections in read-write mode.\n"); } dualChannelSyncHandlePsync(); - return; + return C_OK; } + *err = getTryPsyncString(psync_result); + return C_ERR; +} -error: +/* Replication: Replica side. + * This connection handler fires after rdb-connection was initialized. We use it + * to adjust the replica main for loading incremental changes into the local buffer. */ +void dualChannelSetupMainConnForPsync(connection *conn) { + char *err = NULL; + int ret; + + switch (server.repl_state) { + case REPL_STATE_SEND_HANDSHAKE: + ret = dualChannelReplMainConnSendHandshake(conn, &err); + if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + break; + case REPL_STATE_RECEIVE_CAPA_REPLY: + ret = dualChannelReplMainConnRecvCapaReply(conn, &err); + if (ret == C_ERR) { + break; + } + if (ret == C_OK) server.repl_state = REPL_STATE_SEND_PSYNC; + sdsfree(err); + err = NULL; + /* fall through */ + case REPL_STATE_SEND_PSYNC: + ret = dualChannelReplMainConnSendPsync(conn, &err); + if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + break; + case REPL_STATE_RECEIVE_PSYNC_REPLY: + ret = dualChannelReplMainConnRecvPsyncReply(conn, &err); + if (ret == C_OK && server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) + server.repl_state = REPL_STATE_TRANSFER; + /* In case the RDB is already loaded, the repl_state will be set during establishPrimaryConnection. */ + break; + default: + serverPanic("Unexpected replication state: %d", server.repl_state); + } + + if (ret == C_ERR) { + serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : ""); + cancelReplicationHandshake(1); + } sdsfree(err); - /* The dual-channel sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY. - */ - serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", psync_result); - cancelReplicationHandshake(1); } /* @@ -3625,7 +3693,7 @@ void syncWithPrimary(connection *conn) { /* Create RDB connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr, - fullSyncWithPrimary) == C_ERR) { + dualChannelFullSyncWithPrimary) == C_ERR) { serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; diff --git a/src/server.h b/src/server.h index 4cc93ed8aa..4fad8d2508 100644 --- a/src/server.h +++ b/src/server.h @@ -110,6 +110,7 @@ struct hdr_histogram; /* Error codes */ #define C_OK 0 #define C_ERR -1 +#define C_RETRY -2 /* Static server configuration */ #define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */