Skip to content

Commit

Permalink
Additional fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Dec 2, 2024
1 parent f4fec02 commit 781c3be
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 20 deletions.
19 changes: 9 additions & 10 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ using namespace DataStormContract;

namespace
{
// TODO convert to a middleware
class DispatchInterceptorI : public Ice::Object
class SessionDispatcher : public Ice::Object
{
public:
DispatchInterceptorI(shared_ptr<NodeI> node, shared_ptr<CallbackExecutor> executor)
SessionDispatcher(shared_ptr<NodeI> node, shared_ptr<CallbackExecutor> executor)
: _node(std::move(node)),
_executor(std::move(executor))
{
Expand All @@ -48,15 +47,15 @@ namespace

NodeI::NodeI(const shared_ptr<Instance>& instance)
: _instance(instance),
_nextPublisherSessionId{0},
_nextSubscriberSessionId{0},
_proxy{instance->getObjectAdapter()->createProxy<NodePrx>({Ice::generateUUID(), ""})},
// The subscriber and publisher collocated forwarders are initalized here to avoid using a nullable proxy. These
// objects are only used after the node is initialized and are removed in destroy implementation.
_subscriberForwarder{instance->getCollocatedForwarder()->add<SubscriberSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })},
_publisherForwarder{instance->getCollocatedForwarder()->add<PublisherSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToPublishers(inParams, current); })},
_nextSubscriberSessionId{0},
_nextPublisherSessionId{0}
_subscriberForwarder{instance->getCollocatedForwarder()->add<SubscriberSessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })}
{
}

Expand All @@ -77,7 +76,9 @@ NodeI::init()
auto adapter = instance->getObjectAdapter();
adapter->add<NodePrx>(self, _proxy->ice_getIdentity());

auto interceptor = make_shared<DispatchInterceptorI>(self, instance->getCallbackExecutor());
// Register the SessionDispatcher object as the default servant for subscriber and publisher sessions.
// The "s" category handles subscriber sessions, and the "p" category handles publisher sessions.
auto interceptor = make_shared<SessionDispatcher>(self, instance->getCallbackExecutor());
adapter->addDefaultServant(interceptor, "s");
adapter->addDefaultServant(interceptor, "p");
}
Expand All @@ -97,9 +98,7 @@ NodeI::destroy(bool ownsCommunicator)

if (!ownsCommunicator)
{
//
// Notifies peer sessions of the disconnection.
//
for (const auto& [_, subscriber] : _subscribers)
{
if (auto session = subscriber->getSession())
Expand Down
27 changes: 20 additions & 7 deletions cpp/src/DataStorm/NodeI.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,31 @@ namespace DataStormI
void forwardToSubscribers(const Ice::ByteSeq&, const Ice::Current&) const;
void forwardToPublishers(const Ice::ByteSeq&, const Ice::Current&) const;

mutable std::mutex _mutex;
mutable std::condition_variable _cond;
std::weak_ptr<Instance> _instance;
mutable std::mutex _mutex;
std::int64_t _nextPublisherSessionId;
std::int64_t _nextSubscriberSessionId;

// The proxy for this node.
DataStormContract::NodePrx _proxy;
DataStormContract::SubscriberSessionPrx _subscriberForwarder;

// A map of all publisher sessions, indexed by the identity of the peer node.
std::map<Ice::Identity, std::shared_ptr<PublisherSessionI>> _publishers;

// A proxy to a colocated publisher session object that forwards requests to all active publisher sessions.
DataStormContract::PublisherSessionPrx _publisherForwarder;

// A map of all publisher sessions, indexed by the identity of each session.
std::map<Ice::Identity, std::shared_ptr<PublisherSessionI>> _publisherSessions;

// A map of all subscriber sessions, indexed by the identity of the peer node.
std::map<Ice::Identity, std::shared_ptr<SubscriberSessionI>> _subscribers;
std::map<Ice::Identity, std::shared_ptr<PublisherSessionI>> _publishers;

// A proxy to a colocated subscriber session object that forwards requests to all active subscriber sessions.
DataStormContract::SubscriberSessionPrx _subscriberForwarder;

// A map of all subscriber sessions, indexed by the identity of each session.
std::map<Ice::Identity, std::shared_ptr<SubscriberSessionI>> _subscriberSessions;
std::map<Ice::Identity, std::shared_ptr<PublisherSessionI>> _publisherSessions;
std::int64_t _nextSubscriberSessionId;
std::int64_t _nextPublisherSessionId;
};
}
#endif
1 change: 1 addition & 0 deletions cpp/src/DataStorm/NodeSessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace DataStormI
// If announce forwarding is disabled, this will be nullopt.
std::optional<DataStormContract::LookupPrx> _lookup;

// A map containing all the publisher and subscriber sessions established between two nodes.
std::map<Ice::Identity, std::optional<DataStormContract::SessionPrx>> _sessions;
};
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ using namespace DataStormI;
# pragma GCC diagnostic ignored "-Wshadow"
#endif

TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties, const Ice::LoggerPtr& logger)
TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties, Ice::LoggerPtr logger)
: topic(properties->getIcePropertyAsInt("DataStorm.Trace.Topic")),
topicCat("Topic"),
data(properties->getIcePropertyAsInt("DataStorm.Trace.Data")),
dataCat("Data"),
session(properties->getIcePropertyAsInt("DataStorm.Trace.Session")),
sessionCat("Session"),
logger(logger)
logger(std::move(logger))
{
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ namespace DataStormI
class TraceLevels
{
public:
TraceLevels(const Ice::PropertiesPtr&, const Ice::LoggerPtr&);
TraceLevels(const Ice::PropertiesPtr&, Ice::LoggerPtr);

const int topic;
const char* topicCat;
Expand Down

0 comments on commit 781c3be

Please sign in to comment.