From c02a5099ab3d4aaca33765925ed4334c86af9ba9 Mon Sep 17 00:00:00 2001 From: naglera Date: Wed, 10 Jul 2024 13:35:19 +0000 Subject: [PATCH] merge fixes Signed-off-by: naglera --- src/networking.c | 381 +--------------------------------------------- src/replication.c | 16 +- src/server.c | 2 +- src/server.h | 32 ++-- 4 files changed, 27 insertions(+), 404 deletions(-) diff --git a/src/networking.c b/src/networking.c index 42b9efb555..dbf80f5b91 100644 --- a/src/networking.c +++ b/src/networking.c @@ -121,7 +121,7 @@ int authRequired(client *c) { static inline int isReplicaReadyForReplData(client *replica) { return (replica->repl_state == REPLICA_STATE_ONLINE || - replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !(replica->flags & CLIENT_CLOSE_ASAP); + replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !(replica->flag.close_asap); } client *createClient(connection *conn) { @@ -1645,7 +1645,7 @@ void freeClient(client *c) { /* Log link disconnection with replica */ if (getClientType(c) == CLIENT_TYPE_REPLICA) { serverLog(LL_NOTICE, - c->flags & CLIENT_REPL_RDB_CONN ? "Replica %s rdb connection disconnected." + c->flag.repl_rdb_conn ? "Replica %s rdb connection disconnected." : "Connection with replica %s lost.", replicationGetReplicaName(c)); } @@ -1780,379 +1780,6 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...) { freeClientAsync(c); } -/* Perform processing of the client before moving on to processing the next client - * this is useful for performing operations that affect the global state but can't - * wait until we're done with all clients. In other words can't wait until beforeSleep() - * return C_ERR in case client is no longer valid after call. - * The input client argument: c, may be NULL in case the previous client was - * freed before the call. */ -int beforeNextClient(client *c) { - /* Notice, this code is also called from 'processUnblockedClients'. - * But in case of a module blocked client (see RM_Call 'K' flag) we do not reach this code path. - * So whenever we change the code here we need to consider if we need this change on module - * blocked client as well */ - - /* Skip the client processing if we're in an IO thread, in that case we'll perform - this operation later (this function is called again) in the fan-in stage of the threading mechanism */ - if (io_threads_op != IO_THREADS_OP_IDLE) return C_OK; - /* Handle async frees */ - /* Note: this doesn't make the server.clients_to_close list redundant because of - * cases where we want an async free of a client other than myself. For example - * in ACL modifications we disconnect clients authenticated to non-existent - * users (see ACL LOAD). */ - if (c && (c->flags & CLIENT_CLOSE_ASAP)) { - freeClient(c); - return C_ERR; - } - return C_OK; -} - -/* Free the clients marked as CLOSE_ASAP, return the number of clients - * freed. */ -int freeClientsInAsyncFreeQueue(void) { - int freed = 0; - listIter li; - listNode *ln; - - listRewind(server.clients_to_close, &li); - while ((ln = listNext(&li)) != NULL) { - client *c = listNodeValue(ln); - - if (c->flags & CLIENT_PROTECTED_RDB_CONN) { - /* Check if it's safe to remove RDB connection protection during synchronization - * The primary gives a grace period before freeing this client because - * it serves as a reference to the first required replication data block for - * this replica */ - if (!c->rdb_client_disconnect_time) { - c->rdb_client_disconnect_time = server.unixtime; - serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, - replicationGetReplicaName(c), server.wait_before_rdb_client_free); - continue; - } - if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) { - serverLog(LL_NOTICE, - "Replica main connection failed to establish PSYNC within the grace period (%ld seconds). " - "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); - c->flags &= ~CLIENT_PROTECTED_RDB_CONN; - } - } - - if (c->flags & CLIENT_PROTECTED) continue; - - c->flags &= ~CLIENT_CLOSE_ASAP; - freeClient(c); - listDelNode(server.clients_to_close, ln); - freed++; - } - return freed; -} - -/* Return a client by ID, or NULL if the client ID is not in the set - * of registered clients. Note that "fake clients", created with -1 as FD, - * are not registered clients. */ -client *lookupClientByID(uint64_t id) { - id = htonu64(id); - void *c = NULL; - raxFind(server.clients_index, (unsigned char *)&id, sizeof(id), &c); - return c; -} - -/* This function should be called from _writeToClient when the reply list is not empty, - * it gathers the scattered buffers from reply list and sends them away with connWritev. - * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, - * and 'nwritten' is an output parameter, it means how many bytes server write - * to client. */ -static int _writevToClient(client *c, ssize_t *nwritten) { - int iovcnt = 0; - int iovmax = min(IOV_MAX, c->conn->iovcnt); - struct iovec iov[iovmax]; - size_t iov_bytes_len = 0; - /* If the static reply buffer is not empty, - * add it to the iov array for writev() as well. */ - if (c->bufpos > 0) { - iov[iovcnt].iov_base = c->buf + c->sentlen; - iov[iovcnt].iov_len = c->bufpos - c->sentlen; - iov_bytes_len += iov[iovcnt++].iov_len; - } - /* The first node of reply list might be incomplete from the last call, - * thus it needs to be calibrated to get the actual data address and length. */ - size_t offset = c->bufpos > 0 ? 0 : c->sentlen; - listIter iter; - listNode *next; - clientReplyBlock *o; - listRewind(c->reply, &iter); - while ((next = listNext(&iter)) && iovcnt < iovmax && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) { - o = listNodeValue(next); - if (o->used == 0) { /* empty node, just release it and skip. */ - c->reply_bytes -= o->size; - listDelNode(c->reply, next); - offset = 0; - continue; - } - - iov[iovcnt].iov_base = o->buf + offset; - iov[iovcnt].iov_len = o->used - offset; - iov_bytes_len += iov[iovcnt++].iov_len; - offset = 0; - } - if (iovcnt == 0) return C_OK; - *nwritten = connWritev(c->conn, iov, iovcnt); - if (*nwritten <= 0) return C_ERR; - - /* Locate the new node which has leftover data and - * release all nodes in front of it. */ - ssize_t remaining = *nwritten; - if (c->bufpos > 0) { /* deal with static reply buffer first. */ - int buf_len = c->bufpos - c->sentlen; - c->sentlen += remaining; - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if (remaining >= buf_len) { - c->bufpos = 0; - c->sentlen = 0; - } - remaining -= buf_len; - } - listRewind(c->reply, &iter); - while (remaining > 0) { - next = listNext(&iter); - o = listNodeValue(next); - if (remaining < (ssize_t)(o->used - c->sentlen)) { - c->sentlen += remaining; - break; - } - remaining -= (ssize_t)(o->used - c->sentlen); - c->reply_bytes -= o->size; - listDelNode(c->reply, next); - c->sentlen = 0; - } - - return C_OK; -} - -/* This function does actual writing output buffers to different types of - * clients, it is called by writeToClient. - * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, - * and 'nwritten' is an output parameter, it means how many bytes server write - * to client. */ -int _writeToClient(client *c, ssize_t *nwritten) { - *nwritten = 0; - if (getClientType(c) == CLIENT_TYPE_REPLICA) { - serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); - - replBufBlock *o = listNodeValue(c->ref_repl_buf_node); - serverAssert(o->used >= c->ref_block_pos); - /* Send current block if it is not fully sent. */ - if (o->used > c->ref_block_pos) { - *nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos); - if (*nwritten <= 0) return C_ERR; - c->ref_block_pos += *nwritten; - } - - /* If we fully sent the object on head, go to the next one. */ - listNode *next = listNextNode(c->ref_repl_buf_node); - if (next && c->ref_block_pos == o->used) { - o->refcount--; - ((replBufBlock *)(listNodeValue(next)))->refcount++; - c->ref_repl_buf_node = next; - c->ref_block_pos = 0; - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); - } - return C_OK; - } - - /* When the reply list is not empty, it's better to use writev to save us some - * system calls and TCP packets. */ - if (listLength(c->reply) > 0) { - int ret = _writevToClient(c, nwritten); - if (ret != C_OK) return ret; - - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) serverAssert(c->reply_bytes == 0); - } else if (c->bufpos > 0) { - *nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen); - if (*nwritten <= 0) return C_ERR; - c->sentlen += *nwritten; - - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } - - return C_OK; -} - -/* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed because of some - * error. If handler_installed is set, it will attempt to clear the - * write event. - * - * This function is called by threads, but always with handler_installed - * set to 0. So when handler_installed is set to 0 the function must be - * thread safe. */ -int writeToClient(client *c, int handler_installed) { - /* Update total number of writes on server */ - atomic_fetch_add_explicit(&server.stat_total_writes_processed, 1, memory_order_relaxed); - - ssize_t nwritten = 0, totwritten = 0; - - while (clientHasPendingReplies(c)) { - int ret = _writeToClient(c, &nwritten); - if (ret == C_ERR) break; - totwritten += nwritten; - /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a replica or a monitor (otherwise, on high-speed traffic, the - * replication/output buffer will grow indefinitely) */ - if (totwritten > NET_MAX_WRITES_PER_EVENT && - (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_REPLICA)) - break; - } - - if (getClientType(c) == CLIENT_TYPE_REPLICA) { - atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, totwritten, memory_order_relaxed); - } else { - atomic_fetch_add_explicit(&server.stat_net_output_bytes, totwritten, memory_order_relaxed); - } - c->net_output_bytes += totwritten; - - if (nwritten == -1) { - if (connGetState(c->conn) != CONN_STATE_CONNECTED) { - serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn)); - freeClientAsync(c); - return C_ERR; - } - } - if (totwritten > 0) { - /* For clients representing primaries we don't count sending data - * as an interaction, since we always send REPLCONF ACK commands - * that take some time to just fill the socket output buffer. - * We just rely on data / pings received for timeout detection. */ - if (!(c->flags & CLIENT_PRIMARY)) c->last_interaction = server.unixtime; - } - if (!clientHasPendingReplies(c)) { - c->sentlen = 0; - /* Note that writeToClient() is called in a threaded way, but - * aeDeleteFileEvent() is not thread safe: however writeToClient() - * is always called with handler_installed set to 0 from threads - * so we are fine. */ - if (handler_installed) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); - connSetWriteHandler(c->conn, NULL); - } - - /* Close connection after entire reply has been sent. */ - if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - freeClientAsync(c); - return C_ERR; - } - } - /* Update client's memory usage after writing. - * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in - * handleClientsWithPendingWritesUsingThreads(). */ - if (io_threads_op == IO_THREADS_OP_IDLE) updateClientMemUsageAndBucket(c); - return C_OK; -} - -/* Write event handler. Just send data to the client. */ -void sendReplyToClient(connection *conn) { - client *c = connGetPrivateData(conn); - writeToClient(c, 1); -} - -/* This function is called just before entering the event loop, in the hope - * we can just write the replies to the client output buffer without any - * need to use a syscall in order to install the writable event handler, - * get it called, and so forth. */ -int handleClientsWithPendingWrites(void) { - listIter li; - listNode *ln; - int processed = listLength(server.clients_pending_write); - - listRewind(server.clients_pending_write, &li); - while ((ln = listNext(&li))) { - client *c = listNodeValue(ln); - c->flags &= ~CLIENT_PENDING_WRITE; - listUnlinkNode(server.clients_pending_write, ln); - - /* If a client is protected, don't do anything, - * that may trigger write error or recreate handler. */ - if (c->flags & CLIENT_PROTECTED) continue; - - /* Don't write to clients that are going to be closed anyway. */ - if (c->flags & CLIENT_CLOSE_ASAP) continue; - - /* Try to write buffers to the client socket. */ - if (writeToClient(c, 0) == C_ERR) continue; - - /* If after the synchronous writes above we still have data to - * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c)) { - installClientWriteHandler(c); - } - } - return processed; -} - -/* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - serverCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - - freeClientArgv(c); - c->cur_script = NULL; - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; - c->slot = -1; - c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE); - - /* Make sure the duration has been recorded to some command. */ - serverAssert(c->duration == 0); -#ifdef LOG_REQ_RES - reqresReset(c, 1); -#endif - - if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); - c->deferred_reply_errors = NULL; - - /* We clear the ASKING flag as well if we are not inside a MULTI, and - * if what we just executed is not the ASKING command itself. */ - if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) c->flags &= ~CLIENT_ASKING; - - /* We do the same for the CACHING command as well. It also affects - * the next command or transaction executed, in a way very similar - * to ASKING. */ - if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) c->flags &= ~CLIENT_TRACKING_CACHING; - - /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply - * to the next command will be sent, but set the flag if the command - * we just processed was "CLIENT REPLY SKIP". */ - c->flags &= ~CLIENT_REPLY_SKIP; - if (c->flags & CLIENT_REPLY_SKIP_NEXT) { - c->flags |= CLIENT_REPLY_SKIP; - c->flags &= ~CLIENT_REPLY_SKIP_NEXT; - } -} - -/* Initializes the shared query buffer to a new sds with the default capacity */ -void initSharedQueryBuf(void) { - thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); - sdsclear(thread_shared_qb); -} - /* Resets the shared query buffer used by the given client. * If any data remained in the buffer, the client will take ownership of the buffer * and a new empty buffer will be allocated for the shared buffer. */ @@ -4695,7 +4322,7 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) { if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(), c); /* Remove RDB connection protection on COB overrun */ - c.flag->protected_rdb_conn = 0; + c->flag.protected_rdb_conn = 0; if (async) { freeClientAsync(c); @@ -4742,7 +4369,7 @@ void flushReplicasOutputBuffers(void) { if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes && !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) { - writeToClient(replica, 0); + writeToClient(replica); } } } diff --git a/src/replication.c b/src/replication.c index 4cbf2e236b..a949b57fb6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -232,7 +232,7 @@ void addRdbReplicaToPsyncWait(client *rdb_replica) { (unsigned long long)rdb_replica->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); rdb_replica->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ - rdb_replica->flags |= CLIENT_PROTECTED_RDB_CONN; + rdb_replica->flag.protected_rdb_conn = 1; uint64_t id = htonu64(rdb_replica->id); raxInsert(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), rdb_replica, NULL); } @@ -269,7 +269,7 @@ void removeReplicaFromPsyncWait(client *replica) { o->refcount--; } rdb_replica->ref_repl_buf_node = NULL; - rdb_replica->flags &= ~CLIENT_PROTECTED_RDB_CONN; + rdb_replica->flag.protected_rdb_conn = 0; serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id, o ? "ref count decreased" : "doesn't exist"); @@ -386,7 +386,7 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) { /* Free replication buffer blocks that are referenced by this client. */ void freeReplicaReferencedReplBuffer(client *replica) { - if (replica->flags & CLIENT_REPL_RDB_CONN) { + if (replica->flag.repl_rdb_conn) { uint64_t rdb_cid = htonu64(replica->id); if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) { serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", @@ -479,7 +479,7 @@ void feedReplicationBuffer(char *s, size_t len) { listRewind(server.replicas, &li); while ((ln = listNext(&li))) { client *replica = ln->value; - if (!canFeedReplicaReplBuffer(replica) && !(replica->flags & CLIENT_PROTECTED_RDB_CONN)) continue; + if (!canFeedReplicaReplBuffer(replica) && !(replica->flag.protected_rdb_conn)) continue; /* Update shared replication buffer start position. */ if (replica->ref_repl_buf_node == NULL) { replica->ref_repl_buf_node = start_node; @@ -1382,10 +1382,10 @@ void replconfCommand(client *c) { return; } if (start_with_offset == 1) { - c->flags |= CLIENT_REPL_RDB_CONN; + c->flag.repl_rdb_conn = 1; c->replica_req |= REPLICA_REQ_RDB_CONN; } else { - c->flags &= ~CLIENT_REPL_RDB_CONN; + c->flag.repl_rdb_conn = 0; c->replica_req &= ~REPLICA_REQ_RDB_CONN; } } else if (!strcasecmp(c->argv[j]->ptr, "set-rdb-client-id")) { @@ -2777,7 +2777,7 @@ void bufferReplData(connection *conn) { remaining_bytes = readIntoReplDataBlock(conn, tail, remaining_bytes); } if (readlen && remaining_bytes == 0) { - if (server.pending_repl_data.len > server.client_obuf_limits[CLIENT_REPLICA].hard_limit_bytes) { + if (server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) { serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering."); /* Stop accumulating primary commands. */ connSetReadHandler(conn, NULL); @@ -2815,7 +2815,7 @@ void bufferReplData(connection *conn) { /* Replication: Replica side. * Streams accumulated replication data into the database while freeing read nodes */ int streamReplDataBufToDb(client *c) { - serverAssert(c->flags & CLIENT_PRIMARY); + serverAssert(c->flag.primary); blockingOperationStarts(); size_t used, offset = 0; listNode *cur = NULL; diff --git a/src/server.c b/src/server.c index 33864323b6..b57ed94a87 100644 --- a/src/server.c +++ b/src/server.c @@ -5813,7 +5813,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "offset=%lld,lag=%ld,type=%s\r\n", replica_id, replica_ip, replica->replica_listening_port, state, replica->repl_ack_off, lag, - replica->flags & CLIENT_REPL_RDB_CONN ? "rdb-conn" + replica->flag.repl_rdb_conn ? "rdb-conn" : replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-conn" : "replica"); replica_id++; diff --git a/src/server.h b/src/server.h index c4772ffd9f..98aa91e943 100644 --- a/src/server.h +++ b/src/server.h @@ -351,23 +351,6 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client capabilities */ #define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */ -#define CLIENT_REPL_RDB_CONN \ - (1ULL << 53) /* Dual connection sync: track a connection \ - which is used for rdb snapshot */ -#define CLIENT_PROTECTED_RDB_CONN \ - (1ULL << 54) /* Dual connection sync: Protects the RDB client from premature \ - * release during full sync. This flag is used to ensure that the RDB client, which \ - * references the first replication data block required by the replica, is not \ - * released prematurely. Protecting the client is crucial for prevention of \ - * synchronization failures: \ - * If the RDB client is released before the replica initiates PSYNC, the primary \ - * will reduce the reference count (o->refcount) of the block needed by the replica. \ - * This could potentially lead to the removal of the required data block, resulting \ - * in synchronization failures. Such failures could occur even in scenarios where \ - * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. \ - * By using this flag, we ensure that the RDB client remains intact until the replica \ - * has successfully initiated PSYNC. */ - /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ typedef enum blocking_type { @@ -1231,7 +1214,20 @@ typedef struct ClientFlags { uint64_t reprocessing_command : 1; /* The client is re-processing the command. */ uint64_t replication_done : 1; /* Indicate that replication has been done on the client */ uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */ - uint64_t reserved : 9; /* Reserved for future use */ + uint64_t protected_rdb_conn : 1; /* Dual connection sync: Protects the RDB client from premature \ + * release during full sync. This flag is used to ensure that the RDB client, which \ + * references the first replication data block required by the replica, is not \ + * released prematurely. Protecting the client is crucial for prevention of \ + * synchronization failures: \ + * If the RDB client is released before the replica initiates PSYNC, the primary \ + * will reduce the reference count (o->refcount) of the block needed by the replica. \ + * This could potentially lead to the removal of the required data block, resulting \ + * in synchronization failures. Such failures could occur even in scenarios where \ + * the replica only needs an additional 4KB beyond the minimum size of the repl_backlog. \ + * By using this flag, we ensure that the RDB client remains intact until the replica \ + * has successfully initiated PSYNC. */ + uint64_t repl_rdb_conn : 1; /* Dual connection sync: track a connection which is used for rdb snapshot */ + uint64_t reserved : 7; /* Reserved for future use */ } ClientFlags; typedef struct client {