From b4b82f62db1acb12532672e920f043125369f2ed Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Wed, 26 Jun 2024 05:50:48 +0000 Subject: [PATCH] redesign ww, mostly buffers / new pools --- .../adapters/connector/tcp/tcp_connector.c | 2 +- tunnels/adapters/listener/tcp/tcp_listener.c | 2 +- tunnels/client/http2/helpers.h | 2 +- tunnels/client/openssl/openssl_client.c | 2 +- tunnels/client/reality/reality_client.c | 4 +- tunnels/client/reverse/helpers.h | 6 +- tunnels/client/wolfssl/wolfssl_client.c | 2 +- tunnels/server/halfduplex/halfduplex_server.c | 20 ------ tunnels/server/openssl/openssl_server.c | 10 +-- tunnels/server/reality/reality_server.c | 4 +- tunnels/server/reverse/helpers.h | 2 +- tunnels/server/reverse/reverse_server.c | 11 +-- tunnels/server/wolfssl/wolfssl_server.c | 2 +- ww/buffer_pool.c | 24 ++++--- ww/buffer_pool.h | 4 +- ww/context_queue.c | 6 +- ww/context_queue.h | 3 +- ww/generic_pool.c | 53 ++++++++++---- ww/generic_pool.h | 12 +++- ww/idle_table.h | 2 +- ww/managers/node_manager.c | 7 +- ww/managers/node_manager.h | 2 +- ww/managers/socket_manager.c | 65 +++++++---------- ww/pipe_line.h | 2 +- ww/shiftbuffer.c | 72 ++++++++++++------- ww/shiftbuffer.h | 27 +++---- ww/tunnel.c | 17 +++-- ww/tunnel.h | 15 ++-- ww/ww.c | 64 ++++++++++++----- ww/ww.h | 16 ++--- 30 files changed, 257 insertions(+), 203 deletions(-) diff --git a/tunnels/adapters/connector/tcp/tcp_connector.c b/tunnels/adapters/connector/tcp/tcp_connector.c index 0c877089..efa3f080 100644 --- a/tunnels/adapters/connector/tcp/tcp_connector.c +++ b/tunnels/adapters/connector/tcp/tcp_connector.c @@ -208,7 +208,7 @@ static void upStream(tunnel_t *self, context_t *c) *cstate = (tcp_connector_con_state_t){.buffer_pool = getContextBufferPool(c), .tunnel = self, .line = c->line, - .data_queue = newContextQueue(getContextBufferPool(c)), + .data_queue = newContextQueue(), .write_paused = true}; #ifdef PROFILE diff --git 
a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index 422d5cfd..38d00be8 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -280,7 +280,7 @@ static void onInboundConnected(hevent_t *ev) *cstate = (tcp_listener_con_state_t){.line = line, .buffer_pool = getThreadBufferPool(tid), - .data_queue = newContextQueue(getThreadBufferPool(tid)), + .data_queue = newContextQueue(), .io = io, .tunnel = self, .write_paused = false, diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index c860247f..fd7b6272 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -161,7 +161,7 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid) http2_client_con_state_t *con = malloc(sizeof(http2_client_con_state_t)); *con = (http2_client_con_state_t){ - .queue = newContextQueue(getThreadBufferPool(tid)), + .queue = newContextQueue(), .content_type = state->content_type, .path = state->path, .host = state->host, diff --git a/tunnels/client/openssl/openssl_client.c b/tunnels/client/openssl/openssl_client.c index 6b5c9015..6be69b74 100644 --- a/tunnels/client/openssl/openssl_client.c +++ b/tunnels/client/openssl/openssl_client.c @@ -162,7 +162,7 @@ static void upStream(tunnel_t *self, context_t *c) cstate->rbio = BIO_new(BIO_s_mem()); cstate->wbio = BIO_new(BIO_s_mem()); cstate->ssl = SSL_new(state->ssl_context); - cstate->queue = newContextQueue(getContextBufferPool(c)); + cstate->queue = newContextQueue(); SSL_set_connect_state(cstate->ssl); /* sets ssl to work in client mode. 
*/ SSL_set_bio(cstate->ssl, cstate->rbio, cstate->wbio); SSL_set_tlsext_host_name(cstate->ssl, state->sni); diff --git a/tunnels/client/reality/reality_client.c b/tunnels/client/reality/reality_client.c index ae9a9dc1..eb091715 100644 --- a/tunnels/client/reality/reality_client.c +++ b/tunnels/client/reality/reality_client.c @@ -135,7 +135,7 @@ static void upStream(tunnel_t *self, context_t *c) while (bufLen(buf) > 0 && isAlive(c->line)) { const uint16_t remain = (uint16_t) min(bufLen(buf), chunk_size); - shift_buffer_t *chunk = shallowSliceBuffer(buf, remain); + shift_buffer_t *chunk = shallowSliceBuffer(c->line->tid,buf, remain); chunk = genericEncrypt(chunk, cstate->encryption_context, state->context_password, getContextBufferPool(c)); signMessage(chunk, cstate->msg_digest, cstate->sign_context, cstate->sign_key); @@ -159,7 +159,7 @@ static void upStream(tunnel_t *self, context_t *c) cstate->rbio = BIO_new(BIO_s_mem()); cstate->wbio = BIO_new(BIO_s_mem()); cstate->ssl = SSL_new(state->ssl_context); - cstate->queue = newContextQueue(getContextBufferPool(c)); + cstate->queue = newContextQueue(); cstate->encryption_context = EVP_CIPHER_CTX_new(); cstate->decryption_context = EVP_CIPHER_CTX_new(); cstate->sign_context = EVP_MD_CTX_create(); diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 68109d15..0840ca5c 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -7,6 +7,8 @@ enum { + kHandShakeByte = 0xFF, + kHandShakeLength = 96, kPreconnectDelayShort = 10, kPreconnectDelayLong = 750, kConnectionStarvationTimeOut = 45000 @@ -73,8 +75,8 @@ static void doConnect(struct connect_arg *cg) } hello_data_ctx->first = true; hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx)); - setLen(hello_data_ctx->payload, 96); - memset(rawBufMut(hello_data_ctx->payload), 0xFF, 96); + setLen(hello_data_ctx->payload, kHandShakeLength); + memset(rawBufMut(hello_data_ctx->payload), kHandShakeByte, 
kHandShakeLength); self->up->upStream(self->up, hello_data_ctx); } diff --git a/tunnels/client/wolfssl/wolfssl_client.c b/tunnels/client/wolfssl/wolfssl_client.c index 01424fc1..82a66164 100644 --- a/tunnels/client/wolfssl/wolfssl_client.c +++ b/tunnels/client/wolfssl/wolfssl_client.c @@ -162,7 +162,7 @@ static void upStream(tunnel_t *self, context_t *c) cstate->rbio = BIO_new(BIO_s_mem()); cstate->wbio = BIO_new(BIO_s_mem()); cstate->ssl = SSL_new(state->ssl_context); - cstate->queue = newContextQueue(getContextBufferPool(c)); + cstate->queue = newContextQueue(); SSL_set_connect_state(cstate->ssl); /* sets ssl to work in client mode. */ SSL_set_bio(cstate->ssl, cstate->rbio, cstate->wbio); SSL_set_tlsext_host_name(cstate->ssl, state->sni); diff --git a/tunnels/server/halfduplex/halfduplex_server.c b/tunnels/server/halfduplex/halfduplex_server.c index a39f6a01..f27c72a6 100644 --- a/tunnels/server/halfduplex/halfduplex_server.c +++ b/tunnels/server/halfduplex/halfduplex_server.c @@ -23,9 +23,7 @@ enum connection_status { kCsUnkown, kCsUploadInTable, - // kCsUploadPipedIndirect, kCsUploadDirect, - // kCsUploadPipedDirect, kCsDownloadInTable, kCsDownloadDirect }; @@ -159,11 +157,6 @@ static void notifyDownloadLineIsReadyForBind(hash_t hash, tunnel_t *self, uint8_ pipeTo(self, upload_line_cstate->upload_line, tid_download_line); - // upload_line_cstate->state = kCsUploadPipedIndirect; - - // newPipeLine(&upload_line_cstate->pipe, self, tid_upload_line, upload_line_cstate->upload_line, - // tid_download_line, localUpStream, localDownStream); - if (upload_line_cstate->buffering) { context_t *bctx = newContext(upload_line_cstate->upload_line); @@ -323,18 +316,6 @@ static void upStream(tunnel_t *self, context_t *c) pipeUpStream(c); return; // piped to another worker which has waiting connections - // cstate->state = kCsUploadPipedIndirect; - // setupLineUpSide(c->line, onUploadInDirectLinePaused, cstate, onUploadInDirectLineResumed); - - // newPipeLine(&cstate->pipe, 
self, c->line->tid, c->line, tid_download_line, localUpStream, - // localDownStream); - - // if (! pipeUpStream(cstate->pipe, c)) - // { - // reuseContextBuffer(c); - // destroyContext(c); - // } - // return; } } else @@ -696,7 +677,6 @@ static void upStream(tunnel_t *self, context_t *c) static void downStream(tunnel_t *self, context_t *c) { - switchLine(c, ((halfduplex_server_con_state_t *) (c->line->dw_state))->download_line); halfduplex_server_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) diff --git a/tunnels/server/openssl/openssl_server.c b/tunnels/server/openssl/openssl_server.c index fda04ab5..9e2c04de 100644 --- a/tunnels/server/openssl/openssl_server.c +++ b/tunnels/server/openssl/openssl_server.c @@ -105,10 +105,6 @@ static size_t paddingDecisionCb(SSL *ssl, int type, size_t len, void *arg) (void) type; oss_server_con_state_t *cstate = arg; - if (cstate->reply_sent_tit == 1 && len <= 4096) - { - return (size_t) 4096 - len; - } if (cstate->reply_sent_tit < 32) { @@ -192,7 +188,7 @@ static void upStream(tunnel_t *self, context_t *c) if (state->fallback != NULL && ! 
cstate->handshake_completed) { - bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->payload)); + bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->line->tid,c->payload)); } if (cstate->fallback_mode) { @@ -491,10 +487,10 @@ static void downStream(tunnel_t *self, context_t *c) // testing how the filtering behaves if we force protocol client to recevie at least // 2 full chunks before sending anymore data int consume = len; - if ((cstate->reply_sent_tit == 1 && len > 2)) + if ((cstate->reply_sent_tit == 1 && len > 64)) { cstate->reply_sent_tit++; - consume = (len / 2); + consume = len - 64; } int n = SSL_write(cstate->ssl, rawBuf(c->payload), consume); diff --git a/tunnels/server/reality/reality_server.c b/tunnels/server/reality/reality_server.c index b599f458..d9ae2402 100644 --- a/tunnels/server/reality/reality_server.c +++ b/tunnels/server/reality/reality_server.c @@ -95,7 +95,7 @@ static void upStream(tunnel_t *self, context_t *c) uint8_t tls_header[1 + 2 + 2]; - bufferStreamPush(cstate->read_stream, newShallowShiftBuffer(buf)); + bufferStreamPush(cstate->read_stream, newShallowShiftBuffer(c->line->tid,buf)); while (isAlive(c->line) && bufferStreamLen(cstate->read_stream) >= kTLSHeaderlen) { bufferStreamViewBytesAt(cstate->read_stream, 0, tls_header, kTLSHeaderlen); @@ -280,7 +280,7 @@ static void downStream(tunnel_t *self, context_t *c) while (bufLen(buf) > 0 && isAlive(c->line)) { const uint16_t remain = (uint16_t) min(bufLen(buf), chunk_size); - shift_buffer_t *chunk = shallowSliceBuffer(buf, remain); + shift_buffer_t *chunk = shallowSliceBuffer(c->line->tid,buf, remain); chunk = genericEncrypt(chunk, cstate->encryption_context, state->context_password, getContextBufferPool(c)); signMessage(chunk, cstate->msg_digest, cstate->sign_context, cstate->sign_key); diff --git a/tunnels/server/reverse/helpers.h b/tunnels/server/reverse/helpers.h index c3535dbe..62772137 100644 --- a/tunnels/server/reverse/helpers.h +++ 
b/tunnels/server/reverse/helpers.h @@ -84,7 +84,7 @@ static reverse_server_con_state_t *createCstateU(line_t *line) reverse_server_con_state_t *cstate = malloc(sizeof(reverse_server_con_state_t)); memset(cstate, 0, sizeof(reverse_server_con_state_t)); cstate->u = line; - cstate->uqueue = newContextQueue(getLineBufferPool(line)); + cstate->uqueue = newContextQueue(); setupLineUpSide(line, onLinePausedU, cstate, onLineResumedU); return cstate; } diff --git a/tunnels/server/reverse/reverse_server.c b/tunnels/server/reverse/reverse_server.c index 1ad0dceb..654828d7 100644 --- a/tunnels/server/reverse/reverse_server.c +++ b/tunnels/server/reverse/reverse_server.c @@ -11,7 +11,8 @@ enum { - kHandShakeByte = 0xFF + kHandShakeByte = 0xFF, + kHandShakeLength = 96 }; #define VAL_1X kHandShakeByte @@ -68,13 +69,13 @@ static void upStream(tunnel_t *self, context_t *c) } else { - if (bufferStreamLen(dcstate->wait_stream) >= 96) + if (bufferStreamLen(dcstate->wait_stream) >= kHandShakeLength) { - shift_buffer_t *data = bufferStreamRead(dcstate->wait_stream, 96); + shift_buffer_t *data = bufferStreamRead(dcstate->wait_stream, kHandShakeLength); - static const uint8_t kHandshakeExpecetd[96] = {VAL_64X, VAL_32X}; + static const uint8_t kHandshakeExpecetd[kHandShakeLength] = {VAL_64X, VAL_32X}; - dcstate->handshaked = 0 == memcmp(kHandshakeExpecetd, rawBuf(data), 96); + dcstate->handshaked = 0 == memcmp(kHandshakeExpecetd, rawBuf(data), kHandShakeLength); thread_box_t *this_tb = &(state->threadlocal_pool[c->line->tid]); diff --git a/tunnels/server/wolfssl/wolfssl_server.c b/tunnels/server/wolfssl/wolfssl_server.c index 3182d4f2..313e019d 100644 --- a/tunnels/server/wolfssl/wolfssl_server.c +++ b/tunnels/server/wolfssl/wolfssl_server.c @@ -164,7 +164,7 @@ static void upStream(tunnel_t *self, context_t *c) if (! 
cstate->handshake_completed) { - bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->payload)); + bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->line->tid,c->payload)); } if (cstate->fallback_mode) { diff --git a/ww/buffer_pool.c b/ww/buffer_pool.c index 45889ba3..eeb0cff8 100644 --- a/ww/buffer_pool.c +++ b/ww/buffer_pool.c @@ -39,8 +39,9 @@ struct buffer_pool_s { unsigned int len; - unsigned int cap; - unsigned int free_threshould; + uint16_t cap; + uint16_t free_threshould; + uint8_t tid; unsigned int buffers_size; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) atomic_size_t in_use; @@ -52,7 +53,7 @@ static void firstCharge(buffer_pool_t *pool) { for (size_t i = 0; i < (pool->cap / 2); i++) { - pool->available[i] = newShiftBuffer(pool->buffers_size); + pool->available[i] = newShiftBuffer(pool->tid,pool->buffers_size); } pool->len = pool->cap / 2; } @@ -63,7 +64,7 @@ static void reCharge(buffer_pool_t *pool) for (size_t i = pool->len; i < (pool->len + increase); i++) { - pool->available[i] = newShiftBuffer(pool->buffers_size); + pool->available[i] = newShiftBuffer(pool->tid,pool->buffers_size); } pool->len += increase; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -77,7 +78,7 @@ static void giveMemBackToOs(buffer_pool_t *pool) for (size_t i = pool->len - decrease; i < pool->len; i++) { - destroyShiftBuffer(pool->available[i]); + destroyShiftBuffer(pool->tid,pool->available[i]); } pool->len -= decrease; @@ -116,7 +117,7 @@ void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b) if (isShallow(b)) { - destroyShiftBuffer(b); + destroyShiftBuffer(pool->tid,b); return; } #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -146,7 +147,7 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict return b2; } -static buffer_pool_t *allocBufferPool(unsigned long bufcount, unsigned int buffer_size) // NOLINT +static buffer_pool_t *allocBufferPool(uint8_t tid,unsigned long bufcount, unsigned int buffer_size) // 
NOLINT { // stop using pool if you want less, simply uncomment lines in popbuffer and reuseBuffer assert(bufcount >= 1); @@ -163,16 +164,17 @@ static buffer_pool_t *allocBufferPool(unsigned long bufcount, unsigned int buffe pool->cap = bufcount; pool->buffers_size = buffer_size; pool->free_threshould = max(pool->cap / 2, (pool->cap * 2) / 3); + pool->tid = tid; firstCharge(pool); return pool; } -buffer_pool_t *createBufferPool(void) +buffer_pool_t *createBufferPool(uint8_t tid) { - return allocBufferPool(BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE); + return allocBufferPool(tid,BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE); } -buffer_pool_t *createSmallBufferPool(void) +buffer_pool_t *createSmallBufferPool(uint8_t tid) { - return allocBufferPool(BUFFERPOOL_SMALL_CONTAINER_LEN, BUFFER_SIZE_SMALL); + return allocBufferPool(tid,BUFFERPOOL_SMALL_CONTAINER_LEN, BUFFER_SIZE_SMALL); } diff --git a/ww/buffer_pool.h b/ww/buffer_pool.h index d2e31acf..dc7720d7 100644 --- a/ww/buffer_pool.h +++ b/ww/buffer_pool.h @@ -29,8 +29,8 @@ struct buffer_pool_s; typedef struct buffer_pool_s buffer_pool_t; -buffer_pool_t *createSmallBufferPool(void); -buffer_pool_t *createBufferPool(void); +buffer_pool_t *createSmallBufferPool(uint8_t tid); +buffer_pool_t *createBufferPool(uint8_t tid); shift_buffer_t *popBuffer(buffer_pool_t *pool); void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b); shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2); diff --git a/ww/context_queue.c b/ww/context_queue.c index a7dc3561..9f98e88b 100644 --- a/ww/context_queue.c +++ b/ww/context_queue.c @@ -16,14 +16,12 @@ enum struct context_queue_s { queue q; - buffer_pool_t *pool; }; -context_queue_t *newContextQueue(buffer_pool_t *pool) +context_queue_t *newContextQueue(void) { context_queue_t *cb = malloc(sizeof(context_queue_t)); cb->q = queue_with_capacity(kQCap); - cb->pool = pool; return cb; } void destroyContextQueue(context_queue_t *self) @@ -32,7 +30,7 @@ 
void destroyContextQueue(context_queue_t *self) { if ((*i.ref)->payload != NULL) { - reuseBuffer(self->pool, (*i.ref)->payload); + reuseBuffer(getContextBufferPool((*i.ref)), (*i.ref)->payload); CONTEXT_PAYLOAD_DROP((*i.ref)); } destroyContext((*i.ref)); diff --git a/ww/context_queue.h b/ww/context_queue.h index 573f53cc..47a821fc 100644 --- a/ww/context_queue.h +++ b/ww/context_queue.h @@ -1,6 +1,5 @@ #pragma once -#include "buffer_pool.h" #include "tunnel.h" #include @@ -11,7 +10,7 @@ typedef struct context_queue_s context_queue_t; -context_queue_t *newContextQueue(buffer_pool_t *pool); +context_queue_t *newContextQueue(void); void destroyContextQueue(context_queue_t *self); void contextQueuePush(context_queue_t *self, context_t *context); context_t *contextQueuePop(context_queue_t *self); diff --git a/ww/generic_pool.c b/ww/generic_pool.c index f6fa0077..234578d6 100644 --- a/ww/generic_pool.c +++ b/ww/generic_pool.c @@ -47,8 +47,8 @@ static void poolFirstCharge(generic_pool_t *pool) } } -static generic_pool_t *allocateGenericPool(unsigned long pool_width, PoolItemCreateHandle create_h, - PoolItemDestroyHandle destroy_h) +static generic_pool_t *allocateGenericPool(unsigned int item_size, unsigned int pool_width, + PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h) { pool_width = (unsigned long) pow(2, floor(log2((double) (max(16, (ssize_t) pool_width))))); @@ -56,25 +56,48 @@ static generic_pool_t *allocateGenericPool(unsigned long pool_width, PoolItemCre pool_width = 2 * pool_width; const unsigned long container_len = pool_width * sizeof(pool_item_t *); - generic_pool_t *pool = malloc(sizeof(generic_pool_t) + container_len); + generic_pool_t *pool_ptr = malloc(sizeof(generic_pool_t) + container_len); #ifdef DEBUG - memset(pool, 0xEE, sizeof(generic_pool_t) + container_len); + memset(pool_ptr, 0xEB, sizeof(generic_pool_t) + container_len); #endif - memset(pool, 0, sizeof(generic_pool_t)); - pool->cap = pool_width; - pool->free_threshould = 
max(pool->cap / 2, (pool->cap * 2) / 3); - pool->create_item_handle = create_h; - pool->destroy_item_handle = destroy_h; - poolFirstCharge(pool); - return pool; + generic_pool_t pool = {.cap = pool_width, + .free_threshould = max(pool_width / 2, (pool_width * 2) / 3), + .item_size = item_size, + .create_item_handle = create_h, + .destroy_item_handle = destroy_h}; + + memcpy(pool_ptr, &pool, sizeof(generic_pool_t)); + poolFirstCharge(pool_ptr); + return pool_ptr; +} + +static pool_item_t *poolDefaultAllocator(struct generic_pool_s *pool) +{ + return malloc(pool->item_size); +} + +static void poolDefaultDeallocator(struct generic_pool_s *pool, pool_item_t *item) +{ + (void) pool; + free(item); } generic_pool_t *newGenericPool(PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h) { - return allocateGenericPool(GENERIC_POOL_DEFAULT_WIDTH, create_h, destroy_h); + return allocateGenericPool(0, GENERIC_POOL_DEFAULT_WIDTH, create_h, destroy_h); } -generic_pool_t *newGenericPoolWithSize(unsigned long pool_width, PoolItemCreateHandle create_h, - PoolItemDestroyHandle destroy_h) +generic_pool_t *newGenericPoolWithCap(unsigned int pool_width, PoolItemCreateHandle create_h, + PoolItemDestroyHandle destroy_h) +{ + return allocateGenericPool(0, pool_width, create_h, destroy_h); +} + +generic_pool_t *newGenericPoolDefaultAllocator(unsigned int item_size) +{ + return allocateGenericPool(item_size, GENERIC_POOL_DEFAULT_WIDTH, poolDefaultAllocator, poolDefaultDeallocator); +} + +generic_pool_t *newGenericPoolDefaultAllocatorWithCap(unsigned int item_size, unsigned int pool_width) { - return allocateGenericPool(pool_width, create_h, destroy_h); + return allocateGenericPool(item_size, pool_width, poolDefaultAllocator, poolDefaultDeallocator); } diff --git a/ww/generic_pool.h b/ww/generic_pool.h index 63e28c2b..49606bac 100644 --- a/ww/generic_pool.h +++ b/ww/generic_pool.h @@ -26,8 +26,9 @@ // #define BYPASS_POOL struct generic_pool_s; - +// struct pool_item_s; // void 
typedef void pool_item_t; + typedef pool_item_t *(*PoolItemCreateHandle)(struct generic_pool_s *pool); typedef void (*PoolItemDestroyHandle)(struct generic_pool_s *pool, pool_item_t *item); @@ -36,6 +37,7 @@ typedef void (*PoolItemDestroyHandle)(struct generic_pool_s *pool, pool_item_t * unsigned int len; \ unsigned int cap; \ unsigned int free_threshould; \ + const unsigned int item_size; \ atomic_size_t in_use; \ PoolItemCreateHandle create_item_handle; \ PoolItemDestroyHandle destroy_item_handle; \ @@ -46,6 +48,7 @@ typedef void (*PoolItemDestroyHandle)(struct generic_pool_s *pool, pool_item_t * unsigned int len; \ unsigned int cap; \ unsigned int free_threshould; \ + const unsigned int item_size; \ PoolItemCreateHandle create_item_handle; \ PoolItemDestroyHandle destroy_item_handle; \ pool_item_t *available[]; @@ -99,8 +102,11 @@ static inline void reusePoolItem(generic_pool_t *pool, pool_item_t *b) } generic_pool_t *newGenericPool(PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h); -generic_pool_t *newGenericPoolWithSize(unsigned long pool_width, PoolItemCreateHandle create_h, - PoolItemDestroyHandle destroy_h); +generic_pool_t *newGenericPoolWithCap(unsigned int pool_width, PoolItemCreateHandle create_h, + PoolItemDestroyHandle destroy_h); + +generic_pool_t *newGenericPoolDefaultAllocator(unsigned int item_size); +generic_pool_t *newGenericPoolDefaultAllocatorWithCap(unsigned int item_size, unsigned int pool_width); #undef BYPASS_POOL #undef POOL_DEBUG diff --git a/ww/idle_table.h b/ww/idle_table.h index aac47b57..a0dfe5db 100644 --- a/ww/idle_table.h +++ b/ww/idle_table.h @@ -20,7 +20,7 @@ and other threads must not change , remove or do anything to it because of that, tid parameter is required in order to find the item - -- valgrind unfriendly, since we required 64byte alignment, so it says "possibly lost" + -- valgrind unfriendly, since we required 64byte alignment, so it says "possibly/definitely lost" but the pointer is saved in "memptr" 
field inside the object note that libhv timer is also not a real timer, but is a heap like timer diff --git a/ww/managers/node_manager.c b/ww/managers/node_manager.c index 3b970962..20b94daa 100644 --- a/ww/managers/node_manager.c +++ b/ww/managers/node_manager.c @@ -29,7 +29,7 @@ typedef struct node_manager_s static node_manager_t *state; -void runNode(node_t *n1, size_t chain_index) +void runNode(node_t *n1, uint8_t chain_index) { if (n1 == NULL) { @@ -55,7 +55,8 @@ void runNode(node_t *n1, size_t chain_index) exit(1); } - n1->instance->chain_index = chain_index; + memcpy((uint8_t*)&(n1->instance->chain_index), &chain_index, sizeof(uint8_t)); + chain(n1->instance, n2->instance); } else @@ -69,7 +70,7 @@ void runNode(node_t *n1, size_t chain_index) LOGF("NodeManager: node startup failure: node (\"%s\") create() returned NULL handle", n1->name); exit(1); } - n1->instance->chain_index = chain_index; + memcpy((uint8_t*)&(n1->instance->chain_index), &chain_index, sizeof(uint8_t)); } } diff --git a/ww/managers/node_manager.h b/ww/managers/node_manager.h index 979a24c7..a0dc2015 100644 --- a/ww/managers/node_manager.h +++ b/ww/managers/node_manager.h @@ -24,7 +24,7 @@ struct node_manager_s; -void runNode(node_t *n1, size_t chain_index); +void runNode(node_t *n1, uint8_t chain_index); node_t *getNode(hash_t hash_node_name); node_t *newNode(void); void registerNode(node_t *new_node, cJSON *node_settings); diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index 25f39a28..d9e7c8c6 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -10,7 +10,6 @@ #include "tunnel.h" #include "utils/procutils.h" #include "ww.h" -#include typedef struct socket_filter_s { @@ -31,9 +30,9 @@ typedef struct socket_filter_s #define SUPPORT_V6 false enum { - kSoOriginalDest = 80, - kFilterLevels = 4, - kAcceptThreadTid = 1000 + kSoOriginalDest = 80, + kFilterLevels = 4, + kAcceptThreadIdOffset = 0 }; typedef struct socket_manager_s @@ -95,6 +94,7 
@@ static void destroyUdpPayloadPoolHandle(struct generic_pool_s *pool, pool_item_t void destroySocketAcceptResult(socket_accept_result_t *sar) { const uint8_t tid = sar->tid; + hhybridmutex_lock(&(state->tcp_pools[tid].mutex)); reusePoolItem(state->tcp_pools[tid].pool, sar); hhybridmutex_unlock(&(state->tcp_pools[tid].mutex)); @@ -201,7 +201,8 @@ static void signalHandler(int signum) raise(signum); } -int checkIPRange4(const struct in_addr test_addr, const struct in_addr base_addr, const struct in_addr subnet_mask) +static inline int checkIPRange4(const struct in_addr test_addr, const struct in_addr base_addr, + const struct in_addr subnet_mask) { if ((test_addr.s_addr & subnet_mask.s_addr) == (base_addr.s_addr & subnet_mask.s_addr)) { @@ -209,7 +210,8 @@ int checkIPRange4(const struct in_addr test_addr, const struct in_addr base_addr } return 0; } -int checkIPRange6(const struct in6_addr test_addr, const struct in6_addr base_addr, const struct in6_addr subnet_mask) +static inline int checkIPRange6(const struct in6_addr test_addr, const struct in6_addr base_addr, + const struct in6_addr subnet_mask) { uint64_t *test_addr_p = (uint64_t *) &(test_addr.s6_addr[0]); @@ -352,15 +354,13 @@ void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onA if (option.white_list_raddr != NULL) { pirority++; + parseWhiteListOption(&option); } if (option.black_list_raddr != NULL) { pirority++; } - if (option.white_list_raddr != NULL) - { - parseWhiteListOption(&option); - } + *filter = (socket_filter_t){.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL}; hhybridmutex_lock(&(state->mutex)); @@ -413,12 +413,9 @@ static void noTcpSocketConsumerFound(hio_t *io) static bool checkIpIsWhiteList(sockaddr_u *addr, const socket_filter_option_t option) { - struct in6_addr test_addr = {0}; - const bool is_v4 = addr->sa.sa_family == AF_INET; + const bool is_v4 = addr->sa.sa_family == AF_INET; if (is_v4) { - memcpy(&test_addr, &addr->sin.sin_addr, 4); - for 
(unsigned int i = 0; i < option.white_list_parsed_length; i++) { @@ -496,7 +493,7 @@ static void onAcceptTcpMultiPort(hio_t *io) #ifdef OS_UNIX unsigned char pbuf[28] = {0}; - socklen_t size = 16; // todo ipv6 value is 28 + socklen_t size = 16; // todo (ipv6) value is 28 but requires dual stack if (getsockopt(hio_fd(io), IPPROTO_IP, kSoOriginalDest, &(pbuf[0]), &size) < 0) { char localaddrstr[SOCKADDR_STRLEN] = {0}; @@ -655,23 +652,6 @@ static void listenTcp(hloop_t *loop, uint8_t *ports_overlapped) } } -// void onUdpSocketExpire(struct idle_item_s *table_item) -// { -// assert(table_item->userdata != NULL); -// udpsock_t *udpsock = table_item->userdata; - -// // call close callback -// if (udpsock->closecb) -// { -// hevent_t ev; -// memset(&ev, 0, sizeof(ev)); -// ev.loop = loops[table_item->tid]; -// ev.cb = udpsock->closecb; -// hevent_set_userdata(&ev, udpsock); -// hloop_post_event(loops[table_item->tid], &ev); -// } -// } - static void noUdpSocketConsumerFound(udp_payload_t *upl) { char localaddrstr[SOCKADDR_STRLEN] = {0}; @@ -838,7 +818,14 @@ static HTHREAD_ROUTINE(accept_thread) // NOLINT { (void) userdata; - hloop_t *loop = hloop_new(HLOOP_FLAG_AUTO_FREE, createSmallBufferPool(), kAcceptThreadTid); + const uint8_t tid = workers_count + kAcceptThreadIdOffset; + assert(tid >= 1); + + shift_buffer_pools[tid] = + newGenericPoolWithCap((64) + ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + buffer_pools[tid] = createBufferPool(tid); + hloop_t *loop = hloop_new(HLOOP_FLAG_AUTO_FREE, buffer_pools[tid], tid); + loops[tid] = loop; hhybridmutex_lock(&(state->mutex)); // state->table = newIdleTable(loop, onUdpSocketExpire); @@ -873,7 +860,10 @@ void startSocketManager(void) { assert(state != NULL); // accept_thread(accept_thread_loop); + const uint8_t tid = workers_count + kAcceptThreadIdOffset; + state->accept_thread = hthread_create(accept_thread, NULL); + workers[tid] = state->accept_thread; } socket_manager_state_t 
*createSocketManager(void) @@ -897,11 +887,11 @@ socket_manager_state_t *createSocketManager(void) for (unsigned int i = 0; i < workers_count; ++i) { state->udp_pools[i].pool = - newGenericPoolWithSize((8) + ram_profile, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle); + newGenericPoolWithCap((8) + ram_profile, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle); hhybridmutex_init(&(state->udp_pools[i].mutex)); state->tcp_pools[i].pool = - newGenericPoolWithSize((8) + ram_profile, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle); + newGenericPoolWithCap((8) + ram_profile, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle); hhybridmutex_init(&(state->tcp_pools[i].mutex)); } @@ -921,11 +911,6 @@ socket_manager_state_t *createSocketManager(void) perror("Error setting SIGINT signal handler"); exit(1); } - if (atexit(exitHook) != 0) - { - perror("Error setting ATEXIT hook"); - exit(1); - } return state; } diff --git a/ww/pipe_line.h b/ww/pipe_line.h index 494fd582..efa808d0 100644 --- a/ww/pipe_line.h +++ b/ww/pipe_line.h @@ -33,7 +33,7 @@ i hope you don't use it, currently only used for halfduplex server since there were no other way... - -- valgrind unfriendly, since we required 64byte alignment, so it says "possibly lost" + -- valgrind unfriendly, since we required 64byte alignment, so it says "possibly/definitely lost" but the pointer is saved in "memptr" field inside the object */ diff --git a/ww/shiftbuffer.c b/ww/shiftbuffer.c index c5837c42..ea37c815 100644 --- a/ww/shiftbuffer.c +++ b/ww/shiftbuffer.c @@ -1,4 +1,5 @@ #include "shiftbuffer.h" +#include "generic_pool.h" #include "utils/mathutils.h" #include "ww.h" #include // for assert @@ -9,20 +10,44 @@ #define PREPADDING ((ram_profile >= kRamProfileS2Memory ? 
(1U << 11) : (1U << 8)) + 512) -void destroyShiftBuffer(shift_buffer_t *self) +pool_item_t *allocShiftBufferPoolHandle(struct generic_pool_s *pool) { + (void) pool; + shift_buffer_t *self = malloc(sizeof(shift_buffer_t)); + + *self = (shift_buffer_t){ + .refc = malloc(sizeof(self->refc[0])), + }; + return self; +} +void destroyShiftBufferPoolHandle(struct generic_pool_s *pool, pool_item_t *item) +{ + (void) pool; + shift_buffer_t *self = item; + + free(self->refc); + free(self); +} + +void destroyShiftBuffer(uint8_t tid, shift_buffer_t *self) +{ + assert(*(self->refc) > 0); // if its a shallow then the underlying buffer survives *(self->refc) -= 1; if (*(self->refc) <= 0) { free(self->pbuf - self->_offset); - free(self->refc); + reusePoolItem(shift_buffer_pools[tid], self); + } + else + { + self->refc = malloc(sizeof(self->refc[0])); + reusePoolItem(shift_buffer_pools[tid], self); } - free(self); } -shift_buffer_t *newShiftBuffer(unsigned int pre_cap) +shift_buffer_t *newShiftBuffer(uint8_t tid, unsigned int pre_cap) // NOLINT { if (pre_cap != 0 && pre_cap % 16 != 0) { @@ -31,15 +56,15 @@ shift_buffer_t *newShiftBuffer(unsigned int pre_cap) unsigned int real_cap = pre_cap + (PREPADDING); - shift_buffer_t *self = malloc(sizeof(shift_buffer_t)); + // shift_buffer_t *self = malloc(sizeof(shift_buffer_t)); + shift_buffer_t *self = (shift_buffer_t *) popPoolItem(shift_buffer_pools[tid]); - // todo (optimize) i think refc and pbuf could be in 1 malloc - *self = (shift_buffer_t){.calc_len = 0, - ._offset = 0, - .curpos = PREPADDING, - .full_cap = real_cap, - .refc = malloc(sizeof(self->refc[0])), - .pbuf = malloc(real_cap)}; + self->calc_len = 0; + self->_offset = 0; + self->curpos = PREPADDING; + self->full_cap = real_cap; + self->pbuf = malloc(real_cap); + // self->refc = malloc(sizeof(self->refc[0])), *(self->refc) = 1; if (real_cap > 0) // map the virtual memory page to physical memory @@ -55,11 +80,12 @@ shift_buffer_t *newShiftBuffer(unsigned int pre_cap) return 
self; } -shift_buffer_t *newShallowShiftBuffer(shift_buffer_t *owner) +shift_buffer_t *newShallowShiftBuffer(uint8_t tid, shift_buffer_t *owner) { *(owner->refc) += 1; - shift_buffer_t *shallow = malloc(sizeof(shift_buffer_t)); - *shallow = *owner; + shift_buffer_t *shallow = (shift_buffer_t *) popPoolItem(shift_buffer_pools[tid]); + free(shallow->refc); + *shallow = *owner; return shallow; } @@ -90,12 +116,8 @@ void reset(shift_buffer_t *self, unsigned int pre_cap) void unShallow(shift_buffer_t *self) { - if (*(self->refc) <= 1) - { - // not a shallow - assert(false); - return; - } + // not a shallow + assert(*(self->refc) > 1); *(self->refc) -= 1; self->refc = malloc(sizeof(unsigned int)); @@ -171,11 +193,11 @@ void sliceBufferTo(shift_buffer_t *restrict dest, shift_buffer_t *restrict sourc setLen(dest, bytes); } -shift_buffer_t *sliceBuffer(shift_buffer_t *self, unsigned int bytes) +shift_buffer_t *sliceBuffer(uint8_t tid, shift_buffer_t *self, unsigned int bytes) { assert(bytes <= bufLen(self)); - shift_buffer_t *newbuf = newShiftBuffer(self->full_cap / 2); + shift_buffer_t *newbuf = newShiftBuffer(tid, self->full_cap / 2); if (bytes <= bufLen(self) / 2) { @@ -205,7 +227,7 @@ shift_buffer_t *sliceBuffer(shift_buffer_t *self, unsigned int bytes) return newbuf; } -shift_buffer_t *shallowSliceBuffer(shift_buffer_t *self, unsigned int bytes) +shift_buffer_t *shallowSliceBuffer(uint8_t tid, shift_buffer_t *self, unsigned int bytes) { assert(bytes <= bufLen(self)); @@ -217,7 +239,7 @@ shift_buffer_t *shallowSliceBuffer(shift_buffer_t *self, unsigned int bytes) self->_offset = 0; } - shift_buffer_t *shallow = newShallowShiftBuffer(self); + shift_buffer_t *shallow = newShallowShiftBuffer(tid, self); setLen(shallow, bytes); constrainRight(shallow); diff --git a/ww/shiftbuffer.h b/ww/shiftbuffer.h index b6deda92..8964e199 100644 --- a/ww/shiftbuffer.h +++ b/ww/shiftbuffer.h @@ -39,16 +39,21 @@ struct shift_buffer_s typedef struct shift_buffer_s shift_buffer_t; 
-shift_buffer_t *newShiftBuffer(unsigned int pre_cap); -shift_buffer_t *newShallowShiftBuffer(shift_buffer_t *owner); -void destroyShiftBuffer(shift_buffer_t *self); +struct generic_pool_s; + +void *allocShiftBufferPoolHandle(struct generic_pool_s *pool); +void destroyShiftBufferPoolHandle(struct generic_pool_s *pool, void *item); + +shift_buffer_t *newShiftBuffer(uint8_t tid, unsigned int pre_cap); +shift_buffer_t *newShallowShiftBuffer(uint8_t tid, shift_buffer_t *owner); +void destroyShiftBuffer(uint8_t tid, shift_buffer_t *self); void reset(shift_buffer_t *self, unsigned int cap); void unShallow(shift_buffer_t *self); void expand(shift_buffer_t *self, unsigned int increase); void concatBuffer(shift_buffer_t *restrict root, shift_buffer_t *restrict buf); void sliceBufferTo(shift_buffer_t *restrict dest, shift_buffer_t *restrict source, unsigned int bytes); -shift_buffer_t *sliceBuffer(shift_buffer_t *self, unsigned int bytes); -shift_buffer_t *shallowSliceBuffer(shift_buffer_t *self, unsigned int bytes); +shift_buffer_t *sliceBuffer(uint8_t tid, shift_buffer_t *self, unsigned int bytes); +shift_buffer_t *shallowSliceBuffer(uint8_t tid, shift_buffer_t *self, unsigned int bytes); static inline bool isShallow(shift_buffer_t *self) { @@ -147,14 +152,12 @@ static inline void readUI8(shift_buffer_t *self, uint8_t *dest) { // *dest = *(uint8_t *) rawBuf(self); address could be misaligned memcpy(dest, rawBuf(self), sizeof(*dest)); - } static inline void readUI16(shift_buffer_t *self, uint16_t *dest) { // *dest = *(uint16_t *) rawBuf(self); address could be misaligned memcpy(dest, rawBuf(self), sizeof(*dest)); - } static inline void readUI64(shift_buffer_t *self, uint64_t *dest) @@ -181,29 +184,29 @@ static inline void writeRaw(shift_buffer_t *restrict self, const void *restrict static inline void writeI32(shift_buffer_t *self, int32_t data) { // *(int32_t *) rawBufMut(self) = data; address could be misaligned - memcpy(rawBufMut(self) , &data, sizeof(data)); + 
memcpy(rawBufMut(self), &data, sizeof(data)); } static inline void writeUI32(shift_buffer_t *self, uint32_t data) { // *(uint32_t *) rawBufMut(self) = data; address could be misaligned - memcpy(rawBufMut(self) , &data, sizeof(data)); + memcpy(rawBufMut(self), &data, sizeof(data)); } static inline void writeI16(shift_buffer_t *self, int16_t data) { // *(int16_t *) rawBufMut(self) = data; address could be misaligned - memcpy(rawBufMut(self) , &data, sizeof(data)); + memcpy(rawBufMut(self), &data, sizeof(data)); } static inline void writeUI16(shift_buffer_t *self, uint16_t data) { // *(uint16_t *) rawBufMut(self) = data; address could be misaligned - memcpy(rawBufMut(self) , &data, sizeof(data)); + memcpy(rawBufMut(self), &data, sizeof(data)); } static inline void writeUI8(shift_buffer_t *self, uint8_t data) { // *(uint8_t *) rawBufMut(self) = data; address could be misaligned - memcpy(rawBufMut(self) , &data, sizeof(data)); + memcpy(rawBufMut(self), &data, sizeof(data)); } diff --git a/ww/tunnel.c b/ww/tunnel.c index 3ede739d..dc1a2da7 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -23,17 +23,22 @@ void chain(tunnel_t *from, tunnel_t *to) { chainUp(from, to); chainDown(from, to); - to->chain_index = from->chain_index + 1; + + const uint8_t new_to_chain_index = from->chain_index + 1; + memcpy((uint8_t*)&(to->chain_index), &new_to_chain_index, sizeof(uint8_t)); } tunnel_t *newTunnel(void) { - tunnel_t *t = malloc(sizeof(tunnel_t)); - *t = (tunnel_t){ - .upStream = &defaultUpStream, - .downStream = &defaultDownStream, + tunnel_t *ptr = malloc(sizeof(tunnel_t)); + + tunnel_t tunnel = (tunnel_t){ + .upStream = &defaultUpStream, + .downStream = &defaultDownStream, }; - return t; + memcpy(ptr, &tunnel, sizeof(tunnel_t)); + + return ptr; } pool_item_t *allocLinePoolHandle(struct generic_pool_s *pool) diff --git a/ww/tunnel.h b/ww/tunnel.h index 8c59ed0d..cbdef072 100644 --- a/ww/tunnel.h +++ b/ww/tunnel.h @@ -117,9 +117,9 @@ typedef struct line_s LineFlowSignal up_resume_cb; 
LineFlowSignal dw_pause_cb; LineFlowSignal dw_resume_cb; + void *chains_state[kMaxChainLen]; socket_context_t src_ctx; socket_context_t dest_ctx; - void *chains_state[kMaxChainLen]; uint8_t auth_cur; } line_t; @@ -160,7 +160,7 @@ typedef struct tunnel_s // 48 TunnelFlowRoutine upStream; TunnelFlowRoutine downStream; - uint8_t chain_index; + const uint8_t chain_index; } tunnel_t; tunnel_t *newTunnel(void); @@ -193,9 +193,9 @@ static inline line_t *newLine(uint8_t tid) .src_ctx = (socket_context_t){.address.sa = (struct sockaddr){.sa_family = AF_INET, .sa_data = {0}}}, }; // there were no way because we declared tid as const, but im sure compiler will know what to do here - // forexample gcc has builtins + // forexample gcc has builtins memcpy(result, &newline, sizeof(line_t)); - + return result; } @@ -210,7 +210,7 @@ static inline bool isAlive(line_t *line) */ static inline void setupLineUpSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb) { - assert(l->up_state == NULL || l->up_pause_cb == NULL); + assert(l->up_state == NULL); l->up_state = state; l->up_pause_cb = pause_cb; l->up_resume_cb = resume_cb; @@ -222,7 +222,7 @@ static inline void setupLineUpSide(line_t *l, LineFlowSignal pause_cb, void *sta */ static inline void setupLineDownSide(line_t *l, LineFlowSignal pause_cb, void *state, LineFlowSignal resume_cb) { - assert(l->dw_state == NULL || l->dw_pause_cb == NULL); + assert(l->dw_state == NULL); l->dw_state = state; l->dw_pause_cb = pause_cb; l->dw_resume_cb = resume_cb; @@ -396,6 +396,9 @@ static inline context_t *switchLine(context_t *c, line_t *line) static inline void markAuthenticated(line_t *line) { + // basic overflow protection + assert(line->auth_cur < (((0x1ULL << ((sizeof(line->auth_cur) * 8ULL) - 1ULL)) - 1ULL) | + (0xFULL << ((sizeof(line->auth_cur) * 8ULL) - 4ULL)))); line->auth_cur += 1; } diff --git a/ww/ww.c b/ww/ww.c index ced45ca5..55fbba20 100644 --- a/ww/ww.c +++ b/ww/ww.c @@ -16,11 +16,23 @@ #include #endif 
+/* + additional threads that do not require instances of every pool and they will create what they need + so, these additions will only reserve their own space on the workers array + + the only purpose of this is to reduce memory usage +*/ +enum +{ + kAdditionalReservedWorkers = 1 +}; + unsigned int workers_count = 0; hthread_t *workers = NULL; unsigned int ram_profile = 0; struct hloop_s **loops = NULL; struct buffer_pool_s **buffer_pools = NULL; +struct generic_pool_s **shift_buffer_pools = NULL; struct generic_pool_s **context_pools = NULL; struct generic_pool_s **line_pools = NULL; struct generic_pool_s **pipeline_msg_pools = NULL; @@ -38,6 +50,7 @@ struct ww_runtime_state_s unsigned int ram_profile; struct hloop_s **loops; struct buffer_pool_s **buffer_pools; + struct generic_pool_s **shift_buffer_pools; struct generic_pool_s **context_pools; struct generic_pool_s **line_pools; struct generic_pool_s **pipeline_msg_pools; @@ -56,6 +69,7 @@ void setWW(struct ww_runtime_state_s *state) ram_profile = state->ram_profile; loops = state->loops; buffer_pools = state->buffer_pools; + shift_buffer_pools = state->shift_buffer_pools; context_pools = state->context_pools; line_pools = state->line_pools; pipeline_msg_pools = state->pipeline_msg_pools; @@ -79,6 +93,7 @@ struct ww_runtime_state_s *getWW(void) state->ram_profile = ram_profile; state->loops = loops; state->buffer_pools = buffer_pools; + state->shift_buffer_pools = shift_buffer_pools; state->context_pools = context_pools; state->line_pools = line_pools; state->pipeline_msg_pools = pipeline_msg_pools; @@ -158,34 +173,47 @@ void createWW(const ww_construction_data_t init_data) } workers_count = init_data.workers_count; - if (workers_count <= 0 || workers_count > 255) + if (workers_count <= 0 || workers_count > (256 - kAdditionalReservedWorkers)) { fprintf(stderr, "workers count was not in valid range, value: %u range:[1 - 255]\n", workers_count); } - workers = (hthread_t *) malloc(sizeof(hthread_t) * 
workers_count); - // frand_seed = time(NULL); - ram_profile = init_data.ram_profile; - buffer_pools = (struct buffer_pool_s **) malloc(sizeof(struct buffer_pool_s *) * workers_count); - - context_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count); - line_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count); - pipeline_msg_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count); - libhv_hio_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count); - + workers = (hthread_t *) malloc(sizeof(hthread_t) * (workers_count + kAdditionalReservedWorkers)); + loops = (hloop_t **) malloc(sizeof(hloop_t *) * (workers_count + kAdditionalReservedWorkers)); + + ram_profile = init_data.ram_profile; + buffer_pools = + (struct buffer_pool_s **) malloc(sizeof(struct buffer_pool_s *) * (workers_count + kAdditionalReservedWorkers)); + + shift_buffer_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * + (workers_count + kAdditionalReservedWorkers)); + context_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * + (workers_count + kAdditionalReservedWorkers)); + line_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * + (workers_count + kAdditionalReservedWorkers)); + pipeline_msg_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * + (workers_count + kAdditionalReservedWorkers)); + libhv_hio_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * + (workers_count + kAdditionalReservedWorkers)); + /* + note that we dont touch additional workers since they must initialize what they want themselves + */ for (unsigned int i = 0; i < workers_count; ++i) { - buffer_pools[i] = createBufferPool(); - context_pools[i] = newGenericPoolWithSize((16) + ram_profile, allocContextPoolHandle, destroyContextPoolHandle); - line_pools[i] = 
newGenericPoolWithSize((8) + ram_profile, allocLinePoolHandle, destroyLinePoolHandle); + shift_buffer_pools[i] = + newGenericPoolWithCap((64) + ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + // shift_buffer_pools must be initialized before buffer_pools + buffer_pools[i] = createBufferPool(i); + + context_pools[i] = newGenericPoolWithCap((16) + ram_profile, allocContextPoolHandle, destroyContextPoolHandle); + line_pools[i] = newGenericPoolWithCap((8) + ram_profile, allocLinePoolHandle, destroyLinePoolHandle); pipeline_msg_pools[i] = - newGenericPoolWithSize((8) + ram_profile, allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle); + newGenericPoolWithCap((8) + ram_profile, allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle); // todo (half implemented) - libhv_hio_pools[i] = - newGenericPoolWithSize((32) + (2 * ram_profile), allocLinePoolHandle, destroyLinePoolHandle); + // libhv_hio_pools[i] = + // newGenericPoolWithCap((32) + (2 * ram_profile), allocLinePoolHandle, destroyLinePoolHandle); } - loops = (hloop_t **) malloc(sizeof(hloop_t *) * workers_count); loops[0] = hloop_new(HLOOP_FLAG_AUTO_FREE, buffer_pools[0], 0); workers[0] = (hthread_t) NULL; diff --git a/ww/ww.h b/ww/ww.h index 96e7fcdc..ebdae1cf 100644 --- a/ww/ww.h +++ b/ww/ww.h @@ -49,7 +49,6 @@ enum #define ALIGN2(n, w) (((n) + ((w) - 1)) & ~((w) - 1)) - struct ww_runtime_state_s; WWEXPORT void setWW(struct ww_runtime_state_s *state); @@ -65,13 +64,13 @@ typedef struct enum ram_profiles { - kRamProfileInvalid = 0, // 0 is invalid memory multiplier - kRamProfileS1Memory = 1, - kRamProfileS2Memory = 8, - kRamProfileM1Memory = 8 * 16 * 1, - kRamProfileM2Memory = 8 * 16 * 2, - kRamProfileL1Memory = 8 * 16 * 3, - kRamProfileL2Memory = 8 * 16 * 4 + kRamProfileInvalid = 0, // 0 is invalid memory multiplier + kRamProfileS1Memory = 1, + kRamProfileS2Memory = 8, + kRamProfileM1Memory = 8 * 16 * 1, + kRamProfileM2Memory = 8 * 16 * 2, + kRamProfileL1Memory = 8 * 16 * 3, + 
kRamProfileL2Memory = 8 * 16 * 4 }; typedef struct @@ -93,6 +92,7 @@ extern hthread_t *workers; extern unsigned int ram_profile; extern struct hloop_s **loops; extern struct buffer_pool_s **buffer_pools; +extern struct generic_pool_s **shift_buffer_pools; extern struct generic_pool_s **context_pools; extern struct generic_pool_s **line_pools; extern struct generic_pool_s **pipeline_msg_pools;