diff --git a/src/networking.c b/src/networking.c index 211507132f..2d13d56bf3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1847,11 +1847,11 @@ client *lookupClientByID(uint64_t id) { } /* Return a client by ID, or NULL if the client ID is not in the set - * of slaves waiting psync clients. */ + * of replicas waiting psync clients. */ client *lookupRdbClientByID(uint64_t id) { id = htonu64(id); void *c = NULL; - raxFind(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),&c); + raxFind(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),&c); return c; } diff --git a/src/rdb.c b/src/rdb.c index fe9884842e..7c04ced3b9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3600,7 +3600,7 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) { * to inform it with the save end offset.*/ sendCurrentOffsetToReplica(slave); /* Make sure repl traffic is appended to the replication backlog */ - addSlaveToPsyncWaitingRax(slave); + addReplicaToPsyncWaitingRax(slave); } else { server.rdb_pipe_numconns++; } diff --git a/src/replication.c b/src/replication.c index 207a2bfceb..c4008b36bc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -215,7 +215,7 @@ void rebaseReplicationBuffer(long long base_repl_offset) { * On COB overrun, association is deleted and the RDB connection * is dropped. */ -void addSlaveToPsyncWaitingRax(client* slave) { +void addReplicaToPsyncWaitingRax(client* replica) { listNode *ln = NULL; replBufBlock *tail = NULL; if (server.repl_backlog == NULL) { @@ -227,52 +227,52 @@ void addSlaveToPsyncWaitingRax(client* slave) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add slave %s to waiting psync rax, with cid %llu, %s ", replicationGetSlaveName(slave), (long long unsigned int)slave->id, + serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetSlaveName(replica), (long long unsigned int)replica->id, tail? "with repl-backlog tail": "repl-backlog is empty"); - slave->ref_repl_buf_node = tail? ln: NULL; + replica->ref_repl_buf_node = tail? ln: NULL; /* Prevent rdb client from being freed before psync is established. */ - slave->flags |= CLIENT_PROTECTED_RDB_CHANNEL; - uint64_t id = htonu64(slave->id); - raxInsert(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),slave,NULL); + replica->flags |= CLIENT_PROTECTED_RDB_CHANNEL; + uint64_t id = htonu64(replica->id); + raxInsert(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),replica,NULL); } /* Attach waiting psync replicas with new replication backlog head. */ -void addSlaveToPsyncWaitingRaxRetrospect(void) { +void addReplicaToPsyncWaitingRaxRetrospect(void) { listNode *ln = listFirst(server.repl_buffer_blocks); replBufBlock *head = ln ? listNodeValue(ln) : NULL; raxIterator iter; if (head == NULL) return; - /* Update waiting psync slaves to wait on new buffer block */ - raxStart(&iter,server.slaves_waiting_psync); + /* Update waiting psync replicas to wait on new buffer block */ + raxStart(&iter,server.replicas_waiting_psync); raxSeek(&iter, "^", NULL, 0); while(raxNext(&iter)) { - client* slave = iter.data; - if (slave->ref_repl_buf_node) continue; - slave->ref_repl_buf_node = ln; + client* replica = iter.data; + if (replica->ref_repl_buf_node) continue; + replica->ref_repl_buf_node = ln; head->refcount++; - serverLog(LL_DEBUG, "Retrospect attach slave %llu to repl buf block", (long long unsigned int)slave->id); + serverLog(LL_DEBUG, "Retrospect attach replica %llu to repl buf block", (long long unsigned int)replica->id); } raxStop(&iter); } -void removeSlaveFromPsyncWaitingRax(client* slave) { +void removeReplicaFromPsyncWaitingRax(client* replica) { listNode *ln; replBufBlock *o; /* Get replBufBlock pointed by this replica */ - client *peer_slave = lookupRdbClientByID(slave->associated_rdb_client_id); - ln = peer_slave->ref_repl_buf_node; + client *peer_replica = lookupRdbClientByID(replica->associated_rdb_client_id); + ln = peer_replica->ref_repl_buf_node; o = ln ? listNodeValue(ln) : NULL; if (o != NULL) { serverAssert(o->refcount > 0); o->refcount--; } - peer_slave->ref_repl_buf_node = NULL; - peer_slave->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL; - serverLog(LL_DEBUG, "Remove psync waiting slave %s with cid %llu, repl buffer block %s", - replicationGetSlaveName(slave), (long long unsigned int)slave->associated_rdb_client_id, o? "ref count decreased": "doesn't exist"); - uint64_t id = htonu64(peer_slave->id); - raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); + peer_replica->ref_repl_buf_node = NULL; + peer_replica->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL; + serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", + replicationGetSlaveName(replica), (long long unsigned int)replica->associated_rdb_client_id, o? "ref count decreased": "doesn't exist"); + uint64_t id = htonu64(peer_replica->id); + raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL); } void resetReplicationBuffer(void) { @@ -392,7 +392,7 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) { void freeReplicaReferencedReplBuffer(client *replica) { if (replica->flags & CLIENT_REPL_RDB_CHANNEL) { uint64_t id = htonu64(replica->id); - if(raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL)) { + if(raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL)) { serverLog(LL_DEBUG, "Remove psync waiting slave %s with cid %llu from replicas rax.", replicationGetSlaveName(replica), (long long unsigned int)replica->associated_rdb_client_id); } @@ -473,9 +473,9 @@ void feedReplicationBuffer(char *s, size_t len) { server.master_repl_offset += copy; server.repl_backlog->histlen += copy; } - if (empty_backlog && raxSize(server.slaves_waiting_psync) > 0) { + if (empty_backlog && raxSize(server.replicas_waiting_psync) > 0) { /* Increase refcount for pending replicas. */ - addSlaveToPsyncWaitingRaxRetrospect(); + addReplicaToPsyncWaitingRaxRetrospect(); } /* For output buffer of replicas. */ @@ -889,7 +889,7 @@ int masterTryPartialResynchronization(client *c, long long psync_offset) { c->flags |= CLIENT_SLAVE; if (c->flags & CLIENT_REPL_MAIN_CHANNEL && lookupRdbClientByID(c->associated_rdb_client_id)) { c->replstate = SLAVE_STATE_BG_RDB_LOAD; - removeSlaveFromPsyncWaitingRax(c); + removeReplicaFromPsyncWaitingRax(c); } else { c->replstate = SLAVE_STATE_ONLINE; } @@ -4750,7 +4750,7 @@ void replicationCron(void) { if (listLength(server.repl_buffer_blocks) > 0) { replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); serverAssert(o->refcount > 0 && - o->refcount <= (int)listLength(server.slaves) + 1 + (int)raxSize(server.slaves_waiting_psync)); + o->refcount <= (int)listLength(server.slaves) + 1 + (int)raxSize(server.replicas_waiting_psync)); } /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ diff --git a/src/server.c b/src/server.c index c30c5693b7..47f4bd5fb2 100644 --- a/src/server.c +++ b/src/server.c @@ -2638,7 +2638,7 @@ void initServer(void) { server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); - server.slaves_waiting_psync = raxNew(); + server.replicas_waiting_psync = raxNew(); server.wait_before_rdb_client_free = DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE; server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); @@ -6071,13 +6071,13 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { slaveid,slaveip,slave->slave_listening_port,state, slave->repl_ack_off, lag, slave->flags & CLIENT_REPL_RDB_CHANNEL ? "rdb-conn": - slave->replstate == SLAVE_STATE_BG_RDB_LOAD ? "main-conn": "normal-slave"); + slave->replstate == SLAVE_STATE_BG_RDB_LOAD ? "main-conn": "replica"); slaveid++; } } /* clang-format off */ info = sdscatprintf(info, FMTARGS( - "slaves_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.slaves_waiting_psync), + "replicas_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.replicas_waiting_psync), "master_failover_state:%s\r\n", getFailoverStateString(), "master_replid:%s\r\n", server.replid, "master_replid2:%s\r\n", server.replid2, diff --git a/src/server.h b/src/server.h index 783b9bd386..84f833757c 100644 --- a/src/server.h +++ b/src/server.h @@ -1657,7 +1657,7 @@ struct valkeyServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ - rax *slaves_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values. + rax *replicas_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values. * This rax contains slaves for the period from the beginning of * their RDB connection to the end of their main connection's * partial synchronization. */ @@ -2931,7 +2931,7 @@ void abortFailover(const char *err); const char *getFailoverStateString(void); void abortRdbConnectionSync(void); int sendCurrentOffsetToReplica(client* replica); -void addSlaveToPsyncWaitingRax(client* slave); +void addReplicaToPsyncWaitingRax(client* slave); /* Generic persistence functions */ void startLoadingFile(size_t size, char* filename, int rdbflags); diff --git a/tests/integration/repl-rdb-channel.tcl b/tests/integration/repl-rdb-channel.tcl index 4b9c4ef5a7..958a0b7dee 100644 --- a/tests/integration/repl-rdb-channel.tcl +++ b/tests/integration/repl-rdb-channel.tcl @@ -475,8 +475,8 @@ start_server {tags {"repl rdb-channel external:skip"}} { $replica2 debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}] test "Test rdb-channel connection peering - start with empty backlog (retrospect)" { $replica1 slaveof $master_host $master_port - set res [wait_for_log_messages 0 {"*Add slave * repl-backlog is empty*"} $loglines 2000 1] - set res [wait_for_log_messages 0 {"*Retrospect attach slave*"} $loglines 2000 1] + set res [wait_for_log_messages 0 {"*Add replica * repl-backlog is empty*"} $loglines 2000 1] + set res [wait_for_log_messages 0 {"*Retrospect attach replica*"} $loglines 2000 1] set loglines [lindex $res 1] incr $loglines verify_replica_online $master 0 700 @@ -486,12 +486,12 @@ start_server {tags {"repl rdb-channel external:skip"}} { fail "Replica is not synced" } $replica1 slaveof no one - assert [string match *slaves_waiting_psync:0* [$master info replication]] + assert [string match *replicas_waiting_psync:0* [$master info replication]] } test "Test rdb-channel connection peering - start with backlog" { $replica2 slaveof $master_host $master_port - set res [wait_for_log_messages 0 {"*Add slave * with repl-backlog tail*"} $loglines 2000 1] + set res [wait_for_log_messages 0 {"*Add replica * with repl-backlog tail*"} $loglines 2000 1] set loglines [lindex $res 1] incr $loglines verify_replica_online $master 0 700 @@ -500,7 +500,7 @@ start_server {tags {"repl rdb-channel external:skip"}} { } else { fail "Replica is not synced" } - assert [string match *slaves_waiting_psync:0* [$master info replication]] + assert [string match *replicas_waiting_psync:0* [$master info replication]] } stop_write_load $load_handle0 @@ -555,14 +555,14 @@ start_server {tags {"repl rdb-channel external:skip"}} { # Force the replica to sleep for 3 seconds so the master main process will wake up, while the replica is unresponsive. set sleep_handle [start_bg_server_sleep $replica_host $replica_port 3] wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:1*} [$master info replication]] + [string match {*replicas_waiting_psync:1*} [$master info replication]] } else { fail "Master freed RDB client before psync was established" } verify_replica_online $master 0 500 wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:0*} [$master info replication]] + [string match {*replicas_waiting_psync:0*} [$master info replication]] } else { fail "Master did not free repl buf block after psync establishment" } @@ -594,7 +594,7 @@ start_server {tags {"repl rdb-channel external:skip"}} { # We expect the grace time to be over before the replica wake up, so sync will fail. set sleep_handle [start_bg_server_sleep $replica_host $replica_port 8] wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:1*} [$master info replication]] + [string match {*replicas_waiting_psync:1*} [$master info replication]] } else { fail "Master should wait before freeing repl block" } @@ -605,7 +605,7 @@ start_server {tags {"repl rdb-channel external:skip"}} { # Should succeed on retry verify_replica_online $master 0 500 wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:0*} [$master info replication]] + [string match {*replicas_waiting_psync:0*} [$master info replication]] } else { fail "Master did not free repl buf block after psync establishment" } @@ -653,7 +653,7 @@ start_server {tags {"repl rdb-channel external:skip"}} { set sleep_handle [start_bg_server_sleep $replica_host $replica_port 5] wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:0*} [$master info replication]] + [string match {*replicas_waiting_psync:0*} [$master info replication]] } else { fail "Master did not free repl buf block after sync failure" } @@ -675,7 +675,7 @@ start_server {tags {"repl rdb-channel external:skip"}} { wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1 wait_for_condition 50 100 { - [string match {*slaves_waiting_psync:0*} [$master info replication]] + [string match {*replicas_waiting_psync:0*} [$master info replication]] } else { fail "Master did not free repl buf block after sync failure" } @@ -782,4 +782,4 @@ start_server {tags {"repl rdb-channel external:skip"}} { } } } -} \ No newline at end of file +} diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index c24b9b0d66..36da46c3cb 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -378,7 +378,7 @@ foreach mdl {no yes} rdbchannel {no yes} { wait_for_ofs_sync $master [lindex $slaves 1] wait_for_ofs_sync $master [lindex $slaves 2] - assert [string match *slaves_waiting_psync:0* [$master info replication]] + assert [string match *replicas_waiting_psync:0* [$master info replication]] # Check digests set digest [$master debug digest]