Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStorm lambda capture fixes #3174

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ DataElementI::attach(
session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this());
if (!samplesI.empty())
{
return [=, this]() { initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
return [=, self = shared_from_this()]()
{ self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
}
return nullptr;
}
Expand Down
67 changes: 31 additions & 36 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,47 +184,47 @@ NodeI::createSession(
return; // Shutting down or already connected
}

auto self = shared_from_this();
s->ice_getConnectionAsync(
[=, this](auto connection) mutable
[=, self = shared_from_this()](auto connection) mutable
{
if (session->checkSession())
{
return;
}

if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
}

if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(self->getInstance()->getObjectAdapter());
}
subscriberSession = subscriberSession->ice_fixed(connection);
}

try
{
// Must be called before connected
s->confirmCreateSessionAsync(
_proxy,
self->_proxy,
session->getProxy<PublisherSessionPrx>(),
nullptr,
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
[self, subscriber, session](auto ex)
{ self->removePublisherSession(*subscriber, session, ex); });
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(
*subscriberSession,
connection,
getInstance()->getTopicFactory()->getTopicWriters());
self->getInstance()->getTopicFactory()->getTopicWriters());
}
catch (const Ice::LocalException&)
{
removePublisherSession(*subscriber, session, current_exception());
self->removePublisherSession(*subscriber, session, current_exception());
}
},
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
[self = shared_from_this(), subscriber, session](auto ex)
{ self->removePublisherSession(*subscriber, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand Down Expand Up @@ -266,41 +266,33 @@ NodeI::confirmCreateSession(
void
NodeI::createSubscriberSession(
NodePrx subscriber,
const Ice::ConnectionPtr& connection,
const Ice::ConnectionPtr& subscriberConnection,
const shared_ptr<PublisherSessionI>& session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
// Ignore the Node is being shutdown.
return;
}

try
{
subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, connection);
subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, subscriberConnection);

auto self = shared_from_this();
#if defined(__GNUC__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
subscriber->ice_getConnectionAsync(
[=, this](auto connection)
[=, self = shared_from_this()](auto connection)
{
if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
connection->setAdapter(self->getInstance()->getObjectAdapter());
}
subscriber->initiateCreateSessionAsync(
_proxy,
self->_proxy,
nullptr,
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
},
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
#if defined(__GNUC__)
# pragma GCC diagnostic pop
#endif
[=, self = shared_from_this()](auto ex) { self->removePublisherSession(subscriber, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand All @@ -309,18 +301,21 @@ NodeI::createSubscriberSession(
}

void
NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, shared_ptr<SubscriberSessionI> session)
NodeI::createPublisherSession(
NodePrx publisher,
const Ice::ConnectionPtr& publisherConnection,
shared_ptr<SubscriberSessionI> session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
// Ignore the Node is being shutdown.
return;
}

try
{
auto p = getNodeWithExistingConnection(std::move(instance), publisher, con);
auto p = getNodeWithExistingConnection(std::move(instance), publisher, publisherConnection);

unique_lock<mutex> lock(_mutex);
if (!session)
Expand All @@ -332,9 +327,8 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con,
}
}

auto self = shared_from_this();
p->ice_getConnectionAsync(
[=, this](auto connection)
[=, self = shared_from_this()](auto connection)
{
if (session->checkSession())
{
Expand All @@ -343,24 +337,25 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con,

if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
connection->setAdapter(self->getInstance()->getObjectAdapter());
}

try
{
p->createSessionAsync(
_proxy,
self->_proxy,
session->getProxy<SubscriberSessionPrx>(),
false,
nullptr,
[=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); });
}
catch (const Ice::LocalException&)
{
removeSubscriberSession(publisher, session, current_exception());
self->removeSubscriberSession(publisher, session, current_exception());
}
},
[=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); });
[=, self = shared_from_this()](exception_ptr ex)
{ self->removeSubscriberSession(publisher, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand Down
32 changes: 15 additions & 17 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ NodeSessionManager::createOrGet(NodePrx node, const Ice::ConnectionPtr& connecti
session->init();
_sessions.emplace(node->ice_getIdentity(), session);

// TODO we should review this code, to avoid using the proxy shared_ptr as a map key.
// Specially the connection manager doesn't use this proxy for lookup.
// Register a callback with the connection manager to destroy the session when the connection is closed.
instance->getConnectionManager()->add(
connection,
make_shared<NodePrx>(node),
Expand Down Expand Up @@ -306,6 +305,13 @@ NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& cu
void
NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}

try
{
lookup->createSessionAsync(
Expand All @@ -326,6 +332,10 @@ NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy)
[=, self = shared_from_this()](std::exception_ptr) { self->disconnected(lookup); });
}
catch (const Ice::CommunicatorDestroyedException&)
{
// Ignore node is being shutdown.
}
catch (const std::exception&)
{
disconnected(lookup);
}
Expand All @@ -338,6 +348,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup)
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}

Expand Down Expand Up @@ -376,6 +387,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup)
}
catch (const Ice::CommunicatorDestroyedException&)
{
// Ignore node is being shutdown.
}
}
}
Expand Down Expand Up @@ -407,21 +419,7 @@ NodeSessionManager::disconnected(LookupPrx lookup)
if (instance)
{
instance->scheduleTimerTask(
[=, self = shared_from_this()]
{
#if defined(__GNUC__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
auto instance = self->_instance.lock();
if (instance)
{
self->connect(lookup, self->_nodePrx);
}
#if defined(__GNUC__)
# pragma GCC diagnostic pop
#endif
},
[=, self = shared_from_this()] { self->connect(lookup, self->_nodePrx); },
instance->getRetryDelay(_retryCount++));
}
}
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,12 +789,10 @@ SessionI::checkSession()
{
if (_connection)
{
//
// Make sure the connection is still established. It's possible that the connection got closed
// and we're not notified yet by the connection manager. Check session explicitly check for the
// connection to make sure that if we get a session creation request from a peer (which might
// detect the connection closure before), it doesn't get ignored.
//
// Make sure the connection is still established. It's possible that the connection got closed and we
// were not notified yet by the connection manager. Check session explicitly check for the connection
// to make sure that if we get a session creation request from a peer (which might detect the connection
// closure before), it doesn't get ignored.
try
{
_connection->throwException();
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/DataStorm/TopicFactoryI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ TopicFactoryI::createTopicReader(
catch (const Ice::ObjectAdapterDestroyedException&)
{
}
catch (const std::exception&)
{
assert(false);
}
return reader;
}

Expand Down Expand Up @@ -127,10 +123,6 @@ TopicFactoryI::createTopicWriter(
catch (const Ice::ObjectAdapterDestroyedException&)
{
}
catch (const std::exception&)
{
assert(false);
}

return writer;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/DataStorm/TopicI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ TopicI::TopicI(
_instance(_factory.lock()->getInstance()),
_traceLevels(_instance->getTraceLevels()),
_id(id),
// The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by
// the instance that owns it and is removed in destroy implementation.
_forwarder{_instance->getCollocatedForwarder()->add<SessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forward(inParams, current); })},
_destroyed(false),
Expand Down