diff --git a/src/base/BackedReader.cpp b/src/base/BackedReader.cpp index 865760784..b4a23975d 100644 --- a/src/base/BackedReader.cpp +++ b/src/base/BackedReader.cpp @@ -50,7 +50,7 @@ int BackedReader::read(string* buf) { // In EternalTCP, the server needs to explictly tell the client that // the session is over. errno = EPIPE; - bytesRead = -1; + return -1; } else if (bytesRead > 0) { partialMessage.append(tmpBuf, bytesRead); } else if (bytesRead == -1 && errno == EAGAIN) { diff --git a/test/ConnectionTest.cpp b/test/ConnectionTest.cpp index 7640e4a6e..02ed63227 100644 --- a/test/ConnectionTest.cpp +++ b/test/ConnectionTest.cpp @@ -40,7 +40,6 @@ class Collector { if (status == 1) { if (s == string("DONE")) { fifo.push_back(s); - break; } if (s != string("HEARTBEAT")) { fifo.push_back(s); @@ -49,7 +48,10 @@ class Collector { FATAL_FAIL(status); } } - ::usleep(1000); + if (connection->isShuttingDown()) { + done = true; + } + ::usleep(10 * 1000); if (lastSecond <= time(NULL) - 5) { lock_guard guard(collectorMutex); lastSecond = time(NULL); @@ -58,6 +60,8 @@ class Collector { } } + void join() { collectorThread->join(); } + void finish() { connection->shutdown(); lock_guard guard(collectorMutex); @@ -82,13 +86,15 @@ class Collector { string read() { while (!hasData()) { - ::usleep(1000); + ::usleep(10 * 1000); } return pop(); } void write(const string& s) { return connection->writeMessage(s); } + shared_ptr getConnection() { return connection; } + protected: shared_ptr connection; deque fifo; @@ -100,13 +106,11 @@ class Collector { void listenFn(bool* stopListening, int serverFd, shared_ptr serverConnection) { - // Only works when there is 1:1 mapping between endpoint and fds. Will fix in - // future api while (*stopListening == false) { if (serverConnection->getSocketHandler()->hasData(serverFd)) { serverConnection->acceptNewConnection(serverFd); } - ::usleep(1000 * 1000); + ::usleep(10 * 1000); } } @@ -205,8 +209,9 @@ class ConnectionTest : public testing::Test { result = clientCollector->read(); EXPECT_EQ(result, "DONE"); - serverCollector->finish(); - clientCollector->finish(); + serverConnection->removeClient(serverCollector->getConnection()->getId()); + serverCollector->join(); + clientCollector->join(); EXPECT_EQ(resultConcat, s); } @@ -282,7 +287,6 @@ TEST_F(FlakyConnectionTest, MultiReadWrite) { new_id[0] = 'A' + a; pool.push([&, this](int id, string clientId) { readWriteTest(clientId); }, new_id); - ::usleep((500 + rand() % 1000) * 1000); } pool.stop(true); } diff --git a/test/FlakySocketHandler.hpp b/test/FlakySocketHandler.hpp index 1a834ff96..db34ad860 100644 --- a/test/FlakySocketHandler.hpp +++ b/test/FlakySocketHandler.hpp @@ -39,7 +39,7 @@ class FlakySocketHandler : public SocketHandler { } int writeAllOrReturn(int fd, const void* buf, size_t count) { - if (rand() % 20 == 0) { + if (rand() % 30 == 0) { errno = EPIPE; return -1; }