diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 80b333369fb..e32f3e00afa 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -1134,28 +1134,45 @@ SessionI::subscriberInitialized( out << _id << ": initialized '" << element << "' from 'e" << elementId << '@' << topicId << "'"; } elementSubscriber->initialized = true; - elementSubscriber->lastId = samples.empty() ? 0 : samples.back().id; - vector> samplesI; - samplesI.reserve(samples.size()); - auto sampleFactory = element->getTopic()->getSampleFactory(); - auto keyFactory = element->getTopic()->getKeyFactory(); - for (const auto& sample : samples) + // If the samples collection is empty, the element subscriber's lastId remains unchanged: + // - If no samples have been received, lastId is 0. + // - If the element subscriber has been initialized before, lastId represents the ID of the latest received sample. + // + // If the samples collection is not empty: + // - It contains samples queued in the peer writer for the element that are valid according to the element's + // configuration. + // - These samples have not yet been processed by the element subscriber, according to the subscriber's lastId. + if (samples.empty()) { - assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first); - - samplesI.push_back(sampleFactory->create( - _id, - elementSubscribers->name, - sample.id, - sample.event, - key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue), - subscriber.tags[sample.tag], - sample.value, - sample.timestamp)); - assert(samplesI.back()->key); + return {}; + } + else + { + assert(samples.front().id > elementSubscriber->lastId); + elementSubscriber->lastId = samples.back().id; + + vector> samplesI; + samplesI.reserve(samples.size()); + auto sampleFactory = element->getTopic()->getSampleFactory(); + auto keyFactory = element->getTopic()->getKeyFactory(); + for (const auto& sample : samples) + { + assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first); + + samplesI.push_back(sampleFactory->create( + _id, + elementSubscribers->name, + sample.id, + sample.event, + key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue), + subscriber.tags[sample.tag], + sample.value, + sample.timestamp)); + assert(samplesI.back()->key); + } + return samplesI; } - return samplesI; } void diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 0b0856ef001..d6fdf4b76cc 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -57,7 +57,7 @@ void ::Reader::run(int argc, char* argv[]) auto connection = node.getSessionConnection(sample.getSession()); while (!connection) { - this_thread::sleep_for(chrono::milliseconds(200)); + this_thread::sleep_for(chrono::milliseconds(10)); connection = node.getSessionConnection(sample.getSession()); } connection->close().get(); @@ -68,6 +68,62 @@ void ::Reader::run(int argc, char* argv[]) writer.update(0); writer.waitForNoReaders(); } + + { + Topic topic(node, "int2"); + auto reader = makeSingleKeyReader(topic, "element", "", config); + string session; + + // Read 100 samples from the "element" key and close the connection. + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << i << endl; + test(false); + } + session = sample.getSession(); + } + + auto connection = node.getSessionConnection(session); + test(connection); + connection->close().get(); + + // Send a sample to the writer on "reader_barrier" to let it know that the connection was closed. + // The writer will read it after the session is reestablished. + auto writerB = makeSingleKeyWriter(topic, "reader_barrier"); + writerB.waitForReaders(); + writerB.update(0); + + // Wait for the writer to acknowledge the sample send on "reader_barrier" and close the connection again. + auto readerB = makeSingleKeyReader(topic, "writer_barrier"); + [[maybe_unused]] auto _ = readerB.getNextUnread(); + + // Session was reestablished; close it again. + connection = node.getSessionConnection(session); + test(connection); + connection->close().get(); + + // Let the writer know the connection was closed again, and that it can proceed with the second batch of + // samples. + writerB.update(0); + + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i + 100) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl; + test(false); + } + } + + // Let the writer know we have processed all samples. + writerB.waitForReaders(); + writerB.update(0); + writerB.waitForNoReaders(); + } } DEFINE_TEST(::Reader) diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index e0517fd641b..01b0a5a406e 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -54,6 +54,43 @@ void ::Writer::run(int argc, char* argv[]) [[maybe_unused]] auto _ = makeSingleKeyReader(topic, "barrier").getNextUnread(); } cout << "ok" << endl; + + // Publish a batch of samples to a topic's key, follow by two consecutive session recovery events without writer + // activity on the given key. + // Then send a second batch of samples to the same topic's key and ensure the reader continue reading from when it + // left off. + cout << "testing reader multiple connection closure without writer activity... " << flush; + { + Topic topic(node, "int2"); + auto writer = makeSingleKeyWriter(topic, "element", "", config); + writer.waitForReaders(); + for (int i = 0; i < 100; ++i) + { + writer.update(i); + } + + auto readerB = makeSingleKeyReader(topic, "reader_barrier"); + + // A control sample sent by the reader to let the writer know the connection was closed. The writer processes + // this sample after the first session reestablishment. + auto sample = readerB.getNextUnread(); + + // Send a control sample to let the reader know session was reestablished. + auto writerB = makeSingleKeyWriter(topic, "writer_barrier"); + writerB.update(0); + + // Wait for a second control sample from the reader indicating the second session closure. The writer process + // this sample after the second session reestablishment. + sample = readerB.getNextUnread(); + + // Session has been reestablish twice without activity in "element" key. Send the second batch of samples. + for (int i = 0; i < 100; ++i) + { + writer.update(i + 100); + } + sample = readerB.getNextUnread(); + } + cout << "ok" << endl; } DEFINE_TEST(::Writer)