Skip to content

Commit

Permalink
RingBuffer v1
Browse files Browse the repository at this point in the history
  • Loading branch information
mrouffet committed Mar 30, 2024
1 parent f140b57 commit 1a47bd5
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 60 deletions.
22 changes: 6 additions & 16 deletions Include/SA/Logger/LoggerThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@

#include <SA/Logger/LoggerBase.hpp>

#include <queue>
#include <SA/Logger/Misc/RingBuffer.hpp>

#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>

/**
* \file LoggerThread.hpp
*
* \brief \e Multithread <b>Logger</b> class implementation.
* \brief \e Multithread <b>Lock-free Logger</b> class implementation.
*
* \ingroup Logger
* \{
Expand Down Expand Up @@ -46,17 +45,8 @@ namespace SA
/// Current running state.
std::atomic<bool> mIsRunning = true;

/// Log queue mutex operations.
std::mutex mLogQueueMutex;

/// Log queue condition variable.
std::condition_variable mLogConditionVar;

/// Log saved queue.
std::queue<SA::Log> mLogQueue;

/// Atomic queue size.
std::atomic<size_t> mQueueSize = 0;
/// FIFO Ring buffer
RingBuffer<SA::Log> mRingBuffer;

void ThreadLoop();

Expand All @@ -76,7 +66,7 @@ namespace SA

public:
/// Default Constructor.
LoggerThread() noexcept;
LoggerThread(uint32_t _ringBufferSize = 32) noexcept;

/**
* Thread-safe destructor.
Expand Down
89 changes: 89 additions & 0 deletions Include/SA/Logger/Misc/RindBuffer.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2024 Sapphire's Suite. All Rights Reserved.

namespace SA
{
template <typename T>
RingBuffer<T>::RingBuffer(uint32_t _capacity) :
mCapacity { _capacity }
{
mHandleBuffer = static_cast<T*>(::operator new(_capacity * sizeof(T)));
mPushCompleted = new std::atomic<bool>[_capacity];
}

template <typename T>
RingBuffer<T>::~RingBuffer()
{
::operator delete(mHandleBuffer);
delete[] mPushCompleted;
}


template <typename T>
void RingBuffer<T>::Push(T&& _obj)
{
while (IsFull())
std::this_thread::yield();

const uint32_t index = mPushCursor = (mPushCursor + 1) % Capacity();

new(&mHandleBuffer[index]) T(std::move(_obj));

#if SA_DEBUG || 1

if (mPushCompleted[index])
{
throw std::string("DATA RACE");
}

#endif

mPushCompleted[index] = true;
}

template <typename T>
T RingBuffer<T>::Pop()
{
//while (IsEmpty())
// std::this_thread::yield();

const uint32_t index = mPopCursor = (mPopCursor + 1) % Capacity();

while(!mPushCompleted[index])
std::this_thread::yield();

// Reset for next use.
mPushCompleted[index] = false;

T output = std::move(mHandleBuffer[index]);

mHandleBuffer[index].~T();

return std::move(output);
}


template <typename T>
uint32_t RingBuffer<T>::Size() const noexcept
{
return mPushCursor - mPopCursor;
}

template <typename T>
uint32_t RingBuffer<T>::Capacity() const noexcept
{
return mCapacity;
}


template <typename T>
bool RingBuffer<T>::IsEmpty() const noexcept
{
return Size() == 0;
}

template <typename T>
bool RingBuffer<T>::IsFull() const noexcept
{
return Size() == Capacity();
}
}
41 changes: 41 additions & 0 deletions Include/SA/Logger/Misc/RingBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2024 Sapphire's Suite. All Rights Reserved.

#pragma once

#ifndef SAPPHIRE_LOGGER_RING_BUFFER_GUARD
#define SAPPHIRE_LOGGER_RING_BUFFER_GUARD

#include <atomic>
#include <thread>

namespace SA
{
template <typename T>
class RingBuffer
{
T* mHandleBuffer = nullptr;
std::atomic<bool>* mPushCompleted = nullptr;

const uint32_t mCapacity = 0;

std::atomic<uint32_t> mPushCursor = 0;
std::atomic<uint32_t> mPopCursor = 0;

public:
RingBuffer(uint32_t _capacity = 32);
~RingBuffer();

void Push(T&& _obj);
T Pop();

uint32_t Size() const noexcept;
uint32_t Capacity() const noexcept;

bool IsEmpty() const noexcept;
bool IsFull() const noexcept;
};
}

#include "RindBuffer.inl"

#endif // SAPPHIRE_LOGGER_RING_BUFFER_GUARD
54 changes: 10 additions & 44 deletions Source/SA/Logger/LoggerThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,39 @@

namespace SA
{
LoggerThread::LoggerThread() noexcept
LoggerThread::LoggerThread(uint32_t _ringBufferSize) noexcept :
mRingBuffer(_ringBufferSize)
{
mThread = std::thread(&LoggerThread::ThreadLoop, this);
}

LoggerThread::~LoggerThread()
{
// Flush all pending logs.
Flush();

// Stop running thread.
mIsRunning = false;
mLogConditionVar.notify_one();

if(mThread.joinable())
mThread.join();
}

void LoggerThread::Log(SA::Log _log)
{
mLogQueueMutex.lock();

mLogQueue.push(std::move(_log));
++mQueueSize;

mLogQueueMutex.unlock();

mLogConditionVar.notify_one();
mRingBuffer.Push(std::move(_log));
}

//{ Thread

void LoggerThread::ThreadLoop()
{
std::unique_lock locker(mLogQueueMutex);

// Wait for first push.
if (mLogQueue.empty())
mLogConditionVar.wait(locker);

// Wait for first push
while (mRingBuffer.IsEmpty() && mIsRunning)
std::this_thread::yield();

while (mIsRunning)
{
// Pop Log.
SA::Log log = std::move(mLogQueue.front());
mLogQueue.pop();

// Allow queue.push() while outputing in streams.
locker.unlock();
ProcessLog(mRingBuffer.Pop());


ProcessLog(log);

// Decrease queue size after process: ensure correct flush.
--mQueueSize;


// re-lock before accessing size.
locker.lock();

// Queue empty: wait for push.
if (mLogQueue.empty())
mLogConditionVar.wait(locker); // Wait and aquire locker for next loop.

// Check running state after wait.
while(mRingBuffer.IsEmpty() && mIsRunning)
std::this_thread::yield();
}
}

Expand Down Expand Up @@ -99,9 +67,7 @@ namespace SA

void LoggerThread::Flush()
{
// Wait for empty queue.
while(mQueueSize)
std::this_thread::yield();
// TODO: close the queue and wait for all processed

// Flush all.
std::lock_guard lkStreams(mStreamMutex);
Expand Down
1 change: 1 addition & 0 deletions Tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
# Entrypoints

add_subdirectory(Prototype)
add_subdirectory(PrototypeMT)
add_subdirectory(UnitTests)
27 changes: 27 additions & 0 deletions Tests/PrototypeMT/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2023 Sapphire's Suite. All Rights Reserved.



# Project

project(SA_LoggerProtoMT)



# Executable

## Add executable target.
add_executable(SA_LoggerProtoMT "main.cpp")



# Dependencies

### Add library dependencies.
target_link_libraries(SA_LoggerProtoMT PRIVATE SA_Logger)



# Tests

add_test(NAME CSA_LoggerProtoMT COMMAND SA_LoggerProto)
35 changes: 35 additions & 0 deletions Tests/PrototypeMT/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2023 Sapphire's Suite. All Rights Reserved.

#include <iostream>

#include <SA/Collections/Debug>

void LoggingThread()
{
for (int i = 0; i < 1000; ++i)
SA_LOG(("HELLO %1", i));
}

int main()
{
SA::Debug::InitDefaultLoggerThread;

std::thread t1(LoggingThread);
std::thread t2(LoggingThread);
std::thread t3(LoggingThread);
std::thread t4(LoggingThread);

if (t1.joinable())
t1.join();

if (t2.joinable())
t2.join();

if (t3.joinable())
t3.join();

if (t4.joinable())
t4.join();

return 0;
}

0 comments on commit 1a47bd5

Please sign in to comment.