From 32d8fbe8daa23263ea886eca380ebfaf26cbb4c9 Mon Sep 17 00:00:00 2001 From: Eldar Shalev Date: Sun, 10 Jul 2022 16:45:31 +0300 Subject: [PATCH] Adding Doca Communication Channel API --- configure.ac | 41 ++- src/client.cpp | 6 +- src/common.cpp | 6 +- src/common.h | 60 ++++ src/defs.h | 22 +- src/doca_cc_helper.h | 671 +++++++++++++++++++++++++++++++++++++++++++ src/input_handlers.h | 31 ++ src/iohandlers.cpp | 5 + src/server.cpp | 44 +-- src/sockperf.cpp | 365 +++++++++++++++++++---- 10 files changed, 1159 insertions(+), 92 deletions(-) create mode 100644 src/doca_cc_helper.h diff --git a/configure.ac b/configure.ac index d8a87111..fa407af3 100644 --- a/configure.ac +++ b/configure.ac @@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM TARGETDIR="unknown" case "$host" in - + i?86-*-*) TARGET=X86; TARGETDIR=x86;; ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;; powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;; @@ -26,14 +26,14 @@ case "$host" in aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;; s390*-*-*) TARGET=S390; TARGETDIR=s390;; esac - + AC_SUBST(AM_RUNTESTFLAGS) AC_SUBST(AM_LTLDFLAGS) - + if test $TARGETDIR = unknown; then AC_MSG_ERROR(["it has not been ported to $host."]) fi - + AM_CONDITIONAL(X86, test x$TARGET = xX86) AM_CONDITIONAL(IA64, test x$TARGET = xIA64) AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC) @@ -100,6 +100,34 @@ AC_MSG_CHECKING( [for vma extra api]) AC_MSG_RESULT([${have_vma_api}]) +########################################################################## +# check DOCA communication channel API +# +AC_ARG_ENABLE( + [doca-communication-channel-api], + AC_HELP_STRING([--enable-doca-communication-channel-api], + [SOCKPERF: enable DOCA communication channel extra api support (default=no)]), + [have_doca_comm_channel_api=$enableval], + [have_doca_comm_channel_api=no]) +AS_IF([test "x${have_doca_comm_channel_api}" == "xyes"], + if test "$have_doca_comm_channel_api" = "yes" + then + have_doca_comm_channel_api=/opt/mellanox/doca + fi + + CPPFLAGS="$CPPFLAGS -I$have_doca_comm_channel_api/include -I$have_doca_comm_channel_api/lib -I$have_doca_comm_channel_api/samples" + LIBS="$LIBS -ldoca_comm_channel -ldoca_common" + + [AC_CHECK_HEADERS([$have_doca_comm_channel_api/include/doca_comm_channel.h $have_doca_comm_channel_api/include/doca_dev.h $have_doca_comm_channel_api/samples/common.h], + [AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[[Enable using DOCA communication channel extra API]]) + ], + [AC_MSG_ERROR([doca_comm_channel.h file not found])] + [have_doca_comm_channel_api=no])]) +AC_MSG_CHECKING( + [for doca communication channel extra api]) +AC_MSG_RESULT([${have_doca_comm_channel_api}]) + + ########################################################################## # check XLIO extra API # @@ -141,7 +169,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes") ########################## -# Enable tests +# Enable tests # SP_ARG_ENABLE_BOOL( [test], @@ -151,7 +179,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes") ########################## -# Enable tools +# Enable tools # SP_ARG_ENABLE_BOOL( [tool], @@ -233,6 +261,7 @@ AC_MSG_RESULT([ test: ${have_test} tool: ${have_tool} vma_api: ${have_vma_api} + doca_api: ${have_doca_comm_channel_api} xlio_api: ${have_xlio_api} debug: ${have_debug} ]) diff --git a/src/client.cpp b/src/client.cpp index 2f2b7268..a60c92fc 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -885,7 +885,11 @@ int Client::initBeforeLoop() { if (!(data && (data->active_fd_list))) continue; const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info; - if (p_client_bind_addr->addr.sa_family != AF_UNSPEC) { +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) { +#else + if (p_client_bind_addr->addr.sa_family != AF_UNSPEC) { +#endif //USING_DOCA_COMM_CHANNEL_API socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len; std::string hostport = sockaddr_to_hostport(p_client_bind_addr); #if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__) diff --git a/src/common.cpp b/src/common.cpp index 52de7d60..390387c3 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr) if (addr->sa_family == AF_INET6) { return "[" + std::string(hbuf) + "]:" + std::string(pbuf); } else if (addr->sa_family == AF_UNIX) { - return std::string(addr->sa_data); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + return std::string(pbuf) + " [DOCA]"; +#endif + return std::string(pbuf) + " [UNIX]"; } else { return std::string(hbuf) + ":" + std::string(pbuf); } diff --git a/src/common.h b/src/common.h index 7a4da907..9864cfd5 100644 --- a/src/common.h +++ b/src/common.h @@ -28,6 +28,7 @@ #ifndef COMMON_H_ #define COMMON_H_ +#define POLL_TIMEOUT_MS -1 #include @@ -112,6 +113,65 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes); } else #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[fd]->doca_objects) { + doca_error_t doca_error; + struct doca_cc_send_task *task; + struct doca_task *task_obj; + union doca_data user_data; + struct cc_ctrl_path_objects *sample_objects = g_fds_array[fd]->doca_objects; + struct timespec ts = { + .tv_sec = 0, + .tv_nsec = SLEEP_IN_NANOS, + }; + + if (s_user_params.mode == MODE_SERVER) { + /* This function will only be called after a message was received, so connection should be available */ + if (sample_objects->connection == NULL) { + DOCA_LOG_ERR("Failed to send response: no connection available"); + return DOCA_ERROR_NOT_CONNECTED; + } + doca_error = doca_cc_server_send_task_alloc_init(sample_objects->server, sample_objects->connection, buf, + nbytes, &task); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to allocate task in server with error = %s", doca_error_get_name(doca_error)); + return doca_error; + } + } else { // MODE_CLIENT + // need to call doca_pe_progress here once, to move client to state DOCA_CTX_STATE_RUNNING + if (sample_objects->state != DOCA_CTX_STATE_RUNNING) { + log_dbg("msg_sendto: calling to doca_pe_progress"); + while (doca_pe_progress(sample_objects->pe) == 0) { + nanosleep(&ts, &ts); + } + } + doca_error = doca_cc_client_send_task_alloc_init(sample_objects->client, sample_objects->connection, buf, nbytes, &task); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to allocate task in client with error = %s", doca_error_get_name(doca_error)); + return doca_error; + } + } + + task_obj = doca_cc_send_task_as_task(task); + if (s_user_params.mode == MODE_SERVER) { + user_data.ptr = (void *)sample_objects; + doca_task_set_user_data(task_obj, user_data); + } + + doca_error = doca_task_submit(task_obj); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed submitting send task with error = %s", doca_error_get_name(doca_error)); + doca_task_free(task_obj); + return doca_error; + } + + while (doca_pe_progress(sample_objects->pe) == 0) { + nanosleep(&ts, &ts); + } + + ret = nbytes; + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ { ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen); } diff --git a/src/defs.h b/src/defs.h index 04ed0e87..2ad2604d 100644 --- a/src/defs.h +++ b/src/defs.h @@ -28,7 +28,6 @@ #ifndef DEFS_H_ #define DEFS_H_ - #define __STDC_FORMAT_MACROS #ifdef __windows__ @@ -157,6 +156,11 @@ typedef unsigned short int sa_family_t; #endif // USING_XLIO_EXTRA_API #endif // !defined(__windows__) && !defined(__FreeBSD__) && !defined(__APPLE__) +#ifdef USING_DOCA_COMM_CHANNEL_API +#include "doca_cc_helper.h" +#pragma clang diagnostic ignored "-Wdeprecated-declarations" +#endif /* USING_DOCA_COMM_CHANNEL_API */ + #define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE) extern int MAX_PAYLOAD_SIZE; extern int max_fds_num; @@ -291,8 +295,14 @@ enum { OPT_LOAD_XLIO, // 47 OPT_TCP_NB_CONN_TIMEOUT_MS, // 48 #if defined(DEFINED_TLS) - OPT_TLS + OPT_TLS, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + OPT_DOCA, + OPT_PCI, + OPT_PCI_REP +#endif /* USING_DOCA_COMM_CHANNEL_API */ + }; static const char *const round_trip_str[] = { "latency", "rtt" }; @@ -571,6 +581,9 @@ struct fds_data { #if defined(DEFINED_TLS) void *tls_handle = nullptr; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + struct cc_ctrl_path_objects *doca_objects = nullptr; +#endif /* USING_DOCA_COMM_CHANNEL_API */ fds_data() { @@ -804,6 +817,11 @@ struct user_params_t { #if defined(DEFINED_TLS) bool tls = false; #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + bool doca_comm_channel = false; + char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */ + char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */ +#endif /* USING_DOCA_COMM_CHANNEL_API */ user_params_t() { memset(&client_bind_info, 0, sizeof(client_bind_info)); diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h new file mode 100644 index 00000000..f9e1a222 --- /dev/null +++ b/src/doca_cc_helper.h @@ -0,0 +1,671 @@ +/* + * Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of the Mellanox Technologies Ltd nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#ifndef DOCA_CC_HELPER_H_ +#define DOCA_CC_HELPER_H_ + +#include +#include + +#include "doca_comm_channel.h" +#include +#include +#include +#include +#include +#include + +#define MSG_SIZE 4080 +#define PCI_ADDR_LEN 8 +#define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of message in queue */ +#define CC_REC_QUEUE_SIZE 10 /* Maximum amount of message in queue */ +#define CC_SEND_TASK_NUM 10 /* Maximum amount of CC send task number */ +#define SLEEP_IN_NANOS (10 * 1000) + +#define log_dbg(log_fmt, ...) \ + printf("Doca CC" ": " log_fmt "\n", ##__VA_ARGS__); +#define DOCA_LOG_INFO(format, ...) log_dbg(format, ##__VA_ARGS__) +#define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__) +#define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__) + +struct cc_ctrl_path_objects { + struct doca_dev *hw_dev; /**< Device used in the sample */ + struct doca_dev_rep *rep_dev; /**< Device representor used in the sample */ + struct doca_pe *pe; /**< PE object used in the sample */ + struct doca_cc_server *server; /**< Server object used in the sample */ + struct doca_cc_client *client; /**< Client object used in the sample */ + struct doca_cc_connection *connection; /**< Connection object used in the sample */ + uint32_t num_connected_clients; /**< Number of currently connected clients */ + doca_error_t result; /**< Holds result will be updated in callbacks */ + uint8_t *recv_buffer; /**< Pointer to recv buffer*/ + int buf_size = 0; /**< Buffer size of recv buffer>*/ + enum doca_ctx_states state = DOCA_CTX_STATE_IDLE; /**< Holding state of connection>*/ + bool recv_flg = false; /** Client side only, flag for receiving*/ +}; +struct priv_doca_pci_bdf { + #define PCI_FUNCTION_MAX_VALUE 8 + #define PCI_DEVICE_MAX_VALUE 32 + #define PCI_BUS_MAX_VALUE 256 + union { + uint16_t raw; + struct { + uint16_t function : 3; + uint16_t device : 5; + uint16_t bus : 8; + }; + }; +}; + +/************** General ******************/ + +static doca_error_t +parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf) +{ + unsigned int bus_bitmask = 0xFFFFFF00; + unsigned int dev_bitmask = 0xFFFFFFE0; + unsigned int func_bitmask = 0xFFFFFFF8; + uint32_t tmpu; + char tmps[4]; + + if (pci_addr == NULL || strlen(pci_addr) != 7 || pci_addr[2] != ':' || pci_addr[5] != '.') + return DOCA_ERROR_INVALID_VALUE; + + tmps[0] = pci_addr[0]; + tmps[1] = pci_addr[1]; + tmps[2] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & bus_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + + tmps[0] = pci_addr[3]; + tmps[1] = pci_addr[4]; + tmps[2] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & dev_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + + tmps[0] = pci_addr[6]; + tmps[1] = '\0'; + tmpu = strtoul(tmps, NULL, 16); + if ((tmpu & func_bitmask) != 0) + return DOCA_ERROR_INVALID_VALUE; + + return DOCA_SUCCESS; +} + +typedef doca_error_t (*jobs_check)(struct doca_devinfo *); + +static doca_error_t +open_doca_device_with_pci(const struct priv_doca_pci_bdf *value, jobs_check func, struct doca_dev **retval) +{ + struct doca_devinfo **dev_list; + uint32_t nb_devs; + char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {}; + + doca_error_t res; + size_t i; + + /* Set default return value */ + *retval = NULL; + + res = doca_devinfo_create_list(&dev_list, &nb_devs); + if (res != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to load doca devices list. Doca_error value: %d", res); + return res; + } + + /* Search */ + for (i = 0; i < nb_devs; i++) { + res = doca_devinfo_get_pci_addr_str(dev_list[i], pci_buf); + if (res == DOCA_SUCCESS) { + /* If any special capabilities are needed */ + if (func != NULL && func(dev_list[i]) != DOCA_SUCCESS) + continue; + + /* if device can be opened */ + res = doca_dev_open(dev_list[i], retval); + if (res == DOCA_SUCCESS) { + doca_devinfo_destroy_list(dev_list); + return res; + } + } + } + + DOCA_LOG_ERR("Matching device not found."); + res = DOCA_ERROR_NOT_FOUND; + + doca_devinfo_destroy_list(dev_list); + return res; +} + +static doca_error_t +open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_filter filter, struct priv_doca_pci_bdf *pci_bdf, + struct doca_dev_rep **retval) +{ + uint32_t nb_rdevs = 0; + struct doca_devinfo_rep **rep_dev_list = NULL; + char pci_buf[DOCA_DEVINFO_REP_PCI_ADDR_SIZE] = {}; + + doca_error_t result; + size_t i; + + *retval = NULL; + + /* Search */ + result = doca_devinfo_rep_create_list(local, filter, &rep_dev_list, &nb_rdevs); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR( + "Failed to create devinfo representors list. Representor devices are available only on DPU, do not run on Host."); + return DOCA_ERROR_INVALID_VALUE; + } + + for (i = 0; i < nb_rdevs; i++) { + result = doca_devinfo_rep_get_pci_addr_str(rep_dev_list[i], pci_buf); + if (result == DOCA_SUCCESS && + doca_dev_rep_open(rep_dev_list[i], retval) == DOCA_SUCCESS) { + doca_devinfo_rep_destroy_list(rep_dev_list); + return DOCA_SUCCESS; + } + } + + DOCA_LOG_ERR("Matching device not found."); + doca_devinfo_rep_destroy_list(rep_dev_list); + return DOCA_ERROR_NOT_FOUND; +} + +/************** SERVER ******************/ + +/** + * Callback for send task successfull completion + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +server_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data ctx_user_data) +{ + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)ctx_user_data.ptr; + + /* This argument is not in use */ + (void)task_user_data; + + sample_objects->result = DOCA_SUCCESS; + // DOCA_LOG_INFO("Task sent successfully"); + + doca_task_free(doca_cc_send_task_as_task(task)); +} + +/** + * Callback for send task completion with error + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +server_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data ctx_user_data) +{ + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)ctx_user_data.ptr; + + /* This argument is not in use */ + (void)task_user_data; + + sample_objects->result = doca_task_get_status(doca_cc_send_task_as_task(task)); + DOCA_LOG_ERR("Message failed to send with error = %s", doca_error_get_name(sample_objects->result)); + + doca_task_free(doca_cc_send_task_as_task(task)); + (void)doca_ctx_stop(doca_cc_server_as_ctx(sample_objects->server)); +} + +/** + * Callback for message recv event + * + * @event [in]: Recv event object + * @recv_buffer [in]: Message buffer + * @msg_len [in]: Message len + * @cc_connection [in]: Connection the message was received on + */ +static void +server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len, + struct doca_cc_connection *cc_connection) +{ + union doca_data user_data; + struct doca_cc_server *cc_server; + struct cc_ctrl_path_objects *sample_objects; + doca_error_t result; + + /* This argument is not in use */ + (void)event; + + cc_server = doca_cc_server_get_server_ctx(cc_connection); + + result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result)); + return; + } + + /* Save the connection that the ping was sent over for sending the response */ + sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + sample_objects->connection = cc_connection; + + //DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer); + if (sample_objects->result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to submit send task with error = %s", doca_error_get_name(sample_objects->result)); + (void)doca_ctx_stop(doca_cc_server_as_ctx(cc_server)); + } + memcpy(sample_objects->recv_buffer, recv_buffer,msg_len); + sample_objects->buf_size = (int)msg_len; +} + +/** + * Callback for connection event + * + * @event [in]: Connection event object + * @cc_conn [in]: Connection object + * @change_success [in]: Whether the connection was successful or not + */ +static void +server_connection_event_callback(struct doca_cc_event_connection_status_changed *event, + struct doca_cc_connection *cc_conn, bool change_success) +{ + union doca_data user_data; + struct doca_cc_server *cc_server; + struct cc_ctrl_path_objects *sample_objects; + doca_error_t result; + + /* This argument is not in use */ + (void)event; + + cc_server = doca_cc_server_get_server_ctx(cc_conn); + + result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result)); + return; + } + + /* Update number of connected clients in case of successful connection */ + sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + if (!change_success) { + DOCA_LOG_ERR("Failed connection received"); + return; + } + + sample_objects->num_connected_clients++; + DOCA_LOG_INFO("New client connected to server"); +} + +/** + * Callback for disconnection event + * + * @event [in]: Connection event object + * @cc_conn [in]: Connection object + * @change_success [in]: Whether the disconnection was successful or not + */ +static void +server_disconnection_event_callback(struct doca_cc_event_connection_status_changed *event, + struct doca_cc_connection *cc_conn, bool change_success) +{ + union doca_data user_data; + struct doca_cc_server *cc_server; + struct cc_ctrl_path_objects *sample_objects; + doca_error_t result; + + /* These arguments are not in use */ + (void)event; + (void)change_success; + + cc_server = doca_cc_server_get_server_ctx(cc_conn); + + result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data); + if (result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result)); + return; + } + + /* Update number of connected clients in case of disconnection, Currently disconnection only happens if server + * sent a message to a client which already stopped. + */ + sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + sample_objects->num_connected_clients--; + DOCA_LOG_INFO("A client was disconnected from server"); +} + + + +/** + * Callback triggered whenever CC server context state changes + * + * @user_data [in]: User data associated with the CC server context. Will hold struct cc_ctrl_path_objects * + * @ctx [in]: The CC server context that had a state change + * @prev_state [in]: Previous context state + * @next_state [in]: Next context state (context is already in this state when the callback is called) + */ +static void +cc_server_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state, + enum doca_ctx_states next_state) +{ + (void)ctx; + (void)prev_state; + + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + + switch (next_state) { + case DOCA_CTX_STATE_IDLE: + DOCA_LOG_INFO("CC server context has been stopped."); + break; + case DOCA_CTX_STATE_STARTING: + /** + * The context is in starting state, this is unexpected for CC server. + */ + DOCA_LOG_ERR("CC server context entered into starting state. Unexpected transition"); + break; + case DOCA_CTX_STATE_RUNNING: + DOCA_LOG_INFO("CC server context is running. Waiting for clients to connect"); + break; + case DOCA_CTX_STATE_STOPPING: + /** + * The context is in stopping, this can happen when fatal error encountered or when stopping context. + * doca_pe_progress() will cause all tasks to be flushed, and finally transition state to idle + */ + DOCA_LOG_INFO("CC server context entered into stopping state. Terminating connections with clients"); + break; + default: + break; + } +} + +static doca_error_t doca_server_set_params(struct cc_ctrl_path_objects *sample_object) +{ + struct doca_ctx *ctx; + doca_error_t doca_error; + union doca_data user_data; + + ctx = doca_cc_server_as_ctx(sample_object->server); + doca_error = doca_ctx_set_state_changed_cb(ctx, cc_server_state_changed_callback); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("doca_ctx_set_state_changed_cb succeeded"); + + doca_error = doca_cc_server_send_task_set_conf(sample_object->server, server_send_task_completion_callback, + server_send_task_completion_err_callback, CC_SEND_TASK_NUM); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("doca_cc_server_send_task_set_conf succeeded"); + + doca_error = doca_cc_server_event_msg_recv_register(sample_object->server, server_message_recv_callback); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("doca_cc_server_event_msg_recv_register succeeded"); + + doca_error = doca_cc_server_event_connection_register(sample_object->server, server_connection_event_callback, + server_disconnection_event_callback); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding connection event cbs with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("doca_cc_server_event_connection_register succeeded"); + + /* Set server properties */ + doca_error = doca_cc_server_set_max_msg_size(sample_object->server, MSG_SIZE); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(doca_error)); + } + + doca_error = doca_cc_server_set_recv_queue_size(sample_object->server, CC_REC_QUEUE_SIZE); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(doca_error)); + } + + user_data.ptr = sample_object; + doca_error = doca_ctx_set_user_data(ctx, user_data); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(doca_error)); + } + + doca_error = doca_ctx_start(ctx); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("server properties setters succeeded, ctx started"); + + return doca_error; + +} + +/************** CLIENT ******************/ + +/** + * Callback for send task successfull completion + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ +static void +client_send_task_completion_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data ctx_user_data) +{ + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)ctx_user_data.ptr; + + /* This argument is not in use */ + (void)task_user_data; + + sample_objects->result = DOCA_SUCCESS; + DOCA_LOG_INFO("Task sent successfully"); + + doca_task_free(doca_cc_send_task_as_task(task)); +} + +/** + * Callback for send task completion with error + * + * @task [in]: Send task object + * @task_user_data [in]: User data for task + * @ctx_user_data [in]: User data for context + */ + +static void +client_send_task_completion_err_callback(struct doca_cc_send_task *task, union doca_data task_user_data, + union doca_data ctx_user_data) +{ + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)ctx_user_data.ptr; + + /* This argument is not in use */ + (void)task_user_data; + + sample_objects->result = doca_task_get_status(doca_cc_send_task_as_task(task)); + DOCA_LOG_ERR("Message failed to send with error = %s", doca_error_get_name(sample_objects->result)); + + doca_task_free(doca_cc_send_task_as_task(task)); + (void)doca_ctx_stop(doca_cc_client_as_ctx(sample_objects->client)); +} + +/** + * Callback for message recv event + * + * @event [in]: Recv event object + * @recv_buffer [in]: Message buffer + * @msg_len [in]: Message len + * @cc_connection [in]: Connection the message was received on + */ +static void +client_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *recv_buffer, size_t msg_len, + struct doca_cc_connection *cc_connection) +{ + union doca_data user_data = doca_cc_connection_get_user_data(cc_connection); + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + + /* This argument is not in use */ + (void)event; + + DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer); + memcpy(sample_objects->recv_buffer, recv_buffer, msg_len); + sample_objects->buf_size = (int)msg_len; + sample_objects->recv_flg = true; +} + +/** + * Init message on client + * + * @sample_objects [in]: Sample objects struct + * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise + */ +static doca_error_t +init_client_send_message(struct cc_ctrl_path_objects *sample_objects) +{ + doca_error_t doca_error; + union doca_data user_data; + + doca_error = doca_cc_client_get_connection(sample_objects->client, &(sample_objects->connection)); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to get connection from client with error = %s", doca_error_get_name(doca_error)); + return doca_error; + } + DOCA_LOG_INFO("doca_cc_client_get_connection succeeded"); + + user_data.ptr = (void *)sample_objects; + doca_error = doca_cc_connection_set_user_data(sample_objects->connection, user_data); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set user_data for connection with error = %s", doca_error_get_name(doca_error)); + return doca_error; + } + + sample_objects->state = DOCA_CTX_STATE_RUNNING; + DOCA_LOG_INFO("init_client_send_message succeeded"); + return DOCA_SUCCESS; + +} + +/** + * Callback triggered whenever CC client context state changes + * + * @user_data [in]: User data associated with the CC client context. Will hold struct cc_ctrl_path_objects * + * @ctx [in]: The CC client context that had a state change + * @prev_state [in]: Previous context state + * @next_state [in]: Next context state (context is already in this state when the callback is called) + */ +static void +cc_client_state_changed_callback(const union doca_data user_data, struct doca_ctx *ctx, enum doca_ctx_states prev_state, + enum doca_ctx_states next_state) +{ + (void)ctx; + (void)prev_state; + + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects *)user_data.ptr; + + switch (next_state) { + case DOCA_CTX_STATE_IDLE: + DOCA_LOG_INFO("CC client context has been stopped."); + break; + case DOCA_CTX_STATE_STARTING: + /** + * The context is in starting state, need to progress until connection with server is established. + */ + DOCA_LOG_INFO("CC client context entered into starting state. Waiting for connection establishment"); + break; + case DOCA_CTX_STATE_RUNNING: + DOCA_LOG_INFO("CC client context is running. Sending message"); + sample_objects->result = init_client_send_message(sample_objects); + if (sample_objects->result != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to submit send task with error = %s", + doca_error_get_name(sample_objects->result)); + (void)doca_ctx_stop(doca_cc_client_as_ctx(sample_objects->client)); + } + break; + case DOCA_CTX_STATE_STOPPING: + /** + * The context is in stopping, this can happen when fatal error encountered or when stopping context. + * doca_pe_progress() will cause all tasks to be flushed, and finally transition state to idle + */ + DOCA_LOG_INFO("CC client context entered into stopping state. Waiting for connection termination"); + break; + default: + break; + } +} + +static doca_error_t doca_client_set_params(struct cc_ctrl_path_objects *sample_object) +{ + struct doca_ctx *ctx; + doca_error_t doca_error; + union doca_data user_data; + + ctx = doca_cc_client_as_ctx(sample_object->client); + doca_error = doca_ctx_set_state_changed_cb(ctx, cc_client_state_changed_callback); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting state change callback with error = %s", doca_error_get_name(doca_error)); + } + + doca_error = doca_cc_client_send_task_set_conf(sample_object->client, client_send_task_completion_callback, + client_send_task_completion_err_callback, CC_SEND_TASK_NUM); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting send task cbs with error = %s", doca_error_get_name(doca_error)); + } + + doca_error = doca_cc_client_event_msg_recv_register(sample_object->client, client_message_recv_callback); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed adding message recv event cb with error = %s", doca_error_get_name(doca_error)); + } + + /* Set client properties */ + doca_error = doca_cc_client_set_max_msg_size(sample_object->client, MSG_SIZE); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(doca_error)); + } + + doca_error = doca_cc_client_set_recv_queue_size(sample_object->client, CC_REC_QUEUE_SIZE); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(doca_error)); + } + + user_data.ptr = sample_object; + doca_error = doca_ctx_set_user_data(ctx, user_data); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(doca_error)); + } + + /* Client is not started until connection is finished, so getting connection in progress */ + doca_error = doca_ctx_start(ctx); + if (doca_error != DOCA_ERROR_IN_PROGRESS) { + DOCA_LOG_ERR("Failed to start client context with error = %s", doca_error_get_name(doca_error)); + } + DOCA_LOG_DBG("client properties setters succeeded, ctx started"); + + return DOCA_SUCCESS; +} + + + +#endif // DOCA_CC_HELPER_H \ No newline at end of file diff --git a/src/input_handlers.h b/src/input_handlers.h index 4578ffeb..9659bca1 100644 --- a/src/input_handlers.h +++ b/src/input_handlers.h @@ -28,6 +28,7 @@ #ifndef INPUT_HANDLERS_H_ #define INPUT_HANDLERS_H_ +#define POLL_TIMEOUT -1 #include "message_parser.h" @@ -71,6 +72,36 @@ class RecvFromInputHandler : public MessageParser { ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size); } else #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[fd]->doca_objects) { + struct timespec ts = { + .tv_sec = 0, + .tv_nsec = SLEEP_IN_NANOS, + }; + struct cc_ctrl_path_objects *sample_objects = g_fds_array[fd]->doca_objects; + doca_error_t doca_error; + + // Mandatory for throughput test, Client side only + if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong) { + // As long as we don't have message to read + while (!sample_objects->recv_flg) { + nanosleep(&ts, &ts); + } + } + + // Waiting for meesage receive callback + while (doca_pe_progress(sample_objects->pe) == 0) { + nanosleep(&ts, &ts); + } + + m_actual_buf_size = sample_objects->buf_size; + m_actual_buf = sample_objects->recv_buffer; + + // Rellevant for Client side only, Done reading setting flag to send mode + sample_objects->recv_flg = false; + return sample_objects->buf_size; + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ { ret = recvfrom(fd, buf, m_recv_data.cur_size, flags, (struct sockaddr *)recvfrom_addr, &size); diff --git a/src/iohandlers.cpp b/src/iohandlers.cpp index 0903bfe4..74ee2628 100644 --- a/src/iohandlers.cpp +++ b/src/iohandlers.cpp @@ -38,6 +38,11 @@ void print_addresses(const fds_data *data, int &list_count) NI_NUMERICHOST | NI_NUMERICSERV); switch (data->server_addr.addr.sa_family) { case AF_UNIX: +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) + printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf); + else +#endif printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path, PRINT_PROTOCOL(data->sock_type)); break; default: diff --git a/src/server.cpp b/src/server.cpp index d0004d99..8d18e0e3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -94,26 +94,30 @@ int ServerBase::initBeforeLoop() { p_bind_addr = &bind_addr; } - std::string hostport = sockaddr_to_hostport(p_bind_addr); - log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); - if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { - log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, - hostport.c_str()); - rc = SOCKPERF_ERR_SOCKET; - break; - } - /* - * since when using VMA there is no qp until the bind, and vma cannot - * check that rate-limit is supported this is done here and not - * with the rest of the setsockopt - */ - if (s_user_params.rate_limit > 0 && - sock_set_rate_limit(ifd, s_user_params.rate_limit)) { - log_err("[fd=%d] failed setting rate limit, %s\n", ifd, - hostport.c_str()); - rc = SOCKPERF_ERR_SOCKET; - break; - } + std::string hostport = sockaddr_to_hostport(p_bind_addr); +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#else + log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str()); + if (bind(ifd, reinterpret_cast(p_bind_addr), bind_addr_len) < 0) { +#endif //USING_DOCA_COMM_CHANNEL_API + log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd, + hostport.c_str()); + rc = SOCKPERF_ERR_SOCKET; + break; + } + /* + * since when using VMA there is no qp until the bind, and vma cannot + * check that rate-limit is supported this is done here and not + * with the rest of the setsockopt + */ + if (s_user_params.rate_limit > 0 && + sock_set_rate_limit(ifd, s_user_params.rate_limit)) { + log_err("[fd=%d] failed setting rate limit, %s\n", ifd, + hostport.c_str()); + rc = SOCKPERF_ERR_SOCKET; + break; + } if ((g_fds_array[ifd]->sock_type == SOCK_STREAM) && (listen(ifd, 10) < 0)) { log_err("Failed listen() for connection\n"); diff --git a/src/sockperf.cpp b/src/sockperf.cpp index 0eb41ce6..7b5b1276 100644 --- a/src/sockperf.cpp +++ b/src/sockperf.cpp @@ -103,6 +103,8 @@ #ifndef __windows__ #include #endif +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" // forward declarations from Client.cpp & Server.cpp extern void client_sig_handler(int signum); @@ -138,6 +140,7 @@ static int proc_mode_ping_pong(int, int, const char **); static int proc_mode_throughput(int, int, const char **); static int proc_mode_playback(int, int, const char **); static int proc_mode_server(int, int, const char **); +int bringup_for_doca(std::unique_ptr &tmp); static const struct app_modes { int (*func)(int, int, const char **); /* proc function */ @@ -297,6 +300,14 @@ static const AOPT_DESC common_opt_desc[] = { { OPT_TLS, AOPT_OPTARG, aopt_set_literal(0), aopt_set_string("tls"), "Use TLSv1.2 (default " TLS_CHIPER_DEFAULT ")." }, #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0), + aopt_set_string("doca-comm-channel"), "Use Doca communication channel" }, + { OPT_PCI, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"}, + { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0), + aopt_set_string("pci-representor"), "Comm Channel DOCA device representor PCI address"}, +#endif /* USING_DOCA_COMM_CHANNEL_API */ { 'd', AOPT_NOARG, aopt_set_literal('d'), aopt_set_string("debug"), "Print extra debug information." }, { 0, AOPT_NOARG, aopt_set_literal(0), aopt_set_string(NULL), NULL } @@ -2217,6 +2228,33 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) { } } #endif /* DEFINED_TLS */ +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (!rc && aopt_check(common_obj, OPT_DOCA)) { + if (aopt_check(common_obj, 'tls')) { + log_msg("--doca-comm-channel conflicts with --tls option"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } + if (!aopt_check(common_obj, OPT_PCI)) { + log_msg("doca-comm-channel must have pci address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI); + if (optarg) + strcpy(s_user_params.cc_dev_pci_addr, optarg); + } + if (s_user_params.mode == MODE_SERVER) { + if (!aopt_check(common_obj, OPT_PCI_REP)) { + log_msg("doca-comm-channel server must have pci representor address"); + rc = SOCKPERF_ERR_BAD_ARGUMENT; + } else { + const char *optarg = aopt_value(common_obj, OPT_PCI_REP); + if (optarg) + strcpy(s_user_params.cc_dev_rep_pci_addr, optarg); + } + } + s_user_params.doca_comm_channel = true; + } +#endif /* USING_DOCA_COMM_CHANNEL_API */ } // resolve address: -i, -p and --tcp options must be processed before @@ -2407,6 +2445,20 @@ void cleanup() { FREE(g_fds_array[ifd]->memberships_addr); } if (s_user_params.addr.addr.sa_family == AF_UNIX) { +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (g_fds_array[ifd]->doca_objects) { + doca_error_t result = DOCA_SUCCESS; + if (s_user_params.mode == MODE_SERVER) { + doca_cc_server_destroy(g_fds_array[ifd]->doca_objects->server); + doca_dev_rep_close(g_fds_array[ifd]->doca_objects->rep_dev); + } else { // MODE_CLIENT + doca_cc_client_destroy(g_fds_array[ifd]->doca_objects->client); + } + doca_pe_destroy(g_fds_array[ifd]->doca_objects->pe); + doca_dev_close(g_fds_array[ifd]->doca_objects->hw_dev); + FREE(g_fds_array[ifd]->doca_objects); + } else +#endif /* USING_DOCA_COMM_CHANNEL_API */ os_unlink_unix_path(s_user_params.client_bind_info.addr_un.sun_path); #ifndef __windows__ // AF_UNIX with DGRAM isn't supported in __windows__ if (s_user_params.mode == MODE_CLIENT && s_user_params.sock_type == SOCK_DGRAM) { // unlink binded client @@ -2416,7 +2468,7 @@ void cleanup() { } #endif // __windows__ if (s_user_params.mode == MODE_SERVER) - os_unlink_unix_path(g_fds_array[ifd]->server_addr.addr_un.sun_path); + unlink(g_fds_array[ifd]->server_addr.addr_un.sun_path); } delete g_fds_array[ifd]; } @@ -2426,15 +2478,15 @@ void cleanup() { if (s_user_params.select_timeout) { FREE(s_user_params.select_timeout); } -#ifdef USING_EXTRA_API - if ((g_vma_api || g_xlio_api) && s_user_params.is_zcopyread) { +#ifdef USING_VMA_EXTRA_API + if (g_vma_api && s_user_params.is_zcopyread) { zeroCopyMap::iterator it; while ((it = g_zeroCopyData.begin()) != g_zeroCopyData.end()) { delete it->second; g_zeroCopyData.erase(it); } } -#endif // USING_EXTRA_API +#endif // USING_VMA_EXTRA_API if (g_fds_array) { FREE(g_fds_array); @@ -2670,6 +2722,7 @@ inline bool CallbackMessageHandler::handle_message() } return VMA_PACKET_DROP; }*/ + log_dbg("SHOULDN'T BE PRINTED.."); msgReply->setHeaderToHost(); } @@ -3283,7 +3336,7 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { #endif // USING_EXTRA_API std::unique_ptr tmp{ new fds_data }; - + bool skip_socket = false; int res = resolve_sockaddr(addr.c_str(), port.c_str(), sock_type, false, reinterpret_cast(&tmp->server_addr), tmp->server_addr_len); if (res != 0) { @@ -3359,23 +3412,70 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { g_fds_array[curr_fd]->memberships_size++; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - log_err("socket(AF_INET4/6, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all + // over the way and avoid + // this cast } - fd_socket_map[port_desc_tmp] = curr_fd; - if (tmp->is_multicast) { - tmp->memberships_addr = reinterpret_cast(MALLOC( - IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + + // TODO: In the following malloc we have a one time memory allocation of + // 128KB that are not reclaimed + // This O(1) leak was introduced in revision 133 + tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - tmp->memberships_addr = NULL; + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; } - tmp->memberships_size = 0; - s_fd_num++; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + log_dbg("starting feedfile for doca"); + int curr_fd = bringup_for_doca(tmp); + s_fd_num++; + skip_socket = true; + fd_socket_map[port_desc_tmp] = curr_fd; + if (new_socket_flag) { + if (s_fd_num == 1) { /*it is the first fd*/ + s_fd_min = curr_fd; + s_fd_max = curr_fd; + } else { + g_fds_array[last_fd]->next_fd = curr_fd; + s_fd_min = _min(s_fd_min, curr_fd); + s_fd_max = _max(s_fd_max, curr_fd); + } + last_fd = curr_fd; + g_fds_array[curr_fd] = tmp.release(); + } + } +#endif //USING_DOCA_COMM_CHANNEL_API + + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < + 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6, SOCK_x)"); + rc = SOCKPERF_ERR_SOCKET; + } + fd_socket_map[port_desc_tmp] = curr_fd; + log_msg("FD is %d", curr_fd); + if (tmp->is_multicast) { + tmp->memberships_addr = reinterpret_cast(MALLOC( + IGMP_MAX_MEMBERSHIPS * sizeof(struct sockaddr_store_t))); + } else { + tmp->memberships_addr = NULL; + } + tmp->memberships_size = 0; + + s_fd_num++; + } } - if (curr_fd >= 0) { + if (!skip_socket && curr_fd >= 0) { if ((curr_fd >= max_fds_num) || (prepare_socket(curr_fd, tmp.get()) == (int) INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid this cast @@ -3383,26 +3483,10 @@ static int set_sockets_from_feedfile(const char *feedfile_name) { close(curr_fd); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; // TODO: use SOCKET all - // over the way and avoid - // this cast - } - // TODO: In the following malloc we have a one time memory allocation of - // 128KB that are not reclaimed - // This O(1) leak was introduced in revision 133 - tmp->recv.buf = (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); if (!tmp->recv.buf) { log_err("Failed to allocate memory with malloc()"); rc = SOCKPERF_ERR_NO_MEMORY; } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - if (new_socket_flag) { if (s_fd_num == 1) { /*it is the first fd*/ s_fd_min = curr_fd; @@ -3517,6 +3601,154 @@ static bool fds_array_is_valid() { return ((fd == s_fd_max) && ((i + 1) == s_fd_num) && (g_fds_array[fd]->next_fd == s_fd_min)); } +#if defined(USING_DOCA_COMM_CHANNEL_API) +int bringup_for_doca(std::unique_ptr &tmp) +{ + log_dbg("creating Doca with name %s", s_user_params.addr.addr_un.sun_path); + struct cc_ctrl_path_objects *sample_objects = (struct cc_ctrl_path_objects*)MALLOC(sizeof(struct cc_ctrl_path_objects)); + if (!sample_objects) { + log_err("Failed to allocate memory for doca objects structure"); + exit_with_log(SOCKPERF_ERR_NO_MEMORY); + } + + sample_objects->recv_buffer = tmp->recv.buf; + sample_objects->recv_flg = false; + + struct priv_doca_pci_bdf dev_pcie = {0}; + doca_error_t doca_error = DOCA_SUCCESS; + union doca_data user_data; + struct doca_ctx *ctx; + + int epoll_fd = epoll_create(max_fds_num); + doca_event_handle_t event_handle = doca_event_invalid_handle; + struct epoll_event ev = { 0, { 0 } }; + ev.events = EPOLLIN | EPOLLPRI; + + /* Convert the PCI addresses into the matching struct */ + doca_error = parse_pci_addr(s_user_params.cc_dev_pci_addr, &dev_pcie); + if (doca_error != DOCA_SUCCESS) { + errno = EPERM; + log_dbg("doca error %s", doca_error_get_descr(doca_error)); + exit_with_err("Failed to parse the device PCI address", SOCKPERF_ERR_FATAL); + } + log_dbg("parse_pci_addr succeeded for hw_dev"); + + /* Open DOCA device according to the given PCI address */ + doca_error = open_doca_device_with_pci(&dev_pcie, NULL, &(sample_objects->hw_dev)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device based on PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("open_doca_device_with_pci succeeded for hw_dev"); + + if (s_user_params.mode == MODE_SERVER) { + /* Convert the PCI addresses into the matching struct */ + struct priv_doca_pci_bdf dev_rep_pcie = {0}; + doca_error = parse_pci_addr(s_user_params.cc_dev_rep_pci_addr, &dev_rep_pcie); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to parse the device representor PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("parse_pci_addr succeeded for rep_dev"); + + /* Open DOCA device representor according to the given PCI address */ + doca_error = open_doca_device_rep_with_pci(sample_objects->hw_dev, DOCA_DEVINFO_REP_FILTER_NET, + &dev_rep_pcie, &(sample_objects->rep_dev)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to open Comm Channel DOCA device representor based on PCI address %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("open_doca_device_rep_with_pci succeeded for rep_dev"); + + doca_error = doca_cc_server_create(sample_objects->hw_dev, sample_objects->rep_dev, + s_user_params.addr.addr_un.sun_path, &(sample_objects->server)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create server with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + + log_dbg("doca_cc_server_create succeeded"); + ctx = doca_cc_server_as_ctx(sample_objects->server); + log_dbg("doca_cc_server_as_ctx succeeded"); + + } else { // MODE_CLIENT + doca_error = doca_cc_client_create(sample_objects->hw_dev, s_user_params.addr.addr_un.sun_path, + &(sample_objects->client)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed to create client with error %s", doca_error_get_descr(doca_error)); + goto destroy_cc; + } + log_dbg("doca_cc_client_create succeeded"); + ctx = doca_cc_client_as_ctx(sample_objects->client); + log_dbg("doca_cc_client_as_ctx succeeded"); + } + + /* Create PE */ + doca_error = doca_pe_create(&(sample_objects->pe)); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Fail creating pe with error %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + log_dbg("doca_pe_create succeeded"); + + doca_error = doca_pe_connect_ctx(sample_objects->pe, ctx); + if (doca_error != DOCA_SUCCESS) { + log_dbg("Failed adding pe context to server with error = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + log_dbg("doca_pe_connect_ctx succeeded"); + +/*******************************************************/ +/*Set Callback*/ + if (s_user_params.mode == MODE_SERVER) { + doca_error = doca_server_set_params(sample_objects); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting server params = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + } else { // MODE_CLIENT + doca_error = doca_client_set_params(sample_objects); + if (doca_error != DOCA_SUCCESS) { + DOCA_LOG_ERR("Failed setting client params = %s", doca_error_get_name(doca_error)); + goto destroy_cc; + } + } + +/********** Event handling fd **********/ + + DOCA_LOG_INFO("Registering PE event"); + /* doca_event_handle_t is a file descriptor that can be added to an epoll. */ + /* Currently not implemented*/ + /* Works alongisde with doca_pe_request_notification */ + doca_pe_get_notification_handle(sample_objects->pe, &event_handle); + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_handle, &ev); + log_dbg("epoll fd is %d", epoll_fd); + + // Sock type is not needed for comm channel flow + tmp->sock_type = -1; + tmp->doca_objects = sample_objects; + return epoll_fd; + +destroy_cc: + if (doca_error != DOCA_SUCCESS) { + /* Destroy Comm Channel DOCA device representor */ + if (s_user_params.mode == MODE_SERVER) { + doca_cc_server_destroy(sample_objects->server); + doca_dev_rep_close(sample_objects->rep_dev); + } else { + doca_cc_client_destroy(sample_objects->client); + } + /* Destroy Comm Channel DOCA device */ + doca_dev_close(sample_objects->hw_dev); + /* Destroy PE*/ + doca_pe_destroy(sample_objects->pe); + sample_objects->pe = NULL; + FREE(sample_objects); + exit_with_err("Com channel bringup failed", SOCKPERF_ERR_FATAL); + } +} +#endif //USING_DOCA_COMM_CHANNEL_API + //------------------------------------------------------------------------------ int bringup(const int *p_daemonize) { int rc = SOCKPERF_ERR_NONE; @@ -3627,37 +3859,46 @@ int bringup(const int *p_daemonize) { rc = SOCKPERF_ERR_NO_MEMORY; } else { /* create a socket */ - if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < - 0) { // TODO: use SOCKET all over the way and avoid this cast - log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); - rc = SOCKPERF_ERR_SOCKET; + int i = 0; + s_fd_num = 1; + + for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { + tmp->active_fd_list[i] = (int)INVALID_SOCKET; + } + tmp->recv.buf = + (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); + if (!tmp->recv.buf) { + log_err("Failed to allocate memory with malloc()"); + rc = SOCKPERF_ERR_NO_MEMORY; } else { - if ((curr_fd >= max_fds_num) || - (prepare_socket(curr_fd, tmp.get()) == - (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid - // this cast - log_err("Invalid socket"); - close(curr_fd); + tmp->recv.cur_addr = tmp->recv.buf; + tmp->recv.max_size = MAX_PAYLOAD_SIZE; + tmp->recv.cur_offset = 0; + tmp->recv.cur_size = tmp->recv.max_size; + } + bool skip_socket = false; +#if defined(USING_DOCA_COMM_CHANNEL_API) + if (s_user_params.doca_comm_channel) { + int epoll_fd = bringup_for_doca(tmp); + skip_socket = true; + s_fd_min = s_fd_max = epoll_fd; + g_fds_array[s_fd_min] = tmp.release(); + g_fds_array[s_fd_min]->next_fd = s_fd_min; + } +#endif /* USING_DOCA_COMM_CHANNEL_API */ + if (!skip_socket) { + if ((curr_fd = (int)socket(tmp->server_addr.addr.sa_family, tmp->sock_type, 0)) < 0) { // TODO: use SOCKET all over the way and avoid this cast + log_err("socket(AF_INET4/6/AF_UNIX, SOCK_x)"); rc = SOCKPERF_ERR_SOCKET; } else { - int i = 0; - - s_fd_num = 1; - - for (i = 0; i < MAX_ACTIVE_FD_NUM; i++) { - tmp->active_fd_list[i] = (int)INVALID_SOCKET; - } - tmp->recv.buf = - (uint8_t *)MALLOC(sizeof(uint8_t) * 2 * MAX_PAYLOAD_SIZE); - if (!tmp->recv.buf) { - log_err("Failed to allocate memory with malloc()"); - rc = SOCKPERF_ERR_NO_MEMORY; + if ((curr_fd >= max_fds_num) || + (prepare_socket(curr_fd, tmp.get()) == + (int)INVALID_SOCKET)) { // TODO: use SOCKET all over the way and avoid + // this cast + log_err("Invalid socket"); + close(curr_fd); + rc = SOCKPERF_ERR_SOCKET; } else { - tmp->recv.cur_addr = tmp->recv.buf; - tmp->recv.max_size = MAX_PAYLOAD_SIZE; - tmp->recv.cur_offset = 0; - tmp->recv.cur_size = tmp->recv.max_size; - s_fd_min = s_fd_max = curr_fd; g_fds_array[s_fd_min] = tmp.release(); g_fds_array[s_fd_min]->next_fd = s_fd_min; @@ -3676,12 +3917,12 @@ int bringup(const int *p_daemonize) { } } } - +#ifndef USING_DOCA_COMM_CHANNEL_API if (!rc && !fds_array_is_valid()) { log_err("Sanity check failed for sockets list"); rc = SOCKPERF_ERR_FATAL; } - +#endif /* USING_DOCA_COMM_CHANNEL_API */ if (!rc && (s_user_params.threads_num > s_fd_num || s_user_params.threads_num == 0)) { log_msg("Number of threads should be less than sockets count"); rc = SOCKPERF_ERR_BAD_ARGUMENT;