Skip to content

Commit

Permalink
Additional fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Nov 30, 2024
1 parent f7c1174 commit e7a8f78
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 38 deletions.
35 changes: 17 additions & 18 deletions cpp/src/DataStorm/NodeSessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ namespace
class NodeForwarder : public Node, public enable_shared_from_this<NodeForwarder>
{
public:
NodeForwarder(shared_ptr<NodeSessionManager> nodeSessionManager, shared_ptr<NodeSessionI> session, NodePrx node)
NodeForwarder(shared_ptr<NodeSessionManager> nodeSessionManager, shared_ptr<NodeSessionI> nodeSession, NodePrx node)
: _nodeSessionManager(std::move(nodeSessionManager)),
_session(std::move(session)),
_nodeSession(std::move(nodeSession)),
_node(std::move(node))
{
}

void initiateCreateSession(optional<NodePrx> publisher, const Ice::Current& current) final
{
Ice::checkNotNull(publisher, __FILE__, __LINE__, current);
if (auto session = _session.lock())
if (auto nodeSession = _nodeSession.lock())
{
try
{
Expand All @@ -53,12 +53,12 @@ namespace
Ice::checkNotNull(subscriber, __FILE__, __LINE__, current);
Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current);

if (auto session = _session.lock())
if (auto nodeSession = _nodeSession.lock())
{
try
{
updateNodeAndSessionProxy(*subscriber, subscriberSession, current);
session->addSession(*subscriberSession);
nodeSession->addSession(*subscriberSession);
// Forward the call to the target Node object, don't need to wait for the result.
_node->createSessionAsync(subscriber, subscriberSession, true, nullptr);
}
Expand All @@ -76,12 +76,12 @@ namespace
Ice::checkNotNull(publisher, __FILE__, __LINE__, current);
Ice::checkNotNull(publisherSession, __FILE__, __LINE__, current);

if (auto session = _session.lock())
if (auto nodeSession = _nodeSession.lock())
{
try
{
updateNodeAndSessionProxy(*publisher, publisherSession, current);
session->addSession(*publisherSession);
nodeSession->addSession(*publisherSession);
// Forward the call to the target Node object, don't need to wait for the result.
_node->confirmCreateSessionAsync(publisher, publisherSession, nullptr);
}
Expand All @@ -99,29 +99,28 @@ namespace
{
if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty())
{
auto peerSession = _nodeSessionManager->createOrGet(node, current.con, false);
assert(peerSession);
node = peerSession->getPublicNode();
shared_ptr<NodeSessionI> nodeSession = _nodeSessionManager->createOrGet(node, current.con, false);
assert(nodeSession);
node = nodeSession->getPublicNode();
if (session)
{
session = peerSession->forwarder(*session);
session = nodeSession->forwarder(*session);
}
}
}

const shared_ptr<NodeSessionManager> _nodeSessionManager;
const weak_ptr<NodeSessionI> _session;
const weak_ptr<NodeSessionI> _nodeSession;
const NodePrx _node;
};
}

NodeSessionI::NodeSessionI(
shared_ptr<Instance> instance,
optional<NodePrx> node,
NodePrx node,
Ice::ConnectionPtr connection,
bool forwardAnnouncements)
: _instance(std::move(instance)),
_traceLevels(_instance->getTraceLevels()),
_node(std::move(node)),
_connection(std::move(connection))
{
Expand Down Expand Up @@ -150,9 +149,9 @@ NodeSessionI::init()
_publicNode = _node;
}

if (_traceLevels->session > 0)
if (_instance->getTraceLevels()->session > 0)
{
Trace out(_traceLevels, _traceLevels->sessionCat);
Trace out(_instance->getTraceLevels(), _instance->getTraceLevels()->sessionCat);
out << "created node session (peer = `" << _publicNode << "'):\n" << _connection->toString();
}
}
Expand Down Expand Up @@ -184,9 +183,9 @@ NodeSessionI::destroy()
{
}

if (_traceLevels->session > 0)
if (_instance->getTraceLevels()->session > 0)
{
Trace out(_traceLevels, _traceLevels->sessionCat);
Trace out(_instance->getTraceLevels(), _instance->getTraceLevels()->sessionCat);
out << "destroyed node session (peer = `" << _publicNode << "')";
}
}
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/DataStorm/NodeSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@

namespace DataStormI
{
class TraceLevels;

class NodeSessionI final : public std::enable_shared_from_this<NodeSessionI>
{
public:
NodeSessionI(std::shared_ptr<Instance>, std::optional<DataStormContract::NodePrx>, Ice::ConnectionPtr, bool);
NodeSessionI(std::shared_ptr<Instance>, DataStormContract::NodePrx, Ice::ConnectionPtr, bool);

void init();
void destroy();
Expand All @@ -43,14 +41,17 @@ namespace DataStormI

private:
const std::shared_ptr<Instance> _instance;
const std::shared_ptr<TraceLevels> _traceLevels;
std::optional<DataStormContract::NodePrx> _node;
DataStormContract::NodePrx _node;
const Ice::ConnectionPtr _connection;

std::mutex _mutex;
bool _destroyed;
std::optional<DataStormContract::NodePrx> _publicNode;

// A proxy for forwarding announcements to the target node when announce forwarding is enabled.
// If announce forwarding is disabled, this will be nullopt.
std::optional<DataStormContract::LookupPrx> _lookup;

std::map<Ice::Identity, std::optional<DataStormContract::SessionPrx>> _sessions;
};
}
Expand Down
17 changes: 7 additions & 10 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace
auto pos = current.id.name.find('-');
if (pos != string::npos && pos < current.id.name.length())
{
if (auto session = _nodeSessionManager->getSession(current.id.name.substr(pos + 1)))
if (auto session = _nodeSessionManager->getSession(Ice::Identity{current.id.name.substr(pos + 1), ""}))
{
// Forward the call to the target session, don't need to wait for the result.
auto id = Ice::Identity{current.id.name.substr(0, pos), current.id.category.substr(0, 1)};
Expand Down Expand Up @@ -310,14 +310,11 @@ NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& cu
}
}

// Forward the call to all nodes we are connected to, don't need to wait for the result.
for (const auto& [_, lookup] : _connectedTo)
// Forward the call to the connected to node, don't need to wait for the result.
if (_connectedTo && (*_connectedTo)->ice_getCachedConnection() != _exclude)
{
if (lookup.second->ice_getCachedConnection() != _exclude)
{
lookup.second
->ice_invokeAsync(current.operation, current.mode, inParams, nullptr, nullptr, nullptr, current.ctx);
}
(*_connectedTo)
->ice_invokeAsync(current.operation, current.mode, inParams, nullptr, nullptr, nullptr, current.ctx);
}
}

Expand Down Expand Up @@ -394,7 +391,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup)
{
lookup = lookup->ice_fixed(connection);
}
_connectedTo.emplace(node->ice_getIdentity(), make_pair(node, lookup));
_connectedTo = lookup;

auto readerNames = instance->getTopicFactory()->getTopicReaderNames();
auto writerNames = instance->getTopicFactory()->getTopicWriterNames();
Expand Down Expand Up @@ -424,7 +421,7 @@ NodeSessionManager::disconnected(NodePrx node, LookupPrx lookup)
Trace out(_traceLevels, _traceLevels->sessionCat);
out << "disconnected node session (peer = `" << node << "')";
}
_connectedTo.erase(node->ice_getIdentity());
_connectedTo = nullopt;
lock.unlock();
connect(lookup, _nodePrx);
}
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/DataStorm/NodeSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ namespace DataStormI
const Ice::ConnectionPtr& = nullptr) const;

std::shared_ptr<NodeSessionI> getSession(const Ice::Identity&) const;
std::shared_ptr<NodeSessionI> getSession(const std::string& name) const
{
return getSession(Ice::Identity{name, ""});
}

void forward(const Ice::ByteSeq&, const Ice::Current&) const;

Expand Down Expand Up @@ -72,8 +68,14 @@ namespace DataStormI

int _retryCount;

// A map containing the `NodeSessionI` servants for all nodes that have an active session with this node.
// The map is indexed by the identity of the nodes.
std::map<Ice::Identity, std::shared_ptr<NodeSessionI>> _sessions;
std::map<Ice::Identity, std::pair<DataStormContract::NodePrx, DataStormContract::LookupPrx>> _connectedTo;

// The `Lookup` proxy for the `ConnectTo` node, which is set when there is an active connection to the target node.
// If the `DataStorm.Node.ConnectTo` property is configured, the session manager attempts to connect to the specified
// node and sets this member once the connection is established.
std::optional<DataStormContract::LookupPrx> _connectedTo;

mutable Ice::ConnectionPtr _exclude;
DataStormContract::LookupPrx _forwarder;
Expand Down

0 comments on commit e7a8f78

Please sign in to comment.