diff --git a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index 9f12e967..7e44af92 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -146,11 +146,16 @@ static void onLineResumed(void *userdata) static void upStream(tunnel_t *self, context_t *c) { +#ifdef PROFILE if (c->payload != NULL) { -#ifdef PROFILE - if (c->first) + tcp_listener_con_state_t *cstate = CSTATE(c); + + bool *first_packet_sent = &((cstate)->first_packet_sent); + + if (! (*first_packet_sent)) { + *first_packet_sent = true; struct timeval tv1, tv2; gettimeofday(&tv1, NULL); { @@ -161,16 +166,13 @@ static void upStream(tunnel_t *self, context_t *c) LOGD("TcpListener: upstream took %d ms", (int) (time_spent * 1000)); return; } -#endif } - else +#endif + if (c->fin) { - if (c->fin) - { - tcp_listener_con_state_t *cstate = CSTATE(c); - CSTATE_DROP(c); - cleanup(cstate, false); - } + tcp_listener_con_state_t *cstate = CSTATE(c); + CSTATE_DROP(c); + cleanup(cstate, false); } self->up->upStream(self->up, c); @@ -231,18 +233,12 @@ static void onRecv(hio_t *io, shift_buffer_t *buf) reuseBuffer(hloop_bufferpool(hevent_loop(io)), buf); return; } - shift_buffer_t *payload = buf; - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - bool *first_packet_sent = &((cstate)->first_packet_sent); + shift_buffer_t *payload = buf; + tunnel_t *self = (cstate)->tunnel; + line_t *line = (cstate)->line; context_t *context = newContext(line); context->payload = payload; - if (! (*first_packet_sent)) - { - *first_packet_sent = true; - context->first = true; - } self->upStream(self, context); } diff --git a/tunnels/adapters/listener/udp/udp_listener.c b/tunnels/adapters/listener/udp/udp_listener.c index 3cc54584..7210735b 100644 --- a/tunnels/adapters/listener/udp/udp_listener.c +++ b/tunnels/adapters/listener/udp/udp_listener.c @@ -52,7 +52,7 @@ static void cleanup(udp_listener_con_state_t *cstate) // sounds impossible... LOGE("Checkpoint udp listener"); // this prevent double free - *cstate = (udp_listener_con_state_t){0}; + *cstate = (udp_listener_con_state_t) {0}; } } else @@ -66,8 +66,11 @@ static void upStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { #ifdef PROFILE - if (c->first) + udp_listener_con_state_t *cstate = CSTATE(c); + bool *first_packet_sent = &((cstate)->first_packet_sent); + if (! (*first_packet_sent)) { + *first_packet_sent = true; struct timeval tv1, tv2; gettimeofday(&tv1, NULL); { @@ -101,7 +104,7 @@ static void downStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { - postUdpWrite(cstate->uio,c->line->tid, c->payload); + postUdpWrite(cstate->uio, c->line->tid, c->payload); dropContexPayload(c); destroyContext(c); } @@ -151,13 +154,13 @@ static udp_listener_con_state_t *newConnection(uint8_t tid, tunnel_t *self, udps line->src_ctx.address_protocol = kSapUdp; line->src_ctx.address = *(sockaddr_u *) hio_peeraddr(uio->io); - *cstate = (udp_listener_con_state_t){.loop = loops[tid], - .line = line, - .buffer_pool = getThreadBufferPool(tid), - .uio = uio, - .tunnel = self, - .established = false, - .first_packet_sent = false}; + *cstate = (udp_listener_con_state_t) {.loop = loops[tid], + .line = line, + .buffer_pool = getThreadBufferPool(tid), + .uio = uio, + .tunnel = self, + .established = false, + .first_packet_sent = false}; sockaddr_set_port(&(line->src_ctx.address), real_localport); @@ -294,7 +297,7 @@ tunnel_t *newUdpListener(node_instance_context_t *instance_info) } socket_filter_option_t filter_opt = {0}; - getStringFromJsonObject(&(filter_opt.balance_group_name),settings, "balance-group"); + getStringFromJsonObject(&(filter_opt.balance_group_name), settings, "balance-group"); getIntFromJsonObject((int *) &(filter_opt.balance_group_interval), settings, "balance-interval"); filter_opt.multiport_backend = kMultiportBackendNothing; @@ -362,7 +365,7 @@ api_result_t apiUdpListener(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyUdpListener(tunnel_t *self) @@ -372,5 +375,5 @@ tunnel_t *destroyUdpListener(tunnel_t *self) } tunnel_metadata_t getMetadataUdpListener(void) { - return (tunnel_metadata_t){.version = 0001, .flags = kNodeFlagChainHead}; + return (tunnel_metadata_t) {.version = 0001, .flags = kNodeFlagChainHead}; } diff --git a/tunnels/client/bgp4/bgp4_client.c b/tunnels/client/bgp4/bgp4_client.c index 2562a085..bfa678b1 100644 --- a/tunnels/client/bgp4/bgp4_client.c +++ b/tunnels/client/bgp4/bgp4_client.c @@ -39,17 +39,23 @@ typedef struct bgp4_client_state_s typedef struct bgp4_client_con_state_s { buffer_stream_t *read_stream; + bool first_packet_sent; } bgp4_client_con_state_t; static void upStream(tunnel_t *self, context_t *c) { - bgp4_client_state_t *state = TSTATE(self); + bgp4_client_state_t *state = TSTATE(self); + bgp4_client_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) { - if (c->first) + uint8_t bgp_type = 2 + (fastRand() % kBgpTypes - 1); + + if (! cstate->first_packet_sent) { + cstate->first_packet_sent = true; + uint32_t additions = 3 + fastRand() % 8; shiftl(c->payload, kBgpOpenPacketHeaderSize + additions); @@ -66,9 +72,9 @@ static void upStream(tunnel_t *self, context_t *c) { header[1 + 2 + 2 + 4 + 1 + i] = fastRand() % 200; } - } - uint8_t bgp_type = c->first ? 1 : 2 + (fastRand() % kBgpTypes - 1); + bgp_type = 1; // BGP Open + } shiftl(c->payload, 1); // type writeUI8(c->payload, bgp_type); @@ -84,12 +90,11 @@ static void upStream(tunnel_t *self, context_t *c) } else { - bgp4_client_con_state_t *cstate = CSTATE(c); if (c->init) { cstate = wwmGlobalMalloc(sizeof(bgp4_client_con_state_t)); - *cstate = (bgp4_client_con_state_t){.read_stream = newBufferStream(getContextBufferPool(c))}; + *cstate = (bgp4_client_con_state_t) {.read_stream = newBufferStream(getContextBufferPool(c))}; CSTATE_MUT(c) = cstate; } else if (c->fin) @@ -166,7 +171,7 @@ static void downStream(tunnel_t *self, context_t *c) wwmGlobalFree(cstate); CSTATE_DROP(c); } - + self->dw->downStream(self->dw, c); return; @@ -207,7 +212,7 @@ api_result_t apiBgp4Client(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyBgp4Client(tunnel_t *self) @@ -217,5 +222,5 @@ tunnel_t *destroyBgp4Client(tunnel_t *self) } tunnel_metadata_t getMetadataBgp4Client(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/client/halfduplex/halfduplex_client.c b/tunnels/client/halfduplex/halfduplex_client.c index 40fdaee5..f5daf2db 100644 --- a/tunnels/client/halfduplex/halfduplex_client.c +++ b/tunnels/client/halfduplex/halfduplex_client.c @@ -14,7 +14,7 @@ typedef struct halfduplex_con_state_s line_t *main_line; line_t *upload_line; line_t *download_line; - + bool first_packet_sent; } halfduplex_con_state_t; static void onMainLinePaused(void *cstate) @@ -43,16 +43,17 @@ static void onUDLineResumed(void *cstate) static void upStream(tunnel_t *self, context_t *c) { halfduplex_con_state_t *cstate = CSTATE(c); + if (c->payload != NULL) { - if (c->first) + if (!cstate->first_packet_sent) { + cstate->first_packet_sent = true; // 63 bits of random is enough and is better than hashing sender addr on halfduplex server, i believe so... uint32_t cids[2] = {fastRand(), fastRand()}; uint8_t *cid_bytes = (uint8_t *) &(cids[0]); context_t *intro_context = newContext(cstate->download_line); - intro_context->first = true; intro_context->payload = popBuffer(getContextBufferPool(c)); cid_bytes[0] = cid_bytes[0] | (1 << 7); // kHLFDCmdDownload diff --git a/tunnels/client/header/header_client.c b/tunnels/client/header/header_client.c index 2d5492d3..8b384fc0 100644 --- a/tunnels/client/header/header_client.c +++ b/tunnels/client/header/header_client.c @@ -19,15 +19,27 @@ typedef struct header_client_state_s typedef struct header_client_con_state_s { - void *_; + bool first_packet_received; } header_client_con_state_t; static void upStream(tunnel_t *self, context_t *c) { - header_client_state_t *state = TSTATE(self); - - if (c->first && c->payload != NULL) + header_client_state_t *state = TSTATE(self); + header_client_con_state_t *cstate = CSTATE(c); + if (c->init) + { + cstate = wwmGlobalMalloc(sizeof(header_client_con_state_t)); + *cstate = (header_client_con_state_t) {0}; + CSTATE_MUT(c) = cstate; + } + else if (c->fin) + { + wwmGlobalFree(cstate); + CSTATE_DROP(c); + } + else if (! cstate->first_packet_received && c->payload != NULL) { + cstate->first_packet_received = true; switch ((enum header_dynamic_value_status) state->data.status) { @@ -47,6 +59,12 @@ static void upStream(tunnel_t *self, context_t *c) static void downStream(tunnel_t *self, context_t *c) { + if (c->fin) + { + wwmGlobalFree( CSTATE(c)); + CSTATE_DROP(c); + } + self->dw->downStream(self->dw, c); } @@ -71,7 +89,7 @@ api_result_t apiHeaderClient(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyHeaderClient(tunnel_t *self) @@ -81,5 +99,5 @@ tunnel_t *destroyHeaderClient(tunnel_t *self) } tunnel_metadata_t getMetadataHeaderClient(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/client/http2/helpers.h b/tunnels/client/http2/helpers.h index 370b2b4a..019381b1 100644 --- a/tunnels/client/http2/helpers.h +++ b/tunnels/client/http2/helpers.h @@ -290,15 +290,9 @@ static void onPingTimer(htimer_t *timer) writeRaw(send_buf, data, len); context_t *req = newContext(h2line); req->payload = send_buf; - if (! con->first_sent) - { - con->first_sent = true; - req->first = true; - } con->tunnel->up->upStream(con->tunnel->up, req); if (! isAlive(h2line)) { - unLockLine(h2line); return; } diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index fb51b12e..6fa3a07d 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -18,7 +18,7 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u http2_client_con_state_t *con = (http2_client_con_state_t *) userdata; http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); - LOGD("callback end stream for: %d", stream_id); + // LOGD("callback end stream for: %d", stream_id); if (! stream) { return 0; @@ -159,7 +159,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr if (stream->bytes_sent_nack >= kMaxSendBeforeAck) { stream->bytes_sent_nack -= consumed; - LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); + // LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); if (stream->bytes_sent_nack < kMaxSendBeforeAck) { @@ -169,7 +169,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr else { stream->bytes_sent_nack -= consumed; - LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); + // LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); } } } @@ -187,7 +187,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr { if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == NGHTTP2_FLAG_END_STREAM) { - LOGD("end stream for: %d", frame->hd.stream_id); + // LOGD("end stream for: %d", frame->hd.stream_id); http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (WW_UNLIKELY(! stream)) @@ -478,7 +478,6 @@ static void upStream(tunnel_t *self, context_t *c) http2_client_con_state_t *con = LSTATE(stream->parent); CSTATE_DROP(c); - // LOGE("closed %d",stream->stream_id); int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS; if (con->content_type == kApplicationGrpc) { @@ -489,7 +488,7 @@ static void upStream(tunnel_t *self, context_t *c) { nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL); } - LOGD("destroy %d", (int) stream->stream_id); + // LOGD("closing -> %d", (int) stream->stream_id); nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); removeStream(con, stream); deleteHttp2Stream(stream); diff --git a/tunnels/client/http2/types.h b/tunnels/client/http2/types.h index b899b4ca..eeb2093a 100644 --- a/tunnels/client/http2/types.h +++ b/tunnels/client/http2/types.h @@ -70,7 +70,6 @@ typedef struct http2_client_con_state_s int host_port; bool handshake_completed; bool init_sent; - bool first_sent; bool no_ping_ack; } http2_client_con_state_t; diff --git a/tunnels/client/openssl/openssl_client.c b/tunnels/client/openssl/openssl_client.c index 834b4a4e..60f00b4b 100644 --- a/tunnels/client/openssl/openssl_client.c +++ b/tunnels/client/openssl/openssl_client.c @@ -185,7 +185,6 @@ static void upStream(tunnel_t *self, context_t *c) { setLen(buf, n); client_hello_ctx->payload = buf; - client_hello_ctx->first = true; self->up->upStream(self->up, client_hello_ctx); } else if (! BIO_should_retry(cstate->rbio)) diff --git a/tunnels/client/protobuf/protobuf_client.c b/tunnels/client/protobuf/protobuf_client.c index 40e1c6a5..6685e5be 100644 --- a/tunnels/client/protobuf/protobuf_client.c +++ b/tunnels/client/protobuf/protobuf_client.c @@ -27,7 +27,6 @@ typedef struct protobuf_client_state_s typedef struct protobuf_client_con_state_s { buffer_stream_t *stream_buf; - bool first_sent; } protobuf_client_con_state_t; @@ -52,9 +51,8 @@ static void upStream(tunnel_t *self, context_t *c) if (c->init) { protobuf_client_con_state_t *cstate = wwmGlobalMalloc(sizeof(protobuf_client_con_state_t)); - *cstate = (protobuf_client_con_state_t){.first_sent = false, - .stream_buf = newBufferStream(getContextBufferPool(c))}; - CSTATE_MUT(c) = cstate; + *cstate = (protobuf_client_con_state_t) {.stream_buf = newBufferStream(getContextBufferPool(c))}; + CSTATE_MUT(c) = cstate; } else if (c->fin) { @@ -116,12 +114,6 @@ static void downStream(tunnel_t *self, context_t *c) reuseBuffer(getContextBufferPool(c), full_data); } - if (! cstate->first_sent) - { - downstream_ctx->first = true; - cstate->first_sent = true; - } - self->dw->downStream(self->dw, downstream_ctx); if (! isAlive(c->line)) @@ -163,7 +155,7 @@ api_result_t apiProtoBufClient(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyProtoBufClient(tunnel_t *self) @@ -173,5 +165,5 @@ tunnel_t *destroyProtoBufClient(tunnel_t *self) } tunnel_metadata_t getMetadataProtoBufClient(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/client/reality/reality_client.c b/tunnels/client/reality/reality_client.c index 768e11df..93383ae3 100644 --- a/tunnels/client/reality/reality_client.c +++ b/tunnels/client/reality/reality_client.c @@ -45,7 +45,6 @@ typedef struct reality_client_con_state_s buffer_stream_t *read_stream; context_queue_t *queue; bool handshake_completed; - bool first_sent; } reality_client_con_state_t; @@ -111,7 +110,6 @@ static void upStream(tunnel_t *self, context_t *c) if (! cstate->handshake_completed) { - c->first = false; contextQueuePush(cstate->queue, c); return; } @@ -193,7 +191,6 @@ static void upStream(tunnel_t *self, context_t *c) { setLen(buf, n); client_hello_ctx->payload = buf; - client_hello_ctx->first = true; self->up->upStream(self->up, client_hello_ctx); } else if (! BIO_should_retry(cstate->rbio)) diff --git a/tunnels/client/reverse/helpers.h b/tunnels/client/reverse/helpers.h index 268798d5..b0333920 100644 --- a/tunnels/client/reverse/helpers.h +++ b/tunnels/client/reverse/helpers.h @@ -73,7 +73,6 @@ static void doConnect(struct connect_arg *cg) destroyContext(hello_data_ctx); return; } - hello_data_ctx->first = true; hello_data_ctx->payload = popBuffer(getContextBufferPool(hello_data_ctx)); setLen(hello_data_ctx->payload, kHandShakeLength); memset(rawBufMut(hello_data_ctx->payload), kHandShakeByte, kHandShakeLength); diff --git a/tunnels/client/reverse/reverse_client.c b/tunnels/client/reverse/reverse_client.c index 677d510a..eb486d9b 100644 --- a/tunnels/client/reverse/reverse_client.c +++ b/tunnels/client/reverse/reverse_client.c @@ -81,10 +81,7 @@ static void downStream(tunnel_t *self, context_t *c) return; } unLockLine(ucstate->d); - ucstate->first_sent_d = true; - context_t *turned = switchLine(c, ucstate->d); - turned->first = true; - self->dw->downStream(self->dw, turned); + self->dw->downStream(self->dw, switchLine(c, ucstate->d)); } } else diff --git a/tunnels/client/reverse/types.h b/tunnels/client/reverse/types.h index 1e045a80..68ccc431 100644 --- a/tunnels/client/reverse/types.h +++ b/tunnels/client/reverse/types.h @@ -13,7 +13,6 @@ typedef struct reverse_client_con_state_s { bool pair_connected; bool established; - bool first_sent_d; idle_item_t *idle_handle; line_t *u; line_t *d; diff --git a/tunnels/client/wolfssl/wolfssl_client.c b/tunnels/client/wolfssl/wolfssl_client.c index a79163b5..847d90c0 100644 --- a/tunnels/client/wolfssl/wolfssl_client.c +++ b/tunnels/client/wolfssl/wolfssl_client.c @@ -185,7 +185,6 @@ static void upStream(tunnel_t *self, context_t *c) { setLen(buf, n); client_hello_ctx->payload = buf; - client_hello_ctx->first = true; self->up->upStream(self->up, client_hello_ctx); } else if (! BIO_should_retry(cstate->rbio)) diff --git a/tunnels/server/bgp4/bgp4_server.c b/tunnels/server/bgp4/bgp4_server.c index 6ebbb184..eacc7a66 100644 --- a/tunnels/server/bgp4/bgp4_server.c +++ b/tunnels/server/bgp4/bgp4_server.c @@ -35,7 +35,6 @@ typedef struct bgp4_client_con_state_s { buffer_stream_t *read_stream; bool open_received; - bool first_sent; } bgp4_client_con_state_t; @@ -120,11 +119,6 @@ static void upStream(tunnel_t *self, context_t *c) context_t *data_ctx = newContext(c->line); data_ctx->payload = buf; - if (! cstate->first_sent) - { - cstate->first_sent = true; - c->first = true; - } self->up->upStream(self->up, data_ctx); } else diff --git a/tunnels/server/halfduplex/halfduplex_server.c b/tunnels/server/halfduplex/halfduplex_server.c index a641e283..e44382b2 100644 --- a/tunnels/server/halfduplex/halfduplex_server.c +++ b/tunnels/server/halfduplex/halfduplex_server.c @@ -40,17 +40,15 @@ typedef struct halfduplex_server_state_s typedef struct halfduplex_server_con_state_s { + + shift_buffer_t *buffering; + line_t *upload_line; + line_t *download_line; + line_t *main_line; + pipe_line_t *pipe; enum connection_status state; hash_t hash; - - shift_buffer_t *buffering; - line_t *upload_line; - line_t *download_line; - line_t *main_line; - pipe_line_t *pipe; - bool first_sent; - } halfduplex_server_con_state_t; struct notify_argument_s @@ -147,7 +145,7 @@ static void notifyDownloadLineIsReadyForBind(hash_t hash, tunnel_t *self, uint8_ uint8_t tid_download_line = (*f_iter.ref).second->download_line->tid; hhybridmutex_unlock(&(state->download_line_map_mutex)); - // a very rare case is when this_tid == tid_download_line + // a very rare case is when this_tid == tid_download_line LSTATE_DROP(upload_line_cstate->upload_line); @@ -286,11 +284,6 @@ static void upStream(tunnel_t *self, context_t *c) shiftr(c->payload, sizeof(uint64_t)); if (bufLen(buf) > 0) { - if (! cstate->first_sent) - { - cstate->first_sent = true; - c->first = true; - } self->up->upStream(self->up, switchLine(c, main_line)); return; } @@ -306,7 +299,6 @@ static void upStream(tunnel_t *self, context_t *c) pipeTo(self, c->line, tid_download_line); pipeUpStream(c); return; // piped to another worker which has waiting connections - } } else @@ -391,11 +383,9 @@ static void upStream(tunnel_t *self, context_t *c) if (bufLen(upload_line_cstate->buffering) > 0) { - context_t *buf_ctx = newContext(main_line); - buf_ctx->payload = upload_line_cstate->buffering; - buf_ctx->first = true; - upload_line_cstate->buffering = NULL; - upload_line_cstate->first_sent = true; + context_t *buf_ctx = newContext(main_line); + buf_ctx->payload = upload_line_cstate->buffering; + upload_line_cstate->buffering = NULL; shiftr(buf_ctx->payload, sizeof(uint64_t)); self->up->upStream(self->up, buf_ctx); } @@ -426,7 +416,7 @@ static void upStream(tunnel_t *self, context_t *c) // tell upload line to re-check struct notify_argument_s *evdata = wwmGlobalMalloc(sizeof(struct notify_argument_s)); - *evdata = (struct notify_argument_s){.self = self, .hash = hash, .tid = tid_upload_line}; + *evdata = (struct notify_argument_s) {.self = self, .hash = hash, .tid = tid_upload_line}; hevent_t ev; memset(&ev, 0, sizeof(ev)); @@ -480,11 +470,6 @@ static void upStream(tunnel_t *self, context_t *c) break; case kCsUploadDirect: - if (! cstate->first_sent) - { - cstate->first_sent = true; - c->first = true; - } self->up->upStream(self->up, switchLine(c, cstate->main_line)); break; @@ -500,12 +485,8 @@ static void upStream(tunnel_t *self, context_t *c) if (c->init) { cstate = wwmGlobalMalloc(sizeof(halfduplex_server_con_state_t)); - *cstate = (halfduplex_server_con_state_t){.state = kCsUnkown, - .buffering = NULL, - .pipe = NULL, - .upload_line = NULL, - .download_line = NULL, - .first_sent = false}; + *cstate = (halfduplex_server_con_state_t) { + .state = kCsUnkown, .buffering = NULL, .pipe = NULL, .upload_line = NULL, .download_line = NULL}; CSTATE_MUT(c) = cstate; if (isDownPiped(c->line)) @@ -775,7 +756,7 @@ api_result_t apiHalfDuplexServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyHalfDuplexServer(tunnel_t *self) @@ -785,5 +766,5 @@ tunnel_t *destroyHalfDuplexServer(tunnel_t *self) } tunnel_metadata_t getMetadataHalfDuplexServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/header/header_server.c b/tunnels/server/header/header_server.c index 047dbca3..4257853f 100644 --- a/tunnels/server/header/header_server.c +++ b/tunnels/server/header/header_server.c @@ -20,7 +20,6 @@ typedef struct header_server_state_s typedef struct header_server_con_state_s { bool init_sent; - bool first_sent; shift_buffer_t *buf; } header_server_con_state_t; @@ -38,18 +37,21 @@ static void upStream(tunnel_t *self, context_t *c) c->payload = appendBufferMerge(getContextBufferPool(c), cstate->buf, c->payload); cstate->buf = NULL; } - if (bufLen(c->payload) < 2) - { - cstate->buf = c->payload; - dropContexPayload(c); - destroyContext(c); - return; - } - shift_buffer_t *buf = c->payload; - uint16_t port = 0; + + shift_buffer_t *buf = c->payload; + switch ((enum header_dynamic_value_status) state->data.status) { - case kHdvsDestPort: + case kHdvsDestPort:; + + uint16_t port = 0; + if (bufLen(c->payload) < 2) + { + cstate->buf = c->payload; + dropContexPayload(c); + destroyContext(c); + return; + } readUI16(buf, &port); sockaddr_set_port(&(c->line->dest_ctx.address), port); @@ -84,17 +86,12 @@ static void upStream(tunnel_t *self, context_t *c) return; } } - if (! cstate->first_sent) - { - c->first = true; - cstate->first_sent = true; - } self->up->upStream(self->up, c); } else if (c->init) { cstate = wwmGlobalMalloc(sizeof(header_server_con_state_t)); - *cstate = (header_server_con_state_t){0}; + *cstate = (header_server_con_state_t) {0}; CSTATE_MUT(c) = cstate; destroyContext(c); } @@ -155,7 +152,7 @@ api_result_t apiHeaderServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyHeaderServer(tunnel_t *self) @@ -165,5 +162,5 @@ tunnel_t *destroyHeaderServer(tunnel_t *self) } tunnel_metadata_t getMetadataHeaderServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index e66ba6ff..4364b47a 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -13,7 +13,7 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u http2_server_con_state_t *con = (http2_server_con_state_t *) userdata; http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id); - LOGD("callback end stream for: %d", stream_id); + // LOGD("callback end stream for: %d", stream_id); if (! stream) { @@ -98,8 +98,9 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3 writeRaw(buf, data, len); lockLine(stream->line); - action_queue_t_push(&con->actions, - (http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf}); + action_queue_t_push( + &con->actions, + (http2_action_t) {.action_id = kActionStreamDataReceived, .stream_line = stream->line, .buf = buf}); return 0; } @@ -143,7 +144,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr if (stream->bytes_sent_nack >= kMaxSendBeforeAck) { stream->bytes_sent_nack -= consumed; - LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); + // LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); if (stream->bytes_sent_nack < kMaxSendBeforeAck) { @@ -153,7 +154,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr else { stream->bytes_sent_nack -= consumed; - LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); + // LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack); } } } @@ -171,7 +172,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr { if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == NGHTTP2_FLAG_END_STREAM) { - LOGD("end stream for: %d", frame->hd.stream_id); + // LOGD("end stream for: %d", frame->hd.stream_id); http2_server_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (WW_UNLIKELY(! stream)) @@ -339,13 +340,7 @@ static void doHttp2Action(const http2_action_t action, http2_server_con_state_t context_t *stream_data = newContext(stream->line); stream_data->payload = gdata_buf; - if (! stream->first_sent) - { - stream->first_sent = true; - stream_data->first = true; - } stream->bytes_received_nack += bufLen(gdata_buf); - stream->tunnel->up->upStream(stream->tunnel->up, stream_data); // check http2 connection is alive @@ -365,11 +360,6 @@ static void doHttp2Action(const http2_action_t action, http2_server_con_state_t context_t *stream_data = newContext(stream->line); stream_data->payload = buf; - if (! stream->first_sent) - { - stream->first_sent = true; - stream_data->first = true; - } stream->bytes_received_nack += bufLen(buf); stream->tunnel->up->upStream(stream->tunnel->up, stream_data); } @@ -531,7 +521,7 @@ static void downStream(tunnel_t *self, context_t *c) nghttp2_submit_trailer(con->session, stream->stream_id, NULL, 0); } - LOGE("destroy %d", stream->stream_id); + // LOGE("closing -> %d", stream->stream_id); nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL); removeStream(con, stream); deleteHttp2Stream(stream); diff --git a/tunnels/server/http2/types.h b/tunnels/server/http2/types.h index ca9913f1..596c4e8c 100644 --- a/tunnels/server/http2/types.h +++ b/tunnels/server/http2/types.h @@ -45,7 +45,6 @@ typedef struct http2_server_child_con_state_s size_t bytes_received_nack; size_t grpc_bytes_needed; int32_t stream_id; - bool first_sent; } http2_server_child_con_state_t; diff --git a/tunnels/server/openssl/openssl_server.c b/tunnels/server/openssl/openssl_server.c index 4bf15701..9db40c92 100644 --- a/tunnels/server/openssl/openssl_server.c +++ b/tunnels/server/openssl/openssl_server.c @@ -33,20 +33,18 @@ typedef struct oss_server_state_s typedef struct oss_server_con_state_s { - bool handshake_completed; - bool fallback_mode; - bool fallback_init_sent; - bool fallback_first_sent; - bool first_sent; - bool init_sent; - bool fallback_disabled; - /* 8-bit pad*/ + buffer_stream_t *fallback_buf; SSL *ssl; BIO *rbio; BIO *wbio; + bool handshake_completed; + bool fallback_mode; + bool fallback_init_sent; + bool init_sent; + int reply_sent_tit; - int reply_sent_tit; + bool fallback_disabled; } oss_server_con_state_t; @@ -166,12 +164,6 @@ static void fallbackWrite(tunnel_t *self, context_t *c) destroyContext(c); return; } - if (! cstate->fallback_first_sent) - { - c->first = true; - cstate->fallback_first_sent = true; - } - c->payload = bufferStreamIdealRead(cstate->fallback_buf); state->fallback->upStream(state->fallback, c); } @@ -312,11 +304,6 @@ static void upStream(tunnel_t *self, context_t *c) setLen(buf, n); context_t *data_ctx = newContextFrom(c); data_ctx->payload = buf; - if (! (cstate->first_sent)) - { - data_ctx->first = true; - cstate->first_sent = true; - } self->up->upStream(self->up, data_ctx); if (! isAlive(c->line)) { diff --git a/tunnels/server/preconnect/preconnect_server.c b/tunnels/server/preconnect/preconnect_server.c index aa5db647..2850593f 100644 --- a/tunnels/server/preconnect/preconnect_server.c +++ b/tunnels/server/preconnect/preconnect_server.c @@ -11,18 +11,20 @@ typedef struct preconnect_server_state_s typedef struct preconnect_server_con_state_s { bool init_sent; + bool first_packet_sent; } preconnect_server_con_state_t; static void upStream(tunnel_t *self, context_t *c) { + preconnect_server_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) { - preconnect_server_con_state_t *cstate = CSTATE(c); - if (c->first) + if (! cstate->first_packet_sent) { - cstate->init_sent = true; + cstate->first_packet_sent = true; + cstate->init_sent = true; self->up->upStream(self->up, newInitContext(c->line)); if (! isAlive(c->line)) { @@ -35,16 +37,15 @@ static void upStream(tunnel_t *self, context_t *c) } else if (c->init) { - preconnect_server_con_state_t *cstate = wwmGlobalMalloc(sizeof(preconnect_server_con_state_t)); - cstate->init_sent = false; - CSTATE_MUT(c) = cstate; + cstate = wwmGlobalMalloc(sizeof(preconnect_server_con_state_t)); + *cstate = (preconnect_server_con_state_t) {.init_sent = false, .first_packet_sent = false}; + CSTATE_MUT(c) = cstate; destroyContext(c); return; } else if (c->fin) { - preconnect_server_con_state_t *cstate = CSTATE(c); - bool send_fin = cstate->init_sent; + bool send_fin = cstate->init_sent; wwmGlobalFree(cstate); CSTATE_DROP(c); if (send_fin) @@ -82,7 +83,6 @@ tunnel_t *newPreConnectServer(node_instance_context_t *instance_info) t->state = state; t->upStream = &upStream; t->downStream = &downStream; - return t; } @@ -91,7 +91,7 @@ api_result_t apiPreConnectServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyPreConnectServer(tunnel_t *self) @@ -101,5 +101,5 @@ tunnel_t *destroyPreConnectServer(tunnel_t *self) } tunnel_metadata_t getMetadataPreConnectServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/protobuf/protobuf_server.c b/tunnels/server/protobuf/protobuf_server.c index a3e840ea..c255e123 100644 --- a/tunnels/server/protobuf/protobuf_server.c +++ b/tunnels/server/protobuf/protobuf_server.c @@ -29,7 +29,6 @@ typedef struct protobuf_server_state_s typedef struct protobuf_server_con_state_s { buffer_stream_t *stream_buf; - bool first_sent; } protobuf_server_con_state_t; @@ -89,12 +88,6 @@ static void upStream(tunnel_t *self, context_t *c) reuseBuffer(getContextBufferPool(c), full_data); } - if (! cstate->first_sent) - { - upstream_ctx->first = true; - cstate->first_sent = true; - } - self->up->upStream(self->up, upstream_ctx); if (! isAlive(c->line)) @@ -109,8 +102,7 @@ static void upStream(tunnel_t *self, context_t *c) if (c->init) { cstate = wwmGlobalMalloc(sizeof(protobuf_server_con_state_t)); - *cstate = (protobuf_server_con_state_t){.first_sent = false, - .stream_buf = newBufferStream(getContextBufferPool(c))}; + *cstate = (protobuf_server_con_state_t) {.stream_buf = newBufferStream(getContextBufferPool(c))}; CSTATE_MUT(c) = cstate; } else if (c->fin) @@ -168,7 +160,7 @@ api_result_t apiProtoBufServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyProtoBufServer(tunnel_t *self) @@ -178,5 +170,5 @@ tunnel_t *destroyProtoBufServer(tunnel_t *self) } tunnel_metadata_t getMetadataProtoBufServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/reality/reality_server.c b/tunnels/server/reality/reality_server.c index 9b9e4744..8c745393 100644 --- a/tunnels/server/reality/reality_server.c +++ b/tunnels/server/reality/reality_server.c @@ -44,7 +44,6 @@ typedef struct reality_server_con_state_s buffer_stream_t *read_stream; uint8_t giveup_counter; enum connection_auth_state auth_state; - bool first_sent; uint32_t reply_sent_tit; } reality_server_con_state_t; @@ -108,10 +107,8 @@ static void upStream(tunnel_t *self, context_t *c) record_buf = genericDecrypt(record_buf, cstate->cipher_context, state->context_password, getContextBufferPool(c)); - cstate->first_sent = true; context_t *plain_data_ctx = newContextFrom(c); plain_data_ctx->payload = record_buf; - plain_data_ctx->first = true; self->up->upStream(self->up, plain_data_ctx); if (! isAlive(c->line)) @@ -174,12 +171,6 @@ static void upStream(tunnel_t *self, context_t *c) context_t *plain_data_ctx = newContextFrom(c); plain_data_ctx->payload = buf; - - if (WW_UNLIKELY(! cstate->first_sent)) - { - plain_data_ctx->first = true; - cstate->first_sent = true; - } self->up->upStream(self->up, plain_data_ctx); } else diff --git a/tunnels/server/reverse/reverse_server.c b/tunnels/server/reverse/reverse_server.c index 1eef2331..e9267046 100644 --- a/tunnels/server/reverse/reverse_server.c +++ b/tunnels/server/reverse/reverse_server.c @@ -217,9 +217,10 @@ static void downStream(tunnel_t *self, context_t *c) if (c->payload != NULL) { - if (c->first) + // reverse server will not create and consider the connection, before it sends at least 1 data packet + // so the context is null if nothing is received so far... + if (ucstate == NULL) { - c->first = false; const uint8_t tid = c->line->tid; thread_box_t *this_tb = &(state->threadlocal_pool[tid]); @@ -230,7 +231,7 @@ static void downStream(tunnel_t *self, context_t *c) dcstate->u = c->line; dcstate->paired = true; setupLineUpSide(dcstate->u, onLinePausedU, dcstate, onLineResumedU); - + if (! isAlive(c->line)) { reuseContextPayload(c); @@ -288,6 +289,7 @@ static void downStream(tunnel_t *self, context_t *c) { if (c->init) { + assert(c->line->up_state == NULL); self->up->upStream(self->up, newEstContext(c->line)); @@ -331,7 +333,8 @@ static void downStream(tunnel_t *self, context_t *c) tunnel_t *newReverseServer(node_instance_context_t *instance_info) { (void) instance_info; - reverse_server_state_t *state = wwmGlobalMalloc(sizeof(reverse_server_state_t) + (workers_count * sizeof(thread_box_t))); + reverse_server_state_t *state = + wwmGlobalMalloc(sizeof(reverse_server_state_t) + (workers_count * sizeof(thread_box_t))); memset(state, 0, sizeof(reverse_server_state_t) + (workers_count * sizeof(thread_box_t))); tunnel_t *t = newTunnel(); @@ -346,7 +349,7 @@ api_result_t apiReverseServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyReverseServer(tunnel_t *self) @@ -356,5 +359,5 @@ tunnel_t *destroyReverseServer(tunnel_t *self) } tunnel_metadata_t getMetadataReverseServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = kNodeFlagChainHead}; + return (tunnel_metadata_t) {.version = 0001, .flags = kNodeFlagChainHead}; } diff --git a/tunnels/server/socks/5/socks5_server.c b/tunnels/server/socks/5/socks5_server.c index b2a5dce9..1219d1a9 100644 --- a/tunnels/server/socks/5/socks5_server.c +++ b/tunnels/server/socks/5/socks5_server.c @@ -75,7 +75,6 @@ typedef struct socks5_server_con_state_s bool authenticated; bool established; bool init_sent; - bool first_sent; socks5_state_e state; shift_buffer_t *waitbuf; unsigned int udp_data_offset; @@ -268,10 +267,7 @@ static void upStream(tunnel_t *self, context_t *c) } else if (c->line->dest_ctx.address_protocol == kSapTcp) { - if (! cstate->first_sent) - { - c->first = cstate->first_sent = true; - } + self->up->upStream(self->up, c); } @@ -508,8 +504,6 @@ static void upStream(tunnel_t *self, context_t *c) { context_t *updata = newContextFrom(c); updata->payload = bytes; - updata->first = true; - cstate->first_sent = true; self->up->upStream(self->up, updata); } else diff --git a/tunnels/server/trojan/auth/trojan_auth_server.c b/tunnels/server/trojan/auth/trojan_auth_server.c index 4a5d2f7a..8d5a474a 100644 --- a/tunnels/server/trojan/auth/trojan_auth_server.c +++ b/tunnels/server/trojan/auth/trojan_auth_server.c @@ -29,6 +29,7 @@ typedef struct trojan_auth_server_con_state_s { bool authenticated; bool init_sent; + bool first_packet_received; } trojan_auth_server_con_state_t; @@ -78,11 +79,10 @@ static void upStream(tunnel_t *self, context_t *c) if (cstate->authenticated) { self->up->upStream(self->up, c); - return; } - - if (c->first) + else if (! cstate->first_packet_received) { + cstate->first_packet_received = true; // struct timeval tv1, tv2; // gettimeofday(&tv1, NULL); { @@ -142,17 +142,18 @@ static void upStream(tunnel_t *self, context_t *c) // gettimeofday(&tv2, NULL); // double time_spent = (double)(tv2.tv_usec - tv1.tv_usec) / 1000000 + (double)(tv2.tv_sec - // tv1.tv_sec); LOGD("Auth: took %lf sec", time_spent); - return; } - - goto failed; + else + { + goto failed; + } } else { if (c->init) { - cstate = wwmGlobalMalloc(sizeof(trojan_auth_server_con_state_t)); - memset(cstate, 0, sizeof(trojan_auth_server_con_state_t)); + cstate = wwmGlobalMalloc(sizeof(trojan_auth_server_con_state_t)); + *cstate = (trojan_auth_server_con_state_t) {0}; CSTATE_MUT(c) = cstate; destroyContext(c); } @@ -181,13 +182,13 @@ static void upStream(tunnel_t *self, context_t *c) } return; -failed:; +failed: if (state->fallback != NULL) { goto fallback; } - // disconnect:; + // disconnect: reuseContextPayload(c); wwmGlobalFree(CSTATE(c)); CSTATE_DROP(c); @@ -195,7 +196,7 @@ failed:; destroyContext(c); self->dw->downStream(self->dw, reply); return; -fallback:; +fallback: if (! cstate->init_sent) { cstate->init_sent = true; @@ -228,7 +229,7 @@ static void downStream(tunnel_t *self, context_t *c) self->dw->downStream(self->dw, c); } -static void parse(tunnel_t *t, cJSON *settings,node_instance_context_t *instance_info) +static void parse(tunnel_t *t, cJSON *settings, node_instance_context_t *instance_info) { trojan_auth_server_state_t *state = t->state; if (! (cJSON_IsObject(settings) && settings->child != NULL)) @@ -297,7 +298,7 @@ static void parse(tunnel_t *t, cJSON *settings,node_instance_context_t *instance } hash_t hash_next = CALC_HASH_BYTES(fallback_node_name, strlen(fallback_node_name)); - node_t *fallback_node = getNode(instance_info->node_manager_config,hash_next); + node_t *fallback_node = getNode(instance_info->node_manager_config, hash_next); if (fallback_node == NULL) { LOGF("TrojanAuthServer: fallback node not found"); @@ -305,7 +306,7 @@ static void parse(tunnel_t *t, cJSON *settings,node_instance_context_t *instance } if (fallback_node->instance == NULL) { - runNode(instance_info->node_manager_config,fallback_node, instance_info->chain_index + 1); + runNode(instance_info->node_manager_config, fallback_node, instance_info->chain_index + 1); } state->fallback = fallback_node->instance; @@ -344,7 +345,7 @@ api_result_t apiTrojanAuthServer(tunnel_t *self, const char *msg) (void) (self); (void) (msg); - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyTrojanAuthServer(tunnel_t *self) @@ -355,5 +356,5 @@ tunnel_t *destroyTrojanAuthServer(tunnel_t *self) tunnel_metadata_t getMetadataTrojanAuthServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/trojan/socks/trojan_socks_server.c b/tunnels/server/trojan/socks/trojan_socks_server.c index c584687a..677befcc 100644 --- a/tunnels/server/trojan/socks/trojan_socks_server.c +++ b/tunnels/server/trojan/socks/trojan_socks_server.c @@ -31,10 +31,11 @@ typedef struct trojan_socks_server_state_s typedef struct trojan_socks_server_con_state_s { - bool init_sent; - bool first_sent; buffer_stream_t *udp_stream; + bool init_sent; + bool first_packet_received; + bool udp_logged; } trojan_socks_server_con_state_t; static void cleanup(trojan_socks_server_con_state_t *cstate) @@ -54,7 +55,7 @@ static void encapsulateUdpPacket(context_t *c) writeRaw(c->payload, (unsigned char *) "\r\n", 2); shiftl(c->payload, 2); // LEN - writeUI16(c->payload, htons(packet_len)); + writeUI16(c->payload, htons(packet_len)); uint16_t port = sockaddr_port(&(c->line->dest_ctx.address)); shiftl(c->payload, 2); // port @@ -317,8 +318,9 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, dest_context->address_type = kSatIPV4; memcpy(&(dest_context->address.sin.sin_addr), rawBuf(c->payload), 4); shiftr(c->payload, 4); - if (! cstate->first_sent) + if (! cstate->udp_logged) { + cstate->udp_logged = true; LOGD("TrojanSocksServer: udp ipv4"); } @@ -327,8 +329,9 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, dest_context->address_type = kSatDomainName; // size_t addr_len = (unsigned char)(rawBuf(c->payload)[0]); shiftr(c->payload, 1); - if (! cstate->first_sent) // print once per connection + if (! cstate->udp_logged) { + cstate->udp_logged = true; LOGD("TrojanSocksServer: udp domain %.*s", domain_len, rawBuf(c->payload)); } @@ -341,8 +344,9 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, dest_context->address.sa.sa_family = AF_INET6; memcpy(&(dest_context->address.sin.sin_addr), rawBuf(c->payload), 16); shiftr(c->payload, 16); - if (! cstate->first_sent) + if (! cstate->udp_logged) { + cstate->udp_logged = true; LOGD("TrojanSocksServer: udp ipv6"); } break; @@ -380,11 +384,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, } cstate->init_sent = true; } - if (! cstate->first_sent) - { - c->first = true; - cstate->first_sent = true; - } + self->up->upStream(self->up, c); if (! isAlive(line)) @@ -398,13 +398,14 @@ static void upStream(tunnel_t *self, context_t *c) { if (c->payload != NULL) { + trojan_socks_server_con_state_t *cstate = CSTATE(c); - if (c->first) + if (! cstate->first_packet_received) { + cstate->first_packet_received = true; if (parseAddress(c)) { - trojan_socks_server_con_state_t *cstate = CSTATE(c); - socket_context_t *dest_context = &(c->line->dest_ctx); + socket_context_t *dest_context = &(c->line->dest_ctx); if (dest_context->address_protocol == kSapTcp) { @@ -434,11 +435,6 @@ static void upStream(tunnel_t *self, context_t *c) } if (dest_context->address_protocol == kSapTcp) { - if (! cstate->first_sent) - { - c->first = true; - cstate->first_sent = true; - } self->up->upStream(self->up, c); return; } @@ -474,9 +470,7 @@ static void upStream(tunnel_t *self, context_t *c) } else { - trojan_socks_server_con_state_t *cstate = CSTATE(c); reuseContextPayload(c); - cleanup(cstate); CSTATE_DROP(c); context_t *reply = newFinContextFrom(c); @@ -486,7 +480,6 @@ static void upStream(tunnel_t *self, context_t *c) } else { - trojan_socks_server_con_state_t *cstate = CSTATE(c); if (c->line->dest_ctx.address_protocol == kSapUdp) { @@ -549,10 +542,11 @@ static void upStream(tunnel_t *self, context_t *c) static void downStream(tunnel_t *self, context_t *c) { - trojan_socks_server_con_state_t *cstate = CSTATE(c); if (c->fin) { + trojan_socks_server_con_state_t *cstate = CSTATE(c); + cleanup(cstate); CSTATE_DROP(c); self->dw->downStream(self->dw, c); @@ -582,7 +576,7 @@ api_result_t apiTrojanSocksServer(tunnel_t *self, const char *msg) { (void) self; (void) msg; - return (api_result_t){0}; + return (api_result_t) {0}; } tunnel_t *destroyTrojanSocksServer(tunnel_t *self) @@ -593,5 +587,5 @@ tunnel_t *destroyTrojanSocksServer(tunnel_t *self) tunnel_metadata_t getMetadataTrojanSocksServer(void) { - return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; } diff --git a/tunnels/server/wolfssl/wolfssl_server.c b/tunnels/server/wolfssl/wolfssl_server.c index a93e7e2c..8432f1ad 100644 --- a/tunnels/server/wolfssl/wolfssl_server.c +++ b/tunnels/server/wolfssl/wolfssl_server.c @@ -36,8 +36,6 @@ typedef struct wssl_server_con_state_s bool handshake_completed; bool fallback_mode; bool fallback_init_sent; - bool fallback_first_sent; - bool first_sent; bool init_sent; bool fallback_disabled; buffer_stream_t *fallback_buf; @@ -143,11 +141,7 @@ static void fallbackWrite(tunnel_t *self, context_t *c) { return; } - if (! cstate->fallback_first_sent) - { - c->first = true; - cstate->fallback_first_sent = true; - } + c->payload = bufferStreamIdealRead(cstate->fallback_buf); state->fallback->upStream(state->fallback, c); @@ -301,11 +295,7 @@ static void upStream(tunnel_t *self, context_t *c) setLen(buf, n); context_t *data_ctx = newContextFrom(c); data_ctx->payload = buf; - if (! (cstate->first_sent)) - { - data_ctx->first = true; - cstate->first_sent = true; - } + self->up->upStream(self->up, data_ctx); if (! isAlive(c->line)) { diff --git a/ww/pipe_line.c b/ww/pipe_line.c index d6576eb1..d7cbf517 100644 --- a/ww/pipe_line.c +++ b/ww/pipe_line.c @@ -11,11 +11,10 @@ struct pipe_line_s tunnel_t *self; line_t *left_line; line_t *right_line; - atomic_bool closed; atomic_int refc; - bool first_sent; + atomic_bool closed; uint8_t left_tid; - uint8_t right_tid; + uint8_t right_tid; /* 8-bit pad */ PipeLineFlowRoutine local_up_stream; PipeLineFlowRoutine local_down_stream; @@ -86,7 +85,7 @@ static void onMsgReceived(hevent_t *ev) static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_t tid_from, uint8_t tid_to) { - + if (WW_UNLIKELY(tid_from == tid_to)) { fn(pl, arg); @@ -94,7 +93,7 @@ static void sendMessage(pipe_line_t *pl, MsgTargetFunction fn, void *arg, uint8_ } lock(pl); struct msg_event *evdata = popPoolItem(pipeline_msg_pools[tid_from]); - *evdata = (struct msg_event){.pl = pl, .function = *(void **) (&fn), .arg = arg, .target_tid = tid_to}; + *evdata = (struct msg_event) {.pl = pl, .function = *(void **) (&fn), .arg = arg, .target_tid = tid_to}; hevent_t ev; memset(&ev, 0, sizeof(ev)); @@ -127,11 +126,6 @@ static void writeBufferToRightSide(pipe_line_t *pl, void *arg) } context_t *ctx = newContext(pl->right_line); ctx->payload = buf; - if (WW_UNLIKELY(! pl->first_sent)) - { - pl->first_sent = true; - ctx->first = true; - } pl->local_up_stream(pl->self, ctx, pl); } @@ -376,17 +370,16 @@ void newPipeLine(tunnel_t *self, line_t *left_line, uint8_t dest_tid, PipeLineFl // align pointer to line cache boundary pipe_line_t *pl = (pipe_line_t *) ALIGN2(ptr, kCpuLineCacheSize); // NOLINT - *pl = (pipe_line_t){.memptr = (void *) ptr, - .self = self, - .left_tid = left_line->tid, - .right_tid = dest_tid, - .left_line = left_line, - .right_line = NULL, - .closed = false, - .first_sent = false, - .refc = 1, - .local_up_stream = local_up_stream, - .local_down_stream = local_down_stream}; + *pl = (pipe_line_t) {.memptr = (void *) ptr, + .self = self, + .left_tid = left_line->tid, + .right_tid = dest_tid, + .left_line = left_line, + .right_line = NULL, + .closed = false, + .refc = 1, + .local_up_stream = local_up_stream, + .local_down_stream = local_down_stream}; initLeft(pl, NULL); sendMessage(pl, initRight, NULL, pl->left_tid, pl->right_tid); diff --git a/ww/tunnel.h b/ww/tunnel.h index 53b3f5d9..4a5adeca 100644 --- a/ww/tunnel.h +++ b/ww/tunnel.h @@ -126,13 +126,12 @@ typedef struct line_s the only flag that also has a payload is `first` , other flags have no payload */ -typedef struct context_s // 24 +typedef struct context_s { shift_buffer_t *payload; line_t *line; bool init; bool est; - bool first; bool fin; } context_t;