diff --git a/cmake/NNGHelpers.cmake b/cmake/NNGHelpers.cmake index b2daa1dc2..f2a5723e4 100644 --- a/cmake/NNGHelpers.cmake +++ b/cmake/NNGHelpers.cmake @@ -117,6 +117,19 @@ function(nng_test NAME) endif () endfunction() +function(nng_test_quiet NAME) + if (NNG_TESTS) + add_executable(${NAME} ${NAME}.c ${ARGN}) + target_link_libraries(${NAME} nng_testing) + target_include_directories(${NAME} PRIVATE + ${PROJECT_SOURCE_DIR}/tests + ${PROJECT_SOURCE_DIR}/src + ${PROJECT_SOURCE_DIR}/include) + add_test(NAME ${NNG_TEST_PREFIX}.${NAME} COMMAND ${NAME} -t) + set_tests_properties(${NNG_TEST_PREFIX}.${NAME} PROPERTIES TIMEOUT 180) + endif () +endfunction() + function(nng_test_if COND NAME) if (${COND} AND NNG_TESTS) add_executable(${NAME} ${NAME}.c ${ARGN}) diff --git a/src/sp/CMakeLists.txt b/src/sp/CMakeLists.txt index ef79e674d..7cc361c2d 100644 --- a/src/sp/CMakeLists.txt +++ b/src/sp/CMakeLists.txt @@ -23,3 +23,4 @@ nng_test(device_test) nng_test(multistress_test) nng_test(nonblock_test) nng_test(scalability_test) +nng_test_quiet(reconnect_stress_test) diff --git a/src/sp/protocol/survey0/respond.c b/src/sp/protocol/survey0/respond.c index ad0732c12..ffa6a2dd9 100644 --- a/src/sp/protocol/survey0/respond.c +++ b/src/sp/protocol/survey0/respond.c @@ -334,6 +334,10 @@ resp0_pipe_close(void *arg) nni_mtx_lock(&s->mtx); p->closed = true; + if (nni_list_active(&s->recvpipes, p)) { + // We are no longer "receivable". + nni_list_remove(&s->recvpipes, p); + } while ((ctx = nni_list_first(&p->sendq)) != NULL) { nni_aio *aio; nni_msg *msg; diff --git a/src/sp/reconnect_stress_test.c b/src/sp/reconnect_stress_test.c new file mode 100644 index 000000000..3b221af84 --- /dev/null +++ b/src/sp/reconnect_stress_test.c @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include +#include + +#include + +enum state { + SEND, + RECV, +}; + +struct work; +typedef void (*work_fn)(struct work *); + +struct work { + work_fn start; + nng_socket socket; + nng_aio * aio; + enum state state; + nni_atomic_int received; +}; + +void +work_send(struct work *w, void *data, size_t size) +{ + nng_msg *msg; + + w->state = SEND; + NUTS_PASS(nng_msg_alloc(&msg, 0)); + NUTS_PASS(nng_msg_append(msg, data, size)); + nng_aio_set_msg(w->aio, msg); + nng_send_aio(w->socket, w->aio); +} + +void +work_listen(struct work *w, const char *url) +{ + NUTS_PASS(nng_listen(w->socket, url, NULL, 0)); +} + +void +work_dial(struct work *w, const char * const * urls, size_t urls_size) +{ + size_t i; + + for (i = 0; i < urls_size; ++i) + NUTS_PASS(nng_dial(w->socket, urls[i], NULL, 0)); +} + +void +close_work(struct work *w) +{ + nng_close(w->socket); + nng_aio_free(w->aio); +} + +void +ping_start(struct work *w) +{ + work_send(w, "ping", 5); +} + +void +ping_cb(void *arg) +{ + nng_msg *msg; + struct work *w = arg; + int result = nng_aio_result(w->aio); + + if (result) + switch (result) { + case NNG_ETIMEDOUT: + case NNG_ESTATE: + ping_start(w); + return; + case NNG_ECANCELED: + case NNG_ECLOSED: + return; + default: + NUTS_PASS(result); + abort(); + } + + switch (w->state) { + case SEND: + w->state = RECV; + nng_recv_aio(w->socket, w->aio); + break; + case RECV: + msg = nng_aio_get_msg(w->aio); + TEST_ASSERT(msg); + TEST_ASSERT(nng_msg_len(msg) == 5); + TEST_ASSERT(0 == strncmp(nng_msg_body(msg), "echo", 4)); + nng_msg_free(msg); + nni_atomic_inc(&w->received); + ping_start(w); + break; + } +} + +void +echo_start(struct work *w) +{ + w->state = RECV; + nng_recv_aio(w->socket, w->aio); +} + +void +echo_cb(void *arg) +{ + nng_msg *msg; + struct work *w = arg; + int result = nng_aio_result(w->aio); + + if (result) + switch (result) { + case NNG_ECANCELED: + case NNG_ECLOSED: + return; + default: + NUTS_PASS(result); + abort(); + } + + switch (w->state) { + case RECV: + msg = nng_aio_get_msg(w->aio); + TEST_ASSERT(msg); + TEST_ASSERT(nng_msg_len(msg) == 5); + TEST_ASSERT(0 == strncmp(nng_msg_body(msg), "ping", 4)); + nng_msg_free(msg); + nni_atomic_inc(&w->received); + work_send(w, "echo", 5); + break; + case SEND: + echo_start(w); + break; + } +} + +#define CLIENTS_COUNT 64 +#define SERVICES_COUNT 8 +#define CLIENT_RX_COUNT 100 +#define TEST_DURATION_MS 3000 +#define SURVEY_TIMEOUT_MS 100 + +void +surveyor_open(struct work *w) +{ + w->start = ping_start; + NUTS_PASS(nng_surveyor_open(&w->socket)); + NUTS_PASS(nng_socket_set_ms(w->socket, NNG_OPT_SURVEYOR_SURVEYTIME, SURVEY_TIMEOUT_MS)); + NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w)); + nni_atomic_init(&w->received); +} + +void +respondent_open(struct work *w) +{ + w->start = echo_start; + NUTS_PASS(nng_respondent_open(&w->socket)); + NUTS_PASS(nng_aio_alloc(&w->aio, echo_cb, w)); + nni_atomic_init(&w->received); +} + +void +req_open(struct work *w) +{ + w->start = ping_start; + NUTS_PASS(nng_req_open(&w->socket)); + NUTS_PASS(nng_aio_alloc(&w->aio, ping_cb, w)); + nni_atomic_init(&w->received); +} + +void +rep_open(struct work *w) +{ + w->start = echo_start; + NUTS_PASS(nng_rep_open(&w->socket)); + NUTS_PASS(nng_aio_alloc(&w->aio, echo_cb, w)); + nni_atomic_init(&w->received); +} + +void +run_test(work_fn open_service, work_fn open_client) +{ + int i; + nng_time stop_time; + struct work * service; + struct work * client; + struct work services[SERVICES_COUNT]; + struct work clients [CLIENTS_COUNT]; + + const char * service_urls[SERVICES_COUNT] = { + "inproc://stressA", + "inproc://stressB", + "inproc://stressC", + "inproc://stressD", + "inproc://stressE", + "inproc://stressF", + "inproc://stressG", + "inproc://stressH", + }; + + for (i = 0; i < SERVICES_COUNT; i++) { + service = &services[i]; + (*open_service)(service); + work_listen(service, service_urls[i]); + (*service->start)(service); + } + + for (i = 0; i < CLIENTS_COUNT; i++) { + client = &clients[i]; + (*open_client)(client); + work_dial(client, service_urls, SERVICES_COUNT); + (*client->start)(client); + } + + stop_time = nng_clock() + TEST_DURATION_MS; + while (nng_clock() < stop_time) { + client = &clients[nng_random() % CLIENTS_COUNT]; + while (nni_atomic_get(&client->received) < CLIENT_RX_COUNT) + nng_msleep(1); + close_work(client); + (*open_client)(client); + work_dial(client, service_urls, SERVICES_COUNT); + (*client->start)(client); + } + + for (i = 0; i < CLIENTS_COUNT; i++) + close_work(&clients[i]); + for (i = 0; i < SERVICES_COUNT; i++) + close_work(&services[i]); +} + +void +reconnect_stress_respondent(void) +{ + run_test(respondent_open, surveyor_open); +} + +void +reconnect_stress_surveyor(void) +{ + run_test(surveyor_open, respondent_open); +} + +void +reconnect_stress_rep(void) +{ + run_test(rep_open, req_open); +} + +void +reconnect_stress_req(void) +{ + run_test(req_open, rep_open); +} + +TEST_LIST = { + { "reconnect stress respondent", reconnect_stress_respondent }, + { "reconnect stress surveyor", reconnect_stress_surveyor }, + { "reconnect stress rep", reconnect_stress_rep }, + { "reconnect stress req", reconnect_stress_req }, + { NULL, NULL }, +};