Skip to content

Commit

Permalink
Adding Doca Communication Channel API
Browse files Browse the repository at this point in the history
  • Loading branch information
Eldar Shalev committed Dec 29, 2022
1 parent 5ebd327 commit b45a7b6
Show file tree
Hide file tree
Showing 9 changed files with 580 additions and 92 deletions.
41 changes: 35 additions & 6 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM

TARGETDIR="unknown"
case "$host" in

i?86-*-*) TARGET=X86; TARGETDIR=x86;;
ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;;
powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;;
Expand All @@ -26,14 +26,14 @@ case "$host" in
aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;;
s390*-*-*) TARGET=S390; TARGETDIR=s390;;
esac

AC_SUBST(AM_RUNTESTFLAGS)
AC_SUBST(AM_LTLDFLAGS)

if test $TARGETDIR = unknown; then
AC_MSG_ERROR(["it has not been ported to $host."])
fi

AM_CONDITIONAL(X86, test x$TARGET = xX86)
AM_CONDITIONAL(IA64, test x$TARGET = xIA64)
AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC)
Expand Down Expand Up @@ -100,6 +100,34 @@ AC_MSG_CHECKING(
[for vma extra api])
AC_MSG_RESULT([${have_vma_api}])

##########################################################################
# check DOCA communication channel API
#
AC_ARG_ENABLE(
[doca-communication-channel-api],
AC_HELP_STRING([--enable-doca-communication-channel-api],
[SOCKPERF: enable DOCA communication channel extra api support (default=no)]),
[have_doca_comm_channel_api=$enableval],
[have_doca_comm_channel_api=no])
AS_IF([test "x${have_doca_comm_channel_api}" == "xyes"],
if test "$have_doca_comm_channel_api" = "yes"
then
have_doca_comm_channel_api=/opt/mellanox/doca
fi

CPPFLAGS="$CPPFLAGS -I$have_doca_comm_channel_api/include -I$have_doca_comm_channel_api/lib -I$have_doca_comm_channel_api/samples"
LIBS="$LIBS -ldoca_comm_channel -ldoca_common"

[AC_CHECK_HEADERS([$have_doca_comm_channel_api/include/doca_comm_channel.h $have_doca_comm_channel_api/include/doca_dev.h $have_doca_comm_channel_api/samples/common.h],
[AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[[Enable using DOCA communication channel extra API]])
],
[AC_MSG_ERROR([doca_comm_channel.h file not found])]
[have_doca_comm_channel_api=no])])
AC_MSG_CHECKING(
[for doca communication channel extra api])
AC_MSG_RESULT([${have_doca_comm_channel_api}])


##########################################################################
# check XLIO extra API
#
Expand Down Expand Up @@ -141,7 +169,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes")


##########################
# Enable tests
# Enable tests
#
SP_ARG_ENABLE_BOOL(
[test],
Expand All @@ -151,7 +179,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes")


##########################
# Enable tools
# Enable tools
#
SP_ARG_ENABLE_BOOL(
[tool],
Expand Down Expand Up @@ -233,6 +261,7 @@ AC_MSG_RESULT([
test: ${have_test}
tool: ${have_tool}
vma_api: ${have_vma_api}
doca_api: ${have_doca_comm_channel_api}
xlio_api: ${have_xlio_api}
debug: ${have_debug}
])
6 changes: 5 additions & 1 deletion src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,11 @@ int Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration,
if (!(data && (data->active_fd_list))) continue;

const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info;
if (p_client_bind_addr->ss_family != AF_UNSPEC) {
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (p_client_bind_addr->ss_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
#else
if (p_client_bind_addr->ss_family != AF_UNSPEC) {
#endif //USING_DOCA_COMM_CHANNEL_API
socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len;
std::string hostport = sockaddr_to_hostport(p_client_bind_addr);
#ifdef __linux__
Expand Down
6 changes: 5 additions & 1 deletion src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr)
if (addr->sa_family == AF_INET6) {
return "[" + std::string(hbuf) + "]:" + std::string(pbuf);
} else if (addr->sa_family == AF_UNIX) {
return std::string(addr->sa_data);
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel)
return std::string(pbuf) + " [DOCA]";
#endif
return std::string(pbuf) + " [UNIX]";
} else {
return std::string(hbuf) + ":" + std::string(pbuf);
}
Expand Down
38 changes: 38 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#ifndef COMMON_H_
#define COMMON_H_
#define POLL_TIMEOUT_MS -1

#include <string>

Expand Down Expand Up @@ -112,6 +113,43 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes);
} else
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[fd]->ep) {
doca_error_t doca_error;

if (s_user_params.is_blocked) { // the condifiton remains outside due to perforamnce
doca_error_t arm_ret;
int doca_cc = g_fds_array[fd]->com_channel_fd_send;
struct pollfd fds = { .fd = doca_cc, .events = POLLIN | POLLOUT, };

if (strlen(s_user_params.feedfile_name)) { // Feedfile
if ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE,
g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) {
arm_ret = doca_comm_channel_ep_event_handle_arm_send(g_fds_array[fd]->ep);
if (arm_ret != DOCA_SUCCESS) {
log_err("Error: arming event handle failed in %s", __func__);
return arm_ret;
}
}
} else { // Not Feefile
while ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE,
g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) {
arm_ret = doca_comm_channel_ep_event_handle_arm_send(g_fds_array[fd]->ep);
if (arm_ret != DOCA_SUCCESS) {
printf("Error: arming event handle failed in %s", __func__);
return arm_ret;
}
poll(&fds, 1, POLL_TIMEOUT_MS);
}
}
} else {
while ((doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, DOCA_CC_MSG_FLAG_NONE,
g_fds_array[fd]->peer_addr)) == DOCA_ERROR_AGAIN) {
}
}
ret = nbytes;
} else
#endif /* USING_DOCA_COMM_CHANNEL_API */
{
ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen);
}
Expand Down
27 changes: 26 additions & 1 deletion src/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ typedef unsigned short int sa_family_t;
#endif // USING_XLIO_EXTRA_API
#endif // !WIN32 && !__FreeBSD__

#ifdef USING_DOCA_COMM_CHANNEL_API
#include "doca_comm_channel.h"
#include "doca_dev.h"
//#include <samples/common.h>
#define MAX_MSGS 40
#define MSG_SIZE 4080
#define PCI_ADDR_LEN 8
#define CC_MAX_QUEUE_SIZE 10 /* Maximum amount of message in queue */
#endif /* USING_DOCA_COMM_CHANNEL_API */

#define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE)
extern int MAX_PAYLOAD_SIZE;
#define MAX_STREAM_SIZE (50 * 1024 * 1024)
Expand Down Expand Up @@ -277,8 +287,12 @@ enum {
OPT_HISTOGRAM, // 46
OPT_LOAD_XLIO, // 47
#if defined(DEFINED_TLS)
OPT_TLS
OPT_TLS,
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
OPT_DOCA
#endif /* USING_DOCA_COMM_CHANNEL_API */

};

static const char *const round_trip_str[] = { "latency", "rtt" };
Expand Down Expand Up @@ -561,6 +575,14 @@ struct fds_data {
#if defined(DEFINED_TLS)
void *tls_handle = nullptr;
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
struct doca_comm_channel_ep_t *ep = nullptr;
struct doca_comm_channel_addr_t *peer_addr = nullptr;
char cc_dev_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device PCI address */
char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
int com_channel_fd_recv = 0; /* Com Channel recv Doca FD*/
int com_channel_fd_send = 0; /* Com Channel send Doca FD*/
#endif /* USING_DOCA_COMM_CHANNEL_API */

fds_data()
{
Expand Down Expand Up @@ -769,6 +791,9 @@ struct user_params_t {
#if defined(DEFINED_TLS)
bool tls = false;
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
bool doca_comm_channel = false;
#endif /* USING_DOCA_COMM_CHANNEL_API */

user_params_t() {
memset(&client_bind_info, 0, sizeof(client_bind_info));
Expand Down
45 changes: 45 additions & 0 deletions src/input_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#ifndef INPUT_HANDLERS_H_
#define INPUT_HANDLERS_H_
#define POLL_TIMEOUT -1

#include "message_parser.h"

Expand Down Expand Up @@ -71,6 +72,50 @@ class RecvFromInputHandler : public MessageParser<InPlaceAccumulation> {
ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size);
} else
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[fd]->ep) {
size_t msg_len = m_recv_data.cur_size;
doca_error_t doca_error;
if (s_user_params.is_blocked) {
doca_error_t arm_ret;
int doca_cc = g_fds_array[fd]->com_channel_fd_recv;
struct pollfd fds = { .fd = doca_cc, .events = POLLIN | POLLOUT, };

if (strlen(s_user_params.feedfile_name)) { // Feedfile
doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE,
&(g_fds_array[fd]->peer_addr));
if (doca_error == DOCA_ERROR_AGAIN) {
arm_ret = doca_comm_channel_ep_event_handle_arm_recv(g_fds_array[fd]->ep);
if (arm_ret != DOCA_SUCCESS) {
log_err("Error: arming event handle failed in %s", __func__);
return arm_ret;
}
msg_len = 0;
}
} else { // Not Feedfile
while ((doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE,
&(g_fds_array[fd]->peer_addr))) == DOCA_ERROR_AGAIN) {
arm_ret = doca_comm_channel_ep_event_handle_arm_recv(g_fds_array[fd]->ep);
if (arm_ret != DOCA_SUCCESS) {
log_err("Error: arming event handle failed in %s", __func__);
return arm_ret;
}
poll(&fds, 1, POLL_TIMEOUT);
}
}
} else {
while ((doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, DOCA_CC_MSG_FLAG_NONE,
&(g_fds_array[fd]->peer_addr))) == DOCA_ERROR_AGAIN) {
msg_len = MSG_SIZE;
}
}

m_actual_buf = buf;
m_actual_buf_size = msg_len;
return msg_len;

} else
#endif /* USING_DOCA_COMM_CHANNEL_API */
{
ret = recvfrom(fd, buf, m_recv_data.cur_size,
flags, (struct sockaddr *)recvfrom_addr, &size);
Expand Down
5 changes: 5 additions & 0 deletions src/iohandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ static void print_addresses(const fds_data *data, int &list_count)
NI_NUMERICHOST | NI_NUMERICSERV);
switch (data->server_addr.ss_family) {
case AF_UNIX:
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel)
printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf);
else
#endif
printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path, PRINT_PROTOCOL(data->sock_type));
break;
default:
Expand Down
44 changes: 24 additions & 20 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,30 @@ int ServerBase::initBeforeLoop() {
p_bind_addr = &bind_addr;
}

std::string hostport = sockaddr_to_hostport(p_bind_addr);
log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
if (bind(ifd, reinterpret_cast<const sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd,
hostport.c_str());
rc = SOCKPERF_ERR_SOCKET;
break;
}
/*
* since when using VMA there is no qp until the bind, and vma cannot
* check that rate-limit is supported this is done here and not
* with the rest of the setsockopt
*/
if (s_user_params.rate_limit > 0 &&
sock_set_rate_limit(ifd, s_user_params.rate_limit)) {
log_err("[fd=%d] failed setting rate limit, %s\n", ifd,
hostport.c_str());
rc = SOCKPERF_ERR_SOCKET;
break;
}
std::string hostport = sockaddr_to_hostport(p_bind_addr);
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast<const sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
#else
log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
if (bind(ifd, reinterpret_cast<const sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
#endif //USING_DOCA_COMM_CHANNEL_API
log_err("[fd=%d] Can`t bind socket, IP to bind: %s\n", ifd,
hostport.c_str());
rc = SOCKPERF_ERR_SOCKET;
break;
}
/*
* since when using VMA there is no qp until the bind, and vma cannot
* check that rate-limit is supported this is done here and not
* with the rest of the setsockopt
*/
if (s_user_params.rate_limit > 0 &&
sock_set_rate_limit(ifd, s_user_params.rate_limit)) {
log_err("[fd=%d] failed setting rate limit, %s\n", ifd,
hostport.c_str());
rc = SOCKPERF_ERR_SOCKET;
break;
}

if ((g_fds_array[ifd]->sock_type == SOCK_STREAM) && (listen(ifd, 10) < 0)) {
log_err("Failed listen() for connection\n");
Expand Down
Loading

0 comments on commit b45a7b6

Please sign in to comment.