diff --git a/src/core/pipe.c b/src/core/pipe.c index f13f874be..279a22f53 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -11,8 +11,10 @@ #include +#include "core/defs.h" #include "core/nng_impl.h" +#include "core/pipe.h" #include "dialer.h" #include "listener.h" #include "sockimpl.h" @@ -41,7 +43,9 @@ pipe_destroy(void *arg) { nni_pipe *p = arg; - p->p_proto_ops.pipe_fini(p->p_proto_data); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); + } if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } @@ -71,7 +75,9 @@ pipe_reap(void *arg) #endif nni_pipe_remove(p); - p->p_proto_ops.pipe_stop(p->p_proto_data); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_stop(p->p_proto_data); + } if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) { p->p_tran_ops.p_stop(p->p_tran_data); } @@ -233,28 +239,40 @@ pipe_stats_init(nni_pipe *p) static int pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv; + void *sock_data = nni_sock_proto_data(sock); + const nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; - p->p_proto_ops = *pops; - p->p_sock = sock; - p->p_cbs = false; + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_cbs = false; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -272,8 +290,14 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) nni_sock_hold(sock); - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || - ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { + if ((rv == 0) && (rv = tops->p_init(tran_data, p)) == 0) { + p->p_tran_data = tran_data; + } + if ((rv == 0) && + (rv = pops->pipe_init(proto_data, p, sock_data)) == 0) { + p->p_proto_data = proto_data; + } + if (rv != 0) { nni_pipe_close(p); nni_pipe_rele(p); return (rv); @@ -308,6 +332,30 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) return (0); } +void * +nni_pipe_tran_data(nni_pipe *p) +{ + return (p->p_tran_data); +} + +int +nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l) +{ + int rv; + nni_pipe *p; + if (d != NULL) { + rv = nni_pipe_create_dialer(&p, d, NULL); + } else if (l != NULL) { + rv = nni_pipe_create_listener(&p, l, NULL); + } else { + rv = NNG_EINVAL; + } + if (rv == 0) { + *datap = nni_pipe_tran_data(p); + } + return (rv); +} + int nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 9e7109de6..6295fac20 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -15,9 +15,10 @@ // OUTSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. +#include "nng/nng.h" + #include "core/defs.h" #include "core/thread.h" -#include "nng/nng.h" #include "sp/transport.h" // AIO @@ -62,5 +63,6 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t); extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); +extern void *nni_pipe_tran_data(nni_pipe *); #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index 4bc32d3f2..c6e880758 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1272,9 +1272,13 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; + if (l->l_tran->tran_pipe->p_size != 0) { + p = tpipe; + } else { + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + nni_mtx_unlock(&s->s_mx); + return; + } } nni_list_append(&l->l_pipes, p); diff --git a/src/sp/transport.h b/src/sp/transport.h index b7134595a..4cc6bdb3d 100644 --- a/src/sp/transport.h +++ b/src/sp/transport.h @@ -12,6 +12,8 @@ #ifndef PROTOCOL_SP_TRANSPORT_H #define PROTOCOL_SP_TRANSPORT_H +#include "core/defs.h" +#include "core/list.h" #include "core/options.h" // Endpoint operations are called by the socket in a @@ -118,9 +120,14 @@ struct nni_sp_listener_ops { // pointers back to socket or even enclosing pipe state, are not // provided.) struct nni_sp_pipe_ops { - // p_init initializes the pipe data structures. The main - // purpose of this is so that the pipe will see the upper - // layer nni_pipe and get a chance to register stats and such. + // p_size specifies the size of the transport private pipe data. + size_t p_size; + + // p_init initializes the transport's pipe data structure. + // The pipe MUST be left in a state that p_fini can be safely + // called on it, even if it does not succeed. (The upper layers + // will call p_fini as part of the cleanup of a failure.) + // This function should not acquire any locks. int (*p_init)(void *, nni_pipe *); // p_fini destroys the pipe. This should clean up all local @@ -134,7 +141,7 @@ struct nni_sp_pipe_ops { // resources with p_fini. void (*p_stop)(void *); - // p_aio_send queues the message for transmit. If this fails, + // p_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it @@ -194,4 +201,6 @@ extern void nni_sp_tran_sys_init(void); extern void nni_sp_tran_sys_fini(void); extern void nni_sp_tran_register(nni_sp_tran *); +extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l); + #endif // PROTOCOL_SP_TRANSPORT_H diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c index 45927d197..c1907e9f7 100644 --- a/src/sp/transport/socket/sockfd.c +++ b/src/sp/transport/socket/sockfd.c @@ -25,48 +25,43 @@ typedef struct sfd_tran_ep sfd_tran_ep; // sfd_tran_pipe wraps an open file descriptor struct sfd_tran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - sfd_tran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio txaio; - nni_aio rxaio; - nni_aio negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct sfd_tran_ep { nni_mtx mtx; uint16_t proto; size_t rcvmax; - bool fini; bool started; bool closed; nng_sockaddr src; - int refcnt; // active pipes nni_aio *useraio; nni_aio connaio; nni_aio timeaio; - nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; nng_stream_listener *listener; + nni_listener *nlistener; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -81,16 +76,6 @@ static void sfd_tran_pipe_nego_cb(void *); static void sfd_tran_ep_fini(void *); static void sfd_tran_pipe_fini(void *); -static nni_reap_list sfd_tran_ep_reap_list = { - .rl_offset = offsetof(sfd_tran_ep, reap), - .rl_func = sfd_tran_ep_fini, -}; - -static nni_reap_list sfd_tran_pipe_reap_list = { - .rl_offset = offsetof(sfd_tran_pipe, reap), - .rl_func = sfd_tran_pipe_fini, -}; - static void sfd_tran_init(void) { @@ -109,12 +94,13 @@ sfd_tran_pipe_close(void *arg) nni_mtx_lock(&p->mtx); p->closed = true; nni_mtx_unlock(&p->mtx); + if (p->conn != NULL) { + nng_stream_close(p->conn); + } nni_aio_close(&p->rxaio); nni_aio_close(&p->txaio); nni_aio_close(&p->negoaio); - - nng_stream_close(p->conn); } static void @@ -132,6 +118,12 @@ sfd_tran_pipe_init(void *arg, nni_pipe *npipe) { sfd_tran_pipe *p = arg; p->npipe = npipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); return (0); } @@ -146,52 +138,17 @@ sfd_tran_pipe_fini(void *arg) if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&sfd_tran_ep_reap_list, ep); - } nni_mtx_unlock(&ep->mtx); } + if (p->conn != NULL) { + nng_stream_free(p->conn); + } nni_aio_fini(&p->rxaio); nni_aio_fini(&p->txaio); nni_aio_fini(&p->negoaio); - nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -sfd_tran_pipe_reap(sfd_tran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&sfd_tran_pipe_reap_list, p); - } -} - -static int -sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) -{ - sfd_tran_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); - nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); - nni_aio_list_init(&p->recvq); - nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); - - *pipep = p; - - return (0); } static void @@ -205,10 +162,9 @@ sfd_tran_ep_match(sfd_tran_ep *ep) return; } nni_list_remove(&ep->waitpipes, p); - nni_list_append(&ep->busypipes, p); ep->useraio = NULL; p->rcvmax = ep->rcvmax; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -282,14 +238,13 @@ sfd_tran_pipe_nego_cb(void *arg) if (rv == NNG_ECLOSED) { rv = NNG_ECONNSHUT; } - nng_stream_close(p->conn); if ((uaio = ep->useraio) != NULL) { ep->useraio = NULL; nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); - sfd_tran_pipe_reap(p); + nni_pipe_close(p->npipe); } static void @@ -621,8 +576,6 @@ sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -659,13 +612,6 @@ sfd_tran_ep_fini(void *arg) { sfd_tran_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); nni_aio_stop(&ep->timeaio); nni_aio_stop(&ep->connaio); nng_stream_listener_free(ep->listener); @@ -689,14 +635,13 @@ sfd_tran_ep_close(void *arg) if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->negopipes, p) { - sfd_tran_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->waitpipes, p) { - sfd_tran_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->busypipes, p) { - sfd_tran_pipe_close(p); + while (((p = nni_list_first(&ep->negopipes)) != NULL) || + ((p = nni_list_first(&ep->waitpipes)) != NULL)) { + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); + nni_pipe_close(p->npipe); + nni_pipe_rele(p->npipe); + nni_mtx_lock(&ep->mtx); } if (ep->useraio != NULL) { nni_aio_finish_error(ep->useraio, NNG_ECLOSED); @@ -731,14 +676,16 @@ sfd_tran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + if ((rv = nni_sp_pipe_alloc((void **) &p, NULL, ep->nlistener)) != 0) { nng_stream_free(conn); goto error; } + p->conn = conn; + p->ep = ep; if (ep->closed) { - sfd_tran_pipe_fini(p); - nng_stream_free(conn); + nni_pipe_close(p->npipe); + nni_pipe_rele(p->npipe); rv = NNG_ECLOSED; goto error; } @@ -771,20 +718,20 @@ sfd_tran_accept_cb(void *arg) } static int -sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +sfd_tran_ep_init(sfd_tran_ep **epp, nni_listener *nlistener) { sfd_tran_ep *ep; - NNI_ARG_UNUSED(url); + nni_sock *sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busypipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); - ep->proto = nni_sock_proto_id(sock); + ep->nlistener = nlistener; + ep->proto = nni_sock_proto_id(sock); #ifdef NNG_ENABLE_STATS static const nni_stat_info rcv_max_info = { @@ -806,7 +753,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { sfd_tran_ep *ep; int rv; - nni_sock *sock = nni_listener_sock(nlistener); // Check for invalid URL components -- we only accept a bare scheme if ((url->u_hostname != NULL) || (strlen(url->u_path) != 0) || @@ -815,7 +761,7 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } - if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + if ((rv = sfd_tran_ep_init(&ep, nlistener)) != 0) { return (rv); } @@ -920,6 +866,7 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_size = sizeof(sfd_tran_pipe), .p_init = sfd_tran_pipe_init, .p_fini = sfd_tran_pipe_fini, .p_stop = sfd_tran_pipe_stop, diff --git a/src/sp/transport/socket/sockfd_test.c b/src/sp/transport/socket/sockfd_test.c index de1f582b0..278f6c685 100644 --- a/src/sp/transport/socket/sockfd_test.c +++ b/src/sp/transport/socket/sockfd_test.c @@ -157,8 +157,8 @@ void test_sfd_recv_max(void) { #ifdef NNG_HAVE_SOCKETPAIR - char msg[256]; - char buf[256]; + char msg[256] = { 0 }; + char buf[256] = { 0 }; nng_socket s0; nng_socket s1; nng_listener l0;