diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 324ffd42e..65999162b 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -29,21 +29,21 @@ struct tcptran_pipe { uint16_t proto; size_t rcvmax; bool closed; - nni_list_node node; tcptran_ep *ep; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_mtx mtx; + nni_msg *rxmsg; uint8_t txlen[sizeof(uint64_t)]; uint8_t rxlen[sizeof(uint64_t)]; size_t gottxhead; size_t gotrxhead; size_t wanttxhead; size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nni_list_node node; }; struct tcptran_ep { @@ -94,9 +94,9 @@ tcptran_pipe_close(void *arg) p->closed = true; nni_mtx_unlock(&p->mtx); - nni_aio_close(p->rxaio); - nni_aio_close(p->txaio); - nni_aio_close(p->negoaio); + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); nng_stream_close(p->conn); } @@ -106,9 +106,9 @@ tcptran_pipe_stop(void *arg) { tcptran_pipe *p = arg; - nni_aio_stop(p->rxaio); - nni_aio_stop(p->txaio); - nni_aio_stop(p->negoaio); + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); } static int @@ -134,10 +134,10 @@ tcptran_pipe_fini(void *arg) } nng_stream_free(p->conn); - nni_aio_free(p->rxaio); - nni_aio_free(p->txaio); - nni_aio_free(p->negoaio); nni_msg_free(p->rxmsg); + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); } @@ -146,19 +146,14 @@ static int tcptran_pipe_alloc(tcptran_pipe **pipep) { tcptran_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != - 0)) { - tcptran_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p); nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); @@ -190,7 +185,7 @@ tcptran_pipe_nego_cb(void *arg) { tcptran_pipe *p = arg; tcptran_ep *ep = p->ep; - nni_aio *aio = p->negoaio; + nni_aio *aio = &p->negoaio; nni_aio *uaio; int rv; @@ -274,7 +269,7 @@ tcptran_pipe_send_cb(void *arg) nni_aio *aio; size_t n; nni_msg *msg; - nni_aio *txaio = p->txaio; + nni_aio *txaio = &p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -321,7 +316,7 @@ tcptran_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg *msg; - nni_aio *rxaio = p->rxaio; + nni_aio *rxaio = &p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -431,7 +426,7 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) // The callback on the txaio will cause the user aio to // be canceled too. if (nni_list_first(&p->sendq) == aio) { - nni_aio_abort(p->txaio, rv); + nni_aio_abort(&p->txaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -469,7 +464,7 @@ tcptran_pipe_send_start(tcptran_pipe *p) NNI_PUT64(p->txlen, len); - txaio = p->txaio; + txaio = &p->txaio; niov = 0; iov[0].iov_buf = p->txlen; iov[0].iov_len = sizeof(p->txlen); @@ -528,7 +523,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) // The callback on the rxaio will cause the user aio to // be canceled too. if (nni_list_first(&p->recvq) == aio) { - nni_aio_abort(p->rxaio, rv); + nni_aio_abort(&p->rxaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -557,7 +552,7 @@ tcptran_pipe_recv_start(tcptran_pipe *p) } // Schedule a read of the header. - rxaio = p->rxaio; + rxaio = &p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); @@ -626,11 +621,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep) p->wanttxhead = 8; iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; - nni_aio_set_iov(p->negoaio, 1, &iov); + nni_aio_set_iov(&p->negoaio, 1, &iov); nni_list_append(&ep->negopipes, p); - nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate - nng_stream_send(p->conn, p->negoaio); + nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate + nng_stream_send(p->conn, &p->negoaio); } static void