Skip to content

Commit

Permalink
progress on udp dispatcher / reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Apr 29, 2024
1 parent 226a873 commit 22521b6
Show file tree
Hide file tree
Showing 21 changed files with 991 additions and 210 deletions.
3 changes: 2 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ Checks: >
-readability-braces-around-statements,
-readability-magic-numbers,
-readability-identifier-length,
-misc-no-recursion
-misc-no-recursion,
-misc-include-cleaner
# Turn all the warnings from the checks above into errors.
Expand Down
30 changes: 16 additions & 14 deletions tunnels/adapters/connector/tcp/tcp_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "sync_dns.h"
#include "types.h"
#include "utils/sockutils.h"
#include "utils/jsonutils.h"

static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue)
{
Expand Down Expand Up @@ -54,10 +55,10 @@ static bool resumeWriteQueue(tcp_connector_con_state_t *cstate)
{
context_queue_t *data_queue = (cstate)->data_queue;
context_queue_t *finished_queue = (cstate)->finished_queue;
hio_t * io = cstate->io;
hio_t *io = cstate->io;
while (contextQueueLen(data_queue) > 0)
{
context_t *cw = contextQueuePop(data_queue);
context_t *cw = contextQueuePop(data_queue);
unsigned int bytes = bufLen(cw->payload);
int nwrite = hio_write(io, cw->payload);
cw->payload = NULL;
Expand All @@ -73,7 +74,7 @@ static bool resumeWriteQueue(tcp_connector_con_state_t *cstate)
while (contextQueueLen(finished_queue) > 0)
{
context_t *cw = contextQueuePop(finished_queue);
hio_t * upstream_io = cw->src_io;
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
Expand Down Expand Up @@ -112,7 +113,7 @@ static void onWriteComplete(hio_t *restrict io)
while (contextQueueLen(finished_queue) > 0)
{
context_t *cw = contextQueuePop(finished_queue);
hio_t * upstream_io = cw->src_io;
hio_t *upstream_io = cw->src_io;
if (upstream_io != NULL && (last_resumed_io != upstream_io))
{
last_resumed_io = upstream_io;
Expand All @@ -128,12 +129,12 @@ static void onRecv(hio_t *restrict io, shift_buffer_t *buf)
tcp_connector_con_state_t *cstate = (tcp_connector_con_state_t *) (hevent_userdata(io));
if (cstate == NULL)
{
reuseBuffer(hloop_bufferpool(hevent_loop(io)),buf);
reuseBuffer(hloop_bufferpool(hevent_loop(io)), buf);
return;
}
shift_buffer_t *payload = buf;
tunnel_t * self = (cstate)->tunnel;
line_t * line = (cstate)->line;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;

context_t *context = newContext(line);
context->src_io = io;
Expand All @@ -154,8 +155,8 @@ static void onClose(hio_t *io)
}
if (cstate != NULL)
{
tunnel_t * self = (cstate)->tunnel;
line_t * line = (cstate)->line;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;
context_t *context = newFinContext(line);
self->downStream(self, context);
}
Expand All @@ -178,7 +179,7 @@ static void onOutBoundConnected(hio_t *upstream_io)
#endif

tunnel_t *self = cstate->tunnel;
line_t * line = cstate->line;
line_t *line = cstate->line;
hio_setcb_read(upstream_io, onRecv);

char localaddrstr[SOCKADDR_STRLEN] = {0};
Expand Down Expand Up @@ -249,7 +250,7 @@ static void upStream(tunnel_t *self, context_t *c)

socket_context_t *dest_ctx = &(c->line->dest_ctx);
socket_context_t *src_ctx = &(c->line->src_ctx);
switch (state->dest_addr_selected.status)
switch ((enum tcp_connector_dynamic_value_status) state->dest_addr_selected.status)
{
case kCdvsFromSource:
socketContextAddrCopy(dest_ctx, src_ctx);
Expand All @@ -261,7 +262,7 @@ static void upStream(tunnel_t *self, context_t *c)
case kCdvsFromDest:
break;
}
switch (state->dest_port_selected.status)
switch ((enum tcp_connector_dynamic_value_status) state->dest_port_selected.status)
{
case kCdvsFromSource:
socketContextPortCopy(dest_ctx, src_ctx);
Expand Down Expand Up @@ -314,7 +315,7 @@ static void upStream(tunnel_t *self, context_t *c)
hio_t *upstream_io = hio_get(loop, sockfd);
assert(upstream_io != NULL);

hio_set_peeraddr(upstream_io, &(dest_ctx->address.sa), sockaddr_len(&(dest_ctx->address)));
hio_set_peeraddr(upstream_io, &(dest_ctx->address.sa), (int) sockaddr_len(&(dest_ctx->address)));
cstate->io = upstream_io;
hevent_set_userdata(upstream_io, cstate);

Expand All @@ -327,7 +328,8 @@ static void upStream(tunnel_t *self, context_t *c)
{
hio_t *io = cstate->io;
CSTATE_MUT(c) = NULL;
contextQueueNotifyIoRemoved(c->src_io);
contextQueueNotifyIoRemoved(cstate->data_queue, c->src_io);
contextQueueNotifyIoRemoved(cstate->finished_queue, c->src_io);
cleanup(cstate, true);
destroyContext(c);
hio_close(io);
Expand Down
23 changes: 11 additions & 12 deletions tunnels/adapters/connector/udp/udp_connector.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "loggers/network_logger.h"
#include "sync_dns.h"
#include "types.h"
#include "utils/jsonutils.h"
#include "utils/sockutils.h"

static void cleanup(udp_connector_con_state_t *cstate)
Expand All @@ -13,15 +14,13 @@ static void onRecv(hio_t *io, shift_buffer_t *buf)
udp_connector_con_state_t *cstate = (udp_connector_con_state_t *) (hevent_userdata(io));
if (cstate == NULL)
{
reuseBuffer(hloop_bufferpool(hevent_loop(io)),buf);
reuseBuffer(hloop_bufferpool(hevent_loop(io)), buf);
return;
}
shift_buffer_t *payload = buf;
tunnel_t * self = (cstate)->tunnel;
line_t * line = (cstate)->line;

struct sockaddr *destaddr = hio_peeraddr(io);

shift_buffer_t *payload = buf;
tunnel_t *self = (cstate)->tunnel;
line_t *line = (cstate)->line;
struct sockaddr *destaddr = hio_peeraddr(io);
enum socket_address_type address_type;

if (destaddr->sa_family == AF_INET6)
Expand Down Expand Up @@ -89,7 +88,7 @@ static void upStream(tunnel_t *self, context_t *c)
cstate->tunnel = self;
cstate->line = c->line;
// sockaddr_set_ipport(&(dest->addr),"www.gstatic.com",80);
hloop_t * loop = loops[c->line->tid];
hloop_t *loop = loops[c->line->tid];
sockaddr_u host_addr = {0};
sockaddr_set_ipport(&host_addr, "0.0.0.0", 0);

Expand Down Expand Up @@ -126,7 +125,7 @@ static void upStream(tunnel_t *self, context_t *c)

socket_context_t *dest_ctx = &(c->line->dest_ctx);
socket_context_t *src_ctx = &(c->line->src_ctx);
switch (state->dest_addr_selected.status)
switch ((enum udp_connector_dynamic_value_status) state->dest_addr_selected.status)
{
case kCdvsFromSource:
socketContextAddrCopy(dest_ctx, src_ctx);
Expand All @@ -138,7 +137,7 @@ static void upStream(tunnel_t *self, context_t *c)
case kCdvsFromDest:
break;
}
switch (state->dest_port_selected.status)
switch ((enum udp_connector_dynamic_value_status) state->dest_port_selected.status)
{
case kCdvsFromSource:
socketContextPortCopy(dest_ctx, src_ctx);
Expand All @@ -160,7 +159,7 @@ static void upStream(tunnel_t *self, context_t *c)
goto fail;
}
}
hio_set_peeraddr(cstate->io, &(dest_ctx->address.sa), sockaddr_len(&(dest_ctx->address)));
hio_set_peeraddr(cstate->io, &(dest_ctx->address.sa), (int) sockaddr_len(&(dest_ctx->address)));

destroyContext(c);
}
Expand Down Expand Up @@ -220,7 +219,7 @@ tunnel_t *newUdpConnector(node_instance_context_t *instance_info)
{
state->constant_dest_addr.address_type = getHostAddrType(state->dest_addr_selected.value_ptr);
socketContextDomainSetConstMem(&(state->constant_dest_addr), state->dest_addr_selected.value_ptr,
strlen(state->dest_addr_selected.value_ptr));
strlen(state->dest_addr_selected.value_ptr));
}

state->dest_port_selected =
Expand Down
Loading

0 comments on commit 22521b6

Please sign in to comment.