diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c index 803c4b4b4..ffdb6d886 100644 --- a/src/sp/transport/ipc/ipc.c +++ b/src/sp/transport/ipc/ipc.c @@ -13,6 +13,7 @@ #include "core/defs.h" #include "core/nng_impl.h" +#include "core/pipe.h" #include "nng/nng.h" // IPC transport. Platform specific IPC operations must be @@ -57,16 +58,15 @@ struct ipc_ep { bool started; bool closed; bool fini; - int ref_cnt; nng_stream_dialer *dialer; nng_stream_listener *listener; + nni_listener *nlistener; + nni_dialer *ndialer; nni_aio *user_aio; - nni_aio *conn_aio; - nni_aio *time_aio; - nni_list busy_pipes; // busy pipes -- ones passed to socket + nni_aio conn_aio; + nni_aio time_aio; nni_list wait_pipes; // pipes waiting to match to socket nni_list nego_pipes; // pipes busy negotiating - nni_reap_node reap; #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; #endif @@ -80,16 +80,6 @@ static void ipc_pipe_nego_cb(void *); static void ipc_pipe_fini(void *); static void ipc_ep_fini(void *); -static nni_reap_list ipc_ep_reap_list = { - .rl_offset = offsetof(ipc_ep, reap), - .rl_func = ipc_ep_fini, -}; - -static nni_reap_list ipc_pipe_reap_list = { - .rl_offset = offsetof(ipc_pipe, reap), - .rl_func = ipc_pipe_fini, -}; - static void ipc_tran_init(void) { @@ -119,11 +109,15 @@ ipc_pipe_close(void *arg) static void ipc_pipe_stop(void *arg) { - ipc_pipe *p = arg; + ipc_pipe *p = arg; + ipc_ep *ep = p->ep; nni_aio_stop(&p->rx_aio); nni_aio_stop(&p->tx_aio); nni_aio_stop(&p->neg_aio); + nni_mtx_lock(&ep->mtx); + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); } static int @@ -131,6 +125,13 @@ ipc_pipe_init(void *arg, nni_pipe *pipe) { ipc_pipe *p = arg; p->pipe = pipe; + nni_mtx_init(&p->mtx); + nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); + nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); + nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p); + nni_aio_list_init(&p->send_q); + nni_aio_list_init(&p->recv_q); + nni_atomic_flag_reset(&p->reaped); return (0); } @@ -138,18 +139,8 @@ static void ipc_pipe_fini(void *arg) { ipc_pipe *p = arg; - ipc_ep *ep; ipc_pipe_stop(p); - if ((ep = p->ep) != NULL) { - nni_mtx_lock(&ep->mtx); - nni_list_node_remove(&p->node); - ep->ref_cnt--; - if (ep->fini && (ep->ref_cnt == 0)) { - nni_reap(&ipc_ep_reap_list, ep); - } - nni_mtx_unlock(&ep->mtx); - } nng_stream_free(p->conn); nni_aio_fini(&p->rx_aio); nni_aio_fini(&p->tx_aio); @@ -158,34 +149,6 @@ ipc_pipe_fini(void *arg) nni_msg_free(p->rx_msg); } nni_mtx_fini(&p->mtx); - NNI_FREE_STRUCT(p); -} - -static void -ipc_pipe_reap(ipc_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - nni_reap(&ipc_pipe_reap_list, p); - } -} - -static int -ipc_pipe_alloc(ipc_pipe **pipe_p) -{ - ipc_pipe *p; - - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&p->mtx); - nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p); - nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p); - nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p); - nni_aio_list_init(&p->send_q); - nni_aio_list_init(&p->recv_q); - nni_atomic_flag_reset(&p->reaped); - *pipe_p = p; - return (0); } static void @@ -199,10 +162,9 @@ ipc_ep_match(ipc_ep *ep) return; } nni_list_remove(&ep->wait_pipes, p); - nni_list_append(&ep->busy_pipes, p); ep->user_aio = NULL; p->rcv_max = ep->rcv_max; - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->pipe); nni_aio_finish(aio, 0, 0); } @@ -282,7 +244,7 @@ ipc_pipe_nego_cb(void *arg) nni_aio_finish_error(user_aio, rv); } nni_mtx_unlock(&ep->mtx); - ipc_pipe_reap(p); + nni_pipe_close(p->pipe); } static void @@ -636,8 +598,6 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep) { nni_iov iov; - ep->ref_cnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -665,27 +625,17 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep) static void ipc_ep_close(void *arg) { - ipc_ep *ep = arg; - ipc_pipe *p; + ipc_ep *ep = arg; nni_mtx_lock(&ep->mtx); ep->closed = true; - nni_aio_close(ep->time_aio); + nni_aio_close(&ep->time_aio); if (ep->dialer != NULL) { nng_stream_dialer_close(ep->dialer); } if (ep->listener != NULL) { nng_stream_listener_close(ep->listener); } - NNI_LIST_FOREACH (&ep->nego_pipes, p) { - ipc_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->wait_pipes, p) { - ipc_pipe_close(p); - } - NNI_LIST_FOREACH (&ep->busy_pipes, p) { - ipc_pipe_close(p); - } if (ep->user_aio != NULL) { nni_aio_finish_error(ep->user_aio, NNG_ECLOSED); ep->user_aio = NULL; @@ -698,21 +648,13 @@ ipc_ep_fini(void *arg) { ipc_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->ref_cnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); - nni_aio_stop(ep->time_aio); - nni_aio_stop(ep->conn_aio); + nni_aio_stop(&ep->time_aio); + nni_aio_stop(&ep->conn_aio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_aio_free(ep->time_aio); - nni_aio_free(ep->conn_aio); + nni_aio_fini(&ep->time_aio); + nni_aio_fini(&ep->conn_aio); nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); } static void @@ -720,8 +662,8 @@ ipc_ep_timer_cb(void *arg) { ipc_ep *ep = arg; nni_mtx_lock(&ep->mtx); - if (nni_aio_result(ep->time_aio) == 0) { - nng_stream_listener_accept(ep->listener, ep->conn_aio); + if (nni_aio_result(&ep->time_aio) == 0) { + nng_stream_listener_accept(ep->listener, &ep->conn_aio); } nni_mtx_unlock(&ep->mtx); } @@ -730,7 +672,7 @@ static void ipc_ep_accept_cb(void *arg) { ipc_ep *ep = arg; - nni_aio *aio = ep->conn_aio; + nni_aio *aio = &ep->conn_aio; ipc_pipe *p; int rv; nng_stream *conn; @@ -741,18 +683,21 @@ ipc_ep_accept_cb(void *arg) } conn = nni_aio_get_output(aio, 0); - if ((rv = ipc_pipe_alloc(&p)) != 0) { + + if (ep->closed) { + rv = NNG_ECLOSED; nng_stream_free(conn); goto error; } - if (ep->closed) { - ipc_pipe_fini(p); + rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener); + if (rv != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; goto error; } + ipc_pipe_start(p, conn, ep); - nng_stream_listener_accept(ep->listener, ep->conn_aio); + + nng_stream_listener_accept(ep->listener, &ep->conn_aio); nni_mtx_unlock(&ep->mtx); return; @@ -765,16 +710,13 @@ ipc_ep_accept_cb(void *arg) } switch (rv) { - case NNG_ENOMEM: case NNG_ENOFILES: - nng_sleep_aio(10, ep->time_aio); + nng_sleep_aio(10, &ep->time_aio); break; default: - if (!ep->closed) { - nng_stream_listener_accept(ep->listener, ep->conn_aio); - } + nng_stream_listener_accept(ep->listener, &ep->conn_aio); break; } nni_mtx_unlock(&ep->mtx); @@ -784,56 +726,51 @@ static void ipc_ep_dial_cb(void *arg) { ipc_ep *ep = arg; - nni_aio *aio = ep->conn_aio; + nni_aio *aio = &ep->conn_aio; + nni_aio *uaio; ipc_pipe *p; int rv; nng_stream *conn; + nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto error; } conn = nni_aio_get_output(aio, 0); - if ((rv = ipc_pipe_alloc(&p)) != 0) { + + if (ep->closed) { nng_stream_free(conn); + rv = NNG_ECLOSED; goto error; } - nni_mtx_lock(&ep->mtx); - if (ep->closed) { - ipc_pipe_fini(p); + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { nng_stream_free(conn); - rv = NNG_ECLOSED; - nni_mtx_unlock(&ep->mtx); goto error; - } else { - ipc_pipe_start(p, conn, ep); } + + ipc_pipe_start(p, conn, ep); nni_mtx_unlock(&ep->mtx); return; error: // Error connecting. We need to pass this straight back // to the user. - nni_mtx_lock(&ep->mtx); - if ((aio = ep->user_aio) != NULL) { + if ((uaio = ep->user_aio) != NULL) { ep->user_aio = NULL; - nni_aio_finish_error(aio, rv); + nni_aio_finish_error(uaio, rv); } nni_mtx_unlock(&ep->mtx); } -static int -ipc_ep_init(ipc_ep **epp, nni_sock *sock) +static void +ipc_ep_init(ipc_ep *ep, nni_sock *sock, void (*conn_cb)(void *)) { - ipc_ep *ep; - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node); NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node); NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node); + nni_aio_init(&ep->conn_aio, conn_cb, ep); + nni_aio_init(&ep->time_aio, ipc_ep_timer_cb, ep); ep->proto = nni_sock_proto_id(sock); @@ -847,56 +784,44 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock) }; nni_stat_init(&ep->st_rcv_max, &rcv_max_info); #endif - - *epp = ep; - return (0); } static int ipc_ep_init_dialer(void **dp, nng_url *url, nni_dialer *dialer) { - ipc_ep *ep; + ipc_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_dialer_sock(dialer); - if ((rv = ipc_ep_init(&ep, sock)) != 0) { - return (rv); - } + ipc_ep_init(ep, sock, ipc_ep_dial_cb); + ep->ndialer = dialer; - if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_dial_cb, ep)) != 0) || - ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { - ipc_ep_fini(ep); + if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { return (rv); } #ifdef NNG_ENABLE_STATS nni_dialer_add_stat(dialer, &ep->st_rcv_max); #endif - *dp = ep; return (0); } static int ipc_ep_init_listener(void **dp, nng_url *url, nni_listener *listener) { - ipc_ep *ep; + ipc_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_listener_sock(listener); - if ((rv = ipc_ep_init(&ep, sock)) != 0) { - return (rv); - } + ipc_ep_init(ep, sock, ipc_ep_accept_cb); + ep->nlistener = listener; - if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_accept_cb, ep)) != 0) || - ((rv = nni_aio_alloc(&ep->time_aio, ipc_ep_timer_cb, ep)) != 0) || - ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { - ipc_ep_fini(ep); + if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { return (rv); } #ifdef NNG_ENABLE_STATS nni_listener_add_stat(listener, &ep->st_rcv_max); #endif - *dp = ep; return (0); } @@ -939,7 +864,7 @@ ipc_ep_connect(void *arg, nni_aio *aio) return; } ep->user_aio = aio; - nng_stream_dialer_dial(ep->dialer, ep->conn_aio); + nng_stream_dialer_dial(ep->dialer, &ep->conn_aio); nni_mtx_unlock(&ep->mtx); } @@ -1013,7 +938,7 @@ ipc_ep_accept(void *arg, nni_aio *aio) ep->user_aio = aio; if (!ep->started) { ep->started = true; - nng_stream_listener_accept(ep->listener, ep->conn_aio); + nng_stream_listener_accept(ep->listener, &ep->conn_aio); } else { ipc_ep_match(ep); } @@ -1030,6 +955,7 @@ ipc_pipe_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) } static nni_sp_pipe_ops ipc_tran_pipe_ops = { + .p_size = sizeof(ipc_pipe), .p_init = ipc_pipe_init, .p_fini = ipc_pipe_fini, .p_stop = ipc_pipe_stop, @@ -1117,6 +1043,7 @@ ipc_listener_set_sec_desc(void *arg, void *pdesc) } static nni_sp_dialer_ops ipc_dialer_ops = { + .d_size = sizeof(ipc_ep), .d_init = ipc_ep_init_dialer, .d_fini = ipc_ep_fini, .d_connect = ipc_ep_connect, @@ -1126,6 +1053,7 @@ static nni_sp_dialer_ops ipc_dialer_ops = { }; static nni_sp_listener_ops ipc_listener_ops = { + .l_size = sizeof(ipc_ep), .l_init = ipc_ep_init_listener, .l_fini = ipc_ep_fini, .l_bind = ipc_ep_bind,