diff --git a/tunnels/adapters/device/capture/caputre_device.c b/tunnels/adapters/device/capture/caputre_device.c index 550ea62..987a380 100644 --- a/tunnels/adapters/device/capture/caputre_device.c +++ b/tunnels/adapters/device/capture/caputre_device.c @@ -90,9 +90,14 @@ static void upStream(tunnel_t *self, context_t *c) capture_device_state_t *state = TSTATE((tunnel_t *) self); capture_device_t *cdev = state->cdev; - writeToCaptureDevce(cdev, c->payload); - - dropContexPayload(c); + if (! writeToCaptureDevce(cdev, c->payload)) + { + reuseContextPayload(c); + } + else + { + dropContexPayload(c); + } destroyContext(c); } diff --git a/tunnels/adapters/device/raw/raw_device.c b/tunnels/adapters/device/raw/raw_device.c index e16f023..47acdc7 100644 --- a/tunnels/adapters/device/raw/raw_device.c +++ b/tunnels/adapters/device/raw/raw_device.c @@ -74,9 +74,14 @@ static void upStream(tunnel_t *self, context_t *c) raw_device_state_t *state = TSTATE((tunnel_t *) self); raw_device_t *rdev = state->rdev; - writeToRawDevce(rdev, c->payload); - - dropContexPayload(c); + if (! writeToRawDevce(rdev, c->payload)) + { + reuseContextPayload(c); + } + else + { + dropContexPayload(c); + } destroyContext(c); } @@ -124,7 +129,7 @@ tunnel_t *newRawDevice(node_instance_context_t *instance_info) } // not forced - getStringFromJsonObjectOrDefault(&(state->name), settings, "device-name","unnamed-device"); + getStringFromJsonObjectOrDefault(&(state->name), settings, "device-name", "unnamed-device"); uint32_t fwmark = 0; getIntFromJsonObjectOrDefault((int *) &fwmark, settings, "mark", 0); diff --git a/tunnels/adapters/device/tun/tun_device.c b/tunnels/adapters/device/tun/tun_device.c index f4faa87..b8f3f0e 100644 --- a/tunnels/adapters/device/tun/tun_device.c +++ b/tunnels/adapters/device/tun/tun_device.c @@ -66,16 +66,20 @@ static void printIPPacketInfo(const unsigned char *buffer, unsigned int len) LOGD(logbuf); } - - static void upStream(tunnel_t *self, context_t *c) { tun_device_state_t *state = TSTATE((tunnel_t *) self); tun_device_t *tdev = state->tdev; - writeToTunDevce(tdev, c->payload); + if (! writeToTunDevce(tdev, c->payload)) + { + reuseContextPayload(c); + } + else + { + dropContexPayload(c); + } - dropContexPayload(c); destroyContext(c); } @@ -99,7 +103,7 @@ static void onIPPacketReceived(struct tun_device_s *tdev, void *userdata, shift_ tun_device_state_t *state = TSTATE((tunnel_t *) self); #if LOG_PACKET_INFO - printIPPacketInfo(rawBuf(buf),bufLen(buf)); + printIPPacketInfo(rawBuf(buf), bufLen(buf)); #endif // reuseBuffer(getWorkerBufferPool(tid), buf); diff --git a/tunnels/client/http2/http2_client.c b/tunnels/client/http2/http2_client.c index 9f38003..57c285a 100644 --- a/tunnels/client/http2/http2_client.c +++ b/tunnels/client/http2/http2_client.c @@ -486,7 +486,7 @@ static void downStream(tunnel_t *self, context_t *c) if (ret != (ssize_t) consumed) { - assert(false); + // assert(false); deleteHttp2Connection(con); self->up->upStream(self->up, newFinContext(c->line)); reuseContextPayload(c); diff --git a/tunnels/server/http2/http2_server.c b/tunnels/server/http2/http2_server.c index 77bdd39..85bffa7 100644 --- a/tunnels/server/http2/http2_server.c +++ b/tunnels/server/http2/http2_server.c @@ -355,7 +355,7 @@ static void upStream(tunnel_t *self, context_t *c) if (ret != (ssize_t) consumed) { - assert(false); + // assert(false); deleteHttp2Connection(con); self->dw->downStream(self->dw, newFinContext(c->line)); reuseContextPayload(c); diff --git a/ww/buffer_pool.c b/ww/buffer_pool.c index 4eda6d8..45eee03 100644 --- a/ww/buffer_pool.c +++ b/ww/buffer_pool.c @@ -38,24 +38,10 @@ struct buffer_pool_s shift_buffer_t **large_buffers; master_pool_t *small_buffers_mp; shift_buffer_t **small_buffers; - uint8_t tid; }; // NOLINTEND -void reuseBufferThreadSafe(shift_buffer_t *buf) -{ - if (isLargeBuffer(buf)) - { - reset(buf, getBufferPoolLargeBufferDefaultSize()); - reuseMasterPoolItems(GSTATE.masterpool_buffer_pools_large, (void **) &buf, 1); - } - else - { - reset(buf, getBufferPoolSmallBufferDefaultSize()); - reuseMasterPoolItems(GSTATE.masterpool_buffer_pools_small, (void **) &buf, 1); - } -} unsigned int getBufferPoolLargeBufferDefaultSize(void) { @@ -74,6 +60,7 @@ bool isLargeBuffer(shift_buffer_t *buf) static master_pool_item_t *createLargeBufHandle(struct master_pool_s *pool, void *userdata) { (void) pool; + buffer_pool_t *bpool = userdata; return newShiftBuffer(bpool->shift_buffer_pool, bpool->large_buffers_default_size); } @@ -104,7 +91,7 @@ static void reChargeLargeBuffers(buffer_pool_t *pool) const size_t increase = min((pool->cap - pool->large_buffers_container_len), pool->cap / 2); popMasterPoolItems(pool->large_buffers_mp, - (void const **) &(pool->large_buffers[pool->large_buffers_container_len]), increase); + (void const **) &(pool->large_buffers[pool->large_buffers_container_len]), increase, pool); pool->large_buffers_container_len += increase; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -117,7 +104,7 @@ static void reChargeSmallBuffers(buffer_pool_t *pool) const size_t increase = min((pool->cap - pool->small_buffers_container_len), pool->cap / 2); popMasterPoolItems(pool->small_buffers_mp, - (void const **) &(pool->small_buffers[pool->small_buffers_container_len]), increase); + (void const **) &(pool->small_buffers[pool->small_buffers_container_len]), increase, pool); pool->small_buffers_container_len += increase; #if defined(DEBUG) && defined(BUFFER_POOL_DEBUG) @@ -142,7 +129,8 @@ static void shrinkLargeBuffers(buffer_pool_t *pool) const size_t decrease = min(pool->large_buffers_container_len, pool->cap / 2); reuseMasterPoolItems(pool->large_buffers_mp, - (void **) &(pool->large_buffers[pool->large_buffers_container_len - decrease]), decrease); + (void **) &(pool->large_buffers[pool->large_buffers_container_len - decrease]), decrease, + pool); pool->large_buffers_container_len -= decrease; @@ -156,7 +144,8 @@ static void shrinkSmallBuffers(buffer_pool_t *pool) const size_t decrease = min(pool->small_buffers_container_len, pool->cap / 2); reuseMasterPoolItems(pool->small_buffers_mp, - (void **) &(pool->small_buffers[pool->small_buffers_container_len - decrease]), decrease); + (void **) &(pool->small_buffers[pool->small_buffers_container_len - decrease]), decrease, + pool); pool->small_buffers_container_len -= decrease; @@ -285,8 +274,8 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct mas .small_buffers = globalMalloc(container_len), }; - installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, ptr_pool, createLargeBufHandle, destroyLargeBufHandle); - installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, ptr_pool, createSmallBufHandle, destroySmallBufHandle); + installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, createLargeBufHandle, destroyLargeBufHandle); + installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, createSmallBufHandle, destroySmallBufHandle); #ifdef DEBUG memset(ptr_pool->large_buffers, 0xFE, container_len); @@ -297,7 +286,8 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct mas return ptr_pool; } -buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool) +buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool, + unsigned int pool_width) { - return allocBufferPool(mp_large, mp_small, sb_pool, BUFFERPOOL_CONTAINER_LEN, LARGE_BUFFER_SIZE, SMALL_BUFFER_SIZE); + return allocBufferPool(mp_large, mp_small, sb_pool, pool_width, LARGE_BUFFER_SIZE, SMALL_BUFFER_SIZE); } diff --git a/ww/buffer_pool.h b/ww/buffer_pool.h index 1334aa1..bfe14b6 100644 --- a/ww/buffer_pool.h +++ b/ww/buffer_pool.h @@ -29,13 +29,13 @@ typedef struct buffer_pool_s buffer_pool_t; -buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, - generic_pool_t *sb_pool); +buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool, + unsigned int pool_width); shift_buffer_t *popBuffer(buffer_pool_t *pool); shift_buffer_t *popSmallBuffer(buffer_pool_t *pool); shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2); void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b); -void reuseBufferThreadSafe(shift_buffer_t *buf); -unsigned int getBufferPoolLargeBufferDefaultSize(void); -unsigned int getBufferPoolSmallBufferDefaultSize(void); -bool isLargeBuffer(shift_buffer_t *buf); +// void reuseBufferThreadSafe(shift_buffer_t *buf); +unsigned int getBufferPoolLargeBufferDefaultSize(void); +unsigned int getBufferPoolSmallBufferDefaultSize(void); +bool isLargeBuffer(shift_buffer_t *buf); diff --git a/ww/devices/capture/capture.h b/ww/devices/capture/capture.h index 38aad2e..1099ea4 100644 --- a/ww/devices/capture/capture.h +++ b/ww/devices/capture/capture.h @@ -23,9 +23,12 @@ typedef struct capture_device_s hthread_routine routine_reader; hthread_routine routine_writer; - master_pool_t *reader_message_pool; - generic_pool_t *reader_shift_buffer_pool; - buffer_pool_t *reader_buffer_pool; + master_pool_t *reader_message_pool; + generic_pool_t *reader_shift_buffer_pool; + buffer_pool_t *reader_buffer_pool; + generic_pool_t *writer_shift_buffer_pool; + buffer_pool_t *writer_buffer_pool; + CaptureReadEventHandle read_event_callback; struct hchan_s *writer_buffer_channel; @@ -39,4 +42,4 @@ bool bringCaptureDeviceDown(capture_device_t *cdev); capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, void *userdata, CaptureReadEventHandle cb); -void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf); +bool writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf); diff --git a/ww/devices/capture/capture_linux.c b/ww/devices/capture/capture_linux.c index e96c9dd..382e1aa 100644 --- a/ww/devices/capture/capture_linux.c +++ b/ww/devices/capture/capture_linux.c @@ -52,13 +52,13 @@ static void localThreadEventReceived(hevent_t *ev) msg->cdev->read_event_callback(msg->cdev, msg->cdev->userdata, msg->buf, tid); - reuseMasterPoolItems(msg->cdev->reader_message_pool, (void **) &msg, 1); + reuseMasterPoolItems(msg->cdev->reader_message_pool, (void **) &msg, 1, msg->cdev); } static void distributePacketPayload(capture_device_t *cdev, tid_t target_tid, shift_buffer_t *buf) { struct msg_event *msg; - popMasterPoolItems(cdev->reader_message_pool, (const void **) &(msg), 1); + popMasterPoolItems(cdev->reader_message_pool, (const void **) &(msg), 1, cdev); *msg = (struct msg_event) {.cdev = cdev, .buf = buf}; @@ -176,13 +176,13 @@ static bool netfilterSetQueueLength(int netfilter_socket, uint16_t qnumber, uint static int netfilterGetPacket(int netfilter_socket, uint16_t qnumber, shift_buffer_t *buff) { // Read a message from netlink - char nl_buff[512+kEthDataLen + sizeof(struct ethhdr) + sizeof(struct nfqnl_msg_packet_hdr)]; + char nl_buff[512 + kEthDataLen + sizeof(struct ethhdr) + sizeof(struct nfqnl_msg_packet_hdr)]; struct sockaddr_nl nl_addr; socklen_t nl_addr_len = sizeof(nl_addr); ssize_t result = recvfrom(netfilter_socket, nl_buff, sizeof(nl_buff), 0, (struct sockaddr *) &nl_addr, &nl_addr_len); - - if (result <= (int) sizeof(struct nlmsghdr)) + + if (result <= (int) sizeof(struct nlmsghdr)) { errno = EINVAL; return -1; @@ -335,7 +335,7 @@ static HTHREAD_ROUTINE(routineWriteToCapture) // NOLINT nwrite = sendto(cdev->socket, ip_header, bufLen(buf), 0, (struct sockaddr *) (&to_addr), sizeof(to_addr)); - reuseBufferThreadSafe(buf); + reuseBuffer(cdev->reader_buffer_pool, buf); if (nwrite < 0) { @@ -346,7 +346,7 @@ static HTHREAD_ROUTINE(routineWriteToCapture) // NOLINT return 0; } -void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf) +bool writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf) { bool closed = false; if (! hchanTrySend(cdev->writer_buffer_channel, &buf, &closed)) @@ -359,8 +359,9 @@ void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf) { LOGE("CaptureDevice:write failed, ring is full"); } - reuseBufferThreadSafe(buf); + return false; } + return true; } bool bringCaptureDeviceUP(capture_device_t *cdev) @@ -394,7 +395,7 @@ bool bringCaptureDeviceDown(capture_device_t *cdev) shift_buffer_t *buf; while (hchanRecv(cdev->writer_buffer_channel, &buf)) { - reuseBufferThreadSafe(buf); + reuseBuffer(cdev->reader_buffer_pool, buf); } return true; @@ -446,8 +447,13 @@ capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, v generic_pool_t *sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); - buffer_pool_t *bpool = - createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool); + buffer_pool_t *bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, + sb_pool, GSTATE.ram_profile); + + generic_pool_t *writer_sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, 1, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + buffer_pool_t *writer_bpool = + createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool, 1); capture_device_t *cdev = globalMalloc(sizeof(capture_device_t)); @@ -463,10 +469,11 @@ capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, v .userdata = userdata, .writer_buffer_channel = hchanOpen(sizeof(void *), kCaptureWriteChannelQueueMax), .reader_message_pool = newMasterPoolWithCap(kMasterMessagePoolCap), - .reader_buffer_pool = bpool}; + .reader_buffer_pool = bpool, + .writer_shift_buffer_pool = writer_sb_pool, + .writer_buffer_pool = writer_bpool}; - installMasterPoolAllocCallbacks(cdev->reader_message_pool, cdev, allocCaptureMsgPoolHandle, - destroyCaptureMsgPoolHandle); + installMasterPoolAllocCallbacks(cdev->reader_message_pool, allocCaptureMsgPoolHandle, destroyCaptureMsgPoolHandle); return cdev; } diff --git a/ww/devices/raw/raw.h b/ww/devices/raw/raw.h index bbec0a8..4b59aff 100644 --- a/ww/devices/raw/raw.h +++ b/ww/devices/raw/raw.h @@ -22,9 +22,12 @@ typedef struct raw_device_s hthread_routine routine_reader; hthread_routine routine_writer; - master_pool_t *reader_message_pool; - generic_pool_t *reader_shift_buffer_pool; - buffer_pool_t *reader_buffer_pool; + master_pool_t *reader_message_pool; + generic_pool_t *reader_shift_buffer_pool; + buffer_pool_t *reader_buffer_pool; + generic_pool_t *writer_shift_buffer_pool; + buffer_pool_t *writer_buffer_pool; + RawReadEventHandle read_event_callback; struct hchan_s *writer_buffer_channel; @@ -38,4 +41,4 @@ bool bringRawDeviceDown(raw_device_t *rdev); raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, RawReadEventHandle cb); -void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf); +bool writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf); diff --git a/ww/devices/raw/raw_linux.c b/ww/devices/raw/raw_linux.c index 94a1e45..38b6f53 100644 --- a/ww/devices/raw/raw_linux.c +++ b/ww/devices/raw/raw_linux.c @@ -46,13 +46,13 @@ static void localThreadEventReceived(hevent_t *ev) msg->rdev->read_event_callback(msg->rdev, msg->rdev->userdata, msg->buf, tid); - reuseMasterPoolItems(msg->rdev->reader_message_pool, (void **) &msg, 1); + reuseMasterPoolItems(msg->rdev->reader_message_pool, (void **) &msg, 1, msg->rdev); } static void distributePacketPayload(raw_device_t *rdev, tid_t target_tid, shift_buffer_t *buf) { struct msg_event *msg; - popMasterPoolItems(rdev->reader_message_pool, (const void **) &(msg), 1); + popMasterPoolItems(rdev->reader_message_pool, (const void **) &(msg), 1, rdev); *msg = (struct msg_event) {.rdev = rdev, .buf = buf}; @@ -121,7 +121,7 @@ static HTHREAD_ROUTINE(routineWriteToRaw) // NOLINT nwrite = sendto(rdev->socket, ip_header, bufLen(buf), 0, (struct sockaddr *) (&to_addr), sizeof(to_addr)); - reuseBufferThreadSafe(buf); + reuseBuffer(rdev->reader_buffer_pool, buf); if (nwrite < 0) { @@ -132,7 +132,7 @@ static HTHREAD_ROUTINE(routineWriteToRaw) // NOLINT return 0; } -void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf) +bool writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf) { bool closed = false; if (! hchanTrySend(rdev->writer_buffer_channel, &buf, &closed)) @@ -145,8 +145,9 @@ void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf) { LOGE("RawDevice: write failed, ring is full"); } - reuseBufferThreadSafe(buf); + return false; } + return true; } bool bringRawDeviceUP(raw_device_t *rdev) @@ -186,7 +187,7 @@ bool bringRawDeviceDown(raw_device_t *rdev) shift_buffer_t *buf; while (hchanRecv(rdev->writer_buffer_channel, &buf)) { - reuseBufferThreadSafe(buf); + reuseBuffer(rdev->reader_buffer_pool, buf); } return true; @@ -221,12 +222,18 @@ raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, R // if the user really wanted to read from raw socket sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); - bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool); + bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool, + GSTATE.ram_profile); mpool = newMasterPoolWithCap(kMasterMessagePoolCap); - installMasterPoolAllocCallbacks(mpool, rdev, allocRawMsgPoolHandle, destroyRawMsgPoolHandle); + installMasterPoolAllocCallbacks(mpool, allocRawMsgPoolHandle, destroyRawMsgPoolHandle); } + generic_pool_t *writer_sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, GSTATE.ram_profile, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + buffer_pool_t *writer_bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, + GSTATE.masterpool_buffer_pools_small, sb_pool, GSTATE.ram_profile); + *rdev = (raw_device_t) {.name = strdup(name), .running = false, .up = false, @@ -239,7 +246,9 @@ raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, R .userdata = userdata, .writer_buffer_channel = hchanOpen(sizeof(void *), kRawWriteChannelQueueMax), .reader_message_pool = mpool, - .reader_buffer_pool = bpool}; + .reader_buffer_pool = bpool, + .writer_shift_buffer_pool = writer_sb_pool, + .writer_buffer_pool = writer_bpool}; return rdev; } diff --git a/ww/devices/tun/tun.h b/ww/devices/tun/tun.h index b1ad3d3..2c9b990 100644 --- a/ww/devices/tun/tun.h +++ b/ww/devices/tun/tun.h @@ -33,6 +33,9 @@ typedef struct tun_device_s master_pool_t *reader_message_pool; generic_pool_t *reader_shift_buffer_pool; buffer_pool_t *reader_buffer_pool; + generic_pool_t *writer_shift_buffer_pool; + buffer_pool_t *writer_buffer_pool; + TunReadEventHandle read_event_callback; struct hchan_s *writer_buffer_channel; @@ -47,4 +50,4 @@ bool bringTunDeviceUP(tun_device_t *tdev); bool bringTunDeviceDown(tun_device_t *tdev); bool assignIpToTunDevice(tun_device_t *tdev, const char *ip_presentation, unsigned int subnet); bool unAssignIpToTunDevice(tun_device_t *tdev, const char *ip_presentation, unsigned int subnet); -void writeToTunDevce(tun_device_t *tdev, shift_buffer_t *buf); +bool writeToTunDevce(tun_device_t *tdev, shift_buffer_t *buf); diff --git a/ww/devices/tun/tun_linux.c b/ww/devices/tun/tun_linux.c index 0103721..b013786 100644 --- a/ww/devices/tun/tun_linux.c +++ b/ww/devices/tun/tun_linux.c @@ -98,13 +98,13 @@ static void localThreadEventReceived(hevent_t *ev) msg->tdev->read_event_callback(msg->tdev, msg->tdev->userdata, msg->buf, tid); - reuseMasterPoolItems(msg->tdev->reader_message_pool, (void **) &msg, 1); + reuseMasterPoolItems(msg->tdev->reader_message_pool, (void **) &msg, 1, msg->tdev); } static void distributePacketPayload(tun_device_t *tdev, tid_t target_tid, shift_buffer_t *buf) { struct msg_event *msg; - popMasterPoolItems(tdev->reader_message_pool, (const void **) &(msg), 1); + popMasterPoolItems(tdev->reader_message_pool, (const void **) &(msg), 1, tdev); *msg = (struct msg_event) {.tdev = tdev, .buf = buf}; @@ -172,7 +172,7 @@ static HTHREAD_ROUTINE(routineWriteToTun) // NOLINT nwrite = write(tdev->handle, rawBuf(buf), bufLen(buf)); - reuseBufferThreadSafe(buf); + reuseBuffer(tdev->writer_buffer_pool, buf); if (nwrite < 0) { @@ -183,7 +183,7 @@ static HTHREAD_ROUTINE(routineWriteToTun) // NOLINT return 0; } -void writeToTunDevce(tun_device_t *tdev, shift_buffer_t *buf) +bool writeToTunDevce(tun_device_t *tdev, shift_buffer_t *buf) { bool closed = false; @@ -197,8 +197,9 @@ void writeToTunDevce(tun_device_t *tdev, shift_buffer_t *buf) { LOGE("TunDevice: write failed, ring is full"); } - reuseBufferThreadSafe(buf); + return false; } + return true; } bool unAssignIpToTunDevice(tun_device_t *tdev, const char *ip_presentation, unsigned int subnet) @@ -281,7 +282,7 @@ bool bringTunDeviceDown(tun_device_t *tdev) shift_buffer_t *buf; while (hchanRecv(tdev->writer_buffer_channel, &buf)) { - reuseBufferThreadSafe(buf); + reuseBuffer(tdev->reader_buffer_pool, buf); } return true; @@ -318,8 +319,13 @@ tun_device_t *createTunDevice(const char *name, bool offload, void *userdata, Tu generic_pool_t *sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); - buffer_pool_t *bpool = - createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool); + buffer_pool_t *bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, + sb_pool, (0) + GSTATE.ram_profile); + + generic_pool_t *writer_sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, 1, + allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); + buffer_pool_t *writer_bpool = + createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool, 1); tun_device_t *tdev = globalMalloc(sizeof(tun_device_t)); @@ -329,14 +335,18 @@ tun_device_t *createTunDevice(const char *name, bool offload, void *userdata, Tu .routine_reader = routineReadFromTun, .routine_writer = routineWriteToTun, .handle = fd, - .reader_shift_buffer_pool = sb_pool, .read_event_callback = cb, .userdata = userdata, .writer_buffer_channel = hchanOpen(sizeof(void *), kTunWriteChannelQueueMax), + .reader_shift_buffer_pool = sb_pool, .reader_message_pool = newMasterPoolWithCap(kMasterMessagePoolCap), - .reader_buffer_pool = bpool}; + .reader_buffer_pool = bpool, + .writer_shift_buffer_pool = writer_sb_pool, + .writer_buffer_pool = writer_bpool + + }; - installMasterPoolAllocCallbacks(tdev->reader_message_pool, tdev, allocTunMsgPoolHandle, destroyTunMsgPoolHandle); + installMasterPoolAllocCallbacks(tdev->reader_message_pool, allocTunMsgPoolHandle, destroyTunMsgPoolHandle); return tdev; } diff --git a/ww/generic_pool.c b/ww/generic_pool.c index cb70444..5b711d5 100644 --- a/ww/generic_pool.c +++ b/ww/generic_pool.c @@ -35,7 +35,7 @@ void poolReCharge(generic_pool_t *pool) { const size_t increase = min((pool->cap - pool->len), (pool->cap) / 2); - popMasterPoolItems(pool->mp, (void const **) &(pool->available[pool->len]), increase); + popMasterPoolItems(pool->mp, (void const **) &(pool->available[pool->len]), increase,pool); pool->len += increase; #if defined(DEBUG) && defined(POOL_DEBUG) @@ -47,7 +47,7 @@ void poolShrink(generic_pool_t *pool) { const size_t decrease = (pool->len < (pool->cap / 2) ? pool->len : (pool->cap / 2)); - reuseMasterPoolItems(pool->mp, &(pool->available[pool->len - decrease]), decrease); + reuseMasterPoolItems(pool->mp, &(pool->available[pool->len - decrease]), decrease,pool); pool->len -= decrease; @@ -65,7 +65,7 @@ static generic_pool_t *allocateGenericPool(struct master_pool_s *mp, unsigned in PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h) { - pool_width = (max(1, pool_width) + 15) & ~0x0F; + pool_width = max(1, pool_width); // half of the pool is used, other half is free at startup pool_width = 2 * pool_width; @@ -81,7 +81,7 @@ static generic_pool_t *allocateGenericPool(struct master_pool_s *mp, unsigned in .mp = mp, .create_item_handle = create_h, .destroy_item_handle = destroy_h}; - installMasterPoolAllocCallbacks(pool_ptr->mp, pool_ptr, poolCreateItemHandle, poolDestroyItemHandle); + installMasterPoolAllocCallbacks(pool_ptr->mp, poolCreateItemHandle, poolDestroyItemHandle); // poolFirstCharge(pool_ptr); return pool_ptr; } diff --git a/ww/managers/memory_manager.h b/ww/managers/memory_manager.h index 8a9e1d9..6fe8cb3 100644 --- a/ww/managers/memory_manager.h +++ b/ww/managers/memory_manager.h @@ -4,7 +4,7 @@ #include -// #define ALLOCATOR_BYPASS // switch to stdlib allocators +#define ALLOCATOR_BYPASS // switch to stdlib allocators diff --git a/ww/managers/socket_manager.c b/ww/managers/socket_manager.c index 966eca1..5dba870 100644 --- a/ww/managers/socket_manager.c +++ b/ww/managers/socket_manager.c @@ -967,7 +967,7 @@ socket_manager_state_t *createSocketManager(void) allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle); worker->buffer_pool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, - worker->shift_buffer_pool); + worker->shift_buffer_pool,GSTATE.ram_profile); worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, worker->tid); diff --git a/ww/master_pool.h b/ww/master_pool.h index 0a2631c..54f40c9 100644 --- a/ww/master_pool.h +++ b/ww/master_pool.h @@ -63,7 +63,6 @@ typedef void (*MasterPoolItemDestroyHandle)(struct master_pool_s *pool, master_p typedef struct master_pool_s { void *memptr; - void *userdata; hhybridmutex_t mutex; MasterPoolItemCreateHandle create_item_handle; MasterPoolItemDestroyHandle destroy_item_handle; @@ -73,11 +72,11 @@ typedef struct master_pool_s } ATTR_ALIGNED_LINE_CACHE master_pool_t; static inline void popMasterPoolItems(master_pool_t *const pool, master_pool_item_t const **const iptr, - const unsigned int count) + const unsigned int count,void* userdata) { // for (unsigned int i = 0; i < count; i++) // { - // iptr[i] = pool->create_item_handle(pool, pool->userdata); + // iptr[i] = pool->create_item_handle(pool, userdata); // } // return; @@ -97,7 +96,7 @@ static inline void popMasterPoolItems(master_pool_t *const pool, master_pool_ite } for (; i < count; i++) { - iptr[i] = pool->create_item_handle(pool, pool->userdata); + iptr[i] = pool->create_item_handle(pool, userdata); } } @@ -107,16 +106,16 @@ static inline void popMasterPoolItems(master_pool_t *const pool, master_pool_ite for (unsigned int i = 0; i < count; i++) { - iptr[i] = pool->create_item_handle(pool, pool->userdata); + iptr[i] = pool->create_item_handle(pool, userdata); } } static inline void reuseMasterPoolItems(master_pool_t *const pool, master_pool_item_t **const iptr, - const unsigned int count) + const unsigned int count,void* userdata) { // for (unsigned int i = 0; i < count; i++) // { - // pool->destroy_item_handle(pool, iptr[i], pool->userdata); + // pool->destroy_item_handle(pool, iptr[i], userdata); // } // return; @@ -124,7 +123,7 @@ static inline void reuseMasterPoolItems(master_pool_t *const pool, master_pool_i { for (unsigned int i = 0; i < count; i++) { - pool->destroy_item_handle(pool, iptr[i], pool->userdata); + pool->destroy_item_handle(pool, iptr[i], userdata); } return; } @@ -145,18 +144,17 @@ static inline void reuseMasterPoolItems(master_pool_t *const pool, master_pool_i } for (; i < count; i++) { - pool->destroy_item_handle(pool, iptr[i], pool->userdata); + pool->destroy_item_handle(pool, iptr[i], userdata); } } hhybridmutex_unlock(&(pool->mutex)); } -static void installMasterPoolAllocCallbacks(master_pool_t *pool, void *userdata, MasterPoolItemCreateHandle create_h, +static void installMasterPoolAllocCallbacks(master_pool_t *pool, MasterPoolItemCreateHandle create_h, MasterPoolItemDestroyHandle destroy_h) { hhybridmutex_lock(&(pool->mutex)); - pool->userdata = userdata; pool->create_item_handle = create_h; pool->destroy_item_handle = destroy_h; hhybridmutex_unlock(&(pool->mutex)); diff --git a/ww/ww.c b/ww/ww.c index 04ff0d6..efd6f04 100644 --- a/ww/ww.c +++ b/ww/ww.c @@ -52,7 +52,7 @@ static void initalizeWorker(worker_t *worker, tid_t tid) GSTATE.shortcut_shift_buffer_pools[tid] = getWorker(tid)->shift_buffer_pool; worker->buffer_pool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, - worker->shift_buffer_pool); + worker->shift_buffer_pool, (0) + GSTATE.ram_profile); GSTATE.shortcut_buffer_pools[tid] = getWorker(tid)->buffer_pool; worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, tid);