Skip to content

Commit

Permalink
TMutex, TCondVar replacement (#67)
Browse files Browse the repository at this point in the history
* Replace TMutex -> std::mutex, TCondVar -> std::condition_variable

* f
  • Loading branch information
dcherednik authored Feb 20, 2024
1 parent e947399 commit c6a09fb
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 108 deletions.
6 changes: 4 additions & 2 deletions client/ydb_persqueue_core/impl/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ void TSerialExecutor::PostImpl(std::vector<TFunction>&& fs) {
}

void TSerialExecutor::PostImpl(TFunction&& f) {
with_lock(Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
ExecutionQueue.push(std::move(f));
if (Busy) {
return;
Expand All @@ -157,7 +158,8 @@ void TSerialExecutor::PostNext() {
Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() {
if (auto sharedThis = weakThis.lock()) {
f();
with_lock(sharedThis->Mutex) {
{
std::lock_guard<std::mutex> guard(sharedThis->Mutex);
sharedThis->Busy = false;
sharedThis->PostNext();
}
Expand Down
16 changes: 9 additions & 7 deletions client/ydb_persqueue_core/impl/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
#include <client/ydb_common_client/impl/client.h>

#include <util/generic/queue.h>
#include <util/system/condvar.h>
#include <util/thread/pool.h>

#include <queue>
#include <condition_variable>

namespace NYdb::NPersQueue {

Expand Down Expand Up @@ -364,7 +364,7 @@ class TBaseSessionEventsQueue : public ISignalable {


void Signal() override {
CondVar.Signal();
CondVar.notify_one();
}

protected:
Expand All @@ -380,7 +380,8 @@ class TBaseSessionEventsQueue : public ISignalable {

void WaitEventsImpl() { // Assumes that we're under lock. Posteffect: HasEventsImpl() is true.
while (!HasEventsImpl()) {
CondVar.WaitI(Mutex);
std::unique_lock<std::mutex> lk(Mutex, std::defer_lock);
CondVar.wait(lk);
}
}

Expand All @@ -393,7 +394,8 @@ class TBaseSessionEventsQueue : public ISignalable {

public:
NThreading::TFuture<void> WaitEvent() {
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard (Mutex);
if (HasEventsImpl()) {
return NThreading::MakeFuture(); // Signalled
} else {
Expand All @@ -413,8 +415,8 @@ class TBaseSessionEventsQueue : public ISignalable {
TWaiter Waiter;
bool WaiterWillBeSignaled = false;
std::queue<TEventInfo> Events;
TCondVar CondVar;
TMutex Mutex;
std::condition_variable CondVar;
std::mutex Mutex;
TMaybe<TClosedEvent> CloseEvent;
std::atomic<bool> Closed = false;
};
Expand Down Expand Up @@ -467,7 +469,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th
private:
IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job
bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet
TMutex Mutex = {};
std::mutex Mutex;
TQueue<TFunction> ExecutionQueue = {};

public:
Expand Down
8 changes: 4 additions & 4 deletions client/ydb_persqueue_core/impl/read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <util/digest/numeric.h>
#include <util/generic/hash.h>
#include <util/generic/hash_multi_map.h>
#include <util/system/condvar.h>

#include <atomic>
#include <deque>
Expand Down Expand Up @@ -726,7 +725,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
std::vector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator);

TMutex& GetLock() {
std::mutex& GetLock() {
return Lock;
}

Expand All @@ -742,7 +741,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
TDisjointIntervalTree<ui64> Commits;
TDisjointIntervalTree<ui64> ClientCommits;

TMutex Lock;
std::mutex Lock;
};

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -776,7 +775,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti

bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
with_lock (TParent::Mutex) {
{
std::lock_guard<std::mutex> guard(TParent::Mutex);
if (TParent::Closed)
return false;
TParent::CloseEvent = event;
Expand Down
112 changes: 54 additions & 58 deletions client/ydb_persqueue_core/impl/read_session.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1848,26 +1848,25 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
with_lock (TParent::Mutex) {
if (TParent::Closed) {
return false;
}
//TODO: check session closed event and return false
using TClosedEvent = std::conditional_t<
UseMigrationProtocol,
NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
>;
std::lock_guard<std::mutex> guard(TParent::Mutex);
if (TParent::Closed) {
return false;
}
//TODO: check session closed event and return false
using TClosedEvent = std::conditional_t<
UseMigrationProtocol,
NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
>;

if (std::holds_alternative<TClosedEvent>(event)) {
stream->DeleteNotReadyTail(deferred);
}
if (std::holds_alternative<TClosedEvent>(event)) {
stream->DeleteNotReadyTail(deferred);
}

stream->InsertEvent(std::move(event));
Y_ASSERT(stream->HasEvents());
stream->InsertEvent(std::move(event));
Y_ASSERT(stream->HasEvents());

SignalReadyEventsImpl(stream, deferred);
}
SignalReadyEventsImpl(stream, deferred);
return true;
}

Expand Down Expand Up @@ -1906,12 +1905,12 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<
TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool>& ready)
{
with_lock (TParent::Mutex) {
if (this->Closed) {
return false;
}
partitionStream->InsertDataEvent(batch, message, parent, ready);

std::lock_guard<std::mutex> guard(TParent::Mutex);
if (this->Closed) {
return false;
}
partitionStream->InsertDataEvent(batch, message, parent, ready);
return true;
}

Expand Down Expand Up @@ -2046,22 +2045,21 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvents(bool block, TMaybe<size
const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max();
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;

with_lock (TParent::Mutex) {
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
do {
if (block) {
TParent::WaitEventsImpl();
}
std::lock_guard<std::mutex> guard(TParent::Mutex);
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
do {
if (block) {
TParent::WaitEventsImpl();
}

while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
}
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
}
} while (block && eventInfos.empty());
}
}
} while (block && eventInfos.empty());

accumulator.OnUserRetrievedEvent();

Expand All @@ -2085,18 +2083,17 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy
TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo;
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;

with_lock (TParent::Mutex) {
do {
if (block) {
TParent::WaitEventsImpl();
}
std::lock_guard<std::mutex> guard(TParent::Mutex);
do {
if (block) {
TParent::WaitEventsImpl();
}

if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(maxByteSize, accumulator);
}
if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(maxByteSize, accumulator);
}

} while (block && !eventInfo);
}
} while (block && !eventInfo);

accumulator.OnUserRetrievedEvent();

Expand All @@ -2112,11 +2109,11 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents(
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) {
Y_ASSERT(partitionStream);

with_lock (partitionStream->GetLock()) {
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
SignalReadyEventsImpl(partitionStream, deferred);
}
std::lock_guard<std::mutex> guard1(partitionStream->GetLock());
TDeferredActions<UseMigrationProtocol> deferred;
{
std::lock_guard<std::mutex> g(TParent::Mutex);
SignalReadyEventsImpl(partitionStream, deferred);
}
}

Expand Down Expand Up @@ -2192,14 +2189,13 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventCallbackSettings

template<bool UseMigrationProtocol>
void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() {
with_lock (TParent::Mutex) {
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (event.PartitionStream && event.PartitionStream->HasEvents()) {
event.PartitionStream->PopEvent();
}
TParent::Events.pop();
std::lock_guard<std::mutex> guard(TParent::Mutex);
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (event.PartitionStream && event.PartitionStream->HasEvents()) {
event.PartitionStream->PopEvent();
}
TParent::Events.pop();
}
}

Expand Down
12 changes: 8 additions & 4 deletions client/ydb_persqueue_core/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett
}

TWaiter waiter;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
Events.emplace(std::move(eventInfo));
waiter = PopWaiterImpl();
}
Expand All @@ -47,7 +48,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

TMaybe<TEvent> GetEvent(bool block = false) {
TMaybe<TEventInfo> eventInfo;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
if (block) {
WaitEventsImpl();
}
Expand All @@ -63,7 +65,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

std::vector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) {
std::vector<TEventInfo> eventInfos;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
if (block) {
WaitEventsImpl();
}
Expand All @@ -90,7 +93,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

void Close(const TSessionClosedEvent& event) {
TWaiter waiter;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
CloseEvent = event;
Closed = true;
waiter = TWaiter(Waiter.ExtractPromise(), this);
Expand Down
15 changes: 8 additions & 7 deletions client/ydb_topic/impl/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ void TSerialExecutor::PostImpl(std::vector<TFunction>&& fs) {
}

void TSerialExecutor::PostImpl(TFunction&& f) {
with_lock(Mutex) {
ExecutionQueue.push(std::move(f));
if (Busy) {
return;
}
PostNext();
std::lock_guard<std::mutex> guard(Mutex);

ExecutionQueue.push(std::move(f));
if (Busy) {
return;
}
PostNext();
}

void TSerialExecutor::PostNext() {
Expand All @@ -53,7 +53,8 @@ void TSerialExecutor::PostNext() {
Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() {
if (auto sharedThis = weakThis.lock()) {
f();
with_lock(sharedThis->Mutex) {
{
std::lock_guard<std::mutex> guard(sharedThis->Mutex);
sharedThis->Busy = false;
sharedThis->PostNext();
}
Expand Down
3 changes: 1 addition & 2 deletions client/ydb_topic/impl/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <client/ydb_common_client/impl/client.h>

#include <util/generic/queue.h>
#include <util/system/condvar.h>
#include <util/thread/pool.h>


Expand Down Expand Up @@ -58,7 +57,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th
private:
IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job
bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet
TMutex Mutex = {};
std::mutex Mutex = {};
TQueue<TFunction> ExecutionQueue = {};

public:
Expand Down
Loading

0 comments on commit c6a09fb

Please sign in to comment.