Skip to content

Commit

Permalink
redesign ww, mostly buffers / new pools
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 26, 2024
1 parent 529993d commit b4b82f6
Show file tree
Hide file tree
Showing 30 changed files with 257 additions and 203 deletions.
2 changes: 1 addition & 1 deletion tunnels/adapters/connector/tcp/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ static void upStream(tunnel_t *self, context_t *c)
*cstate = (tcp_connector_con_state_t){.buffer_pool = getContextBufferPool(c),
.tunnel = self,
.line = c->line,
.data_queue = newContextQueue(getContextBufferPool(c)),
.data_queue = newContextQueue(),
.write_paused = true};

#ifdef PROFILE
Expand Down
2 changes: 1 addition & 1 deletion tunnels/adapters/listener/tcp/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static void onInboundConnected(hevent_t *ev)

*cstate = (tcp_listener_con_state_t){.line = line,
.buffer_pool = getThreadBufferPool(tid),
.data_queue = newContextQueue(getThreadBufferPool(tid)),
.data_queue = newContextQueue(),
.io = io,
.tunnel = self,
.write_paused = false,
Expand Down
2 changes: 1 addition & 1 deletion tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static http2_client_con_state_t *createHttp2Connection(tunnel_t *self, int tid)
http2_client_con_state_t *con = malloc(sizeof(http2_client_con_state_t));

*con = (http2_client_con_state_t){
.queue = newContextQueue(getThreadBufferPool(tid)),
.queue = newContextQueue(),
.content_type = state->content_type,
.path = state->path,
.host = state->host,
Expand Down
2 changes: 1 addition & 1 deletion tunnels/client/openssl/openssl_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ static void upStream(tunnel_t *self, context_t *c)
cstate->rbio = BIO_new(BIO_s_mem());
cstate->wbio = BIO_new(BIO_s_mem());
cstate->ssl = SSL_new(state->ssl_context);
cstate->queue = newContextQueue(getContextBufferPool(c));
cstate->queue = newContextQueue();
SSL_set_connect_state(cstate->ssl); /* sets ssl to work in client mode. */
SSL_set_bio(cstate->ssl, cstate->rbio, cstate->wbio);
SSL_set_tlsext_host_name(cstate->ssl, state->sni);
Expand Down
4 changes: 2 additions & 2 deletions tunnels/client/reality/reality_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ static void upStream(tunnel_t *self, context_t *c)
while (bufLen(buf) > 0 && isAlive(c->line))
{
const uint16_t remain = (uint16_t) min(bufLen(buf), chunk_size);
shift_buffer_t *chunk = shallowSliceBuffer(buf, remain);
shift_buffer_t *chunk = shallowSliceBuffer(c->line->tid,buf, remain);
chunk =
genericEncrypt(chunk, cstate->encryption_context, state->context_password, getContextBufferPool(c));
signMessage(chunk, cstate->msg_digest, cstate->sign_context, cstate->sign_key);
Expand All @@ -159,7 +159,7 @@ static void upStream(tunnel_t *self, context_t *c)
cstate->rbio = BIO_new(BIO_s_mem());
cstate->wbio = BIO_new(BIO_s_mem());
cstate->ssl = SSL_new(state->ssl_context);
cstate->queue = newContextQueue(getContextBufferPool(c));
cstate->queue = newContextQueue();
cstate->encryption_context = EVP_CIPHER_CTX_new();
cstate->decryption_context = EVP_CIPHER_CTX_new();
cstate->sign_context = EVP_MD_CTX_create();
Expand Down
6 changes: 4 additions & 2 deletions tunnels/client/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

enum
{
kHandShakeByte = 0xFF,
kHandShakeLength = 96,
kPreconnectDelayShort = 10,
kPreconnectDelayLong = 750,
kConnectionStarvationTimeOut = 45000
Expand Down Expand Up @@ -73,8 +75,8 @@ static void doConnect(struct connect_arg *cg)
}
hello_data_ctx->first = true;
hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx));
setLen(hello_data_ctx->payload, 96);
memset(rawBufMut(hello_data_ctx->payload), 0xFF, 96);
setLen(hello_data_ctx->payload, kHandShakeLength);
memset(rawBufMut(hello_data_ctx->payload), kHandShakeByte, kHandShakeLength);
self->up->upStream(self->up, hello_data_ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion tunnels/client/wolfssl/wolfssl_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ static void upStream(tunnel_t *self, context_t *c)
cstate->rbio = BIO_new(BIO_s_mem());
cstate->wbio = BIO_new(BIO_s_mem());
cstate->ssl = SSL_new(state->ssl_context);
cstate->queue = newContextQueue(getContextBufferPool(c));
cstate->queue = newContextQueue();
SSL_set_connect_state(cstate->ssl); /* sets ssl to work in client mode. */
SSL_set_bio(cstate->ssl, cstate->rbio, cstate->wbio);
SSL_set_tlsext_host_name(cstate->ssl, state->sni);
Expand Down
20 changes: 0 additions & 20 deletions tunnels/server/halfduplex/halfduplex_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ enum connection_status
{
kCsUnkown,
kCsUploadInTable,
// kCsUploadPipedIndirect,
kCsUploadDirect,
// kCsUploadPipedDirect,
kCsDownloadInTable,
kCsDownloadDirect
};
Expand Down Expand Up @@ -159,11 +157,6 @@ static void notifyDownloadLineIsReadyForBind(hash_t hash, tunnel_t *self, uint8_

pipeTo(self, upload_line_cstate->upload_line, tid_download_line);

// upload_line_cstate->state = kCsUploadPipedIndirect;

// newPipeLine(&upload_line_cstate->pipe, self, tid_upload_line, upload_line_cstate->upload_line,
// tid_download_line, localUpStream, localDownStream);

if (upload_line_cstate->buffering)
{
context_t *bctx = newContext(upload_line_cstate->upload_line);
Expand Down Expand Up @@ -323,18 +316,6 @@ static void upStream(tunnel_t *self, context_t *c)
pipeUpStream(c);
return; // piped to another worker which has waiting connections

// cstate->state = kCsUploadPipedIndirect;
// setupLineUpSide(c->line, onUploadInDirectLinePaused, cstate, onUploadInDirectLineResumed);

// newPipeLine(&cstate->pipe, self, c->line->tid, c->line, tid_download_line, localUpStream,
// localDownStream);

// if (! pipeUpStream(cstate->pipe, c))
// {
// reuseContextBuffer(c);
// destroyContext(c);
// }
// return;
}
}
else
Expand Down Expand Up @@ -696,7 +677,6 @@ static void upStream(tunnel_t *self, context_t *c)

static void downStream(tunnel_t *self, context_t *c)
{

switchLine(c, ((halfduplex_server_con_state_t *) (c->line->dw_state))->download_line);
halfduplex_server_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
Expand Down
10 changes: 3 additions & 7 deletions tunnels/server/openssl/openssl_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ static size_t paddingDecisionCb(SSL *ssl, int type, size_t len, void *arg)
(void) type;
oss_server_con_state_t *cstate = arg;

if (cstate->reply_sent_tit == 1 && len <= 4096)
{
return (size_t) 4096 - len;
}

if (cstate->reply_sent_tit < 32)
{
Expand Down Expand Up @@ -192,7 +188,7 @@ static void upStream(tunnel_t *self, context_t *c)

if (state->fallback != NULL && ! cstate->handshake_completed)
{
bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->payload));
bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->line->tid,c->payload));
}
if (cstate->fallback_mode)
{
Expand Down Expand Up @@ -491,10 +487,10 @@ static void downStream(tunnel_t *self, context_t *c)
// testing how the filtering behaves if we force protocol client to recevie at least
// 2 full chunks before sending anymore data
int consume = len;
if ((cstate->reply_sent_tit == 1 && len > 2))
if ((cstate->reply_sent_tit == 1 && len > 64))
{
cstate->reply_sent_tit++;
consume = (len / 2);
consume = len - 64;
}

int n = SSL_write(cstate->ssl, rawBuf(c->payload), consume);
Expand Down
4 changes: 2 additions & 2 deletions tunnels/server/reality/reality_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static void upStream(tunnel_t *self, context_t *c)

uint8_t tls_header[1 + 2 + 2];

bufferStreamPush(cstate->read_stream, newShallowShiftBuffer(buf));
bufferStreamPush(cstate->read_stream, newShallowShiftBuffer(c->line->tid,buf));
while (isAlive(c->line) && bufferStreamLen(cstate->read_stream) >= kTLSHeaderlen)
{
bufferStreamViewBytesAt(cstate->read_stream, 0, tls_header, kTLSHeaderlen);
Expand Down Expand Up @@ -280,7 +280,7 @@ static void downStream(tunnel_t *self, context_t *c)
while (bufLen(buf) > 0 && isAlive(c->line))
{
const uint16_t remain = (uint16_t) min(bufLen(buf), chunk_size);
shift_buffer_t *chunk = shallowSliceBuffer(buf, remain);
shift_buffer_t *chunk = shallowSliceBuffer(c->line->tid,buf, remain);
chunk = genericEncrypt(chunk, cstate->encryption_context, state->context_password,
getContextBufferPool(c));
signMessage(chunk, cstate->msg_digest, cstate->sign_context, cstate->sign_key);
Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ static reverse_server_con_state_t *createCstateU(line_t *line)
reverse_server_con_state_t *cstate = malloc(sizeof(reverse_server_con_state_t));
memset(cstate, 0, sizeof(reverse_server_con_state_t));
cstate->u = line;
cstate->uqueue = newContextQueue(getLineBufferPool(line));
cstate->uqueue = newContextQueue();
setupLineUpSide(line, onLinePausedU, cstate, onLineResumedU);
return cstate;
}
Expand Down
11 changes: 6 additions & 5 deletions tunnels/server/reverse/reverse_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

enum
{
kHandShakeByte = 0xFF
kHandShakeByte = 0xFF,
kHandShakeLength = 96
};

#define VAL_1X kHandShakeByte
Expand Down Expand Up @@ -68,13 +69,13 @@ static void upStream(tunnel_t *self, context_t *c)
}
else
{
if (bufferStreamLen(dcstate->wait_stream) >= 96)
if (bufferStreamLen(dcstate->wait_stream) >= kHandShakeLength)
{
shift_buffer_t *data = bufferStreamRead(dcstate->wait_stream, 96);
shift_buffer_t *data = bufferStreamRead(dcstate->wait_stream, kHandShakeLength);

static const uint8_t kHandshakeExpecetd[96] = {VAL_64X, VAL_32X};
static const uint8_t kHandshakeExpecetd[kHandShakeLength] = {VAL_64X, VAL_32X};

dcstate->handshaked = 0 == memcmp(kHandshakeExpecetd, rawBuf(data), 96);
dcstate->handshaked = 0 == memcmp(kHandshakeExpecetd, rawBuf(data), kHandShakeLength);

thread_box_t *this_tb = &(state->threadlocal_pool[c->line->tid]);

Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/wolfssl/wolfssl_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static void upStream(tunnel_t *self, context_t *c)

if (! cstate->handshake_completed)
{
bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->payload));
bufferStreamPush(cstate->fallback_buf, newShallowShiftBuffer(c->line->tid,c->payload));
}
if (cstate->fallback_mode)
{
Expand Down
24 changes: 13 additions & 11 deletions ww/buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
struct buffer_pool_s
{
unsigned int len;
unsigned int cap;
unsigned int free_threshould;
uint16_t cap;
uint16_t free_threshould;
uint8_t tid;
unsigned int buffers_size;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
atomic_size_t in_use;
Expand All @@ -52,7 +53,7 @@ static void firstCharge(buffer_pool_t *pool)
{
for (size_t i = 0; i < (pool->cap / 2); i++)
{
pool->available[i] = newShiftBuffer(pool->buffers_size);
pool->available[i] = newShiftBuffer(pool->tid,pool->buffers_size);
}
pool->len = pool->cap / 2;
}
Expand All @@ -63,7 +64,7 @@ static void reCharge(buffer_pool_t *pool)

for (size_t i = pool->len; i < (pool->len + increase); i++)
{
pool->available[i] = newShiftBuffer(pool->buffers_size);
pool->available[i] = newShiftBuffer(pool->tid,pool->buffers_size);
}
pool->len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand All @@ -77,7 +78,7 @@ static void giveMemBackToOs(buffer_pool_t *pool)

for (size_t i = pool->len - decrease; i < pool->len; i++)
{
destroyShiftBuffer(pool->available[i]);
destroyShiftBuffer(pool->tid,pool->available[i]);
}
pool->len -= decrease;

Expand Down Expand Up @@ -116,7 +117,7 @@ void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b)

if (isShallow(b))
{
destroyShiftBuffer(b);
destroyShiftBuffer(pool->tid,b);
return;
}
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand Down Expand Up @@ -146,7 +147,7 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict
return b2;
}

static buffer_pool_t *allocBufferPool(unsigned long bufcount, unsigned int buffer_size) // NOLINT
static buffer_pool_t *allocBufferPool(uint8_t tid,unsigned long bufcount, unsigned int buffer_size) // NOLINT
{
// stop using pool if you want less, simply uncomment lines in popbuffer and reuseBuffer
assert(bufcount >= 1);
Expand All @@ -163,16 +164,17 @@ static buffer_pool_t *allocBufferPool(unsigned long bufcount, unsigned int buffe
pool->cap = bufcount;
pool->buffers_size = buffer_size;
pool->free_threshould = max(pool->cap / 2, (pool->cap * 2) / 3);
pool->tid = tid;
firstCharge(pool);
return pool;
}

buffer_pool_t *createBufferPool(void)
buffer_pool_t *createBufferPool(uint8_t tid)
{
return allocBufferPool(BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE);
return allocBufferPool(tid,BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE);
}

buffer_pool_t *createSmallBufferPool(void)
buffer_pool_t *createSmallBufferPool(uint8_t tid)
{
return allocBufferPool(BUFFERPOOL_SMALL_CONTAINER_LEN, BUFFER_SIZE_SMALL);
return allocBufferPool(tid,BUFFERPOOL_SMALL_CONTAINER_LEN, BUFFER_SIZE_SMALL);
}
4 changes: 2 additions & 2 deletions ww/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
struct buffer_pool_s;
typedef struct buffer_pool_s buffer_pool_t;

buffer_pool_t *createSmallBufferPool(void);
buffer_pool_t *createBufferPool(void);
buffer_pool_t *createSmallBufferPool(uint8_t tid);
buffer_pool_t *createBufferPool(uint8_t tid);
shift_buffer_t *popBuffer(buffer_pool_t *pool);
void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b);
shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2);
Expand Down
6 changes: 2 additions & 4 deletions ww/context_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ enum
struct context_queue_s
{
queue q;
buffer_pool_t *pool;
};

context_queue_t *newContextQueue(buffer_pool_t *pool)
context_queue_t *newContextQueue(void)
{
context_queue_t *cb = malloc(sizeof(context_queue_t));
cb->q = queue_with_capacity(kQCap);
cb->pool = pool;
return cb;
}
void destroyContextQueue(context_queue_t *self)
Expand All @@ -32,7 +30,7 @@ void destroyContextQueue(context_queue_t *self)
{
if ((*i.ref)->payload != NULL)
{
reuseBuffer(self->pool, (*i.ref)->payload);
reuseBuffer(getContextBufferPool((*i.ref)), (*i.ref)->payload);
CONTEXT_PAYLOAD_DROP((*i.ref));
}
destroyContext((*i.ref));
Expand Down
3 changes: 1 addition & 2 deletions ww/context_queue.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "buffer_pool.h"
#include "tunnel.h"
#include <stddef.h>

Expand All @@ -11,7 +10,7 @@

typedef struct context_queue_s context_queue_t;

context_queue_t *newContextQueue(buffer_pool_t *pool);
context_queue_t *newContextQueue(void);
void destroyContextQueue(context_queue_t *self);
void contextQueuePush(context_queue_t *self, context_t *context);
context_t *contextQueuePop(context_queue_t *self);
Expand Down
Loading

0 comments on commit b4b82f6

Please sign in to comment.