diff --git a/src/core/pipe.c b/src/core/pipe.c index f13f874be..65531e4f6 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -11,8 +11,10 @@ #include +#include "core/defs.h" #include "core/nng_impl.h" +#include "core/pipe.h" #include "dialer.h" #include "listener.h" #include "sockimpl.h" @@ -233,28 +235,42 @@ pipe_stats_init(nni_pipe *p) static int pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv; + void *sock_data = nni_sock_proto_data(sock); + const nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; + p->p_proto_data = proto_data; p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_tran_data = tran_data; p->p_sock = sock; p->p_cbs = false; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -272,7 +288,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) nni_sock_hold(sock); - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || + if ((rv != 0) || ((rv = tops->p_init(tran_data, p)) != 0) || ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { nni_pipe_close(p); nni_pipe_rele(p); @@ -308,6 +324,30 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) return (0); } +void * +nni_pipe_tran_data(nni_pipe *p) +{ + return (p->p_tran_data); +} + +int +nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l) +{ + int rv; + nni_pipe *p; + if (d != NULL) { + rv = nni_pipe_create_dialer(&p, d, NULL); + } else if (l != NULL) { + rv = nni_pipe_create_listener(&p, l, NULL); + } else { + rv = NNG_EINVAL; + } + if (rv == 0) { + *datap = nni_pipe_tran_data(p); + } + return (rv); +} + int nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 9e7109de6..6295fac20 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -15,9 +15,10 @@ // OUTSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. +#include "nng/nng.h" + #include "core/defs.h" #include "core/thread.h" -#include "nng/nng.h" #include "sp/transport.h" // AIO @@ -62,5 +63,6 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t); extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); +extern void *nni_pipe_tran_data(nni_pipe *); #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index 4bc32d3f2..c6e880758 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1272,9 +1272,13 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; + if (l->l_tran->tran_pipe->p_size != 0) { + p = tpipe; + } else { + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + nni_mtx_unlock(&s->s_mx); + return; + } } nni_list_append(&l->l_pipes, p); diff --git a/src/sp/transport.h b/src/sp/transport.h index b7134595a..4cc6bdb3d 100644 --- a/src/sp/transport.h +++ b/src/sp/transport.h @@ -12,6 +12,8 @@ #ifndef PROTOCOL_SP_TRANSPORT_H #define PROTOCOL_SP_TRANSPORT_H +#include "core/defs.h" +#include "core/list.h" #include "core/options.h" // Endpoint operations are called by the socket in a @@ -118,9 +120,14 @@ struct nni_sp_listener_ops { // pointers back to socket or even enclosing pipe state, are not // provided.) struct nni_sp_pipe_ops { - // p_init initializes the pipe data structures. The main - // purpose of this is so that the pipe will see the upper - // layer nni_pipe and get a chance to register stats and such. + // p_size specifies the size of the transport private pipe data. + size_t p_size; + + // p_init initializes the transport's pipe data structure. + // The pipe MUST be left in a state that p_fini can be safely + // called on it, even if it does not succeed. (The upper layers + // will call p_fini as part of the cleanup of a failure.) + // This function should not acquire any locks. int (*p_init)(void *, nni_pipe *); // p_fini destroys the pipe. This should clean up all local @@ -134,7 +141,7 @@ struct nni_sp_pipe_ops { // resources with p_fini. void (*p_stop)(void *); - // p_aio_send queues the message for transmit. If this fails, + // p_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it @@ -194,4 +201,6 @@ extern void nni_sp_tran_sys_init(void); extern void nni_sp_tran_sys_fini(void); extern void nni_sp_tran_register(nni_sp_tran *); +extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l); + #endif // PROTOCOL_SP_TRANSPORT_H diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c index 45927d197..9c633db04 100644 --- a/src/sp/transport/socket/sockfd.c +++ b/src/sp/transport/socket/sockfd.c @@ -25,48 +25,44 @@ typedef struct sfd_tran_ep sfd_tran_ep; // sfd_tran_pipe wraps an open file descriptor struct sfd_tran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - sfd_tran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio txaio; - nni_aio rxaio; - nni_aio negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct sfd_tran_ep { nni_mtx mtx; uint16_t proto; size_t rcvmax; - bool fini; bool started; bool closed; nng_sockaddr src; - int refcnt; // active pipes nni_aio *useraio; nni_aio connaio; nni_aio timeaio; nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; nng_stream_listener *listener; + nni_listener *nlistener; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -81,16 +77,6 @@ static void sfd_tran_pipe_nego_cb(void *); static void sfd_tran_ep_fini(void *); static void sfd_tran_pipe_fini(void *); -static nni_reap_list sfd_tran_ep_reap_list = { - .rl_offset = offsetof(sfd_tran_ep, reap), - .rl_func = sfd_tran_ep_fini, -}; - -static nni_reap_list sfd_tran_pipe_reap_list = { - .rl_offset = offsetof(sfd_tran_pipe, reap), - .rl_func = sfd_tran_pipe_fini, -}; - static void sfd_tran_init(void) { @@ -132,6 +118,12 @@ sfd_tran_pipe_init(void *arg, nni_pipe *npipe) { sfd_tran_pipe *p = arg; p->npipe = npipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); return (0); } @@ -146,10 +138,6 @@ sfd_tran_pipe_fini(void *arg) if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&sfd_tran_ep_reap_list, ep); - } nni_mtx_unlock(&ep->mtx); } @@ -159,39 +147,6 @@ sfd_tran_pipe_fini(void *arg) nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -sfd_tran_pipe_reap(sfd_tran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&sfd_tran_pipe_reap_list, p); - } -} - -static int -sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) -{ - sfd_tran_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); - nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); - nni_aio_list_init(&p->recvq); - nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); - - *pipep = p; - - return (0); } static void @@ -208,7 +163,7 @@ sfd_tran_ep_match(sfd_tran_ep *ep) nni_list_append(&ep->busypipes, p); ep->useraio = NULL; p->rcvmax = ep->rcvmax; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -289,7 +244,7 @@ sfd_tran_pipe_nego_cb(void *arg) nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); - sfd_tran_pipe_reap(p); + nni_pipe_close(p->npipe); } static void @@ -621,8 +576,6 @@ sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -659,13 +612,6 @@ sfd_tran_ep_fini(void *arg) { sfd_tran_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); nni_aio_stop(&ep->timeaio); nni_aio_stop(&ep->connaio); nng_stream_listener_free(ep->listener); @@ -731,14 +677,15 @@ sfd_tran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + if ((rv = nni_sp_pipe_alloc((void **) &p, NULL, ep->nlistener)) != 0) { nng_stream_free(conn); goto error; } + p->conn = conn; + p->ep = ep; if (ep->closed) { - sfd_tran_pipe_fini(p); - nng_stream_free(conn); + nni_pipe_close(p->npipe); rv = NNG_ECLOSED; goto error; } @@ -771,10 +718,10 @@ sfd_tran_accept_cb(void *arg) } static int -sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +sfd_tran_ep_init(sfd_tran_ep **epp, nni_listener *nlistener) { sfd_tran_ep *ep; - NNI_ARG_UNUSED(url); + nni_sock *sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -784,7 +731,8 @@ sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); - ep->proto = nni_sock_proto_id(sock); + ep->nlistener = nlistener; + ep->proto = nni_sock_proto_id(sock); #ifdef NNG_ENABLE_STATS static const nni_stat_info rcv_max_info = { @@ -806,7 +754,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { sfd_tran_ep *ep; int rv; - nni_sock *sock = nni_listener_sock(nlistener); // Check for invalid URL components -- we only accept a bare scheme if ((url->u_hostname != NULL) || (strlen(url->u_path) != 0) || @@ -815,7 +762,7 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } - if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + if ((rv = sfd_tran_ep_init(&ep, nlistener)) != 0) { return (rv); } @@ -920,6 +867,7 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_size = sizeof(sfd_tran_pipe), .p_init = sfd_tran_pipe_init, .p_fini = sfd_tran_pipe_fini, .p_stop = sfd_tran_pipe_stop,