Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStorm comments & minor fixes #3212

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

module DataStormContract
{
/// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its
/// DataSample history in response to various events.
/// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its DataSample
/// history in response to various events.
enum ClearHistoryPolicy
{
/// The reader clears its history when a new DataSample is added.
Expand Down Expand Up @@ -70,7 +70,7 @@ module DataStormContract
/// and encoded value.
struct ElementInfo
{
/// The unique identifier of the element. Filter IDs are negative.
/// The ID of the element. Filter IDs are negative, while key and tag IDs are positive.
long id;

/// The name of the filter. This field is empty for key and tag elements.
Expand Down Expand Up @@ -131,15 +131,31 @@ module DataStormContract
Ice::ByteSeq criteria;
}

/// Represents the configuration of a reader or writer.
class ElementConfig(1)
{
optional(1) string facet;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

facet is missing a doc comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add one.


/// An optional sample filter associated with the reader. Sample filters are specified on the reader side.
optional(2) FilterInfo sampleFilter;

/// An optional name for the reader or writer.
optional(3) string name;

/// An optional priority for the writer.
/// See also the `DataStorm.Topic.Priority` property.
optional(4) int priority;

/// An optional sample count, specifying the number of samples queued in the writer or reader sample queue.
/// See also the `DataStorm.Topic.SampleCount` property.
optional(10) int sampleCount;

/// An optional lifetime, specified in milliseconds, representing the maximum time samples are kept in the
/// writer or reader sample queue. See also the `DataStorm.Topic.SampleLifetime` property.
optional(11) int sampleLifetime;

/// An optional clear history policy that determines when the reader or writer sample history is cleared.
/// See also the `DataStorm.Topic.ClearHistory` property.
optional(12) ClearHistoryPolicy clearHistory;
}

Expand Down Expand Up @@ -247,15 +263,22 @@ module DataStormContract
void detachTags(long topic, Ice::LongSeq tags);

/// Announces new elements to the peer.
///
/// The peer will invoke `attachElements` for the elements it is interested in. The announced elements include
/// key readers, key writers, and filter readers associated with the specified topic.
/// the readers, and writers associated with the specified topic.
pepone marked this conversation as resolved.
Show resolved Hide resolved
///
/// @param topic The ID of the topic associated with the elements.
/// @param elements The sequence of elements to announce.
/// @see attachElements
void announceElements(long topic, ElementInfoSeq elements);

void attachElements(long topic, ElementSpecSeq elements, bool initialize);
/// Attaches the given topic elements to all subscribers of the specified topic.
///
/// @param topicId The ID of the topic to which the elements belong.
/// @param elements The sequence of elements to attach to the topic's subscribers.
/// @param initialize true if called from attachTopic, false otherwise.
pepone marked this conversation as resolved.
Show resolved Hide resolved
void attachElements(long topicId, ElementSpecSeq elements, bool initialize);

void attachElementsAck(long topic, ElementSpecAckSeq elements);
void detachElements(long topic, Ice::LongSeq keys);

Expand All @@ -270,6 +293,11 @@ module DataStormContract

interface SubscriberSession extends Session
{
/// Queue a sample with the subscribers of the topic element.
///
/// @param topicId The ID of the topic.
/// @param elementId The ID of the element.
/// @param sample The sample to queue.
void s(long topicId, long elementId, DataSample sample);
}

Expand Down
74 changes: 51 additions & 23 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace
{
DataSample toSample(const shared_ptr<Sample>& sample, const Ice::CommunicatorPtr& communicator, bool marshalKey)
{
return {
sample->id,
marshalKey ? 0 : sample->key->getId(),
marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{},
chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(),
sample->tag ? sample->tag->getId() : 0,
sample->event,
sample->encode(communicator)};
return DataSample{
.id = sample->id,
.keyId = marshalKey ? 0 : sample->key->getId(),
.keyValue = marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{},
.timestamp = chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(),
.tag = sample->tag ? sample->tag->getId() : 0,
.event = sample->event,
.value = sample->encode(communicator)};
Comment on lines +21 to +28
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For large structs with many fields this style, using the filed names, seems easier to read.

}

void cleanOldSamples(
Expand Down Expand Up @@ -111,8 +111,10 @@ DataElementI::attach(
auto info = *data.config->sampleFilter;
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria);
}

string facet = data.config->facet ? *data.config->facet : string();
int priority = data.config->priority ? *data.config->priority : 0;

string name;
if (data.config->name)
{
Expand All @@ -124,14 +126,20 @@ DataElementI::attach(
os << session->getId() << '-' << topicId << '-' << data.id;
name = os.str();
}

if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority)))
{
auto q = data.lastIds.find(_id);
long long lastId = q != data.lastIds.end() ? q->second : 0;
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict();
int64_t lastId = q != data.lastIds.end() ? q->second : 0;
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict{};
DataSamples samples = getSamples(key, sampleFilter, data.config, lastId, now);
acks.push_back({_id, _config, lastIds, samples.samples, data.id});
acks.push_back(ElementDataAck{
.id = _id,
.config = _config,
.lastIds = std::move(lastIds),
.samples = std::move(samples.samples),
Comment on lines +140 to +141
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move them, don't need to copy these containers.

.peerId = data.id});
}
}

Expand All @@ -153,8 +161,10 @@ DataElementI::attach(
auto info = *data.config->sampleFilter;
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria);
}

string facet = data.config->facet ? *data.config->facet : string();
int priority = data.config->priority ? *data.config->priority : 0;

string name;
if (data.config->name)
{
Expand All @@ -166,6 +176,7 @@ DataElementI::attach(
os << session->getId() << '-' << topicId << '-' << data.id;
name = os.str();
}

if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority)))
{
Expand All @@ -178,7 +189,7 @@ DataElementI::attach(
session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this());
if (!samplesI.empty())
{
return [=, self = shared_from_this()]()
return [=, samplesI = std::move(samplesI), self = shared_from_this()]()
Copy link
Member Author

@pepone pepone Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that the previous code, captures samplesI as a copy, which is them passed by reference to initSamples.

I updated it to move capture it because it was not used anywhere else.

{ self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
}
return nullptr;
Expand All @@ -198,14 +209,17 @@ DataElementI::attachKey(
int priority)
{
// No locking necessary, called by the session with the mutex locked
auto p = _listeners.find({session, facet});

ListenerKey listenerKey{session, facet};
auto p = _listeners.find(listenerKey);
if (p == _listeners.end())
{
p = _listeners.emplace(ListenerKey{session, facet}, Listener(std::move(prx), facet)).first;
p = _listeners.emplace(std::move(listenerKey), Listener(std::move(prx), facet)).first;
}

bool added = false;
auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added);

if (_onConnectedElements && added)
{
_executor->queue([self = shared_from_this(), name]
Expand Down Expand Up @@ -634,7 +648,7 @@ DataElementI::forward(const Ice::ByteSeq& inParams, const Ice::Current& current)
{
for (const auto& [_, listener] : _listeners)
{
// If there's at least one subscriber interested in the update
// If we are forwarding a sample check if at least once of the listeners is interested in the sample.
if (!_sample || listener.matchOne(_sample, false))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The !_sample check is there because this also forwards other operations. _sample is only set for SubscriberSession::s when sending a sample.

{
// Forward the call using the listener's session proxy don't need to wait for the result.
Expand Down Expand Up @@ -1221,26 +1235,39 @@ KeyDataWriterI::getSamples(
int64_t lastId,
const chrono::time_point<chrono::system_clock>& now)
{
// Collect all queued samples that match the key and sample filter, are newer than the lastId, and are not stale.
DataSamples samples;
samples.id = _keys.empty() ? -_id : _id;

// If the caller sample queueing is disabled, there is no need to return any samples.
if (config->sampleCount && *config->sampleCount == 0)
{
return samples;
}

// Reap stale samples before collecting any samples.
if (_config->sampleLifetime && *_config->sampleLifetime > 0)
{
cleanOldSamples(_samples, now, *_config->sampleLifetime);
}

// Compute the stale time, according to the callers sample lifetime configuration.
chrono::time_point<chrono::system_clock> staleTime = chrono::time_point<chrono::system_clock>::min();
if (config->sampleLifetime && *config->sampleLifetime > 0)
{
staleTime = now - chrono::milliseconds(*config->sampleLifetime);
}

shared_ptr<Sample> first;
// Iterate through samples in reverse chronological order, starting with the newest.
// Stop iterating if any of the following conditions are met:
// - A sample's timestamp is older than the specified stale time.
// - A sample's ID is less than or equal to the specified last ID.
// - The requested number of samples has been collected.
// - A sample event triggers history clearing based on the caller's clear history policy.
// For each sample:
// - Check if it matches the optional key and sample filter.
// - If it matches, add it to the result set and update the first matched sample.
for (auto p = _samples.rbegin(); p != _samples.rend(); ++p)
{
if ((*p)->timestamp < staleTime)
Expand Down Expand Up @@ -1274,19 +1301,20 @@ KeyDataWriterI::getSamples(
}
}
}

if (!samples.samples.empty())
{
// If the first sample is a partial update, transform it to a full Update
if (first->event == DataStorm::SampleEvent::PartialUpdate)
{
samples.samples[0] = {
first->id,
samples.samples[0].keyId,
samples.samples[0].keyValue,
chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(),
0,
DataStorm::SampleEvent::Update,
first->encodeValue(getCommunicator())};
samples.samples[0] = DataSample{
.id = first->id,
.keyId = samples.samples[0].keyId,
.keyValue = samples.samples[0].keyValue,
.timestamp = chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(),
.tag = 0,
.event = DataStorm::SampleEvent::Update,
.value = first->encodeValue(getCommunicator())};
}
}
return samples;
Expand Down
29 changes: 22 additions & 7 deletions cpp/src/DataStorm/DataElementI.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,29 @@ namespace DataStormI

struct Listener
{
Listener(DataStormContract::SessionPrx proxy, const std::string& facet)
: proxy(facet.empty() ? proxy : proxy->ice_facet<DataStormContract::SessionPrx>(facet))
Listener(DataStormContract::SessionPrx proxy, std::string facet)
: proxy(
facet.empty() ? std::move(proxy)
: proxy->ice_facet<DataStormContract::SessionPrx>(std::move(facet)))
{
}

/**
* Determines if any subscriber matches the given sample.
*
* @param sample The sample to evaluate against the subscribers.
* @param matchKey If true, the sample's key is matched against subscriber keys.
* If false, the key match is skipped.
* @return True if at least one subscriber matches the sample, otherwise false.
*/
bool matchOne(const std::shared_ptr<Sample>& sample, bool matchKey) const
{
for (const auto& s : subscribers)
for (const auto& [_, subscriber] : subscribers)
{
if ((!matchKey || s.second->keys.empty() ||
s.second->keys.find(sample->key) != s.second->keys.end()) &&
(!s.second->filter || s.second->filter->match(sample->key)) &&
(!s.second->sampleFilter || s.second->sampleFilter->match(sample)))
if ((!matchKey || subscriber->keys.empty() ||
subscriber->keys.find(sample->key) != subscriber->keys.end()) &&
(!subscriber->filter || subscriber->filter->match(sample->key)) &&
(!subscriber->sampleFilter || subscriber->sampleFilter->match(sample)))
{
return true;
}
Expand Down Expand Up @@ -129,7 +139,9 @@ namespace DataStormI
return subscribers.empty();
}

// The proxy to the peer session.
DataStormContract::SessionPrx proxy;
// A map containing the data element subscribers, indexed by the topic ID and the element ID.
std::map<std::pair<std::int64_t, std::int64_t>, std::shared_ptr<Subscriber>> subscribers;
};

Expand Down Expand Up @@ -267,6 +279,9 @@ namespace DataStormI
mutable std::shared_ptr<Sample> _sample;
DataStormContract::SessionPrx _forwarder;
std::map<std::shared_ptr<Key>, std::vector<std::shared_ptr<Subscriber>>> _connectedKeys;

// A map containing the element listeners, indexed by the session servant and the target facet. The
// implementation of forward utilizes the listener map to forward calls to the peer sessions.
std::map<ListenerKey, Listener> _listeners;

private:
Expand Down
Loading
Loading