Skip to content

Commit

Permalink
rework reverse
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed May 13, 2024
1 parent fa6a6b5 commit 4070700
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 99 deletions.
18 changes: 12 additions & 6 deletions tunnels/client/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ static reverse_client_con_state_t *createCstate(uint8_t tid)
return cstate;
}

static void destroyCstate(reverse_client_con_state_t *cstate)
static void cleanup(reverse_client_con_state_t *cstate)
{
destroyLine(cstate->u);
destroyLine(cstate->d);
if (cstate->u)
{
destroyLine(cstate->u);
}
if (cstate->d)
{
destroyLine(cstate->d);
}
free(cstate);
}
static void doConnect(struct connect_arg *cg)
{
tunnel_t * self = cg->t;
reverse_client_state_t * state = STATE(self);
tunnel_t *self = cg->t;
reverse_client_state_t *state = STATE(self);
reverse_client_con_state_t *cstate = createCstate(cg->tid);
free(cg);
(cstate->u->chains_state)[state->chain_index_pi] = cstate;
Expand All @@ -48,7 +54,7 @@ static void connectTimerFinished(htimer_t *timer)
static void beforeConnect(hevent_t *ev)
{
struct connect_arg *cg = hevent_userdata(ev);
htimer_t * connect_timer = htimer_add(loops[cg->tid], connectTimerFinished, cg->delay, 1);
htimer_t *connect_timer = htimer_add(loops[cg->tid], connectTimerFinished, cg->delay, 1);
if (connect_timer)
{
hevent_set_userdata(connect_timer, cg);
Expand Down
70 changes: 36 additions & 34 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "reverse_client.h"
#include "buffer_pool.h"
#include "helpers.h"
#include "loggers/network_logger.h"
#include "shiftbuffer.h"
#include "tunnel.h"
#include "types.h"
#include "utils/jsonutils.h"

Expand Down Expand Up @@ -28,7 +31,7 @@ static void upStream(tunnel_t *self, context_t *c)
CSTATE_D_MUT(c) = NULL;
(dcstate->u->chains_state)[state->chain_index_pi] = NULL;
context_t *fc = switchLine(c, dcstate->u);
destroyCstate(dcstate);
cleanup(dcstate);
const unsigned int old_reverse_cons =
atomic_fetch_add_explicit(&(state->reverse_cons), -1, memory_order_relaxed);
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", fc->line->tid, state->unused_cons[tid],
Expand Down Expand Up @@ -59,6 +62,14 @@ static void downStream(tunnel_t *self, context_t *c)
{
if (! ucstate->first_sent_d)
{
ucstate->pair_connected = true;
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
}
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);
initiateConnect(self, tid, false);

ucstate->first_sent_d = true;
context_t *turned = switchLine(c, ucstate->d);
turned->first = true;
Expand All @@ -71,40 +82,28 @@ static void downStream(tunnel_t *self, context_t *c)
}
else
{
ucstate->pair_connected = true;
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
}
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);
self->dw->downStream(self->dw, newInitContext(ucstate->d));

if (CSTATE_U(c) == NULL)
{
reuseBuffer(getContextBufferPool(c), c->payload);
c->payload = NULL;
destroyContext(c);
return;
}

// first byte is 0xFF a signal from reverse server
uint8_t check = 0x0;
readUI8(c->payload, &check);
assert(check == (unsigned char) 0xFF);
shiftr(c->payload, 1);
if (bufLen(c->payload) <= 0)
if (check != (unsigned char) 0xFF)
{
initiateConnect(self, tid, false);
reuseBuffer(getContextBufferPool(c), c->payload);
c->payload = NULL;
reuseContextBuffer(c);
cleanup(ucstate);
self->up->upStream(self->up, newFinContextFrom(c));
destroyContext(c);
return;
}
shiftr(c->payload, 1);
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

ucstate->first_sent_d = true;
c->first = true;
self->dw->downStream(self->dw, switchLine(c, ucstate->d));
initiateConnect(self, tid, false);
self->dw->downStream(self->dw, newInitContext(ucstate->d));

reuseContextBuffer(c);
destroyContext(c);
return;
}
}
else
Expand All @@ -123,12 +122,12 @@ static void downStream(tunnel_t *self, context_t *c)
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
old_reverse_cons - 1);
context_t *fc = switchLine(c, ucstate->d);
destroyCstate(ucstate);
cleanup(ucstate);
self->dw->downStream(self->dw, fc);
}
else
{
destroyCstate(ucstate);
cleanup(ucstate);
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
Expand All @@ -143,11 +142,14 @@ static void downStream(tunnel_t *self, context_t *c)
else if (c->est)
{
CSTATE_U(c)->established = true;
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

context_t *hello_data_ctx = newContextFrom(c);
hello_data_ctx->payload = popBuffer(getContextBufferPool(c));
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->up->upStream(self->up, hello_data_ctx);

destroyContext(c);
initiateConnect(self, tid, false);
}
else
{
Expand Down Expand Up @@ -185,8 +187,8 @@ tunnel_t *newReverseClient(node_instance_context_t *instance_info)

// int total = max(16, state->cons_forward);
// int total = max(1, state->cons_forward);
state->min_unused_cons = min(max(workers_count * 4, state->min_unused_cons), 128);
state->connection_per_thread = min(4, state->min_unused_cons / workers_count);
state->min_unused_cons = 1;
state->connection_per_thread = 1;

// we are always the first line creator so its easy to get the positon independent index here
line_t *l = newLine(0);
Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static reverse_server_con_state_t *createCstate(bool isup, line_t *line)
return cstate;
}

static void destroyCstate(reverse_server_con_state_t *cstate)
static void cleanup(reverse_server_con_state_t *cstate)
{

if (cstate->uqueue)
Expand Down
107 changes: 50 additions & 57 deletions tunnels/server/reverse/reverse_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,26 @@
#include "loggers/network_logger.h"
#include "managers/node_manager.h"
#include "types.h"
#include "ww.h"

static void flushWriteQueue(tunnel_t *self, reverse_server_con_state_t *cstate)
{

if (contextQueueLen(cstate->uqueue) > 0)
{
reverse_server_state_t *state = STATE(self);
line_t * down_line = cstate->d;
line_t *down_line = cstate->d;
while (contextQueueLen(cstate->uqueue) > 0)
{

if (! cstate->signal_sent)
{
cstate->signal_sent = true;
context_t *c = switchLine(contextQueuePop(cstate->uqueue), cstate->d);
shiftl(c->payload, 1);
writeUI8(c->payload, 0xFF);
self->dw->downStream(self->dw, c);
}
else
{
self->dw->downStream(self->dw, switchLine(contextQueuePop(cstate->uqueue), cstate->d));
}
self->dw->downStream(self->dw, switchLine(contextQueuePop(cstate->uqueue), cstate->d));

if (! isAlive(down_line))
{
return;
}
}
}
else
{
cstate->signal_sent = true;
shift_buffer_t *buf = popBuffer(getLineBufferPool(cstate->d));
shiftl(buf, 1);
writeUI8(buf, 0xFF);
context_t *c = newContext(cstate->d);
c->payload = buf;
self->dw->downStream(self->dw, c);
}
}

static void upStream(tunnel_t *self, context_t *c)
Expand All @@ -56,22 +37,29 @@ static void upStream(tunnel_t *self, context_t *c)
}
else
{
// a real pair will not send that before it receives something
reuseBuffer(getContextBufferPool(c), c->payload);
c->payload = NULL;
destroyContext(c);
}
}
else
{
const uint8_t tid = c->line->tid;
thread_box_t * this_tb = &(state->threadlocal_pool[tid]);
if (c->init)
{
if (state->chain_index_d == 0)
thread_box_t *this_tb = &(state->threadlocal_pool[c->line->tid]);
reverse_server_con_state_t *dcstate = CSTATE_D(c);

// first byte is 0xFF a signal from reverse server
uint8_t check = 0x0;
readUI8(c->payload, &check);
if (dcstate->handshaked || check != (unsigned char) 0xFF)
{
state->chain_index_d = reserveChainStateIndex(c->line);
reuseContextBuffer(c);
cleanup(dcstate);
self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
return;
}
shiftr(c->payload, 1);
dcstate->handshaked = true;
self->dw->downStream(self->dw, newEstContext(c->line));

context_t *hello_data_ctx = newContextFrom(c);
hello_data_ctx->payload = popBuffer(getContextBufferPool(c));
setLen(hello_data_ctx->payload, 1);
writeUI8(hello_data_ctx->payload, 0xFF);
self->dw->downStream(self->dw, hello_data_ctx);

if (this_tb->u_count > 0)
{
Expand All @@ -81,15 +69,28 @@ static void upStream(tunnel_t *self, context_t *c)
ucstate->paired = true;
CSTATE_D_MUT(c) = ucstate;
self->up->upStream(self->up, newEstContext(ucstate->u));
self->dw->downStream(self->dw, newEstContext(c->line));

flushWriteQueue(self, ucstate);
}
else
{
reverse_server_con_state_t *dcstate = createCstate(false, c->line);
CSTATE_D_MUT(c) = dcstate;
addConnectionD(this_tb, dcstate);
}
}
}
else
{
const uint8_t tid = c->line->tid;
thread_box_t *this_tb = &(state->threadlocal_pool[tid]);
if (c->init)
{
if (WW_UNLIKELY(state->chain_index_d == 0))
{
state->chain_index_d = reserveChainStateIndex(c->line);
}
reverse_server_con_state_t *dcstate = createCstate(false, c->line);
CSTATE_D_MUT(c) = dcstate;

destroyContext(c);
}
else if (c->fin)
Expand All @@ -101,13 +102,13 @@ static void upStream(tunnel_t *self, context_t *c)
{
line_t *u_line = dcstate->u;
(dcstate->u->chains_state)[state->chain_index_u] = NULL;
destroyCstate(dcstate);
cleanup(dcstate);
self->up->upStream(self->up, switchLine(c, u_line));
}
else
{
removeConnectionD(this_tb, dcstate);
destroyCstate(dcstate);
cleanup(dcstate);
destroyContext(c);
}
}
Expand All @@ -131,7 +132,7 @@ static void downStream(tunnel_t *self, context_t *c)
else
{
const uint8_t tid = c->line->tid;
thread_box_t * this_tb = &(state->threadlocal_pool[tid]);
thread_box_t *this_tb = &(state->threadlocal_pool[tid]);
if (c->init)
{
if (state->chain_index_u == 0)
Expand Down Expand Up @@ -160,14 +161,6 @@ static void downStream(tunnel_t *self, context_t *c)
destroyContext(c);
return;
}

dcstate->signal_sent = true;
shift_buffer_t *buf = popBuffer(getLineBufferPool(dcstate->d));
shiftl(buf, 1);
writeUI8(buf, 0xFF);
context_t *c = newContext(dcstate->d);
c->payload = buf;
self->dw->downStream(self->dw, c);
}
else
{
Expand All @@ -188,13 +181,13 @@ static void downStream(tunnel_t *self, context_t *c)
line_t *d_line = ucstate->d;
(ucstate->d->chains_state)[state->chain_index_d] = NULL;

destroyCstate(ucstate);
cleanup(ucstate);
self->dw->downStream(self->dw, switchLine(c, d_line));
}
else
{
removeConnectionU(this_tb, ucstate);
destroyCstate(ucstate);
cleanup(ucstate);
destroyContext(c);
}
}
Expand All @@ -207,10 +200,10 @@ tunnel_t *newReverseServer(node_instance_context_t *instance_info)
reverse_server_state_t *state = malloc(sizeof(reverse_server_state_t) + (workers_count * sizeof(thread_box_t)));
memset(state, 0, sizeof(reverse_server_state_t) + (workers_count * sizeof(thread_box_t)));

tunnel_t *t = newTunnel();
t->state = state;
t->upStream = &upStream;
t->downStream = &downStream;
tunnel_t *t = newTunnel();
t->state = state;
t->upStream = &upStream;
t->downStream = &downStream;
atomic_thread_fence(memory_order_release);

return t;
Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/reverse/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ typedef struct reverse_server_con_state_s
{
struct reverse_server_con_state_s *prev, *next;
bool paired;
bool signal_sent;
bool handshaked;
context_queue_t * uqueue;
line_t * u;
line_t * d;
Expand Down

0 comments on commit 4070700

Please sign in to comment.