Skip to content

Commit

Permalink
Merge pull request hpcc-systems#18771 from mckellyln/thorslave_shutdown
Browse files Browse the repository at this point in the history
HPCC-32072 Thor worker cleaner shutdown

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 5, 2024
2 parents fc04a04 + 7d9a990 commit f61f861
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 11 deletions.
26 changes: 26 additions & 0 deletions system/jlib/jexcept.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <sys/wait.h>
#include <sys/types.h>
#include <stddef.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#ifdef __linux__
#include <execinfo.h> // comment out if not present
Expand Down Expand Up @@ -1663,6 +1665,30 @@ void jlib_decl disableSEHtoExceptionMapping()
#endif
}

// Arrange for signal 'signo' to be sent to this process roughly 'timeoutSec'
// seconds from now, using a one-shot POSIX timer on CLOCK_MONOTONIC.
//
//   signo      - signal number to deliver when the timer expires
//   timeoutSec - delay in seconds; 0 means "as soon as possible"
//
// Only implemented on Linux; on other platforms this is a no-op.
// Best effort: the function has no return value, so a failure to create or
// arm the timer is not reported to the caller. A successfully armed timer is
// intentionally never deleted here - it must outlive this call in order to fire.
void jlib_decl raiseSignalInFuture(int signo, unsigned timeoutSec)
{
#if defined(__linux__)
    // Zero-initialise so that no field (including the thread-notification
    // members, which SIGEV_SIGNAL does not use) is left indeterminate.
    struct sigevent sigev = {};
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = signo;
    // Carry no payload. Passing the address of a local (e.g. the timer id)
    // would hand any SA_SIGINFO handler a pointer into a dead stack frame.
    sigev.sigev_value.sival_ptr = nullptr;

    struct itimerspec itSpec = {};   // it_interval stays zero => one-shot
    itSpec.it_value.tv_sec = timeoutSec;
    // An all-zero it_value DISARMS the timer rather than firing immediately,
    // so a zero timeout is mapped to the smallest non-zero expiry instead.
    itSpec.it_value.tv_nsec = (0 == timeoutSec) ? 1 : 0;

    timer_t timerId;
    if (0 != timer_create(CLOCK_MONOTONIC, &sigev, &timerId))
        return;

    // If the timer cannot be armed it will never fire; release it rather
    // than leaking a kernel timer for the lifetime of the process.
    if (0 != timer_settime(timerId, 0, &itSpec, nullptr))
        timer_delete(timerId);
#endif
}

StringBuffer & formatSystemError(StringBuffer & out, unsigned errcode)
{
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jexcept.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ void jlib_decl disableSEHtoExceptionMapping();

void jlib_decl *setSEHtoExceptionHandler(IExceptionHandler *handler); // sets handler and return old value

void jlib_decl raiseSignalInFuture(int signo, unsigned timeoutSec);

void jlib_decl setTerminateOnSEHInSystemDLLs(bool set=true);
void jlib_decl setTerminateOnSEH(bool set=true);
Expand Down
16 changes: 11 additions & 5 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ class CMPServer: private CMPChannelHT, implements IMPServer
bool tryReopenChannel = false;
bool useTLS = false;
unsigned mpTraceLevel = 0;
bool dumpQueue = true;

// packet handlers
PingPacketHandler *pingpackethandler; // TAG_SYS_PING
Expand Down Expand Up @@ -2724,10 +2725,13 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen)
CMPServer::~CMPServer()
{
#ifdef _TRACEORPHANS
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
if (dumpQueue)
{
StringBuffer buf;
getReceiveQueueDetails(buf);
if (buf.length())
LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str());
}
#endif
_releaseAll();
selecthandler->stop(true);
Expand Down Expand Up @@ -3625,6 +3629,7 @@ class CGlobalMPServer : public CMPServer
// Returns the current value of nestLevel.
unsigned queryNest() { return nestLevel; }
// Returns the current value of the paused flag.
bool isPaused() const { return paused; }
// Sets the paused flag to onOff.
void setPaused(bool onOff) { paused = onOff; }
// Sets the dumpQueue flag. In _TRACEORPHANS builds ~CMPServer() only logs the
// receive-queue "Orphan check" details when this flag is true.
void setDumpQueue(bool onOff) { dumpQueue = onOff; }
};
CriticalSection CGlobalMPServer::sect;
static CGlobalMPServer *globalMPServer;
Expand Down Expand Up @@ -3669,7 +3674,7 @@ void startMPServer(unsigned port, bool paused, bool listen)
startMPServer(0, port, paused, listen);
}

void stopMPServer()
void stopMPServer(bool dumpQueue)
{
CGlobalMPServer *_globalMPServer = NULL;
{
Expand All @@ -3688,6 +3693,7 @@ void stopMPServer()
}
if (NULL == _globalMPServer)
return;
_globalMPServer->setDumpQueue(dumpQueue);
_globalMPServer->stop();
_globalMPServer->Release();
#ifdef _TRACE
Expand Down
2 changes: 1 addition & 1 deletion system/mp/mpcomm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ interface IMPServer : extends IInterface

extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false);
extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false);
extern mp_decl void stopMPServer();
extern mp_decl void stopMPServer(bool dumpQueue=true);
extern mp_decl IMPServer *getMPServer();
extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false);

Expand Down
6 changes: 5 additions & 1 deletion thorlcr/slave/slavmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
#include "rtlcommon.hpp"
#include "../activities/keyedjoin/thkeyedjoincommon.hpp"

// Set to true when the job listener handles a Shutdown command. Once set,
// MP exceptions caught by the KJ service loop in this file are no longer logged.
// NOTE(review): plain bool written and read from different contexts with no
// visible synchronization - confirm whether that is acceptable here.
bool recvShutdown = false;

//---------------------------------------------------------------------------

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1497,7 +1499,8 @@ class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded,
}
catch (IMP_Exception *e)
{
EXCLOG(e, nullptr);
if (!recvShutdown)
EXCLOG(e, nullptr);
e->Release();
break;
}
Expand Down Expand Up @@ -2094,6 +2097,7 @@ class CJobListener : public CSimpleInterface
case Shutdown:
{
stopped = true;
recvShutdown = true;
PROGLOG("Shutdown received");
if (watchdog)
watchdog->stop();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/slave/slavmain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void slaveMain(bool &jobListenerStopped, ILogMsgHandler *logHandler);
void enableThorSlaveAsDaliClient();
void disableThorSlaveAsDaliClient();

extern bool recvShutdown;
#endif


20 changes: 16 additions & 4 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
#include "dafdesc.hpp"
#include "rmtfile.hpp"

#include "slavmain.hpp"

#ifdef _CONTAINERIZED
#include "dafsserver.hpp"
#endif
Expand Down Expand Up @@ -287,17 +285,31 @@ bool UnregisterSelf(IException *e)

// Abort handler for the Thor worker process.
//
// First invocation: schedules a SIGTERM for 20 seconds later (presumably as a
// backstop in case orderly shutdown stalls), logs which kind of abort was seen,
// and - if the job listener has not already stopped - unregisters from the
// master (when one is known) and aborts the worker.
// Any later invocation exits the process immediately with the conventional
// 128+signal exit code.
//
// Returns false if a Shutdown command has been received; otherwise returns
// true only when unregistering did not succeed (or was not attempted).
// NOTE(review): how the caller interprets the return value is defined by the
// abort-handler registration API, which is not visible here.
bool ControlHandler(ahType type)
{
    const bool interrupted = (ahInterrupt == type);

    // Second and subsequent signals: do not attempt further cleanup, just exit.
    static bool recvdSig = false;
    if (recvdSig)
        _exit(128 + (interrupted ? SIGINT : SIGTERM));
    recvdSig = true;
    raiseSignalInFuture(SIGTERM, 20);

    if (interrupted)
        LOG(MCdebugProgress, "CTRL-C detected");
    else if (!jobListenerStopped)
        LOG(MCdebugProgress, "SIGTERM detected");

    bool unregOK = false;
    if (!jobListenerStopped)
    {
        if (masterNode)
            unregOK = UnregisterSelf(NULL);
        abortSlave();
    }

    // After a Shutdown command the result is always false, regardless of unregOK.
    return !recvShutdown && !unregOK;
}

Expand Down Expand Up @@ -599,7 +611,7 @@ int main( int argc, const char *argv[] )
setMultiThorMemoryNotify(0,NULL);
roxiemem::releaseRoxieHeap();

if (unregisterException.get())
if (!recvShutdown && unregisterException.get())
UnregisterSelf(unregisterException);

if (getExpertOptBool("slaveDaliClient"))
Expand All @@ -608,7 +620,7 @@ int main( int argc, const char *argv[] )
#ifdef USE_MP_LOG
stopLogMsgReceivers();
#endif
stopMPServer();
stopMPServer(!recvShutdown);
releaseAtoms(); // don't know why we can't use a module_exit to destruct these...

ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking
Expand Down

0 comments on commit f61f861

Please sign in to comment.