Adding Com Channel fast data path capabilities
EldarShalev committed Feb 4, 2024
1 parent 2ccd99a commit 25d0d73
Showing 7 changed files with 695 additions and 64 deletions.
28 changes: 26 additions & 2 deletions src/client.cpp
@@ -888,11 +888,35 @@ int Client<IoType, SwitchCycleDuration, PongModeCare>::initBeforeLoop() {
#if defined(USING_DOCA_COMM_CHANNEL_API)
    if (s_user_params.doca_comm_channel) {
        // Waiting for connection
        doca_error_t result;
        while (data->doca_cc_ctx->state != CC_CONNECTED) {
            doca_pe_progress(s_user_params.pe);
        }
        log_dbg("[fd=%d] Client connected successfully", ifd);
        if (s_user_params.doca_cc_fifo) {
            struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem);
            // Buf is needed for registering with memrange
            local_producer_mem->mem = m_pMsgRequest->getBuf();
            result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
            if (result != DOCA_SUCCESS) {
                DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
                return result;
            }
            log_dbg("[fd=%d] Init producer memory succeeded", ifd);
            while (data->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
                doca_pe_progress(s_user_params.pe);
            }
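            // Under-load mode only: the consumer runs on its own progress engine
            // (pe_underload), so spin until its context reaches DOCA_CTX_STATE_RUNNING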
            if (!g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) {
                struct doca_ctx *ctx;
                enum doca_ctx_states state;
                ctx = doca_cc_consumer_as_ctx(data->doca_cc_ctx->ctx_fifo.consumer);
                doca_ctx_get_state(ctx, &state);
                while (state != DOCA_CTX_STATE_RUNNING) {
                    doca_pe_progress(s_user_params.pe_underload);
                    doca_ctx_get_state(ctx, &state);
                }
            }
        }
    }
    // Avoid Client binding in Com Channel mode
    if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
121 changes: 92 additions & 29 deletions src/common.h
@@ -115,51 +115,107 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
#if defined(USING_DOCA_COMM_CHANNEL_API)
    if (s_user_params.doca_comm_channel) {
        doca_error_t doca_error;
        int result;
        struct doca_cc_producer_send_task *producer_task;
        struct doca_cc_send_task *task;
        struct doca_buf *doca_buf;
        struct doca_task *task_obj;
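        // Backoff interval used when busy-polling fast-path submission and completions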
        struct timespec ts = {
            .tv_sec = 0,
            .tv_nsec = NANOS_10_X_1000,
        };
        struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
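        // Fast path: carve a doca_buf out of the pre-registered producer memory
        // range and send it through the CC producer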
        if (s_user_params.doca_cc_fifo) {
            do {
                doca_error = doca_buf_inventory_buf_get_by_data(ctx->ctx_fifo.producer_mem.buf_inv, ctx->ctx_fifo.producer_mem.mmap,
                                                                ctx->ctx_fifo.producer_mem.mem, nbytes, &doca_buf);
                if (doca_error == DOCA_ERROR_NO_MEMORY) {
                    // Queue is full of tasks, need to free tasks with completion callback
                    doca_pe_progress(s_user_params.pe);
                }
            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
            if (doca_error != DOCA_SUCCESS) {
                if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
                    errno = EAGAIN;
                    ret = -1;
                } else {
                    log_err("Failed to get doca buf from producer mmap with error = %s", doca_error_get_name(doca_error));
                    ret = RET_SOCKET_SHUTDOWN;
                }
            } else {
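                // Wrap the buffer in a producer send task addressed to the remote consumer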
log_err("Doca task_alloc_init failed");
ret = RET_SOCKET_SHUTDOWN;
do {
doca_error = doca_cc_producer_send_task_alloc_init(ctx->ctx_fifo.producer, doca_buf,
ctx->ctx_fifo.remote_consumer_id, &producer_task);
if (doca_error == DOCA_ERROR_NO_MEMORY) {
// Queue is full of tasks, need to free tasks with completion callback and buffer decrease
doca_pe_progress(s_user_params.pe);
}
} while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
if (doca_error != DOCA_SUCCESS) {
if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
(void)doca_buf_dec_refcount(doca_buf, NULL);
errno = EAGAIN;
ret = -1;
} else {
log_err("Doca task_alloc_init failed");
ret = RET_SOCKET_SHUTDOWN;
}
}
}
        } else { // Not doca fast path
            do {
                if (s_user_params.mode == MODE_SERVER) {
                    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)ctx;
                    doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
                                                                     nbytes, &task);
                } else { // MODE_CLIENT
                    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
                    doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
                                                                     nbytes, &task);
                }
                if (doca_error == DOCA_ERROR_NO_MEMORY) {
                    // Queue is full of tasks, need to free tasks with completion callback
                    doca_pe_progress(s_user_params.pe);
                }
            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
            if (doca_error != DOCA_SUCCESS) {
                if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
                    errno = EAGAIN;
                    ret = -1;
                } else {
                    log_err("Doca task_alloc_init failed");
                    ret = RET_SOCKET_SHUTDOWN;
                }
            }
        }
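        // Submit the prepared task: the fast path busy-waits with a short nanosleep,
        // the regular path lets the progress engine drain the send queue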
        if (doca_error == DOCA_SUCCESS) { // task_alloc_init succeeded
            if (s_user_params.doca_cc_fifo) {
                task_obj = doca_cc_producer_send_task_as_task(producer_task);
                do {
                    doca_error = doca_task_submit(task_obj);
                    // buf_get_by_data and task_alloc_init succeeded; keep submitting until no AGAIN
                    if (doca_error == DOCA_ERROR_AGAIN) {
                        nanosleep(&ts, &ts);
                    }
                } while (doca_error == DOCA_ERROR_AGAIN);
            } else {
                task_obj = doca_cc_send_task_as_task(task);
                do {
                    doca_error = doca_task_submit(task_obj);
                    if (doca_error == DOCA_ERROR_AGAIN) {
                        // Queue is full of tasks, need to free tasks with completion callback
                        doca_pe_progress(s_user_params.pe);
                    }
                } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);
            }
            if (doca_error != DOCA_SUCCESS) {
                if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked
                    errno = EAGAIN;
                    ret = -1;
                    doca_task_free(task_obj);
                    if (s_user_params.doca_cc_fifo) {
                        (void)doca_buf_dec_refcount(doca_buf, NULL);
                    }
                } else {
                    log_err("Doca doca_task_submit failed");
                    ret = RET_SOCKET_SHUTDOWN;
@@ -169,7 +225,14 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
                }
            }
        }
        // Additional call for better performance - release pressure on the send queue
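        // Fast path: after a successful submit, poll the PE until it reports that
        // progress was made, so the send completion gets processed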
        if (!s_user_params.doca_cc_fifo || doca_error == DOCA_ERROR_NO_MEMORY) {
            doca_pe_progress(s_user_params.pe);
        } else { // fast path and task submitted successfully
            do {
                result = doca_pe_progress(s_user_params.pe);
                nanosleep(&ts, &ts);
            } while (result == 0);
        }
    } else
#endif /* USING_DOCA_COMM_CHANNEL_API */
    {
5 changes: 4 additions & 1 deletion src/defs.h
@@ -301,7 +301,8 @@ enum {
#if defined(USING_DOCA_COMM_CHANNEL_API)
    OPT_DOCA,
    OPT_PCI,
    OPT_PCI_REP,
    OPT_DOCA_FAST_PATH
#endif /* USING_DOCA_COMM_CHANNEL_API */

};
@@ -820,9 +821,11 @@ struct user_params_t {
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
    bool doca_comm_channel = false; /* Flag to indicate using Comm Channel */
    bool doca_cc_fifo = false; /* Flag to indicate using the fast data path */
    char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */
    char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
    struct doca_pe *pe = nullptr; /* Progress engine for doca, one per thread */
    struct doca_pe *pe_underload = nullptr; /* Progress engine for doca, one per thread, underload mode */
#endif /* USING_DOCA_COMM_CHANNEL_API */

    user_params_t() {