diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index 6caa83f8e14..63ff2c856a1 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -595,6 +595,7 @@ class CMPServer: private CMPChannelHT, implements IMPServer bool tryReopenChannel = false; bool useTLS = false; unsigned mpTraceLevel = 0; + bool dumpQueue = true; // packet handlers PingPacketHandler *pingpackethandler; // TAG_SYS_PING @@ -2682,10 +2683,13 @@ CMPServer::CMPServer(unsigned __int64 _role, unsigned _port, bool _listen) CMPServer::~CMPServer() { #ifdef _TRACEORPHANS - StringBuffer buf; - getReceiveQueueDetails(buf); - if (buf.length()) - LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str()); + if (dumpQueue) + { + StringBuffer buf; + getReceiveQueueDetails(buf); + if (buf.length()) + LOG(MCdebugInfo, "MP: Orphan check\n%s",buf.str()); + } #endif _releaseAll(); selecthandler->stop(true); @@ -3583,6 +3587,7 @@ class CGlobalMPServer : public CMPServer unsigned queryNest() { return nestLevel; } bool isPaused() const { return paused; } void setPaused(bool onOff) { paused = onOff; } + void setDumpQueue(bool onOff) { dumpQueue = onOff; } }; CriticalSection CGlobalMPServer::sect; static CGlobalMPServer *globalMPServer; @@ -3627,7 +3632,7 @@ void startMPServer(unsigned port, bool paused, bool listen) startMPServer(0, port, paused, listen); } -void stopMPServer() +void stopMPServer(bool dumpQueue) { CGlobalMPServer *_globalMPServer = NULL; { @@ -3646,6 +3651,7 @@ void stopMPServer() } if (NULL == _globalMPServer) return; + _globalMPServer->setDumpQueue(dumpQueue); _globalMPServer->stop(); _globalMPServer->Release(); #ifdef _TRACE diff --git a/system/mp/mpcomm.hpp b/system/mp/mpcomm.hpp index 4825640e071..e91af5f9f5b 100644 --- a/system/mp/mpcomm.hpp +++ b/system/mp/mpcomm.hpp @@ -107,7 +107,7 @@ interface IMPServer : extends IInterface extern mp_decl void startMPServer(unsigned port, bool paused=false, bool listen=false); extern mp_decl void startMPServer(unsigned __int64 role, unsigned port, bool paused=false, bool listen=false); -extern mp_decl void stopMPServer(); +extern mp_decl void stopMPServer(bool dumpQueue=true); extern mp_decl IMPServer *getMPServer(); extern mp_decl IMPServer *startNewMPServer(unsigned port, bool listen=false); diff --git a/thorlcr/slave/slavmain.cpp b/thorlcr/slave/slavmain.cpp index d3eb593354b..1b6cc826f77 100644 --- a/thorlcr/slave/slavmain.cpp +++ b/thorlcr/slave/slavmain.cpp @@ -53,6 +53,8 @@ #include "rtlcommon.hpp" #include "../activities/keyedjoin/thkeyedjoincommon.hpp" +bool recvShutdown = false; + //--------------------------------------------------------------------------- //--------------------------------------------------------------------------- @@ -1497,7 +1499,8 @@ class CKJService : public CSimpleInterfaceOf, implements IThreaded, } catch (IMP_Exception *e) { - EXCLOG(e, nullptr); + if (!recvShutdown) + EXCLOG(e, nullptr); e->Release(); break; } @@ -2094,6 +2097,7 @@ class CJobListener : public CSimpleInterface case Shutdown: { stopped = true; + recvShutdown = true; PROGLOG("Shutdown received"); if (watchdog) watchdog->stop(); diff --git a/thorlcr/slave/slavmain.hpp b/thorlcr/slave/slavmain.hpp index 4e8af88bde0..2ce336977bf 100644 --- a/thorlcr/slave/slavmain.hpp +++ b/thorlcr/slave/slavmain.hpp @@ -24,6 +24,7 @@ void slaveMain(bool &jobListenerStopped, ILogMsgHandler *logHandler); void enableThorSlaveAsDaliClient(); void disableThorSlaveAsDaliClient(); +extern bool recvShutdown; #endif diff --git a/thorlcr/slave/thslavemain.cpp b/thorlcr/slave/thslavemain.cpp index c1fa682a57c..3037f55343e 100644 --- a/thorlcr/slave/thslavemain.cpp +++ b/thorlcr/slave/thslavemain.cpp @@ -50,8 +50,6 @@ #include "dafdesc.hpp" #include "rmtfile.hpp" -#include "slavmain.hpp" - #ifdef _CONTAINERIZED #include "dafsserver.hpp" #endif @@ -285,12 +283,49 @@ bool UnregisterSelf(IException *e) return false; } +void signalInFuture(int signo, unsigned timeoutSec) +{ +#ifndef _WIN32 + int ret; + timer_t timerId; + struct sigevent sigev; + struct itimerspec itSpec; + + sigev.sigev_notify = SIGEV_SIGNAL; + sigev.sigev_signo = signo; + sigev.sigev_value.sival_ptr = &timerId; + sigev.sigev_notify_function = nullptr; + sigev.sigev_notify_attributes = NULL; + + itSpec.it_value.tv_sec = timeoutSec; + itSpec.it_value.tv_nsec = 0; + itSpec.it_interval.tv_sec = 0; + itSpec.it_interval.tv_nsec = 0; + + ret = timer_create(CLOCK_MONOTONIC, &sigev, &timerId); + if (!ret) + timer_settime(timerId, 0, &itSpec, 0); +#endif +} + bool ControlHandler(ahType type) { + static bool recvdSig = false; + if (recvdSig) + { + if (ahInterrupt == type) + _exit(128+SIGINT); + else + _exit(128+SIGTERM); + } + recvdSig = true; + signalInFuture(SIGTERM, 20); + if (ahInterrupt == type) LOG(MCdebugProgress, "CTRL-C detected"); else if (!jobListenerStopped) LOG(MCdebugProgress, "SIGTERM detected"); + bool unregOK = false; if (!jobListenerStopped) { @@ -298,6 +333,8 @@ bool ControlHandler(ahType type) unregOK = UnregisterSelf(NULL); abortSlave(); } + if (recvShutdown) + return false; return !unregOK; } @@ -599,7 +636,7 @@ int main( int argc, const char *argv[] ) setMultiThorMemoryNotify(0,NULL); roxiemem::releaseRoxieHeap(); - if (unregisterException.get()) + if (!recvShutdown && unregisterException.get()) UnregisterSelf(unregisterException); if (getExpertOptBool("slaveDaliClient")) @@ -608,7 +645,7 @@ int main( int argc, const char *argv[] ) #ifdef USE_MP_LOG stopLogMsgReceivers(); #endif - stopMPServer(); + stopMPServer(!recvShutdown); releaseAtoms(); // don't know why we can't use a module_exit to destruct these... ExitModuleObjects(); // not necessary, atexit will call, but good for leak checking