Skip to content

Commit

Permalink
almost finish standalone mux...
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jul 23, 2024
1 parent e5a8da9 commit fd1e4e6
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 88 deletions.
19 changes: 13 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ option(INCLUDE_OPENSSL_SERVER "link OpenSSlServer staticly to the core" TRUE)
option(INCLUDE_OPENSSL_CLIENT "link OpenSSLClient staticly to the core" TRUE)
option(INCLUDE_TROJAN_AUTH_SERVER "link TrojanAuthServer staticly to the core" TRUE)
option(INCLUDE_TROJAN_SOCKS_SERVER "link TrojanSocksServer staticly to the core" TRUE)
option(INCLUDE_WOLFSSL_SERVER "link WolfSSLServer staticly to the core" TRUE) # temporarely disabled for compile speed
option(INCLUDE_WOLFSSL_CLIENT "link WolfSSLClient staticly to the core" TRUE) # temporarely disabled for compile speed
option(INCLUDE_WOLFSSL_SERVER "link WolfSSLServer staticly to the core" TRUE) # this downloads 750 mb of data!
option(INCLUDE_WOLFSSL_CLIENT "link WolfSSLClient staticly to the core" TRUE) # this downloads 750 mb of data!
option(INCLUDE_HTTP2_SERVER "link Http2Server staticly to the core" TRUE)
option(INCLUDE_HTTP2_CLIENT "link Http2Client staticly to the core" TRUE)
option(INCLUDE_PROTOBUF_SERVER "link ProtoBufServer staticly to the core" TRUE)
Expand All @@ -73,6 +73,7 @@ option(INCLUDE_HALFDUPLEX_SERVER "link HalfDuplexServer staticly to the core" T
option(INCLUDE_HALFDUPLEX_CLIENT "link HalfDuplexClient staticly to the core" TRUE)
option(INCLUDE_BGP4_SERVER "link Bgp4Server staticly to the core" TRUE)
option(INCLUDE_BGP4_CLIENT "link Bgp4Client staticly to the core" TRUE)
option(INCLUDE_MUX_SERVER "link MuxServer staticly to the core" TRUE)
option(INCLUDE_MUX_CLIENT "link MuxClient staticly to the core" TRUE)

set(OPENSSL_CONFIGURE_VERBOSE ON)
Expand Down Expand Up @@ -356,7 +357,16 @@ target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/cli
target_link_libraries(Waterwall Bgp4Client)
endif()

#Mux client
#mux server
if (INCLUDE_MUX_SERVER)
target_compile_definitions(Waterwall PUBLIC INCLUDE_MUX_SERVER=1)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/server/mux)
target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/server/mux)
target_link_libraries(Waterwall MuxServer)
endif()


#mux client
if (INCLUDE_MUX_CLIENT)
target_compile_definitions(Waterwall PUBLIC INCLUDE_MUX_CLIENT=1)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/client/mux)
Expand All @@ -367,9 +377,6 @@ endif()






#------------------------------------------------------------------------------------------
if (BUILD_OPENSSL_GLOBALS)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/shared/openssl)
Expand Down
158 changes: 106 additions & 52 deletions tunnels/client/mux/mux_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ typedef struct mux_client_state_s
enum concurrency_mode mode;
uint32_t connection_cunc_duration;
uint32_t connection_cunc_capacity;
uint32_t width;
thread_connection_pool_t threadlocal_cons[];

} mux_client_state_t;
Expand Down Expand Up @@ -109,6 +110,11 @@ static void destroyChildConnecton(mux_client_child_con_state_t *child)
{
child->next->prev = child->prev;
}
else
{
mux_client_con_state_t *parent = LSTATE(child->parent);
parent->children_root.prev = NULL;
}
doneLineUpSide(child->line);
LSTATE_DROP(child->line);
globalFree(child);
Expand All @@ -126,13 +132,17 @@ static mux_client_child_con_state_t *createChildConnection(mux_client_con_state_

};

if (parent->children_root.next == NULL)
{
parent->children_root.prev = child;
}
parent->children_root.next = child;

if (child->next)
{
child->next->prev = child;
}
setupLineDownSide(child->line, onChildLinePaused, child, onChildLineResumed);
setupLineUpSide(child->line, onChildLinePaused, child, onChildLineResumed);

return child;
}
Expand Down Expand Up @@ -166,8 +176,8 @@ static mux_client_con_state_t *createMainConnection(tunnel_t *self, tid_t tid)
*con = (mux_client_con_state_t) {.tunnel = self,
.line = newLine(tid),
.children_root = {0},
.creation_epoch = hloop_now(WORKERS[tid].loop),
.read_stream = newBufferStream(WORKERS[tid].buffer_pool)};
.creation_epoch = hloop_now(getThreadLoop(tid)),
.read_stream = newBufferStream(getThreadBufferPool(tid))};

setupLineDownSide(con->line, onMainLinePaused, con, onMainLineResumed);

Expand All @@ -180,52 +190,59 @@ static mux_client_con_state_t *grabConnection(tunnel_t *self, tid_t tid)
mux_client_state_t *state = TSTATE(self);
vec_cons *vector = &(state->threadlocal_cons[tid].cons);

if (vec_cons_size(vector) > 0)
while (true)
{
c_foreach(k, vec_cons, *vector)
unsigned int i = state->threadlocal_cons[tid].round_index;
state->threadlocal_cons[tid].round_index++;
if (state->threadlocal_cons[tid].round_index > state->width - 1)
{
switch (state->mode)
{
default:
case kCuncurrencyModeCounter:
if ((*k.ref)->contained < state->connection_cunc_capacity)
{
mux_client_con_state_t *con = (*k.ref);
con->contained += 1;
if (con->contained >= state->connection_cunc_capacity)
{
vec_cons_erase_at(vector, k);
}
return con;
}
else
{
vec_cons_erase_at(vector, k);
return grabConnection(self, tid);
}
state->threadlocal_cons[tid].round_index = 0;
}

break;
if (vec_cons_size(vector) <= i)
{
mux_client_con_state_t *con = createMainConnection(self, tid);
vec_cons_push(vector, con);
return con;
}
struct mux_client_con_state_s *con = *vec_cons_at(vector, i);

case kCuncurrencyModeTimer:
if ((*k.ref)->creation_epoch < hloop_now(WORKERS[tid].loop) + state->connection_cunc_duration)
{
return (*k.ref);
break;
}
else
switch (state->mode)
{
default:
case kCuncurrencyModeCounter:
if (con->contained < state->connection_cunc_capacity)
{
con->contained += 1;
if (con->contained >= state->connection_cunc_capacity)
{
vec_cons_erase_at(vector, k);
return grabConnection(self, tid);
vec_cons_erase_n(vector, i, 1);
}
return con;
}
else
{
vec_cons_erase_n(vector, i, 1);
return grabConnection(self, tid);
}

break;

case kCuncurrencyModeTimer:
if (con->creation_epoch < hloop_now(WORKERS[tid].loop) + state->connection_cunc_duration)
{
return con;
break;
}
else
{
vec_cons_erase_n(vector, i, 1);
return grabConnection(self, tid);
}

break;
}
}

mux_client_con_state_t *con = createMainConnection(self, tid);
vec_cons_push(vector, con);
return con;
}

static bool shouldClose(tunnel_t *self, mux_client_con_state_t *main_con)
Expand Down Expand Up @@ -270,28 +287,47 @@ static void upStream(tunnel_t *self, context_t *c)
mux_client_child_con_state_t *child_con = CSTATE(c);
if (c->payload != NULL)
{
line_t* current_writing_line = c->line;
line_t* main_line = child_con->parent;
line_t *current_writing_line = c->line;
line_t *main_line = child_con->parent;

switchLine(c, main_line);
mux_client_con_state_t *main_con = CSTATE(c);

makeDataFrame(c->payload, child_con->cid);
main_con->current_writing_line = current_writing_line;

while (bufLen(c->payload) > kMuxMaxFrameLength)
{
shift_buffer_t *chunk = popBuffer(getContextBufferPool(c));
sliceBufferTo(chunk, c->payload, kMuxMaxFrameLength);
makeDataFrame(chunk, child_con->cid);

context_t *data_chunk_ctx = newContextFrom(c);
data_chunk_ctx->payload = chunk;
self->up->upStream(self->up, data_chunk_ctx);

if (! isAlive(main_line))
{
reuseContextPayload(c);
destroyContext(c);

return;
}
}

lockLine(main_line);
lockLine(current_writing_line);

main_con->current_writing_line = current_writing_line;
makeDataFrame(c->payload, child_con->cid);

self->up->upStream(self->up, c);

if(isAlive(main_line)){
if (isAlive(main_line))
{
main_con->current_writing_line = NULL;
}

unLockLine(main_line);
unLockLine(current_writing_line);

}
else
{
Expand Down Expand Up @@ -322,8 +358,16 @@ static void upStream(tunnel_t *self, context_t *c)
data_fin_ctx->payload = popBuffer(getLineBufferPool(child_con->parent));
makeCloseFrame(data_fin_ctx->payload, child_con->cid);
destroyChildConnecton(child_con);
lockLine(main_con->line);
self->up->upStream(self->up, data_fin_ctx);

if (! isAlive(main_con->line))
{
unLockLine(main_con->line);
return;
}
unLockLine(main_con->line);

if (shouldClose(self, main_con))
{
context_t *main_con_fin_ctx = newFinContext(main_con->line);
Expand All @@ -350,11 +394,11 @@ static void downStream(tunnel_t *self, context_t *c)
bufferStreamPushContextPayload(main_con->read_stream, c);
while (bufferStreamLen(main_con->read_stream) > sizeof(mux_frame_t))
{
uint16_t length;
mux_length_t length;
bufferStreamViewBytesAt(main_con->read_stream, 0, (uint8_t *) &length, 2);
if (WW_UNLIKELY(length < 1))
if (WW_UNLIKELY(length < kMuxMinFrameLength))
{
LOGE("MuxClient: length < 1");
LOGE("MuxClient: payload length < kMuxMinFrameLength");
destroyMainConnecton(main_con);
destroyContext(c);
return;
Expand All @@ -373,24 +417,32 @@ static void downStream(tunnel_t *self, context_t *c)
{
if (child_con_i->cid == frame.cid)
{
tunnel_t *dest = main_con->tunnel->dw;

switch (frame.flags)
{
case kMuxFlagClose: {
reuseBuffer(getLineBufferPool(c->line), frame_payload);
context_t *fin_ctx = newFinContext(child_con_i->line);
destroyChildConnecton(child_con_i);
dest->downStream(dest, fin_ctx);
self->dw->downStream(self->dw, fin_ctx);
frame_payload = NULL;
}

break;

case kMuxFlagData: {

if (WW_UNLIKELY(bufLen(frame_payload) <= 0))
{
LOGE("MuxClient: payload length < 0");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
destroyContext(c);
return;
}
context_t *data_ctx = newContext(child_con_i->line);
data_ctx->payload = frame_payload;
dest->downStream(dest, data_ctx);
self->dw->downStream(self->dw, data_ctx);
frame_payload = NULL;
}

Expand All @@ -401,6 +453,7 @@ static void downStream(tunnel_t *self, context_t *c)
case kMuxFlagOpen:
default:
LOGE("MuxClient: incorrect frame flag");
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
destroyMainConnecton(main_con);
destroyContext(c);
return;
Expand All @@ -412,7 +465,7 @@ static void downStream(tunnel_t *self, context_t *c)
}
if (frame_payload != NULL)
{
LOGE("MuxClient: a payload could not find consumer cid: %d", (int) frame.cid);
LOGW("MuxClient: a frame could not find consumer cid: %d", (int) frame.cid);
reuseBuffer(getLineBufferPool(main_con->line), frame_payload);
}
else if (! isAlive(c->line))
Expand Down Expand Up @@ -457,6 +510,7 @@ tunnel_t *destroyMuxClient(tunnel_t *self)
(void) (self);
return NULL;
}

tunnel_metadata_t getMetadataMuxClient(void)
{
return (tunnel_metadata_t) {.version = 0001, .flags = 0x0};
Expand Down
16 changes: 16 additions & 0 deletions tunnels/server/mux/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

add_library(MuxServer STATIC
mux_server.c

)

#ww api
target_include_directories(MuxServer PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../ww)
target_link_libraries(MuxServer PUBLIC ww)

# add dependencies
include(${CMAKE_BINARY_DIR}/cmake/CPM.cmake)

target_include_directories(MuxServer PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../shared/mux)

target_compile_definitions(MuxServer PRIVATE MuxServer_VERSION=0.1)
Loading

0 comments on commit fd1e4e6

Please sign in to comment.