From 026d1f319843648221fd10e42ec3d3e3c1164ce1 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Thu, 2 May 2024 00:33:29 +0000 Subject: [PATCH] finish udp listener --- .gitignore | 1 + tunnels/adapters/connector/connector.c | 4 +- tunnels/adapters/listener/CMakeLists.txt | 18 +++ tunnels/adapters/listener/listener.c | 90 +++++++++++ tunnels/adapters/listener/listener.h | 12 ++ tunnels/adapters/listener/tcp/tcp_listener.c | 17 +- tunnels/adapters/listener/tcp/tcp_listener.h | 7 +- tunnels/adapters/listener/udp/CMakeLists.txt | 14 +- tunnels/adapters/listener/udp/udp_listener.c | 162 ++++++------------- tunnels/adapters/listener/udp/udp_listener.h | 6 +- ww/managers/socket_manager.c | 20 +-- ww/managers/socket_manager.h | 2 +- ww/tunnel.c | 2 + 13 files changed, 211 insertions(+), 144 deletions(-) create mode 100644 tunnels/adapters/listener/CMakeLists.txt create mode 100644 tunnels/adapters/listener/listener.c create mode 100644 tunnels/adapters/listener/listener.h diff --git a/.gitignore b/.gitignore index f79c7975..3f37aca7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ cacert.pem /core/texts /cmake +/.cache .env.local .env.*.local diff --git a/tunnels/adapters/connector/connector.c b/tunnels/adapters/connector/connector.c index e532f9f7..e60ebc92 100644 --- a/tunnels/adapters/connector/connector.c +++ b/tunnels/adapters/connector/connector.c @@ -57,8 +57,8 @@ tunnel_t *newConnector(node_instance_context_t *instance_info) udp_outbound_node->version = instance_info->node->version; registerNode(tcp_outbound_node, settings); registerNode(udp_outbound_node, settings); - runNode(tcp_outbound_node, instance_info->chain_index + 1); - runNode(udp_outbound_node, instance_info->chain_index + 1); + runNode(tcp_outbound_node, instance_info->chain_index); + runNode(udp_outbound_node, instance_info->chain_index); state->tcp_connector = tcp_outbound_node->instance; state->udp_connector = udp_outbound_node->instance; diff --git a/tunnels/adapters/listener/CMakeLists.txt b/tunnels/adapters/listener/CMakeLists.txt new file mode 100644 index 00000000..e4665220 --- /dev/null +++ b/tunnels/adapters/listener/CMakeLists.txt @@ -0,0 +1,18 @@ + + +add_library(Listener STATIC + listener.c + +) + +# target_include_directories(Listener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +#ww api +target_include_directories(Listener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww) +target_link_libraries(Listener ww) +# add dependencies +include(${CMAKE_BINARY_DIR}/cmake/CPM.cmake) + +target_compile_definitions(Listener PRIVATE Listener_VERSION=0.1) +if(CMAKE_BUILD_TYPE STREQUAL "Debug") +target_compile_definitions(Listener PRIVATE DEBUG=1) +endif() diff --git a/tunnels/adapters/listener/listener.c b/tunnels/adapters/listener/listener.c new file mode 100644 index 00000000..f44aba63 --- /dev/null +++ b/tunnels/adapters/listener/listener.c @@ -0,0 +1,90 @@ +#include "listener.h" +#include "loggers/network_logger.h" +#include "managers/node_manager.h" +#include "utils/stringutils.h" + +typedef struct listener_state_s +{ + tunnel_t *tcp_listener; + tunnel_t *udp_listener; + +} listener_state_t; + +typedef struct listener_con_state_s +{ + +} listener_con_state_t; + +static void upStream(tunnel_t *self, context_t *c) +{ + self->up->upStream(self->up, c); +} +static void downStream(tunnel_t *self, context_t *c) +{ + listener_state_t *state = STATE(self); + + switch ((c->line->src_ctx.address_protocol)) + { + default: + case kSapTcp: + state->tcp_listener->downStream(state->tcp_listener, c); + break; + + case kSapUdp: + state->udp_listener->downStream(state->udp_listener, c); + break; + } +} + +tunnel_t *newListener(node_instance_context_t *instance_info) +{ + listener_state_t *state = malloc(sizeof(listener_state_t)); + memset(state, 0, sizeof(listener_state_t)); + cJSON *settings = instance_info->node_settings_json; + + if (! (cJSON_IsObject(settings) && settings->child != NULL)) + { + LOGF("JSON Error: Listener->settings (object field) : The object was empty or invalid"); + return NULL; + } + node_t *tcp_outbound_node = newNode(); + node_t *udp_outbound_node = newNode(); + tcp_outbound_node->name = concat(instance_info->node->name, "_tcp_inbound"); + tcp_outbound_node->type = "TcpListener"; + tcp_outbound_node->version = instance_info->node->version; + udp_outbound_node->name = concat(instance_info->node->name, "_tcp_inbound"); + udp_outbound_node->type = "UdpListener"; + udp_outbound_node->version = instance_info->node->version; + registerNode(tcp_outbound_node, settings); + registerNode(udp_outbound_node, settings); + runNode(tcp_outbound_node, instance_info->chain_index); + runNode(udp_outbound_node, instance_info->chain_index); + state->tcp_listener = tcp_outbound_node->instance; + state->udp_listener = udp_outbound_node->instance; + + tunnel_t *t = newTunnel(); + t->state = state; + t->upStream = &upStream; + t->downStream = &downStream; + + chainDown(t, state->tcp_listener); + chainDown(t, state->udp_listener); + + atomic_thread_fence(memory_order_release); + return t; +} +api_result_t apiListener(tunnel_t *self, const char *msg) +{ + (void) (self); + (void) (msg); + return (api_result_t){0}; +} +tunnel_t *destroyListener(tunnel_t *self) +{ + (void) (self); + return NULL; +} +tunnel_metadata_t getMetadataListener() +{ + return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; +} \ No newline at end of file diff --git a/tunnels/adapters/listener/listener.h b/tunnels/adapters/listener/listener.h new file mode 100644 index 00000000..92cb7822 --- /dev/null +++ b/tunnels/adapters/listener/listener.h @@ -0,0 +1,12 @@ +#pragma once +#include "api.h" + +// user <-----\ /-----> (Tcp|Udp) listener +// user <------> Listener <------> (Tcp|Udp) listener +// user <-----/ \-----> (Tcp|Udp) listener +// + +tunnel_t * newListener(node_instance_context_t *instance_info); +api_result_t apiListener(tunnel_t *self, const char *msg); +tunnel_t * destroyListener(tunnel_t *self); +tunnel_metadata_t getMetadataListener(); diff --git a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index 38be85f4..86f74802 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -349,14 +349,19 @@ static void onInboundConnected(hevent_t *ev) } // send the init packet - context_t *context = newInitContext(line); - context->src_io = io; - self->upStream(self, context); - if (! isAlive(line)) + lockLine(line); { - LOGW("TcpListener: socket just got closed by upstream before anything happend"); - return; + context_t *context = newInitContext(line); + context->src_io = io; + self->upStream(self, context); + if (! isAlive(line)) + { + LOGW("TcpListener: socket just got closed by upstream before anything happend"); + unLockLine(line); + return; + } } + unLockLine(line); hio_read(io); } diff --git a/tunnels/adapters/listener/tcp/tcp_listener.h b/tunnels/adapters/listener/tcp/tcp_listener.h index 6d0fdda0..d0278038 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.h +++ b/tunnels/adapters/listener/tcp/tcp_listener.h @@ -1,10 +1,9 @@ #pragma once #include "api.h" -// user <-----\ /-----> con 1 -// user <------> TcpListener <------> con 2 -// user <-----/ \-----> con 3 -// +// user <-----\ /-----> Tcp con 1 +// user <------> TcpListener <------> Tcp con 2 +// user <-----/ \-----> Tcp con 3 tunnel_t *newTcpListener(node_instance_context_t *instance_info); diff --git a/tunnels/adapters/listener/udp/CMakeLists.txt b/tunnels/adapters/listener/udp/CMakeLists.txt index 9f361555..c281b89f 100644 --- a/tunnels/adapters/listener/udp/CMakeLists.txt +++ b/tunnels/adapters/listener/udp/CMakeLists.txt @@ -1,15 +1,15 @@ -add_library(TcpListener STATIC - tcp_listener.c +add_library(UdpListener STATIC + udp_listener.c ) -# target_include_directories(TcpListener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +# target_include_directories(UdpListener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) #ww api -target_include_directories(TcpListener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww) -target_link_libraries(TcpListener ww) +target_include_directories(UdpListener PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww) +target_link_libraries(UdpListener ww) # add dependencies @@ -19,9 +19,9 @@ include(${CMAKE_BINARY_DIR}/cmake/CPM.cmake) -target_compile_definitions(TcpListener PRIVATE TcpListener_VERSION=0.1) +target_compile_definitions(UdpListener PRIVATE UdpListener_VERSION=0.1) if(CMAKE_BUILD_TYPE STREQUAL "Debug") -target_compile_definitions(TcpListener PRIVATE DEBUG=1) +target_compile_definitions(UdpListener PRIVATE DEBUG=1) endif() diff --git a/tunnels/adapters/listener/udp/udp_listener.c b/tunnels/adapters/listener/udp/udp_listener.c index da5d5211..d07ece0d 100644 --- a/tunnels/adapters/listener/udp/udp_listener.c +++ b/tunnels/adapters/listener/udp/udp_listener.c @@ -3,9 +3,9 @@ #include "idle_table.h" #include "loggers/network_logger.h" #include "managers/socket_manager.h" +#include "tunnel.h" #include "utils/jsonutils.h" -#include -#include +#include "utils/sockutils.h" // enable profile to see some time info // #define PROFILE 1 @@ -29,6 +29,7 @@ typedef struct udp_listener_con_state_s hio_t *io; idle_table_t *table; line_t *line; + idle_item_t *idle_handle; buffer_pool_t *buffer_pool; bool established; bool first_packet_sent; @@ -36,11 +37,11 @@ typedef struct udp_listener_con_state_s static void cleanup(udp_listener_con_state_t *cstate) { - if (cstate->io) + + if (cstate->idle_handle) { - hevent_set_userdata(cstate->io, NULL); + removeIdleItemByHandle(cstate->table, cstate->idle_handle); } - free(cstate); } @@ -84,34 +85,8 @@ static inline void downStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { - if (cstate->write_paused) - { - if (c->src_io) - { - hio_read_stop(c->src_io); - } - contextQueuePush(cstate->data_queue, c); - } - else - { - unsigned int bytes = bufLen(c->payload); - int nwrite = hio_write(cstate->io, c->payload); - c->payload = NULL; - if (nwrite >= 0 && nwrite < bytes) - { - if (c->src_io) - { - hio_read_stop(c->src_io); - } - contextQueuePush(cstate->finished_queue, c); - cstate->write_paused = true; - hio_setcb_write(cstate->io, onWriteComplete); - } - else - { - destroyContext(c); - } - } + postUdpWrite(cstate->io, c->payload); + destroyContext(c); } else { @@ -129,74 +104,31 @@ static inline void downStream(tunnel_t *self, context_t *c) cleanup(cstate); destroyLine(c->line); destroyContext(c); - hio_close(io); return; } } } -static void onRecvFrom(hio_t *io, shift_buffer_t *buf) -{ - - char localaddrstr[SOCKADDR_STRLEN] = {0}; - char peeraddrstr[SOCKADDR_STRLEN] = {0}; - printf("[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), - SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - - hash_t peer_hash = sockAddrCalcHash(hio_peeraddr(io)); - - line_t *line = newLine(tid); - udp_listener_con_state_t *cstate = (udp_listener_con_state_t *) (hevent_userdata(io)); - - if (cstate == NULL) - { - reuseBuffer(hloop_bufferpool(hevent_loop(io)), buf); - return; - } - shift_buffer_t *payload = buf; - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - bool *first_packet_sent = &((cstate)->first_packet_sent); - - context_t *context = newContext(line); - context->src_io = io; - context->payload = payload; - if (! (*first_packet_sent)) - { - *first_packet_sent = true; - context->first = true; - } - - self->upStream(self, context); -} - static void onUdpConnectonExpire(idle_item_t *idle_udp) { - udp_listener_con_state_t *cstate = (udp_listener_con_state_t *) (hevent_userdata(io)); - if (cstate != NULL) - { - LOGD("UdpListener: received close for FD:%x ", (int) hio_fd(io)); - } - else - { - LOGD("UdpListener: sent close for FD:%x ", (int) hio_fd(io)); - } - - if (cstate != NULL) - { - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - context_t *context = newFinContext(line); - self->upStream(self, context); - } + + udp_listener_con_state_t *cstate = idle_udp->userdata; + assert(cstate != NULL); + LOGD("UdpListener: expired idle udp FD:%x ", (int) hio_fd(cstate->io)); + cstate->idle_handle = NULL; // its freed by the table after return + tunnel_t *self = (cstate)->tunnel; + line_t *line = (cstate)->line; + context_t *context = newFinContext(line); + self->upStream(self, context); } -static udp_listener_con_state_t *newConnection(hloop_t *loop, uint8_t tid, tunnel_t *self, hio_t *io, - idle_table_t *table, uint8_t real_localport) +static udp_listener_con_state_t *newConnection(uint8_t tid, tunnel_t *self, hio_t *io, idle_table_t *table, + uint8_t real_localport) { line_t *line = newLine(tid); udp_listener_con_state_t *cstate = malloc(sizeof(udp_listener_con_state_t)); + cstate->loop = loops[tid]; cstate->line = line; cstate->buffer_pool = getThreadBufferPool(tid); cstate->io = io; @@ -211,45 +143,58 @@ static udp_listener_con_state_t *newConnection(hloop_t *loop, uint8_t tid, tunne sockaddr_set_port(&(line->src_ctx.address), real_localport); struct sockaddr log_localaddr = *hio_localaddr(cstate->io); - sockaddr_set_port((sockaddr_u *) &(log_localaddr), data->real_localport); + sockaddr_set_port((sockaddr_u *) &(log_localaddr), real_localport); char localaddrstr[SOCKADDR_STRLEN] = {0}; char peeraddrstr[SOCKADDR_STRLEN] = {0}; LOGD("UdpListener: Accepted FD:%x [%s] <= [%s]", (int) hio_fd(cstate->io), SOCKADDR_STR(&log_localaddr, localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - free(data); - - hio_setcb_read(io, onRecv); - hio_setcb_close(io, onClose); // send the init packet - context_t *context = newInitContext(line); - context->src_io = io; - self->upStream(self, context); - if (! isAlive(line)) + lockLine(line); { - LOGW("UdpListener: socket just got closed by upstream before anything happend"); - return; + context_t *context = newInitContext(line); + self->upStream(self, context); + if (! isAlive(line)) + { + LOGW("UdpListener: socket just got closed by upstream before anything happend"); + unLockLine(line); + return NULL; + } } - hio_read(io); + unLockLine(line); + return cstate; } static void onFilteredRecv(hevent_t *ev) { - udp_payload_t *data = (udp_payload_t *) hevent_userdata(ev); - - hash_t peeraddr_hash = sockAddrCalcHash((sockaddr_u *) hio_peeraddr(data->sock->io)); + udp_payload_t *data = (udp_payload_t *) hevent_userdata(ev); + hash_t peeraddr_hash = sockAddrCalcHash((sockaddr_u *) hio_peeraddr(data->sock->io)); - idle_item_t *idle = getIdleItemByHash(data->sock->table, peeraddr_hash); + idle_item_t *idle = getIdleItemByHash(data->sock->udp_table, peeraddr_hash); if (idle == NULL) { - - newIdleItem(data->sock->table, peeraddr_hash, ) onUdpConnectonExpire + udp_listener_con_state_t *con = + newConnection(data->tid, data->tunnel, data->sock->io, data->sock->udp_table, data->real_localport); + if (! con) + { + reuseBuffer(getThreadBufferPool(data->tid), data->buf); + free(data); + return; + } + con->idle_handle = newIdleItem(data->sock->udp_table, peeraddr_hash, con, onUdpConnectonExpire, data->tid, + (uint64_t) 70 * 1000); } + tunnel_t *self = data->tunnel; + udp_listener_con_state_t *con = idle->userdata; + context_t *context = newContext(con->line); + context->payload = data->buf; + self->upStream(self, context); + free(data); } -void parsePortSection(udp_listener_state_t *state, const cJSON *settings) +static void parsePortSection(udp_listener_state_t *state, const cJSON *settings) { const cJSON *port_json = cJSON_GetObjectItemCaseSensitive(settings, "port"); if ((cJSON_IsNumber(port_json) && (port_json->valuedouble != 0))) @@ -303,7 +248,6 @@ tunnel_t *newUdpListener(node_instance_context_t *instance_info) LOGF("JSON Error: UdpListener->settings (object field) : The object was empty or invalid"); return NULL; } - getBoolFromJsonObject(&(state->no_delay), settings, "nodelay"); if (! getStringFromJsonObject(&(state->address), settings, "address")) { @@ -369,7 +313,7 @@ tunnel_t *newUdpListener(node_instance_context_t *instance_info) t->state = state; t->upStream = &upStream; t->downStream = &downStream; - registerSocketAcceptor(t, filter_opt, onInboundConnected); + registerSocketAcceptor(t, filter_opt, onFilteredRecv); atomic_thread_fence(memory_order_release); return t; diff --git a/tunnels/adapters/listener/udp/udp_listener.h b/tunnels/adapters/listener/udp/udp_listener.h index 295f5838..e78fa677 100644 --- a/tunnels/adapters/listener/udp/udp_listener.h +++ b/tunnels/adapters/listener/udp/udp_listener.h @@ -1,9 +1,9 @@ #pragma once #include "api.h" -// user <-----\ /-----> simulated con 1 -// user <------> UdpListener <------> simulated con 2 -// user <-----/ \-----> simulated con 3 +// user <-----\ /-----> simulated Udp con 1 +// user <------> UdpListener <------> simulated Udp con 2 +// user <-----/ \-----> simulated Udp con 3 // diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index ef29f9f4..68e34f31 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -3,12 +3,10 @@ #include "buffer_pool.h" #include "hloop.h" #include "hmutex.h" -#include "idle_table.h" #include "loggers/network_logger.h" #include "stc/common.h" #include "tunnel.h" #include "utils/procutils.h" -#include "utils/sockutils.h" #include "ww.h" typedef struct socket_filter_s @@ -611,19 +609,17 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf) // char peeraddrstr[SOCKADDR_STRLEN] = {0}; // printf("[%s] <=> [%s]\n", SOCKADDR_STR(hio_localaddr(io), localaddrstr), // SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); - // hash_t peeraddr_hash = sockAddrCalcHash((sockaddr_u *) hio_peeraddr(io)); + udpsock_t *socket = hevent_userdata(io); uint16_t local_port = sockaddr_port((sockaddr_u *) hio_localaddr(io)); uint8_t target_tid = local_port % workers_count; - distributeUdpPayload((udp_payload_t){ - .sock = socket, - .buf = buf, - .tid = target_tid, - .peer_addr = *(sockaddr_u *) hio_peeraddr(io), - .real_localport = local_port - }); + distributeUdpPayload((udp_payload_t){.sock = socket, + .buf = buf, + .tid = target_tid, + .peer_addr = *(sockaddr_u *) hio_peeraddr(io), + .real_localport = local_port}); } static void listenUdpSinglePort(hloop_t *loop, socket_filter_t *filter, char *host, uint16_t port, @@ -691,7 +687,7 @@ static void listenUdp(hloop_t *loop, uint8_t *ports_overlapped) } } } - +// todo (async channel) :( struct udp_sb { hio_t *socket_io; @@ -703,7 +699,7 @@ void writeUdpThisLoop(hevent_t *ev) size_t nwrite = hio_write(ub->socket_io, ub->buf); free(ub); } -void writeUdp(hio_t *socket_io, shift_buffer_t *buf) +void postUdpWrite(hio_t *socket_io, shift_buffer_t *buf) { struct udp_sb *ub = malloc(sizeof(struct udp_sb)); *ub = (struct udp_sb){.socket_io = socket_io, buf = buf}; diff --git a/ww/managers/socket_manager.h b/ww/managers/socket_manager.h index d8a392f1..30bf8ee6 100644 --- a/ww/managers/socket_manager.h +++ b/ww/managers/socket_manager.h @@ -62,7 +62,7 @@ typedef struct udp_payload_s shift_buffer_t *buf; } udp_payload_t; -void writeUdp(hio_t *socket_io, shift_buffer_t *buf); +void postUdpWrite(hio_t *socket_io, shift_buffer_t *buf); void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onAccept cb); diff --git a/ww/tunnel.c b/ww/tunnel.c index 52454f29..24f92a51 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -14,6 +14,8 @@ extern bool isAlive(line_t *line); extern void reuseContextBuffer(context_t *c); extern bool isFullyAuthenticated(line_t *line); extern bool isAuthenticated(line_t *line); +extern void lockLine(line_t *line); +extern void unLockLine(line_t *line); extern void markAuthenticated(line_t *line); extern void markAuthenticationNodePresence(line_t *line); extern context_t *newContext(line_t *line);