From 2e14f13af90a5296e421beb3ede083c7f9060bdc Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 2 Dec 2024 07:44:48 -0500 Subject: [PATCH] pipe: allocate and destroy pipe transport data with common pipe data This updates the pipe to use contiguous data for the transport data as well as the pipe protocol data. It updates sockfd to use this, and eliminates the need for the sockfd transport to do its own asynchronous reaping, thereby hopefully closing a shutdown race. The other transports will shortly get the same treatment. --- src/core/pipe.c | 62 ++++++++++++--- src/core/pipe.h | 4 +- src/core/socket.c | 10 ++- src/sp/transport.h | 17 +++- src/sp/transport/socket/sockfd.c | 132 ++++++++++--------------------- 5 files changed, 114 insertions(+), 111 deletions(-) diff --git a/src/core/pipe.c b/src/core/pipe.c index f13f874be..65531e4f6 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -11,8 +11,10 @@ #include +#include "core/defs.h" #include "core/nng_impl.h" +#include "core/pipe.h" #include "dialer.h" #include "listener.h" #include "sockimpl.h" @@ -233,28 +235,42 @@ pipe_stats_init(nni_pipe *p) static int pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) { - nni_pipe *p; - int rv; - void *sock_data = nni_sock_proto_data(sock); - nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); - size_t sz; + nni_pipe *p; + int rv; + void *sock_data = nni_sock_proto_data(sock); + const nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock); + const nni_sp_pipe_ops *tops = tran->tran_pipe; + size_t sz; - sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size; + sz = NNI_ALIGN_UP(sizeof(*p)) + NNI_ALIGN_UP(pops->pipe_size) + + NNI_ALIGN_UP(tops->p_size); if ((p = nni_zalloc(sz)) == NULL) { // In this case we just toss the pipe... - tran->tran_pipe->p_fini(tran_data); + // TODO: remove when all transports converted + // to use p_size. + if (tran_data != NULL) { + tops->p_fini(tran_data); + } return (NNG_ENOMEM); } + uint8_t *proto_data = (uint8_t *) p + NNI_ALIGN_UP(sizeof(*p)); + + if (tran_data == NULL) { + tran_data = proto_data + NNI_ALIGN_UP(pops->pipe_size); + } + p->p_size = sz; - p->p_proto_data = p + 1; - p->p_tran_ops = *tran->tran_pipe; - p->p_tran_data = tran_data; + p->p_proto_data = proto_data; p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_tran_data = tran_data; p->p_sock = sock; p->p_cbs = false; + // Two references - one for our caller, and + // one to be dropped when the pipe is closed. nni_refcnt_init(&p->p_refcnt, 2, p, pipe_destroy); nni_atomic_init_bool(&p->p_closed); @@ -272,7 +288,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) nni_sock_hold(sock); - if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tran_data, p)) != 0) || + if ((rv != 0) || ((rv = tops->p_init(tran_data, p)) != 0) || ((rv = pops->pipe_init(p->p_proto_data, p, sock_data)) != 0)) { nni_pipe_close(p); nni_pipe_rele(p); @@ -308,6 +324,30 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) return (0); } +void * +nni_pipe_tran_data(nni_pipe *p) +{ + return (p->p_tran_data); +} + +int +nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l) +{ + int rv; + nni_pipe *p; + if (d != NULL) { + rv = nni_pipe_create_dialer(&p, d, NULL); + } else if (l != NULL) { + rv = nni_pipe_create_listener(&p, l, NULL); + } else { + rv = NNG_EINVAL; + } + if (rv == 0) { + *datap = nni_pipe_tran_data(p); + } + return (rv); +} + int nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 9e7109de6..6295fac20 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -15,9 +15,10 @@ // OUTSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. +#include "nng/nng.h" + #include "core/defs.h" #include "core/thread.h" -#include "nng/nng.h" #include "sp/transport.h" // AIO @@ -62,5 +63,6 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t); extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); +extern void *nni_pipe_tran_data(nni_pipe *); #endif // CORE_PIPE_H diff --git a/src/core/socket.c b/src/core/socket.c index 4bc32d3f2..c6e880758 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -1272,9 +1272,13 @@ nni_listener_add_pipe(nni_listener *l, void *tpipe) nni_pipe *p; nni_mtx_lock(&s->s_mx); - if (nni_pipe_create_listener(&p, l, tpipe) != 0) { - nni_mtx_unlock(&s->s_mx); - return; + if (l->l_tran->tran_pipe->p_size != 0) { + p = tpipe; + } else { + if (nni_pipe_create_listener(&p, l, tpipe) != 0) { + nni_mtx_unlock(&s->s_mx); + return; + } } nni_list_append(&l->l_pipes, p); diff --git a/src/sp/transport.h b/src/sp/transport.h index b7134595a..4cc6bdb3d 100644 --- a/src/sp/transport.h +++ b/src/sp/transport.h @@ -12,6 +12,8 @@ #ifndef PROTOCOL_SP_TRANSPORT_H #define PROTOCOL_SP_TRANSPORT_H +#include "core/defs.h" +#include "core/list.h" #include "core/options.h" // Endpoint operations are called by the socket in a @@ -118,9 +120,14 @@ struct nni_sp_listener_ops { // pointers back to socket or even enclosing pipe state, are not // provided.) struct nni_sp_pipe_ops { - // p_init initializes the pipe data structures. The main - // purpose of this is so that the pipe will see the upper - // layer nni_pipe and get a chance to register stats and such. + // p_size specifies the size of the transport private pipe data. + size_t p_size; + + // p_init initializes the transport's pipe data structure. + // The pipe MUST be left in a state that p_fini can be safely + // called on it, even if it does not succeed. (The upper layers + // will call p_fini as part of the cleanup of a failure.) + // This function should not acquire any locks. int (*p_init)(void *, nni_pipe *); // p_fini destroys the pipe. This should clean up all local @@ -134,7 +141,7 @@ struct nni_sp_pipe_ops { // resources with p_fini. void (*p_stop)(void *); - // p_aio_send queues the message for transmit. If this fails, + // p_send queues the message for transmit. If this fails, // then the caller may try again with the same message (or free // it). If the call succeeds, then the transport has taken // ownership of the message, and the caller may not use it @@ -194,4 +201,6 @@ extern void nni_sp_tran_sys_init(void); extern void nni_sp_tran_sys_fini(void); extern void nni_sp_tran_register(nni_sp_tran *); +extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l); + #endif // PROTOCOL_SP_TRANSPORT_H diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c index 45927d197..9c633db04 100644 --- a/src/sp/transport/socket/sockfd.c +++ b/src/sp/transport/socket/sockfd.c @@ -25,48 +25,44 @@ typedef struct sfd_tran_ep sfd_tran_ep; // sfd_tran_pipe wraps an open file descriptor struct sfd_tran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - sfd_tran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio txaio; - nni_aio rxaio; - nni_aio negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + sfd_tran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct sfd_tran_ep { nni_mtx mtx; uint16_t proto; size_t rcvmax; - bool fini; bool started; bool closed; nng_sockaddr src; - int refcnt; // active pipes nni_aio *useraio; nni_aio connaio; nni_aio timeaio; nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; nng_stream_listener *listener; + nni_listener *nlistener; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -81,16 +77,6 @@ static void sfd_tran_pipe_nego_cb(void *); static void sfd_tran_ep_fini(void *); static void sfd_tran_pipe_fini(void *); -static nni_reap_list sfd_tran_ep_reap_list = { - .rl_offset = offsetof(sfd_tran_ep, reap), - .rl_func = sfd_tran_ep_fini, -}; - -static nni_reap_list sfd_tran_pipe_reap_list = { - .rl_offset = offsetof(sfd_tran_pipe, reap), - .rl_func = sfd_tran_pipe_fini, -}; - static void sfd_tran_init(void) { @@ -132,6 +118,12 @@ sfd_tran_pipe_init(void *arg, nni_pipe *npipe) { sfd_tran_pipe *p = arg; p->npipe = npipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); + nni_aio_list_init(&p->recvq); + nni_aio_list_init(&p->sendq); return (0); } @@ -146,10 +138,6 @@ sfd_tran_pipe_fini(void *arg) if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&sfd_tran_ep_reap_list, ep); - } nni_mtx_unlock(&ep->mtx); } @@ -159,39 +147,6 @@ sfd_tran_pipe_fini(void *arg) nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -sfd_tran_pipe_reap(sfd_tran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&sfd_tran_pipe_reap_list, p); - } -} - -static int -sfd_tran_pipe_alloc(sfd_tran_pipe **pipep) -{ - sfd_tran_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->txaio, sfd_tran_pipe_send_cb, p); - nni_aio_init(&p->rxaio, sfd_tran_pipe_recv_cb, p); - nni_aio_init(&p->negoaio, sfd_tran_pipe_nego_cb, p); - nni_aio_list_init(&p->recvq); - nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); - - *pipep = p; - - return (0); } static void @@ -208,7 +163,7 @@ sfd_tran_ep_match(sfd_tran_ep *ep) nni_list_append(&ep->busypipes, p); ep->useraio = NULL; p->rcvmax = ep->rcvmax; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -289,7 +244,7 @@ sfd_tran_pipe_nego_cb(void *arg) nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); - sfd_tran_pipe_reap(p); + nni_pipe_close(p->npipe); } static void @@ -621,8 +576,6 @@ sfd_tran_pipe_start(sfd_tran_pipe *p, nng_stream *conn, sfd_tran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -659,13 +612,6 @@ sfd_tran_ep_fini(void *arg) { sfd_tran_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); nni_aio_stop(&ep->timeaio); nni_aio_stop(&ep->connaio); nng_stream_listener_free(ep->listener); @@ -731,14 +677,15 @@ sfd_tran_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = sfd_tran_pipe_alloc(&p)) != 0) { + if ((rv = nni_sp_pipe_alloc((void **) &p, NULL, ep->nlistener)) != 0) { nng_stream_free(conn); goto error; } + p->conn = conn; + p->ep = ep; if (ep->closed) { - sfd_tran_pipe_fini(p); - nng_stream_free(conn); + nni_pipe_close(p->npipe); rv = NNG_ECLOSED; goto error; } @@ -771,10 +718,10 @@ sfd_tran_accept_cb(void *arg) } static int -sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) +sfd_tran_ep_init(sfd_tran_ep **epp, nni_listener *nlistener) { sfd_tran_ep *ep; - NNI_ARG_UNUSED(url); + nni_sock *sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -784,7 +731,8 @@ sfd_tran_ep_init(sfd_tran_ep **epp, nng_url *url, nni_sock *sock) NNI_LIST_INIT(&ep->waitpipes, sfd_tran_pipe, node); NNI_LIST_INIT(&ep->negopipes, sfd_tran_pipe, node); - ep->proto = nni_sock_proto_id(sock); + ep->nlistener = nlistener; + ep->proto = nni_sock_proto_id(sock); #ifdef NNG_ENABLE_STATS static const nni_stat_info rcv_max_info = { @@ -806,7 +754,6 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { sfd_tran_ep *ep; int rv; - nni_sock *sock = nni_listener_sock(nlistener); // Check for invalid URL components -- we only accept a bare scheme if ((url->u_hostname != NULL) || (strlen(url->u_path) != 0) || @@ -815,7 +762,7 @@ sfd_tran_listener_init(void **lp, nng_url *url, nni_listener *nlistener) return (NNG_EADDRINVAL); } - if ((rv = sfd_tran_ep_init(&ep, url, sock)) != 0) { + if ((rv = sfd_tran_ep_init(&ep, nlistener)) != 0) { return (rv); } @@ -920,6 +867,7 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops sfd_tran_pipe_ops = { + .p_size = sizeof(sfd_tran_pipe), .p_init = sfd_tran_pipe_init, .p_fini = sfd_tran_pipe_fini, .p_stop = sfd_tran_pipe_stop,