diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 9a00974a9..59b5e8a65 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -16,8 +16,6 @@ #include "core/platform.h" #include "nng/nng.h" -#include -#include #include // Experimental UDP transport. Unicast only. @@ -187,39 +185,40 @@ struct udp_pipe { }; struct udp_ep { - nni_udp *udp; - nni_mtx mtx; - uint16_t proto; - uint16_t af; // address family - bool fini; - bool started; - bool closed; - bool cooldown; - nng_url *url; - const char *host; // for dialers - int refcnt; // active pipes - nni_aio *useraio; - nni_aio *connaio; - nni_aio timeaio; - nni_aio resaio; - bool dialer; - bool tx_busy; // true if tx pending - nni_msg *rx_payload; // current receive message - nng_sockaddr rx_sa; // addr for last message - nni_aio tx_aio; // aio for TX handling - nni_aio rx_aio; // aio for RX handling - nni_id_map pipes; // pipes (indexed by id) - nni_sockaddr self_sa; // our address - nni_sockaddr peer_sa; // peer address, only for dialer; - nni_sockaddr mesh_sa; // mesh source address (ours) - nni_list connaios; // aios from accept waiting for a client peer - nni_list connpipes; // pipes waiting to be connected - nng_duration refresh; // refresh interval for connections in seconds - udp_sp_msg rx_msg; // contains the received message header - uint16_t rcvmax; // max payload, trimmed to uint16_t - uint16_t copymax; - udp_txring tx_ring; - nni_time next_wake; + nni_udp *udp; + nni_mtx mtx; + uint16_t proto; + uint16_t af; // address family + bool fini; + bool started; + bool closed; + bool cooldown; + nng_url *url; + const char *host; // for dialers + nni_aio *useraio; + nni_aio *connaio; + nni_aio timeaio; + nni_aio resaio; + bool dialer; + bool tx_busy; // true if tx pending + nni_listener *nlistener; + nni_dialer *ndialer; + nni_msg *rx_payload; // current receive message + nng_sockaddr rx_sa; // addr for last message + nni_aio tx_aio; // aio for TX handling + nni_aio rx_aio; // aio for RX handling + nni_id_map pipes; // pipes (indexed by id) + nni_sockaddr self_sa; // our address + nni_sockaddr peer_sa; // peer address, only for dialer; + nni_sockaddr mesh_sa; // mesh source address (ours) + nni_list connaios; // aios from accept waiting for a client peer + nni_list connpipes; // pipes waiting to be connected + nng_duration refresh; // refresh interval for connections in seconds + udp_sp_msg rx_msg; // contains the received message header + uint16_t rcvmax; // max payload, trimmed to uint16_t + uint16_t copymax; + udp_txring tx_ring; + nni_time next_wake; nni_aio_completions complq; nni_stat_item st_rcv_max; @@ -235,45 +234,10 @@ struct udp_ep { nni_stat_item st_copy_max; }; -static void udp_ep_hold(udp_ep *ep); -static void udp_ep_rele(udp_ep *ep); -static void udp_ep_fini(void *); static void udp_ep_start(udp_ep *); -static void udp_pipe_fini(void *); static void udp_resolv_cb(void *); static void udp_rx_cb(void *); -static int -udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa) -{ - udp_pipe *p; - int rv; - nni_time now; - if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { - return (NNG_ENOMEM); - } - if ((rv = nni_id_alloc32(&ep->pipes, &p->self_id, p)) != 0) { - NNI_FREE_STRUCT(p); - return (rv); - } - udp_ep_hold(ep); - now = nni_clock(); - nni_aio_list_init(&p->rx_aios); - nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); - p->ep = ep; - p->dialer = ep->dialer; - p->self_seq = nni_random(); - p->peer_id = peer_id; - p->proto = ep->proto; - p->peer_addr = *sa; - p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; - p->next_wake = now + UDP_PIPE_REFRESH(p); - p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); - p->rcvmax = ep->rcvmax; - *pp = p; - return (0); -} - static void udp_recv_data( udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa); static void udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id, @@ -311,7 +275,18 @@ udp_pipe_close(void *arg) static void udp_pipe_stop(void *arg) { + udp_pipe *p = arg; + udp_ep *ep = p->ep; + udp_pipe_close(arg); + + nni_mtx_lock(&ep->mtx); + if (p->self_id != 0) { + nni_id_remove(&p->ep->pipes, p->self_id); + p->self_id = 0; + } + nni_list_node_remove(&p->node); + nni_mtx_unlock(&ep->mtx); } static int @@ -319,20 +294,31 @@ udp_pipe_init(void *arg, nni_pipe *npipe) { udp_pipe *p = arg; p->npipe = npipe; - + nni_aio_list_init(&p->rx_aios); + nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); return (0); } -static void -udp_pipe_destroy(udp_pipe *p) +static int +udp_pipe_start(udp_pipe *p, udp_ep *ep, nng_sockaddr *sa) { - nng_msg *m; + nni_time now = nni_clock(); + p->ep = ep; + p->proto = ep->proto; + p->peer_addr = *sa; + p->dialer = ep->dialer; + p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; + p->rcvmax = ep->rcvmax; + p->self_seq = nni_random(); + p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); + return (nni_id_alloc32(&ep->pipes, &p->self_id, p)); +} - if (p->self_id != 0) { - nni_id_remove(&p->ep->pipes, p->self_id); - p->self_id = 0; - } - nni_list_node_remove(&p->node); +static void +udp_pipe_fini(void *arg) +{ + udp_pipe *p = arg; + nng_msg *m; // call with ep->mtx lock held while (!nni_lmq_empty(&p->rx_mq)) { @@ -341,19 +327,6 @@ udp_pipe_destroy(udp_pipe *p) } nni_lmq_fini(&p->rx_mq); NNI_ASSERT(nni_list_empty(&p->rx_aios)); - - NNI_FREE_STRUCT(p); -} - -static void -udp_pipe_fini(void *arg) -{ - udp_pipe *p = arg; - udp_ep *ep = p->ep; - - nni_mtx_lock(&ep->mtx); - udp_pipe_destroy(p); - udp_ep_rele(ep); // releases lock } // Find the pipe matching the given id (our pipe id, taken from the peer_id @@ -784,12 +757,18 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) return; } - if (udp_pipe_alloc(&p, ep, creq->us_peer_id, sa) != 0) { + if (nni_pipe_alloc_listener((void **) &p, ep->nlistener) != 0) { + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + return; + } + if (udp_pipe_start(p, ep, sa) != 0) { udp_send_disc_full( ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + nni_pipe_close(p->npipe); return; } - p->refresh = ep->refresh; + if ((creq->us_refresh * NNI_SECOND) < p->refresh) { p->refresh = (creq->us_refresh * NNI_SECOND); } @@ -798,7 +777,6 @@ udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) p->peer_seq = creq->us_sequence + 1; p->sndmax = creq->us_recv_max; p->next_wake = now + UDP_PIPE_REFRESH(p); - p->expire = now + UDP_PIPE_TIMEOUT(p); udp_pipe_schedule(p); nni_list_append(&ep->connpipes, p); @@ -1119,33 +1097,29 @@ udp_pipe_getopt( return (rv); } -// udp_ep_hold simply bumps the reference count. -// This needs to be done with the lock for the EP held. static void -udp_ep_hold(udp_ep *ep) +udp_ep_fini(void *arg) { - ep->refcnt++; -} + udp_ep *ep = arg; -// udp_ep_rele drops the reference count on the endpoint. -// If the endpoint drops to zero, the EP is freed. It also -// unlocks the mutex, which must be held when calling this. -static void -udp_ep_rele(udp_ep *ep) -{ - nni_aio *aio; - NNI_ASSERT(ep->refcnt > 0); - ep->refcnt--; - if (!ep->fini || ep->refcnt > 0) { + // We optionally linger a little bit (up to a half second) + // so that the disconnect messages can get pushed out. On + // most systems this should only take a single millisecond. + nni_time linger = + nni_clock() + NNI_SECOND / 2; // half second to drain, max + nni_mtx_lock(&ep->mtx); + ep->fini = true; + while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { nni_mtx_unlock(&ep->mtx); - return; + nng_msleep(1); + nni_mtx_lock(&ep->mtx); } - while ((aio = nni_list_first(&ep->connaios)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + if (ep->tx_ring.count > 0) { + nng_log_warn("NNG-UDP-LINGER", + "Lingering timed out on endpoint close, peer " + "notifications dropped"); } nni_mtx_unlock(&ep->mtx); - nni_aio_close(&ep->timeaio); nni_aio_close(&ep->resaio); nni_aio_close(&ep->tx_aio); @@ -1165,41 +1139,13 @@ udp_ep_rele(udp_ep *ep) nni_msg_free(ep->rx_payload); // safe even if msg is null nni_id_map_fini(&ep->pipes); NNI_FREE_STRUCTS(ep->tx_ring.descs, ep->tx_ring.size); - NNI_FREE_STRUCT(ep); -} - -static void -udp_ep_fini(void *arg) -{ - udp_ep *ep = arg; - - // We optionally linger a little bit (up to a half second) - // so that the disconnect messages can get pushed out. On - // most systems this should only take a single millisecond. - nni_time linger = - nni_clock() + NNI_SECOND / 2; // half second to drain, max - nni_mtx_lock(&ep->mtx); - ep->fini = true; - while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { - NNI_ASSERT(ep->refcnt > 0); - nni_mtx_unlock(&ep->mtx); - nng_msleep(1); - nni_mtx_lock(&ep->mtx); - } - if (ep->tx_ring.count > 0) { - nng_log_warn("NNG-UDP-LINGER", - "Lingering timed out on endpoint close, peer " - "notifications dropped"); - } - udp_ep_rele(ep); // drops the lock } static void udp_ep_close(void *arg) { - udp_ep *ep = arg; - udp_pipe *p; - nni_aio *aio; + udp_ep *ep = arg; + nni_aio *aio; nni_mtx_lock(&ep->mtx); while ((aio = nni_list_first(&ep->connaios)) != NULL) { @@ -1207,25 +1153,6 @@ udp_ep_close(void *arg) nni_aio_finish_error(aio, NNG_ECONNABORTED); } - // close all pipes - uint32_t cursor = 0; - uint64_t id; - - // first we grab the connpipes that are not closed upstream - while ((p = nni_list_first(&ep->connpipes)) != NULL) { - udp_pipe_destroy(p); - ep->refcnt--; - } - while (nni_id_visit(&ep->pipes, &id, (void **) &p, &cursor)) { - if (p->peer_id != 0) { - udp_send_disc(ep, p, DISC_CLOSED); - } - p->closed = true; - while ((aio = nni_list_first(&p->rx_aios)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - } nni_aio_close(&ep->resaio); nni_mtx_unlock(&ep->mtx); } @@ -1271,8 +1198,7 @@ udp_timer_cb(void *arg) // If we're still on the connect list, then we need // take responsibility for cleaning this up. if (nni_list_node_active(&p->node)) { - udp_pipe_destroy(p); - ep->refcnt--; + nni_pipe_close(p->npipe); continue; } @@ -1300,15 +1226,21 @@ udp_timer_cb(void *arg) } static int -udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer, - nni_listener *listener) +udp_ep_init( + udp_ep *ep, nng_url *url, nni_sock *sock, nni_dialer *d, nni_listener *l) { - udp_ep *ep; - int rv; + int rv; - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } + nni_mtx_init(&ep->mtx); + nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true); + NNI_LIST_INIT(&ep->connpipes, udp_pipe, node); + nni_aio_list_init(&ep->connaios); + + nni_aio_init(&ep->rx_aio, udp_rx_cb, ep); + nni_aio_init(&ep->tx_aio, udp_tx_cb, ep); + nni_aio_init(&ep->timeaio, udp_timer_cb, ep); + nni_aio_init(&ep->resaio, udp_resolv_cb, ep); + nni_aio_completions_init(&ep->complq); ep->tx_ring.descs = NNI_ALLOC_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); @@ -1325,7 +1257,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer, } else if (strcmp(url->u_scheme, "udp6") == 0) { ep->af = NNG_AF_INET6; } else { - NNI_FREE_STRUCT(ep); return (NNG_EADDRINVAL); } @@ -1335,25 +1266,12 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer, ep->refresh = NNG_UDP_REFRESH; // one minute by default ep->rcvmax = NNG_UDP_RECVMAX; ep->copymax = NNG_UDP_COPYMAX; - ep->refcnt = 1; if ((rv = nni_msg_alloc(&ep->rx_payload, ep->rcvmax + sizeof(ep->rx_msg)) != 0)) { NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); - NNI_FREE_STRUCT(ep); return (rv); } - nni_mtx_init(&ep->mtx); - nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true); - NNI_LIST_INIT(&ep->connpipes, udp_pipe, node); - nni_aio_list_init(&ep->connaios); - - nni_aio_init(&ep->rx_aio, udp_rx_cb, ep); - nni_aio_init(&ep->tx_aio, udp_tx_cb, ep); - nni_aio_init(&ep->timeaio, udp_timer_cb, ep); - nni_aio_init(&ep->resaio, udp_resolv_cb, ep); - nni_aio_completions_init(&ep->complq); - NNI_STAT_LOCK(rcv_max_info, "rcv_max", "maximum receive size", NNG_STAT_LEVEL, NNG_UNIT_BYTES); NNI_STAT_LOCK(copy_max_info, "copy_max", @@ -1399,28 +1317,31 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer, nni_stat_init_lock( &ep->st_peer_inactive, &peer_inactive_info, &ep->mtx); - if (listener) { - nni_listener_add_stat(listener, &ep->st_rcv_max); - nni_listener_add_stat(listener, &ep->st_copy_max); - nni_listener_add_stat(listener, &ep->st_rcv_copy); - nni_listener_add_stat(listener, &ep->st_rcv_nocopy); - nni_listener_add_stat(listener, &ep->st_rcv_reorder); - nni_listener_add_stat(listener, &ep->st_rcv_toobig); - nni_listener_add_stat(listener, &ep->st_rcv_nomatch); - nni_listener_add_stat(listener, &ep->st_rcv_nobuf); - nni_listener_add_stat(listener, &ep->st_snd_toobig); - nni_listener_add_stat(listener, &ep->st_snd_nobuf); - } else { - nni_dialer_add_stat(dialer, &ep->st_rcv_max); - nni_dialer_add_stat(dialer, &ep->st_copy_max); - nni_dialer_add_stat(dialer, &ep->st_rcv_copy); - nni_dialer_add_stat(dialer, &ep->st_rcv_nocopy); - nni_dialer_add_stat(dialer, &ep->st_rcv_reorder); - nni_dialer_add_stat(dialer, &ep->st_rcv_toobig); - nni_dialer_add_stat(dialer, &ep->st_rcv_nomatch); - nni_dialer_add_stat(dialer, &ep->st_rcv_nobuf); - nni_dialer_add_stat(dialer, &ep->st_snd_toobig); - nni_dialer_add_stat(dialer, &ep->st_snd_nobuf); + if (l) { + NNI_ASSERT(d == NULL); + nni_listener_add_stat(l, &ep->st_rcv_max); + nni_listener_add_stat(l, &ep->st_copy_max); + nni_listener_add_stat(l, &ep->st_rcv_copy); + nni_listener_add_stat(l, &ep->st_rcv_nocopy); + nni_listener_add_stat(l, &ep->st_rcv_reorder); + nni_listener_add_stat(l, &ep->st_rcv_toobig); + nni_listener_add_stat(l, &ep->st_rcv_nomatch); + nni_listener_add_stat(l, &ep->st_rcv_nobuf); + nni_listener_add_stat(l, &ep->st_snd_toobig); + nni_listener_add_stat(l, &ep->st_snd_nobuf); + } + if (d) { + NNI_ASSERT(l == NULL); + nni_dialer_add_stat(d, &ep->st_rcv_max); + nni_dialer_add_stat(d, &ep->st_copy_max); + nni_dialer_add_stat(d, &ep->st_rcv_copy); + nni_dialer_add_stat(d, &ep->st_rcv_nocopy); + nni_dialer_add_stat(d, &ep->st_rcv_reorder); + nni_dialer_add_stat(d, &ep->st_rcv_toobig); + nni_dialer_add_stat(d, &ep->st_rcv_nomatch); + nni_dialer_add_stat(d, &ep->st_rcv_nobuf); + nni_dialer_add_stat(d, &ep->st_snd_toobig); + nni_dialer_add_stat(d, &ep->st_snd_nobuf); } // schedule our timer callback - forever for now @@ -1428,7 +1349,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock, nni_dialer *dialer, // actions which require earlier wakeup. nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio); - *epp = ep; return (0); } @@ -1454,42 +1374,39 @@ udp_check_url(nng_url *url, bool listen) static int udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) { - udp_ep *ep; + udp_ep *ep = (void *) dp; int rv; nni_sock *sock = nni_dialer_sock(ndialer); - if ((rv = udp_check_url(url, false)) != 0) { + ep->ndialer = ndialer; + if ((rv = udp_ep_init(ep, url, sock, ndialer, NULL)) != 0) { return (rv); } - if ((rv = udp_ep_init(&ep, url, sock, ndialer, NULL)) != 0) { + if ((rv = udp_check_url(url, false)) != 0) { return (rv); } - *dp = ep; return (0); } static int udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener) { - udp_ep *ep; - int rv; - nni_sock *sock = nni_listener_sock(nlistener); - nng_sockaddr sa; + udp_ep *ep = (void *) lp; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); - // Check for invalid URL components. - if (((rv = udp_check_url(url, true)) != 0) || - ((rv = nni_url_to_address(&sa, url)) != 0)) { + ep->nlistener = nlistener; + if ((rv = udp_ep_init(ep, url, sock, NULL, nlistener)) != 0) { return (rv); } - - if ((rv = udp_ep_init(&ep, url, sock, NULL, nlistener)) != 0) { + // Check for invalid URL components. + if (((rv = udp_check_url(url, true)) != 0) || + ((rv = nni_url_to_address(&ep->self_sa, url)) != 0)) { return (rv); } - ep->self_sa = sa; - *lp = ep; return (0); } @@ -1548,11 +1465,16 @@ udp_resolv_cb(void *arg) } // places a "hold" on the ep - if ((rv = udp_pipe_alloc(&p, ep, 0, &ep->peer_sa)) != 0) { + if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) { nni_aio_list_remove(aio); nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); - return; + } + if ((rv = udp_pipe_start(p, ep, &ep->peer_sa)) != 0) { + nni_aio_list_remove(aio); + nni_pipe_close(p->npipe); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); } udp_pipe_schedule(p); @@ -1758,7 +1680,7 @@ udp_ep_match(udp_ep *ep) nni_aio_list_remove(aio); nni_list_remove(&ep->connpipes, p); - nni_aio_set_output(aio, 0, p); + nni_aio_set_output(aio, 0, p->npipe); nni_aio_finish(aio, 0, 0); } @@ -1821,6 +1743,7 @@ udp_ep_accept(void *arg, nni_aio *aio) } static nni_sp_pipe_ops udp_pipe_ops = { + .p_size = sizeof(udp_pipe), .p_init = udp_pipe_init, .p_fini = udp_pipe_fini, .p_stop = udp_pipe_stop, @@ -1897,6 +1820,7 @@ udp_listener_setopt( } static nni_sp_dialer_ops udp_dialer_ops = { + .d_size = sizeof(udp_ep), .d_init = udp_dialer_init, .d_fini = udp_ep_fini, .d_connect = udp_ep_connect, @@ -1906,6 +1830,7 @@ static nni_sp_dialer_ops udp_dialer_ops = { }; static nni_sp_listener_ops udp_listener_ops = { + .l_size = sizeof(udp_ep), .l_init = udp_listener_init, .l_fini = udp_ep_fini, .l_bind = udp_ep_bind,