From 637576a591e58a2394019725faf8ddf7708f4856 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 26 Sep 2023 15:30:01 +0100 Subject: [PATCH] HPCC-30362 Fix setSlaveAffinity regression (since 7.8) setSlaveAffinity needed to be called after the worker had registered with the manager, only then are the thor configuration settings available, and in particular slavesPerNode which setSlaveAffinity depends upon. Signed-off-by: Jake Smith --- system/jlib/jdebug.cpp | 7 +++-- system/jlib/jdebug.hpp | 2 +- thorlcr/slave/thslavemain.cpp | 59 +++++++++++++++++++---------------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/system/jlib/jdebug.cpp b/system/jlib/jdebug.cpp index 934067258c3..3092a6a642f 100644 --- a/system/jlib/jdebug.cpp +++ b/system/jlib/jdebug.cpp @@ -1578,13 +1578,13 @@ void clearAffinityCache() cachedNumCpus.store(0, std::memory_order_release); } -void applyResourcedCPUAffinity(const IPropertyTree *resourceSection) +bool applyResourcedCPUAffinity(const IPropertyTree *resourceSection) { if (nullptr == resourceSection) - return; + return false; const char *cpusText = resourceSection->queryProp("@cpu"); if (isEmptyString(cpusText)) - return; + return false; double cpus = friendlyCPUToDecimal(cpusText); if (0.0 == cpus) throw makeStringExceptionV(0, "Invalid number of resources cpus: %s", cpusText); @@ -1601,6 +1601,7 @@ void applyResourcedCPUAffinity(const IPropertyTree *resourceSection) // NB: if something were to clear affinity, then this setting would be lost setAffinityCpus(cpusI); } + return true; // signifies there was a cpu resource setting of some kind } #define RXMAX 1000000 // can be 10x bigger but this produces reasonable amounts diff --git a/system/jlib/jdebug.hpp b/system/jlib/jdebug.hpp index 6c89b35b0ab..39219db1177 100644 --- a/system/jlib/jdebug.hpp +++ b/system/jlib/jdebug.hpp @@ -606,7 +606,7 @@ extern jlib_decl void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed); extern jlib_decl unsigned getAffinityCpus(); extern jlib_decl void setAffinityCpus(unsigned cpus); extern jlib_decl void clearAffinityCache(); // should be called whenever the process affinity is changed to reset the cache -extern jlib_decl void applyResourcedCPUAffinity(const IPropertyTree *resourceSection); +extern jlib_decl bool applyResourcedCPUAffinity(const IPropertyTree *resourceSection); extern jlib_decl void printProcMap(const char *fn, bool printbody, bool printsummary, StringBuffer *lnout, MemoryBuffer *mb, bool useprintf); extern jlib_decl void PrintMemoryReport(bool full=true); diff --git a/thorlcr/slave/thslavemain.cpp b/thorlcr/slave/thslavemain.cpp index dccf3e0d100..23cfdbbbc5d 100644 --- a/thorlcr/slave/thslavemain.cpp +++ b/thorlcr/slave/thslavemain.cpp @@ -92,6 +92,27 @@ static void replyError(unsigned errorCode, const char *errorMsg) queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION); } +static void setSlaveAffinity(unsigned processOnNode) +{ + const char * affinity = globals->queryProp("@affinity"); + if (affinity) + setProcessAffinity(affinity); + else if (globals->getPropBool("@autoAffinity", true)) + { + const char * nodes = globals->queryProp("@autoNodeAffinityNodes"); + unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1); + setAutoAffinity(processOnNode, slavesPerNode, nodes); + } + + //The default policy is to allocate from the local node, so restricting allocations to the current sockets + //may not buy much once the affinity is set up. It also means it will fail if there is no memory left on + //this socket - even if there is on others. + //Therefore it is not recommended unless you have maybe several independent thors running on the same machines + //with exclusive access to memory. + if (globals->getPropBool("@numaBindLocal", false)) + bindMemoryToLocalNodes(); +} + static std::atomic isRegistered {false}; static bool RegisterSelf(SocketEndpoint &masterEp) @@ -153,8 +174,11 @@ static bool RegisterSelf(SocketEndpoint &masterEp) return false; } - // NB: if any resource cpu restriction, this superceeds any affinity that setSlaveAffinity may have applied - applyResourcedCPUAffinity(globals->queryPropTree("workerResources")); + if (!applyResourcedCPUAffinity(globals->queryPropTree("workerResources"))) + { + // NB: autoAffinity/affinity only applicable in the absence of workerResources.cpu + setSlaveAffinity(globals->getPropInt("@slaveprocessnum")); + } StringBuffer xpath; getExpertOptPath(nullptr, xpath); // 'expert' in container world, or 'Debug' in bare-metal @@ -315,27 +339,6 @@ ILogMsgHandler *startSlaveLog() return logHandler; } -void setSlaveAffinity(unsigned processOnNode) -{ - const char * affinity = globals->queryProp("@affinity"); - if (affinity) - setProcessAffinity(affinity); - else if (globals->getPropBool("@autoAffinity", true)) - { - const char * nodes = globals->queryProp("@autoNodeAffinityNodes"); - unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1); - setAutoAffinity(processOnNode, slavesPerNode, nodes); - } - - //The default policy is to allocate from the local node, so restricting allocations to the current sockets - //may not buy much once the affinity is set up. It also means it will fail if there is no memory left on - //this socket - even if there is on others. - //Therefore it is not recommended unless you have maybe several independent thors running on the same machines - //with exclusive access to memory. - if (globals->getPropBool("@numaBindLocal", false)) - bindMemoryToLocalNodes(); -} - int main( int argc, const char *argv[] ) { if (!checkCreateDaemon(argc, argv)) @@ -380,6 +383,9 @@ int main( int argc, const char *argv[] ) globals.setown(loadConfiguration(globals, argv, "thor", "THOR", nullptr, nullptr, nullptr, false)); #endif + // NB: the thor configuration is serialized from the manager and only available after RegisterSelf + // Until that point, only properties on the command line are available. + const char *master = globals->queryProp("@master"); if (!master) usage(); @@ -411,10 +417,6 @@ int main( int argc, const char *argv[] ) slfEp.port = queryMyNode()->endpoint().port; setMachinePortBase(slfEp.port); - setSlaveAffinity(globals->getPropInt("@slaveprocessnum")); - - if (globals->getPropBool("@MPChannelReconnect")) - getMPServer()->setOpt(mpsopt_channelreopen, "true"); #ifdef USE_MP_LOG startLogMsgParentReceiver(); LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp)); @@ -430,6 +432,9 @@ int main( int argc, const char *argv[] ) if (!slaveLogHandler) slaveLogHandler = startSlaveLog(); + if (globals->getPropBool("@MPChannelReconnect")) + getMPServer()->setOpt(mpsopt_channelreopen, "true"); + if (getExpertOptBool("slaveDaliClient")) enableThorSlaveAsDaliClient();