diff --git a/src/client.cpp b/src/client.cpp
index 767e5dc3..3677b4ca 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -888,11 +888,35 @@ int Client::initBeforeLoop() {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
         if (s_user_params.doca_comm_channel) {
             // Waiting for connection
-            struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)data->doca_cc_ctx;
-            while (ctx_client->state != CC_CONNECTED) {
+            doca_error_t result;
+            while (data->doca_cc_ctx->state != CC_CONNECTED) {
                 doca_pe_progress(s_user_params.pe);
             }
             log_dbg("[fd=%d] Client connected successfully", ifd);
+            if (s_user_params.doca_cc_fifo) {
+                struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem);
+                // Buf is needed for registering with memrange
+                local_producer_mem->mem = m_pMsgRequest->getBuf();
+                result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
+                if (result != DOCA_SUCCESS) {
+                    DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+                    return result;
+                }
+                log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+                while (data->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
+                    doca_pe_progress(s_user_params.pe);
+                }
+                if (!g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) {
+                    struct doca_ctx *ctx;
+                    enum doca_ctx_states state;
+                    ctx = doca_cc_consumer_as_ctx(data->doca_cc_ctx->ctx_fifo.consumer);
+                    doca_ctx_get_state(ctx, &state);
+                    while (state != DOCA_CTX_STATE_RUNNING) {
+                        doca_pe_progress(s_user_params.pe_underload);
+                        doca_ctx_get_state(ctx, &state);
+                    }
+                }
+            }
         }
         // Avoid Client binding in Com Channel mode
         if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
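Note: the client hunk above relies on the same busy-wait pattern in three places (control-path connect, FIFO connect, consumer context start). A condensed sketch of that pattern, assuming the same DOCA headers as doca_cc_helper.h; the helper name is illustrative, not part of the patch:

static void cc_progress_until_running(struct doca_pe *pe, struct doca_ctx *ctx)
{
    enum doca_ctx_states state;

    doca_ctx_get_state(ctx, &state);
    while (state != DOCA_CTX_STATE_RUNNING) {
        // Each call drives at most one pending completion/state transition
        doca_pe_progress(pe);
        doca_ctx_get_state(ctx, &state);
    }
}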
diff --git a/src/common.h b/src/common.h
index 31a82df3..9ee4a4de 100644
--- a/src/common.h
+++ b/src/common.h
@@ -115,51 +115,107 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     if (s_user_params.doca_comm_channel) {
         doca_error_t doca_error;
+        int result;
+        struct doca_cc_producer_send_task *producer_task;
         struct doca_cc_send_task *task;
+        struct doca_buf *doca_buf;
         struct doca_task *task_obj;
         struct timespec ts = {
             .tv_sec = 0,
             .tv_nsec = NANOS_10_X_1000,
         };
         struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
-        do {
-            if (s_user_params.mode == MODE_SERVER) {
-                struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
-                doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
-                                                                 nbytes, &task);
-            } else { // MODE_CLIENT
-                struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
-                doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
-                                                                 nbytes, &task);
-            }
-            if (doca_error == DOCA_ERROR_NO_MEMORY) {
-                // Queue is full of tasks, need to free tasks with completion callback
-                doca_pe_progress(s_user_params.pe);
-            }
-        } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
-
-        if (doca_error != DOCA_SUCCESS) {
-            if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
-                errno = EAGAIN;
-                ret = -1;
+        if (s_user_params.doca_cc_fifo) {
+            do {
+                doca_error = doca_buf_inventory_buf_get_by_data(ctx->ctx_fifo.producer_mem.buf_inv, ctx->ctx_fifo.producer_mem.mmap,
+                                                                ctx->ctx_fifo.producer_mem.mem, nbytes, &doca_buf);
+                if (doca_error == DOCA_ERROR_NO_MEMORY) {
+                    // Queue is full of tasks, need to free tasks with completion callback
+                    doca_pe_progress(s_user_params.pe);
+                }
+            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
+            if (doca_error != DOCA_SUCCESS) {
+                if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
+                    errno = EAGAIN;
+                    ret = -1;
+                } else {
+                    log_err("Failed to get doca buf from producer mmap with error = %s", doca_error_get_name(doca_error));
+                    ret = RET_SOCKET_SHUTDOWN;
+                }
             } else {
-                log_err("Doca task_alloc_init failed");
-                ret = RET_SOCKET_SHUTDOWN;
+                do {
+                    doca_error = doca_cc_producer_send_task_alloc_init(ctx->ctx_fifo.producer, doca_buf,
+                                                                       ctx->ctx_fifo.remote_consumer_id, &producer_task);
+                    if (doca_error == DOCA_ERROR_NO_MEMORY) {
+                        // Queue is full of tasks, need to free tasks with completion callback and buffer decrease
+                        doca_pe_progress(s_user_params.pe);
+                    }
+                } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
+                if (doca_error != DOCA_SUCCESS) {
+                    if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
+                        (void)doca_buf_dec_refcount(doca_buf, NULL);
+                        errno = EAGAIN;
+                        ret = -1;
+                    } else {
+                        log_err("Doca task_alloc_init failed");
+                        ret = RET_SOCKET_SHUTDOWN;
+                    }
+                }
             }
-        } else { // task_alloc_init succeeded
-            task_obj = doca_cc_send_task_as_task(task);
+        } else { // Not doca fast path
             do {
-                doca_error = doca_task_submit(task_obj);
-                if (doca_error == DOCA_ERROR_AGAIN) {
+                if (s_user_params.mode == MODE_SERVER) {
+                    struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
+                    doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
+                                                                     nbytes, &task);
+                } else { // MODE_CLIENT
+                    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
+                    doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
+                                                                     nbytes, &task);
+                }
+                if (doca_error == DOCA_ERROR_NO_MEMORY) {
                     // Queue is full of tasks, need to free tasks with completion callback
                     doca_pe_progress(s_user_params.pe);
                 }
-            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);
-
+            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
+            if (doca_error != DOCA_SUCCESS) {
+                if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
+                    errno = EAGAIN;
+                    ret = -1;
+                } else {
+                    log_err("Doca task_alloc_init failed");
+                    ret = RET_SOCKET_SHUTDOWN;
+                }
+            }
+        }
+        if (doca_error == DOCA_SUCCESS) { // task_alloc_init succeeded
+            if (s_user_params.doca_cc_fifo) {
+                task_obj = doca_cc_producer_send_task_as_task(producer_task);
+                do {
+                    doca_error = doca_task_submit(task_obj);
+                    // doca buf_get_by_data and task_alloc_init succeeded, need to submit until no AGAIN
+                    if (doca_error == DOCA_ERROR_AGAIN) {
+                        nanosleep(&ts, &ts);
+                    }
+                } while (doca_error == DOCA_ERROR_AGAIN);
+            } else {
+                task_obj = doca_cc_send_task_as_task(task);
+                do {
+                    doca_error = doca_task_submit(task_obj);
+                    if (doca_error == DOCA_ERROR_AGAIN) {
+                        // Queue is full of tasks, need to free tasks with completion callback
+                        doca_pe_progress(s_user_params.pe);
+                    }
+                } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);
+            }
             if (doca_error != DOCA_SUCCESS) {
                 if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked
                     errno = EAGAIN;
                     ret = -1;
+                    doca_task_free(task_obj);
+                    if (s_user_params.doca_cc_fifo) {
+                        (void)doca_buf_dec_refcount(doca_buf, NULL);
+                    }
                 } else {
                     log_err("Doca doca_task_submit failed");
                     ret = RET_SOCKET_SHUTDOWN;
@@ -169,7 +225,14 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
             }
         }
         // Additional call for better performance- release pressure on send queue
-        doca_pe_progress(s_user_params.pe);
+        if (!s_user_params.doca_cc_fifo || doca_error == DOCA_ERROR_NO_MEMORY) {
+            doca_pe_progress(s_user_params.pe);
+        } else { // fast path and task submitted successfully
+            do {
+                result = doca_pe_progress(s_user_params.pe);
+                nanosleep(&ts, &ts);
+            } while (result == 0);
+        }
     } else
 #endif /* USING_DOCA_COMM_CHANNEL_API */
     {
diff --git a/src/defs.h b/src/defs.h
index fea3ba3c..bb360e35 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -301,7 +301,8 @@ enum {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     OPT_DOCA,
     OPT_PCI,
-    OPT_PCI_REP
+    OPT_PCI_REP,
+    OPT_DOCA_FAST_PATH
 #endif /* USING_DOCA_COMM_CHANNEL_API */
 };

@@ -820,9 +821,11 @@ struct user_params_t {
 #endif /* DEFINED_TLS */
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     bool doca_comm_channel = false; /* Flag to indicate using Com Channel*/
+    bool doca_cc_fifo = false; /* Flag to indicate using fast path*/
     char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */
     char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
     struct doca_pe *pe = nullptr; /* Progress engine for doca, one per thread*/
+    struct doca_pe *pe_underload = nullptr; /* Progress engine for doca, one per thread, underload mode */
 #endif /* USING_DOCA_COMM_CHANNEL_API */

     user_params_t() {
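Note: the msg_sendto() hunk above interleaves the fast-path and legacy branches, so the fast-path send sequence is easy to lose. A condensed, blocking-only sketch of that sequence using only the calls that appear in the patch (the helper name and the simplified error handling are illustrative, not part of the patch):

static doca_error_t cc_fifo_send(struct cc_ctx *ctx, struct doca_pe *pe, int nbytes)
{
    struct doca_buf *doca_buf;
    struct doca_cc_producer_send_task *producer_task;
    doca_error_t err;

    // 1. Wrap the pre-registered send buffer in a doca_buf
    do {
        err = doca_buf_inventory_buf_get_by_data(ctx->ctx_fifo.producer_mem.buf_inv,
                                                 ctx->ctx_fifo.producer_mem.mmap,
                                                 ctx->ctx_fifo.producer_mem.mem, nbytes, &doca_buf);
        if (err == DOCA_ERROR_NO_MEMORY)
            doca_pe_progress(pe); // completion callbacks release buffers/tasks
    } while (err == DOCA_ERROR_NO_MEMORY);
    if (err != DOCA_SUCCESS)
        return err;

    // 2. Allocate a producer send task addressed to the peer's consumer ID
    err = doca_cc_producer_send_task_alloc_init(ctx->ctx_fifo.producer, doca_buf,
                                                ctx->ctx_fifo.remote_consumer_id, &producer_task);
    if (err != DOCA_SUCCESS) {
        (void)doca_buf_dec_refcount(doca_buf, NULL);
        return err;
    }

    // 3. Submit; on success the completion callback frees the task and drops the buf refcount
    do {
        err = doca_task_submit(doca_cc_producer_send_task_as_task(producer_task));
    } while (err == DOCA_ERROR_AGAIN);
    return err;
}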
diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h
index e13f3256..5ba1353e 100644
--- a/src/doca_cc_helper.h
+++ b/src/doca_cc_helper.h
@@ -39,9 +39,16 @@
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+
 #include "os_abstract.h"

 #define MSG_SIZE 4080
+#define MAX_BUFS 1
+#define CC_DATA_PATH_LOG_TASK_NUM 10 /* Maximum amount of CC consumer and producer task number */
 #define PCI_ADDR_LEN 8
 #define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */
 #define CC_REC_QUEUE_SIZE 10 /* Maximum amount of message in queue */
@@ -54,10 +61,40 @@
 #define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__)
 #define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__)

-enum cc_client_state {
-    CONNECTION_IN_PROGRESS,
+enum cc_state {
+    CC_CONNECTION_IN_PROGRESS,
     CC_CONNECTED
 };

+enum cc_fifo_connection_state {
+    CC_FIFO_CONNECTION_IN_PROGRESS,
+    CC_FIFO_CONNECTED
+};
+
+struct cc_local_mem_bufs {
+    void *mem;                              /* Memory address for DOCA buf mmap */
+    struct doca_mmap *mmap;                 /* DOCA mmap object */
+    struct doca_buf_inventory *buf_inv;     /* DOCA buf inventory object */
+};
+
+struct cc_ctx_fifo {
+    struct doca_cc_consumer *consumer;      /**< CC consumer object >*/
+    struct cc_local_mem_bufs consumer_mem;  /**< Mmap and DOCA buf objects for consumer >*/
+    struct doca_cc_producer *producer;      /**< CC producer object >*/
+    struct cc_local_mem_bufs producer_mem;  /**< Mmap and DOCA buf objects for producer >*/
+    uint32_t remote_consumer_id;            /**< Consumer ID on the peer side >*/
+    struct doca_pe *pe;                     /**< Progress Engine for the data path >*/
+    struct doca_pe *pe_underload;           /**< Progress Engine for underload mode >*/
+    bool underload_mode;                    /**< For using different callback >*/
+    enum cc_fifo_connection_state fifo_connection_state; /**< Holding state for fast path connection >*/
+    int msg_size;                           /** */
+};
+
 struct cc_ctx {
     struct doca_cc_connection *connection;  /**< Connection object used for pairing a connection >*/
@@ -67,7 +104,10 @@ struct cc_ctx {
     bool recv_flag;                         /**< flag indicates when message received >*/
     int fd;                                 /**< File Descriptor >*/
     os_mutex_t lock;                        /**< For underload mode only >*/
-    os_cond_t cond;                         /**< For underload mode only >*/
+    os_cond_t cond;                         /**< For underload mode only >*/
+    enum cc_state state;                    /**< Holding state of client connection >*/
+    bool fast_path;                         /**< Indicates use of the fast data path >*/
+    struct cc_ctx_fifo ctx_fifo;            /**< Data path objects >*/
 };

 struct cc_ctx_server {
@@ -79,8 +119,6 @@ struct cc_ctx_server {
 struct cc_ctx_client {
     struct cc_ctx ctx;                      /**< Base common ctx >*/
     struct doca_cc_client *client;          /**< Client object >*/
-    enum cc_client_state state;             /**< Holding state of client connection >*/
-    bool underload_mode;                    /**< For using different callback >*/
 };

 struct priv_doca_pci_bdf {
@@ -98,6 +136,8 @@ struct priv_doca_pci_bdf {
 };

 /************** General ******************/
+static doca_error_t cc_init_producer(struct cc_ctx *ctx);
+static doca_error_t cc_init_consumer(struct cc_ctx *ctx);

 static doca_error_t
 cc_parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf)
 {
@@ -213,6 +253,69 @@ cc_open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_f
     return DOCA_ERROR_NOT_FOUND;
 }
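Note: the data-path code added below (cc_init_doca_consumer_task() and the input handler) also references a few cc_ctx_fifo members that are not visible in the struct hunk as shown above. For reference, a sketch of those members as implied by their uses; the declarations are reconstructed from the call sites and the ordering and comments are assumptions, not part of the patch:

    struct doca_buf *doca_buf_consumer;                     /* doca_buf posted for consumer recv */
    struct doca_cc_consumer_post_recv_task *consumer_task;  /* Reusable post-recv task */
    struct doca_task *consumer_task_obj;                    /* consumer_task viewed as a doca_task */
    bool task_submitted;                                    /* True while a recv task is in flight */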
+
+static doca_error_t
+cc_init_local_mem_bufs(struct cc_local_mem_bufs *local_mem, struct cc_ctx *ctx)
+{
+    doca_error_t result;
+
+    result = doca_buf_inventory_create(MAX_BUFS, &(local_mem->buf_inv));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create inventory: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_buf_inventory_start(local_mem->buf_inv);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start inventory: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_create(&local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_add_dev(local_mem->mmap, ctx->hw_dev);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to add device to mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_set_permissions(local_mem->mmap, DOCA_ACCESS_FLAG_PCI_READ_WRITE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set permission to mmap: %s", doca_error_get_descr(result));
+    }
+
+    // The sockperf send/recv buffer is set as local_mem->mem before this call
+    //result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, sizeof(uint8_t) * ctx->ctx_fifo.msg_size * 2);
+    result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, sizeof(uint8_t) * 65507 * 2);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set memrange to mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_start(local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start mmap: %s", doca_error_get_descr(result));
+    }
+
+    return DOCA_SUCCESS;
+}
+
+static doca_error_t
+cc_init_doca_consumer_task(struct cc_local_mem_bufs *local_mem, struct cc_ctx_fifo *ctx_fifo)
+{
+    doca_error_t result;
+    result = doca_buf_inventory_buf_get_by_addr(local_mem->buf_inv, local_mem->mmap, local_mem->mem,
+                                                MSG_SIZE, &ctx_fifo->doca_buf_consumer);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to get doca buf: %s", doca_error_get_descr(result));
+    }
+    result = doca_cc_consumer_post_recv_task_alloc_init(ctx_fifo->consumer, ctx_fifo->doca_buf_consumer, &ctx_fifo->consumer_task);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to allocate consumer task: %s", doca_error_get_descr(result));
+    }
+    ctx_fifo->consumer_task_obj = doca_cc_consumer_post_recv_task_as_task(ctx_fifo->consumer_task);
+    return result;
+}
+
 /************** SERVER ******************/

 /**
@@ -289,14 +392,120 @@ cc_server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *r
     /* Save the connection that the ping was sent over for sending the response */
     struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
     ctx_server->ctx.connection = cc_connection;
-
-    //DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer);
+
+    // DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer);
     memcpy(ctx_server->ctx.recv_buffer, recv_buffer, msg_len);
     ctx_server->ctx.buf_size = (int)msg_len;
     ctx_server->ctx.recv_flag = true;
 }

+/**
+ * Callback for consumer post recv task successful completion
+ *
+ * @task [in]: Recv task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                          union doca_data user_data)
+{
+    size_t recv_msg_len;
+    void *recv_msg;
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+    struct cc_ctx *ctx = (struct cc_ctx *)user_data.ptr;
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+
+    result = doca_buf_get_data(buf, &recv_msg);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data address from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_buf_get_data_len(buf, &recv_msg_len);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data length from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    ctx->buf_size = (int)recv_msg_len;
+    ctx->recv_flag = true;
+
+    // DOCA_LOG_INFO("Message received: '%.*s'", (int)recv_msg_len, (char *)recv_msg);
+    // (void)doca_buf_dec_refcount(buf, NULL);
+    // doca_task_free(doca_cc_consumer_post_recv_task_as_task(task));
+}
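Note: cc_init_local_mem_bufs() above registers a fixed 2 * 65507 byte memrange; the commented-out line shows the msg_size-based sizing that was apparently intended. A minimal sketch of that variant as a standalone helper, assuming ctx->ctx_fifo.msg_size is set before the mmap is started (helper name illustrative):

static doca_error_t cc_set_msg_sized_memrange(struct cc_local_mem_bufs *local_mem, struct cc_ctx *ctx)
{
    /* Register exactly two message-sized slots instead of the fixed 65507 * 2 window */
    return doca_mmap_set_memrange(local_mem->mmap, local_mem->mem,
                                  sizeof(uint8_t) * ctx->ctx_fifo.msg_size * 2);
}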
+/**
+ * Callback for consumer post recv task completion with error
+ *
+ * @task [in]: Recv task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_err_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                              union doca_data user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    result = doca_task_get_status(doca_cc_consumer_post_recv_task_as_task(task));
+    DOCA_LOG_ERR("Consumer failed to recv message with error = %s", doca_error_get_name(result));
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_consumer_post_recv_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_consumer_as_ctx(ctx_server->ctx.ctx_fifo.consumer));
+}
+
+static doca_error_t
+cc_init_consumer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    doca_data user_data;
+    struct doca_ctx *doca_ctx;
+    struct cc_local_mem_bufs *local_consumer_mem = &(ctx->ctx_fifo.consumer_mem);
+
+    result = doca_cc_consumer_create(ctx->connection, local_consumer_mem->mmap, &(ctx->ctx_fifo.consumer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create consumer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_consumer_as_ctx(ctx->ctx_fifo.consumer);
+    if (ctx->ctx_fifo.underload_mode) {
+        result = doca_pe_connect_ctx(ctx->ctx_fifo.pe_underload, doca_ctx);
+    } else {
+        result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    }
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to consumer with error = %s", doca_error_get_name(result));
+    }
+    result = doca_cc_consumer_post_recv_task_set_conf(ctx->ctx_fifo.consumer, cc_consumer_recv_task_completion_callback,
+                                                      cc_consumer_recv_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer recv task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    user_data.ptr = (void*) ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_ERROR_IN_PROGRESS) {
+        DOCA_LOG_ERR("Failed to start consumer context with error = %s", doca_error_get_name(result));
+    }
+    return DOCA_SUCCESS;
+}
+
 /**
  * Callback for connection event
  *
@@ -331,6 +540,21 @@ cc_server_connection_event_callback(struct doca_cc_event_connection_status_chang
     }

     ctx_server->ctx.num_connected_clients++;
+    ctx_server->ctx.connection = cc_conn;
+
+    if (ctx_server->ctx.fast_path) {
+        /* Init a cc consumer */
+        result = cc_init_consumer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        /* Init a cc producer */
+        result = cc_init_producer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        DOCA_LOG_INFO("Consumer & Producer were created successfully");
+    }
     DOCA_LOG_INFO("[fd=%d] New client connected to server", ctx_server->ctx.fd);
 }

@@ -369,8 +593,6 @@ cc_server_disconnection_event_callback(struct doca_cc_event_connection_status_ch
     DOCA_LOG_INFO("[fd=%d] client was disconnected from server", ctx_server->ctx.fd);
 }
-
-
 /**
  * Callback triggered whenever CC server context state changes
  *
@@ -413,6 +635,54 @@ cc_server_state_changed_callback(const union doca_data user_data, struct doca_ct
     }
 }
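Note: the server-side fast-path bring-up is spread across cc_doca_server_set_params() below, the connection callback above, and ServerBase::initBeforeLoop() in server.cpp. A condensed sketch of the overall order, using only functions introduced by this patch (helper name illustrative, error handling omitted):

static void cc_server_fast_path_setup(struct cc_ctx_server *ctx_server, struct doca_pe *pe)
{
    struct cc_ctx *ctx = &ctx_server->ctx;
    struct cc_local_mem_bufs *consumer_mem = &ctx->ctx_fifo.consumer_mem;

    // 1. Before starting the server ctx: register consumer events, map the recv buffer
    (void)doca_cc_server_event_consumer_register(ctx_server->server,
                                                 cc_server_new_consumer_callback,
                                                 cc_server_expired_consumer_callback);
    consumer_mem->mem = ctx->recv_buffer;
    (void)cc_init_local_mem_bufs(consumer_mem, ctx);

    // 2. The connection callback then creates the consumer/producer
    //    (cc_init_consumer / cc_init_producer, above)

    // 3. Wait for the peer's consumer ID, then pre-allocate the reusable recv task
    while (ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED)
        doca_pe_progress(pe);
    (void)cc_init_doca_consumer_task(consumer_mem, &ctx->ctx_fifo);
}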
+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_server_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    ctx_server->ctx.ctx_fifo.remote_consumer_id = id;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_server->ctx.fd, id);
+    ctx_server->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED;
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_server_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}
+
 static doca_error_t
 cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
 {
@@ -457,6 +727,22 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
     }
+    if (ctx_server->ctx.fast_path) { // Fast path option
+        result = doca_cc_server_event_consumer_register(ctx_server->server, cc_server_new_consumer_callback,
+                                                        cc_server_expired_consumer_callback);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result));
+        }
+        struct cc_local_mem_bufs *local_consumer_mem = &(ctx_server->ctx.ctx_fifo.consumer_mem);
+        local_consumer_mem->mem = ctx_server->ctx.recv_buffer;
+        result = cc_init_local_mem_bufs(local_consumer_mem, &ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        DOCA_LOG_DBG("Init consumer memory succeeded");
+
+    }

     user_data.ptr = (void *)ctx_server;
     result = doca_ctx_set_user_data(ctx, user_data);
@@ -468,7 +754,7 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(result));
     }
-    DOCA_LOG_DBG("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd);
+    DOCA_LOG_INFO("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd);
     return result;
 }

@@ -490,7 +776,7 @@ cc_client_send_task_completion_callback(struct doca_cc_send_task *task, union do
     (void)user_data;
     (void)task_user_data;

-    DOCA_LOG_INFO("Task sent successfully");
+    // DOCA_LOG_INFO("Task sent successfully");
     doca_task_free(doca_cc_send_task_as_task(task));
 }

@@ -538,7 +824,7 @@ cc_client_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *r
     /* This argument is not in use */
     (void)event;

-    DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len);
+    // DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len);
     memcpy(cc_client->ctx.recv_buffer, recv_buffer, msg_len);
     cc_client->ctx.buf_size = (int)msg_len;
     cc_client->ctx.recv_flag = true;
@@ -580,6 +866,91 @@ cc_client_message_UL_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t
     os_mutex_unlock(&cc_client->ctx.lock);
 }
+
+/**
+ * Callback for producer send task successful completion
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                          union doca_data ctx_user_data)
+{
+    (void)task_user_data;
+    (void)ctx_user_data;
+    struct doca_buf *buf;
+
+    // DOCA_LOG_INFO("Producer task sent successfully");
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+}
+
+/**
+ * Callback for producer send task completion with error
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_err_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                              union doca_data user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)user_data.ptr;
+    result = doca_task_get_status(doca_cc_producer_send_task_as_task(task));
+    DOCA_LOG_ERR("Producer message failed to send with error = %s",
+                 doca_error_get_name(result));
+
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_producer_as_ctx(ctx_client->ctx.ctx_fifo.producer));
+}
+
+static doca_error_t
+cc_init_producer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    doca_data user_data;
+    struct doca_ctx *doca_ctx;
+
+    result = doca_cc_producer_create(ctx->connection, &(ctx->ctx_fifo.producer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create producer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_producer_as_ctx(ctx->ctx_fifo.producer);
+    result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to producer with error = %s", doca_error_get_name(result));
+    }
+    result = doca_cc_producer_send_task_set_conf(ctx->ctx_fifo.producer, cc_producer_send_task_completion_callback,
+                                                 cc_producer_send_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer send task cbs with error = %s", doca_error_get_name(result));
+    }
+
+    user_data.ptr = (void*) ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to start producer context with error = %s", doca_error_get_name(result));
+    }
+    return DOCA_SUCCESS;
+}
+
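Note: on the client the fast-path handshake is split between cc_init_client_send_message() below and Client::initBeforeLoop() in client.cpp. A condensed sketch of that order, reusing only the functions introduced by this patch (helper name illustrative, error handling omitted):

static void cc_client_fast_path_handshake(struct cc_ctx *ctx, struct doca_pe *pe)
{
    // 1. Created once the control-path connection is established
    (void)cc_init_producer(ctx);
    (void)cc_init_consumer(ctx);

    // 2. The peer's "new consumer" event supplies remote_consumer_id and
    //    flips fifo_connection_state; progress the PE until that happens
    while (ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED)
        doca_pe_progress(pe);
}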
 /**
  * Init message on client
  *
@@ -606,7 +977,18 @@ cc_init_client_send_message(struct cc_ctx_client *cc_client)
         return result;
     }

-    cc_client->state = CC_CONNECTED;
+    if (cc_client->ctx.fast_path) {
+        /* Init a cc producer */
+        result = cc_init_producer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        }
+        result = cc_init_consumer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        }
+    }
+    cc_client->ctx.state = CC_CONNECTED;
     DOCA_LOG_INFO("[fd=%d] init_client_send_message succeeded", cc_client->ctx.fd);
     return DOCA_SUCCESS;
 }
@@ -657,6 +1039,53 @@ cc_client_state_changed_callback(const union doca_data user_data, struct doca_ct
         break;
     }
 }
+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_client_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_client *cc_client;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_client = doca_cc_client_get_client_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_client_as_ctx(cc_client), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)(user_data.ptr);
+    ctx_client->ctx.ctx_fifo.remote_consumer_id = id;
+
+    ctx_client->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_client->ctx.fd, id);
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_client_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}

 static doca_error_t
 cc_doca_client_set_params(struct cc_ctx_client *cc_client)
@@ -677,7 +1106,7 @@ cc_doca_client_set_params(struct cc_ctx_client *cc_client)
         DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(result));
     }

-    if (!cc_client->underload_mode) { // ping pong or throughput test
+    if (!cc_client->ctx.ctx_fifo.underload_mode) { // ping pong or throughput test
         result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_recv_callback);
     } else { // underload test
         result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_UL_recv_callback);
@@ -697,8 +1126,24 @@ cc_doca_client_set_params(struct cc_ctx_client *cc_client)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
     }
-    user_data.ptr = (void *)cc_client;

+    if (cc_client->ctx.fast_path) { // Fast path option
+        result = doca_cc_client_event_consumer_register(cc_client->client, cc_client_new_consumer_callback,
+                                                        cc_client_expired_consumer_callback);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result));
+        }
+        struct cc_local_mem_bufs *local_consumer_mem = &(cc_client->ctx.ctx_fifo.consumer_mem);
+        local_consumer_mem->mem = cc_client->ctx.recv_buffer;
+        result = cc_init_local_mem_bufs(local_consumer_mem, &cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        DOCA_LOG_DBG("Init consumer memory succeeded");
+    }
+
+    user_data.ptr = (void *)cc_client;
     result = doca_ctx_set_user_data(ctx, user_data);
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result));
     }
diff --git a/src/input_handlers.h b/src/input_handlers.h
index b9483a0a..82bce8a2 100644
--- a/src/input_handlers.h
+++ b/src/input_handlers.h
@@ -78,28 +78,56 @@ class RecvFromInputHandler : public MessageParser {
             .tv_nsec = NANOS_10_X_1000,
         };
         struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
-        if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong &&
-            !g_pApp->m_const_params.b_stream) { // latency_under_load
+        struct doca_pe *pe_local = s_user_params.pe;
+        doca_error_t result = DOCA_SUCCESS;
+        if (!s_user_params.doca_cc_fifo && s_user_params.mode == MODE_CLIENT
+            && !g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) { // latency_under_load
             os_mutex_lock(&ctx->lock);
             while (!ctx->recv_flag) { // UL only-> wait for signal, once done copy buffer
                 os_cond_wait(&ctx->cond, &ctx->lock);
             }
-        } else {
-            // Waiting for meesage receive callback - blocking mode
+        } else { // ping pong or throughput
+            if (s_user_params.doca_cc_fifo) {
+                if (!ctx->ctx_fifo.task_submitted) { // avoid submitting the same task again
+                    result = doca_buf_reset_data_len(ctx->ctx_fifo.doca_buf_consumer);
+                    if (result != DOCA_SUCCESS) {
+                        log_err("failed resetting doca data len with error = %s", doca_error_get_name(result));
+                    }
+                    result = doca_task_submit(ctx->ctx_fifo.consumer_task_obj);
+                    if (result != DOCA_SUCCESS) {
+                        log_err("failed submitting recv task with error = %s", doca_error_get_name(result));
+                        doca_task_free(ctx->ctx_fifo.consumer_task_obj);
+                        (void)doca_buf_dec_refcount(ctx->ctx_fifo.doca_buf_consumer, NULL);
+                        return RET_SOCKET_SHUTDOWN;
+                    }
+                    // Avoid submitting task when there is already task in queue
+                    ctx->ctx_fifo.task_submitted = true;
+                }
+                if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong
+                    && !g_pApp->m_const_params.b_stream) { // latency_under_load
+                    pe_local = s_user_params.pe_underload;
+                }
+            } // end of doca fast path
+            // Waiting for message receive callback - blocking mode
             if (s_user_params.is_blocked) {
                 while (!ctx->recv_flag) {
-                    if (doca_pe_progress(s_user_params.pe) == 0) {
+                    if (doca_pe_progress(pe_local) == 0) {
                         nanosleep(&ts, &ts);
                     }
                 }
-            } else { // non-blocking
-                doca_pe_progress(s_user_params.pe);
+            } else { // non-blocked
+                doca_pe_progress(pe_local);
                 if (!ctx->recv_flag) { // Message recv
                     errno = EAGAIN;
                     ctx->buf_size = -1;
                 }
             }
+
+            if (s_user_params.doca_cc_fifo && ctx->recv_flag) {
+                // another task can be submitted
+                ctx->ctx_fifo.task_submitted = false;
+            }
         }
         m_actual_buf_size = ctx->buf_size;
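Note: a condensed sketch of the fast-path receive loop added to the input handler above, blocking case only (helper name illustrative). The single consumer task is reused, so the doca_buf data length is reset before every re-submit and task_submitted guards against double-posting:

static int cc_fifo_recv(struct cc_ctx *ctx, struct doca_pe *pe)
{
    if (!ctx->ctx_fifo.task_submitted) {
        (void)doca_buf_reset_data_len(ctx->ctx_fifo.doca_buf_consumer);
        if (doca_task_submit(ctx->ctx_fifo.consumer_task_obj) != DOCA_SUCCESS)
            return -1;
        ctx->ctx_fifo.task_submitted = true;
    }
    while (!ctx->recv_flag)                    // completion callback sets recv_flag/buf_size
        doca_pe_progress(pe);
    ctx->ctx_fifo.task_submitted = false;      // safe to post the next recv task
    return ctx->buf_size;
}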
diff --git a/src/server.cpp b/src/server.cpp
index 401f19ec..68ab64a7 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -96,7 +96,29 @@ int ServerBase::initBeforeLoop() {
     std::string hostport = sockaddr_to_hostport(p_bind_addr);
 #if defined(USING_DOCA_COMM_CHANNEL_API)
-    if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) {
+    if (s_user_params.doca_comm_channel && s_user_params.doca_cc_fifo) {
+        doca_error_t result;
+        struct cc_local_mem_bufs *local_producer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.producer_mem);
+        struct cc_local_mem_bufs *local_consumer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.consumer_mem);
+        // Buf is needed for registering with memrange
+        local_producer_mem->mem = local_consumer_mem->mem;
+        result = cc_init_local_mem_bufs(local_producer_mem, g_fds_array[ifd]->doca_cc_ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+        // Waiting for connection recv before using fast path
+        while (g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
+            doca_pe_progress(s_user_params.pe);
+        }
+        log_dbg("[fd=%d] New client connected successfully", ifd);
+        result = cc_init_doca_consumer_task(local_consumer_mem, &g_fds_array[ifd]->doca_cc_ctx->ctx_fifo);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init doca task with error = %s", doca_error_get_name(result));
+        }
+
+    } else if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) {
 #else
     log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
     if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) {
diff --git a/src/sockperf.cpp b/src/sockperf.cpp
index c568d43e..43b38b1c 100644
--- a/src/sockperf.cpp
+++ b/src/sockperf.cpp
@@ -303,6 +303,8 @@ static const AOPT_DESC common_opt_desc[] = {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0),
       aopt_set_string("doca-comm-channel"), "Use Doca communication channel" },
+    { OPT_DOCA_FAST_PATH, AOPT_NOARG, aopt_set_literal(0),
+      aopt_set_string("doca-fast-path"), "Use Doca fast data path (requires the doca-comm-channel option)" },
     { OPT_PCI, AOPT_ARG, aopt_set_literal(0),
       aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"},
     { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0),
@@ -2265,6 +2267,14 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) {
                 }
                 log_dbg("doca_pe_create succeeded");
             }
+            if (!rc && aopt_check(common_obj, OPT_DOCA_FAST_PATH)) {
+                if (!aopt_check(common_obj, OPT_DOCA)) {
+                    log_msg("--doca-comm-channel is required for fast path option");
+                    rc = SOCKPERF_ERR_BAD_ARGUMENT;
+                } else {
+                    s_user_params.doca_cc_fifo = true;
+                }
+            }
 #endif /* USING_DOCA_COMM_CHANNEL_API */
         }

@@ -2463,6 +2473,17 @@ void cleanup() {
         doca_dev_close(ctx->hw_dev);
         os_mutex_close(&ctx->lock);
         os_cond_destroy(&ctx->cond);
+        if (s_user_params.doca_cc_fifo) {
+            doca_cc_consumer_destroy(ctx->ctx_fifo.consumer);
+            doca_cc_producer_destroy(ctx->ctx_fifo.producer);
+            doca_mmap_destroy(ctx->ctx_fifo.consumer_mem.mmap);
+            doca_mmap_destroy(ctx->ctx_fifo.producer_mem.mmap);
+            doca_buf_inventory_destroy(ctx->ctx_fifo.consumer_mem.buf_inv);
+            doca_buf_inventory_destroy(ctx->ctx_fifo.producer_mem.buf_inv);
+            if (s_user_params.mode == MODE_CLIENT && ctx->ctx_fifo.underload_mode) {
+                doca_pe_destroy(s_user_params.pe_underload);
+            }
+        }
         if (s_user_params.mode == MODE_SERVER) {
             struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
             doca_cc_server_destroy(ctx_server->server);
@@ -3621,9 +3642,19 @@ int bringup_for_doca(std::unique_ptr &tmp)
     cc_ctx.recv_buffer = tmp->recv.buf;
     cc_ctx.num_connected_clients = 0;
     cc_ctx.buf_size = 0;
+    cc_ctx.recv_flag = false;
     os_mutex_init(&cc_ctx.lock);
     os_cond_init(&cc_ctx.cond);

+    if (s_user_params.doca_cc_fifo) {
+        cc_ctx.fast_path = true;
+        cc_ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTION_IN_PROGRESS;
+        cc_ctx.ctx_fifo.pe = s_user_params.pe;
+        cc_ctx.ctx_fifo.msg_size = s_user_params.msg_size;
+        cc_ctx.ctx_fifo.task_submitted = false;
+    } else {
+        cc_ctx.fast_path = false;
+    }
     struct priv_doca_pci_bdf dev_pcie = {0};
     doca_error_t doca_error = DOCA_SUCCESS;
     struct doca_ctx *ctx;
@@ -3654,7 +3685,6 @@

     if (s_user_params.mode == MODE_SERVER) {
         ctx_server = (struct cc_ctx_server*)MALLOC(sizeof(struct cc_ctx_server));
-        cc_ctx.recv_flag = true;
         /* Convert the PCI addresses into the matching struct */
         struct priv_doca_pci_bdf dev_rep_pcie = {0};
         doca_error = cc_parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie);
@@ -3681,17 +3711,13 @@
         }
         log_dbg("doca_cc_server_create succeeded");
         ctx = doca_cc_server_as_ctx(ctx_server->server);
+        if (s_user_params.doca_cc_fifo) {
+            cc_ctx.ctx_fifo.underload_mode = false;
+        }
     } else { // MODE_CLIENT
         ctx_client = (struct cc_ctx_client*)MALLOC(sizeof(struct cc_ctx_client));
-        ctx_client->state = CONNECTION_IN_PROGRESS;
-        cc_ctx.recv_flag = false;
-        if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load
-            ctx_client->underload_mode = true;
-        } else {
-            ctx_client->underload_mode = false;
-        }
-
+        cc_ctx.state = CC_CONNECTION_IN_PROGRESS;
         doca_error = doca_cc_client_create(cc_ctx.hw_dev, s_user_params.addr.addr_un.sun_path,
                                            &(ctx_client->client));
         if (doca_error != DOCA_SUCCESS) {
@@ -3700,6 +3726,21 @@
         }
         log_dbg("doca_cc_client_create succeeded");
         ctx = doca_cc_client_as_ctx(ctx_client->client);
+        if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load
+            cc_ctx.ctx_fifo.underload_mode = true;
+            if (s_user_params.doca_cc_fifo) {
+                // For underload mode we use a different PE for the consumer, 1 PE per thread
+                doca_error = doca_pe_create(&(s_user_params.pe_underload));
+                if (doca_error != DOCA_SUCCESS) {
+                    log_dbg("Failed creating pe for underload mode with error %s", doca_error_get_name(doca_error));
+                    goto destroy_cc;
+                }
+                log_dbg("doca_pe_create succeeded for underload mode");
+                cc_ctx.ctx_fifo.pe_underload = s_user_params.pe_underload;
+            }
+        } else {
+            cc_ctx.ctx_fifo.underload_mode = false;
+        }
     }

     doca_error = doca_pe_connect_ctx(s_user_params.pe, ctx);
@@ -3750,6 +3791,11 @@
     doca_dev_close(cc_ctx.hw_dev);
     /* Destroy PE*/
     doca_pe_destroy(s_user_params.pe);
+    if (s_user_params.doca_cc_fifo) {
+        if (s_user_params.mode == MODE_CLIENT && cc_ctx.ctx_fifo.underload_mode) {
+            doca_pe_destroy(s_user_params.pe_underload);
+        }
+    }
     os_mutex_close(&cc_ctx.lock);
     os_cond_destroy(&cc_ctx.cond);
     s_user_params.pe = NULL;
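Usage note: an illustrative invocation of the new flag only; the PCI address is a placeholder, the remaining required sockperf arguments are elided, and the server side additionally passes the representor PCI option listed in the option table above. parse_common_opt() rejects --doca-fast-path unless --doca-comm-channel is also given.

    # client side, ping-pong over the DOCA comm channel with the FIFO fast path
    sockperf ping-pong --doca-comm-channel --doca-fast-path --pci-address <pci-addr> ...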