Skip to content

Commit

Permalink
Finish socket manager new pools
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 24, 2024
1 parent f34f864 commit 7ce515d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 34 deletions.
2 changes: 1 addition & 1 deletion tunnels/adapters/listener/tcp/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ static void onInboundConnected(hevent_t *ev)
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
}

free(data);
destroySocketAcceptResult(data);

hio_setcb_read(io, onRecv);
hio_setcb_close(io, onClose);
Expand Down
8 changes: 4 additions & 4 deletions tunnels/adapters/listener/udp/udp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ static void downStream(tunnel_t *self, context_t *c)

if (c->payload != NULL)
{
postUdpWrite(cstate->uio, c->payload);
postUdpWrite(cstate->uio,c->line->tid, c->payload);
CONTEXT_PAYLOAD_DROP(c);
destroyContext(c);
}
Expand Down Expand Up @@ -203,7 +203,7 @@ static void onFilteredRecv(hevent_t *ev)
if (! idle)
{
reuseBuffer(getThreadBufferPool(data->tid), data->buf);
free(data);
destroyUdpPayload(data);
return;
}
udp_listener_con_state_t *con = newConnection(data->tid, data->tunnel, data->sock, data->real_localport);
Expand All @@ -212,7 +212,7 @@ static void onFilteredRecv(hevent_t *ev)
{
removeIdleItemByHash(data->tid, data->sock->table, peeraddr_hash);
reuseBuffer(getThreadBufferPool(data->tid), data->buf);
free(data);
destroyUdpPayload(data);
return;
}
idle->userdata = con;
Expand All @@ -229,7 +229,7 @@ static void onFilteredRecv(hevent_t *ev)
context->payload = data->buf;

self->upStream(self, context);
free(data);
destroyUdpPayload(data);
}

static void parsePortSection(udp_listener_state_t *state, const cJSON *settings)
Expand Down
77 changes: 53 additions & 24 deletions ww/managers/socket_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ typedef struct socket_manager_s

static socket_manager_state_t *state = NULL;

static pool_item_t *allocTcpPayloadPoolHandle(struct generic_pool_s *pool)
static pool_item_t *allocTcpResultObjectPoolHandle(struct generic_pool_s *pool)
{
(void) pool;
return malloc(sizeof(socket_accept_result_t));
}

static void destroyTcpPayloadPoolHandle(struct generic_pool_s *pool, pool_item_t *item)
static void destroyTcpResultObjectPoolHandle(struct generic_pool_s *pool, pool_item_t *item)
{
(void) pool;
free(item);
Expand All @@ -92,6 +92,31 @@ static void destroyUdpPayloadPoolHandle(struct generic_pool_s *pool, pool_item_t
free(item);
}

void destroySocketAcceptResult(socket_accept_result_t *sar)
{
const uint8_t tid = sar->tid;
hhybridmutex_lock(&(state->tcp_pools[tid].mutex));
reusePoolItem(state->tcp_pools[tid].pool, sar);
hhybridmutex_unlock(&(state->tcp_pools[tid].mutex));
}

static udp_payload_t *newUpdPayload(uint8_t tid)
{
hhybridmutex_lock(&(state->udp_pools[tid].mutex));
udp_payload_t *item = popPoolItem(state->udp_pools[tid].pool);
hhybridmutex_unlock(&(state->udp_pools[tid].mutex));
return item;
}

void destroyUdpPayload(udp_payload_t *upl)
{
const uint8_t tid = upl->tid;

hhybridmutex_lock(&(state->udp_pools[tid].mutex));
reusePoolItem(state->udp_pools[tid].pool, upl);
hhybridmutex_unlock(&(state->udp_pools[tid].mutex));
}

static bool redirectPortRangeTcp(unsigned int pmin, unsigned int pmax, unsigned int to)
{
char b[300];
Expand Down Expand Up @@ -357,11 +382,15 @@ static inline void incrementDistributeTid(void)
}
static void distributeSocket(void *io, socket_filter_t *filter, uint16_t local_port)
{
// todo (pool) not really, but its recommended
socket_accept_result_t *result = malloc(sizeof(socket_accept_result_t));
result->real_localport = local_port;

uint8_t tid = (uint8_t) getCurrentDistributeTid();
uint8_t tid = (uint8_t) getCurrentDistributeTid();

hhybridmutex_lock(&(state->tcp_pools[tid].mutex));
socket_accept_result_t *result = popPoolItem(state->tcp_pools[tid].pool);
hhybridmutex_unlock(&(state->tcp_pools[tid].mutex));

result->real_localport = local_port;

hloop_t *worker_loop = loops[tid];
hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb};
result->tid = tid;
Expand Down Expand Up @@ -644,17 +673,15 @@ static void listenTcp(hloop_t *loop, uint8_t *ports_overlapped)
// }
// }

static void noUdpSocketConsumerFound(udp_payload_t *pl)
static void noUdpSocketConsumerFound(udp_payload_t *upl)
{
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
LOGE("SocketManager: could not find consumer for Udp socket [%s] <= [%s]",
SOCKADDR_STR(hio_localaddr(pl->sock->io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(pl->sock->io), peeraddrstr));
SOCKADDR_STR(hio_localaddr(upl->sock->io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(upl->sock->io), peeraddrstr));

hhybridmutex_lock(&(state->udp_pools[pl->tid].mutex));
reusePoolItem(state->udp_pools[pl->tid].pool, pl);
hhybridmutex_unlock(&(state->udp_pools[pl->tid].mutex));
destroyUdpPayload(upl);
}

static void postPayload(udp_payload_t *pl, socket_filter_t *filter)
Expand Down Expand Up @@ -794,22 +821,18 @@ static void writeUdpThisLoop(hevent_t *ev)
udp_payload_t *upl = hevent_userdata(ev);
size_t nwrite = hio_write(upl->sock->io, upl->buf);
(void) nwrite;
hhybridmutex_lock(&(state->udp_pools[upl->tid].mutex));
reusePoolItem(state->udp_pools[upl->tid].pool, upl);
hhybridmutex_unlock(&(state->udp_pools[upl->tid].mutex));
destroyUdpPayload(upl);
}
void postUdpWrite(udpsock_t *socket_io, shift_buffer_t *buf)
void postUdpWrite(udpsock_t *socket_io, uint8_t tid_from, shift_buffer_t *buf)
{
const uint8_t tid = hloop_tid(hevent_loop(socket_io));
hhybridmutex_lock(&(state->udp_pools[tid].mutex));
udp_payload_t *item = popPoolItem(state->udp_pools[tid].pool);
hhybridmutex_unlock(&(state->udp_pools[tid].mutex));

*item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid};
udp_payload_t *item = newUpdPayload(tid_from);

hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io), .userdata = item, .cb = writeUdpThisLoop};
*item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid_from};

hloop_post_event(hevent_loop(socket_io), &ev);
hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop};

hloop_post_event(hevent_loop(socket_io->io), &ev);
}

static HTHREAD_ROUTINE(accept_thread) // NOLINT
Expand Down Expand Up @@ -866,14 +889,20 @@ socket_manager_state_t *createSocketManager(void)

hhybridmutex_init(&state->mutex);

state->udp_pools = malloc(sizeof(*state->udp_pools) * workers_count);
memset(state->udp_pools, 0, sizeof(*state->udp_pools) * workers_count);

state->tcp_pools = malloc(sizeof(*state->tcp_pools) * workers_count);
memset(state->tcp_pools, 0, sizeof(*state->tcp_pools) * workers_count);

for (unsigned int i = 0; i < workers_count; ++i)
{
state->udp_pools[i].pool =
newGenericPoolWithSize((8) + ram_profile, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle);
hhybridmutex_init(&(state->udp_pools[i].mutex));

state->tcp_pools[i].pool =
newGenericPoolWithSize((8) + ram_profile, allocTcpPayloadPoolHandle, destroyTcpPayloadPoolHandle);
newGenericPoolWithSize((8) + ram_profile, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle);
hhybridmutex_init(&(state->tcp_pools[i].mutex));
}

Expand Down
7 changes: 4 additions & 3 deletions ww/managers/socket_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ typedef struct socket_accept_result_s

typedef void (*onAccept)(hevent_t *ev);

void destroySocketAcceptResult(socket_accept_result_t *);

typedef struct udpsock_s
{
hio_t *io;
Expand All @@ -72,12 +74,11 @@ typedef struct udp_payload_s

} udp_payload_t;

udp_payload_t *newUdpPayload(uint8_t tid);
void destroyUdpPayload(udp_payload_t *);
void destroyUdpPayload(udp_payload_t *);

struct socket_manager_s *getSocketManager(void);
struct socket_manager_s *createSocketManager(void);
void setSocketManager(struct socket_manager_s *state);
void startSocketManager(void);
void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onAccept cb);
void postUdpWrite(udpsock_t *socket_io, shift_buffer_t *buf);
void postUdpWrite(udpsock_t *socket_io,uint8_t tid_from, shift_buffer_t *buf);
2 changes: 0 additions & 2 deletions ww/ww.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ void setWW(struct ww_runtime_state_s *state)
context_pools = state->context_pools;
line_pools = state->line_pools;
pipeline_msg_pools = state->pipeline_msg_pools;
udp_post_pools = state->udp_post_pools;
libhv_hio_pools = state->libhv_hio_pools;
socekt_manager = state->socekt_manager;
node_manager = state->node_manager;
Expand All @@ -83,7 +82,6 @@ struct ww_runtime_state_s *getWW(void)
state->context_pools = context_pools;
state->line_pools = line_pools;
state->pipeline_msg_pools = pipeline_msg_pools;
state->udp_post_pools = udp_post_pools;
state->libhv_hio_pools = libhv_hio_pools;
state->socekt_manager = socekt_manager;
state->node_manager = node_manager;
Expand Down

0 comments on commit 7ce515d

Please sign in to comment.