Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStorm doc & impl comments #3204

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions cpp/include/DataStorm/DataStorm.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,17 @@ namespace DataStorm
bool hasWriters() const noexcept;

/**
* Wait for given number of writers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of writers to be online.
*
* @param count The number of writers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForWriters(unsigned int count = 1) const;

/**
* Wait for readers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for writers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoWriters() const;

Expand All @@ -266,8 +267,9 @@ namespace DataStorm
std::vector<Sample<Key, Value, UpdateTag>> getAllUnread() noexcept;

/**
* Wait for given number of unread samples to be available. The node shutdown will cause this method to
* raise NodeShutdownException.
* Wait for given number of unread samples to be available.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForUnread(unsigned int count = 1) const;

Expand All @@ -279,10 +281,10 @@ namespace DataStorm
bool hasUnread() const noexcept;

/**
* Returns the next unread sample. The node shutdown will cause this method to raise
* NodeShutdownException.
* Returns the next unread sample.
*
* @return The unread sample.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
Sample<Key, Value, UpdateTag> getNextUnread();

Expand Down Expand Up @@ -384,15 +386,17 @@ namespace DataStorm
bool hasReaders() const noexcept;

/**
* Wait for given number of readers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of readers to be online.
*
* @param count The number of readers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForReaders(unsigned int count = 1) const;

/**
* Wait for readers to be offline. The node shutdown this method to raise NodeShutdownException.
* Wait for readers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoReaders() const;

Expand All @@ -411,9 +415,10 @@ namespace DataStorm
std::vector<Key> getConnectedKeys() const noexcept;

/**
* Get the last written sample. If there's no sample, the std::logic_error exception is raised.
* Get the last written sample.
*
* @return The last written sample.
* @throws std::logic_error If there's no sample.
**/
Sample<Key, Value, UpdateTag> getLast();

Expand Down Expand Up @@ -549,16 +554,17 @@ namespace DataStorm
bool hasWriters() const noexcept;

/**
* Wait for given number of data writers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of data writers to be online.
*
* @param count The number of date writers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForWriters(unsigned int count = 1) const;

/**
* Wait for data writers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for data writers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoWriters() const;

Expand All @@ -577,16 +583,17 @@ namespace DataStorm
bool hasReaders() const noexcept;

/**
* Wait for given number of data readers to be online. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for given number of data readers to be online.
*
* @param count The number of data readers to wait.
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForReaders(unsigned int count = 1) const;

/**
* Wait for data readers to be offline. The node shutdown will cause this method to raise
* NodeShutdownException.
* Wait for data readers to be offline.
*
* @throws NodeShutdownException If the node is shut down while waiting.
*/
void waitForNoReaders() const;

Expand Down
7 changes: 2 additions & 5 deletions cpp/include/DataStorm/InternalT.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,8 @@ namespace DataStormI
return k;
}

//
// The key is being removed concurrently by the deleter, remove it now
// to allow the insertion of a new key. The deleter won't remove the
// new key.
//
// The key is being removed concurrently by the deleter, remove it now to allow the insertion of a new
// key. The deleter won't remove the new key.
_elements.erase(p);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/DataStorm/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ namespace DataStorm
* The Cloner template provides a method to clone user types.
*
* The cloner template can be specialized to provide cloning for types that require special cloning. By
* defaut, the template uses plain C++ copy.
* default, the template uses plain C++ copy.
*
* @headerfile DataStorm/DataStorm.h
*/
Expand Down
71 changes: 56 additions & 15 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,59 @@ module DataStormContract
}
sequence<DataSamples> DataSamplesSeq;

/// Provides information about an element, which can be a key, a filter, or a tag. Includes the element's ID, name,
/// and encoded value.
struct ElementInfo
{
/// The key or filter id.
/// The unique identifier of the element. Filter IDs are negative.
long id;

/// The filter name.
/// The name of the filter. This field is empty for key and tag elements.
string name;

/// The key or filter value.
/// The encoded value of the element.
Ice::ByteSeq value;
}
sequence<ElementInfo> ElementInfoSeq;

/// Provides information about a topic, including its name and the list of active topic reader or topic writer IDs.
///
/// There is a unique `TopicInfo` for all topic instances with the same name, representing a single logical topic.
/// Each instance has its own topic reader and topic writer, which are lazily initialized and have a unique ID.
///
/// @see Session#announceTopics
struct TopicInfo
{
/// The topic name.
/// The name of the topic.
string name;

/// The id of topic writers or readers.
/// The list of active topic reader or topic writer IDs for the topic.
///
/// - In a publisher session announcing topics to a subscriber session, this contains the active topic writer
/// IDs.
/// - In a subscriber session announcing topics to a publisher session, this contains the active topic reader
/// IDs.
Ice::LongSeq ids;
}

/// Represents a sequence of active topics used for transmitting topic information during session establishment.
///
/// @see Session#announceTopics
sequence<TopicInfo> TopicInfoSeq;

/// Provides detailed information about topic readers and topic writers, including its ID, name, keys, filters,
/// and tags.
///
/// @see Session#attachTopic
struct TopicSpec
{
/// The id of the topic.
/// The ID of the topic.
long id;

/// The name of the topic.
string name;

/// The topic keys or filters.
/// The topic's keys and filters.
ElementInfoSeq elements;

/// The topic update tags.
Expand Down Expand Up @@ -135,6 +156,7 @@ module DataStormContract
}
sequence<ElementData> ElementDataSeq;

/// Provides detailed information about elements that can be either a key or a filter.
struct ElementSpec
{
/// The readers and writers associated with the key or filter.
Expand All @@ -143,7 +165,7 @@ module DataStormContract
/// The id of the key or filter.
long id;

/// The name of the filter.
/// The name of the filter. This field is empty for key elements.
string name;

/// The value of the key or filter.
Expand Down Expand Up @@ -200,20 +222,39 @@ module DataStormContract

interface Session
{
/// Called by sessions to announce topics to the peer. A publisher session announces the topics it writes,
/// while a subscriber session announces the topics it reads.
/// Announces existing topics to the peer during session establishment.
/// A publisher session announces the topics it writes, while a subscriber session announces the topics it reads.
///
/// The peer receiving the announcement will invoke `attachTopic` for the topics it is interested in.
///
/// @param topics The topics to announce.
/// @param initialize currently unused.
/// @param topics The sequence of topics to announce.
/// @param initialize Currently unused.
/// @see attachTopic
void announceTopics(TopicInfoSeq topics, bool initialize);

/// Attaches a local topic to a remote topic when a session receives a topic announcement from a peer.
///
/// This method is called if the session is interested in the announced topic, which occurs when:
/// - The session has a reader for a topic that the peer has a writer for, or
/// - The session has a writer for a topic that the peer has a reader for.
///
/// @param topic The TopicSpec object describing the topic being attached to the remote topic.
void attachTopic(TopicSpec topic);

void detachTopic(long topic);

void attachTags(long topic, ElementInfoSeq tags, bool initialize);
void detachTags(long topic, Ice::LongSeq tags);

void announceElements(long topic, ElementInfoSeq keys);
/// Announces new elements to the peer.
/// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include
/// key readers, key writers, and filter readers associated with the specified topic.
///
/// @param topic The ID of the topic associated with the elements.
/// @param elements The sequence of elements to announce.
/// @see attachElements
void announceElements(long topic, ElementInfoSeq elements);

void attachElements(long topic, ElementSpecSeq elements, bool initialize);
void attachElementsAck(long topic, ElementSpecAckSeq elements);
void detachElements(long topic, Ice::LongSeq keys);
Expand Down Expand Up @@ -274,8 +315,8 @@ module DataStormContract
/// Announce a topic reader.
///
/// @param topic The name of the topic.
/// @param node The node reading the topic. The proxy is never null.
idempotent void announceTopicReader(string topic, Node* node);
/// @param subscriber The node reading the topic. The subscriber proxy is never null.
idempotent void announceTopicReader(string topic, Node* subscriber);

/// Announce a topic writer.
///
Expand Down
10 changes: 2 additions & 8 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,10 +1061,7 @@ KeyDataReaderI::KeyDataReaderI(
out << this << ": created key reader";
}

//
// If sample filtering is enabled, ensure the updates are received using a session
// facet specific to this reader.
//
// If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader.
if (_config->sampleFilter)
{
ostringstream os;
Expand Down Expand Up @@ -1343,10 +1340,7 @@ FilteredDataReaderI::FilteredDataReaderI(
out << this << ": created filtered reader";
}

//
// If sample filtering is enabled, ensure the updates are received using a session
// facet specific to this reader.
//
// If sample filtering is enabled, ensure the updates are received using a session facet specific to this reader.
if (_config->sampleFilter)
{
ostringstream os;
Expand Down
29 changes: 21 additions & 8 deletions cpp/src/DataStorm/LookupI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@ LookupI::LookupI(
}

void
LookupI::announceTopicReader(string name, optional<NodePrx> proxy, const Ice::Current& current)
LookupI::announceTopicReader(string name, optional<NodePrx> subscriber, const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
_nodeSessionManager->announceTopicReader(name, *proxy, current.con);
_topicFactory->createSubscriberSession(name, *proxy, current.con);
Ice::checkNotNull(subscriber, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopicReader(name, *subscriber, current.con);

// Notify the topic factory about the new topic reader.
// If there are any writers for the topic, the factory will create a subscriber session for it.
_topicFactory->createSubscriberSession(name, *subscriber, current.con);
}

void
LookupI::announceTopicWriter(string name, optional<NodePrx> proxy, const Ice::Current& current)
LookupI::announceTopicWriter(string name, optional<NodePrx> publisher, const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
_nodeSessionManager->announceTopicWriter(name, *proxy, current.con);
_topicFactory->createPublisherSession(name, *proxy, current.con);
Ice::checkNotNull(publisher, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopicWriter(name, *publisher, current.con);

// Notify the topic factory about the new topic writer.
// If there are any readers for the topic, the factory will create a publisher session for it.
_topicFactory->createPublisherSession(name, *publisher, current.con);
}

void
Expand All @@ -45,8 +53,13 @@ LookupI::announceTopics(
const Ice::Current& current)
{
Ice::checkNotNull(proxy, __FILE__, __LINE__, current);
// Forward the announcement to known nodes via the node session manager.
_nodeSessionManager->announceTopics(readers, writers, *proxy, current.con);

// Notify the topic factory about the new topic readers and writers.
// The factory will create subscriber sessions for topics with matching writers and publisher sessions for topics
// with matching readers.

for (const auto& name : readers)
{
_topicFactory->createSubscriberSession(name, *proxy, current.con);
Expand Down
Loading