diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c index 10b308d50..9f3f6d0b7 100644 --- a/src/supplemental/websocket/websocket.c +++ b/src/supplemental/websocket/websocket.c @@ -62,11 +62,11 @@ struct nni_ws { nni_list rxq; ws_frame *txframe; ws_frame *rxframe; - nni_aio *txaio; // physical aios - nni_aio *rxaio; - nni_aio *closeaio; // used for lingering/draining close - nni_aio *httpaio; - nni_aio *connaio; // connect aio + nni_aio txaio; // physical aios + nni_aio rxaio; + nni_aio closeaio; // used for lingering/draining close + nni_aio httpaio; + nni_aio connaio; // connect aio nni_aio *useraio; // user aio, during HTTP negotiation nni_http_conn *http; nni_http_req *req; @@ -519,9 +519,9 @@ ws_close_cb(void *arg) nni_ws *ws = arg; ws_frame *frame; - nni_aio_close(ws->txaio); - nni_aio_close(ws->rxaio); - nni_aio_close(ws->httpaio); + nni_aio_close(&ws->txaio); + nni_aio_close(&ws->rxaio); + nni_aio_close(&ws->httpaio); // Either we sent a close frame, or we didn't. Either way, // we are done, and its time to abort everything else. @@ -560,8 +560,8 @@ ws_close(nni_ws *ws, uint16_t code) // pending connect request. if (!ws->closed) { // ABORT connection negotiation. - nni_aio_close(ws->connaio); - nni_aio_close(ws->httpaio); + nni_aio_close(&ws->connaio); + nni_aio_close(&ws->httpaio); ws_send_close(ws, code); } } @@ -593,8 +593,8 @@ ws_start_write(nni_ws *ws) iov[1].iov_len = frame->len; iov[1].iov_buf = frame->buf; } - nni_aio_set_iov(ws->txaio, niov, iov); - nni_http_write_full(ws->http, ws->txaio); + nni_aio_set_iov(&ws->txaio, niov, iov); + nni_http_write_full(ws->http, &ws->txaio); } static void @@ -642,7 +642,7 @@ ws_write_cb(void *arg) if (ws->peer_closed) { if (ws->wclose) { // could assert this? ws->wclose = false; - nni_aio_finish(ws->closeaio, 0, 0); + nni_aio_finish(&ws->closeaio, 0, 0); } } nni_mtx_unlock(&ws->mtx); @@ -650,7 +650,7 @@ ws_write_cb(void *arg) } aio = frame->aio; - if ((rv = nni_aio_result(ws->txaio)) != 0) { + if ((rv = nni_aio_result(&ws->txaio)) != 0) { // if tx fails, we can't send a close frame either // we expect the caller to just close this connection frame->aio = NULL; @@ -718,7 +718,7 @@ ws_write_cancel(nni_aio *aio, void *arg, int rv) } frame = nni_aio_get_prov_data(aio); if (frame == ws->txframe) { - nni_aio_abort(ws->txaio, rv); + nni_aio_abort(&ws->txaio, rv); // We will wait for callback on the txaio to finish aio. } else { // If scheduled, just need to remove node and complete it. @@ -737,7 +737,7 @@ ws_send_close(nni_ws *ws, uint16_t code) ws_frame *frame; uint8_t buf[sizeof(uint16_t)]; int rv; - nni_aio *aio; + nni_aio *aio = &ws->closeaio; NNI_PUT16(buf, code); @@ -745,7 +745,6 @@ ws_send_close(nni_ws *ws, uint16_t code) return; } ws->closed = true; - aio = ws->closeaio; if (nni_aio_begin(aio) != 0) { return; @@ -818,7 +817,7 @@ ws_start_read(nni_ws *ws) frame->len = 0; ws->rxframe = frame; - aio = ws->rxaio; + aio = &ws->rxaio; iov.iov_len = 2; // We want the first two bytes. iov.iov_buf = frame->head; nni_aio_set_iov(aio, 1, &iov); @@ -999,7 +998,7 @@ ws_read_frame_cb(nni_ws *ws, ws_frame *frame) ws_close(ws, WS_CLOSE_NORMAL_CLOSE); } else { ws->wclose = false; - nni_aio_finish(ws->closeaio, 0, 0); + nni_aio_finish(&ws->closeaio, 0, 0); } return; default: @@ -1014,7 +1013,7 @@ static void ws_read_cb(void *arg) { nni_ws *ws = arg; - nni_aio *aio = ws->rxaio; + nni_aio *aio = &ws->rxaio; ws_frame *frame; nni_mtx_lock(&ws->mtx); @@ -1193,15 +1192,13 @@ ws_fini(void *arg) ws_close_error(ws, WS_CLOSE_NORMAL_CLOSE); // Give a chance for the close frame to drain. - if (ws->closeaio) { - nni_aio_wait(ws->closeaio); - } + nni_aio_wait(&ws->closeaio); - nni_aio_stop(ws->rxaio); - nni_aio_stop(ws->txaio); - nni_aio_stop(ws->closeaio); - nni_aio_stop(ws->httpaio); - nni_aio_stop(ws->connaio); + nni_aio_stop(&ws->rxaio); + nni_aio_stop(&ws->txaio); + nni_aio_stop(&ws->closeaio); + nni_aio_stop(&ws->httpaio); + nni_aio_stop(&ws->connaio); if (nni_list_node_active(&ws->node)) { nni_ws_dialer *d; @@ -1252,11 +1249,11 @@ ws_fini(void *arg) nni_strfree(ws->reqhdrs); nni_strfree(ws->reshdrs); - nni_aio_free(ws->rxaio); - nni_aio_free(ws->txaio); - nni_aio_free(ws->closeaio); - nni_aio_free(ws->httpaio); - nni_aio_free(ws->connaio); + nni_aio_fini(&ws->rxaio); + nni_aio_fini(&ws->txaio); + nni_aio_fini(&ws->closeaio); + nni_aio_fini(&ws->httpaio); + nni_aio_fini(&ws->connaio); nni_mtx_fini(&ws->mtx); NNI_FREE_STRUCT(ws); } @@ -1335,7 +1332,7 @@ ws_http_cb_dialer(nni_ws *ws, nni_aio *aio) if ((rv = nni_http_res_alloc(&ws->res)) != 0) { goto err; } - nni_http_read_res(ws->http, ws->res, ws->httpaio); + nni_http_read_res(ws->http, ws->res, &ws->httpaio); nni_mtx_unlock(&d->mtx); return; } @@ -1419,7 +1416,7 @@ static void ws_http_cb(void *arg) { nni_ws *ws = arg; - nni_aio *aio = ws->httpaio; + nni_aio *aio = &ws->httpaio; if (ws->server) { ws_http_cb_listener(ws, aio); @@ -1432,7 +1429,6 @@ static int ws_init(nni_ws **wsp) { nni_ws *ws; - int rv; if ((ws = NNI_ALLOC_STRUCT(ws)) == NULL) { return (NNG_ENOMEM); @@ -1443,17 +1439,14 @@ ws_init(nni_ws **wsp) nni_aio_list_init(&ws->sendq); nni_aio_list_init(&ws->recvq); - if (((rv = nni_aio_alloc(&ws->closeaio, ws_close_cb, ws)) != 0) || - ((rv = nni_aio_alloc(&ws->txaio, ws_write_cb, ws)) != 0) || - ((rv = nni_aio_alloc(&ws->rxaio, ws_read_cb, ws)) != 0) || - ((rv = nni_aio_alloc(&ws->httpaio, ws_http_cb, ws)) != 0) || - ((rv = nni_aio_alloc(&ws->connaio, ws_conn_cb, ws)) != 0)) { - ws_fini(ws); - return (rv); - } + nni_aio_init(&ws->closeaio, ws_close_cb, ws); + nni_aio_init(&ws->txaio, ws_write_cb, ws); + nni_aio_init(&ws->rxaio, ws_read_cb, ws); + nni_aio_init(&ws->httpaio, ws_http_cb, ws); + nni_aio_init(&ws->connaio, ws_conn_cb, ws); - nni_aio_set_timeout(ws->closeaio, 100); - nni_aio_set_timeout(ws->httpaio, 2000); + nni_aio_set_timeout(&ws->closeaio, 100); + nni_aio_set_timeout(&ws->httpaio, 2000); ws->ops.s_close = ws_str_close; ws->ops.s_free = ws_str_free; @@ -1676,7 +1669,7 @@ ws_handler(nni_aio *aio) ws->listener = l; nni_list_append(&l->reply, ws); - nni_http_write_res(conn, res, ws->httpaio); + nni_http_write_res(conn, res, &ws->httpaio); (void) nni_http_hijack(conn); nni_aio_set_output(aio, 0, NULL); nni_aio_finish(aio, 0, 0); @@ -2181,7 +2174,7 @@ ws_conn_cb(void *arg) ws = arg; d = ws->dialer; - if ((rv = nni_aio_result(ws->connaio)) != 0) { + if ((rv = nni_aio_result(&ws->connaio)) != 0) { nni_mtx_lock(&ws->mtx); if ((uaio = ws->useraio) != NULL) { ws->useraio = NULL; @@ -2205,8 +2198,8 @@ ws_conn_cb(void *arg) nni_mtx_lock(&ws->mtx); uaio = ws->useraio; - http = nni_aio_get_output(ws->connaio, 0); - nni_aio_set_output(ws->connaio, 0, NULL); + http = nni_aio_get_output(&ws->connaio, 0); + nni_aio_set_output(&ws->connaio, 0, NULL); if (uaio == NULL) { // This request was canceled for some reason. nni_http_conn_fini(http); @@ -2245,7 +2238,7 @@ ws_conn_cb(void *arg) ws->http = http; ws->req = req; - nni_http_write_req(http, req, ws->httpaio); + nni_http_write_req(http, req, &ws->httpaio); nni_mtx_unlock(&ws->mtx); return; @@ -2303,8 +2296,8 @@ ws_dialer_close(void *arg) } d->closed = true; NNI_LIST_FOREACH (&d->wspend, ws) { - nni_aio_close(ws->connaio); - nni_aio_close(ws->httpaio); + nni_aio_close(&ws->connaio); + nni_aio_close(&ws->httpaio); } nni_mtx_unlock(&d->mtx); } @@ -2316,8 +2309,8 @@ ws_dial_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_lock(&ws->mtx); if (aio == ws->useraio) { - nni_aio_abort(ws->connaio, rv); - nni_aio_abort(ws->httpaio, rv); + nni_aio_abort(&ws->connaio, rv); + nni_aio_abort(&ws->httpaio, rv); ws->useraio = NULL; nni_aio_finish_error(aio, rv); } @@ -2359,7 +2352,7 @@ ws_dialer_dial(void *arg, nni_aio *aio) ws->recv_text = d->recv_text; ws->send_text = d->send_text; nni_list_append(&d->wspend, ws); - nni_http_client_connect(d->client, ws->connaio); + nni_http_client_connect(d->client, &ws->connaio); nni_mtx_unlock(&d->mtx); }