diff --git a/src/client.cpp b/src/client.cpp index 767e5dc3..2fa1efdf 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -888,11 +888,25 @@ int Client::initBeforeLoop() { #if defined(USING_DOCA_COMM_CHANNEL_API) if (s_user_params.doca_comm_channel) { // Waiting for connection - struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)data->doca_cc_ctx; - while (ctx_client->state != CC_CONNECTED) { + doca_error_t result; + while (data->doca_cc_ctx->state != CC_CONNECTED) { doca_pe_progress(s_user_params.pe); } log_dbg("[fd=%d] Client connected successfully", ifd); + if (s_user_params.doca_fast_path) { + struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem); + // Buf is needed for registering with memrange + local_producer_mem->mem = m_pMsgRequest->getBuf(); + result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result)); + return result; + } + log_dbg("[fd=%d] Init producer memory succeeded", ifd); + while (data->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) { + doca_pe_progress(s_user_params.pe); + } + } } // Avoid Client binding in Com Channel mode if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) { diff --git a/src/common.h b/src/common.h index 31a82df3..d7ff849e 100644 --- a/src/common.h +++ b/src/common.h @@ -115,51 +115,107 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, #if defined(USING_DOCA_COMM_CHANNEL_API) if (s_user_params.doca_comm_channel) { doca_error_t doca_error; + int result; + struct doca_cc_producer_send_task *producer_task; struct doca_cc_send_task *task; + struct doca_buf *doca_buf; struct doca_task *task_obj; struct timespec ts = { .tv_sec = 0, .tv_nsec = NANOS_10_X_1000, }; struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx; - do { - if (s_user_params.mode == MODE_SERVER) { - struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx; - doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf, - nbytes, &task); - } else { // MODE_CLIENT - struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx; - doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf, - nbytes, &task); - } - if (doca_error == DOCA_ERROR_NO_MEMORY) { - // Queue is full of tasks, need to free tasks with completion callback - doca_pe_progress(s_user_params.pe); - } - } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY); - - if (doca_error != DOCA_SUCCESS) { - if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked - errno = EAGAIN; - ret = -1; + if (s_user_params.doca_fast_path) { + do { + doca_error = doca_buf_inventory_buf_get_by_data(ctx->ctx_fifo.producer_mem.buf_inv, ctx->ctx_fifo.producer_mem.mmap, + ctx->ctx_fifo.producer_mem.mem, nbytes, &doca_buf); + if (doca_error == DOCA_ERROR_NO_MEMORY) { + // Queue is full of tasks, need to free tasks with completion callback + doca_pe_progress(s_user_params.pe); + } + } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY); + if (doca_error != DOCA_SUCCESS) { + if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked + errno = EAGAIN; + ret = -1; + } else { + log_err("Failed to get doca buf from producer mmap with error = %s", doca_error_get_name(doca_error)); + ret = RET_SOCKET_SHUTDOWN; + } } else { - log_err("Doca task_alloc_init failed"); - ret = RET_SOCKET_SHUTDOWN; + do { + doca_error = doca_cc_producer_send_task_alloc_init(ctx->ctx_fifo.producer, doca_buf, + ctx->ctx_fifo.remote_consumer_id, &producer_task); + if (doca_error == DOCA_ERROR_NO_MEMORY) { + // Queue is full of tasks, need to free tasks with completion callback and buffer decrease + doca_pe_progress(s_user_params.pe); + } + } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY); + if (doca_error != DOCA_SUCCESS) { + if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked + (void)doca_buf_dec_refcount(doca_buf, NULL); + errno = EAGAIN; + ret = -1; + } else { + log_err("Doca task_alloc_init failed"); + ret = RET_SOCKET_SHUTDOWN; + } + } } - } else { // task_alloc_init succeeded - task_obj = doca_cc_send_task_as_task(task); + } else { // Not doca fast path do { - doca_error = doca_task_submit(task_obj); - if (doca_error == DOCA_ERROR_AGAIN) { + if (s_user_params.mode == MODE_SERVER) { + struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx; + doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf, + nbytes, &task); + } else { // MODE_CLIENT + struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx; + doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf, + nbytes, &task); + } + if (doca_error == DOCA_ERROR_NO_MEMORY) { // Queue is full of tasks, need to free tasks with completion callback doca_pe_progress(s_user_params.pe); } - } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN); - + } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY); + if (doca_error != DOCA_SUCCESS) { + if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked + errno = EAGAIN; + ret = -1; + } else { + log_err("Doca task_alloc_init failed"); + ret = RET_SOCKET_SHUTDOWN; + } + } + } + if (doca_error == DOCA_SUCCESS) { // task_alloc_init succeeded + if (s_user_params.doca_fast_path) { + task_obj = doca_cc_producer_send_task_as_task(producer_task); + do { + doca_error = doca_task_submit(task_obj); + // doca buf_get_by_data and task_alloc_init succeeded, need to submit until no AGAIN + if (doca_error == DOCA_ERROR_AGAIN) { + nanosleep(&ts, &ts); + } + } while (doca_error == DOCA_ERROR_AGAIN); + } else { + task_obj = doca_cc_send_task_as_task(task); + do { + doca_error = doca_task_submit(task_obj); + if (doca_error == DOCA_ERROR_AGAIN) { + // Queue is full of tasks, need to free tasks with completion callback + doca_pe_progress(s_user_params.pe); + } + } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN); + } if (doca_error != DOCA_SUCCESS) { if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked errno = EAGAIN; ret = -1; + doca_task_free(task_obj); + if (s_user_params.doca_fast_path) { + (void)doca_buf_dec_refcount(doca_buf, NULL); + } } else { log_err("Doca doca_task_submit failed"); ret = RET_SOCKET_SHUTDOWN; @@ -169,7 +225,15 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, } } // Additional call for better performance- release pressure on send queue - doca_pe_progress(s_user_params.pe); + if (!s_user_params.doca_fast_path || doca_error == DOCA_ERROR_NO_MEMORY) { + doca_pe_progress(s_user_params.pe); + } else { // fast path and task submitted successfully + // for fast data path we need to make sure for completion in ping pong blocking mode + do { + result = doca_pe_progress(s_user_params.pe); + nanosleep(&ts, &ts); + } while (result == 0); + } } else #endif /* USING_DOCA_COMM_CHANNEL_API */ { diff --git a/src/defs.h b/src/defs.h index fea3ba3c..631bbc4b 100644 --- a/src/defs.h +++ b/src/defs.h @@ -301,7 +301,8 @@ enum { #if defined(USING_DOCA_COMM_CHANNEL_API) OPT_DOCA, OPT_PCI, - OPT_PCI_REP + OPT_PCI_REP, + OPT_DOCA_FAST_PATH #endif /* USING_DOCA_COMM_CHANNEL_API */ }; @@ -820,6 +821,7 @@ struct user_params_t { #endif /* DEFINED_TLS */ #if defined(USING_DOCA_COMM_CHANNEL_API) bool doca_comm_channel = false; /* Flag to indicate using Com Channel*/ + bool doca_fast_path = false; /* Flag to indicate using fast path*/ char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */ char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */ struct doca_pe *pe = nullptr; /* Progress engine for doca, one per thread*/ diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h index e13f3256..b2caa8e2 100644 --- a/src/doca_cc_helper.h +++ b/src/doca_cc_helper.h @@ -39,9 +39,17 @@ #include #include #include +#include +#include +#include +#include + #include "os_abstract.h" #define MSG_SIZE 4080 +static int MAX_BUF_SIZE = 65507 * 2; +#define MAX_BUFS 10 +#define CC_DATA_PATH_LOG_TASK_NUM 10 /* Maximum amount of CC consumer and producer task number */ #define PCI_ADDR_LEN 8 #define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */ #define CC_REC_QUEUE_SIZE 10 /* Maximum amount of message in queue */ @@ -54,10 +62,37 @@ #define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__) #define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__) -enum cc_client_state { +enum cc_state { CONNECTION_IN_PROGRESS, CC_CONNECTED }; +enum cc_fifo_connection_state { + FIFO_CONNECTION_IN_PROGRESS, + CC_FIFO_CONNECTED +}; + +enum cc_fifo_data_path_state { + CC_FIFO_GET_BUF, + CC_FIFO_ALLOC_INIT, + CC_FIFO_SUBMIT +}; +struct cc_local_mem_bufs { + void *mem; /* Memory address for DOCA buf mmap */ + struct doca_mmap *mmap; /* DOCA mmap object */ + struct doca_buf_inventory *buf_inv; /* DOCA buf inventory object */ +}; + +struct cc_ctx_fifo { + struct doca_cc_consumer *consumer; /**< CC consumer object */ + struct cc_local_mem_bufs consumer_mem; /**< Mmap and DOCA buf objects for consumer */ + struct doca_cc_producer *producer; /**< CC producer object */ + struct cc_local_mem_bufs producer_mem; /**< Mmap and DOCA buf objects for producer */ + uint32_t remote_consumer_id; /**< Consumer ID on the peer side */ + struct doca_pe *pe; /**< Progress Engine for */ + enum cc_fifo_connection_state fifo_connection_state; /**< Holding state for fast path connection >*/ + enum cc_fifo_data_path_state fifo_data_path_state; /**< Holding state for fast path data path >*/ +}; + struct cc_ctx { struct doca_dev *hw_dev; /**< Doca Device used per PCI address > */ struct doca_cc_connection *connection; /**< Connection object used for pairing a connection >*/ @@ -67,7 +102,10 @@ struct cc_ctx { bool recv_flag; /**< flag indicates when message received >*/ int fd; /**< File Descriptor >*/ os_mutex_t lock; /**< For underload mode only>*/ - os_cond_t cond; /**< For underload mode only>*/ + os_cond_t cond; /**< For underload mode only>*/ + enum cc_state state; /**< Holding state of client connection >*/ + bool fast_path; /**< Indicated for using fast data path*/ + struct cc_ctx_fifo ctx_fifo; /**< Data path objects */ }; struct cc_ctx_server { @@ -79,7 +117,6 @@ struct cc_ctx_server { struct cc_ctx_client { struct cc_ctx ctx; /**< Base common ctx >*/ struct doca_cc_client *client; /**< Client object >*/ - enum cc_client_state state; /**< Holding state of client connection >*/ bool underload_mode; /**< For using different callback>*/ }; @@ -98,6 +135,8 @@ struct priv_doca_pci_bdf { }; /************** General ******************/ +static doca_error_t cc_init_producer(struct cc_ctx *ctx); +static doca_error_t cc_init_consumer(struct cc_ctx *ctx); static doca_error_t cc_parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf) { @@ -213,6 +252,50 @@ cc_open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_f return DOCA_ERROR_NOT_FOUND; } +static doca_error_t +cc_init_local_mem_bufs(struct cc_local_mem_bufs *local_mem, struct cc_ctx *ctx) +{ + doca_error_t result; + + result = doca_buf_inventory_create(MAX_BUFS, &(local_mem->buf_inv)); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to create inventory: %s", doca_error_get_descr(result)); + } + + result = doca_buf_inventory_start(local_mem->buf_inv); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to start inventory: %s", doca_error_get_descr(result)); + } + + result = doca_mmap_create(&local_mem->mmap); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to create mmap: %s", doca_error_get_descr(result)); + } + + result = doca_mmap_add_dev(local_mem->mmap, ctx->hw_dev); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to add device to mmap: %s", doca_error_get_descr(result)); + } + + result = doca_mmap_set_permissions(local_mem->mmap, DOCA_ACCESS_FLAG_PCI_READ_WRITE); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to set permission to mmap: %s", doca_error_get_descr(result)); + } + + // set here sockperf buf as local->mem + result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, sizeof(uint8_t) * 65507 * 2); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to set memrange to mmap: %s", doca_error_get_descr(result)); + } + + result = doca_mmap_start(local_mem->mmap); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Unable to start mmap: %s", doca_error_get_descr(result)); + } + + return DOCA_SUCCESS; +} + /************** SERVER ******************/ /** @@ -230,7 +313,7 @@ cc_server_send_task_completion_callback(struct doca_cc_send_task *task, union do (void)user_data; (void)task_user_data; - // DOCA_LOG_INFO("Task sent successfully"); + DOCA_LOG_INFO("Task sent successfully"); doca_task_free(doca_cc_send_task_as_task(task)); } @@ -289,14 +372,116 @@ cc_server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *r /* Save the connection that the ping was sent over for sending the response */ struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr; ctx_server->ctx.connection = cc_connection; - - //DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer); + + DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer); memcpy(ctx_server->ctx.recv_buffer, recv_buffer, msg_len); ctx_server->ctx.buf_size = (int)msg_len; ctx_server->ctx.recv_flag = true; } +/** + * Callback for consumer post recv task successfull completion + * + * @task [in]: Recv task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +cc_consumer_recv_task_completion_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data, + union doca_data user_data) +{ + size_t recv_msg_len; + void *recv_msg; + struct doca_buf *buf; + doca_error_t result; + + (void)task_user_data; + struct cc_ctx *ctx = (struct cc_ctx *)user_data.ptr; + + buf = doca_cc_consumer_post_recv_task_get_buf(task); + + result = doca_buf_get_data(buf, &recv_msg); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get data address from DOCA buf with error = %s", doca_error_get_name(result)); + } + + result = doca_buf_get_data_len(buf, &recv_msg_len); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get data length from DOCA buf with error = %s", doca_error_get_name(result)); + } + + ctx->buf_size = (int)recv_msg_len; + ctx->recv_flag = true; + + DOCA_LOG_INFO("Message received: '%.*s'", (int)recv_msg_len, (char *)recv_msg); + (void)doca_buf_dec_refcount(buf, NULL); + doca_task_free(doca_cc_consumer_post_recv_task_as_task(task)); +} + +/** + * Callback for consumer post recv task completion with error + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +cc_consumer_recv_task_completion_err_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data, + union doca_data user_data) +{ + struct doca_buf *buf; + doca_error_t result; + + (void)task_user_data; + + struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr; + result = doca_task_get_status(doca_cc_consumer_post_recv_task_as_task(task)); + DOCA_LOG_ERR("Consumer failed to recv message with error = %s", doca_error_get_name(result)); + + buf = doca_cc_consumer_post_recv_task_get_buf(task); + (void)doca_buf_dec_refcount(buf, NULL); + doca_task_free(doca_cc_consumer_post_recv_task_as_task(task)); + (void)doca_ctx_stop(doca_cc_consumer_as_ctx(ctx_server->ctx.ctx_fifo.consumer)); +} + +static doca_error_t +cc_init_consumer(struct cc_ctx *ctx) +{ + doca_error_t result; + doca_data user_data; + struct doca_ctx *doca_ctx; + struct cc_local_mem_bufs *local_consumer_mem = &(ctx->ctx_fifo.consumer_mem); + + result = doca_cc_consumer_create(ctx->connection, local_consumer_mem->mmap, &(ctx->ctx_fifo.consumer)); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to create consumer with error = %s", doca_error_get_name(result)); + return result; + } + doca_ctx = doca_cc_consumer_as_ctx(ctx->ctx_fifo.consumer); + result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding pe context to server with error = %s", doca_error_get_name(result)); + } + result = doca_cc_consumer_post_recv_task_set_conf(ctx->ctx_fifo.consumer, cc_consumer_recv_task_completion_callback, + cc_consumer_recv_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting consumer recv task cbs with error = %s", doca_error_get_name(result)); + return result; + } + user_data.ptr = (void*) ctx; + result = doca_ctx_set_user_data(doca_ctx, user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting consumer user data with error = %s", doca_error_get_name(result)); + return result; + } + result = doca_ctx_start(doca_ctx); + if (result != DOCA_ERROR_IN_PROGRESS) { + DOCA_LOG_ERR("Failed to start consumer context with error = %s", doca_error_get_name(result)); + } + return DOCA_SUCCESS; +} + /** * Callback for connection event * @@ -331,6 +516,21 @@ cc_server_connection_event_callback(struct doca_cc_event_connection_status_chang } ctx_server->ctx.num_connected_clients++; + ctx_server->ctx.connection = cc_conn; + + if (ctx_server->ctx.fast_path) { + /* Init a cc consumer */ + result = cc_init_consumer(&ctx_server->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result)); + } + /* Init a cc producer */ + result = cc_init_producer(&ctx_server->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result)); + } + DOCA_LOG_INFO("Consumer & Producer were created successfully"); + } DOCA_LOG_INFO("[fd=%d] New client connected to server", ctx_server->ctx.fd); } @@ -369,8 +569,6 @@ cc_server_disconnection_event_callback(struct doca_cc_event_connection_status_ch DOCA_LOG_INFO("[fd=%d] client was disconnected from server", ctx_server->ctx.fd); } - - /** * Callback triggered whenever CC server context state changes * @@ -413,6 +611,54 @@ cc_server_state_changed_callback(const union doca_data user_data, struct doca_ct } } +/** + * Callback for new consumer arrival event + * + * @event [in]: New remote consumer event object + * @cc_connection [in]: The connection related to the consumer + * @id [in]: The ID of the new remote consumer + */ +static void +cc_server_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id) +{ + union doca_data user_data; + struct doca_cc_server *cc_server; + doca_error_t result; + + /* This argument is not in use */ + (void)event; + + cc_server = doca_cc_server_get_server_ctx(cc_connection); + + result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result)); + return; + } + struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr; + ctx_server->ctx.ctx_fifo.remote_consumer_id = id; + DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]",ctx_server->ctx.fd, id); + ctx_server->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED; + +} + +/** + * Callback for expired consumer arrival event + * + * @event [in]: Expired remote consumer event object + * @cc_connection [in]: The connection related to the consumer + * @id [in]: The ID of the expired remote consumer + */ +static void +cc_server_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id) +{ + /* These arguments are not in use */ + (void)event; + (void)cc_connection; + (void)id; +} + + static doca_error_t cc_doca_server_set_params(struct cc_ctx_server *ctx_server) { @@ -457,6 +703,21 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server) if (result != DOCA_SUCCESS) { DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result)); } + if (ctx_server->ctx.fast_path) { // Fast path option + result = doca_cc_server_event_consumer_register(ctx_server->server, cc_server_new_consumer_callback, + cc_server_expired_consumer_callback); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result)); + } + struct cc_local_mem_bufs *local_consumer_mem = &(ctx_server->ctx.ctx_fifo.consumer_mem); + local_consumer_mem->mem = ctx_server->ctx.recv_buffer; + result = cc_init_local_mem_bufs(local_consumer_mem, &ctx_server->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result)); + return result; + } + DOCA_LOG_DBG("Init consumer memory succeeded"); + } user_data.ptr = (void *)ctx_server; result = doca_ctx_set_user_data(ctx, user_data); @@ -468,7 +729,7 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server) if (result != DOCA_SUCCESS) { DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(result)); } - DOCA_LOG_DBG("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd); + DOCA_LOG_INFO("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd); return result; } @@ -580,6 +841,91 @@ cc_client_message_UL_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t os_mutex_unlock(&cc_client->ctx.lock); } + +/** + * Callback for producer send task successfull completion + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +cc_producer_send_task_completion_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data, + union doca_data ctx_user_data) +{ + (void)task_user_data; + (void)ctx_user_data; + struct doca_buf *buf; + + DOCA_LOG_INFO("Producer task sent successfully"); + buf = doca_cc_producer_send_task_get_buf(task); + (void)doca_buf_dec_refcount(buf, NULL); + doca_task_free(doca_cc_producer_send_task_as_task(task)); +} + +/** + * Callback for producer send task completion with error + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +cc_producer_send_task_completion_err_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data, + union doca_data user_data) +{ + struct doca_buf *buf; + doca_error_t result; + + (void)task_user_data; + + struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)user_data.ptr; + result = doca_task_get_status(doca_cc_producer_send_task_as_task(task)); + DOCA_LOG_ERR("Producer message failed to send with error = %s", + doca_error_get_name(result)); + + buf = doca_cc_producer_send_task_get_buf(task); + (void)doca_buf_dec_refcount(buf, NULL); + doca_task_free(doca_cc_producer_send_task_as_task(task)); + (void)doca_ctx_stop(doca_cc_producer_as_ctx(ctx_client->ctx.ctx_fifo.producer)); +} + +static doca_error_t +cc_init_producer(struct cc_ctx *ctx) +{ + doca_error_t result; + doca_data user_data; + struct doca_ctx *doca_ctx; + + result = doca_cc_producer_create(ctx->connection, &(ctx->ctx_fifo.producer)); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to create producer with error = %s", doca_error_get_name(result)); + return result; + } + doca_ctx = doca_cc_producer_as_ctx(ctx->ctx_fifo.producer); + result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding pe context to producer with error = %s", doca_error_get_name(result)); + } + result = doca_cc_producer_send_task_set_conf(ctx->ctx_fifo.producer, cc_producer_send_task_completion_callback, + cc_producer_send_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting producer send task cbs with error = %s", doca_error_get_name(result)); + } + + user_data.ptr = (void*) ctx; + result = doca_ctx_set_user_data(doca_ctx, user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting producer user data with error = %s", doca_error_get_name(result)); + return result; + } + result = doca_ctx_start(doca_ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to start producer context with error = %s", doca_error_get_name(result)); + } + return DOCA_SUCCESS; +} + /** * Init message on client * @@ -606,7 +952,18 @@ cc_init_client_send_message(struct cc_ctx_client *cc_client) return result; } - cc_client->state = CC_CONNECTED; + if (cc_client->ctx.fast_path) { + /* Init a cc producer */ + result = cc_init_producer(&cc_client->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", cc_client->ctx.fd, doca_error_get_name(result)); + } + result = cc_init_consumer(&cc_client->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", cc_client->ctx.fd, doca_error_get_name(result)); + } + } + cc_client->ctx.state = CC_CONNECTED; DOCA_LOG_INFO("[fd=%d] init_client_send_message succeeded", cc_client->ctx.fd); return DOCA_SUCCESS; } @@ -657,6 +1014,53 @@ cc_client_state_changed_callback(const union doca_data user_data, struct doca_ct break; } } +/** + * Callback for new consumer arrival event + * + * @event [in]: New remote consumer event object + * @cc_connection [in]: The connection related to the consumer + * @id [in]: The ID of the new remote consumer + */ +static void +cc_client_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id) +{ + union doca_data user_data; + struct doca_cc_client *cc_client; + doca_error_t result; + + /* This argument is not in use */ + (void)event; + + cc_client = doca_cc_client_get_client_ctx(cc_connection); + + result = doca_ctx_get_user_data(doca_cc_client_as_ctx(cc_client), &user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result)); + return; + } + + struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)(user_data.ptr); + ctx_client->ctx.ctx_fifo.remote_consumer_id = id; + + ctx_client->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED; + DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]",ctx_client->ctx.fd, id); +} + +/** + * Callback for expired consumer arrival event + * + * @event [in]: Expired remote consumer event object + * @cc_connection [in]: The connection related to the consumer + * @id [in]: The ID of the expired remote consumer + */ +static void +cc_client_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id) +{ + /* These arguments are not in use */ + (void)event; + (void)cc_connection; + (void)id; +} static doca_error_t cc_doca_client_set_params(struct cc_ctx_client *cc_client) @@ -697,8 +1101,24 @@ cc_doca_client_set_params(struct cc_ctx_client *cc_client) if (result != DOCA_SUCCESS) { DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result)); } - user_data.ptr = (void *)cc_client; + if (cc_client->ctx.fast_path) { // Fast path option + result = doca_cc_client_event_consumer_register(cc_client->client, cc_client_new_consumer_callback, + cc_client_expired_consumer_callback); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result)); + } + struct cc_local_mem_bufs *local_consumer_mem = &(cc_client->ctx.ctx_fifo.consumer_mem); + local_consumer_mem->mem = cc_client->ctx.recv_buffer; + result = cc_init_local_mem_bufs(local_consumer_mem, &cc_client->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to consumer memory with error = %s", doca_error_get_name(result)); + return result; + } + DOCA_LOG_DBG("Init consumer memory succeeded"); + } + + user_data.ptr = (void *)cc_client; result = doca_ctx_set_user_data(ctx, user_data); if (result != DOCA_SUCCESS) { DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result)); diff --git a/src/input_handlers.h b/src/input_handlers.h index b9483a0a..29a62a37 100644 --- a/src/input_handlers.h +++ b/src/input_handlers.h @@ -78,6 +78,7 @@ class RecvFromInputHandler : public MessageParser { .tv_nsec = NANOS_10_X_1000, }; struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx; + doca_error_t result = DOCA_SUCCESS; if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) { // latency_under_load os_mutex_lock(&ctx->lock); @@ -85,15 +86,62 @@ class RecvFromInputHandler : public MessageParser { // UL only-> wait for signal, once done copy buffer os_cond_wait(&ctx->cond, &ctx->lock); } - } else { - // Waiting for meesage receive callback - blocking mode - if (s_user_params.is_blocked) { + } else { // ping pong or throughput + if (s_user_params.doca_fast_path) { + struct doca_buf *doca_buf = NULL; + struct doca_cc_consumer_post_recv_task *consumer_task; + struct doca_task *task_obj; + do { + result = doca_buf_inventory_buf_get_by_addr(ctx->ctx_fifo.consumer_mem.buf_inv, ctx->ctx_fifo.consumer_mem.mmap, + ctx->ctx_fifo.consumer_mem.mem, MSG_SIZE, &doca_buf); + if (result == DOCA_ERROR_NO_MEMORY) { + nanosleep(&ts, &ts); + } + } while (s_user_params.is_blocked && result == DOCA_ERROR_NO_MEMORY); + if (result != DOCA_SUCCESS) { + if (result == DOCA_ERROR_NO_MEMORY) { // only for non-blocked + errno = EAGAIN; + ctx->buf_size = -1; + } else { + log_err("Doca doca_cc_consumer_post_recv_task_alloc_init failed"); + ret = RET_SOCKET_SHUTDOWN; + } + } else { + do { + result = doca_cc_consumer_post_recv_task_alloc_init(ctx->ctx_fifo.consumer, doca_buf, &consumer_task); + if (result == DOCA_ERROR_NO_MEMORY) { + nanosleep(&ts, &ts); + } + } while (s_user_params.is_blocked && result == DOCA_ERROR_NO_MEMORY); + if (result != DOCA_SUCCESS) { + if (result == DOCA_ERROR_NO_MEMORY) { // only for non-blocked + errno = EAGAIN; + ctx->buf_size = -1; + (void)doca_buf_dec_refcount(doca_buf, NULL); + } else { + log_err("Doca doca_cc_consumer_post_recv_task_alloc_init failed"); + ret = RET_SOCKET_SHUTDOWN; + } + } else { + task_obj = doca_cc_consumer_post_recv_task_as_task(consumer_task); + result = doca_task_submit(task_obj); + if (result != DOCA_SUCCESS) { + log_err("failed submitting recv task with error = %s", doca_error_get_name(result)); + doca_task_free(task_obj); + (void)doca_buf_dec_refcount(doca_buf, NULL); + return RET_SOCKET_SHUTDOWN; + } + } + } + } // end of doca fast path + // Waiting for meesage receive callback - blocking or fifo mode and task submitted successfully + if (s_user_params.is_blocked || (s_user_params.doca_fast_path && result == DOCA_SUCCESS)) { while (!ctx->recv_flag) { if (doca_pe_progress(s_user_params.pe) == 0) { nanosleep(&ts, &ts); } } - } else { // non-blocking + } else { // non-blocked or fifo NO_MEMORY doca_pe_progress(s_user_params.pe); if (!ctx->recv_flag) {// Message recv errno = EAGAIN; diff --git a/src/server.cpp b/src/server.cpp index 401f19ec..f01fb232 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -96,7 +96,24 @@ int ServerBase::initBeforeLoop() { std::string hostport = sockaddr_to_hostport(p_bind_addr); #if defined(USING_DOCA_COMM_CHANNEL_API) - if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { + if (s_user_params.doca_comm_channel && s_user_params.doca_fast_path) { + doca_error_t result; + struct cc_local_mem_bufs *local_producer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.producer_mem); + struct cc_local_mem_bufs *local_consumer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.consumer_mem); + // Buf is needed for registering with memrange + local_producer_mem->mem = local_consumer_mem->mem; + result = cc_init_local_mem_bufs(local_producer_mem, g_fds_array[ifd]->doca_cc_ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result)); + return result; + } + log_dbg("[fd=%d] Init producer memory succeeded", ifd); + // Waiting for connection recv before using fast path + while (g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) { + doca_pe_progress(s_user_params.pe); + } + log_dbg("[fd=%d] New client connected successfully", ifd); + } else if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { #else log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { diff --git a/src/sockperf.cpp b/src/sockperf.cpp index c568d43e..206effb3 100644 --- a/src/sockperf.cpp +++ b/src/sockperf.cpp @@ -303,6 +303,8 @@ static const AOPT_DESC common_opt_desc[] = { #if defined(USING_DOCA_COMM_CHANNEL_API) { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0), aopt_set_string("doca-comm-channel"), "Use Doca communication channel" }, + { OPT_DOCA_FAST_PATH, AOPT_NOARG, aopt_set_literal(0), + aopt_set_string("doca-fast-path"), "Use Doca fast data path (required doca-comm-channel option)" }, { OPT_PCI, AOPT_ARG, aopt_set_literal(0), aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"}, { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0), @@ -2265,6 +2267,14 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) { } log_dbg("doca_pe_create succeeded"); } + if (!rc && aopt_check(common_obj, OPT_DOCA_FAST_PATH)) { + if (!aopt_check(common_obj, OPT_DOCA)) { + log_msg("--doca-comm-channel is required for fast path option"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + s_user_params.doca_fast_path = true; + } + } #endif /* USING_DOCA_COMM_CHANNEL_API */ } @@ -2463,6 +2473,14 @@ void cleanup() { doca_dev_close(ctx->hw_dev); os_mutex_close(&ctx->lock); os_cond_destroy(&ctx->cond); + if (s_user_params.doca_fast_path) { + doca_cc_consumer_destroy(ctx->ctx_fifo.consumer); + doca_cc_producer_destroy(ctx->ctx_fifo.producer); + doca_mmap_destroy(ctx->ctx_fifo.consumer_mem.mmap); + doca_mmap_destroy(ctx->ctx_fifo.producer_mem.mmap); + doca_buf_inventory_destroy(ctx->ctx_fifo.consumer_mem.buf_inv); + doca_buf_inventory_destroy(ctx->ctx_fifo.producer_mem.buf_inv); + } if (s_user_params.mode == MODE_SERVER) { struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx; doca_cc_server_destroy(ctx_server->server); @@ -3621,9 +3639,18 @@ int bringup_for_doca(std::unique_ptr &tmp) cc_ctx.recv_buffer = tmp->recv.buf; cc_ctx.num_connected_clients = 0; cc_ctx.buf_size = 0; + cc_ctx.recv_flag = false; os_mutex_init(&cc_ctx.lock); os_cond_init(&cc_ctx.cond); + if (s_user_params.doca_fast_path) { + cc_ctx.fast_path = true; + cc_ctx.ctx_fifo.fifo_connection_state = FIFO_CONNECTION_IN_PROGRESS; + cc_ctx.ctx_fifo.fifo_data_path_state = CC_FIFO_GET_BUF; + cc_ctx.ctx_fifo.pe = s_user_params.pe; + } else { + cc_ctx.fast_path = false; + } struct priv_doca_pci_bdf dev_pcie = {0}; doca_error_t doca_error = DOCA_SUCCESS; struct doca_ctx *ctx; @@ -3654,7 +3681,6 @@ int bringup_for_doca(std::unique_ptr &tmp) if (s_user_params.mode == MODE_SERVER) { ctx_server = (struct cc_ctx_server*)MALLOC(sizeof(struct cc_ctx_server)); - cc_ctx.recv_flag = true; /* Convert the PCI addresses into the matching struct */ struct priv_doca_pci_bdf dev_rep_pcie = {0}; doca_error = cc_parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie); @@ -3684,8 +3710,7 @@ int bringup_for_doca(std::unique_ptr &tmp) } else { // MODE_CLIENT ctx_client = (struct cc_ctx_client*)MALLOC(sizeof(struct cc_ctx_client)); - ctx_client->state = CONNECTION_IN_PROGRESS; - cc_ctx.recv_flag = false; + cc_ctx.state = CONNECTION_IN_PROGRESS; if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load ctx_client->underload_mode = true; } else {