diff --git a/src/platform/tcp_stream_test.c b/src/platform/tcp_stream_test.c index c1353e8a9..b46d2d1c6 100644 --- a/src/platform/tcp_stream_test.c +++ b/src/platform/tcp_stream_test.c @@ -130,7 +130,29 @@ test_tcp_stream(void) nng_stream_free(c2); } +void +test_tcp_listen_accept_cancel(void) +{ + nng_stream_listener *l; + char *addr; + nng_aio *aio; + + nng_log_set_logger(nng_stderr_logger); + NUTS_ADDR(addr, "tcp"); + NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); + + // start a listening stream listener but do not call accept + NUTS_PASS(nng_stream_listener_alloc(&l, addr)); + NUTS_PASS(nng_stream_listener_listen(l)); + nng_stream_listener_accept(l, aio); + nng_msleep(100); + nng_aio_free(aio); + nng_stream_listener_close(l); + nng_stream_listener_free(l); +} + NUTS_TESTS = { { "tcp stream", test_tcp_stream }, + { "tcp stream listen accept cancel", test_tcp_listen_accept_cancel }, { NULL, NULL }, }; diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index 815f9bf33..f55567190 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -37,11 +37,10 @@ win_io_handler(void *arg) int rv; ok = GetQueuedCompletionStatus( - win_io_h, &cnt, &key, &olpd, INFINITE); + win_io_h, &cnt, &key, &olpd, 5000); if (olpd == NULL) { // Completion port closed... - NNI_ASSERT(ok == FALSE); break; } @@ -124,12 +123,16 @@ nni_win_io_sysfini(void) HANDLE h; if ((h = win_io_h) != NULL) { + // send wakeups in case closing the handle doesn't work + for (i = 0; i < win_io_nthr; i++) { + PostQueuedCompletionStatus(h, 0, 0, NULL); + } CloseHandle(h); + for (i = 0; i < win_io_nthr; i++) { + nni_thr_fini(&win_io_thrs[i]); + } win_io_h = NULL; } - for (i = 0; i < win_io_nthr; i++) { - nni_thr_fini(&win_io_thrs[i]); - } NNI_FREE_STRUCTS(win_io_thrs, win_io_nthr); } diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index ef6ceb66d..5f540a6c6 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -9,6 +9,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "win_ipc.h" @@ -32,11 +33,27 @@ typedef struct ipc_conn { bool closed; bool sending; bool recving; + bool recv_fail; + bool send_fail; nni_mtx mtx; nni_cv cv; nni_reap_node reap; } ipc_conn; +static void +ipc_recv_fail(ipc_conn *c, int rv) +{ + nni_aio *aio; + c->recving = false; + c->recv_fail = true; + c->recv_rv = rv; + while ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_cv_wake(&c->cv); +} + static void ipc_recv_start(ipc_conn *c) { @@ -48,48 +65,45 @@ ipc_recv_start(ipc_conn *c) DWORD len; int rv; - while ((aio = nni_list_first(&c->recv_aios)) != NULL) { - if (c->closed) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - continue; - } + if ((aio = nni_list_first(&c->recv_aios)) == NULL) { + nni_cv_wake(&c->cv); + return; + } - nni_aio_get_iov(aio, &naiov, &aiov); + if (c->closed) { + ipc_recv_fail(c, NNG_ECLOSED); + return; + } - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes - // do not appear to support scatter/gather, so we have to - // process each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + nni_aio_get_iov(aio, &naiov, &aiov); - c->recving = true; - if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - c->recving = false; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - } else { - return; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + c->recving = true; + if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + ipc_recv_fail(c, nni_win_error(rv)); } - nni_cv_wake(&c->cv); } static void @@ -97,6 +111,7 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) { nni_aio *aio; ipc_conn *c = io->ptr; + nni_mtx_lock(&c->mtx); aio = nni_list_first(&c->recv_aios); NNI_ASSERT(aio != NULL); @@ -109,11 +124,17 @@ ipc_recv_cb(nni_win_io *io, int rv, size_t num) rv = NNG_ECONNSHUT; } c->recving = false; + if (rv != 0) { + ipc_recv_fail(c, nni_win_error(rv)); + nni_mtx_unlock(&c->mtx); + return; + } nni_aio_list_remove(aio); ipc_recv_start(c); nni_mtx_unlock(&c->mtx); - nni_aio_finish_sync(aio, rv, num); + // nni_aio_finish_sync(aio, rv, num); + nni_aio_finish(aio, rv, num); } static void @@ -153,6 +174,12 @@ ipc_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if (c->recv_fail) { + rv = c->recv_rv; + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); @@ -165,6 +192,21 @@ ipc_recv(void *arg, nni_aio *aio) nni_mtx_unlock(&c->mtx); } +static void +ipc_send_fail(ipc_conn *c, int rv) +{ + nni_aio *aio; + + c->sending = false; + c->send_fail = true; + c->send_rv = rv; + while ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_cv_wake(&c->cv); +} + static void ipc_send_start(ipc_conn *c) { @@ -176,43 +218,45 @@ ipc_send_start(ipc_conn *c) DWORD len; int rv; - while ((aio = nni_list_first(&c->send_aios)) != NULL) { + if ((aio = nni_list_first(&c->send_aios)) == NULL) { + nni_cv_wake(&c->cv); + return; + } - nni_aio_get_iov(aio, &naiov, &aiov); + if (c->closed) { + ipc_send_fail(c, NNG_ECLOSED); + return; + } - idx = 0; - while ((idx < naiov) && (aiov[idx].iov_len == 0)) { - idx++; - } - NNI_ASSERT(idx < naiov); - // Now start a transfer. We assume that only one send can be - // outstanding on a pipe at a time. This is important to avoid - // scrambling the data anyway. Note that Windows named pipes - // do not appear to support scatter/gather, so we have to - // process each element in turn. - buf = aiov[idx].iov_buf; - len = (DWORD) aiov[idx].iov_len; - NNI_ASSERT(buf != NULL); - NNI_ASSERT(len != 0); - - // We limit ourselves to writing 16MB at a time. Named Pipes - // on Windows have limits of between 31 and 64MB. - if (len > 0x1000000) { - len = 0x1000000; - } + nni_aio_get_iov(aio, &naiov, &aiov); - c->sending = true; - if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && - ((rv = GetLastError()) != ERROR_IO_PENDING)) { - // Synchronous failure. - c->sending = false; - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, nni_win_error(rv)); - } else { - return; - } + idx = 0; + while ((idx < naiov) && (aiov[idx].iov_len == 0)) { + idx++; + } + NNI_ASSERT(idx < naiov); + // Now start a transfer. We assume that only one send can be + // outstanding on a pipe at a time. This is important to avoid + // scrambling the data anyway. Note that Windows named pipes + // do not appear to support scatter/gather, so we have to + // process each element in turn. + buf = aiov[idx].iov_buf; + len = (DWORD) aiov[idx].iov_len; + NNI_ASSERT(buf != NULL); + NNI_ASSERT(len != 0); + + // We limit ourselves to writing 16MB at a time. Named Pipes + // on Windows have limits of between 31 and 64MB. + if (len > 0x1000000) { + len = 0x1000000; + } + + c->sending = true; + if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) && + ((rv = GetLastError()) != ERROR_IO_PENDING)) { + // Synchronous failure. + ipc_send_fail(c, nni_win_error(rv)); } - nni_cv_wake(&c->cv); } static void @@ -284,6 +328,8 @@ ipc_close(void *arg) { ipc_conn *c = arg; nni_time now; + nni_aio *aio; + nni_mtx_lock(&c->mtx); if (!c->closed) { HANDLE f = c->f; @@ -294,58 +340,53 @@ ipc_close(void *arg) if (f != INVALID_HANDLE_VALUE) { CancelIoEx(f, &c->send_io.olpd); CancelIoEx(f, &c->recv_io.olpd); - DisconnectNamedPipe(f); - CloseHandle(f); } } - now = nni_clock(); - // wait up to a maximum of 10 seconds before assuming something is - // badly amiss. from what we can tell, this doesn't happen, and we do - // see the timer expire properly, but this safeguard can prevent a - // hang. - while ((c->recving || c->sending) && - ((nni_clock() - now) < (NNI_SECOND * 10))) { - nni_mtx_unlock(&c->mtx); - nni_msleep(1); - nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->send_aios)) != NULL) { + nni_aio_abort(aio, NNG_ECLOSED); + } + if ((aio = nni_list_first(&c->recv_aios)) != NULL) { + nni_aio_abort(aio, NNG_ECLOSED); } nni_mtx_unlock(&c->mtx); } static void -ipc_conn_reap(void *arg) +ipc_free(void *arg) { ipc_conn *c = arg; + nni_aio *aio; + HANDLE f = c->f; + int loop = 0; nni_mtx_lock(&c->mtx); - while ((!nni_list_empty(&c->recv_aios)) || - (!nni_list_empty(&c->send_aios))) { - nni_cv_wait(&c->cv); + // time for callbacks to fire/drain. + nni_time when = nng_clock() + 5000; + while (c->sending || c->recving) { + if (nni_cv_until(&c->cv, when) == NNG_ETIMEDOUT) { + nng_log_err("NNG-WIN-IPC", + "Timeout waiting for operations to cancel"); + break; + } } + // These asserts are for debug, we should never see it. + // If we do then something bad happened. + NNI_ASSERT(!c->sending); + NNI_ASSERT(!c->recving); + NNI_ASSERT(nni_list_empty(&c->recv_aios)); + NNI_ASSERT(nni_list_empty(&c->send_aios)); nni_mtx_unlock(&c->mtx); - if (c->f != INVALID_HANDLE_VALUE) { - CloseHandle(c->f); + if (f != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(f); + CloseHandle(f); } + nni_cv_fini(&c->cv); nni_mtx_fini(&c->mtx); NNI_FREE_STRUCT(c); } -static nni_reap_list ipc_reap_list = { - .rl_offset = offsetof(ipc_conn, reap), - .rl_func = ipc_conn_reap, -}; - -static void -ipc_free(void *arg) -{ - ipc_conn *c = arg; - ipc_close(c); - - nni_reap(&ipc_reap_list, c); -} - static int ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t) { diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index b7b7b5260..98cb82732 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -20,11 +20,11 @@ typedef struct { char *path; bool started; bool closed; + bool accepting; HANDLE f; SECURITY_ATTRIBUTES sec_attr; nni_list aios; nni_mtx mtx; - nni_cv cv; nni_win_io io; nni_sockaddr sa; int rv; @@ -39,7 +39,6 @@ ipc_accept_done(ipc_listener *l, int rv) aio = nni_list_first(&l->aios); nni_list_remove(&l->aios, aio); - nni_cv_wake(&l->cv); if (l->closed) { rv = NNG_ECLOSED; @@ -86,6 +85,7 @@ ipc_accept_start(ipc_listener *l) { nni_aio *aio; + NNI_ASSERT(!l->accepting); while ((aio = nni_list_first(&l->aios)) != NULL) { int rv; @@ -97,6 +97,7 @@ ipc_accept_start(ipc_listener *l) rv = 0; } else if ((rv = GetLastError()) == ERROR_IO_PENDING) { // asynchronous completion pending + l->accepting = true; return; } else if (rv == ERROR_PIPE_CONNECTED) { rv = 0; @@ -104,8 +105,6 @@ ipc_accept_start(ipc_listener *l) // synchronous completion ipc_accept_done(l, rv); } - - nni_cv_wake(&l->cv); } static void @@ -116,9 +115,18 @@ ipc_accept_cb(nni_win_io *io, int rv, size_t cnt) NNI_ARG_UNUSED(cnt); nni_mtx_lock(&l->mtx); - if (nni_list_empty(&l->aios)) { - // We canceled this somehow. We no longer care. - DisconnectNamedPipe(l->f); + l->accepting = false; + if (l->closed) { + // We're shutting down, and the handle is probably closed. + // We should not have gotten anything here. + nni_mtx_unlock(&l->mtx); + return; + } + if (nni_list_empty(&l->aios) && l->rv == 0) { + // We canceled, and nobody waiting. + // But... we'll probably have another caller do + // accept momentarily, so we leave this and it will be + // ERROR_PIPE_CONNECTED later. nni_mtx_unlock(&l->mtx); return; } @@ -242,12 +250,8 @@ ipc_accept_cancel(nni_aio *aio, void *arg, int rv) ipc_listener *l = arg; nni_mtx_unlock(&l->mtx); - if (aio == nni_list_first(&l->aios)) { - l->rv = rv; - CancelIoEx(l->f, &l->io.olpd); - } else if (nni_aio_list_active(aio)) { - nni_list_remove(&l->aios, aio); - nni_cv_wake(&l->cv); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&l->mtx); @@ -282,29 +286,55 @@ static void ipc_listener_close(void *arg) { ipc_listener *l = arg; + nni_aio *aio; + int rv; + DWORD nb; + nni_mtx_lock(&l->mtx); - if (!l->closed) { - l->closed = true; - if (!nni_list_empty(&l->aios)) { - CancelIoEx(l->f, &l->io.olpd); - } - DisconnectNamedPipe(l->f); - CloseHandle(l->f); + if (l->closed) { + nni_mtx_unlock(&l->mtx); + return; } + l->closed = true; + while ((aio = nni_list_first(&l->aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + bool accepting = l->accepting; nni_mtx_unlock(&l->mtx); + + // This craziness because CancelIoEx on ConnectNamedPipe + // seems to be incredibly unreliable. It does work, sometimes, + // but often it doesn't. This entire named pipe business needs + // to be retired in favor of UNIX domain sockets anyway. + + while (accepting) { + if (!CancelIoEx(l->f, &l->io.olpd)) { + // operation not found probably + // We just inject a safety sleep to + // let it drain and give the callback + // a chance to fire (although it should + // already have done so.) + DisconnectNamedPipe(l->f); + CloseHandle(l->f); + nng_msleep(500); + return; + } + nng_msleep(100); + nni_mtx_lock(&l->mtx); + accepting = l->accepting; + nni_mtx_unlock(&l->mtx); + } + DisconnectNamedPipe(l->f); + CloseHandle(l->f); } static void ipc_listener_free(void *arg) { ipc_listener *l = arg; - nni_mtx_lock(&l->mtx); - while (!nni_list_empty(&l->aios)) { - nni_cv_wait(&l->cv); - } - nni_mtx_unlock(&l->mtx); + nni_strfree(l->path); - nni_cv_fini(&l->cv); nni_mtx_fini(&l->mtx); NNI_FREE_STRUCT(l); } @@ -339,7 +369,6 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) snprintf(l->sa.s_ipc.sa_path, NNG_MAXADDRLEN, "%s", url->u_path); nni_aio_list_init(&l->aios); nni_mtx_init(&l->mtx); - nni_cv_init(&l->cv, &l->mtx); *lp = (void *) l; return (0); } diff --git a/src/sp/transport/ipc/ipc_test.c b/src/sp/transport/ipc/ipc_test.c index 51eb975da..509d87226 100644 --- a/src/sp/transport/ipc/ipc_test.c +++ b/src/sp/transport/ipc/ipc_test.c @@ -152,7 +152,6 @@ test_ipc_ping_pong(void) nng_socket s1; char *addr; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); NUTS_OPEN(s1); @@ -245,7 +244,6 @@ test_ipc_recv_max(void) size_t sz; char *addr; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); @@ -275,7 +273,6 @@ test_ipc_connect_refused(void) nng_dialer d; char *addr; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); @@ -292,7 +289,6 @@ test_ipc_connect_blocking(void) nng_stream_listener *l; char *addr; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); @@ -315,7 +311,6 @@ test_ipc_connect_blocking_accept(void) char *addr; nng_aio *aio; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); @@ -344,7 +339,6 @@ test_ipc_listen_accept_cancel(void) char *addr; nng_aio *aio; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -364,7 +358,6 @@ test_ipc_listen_duplicate(void) nng_socket s0; char *addr; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0); @@ -384,7 +377,6 @@ test_ipc_listener_clean_stale(void) char *path; char renamed[256]; - NUTS_ENABLE_LOG(NNG_LOG_INFO); NUTS_ADDR(addr, "ipc"); NUTS_OPEN(s0);