Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 28, 2024
1 parent dadfe6c commit 77c818e
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 50 deletions.
2 changes: 1 addition & 1 deletion tunnels/adapters/connector/tcp/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static bool resumeWriteQueue(tcp_connector_con_state_t *cstate)
context_t *cw = contextQueuePop(data_queue);
int bytes = (int) bufLen(cw->payload);
int nwrite = hio_write(io, cw->payload);
cw->payload = NULL;
CONTEXT_PAYLOAD_DROP(cw);
destroyContext(cw);
if (nwrite >= 0 && nwrite < bytes)
{
Expand Down
3 changes: 1 addition & 2 deletions tunnels/adapters/connector/udp/udp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static void upStream(tunnel_t *self, context_t *c)
}

size_t nwrite = hio_write(cstate->io, c->payload);
c->payload = NULL;
CONTEXT_PAYLOAD_DROP(c);
(void) nwrite;
// assert(nwrite <= 0 || nwrite == bytes);
destroyContext(c);
Expand Down Expand Up @@ -230,7 +230,6 @@ tunnel_t *newUdpConnector(node_instance_context_t *instance_info)
t->upStream = &upStream;
t->downStream = &downStream;


return t;
}
api_result_t apiUdpConnector(tunnel_t *self, const char *msg)
Expand Down
2 changes: 1 addition & 1 deletion tunnels/adapters/listener/tcp/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static bool resumeWriteQueue(tcp_listener_con_state_t *cstate)
context_t *cw = contextQueuePop(data_queue);
int bytes = (int) bufLen(cw->payload);
int nwrite = hio_write(io, cw->payload);
cw->payload = NULL;
CONTEXT_PAYLOAD_DROP(cw);
destroyContext(cw);
if (nwrite >= 0 && nwrite < bytes)
{
Expand Down
17 changes: 4 additions & 13 deletions tunnels/server/halfduplex/halfduplex_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,8 @@ static void upStream(tunnel_t *self, context_t *c)
halfduplex_server_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
{
// todo (remove) its hard to say but i think it can be romeved and not required anymore
if (WW_UNLIKELY(cstate == NULL))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
assert(cstate != NULL);

shift_buffer_t *buf = c->payload;

switch (cstate->state)
Expand Down Expand Up @@ -529,12 +524,8 @@ static void upStream(tunnel_t *self, context_t *c)
}
else if (c->fin)
{
// todo (remove) its hard to say but i think it can be romeved and not required anymore
if (WW_UNLIKELY(cstate == NULL))
{
destroyContext(c);
return;
}
assert(cstate != NULL);

switch (cstate->state)
{

Expand Down
3 changes: 1 addition & 2 deletions tunnels/server/header/header_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static void upStream(tunnel_t *self, context_t *c)
if (bufLen(c->payload) < 2)
{
cstate->buf = c->payload;
c->payload = NULL;
CONTEXT_PAYLOAD_DROP(c);
destroyContext(c);
return;
}
Expand Down Expand Up @@ -147,7 +147,6 @@ tunnel_t *newHeaderServer(node_instance_context_t *instance_info)
t->state = state;
t->upStream = &upStream;
t->downStream = &downStream;


return t;
}
Expand Down
23 changes: 11 additions & 12 deletions tunnels/server/socks/5/socks5_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ typedef enum

typedef struct socks5_server_state_s
{
void*_;
void *_;
} socks5_server_state_t;

typedef struct socks5_server_con_state_s
Expand All @@ -91,10 +91,9 @@ static void cleanup(socks5_server_con_state_t *cstate, buffer_pool_t *reusepool)
reuseBuffer(reusepool, cstate->waitbuf);
}
}
static void encapsulateUdpPacket(context_t* c)
static void encapsulateUdpPacket(context_t *c)
{
shift_buffer_t* packet = c->payload;

shift_buffer_t *packet = c->payload;

uint16_t port = sockaddr_port(&(c->line->dest_ctx.address));
port = (port << 8) | (port >> 8);
Expand Down Expand Up @@ -127,7 +126,7 @@ static void encapsulateUdpPacket(context_t* c)
#define ATLEAST(x) \
do \
{ \
if ((int)bufLen(c->payload) < (x)) \
if ((int) bufLen(c->payload) < (x)) \
{ \
reuseContextBuffer(c); \
goto disconnect; \
Expand Down Expand Up @@ -288,7 +287,7 @@ static void upStream(tunnel_t *self, context_t *c)
if (bufLen(c->payload) < cstate->need)
{
cstate->waitbuf = c->payload;
c->payload = NULL;
CONTEXT_PAYLOAD_DROP(c);
destroyContext(c);
return;
}
Expand All @@ -299,7 +298,7 @@ static void upStream(tunnel_t *self, context_t *c)
{
case kSBegin:
cstate->state = kSAuthMethodsCount;
//fallthrough
// fallthrough
case kSAuthMethodsCount: {
assert(cstate->need == 2);
uint8_t version = 0;
Expand Down Expand Up @@ -521,9 +520,9 @@ static void upStream(tunnel_t *self, context_t *c)
else
{
reuseContextBuffer(c);
//todo (ip filter) socks5 standard says this should whitelist the caller ip
// socks5 outbound accepted, udp relay will connect
shift_buffer_t *respbuf = popBuffer(getContextBufferPool(c));
// todo (ip filter) socks5 standard says this should whitelist the caller ip
// socks5 outbound accepted, udp relay will connect
shift_buffer_t *respbuf = popBuffer(getContextBufferPool(c));
setLen(respbuf, 32);
uint8_t *resp = rawBufMut(respbuf);
memset(resp, 0, 32);
Expand Down Expand Up @@ -631,7 +630,7 @@ static void downStream(tunnel_t *self, context_t *c)
{
cstate->init_sent = false;
// socks5 outbound failed
shift_buffer_t *respbuf = popBuffer(getContextBufferPool(c));
shift_buffer_t *respbuf = popBuffer(getContextBufferPool(c));
setLen(respbuf, 32);
uint8_t *resp = rawBufMut(respbuf);
memset(resp, 0, 32);
Expand Down Expand Up @@ -728,7 +727,7 @@ tunnel_t *newSocks5Server(node_instance_context_t *instance_info)
t->state = state;
t->upStream = &upStream;
t->downStream = &downStream;

return t;
}
api_result_t apiSocks5Server(tunnel_t *self, const char *msg)
Expand Down
2 changes: 1 addition & 1 deletion ww/buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict
{
unsigned int b1_length = bufLen(b1);
unsigned int b2_length = bufLen(b2);
if (b1_length >= b2_length)
if (b1_length >= b2_length || lCap(b2) < b1_length)
{
concatBuffer(b1, b2);
reuseBuffer(pool, b2);
Expand Down
36 changes: 18 additions & 18 deletions ww/pipe_line.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,30 +46,32 @@ void destroyPipeLineMsgPoolHandle(struct generic_pool_s *pool, pool_item_t *item

static void lock(pipe_line_t *pl)
{
// int old_refc = atomic_fetch_add_explicit(&pl->refc, 1, memory_order_release);
int old_refc = atomic_fetch_add_explicit(&pl->refc, 1, memory_order_relaxed);
#ifndef RELEASE
if (0 >= old_refc)
{
// this should not happen, otherwise we must change memory order
// but i think its ok because threads synchronize around the mutex in eventloop
LOGF("PipeLine: thread-safety done incorrectly lock()");
exit(1);
}
#endif
}

static void unlock(pipe_line_t *pl)
{
// int old_refc = atomic_fetch_add_explicit(&pl->refc, -1, memory_order_acquire);
int old_refc = atomic_fetch_add_explicit(&pl->refc, -1, memory_order_relaxed);
if (old_refc == 1)
{
#ifndef RELEASE
if (! atomic_load_explicit(&(pl->closed), memory_order_relaxed))
{
// this should not happen, otherwise we must change memory order
// but i think its ok because threads synchronize around the mutex in eventloop
LOGF("PipeLine: thread-safety done incorrectly unlock()");
exit(1);
}
#endif
free((void *) pl->memptr); // NOLINT
}
}
Expand Down Expand Up @@ -143,7 +145,6 @@ static void finishLeftSide(pipe_line_t *pl, void *arg)
}
context_t *fctx = newFinContext(pl->left_line);
doneLineUpSide(pl->left_line);
// destroyLine(pl->left_line);
pl->left_line = NULL;
pl->local_down_stream(pl->self, fctx, pl);
unlock(pl);
Expand Down Expand Up @@ -258,7 +259,6 @@ bool pipeSendToUpStream(pipe_line_t *pl, context_t *c)
if (c->fin)
{
doneLineUpSide(pl->left_line);
// destroyLine(pl->left_line);
pl->left_line = NULL;

bool expected = false;
Expand Down Expand Up @@ -350,8 +350,8 @@ static void initLeft(pipe_line_t *pl, void *arg)
setupLineUpSide(pl->left_line, pipeOnDownLinePaused, pl, pipeOnDownLineResumed);
}

void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid,
PipeLineFlowRoutine local_up_stream, PipeLineFlowRoutine local_down_stream)
void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid, PipeLineFlowRoutine local_up_stream,
PipeLineFlowRoutine local_down_stream)

{
assert(sizeof(struct pipe_line_s) <= kCpuLineCacheSize);
Expand All @@ -372,21 +372,21 @@ void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid,
uintptr_t ptr = (uintptr_t) malloc(memsize);

MUSTALIGN2(ptr, kCpuLineCacheSize);

// align pointer to line cache boundary
pipe_line_t *pl = (pipe_line_t *) ALIGN2(ptr, kCpuLineCacheSize); // NOLINT

*pl = (pipe_line_t){.memptr = (void *) ptr,
.self = self,
.left_tid = left_line->tid,
.right_tid = dest_tid,
.left_line = left_line,
.right_line = NULL,
.closed = false,
.first_sent = false,
.refc = 1,
.local_up_stream = local_up_stream,
.local_down_stream = local_down_stream};
*pl = (pipe_line_t){.memptr = (void *) ptr,
.self = self,
.left_tid = left_line->tid,
.right_tid = dest_tid,
.left_line = left_line,
.right_line = NULL,
.closed = false,
.first_sent = false,
.refc = 1,
.local_up_stream = local_up_stream,
.local_down_stream = local_down_stream};

initLeft(pl, NULL);
sendMessage(pl, initRight, NULL, pl->left_tid, pl->right_tid);
Expand Down

0 comments on commit 77c818e

Please sign in to comment.