diff --git a/src/core/pipe.c b/src/core/pipe.c index 1558536a9..28cf451af 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -292,17 +292,12 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data) return (0); } -int -nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) +static void +pipe_init_dialer(nni_pipe *p, nni_dialer *d) { - int rv; - nni_sp_tran *tran = d->d_tran; - nni_pipe *p; - - if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { - return (rv); - } p->p_dialer = d; + nni_sock_hold(d->d_sock); + nni_dialer_hold(d); #ifdef NNG_ENABLE_STATS static const nni_stat_info dialer_info = { .si_name = "dialer", @@ -312,34 +307,38 @@ nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) pipe_stat_init(p, &p->st_ep_id, &dialer_info); nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d)); #endif - nni_sock_hold(d->d_sock); - nni_dialer_hold(d); - *pp = p; - return (0); } -void * -nni_pipe_tran_data(nni_pipe *p) +static void +pipe_init_listener(nni_pipe *p, nni_listener *l) { - return (p->p_tran_data); + p->p_listener = l; + nni_listener_hold(l); + nni_sock_hold(l->l_sock); +#ifdef NNG_ENABLE_STATS + static const nni_stat_info listener_info = { + .si_name = "listener", + .si_desc = "listener for pipe", + .si_type = NNG_STAT_ID, + }; + pipe_stat_init(p, &p->st_ep_id, &listener_info); + nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); +#endif } int -nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l) +nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tran_data) { - int rv; - nni_pipe *p; - if (d != NULL) { - rv = nni_pipe_create_dialer(&p, d, NULL); - } else if (l != NULL) { - rv = nni_pipe_create_listener(&p, l, NULL); - } else { - rv = NNG_EINVAL; - } - if (rv == 0) { - *datap = nni_pipe_tran_data(p); + int rv; + nni_sp_tran *tran = d->d_tran; + nni_pipe *p; + + if ((rv = pipe_create(&p, d->d_sock, tran, tran_data)) != 0) { + return (rv); } - return (rv); + pipe_init_dialer(p, d); + *pp = p; + return (0); } int @@ -352,22 +351,43 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data) if ((rv = pipe_create(&p, l->l_sock, tran, tran_data)) != 0) { return (rv); } - p->p_listener = l; -#ifdef NNG_ENABLE_STATS - static const nni_stat_info listener_info = { - .si_name = "listener", - .si_desc = "listener for pipe", - .si_type = NNG_STAT_ID, - }; - pipe_stat_init(p, &p->st_ep_id, &listener_info); - nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l)); -#endif - nni_listener_hold(l); - nni_sock_hold(l->l_sock); + pipe_init_listener(p, l); *pp = p; return (0); } +int +nni_pipe_alloc_dialer(void **datap, nni_dialer *d) +{ + int rv; + nni_sp_tran *tran = d->d_tran; + nni_sock *s = d->d_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL)) != 0) { + return (rv); + } + pipe_init_dialer(p, d); + *datap = p->p_tran_data; + return (0); +} + +int +nni_pipe_alloc_listener(void **datap, nni_listener *l) +{ + int rv; + nni_sp_tran *tran = l->l_tran; + nni_sock *s = l->l_sock; + nni_pipe *p; + + if ((rv = pipe_create(&p, s, tran, NULL)) != 0) { + return (rv); + } + pipe_init_listener(p, l); + *datap = p->p_tran_data; + return (0); +} + int nni_pipe_getopt( nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t) diff --git a/src/core/pipe.h b/src/core/pipe.h index 6295fac20..8289bae9c 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -63,6 +63,8 @@ extern void nni_pipe_bump_tx(nni_pipe *, size_t); extern void nni_pipe_bump_error(nni_pipe *, int); extern char *nni_pipe_peer_addr(nni_pipe *p, char buf[NNG_MAXADDRSTRLEN]); -extern void *nni_pipe_tran_data(nni_pipe *); + +extern int nni_pipe_alloc_dialer(void **, nni_dialer *); +extern int nni_pipe_alloc_listener(void **, nni_listener *); #endif // CORE_PIPE_H diff --git a/src/sp/transport.h b/src/sp/transport.h index 4cc6bdb3d..091df7209 100644 --- a/src/sp/transport.h +++ b/src/sp/transport.h @@ -201,6 +201,4 @@ extern void nni_sp_tran_sys_init(void); extern void nni_sp_tran_sys_fini(void); extern void nni_sp_tran_register(nni_sp_tran *); -extern int nni_sp_pipe_alloc(void **datap, nni_dialer *d, nni_listener *l); - #endif // PROTOCOL_SP_TRANSPORT_H diff --git a/src/sp/transport/socket/sockfd.c b/src/sp/transport/socket/sockfd.c index 6d7d385a8..6950df416 100644 --- a/src/sp/transport/socket/sockfd.c +++ b/src/sp/transport/socket/sockfd.c @@ -661,21 +661,20 @@ sfd_tran_accept_cb(void *arg) if ((rv = nni_aio_result(aio)) != 0) { goto error; } - conn = nni_aio_get_output(aio, 0); - if ((rv = nni_sp_pipe_alloc((void **) &p, NULL, ep->nlistener)) != 0) { + if (ep->closed) { nng_stream_free(conn); + rv = NNG_ECLOSED; goto error; } - p->conn = conn; - p->ep = ep; - if (ep->closed) { - nni_pipe_close(p->npipe); - nni_pipe_rele(p->npipe); - rv = NNG_ECLOSED; + if ((rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener)) != 0) { + nng_stream_free(conn); goto error; } + p->conn = conn; + p->ep = ep; + sfd_tran_pipe_start(p, conn, ep); nng_stream_listener_accept(ep->listener, &ep->connaio); nni_mtx_unlock(&ep->mtx); @@ -829,15 +828,13 @@ sfd_tran_ep_accept(void *arg, nni_aio *aio) if ((rv = nni_aio_schedule(aio, sfd_tran_ep_cancel, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); - return; } ep->useraio = aio; if (!ep->started) { ep->started = true; nng_stream_listener_accept(ep->listener, &ep->connaio); - } else { - sfd_tran_ep_match(ep); } + sfd_tran_ep_match(ep); nni_mtx_unlock(&ep->mtx); }