Skip to content

Commit

Permalink
websocket: inline the aios
Browse files Browse the repository at this point in the history
This covers both the ttransport and the supplemental layers.
  • Loading branch information
gdamore committed Dec 7, 2024
1 parent f8a314e commit 32c78f7
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 94 deletions.
71 changes: 33 additions & 38 deletions src/sp/transport/ws/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct ws_dialer {
uint16_t peer; // remote protocol
nni_list aios;
nni_mtx mtx;
nni_aio *connaio;
nni_aio connaio;
nng_stream_dialer *dialer;
bool started;
};
Expand All @@ -35,7 +35,7 @@ struct ws_listener {
uint16_t peer; // remote protocol
nni_list aios;
nni_mtx mtx;
nni_aio *accaio;
nni_aio accaio;
nng_stream_listener *listener;
bool started;
};
Expand All @@ -46,20 +46,19 @@ struct ws_pipe {
uint16_t peer;
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio txaio;
nni_aio rxaio;
nng_stream *ws;
};

static void
wstran_pipe_send_cb(void *arg)
{
ws_pipe *p = arg;
nni_aio *taio;
ws_pipe *p = arg;
nni_aio *taio = &p->txaio;
nni_aio *uaio;

nni_mtx_lock(&p->mtx);
taio = p->txaio;
uaio = p->user_txaio;
p->user_txaio = NULL;

Expand All @@ -78,7 +77,7 @@ static void
wstran_pipe_recv_cb(void *arg)
{
ws_pipe *p = arg;
nni_aio *raio = p->rxaio;
nni_aio *raio = &p->rxaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -110,7 +109,7 @@ wstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
return;
}
p->user_rxaio = NULL;
nni_aio_abort(p->rxaio, rv);
nni_aio_abort(&p->rxaio, rv);
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
}
Expand All @@ -131,7 +130,7 @@ wstran_pipe_recv(void *arg, nni_aio *aio)
return;
}
p->user_rxaio = aio;
nng_stream_recv(p->ws, p->rxaio);
nng_stream_recv(p->ws, &p->rxaio);
nni_mtx_unlock(&p->mtx);
}

Expand All @@ -145,7 +144,7 @@ wstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
return;
}
p->user_txaio = NULL;
nni_aio_abort(p->txaio, rv);
nni_aio_abort(&p->txaio, rv);

Check warning on line 147 in src/sp/transport/ws/websocket.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/ws/websocket.c#L147

Added line #L147 was not covered by tests
nni_aio_finish_error(aio, rv);
nni_mtx_unlock(&p->mtx);
}
Expand All @@ -170,10 +169,10 @@ wstran_pipe_send(void *arg, nni_aio *aio)
return;
}
p->user_txaio = aio;
nni_aio_set_msg(p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(&p->txaio, nni_aio_get_msg(aio));
nni_aio_set_msg(aio, NULL);

nng_stream_send(p->ws, p->txaio);
nng_stream_send(p->ws, &p->txaio);
nni_mtx_unlock(&p->mtx);
}

Expand All @@ -182,8 +181,8 @@ wstran_pipe_stop(void *arg)
{
ws_pipe *p = arg;

nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
}

static int
Expand All @@ -200,8 +199,8 @@ wstran_pipe_fini(void *arg)
ws_pipe *p = arg;

nng_stream_free(p->ws);
nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);

nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
Expand All @@ -212,8 +211,8 @@ wstran_pipe_close(void *arg)
{
ws_pipe *p = arg;

nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);

nni_mtx_lock(&p->mtx);
nng_stream_close(p->ws);
Expand All @@ -224,20 +223,16 @@ static int
wstran_pipe_alloc(ws_pipe **pipep, void *ws)
{
ws_pipe *p;
int rv;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
p->ws = ws;
nni_mtx_init(&p->mtx);

// Initialize AIOs.
if (((rv = nni_aio_alloc(&p->txaio, wstran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, wstran_pipe_recv_cb, p)) != 0)) {
wstran_pipe_fini(p);
return (rv);
}
p->ws = ws;
nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);

*pipep = p;
return (0);
Expand Down Expand Up @@ -300,7 +295,7 @@ wstran_listener_accept(void *arg, nni_aio *aio)
}
nni_list_append(&l->aios, aio);
if (aio == nni_list_first(&l->aios)) {
nng_stream_listener_accept(l->listener, l->accaio);
nng_stream_listener_accept(l->listener, &l->accaio);
}
nni_mtx_unlock(&l->mtx);
}
Expand Down Expand Up @@ -337,7 +332,7 @@ wstran_dialer_connect(void *arg, nni_aio *aio)
NNI_ASSERT(nni_list_empty(&d->aios));
d->started = true;
nni_list_append(&d->aios, aio);
nng_stream_dialer_dial(d->dialer, d->connaio);
nng_stream_dialer_dial(d->dialer, &d->connaio);
nni_mtx_unlock(&d->mtx);
}

Expand Down Expand Up @@ -377,9 +372,9 @@ wstran_dialer_fini(void *arg)
{
ws_dialer *d = arg;

nni_aio_stop(d->connaio);
nni_aio_stop(&d->connaio);
nng_stream_dialer_free(d->dialer);
nni_aio_free(d->connaio);
nni_aio_fini(&d->connaio);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}
Expand All @@ -389,9 +384,9 @@ wstran_listener_fini(void *arg)
{
ws_listener *l = arg;

nni_aio_stop(l->accaio);
nni_aio_stop(&l->accaio);
nng_stream_listener_free(l->listener);
nni_aio_free(l->accaio);
nni_aio_fini(&l->accaio);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}
Expand All @@ -401,7 +396,7 @@ wstran_connect_cb(void *arg)
{
ws_dialer *d = arg;
ws_pipe *p;
nni_aio *caio = d->connaio;
nni_aio *caio = &d->connaio;
nni_aio *uaio;
int rv;
nng_stream *ws = NULL;
Expand Down Expand Up @@ -437,7 +432,7 @@ wstran_dialer_close(void *arg)
{
ws_dialer *d = arg;

nni_aio_close(d->connaio);
nni_aio_close(&d->connaio);
nng_stream_dialer_close(d->dialer);
}

Expand All @@ -446,15 +441,15 @@ wstran_listener_close(void *arg)
{
ws_listener *l = arg;

nni_aio_close(l->accaio);
nni_aio_close(&l->accaio);
nng_stream_listener_close(l->listener);
}

static void
wstran_accept_cb(void *arg)
{
ws_listener *l = arg;
nni_aio *aaio = l->accaio;
nni_aio *aaio = &l->accaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -502,14 +497,14 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
nni_mtx_init(&d->mtx);

nni_aio_list_init(&d->aios);
nni_aio_init(&d->connaio, wstran_connect_cb, d);

d->peer = nni_sock_peer_id(s);

snprintf(
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s));

if (((rv = nni_ws_dialer_alloc(&d->dialer, url)) != 0) ||
((rv = nni_aio_alloc(&d->connaio, wstran_connect_cb, d)) != 0) ||
((rv = nng_stream_dialer_set_bool(
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
Expand All @@ -536,14 +531,14 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
nni_mtx_init(&l->mtx);

nni_aio_list_init(&l->aios);
nni_aio_init(&l->accaio, wstran_accept_cb, l);

l->peer = nni_sock_peer_id(s);

snprintf(
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_proto_name(s));

if (((rv = nni_ws_listener_alloc(&l->listener, url)) != 0) ||
((rv = nni_aio_alloc(&l->accaio, wstran_accept_cb, l)) != 0) ||
((rv = nng_stream_listener_set_bool(
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
Expand Down
Loading

0 comments on commit 32c78f7

Please sign in to comment.