From 08d6fedef509cebf205b84d8a0eb133bd1025e0f Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Sat, 15 Jun 2024 15:35:45 +0000 Subject: [PATCH] change some http2 stuff, sizes, etc... --- tunnels/client/halfduplex/halfduplex_client.c | 5 +- tunnels/client/http2/helpers.h | 2 +- tunnels/client/http2/http2_client.c | 71 +++++++++++-------- tunnels/client/reality/reality_client.c | 4 +- tunnels/client/reverse/helpers.h | 20 +++--- tunnels/client/reverse/reverse_client.c | 43 ++++++----- tunnels/client/reverse/types.h | 9 ++- tunnels/server/halfduplex/halfduplex_server.c | 14 +--- tunnels/server/http2/http2_server.c | 63 +++++++++------- tunnels/server/reality/reality_server.c | 2 +- 10 files changed, 135 insertions(+), 98 deletions(-) diff --git a/tunnels/client/halfduplex/halfduplex_client.c b/tunnels/client/halfduplex/halfduplex_client.c index ae4e2456..3305df7a 100644 --- a/tunnels/client/halfduplex/halfduplex_client.c +++ b/tunnels/client/halfduplex/halfduplex_client.c @@ -19,7 +19,7 @@ typedef struct halfduplex_con_state_s static void onMainLinePaused(void *cstate) { - pauseLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line); + // pauseLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line); pauseLineUpSide(((halfduplex_con_state_t *) cstate)->download_line); } @@ -36,6 +36,9 @@ static void onUDLinePaused(void *cstate) static void onUDLineResumed(void *cstate) { resumeLineDownSide(((halfduplex_con_state_t *) cstate)->main_line); + resumeLineUpSide(((halfduplex_con_state_t *) cstate)->download_line); + resumeLineUpSide(((halfduplex_con_state_t *) cstate)->upload_line); + } static void upStream(tunnel_t *self, context_t *c) diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index b92aca90..52ee870f 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -252,7 +252,7 @@ static http2_client_con_state_t *takeHttp2Connection(tunnel_t *self, int tid) } return con; } - + // assert(false); con = createHttp2Connection(self, tid); vec_cons_push(vector, con); return con; diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 3169c9dd..d89dc15a 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -420,48 +420,59 @@ static void upStream(tunnel_t *self, context_t *c) static void downStream(tunnel_t *self, context_t *c) { - http2_client_state_t *state = STATE(self); - http2_client_con_state_t *con = CSTATE(c); + http2_client_con_state_t *con = CSTATE(c); if (c->payload != NULL) { - - con->state = kH2WantRecv; - size_t len = bufLen(c->payload); - ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), len); - assert(ret == (ssize_t) len); - reuseContextBuffer(c); - - if (! isAlive(c->line)) + size_t len = 0; + while ((len = bufLen(c->payload)) > 0) { - destroyContext(c); - return; - } + size_t consumed = min(1 << 15UL, (ssize_t) len); + con->state = kH2WantRecv; + ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), consumed); + shiftr(c->payload, consumed); - if (ret != (ssize_t) len) - { - deleteHttp2Connection(con); - self->dw->downStream(self->dw, newFinContext(c->line)); - destroyContext(c); - return; - } - - while (trySendRequest(self, con, 0, NULL)) - { if (! isAlive(c->line)) { + reuseContextBuffer(c); destroyContext(c); return; } - } - if (con->root.next == NULL && con->childs_added >= state->concurrency && isAlive(c->line)) - { - context_t *con_fc = newFinContext(con->line); - tunnel_t *con_dest = con->tunnel->up; - deleteHttp2Connection(con); - con_dest->upStream(con_dest, con_fc); + if (ret != (ssize_t) consumed) + { + assert(false); + deleteHttp2Connection(con); + self->up->upStream(self->up, newFinContext(c->line)); + destroyContext(c); + return; + } + + if (nghttp2_session_want_write(con->session) != 0) + { + + while (trySendRequest(self, con, 0, NULL)) + { + if (! isAlive(c->line)) + { + reuseContextBuffer(c); + destroyContext(c); + return; + } + } + } + if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) + { + assert(false); + context_t *fin_ctx = newFinContext(con->line); + deleteHttp2Connection(con); + self->up->upStream(self->up, fin_ctx); + reuseContextBuffer(c); + destroyContext(c); + return; + } } + reuseContextBuffer(c); destroyContext(c); } else diff --git a/tunnels/client/reality/reality_client.c b/tunnels/client/reality/reality_client.c index 04c54e02..5b6d209c 100644 --- a/tunnels/client/reality/reality_client.c +++ b/tunnels/client/reality/reality_client.c @@ -117,7 +117,7 @@ static void upStream(tunnel_t *self, context_t *c) shift_buffer_t *buf = c->payload; c->payload = NULL; - const unsigned int chunk_size = ((1 << 15) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen)); + const unsigned int chunk_size = ((1 << 16) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen)); if (bufLen(buf) < chunk_size) { @@ -257,7 +257,7 @@ static void downStream(tunnel_t *self, context_t *c) uint16_t tls_ver_b; memcpy(&tls_ver_b, ((uint8_t *) rawBuf(buf)) + 1, sizeof(uint16_t)); bool is_tls_33 = tls_ver_b == kTLSVersion12; - + shiftr(buf, kTLSHeaderlen); if (! verifyMessage(buf, cstate->msg_digest, cstate->sign_context, cstate->sign_key) || diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 36008d6f..b7b9ca2c 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -12,9 +12,9 @@ #define CSTATE_U_MUT(x) ((x)->line->chains_state)[self->chain_index] enum { - kPreconnectDelayShort = 10, - kPreconnectDelayLong = 750, - kConnectionStarvationTimeOut = 30000 + kPreconnectDelayShort = 10, + kPreconnectDelayLong = 750, + kConnectionStarvationTimeOut = 30000 }; static void onLinePausedU(void *cstate) @@ -81,7 +81,7 @@ static void doConnect(struct connect_arg *cg) hello_data_ctx->first = true; hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx)); setLen(hello_data_ctx->payload, 96); - memset(rawBufMut(hello_data_ctx->payload), 0xFF,96); + memset(rawBufMut(hello_data_ctx->payload), 0xFF, 96); self->up->upStream(self->up, hello_data_ctx); } @@ -108,12 +108,14 @@ static void initiateConnect(tunnel_t *self, uint8_t tid, bool delay) { reverse_client_state_t *state = STATE(self); - if (state->unused_cons[tid] >= state->min_unused_cons) + if (state->threadlocal_pool[tid].unused_cons_count + state->threadlocal_pool[tid].connecting_cons_count >= + state->min_unused_cons) { return; } - // bool more_delay = state->unused_cons[tid] <= 0; - // state->unused_cons[tid] += 1; + state->threadlocal_pool[tid].connecting_cons_count += 1; + // bool more_delay = state->threadlocal_pool[tid].unused_cons_count <= 0; + // state->threadlocal_pool[tid].unused_cons_count += 1; // int tid = 0; // if (workers_count > 0) @@ -154,9 +156,9 @@ static void onStarvedConnectionExpire(idle_item_t *idle_con) } assert(! cstate->pair_connected); - if (state->unused_cons[cstate->u->tid] > 0) + if (state->threadlocal_pool[cstate->u->tid].unused_cons_count > 0) { - state->unused_cons[cstate->u->tid] -= 1; + state->threadlocal_pool[cstate->u->tid].unused_cons_count -= 1; } LOGW("ReverseClient: a idle connection detected and closed"); diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index 656777b8..d04924a2 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -29,8 +29,8 @@ static void upStream(tunnel_t *self, context_t *c) context_t *fc = switchLine(c, dcstate->u); cleanup(dcstate); state->reverse_cons -= 1; - LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", fc->line->tid, state->unused_cons[tid], - state->reverse_cons); + LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", fc->line->tid, + state->threadlocal_pool[tid].unused_cons_count, state->reverse_cons); self->up->upStream(self->up, fc); } else if (c->est) @@ -59,10 +59,7 @@ static void downStream(tunnel_t *self, context_t *c) } else { - if (state->unused_cons[tid] > 0) - { - state->unused_cons[tid] -= 1; - } + state->threadlocal_pool[tid].unused_cons_count -= 1; initiateConnect(self, tid, false); atomic_fetch_add_explicit(&(state->reverse_cons), 1, memory_order_relaxed); @@ -101,36 +98,46 @@ static void downStream(tunnel_t *self, context_t *c) if (ucstate->pair_connected) { state->reverse_cons -= 1; - LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], - state->reverse_cons); + LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, + state->threadlocal_pool[tid].unused_cons_count, state->reverse_cons); context_t *fc = switchLine(c, ucstate->d); cleanup(ucstate); self->dw->downStream(self->dw, fc); } else { - if (state->unused_cons[tid] > 0) + if (ucstate->established) { - state->unused_cons[tid] -= 1; + state->threadlocal_pool[tid].unused_cons_count -= 1; + LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, + state->threadlocal_pool[tid].unused_cons_count, + atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); + initiateConnect(self, tid, false); } - LOGD("ReverseClient: disconnected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], - atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); + else + { + state->threadlocal_pool[tid].connecting_cons_count -= 1; + initiateConnect(self, tid, true); + } + cleanup(ucstate); - initiateConnect(self, tid, true); destroyContext(c); } } else if (c->est) { ucstate->established = true; - state->unused_cons[tid] += 1; - LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, state->unused_cons[tid], + state->threadlocal_pool[tid].connecting_cons_count -= 1; + state->threadlocal_pool[tid].unused_cons_count += 1; + LOGI("ReverseClient: connected, tid: %d unused: %u active: %d", tid, + state->threadlocal_pool[tid].unused_cons_count, atomic_load_explicit(&(state->reverse_cons), memory_order_relaxed)); initiateConnect(self, tid, false); // ucstate->idle_handle = newIdleItem(state->starved_connections, (hash_t) (ucstate), ucstate, - // onStarvedConnectionExpire, c->line->tid, kConnectionStarvationTimeOut); + // onStarvedConnectionExpire, c->line->tid, + // kConnectionStarvationTimeOut); destroyContext(c); } @@ -157,8 +164,8 @@ tunnel_t *newReverseClient(node_instance_context_t *instance_info) const size_t start_delay_ms = 150; - reverse_client_state_t *state = malloc(sizeof(reverse_client_state_t) + (sizeof(atomic_uint) * workers_count)); - memset(state, 0, sizeof(reverse_client_state_t) + (sizeof(atomic_uint) * workers_count)); + reverse_client_state_t *state = malloc(sizeof(reverse_client_state_t) + (sizeof(thread_box_t) * workers_count)); + memset(state, 0, sizeof(reverse_client_state_t) + (sizeof(thread_box_t) * workers_count)); const cJSON *settings = instance_info->node_settings_json; getIntFromJsonObject((int *) &(state->min_unused_cons), settings, "minimum-unused"); diff --git a/tunnels/client/reverse/types.h b/tunnels/client/reverse/types.h index 6f7ffd4c..8921fe3f 100644 --- a/tunnels/client/reverse/types.h +++ b/tunnels/client/reverse/types.h @@ -21,6 +21,13 @@ typedef struct reverse_client_con_state_s } reverse_client_con_state_t; +typedef struct thread_box_s +{ + uint32_t unused_cons_count; + uint32_t connecting_cons_count; + +} thread_box_t; + typedef struct reverse_client_state_s { idle_table_t *starved_connections; @@ -29,6 +36,6 @@ typedef struct reverse_client_state_s uint8_t chain_index_d; unsigned int min_unused_cons; - unsigned int unused_cons[]; + thread_box_t threadlocal_pool[]; } reverse_client_state_t; diff --git a/tunnels/server/halfduplex/halfduplex_server.c b/tunnels/server/halfduplex/halfduplex_server.c index 871704fc..6550336a 100644 --- a/tunnels/server/halfduplex/halfduplex_server.c +++ b/tunnels/server/halfduplex/halfduplex_server.c @@ -73,7 +73,7 @@ static void onMainLinePaused(void *_cstate) { pauseLineDownSide(cstate->upload_line); } - pauseLineDownSide(cstate->download_line); + // pauseLineDownSide(cstate->download_line); } static void onMainLineResumed(void *_cstate) @@ -93,21 +93,13 @@ static void onMainLineResumed(void *_cstate) static void onDownloadLinePaused(void *_cstate) { halfduplex_server_con_state_t *cstate = _cstate; - // todo (no if) not required - if (cstate->main_line) - { - pauseLineUpSide(cstate->main_line); - } + pauseLineUpSide(cstate->main_line); } static void onDownloadLineResumed(void *_cstate) { halfduplex_server_con_state_t *cstate = _cstate; - // todo (no if) not required - if (cstate->main_line) - { - resumeLineUpSide(cstate->main_line); - } + resumeLineUpSide(cstate->main_line); } static void onUploadInDirectLinePaused(void *_cstate) diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index 7a981c87..9c7c1d65 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -304,36 +304,51 @@ static void upStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { http2_server_con_state_t *con = CSTATE(c); - con->state = kH2WantRecv; - size_t len = bufLen(c->payload); + size_t len = 0; - ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), len); - - if (! isAlive(c->line)) + while ((len = bufLen(c->payload)) > 0) { - destroyContext(c); - return; - } + size_t consumed = min(1 << 15UL, (ssize_t) len); + con->state = kH2WantRecv; + ssize_t ret = nghttp2_session_mem_recv2(con->session, (const uint8_t *) rawBuf(c->payload), consumed); + shiftr(c->payload, consumed); - if (ret != (ssize_t) len) - { - deleteHttp2Connection(con); - self->dw->downStream(self->dw, newFinContext(c->line)); - destroyContext(c); - return; - } + if (! isAlive(c->line)) + { + reuseContextBuffer(c); + destroyContext(c); + return; + } - if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) - { - context_t *fin_ctx = newFinContext(con->line); - deleteHttp2Connection(con); - self->dw->downStream(self->dw, fin_ctx); - } + if (ret != (ssize_t) consumed) + { + assert(false); + deleteHttp2Connection(con); + self->dw->downStream(self->dw, newFinContext(c->line)); + destroyContext(c); + return; + } - while (trySendResponse(self, con, 0, NULL)) - { - if (! isAlive(c->line)) + if (nghttp2_session_want_write(con->session) != 0) { + + while (trySendResponse(self, con, 0, NULL)) + { + if (! isAlive(c->line)) + { + reuseContextBuffer(c); + destroyContext(c); + return; + } + } + } + if (nghttp2_session_want_read(con->session) == 0 && nghttp2_session_want_write(con->session) == 0) + { + assert(false); + context_t *fin_ctx = newFinContext(con->line); + deleteHttp2Connection(con); + self->dw->downStream(self->dw, fin_ctx); + reuseContextBuffer(c); destroyContext(c); return; } diff --git a/tunnels/server/reality/reality_server.c b/tunnels/server/reality/reality_server.c index 5f7ad67a..ace22a83 100644 --- a/tunnels/server/reality/reality_server.c +++ b/tunnels/server/reality/reality_server.c @@ -265,7 +265,7 @@ static void downStream(tunnel_t *self, context_t *c) case kConAuthorized:; shift_buffer_t *buf = c->payload; c->payload = NULL; - const unsigned int chunk_size = ((1 << 15) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen)); + const unsigned int chunk_size = ((1 << 16) - (kSignLen + (2 * kEncryptionBlockSize) + kIVlen)); if (bufLen(buf) < chunk_size) {