Skip to content

Commit

Permalink
[WIP] 30% of halfduplexserver
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 6, 2024
1 parent ec94824 commit 079d506
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 13 deletions.
7 changes: 4 additions & 3 deletions tunnels/client/halfduplex/halfduplex_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ static void upStream(tunnel_t *self, context_t *c)
if (c->first)
{
// 63 bits of random is enough and is better than hashing sender addr on halfduplex server, i believe so...
uint32_t cids[2] = {fastRand(), fastRand()};
uint32_t cids[2] = {fastRand(), fastRand()};
uint8_t *cid_bytes = (uint8_t *) &cids[0];

context_t *intro_context = newContext(cstate->download_line);
intro_context->first = true;
intro_context->payload = popBuffer(getContextBufferPool(c));

cids[0] = htonl(cids[0] | (1 << 31)); // kHLFDCmdDownload
cid_bytes[0] = cid_bytes[0] | (1 << 7); // kHLFDCmdDownload
shiftl(intro_context->payload, 16);
writeRaw(intro_context->payload, &cids[0], sizeof(cids));

Expand All @@ -89,7 +90,7 @@ static void upStream(tunnel_t *self, context_t *c)
return;
}

cids[0] = htonl(cids[0] & 0x7FFFFFFF); // kHLFDCmdUpload
cid_bytes[0] = cid_bytes[0] & 0x7f; // kHLFDCmdDownload
shiftl(intro_context->payload, 16);
writeRaw(intro_context->payload, &cids[0], sizeof(cids));

Expand Down
92 changes: 82 additions & 10 deletions tunnels/server/halfduplex/halfduplex_server.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
#include "halfduplex_server.h"
#include "basic_types.h"
#include "hmutex.h"
#include "loggers/network_logger.h"
#include "pipe_line.h"
#include "shiftbuffer.h"
#include "tunnel.h"
#include "utils/jsonutils.h"
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

#define i_type hmap_lines_t // NOLINT
#define i_key hash_t // NOLINT
#define i_val line_t * // NOLINT
#define i_type hmap_lines_t // NOLINT
#define i_key hash_t // NOLINT
#define i_val halfduplex_server_con_state_t * // NOLINT

enum
{
Expand All @@ -17,16 +24,23 @@ enum
typedef struct halfduplex_server_state_s
{
hhybridmutex_t hmap_mutex;
hmap_lines_t line_map;

hmap_lines_t upload_line_map;
hmap_lines_t download_line_map;

} halfduplex_server_state_t;

typedef struct halfduplex_server_con_state_s
{
atomic_bool ready;

shift_buffer_t *buffering;
pipe_line_t *_Atomic pipe;

line_t *main_line;
line_t *upload_line;
line_t *download_line;

} halfduplex_server_con_state_t;

static void upStream(tunnel_t *self, context_t *c)
Expand All @@ -35,11 +49,67 @@ static void upStream(tunnel_t *self, context_t *c)
halfduplex_server_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
{
// todo (buffering) do the buffering
assert(bufLen(c->payload) >= 8);
shift_buffer_t *buf = c->payload;

bool ready = atomic_load_explicit(&(cstate->ready), memory_order_acquire);

if (ready)
{
}
else
{
pipe_line_t *pipe = atomic_load_explicit(&(cstate->pipe), memory_order_relaxed);
if (pipe)
{
}
else
{

const bool is_upload = (((uint8_t *) rawBuf(c->payload))[0] & 0x80) == 0x0;

hash_t hash = 0x0;
readUI64(c->payload, (uint64_t *) &hash);
hash = hash & (0x7FFFFFFFFFFFFFFFULL);

shiftr(buf, sizeof(uint64_t));

if (is_upload)
{
hhybridmutex_lock(&(state->hmap_mutex));
hmap_lines_t_iter f_iter = hmap_lines_t_find(&(state->download_line_map), hash);
if (f_iter.ref == hmap_lines_t_end(&(state->download_line_map)).ref)
{
if (! hmap_lines_t_push(&(state->download_line_map), cstate))
{
reuseContextBuffer(c);
destroyContext(c);
}
}
else
{
}

hhybridmutex_unlock(&(state->hmap_mutex));
}
else
{
}
}
}
}
else
{
if (c->init)
{
cstate = malloc(sizeof(halfduplex_server_con_state_t));
*cstate = (halfduplex_server_con_state_t){
.buffering = NULL, .pipe = NULL, .main_line = NULL, .upload_line = NULL, .download_line = NULL};

CSTATE_MUT(c) = cstate;
self->dw->downStream(self->dw, newEstContext(c->line));
destroyContext(c);
}
else if (c->fin)
{
Expand All @@ -54,6 +124,7 @@ static void downStream(tunnel_t *self, context_t *c)
halfduplex_server_con_state_t *cstate = CSTATE(c);
if (c->payload != NULL)
{
self->dw->downStream(self->dw, c);
}
else
{
Expand All @@ -66,12 +137,13 @@ static void downStream(tunnel_t *self, context_t *c)
}
}

tunnel_t *newHeaderServer(node_instance_context_t *instance_info)
tunnel_t *newHalfDuplexServer(node_instance_context_t *instance_info)
{

halfduplex_server_state_t *state = malloc(sizeof(halfduplex_server_state_t));
memset(state, 0, sizeof(halfduplex_server_state_t));
const cJSON *settings = instance_info->node_settings_json;

hhybridmutex_init(&state->hmap_mutex);

tunnel_t *t = newTunnel();
t->state = state;
Expand All @@ -81,19 +153,19 @@ tunnel_t *newHeaderServer(node_instance_context_t *instance_info)
return t;
}

api_result_t apiHeaderServer(tunnel_t *self, const char *msg)
api_result_t apiHalfDuplexServer(tunnel_t *self, const char *msg)
{
(void) (self);
(void) (msg);
return (api_result_t){0};
}

tunnel_t *destroyHeaderServer(tunnel_t *self)
tunnel_t *destroyHalfDuplexServer(tunnel_t *self)
{
(void) (self);
return NULL;
}
tunnel_metadata_t getMetadataHeaderServer(void)
tunnel_metadata_t getMetadataHalfDuplexServer(void)
{
return (tunnel_metadata_t){.version = 0001, .flags = 0x0};
}

0 comments on commit 079d506

Please sign in to comment.