-
Notifications
You must be signed in to change notification settings - Fork 593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataStorm comments & minor fixes #3212
Changes from 1 commit
47e4713
67c6d92
f7c1174
e7a8f78
f4fec02
781c3be
1793c9f
d61c5fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,14 @@ namespace | |
{ | ||
DataSample toSample(const shared_ptr<Sample>& sample, const Ice::CommunicatorPtr& communicator, bool marshalKey) | ||
{ | ||
return { | ||
sample->id, | ||
marshalKey ? 0 : sample->key->getId(), | ||
marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{}, | ||
chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(), | ||
sample->tag ? sample->tag->getId() : 0, | ||
sample->event, | ||
sample->encode(communicator)}; | ||
return DataSample{ | ||
.id = sample->id, | ||
.keyId = marshalKey ? 0 : sample->key->getId(), | ||
.keyValue = marshalKey ? sample->key->encode(communicator) : Ice::ByteSeq{}, | ||
.timestamp = chrono::time_point_cast<chrono::microseconds>(sample->timestamp).time_since_epoch().count(), | ||
.tag = sample->tag ? sample->tag->getId() : 0, | ||
.event = sample->event, | ||
.value = sample->encode(communicator)}; | ||
Comment on lines
+21
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For large structs with many fields this style, using the filed names, seems easier to read. |
||
} | ||
|
||
void cleanOldSamples( | ||
|
@@ -111,8 +111,10 @@ DataElementI::attach( | |
auto info = *data.config->sampleFilter; | ||
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria); | ||
} | ||
|
||
string facet = data.config->facet ? *data.config->facet : string(); | ||
int priority = data.config->priority ? *data.config->priority : 0; | ||
|
||
string name; | ||
if (data.config->name) | ||
{ | ||
|
@@ -124,14 +126,20 @@ DataElementI::attach( | |
os << session->getId() << '-' << topicId << '-' << data.id; | ||
name = os.str(); | ||
} | ||
|
||
if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) || | ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority))) | ||
{ | ||
auto q = data.lastIds.find(_id); | ||
long long lastId = q != data.lastIds.end() ? q->second : 0; | ||
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict(); | ||
int64_t lastId = q != data.lastIds.end() ? q->second : 0; | ||
LongLongDict lastIds = key ? session->getLastIds(topicId, id, shared_from_this()) : LongLongDict{}; | ||
DataSamples samples = getSamples(key, sampleFilter, data.config, lastId, now); | ||
acks.push_back({_id, _config, lastIds, samples.samples, data.id}); | ||
acks.push_back(ElementDataAck{ | ||
.id = _id, | ||
.config = _config, | ||
.lastIds = std::move(lastIds), | ||
.samples = std::move(samples.samples), | ||
Comment on lines
+140
to
+141
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can move them, don't need to copy these containers. |
||
.peerId = data.id}); | ||
} | ||
} | ||
|
||
|
@@ -153,8 +161,10 @@ DataElementI::attach( | |
auto info = *data.config->sampleFilter; | ||
sampleFilter = _parent->getSampleFilterFactories()->decode(getCommunicator(), info.name, info.criteria); | ||
} | ||
|
||
string facet = data.config->facet ? *data.config->facet : string(); | ||
int priority = data.config->priority ? *data.config->priority : 0; | ||
|
||
string name; | ||
if (data.config->name) | ||
{ | ||
|
@@ -166,6 +176,7 @@ DataElementI::attach( | |
os << session->getId() << '-' << topicId << '-' << data.id; | ||
name = os.str(); | ||
} | ||
|
||
if ((id > 0 && attachKey(topicId, data.id, key, sampleFilter, session, prx, facet, id, name, priority)) || | ||
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority))) | ||
{ | ||
|
@@ -178,7 +189,7 @@ DataElementI::attach( | |
session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this()); | ||
if (!samplesI.empty()) | ||
{ | ||
return [=, self = shared_from_this()]() | ||
return [=, samplesI = std::move(samplesI), self = shared_from_this()]() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that the previous code, captures I updated it to move capture it because it was not used anywhere else. |
||
{ self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); }; | ||
} | ||
return nullptr; | ||
|
@@ -198,14 +209,17 @@ DataElementI::attachKey( | |
int priority) | ||
{ | ||
// No locking necessary, called by the session with the mutex locked | ||
auto p = _listeners.find({session, facet}); | ||
|
||
ListenerKey listenerKey{session, facet}; | ||
auto p = _listeners.find(listenerKey); | ||
if (p == _listeners.end()) | ||
{ | ||
p = _listeners.emplace(ListenerKey{session, facet}, Listener(std::move(prx), facet)).first; | ||
p = _listeners.emplace(std::move(listenerKey), Listener(std::move(prx), facet)).first; | ||
} | ||
|
||
bool added = false; | ||
auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added); | ||
|
||
if (_onConnectedElements && added) | ||
{ | ||
_executor->queue([self = shared_from_this(), name] | ||
|
@@ -634,7 +648,7 @@ DataElementI::forward(const Ice::ByteSeq& inParams, const Ice::Current& current) | |
{ | ||
for (const auto& [_, listener] : _listeners) | ||
{ | ||
// If there's at least one subscriber interested in the update | ||
// If we are forwarding a sample check if at least once of the listeners is interested in the sample. | ||
if (!_sample || listener.matchOne(_sample, false)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
{ | ||
// Forward the call using the listener's session proxy don't need to wait for the result. | ||
|
@@ -1221,26 +1235,39 @@ KeyDataWriterI::getSamples( | |
int64_t lastId, | ||
const chrono::time_point<chrono::system_clock>& now) | ||
{ | ||
// Collect all queued samples that match the key and sample filter, are newer than the lastId, and are not stale. | ||
DataSamples samples; | ||
samples.id = _keys.empty() ? -_id : _id; | ||
|
||
// If the caller sample queueing is disabled, there is no need to return any samples. | ||
if (config->sampleCount && *config->sampleCount == 0) | ||
{ | ||
return samples; | ||
} | ||
|
||
// Reap stale samples before collecting any samples. | ||
if (_config->sampleLifetime && *_config->sampleLifetime > 0) | ||
{ | ||
cleanOldSamples(_samples, now, *_config->sampleLifetime); | ||
} | ||
|
||
// Compute the stale time, according to the callers sample lifetime configuration. | ||
chrono::time_point<chrono::system_clock> staleTime = chrono::time_point<chrono::system_clock>::min(); | ||
if (config->sampleLifetime && *config->sampleLifetime > 0) | ||
{ | ||
staleTime = now - chrono::milliseconds(*config->sampleLifetime); | ||
} | ||
|
||
shared_ptr<Sample> first; | ||
// Iterate through samples in reverse chronological order, starting with the newest. | ||
// Stop iterating if any of the following conditions are met: | ||
// - A sample's timestamp is older than the specified stale time. | ||
// - A sample's ID is less than or equal to the specified last ID. | ||
// - The requested number of samples has been collected. | ||
// - A sample event triggers history clearing based on the caller's clear history policy. | ||
// For each sample: | ||
// - Check if it matches the optional key and sample filter. | ||
// - If it matches, add it to the result set and update the first matched sample. | ||
for (auto p = _samples.rbegin(); p != _samples.rend(); ++p) | ||
{ | ||
if ((*p)->timestamp < staleTime) | ||
|
@@ -1274,19 +1301,20 @@ KeyDataWriterI::getSamples( | |
} | ||
} | ||
} | ||
|
||
if (!samples.samples.empty()) | ||
{ | ||
// If the first sample is a partial update, transform it to a full Update | ||
if (first->event == DataStorm::SampleEvent::PartialUpdate) | ||
{ | ||
samples.samples[0] = { | ||
first->id, | ||
samples.samples[0].keyId, | ||
samples.samples[0].keyValue, | ||
chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(), | ||
0, | ||
DataStorm::SampleEvent::Update, | ||
first->encodeValue(getCommunicator())}; | ||
samples.samples[0] = DataSample{ | ||
.id = first->id, | ||
.keyId = samples.samples[0].keyId, | ||
.keyValue = samples.samples[0].keyValue, | ||
.timestamp = chrono::time_point_cast<chrono::microseconds>(first->timestamp).time_since_epoch().count(), | ||
.tag = 0, | ||
.event = DataStorm::SampleEvent::Update, | ||
.value = first->encodeValue(getCommunicator())}; | ||
} | ||
} | ||
return samples; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
facet is missing a doc comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add one.