Skip to content

Commit

Permalink
RDMA: Fix dead loop when transferring large data (20KB) (valkey-io#1386)
Browse files Browse the repository at this point in the history
Check the status of the Client before attempting to read data. If
state=CLIENT_COMPLETED_IO, no read is attempted; instead, I/O operations on
the Client are rescheduled by the main thread.

> And 20474 Byte = PROTO_IOBUF_LEN(16KB) + SDS_HDR_VAR(16, s)(4090 Byte)

Fixes valkey-io#1385

---------

Signed-off-by: fengquyoumo <[email protected]>
  • Loading branch information
fengquyoumo authored Dec 5, 2024
1 parent 71560a2 commit 6b3e122
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ typedef enum ValkeyRdmaOpcode {
#define VALKEY_RDMA_INVALID_OPCODE 0xffff
#define VALKEY_RDMA_KEEPALIVE_MS 3000

#define RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE (1 << 0)

/* Per-connection state for the RDMA connection type. */
typedef struct rdma_connection {
/* Generic connection object. It is the first member, and code in this file
 * casts a connection * to rdma_connection * and takes &rdma_conn->c to go back. */
connection c;
/* NOTE(review): presumably the RDMA CM identifier backing this connection - confirm. */
struct rdma_cm_id *cm_id;
/* Bitmask of RDMA_CONN_FLAG_* values (e.g. RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE). */
int flags;
/* NOTE(review): presumably the most recent error code recorded for this
 * connection - confirm against the code that sets it. */
int last_errno;
/* NOTE(review): presumably this connection's node in pending_list while it is
 * queued there - confirm. */
listNode *pending_list_node;
} rdma_connection;
Expand Down Expand Up @@ -693,7 +696,7 @@ static void connRdmaEventHandler(struct aeEventLoop *el, int fd, void *clientDat
}

/* the upper layer should read all available data */
while (ctx->rx.pos < ctx->rx.offset) {
while (!(rdma_conn->flags & RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE) && ctx->rx.pos < ctx->rx.offset) {
if (conn->read_handler && (callHandler(conn, conn->read_handler) == C_ERR)) {
return;
}
Expand All @@ -705,7 +708,7 @@ static void connRdmaEventHandler(struct aeEventLoop *el, int fd, void *clientDat
}

/* RDMA comp channel has no POLLOUT event, try to send remaining buffer */
if ((ctx->tx.offset < ctx->tx.length) && conn->write_handler) {
if (!(rdma_conn->flags & RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE) && ctx->tx.offset < ctx->tx.length && conn->write_handler) {
callHandler(conn, conn->write_handler);
}
}
Expand Down Expand Up @@ -884,6 +887,9 @@ static void connRdmaAcceptHandler(aeEventLoop *el, int fd, void *privdata, int m
}

static int connRdmaSetRwHandler(connection *conn) {
rdma_connection *rdma_conn = (rdma_connection *)conn;
if (rdma_conn->flags & RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE) return C_OK;

/* IB channel only has POLLIN event */
if (conn->read_handler || conn->write_handler) {
if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) {
Expand Down Expand Up @@ -1721,12 +1727,12 @@ static int rdmaProcessPendingData(void) {
listNode *ln;
rdma_connection *rdma_conn;
connection *conn;
int processed;
int processed = 0;

processed = listLength(pending_list);
listRewind(pending_list, &li);
while ((ln = listNext(&li))) {
rdma_conn = listNodeValue(ln);
if (rdma_conn->flags & RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE) continue;
conn = &rdma_conn->c;

/* a connection can be disconnected by remote peer, CM event mark state as CONN_STATE_CLOSED, kick connection
Expand All @@ -1741,15 +1747,32 @@ static int rdmaProcessPendingData(void) {
callHandler(conn, conn->write_handler);
}

++processed;
continue;
}

connRdmaEventHandler(NULL, -1, rdma_conn, 0);
++processed;
}

return processed;
}

/* Set or clear RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE on an RDMA connection.
 *
 * conn:     connection to modify. It is cast to rdma_connection *, so it must
 *           be the connection embedded in an rdma_connection.
 * postpone: non-zero sets the flag, zero clears it.
 *
 * Only the flag bit is changed here; no handler is invoked. While the flag is
 * set, connRdmaSetRwHandler() returns C_OK immediately and
 * rdmaProcessPendingData() skips the connection. */
static void postPoneUpdateRdmaState(struct connection *conn, int postpone) {
    rdma_connection *rconn = (rdma_connection *)conn;

    if (!postpone) {
        rconn->flags &= ~RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE;
        return;
    }

    rconn->flags |= RDMA_CONN_FLAG_POSTPONE_UPDATE_STATE;
}

/* Re-apply the read/write handler registration for an RDMA connection and
 * then run its event handler once.
 *
 * conn: connection to update. It is cast to rdma_connection *, so it must be
 *       the connection embedded in an rdma_connection.
 *
 * connRdmaSetRwHandler() is called first and its return value is deliberately
 * not checked. connRdmaEventHandler() is then invoked directly, with no event
 * loop, fd -1 and mask 0.
 * NOTE(review): presumably intended to run after the postpone flag has been
 * cleared, since both callees skip handler work while it is set - confirm
 * with the callers. */
static void updateRdmaState(struct connection *conn) {
    rdma_connection *rconn = (rdma_connection *)conn;

    (void)connRdmaSetRwHandler(conn);
    connRdmaEventHandler(NULL, -1, rconn, 0);
}

static ConnectionType CT_RDMA = {
/* connection type */
.get_type = connRdmaGetType,
Expand Down Expand Up @@ -1792,6 +1815,8 @@ static ConnectionType CT_RDMA = {
/* pending data */
.has_pending_data = rdmaHasPendingData,
.process_pending_data = rdmaProcessPendingData,
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,
};

ConnectionType *connectionTypeRdma(void) {
Expand Down

0 comments on commit 6b3e122

Please sign in to comment.