From a97cb64ca9223a1fde09c739bf4d40ffbf0b64b6 Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 2 Jul 2024 09:59:42 +0000 Subject: [PATCH] Clang format for feature related c files Signed-off-by: naglera --- src/config.c | 6 +- src/debug.c | 10 +- src/networking.c | 34 ++-- src/rdb.c | 8 +- src/replication.c | 424 +++++++++++++++++++++++----------------------- src/rio.c | 36 ++-- src/rio.h | 2 +- src/server.c | 9 +- src/server.h | 82 ++++----- 9 files changed, 306 insertions(+), 305 deletions(-) diff --git a/src/config.c b/src/config.c index 5cf1b5d4e8..9983fa322d 100644 --- a/src/config.c +++ b/src/config.c @@ -2276,8 +2276,10 @@ static void numericConfigRewrite(standardConfig *config, const char *name, struc } #define createSpecialConfig(name, alias, modifiable, setfn, getfn, rewritefn, applyfn) \ - {.type = SPECIAL_CONFIG, \ - embedCommonConfig(name, alias, modifiable) embedConfigInterface(NULL, setfn, getfn, rewritefn, applyfn)} + { \ + .type = SPECIAL_CONFIG, \ + embedCommonConfig(name, alias, modifiable) embedConfigInterface(NULL, setfn, getfn, rewritefn, applyfn) \ + } static int isValidActiveDefrag(int val, const char **err) { #ifndef HAVE_DEFRAG diff --git a/src/debug.c b/src/debug.c index 9ba9d65f43..c5b3de99a4 100644 --- a/src/debug.c +++ b/src/debug.c @@ -994,19 +994,17 @@ void debugCommand(client *c) { return; } addReply(c, shared.ok); - } else if(!strcasecmp(c->argv[1]->ptr,"sleep-after-fork-seconds") && - c->argc == 3) { + } else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) { double sleep_after_fork_seconds; if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) { addReply(c, shared.err); return; } server.debug_sleep_after_fork_ms = (int)(sleep_after_fork_seconds * 1e6); - addReply(c,shared.ok); - } else if(!strcasecmp(c->argv[1]->ptr,"WAIT-BEFORE-RDB-CLIENT-FREE") && - c->argc == 3) { + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "WAIT-BEFORE-RDB-CLIENT-FREE") && c->argc == 3) { server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr); - addReply(c,shared.ok); + addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) { server.dict_resizing = atoi(c->argv[2]->ptr); addReply(c, shared.ok); diff --git a/src/networking.c b/src/networking.c index 3041a11586..ddcfee79ee 100644 --- a/src/networking.c +++ b/src/networking.c @@ -253,7 +253,8 @@ void putClientInPendingWriteQueue(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && (c->repl_state == REPL_STATE_NONE || - ((c->repl_state == REPLICA_STATE_ONLINE || c->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !c->repl_start_cmd_stream_on_ack))) { + ((c->repl_state == REPLICA_STATE_ONLINE || c->repl_state == REPLICA_STATE_BG_RDB_LOAD) && + !c->repl_start_cmd_stream_on_ack))) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. This way before re-entering the event @@ -1622,10 +1623,10 @@ void freeClient(client *c) { /* Log link disconnection with replica */ if (getClientType(c) == CLIENT_TYPE_REPLICA) { - serverLog(LL_NOTICE, c->flags & CLIENT_REPL_RDB_CONN ? - "Replica %s rdb connection disconnected.": - "Connection with replica %s lost.", - replicationGetReplicaName(c)); + serverLog(LL_NOTICE, + c->flags & CLIENT_REPL_RDB_CONN ? "Replica %s rdb connection disconnected." + : "Connection with replica %s lost.", + replicationGetReplicaName(c)); } /* Free the query buffer */ @@ -1812,16 +1813,19 @@ int freeClientsInAsyncFreeQueue(void) { if (c->flags & CLIENT_PROTECTED_RDB_CONN) { /* Check if it's safe to remove RDB connection protection during synchronization * The primary gives a grace period before freeing this client because - * it serves as a reference to the first required replication data block for + * it serves as a reference to the first required replication data block for * this replica */ if (!c->rdb_client_disconnect_time) { c->rdb_client_disconnect_time = server.unixtime; - serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", - (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); + serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, + replicationGetReplicaName(c), server.wait_before_rdb_client_free); continue; } if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) { - serverLog(LL_NOTICE, "Replica main connection failed to establish PSYNC within the grace period (%ld seconds). Freeing RDB client %llu.", (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + serverLog(LL_NOTICE, + "Replica main connection failed to establish PSYNC within the grace period (%ld seconds). " + "Freeing RDB client %llu.", + (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); c->flags &= ~CLIENT_PROTECTED_RDB_CONN; } } @@ -1851,7 +1855,7 @@ client *lookupClientByID(uint64_t id) { client *lookupRdbClientByID(uint64_t id) { id = htonu64(id); void *c = NULL; - raxFind(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),&c); + raxFind(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), &c); return c; } @@ -4024,7 +4028,9 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { serverAssert(c->reply_bytes < SIZE_MAX - (1024 * 64)); /* Note that c->reply_bytes is irrelevant for replica clients * (they use the global repl buffers). */ - if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || (c->flags & CLIENT_CLOSE_ASAP && !(c->flags & CLIENT_PROTECTED_RDB_CONN))) return 0; + if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || + (c->flags & CLIENT_CLOSE_ASAP && !(c->flags & CLIENT_PROTECTED_RDB_CONN))) + return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(), c); /* Remove RDB connection protection on COB overrun */ @@ -4072,9 +4078,9 @@ void flushReplicasOutputBuffers(void) { * * 3. Obviously if the replica is not ONLINE. */ - if ((replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) - && !(replica->flags & CLIENT_CLOSE_ASAP) && - can_receive_writes && !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) { + if ((replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && + !(replica->flags & CLIENT_CLOSE_ASAP) && can_receive_writes && !replica->repl_start_cmd_stream_on_ack && + clientHasPendingReplies(replica)) { writeToClient(replica, 0); } } diff --git a/src/rdb.c b/src/rdb.c index 8e5dde17a8..aab7042915 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3542,7 +3542,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } /* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */ listRewind(server.replicas, &li); - while((ln = listNext(&li))) { + while ((ln = listNext(&li))) { client *replica = ln->value; if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ @@ -3556,7 +3556,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { /* This replica uses diskless dual connection sync, hence we need * to inform it with the save end offset.*/ sendCurrentOffsetToReplica(replica); - /* Make sure repl traffic is appended to the replication backlog */ + /* Make sure repl traffic is appended to the replication backlog */ addReplicaToPsyncWait(replica); } else { server.rdb_pipe_numconns++; @@ -3633,8 +3633,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { server.rdb_pipe_numconns_writing = 0; } } else { - serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", - (long) childpid, dual_conn? "replica socket" : "pipe"); + serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid, + dual_conn ? "replica socket" : "pipe"); server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ diff --git a/src/replication.c b/src/replication.c index b102df745b..70ed636957 100644 --- a/src/replication.c +++ b/src/replication.c @@ -199,16 +199,16 @@ void rebaseReplicationBuffer(long long base_repl_offset) { } /* Replication: Primary side - connections association. - * On rdb-connection sync, connection association is used to keep replication data in - * the backlog until the replica requests PSYNC. Association happens in two forms, + * On rdb-connection sync, connection association is used to keep replication data in + * the backlog until the replica requests PSYNC. Association happens in two forms, * if there's an existing buffer block at the fork time, the replica is attached * to the tail, if there is no tail, the replica will be attached when a new * buffer block is created (see the Retrospect function below). - * Replica rdb client id is used as a unique key for the association. - * On COB overrun, association is deleted and the RDB connection + * Replica rdb client id is used as a unique key for the association. + * On COB overrun, association is deleted and the RDB connection * is dropped. */ -void addReplicaToPsyncWait(client* replica) { +void addReplicaToPsyncWait(client *replica) { listNode *ln = NULL; replBufBlock *tail = NULL; if (server.repl_backlog == NULL) { @@ -220,13 +220,13 @@ void addReplicaToPsyncWait(client* replica) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetReplicaName(replica), (unsigned long long)replica->id, - tail? "with repl-backlog tail": "repl-backlog is empty"); - replica->ref_repl_buf_node = tail? ln: NULL; + serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetReplicaName(replica), + (unsigned long long)replica->id, tail ? "with repl-backlog tail" : "repl-backlog is empty"); + replica->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ replica->flags |= CLIENT_PROTECTED_RDB_CONN; uint64_t id = htonu64(replica->id); - raxInsert(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),replica,NULL); + raxInsert(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), replica, NULL); } /* Attach waiting psync replicas with new replication backlog head. */ @@ -237,10 +237,10 @@ void addReplicaToPsyncWaitRetrospect(void) { if (head == NULL) return; /* Update waiting psync replicas to wait on new buffer block */ - raxStart(&iter,server.replicas_waiting_psync); + raxStart(&iter, server.replicas_waiting_psync); raxSeek(&iter, "^", NULL, 0); - while(raxNext(&iter)) { - client* replica = iter.data; + while (raxNext(&iter)) { + client *replica = iter.data; if (replica->ref_repl_buf_node) continue; replica->ref_repl_buf_node = ln; head->refcount++; @@ -249,10 +249,10 @@ void addReplicaToPsyncWaitRetrospect(void) { raxStop(&iter); } -void removeReplicaFromPsyncWait(client* replica) { +void removeReplicaFromPsyncWait(client *replica) { listNode *ln; replBufBlock *o; - /* Get replBufBlock pointed by this replica */ + /* Get replBufBlock pointed by this replica */ client *peer_replica = lookupRdbClientByID(replica->associated_rdb_client_id); ln = peer_replica->ref_repl_buf_node; o = ln ? listNodeValue(ln) : NULL; @@ -262,10 +262,11 @@ void removeReplicaFromPsyncWait(client* replica) { } peer_replica->ref_repl_buf_node = NULL; peer_replica->flags &= ~CLIENT_PROTECTED_RDB_CONN; - serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", - replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id, o? "ref count decreased": "doesn't exist"); + serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", + replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id, + o ? "ref count decreased" : "doesn't exist"); uint64_t id = htonu64(peer_replica->id); - raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); + raxRemove(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), NULL); } void resetReplicationBuffer(void) { @@ -379,9 +380,9 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) { void freeReplicaReferencedReplBuffer(client *replica) { if (replica->flags & CLIENT_REPL_RDB_CONN) { uint64_t id = htonu64(replica->id); - if(raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL)) { + if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), NULL)) { serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", - replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id); + replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id); } } if (replica->ref_repl_buf_node != NULL) { @@ -467,11 +468,10 @@ void feedReplicationBuffer(char *s, size_t len) { /* For output buffer of replicas. */ listIter li; - listRewind(server.replicas,&li); - while((ln = listNext(&li))) { + listRewind(server.replicas, &li); + while ((ln = listNext(&li))) { client *replica = ln->value; - if (!canFeedReplicaReplBuffer(replica) && - !(replica->flags & CLIENT_PROTECTED_RDB_CONN)) continue; + if (!canFeedReplicaReplBuffer(replica) && !(replica->flags & CLIENT_PROTECTED_RDB_CONN)) continue; /* Update shared replication buffer start position. */ if (replica->ref_repl_buf_node == NULL) { replica->ref_repl_buf_node = start_node; @@ -854,8 +854,8 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) { goto need_full_resync; } - /* There are two scenarios that lead to this point. One is that we are able - * to perform a partial resync with the replica. The second is that the replica + /* There are two scenarios that lead to this point. One is that we are able + * to perform a partial resync with the replica. The second is that the replica * is using rdb-connection sync, while loading the snapshot in the background. * in both cases: * 1) Set client state to make it a replica. @@ -866,7 +866,7 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) { c->repl_state = REPLICA_STATE_BG_RDB_LOAD; removeReplicaFromPsyncWait(c); } else { - c->repl_state = REPLICA_STATE_ONLINE; + c->repl_state = REPLICA_STATE_ONLINE; } c->repl_ack_time = server.unixtime; c->repl_start_cmd_stream_on_ack = 0; @@ -1093,11 +1093,13 @@ void syncCommand(client *c) { * resync. */ if (primary_replid[0] != '?') server.stat_sync_partial_err++; if (c->replica_capa & REPLICA_CAPA_DUAL_CONN) { - c->flags |= CLIENT_REPL_MAIN_CONN; - serverLog(LL_NOTICE,"Replica %s is capable of rdb-connection synchronization, and partial sync isn't possible. " - "Full sync will continue with dedicated RDB connection.", replicationGetReplicaName(c)); + c->flags |= CLIENT_REPL_MAIN_CONN; + serverLog(LL_NOTICE, + "Replica %s is capable of rdb-connection synchronization, and partial sync isn't possible. " + "Full sync will continue with dedicated RDB connection.", + replicationGetReplicaName(c)); const char *buf = "+DUALCONNECTIONSYNC\r\n"; - if (connWrite(c->conn,buf,strlen(buf)) != (int)strlen(buf)) { + if (connWrite(c->conn, buf, strlen(buf)) != (int)strlen(buf)) { freeClientAsync(c); } return; @@ -1237,10 +1239,10 @@ void syncCommand(client *c) { * * - version * The replica reports its version. - * + * * - rdb-conn <1|0> - * Used to identify the client as a replica's rdb connection in an dual connection - * sync session. + * Used to identify the client as a replica's rdb connection in an dual connection + * sync session. * */ void replconfCommand(client *c) { int j; @@ -1277,9 +1279,9 @@ void replconfCommand(client *c) { c->replica_capa |= REPLICA_CAPA_EOF; else if (!strcasecmp(c->argv[j + 1]->ptr, "psync2")) c->replica_capa |= REPLICA_CAPA_PSYNC2; - else if (!strcasecmp(c->argv[j+1]->ptr,"dual-conn") && - server.dual_conn_enabled && server.repl_diskless_sync) { - /* If rdb-connection is disable on this primary, treat this command as unrecognized + else if (!strcasecmp(c->argv[j + 1]->ptr, "dual-conn") && server.dual_conn_enabled && + server.repl_diskless_sync) { + /* If rdb-connection is disable on this primary, treat this command as unrecognized * replconf option. */ c->replica_capa |= REPLICA_CAPA_DUAL_CONN; } @@ -1307,8 +1309,7 @@ void replconfCommand(client *c) { * quick check first (instead of waiting for the next ACK. */ if (server.child_type == CHILD_TYPE_RDB && c->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) checkChildrenDone(); - if (c->repl_start_cmd_stream_on_ack && c->repl_state == REPLICA_STATE_ONLINE) - replicaStartCommandStream(c); + if (c->repl_start_cmd_stream_on_ack && c->repl_state == REPLICA_STATE_ONLINE) replicaStartCommandStream(c); if (c->repl_state == REPLICA_STATE_BG_RDB_LOAD) { c->flags &= ~CLIENT_REPL_MAIN_CONN; replicaPutOnline(c); @@ -1367,10 +1368,9 @@ void replconfCommand(client *c) { } } else if (!strcasecmp(c->argv[j]->ptr, "rdb-conn")) { long start_with_offset = 0; - if (getRangeLongFromObjectOrReply(c, c->argv[j +1], - 0, 1, &start_with_offset,NULL) != C_OK) { + if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &start_with_offset, NULL) != C_OK) { return; - } + } if (start_with_offset == 1) { c->flags |= CLIENT_REPL_RDB_CONN; c->replica_req |= REPLICA_REQ_RDB_CONN; @@ -1382,7 +1382,7 @@ void replconfCommand(client *c) { /* REPLCONF identify is used to identify the current replica main connection with existing * rdb-connection with the given id. */ long long client_id = 0; - if (getLongLongFromObjectOrReply(c, c->argv[j +1], &client_id, NULL) != C_OK) { + if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &client_id, NULL) != C_OK) { return; } c->associated_rdb_client_id = (uint64_t)client_id; @@ -1855,8 +1855,7 @@ void replicationEmptyDbCallback(dict *d) { * at server.primary, starting from the specified file descriptor. */ void replicationCreatePrimaryClientWithHandler(connection *conn, int dbid, ConnectionCallbackFunc handler) { server.primary = createClient(conn); - if (conn) - connSetReadHandler(server.primary->conn, handler); + if (conn) connSetReadHandler(server.primary->conn, handler); /** * Important note: @@ -2348,13 +2347,11 @@ void readSyncBulkPayload(connection *conn) { server.repl_down_since = 0; /* Fire the primary link modules event. */ - moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, - VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, - NULL); + moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); if (server.repl_state == REPL_STATE_CONNECTED) { /* After a full resynchronization we use the replication ID and - * offset of the primary. The secondary ID / offset are cleared since - * we are starting a new history. */ + * offset of the primary. The secondary ID / offset are cleared since + * we are starting a new history. */ memcpy(server.replid, server.primary->replid, sizeof(server.replid)); server.primary_repl_offset = server.primary->reploff; } @@ -2376,7 +2373,7 @@ void readSyncBulkPayload(connection *conn) { * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (server.aof_enabled) restartAOFAfterSYNC(); - + /* In case of dual connection sync we want to close the RDB connection * once the connection is established */ if (conn == server.repl_rdb_transfer_s) { @@ -2478,7 +2475,7 @@ char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens } /* Replication: Replica side. - * Returns an sds represent this replica port to be used by the primary (mostly + * Returns an sds represent this replica port to be used by the primary (mostly * for logs) */ sds getReplicaPortString(void) { long long replica_port; @@ -2503,7 +2500,7 @@ void freePendingReplDataBuf(void) { } /* Replication: Replica side. - * Upon rdb-sync failure, close rdb-connection, reset repl-state, reset + * Upon rdb-sync failure, close rdb-connection, reset repl-state, reset * provisional primary struct, and free local replication buffer. */ void abortRdbConnectionSync(void) { serverAssert(server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); @@ -2533,13 +2530,14 @@ void abortRdbConnectionSync(void) { /* Replication: Primary side. * Send current replication offset to replica. Use the following structure: * $ENDOFF: */ -int sendCurrentOffsetToReplica(client* replica) { +int sendCurrentOffsetToReplica(client *replica) { char buf[128]; int buflen; - buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid, server.db->id, (long long unsigned int)replica->id); - serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", - replicationGetReplicaName(replica), server.primary_repl_offset, (long long unsigned int)replica->id); - if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout*1000) != buflen) { + buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid, + server.db->id, (long long unsigned int)replica->id); + serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", + replicationGetReplicaName(replica), server.primary_repl_offset, (long long unsigned int)replica->id); + if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout * 1000) != buflen) { freeClientAsync(replica); return C_ERR; } @@ -2547,11 +2545,11 @@ int sendCurrentOffsetToReplica(client* replica) { } /* Replication: Replica side. - * This connection handler is used to initialize the RDB connection (dual-connection-sync-enabled sync). - * Once a replica with repl rdb-connection enabled, denied from PSYNC with its primary, + * This connection handler is used to initialize the RDB connection (dual-connection-sync-enabled sync). + * Once a replica with repl rdb-connection enabled, denied from PSYNC with its primary, * fullSyncWithPrimary begins its role. The connection handler prepare server.repl_rdb_transfer_s * for a rdb stream, and server.repl_transfer_s for increamental replication data stream. */ -void fullSyncWithPrimary(connection* conn) { +void fullSyncWithPrimary(connection *conn) { char *err = NULL; serverAssert(conn == server.repl_rdb_transfer_s); /* If this event fired after the user turned the instance into a primary @@ -2562,8 +2560,7 @@ void fullSyncWithPrimary(connection* conn) { /* Check for errors in the socket: after a non blocking connect() we * may find that the socket is in error state. */ if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_WARNING,"Error condition on socket for RDB-CONNECTION-SYNC: %s", - connGetLastError(conn)); + serverLog(LL_WARNING, "Error condition on socket for RDB-CONNECTION-SYNC: %s", connGetLastError(conn)); goto error; } /* Send replica capabilities */ @@ -2571,8 +2568,8 @@ void fullSyncWithPrimary(connection* conn) { serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); /* AUTH with the primary if required. */ if (server.primary_auth) { - char *args[3] = {"AUTH",NULL,NULL}; - size_t lens[3] = {4,0,0}; + char *args[3] = {"AUTH", NULL, NULL}; + size_t lens[3] = {4, 0, 0}; int argc = 1; if (server.primary_user) { args[argc] = server.primary_user; @@ -2586,18 +2583,17 @@ void fullSyncWithPrimary(connection* conn) { if (err) goto write_error; } /* Send replica lisening port to primary for clarification */ - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "capa", "eof", - "rdb-only", "1", "rdb-conn", "1", "listening-port", portstr, NULL); + sds portstr = getReplicaPortString(); + err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-conn", "1", "listening-port", portstr, + NULL); sdsfree(portstr); if (err) goto write_error; server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_AUTH_REPLY; if (connSetReadHandler(conn, fullSyncWithPrimary) == C_ERR) { char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, - "Can't create readable event for SYNC: %s (%s)", - strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); + serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } return; @@ -2610,7 +2606,7 @@ void fullSyncWithPrimary(connection* conn) { err = receiveSynchronousResponse(conn); if (err == NULL) goto no_response_error; if (err[0] == '-') { - serverLog(LL_WARNING,"Unable to AUTH to Primary: %s",err); + serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", err); sdsfree(err); goto error; } @@ -2625,15 +2621,16 @@ void fullSyncWithPrimary(connection* conn) { if (err == NULL) goto no_response_error; if (err[0] == '-') { - serverLog(LL_NOTICE, "Server does not support sync with offset, dual connection sync approach cannot be used: %s", err); + serverLog(LL_NOTICE, + "Server does not support sync with offset, dual connection sync approach cannot be used: %s", + err); goto error; } sdsfree(err); err = NULL; - if (connSyncWrite(conn, "SYNC\r\n",6, server.repl_syncio_timeout*1000) == -1) { - serverLog(LL_WARNING, "I/O error writing to Primary: %s", - connGetLastError(conn)); + if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { + serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); goto error; } @@ -2652,7 +2649,7 @@ void fullSyncWithPrimary(connection* conn) { return; } long long reploffset; - char primary_replid[CONFIG_RUN_ID_SIZE+1]; + char primary_replid[CONFIG_RUN_ID_SIZE + 1]; int dbid; /* Parse end offset response */ char *endoff_format = "$ENDOFF:%lld %40s %d %ld"; @@ -2670,7 +2667,7 @@ void fullSyncWithPrimary(connection* conn) { server.repl_provisional_primary.read_reploff = reploffset; server.repl_provisional_primary.dbid = dbid; - /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the + /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the * main connection accordingly.*/ server.repl_transfer_s->state = CONN_STATE_CONNECTED; server.repl_state = REPL_STATE_SEND_HANDSHAKE; @@ -2689,29 +2686,28 @@ void fullSyncWithPrimary(connection* conn) { return; } - no_response_error: - /* Handle receiveSynchronousResponse() error when primary has no reply */ - serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); - /* Fall through to regular error handling */ +no_response_error: + /* Handle receiveSynchronousResponse() error when primary has no reply */ + serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); + /* Fall through to regular error handling */ - error: - connClose(conn); - server.repl_transfer_s = NULL; - if (server.repl_rdb_transfer_s) { - connClose(server.repl_rdb_transfer_s); - server.repl_rdb_transfer_s = NULL; - } - if (server.repl_transfer_fd != -1) - close(server.repl_transfer_fd); - server.repl_transfer_fd = -1; - server.repl_state = REPL_STATE_CONNECT; - abortRdbConnectionSync(); - return; +error: + connClose(conn); + server.repl_transfer_s = NULL; + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } + if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); + server.repl_transfer_fd = -1; + server.repl_state = REPL_STATE_CONNECT; + abortRdbConnectionSync(); + return; - write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Sending command to primary in dual connection replication handshake: %s", err); - sdsfree(err); - goto error; +write_error: /* Handle sendCommand() errors. */ + serverLog(LL_WARNING, "Sending command to primary in dual connection replication handshake: %s", err); + sdsfree(err); + goto error; } /* Replication: Replica side. @@ -2730,9 +2726,9 @@ void replDataBufInit(void) { void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progress_callback) { time_t now = mstime(); if (server.loading_process_events_interval_bytes && - (offset + readlen) / server.loading_process_events_interval_bytes > offset / server.loading_process_events_interval_bytes && - now - *last_progress_callback > server.loading_process_events_interval_miliseconds) - { + (offset + readlen) / server.loading_process_events_interval_bytes > + offset / server.loading_process_events_interval_bytes && + now - *last_progress_callback > server.loading_process_events_interval_miliseconds) { replicationSendNewlineToPrimary(); processEventsWhileBlocked(); *last_progress_callback = now; @@ -2741,11 +2737,11 @@ void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progres /* Replication: Replica side. * Reads replication data from primary into specified repl buffer block */ -int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { +int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { int nread = connRead(conn, o->buf + o->used, read); if (nread == -1) { if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_VERBOSE, "Error reading from primary: %s",connGetLastError(conn)); + serverLog(LL_VERBOSE, "Error reading from primary: %s", connGetLastError(conn)); cancelReplicationHandshake(1); } return C_ERR; @@ -2773,7 +2769,7 @@ int isReplicaBufferLimitReached(void) { void bufferReplData(connection *conn) { size_t readlen = PROTO_IOBUF_LEN; int read = 0; - + while (readlen > 0) { listNode *ln = listLast(server.pending_repl_data.blocks); replDataBufBlock *tail = ln ? listNodeValue(ln) : NULL; @@ -2792,8 +2788,8 @@ void bufferReplData(connection *conn) { connSetReadHandler(conn, NULL); break; } - /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES. - * Use the same upper boundary as the shared replication buffer (feedReplicationBuffer), + /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES. + * Use the same upper boundary as the shared replication buffer (feedReplicationBuffer), * as they share the same purpose */ size_t usable_size; size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES); @@ -2809,7 +2805,7 @@ void bufferReplData(connection *conn) { read = min(readlen, tail->size); readlen -= read; - read = readIntoReplDataBlock(conn, tail, read); + read = readIntoReplDataBlock(conn, tail, read); } if (read > 0) { /* Stop reading in case we read less than we anticipated */ @@ -2818,7 +2814,7 @@ void bufferReplData(connection *conn) { if (read == C_ERR) { return; } - } + } } /* Replication: Replica side. @@ -2844,7 +2840,7 @@ int streamReplDataBufToDb(client *c) { blockingOperationEnds(); if (!server.pending_repl_data.blocks) { /* If we encounter a `replicaof` command during the replStreamProgressCallback, - * pending_repl_data.blocks will be NULL, and we should return an error and + * pending_repl_data.blocks will be NULL, and we should return an error and * abort the current sync session. */ return C_ERR; } @@ -2879,7 +2875,7 @@ void rdbConnectionSyncPsyncEstablished(connection *conn) { if (server.repl_rdb_conn_state < REPL_RDB_CONN_RDB_LOADED) { /* RDB is still loading */ if (connSetReadHandler(server.repl_provisional_primary.conn, bufferReplData) == C_ERR) { - serverLog(LL_WARNING,"Error while setting readable handler: %s", strerror(errno)); + serverLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno)); cancelReplicationHandshake(1); } replDataBufInit(); @@ -2985,8 +2981,9 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { /* While in rdb-connection-sync, we should use our prepared repl id and offset. */ psync_replid = server.repl_provisional_primary.replid; - snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff+1); - serverLog(LL_NOTICE, "Trying a partial resynchronization using main connection (request %s:%s).", psync_replid, psync_offset); + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); + serverLog(LL_NOTICE, "Trying a partial resynchronization using main connection (request %s:%s).", + psync_replid, psync_offset); } else if (server.cached_primary) { psync_replid = server.cached_primary->replid; snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->reploff + 1); @@ -3133,7 +3130,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { } if (!strncmp(reply, "+DUALCONNECTIONSYNC", 15)) { - /* A response of +DUALCONNECTIONSYNC from the primary implies that partial + /* A response of +DUALCONNECTIONSYNC from the primary implies that partial * synchronization is not possible and that the primary supports full * sync using dedicated RDB connection. Full sync will continue that way. */ server.primary_supports_dual_connection_sync = 1; @@ -3161,10 +3158,10 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { void setupMainConnForPsync(connection *conn) { int psync_result = -1; char llstr[LONG_STR_SIZE]; - char* err = NULL; + char *err = NULL; if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ll2string(llstr,sizeof(llstr), server.rdb_client_id); + ll2string(llstr, sizeof(llstr), server.rdb_client_id); err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); if (err) goto error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; @@ -3175,7 +3172,7 @@ void setupMainConnForPsync(connection *conn) { err = receiveSynchronousResponse(conn); if (err == NULL) goto error; if (err[0] == '-') { - serverLog(LL_NOTICE,"Primary does not understand REPLCONF identify: %s", err); + serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", err); goto error; } sdsfree(err); @@ -3185,27 +3182,28 @@ void setupMainConnForPsync(connection *conn) { if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (server.debug_sleep_after_fork_ms) usleep(server.debug_sleep_after_fork_ms); - if (replicaTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { + if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { serverLog(LL_WARNING, "Aborting dual connection sync. Write error."); cancelReplicationHandshake(1); } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } - psync_result = replicaTryPartialResynchronization(conn,1); + psync_result = replicaTryPartialResynchronization(conn, 1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", - server.repl_rdb_transfer_s != NULL ? ", RDB load in background.":"."); + server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : "."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { - serverCommunicateSystemd("STATUS=Primary <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n"); + serverCommunicateSystemd("STATUS=Primary <-> REPLICA sync: Partial Resynchronization accepted. Ready to " + "accept connections in read-write mode.\n"); } rdbConnectionSyncPsyncEstablished(conn); return; } - error: +error: /* The rdb-conn-sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY. */ serverLog(LL_WARNING, "Aborting dual connection sync. Main connection psync result %d", psync_result); cancelReplicationHandshake(1); @@ -3215,81 +3213,81 @@ void setupMainConnForPsync(connection *conn) { * Dual connection for full sync * * * Motivation * - * - Reduce primary memory load. We do that by moving the COB tracking to the replica side. This also decrease - * the chance for COB overruns. Note that primary's input buffer limits at the replica side are less restricted - * then primary's COB as the replica plays less critical part in the replication group. While increasing the - * primary’s COB may end up with primary reaching swap and clients suffering, at replica side we’re more at + * - Reduce primary memory load. We do that by moving the COB tracking to the replica side. This also decrease + * the chance for COB overruns. Note that primary's input buffer limits at the replica side are less restricted + * then primary's COB as the replica plays less critical part in the replication group. While increasing the + * primary’s COB may end up with primary reaching swap and clients suffering, at replica side we’re more at * ease with it. Larger COB means better chance to sync successfully. - * - Reduce primary main process CPU load. By opening a new, dedicated connection for the RDB transfer, child - * processes can have direct access to the new connection. Due to TLS connection restrictions, this was not - * possible using one main connection. We eliminate the need for the child process to use the primary's + * - Reduce primary main process CPU load. By opening a new, dedicated connection for the RDB transfer, child + * processes can have direct access to the new connection. Due to TLS connection restrictions, this was not + * possible using one main connection. We eliminate the need for the child process to use the primary's * child-proc -> main-proc pipeline, thus freeing up the main process to process clients queries. * * * High level interface design * - * - RDB-connection sync begins when the replica sends a REPLCONF MAINCONN to the primary during initial - * handshake. This allows the replica to verify whether the primary supports rdb-connection sync and, if - * so, state that this is the replica's main connection, which is not used for snapshot transfer. - * - When replica lacks sufficient data for PSYNC, the primary will send +DUALCONNECTIONSYNC response instead - * of RDB data. As a next step, the replica creates a new connection (rdb-connection) and configures it against - * the primary with the appropriate capabilities and requirements. The replica then requests a sync - * using the RDB connection. - * - Prior to forking, the primary sends the replica the snapshot's end repl-offset, and attaches the replica - * to the replication backlog to keep repl data until the replica requests psync. The replica uses the main - * connection to request a PSYNC starting at the snapshot end offset. - * - The primary main threads sends incremental changes via the main connection, while the bgsave process - * sends the RDB directly to the replica via the rdb-connection. As for the replica, the incremental - * changes are stored on a local buffer, while the RDB is loaded into memory. - * - Once the replica completes loading the rdb, it drops the rdb-connection and streams the accumulated incremental + * - RDB-connection sync begins when the replica sends a REPLCONF MAINCONN to the primary during initial + * handshake. This allows the replica to verify whether the primary supports rdb-connection sync and, if + * so, state that this is the replica's main connection, which is not used for snapshot transfer. + * - When replica lacks sufficient data for PSYNC, the primary will send +DUALCONNECTIONSYNC response instead + * of RDB data. As a next step, the replica creates a new connection (rdb-connection) and configures it against + * the primary with the appropriate capabilities and requirements. The replica then requests a sync + * using the RDB connection. + * - Prior to forking, the primary sends the replica the snapshot's end repl-offset, and attaches the replica + * to the replication backlog to keep repl data until the replica requests psync. The replica uses the main + * connection to request a PSYNC starting at the snapshot end offset. + * - The primary main threads sends incremental changes via the main connection, while the bgsave process + * sends the RDB directly to the replica via the rdb-connection. As for the replica, the incremental + * changes are stored on a local buffer, while the RDB is loaded into memory. + * - Once the replica completes loading the rdb, it drops the rdb-connection and streams the accumulated incremental * changes into memory. Repl steady state continues normally. - * - * * Replica state machine * - * ┌───────────────────┐ Dual connection sync - * │RECEIVE_PING_REPLY │ ┌──────────────────────────────────────────────────────────────┐ - * └────────┬──────────┘ │ RDB connection states Main connection state │ - * │+PONG │ ┌────────────────────────────┐ ┌───────────────────┐ │ - * ┌────────▼──────────┐ ┌─┼─────►REPL_RDB_CONN_SEND_HANDSHAKE│ ┌─►SEND_HANDSHAKE │ │ - * │SEND_HANDSHAKE │ │ │ └────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │ │ │REPLCONF set-rdb-client-id - * │ │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ - * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_AUTH_REPLY │ │ │RECEIVE_CAPA_REPLY │ │ - * │RECEIVE_AUTH_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │+OK │ │+OK │ - * │+OK │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ - * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_REPLCONF_REPLY│ │ │SEND_PSYNC │ │ - * │RECEIVE_PORT_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ - * └────────┬──────────┘ │ │ │+OK │ │PSYNC use snapshot │ - * │+OK │ │ ┌───────▼───────────────┐ │ │end-offset provided │ - * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_ENDOFF│ │ │by the primary │ - * │RECEIVE_IP_REPLY │ │ │ └───────┬───────────────┘ │ ┌──▼────────────────┐ │ - * └────────┬──────────┘ │ │ │$ENDOFF │ │RECEIVE_PSYNC_REPLY│ │ - * │+OK │ │ ├─────────────────────────┘ └──┬────────────────┘ │ - * ┌────────▼──────────┐ │ │ │ │+CONTINUE │ - * │RECEIVE_IP_REPLY │ │ │ ┌───────▼───────────────┐ ┌──▼────────────────┐ │ - * └────────┬──────────┘ │ │ │RDB_CONN_RDB_LOAD │ │TRANSFER │ │ - * │+OK │ │ └───────┬───────────────┘ └─────┬─────────────┘ │ - * ┌────────▼────────────────┐ │ │ │Done loading │ │ - * │RECEIVE_NO_FULLSYNC_REPLY│ │ │ ┌───────▼───────────────┐ │ │ - * └─┬────┬──────────────────┘ │ │ │RDB_CONN_RDB_LOADED │ │ │ - * │+OK │Unrecognized REPLCONF│ │ └───────┬───────────────┘ │ │ - * ┌─▼────▼────────────┐ │ │ │ │ │ - * │RECEIVE_CAPA_REPLY │ │ │ │Replica loads local replication │ │ - * └────────┬──────────┘ │ │ │buffer into memory │ │ - * │ │ │ └──────────────┬──────────────────┘ │ - * ┌────────▼───┐ │ │ │ │ - * │SEND_PSYNC │ │ └─────────────────────────┼────────────────────────────────────┘ - * └─┬──────────┘ │ │ - * │PSYNC (use cached-primary)│ │ - * ┌─▼─────────────────┐ │ │ - * │RECEIVE_PSYNC_REPLY│ │ │ - * └────────┬─┬────────┘ │ │ - * +CONTINUE│ │+DUALCONNECTIONSYNC │ - * │ │ └─────────────────┘ │ - * │ │+FULLRESYNC │ - * │ ┌─▼─────────────────┐ ┌─────────▼─────────┐ - * │ │TRANSFER ├───────────────────►CONNECTED │ - * │ └───────────────────┘ └─────────▲─────────┘ - * │ │ - * └──────────────────────────────────────────────────────┘ + * + * * Replica state machine * + * ┌───────────────────┐ Dual connection sync + * │RECEIVE_PING_REPLY │ ┌──────────────────────────────────────────────────────────────┐ + * └────────┬──────────┘ │ RDB connection states Main connection state │ + * │+PONG │ ┌────────────────────────────┐ ┌───────────────────┐ │ + * ┌────────▼──────────┐ ┌─┼─────►REPL_RDB_CONN_SEND_HANDSHAKE│ ┌─►SEND_HANDSHAKE │ │ + * │SEND_HANDSHAKE │ │ │ └────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │ │ │REPLCONF set-rdb-client-id + * │ │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_AUTH_REPLY │ │ │RECEIVE_CAPA_REPLY │ │ + * │RECEIVE_AUTH_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │+OK │ │+OK │ + * │+OK │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_REPLCONF_REPLY│ │ │SEND_PSYNC │ │ + * │RECEIVE_PORT_REPLY │ │ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ + * └────────┬──────────┘ │ │ │+OK │ │PSYNC use snapshot │ + * │+OK │ │ ┌───────▼───────────────┐ │ │end-offset provided │ + * ┌────────▼──────────┐ │ │ │RDB_CONN_RECEIVE_ENDOFF│ │ │by the primary │ + * │RECEIVE_IP_REPLY │ │ │ └───────┬───────────────┘ │ ┌──▼────────────────┐ │ + * └────────┬──────────┘ │ │ │$ENDOFF │ │RECEIVE_PSYNC_REPLY│ │ + * │+OK │ │ ├─────────────────────────┘ └──┬────────────────┘ │ + * ┌────────▼──────────┐ │ │ │ │+CONTINUE │ + * │RECEIVE_IP_REPLY │ │ │ ┌───────▼───────────────┐ ┌──▼────────────────┐ │ + * └────────┬──────────┘ │ │ │RDB_CONN_RDB_LOAD │ │TRANSFER │ │ + * │+OK │ │ └───────┬───────────────┘ └─────┬─────────────┘ │ + * ┌────────▼────────────────┐ │ │ │Done loading │ │ + * │RECEIVE_NO_FULLSYNC_REPLY│ │ │ ┌───────▼───────────────┐ │ │ + * └─┬────┬──────────────────┘ │ │ │RDB_CONN_RDB_LOADED │ │ │ + * │+OK │Unrecognized REPLCONF│ │ └───────┬───────────────┘ │ │ + * ┌─▼────▼────────────┐ │ │ │ │ │ + * │RECEIVE_CAPA_REPLY │ │ │ │Replica loads local replication │ │ + * └────────┬──────────┘ │ │ │buffer into memory │ │ + * │ │ │ └──────────────┬──────────────────┘ │ + * ┌────────▼───┐ │ │ │ │ + * │SEND_PSYNC │ │ └─────────────────────────┼────────────────────────────────────┘ + * └─┬──────────┘ │ │ + * │PSYNC (use cached-primary)│ │ + * ┌─▼─────────────────┐ │ │ + * │RECEIVE_PSYNC_REPLY│ │ │ + * └────────┬─┬────────┘ │ │ + * +CONTINUE│ │+DUALCONNECTIONSYNC │ + * │ │ └─────────────────┘ │ + * │ │+FULLRESYNC │ + * │ ┌─▼─────────────────┐ ┌─────────▼─────────┐ + * │ │TRANSFER ├───────────────────►CONNECTED │ + * │ └───────────────────┘ └─────────▲─────────┘ + * │ │ + * └──────────────────────────────────────────────────────┘ */ /* This handler fires when the non blocking connect was able to * establish a connection with the primary. */ @@ -3374,8 +3372,7 @@ void syncWithPrimary(connection *conn) { * replica listening port correctly. */ { sds portstr = getReplicaPortString(); - err = sendCommand(conn,"REPLCONF", - "listening-port",portstr, NULL); + err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); sdsfree(portstr); if (err) goto write_error; } @@ -3391,7 +3388,7 @@ void syncWithPrimary(connection *conn) { /* When using rdb-connection for sync, announce that the replica is capable * of dual connection sync. */ if (server.dual_conn_enabled) { - err = sendCommand(conn,"REPLCONF", "capa" ,"dual-conn", NULL); + err = sendCommand(conn, "REPLCONF", "capa", "dual-conn", NULL); } /* Inform the primary of our (replica) capabilities. @@ -3472,9 +3469,10 @@ void syncWithPrimary(connection *conn) { if (server.repl_state == REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY) { err = receiveSynchronousResponse(conn); - if (err == NULL) goto error; + if (err == NULL) + goto error; else if (err[0] == '-') { - serverLog(LL_NOTICE,"(Non critical) Primary is not capable of rdb-connection sync"); + serverLog(LL_NOTICE, "(Non critical) Primary is not capable of rdb-connection sync"); } sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; @@ -3599,32 +3597,29 @@ void syncWithPrimary(connection *conn) { server.repl_transfer_fd = dfd; } - /* Using rdb-connection sync, the primary responded +DUALCONNECTIONSYNC. We need to + /* Using rdb-connection sync, the primary responded +DUALCONNECTIONSYNC. We need to * initialize the RDB connection. */ if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) { /* Create a full sync connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); - if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, - server.bind_source_addr, fullSyncWithPrimary) == C_ERR) { - serverLog(LL_WARNING,"Unable to connect to Primary: %s", - connGetLastError(server.repl_transfer_s)); + if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr, + fullSyncWithPrimary) == C_ERR) { + serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; goto error; } if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, - "Can't clear main connection handler: %s (%s)", - strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); + serverLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_rdb_conn_state = REPL_RDB_CONN_SEND_HANDSHAKE; return; } /* Setup the non blocking download of the bulk file. */ - if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) - { + if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); @@ -3693,8 +3688,7 @@ void undoConnectWithPrimary(void) { * Never call this function directly, use cancelReplicationHandshake() instead. */ void replicationAbortSyncTransfer(void) { - serverAssert(server.repl_state == REPL_STATE_TRANSFER || - server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); + serverAssert(server.repl_state == REPL_STATE_TRANSFER || server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); undoConnectWithPrimary(); if (server.repl_transfer_fd != -1) { close(server.repl_transfer_fd); @@ -3780,7 +3774,7 @@ void replicationSetPrimary(char *ip, int port) { moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL); server.repl_state = REPL_STATE_CONNECT; - /* Allow trying rdb-connection sync with the new primary. If newprimaryr doesn't + /* Allow trying rdb-connection sync with the new primary. If newprimaryr doesn't * support rdb-connection sync, we will set to 0 afterwards. */ server.primary_supports_dual_connection_sync = -1; serverLog(LL_NOTICE, "Connecting to Primary %s:%d", server.primary_host, server.primary_port); @@ -4108,10 +4102,10 @@ void replicationDiscardCachedPrimary(void) { /* Replication: Replica side. * This method performs the necessary steps to establish a connection with the primary server. - * It sets private data, updates flags, and fires an event to notify modules about the primary link change. */ + * It sets private data, updates flags, and fires an event to notify modules about the primary link change. */ void establishPrimaryConnection(void) { connSetPrivateData(server.primary->conn, server.primary); - server.primary->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); + server.primary->flags &= ~(CLIENT_CLOSE_AFTER_REPLY | CLIENT_CLOSE_ASAP); server.primary->flags |= CLIENT_AUTHENTICATED; server.primary->last_interaction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; @@ -4119,7 +4113,6 @@ void establishPrimaryConnection(void) { /* Fire the primary link modules event. */ moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); - } /* Replication: Replica side. @@ -4133,7 +4126,7 @@ void replicationResurrectCachedPrimary(connection *conn) { server.primary = server.cached_primary; server.cached_primary = NULL; server.primary->conn = conn; - + establishPrimaryConnection(); /* Re-add to the list of clients. */ linkClient(server.primary); @@ -4165,7 +4158,8 @@ void replicationSteadyStateInit(void) { * Turn the provisional primary into the current primary. * This function is called after rdb-connection sync is finished successfully. */ void replicationResurrectProvisionalPrimary(void) { - /* Create a primary client, but do not initialize the read handler yet, as this replica still has a local buffer to drain. */ + /* Create a primary client, but do not initialize the read handler yet, as this replica still has a local buffer to + * drain. */ replicationCreatePrimaryClientWithHandler(server.repl_transfer_s, server.repl_provisional_primary.dbid, NULL); memcpy(server.primary->replid, server.repl_provisional_primary.replid, CONFIG_RUN_ID_SIZE); server.primary->reploff = server.repl_provisional_primary.reploff; @@ -4627,7 +4621,7 @@ void replicationCron(void) { if (listLength(server.repl_buffer_blocks) > 0) { replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); serverAssert(o->refcount > 0 && - o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync)); + o->refcount <= (int)listLength(server.replicas) + 1 + (int)raxSize(server.replicas_waiting_psync)); } /* Refresh the number of replicas with lag <= min-replicas-max-lag. */ diff --git a/src/rio.c b/src/rio.c index 35aa72bb55..ffe95002bc 100644 --- a/src/rio.c +++ b/src/rio.c @@ -55,7 +55,7 @@ #include "crc64.h" #include "config.h" #include "server.h" - #include "connhelpers.h" +#include "connhelpers.h" /* ------------------------- Buffer I/O implementation ----------------------- */ @@ -508,26 +508,26 @@ size_t rioWriteBulkDouble(rio *r, double d) { static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) { ssize_t retval; int j; - unsigned char *p = (unsigned char*) buf; + unsigned char *p = (unsigned char *)buf; int doflush = (buf == NULL && len == 0); /* To start we always append to our buffer. If it gets larger than * a given size, we actually write to the sockets. */ if (len) { - r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len); + r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len); len = 0; /* Prevent entering the while below if we don't flush. */ if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1; } if (doflush) { - p = (unsigned char*) r->io.connset.buf; + p = (unsigned char *)r->io.connset.buf; len = sdslen(r->io.connset.buf); } /* Write in little chunchs so that when there are big writes we * parallelize while the kernel is sending data in background to * the TCP socket. */ - while(len) { + while (len) { size_t count = len < 1024 ? len : 1024; int broken = 0; for (j = 0; j < r->io.connset.numconns; j++) { @@ -540,8 +540,8 @@ static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) { /* Make sure to write 'count' bytes to the socket regardless * of short writes. */ size_t nwritten = 0; - while(nwritten != count) { - retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten); + while (nwritten != count) { + retval = connWrite(r->io.connset.conns[j], p + nwritten, count - nwritten); if (retval <= 0) { /* With blocking sockets, which is the sole user of this * rio target, EWOULDBLOCK is returned only because of @@ -587,7 +587,7 @@ static off_t rioConnsetTell(rio *r) { static int rioConnsetFlush(rio *r) { /* Our flush is implemented by the write method, that recognizes a * buffer set to NULL with a count of zero as a flush request. */ - return rioConnsetWrite(r,NULL,0); + return rioConnsetWrite(r, NULL, 0); } static const rio rioConnsetIO = { @@ -595,21 +595,21 @@ static const rio rioConnsetIO = { rioConnsetWrite, rioConnsetTell, rioConnsetFlush, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* flags */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* flags */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + {{NULL, 0}} /* union for io-specific vars */ }; void rioInitWithConnset(rio *r, connection **conns, int numconns) { *r = rioConnsetIO; - r->io.connset.conns = zmalloc(sizeof(connection*) * numconns); + r->io.connset.conns = zmalloc(sizeof(connection *) * numconns); r->io.connset.state = zmalloc(sizeof(int) * numconns); - for (int i = 0; i < numconns; i++) { + for (int i = 0; i < numconns; i++) { connIncrRefs(conns[i]); r->io.connset.conns[i] = conns[i]; r->io.connset.state[i] = 0; @@ -622,10 +622,10 @@ void rioInitWithConnset(rio *r, connection **conns, int numconns) { /* release the rio stream. */ void rioFreeConnset(rio *r) { - for (int i = 0; i < r->io.connset.numconns; i++) { + for (int i = 0; i < r->io.connset.numconns; i++) { connection *conn = r->io.connset.conns[i]; connDecrRefs(conn); - callHandler(conn, NULL); // trigger close/free if necessary + callHandler(conn, NULL); // trigger close/free if necessary } zfree(r->io.connset.conns); diff --git a/src/rio.h b/src/rio.h index 5111570ba7..698dcb66d1 100644 --- a/src/rio.h +++ b/src/rio.h @@ -99,7 +99,7 @@ struct _rio { } fd; /* Multiple connections target (used to write to N sockets). */ struct { - connection **conns; /* Connections */ + connection **conns; /* Connections */ int *state; /* Error state of each fd. 0 (if ok) or errno. */ int numconns; off_t pos; diff --git a/src/server.c b/src/server.c index 5265de1945..7c8c2482dc 100644 --- a/src/server.c +++ b/src/server.c @@ -5160,8 +5160,7 @@ const char *replstateToString(int replstate) { switch (replstate) { case REPLICA_STATE_WAIT_BGSAVE_START: case REPLICA_STATE_WAIT_BGSAVE_END: return "wait_bgsave"; - case REPLICA_STATE_BG_RDB_LOAD: - return "bg_transfer"; + case REPLICA_STATE_BG_RDB_LOAD: return "bg_transfer"; case REPLICA_STATE_SEND_BULK: return "send_bulk"; case REPLICA_STATE_ONLINE: return "online"; default: return ""; @@ -5797,8 +5796,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "slave%d:ip=%s,port=%d,state=%s," "offset=%lld,lag=%ld,type=%s\r\n", replica_id, replica_ip, replica->replica_listening_port, state, - replica->repl_ack_off, lag, replica->flags & CLIENT_REPL_RDB_CONN ? "rdb-conn": - replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-conn": "replica"); + replica->repl_ack_off, lag, + replica->flags & CLIENT_REPL_RDB_CONN ? "rdb-conn" + : replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-conn" + : "replica"); replica_id++; } } diff --git a/src/server.h b/src/server.h index e88cd4c1b1..f1ac269dc0 100644 --- a/src/server.h +++ b/src/server.h @@ -144,7 +144,7 @@ struct hdr_histogram; 60 /* Grace period in seconds for replica main \ connection to establish psync. */ #define INCREMENTAL_REHASHING_THRESHOLD_US 1000 -#define LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT 100 /* Default: 0.1 seconds */ +#define LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT 100 /* Default: 0.1 seconds */ /* Bucket sizes for client eviction pools. Each bucket stores clients with * memory usage of up to twice the size of the bucket below it. */ @@ -435,13 +435,13 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */ #define CLIENT_REPL_MAIN_CONN \ - (1ULL << 52) /* Dual connection sync: track a connection \ + (1ULL << 52) /* Dual connection sync: track a connection \ which is used for online replication data */ #define CLIENT_REPL_RDB_CONN \ - (1ULL << 53) /* Dual connection sync: track a connection \ + (1ULL << 53) /* Dual connection sync: track a connection \ which is used for rdb snapshot */ #define CLIENT_PROTECTED_RDB_CONN \ - (1ULL << 54) /* Dual connection sync: Protects the RDB client from premature \ + (1ULL << 54) /* Dual connection sync: Protects the RDB client from premature \ * release during full sync. This flag is used to ensure that the RDB client, which \ * references the first replication data block required by the replica, is not \ * released prematurely. Protecting the client is crucial for prevention of \ @@ -542,8 +542,8 @@ typedef enum { /* Replica capabilities. */ #define REPLICA_CAPA_NONE 0 -#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ -#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ +#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ +#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ #define REPLICA_CAPA_DUAL_CONN (1 << 2) /* Supports dual connection sync */ /* Replica requirements */ @@ -1919,41 +1919,41 @@ struct valkeyServer { default no. (for testings). */ /* RDB persistence */ - long long dirty; /* Changes to DB from the last save */ - long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ - long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ - long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ - struct saveparam *saveparams; /* Save points array for RDB */ - int saveparamslen; /* Number of saving points */ - char *rdb_filename; /* Name of RDB file */ - int rdb_compression; /* Use compression in RDB? */ - int rdb_checksum; /* Use RDB checksum? */ - int rdb_del_sync_files; /* Remove RDB files used only for SYNC if - the instance does not use persistence. */ - time_t lastsave; /* Unix time of last successful save */ - time_t lastbgsave_try; /* Unix time of last attempted bgsave */ - time_t rdb_save_time_last; /* Time used by last RDB save run. */ - time_t rdb_save_time_start; /* Current RDB save start time. */ - int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ - int rdb_child_type; /* Type of save by active child. */ - int lastbgsave_status; /* C_OK or C_ERR */ - int primary_supports_dual_connection_sync; /* Track whether the primary is able to sync using rdb connection. - * -1 = unknown, 0 = no, 1 = yes. */ - int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ - int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ - /* to the parent process in diskless repl. */ - int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */ - connection **rdb_pipe_conns; /* Connections which are currently the */ - int rdb_pipe_numconns; /* target of diskless rdb fork child. */ - int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */ - char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ - int rdb_pipe_bufflen; /* that was read from the rdb pipe. */ - int rdb_key_save_delay; /* Delay in microseconds between keys while - * writing aof or rdb. (for testings). negative - * value means fractions of microseconds (on average). */ - int key_load_delay; /* Delay in microseconds between keys while - * loading aof or rdb. (for testings). negative - * value means fractions of microseconds (on average). */ + long long dirty; /* Changes to DB from the last save */ + long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ + long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ + long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ + struct saveparam *saveparams; /* Save points array for RDB */ + int saveparamslen; /* Number of saving points */ + char *rdb_filename; /* Name of RDB file */ + int rdb_compression; /* Use compression in RDB? */ + int rdb_checksum; /* Use RDB checksum? */ + int rdb_del_sync_files; /* Remove RDB files used only for SYNC if + the instance does not use persistence. */ + time_t lastsave; /* Unix time of last successful save */ + time_t lastbgsave_try; /* Unix time of last attempted bgsave */ + time_t rdb_save_time_last; /* Time used by last RDB save run. */ + time_t rdb_save_time_start; /* Current RDB save start time. */ + int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ + int rdb_child_type; /* Type of save by active child. */ + int lastbgsave_status; /* C_OK or C_ERR */ + int primary_supports_dual_connection_sync; /* Track whether the primary is able to sync using rdb connection. + * -1 = unknown, 0 = no, 1 = yes. */ + int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ + int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ + /* to the parent process in diskless repl. */ + int rdb_child_exit_pipe; /* Used by the diskless parent allow child exit. */ + connection **rdb_pipe_conns; /* Connections which are currently the */ + int rdb_pipe_numconns; /* target of diskless rdb fork child. */ + int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */ + char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ + int rdb_pipe_bufflen; /* that was read from the rdb pipe. */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing aof or rdb. (for testings). negative + * value means fractions of microseconds (on average). */ + int key_load_delay; /* Delay in microseconds between keys while + * loading aof or rdb. (for testings). negative + * value means fractions of microseconds (on average). */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ int child_info_nread; /* Num of bytes of the last read from pipe */