Skip to content

Commit

Permalink
HPCC-33166 Roxie dynamic priority adjust BG thread priority
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Jan 16, 2025
1 parent 30c5ecb commit 7362637
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 2 deletions.
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern unsigned packetAcknowledgeTimeout;
extern cycle_t dynPriorityAdjustCycles;
extern bool traceThreadStartDelay;
extern int adjustBGThreadNiceValue;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
extern bool lockSuperFiles;
Expand Down
8 changes: 8 additions & 0 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
cycle_t dynPriorityAdjustCycles = 0; // default off (0)
bool traceThreadStartDelay = true;
int adjustBGThreadNiceValue = 5;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -1010,6 +1012,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0);
if (dynAdjustMsec)
dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL);
traceThreadStartDelay = topology->getPropBool("@traceThreadStartDelay", traceThreadStartDelay);
adjustBGThreadNiceValue = topology->getPropInt("@adjustBGThreadNiceValue", adjustBGThreadNiceValue);
if (adjustBGThreadNiceValue < 0)
adjustBGThreadNiceValue = 0;
if (adjustBGThreadNiceValue > 19)
adjustBGThreadNiceValue = 19;
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
Expand Down
11 changes: 9 additions & 2 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,13 @@ class RoxieQueue : public CInterface, implements IThreadFactory
if (qname && *qname)
tname.appendf(" (%s)", qname);
workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers));
if (traceThreadStartDelay)
workers->setStartDelayTracing(60);
if (qname && *qname)
{
if (streq(qname, "BG"))
workers->setNiceValue(adjustBGThreadNiceValue);
}
started = 0;
idle = 0;
if (IBYTIbufferSize)
Expand Down Expand Up @@ -1893,7 +1900,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers/2 + 1, "BG"), numWorkers(_numWorkers)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers, "BG"), numWorkers(_numWorkers)
{
}

Expand All @@ -1902,7 +1909,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
loQueue.start();
hiQueue.start();
slaQueue.start();
bgQueue.start(); // consider nice(+3) BG threads
bgQueue.start(); // NB BG thread priority can be adjusted
}

virtual void stop()
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4611,6 +4611,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE;
unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles();
UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec);
p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK;
p->queryHeader().activityId |= ROXIE_BG_PRIORITY;
// TODO: what to do about still running activities' continuation/ack priorities ?
}
Expand Down
7 changes: 7 additions & 0 deletions system/jlib/jthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
unsigned stacksize;
unsigned timeoutOnRelease;
unsigned traceStartDelayPeriod = 0;
int niceValue = 0;
unsigned startsInPeriod = 0;
cycle_t startDelayInPeriod = 0;
CCycleTimer overAllTimer;
Expand Down Expand Up @@ -1114,6 +1115,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew());
if (stacksize)
ret.setStackSize(stacksize);
if (niceValue)
ret.setNice(niceValue);
ret.start(false);
threadwrappers.append(ret);
return ret;
Expand Down Expand Up @@ -1281,6 +1284,10 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
{
traceStartDelayPeriod = secs;
}
void setNiceValue(int value)
{
niceValue = value;
}
bool waitAvailable(unsigned timeout)
{
if (!defaultmax)
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jthread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ interface IThreadPool : extends IInterface
virtual unsigned runningCount()=0; // number of currently running threads
virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period
virtual void setNiceValue(int value) = 0; // set priority for thread
virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available
};

Expand Down

0 comments on commit 7362637

Please sign in to comment.