diff --git a/include/nng/nng.h b/include/nng/nng.h index 1b3eae207..f41afb8e6 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -818,9 +818,18 @@ NNG_DECL nng_listener nng_pipe_listener(nng_pipe); // low order 16 bits will be set. This is provided in native byte order, // which makes it more convenient than using the NNG_OPT_LOCADDR option. #define NNG_OPT_TCP_BOUND_PORT "tcp-bound-port" + +// UDP options. + // UDP alias for convenience uses the same value #define NNG_OPT_UDP_BOUND_PORT NNG_OPT_TCP_BOUND_PORT +// UDP short message size. Messages smaller than (or equal to) this +// will be copied, instead of loan up. This can allow for a faster +// pass up as we can allocate smaller message buffers instead of having +// to replace a full message buffer. +#define NNG_OPT_UDP_COPY_MAX "udp:copy-max" + // IPC options. These will largely vary depending on the platform, // as POSIX systems have very different options than Windows. diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c index 10e403978..b55fcdb16 100644 --- a/src/sp/transport/udp/udp.c +++ b/src/sp/transport/udp/udp.c @@ -66,6 +66,10 @@ typedef enum udp_disc_reason { #define NNG_UDP_RECVMAX 65000 // largest permitted by spec #endif +#ifndef NNG_UDP_COPYMAX // threshold for copying instead of loan up +#define NNG_UDP_COPYMAX 1024 +#endif + #ifndef NNG_UDP_REFRESH #define NNG_UDP_REFRESH (5 * NNI_SECOND) #endif @@ -213,12 +217,11 @@ struct udp_ep { nng_duration refresh; // refresh interval for connections in seconds udp_sp_msg rx_msg; // contains the received message header uint16_t rcvmax; // max payload, trimmed to uint16_t - uint16_t short_msg; + uint16_t copymax; udp_txring tx_ring; nni_time next_wake; nni_aio_completions complq; -#ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; nni_stat_item st_rcv_reorder; nni_stat_item st_rcv_toobig; @@ -229,7 +232,7 @@ struct udp_ep { nni_stat_item st_snd_toobig; nni_stat_item st_snd_nobuf; nni_stat_item st_peer_inactive; -#endif + nni_stat_item st_copy_max; }; static void udp_ep_hold(udp_ep *ep); @@ -253,8 +256,10 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa) NNI_FREE_STRUCT(p); return (rv); } + udp_ep_hold(ep); now = nni_clock(); nni_aio_list_init(&p->rx_aios); + nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); p->ep = ep; p->dialer = ep->dialer; p->self_seq = nni_random(); @@ -266,8 +271,6 @@ udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa) p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); p->rcvmax = ep->rcvmax; *pp = p; - nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); - udp_ep_hold(ep); return (0); } @@ -325,6 +328,12 @@ udp_pipe_destroy(udp_pipe *p) { nng_msg *m; + if (p->self_id != 0) { + nni_id_remove(&p->ep->pipes, p->self_id); + p->self_id = 0; + } + nni_list_node_remove(&p->node); + // call with ep->mtx lock held while (!nni_lmq_empty(&p->rx_mq)) { nni_lmq_get(&p->rx_mq, &m); @@ -343,8 +352,6 @@ udp_pipe_fini(void *arg) udp_ep *ep = p->ep; nni_mtx_lock(&ep->mtx); - nni_id_remove(&ep->pipes, p->self_id); - udp_pipe_destroy(p); udp_ep_rele(ep); // releases lock } @@ -605,8 +612,7 @@ udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, nng_sockaddr *sa) // For now we aren't validating the sequence numbers. // This allows for an out of order DISC to cause the // connection to be dropped, but it should self heal. - p->closed = true; - p->self_id = 0; // prevent it from being identified later + p->closed = true; while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); @@ -641,6 +647,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) } if (p->peer_id == 0) { // connection isn't formed yet ... send another CREQ + nni_stat_inc(&ep->st_rcv_nomatch, 1); udp_send_creq(ep, p); return; } @@ -675,7 +682,7 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) } // Short message, just alloc and copy - if (len <= ep->short_msg) { + if (len <= ep->copymax) { nni_stat_inc(&ep->st_rcv_copy, 1); if (nng_msg_alloc(&msg, len) != 0) { if (p->npipe != NULL) { @@ -684,9 +691,9 @@ udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) return; } nni_msg_set_address(msg, sa); - nni_msg_clear(msg); - nni_msg_append(msg, nni_msg_body(ep->rx_payload), len); + memcpy(nni_msg_body(msg), nni_msg_body(ep->rx_payload), len); nni_lmq_put(&p->rx_mq, msg); + nni_msg_realloc(ep->rx_payload, ep->rcvmax); } else { nni_stat_inc(&ep->st_rcv_nocopy, 1); // Message size larger than copy break, do zero copy @@ -998,7 +1005,8 @@ udp_pipe_send(void *arg, nni_aio *aio) // Just queue it, or fail it. udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg); nni_mtx_unlock(&ep->mtx); - nni_aio_finish(aio, 0, dreq.us_length); + nni_aio_finish( + aio, 0, msg ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0); } static void @@ -1190,21 +1198,22 @@ udp_ep_close(void *arg) // close all pipes uint32_t cursor = 0; - while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { - p->closed = true; + uint64_t id; + + // first we grab the connpipes that are not closed upstream + while ((p = nni_list_first(&ep->connpipes)) != NULL) { + udp_pipe_destroy(p); + ep->refcnt--; + } + while (nni_id_visit(&ep->pipes, &id, (void **) &p, &cursor)) { if (p->peer_id != 0) { udp_send_disc(ep, p, DISC_CLOSED); } + p->closed = true; while ((aio = nni_list_first(&p->rx_aios)) != NULL) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ECLOSED); } - if (p->npipe == NULL) { - nni_list_remove(&ep->connpipes, p); - nni_id_remove(&ep->pipes, p->self_id); - udp_pipe_destroy(p); - ep->refcnt--; - } } nni_aio_close(&ep->resaio); nni_mtx_unlock(&ep->mtx); @@ -1233,28 +1242,29 @@ udp_timer_cb(void *arg) ep->next_wake = NNI_TIME_NEVER; while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { - if (p->closed) { - if (p->npipe == NULL) { - // pipe closed, but we have to clean it up - // ourselves - nni_id_remove(&ep->pipes, p->self_id); - udp_pipe_destroy(p); - ep->refcnt--; - } - continue; - } - if (now > p->expire) { char buf[128]; nni_aio *aio; nng_log_info("NNG-UDP-INACTIVE", "Pipe peer %s timed out due to inactivity", nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); + + // Possibly alert the dialer, so it can restart a new + // attempt. if ((ep->dialer) && (p->peer_id == 0) && (aio = nni_list_first(&ep->connaios))) { nni_aio_list_remove(aio); nni_aio_finish_error(aio, NNG_ETIMEDOUT); } + + // If we're still on the connect list, then we need + // take responsibility for cleaning this up. + if (nni_list_node_active(&p->node)) { + udp_pipe_destroy(p); + ep->refcnt--; + continue; + } + // This will probably not be received by the peer, // since we aren't getting anything from them. But // having it on the wire may help debugging later. @@ -1312,6 +1322,7 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) ep->url = url; ep->refresh = NNG_UDP_REFRESH; // one minute by default ep->rcvmax = NNG_UDP_RECVMAX; + ep->copymax = NNG_UDP_COPYMAX; ep->refcnt = 1; if ((rv = nni_msg_alloc(&ep->rx_payload, ep->rcvmax + sizeof(ep->rx_msg)) != 0)) { @@ -1331,7 +1342,6 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) nni_aio_init(&ep->resaio, udp_resolv_cb, ep); nni_aio_completions_init(&ep->complq); -#ifdef NNG_ENABLE_STATS static const nni_stat_info rcv_max_info = { .si_name = "rcv_max", .si_desc = "maximum receive size", @@ -1402,8 +1412,16 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) .si_unit = NNG_UNIT_EVENTS, .si_atomic = true, }; + static const nni_stat_info copy_max_info = { + .si_name = "rcv_copy_max", + .si_desc = "threshold to copy instead of loan-up", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; nni_stat_init(&ep->st_rcv_max, &rcv_max_info); + nni_stat_init(&ep->st_copy_max, ©_max_info); nni_stat_init(&ep->st_rcv_copy, &rcv_copy_info); nni_stat_init(&ep->st_rcv_nocopy, &rcv_nocopy_info); nni_stat_init(&ep->st_rcv_reorder, &rcv_reorder_info); @@ -1413,7 +1431,8 @@ udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) nni_stat_init(&ep->st_snd_toobig, &snd_toobig_info); nni_stat_init(&ep->st_snd_nobuf, &snd_nobuf_info); nni_stat_init(&ep->st_peer_inactive, &peer_inactive_info); -#endif + + nni_stat_set_value(&ep->st_rcv_max, ep->rcvmax); // schedule our timer callback - forever for now // adjusted automatically as we add pipes or other @@ -1459,9 +1478,7 @@ udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) return (rv); } -#ifdef NNG_ENABLE_STATS nni_dialer_add_stat(ndialer, &ep->st_rcv_max); -#endif *dp = ep; return (0); } @@ -1485,9 +1502,7 @@ udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener) } ep->self_sa = sa; -#ifdef NNG_ENABLE_STATS nni_listener_add_stat(nlistener, &ep->st_rcv_max); -#endif *lp = ep; return (0); @@ -1730,6 +1745,9 @@ udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) size_t val; int rv; if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) { + if ((val == 0) || (val > 65000)) { + val = 65000; + } nni_mtx_lock(&ep->mtx); if (ep->started) { nni_mtx_unlock(&ep->mtx); @@ -1737,9 +1755,38 @@ udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) } ep->rcvmax = (uint16_t) val; nni_mtx_unlock(&ep->mtx); -#ifdef NNG_ENABLE_STATS nni_stat_set_value(&ep->st_rcv_max, val); -#endif + } + return (rv); +} + +static int +udp_ep_get_copymax(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->copymax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +udp_ep_set_copymax(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + udp_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) { + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + ep->copymax = (uint16_t) val; + nni_mtx_unlock(&ep->mtx); + nni_stat_set_value(&ep->st_copy_max, val); } return (rv); } @@ -1834,6 +1881,11 @@ static const nni_option udp_ep_opts[] = { .o_get = udp_ep_get_recvmaxsz, .o_set = udp_ep_set_recvmaxsz, }, + { + .o_name = NNG_OPT_UDP_COPY_MAX, + .o_get = udp_ep_get_copymax, + .o_set = udp_ep_set_copymax, + }, { .o_name = NNG_OPT_URL, .o_get = udp_ep_get_url, diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c index b99c5af19..c2b515e8d 100644 --- a/src/sp/transport/udp/udp_tran_test.c +++ b/src/sp/transport/udp/udp_tran_test.c @@ -143,6 +143,8 @@ test_udp_recv_max(void) NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); NUTS_TRUE(sz == 200); NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_get_size(l, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 100); NUTS_PASS(nng_listener_start(l, 0)); NUTS_OPEN(s1); @@ -158,6 +160,41 @@ test_udp_recv_max(void) NUTS_CLOSE(s1); } +void +test_udp_recv_copy(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l; + size_t sz; + char *addr; + + NUTS_ADDR(addr, "udp"); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_listener_set_size(l, NNG_OPT_UDP_COPY_MAX, 100)); + NUTS_PASS(nng_listener_get_size(l, NNG_OPT_UDP_COPY_MAX, &sz)); + NUTS_TRUE(sz == 100); + NUTS_PASS(nng_listener_start(l, 0)); + + NUTS_OPEN(s1); + NUTS_PASS(nng_dial(s1, addr, NULL, 0)); + nng_msleep(100); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 150); + NUTS_CLOSE(s0); + NUTS_CLOSE(s1); +} + NUTS_TESTS = { { "udp wild card connect fail", test_udp_wild_card_connect_fail }, @@ -167,5 +204,6 @@ NUTS_TESTS = { { "udp non-local address", test_udp_non_local_address }, { "udp malformed address", test_udp_malformed_address }, { "udp recv max", test_udp_recv_max }, + { "udp recv copy", test_udp_recv_copy }, { NULL, NULL }, };