diff --git a/configure.ac b/configure.ac index d8a87111..fa407af3 100644 --- a/configure.ac +++ b/configure.ac @@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM TARGETDIR="unknown" case "$host" in - + i?86-*-*) TARGET=X86; TARGETDIR=x86;; ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;; powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;; @@ -26,14 +26,14 @@ case "$host" in aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;; s390*-*-*) TARGET=S390; TARGETDIR=s390;; esac - + AC_SUBST(AM_RUNTESTFLAGS) AC_SUBST(AM_LTLDFLAGS) - + if test $TARGETDIR = unknown; then AC_MSG_ERROR(["it has not been ported to $host."]) fi - + AM_CONDITIONAL(X86, test x$TARGET = xX86) AM_CONDITIONAL(IA64, test x$TARGET = xIA64) AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC) @@ -100,6 +100,34 @@ AC_MSG_CHECKING( [for vma extra api]) AC_MSG_RESULT([${have_vma_api}]) +########################################################################## +# check DOCA communication channel API +# +AC_ARG_ENABLE( + [doca-communication-channel-api], + AC_HELP_STRING([--enable-doca-communication-channel-api], + [SOCKPERF: enable DOCA communication channel extra api support (default=no)]), + [have_doca_comm_channel_api=$enableval], + [have_doca_comm_channel_api=no]) +AS_IF([test "x${have_doca_comm_channel_api}" == "xyes"], + if test "$have_doca_comm_channel_api" = "yes" + then + have_doca_comm_channel_api=/opt/mellanox/doca + fi + + CPPFLAGS="$CPPFLAGS -I$have_doca_comm_channel_api/include -I$have_doca_comm_channel_api/lib -I$have_doca_comm_channel_api/samples" + LIBS="$LIBS -ldoca_comm_channel -ldoca_common" + + [AC_CHECK_HEADERS([$have_doca_comm_channel_api/include/doca_comm_channel.h $have_doca_comm_channel_api/include/doca_dev.h $have_doca_comm_channel_api/samples/common.h], + [AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[[Enable using DOCA communication channel extra API]]) + ], + [AC_MSG_ERROR([doca_comm_channel.h file not found])] + [have_doca_comm_channel_api=no])]) +AC_MSG_CHECKING( + [for doca 
communication channel extra api]) +AC_MSG_RESULT([${have_doca_comm_channel_api}]) + + ########################################################################## # check XLIO extra API # @@ -141,7 +169,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes") ########################## -# Enable tests +# Enable tests # SP_ARG_ENABLE_BOOL( [test], @@ -151,7 +179,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes") ########################## -# Enable tools +# Enable tools # SP_ARG_ENABLE_BOOL( [tool], @@ -233,6 +261,7 @@ AC_MSG_RESULT([ test: ${have_test} tool: ${have_tool} vma_api: ${have_vma_api} + doca_api: ${have_doca_comm_channel_api} xlio_api: ${have_xlio_api} debug: ${have_debug} ]) diff --git a/src/client.cpp b/src/client.cpp index 2f2b7268..767e5dc3 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -885,7 +885,20 @@ int Client::initBeforeLoop() { if (!(data && (data->active_fd_list))) continue; const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + // Waiting for connection + struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)data->doca_cc_ctx; + while (ctx_client->state != CC_CONNECTED) { + doca_pe_progress(s_user_params.pe); + } + log_dbg("[fd=%d] Client connected successfully", ifd); + } + // Avoid Client binding in Com Channel mode + if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) { +#else if (p_client_bind_addr->addr.sa_family != AF_UNSPEC) { +#endif //USING_DOCA_COMM_CHANNEL_API socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len; std::string hostport = sockaddr_to_hostport(p_client_bind_addr); #if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__) diff --git a/src/common.cpp b/src/common.cpp index 52de7d60..390387c3 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr) if 
(addr->sa_family == AF_INET6) { return "[" + std::string(hbuf) + "]:" + std::string(pbuf); } else if (addr->sa_family == AF_UNIX) { - return std::string(addr->sa_data); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + return std::string(pbuf) + " [DOCA]"; +#endif + return std::string(pbuf) + " [UNIX]"; } else { return std::string(hbuf) + ":" + std::string(pbuf); } diff --git a/src/common.h b/src/common.h index 7a4da907..1587ce8b 100644 --- a/src/common.h +++ b/src/common.h @@ -112,6 +112,56 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes); } else #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + doca_error_t doca_error; + struct doca_cc_send_task *task; + struct doca_task *task_obj; + struct timespec ts = { + .tv_sec = 0, + .tv_nsec = NANOS_10_X_1000, + }; + struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx; + do { + if (s_user_params.mode == MODE_SERVER) { + struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx; + doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf, + nbytes, &task); + } else { // MODE_CLIENT + struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx; + doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf, + nbytes, &task); + } + if (doca_error == DOCA_ERROR_NO_MEMORY) { + // Queue is full of tasks, need to free tasks with completion callback + doca_pe_progress(s_user_params.pe); + } + } while (doca_error == DOCA_ERROR_NO_MEMORY); + + if (doca_error != DOCA_SUCCESS) { + log_err("Doca task_alloc_init failed"); + return RET_SOCKET_SHUTDOWN; + } + + task_obj = doca_cc_send_task_as_task(task); + do { + doca_error = doca_task_submit(task_obj); + if (doca_error == DOCA_ERROR_AGAIN) { + // Queue is full of tasks, need to free tasks with completion callback + 
doca_pe_progress(s_user_params.pe); + } + } while (doca_error == DOCA_ERROR_AGAIN); + + if (doca_error != DOCA_SUCCESS) { + log_err("Doca doca_task_submit failed"); + return RET_SOCKET_SHUTDOWN; + } + + // Additional call for better performance- release pressure on send queue + doca_pe_progress(s_user_params.pe); + ret = nbytes; + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ { ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen); } diff --git a/src/defs.h b/src/defs.h index 04ed0e87..fea3ba3c 100644 --- a/src/defs.h +++ b/src/defs.h @@ -157,6 +157,11 @@ typedef unsigned short int sa_family_t; #endif // USING_XLIO_EXTRA_API #endif // !defined(__windows__) && !defined(__FreeBSD__) && !defined(__APPLE__) +#ifdef USING_DOCA_COMM_CHANNEL_API +#include "doca_cc_helper.h" +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif /* USING_DOCA_COMM_CHANNEL_API */ + #define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE) extern int MAX_PAYLOAD_SIZE; extern int max_fds_num; @@ -291,8 +296,14 @@ enum { OPT_LOAD_XLIO, // 47 OPT_TCP_NB_CONN_TIMEOUT_MS, // 48 #if defined(DEFINED_TLS) - OPT_TLS + OPT_TLS, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + OPT_DOCA, + OPT_PCI, + OPT_PCI_REP +#endif /* USING_DOCA_COMM_CHANNEL_API */ + }; static const char *const round_trip_str[] = { "latency", "rtt" }; @@ -571,6 +582,9 @@ struct fds_data { #if defined(DEFINED_TLS) void *tls_handle = nullptr; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + struct cc_ctx *doca_cc_ctx = nullptr; +#endif /* USING_DOCA_COMM_CHANNEL_API */ fds_data() { @@ -804,6 +818,12 @@ struct user_params_t { #if defined(DEFINED_TLS) bool tls = false; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + bool doca_comm_channel = false; /* Flag to indicate using Com Channel*/ + char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */ + char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address 
*/ + struct doca_pe *pe = nullptr; /* Progress engine for doca, one per thread*/ +#endif /* USING_DOCA_COMM_CHANNEL_API */ user_params_t() { memset(&client_bind_info, 0, sizeof(client_bind_info)); diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h new file mode 100644 index 00000000..e13f3256 --- /dev/null +++ b/src/doca_cc_helper.h @@ -0,0 +1,715 @@ +/* + * Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of the Mellanox Technologies Ltd nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. 
+ */ + +#ifndef DOCA_CC_HELPER_H_ +#define DOCA_CC_HELPER_H_ + +#include +#include + +#include "doca_comm_channel.h" +#include +#include +#include +#include +#include +#include +#include "os_abstract.h" + +#define MSG_SIZE 4080 +#define PCI_ADDR_LEN 8 +#define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */ +#define CC_REC_QUEUE_SIZE 10 /* Maximum amount of message in queue */ +#define CC_SEND_TASK_NUM 10 /* Maximum amount of CC send task number */ +#define NANOS_10_X_1000 (10 * 1000) + +#define log_dbg(log_fmt, ...) \ + printf("Doca CC" ": " log_fmt "\n", ##__VA_ARGS__); +#define DOCA_LOG_INFO(format, ...) log_dbg(format, ##__VA_ARGS__) +#define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__) +#define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__) + +enum cc_client_state { + CONNECTION_IN_PROGRESS, + CC_CONNECTED +}; +struct cc_ctx { + struct doca_dev *hw_dev; /**< Doca Device used per PCI address > */ + struct doca_cc_connection *connection; /**< Connection object used for pairing a connection >*/ + uint32_t num_connected_clients; /**< Number of currently connected clients >*/ + uint8_t *recv_buffer; /**< Pointer to recv buffer >*/ + int buf_size; /**< Buffer size of recv buffer >*/ + bool recv_flag; /**< flag indicates when message received >*/ + int fd; /**< File Descriptor >*/ + os_mutex_t lock; /**< For underload mode only>*/ + os_cond_t cond; /**< For underload mode only>*/ +}; + +struct cc_ctx_server { + struct cc_ctx ctx; /**< Base common ctx >*/ + struct doca_dev_rep *rep_dev; /**< Device representor >*/ + struct doca_cc_server *server; /**< Server object >*/ +}; + +struct cc_ctx_client { + struct cc_ctx ctx; /**< Base common ctx >*/ + struct doca_cc_client *client; /**< Client object >*/ + enum cc_client_state state; /**< Holding state of client connection >*/ + bool underload_mode; /**< For using different callback>*/ +}; + +struct priv_doca_pci_bdf { + #define PCI_FUNCTION_MAX_VALUE 8 + #define PCI_DEVICE_MAX_VALUE 32 
+    #define PCI_BUS_MAX_VALUE 256
+    union {
+        uint16_t raw;
+        struct {
+            uint16_t function : 3;
+            uint16_t device : 5;
+            uint16_t bus : 8;
+        };
+    };
+};
+
+/************** General ******************/
+
+/**
+ * Convert one hexadecimal digit to its numeric value
+ *
+ * @c [in]: Character to convert
+ * @return: 0..15 for a valid hexadecimal digit, -1 otherwise
+ */
+static int
+cc_hex_digit_value(char c)
+{
+    if (c >= '0' && c <= '9')
+        return c - '0';
+    if (c >= 'a' && c <= 'f')
+        return c - 'a' + 10;
+    if (c >= 'A' && c <= 'F')
+        return c - 'A' + 10;
+    return -1;
+}
+
+/**
+ * Parse a PCI address written as "BB:DD.F" (hexadecimal bus, device, function)
+ *
+ * @pci_addr [in]: NUL terminated address, exactly 7 characters long
+ * @out_bdf [out]: Receives the parsed bus/device/function on success; may be NULL to only validate
+ * @return: DOCA_SUCCESS on success and DOCA_ERROR_INVALID_VALUE for malformed or out of range input
+ */
+static doca_error_t
+cc_parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf)
+{
+    int bus_hi, bus_lo, dev_hi, dev_lo, func;
+    int bus, dev;
+
+    if (pci_addr == NULL || strlen(pci_addr) != 7 || pci_addr[2] != ':' || pci_addr[5] != '.')
+        return DOCA_ERROR_INVALID_VALUE;
+
+    bus_hi = cc_hex_digit_value(pci_addr[0]);
+    bus_lo = cc_hex_digit_value(pci_addr[1]);
+    dev_hi = cc_hex_digit_value(pci_addr[3]);
+    dev_lo = cc_hex_digit_value(pci_addr[4]);
+    func = cc_hex_digit_value(pci_addr[6]);
+    /* Every position must be a real hex digit; text such as "zz:zz.z" is rejected */
+    if (bus_hi < 0 || bus_lo < 0 || dev_hi < 0 || dev_lo < 0 || func < 0)
+        return DOCA_ERROR_INVALID_VALUE;
+
+    bus = bus_hi * 16 + bus_lo;
+    dev = dev_hi * 16 + dev_lo;
+    if (bus >= PCI_BUS_MAX_VALUE || dev >= PCI_DEVICE_MAX_VALUE || func >= PCI_FUNCTION_MAX_VALUE)
+        return DOCA_ERROR_INVALID_VALUE;
+
+    /* Hand the result back to the caller, the whole point of the out parameter */
+    if (out_bdf != NULL) {
+        out_bdf->raw = 0;
+        out_bdf->bus = (uint16_t)bus;
+        out_bdf->device = (uint16_t)dev;
+        out_bdf->function = (uint16_t)func;
+    }
+
+    return DOCA_SUCCESS;
+}
+
+/**
+ * Check whether a PCI address string names the requested bus/device/function
+ *
+ * Only the last seven characters ("BB:DD.F") are parsed, so a leading "DDDD:" domain prefix is tolerated.
+ * NOTE(review): the exact string format returned by DOCA is not visible here - confirm for the DOCA version in use.
+ *
+ * @pci_str [in]: NUL terminated PCI address string
+ * @wanted [in]: Requested bus/device/function
+ * @return: true when the address matches, false otherwise (including unparsable strings)
+ */
+static bool
+cc_pci_str_matches_bdf(const char *pci_str, const struct priv_doca_pci_bdf *wanted)
+{
+    struct priv_doca_pci_bdf parsed;
+    size_t len = strlen(pci_str);
+
+    if (len < 7 || cc_parse_pci_addr(pci_str + (len - 7), &parsed) != DOCA_SUCCESS)
+        return false;
+
+    return parsed.bus == wanted->bus && parsed.device == wanted->device && parsed.function == wanted->function;
+}
+
+typedef doca_error_t (*jobs_check)(struct doca_devinfo *);
+
+/**
+ * Open the DOCA device located at the requested PCI address
+ *
+ * @value [in]: Requested bus/device/function; NULL accepts a device at any address
+ * @func [in]: Optional capability check, devices for which it fails are skipped; may be NULL
+ * @retval [out]: Opened device, NULL when no device was opened
+ * @return: DOCA_SUCCESS on success, the doca_devinfo_create_list() error when the list cannot be built and
+ *          DOCA_ERROR_NOT_FOUND when no matching device could be opened
+ */
+static doca_error_t
+cc_open_doca_device_with_pci(const struct priv_doca_pci_bdf *value, jobs_check func, struct doca_dev **retval)
+{
+    struct doca_devinfo **dev_list;
+    uint32_t nb_devs;
+    char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {};
+
+    doca_error_t res;
+    size_t i;
+
+    /* Set default return value */
+    *retval = NULL;
+
+    res = doca_devinfo_create_list(&dev_list, &nb_devs);
+    if (res != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to load doca devices list. Doca_error value: %d", res);
+        return res;
+    }
+
+    /* Search */
+    for (i = 0; i < nb_devs; i++) {
+        if (doca_devinfo_get_pci_addr_str(dev_list[i], pci_buf) != DOCA_SUCCESS)
+            continue;
+
+        /* Skip devices that sit at a different PCI address than the one requested */
+        if (value != NULL && !cc_pci_str_matches_bdf(pci_buf, value))
+            continue;
+
+        /* If any special capabilities are needed */
+        if (func != NULL && func(dev_list[i]) != DOCA_SUCCESS)
+            continue;
+
+        /* if device can be opened */
+        if (doca_dev_open(dev_list[i], retval) == DOCA_SUCCESS) {
+            doca_devinfo_destroy_list(dev_list);
+            return DOCA_SUCCESS;
+        }
+    }
+
+    DOCA_LOG_ERR("Matching device not found.");
+    doca_devinfo_destroy_list(dev_list);
+    return DOCA_ERROR_NOT_FOUND;
+}
+
+/**
+ * Open the DOCA device representor located at the requested PCI address
+ *
+ * @local [in]: Local device whose representors are listed
+ * @filter [in]: Representor filter passed to doca_devinfo_rep_create_list()
+ * @pci_bdf [in]: Requested bus/device/function; NULL accepts a representor at any address
+ * @retval [out]: Opened representor, NULL when none was opened
+ * @return: DOCA_SUCCESS on success, DOCA_ERROR_INVALID_VALUE when the list cannot be built and
+ *          DOCA_ERROR_NOT_FOUND when no matching representor could be opened
+ */
+static doca_error_t
+cc_open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_filter filter, struct priv_doca_pci_bdf *pci_bdf,
+                                 struct doca_dev_rep **retval)
+{
+    uint32_t nb_rdevs = 0;
+    struct doca_devinfo_rep **rep_dev_list = NULL;
+    char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {};
+
+    doca_error_t result;
+    size_t i;
+
+    *retval = NULL;
+
+    /* Search */
+    result = doca_devinfo_rep_create_list(local, filter, &rep_dev_list, &nb_rdevs);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR(
+            "Failed to create devinfo representors list. Representor devices are available only on DPU, do not run on Host.");
+        return DOCA_ERROR_INVALID_VALUE;
+    }
+
+    for (i = 0; i < nb_rdevs; i++) {
+        if (doca_devinfo_rep_get_pci_addr_str(rep_dev_list[i], pci_buf) != DOCA_SUCCESS)
+            continue;
+
+        /* Skip representors that sit at a different PCI address than the one requested */
+        if (pci_bdf != NULL && !cc_pci_str_matches_bdf(pci_buf, pci_bdf))
+            continue;
+
+        if (doca_dev_rep_open(rep_dev_list[i], retval) == DOCA_SUCCESS) {
+            doca_devinfo_rep_destroy_list(rep_dev_list);
+            return DOCA_SUCCESS;
+        }
+    }
+
+    DOCA_LOG_ERR("Matching device not found.");
+    doca_devinfo_rep_destroy_list(rep_dev_list);
+    return DOCA_ERROR_NOT_FOUND;
+}
+
+/************** SERVER ******************/
+
+/**
+ * Callback for send task successful completion; releases the task
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context
+ */
+static void
+cc_server_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                        union doca_data user_data)
+{
+    /* These arguments are not in use */
+    (void)user_data;
+    (void)task_user_data;
+
+    // DOCA_LOG_INFO("Task sent successfully");
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+}
+
+/**
+ * Callback for send task completion with error; logs the status, releases the task and stops the server context
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context, read as struct cc_ctx_server *
+ */
+static void
+cc_server_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                            union doca_data user_data)
+{
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)task_user_data;
+
+    result = doca_task_get_status(doca_cc_send_task_as_task(task));
+    DOCA_LOG_ERR("[fd=%d] Message failed to send with error = %s", ctx_server->ctx.fd , doca_error_get_name(result));
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_server_as_ctx(ctx_server->server));
+}
+
+/**
+ * Callback for message recv event
+ *
+ * @event [in]: Recv event object
+ * @recv_buffer
[in]: Message buffer
+ * @msg_len [in]: Message len
+ * @cc_connection [in]: Connection the message was received on
+ */
+static void
+cc_server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len,
+                                struct doca_cc_connection *cc_connection)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    struct cc_ctx_server *ctx_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    if (ctx_server == NULL) {
+        DOCA_LOG_ERR("No server context attached to ctx, dropping message");
+        return;
+    }
+
+    /* Save the connection that the ping was sent over for sending the response */
+    ctx_server->ctx.connection = cc_connection;
+
+    //DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer);
+
+    /* recv_buffer is allocated outside this header; assumed to hold at least MSG_SIZE bytes - TODO confirm.
+     * Never copy more than the maximum message size configured on the server.
+     */
+    if (ctx_server->ctx.recv_buffer == NULL || msg_len > MSG_SIZE) {
+        DOCA_LOG_ERR("[fd=%d] Dropping message of %zu bytes (max %d)", ctx_server->ctx.fd, msg_len, MSG_SIZE);
+        return;
+    }
+
+    memcpy(ctx_server->ctx.recv_buffer, recv_buffer, msg_len);
+    ctx_server->ctx.buf_size = (int)msg_len;
+    ctx_server->ctx.recv_flag = true;
+}
+
+/**
+ * Callback for connection event; counts the new client on success
+ *
+ * @event [in]: Connection event object
+ * @cc_conn [in]: Connection object
+ * @change_success [in]: Whether the connection was successful or not
+ */
+static void
+cc_server_connection_event_callback(struct doca_cc_event_connection_status_changed *event,
+                                    struct doca_cc_connection *cc_conn, bool change_success)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    struct cc_ctx_server *ctx_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_conn);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    if (ctx_server == NULL) {
+        DOCA_LOG_ERR("No server context attached to ctx, ignoring connection event");
+        return;
+    }
+
+    /* Update number of connected clients in case of successful connection */
+    if (!change_success) {
+        DOCA_LOG_ERR("[fd=%d] Failed connection received", ctx_server->ctx.fd);
+        return;
+    }
+
+    ctx_server->ctx.num_connected_clients++;
+    DOCA_LOG_INFO("[fd=%d] New client connected to server", ctx_server->ctx.fd);
+}
+
+/**
+ * Callback for disconnection event; removes the client from the connected count
+ *
+ * @event [in]: Connection event object
+ * @cc_conn [in]: Connection object
+ * @change_success [in]: Whether the disconnection was successful or not
+ */
+static void
+cc_server_disconnection_event_callback(struct doca_cc_event_connection_status_changed *event,
+                                       struct doca_cc_connection *cc_conn, bool change_success)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    struct cc_ctx_server *ctx_server;
+    doca_error_t result;
+
+    /* These arguments are not in use */
+    (void)event;
+    (void)change_success;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_conn);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    if (ctx_server == NULL) {
+        DOCA_LOG_ERR("No server context attached to ctx, ignoring disconnection event");
+        return;
+    }
+
+    /* Update number of connected clients in case of disconnection, Currently disconnection only happens if server
+     * sent a message to a client which already stopped.
+     * The counter is unsigned, so it is only decremented while positive to avoid wrapping around.
+     */
+    if (ctx_server->ctx.num_connected_clients > 0)
+        ctx_server->ctx.num_connected_clients--;
+    DOCA_LOG_INFO("[fd=%d] client was disconnected from server", ctx_server->ctx.fd);
+}
+
+
+
+/**
+ * Callback triggered whenever CC server context state changes
+ *
+ * @user_data [in]: User data associated with the CC server context.
Will hold struct cc_ctx_server *
+ * @ctx [in]: The CC server context that had a state change
+ * @prev_state [in]: Previous context state
+ * @next_state [in]: Next context state (context is already in this state when the callback is called)
+ */
+static void
+cc_server_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state,
+                                 enum doca_ctx_states next_state)
+{
+    (void)ctx;
+    (void)prev_state;
+
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+
+    switch (next_state) {
+    case DOCA_CTX_STATE_IDLE:
+        DOCA_LOG_INFO("[fd=%d] CC server context has been stopped.", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STARTING:
+        /**
+         * The context is in starting state, this is unexpected for CC server.
+         */
+        DOCA_LOG_ERR("[fd=%d] CC server context entered into starting state. Unexpected transition", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_RUNNING:
+        DOCA_LOG_INFO("[fd=%d] CC server context is running. Waiting for clients to connect", ctx_server->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STOPPING:
+        /**
+         * The context is in stopping, this can happen when fatal error encountered or when stopping context.
+         * doca_pe_progress() will cause all tasks to be flushed, and finally transition state to idle
+         */
+        DOCA_LOG_INFO("[fd=%d] CC server context entered into stopping state. Terminating connections with clients", ctx_server->ctx.fd);
+        break;
+    default:
+        break;
+    }
+}
+
+/**
+ * Register the server callbacks, apply the server properties and start the server context
+ *
+ * Configuration stops at the first step that fails, so a context is never started half configured.
+ *
+ * @ctx_server [in]: Server context; ctx_server->server must already hold the CC server object
+ * @return: DOCA_SUCCESS once the context was started, otherwise the error of the step that failed
+ */
+static doca_error_t
+cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
+{
+    struct doca_ctx *ctx;
+    doca_error_t result;
+    union doca_data user_data;
+
+    ctx = doca_cc_server_as_ctx(ctx_server->server);
+    result = doca_ctx_set_state_changed_cb(ctx, cc_server_state_changed_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("doca_ctx_set_state_changed_cb succeeded");
+
+    result = doca_cc_server_send_task_set_conf(ctx_server->server, cc_server_send_task_completion_callback,
+                                               cc_server_send_task_completion_err_callback, CC_SEND_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("doca_cc_server_send_task_set_conf succeeded");
+
+    result = doca_cc_server_event_msg_recv_register(ctx_server->server, cc_server_message_recv_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("doca_cc_server_event_msg_recv_register succeeded");
+
+    result = doca_cc_server_event_connection_register(ctx_server->server, cc_server_connection_event_callback,
+                                                      cc_server_disconnection_event_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding connection event cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("doca_cc_server_event_connection_register succeeded");
+
+    /* Set server properties */
+    result = doca_cc_server_set_max_msg_size(ctx_server->server, MSG_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    result = doca_cc_server_set_recv_queue_size(ctx_server->server, CC_REC_QUEUE_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set recv queue size property with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    /* Every callback above recovers the server context through this pointer */
+    user_data.ptr = (void *)ctx_server;
+    result = doca_ctx_set_user_data(ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    result = doca_ctx_start(ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd);
+    return result;
+
+}
+
+/************** CLIENT ******************/
+
+/**
+ * Callback for send task successful completion; releases the task
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context
+ */
+static void
+cc_client_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                        union doca_data user_data)
+{
+    /* These arguments are not in use */
+    (void)user_data;
+    (void)task_user_data;
+
+    /* Kept disabled like in the server callback: this runs once per sent message */
+    // DOCA_LOG_INFO("Task sent successfully");
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+}
+
+/**
+ * Callback for send task completion with error; logs the status, releases the task and stops the client context
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @user_data [in]: User data for context, read as struct cc_ctx_client *
+ */
+
+static void
+cc_client_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data,
+                                            union doca_data user_data)
+{
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)task_user_data;
+
+    result = doca_task_get_status(doca_cc_send_task_as_task(task));
+    DOCA_LOG_ERR("[fd=%d] Message failed to send with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+
+    doca_task_free(doca_cc_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_client_as_ctx(cc_client->client));
+}
+
+/**
+ * Callback for message recv
event
+ *
+ * @event [in]: Recv event object
+ * @recv_buffer [in]: Message buffer
+ * @msg_len [in]: Message len
+ * @cc_connection [in]: Connection the message was received on
+ */
+static void
+cc_client_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len,
+                                struct doca_cc_connection *cc_connection)
+{
+    union doca_data user_data = doca_cc_connection_get_user_data(cc_connection);
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+
+    /* This argument is not in use */
+    (void)event;
+
+    /* recv_buffer is allocated outside this header; assumed to hold at least MSG_SIZE bytes - TODO confirm.
+     * Never copy more than the maximum message size configured on the client.
+     */
+    if (cc_client == NULL || cc_client->ctx.recv_buffer == NULL || msg_len > MSG_SIZE) {
+        DOCA_LOG_ERR("Dropping message of %zu bytes (max %d)", msg_len, MSG_SIZE);
+        return;
+    }
+
+    /* Kept disabled like in the server callback: this runs once per received message */
+    // DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len);
+    memcpy(cc_client->ctx.recv_buffer, recv_buffer, msg_len);
+    cc_client->ctx.buf_size = (int)msg_len;
+    cc_client->ctx.recv_flag = true;
+}
+
+/**
+ * Callback for message recv event in underload mode
+ *
+ * Publishes the message under ctx.lock and signals ctx.cond once the copy is done.
+ *
+ * @event [in]: Recv event object
+ * @recv_buffer [in]: Message buffer
+ * @msg_len [in]: Message len
+ * @cc_connection [in]: Connection the message was received on
+ */
+static void
+cc_client_message_UL_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len,
+                                   struct doca_cc_connection *cc_connection)
+{
+    union doca_data user_data = doca_cc_connection_get_user_data(cc_connection);
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+
+    /* This argument is not in use */
+    (void)event;
+
+    /* recv_buffer is allocated outside this header; assumed to hold at least MSG_SIZE bytes - TODO confirm */
+    if (cc_client == NULL || cc_client->ctx.recv_buffer == NULL || msg_len > MSG_SIZE) {
+        DOCA_LOG_ERR("Dropping message of %zu bytes (max %d)", msg_len, MSG_SIZE);
+        return;
+    }
+
+    os_mutex_lock(&cc_client->ctx.lock);
+    // In case recv thread is already reading, waiting for completion
+    // Need to make sure last message was read before we override the buffer
+    while (cc_client->ctx.recv_flag) {
+        struct timespec ts = {
+            .tv_sec = 0,
+            .tv_nsec = NANOS_10_X_1000,
+        };
+        // The reader clears recv_flag while holding this same lock, so the lock must be
+        // released while sleeping or the reader could never make progress
+        os_mutex_unlock(&cc_client->ctx.lock);
+        nanosleep(&ts, NULL);
+        os_mutex_lock(&cc_client->ctx.lock);
+    }
+    /* Kept disabled like in the server callback: this runs once per received message */
+    // DOCA_LOG_INFO("[fd=%d] Message received: '%d", cc_client->ctx.fd, (int)msg_len);
+    memcpy(cc_client->ctx.recv_buffer, recv_buffer, msg_len);
+    cc_client->ctx.buf_size = (int)msg_len;
+    cc_client->ctx.recv_flag = true;
+    // Signal to recv thread for copy done- recv thread can continue
+    os_cond_signal(&cc_client->ctx.cond);
+    os_mutex_unlock(&cc_client->ctx.lock);
+}
+
+/**
+ * Finish the client side connection setup
+ *
+ * Fetches the connection object from the client, attaches the client context to it as user data and
+ * marks the client as CC_CONNECTED.
+ *
+ * @cc_client [in]: Client context
+ * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
+ */
+static doca_error_t
+cc_init_client_send_message(struct cc_ctx_client *cc_client)
+{
+    doca_error_t result;
+    union doca_data user_data;
+
+    result = doca_cc_client_get_connection(cc_client->client, &(cc_client->ctx.connection));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("[fd=%d] Failed to get connection from client with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_INFO("[fd=%d] doca_cc_client_get_connection succeeded", cc_client->ctx.fd);
+
+    /* The recv callbacks recover the client context from the connection user data */
+    user_data.ptr = (void *)cc_client;
+    result = doca_cc_connection_set_user_data(cc_client->ctx.connection, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("[fd=%d] Failed to set user_data for connection with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+        return result;
+    }
+
+    cc_client->state = CC_CONNECTED;
+    DOCA_LOG_INFO("[fd=%d] init_client_send_message succeeded", cc_client->ctx.fd);
+    return DOCA_SUCCESS;
+}
+
+/**
+ * Callback triggered whenever CC client context state changes
+ *
+ * @user_data [in]: User data associated with the CC client context.
Will hold struct cc_ctx_client *
+ * @ctx [in]: The CC client context that had a state change
+ * @prev_state [in]: Previous context state
+ * @next_state [in]: Next context state (context is already in this state when the callback is called)
+ */
+static void
+cc_client_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state,
+                                 enum doca_ctx_states next_state)
+{
+    (void)ctx;
+    (void)prev_state;
+    doca_error_t result;
+    struct cc_ctx_client *cc_client = (struct cc_ctx_client *)user_data.ptr;
+
+    switch (next_state) {
+    case DOCA_CTX_STATE_IDLE:
+        DOCA_LOG_INFO("[fd=%d] CC client context has been stopped.", cc_client->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_STARTING:
+        /**
+         * The context is in starting state, need to progress until connection with server is established.
+         */
+        DOCA_LOG_INFO("[fd=%d] CC client context entered into starting state. Waiting for connection establishment", cc_client->ctx.fd);
+        break;
+    case DOCA_CTX_STATE_RUNNING:
+        DOCA_LOG_INFO("[fd=%d] CC client context is running. initialize message", cc_client->ctx.fd);
+        result = cc_init_client_send_message(cc_client);
+        if (result != DOCA_SUCCESS) {
+            /* No task is submitted here; the failure comes from the connection setup */
+            DOCA_LOG_ERR("[fd=%d] Failed to initialize client connection with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+            (void)doca_ctx_stop(doca_cc_client_as_ctx(cc_client->client));
+        }
+        break;
+    case DOCA_CTX_STATE_STOPPING:
+        /**
+         * The context is in stopping, this can happen when fatal error encountered or when stopping context.
+         * doca_pe_progress() will cause all tasks to be flushed, and finally transition state to idle
+         */
+        DOCA_LOG_INFO("[fd=%d] CC client context entered into stopping state. Waiting for connection termination", cc_client->ctx.fd);
+        break;
+    default:
+        break;
+    }
+}
+
+/**
+ * Register the client callbacks, apply the client properties and start the client context
+ *
+ * Configuration stops at the first step that fails, so a context is never started half configured.
+ *
+ * @cc_client [in]: Client context; cc_client->client must already hold the CC client object and
+ *                  cc_client->underload_mode selects which recv callback is registered
+ * @return: The doca_ctx_start() result, where DOCA_ERROR_IN_PROGRESS is the expected outcome,
+ *          or the error of the configuration step that failed
+ */
+static doca_error_t
+cc_doca_client_set_params(struct cc_ctx_client *cc_client)
+{
+    struct doca_ctx *ctx;
+    doca_error_t result;
+    union doca_data user_data;
+
+    ctx = doca_cc_client_as_ctx(cc_client->client);
+    result = doca_ctx_set_state_changed_cb(ctx, cc_client_state_changed_callback);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    result = doca_cc_client_send_task_set_conf(cc_client->client, cc_client_send_task_completion_callback,
+                                               cc_client_send_task_completion_err_callback, CC_SEND_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    if (!cc_client->underload_mode) { // ping pong or throughput test
+        result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_recv_callback);
+    } else { // underload test
+        result = doca_cc_client_event_msg_recv_register(cc_client->client, cc_client_message_UL_recv_callback);
+    }
+
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    /* Set client properties */
+    result = doca_cc_client_set_max_msg_size(cc_client->client, MSG_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    result = doca_cc_client_set_recv_queue_size(cc_client->client, CC_REC_QUEUE_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set recv queue size property with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    /* The state and send callbacks recover the client context through this pointer */
+    user_data.ptr = (void *)cc_client;
+    result = doca_ctx_set_user_data(ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    /* Anything other than DOCA_ERROR_IN_PROGRESS is reported as a failed start */
+    result = doca_ctx_start(ctx);
+    if (result != DOCA_ERROR_IN_PROGRESS) {
+        DOCA_LOG_ERR("Failed to start client context with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    DOCA_LOG_DBG("[fd=%d] client properties setters succeeded", cc_client->ctx.fd);
+    return result;
+}
+
+#endif // DOCA_CC_HELPER_H_
\ No newline at end of file
diff --git a/src/input_handlers.h b/src/input_handlers.h
index 4578ffeb..7fc08612 100644
--- a/src/input_handlers.h
+++ b/src/input_handlers.h
@@ -71,6 +71,47 @@ class RecvFromInputHandler : public MessageParser {
                 ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size);
             } else
 #endif /* DEFINED_TLS */
+#if defined(USING_DOCA_COMM_CHANNEL_API)
+            if (s_user_params.doca_comm_channel) {
+                struct timespec ts = {
+                    .tv_sec = 0,
+                    .tv_nsec = NANOS_10_X_1000,
+                };
+                struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
+                // ctx->lock is taken only for a latency-under-load client, so the very same
+                // predicate has to decide whether it is released again further down
+                const bool under_load = s_user_params.mode == MODE_CLIENT &&
+                                        !g_pApp->m_const_params.b_client_ping_pong &&
+                                        !g_pApp->m_const_params.b_stream;
+
+                if (under_load) {
+                    os_mutex_lock(&ctx->lock);
+                    while (!ctx->recv_flag) {
+                        // UL only-> wait for signal, once done copy buffer
+                        os_cond_wait(&ctx->cond, &ctx->lock);
+                    }
+                } else {
+                    // Waiting for message receive callback
+                    while (!ctx->recv_flag) {
+                        if (doca_pe_progress(s_user_params.pe) == 0) {
+                            nanosleep(&ts, &ts);
+                        }
+                    }
+                }
+
+                m_actual_buf_size = ctx->buf_size;
+                m_actual_buf = ctx->recv_buffer;
+
+                // Done reading setting flag to send mode
+                ctx->recv_flag = false;
+
+                if (under_load) {
+                    os_mutex_unlock(&ctx->lock);
+                }
+
+                return ctx->buf_size;
+            } else
+#endif /* USING_DOCA_COMM_CHANNEL_API */
             {
                 ret = recvfrom(fd, buf, m_recv_data.cur_size, flags,
                                (struct sockaddr *)recvfrom_addr, &size);
diff --git
a/src/iohandlers.cpp b/src/iohandlers.cpp index 0903bfe4..74ee2628 100644 --- a/src/iohandlers.cpp +++ b/src/iohandlers.cpp @@ -38,6 +38,11 @@ void print_addresses(const fds_data *data, int &list_count) NI_NUMERICHOST | NI_NUMERICSERV); switch (data->server_addr.addr.sa_family) { case AF_UNIX: +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf); + else +#endif printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path, PRINT_PROTOCOL(data->sock_type)); break; default: diff --git a/src/os_abstract.cpp b/src/os_abstract.cpp index 2b6816e5..8258ac12 100644 --- a/src/os_abstract.cpp +++ b/src/os_abstract.cpp @@ -139,6 +139,36 @@ os_thread_t os_getthread(void) { #endif return mythread; } +// Conditional Functions + +void os_cond_init(os_cond_t *cond) { +#ifdef __windows__ + // +#else + pthread_cond_init(&cond->cond, NULL); +#endif +} +void os_cond_destroy(os_cond_t *cond) { +#ifdef __windows__ + // +#else + pthread_cond_destroy(&cond->cond); +#endif +} +void os_cond_wait(os_cond_t *cond, os_mutex_t *lock) { +#ifdef __windows__ + // +#else + pthread_cond_wait(&cond->cond, &lock->mutex); +#endif +} +void os_cond_signal(os_cond_t *cond) { +#ifdef __windows__ + // +#else + pthread_cond_signal(&cond->cond); +#endif +} // Mutex functions diff --git a/src/os_abstract.h b/src/os_abstract.h index 5121770a..fdadbc25 100644 --- a/src/os_abstract.h +++ b/src/os_abstract.h @@ -188,6 +188,14 @@ typedef struct os_mutex_t { #endif } os_mutex_t; +typedef struct os_cond_t { +#ifndef __windows__ + pthread_cond_t cond; +#else + CONDITION_VARIABLE cond; +#endif +} os_cond_t; + typedef struct os_cpuset_t { #ifdef __windows__ DWORD_PTR cpuset; @@ -234,6 +242,13 @@ void os_thread_kill(os_thread_t *thr); void os_thread_join(os_thread_t *thr); os_thread_t os_getthread(void); +// Cond Functions + +void os_cond_init(os_cond_t *cond); +void os_cond_destroy(os_cond_t *cond); +void 
os_cond_wait(os_cond_t *cond, os_mutex_t *lock); +void os_cond_signal(os_cond_t *cond); + // Mutex functions void os_mutex_init(os_mutex_t *lock); diff --git a/src/server.cpp b/src/server.cpp index d0004d99..401f19ec 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -95,8 +95,12 @@ int ServerBase::initBeforeLoop() { } std::string hostport = sockaddr_to_hostport(p_bind_addr); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#else log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#endif //USING_DOCA_COMM_CHANNEL_API log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, hostport.c_str()); rc = SOCKPERF_ERR_SOCKET; diff --git a/src/sockperf.cpp b/src/sockperf.cpp index 0eb41ce6..c568d43e 100644 --- a/src/sockperf.cpp +++ b/src/sockperf.cpp @@ -103,6 +103,8 @@ #ifndef __windows__ #include #endif +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" // forward declarations from Client.cpp & Server.cpp extern void client_sig_handler(int signum); @@ -138,6 +140,7 @@ static int proc_mode_ping_pong(int, int, const char **); static int proc_mode_throughput(int, int, const char **); static int proc_mode_playback(int, int, const char **); static int proc_mode_server(int, int, const char **); +int bringup_for_doca(std::unique_ptr &tmp); static const struct app_modes { int (*func)(int, int, const char **); /* proc function */ @@ -297,6 +300,14 @@ static const AOPT_DESC common_opt_desc[] = { { OPT_TLS, AOPT_OPTARG, aopt_set_literal(0), aopt_set_string("tls"), "Use TLSv1.2 (default " TLS_CHIPER_DEFAULT ")." 
}, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0), + aopt_set_string("doca-comm-channel"), "Use Doca communication channel" }, + { OPT_PCI, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"}, + { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-representor"), "Comm Channel DOCA device representor PCI address"}, +#endif /* USING_DOCA_COMM_CHANNEL_API */ { 'd', AOPT_NOARG, aopt_set_literal('d'), aopt_set_string("debug"), "Print extra debug information." }, { 0, AOPT_NOARG, aopt_set_literal(0), aopt_set_string(NULL), NULL } @@ -2217,6 +2228,44 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) { } } #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!rc && aopt_check(common_obj, OPT_DOCA)) { + if (aopt_check(common_obj, 'tls')) { + log_msg("--doca-comm-channel conflicts with --tls option"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } + if (!aopt_check(common_obj, OPT_PCI)) { + log_msg("doca-comm-channel must have pci address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI); + if (optarg) { + strcpy(s_user_params.cc_dev_pci_addr, optarg); + } + } + if (s_user_params.mode == MODE_SERVER) { + if (!aopt_check(common_obj, OPT_PCI_REP)) { + log_msg("doca-comm-channel server must have pci representor address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI_REP); + if (optarg) { + strcpy(s_user_params.cc_dev_rep_pci_addr, optarg); + } + } + } + s_user_params.doca_comm_channel = true; + + /* Create PE */ + doca_error_t doca_error; + doca_error = doca_pe_create(&(s_user_params.pe)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Fail creating pe with error %s", doca_error_get_name(doca_error)); + rc = SOCKPERF_ERR_NO_MEMORY; + } + log_dbg("doca_pe_create succeeded"); + } +#endif /* USING_DOCA_COMM_CHANNEL_API 
*/ } // resolve address: -i, -p and --tcp options must be processed before @@ -2407,6 +2456,25 @@ void cleanup() { FREE(g_fds_array[ifd]->memberships_addr); } if (s_user_params.addr.addr.sa_family == AF_UNIX) { +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + struct cc_ctx *ctx = g_fds_array[ifd]->doca_cc_ctx; + doca_pe_destroy(s_user_params.pe); + doca_dev_close(ctx->hw_dev); + os_mutex_close(&ctx->lock); + os_cond_destroy(&ctx->cond); + if (s_user_params.mode == MODE_SERVER) { + struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx; + doca_cc_server_destroy(ctx_server->server); + doca_dev_rep_close(ctx_server->rep_dev); + FREE(ctx_server); + } else { // MODE_CLIENT + struct cc_ctx_client *ctx_client = (struct cc_ctx_client*)ctx; + doca_cc_client_destroy(ctx_client->client); + FREE(ctx_client); + } + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ os_unlink_unix_path(s_user_params.client_bind_info.addr_un.sun_path); #ifndef __windows__ // AF_UNIX with DGRAM isn't supported in __windows__ if (s_user_params.mode == MODE_CLIENT && s_user_params.sock_type == SOCK_DGRAM) { // unlink binded client @@ -3283,7 +3351,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { #endif // USING_EXTRA_API std::unique_ptr tmp{ new fds_data }; - + bool skip_socket = false; int res = resolve_sockaddr(addr.c_str(), port.c_str(), sock_type, false, reinterpret_cast(&tmp->server_addr), tmp->server_addr_len); if (res != 0) { @@ -3359,23 +3427,69 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { g_fds_array[curr_fd]->memberships_size++; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - log_err("socket(AF_INET4/6, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all + 
// over the way and avoid + // this cast } - fd_socket_map[port_desc_tmp] = curr_fd; - if (tmp->is_multicast) { - tmp->memberships_addr = reinterpret_cast(MALLOC( - IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + + // TODO: In the following malloc we have a one time memory allocation of + // 128KB that are not reclaimed + // This O(1) leak was introduced in revision 133 + tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - tmp->memberships_addr = NULL; + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; + } + +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + log_dbg("starting feedfile for doca"); + int curr_fd = bringup_for_doca(tmp); + s_fd_num++; + skip_socket = true; + fd_socket_map[port_desc_tmp] = curr_fd; + if (new_socket_flag) { + if (s_fd_num == 1) { /*it is the first fd*/ + s_fd_min = curr_fd; + s_fd_max = curr_fd; + } else { + g_fds_array[last_fd]->next_fd = curr_fd; + s_fd_min = _min(s_fd_min, curr_fd); + s_fd_max = _max(s_fd_max, curr_fd); + } + last_fd = curr_fd; + g_fds_array[curr_fd] = tmp.release(); + } } - tmp->memberships_size = 0; +#endif //USING_DOCA_COMM_CHANNEL_API + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < + 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6, SOCK_x)"); + rc = SOCKPERF_ERR_SOCKET; + } + fd_socket_map[port_desc_tmp] = curr_fd; + log_msg("FD is %d", curr_fd); + if (tmp->is_multicast) { + tmp->memberships_addr = reinterpret_cast(MALLOC( + IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + } else { + tmp->memberships_addr = NULL; + } + tmp->memberships_size = 0; - s_fd_num++; + s_fd_num++; + } } - if (curr_fd >= 0) { + if (!skip_socket 
&& curr_fd >= 0) { if ((curr_fd >= max_fds_num) || (prepare_socket(curr_fd, tmp.get()) == (int) INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid this cast @@ -3383,38 +3497,17 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { close(curr_fd); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all - // over the way and avoid - // this cast - } - // TODO: In the following malloc we have a one time memory allocation of - // 128KB that are not reclaimed - // This O(1) leak was introduced in revision 133 - tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); - if (!tmp->recv.buf) { - log_err("Failed to allocate memory with malloc()"); - rc = SOCKPERF_ERR_NO_MEMORY; - } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - - if (new_socket_flag) { - if (s_fd_num == 1) { /*it is the first fd*/ - s_fd_min = curr_fd; - s_fd_max = curr_fd; - } else { - g_fds_array[last_fd]->next_fd = curr_fd; - s_fd_min = _min(s_fd_min, curr_fd); - s_fd_max = _max(s_fd_max, curr_fd); - } - last_fd = curr_fd; - g_fds_array[curr_fd] = tmp.release(); + if (new_socket_flag) { + if (s_fd_num == 1) { /*it is the first fd*/ + s_fd_min = curr_fd; + s_fd_max = curr_fd; + } else { + g_fds_array[last_fd]->next_fd = curr_fd; + s_fd_min = _min(s_fd_min, curr_fd); + s_fd_max = _max(s_fd_max, curr_fd); } + last_fd = curr_fd; + g_fds_array[curr_fd] = tmp.release(); } } } @@ -3517,6 +3610,162 @@ static bool fds_array_is_valid() { return ((fd == s_fd_max) && ((i + 1) == s_fd_num) && (g_fds_array[fd]->next_fd == s_fd_min)); } +#if defined(USING_DOCA_COMM_CHANNEL_API) +int bringup_for_doca(std::unique_ptr &tmp) +{ + log_dbg("creating Doca with name %s", s_user_params.addr.addr_un.sun_path); + struct cc_ctx_server *ctx_server; + struct 
cc_ctx_client *ctx_client; + struct cc_ctx cc_ctx; + + cc_ctx.recv_buffer = tmp->recv.buf; + cc_ctx.num_connected_clients = 0; + cc_ctx.buf_size = 0; + os_mutex_init(&cc_ctx.lock); + os_cond_init(&cc_ctx.cond); + + struct priv_doca_pci_bdf dev_pcie = {0}; + doca_error_t doca_error = DOCA_SUCCESS; + struct doca_ctx *ctx; + + int epoll_fd = epoll_create(max_fds_num); + doca_event_handle_t event_handle = doca_event_invalid_handle; + struct epoll_event ev = { 0, { 0 } }; + ev.events = EPOLLIN | EPOLLPRI; + + /* Saving fd into context */ + cc_ctx.fd = epoll_fd; + /* Convert the PCI addresses into the matching struct */ + doca_error = cc_parse_pci_addr(s_user_params.cc_dev_pci_addr, &dev_pcie); + if (doca_error != DOCA_SUCCESS) { + errno = EPERM; + log_dbg("doca error %s", doca_error_get_descr(doca_error)); + exit_with_err("Failed to parse the device PCI address", SOCKPERF_ERR_FATAL); + } + log_dbg("parse_pci_addr succeeded for hw_dev"); + + /* Open DOCA device according to the given PCI address */ + doca_error = cc_open_doca_device_with_pci(&dev_pcie, NULL, &(cc_ctx.hw_dev)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device based on PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("open_doca_device_with_pci succeeded for hw_dev"); + + if (s_user_params.mode == MODE_SERVER) { + ctx_server = (struct cc_ctx_server*)MALLOC(sizeof(struct cc_ctx_server)); + cc_ctx.recv_flag = true; + /* Convert the PCI addresses into the matching struct */ + struct priv_doca_pci_bdf dev_rep_pcie = {0}; + doca_error = cc_parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to parse the device representor PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("parse_pci_addr succeeded for rep_dev"); + + /* Open DOCA device representor according to the given PCI address */ + doca_error = 
cc_open_doca_device_rep_with_pci(cc_ctx.hw_dev, DOCA_DEVINFO_REP_FILTER_NET, + &dev_rep_pcie, &(ctx_server->rep_dev)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device representor based on PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("open_doca_device_rep_with_pci succeeded for rep_dev"); + + doca_error = doca_cc_server_create(cc_ctx.hw_dev, ctx_server->rep_dev, + s_user_params.addr.addr_un.sun_path, &(ctx_server->server)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create server with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("doca_cc_server_create succeeded"); + ctx = doca_cc_server_as_ctx(ctx_server->server); + + } else { // MODE_CLIENT + ctx_client = (struct cc_ctx_client*)MALLOC(sizeof(struct cc_ctx_client)); + ctx_client->state = CONNECTION_IN_PROGRESS; + cc_ctx.recv_flag = false; + if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load + ctx_client->underload_mode = true; + } else { + ctx_client->underload_mode = false; + } + + doca_error = doca_cc_client_create(cc_ctx.hw_dev, s_user_params.addr.addr_un.sun_path, + &(ctx_client->client)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create client with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("doca_cc_client_create succeeded"); + ctx = doca_cc_client_as_ctx(ctx_client->client); + } + + doca_error = doca_pe_connect_ctx(s_user_params.pe, ctx); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed adding pe context to server with error = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + log_dbg("doca_pe_connect_ctx succeeded"); + +/*******************************************************/ +/*Set Callback*/ + if (s_user_params.mode == MODE_SERVER) { + ctx_server->ctx = cc_ctx; + tmp->doca_cc_ctx = &ctx_server->ctx; + doca_error = cc_doca_server_set_params(ctx_server); + if (doca_error != 
DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting server params = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + } else { // MODE_CLIENT + ctx_client->ctx = cc_ctx; + tmp->doca_cc_ctx = &ctx_client->ctx; + doca_error = cc_doca_client_set_params(ctx_client); + if (doca_error != DOCA_ERROR_IN_PROGRESS) { + DOCA_LOG_ERR("Failed setting client params = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + } +/********** Event handling fd **********/ + + /* DOCA_LOG_INFO("Registering PE event"); */ + /* doca_event_handle_t is a file descriptor that can be added to an epoll. */ + /* Currently not implemented*/ + /* Works alongisde with doca_pe_request_notification */ + /* doca_pe_get_notification_handle(s_user_params.pe, &event_handle);*/ + + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_handle, &ev); + log_dbg("epoll fd is %d", epoll_fd); + + // Sock type is not needed for comm channel flow + tmp->sock_type = -1; + return epoll_fd; + +destroy_cc: + if (doca_error != DOCA_SUCCESS) { + + /* Destroy Comm Channel DOCA device */ + doca_dev_close(cc_ctx.hw_dev); + /* Destroy PE*/ + doca_pe_destroy(s_user_params.pe); + os_mutex_close(&cc_ctx.lock); + os_cond_destroy(&cc_ctx.cond); + s_user_params.pe = NULL; + if (s_user_params.mode == MODE_SERVER) { + doca_cc_server_destroy(ctx_server->server); + doca_dev_rep_close(ctx_server->rep_dev); + FREE(ctx_server); + } else { + doca_cc_client_destroy(ctx_client->client); + FREE(ctx_client); + } + exit_with_err("Com channel bringup failed", SOCKPERF_ERR_FATAL); + } +} +#endif //USING_DOCA_COMM_CHANNEL_API + //------------------------------------------------------------------------------ int bringup(const int *p_daemonize) { int rc = SOCKPERF_ERR_NONE; @@ -3627,37 +3876,46 @@ int bringup(const int *p_daemonize) { rc = SOCKPERF_ERR_NO_MEMORY; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - 
log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + s_fd_num = 1; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; + } + tmp->recv.buf = + (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - if ((curr_fd >= max_fds_num) || - (prepare_socket(curr_fd, tmp.get()) == - (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid - // this cast - log_err("Invalid socket"); - close(curr_fd); + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; + } + bool skip_socket = false; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + int epoll_fd = bringup_for_doca(tmp); + skip_socket = true; + s_fd_min = s_fd_max = epoll_fd; + g_fds_array[s_fd_min] = tmp.release(); + g_fds_array[s_fd_min]->next_fd = s_fd_min; + } +#endif /* USING_DOCA_COMM_CHANNEL_API */ + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - s_fd_num = 1; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; - } - tmp->recv.buf = - (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); - if (!tmp->recv.buf) { - log_err("Failed to allocate memory with malloc()"); - rc = SOCKPERF_ERR_NO_MEMORY; + if ((curr_fd >= max_fds_num) || + (prepare_socket(curr_fd, tmp.get()) == + (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid + // this cast + log_err("Invalid socket"); + close(curr_fd); + rc = SOCKPERF_ERR_SOCKET; } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - 
tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - s_fd_min = s_fd_max = curr_fd; g_fds_array[s_fd_min] = tmp.release(); g_fds_array[s_fd_min]->next_fd = s_fd_min; @@ -3676,12 +3934,12 @@ int bringup(const int *p_daemonize) { } } } - +#ifndef USING_DOCA_COMM_CHANNEL_API if (!rc && !fds_array_is_valid()) { log_err("Sanity check failed for sockets list"); rc = SOCKPERF_ERR_FATAL; } - +#endif /* USING_DOCA_COMM_CHANNEL_API */ if (!rc && (s_user_params.threads_num > s_fd_num || s_user_params.threads_num == 0)) { log_msg("Number of threads should be less than sockets count"); rc = SOCKPERF_ERR_BAD_ARGUMENT;