Skip to content

Commit

Permalink
api chane: remove the "first" flag from context struct
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 12, 2024
1 parent 32368fe commit dc9f7e5
Show file tree
Hide file tree
Showing 31 changed files with 206 additions and 304 deletions.
34 changes: 15 additions & 19 deletions tunnels/adapters/listener/tcp/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,16 @@ static void onLineResumed(void *userdata)

static void upStream(tunnel_t *self, context_t *c)
{
#ifdef PROFILE
if (c->payload != NULL)
{
#ifdef PROFILE
if (c->first)
tcp_listener_con_state_t *cstate = CSTATE(c);

bool *first_packet_sent = &((cstate)->first_packet_sent);

if (! (*first_packet_sent))
{
*first_packet_sent = true;
struct timeval tv1, tv2;
gettimeofday(&tv1, NULL);
{
Expand All @@ -161,16 +166,13 @@ static void upStream(tunnel_t *self, context_t *c)
LOGD("TcpListener: upstream took %d ms", (int) (time_spent * 1000));
return;
}
#endif
}
else
#endif
if (c->fin)
{
if (c->fin)
{
tcp_listener_con_state_t *cstate = CSTATE(c);
CSTATE_DROP(c);
cleanup(cstate, false);
}
tcp_listener_con_state_t *cstate = CSTATE(c);
CSTATE_DROP(c);
cleanup(cstate, false);
}

self->up->upStream(self->up, c);
Expand Down Expand Up @@ -231,18 +233,12 @@ static void onRecv(hio_t *io, shift_buffer_t *buf)
reuseBuffer(hloop_bufferpool(hevent_loop(io)), buf);
return;
}
shift_buffer_t *payload = buf;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;
bool *first_packet_sent = &((cstate)->first_packet_sent);
shift_buffer_t *payload = buf;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;

context_t *context = newContext(line);
context->payload = payload;
if (! (*first_packet_sent))
{
*first_packet_sent = true;
context->first = true;
}

self->upStream(self, context);
}
Expand Down
29 changes: 16 additions & 13 deletions tunnels/adapters/listener/udp/udp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static void cleanup(udp_listener_con_state_t *cstate)
// sounds impossible...
LOGE("Checkpoint udp listener");
// this prevent double free
*cstate = (udp_listener_con_state_t){0};
*cstate = (udp_listener_con_state_t) {0};
}
}
else
Expand All @@ -66,8 +66,11 @@ static void upStream(tunnel_t *self, context_t *c)
if (c->payload != NULL)
{
#ifdef PROFILE
if (c->first)
udp_listener_con_state_t *cstate = CSTATE(c);
bool *first_packet_sent = &((cstate)->first_packet_sent);
if (! (*first_packet_sent))
{
*first_packet_sent = true;
struct timeval tv1, tv2;
gettimeofday(&tv1, NULL);
{
Expand Down Expand Up @@ -101,7 +104,7 @@ static void downStream(tunnel_t *self, context_t *c)

if (c->payload != NULL)
{
postUdpWrite(cstate->uio,c->line->tid, c->payload);
postUdpWrite(cstate->uio, c->line->tid, c->payload);
dropContexPayload(c);
destroyContext(c);
}
Expand Down Expand Up @@ -151,13 +154,13 @@ static udp_listener_con_state_t *newConnection(uint8_t tid, tunnel_t *self, udps
line->src_ctx.address_protocol = kSapUdp;
line->src_ctx.address = *(sockaddr_u *) hio_peeraddr(uio->io);

*cstate = (udp_listener_con_state_t){.loop = loops[tid],
.line = line,
.buffer_pool = getThreadBufferPool(tid),
.uio = uio,
.tunnel = self,
.established = false,
.first_packet_sent = false};
*cstate = (udp_listener_con_state_t) {.loop = loops[tid],
.line = line,
.buffer_pool = getThreadBufferPool(tid),
.uio = uio,
.tunnel = self,
.established = false,
.first_packet_sent = false};

sockaddr_set_port(&(line->src_ctx.address), real_localport);

Expand Down Expand Up @@ -294,7 +297,7 @@ tunnel_t *newUdpListener(node_instance_context_t *instance_info)
}
socket_filter_option_t filter_opt = {0};

getStringFromJsonObject(&(filter_opt.balance_group_name),settings, "balance-group");
getStringFromJsonObject(&(filter_opt.balance_group_name), settings, "balance-group");
getIntFromJsonObject((int *) &(filter_opt.balance_group_interval), settings, "balance-interval");

filter_opt.multiport_backend = kMultiportBackendNothing;
Expand Down Expand Up @@ -362,7 +365,7 @@ api_result_t apiUdpListener(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
return (api_result_t) {0};
}

tunnel_t *destroyUdpListener(tunnel_t *self)
Expand All @@ -372,5 +375,5 @@ tunnel_t *destroyUdpListener(tunnel_t *self)
}
tunnel_metadata_t getMetadataUdpListener(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = kNodeFlagChainHead};
return (tunnel_metadata_t) {.version = 0001, .flags = kNodeFlagChainHead};
}
23 changes: 14 additions & 9 deletions tunnels/client/bgp4/bgp4_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,23 @@ typedef struct bgp4_client_state_s
typedef struct bgp4_client_con_state_s
{
buffer_stream_t *read_stream;
bool first_packet_sent;

} bgp4_client_con_state_t;

static void upStream(tunnel_t *self, context_t *c)
{
bgp4_client_state_t *state = TSTATE(self);
bgp4_client_state_t *state = TSTATE(self);
bgp4_client_con_state_t *cstate = CSTATE(c);

if (c->payload != NULL)
{
if (c->first)
uint8_t bgp_type = 2 + (fastRand() % kBgpTypes - 1);

if (! cstate->first_packet_sent)
{
cstate->first_packet_sent = true;

uint32_t additions = 3 + fastRand() % 8;

shiftl(c->payload, kBgpOpenPacketHeaderSize + additions);
Expand All @@ -66,9 +72,9 @@ static void upStream(tunnel_t *self, context_t *c)
{
header[1 + 2 + 2 + 4 + 1 + i] = fastRand() % 200;
}
}

uint8_t bgp_type = c->first ? 1 : 2 + (fastRand() % kBgpTypes - 1);
bgp_type = 1; // BGP Open
}

shiftl(c->payload, 1); // type
writeUI8(c->payload, bgp_type);
Expand All @@ -84,12 +90,11 @@ static void upStream(tunnel_t *self, context_t *c)
}
else
{
bgp4_client_con_state_t *cstate = CSTATE(c);

if (c->init)
{
cstate = wwmGlobalMalloc(sizeof(bgp4_client_con_state_t));
*cstate = (bgp4_client_con_state_t){.read_stream = newBufferStream(getContextBufferPool(c))};
*cstate = (bgp4_client_con_state_t) {.read_stream = newBufferStream(getContextBufferPool(c))};
CSTATE_MUT(c) = cstate;
}
else if (c->fin)
Expand Down Expand Up @@ -166,7 +171,7 @@ static void downStream(tunnel_t *self, context_t *c)
wwmGlobalFree(cstate);
CSTATE_DROP(c);
}

self->dw->downStream(self->dw, c);
return;

Expand Down Expand Up @@ -207,7 +212,7 @@ api_result_t apiBgp4Client(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
return (api_result_t) {0};
}

tunnel_t *destroyBgp4Client(tunnel_t *self)
Expand All @@ -217,5 +222,5 @@ tunnel_t *destroyBgp4Client(tunnel_t *self)
}
tunnel_metadata_t getMetadataBgp4Client(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
return (tunnel_metadata_t) {.version = 0001, .flags = 0x0};
}
7 changes: 4 additions & 3 deletions tunnels/client/halfduplex/halfduplex_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ typedef struct halfduplex_con_state_s
line_t *main_line;
line_t *upload_line;
line_t *download_line;

bool first_packet_sent;
} halfduplex_con_state_t;

static void onMainLinePaused(void *cstate)
Expand Down Expand Up @@ -43,16 +43,17 @@ static void onUDLineResumed(void *cstate)
static void upStream(tunnel_t *self, context_t *c)
{
halfduplex_con_state_t *cstate = CSTATE(c);

if (c->payload != NULL)
{
if (c->first)
if (!cstate->first_packet_sent)
{
cstate->first_packet_sent = true;
// 63 bits of random is enough and is better than hashing sender addr on halfduplex server, i believe so...
uint32_t cids[2] = {fastRand(), fastRand()};
uint8_t *cid_bytes = (uint8_t *) &(cids[0]);

context_t *intro_context = newContext(cstate->download_line);
intro_context->first = true;
intro_context->payload = popBuffer(getContextBufferPool(c));

cid_bytes[0] = cid_bytes[0] | (1 << 7); // kHLFDCmdDownload
Expand Down
30 changes: 24 additions & 6 deletions tunnels/client/header/header_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,27 @@ typedef struct header_client_state_s

typedef struct header_client_con_state_s
{
void *_;
bool first_packet_received;
} header_client_con_state_t;

static void upStream(tunnel_t *self, context_t *c)
{
header_client_state_t *state = TSTATE(self);

if (c->first && c->payload != NULL)
header_client_state_t *state = TSTATE(self);
header_client_con_state_t *cstate = CSTATE(c);
if (c->init)
{
cstate = wwmGlobalMalloc(sizeof(header_client_con_state_t));
*cstate = (header_client_con_state_t) {0};
CSTATE_MUT(c) = cstate;
}
else if (c->fin)
{
wwmGlobalFree(cstate);
CSTATE_DROP(c);
}
else if (! cstate->first_packet_received && c->payload != NULL)
{
cstate->first_packet_received = true;

switch ((enum header_dynamic_value_status) state->data.status)
{
Expand All @@ -47,6 +59,12 @@ static void upStream(tunnel_t *self, context_t *c)
static void downStream(tunnel_t *self, context_t *c)
{

if (c->fin)
{
wwmGlobalFree( CSTATE(c));
CSTATE_DROP(c);
}

self->dw->downStream(self->dw, c);
}

Expand All @@ -71,7 +89,7 @@ api_result_t apiHeaderClient(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
return (api_result_t) {0};
}

tunnel_t *destroyHeaderClient(tunnel_t *self)
Expand All @@ -81,5 +99,5 @@ tunnel_t *destroyHeaderClient(tunnel_t *self)
}
tunnel_metadata_t getMetadataHeaderClient(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
return (tunnel_metadata_t) {.version = 0001, .flags = 0x0};
}
6 changes: 0 additions & 6 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,9 @@ static void onPingTimer(htimer_t *timer)
writeRaw(send_buf, data, len);
context_t *req = newContext(h2line);
req->payload = send_buf;
if (! con->first_sent)
{
con->first_sent = true;
req->first = true;
}
con->tunnel->up->upStream(con->tunnel->up, req);
if (! isAlive(h2line))
{

unLockLine(h2line);
return;
}
Expand Down
11 changes: 5 additions & 6 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ static int onStreamClosedCallback(nghttp2_session *session, int32_t stream_id, u

http2_client_con_state_t *con = (http2_client_con_state_t *) userdata;
http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, stream_id);
LOGD("callback end stream for: %d", stream_id);
// LOGD("callback end stream for: %d", stream_id);
if (! stream)
{
return 0;
Expand Down Expand Up @@ -159,7 +159,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
if (stream->bytes_sent_nack >= kMaxSendBeforeAck)
{
stream->bytes_sent_nack -= consumed;
LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack);
// LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack);

if (stream->bytes_sent_nack < kMaxSendBeforeAck)
{
Expand All @@ -169,7 +169,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
else
{
stream->bytes_sent_nack -= consumed;
LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack);
// LOGD("consumed: %d left: %d", consumed, (int) stream->bytes_sent_nack);
}
}
}
Expand All @@ -187,7 +187,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
{
if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == NGHTTP2_FLAG_END_STREAM)
{
LOGD("end stream for: %d", frame->hd.stream_id);
// LOGD("end stream for: %d", frame->hd.stream_id);

http2_client_child_con_state_t *stream = nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (WW_UNLIKELY(! stream))
Expand Down Expand Up @@ -478,7 +478,6 @@ static void upStream(tunnel_t *self, context_t *c)
http2_client_con_state_t *con = LSTATE(stream->parent);
CSTATE_DROP(c);

// LOGE("closed %d",stream->stream_id);
int flags = NGHTTP2_FLAG_END_STREAM | NGHTTP2_FLAG_END_HEADERS;
if (con->content_type == kApplicationGrpc)
{
Expand All @@ -489,7 +488,7 @@ static void upStream(tunnel_t *self, context_t *c)
{
nghttp2_submit_headers(con->session, flags, stream->stream_id, NULL, NULL, 0, NULL);
}
LOGD("destroy %d", (int) stream->stream_id);
// LOGD("closing -> %d", (int) stream->stream_id);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
removeStream(con, stream);
deleteHttp2Stream(stream);
Expand Down
1 change: 0 additions & 1 deletion tunnels/client/http2/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ typedef struct http2_client_con_state_s
int host_port;
bool handshake_completed;
bool init_sent;
bool first_sent;
bool no_ping_ack;

} http2_client_con_state_t;
Expand Down
1 change: 0 additions & 1 deletion tunnels/client/openssl/openssl_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ static void upStream(tunnel_t *self, context_t *c)
{
setLen(buf, n);
client_hello_ctx->payload = buf;
client_hello_ctx->first = true;
self->up->upStream(self->up, client_hello_ctx);
}
else if (! BIO_should_retry(cstate->rbio))
Expand Down
Loading

0 comments on commit dc9f7e5

Please sign in to comment.