diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index e4aacf55a..79a328ea9 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -4,6 +4,7 @@ option(WORKERPOOL_TEST "WorkerPool stress test" OFF) option(FILE_UNLINK_TEST "File unlink test" OFF) option(REDIRECT_TEST "Test stream redirection" OFF) option(MESSAGEBUFFER_TEST "Test message buffer" OFF) +option(UNRAVELLER "reveal thread details" OFF) if(BUILD_TESTS) add_subdirectory(unit) @@ -36,3 +37,7 @@ endif() if(CYCLICBUFFER_TEST) add_subdirectory(cyclic-buffer) endif() + +if(UNRAVELLER) + add_subdirectory(unraveller) +endif() \ No newline at end of file diff --git a/Tests/unraveller/CMakeLists.txt b/Tests/unraveller/CMakeLists.txt new file mode 100644 index 000000000..ed8fa4b01 --- /dev/null +++ b/Tests/unraveller/CMakeLists.txt @@ -0,0 +1,31 @@ +# If not stated otherwise in this file or this component's license file the +# following copyright and licenses apply: +# +# Copyright 2024 Metrological +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(unraveller main.cpp) + +target_link_libraries(unraveller + PRIVATE + ${NAMESPACE}Core::${NAMESPACE}Core + ${NAMESPACE}Cryptalgo::${NAMESPACE}Cryptalgo + ) + +set_target_properties(unraveller PROPERTIES + CXX_STANDARD 11 + CXX_STANDARD_REQUIRED YES + ) + +install(TARGETS unraveller DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT ${NAMESPACE}_Test) diff --git a/Tests/unraveller/main.cpp b/Tests/unraveller/main.cpp new file mode 100644 index 000000000..f102491ce --- /dev/null +++ b/Tests/unraveller/main.cpp @@ -0,0 +1,630 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define MODULE_NAME unraveller + +#include +#include + +using namespace Thunder; + +namespace App { +struct IThreader { +public: + enum Type : uint8_t { + NONE = 0x0, + PTHREAD = 0x1, + STDTHREAD = 0x2, + }; + + struct ICallback { + virtual ~ICallback() = default; + virtual uint32_t Work(uint64_t id) = 0; + }; + + virtual uint32_t Run(const uint32_t nThreads) = 0; + virtual uint32_t Stop() = 0; + + virtual bool IsRunning() const = 0; + + virtual ~IThreader() = default; + + static Core::ProxyType Instance(const Type type, ICallback* callback); +}; + +static std::unordered_map const threadTypeTable = { + { "pthread", IThreader::Type::PTHREAD }, + { "stdthread", IThreader::Type::STDTHREAD } +}; + +class Printer { +public: + Printer(Printer&&) = delete; + Printer(const Printer&) = delete; + Printer& operator=(Printer&&) = delete; + Printer& operator=(const Printer&) = delete; + + Printer(std::ostream& output = std::cout) + : _lock(std::unique_lock(_mutex)) + , _output(output) + { + } + + template + Printer& operator<<(const TYPE& _t) + { + _output << _t; + return *this; + } + + Printer& operator<<(std::ostream& (*fp)(std::ostream&)) + { + _output << fp; + return *this; + } + +private: + std::unique_lock _lock; + std::ostream& _output; + static std::mutex _mutex; +}; + +std::mutex Printer::_mutex; + +class MemProber { +public: + struct mstat_t { + unsigned long size, resident, share, text, lib, data, dt; + }; + + MemProber(); + ~MemProber() = default; + + MemProber(MemProber&&) = delete; + MemProber(const MemProber&) = delete; + MemProber& operator=(MemProber&&) = delete; + MemProber& operator=(const MemProber&) = delete; + + static bool Probe(mstat_t& mstat) + { + + Core::ProcessInfo info; + + info.MemoryStats(); + + uint64_t rss = info.RSS(); + uint64_t vss = info.VSS(); + + // constexpr TCHAR mstatPath[] = _T("/proc/self/statm"); + bool result(true); + + // FILE* mstatFile = fopen(mstatPath, "r"); + // result = (7 != fscanf(mstatFile, "%ld %ld %ld %ld %ld %ld %ld", &mstat.size, &mstat.resident, &mstat.share, &mstat.text, &mstat.lib, &mstat.data, &mstat.dt)); + // fclose(mstatFile); + + mstat.resident = rss; + mstat.size = vss; + + return result; + } + + static void Dump(const mstat_t& mstat, std::ostream& ostream = std::cout) + { + const int32_t deltaRss(mstat.resident - _cache.resident); + const int32_t deltaVsz(mstat.size - _cache.size); + ostream << " PID: " << getpid() << ", RSS: " << mstat.resident << "kB(" << ((deltaRss > 0) ? "+" : "") << deltaRss << "kB), VSZ: " << mstat.size << "kB (" << ((deltaVsz > 0) ? "+" : "") << deltaVsz << "kB)"; + } + + static void Dump(std::ostream& ostream) + { + std::lock_guard lock(_mutex); + + mstat_t mstat; + memset(&mstat, 0, sizeof(mstat)); + + Probe(mstat); + + Dump(mstat, ostream); + + _cache = std::move(mstat); + } + + static std::string Dump() + { + std::lock_guard lock(_mutex); + + std::stringstream s; + + mstat_t mstat; + memset(&mstat, 0, sizeof(mstat)); + + Probe(mstat); + + Dump(mstat, s); + + _cache = std::move(mstat); + + return s.str(); + } + +private: + static std::mutex _mutex; + static mstat_t _cache; +}; + +std::mutex MemProber::_mutex; +MemProber::mstat_t MemProber::_cache; + +MemProber::MemProber() +{ + memset(&_cache, 0, sizeof(mstat_t)); +} + +class Randomizer { +public: + Randomizer() + : _engine() + , _x() + { + Reseed(); + } + + ~Randomizer() = default; + + template + const TYPE Generate() + { + TYPE ret(0); + + do { + std::uniform_int_distribution random(0, TYPE(~0)); + ret = random(_engine); + } while (ret == 0); + + return ret; + } + +private: + void Reseed() + { + int y; + + void* z = std::malloc(sizeof(int)); + free(z); + + std::seed_seq seed{ + static_cast(reinterpret_cast(&_x)), + static_cast(reinterpret_cast(&y)), + static_cast(reinterpret_cast(z)), + static_cast(++_x), + static_cast(std::hash()(std::this_thread::get_id())), + static_cast(std::chrono::high_resolution_clock::now().time_since_epoch().count()), + }; + + _engine.seed(seed); + } + + std::mt19937 _engine; + uint32_t _x; +}; + +class ConsoleOptions : public Core::Options { +public: + ConsoleOptions(ConsoleOptions&&) = delete; + ConsoleOptions(const ConsoleOptions&) = delete; + ConsoleOptions& operator=(ConsoleOptions&&) = delete; + ConsoleOptions& operator=(const ConsoleOptions&) = delete; + + ConsoleOptions(int argumentCount, TCHAR* arguments[]) + : Core::Options(argumentCount, arguments, _T("t:c:s:h")) + , Count(0) + , Stack(0) + , Type(IThreader::Type::NONE) + , executableName(Core::FileNameOnly(arguments[0])) + { + Parse(); + } + + ~ConsoleOptions() + { + } + +private: + std::string Sanitize(const TCHAR data[]) + { + std::string text(data); + + text.erase( + std::remove_if( + text.begin(), text.end(), [](char const c) { + return ' ' == c || '"' == c || '\'' == c; + }), + text.end()); + + return text; + } + + virtual void Option(const TCHAR option, const TCHAR* argument) + { + switch (option) { + case 't': { + auto it = threadTypeTable.find(Sanitize(argument)); + + if (it != threadTypeTable.end()) { + Type = (it->second); + } + break; + } + case 'c': { + Count = Core::NumberType(Core::TextFragment(argument)).Value(); + break; + } + case 's': { + Stack = Core::NumberType(Core::TextFragment(argument)).Value(); + break; + } + case 'h': + default: { + Printer(std::cerr) << "Usage: " << executableName << " -s -c -t " << std::endl + << " -c The number of threads" << std::endl + << " -s The stack allocation in Kb" << std::endl + << " -t Type of threading to use " << std::endl; + + exit(EXIT_FAILURE); + break; + } + } + } + +public: + uint32_t Count; + uint32_t Stack; + IThreader::Type Type; + +private: + const std::string executableName; +}; + +class CommonThreader : virtual public IThreader { +public: + CommonThreader(CommonThreader&&) = delete; + CommonThreader(const CommonThreader&) = delete; + CommonThreader& operator=(CommonThreader&&) = delete; + CommonThreader& operator=(const CommonThreader&) = delete; + CommonThreader() = delete; + + bool IsRunning() const override + { + return _running; + } + void Arm() const + { + std::unique_lock lock(_mutex); + } + + void Trigger() const + { + _cv.notify_all(); + } + +protected: + CommonThreader(ICallback* callback) + : _callback(callback) + , _running(false) + , _mutex() + , _cv() + { + } + + const uint32_t Work(uint64_t tid) + { + std::unique_lock lock(_mutex); + _cv.wait(lock); + lock.unlock(); + + Printer() << "TID: " << tid << ": Work start: " << MemProber::Dump() << std::endl; + + uint16_t sum(0); + + if ((_callback != nullptr) && (_running == true)) { + sum = _callback->Work(tid); + } + + Printer() << "TID: " << tid << ": Work done: " << sum << MemProber::Dump() << std::endl; + + return sum; + } + + ICallback* _callback; + std::atomic_bool _running; + + mutable std::mutex _mutex; + mutable std::condition_variable _cv; + + MemProber _mem_probe; +}; + +class PThreader : public CommonThreader { +public: + PThreader(PThreader&&) = delete; + PThreader(const PThreader&) = delete; + PThreader& operator=(PThreader&&) = delete; + PThreader& operator=(const PThreader&) = delete; + PThreader() = delete; + + uint32_t Run(const uint32_t nThreads) override + { + + Arm(); + + _running = true; + + for (uint32_t i = 0; i < nThreads; i++) { + pthread_t new_thread; + pthread_create(&new_thread, NULL, Process, this); + _threads.push_back(new_thread); + } + + Printer() << "PThreads starting" << MemProber::Dump() << std::endl; + + Trigger(); + + return Core::ERROR_NONE; + } + + uint32_t Stop() override + { + Printer() << "PThreads stopping" << MemProber::Dump() << std::endl; + + _running = false; + + Trigger(); + + for (auto& thread : _threads) { + // Printer() << "Joining Thread ID: " << thread << std::endl; + pthread_join(thread, NULL); + } + + _threads.clear(); + + return Core::ERROR_NONE; + } + + PThreader(ICallback* callback) + : CommonThreader(callback) + , _threads() + { + } + + ~PThreader() = default; + +private: + static void* Process(void* data) + { + ASSERT(data != nullptr); + + PThreader* context = reinterpret_cast(data); + + const uint64_t tid(pthread_self()); + + if (context) { + const uint16_t sum(context->Work(tid)); + } + + return nullptr; + } + +private: + std::vector _threads; +}; + +class StdThreader : public CommonThreader { +public: + StdThreader(StdThreader&&) = delete; + StdThreader(const StdThreader&) = delete; + StdThreader& operator=(StdThreader&&) = delete; + StdThreader& operator=(const StdThreader&) = delete; + StdThreader() = delete; + + uint32_t Run(const uint32_t nThreads) override + { + Arm(); + + _running = true; + + for (uint32_t i = 0; i < nThreads; i++) { + _threads.emplace_back(Process, this); + } + + Printer() << "STDThreads Starting " << MemProber::Dump() << std::endl; + + Trigger(); + + return Core::ERROR_NONE; + } + + uint32_t Stop() override + { + Printer() << "STDThreads Stopping " << MemProber::Dump() << std::endl; + + _running = false; + + Trigger(); + + for (auto& thread : _threads) { + // Printer() << "Joining Thread ID: " << thread.get_id() << std::endl; + thread.join(); + } + + _threads.clear(); + + return Core::ERROR_NONE; + } + + StdThreader(ICallback* callback) + : CommonThreader(callback) + , _threads() + { + } + + ~StdThreader() = default; + +private: + static void Process(StdThreader* context) + { + ASSERT(context != nullptr); + + const uint64_t tid(static_cast(std::hash()(std::this_thread::get_id()))); + + if (context) { + const uint16_t sum(context->Work(tid)); + } + } + +private: + std::vector _threads; +}; + +class WorkLoad : public IThreader::ICallback { + static constexpr uint16_t Kilobyte = 1024; + +public: + WorkLoad(const uint16_t sizeKb) + : _sizeKb(sizeKb) + , _mutex() + , _cv() + , _random() + { + } + + uint32_t Work(uint64_t tid) override + { + const uint32_t bytes(_sizeKb * Kilobyte); + + Printer() << "TID: " << tid << " - About to allocate " << bytes << "bytes " << MemProber::Dump() << std::endl; + + uint8_t* data = static_cast(::alloca(bytes)); + + if (data) { + for (uint32_t i = 0; i < (bytes / sizeof(uint32_t)); i++) { + data[i * sizeof(uint32_t)] = _random.Generate(); + } + } else { + Printer(std::cerr) << "TID: " << tid << " - Failed to allocate stack of " << bytes << " bytes" << std::endl; + } + + Printer() << "TID: " << tid << " - Waiting to be released, " << MemProber::Dump() << std::endl; + + // wait until release + std::unique_lock lock(_mutex); + _cv.wait(lock); + lock.unlock(); + + return Core::ERROR_NONE; + } + + void Arm() + { + std::unique_lock lock(_mutex); + } + + void Trigger() + { + _cv.notify_all(); + } + +private: + const uint16_t _sizeKb; + std::mutex _mutex; + std::condition_variable _cv; + Randomizer _random; +}; + +Core::ProxyType IThreader::Instance(const IThreader::Type type, ICallback* callback) +{ + Core::ProxyType iface; + static Core::ProxyListType _ifaces; + + switch (type) { + case IThreader::Type::PTHREAD: + iface = _ifaces.Instance(callback); + break; + + case IThreader::Type::STDTHREAD: + iface = _ifaces.Instance(callback); + break; + + default: + break; + } + + return iface; +} +} + +int main(int argc, char** argv) +{ + App::ConsoleOptions consoleOptions(argc, argv); + + Core::ProxyType thread; + + if (consoleOptions.Type != App::IThreader::Type::NONE) { + + App::Printer() << "Test with " << consoleOptions.Count << " threads allocating " << consoleOptions.Stack << "kB" << std::endl; + + App::WorkLoad job(consoleOptions.Stack); + + std::atomic_bool started(false); + + thread = App::IThreader::Instance(consoleOptions.Type, &job); + + App::Printer() << "Threads initialised" << App::MemProber::Dump() << std::endl; + + char element; + + do { + element = toupper(getchar()); + + switch (element) { + case 'S': + if (thread->IsRunning() == false) { + job.Arm(); + thread->Run(consoleOptions.Count); + } else { + job.Trigger(); + thread->Stop(); + App::Printer() << "Released threads" << App::MemProber::Dump() << std::endl; + } + break; + case 'I': + // info + break; + case 'Q': + break; + default: { + } + } + } while (element != 'Q'); + + } else { + App::Printer(std::cerr) << "Please provide a proper thread implementation." << std::endl; + } + + if (thread.IsValid() == true) { + thread.Release(); + } + + Core::Singleton::Dispose(); + + App::Printer() << "Exit" << App::MemProber::Dump() << std::endl; + + return (0); +}