From f5f8aa936d50b2f66b0f313e33eecc2169dea0d9 Mon Sep 17 00:00:00 2001 From: Jose Date: Wed, 27 Nov 2024 17:25:34 +0100 Subject: [PATCH] DataStorm doc & impl comments (#3204) --- cpp/include/DataStorm/DataStorm.h | 47 +++++++++------- cpp/include/DataStorm/InternalT.h | 7 +-- cpp/include/DataStorm/Types.h | 2 +- cpp/src/DataStorm/Contract.ice | 71 +++++++++++++++++++----- cpp/src/DataStorm/DataElementI.cpp | 10 +--- cpp/src/DataStorm/LookupI.cpp | 29 +++++++--- cpp/src/DataStorm/NodeSessionManager.cpp | 31 +++++++++-- cpp/src/DataStorm/SessionI.cpp | 6 ++ cpp/src/DataStorm/SessionI.h | 28 +++++++++- cpp/src/DataStorm/TopicI.cpp | 14 +++-- cpp/src/DataStorm/TopicI.h | 25 ++++++++- 11 files changed, 198 insertions(+), 72 deletions(-) diff --git a/cpp/include/DataStorm/DataStorm.h b/cpp/include/DataStorm/DataStorm.h index 19c5ae5fb82..3e578b5ef20 100644 --- a/cpp/include/DataStorm/DataStorm.h +++ b/cpp/include/DataStorm/DataStorm.h @@ -231,16 +231,17 @@ namespace DataStorm bool hasWriters() const noexcept; /** - * Wait for given number of writers to be online. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for given number of writers to be online. * * @param count The number of writers to wait. + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForWriters(unsigned int count = 1) const; /** - * Wait for readers to be offline. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for writers to be offline. + * + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForNoWriters() const; @@ -266,8 +267,9 @@ namespace DataStorm std::vector> getAllUnread() noexcept; /** - * Wait for given number of unread samples to be available. The node shutdown will cause this method to - * raise NodeShutdownException. + * Wait for given number of unread samples to be available. + * + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForUnread(unsigned int count = 1) const; @@ -279,10 +281,10 @@ namespace DataStorm bool hasUnread() const noexcept; /** - * Returns the next unread sample. The node shutdown will cause this method to raise - * NodeShutdownException. + * Returns the next unread sample. * * @return The unread sample. + * @throws NodeShutdownException If the node is shut down while waiting. */ Sample getNextUnread(); @@ -384,15 +386,17 @@ namespace DataStorm bool hasReaders() const noexcept; /** - * Wait for given number of readers to be online. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for given number of readers to be online. * * @param count The number of readers to wait. + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForReaders(unsigned int count = 1) const; /** - * Wait for readers to be offline. The node shutdown this method to raise NodeShutdownException. + * Wait for readers to be offline. + * + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForNoReaders() const; @@ -411,9 +415,10 @@ namespace DataStorm std::vector getConnectedKeys() const noexcept; /** - * Get the last written sample. If there's no sample, the std::logic_error exception is raised. + * Get the last written sample. * * @return The last written sample. + * @throws std::logic_error If there's no sample. **/ Sample getLast(); @@ -549,16 +554,17 @@ namespace DataStorm bool hasWriters() const noexcept; /** - * Wait for given number of data writers to be online. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for given number of data writers to be online. * * @param count The number of date writers to wait. + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForWriters(unsigned int count = 1) const; /** - * Wait for data writers to be offline. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for data writers to be offline. + * + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForNoWriters() const; @@ -577,16 +583,17 @@ namespace DataStorm bool hasReaders() const noexcept; /** - * Wait for given number of data readers to be online. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for given number of data readers to be online. * * @param count The number of data readers to wait. + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForReaders(unsigned int count = 1) const; /** - * Wait for data readers to be offline. The node shutdown will cause this method to raise - * NodeShutdownException. + * Wait for data readers to be offline. + * + * @throws NodeShutdownException If the node is shut down while waiting. */ void waitForNoReaders() const; diff --git a/cpp/include/DataStorm/InternalT.h b/cpp/include/DataStorm/InternalT.h index dd76439ab64..a0c2936d752 100644 --- a/cpp/include/DataStorm/InternalT.h +++ b/cpp/include/DataStorm/InternalT.h @@ -203,11 +203,8 @@ namespace DataStormI return k; } - // - // The key is being removed concurrently by the deleter, remove it now - // to allow the insertion of a new key. The deleter won't remove the - // new key. - // + // The key is being removed concurrently by the deleter, remove it now to allow the insertion of a new + // key. The deleter won't remove the new key. _elements.erase(p); } diff --git a/cpp/include/DataStorm/Types.h b/cpp/include/DataStorm/Types.h index 77db824e204..2b8fec0e139 100644 --- a/cpp/include/DataStorm/Types.h +++ b/cpp/include/DataStorm/Types.h @@ -244,7 +244,7 @@ namespace DataStorm * The Cloner template provides a method to clone user types. * * The cloner template can be specialized to provide cloning for types that require special cloning. By - * defaut, the template uses plain C++ copy. + * default, the template uses plain C++ copy. * * @headerfile DataStorm/DataStorm.h */ diff --git a/cpp/src/DataStorm/Contract.ice b/cpp/src/DataStorm/Contract.ice index 90508858dfd..323c005e3a8 100644 --- a/cpp/src/DataStorm/Contract.ice +++ b/cpp/src/DataStorm/Contract.ice @@ -66,38 +66,59 @@ module DataStormContract } sequence DataSamplesSeq; + /// Provides information about an element, which can be a key, a filter, or a tag. Includes the element's ID, name, + /// and encoded value. struct ElementInfo { - /// The key or filter id. + /// The unique identifier of the element. Filter IDs are negative. long id; - /// The filter name. + /// The name of the filter. This field is empty for key and tag elements. string name; - /// The key or filter value. + /// The encoded value of the element. Ice::ByteSeq value; } sequence ElementInfoSeq; + /// Provides information about a topic, including its name and the list of active topic reader or topic writer IDs. + /// + /// There is a unique `TopicInfo` for all topic instances with the same name, representing a single logical topic. + /// Each instance has its own topic reader and topic writer, which are lazily initialized and have a unique ID. + /// + /// @see Session#announceTopics struct TopicInfo { - /// The topic name. + /// The name of the topic. string name; - /// The id of topic writers or readers. + /// The list of active topic reader or topic writer IDs for the topic. + /// + /// - In a publisher session announcing topics to a subscriber session, this contains the active topic writer + /// IDs. + /// - In a subscriber session announcing topics to a publisher session, this contains the active topic reader + /// IDs. Ice::LongSeq ids; } + + /// Represents a sequence of active topics used for transmitting topic information during session establishment. + /// + /// @see Session#announceTopics sequence TopicInfoSeq; + /// Provides detailed information about topic readers and topic writers, including its ID, name, keys, filters, + /// and tags. + /// + /// @see Session#attachTopic struct TopicSpec { - /// The id of the topic. + /// The ID of the topic. long id; /// The name of the topic. string name; - /// The topic keys or filters. + /// The topic's keys and filters. ElementInfoSeq elements; /// The topic update tags. @@ -135,6 +156,7 @@ module DataStormContract } sequence ElementDataSeq; + /// Provides detailed information about elements that can be either a key or a filter. struct ElementSpec { /// The readers and writers associated with the key or filter. @@ -143,7 +165,7 @@ module DataStormContract /// The id of the key or filter. long id; - /// The name of the filter. + /// The name of the filter. This field is empty for key elements. string name; /// The value of the key or filter. @@ -200,20 +222,39 @@ module DataStormContract interface Session { - /// Called by sessions to announce topics to the peer. A publisher session announces the topics it writes, - /// while a subscriber session announces the topics it reads. + /// Announces existing topics to the peer during session establishment. + /// A publisher session announces the topics it writes, while a subscriber session announces the topics it reads. + /// + /// The peer receiving the announcement will invoke `attachTopic` for the topics it is interested in. /// - /// @param topics The topics to announce. - /// @param initialize currently unused. + /// @param topics The sequence of topics to announce. + /// @param initialize Currently unused. + /// @see attachTopic void announceTopics(TopicInfoSeq topics, bool initialize); + /// Attaches a local topic to a remote topic when a session receives a topic announcement from a peer. + /// + /// This method is called if the session is interested in the announced topic, which occurs when: + /// - The session has a reader for a topic that the peer has a writer for, or + /// - The session has a writer for a topic that the peer has a reader for. + /// + /// @param topic The TopicSpec object describing the topic being attached to the remote topic. void attachTopic(TopicSpec topic); + void detachTopic(long topic); void attachTags(long topic, ElementInfoSeq tags, bool initialize); void detachTags(long topic, Ice::LongSeq tags); - void announceElements(long topic, ElementInfoSeq keys); + /// Announces new elements to the peer. + /// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include + /// key readers, key writers, and filter readers associated with the specified topic. + /// + /// @param topic The ID of the topic associated with the elements. + /// @param elements The sequence of elements to announce. + /// @see attachElements + void announceElements(long topic, ElementInfoSeq elements); + void attachElements(long topic, ElementSpecSeq elements, bool initialize); void attachElementsAck(long topic, ElementSpecAckSeq elements); void detachElements(long topic, Ice::LongSeq keys); @@ -274,8 +315,8 @@ module DataStormContract /// Announce a topic reader. /// /// @param topic The name of the topic. - /// @param node The node reading the topic. The proxy is never null. - idempotent void announceTopicReader(string topic, Node* node); + /// @param subscriber The node reading the topic. The subscriber proxy is never null. + idempotent void announceTopicReader(string topic, Node* subscriber); /// Announce a topic writer. /// diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index fa88d300e93..d64c2b29376 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -1061,10 +1061,7 @@ KeyDataReaderI::KeyDataReaderI( out << this << ": created key reader"; } - // - // If sample filtering is enabled, ensure the updates are received using a session - // facet specific to this reader. - // + // If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader. if (_config->sampleFilter) { ostringstream os; @@ -1343,10 +1340,7 @@ FilteredDataReaderI::FilteredDataReaderI( out << this << ": created filtered reader"; } - // - // If sample filtering is enabled, ensure the updates are received using a session - // facet specific to this reader. - // + // If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader. if (_config->sampleFilter) { ostringstream os; diff --git a/cpp/src/DataStorm/LookupI.cpp b/cpp/src/DataStorm/LookupI.cpp index 632377b9622..fb18138f5c5 100644 --- a/cpp/src/DataStorm/LookupI.cpp +++ b/cpp/src/DataStorm/LookupI.cpp @@ -22,19 +22,27 @@ LookupI::LookupI( } void -LookupI::announceTopicReader(string name, optional proxy, const Ice::Current& current) +LookupI::announceTopicReader(string name, optional subscriber, const Ice::Current& current) { - Ice::checkNotNull(proxy, __FILE__, __LINE__, current); - _nodeSessionManager->announceTopicReader(name, *proxy, current.con); - _topicFactory->createSubscriberSession(name, *proxy, current.con); + Ice::checkNotNull(subscriber, __FILE__, __LINE__, current); + // Forward the announcement to known nodes via the node session manager. + _nodeSessionManager->announceTopicReader(name, *subscriber, current.con); + + // Notify the topic factory about the new topic reader. + // If there are any writers for the topic, the factory will create a subscriber session for it. + _topicFactory->createSubscriberSession(name, *subscriber, current.con); } void -LookupI::announceTopicWriter(string name, optional proxy, const Ice::Current& current) +LookupI::announceTopicWriter(string name, optional publisher, const Ice::Current& current) { - Ice::checkNotNull(proxy, __FILE__, __LINE__, current); - _nodeSessionManager->announceTopicWriter(name, *proxy, current.con); - _topicFactory->createPublisherSession(name, *proxy, current.con); + Ice::checkNotNull(publisher, __FILE__, __LINE__, current); + // Forward the announcement to known nodes via the node session manager. + _nodeSessionManager->announceTopicWriter(name, *publisher, current.con); + + // Notify the topic factory about the new topic writer. + // If there are any readers for the topic, the factory will create a publisher session for it. + _topicFactory->createPublisherSession(name, *publisher, current.con); } void @@ -45,8 +53,13 @@ LookupI::announceTopics( const Ice::Current& current) { Ice::checkNotNull(proxy, __FILE__, __LINE__, current); + // Forward the announcement to known nodes via the node session manager. _nodeSessionManager->announceTopics(readers, writers, *proxy, current.con); + // Notify the topic factory about the new topic readers and writers. + // The factory will create subscriber sessions for topics with matching writers and publisher sessions for topics + // with matching readers. + for (const auto& name : readers) { _topicFactory->createSubscriberSession(name, *proxy, current.con); diff --git a/cpp/src/DataStorm/NodeSessionManager.cpp b/cpp/src/DataStorm/NodeSessionManager.cpp index 3f1cd19ec9e..2f3e7e1af68 100644 --- a/cpp/src/DataStorm/NodeSessionManager.cpp +++ b/cpp/src/DataStorm/NodeSessionManager.cpp @@ -147,11 +147,17 @@ NodeSessionManager::announceTopicReader(const string& topic, NodePrx node, const auto p = _sessions.find(node->ice_getIdentity()); auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node; + // Set the exclude connection to prevent forwarding the announcement back to the sender. _exclude = connection; + // Forward the announcement to all known nodes, including nodes with an active session and those we are connected + // to. This is a collocated, synchronous call. _forwarder->announceTopicReader(topic, nodePrx); lock.unlock(); + // Forward the announcement to the multicast lookup if: + // - It is a local announcement, or + // - It comes from a non-multicast lookup and multicast-forwarding is enabled. if (!connection || (_forwardToMulticast && connection->type() != "udp")) { auto instance = _instance.lock(); @@ -184,16 +190,20 @@ NodeSessionManager::announceTopicWriter(const string& topic, NodePrx node, const } } - _exclude = connection; auto p = _sessions.find(node->ice_getIdentity()); - if (p != _sessions.end()) - { - node = p->second->getPublicNode(); - } + auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node; + + // Set the exclude connection to prevent forwarding the announcement back to the sender. + _exclude = connection; + // Forward the announcement to all known nodes, including nodes with an active session and those we are connected + // to. This is a collocated, synchronous call. _forwarder->announceTopicWriter(topic, node); lock.unlock(); + // Forward the announcement to the multicast lookup if: + // - It is a local announcement, or + // - It comes from a non-multicast lookup and multicast-forwarding is enabled. if (!connection || (_forwardToMulticast && connection->type() != "udp")) { auto instance = _instance.lock(); @@ -244,13 +254,20 @@ NodeSessionManager::announceTopics( } } - _exclude = connection; auto p = _sessions.find(node->ice_getIdentity()); auto nodePrx = p != _sessions.end() ? p->second->getPublicNode() : node; + + // Set the exclude connection to prevent forwarding the announcement back to the sender. + _exclude = connection; + // Forward the announcement to all known nodes, including nodes with an active session and those we are connected + // to. This is a collocated, synchronous call. _forwarder->announceTopics(readers, writers, nodePrx); lock.unlock(); + // Forward the announcement to the multicast lookup if: + // - It is a local announcement, or + // - It comes from a non-multicast lookup and multicast-forwarding is enabled. if (!connection || (_forwardToMulticast && connection->type() != "udp")) { auto instance = _instance.lock(); @@ -272,6 +289,8 @@ NodeSessionManager::getSession(const Ice::Identity& node) const void NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& current) const { + // Called while holding the mutex lock to ensure `_exclude` is not updated concurrently. + // Forward the call to all nodes that have an active session, don't need to wait for the result. for (const auto& [_, session] : _sessions) { diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 03a12b12744..92ccdf6484d 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -130,6 +130,7 @@ SessionI::attachTopic(TopicSpec spec, const Ice::Current&) lock_guard lock(_mutex); if (!_session) { + // Ignore the session was disconnected. return; } @@ -144,8 +145,10 @@ SessionI::attachTopic(TopicSpec spec, const Ice::Current&) out << _id << ": attaching topic `" << spec << "' to `" << topic << "'"; } + // Attach local topics with the given name to the remote topic. topic->attach(spec.id, shared_from_this(), *_session); + // If the topic spec has tags, decode them and add them to the subscriber. if (!spec.tags.empty()) { auto& subscriber = _topics.at(spec.id).getSubscriber(topic.get()); @@ -156,6 +159,7 @@ SessionI::attachTopic(TopicSpec spec, const Ice::Current&) } } + // Provide the local tags to the remote topic by calling attachTagsAsync. auto tags = topic->getTags(); if (!tags.empty()) { @@ -482,6 +486,7 @@ SessionI::initSamples(int64_t topicId, DataSamplesSeq samplesSeq, const Ice::Cur s.value, s.timestamp)); } + for (auto& ks : k->getSubscribers()) { if (!ks.second.initialized) @@ -846,6 +851,7 @@ SessionI::subscribe(int64_t id, TopicI* topic) Trace out(_traceLevels, _traceLevels->sessionCat); out << _id << ": subscribed topic `" << id << "' to topic `" << topic << "'"; } + // Add the topic as a subscriber of the remote topic with the given id. _topics[id].addSubscriber(topic, _sessionInstanceId); } diff --git a/cpp/src/DataStorm/SessionI.h b/cpp/src/DataStorm/SessionI.h index 8990db725a5..33b17a8a0a4 100644 --- a/cpp/src/DataStorm/SessionI.h +++ b/cpp/src/DataStorm/SessionI.h @@ -119,6 +119,7 @@ namespace DataStormI int _sessionInstanceId; }; + /// Represents the subscription from a local topic object to a remote topic. class TopicSubscriber { public: @@ -182,11 +183,11 @@ namespace DataStormI std::map _elements; }; + // Tracks the subscribers of a remote topic using a given session instance. class TopicSubscribers { public: - TopicSubscribers() {} - + // Add the give topic as a subscriber, if a subscription already exists, update the session instance id. void addSubscriber(TopicI* topic, int sessionInstanceId) { _sessionInstanceId = sessionInstanceId; @@ -211,10 +212,12 @@ namespace DataStormI std::map& getSubscribers() { return _subscribers; } + // Determine if the subscriber should be reaped. bool reap(int sessionInstanceId) { if (sessionInstanceId != _sessionInstanceId) { + // If using a prior session instance id, we can remove all subscribers. return true; } @@ -223,6 +226,7 @@ namespace DataStormI { if (p->second.sessionInstanceId != sessionInstanceId) { + // Remove the subscriber if it is using a prior session instance id. _subscribers.erase(p++); } else @@ -230,11 +234,18 @@ namespace DataStormI ++p; } } + + // If there are no subscribers left, we can reap this object. return _subscribers.empty(); } private: + // Each entry in the map represents a subscriber to the same remote topic. + // The key is a pointer to the local topic object subscribing to the remote topic, and the TopicSubscriber + // object contains the subscription details. std::map _subscribers; + + // The session instance id for the last subscription. int _sessionInstanceId; }; @@ -307,7 +318,16 @@ namespace DataStormI void unsubscribeFromFilter(std::int64_t, std::int64_t, const std::shared_ptr&, std::int64_t); void disconnectFromFilter(std::int64_t, std::int64_t, const std::shared_ptr&, std::int64_t); - DataStormContract::LongLongDict getLastIds(std::int64_t, std::int64_t, const std::shared_ptr&); + /** + * Return a map containing the last sample IDs read by the subscribers of the given topic and key. + * + * @param topic The ID of the peer topic. + * @param key The ID of the peer key. + * @param element The data element. + */ + DataStormContract::LongLongDict + getLastIds(std::int64_t topic, std::int64_t key, const std::shared_ptr& element); + std::vector> subscriberInitialized( std::int64_t, std::int64_t, @@ -340,6 +360,8 @@ namespace DataStormI int _retryCount; IceInternal::TimerTaskPtr _retryTask; + // Keeps track of the topics that this session is subscribed to. The key represents the topic ID in the remote + // node. std::map _topics; std::unique_lock* _topicLock; diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index fd8b598e608..a76ba16f1d8 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -198,12 +198,14 @@ TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shar ElementSpecSeq specs; for (const auto& info : infos) { - if (info.id > 0) // Key + // Positive IDs represent keys and negative IDs represent filters. + if (info.id > 0) { auto key = _keyFactory->decode(_instance->getCommunicator(), info.value); auto p = _keyElements.find(key); if (p != _keyElements.end()) { + // If we have a matching key add it to the spec. ElementDataSeq elements; for (auto k : p->second) { @@ -211,6 +213,7 @@ TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shar } specs.push_back({std::move(elements), key->getId(), "", {}, info.id, ""}); } + for (auto e : _filteredElements) { if (e.first->match(key)) @@ -748,11 +751,11 @@ TopicI::disconnect() listeners.swap(_listeners); } - for (auto s : listeners) + for (const auto& [session, listener] : listeners) { - for (auto t : s.second.topics) + for (auto id : listener.topics) { - s.first->disconnect(t, this); + session->disconnect(id, this); } } @@ -797,6 +800,8 @@ TopicI::forwarderException() const void TopicI::add(const shared_ptr& element, const vector>& keys) { + assert(element); + if (keys.empty()) { addFiltered(element, alwaysMatchFilter); @@ -811,7 +816,6 @@ TopicI::add(const shared_ptr& element, const vector>()).first; } - assert(element); infos.push_back({key->getId(), "", key->encode(_instance->getCommunicator())}); p->second.insert(element); } diff --git a/cpp/src/DataStorm/TopicI.h b/cpp/src/DataStorm/TopicI.h index 2dcc702b9eb..ee20e597ac6 100644 --- a/cpp/src/DataStorm/TopicI.h +++ b/cpp/src/DataStorm/TopicI.h @@ -8,7 +8,6 @@ #include "DataElementI.h" #include "DataStorm/InternalI.h" #include "DataStorm/Types.h" -#include "ForwarderManager.h" #include "Instance.h" namespace DataStormI @@ -48,6 +47,7 @@ namespace DataStormI DataStormContract::TopicSpec getTopicSpec() const; DataStormContract::ElementInfoSeq getTags() const; + DataStormContract::ElementSpecSeq getElementSpecs(std::int64_t, const DataStormContract::ElementInfoSeq&, const std::shared_ptr&); @@ -131,11 +131,34 @@ namespace DataStormI mutable std::mutex _mutex; mutable std::condition_variable _cond; bool _destroyed; + + // A map containing the data readers or data writers for this topic. + // The map's key is a pointer returned by the topic's key factory, and the value is a set of data elements + // associated with that key. + // + // - When this class is an instance of the derived `TopicReaderI` class, the data elements are data readers. + // - When this class is an instance of the derived `TopicWriterI` class, the data elements are data writers. std::map, std::set>> _keyElements; + + // A map containing the filtered data readers for this topic. + // The map's key is a pointer returned by the topic's filter factory, and the value is a set of data elements + // representing the filtered data readers. std::map, std::set>> _filteredElements; + + // A map containing the per-session topic listeners. + // The map's key is the session servant pointer, and the value is a listener object that contains: + // - A set of remote topic IDs, and + // - The peer session proxy. std::map, Listener> _listeners; + + // A map containing the tag updaters for this topic. The tag updaters are used for partial updates. + // The map's key is a pointer returned by the topic's tag factory, and the value is the updater function. std::map, Updater> _updaters; + + // The number of connected listeners. size_t _listenerCount; + + // The number of threads waiting for a listener notification. See waitForListeners(). mutable size_t _waiters; mutable size_t _notified; std::int64_t _nextId;