Skip to content

Commit

Permalink
Adding Doca Communication Channel API
Browse files Browse the repository at this point in the history
  • Loading branch information
Eldar Shalev committed Sep 13, 2022
1 parent 97c0bb7 commit 0b1dc18
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 62 deletions.
34 changes: 28 additions & 6 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ AC_CANONICAL_SYSTEM

TARGETDIR="unknown"
case "$host" in

i?86-*-*) TARGET=X86; TARGETDIR=x86;;
ia64*-*-*) TARGET=IA64; TARGETDIR=ia64;;
powerpc*-*-linux* | powerpc-*-sysv*) TARGET=POWERPC; TARGETDIR=powerpc;;
Expand All @@ -26,14 +26,14 @@ case "$host" in
aarch64-*-*) TARGET=AARCH64; TARGETDIR=aarch64;;
s390*-*-*) TARGET=S390; TARGETDIR=s390;;
esac

AC_SUBST(AM_RUNTESTFLAGS)
AC_SUBST(AM_LTLDFLAGS)

if test $TARGETDIR = unknown; then
AC_MSG_ERROR(["it has not been ported to $host."])
fi

AM_CONDITIONAL(X86, test x$TARGET = xX86)
AM_CONDITIONAL(IA64, test x$TARGET = xIA64)
AM_CONDITIONAL(POWERPC, test x$TARGET = xPOWERPC)
Expand Down Expand Up @@ -100,6 +100,27 @@ AC_MSG_CHECKING(
[for vma extra api])
AC_MSG_RESULT([${have_vma_api}])

##########################################################################
# check DOCA communication channel API
#
AC_ARG_ENABLE(
[doca-communication-channel-api],
AC_HELP_STRING([--enable-doca-communication-channel-api],
[SOCKPERF: enable DOCA communication channel extra api support (default=no)]),
[have_doca_comm_channel_api=$enableval],
[have_doca_comm_channel_api=no])
AS_IF([test "x${have_doca_comm_channel_api}" == "xyes"],
[AC_CHECK_HEADER([doca_comm_channel.h],
[AC_DEFINE([USING_DOCA_COMM_CHANNEL_API],[1],[[Enable using DOCA communication channel extra API]])
#LDFLAGS="$LDFLAGS -ldoca_cc_ -L..."
],
[AC_MSG_ERROR([doca_comm_channel.h file not found])]
[have_doca_comm_channel_api=no])])
AC_MSG_CHECKING(
[for doca communication channel extra api])
AC_MSG_RESULT([${have_doca_comm_channel_api}])


##########################################################################
# check XLIO extra API
#
Expand Down Expand Up @@ -141,7 +162,7 @@ AM_CONDITIONAL(DOC, test "x$have_doc" = "xyes")


##########################
# Enable tests
# Enable tests
#
SP_ARG_ENABLE_BOOL(
[test],
Expand All @@ -151,7 +172,7 @@ AM_CONDITIONAL(TEST, test "x$have_test" = "xyes")


##########################
# Enable tools
# Enable tools
#
SP_ARG_ENABLE_BOOL(
[tool],
Expand Down Expand Up @@ -233,6 +254,7 @@ AC_MSG_RESULT([
test: ${have_test}
tool: ${have_tool}
vma_api: ${have_vma_api}
doca_api: ${have_doca_comm_channel_api}
xlio_api: ${have_xlio_api}
debug: ${have_debug}
])
19 changes: 18 additions & 1 deletion src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ int Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration,
if (!(data && (data->active_fd_list))) continue;

const sockaddr_store_t *p_client_bind_addr = &g_pApp->m_const_params.client_bind_info;
if (p_client_bind_addr->ss_family != AF_UNSPEC) {
if (p_client_bind_addr->ss_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
socklen_t client_bind_addr_len = g_pApp->m_const_params.client_bind_info_len;
std::string hostport = sockaddr_to_hostport(p_client_bind_addr);
#ifdef __linux__
Expand Down Expand Up @@ -953,6 +953,23 @@ int Client<IoType, SwitchDataIntegrity, SwitchActivityInfo, SwitchCycleDuration,
}
#endif /* DEFINED_TLS */
}


#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[ifd]->ep) {
log_msg("connecting to g_fds_array[%d]->ep",ifd);
doca_error_t doca_error = DOCA_SUCCESS;
doca_error = doca_comm_channel_ep_connect(g_fds_array[ifd]->ep, s_user_params.addr.addr_un.sun_path,
&(g_fds_array[ifd]->peer_addr));

if (doca_error != DOCA_SUCCESS) {
log_err("Can`t connect to doca socket");
rc = SOCKPERF_ERR_SOCKET;
break;
}
}
#endif /* USING_DOCA_COMM_CHANNEL_API */

/*
* since when using VMA there is no qp until the bind, and vma cannot
* check that rate-limit is supported this is done here and not
Expand Down
6 changes: 5 additions & 1 deletion src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ std::string sockaddr_to_hostport(const struct sockaddr *addr)
if (addr->sa_family == AF_INET6) {
return "[" + std::string(hbuf) + "]:" + std::string(pbuf);
} else if (addr->sa_family == AF_UNIX) {
return std::string(addr->sa_data);
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel)
return std::string(pbuf) + " [DOCA]";
#endif
return std::string(pbuf) + " [UNIX]";
} else {
return std::string(hbuf) + ":" + std::string(pbuf);
}
Expand Down
13 changes: 13 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
ret = tls_write(g_fds_array[fd]->tls_handle, buf, nbytes);
} else
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[fd]->ep) {
if (!s_user_params.is_blocked)
flags |= DOCA_CC_MSG_FLAG_DONTWAIT;

doca_error_t doca_error;
doca_error = doca_comm_channel_ep_sendto(g_fds_array[fd]->ep, buf, nbytes, flags,
g_fds_array[fd]->peer_addr);
if (doca_error != DOCA_ERROR_AGAIN)
ret = nbytes;

} else
#endif /* USING_DOCA_COMM_CHANNEL_API */
{
ret = sendto(fd, buf, nbytes, flags, sendto_addr, addrlen);
}
Expand Down
19 changes: 18 additions & 1 deletion src/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ typedef unsigned short int sa_family_t;
#endif // USING_XLIO_EXTRA_API
#endif // !WIN32 && !__FreeBSD__

#ifdef USING_DOCA_COMM_CHANNEL_API
#include "doca_comm_channel.h"
#define MAX_MSGS 40
#define MSG_SIZE 2000
#endif /* USING_DOCA_COMM_CHANNEL_API */

#define MIN_PAYLOAD_SIZE (MsgHeader::EFFECTIVE_SIZE)
extern int MAX_PAYLOAD_SIZE;
#define MAX_STREAM_SIZE (50 * 1024 * 1024)
Expand Down Expand Up @@ -277,8 +283,12 @@ enum {
OPT_HISTOGRAM, // 46
OPT_LOAD_XLIO, // 47
#if defined(DEFINED_TLS)
OPT_TLS
OPT_TLS,
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
OPT_DOCA
#endif /* USING_DOCA_COMM_CHANNEL_API */

};

static const char *const round_trip_str[] = { "latency", "rtt" };
Expand Down Expand Up @@ -561,6 +571,10 @@ struct fds_data {
#if defined(DEFINED_TLS)
void *tls_handle = nullptr;
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
struct doca_comm_channel_ep_t *ep = nullptr;
struct doca_comm_channel_addr_t *peer_addr = nullptr;
#endif /* USING_DOCA_COMM_CHANNEL_API */

fds_data()
{
Expand Down Expand Up @@ -769,6 +783,9 @@ struct user_params_t {
#if defined(DEFINED_TLS)
bool tls = false;
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
bool doca_comm_channel = false;
#endif /* USING_DOCA_COMM_CHANNEL_API */

user_params_t() {
memset(&client_bind_info, 0, sizeof(client_bind_info));
Expand Down
16 changes: 16 additions & 0 deletions src/input_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ class RecvFromInputHandler : public MessageParser<InPlaceAccumulation> {
ret = tls_read(g_fds_array[fd]->tls_handle, buf, m_recv_data.cur_size);
} else
#endif /* DEFINED_TLS */
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[fd]->ep) {
if (!s_user_params.is_blocked) {
flags |= DOCA_CC_MSG_FLAG_DONTWAIT;
}
size_t msg_len = m_recv_data.cur_size;
doca_error_t doca_error;
doca_error = doca_comm_channel_ep_recvfrom(g_fds_array[fd]->ep, (void*)buf, &msg_len, flags, &(g_fds_array[fd]->peer_addr));
if (doca_error != DOCA_ERROR_AGAIN) {
m_actual_buf = buf;
m_actual_buf_size = msg_len;
return msg_len;
}

} else
#endif /* USING_DOCA_COMM_CHANNEL_API */
{
ret = recvfrom(fd, buf, m_recv_data.cur_size,
flags, (struct sockaddr *)recvfrom_addr, &size);
Expand Down
5 changes: 5 additions & 0 deletions src/iohandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ static void print_addresses(const fds_data *data, int &list_count)
NI_NUMERICHOST | NI_NUMERICSERV);
switch (data->server_addr.ss_family) {
case AF_UNIX:
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (s_user_params.doca_comm_channel)
printf("[%2d] Address is %s # DOCA\n", list_count++, pbuf);
else
#endif
printf("[%2d] ADDR = %s # %s\n", list_count++, data->server_addr.addr_un.sun_path, PRINT_PROTOCOL(data->sock_type));
break;
default:
Expand Down
59 changes: 37 additions & 22 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,39 @@ int ServerBase::initBeforeLoop() {
#ifdef USING_VMA_EXTRA_API // VMA callback-extra-api Only
g_fds_array[ifd]->p_msg = m_pMsgReply;
#endif // USING_VMA_EXTRA_API
const sockaddr_store_t *p_bind_addr = &g_fds_array[ifd]->server_addr;
socklen_t bind_addr_len = g_fds_array[ifd]->server_addr_len;

struct sockaddr_store_t bind_addr;
// Meny: Can't bind to a multicast addr in Windows
if (g_fds_array[ifd]->memberships_size ||
is_multicast_addr(*p_bind_addr)) {
// if more then one address on socket (for multiple MC join case oon same port)
memcpy(&bind_addr, p_bind_addr, bind_addr_len);
switch (bind_addr.ss_family) {
case AF_INET:
reinterpret_cast<sockaddr_in &>(bind_addr).sin_addr.s_addr = INADDR_ANY;
break;
case AF_INET6:
reinterpret_cast<sockaddr_in6 &>(bind_addr).sin6_addr = in6addr_any;
break;
#if defined(USING_DOCA_COMM_CHANNEL_API)
if (g_fds_array[ifd]->ep) {
// Sockperf allows for several listen sockets- Phase 2 to support several listen sockets.
doca_error_t doca_error = DOCA_SUCCESS;
log_msg("listening to g_fds_array[%d]->ep. name is %s",ifd, s_user_params.addr.addr_un.sun_path);
doca_error = doca_comm_channel_ep_listen(g_fds_array[ifd]->ep, s_user_params.addr.addr_un.sun_path);
if (doca_error != DOCA_SUCCESS) {
log_err("Can`t listen to doca socket");
rc = SOCKPERF_ERR_SOCKET;
return rc;
}
} else {
#endif /* USING_DOCA_COMM_CHANNEL_API */

const sockaddr_store_t *p_bind_addr = &g_fds_array[ifd]->server_addr;
socklen_t bind_addr_len = g_fds_array[ifd]->server_addr_len;

struct sockaddr_store_t bind_addr;
// Meny: Can't bind to a multicast addr in Windows
if (g_fds_array[ifd]->memberships_size ||
is_multicast_addr(*p_bind_addr)) {
// if more then one address on socket (for multiple MC join case oon same port)
memcpy(&bind_addr, p_bind_addr, bind_addr_len);
switch (bind_addr.ss_family) {
case AF_INET:
reinterpret_cast<sockaddr_in &>(bind_addr).sin_addr.s_addr = INADDR_ANY;
break;
case AF_INET6:
reinterpret_cast<sockaddr_in6 &>(bind_addr).sin6_addr = in6addr_any;
break;
}
p_bind_addr = &bind_addr;
}
p_bind_addr = &bind_addr;
}

std::string hostport = sockaddr_to_hostport(p_bind_addr);
log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
Expand All @@ -116,10 +130,11 @@ int ServerBase::initBeforeLoop() {
break;
}

if ((g_fds_array[ifd]->sock_type == SOCK_STREAM) && (listen(ifd, 10) < 0)) {
log_err("Failed listen() for connection\n");
rc = SOCKPERF_ERR_SOCKET;
break;
if ((g_fds_array[ifd]->sock_type == SOCK_STREAM) && (listen(ifd, 10) < 0)) {
log_err("Failed listen() for connection\n");
rc = SOCKPERF_ERR_SOCKET;
break;
}
}
}
}
Expand Down
Loading

0 comments on commit 0b1dc18

Please sign in to comment.