diff --git a/configure.ac b/configure.ac
index d8a87111..fa407af3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM
 TARGETDIR="unknown"
 case "$host" in
-
+
 i?86-*-*) TARGET=X86; TARGETDIR=x86;;
 ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;;
 powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;;
@@ -26,14 +26,14 @@ case "$host" in
 aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;;
 s390*-*-*) TARGET=S390; TARGETDIR=s390;;
 esac
-
+
 AC_SUBST(AM_RUNTESTFLAGS)
 AC_SUBST(AM_LTLDFLAGS)
-
+
 if test $TARGETDIR = unknown; then
   AC_MSG_ERROR(["it has not been ported to $host."])
 fi
-
+
 AM_CONDITIONAL(X86, test x$TARGET = xX86)
 AM_CONDITIONAL(IA64, test x$TARGET = xIA64)
 AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC)
@@ -100,6 +100,34 @@ AC_MSG_CHECKING(
 [for vma extra api])
 AC_MSG_RESULT([${have_vma_api}])
 
+##########################################################################
+# check DOCA communication channel API
+#
+AC_ARG_ENABLE(
+    [doca-communication-channel-api],
+    AC_HELP_STRING([--enable-doca-communication-channel-api],
+                   [SOCKPERF: enable DOCA communication channel extra api support (default=no)]),
+    [have_doca_comm_channel_api=$enableval],
+    [have_doca_comm_channel_api=no])
+AS_IF([test "x${have_doca_comm_channel_api}" = "xyes"],
+    [if test "$have_doca_comm_channel_api" = "yes"
+    then
+        have_doca_comm_channel_api=/opt/mellanox/doca
+    fi
+
+    CPPFLAGS="$CPPFLAGS -I$have_doca_comm_channel_api/include -I$have_doca_comm_channel_api/lib -I$have_doca_comm_channel_api/samples"
+    LIBS="$LIBS -ldoca_comm_channel -ldoca_common"
+
+    AC_CHECK_HEADERS([$have_doca_comm_channel_api/include/doca_comm_channel.h $have_doca_comm_channel_api/include/doca_dev.h $have_doca_comm_channel_api/samples/common.h],
+        [AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[Enable using DOCA communication channel extra API])],
+        [AC_MSG_ERROR([DOCA communication channel headers not found])
+         have_doca_comm_channel_api=no])])
+AC_MSG_CHECKING(
+    [for doca communication channel extra api])
+AC_MSG_RESULT([${have_doca_comm_channel_api}])
+
+
 ##########################################################################
 # check XLIO extra API
 #
@@ -141,7 +169,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes")
 
 ##########################
-# Enable tests
+# Enable tests
 #
 SP_ARG_ENABLE_BOOL(
     [test],
@@ -151,7 +179,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes")
 
 ##########################
-# Enable tools
+# Enable tools
 #
 SP_ARG_ENABLE_BOOL(
     [tool],
@@ -233,6 +261,7 @@ AC_MSG_RESULT([
     test: ${have_test}
     tool: ${have_tool}
     vma_api: ${have_vma_api}
+    doca_api: ${have_doca_comm_channel_api}
     xlio_api: ${have_xlio_api}
     debug: ${have_debug}
 ])
diff --git a/src/client.cpp b/src/client.cpp
index 2f2b7268..52946ac6 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -885,7 +885,53 @@ int Client::initBeforeLoop() {
             if (!(data && (data->active_fd_list))) continue;
 
             const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info;
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+            if (s_user_params.doca_comm_channel) {
+                // Waiting for connection
+                doca_error_t result;
+                while (data->doca_cc_ctx->state != CC_CONNECTED) {
+                    doca_pe_progress(s_user_params.pe);
+                }
+                log_dbg("[fd=%d] Client connected successfully", ifd);
+                if (s_user_params.doca_cc_fifo) {
+                    struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem);
+                    struct cc_local_mem_bufs *local_consumer_mem = &(data->doca_cc_ctx->ctx_fifo.consumer_mem);
+                    // Buf is needed for registering with memrange
+                    local_producer_mem->mem = m_pMsgRequest->getBuf();
+                    result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
+                    if (result != DOCA_SUCCESS) {
+                        DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+                        return result;
+                    }
+                    log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+                    while (data->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
+                        doca_pe_progress(s_user_params.pe);
+                    }
+                    if (!g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) {
+                        struct doca_ctx *ctx;
+                        enum doca_ctx_states state;
+                        ctx = doca_cc_consumer_as_ctx(data->doca_cc_ctx->ctx_fifo.consumer);
+                        doca_ctx_get_state(ctx, &state);
+                        while (state != DOCA_CTX_STATE_RUNNING) {
+                            doca_pe_progress(s_user_params.pe_underload);
+                            doca_ctx_get_state(ctx, &state);
+                        }
+                    }
+                    result = cc_init_doca_consumer_task(local_consumer_mem, &data->doca_cc_ctx->ctx_fifo);
+                    if (result != DOCA_SUCCESS) {
+                        DOCA_LOG_ERR("Failed to init doca consumer task with error = %s", doca_error_get_name(result));
+                    }
+                    result = cc_init_doca_producer_task(local_producer_mem, &data->doca_cc_ctx->ctx_fifo);
+                    if (result != DOCA_SUCCESS) {
+                        DOCA_LOG_ERR("Failed to init doca producer task with error = %s", doca_error_get_name(result));
+                    }
+                }
+            }
+            // Avoid Client binding in Comm Channel mode
+            if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
+#else
             if (p_client_bind_addr->addr.sa_family != AF_UNSPEC) {
+#endif //USING_DOCA_COMM_CHANNEL_API
                 socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len;
                 std::string hostport = sockaddr_to_hostport(p_client_bind_addr);
 #if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)
diff --git a/src/common.cpp b/src/common.cpp
index 52de7d60..390387c3 100644
--- a/src/common.cpp
+++ b/src/common.cpp
@@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr)
     if (addr->sa_family == AF_INET6) {
         return "[" + std::string(hbuf) + "]:" + std::string(pbuf);
     } else if (addr->sa_family == AF_UNIX) {
-        return std::string(addr->sa_data);
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+        if (s_user_params.doca_comm_channel)
+            return std::string(pbuf) + " [DOCA]";
+#endif
+        return std::string(pbuf) + " [UNIX]";
     } else {
         return std::string(hbuf) + ":" + std::string(pbuf);
     }
diff --git a/src/common.h b/src/common.h
index 7a4da907..242be3ef 100644
--- a/src/common.h
+++ b/src/common.h
@@ -112,6 +112,92 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
         ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes);
     } else
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+    if (s_user_params.doca_comm_channel) {
+        doca_error_t doca_error = DOCA_SUCCESS;
+        int result;
+        struct doca_cc_producer_send_task *producer_task;
+        struct doca_cc_send_task *task;
+        struct doca_buf *doca_buf;
+        struct doca_task *task_obj;
+        struct timespec ts = {
+            .tv_sec = 0,
+            .tv_nsec = NANOS_10_X_1000,
+        };
+        struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
+        if (s_user_params.doca_cc_fifo) {
+            doca_error = doca_buf_set_data(ctx->ctx_fifo.doca_buf_producer, ctx->ctx_fifo.producer_mem.mem, nbytes);
+            if (doca_error != DOCA_SUCCESS) {
+                log_err("failed setting doca buf data with error = %s", doca_error_get_name(doca_error));
+            }
+            task_obj = doca_cc_producer_send_task_as_task(ctx->ctx_fifo.producer_task);
+            do {
+                doca_error = doca_task_submit(task_obj);
+                // need to submit until no AGAIN
+                if (doca_error == DOCA_ERROR_AGAIN) {
+                    nanosleep(&ts, &ts);
+                }
+            } while (doca_error == DOCA_ERROR_AGAIN);
+        } else { // Not doca fast path
+            do {
+                if (s_user_params.mode == MODE_SERVER) {
+                    struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
+                    doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
+                                                                     nbytes, &task);
+                } else { // MODE_CLIENT
+                    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
+                    doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
+                                                                     nbytes, &task);
+                }
+                if (doca_error == DOCA_ERROR_NO_MEMORY) {
+                    // Queue is full of tasks, need to free tasks with completion callback
+                    doca_pe_progress(s_user_params.pe);
+                }
+            } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_NO_MEMORY);
+            if (doca_error != DOCA_SUCCESS) {
+                if (doca_error == DOCA_ERROR_NO_MEMORY) { // only for non-blocked
+                    errno = EAGAIN;
+                    ret = -1;
+                } else {
+                    log_err("Doca task_alloc_init failed");
+                    ret = RET_SOCKET_SHUTDOWN;
+                }
+            } else {
+                task_obj = doca_cc_send_task_as_task(task);
+                do {
+                    doca_error = doca_task_submit(task_obj);
+                    if (doca_error == DOCA_ERROR_AGAIN) {
+                        // Queue is full of tasks, need to free tasks with completion callback
+                        doca_pe_progress(s_user_params.pe);
+                    }
+                } while (s_user_params.is_blocked && doca_error == DOCA_ERROR_AGAIN);
+            }
+        }
+
+        if (doca_error != DOCA_SUCCESS) {
+            if (doca_error == DOCA_ERROR_AGAIN) { // only for non-blocked
+                errno = EAGAIN;
+                ret = -1;
+                doca_task_free(task_obj);
+            } else {
+                log_err("Doca doca_task_submit failed");
+                ret = RET_SOCKET_SHUTDOWN;
+            }
+        } else {
+            ret = nbytes;
+        }
+
+        // Additional call for better performance - releases pressure on the send queue
+        if (!s_user_params.doca_cc_fifo || doca_error == DOCA_ERROR_NO_MEMORY) {
+            doca_pe_progress(s_user_params.pe);
+        } else { // fast path and task submitted successfully
+            do {
+                result = doca_pe_progress(s_user_params.pe);
+                nanosleep(&ts, &ts);
+            } while (result == 0);
+        }
+    } else
+#endif /* USING_DOCA_COMM_CHANNEL_API */
     {
         ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen);
     }
diff --git a/src/defs.h b/src/defs.h
index 04ed0e87..bb360e35 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -157,6 +157,11 @@ typedef unsigned short int sa_family_t;
 #endif // USING_XLIO_EXTRA_API
 #endif // !defined(__windows__) && !defined(__FreeBSD__) && !defined(__APPLE__)
 
+#ifdef USING_DOCA_COMM_CHANNEL_API
+#include "doca_cc_helper.h"
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#endif /* USING_DOCA_COMM_CHANNEL_API */
+
 #define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE)
 extern int MAX_PAYLOAD_SIZE;
 extern int max_fds_num;
@@ -291,8 +296,15 @@ enum {
     OPT_LOAD_XLIO,              // 47
     OPT_TCP_NB_CONN_TIMEOUT_MS, // 48
 #if defined(DEFINED_TLS)
-    OPT_TLS
+    OPT_TLS,
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+    OPT_DOCA,
+    OPT_PCI,
+    OPT_PCI_REP,
+    OPT_DOCA_FAST_PATH
+#endif /* USING_DOCA_COMM_CHANNEL_API */
+
 };
 
 static const char *const round_trip_str[] = { "latency", "rtt" };
@@ -571,6 +583,9 @@ struct fds_data {
 #if defined(DEFINED_TLS)
     void *tls_handle = nullptr;
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+    struct cc_ctx *doca_cc_ctx = nullptr;
+#endif /* USING_DOCA_COMM_CHANNEL_API */
 
     fds_data() {
@@ -804,6 +819,14 @@ struct user_params_t {
 #if defined(DEFINED_TLS)
     bool tls = false;
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+    bool doca_comm_channel = false; /* Flag to indicate using Comm Channel */
+    bool doca_cc_fifo = false;      /* Flag to indicate using fast path */
+    char cc_dev_pci_addr[PCI_ADDR_LEN];     /* Comm Channel DOCA device PCI address */
+    char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
+    struct doca_pe *pe = nullptr;           /* Progress engine for doca, one per thread */
+    struct doca_pe *pe_underload = nullptr; /* Progress engine for doca, one per thread, underload mode */
+#endif /* USING_DOCA_COMM_CHANNEL_API */
 
     user_params_t() {
         memset(&client_bind_info, 0, sizeof(client_bind_info));
diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h
new file mode 100644
index 00000000..71270d58
--- /dev/null
+++ b/src/doca_cc_helper.h
@@ -0,0 +1,1178 @@
+/*
+ * Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification,
+ * are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ * 3. Neither the name of the Mellanox Technologies Ltd nor the names of its
+ *    contributors may be used to endorse or promote products derived from this
+ *    software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+ * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#ifndef DOCA_CC_HELPER_H_
+#define DOCA_CC_HELPER_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "doca_comm_channel.h"
+#include <doca_buf.h>
+#include <doca_buf_inventory.h>
+#include <doca_cc.h>
+#include <doca_cc_consumer.h>
+#include <doca_cc_producer.h>
+#include <doca_ctx.h>
+#include <doca_dev.h>
+#include <doca_error.h>
+#include <doca_mmap.h>
+#include <doca_pe.h>
+
+#include "os_abstract.h"
+
+#define MSG_SIZE 4080
+#define MAX_BUFS 1
+#define CC_DATA_PATH_LOG_TASK_NUM 10 /* Maximum amount of CC consumer and producer task number */
+#define PCI_ADDR_LEN 8
+#define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */
+#define CC_REC_QUEUE_SIZE 10   /* Maximum amount of message in queue */
+#define CC_SEND_TASK_NUM 10    /* Maximum amount of CC send task number */
+#define NANOS_10_X_1000 (10 * 1000)
+
+#define log_dbg(log_fmt, ...) \
+    printf("Doca CC" ": " log_fmt "\n", ##__VA_ARGS__);
+#define DOCA_LOG_INFO(format, ...) log_dbg(format, ##__VA_ARGS__)
+#define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__)
+#define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__)
+
+enum cc_state {
+    CC_CONNECTION_IN_PROGRESS,
+    CC_CONNECTED
+};
+enum cc_fifo_connection_state {
+    CC_FIFO_CONNECTION_IN_PROGRESS,
+    CC_FIFO_CONNECTED
+};
+
+struct cc_local_mem_bufs {
+    void *mem;                          /* Memory address for DOCA buf mmap */
+    struct doca_mmap *mmap;             /* DOCA mmap object */
+    struct doca_buf_inventory *buf_inv; /* DOCA buf inventory object */
+    int msg_size;                       /**< Message size used for the mmap memrange >*/
+};
+
+struct cc_ctx_fifo {
+    struct doca_cc_consumer *consumer;     /**< DOCA CC consumer object (fast path) >*/
+    struct doca_cc_producer *producer;     /**< DOCA CC producer object (fast path) >*/
+    struct cc_local_mem_bufs consumer_mem; /**< Local memory resources for the consumer >*/
+    struct cc_local_mem_bufs producer_mem; /**< Local memory resources for the producer >*/
+    struct doca_pe *pe;                    /**< Progress engine used for the data path >*/
+    struct doca_pe *pe_underload;          /**< Progress engine used in underload mode >*/
+    uint32_t remote_consumer_id;           /**< Remote consumer ID used by producer sends >*/
+    int msg_size;                          /**< Message size >*/
+    bool underload_mode;                   /**< Indicates a latency-under-load test >*/
+    enum cc_fifo_connection_state fifo_connection_state; /**< Holding state for fast path connection >*/
+    struct doca_buf *doca_buf_consumer;
+    struct doca_buf *doca_buf_producer;
+    struct doca_cc_consumer_post_recv_task *consumer_task;
+    struct doca_cc_producer_send_task *producer_task;
+    struct doca_task *consumer_task_obj;
+    struct doca_task *producer_task_obj;
+    bool task_submitted; /**< Indicates whether a task was already submitted >*/
+};
+
+struct cc_ctx {
+    struct doca_dev *hw_dev;               /**< Doca Device used per PCI address >*/
+    struct doca_cc_connection *connection; /**< Connection object used for pairing a connection >*/
+    uint32_t num_connected_clients;        /**< Number of currently connected clients >*/
+    uint8_t *recv_buffer;                  /**< Pointer to recv buffer >*/
+    int buf_size;                          /**< Buffer size of recv buffer >*/
+    bool recv_flag;                        /**< Flag indicates when message received >*/
+    int fd;                                /**< File Descriptor >*/
+    os_mutex_t lock;                       /**< For underload mode only >*/
+    os_cond_t cond;                        /**< For underload mode only >*/
+    enum cc_state state;                   /**< Holding state of client connection >*/
+    bool fast_path;                        /**< Indicates using the fast data path >*/
+    struct cc_ctx_fifo ctx_fifo;           /**< Data path objects >*/
+};
+
+struct cc_ctx_server {
+    struct cc_ctx ctx;             /**< Base common ctx >*/
+    struct doca_dev_rep *rep_dev;  /**< Device representor >*/
+    struct doca_cc_server *server; /**< Server object >*/
+};
+
+struct cc_ctx_client {
+    struct cc_ctx ctx;             /**< Base common ctx >*/
+    struct doca_cc_client *client; /**< Client object >*/
+};
+
+struct priv_doca_pci_bdf {
+#define PCI_FUNCTION_MAX_VALUE 8
+#define PCI_DEVICE_MAX_VALUE 32
+#define PCI_BUS_MAX_VALUE 256
+    union {
+        uint16_t raw;
+        struct {
+            uint16_t function : 3;
+            uint16_t device : 5;
+            uint16_t bus : 8;
+        };
+    };
+};
+
+/************** General ******************/
+static doca_error_t cc_init_producer(struct cc_ctx *ctx);
+static doca_error_t cc_init_consumer(struct cc_ctx *ctx);
+static doca_error_t
+cc_parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf)
+{
+    unsigned int bus_bitmask = 0xFFFFFF00;
+    unsigned int dev_bitmask = 0xFFFFFFE0;
+    unsigned int func_bitmask = 0xFFFFFFF8;
+    uint32_t tmpu;
+    char tmps[4];
+
+    if (pci_addr == NULL || strlen(pci_addr) != 7 || pci_addr[2] != ':' || pci_addr[5] != '.')
+        return DOCA_ERROR_INVALID_VALUE;
+
+    tmps[0] = pci_addr[0];
+    tmps[1] = pci_addr[1];
+    tmps[2] = '\0';
+    tmpu = strtoul(tmps, NULL, 16);
+    if ((tmpu & bus_bitmask) != 0)
+        return DOCA_ERROR_INVALID_VALUE;
+    out_bdf->bus = tmpu;
+
+    tmps[0] = pci_addr[3];
+    tmps[1] = pci_addr[4];
+    tmps[2] = '\0';
+    tmpu = strtoul(tmps, NULL, 16);
+    if ((tmpu & dev_bitmask) != 0)
+        return DOCA_ERROR_INVALID_VALUE;
+    out_bdf->device = tmpu;
+
+    tmps[0] = pci_addr[6];
+    tmps[1] = '\0';
+    tmpu = strtoul(tmps, NULL, 16);
+    if ((tmpu & func_bitmask) != 0)
+        return DOCA_ERROR_INVALID_VALUE;
+    out_bdf->function = tmpu;
+
+    return DOCA_SUCCESS;
+}
+
+typedef doca_error_t (*jobs_check)(struct doca_devinfo *);
+
+static doca_error_t
+cc_open_doca_device_with_pci(const struct priv_doca_pci_bdf *value, jobs_check func, struct doca_dev **retval)
+{
+    struct doca_devinfo **dev_list;
+    uint32_t nb_devs;
+    char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {};
+    doca_error_t res;
+    size_t i;
+
+    /* Set default return value */
+    *retval = NULL;
+
+    res = doca_devinfo_create_list(&dev_list, &nb_devs);
+    if (res != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to load doca devices list. Doca_error value: %d", res);
+        return res;
+    }
+
+    /* Search */
+    for (i = 0; i < nb_devs; i++) {
+        res = doca_devinfo_get_pci_addr_str(dev_list[i], pci_buf);
+        if (res == DOCA_SUCCESS) {
+            /* If any special capabilities are needed */
+            if (func != NULL && func(dev_list[i]) != DOCA_SUCCESS)
+                continue;
+
+            /* if device can be opened */
+            res = doca_dev_open(dev_list[i], retval);
+            if (res == DOCA_SUCCESS) {
+                doca_devinfo_destroy_list(dev_list);
+                return res;
+            }
+        }
+    }
+
+    DOCA_LOG_ERR("Matching device not found.");
+    res = DOCA_ERROR_NOT_FOUND;
+
+    doca_devinfo_destroy_list(dev_list);
+    return res;
+}
+
+static doca_error_t
+cc_open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_filter filter, struct priv_doca_pci_bdf *pci_bdf,
+                                 struct doca_dev_rep **retval)
+{
+    uint32_t nb_rdevs = 0;
+    struct doca_devinfo_rep **rep_dev_list = NULL;
+    char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {};
+    doca_error_t result;
+    size_t i;
+
+    *retval = NULL;
+
+    /* Search */
+    result = doca_devinfo_rep_create_list(local, filter, &rep_dev_list, &nb_rdevs);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR(
+            "Failed to create devinfo representors list. Representor devices are available only on DPU, do not run on Host.");
+        return DOCA_ERROR_INVALID_VALUE;
+    }
+
+    for (i = 0; i < nb_rdevs; i++) {
+        result = doca_devinfo_rep_get_pci_addr_str(rep_dev_list[i], pci_buf);
+        if (result == DOCA_SUCCESS &&
+            doca_dev_rep_open(rep_dev_list[i], retval) == DOCA_SUCCESS) {
+            doca_devinfo_rep_destroy_list(rep_dev_list);
+            return DOCA_SUCCESS;
+        }
+    }
+
+    DOCA_LOG_ERR("Matching device not found.");
+    doca_devinfo_rep_destroy_list(rep_dev_list);
+    return DOCA_ERROR_NOT_FOUND;
+}
+
+static doca_error_t
+cc_init_local_mem_bufs(struct cc_local_mem_bufs *local_mem, struct cc_ctx *ctx)
+{
+    doca_error_t result;
+
+    result = doca_buf_inventory_create(MAX_BUFS, &(local_mem->buf_inv));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create inventory: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_buf_inventory_start(local_mem->buf_inv);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start inventory: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_create(&local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_add_dev(local_mem->mmap, ctx->hw_dev);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to add device to mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_set_permissions(local_mem->mmap, DOCA_ACCESS_FLAG_PCI_READ_WRITE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set permission to mmap: %s", doca_error_get_descr(result));
+    }
+
+    // set here sockperf buf as local->mem
+    //result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, sizeof(uint8_t) * ctx->ctx_fifo.msg_size * 2);
+    result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, sizeof(uint8_t) * local_mem->msg_size * 2);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set memrange to mmap: %s", doca_error_get_descr(result));
+    }
+
+    result = doca_mmap_start(local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start mmap: %s", doca_error_get_descr(result));
+    }
+
+    return DOCA_SUCCESS;
+}
+
+static doca_error_t
+cc_init_doca_consumer_task(struct cc_local_mem_bufs *local_mem, struct cc_ctx_fifo *ctx_fifo)
+{
+    doca_error_t result;
+    result = doca_buf_inventory_buf_get_by_addr(local_mem->buf_inv, local_mem->mmap, local_mem->mem,
+                                                local_mem->msg_size, &ctx_fifo->doca_buf_consumer);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to get doca buf: %s", doca_error_get_descr(result));
+    }
+    result = doca_cc_consumer_post_recv_task_alloc_init(ctx_fifo->consumer, ctx_fifo->doca_buf_consumer, &ctx_fifo->consumer_task);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to allocate consumer task : %s", doca_error_get_descr(result));
+    }
+    ctx_fifo->consumer_task_obj = doca_cc_consumer_post_recv_task_as_task(ctx_fifo->consumer_task);
+    return result;
+}
+
+static doca_error_t
+cc_init_doca_producer_task(struct cc_local_mem_bufs *local_mem, struct cc_ctx_fifo *ctx_fifo)
+{
+    doca_error_t result;
+    result = doca_buf_inventory_buf_get_by_addr(local_mem->buf_inv, local_mem->mmap, local_mem->mem,
+                                                local_mem->msg_size, &ctx_fifo->doca_buf_producer);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to get doca buf: %s", doca_error_get_descr(result));
+    }
+    result = doca_cc_producer_send_task_alloc_init(ctx_fifo->producer, ctx_fifo->doca_buf_producer,
+                                                   ctx_fifo->remote_consumer_id, &ctx_fifo->producer_task);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to allocate producer task : %s", doca_error_get_descr(result));
+    }
+    ctx_fifo->producer_task_obj = doca_cc_producer_send_task_as_task(ctx_fifo->producer_task);
+    return result;
+}
+/************** SERVER ******************/
+
+/**
+ * Callback for send task successful completion
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context
+ */
+static void
+cc_server_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                        union doca_data user_data)
+{
+    /* These arguments are not in use */
+    (void)user_data;
+    (void)task_user_data;
+
+    // DOCA_LOG_INFO("Task sent successfully");
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+}
+
+/**
+ * Callback for send task completion with error
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context
+ */
+static void
+cc_server_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                            union doca_data user_data)
+{
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)task_user_data;
+
+    result = doca_task_get_status(doca_cc_send_task_as_task(task));
+    DOCA_LOG_ERR("[fd=%d] Message failed to send with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_server_as_ctx(ctx_server->server));
+}
+
+/**
+ * Callback for message recv event
+ *
+ * @event [in]: Recv event object
+ * @recv_buffer [in]: Message buffer
+ * @msg_len [in]: Message len
+ * @cc_connection [in]: Connection the message was received on
+ */
+static void
+cc_server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len,
+                                struct doca_cc_connection *cc_connection)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    /* Save the connection that the ping was sent over for sending the response */
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    ctx_server->ctx.connection = cc_connection;
+
+    // DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer);
+
+    memcpy(ctx_server->ctx.recv_buffer, recv_buffer, msg_len);
+    ctx_server->ctx.buf_size = (int)msg_len;
+    ctx_server->ctx.recv_flag = true;
+}
+
+/**
+ * Callback for consumer post recv task successful completion
+ *
+ * @task [in]: Recv task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                          union doca_data user_data)
+{
+    size_t recv_msg_len;
+    void *recv_msg;
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+    struct cc_ctx *ctx = (struct cc_ctx *)user_data.ptr;
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+
+    result = doca_buf_get_data(buf, &recv_msg);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data address from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_buf_get_data_len(buf, &recv_msg_len);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data length from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    ctx->buf_size = (int)recv_msg_len;
+    ctx->recv_flag = true;
+
+    // DOCA_LOG_INFO("Message received: '%.*s'", (int)recv_msg_len, (char *)recv_msg);
+}
+
+/**
+ * Callback for consumer post recv task completion with error
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_err_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                              union doca_data user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    result = doca_task_get_status(doca_cc_consumer_post_recv_task_as_task(task));
+    DOCA_LOG_ERR("Consumer failed to recv message with error = %s", doca_error_get_name(result));
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_consumer_post_recv_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_consumer_as_ctx(ctx_server->ctx.ctx_fifo.consumer));
+}
+
+static doca_error_t
+cc_init_consumer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    doca_data user_data;
+    struct doca_ctx *doca_ctx;
+    struct cc_local_mem_bufs *local_consumer_mem = &(ctx->ctx_fifo.consumer_mem);
+
+    result = doca_cc_consumer_create(ctx->connection, local_consumer_mem->mmap, &(ctx->ctx_fifo.consumer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create consumer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_consumer_as_ctx(ctx->ctx_fifo.consumer);
+    if (ctx->ctx_fifo.underload_mode) {
+        result = doca_pe_connect_ctx(ctx->ctx_fifo.pe_underload, doca_ctx);
+    } else {
+        result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    }
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to consumer with error = %s", doca_error_get_name(result));
+    }
+    result = doca_cc_consumer_post_recv_task_set_conf(ctx->ctx_fifo.consumer, cc_consumer_recv_task_completion_callback,
+                                                      cc_consumer_recv_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer recv task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    user_data.ptr = (void*) ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_ERROR_IN_PROGRESS) {
+        DOCA_LOG_ERR("Failed to start consumer context with error = %s", doca_error_get_name(result));
+    }
+    return DOCA_SUCCESS;
+}
+
+/**
+ * Callback for connection event
+ *
+ * @event [in]: Connection event object
+ * @cc_conn [in]: Connection object
+ * @change_success [in]: Whether the connection was successful or not
+ */
+static void
+cc_server_connection_event_callback(struct doca_cc_event_connection_status_changed *event,
+                                    struct doca_cc_connection *cc_conn, bool change_success)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_conn);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    /* Update number of connected clients in case of successful connection */
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    if (!change_success) {
+        DOCA_LOG_ERR("[fd=%d] Failed connection received", ctx_server->ctx.fd);
+        return;
+    }
+
+    ctx_server->ctx.num_connected_clients++;
+    ctx_server->ctx.connection = cc_conn;
+
+    if (ctx_server->ctx.fast_path) {
+        /* Init a cc consumer */
+        result = cc_init_consumer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        /* Init a cc producer */
+        result = cc_init_producer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        DOCA_LOG_INFO("Consumer & Producer were created successfully");
+    }
+    DOCA_LOG_INFO("[fd=%d] New client connected to server", ctx_server->ctx.fd);
+}
+
+/**
+ * Callback for disconnection event
+ *
+ * @event [in]: Connection event object
+ * @cc_conn [in]: Connection object
+ * @change_success [in]: Whether the disconnection was successful or not
+ */
+static void
+cc_server_disconnection_event_callback(struct doca_cc_event_connection_status_changed *event,
+                                       struct doca_cc_connection *cc_conn, bool change_success)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* These arguments are not in use */
+    (void)event;
+    (void)change_success;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_conn);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    /* Update number of connected clients in case of disconnection. Currently disconnection only happens
+     * if the server sent a message to a client which already stopped.
+     */
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    ctx_server->ctx.num_connected_clients--;
+    DOCA_LOG_INFO("[fd=%d] Client was disconnected from server", ctx_server->ctx.fd);
+}
+
+/**
+ * Callback triggered whenever CC server context state changes
+ *
+ * @user_data [in]: User data associated with the CC server context. Will hold struct cc_ctrl_path_objects *
+ * @ctx [in]: The CC server context that had a state change
+ * @prev_state [in]: Previous context state
+ * @next_state [in]: Next context state (context is already in this state when the callback is called)
+ */
+static void
+cc_server_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state,
+                                 enum doca_ctx_states next_state)
+{
+    (void)ctx;
+    (void)prev_state;
+
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+
+    switch (next_state) {
+    case DOCA_CTX_STATE_IDLE:
+        DOCA_LOG_INFO("[fd=%d] CC server context has been stopped.", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STARTING:
+        /**
+         * The context is in starting state, this is unexpected for CC server.
+         */
+        DOCA_LOG_ERR("[fd=%d] CC server context entered into starting state. Unexpected transition", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_RUNNING:
+        DOCA_LOG_INFO("[fd=%d] CC server context is running. Waiting for clients to connect", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STOPPING:
+        /**
+         * The context is in stopping state; this can happen when a fatal error is encountered or when stopping the context.
+         * doca_pe_progress() will cause all tasks to be flushed, and finally transition the state to idle.
+         */
+        DOCA_LOG_INFO("[fd=%d] CC server context entered into stopping state. Terminating connections with clients", ctx_server->ctx.fd);
+        break;
+    default:
+        break;
+    }
+}
+
+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_server_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    ctx_server->ctx.ctx_fifo.remote_consumer_id = id;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_server->ctx.fd, id);
+    ctx_server->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED;
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_server_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}
+
+
+static doca_error_t
+cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
+{
+    struct doca_ctx *ctx;
+    doca_error_t result;
+    union doca_data user_data;
+
+    ctx = doca_cc_server_as_ctx(ctx_server->server);
+    result = doca_ctx_set_state_changed_cb(ctx, cc_server_state_changed_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(result));
+    }
+    DOCA_LOG_DBG("doca_ctx_set_state_changed_cb succeeded");
+
+    result = doca_cc_server_send_task_set_conf(ctx_server->server, cc_server_send_task_completion_callback,
+                                               cc_server_send_task_completion_err_callback, CC_SEND_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(result));
+    }
+    DOCA_LOG_DBG("doca_cc_server_send_task_set_conf succeeded");
+
+    result = doca_cc_server_event_msg_recv_register(ctx_server->server, cc_server_message_recv_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(result));
+    }
+    DOCA_LOG_DBG("doca_cc_server_event_msg_recv_register succeeded");
+
+    result = doca_cc_server_event_connection_register(ctx_server->server, cc_server_connection_event_callback,
+                                                      cc_server_disconnection_event_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding connection event cbs with error = %s", doca_error_get_name(result));
+    }
+    DOCA_LOG_DBG("doca_cc_server_event_connection_register succeeded");
+
+    /* Set server properties */
+    result = doca_cc_server_set_max_msg_size(ctx_server->server, MSG_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_cc_server_set_recv_queue_size(ctx_server->server, CC_REC_QUEUE_SIZE);
+    if (result != DOCA_SUCCESS) {
DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result)); + } + if (ctx_server->ctx.fast_path) { // Fast path option + result = doca_cc_server_event_consumer_register(ctx_server->server, cc_server_new_consumer_callback, + cc_server_expired_consumer_callback); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result)); + } + struct cc_local_mem_bufs *local_consumer_mem = &(ctx_server->ctx.ctx_fifo.consumer_mem); + local_consumer_mem->mem = ctx_server->ctx.recv_buffer; + result = cc_init_local_mem_bufs(local_consumer_mem, &ctx_server->ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result)); + return result; + } + DOCA_LOG_DBG("Init consumer memory succeeded"); + + } + user_data.ptr = (void *)ctx_server; + + result = doca_ctx_set_user_data(ctx, user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result)); + } + + result = doca_ctx_start(ctx); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(result)); + } + DOCA_LOG_INFO("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd); + return result; + +} + +/************** CLIENT ******************/ + +/** + * Callback for send task successfull completion + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @user_data [in]: User data for context + */ +static void +cc_client_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data user_data) +{ + /* These arguments are not in use */ + (void)user_data; + (void)task_user_data; + + // DOCA_LOG_INFO("Task sent successfully"); + + doca_task_free(doca_cc_send_task_as_task(task)); +} + +/** + * Callback for send task completion with error + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @user_data [in]: User data for context + */ + +static void +cc_client_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data user_data) +{ + struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr; + doca_error_t result; + + /* This argument is not in use */ + (void)task_user_data; + + result = doca_task_get_status(doca_cc_send_task_as_task(task)); + DOCA_LOG_ERR("[fd=%d] Message failed to send with error = %s", cc_client->ctx.fd, doca_error_get_name(result)); + + doca_task_free(doca_cc_send_task_as_task(task)); + (void)doca_ctx_stop(doca_cc_client_as_ctx(cc_client->client)); +} + +/** + * Callback for message recv event + * + * @event [in]: Recv event object + * @recv_buffer [in]: Message buffer + * @msg_len [in]: Message len + * @cc_connection [in]: Connection the message was received on + */ +static void +cc_client_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len, + struct doca_cc_connection *cc_connection) +{ + union doca_data user_data = doca_cc_connection_get_user_data(cc_connection); + struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr; + + /* This argument is not in use */ + (void)event; + + // DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len); + memcpy(cc_client->ctx.recv_buffer, recv_buffer, msg_len); + cc_client->ctx.buf_size = (int)msg_len; + cc_client->ctx.recv_flag = true; +} + +/** + * 
+ * Callback for message recv event in underload mode
+ *
+ * @event [in]: Recv event object
+ * @recv_buffer [in]: Message buffer
+ * @msg_len [in]: Message len
+ * @cc_connection [in]: Connection the message was received on
+ */
+static void
+cc_client_message_UL_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len,
+                                   struct doca_cc_connection *cc_connection)
+{
+    union doca_data user_data = doca_cc_connection_get_user_data(cc_connection);
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+
+    struct timespec ts = {
+        .tv_sec = 0,
+        .tv_nsec = NANOS_10_X_1000,
+    };
+    /* This argument is not in use */
+    (void)event;
+    os_mutex_lock(&cc_client->ctx.lock);
+    // In case the recv thread is already reading, wait for completion
+    // Need to make sure the last message was read before we override the buffer
+    while (cc_client->ctx.recv_flag) {
+        nanosleep(&ts, &ts);
+    }
+    DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len);
+    memcpy(cc_client->ctx.recv_buffer, recv_buffer, msg_len);
+    cc_client->ctx.buf_size = (int)msg_len;
+    cc_client->ctx.recv_flag = true;
+    // Signal to recv thread that the copy is done - recv thread can continue
+    os_cond_signal(&cc_client->ctx.cond);
+    os_mutex_unlock(&cc_client->ctx.lock);
+}
+
+
+/**
+ * Callback for producer send task successful completion
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                          union doca_data ctx_user_data)
+{
+    (void)task_user_data;
+    (void)ctx_user_data;
+    struct doca_buf *buf;
+
+    // DOCA_LOG_INFO("Producer task sent successfully");
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+}
+
+/**
+ * Callback for producer send task completion with error
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_err_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                              union doca_data user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)user_data.ptr;
+    result = doca_task_get_status(doca_cc_producer_send_task_as_task(task));
+    DOCA_LOG_ERR("Producer message failed to send with error = %s",
+                 doca_error_get_name(result));
+
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_producer_as_ctx(ctx_client->ctx.ctx_fifo.producer));
+}
+
+static doca_error_t
+cc_init_producer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    doca_data user_data;
+    struct doca_ctx *doca_ctx;
+
+    result = doca_cc_producer_create(ctx->connection, &(ctx->ctx_fifo.producer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create producer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_producer_as_ctx(ctx->ctx_fifo.producer);
+    result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to producer with error = %s", doca_error_get_name(result));
+    }
+    result = doca_cc_producer_send_task_set_conf(ctx->ctx_fifo.producer, cc_producer_send_task_completion_callback,
+                                                 cc_producer_send_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer send task cbs with error = %s", doca_error_get_name(result));
+    }
+
+    user_data.ptr = (void*) ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to start producer context with error = %s", doca_error_get_name(result));
+    }
+    return DOCA_SUCCESS;
+}
+
+/**
+ * Init message on client
+ *
+ * @cc_ctx_client [in]: cc_ctx_client struct
+ * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
+ */
+static doca_error_t
+cc_init_client_send_message(struct cc_ctx_client *cc_client)
+{
+    doca_error_t result;
+    union doca_data user_data;
+
+    result = doca_cc_client_get_connection(cc_client->client, &(cc_client->ctx.connection));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("[fd=%d] Failed to get connection from client with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_INFO("[fd=%d] doca_cc_client_get_connection succeeded", cc_client->ctx.fd);
+
+    user_data.ptr = (void *)cc_client;
+    result = doca_cc_connection_set_user_data(cc_client->ctx.connection, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("[fd=%d] Failed to set user_data for connection with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        return result;
+    }
+
+    if (cc_client->ctx.fast_path) {
+        /* Init a cc producer */
+        result = cc_init_producer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        }
+        result = cc_init_consumer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        }
+    }
+    cc_client->ctx.state = CC_CONNECTED;
+    DOCA_LOG_INFO("[fd=%d] init_client_send_message succeeded", cc_client->ctx.fd);
+    return DOCA_SUCCESS;
+}
+
+/**
+ * Callback triggered whenever CC client context state changes
+ *
+ * @user_data [in]: User data associated with the CC client context. Will hold struct cc_ctrl_path_objects *
+ * @ctx [in]: The CC client context that had a state change
+ * @prev_state [in]: Previous context state
+ * @next_state [in]: Next context state (context is already in this state when the callback is called)
+ */
+static void
+cc_client_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state,
+                                 enum doca_ctx_states next_state)
+{
+    (void)ctx;
+    (void)prev_state;
+    doca_error_t result;
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+
+    switch (next_state) {
+    case DOCA_CTX_STATE_IDLE:
+        DOCA_LOG_INFO("[fd=%d] CC client context has been stopped.", cc_client->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STARTING:
+        /**
+         * The context is in starting state, need to progress until connection with server is established.
+         */
+        DOCA_LOG_INFO("[fd=%d] CC client context entered into starting state. Waiting for connection establishment", cc_client->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_RUNNING:
+        DOCA_LOG_INFO("[fd=%d] CC client context is running. Initializing message", cc_client->ctx.fd);
+        result = cc_init_client_send_message(cc_client);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init client connection with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+            (void)doca_ctx_stop(doca_cc_client_as_ctx(cc_client->client));
+        }
+        break;
+    case DOCA_CTX_STATE_STOPPING:
+        /**
+         * The context is in stopping state; this can happen when a fatal error is encountered or when stopping the context.
+         * doca_pe_progress() will cause all tasks to be flushed, and finally transition the state to idle.
+         */
+        DOCA_LOG_INFO("[fd=%d] CC client context entered into stopping state. Waiting for connection termination", cc_client->ctx.fd);
+        break;
+    default:
+        break;
+    }
+}
+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_client_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_client *cc_client;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_client = doca_cc_client_get_client_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_client_as_ctx(cc_client), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)(user_data.ptr);
+    ctx_client->ctx.ctx_fifo.remote_consumer_id = id;
+
+    ctx_client->ctx.ctx_fifo.fifo_connection_state = CC_FIFO_CONNECTED;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_client->ctx.fd, id);
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_client_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}
+
+static doca_error_t
+cc_doca_client_set_params(struct cc_ctx_client *cc_client)
+{
+    struct doca_ctx *ctx;
+    doca_error_t result;
+    union doca_data user_data;
+
+    ctx = doca_cc_client_as_ctx(cc_client->client);
+    result = doca_ctx_set_state_changed_cb(ctx, cc_client_state_changed_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_cc_client_send_task_set_conf(cc_client->client, cc_client_send_task_completion_callback,
+                                               cc_client_send_task_completion_err_callback, CC_SEND_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(result));
+    }
+
+    if (!cc_client->ctx.ctx_fifo.underload_mode) { // ping pong or throughput test
+        result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_recv_callback);
+    } else { // underload test
+        result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_UL_recv_callback);
+    }
+
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(result));
+    }
+
+    /* Set client properties */
+    result = doca_cc_client_set_max_msg_size(cc_client->client, MSG_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_cc_client_set_recv_queue_size(cc_client->client, CC_REC_QUEUE_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set recv queue size property with error = %s", doca_error_get_name(result));
+    }
+
+    if (cc_client->ctx.fast_path) { // Fast path option
+        result = doca_cc_client_event_consumer_register(cc_client->client, cc_client_new_consumer_callback,
+                                                        cc_client_expired_consumer_callback);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result));
+        }
+        struct cc_local_mem_bufs *local_consumer_mem = &(cc_client->ctx.ctx_fifo.consumer_mem);
+        local_consumer_mem->mem = cc_client->ctx.recv_buffer;
+        result = cc_init_local_mem_bufs(local_consumer_mem, &cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        DOCA_LOG_DBG("Init consumer memory succeeded");
+    }
+
+    user_data.ptr = (void *)cc_client;
+    result = doca_ctx_set_user_data(ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_ctx_start(ctx);
+    if (result != DOCA_ERROR_IN_PROGRESS) {
+        DOCA_LOG_ERR("Failed to start client context with error = %s", doca_error_get_name(result));
+    }
+    DOCA_LOG_DBG("[fd=%d] client properties setters succeeded", cc_client->ctx.fd);
+    return result;
+}
+
+#endif // DOCA_CC_HELPER_H
\ No newline at end of file
diff --git a/src/input_handlers.h b/src/input_handlers.h
index 4578ffeb..e4c292da 100644
--- a/src/input_handlers.h
+++ b/src/input_handlers.h
@@ -71,12 +71,84 @@ class RecvFromInputHandler : public MessageParser {
             ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size);
         } else
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+        if (s_user_params.doca_comm_channel) {
+            struct timespec ts = {
+                .tv_sec = 0,
+                .tv_nsec = NANOS_10_X_1000,
+            };
+            struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
+            struct doca_pe *pe_local = s_user_params.pe;
+            doca_error_t result = DOCA_SUCCESS;
+            if (!s_user_params.doca_cc_fifo && s_user_params.mode == MODE_CLIENT
+                && !g_pApp->m_const_params.b_client_ping_pong && !g_pApp->m_const_params.b_stream) { // latency_under_load
+                os_mutex_lock(&ctx->lock);
+                while (!ctx->recv_flag) {
+                    // UL only -> wait for signal, once done copy buffer
+                    os_cond_wait(&ctx->cond, &ctx->lock);
+                }
+            } else { // ping pong or throughput
+                if (s_user_params.doca_cc_fifo) {
+                    if (!ctx->ctx_fifo.task_submitted) { // avoid submitting the same task again
+                        result = doca_buf_reset_data_len(ctx->ctx_fifo.doca_buf_consumer);
+                        if (result != DOCA_SUCCESS) {
+                            log_err("failed resetting doca data len with error = %s", doca_error_get_name(result));
+                        }
+                        result = doca_task_submit(ctx->ctx_fifo.consumer_task_obj);
+                        if (result != DOCA_SUCCESS) {
+                            log_err("failed submitting recv task with error = %s", doca_error_get_name(result));
+                            doca_task_free(ctx->ctx_fifo.consumer_task_obj);
+                            (void)doca_buf_dec_refcount(ctx->ctx_fifo.doca_buf_consumer, NULL);
+                            return RET_SOCKET_SHUTDOWN;
+                        }
+                        // Avoid submitting task when there is already a task in the queue
+                        ctx->ctx_fifo.task_submitted = true;
+                    }
+                    if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong
+                        && !g_pApp->m_const_params.b_stream) { // latency_under_load
+                        pe_local = s_user_params.pe_underload;
+                    }
+                } // end of doca fast path
+                // Waiting for message receive callback - blocking mode
+                if (s_user_params.is_blocked) {
+                    while (!ctx->recv_flag) {
+                        if (doca_pe_progress(pe_local) == 0) {
+                            nanosleep(&ts, &ts);
+                        }
+                    }
+                } else { // non-blocked
+                    doca_pe_progress(pe_local);
+                    if (!ctx->recv_flag) { // Message not received yet
+                        errno = EAGAIN;
+                        ctx->buf_size = -1;
+                    }
+                }
+                if (s_user_params.doca_cc_fifo && ctx->recv_flag) {
+                    // another task can be submitted
+                    ctx->ctx_fifo.task_submitted = false;
+                }
+            }
+
+            m_actual_buf_size = ctx->buf_size;
+            m_actual_buf = ctx->recv_buffer;
+
+            // Done reading - clearing the flag to return to send mode
+            ctx->recv_flag = false;
+
+            if (!g_pApp->m_const_params.b_client_ping_pong &&
+                !g_pApp->m_const_params.b_stream) { // latency_under_load
+                os_mutex_unlock(&ctx->lock);
+            }
+            ret = ctx->buf_size;
+        } else
+#endif /* USING_DOCA_COMM_CHANNEL_API */
         {
             ret = recvfrom(fd, buf, m_recv_data.cur_size, flags, (struct sockaddr *)recvfrom_addr, &size);
-        }
         m_actual_buf = buf;
         m_actual_buf_size = ret;
+        }
 
 #if defined(LOG_TRACE_MSG_IN) && (LOG_TRACE_MSG_IN == TRUE)
         printf("> ");
diff --git a/src/iohandlers.cpp b/src/iohandlers.cpp
index 0903bfe4..74ee2628 100644
--- a/src/iohandlers.cpp
+++ b/src/iohandlers.cpp
@@ -38,6 +38,11 @@ void print_addresses(const fds_data *data, int &list_count)
                 NI_NUMERICHOST | NI_NUMERICSERV);
     switch (data->server_addr.addr.sa_family) {
     case AF_UNIX:
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+        if (s_user_params.doca_comm_channel)
+            printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf);
+        else
+#endif
         printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path,
                PRINT_PROTOCOL(data->sock_type));
         break;
     default:
diff --git a/src/os_abstract.cpp b/src/os_abstract.cpp
index 2b6816e5..8258ac12 100644
--- a/src/os_abstract.cpp
+++ b/src/os_abstract.cpp
@@ -139,6 +139,36 @@ os_thread_t os_getthread(void) {
 #endif
     return mythread;
 }
+// Conditional Functions
+
+void os_cond_init(os_cond_t *cond) {
+#ifdef __windows__
+    //
+#else
+    pthread_cond_init(&cond->cond, NULL);
+#endif
+}
+void os_cond_destroy(os_cond_t *cond) {
+#ifdef __windows__
+    //
+#else
+    pthread_cond_destroy(&cond->cond);
+#endif
+}
+void os_cond_wait(os_cond_t *cond, os_mutex_t *lock) {
+#ifdef __windows__
+    //
+#else
+    pthread_cond_wait(&cond->cond, &lock->mutex);
+#endif
+}
+void os_cond_signal(os_cond_t *cond) {
+#ifdef __windows__
+    //
+#else
+    pthread_cond_signal(&cond->cond);
+#endif
+}
 
 // Mutex functions
diff --git a/src/os_abstract.h b/src/os_abstract.h
index 5121770a..fdadbc25 100644
--- a/src/os_abstract.h
+++ b/src/os_abstract.h
@@ -188,6 +188,14 @@ typedef struct os_mutex_t {
 #endif
 } os_mutex_t;
 
+typedef struct os_cond_t {
+#ifndef __windows__
+    pthread_cond_t cond;
+#else
+    CONDITION_VARIABLE cond;
+#endif
+} os_cond_t;
+
 typedef struct os_cpuset_t {
 #ifdef __windows__
     DWORD_PTR cpuset;
@@ -234,6 +242,13 @@ void os_thread_kill(os_thread_t *thr);
 void os_thread_join(os_thread_t *thr);
 os_thread_t os_getthread(void);
 
+// Cond Functions
+
+void os_cond_init(os_cond_t *cond);
+void os_cond_destroy(os_cond_t *cond);
+void os_cond_wait(os_cond_t *cond, os_mutex_t *lock);
+void os_cond_signal(os_cond_t *cond);
+
 // Mutex functions
 
 void os_mutex_init(os_mutex_t *lock);
diff --git a/src/server.cpp b/src/server.cpp
index d0004d99..e4e10a7e 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -95,8 +95,38 @@ int ServerBase::initBeforeLoop() {
             }
 
diff --git a/src/server.cpp b/src/server.cpp
index d0004d99..e4e10a7e 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -95,8 +95,38 @@ int ServerBase::initBeforeLoop() {
             }
             std::string hostport = sockaddr_to_hostport(p_bind_addr);
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+            if (s_user_params.doca_comm_channel && s_user_params.doca_cc_fifo) {
+                doca_error_t result;
+                struct cc_local_mem_bufs *local_producer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.producer_mem);
+                struct cc_local_mem_bufs *local_consumer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.consumer_mem);
+                // Buf is needed for registering with memrange
+                local_producer_mem->mem = local_consumer_mem->mem;
+                result = cc_init_local_mem_bufs(local_producer_mem, g_fds_array[ifd]->doca_cc_ctx);
+                if (result != DOCA_SUCCESS) {
+                    DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+                    return result;
+                }
+                log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+                // Waiting for the connection before using the fast path
+                while (g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.fifo_connection_state != CC_FIFO_CONNECTED) {
+                    doca_pe_progress(s_user_params.pe);
+                }
+                log_dbg("[fd=%d] New client connected successfully", ifd);
+                result = cc_init_doca_consumer_task(local_consumer_mem, &g_fds_array[ifd]->doca_cc_ctx->ctx_fifo);
+                if (result != DOCA_SUCCESS) {
+                    DOCA_LOG_ERR("Failed to init doca consumer task with error = %s", doca_error_get_name(result));
+                }
+                result = cc_init_doca_producer_task(local_producer_mem, &g_fds_array[ifd]->doca_cc_ctx->ctx_fifo);
+                if (result != DOCA_SUCCESS) {
+                    DOCA_LOG_ERR("Failed to init doca producer task with error = %s", doca_error_get_name(result));
+                }
+
+            } else if (!s_user_params.doca_comm_channel &&
+                       bind(ifd, reinterpret_cast<const struct sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
+#else
             log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
             if (bind(ifd, reinterpret_cast<const struct sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
+#endif //USING_DOCA_COMM_CHANNEL_API
                 log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, hostport.c_str());
                 rc = SOCKPERF_ERR_SOCKET;
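The fifo bring-up above busy-spins on `doca_pe_progress()` until the consumer/producer handshake reaches `CC_FIFO_CONNECTED`. Where connection latency is not critical, the loop can back off on idle polls, mirroring the blocking receive path in `input_handlers.h`; a minimal sketch under the same assumptions:

    // Illustrative only: poll the PE and sleep briefly when nothing was processed.
    struct timespec ts = { .tv_sec = 0, .tv_nsec = NANOS_10_X_1000 };
    while (ctx_fifo->fifo_connection_state != CC_FIFO_CONNECTED) {
        if (doca_pe_progress(s_user_params.pe) == 0) {
            nanosleep(&ts, NULL); // nothing dispatched this iteration; back off
        }
    }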
diff --git a/src/sockperf.cpp b/src/sockperf.cpp
index 0eb41ce6..c93b9100 100644
--- a/src/sockperf.cpp
+++ b/src/sockperf.cpp
@@ -103,6 +103,8 @@
 #ifndef __windows__
 #include 
 #endif
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 
 // forward declarations from Client.cpp & Server.cpp
 extern void client_sig_handler(int signum);
@@ -138,6 +140,7 @@ static int proc_mode_ping_pong(int, int, const char **);
 static int proc_mode_throughput(int, int, const char **);
 static int proc_mode_playback(int, int, const char **);
 static int proc_mode_server(int, int, const char **);
+int bringup_for_doca(std::unique_ptr<fds_data> &tmp);
 
 static const struct app_modes {
     int (*func)(int, int, const char **); /* proc function */
@@ -297,6 +300,16 @@ static const AOPT_DESC common_opt_desc[] = {
     { OPT_TLS, AOPT_OPTARG, aopt_set_literal(0), aopt_set_string("tls"),
       "Use TLSv1.2 (default " TLS_CHIPER_DEFAULT ")." },
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+    { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0),
+      aopt_set_string("doca-comm-channel"), "Use the DOCA communication channel" },
+    { OPT_DOCA_FAST_PATH, AOPT_NOARG, aopt_set_literal(0),
+      aopt_set_string("doca-fast-path"), "Use the DOCA fast data path (requires the doca-comm-channel option)" },
+    { OPT_PCI, AOPT_ARG, aopt_set_literal(0),
+      aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address" },
+    { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0),
+      aopt_set_string("pci-representor"), "Comm Channel DOCA device representor PCI address" },
+#endif /* USING_DOCA_COMM_CHANNEL_API */
     { 'd', AOPT_NOARG, aopt_set_literal('d'), aopt_set_string("debug"),
       "Print extra debug information." },
     { 0, AOPT_NOARG, aopt_set_literal(0), aopt_set_string(NULL), NULL }
@@ -2217,6 +2230,52 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) {
             }
         }
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+        if (!rc && aopt_check(common_obj, OPT_DOCA)) {
+#if defined(DEFINED_TLS)
+            if (aopt_check(common_obj, OPT_TLS)) {
+                log_msg("--doca-comm-channel conflicts with --tls option");
+                rc = SOCKPERF_ERR_BAD_ARGUMENT;
+            }
+#endif /* DEFINED_TLS */
+            if (!aopt_check(common_obj, OPT_PCI)) {
+                log_msg("--doca-comm-channel requires a device PCI address (--pci-address)");
+                rc = SOCKPERF_ERR_BAD_ARGUMENT;
+            } else {
+                const char *optarg = aopt_value(common_obj, OPT_PCI);
+                if (optarg) {
+                    strcpy(s_user_params.cc_dev_pci_addr, optarg);
+                }
+            }
+            if (s_user_params.mode == MODE_SERVER) {
+                if (!aopt_check(common_obj, OPT_PCI_REP)) {
+                    log_msg("a doca-comm-channel server requires a representor PCI address (--pci-representor)");
+                    rc = SOCKPERF_ERR_BAD_ARGUMENT;
+                } else {
+                    const char *optarg = aopt_value(common_obj, OPT_PCI_REP);
+                    if (optarg) {
+                        strcpy(s_user_params.cc_dev_rep_pci_addr, optarg);
+                    }
+                }
+            }
+            s_user_params.doca_comm_channel = true;
+
+            /* Create PE */
+            doca_error_t doca_error;
+            doca_error = doca_pe_create(&(s_user_params.pe));
+            if (doca_error != DOCA_SUCCESS) {
+                log_dbg("Failed creating pe with error %s", doca_error_get_name(doca_error));
+                rc = SOCKPERF_ERR_NO_MEMORY;
+            } else {
+                log_dbg("doca_pe_create succeeded");
+            }
+        }
+        if (!rc && aopt_check(common_obj, OPT_DOCA_FAST_PATH)) {
+            if (!aopt_check(common_obj, OPT_DOCA)) {
+                log_msg("--doca-comm-channel is required for the fast path option");
+                rc = SOCKPERF_ERR_BAD_ARGUMENT;
+            } else {
+                s_user_params.doca_cc_fifo = true;
+            }
+        }
+#endif /* USING_DOCA_COMM_CHANNEL_API */
     }
 
     // resolve address: -i, -p and --tcp options must be processed before
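The progress engine created above at option-parsing time lives until `cleanup()` (next hunk) destroys it, so every early-exit path in between has to release it by hand. A hypothetical RAII guard, sketched only to illustrate the lifetime being managed manually here; it is not part of this patch:

    // Hypothetical scope guard for a doca_pe.
    struct PeGuard {
        struct doca_pe *pe = nullptr;
        PeGuard() {
            if (doca_pe_create(&pe) != DOCA_SUCCESS) pe = nullptr;
        }
        ~PeGuard() {
            if (pe) doca_pe_destroy(pe); // released on every exit path
        }
    };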
@@ -2407,6 +2466,36 @@ void cleanup() {
                 FREE(g_fds_array[ifd]->memberships_addr);
             }
             if (s_user_params.addr.addr.sa_family == AF_UNIX) {
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+                if (s_user_params.doca_comm_channel) {
+                    struct cc_ctx *ctx = g_fds_array[ifd]->doca_cc_ctx;
+                    doca_pe_destroy(s_user_params.pe);
+                    doca_dev_close(ctx->hw_dev);
+                    os_mutex_close(&ctx->lock);
+                    os_cond_destroy(&ctx->cond);
+                    if (s_user_params.doca_cc_fifo) {
+                        doca_cc_consumer_destroy(ctx->ctx_fifo.consumer);
+                        doca_cc_producer_destroy(ctx->ctx_fifo.producer);
+                        doca_mmap_destroy(ctx->ctx_fifo.consumer_mem.mmap);
+                        doca_mmap_destroy(ctx->ctx_fifo.producer_mem.mmap);
+                        doca_buf_inventory_destroy(ctx->ctx_fifo.consumer_mem.buf_inv);
+                        doca_buf_inventory_destroy(ctx->ctx_fifo.producer_mem.buf_inv);
+                        if (s_user_params.mode == MODE_CLIENT && ctx->ctx_fifo.underload_mode) {
+                            doca_pe_destroy(s_user_params.pe_underload);
+                        }
+                    }
+                    if (s_user_params.mode == MODE_SERVER) {
+                        struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)ctx;
+                        doca_cc_server_destroy(ctx_server->server);
+                        doca_dev_rep_close(ctx_server->rep_dev);
+                        FREE(ctx_server);
+                    } else { // MODE_CLIENT
+                        struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
+                        doca_cc_client_destroy(ctx_client->client);
+                        FREE(ctx_client);
+                    }
+                } else
+#endif /* USING_DOCA_COMM_CHANNEL_API */
                 os_unlink_unix_path(s_user_params.client_bind_info.addr_un.sun_path);
 #ifndef __windows__ // AF_UNIX with DGRAM isn't supported in __windows__
                 if (s_user_params.mode == MODE_CLIENT &&
                     s_user_params.sock_type == SOCK_DGRAM) { // unlink binded client
@@ -3283,7 +3372,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name) {
 #endif // USING_EXTRA_API
 
         std::unique_ptr<fds_data> tmp{ new fds_data };
-
+        bool skip_socket = false;
         int res = resolve_sockaddr(addr.c_str(), port.c_str(), sock_type, false,
                                    reinterpret_cast<sockaddr *>(&tmp->server_addr), tmp->server_addr_len);
         if (res != 0) {
@@ -3359,23 +3448,69 @@ static int set_sockets_from_feedfile(const char *feedfile_name) {
                 g_fds_array[curr_fd]->memberships_size++;
             } else {
                 /* create a socket */
-                if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) <
-                    0) { // TODO: use SOCKET all over the way and avoid this cast
-                    log_err("socket(AF_INET4/6, SOCK_x)");
-                    rc = SOCKPERF_ERR_SOCKET;
+                int i = 0;
+
+                for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) {
+                    tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all
+                                                                  // over the way and avoid
+                                                                  // this cast
                 }
-                fd_socket_map[port_desc_tmp] = curr_fd;
-                if (tmp->is_multicast) {
-                    tmp->memberships_addr = reinterpret_cast<struct sockaddr_store_t *>(MALLOC(
-                        IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t)));
+
+                // TODO: In the following malloc we have a one time memory allocation of
+                // 128KB that are not reclaimed
+                // This O(1) leak was introduced in revision 133
+                tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE);
+                if (!tmp->recv.buf) {
+                    log_err("Failed to allocate memory with malloc()");
+                    rc = SOCKPERF_ERR_NO_MEMORY;
                 } else {
-                    tmp->memberships_addr = NULL;
+                    tmp->recv.cur_addr = tmp->recv.buf;
+                    tmp->recv.max_size = MAX_PAYLOAD_SIZE;
+                    tmp->recv.cur_offset = 0;
+                    tmp->recv.cur_size = tmp->recv.max_size;
                 }
-                tmp->memberships_size = 0;
-                s_fd_num++;
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+                if (s_user_params.doca_comm_channel) {
+                    log_dbg("starting feedfile for doca");
+                    int curr_fd = bringup_for_doca(tmp);
+                    s_fd_num++;
+                    skip_socket = true;
+                    fd_socket_map[port_desc_tmp] = curr_fd;
+                    if (new_socket_flag) {
+                        if (s_fd_num == 1) { /* it is the first fd */
+                            s_fd_min = curr_fd;
+                            s_fd_max = curr_fd;
+                        } else {
+                            g_fds_array[last_fd]->next_fd = curr_fd;
+                            s_fd_min = _min(s_fd_min, curr_fd);
+                            s_fd_max = _max(s_fd_max, curr_fd);
+                        }
+                        last_fd = curr_fd;
+                        g_fds_array[curr_fd] = tmp.release();
+                    }
+                }
+#endif //USING_DOCA_COMM_CHANNEL_API
+                if (!skip_socket) {
+                    if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) <
+                        0) { // TODO: use SOCKET all over the way and avoid this cast
+                        log_err("socket(AF_INET4/6, SOCK_x)");
+                        rc = SOCKPERF_ERR_SOCKET;
+                    }
+                    fd_socket_map[port_desc_tmp] = curr_fd;
+                    log_msg("FD is %d", curr_fd);
+                    if (tmp->is_multicast) {
+                        tmp->memberships_addr = reinterpret_cast<struct sockaddr_store_t *>(MALLOC(
+                            IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t)));
+                    } else {
+                        tmp->memberships_addr = NULL;
+                    }
+                    tmp->memberships_size = 0;
+
+                    s_fd_num++;
+                }
             }
-            if (curr_fd >= 0) {
+            if (!skip_socket && curr_fd >= 0) {
                 if ((curr_fd >= max_fds_num) ||
                     (prepare_socket(curr_fd, tmp.get()) ==
                      (int)INVALID_SOCKET)) { // TODO: use SOCKET all over
                                              // the way and avoid this cast
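The recv-buffer setup added above now appears in both this feedfile path and `bringup()` further down. A hypothetical shared helper (not in the patch) would keep the two copies from drifting:

    // Hypothetical deduplication of the recv-buffer init; illustrative only.
    static int init_recv_buf(fds_data *data) {
        data->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE);
        if (!data->recv.buf) {
            log_err("Failed to allocate memory with malloc()");
            return SOCKPERF_ERR_NO_MEMORY;
        }
        data->recv.cur_addr = data->recv.buf;
        data->recv.max_size = MAX_PAYLOAD_SIZE;
        data->recv.cur_offset = 0;
        data->recv.cur_size = data->recv.max_size;
        return SOCKPERF_ERR_NONE;
    }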
@@ -3383,38 +3518,17 @@ static int set_sockets_from_feedfile(const char *feedfile_name) {
                     close(curr_fd);
                     rc = SOCKPERF_ERR_SOCKET;
                 } else {
-                    int i = 0;
-
-                    for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) {
-                        tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all
-                                                                      // over the way and avoid
-                                                                      // this cast
-                    }
-                    // TODO: In the following malloc we have a one time memory allocation of
-                    // 128KB that are not reclaimed
-                    // This O(1) leak was introduced in revision 133
-                    tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE);
-                    if (!tmp->recv.buf) {
-                        log_err("Failed to allocate memory with malloc()");
-                        rc = SOCKPERF_ERR_NO_MEMORY;
-                    } else {
-                        tmp->recv.cur_addr = tmp->recv.buf;
-                        tmp->recv.max_size = MAX_PAYLOAD_SIZE;
-                        tmp->recv.cur_offset = 0;
-                        tmp->recv.cur_size = tmp->recv.max_size;
-
-                        if (new_socket_flag) {
-                            if (s_fd_num == 1) { /*it is the first fd*/
-                                s_fd_min = curr_fd;
-                                s_fd_max = curr_fd;
-                            } else {
-                                g_fds_array[last_fd]->next_fd = curr_fd;
-                                s_fd_min = _min(s_fd_min, curr_fd);
-                                s_fd_max = _max(s_fd_max, curr_fd);
-                            }
-                            last_fd = curr_fd;
-                            g_fds_array[curr_fd] = tmp.release();
+                    if (new_socket_flag) {
+                        if (s_fd_num == 1) { /*it is the first fd*/
+                            s_fd_min = curr_fd;
+                            s_fd_max = curr_fd;
+                        } else {
+                            g_fds_array[last_fd]->next_fd = curr_fd;
+                            s_fd_min = _min(s_fd_min, curr_fd);
+                            s_fd_max = _max(s_fd_max, curr_fd);
                         }
+                        last_fd = curr_fd;
+                        g_fds_array[curr_fd] = tmp.release();
                     }
                 }
             }
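The `s_fd_min`/`s_fd_max`/`next_fd` updates consolidated above maintain a circular list over `g_fds_array`, which `fds_array_is_valid()` (next hunk) checks after bring-up. The invariant, sketched under the patch's data structures:

    // After setup, walking next_fd from s_fd_min must visit exactly
    // s_fd_num entries and wrap back around to s_fd_min.
    int fd = s_fd_min, visited = 0;
    do {
        visited++;
        fd = g_fds_array[fd]->next_fd;
    } while (fd != s_fd_min && visited <= s_fd_num);
    bool ring_ok = (fd == s_fd_min) && (visited == s_fd_num);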
log_dbg("open_doca_device_with_pci succeeded for hw_dev"); + + if (s_user_params.mode == MODE_SERVER) { + ctx_server = (struct cc_ctx_server*)MALLOC(sizeof(struct cc_ctx_server)); + /* Convert the PCI addresses into the matching struct */ + struct priv_doca_pci_bdf dev_rep_pcie = {0}; + doca_error = cc_parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to parse the device representor PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("parse_pci_addr succeeded for rep_dev"); + + /* Open DOCA device representor according to the given PCI address */ + doca_error = cc_open_doca_device_rep_with_pci(cc_ctx.hw_dev, DOCA_DEVINFO_REP_FILTER_NET, + &dev_rep_pcie, &(ctx_server->rep_dev)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device representor based on PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("open_doca_device_rep_with_pci succeeded for rep_dev"); + + doca_error = doca_cc_server_create(cc_ctx.hw_dev, ctx_server->rep_dev, + s_user_params.addr.addr_un.sun_path, &(ctx_server->server)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create server with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("doca_cc_server_create succeeded"); + ctx = doca_cc_server_as_ctx(ctx_server->server); + if (s_user_params.doca_cc_fifo) { + cc_ctx.ctx_fifo.underload_mode = false; + } + + } else { // MODE_CLIENT + ctx_client = (struct cc_ctx_client*)MALLOC(sizeof(struct cc_ctx_client)); + cc_ctx.state = CC_CONNECTION_IN_PROGRESS; + doca_error = doca_cc_client_create(cc_ctx.hw_dev, s_user_params.addr.addr_un.sun_path, + &(ctx_client->client)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create client with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("doca_cc_client_create succeeded"); + ctx = doca_cc_client_as_ctx(ctx_client->client); + if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load + cc_ctx.ctx_fifo.underload_mode = true; + if (s_user_params.doca_cc_fifo) { + // For underload mode we use different PE for consumer, 1 PE per thread + doca_error = doca_pe_create(&(s_user_params.pe_underload)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Fail creating pe for underload mode with error %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + log_dbg("doca_pe_create succeeded for underload mode"); + cc_ctx.ctx_fifo.pe_underload = s_user_params.pe_underload; + } + } else { + cc_ctx.ctx_fifo.underload_mode = false; + } + } + + doca_error = doca_pe_connect_ctx(s_user_params.pe, ctx); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed adding pe context to server with error = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + log_dbg("doca_pe_connect_ctx succeeded"); + +/*******************************************************/ +/*Set Callback*/ + if (s_user_params.mode == MODE_SERVER) { + ctx_server->ctx = cc_ctx; + tmp->doca_cc_ctx = &ctx_server->ctx; + doca_error = cc_doca_server_set_params(ctx_server); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting server params = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + } else { // MODE_CLIENT + ctx_client->ctx = cc_ctx; + tmp->doca_cc_ctx = &ctx_client->ctx; + doca_error = cc_doca_client_set_params(ctx_client); + if (doca_error != DOCA_ERROR_IN_PROGRESS) { + DOCA_LOG_ERR("Failed setting client params = %s", 
+
+/*******************************************************/
+/* Set Callback */
+    if (s_user_params.mode == MODE_SERVER) {
+        ctx_server->ctx = cc_ctx;
+        tmp->doca_cc_ctx = &ctx_server->ctx;
+        doca_error = cc_doca_server_set_params(ctx_server);
+        if (doca_error != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed setting server params = %s", doca_error_get_name(doca_error));
+            goto destroy_cc;
+        }
+    } else { // MODE_CLIENT
+        ctx_client->ctx = cc_ctx;
+        tmp->doca_cc_ctx = &ctx_client->ctx;
+        doca_error = cc_doca_client_set_params(ctx_client);
+        if (doca_error != DOCA_ERROR_IN_PROGRESS) {
+            DOCA_LOG_ERR("Failed setting client params = %s", doca_error_get_name(doca_error));
+            goto destroy_cc;
+        }
+    }
+/********** Event handling fd **********/
+
+    /* DOCA_LOG_INFO("Registering PE event"); */
+    /* doca_event_handle_t is a file descriptor that can be added to an epoll. */
+    /* Currently not implemented */
+    /* Works alongside doca_pe_request_notification */
+    /* doca_pe_get_notification_handle(s_user_params.pe, &event_handle); */
+
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_handle, &ev);
+    log_dbg("epoll fd is %d", epoll_fd);
+
+    // Sock type is not needed for the comm channel flow
+    tmp->sock_type = -1;
+    return epoll_fd;
+
+destroy_cc:
+    if (doca_error != DOCA_SUCCESS) {
+
+        /* Destroy Comm Channel DOCA device */
+        doca_dev_close(cc_ctx.hw_dev);
+        /* Destroy PE */
+        doca_pe_destroy(s_user_params.pe);
+        if (s_user_params.doca_cc_fifo) {
+            if (s_user_params.mode == MODE_CLIENT && cc_ctx.ctx_fifo.underload_mode) {
+                doca_pe_destroy(s_user_params.pe_underload);
+            }
+        }
+        os_mutex_close(&cc_ctx.lock);
+        os_cond_destroy(&cc_ctx.cond);
+        s_user_params.pe = NULL;
+        if (s_user_params.mode == MODE_SERVER) {
+            doca_cc_server_destroy(ctx_server->server);
+            doca_dev_rep_close(ctx_server->rep_dev);
+            FREE(ctx_server);
+        } else {
+            doca_cc_client_destroy(ctx_client->client);
+            FREE(ctx_client);
+        }
+        exit_with_err("Comm channel bringup failed", SOCKPERF_ERR_FATAL);
+    }
+}
+#endif //USING_DOCA_COMM_CHANNEL_API
+
 //------------------------------------------------------------------------------
 int bringup(const int *p_daemonize) {
     int rc = SOCKPERF_ERR_NONE;
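`bringup_for_doca()` returns an epoll descriptor instead of a socket: everything stored in `g_fds_array` must be indexed by a real file descriptor, and the epoll fd also leaves room to wire in `doca_pe_get_notification_handle()` once event-based wakeups are implemented. How the returned fd slots into the single-socket bookkeeping, as the hunk below shows:

    // The DOCA path reuses the fd-indexed bookkeeping unchanged; the
    // epoll fd is the key, not a data socket (sketch of the hunk below).
    int fd = bringup_for_doca(tmp);
    s_fd_min = s_fd_max = fd;      // single-entry ring
    g_fds_array[fd] = tmp.release();
    g_fds_array[fd]->next_fd = fd; // the ring closes on itself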
@@ -3627,37 +3923,46 @@ int bringup(const int *p_daemonize) {
                 rc = SOCKPERF_ERR_NO_MEMORY;
             } else {
                 /* create a socket */
-                if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) <
-                    0) { // TODO: use SOCKET all over the way and avoid this cast
-                    log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)");
-                    rc = SOCKPERF_ERR_SOCKET;
+                int i = 0;
+                s_fd_num = 1;
+
+                for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) {
+                    tmp->active_fd_list[i] = (int)INVALID_SOCKET;
+                }
+                tmp->recv.buf =
+                    (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE);
+                if (!tmp->recv.buf) {
+                    log_err("Failed to allocate memory with malloc()");
+                    rc = SOCKPERF_ERR_NO_MEMORY;
                 } else {
-                    if ((curr_fd >= max_fds_num) ||
-                        (prepare_socket(curr_fd, tmp.get()) ==
-                         (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid
-                                                 // this cast
-                        log_err("Invalid socket");
-                        close(curr_fd);
+                    tmp->recv.cur_addr = tmp->recv.buf;
+                    tmp->recv.max_size = MAX_PAYLOAD_SIZE;
+                    tmp->recv.cur_offset = 0;
+                    tmp->recv.cur_size = tmp->recv.max_size;
+                }
+                bool skip_socket = false;
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+                if (s_user_params.doca_comm_channel) {
+                    int epoll_fd = bringup_for_doca(tmp);
+                    skip_socket = true;
+                    s_fd_min = s_fd_max = epoll_fd;
+                    g_fds_array[s_fd_min] = tmp.release();
+                    g_fds_array[s_fd_min]->next_fd = s_fd_min;
+                }
+#endif /* USING_DOCA_COMM_CHANNEL_API */
+                if (!skip_socket) {
+                    if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < 0) { // TODO: use SOCKET all over the way and avoid this cast
+                        log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)");
                         rc = SOCKPERF_ERR_SOCKET;
                     } else {
-                        int i = 0;
-
-                        s_fd_num = 1;
-
-                        for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) {
-                            tmp->active_fd_list[i] = (int)INVALID_SOCKET;
-                        }
-                        tmp->recv.buf =
-                            (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE);
-                        if (!tmp->recv.buf) {
-                            log_err("Failed to allocate memory with malloc()");
-                            rc = SOCKPERF_ERR_NO_MEMORY;
+                        if ((curr_fd >= max_fds_num) ||
+                            (prepare_socket(curr_fd, tmp.get()) ==
+                             (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid
+                                                     // this cast
+                            log_err("Invalid socket");
+                            close(curr_fd);
+                            rc = SOCKPERF_ERR_SOCKET;
                         } else {
-                            tmp->recv.cur_addr = tmp->recv.buf;
-                            tmp->recv.max_size = MAX_PAYLOAD_SIZE;
-                            tmp->recv.cur_offset = 0;
-                            tmp->recv.cur_size = tmp->recv.max_size;
-
                             s_fd_min = s_fd_max = curr_fd;
                             g_fds_array[s_fd_min] = tmp.release();
                             g_fds_array[s_fd_min]->next_fd = s_fd_min;
@@ -3676,12 +3981,12 @@ int bringup(const int *p_daemonize) {
             }
         }
     }
-
+#ifndef USING_DOCA_COMM_CHANNEL_API
     if (!rc && !fds_array_is_valid()) {
         log_err("Sanity check failed for sockets list");
         rc = SOCKPERF_ERR_FATAL;
     }
-
+#endif /* USING_DOCA_COMM_CHANNEL_API */
     if (!rc && (s_user_params.threads_num > s_fd_num || s_user_params.threads_num == 0)) {
         log_msg("Number of threads should be less than sockets count");
         rc = SOCKPERF_ERR_BAD_ARGUMENT;