Skip to content

Commit

Permalink
fixes #168 UDP transport
Browse files Browse the repository at this point in the history
This is the initial implementation of UDP transport.
It does in order guarantees (and consequently filters duplicates),
but it does not guarantee delivery.  The protocol limits payloads
to 65000 bytes (minus headers for SP), but you really want to
keep it to much less -- probably best for short messages that within
a single MTU to avoid IP fragmentation and reassembly.

This is unicast only for now (although there are plans for some
support for multicast and broadcast as well as being able to
perform automatic mesh building, but that will be in following work.

Additional tunables are coming.  This is only lightly tested at
this point, and should be considered experimental.  Its also undocumented.
  • Loading branch information
gdamore committed Oct 5, 2024
1 parent 034b1a0 commit 522cf24
Show file tree
Hide file tree
Showing 19 changed files with 2,142 additions and 45 deletions.
3 changes: 3 additions & 0 deletions cmake/NNGOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ mark_as_advanced(NNG_TRANSPORT_WSS)
option (NNG_TRANSPORT_FDC "Enable File Descriptor transport (EXPERIMENTAL)" ON)
mark_as_advanced(NNG_TRANSPORT_FDC)

option (NNG_TRANSPORT_UDP "Enable UDP transport (EXPERIMENTAL)" ON)
mark_as_advanced(NNG_TRANSPORT_UDP)

# ZeroTier
option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF)
mark_as_advanced(NNG_TRANSPORT_ZEROTIER)
Expand Down
13 changes: 12 additions & 1 deletion src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "core/defs.h"
#include "nng/nng.h"
#include "nng_impl.h"

#include <string.h>
Expand Down Expand Up @@ -764,7 +766,16 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
ms = aio->a_timeout;
}
}
aio->a_expire = nni_clock() + ms;
switch (ms) {
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
// infinite sleep
aio->a_expire = NNI_TIME_NEVER;
break;
default:
aio->a_expire = nni_clock() + ms;
break;
}

if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
nni_aio_finish_error(aio, rv);
Expand Down
6 changes: 6 additions & 0 deletions src/core/idhash.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,9 @@ nni_id_visit(nni_id_map *m, uint64_t *keyp, void **valp, uint32_t *cursor)
*cursor = index;
return (false);
}

uint32_t
nni_id_count(const nni_id_map *m)
{
return (m->id_count);
}
19 changes: 10 additions & 9 deletions src/core/idhash.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ struct nni_id_map {
#define NNI_ID_FLAG_RANDOM 2 // start at a random value
#define NNI_ID_FLAG_REGISTER 4 // map is registered for finalization

extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool);
extern void nni_id_map_fini(nni_id_map *);
extern void *nni_id_get(nni_id_map *, uint64_t);
extern int nni_id_set(nni_id_map *, uint64_t, void *);
extern int nni_id_alloc(nni_id_map *, uint64_t *, void *);
extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *);
extern int nni_id_remove(nni_id_map *, uint64_t);
extern void nni_id_map_sys_fini(void);
extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *);
extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool);
extern void nni_id_map_fini(nni_id_map *);
extern void *nni_id_get(nni_id_map *, uint64_t);
extern int nni_id_set(nni_id_map *, uint64_t, void *);
extern int nni_id_alloc(nni_id_map *, uint64_t *, void *);
extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *);
extern int nni_id_remove(nni_id_map *, uint64_t);
extern void nni_id_map_sys_fini(void);
extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *);
extern uint32_t nni_id_count(const nni_id_map *);

#define NNI_ID_MAP_INITIALIZER(min, max, flags) \
{ \
Expand Down
23 changes: 23 additions & 0 deletions src/core/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct nng_msg {
nni_chunk m_body;
uint32_t m_pipe; // set on receive
nni_atomic_int m_refcnt;
nng_sockaddr m_addr; // set on receive, transport use
};

#if 0
Expand Down Expand Up @@ -544,6 +545,16 @@ nni_msg_chop(nni_msg *m, size_t len)
return (nni_chunk_chop(&m->m_body, len));
}

// Grow the message header, but don't put anything there.
// This is useful for setting up to receive directly into it
// for zero copy purposes.
void
nni_msg_header_extend(nni_msg *m, size_t len)
{
NNI_ASSERT((len + m->m_header_len) <= sizeof(m->m_header_buf));
m->m_header_len += len;
}

int
nni_msg_header_append(nni_msg *m, const void *data, size_t len)
{
Expand Down Expand Up @@ -656,3 +667,15 @@ nni_msg_get_pipe(const nni_msg *m)
{
return (m->m_pipe);
}

const nng_sockaddr *
nni_msg_address(const nni_msg *msg)
{
return (&msg->m_addr);
}

void
nni_msg_set_address(nng_msg *msg, const nng_sockaddr *addr)
{
msg->m_addr = *addr;
}
14 changes: 11 additions & 3 deletions src/core/message.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2017 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand All @@ -20,9 +20,9 @@ extern int nni_msg_realloc(nni_msg *, size_t);
extern int nni_msg_reserve(nni_msg *, size_t);
extern size_t nni_msg_capacity(nni_msg *);
extern int nni_msg_dup(nni_msg **, const nni_msg *);
extern void * nni_msg_header(nni_msg *);
extern void *nni_msg_header(nni_msg *);
extern size_t nni_msg_header_len(const nni_msg *);
extern void * nni_msg_body(nni_msg *);
extern void *nni_msg_body(nni_msg *);
extern size_t nni_msg_len(const nni_msg *);
extern int nni_msg_append(nni_msg *, const void *, size_t);
extern int nni_msg_insert(nni_msg *, const void *, size_t);
Expand Down Expand Up @@ -55,6 +55,14 @@ extern void nni_msg_clone(nni_msg *);
extern nni_msg *nni_msg_unique(nni_msg *);
extern bool nni_msg_shared(nni_msg *);

// Socket address access. Principally useful for transports like UDP,
// which may need to remember or add the socket address later.
// SP transports will generally not support upper layers setting the
// address on send, but will take the information from the pipe.
// It may be set on receive, depending upon the transport.
extern const nng_sockaddr *nni_msg_address(const nni_msg *);
extern void nni_msg_set_address(nng_msg *, const nng_sockaddr *);

// nni_msg_pull_up ensures that the message is unique, and that any
// header present is "pulled up" into the message body. If the function
// cannot do this for any reason (out of space in the body), then NULL
Expand Down
2 changes: 1 addition & 1 deletion src/core/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ extern int nni_tcp_listener_get(
// Symbolic service names will be looked up assuming SOCK_STREAM, so
// they may not work with UDP.
extern void nni_resolv_ip(
const char *, const char *, int, bool, nng_sockaddr *sa, nni_aio *);
const char *, const char *, uint16_t, bool, nng_sockaddr *sa, nni_aio *);

// nni_parse_ip parses an IP address, without a port.
extern int nni_parse_ip(const char *, nng_sockaddr *);
Expand Down
29 changes: 28 additions & 1 deletion src/core/url.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "nng/nng.h"
#include "nng_impl.h"

#include <ctype.h>
Expand Down Expand Up @@ -277,6 +278,32 @@ nni_url_default_port(const char *scheme)
return ("");
}

// Return the address family for an address scheme.
// Returns NNG_AF_UNSPEC for unknown cases or where
// we do not want to choose between AF_INET and AF_INET6.
uint16_t
nni_url_family(const char *scheme)
{
if (strcmp(scheme, "ipc") == 0) {
return (NNG_AF_IPC);
}
if (strcmp(scheme, "inproc") == 0) {
return (NNG_AF_INPROC);
}
if (strcmp(scheme, "abstract") == 0) {
return (NNG_AF_ABSTRACT);
}
#ifdef NNG_HAVE_INET6
if (strchr(scheme, '6') != NULL) {
return (NNG_AF_INET6);
}
#endif
if (strchr(scheme, '4') != NULL) {
return (NNG_AF_INET);
}
return (NNG_AF_UNSPEC);
}

// URLs usually follow the following format:
//
// scheme:[//[userinfo@]host][/]path[?query][#fragment]
Expand Down Expand Up @@ -600,7 +627,7 @@ nni_url_clone(nni_url **dstp, const nni_url *src)
// nni_url_to_address resolves a URL into a sockaddr, assuming the URL is for
// an IP address.
int
nni_url_to_address(nng_sockaddr *sa, const nng_url *url)
nni_url_to_address(nng_sockaddr *sa, const nni_url *url)
{
int af;
nni_aio aio;
Expand Down
1 change: 1 addition & 0 deletions src/core/url.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern int nni_url_parse(nni_url **, const char *path);
extern void nni_url_free(nni_url *);
extern int nni_url_clone(nni_url **, const nni_url *);
extern const char *nni_url_default_port(const char *);
extern uint16_t nni_url_family(const char *);
extern int nni_url_asprintf(char **, const nni_url *);
extern int nni_url_asprintf_port(char **, const nni_url *, int);
extern size_t nni_url_decode(uint8_t *, const char *, size_t);
Expand Down
2 changes: 1 addition & 1 deletion src/platform/posix/posix_resolv_gai.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ resolv_task(resolv_item *item)
}

void
nni_resolv_ip(const char *host, const char *serv, int af, bool passive,
nni_resolv_ip(const char *host, const char *serv, uint16_t af, bool passive,
nng_sockaddr *sa, nni_aio *aio)
{
resolv_item *item;
Expand Down
2 changes: 1 addition & 1 deletion src/platform/posix/posix_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
//

#include "core/nng_impl.h"
#include "nng/nng.h"
#include "platform/posix/posix_impl.h"
#include <nng/nng.h>
#include <sys/errno.h>

#ifdef NNG_PLATFORM_POSIX
Expand Down
6 changes: 6 additions & 0 deletions src/sp/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ extern void nni_sp_zt_register(void);
#ifdef NNG_TRANSPORT_FDC
extern void nni_sp_sfd_register(void);
#endif
#ifdef NNG_TRANSPORT_UDP
extern void nni_sp_udp_register(void);
#endif

void
nni_sp_tran_sys_init(void)
Expand Down Expand Up @@ -103,6 +106,9 @@ nni_sp_tran_sys_init(void)
#ifdef NNG_TRANSPORT_FDC
nni_sp_sfd_register();
#endif
#ifdef NNG_TRANSPORT_UDP
nni_sp_udp_register();
#endif
}

// nni_sp_tran_sys_fini finalizes the entire transport system, including all
Expand Down
4 changes: 2 additions & 2 deletions src/sp/transport/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2023 Staysail Systems, Inc. <[email protected]>
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
Expand All @@ -17,4 +17,4 @@ add_subdirectory(tcp)
add_subdirectory(tls)
add_subdirectory(ws)
add_subdirectory(zerotier)

add_subdirectory(udp)
15 changes: 2 additions & 13 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ tcptran_ep_close(void *arg)
static int
tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)
{
int af;
uint16_t af;
char *semi;
char *src;
size_t len;
Expand All @@ -751,18 +751,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl)

len = (size_t) (semi - url->u_hostname);
url->u_hostname = semi + 1;

if (strcmp(surl->u_scheme, "tcp") == 0) {
af = NNG_AF_UNSPEC;
} else if (strcmp(surl->u_scheme, "tcp4") == 0) {
af = NNG_AF_INET;
#ifdef NNG_ENABLE_IPV6
} else if (strcmp(surl->u_scheme, "tcp6") == 0) {
af = NNG_AF_INET6;
#endif
} else {
return (NNG_EADDRINVAL);
}
af = nni_url_family(url->u_scheme);

if ((src = nni_alloc(len + 1)) == NULL) {
return (NNG_ENOMEM);
Expand Down
13 changes: 2 additions & 11 deletions src/sp/transport/tls/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -712,18 +712,9 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl)

len = (size_t) (semi - url->u_hostname);
url->u_hostname = semi + 1;
af = nni_url_family(url->u_scheme);

if (strcmp(surl->u_scheme, "tls+tcp") == 0) {
af = NNG_AF_UNSPEC;
} else if (strcmp(surl->u_scheme, "tls+tcp4") == 0) {
af = NNG_AF_INET;
#ifdef NNG_ENABLE_IPV6
} else if (strcmp(surl->u_scheme, "tls+tcp6") == 0) {
af = NNG_AF_INET6;
#endif
} else {
return (NNG_EADDRINVAL);
}
rv = nni_url_to_address(sa, url);

if ((src = nni_alloc(len + 1)) == NULL) {
return (NNG_ENOMEM);
Expand Down
15 changes: 15 additions & 0 deletions src/sp/transport/udp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright 2024 Staysail Systems, Inc. <[email protected]>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
# file was obtained (LICENSE.txt). A copy of the license may also be
# found online at https://opensource.org/licenses/MIT.
#

# UDP transport
nng_directory(udp)

nng_sources_if(NNG_TRANSPORT_UDP udp.c)
nng_defines_if(NNG_TRANSPORT_UDP NNG_TRANSPORT_UDP)
nng_test_if(NNG_TRANSPORT_UDP udp_tran_test)
Loading

0 comments on commit 522cf24

Please sign in to comment.