Skip to content

Commit

Permalink
websocket: more aio inlining (generic websocket layer)
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 1, 2024
1 parent 40b2445 commit 635be5c
Showing 1 changed file with 49 additions and 56 deletions.
105 changes: 49 additions & 56 deletions src/supplemental/websocket/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ struct nni_ws {
nni_list rxq;
ws_frame *txframe;
ws_frame *rxframe;
nni_aio *txaio; // physical aios
nni_aio *rxaio;
nni_aio *closeaio; // used for lingering/draining close
nni_aio *httpaio;
nni_aio *connaio; // connect aio
nni_aio txaio; // physical aios
nni_aio rxaio;
nni_aio closeaio; // used for lingering/draining close
nni_aio httpaio;
nni_aio connaio; // connect aio
nni_aio *useraio; // user aio, during HTTP negotiation
nni_http_conn *http;
nni_http_req *req;
Expand Down Expand Up @@ -519,9 +519,9 @@ ws_close_cb(void *arg)
nni_ws *ws = arg;
ws_frame *frame;

nni_aio_close(ws->txaio);
nni_aio_close(ws->rxaio);
nni_aio_close(ws->httpaio);
nni_aio_close(&ws->txaio);
nni_aio_close(&ws->rxaio);
nni_aio_close(&ws->httpaio);

// Either we sent a close frame, or we didn't. Either way,
// we are done, and its time to abort everything else.
Expand Down Expand Up @@ -560,8 +560,8 @@ ws_close(nni_ws *ws, uint16_t code)
// pending connect request.
if (!ws->closed) {
// ABORT connection negotiation.
nni_aio_close(ws->connaio);
nni_aio_close(ws->httpaio);
nni_aio_close(&ws->connaio);
nni_aio_close(&ws->httpaio);
ws_send_close(ws, code);
}
}
Expand Down Expand Up @@ -593,8 +593,8 @@ ws_start_write(nni_ws *ws)
iov[1].iov_len = frame->len;
iov[1].iov_buf = frame->buf;
}
nni_aio_set_iov(ws->txaio, niov, iov);
nni_http_write_full(ws->http, ws->txaio);
nni_aio_set_iov(&ws->txaio, niov, iov);
nni_http_write_full(ws->http, &ws->txaio);
}

static void
Expand Down Expand Up @@ -642,15 +642,15 @@ ws_write_cb(void *arg)
if (ws->peer_closed) {
if (ws->wclose) { // could assert this?
ws->wclose = false;
nni_aio_finish(ws->closeaio, 0, 0);
nni_aio_finish(&ws->closeaio, 0, 0);
}
}
nni_mtx_unlock(&ws->mtx);
return;
}

aio = frame->aio;
if ((rv = nni_aio_result(ws->txaio)) != 0) {
if ((rv = nni_aio_result(&ws->txaio)) != 0) {
// if tx fails, we can't send a close frame either
// we expect the caller to just close this connection
frame->aio = NULL;
Expand Down Expand Up @@ -718,7 +718,7 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv)
}
frame = nni_aio_get_prov_data(aio);
if (frame == ws->txframe) {
nni_aio_abort(ws->txaio, rv);
nni_aio_abort(&ws->txaio, rv);

Check warning on line 721 in src/supplemental/websocket/websocket.c

View check run for this annotation

Codecov / codecov/patch

src/supplemental/websocket/websocket.c#L721

Added line #L721 was not covered by tests
// We will wait for callback on the txaio to finish aio.
} else {
// If scheduled, just need to remove node and complete it.
Expand All @@ -737,15 +737,14 @@ ws_send_close(nni_ws *ws, uint16_t code)
ws_frame *frame;
uint8_t buf[sizeof(uint16_t)];
int rv;
nni_aio *aio;
nni_aio *aio = &ws->closeaio;

NNI_PUT16(buf, code);

if (ws->closed || !ws->ready) {
return;
}
ws->closed = true;
aio = ws->closeaio;

if (nni_aio_begin(aio) != 0) {
return;
Expand Down Expand Up @@ -818,7 +817,7 @@ ws_start_read(nni_ws *ws)
frame->len = 0;
ws->rxframe = frame;

aio = ws->rxaio;
aio = &ws->rxaio;
iov.iov_len = 2; // We want the first two bytes.
iov.iov_buf = frame->head;
nni_aio_set_iov(aio, 1, &iov);
Expand Down Expand Up @@ -999,7 +998,7 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame)
ws_close(ws, WS_CLOSE_NORMAL_CLOSE);
} else {
ws->wclose = false;
nni_aio_finish(ws->closeaio, 0, 0);
nni_aio_finish(&ws->closeaio, 0, 0);
}
return;
default:
Expand All @@ -1014,7 +1013,7 @@ static void
ws_read_cb(void *arg)
{
nni_ws *ws = arg;
nni_aio *aio = ws->rxaio;
nni_aio *aio = &ws->rxaio;
ws_frame *frame;

nni_mtx_lock(&ws->mtx);
Expand Down Expand Up @@ -1193,15 +1192,13 @@ ws_fini(void *arg)
ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE);

// Give a chance for the close frame to drain.
if (ws->closeaio) {
nni_aio_wait(ws->closeaio);
}
nni_aio_wait(&ws->closeaio);

nni_aio_stop(ws->rxaio);
nni_aio_stop(ws->txaio);
nni_aio_stop(ws->closeaio);
nni_aio_stop(ws->httpaio);
nni_aio_stop(ws->connaio);
nni_aio_stop(&ws->rxaio);
nni_aio_stop(&ws->txaio);
nni_aio_stop(&ws->closeaio);
nni_aio_stop(&ws->httpaio);
nni_aio_stop(&ws->connaio);

if (nni_list_node_active(&ws->node)) {
nni_ws_dialer *d;
Expand Down Expand Up @@ -1252,11 +1249,11 @@ ws_fini(void *arg)

nni_strfree(ws->reqhdrs);
nni_strfree(ws->reshdrs);
nni_aio_free(ws->rxaio);
nni_aio_free(ws->txaio);
nni_aio_free(ws->closeaio);
nni_aio_free(ws->httpaio);
nni_aio_free(ws->connaio);
nni_aio_fini(&ws->rxaio);
nni_aio_fini(&ws->txaio);
nni_aio_fini(&ws->closeaio);
nni_aio_fini(&ws->httpaio);
nni_aio_fini(&ws->connaio);
nni_mtx_fini(&ws->mtx);
NNI_FREE_STRUCT(ws);
}
Expand Down Expand Up @@ -1335,7 +1332,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio)
if ((rv = nni_http_res_alloc(&ws->res)) != 0) {
goto err;
}
nni_http_read_res(ws->http, ws->res, ws->httpaio);
nni_http_read_res(ws->http, ws->res, &ws->httpaio);
nni_mtx_unlock(&d->mtx);
return;
}
Expand Down Expand Up @@ -1419,7 +1416,7 @@ static void
ws_http_cb(void *arg)
{
nni_ws *ws = arg;
nni_aio *aio = ws->httpaio;
nni_aio *aio = &ws->httpaio;

if (ws->server) {
ws_http_cb_listener(ws, aio);
Expand All @@ -1432,7 +1429,6 @@ static int
ws_init(nni_ws **wsp)
{
nni_ws *ws;
int rv;

if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) {
return (NNG_ENOMEM);
Expand All @@ -1443,17 +1439,14 @@ ws_init(nni_ws **wsp)
nni_aio_list_init(&ws->sendq);
nni_aio_list_init(&ws->recvq);

if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) ||
((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) ||
((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) ||
((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) ||
((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) {
ws_fini(ws);
return (rv);
}
nni_aio_init(&ws->closeaio, ws_close_cb, ws);
nni_aio_init(&ws->txaio, ws_write_cb, ws);
nni_aio_init(&ws->rxaio, ws_read_cb, ws);
nni_aio_init(&ws->httpaio, ws_http_cb, ws);
nni_aio_init(&ws->connaio, ws_conn_cb, ws);

nni_aio_set_timeout(ws->closeaio, 100);
nni_aio_set_timeout(ws->httpaio, 2000);
nni_aio_set_timeout(&ws->closeaio, 100);
nni_aio_set_timeout(&ws->httpaio, 2000);

ws->ops.s_close = ws_str_close;
ws->ops.s_free = ws_str_free;
Expand Down Expand Up @@ -1676,7 +1669,7 @@ ws_handler(nni_aio *aio)
ws->listener = l;

nni_list_append(&l->reply, ws);
nni_http_write_res(conn, res, ws->httpaio);
nni_http_write_res(conn, res, &ws->httpaio);
(void) nni_http_hijack(conn);
nni_aio_set_output(aio, 0, NULL);
nni_aio_finish(aio, 0, 0);
Expand Down Expand Up @@ -2181,7 +2174,7 @@ ws_conn_cb(void *arg)
ws = arg;

d = ws->dialer;
if ((rv = nni_aio_result(ws->connaio)) != 0) {
if ((rv = nni_aio_result(&ws->connaio)) != 0) {
nni_mtx_lock(&ws->mtx);
if ((uaio = ws->useraio) != NULL) {
ws->useraio = NULL;
Expand All @@ -2205,8 +2198,8 @@ ws_conn_cb(void *arg)

nni_mtx_lock(&ws->mtx);
uaio = ws->useraio;
http = nni_aio_get_output(ws->connaio, 0);
nni_aio_set_output(ws->connaio, 0, NULL);
http = nni_aio_get_output(&ws->connaio, 0);
nni_aio_set_output(&ws->connaio, 0, NULL);
if (uaio == NULL) {
// This request was canceled for some reason.
nni_http_conn_fini(http);
Expand Down Expand Up @@ -2245,7 +2238,7 @@ ws_conn_cb(void *arg)
ws->http = http;
ws->req = req;

nni_http_write_req(http, req, ws->httpaio);
nni_http_write_req(http, req, &ws->httpaio);
nni_mtx_unlock(&ws->mtx);
return;

Expand Down Expand Up @@ -2303,8 +2296,8 @@ ws_dialer_close(void *arg)
}
d->closed = true;
NNI_LIST_FOREACH (&d->wspend, ws) {
nni_aio_close(ws->connaio);
nni_aio_close(ws->httpaio);
nni_aio_close(&ws->connaio);
nni_aio_close(&ws->httpaio);
}
nni_mtx_unlock(&d->mtx);
}
Expand All @@ -2316,8 +2309,8 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv)

nni_mtx_lock(&ws->mtx);
if (aio == ws->useraio) {
nni_aio_abort(ws->connaio, rv);
nni_aio_abort(ws->httpaio, rv);
nni_aio_abort(&ws->connaio, rv);
nni_aio_abort(&ws->httpaio, rv);
ws->useraio = NULL;
nni_aio_finish_error(aio, rv);
}
Expand Down Expand Up @@ -2359,7 +2352,7 @@ ws_dialer_dial(void *arg, nni_aio *aio)
ws->recv_text = d->recv_text;
ws->send_text = d->send_text;
nni_list_append(&d->wspend, ws);
nni_http_client_connect(d->client, ws->connaio);
nni_http_client_connect(d->client, &ws->connaio);
nni_mtx_unlock(&d->mtx);
}

Expand Down

0 comments on commit 635be5c

Please sign in to comment.