Skip to content

Commit

Permalink
tcp transport: inline aio objects for pipe, and organize members
Browse files Browse the repository at this point in the history
We try to put the least likely stuff far from the hotter stuff in
the struct to get better cacheline efficiency.
  • Loading branch information
gdamore committed Nov 25, 2024
1 parent a9f542d commit f6ddada
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ struct tcptran_pipe {
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
nni_list recvq;
nni_list sendq;
nni_aio txaio;
nni_aio rxaio;
nni_aio negoaio;
nni_mtx mtx;
nni_msg *rxmsg;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
nni_list_node node;
};

struct tcptran_ep {
Expand Down Expand Up @@ -94,9 +94,9 @@ tcptran_pipe_close(void *arg)
p->closed = true;
nni_mtx_unlock(&p->mtx);

nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);
nni_aio_close(&p->negoaio);

nng_stream_close(p->conn);
}
Expand All @@ -106,9 +106,9 @@ tcptran_pipe_stop(void *arg)
{
tcptran_pipe *p = arg;

nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
nni_aio_stop(&p->negoaio);
}

static int
Expand All @@ -134,10 +134,10 @@ tcptran_pipe_fini(void *arg)
}

nng_stream_free(p->conn);
nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_free(p->negoaio);
nni_msg_free(p->rxmsg);
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);
nni_aio_fini(&p->negoaio);
nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
}
Expand All @@ -146,19 +146,14 @@ static int
tcptran_pipe_alloc(tcptran_pipe **pipep)
{
tcptran_pipe *p;
int rv;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) !=
0)) {
tcptran_pipe_fini(p);
return (rv);
}
nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p);
nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p);
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);

Expand Down Expand Up @@ -190,7 +185,7 @@ tcptran_pipe_nego_cb(void *arg)
{
tcptran_pipe *p = arg;
tcptran_ep *ep = p->ep;
nni_aio *aio = p->negoaio;
nni_aio *aio = &p->negoaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -274,7 +269,7 @@ tcptran_pipe_send_cb(void *arg)
nni_aio *aio;
size_t n;
nni_msg *msg;
nni_aio *txaio = p->txaio;
nni_aio *txaio = &p->txaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
Expand Down Expand Up @@ -321,7 +316,7 @@ tcptran_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg *msg;
nni_aio *rxaio = p->rxaio;
nni_aio *rxaio = &p->rxaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
Expand Down Expand Up @@ -431,7 +426,7 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the txaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->sendq) == aio) {
nni_aio_abort(p->txaio, rv);
nni_aio_abort(&p->txaio, rv);

Check warning on line 429 in src/sp/transport/tcp/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/tcp/tcp.c#L429

Added line #L429 was not covered by tests
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -469,7 +464,7 @@ tcptran_pipe_send_start(tcptran_pipe *p)

NNI_PUT64(p->txlen, len);

txaio = p->txaio;
txaio = &p->txaio;
niov = 0;
iov[0].iov_buf = p->txlen;
iov[0].iov_len = sizeof(p->txlen);
Expand Down Expand Up @@ -528,7 +523,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the rxaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->recvq) == aio) {
nni_aio_abort(p->rxaio, rv);
nni_aio_abort(&p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -557,7 +552,7 @@ tcptran_pipe_recv_start(tcptran_pipe *p)
}

// Schedule a read of the header.
rxaio = p->rxaio;
rxaio = &p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
Expand Down Expand Up @@ -626,11 +621,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)
p->wanttxhead = 8;
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(p->negoaio, 1, &iov);
nni_aio_set_iov(&p->negoaio, 1, &iov);
nni_list_append(&ep->negopipes, p);

nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, p->negoaio);
nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, &p->negoaio);
}

static void
Expand Down

0 comments on commit f6ddada

Please sign in to comment.