From 7b0880bf71379a0a17cc546802d4ea8d68777d25 Mon Sep 17 00:00:00 2001 From: msieben <4319079+msieben@users.noreply.github.com> Date: Thu, 15 Aug 2024 07:58:41 +0000 Subject: [PATCH] [Core/CyclicBuffer / Tests/unit/core] : fix build 'test_cyclicbuffer' and revert 'CyclicBuffer' changes --- Source/core/CyclicBuffer.cpp | 287 +++++----------- Source/core/CyclicBuffer.h | 31 +- Tests/unit/core/test_cyclicbuffer.cpp | 452 +++++++++++++++----------- 3 files changed, 356 insertions(+), 414 deletions(-) diff --git a/Source/core/CyclicBuffer.cpp b/Source/core/CyclicBuffer.cpp index c1e4201d5..5a3e8c873 100644 --- a/Source/core/CyclicBuffer.cpp +++ b/Source/core/CyclicBuffer.cpp @@ -20,10 +20,6 @@ #include "CyclicBuffer.h" #include "ProcessInfo.h" -#include // FUTEX_* constants -#include // Definition of SYS_* constants -#include - namespace Thunder { namespace Core { @@ -36,9 +32,6 @@ namespace Core { } - // Arbitrary value - constexpr time_t STARTUP_TIMEOUT = 10; // seconds - CyclicBuffer::CyclicBuffer(const string& fileName, const uint32_t mode, const uint32_t bufferSize, const bool overwrite) : _buffer( fileName, @@ -58,12 +51,10 @@ namespace Core { if (_buffer.IsValid() == true) { _administration = reinterpret_cast(_buffer.Buffer()); - - ASSERT(_administration != nullptr); - _realBuffer = (&(_buffer.Buffer()[sizeof(struct control)])); if (bufferSize != 0) { + #ifndef __WINDOWS__ pthread_condattr_t cond_attr; @@ -118,39 +109,7 @@ namespace Core { _administration->_tailIndexMask = (_administration->_tailIndexMask << 1) + 1; _administration->_roundCountModulo = _administration->_roundCountModulo >> 1; } - - // Signal mutex and condition variable are initialized and ready to be used -#ifdef __POSIX__ - // Assume first data member is a uint16_t - // Hint: C++20 has std::is_layout_compatible - if ( std::is_standard_layout>::value - && sizeof(uint16_t) == sizeof(std::atomic) - ) { - std::atomic_fetch_or(&(_administration->_state), static_cast(state::INITIALIZED)); - - long err = syscall(SYS_futex, reinterpret_cast(&_administration->_state), FUTEX_WAKE, std::numeric_limits::max() /* INT_MAX, number of waiters to wake up */, nullptr, 0, 0); - ASSERT(err != 1 /* number of waiters woken up */); DEBUG_VARIABLE(err); - } -#endif - } else { -#ifdef __POSIX__ - if ( std::is_standard_layout>::value - && sizeof(uint16_t) == sizeof(std::atomic) - ) { - const struct timespec timeout = {.tv_sec = STARTUP_TIMEOUT, .tv_nsec = 0}; - - const uint16_t value = _administration->_state.load(); - - if ((value & INITIALIZED) != INITIALIZED) { - // wait until value changes or timeout expires - long err = syscall(SYS_futex, reinterpret_cast(&_administration->_state), FUTEX_WAIT, value, &timeout, nullptr, 0); - - // ETIMEDOUT is allowed - ASSERT(err == 0 || (err != 0 /* see errno */ && errno == ETIMEDOUT)); DEBUG_VARIABLE(err); - } - } } -#endif } } @@ -183,95 +142,62 @@ namespace Core { if (_buffer.IsValid() == true) { _realBuffer = &(_buffer.Buffer()[sizeof(struct control) + actual_offset]); _administration = reinterpret_cast(&(_buffer.Buffer()[actual_offset])); + } - ASSERT(_administration != nullptr); + if (initiator == true) { - if (initiator == true) { #ifndef __WINDOWS__ - pthread_condattr_t cond_attr; + pthread_condattr_t cond_attr; - // default values - int ret = pthread_condattr_init(&cond_attr); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + // default values + int ret = pthread_condattr_init(&cond_attr); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); - ret = pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + ret = pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); - ret = pthread_cond_init(&(_administration->_signal), &cond_attr); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + ret = pthread_cond_init(&(_administration->_signal), &cond_attr); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); - pthread_mutexattr_t mutex_attr; + pthread_mutexattr_t mutex_attr; - // default values - ret = pthread_mutexattr_init(&mutex_attr); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + // default values + ret = pthread_mutexattr_init(&mutex_attr); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); - // enable access for threads, also in different processes - ret = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + // enable access for threads, also in different processes + ret = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); #ifdef __DEBUG__ - ret = pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK); - ASSERT(ret == 0); DEBUG_VARIABLE(ret); + ret = pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK); + ASSERT(ret == 0); DEBUG_VARIABLE(ret); #endif - ret = pthread_mutex_init(&(_administration->_mutex), &mutex_attr); - ASSERT(ret ==0); DEBUG_VARIABLE(ret); + ret = pthread_mutex_init(&(_administration->_mutex), &mutex_attr); + ASSERT(ret ==0); DEBUG_VARIABLE(ret); #endif - std::atomic_init(&(_administration->_head), static_cast(0)); - std::atomic_init(&(_administration->_tail), static_cast(0)); - std::atomic_init(&(_administration->_agents), static_cast(0)); - std::atomic_init(&(_administration->_state), static_cast(state::UNLOCKED /* state::EMPTY */ | (overwrite ? state::OVERWRITE : 0))); - _administration->_lockPID = 0; - _administration->_size = static_cast(actual_bufferSize - sizeof(struct control)); + std::atomic_init(&(_administration->_head), static_cast(0)); + std::atomic_init(&(_administration->_tail), static_cast(0)); + std::atomic_init(&(_administration->_agents), static_cast(0)); + std::atomic_init(&(_administration->_state), static_cast(state::UNLOCKED /* state::EMPTY */ | (overwrite ? state::OVERWRITE : 0))); + _administration->_lockPID = 0; + _administration->_size = static_cast(actual_bufferSize - sizeof(struct control)); - _administration->_reserved = 0; - _administration->_reservedWritten = 0; + _administration->_reserved = 0; + _administration->_reservedWritten = 0; #ifndef __WINDOWS__ - std::atomic_init(&(_administration->_reservedPID), static_cast(0)); + std::atomic_init(&(_administration->_reservedPID), static_cast(0)); #else - std::atomic_init(&(_administration->_reservedPID), static_cast(0)); -#endif - - _administration->_tailIndexMask = 1; - _administration->_roundCountModulo = 1L << 31; - while (_administration->_tailIndexMask < _administration->_size) { - _administration->_tailIndexMask = (_administration->_tailIndexMask << 1) + 1; - _administration->_roundCountModulo = _administration->_roundCountModulo >> 1; - } - - // Signal mutex and condition variable are initialized and ready to be used -#ifdef __POSIX__ - // Assume first data member is a uint16_t - // Hint: C++20 has std::is_layout_compatible - if ( std::is_standard_layout>::value - && sizeof(uint16_t) == sizeof(std::atomic) - ) { - std::atomic_fetch_or(&(_administration->_state), static_cast(state::INITIALIZED)); - - long err = syscall(SYS_futex, reinterpret_cast(&_administration->_state), FUTEX_WAKE, std::numeric_limits::max() /* INT_MAX, number of waiters to wake up */, nullptr, 0, 0); - ASSERT(err != 1 /* number of waiters woken up */); DEBUG_VARIABLE(err); - } + std::atomic_init(&(_administration->_reservedPID), static_cast(0)); #endif - } else { -#ifdef __POSIX__ - if ( std::is_standard_layout>::value - && sizeof(uint16_t) == sizeof(std::atomic) - ) { - const struct timespec timeout = {.tv_sec = STARTUP_TIMEOUT, .tv_nsec = 0}; - const uint16_t value = _administration->_state.load(); - - if ((value & INITIALIZED) != INITIALIZED) { - // wait until value changes or timeout expires - long err = syscall(SYS_futex, reinterpret_cast(&_administration->_state), FUTEX_WAIT, value, &timeout, nullptr, 0); - - // ETIMEDOUT is allowed - ASSERT(err == 0 || (err != 0 /* see errno */ && errno == ETIMEDOUT)); DEBUG_VARIABLE(err); - } - } -#endif + _administration->_tailIndexMask = 1; + _administration->_roundCountModulo = 1L << 31; + while (_administration->_tailIndexMask < _administration->_size) { + _administration->_tailIndexMask = (_administration->_tailIndexMask << 1) + 1; + _administration->_roundCountModulo = _administration->_roundCountModulo >> 1; } } } @@ -307,8 +233,6 @@ namespace Core { void CyclicBuffer::AdminLock() { #ifdef __POSIX__ - ASSERT(_administration != nullptr); - int ret = pthread_mutex_lock(&(_administration->_mutex)); ASSERT(ret == 0); DEBUG_VARIABLE(ret); #else @@ -326,28 +250,23 @@ namespace Core { // This is in MS... uint32_t CyclicBuffer::SignalLock(const uint32_t waitTime) { + uint32_t result = waitTime; if (waitTime != Core::infinite) { #ifdef __POSIX__ - struct timespec structTime; + struct timespec structTime = {0,0}; - clock_gettime(CLOCK_MONOTONIC_RAW, &structTime); + clock_gettime(CLOCK_MONOTONIC, &structTime); structTime.tv_nsec += ((waitTime % 1000) * 1000 * 1000); /* remainder, milliseconds to nanoseconds */ structTime.tv_sec += (waitTime / 1000); // + (structTime.tv_nsec / 1000000000); /* milliseconds to seconds */ structTime.tv_nsec = structTime.tv_nsec % 1000000000; - AdminLock(); - -// ASSERT(_administration != nullptr); - if (pthread_cond_timedwait(&(_administration->_signal), &(_administration->_mutex), &structTime) != 0) { - AdminUnlock(); + struct timespec nowTime = {0,0}; - struct timespec nowTime; - - clock_gettime(CLOCK_MONOTONIC_RAW, &nowTime); + clock_gettime(CLOCK_MONOTONIC, &nowTime); if (nowTime.tv_nsec > structTime.tv_nsec) { result = (nowTime.tv_sec - structTime.tv_sec) * 1000 + ((nowTime.tv_nsec - structTime.tv_nsec) / 1000000); @@ -356,11 +275,7 @@ namespace Core { result = (nowTime.tv_sec - structTime.tv_sec - 1) * 1000 + ((1000000000 - (structTime.tv_nsec - nowTime.tv_nsec)) / 1000000); } TRACE_L1("End wait. %d\n", result); - } else { - AdminUnlock(); - ASSERT(false); } - #else if (::WaitForSingleObjectEx(_signal, waitTime, FALSE) == WAIT_OBJECT_0) { @@ -373,14 +288,7 @@ namespace Core { ASSERT(result <= waitTime); } else { #ifdef __POSIX__ - AdminLock(); - -// ASSERT(_administration != nullptr); - - int err = pthread_cond_wait(&(_administration->_signal), &(_administration->_mutex)); - ASSERT(err == 0); DEBUG_VARIABLE(err); - - AdminUnlock(); + pthread_cond_wait(&(_administration->_signal), &(_administration->_mutex)); #else ::WaitForSingleObjectEx(_signal, INFINITE, FALSE); #endif @@ -391,8 +299,6 @@ namespace Core { void CyclicBuffer::AdminUnlock() { #ifdef __POSIX__ - ASSERT(_administration != nullptr); - int ret = pthread_mutex_unlock(&(_administration->_mutex)); ASSERT(ret == 0); DEBUG_VARIABLE(ret); #else @@ -404,38 +310,41 @@ namespace Core { } void CyclicBuffer::Reevaluate() - { - ASSERT(_administration != nullptr); + { // See if we need to have some interested actor reevaluate its state.. if (_administration->_agents.load() > 0) { #ifdef __POSIX__ - pthread_cond_signal(&(_administration->_signal)); + for (int index = _administration->_agents.load(); index != 0; index--) { + pthread_cond_signal(&(_administration->_signal)); + } #else ReleaseSemaphore(_signal, _administration->_agents.load(), nullptr); #endif - std::this_thread::yield(); + // Wait till all waiters have seen the trigger.. + while (_administration->_agents.load() > 0) { + std::this_thread::yield(); + } } } void CyclicBuffer::Alert() { + // Lock the administrator.. AdminLock(); _alert = true; - AdminUnlock(); - Reevaluate(); + + AdminUnlock(); } uint32_t CyclicBuffer::Read(uint8_t buffer[], const uint32_t length, bool partialRead) { - ASSERT(_administration != nullptr); - ASSERT(length <= _administration->_size); ASSERT(IsValid() == true); @@ -458,7 +367,7 @@ namespace Core { break; } - Cursor cursor(*this, oldTail, std::min(length, result)); + Cursor cursor(*this, oldTail, length); result = GetReadSize(cursor); //data was found if result is greater than 0 and the tail was not moved by the writer. @@ -518,9 +427,7 @@ namespace Core { uint32_t CyclicBuffer::Write(const uint8_t buffer[], const uint32_t length) { - ASSERT(_administration != nullptr); - - ASSERT(length <= _administration->_size); + ASSERT(length < _administration->_size); ASSERT(IsValid() == true); uint32_t head = _administration->_head; @@ -531,14 +438,11 @@ namespace Core { if (_administration->_reservedPID != 0) { #ifdef __WINDOWS__ // We are writing because of reservation. - if (_administration->_reservedPID != ::GetCurrentThreadId()) { + ASSERT(_administration->_reservedPID == ::GetCurrentProcessId()); #else // We are writing because of reservation. - if (_administration->_reservedPID != ::gettid()) { + ASSERT(_administration->_reservedPID == ::getpid()); #endif - // We are not the one that made the reservation so skip any writing for now - return 0; - } // Check if we are not writing more than reserved. uint32_t newReservedWritten = _administration->_reservedWritten + length; @@ -588,10 +492,12 @@ namespace Core { if (startingEmpty) { // Was empty before, tell observers about new data. + AdminLock(); Reevaluate(); - DataAvailable(); + + AdminUnlock(); } else { //The tail moved during write which could mean the reader read everything from the buffer //and won't be notified about new data coming in, because the writer thinks it is not empty. @@ -607,8 +513,6 @@ namespace Core { void CyclicBuffer::AssureFreeSpace(uint32_t required) { - ASSERT(_administration != nullptr); - uint32_t oldTail = _administration->_tail; uint32_t tail = oldTail & _administration->_tailIndexMask; uint32_t free = Free(_administration->_head, tail); @@ -620,6 +524,7 @@ namespace Core { ASSERT((offset + free) >= required); uint32_t newTail = cursor.GetCompleteTail(offset); + if (std::atomic_compare_exchange_weak(&(_administration->_tail), &oldTail, newTail) == false) { oldTail = _administration->_tail; tail = oldTail & _administration->_tailIndexMask; @@ -634,25 +539,21 @@ namespace Core { uint32_t CyclicBuffer::Reserve(const uint32_t length) { #ifdef __WINDOWS__ - DWORD processId = ::GetCurrentThreadId(); + DWORD processId = GetCurrentProcessId(); DWORD expectedProcessId = static_cast(0); #else - pid_t processId = ::gettid(); + pid_t processId = ::getpid(); pid_t expectedProcessId = static_cast(0); #endif - ASSERT(_administration != nullptr); - if ((length >= Size()) || (((_administration->_state.load() & state::OVERWRITE) == 0) && (length >= Free()))) return Core::ERROR_INVALID_INPUT_LENGTH; - // Multiple can do a request but only one should be allowed to continue. The others should wait until a new opportunity arises. - bool noOtherReservation = atomic_compare_exchange_strong(&(_administration->_reservedPID), &expectedProcessId, processId); + ASSERT(noOtherReservation); - if (!noOtherReservation) { // Request not honored + if (!noOtherReservation) return Core::ERROR_ILLEGAL_STATE; - } uint32_t actualLength = length; if (length >= _administration->_size) { @@ -666,13 +567,7 @@ namespace Core { _administration->_reserved = actualLength; _administration->_reservedWritten = 0; - return Core::ERROR_NONE; - } - - uint32_t CyclicBuffer::ReservedRemaining() const - { - ASSERT(_administration != nullptr); - return _administration->_reserved - _administration->_reservedWritten; + return actualLength; } uint32_t CyclicBuffer::Lock(const bool dataPresent, const uint32_t waitTime) @@ -680,29 +575,26 @@ namespace Core { uint32_t result = Core::ERROR_TIMEDOUT; uint32_t timeLeft = waitTime; - do { - AdminLock(); + // Lock can not be called recursive, unlock if you would like to lock it.. + ASSERT(_administration->_lockPID == 0); -// ASSERT(_administration != nullptr); + // Lock the administrator.. + AdminLock(); + + do { if ((((_administration->_state.load()) & state::LOCKED) != state::LOCKED) && ((dataPresent == false) || (Used() > 0))) { std::atomic_fetch_or(&(_administration->_state), static_cast(state::LOCKED)); // Remember that we, as a process, took the lock -#ifdef __WINDOWS__ - _administration->_lockPID = ::GetCurrentThreadId(); -#else - _administration->_lockPID = gettid(); -#endif - + _administration->_lockPID = Core::ProcessInfo().Id(); result = Core::ERROR_NONE; } else if (timeLeft > 0) { - AdminUnlock(); - - ASSERT(_administration != nullptr); _administration->_agents++; + AdminUnlock(); + timeLeft = SignalLock(timeLeft); _administration->_agents--; @@ -715,50 +607,43 @@ namespace Core { } } - AdminUnlock(); } while ((timeLeft > 0) && (result == Core::ERROR_TIMEDOUT)); + AdminUnlock(); return (result); } uint32_t CyclicBuffer::Unlock() { + uint32_t result(Core::ERROR_ILLEGAL_STATE); - ASSERT(_administration != nullptr); + // Lock the administrator.. + AdminLock(); + // Lock can not be called recursive, unlock if you would like to lock it.. + ASSERT(_administration->_lockPID == Core::ProcessInfo().Id()); ASSERT((_administration->_state.load() & state::LOCKED) == state::LOCKED); - AdminLock(); - // Only unlock if it is "our" lock. -#ifdef __WINDOWS__ - if (_administration->_lockPID == ::GetCurrentThreadId()) { -#else - if (_administration->_lockPID == gettid()) { -#endif - _administration->_lockPID = 0; - - AdminUnlock(); - - ASSERT(_administration != nullptr); + if (_administration->_lockPID == Core::ProcessInfo().Id()) { + _administration->_lockPID = 0; std::atomic_fetch_and(&(_administration->_state), static_cast(~state::LOCKED)); Reevaluate(); result = Core::ERROR_NONE; - } else { - AdminUnlock(); } + AdminUnlock(); + return (result); } uint32_t CyclicBuffer::Peek(uint8_t buffer[], const uint32_t length) const { - ASSERT(_administration != nullptr); ASSERT(length <= _administration->_size); ASSERT(IsValid() == true); diff --git a/Source/core/CyclicBuffer.h b/Source/core/CyclicBuffer.h index 22c2c2c3c..b588a6f54 100644 --- a/Source/core/CyclicBuffer.h +++ b/Source/core/CyclicBuffer.h @@ -30,8 +30,6 @@ #include #endif -#include - // ---- Referenced classes and types ---- // ---- Helper types and constants ---- @@ -67,8 +65,6 @@ namespace Core { template void Peek(ArgType& buffer) const { - ASSERT(_Parent._administration != nullptr); - uint32_t startIndex = _Tail & _Parent._administration->_tailIndexMask; startIndex += _Offset; startIndex %= _Parent._administration->_size; @@ -98,8 +94,7 @@ namespace Core { uint32_t GetCompleteTail(uint32_t offset) const { - ASSERT(_Parent._administration != nullptr); - ASSERT(_Parent._administration->_tailIndexMask < std::numeric_limits::max()); + ASSERT(_Parent._administration->_tailIndexMask < static_cast(~0)); uint32_t oldTail = _Tail; uint32_t roundCount = oldTail / (1 + _Parent._administration->_tailIndexMask); @@ -136,16 +131,12 @@ namespace Core { inline uint32_t Used(uint32_t head, uint32_t tail) const { - ASSERT(_administration != nullptr); - uint32_t output = (head >= tail ? head - tail : _administration->_size - (tail - head)); return output; } inline uint32_t Free(uint32_t head, uint32_t tail) const { - ASSERT(_administration != nullptr); - uint32_t result = (head >= tail ? _administration->_size - (head - tail) : tail - head); return result; } @@ -153,14 +144,10 @@ namespace Core { public: inline void Flush() { - ASSERT(_administration != nullptr); - std::atomic_store_explicit(&(_administration->_tail), (std::atomic_load(&(_administration->_head))), std::memory_order_relaxed); } inline bool Overwritten() const { - ASSERT(_administration != nullptr); - bool overwritten((std::atomic_load(&(_administration->_state)) & OVERWRITTEN) == OVERWRITTEN); // Now clear the flag. @@ -190,20 +177,14 @@ namespace Core { } inline bool IsLocked() const { - ASSERT(_administration != nullptr); - return ((std::atomic_load(&(_administration->_state)) & LOCKED) == LOCKED); } inline uint32_t LockPid() const { - ASSERT(_administration != nullptr); - return (_administration->_lockPID); } inline bool IsOverwrite() const { - ASSERT(_administration != nullptr); - return ((std::atomic_load(&(_administration->_state)) & OVERWRITE) == OVERWRITE); } inline bool IsValid() const @@ -216,8 +197,6 @@ namespace Core { } inline uint32_t Used() const { - ASSERT(_administration != nullptr); - uint32_t head(_administration->_head); uint32_t tail(_administration->_tail & _administration->_tailIndexMask); @@ -225,8 +204,6 @@ namespace Core { } inline uint32_t Free() const { - ASSERT(_administration != nullptr); - uint32_t head(_administration->_head); uint32_t tail(_administration->_tail & _administration->_tailIndexMask); @@ -234,8 +211,6 @@ namespace Core { } inline uint32_t Size() const { - ASSERT(_administration != nullptr); - return (_administration->_size); } bool Open(); @@ -270,7 +245,6 @@ namespace Core { // This allows for writes of partial buffers without worrying about // readers seeing incomplete data. uint32_t Reserve(const uint32_t length); - uint32_t ReservedRemaining() const; virtual void DataAvailable(); @@ -312,8 +286,7 @@ namespace Core { UNLOCKED = 0x00, LOCKED = 0x01, OVERWRITE = 0x02, - OVERWRITTEN = 0x04, - INITIALIZED = 0x08 // simplified signal for non-creator mutex and condition variable are initialized + OVERWRITTEN = 0x04 }; Core::DataElementFile _buffer; diff --git a/Tests/unit/core/test_cyclicbuffer.cpp b/Tests/unit/core/test_cyclicbuffer.cpp index b1031598c..8c7277b09 100644 --- a/Tests/unit/core/test_cyclicbuffer.cpp +++ b/Tests/unit/core/test_cyclicbuffer.cpp @@ -910,52 +910,53 @@ namespace Core { constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; constexpr uint8_t maxRetries = 1; - std::string bufferName {"cyclicbuffer01"}; - const uint32_t CyclicBufferSize = 10; + const std::string bufferName {"cyclicbuffer01"}; - uint32_t shareableFlag = (shareable == true) ? ::Thunder::Core::File::Mode::SHAREABLE : 0; - const uint32_t mode = - (::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - shareableFlag); + constexpr uint32_t CyclicBufferSize = 10; + constexpr uint32_t cyclicBufferWithControlDataSize = CyclicBufferSize + sizeof(::Thunder::Core::CyclicBuffer::control); - struct Data data; - data.mode = mode; - data.bufferName = bufferName.c_str(); - data.shareable = shareable; - data.usingDataElementFile = usingDataElementFile; - data.offset = offset; + const uint32_t mode = ( ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | (shareable ? ::Thunder::Core::File::Mode::SHAREABLE : 0) + ); + + const struct Data data{shareable, usingDataElementFile, mode, offset, bufferName.c_str()}; IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { -// struct Data* data = (reinterpret_cast(testAdmin.Data())); + // a small delay so the parent can be set up + SleepMs(maxInitTime); + uint32_t result; - uint32_t cyclicBufferSize = CyclicBufferSize; - uint8_t loadBuffer[cyclicBufferSize + 1]; + uint8_t loadBuffer[CyclicBufferSize + 1]; - CyclicBufferTest* buffer = nullptr; + CyclicBufferTest* buffer = nullptr; ::Thunder::Core::DataElementFile* dataElementFile = nullptr; + if (data.usingDataElementFile == true) { - uint8_t cyclicBufferWithControlDataSize = cyclicBufferSize + sizeof(::Thunder::Core::CyclicBuffer::control); dataElementFile = new ::Thunder::Core::DataElementFile(bufferName, data.mode | ::Thunder::Core::File::CREATE, cyclicBufferWithControlDataSize + data.offset); + + ASSERT_TRUE(dataElementFile != nullptr); + buffer = new CyclicBufferTest(*dataElementFile, true, data.offset, cyclicBufferWithControlDataSize, false); } else { - buffer = new CyclicBufferTest(bufferName, data.mode, cyclicBufferSize, false); + buffer = new CyclicBufferTest(bufferName, data.mode, CyclicBufferSize, false); } - EXPECT_EQ(buffer->Size(), cyclicBufferSize); -// testAdmin.Sync("setup server"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_TRUE(buffer != nullptr); + + EXPECT_EQ(buffer->Size(), CyclicBufferSize); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("setup client"); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); if (data.shareable == true) { -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("client read empty data"); - string dataStr = "abcd"; + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + string dataStr = "abcd"; result = buffer->Write(reinterpret_cast(dataStr.c_str()), dataStr.size()); EXPECT_EQ(result, dataStr.size()); @@ -963,126 +964,108 @@ namespace Core { result = buffer->Write(reinterpret_cast(dataStr.c_str()), dataStr.size()); EXPECT_EQ(result, dataStr.size()); -// testAdmin.Sync("server wrote"); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - -// testAdmin.Sync("client read"); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("client wrote"); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); result = buffer->Peek(loadBuffer, buffer->Used()); loadBuffer[result] = '\0'; EXPECT_EQ(result, 9u); - EXPECT_STREQ((char*)loadBuffer, "bcdefghkl"); + + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "bcdefghkl"); EXPECT_EQ(buffer->Used(), 9u); -// testAdmin.Sync("server peek"); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server start read"); result = buffer->Read(loadBuffer, buffer->Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 9u); - EXPECT_STREQ((char*)loadBuffer, "bcdefghkl"); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "bcdefghkl"); EXPECT_EQ(buffer->Used(), 0u); -// testAdmin.Sync("server read"); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); delete buffer; + if (dataElementFile) { delete dataElementFile; } } }; -// acting as vclient IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { - // a small delay so the child can be set up - SleepMs(maxInitTime); - -// testAdmin.Sync("setup server"); ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); uint32_t result; - uint32_t cyclicBufferSize = CyclicBufferSize; - uint8_t loadBuffer[cyclicBufferSize]; + uint8_t loadBuffer[CyclicBufferSize]; - CyclicBufferTest* buffer; + CyclicBufferTest* buffer = nullptr; ::Thunder::Core::DataElementFile* dataElementFile = nullptr; + if (usingDataElementFile == true) { dataElementFile = new ::Thunder::Core::DataElementFile(bufferName, mode, 0); + + ASSERT_TRUE(dataElementFile != nullptr); + buffer = new CyclicBufferTest(*dataElementFile, false, offset, 0, false); } else { buffer = new CyclicBufferTest(bufferName, mode, 0, false); } - EXPECT_EQ(buffer->Size(), static_cast(((shareable == true) ?CyclicBufferSize : 0))); -// testAdmin.Sync("setup client"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + ASSERT_TRUE(buffer != nullptr); + + EXPECT_EQ(buffer->Size(), static_cast((shareable ? CyclicBufferSize : 0))); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); if (shareable == true) { - memset(loadBuffer, 0, cyclicBufferSize); + memset(loadBuffer, 0, CyclicBufferSize); + result = buffer->Read(loadBuffer, 1, false); - EXPECT_EQ(result, static_cast(0)); + EXPECT_EQ(result, 0); -// testAdmin.Sync("client read empty data"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); result = buffer->Read(loadBuffer, 1, false); - EXPECT_EQ(result, static_cast(1)); + EXPECT_EQ(result, 1); + loadBuffer[result] = '\0'; - EXPECT_STREQ((char*)loadBuffer, "a"); -// testAdmin.Sync("client read"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "a"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); string data = "kl"; result = buffer->Reserve(data.size()); EXPECT_EQ(result, 2u); + result = buffer->Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, 2u); -// testAdmin.Sync("client wrote"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server peek"); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); EXPECT_EQ(buffer->Used(), 9u); -// testAdmin.Sync("server start read"); - ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server read"); -// ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); EXPECT_EQ(buffer->Used(), 0u); } + buffer->Close(); + delete buffer; + if (dataElementFile) { delete dataElementFile; } @@ -1115,36 +1098,49 @@ namespace Core { constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; constexpr uint8_t maxRetries = 1; - std::string bufferName {"cyclicbuffer02"}; + const std::string bufferName {"cyclicbuffer02"}; IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { -// testAdmin.Sync("setup client"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - uint32_t cyclicBufferSize = 0; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t cyclicBufferSize = 0; - ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; -// testAdmin.Sync("setup server"); -// testAdmin.Sync("client wrote"); + ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + + ASSERT_TRUE(buffer.IsValid()); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; + uint32_t result = buffer.Read(loadBuffer, 4); loadBuffer[result] = '\0'; - EXPECT_STREQ((char*)loadBuffer, "abcd"); -// testAdmin.Sync("server read"); - string data = "klmnopq"; + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "abcd"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + const string data = "klmnopq"; + result = buffer.Reserve(data.size()); EXPECT_EQ(result, ::Thunder::Core::ERROR_INVALID_INPUT_LENGTH); + result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, 0u); -// testAdmin.Sync("server wrote"); -// testAdmin.Sync("client peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); EXPECT_FALSE(buffer.Overwritten()); EXPECT_FALSE(buffer.IsLocked()); @@ -1152,54 +1148,74 @@ namespace Core { EXPECT_EQ(buffer.LockPid(), 0u); EXPECT_EQ(buffer.Free(), 5u); -// testAdmin.Sync("client start read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + EXPECT_STREQ(buffer.Name().c_str(), bufferName.c_str()); EXPECT_STREQ(buffer.Storage().Name().c_str(), bufferName.c_str()); -// testAdmin.Sync("client read"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer.Used(), 0u); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { // a small delay so the child can be set up SleepMs(maxInitTime); - - uint32_t cyclicBufferSize = 10; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t cyclicBufferSize = 10; - ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; -// testAdmin.Sync("setup client"); -// testAdmin.Sync("setup server"); + ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + + ASSERT_TRUE(buffer.IsValid()); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; - string data = "abcdefghi"; + const string data = "abcdefghi"; + uint32_t result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, data.size()); -// testAdmin.Sync("client wrote"); -// testAdmin.Sync("server read"); -// testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); result = buffer.Peek(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "efghi"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "efghi"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("client peek"); -// testAdmin.Sync("client start read"); result = buffer.Read(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "efghi"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "efghi"); EXPECT_EQ(buffer.Used(), 0u); -// testAdmin.Sync("client read"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Close(); }; @@ -1215,80 +1231,107 @@ namespace Core { constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; constexpr uint8_t maxRetries = 1; - std::string bufferName {"cyclicbuffer03"}; + const std::string bufferName {"cyclicbuffer03"}; IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { - uint32_t cyclicBufferSize = 10; + // a small delay so the parent can be set up + SleepMs(maxInitTime); - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t cyclicBufferSize = 10; - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); -// testAdmin.Sync("setup server"); -// testAdmin.Sync("setup client"); + ASSERT_TRUE(buffer.IsValid()); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + const string data = "abcdef"; + const uint16_t size = data.size() + 2; - string data = "abcdef"; - uint16_t size = data.size() + 2; uint32_t result = buffer.Write(reinterpret_cast(&size), 2u); EXPECT_EQ(result, 2u); + result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, data.size()); -// testAdmin.Sync("server wrote"); -// testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; + result = buffer.Peek(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; -// testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); result = buffer.Read(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + buffer.Alert(); buffer.Flush(); -// testAdmin.Sync("server read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { - // a small delay so the child can be set up - SleepMs(maxInitTime); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("setup server"); + constexpr uint32_t cyclicBufferSize = 0; - uint32_t cyclicBufferSize = 0; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); -// testAdmin.Sync("setup client"); -// testAdmin.Sync("server wrote"); + ASSERT_TRUE(buffer.IsValid()); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); string data = "j"; - uint16_t size = 9; + const uint16_t size = 9; + uint32_t result = buffer.Reserve(9); EXPECT_EQ(result, 9u); + result = buffer.Write(reinterpret_cast(&size), 2u); EXPECT_EQ(result, 2u); + result = buffer.Write(reinterpret_cast(data.c_str()), 1u); EXPECT_EQ(result, data.size()); data = "klmnop"; + result = buffer.Write(reinterpret_cast(data.c_str()), 6u); EXPECT_EQ(result, data.size()); -// testAdmin.Sync("client wrote"); -// testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server read"); buffer.Close(); }; @@ -1303,101 +1346,140 @@ namespace Core { constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; constexpr uint8_t maxRetries = 1; - std::string bufferName {"cyclicbuffer03"}; + const std::string bufferName {"cyclicbuffer03"}; IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { - uint32_t cyclicBufferSize = 0; -// testAdmin.Sync("setup server"); + constexpr uint32_t cyclicBufferSize = 0; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); EXPECT_TRUE(buffer.IsOverwrite()); -// testAdmin.Sync("setup client"); -// testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint16_t size; - buffer.Read(reinterpret_cast(&size), 2); + EXPECT_EQ(buffer.Read(reinterpret_cast(&size), 2), 2); + uint8_t loadBuffer[cyclicBufferSize + 1]; uint32_t result = buffer.Read(loadBuffer, size - 2); loadBuffer[result] = '\0'; + EXPECT_EQ(result, size - 2); -// testAdmin.Sync("client read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); string data = "j"; size = 9; + result = buffer.Reserve(size); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(&size), 2); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(data.c_str()), 1); + EXPECT_EQ(result, size); + data = "lmnopq"; result = buffer.Write(reinterpret_cast(data.c_str()), 6); + EXPECT_EQ(result, 6); + EXPECT_EQ(buffer.Used(), 9u); EXPECT_EQ(buffer.Overwritten(), false); size = 7; result = buffer.Reserve(size); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(&size), 2); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(data.c_str()), size - 2); + EXPECT_EQ(result, size); + EXPECT_EQ(buffer.Free(), 3u); EXPECT_EQ(buffer.Used(), 7u); -// testAdmin.Sync("client wrote"); -// testAdmin.Sync("server peek"); -// testAdmin.Sync("server read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Flush(); EXPECT_EQ(buffer.Used(), 0u); -// testAdmin.Sync("client flush"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { // a small delay so the child can be set up SleepMs(maxInitTime); - uint32_t cyclicBufferSize = 10; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t cyclicBufferSize = 10; + + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); EXPECT_TRUE(buffer.IsOverwrite()); EXPECT_TRUE(buffer.IsValid()); -// testAdmin.Sync("setup server"); -// testAdmin.Sync("setup client"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; uint16_t size = 9; string data = "abcdefi"; uint32_t result = buffer.Write(reinterpret_cast(&size), 2); + result = buffer.Write(reinterpret_cast(data.c_str()), size - 2); EXPECT_EQ(result, data.size()); -// testAdmin.Sync("server wrote"); -// testAdmin.Sync("client read"); -// testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Peek(reinterpret_cast(&size), 2); EXPECT_EQ(size, buffer.Used()); + result = buffer.Peek(loadBuffer, size); loadBuffer[result] = '\0'; EXPECT_EQ(result, 7u); - EXPECT_STREQ((char*)loadBuffer +2, "lmnop"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0] + 2), "lmnop"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); -// testAdmin.Sync("server peek"); EXPECT_EQ(buffer.Free(), 3u); + buffer.Read(reinterpret_cast(&size), 2); EXPECT_EQ(buffer.Used(), static_cast(size - 2)); + result = buffer.Read(loadBuffer, size - 2); loadBuffer[result] = '\0'; EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "lmnop"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "lmnop"); + EXPECT_EQ(buffer.Free(), 10u); EXPECT_EQ(buffer.Used(), 0u); @@ -1405,8 +1487,10 @@ namespace Core { EXPECT_FALSE(buffer.IsLocked()); EXPECT_EQ(buffer.LockPid(), 0u); -// testAdmin.Sync("server read"); -// testAdmin.Sync("client flush"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer.Free(), cyclicBufferSize); buffer.Close();