Skip to content

Commit

Permalink
Use mempool for packets
Browse files Browse the repository at this point in the history
  • Loading branch information
bertmelis authored May 15, 2024
1 parent 2251a7a commit 2c75910
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
14 changes: 13 additions & 1 deletion src/MemoryPool/src/Fixed.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ the LICENSE file.

#include <cstddef> // std::size_t
#include <cassert> // assert
#if _GLIBCXX_HAS_GTHREADS
#include <mutex> // NOLINT [build/c++11] std::mutex, std::lock_guard
#else
#warning "The memory pool is not thread safe"
#endif

#ifdef MEMPOL_DEBUG
#include <iostream>
Expand Down Expand Up @@ -38,7 +42,9 @@ class Fixed {
Fixed& operator= (const Fixed&) = delete;

void* malloc() {
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
if (_head) {
void* retVal = _head;
_head = *reinterpret_cast<unsigned char**>(_head);
Expand All @@ -49,13 +55,17 @@ class Fixed {

void free(void* ptr) {
if (!ptr) return;
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
*reinterpret_cast<unsigned char**>(ptr) = _head;
_head = reinterpret_cast<unsigned char*>(ptr);
}

std::size_t freeMemory() {
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
unsigned char* i = _head;
std::size_t retVal = 0;
while (i) {
Expand Down Expand Up @@ -101,7 +111,9 @@ class Fixed {
private:
unsigned char _buffer[nrBlocks * (sizeof(std::size_t) > blocksize ? sizeof(std::size_t) : blocksize)];
unsigned char* _head;
#if _GLIBCXX_HAS_GTHREADS
std::mutex _mutex;
#endif
};

} // end namespace MemoryPool
} // end namespace MemoryPool
14 changes: 14 additions & 0 deletions src/MemoryPool/src/Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ the LICENSE file.

#include <cstddef> // std::size_t
#include <cassert> // assert
#if _GLIBCXX_HAS_GTHREADS
#include <mutex> // NOLINT [build/c++11] std::mutex, std::lock_guard
#else
#warning "The memory pool is not thread safe"
#endif

#ifdef MEMPOL_DEBUG
#include <iostream>
Expand Down Expand Up @@ -45,7 +49,9 @@ class Variable {
Variable& operator= (const Variable&) = delete;

void* malloc(size_t size) {
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
if (size == 0) return nullptr;

size = (size / sizeof(BlockHeader) + (size % sizeof(BlockHeader) != 0)) + 1; // count by BlockHeader size, add 1 for header
Expand Down Expand Up @@ -107,7 +113,9 @@ class Variable {
std::cout << "free " << static_cast<void*>(reinterpret_cast<BlockHeader*>(ptr) - 1) << std::endl;
#endif

#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif

BlockHeader* toFree = reinterpret_cast<BlockHeader*>(ptr) - 1;
BlockHeader* previous = reinterpret_cast<BlockHeader*>(_buffer);
Expand Down Expand Up @@ -152,7 +160,9 @@ class Variable {
}

std::size_t freeMemory() {
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
size_t retVal = 0;
BlockHeader* currentBlock = reinterpret_cast<BlockHeader*>(_head);

Expand All @@ -165,7 +175,9 @@ class Variable {
}

std::size_t maxBlockSize() {
#if _GLIBCXX_HAS_GTHREADS
const std::lock_guard<std::mutex> lockGuard(_mutex);
#endif
size_t retVal = 0;
BlockHeader* currentBlock = reinterpret_cast<BlockHeader*>(_head);

Expand Down Expand Up @@ -218,7 +230,9 @@ class Variable {
*/
unsigned char _buffer[(nrBlocks * ((blocksize / sizeof(BlockHeader) + ((blocksize % sizeof(BlockHeader)) ? 1 : 0)) + 1)) * sizeof(BlockHeader)];
BlockHeader* _head;
#if _GLIBCXX_HAS_GTHREADS
std::mutex _mutex;
#endif

#ifdef MEMPOL_DEBUG
std::size_t _bufferSize;
Expand Down
28 changes: 22 additions & 6 deletions src/Packets/Packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@ the LICENSE file.

namespace espMqttClientInternals {

#if EMC_USE_MEMPOOL
MemoryPool::Variable<EMC_NUM_POOL_ELEMENTS, EMC_SIZE_POOL_ELEMENTS> Packet::_memPool;
#endif

Packet::~Packet() {
#if EMC_USE_MEMPOOL
_memPool.free(_data);
#else
free(_data);
#endif
}

size_t Packet::available(size_t index) {
Expand Down Expand Up @@ -178,7 +186,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
_packetId = 0;
}

if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -215,7 +223,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
_packetId = 0;
}

if (!_allocate(remainingLength - payloadLength + std::min(payloadLength, static_cast<size_t>(EMC_RX_BUFFER_SIZE)))) {
if (!_allocate(remainingLength - payloadLength + std::min(payloadLength, static_cast<size_t>(EMC_RX_BUFFER_SIZE)), true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -251,7 +259,7 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type, uint16_t p
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
if (!_allocate(2)) {
if (!_allocate(2, true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -290,7 +298,7 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
if (!_allocate(0)) {
if (!_allocate(0, true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand All @@ -301,12 +309,20 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)


bool Packet::_allocate(size_t remainingLength, bool check) {
#if EMC_USE_MEMPOOL
(void) check;
#else
if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory");
return false;
}
#endif
_size = 1 + remainingLengthLength(remainingLength) + remainingLength;
#if EMC_USE_MEMPOOL
_data = reinterpret_cast<uint8_t*>(_memPool.malloc(_size));
#else
_data = reinterpret_cast<uint8_t*>(malloc(_size));
#endif
if (!_data) {
_size = 0;
emc_log_w("Alloc failed (l:%zu)", _size);
Expand Down Expand Up @@ -357,7 +373,7 @@ void Packet::_createSubscribe(espMqttClientTypes::Error& error,
size_t remainingLength = 2 + payload; // packetId + payload

// allocate memory
if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down Expand Up @@ -387,7 +403,7 @@ void Packet::_createUnsubscribe(espMqttClientTypes::Error& error,
size_t remainingLength = 2 + payload; // packetId + payload

// allocate memory
if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, true)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
Expand Down
10 changes: 9 additions & 1 deletion src/Packets/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ the LICENSE file.
#include "RemainingLength.h"
#include "StringUtil.h"

#if EMC_USE_MEMPOOL
#include "MemoryPool/src/MemoryPool.h"
#endif

namespace espMqttClientInternals {

class Packet {
Expand Down Expand Up @@ -133,7 +137,7 @@ class Packet {

private:
// pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength, bool check = true);
bool _allocate(size_t remainingLength, bool check);

// fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId,
Expand All @@ -150,6 +154,10 @@ class Packet {

size_t _chunkedAvailable(size_t index);
const uint8_t* _chunkedData(size_t index) const;

#if EMC_USE_MEMPOOL
static MemoryPool::Variable<EMC_NUM_POOL_ELEMENTS, EMC_SIZE_POOL_ELEMENTS> _memPool;
#endif
};

} // end namespace espMqttClientInternals

0 comments on commit 2c75910

Please sign in to comment.