From f8a7c63f0d120819607a7427456d1636b739c94b Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 20 Feb 2024 15:46:50 +0000 Subject: [PATCH 1/2] Replace TMutex -> std::mutex, TCondVar -> std::condition_variable --- client/ydb_persqueue_core/impl/common.cpp | 6 +- client/ydb_persqueue_core/impl/common.h | 14 ++- client/ydb_persqueue_core/impl/read_session.h | 7 +- .../ydb_persqueue_core/impl/read_session.ipp | 112 +++++++++--------- .../impl/write_session_impl.h | 12 +- client/ydb_topic/impl/executor.cpp | 15 +-- client/ydb_topic/impl/executor.h | 2 +- client/ydb_topic/impl/write_session_impl.h | 12 +- .../ut/ut_utils/managed_executor.cpp | 32 +++-- .../ydb_topic/ut/ut_utils/managed_executor.h | 2 +- 10 files changed, 110 insertions(+), 104 deletions(-) diff --git a/client/ydb_persqueue_core/impl/common.cpp b/client/ydb_persqueue_core/impl/common.cpp index 16fc3432494..f5089e164c4 100644 --- a/client/ydb_persqueue_core/impl/common.cpp +++ b/client/ydb_persqueue_core/impl/common.cpp @@ -137,7 +137,8 @@ void TSerialExecutor::PostImpl(std::vector&& fs) { } void TSerialExecutor::PostImpl(TFunction&& f) { - with_lock(Mutex) { + { + std::lock_guard guard(Mutex); ExecutionQueue.push(std::move(f)); if (Busy) { return; @@ -157,7 +158,8 @@ void TSerialExecutor::PostNext() { Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() { if (auto sharedThis = weakThis.lock()) { f(); - with_lock(sharedThis->Mutex) { + { + std::lock_guard guard(sharedThis->Mutex); sharedThis->Busy = false; sharedThis->PostNext(); } diff --git a/client/ydb_persqueue_core/impl/common.h b/client/ydb_persqueue_core/impl/common.h index eca83ed4c2f..3eebc2d52f1 100644 --- a/client/ydb_persqueue_core/impl/common.h +++ b/client/ydb_persqueue_core/impl/common.h @@ -364,7 +364,7 @@ class TBaseSessionEventsQueue : public ISignalable { void Signal() override { - CondVar.Signal(); + CondVar.notify_one(); } protected: @@ -380,7 +380,8 @@ class TBaseSessionEventsQueue : public ISignalable { void WaitEventsImpl() { // Assumes that we're under lock. Posteffect: HasEventsImpl() is true. while (!HasEventsImpl()) { - CondVar.WaitI(Mutex); + std::unique_lock lk(Mutex, std::defer_lock); + CondVar.wait(lk); } } @@ -393,7 +394,8 @@ class TBaseSessionEventsQueue : public ISignalable { public: NThreading::TFuture WaitEvent() { - with_lock (Mutex) { + { + std::lock_guard guard (Mutex); if (HasEventsImpl()) { return NThreading::MakeFuture(); // Signalled } else { @@ -413,8 +415,8 @@ class TBaseSessionEventsQueue : public ISignalable { TWaiter Waiter; bool WaiterWillBeSignaled = false; std::queue Events; - TCondVar CondVar; - TMutex Mutex; + std::condition_variable CondVar; + std::mutex Mutex; TMaybe CloseEvent; std::atomic Closed = false; }; @@ -467,7 +469,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th private: IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet - TMutex Mutex = {}; + std::mutex Mutex; TQueue ExecutionQueue = {}; public: diff --git a/client/ydb_persqueue_core/impl/read_session.h b/client/ydb_persqueue_core/impl/read_session.h index e78a7e3d984..e77a9b69b90 100644 --- a/client/ydb_persqueue_core/impl/read_session.h +++ b/client/ydb_persqueue_core/impl/read_session.h @@ -726,7 +726,7 @@ class TPartitionStreamImpl : public TAPartitionStream { std::vector::TDataReceivedEvent::TCompressedMessage>& compressedMessages, TUserRetrievedEventsInfoAccumulator& accumulator); - TMutex& GetLock() { + std::mutex& GetLock() { return Lock; } @@ -742,7 +742,7 @@ class TPartitionStreamImpl : public TAPartitionStream { TDisjointIntervalTree Commits; TDisjointIntervalTree ClientCommits; - TMutex Lock; + std::mutex Lock; }; template @@ -776,7 +776,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue& event, TDeferredActions& deferred) { TWaiter waiter; - with_lock (TParent::Mutex) { + { + std::lock_guard guard(TParent::Mutex); if (TParent::Closed) return false; TParent::CloseEvent = event; diff --git a/client/ydb_persqueue_core/impl/read_session.ipp b/client/ydb_persqueue_core/impl/read_session.ipp index b063325b21f..b760cd168be 100644 --- a/client/ydb_persqueue_core/impl/read_session.ipp +++ b/client/ydb_persqueue_core/impl/read_session.ipp @@ -1848,26 +1848,25 @@ bool TReadSessionEventsQueue::PushEvent(TIntrusivePtr::TEvent event, TDeferredActions& deferred) { - with_lock (TParent::Mutex) { - if (TParent::Closed) { - return false; - } - //TODO: check session closed event and return false - using TClosedEvent = std::conditional_t< - UseMigrationProtocol, - NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, - NTopic::TReadSessionEvent::TPartitionSessionClosedEvent - >; + std::lock_guard guard(TParent::Mutex); + if (TParent::Closed) { + return false; + } + //TODO: check session closed event and return false + using TClosedEvent = std::conditional_t< + UseMigrationProtocol, + NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, + NTopic::TReadSessionEvent::TPartitionSessionClosedEvent + >; - if (std::holds_alternative(event)) { - stream->DeleteNotReadyTail(deferred); - } + if (std::holds_alternative(event)) { + stream->DeleteNotReadyTail(deferred); + } - stream->InsertEvent(std::move(event)); - Y_ASSERT(stream->HasEvents()); + stream->InsertEvent(std::move(event)); + Y_ASSERT(stream->HasEvents()); - SignalReadyEventsImpl(stream, deferred); - } + SignalReadyEventsImpl(stream, deferred); return true; } @@ -1906,12 +1905,12 @@ bool TReadSessionEventsQueue::PushDataEvent(TIntrusivePtr< TDataDecompressionInfoPtr parent, std::atomic& ready) { - with_lock (TParent::Mutex) { - if (this->Closed) { - return false; - } - partitionStream->InsertDataEvent(batch, message, parent, ready); + + std::lock_guard guard(TParent::Mutex); + if (this->Closed) { + return false; } + partitionStream->InsertDataEvent(batch, message, parent, ready); return true; } @@ -2046,22 +2045,21 @@ TReadSessionEventsQueue::GetEvents(bool block, TMaybe::max(); TUserRetrievedEventsInfoAccumulator accumulator; - with_lock (TParent::Mutex) { - eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount)); - do { - if (block) { - TParent::WaitEventsImpl(); - } + std::lock_guard guard(TParent::Mutex); + eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount)); + do { + if (block) { + TParent::WaitEventsImpl(); + } - while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { - TReadSessionEventInfo event = GetEventImpl(maxByteSize, accumulator); - eventInfos.emplace_back(std::move(event)); - if (eventInfos.back().IsSessionClosedEvent()) { - break; - } + while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) { + TReadSessionEventInfo event = GetEventImpl(maxByteSize, accumulator); + eventInfos.emplace_back(std::move(event)); + if (eventInfos.back().IsSessionClosedEvent()) { + break; } - } while (block && eventInfos.empty()); - } + } + } while (block && eventInfos.empty()); accumulator.OnUserRetrievedEvent(); @@ -2085,18 +2083,17 @@ TReadSessionEventsQueue::GetEvent(bool block, size_t maxBy TMaybe> eventInfo; TUserRetrievedEventsInfoAccumulator accumulator; - with_lock (TParent::Mutex) { - do { - if (block) { - TParent::WaitEventsImpl(); - } + std::lock_guard guard(TParent::Mutex); + do { + if (block) { + TParent::WaitEventsImpl(); + } - if (TParent::HasEventsImpl()) { - eventInfo = GetEventImpl(maxByteSize, accumulator); - } + if (TParent::HasEventsImpl()) { + eventInfo = GetEventImpl(maxByteSize, accumulator); + } - } while (block && !eventInfo); - } + } while (block && !eventInfo); accumulator.OnUserRetrievedEvent(); @@ -2112,11 +2109,11 @@ void TReadSessionEventsQueue::SignalReadyEvents( TIntrusivePtr> partitionStream) { Y_ASSERT(partitionStream); - with_lock (partitionStream->GetLock()) { - TDeferredActions deferred; - with_lock (TParent::Mutex) { - SignalReadyEventsImpl(partitionStream, deferred); - } + std::lock_guard guard1(partitionStream->GetLock()); + TDeferredActions deferred; + { + std::lock_guard g(TParent::Mutex); + SignalReadyEventsImpl(partitionStream, deferred); } } @@ -2192,14 +2189,13 @@ void TReadSessionEventsQueue::GetDataEventCallbackSettings template void TReadSessionEventsQueue::ClearAllEvents() { - with_lock (TParent::Mutex) { - while (!TParent::Events.empty()) { - auto& event = TParent::Events.front(); - if (event.PartitionStream && event.PartitionStream->HasEvents()) { - event.PartitionStream->PopEvent(); - } - TParent::Events.pop(); + std::lock_guard guard(TParent::Mutex); + while (!TParent::Events.empty()) { + auto& event = TParent::Events.front(); + if (event.PartitionStream && event.PartitionStream->HasEvents()) { + event.PartitionStream->PopEvent(); } + TParent::Events.pop(); } } diff --git a/client/ydb_persqueue_core/impl/write_session_impl.h b/client/ydb_persqueue_core/impl/write_session_impl.h index 8a7e49751ec..189b4a058c7 100644 --- a/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/client/ydb_persqueue_core/impl/write_session_impl.h @@ -38,7 +38,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue guard(Mutex); Events.emplace(std::move(eventInfo)); waiter = PopWaiterImpl(); } @@ -47,7 +48,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue GetEvent(bool block = false) { TMaybe eventInfo; - with_lock (Mutex) { + { + std::lock_guard guard(Mutex); if (block) { WaitEventsImpl(); } @@ -63,7 +65,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue GetEvents(bool block = false, TMaybe maxEventsCount = Nothing()) { std::vector eventInfos; - with_lock (Mutex) { + { + std::lock_guard guard(Mutex); if (block) { WaitEventsImpl(); } @@ -90,7 +93,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue guard(Mutex); CloseEvent = event; Closed = true; waiter = TWaiter(Waiter.ExtractPromise(), this); diff --git a/client/ydb_topic/impl/executor.cpp b/client/ydb_topic/impl/executor.cpp index 88ddbe57e3a..4bd1698780f 100644 --- a/client/ydb_topic/impl/executor.cpp +++ b/client/ydb_topic/impl/executor.cpp @@ -33,13 +33,13 @@ void TSerialExecutor::PostImpl(std::vector&& fs) { } void TSerialExecutor::PostImpl(TFunction&& f) { - with_lock(Mutex) { - ExecutionQueue.push(std::move(f)); - if (Busy) { - return; - } - PostNext(); + std::lock_guard guard(Mutex); + + ExecutionQueue.push(std::move(f)); + if (Busy) { + return; } + PostNext(); } void TSerialExecutor::PostNext() { @@ -53,7 +53,8 @@ void TSerialExecutor::PostNext() { Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() { if (auto sharedThis = weakThis.lock()) { f(); - with_lock(sharedThis->Mutex) { + { + std::lock_guard guard(sharedThis->Mutex); sharedThis->Busy = false; sharedThis->PostNext(); } diff --git a/client/ydb_topic/impl/executor.h b/client/ydb_topic/impl/executor.h index e9738514f84..501cd8e86c0 100644 --- a/client/ydb_topic/impl/executor.h +++ b/client/ydb_topic/impl/executor.h @@ -58,7 +58,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th private: IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet - TMutex Mutex = {}; + std::mutex Mutex = {}; TQueue ExecutionQueue = {}; public: diff --git a/client/ydb_topic/impl/write_session_impl.h b/client/ydb_topic/impl/write_session_impl.h index 5dd8c80045e..8295772266c 100644 --- a/client/ydb_topic/impl/write_session_impl.h +++ b/client/ydb_topic/impl/write_session_impl.h @@ -36,7 +36,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue guard(Mutex); Events.emplace(std::move(eventInfo)); waiter = PopWaiterImpl(); } @@ -45,7 +46,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue GetEvent(bool block = false) { TMaybe eventInfo; - with_lock (Mutex) { + { + std::lock_guard guard(Mutex); if (block) { WaitEventsImpl(); } @@ -61,7 +63,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue GetEvents(bool block = false, TMaybe maxEventsCount = Nothing()) { std::vector eventInfos; - with_lock (Mutex) { + { + std::lock_guard guard(Mutex); if (block) { WaitEventsImpl(); } @@ -88,7 +91,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue guard(Mutex); CloseEvent = event; Closed = true; waiter = NPersQueue::TWaiter(Waiter.ExtractPromise(), this); diff --git a/client/ydb_topic/ut/ut_utils/managed_executor.cpp b/client/ydb_topic/ut/ut_utils/managed_executor.cpp index e6b21741f15..63b5f910082 100644 --- a/client/ydb_topic/ut/ut_utils/managed_executor.cpp +++ b/client/ydb_topic/ut/ut_utils/managed_executor.cpp @@ -14,10 +14,9 @@ bool TManagedExecutor::IsAsync() const void TManagedExecutor::Post(TFunction &&f) { - with_lock (Mutex) { - Funcs.push_back(std::move(f)); - ++Planned; - } + std::lock_guard guard(Mutex); + Funcs.push_back(std::move(f)); + ++Planned; } void TManagedExecutor::DoStart() @@ -46,21 +45,19 @@ void TManagedExecutor::RunTask(TFunction&& func) void TManagedExecutor::StartFuncs(const std::vector& indicies) { - with_lock (Mutex) { - for (auto index : indicies) { - Y_ABORT_UNLESS(index < Funcs.size()); - Y_ABORT_UNLESS(Funcs[index]); + std::lock_guard guard(Mutex); + for (auto index : indicies) { + Y_ABORT_UNLESS(index < Funcs.size()); + Y_ABORT_UNLESS(Funcs[index]); - RunTask(std::move(Funcs[index])); - } + RunTask(std::move(Funcs[index])); } } size_t TManagedExecutor::GetFuncsCount() const { - with_lock (Mutex) { - return Funcs.size(); - } + std::lock_guard guard(Mutex); + return Funcs.size(); } size_t TManagedExecutor::GetPlannedCount() const @@ -80,11 +77,10 @@ size_t TManagedExecutor::GetExecutedCount() const void TManagedExecutor::RunAllTasks() { - with_lock (Mutex) { - for (auto& func : Funcs) { - if (func) { - RunTask(std::move(func)); - } + std::lock_guard guard(Mutex); + for (auto& func : Funcs) { + if (func) { + RunTask(std::move(func)); } } } diff --git a/client/ydb_topic/ut/ut_utils/managed_executor.h b/client/ydb_topic/ut/ut_utils/managed_executor.h index 99d70d3d4ee..47c1aa7960d 100644 --- a/client/ydb_topic/ut/ut_utils/managed_executor.h +++ b/client/ydb_topic/ut/ut_utils/managed_executor.h @@ -34,7 +34,7 @@ class TManagedExecutor : public IExecutor { void RunTask(TFunction&& func); TExecutorPtr Executor; - TMutex Mutex; + std::mutex Mutex; std::vector Funcs; std::atomic Planned = 0; std::atomic Running = 0; From a5fa333b8401c252bccd749c1831a1a34624ac47 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 20 Feb 2024 15:54:45 +0000 Subject: [PATCH 2/2] f --- client/ydb_persqueue_core/impl/common.h | 2 +- client/ydb_persqueue_core/impl/read_session.h | 1 - client/ydb_topic/impl/executor.h | 1 - client/ydb_topic/ut/ut_utils/managed_executor.h | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/client/ydb_persqueue_core/impl/common.h b/client/ydb_persqueue_core/impl/common.h index 3eebc2d52f1..477eeaa048e 100644 --- a/client/ydb_persqueue_core/impl/common.h +++ b/client/ydb_persqueue_core/impl/common.h @@ -4,10 +4,10 @@ #include #include -#include #include #include +#include namespace NYdb::NPersQueue { diff --git a/client/ydb_persqueue_core/impl/read_session.h b/client/ydb_persqueue_core/impl/read_session.h index e77a9b69b90..bbda64b69a5 100644 --- a/client/ydb_persqueue_core/impl/read_session.h +++ b/client/ydb_persqueue_core/impl/read_session.h @@ -16,7 +16,6 @@ #include #include #include -#include #include #include diff --git a/client/ydb_topic/impl/executor.h b/client/ydb_topic/impl/executor.h index 501cd8e86c0..344173eae4e 100644 --- a/client/ydb_topic/impl/executor.h +++ b/client/ydb_topic/impl/executor.h @@ -4,7 +4,6 @@ #include #include -#include #include diff --git a/client/ydb_topic/ut/ut_utils/managed_executor.h b/client/ydb_topic/ut/ut_utils/managed_executor.h index 47c1aa7960d..9bf3976af8f 100644 --- a/client/ydb_topic/ut/ut_utils/managed_executor.h +++ b/client/ydb_topic/ut/ut_utils/managed_executor.h @@ -2,9 +2,9 @@ #include #include -#include #include +#include namespace NYdb::NTopic::NTests {