diff --git a/src/config.c b/src/config.c index 38e6d1d2fb..05a23c817c 100644 --- a/src/config.c +++ b/src/config.c @@ -3088,6 +3088,7 @@ standardConfig static_configs[] = { createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL), createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL), createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL), + createBoolConfig("repl-rdb-channel", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.rdb_channel_enabled, 0, NULL, NULL), createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL), createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL), createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL), @@ -3252,6 +3253,7 @@ standardConfig static_configs[] = { /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ + createTimeTConfig("loading-process-events-interval-ms", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, LONG_MAX, server.loading_process_events_interval_ms, 100, INTEGER_CONFIG, NULL, NULL), /* Default: 0.1 seconds */ createOffTConfig("auto-aof-rewrite-min-size", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.aof_rewrite_min_size, 64*1024*1024, MEMORY_CONFIG, NULL, NULL), createOffTConfig("loading-process-events-interval-bytes", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1024, INT_MAX, server.loading_process_events_interval_bytes, 1024*1024*2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/debug.c b/src/debug.c index 3142389172..db1c701a16 100644 --- a/src/debug.c +++ b/src/debug.c @@ 
-496,6 +496,10 @@ void debugCommand(client *c) { " In case RESET is provided the peak reset time will be restored to the default value", "REPLYBUFFER RESIZING <0|1>", " Enable or disable the reply buffer resize cron job", +"SLEEP-AFTER-FORK ", +" Stop the server's main process for after forking.", +"WAIT-BEFORE-RDB-CLIENT-FREE ", +" Grace period in seconds for replica main channel to establish psync.", "DICT-RESIZING <0|1>", " Enable or disable the main dict and expire dict resizing.", NULL @@ -1022,6 +1026,14 @@ NULL return; } addReply(c, shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"SLEEP-AFTER-FORK") && + c->argc == 3) { + server.debug_sleep_after_fork = atoi(c->argv[2]->ptr); + addReply(c,shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"WAIT-BEFORE-RDB-CLIENT-FREE") && + c->argc == 3) { + server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr); + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) { server.dict_resizing = atoi(c->argv[2]->ptr); addReply(c, shared.ok); diff --git a/src/networking.c b/src/networking.c index 5aa02e8315..e64cc81b07 100644 --- a/src/networking.c +++ b/src/networking.c @@ -183,6 +183,8 @@ client *createClient(connection *conn) { c->slave_addr = NULL; c->slave_capa = SLAVE_CAPA_NONE; c->slave_req = SLAVE_REQ_NONE; + c->associated_rdb_client_id = 0; + c->rdb_client_disconnect_time = 0; c->reply = listCreate(); c->deferred_reply_errors = NULL; c->reply_bytes = 0; @@ -237,6 +239,11 @@ void installClientWriteHandler(client *c) { } } +/* Determining whether a replica requires online data updates based on its state */ +int isReplDataRequired(client *c) { + return c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_BG_RDB_LOAD; +} + /* This function puts the client in the queue of clients that should write * their output buffers to the socket. 
Note that it does not *yet* install * the write handler, to start clients are put in a queue of clients that need @@ -250,7 +257,7 @@ void putClientInPendingWriteQueue(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || - (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) + (isReplDataRequired(c) && !c->repl_start_cmd_stream_on_ack))) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something @@ -1569,7 +1576,7 @@ void freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED) { + if ((c->flags & CLIENT_PROTECTED) || (c->flags & CLIENT_PROTECTED_RDB_CHANNEL)) { freeClientAsync(c); return; } @@ -1700,6 +1707,9 @@ void freeClient(client *c) { moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, NULL); + if (c->flags & CLIENT_REPL_RDB_CHANNEL) { + uint64_t id = htonu64(c->id); + raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); } } /* Master/slave cleanup Case 2: @@ -1802,6 +1812,18 @@ int freeClientsInAsyncFreeQueue(void) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); + if (c->flags & CLIENT_PROTECTED_RDB_CHANNEL) { + /* Check if we can remove RDB connection protection. */ + if (!c->rdb_client_disconnect_time) { + c->rdb_client_disconnect_time = server.unixtime; + continue; + } + if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) { + serverLog(LL_NOTICE, "Replica main connection failed to establish PSYNC within the grace period (%ld seconds). 
Freeing RDB client %llu.", (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + c->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL; + } + } + if (c->flags & CLIENT_PROTECTED) continue; c->flags &= ~CLIENT_CLOSE_ASAP; @@ -1822,6 +1844,15 @@ client *lookupClientByID(uint64_t id) { return c; } +/* Return a client by ID, or NULL if the client ID is not in the set + * of slaves waiting psync clients. */ +client *lookupRdbClientByID(uint64_t id) { + id = htonu64(id); + void *c = NULL; + raxFind(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),&c); + return c; +} + /* This function should be called from _writeToClient when the reply list is not empty, * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, @@ -2644,6 +2675,9 @@ void readQueryFromClient(connection *conn) { int nread, big_arg = 0; size_t qblen, readlen; + /* If the replica RDB client is marked as closed ASAP, do not try to read from it */ + if ((c->flags & CLIENT_CLOSE_ASAP) && (c->flags & CLIENT_PROTECTED_RDB_CHANNEL)) return; + /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ if (postponeClientRead(c)) return; @@ -2705,6 +2739,9 @@ void readQueryFromClient(connection *conn) { if (server.verbosity <= LL_VERBOSE) { sds info = catClientInfoString(sdsempty(), c); serverLog(LL_VERBOSE, "Client closed connection %s", info); + if (c->flags & CLIENT_PROTECTED_RDB_CHANNEL) { + serverLog(LL_VERBOSE, "Postpone RDB client (%llu) free for %d seconds", (unsigned long long)c->id, server.wait_before_rdb_client_free); + } sdsfree(info); } freeClientAsync(c); @@ -3975,10 +4012,11 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { /* Note that c->reply_bytes is irrelevant for replica clients * (they use the global repl buffers). 
*/ if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) || - c->flags & CLIENT_CLOSE_ASAP) return 0; + (c->flags & CLIENT_CLOSE_ASAP && !(c->flags & CLIENT_PROTECTED_RDB_CHANNEL))) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); - + /* Remove RDB connection protection on COB overrun */ + c->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL; if (async) { freeClientAsync(c); serverLog(LL_WARNING, @@ -4025,7 +4063,8 @@ void flushSlavesOutputBuffers(void) { * * 3. Obviously if the slave is not ONLINE. */ - if (slave->replstate == SLAVE_STATE_ONLINE && + if ((slave->replstate == SLAVE_STATE_ONLINE || + slave->replstate == SLAVE_STATE_BG_RDB_LOAD) && !(slave->flags & CLIENT_CLOSE_ASAP) && can_receive_writes && !slave->repl_start_cmd_stream_on_ack && diff --git a/src/rdb.c b/src/rdb.c index 0995907e62..fe9884842e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3547,6 +3547,7 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { listIter li; pid_t childpid; int pipefds[2], rdb_pipe_write, safe_to_exit_pipe; + int direct = (req & SLAVE_REQ_RDB_CHANNEL); if (hasActiveChildProcess()) return C_ERR; @@ -3574,9 +3575,15 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { /* Collect the connections of the replicas we want to transfer * the RDB to, which are i WAIT_BGSAVE_START state. */ - server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves)); - server.rdb_pipe_numconns = 0; - server.rdb_pipe_numconns_writing = 0; + int connsnum = 0; + connection **conns = zmalloc(sizeof(connection *)*listLength(server.slaves)); + server.rdb_pipe_conns = NULL; + if (!direct) { + server.rdb_pipe_conns = conns; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; + } + /* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). 
*/ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; @@ -3584,7 +3591,19 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { /* Check slave has the exact requirements */ if (slave->slave_req != req) continue; - server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn; + conns[connsnum++] = slave->conn; + if (direct) { + /* Put the socket in blocking mode to simplify RDB transfer. */ + connBlock(slave->conn); + connSendTimeout(slave->conn, server.repl_timeout * 1000); + /* This replica uses diskless rdb channel sync, hence we need + * to inform it with the save end offset.*/ + sendCurrentOffsetToReplica(slave); + /* Make sure repl traffic is appended to the replication backlog */ + addSlaveToPsyncWaitingRax(slave); + } else { + server.rdb_pipe_numconns++; + } replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); } } @@ -3594,8 +3613,11 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { /* Child */ int retval, dummy; rio rdb; - - rioInitWithFd(&rdb,rdb_pipe_write); + if (direct) { + rioInitWithConnset(&rdb, conns, connsnum); + } else { + rioInitWithFd(&rdb,rdb_pipe_write); + } /* Close the reading part, so that if the parent crashes, the child will * get a write error and exit. */ @@ -3614,8 +3636,12 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { if (retval == C_OK) { sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); } - - rioFreeFd(&rdb); + if (direct) { + rioFreeConnset(&rdb); + } else { + rioFreeFd(&rdb); + } + zfree(conns); /* wake up the reader, tell it we're done. */ close(rdb_pipe_write); close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. 
*/ @@ -3642,20 +3668,24 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { } close(rdb_pipe_write); close(server.rdb_pipe_read); - close(server.rdb_child_exit_pipe); - zfree(server.rdb_pipe_conns); - server.rdb_pipe_conns = NULL; - server.rdb_pipe_numconns = 0; - server.rdb_pipe_numconns_writing = 0; + zfree(conns); + if (direct) { + closeChildInfoPipe(); + } else { + server.rdb_pipe_conns = NULL; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; + } } else { - serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld", - (long) childpid); + serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld to %s", + (long) childpid, direct? "replica socket" : "pipe"); server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); } + if (direct) zfree(conns); } close(safe_to_exit_pipe); return (childpid == -1) ? 
C_ERR : C_OK; diff --git a/src/replication.c b/src/replication.c index f53fdc9160..1fb944ab43 100644 --- a/src/replication.c +++ b/src/replication.c @@ -44,10 +44,17 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); +void replicationResurrectProvisionalMaster(void); void replicationSendAck(void); int replicaPutOnline(client *slave); void replicaStartCommandStream(client *slave); int cancelReplicationHandshake(int reconnect); +void syncWithMaster(connection *conn); +void replicationSteadyStateInit(void); +void setupMainConnForPsync(connection *conn); +void completeTaskRDBChannelSyncMainConn(connection *conn); +void completeTaskRDBChannelSyncRdbConn(connection *conn); +void replicationAbortSyncTransfer(void); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -197,6 +204,76 @@ void rebaseReplicationBuffer(long long base_repl_offset) { } } +/* Replication: Master side - connections association. + * On rdb-channel sync, connection association is used to keep replication data in + * the backlog until the replica requests PSYNC. Association happens in two forms, + * if there's an existing buffer block at the fork time, the replica is attached + * to the tail, if there is no tail, the replica will be attached when a new + * buffer block is created (see the Retrospect function below). + * Replica rdb client id is used as a unique key for the association. + * On COB overrun, association is deleted and the RDB connection + * is dropped. + */ +void addSlaveToPsyncWaitingRax(client* slave) { + listNode *ln = NULL; + replBufBlock *tail = NULL; + if (server.repl_backlog == NULL) { + createReplicationBacklog(); + } else { + ln = listLast(server.repl_buffer_blocks); + tail = ln ? 
listNodeValue(ln) : NULL; + if (tail) { + tail->refcount++; + } + } + serverLog(LL_DEBUG, "Add slave %s to waiting psync rax, with cid %llu, %s ", replicationGetSlaveName(slave), (long long unsigned int)slave->id, + tail? "with repl-backlog tail": "repl-backlog is empty"); + slave->ref_repl_buf_node = tail? ln: NULL; + /* Prevent rdb client from being freed before psync is established. */ + slave->flags |= CLIENT_PROTECTED_RDB_CHANNEL; + uint64_t id = htonu64(slave->id); + raxInsert(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),slave,NULL); +} + +/* Attach waiting psync replicas with new replication backlog head. */ +void addSlaveToPsyncWaitingRaxRetrospect(void) { + listNode *ln = listFirst(server.repl_buffer_blocks); + replBufBlock *head = ln ? listNodeValue(ln) : NULL; + raxIterator iter; + + if (head == NULL) return; + /* Update waiting psync slaves to wait on new buffer block */ + raxStart(&iter,server.slaves_waiting_psync); + raxSeek(&iter, "^", NULL, 0); + while(raxNext(&iter)) { + client* slave = iter.data; + if (slave->ref_repl_buf_node) continue; + slave->ref_repl_buf_node = ln; + head->refcount++; + serverLog(LL_DEBUG, "Retrospect attach slave %llu to repl buf block", (long long unsigned int)slave->id); + } + raxStop(&iter); +} + +void removeSlaveFromPsyncWaitingRax(client* slave) { + listNode *ln; + replBufBlock *o; + /* Get replBufBlock pointed by this replica */ + client *peer_slave = lookupRdbClientByID(slave->associated_rdb_client_id); + ln = peer_slave->ref_repl_buf_node; + o = ln ? listNodeValue(ln) : NULL; + if (o != NULL) { + serverAssert(o->refcount > 0); + o->refcount--; + } + peer_slave->ref_repl_buf_node = NULL; + peer_slave->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL; + serverLog(LL_DEBUG, "Remove psync waiting slave %s with cid %llu, repl buffer block %s", + replicationGetSlaveName(slave), (long long unsigned int)slave->associated_rdb_client_id, o? 
"ref count decreased": "doesn't exist"); + uint64_t id = htonu64(peer_slave->id); + raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); +} + void resetReplicationBuffer(void) { server.repl_buffer_mem = 0; server.repl_buffer_blocks = listCreate(); @@ -323,7 +400,8 @@ void freeReplicaReferencedReplBuffer(client *replica) { replica->ref_block_pos = 0; } -/* Append bytes into the global replication buffer list, replication backlog and +/* Replication: Master side. + * Append bytes into the global replication buffer list, replication backlog and * all replica clients use replication buffers collectively, this function replace * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, * First we add buffer into global replication buffer block list, and then @@ -339,6 +417,7 @@ void feedReplicationBuffer(char *s, size_t len) { int add_new_block = 0; /* Create new block if current block is total used. */ listNode *ln = listLast(server.repl_buffer_blocks); replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + int empty_backlog = (tail == NULL); /* Append to tail string when possible. */ if (tail && tail->size > tail->used) { @@ -386,13 +465,18 @@ void feedReplicationBuffer(char *s, size_t len) { server.master_repl_offset += copy; server.repl_backlog->histlen += copy; } + if (empty_backlog && raxSize(server.slaves_waiting_psync) > 0) { + /* Increase refcount for pending replicas. */ + addSlaveToPsyncWaitingRaxRetrospect(); + } /* For output buffer of replicas. */ listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (!canFeedReplicaReplBuffer(slave)) continue; + if (!canFeedReplicaReplBuffer(slave) && + !(slave->flags & CLIENT_PROTECTED_RDB_CHANNEL)) continue; /* Update shared replication buffer start position. 
*/ if (slave->ref_repl_buf_node == NULL) { @@ -418,7 +502,6 @@ void feedReplicationBuffer(char *s, size_t len) { } if (add_new_block) { createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); - /* It is important to trim after adding replication data to keep the backlog size close to * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated * unnecessary trimming attempts when small amounts of data are added. See comments in @@ -788,12 +871,20 @@ int masterTryPartialResynchronization(client *c, long long psync_offset) { goto need_full_resync; } - /* If we reached this point, we are able to perform a partial resync: + /* There are two scenarios that lead to this point. One is that we are able + * to perform a partial resync with the replica. The second is that the replica + * is using rdb-connection sync, while loading the snapshot in the background. + * in both cases: * 1) Set client state to make it a slave. * 2) Inform the client we can continue with +CONTINUE * 3) Send the backlog data (from the offset to the end) to the slave. */ c->flags |= CLIENT_SLAVE; - c->replstate = SLAVE_STATE_ONLINE; + if (c->flags & CLIENT_REPL_MAIN_CHANNEL && lookupRdbClientByID(c->associated_rdb_client_id)) { + c->replstate = SLAVE_STATE_BG_RDB_LOAD; + removeSlaveFromPsyncWaitingRax(c); + } else { + c->replstate = SLAVE_STATE_ONLINE; + } c->repl_ack_time = server.unixtime; c->repl_start_cmd_stream_on_ack = 0; listAddNodeTail(server.slaves,c); @@ -875,11 +966,12 @@ int startBgsaveForReplication(int mincapa, int req) { * otherwise slave will miss repl-stream-db. 
*/ if (rsiptr) { if (socket_target) - retval = rdbSaveToSlavesSockets(req,rsiptr); + retval = rdbSaveToSlavesSockets(req, rsiptr); else { /* Keep the page cache since it'll get used soon */ retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); } + if (server.debug_sleep_after_fork) usleep(server.debug_sleep_after_fork); } else { serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); retval = C_ERR; @@ -914,7 +1006,7 @@ int startBgsaveForReplication(int mincapa, int req) { return retval; } - /* If the target is socket, rdbSaveToSlavesSockets() already setup + /* If the target is socket, rdbSaveToSlavesSockets() has already set up * the slaves for a full resync. Otherwise for disk target do it now.*/ if (!socket_target) { listRewind(server.slaves,&li); @@ -1027,6 +1119,13 @@ void syncCommand(client *c) { * resync on purpose when they are not able to partially * resync. */ if (master_replid[0] != '?') server.stat_sync_partial_err++; + if (c->flags & CLIENT_REPL_MAIN_CHANNEL) { + serverLog(LL_NOTICE,"Replica %s is marked as main-conn, and psync isn't possible. Full sync will continue with dedicated RDB connection.", replicationGetSlaveName(c)); + if (connWrite(c->conn,"-FULLSYNCNEEDED\r\n",17) != 17) { + freeClientAsync(c); + } + return; + } } } else { /* If a slave uses SYNC, we are dealing with an old implementation @@ -1166,7 +1265,17 @@ void syncCommand(client *c) { * - rdb-filter-only * Define "include" filters for the RDB snapshot. Currently we only support * a single include filter: "functions". Passing an empty string "" will - * result in an empty RDB. */ + * result in an empty RDB. + * + * - rdb-conn <1|0> + * Used to identify the client as a replica's rdb connection in an rdb connection + * sync session. + * + * - main-conn <1|0> + * Used to identify the replica main connection during + * rdb-connection sync. 
It also means that if psync is impossible, master + * should not auto trigger full sync. + * */ void replconfCommand(client *c) { int j; @@ -1232,6 +1341,10 @@ void replconfCommand(client *c) { checkChildrenDone(); if (c->repl_start_cmd_stream_on_ack && c->replstate == SLAVE_STATE_ONLINE) replicaStartCommandStream(c); + if (c->replstate == SLAVE_STATE_BG_RDB_LOAD) { + c->flags &= ~CLIENT_REPL_MAIN_CHANNEL; + replicaPutOnline(c); + } /* Note: this command does not reply anything! */ return; } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { @@ -1275,6 +1388,49 @@ void replconfCommand(client *c) { } } sdsfreesplitres(filters, filter_count); + } else if (!strcasecmp(c->argv[j]->ptr, "rdb-conn")) { + long start_with_offset = 0; + if (getRangeLongFromObjectOrReply(c, c->argv[j +1], + 0, 1, &start_with_offset,NULL) != C_OK) { + return; + } + if (start_with_offset == 1) { + c->flags |= CLIENT_REPL_RDB_CHANNEL; + c->slave_req |= SLAVE_REQ_RDB_CHANNEL; + } else { + c->flags &= ~CLIENT_REPL_RDB_CHANNEL; + c->slave_req &= ~SLAVE_REQ_RDB_CHANNEL; + } + } else if (!strcasecmp(c->argv[j]->ptr, "main-conn") && server.rdb_channel_enabled) { + /* If rdb-channel is disable on this master, treat this command as unrecognized + * replconf option. */ + long main_conn = 0; + if (getRangeLongFromObjectOrReply(c, c->argv[j +1], + 0, 1, &main_conn, NULL) != C_OK) { + return; + } + if (main_conn == 1) { + if (!server.repl_diskless_sync) { + /* When the primary uses disk for full sync, replicas can usually join during the time the + * primary saves the database to disk. RDB-channel-sync, however, does not allow replicas + * to join the primary since the COB does not keep the needed replication data. 
To avoid + * making the primary create a new snapshot for each replica, we forbid rdb-channel-sync + * along with primary disk-based sync */ + addReplyErrorFormat(c,"Rdb channel sync is not allowed when repl-diskless-sync disabled on primary"); + return; + } + c->flags |= CLIENT_REPL_MAIN_CHANNEL; + } else { + c->flags &= ~CLIENT_REPL_MAIN_CHANNEL; + } + } else if (!strcasecmp(c->argv[j]->ptr, "set-rdb-conn-id")) { + /* REPLCONF set-rdb-conn-id is used to associate the current replica main connection with the existing + * rdb-connection that has the given id. */ + long long client_id = 0; + if (getLongLongFromObjectOrReply(c, c->argv[j +1], &client_id, NULL) != C_OK) { + return; + } + c->associated_rdb_client_id = (uint64_t)client_id; } else { addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr); @@ -1766,10 +1922,10 @@ void replicationEmptyDbCallback(dict *d) { /* Once we have a link with the master and the synchronization was * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. 
*/ -void replicationCreateMasterClient(connection *conn, int dbid) { +void replicationCreateMasterClientWithHandler(connection *conn, int dbid, ConnectionCallbackFunc handler) { server.master = createClient(conn); if (conn) - connSetReadHandler(server.master->conn, readQueryFromClient); + connSetReadHandler(server.master->conn, handler); /** * Important note: @@ -1797,6 +1953,12 @@ void replicationCreateMasterClient(connection *conn, int dbid) { if (dbid != -1) selectDb(server.master,dbid); } +/* Wrapper for replicationCreateMasterClientWithHandler, init master connection handler + * with ordinary client connection handler */ +void replicationCreateMasterClient(connection *conn, int dbid) { + replicationCreateMasterClientWithHandler(conn, dbid, readQueryFromClient); +} + /* This function will try to re-enable the AOF file after the * master-replica synchronization: if it fails after multiple attempts * the replica cannot be considered reliable and exists with an @@ -2267,20 +2429,29 @@ void readSyncBulkPayload(connection *conn) { } /* Final setup of the connected slave <- master link */ - replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; + if (conn == server.repl_rdb_transfer_s) { + /* In case of full-sync using rdb channel, the master client was already created for psync purposes + * Instead of creating a new client we will use the one created for partial sync */ + completeTaskRDBChannelSyncRdbConn(conn); + } else { + replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + /* Send the initial ACK immediately to put this replica in online state. */ + replicationSendAck(); + } server.repl_down_since = 0; /* Fire the master link modules event. 
*/ moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); - - /* After a full resynchronization we use the replication ID and - * offset of the master. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid,server.master->replid,sizeof(server.replid)); - server.master_repl_offset = server.master->reploff; + if (server.repl_state == REPL_STATE_CONNECTED) { + /* After a full resynchronization we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + } clearReplicationId2(); /* Let's create the replication backlog if needed. Slaves need to @@ -2294,13 +2465,17 @@ void readSyncBulkPayload(connection *conn) { serverCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n"); } - /* Send the initial ACK immediately to put this replica in online state. */ - if (usemark) replicationSendAck(); - /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (server.aof_enabled) restartAOFAfterSYNC(); + + /* In case of RDB Connection Sync we want to close the RDB connection + * once the connection is established */ + if (conn == server.repl_rdb_transfer_s) { + connClose(conn); + server.repl_rdb_transfer_s = NULL; + } return; error: @@ -2399,6 +2574,432 @@ char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens return NULL; } +/* Replication: Replica side. 
+ * Returns an sds represent this replica port to be used by the master (mostly + * for logs) */ +sds getReplicaPortString(void) { + long long replica_port; + if (server.slave_announce_port) { + replica_port = server.slave_announce_port; + } else if (server.tls_replication && server.tls_port) { + replica_port = server.tls_port; + } else { + replica_port = server.port; + } + return sdsfromlonglong(replica_port); +} + +/* Replication: Replica side. + * Free replica's local replication buffer */ +void freePendingReplDataBuf(void) { + if (server.pending_repl_data.blocks) { + listRelease(server.pending_repl_data.blocks); + } + server.pending_repl_data.blocks = NULL; + server.pending_repl_data.len = 0; +} + +/* Replication: Replica side. + * Upon rdb-sync failure, close rdb-connection, reset repl-state, reset + * provisional master struct, and free local replication buffer. */ +void abortRdbConnectionSync(void) { + serverAssert(server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); + serverLog(LL_WARNING, "Aborting RDB connection sync"); + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } + if (server.repl_transfer_tmpfile) { + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + } + if (server.repl_transfer_fd != -1) { + close(server.repl_transfer_fd); + server.repl_transfer_fd = -1; + } + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; + server.repl_provisional_master.read_reploff = 0; + server.repl_provisional_master.reploff = 0; + server.repl_provisional_master.conn = NULL; + server.repl_provisional_master.dbid = -1; + server.rdb_client_id = -1; + freePendingReplDataBuf(); + return; +} + +/* Replication: Master side. + * Send current replication offset to replica. 
Use the following structure: + * $ENDOFF:<master-repl-offset> <master-replid> <db-id> <client-id> */ +int sendCurrentOffsetToReplica(client* replica) { + char buf[128]; + int buflen; + buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.master_repl_offset, server.replid, server.db->id, (long long unsigned int)replica->id); + serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", + replicationGetSlaveName(replica), server.master_repl_offset, (long long unsigned int)replica->id); + if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout*1000) != buflen) { + freeClientAsync(replica); + return C_ERR; + } + return C_OK; +} + +/* Replication: Replica side. + * This connection handler is used to initialize the RDB connection (repl-rdb-channel sync). + * Once a replica with repl rdb-channel enabled is denied PSYNC by its primary, + * fullSyncWithMaster begins its role. The connection handler prepares server.repl_rdb_transfer_s + * for an rdb stream, and server.repl_transfer_s for the incremental replication data stream. */ +void fullSyncWithMaster(connection* conn) { + char *err = NULL; + serverAssert(conn == server.repl_rdb_transfer_s); + /* If this event fired after the user turned the instance into a master + * with SLAVEOF NO ONE we must just return ASAP. */ + if (server.repl_state == REPL_STATE_NONE) { + goto error; + } + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. */ + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING,"Error condition on socket for RDB-CHANNEL-SYNC: %s", + connGetLastError(conn)); + goto error; + } + /* Send replica capabilities */ + if (server.repl_rdb_conn_state == REPL_RDB_CONN_SEND_HANDSHAKE) { + serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); + /* AUTH with the master if required. 
*/ + if (server.masterauth) { + char *args[3] = {"AUTH",NULL,NULL}; + size_t lens[3] = {4,0,0}; + int argc = 1; + if (server.masteruser) { + args[argc] = server.masteruser; + lens[argc] = strlen(server.masteruser); + argc++; + } + args[argc] = server.masterauth; + lens[argc] = sdslen(server.masterauth); + argc++; + err = sendCommandArgv(conn, argc, args, lens); + if (err) goto write_error; + } + /* Send replica listening port to master for clarification */ + sds portstr = getReplicaPortString(); + err = sendCommand(conn, "REPLCONF", "capa", "eof", + "rdb-only", "1", "rdb-conn", "1", "listening-port", portstr, NULL); + sdsfree(portstr); + if (err) goto write_error; + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_AUTH_REPLY; + + if (connSetReadHandler(conn, fullSyncWithMaster) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, + "Can't create readable event for SYNC: %s (%s)", + strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); + goto error; + } + return; + } + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_AUTH_REPLY && !server.masterauth) { + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY; + } + /* Receive AUTH reply. 
*/ + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_AUTH_REPLY) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + if (err[0] == '-') { + serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); + sdsfree(err); + goto error; + } + sdsfree(err); + err = NULL; + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY; + return; + } + /* Receive replconf response */ + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + + if (err[0] == '-') { + serverLog(LL_NOTICE, "Server does not support sync with offset, RDB Channel Sync approach cannot be used: %s", err); + goto error; + } + sdsfree(err); + err = NULL; + + if (connSyncWrite(conn, "SYNC\r\n",6, server.repl_syncio_timeout*1000) == -1) { + serverLog(LL_WARNING, "I/O error writing to Primary: %s", + connGetLastError(conn)); + goto error; + } + + server.repl_rdb_conn_state = REPL_RDB_CONN_RECEIVE_ENDOFF; + return; + } + /* Receive master rdb-channel end offset response */ + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RECEIVE_ENDOFF) { + int64_t rdb_client_id; + err = receiveSynchronousResponse(conn); + if (err == NULL) goto error; + if (err[0] == '\0') { + /* Retry again later */ + serverLog(LL_DEBUG, "Received empty $ENDOFF response"); + sdsfree(err); + return; + } + long long reploffset; + char master_replid[CONFIG_RUN_ID_SIZE+1]; + int dbid; + /* Parse end offset response */ + char *endoff_format = "$ENDOFF:%lld %40s %d %ld"; + if (sscanf(err, endoff_format, &reploffset, master_replid, &dbid, &rdb_client_id) != 4) { + goto error; + } + sdsfree(err); + server.rdb_client_id = rdb_client_id; + server.master_initial_offset = reploffset; + + /* Initiate repl_provisional_master to act as this replica temp master until RDB is loaded */ + server.repl_provisional_master.conn = server.repl_transfer_s; + memcpy(server.repl_provisional_master.replid, 
master_replid, CONFIG_RUN_ID_SIZE); + server.repl_provisional_master.reploff = reploffset; + server.repl_provisional_master.read_reploff = reploffset; + server.repl_provisional_master.dbid = dbid; + + /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the + * main connection accordingly.*/ + server.repl_transfer_s->state = CONN_STATE_CONNECTED; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; + serverAssert(connSetReadHandler(server.repl_transfer_s, setupMainConnForPsync) != C_ERR); + setupMainConnForPsync(server.repl_transfer_s); + + /* As the next block we will receive using this connection is the rdb, we need to prepare + * the connection accordingly */ + serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR); + server.repl_transfer_size = -1; + server.repl_transfer_read = 0; + server.repl_transfer_last_fsync_off = 0; + server.repl_transfer_lastio = server.unixtime; + + server.repl_rdb_conn_state = REPL_RDB_CONN_RDB_LOAD; + return; + } + + no_response_error: + /* Handle receiveSynchronousResponse() error when primary has no reply */ + serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake"); + /* Fall through to regular error handling */ + + error: + connClose(conn); + server.repl_transfer_s = NULL; + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } + if (server.repl_transfer_fd != -1) + close(server.repl_transfer_fd); + server.repl_transfer_fd = -1; + server.repl_state = REPL_STATE_CONNECT; + abortRdbConnectionSync(); + return; + + write_error: /* Handle sendCommand() errors. */ + serverLog(LL_WARNING, "Sending command to master in rdb channel replication handshake: %s", err); + sdsfree(err); + goto error; +} + +/* Replication: Replica side. 
+ * Initialize server.pending_repl_data infrastructure, we will allocate the buffer + * itself once we need it */ +void replDataBufInit(void) { + serverAssert(server.pending_repl_data.blocks == NULL); + server.pending_repl_data.len = 0; + server.pending_repl_data.peak = 0; + server.pending_repl_data.blocks = listCreate(); + server.pending_repl_data.blocks->free = zfree; +} + +/* Replication: Replica side. + * Track the local repl-data buffer streaming progress and serve clients from time to time */ +void replStreamProgressCallback(size_t offset, int readlen, time_t *last_progress_callback) { + time_t now = mstime(); + if (server.loading_process_events_interval_bytes && + (offset + readlen) / server.loading_process_events_interval_bytes > offset / server.loading_process_events_interval_bytes && + now - *last_progress_callback > server.loading_process_events_interval_ms) + { + replicationSendNewlineToMaster(); + processEventsWhileBlocked(); + *last_progress_callback = now; + } +} + +/* Replication: Replica side. + * Reads replication data from primary into specified repl buffer block */ +int readIntoReplDataBlock(connection *conn, replDataBufBlock *o, size_t read) { + int nread = connRead(conn, o->buf + o->used, read); + if (nread == -1) { + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_VERBOSE, "Error reading from primary: %s",connGetLastError(conn)); + cancelReplicationHandshake(1); + } + return C_ERR; + } + if (nread == 0) { + if (server.verbosity <= LL_VERBOSE) { + serverLog(LL_VERBOSE, "Provisional master closed connection"); + } + cancelReplicationHandshake(1); + return C_ERR; + } + o->used += nread; + atomicIncr(server.stat_total_reads_processed, nread); + return read - nread; +} + +/* Replication: Replica side. 
+ * Returns true in case the replica's local repl-baffer used all of its space */
+int isReplicaBufferLimitReached(void) {
+    return server.pending_repl_data.len > server.client_obuf_limits[CLIENT_SLAVE].hard_limit_bytes;
+}
+
+/* Replication: Replica side.
+ * Read handler for buffering incoming repl data during RDB download/loading.
+ *
+ * Reads up to PROTO_IOBUF_LEN bytes per invocation into server.pending_repl_data:
+ * first into the free space of the tail block, then into newly allocated blocks.
+ * The loop ends when the byte budget is used up, when a read returns fewer bytes
+ * than requested, on a read error, or when isReplicaBufferLimitReached() reports
+ * that the buffer went over the CLIENT_SLAVE hard limit. In the last case the read
+ * handler is removed from 'conn' and buffering stops. */
+void bufferReplData(connection *conn) {
+    size_t readlen = PROTO_IOBUF_LEN; /* Remaining byte budget for this invocation. */
+    int read = 0; /* Size of the next read, then the result of readIntoReplDataBlock(). */
+
+    while (readlen > 0) {
+        listNode *ln = listLast(server.pending_repl_data.blocks);
+        replDataBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+        /* Append to tail string when possible: fill whatever room is left in the
+         * last block before allocating a new one. */
+        if (tail && tail->used < tail->size) {
+            size_t avail = tail->size - tail->used;
+            read = min(readlen, avail);
+            readlen -= read;
+            read = readIntoReplDataBlock(conn, tail, read);
+        }
+        /* Budget left and the previous read (if any) was complete: a new block is needed. */
+        if (readlen && read == 0) {
+            if (isReplicaBufferLimitReached()) {
+                serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering.");
+                /* Stop accumulating master commands. */
+                connSetReadHandler(conn, NULL);
+                break;
+            }
+            /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES.
+             * Use the same upper boundary as the shared replication buffer (feedReplicationBuffer),
+             * as they share the same purpose */
+            size_t usable_size;
+            size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
+            size_t size = min(max(readlen, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
+            tail = zmalloc_usable(size + sizeof(replDataBufBlock), &usable_size);
+            tail->size = usable_size - sizeof(replDataBufBlock); /* Use everything the allocator handed out. */
+            tail->used = 0;
+            listAddNodeTail(server.pending_repl_data.blocks, tail);
+            server.pending_repl_data.len += tail->size; /* 'len' grows by the allocated size, not by bytes read. */
+            /* Update buffer's peak */
+            if (server.pending_repl_data.peak < server.pending_repl_data.len)
+                server.pending_repl_data.peak = server.pending_repl_data.len;
+
+            read = min(readlen, tail->size);
+            readlen -= read;
+            read = readIntoReplDataBlock(conn, tail, read);
+        }
+        if (read > 0) {
+            /* Stop reading in case we read less than we anticipated: 'read' now holds
+             * the number of requested bytes that were not read. */
+            break;
+        }
+        if (read == C_ERR) { /* Nothing was read; readIntoReplDataBlock() did the error handling. */
+            return;
+        }
+    }
+}
+
+/* Replication: Replica side.
+ * Streams accumulated replication data into the database while freeing read nodes.
+ *
+ * 'c' must be the master client (CLIENT_MASTER is asserted). Every block of
+ * server.pending_repl_data is appended to c->querybuf, run through
+ * processInputBuffer() and then removed from the list. The whole drain is wrapped in
+ * blockingOperationStarts()/blockingOperationEnds(), and replStreamProgressCallback()
+ * is invoked after each block. */
+void streamReplDataBufToDb(client *c) {
+    serverAssert(c->flags & CLIENT_MASTER);
+    blockingOperationStarts();
+    size_t offset = 0; /* Bytes streamed so far, used only for progress tracking. */
+    listNode *cur = NULL;
+    time_t last_progress_callback = mstime();
+    while ((cur = listFirst(server.pending_repl_data.blocks))) {
+        /* Read and process repl data block */
+        replDataBufBlock *o = listNodeValue(cur);
+        c->querybuf = sdscatlen(c->querybuf, o->buf, o->used);
+        c->read_reploff += o->used;
+        processInputBuffer(c);
+        server.pending_repl_data.len -= o->used;
+        replStreamProgressCallback(offset, o->used, &last_progress_callback);
+        offset += o->used;
+        listDelNode(server.pending_repl_data.blocks, cur); /* Last use of 'o': the block goes away with its node. */
+    }
+    blockingOperationEnds();
+}
+
+/* Replication: Replica side.
+ * After done loading the snapshot using the rdb-connection prepare this replica for steady state by + * initializing the master client, amd stream local increamental buffer into memory. */ +void rdbChannelSyncSuccess(void) { + server.master_initial_offset = server.repl_provisional_master.reploff; + replicationResurrectProvisionalMaster(); + /* Wait for the accumulated buffer to be processed before reading any more replication updates */ + if (server.pending_repl_data.blocks) streamReplDataBufToDb(server.master); + freePendingReplDataBuf(); + serverLog(LL_NOTICE, "Successfully streamed replication data into memory"); + /* We can resume reading from the master connection once the local replication buffer has been loaded. */ + replicationSteadyStateInit(); + replicationSendAck(); /* Send ACK to notify primary that replica is synced */ + server.rdb_client_id = -1; +} + +/* Replication: Replica side. + * Main connection successfully established psync with master. The 'conn' argument must be the main + * connection. Check whether the rdb connection has completed its part and act accordingly. */ +void completeTaskRDBChannelSyncMainConn(connection *conn) { + serverAssert(conn == server.repl_transfer_s && server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY); + if (server.repl_rdb_conn_state < REPL_RDB_CONN_RDB_LOADED) { + /* RDB is still loading */ + if (connSetReadHandler(server.repl_provisional_master.conn, bufferReplData)) { + serverLog(LL_WARNING,"Error while setting readable handler: %s", strerror(errno)); + cancelReplicationHandshake(1); + } + replDataBufInit(); + server.repl_state = REPL_STATE_TRANSFER; + return; + } + if (server.repl_rdb_conn_state == REPL_RDB_CONN_RDB_LOADED) { + /* RDB is loaded */ + serverLog(LL_DEBUG, "RDB channel sync - psync established after rdb load"); + rdbChannelSyncSuccess(); + return; + } + serverPanic("Unrecognized rdb channel replication state %d", server.repl_rdb_conn_state); +} + +/* Replication: Replica side. 
+ * Rdb connection done loading rdb. The 'conn' argument must be the rdb connection. Check whether the + * main connection has completed its part and act accordingly. */ +void completeTaskRDBChannelSyncRdbConn(connection *conn) { + serverAssert(conn == server.repl_rdb_transfer_s && server.repl_rdb_conn_state == REPL_RDB_CONN_RDB_LOAD); + /* RDB connection */ + if (server.repl_state < REPL_STATE_TRANSFER) { + /* Main psync connection hasn't been established yet */ + server.repl_rdb_conn_state = REPL_RDB_CONN_RDB_LOADED; + return; + } + if (server.repl_state == REPL_STATE_TRANSFER) { + connSetReadHandler(server.repl_transfer_s, NULL); + rdbChannelSyncSuccess(); + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; + return; + } + serverPanic("Unrecognized replication state %d using rdb connection", server.repl_state); +} + /* Try a partial resynchronization with the master if we are about to reconnect. * If there is no cached master structure, at least try to issue a * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC @@ -2453,6 +3054,7 @@ char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens #define PSYNC_FULLRESYNC 3 #define PSYNC_NOT_SUPPORTED 4 #define PSYNC_TRY_LATER 5 +#define PSYNC_FULLRESYNC_RDB_CONN 6 int slaveTryPartialResynchronization(connection *conn, int read_reply) { char *psync_replid; char psync_offset[32]; @@ -2467,7 +3069,12 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { * client structure representing the master into server.master. */ server.master_initial_offset = -1; - if (server.cached_master) { + if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { + /* While in rdb-channel-sync, we should use our prepared repl id and offset. 
*/ + psync_replid = server.repl_provisional_master.replid; + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_master.reploff+1); + serverLog(LL_NOTICE, "Trying a partial resynchronization using main channel (request %s:%s).", psync_replid, psync_offset); + } else if (server.cached_master) { psync_replid = server.cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); @@ -2545,6 +3152,11 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { } if (!strncmp(reply,"+CONTINUE",9)) { + if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { + /* During rdb-sync sesseion, master struct is already initialized. */ + sdsfree(reply); + return PSYNC_CONTINUE; + } /* Partial resync was accepted. */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); @@ -2609,6 +3221,14 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { return PSYNC_TRY_LATER; } + if (!strncmp(reply, "-FULLSYNCNEEDED", 15)) { + /* In case the main connection with master is at main-conn mode, the master + * will respond with -FULLSYNCNEEDED to imply that psync is not possible */ + serverLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel."); + sdsfree(reply); + return PSYNC_FULLRESYNC_RDB_CONN; + } + if (strncmp(reply,"-ERR",4)) { /* If it's not an error, log the unexpected event. */ serverLog(LL_WARNING, @@ -2622,6 +3242,131 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) { return PSYNC_NOT_SUPPORTED; } +/* Replication: Replica side. + * This connection handler fires after rdb-channel was initialized. We use it + * to adjust the replica main for loading incremental changes into the local buffer. 
*/
+/* State machine (server.repl_state), one step per invocation unless noted:
+ *   SEND_HANDSHAKE       send REPLCONF set-rdb-conn-id with the id received in $ENDOFF
+ *   RECEIVE_CAPA_REPLY   read the REPLCONF reply, then fall through to SEND_PSYNC
+ *   SEND_PSYNC           send PSYNC
+ *   RECEIVE_PSYNC_REPLY  read the PSYNC reply; only +CONTINUE lets the sync proceed
+ * Any error cancels the replication handshake (with reconnect). */
+void setupMainConnForPsync(connection *conn) {
+    int psync_result = -1;
+    char llstr[LONG_STR_SIZE];
+    char* err = NULL;
+    if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
+        /* We already have an initialized connection at master side, we only need to associate it with RDB connection */
+        ll2string(llstr,sizeof(llstr), server.rdb_client_id);
+        err = sendCommand(conn, "REPLCONF", "set-rdb-conn-id", llstr, NULL);
+        if (err) {
+            serverLog(LL_WARNING, "Failed sending REPLCONF set-rdb-conn-id to master: %s", err);
+            goto error;
+        }
+        server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
+        return;
+    }
+
+    if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        if (err == NULL) goto error;
+        if (err[0] == '-') {
+            serverLog(LL_NOTICE,"Master does not understand REPLCONF set-rdb-conn-id: %s", err);
+            goto error;
+        }
+        sdsfree(err);
+        err = NULL;
+        server.repl_state = REPL_STATE_SEND_PSYNC;
+    }
+
+    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
+        if (server.debug_sleep_after_fork) usleep(server.debug_sleep_after_fork);
+        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
+            serverLog(LL_WARNING, "Aborting RDB connection sync. Write error.");
+            cancelReplicationHandshake(1);
+            /* The handshake was cancelled: keep the replication state chosen by
+             * cancelReplicationHandshake() instead of overwriting it below. */
+            return;
+        }
+        server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
+        return;
+    }
+    psync_result = slaveTryPartialResynchronization(conn,1);
+    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
+
+    if (psync_result == PSYNC_CONTINUE) {
+        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization%s",
+            server.repl_rdb_transfer_s != NULL ? ", RDB load in background.":".");
+        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
+            serverCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
+        }
+        completeTaskRDBChannelSyncMainConn(conn);
+        return;
+    }
+
+error:
+    /* 'err' is non-NULL only when sendCommand() failed or the master replied with an error. */
+    if (err) sdsfree(err);
+    /* The rdb-conn-sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY. */
+    serverLog(LL_WARNING, "Aborting RDB connection sync. Main connection psync result %d", psync_result);
+    cancelReplicationHandshake(1);
+}
+
+/*
+ * RDB-Channel for full sync
+ *
+ * * Motivation *
+ * - Reduce master memory load. We do that by moving the COB tracking to the replica side. This also decreases
+ *   the chance for COB overruns. Note that master's input buffer limits at the replica side are less restricted
+ *   than master's COB as the replica plays a less critical part in the replication group. While increasing the
+ *   primary’s COB may end up with primary reaching swap and clients suffering, at replica side we’re more at
+ *   ease with it. Larger COB means better chance to sync successfully.
+ * - Reduce master main process CPU load. By opening a new, dedicated connection for the RDB transfer, child
+ *   processes can have direct access to the new connection. Due to TLS connection restrictions, this was not
+ *   possible using one main connection. We eliminate the need for the child process to use the master's
+ *   child-proc -> main-proc pipeline, thus freeing up the main process to process clients queries.
+ *
+ * * High level interface design *
+ * - RDB-Channel sync begins when the replica sends a REPLCONF main-conn to the master during initial
+ *   handshake. This allows the replica to verify whether the master supports rdb-channel sync and, if
+ *   so, state that this is the replica's main connection, which is not used for snapshot transfer.
+ * - When replica lacks sufficient data for PSYNC, the master will send -FULLSYNCNEEDED response instead
+ *   of RDB data. As a next step, the replica creates a new connection (rdb-channel) and configures it against
+ *   the master with the appropriate capabilities and requirements. The replica then requests a sync
+ *   using the RDB connection.
+ * - Prior to forking, the master sends the replica the snapshot's end repl-offset, and attaches the replica + * to the replication backlog to keep repl data until the replica requests psync. The replica uses the main + * connection to request a PSYNC starting at the snapshot end offset. + * - The master main threads sends incremental changes via the main connection, while the bgsave process + * sends the RDB directly to the replica via the rdb-connection. As for the replica, the incremental + * changes are stored on a local buffer, while the RDB is loaded into memory. + * - Once the replica completes loading the rdb, it drops the rdb-connection and streams the accumulated incremental + * changes into memory. Repl steady state continues normally. + * + * * Replica state machine * + * RDB Channel Sync + * ┌──────────────────────────────────────────────────────────────┐ + * │ RDB connection states Main connection state │ + * ┌───────────────────┐ ┌────────────┐ │ ┌────────────────────────────┐ ┌───────────────────┐ │ + * │RECEIVE_PING_REPLY │ ┌───►SEND_PSYNC │ -FULLSYNCNEEDED─┤REPL_RDB_CONN_SEND_HANDSHAKE│ ┌──►SEND_HANDSHAKE │ │ + * └────────┬──────────┘ │ └─┬──────────┘ │ │ └────┬───────────────────────┘ │ └──┬────────────────┘ │ + * │+PONG │ │PSYNC (use cached-master) │ │ │REPLCONF set-rdb-conn-id + * ┌────────▼──────────┐ │ ┌─▼─────────────────┐ │ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * │SEND_HANDSHAKE │ │ ┌─┤RECEIVE_PSYNC_REPLY├─┘ │ │RDB_CONN_RECEIVE_AUTH_REPLY │ │ │RECEIVE_CAPA_REPLY │ │ + * └────────┬──────────┘ │ │ └─┬─────────────────┘ │ └───────┬───────────────────────┘ │ └──┬────────────────┘ │ + * │ │ │ │+FULLRESYNC │ │+OK │ │+OK │ + * ┌────────▼──────────┐ │ │ ┌─▼─────────────────┐ │ ┌───────▼───────────────────────┐ │ ┌──▼────────────────┐ │ + * │RECEIVE_AUTH_REPLY │ │ │ │TRANSFER │ │ │RDB_CONN_RECEIVE_REPLCONF_REPLY│ │ │SEND_PSYNC │ │ + * └────────┬──────────┘ │ │ └───────────────────┘ │ └───────┬───────────────────────┘ │ 
└──┬────────────────┘ │ + * │+OK │ │ │ │+OK │ │PSYNC use snapshot │ + * ┌────────▼──────────┐ │ │ │ ┌───────▼───────────────┐ │ │end-offset provided│ + * │RECEIVE_PORT_REPLY │ │ │ │ │RDB_CONN_RECEIVE_ENDOFF│ │ │by the master │ + * └────────┬──────────┘ │ │ │ └───────┬───────────────┘ │ ┌──▼────────────────┐ │ + * │+OK │ │+CONTINUE │ │$ENDOFF │ │RECEIVE_PSYNC_REPLY│ │ + * ┌────────▼──────────┐ │ │ │ ├─────────────────────────┘ └──┬────────────────┘ │ + * │RECEIVE_IP_REPLY │ │ │ │ │ │+CONTINUE │ + * └────────┬──────────┘ │ │ │ ┌───────▼───────────────┐ ┌──▼────────────────┐ │ + * │+OK │ │ │ │RDB_CONN_RDB_LOAD │ │TRANSFER │ │ + * ┌────────▼──────────┐ │ │ │ └───────┬───────────────┘ └────┬──────────────┘ │ + * │RECEIVE_IP_REPLY │ │ │ │ │Done loading snapshot │ │ + * └────────┬──────────┘ │ │ │ ┌───────▼───────────────┐ │ │ + * │+OK │ │ │ │RDB_CONN_RDB_LOADED │ │ │ + * ┌────────▼────────────────┐ │ │ │ └───────┬───────────────┘ │ │ + * │RECEIVE_NO_FULLSYNC_REPLY│ │ │ │ │ │ │ + * └─┬────┬──────────────────┘ │ └────┐ │ │Slave loads local replication │ │ + * │+OK │Unrecognized REPLCONF │ │ │buffer into memory │ │ + * ┌─▼────▼────────────┐ │ │ ┌────────┼──────────┴─────────────────────────────────┘ │ + * │RECEIVE_CAPA_REPLY ├───────┘ │ │ │ │ + * └───────────────────┘ ┌──▼────────────▼───┐ │ │ + * │CONNECTED │ └──────────────────────────────────────────────────────────────┘ + * └───────────────────┘ + */ /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(connection *conn) { @@ -2709,14 +3454,7 @@ void syncWithMaster(connection *conn) { /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. 
*/ { - int port; - if (server.slave_announce_port) - port = server.slave_announce_port; - else if (server.tls_replication && server.tls_port) - port = server.tls_port; - else - port = server.port; - sds portstr = sdsfromlonglong(port); + sds portstr = getReplicaPortString(); err = sendCommand(conn,"REPLCONF", "listening-port",portstr, NULL); sdsfree(portstr); @@ -2732,6 +3470,13 @@ void syncWithMaster(connection *conn) { if (err) goto write_error; } + /* When using rdb-channel for sync, mark the main connection only for psync. + * The master's capabilities will also be verified here, since if the master + * does not support rdb-channel sync, it will not understand this command. */ + if (server.rdb_channel_enabled) { + err = sendCommand(conn,"REPLCONF", "main-conn", "1", NULL); + } + /* Inform the master of our (slave) capabilities. * * EOF: supports EOF-style RDB transfer for diskless replication. @@ -2780,7 +3525,7 @@ void syncWithMaster(connection *conn) { } if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip) - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + server.repl_state = REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY; /* Receive REPLCONF ip-address reply. */ if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { @@ -2793,6 +3538,29 @@ void syncWithMaster(connection *conn) { "REPLCONF ip-address: %s", err); } sdsfree(err); + server.repl_state = REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY; + return; + } + + if (server.repl_state == REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY && !server.rdb_channel_enabled) { + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + } + + if (server.repl_state == REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto error; + else if (err[0] == '-') { + /* Activate rdb-channel fallback mechanism. The master did not understand + * REPLCONF main-conn. This means the master does not support RDB channel + * synchronization. 
The replica will continue the sync session with one + * channel (normally). */ + serverLog(LL_WARNING, "Master does not understand REPLCONF main-conn aborting rdb-channel sync %s", err); + server.master_supports_rdb_channel = 0; + } else if (memcmp(err, "+OK", 3) == 0) { + /* Master support rdb-connection sync. Continue with rdb-channel approach. */ + server.master_supports_rdb_channel = 1; + } + sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; return; } @@ -2896,8 +3664,31 @@ void syncWithMaster(connection *conn) { server.repl_transfer_fd = dfd; } + /* Using rdb-channel sync, the master responded -FULLSYNCNEEDED. We need to + * initialize the RDB channel. */ + if (psync_result == PSYNC_FULLRESYNC_RDB_CONN) { + /* Create a full sync connection */ + server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); + if (connConnect(server.repl_rdb_transfer_s, server.masterhost, server.masterport, + server.bind_source_addr, fullSyncWithMaster) == C_ERR) { + serverLog(LL_WARNING,"Unable to connect to Primary: %s", + connGetLastError(server.repl_transfer_s)); + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + goto error; + } + if (connSetReadHandler(conn, NULL) == C_ERR) { + char conninfo[CONN_INFO_LEN]; + serverLog(LL_WARNING, + "Can't clear main connection handler: %s (%s)", + strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); + goto error; + } + server.repl_rdb_conn_state = REPL_RDB_CONN_SEND_HANDSHAKE; + return; + } /* Setup the non blocking download of the bulk file. 
*/ - if (connSetReadHandler(conn, readSyncBulkPayload) + else if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; @@ -2922,6 +3713,10 @@ void syncWithMaster(connection *conn) { if (dfd != -1) close(dfd); connClose(conn); server.repl_transfer_s = NULL; + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); if (server.repl_transfer_tmpfile) @@ -2968,7 +3763,8 @@ void undoConnectWithMaster(void) { * Never call this function directly, use cancelReplicationHandshake() instead. */ void replicationAbortSyncTransfer(void) { - serverAssert(server.repl_state == REPL_STATE_TRANSFER); + serverAssert(server.repl_state == REPL_STATE_TRANSFER || + server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE); undoConnectWithMaster(); if (server.repl_transfer_fd!=-1) { close(server.repl_transfer_fd); @@ -2988,6 +3784,9 @@ void replicationAbortSyncTransfer(void) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(int reconnect) { + if (server.repl_rdb_conn_state != REPL_RDB_CONN_STATE_NONE) { + abortRdbConnectionSync(); + } if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; @@ -3058,6 +3857,9 @@ void replicationSetMaster(char *ip, int port) { NULL); server.repl_state = REPL_STATE_CONNECT; + /* Allow trying rdb-channel sync with the new master. If new master doesn't + * support rdb-channel sync, we will set to 0 afterwards. 
*/ + server.master_supports_rdb_channel = -1; serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); connectWithMaster(); @@ -3398,16 +4200,10 @@ void replicationDiscardCachedMaster(void) { server.cached_master = NULL; } -/* Turn the cached master into the current master, using the file descriptor - * passed as argument as the socket for the new master. - * - * This function is called when successfully setup a partial resynchronization - * so the stream of data that we'll receive will start from where this - * master left. */ -void replicationResurrectCachedMaster(connection *conn) { - server.master = server.cached_master; - server.cached_master = NULL; - server.master->conn = conn; +/* Replication: Replica side. + * This method performs the necessary steps to establish a connection with the master server. + * It sets private data, updates flags, and fires an event to notify modules about the master link change. */ +void establishMasterConnection(void) { connSetPrivateData(server.master->conn, server.master); server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); server.master->authenticated = 1; @@ -3419,9 +4215,30 @@ void replicationResurrectCachedMaster(connection *conn) { moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_UP, NULL); +} +/* Replication: Replica side. + * Turn the cached master into the current master, using the file descriptor + * passed as argument as the socket for the new master. + * + * This function is called when successfully setup a partial resynchronization + * so the stream of data that we'll receive will start from where this + * master left. */ +void replicationResurrectCachedMaster(connection *conn) { + server.master = server.cached_master; + server.cached_master = NULL; + server.master->conn = conn; + + establishMasterConnection(); /* Re-add to the list of clients. 
*/ linkClient(server.master); + replicationSteadyStateInit(); +} + +/* Replication: Replica side. + * Prepare replica to steady state. + * prerequisite: server.master is already initialized and linked in client list. */ +void replicationSteadyStateInit(void) { if (connSetReadHandler(server.master->conn, readQueryFromClient)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ @@ -3437,6 +4254,18 @@ void replicationResurrectCachedMaster(connection *conn) { } } +/* Replication: Replica side. + * Turn the provisional master into the current master. + * This function is called after rdb-channel sync is finished successfully. */ +void replicationResurrectProvisionalMaster(void) { + /* Create a master client, but do not initialize the read handler yet, as this replica still has a local buffer to drain. */ + replicationCreateMasterClientWithHandler(server.repl_transfer_s, server.repl_provisional_master.dbid, NULL); + memcpy(server.master->replid, server.repl_provisional_master.replid, CONFIG_RUN_ID_SIZE); + server.master->reploff = server.repl_provisional_master.reploff; + server.master->read_reploff = server.repl_provisional_master.read_reploff; + establishMasterConnection(); +} + /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ /* This function counts the number of slaves with lag <= min-slaves-max-lag. @@ -3913,7 +4742,7 @@ void replicationCron(void) { if (listLength(server.repl_buffer_blocks) > 0) { replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); serverAssert(o->refcount > 0 && - o->refcount <= (int)listLength(server.slaves)+1); + o->refcount <= (int)listLength(server.slaves) + 1 + (int)raxSize(server.slaves_waiting_psync)); } /* Refresh the number of slaves with lag <= min-slaves-max-lag. 
*/ diff --git a/src/rio.c b/src/rio.c index 8388daae53..7d983595fa 100644 --- a/src/rio.c +++ b/src/rio.c @@ -55,6 +55,7 @@ #include "crc64.h" #include "config.h" #include "server.h" + #include "connhelpers.h" /* ------------------------- Buffer I/O implementation ----------------------- */ @@ -518,3 +519,138 @@ size_t rioWriteBulkDouble(rio *r, double d) { dbuf[dlen] = '\0'; return rioWriteBulkString(r,dbuf,dlen); }
+
+/* Write callback of the connection-set rio target. Returns 1 or 0 for success/failure.
+ * The function returns success as long as at least one connection was not already
+ * marked as broken when a chunk is written; it returns 0 only once every
+ * connection is in error state.
+ *
+ * Data is first appended to r->io.connset.buf and sent to the connections only when
+ * that buffer grows beyond PROTO_IOBUF_LEN, or when a flush is requested.
+ *
+ * When buf is NULL and len is 0, the function performs a flush operation
+ * if there is some pending buffer, so this function is also used in order
+ * to implement rioConnsetFlush(). */
+static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) {
+    ssize_t retval;
+    int j;
+    unsigned char *p = (unsigned char*) buf;
+    int doflush = (buf == NULL && len == 0);
+
+    /* To start we always append to our buffer. If it gets larger than
+     * a given size, we actually write to the sockets. */
+    if (len) {
+        r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len);
+        len = 0; /* Prevent entering the while below if we don't flush. */
+        if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1;
+    }
+
+    if (doflush) { /* From here on 'p'/'len' describe the accumulated buffer, not the caller's data. */
+        p = (unsigned char*) r->io.connset.buf;
+        len = sdslen(r->io.connset.buf);
+    }
+
+    /* Write in little chunks (at most 1024 bytes each) so that when there are
+     * big writes we parallelize while the kernel is sending data in background
+     * to the TCP socket. */
+    while(len) {
+        size_t count = len < 1024 ? len : 1024;
+        int broken = 0; /* Connections that were already in error before this chunk. */
+        for (j = 0; j < r->io.connset.numconns; j++) {
+            if (r->io.connset.state[j] != 0) {
+                /* Skip FDs already in error. */
+                broken++;
+                continue;
+            }
+
+            /* Make sure to write 'count' bytes to the socket regardless
+             * of short writes. */
+            size_t nwritten = 0;
+            while(nwritten != count) {
+                retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten);
+                if (retval <= 0) {
+                    /* With blocking sockets, which is the sole user of this
+                     * rio target, EWOULDBLOCK is returned only because of
+                     * the SO_SNDTIMEO socket option, so we translate the error
+                     * into one more recognizable by the user. */
+                    if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+                    break;
+                }
+                nwritten += retval;
+            }
+
+            if (nwritten != count) {
+                /* Mark this FD as broken: remember errno, or EIO when errno is not set. */
+                r->io.connset.state[j] = errno;
+                if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO;
+            }
+        }
+        if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */
+        p += count;
+        len -= count;
+        r->io.connset.pos += count;
+    }
+
+    if (doflush) sdsclear(r->io.connset.buf);
+    return 1;
+}
+
+/* Read callback of the connection-set rio target: always fails (returns 0),
+ * the arguments are ignored. */
+static size_t rioConnsetRead(rio *r, void *buf, size_t len) {
+    UNUSED(r);
+    UNUSED(buf);
+    UNUSED(len);
+    return 0; /* Error, this target does not support reading. */
+}
+
+/* Returns the write position: r->io.connset.pos, which rioConnsetWrite() advances
+ * for every chunk it sends. */
+static off_t rioConnsetTell(rio *r) {
+    return r->io.connset.pos;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioConnsetFlush(rio *r) {
+    /* Our flush is implemented by the write method, that recognizes a
+     * buffer set to NULL with a count of zero as a flush request. */
+    return rioConnsetWrite(r,NULL,0);
+}
+
+static const rio rioConnsetIO = {
+    rioConnsetRead,
+    rioConnsetWrite,
+    rioConnsetTell,
+    rioConnsetFlush,
+    NULL,           /* update_checksum */
+    0,              /* current checksum */
+    0,              /* flags */
+    0,              /* bytes read or written */
+    0,              /* read/write chunk size */
+    { { NULL, 0 } } /* union for io-specific vars */
+};
+
+/* Initialize 'r' as a connection-set target writing to the 'numconns' connections in
+ * 'conns'. The array is copied, a reference is taken on every connection, and all of
+ * them start with state 0 (no error). Release with rioFreeConnset(). */
+void rioInitWithConnset(rio *r, connection **conns, int numconns) {
+    *r = rioConnsetIO;
+    r->io.connset.conns = zmalloc(sizeof(connection*) * numconns);
+    r->io.connset.state = zmalloc(sizeof(int) * numconns);
+
+    for (int i = 0; i < numconns; i++) {
+        connIncrRefs(conns[i]);
+        r->io.connset.conns[i] = conns[i];
+        r->io.connset.state[i] = 0;
+    }
+
+    r->io.connset.numconns = numconns;
+    r->io.connset.pos = 0;
+    r->io.connset.buf = sdsempty();
+}
+
+/* Release the rio stream: drop the reference taken on every connection by
+ * rioInitWithConnset() and free the arrays and the pending buffer. */
+void rioFreeConnset(rio *r) {
+    for (int i = 0; i < r->io.connset.numconns; i++) {
+        connection *conn = r->io.connset.conns[i];
+        connDecrRefs(conn);
+        callHandler(conn, NULL); /* trigger close/free if necessary */
+    }
+
+    zfree(r->io.connset.conns);
+    zfree(r->io.connset.state);
+    sdsfree(r->io.connset.buf);
+}
 diff --git a/src/rio.h b/src/rio.h index edad7e7956..70d391ddec 100644 --- a/src/rio.h +++ b/src/rio.h @@ -97,6 +97,14 @@ struct _rio { off_t pos; sds buf; } fd; + /* Multiple connections target (used to write to N sockets). */ + struct { + connection **conns; /* Connections */ + int *state; /* Error state of each fd. 0 (if ok) or errno.
*/ + int numconns; + off_t pos; + sds buf; + } connset; } io; }; @@ -182,4 +190,6 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); void rioSetAutoSync(rio *r, off_t bytes); void rioSetReclaimCache(rio *r, int enabled); uint8_t rioCheckType(rio *r); +void rioInitWithConnset(rio *r, connection **conns, int numconns); +void rioFreeConnset(rio *r); #endif diff --git a/src/server.c b/src/server.c index 8f020f07ed..c30c5693b7 100644 --- a/src/server.c +++ b/src/server.c @@ -2131,6 +2131,7 @@ void initServerConfig(void) { server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; server.repl_transfer_tmpfile = NULL; server.repl_transfer_fd = -1; server.repl_transfer_s = NULL; @@ -2138,6 +2139,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.master_repl_offset = 0; server.fsynced_reploff_pending = 0; + server.rdb_client_id = -1; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2636,6 +2638,8 @@ void initServer(void) { server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.slaves_waiting_psync = raxNew(); + server.wait_before_rdb_client_free = DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE; server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); server.clients_timeout_table = raxNew(); @@ -2757,6 +2761,7 @@ void initServer(void) { server.cron_malloc_stats.allocator_active = 0; server.cron_malloc_stats.allocator_resident = 0; server.lastbgsave_status = C_OK; + server.master_supports_rdb_channel = -1; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; @@ -5407,6 +5412,8 @@ const char *replstateToString(int replstate) { case SLAVE_STATE_WAIT_BGSAVE_START: case SLAVE_STATE_WAIT_BGSAVE_END: return "wait_bgsave"; + case 
SLAVE_STATE_BG_RDB_LOAD: + return "bg_transfer"; case SLAVE_STATE_SEND_BULK: return "send_bulk"; case SLAVE_STATE_ONLINE: @@ -5989,8 +5996,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "master_last_io_seconds_ago:%d\r\n", server.master ? ((int)(server.unixtime-server.master->lastinteraction)) : -1, "master_sync_in_progress:%d\r\n", server.repl_state == REPL_STATE_TRANSFER, "slave_read_repl_offset:%lld\r\n", slave_read_repl_offset, - "slave_repl_offset:%lld\r\n", slave_repl_offset)); - /* clang-format on */ + "slave_repl_offset:%lld\r\n", slave_repl_offset, + "replicas_repl_buffer_size:%zu\r\n", server.pending_repl_data.len, + "replicas_repl_buffer_peak:%zu\r\n", server.pending_repl_data.peak)); + /* clang-format on */ if (server.repl_state == REPL_STATE_TRANSFER) { double perc = 0; @@ -6058,14 +6067,17 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { info = sdscatprintf(info, "slave%d:ip=%s,port=%d,state=%s," - "offset=%lld,lag=%ld\r\n", + "offset=%lld,lag=%ld,type=%s\r\n", slaveid,slaveip,slave->slave_listening_port,state, - slave->repl_ack_off, lag); + slave->repl_ack_off, lag, + slave->flags & CLIENT_REPL_RDB_CHANNEL ? "rdb-conn": + slave->replstate == SLAVE_STATE_BG_RDB_LOAD ? 
"main-conn": "normal-slave"); slaveid++; } } /* clang-format off */ info = sdscatprintf(info, FMTARGS( + "slaves_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.slaves_waiting_psync), "master_failover_state:%s\r\n", getFailoverStateString(), "master_replid:%s\r\n", server.replid, "master_replid2:%s\r\n", server.replid2, diff --git a/src/server.h b/src/server.h index 23bcda81d7..ec908bd96b 100644 --- a/src/server.h +++ b/src/server.h @@ -135,10 +135,12 @@ struct hdr_histogram; #define NET_HOST_STR_LEN 256 /* Longest valid hostname */ #define NET_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */ #define NET_ADDR_STR_LEN (NET_IP_STR_LEN+32) /* Must be enough for ip:port */ -#define NET_HOST_PORT_STR_LEN (NET_HOST_STR_LEN+32) /* Must be enough for hostname:port */ +#define NET_HOST_PORT_STR_LEN (NET_HOST_STR_LEN+64) /* Must be enough for hostname:port:connection-type */ #define CONFIG_BINDADDR_MAX 16 #define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}" +#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE 60 /* Grace period in seconds for replica main + channel to establish psync. */ #define INCREMENTAL_REHASHING_THRESHOLD_US 1000 /* Bucket sizes for client eviction pools. Each bucket stores clients with @@ -404,6 +406,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ #define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */ +#define CLIENT_REPL_MAIN_CHANNEL (1ULL<<52) /* RDB Channel: track a connection + which is used for online replication data */ +#define CLIENT_REPL_RDB_CHANNEL (1ULL<<53) /* RDB Channel: track a connection + which is used for rdb snapshot */ +#define CLIENT_PROTECTED_RDB_CHANNEL (1ULL<<54) /* Client should be kept until PSYNC establishment. 
*/ + /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ typedef enum blocking_type { @@ -448,6 +456,7 @@ typedef enum { REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_NO_FULLSYNC_REPLY, /* If using rdb-channel for sync, mark main connection as psync conn */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ @@ -456,6 +465,18 @@ typedef enum { REPL_STATE_CONNECTED, /* Connected to master */ } repl_state; +/* Slave rdb-channel replication state. Used in server.repl_rdb_conn_state for + * slaves to remember what to do next. */ +typedef enum { + REPL_RDB_CONN_STATE_NONE = 0, /* No active replication */ + REPL_RDB_CONN_SEND_HANDSHAKE, /* Send handshake sequence to master */ + REPL_RDB_CONN_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_RDB_CONN_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */ + REPL_RDB_CONN_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */ + REPL_RDB_CONN_RDB_LOAD, /* Loading rdb using rdb channel */ + REPL_RDB_CONN_RDB_LOADED, +} repl_rdb_conn_state; + /* The state of an in progress coordinated failover */ typedef enum { NO_FAILOVER = 0, /* No failover in progress */ @@ -474,6 +495,8 @@ typedef enum { #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ #define SLAVE_STATE_RDB_TRANSMITTED 10 /* RDB file transmitted - This state is used only for * a replica that only wants RDB without replication buffer */ +#define SLAVE_STATE_BG_RDB_LOAD 11 /* Main connection of a replica which uses rdb-channel-sync. */ + /* Slave capabilities. 
*/ #define SLAVE_CAPA_NONE 0 @@ -484,6 +507,7 @@ typedef enum { #define SLAVE_REQ_NONE 0 #define SLAVE_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */ #define SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */ +#define SLAVE_REQ_RDB_CHANNEL (1 << 2) /* Use rdb-channel sync */ /* Mask of all bits in the slave requirements bitfield that represent non-standard (filtered) RDB requirements */ #define SLAVE_REQ_RDB_MASK (SLAVE_REQ_RDB_EXCLUDE_DATA | SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS) @@ -973,6 +997,13 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; +/* Link list block, used by replDataBuf during rdb-channel sync to store + * replication data */ +typedef struct replDataBufBlock { + size_t size, used; + char buf[]; +} replDataBufBlock; + /* Database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ @@ -1131,6 +1162,12 @@ typedef struct replBacklog { * byte in the replication backlog buffer.*/ } replBacklog; +typedef struct replDataBuf { + list *blocks; /* List of replDataBufBlock */ + size_t len; /* Number of bytes stored in all blocks */ + size_t peak; +} replDataBuf; + typedef struct { list *clients; size_t mem_usage_sum; @@ -1222,6 +1259,8 @@ typedef struct client { char *slave_addr; /* Optionally given by REPLCONF ip-address */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ int slave_req; /* Slave requirements: SLAVE_REQ_* */ + uint64_t associated_rdb_client_id; /* The client id of this replica's rdb connection */ + time_t rdb_client_disconnect_time; /* Time of the first freeClient call on this client. Used for delaying free. */ multiState mstate; /* MULTI/EXEC state */ blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. 
*/ @@ -1617,6 +1656,10 @@ struct valkeyServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ + rax *slaves_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values. + * This rax contains slaves for the period from the beginning of + * their RDB connection to the end of their main connection's + * partial synchronization. */ client *current_client; /* The client that triggered the command execution (External or AOF). */ client *executing_client; /* The client executing the current command (possibly script or module). */ @@ -1656,6 +1699,7 @@ struct valkeyServer { off_t loading_loaded_bytes; time_t loading_start_time; off_t loading_process_events_interval_bytes; + time_t loading_process_events_interval_ms; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1831,6 +1875,8 @@ struct valkeyServer { int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* C_OK or C_ERR */ + int master_supports_rdb_channel;/* Track whether the master is able to sync using rdb channel. + * -1 = unknown, 0 = no, 1 = yes. */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_read; /* RDB pipe used to transfer the rdb data */ /* to the parent process in diskless repl. */ @@ -1881,6 +1927,7 @@ struct valkeyServer { int repl_ping_slave_period; /* Master pings the slave every N seconds */ replBacklog *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ + replDataBuf pending_repl_data; /* Replication data buffer for rdb-channel sync */ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. 
*/ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -1894,6 +1941,12 @@ struct valkeyServer { int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE * delay (start sooner if they all connect). */ + int rdb_channel_enabled; /* Config used to determine if the replica should + * use rdb channel for full syncs. */ + int wait_before_rdb_client_free;/* Grace period in seconds for replica main channel + * to establish psync. */ + int debug_sleep_after_fork; /* Debug param that forces the main connection to + * sleep for N seconds after fork() in repl. */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list * (serving replica clients and repl backlog) */ @@ -1904,13 +1957,23 @@ struct valkeyServer { int masterport; /* Port of master */ int repl_timeout; /* Timeout after N seconds of master idle */ client *master; /* Client that is master for this slave */ + uint64_t rdb_client_id; /* Rdb client id as it is defined at master side */ + struct { + connection* conn; + char replid[CONFIG_RUN_ID_SIZE+1]; + long long reploff; + long long read_reploff; + int dbid; + } repl_provisional_master; client *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a slave */ + int repl_rdb_conn_state; /* State of the replica's rdb channel during rdb-channel sync */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. 
*/ connection *repl_transfer_s; /* Slave -> Master SYNC connection */ + connection *repl_rdb_transfer_s; /* Master FULL SYNC connection (RDB download) */ int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ @@ -2672,12 +2735,14 @@ int clientHasPendingReplies(client *c); int updateClientMemUsageAndBucket(client *c); void removeClientFromMemUsageBucket(client *c, int allow_eviction); void unlinkClient(client *c); +void removeFromServerClientList(client *c); int writeToClient(client *c, int handler_installed); void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); client *lookupClientByID(uint64_t id); +client *lookupRdbClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); @@ -2863,6 +2928,9 @@ void clearFailoverState(void); void updateFailoverStatus(void); void abortFailover(const char *err); const char *getFailoverStateString(void); +void abortRdbConnectionSync(void); +int sendCurrentOffsetToReplica(client* replica); +void addSlaveToPsyncWaitingRax(client* slave); /* Generic persistence functions */ void startLoadingFile(size_t size, char* filename, int rdbflags); diff --git a/tests/helpers/bg_server_sleep.tcl b/tests/helpers/bg_server_sleep.tcl new file mode 100644 index 0000000000..efdbf33dcd --- /dev/null +++ b/tests/helpers/bg_server_sleep.tcl @@ -0,0 +1,10 @@ +source tests/support/valkey.tcl +source tests/support/util.tcl + +proc bg_server_sleep {host port sec} { + set r [valkey $host $port 0] + $r client setname SLEEP_HANDLER + $r debug sleep $sec +} + +bg_server_sleep [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] \ No newline at end of file diff --git a/tests/helpers/gen_write_load.tcl b/tests/helpers/gen_write_load.tcl index 9a148b73cc..30fa981d25 100644 --- 
a/tests/helpers/gen_write_load.tcl +++ b/tests/helpers/gen_write_load.tcl @@ -2,17 +2,21 @@ source tests/support/valkey.tcl set ::tlsdir "tests/tls" -proc gen_write_load {host port seconds tls} { +proc gen_write_load {host port seconds tls key} { set start_time [clock seconds] set r [valkey $host $port 1 $tls] $r client setname LOAD_HANDLER $r select 9 while 1 { - $r set [expr rand()] [expr rand()] + if {$key == ""} { + $r set [expr rand()] [expr rand()] + } else { + $r set $key [expr rand()] + } if {[clock seconds]-$start_time > $seconds} { exit 0 } } } -gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] +gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4] diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 6f7a31d218..c0d06afa6f 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -203,10 +203,16 @@ start_server {} { } else { fail "Replicas didn't sync after master restart" } + set rdbchannel [lindex [r config get repl-rdb-channel] 1] + set psync_count 0 + if {$rdbchannel == "yes"} { + # Expect one fake psync + set psync_count 1 + } # Replication backlog is full assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]} - assert {[status $master sync_partial_ok] == 0} + assert {[status $master sync_partial_ok] == $psync_count} assert {[status $master sync_full] == 1} assert {[status $master rdb_last_load_keys_expired] == 2048} assert {[status $replica sync_full] == 1} diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index c57b86897b..826d6104fc 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -1,6 +1,7 @@ # This test group aims to test that all replicas share one global replication buffer, # two replicas don't make replication buffer 
size double, and when there is no replica, # replica buffer will shrink. +foreach rdbchann {"yes" "no"} { start_server {tags {"repl external:skip"}} { start_server {} { start_server {} { @@ -8,6 +9,9 @@ start_server {} { set replica1 [srv -3 client] set replica2 [srv -2 client] set replica3 [srv -1 client] + $replica1 config set repl-rdb-channel $rdbchann + $replica2 config set repl-rdb-channel $rdbchann + $replica3 config set repl-rdb-channel $rdbchann set master [srv 0 client] set master_host [srv 0 host] @@ -18,6 +22,7 @@ start_server {} { $master config set repl-diskless-sync-delay 5 $master config set repl-diskless-sync-max-replicas 1 $master config set client-output-buffer-limit "replica 0 0 0" + $master config set repl-rdb-channel $rdbchann # Make sure replica3 is synchronized with master $replica3 replicaof $master_host $master_port @@ -39,7 +44,7 @@ start_server {} { fail "fail to sync with replicas" } - test {All replicas share one global replication buffer} { + test "All replicas share one global replication buffer rdbchannel $rdbchann" { set before_used [s used_memory] populate 1024 "" 1024 ; # Write extra 1M data # New data uses 1M memory, but all replicas use only one @@ -47,19 +52,29 @@ start_server {} { # more than double of replication buffer. 
set repl_buf_mem [s mem_total_replication_buffers] set extra_mem [expr {[s used_memory]-$before_used-1024*1024}] - assert {$extra_mem < 2*$repl_buf_mem} - + if {$rdbchann == "yes"} { + # master's replication buffers should not grow during rdb-channel-sync + assert {$extra_mem < 1024*1024} + assert {$repl_buf_mem < 1024*1024} + } else { + assert {$extra_mem < 2*$repl_buf_mem} + } # Kill replica1, replication_buffer will not become smaller catch {$replica1 shutdown nosave} - wait_for_condition 50 100 { - [s connected_slaves] eq {2} + set cur_slave_count 2 + if {$rdbchann == "yes"} { + # slave3 is connected, slave2 is syncing (has two connection) + set cur_slave_count 3 + } + wait_for_condition 500 100 { + [s connected_slaves] eq $cur_slave_count } else { fail "replica doesn't disconnect with master" } assert_equal $repl_buf_mem [s mem_total_replication_buffers] } - test {Replication buffer will become smaller when no replica uses} { + test "Replication buffer will become smaller when no replica uses rdbchannel $rdbchann" { # Make sure replica3 catch up with the master wait_for_ofs_sync $master $replica3 @@ -71,12 +86,18 @@ start_server {} { } else { fail "replica2 doesn't disconnect with master" } - assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]} + if {$rdbchann == "yes"} { + # master's replication buffers should not grow during rdb-channel-sync + assert {1024*512 > [s mem_total_replication_buffers]} + } else { + assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]} + } } } } } } +} # This test group aims to test replication backlog size can outgrow the backlog # limit config if there is a slow replica which keep massive replication buffers, @@ -84,6 +105,7 @@ start_server {} { # partial re-synchronization. Of course, replication backlog memory also can # become smaller when master disconnects with slow replicas since output buffer # limit is reached. 
+foreach rdbchannel {yes no} { start_server {tags {"repl external:skip"}} { start_server {} { start_server {} { @@ -91,6 +113,7 @@ start_server {} { set replica1_pid [s -2 process_id] set replica2 [srv -1 client] set replica2_pid [s -1 process_id] + $replica1 config set repl-rdb-channel $rdbchannel set master [srv 0 client] set master_host [srv 0 host] @@ -99,6 +122,7 @@ start_server {} { $master config set save "" $master config set repl-backlog-size 16384 $master config set client-output-buffer-limit "replica 0 0 0" + $master config set repl-rdb-channel $rdbchannel # Executing 'debug digest' on master which has many keys costs much time # (especially in valgrind), this causes that replica1 and replica2 disconnect @@ -106,11 +130,13 @@ start_server {} { $master config set repl-timeout 1000 $replica1 config set repl-timeout 1000 $replica2 config set repl-timeout 1000 + $replica2 config set client-output-buffer-limit "replica 0 0 0" + $replica2 config set repl-rdb-channel $rdbchannel $replica1 replicaof $master_host $master_port wait_for_sync $replica1 - test {Replication backlog size can outgrow the backlog limit config} { + test "Replication backlog size can outgrow the backlog limit config rdbchannel $rdbchannel" { # Generating RDB will take 1000 seconds $master config set rdb-key-save-delay 1000000 populate 1000 master 10000 @@ -124,7 +150,7 @@ start_server {} { } # Replication actual backlog grow more than backlog setting since # the slow replica2 kept replication buffer. 
- populate 10000 master 10000 + populate 20000 master 10000 assert {[s repl_backlog_histlen] > [expr 10000*10000]} } @@ -135,7 +161,7 @@ start_server {} { fail "Replica offset didn't catch up with the master after too long time" } - test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} { + test "Replica could use replication buffer (beyond backlog config) for partial resynchronization rdbchannel $rdbchannel" { # replica1 disconnects with master $replica1 replicaof [srv -1 host] [srv -1 port] # Write a mass of data that exceeds repl-backlog-size @@ -151,21 +177,33 @@ start_server {} { # replica2 still waits for bgsave ending assert {[s rdb_bgsave_in_progress] eq {1} && [lindex [$replica2 role] 3] eq {sync}} # master accepted replica1 partial resync - assert_equal [s sync_partial_ok] {1} + if { $rdbchannel == "yes" } { + # 2 psync using main channel + # +1 "real" psync + assert_equal [s sync_partial_ok] {3} + } else { + assert_equal [s sync_partial_ok] {1} + } assert_equal [$master debug digest] [$replica1 debug digest] } test {Replication backlog memory will become smaller if disconnecting with replica} { assert {[s repl_backlog_histlen] > [expr 2*10000*10000]} - assert_equal [s connected_slaves] {2} + if {$rdbchannel == "yes"} { + # 1 connection of replica1 + # +2 connections during sync of replica2 + assert_equal [s connected_slaves] {3} + } else { + assert_equal [s connected_slaves] {2} + } pause_process $replica2_pid r config set client-output-buffer-limit "replica 128k 0 0" # trigger output buffer limit check - r set key [string repeat A [expr 64*1024]] + r set key [string repeat A [expr 64*2048]] # master will close replica2's connection since replica2's output # buffer limit is reached, so there only is replica1. 
- wait_for_condition 100 100 { + wait_for_condition 1000000 100 { [s connected_slaves] eq {1} } else { fail "master didn't disconnect with replica2" @@ -185,15 +223,19 @@ start_server {} { } } } +} -test {Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size} { +foreach rdbchann {"yes" "no"} { +test "Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size. rdbchann $rdbchann" { start_server {tags {"repl external:skip"}} { start_server {} { r config set save "" r config set repl-backlog-size 100mb r config set client-output-buffer-limit "replica 512k 0 0" + r config set repl-rdb-channel $rdbchann set replica [srv -1 client] + $replica config set repl-rdb-channel $rdbchann $replica replicaof [srv 0 host] [srv 0 port] wait_for_sync $replica @@ -210,8 +252,13 @@ test {Partial resynchronization is successful even client-output-buffer-limit is r set key $big_str ;# trigger output buffer limit check wait_for_ofs_sync r $replica # master accepted replica partial resync + set psync_count 1 + if {$rdbchann == "yes"} { + # One fake and one real psync + set psync_count 2 + } assert_equal [s sync_full] {1} - assert_equal [s sync_partial_ok] {1} + assert_equal [s sync_partial_ok] $psync_count r multi r set key $big_str @@ -225,13 +272,13 @@ test {Partial resynchronization is successful even client-output-buffer-limit is fail "Replica offset didn't catch up with the master after too long time" } assert_equal [s sync_full] {1} - assert_equal [s sync_partial_ok] {1} + assert_equal [s sync_partial_ok] $psync_count } } } # This test was added to make sure big keys added to the backlog do not trigger psync loop. -test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} { +test "Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending. 
rdbchann $rdbchann" { proc client_field {r type f} { set client [$r client list type $type] if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} { @@ -252,6 +299,8 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r $master config set repl-backlog-size 16384 $master config set client-output-buffer-limit "replica 32768 32768 60" + $master config set repl-rdb-channel $rdbchann + $replica config set repl-rdb-channel $rdbchann # Key has has to be larger than replica client-output-buffer limit. set keysize [expr 256*1024] @@ -290,7 +339,11 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r # now we expect the replica to re-connect but fail partial sync (it doesn't have large # enough COB limit and must result in a full-sync) - assert {[status $master sync_partial_ok] == 0} + if {$rdbchann == "yes"} { + assert {[status $master sync_partial_ok] == [status $master sync_full]} + } else { + assert {[status $master sync_partial_ok] == 0} + } # Before this fix (#11905), the test would trigger an assertion in 'o->used >= c->ref_block_pos' test {The update of replBufBlock's repl_offset is ok - Regression test for #11666} { @@ -304,4 +357,5 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r } } } +} diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index dc1df0fa62..a01101a3a9 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -8,7 +8,7 @@ # If reconnect is > 0, the test actually try to break the connection and # reconnect with the master, otherwise just the initial synchronization is # checked for consistency. 
-proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { +proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl rdbchann reconnect} { start_server {tags {"repl"} overrides {save {}}} { start_server {overrides {save {}}} { @@ -21,7 +21,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco $master config set repl-backlog-ttl $backlog_ttl $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 + $master config set repl-rdb-channel $rdbchann $slave config set repl-diskless-load $sdl + $slave config set repl-rdb-channel $rdbchann set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] @@ -46,8 +48,8 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco } } - test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { - # Now while the clients are writing data, break the maste-slave + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, rdb-channel: $rdbchann, reconnect: $reconnect)" { + # Now while the clients are writing data, break the master-slave # link multiple times. if ($reconnect) { for {set j 0} {$j < $duration*10} {incr j} { @@ -74,19 +76,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco # Wait for the slave to reach the "online" # state from the POV of the master. 
- set retry 5000 - while {$retry} { - set info [$master info] - if {[string match {*slave0:*state=online*} $info]} { - break - } else { - incr retry -1 - after 100 - } - } - if {$retry == 0} { - error "assertion:Slave not correctly synchronized" - } + verify_replica_online $master 0 5000 # Wait that slave acknowledge it is online so # we are sure that DBSIZE and DEBUG DIGEST will not @@ -111,6 +101,10 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco fail "Master - Replica inconsistency, Run diff -u against /tmp/repldump*.txt for more info" } assert {[$master dbsize] > 0} + # if {$descr == "no backlog" && $mdl == "yes" && $sdl == "disabled"} { + # puts "Master port: $master_port" + # after 100000000 + # } eval $cond } } @@ -120,24 +114,26 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco tags {"external:skip"} { foreach mdl {no yes} { foreach sdl {disabled swapdb} { - test_psync {no reconnection, just sync} 6 1000000 3600 0 { - } $mdl $sdl 0 + foreach rdbchann {yes no} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl $rdbchann 0 - test_psync {ok psync} 6 100000000 3600 0 { - assert {[s -1 sync_partial_ok] > 0} - } $mdl $sdl 1 + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl $rdbchann 1 - test_psync {no backlog} 6 100 3600 0.5 { - assert {[s -1 sync_partial_err] > 0} - } $mdl $sdl 1 + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl $rdbchann 1 - test_psync {ok after delay} 3 100000000 3600 3 { - assert {[s -1 sync_partial_ok] > 0} - } $mdl $sdl 1 + test_psync {ok after delay} 3 100000000 3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl $rdbchann 1 - test_psync {backlog expired} 3 100000000 1 3 { - assert {[s -1 sync_partial_err] > 0} - } $mdl $sdl 1 + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl $rdbchann 1 + } } 
} } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 2118a8acdb..860a09a495 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -300,7 +300,7 @@ start_server {tags {"repl external:skip"}} { } } -foreach mdl {no yes} { +foreach mdl {no yes} rdbchannel {no yes} { foreach sdl {disabled swapdb} { start_server {tags {"repl external:skip"} overrides {save {}}} { set master [srv 0 client] @@ -316,7 +316,7 @@ foreach mdl {no yes} { lappend slaves [srv 0 client] start_server {overrides {save {}}} { lappend slaves [srv 0 client] - test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl repl-rdb-channel=$rdbchannel" { # start load handles only inside the test, so that the test can be skipped set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] @@ -325,7 +325,11 @@ foreach mdl {no yes} { set load_handle4 [start_write_load $master_host $master_port 4] after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork + $master config set repl-rdb-channel $rdbchannel # Send SLAVEOF commands to slaves + [lindex $slaves 0] config set repl-rdb-channel $rdbchannel + [lindex $slaves 1] config set repl-rdb-channel $rdbchannel + [lindex $slaves 2] config set repl-rdb-channel $rdbchannel [lindex $slaves 0] config set repl-diskless-load $sdl [lindex $slaves 1] config set repl-diskless-load $sdl [lindex $slaves 2] config set repl-diskless-load $sdl @@ -335,7 +339,7 @@ foreach mdl {no yes} { # Wait for all the three slaves to reach the "online" # state from the POV of the master. 
- set retry 500 + set retry 1000 while {$retry} { set info [r -3 info] if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { @@ -374,6 +378,8 @@ foreach mdl {no yes} { wait_for_ofs_sync $master [lindex $slaves 1] wait_for_ofs_sync $master [lindex $slaves 2] + assert [string match *slaves_waiting_psync:0* [$master info replication]] + # Check digests set digest [$master debug digest] set digest0 [[lindex $slaves 0] debug digest] @@ -435,7 +441,7 @@ start_server {tags {"repl external:skip"} overrides {save {}}} { } # Diskless load swapdb when NOT async_loading (different master replid) -foreach testType {Successful Aborted} { +foreach testType {Successful Aborted} rdbchannel {yes no} { start_server {tags {"repl external:skip"}} { set replica [srv 0 client] set replica_host [srv 0 host] @@ -450,8 +456,10 @@ foreach testType {Successful Aborted} { $master config set repl-diskless-sync yes $master config set repl-diskless-sync-delay 0 $master config set save "" + $master config set repl-rdb-channel $rdbchannel $replica config set repl-diskless-load swapdb $replica config set save "" + $replica config set repl-rdb-channel $rdbchannel # Put different data sets on the master and replica # We need to put large keys on the master since the replica replies to info only once in 2mb @@ -471,7 +479,7 @@ foreach testType {Successful Aborted} { # Start the replication process $replica replicaof $master_host $master_port - test {Diskless load swapdb (different replid): replica enter loading} { + test "Diskless load swapdb (different replid): replica enter loading repl-rdb-channel=$rdbchannel" { # Wait for the replica to start reading the rdb wait_for_condition 100 100 { [s -1 loading] eq 1 @@ -495,7 +503,7 @@ foreach testType {Successful Aborted} { fail "Replica didn't disconnect" } - test {Diskless load swapdb (different replid): old database is exposed after replication fails} { + test "Diskless load swapdb (different replid): old database 
is exposed after replication fails rdb-channel=$rdbchannel" { # Ensure we see old values from replica assert_equal [$replica get mykey] "myvalue" @@ -517,7 +525,7 @@ foreach testType {Successful Aborted} { fail "Master <-> Replica didn't finish sync" } - test {Diskless load swapdb (different replid): new database is exposed after swapping} { + test "Diskless load swapdb (different replid): new database is exposed after swapping rdb-channel=$rdbchannel" { # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status assert_equal [$replica GET mykey] "" @@ -548,6 +556,7 @@ foreach testType {Successful Aborted} { $master config set save "" $replica config set repl-diskless-load swapdb $replica config set save "" + $replica config set repl-rdb-channel no; # Doesn't work with swapdb # Set replica writable so we can check that a key we manually added is served # during replication and after failure, but disappears on success @@ -852,6 +861,7 @@ start_server {tags {"repl external:skip"} overrides {save ""}} { $master config set repl-diskless-sync yes $master config set repl-diskless-sync-delay 5 $master config set repl-diskless-sync-max-replicas 2 + $master config set repl-rdb-channel "no"; # rdb-channel doesn't use pipe set master_host [srv 0 host] set master_port [srv 0 port] set master_pid [srv 0 pid] @@ -1041,8 +1051,8 @@ test "diskless replication child being killed is collected" { } } {} {external:skip} -foreach mdl {yes no} { - test "replication child dies when parent is killed - diskless: $mdl" { +foreach mdl {yes no} rdbchannel {yes no} { + test "replication child dies when parent is killed - diskless: $mdl repl-rdb-channel: $rdbchannel" { # when master is killed, make sure the fork child can detect that and exit start_server {tags {"repl"} overrides {save ""}} { set master [srv 0 client] @@ -1056,6 +1066,7 @@ foreach mdl {yes no} { $master debug populate 10000 start_server {overrides {save ""}} { set replica [srv 
0 client] + $replica config set repl-rdb-channel $rdbchannel $replica replicaof $master_host $master_port # wait for rdb child to start @@ -1235,69 +1246,80 @@ test {Kill rdb child process if its dumping RDB is not useful} { } } } {} {external:skip} - -start_server {tags {"repl external:skip"}} { - set master1_host [srv 0 host] - set master1_port [srv 0 port] - r set a b - - start_server {} { - set master2 [srv 0 client] - set master2_host [srv 0 host] - set master2_port [srv 0 port] - # Take 10s for dumping RDB - $master2 debug populate 10 master2 10 - $master2 config set rdb-key-save-delay 1000000 +foreach rdbchannel {yes no} { + start_server {tags {"repl external:skip"}} { + set master1 [srv 0 client] + set master1_host [srv 0 host] + set master1_port [srv 0 port] + $master1 config set repl-rdb-channel $rdbchannel + r set a b start_server {} { - set sub_replica [srv 0 client] + set master2 [srv 0 client] + set master2_host [srv 0 host] + set master2_port [srv 0 port] + # Take 10s for dumping RDB + $master2 debug populate 10 master2 10 + $master2 config set rdb-key-save-delay 1000000 + $master2 config set repl-rdb-channel $rdbchannel start_server {} { - # Full sync with master1 - r slaveof $master1_host $master1_port - wait_for_sync r - assert_equal "b" [r get a] - - # Let sub replicas sync with me - $sub_replica slaveof [srv 0 host] [srv 0 port] - wait_for_sync $sub_replica - assert_equal "b" [$sub_replica get a] - - # Full sync with master2, and then kill master2 before finishing dumping RDB - r slaveof $master2_host $master2_port - wait_for_condition 50 100 { - ([s -2 rdb_bgsave_in_progress] == 1) && - ([string match "*wait_bgsave*" [s -2 slave0]]) - } else { - fail "full sync didn't start" - } - catch {$master2 shutdown nosave} - - test {Don't disconnect with replicas before loading transferred RDB when full sync} { - assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"] - # The replication id is not changed in entire replication chain - 
assert_equal [s master_replid] [s -3 master_replid] - assert_equal [s master_replid] [s -1 master_replid] - } + set sub_replica [srv 0 client] + $sub_replica config set repl-rdb-channel $rdbchannel - test {Discard cache master before loading transferred RDB when full sync} { - set full_sync [s -3 sync_full] - set partial_sync [s -3 sync_partial_ok] - # Partial sync with master1 + start_server {} { + # Full sync with master1 + set replica [srv 0 client] + $replica config set repl-rdb-channel $rdbchannel r slaveof $master1_host $master1_port wait_for_sync r - # master1 accepts partial sync instead of full sync - assert_equal $full_sync [s -3 sync_full] - assert_equal [expr $partial_sync+1] [s -3 sync_partial_ok] - - # Since master only partially sync replica, and repl id is not changed, - # the replica doesn't disconnect with its sub-replicas - assert_equal [s master_replid] [s -3 master_replid] - assert_equal [s master_replid] [s -1 master_replid] - assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"] - # Sub replica just has one full sync, no partial resync. 
- assert_equal 1 [s sync_full] - assert_equal 0 [s sync_partial_ok] + assert_equal "b" [r get a] + + # Let sub replicas sync with me + $sub_replica slaveof [srv 0 host] [srv 0 port] + wait_for_sync $sub_replica + assert_equal "b" [$sub_replica get a] + + # Full sync with master2, and then kill master2 before finishing dumping RDB + r slaveof $master2_host $master2_port + wait_for_condition 50 100 { + ([s -2 rdb_bgsave_in_progress] == 1) && + ([string match "*wait_bgsave*" [s -2 slave0]]) + } else { + fail "full sync didn't start" + } + catch {$master2 shutdown nosave} + + test "Don't disconnect with replicas before loading transferred RDB when full sync with rdb-channel $rdbchannel" { + assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"] + # The replication id is not changed in entire replication chain + assert_equal [s master_replid] [s -3 master_replid] + assert_equal [s master_replid] [s -1 master_replid] + } + + test "Discard cache master before loading transferred RDB when full sync with rdb-channel $rdbchannel" { + set full_sync [s -3 sync_full] + set partial_sync [s -3 sync_partial_ok] + # Partial sync with master1 + r slaveof $master1_host $master1_port + wait_for_sync r + # master1 accepts partial sync instead of full sync + assert_equal $full_sync [s -3 sync_full] + assert_equal [expr $partial_sync+1] [s -3 sync_partial_ok] + + # Since master only partially sync replica, and repl id is not changed, + # the replica doesn't disconnect with its sub-replicas + assert_equal [s master_replid] [s -3 master_replid] + assert_equal [s master_replid] [s -1 master_replid] + assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"] + # Sub replica just has one full sync, no partial resync. 
+ assert_equal 1 [s sync_full] + if {$rdbchannel == "yes"} { + assert_equal 1 [s sync_partial_ok] + } else { + assert_equal 0 [s sync_partial_ok] + } + } } } } @@ -1454,3 +1476,661 @@ start_server {tags {"repl external:skip"}} { } } } + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Configure the master in order to hang waiting for the BGSAVE + # operation, so that the replica remains in the handshake state. + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 1000 + $master config set repl-rdb-channel yes + + # Start the replication process... + $replica config set repl-rdb-channel yes + $replica slaveof $master_host $master_port + + test {Test repl-rdb-channel replica enters handshake} { + wait_for_condition 50 1000 { + [string match *handshake* [$replica role]] + } else { + fail "Replica does not enter handshake state" + } + } + + test {Test repl-rdb-channel enters wait_bgsave} { + wait_for_condition 50 1000 { + [string match *state=wait_bgsave* [$master info replication]] + } else { + fail "Replica does not enter wait_bgsave state" + } + } + + $master config set repl-diskless-sync-delay 0 + + test {Test repl-rdb-channel replica is able to sync} { + verify_replica_online $master 0 500 + wait_for_condition 50 1000 { + [string match *connected_slaves:1* [$master info]] + } else { + fail "Replica rdb connection is still open" + } + $master set foo 1 + # wait for value to propegate to replica + wait_for_condition 50 1000 { + [$replica get foo] == 1 + } else { + fail "Replica isn't connected" + } + } + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 
stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $master config set rdb-key-save-delay 200 + $master config set repl-rdb-channel yes + $replica config set repl-rdb-channel yes + $replica config set repl-diskless-sync no + + populate 1000 master 10000 + set load_handle1 [start_one_key_write_load $master_host $master_port 100 "mykey1"] + set load_handle2 [start_one_key_write_load $master_host $master_port 100 "mykey2"] + set load_handle3 [start_one_key_write_load $master_host $master_port 100 "mykey3"] + + # wait for load handlers to start + wait_for_condition 50 1000 { + ([$master get "mykey1"] != "") && + ([$master get "mykey2"] != "") && + ([$master get "mykey3"] != "") + } else { + fail "Can't set new keys" + } + + set before_used [s 0 used_memory] + + test {Primary memory usage does not increase during rdb-channel sync} { + $replica slaveof $master_host $master_port + + # Verify used_memory stays low through all the sync + set max_retry 500 + while {$max_retry} { + # Verify memory + set used_memory [s 0 used_memory] + assert {$used_memory-$before_used <= 1.5*10**6}; # ~1/3 of the space + # Check replica state + set master_info [$master info] + set replica_info [$replica info] + if {[string match *slave0:*state=online* $master_info] && + [string match *master_link_status:up* $replica_info]} { + break + } else { + incr max_retry -1 + after 10 + } + } + if {$max_retry == 0} { + error "assertion:Replica not in sync after 5 seconds" + } + } + stop_write_load $load_handle1 + stop_write_load $load_handle2 + stop_write_load $load_handle3 + + test {Steady state after rdb channel sync} { + set val1 [$master get mykey1] + set val2 [$master get mykey2] + set val3 [$master get mykey3] + wait_for_condition 50 1000 { + ([$replica get "mykey1"] eq $val1) && + ([$replica get "mykey2"] eq $val2) && + ([$replica get "mykey3"] eq $val3) + } else { + fail "Can't set new keys" + } + } + + test {Rollback to nornal
sync} { + $replica slaveof no one + $replica config set repl-rdb-channel no + $master set newkey newval + + set sync_full [s 0 sync_full] + assert {$sync_full > 0} + + $replica slaveof $master_host $master_port + verify_replica_online $master 0 500 + assert_equal [expr $sync_full + 1] [s 0 sync_full] + assert [string match *connected_slaves:1* [$master info]] + } + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + foreach start_with_rdb_sync_enabled {yes no} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $master config set repl-diskless-sync yes + $master config set client-output-buffer-limit "replica 1100k 0 0" + $replica config set repl-rdb-channel $start_with_rdb_sync_enabled + + test "Test enable disable repl-rdb-channel start with $start_with_rdb_sync_enabled" { + # Set master shared replication buffer size to a bit more than the size of + # a replication buffer block.
+ + populate 1000 master 10000 + + $replica slaveof $master_host $master_port + verify_replica_online $master 0 500 + + set sync_full [s 0 sync_full] + assert {$sync_full > 0} + + $replica slaveof no one + if {$start_with_rdb_sync_enabled == "yes"} { + # disable rdb channel sync + $replica config set repl-rdb-channel no + } else { + $replica config set repl-rdb-channel yes + } + + # Force replica to full sync next time + populate 1000 master 10000 + + $replica slaveof $master_host $master_port + verify_replica_online $master 0 500 + wait_for_sync $replica + + wait_for_condition 100 100 { + [s 0 sync_full] > $sync_full + } else { + fail "Master <-> Replica didn't start the full sync" + } + } + } + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica1 [srv 0 client] + set replica1_host [srv 0 host] + set replica1_port [srv 0 port] + set replica1_log [srv 0 stdout] + start_server {} { + set replica2 [srv 0 client] + set replica2_host [srv 0 host] + set replica2_port [srv 0 port] + set replica2_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set loglines [count_log_lines -1] + + populate 10000 master 10000 + $master set key1 val1 + + $master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 5; # allow both replicas to ask for sync + $master config set repl-rdb-channel yes + + $replica1 config set repl-rdb-channel yes + $replica2 config set repl-rdb-channel yes + $replica1 config set repl-diskless-sync no + $replica2 config set repl-diskless-sync no + $replica1 config set loglevel debug + $replica2 config set loglevel debug + + test "Two replicas in one sync session with repl-rdb-channel" { + $replica1 slaveof $master_host $master_port + $replica2 slaveof $master_host $master_port + + wait_for_value_to_propegate_to_replica $master $replica1 "key1" + wait_for_value_to_propegate_to_replica $master $replica2 "key1" + + 
wait_for_condition 100 100 { + [s 0 total_forks] eq "1" + } else { + fail "Master <-> Replica didn't start the full sync" + } + + verify_replica_online $master 0 500 + verify_replica_online $master 1 500 + set res [wait_for_log_messages -2 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + } + + $replica1 slaveof no one + $replica2 slaveof no one + + $replica1 config set repl-rdb-channel yes + $replica2 config set repl-rdb-channel no + + $master set key2 val2 + + test "Test one replica with repl-rdb-channel enabled one with disabled" { + $replica1 slaveof $master_host $master_port + $replica2 slaveof $master_host $master_port + + wait_for_value_to_propegate_to_replica $master $replica1 "key2" + wait_for_value_to_propegate_to_replica $master $replica2 "key2" + + verify_replica_online $master 0 500 + verify_replica_online $master 1 500 + set res [wait_for_log_messages -2 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + } + + $replica1 slaveof no one + $master set key4 val4 + + test "Test replica's buffer limit reached" { + $master config set repl-diskless-sync-delay 0 + $master config set rdb-key-save-delay 500 + # At this point we have about 10k keys in the db, + # We expect that the next full sync will take 5 seconds (10k*500)ms + # It will give us enough time to fill the replica buffer. 
+ $replica1 config set repl-rdb-channel yes + $replica1 config set client-output-buffer-limit "replica 16383 16383 0" + $replica1 config set loglevel debug + + $replica1 slaveof $master_host $master_port + # Wait for replica to establish psync using main connection + wait_for_condition 50 1000 { + [log_file_matches $replica1_log "*Master accepted a Partial Resynchronization, RDB load in background.*"] + } else { + fail "Psync hasn't been established" + } + + populate 10000 master 10000 + # Wait for replica's buffer limit reached + wait_for_condition 50 1000 { + [log_file_matches $replica1_log "*Replication buffer limit reached, stopping buffering*"] + } else { + fail "Replica buffer should fill" + } + assert {[s -2 replicas_replication_buffer_size] <= 16385*2} + + # Wait for sync to succeed + wait_for_value_to_propegate_to_replica $master $replica1 "key4" + set res [wait_for_log_messages -2 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 4000 1] + set loglines [lindex $res 1] + incr $loglines + } + + $replica1 slaveof no one + $replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation + + $master set key5 val5 + + test "Rdb-channel-sync fails when primary diskless disabled" { + $master config set repl-diskless-sync no + + $replica1 config set repl-rdb-channel yes + $replica1 slaveof $master_host $master_port + + # Wait for sync to fail + wait_for_condition 100 50 { + [log_file_matches $replica1_log "*Master does not understand REPLCONF main-conn*"] + } else { + fail "rdb-connection sync rollback should have been triggered." 
+ } + + # Wait for mitigation and resync + wait_for_value_to_propegate_to_replica $master $replica1 "key5" + + set res [wait_for_log_messages -2 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 4000 1] + set loglines [lindex $res 1] + incr $loglines + } + } + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set loglines [count_log_lines -1] + # Create small enough db to be loaded before replica establish psync connection + $master set key1 val1 + + $master config set repl-diskless-sync yes + $master debug sleep-after-fork [expr {5 * [expr {10 ** 6}]}];# Stop master after fork for 5 seconds + $master config set repl-rdb-channel yes + + $replica config set repl-rdb-channel yes + $replica config set loglevel debug + + test "Test rdb-channel psync established after rdb load" { + $replica slaveof $master_host $master_port + + wait_for_value_to_propegate_to_replica $master $replica "key1" + + verify_replica_online $master 0 500 + set res [wait_for_log_messages -1 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 2000 1] + # Confirm the occurrence of a race condition. 
+ set res [wait_for_log_messages -1 {"*RDB channel sync - psync established after rdb load*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + } + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set backlog_size [expr {10 ** 6}] + set loglines [count_log_lines -1] + + $master config set repl-diskless-sync yes + $master config set repl-rdb-channel yes + $master config set repl-backlog-size $backlog_size + $master config set loglevel debug + $master config set repl-timeout 10 + $master config set rdb-key-save-delay 200 + populate 10000 master 10000 + + set load_handle0 [start_write_load $master_host $master_port 20] + + $replica config set repl-rdb-channel yes + $replica config set loglevel debug + $replica config set repl-timeout 10 + # Stop replica after master fork for 2 seconds + $replica debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + + test "Test rdb-channel connection peering - replica able to establish psync" { + $replica slaveof $master_host $master_port + # Verify repl backlog can grow + wait_for_condition 5000 10 { + [s 0 mem_total_replication_buffers] > [expr {2 * $backlog_size}] + } else { + fail "Master should allow backlog to grow beyond its limits during rdb-channel sync handshake" + } + + verify_replica_online $master 0 500 + set res [wait_for_log_messages -1 {"*MASTER <-> REPLICA sync: Finished with success*"} $loglines 2000 1] + } + + stop_write_load $load_handle0 + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set replica1 [srv 0 client] + set replica1_host [srv 0 host] + set replica1_port [srv 0 port] + set replica1_log [srv 0 stdout] + start_server {} { + set replica2 [srv 0 client] + set replica2_host [srv 0 host] + set replica2_port [srv 0 port] + 
set replica2_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set backlog_size [expr {10 ** 6}] + set loglines [count_log_lines -1] + + $master config set repl-diskless-sync yes + $master config set repl-rdb-channel yes + $master config set repl-backlog-size $backlog_size + $master config set loglevel debug + $master config set repl-timeout 10 + $master config set rdb-key-save-delay 10 + populate 1024 master 16 + + set load_handle0 [start_write_load $master_host $master_port 20] + + $replica1 config set repl-rdb-channel yes + $replica2 config set repl-rdb-channel yes + $replica1 config set loglevel debug + $replica2 config set loglevel debug + $replica1 config set repl-timeout 10 + $replica2 config set repl-timeout 10 + + # Stop replica after master fork for 2 seconds + $replica1 debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + $replica2 debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + test "Test rdb-channel connection peering - start with empty backlog (retrospect)" { + $replica1 slaveof $master_host $master_port + set res [wait_for_log_messages 0 {"*Add slave * repl-backlog is empty*"} $loglines 2000 1] + set res [wait_for_log_messages 0 {"*Retrospect attach slave*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + verify_replica_online $master 0 700 + wait_for_log_messages -2 {"*MASTER <-> REPLICA sync: Finished with success*"} 0 2000 1 + $replica1 slaveof no one + assert [string match *slaves_waiting_psync:0* [$master info replication]] + } + + test "Test rdb-channel connection peering - start with backlog" { + $replica2 slaveof $master_host $master_port + set res [wait_for_log_messages 0 {"*Add slave * with repl-backlog tail*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + verify_replica_online $master 0 700 + wait_for_log_messages -1 {"*MASTER <-> REPLICA sync: Finished with success*"} 0 2000 1 + assert [string match 
*slaves_waiting_psync:0* [$master info replication]] + } + + stop_write_load $load_handle0 + } + } +} + +proc start_bg_server_sleep {host port sec} { + set tclsh [info nameofexecutable] + exec $tclsh tests/helpers/bg_server_sleep.tcl $host $port $sec & +} + +proc stop_bg_server_sleep {handle} { + catch {exec /bin/kill -9 $handle} +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $master config set repl-diskless-sync yes + $master config set repl-rdb-channel yes + $master config set repl-backlog-size [expr {10 ** 6}] + $master config set loglevel debug + $master config set repl-timeout 10 + # generate small db + populate 10 master 10 + # Stop master main process after fork for 2 seconds + $master debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + $master debug wait-before-rdb-client-free 5 + + start_server {} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + set loglines [count_log_lines 0] + + set load_handle0 [start_write_load $master_host $master_port 20] + + $replica config set repl-rdb-channel yes + $replica config set loglevel debug + $replica config set repl-timeout 10 + + test "Test rdb-channel psync established after rdb load, master keep repl-block" { + $replica slaveof $master_host $master_port + set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + # At this point rdb is loaded but psync hasn't been established yet. + # Force the replica to sleep for 3 seconds so the master main process will wake up, while the replica is unresponsive. 
+ set sleep_handle [start_bg_server_sleep $replica_host $replica_port 3] + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:1*} [$master info replication]] + } else { + fail "Master freed RDB client before psync was established" + } + + verify_replica_online $master 0 500 + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:0*} [$master info replication]] + } else { + fail "Master did not free repl buf block after psync establishment" + } + $replica slaveof no one + } + stop_write_load $load_handle0 + } + + start_server {} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + set loglines [count_log_lines 0] + + set load_handle0 [start_write_load $master_host $master_port 20] + + $replica config set repl-rdb-channel yes + $replica config set loglevel debug + $replica config set repl-timeout 10 + + test "Test rdb-channel psync established after rdb load, master dismiss repl-block" { + $replica slaveof $master_host $master_port + set res [wait_for_log_messages 0 {"*Done loading RDB*"} $loglines 2000 1] + set loglines [lindex $res 1] + incr $loglines + # At this point rdb is loaded but psync hasn't been established yet. + # Force the replica to sleep for 8 seconds so the master main process will wake up, while the replica is unresponsive. + # We expect the grace time to be over before the replica wake up, so sync will fail. 
+ set sleep_handle [start_bg_server_sleep $replica_host $replica_port 8] + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:1*} [$master info replication]] + } else { + fail "Master should wait before freeing repl block" + } + + # Sync should fail once the replica ask for PSYNC using main channel + set res [wait_for_log_messages -1 {"*Replica main connection failed to establish PSYNC within the grace period*"} 0 4000 1] + + # Should succeed on retry + verify_replica_online $master 0 500 + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:0*} [$master info replication]] + } else { + fail "Master did not free repl buf block after psync establishment" + } + $replica slaveof no one + } + stop_write_load $load_handle0 + } +} + +start_server {tags {"repl rdb-channel external:skip"}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set loglines [count_log_lines 0] + + $master config set repl-diskless-sync yes + $master config set repl-rdb-channel yes + $master config set client-output-buffer-limit "replica 1100k 0 0" + $master config set loglevel debug + # generate small db + populate 10 master 10 + # Stop master main process after fork for 2 seconds + $master debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + start_server {} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + + set load_handle0 [start_write_load $master_host $master_port 20] + set load_handle1 [start_write_load $master_host $master_port 20] + set load_handle2 [start_write_load $master_host $master_port 20] + + $replica config set repl-rdb-channel yes + $replica config set loglevel debug + $replica config set repl-timeout 10 + + test "Test rdb-channel master gets cob overrun before established psync" { + $replica slaveof $master_host $master_port + wait_for_log_messages 0 {"*Done loading RDB*"} 0 2000 1 + + # At this point rdb is loaded but psync
hasn't been established yet. + # Force the replica to sleep for 5 seconds so the master main process will wake up while the + # replica is unresponsive. We expect the main process to fill the COB before the replica wakes. + set sleep_handle [start_bg_server_sleep $replica_host $replica_port 5] + wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:0*} [$master info replication]] + } else { + fail "Master did not free repl buf block after sync failure" + } + set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] + set loglines [lindex $res 1] + } + + $replica slaveof no one + $replica debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] + + $master debug populate 1000 master 100000 + # Set master with a slow rdb generation, so that we can easily intercept loading + # 10ms per key, with 1000 keys is 10 seconds + $master config set rdb-key-save-delay 10000 + $master debug sleep-after-fork 0 + + test "Test rdb-channel master gets cob overrun during replica rdb load" { + $replica slaveof $master_host $master_port + + wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 + wait_for_condition 50 100 { + [string match {*slaves_waiting_psync:0*} [$master info replication]] + } else { + fail "Master did not free repl buf block after sync failure" + } + set res [wait_for_log_messages -1 {"*Unable to partial resync with replica * for lack of backlog*"} $loglines 20000 1] + set loglines [lindex $res 0] + } + stop_write_load $load_handle0 + stop_write_load $load_handle1 + stop_write_load $load_handle2 + } +} \ No newline at end of file diff --git a/tests/support/test.tcl b/tests/support/test.tcl index bb59ee7972..8f90f40744 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -141,6 +141,31 @@ proc wait_for_condition {maxtries 
delay e _else_ elsescript} {
     }
 }
 
+proc verify_replica_online {master replica_idx max_retry} { ;# Poll $master's INFO every 100 ms, at most max_retry times, until slave<replica_idx> shows state=online; raise an error otherwise.
+    while {$max_retry > 0} { ;# "> 0" (not just non-zero) so a negative retry budget cannot loop forever
+        set info [$master info]
+        set pattern *slave$replica_idx:*state=online* ;# NOTE(review): glob "*" also spans line breaks, so with several replicas another replica's state=online may satisfy this - confirm
+        if {[string match $pattern $info]} {
+            break
+        } else {
+            incr max_retry -1
+            after 100
+        }
+    }
+    if {$max_retry <= 0} { ;# budget exhausted (or none given): the loop never hit "break"
+        error "assertion:Replica not correctly synchronized"
+    }
+}
+
+proc wait_for_value_to_propegate_to_replica {master replica key} { ;# Wait (50 tries, 1000 ms apart) until $replica returns for $key the value $master held when this proc was called.
+    set val [$master get $key] ;# read the caller's key, not the literal key name "key"
+    wait_for_condition 50 1000 {
+        ([$replica get $key] eq $val)
+    } else {
+        error "key $key did not propegate"
+    }
+}
+
 # try to match a value to a list of patterns that are either regex (starts with "/") or plain string.
 # The caller can specify to use only glob-pattern match
 proc search_pattern_list {value pattern_list {glob_pattern false}} {
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 9d69e44232..9e9471a1de 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -555,7 +555,15 @@ proc find_valgrind_errors {stderr on_termination} {
 # of seconds to the specified the server instance.
 proc start_write_load {host port seconds} {
     set tclsh [info nameofexecutable]
-    exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls &
+    exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls "" &
+}
+
+# Execute a background process writing only one key for the specified number
+# of seconds to the specified Redis instance. This load handler is useful for
+# tests which require a heavy replication stream but no memory load.
+proc start_one_key_write_load {host port seconds key} {
+    set tclsh [info nameofexecutable]
+    exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls $key &
 }
 
 # Stop a process generating write load executed with start_write_load.
diff --git a/tests/unit/auth.tcl b/tests/unit/auth.tcl
index d3c6156e6b..3fea6e9c42 100644
--- a/tests/unit/auth.tcl
+++ b/tests/unit/auth.tcl
@@ -47,6 +47,7 @@ start_server {tags {"auth external:skip"} overrides {requirepass foobar}} {
     }
 }
 
+foreach rdbchann {yes no} { ;# run the tests below once with repl-rdb-channel set to yes and once with no
 start_server {tags {"auth_binary_password external:skip"}} {
     test {AUTH fails when binary password is wrong} {
         r config set requirepass "abc\x00def"
@@ -65,12 +66,13 @@ start_server {tags {"auth_binary_password external:skip"}} {
         set master_port [srv -1 port]
         set slave [srv 0 client]
 
-        test {MASTERAUTH test with binary password} {
+        test "MASTERAUTH test with binary password rdbchannel = $rdbchann" {
             $master config set requirepass "abc\x00def"
-
+            $master config set repl-rdb-channel $rdbchann ;# master side of the setting under test
             # Configure the replica with masterauth
             set loglines [count_log_lines 0]
             $slave config set masterauth "abc"
+            $slave config set repl-rdb-channel $rdbchann ;# replica uses the same setting as the master
             $slave slaveof $master_host $master_port
 
             # Verify replica is not able to sync with master
@@ -87,3 +89,4 @@ start_server {tags {"auth_binary_password external:skip"}} {
         }
     }
 }
+} ;# end foreach rdbchann
\ No newline at end of file