Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TMutex, TCondVar replacement #67

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions client/ydb_persqueue_core/impl/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ void TSerialExecutor::PostImpl(std::vector<TFunction>&& fs) {
}

void TSerialExecutor::PostImpl(TFunction&& f) {
with_lock(Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
ExecutionQueue.push(std::move(f));
if (Busy) {
return;
Expand All @@ -157,7 +158,8 @@ void TSerialExecutor::PostNext() {
Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() {
if (auto sharedThis = weakThis.lock()) {
f();
with_lock(sharedThis->Mutex) {
{
std::lock_guard<std::mutex> guard(sharedThis->Mutex);
sharedThis->Busy = false;
sharedThis->PostNext();
}
Expand Down
16 changes: 9 additions & 7 deletions client/ydb_persqueue_core/impl/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
#include <client/ydb_common_client/impl/client.h>

#include <util/generic/queue.h>
#include <util/system/condvar.h>
#include <util/thread/pool.h>

#include <queue>
#include <condition_variable>

namespace NYdb::NPersQueue {

Expand Down Expand Up @@ -364,7 +364,7 @@ class TBaseSessionEventsQueue : public ISignalable {


void Signal() override {
CondVar.Signal();
CondVar.notify_one();
}

protected:
Expand All @@ -380,7 +380,8 @@ class TBaseSessionEventsQueue : public ISignalable {

void WaitEventsImpl() { // Assumes that we're under lock. Posteffect: HasEventsImpl() is true.
while (!HasEventsImpl()) {
CondVar.WaitI(Mutex);
std::unique_lock<std::mutex> lk(Mutex, std::defer_lock);
CondVar.wait(lk);
}
}

Expand All @@ -393,7 +394,8 @@ class TBaseSessionEventsQueue : public ISignalable {

public:
NThreading::TFuture<void> WaitEvent() {
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard (Mutex);
if (HasEventsImpl()) {
return NThreading::MakeFuture(); // Signalled
} else {
Expand All @@ -413,8 +415,8 @@ class TBaseSessionEventsQueue : public ISignalable {
TWaiter Waiter;
bool WaiterWillBeSignaled = false;
std::queue<TEventInfo> Events;
TCondVar CondVar;
TMutex Mutex;
std::condition_variable CondVar;
std::mutex Mutex;
TMaybe<TClosedEvent> CloseEvent;
std::atomic<bool> Closed = false;
};
Expand Down Expand Up @@ -467,7 +469,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th
private:
IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job
bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet
TMutex Mutex = {};
std::mutex Mutex;
TQueue<TFunction> ExecutionQueue = {};

public:
Expand Down
8 changes: 4 additions & 4 deletions client/ydb_persqueue_core/impl/read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <util/digest/numeric.h>
#include <util/generic/hash.h>
#include <util/generic/hash_multi_map.h>
#include <util/system/condvar.h>

#include <atomic>
#include <deque>
Expand Down Expand Up @@ -726,7 +725,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
std::vector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator);

TMutex& GetLock() {
std::mutex& GetLock() {
return Lock;
}

Expand All @@ -742,7 +741,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
TDisjointIntervalTree<ui64> Commits;
TDisjointIntervalTree<ui64> ClientCommits;

TMutex Lock;
std::mutex Lock;
};

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -776,7 +775,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti

bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
with_lock (TParent::Mutex) {
{
std::lock_guard<std::mutex> guard(TParent::Mutex);
if (TParent::Closed)
return false;
TParent::CloseEvent = event;
Expand Down
112 changes: 54 additions & 58 deletions client/ydb_persqueue_core/impl/read_session.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1848,26 +1848,25 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPar
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
with_lock (TParent::Mutex) {
if (TParent::Closed) {
return false;
}
//TODO: check session closed event and return false
using TClosedEvent = std::conditional_t<
UseMigrationProtocol,
NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
>;
std::lock_guard<std::mutex> guard(TParent::Mutex);
if (TParent::Closed) {
return false;
}
//TODO: check session closed event and return false
using TClosedEvent = std::conditional_t<
UseMigrationProtocol,
NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent
>;

if (std::holds_alternative<TClosedEvent>(event)) {
stream->DeleteNotReadyTail(deferred);
}
if (std::holds_alternative<TClosedEvent>(event)) {
stream->DeleteNotReadyTail(deferred);
}

stream->InsertEvent(std::move(event));
Y_ASSERT(stream->HasEvents());
stream->InsertEvent(std::move(event));
Y_ASSERT(stream->HasEvents());

SignalReadyEventsImpl(stream, deferred);
}
SignalReadyEventsImpl(stream, deferred);
return true;
}

Expand Down Expand Up @@ -1906,12 +1905,12 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<
TDataDecompressionInfoPtr<UseMigrationProtocol> parent,
std::atomic<bool>& ready)
{
with_lock (TParent::Mutex) {
if (this->Closed) {
return false;
}
partitionStream->InsertDataEvent(batch, message, parent, ready);

std::lock_guard<std::mutex> guard(TParent::Mutex);
if (this->Closed) {
return false;
}
partitionStream->InsertDataEvent(batch, message, parent, ready);
return true;
}

Expand Down Expand Up @@ -2046,22 +2045,21 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvents(bool block, TMaybe<size
const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max();
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;

with_lock (TParent::Mutex) {
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
do {
if (block) {
TParent::WaitEventsImpl();
}
std::lock_guard<std::mutex> guard(TParent::Mutex);
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.Defined(), maxCount));
do {
if (block) {
TParent::WaitEventsImpl();
}

while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
}
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
eventInfos.emplace_back(std::move(event));
if (eventInfos.back().IsSessionClosedEvent()) {
break;
}
} while (block && eventInfos.empty());
}
}
} while (block && eventInfos.empty());

accumulator.OnUserRetrievedEvent();

Expand All @@ -2085,18 +2083,17 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy
TMaybe<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo;
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;

with_lock (TParent::Mutex) {
do {
if (block) {
TParent::WaitEventsImpl();
}
std::lock_guard<std::mutex> guard(TParent::Mutex);
do {
if (block) {
TParent::WaitEventsImpl();
}

if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(maxByteSize, accumulator);
}
if (TParent::HasEventsImpl()) {
eventInfo = GetEventImpl(maxByteSize, accumulator);
}

} while (block && !eventInfo);
}
} while (block && !eventInfo);

accumulator.OnUserRetrievedEvent();

Expand All @@ -2112,11 +2109,11 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalReadyEvents(
TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream) {
Y_ASSERT(partitionStream);

with_lock (partitionStream->GetLock()) {
TDeferredActions<UseMigrationProtocol> deferred;
with_lock (TParent::Mutex) {
SignalReadyEventsImpl(partitionStream, deferred);
}
std::lock_guard<std::mutex> guard1(partitionStream->GetLock());
TDeferredActions<UseMigrationProtocol> deferred;
{
std::lock_guard<std::mutex> g(TParent::Mutex);
SignalReadyEventsImpl(partitionStream, deferred);
}
}

Expand Down Expand Up @@ -2192,14 +2189,13 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventCallbackSettings

template<bool UseMigrationProtocol>
void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() {
with_lock (TParent::Mutex) {
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (event.PartitionStream && event.PartitionStream->HasEvents()) {
event.PartitionStream->PopEvent();
}
TParent::Events.pop();
std::lock_guard<std::mutex> guard(TParent::Mutex);
while (!TParent::Events.empty()) {
auto& event = TParent::Events.front();
if (event.PartitionStream && event.PartitionStream->HasEvents()) {
event.PartitionStream->PopEvent();
}
TParent::Events.pop();
}
}

Expand Down
12 changes: 8 additions & 4 deletions client/ydb_persqueue_core/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett
}

TWaiter waiter;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
Events.emplace(std::move(eventInfo));
waiter = PopWaiterImpl();
}
Expand All @@ -47,7 +48,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

TMaybe<TEvent> GetEvent(bool block = false) {
TMaybe<TEventInfo> eventInfo;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
if (block) {
WaitEventsImpl();
}
Expand All @@ -63,7 +65,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

std::vector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) {
std::vector<TEventInfo> eventInfos;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
if (block) {
WaitEventsImpl();
}
Expand All @@ -90,7 +93,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett

void Close(const TSessionClosedEvent& event) {
TWaiter waiter;
with_lock (Mutex) {
{
std::lock_guard<std::mutex> guard(Mutex);
CloseEvent = event;
Closed = true;
waiter = TWaiter(Waiter.ExtractPromise(), this);
Expand Down
15 changes: 8 additions & 7 deletions client/ydb_topic/impl/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ void TSerialExecutor::PostImpl(std::vector<TFunction>&& fs) {
}

void TSerialExecutor::PostImpl(TFunction&& f) {
with_lock(Mutex) {
ExecutionQueue.push(std::move(f));
if (Busy) {
return;
}
PostNext();
std::lock_guard<std::mutex> guard(Mutex);

ExecutionQueue.push(std::move(f));
if (Busy) {
return;
}
PostNext();
}

void TSerialExecutor::PostNext() {
Expand All @@ -53,7 +53,8 @@ void TSerialExecutor::PostNext() {
Executor->Post([weakThis, f = std::move(ExecutionQueue.front())]() {
if (auto sharedThis = weakThis.lock()) {
f();
with_lock(sharedThis->Mutex) {
{
std::lock_guard<std::mutex> guard(sharedThis->Mutex);
sharedThis->Busy = false;
sharedThis->PostNext();
}
Expand Down
3 changes: 1 addition & 2 deletions client/ydb_topic/impl/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <client/ydb_common_client/impl/client.h>

#include <util/generic/queue.h>
#include <util/system/condvar.h>
#include <util/thread/pool.h>


Expand Down Expand Up @@ -58,7 +57,7 @@ class TSerialExecutor : public IAsyncExecutor, public std::enable_shared_from_th
private:
IAsyncExecutor::TPtr Executor; //!< Wrapped executor that is actually doing the job
bool Busy = false; //!< Set if some closure was scheduled for execution and did not finish yet
TMutex Mutex = {};
std::mutex Mutex = {};
TQueue<TFunction> ExecutionQueue = {};

public:
Expand Down
Loading
Loading