From 81b3e2ea21b42f5cae62792255029e87c641dc14 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 17 Oct 2024 18:45:06 +0100 Subject: [PATCH 1/3] HPCC-32822 Fix MP protocol error logged indefinitely in loop The detection of a MP packet with an invalid header threw an exception that was logged, but did not close the socket. When the client closed the socket, the MP server was notified but because it had nothing left to read of the [bad] header, it saw the invalid protocol error again and rethrew the same error. The epoll handler loop continuously notified the handler of the close event (because it did nothing with it), and the protocol error was continuosly output. The socket should be close on this and any other exception. In case these events are too frequent, add a timer to log the protocol errors less frequently, and log the header bytes received to help diagnose what the source of these 'rogue' connections are. Signed-off-by: Jake Smith --- system/jlib/jdebug.hpp | 2 +- system/jlib/jmisc.cpp | 2 +- system/mp/mpcomm.cpp | 40 ++++++++++++++++++++++++++++------------ 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/system/jlib/jdebug.hpp b/system/jlib/jdebug.hpp index a4d5d41c705..e0983b892c5 100644 --- a/system/jlib/jdebug.hpp +++ b/system/jlib/jdebug.hpp @@ -275,7 +275,7 @@ class PeriodicTimer protected: cycle_t timePeriodCycles = 0; - cycle_t lastElapsedCycles = 0; + std::atomic lastElapsedCycles{0}; }; diff --git a/system/jlib/jmisc.cpp b/system/jlib/jmisc.cpp index 9d1fbf5d504..f778d79cf2e 100644 --- a/system/jlib/jmisc.cpp +++ b/system/jlib/jmisc.cpp @@ -920,7 +920,7 @@ void throwExceptionIfAborting() StringBuffer & hexdump2string(byte const * in, size32_t inSize, StringBuffer & out) { - out.append("["); + out.appendf("%u bytes [", inSize); byte last = 0; unsigned seq = 1; for(unsigned i=0; i mpProtocolErrors{0}; // -------------------------------------------------------- class CMPPacketReader: public ISocketSelectNotify, public CInterface @@ -1847,7 +1852,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface { if (!parent) return false; - bool gc = false; // if a gc is hit, then will fall through to close socket + bool closeSocket = false; // if a graceful close is hit, this will be set and will fall through to close socket + bool suppressException = false; try { while (true) // NB: breaks out if blocked (if (remaining) ..) @@ -1872,7 +1878,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface if (!gotPacketHdr) { CCycleTimer timer; - gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000)); + closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000)); remaining -= szRead; activeptr += szRead; if (remaining) // only possible if blocked. @@ -1882,10 +1888,20 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) { // TBD IPV6 here + mpProtocolErrors++; SocketEndpoint ep; sock->getPeerEndpoint(ep); - IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep); - throw e; + if (periodicTimer.hasElapsed()) + { + VStringBuffer packetHdrBytes("[%" I64F "u incidents to date]. Packet Header: ", mpProtocolErrors.load()); + hexdump2string((byte const *)&hdr, sizeof(hdr), packetHdrBytes); + throw new CMPException(MPERR_protocol_version_mismatch, ep, packetHdrBytes.str()); + } + else + { + suppressException = true; + throw new CMPException(MPERR_protocol_version_mismatch, ep); + } } hdr.setMessageFields(*activemsg); #ifdef _FULLTRACE @@ -1898,9 +1914,9 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface gotPacketHdr = true; } - if (!gc && remaining) + if (!closeSocket && remaining) { - gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER); + closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER); remaining -= szRead; activeptr += szRead; } @@ -1939,19 +1955,19 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface } } while (activemsg); - if (gc) + if (closeSocket) break; } } catch (IException *e) { - if (e->errorCode()!=JSOCKERR_graceful_close) - FLLOG(MCoperatorWarning, e,"MP(Packet Reader)"); + if (!suppressException && e->errorCode()!=JSOCKERR_graceful_close) + FLLOG(MCoperatorWarning, e, "MP(Packet Reader)"); e->Release(); - gotPacketHdr = false; + closeSocket = true; // NB: this select handler will removed and not be notified again } - if (gc) + if (closeSocket) { // here due to error or graceful close, so close socket (ignore error as may be closed already) try From 70e4bcd52bad29c0f35d258c07129637dc3a7756 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Fri, 18 Oct 2024 13:39:24 +0100 Subject: [PATCH 2/3] HPCC-32828 Add filename to fsync warnings Signed-off-by: Jake Smith --- system/jlib/jfile.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index cce5b1b54a2..67b15ca9aa4 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -2055,7 +2055,7 @@ void CFileIO::setSize(offset_t pos) //-- Unix implementation ---------------------------------------------------- -static void doSync(int fd, bool dataOnly) +static void doSync(const CFileIO &fileIO, int fd, bool dataOnly) { #ifdef F_FULLFSYNC // No EIO type retry available @@ -2066,25 +2066,25 @@ static void doSync(int fd, bool dataOnly) if (ret == 0) { if (timer.elapsedMs() >= 10000) - IWARNLOG("doSync: slow success: took %u ms", timer.elapsedMs()); + IWARNLOG("doSync(%s): slow success: took %u ms", fileIO.querySafeFilename(), timer.elapsedMs()); } else { int err = errno; printStackReport(); - Owned e = makeErrnoExceptionV(err, "doSync: failed after %u ms", timer.elapsedMs()); + Owned e = makeErrnoExceptionV(err, "doSync(%s): failed after %u ms", fileIO.querySafeFilename(), timer.elapsedMs()); OWARNLOG(e); throw e.getClear(); } #endif } -static void syncFileData(int fd, bool notReadOnly, IFEflags extraFlags, bool wait_previous=false) +static void syncFileData(const CFileIO &fileIO, int fd, bool notReadOnly, IFEflags extraFlags, bool wait_previous=false) { if (notReadOnly) { if (extraFlags & IFEsync) - doSync(fd, true); + doSync(fileIO, fd, true); #if defined(__linux__) else if (extraFlags & IFEnocache) { @@ -2176,9 +2176,9 @@ void CFileIO::close() DBGLOG("CFileIO::close(%d), extraFlags = %d", tmpHandle, extraFlags); #endif if (extraFlags & (IFEnocache | IFEsync)) - syncFileData(tmpHandle, openmode!=IFOread, extraFlags, false); + syncFileData(*this, tmpHandle, openmode!=IFOread, extraFlags, false); else if (extraFlags & IFEsyncAtClose) - doSync(tmpHandle, false); + doSync(*this, tmpHandle, false); if (::close(tmpHandle) < 0) throw makeErrnoExceptionV(errno, "CFileIO::close for file '%s'", querySafeFilename()); @@ -2192,7 +2192,7 @@ void CFileIO::flush() CriticalBlock procedure(cs); - syncFileData(file, true, extraFlags, false); + syncFileData(*this, file, true, extraFlags, false); } @@ -2229,7 +2229,7 @@ size32_t CFileIO::read(offset_t pos, size32_t len, void * data) if (unflushedReadBytes.add_fetch(ret) >= PGCFLUSH_BLKSIZE) { unflushedReadBytes.store(0); - syncFileData(file, false, extraFlags, false); + syncFileData(*this, file, false, extraFlags, false); } } return ret; @@ -2259,7 +2259,7 @@ size32_t CFileIO::write(offset_t pos, size32_t len, const void * data) { unflushedWriteBytes.store(0); // request to write-out dirty pages - syncFileData(file, true, extraFlags, true); + syncFileData(*this, file, true, extraFlags, true); } } return ret; From 41ba3c9b8c0013aa3af18d523b5d788f5321cb8a Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Fri, 18 Oct 2024 14:43:41 +0100 Subject: [PATCH 3/3] HPCC-32829 Avoid getPeerEndpoint refetching peer name When a socket is attached the peer is captured in existing member, reuse them in getPeerEndpoint/getPeerAddress. This also means that these calls will continue to work after the socket is closed. Therefore tracing of the peer on a closed socket will now be valid. Signed-off-by: Jake Smith --- system/jlib/jsocket.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index b75500c9cb9..5dfd4acde52 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -609,10 +609,10 @@ class CSocket: public ISocket, public CInterface enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state; T_SOCKET sock; // char* hostname; // host address - unsigned short hostport; // host port + unsigned short hostport; // host port (NB: this is the peer port if an attached socket) unsigned short localPort; SOCKETMODE sockmode; - IpAddress targetip; + IpAddress targetip; // NB: this is peer if an attached socket SocketEndpoint returnep; // set by set_return_addr MCASTREQ * mcastreq; @@ -1472,16 +1472,8 @@ SocketEndpoint &CSocket::getPeerEndpoint(SocketEndpoint &ep) if (sockmode==sm_udp_server) { // udp server ep.set(returnep); } - else { - DEFINE_SOCKADDR(u); - socklen_t ul = sizeof(u); - if (::getpeername(sock,&u.sa, &ul)<0) { - DBGLOG("getpeername failed %d",SOCKETERRNO()); - ep.set(NULL, 0); - } - else - getSockAddrEndpoint(u,ul,ep); - } + else + ep.set(hostport, targetip); // NB: if this is an attached socket, targetip/hostpost are the peer return ep; }