Skip to content

Commit

Permalink
Merge pull request #19207 from jakesmith/HPCC-32822-protocol-error
Browse files Browse the repository at this point in the history
HPCC-32822 Fix MP protocol error logged indefinitely in loop

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 18, 2024
2 parents 396705c + 81b3e2e commit afff3f8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class PeriodicTimer

protected:
cycle_t timePeriodCycles = 0;
cycle_t lastElapsedCycles = 0;
std::atomic<cycle_t> lastElapsedCycles{0};
};


Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jmisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ void throwExceptionIfAborting()

StringBuffer & hexdump2string(byte const * in, size32_t inSize, StringBuffer & out)
{
out.append("[");
out.appendf("%u bytes [", inSize);
byte last = 0;
unsigned seq = 1;
for(unsigned i=0; i<inSize; ++i)
Expand Down
40 changes: 28 additions & 12 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ struct MultiPacketHeader

class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
{
StringAttr msg;
public:
IMPLEMENT_IINTERFACE;

CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
CMPException(MessagePassingError err,const SocketEndpoint &ep, const char *_msg = nullptr) : error(err), endpoint(ep), msg(_msg)
{
}

Expand All @@ -240,6 +241,8 @@ class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
// change it from "MP link closed" to something more helpful
case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getEndpointHostText(tmp).str()); break;
}
if (msg.length())
str.append(" - ").append(msg);
return str;
}
int errorCode() const { return error; }
Expand Down Expand Up @@ -1813,6 +1816,8 @@ class ForwardPacketHandler // TAG_SYS_FORWARD
};


static PeriodicTimer periodicTimer(10*60*1000, false); // 10 minutes
static std::atomic<__uint64> mpProtocolErrors{0};
// --------------------------------------------------------

class CMPPacketReader: public ISocketSelectNotify, public CInterface
Expand Down Expand Up @@ -1847,7 +1852,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
{
if (!parent)
return false;
bool gc = false; // if a gc is hit, then will fall through to close socket
bool closeSocket = false; // if a graceful close is hit, this will be set and will fall through to close socket
bool suppressException = false;
try
{
while (true) // NB: breaks out if blocked (if (remaining) ..)
Expand All @@ -1872,7 +1878,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (!gotPacketHdr)
{
CCycleTimer timer;
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
remaining -= szRead;
activeptr += szRead;
if (remaining) // only possible if blocked.
Expand All @@ -1882,10 +1888,20 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100)
{
// TBD IPV6 here
mpProtocolErrors++;
SocketEndpoint ep;
sock->getPeerEndpoint(ep);
IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
throw e;
if (periodicTimer.hasElapsed())
{
VStringBuffer packetHdrBytes("[%" I64F "u incidents to date]. Packet Header: ", mpProtocolErrors.load());
hexdump2string((byte const *)&hdr, sizeof(hdr), packetHdrBytes);
throw new CMPException(MPERR_protocol_version_mismatch, ep, packetHdrBytes.str());
}
else
{
suppressException = true;
throw new CMPException(MPERR_protocol_version_mismatch, ep);
}
}
hdr.setMessageFields(*activemsg);
#ifdef _FULLTRACE
Expand All @@ -1898,9 +1914,9 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
gotPacketHdr = true;
}

if (!gc && remaining)
if (!closeSocket && remaining)
{
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
remaining -= szRead;
activeptr += szRead;
}
Expand Down Expand Up @@ -1939,19 +1955,19 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
}
}
while (activemsg);
if (gc)
if (closeSocket)
break;
}
}
catch (IException *e)
{
if (e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e,"MP(Packet Reader)");
if (!suppressException && e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e, "MP(Packet Reader)");
e->Release();
gotPacketHdr = false;
closeSocket = true; // NB: this select handler will be removed and not be notified again
}

if (gc)
if (closeSocket)
{
// here due to error or graceful close, so close socket (ignore error as may be closed already)
try
Expand Down

0 comments on commit afff3f8

Please sign in to comment.