Skip to content

Commit

Permalink
Additional fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Jan 3, 2025
1 parent 46f6adc commit 4f684c9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
8 changes: 1 addition & 7 deletions cpp/src/DataStorm/NodeSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ namespace
{
// Checks whether there is an active NodeSession for the publisher matching the current connection.
// - If there is a match, forward the call to the target node.
// - Otherwise, destroy the publisher's NodeSession and notify the publisher of the disconnection.
// - Otherwise notify the publisher of the disconnection.
auto publisherNodeSession = _nodeSessionManager->getSession(publisher->ice_getIdentity());
if (publisherNodeSession && publisherNodeSession->getConnection() == current.con)
{
Expand All @@ -106,12 +106,6 @@ namespace
}
else
{
// The publisher's NodeSession is from an older connection, it just happen that the dispatch of the
// confirmCreateSession request run before that the close connection callback removed it.
if (publisherNodeSession)
{
_nodeSessionManager->destroySession(publisherNodeSession, *publisher);
}
publisherSession->ice_fixed(current.con)->disconnectedAsync(nullptr);
}
}
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ NodeSessionManager::createOrGet(NodePrx node, const ConnectionPtr& connection, b
instance->getConnectionManager()->add(
connection,
make_shared<NodePrx>(node),
[self = shared_from_this(), node = std::move(node)](const ConnectionPtr& c, exception_ptr) mutable
{ self->destroySession(c, node); });
[self = shared_from_this(), node = std::move(node)](const ConnectionPtr&, exception_ptr) mutable
{ self->destroySession(node); });

return session;
}
Expand Down Expand Up @@ -450,13 +450,12 @@ NodeSessionManager::disconnected(const LookupPrx& lookup)
}

void
NodeSessionManager::destroySession([[maybe_unused]] const ConnectionPtr& connection, const NodePrx& node)
NodeSessionManager::destroySession(const NodePrx& node)
{
unique_lock<mutex> lock(_mutex);
auto p = _sessions.find(node->ice_getIdentity());
if (p != _sessions.end())
{
assert(p->second->getConnection() == connection);
p->second->destroy();
_sessions.erase(p);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/DataStorm/NodeSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ namespace DataStormI
[[nodiscard]] std::shared_ptr<NodeSessionI> getSessionNoLock(const Ice::Identity&) const;

void forward(const Ice::ByteSeq&, const Ice::Current&) const;
void destroySession(const Ice::ConnectionPtr&, const DataStormContract::NodePrx&);

private:
void connect(const DataStormContract::LookupPrx&, const DataStormContract::NodePrx&);
Expand All @@ -51,6 +50,8 @@ namespace DataStormI
void disconnected(const DataStormContract::NodePrx&, const DataStormContract::LookupPrx&);
void disconnected(const DataStormContract::LookupPrx&);

void destroySession(const DataStormContract::NodePrx&);

std::weak_ptr<Instance> _instance;
const std::shared_ptr<TraceLevels> _traceLevels;
DataStormContract::NodePrx _nodePrx;
Expand Down
22 changes: 18 additions & 4 deletions cpp/test/DataStorm/reliability/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,53 @@
"Ice.Trace.Protocol" : 1,
}

# Disable client and server idle timeout below to avoid a test to recover after the connection is closed by the idle timeout.
# This ensure that test hangs if the session goes into an invalid state, and is not silently recover after the idle timeout
# closes the connection.

# A client connected to the default server
clientProps = {
"DataStorm.Node.Multicast.Enabled": 0,
"DataStorm.Node.Server.Enabled": 0,
"DataStorm.Node.ConnectTo": "tcp -p {port1}"
"DataStorm.Node.ConnectTo": "tcp -p {port1}",
"Ice.Connection.Server.IdleTimeout": 0,
"Ice.Connection.Client.IdleTimeout": 0,
}

# A client connected to the second server
client2Props = {
"DataStorm.Node.Multicast.Enabled": 0,
"DataStorm.Node.Server.Enabled": 0,
"DataStorm.Node.ConnectTo": "tcp -p {port2}"
"DataStorm.Node.ConnectTo": "tcp -p {port2}",
"Ice.Connection.Server.IdleTimeout": 0,
"Ice.Connection.Client.IdleTimeout": 0,
}

# The default server, not connected to any other server.
serverProps = {
"DataStorm.Node.Multicast.Enabled": 0,
"DataStorm.Node.Server.Endpoints": "tcp -p {port1}",
"DataStorm.Node.ConnectTo": ""
"DataStorm.Node.ConnectTo": "",
"Ice.Connection.Server.IdleTimeout": 0,
"Ice.Connection.Client.IdleTimeout": 0,
}

# A second server connected to the first server
server2Props = {
"DataStorm.Node.Multicast.Enabled": 0,
"DataStorm.Node.Server.Endpoints": "tcp -p {port2}",
"DataStorm.Node.ConnectTo": "tcp -p {port1}"
"DataStorm.Node.ConnectTo": "tcp -p {port1}",
"Ice.Connection.Server.IdleTimeout": 0,
"Ice.Connection.Client.IdleTimeout": 0,
}

# A server without a fixed endpoint, connected to the first server.
serverAnyProps = {
"DataStorm.Node.Multicast.Enabled": 0,
"DataStorm.Node.Server.Endpoints": "tcp",
"DataStorm.Node.ConnectTo": "tcp -p {port1}",
"Ice.Connection.Server.IdleTimeout": 0,
"Ice.Connection.Client.IdleTimeout": 0,
}

props = [
Expand Down

0 comments on commit 4f684c9

Please sign in to comment.