Skip to content

Commit

Permalink
Session reestablishment test
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Dec 20, 2024
1 parent 676ea4f commit e8c112e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
67 changes: 66 additions & 1 deletion cpp/test/DataStorm/reliability/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void ::Reader::run(int argc, char* argv[])
auto connection = node.getSessionConnection(sample.getSession());
while (!connection)
{
this_thread::sleep_for(chrono::milliseconds(200));
this_thread::sleep_for(chrono::milliseconds(10));
connection = node.getSessionConnection(sample.getSession());
}
connection->close().get();
Expand All @@ -68,6 +68,71 @@ void ::Reader::run(int argc, char* argv[])
writer.update(0);
writer.waitForNoReaders();
}

{
Topic<string, int> topic(node, "int2");
auto reader = makeSingleKeyReader(topic, "element", "", config);
string session;

// Read 100 samples from the "element" key and close the connection.
for (int i = 0; i < 100; ++i)
{
auto sample = reader.getNextUnread();
if (sample.getValue() != i)
{
cerr << "unexpected sample: " << sample.getValue() << " expected:" << i << endl;
test(false);
}
session = sample.getSession();
}

auto connection = node.getSessionConnection(session);
while (!connection)
{
this_thread::sleep_for(chrono::milliseconds(10));
connection = node.getSessionConnection(session);
}
connection->close().get();

// Send a sample to the writer on "reader_barrier" to let it know that the connection was closed.
// The writer will read it after the session is reestablished.
auto writerB = makeSingleKeyWriter(topic, "reader_barrier");
writerB.waitForReaders();
writerB.update(0);

// Wait for the writer to acknowledge the sample send on "reader_barrier" and close the connection again.
auto readerB = makeSingleKeyReader(topic, "writer_barrier");
[[maybe_unused]] auto _ = readerB.getNextUnread();

// Session was reestablish close again
connection = node.getSessionConnection(session);
while (!connection)
{
this_thread::sleep_for(chrono::milliseconds(10));
connection = node.getSessionConnection(session);
}
connection->close().get();

// Let the writer know the connection was closed again, and that it can proceed with the second batch of
// samples.
writerB.update(0);

for (int i = 0; i < 100; ++i)
{
auto sample = reader.getNextUnread();
if (sample.getValue() != i + 100)
{
cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl;
test(false);
}
session = sample.getSession();
}

// Let the writer know we have processed all samples.
writerB.waitForReaders();
writerB.update(0);
writerB.waitForNoReaders();
}
}

DEFINE_TEST(::Reader)
37 changes: 37 additions & 0 deletions cpp/test/DataStorm/reliability/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,43 @@ void ::Writer::run(int argc, char* argv[])
[[maybe_unused]] auto _ = makeSingleKeyReader(topic, "barrier").getNextUnread();
}
cout << "ok" << endl;

// Publish a batch of samples to a topic's key, follow by two consecutive session recovery events without writer
// activity on the given key.
// Then send a second batch of samples to the same topic's key and ensure the reader continue reading from when it
// left off.
cout << "testing reader multiple connection closure without writer activity... " << flush;
{
Topic<string, int> topic(node, "int2");
auto writer = makeSingleKeyWriter(topic, "element", "", config);
writer.waitForReaders();
for (int i = 0; i < 100; ++i)
{
writer.update(i);
}

auto readerB = makeSingleKeyReader(topic, "reader_barrier");

// A control sample send by the reader to let the writer know the connection was closed. The writer process this
// sample after the first session reestablishment.
auto sample = readerB.getNextUnread();

// Send a control sample to let the reader know session was reestablished.
auto writerB = makeSingleKeyWriter(topic, "writer_barrier");
writerB.update(0);

// Wait for a second control sample from the reader indicating the second session closure. The writer process this
// sample after the second session reestablishment.
sample = readerB.getNextUnread();

// Session has been reestablish twice without activity in "element" key. Send the second batch of samples.
for (int i = 0; i < 100; ++i)
{
writer.update(i + 100);
}
sample = readerB.getNextUnread();
}
cout << "ok" << endl;
}

DEFINE_TEST(::Writer)

0 comments on commit e8c112e

Please sign in to comment.