Skip to content

Commit

Permalink
reformat and remove redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 17, 2024
1 parent 32ea8a3 commit d61bfb3
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 57 deletions.
30 changes: 21 additions & 9 deletions tunnels/adapters/connector/tcp/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,26 @@ static void onClose(hio_t *io)
LOGD("TcpConnector: sent close for FD:%x ", hio_fd(io));
}
}
static void onLinePaused(void *cstate)
static void onLinePaused(void *userdata)
{
hio_read_stop(((tcp_connector_con_state_t *) cstate)->io);
tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *) (userdata);

if (! cstate->read_paused)
{
cstate->read_paused = true;
hio_read_stop(cstate->io);
}
}

static void onLineResumed(void *cstate)
static void onLineResumed(void *userdata)
{
hio_read(((tcp_connector_con_state_t *) cstate)->io);
tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *) (userdata);

if (cstate->read_paused)
{
cstate->read_paused = false;
hio_read(cstate->io);
}
}

static void onOutBoundConnected(hio_t *upstream_io)
Expand Down Expand Up @@ -234,8 +246,8 @@ static void upStream(tunnel_t *self, context_t *c)
{
if (! resolveContextSync(dest_ctx))
{
cleanup(cstate, false);
CSTATE_DROP(c);
cleanup(cstate, false);
goto fail;
}
}
Expand All @@ -245,8 +257,8 @@ static void upStream(tunnel_t *self, context_t *c)
if (sockfd < 0)
{
LOGE("Connector: socket fd < 0");
cleanup(cstate, false);
CSTATE_DROP(c);
cleanup(cstate, false);
goto fail;
}
if (state->tcp_no_delay)
Expand Down Expand Up @@ -277,14 +289,14 @@ static void upStream(tunnel_t *self, context_t *c)
}
else if (c->fin)
{
cleanup(cstate, true);
CSTATE_DROP(c);
cleanup(cstate, true);
destroyContext(c);
}
}
return;
fail:;
self->dw->downStream(self->dw, newFinContext(c->line));
self->dw->downStream(self->dw, newFinContextFrom(c));
destroyContext(c);
}
static void downStream(tunnel_t *self, context_t *c)
Expand Down Expand Up @@ -327,8 +339,8 @@ static void downStream(tunnel_t *self, context_t *c)
}
else if (c->fin)
{
cleanup(cstate, false);
CSTATE_DROP(c);
cleanup(cstate, false);
self->dw->downStream(self->dw, c);
}
}
Expand Down
1 change: 1 addition & 0 deletions tunnels/adapters/connector/tcp/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ typedef struct tcp_connector_con_state_s
context_queue_t *data_queue;
bool write_paused;
bool established;
bool read_paused;
} tcp_connector_con_state_t;
7 changes: 3 additions & 4 deletions tunnels/adapters/connector/udp/udp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf)

context_t *context = newContext(line);
context->payload = payload;

self->downStream(self, context);
}

Expand All @@ -48,8 +47,8 @@ static void upStream(tunnel_t *self, context_t *c)

if (hio_is_closed(cstate->io))
{
cleanup(CSTATE(c));
CSTATE_DROP(c);
cleanup(cstate);
reuseContextBuffer(c);
goto fail;
}
Expand Down Expand Up @@ -82,8 +81,8 @@ static void upStream(tunnel_t *self, context_t *c)
if (sockfd < 0)
{
LOGE("Connector: socket fd < 0");
cleanup(CSTATE(c));
CSTATE_DROP(c);
cleanup(cstate);
goto fail;
}

Expand Down Expand Up @@ -173,8 +172,8 @@ static void downStream(tunnel_t *self, context_t *c)
{
hio_t *io = cstate->io;
hevent_set_userdata(io, NULL);
cleanup(cstate);
CSTATE_DROP(c);
cleanup(cstate);
}
self->dw->downStream(self->dw, c);
}
Expand Down
27 changes: 20 additions & 7 deletions tunnels/adapters/listener/tcp/tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ typedef struct tcp_listener_con_state_s
bool write_paused;
bool established;
bool first_packet_sent;
bool read_paused;
} tcp_listener_con_state_t;

static void cleanup(tcp_listener_con_state_t *cstate, bool write_queue)
Expand Down Expand Up @@ -116,14 +117,26 @@ static void onWriteComplete(hio_t *io)
}
}

static void onLinePaused(void *cstate)
static void onLinePaused(void *userdata)
{
hio_read_stop(((tcp_listener_con_state_t *) cstate)->io);
tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *) (userdata);

if (! cstate->read_paused)
{
cstate->read_paused = true;
hio_read_stop(cstate->io);
}
}

static void onLineResumed(void *cstate)
static void onLineResumed(void *userdata)
{
hio_read(((tcp_listener_con_state_t *) cstate)->io);
tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *) (userdata);

if (cstate->read_paused)
{
cstate->read_paused = false;
hio_read(cstate->io);
}
}

static void upStream(tunnel_t *self, context_t *c)
Expand All @@ -149,9 +162,9 @@ static void upStream(tunnel_t *self, context_t *c)
{
if (c->fin)
{

cleanup(CSTATE(c), false);
tcp_listener_con_state_t *cstate = CSTATE(c);
CSTATE_DROP(c);
cleanup(cstate, false);
}
}

Expand Down Expand Up @@ -196,8 +209,8 @@ static void downStream(tunnel_t *self, context_t *c)
}
if (c->fin)
{
cleanup(cstate, true);
CSTATE_DROP(c);
cleanup(cstate, true);
destroyContext(c);
return;
}
Expand Down
2 changes: 0 additions & 2 deletions tunnels/client/header/header_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ tunnel_t *newHeaderClient(node_instance_context_t *instance_info)
t->upStream = &upStream;
t->downStream = &downStream;



return t;
}

Expand Down
11 changes: 4 additions & 7 deletions tunnels/client/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_
stream->chunkbs = newBufferStream(getLineBufferPool(con->line));
stream->parent = con->line;
stream->line = child_line;
stream->tunnel = con->tunnel->dw;
LSTATE_I_MUT(stream->line, stream->tunnel->chain_index + 1) = stream;
stream->tunnel = con->tunnel;
LSTATE_I_MUT(stream->line, stream->tunnel->chain_index) = stream;
setupLineUpSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed);

addStraem(con, stream);
Expand All @@ -147,11 +147,9 @@ static http2_client_child_con_state_t *createHttp2Stream(http2_client_con_state_
static void deleteHttp2Stream(http2_client_child_con_state_t *stream)
{

LSTATE_I_DROP(stream->line, stream->tunnel->chain_index);
destroyBufferStream(stream->chunkbs);
LSTATE_I_DROP(stream->line, stream->tunnel->chain_index + 1);
doneLineUpSide(stream->line);
resumeLineUpSide(stream->parent);

free(stream);
}

Expand Down Expand Up @@ -211,13 +209,12 @@ static void deleteHttp2Connection(http2_client_con_state_t *con)
context_t *fin_ctx = newFinContext(stream_i->line);
tunnel_t *dest = stream_i->tunnel;
deleteHttp2Stream(stream_i);
CSTATE_DROP(fin_ctx);
dest->downStream(dest, fin_ctx);
stream_i = next;
}
doneLineDownSide(con->line);
nghttp2_session_del(con->session);
LSTATE_DROP(con->line);
nghttp2_session_del(con->session);
destroyContextQueue(con->queue);
destroyLine(con->line);
htimer_del(con->ping_timer);
Expand Down
12 changes: 6 additions & 6 deletions tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->bytes_needed = 0;
context_t *stream_data = newContext(stream->line);
stream_data->payload = gdata_buf;
stream->tunnel->downStream(stream->tunnel, stream_data);
stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);

if (nghttp2_session_get_stream_user_data(session, stream_id))
{
Expand All @@ -252,7 +252,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
writeRaw(buf, data, len);
context_t *stream_data = newContext(stream->line);
stream_data->payload = buf;
stream->tunnel->downStream(stream->tunnel, stream_data);
stream->tunnel->dw->downStream(stream->tunnel->dw, stream_data);
}

return 0;
Expand Down Expand Up @@ -300,13 +300,13 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
{
return 0;
}
resumeLineUpSide(stream->parent);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
context_t *fc = newFinContext(stream->line);
tunnel_t *dest = stream->tunnel;
CSTATE_DROP(fc);
tunnel_t *dest = stream->tunnel->dw;
removeStream(con, stream);
deleteHttp2Stream(stream);
CSTATE_DROP(fc);

dest->downStream(dest, fc);

return 0;
Expand All @@ -322,7 +322,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
{
con->handshake_completed = true;
flushWriteQueue(con);
stream->tunnel->downStream(stream->tunnel, newEstContext(stream->line));
stream->tunnel->dw->downStream(stream->tunnel->dw, newEstContext(stream->line));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions tunnels/client/protobuf/protobuf_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ static void upStream(tunnel_t *self, context_t *c)
else if (c->fin)
{
protobuf_client_con_state_t *cstate = CSTATE(c);
cleanup(cstate);
CSTATE_DROP(c);
cleanup(cstate);
}
}
self->up->upStream(self->up, c);
Expand Down Expand Up @@ -136,15 +136,15 @@ static void downStream(tunnel_t *self, context_t *c)
{
if (c->fin)
{
cleanup(cstate);
CSTATE_DROP(c);
cleanup(cstate);
}
self->dw->downStream(self->dw, c);
}
return;
disconnect:;
cleanup(cstate);
CSTATE_DROP(c);
cleanup(cstate);
self->up->upStream(self->up, newFinContext(c->line));
self->dw->downStream(self->dw, newFinContext(c->line));
destroyContext(c);
Expand Down
20 changes: 9 additions & 11 deletions tunnels/server/http2/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,29 @@ static void onH2LineResumed(void *arg)
}
}

http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_t *con, line_t *this_line,
tunnel_t *target_tun, int32_t stream_id)
http2_server_child_con_state_t *createHttp2Stream(http2_server_con_state_t *con, line_t *this_line, tunnel_t *self,
int32_t stream_id)
{
http2_server_child_con_state_t *stream;
stream = malloc(sizeof(http2_server_child_con_state_t));
memset(stream, 0, sizeof(http2_server_child_con_state_t));

stream->stream_id = stream_id;
stream->chunkbs = newBufferStream(getLineBufferPool(this_line));
stream->parent = this_line;
stream->line = newLine(this_line->tid);
LSTATE_I_MUT(stream->line, target_tun->chain_index - 1) = stream;
stream->tunnel = target_tun;
stream->stream_id = stream_id;
stream->chunkbs = newBufferStream(getLineBufferPool(this_line));
stream->parent = this_line;
stream->line = newLine(this_line->tid);
LSTATE_MUT(stream->line) = stream;
stream->tunnel = self;
nghttp2_session_set_stream_user_data(con->session, stream_id, stream);
setupLineDownSide(stream->line, onStreamLinePaused, stream, onStreamLineResumed);

return stream;
}
static void deleteHttp2Stream(http2_server_child_con_state_t *stream)
{
LSTATE_I_DROP(stream->line, stream->tunnel->chain_index - 1);
LSTATE_I_DROP(stream->line, stream->tunnel->chain_index);
destroyBufferStream(stream->chunkbs);
doneLineDownSide(stream->line);
resumeLineDownSide(stream->parent);
destroyLine(stream->line);
if (stream->request_path)
{
Expand Down Expand Up @@ -147,7 +146,6 @@ static void deleteHttp2Connection(http2_server_con_state_t *con)
tunnel_t *dest = stream_i->tunnel;
http2_server_child_con_state_t *next = stream_i->next;
deleteHttp2Stream(stream_i);
CSTATE_DROP(fin_ctx);
dest->upStream(dest, fin_ctx);
stream_i = next;
}
Expand Down
12 changes: 6 additions & 6 deletions tunnels/server/http2/http2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->first_sent = true;
stream_data->first = true;
}
stream->tunnel->upStream(stream->tunnel, stream_data);
stream->tunnel->up->upStream(stream->tunnel->up, stream_data);

if (nghttp2_session_get_stream_user_data(session, stream_id))
{
Expand All @@ -132,7 +132,7 @@ static int onDataChunkRecvCallback(nghttp2_session *session, uint8_t flags, int3
stream->first_sent = true;
stream_data->first = true;
}
stream->tunnel->upStream(stream->tunnel, stream_data);
stream->tunnel->up->upStream(stream->tunnel->up, stream_data);
}

return 0;
Expand Down Expand Up @@ -183,13 +183,13 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr
{
return 0;
}
resumeLineDownSide(stream->parent);
nghttp2_session_set_stream_user_data(con->session, stream->stream_id, NULL);
context_t *fc = newFinContext(stream->line);
tunnel_t *dest = stream->tunnel;
tunnel_t *dest = stream->tunnel->up;
CSTATE_DROP(fc);
removeStream(con, stream);
deleteHttp2Stream(stream);
CSTATE_DROP(fc);

dest->upStream(dest, fc);
return 0;
}
Expand All @@ -216,7 +216,7 @@ static int onFrameRecvCallback(nghttp2_session *session, const nghttp2_frame *fr

http2_server_child_con_state_t *stream = createHttp2Stream(con, con->line, self->up, frame->hd.stream_id);
addStream(con, stream);
stream->tunnel->upStream(stream->tunnel, newInitContext(stream->line));
stream->tunnel->up->upStream(stream->tunnel->up, newInitContext(stream->line));

return 0;
}
Expand Down
Loading

0 comments on commit d61bfb3

Please sign in to comment.