Skip to content

Commit

Permalink
survey0: fix occasional SIGSEGV in resp0_ctx_recv() when surveyor clo…
Browse files Browse the repository at this point in the history
…ses pipe
  • Loading branch information
aleksejsolovev committed Dec 3, 2024
1 parent ec714f0 commit af7bc15
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 0 deletions.
13 changes: 13 additions & 0 deletions cmake/NNGHelpers.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ function(nng_test NAME)
endif ()
endfunction()

function(nng_test_quiet NAME)
if (NNG_TESTS)
add_executable(${NAME} ${NAME}.c ${ARGN})
target_link_libraries(${NAME} nng_testing)
target_include_directories(${NAME} PRIVATE
${PROJECT_SOURCE_DIR}/tests
${PROJECT_SOURCE_DIR}/src
${PROJECT_SOURCE_DIR}/include)
add_test(NAME ${NNG_TEST_PREFIX}.${NAME} COMMAND ${NAME} -t)
set_tests_properties(${NNG_TEST_PREFIX}.${NAME} PROPERTIES TIMEOUT 180)
endif ()
endfunction()

function(nng_test_if COND NAME)
if (${COND} AND NNG_TESTS)
add_executable(${NAME} ${NAME}.c ${ARGN})
Expand Down
1 change: 1 addition & 0 deletions src/sp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ nng_test(device_test)
nng_test(multistress_test)
nng_test(nonblock_test)
nng_test(scalability_test)
nng_test_quiet(reconnect_stress_test)
4 changes: 4 additions & 0 deletions src/sp/protocol/survey0/respond.c
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ resp0_pipe_close(void *arg)

nni_mtx_lock(&s->mtx);
p->closed = true;
if (nni_list_active(&s->recvpipes, p)) {
// We are no longer "receivable".
nni_list_remove(&s->recvpipes, p);
}
while ((ctx = nni_list_first(&p->sendq)) != NULL) {
nni_aio *aio;
nni_msg *msg;
Expand Down
269 changes: 269 additions & 0 deletions src/sp/reconnect_stress_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
#include <nng/nng.h>
#include <core/nng_impl.h>
#include <nng/protocol/survey0/survey.h>
#include <nng/protocol/survey0/respond.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h>

#include <nuts.h>

enum state {
SEND,
RECV,
};

struct work;
typedef void (*work_fn)(struct work *);

struct work {
work_fn start;
nng_socket socket;
nng_aio * aio;
enum state state;
nni_atomic_int received;
};

void
work_send(struct work *w, void *data, size_t size)
{
nng_msg *msg;

w->state = SEND;
NUTS_PASS(nng_msg_alloc(&msg, 0));
NUTS_PASS(nng_msg_append(msg, data, size));
nng_aio_set_msg(w->aio, msg);
nng_send_aio(w->socket, w->aio);
}

void
work_listen(struct work *w, const char *url)
{
NUTS_PASS(nng_listen(w->socket, url, NULL, 0));
}

void
work_dial(struct work *w, const char * const * urls, size_t urls_size)
{
size_t i;

for (i = 0; i < urls_size; ++i)
NUTS_PASS(nng_dial(w->socket, urls[i], NULL, 0));
}

void
close_work(struct work *w)
{
nng_close(w->socket);
nng_aio_free(w->aio);
}

void
ping_start(struct work *w)
{
work_send(w, "ping", 5);
}

void
ping_cb(void *arg)
{
nng_msg *msg;
struct work *w = arg;
int result = nng_aio_result(w->aio);

if (result)
switch (result) {
case NNG_ETIMEDOUT:
case NNG_ESTATE:
ping_start(w);
return;
case NNG_ECANCELED:
case NNG_ECLOSED:
return;
default:
NUTS_PASS(result);
abort();
}

switch (w->state) {
case SEND:
w->state = RECV;
nng_recv_aio(w->socket, w->aio);
break;
case RECV:
msg = nng_aio_get_msg(w->aio);
TEST_ASSERT(msg);
TEST_ASSERT(nng_msg_len(msg) == 5);
TEST_ASSERT(0 == strncmp(nng_msg_body(msg), "echo", 4));
nng_msg_free(msg);
nni_atomic_inc(&w->received);
ping_start(w);
break;
}
}

void
echo_start(struct work *w)
{
w->state = RECV;
nng_recv_aio(w->socket, w->aio);
}

void
echo_cb(void *arg)
{
nng_msg *msg;
struct work *w = arg;
int result = nng_aio_result(w->aio);

if (result)
switch (result) {
case NNG_ECANCELED:
case NNG_ECLOSED:
return;
default:
NUTS_PASS(result);
abort();
}

switch (w->state) {
case RECV:
msg = nng_aio_get_msg(w->aio);
TEST_ASSERT(msg);
TEST_ASSERT(nng_msg_len(msg) == 5);
TEST_ASSERT(0 == strncmp(nng_msg_body(msg), "ping", 4));
nng_msg_free(msg);
nni_atomic_inc(&w->received);
work_send(w, "echo", 5);
break;
case SEND:
echo_start(w);
break;
}
}

#define CLIENTS_COUNT 64
#define SERVICES_COUNT 8
#define CLIENT_RX_COUNT 100
#define TEST_DURATION_MS 3000
#define SURVEY_TIMEOUT_MS 100

void
surveyor_open(struct work *w)
{
w->start = ping_start;
NUTS_PASS(nng_surveyor_open(&w->socket));
NUTS_PASS(nng_socket_set_ms(w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS));
NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w));
nni_atomic_init(&w->received);
}

void
respondent_open(struct work *w)
{
w->start = echo_start;
NUTS_PASS(nng_respondent_open(&w->socket));
NUTS_PASS(nng_aio_alloc(&w->aio, echo_cb, w));
nni_atomic_init(&w->received);
}

void
req_open(struct work *w)
{
w->start = ping_start;
NUTS_PASS(nng_req_open(&w->socket));
NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w));
nni_atomic_init(&w->received);
}

void
rep_open(struct work *w)
{
w->start = echo_start;
NUTS_PASS(nng_rep_open(&w->socket));
NUTS_PASS(nng_aio_alloc(&w->aio, echo_cb, w));
nni_atomic_init(&w->received);
}

void
run_test(work_fn open_service, work_fn open_client)
{
int i;
nng_time stop_time;
struct work * service;
struct work * client;
struct work services[SERVICES_COUNT];
struct work clients [CLIENTS_COUNT];

const char * service_urls[SERVICES_COUNT] = {
"inproc://stressA",
"inproc://stressB",
"inproc://stressC",
"inproc://stressD",
"inproc://stressE",
"inproc://stressF",
"inproc://stressG",
"inproc://stressH",
};

for (i = 0; i < SERVICES_COUNT; i++) {
service = &services[i];
(*open_service)(service);
work_listen(service, service_urls[i]);
(*service->start)(service);
}

for (i = 0; i < CLIENTS_COUNT; i++) {
client = &clients[i];
(*open_client)(client);
work_dial(client, service_urls, SERVICES_COUNT);
(*client->start)(client);
}

stop_time = nng_clock() + TEST_DURATION_MS;
while (nng_clock() < stop_time) {
client = &clients[nng_random() % CLIENTS_COUNT];
while (nni_atomic_get(&client->received) < CLIENT_RX_COUNT)
nng_msleep(1);
close_work(client);
(*open_client)(client);
work_dial(client, service_urls, SERVICES_COUNT);
(*client->start)(client);
}

for (i = 0; i < CLIENTS_COUNT; i++)
close_work(&clients[i]);
for (i = 0; i < SERVICES_COUNT; i++)
close_work(&services[i]);
}

void
reconnect_stress_respondent(void)
{
run_test(respondent_open, surveyor_open);
}

void
reconnect_stress_surveyor(void)
{
run_test(surveyor_open, respondent_open);
}

void
reconnect_stress_rep(void)
{
run_test(rep_open, req_open);
}

void
reconnect_stress_req(void)
{
run_test(req_open, rep_open);
}

TEST_LIST = {
{ "reconnect stress respondent", reconnect_stress_respondent },
{ "reconnect stress surveyor", reconnect_stress_surveyor },
{ "reconnect stress rep", reconnect_stress_rep },
{ "reconnect stress req", reconnect_stress_req },
{ NULL, NULL },
};

0 comments on commit af7bc15

Please sign in to comment.