diff --git a/CMakeLists.txt b/CMakeLists.txt index 596c56f..e2ff175 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,8 +68,10 @@ option(INCLUDE_CONNECTOR "link Connector staticly to the core" TRUE) option(INCLUDE_TCPCONNECTOR "link TcpConnector staticly to the core" TRUE) option(INCLUDE_UDP_CONNECTOR "link UdpConnector staticly to the core" TRUE) option(INCLUDE_BRIDGE "link Bridge staticly to the core" TRUE) -if(LINUX) # todo (tun other platforms) + +if(LINUX) # todo (other platforms) option(INCLUDE_TUNDEVICE "link TunDevice staticly to the core" TRUE) + option(INCLUDE_RAWDEVICE "link RawDevice staticly to the core" TRUE) endif() option(INCLUDE_OPENSSL_SERVER "link OpenSSlServer staticly to the core" TRUE) @@ -132,6 +134,22 @@ target_link_libraries(Waterwall ww) +#tun device +if (INCLUDE_TUNDEVICE) +target_compile_definitions(Waterwall PUBLIC INCLUDE_TUNDEVICE=1) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/tun) +target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/tun) +target_link_libraries(Waterwall TunDevice) +endif() + +#raw device +if (INCLUDE_RAWDEVICE) +target_compile_definitions(Waterwall PUBLIC INCLUDE_RAWDEVICE=1) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/raw) +target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/raw) +target_link_libraries(Waterwall RawDevice) +endif() + #layer3 receiver if (INCLUDE_LAYER3_RECEIVER) target_compile_definitions(Waterwall PUBLIC INCLUDE_LAYER3_RECEIVER=1) @@ -228,13 +246,6 @@ target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/ada target_link_libraries(Waterwall Bridge) endif() -#tun device -if (INCLUDE_TUNDEVICE) -target_compile_definitions(Waterwall PUBLIC INCLUDE_TUNDEVICE=1) -add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/tun) -target_link_directories(Waterwall PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/tunnels/adapters/device/tun) -target_link_libraries(Waterwall TunDevice) -endif() #http2 server if (INCLUDE_OPENSSL_SERVER) diff --git a/core/static_tunnels.c b/core/static_tunnels.c index bf8f391..a2a88e5 100644 --- a/core/static_tunnels.c +++ b/core/static_tunnels.c @@ -1,13 +1,13 @@ +#include "static_tunnels.h" #include "library_loader.h" #include "loggers/core_logger.h" -#include "static_tunnels.h" #include "ww.h" #define USING(x) \ do \ { \ hash_t h = CALC_HASH_BYTES(#x, strlen(#x)); \ - registerStaticLib((tunnel_lib_t){ \ + registerStaticLib((tunnel_lib_t) { \ .hash_name = h, \ .createHandle = new##x, \ .destroyHandle = destroy##x, \ @@ -17,14 +17,14 @@ LOGD("Imported static tunnel lib%-20s hash:%lx", #x, h); \ } while (0); - - - - #ifdef INCLUDE_TUNDEVICE #include "tunnels/adapters/device/tun/tun_device.h" #endif +#ifdef INCLUDE_RAWDEVICE +#include "tunnels/adapters/device/raw/raw_device.h" +#endif + #ifdef INCLUDE_LAYER3_RECEIVER #include "tunnels/layer3/receiver/receiver.h" #endif @@ -177,13 +177,17 @@ #include "tunnels/client/mux/mux_client.h" #endif - - - - void loadStaticTunnelsIntoCore(void) { +#ifdef INCLUDE_TUNDEVICE + USING(TunDevice); +#endif + +#ifdef INCLUDE_RAWDEVICE + USING(RawDevice); +#endif + #ifdef INCLUDE_LAYER3_RECEIVER USING(Layer3Receiver); #endif @@ -204,10 +208,6 @@ void loadStaticTunnelsIntoCore(void) USING(Layer3TcpManipulator); #endif -#ifdef INCLUDE_TUNDEVICE - USING(TunDevice); -#endif - #ifdef INCLUDE_TCP_LISTENER USING(TcpListener); #endif @@ -339,7 +339,4 @@ void loadStaticTunnelsIntoCore(void) #ifdef INCLUDE_MUX_CLIENT USING(MuxServer); #endif - - - } diff --git a/tunnels/adapters/device/raw/CMakeLists.txt b/tunnels/adapters/device/raw/CMakeLists.txt new file mode 100644 index 0000000..bf4e252 --- /dev/null +++ b/tunnels/adapters/device/raw/CMakeLists.txt @@ -0,0 +1,12 @@ + +add_library(RawDevice STATIC + raw_device.c + +) + +target_link_libraries(RawDevice ww) + +target_include_directories(RawDevice PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../../../shared/layer3) + +target_compile_definitions(RawDevice PRIVATE RawDevice_VERSION=0.1) + diff --git a/tunnels/adapters/device/raw/raw_device.c b/tunnels/adapters/device/raw/raw_device.c new file mode 100644 index 0000000..ffc6074 --- /dev/null +++ b/tunnels/adapters/device/raw/raw_device.c @@ -0,0 +1,186 @@ +#include "raw_device.h" +#include "loggers/network_logger.h" +#include "packet_types.h" +#include "utils/jsonutils.h" +#include "ww/devices/raw/raw.h" + +#define LOG_PACKET_INFO 1 + +enum rawdevice_mode_dynamic_value_status +{ + kDvsRead = kDvsFirstOption, + kDvsWrite, + kDvsReadWrite +}; + +typedef struct raw_device_state_s +{ + raw_device_t *rdev; + line_t **thread_lines; + char *name; + unsigned int subnet_mask; + +} raw_device_state_t; + +static void printIPPacketInfo(const unsigned char *buffer, unsigned int len) +{ + char src_ip[INET6_ADDRSTRLEN]; + char dst_ip[INET6_ADDRSTRLEN]; + char logbuf[2048]; + int rem = sizeof(logbuf); + char *ptr = logbuf; + int ret; + + uint8_t version = buffer[0] >> 4; + + if (version == 4) + { + struct ipv4header *ip_header = (struct ipv4header *) buffer; + + inet_ntop(AF_INET, &ip_header->saddr, src_ip, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &ip_header->daddr, dst_ip, INET_ADDRSTRLEN); + + ret = snprintf(ptr, rem, "Received: => From %s to %s, Data: ", src_ip, dst_ip); + } + else if (version == 6) + { + struct ipv6header *ip6_header = (struct ipv6header *) buffer; + + inet_ntop(AF_INET6, &ip6_header->saddr, src_ip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &ip6_header->daddr, dst_ip, INET6_ADDRSTRLEN); + + ret = snprintf(ptr, rem, "Received: From %s to %s, Data: ", src_ip, dst_ip); + } + else + { + ret = snprintf(ptr, rem, "Received: => Unknown IP version, Data: "); + } + + ptr += ret; + rem -= ret; + + for (int i = 0; i < (int) min(len, 240); i++) + { + ret = snprintf(ptr, rem, "%02x ", buffer[i]); + ptr += ret; + rem -= ret; + } + *ptr = '\0'; + + LOGD(logbuf); +} + +static void upStream(tunnel_t *self, context_t *c) +{ + raw_device_state_t *state = TSTATE((tunnel_t *) self); + + raw_device_t *rdev = state->rdev; + writeToRawDevce(rdev, c->payload); + + dropContexPayload(c); + destroyContext(c); +} + +static void downStream(tunnel_t *self, context_t *c) +{ + (void) (self); + (void) (c); + assert(false); + + if (c->payload) + { + dropContexPayload(c); + } + destroyContext(c); +} + +static void onIPPacketReceived(struct raw_device_s *rdev, void *userdata, shift_buffer_t *buf, tid_t tid) +{ + (void) rdev; + tunnel_t *self = userdata; + raw_device_state_t *state = TSTATE((tunnel_t *) self); + +#if LOG_PACKET_INFO + printIPPacketInfo(rawBuf(buf), bufLen(buf)); +#endif + + // reuseBuffer(getWorkerBufferPool(tid), buf); + + context_t *ctx = newContext(state->thread_lines[tid]); + ctx->payload = buf; + self->up->upStream(self->up, ctx); +} + +tunnel_t *newRawDevice(node_instance_context_t *instance_info) +{ + raw_device_state_t *state = globalMalloc(sizeof(raw_device_state_t)); + memset(state, 0, sizeof(raw_device_state_t)); + + cJSON *settings = instance_info->node_settings_json; + + if (! (cJSON_IsObject(settings) && settings->child != NULL)) + { + LOGF("JSON Error: RawDevice->settings (object field) : The object was empty or invalid"); + return NULL; + } + + // not forced + getStringFromJsonObject(&(state->name), settings, "device-name"); + uint32_t fwmark = 0; + getIntFromJsonObjectOrDefault((int *) &fwmark, settings, "mark", 0); + + dynamic_value_t mode = parseDynamicNumericValueFromJsonObject(settings, "mode", 3, "R", "W", "RW"); + if ((int) mode.status < kDvsRead) + { + LOGF("JSON Error: RawDevice->settings->mode (string field) : mode is not specified or invalid"); + return NULL; + } + + state->thread_lines = globalMalloc(sizeof(line_t *) * WORKERS_COUNT); + for (unsigned int i = 0; i < WORKERS_COUNT; i++) + { + state->thread_lines[i] = newLine(i); + } + + tunnel_t *t = newTunnel(); + + if ((int) mode.status == kDvsRead || (int) mode.status == kDvsReadWrite) + { + state->rdev = createRawDevice(state->name, fwmark, t, onIPPacketReceived); + } + else + { + state->rdev = createRawDevice(state->name, fwmark, t, NULL); + } + + if (state->rdev == NULL) + { + LOGF("RawDevice: could not create device"); + return NULL; + } + bringRawDeviceUP(state->rdev); + + t->state = state; + t->upStream = &upStream; + t->downStream = &downStream; + + return t; +} + +api_result_t apiRawDevice(tunnel_t *self, const char *msg) +{ + (void) (self); + (void) (msg); + return (api_result_t) {0}; +} + +tunnel_t *destroyRawDevice(tunnel_t *self) +{ + (void) (self); + return NULL; +} + +tunnel_metadata_t getMetadataRawDevice(void) +{ + return (tunnel_metadata_t) {.version = 0001, .flags = 0x0}; +} diff --git a/tunnels/adapters/device/raw/raw_device.h b/tunnels/adapters/device/raw/raw_device.h new file mode 100644 index 0000000..c5f2285 --- /dev/null +++ b/tunnels/adapters/device/raw/raw_device.h @@ -0,0 +1,13 @@ +#pragma once +#include "api.h" + +// +// RawDevice +// + +// this node will not join a chain , it will be used by other nodes (if they accept a device) + +tunnel_t * newRawDevice(node_instance_context_t *instance_info); +api_result_t apiRawDevice(tunnel_t *self, const char *msg); +tunnel_t * destroyRawDevice(tunnel_t *self); +tunnel_metadata_t getMetadataRawDevice(void); diff --git a/ww/devices/raw/raw.h b/ww/devices/raw/raw.h new file mode 100644 index 0000000..eead656 --- /dev/null +++ b/ww/devices/raw/raw.h @@ -0,0 +1,41 @@ +#pragma once +#include "buffer_pool.h" +#include "hloop.h" +#include "hplatform.h" +#include "hthread.h" +#include "master_pool.h" +#include + +struct raw_device_s; + +typedef void (*RawReadEventHandle)(struct raw_device_s *rdev, void *userdata, shift_buffer_t *buf, tid_t tid); + +typedef struct raw_device_s +{ + char *name; + int socket; + uint32_t mark; + void *userdata; + hthread_t read_thread; + hthread_t write_thread; + + hthread_routine routine_reader; + hthread_routine routine_writer; + + master_pool_t *reader_message_pool; + generic_pool_t *reader_shift_buffer_pool; + buffer_pool_t *reader_buffer_pool; + RawReadEventHandle read_event_callback; + + struct hchan_s *writer_buffer_channel; + atomic_bool running; + atomic_bool up; + +} raw_device_t; + +bool bringRawDeviceUP(raw_device_t *tdev); +bool bringRawDeviceDown(raw_device_t *tdev); + +raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, RawReadEventHandle cb); + +void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf); diff --git a/ww/devices/raw/raw_linux.c b/ww/devices/raw/raw_linux.c new file mode 100644 index 0000000..d215cc1 --- /dev/null +++ b/ww/devices/raw/raw_linux.c @@ -0,0 +1,238 @@ +#include "generic_pool.h" +#include "hchan.h" +#include "loggers/network_logger.h" +#include "raw.h" +#include "ww.h" +#include +#include +#include +#include +#include +#include +#include +#include + +enum +{ + kReadPacketSize = 1500, + kMasterMessagePoolCap = 64, + kRawWriteChannelQueueMax = 128 +}; + +struct msg_event +{ + raw_device_t *rdev; + shift_buffer_t *buf; +}; + +static pool_item_t *allocRawMsgPoolHandle(struct master_pool_s *pool, void *userdata) +{ + (void) userdata; + (void) pool; + return globalMalloc(sizeof(struct msg_event)); +} + +static void destroyRawMsgPoolHandle(struct master_pool_s *pool, master_pool_item_t *item, void *userdata) +{ + (void) pool; + (void) userdata; + globalFree(item); +} + +static void localThreadEventReceived(hevent_t *ev) +{ + struct msg_event *msg = hevent_userdata(ev); + tid_t tid = (tid_t) (hloop_tid(hevent_loop(ev))); + + msg->rdev->read_event_callback(msg->rdev, msg->rdev->userdata, msg->buf, tid); + + reuseMasterPoolItems(msg->rdev->reader_message_pool, (void **) &msg, 1); +} + +static void distributePacketPayload(raw_device_t *rdev, tid_t target_tid, shift_buffer_t *buf) +{ + struct msg_event *msg; + popMasterPoolItems(rdev->reader_message_pool, (const void **) &(msg), 1); + + *msg = (struct msg_event) {.rdev = rdev, .buf = buf}; + + hevent_t ev; + memset(&ev, 0, sizeof(ev)); + ev.loop = getWorkerLoop(target_tid); + ev.cb = localThreadEventReceived; + hevent_set_userdata(&ev, msg); + hloop_post_event(getWorkerLoop(target_tid), &ev); +} + +static HTHREAD_ROUTINE(routineReadFromRaw) // NOLINT +{ + raw_device_t *rdev = userdata; + tid_t distribute_tid = 0; + shift_buffer_t *buf; + ssize_t nread; + struct sockaddr saddr; + int saddr_len = sizeof(saddr); + + while (atomic_load_explicit(&(rdev->running), memory_order_relaxed)) + { + buf = popSmallBuffer(rdev->reader_buffer_pool); + + reserveBufSpace(buf, kReadPacketSize); + + nread = recvfrom(rdev->socket, rawBufMut(buf), kReadPacketSize, 0, &saddr, (socklen_t *) &saddr_len); + + if (nread < 0) + { + LOGE("RawDevice: reading from RAW device failed"); + reuseBuffer(rdev->reader_buffer_pool, buf); + return 0; + } + + setLen(buf, nread); + + distributePacketPayload(rdev, distribute_tid++, buf); + + if (distribute_tid >= WORKERS_COUNT) + { + distribute_tid = 0; + } + } + + return 0; +} + +static HTHREAD_ROUTINE(routineWriteToRaw) // NOLINT +{ + raw_device_t *rdev = userdata; + shift_buffer_t *buf; + ssize_t nwrite; + + while (atomic_load_explicit(&(rdev->running), memory_order_relaxed)) + { + if (! hchanRecv(rdev->writer_buffer_channel, &buf)) + { + LOGD("RawDevice: routine write will exit due to channel closed"); + return 0; + } + + struct iphdr *ip_header = (struct iphdr *) rawBuf(buf); + + struct sockaddr_in to_addr = {.sin_family = AF_INET, .sin_addr.s_addr = ip_header->daddr}; + + nwrite = sendto(rdev->socket, ip_header, bufLen(buf), 0, (struct sockaddr *) (&to_addr), sizeof(to_addr)); + + reuseBufferThreadSafe(buf); + + if (nwrite < 0) + { + LOGE("RawDevice: writing to RAW device failed"); + return 0; + } + } + return 0; +} + +void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf) +{ + bool closed = false; + if (! hchanTrySend(rdev->writer_buffer_channel, &buf, &closed)) + { + if (closed) + { + LOGE("RawDevice: write failed, channel was closed"); + } + else + { + LOGE("RawDevice: write failed, ring is full"); + } + reuseBufferThreadSafe(buf); + } +} + +bool bringRawDeviceUP(raw_device_t *rdev) +{ + assert(! rdev->up); + + rdev->up = true; + rdev->running = true; + + LOGD("RawDevice: device %s is now up", rdev->name); + + if (rdev->read_event_callback != NULL) + { + rdev->read_thread = hthread_create(rdev->routine_reader, rdev); + } + rdev->write_thread = hthread_create(rdev->routine_writer, rdev); + return true; +} + +bool bringRawDeviceDown(raw_device_t *rdev) +{ + assert(rdev->up); + + rdev->running = false; + rdev->up = false; + + hchanClose(rdev->writer_buffer_channel); + + LOGD("RawDevice: device %s is now down", rdev->name); + + if (rdev->read_event_callback != NULL) + { + hthread_join(rdev->read_thread); + } + hthread_join(rdev->write_thread); + + shift_buffer_t *buf; + while (hchanRecv(rdev->writer_buffer_channel, &buf)) + { + reuseBufferThreadSafe(buf); + } + + return true; +} + +raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, RawReadEventHandle cb) +{ + + int rsocket = socket(PF_INET, SOCK_RAW, IPPROTO_RAW); + if (rsocket < 0) + { + LOGE("RawDevice: unable to open a raw socket"); + return NULL; + } + + if (mark != 0) + { + if (setsockopt(rsocket, SOL_SOCKET, SO_MARK, &mark, sizeof(mark)) != 0) + { + LOGE("RawDevice: unable to set raw socket mark to %u", mark); + return NULL; + } + } + + generic_pool_t *sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + buffer_pool_t *bpool = + createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool); + + raw_device_t *rdev = globalMalloc(sizeof(raw_device_t)); + + *rdev = (raw_device_t) {.name = strdup(name), + .running = false, + .up = false, + .routine_reader = routineReadFromRaw, + .routine_writer = routineWriteToRaw, + .socket = rsocket, + .mark = mark, + .reader_shift_buffer_pool = sb_pool, + .read_event_callback = cb, + .userdata = userdata, + .writer_buffer_channel = hchanOpen(sizeof(void *), kRawWriteChannelQueueMax), + .reader_message_pool = newMasterPoolWithCap(kMasterMessagePoolCap), + .reader_buffer_pool = bpool}; + + installMasterPoolAllocCallbacks(rdev->reader_message_pool, rdev, allocRawMsgPoolHandle, destroyRawMsgPoolHandle); + + return rdev; +}