Skip to content

Commit

Permalink
HPCC-30362 Fix setSlaveAffinity regression (since 7.8)
Browse files Browse the repository at this point in the history
setSlaveAffinity must be called after the worker has
registered with the manager; only then are the thor
configuration settings available, in particular
slavesPerNode, which setSlaveAffinity depends upon.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Sep 27, 2023
1 parent 2b038c1 commit 637576a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 31 deletions.
7 changes: 4 additions & 3 deletions system/jlib/jdebug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1578,13 +1578,13 @@ void clearAffinityCache()
cachedNumCpus.store(0, std::memory_order_release);
}

void applyResourcedCPUAffinity(const IPropertyTree *resourceSection)
bool applyResourcedCPUAffinity(const IPropertyTree *resourceSection)
{
if (nullptr == resourceSection)
return;
return false;
const char *cpusText = resourceSection->queryProp("@cpu");
if (isEmptyString(cpusText))
return;
return false;
double cpus = friendlyCPUToDecimal(cpusText);
if (0.0 == cpus)
throw makeStringExceptionV(0, "Invalid number of resources cpus: %s", cpusText);
Expand All @@ -1601,6 +1601,7 @@ void applyResourcedCPUAffinity(const IPropertyTree *resourceSection)
// NB: if something were to clear affinity, then this setting would be lost
setAffinityCpus(cpusI);
}
return true; // signifies there was a cpu resource setting of some kind
}

#define RXMAX 1000000 // can be 10x bigger but this produces reasonable amounts
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ extern jlib_decl void getCpuInfo(unsigned &numCPUs, unsigned &CPUSpeed);
extern jlib_decl unsigned getAffinityCpus();
extern jlib_decl void setAffinityCpus(unsigned cpus);
extern jlib_decl void clearAffinityCache(); // should be called whenever the process affinity is changed to reset the cache
extern jlib_decl void applyResourcedCPUAffinity(const IPropertyTree *resourceSection);
extern jlib_decl bool applyResourcedCPUAffinity(const IPropertyTree *resourceSection);

extern jlib_decl void printProcMap(const char *fn, bool printbody, bool printsummary, StringBuffer *lnout, MemoryBuffer *mb, bool useprintf);
extern jlib_decl void PrintMemoryReport(bool full=true);
Expand Down
59 changes: 32 additions & 27 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ static void replyError(unsigned errorCode, const char *errorMsg)
queryNodeComm().send(msg, 0, MPTAG_THORREGISTRATION);
}

static void setSlaveAffinity(unsigned processOnNode)
{
const char * affinity = globals->queryProp("@affinity");
if (affinity)
setProcessAffinity(affinity);
else if (globals->getPropBool("@autoAffinity", true))
{
const char * nodes = globals->queryProp("@autoNodeAffinityNodes");
unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
setAutoAffinity(processOnNode, slavesPerNode, nodes);
}

//The default policy is to allocate from the local node, so restricting allocations to the current sockets
//may not buy much once the affinity is set up. It also means it will fail if there is no memory left on
//this socket - even if there is on others.
//Therefore it is not recommended unless you have maybe several independent thors running on the same machines
//with exclusive access to memory.
if (globals->getPropBool("@numaBindLocal", false))
bindMemoryToLocalNodes();
}

static std::atomic<bool> isRegistered {false};

static bool RegisterSelf(SocketEndpoint &masterEp)
Expand Down Expand Up @@ -153,8 +174,11 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
return false;
}

// NB: if any resource cpu restriction, this superceeds any affinity that setSlaveAffinity may have applied
applyResourcedCPUAffinity(globals->queryPropTree("workerResources"));
if (!applyResourcedCPUAffinity(globals->queryPropTree("workerResources")))
{
// NB: autoAffinity/affinity only applicable in the absence of workerResources.cpu
setSlaveAffinity(globals->getPropInt("@slaveprocessnum"));
}

StringBuffer xpath;
getExpertOptPath(nullptr, xpath); // 'expert' in container world, or 'Debug' in bare-metal
Expand Down Expand Up @@ -315,27 +339,6 @@ ILogMsgHandler *startSlaveLog()
return logHandler;
}

// Configures CPU affinity, and optionally NUMA memory binding, for this worker process
// using attributes read from the 'globals' property tree.
//   processOnNode - passed through to setAutoAffinity when automatic affinity is used.
// NOTE(review): the "@slavesPerNode" value read below is only meaningful once the thor configuration
// has been received from the manager (per the accompanying commit description) - confirm call order.
void setSlaveAffinity(unsigned processOnNode)
{
// An explicit "@affinity" setting takes precedence over automatic affinity
const char * affinity = globals->queryProp("@affinity");
if (affinity)
setProcessAffinity(affinity);
// Automatic affinity is enabled by default and applies only when "@affinity" is absent
else if (globals->getPropBool("@autoAffinity", true))
{
const char * nodes = globals->queryProp("@autoNodeAffinityNodes");
unsigned slavesPerNode = globals->getPropInt("@slavesPerNode", 1);
setAutoAffinity(processOnNode, slavesPerNode, nodes);
}

//The default policy is to allocate from the local node, so restricting allocations to the current sockets
//may not buy much once the affinity is set up. It also means it will fail if there is no memory left on
//this socket - even if there is on others.
//Therefore it is not recommended unless you have maybe several independent thors running on the same machines
//with exclusive access to memory.
// Opt-in only: "@numaBindLocal" defaults to false
if (globals->getPropBool("@numaBindLocal", false))
bindMemoryToLocalNodes();
}

int main( int argc, const char *argv[] )
{
if (!checkCreateDaemon(argc, argv))
Expand Down Expand Up @@ -380,6 +383,9 @@ int main( int argc, const char *argv[] )
globals.setown(loadConfiguration(globals, argv, "thor", "THOR", nullptr, nullptr, nullptr, false));
#endif

// NB: the thor configuration is serialized from the manager and only available after RegisterSelf
// Until that point, only properties on the command line are available.

const char *master = globals->queryProp("@master");
if (!master)
usage();
Expand Down Expand Up @@ -411,10 +417,6 @@ int main( int argc, const char *argv[] )
slfEp.port = queryMyNode()->endpoint().port;
setMachinePortBase(slfEp.port);

setSlaveAffinity(globals->getPropInt("@slaveprocessnum"));

if (globals->getPropBool("@MPChannelReconnect"))
getMPServer()->setOpt(mpsopt_channelreopen, "true");
#ifdef USE_MP_LOG
startLogMsgParentReceiver();
LOG(MCdebugProgress, thorJob, "MPServer started on port %d", getFixedPort(TPORT_mp));
Expand All @@ -430,6 +432,9 @@ int main( int argc, const char *argv[] )
if (!slaveLogHandler)
slaveLogHandler = startSlaveLog();

if (globals->getPropBool("@MPChannelReconnect"))
getMPServer()->setOpt(mpsopt_channelreopen, "true");

if (getExpertOptBool("slaveDaliClient"))
enableThorSlaveAsDaliClient();

Expand Down

0 comments on commit 637576a

Please sign in to comment.