Skip to content

Commit

Permalink
change some http2 stuff, sizes, etc...
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 15, 2024
1 parent a840e27 commit 08d6fed
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 98 deletions.
5 changes: 4 additions & 1 deletion tunnels/client/halfduplex/halfduplex_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ typedef struct halfduplex_con_state_s

static void onMainLinePaused(void *cstate)
{
pauseLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line);
// pauseLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line);
pauseLineUpSide(((halfduplex_con_state_t *) cstate)->download_line);
}

Expand All @@ -36,6 +36,9 @@ static void onUDLinePaused(void *cstate)
static void onUDLineResumed(void *cstate)
{
resumeLineDownSide(((halfduplex_con_state_t *) cstate)->main_line);
resumeLineUpSide(((halfduplex_con_state_t *) cstate)->download_line);
resumeLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line);

}

static void upStream(tunnel_t *self, context_t *c)
Expand Down
2 changes: 1 addition & 1 deletion tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ static http2_client_con_state_t *takeHttp2Connection(tunnel_t *self, int tid)
}
return con;
}

// assert(false);
con = createHttp2Connection(self, tid);
vec_cons_push(vector, con);
return con;
Expand Down
71 changes: 41 additions & 30 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,48 +420,59 @@ static void upStream(tunnel_t *self, context_t *c)

static void downStream(tunnel_t *self, context_t *c)
{
http2_client_state_t *state = STATE(self);
http2_client_con_state_t *con = CSTATE(c);
http2_client_con_state_t *con = CSTATE(c);
if (c->payload != NULL)
{

con->state = kH2WantRecv;
size_t len = bufLen(c->payload);
ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), len);
assert(ret == (ssize_t) len);
reuseContextBuffer(c);

if (! isAlive(c->line))
size_t len = 0;
while ((len = bufLen(c->payload)) > 0)
{
destroyContext(c);
return;
}
size_t consumed = min(1 << 15UL, (ssize_t) len);
con->state = kH2WantRecv;
ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), consumed);
shiftr(c->payload, consumed);

if (ret != (ssize_t) len)
{
deleteHttp2Connection(con);
self->dw->downStream(self->dw, newFinContext(c->line));
destroyContext(c);
return;
}

while (trySendRequest(self, con, 0, NULL))
{
if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
}

if (con->root.next == NULL && con->childs_added >= state->concurrency && isAlive(c->line))
{
context_t *con_fc = newFinContext(con->line);
tunnel_t *con_dest = con->tunnel->up;
deleteHttp2Connection(con);
con_dest->upStream(con_dest, con_fc);
if (ret != (ssize_t) consumed)
{
assert(false);
deleteHttp2Connection(con);
self->up->upStream(self->up, newFinContext(c->line));
destroyContext(c);
return;
}

if (nghttp2_session_want_write(con->session) != 0)
{

while (trySendRequest(self, con, 0, NULL))
{
if (! isAlive(c->line))
{
reuseContextBuffer(c);
destroyContext(c);
return;
}
}
}
if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0)
{
assert(false);
context_t *fin_ctx = newFinContext(con->line);
deleteHttp2Connection(con);
self->up->upStream(self->up, fin_ctx);
reuseContextBuffer(c);
destroyContext(c);
return;
}
}

reuseContextBuffer(c);
destroyContext(c);
}
else
Expand Down
4 changes: 2 additions & 2 deletions tunnels/client/reality/reality_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static void upStream(tunnel_t *self, context_t *c)
shift_buffer_t *buf = c->payload;
c->payload = NULL;

const unsigned int chunk_size = ((1 << 15) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen));
const unsigned int chunk_size = ((1 << 16) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen));

if (bufLen(buf) < chunk_size)
{
Expand Down Expand Up @@ -257,7 +257,7 @@ static void downStream(tunnel_t *self, context_t *c)
uint16_t tls_ver_b;
memcpy(&tls_ver_b, ((uint8_t *) rawBuf(buf)) + 1, sizeof(uint16_t));
bool is_tls_33 = tls_ver_b == kTLSVersion12;

shiftr(buf, kTLSHeaderlen);

if (! verifyMessage(buf, cstate->msg_digest, cstate->sign_context, cstate->sign_key) ||
Expand Down
20 changes: 11 additions & 9 deletions tunnels/client/reverse/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#define CSTATE_U_MUT(x) ((x)->line->chains_state)[self->chain_index]
enum
{
kPreconnectDelayShort = 10,
kPreconnectDelayLong = 750,
kConnectionStarvationTimeOut = 30000
kPreconnectDelayShort = 10,
kPreconnectDelayLong = 750,
kConnectionStarvationTimeOut = 30000
};

static void onLinePausedU(void *cstate)
Expand Down Expand Up @@ -81,7 +81,7 @@ static void doConnect(struct connect_arg *cg)
hello_data_ctx->first = true;
hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx));
setLen(hello_data_ctx->payload, 96);
memset(rawBufMut(hello_data_ctx->payload), 0xFF,96);
memset(rawBufMut(hello_data_ctx->payload), 0xFF, 96);
self->up->upStream(self->up, hello_data_ctx);
}

Expand All @@ -108,12 +108,14 @@ static void initiateConnect(tunnel_t *self, uint8_t tid, bool delay)
{
reverse_client_state_t *state = STATE(self);

if (state->unused_cons[tid] >= state->min_unused_cons)
if (state->threadlocal_pool[tid].unused_cons_count + state->threadlocal_pool[tid].connecting_cons_count >=
state->min_unused_cons)
{
return;
}
// bool more_delay = state->unused_cons[tid] <= 0;
// state->unused_cons[tid] += 1;
state->threadlocal_pool[tid].connecting_cons_count += 1;
// bool more_delay = state->threadlocal_pool[tid].unused_cons_count <= 0;
// state->threadlocal_pool[tid].unused_cons_count += 1;

// int tid = 0;
// if (workers_count > 0)
Expand Down Expand Up @@ -154,9 +156,9 @@ static void onStarvedConnectionExpire(idle_item_t *idle_con)
}

assert(! cstate->pair_connected);
if (state->unused_cons[cstate->u->tid] > 0)
if (state->threadlocal_pool[cstate->u->tid].unused_cons_count > 0)
{
state->unused_cons[cstate->u->tid] -= 1;
state->threadlocal_pool[cstate->u->tid].unused_cons_count -= 1;
}
LOGW("ReverseClient: a idle connection detected and closed");

Expand Down
43 changes: 25 additions & 18 deletions tunnels/client/reverse/reverse_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ static void upStream(tunnel_t *self, context_t *c)
context_t *fc = switchLine(c, dcstate->u);
cleanup(dcstate);
state->reverse_cons -= 1;
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", fc->line->tid, state->unused_cons[tid],
state->reverse_cons);
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", fc->line->tid,
state->threadlocal_pool[tid].unused_cons_count, state->reverse_cons);
self->up->upStream(self->up, fc);
}
else if (c->est)
Expand Down Expand Up @@ -59,10 +59,7 @@ static void downStream(tunnel_t *self, context_t *c)
}
else
{
if (state->unused_cons[tid] > 0)
{
state->unused_cons[tid] -= 1;
}
state->threadlocal_pool[tid].unused_cons_count -= 1;
initiateConnect(self, tid, false);
atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed);

Expand Down Expand Up @@ -101,36 +98,46 @@ static void downStream(tunnel_t *self, context_t *c)
if (ucstate->pair_connected)
{
state->reverse_cons -= 1;
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
state->reverse_cons);
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid,
state->threadlocal_pool[tid].unused_cons_count, state->reverse_cons);
context_t *fc = switchLine(c, ucstate->d);
cleanup(ucstate);
self->dw->downStream(self->dw, fc);
}
else
{
if (state->unused_cons[tid] > 0)
if (ucstate->established)
{
state->unused_cons[tid] -= 1;
state->threadlocal_pool[tid].unused_cons_count -= 1;
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid,
state->threadlocal_pool[tid].unused_cons_count,
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));
initiateConnect(self, tid, false);
}
LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));
else
{
state->threadlocal_pool[tid].connecting_cons_count -= 1;
initiateConnect(self, tid, true);
}

cleanup(ucstate);
initiateConnect(self, tid, true);
destroyContext(c);
}
}
else if (c->est)
{
ucstate->established = true;
state->unused_cons[tid] += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid],
state->threadlocal_pool[tid].connecting_cons_count -= 1;
state->threadlocal_pool[tid].unused_cons_count += 1;
LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid,
state->threadlocal_pool[tid].unused_cons_count,
atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed));

initiateConnect(self, tid, false);

// ucstate->idle_handle = newIdleItem(state->starved_connections, (hash_t) (ucstate), ucstate,
// onStarvedConnectionExpire, c->line->tid, kConnectionStarvationTimeOut);
// onStarvedConnectionExpire, c->line->tid,
// kConnectionStarvationTimeOut);

destroyContext(c);
}
Expand All @@ -157,8 +164,8 @@ tunnel_t *newReverseClient(node_instance_context_t *instance_info)

const size_t start_delay_ms = 150;

reverse_client_state_t *state = malloc(sizeof(reverse_client_state_t) + (sizeof(atomic_uint) * workers_count));
memset(state, 0, sizeof(reverse_client_state_t) + (sizeof(atomic_uint) * workers_count));
reverse_client_state_t *state = malloc(sizeof(reverse_client_state_t) + (sizeof(thread_box_t) * workers_count));
memset(state, 0, sizeof(reverse_client_state_t) + (sizeof(thread_box_t) * workers_count));
const cJSON *settings = instance_info->node_settings_json;

getIntFromJsonObject((int *) &(state->min_unused_cons), settings, "minimum-unused");
Expand Down
9 changes: 8 additions & 1 deletion tunnels/client/reverse/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ typedef struct reverse_client_con_state_s

} reverse_client_con_state_t;

typedef struct thread_box_s
{
uint32_t unused_cons_count;
uint32_t connecting_cons_count;

} thread_box_t;

typedef struct reverse_client_state_s
{
idle_table_t *starved_connections;
Expand All @@ -29,6 +36,6 @@ typedef struct reverse_client_state_s
uint8_t chain_index_d;
unsigned int min_unused_cons;

unsigned int unused_cons[];
thread_box_t threadlocal_pool[];

} reverse_client_state_t;
14 changes: 3 additions & 11 deletions tunnels/server/halfduplex/halfduplex_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ static void onMainLinePaused(void *_cstate)
{
pauseLineDownSide(cstate->upload_line);
}
pauseLineDownSide(cstate->download_line);
// pauseLineDownSide(cstate->download_line);
}

static void onMainLineResumed(void *_cstate)
Expand All @@ -93,21 +93,13 @@ static void onMainLineResumed(void *_cstate)
static void onDownloadLinePaused(void *_cstate)
{
halfduplex_server_con_state_t *cstate = _cstate;
// todo (no if) not required
if (cstate->main_line)
{
pauseLineUpSide(cstate->main_line);
}
pauseLineUpSide(cstate->main_line);
}

static void onDownloadLineResumed(void *_cstate)
{
halfduplex_server_con_state_t *cstate = _cstate;
// todo (no if) not required
if (cstate->main_line)
{
resumeLineUpSide(cstate->main_line);
}
resumeLineUpSide(cstate->main_line);
}

static void onUploadInDirectLinePaused(void *_cstate)
Expand Down
Loading

0 comments on commit 08d6fed

Please sign in to comment.