Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30366 Remove unused CMasterWatchdogUDP #17825

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 27 additions & 87 deletions thorlcr/master/mawatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ class CMachineStatus
};


CMasterWatchdogBase::CMasterWatchdogBase() : threaded("CMasterWatchdogBase")
CMasterWatchdog::CMasterWatchdog(bool startNow) : threaded("CMasterWatchdogBase")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I'm not sure why the thread would be started in the constructor, as there wouldn't be nodes registered so early.
Any reason this is start should be supported during construction?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slaves could register a split second after the ctor, so it would make some sense to start here.
But, and I don't remember the exact reasoning, it was decided to delay monitoring until everything was up though (probably relates to the original idea that slaves could come and go which never happened).

Not new, so will leave it as it is.

{
stopped = true;
watchdogMachineTimeout = globals->getPropInt("@slaveDownTimeout", DEFAULT_SLAVEDOWNTIMEOUT);
if (watchdogMachineTimeout <= HEARTBEAT_INTERVAL*10)
watchdogMachineTimeout = HEARTBEAT_INTERVAL*10;
watchdogMachineTimeout *= 1000;
if (startNow)
start();
}

CMasterWatchdogBase::~CMasterWatchdogBase()
CMasterWatchdog::~CMasterWatchdog()
{
stop();
ForEachItemInRev(i, state)
Expand All @@ -80,7 +82,7 @@ CMasterWatchdogBase::~CMasterWatchdogBase()
}
}

void CMasterWatchdogBase::start()
void CMasterWatchdog::start()
{
if (stopped)
{
Expand All @@ -93,14 +95,14 @@ void CMasterWatchdogBase::start()
}
}

void CMasterWatchdogBase::addSlave(const SocketEndpoint &slave)
void CMasterWatchdog::addSlave(const SocketEndpoint &slave)
{
synchronized block(mutex);
CMachineStatus *mstate=new CMachineStatus(slave);
state.append(mstate);
}

void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
void CMasterWatchdog::removeSlave(const SocketEndpoint &slave)
{
synchronized block(mutex);
CMachineStatus *ms = findSlave(slave);
Expand All @@ -110,7 +112,7 @@ void CMasterWatchdogBase::removeSlave(const SocketEndpoint &slave)
}
}

CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
CMachineStatus *CMasterWatchdog::findSlave(const SocketEndpoint &ep)
{
ForEachItemInRev(i, state)
{
Expand All @@ -122,7 +124,7 @@ CMachineStatus *CMasterWatchdogBase::findSlave(const SocketEndpoint &ep)
}


void CMasterWatchdogBase::stop()
void CMasterWatchdog::stop()
{
{
synchronized block(mutex);
Expand All @@ -140,7 +142,7 @@ void CMasterWatchdogBase::stop()
LOG(MCdebugProgress, thorJob, "Stopped watchdog");
}

void CMasterWatchdogBase::checkMachineStatus()
void CMasterWatchdog::checkMachineStatus()
{
synchronized block(mutex);
ForEachItemInRev(i, state)
Expand All @@ -165,7 +167,7 @@ void CMasterWatchdogBase::checkMachineStatus()
}
}

unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
unsigned CMasterWatchdog::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb)
{
mb.clear();
unsigned read = readData(mb);
Expand All @@ -185,7 +187,20 @@ unsigned CMasterWatchdogBase::readPacket(HeartBeatPacketHeader &hb, MemoryBuffer
return 0;
}

void CMasterWatchdogBase::threadmain()
unsigned CMasterWatchdog::readData(MemoryBuffer &mb)
{
CMessageBuffer msg;
if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout))
return 0;
mb.swapWith(msg);
return mb.length();
}
void CMasterWatchdog::stopReading()
{
queryNodeComm().cancel(0, MPTAG_THORWATCHDOG);
}

void CMasterWatchdog::threadmain()
{
LOG(MCdebugProgress, thorJob, "Started watchdog");
unsigned lastbeat=msTick();
Expand Down Expand Up @@ -258,84 +273,9 @@ void CMasterWatchdogBase::threadmain()
}
}


class CMasterWatchdogUDP : public CMasterWatchdogBase
{
ISocket *sock;
public:
CMasterWatchdogUDP(bool startNow)
{
sock = ISocket::udp_create(getFixedPort(TPORT_watchdog));
if (startNow)
start();
}
~CMasterWatchdogUDP()
{
::Release(sock);
}
virtual unsigned readData(MemoryBuffer &mb)
{
size32_t read;
try
{
sock->readtms(mb.reserveTruncate(UDP_DATA_MAX), sizeof(HeartBeatPacketHeader), UDP_DATA_MAX, read, watchdogMachineTimeout);
}
catch (IJSOCK_Exception *e)
{
if ((e->errorCode()!=JSOCKERR_timeout_expired)&&(e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
throw;
e->Release();
return 0; // will retry
}
return read;
}
virtual void stopReading()
{
if (sock)
{
SocketEndpoint masterEp(getMasterPortBase());
StringBuffer ipStr;
masterEp.getHostText(ipStr);
Owned<ISocket> sock = ISocket::udp_connect(getFixedPort(masterEp.port, TPORT_watchdog), ipStr.str());
// send empty packet, stopped set, will cease reading
HeartBeatPacketHeader hb;
hb.packetSize = sizeof(HeartBeatPacketHeader);
sock->write(&hb, sizeof(HeartBeatPacketHeader));
sock->close();
}
}
};

/////////////////////

class CMasterWatchdogMP : public CMasterWatchdogBase
{
public:
CMasterWatchdogMP(bool startNow)
{
if (startNow)
start();
}
virtual unsigned readData(MemoryBuffer &mb)
{
CMessageBuffer msg;
if (!queryNodeComm().recv(msg, RANK_ALL, MPTAG_THORWATCHDOG, NULL, watchdogMachineTimeout))
return 0;
mb.swapWith(msg);
return mb.length();
}
virtual void stopReading()
{
queryNodeComm().cancel(0, MPTAG_THORWATCHDOG);
}
};

/////////////////////

CMasterWatchdogBase *createMasterWatchdog(bool udp, bool startNow)
CMasterWatchdog *createMasterWatchdog(bool startNow)
{
if (udp)
return new CMasterWatchdogUDP(startNow);
else
return new CMasterWatchdogMP(startNow);
return new CMasterWatchdog(startNow);
}
14 changes: 7 additions & 7 deletions thorlcr/master/mawatchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class CMachineStatus;
struct HeartBeatPacketHeader;

class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
class CMasterWatchdog : public CSimpleInterface, implements IThreaded
{
PointerArray state;
SocketEndpoint master;
Expand All @@ -37,22 +37,22 @@ class CMasterWatchdogBase : public CSimpleInterface, implements IThreaded
bool stopped;
unsigned watchdogMachineTimeout;
public:
CMasterWatchdogBase();
~CMasterWatchdogBase();
CMasterWatchdog(bool startNow);
~CMasterWatchdog();
void addSlave(const SocketEndpoint &slave);
void removeSlave(const SocketEndpoint &slave);
CMachineStatus *findSlave(const SocketEndpoint &ep);
void checkMachineStatus();
unsigned readPacket(HeartBeatPacketHeader &hb, MemoryBuffer &mb);
void start();
void stop();
unsigned readData(MemoryBuffer &mb);
void stopReading();
// IThredaed
virtual void threadmain() override;

virtual unsigned readData(MemoryBuffer &mb) = 0;
virtual void stopReading() = 0;
};

CMasterWatchdogBase *createMasterWatchdog(bool udp=false, bool startNow=false);
CMasterWatchdog *createMasterWatchdog(bool startNow=false);

#endif

4 changes: 2 additions & 2 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class CRegistryServer : public CSimpleInterface
}
} deregistrationWatch;
public:
Linked<CMasterWatchdogBase> watchdog;
Linked<CMasterWatchdog> watchdog;
IBitSet *status;

CRegistryServer() : deregistrationWatch(*this)
Expand All @@ -209,7 +209,7 @@ class CRegistryServer : public CSimpleInterface
msgDelay = SLAVEREG_VERIFY_DELAY;
slavesRegistered = 0;
if (globals->getPropBool("@watchdogEnabled"))
watchdog.setown(createMasterWatchdog(globals->getPropBool("@useUDPWatchdog")));
watchdog.setown(createMasterWatchdog());
else
globals->setPropBool("@watchdogProgressEnabled", false);
CriticalBlock b(regCrit);
Expand Down