diff --git a/tunnels/adapters/connector/connector.c b/tunnels/adapters/connector/connector.c index cfdbe5da..88b31949 100644 --- a/tunnels/adapters/connector/connector.c +++ b/tunnels/adapters/connector/connector.c @@ -1,10 +1,27 @@ #include "connector.h" +#include "loggers/network_logger.h" #include "types.h" #include "utils/sockutils.h" -#include "loggers/network_logger.h" - +static void tcpUpStream(tunnel_t *self, context_t *c) +{ +} +static void tcpDownStream(tunnel_t *self, context_t *c) +{ +} +static void udpUpStream(tunnel_t *self, context_t *c) +{ +} +static void udpDownStream(tunnel_t *self, context_t *c) +{ +} +static void upStream(tunnel_t *self, context_t *c) +{ +} +static void downStream(tunnel_t *self, context_t *c) +{ +} tunnel_t *newConnector(node_instance_context_t *instance_info) { @@ -12,68 +29,28 @@ tunnel_t *newConnector(node_instance_context_t *instance_info) memset(state, 0, sizeof(connector_state_t)); const cJSON *settings = instance_info->node_settings_json; - if (!(cJSON_IsObject(settings) && settings->child != NULL)) + if (! (cJSON_IsObject(settings) && settings->child != NULL)) { LOGF("JSON Error: Connector->settings (object field) : The object was empty or invalid"); return NULL; } - const cJSON *tcp_settings = cJSON_GetObjectItemCaseSensitive(settings, "tcp"); - if ((cJSON_IsObject(tcp_settings) && settings->child != NULL)) - { - getBoolFromJsonObject(&(state->tcp_no_delay), tcp_settings, "nodelay"); - getBoolFromJsonObject(&(state->tcp_fast_open), tcp_settings, "fastopen"); - getBoolFromJsonObject(&(state->reuse_addr), tcp_settings, "reuseaddr"); - int ds = 0; - getIntFromJsonObject(&ds, tcp_settings, "domain-strategy"); - state->domain_strategy = ds; - } - else - { - // memset set everything to 0... - } - - state->dest_addr = parseDynamicStrValueFromJsonObject(settings, "address",2, - "src_context->address", - "dest_context->address"); - - if (state->dest_addr.status == kDvsEmpty) - { - LOGF("JSON Error: Connector->settings->address (string field) : The vaule was empty or invalid"); - return NULL; - } - - state->dest_port = parseDynamicNumericValueFromJsonObject(settings, "port",2, - "src_context->port", - "dest_context->port"); - - if (state->dest_port.status == kDvsEmpty) - { - LOGF("JSON Error: Connector->settings->port (number field) : The vaule was empty or invalid"); - return NULL; - } - if(state->dest_addr.status == kDvsConstant){ - state->dest_atype = getHostAddrType(state->dest_addr.value_ptr); - state->dest_domain_len = strlen(state->dest_addr.value_ptr); - } - - - tunnel_t *t = newTunnel(); - t->state = state; - - t->upStream = &upStream; - t->downStream = &downStream; - + tunnel_t *t = newTunnel(); + t->state = state; + t->upStream = &upStream; + t->downStream = &downStream; atomic_thread_fence(memory_order_release); - return t; } -api_result_t apiConnector(tunnel_t *self, char *msg) +api_result_t apiConnector(tunnel_t *self,const char *msg) { - (void)(self); (void)(msg); return (api_result_t){0}; // TODO + (void) (self); + (void) (msg); + return (api_result_t){0}; } tunnel_t *destroyConnector(tunnel_t *self) { + (void) (self); return NULL; } tunnel_metadata_t getMetadataConnector() diff --git a/tunnels/adapters/connector/connector.h b/tunnels/adapters/connector/connector.h index 975590eb..8388d813 100644 --- a/tunnels/adapters/connector/connector.h +++ b/tunnels/adapters/connector/connector.h @@ -1,14 +1,12 @@ #pragma once #include "api.h" - // con <-----\ /-----> TCP Connect || Udp Associate // con <------> Connector <------> TCP Connect || Udp Associate // con <-----/ \-----> TCP Connect || Udp Associate // - -tunnel_t *newConnector(node_instance_context_t *instance_info); -api_result_t apiConnector(tunnel_t *self, char *msg); -tunnel_t *destroyConnector(tunnel_t *self); +tunnel_t * newConnector(node_instance_context_t *instance_info); +api_result_t apiConnector(tunnel_t *self, const char *msg); +tunnel_t * destroyConnector(tunnel_t *self); tunnel_metadata_t getMetadataConnector(); diff --git a/tunnels/adapters/connector/tcp.c b/tunnels/adapters/connector/tcp.c deleted file mode 100644 index 68cad03f..00000000 --- a/tunnels/adapters/connector/tcp.c +++ /dev/null @@ -1,385 +0,0 @@ -#include "types.h" -#include "utils/sockutils.h" -#include "loggers/network_logger.h" - -static void cleanup(connector_con_state_t *cstate) -{ - if (cstate->io) - { - hevent_set_userdata(cstate->io, NULL); - } - hio_t *last_resumed_io = NULL; - - while (contextQueueLen(cstate->data_queue) > 0) - { - context_t *cw = contextQueuePop(cstate->data_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } - if (cw->payload) - { - reuseContextBuffer(cw); - } - destroyContext(cw); - } - - while (contextQueueLen(cstate->finished_queue) > 0) - { - context_t *cw = contextQueuePop(cstate->finished_queue); - if (cw->src_io != NULL && last_resumed_io != cw->src_io) - { - last_resumed_io = cw->src_io; - hio_read(cw->src_io); - } - destroyContext(cw); - } - - destroyContextQueue(cstate->data_queue); - destroyContextQueue(cstate->finished_queue); - free(cstate); -} -static bool resume_write_queue(connector_con_state_t *cstate) -{ - context_queue_t *data_queue = (cstate)->data_queue; - context_queue_t *finished_queue = (cstate)->finished_queue; - hio_t *io = cstate->io; - while (contextQueueLen(data_queue) > 0) - { - context_t *cw = contextQueuePop(data_queue); - - int bytes = bufLen(cw->payload); - int nwrite = hio_write(io, rawBuf(cw->payload), bytes); - reuseBuffer(cstate->buffer_pool, cw->payload); - cw->payload = NULL; - contextQueuePush(cstate->finished_queue, cw); - if (nwrite >= 0 && nwrite < bytes) - return false; // write pending - } - // data data_queue is empty - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) - { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); - } - return true; -} - -static void on_write_complete(hio_t * restrict io, const void * restrict buf, int writebytes) -{ - // resume the read on other end of the connection - connector_con_state_t *cstate = (connector_con_state_t *)(hevent_userdata(io)); - if (cstate == NULL) - return; - - if (hio_write_is_complete(io)) - { - hio_setcb_write(cstate->io, NULL); - cstate->write_paused = false; - - context_queue_t *data_queue = cstate->data_queue; - context_queue_t *finished_queue = cstate->finished_queue; - if (contextQueueLen(data_queue) > 0) - if (!resume_write_queue(cstate)) - { - hio_setcb_write(cstate->io, on_write_complete); - cstate->write_paused = true; - return; - } - - hio_t *last_resumed_io = NULL; - while (contextQueueLen(finished_queue) > 0) - { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; - if (upstream_io != NULL && (last_resumed_io != upstream_io)) - { - last_resumed_io = upstream_io; - hio_read(upstream_io); - } - destroyContext(cw); - } - } -} - -static void on_recv(hio_t * restrict io, void * restrict buf, int readbytes) -{ - connector_con_state_t *cstate = (connector_con_state_t *)(hevent_userdata(io)); - if (cstate == NULL) - return; - - shift_buffer_t *payload = popBuffer(cstate->buffer_pool); - setLen(payload, readbytes); - writeRaw(payload,buf,readbytes); - - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - - context_t *context = newContext(line); - context->src_io = io; - context->payload = payload; - - self->downStream(self, context); -} - -static void on_close(hio_t *io) -{ - connector_con_state_t *cstate = (connector_con_state_t *)(hevent_userdata(io)); - if (cstate != NULL) - LOGD("Connector: received close for FD:%x ", - (int)hio_fd(io)); - else - LOGD("Connector: sent close for FD:%x ", - (int)hio_fd(io)); - - if (cstate != NULL) - { - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - context_t *context = newFinContext(line); - self->downStream(self, context); - } -} - -static void onOutBoundConnected(hio_t *upstream_io) -{ - - connector_con_state_t *cstate = hevent_userdata(upstream_io); -#ifdef PROFILE - struct timeval tv2; - gettimeofday(&tv2, NULL); - - double time_spent = (double)(tv2.tv_usec - (cstate->__profile_conenct).tv_usec) / 1000000 + (double)(tv2.tv_sec - (cstate->__profile_conenct).tv_sec); - LOGD("Connector: tcp connect took %d ms", (int)(time_spent * 1000)); -#endif - - tunnel_t *self = cstate->tunnel; - line_t *line = cstate->line; - hio_setcb_read(upstream_io, on_recv); - - char localaddrstr[SOCKADDR_STRLEN] = {0}; - char peeraddrstr[SOCKADDR_STRLEN] = {0}; - - LOGD("Connector: connection succeed FD:%x [%s] => [%s]", - (int)hio_fd(upstream_io), - SOCKADDR_STR(hio_localaddr(upstream_io), localaddrstr), - SOCKADDR_STR(hio_peeraddr(upstream_io), peeraddrstr)); - - context_t *est_context = newContext(line); - est_context->est = true; - est_context->src_io = upstream_io; - self->downStream(self, est_context); -} - -void connectorUpStream(tunnel_t *self, context_t *c) -{ - connector_con_state_t *cstate = CSTATE(c); - - if (c->payload != NULL) - { - if (cstate->write_paused) - { - if (c->src_io) - hio_read_stop(c->src_io); - contextQueuePush(cstate->data_queue, c); - } - else - { - int bytes = bufLen(c->payload); - int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes); - if (nwrite >= 0 && nwrite < bytes) - { - if (c->src_io) - hio_read_stop(c->src_io); - reuseBuffer(cstate->buffer_pool, c->payload); - c->payload = NULL; - - contextQueuePush(cstate->finished_queue, c); - cstate->write_paused = true; - hio_setcb_write(cstate->io, on_write_complete); - } - else - { - reuseBuffer(cstate->buffer_pool, c->payload); - c->payload = NULL; - destroyContext(c); - } - } - } - else - { - if (c->init) - { - - CSTATE_MUT(c) = malloc(sizeof(connector_con_state_t)); - memset(CSTATE(c), 0, sizeof(connector_con_state_t)); - connector_con_state_t *cstate = CSTATE(c); -#ifdef PROFILE - gettimeofday(&(cstate->__profile_conenct), NULL); -#endif - - cstate->buffer_pool = buffer_pools[c->line->tid]; - cstate->tunnel = self; - cstate->line = c->line; - cstate->data_queue = newContextQueue(cstate->buffer_pool); - cstate->finished_queue = newContextQueue(cstate->buffer_pool); - - cstate->write_paused = true; - - socket_context_t final_ctx = {0}; - // fill the final_ctx address based on settings - { - socket_context_t *src_ctx = &(c->line->src_ctx); - socket_context_t *dest_ctx = &(c->line->dest_ctx); - connector_state_t *state = STATE(self); - - if (state->dest_addr.status == cdvs_from_source) - copySocketContextAddr(&final_ctx, &src_ctx); - else if (state->dest_addr.status == cdvs_from_dest) - copySocketContextAddr(&final_ctx, &dest_ctx); - else - { - final_ctx.atype = state->dest_atype; - if (state->dest_atype == kSatDomainName) - { - final_ctx.domain = malloc(state->dest_domain_len + 1); - memcpy(final_ctx.domain, state->dest_addr.value_ptr, state->dest_domain_len + 1); - final_ctx.resolved = false; - final_ctx.addr.sa.sa_family = AF_INET; // addr resolve will change this - } - else - sockaddr_set_ip(&(final_ctx.addr), state->dest_addr.value_ptr); - } - - if (state->dest_port.status == cdvs_from_source) - sockaddr_set_port(&(final_ctx.addr), sockaddr_port(&(src_ctx->addr))); - else if (state->dest_port.status == cdvs_from_dest) - sockaddr_set_port(&(final_ctx.addr), sockaddr_port(&(dest_ctx->addr))); - else - sockaddr_set_port(&(final_ctx.addr), state->dest_port.value); - } - - // sockaddr_set_ipport(&(final_ctx.addr), "127.0.0.1", 443); - - LOGD("Connector: initiating tcp connection"); - if (final_ctx.atype == kSatDomainName) - { - if (!final_ctx.resolved) - { - if (!connectorResolvedomain(&final_ctx)) - { - free(final_ctx.domain); - cleanup(cstate); - CSTATE_MUT(c) = NULL; - goto fail; - } - } - free(final_ctx.domain); - } - - hloop_t *loop = loops[c->line->tid]; - int sockfd = socket(final_ctx.addr.sa.sa_family, SOCK_STREAM, 0); - if (sockfd < 0) - { - LOGE("Connector: socket fd < 0"); - cleanup(cstate); - CSTATE_MUT(c) = NULL; - goto fail; - } - if (STATE(self)->tcp_no_delay) - { - tcp_nodelay(sockfd, 1); - } - if (STATE(self)->reuse_addr) - { - so_reuseport(sockfd, 1); - } - - if (STATE(self)->tcp_fast_open) - { - const int yes = 1; - setsockopt(sockfd, SOL_TCP, TCP_FASTOPEN, &yes, sizeof(yes)); - } - - hio_t *upstream_io = hio_get(loop, sockfd); - assert(upstream_io != NULL); - - hio_set_peeraddr(upstream_io, &(final_ctx.addr.sa), sockaddr_len(&(final_ctx.addr))); - cstate->io = upstream_io; - hevent_set_userdata(upstream_io, cstate); - - // io <=> upstream_io - // hio_setup_upstream(io, upstream_io); - hio_setcb_connect(upstream_io, onOutBoundConnected); - hio_setcb_close(upstream_io, on_close); - - // printf("connect to "); - // SOCKADDR_PRINT(hio_peeraddr(upstream_io)); - hio_connect(upstream_io); - destroyContext(c); - } - else if (c->fin) - { - hio_t *io = cstate->io; - cleanup(cstate); - CSTATE_MUT(c) = NULL; - destroyContext(c); - hio_close(io); - } - } - return; -fail:; - self->dw->downStream(self->dw, newFinContext(c->line)); - destroyContext(c); -} -void connectorDownStream(tunnel_t *self, context_t *c) -{ - connector_con_state_t *cstate = CSTATE(c); - - if (c->payload != NULL) - { -#ifdef PROFILE - struct timeval tv1, tv2; - gettimeofday(&tv1, NULL); - { - self->dw->downStream(self->dw, c); - } - gettimeofday(&tv2, NULL); - double time_spent = (double)(tv2.tv_usec - tv1.tv_usec) / 1000000 + (double)(tv2.tv_sec - tv1.tv_sec); - LOGD("Connector: tcp downstream took %d ms", (int)(time_spent * 1000)); -#else - self->dw->downStream(self->dw, c); - -#endif - } - else - { - - if (c->est) - { - cstate->established = true; - hio_read(cstate->io); - if (resume_write_queue(cstate)) - cstate->write_paused = false; - else - hio_setcb_write(cstate->io, on_write_complete); - - self->dw->downStream(self->dw, c); - } - else if (c->fin) - { - - cleanup(cstate); - CSTATE_MUT(c) = NULL; - self->dw->downStream(self->dw, c); - } - } -} \ No newline at end of file diff --git a/tunnels/adapters/connector/tcp/tcp_connector.c b/tunnels/adapters/connector/tcp/tcp_connector.c index 2863f8f2..5d20d86b 100644 --- a/tunnels/adapters/connector/tcp/tcp_connector.c +++ b/tunnels/adapters/connector/tcp/tcp_connector.c @@ -3,7 +3,7 @@ #include "types.h" #include "utils/sockutils.h" -static void cleanup(tcp_connector_con_state_t *cstate) +static void cleanup(tcp_connector_con_state_t *cstate, bool write_queue) { if (cstate->io) { @@ -20,10 +20,11 @@ static void cleanup(tcp_connector_con_state_t *cstate) last_resumed_io = cw->src_io; hio_read(cw->src_io); } - if (cw->payload) + if (write_queue) { - reuseContextBuffer(cw); + hio_write(cstate->io, rawBuf(cw->payload), bufLen(cw->payload)); } + reuseContextBuffer(cw); destroyContext(cw); } @@ -233,8 +234,8 @@ void upStream(tunnel_t *self, context_t *c) { if (c->init) { - - CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t)); + tcp_connector_state_t *state = STATE(self); + CSTATE_MUT(c) = malloc(sizeof(tcp_connector_con_state_t)); memset(CSTATE(c), 0, sizeof(tcp_connector_con_state_t)); tcp_connector_con_state_t *cstate = CSTATE(c); #ifdef PROFILE @@ -248,77 +249,52 @@ void upStream(tunnel_t *self, context_t *c) cstate->finished_queue = newContextQueue(cstate->buffer_pool); cstate->write_paused = true; - socket_context_t final_ctx = {0}; - // fill the final_ctx address based on settings + socket_context_t *dest_ctx = &(c->line->dest_ctx); + socket_context_t *src_ctx = &(c->line->src_ctx); + switch (state->dest_addr_selected.status) { - socket_context_t * src_ctx = &(c->line->src_ctx); - socket_context_t * dest_ctx = &(c->line->dest_ctx); - tcp_connector_state_t *state = STATE(self); - - if (state->dest_addr.status == kCdvsFromSource) - { - copySocketContextAddr(&final_ctx, &src_ctx); - } - else if (state->dest_addr.status == kCdvsFromDest) - { - copySocketContextAddr(&final_ctx, &dest_ctx); - } - else - { - final_ctx.atype = state->dest_atype; - if (state->dest_atype == kSatDomainName) - { - final_ctx.domain = malloc(state->dest_domain_len + 1); - memcpy(final_ctx.domain, state->dest_addr.value_ptr, state->dest_domain_len + 1); - final_ctx.resolved = false; - final_ctx.addr.sa.sa_family = AF_INET; // addr resolve will change this - } - else - { - sockaddr_set_ip(&(final_ctx.addr), state->dest_addr.value_ptr); - } - } - - if (state->dest_port.status == kCdvsFromSource) - { - sockaddr_set_port(&(final_ctx.addr), sockaddr_port(&(src_ctx->addr))); - } - else if (state->dest_port.status == kCdvsFromDest) - { - sockaddr_set_port(&(final_ctx.addr), sockaddr_port(&(dest_ctx->addr))); - } - else - { - sockaddr_set_port(&(final_ctx.addr), state->dest_port.value); - } + case kCdvsFromSource: + copySocketContextAddr(&dest_ctx, &src_ctx); + break; + case kCdvsConstant: + copySocketContextAddr(&dest_ctx, &(state->constant_dest_addr)); + break; + default: + case kCdvsFromDest: + break; + } + switch (state->dest_port_selected.status) + { + case kCdvsFromSource: + copySocketContextPort(&dest_ctx, &src_ctx); + break; + case kCdvsConstant: + copySocketContextPort(&dest_ctx, &(state->constant_dest_addr)); + break; + default: + case kCdvsFromDest: + break; } - // sockaddr_set_ipport(&(final_ctx.addr), "127.0.0.1", 443); + // sockaddr_set_ipport(&(dest_ctx.addr), "127.0.0.1", 443); + // LOGD("TcpConnector: initiating connection"); - LOGD("TcpConnector: initiating connection"); - if (final_ctx.atype == kSatDomainName) + if (dest_ctx->address_type == kSatDomainName && ! dest_ctx->domain_resolved) { - if (! final_ctx.resolved) + if (! resolveContextSync(dest_ctx)) { - if (! tcpConnectorResolvedomain(&final_ctx)) - { - free(final_ctx.domain); - cleanup(cstate); - - CSTATE_MUT(c) = NULL; - goto fail; - } + cleanup(cstate, false); + CSTATE_MUT(c) = NULL; + goto fail; } - free(final_ctx.domain); } - tcp_connector_state_t *state = STATE(self); hloop_t *loop = loops[c->line->tid]; - int sockfd = socket(final_ctx.addr.sa.sa_family, SOCK_STREAM, 0); + int sockfd = socket(dest_ctx->addr.sa.sa_family, SOCK_STREAM, 0); if (sockfd < 0) { LOGE("Connector: socket fd < 0"); - cleanup(cstate); + cleanup(cstate, false); CSTATE_MUT(c) = NULL; goto fail; } @@ -340,7 +316,7 @@ void upStream(tunnel_t *self, context_t *c) hio_t *upstream_io = hio_get(loop, sockfd); assert(upstream_io != NULL); - hio_set_peeraddr(upstream_io, &(final_ctx.addr.sa), sockaddr_len(&(final_ctx.addr))); + hio_set_peeraddr(upstream_io, &(dest_ctx->addr.sa), sockaddr_len(&(dest_ctx->addr))); cstate->io = upstream_io; hevent_set_userdata(upstream_io, cstate); @@ -351,9 +327,9 @@ void upStream(tunnel_t *self, context_t *c) } else if (c->fin) { - hio_t *io = cstate->io; - cleanup(cstate); + hio_t *io = cstate->io; CSTATE_MUT(c) = NULL; + cleanup(cstate, true); destroyContext(c); hio_close(io); } @@ -402,7 +378,7 @@ void downStream(tunnel_t *self, context_t *c) } else if (c->fin) { - cleanup(cstate); + cleanup(cstate, false); CSTATE_MUT(c) = NULL; self->dw->downStream(self->dw, c); } @@ -421,42 +397,38 @@ tunnel_t *newTcpConnector(node_instance_context_t *instance_info) return NULL; } - const cJSON *tcp_settings = cJSON_GetObjectItemCaseSensitive(settings, "tcp"); - if ((cJSON_IsObject(tcp_settings) && settings->child != NULL)) - { - getBoolFromJsonObject(&(state->tcp_no_delay), tcp_settings, "nodelay"); - getBoolFromJsonObject(&(state->tcp_fast_open), tcp_settings, "fastopen"); - getBoolFromJsonObject(&(state->reuse_addr), tcp_settings, "reuseaddr"); - int ds = 0; - getIntFromJsonObject(&ds, tcp_settings, "domain-strategy"); - state->domain_strategy = ds; - } - else - { - // memset set everything to 0... - } + getBoolFromJsonObjectOrDefault(&(state->tcp_no_delay), settings, "nodelay", true); + getBoolFromJsonObjectOrDefault(&(state->tcp_fast_open), settings, "fastopen", false); + getBoolFromJsonObjectOrDefault(&(state->reuse_addr), settings, "reuseaddr", false); + getIntFromJsonObjectOrDefault(&(state->domain_strategy), settings, "domain-strategy", 0); - state->dest_addr = + state->dest_addr_selected = parseDynamicStrValueFromJsonObject(settings, "address", 2, "src_context->address", "dest_context->address"); - if (state->dest_addr.status == kDvsEmpty) + if (state->dest_addr_selected.status == kDvsEmpty) { LOGF("JSON Error: TcpConnector->settings->address (string field) : The vaule was empty or invalid"); return NULL; } + if (state->dest_addr_selected.status == kDvsConstant) + { + state->constant_dest_addr.address_type = getHostAddrType(state->dest_addr_selected.value_ptr); + allocateDomainBuffer(&(state->constant_dest_addr)); + setSocketContextDomain(&(state->constant_dest_addr), state->dest_addr_selected.value_ptr, + strlen(state->dest_addr_selected.value_ptr)); + } - state->dest_port = + state->dest_port_selected = parseDynamicNumericValueFromJsonObject(settings, "port", 2, "src_context->port", "dest_context->port"); - if (state->dest_port.status == kDvsEmpty) + if (state->dest_port_selected.status == kDvsEmpty) { LOGF("JSON Error: TcpConnector->settings->port (number field) : The vaule was empty or invalid"); return NULL; } - if (state->dest_addr.status == kDvsConstant) + if (state->dest_port_selected.status == kDvsConstant) { - state->dest_atype = getHostAddrType(state->dest_addr.value_ptr); - state->dest_domain_len = strlen(state->dest_addr.value_ptr); + sockaddr_set_port(&(state->constant_dest_addr), state->dest_port_selected.value); } tunnel_t *t = newTunnel(); @@ -465,7 +437,6 @@ tunnel_t *newTcpConnector(node_instance_context_t *instance_info) t->downStream = &downStream; atomic_thread_fence(memory_order_release); - return t; } api_result_t apiTcpConnector(tunnel_t *self, const char *msg) diff --git a/tunnels/adapters/connector/tcp/types.h b/tunnels/adapters/connector/tcp/types.h index 40ac9e48..3f6eea5f 100644 --- a/tunnels/adapters/connector/tcp/types.h +++ b/tunnels/adapters/connector/tcp/types.h @@ -15,15 +15,13 @@ enum tcp_connector_dynamic_value_status typedef struct tcp_connector_state_s { // settings - bool tcp_no_delay; - bool tcp_fast_open; - bool reuse_addr; - enum domain_strategy domain_strategy; - - size_t dest_domain_len; - enum socket_address_type dest_atype; - dynamic_value_t dest_addr; - dynamic_value_t dest_port; + bool tcp_no_delay; + bool tcp_fast_open; + bool reuse_addr; + enum domain_strategy domain_strategy; + dynamic_value_t dest_addr_selected; + dynamic_value_t dest_port_selected; + socket_context_t constant_dest_addr; } tcp_connector_state_t; diff --git a/tunnels/adapters/connector/types.h b/tunnels/adapters/connector/types.h index 3e36f8f7..442b0765 100644 --- a/tunnels/adapters/connector/types.h +++ b/tunnels/adapters/connector/types.h @@ -1,59 +1,19 @@ #pragma once #include "api.h" -#define STATE(x) ((connector_state_t *) ((x)->state)) -#define CSTATE(x) ((connector_con_state_t *) ((((x)->line->chains_state)[self->chain_index]))) -#define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index] -// enable profile to see how much it takes to connect and downstream write, dns and etc... -// #define PROFILE 1 - -enum connector_dynamic_value_status -{ - kCdvsEmpty = 0x0, - kCdvsConstant, - kCdvsFromSource, - kCdvsFromDest, -}; typedef struct connector_state_s { - // settings - bool tcp_no_delay; - bool tcp_fast_open; - bool reuse_addr; - enum domain_strategy domain_strategy; - - size_t dest_domain_len; - enum socket_address_type dest_atype; - dynamic_value_t dest_addr; - dynamic_value_t dest_port; + tunnel_t *tcp_connector; + tunnel_t *udp_connector; } connector_state_t; typedef struct connector_con_state_s { -#ifdef PROFILE - struct timeval __profile_conenct; -#endif - TunnelFlowRoutine upstream; - TunnelFlowRoutine downstream; - tunnel_t * tunnel; - line_t * line; - hio_t * io; - - buffer_pool_t * buffer_pool; - context_queue_t *data_queue; - context_queue_t *finished_queue; - - bool write_paused; - bool established; + } connector_con_state_t; -bool connectorResolvedomain(socket_context_t *dest); -void connectorTcpUpStream(tunnel_t *self, context_t *c); -void connectorTcpDownStream(tunnel_t *self, context_t *c); -void connectorPacketUpStream(tunnel_t *self, context_t *c); -void connectorPacketDownStream(tunnel_t *self, context_t *c); diff --git a/tunnels/adapters/connector/udp.c b/tunnels/adapters/connector/udp.c deleted file mode 100644 index 7dad88ed..00000000 --- a/tunnels/adapters/connector/udp.c +++ /dev/null @@ -1,222 +0,0 @@ -#include "loggers/network_logger.h" -#include "types.h" -#include "utils/sockutils.h" - -static void cleanup(connector_con_state_t *cstate) -{ - connector_state_t *state = STATE(cstate->tunnel); - if (state->dest_addr.status == cdvs_constant) - { - - }else if (state->dest_addr.status > cdvs_constant){ - free(cstate->line->dest_ctx) - } - - free(cstate); -} -static void onUdpRecv(hio_t *io, void *buf, int readbytes) -{ - connector_con_state_t *cstate = (connector_con_state_t *) (hevent_userdata(io)); - - shift_buffer_t *payload = popBuffer(cstate->buffer_pool); - setLen(payload, readbytes); - writeRaw(payload, buf, readbytes); - - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - - struct sockaddr *destaddr = hio_peeraddr(io); - - enum socket_address_type atype; - - if (destaddr->sa_family == AF_INET6) - { - atype = kSatIPV6; - } - else - { - atype = kSatIPV4; - } - - if (! cstate->established) - { - cstate->established = true; - context_t *est_context = newContext(line); - est_context->est = true; - est_context->src_io = io; - self->packetDownStream(self, est_context); - if (hevent_userdata(io) == NULL) - { - return; - } - } - - context_t *context = newContext(line); - context->src_io = io; - context->payload = payload; - - self->packetDownStream(self, context); -} - -void connectorPacketUpStream(tunnel_t *self, context_t *c) -{ - connector_con_state_t *cstate = CSTATE(c); - - if (c->payload != NULL) - { - unsigned int bytes = bufLen(c->payload); - - if (hio_is_closed(cstate->io)) - { - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - reuseContextBuffer(c); - goto fail; - } - - size_t nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes); - if (nwrite >= 0 && nwrite < bytes) - { - assert(false); // should not happen - } - - reuseContextBuffer(c); - destroyContext(c); - } - else - { - if (c->init) - { - CSTATE_MUT(c) = malloc(sizeof(connector_con_state_t)); - memset(CSTATE(c), 0, sizeof(connector_con_state_t)); - connector_con_state_t *cstate = CSTATE(c); - - cstate->buffer_pool = buffer_pools[c->line->tid]; - cstate->tunnel = self; - cstate->line = c->line; - cstate->write_paused = false; - cstate->data_queue = NULL; - cstate->finished_queue = NULL; - - // sockaddr_set_ipport(&(dest->addr),"www.gstatic.com",80); - - hloop_t *loop = loops[c->line->tid]; - - sockaddr_u host_addr = {0}; - sockaddr_set_ipport(&host_addr, "0.0.0.0", 0); - - int sockfd = socket(host_addr.sa.sa_family, SOCK_DGRAM, 0); - if (sockfd < 0) - { - LOGE("Connector: socket fd < 0"); - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - goto fail; - } - -#ifdef OS_UNIX - so_reuseaddr(sockfd, 1); -#endif - sockaddr_u addr; - - sockaddr_set_ipport(&addr, "0.0.0.0", 0); - - if (bind(sockfd, &addr.sa, sockaddr_len(&addr)) < 0) - { - LOGE("UDP bind failed;"); - closesocket(sockfd); - goto fail; - } - - hio_t *upstream_io = hio_get(loop, sockfd); - assert(upstream_io != NULL); - - cstate->io = upstream_io; - hevent_set_userdata(upstream_io, cstate); - hio_setcb_read(upstream_io, onUdpRecv); - hio_read(upstream_io); - - socket_context_t* resolved_dest_context; - // fill the resolved_dest_context address based on settings - { - socket_context_t *src_ctx = &(c->line->src_ctx); - socket_context_t *dest_ctx = &(c->line->dest_ctx); - connector_state_t *state = STATE(self); - - if (state->dest_addr.status == cdvs_from_source) - { - resolved_dest_context = src_ctx; - // copySocketContextAddr(&resolved_dest_context, &src_ctx); - } - else if (state->dest_addr.status == cdvs_from_dest) - { - resolved_dest_context = dest_ctx; - // copySocketContextAddr(&resolved_dest_context, &dest_ctx); - } - else - { - resolved_dest_context.atype = state->dest_atype; - if (state->dest_atype == kSatDomainName) - { - resolved_dest_context.domain = state->dest_addr.value_ptr; - resolved_dest_context.domain_len = state->dest_domain_len; - resolved_dest_context.resolved = false; - } - else - sockaddr_set_ip(&(resolved_dest_context.addr), state->dest_addr.value_ptr); - } - - if (state->dest_port.status == cdvs_from_source) - sockaddr_set_port(&(resolved_dest_context.addr), sockaddr_port(&(src_ctx->addr))); - else if (state->dest_port.status == cdvs_from_dest) - sockaddr_set_port(&(resolved_dest_context.addr), sockaddr_port(&(dest_ctx->addr))); - else - sockaddr_set_port(&(resolved_dest_context.addr), state->dest_port.value); - } - - if (resolved_dest_context.atype == kSatDomainName) - { - if (! resolved_dest_context.resolved) - { - if (! connectorResolvedomain(&(resolved_dest_context))) - { - free(resolved_dest_context.domain); - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - reuseContextBuffer(c); - goto fail; - } - } - free(resolved_dest_context.domain); - } - hio_set_peeraddr(cstate->io, &(resolved_dest_context.addr.sa), sockaddr_len(&(resolved_dest_context.addr))); - - destroyContext(c); - } - else if (c->fin) - { - hio_t *io = cstate->io; - hevent_set_userdata(io, NULL); - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - destroyContext(c); - hio_close(io); - } - } - return; -fail:; - self->dw->downStream(self->dw, newFinContext(c->line)); - destroyContext(c); -} - -void connectorPacketDownStream(tunnel_t *self, context_t *c) -{ - if (c->fin) - { - hio_t *io = CSTATE(c)->io; - hevent_set_userdata(io, NULL); - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - } - self->dw->downStream(self->dw, c); -} diff --git a/tunnels/adapters/connector/udp/types.h b/tunnels/adapters/connector/udp/types.h new file mode 100644 index 00000000..f63ced70 --- /dev/null +++ b/tunnels/adapters/connector/udp/types.h @@ -0,0 +1,38 @@ +#pragma once +#include "api.h" + +// enable profile to see how much it takes to connect and downstream write +// #define PROFILE 1 + +enum udp_connector_dynamic_value_status +{ + kCdvsEmpty = 0x0, + kCdvsConstant, + kCdvsFromSource, + kCdvsFromDest, +}; + +typedef struct udp_connector_state_s +{ + // settings + bool reuse_addr; + enum domain_strategy domain_strategy; + dynamic_value_t dest_addr_selected; + dynamic_value_t dest_port_selected; + socket_context_t constant_dest_addr; + +} udp_connector_state_t; + +typedef struct udp_connector_con_state_s +{ +#ifdef PROFILE + struct timeval __profile_conenct; +#endif + + tunnel_t *tunnel; + line_t * line; + hio_t * io; + buffer_pool_t * buffer_pool; + + bool established; +} udp_connector_con_state_t; diff --git a/tunnels/adapters/connector/udp/udp_connector.c b/tunnels/adapters/connector/udp/udp_connector.c index 77a9204b..99ccbb4e 100644 --- a/tunnels/adapters/connector/udp/udp_connector.c +++ b/tunnels/adapters/connector/udp/udp_connector.c @@ -2,40 +2,33 @@ #include "types.h" #include "utils/sockutils.h" -static void cleanup(connector_con_state_t *cstate) +static void cleanup(udp_connector_con_state_t *cstate) { - connector_state_t *state = STATE(cstate->tunnel); - if (state->dest_addr.status == cdvs_constant) - { - - }else if (state->dest_addr.status > cdvs_constant){ - free(cstate->line->dest_ctx) - } - + udp_connector_state_t *state = STATE(cstate->tunnel); free(cstate); } -static void onUdpRecv(hio_t *io, void *buf, int readbytes) +static void onRecv(hio_t *io, void *buf, int readbytes) { - connector_con_state_t *cstate = (connector_con_state_t *) (hevent_userdata(io)); + udp_connector_con_state_t *cstate = (udp_connector_con_state_t *) (hevent_userdata(io)); shift_buffer_t *payload = popBuffer(cstate->buffer_pool); setLen(payload, readbytes); writeRaw(payload, buf, readbytes); tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; + line_t * line = (cstate)->line; struct sockaddr *destaddr = hio_peeraddr(io); - enum socket_address_type atype; + enum socket_address_type address_type; if (destaddr->sa_family == AF_INET6) { - atype = kSatIPV6; + address_type = kSatIPV6; } else { - atype = kSatIPV4; + address_type = kSatIPV4; } if (! cstate->established) @@ -58,9 +51,9 @@ static void onUdpRecv(hio_t *io, void *buf, int readbytes) self->packetDownStream(self, context); } -void connectorPacketUpStream(tunnel_t *self, context_t *c) +void upStream(tunnel_t *self, context_t *c) { - connector_con_state_t *cstate = CSTATE(c); + udp_connector_con_state_t *cstate = CSTATE(c); if (c->payload != NULL) { @@ -87,21 +80,17 @@ void connectorPacketUpStream(tunnel_t *self, context_t *c) { if (c->init) { - CSTATE_MUT(c) = malloc(sizeof(connector_con_state_t)); - memset(CSTATE(c), 0, sizeof(connector_con_state_t)); - connector_con_state_t *cstate = CSTATE(c); + udp_connector_state_t *state = STATE(self); - cstate->buffer_pool = buffer_pools[c->line->tid]; - cstate->tunnel = self; - cstate->line = c->line; - cstate->write_paused = false; - cstate->data_queue = NULL; - cstate->finished_queue = NULL; + CSTATE_MUT(c) = malloc(sizeof(udp_connector_con_state_t)); + memset(CSTATE(c), 0, sizeof(udp_connector_con_state_t)); + udp_connector_con_state_t *cstate = CSTATE(c); + cstate->buffer_pool = buffer_pools[c->line->tid]; + cstate->tunnel = self; + cstate->line = c->line; // sockaddr_set_ipport(&(dest->addr),"www.gstatic.com",80); - - hloop_t *loop = loops[c->line->tid]; - + hloop_t * loop = loops[c->line->tid]; sockaddr_u host_addr = {0}; sockaddr_set_ipport(&host_addr, "0.0.0.0", 0); @@ -133,63 +122,46 @@ void connectorPacketUpStream(tunnel_t *self, context_t *c) cstate->io = upstream_io; hevent_set_userdata(upstream_io, cstate); - hio_setcb_read(upstream_io, onUdpRecv); + hio_setcb_read(upstream_io, onRecv); hio_read(upstream_io); - socket_context_t* resolved_dest_context; - // fill the resolved_dest_context address based on settings + socket_context_t *dest_ctx = &(c->line->dest_ctx); + socket_context_t *src_ctx = &(c->line->src_ctx); + switch (state->dest_addr_selected.status) { - socket_context_t *src_ctx = &(c->line->src_ctx); - socket_context_t *dest_ctx = &(c->line->dest_ctx); - connector_state_t *state = STATE(self); - - if (state->dest_addr.status == cdvs_from_source) - { - resolved_dest_context = src_ctx; - // copySocketContextAddr(&resolved_dest_context, &src_ctx); - } - else if (state->dest_addr.status == cdvs_from_dest) - { - resolved_dest_context = dest_ctx; - // copySocketContextAddr(&resolved_dest_context, &dest_ctx); - } - else - { - resolved_dest_context.atype = state->dest_atype; - if (state->dest_atype == kSatDomainName) - { - resolved_dest_context.domain = state->dest_addr.value_ptr; - resolved_dest_context.domain_len = state->dest_domain_len; - resolved_dest_context.resolved = false; - } - else - sockaddr_set_ip(&(resolved_dest_context.addr), state->dest_addr.value_ptr); - } - - if (state->dest_port.status == cdvs_from_source) - sockaddr_set_port(&(resolved_dest_context.addr), sockaddr_port(&(src_ctx->addr))); - else if (state->dest_port.status == cdvs_from_dest) - sockaddr_set_port(&(resolved_dest_context.addr), sockaddr_port(&(dest_ctx->addr))); - else - sockaddr_set_port(&(resolved_dest_context.addr), state->dest_port.value); + case kCdvsFromSource: + copySocketContextAddr(&dest_ctx, &src_ctx); + break; + case kCdvsConstant: + copySocketContextAddr(&dest_ctx, &(state->constant_dest_addr)); + break; + default: + case kCdvsFromDest: + break; + } + switch (state->dest_port_selected.status) + { + case kCdvsFromSource: + copySocketContextPort(&dest_ctx, &src_ctx); + break; + case kCdvsConstant: + copySocketContextPort(&dest_ctx, &(state->constant_dest_addr)); + break; + default: + case kCdvsFromDest: + break; } - if (resolved_dest_context.atype == kSatDomainName) + if (dest_ctx->address_type == kSatDomainName && ! dest_ctx->domain_resolved) { - if (! resolved_dest_context.resolved) + if (! resolveContextSync(dest_ctx)) { - if (! connectorResolvedomain(&(resolved_dest_context))) - { - free(resolved_dest_context.domain); - cleanup(CSTATE(c)); - CSTATE_MUT(c) = NULL; - reuseContextBuffer(c); - goto fail; - } + cleanup(CSTATE(c)); + CSTATE_MUT(c) = NULL; + goto fail; } - free(resolved_dest_context.domain); } - hio_set_peeraddr(cstate->io, &(resolved_dest_context.addr.sa), sockaddr_len(&(resolved_dest_context.addr))); + hio_set_peeraddr(cstate->io, &(dest_ctx->addr.sa), sockaddr_len(&(dest_ctx->addr))); destroyContext(c); } @@ -209,91 +181,82 @@ fail:; destroyContext(c); } -void connectorPacketDownStream(tunnel_t *self, context_t *c) +void downStream(tunnel_t *self, context_t *c) { + udp_connector_con_state_t *cstate = CSTATE(c); + if (c->fin) { - hio_t *io = CSTATE(c)->io; + hio_t *io = cstate->io; hevent_set_userdata(io, NULL); - cleanup(CSTATE(c)); + cleanup(cstate); CSTATE_MUT(c) = NULL; } self->dw->downStream(self->dw, c); } - - -tunnel_t *newTcpConnector(node_instance_context_t *instance_info) +tunnel_t *newUdpConnector(node_instance_context_t *instance_info) { - tcp_connector_state_t *state = malloc(sizeof(tcp_connector_state_t)); - memset(state, 0, sizeof(tcp_connector_state_t)); + udp_connector_state_t *state = malloc(sizeof(udp_connector_state_t)); + memset(state, 0, sizeof(udp_connector_state_t)); const cJSON *settings = instance_info->node_settings_json; if (! (cJSON_IsObject(settings) && settings->child != NULL)) { - LOGF("JSON Error: TcpConnector->settings (object field) : The object was empty or invalid"); + LOGF("JSON Error: UdpConnector->settings (object field) : The object was empty or invalid"); return NULL; } + + getBoolFromJsonObject(&(state->reuse_addr), settings, "reuseaddr"); - const cJSON *tcp_settings = cJSON_GetObjectItemCaseSensitive(settings, "tcp"); - if ((cJSON_IsObject(tcp_settings) && settings->child != NULL)) - { - getBoolFromJsonObject(&(state->tcp_no_delay), tcp_settings, "nodelay"); - getBoolFromJsonObject(&(state->tcp_fast_open), tcp_settings, "fastopen"); - getBoolFromJsonObject(&(state->reuse_addr), tcp_settings, "reuseaddr"); - int ds = 0; - getIntFromJsonObject(&ds, tcp_settings, "domain-strategy"); - state->domain_strategy = ds; - } - else - { - // memset set everything to 0... - } - - state->dest_addr = + state->dest_addr_selected = parseDynamicStrValueFromJsonObject(settings, "address", 2, "src_context->address", "dest_context->address"); - if (state->dest_addr.status == kDvsEmpty) + if (state->dest_addr_selected.status == kDvsEmpty) { - LOGF("JSON Error: TcpConnector->settings->address (string field) : The vaule was empty or invalid"); + LOGF("JSON Error: UdpConnector->settings->address (string field) : The vaule was empty or invalid"); return NULL; } + if (state->dest_addr_selected.status == kDvsConstant) + { + state->constant_dest_addr.address_type = getHostAddrType(state->dest_addr_selected.value_ptr); + allocateDomainBuffer(&(state->constant_dest_addr)); + setSocketContextDomain(&(state->constant_dest_addr), state->dest_addr_selected.value_ptr, + strlen(state->dest_addr_selected.value_ptr)); + } - state->dest_port = + state->dest_port_selected = parseDynamicNumericValueFromJsonObject(settings, "port", 2, "src_context->port", "dest_context->port"); - if (state->dest_port.status == kDvsEmpty) + if (state->dest_port_selected.status == kDvsEmpty) { - LOGF("JSON Error: TcpConnector->settings->port (number field) : The vaule was empty or invalid"); + LOGF("JSON Error: UdpConnector->settings->port (number field) : The vaule was empty or invalid"); return NULL; } - if (state->dest_addr.status == kDvsConstant) + if (state->dest_port_selected.status == kDvsConstant) { - state->dest_atype = getHostAddrType(state->dest_addr.value_ptr); - state->dest_domain_len = strlen(state->dest_addr.value_ptr); + sockaddr_set_port(&(state->constant_dest_addr), state->dest_port_selected.value); } - tunnel_t *t = newTunnel(); t->state = state; t->upStream = &upStream; t->downStream = &downStream; atomic_thread_fence(memory_order_release); - return t; } -api_result_t apiTcpConnector(tunnel_t *self, const char *msg) +api_result_t apiUdpConnector(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); return (api_result_t){0}; } -tunnel_t *destroyTcpConnector(tunnel_t *self) +tunnel_t *destroyUdpConnector(tunnel_t *self) { (void) (self); return NULL; } -tunnel_metadata_t getMetadataTcpConnector() +tunnel_metadata_t getMetadataUdpConnector() { return (tunnel_metadata_t){.version = 0001, .flags = 0x0}; } \ No newline at end of file diff --git a/tunnels/adapters/listener/tcp/tcp_listener.c b/tunnels/adapters/listener/tcp/tcp_listener.c index f8a4bdaf..9f895c97 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.c +++ b/tunnels/adapters/listener/tcp/tcp_listener.c @@ -1,50 +1,48 @@ #include "tcp_listener.h" #include "buffer_pool.h" -#include "managers/socket_manager.h" #include "loggers/network_logger.h" +#include "managers/socket_manager.h" #include "utils/jsonutils.h" -#include #include +#include // enable profile to see some time info // #define PROFILE 1 -#define STATE(x) ((tcp_listener_state_t *)((x)->state)) -#define CSTATE(x) ((tcp_listener_con_state_t *)((((x)->line->chains_state)[self->chain_index]))) -#define CSTATE_MUT(x) ((x)->line->chains_state)[self->chain_index] - typedef struct tcp_listener_state_s { // settings - char *address; - int multiport_backend; + char * address; + int multiport_backend; uint16_t port_min; uint16_t port_max; - char **white_list_raddr; - char **black_list_raddr; - bool fast_open; - bool no_delay; + char ** white_list_raddr; + char ** black_list_raddr; + bool fast_open; + bool no_delay; } tcp_listener_state_t; typedef struct tcp_listener_con_state_s { - hloop_t *loop; - tunnel_t *tunnel; - line_t *line; - hio_t *io; + hloop_t * loop; + tunnel_t * tunnel; + line_t * line; + hio_t * io; context_queue_t *finished_queue; context_queue_t *data_queue; - buffer_pool_t *buffer_pool; - bool write_paused; - bool established; - bool first_packet_sent; + buffer_pool_t * buffer_pool; + bool write_paused; + bool established; + bool first_packet_sent; } tcp_listener_con_state_t; -static void cleanup(tcp_listener_con_state_t *cstate) +static void cleanup(tcp_listener_con_state_t *cstate, bool write_queue) { if (cstate->io) + { hevent_set_userdata(cstate->io, NULL); + } hio_t *last_resumed_io = NULL; @@ -56,10 +54,11 @@ static void cleanup(tcp_listener_con_state_t *cstate) last_resumed_io = cw->src_io; hio_read(cw->src_io); } - if (cw->payload) + if (write_queue) { - reuseContextBuffer(cw); + hio_write(cstate->io, rawBuf(cw->payload), bufLen(cw->payload)); } + reuseContextBuffer(cw); destroyContext(cw); } @@ -80,29 +79,30 @@ static void cleanup(tcp_listener_con_state_t *cstate) free(cstate); } -static bool resume_write_queue(tcp_listener_con_state_t *cstate) +static bool resumeWriteQueue(tcp_listener_con_state_t *cstate) { - context_queue_t *data_queue = (cstate)->data_queue; + context_queue_t *data_queue = (cstate)->data_queue; context_queue_t *finished_queue = (cstate)->finished_queue; - hio_t *io = cstate->io; + hio_t * io = cstate->io; while (contextQueueLen(data_queue) > 0) { - context_t *cw = contextQueuePop(data_queue); - - int bytes = bufLen(cw->payload); - int nwrite = hio_write(io, rawBuf(cw->payload), bytes); + context_t * cw = contextQueuePop(data_queue); + unsigned int bytes = bufLen(cw->payload); + int nwrite = hio_write(io, rawBuf(cw->payload), bytes); reuseBuffer(cstate->buffer_pool, cw->payload); cw->payload = NULL; contextQueuePush(cstate->finished_queue, cw); if (nwrite >= 0 && nwrite < bytes) + { return false; // write pending + } } // data queue is empty hio_t *last_resumed_io = NULL; while (contextQueueLen(finished_queue) > 0) { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; + context_t *cw = contextQueuePop(finished_queue); + hio_t * upstream_io = cw->src_io; if (upstream_io != NULL && (last_resumed_io != upstream_io)) { last_resumed_io = upstream_io; @@ -113,33 +113,39 @@ static bool resume_write_queue(tcp_listener_con_state_t *cstate) return true; } -static void on_write_complete(hio_t * restrict io, const void * restrict buf, int writebytes) +static void onWriteComplete(hio_t *restrict io, const void *restrict buf, int writebytes) { + (void) buf; + (void) writebytes; // resume the read on other end of the connection - tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *)(hevent_userdata(io)); + tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *) (hevent_userdata(io)); if (cstate == NULL) + { return; + } if (hio_write_is_complete(io)) { hio_setcb_write(cstate->io, NULL); cstate->write_paused = false; - context_queue_t *data_queue = cstate->data_queue; + context_queue_t *data_queue = cstate->data_queue; context_queue_t *finished_queue = cstate->finished_queue; if (contextQueueLen(data_queue) > 0) - if (!resume_write_queue(cstate)) + { + if (! resumeWriteQueue(cstate)) { - hio_setcb_write(cstate->io, on_write_complete); + hio_setcb_write(cstate->io, onWriteComplete); cstate->write_paused = true; return; } + } hio_t *last_resumed_io = NULL; while (contextQueueLen(finished_queue) > 0) { - context_t *cw = contextQueuePop(finished_queue); - hio_t *upstream_io = cw->src_io; + context_t *cw = contextQueuePop(finished_queue); + hio_t * upstream_io = cw->src_io; if (upstream_io != NULL && (last_resumed_io != upstream_io)) { last_resumed_io = upstream_io; @@ -152,7 +158,6 @@ static void on_write_complete(hio_t * restrict io, const void * restrict buf, in static inline void upStream(tunnel_t *self, context_t *c) { - if (c->payload != NULL) { #ifdef PROFILE @@ -164,11 +169,10 @@ static inline void upStream(tunnel_t *self, context_t *c) self->up->upStream(self->up, c); } gettimeofday(&tv2, NULL); - double time_spent = (double)(tv2.tv_usec - tv1.tv_usec) / 1000000 + (double)(tv2.tv_sec - tv1.tv_sec); - LOGD("TcpListener: upstream took %d ms", (int)(time_spent * 1000)); + double time_spent = (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 + (double) (tv2.tv_sec - tv1.tv_sec); + LOGD("TcpListener: upstream took %d ms", (int) (time_spent * 1000)); return; } - #endif } else @@ -177,7 +181,7 @@ static inline void upStream(tunnel_t *self, context_t *c) { tcp_listener_con_state_t *cstate = CSTATE(c); - cleanup(cstate); + cleanup(cstate, false); CSTATE_MUT(c) = NULL; destroyLine(c->line); } @@ -195,23 +199,27 @@ static inline void downStream(tunnel_t *self, context_t *c) if (cstate->write_paused) { if (c->src_io) + { hio_read_stop(c->src_io); + } contextQueuePush(cstate->data_queue, c); } else { - int bytes = bufLen(c->payload); - int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes); + unsigned int bytes = bufLen(c->payload); + int nwrite = hio_write(cstate->io, rawBuf(c->payload), bytes); if (nwrite >= 0 && nwrite < bytes) { if (c->src_io) + { hio_read_stop(c->src_io); + } reuseBuffer(cstate->buffer_pool, c->payload); c->payload = NULL; contextQueuePush(cstate->finished_queue, c); cstate->write_paused = true; - hio_setcb_write(cstate->io, on_write_complete); + hio_setcb_write(cstate->io, onWriteComplete); } else { @@ -228,122 +236,125 @@ static inline void downStream(tunnel_t *self, context_t *c) { cstate->established = true; destroyContext(c); - return; } if (c->fin) { - hio_t *io = cstate->io; - cleanup(cstate); + hio_t *io = cstate->io; CSTATE_MUT(c) = NULL; + cleanup(cstate, true); destroyLine(c->line); destroyContext(c); hio_close(io); - return; } } } - - -static void on_recv(hio_t *io, void *buf, int readbytes) +static void onRecv(hio_t *io, void *buf, int readbytes) { - tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *)(hevent_userdata(io)); + tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *) (hevent_userdata(io)); if (cstate == NULL) + { return; + } shift_buffer_t *payload = popBuffer(cstate->buffer_pool); setLen(payload, readbytes); - writeRaw(payload,buf,readbytes); + writeRaw(payload, buf, readbytes); - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; - bool *first_packet_sent = &((cstate)->first_packet_sent); + tunnel_t *self = (cstate)->tunnel; + line_t * line = (cstate)->line; + bool * first_packet_sent = &((cstate)->first_packet_sent); context_t *context = newContext(line); - context->src_io = io; - context->payload = payload; - if (!(*first_packet_sent)) + context->src_io = io; + context->payload = payload; + if (! (*first_packet_sent)) { *first_packet_sent = true; - context->first = true; + context->first = true; } self->upStream(self, context); } -static void on_close(hio_t *io) +static void onClose(hio_t *io) { - tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *)(hevent_userdata(io)); + tcp_listener_con_state_t *cstate = (tcp_listener_con_state_t *) (hevent_userdata(io)); if (cstate != NULL) - LOGD("TcpListener: received close for FD:%x ", - (int)hio_fd(io)); + { + LOGD("TcpListener: received close for FD:%x ", (int) hio_fd(io)); + } else - LOGD("TcpListener: sent close for FD:%x ", - (int)hio_fd(io)); + { + LOGD("TcpListener: sent close for FD:%x ", (int) hio_fd(io)); + } if (cstate != NULL) { - tunnel_t *self = (cstate)->tunnel; - line_t *line = (cstate)->line; + tunnel_t * self = (cstate)->tunnel; + line_t * line = (cstate)->line; context_t *context = newFinContext(line); self->upStream(self, context); } } -void on_inbound_connected(hevent_t *ev) +void onInboundConnected(hevent_t *ev) { - hloop_t *loop = ev->loop; - socket_accept_result_t *data = (socket_accept_result_t *)hevent_userdata(ev); - hio_t *io = data->io; - size_t tid = data->tid; + hloop_t * loop = ev->loop; + socket_accept_result_t *data = (socket_accept_result_t *) hevent_userdata(ev); + hio_t * io = data->io; + size_t tid = data->tid; hio_attach(loop, io); char localaddrstr[SOCKADDR_STRLEN] = {0}; - char peeraddrstr[SOCKADDR_STRLEN] = {0}; - - tunnel_t *self = data->tunnel; - line_t *line = newLine(tid); - tcp_listener_con_state_t *cstate = malloc(sizeof(tcp_listener_con_state_t)); - cstate->line = line; - cstate->buffer_pool = buffer_pools[tid]; - cstate->finished_queue = newContextQueue(cstate->buffer_pool); - cstate->data_queue = newContextQueue(cstate->buffer_pool); - cstate->io = io; - cstate->tunnel = self; - cstate->write_paused = false; - cstate->established = false; - cstate->first_packet_sent = false; + char peeraddrstr[SOCKADDR_STRLEN] = {0}; + + tunnel_t * self = data->tunnel; + line_t * line = newLine(tid); + tcp_listener_con_state_t *cstate = malloc(sizeof(tcp_listener_con_state_t)); + cstate->line = line; + cstate->buffer_pool = buffer_pools[tid]; + cstate->finished_queue = newContextQueue(cstate->buffer_pool); + cstate->data_queue = newContextQueue(cstate->buffer_pool); + cstate->io = io; + cstate->tunnel = self; + cstate->write_paused = false; + cstate->established = false; + cstate->first_packet_sent = false; line->chains_state[self->chain_index] = cstate; - line->src_ctx.protocol = data->proto; - line->src_ctx.addr.sa = *hio_peeraddr(io); + line->src_ctx.address_protocol = data->proto; + line->src_ctx.addr.sa = *hio_peeraddr(io); - // sockaddr_set_port(&(line->src_ctx.addr), data->real_localport == 0 ? sockaddr_port((sockaddr_u *)hio_localaddr(io)) : data->real_localport); + // sockaddr_set_port(&(line->src_ctx.addr), data->real_localport == 0 ? sockaddr_port((sockaddr_u + // *)hio_localaddr(io)) : data->real_localport); sockaddr_set_port(&(line->src_ctx.addr), data->real_localport); - line->src_ctx.atype = line->src_ctx.addr.sa.sa_family == AF_INET ? kSatIPV4 : kSatIPV6; + line->src_ctx.address_type = line->src_ctx.addr.sa.sa_family == AF_INET ? kSatIPV4 : kSatIPV6; hevent_set_userdata(io, cstate); struct sockaddr log_localaddr = *hio_localaddr(io); - sockaddr_set_port((sockaddr_u *)&(log_localaddr), data->real_localport); + sockaddr_set_port((sockaddr_u *) &(log_localaddr), data->real_localport); - LOGD("TcpListener: Accepted FD:%x [%s] <= [%s]", - (int)hio_fd(io), - SOCKADDR_STR(&log_localaddr, localaddrstr), + LOGD("TcpListener: Accepted FD:%x [%s] <= [%s]", (int) hio_fd(io), SOCKADDR_STR(&log_localaddr, localaddrstr), SOCKADDR_STR(hio_peeraddr(io), peeraddrstr)); free(data); // io->upstream_io = NULL; - hio_setcb_read(io, on_recv); - hio_setcb_close(io, on_close); - // hio_setcb_write(io, on_write_complete); not required here - if (resume_write_queue(cstate)) + hio_setcb_read(io, onRecv); + hio_setcb_close(io, onClose); + // hio_setcb_write(io, onWriteComplete); not required here + if (resumeWriteQueue(cstate)) + { cstate->write_paused = false; + } else - hio_setcb_write(cstate->io, on_write_complete); + { + hio_setcb_write(cstate->io, onWriteComplete); + } // send the init packet context_t *context = newInitContext(line); - context->src_io = io; + context->src_io = io; self->upStream(self, context); - if ((line->chains_state)[0] == NULL) + if (! isAlive(line)) { LOGW("TcpListener: socket just got closed by upstream before anything happend"); return; @@ -353,56 +364,61 @@ void on_inbound_connected(hevent_t *ev) tunnel_t *newTcpListener(node_instance_context_t *instance_info) { - tunnel_t *t = newTunnel(); - t->state = malloc(sizeof(tcp_listener_state_t)); - memset(t->state, 0, sizeof(tcp_listener_state_t)); + tcp_listener_state_t *state = malloc(sizeof(tcp_listener_state_t)); + memset(state, 0, sizeof(tcp_listener_state_t)); const cJSON *settings = instance_info->node_settings_json; - if (!(cJSON_IsObject(settings) && settings->child != NULL)) + if (! (cJSON_IsObject(settings) && settings->child != NULL)) { LOGF("JSON Error: TcpListener->settings (object field) : The object was empty or invalid"); return NULL; } - getBoolFromJsonObject(&(STATE(t)->no_delay), settings, "nodelay"); + getBoolFromJsonObject(&(state->no_delay), settings, "nodelay"); - if (!getStringFromJsonObject(&(STATE(t)->address), settings, "address")) + if (! getStringFromJsonObject(&(state->address), settings, "address")) { LOGF("JSON Error: TcpListener->settings->address (string field) : The data was empty or invalid"); return NULL; } - int multiport_backend = multiport_backend_nothing; + int multiport_backend = kMultiportBackendNothing; const cJSON *port = cJSON_GetObjectItemCaseSensitive(settings, "port"); if ((cJSON_IsNumber(port) && (port->valuedouble != 0))) { // single port given as a number - STATE(t)->port_min = port->valuedouble; - STATE(t)->port_max = port->valuedouble; + state->port_min = port->valuedouble; + state->port_max = port->valuedouble; } else { - multiport_backend = multiport_backend_default; + multiport_backend = kMultiportBackendDefault; if (cJSON_IsArray(port)) { const cJSON *port_minmax; - int i = 0; + int i = 0; // multi port given cJSON_ArrayForEach(port_minmax, port) { - if (!(cJSON_IsNumber(port_minmax) && (port_minmax->valuedouble != 0))) + if (! (cJSON_IsNumber(port_minmax) && (port_minmax->valuedouble != 0))) { - LOGF("JSON Error: TcpListener->settings->port (number-or-array field) : The data was empty or invalid"); + LOGF("JSON Error: TcpListener->settings->port (number-or-array field) : The data was empty or " + "invalid"); LOGF("JSON Error: MultiPort parsing failed"); return NULL; } if (i == 0) - STATE(t)->port_min = port_minmax->valuedouble; + { + state->port_min = port_minmax->valuedouble; + } else if (i == 1) - STATE(t)->port_max = port_minmax->valuedouble; + { + state->port_max = port_minmax->valuedouble; + } else { - LOGF("JSON Error: TcpListener->settings->port (number-or-array field) : The data was empty or invalid"); + LOGF("JSON Error: TcpListener->settings->port (number-or-array field) : The data was empty or " + "invalid"); LOGF("JSON Error: MultiPort port range has more data than expected"); return NULL; } @@ -410,11 +426,16 @@ tunnel_t *newTcpListener(node_instance_context_t *instance_info) i++; } - dynamic_value_t dy_mb = parseDynamicStrValueFromJsonObject(settings, "multiport-backend", 2, "iptables", "socket"); + dynamic_value_t dy_mb = + parseDynamicStrValueFromJsonObject(settings, "multiport-backend", 2, "iptables", "socket"); if (dy_mb.status == 2) - multiport_backend = multiport_backend_iptables; + { + multiport_backend = kMultiportBackendIptables; + } if (dy_mb.status == 3) - multiport_backend = multiport_backend_sockets; + { + multiport_backend = kMultiportBackendSockets; + } } else { @@ -425,7 +446,7 @@ tunnel_t *newTcpListener(node_instance_context_t *instance_info) socket_filter_option_t filter_opt = {0}; filter_opt.white_list_raddr = NULL; - const cJSON *wlist = cJSON_GetObjectItemCaseSensitive(settings, "whitelist"); + const cJSON *wlist = cJSON_GetObjectItemCaseSensitive(settings, "whitelist"); if (cJSON_IsArray(wlist)) { size_t len = cJSON_GetArraySize(wlist); @@ -433,15 +454,17 @@ tunnel_t *newTcpListener(node_instance_context_t *instance_info) { char **list = malloc(sizeof(char *) * (len + 1)); memset(list, 0, sizeof(char *) * (len + 1)); - list[len] = 0x0; - int i = 0; + list[len] = 0x0; + int i = 0; const cJSON *list_item = NULL; cJSON_ArrayForEach(list_item, wlist) { - int list_item_len = 0; - if (!getStringFromJson(&(list[i]), list_item) || (list_item_len = strlen(list[i])) < 4) + unsigned int list_item_len = 0; + if (! getStringFromJson(&(list[i]), list_item) || (list_item_len = strlen(list[i])) < 4) { - LOGF("JSON Error: TcpListener->settings->whitelist (array of strings field) index %d : The data was empty or invalid", i); + LOGF("JSON Error: TcpListener->settings->whitelist (array of strings field) index %d : The data " + "was empty or invalid", + i); exit(1); } char *slash = strchr(list[i], '/'); @@ -451,29 +474,31 @@ tunnel_t *newTcpListener(node_instance_context_t *instance_info) exit(1); } *slash = '\0'; - if (!is_ipaddr(list[i])) + if (! is_ipaddr(list[i])) { LOGF("Value Error: whitelist %d : \"%s\" is not a valid ip address", i, list[i]); exit(1); } bool is_v4 = is_ipv4(list[i]); - *slash = '/'; + *slash = '/'; - char *subnet_part = slash + 1; - int prefix_length = atoi(subnet_part); + char *subnet_part = slash + 1; + int prefix_length = atoi(subnet_part); if (is_v4 && (prefix_length < 0 || prefix_length > 32)) { - LOGF("Value Error: Invalid subnet mask length for ipv4 %s prefix %d must be between 0 and 32", list[i], prefix_length); + LOGF("Value Error: Invalid subnet mask length for ipv4 %s prefix %d must be between 0 and 32", + list[i], prefix_length); exit(1); } - if (!is_v4 && (prefix_length < 0 || prefix_length > 128)) + if (! is_v4 && (prefix_length < 0 || prefix_length > 128)) { - LOGF("Value Error: Invalid subnet mask length for ipv6 %s prefix %d must be between 0 and 128", list[i], prefix_length); + LOGF("Value Error: Invalid subnet mask length for ipv6 %s prefix %d must be between 0 and 128", + list[i], prefix_length); exit(1); } - if (prefix_length > 0 && slash + 2 + (int)(log10(prefix_length)) < list[i] + list_item_len) + if (prefix_length > 0 && slash + 2 + (int) (log10(prefix_length)) < list[i] + list_item_len) { LOGW("the value \"%s\" looks incorrect, it has more data than ip/prefix", list[i]); } @@ -484,29 +509,33 @@ tunnel_t *newTcpListener(node_instance_context_t *instance_info) } } - filter_opt.host = STATE(t)->address; - filter_opt.port_min = STATE(t)->port_min; - filter_opt.port_max = STATE(t)->port_max; - filter_opt.proto = socket_protocol_tcp; + filter_opt.host = state->address; + filter_opt.port_min = state->port_min; + filter_opt.port_max = state->port_max; + filter_opt.proto = kSapTcp; filter_opt.multiport_backend = multiport_backend; - filter_opt.black_list_raddr = NULL; - - registerSocketAcceptor(t, filter_opt, on_inbound_connected); + filter_opt.black_list_raddr = NULL; - t->upStream = &upStream; - t->downStream = &downStream; + tunnel_t *t = newTunnel(); + t->state = state; + t->upStream = &upStream; + t->downStream = &downStream; + registerSocketAcceptor(t, filter_opt, onInboundConnected); atomic_thread_fence(memory_order_release); return t; } -api_result_t apiTcpListener(tunnel_t *self, char *msg) +api_result_t apiTcpListener(tunnel_t *self, const char *msg) { - (void)(self); (void)(msg); return (api_result_t){0}; // TODO + (void) (self); + (void) (msg); + return (api_result_t){0}; // TODO(root): } tunnel_t *destroyTcpListener(tunnel_t *self) { + (void) (self); return NULL; } tunnel_metadata_t getMetadataTcpListener() diff --git a/tunnels/adapters/listener/tcp/tcp_listener.h b/tunnels/adapters/listener/tcp/tcp_listener.h index edf95826..5b5a2c9d 100644 --- a/tunnels/adapters/listener/tcp/tcp_listener.h +++ b/tunnels/adapters/listener/tcp/tcp_listener.h @@ -9,6 +9,6 @@ #define NODE_TCP_LISTINER tunnel_t * newTcpListener(node_instance_context_t *instance_info); -api_result_t apiTcpListener(tunnel_t *self, char *msg); +api_result_t apiTcpListener(tunnel_t *self, const char *msg); tunnel_t * destroyTcpListener(tunnel_t *self); tunnel_metadata_t getMetadataTcpListener(); diff --git a/tunnels/server/openssl/openssl_server.c b/tunnels/server/openssl/openssl_server.c index 4c64e58e..75d5c604 100644 --- a/tunnels/server/openssl/openssl_server.c +++ b/tunnels/server/openssl/openssl_server.c @@ -41,16 +41,12 @@ typedef struct oss_server_con_state_s bool fallback_first_sent; buffer_stream_t *fallback_buf; line_t * fallback_line; - // htimer_t *fallback_timer; - - SSL *ssl; - - BIO *rbio; - BIO *wbio; - - bool first_sent; - bool init_sent; - int reply_sent_tit; + SSL * ssl; + BIO * rbio; + BIO * wbio; + bool first_sent; + bool init_sent; + int reply_sent_tit; } oss_server_con_state_t; @@ -60,10 +56,10 @@ struct timer_eventdata context_t *c; }; -static int onAlpnSelect(SSL *ssl,const unsigned char **out, unsigned char *outlen, const unsigned char *in, unsigned int inlen, - void *arg) +static int onAlpnSelect(SSL *ssl, const unsigned char **out, unsigned char *outlen, const unsigned char *in, + unsigned int inlen, void *arg) { - (void)ssl; + (void) ssl; assert(inlen != 0); oss_server_state_t *state = arg; unsigned int offset = 0; @@ -586,7 +582,6 @@ failed_after_establishment:; self->dw->downStream(self->dw, fail_context); } - tunnel_t *newOpenSSLServer(node_instance_context_t *instance_info) { oss_server_state_t *state = malloc(sizeof(oss_server_state_t)); @@ -709,10 +704,10 @@ tunnel_t *newOpenSSLServer(node_instance_context_t *instance_info) t->state = state; if (state->fallback != NULL) { - state->fallback->dw = t; + chainDown(t,state->fallback); } - t->upStream = &upStream; - t->downStream = &downStream; + t->upStream = &upStream; + t->downStream = &downStream; atomic_thread_fence(memory_order_release); return t; } @@ -721,13 +716,12 @@ api_result_t apiOpenSSLServer(tunnel_t *self, const char *msg) { (void) (self); (void) (msg); - return (api_result_t){0}; // TODO(root): + return (api_result_t){0}; } tunnel_t *destroyOpenSSLServer(tunnel_t *self) { (void) (self); - return NULL; } diff --git a/tunnels/server/trojan/socks/trojan_socks_server.c b/tunnels/server/trojan/socks/trojan_socks_server.c index 46c86081..9810af79 100644 --- a/tunnels/server/trojan/socks/trojan_socks_server.c +++ b/tunnels/server/trojan/socks/trojan_socks_server.c @@ -52,7 +52,7 @@ static void makeUdpPacketAddress(context_t *c) shiftl(c->payload, 2); // port writeUI16(c->payload, port); - switch (c->line->dest_ctx.atype) + switch (c->line->dest_ctx.address_type) { case kSatIPV6: shiftl(c->payload, 16); @@ -80,7 +80,7 @@ static bool parseAddress(context_t *c) enum trojan_cmd command = ((unsigned char *) rawBuf(c->payload))[0]; enum trojan_atyp address_type = ((unsigned char *) rawBuf(c->payload))[1]; dest_context->acmd = (enum socket_address_cmd)(command); - dest_context->atype = (enum socket_address_type)(address_type); + dest_context->address_type = (enum socket_address_type)(address_type); shiftr(c->payload, 2); switch (command) @@ -205,11 +205,11 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, return true; } - uint8_t atype = bufferStreamViewByteAt(bstream, 0); + uint8_t address_type = bufferStreamViewByteAt(bstream, 0); uint16_t packet_size = 0; uint16_t full_len = 0; uint8_t domain_len = 0; - switch (atype) + switch (address_type) { case kTrojanatypIpV4: // address_type | DST.ADDR | DST.PORT | Length | CRLF | Payload @@ -297,11 +297,11 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, shiftr(c->payload, 1); - switch (atype) + switch (address_type) { case kTrojanatypIpV4: dest_context->addr.sa.sa_family = AF_INET; - dest_context->atype = kSatIPV4; + dest_context->address_type = kSatIPV4; memcpy(&(dest_context->addr.sin.sin_addr), rawBuf(c->payload), 4); shiftr(c->payload, 4); if (! cstate->first_sent) @@ -311,7 +311,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, break; case kTrojanatypDomainname: - dest_context->atype = kSatDomainName; + dest_context->address_type = kSatDomainName; // size_t addr_len = (unsigned char)(rawBuf(c->payload)[0]); shiftr(c->payload, 1); if (dest_context->domain == NULL) @@ -330,7 +330,7 @@ static bool processUdp(tunnel_t *self, trojan_socks_server_con_state_t *cstate, break; case kTrojanatypIpV6: - dest_context->atype = kSatIPV6; + dest_context->address_type = kSatIPV6; dest_context->addr.sa.sa_family = AF_INET6; memcpy(&(dest_context->addr.sin.sin_addr), rawBuf(c->payload), 16); shiftr(c->payload, 16); diff --git a/tunnels/server/wolfssl/wolfssl_server.c b/tunnels/server/wolfssl/wolfssl_server.c index c54103b8..e654d09c 100644 --- a/tunnels/server/wolfssl/wolfssl_server.c +++ b/tunnels/server/wolfssl/wolfssl_server.c @@ -26,7 +26,6 @@ typedef struct wssl_server_state_s ssl_ctx_t ssl_context; alpn_item_t *alpns; unsigned int alpns_length; - // settings tunnel_t *fallback; int fallback_delay; @@ -43,16 +42,12 @@ typedef struct wssl_server_con_state_s bool fallback_first_sent; buffer_stream_t *fallback_buf; line_t * fallback_line; - // htimer_t *fallback_timer; - - SSL *ssl; - - BIO *rbio; - BIO *wbio; - - bool first_sent; - bool init_sent; - int reply_sent_tit; + SSL * ssl; + BIO * rbio; + BIO * wbio; + bool first_sent; + bool init_sent; + int reply_sent_tit; } wssl_server_con_state_t; @@ -110,7 +105,7 @@ static enum sslstatus getSslstatus(SSL *ssl, int n) } } -// todo (tls in tls padding) wolfssl dose not support it but since its standard in tls 1.3, there must be a way +// todo (tls in tls padding) wolfssl dose not support it but since its standard in tls 1.3, there must be a way // static size_t paddingDecisionCb(SSL *ssl, int type, size_t len, void *arg) // { // (void) ssl; @@ -584,7 +579,6 @@ failed_after_establishment:; self->dw->downStream(self->dw, fail_context); } - tunnel_t *newWolfSSLServer(node_instance_context_t *instance_info) { wssl_server_state_t *state = malloc(sizeof(wssl_server_state_t)); @@ -712,10 +706,10 @@ tunnel_t *newWolfSSLServer(node_instance_context_t *instance_info) t->state = state; if (state->fallback != NULL) { - state->fallback->dw = t; + chainDown(t,state->fallback); } - t->upStream = &upStream; - t->downStream = &downStream; + t->upStream = &upStream; + t->downStream = &downStream; atomic_thread_fence(memory_order_release); return t; } diff --git a/ww/basic_types.h b/ww/basic_types.h index 329b5f17..52492fd5 100644 --- a/ww/basic_types.h +++ b/ww/basic_types.h @@ -103,18 +103,19 @@ enum socket_address_type enum socket_address_protocol { - kSapTcp = 0X1, - kSapUdp = 0X3, + kSapTcp = IPPROTO_TCP, + kSapUdp = IPPROTO_UDP, }; // all data we need to connect to somewhere typedef struct socket_context_s { - enum socket_address_protocol aproto; - enum socket_address_type atype; + enum socket_address_protocol address_protocol; + enum socket_address_type address_type; + sockaddr_u addr; char * domain; unsigned int domain_len; - bool resolved; - sockaddr_u addr; - + bool domain_constant; + enum domain_strategy domain_strategy; + bool domain_resolved; } socket_context_t; diff --git a/ww/http_def.h b/ww/http_def.h index 82d49367..4f88f48a 100644 --- a/ww/http_def.h +++ b/ww/http_def.h @@ -64,7 +64,7 @@ enum http_parser_state { XX(412, PreconditionFailed, Precondition Failed) \ XX(413, PayloadTooLarge, Payload Too Large) \ XX(414, UriTooLong, URI Too Long) \ - XX(415, UnsupportedMediaType, Unsupported Media Type) \ + XX(415, UnsupportedMediaddress_type, Unsupported Media Type) \ XX(416, RangeNotSatisfiable, Range Not Satisfiable) \ XX(417, ExpectationFailed, Expectation Failed) \ XX(421, MisdirectedRequest, Misdirected Request) \ diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index ebfcd68a..7f412353 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -483,7 +483,7 @@ static void listenTcp(hloop_t *loop, uint8_t *ports_overlapped) { option.multiport_backend == kMultiportBackendNothing; } - if (option.proto == kSocketProtocolTcp) + if (option.proto == kSapTcp) { if (option.multiport_backend == kMultiportBackendIptables) { diff --git a/ww/managers/socket_manager.h b/ww/managers/socket_manager.h index e6a86ffd..4f475172 100644 --- a/ww/managers/socket_manager.h +++ b/ww/managers/socket_manager.h @@ -3,14 +3,6 @@ #include "hv/hmutex.h" #include "tunnel.h" -typedef enum -{ - kSocketProtocolInvalid, - kSocketProtocolTcp, - kSocketProtocolUdp, - kSocketProtocolIcmp -} socket_protocol_t; - typedef enum { kMultiportBackendNothing, @@ -21,26 +13,26 @@ typedef enum typedef struct socket_filter_option_s { - char * host; - socket_protocol_t proto; - multiport_backend_t multiport_backend; - uint16_t port_min; - uint16_t port_max; - char ** white_list_raddr; - char ** black_list_raddr; - bool fast_open; - bool no_delay; + char * host; + enum socket_address_protocol proto; + multiport_backend_t multiport_backend; + uint16_t port_min; + uint16_t port_max; + char ** white_list_raddr; + char ** black_list_raddr; + bool fast_open; + bool no_delay; } socket_filter_option_t; // user data of accept event typedef struct socket_accept_result_s { - hio_t * io; // it also has the owner loop - tunnel_t * tunnel; - socket_protocol_t proto; - size_t tid; - uint16_t real_localport; + hio_t * io; // it also has the owner loop + tunnel_t * tunnel; + enum socket_address_protocol proto; + size_t tid; + uint16_t real_localport; } socket_accept_result_t; diff --git a/ww/sync_dns.c b/ww/sync_dns.c index 219dfed3..b520358d 100644 --- a/ww/sync_dns.c +++ b/ww/sync_dns.c @@ -1,41 +1,36 @@ #include "sync_dns.h" #include "loggers/dns_logger.h" -static inline bool resolve(socket_context_t *dest) +bool resolveContextSync(socket_context_t *sctx) { + // please check these before calling this function -> more performance + assert(sctx->address_type == kSatDomainName && sctx->domain_resolved == false && sctx->domain != NULL); // we need to get and set port again because resolved ip can be v6/v4 which have different sizes - uint16_t old_port = sockaddr_port(&(dest->addr)); + uint16_t old_port = sockaddr_port(&(sctx->addr)); #ifdef PROFILE struct timeval tv1, tv2; gettimeofday(&tv1, NULL); #endif /* resolve domain */ { - if (sockaddr_set_ip_port(&(dest->addr), dest->domain, old_port) != 0) + if (sockaddr_set_ip_port(&(sctx->addr), sctx->domain, old_port) != 0) { - LOGE("Connector: resolve failed %s", dest->domain); + LOGE("SyncDns: resolve failed %s", sctx->domain); return false; } - if (logger_will_write_level(getDnsLogger(), (log_level_e)LOG_LEVEL_INFO)) + if (logger_will_write_level(getDnsLogger(), (log_level_e) LOG_LEVEL_INFO)) { - char ip[60]; - sockaddr_str(&(dest->addr), ip, 60); - LOGI("Connector: %s resolved to %s", dest->domain, ip); + char ip[64]; + sockaddr_str(&(sctx->addr), ip, 64); + LOGI("SyncDns: %s resolved to %s", sctx->domain, ip); } } #ifdef PROFILE gettimeofday(&tv2, NULL); double time_spent = (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 + (double) (tv2.tv_sec - tv1.tv_sec); - LOGD("Connector: dns resolve took %lf sec", time_spent); + LOGD("SyncDns: dns resolve took %lf sec", time_spent); #endif - dest->resolved = true; + sctx->domain_resolved = true; return true; -} - -bool resolveContextSync(socket_context_t *s_ctx) -{ - // please check these before calling this function -> more performance - assert(s_ctx.atype == kSatDomainName && s_ctx.resolved == false && dest->domain != NULL); - return resolve(s_ctx); } \ No newline at end of file diff --git a/ww/sync_dns.h b/ww/sync_dns.h index c7a6bd16..a09aceef 100644 --- a/ww/sync_dns.h +++ b/ww/sync_dns.h @@ -1,5 +1,6 @@ #pragma once #include "basic_types.h" +// TODO (internal cache , prefer v4/6) bool resolveContextSync(socket_context_t *s_ctx); diff --git a/ww/tunnel.c b/ww/tunnel.c index 76e26768..3142e487 100644 --- a/ww/tunnel.c +++ b/ww/tunnel.c @@ -1,10 +1,10 @@ #include "tunnel.h" #include "string.h" // memset -extern line_t *newLine(uint16_t tid); -extern size_t reserveChainStateIndex(line_t *l); -extern void destroyLine(line_t *l); -extern void destroyContext(context_t *c); +extern line_t * newLine(uint16_t tid); +extern uint8_t reserveChainStateIndex(line_t *l); +extern void destroyLine(line_t *l); +extern void destroyContext(context_t *c); extern context_t *newContext(line_t *line); extern context_t *newContextFrom(context_t *source); extern context_t *newEstContext(line_t *line); @@ -12,23 +12,34 @@ extern context_t *newFinContext(line_t *line); extern context_t *newInitContext(line_t *line); extern context_t *switchLine(context_t *c, line_t *line); -void chain(tunnel_t *from, tunnel_t *to) +// `from` upstreams to `to` +void chainUp(tunnel_t *from, tunnel_t *to) { - assert(to->dw == NULL); // 2 nodes cannot chain to 1 exact node from->up = to; - to->dw = from; - to->chain_index = from->chain_index + 1; } +// `to` downstreams to `from` +void chainDown(tunnel_t *from, tunnel_t *to) +{ + assert(to->dw == NULL); // 2 nodes cannot chain to 1 exact node + to->dw = from; +} +// `from` <-> `to` +void chain(tunnel_t *from, tunnel_t *to) +{ + chainUp(from, to); + chainDown(from, to); + to->chain_index = from->chain_index + 1; +} tunnel_t *newTunnel() { tunnel_t *t = malloc(sizeof(tunnel_t)); - t->state = NULL; - t->dw = NULL; - t->up = NULL; + t->state = NULL; + t->dw = NULL; + t->up = NULL; - t->upStream = &defaultUpStream; + t->upStream = &defaultUpStream; t->downStream = &defaultDownStream; return t; } @@ -48,4 +59,3 @@ void defaultDownStream(tunnel_t *self, context_t *c) self->dw->downStream(self->dw, c); } } - diff --git a/ww/tunnel.h b/ww/tunnel.h index 020d9b66..55897c8d 100644 --- a/ww/tunnel.h +++ b/ww/tunnel.h @@ -6,7 +6,7 @@ #include "hv/hloop.h" #include "ww.h" -#define MAX_CHAIN_LEN 30 +#define MAX_CHAIN_LEN (16 * 2) #define STATE(x) ((void *) ((x)->state)) #define CSTATE(x) ((void *) ((((x)->line->chains_state)[self->chain_index]))) @@ -20,7 +20,7 @@ typedef struct line_s uint16_t lcid; uint8_t auth_cur; uint8_t auth_max; - bool root_alive; + bool alive; socket_context_t src_ctx; socket_context_t dest_ctx; @@ -42,7 +42,7 @@ typedef struct context_s struct tunnel_s; -typedef void(*TunnelFlowRoutine)(struct tunnel_s *, struct context_s *); +typedef void (*TunnelFlowRoutine)(struct tunnel_s *, struct context_s *); struct tunnel_s { @@ -62,6 +62,8 @@ tunnel_t *newTunnel(); void destroyTunnel(tunnel_t *self); void chain(tunnel_t *from, tunnel_t *to); +void chainDown(tunnel_t *from, tunnel_t *to); +void chainUp(tunnel_t *from, tunnel_t *to); void defaultUpStream(tunnel_t *self, context_t *c); void defaultDownStream(tunnel_t *self, context_t *c); @@ -77,6 +79,7 @@ inline line_t *newLine(uint16_t tid) .auth_cur = 0, .auth_max = 0, .loop = loops[tid], + .alive = true, // to set a port we need to know the AF family, default v4 .dest_ctx = (socket_context_t){.addr.sa = (struct sockaddr){.sa_family = AF_INET, .sa_data = {0}}}, .src_ctx = (socket_context_t){.addr.sa = (struct sockaddr){.sa_family = AF_INET, .sa_data = {0}}}, @@ -84,13 +87,13 @@ inline line_t *newLine(uint16_t tid) memset(&(result->chains_state), 0, MAX_CHAIN_LEN); return result; } -inline size_t reserveChainStateIndex(line_t *l) +inline uint8_t reserveChainStateIndex(line_t *l) { - size_t result = l->lcid; + uint8_t result = l->lcid; l->lcid -= 1; return result; } -inline void destroyLine(line_t *l) +static inline void internalUnRefLine(line_t *l) { l->refc -= 1; // check line @@ -99,33 +102,38 @@ inline void destroyLine(line_t *l) return; } -#ifdef DEBUG + assert(l->alive == false); + // there should not be any conn-state alive at this point for (size_t i = 0; i < MAX_CHAIN_LEN; i++) { assert(l->chains_state[i] == NULL); } -#endif - assert(l->src_ctx.domain == NULL); // impossible (hopefully) + assert(l->src_ctx.domain == NULL); // impossible (source domain?) - if (l->dest_ctx.domain != NULL) + if (l->dest_ctx.domain != NULL && ! l->dest_ctx.domain_constant) { free(l->dest_ctx.domain); } free(l); } +inline void destroyLine(line_t *l) +{ + l->alive = false; + internalUnRefLine(l); +} inline void destroyContext(context_t *c) { assert(c->payload == NULL); - destroyLine(c->line); + internalUnRefLine(c->line); free(c); } inline context_t *newContext(line_t *line) { context_t *new_ctx = malloc(sizeof(context_t)); - *new_ctx = (context_t){.line = line}; // yes, everything else is zero + *new_ctx = (context_t){.line = line}; line->refc += 1; return new_ctx; } @@ -134,12 +142,7 @@ inline context_t *newContextFrom(context_t *source) { source->line->refc += 1; context_t *new_ctx = malloc(sizeof(context_t)); - *new_ctx = *source; - new_ctx->payload = NULL; - new_ctx->init = false; - new_ctx->est = false; - new_ctx->first = false; - new_ctx->fin = false; + *new_ctx = (context_t){.line = source->line, .src_io = source->src_io}; return new_ctx; } inline context_t *newEstContext(line_t *line) @@ -165,63 +168,45 @@ inline context_t *newInitContext(line_t *line) inline context_t *switchLine(context_t *c, line_t *line) { line->refc += 1; - destroyLine(c->line); + internalUnRefLine(c->line); c->line = line; return c; } -static inline bool isAlive(line_t *line) +inline bool isAlive(line_t *line) { - return line->root_alive; + return line->alive; } // when you don't have a context from a line, you cant guess the line is free()ed or not -// so you should use locks -static inline void lockLine(line_t *line) +// so you should use locks before losing the last context +inline void lockLine(line_t *line) { line->refc++; } -static inline void unLockLine(line_t *line) +inline void unLockLine(line_t *line) { destroyLine(line); } -static inline void markAuthenticationNodePresence(line_t *line) +inline void markAuthenticationNodePresence(line_t *line) { line->auth_max += 1; } -static inline void markAuthenticated(line_t *line) +inline void markAuthenticated(line_t *line) { line->auth_cur += 1; } -static inline bool isAuthenticated(line_t *line) +inline bool isAuthenticated(line_t *line) { return line->auth_cur > 0; } -static inline bool isFullyAuthenticated(line_t *line) +inline bool isFullyAuthenticated(line_t *line) { return line->auth_cur >= line->auth_max; } -static inline void reuseContextBuffer(context_t *c) +inline void reuseContextBuffer(context_t *c) { assert(c->payload != NULL); reuseBuffer(buffer_pools[c->line->tid], c->payload); c->payload = NULL; } - -static inline void allocateDomainBuffer(socket_context_t *scontext) -{ - if (scontext->domain != NULL) - { - scontext->domain = malloc(256); -#ifdef DEBUG - memset(&(scontext->domain), 0xEE, 256); -#endif - } -} -// len is max 255 since it is 8bit -static inline void setSocketContextDomain(socket_context_t *restrict scontext, char *restrict domain, uint8_t len) -{ - assert(scontext->domain != NULL); - domain[len] = 0x0; - memcpy(scontext->domain, domain, len); -} \ No newline at end of file diff --git a/ww/utils/sockutils.h b/ww/utils/sockutils.h index a86095ee..d7448154 100644 --- a/ww/utils/sockutils.h +++ b/ww/utils/sockutils.h @@ -2,13 +2,18 @@ #include "basic_types.h" #include "hv/hsocket.h" -#define CMP(a, b) \ - if ((a) != (b)) \ - return false +inline void socketAddrCopy(const sockaddr_u *restrict dest, const sockaddr_u *restrict source) +{ + if (source->sa.sa_family == AF_INET) + { + memcpy(dest->sin.sin_addr.s_addr, source->sin.sin_addr.s_addr, sizeof(source->sin.sin_addr.s_addr)); + return; + } + memcpy(dest->sin6.sin6_addr.s6_addr, source->sin6.sin6_addr.s6_addr, sizeof(source->sin6.sin6_addr.s6_addr)); +} inline bool socketCmpIPV4(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2) { - return (addr1->sin.sin_addr.s_addr == addr2->sin.sin_addr.s_addr); } @@ -17,17 +22,39 @@ inline bool socketCmpIPV6(const sockaddr_u *restrict addr1, const sockaddr_u *re int r = memcmp(addr1->sin6.sin6_addr.s6_addr, addr2->sin6.sin6_addr.s6_addr, sizeof(addr1->sin6.sin6_addr.s6_addr)); if (r != 0) { - return r; + return false; + } + if (addr1->sin6.sin6_flowinfo != addr2->sin6.sin6_flowinfo) + { + return false; + } + if (addr1->sin6.sin6_scope_id, addr2->sin6.sin6_scope_id) + { + return false; } - CMP(addr1->sin6.sin6_flowinfo, addr2->sin6.sin6_flowinfo); - CMP(addr1->sin6.sin6_scope_id, addr2->sin6.sin6_scope_id); - return false; + return true; } -#undef CMP - -bool socketCmpIP(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); - -void copySocketContextAddr(socket_context_t *dest, socket_context_t **source); +bool socketCmpIP(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); +void copySocketContextAddr(socket_context_t *dest, const socket_context_t *source); void copySocketContextPort(socket_context_t *dest, socket_context_t *source); enum socket_address_type getHostAddrType(char *host); + +inline void allocateDomainBuffer(socket_context_t *scontext) +{ + if (scontext->domain != NULL) + { + scontext->domain = malloc(256); +#ifdef DEBUG + memset(&(scontext->domain), 0xEE, 256); +#endif + } +} +// len is max 255 since it is 8bit +inline void setSocketContextDomain(socket_context_t *restrict scontext, char *restrict domain, uint8_t len) +{ + assert(scontext->domain != NULL); + domain[len] = 0x0; + memcpy(scontext->domain, domain, len); + scontext->domain_len = len; +} \ No newline at end of file diff --git a/ww/utils/utils.c b/ww/utils/utils.c index 47e4d129..5342e4a8 100644 --- a/ww/utils/utils.c +++ b/ww/utils/utils.c @@ -12,6 +12,12 @@ #include #include +extern void socketAddrCopy(const sockaddr_u *restrict dest, const sockaddr_u *restrict source); +extern bool socketCmpIPV4(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); +extern bool socketCmpIPV6(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); +extern void allocateDomainBuffer(socket_context_t *scontext); +extern void setSocketContextDomain(socket_context_t *restrict scontext, char *restrict domain, uint8_t len); + char *readFile(const char *const path) { FILE *f = fopen(path, "rb"); @@ -155,9 +161,7 @@ bool getStringFromJsonObjectOrDefault(char **dest, const cJSON *json_obj, const return true; } -extern bool socketCmpIPV4(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); -extern bool socketCmpIPV6(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2); -bool socketCmpIP(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2) +bool socketCmpIP(const sockaddr_u *restrict addr1, const sockaddr_u *restrict addr2) { if (addr1->sa.sa_family != addr2->sa.sa_family) @@ -179,32 +183,54 @@ bool socketCmpIP(const sockaddr_u *restrict addr1, const sockaddr_u *rest return false; } -void copySocketContextAddr(socket_context_t *dest, socket_context_t **source) +void copySocketContextAddr(socket_context_t *dest, const socket_context_t *const source) { - dest->acmd = (*source)->acmd; - dest->atype = (*source)->atype; - - switch (dest->atype) + dest->address_protocol = source->address_protocol; + dest->address_type = source->address_type; + switch (dest->address_type) { case kSatIPV4: dest->addr.sa.sa_family = AF_INET; - dest->addr.sin.sin_addr = (*source)->addr.sin.sin_addr; + dest->addr.sin.sin_addr = source->addr.sin.sin_addr; break; case kSatDomainName: dest->addr.sa.sa_family = AF_INET; - if ((*source)->domain != NULL) + if (source->domain != NULL) { - dest->domain = (*source)->domain; - // (*source)->domain = NULL; // this copies the pointer and the caller is awair + if (source->domain_constant) + { + if (dest->domain && ! dest->domain_constant) + { + free(dest->domain); + } + dest->domain_constant = true; + dest->domain = source->domain; + dest->domain_len = source->domain_len; + } + else + { + if (dest->domain && dest->domain_constant) + { + dest->domain_constant = false; + allocateDomainBuffer(dest); + } + setSocketContextDomain(dest, source->domain, source->domain_len); + } + if (source->domain_resolved) + { + dest->domain_resolved = true; + socketAddrCopy(&(dest->addr), &(source->addr)); + } + } break; case kSatIPV6: dest->addr.sa.sa_family = AF_INET6; - memcpy(&(dest->addr.sin6.sin6_addr), &((*source)->addr.sin6.sin6_addr), sizeof(struct in6_addr)); + memcpy(&(dest->addr.sin6.sin6_addr), &(source->addr.sin6.sin6_addr), sizeof(struct in6_addr)); break; } @@ -225,19 +251,23 @@ enum socket_address_type getHostAddrType(char *host) void copySocketContextPort(socket_context_t *dest, socket_context_t *source) { - - switch (dest->atype) - { - case kSatIPV4: - case kSatDomainName: - default: - dest->addr.sin.sin_port = source->addr.sin.sin_port; - break; - - case kSatIPV6: - dest->addr.sin6.sin6_port = source->addr.sin6.sin6_port; - break; - } + // this is supposed to work for both ipv4/6 + dest->addr.sin.sin_port = source->addr.sin.sin_port; + + // alternative: + + // switch (dest->address_type) + // { + // case kSatIPV4: + // case kSatDomainName: + // default: + // dest->addr.sin.sin_port = source->addr.sin.sin_port; + // break; + + // case kSatIPV6: + // dest->addr.sin6.sin6_port = source->addr.sin6.sin6_port; + // break; + // } } struct user_s *parseUserFromJsonObject(const cJSON *user_json)