diff --git a/cmake/NNGOptions.cmake b/cmake/NNGOptions.cmake index 3a764b8b1..d117ee36d 100644 --- a/cmake/NNGOptions.cmake +++ b/cmake/NNGOptions.cmake @@ -135,6 +135,9 @@ mark_as_advanced(NNG_TRANSPORT_WSS) option (NNG_TRANSPORT_FDC "Enable File Descriptor transport (EXPERIMENTAL)" ON) mark_as_advanced(NNG_TRANSPORT_FDC) +option (NNG_TRANSPORT_UDP "Enable UDP transport (EXPERIMENTAL)" ON) +mark_as_advanced(NNG_TRANSPORT_UDP) + # ZeroTier option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) mark_as_advanced(NNG_TRANSPORT_ZEROTIER) diff --git a/src/core/aio.c b/src/core/aio.c index 2f548d745..30a963f9e 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,8 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/defs.h" +#include "nng/nng.h" #include "nng_impl.h" #include @@ -764,7 +766,16 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) ms = aio->a_timeout; } } - aio->a_expire = nni_clock() + ms; + switch (ms) { + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + // infinite sleep + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + ms; + break; + } if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { nni_aio_finish_error(aio, rv); diff --git a/src/core/idhash.c b/src/core/idhash.c index 0ee86f0dc..4ee77f6c1 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -389,3 +389,9 @@ nni_id_visit(nni_id_map *m, uint64_t *keyp, void **valp, uint32_t *cursor) *cursor = index; return (false); } + +uint32_t +nni_id_count(const nni_id_map *m) +{ + return (m->id_count); +} diff --git a/src/core/idhash.h b/src/core/idhash.h index ea91b6be8..826016bda 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -46,15 +46,16 @@ struct nni_id_map { #define NNI_ID_FLAG_RANDOM 2 // start at a random value #define NNI_ID_FLAG_REGISTER 4 // map is registered for finalization -extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool); -extern void nni_id_map_fini(nni_id_map *); -extern void *nni_id_get(nni_id_map *, uint64_t); -extern int nni_id_set(nni_id_map *, uint64_t, void *); -extern int nni_id_alloc(nni_id_map *, uint64_t *, void *); -extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *); -extern int nni_id_remove(nni_id_map *, uint64_t); -extern void nni_id_map_sys_fini(void); -extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *); +extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool); +extern void nni_id_map_fini(nni_id_map *); +extern void *nni_id_get(nni_id_map *, uint64_t); +extern int nni_id_set(nni_id_map *, uint64_t, void *); +extern int nni_id_alloc(nni_id_map *, uint64_t *, void *); +extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *); +extern int nni_id_remove(nni_id_map *, uint64_t); +extern void nni_id_map_sys_fini(void); +extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *); +extern uint32_t nni_id_count(const nni_id_map *); #define NNI_ID_MAP_INITIALIZER(min, max, flags) \ { \ diff --git a/src/core/message.c b/src/core/message.c index 3da93ac67..7a644d79c 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -29,6 +29,7 @@ struct nng_msg { nni_chunk m_body; uint32_t m_pipe; // set on receive nni_atomic_int m_refcnt; + nng_sockaddr m_addr; // set on receive, transport use }; #if 0 @@ -544,6 +545,16 @@ nni_msg_chop(nni_msg *m, size_t len) return (nni_chunk_chop(&m->m_body, len)); } +// Grow the message header, but don't put anything there. +// This is useful for setting up to receive directly into it +// for zero copy purposes. +void +nni_msg_header_extend(nni_msg *m, size_t len) +{ + NNI_ASSERT((len + m->m_header_len) <= sizeof(m->m_header_buf)); + m->m_header_len += len; +} + int nni_msg_header_append(nni_msg *m, const void *data, size_t len) { @@ -656,3 +667,15 @@ nni_msg_get_pipe(const nni_msg *m) { return (m->m_pipe); } + +const nng_sockaddr * +nni_msg_address(const nni_msg *msg) +{ + return (&msg->m_addr); +} + +void +nni_msg_set_address(nng_msg *msg, const nng_sockaddr *addr) +{ + msg->m_addr = *addr; +} diff --git a/src/core/message.h b/src/core/message.h index 7e35ba752..cc47457a9 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -20,9 +20,9 @@ extern int nni_msg_realloc(nni_msg *, size_t); extern int nni_msg_reserve(nni_msg *, size_t); extern size_t nni_msg_capacity(nni_msg *); extern int nni_msg_dup(nni_msg **, const nni_msg *); -extern void * nni_msg_header(nni_msg *); +extern void *nni_msg_header(nni_msg *); extern size_t nni_msg_header_len(const nni_msg *); -extern void * nni_msg_body(nni_msg *); +extern void *nni_msg_body(nni_msg *); extern size_t nni_msg_len(const nni_msg *); extern int nni_msg_append(nni_msg *, const void *, size_t); extern int nni_msg_insert(nni_msg *, const void *, size_t); @@ -55,6 +55,14 @@ extern void nni_msg_clone(nni_msg *); extern nni_msg *nni_msg_unique(nni_msg *); extern bool nni_msg_shared(nni_msg *); +// Socket address access. Principally useful for transports like UDP, +// which may need to remember or add the socket address later. +// SP transports will generally not support upper layers setting the +// address on send, but will take the information from the pipe. +// It may be set on receive, depending upon the transport. +extern const nng_sockaddr *nni_msg_address(const nni_msg *); +extern void nni_msg_set_address(nng_msg *, const nng_sockaddr *); + // nni_msg_pull_up ensures that the message is unique, and that any // header present is "pulled up" into the message body. If the function // cannot do this for any reason (out of space in the body), then NULL diff --git a/src/core/platform.h b/src/core/platform.h index 5cd931e6d..45a47a592 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -356,7 +356,7 @@ extern int nni_tcp_listener_get( // Symbolic service names will be looked up assuming SOCK_STREAM, so // they may not work with UDP. extern void nni_resolv_ip( - const char *, const char *, int, bool, nng_sockaddr *sa, nni_aio *); + const char *, const char *, uint16_t, bool, nng_sockaddr *sa, nni_aio *); // nni_parse_ip parses an IP address, without a port. extern int nni_parse_ip(const char *, nng_sockaddr *); diff --git a/src/core/url.c b/src/core/url.c index 65cf478bd..a42ece1f4 100644 --- a/src/core/url.c +++ b/src/core/url.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include "nng_impl.h" #include @@ -277,6 +278,32 @@ nni_url_default_port(const char *scheme) return (""); } +// Return the address family for an address scheme. +// Returns NNG_AF_UNSPEC for unknown cases or where +// we do not want to choose between AF_INET and AF_INET6. +uint16_t +nni_url_family(const char *scheme) +{ + if (strcmp(scheme, "ipc") == 0) { + return (NNG_AF_IPC); + } + if (strcmp(scheme, "inproc") == 0) { + return (NNG_AF_INPROC); + } + if (strcmp(scheme, "abstract") == 0) { + return (NNG_AF_ABSTRACT); + } +#ifdef NNG_HAVE_INET6 + if (strchr(scheme, '6') != NULL) { + return (NNG_AF_INET6); + } +#endif + if (strchr(scheme, '4') != NULL) { + return (NNG_AF_INET); + } + return (NNG_AF_UNSPEC); +} + // URLs usually follow the following format: // // scheme:[//[userinfo@]host][/]path[?query][#fragment] @@ -600,7 +627,7 @@ nni_url_clone(nni_url **dstp, const nni_url *src) // nni_url_to_address resolves a URL into a sockaddr, assuming the URL is for // an IP address. int -nni_url_to_address(nng_sockaddr *sa, const nng_url *url) +nni_url_to_address(nng_sockaddr *sa, const nni_url *url) { int af; nni_aio aio; diff --git a/src/core/url.h b/src/core/url.h index 877a07917..10e569fb2 100644 --- a/src/core/url.h +++ b/src/core/url.h @@ -17,6 +17,7 @@ extern int nni_url_parse(nni_url **, const char *path); extern void nni_url_free(nni_url *); extern int nni_url_clone(nni_url **, const nni_url *); extern const char *nni_url_default_port(const char *); +extern uint16_t nni_url_family(const char *); extern int nni_url_asprintf(char **, const nni_url *); extern int nni_url_asprintf_port(char **, const nni_url *, int); extern size_t nni_url_decode(uint8_t *, const char *, size_t); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index f522499ee..7bb725ec1 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -236,7 +236,7 @@ resolv_task(resolv_item *item) } void -nni_resolv_ip(const char *host, const char *serv, int af, bool passive, +nni_resolv_ip(const char *host, const char *serv, uint16_t af, bool passive, nng_sockaddr *sa, nni_aio *aio) { resolv_item *item; diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 4ef4c68ce..911787cae 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -9,8 +9,8 @@ // #include "core/nng_impl.h" -#include "nng/nng.h" #include "platform/posix/posix_impl.h" +#include #include #ifdef NNG_PLATFORM_POSIX diff --git a/src/sp/transport.c b/src/sp/transport.c index 1d895c828..ab2a68da9 100644 --- a/src/sp/transport.c +++ b/src/sp/transport.c @@ -75,6 +75,9 @@ extern void nni_sp_zt_register(void); #ifdef NNG_TRANSPORT_FDC extern void nni_sp_sfd_register(void); #endif +#ifdef NNG_TRANSPORT_UDP +extern void nni_sp_udp_register(void); +#endif void nni_sp_tran_sys_init(void) @@ -103,6 +106,9 @@ nni_sp_tran_sys_init(void) #ifdef NNG_TRANSPORT_FDC nni_sp_sfd_register(); #endif +#ifdef NNG_TRANSPORT_UDP + nni_sp_udp_register(); +#endif } // nni_sp_tran_sys_fini finalizes the entire transport system, including all diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt index 0de80015d..d59a19d40 100644 --- a/src/sp/transport/CMakeLists.txt +++ b/src/sp/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2023 Staysail Systems, Inc. +# Copyright 2024 Staysail Systems, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -17,4 +17,4 @@ add_subdirectory(tcp) add_subdirectory(tls) add_subdirectory(ws) add_subdirectory(zerotier) - +add_subdirectory(udp) diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index be2e03458..1760551cc 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -730,7 +730,7 @@ tcptran_ep_close(void *arg) static int tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) { - int af; + uint16_t af; char *semi; char *src; size_t len; @@ -751,18 +751,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) len = (size_t) (semi - url->u_hostname); url->u_hostname = semi + 1; - - if (strcmp(surl->u_scheme, "tcp") == 0) { - af = NNG_AF_UNSPEC; - } else if (strcmp(surl->u_scheme, "tcp4") == 0) { - af = NNG_AF_INET; -#ifdef NNG_ENABLE_IPV6 - } else if (strcmp(surl->u_scheme, "tcp6") == 0) { - af = NNG_AF_INET6; -#endif - } else { - return (NNG_EADDRINVAL); - } + af = nni_url_family(url->u_scheme); if ((src = nni_alloc(len + 1)) == NULL) { return (NNG_ENOMEM); diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c index f488771b5..504d885ed 100644 --- a/src/sp/transport/tls/tls.c +++ b/src/sp/transport/tls/tls.c @@ -712,18 +712,9 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) len = (size_t) (semi - url->u_hostname); url->u_hostname = semi + 1; + af = nni_url_family(url->u_scheme); - if (strcmp(surl->u_scheme, "tls+tcp") == 0) { - af = NNG_AF_UNSPEC; - } else if (strcmp(surl->u_scheme, "tls+tcp4") == 0) { - af = NNG_AF_INET; -#ifdef NNG_ENABLE_IPV6 - } else if (strcmp(surl->u_scheme, "tls+tcp6") == 0) { - af = NNG_AF_INET6; -#endif - } else { - return (NNG_EADDRINVAL); - } + rv = nni_url_to_address(sa, url); if ((src = nni_alloc(len + 1)) == NULL) { return (NNG_ENOMEM); diff --git a/src/sp/transport/udp/CMakeLists.txt b/src/sp/transport/udp/CMakeLists.txt new file mode 100644 index 000000000..b08cd8613 --- /dev/null +++ b/src/sp/transport/udp/CMakeLists.txt @@ -0,0 +1,15 @@ +# +# Copyright 2024 Staysail Systems, Inc. +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# UDP transport +nng_directory(udp) + +nng_sources_if(NNG_TRANSPORT_UDP udp.c) +nng_defines_if(NNG_TRANSPORT_UDP NNG_TRANSPORT_UDP) +nng_test_if(NNG_TRANSPORT_UDP udp_tran_test) diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c new file mode 100644 index 000000000..21d14ae6c --- /dev/null +++ b/src/sp/transport/udp/udp.c @@ -0,0 +1,1843 @@ +// Copyright 2024 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/aio.h" +#include "core/defs.h" +#include "core/idhash.h" +#include "core/nng_impl.h" +#include "core/options.h" +#include "core/platform.h" +#include "nng/nng.h" + +#include +#include +#include + +// Experimental UDP transport. Unicast only. +typedef struct udp_pipe udp_pipe; +typedef struct udp_ep udp_ep; + +// These should reallyh be renamed for the project. +#define nni_udp_open nni_plat_udp_open +#define nni_udp_close nni_plat_udp_close +#define nni_udp_send nni_plat_udp_send +#define nni_udp_recv nni_plat_udp_recv +#define nni_udp nni_plat_udp +#define nni_udp_sockname nni_plat_udp_sockname + +// OP code, 8 bits +enum udp_opcode { + OPCODE_DATA = 0, + OPCODE_CREQ = 1, + OPCODE_CACK = 2, + OPCODE_DISC = 3, + OPCODE_MESH = 4, +}; + +// Disconnect reason, must be 16 bits +typedef enum udp_disc_reason { + DISC_CLOSED = 0, // normal close + DISC_TYPE = 1, // bad SP type + DISC_NOTCONN = 2, // no such connection + DISC_REFUSED = 3, // refused by policy + DISC_MSGSIZE = 4, // message too large + DISC_NEGO = 5, // neogtiation failed + DISC_INACTIVE = 6, // closed due to inactivity + DISC_PROTO = 7, // other protocol error + DISC_NOBUF = 8, // resources exhausted +} udp_disc_reason; + +#ifndef NNG_UDP_TXQUEUE_LEN +#define NNG_UDP_TXQUEUE_LEN 32 +#endif + +#ifndef NNG_UDP_RXQUEUE_LEN +#define NNG_UDP_RXQUEUE_LEN 16 +#endif + +#ifndef NNG_UDP_RECVMAX +#define NNG_UDP_RECVMAX 65000 // largest permitted by spec +#endif + +#ifndef NNG_UDP_REFRESH +#define NNG_UDP_REFRESH (5 * NNI_SECOND) +#endif + +#ifndef NNG_UDP_CONNRETRY +#define NNG_UDP_CONNRETRY (NNI_SECOND / 5) +#endif + +#define UDP_EP_ROLE(ep) ((ep)->dialer ? "dialer " : "listener") + +// NB: Each of the following messages is exactly 20 bytes in size + +typedef struct udp_sp_data { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_length; + uint16_t us_reserved; // depends on message type +} udp_sp_data; + +typedef struct udp_sp_creq { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_recv_max; // actually max payload size + uint8_t us_reserved; + uint8_t us_refresh; +} udp_sp_creq; + +typedef struct udp_sp_disc { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_reason; // depends on message type + uint16_t us_reserved; +} udp_sp_disc; + +typedef struct udp_sp_mesh { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_reserved1; + uint32_t us_sequence; + uint32_t us_reserved2; +} udp_sp_mesh; + +// ack is the same format as request +typedef struct udp_sp_creq udp_sp_cack; + +typedef union udp_sp_msg { + udp_sp_data data; + udp_sp_creq creq; + udp_sp_cack cack; + udp_sp_disc disc; + udp_sp_mesh mesh; +} udp_sp_msg; + +// Like a NIC driver, this is a "descriptor" for UDP TX packets. +// This allows us to create a circular ring of these to support +// queueing for TX gracefully. +typedef struct udp_txdesc { + udp_sp_msg header; // UDP transport message headers + nni_msg *payload; // may be null, only for data messages + nng_sockaddr sa; + bool submitted; // true if submitted +} udp_txdesc; + +typedef struct udp_txring { + udp_txdesc *descs; + uint16_t head; + uint16_t tail; + uint16_t count; + uint16_t size; +} udp_txring; + +#define UDP_TXRING_SZ 128 + +// UDP pipe resend (CREQ) in msec (nng_duration) +#define UDP_PIPE_REFRESH(p) ((p)->refresh) + +// UDP pipe timeout in msec (nng_duration) +#define UDP_PIPE_TIMEOUT(p) ((p)->refresh * 5) + +struct udp_pipe { + udp_ep *ep; + nni_pipe *npipe; + nng_sockaddr peer_addr; + uint16_t peer; + uint16_t proto; + uint32_t self_id; + uint32_t peer_id; + uint32_t self_seq; + uint32_t peer_seq; + uint16_t sndmax; // peer's max recv size + uint16_t rcvmax; // max recv size + bool closed; + bool dialer; + nng_duration refresh; // seconds, for the protocol + nng_time next_wake; + nng_time next_creq; + nng_time expire; + nni_list_node node; + nni_lmq rx_mq; + nni_list rx_aios; +}; + +struct udp_ep { + nni_udp *udp; + nni_mtx mtx; + uint16_t proto; + uint16_t af; // address family + bool fini; + bool started; + bool closed; + bool cooldown; + nng_url *url; + const char *host; // for dialers + int refcnt; // active pipes + nni_aio *useraio; + nni_aio *connaio; + nni_aio timeaio; + nni_aio resaio; + bool dialer; + bool tx_busy; // true if tx pending + nni_msg *rx_payload; // current receive message + nng_sockaddr rx_sa; // addr for last message + nni_aio tx_aio; // aio for TX handling + nni_aio rx_aio; // aio for RX handling + nni_id_map pipes; // pipes (indexed by id) + nni_sockaddr self_sa; // our address + nni_sockaddr peer_sa; // peer address, only for dialer; + nni_sockaddr mesh_sa; // mesh source address (ours) + nni_list connaios; // aios from accept waiting for a client peer + nni_list connpipes; // pipes waiting to be connected + nng_duration refresh; // refresh interval for connections in seconds + udp_sp_msg rx_msg; // contains the received message header + uint16_t rcvmax; // max payload, trimmed to uint16_t + uint16_t short_msg; + udp_txring tx_ring; + nni_time next_wake; + nni_aio_completions complq; + +#ifdef NNG_ENABLE_STATS + nni_stat_item st_rcv_max; +#endif +}; + +static void udp_ep_hold(udp_ep *ep); +static void udp_ep_rele(udp_ep *ep); +static void udp_ep_fini(void *); +static void udp_ep_start(udp_ep *); +static void udp_pipe_fini(void *); +static void udp_resolv_cb(void *); +static void udp_rx_cb(void *); + +static int +udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa) +{ + udp_pipe *p; + int rv; + nni_time now; + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_id_alloc32(&ep->pipes, &p->self_id, p)) != 0) { + NNI_FREE_STRUCT(p); + return (rv); + } + now = nni_clock(); + nni_aio_list_init(&p->rx_aios); + p->ep = ep; + p->dialer = ep->dialer; + p->self_seq = nni_random(); + p->peer_id = peer_id; + p->proto = ep->proto; + p->peer_addr = *sa; + p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + (p->dialer ? (5 * NNI_SECOND) : UDP_PIPE_TIMEOUT(p)); + p->rcvmax = ep->rcvmax; + *pp = p; + nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); + udp_ep_hold(ep); + return (0); +} + +static void udp_recv_data( + udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa); +static void udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id, + uint32_t remote_id, uint32_t seq, udp_disc_reason reason); +static void udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason); + +static void udp_ep_match(udp_ep *ep); + +static void +udp_tran_init(void) +{ +} + +static void +udp_tran_fini(void) +{ +} + +static void +udp_pipe_close(void *arg) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + nni_aio *aio; + + nni_mtx_lock(&ep->mtx); + udp_send_disc(ep, p, DISC_CLOSED); + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_pipe_stop(void *arg) +{ + udp_pipe_close(arg); +} + +static int +udp_pipe_init(void *arg, nni_pipe *npipe) +{ + udp_pipe *p = arg; + p->npipe = npipe; + + return (0); +} + +static void +udp_pipe_destroy(udp_pipe *p) +{ + nng_msg *m; + + // call with ep->mtx lock held + while (!nni_lmq_empty(&p->rx_mq)) { + nni_lmq_get(&p->rx_mq, &m); + nni_msg_free(m); + } + NNI_ASSERT(nni_list_empty(&p->rx_aios)); + + NNI_FREE_STRUCT(p); +} + +static void +udp_pipe_fini(void *arg) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + + nni_mtx_lock(&ep->mtx); + nni_id_remove(&ep->pipes, p->self_id); + + udp_pipe_destroy(p); + udp_ep_rele(ep); // releases lock +} + +// Find the pipe matching the given id (our pipe id, taken from the peer_id +// of the header) and peer's sockaddr. Returns NULL if not found. The +// ep lock must be held. If a pending pipe (not yet connected) is found, then +// it is returned instead. +static udp_pipe * +udp_find_pipe(udp_ep *ep, uint32_t self_id, uint32_t peer_id) +{ + udp_pipe *p; + if (((p = nni_id_get(&ep->pipes, self_id)) != NULL) && (!p->closed)) { + if (p->peer_id == 0 || p->peer_id == peer_id) { + return (p); + } + } + return (NULL); +} + +static bool +udp_check_pipe_sequence(udp_pipe *p, uint32_t seq) +{ + int32_t delta; + // signed math so we can see how far apart they are + delta = (int32_t) (seq - p->peer_seq); + if (delta < 0) { + // out of order delivery + return (false); + } + // TODO: bump a stat for misses if delta > 0. + p->peer_seq = seq + 1; // expected next sequence number + return (true); +} + +static void +udp_pipe_schedule(udp_pipe *p) +{ + udp_ep *ep = p->ep; + bool changed = false; + if (p->expire < ep->next_wake) { + ep->next_wake = p->expire; + changed = true; + } + if (p->next_wake < ep->next_wake) { + ep->next_wake = p->next_wake; + changed = true; + } + if (changed) { + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } +} + +static void +udp_start_rx(udp_ep *ep) +{ + nni_iov iov[2]; + + iov[0].iov_buf = &ep->rx_msg; + iov[0].iov_len = sizeof(ep->rx_msg); + iov[1].iov_buf = nni_msg_body(ep->rx_payload); + iov[1].iov_len = nni_msg_len(ep->rx_payload); + nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); + nni_aio_set_iov(&ep->rx_aio, 2, iov); + nni_udp_recv(ep->udp, &ep->rx_aio); +} + +static void +udp_start_tx(udp_ep *ep) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc; + nng_msg *msg; + + if ((!ring->count) || (!ep->started) || ep->tx_busy) { + return; + } + ep->tx_busy = true; + + // NB: This does not advance the tail yet. + // The tail will be advanced when the operation is complete. + desc = &ring->descs[ring->tail]; + nni_iov iov[3]; + int niov = 0; + + NNI_ASSERT(desc->submitted); + iov[0].iov_buf = &desc->header; + iov[0].iov_len = sizeof(desc->header); + niov++; + + if ((msg = desc->payload) != NULL) { + if (nni_msg_header_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + } + nni_aio_set_input(&ep->tx_aio, 0, &desc->sa); + nni_aio_set_iov(&ep->tx_aio, niov, iov); + // it should *never* take this long, but allow for ARP resolution + nni_aio_set_timeout(&ep->tx_aio, NNI_SECOND * 10); + nni_udp_send(ep->udp, &ep->tx_aio); +} + +static void +udp_queue_tx(udp_ep *ep, nng_sockaddr *sa, udp_sp_msg *msg, nni_msg *payload) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc = &ring->descs[ring->head]; + + if (ring->count == ring->size || !ep->started) { + // ring is full + // TODO: bump a stat + if (payload != NULL) { + nni_msg_free(payload); + } + return; + } +#ifdef NNG_LITTLE_ENDIAN + // This covers modern GCC, clang, Visual Studio. + desc->header = *msg; +#else + // Fix the endianness, so other routines don't have to. + // It turns out that the endianness of the fields of CREQ + // is compatible with the fields of every other message type. + // We only have to do this for systems that are not known + // (at compile time) to be little endian. + desc->header.creq.us_ver = 0x1; + desc->header.creq.us_op_code = msg->creq.us_op_code; + NNI_PUT16LE(&desc->header.creq.us_type, msg->creq.us_type); + NNI_PUT32LE(&desc->header.creq.us_sended_id, msg->creq.us_sender_id); + NNI_PUT32LE(&desc->header.creq.us_peer_id, msg->creq.us_peer_id); + NNI_PUT32LE(&desc->header.creq.us_sequence, msg->creq.us_sequence); + NNI_PUT16LE(&desc->header.creq.us_recv_max, msg->creq.us_recv_max); + desc->header.creq.us_reserved = 0; + desc->header.creq.us_refresh = msg->creq.us_refresh; +#endif + + desc->payload = payload; + desc->sa = *sa; + desc->submitted = true; + ring->count++; + ring->head++; + if (ring->head == ring->size) { + ring->head = 0; + } + udp_start_tx(ep); +} + +static void +udp_finish_tx(udp_ep *ep) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc; + + NNI_ASSERT(ring->count > 0); + desc = &ring->descs[ring->tail]; + NNI_ASSERT(desc->submitted); + if (desc->payload != NULL) { + nni_msg_free(desc->payload); + desc->payload = NULL; + } + desc->submitted = false; + ring->tail++; + ring->count--; + if (ring->tail == ring->size) { + ring->tail = 0; + } + ep->tx_busy = false; + + // possibly start another tx going + udp_start_tx(ep); +} + +static void +udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason) +{ + nni_aio *aio; + if (p->closed) { + return; + } + p->closed = true; + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + udp_send_disc_full( + ep, &p->peer_addr, p->self_id, p->peer_id, p->self_seq++, reason); +} + +static void +udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id, + uint32_t remote_id, uint32_t seq, udp_disc_reason reason) +{ + udp_sp_disc disc; + + disc.us_ver = 0x1; + disc.us_op_code = OPCODE_DISC; + disc.us_type = ep->proto; + disc.us_sender_id = local_id; + disc.us_peer_id = remote_id; + disc.us_sequence = seq; + disc.us_reason = (uint16_t) reason; + udp_queue_tx(ep, sa, (void *) &disc, NULL); +} + +static void +udp_send_creq(udp_ep *ep, udp_pipe *p) +{ + udp_sp_creq creq; + creq.us_ver = 0x1; + creq.us_op_code = OPCODE_CREQ; + creq.us_type = p->proto; + creq.us_sender_id = p->self_id; + creq.us_peer_id = p->peer_id; + creq.us_sequence = p->self_seq++; + creq.us_recv_max = p->rcvmax; + creq.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); + p->next_wake = p->next_creq; + + udp_pipe_schedule(p); + udp_queue_tx(ep, &p->peer_addr, (void *) &creq, NULL); +} + +static void +udp_send_cack(udp_ep *ep, udp_pipe *p) +{ + udp_sp_cack cack; + cack.us_ver = 0x01; + cack.us_op_code = OPCODE_CACK; + cack.us_type = p->proto; + cack.us_sender_id = p->self_id; + cack.us_peer_id = p->peer_id; + cack.us_sequence = p->self_seq++; + cack.us_recv_max = p->rcvmax; + cack.us_refresh = (p->refresh + NNI_SECOND - 1) / NNI_SECOND; + udp_queue_tx(ep, &p->peer_addr, (void *) &cack, NULL); +} + +static void +udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, nng_sockaddr *sa) +{ + udp_pipe *p; + nni_aio *aio; + NNI_ARG_UNUSED(sa); + + p = udp_find_pipe(ep, disc->us_peer_id, disc->us_sender_id); + if (p != NULL) { + // For now we aren't validating the sequence numbers. + // This allows for an out of order DISC to cause the + // connection to be dropped, but it should self heal. + p->closed = true; + p->self_id = 0; // prevent it from being identified later + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } +} + +// Receive data for the pipe. Returns true if we used +// the message, false otherwise. +static void +udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) +{ + // NB: ep mtx is locked + udp_pipe *p; + nni_aio *aio; + nni_msg *msg; + nni_time now; + + // send_id is the remote peer's ID + // peer_id is our ID (receiver ID) + // sequence number is our sender's sequence + uint32_t send_id = dreq->us_sender_id; + uint32_t peer_id = dreq->us_peer_id; + + // NB: Peer ID endianness does not matter, as long we use it + // consistently. + if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) { + // TODO: Bump a stat... + udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN); + // Question: how do we store the sockaddr for that? + return; + } + if (p->peer_id == 0) { + // connection isn't formed yet ... send another CREQ + udp_send_creq(ep, p); + return; + } + + now = nni_clock(); + + // Make sure the message wasn't truncated, and that it fits within + // our maximum agreed upon payload. + if ((dreq->us_length > len) || (dreq->us_length > p->rcvmax)) { + udp_send_disc(ep, p, DISC_MSGSIZE); + return; + } + + p->expire = now + UDP_PIPE_TIMEOUT(p); + p->next_wake = now + UDP_PIPE_REFRESH(p); + + udp_pipe_schedule(p); + + // trim the message down to its + nni_msg_chop( + ep->rx_payload, nni_msg_len(ep->rx_payload) - dreq->us_length); + + if (!udp_check_pipe_sequence(p, dreq->us_sequence)) { + // out of order delivery, drop it + // TODO: bump a stat + return; + } + + if (nni_lmq_full(&p->rx_mq)) { + // bump a NOBUF stat + return; + } + + // Short message, just alloc and copy + if (len <= ep->short_msg) { + if (nng_msg_alloc(&msg, len) != 0) { + // TODO: bump a stat + return; + } + nni_msg_set_address(msg, sa); + nni_msg_clear(msg); + nni_msg_append(msg, nni_msg_body(ep->rx_payload), len); + nni_lmq_put(&p->rx_mq, msg); + } else { + // Message size larger than copy break, do zero copy + msg = ep->rx_payload; + if (nng_msg_alloc(&ep->rx_payload, + ep->rcvmax + sizeof(ep->rx_msg)) != 0) { + // TODO: bump a stat + ep->rx_payload = msg; // make sure we put it back + return; + } + + if (len > nng_msg_len(msg)) { + // chop off any unfilled tail + nng_msg_chop(msg, nng_msg_len(msg) - len); + } + nni_msg_set_address(msg, sa); + nni_lmq_put(&p->rx_mq, msg); + } + + while (((aio = nni_list_first(&p->rx_aios)) != NULL) && + (!nni_lmq_empty(&p->rx_mq))) { + nni_aio_list_remove(aio); + nni_lmq_get(&p->rx_mq, &msg); + nni_aio_set_msg(aio, msg); + nni_aio_completions_add( + &ep->complq, aio, 0, nni_aio_count(aio)); + } +} + +static void +udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) +{ + udp_pipe *p; + nni_time now; + + now = nni_clock(); + if (ep->dialer) { + // dialers do not accept CREQ requests + udp_send_disc_full(ep, sa, creq->us_peer_id, + creq->us_sender_id, 0, DISC_REFUSED); + return; + } + if ((p = udp_find_pipe(ep, creq->us_peer_id, creq->us_sender_id))) { + if ((p->peer_id == 0) || (p->peer != creq->us_type)) { + // we don't expect this -- a connection request from a + // peer while we have an oustanding request of our own. + // We *could* compare the sockaddrs to see if they + // match and if so then treat this as just a dueling + // connection. but for now we just discard it -- we'll + // wait for the CACK. + return; + } + + // so we know who it is from.. this is a refresh. + if (creq->us_refresh == 0) { + udp_send_disc(ep, p, DISC_NEGO); + return; + } + if ((creq->us_refresh * NNI_SECOND) < p->refresh) { + p->refresh = creq->us_refresh * NNI_SECOND; + } + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + + udp_pipe_schedule(p); + udp_send_cack(ep, p); + return; + } + + // new pipe + if (ep->fini || ep->closed) { + // endpoint is closing down, reject it. + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_REFUSED); + return; + } + if (creq->us_refresh == 0) { + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_NEGO); + return; + } + + if (udp_pipe_alloc(&p, ep, creq->us_peer_id, sa) != 0) { + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + return; + } + p->refresh = ep->refresh; + if ((creq->us_refresh * NNI_SECOND) < p->refresh) { + p->refresh = (creq->us_refresh * NNI_SECOND); + } + p->peer = creq->us_type; + p->peer_id = creq->us_sender_id; + p->peer_seq = creq->us_sequence + 1; + p->sndmax = creq->us_recv_max; + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + + udp_pipe_schedule(p); + nni_list_append(&ep->connpipes, p); + udp_send_cack(ep, p); + udp_ep_match(ep); +} + +static void +udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, nng_sockaddr *sa) +{ + udp_pipe *p; + bool first; + nni_time now; + + if ((p = udp_find_pipe(ep, cack->us_peer_id, cack->us_sender_id)) && + (!p->closed)) { + if ((p->peer_id != 0) && (p->peer != cack->us_type)) { + udp_send_disc(ep, p, DISC_TYPE); + return; + } + + first = (p->peer_id == 0); + + // so we know who it is from.. this is a refresh. + p->sndmax = cack->us_recv_max; + p->peer = cack->us_type; + p->peer_id = cack->us_sender_id; + + if (cack->us_refresh == 0) { + udp_send_disc(ep, p, DISC_NEGO); + return; + } + if (first) { + p->refresh = ep->refresh; + p->peer_seq = cack->us_sequence + 1; + } + if ((cack->us_refresh * NNI_SECOND) < p->refresh) { + p->refresh = cack->us_refresh * NNI_SECOND; + } + now = nni_clock(); + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + udp_pipe_schedule(p); + + if (first) { + nni_list_append(&ep->connpipes, p); + udp_ep_match(ep); + } + return; + } + + // a CACK without a corresponding CREQ (or timed out pipe already) + udp_send_disc_full( + ep, sa, cack->us_peer_id, cack->us_sender_id, 0, DISC_NOTCONN); +} + +static void +udp_tx_cb(void *arg) +{ + udp_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + udp_finish_tx(ep); + nni_mtx_unlock(&ep->mtx); +} + +// In the case of unicast UDP, we don't know +// whether the message arrived from a connected peer as part of a +// logical connection, or is a message related to connection management. +static void +udp_rx_cb(void *arg) +{ + udp_ep *ep = arg; + nni_aio *aio = &ep->rx_aio; + int rv; + size_t n; + udp_sp_msg *hdr; + nng_sockaddr *sa; + nni_aio_completions complq; + + // for a received packet we are either receiving it for a + // connection we already have established, or for a new connection. + // Dialers cannot receive connection requests (as a safety + // precaution). + + nni_mtx_lock(&ep->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + // something bad happened on RX... which is unexpected. + // sleep a little bit and hope for recovery. + switch (nni_aio_result(aio)) { + case NNG_ECLOSED: + case NNG_ECANCELED: + nni_mtx_unlock(&ep->mtx); + return; + case NNG_ETIMEDOUT: + case NNG_EAGAIN: + case NNG_EINTR: + ep->cooldown = false; + goto finish; + break; + default: + ep->cooldown = true; + nni_sleep_aio(5, aio); + nni_mtx_unlock(&ep->mtx); + return; + } + } + if (ep->cooldown) { + ep->cooldown = false; + goto finish; + } + + // Received message will be in the ep rx header. + hdr = &ep->rx_msg; + sa = &ep->rx_sa; + n = nng_aio_count(aio); + + if ((n >= sizeof(*hdr)) && (hdr->data.us_ver == 1)) { + n -= sizeof(*hdr); + +#ifndef NNG_LITTLE_ENDIAN + // Fix the endianness, so other routines don't have to. + // It turns out that the endianness of the fields of CREQ + // is compatible with the fields of every other message type. + // We only have to do this for systems that are not known + // (at compile time) to be little endian. + hdr->data.us_type = NNI_GET16LE(&hdr->data.us_type); + hdr->data.us_sender_id = NNI_GET32LE(&hdr->data.us_sender_id); + hdr->data.us_peeer_id = NNI_GET32LE(&hdr->data.us_peer_id); + hdr->data.us_sequence = NNI_GET32LE(&hdr->data.us_sequence); + hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length); +#endif + + // TODO: verify that incoming type matches us! + + switch (hdr->data.us_op_code) { + case OPCODE_DATA: + udp_recv_data(ep, &hdr->data, n, sa); + break; + case OPCODE_CREQ: + udp_recv_creq(ep, &hdr->creq, sa); + break; + case OPCODE_CACK: + udp_recv_cack(ep, &hdr->cack, sa); + break; + case OPCODE_DISC: + udp_recv_disc(ep, &hdr->disc, sa); + break; + case OPCODE_MESH: // TODO: + // udp_recv_mesh(ep, &hdr->mesh, sa); + // break; + default: + udp_send_disc_full( + ep, sa, 0, hdr->data.us_sender_id, 0, DISC_PROTO); + break; + } + } + +finish: + // start another receive + udp_start_rx(ep); + + // grab the list of completions so we can finish them. + complq = ep->complq; + nni_aio_completions_init(&ep->complq); + nni_mtx_unlock(&ep->mtx); + + // now run the completions -- synchronously + nni_aio_completions_run(&complq); +} + +static void +udp_pipe_send(void *arg, nni_aio *aio) +{ + udp_pipe *p = arg; + udp_ep *ep; + udp_sp_data dreq; + nng_msg *msg; + + if (nni_aio_begin(aio) != 0) { + // No way to give the message back to the protocol, + // so we just discard it silently to prevent it from leaking. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + return; + } + + msg = nni_aio_get_msg(aio); + ep = p->ep; + + nni_mtx_lock(&ep->mtx); + if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) { + nni_mtx_unlock(&ep->mtx); + // rather failing this with an error, we just drop it on the + // floor. this is on the sender, so there isn't a compelling + // need to disconnect the pipe, since it we're not being + // "ill-behaved" to our peer. + // TODO: bump a stat + nni_msg_free(msg); + return; + } + + dreq.us_ver = 1; + dreq.us_type = ep->proto; + dreq.us_op_code = OPCODE_DATA; + dreq.us_sender_id = p->self_id; + dreq.us_peer_id = p->peer_id; + dreq.us_sequence = p->self_seq++; + dreq.us_length = + msg != NULL ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0; + + // Just queue it, or fail it. + udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish(aio, 0, dreq.us_length); +} + +static void +udp_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + + nni_mtx_lock(&ep->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +udp_pipe_recv(void *arg, nni_aio *aio) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (p->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, udp_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&p->rx_aios, aio); + nni_mtx_unlock(&ep->mtx); +} + +static uint16_t +udp_pipe_peer(void *arg) +{ + udp_pipe *p = arg; + + return (p->peer); +} + +static int +udp_pipe_get_recvmax(void *arg, void *v, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(p->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +udp_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_sockaddr(&p->peer_addr, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nni_option udp_pipe_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = udp_pipe_get_recvmax, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = udp_pipe_get_remaddr, + }, + { + .o_name = NULL, + }, +}; + +static int +udp_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + int rv; + + rv = nni_getopt(udp_pipe_options, name, p, buf, szp, t); + return (rv); +} + +// udp_ep_hold simply bumps the reference count. +// This needs to be done with the lock for the EP held. +static void +udp_ep_hold(udp_ep *ep) +{ + ep->refcnt++; +} + +// udp_ep_rele drops the reference count on the endpoint. +// If the endpoint drops to zero, the EP is freed. It also +// unlocks the mutex, which must be held when calling this. +static void +udp_ep_rele(udp_ep *ep) +{ + nni_aio *aio; + NNI_ASSERT(ep->refcnt > 0); + ep->refcnt--; + if (!ep->fini || ep->refcnt > 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + while ((aio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ep->mtx); + + nni_aio_close(&ep->timeaio); + nni_aio_close(&ep->resaio); + nni_aio_close(&ep->tx_aio); + nni_aio_close(&ep->rx_aio); + if (ep->udp != NULL) { + nni_udp_close(ep->udp); + } + nni_aio_fini(&ep->timeaio); + nni_aio_fini(&ep->resaio); + nni_aio_fini(&ep->tx_aio); + nni_aio_fini(&ep->rx_aio); + nni_id_map_fini(&ep->pipes); + NNI_FREE_STRUCTS(ep->tx_ring.descs, ep->tx_ring.size); + NNI_FREE_STRUCT(ep); +} + +static void +udp_ep_fini(void *arg) +{ + udp_ep *ep = arg; + + // We optionally linger a little bit (up to a half second) + // so that the disconnect messages can get pushed out. On + // most systems this should only take a single millisecond. + nni_time linger = + nni_clock() + NNI_SECOND / 2; // half second to drain, max + nni_mtx_lock(&ep->mtx); + ep->fini = true; + while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { + NNI_ASSERT(ep->refcnt > 0); + nni_mtx_unlock(&ep->mtx); + nng_msleep(1); + nni_mtx_lock(&ep->mtx); + } + if (ep->tx_ring.count > 0) { + nng_log_warn("NNG-UDP-LINGER", + "Lingering timed out on endpoint close, peer " + "notifications dropped"); + } + udp_ep_rele(ep); // drops the lock +} + +static void +udp_ep_close(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + + nni_mtx_lock(&ep->mtx); + while ((aio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECONNABORTED); + } + + // close all pipes + uint32_t cursor = 0; + while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { + p->closed = true; + if (p->peer_id != 0) { + udp_send_disc(ep, p, DISC_CLOSED); + } + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if (p->npipe == NULL) { + nni_list_remove(&ep->connpipes, p); + nni_id_remove(&ep->pipes, p->self_id); + udp_pipe_destroy(p); + ep->refcnt--; + } + } + nni_aio_close(&ep->resaio); + nni_mtx_unlock(&ep->mtx); +} + +// timer handler - sends out additional creqs as needed, +// reaps stale connections, and handles linger. +static void +udp_timer_cb(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_aio_result(&ep->timeaio); + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || ep->closed) { + nni_mtx_unlock(&ep->mtx); + return; + } + + uint32_t cursor = 0; + nni_time now = nni_clock(); + nng_duration refresh = ep->refresh; + + ep->next_wake = NNI_TIME_NEVER; + while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { + + if (p->closed) { + if (p->npipe == NULL) { + // pipe closed, but we have to clean it up + // ourselves + nni_id_remove(&ep->pipes, p->self_id); + udp_pipe_destroy(p); + ep->refcnt--; + } + continue; + } + + if (now > p->expire) { + char buf[128]; + nni_aio *aio; + nng_log_info("NNG-UDP-INACTIVE", + "Pipe peer %s timed out due to inactivity", + nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); + if ((ep->dialer) && (p->peer_id == 0) && + (aio = nni_list_first(&ep->connaios))) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ETIMEDOUT); + } + // This will probably not be received by the peer, + // since we aren't getting anything from them. But + // having it on the wire may help debugging later. + udp_send_disc(ep, p, DISC_INACTIVE); + continue; + } + + if (p->dialer && now > p->next_creq) { + udp_send_creq(ep, p); + } + if (p->next_wake < ep->next_wake) { + ep->next_wake = p->next_wake; + } + } + refresh = ep->next_wake == NNI_TIME_NEVER + ? NNG_DURATION_INFINITE + : (nng_duration) (ep->next_wake - now); + nni_sleep_aio(refresh, &ep->timeaio); + + nni_mtx_unlock(&ep->mtx); +} + +static int +udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) +{ + udp_ep *ep; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + ep->tx_ring.descs = + NNI_ALLOC_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); + if (ep->tx_ring.descs == NULL) { + NNI_FREE_STRUCT(ep); + return (NNG_ENOMEM); + } + ep->tx_ring.size = NNG_UDP_TXQUEUE_LEN; + + ep->af = nni_url_family(url->u_scheme); + ep->self_sa.s_family = ep->af; + ep->proto = nni_sock_proto_id(sock); + ep->url = url; + ep->refresh = NNG_UDP_REFRESH; // one minute by default + ep->rcvmax = NNG_UDP_RECVMAX; + ep->refcnt = 1; + if ((rv = nni_msg_alloc(&ep->rx_payload, + ep->rcvmax + sizeof(ep->rx_msg)) != 0)) { + NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); + NNI_FREE_STRUCT(ep); + return (rv); + } + + nni_mtx_init(&ep->mtx); + nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true); + NNI_LIST_INIT(&ep->connpipes, udp_pipe, node); + nni_aio_list_init(&ep->connaios); + + nni_aio_init(&ep->rx_aio, udp_rx_cb, ep); + nni_aio_init(&ep->tx_aio, udp_tx_cb, ep); + nni_aio_init(&ep->timeaio, udp_timer_cb, ep); + nni_aio_init(&ep->resaio, udp_resolv_cb, ep); + nni_aio_completions_init(&ep->complq); + +#ifdef NNG_ENABLE_STATS + static const nni_stat_info rcv_max_info = { + .si_name = "rcv_max", + .si_desc = "maximum receive size", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); +#endif + + // schedule our timer callback - forever for now + // adjusted automatically as we add pipes or other + // actions which require earlier wakeup. + nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio); + + *epp = ep; + return (0); +} + +static int +udp_check_url(nng_url *url, bool listen) +{ + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + if (!listen) { + if ((strlen(url->u_hostname) == 0) || + (strlen(url->u_port) == 0) || (atoi(url->u_port) == 0)) { + return (NNG_EADDRINVAL); + } + } + return (0); +} + +static int +udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + udp_ep *ep; + int rv; + nni_sock *sock = nni_dialer_sock(ndialer); + + if ((rv = udp_check_url(url, false)) != 0) { + return (rv); + } + + if ((rv = udp_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + +#ifdef NNG_ENABLE_STATS + nni_dialer_add_stat(ndialer, &ep->st_rcv_max); +#endif + *dp = ep; + return (0); +} + +static int +udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener) +{ + udp_ep *ep; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); + + // Check for invalid URL components. + if ((rv = udp_check_url(url, true)) != 0) { + return (rv); + } + + if ((rv = udp_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + + if ((rv = nni_url_to_address(&ep->self_sa, url)) != 0) { + return (rv); + } + +#ifdef NNG_ENABLE_STATS + nni_listener_add_stat(nlistener, &ep->st_rcv_max); +#endif + + *lp = ep; + return (0); +} + +static void +udp_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + udp_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_aio_abort(&ep->resaio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_resolv_cb(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + int rv; + nni_mtx_lock(&ep->mtx); + if ((aio = nni_list_first(&ep->connaios)) == NULL) { + nni_mtx_unlock(&ep->mtx); + return; + } + if (ep->closed) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_result(&ep->resaio)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nng_log_warn("NNG-UDP-RESOLV", + "Failed resolving IP address: %s", nng_strerror(rv)); + nni_aio_finish_error(aio, rv); + return; + } + + // Choose the right port to bind to. The family must match. + if (ep->self_sa.s_family == NNG_AF_UNSPEC) { + ep->self_sa.s_family = ep->peer_sa.s_family; + } + + if (ep->udp == NULL) { + if ((rv = nni_udp_open(&ep->udp, &ep->self_sa)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + } + + // places a "hold" on the ep + if ((rv = udp_pipe_alloc(&p, ep, 0, &ep->peer_sa)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + udp_pipe_schedule(p); + udp_ep_start(ep); + + // Send out the connection request. We don't complete + // the user aio until we confirm a connection, so that + // we can supply details like maximum receive message size + // and the protocol the peer is using. + udp_send_creq(ep, p); + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_ep_connect(void *arg, nni_aio *aio) +{ + udp_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + return; + } + NNI_ASSERT(nni_list_empty(&ep->connaios)); + ep->dialer = true; + + if ((rv = nni_aio_schedule(aio, udp_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&ep->connaios, aio); + + // lookup the IP address + + nni_aio_set_timeout(&ep->resaio, NNI_SECOND * 5); + nni_resolv_ip(ep->url->u_hostname, ep->url->u_port, ep->af, false, + &ep->peer_sa, &ep->resaio); + + // wake up for retries + nni_aio_abort(&ep->timeaio, NNG_EINTR); + + nni_mtx_unlock(&ep->mtx); +} + +static int +udp_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + nng_sockaddr sa; + int port; + uint8_t *paddr; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + switch (sa.s_family) { + case NNG_AF_INET: + paddr = (void *) &sa.s_in.sa_port; + break; + + case NNG_AF_INET6: + paddr = (void *) &sa.s_in6.sa_port; + break; + + default: + paddr = NULL; + break; + } + nni_mtx_unlock(&ep->mtx); + + if (paddr == NULL) { + return (NNG_ESTATE); + } + + NNI_GET16(paddr, port); + return (nni_copyout_int(port, buf, szp, t)); +} + +static int +udp_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + char *s; + int rv; + int port = 0; + nng_sockaddr sa; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + switch (sa.s_family) { + case NNG_AF_INET: + NNI_GET16((uint8_t *) &sa.s_in.sa_port, port); + break; + case NNG_AF_INET6: + NNI_GET16((uint8_t *) &sa.s_in6.sa_port, port); + break; + } + if ((rv = nni_url_asprintf_port(&s, ep->url, port)) == 0) { + rv = nni_copyout_str(s, v, szp, t); + nni_strfree(s); + } + nni_mtx_unlock(&ep->mtx); + + return (rv); +} + +static int +udp_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + nng_sockaddr sa; + + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + + rv = nni_copyout_sockaddr(&sa, v, szp, t); + return (rv); +} + +static int +udp_ep_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + nng_sockaddr sa; + + if (!ep->dialer) { + return (NNG_ENOTSUP); + } + sa = ep->peer_sa; + + rv = nni_copyout_sockaddr(&sa, v, szp, t); + return (rv); +} + +static int +udp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + udp_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) { + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + ep->rcvmax = (uint16_t) val; + nni_mtx_unlock(&ep->mtx); +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&ep->st_rcv_max, val); +#endif + } + return (rv); +} + +// this just looks for pipes waiting for an aio, and aios waiting for +// a connection, and matches them together. +static void +udp_ep_match(udp_ep *ep) +{ + nng_aio *aio = nni_list_first(&ep->connaios); + udp_pipe *p = nni_list_first(&ep->connpipes); + + if ((aio == NULL) || (p == NULL)) { + return; + } + + nni_aio_list_remove(aio); + nni_list_remove(&ep->connpipes, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +udp_ep_start(udp_ep *ep) +{ + ep->started = true; + udp_start_rx(ep); +} + +static int +udp_ep_bind(void *arg) +{ + udp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + + rv = nni_udp_open(&ep->udp, &ep->self_sa); + if (rv != 0) { + nni_mtx_unlock(&ep->mtx); + return (rv); + } + udp_ep_start(ep); + nni_mtx_unlock(&ep->mtx); + + return (rv); +} + +static void +udp_ep_accept(void *arg, nni_aio *aio) +{ + udp_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, udp_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&ep->connaios, aio); + udp_ep_match(ep); + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops udp_pipe_ops = { + .p_init = udp_pipe_init, + .p_fini = udp_pipe_fini, + .p_stop = udp_pipe_stop, + .p_send = udp_pipe_send, + .p_recv = udp_pipe_recv, + .p_close = udp_pipe_close, + .p_peer = udp_pipe_peer, + .p_getopt = udp_pipe_getopt, +}; + +static const nni_option udp_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = udp_ep_get_recvmaxsz, + .o_set = udp_ep_set_recvmaxsz, + }, + { + .o_name = NNG_OPT_URL, + .o_get = udp_ep_get_url, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = udp_ep_get_locaddr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = udp_ep_get_remaddr, + }, + { + .o_name = NNG_OPT_TCP_BOUND_PORT, + .o_get = udp_ep_get_port, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +udp_dialer_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_getopt(udp_ep_opts, name, ep, buf, szp, t)); +} + +static int +udp_dialer_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_setopt(udp_ep_opts, name, ep, buf, sz, t)); +} + +static int +udp_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_getopt(udp_ep_opts, name, ep, buf, szp, t)); +} + +static int +udp_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_setopt(udp_ep_opts, name, ep, buf, sz, t)); +} + +static nni_sp_dialer_ops udp_dialer_ops = { + .d_init = udp_dialer_init, + .d_fini = udp_ep_fini, + .d_connect = udp_ep_connect, + .d_close = udp_ep_close, + .d_getopt = udp_dialer_getopt, + .d_setopt = udp_dialer_setopt, +}; + +static nni_sp_listener_ops udp_listener_ops = { + .l_init = udp_listener_init, + .l_fini = udp_ep_fini, + .l_bind = udp_ep_bind, + .l_accept = udp_ep_accept, + .l_close = udp_ep_close, + .l_getopt = udp_listener_getopt, + .l_setopt = udp_listener_setopt, +}; + +static nni_sp_tran udp_tran = { + .tran_scheme = "udp", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; + +static nni_sp_tran udp4_tran = { + .tran_scheme = "udp4", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; + +#ifdef NNG_ENABLE_IPV6 +static nni_sp_tran udp6_tran = { + .tran_scheme = "udp6", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; +#endif + +void +nni_sp_udp_register(void) +{ + nni_sp_tran_register(&udp_tran); + nni_sp_tran_register(&udp4_tran); +#ifdef NNG_ENABLE_IPV6 + nni_sp_tran_register(&udp6_tran); +#endif +} diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c new file mode 100644 index 000000000..b99c5af19 --- /dev/null +++ b/src/sp/transport/udp/udp_tran_test.c @@ -0,0 +1,171 @@ +// +// Copyright 2024 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2018 Devolutions +// Copyright 2018 Cody Piersall +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "nng/nng.h" +#include + +// TCP tests. + +static void +test_udp_wild_card_connect_fail(void) +{ + nng_socket s; + char addr[NNG_MAXADDRLEN]; + + NUTS_OPEN(s); + (void) snprintf(addr, sizeof(addr), "udp://*:%u", nuts_next_port()); + NUTS_FAIL(nng_dial(s, addr, NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s); +} + +void +test_udp_wild_card_bind(void) +{ + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + uint16_t port; + + port = nuts_next_port(); + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + (void) snprintf(addr, sizeof(addr), "udp4://*:%u", port); + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + nng_msleep(500); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_local_address_connect(void) +{ + + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + uint16_t port; + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + port = nuts_next_port(); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_port_zero_bind(void) +{ + nng_socket s1; + nng_socket s2; + nng_sockaddr sa; + nng_listener l; + char *addr; + int port; + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listen(s1, "udp://127.0.0.1:0", &l, 0)); + nng_msleep(100); + NUTS_PASS(nng_listener_get_string(l, NNG_OPT_URL, &addr)); + NUTS_TRUE(memcmp(addr, "udp://", 6) == 0); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); + NUTS_TRUE(sa.s_in.sa_port != 0); + NUTS_TRUE(sa.s_in.sa_addr == nuts_be32(0x7f000001)); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_PASS(nng_listener_get_int(l, NNG_OPT_TCP_BOUND_PORT, &port)); + NUTS_TRUE(port == nuts_be16(sa.s_in.sa_port)); + nng_strfree(addr); + + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_non_local_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_listen(s1, "udp://8.8.8.8", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_udp_malformed_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_dial(s1, "udp://127.0.0.1", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL(nng_dial(s1, "udp://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL(nng_dial(s1, "udp://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL( + nng_listen(s1, "udp://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL( + nng_listen(s1, "udp://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_udp_recv_max(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l; + size_t sz; + char *addr; + + NUTS_ADDR(addr, "udp"); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 200); + NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_start(l, 0)); + + NUTS_OPEN(s1); + NUTS_PASS(nng_dial(s1, addr, NULL, 0)); + nng_msleep(1000); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_PASS(nng_close(s0)); + NUTS_CLOSE(s1); +} + +NUTS_TESTS = { + + { "udp wild card connect fail", test_udp_wild_card_connect_fail }, + { "udp wild card bind", test_udp_wild_card_bind }, + { "udp port zero bind", test_udp_port_zero_bind }, + { "udp local address connect", test_udp_local_address_connect }, + { "udp non-local address", test_udp_non_local_address }, + { "udp malformed address", test_udp_malformed_address }, + { "udp recv max", test_udp_recv_max }, + { NULL, NULL }, +}; diff --git a/src/testing/marry.c b/src/testing/marry.c index d39cf583a..7441468ff 100644 --- a/src/testing/marry.c +++ b/src/testing/marry.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -46,7 +46,8 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr) } if ((strncmp(scheme, "tcp", 3) == 0) || - (strncmp(scheme, "tls", 3) == 0)) { + (strncmp(scheme, "tls", 3) == 0) || + (strncmp(scheme, "udp", 3) == 0)) { (void) snprintf( addr, sz, "%s://127.0.0.1:%u", scheme, nuts_next_port()); return; @@ -84,6 +85,7 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr) } // We should not be here. + nng_log_err("NUTS", "Unknown scheme"); abort(); }