From d6fa120a8677f4f4da4bdbd48851c579db2266ee Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Sat, 3 Aug 2024 21:06:30 +0000 Subject: [PATCH] Finish integrating master pool and tested a little bit --- ww/buffer_pool.c | 32 +++++++++++++------------- ww/generic_pool.c | 14 ++++++++---- ww/managers/memory_manager.h | 2 +- ww/managers/socket_manager.c | 34 +++++++++++++++------------- ww/master_pool.c | 2 +- ww/ww.c | 44 ++++++++++++++++++++++++------------ ww/ww.h | 6 +++++ 7 files changed, 81 insertions(+), 53 deletions(-) diff --git a/ww/buffer_pool.c b/ww/buffer_pool.c index 794a709..dc45cee 100644 --- a/ww/buffer_pool.c +++ b/ww/buffer_pool.c @@ -80,7 +80,7 @@ static void reChargeLargeBuffers(buffer_pool_t *pool) const size_t increase = min((pool->cap - pool->large_buffers_len), pool->cap / 2); popMasterPoolItems(pool->large_buffers_mp, (void const **) &(pool->large_buffers[pool->large_buffers_len]), - increase, pool); + increase, pool); pool->large_buffers_len += increase; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -93,7 +93,7 @@ static void reChargeSmallBuffers(buffer_pool_t *pool) const size_t increase = min((pool->cap - pool->small_buffers_len), pool->cap / 2); popMasterPoolItems(pool->small_buffers_mp, (void const **) &(pool->small_buffers[pool->small_buffers_len]), - increase, pool); + increase, pool); pool->small_buffers_len += increase; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -226,7 +226,8 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict return b2; } -static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct master_pool_s* mp_small,uint8_t tid, unsigned int bufcount, unsigned int large_buffer_size, +static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, uint8_t tid, + unsigned int bufcount, unsigned int large_buffer_size, unsigned int small_buffer_size) { // stop using pool if you want less, simply uncomment lines in popbuffer and reuseBuffer @@ -239,19 +240,18 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct mast buffer_pool_t *ptr_pool = globalMalloc(sizeof(buffer_pool_t)); - *ptr_pool = (buffer_pool_t) { - .cap = bufcount, - .large_buffers_size = large_buffer_size, - .small_buffers_size = small_buffer_size, - .free_threshold = max(bufcount / 2, (bufcount * 2) / 3), + *ptr_pool = (buffer_pool_t) {.cap = bufcount, + .large_buffers_size = large_buffer_size, + .small_buffers_size = small_buffer_size, + .free_threshold = max(bufcount / 2, (bufcount * 2) / 3), #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) - .in_use = 0, + .in_use = 0, #endif - .large_buffers_mp = mp_large, - .large_buffers = globalMalloc(container_len), - .small_buffers_mp = mp_small, - .small_buffers = globalMalloc(container_len), - .tid = tid}; + .large_buffers_mp = mp_large, + .large_buffers = globalMalloc(container_len), + .small_buffers_mp = mp_small, + .small_buffers = globalMalloc(container_len), + .tid = tid}; installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, createLargeBufHandle, destroyLargeBufHandle); installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, createSmallBufHandle, destroySmallBufHandle); @@ -265,7 +265,7 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct mast return ptr_pool; } -buffer_pool_t *createBufferPool(struct master_pool_s* mp_large,struct master_pool_s* mp_small,uint8_t tid) +buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, uint8_t tid) { - return allocBufferPool(mp_large,mp_small,tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE, SMALL_BUFSIZE); + return allocBufferPool(mp_large, mp_small, tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE, SMALL_BUFSIZE); } diff --git a/ww/generic_pool.c b/ww/generic_pool.c index 01b64f2..d125f15 100644 --- a/ww/generic_pool.c +++ b/ww/generic_pool.c @@ -10,16 +10,22 @@ static master_pool_item_t *poolCreateItemHandle(struct master_pool_s *pool, void { (void) pool; generic_pool_t *gpool = userdata; - ifgp - - return gpool->create_item_handle(gpool); + if (gpool->item_size == 0) + { + return gpool->create_item_handle(gpool); + } + return globalMalloc(gpool->item_size); } static void poolDestroyItemHandle(struct master_pool_s *pool, master_pool_item_t *item, void *userdata) { (void) pool; generic_pool_t *gpool = userdata; - gpool->destroy_item_handle(gpool, item); + if (gpool->item_size == 0) + { + gpool->destroy_item_handle(gpool, item); + } + globalFree(item); } void poolReCharge(generic_pool_t *pool) diff --git a/ww/managers/memory_manager.h b/ww/managers/memory_manager.h index 618632c..d09485e 100644 --- a/ww/managers/memory_manager.h +++ b/ww/managers/memory_manager.h @@ -1,6 +1,6 @@ #pragma once -#define ALLOCATOR_BYPASS // switch to stdlib allocators +// #define ALLOCATOR_BYPASS // switch to stdlib allocators #include #include diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index 7701fb6..00e846f 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -435,7 +435,7 @@ void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onA option.shared_balance_table = b_table; } - *filter = (socket_filter_t){.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL}; + *filter = (socket_filter_t) {.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL}; hhybridmutex_lock(&(state->mutex)); filters_t_push(&(state->filters[pirority]), filter); @@ -467,7 +467,7 @@ static void distributeSocket(void *io, socket_filter_t *filter, uint16_t local_p result->real_localport = local_port; hloop_t *worker_loop = getWorkerLoop(tid); - hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb}; + hevent_t ev = (hevent_t) {.loop = worker_loop, .cb = filter->cb}; result->tid = tid; result->io = io; result->tunnel = filter->tunnel; @@ -826,7 +826,7 @@ static void postPayload(udp_payload_t post_pl, socket_filter_t *filter) pl->tunnel = filter->tunnel; hloop_t *worker_loop = getWorkerLoop(pl->tid); - hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb}; + hevent_t ev = (hevent_t) {.loop = worker_loop, .cb = filter->cb}; ev.userdata = (void *) pl; hloop_post_event(worker_loop, &ev); @@ -922,13 +922,13 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf) { udpsock_t *socket = hevent_userdata(io); uint16_t local_port = sockaddr_port((sockaddr_u *) hio_localaddr_u(io)); - uint8_t target_tid = local_port % getWorkersCount(); + uint8_t target_tid = local_port % getWorkersCount(); - udp_payload_t item = (udp_payload_t){.sock = socket, - .buf = buf, - .tid = target_tid, - .peer_addr = *(sockaddr_u *) hio_peeraddr_u(io), - .real_localport = local_port}; + udp_payload_t item = (udp_payload_t) {.sock = socket, + .buf = buf, + .tid = target_tid, + .peer_addr = *(sockaddr_u *) hio_peeraddr_u(io), + .real_localport = local_port}; distributeUdpPayload(item); } @@ -950,7 +950,7 @@ static void listenUdpSinglePort(hloop_t *loop, socket_filter_t *filter, char *ho exit(1); } udpsock_t *socket = globalMalloc(sizeof(udpsock_t)); - *socket = (udpsock_t){.io = filter->listen_io, .table = newIdleTable(loop)}; + *socket = (udpsock_t) {.io = filter->listen_io, .table = newIdleTable(loop)}; hevent_set_userdata(filter->listen_io, socket); hio_setcb_read(filter->listen_io, onRecvFrom); hio_read(filter->listen_io); @@ -1014,9 +1014,9 @@ void postUdpWrite(udpsock_t *socket_io, uint8_t tid_from, shift_buffer_t *buf) udp_payload_t *item = newUpdPayload(tid_from); - *item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid_from}; + *item = (udp_payload_t) {.sock = socket_io, .buf = buf, .tid = tid_from}; - hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop}; + hevent_t ev = (hevent_t) {.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop}; hloop_post_event(hevent_loop(socket_io->io), &ev); } @@ -1085,15 +1085,17 @@ socket_manager_state_t *createSocketManager(worker_t *worker) state->tcp_pools = globalMalloc(sizeof(*state->tcp_pools) * getWorkersCount()); memset(state->tcp_pools, 0, sizeof(*state->tcp_pools) * getWorkersCount()); - + master_pool_t *mp_udp = newMasterPoolWithCap(2 * ((8) + RAM_PROFILE)); + master_pool_t *mp_tcp = newMasterPoolWithCap(2 * ((8) + RAM_PROFILE)); for (unsigned int i = 0; i < getWorkersCount(); ++i) { + state->udp_pools[i].pool = - newGenericPoolWithCap((8) + RAM_PROFILE, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle); + newGenericPoolWithCap(mp_udp, (8) + RAM_PROFILE, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle); hhybridmutex_init(&(state->udp_pools[i].mutex)); - state->tcp_pools[i].pool = - newGenericPoolWithCap((8) + RAM_PROFILE, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle); + state->tcp_pools[i].pool = newGenericPoolWithCap(mp_tcp, (8) + RAM_PROFILE, allocTcpResultObjectPoolHandle, + destroyTcpResultObjectPoolHandle); hhybridmutex_init(&(state->tcp_pools[i].mutex)); } diff --git a/ww/master_pool.c b/ww/master_pool.c index 53fadda..6823aee 100644 --- a/ww/master_pool.c +++ b/ww/master_pool.c @@ -1,7 +1,7 @@ #include "master_pool.h" #include "ww.h" -static void defaultCreateHandle(struct master_pool_s *pool, void *userdata) +static master_pool_item_t* defaultCreateHandle(struct master_pool_s *pool, void *userdata) { (void) pool; (void) userdata; diff --git a/ww/ww.c b/ww/ww.c index 2cd825c..128a043 100644 --- a/ww/ww.c +++ b/ww/ww.c @@ -50,11 +50,12 @@ static void initalizeSocketManagerWorker(worker_t *worker, tid_t tid) { *worker = (worker_t) {.tid = tid}; - worker->shift_buffer_pool = - newGenericPoolWithCap((64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + worker->shift_buffer_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); GSTATE.shortcut_shift_buffer_pools[tid] = getWorker(tid)->shift_buffer_pool; - worker->buffer_pool = createBufferPool(worker->tid); + worker->buffer_pool = + createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, worker->tid); GSTATE.shortcut_buffer_pools[tid] = getWorker(tid)->buffer_pool; worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, 0); @@ -65,25 +66,27 @@ static void initalizeWorker(worker_t *worker, tid_t tid) { *worker = (worker_t) {.tid = tid}; - worker->shift_buffer_pool = - newGenericPoolWithCap((64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + worker->shift_buffer_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); GSTATE.shortcut_shift_buffer_pools[tid] = getWorker(tid)->shift_buffer_pool; - worker->buffer_pool = createBufferPool(worker->tid); + worker->buffer_pool = + createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, worker->tid); GSTATE.shortcut_buffer_pools[tid] = getWorker(tid)->buffer_pool; worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, 0); GSTATE.shortcut_loops[tid] = getWorker(tid)->loop; - worker->context_pool = - newGenericPoolWithCap((16) + GSTATE.ram_profile, allocContextPoolHandle, destroyContextPoolHandle); + worker->context_pool = newGenericPoolWithCap(GSTATE.masterpool_context_pools, (16) + GSTATE.ram_profile, + allocContextPoolHandle, destroyContextPoolHandle); GSTATE.shortcut_context_pools[tid] = getWorker(tid)->context_pool; - worker->line_pool = newGenericPoolWithCap((8) + GSTATE.ram_profile, allocLinePoolHandle, destroyLinePoolHandle); + worker->line_pool = newGenericPoolWithCap(GSTATE.masterpool_line_pools, (8) + GSTATE.ram_profile, + allocLinePoolHandle, destroyLinePoolHandle); GSTATE.shortcut_line_pools[tid] = getWorker(tid)->line_pool; - worker->pipeline_msg_pool = - newGenericPoolWithCap((8) + GSTATE.ram_profile, allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle); + worker->pipeline_msg_pool = newGenericPoolWithCap(GSTATE.masterpool_pipeline_msg_pools, (8) + GSTATE.ram_profile, + allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle); GSTATE.shortcut_pipeline_msg_pools[tid] = getWorker(tid)->pipeline_msg_pool; } @@ -128,10 +131,8 @@ static void initializeShortCuts(void) { assert(GSTATE.initialized); - tid_t workers_count = GSTATE.workers_count; - static const int kShourtcutsCount = 6; - const int total_workers = workers_count + kAdditionalReservedWorkers; + const int total_workers = WORKERS_COUNT + kAdditionalReservedWorkers; void **space = globalMalloc(sizeof(void *) * kShourtcutsCount * total_workers); @@ -143,6 +144,18 @@ static void initializeShortCuts(void) GSTATE.shortcut_pipeline_msg_pools = (generic_pool_t **) (space + (5UL * total_workers)); } +static void initializeMasterPools(void) +{ + assert(GSTATE.initialized); + + GSTATE.masterpool_shift_buffer_pools = newMasterPoolWithCap(2 * ((64) + GSTATE.ram_profile)); + GSTATE.masterpool_buffer_pools_large = newMasterPoolWithCap(2 * ((0) + GSTATE.ram_profile)); + GSTATE.masterpool_buffer_pools_small = newMasterPoolWithCap(2 * ((0) + GSTATE.ram_profile)); + GSTATE.masterpool_context_pools = newMasterPoolWithCap(2 * ((16) + GSTATE.ram_profile)); + GSTATE.masterpool_line_pools = newMasterPoolWithCap(2 * ((8) + GSTATE.ram_profile)); + GSTATE.masterpool_pipeline_msg_pools = newMasterPoolWithCap(2 * ((8) + GSTATE.ram_profile)); +} + void createWW(const ww_construction_data_t init_data) { GSTATE.initialized = true; @@ -191,9 +204,10 @@ void createWW(const ww_construction_data_t init_data) WORKERS_COUNT = (255 - kAdditionalReservedWorkers); } - WORKERS = (worker_t *) malloc(sizeof(worker_t) * (WORKERS_COUNT + kAdditionalReservedWorkers)); + WORKERS = (worker_t *) globalMalloc(sizeof(worker_t) * (WORKERS_COUNT + kAdditionalReservedWorkers)); initializeShortCuts(); + initializeMasterPools(); for (unsigned int i = 0; i < WORKERS_COUNT; ++i) { diff --git a/ww/ww.h b/ww/ww.h index 732cb59..2774478 100644 --- a/ww/ww.h +++ b/ww/ww.h @@ -113,6 +113,12 @@ typedef struct ww_global_state_s struct generic_pool_s **shortcut_context_pools; struct generic_pool_s **shortcut_line_pools; struct generic_pool_s **shortcut_pipeline_msg_pools; + struct master_pool_s *masterpool_buffer_pools_large; + struct master_pool_s *masterpool_buffer_pools_small; + struct master_pool_s *masterpool_shift_buffer_pools; + struct master_pool_s *masterpool_context_pools; + struct master_pool_s *masterpool_line_pools; + struct master_pool_s *masterpool_pipeline_msg_pools; struct worker_s *workers; struct socket_manager_s *socekt_manager; struct node_manager_s *node_manager;