From 079d506b1aff3079d09fbf3212808e5acd4c8724 Mon Sep 17 00:00:00 2001 From: Radkesvat <134321679+radkesvat@users.noreply.github.com> Date: Thu, 6 Jun 2024 01:37:25 +0000 Subject: [PATCH] [WIP] 30% of halfduplexserver --- tunnels/client/halfduplex/halfduplex_client.c | 7 +- tunnels/server/halfduplex/halfduplex_server.c | 92 +++++++++++++++++-- 2 files changed, 86 insertions(+), 13 deletions(-) diff --git a/tunnels/client/halfduplex/halfduplex_client.c b/tunnels/client/halfduplex/halfduplex_client.c index 988f434e..7f7152ac 100644 --- a/tunnels/client/halfduplex/halfduplex_client.c +++ b/tunnels/client/halfduplex/halfduplex_client.c @@ -67,13 +67,14 @@ static void upStream(tunnel_t *self, context_t *c) if (c->first) { // 63 bits of random is enough and is better than hashing sender addr on halfduplex server, i believe so... - uint32_t cids[2] = {fastRand(), fastRand()}; + uint32_t cids[2] = {fastRand(), fastRand()}; + uint8_t *cid_bytes = (uint8_t *) &cids[0]; context_t *intro_context = newContext(cstate->download_line); intro_context->first = true; intro_context->payload = popBuffer(getContextBufferPool(c)); - cids[0] = htonl(cids[0] | (1 << 31)); // kHLFDCmdDownload + cid_bytes[0] = cid_bytes[0] | (1 << 7); // kHLFDCmdDownload shiftl(intro_context->payload, 16); writeRaw(intro_context->payload, &cids[0], sizeof(cids)); @@ -89,7 +90,7 @@ static void upStream(tunnel_t *self, context_t *c) return; } - cids[0] = htonl(cids[0] & 0x7FFFFFFF); // kHLFDCmdUpload + cid_bytes[0] = cid_bytes[0] & 0x7f; // kHLFDCmdDownload shiftl(intro_context->payload, 16); writeRaw(intro_context->payload, &cids[0], sizeof(cids)); diff --git a/tunnels/server/halfduplex/halfduplex_server.c b/tunnels/server/halfduplex/halfduplex_server.c index afce1741..140ac02e 100644 --- a/tunnels/server/halfduplex/halfduplex_server.c +++ b/tunnels/server/halfduplex/halfduplex_server.c @@ -1,11 +1,18 @@ #include "halfduplex_server.h" +#include "basic_types.h" #include "hmutex.h" #include "loggers/network_logger.h" +#include "pipe_line.h" +#include "shiftbuffer.h" +#include "tunnel.h" #include "utils/jsonutils.h" +#include +#include +#include -#define i_type hmap_lines_t // NOLINT -#define i_key hash_t // NOLINT -#define i_val line_t * // NOLINT +#define i_type hmap_lines_t // NOLINT +#define i_key hash_t // NOLINT +#define i_val halfduplex_server_con_state_t * // NOLINT enum { @@ -17,16 +24,23 @@ enum typedef struct halfduplex_server_state_s { hhybridmutex_t hmap_mutex; - hmap_lines_t line_map; + + hmap_lines_t upload_line_map; + hmap_lines_t download_line_map; } halfduplex_server_state_t; typedef struct halfduplex_server_con_state_s { + atomic_bool ready; + + shift_buffer_t *buffering; + pipe_line_t *_Atomic pipe; + line_t *main_line; line_t *upload_line; line_t *download_line; - + } halfduplex_server_con_state_t; static void upStream(tunnel_t *self, context_t *c) @@ -35,11 +49,67 @@ static void upStream(tunnel_t *self, context_t *c) halfduplex_server_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) { + // todo (buffering) do the buffering + assert(bufLen(c->payload) >= 8); + shift_buffer_t *buf = c->payload; + + bool ready = atomic_load_explicit(&(cstate->ready), memory_order_acquire); + + if (ready) + { + } + else + { + pipe_line_t *pipe = atomic_load_explicit(&(cstate->pipe), memory_order_relaxed); + if (pipe) + { + } + else + { + + const bool is_upload = (((uint8_t *) rawBuf(c->payload))[0] & 0x80) == 0x0; + + hash_t hash = 0x0; + readUI64(c->payload, (uint64_t *) &hash); + hash = hash & (0x7FFFFFFFFFFFFFFFULL); + + shiftr(buf, sizeof(uint64_t)); + + if (is_upload) + { + hhybridmutex_lock(&(state->hmap_mutex)); + hmap_lines_t_iter f_iter = hmap_lines_t_find(&(state->download_line_map), hash); + if (f_iter.ref == hmap_lines_t_end(&(state->download_line_map)).ref) + { + if (! hmap_lines_t_push(&(state->download_line_map), cstate)) + { + reuseContextBuffer(c); + destroyContext(c); + } + } + else + { + } + + hhybridmutex_unlock(&(state->hmap_mutex)); + } + else + { + } + } + } } else { if (c->init) { + cstate = malloc(sizeof(halfduplex_server_con_state_t)); + *cstate = (halfduplex_server_con_state_t){ + .buffering = NULL, .pipe = NULL, .main_line = NULL, .upload_line = NULL, .download_line = NULL}; + + CSTATE_MUT(c) = cstate; + self->dw->downStream(self->dw, newEstContext(c->line)); + destroyContext(c); } else if (c->fin) { @@ -54,6 +124,7 @@ static void downStream(tunnel_t *self, context_t *c) halfduplex_server_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) { + self->dw->downStream(self->dw, c); } else { @@ -66,12 +137,13 @@ static void downStream(tunnel_t *self, context_t *c) } } -tunnel_t *newHeaderServer(node_instance_context_t *instance_info) +tunnel_t *newHalfDuplexServer(node_instance_context_t *instance_info) { halfduplex_server_state_t *state = malloc(sizeof(halfduplex_server_state_t)); memset(state, 0, sizeof(halfduplex_server_state_t)); - const cJSON *settings = instance_info->node_settings_json; + + hhybridmutex_init(&state->hmap_mutex); tunnel_t *t = newTunnel(); t->state = state; @@ -81,19 +153,19 @@ tunnel_t *newHeaderServer(node_instance_context_t *instance_info) return t; } -api_result_t apiHeaderServer(tunnel_t *self, const char *msg) +api_result_t apiHalfDuplexServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); return (api_result_t){0}; } -tunnel_t *destroyHeaderServer(tunnel_t *self) +tunnel_t *destroyHalfDuplexServer(tunnel_t *self) { (void) (self); return NULL; } -tunnel_metadata_t getMetadataHeaderServer(void) +tunnel_metadata_t getMetadataHalfDuplexServer(void) { return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; }