diff --git a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index a57fca67..422d5cfd 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -305,7 +305,7 @@ static void onInboundConnected(hevent_t *ev) SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); } - free(data); + destroySocketAcceptResult(data); hio_setcb_read(io, onRecv); hio_setcb_close(io, onClose); diff --git a/tunnels/adapters/listener/udp/udp_listener.c b/tunnels/adapters/listener/udp/udp_listener.c index 6fe0c4b8..6b9be410 100644 --- a/tunnels/adapters/listener/udp/udp_listener.c +++ b/tunnels/adapters/listener/udp/udp_listener.c @@ -101,7 +101,7 @@ static void downStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { - postUdpWrite(cstate->uio, c->payload); + postUdpWrite(cstate->uio,c->line->tid, c->payload); CONTEXT_PAYLOAD_DROP(c); destroyContext(c); } @@ -203,7 +203,7 @@ static void onFilteredRecv(hevent_t *ev) if (! idle) { reuseBuffer(getThreadBufferPool(data->tid), data->buf); - free(data); + destroyUdpPayload(data); return; } udp_listener_con_state_t *con = newConnection(data->tid, data->tunnel, data->sock, data->real_localport); @@ -212,7 +212,7 @@ static void onFilteredRecv(hevent_t *ev) { removeIdleItemByHash(data->tid, data->sock->table, peeraddr_hash); reuseBuffer(getThreadBufferPool(data->tid), data->buf); - free(data); + destroyUdpPayload(data); return; } idle->userdata = con; @@ -229,7 +229,7 @@ static void onFilteredRecv(hevent_t *ev) context->payload = data->buf; self->upStream(self, context); - free(data); + destroyUdpPayload(data); } static void parsePortSection(udp_listener_state_t *state, const cJSON *settings) diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index b31e001f..b0e557cd 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -68,13 +68,13 @@ typedef struct socket_manager_s static socket_manager_state_t *state = NULL; -static pool_item_t *allocTcpPayloadPoolHandle(struct generic_pool_s *pool) +static pool_item_t *allocTcpResultObjectPoolHandle(struct generic_pool_s *pool) { (void) pool; return malloc(sizeof(socket_accept_result_t)); } -static void destroyTcpPayloadPoolHandle(struct generic_pool_s *pool, pool_item_t *item) +static void destroyTcpResultObjectPoolHandle(struct generic_pool_s *pool, pool_item_t *item) { (void) pool; free(item); @@ -92,6 +92,31 @@ static void destroyUdpPayloadPoolHandle(struct generic_pool_s *pool, pool_item_t free(item); } +void destroySocketAcceptResult(socket_accept_result_t *sar) +{ + const uint8_t tid = sar->tid; + hhybridmutex_lock(&(state->tcp_pools[tid].mutex)); + reusePoolItem(state->tcp_pools[tid].pool, sar); + hhybridmutex_unlock(&(state->tcp_pools[tid].mutex)); +} + +static udp_payload_t *newUpdPayload(uint8_t tid) +{ + hhybridmutex_lock(&(state->udp_pools[tid].mutex)); + udp_payload_t *item = popPoolItem(state->udp_pools[tid].pool); + hhybridmutex_unlock(&(state->udp_pools[tid].mutex)); + return item; +} + +void destroyUdpPayload(udp_payload_t *upl) +{ + const uint8_t tid = upl->tid; + + hhybridmutex_lock(&(state->udp_pools[tid].mutex)); + reusePoolItem(state->udp_pools[tid].pool, upl); + hhybridmutex_unlock(&(state->udp_pools[tid].mutex)); +} + static bool redirectPortRangeTcp(unsigned int pmin, unsigned int pmax, unsigned int to) { char b[300]; @@ -357,11 +382,15 @@ static inline void incrementDistributeTid(void) } static void distributeSocket(void *io, socket_filter_t *filter, uint16_t local_port) { - // todo (pool) not really, but its recommended - socket_accept_result_t *result = malloc(sizeof(socket_accept_result_t)); - result->real_localport = local_port; - uint8_t tid = (uint8_t) getCurrentDistributeTid(); + uint8_t tid = (uint8_t) getCurrentDistributeTid(); + + hhybridmutex_lock(&(state->tcp_pools[tid].mutex)); + socket_accept_result_t *result = popPoolItem(state->tcp_pools[tid].pool); + hhybridmutex_unlock(&(state->tcp_pools[tid].mutex)); + + result->real_localport = local_port; + hloop_t *worker_loop = loops[tid]; hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb}; result->tid = tid; @@ -644,17 +673,15 @@ static void listenTcp(hloop_t *loop, uint8_t *ports_overlapped) // } // } -static void noUdpSocketConsumerFound(udp_payload_t *pl) +static void noUdpSocketConsumerFound(udp_payload_t *upl) { char localaddrstr[SOCKADDR_STRLEN] = {0}; char peeraddrstr[SOCKADDR_STRLEN] = {0}; LOGE("SocketManager: could not find consumer for Udp socket [%s] <= [%s]", - SOCKADDR_STR(hio_localaddr(pl->sock->io), localaddrstr), - SOCKADDR_STR(hio_peeraddr(pl->sock->io), peeraddrstr)); + SOCKADDR_STR(hio_localaddr(upl->sock->io), localaddrstr), + SOCKADDR_STR(hio_peeraddr(upl->sock->io), peeraddrstr)); - hhybridmutex_lock(&(state->udp_pools[pl->tid].mutex)); - reusePoolItem(state->udp_pools[pl->tid].pool, pl); - hhybridmutex_unlock(&(state->udp_pools[pl->tid].mutex)); + destroyUdpPayload(upl); } static void postPayload(udp_payload_t *pl, socket_filter_t *filter) @@ -794,22 +821,18 @@ static void writeUdpThisLoop(hevent_t *ev) udp_payload_t *upl = hevent_userdata(ev); size_t nwrite = hio_write(upl->sock->io, upl->buf); (void) nwrite; - hhybridmutex_lock(&(state->udp_pools[upl->tid].mutex)); - reusePoolItem(state->udp_pools[upl->tid].pool, upl); - hhybridmutex_unlock(&(state->udp_pools[upl->tid].mutex)); + destroyUdpPayload(upl); } -void postUdpWrite(udpsock_t *socket_io, shift_buffer_t *buf) +void postUdpWrite(udpsock_t *socket_io, uint8_t tid_from, shift_buffer_t *buf) { - const uint8_t tid = hloop_tid(hevent_loop(socket_io)); - hhybridmutex_lock(&(state->udp_pools[tid].mutex)); - udp_payload_t *item = popPoolItem(state->udp_pools[tid].pool); - hhybridmutex_unlock(&(state->udp_pools[tid].mutex)); - *item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid}; + udp_payload_t *item = newUpdPayload(tid_from); - hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io), .userdata = item, .cb = writeUdpThisLoop}; + *item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid_from}; - hloop_post_event(hevent_loop(socket_io), &ev); + hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop}; + + hloop_post_event(hevent_loop(socket_io->io), &ev); } static HTHREAD_ROUTINE(accept_thread) // NOLINT @@ -866,6 +889,12 @@ socket_manager_state_t *createSocketManager(void) hhybridmutex_init(&state->mutex); + state->udp_pools = malloc(sizeof(*state->udp_pools) * workers_count); + memset(state->udp_pools, 0, sizeof(*state->udp_pools) * workers_count); + + state->tcp_pools = malloc(sizeof(*state->tcp_pools) * workers_count); + memset(state->tcp_pools, 0, sizeof(*state->tcp_pools) * workers_count); + for (unsigned int i = 0; i < workers_count; ++i) { state->udp_pools[i].pool = @@ -873,7 +902,7 @@ socket_manager_state_t *createSocketManager(void) hhybridmutex_init(&(state->udp_pools[i].mutex)); state->tcp_pools[i].pool = - newGenericPoolWithSize((8) + ram_profile, allocTcpPayloadPoolHandle, destroyTcpPayloadPoolHandle); + newGenericPoolWithSize((8) + ram_profile, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle); hhybridmutex_init(&(state->tcp_pools[i].mutex)); } diff --git a/ww/managers/socket_manager.h b/ww/managers/socket_manager.h index 3618d327..301f5691 100644 --- a/ww/managers/socket_manager.h +++ b/ww/managers/socket_manager.h @@ -54,6 +54,8 @@ typedef struct socket_accept_result_s typedef void (*onAccept)(hevent_t *ev); +void destroySocketAcceptResult(socket_accept_result_t *); + typedef struct udpsock_s { hio_t *io; @@ -72,12 +74,11 @@ typedef struct udp_payload_s } udp_payload_t; -udp_payload_t *newUdpPayload(uint8_t tid); -void destroyUdpPayload(udp_payload_t *); +void destroyUdpPayload(udp_payload_t *); struct socket_manager_s *getSocketManager(void); struct socket_manager_s *createSocketManager(void); void setSocketManager(struct socket_manager_s *state); void startSocketManager(void); void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onAccept cb); -void postUdpWrite(udpsock_t *socket_io, shift_buffer_t *buf); +void postUdpWrite(udpsock_t *socket_io,uint8_t tid_from, shift_buffer_t *buf); diff --git a/ww/ww.c b/ww/ww.c index 9d01e34b..587e7975 100644 --- a/ww/ww.c +++ b/ww/ww.c @@ -59,7 +59,6 @@ void setWW(struct ww_runtime_state_s *state) context_pools = state->context_pools; line_pools = state->line_pools; pipeline_msg_pools = state->pipeline_msg_pools; - udp_post_pools = state->udp_post_pools; libhv_hio_pools = state->libhv_hio_pools; socekt_manager = state->socekt_manager; node_manager = state->node_manager; @@ -83,7 +82,6 @@ struct ww_runtime_state_s *getWW(void) state->context_pools = context_pools; state->line_pools = line_pools; state->pipeline_msg_pools = pipeline_msg_pools; - state->udp_post_pools = udp_post_pools; state->libhv_hio_pools = libhv_hio_pools; state->socekt_manager = socekt_manager; state->node_manager = node_manager;