diff --git a/Source/core/Singleton.h b/Source/core/Singleton.h index 7b321dc74..e769cc0d9 100644 --- a/Source/core/Singleton.h +++ b/Source/core/Singleton.h @@ -139,7 +139,7 @@ namespace Core { } inline static SINGLETON& Instance() { - // As available does not see through friend clas/protected + // As available does not see through friend clas/protected // declarations, we can not rely on the output of it. // If this Instance method id called, assume it has a // default constructor.. diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index 3b5921e48..e4aacf55a 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -32,3 +32,7 @@ endif() if(MESSAGEBUFFER_TEST) add_subdirectory(message-buffer) endif() + +if(CYCLICBUFFER_TEST) + add_subdirectory(cyclic-buffer) +endif() diff --git a/Tests/cyclic-buffer/CMakeLists.txt b/Tests/cyclic-buffer/CMakeLists.txt new file mode 100644 index 000000000..c5ac3c112 --- /dev/null +++ b/Tests/cyclic-buffer/CMakeLists.txt @@ -0,0 +1,31 @@ +# If not stated otherwise in this file or this component's license file the +# following copyright and licenses apply: +# +# Copyright 2020 Metrological +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(CyclicBuffer main.cpp) + +set_target_properties(CyclicBuffer PROPERTIES + CXX_STANDARD 11 + CXX_STANDARD_REQUIRED YES +) + +target_link_libraries(CyclicBuffer + PRIVATE + ${NAMESPACE}Core +) + +install(TARGETS CyclicBuffer DESTINATION bin) + diff --git a/Tests/cyclic-buffer/main.cpp b/Tests/cyclic-buffer/main.cpp new file mode 100644 index 000000000..564cccf36 --- /dev/null +++ b/Tests/cyclic-buffer/main.cpp @@ -0,0 +1,72 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2020 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "process.h" + +#include + +#define ASYNC_TIMEOUT_BEGIN \ + std::promise promise; \ + std::future future = promise.get_future(); \ + std::thread([&](std::promise completed) \ + { /* Before code that should complete before timeout expires */ + +#define ASYNC_TIMEOUT_END(MILLISECONDS /* timeout in milliseconds */, RESULT /* variable that has boolean result of executed code */) \ + /* After code that should complete timely */ \ + /* completed.set_value(true); */ \ + completed.set_value_at_thread_exit(RESULT); \ + } \ + , std::move(promise)).detach() \ + ; \ + if (future.wait_for(std::chrono::milliseconds(MILLISECONDS)) == std::future_status::timeout) { /* Task completed before timeout */ \ + TRACE_L1(_T("Error : Stopping unresposive process.")); \ + killpg(getpgrp(), SIGUSR1); /* Possible 'unresponsive' system, 'unlock' all related 'child' processes, default action is terminate */ \ + } \ + RESULT = future.get(); + +int main(int argc, char* argv[]) +{ + using namespace WPEFramework::Core; + + constexpr uint8_t maxChildren = 3; + + constexpr uint32_t memoryMappedFileRequestedSize = 446; + constexpr uint32_t internalBufferSize = 446; + + constexpr char fileName[] = "/tmp/SharedCyclicBuffer"; + + constexpr uint32_t totalRuntime = infinite /*20000*/; // Milliseconds + constexpr uint32_t totalTimeout = /*totalRuntime +*/ 20000; // Milliseconds + + WPEFramework::Tests::Process process(fileName); + + bool result = false; + + ASYNC_TIMEOUT_BEGIN // result will never be updated in its original scope + + result = process.SetTotalRuntime(totalRuntime) + && process.SetParentUsers(0, 0) /* 0 extra writer(s), 0 reader(s) */ + && process.SetChildUsers(1, 1) /* 1 writer(s), 1 reader(s) */ + && process.Execute() + ; + + ASYNC_TIMEOUT_END(totalTimeout, result) + + return result ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/Tests/cyclic-buffer/process.h b/Tests/cyclic-buffer/process.h new file mode 100644 index 000000000..68dd04bd1 --- /dev/null +++ b/Tests/cyclic-buffer/process.h @@ -0,0 +1,1026 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2020 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define MODULE_NAME CyclicBufferStress + +#include +#include +#include +#include +#include +#include +#include /* Definition of FUTEX_* constants */ +#include /* Definition of SYS_* constants */ +#include +#include + +// https://en.wikipedia.org/wiki/Lorem_ipsum +#define LOREM_IPSUM_TEXT "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." + +// Define in unit that includes this file +//#define CREATOR_WRITE +//#define CREATOR_EXTRA_USERS + +namespace WPEFramework { +namespace Tests { + +class Thread : public Core::Thread +{ +private : + + static constexpr uint32_t n = sizeof(LOREM_IPSUM_TEXT); + +public : + + Thread() + : _status{ true } + { + memcpy(_data.data(), LOREM_IPSUM_TEXT, n); + } + + virtual ~Thread() = default; + +protected : + + bool Validate(const uint8_t data[], uint32_t count) + { + _status = false; + + std::queue reference; + + std::for_each(_data.begin(), _data.end(), [&reference] (uint8_t value){reference.push(value);} ); + + // Just a little further than the buffer size since the matching pattern might be shifted + size_t stop = 2 * reference.size(); + + uint32_t offset = 0; + + while ( stop > 0 + && offset < count + ) { + auto element = reference.front(); + + if (element == data[offset]) { + ++offset; + } else { + offset = 0; + } + // Shift by one and match pattern again + + reference.push(element); + reference.pop(); + + --stop; + } + + _status = offset == count; + + return _status; + } + + bool Status() const + { + return _status; + } + + static constexpr uint8_t threadWorkerInterval = 10; // Milliseconds + static constexpr uint32_t lockTimeout = Core::infinite; // Milliseconds + static constexpr uint8_t sampleSizeInterval = 5; // Number of uint8_t elements + static constexpr uint8_t forceUnlockRetryCount = 5; // Attemps until Alert() at destruction + + static constexpr uint32_t N = n; + + mutable std::array _data; + + std::atomic _status; +}; + +template +class Reader : public Thread +{ +public : + + Reader() = delete; + Reader(const Reader&) = delete; + Reader& operator=(const Reader&) = delete; + + Reader(const std::string& fileName) + : Reader(fileName, 0) + {} + + Reader(const std::string& fileName, size_t numReservedBlocks) + : Thread{} + , _enabled{ false } + , _fileName{ fileName } + , _index{ 0 } + , _numReservedBlocks{ numReservedBlocks } + , _buffer{ + fileName + , Core::File::USER_READ // Not relevant for readers + | Core::File::USER_WRITE // Open the existing file with write permission + // Readers normally require readonly but the cyclic buffer administration is updated after read +// | Core::File::CREATE // Readers access exisitng underlying memory mapped files + | Core::File::SHAREABLE // Updates are visible to other processes, but a reader should not update any content except when read data is (automatically) removed + , 0 // Readers do not specify sizes + , false // Overwrite unread data + } + { + _output.fill('\0'); + } + + ~Reader() + { +// ASSERT(_buffer.LockPid() != gettid()); // Thunder schedules Worker on a different Thread + + Stop(); + + // No way to recover if the lock is taken indefinitly, eg, Core::infinite + uint8_t count { forceUnlockRetryCount }; + do { + if (count == 0) { + _buffer.Alert(); + } else { + --count; + } + } while (!Wait(Thread::STOPPED | Thread::BLOCKED, threadWorkerInterval)); + + _buffer.Close(); + } + + bool Enable() + { + _enabled = _enabled + || ( _buffer.IsValid() + && _buffer.Open() // It already should + && _buffer.Storage().Name() == _fileName + && Core::File(_fileName).Exists() + && _buffer.Storage().Exists() + ) + ; + + ASSERT(_enabled); + + return _enabled; + } + + bool IsEnabled() const + { + return _enabled; + } + + bool HasError() const + { + return !Status(); + } + + uint32_t Worker() override + { + uint32_t waitTimeForNextRun = Core::infinite; + + uint32_t status = _buffer.Lock(false /* data present, false == signalling path, true == PID path */, lockTimeout /* waiting time to give up lock */); + + if (status == Core::ERROR_NONE) { + const uint32_t count = std::rand() % sampleSizeInterval; + + uint32_t read = Read(count); + if (read == count) { +// TRACE_L1(_T("Data read")); + } else if (count > 0 ){ +// TRACE_L1(_T("Less data read than requested")); // Possibly too few writes + } + + status = _buffer.Unlock(); + + if (status != Core::ERROR_NONE) { + TRACE_L1(_T("Error: reader unlock failed")); + Stop(); + } else { + waitTimeForNextRun = std::rand() % threadWorkerInterval; + } + } else { + if (status == Core::ERROR_TIMEDOUT) { + TRACE_L1(_T("Warning: reader lock timed out")); + } else { + TRACE_L1(_T("Error: reader lock failed")); + Stop(); + } + } + + return waitTimeForNextRun; // Schedule milliseconds from now to be called (again) eg interval time + } + +private : + + uint32_t Read(uint32_t count) + { + uint32_t read = 0; + + if (count > 0) { + std::vector data(count, '\0'); + + if ((_index + count) > N) { + // Two passes when passing the boundary + read = _buffer.Read(&(data.data()[0]), N - _index, false); + memcpy(&_output[_index], &(data.data()[0]), read); + + if ( read == (N - _index) + && count > read + ) { + const uint32_t position = read; + + read = _buffer.Read(&(data.data()[position]), count - read, false); + memcpy(&_output[0], &(data.data()[position]), read); + + read = read + position; + } + } else { + read = _buffer.Read(&(data.data()[0]), count, false); + memcpy(&_output[_index], &(data.data()[0]), read); + } + + if ( read > 0 + && _buffer.IsOverwrite() // The data may have been overwritten + && !Validate(data.data(), read) + ) { + TRACE_L1("Error: detected read data corruption."); + } + + _index = (_index + read) % N; + } + + return read; + } + + // Only relevant with reserve being actively used, and, in single writer context + bool Validate(const uint8_t data[], uint32_t count) + { + bool result = true; + + if ( _numReservedBlocks > 0 + && count > 0 + ) { + result = Thread::Validate(data, count); + } + + return result; + } + + bool _enabled; + + const std::string _fileName; + + uint64_t _index; + + const size_t _numReservedBlocks; + + std::array _output; + + Core::CyclicBuffer _buffer; +}; + + +template +class Writer : public Thread +{ +public : + + Writer() = delete; + Writer(const Writer&) = delete; + Writer& operator=(const Writer&) = delete; + + Writer(const std::string& fileName, uint32_t requiredSharedBufferSize) + : Writer(fileName, requiredSharedBufferSize, 0) + {} + + Writer(const std::string& fileName, uint32_t requiredSharedBufferSize, size_t numReservedBlocks) + : Thread{} + , _enabled{ false } + , _fileName { fileName } + , _index{ 0 } + , _numReservedBlocks{ numReservedBlocks } + , _reserved{ 0 } + , _buffer{ + fileName + , Core::File::USER_READ // Enable read permissions on the underlying file for other users + | Core::File::USER_WRITE // Enable write permission on the underlying file + | (requiredSharedBufferSize ? Core::File::CREATE : 0) // Create a new underlying memory mapped file + | Core::File::SHAREABLE // Allow other processes to access the content of the file + , requiredSharedBufferSize // Requested size + , false // Overwrite unread data + } + { + static_assert(N > 0 || Thread::N > N, "Specify a data set with at least one character (N > 0) with N <= Thread::N."); + + memcpy(_input.data(), Thread::_data.data(), N); + } + + ~Writer() + { +// ASSERT(_buffer.LockPid() != gettid()); // Thunder schedules Worker on a different Thread + + Stop(); + + // No way to recover if the lock is taken indefinitly, eg, Core::infinite + uint8_t count { forceUnlockRetryCount }; + do { + if (count == 0) { + _buffer.Alert(); + } else { + --count; + } + } while (!Wait(Thread::STOPPED | Thread::BLOCKED, threadWorkerInterval)); + + _buffer.Close(); + } + + bool Enable() + { + _enabled = _enabled + || ( _buffer.IsValid() + && _buffer.Open() // It already should + && _buffer.Storage().Name() == _fileName + && Core::File(_fileName).Exists() + && _buffer.Storage().Exists() + ) + ; + + ASSERT(_enabled); + + return _enabled; + } + + bool IsEnabled() const + { + return _enabled; + } + + bool HasError() const + { + return !Status(); + } + + uint32_t Worker() override + { + uint32_t waitTimeForNextRun = Core::infinite; + + // Write(), Free() and Reserve() may all experience race conditions due to thread scheduling + uint32_t status = _buffer.Lock(false /* data present, false == signalling path, true == PID path */, lockTimeout /* waiting time to give up lock */); + if (status == Core::ERROR_NONE) { + uint32_t count = std::rand() % sampleSizeInterval; + + if ( _numReservedBlocks > 0 + && _reserved == 0 + ) { + _reserved = _buffer.Size() / _numReservedBlocks; + +// TRACE_L1(_T("reserved : %ld"), reserved); + ASSERT(_buffer.Size() % _numReservedBlocks == 0); + + // Both Free() and Reserve() may experience a race condition with other writers + if ( _reserved > _buffer.Free() + || _buffer.Reserve(_reserved) != Core::ERROR_NONE) { + // Another has already made a reservation or the block is unavailable + _reserved = 0; + } + } + + if ( _numReservedBlocks > 0 + && _reserved <= count + ) { + count = _reserved; + } + + uint32_t written = 0; + + if ( _numReservedBlocks > 0 + && _reserved == 0 + ) { + // Reservation mode but failed to 'allocate' a block + } else { + written = Write(count); + _reserved -= (_numReservedBlocks > 0 ? written : 0); + } + + if ( written > 0 + && written == count + ) { +// TRACE_L1(_T("Data written")); + } else if (count > 0) { +// TRACE_L1(_T("Less data written than requested")); // Possibly too few reads + } + + status = _buffer.Unlock(); + + if (status != Core::ERROR_NONE) { + TRACE_L1(_T("Error: writer unlock failed")); + Block(); + } else { + waitTimeForNextRun = std::rand() % threadWorkerInterval; + } + } else { + if (status == Core::ERROR_TIMEDOUT) { + TRACE_L1(_T("Warning: writer lock timed out")); + } else { + TRACE_L1(_T("Error: writer lock failed")); + Block(); + } + } + + return waitTimeForNextRun; // Schedule milliseconds from now to be called (again) eg interval time + } + +private : + + uint32_t Write(uint32_t count) + { + uint32_t written = 0; + + if (count > 0) { + std::vector data(count, '\0'); + + // _index 0..N-1 for N + if ((_index + count) > N) { + // Two passes when passing the boundary + // Note: full data writes with insufficient (total free) space typically are refused but here might be bypassed + + data.assign(&_input[_index], &_input[_index + N -_index]); + written = _buffer.Write(&(data.data()[0]), N - _index); + + if ( written == N - _index + && count > written + ) { + data.insert(data.begin() + written, &_input[0], &_input[count - written]); + written += _buffer.Write(&(data.data()[written]), count - written); + } + } else { + // One pass + data.assign(&_input[_index], &_input[_index + count]); + written = _buffer.Write(&(data.data()[0]), count); + } + + if ( written > 0 + && !Validate(data.data(), written) + ) { + TRACE_L1("Error: detected written data corruption."); + } + + _index = (_index + written) % N; + } + + return written; + } + + // Only 'relevant' with reserve being actively used + bool Validate(const uint8_t data[], uint32_t count) + { + bool result = true; + + if ( _numReservedBlocks > 0 + && count > 0 + ) { + result = Thread::Validate(data, count); + } + + return result; + } + + bool _enabled; + + const std::string _fileName; + + uint64_t _index; + + const size_t _numReservedBlocks; + + uint32_t _reserved; + + mutable std::array _input; + + Core::CyclicBuffer _buffer; +}; + +template +class BufferCreator +{ +public : + + BufferCreator() = delete; + BufferCreator(const BufferCreator&) = delete; + BufferCreator& operator=(const BufferCreator&) = delete; + + BufferCreator(const std::string& fileName) + : BufferCreator(fileName, 0) + {} + + BufferCreator(const std::string& fileName, size_t numReservedBlocks) + : _writer(fileName, memoryMappedFileRequestedSize, numReservedBlocks) + { + static_assert(memoryMappedFileRequestedSize, "Specify memoryMappedFileRequestedSize > 0"); + } + + bool Enable() + { + return _writer.Enable(); + } + + bool Start() + { + constexpr bool result = true; + + _writer.Run(); + + return result; + } + + bool Stop() + { + _writer.Stop(); + + return !_writer.HasError(); + } + + bool IsEnabled() const + { + return _writer.IsEnabled(); + } + +private : + + Writer _writer; +}; + +template +class BufferUsers +{ +public : + + BufferUsers() = delete; + BufferUsers(const BufferUsers&) = delete; + BufferUsers& operator=(const BufferUsers&) = delete; + + BufferUsers(const std::string& fileName) + : BufferUsers(fileName, W, R, 0) + { + } + + BufferUsers(const std::string& fileName, size_t maxWriters, size_t maxReaders) + : BufferUsers(fileName, maxWriters, maxReaders, 0) + {} + + BufferUsers(const std::string& fileName, size_t maxWriters, size_t maxReaders, size_t numReservedBlocks) + : _writers(maxWriters > W ? W : maxWriters) + , _readers(maxReaders > R ? R : maxReaders) + { + for_each(_writers.begin(), _writers.end(), [&fileName, &numReservedBlocks] (std::unique_ptr>& writer){ writer = std::move(std::unique_ptr>(new Writer(fileName, 0, numReservedBlocks))); }); + for_each(_readers.begin(), _readers.end(), [&fileName, &numReservedBlocks] (std::unique_ptr>& reader){ reader = std::move(std::unique_ptr>(new Reader(fileName, numReservedBlocks))); }); + } + + ~BufferUsers() + { + /* bool */ Stop(); + } + + bool Enable() + { + return std::all_of(_writers.begin(), _writers.end(), [] (std::unique_ptr>& writer){ return writer->Enable(); }) + && std::all_of(_readers.begin(), _readers.end(), [] (std::unique_ptr>& reader){ return reader->Enable(); }) + ; + } + + bool Start() const + { + constexpr bool result = true; + + for_each(_writers.begin(), _writers.end(), [] (const std::unique_ptr>& writer){ writer->Run(); }); + for_each(_readers.begin(), _readers.end(), [] (const std::unique_ptr>& reader){ reader->Run(); }); + + return result; + } + + bool Stop() const + { + bool result = true; + + for_each(_writers.begin(), _writers.end(), [&result] (const std::unique_ptr>& writer){ writer->Stop(); result = result && !writer->HasError(); }); + for_each(_readers.begin(), _readers.end(), [&result] (const std::unique_ptr>& reader){ reader->Stop(); result = result && !reader->HasError(); }); + + + return result; + } + +private : + + std::vector>> _writers; + std::vector>> _readers; +}; + +template +class Process +{ +public : + + Process() = delete; + Process(const Process&) = delete; + Process& operator=(const Process&) = delete; + + Process(const std::string& fileName) + : _fileName{ fileName } + , _sync{ nullptr } + , _childUsersSet{ maxReadersPerProcess, maxReadersPerProcess } + , _parentUsersSet{ maxWritersPerProcess, maxReadersPerProcess } + , _setupTime{ Core::infinite } + , _runTime{ Core::infinite } + , _numReservedBlocks{ 0 } + { + std::for_each(_children.begin(), _children.end(), [] (pid_t& child){ child = 0; } ); + + // Add some randomness + std::srand(std::time(nullptr)); + + bool result = PrepareIPSync(); + + ASSERT(result); DEBUG_VARIABLE(result); + } + + ~Process() + { + bool result = CleanupIPSync(); + + ASSERT(result); DEBUG_VARIABLE(result); + } + + bool Execute() + { + return ForkAndExecute(); + } + + bool SetChildUsers(uint8_t numWriters, uint8_t numReaders) + { + if (numWriters <= maxWritersPerProcess) { + _childUsersSet[0] = numWriters; + } + + if (numReaders <= maxReadersPerProcess) { + _childUsersSet[1] = numReaders; + } + + return _childUsersSet[0] == numWriters + && _childUsersSet[1] == numReaders + ; + } + + bool SetParentUsers(uint8_t numWriters, uint8_t numReaders) + { + if (numWriters <= maxWritersPerProcess) { + _parentUsersSet[0] = numWriters; + } + + if (numReaders <= maxReadersPerProcess) { + _parentUsersSet[1] = numReaders; + } + + return _parentUsersSet[0] == numWriters + && _parentUsersSet[1] == numReaders + ; + } + + bool SetTotalRuntime(uint32_t runTime /* milliseconds */) + { + if (runTime <= maxTotalRuntime) { + _runTime = runTime; + } + + return _runTime == runTime; + } + + bool SetAllowedSetupTime(uint32_t setupTime /* seconds */) + { + if (setupTime <= maxSetupTime) { + _setupTime = setupTime; + } + + return _setupTime == setupTime; + } + + bool SetNumReservedBlocks(uint8_t numReservedBlocks) + { + if (numReservedBlocks <= maxNumReservedBlocks) { + _numReservedBlocks = numReservedBlocks; + } + + return _numReservedBlocks == numReservedBlocks; + } + +private : + + // Hard limits + + static constexpr uint32_t maxSetupTime = 10; // Seconds + static constexpr uint32_t maxTotalRuntime = Core::infinite; // Milliseconds + + static constexpr uint8_t maxWritersPerProcess = 2; + static constexpr uint8_t maxReadersPerProcess = 2; + + static constexpr uint8_t maxNumReservedBlocks = 2; + + // The order is important, sync variable + enum status : uint8_t { + uninitialized = 1 + , initialized = 2 + , ready = 4 + }; + + bool PrepareIPSync() + { + if (_sync == nullptr) { + _sync = reinterpret_cast(mmap(nullptr, sizeof(struct sync_wrapper), PROT_READ | PROT_WRITE, MAP_SHARED/*_VALIDATE*/ | MAP_ANONYMOUS, -1, 0)); + } + + bool result = _sync != nullptr; + + if (_sync != nullptr) { + _sync->level = status::uninitialized; + } + + return result; + } + + bool CleanupIPSync() + { + bool result = _sync != nullptr + && munmap(_sync, sizeof(struct sync_wrapper)) == 0 + ; + + return result; + } + + bool LockIPSync(const std::array& flags, const struct timespec& timeout) const + { + long retval = 0; + int state = 0; + + do { + if (_sync->level == flags[0]) { + retval = syscall(SYS_futex, reinterpret_cast(_sync), FUTEX_WAIT, static_cast(flags[0]), &timeout, nullptr, 0); + state = errno; + } + } while (( retval == -1 + && ( (state & ETIMEDOUT) == ETIMEDOUT + || (state & EAGAIN) == EAGAIN + ) + ) + && _sync->level != flags[1] + ); + + // ETIMEDOUT : The parent / child is not yet ready + // EAGAIN : The handshake between parent and (another) child has been completed as the value has already been altered + + return retval >= 0 || _sync->level != flags[0]; // 0 == woken up or parent / child already ready + } + + bool UnlockIPSync(const std::array& flags, long& retval) + { + if ( _sync->level == flags[0] + && _sync->level != flags[1] + ) { + _sync->level = flags[1]; + /*long*/ retval = syscall(SYS_futex, reinterpret_cast(_sync), FUTEX_WAKE, static_cast(std::numeric_limits::max()) /* INT_MAX, number of waiters to wake up */, nullptr, 0, 0); + } + + return retval >= 0; // The parent / child might already be ready + } + + bool ForkAndExecute() + { + bool result = true; + + for (auto it = _children.begin(), end = _children.end(); it != end; it++) { + pid_t pid = fork(); + + // Format specifier %ld, pid_t, gettid() and getpid() + static_assert(sizeof(pid_t) <= sizeof(long), "Specify a more suitable formmat specifier for pid_t."); + + switch (pid) { + case -1 : // Error + { + result = pid < 0; + + TRACE_L1(_T("Error: failed to create the remote process.")); + break; + } + case 0 : // Child + { + const struct timespec timeout = {.tv_sec = _setupTime, .tv_nsec = 0}; + + std::array flags = {status::uninitialized, status::initialized}; + + result = LockIPSync(flags, timeout); + + ASSERT(result); + + // ETIMEDOUT : The parent is not yet ready + // EAGAIN : The handshake between parent and (another) child has been completed as the value has already been altered + + TRACE_L1(_T("Child %ld knows its parent is ready [true/false]: %s."), getpid(), _sync->level != status::uninitialized ? _T("true") : _T("false")); + + //BufferUsers users(_fileName, _childUsersSet[0], _childUsersSet[1]); // # writer(s), # reader(s) + BufferUsers users(_fileName, _childUsersSet[0], _childUsersSet[1], _numReservedBlocks); // # writer(s), # reader(s) + + flags = {status::initialized, status::ready}; + + long retval = 0; + + result = UnlockIPSync(flags, retval); + + ASSERT(result); + + TRACE_L1(_T("Child %ld has woken up %ld parent(s)."), getpid(), retval); + + result = CleanupIPSync() + && Core::File(_fileName).Exists() + && users.Enable() + && users.Start() + ; + + if (result) { + SleepMs(_runTime); + + result = users.Stop(); + } else { + TRACE_L1(_T("Error: Unable to access the CyclicBuffer.")); + } + + return result; + } + default : // Parent + { + // Keep track of conceived children + *it = pid; + + result = pid > 0 + && result + ; + } + } + } + + return result + && ExecuteParent() + ; + } + + bool ExecuteParent() + { + // The underlying memory mapped file will be created and opened via DataElementFile construction + std::unique_ptr> creator { std::move(std::unique_ptr>(new BufferCreator(_fileName, _numReservedBlocks))) }; + + bool result = creator.get() != nullptr + // Immunize, signal to use for unresponsive child processes, default action is terminate + && signal(SIGUSR1, SIG_IGN) != SIG_ERR + && creator->Enable() + && creator->IsEnabled() +#ifdef CREATOR_WRITE + && creator->Start() +#endif + ; + + if (result) { + TRACE_L1(_T("'creator' and shared cyclic buffer ready.")); + +#ifdef CREATOR_EXTRA_USERS + BufferUsers users(_fileName, _parentUsersSet[0], _parentUsersSet[1]); // # writer(s), # reader(s) + + result = users.Enable() + && users.Start() + ; + + if (result) { +#endif + _sync->level = status::uninitialized; + + std::array flags = {status::uninitialized, status::initialized}; + + long retval = 0; + + result = UnlockIPSync(flags, retval); + + ASSERT(result); + + TRACE_L1(_T("Parent %ld has woken up %ld child(ren)."), gettid(), retval); + + const struct timespec timeout = {.tv_sec = _setupTime, .tv_nsec = 0}; + + flags = {status::initialized, status::ready}; + + bool result = LockIPSync(flags, timeout); + + ASSERT(result); + + TRACE_L1(_T("Parent %ld has been woken up by child [true/false]: %s."), gettid(), retval == 0 ? _T("true") : _T("false")); + + result = CleanupIPSync() + && WaitForCompletion(/*timeout for waitpid*/); + +#ifdef CREATOR_EXTRA_USERS + result = users.Stop() + && result + ; + } else { + TRACE_L1(_T("Error: Unable to start 'extra users'.")); + } +#endif + +#ifdef CREATOR_WRITE + result = creator->Stop() + && result + ; +#endif + + } else { + TRACE_L1(_T("Error: 'creator' and shared cyclic buffer unavailable.")); + } + + return result; + } + + bool WaitForCompletion(/*timeout*/) + { + bool result = true; + + // Reap + for (auto it = _children.begin(), end = _children.end(); it != end; it++) { + int status = 0; + +// pid_t pid = waitpid(-1 /* wait for child */, &status, 0); // Wait out of order + pid_t pid = waitpid(*it /* wait for child */, &status, 0); // Wait in order + + switch (pid) { + case -1 : // No more children / child has died + if (errno == ECHILD) { + TRACE_L1(_T("Child %ld is not our offspring."), pid); + result = false; + } + break; + case 0 : // Child has not changed state, see WNOHANG + result = false; + break; + default : // Child's pid + if (WIFEXITED(status)) { + TRACE_L1(_T("Child %ld died NORMALLY with status %ld."), pid, WEXITSTATUS(status)); + result = result + && true; + } else { + result = false; + + if (WIFSIGNALED(status)) { + TRACE_L1(_T("Child %ld died ABNORMALLY due to signal %ld."), pid, WTERMSIG(status)); + } else if (WIFSTOPPED(status)) { + TRACE_L1(_T("Child %ld died ABNORMALLY due to stop signal %ld."), pid, WSTOPSIG(status)); + } else if (errno != EAGAIN) {// pid non-blocking + TRACE_L1(_T("Child %ld died ABNORMALLY."), pid); + } else { + TRACE_L1(_T("Error: unprocessed child %ld status."), pid); + } + } + } + } + + return result; + } + + const std::string _fileName; + + struct sync_wrapper { + status level; + }* _sync; + + std::array _children; + + std::array _childUsersSet; + std::array _parentUsersSet; + + uint32_t _runTime; // Milliseconds + uint32_t _setupTime; // Seconds + + uint8_t _numReservedBlocks; +}; + +} // Tests +} // WPEFramework + diff --git a/Tests/unit/core/CMakeLists.txt b/Tests/unit/core/CMakeLists.txt index 2e665133f..edcd9bc00 100644 --- a/Tests/unit/core/CMakeLists.txt +++ b/Tests/unit/core/CMakeLists.txt @@ -21,7 +21,8 @@ if(LINUX) # IPTestAdministrator only supported on LINUX platform add_executable(${TEST_RUNNER_NAME} ../IPTestAdministrator.cpp - #test_cyclicbuffer.cpp + test_cyclicbuffer.cpp +# test_cyclicbuffer_dataexchange.cpp test_databuffer.cpp test_dataelement.cpp test_dataelementfile.cpp @@ -42,6 +43,7 @@ add_executable(${TEST_RUNNER_NAME} test_lockablecontainer.cpp test_measurementtype.cpp test_memberavailability.cpp + test_message_dispatcher.cpp test_messageException.cpp test_networkinfo.cpp test_nodeid.cpp @@ -56,7 +58,7 @@ add_executable(${TEST_RUNNER_NAME} test_rectangle.cpp test_rpc.cpp test_semaphore.cpp - #test_sharedbuffer.cpp + test_sharedbuffer.cpp test_singleton.cpp test_socketstreamjson.cpp test_socketstreamtext.cpp @@ -72,7 +74,7 @@ add_executable(${TEST_RUNNER_NAME} test_time.cpp test_timer.cpp test_tristate.cpp - #test_valuerecorder.cpp + test_valuerecorder.cpp test_weblinkjson.cpp test_weblinktext.cpp test_websocketjson.cpp @@ -105,7 +107,6 @@ add_executable(${TEST_RUNNER_NAME} test_lockablecontainer.cpp test_measurementtype.cpp test_memberavailability.cpp -# test_message_dispatcher.cpp test_messageException.cpp test_networkinfo.cpp test_nodeid.cpp diff --git a/Tests/unit/core/test_cyclicbuffer.cpp b/Tests/unit/core/test_cyclicbuffer.cpp index 667982cde..8c7277b09 100644 --- a/Tests/unit/core/test_cyclicbuffer.cpp +++ b/Tests/unit/core/test_cyclicbuffer.cpp @@ -907,46 +907,56 @@ namespace Core { } void SetSharePermissionsFromForkedProcessAndVerify(bool shareable, bool usingDataElementFile = false, uint32_t offset = 0) { - std::string bufferName {"cyclicbuffer01"}; - const uint32_t CyclicBufferSize = 10; + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; - uint32_t shareableFlag = (shareable == true) ? ::Thunder::Core::File::Mode::SHAREABLE : 0; - const uint32_t mode = - (::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - shareableFlag); + const std::string bufferName {"cyclicbuffer01"}; - struct Data data; - data.mode = mode; - data.bufferName = bufferName.c_str(); - data.shareable = shareable; - data.usingDataElementFile = usingDataElementFile; - data.offset = offset; + constexpr uint32_t CyclicBufferSize = 10; + constexpr uint32_t cyclicBufferWithControlDataSize = CyclicBufferSize + sizeof(::Thunder::Core::CyclicBuffer::control); + + const uint32_t mode = ( ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | (shareable ? ::Thunder::Core::File::Mode::SHAREABLE : 0) + ); + + const struct Data data{shareable, usingDataElementFile, mode, offset, bufferName.c_str()}; + + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { + // a small delay so the parent can be set up + SleepMs(maxInitTime); - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { - struct Data* data = (reinterpret_cast(testAdmin.Data())); uint32_t result; - uint32_t cyclicBufferSize = CyclicBufferSize; - uint8_t loadBuffer[cyclicBufferSize + 1]; + uint8_t loadBuffer[CyclicBufferSize + 1]; - CyclicBufferTest* buffer = nullptr; + CyclicBufferTest* buffer = nullptr; ::Thunder::Core::DataElementFile* dataElementFile = nullptr; - if (data->usingDataElementFile == true) { - uint8_t cyclicBufferWithControlDataSize = cyclicBufferSize + sizeof(::Thunder::Core::CyclicBuffer::control); - dataElementFile = new ::Thunder::Core::DataElementFile(bufferName, data->mode | ::Thunder::Core::File::CREATE, cyclicBufferWithControlDataSize + data->offset); - buffer = new CyclicBufferTest(*dataElementFile, true, data->offset, cyclicBufferWithControlDataSize, false); + + if (data.usingDataElementFile == true) { + dataElementFile = new ::Thunder::Core::DataElementFile(bufferName, data.mode | ::Thunder::Core::File::CREATE, cyclicBufferWithControlDataSize + data.offset); + + ASSERT_TRUE(dataElementFile != nullptr); + + buffer = new CyclicBufferTest(*dataElementFile, true, data.offset, cyclicBufferWithControlDataSize, false); } else { - buffer = new CyclicBufferTest(bufferName, data->mode, cyclicBufferSize, false); + buffer = new CyclicBufferTest(bufferName, data.mode, CyclicBufferSize, false); } - EXPECT_EQ(buffer->Size(), cyclicBufferSize); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); + ASSERT_TRUE(buffer != nullptr); - if (data->shareable == true) { - testAdmin.Sync("client read empty data"); - string dataStr = "abcd"; + EXPECT_EQ(buffer->Size(), CyclicBufferSize); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + if (data.shareable == true) { + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + string dataStr = "abcd"; result = buffer->Write(reinterpret_cast(dataStr.c_str()), dataStr.size()); EXPECT_EQ(result, dataStr.size()); @@ -954,94 +964,118 @@ namespace Core { result = buffer->Write(reinterpret_cast(dataStr.c_str()), dataStr.size()); EXPECT_EQ(result, dataStr.size()); - testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("client read"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); result = buffer->Peek(loadBuffer, buffer->Used()); loadBuffer[result] = '\0'; EXPECT_EQ(result, 9u); - EXPECT_STREQ((char*)loadBuffer, "bcdefghkl"); + + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "bcdefghkl"); EXPECT_EQ(buffer->Used(), 9u); - testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("server start read"); result = buffer->Read(loadBuffer, buffer->Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 9u); - EXPECT_STREQ((char*)loadBuffer, "bcdefghkl"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "bcdefghkl"); EXPECT_EQ(buffer->Used(), 0u); - testAdmin.Sync("server read"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + delete buffer; + if (dataElementFile) { delete dataElementFile; } } }; - static std::function lambdaVar = lambdaFunc; - - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; - - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide, reinterpret_cast(&data), 10); - { - testAdmin.Sync("setup server"); + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint32_t result; - uint32_t cyclicBufferSize = CyclicBufferSize; - uint8_t loadBuffer[cyclicBufferSize]; + uint8_t loadBuffer[CyclicBufferSize]; - CyclicBufferTest* buffer; + CyclicBufferTest* buffer = nullptr; ::Thunder::Core::DataElementFile* dataElementFile = nullptr; + if (usingDataElementFile == true) { dataElementFile = new ::Thunder::Core::DataElementFile(bufferName, mode, 0); + + ASSERT_TRUE(dataElementFile != nullptr); + buffer = new CyclicBufferTest(*dataElementFile, false, offset, 0, false); } else { buffer = new CyclicBufferTest(bufferName, mode, 0, false); } - EXPECT_EQ(buffer->Size(), static_cast(((shareable == true) ?CyclicBufferSize : 0))); - testAdmin.Sync("setup client"); + ASSERT_TRUE(buffer != nullptr); + + EXPECT_EQ(buffer->Size(), static_cast((shareable ? CyclicBufferSize : 0))); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); if (shareable == true) { - memset(loadBuffer, 0, cyclicBufferSize); + memset(loadBuffer, 0, CyclicBufferSize); + result = buffer->Read(loadBuffer, 1, false); - EXPECT_EQ(result, static_cast(0)); + EXPECT_EQ(result, 0); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("client read empty data"); - testAdmin.Sync("server wrote"); result = buffer->Read(loadBuffer, 1, false); - EXPECT_EQ(result, static_cast(1)); + EXPECT_EQ(result, 1); + loadBuffer[result] = '\0'; - EXPECT_STREQ((char*)loadBuffer, "a"); - testAdmin.Sync("client read"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "a"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); string data = "kl"; result = buffer->Reserve(data.size()); EXPECT_EQ(result, 2u); + result = buffer->Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, 2u); - testAdmin.Sync("client wrote"); - testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer->Used(), 9u); - testAdmin.Sync("server start read"); - testAdmin.Sync("server read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer->Used(), 0u); } + buffer->Close(); + delete buffer; + if (dataElementFile) { delete dataElementFile; } - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } TEST(Core_CyclicBuffer, CheckSharePermissionsFromForkedProcessWithoutOverwrite) @@ -1061,35 +1095,52 @@ namespace Core { } TEST(Core_CyclicBuffer, WithoutOverwriteUsingForksReversed) { - std::string bufferName {"cyclicbuffer02"}; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { - testAdmin.Sync("setup client"); + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; - uint32_t cyclicBufferSize = 0; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + const std::string bufferName {"cyclicbuffer02"}; - ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + constexpr uint32_t cyclicBufferSize = 0; + + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + + ASSERT_TRUE(buffer.IsValid()); - testAdmin.Sync("setup server"); - testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; + uint32_t result = buffer.Read(loadBuffer, 4); loadBuffer[result] = '\0'; - EXPECT_STREQ((char*)loadBuffer, "abcd"); - testAdmin.Sync("server read"); - string data = "klmnopq"; + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "abcd"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + const string data = "klmnopq"; + result = buffer.Reserve(data.size()); EXPECT_EQ(result, ::Thunder::Core::ERROR_INVALID_INPUT_LENGTH); + result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, 0u); - testAdmin.Sync("server wrote"); - testAdmin.Sync("client peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); EXPECT_FALSE(buffer.Overwritten()); EXPECT_FALSE(buffer.IsLocked()); @@ -1097,244 +1148,338 @@ namespace Core { EXPECT_EQ(buffer.LockPid(), 0u); EXPECT_EQ(buffer.Free(), 5u); - testAdmin.Sync("client start read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + EXPECT_STREQ(buffer.Name().c_str(), bufferName.c_str()); EXPECT_STREQ(buffer.Storage().Name().c_str(), bufferName.c_str()); - testAdmin.Sync("client read"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer.Used(), 0u); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; - static std::function lambdaVar = lambdaFunc; + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; + constexpr uint32_t cyclicBufferSize = 10; - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide); - { - uint32_t cyclicBufferSize = 10; + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); - ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); + ASSERT_TRUE(buffer.IsValid()); - testAdmin.Sync("setup client"); - testAdmin.Sync("setup server"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; - string data = "abcdefghi"; + const string data = "abcdefghi"; + uint32_t result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, data.size()); - testAdmin.Sync("client wrote"); - testAdmin.Sync("server read"); - testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); result = buffer.Peek(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "efghi"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "efghi"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("client peek"); - testAdmin.Sync("client start read"); result = buffer.Read(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "efghi"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "efghi"); EXPECT_EQ(buffer.Used(), 0u); - testAdmin.Sync("client read"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } TEST(Core_CyclicBuffer, WithOverWriteUsingFork) { - std::string bufferName {"cyclicbuffer03"}; + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { - uint32_t cyclicBufferSize = 10; + const std::string bufferName {"cyclicbuffer03"}; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { + // a small delay so the parent can be set up + SleepMs(maxInitTime); + + constexpr uint32_t cyclicBufferSize = 10; - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); + ASSERT_TRUE(buffer.IsValid()); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + const string data = "abcdef"; + const uint16_t size = data.size() + 2; - string data = "abcdef"; - uint16_t size = data.size() + 2; uint32_t result = buffer.Write(reinterpret_cast(&size), 2u); EXPECT_EQ(result, 2u); + result = buffer.Write(reinterpret_cast(data.c_str()), data.size()); EXPECT_EQ(result, data.size()); - testAdmin.Sync("server wrote"); - testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; + result = buffer.Peek(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; - testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); result = buffer.Read(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + buffer.Alert(); buffer.Flush(); - testAdmin.Sync("server read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; - static std::function lambdaVar = lambdaFunc; + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; + constexpr uint32_t cyclicBufferSize = 0; - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide); - { - testAdmin.Sync("setup server"); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; - uint32_t cyclicBufferSize = 0; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + + ASSERT_TRUE(buffer.IsValid()); - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("setup client"); - testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); string data = "j"; - uint16_t size = 9; + const uint16_t size = 9; + uint32_t result = buffer.Reserve(9); EXPECT_EQ(result, 9u); + result = buffer.Write(reinterpret_cast(&size), 2u); EXPECT_EQ(result, 2u); + result = buffer.Write(reinterpret_cast(data.c_str()), 1u); EXPECT_EQ(result, data.size()); data = "klmnop"; + result = buffer.Write(reinterpret_cast(data.c_str()), 6u); EXPECT_EQ(result, data.size()); - testAdmin.Sync("client wrote"); - testAdmin.Sync("server peek"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("server read"); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } TEST(Core_CyclicBuffer, WithOverwriteUsingForksReversed) { - std::string bufferName {"cyclicbuffer03"}; + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { - uint32_t cyclicBufferSize = 0; - testAdmin.Sync("setup server"); + const std::string bufferName {"cyclicbuffer03"}; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { + constexpr uint32_t cyclicBufferSize = 0; + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); EXPECT_TRUE(buffer.IsOverwrite()); - testAdmin.Sync("setup client"); - testAdmin.Sync("server wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint16_t size; - buffer.Read(reinterpret_cast(&size), 2); + EXPECT_EQ(buffer.Read(reinterpret_cast(&size), 2), 2); + uint8_t loadBuffer[cyclicBufferSize + 1]; uint32_t result = buffer.Read(loadBuffer, size - 2); loadBuffer[result] = '\0'; + EXPECT_EQ(result, size - 2); - testAdmin.Sync("client read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); string data = "j"; size = 9; + result = buffer.Reserve(size); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(&size), 2); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(data.c_str()), 1); + EXPECT_EQ(result, size); + data = "lmnopq"; result = buffer.Write(reinterpret_cast(data.c_str()), 6); + EXPECT_EQ(result, 6); + EXPECT_EQ(buffer.Used(), 9u); EXPECT_EQ(buffer.Overwritten(), false); size = 7; result = buffer.Reserve(size); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(&size), 2); + EXPECT_EQ(result, size); + result = buffer.Write(reinterpret_cast(data.c_str()), size - 2); + EXPECT_EQ(result, size); + EXPECT_EQ(buffer.Free(), 3u); EXPECT_EQ(buffer.Used(), 7u); - testAdmin.Sync("client wrote"); - testAdmin.Sync("server peek"); - testAdmin.Sync("server read"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Flush(); EXPECT_EQ(buffer.Used(), 0u); - testAdmin.Sync("client flush"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; - static std::function lambdaVar = lambdaFunc; + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; - // This side (tested) acts as server - IPTestAdministrator testAdmin(otherSide); - { - uint32_t cyclicBufferSize = 10; - const uint32_t mode = - ::Thunder::Core::File::Mode::USER_READ | ::Thunder::Core::File::Mode::USER_WRITE | ::Thunder::Core::File::Mode::USER_EXECUTE | - ::Thunder::Core::File::Mode::GROUP_READ | ::Thunder::Core::File::Mode::GROUP_WRITE | - ::Thunder::Core::File::Mode::SHAREABLE; + constexpr uint32_t cyclicBufferSize = 10; + + constexpr uint32_t mode = ::Thunder::Core::File::Mode::USER_READ + | ::Thunder::Core::File::Mode::USER_WRITE + | ::Thunder::Core::File::Mode::USER_EXECUTE + | ::Thunder::Core::File::Mode::GROUP_READ + | ::Thunder::Core::File::Mode::GROUP_WRITE + | ::Thunder::Core::File::Mode::SHAREABLE + ; + + CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); - CyclicBufferTest buffer(bufferName.c_str(), mode, cyclicBufferSize, true); EXPECT_TRUE(buffer.IsOverwrite()); EXPECT_TRUE(buffer.IsValid()); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); uint8_t loadBuffer[cyclicBufferSize + 1]; uint16_t size = 9; string data = "abcdefi"; uint32_t result = buffer.Write(reinterpret_cast(&size), 2); + result = buffer.Write(reinterpret_cast(data.c_str()), size - 2); EXPECT_EQ(result, data.size()); - testAdmin.Sync("server wrote"); - testAdmin.Sync("client read"); - testAdmin.Sync("client wrote"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); buffer.Peek(reinterpret_cast(&size), 2); EXPECT_EQ(size, buffer.Used()); + result = buffer.Peek(loadBuffer, size); loadBuffer[result] = '\0'; EXPECT_EQ(result, 7u); - EXPECT_STREQ((char*)loadBuffer +2, "lmnop"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0] + 2), "lmnop"); + + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("server peek"); EXPECT_EQ(buffer.Free(), 3u); + buffer.Read(reinterpret_cast(&size), 2); EXPECT_EQ(buffer.Used(), static_cast(size - 2)); + result = buffer.Read(loadBuffer, size - 2); loadBuffer[result] = '\0'; EXPECT_EQ(result, 5u); - EXPECT_STREQ((char*)loadBuffer, "lmnop"); + EXPECT_STREQ(reinterpret_cast(&loadBuffer[0]), "lmnop"); + EXPECT_EQ(buffer.Free(), 10u); EXPECT_EQ(buffer.Used(), 0u); @@ -1342,12 +1487,19 @@ namespace Core { EXPECT_FALSE(buffer.IsLocked()); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server read"); - testAdmin.Sync("client flush"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buffer.Free(), cyclicBufferSize); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } TEST(Core_CyclicBuffer, LockUnlock_WithoutDataPresent) @@ -1387,9 +1539,12 @@ namespace Core { TEST(Core_CyclicBuffer, DISABLED_LockUnLock_FromParentAndForks) { + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; + std::string bufferName {"cyclicbuffer04"}; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { uint32_t cyclicBufferSize = 20; const uint32_t mode = @@ -1399,23 +1554,23 @@ namespace Core { ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); +// testAdmin.Sync("setup server"); +// testAdmin.Sync("setup client"); EXPECT_EQ(buffer.LockPid(), 0u); buffer.Lock(false); EXPECT_EQ(buffer.LockPid(), static_cast(getpid())); - testAdmin.Sync("server locked"); +// testAdmin.Sync("server locked"); - testAdmin.Sync("client wrote"); +// testAdmin.Sync("client wrote"); buffer.Unlock(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server unlocked"); +// testAdmin.Sync("server unlocked"); - testAdmin.Sync("client locked"); +// testAdmin.Sync("client locked"); EXPECT_NE(buffer.LockPid(), 0u); EXPECT_NE(buffer.LockPid(), static_cast(getpid())); - testAdmin.Sync("server verified"); +// testAdmin.Sync("server verified"); // TODO: What is the purpose of lock ?? since we are able to write from server process // even it is locked from client process @@ -1426,22 +1581,19 @@ namespace Core { uint8_t loadBuffer[cyclicBufferSize + 1]; result = buffer.Peek(loadBuffer, buffer.Used()); loadBuffer[result] = '\0'; - testAdmin.Sync("server wrote and peeked"); +// testAdmin.Sync("server wrote and peeked"); - testAdmin.Sync("client unlocked"); +// testAdmin.Sync("client unlocked"); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client read"); +// testAdmin.Sync("client read"); }; - static std::function lambdaVar = lambdaFunc; + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; - - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide, 10); - { - testAdmin.Sync("setup server"); +// testAdmin.Sync("setup server"); uint32_t cyclicBufferSize = 0; const uint32_t mode = @@ -1452,8 +1604,8 @@ namespace Core { ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, true); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("setup client"); - testAdmin.Sync("server locked"); +// testAdmin.Sync("setup client"); +// testAdmin.Sync("server locked"); EXPECT_NE(buffer.LockPid(), 0u); EXPECT_NE(buffer.LockPid(), static_cast(getpid())); @@ -1465,37 +1617,45 @@ namespace Core { EXPECT_NE(buffer.LockPid(), 0u); EXPECT_NE(buffer.LockPid(), static_cast(getpid())); - testAdmin.Sync("client wrote"); - testAdmin.Sync("server unlocked"); +// testAdmin.Sync("client wrote"); +// testAdmin.Sync("server unlocked"); EXPECT_EQ(buffer.LockPid(), 0u); buffer.Lock(false); EXPECT_EQ(buffer.LockPid(), static_cast(getpid())); - testAdmin.Sync("client locked"); - testAdmin.Sync("server verified"); +// testAdmin.Sync("client locked"); +// testAdmin.Sync("server verified"); - testAdmin.Sync("server wrote and peeked"); +// testAdmin.Sync("server wrote and peeked"); buffer.Unlock(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client unlocked"); +// testAdmin.Sync("client unlocked"); uint8_t loadBuffer[cyclicBufferSize + 1]; result = buffer.Read(loadBuffer, 4); loadBuffer[result] = '\0'; EXPECT_STREQ((char*)loadBuffer, "jklm"); - testAdmin.Sync("client read"); +// testAdmin.Sync("client read"); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } //TODO: revisit these test cases after fixing the issues with cyclicbuffer lock/unlock sequence TEST(Core_CyclicBuffer, DISABLED_LockUnlock_FromParentAndForks_WithDataPresent) { + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; + std::string bufferName {"cyclicbuffer05"}; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { uint32_t cyclicBufferSize = 20; const uint32_t mode = @@ -1505,13 +1665,13 @@ namespace Core { ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); +// testAdmin.Sync("setup server"); +// testAdmin.Sync("setup client"); EXPECT_EQ(buffer.LockPid(), 0u); buffer.Lock(true, 100); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server timedLock"); +// testAdmin.Sync("server timedLock"); EXPECT_EQ(buffer.IsLocked(), false); { @@ -1539,7 +1699,7 @@ namespace Core { EXPECT_EQ(buffer.IsLocked(), true); buffer.Unlock(); } - testAdmin.Sync("server locked & unlocked"); +// testAdmin.Sync("server locked & unlocked"); buffer.Flush(); EXPECT_EQ(buffer.Used(), 0u); @@ -1558,7 +1718,7 @@ namespace Core { } sleep(1); - testAdmin.Sync("server locked"); +// testAdmin.Sync("server locked"); EXPECT_EQ(buffer.LockPid(), 0u); EXPECT_EQ(buffer.IsLocked(), false); @@ -1574,24 +1734,18 @@ namespace Core { event.ResetEvent(); threadLock.Stop(); - testAdmin.Sync("server locked & wrote"); +// testAdmin.Sync("server locked & wrote"); buffer.Unlock(); - testAdmin.Sync("server unlocked"); - testAdmin.Sync("client wrote & locked"); +// testAdmin.Sync("server unlocked"); +// testAdmin.Sync("client wrote & locked"); - testAdmin.Sync("client exit"); +// testAdmin.Sync("client exit"); EXPECT_EQ(buffer.LockPid(), 0u); }; - static std::function lambdaVar = lambdaFunc; - - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; - - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide, 15); - { - testAdmin.Sync("setup server"); + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { +// testAdmin.Sync("setup server"); uint32_t cyclicBufferSize = 0; const uint32_t mode = @@ -1600,17 +1754,17 @@ namespace Core { ::Thunder::Core::File::Mode::SHAREABLE; ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, true); - testAdmin.Sync("setup client"); +// testAdmin.Sync("setup client"); - testAdmin.Sync("server timedLock"); +// testAdmin.Sync("server timedLock"); - testAdmin.Sync("server locked & unlocked"); +// testAdmin.Sync("server locked & unlocked"); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server locked"); +// testAdmin.Sync("server locked"); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server locked & wrote"); +// testAdmin.Sync("server locked & wrote"); EXPECT_NE(buffer.LockPid(), 0u); - testAdmin.Sync("server unlocked"); +// testAdmin.Sync("server unlocked"); EXPECT_EQ(buffer.LockPid(), 0u); // Check Lock Timed Out after wait Time @@ -1643,12 +1797,17 @@ namespace Core { threadLock.Stop(); EXPECT_EQ(buffer.LockPid(), static_cast(getpid())); - testAdmin.Sync("client wrote & locked"); +// testAdmin.Sync("client wrote & locked"); buffer.Unlock(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client exit"); +// testAdmin.Sync("client exit"); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } TEST(Core_CyclicBuffer, DISABLED_LockUnlock_UsingAlert) @@ -1685,9 +1844,12 @@ namespace Core { } TEST(Core_CyclicBuffer, DISABLED_LockUnlock_FromParentAndForks_UsingAlert) { + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; + std::string bufferName {"cyclicbuffer05"}; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { uint32_t cyclicBufferSize = 20; const uint32_t mode = @@ -1697,15 +1859,15 @@ namespace Core { ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, false); - testAdmin.Sync("setup server"); - testAdmin.Sync("setup client"); +// testAdmin.Sync("setup server"); +// testAdmin.Sync("setup client"); EXPECT_EQ(buffer.LockPid(), 0u); ::Thunder::Core::Event event(false, false); ThreadLock threadLock(buffer, ::Thunder::Core::infinite, event); threadLock.Run(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server locked"); +// testAdmin.Sync("server locked"); // Lock before requesting cyclic buffer lock if (event.Lock(MaxSignalWaitTime * 2) == ::Thunder::Core::ERROR_NONE) { @@ -1721,21 +1883,18 @@ namespace Core { event.ResetEvent(); threadLock.Stop(); - testAdmin.Sync("server alerted"); - testAdmin.Sync("client locked"); +// testAdmin.Sync("server alerted"); +// testAdmin.Sync("client locked"); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client alerted"); +// testAdmin.Sync("client alerted"); EXPECT_EQ(buffer.LockPid(), 0u); }; - static std::function lambdaVar = lambdaFunc; - - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); - // This side (tested) acts as client - IPTestAdministrator testAdmin(otherSide, 10); - { - testAdmin.Sync("setup server"); +// testAdmin.Sync("setup server"); uint32_t cyclicBufferSize = 0; const uint32_t mode = @@ -1745,19 +1904,19 @@ namespace Core { ::Thunder::Core::CyclicBuffer buffer(bufferName.c_str(), mode, cyclicBufferSize, true); - testAdmin.Sync("setup client"); - testAdmin.Sync("server locked"); +// testAdmin.Sync("setup client"); +// testAdmin.Sync("server locked"); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("server alerted"); +// testAdmin.Sync("server alerted"); EXPECT_EQ(buffer.LockPid(), 0u); ::Thunder::Core::Event event(false, false); ThreadLock threadLock(buffer, ::Thunder::Core::infinite, event); threadLock.Run(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client locked"); +// testAdmin.Sync("client locked"); // Lock before requesting cyclic buffer lock if (event.Lock(MaxSignalWaitTime * 2) == ::Thunder::Core::ERROR_NONE) { @@ -1774,10 +1933,15 @@ namespace Core { threadLock.Stop(); EXPECT_EQ(buffer.LockPid(), 0u); - testAdmin.Sync("client alerted"); +// testAdmin.Sync("client alerted"); buffer.Close(); - } + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + ::Thunder::Core::Singleton::Dispose(); } diff --git a/Tests/unit/core/test_cyclicbuffer_dataexchange.cpp b/Tests/unit/core/test_cyclicbuffer_dataexchange.cpp new file mode 100644 index 000000000..a066539b4 --- /dev/null +++ b/Tests/unit/core/test_cyclicbuffer_dataexchange.cpp @@ -0,0 +1,105 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2020 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include "../../cyclic-buffer/process.h" + + +namespace WPEFramework { +namespace Core { +namespace Tests { + +#define ASYNC_TIMEOUT_BEGIN \ + std::promise promise; \ + std::future future = promise.get_future(); \ + std::thread([&](std::promise completed) \ + { /* Before code that should complete before timeout expires */ + +#define ASYNC_TIMEOUT_END(MILLISECONDS /* timeout in milliseconds */, EXPECTATION /* boolean */) \ + /* After code that should complete timely */ \ + /* completed.set_value(true); */ \ + completed.set_value_at_thread_exit(true); \ + } \ + , std::move(promise)).detach() \ + ; \ + bool result = future.wait_for(std::chrono::milliseconds(MILLISECONDS)) != std::future_status::timeout; /* Task completed before timeout */ \ + EXPECT_TRUE(result == EXPECTATION); /* Task completed before timeout */ \ + if (!result) { \ + TRACE_L1(_T("Error : Stopping unresposive process.")); \ + killpg(getpgrp(), SIGUSR1); /* Possible 'unresponsive' system, 'unlock' all related 'child' processes, default action is terminate */ \ + } + +TEST(Core_CyclicBuffer, DataExchangeTimeout) +{ + constexpr uint8_t maxChildren = 1; + + constexpr uint32_t memoryMappedFileRequestedSize = 30;//446; + constexpr uint32_t internalBufferSize = 40;//446; + + constexpr char fileName[] = "/tmp/SharedCyclicBuffer"; + + constexpr uint32_t totalRuntime = Core::infinite; // Milliseconds + constexpr uint32_t timeout = 10000; // Milliseconds + + WPEFramework::Tests::Process process(fileName); + + ASYNC_TIMEOUT_BEGIN; // Avoid leaking resources, eg, children + + EXPECT_TRUE(process.SetTotalRuntime(totalRuntime) + && process.SetNumReservedBlocks(2) + && process.SetParentUsers(0, 0) /* 0 extra writer(s), 0 reader(s) */ + && process.SetChildUsers(0, 1) /* 1 writer(s), 1 reader(s) */ + && process.Execute() + ); + + ASYNC_TIMEOUT_END(timeout, false /* expect no timeout */); +} + +TEST(Core_CyclicBuffer, DataExchange) +{ + constexpr uint8_t maxChildren = 1; + + constexpr uint32_t memoryMappedFileRequestedSize = 30;//446; + constexpr uint32_t internalBufferSize = 40;//446; + + constexpr char fileName[] = "/tmp/SharedCyclicBuffer"; + + constexpr uint32_t totalRuntime = 10000; // Milliseconds + constexpr uint32_t timeout = totalRuntime + 10000; // Milliseconds + + WPEFramework::Tests::Process process(fileName); + + ASYNC_TIMEOUT_BEGIN; // Avoid leaking resources, eg, children + + EXPECT_TRUE(process.SetTotalRuntime(totalRuntime) + && process.SetNumReservedBlocks(2) + && process.SetParentUsers(0, 0) /* 0 extra writer(s), 0 reader(s) */ + && process.SetChildUsers(0, 1) /* 1 writer(s), 1 reader(s) */ + && process.Execute() + ); + + ASYNC_TIMEOUT_END(timeout, true /* expect no timeout */); +} + +} // Tests +} // Core +} // WPEFramework diff --git a/Tests/unit/core/test_dataelementfile.cpp b/Tests/unit/core/test_dataelementfile.cpp index 472ee898f..5987e7fa0 100644 --- a/Tests/unit/core/test_dataelementfile.cpp +++ b/Tests/unit/core/test_dataelementfile.cpp @@ -55,7 +55,7 @@ namespace Core { { const string fileName = "dataFile.txt"; const string message = ">echo 'DataElement file checking......'"; - string buffer = message + fileName; + const string buffer = message + fileName; #ifdef __POSIX__ errno = 0; @@ -74,8 +74,8 @@ namespace Core { #endif ASSERT_TRUE(file.Create(true)); - EXPECT_EQ(file.IsOpen(), true); - EXPECT_EQ(file.Name(), fileName); + EXPECT_TRUE(file.IsOpen()); + EXPECT_STREQ(file.Name().c_str(), fileName.c_str()); #ifdef __POSIX__ errno = 0; @@ -101,6 +101,7 @@ namespace Core { EXPECT_EQ(obj2.IsValid(), true); const ::Thunder::Core::File& obj1File = obj1.Storage(); + EXPECT_TRUE(obj1File.IsOpen()); EXPECT_STREQ(obj1File.FileName().c_str(), file.FileName().c_str()); obj1.ReloadFileInfo(); diff --git a/Tests/unit/core/test_message_dispatcher.cpp b/Tests/unit/core/test_message_dispatcher.cpp index cf0a3a2ce..0898110ce 100644 --- a/Tests/unit/core/test_message_dispatcher.cpp +++ b/Tests/unit/core/test_message_dispatcher.cpp @@ -189,30 +189,41 @@ namespace Core { TEST_F(Core_MessageDispatcher, WriteAndReadDataAreEqualInDifferentProcesses) { - auto lambdaFunc = [this](IPTestAdministrator& testAdmin) { + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 4, maxWaitTimeMs = 4000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 1; + + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { ::Thunder::Messaging::MessageDataBuffer dispatcher(this->_identifier, this->_instanceId, this->_basePath, DATA_SIZE, 0, false); uint8_t readData[4]; uint16_t readLength = sizeof(readData); - // Arbitrary timeout value, 1 second - ASSERT_EQ(dispatcher.Wait(1000), ::Thunder::Core::ERROR_NONE); + ASSERT_EQ(dispatcher.Wait(maxWaitTimeMs), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(dispatcher.PopData(readLength, readData), ::Thunder::Core::ERROR_NONE); ASSERT_EQ(readLength, 2); ASSERT_EQ(readData[0], 13); ASSERT_EQ(readData[1], 37); - }; - static std::function lambdaVar = lambdaFunc; - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin) { lambdaVar(testAdmin); }; + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + }; // This side (tested) acts as writer - IPTestAdministrator testAdmin(otherSide); - { + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); + uint8_t testData[2] = { 13, 37 }; ASSERT_EQ(_dispatcher->PushData(sizeof(testData), testData), ::Thunder::Core::ERROR_NONE); - } + + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child + + ::Thunder::Core::Singleton::Dispose(); } TEST_F(Core_MessageDispatcher, PushDataShouldNotFitWhenExcedingDataBufferSize) diff --git a/Tests/unit/core/test_sharedbuffer.cpp b/Tests/unit/core/test_sharedbuffer.cpp index 0dd41eecc..be461a0d2 100644 --- a/Tests/unit/core/test_sharedbuffer.cpp +++ b/Tests/unit/core/test_sharedbuffer.cpp @@ -31,28 +31,19 @@ namespace Thunder { namespace Tests { namespace Core { - void CleanUpBuffer(string bufferName) - { - // TODO: shouldn't this be done producer-side? - char systemCmd[1024]; - string command = "rm -rf "; - snprintf(systemCmd, command.size() + bufferName.size() + 1, "%s%s", command.c_str(), bufferName.c_str()); - system(systemCmd); - - string ext = ".admin"; - snprintf(systemCmd, command.size() + bufferName.size() + ext.size() + 1, "%s%s%s", command.c_str(), bufferName.c_str(), ext.c_str()); - system(const_cast(systemCmd)); - } - TEST(Core_SharedBuffer, simpleSet) { - std::string bufferName {"testbuffer01"} ; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { - CleanUpBuffer(bufferName); + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 6, maxWaitTimeMs = 6000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 20; - uint16_t administrationSize = 64; - uint32_t bufferSize = 8 * 1024; - uint32_t result; + constexpr uint16_t administrationSize = 64; + constexpr uint32_t bufferSize = 8 * 1024; + + const std::string bufferName {"testbuffer01"} ; + + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { + // a small delay so the parent can be set up + SleepMs(maxInitTime); ::Thunder::Core::SharedBuffer buff01(bufferName.c_str(), ::Thunder::Core::File::USER_READ | @@ -62,99 +53,97 @@ namespace Core { ::Thunder::Core::File::GROUP_WRITE , bufferSize, administrationSize); - result = buff01.RequestProduce(::Thunder::Core::infinite); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("setup producer"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("setup consumer"); + EXPECT_EQ(buff01.RequestProduce(maxWaitTimeMs), ::Thunder::Core::ERROR_NONE); - uint8_t * buffer = buff01.Buffer(); + uint8_t* buffer = buff01.Buffer(); + + ASSERT_TRUE(buffer != nullptr); EXPECT_EQ(buff01.Size(), bufferSize); buffer[0] = 42; buffer[1] = 43; buffer[2] = 44; - result = buff01.Produced(); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); - - testAdmin.Sync("consumer done"); - }; + EXPECT_EQ(buff01.Produced(), ::Thunder::Core::ERROR_NONE); - static std::function lambdaVar = lambdaFunc; + // Data made available + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; + // Buffer no longer used + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + }; - // This side (tested) acts as client (consumer). - IPTestAdministrator testAdmin(otherSide); - { - // In extra scope, to make sure "buff01" is destructed before producer. - testAdmin.Sync("setup producer"); + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - uint32_t bufferSize = 8 * 1024; - uint32_t result; ::Thunder::Core::SharedBuffer buff01(bufferName.c_str()); - testAdmin.Sync("setup consumer"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + + EXPECT_EQ(buff01.RequestConsume(maxWaitTimeMs), ::Thunder::Core::ERROR_NONE); - result = buff01.RequestConsume(::Thunder::Core::infinite); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); + uint8_t* buffer = buff01.Buffer(); - uint8_t * buffer = buff01.Buffer(); + ASSERT_TRUE(buffer != nullptr); EXPECT_EQ(buff01.Size(), bufferSize); EXPECT_EQ(buffer[0], 42); EXPECT_EQ(buffer[1], 43); EXPECT_EQ(buffer[2], 44); - buff01.Consumed(); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); - } + EXPECT_EQ(buff01.Consumed(), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("consumer done"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); + + // Code after this line is executed by both parent and child ::Thunder::Core::Singleton::Dispose(); } TEST(Core_SharedBuffer, simpleSetReversed) { - std::string bufferName {"testbuffer02"} ; - auto lambdaFunc = [bufferName](IPTestAdministrator & testAdmin) { + constexpr uint32_t initHandshakeValue = 0, maxWaitTime = 6, maxWaitTimeMs = 6000, maxInitTime = 2000; + constexpr uint8_t maxRetries = 20; + + constexpr uint16_t administrationSize = 64; + constexpr uint32_t bufferSize = 8 * 1024; + + const std::string bufferName {"testbuffer02"} ; + + IPTestAdministrator::Callback callback_child = [&](IPTestAdministrator& testAdmin) { // In extra scope, to make sure "buff01" is destructed before producer. - testAdmin.Sync("setup consumer"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - uint32_t bufferSize = 8 * 1024; - uint32_t result; ::Thunder::Core::SharedBuffer buff01(bufferName.c_str()); - testAdmin.Sync("setup producer"); + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); - result = buff01.RequestConsume(::Thunder::Core::infinite); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buff01.RequestConsume(maxWaitTimeMs), ::Thunder::Core::ERROR_NONE); - uint8_t * buffer = buff01.Buffer(); + uint8_t* buffer = buff01.Buffer(); + + ASSERT_TRUE(buffer != nullptr); EXPECT_EQ(buff01.Size(), bufferSize); EXPECT_EQ(buffer[0], 42); EXPECT_EQ(buffer[1], 43); EXPECT_EQ(buffer[2], 44); - buff01.Consumed(); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); + EXPECT_EQ(buff01.Consumed(), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("producer done"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); }; - static std::function lambdaVar = lambdaFunc; - - IPTestAdministrator::OtherSideMain otherSide = [](IPTestAdministrator& testAdmin ) { lambdaVar(testAdmin); }; - - // This side (tested) acts as client (consumer). - IPTestAdministrator testAdmin(otherSide); - { - CleanUpBuffer(bufferName); + IPTestAdministrator::Callback callback_parent = [&](IPTestAdministrator& testAdmin) { + // a small delay so the child can be set up + SleepMs(maxInitTime); uint16_t administrationSize = 64; uint32_t bufferSize = 8 * 1024; @@ -168,27 +157,33 @@ namespace Core { ::Thunder::Core::File::GROUP_WRITE , bufferSize, administrationSize); - result = buff01.RequestProduce(::Thunder::Core::infinite); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("setup consumer"); + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + EXPECT_EQ(buff01.RequestProduce(maxWaitTimeMs), ::Thunder::Core::ERROR_NONE); - testAdmin.Sync("setup producer"); + uint8_t* buffer = buff01.Buffer(); - uint8_t * buffer = buff01.Buffer(); + ASSERT_TRUE(buffer != nullptr); EXPECT_EQ(buff01.Size(), bufferSize); buffer[0] = 42; buffer[1] = 43; buffer[2] = 44; - result = buff01.Produced(); - EXPECT_EQ(result, ::Thunder::Core::ERROR_NONE); - } + EXPECT_EQ(buff01.Produced(), Thunder::Core::ERROR_NONE); + + // Data made available + ASSERT_EQ(testAdmin.Signal(initHandshakeValue, maxRetries), ::Thunder::Core::ERROR_NONE); + + // Buffer no longer used + ASSERT_EQ(testAdmin.Wait(initHandshakeValue), ::Thunder::Core::ERROR_NONE); + }; + + IPTestAdministrator testAdmin(callback_parent, callback_child, initHandshakeValue, maxWaitTime); - testAdmin.Sync("producer done"); + // Code after this line is executed by both parent and child - CleanUpBuffer(bufferName); ::Thunder::Core::Singleton::Dispose(); } diff --git a/Tests/unit/core/test_singleton.cpp b/Tests/unit/core/test_singleton.cpp index 1b1c6a046..e105c0b4e 100644 --- a/Tests/unit/core/test_singleton.cpp +++ b/Tests/unit/core/test_singleton.cpp @@ -37,11 +37,16 @@ namespace Core { virtual ~SingletonTypeOne() { } + bool operator==(const SingletonTypeOne&) const + { + return true; + } }; class SingletonTypeTwo { public: - SingletonTypeTwo(string) + SingletonTypeTwo() = default; // Required by Instance() + explicit SingletonTypeTwo(string) { } virtual ~SingletonTypeTwo() @@ -50,7 +55,8 @@ namespace Core { }; class SingletonTypeThree { public: - SingletonTypeThree (string, string) + SingletonTypeThree() = default; // Required by Instance() + explicit SingletonTypeThree (string, string) { } virtual ~SingletonTypeThree() @@ -60,17 +66,35 @@ namespace Core { TEST(test_singleton, simple_singleton) { - static SingletonTypeOne& object1 = ::Thunder::Core::SingletonType::Instance(); - static SingletonTypeOne& object_sample = ::Thunder::Core::SingletonType::Instance(); - EXPECT_EQ(&object1,&object_sample); -#ifndef __APPLE__ // These are already marked as deprecated in core/Singleton.h, so commenting to avoid warning - static SingletonTypeTwo& object2 = ::Thunder::Core::SingletonType::Instance("SingletonTypeTwo"); - static SingletonTypeThree& object3 = ::Thunder::Core::SingletonType::Instance("SingletonTypeThree","SingletonTypeThree"); - ::Thunder::Core::SingletonType* x = (::Thunder::Core::SingletonType*)&object2; - EXPECT_STREQ(x->ImplementationName().c_str(),"SingletonTypeTwo"); - ::Thunder::Core::SingletonType* y = (::Thunder::Core::SingletonType*)&object3; - EXPECT_STREQ(y->ImplementationName().c_str(),"SingletonTypeThree"); -#endif + // 'old' use + ::Thunder::Tests::Core::SingletonTypeOne& objectTypeOne = ::Thunder::Core::SingletonType::Instance(); + + // Multiple inheritance, SingletonType has base SINGLETON, eg, ::Thunder::Tests::Core::SingletonTypeOne, and base Singleton + // Internally a ::Thunder::Core::SingletonType pointer is contructed and via upcasted version of a base avaiable via Instance + // It is safe to downcast it + ASSERT_NE(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeOne), nullptr); + EXPECT_FALSE(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeOne)->ImplementationName().empty()); + EXPECT_STREQ(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeOne)->ImplementationName().c_str(), "SingletonTypeOne"); + + // Well-known lifetime! + ::Thunder::Core::SingletonType::Create("My custom 2-string"); + ::Thunder::Tests::Core::SingletonTypeTwo& objectTypeTwo = ::Thunder::Core::SingletonType::Instance(); + EXPECT_FALSE(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeTwo)->ImplementationName().empty()); + EXPECT_STREQ(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeTwo)->ImplementationName().c_str(), "SingletonTypeTwo"); + EXPECT_TRUE(::Thunder::Core::SingletonType::Dispose()); + + // Well-known lifetime! + ::Thunder::Core::SingletonType::Create("My first custom 3-string", "My second custom 3-string"); + ::Thunder::Tests::Core::SingletonTypeThree& objectTypeThree = ::Thunder::Core::SingletonType::Instance(); + EXPECT_FALSE(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeThree)->ImplementationName().empty()); + EXPECT_STREQ(dynamic_cast<::Thunder::Core::SingletonType*>(&objectTypeThree)->ImplementationName().c_str(), "SingletonTypeThree"); + EXPECT_TRUE(::Thunder::Core::SingletonType::Dispose()); + + // SingletonTypeOne has not yet been destroyed + // Assume equality operator has been defined + EXPECT_EQ(::Thunder::Core::SingletonType::Instance(), ::Thunder::Core::SingletonType::Instance()); + + // Keep going with good practise ::Thunder::Core::Singleton::Dispose(); } diff --git a/Tests/unit/core/test_valuerecorder.cpp b/Tests/unit/core/test_valuerecorder.cpp index db51c7226..4babdbf96 100644 --- a/Tests/unit/core/test_valuerecorder.cpp +++ b/Tests/unit/core/test_valuerecorder.cpp @@ -29,112 +29,102 @@ namespace Thunder { namespace Tests { namespace Core { - const unsigned int BLOCKSIZE = 20; + constexpr unsigned int BLOCKSIZE = 20; - class WriterClass : public RecorderType::Writer + class WriterClass { - public: - WriterClass() = delete; - - WriterClass(string filename) - : Writer(filename) - , _file(filename) - { - } - - ~WriterClass() - { - } - - public: - void WriterJob() - { - uint8_t arr[] = {1,2,3}; - SetBuffer(arr); - auto object = Create(_file); - Record(10); - uint64_t TimeValue = Time(); - std::string storageName = Source(); - uint32_t value = Value(); - object.Release(); - } - - private: - string _file; + public: + WriterClass() = delete; + + WriterClass(const string& filename) + : _file{filename} + { + _writer = ::Thunder::Core::RecorderType::Writer::Create(filename); + } + + ~WriterClass() + { + _writer.Release(); + } + + public: + void WriterJob(const uint32_t& value) + { + ASSERT_TRUE(_writer.IsValid()); + + _writer->Record(value); + + EXPECT_STREQ(_writer->Source().c_str(), _file.c_str()); + EXPECT_EQ(_writer->Value(), value); + } + + void Save() + { + _writer->Save(); + } + + private: + const string _file; + ::Thunder::Core::ProxyType<::Thunder::Core::RecorderType::Writer> _writer; }; - class ReaderClass : public RecorderType::Reader + class ReaderClass : public ::Thunder::Core::RecorderType::Reader { - public: - ReaderClass() = delete; - - ReaderClass(string filename) - : Reader(filename) - , _file(filename) - { - } - - ReaderClass(const ProxyType& recorder, const uint32_t id = static_cast(~0)) - : Reader(recorder->Source()) - , _file(recorder->Source()) - { - } - - ~ReaderClass() - { - } - - public: - void ReaderJob() - { - Next(); - EXPECT_TRUE(IsValid()); - - uint32_t time = 20; - ::Thunder::Core::Time curTime = ::Thunder::Core::Time::Now(); - curTime.Add(time); - uint32_t index = Store(curTime.Ticks(), 1); - - StepForward(); - StepBack(); - ClearData(); - - Reader obj1(_file, 1u); - EXPECT_FALSE(obj1.Previous()); - EXPECT_TRUE(obj1.Next()); - - EXPECT_EQ(StartId(),1u); - - if (EndId() == StartId()) - EXPECT_EQ(EndId(),1u); - else - EXPECT_EQ(EndId(),2u); - - uint32_t id = EndId(); - std::string storageName = Source(); - } - - private: - string _file; - }; + public: + ReaderClass() = delete; - TEST(test_valuerecorder, test_writer) - { - string filename = "baseRecorder.txt"; + ReaderClass(const string& filename) + : _file{filename} + , Reader(filename) + { + } + + ~ReaderClass() = default; + + public: + void ReaderJob(const uint32_t value) + { + static_assert(std::is_same<::Thunder::Core::Time::microsecondsfromepoch, uint64_t>::value); + + const ::Thunder::Core::Time::microsecondsfromepoch readTime = ::Thunder::Core::Time::Now().Ticks(); + + // Get a valid position + Reset(StartId()); + + ASSERT_TRUE(IsValid()); + + EXPECT_EQ(Id(), StartId()); - auto obj1 = RecorderType::Writer::Create(filename); + EXPECT_STREQ(Source().c_str(), _file.c_str()); + EXPECT_EQ(value, Value()); - obj1->Copy(*(obj1),1); - obj1->Copy(*(obj1),100); + ASSERT_EQ(Id(), EndId()); - static_cast(*obj1).WriterJob(); + // Load next file if it exist if no additional data exist + EXPECT_FALSE(Next()); + + // The previous failed so no new files has been loaded and the current file is still used + EXPECT_TRUE(Previous()); + + EXPECT_LE(Time(), readTime); + } + + private: + const string _file; + }; + + TEST(test_valuerecorder, test_writer) + { + const string filename = "baseRecorder.txt"; - ReaderClass obj2(filename); - obj2.ReaderJob(); + constexpr uint32_t value = 10; - ReaderClass obj4(ProxyType(obj3)); + WriterClass writer(filename); + writer.WriterJob(value); + writer.Save(); - obj1.Release(); + ReaderClass reader(filename); + reader.ReaderJob(value); } } // Core