diff --git a/cpp/src/DataStorm/Contract.ice b/cpp/src/DataStorm/Contract.ice index 323c005e3a8..3099f4a52b6 100644 --- a/cpp/src/DataStorm/Contract.ice +++ b/cpp/src/DataStorm/Contract.ice @@ -11,8 +11,8 @@ module DataStormContract { - /// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its - /// DataSample history in response to various events. + /// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its DataSample + /// history in response to various events. enum ClearHistoryPolicy { /// The reader clears its history when a new DataSample is added. @@ -70,7 +70,7 @@ module DataStormContract /// and encoded value. struct ElementInfo { - /// The unique identifier of the element. Filter IDs are negative. + /// The ID of the element. Filter IDs are negative, while key and tag IDs are positive. long id; /// The name of the filter. This field is empty for key and tag elements. @@ -131,15 +131,31 @@ module DataStormContract Ice::ByteSeq criteria; } + /// Represents the configuration of a reader or writer. class ElementConfig(1) { optional(1) string facet; + + /// An optional sample filter associated with the reader. Sample filters are specified on the reader side. optional(2) FilterInfo sampleFilter; + + /// An optional name for the reader or writer. optional(3) string name; + + /// An optional priority for the writer. + /// See also the `DataStorm.Topic.Priority` property. optional(4) int priority; + /// An optional sample count, specifying the number of samples queued in the writer or reader sample queue. + /// See also the `DataStorm.Topic.SampleCount` property. optional(10) int sampleCount; + + /// An optional lifetime, specified in milliseconds, representing the maximum time samples are kept in the + /// writer or reader sample queue. See also the `DataStorm.Topic.SampleLifetime` property. optional(11) int sampleLifetime; + + /// An optional clear history policy that determines when the reader or writer sample history is cleared. + /// See also the `DataStorm.Topic.ClearHistory` property. optional(12) ClearHistoryPolicy clearHistory; } @@ -247,15 +263,22 @@ module DataStormContract void detachTags(long topic, Ice::LongSeq tags); /// Announces new elements to the peer. + /// /// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include - /// key readers, key writers, and filter readers associated with the specified topic. + /// the readers and writers associated with the specified topic. /// /// @param topic The ID of the topic associated with the elements. /// @param elements The sequence of elements to announce. /// @see attachElements void announceElements(long topic, ElementInfoSeq elements); - void attachElements(long topic, ElementSpecSeq elements, bool initialize); + /// Attaches the given topic elements to all subscribers of the specified topic. + /// + /// @param topicId The ID of the topic to which the elements belong. + /// @param elements The sequence of elements to attach to the topic's subscribers. + /// @param initialize True if called from attachTopic, false otherwise. + void attachElements(long topicId, ElementSpecSeq elements, bool initialize); + void attachElementsAck(long topic, ElementSpecAckSeq elements); void detachElements(long topic, Ice::LongSeq keys); @@ -270,6 +293,11 @@ module DataStormContract interface SubscriberSession extends Session { + /// Queue a sample with the subscribers of the topic element. 
+ /// + /// @param topicId The ID of the topic. + /// @param elementId The ID of the element. + /// @param sample The sample to queue. void s(long topicId, long elementId, DataSample sample); } diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index d64c2b29376..5219817ab22 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -18,14 +18,14 @@ namespace { DataSample toSample(const shared_ptr& sample, const Ice::CommunicatorPtr& communicator, bool marshalKey) { - return { - sample->id, - marshalKey ? 0 : sample->key->getId(), - marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{}, - chrono::time_point_cast(sample->timestamp).time_since_epoch().count(), - sample->tag ? sample->tag->getId() : 0, - sample->event, - sample->encode(communicator)}; + return DataSample{ + .id = sample->id, + .keyId = marshalKey ? 0 : sample->key->getId(), + .keyValue = marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{}, + .timestamp = chrono::time_point_cast(sample->timestamp).time_since_epoch().count(), + .tag = sample->tag ? sample->tag->getId() : 0, + .event = sample->event, + .value = sample->encode(communicator)}; } void cleanOldSamples( @@ -111,8 +111,10 @@ DataElementI::attach( auto info = *data.config->sampleFilter; sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria); } + string facet = data.config->facet ? *data.config->facet : string(); int priority = data.config->priority ? *data.config->priority : 0; + string name; if (data.config->name) { @@ -124,14 +126,20 @@ DataElementI::attach( os << session->getId() << '-' << topicId << '-' << data.id; name = os.str(); } + if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) || (id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority))) { auto q = data.lastIds.find(_id); - long long lastId = q != data.lastIds.end() ? q->second : 0; - LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict(); + int64_t lastId = q != data.lastIds.end() ? q->second : 0; + LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict{}; DataSamples samples = getSamples(key, sampleFilter, data.config, lastId, now); - acks.push_back({_id, _config, lastIds, samples.samples, data.id}); + acks.push_back(ElementDataAck{ + .id = _id, + .config = _config, + .lastIds = std::move(lastIds), + .samples = std::move(samples.samples), + .peerId = data.id}); } } @@ -153,8 +161,10 @@ DataElementI::attach( auto info = *data.config->sampleFilter; sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria); } + string facet = data.config->facet ? *data.config->facet : string(); int priority = data.config->priority ? *data.config->priority : 0; + string name; if (data.config->name) { @@ -166,6 +176,7 @@ DataElementI::attach( os << session->getId() << '-' << topicId << '-' << data.id; name = os.str(); } + if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) || (id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority))) { @@ -178,7 +189,7 @@ DataElementI::attach( session->subscriberInitialized(topicId, id > 0 ? 
data.id : -data.id, data.samples, key, shared_from_this()); if (!samplesI.empty()) { - return [=, self = shared_from_this()]() + return [=, samplesI = std::move(samplesI), self = shared_from_this()]() { self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); }; } return nullptr; @@ -198,14 +209,17 @@ DataElementI::attachKey( int priority) { // No locking necessary, called by the session with the mutex locked - auto p = _listeners.find({session, facet}); + + ListenerKey listenerKey{session, facet}; + auto p = _listeners.find(listenerKey); if (p == _listeners.end()) { - p = _listeners.emplace(ListenerKey{session, facet}, Listener(std::move(prx), facet)).first; + p = _listeners.emplace(std::move(listenerKey), Listener(std::move(prx), facet)).first; } bool added = false; auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added); + if (_onConnectedElements && added) { _executor->queue([self = shared_from_this(), name] @@ -634,7 +648,7 @@ DataElementI::forward(const Ice::ByteSeq& inParams, const Ice::Current& current) { for (const auto& [_, listener] : _listeners) { - // If there's at least one subscriber interested in the update + // If we are forwarding a sample check if at least once of the listeners is interested in the sample. if (!_sample || listener.matchOne(_sample, false)) { // Forward the call using the listener's session proxy don't need to wait for the result. @@ -1221,19 +1235,23 @@ KeyDataWriterI::getSamples( int64_t lastId, const chrono::time_point& now) { + // Collect all queued samples that match the key and sample filter, are newer than the lastId, and are not stale. DataSamples samples; samples.id = _keys.empty() ? -_id : _id; + // If the caller sample queueing is disabled, there is no need to return any samples. if (config->sampleCount && *config->sampleCount == 0) { return samples; } + // Reap stale samples before collecting any samples. if (_config->sampleLifetime && *_config->sampleLifetime > 0) { cleanOldSamples(_samples, now, *_config->sampleLifetime); } + // Compute the stale time, according to the callers sample lifetime configuration. chrono::time_point staleTime = chrono::time_point::min(); if (config->sampleLifetime && *config->sampleLifetime > 0) { @@ -1241,6 +1259,15 @@ KeyDataWriterI::getSamples( } shared_ptr first; + // Iterate through samples in reverse chronological order, starting with the newest. + // Stop iterating if any of the following conditions are met: + // - A sample's timestamp is older than the specified stale time. + // - A sample's ID is less than or equal to the specified last ID. + // - The requested number of samples has been collected. + // - A sample event triggers history clearing based on the caller's clear history policy. + // For each sample: + // - Check if it matches the optional key and sample filter. + // - If it matches, add it to the result set and update the first matched sample. 
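[Editor's note] The comment block above summarizes how KeyDataWriterI::getSamples walks its queue. Below is a minimal standalone sketch of that reverse, newest-first scan with early exits; the Sample struct and the staleTime, lastId, and maxCount names are illustrative stand-ins, not the actual DataStorm types, and the key/sample filters and clear-history policy are only mentioned in comments.

#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

struct Sample
{
    std::int64_t id;
    std::int64_t timestamp;
};

std::vector<std::shared_ptr<Sample>> collectSamples(
    const std::deque<std::shared_ptr<Sample>>& queue,
    std::int64_t staleTime,
    std::int64_t lastId,
    std::size_t maxCount)
{
    std::vector<std::shared_ptr<Sample>> collected;
    // Walk from the newest sample to the oldest and stop as soon as a sample is stale,
    // was already seen by the caller, or the requested count has been reached.
    for (auto p = queue.rbegin(); p != queue.rend(); ++p)
    {
        if ((*p)->timestamp < staleTime || (*p)->id <= lastId || collected.size() == maxCount)
        {
            break;
        }
        // The real implementation also applies the key filter, the sample filter, and the
        // caller's clear history policy here before keeping the sample.
        collected.push_back(*p);
    }
    // Samples were gathered newest-first; return them in chronological order.
    return {collected.rbegin(), collected.rend()};
}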
for (auto p = _samples.rbegin(); p != _samples.rend(); ++p) { if ((*p)->timestamp < staleTime) @@ -1274,19 +1301,20 @@ KeyDataWriterI::getSamples( } } } + if (!samples.samples.empty()) { // If the first sample is a partial update, transform it to a full Update if (first->event == DataStorm::SampleEvent::PartialUpdate) { - samples.samples[0] = { - first->id, - samples.samples[0].keyId, - samples.samples[0].keyValue, - chrono::time_point_cast(first->timestamp).time_since_epoch().count(), - 0, - DataStorm::SampleEvent::Update, - first->encodeValue(getCommunicator())}; + samples.samples[0] = DataSample{ + .id = first->id, + .keyId = samples.samples[0].keyId, + .keyValue = samples.samples[0].keyValue, + .timestamp = chrono::time_point_cast(first->timestamp).time_since_epoch().count(), + .tag = 0, + .event = DataStorm::SampleEvent::Update, + .value = first->encodeValue(getCommunicator())}; } } return samples; diff --git a/cpp/src/DataStorm/DataElementI.h b/cpp/src/DataStorm/DataElementI.h index bd55f90baf6..1eefdb35c31 100644 --- a/cpp/src/DataStorm/DataElementI.h +++ b/cpp/src/DataStorm/DataElementI.h @@ -77,19 +77,29 @@ namespace DataStormI struct Listener { - Listener(DataStormContract::SessionPrx proxy, const std::string& facet) - : proxy(facet.empty() ? proxy : proxy->ice_facet(facet)) + Listener(DataStormContract::SessionPrx proxy, std::string facet) + : proxy( + facet.empty() ? std::move(proxy) + : proxy->ice_facet(std::move(facet))) { } + /** + * Determines if any subscriber matches the given sample. + * + * @param sample The sample to evaluate against the subscribers. + * @param matchKey If true, the sample's key is matched against subscriber keys. + * If false, the key match is skipped. + * @return True if at least one subscriber matches the sample, otherwise false. + */ bool matchOne(const std::shared_ptr& sample, bool matchKey) const { - for (const auto& s : subscribers) + for (const auto& [_, subscriber] : subscribers) { - if ((!matchKey || s.second->keys.empty() || - s.second->keys.find(sample->key) != s.second->keys.end()) && - (!s.second->filter || s.second->filter->match(sample->key)) && - (!s.second->sampleFilter || s.second->sampleFilter->match(sample))) + if ((!matchKey || subscriber->keys.empty() || + subscriber->keys.find(sample->key) != subscriber->keys.end()) && + (!subscriber->filter || subscriber->filter->match(sample->key)) && + (!subscriber->sampleFilter || subscriber->sampleFilter->match(sample))) { return true; } @@ -129,7 +139,9 @@ namespace DataStormI return subscribers.empty(); } + // The proxy to the peer session. DataStormContract::SessionPrx proxy; + // A map containing the data element subscribers, indexed by the topic ID and the element ID. std::map, std::shared_ptr> subscribers; }; @@ -267,6 +279,9 @@ namespace DataStormI mutable std::shared_ptr _sample; DataStormContract::SessionPrx _forwarder; std::map, std::vector>> _connectedKeys; + + // A map containing the element listeners, indexed by the session servant and the target facet. The + // implementation of forward utilizes the listener map to forward calls to the peer sessions. 
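[Editor's note] attachKey, earlier in this diff, now builds the ListenerKey once and reuses it for both the lookup and the emplace into this listener map. A small self-contained sketch of that find-or-emplace pattern follows; the Listener struct and string key used here are simplified placeholders, not the DataStorm types.

#include <map>
#include <string>
#include <utility>

struct Listener
{
    std::string facet;
};

int main()
{
    std::map<std::string, Listener> listeners;
    std::string key = "session-1/facet-a";

    // Look the key up first and only construct and insert a Listener when the entry is missing,
    // so an existing listener is reused instead of being overwritten.
    auto p = listeners.find(key);
    if (p == listeners.end())
    {
        p = listeners.emplace(std::move(key), Listener{"facet-a"}).first;
    }

    // p refers to the existing or freshly inserted entry either way.
    p->second.facet = "facet-a";
    return 0;
}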
std::map _listeners; private: diff --git a/cpp/src/DataStorm/NodeI.cpp b/cpp/src/DataStorm/NodeI.cpp index c528b40acf6..0a9c7211c50 100644 --- a/cpp/src/DataStorm/NodeI.cpp +++ b/cpp/src/DataStorm/NodeI.cpp @@ -17,11 +17,10 @@ using namespace DataStormContract; namespace { - // TODO convert to a middleware - class DispatchInterceptorI : public Ice::Object + class SessionDispatcher : public Ice::Object { public: - DispatchInterceptorI(shared_ptr node, shared_ptr executor) + SessionDispatcher(shared_ptr node, shared_ptr executor) : _node(std::move(node)), _executor(std::move(executor)) { @@ -48,15 +47,15 @@ namespace NodeI::NodeI(const shared_ptr& instance) : _instance(instance), + _nextPublisherSessionId{0}, + _nextSubscriberSessionId{0}, _proxy{instance->getObjectAdapter()->createProxy({Ice::generateUUID(), ""})}, // The subscriber and publisher collocated forwarders are initalized here to avoid using a nullable proxy. These // objects are only used after the node is initialized and are removed in destroy implementation. - _subscriberForwarder{instance->getCollocatedForwarder()->add( - [this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })}, _publisherForwarder{instance->getCollocatedForwarder()->add( [this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToPublishers(inParams, current); })}, - _nextSubscriberSessionId{0}, - _nextPublisherSessionId{0} + _subscriberForwarder{instance->getCollocatedForwarder()->add( + [this](Ice::ByteSeq inParams, const Ice::Current& current) { forwardToSubscribers(inParams, current); })} { } @@ -77,7 +76,9 @@ NodeI::init() auto adapter = instance->getObjectAdapter(); adapter->add(self, _proxy->ice_getIdentity()); - auto interceptor = make_shared(self, instance->getCallbackExecutor()); + // Register the SessionDispatcher object as the default servant for subscriber and publisher sessions. + // The "s" category handles subscriber sessions, and the "p" category handles publisher sessions. + auto interceptor = make_shared(self, instance->getCallbackExecutor()); adapter->addDefaultServant(interceptor, "s"); adapter->addDefaultServant(interceptor, "p"); } @@ -97,9 +98,7 @@ NodeI::destroy(bool ownsCommunicator) if (!ownsCommunicator) { - // // Notifies peer sessions of the disconnection. - // for (const auto& [_, subscriber] : _subscribers) { if (auto session = subscriber->getSession()) diff --git a/cpp/src/DataStorm/NodeI.h b/cpp/src/DataStorm/NodeI.h index 40eda295998..2be7af2528c 100644 --- a/cpp/src/DataStorm/NodeI.h +++ b/cpp/src/DataStorm/NodeI.h @@ -92,18 +92,31 @@ namespace DataStormI void forwardToSubscribers(const Ice::ByteSeq&, const Ice::Current&) const; void forwardToPublishers(const Ice::ByteSeq&, const Ice::Current&) const; - mutable std::mutex _mutex; - mutable std::condition_variable _cond; std::weak_ptr _instance; + mutable std::mutex _mutex; + std::int64_t _nextPublisherSessionId; + std::int64_t _nextSubscriberSessionId; + + // The proxy for this node. DataStormContract::NodePrx _proxy; - DataStormContract::SubscriberSessionPrx _subscriberForwarder; + + // A map of all publisher sessions, indexed by the identity of the peer node. + std::map> _publishers; + + // A proxy to a colocated publisher session object that forwards requests to all active publisher sessions. DataStormContract::PublisherSessionPrx _publisherForwarder; + + // A map of all publisher sessions, indexed by the identity of each session. 
+ std::map> _publisherSessions; + + // A map of all subscriber sessions, indexed by the identity of the peer node. std::map> _subscribers; - std::map> _publishers; + + // A proxy to a colocated subscriber session object that forwards requests to all active subscriber sessions. + DataStormContract::SubscriberSessionPrx _subscriberForwarder; + + // A map of all subscriber sessions, indexed by the identity of each session. std::map> _subscriberSessions; - std::map> _publisherSessions; - std::int64_t _nextSubscriberSessionId; - std::int64_t _nextPublisherSessionId; }; } #endif diff --git a/cpp/src/DataStorm/NodeSessionI.cpp b/cpp/src/DataStorm/NodeSessionI.cpp index ba68a99f404..aa71992f5a1 100644 --- a/cpp/src/DataStorm/NodeSessionI.cpp +++ b/cpp/src/DataStorm/NodeSessionI.cpp @@ -19,9 +19,12 @@ namespace class NodeForwarder : public Node, public enable_shared_from_this { public: - NodeForwarder(shared_ptr nodeSessionManager, shared_ptr session, NodePrx node) + NodeForwarder( + shared_ptr nodeSessionManager, + shared_ptr nodeSession, + NodePrx node) : _nodeSessionManager(std::move(nodeSessionManager)), - _session(std::move(session)), + _nodeSession(std::move(nodeSession)), _node(std::move(node)) { } @@ -29,7 +32,7 @@ namespace void initiateCreateSession(optional publisher, const Ice::Current& current) final { Ice::checkNotNull(publisher, __FILE__, __LINE__, current); - if (auto session = _session.lock()) + if (auto nodeSession = _nodeSession.lock()) { try { @@ -53,12 +56,12 @@ namespace Ice::checkNotNull(subscriber, __FILE__, __LINE__, current); Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current); - if (auto session = _session.lock()) + if (auto nodeSession = _nodeSession.lock()) { try { updateNodeAndSessionProxy(*subscriber, subscriberSession, current); - session->addSession(*subscriberSession); + nodeSession->addSession(*subscriberSession); // Forward the call to the target Node object, don't need to wait for the result. _node->createSessionAsync(subscriber, subscriberSession, true, nullptr); } @@ -76,12 +79,12 @@ namespace Ice::checkNotNull(publisher, __FILE__, __LINE__, current); Ice::checkNotNull(publisherSession, __FILE__, __LINE__, current); - if (auto session = _session.lock()) + if (auto nodeSession = _nodeSession.lock()) { try { updateNodeAndSessionProxy(*publisher, publisherSession, current); - session->addSession(*publisherSession); + nodeSession->addSession(*publisherSession); // Forward the call to the target Node object, don't need to wait for the result. 
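[Editor's note] The NodeForwarder above holds only a weak_ptr to its NodeSessionI and promotes it with lock() before every use, dropping the call when the session is already gone. A minimal sketch of that ownership pattern with hypothetical types, not the actual DataStorm classes:

#include <iostream>
#include <memory>

struct NodeSession
{
    void addSession() { std::cout << "session added\n"; }
};

class Forwarder
{
public:
    explicit Forwarder(const std::shared_ptr<NodeSession>& session) : _session(session) {}

    void forward()
    {
        // Promote the weak_ptr; if the node session was destroyed in the meantime the call is dropped.
        if (auto session = _session.lock())
        {
            session->addSession();
        }
    }

private:
    std::weak_ptr<NodeSession> _session; // does not keep the node session alive
};

int main()
{
    auto session = std::make_shared<NodeSession>();
    Forwarder forwarder{session};
    forwarder.forward(); // session still alive: the call goes through
    session.reset();
    forwarder.forward(); // session destroyed: silently ignored
}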
_node->confirmCreateSessionAsync(publisher, publisherSession, nullptr); } @@ -99,29 +102,28 @@ namespace { if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty()) { - auto peerSession = _nodeSessionManager->createOrGet(node, current.con, false); - assert(peerSession); - node = peerSession->getPublicNode(); + shared_ptr nodeSession = _nodeSessionManager->createOrGet(node, current.con, false); + assert(nodeSession); + node = nodeSession->getPublicNode(); if (session) { - session = peerSession->forwarder(*session); + session = nodeSession->forwarder(*session); } } } const shared_ptr _nodeSessionManager; - const weak_ptr _session; + const weak_ptr _nodeSession; const NodePrx _node; }; } NodeSessionI::NodeSessionI( shared_ptr instance, - optional node, + NodePrx node, Ice::ConnectionPtr connection, bool forwardAnnouncements) : _instance(std::move(instance)), - _traceLevels(_instance->getTraceLevels()), _node(std::move(node)), _connection(std::move(connection)) { @@ -150,9 +152,9 @@ NodeSessionI::init() _publicNode = _node; } - if (_traceLevels->session > 0) + if (_instance->getTraceLevels()->session > 0) { - Trace out(_traceLevels, _traceLevels->sessionCat); + Trace out(_instance->getTraceLevels(), _instance->getTraceLevels()->sessionCat); out << "created node session (peer = `" << _publicNode << "'):\n" << _connection->toString(); } } @@ -184,9 +186,9 @@ NodeSessionI::destroy() { } - if (_traceLevels->session > 0) + if (_instance->getTraceLevels()->session > 0) { - Trace out(_traceLevels, _traceLevels->sessionCat); + Trace out(_instance->getTraceLevels(), _instance->getTraceLevels()->sessionCat); out << "destroyed node session (peer = `" << _publicNode << "')"; } } diff --git a/cpp/src/DataStorm/NodeSessionI.h b/cpp/src/DataStorm/NodeSessionI.h index f2a09e2df79..7a0315c09fb 100644 --- a/cpp/src/DataStorm/NodeSessionI.h +++ b/cpp/src/DataStorm/NodeSessionI.h @@ -11,12 +11,10 @@ namespace DataStormI { - class TraceLevels; - class NodeSessionI final : public std::enable_shared_from_this { public: - NodeSessionI(std::shared_ptr, std::optional, Ice::ConnectionPtr, bool); + NodeSessionI(std::shared_ptr, DataStormContract::NodePrx, Ice::ConnectionPtr, bool); void init(); void destroy(); @@ -43,14 +41,18 @@ namespace DataStormI private: const std::shared_ptr _instance; - const std::shared_ptr _traceLevels; - std::optional _node; + DataStormContract::NodePrx _node; const Ice::ConnectionPtr _connection; std::mutex _mutex; bool _destroyed; std::optional _publicNode; + + // A proxy for forwarding announcements to the target node when announce forwarding is enabled. + // If announce forwarding is disabled, this will be nullopt. std::optional _lookup; + + // A map containing all the publisher and subscriber sessions established between two nodes. std::map> _sessions; }; } diff --git a/cpp/src/DataStorm/NodeSessionManager.cpp b/cpp/src/DataStorm/NodeSessionManager.cpp index 2f3e7e1af68..3a6b90f2a55 100644 --- a/cpp/src/DataStorm/NodeSessionManager.cpp +++ b/cpp/src/DataStorm/NodeSessionManager.cpp @@ -30,7 +30,7 @@ namespace auto pos = current.id.name.find('-'); if (pos != string::npos && pos < current.id.name.length()) { - if (auto session = _nodeSessionManager->getSession(current.id.name.substr(pos + 1))) + if (auto session = _nodeSessionManager->getSession(Ice::Identity{current.id.name.substr(pos + 1), ""})) { // Forward the call to the target session, don't need to wait for the result. 
auto id = Ice::Identity{current.id.name.substr(0, pos), current.id.category.substr(0, 1)}; @@ -310,14 +310,11 @@ NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& cu } } - // Forward the call to all nodes we are connected to, don't need to wait for the result. - for (const auto& [_, lookup] : _connectedTo) + // Forward the call to the connected to node, don't need to wait for the result. + if (_connectedTo && (*_connectedTo)->ice_getCachedConnection() != _exclude) { - if (lookup.second->ice_getCachedConnection() != _exclude) - { - lookup.second - ->ice_invokeAsync(current.operation, current.mode, inParams, nullptr, nullptr, nullptr, current.ctx); - } + (*_connectedTo) + ->ice_invokeAsync(current.operation, current.mode, inParams, nullptr, nullptr, nullptr, current.ctx); } } @@ -394,7 +391,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup) { lookup = lookup->ice_fixed(connection); } - _connectedTo.emplace(node->ice_getIdentity(), make_pair(node, lookup)); + _connectedTo = lookup; auto readerNames = instance->getTopicFactory()->getTopicReaderNames(); auto writerNames = instance->getTopicFactory()->getTopicWriterNames(); @@ -424,7 +421,7 @@ NodeSessionManager::disconnected(NodePrx node, LookupPrx lookup) Trace out(_traceLevels, _traceLevels->sessionCat); out << "disconnected node session (peer = `" << node << "')"; } - _connectedTo.erase(node->ice_getIdentity()); + _connectedTo = nullopt; lock.unlock(); connect(lookup, _nodePrx); } diff --git a/cpp/src/DataStorm/NodeSessionManager.h b/cpp/src/DataStorm/NodeSessionManager.h index 9363138b49b..9d3b343c32d 100644 --- a/cpp/src/DataStorm/NodeSessionManager.h +++ b/cpp/src/DataStorm/NodeSessionManager.h @@ -39,10 +39,6 @@ namespace DataStormI const Ice::ConnectionPtr& = nullptr) const; std::shared_ptr getSession(const Ice::Identity&) const; - std::shared_ptr getSession(const std::string& name) const - { - return getSession(Ice::Identity{name, ""}); - } void forward(const Ice::ByteSeq&, const Ice::Current&) const; @@ -72,8 +68,14 @@ namespace DataStormI int _retryCount; + // A map containing the `NodeSessionI` servants for all nodes that have an active session with this node. + // The map is indexed by the identity of the nodes. std::map> _sessions; - std::map> _connectedTo; + + // The `Lookup` proxy for the `ConnectTo` node, which is set when there is an active connection to the target + // node. If the `DataStorm.Node.ConnectTo` property is configured, the session manager attempts to connect to + // the specified node and sets this member once the connection is established. + std::optional _connectedTo; mutable Ice::ConnectionPtr _exclude; DataStormContract::LookupPrx _forwarder; diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 92ccdf6484d..bde9ef40b9b 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -166,6 +166,7 @@ SessionI::attachTopic(TopicSpec spec, const Ice::Current&) _session->attachTagsAsync(topic->getId(), tags, true, nullptr); } + // Attach to the remote elements that match the local elements. auto specs = topic->getElementSpecs(spec.id, spec.elements, shared_from_this()); if (!specs.empty()) { @@ -174,6 +175,7 @@ SessionI::attachTopic(TopicSpec spec, const Ice::Current&) Trace out(_traceLevels, _traceLevels->sessionCat); out << _id << ": matched elements `" << spec << "' on `" << topic << "'"; } + // Don't wait for the response here, the remote session will send an ack. 
_session->attachElementsAsync(topic->getId(), specs, true, nullptr); } }); @@ -216,7 +218,7 @@ SessionI::attachTags(int64_t topicId, ElementInfoSeq tags, bool initialize, cons runWithTopics( topicId, - [&](TopicI* topic, TopicSubscriber& subscriber, TopicSubscribers&) + [&](TopicI* topic, TopicSubscriber& subscriber) { if (_traceLevels->session > 2) { @@ -246,7 +248,7 @@ SessionI::detachTags(int64_t topicId, Ice::LongSeq tags, const Ice::Current&) runWithTopics( topicId, - [&](TopicI* topic, TopicSubscriber& subscriber, TopicSubscribers&) + [&](TopicI* topic, TopicSubscriber& subscriber) { if (_traceLevels->session > 2) { @@ -272,7 +274,7 @@ SessionI::announceElements(int64_t topicId, ElementInfoSeq elements, const Ice:: runWithTopics( topicId, - [&](TopicI* topic, TopicSubscriber&, TopicSubscribers&) + [&](TopicI* topic, TopicSubscriber&) { if (_traceLevels->session > 2) { @@ -296,7 +298,7 @@ SessionI::announceElements(int64_t topicId, ElementInfoSeq elements, const Ice:: } void -SessionI::attachElements(int64_t id, ElementSpecSeq elements, bool initialize, const Ice::Current&) +SessionI::attachElements(int64_t topicId, ElementSpecSeq elements, bool initialize, const Ice::Current&) { lock_guard lock(_mutex); if (!_session) @@ -306,20 +308,21 @@ SessionI::attachElements(int64_t id, ElementSpecSeq elements, bool initialize, c auto now = chrono::system_clock::now(); runWithTopics( - id, - [&](TopicI* topic, TopicSubscriber& subscriber, TopicSubscribers&) + topicId, + [&](TopicI* topic, TopicSubscriber& subscriber) { if (_traceLevels->session > 2) { Trace out(_traceLevels, _traceLevels->sessionCat); - out << _id << ": attaching elements `[" << elements << "]@" << id << "' on topic `" << topic << "'"; + out << _id << ": attaching elements `[" << elements << "]@" << topicId << "' on topic `" << topic + << "'"; if (initialize) { out << " (initializing)"; } } - auto specAck = topic->attachElements(id, elements, shared_from_this(), *_session, now); + auto specAck = topic->attachElements(topicId, elements, shared_from_this(), *_session, now); if (initialize) { @@ -334,8 +337,8 @@ SessionI::attachElements(int64_t id, ElementSpecSeq elements, bool initialize, c if (_traceLevels->session > 2) { Trace out(_traceLevels, _traceLevels->sessionCat); - out << _id << ": attaching elements matched `[" << specAck << "]@" << id << "' on topic `" << topic - << "'"; + out << _id << ": attaching elements matched `[" << specAck << "]@" << topicId << "' on topic `" + << topic << "'"; } _session->attachElementsAckAsync(topic->getId(), specAck, nullptr); } @@ -353,7 +356,7 @@ SessionI::attachElementsAck(int64_t id, ElementSpecAckSeq elements, const Ice::C auto now = chrono::system_clock::now(); runWithTopics( id, - [&](TopicI* topic, TopicSubscriber&, TopicSubscribers&) + [&](TopicI* topic, TopicSubscriber&) { if (_traceLevels->session > 2) { @@ -404,7 +407,7 @@ SessionI::detachElements(int64_t id, Ice::LongSeq elements, const Ice::Current&) auto k = subscriber.remove(e); for (auto& s : k.getSubscribers()) { - for (auto key : s.second.keys) + for (const auto& key : s.second.keys) { if (e > 0) { @@ -919,8 +922,9 @@ SessionI::subscribeToKey( const string& name, int priority) { + // Called with the session and topic mutex locked. 
assert(_topics.find(topicId) != _topics.end()); - auto& subscriber = _topics.at(topicId).getSubscriber(element->getTopic()); + TopicSubscriber& subscriber = _topics.at(topicId).getSubscriber(element->getTopic()); if (_traceLevels->session > 1) { Trace out(_traceLevels, _traceLevels->sessionCat); @@ -1134,15 +1138,14 @@ SessionI::runWithTopics( for (auto topic : topics) { retained.push_back(topic); - unique_lock l(topic->getMutex()); + unique_lock lock(topic->getMutex()); if (topic->isDestroyed()) { continue; } - _topicLock = &l; + _topicLock = &lock; fn(topic); _topicLock = nullptr; - l.unlock(); } } @@ -1152,38 +1155,16 @@ SessionI::runWithTopics(int64_t id, function fn auto t = _topics.find(id); if (t != _topics.end()) { - for (auto& s : t->second.getSubscribers()) + for (auto& [topic, subscriber] : t->second.getSubscribers()) { - unique_lock l(s.first->getMutex()); - if (s.first->isDestroyed()) - { - continue; - } - _topicLock = &l; - fn(s.first, s.second); - _topicLock = nullptr; - l.unlock(); - } - } -} - -void -SessionI::runWithTopics(int64_t id, function fn) -{ - auto t = _topics.find(id); - if (t != _topics.end()) - { - for (auto& s : t->second.getSubscribers()) - { - if (s.first->isDestroyed()) + unique_lock lock(topic->getMutex()); + if (topic->isDestroyed()) { continue; } - unique_lock l(s.first->getMutex()); - _topicLock = &l; - fn(s.first, s.second, t->second); + _topicLock = &lock; + fn(topic, subscriber); _topicLock = nullptr; - l.unlock(); } } } @@ -1197,15 +1178,14 @@ SessionI::runWithTopic(int64_t id, TopicI* topic, functionsecond.getSubscribers().find(topic); if (p != t->second.getSubscribers().end()) { - unique_lock l(topic->getMutex()); + unique_lock lock(topic->getMutex()); if (topic->isDestroyed()) { return; } - _topicLock = &l; + _topicLock = &lock; fn(p->second); _topicLock = nullptr; - l.unlock(); } } } @@ -1222,7 +1202,7 @@ SubscriberSessionI::getTopics(const string& name) const } void -SubscriberSessionI::s(int64_t topicId, int64_t elementId, DataSample s, const Ice::Current& current) +SubscriberSessionI::s(int64_t topicId, int64_t elementId, DataSample dataSample, const Ice::Current& current) { lock_guard lock(_mutex); if (!_session || current.con != _connection) @@ -1230,7 +1210,8 @@ SubscriberSessionI::s(int64_t topicId, int64_t elementId, DataSample s, const Ic if (current.con != _connection) { Trace out(_traceLevels, _traceLevels->sessionCat); - out << _id << ": discarding sample `" << s.id << "' from `e" << elementId << '@' << topicId << "'\n"; + out << _id << ": discarding sample `" << dataSample.id << "' from `e" << elementId << '@' << topicId + << "'\n"; if (_connection) { out << current.con->toString() << "\n" << _connection->toString(); @@ -1242,66 +1223,77 @@ SubscriberSessionI::s(int64_t topicId, int64_t elementId, DataSample s, const Ic } return; } + + // Queue the received sample with all matching subscribers. 
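[Editor's note] Several loops in this file are rewritten from .first/.second accesses to C++17 structured bindings with descriptive names. A tiny standalone illustration of the difference; the map contents are made up for the example:

#include <iostream>
#include <map>
#include <string>

int main()
{
    std::map<std::string, int> subscribers{{"reader-1", 3}, {"reader-2", 5}};

    // Old style: the reader has to remember what .first and .second mean.
    for (const auto& s : subscribers)
    {
        std::cout << s.first << " -> " << s.second << '\n';
    }

    // C++17 structured bindings: the same loop with self-documenting names.
    for (const auto& [name, priority] : subscribers)
    {
        std::cout << name << " -> " << priority << '\n';
    }
}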
auto now = chrono::system_clock::now(); runWithTopics( topicId, - [&](TopicI* topic, TopicSubscriber& subscriber, TopicSubscribers&) + [&](TopicI* topic, TopicSubscriber& topicSubscriber) { - auto e = subscriber.get(elementId); - if (e && !e->getSubscribers().empty()) + auto elementSubscribers = topicSubscriber.get(elementId); + if (elementSubscribers && !elementSubscribers->getSubscribers().empty()) { if (_traceLevels->session > 2) { Trace out(_traceLevels, _traceLevels->sessionCat); - out << _id << ": queuing sample `" << s.id << "[k" << s.keyId << "]' from `e" << elementId << '@' - << topicId << "'"; + out << _id << ": queuing sample `" << dataSample.id << "[k" << dataSample.keyId << "]' from `e" + << elementId << '@' << topicId << "'"; if (!current.facet.empty()) { out << " facet=" << current.facet; } out << " to ["; - for (auto q = e->getSubscribers().begin(); q != e->getSubscribers().end(); ++q) + bool first = true; + for (const auto& [element, elementSubscriber] : elementSubscribers->getSubscribers()) { - if (q != e->getSubscribers().begin()) + if (!first) { out << ", "; } - out << q->first; - if (!q->second.facet.empty()) + out << element; + if (!elementSubscriber.facet.empty()) { - out << ":" << q->second.facet; + out << ":" << elementSubscriber.facet; } + first = false; } out << "]"; } shared_ptr key; - if (s.keyValue.empty()) + if (dataSample.keyValue.empty()) { - key = subscriber.keys[s.keyId].first; + key = topicSubscriber.keys[dataSample.keyId].first; } else { - key = topic->getKeyFactory()->decode(_instance->getCommunicator(), s.keyValue); + key = topic->getKeyFactory()->decode(_instance->getCommunicator(), dataSample.keyValue); } assert(key); - auto impl = topic->getSampleFactory()->create( + auto sample = topic->getSampleFactory()->create( _id, - e->name, - - s.id, - s.event, + elementSubscribers->name, + dataSample.id, + dataSample.event, key, - subscriber.tags[s.tag], - s.value, - s.timestamp); - for (auto& es : e->getSubscribers()) + topicSubscriber.tags[dataSample.tag], + dataSample.value, + dataSample.timestamp); + + for (auto& [element, elementSubscriber] : elementSubscribers->getSubscribers()) { - if (es.second.initialized && (s.keyId <= 0 || es.second.keys.find(key) != es.second.keys.end())) + if (elementSubscriber.initialized && + (dataSample.keyId <= 0 || elementSubscriber.keys.find(key) != elementSubscriber.keys.end())) { - es.second.lastId = s.id; - es.first->queue(impl, e->priority, shared_from_this(), current.facet, now, !s.keyValue.empty()); + elementSubscriber.lastId = dataSample.id; + element->queue( + sample, + elementSubscribers->priority, + shared_from_this(), + current.facet, + now, + !dataSample.keyValue.empty()); } } } diff --git a/cpp/src/DataStorm/SessionI.h b/cpp/src/DataStorm/SessionI.h index 33b17a8a0a4..58c30ce2c57 100644 --- a/cpp/src/DataStorm/SessionI.h +++ b/cpp/src/DataStorm/SessionI.h @@ -336,16 +336,50 @@ namespace DataStormI const std::shared_ptr&); protected: + /// Runs the provided callback function for each topic with the specified name. + /// The callback is executed with the topic's mutex locked. + /// + /// The topics are appended to the `retained` vector, allowing the caller to release them after unlocking the + /// session mutex. This method must be called with the session mutex already locked. + /// + /// @param name The name of the topics to process. + /// @param retained A vector to store the topics, ensuring they are retained until released by the caller. 
+ /// @param callback The callback function to execute for each topic void runWithTopics( - const std::string&, - std::vector>&, - std::function&)>); - void runWithTopics(std::int64_t, std::function); - void runWithTopics(std::int64_t, std::function); - void runWithTopic(std::int64_t, TopicI*, std::function); - - virtual std::vector> getTopics(const std::string&) const = 0; - virtual void reconnect(DataStormContract::NodePrx) = 0; + const std::string& name, + std::vector>& retained, + std::function&)> callback); + + /// Runs the provided callback function for each subscriber of the specified topic. + /// The callback is executed with the topic's mutex locked. + /// + /// @param id The ID of the topic to process. + /// @param callback The callback function to execute for each subscriber. + void runWithTopics(std::int64_t id, std::function callback); + + /// Runs the provided callback function for the specified topic, if it is among the subscribers for the given + /// topic ID. + /// The callback is executed with the topic's mutex locked. + /// + /// @param id The ID of the topic to process. + /// @param topic The topic to process. + /// @param callback The callback function to execute for the subscriber. + void runWithTopic(std::int64_t id, TopicI* topic, std::function callback); + + /// Returns the topics that match the specified name. + /// + /// This method is implemented by the derived classes `PublisherSessionI` and `SubscriberSessionI`: + /// - `PublisherSessionI` returns the topic writers for the specified topic name. + /// - `SubscriberSessionI` returns the topic readers for the specified topic name. + /// + /// @param name The name of the topics to retrieve. + /// @return A vector containing the topics that match the specified name. + virtual std::vector> getTopics(const std::string& name) const = 0; + + /// Attempts to reconnect the session with the specified node. + /// + /// @param node The node to reconnect to. + virtual void reconnect(DataStormContract::NodePrx node) = 0; virtual void remove() = 0; const std::shared_ptr _instance; @@ -361,7 +395,7 @@ namespace DataStormI IceInternal::TimerTaskPtr _retryTask; // Keeps track of the topics that this session is subscribed to. The key represents the topic ID in the remote - // node. + // node. The TopicSubscribers object contains the subscribers for the remote topic. std::map _topics; std::unique_lock* _topicLock; diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index a76ba16f1d8..eec2ac91795 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -196,90 +196,114 @@ ElementSpecSeq TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shared_ptr& session) { ElementSpecSeq specs; + // Iterate over the element infos representing the remote keys, and filter and compute the element spec for local + // keys and filters that match. Positive IDs represent keys and negative IDs represent filters. for (const auto& info : infos) { - // Positive IDs represent keys and negative IDs represent filters. if (info.id > 0) { auto key = _keyFactory->decode(_instance->getCommunicator(), info.value); auto p = _keyElements.find(key); if (p != _keyElements.end()) { - // If we have a matching key add it to the spec. + // If we have a matching key add it to the spec, with all the key data readers or writers. 
ElementDataSeq elements; - for (auto k : p->second) + for (const auto& dataElement : p->second) { - elements.push_back({k->getId(), k->getConfig(), session->getLastIds(topicId, info.id, k)}); + elements.push_back(ElementData{ + .id = dataElement->getId(), + .config = dataElement->getConfig(), + .lastIds = session->getLastIds(topicId, info.id, dataElement)}); } - specs.push_back({std::move(elements), key->getId(), "", {}, info.id, ""}); + specs.push_back(ElementSpec{ + .elements = std::move(elements), + .id = key->getId(), + .name = "", + .value = {}, + .peerId = info.id, + .peerName = ""}); } - for (auto e : _filteredElements) + // Add filtered elements matching the key. + for (const auto& [filter, filteredDataElements] : _filteredElements) { - if (e.first->match(key)) + if (filter->match(key)) { ElementDataSeq elements; - for (auto f : e.second) + for (const auto& dataElement : filteredDataElements) { - elements.push_back({f->getId(), f->getConfig(), session->getLastIds(topicId, info.id, f)}); + elements.push_back(ElementData{ + .id = dataElement->getId(), + .config = dataElement->getConfig(), + .lastIds = session->getLastIds(topicId, info.id, dataElement)}); } - specs.push_back( - {std::move(elements), - -e.first->getId(), - e.first->getName(), - e.first->encode(_instance->getCommunicator()), - info.id, - ""}); + + specs.push_back(ElementSpec{ + .elements = std::move(elements), + .id = -filter->getId(), + .name = filter->getName(), + .value = filter->encode(_instance->getCommunicator()), + .peerId = info.id, + .peerName = ""}); } } } else { - shared_ptr filter; + // An empty filter value represents a match all filter. Otherwise we decode the filter using the key filter + // factory. + shared_ptr peerFilter; if (info.value.empty()) { - filter = alwaysMatchFilter; + peerFilter = alwaysMatchFilter; } else { - filter = _keyFilterFactories->decode(_instance->getCommunicator(), info.name, info.value); + peerFilter = _keyFilterFactories->decode(_instance->getCommunicator(), info.name, info.value); } - for (auto e : _keyElements) + // Add key elements matching the filter. 
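[Editor's note] Throughout this file the ElementData and ElementSpec aggregates are now built with C++20 designated initializers instead of positional brace lists, making the field-to-value mapping explicit at the call site. A short standalone sketch of the idiom; the Config struct and its field names are invented for the example:

#include <string>
#include <vector>

struct Config
{
    long long id;
    std::string name;
    std::vector<unsigned char> value;
    long long peerId;
};

int main()
{
    // Positional initialization: correctness depends on remembering the member order.
    Config positional{5, "filter", {}, 42};

    // Designated initializers: each field is named, so reordering members in the struct
    // declaration can no longer silently change the meaning of the call site.
    Config designated{.id = 5, .name = "filter", .value = {}, .peerId = 42};

    return positional.id == designated.id ? 0 : 1;
}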
+ for (const auto& [key, keyDataElements] : _keyElements) { - if (filter->match(e.first)) + if (peerFilter->match(key)) { ElementDataSeq elements; - for (auto k : e.second) + for (const auto& dataElement : keyDataElements) { - elements.push_back({k->getId(), k->getConfig(), session->getLastIds(topicId, info.id, k)}); + elements.push_back(ElementData{ + .id = dataElement->getId(), + .config = dataElement->getConfig(), + .lastIds = session->getLastIds(topicId, info.id, dataElement)}); } - specs.push_back( - {std::move(elements), - e.first->getId(), - "", - e.first->encode(_instance->getCommunicator()), - info.id, - info.name}); + specs.push_back(ElementSpec{ + .elements = std::move(elements), + .id = key->getId(), + .name = "", + .value = key->encode(_instance->getCommunicator()), + .peerId = info.id, + .peerName = info.name}); } } - if (filter == alwaysMatchFilter) + if (peerFilter == alwaysMatchFilter) { - for (auto e : _filteredElements) + for (const auto& [filter, filteredDataElements] : _filteredElements) { ElementDataSeq elements; - for (auto f : e.second) + for (const auto& dataElement : filteredDataElements) { - elements.push_back({f->getId(), f->getConfig(), session->getLastIds(topicId, info.id, f)}); + elements.push_back(ElementData{ + .id = dataElement->getId(), + .config = dataElement->getConfig(), + .lastIds = session->getLastIds(topicId, info.id, dataElement)}); } - specs.push_back( - {std::move(elements), - -e.first->getId(), - e.first->getName(), - e.first->encode(_instance->getCommunicator()), - info.id, - info.name}); + specs.push_back(ElementSpec{ + .elements = std::move(elements), + .id = -filter->getId(), + .name = filter->getName(), + .value = filter->encode(_instance->getCommunicator()), + .peerId = info.id, + .peerName = info.name}); } } else @@ -288,17 +312,20 @@ TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shar if (p != _filteredElements.end()) { ElementDataSeq elements; - for (auto f : p->second) + for (const auto& dataElement : p->second) { - elements.push_back({f->getId(), f->getConfig(), session->getLastIds(topicId, info.id, f)}); + elements.push_back(ElementData{ + .id = dataElement->getId(), + .config = dataElement->getConfig(), + .lastIds = session->getLastIds(topicId, info.id, dataElement)}); } - specs.push_back( - {std::move(elements), - -alwaysMatchFilter->getId(), - alwaysMatchFilter->getName(), - alwaysMatchFilter->encode(_instance->getCommunicator()), - info.id, - info.name}); + specs.push_back(ElementSpec{ + .elements = std::move(elements), + .id = -alwaysMatchFilter->getId(), + .name = alwaysMatchFilter->getName(), + .value = alwaysMatchFilter->encode(_instance->getCommunicator()), + .peerId = info.id, + .peerName = info.name}); } } } @@ -343,10 +370,14 @@ TopicI::attachElements( SessionPrx prx, const chrono::time_point& now) { + // Called by the session holding the session and topic locks. + ElementSpecAckSeq specs; for (const auto& spec : elements) { - if (spec.peerId > 0) // Key + // The peer ID is computed by the remote caller and represents our local ID. Positive IDs represent keys and + // negative IDs represent filters. + if (spec.peerId > 0) { auto key = _keyFactory->get(spec.peerId); auto p = _keyElements.find(key); @@ -365,30 +396,31 @@ TopicI::attachElements( } } - for (auto e : p->second) + // Iterate over the data elements for the matching key, attaching them to the data elements of the spec. 
+ for (const auto& dataElement : p->second) { ElementDataAckSeq acks; for (const auto& data : spec.elements) { if (spec.id > 0) // Key { - e->attach(topicId, spec.id, key, nullptr, session, prx, data, now, acks); + dataElement->attach(topicId, spec.id, key, nullptr, session, prx, data, now, acks); } - else if (filter->match(key)) + else if (filter->match(key)) // TODO: can we move the match outside the nested loop? { - e->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); + dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); } } if (!acks.empty()) { - specs.push_back( - {std::move(acks), - key->getId(), - "", - spec.id < 0 ? key->encode(_instance->getCommunicator()) : Ice::ByteSeq{}, - spec.id, - spec.name}); + specs.push_back(ElementSpecAck{ + .elements = std::move(acks), + .id = key->getId(), + .name = "", + .value = spec.id < 0 ? key->encode(_instance->getCommunicator()) : Ice::ByteSeq{}, + .peerId = spec.id, + .peerName = spec.name}); } } } @@ -414,29 +446,30 @@ TopicI::attachElements( key = _keyFactory->decode(_instance->getCommunicator(), spec.value); } - for (auto e : p->second) + for (const auto& dataElement : p->second) { ElementDataAckSeq acks; for (const auto& data : spec.elements) { if (spec.id < 0) // Filter { - e->attach(topicId, spec.id, nullptr, filter, session, prx, data, now, acks); + dataElement->attach(topicId, spec.id, nullptr, filter, session, prx, data, now, acks); } else if (filter->match(key)) { - e->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); + dataElement->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); } } + if (!acks.empty()) { - specs.push_back( - {std::move(acks), - -filter->getId(), - filter->getName(), - spec.id > 0 ? filter->encode(_instance->getCommunicator()) : Ice::ByteSeq{}, - spec.id, - spec.name}); + specs.push_back(ElementSpecAck{ + .elements = std::move(acks), + .id = -filter->getId(), + .name = filter->getName(), + .value = spec.id > 0 ? filter->encode(_instance->getCommunicator()) : Ice::ByteSeq{}, + .peerId = spec.id, + .peerName = spec.name}); } } } @@ -721,6 +754,8 @@ TopicI::waitForListeners(int count) const } _cond.wait(lock); ++_notified; + // Ensure that notifyListenerWaiters checks the wait condition after _notified is incremented. + _cond.notify_all(); } } diff --git a/cpp/src/DataStorm/TopicI.h b/cpp/src/DataStorm/TopicI.h index ee20e597ac6..5921daa23a3 100644 --- a/cpp/src/DataStorm/TopicI.h +++ b/cpp/src/DataStorm/TopicI.h @@ -48,8 +48,16 @@ namespace DataStormI DataStormContract::TopicSpec getTopicSpec() const; DataStormContract::ElementInfoSeq getTags() const; - DataStormContract::ElementSpecSeq - getElementSpecs(std::int64_t, const DataStormContract::ElementInfoSeq&, const std::shared_ptr&); + /// Compute the element specs for the local elements that match the given element infos. + /// + /// @param topicId The remote topic ID for the provided element infos. + /// @param infos The element infos to match. + /// @param session The session that requested the element specs. + /// @return The element specs for the local elements that match the given element infos. 
+ DataStormContract::ElementSpecSeq getElementSpecs( + std::int64_t topicId, + const DataStormContract::ElementInfoSeq& infos, + const std::shared_ptr& session); void attach(std::int64_t, std::shared_ptr, DataStormContract::SessionPrx); void detach(std::int64_t, const std::shared_ptr&); diff --git a/cpp/src/DataStorm/TraceUtil.cpp b/cpp/src/DataStorm/TraceUtil.cpp index 84e880b72d4..eb8ba4ca524 100644 --- a/cpp/src/DataStorm/TraceUtil.cpp +++ b/cpp/src/DataStorm/TraceUtil.cpp @@ -15,14 +15,14 @@ using namespace DataStormI; # pragma GCC diagnostic ignored "-Wshadow" #endif -TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties, const Ice::LoggerPtr& logger) +TraceLevels::TraceLevels(const Ice::PropertiesPtr& properties, Ice::LoggerPtr logger) : topic(properties->getIcePropertyAsInt("DataStorm.Trace.Topic")), topicCat("Topic"), data(properties->getIcePropertyAsInt("DataStorm.Trace.Data")), dataCat("Data"), session(properties->getIcePropertyAsInt("DataStorm.Trace.Session")), sessionCat("Session"), - logger(logger) + logger(std::move(logger)) { } diff --git a/cpp/src/DataStorm/TraceUtil.h b/cpp/src/DataStorm/TraceUtil.h index b7791e22a5f..71e5c1eb767 100644 --- a/cpp/src/DataStorm/TraceUtil.h +++ b/cpp/src/DataStorm/TraceUtil.h @@ -208,7 +208,7 @@ namespace DataStormI class TraceLevels { public: - TraceLevels(const Ice::PropertiesPtr&, const Ice::LoggerPtr&); + TraceLevels(const Ice::PropertiesPtr&, Ice::LoggerPtr); const int topic; const char* topicCat;
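[Editor's note] The TraceLevels change above switches the logger parameter from a const reference to pass-by-value followed by std::move, the usual sink-argument idiom for parameters that are stored in a member. A minimal sketch with placeholder types rather than the actual Ice classes:

#include <memory>
#include <string>
#include <utility>

struct Logger
{
    std::string name;
};
using LoggerPtr = std::shared_ptr<Logger>;

class TraceLevelsSketch
{
public:
    // Taking the shared_ptr by value lets callers hand over an lvalue (one copy) or an rvalue
    // (no copy); the constructor then moves it into the member instead of copying it again.
    explicit TraceLevelsSketch(LoggerPtr logger) : _logger(std::move(logger)) {}

private:
    LoggerPtr _logger;
};

int main()
{
    auto logger = std::make_shared<Logger>(Logger{"Session"});
    TraceLevelsSketch keepCaller{logger};               // copies the shared_ptr once
    TraceLevelsSketch takeOwnership{std::move(logger)}; // transfers ownership, no copy
}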