Skip to content

Commit

Permalink
Merge pull request #19276 from ghalliday/issue32950
Browse files Browse the repository at this point in the history
HPCC-32950 Remove multiThorPriorityLock semantics from queueing

Reviewed-by: Jake Smith <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Nov 6, 2024
2 parents 55df43c + 128a125 commit 02f9a06
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 154 deletions.
74 changes: 7 additions & 67 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,6 @@ class CJobQueueItem: implements IJobQueueItem, public CInterface
enqueuedt.setString(dts.str());
}



IJobQueueItem* clone()
{
IJobQueueItem* ret = new CJobQueueItem(wu);
ret->setPriority(priority);
ret->setPriority(port);
ret->setEndpoint(ep);
ret->setSessionId(sessid);
return ret;
}

void setPriority(int _priority)
{
priority = _priority;
Expand Down Expand Up @@ -1276,13 +1264,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}


IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
{
return dodequeue(minprio,timeout);
}



void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1445,32 +1426,13 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return dotake(qd,wuid,false);
}

unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
{
Cconnlockblock block(this,true);
unsigned ret = copyItemsImpl(qd,dest);
clear(qd);
return ret;
}

void enqueueItems(sQueueData &qd,CJobQueueContents &items)
{
unsigned n=items.ordinality();
if (n) {
Cconnlockblock block(this,true);
for (unsigned i=0;i<n;i++)
enqueue(qd,items.item(i).clone());
}
}

void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
{
Cconnlockblock block(this,true);
sQueueData *qd = qdata->next?findQD(wuid):qdata;
enqueueBefore(*qd,qitem,wuid);
}


void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
{
Cconnlockblock block(this,true);
Expand Down Expand Up @@ -1739,24 +1701,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
return NULL;
}
unsigned takeItems(CJobQueueContents &dest)
{
assertex(qdata);
if (!qdata->next)
return takeItems(*qdata,dest);
Cconnlockblock block(this,true);
unsigned ret = 0;
ForEachQueue(qd) {
ret += copyItemsImpl(*qd,dest);
clear(*qd);
}
return ret;
}
void enqueueItems(CJobQueueContents &items)
{ // enqueues to firs sub-queue (not sure that useful)
assertex(qdata);
return enqueueItems(*qdata,items);
}

void clear()
{
Expand Down Expand Up @@ -1798,27 +1742,23 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return initiateconv.getClear();
}

IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay)
{
CriticalBlock block(crit);
retitem = NULL;
assertex(connected); // must be connected
int curmp = maxp?maxp->get():0;
int nextmp = curmp;
for (;;) {
bool timedout = false;
Owned<IJobQueueItem> item;
{
CriticalUnblock unblock(crit);
// this is a bit complicated with multi-thor
if (prioritytransitiondelay||maxp) {
item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
// if dynamic priority check every minute
if (!prioritytransitiondelay) {
curmp = nextmp; // using max above is a bit devious to allow transition
nextmp = maxp->get();
}
if (prioritytransitiondelay)
{
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down
41 changes: 30 additions & 11 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,34 @@
#include "jsocket.hpp"
#include "dasess.hpp"

/*
* The job queues have the following semantics.
*
* Items are queued with a given priority, at a given offset
* If no position is given, then the insertion position in the queue is determined by finding the first item with a lower priority
* If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue
*
* When an item is dequeued, the head of the queue is removed.
*
* There is an option in acceptConversation(), currently used in thor, to wait for up to 30 seconds if the priority at the header of the queue is lower
* than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there
* are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period.
*
* NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued.
*
*
* We want to add the following semantics:
*
* When a server requests to dequeue an item it can pass a worker priority.
* - If there is an item on the queue, then dequeue it
* - Otherwise record the worker priority in the Client information. (If there are multiple threads do they have to have the same priority??)
* - When an item is received, the WAITING worker with the highest priority gets to process it.
*
* Problems:
* Multiple threads from a single client
* Ensuring there are no race conditions
*
*/
interface IJobQueueItem: extends serializable
{
virtual const char *queryWUID()=0;
Expand All @@ -32,8 +60,7 @@ interface IJobQueueItem: extends serializable

virtual unsigned getPort()=0; // conversation port (not used for DFU server)
virtual bool equals(IJobQueueItem *other)=0;
virtual IJobQueueItem* clone()=0;


virtual void setPriority(int priority)=0;
virtual void setOwner(const char *owner)=0;
virtual void setEndpoint(const SocketEndpoint &ep)=0;
Expand All @@ -59,11 +86,6 @@ class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents
};

interface IDynamicPriority
{
virtual int get()=0;
};

interface IJobQueueConst: extends IInterface
{
virtual unsigned ordinality()=0; // number of items on queue
Expand Down Expand Up @@ -98,16 +120,13 @@ interface IJobQueue: extends IJobQueueConst
virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
virtual void cancelWaitStatsChange()=0;

//manipulation
virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes
virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue
virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue
virtual bool moveBefore(const char *wuid,const char *nextwuid)=0;
virtual bool moveAfter(const char *wuid,const char *prevwuid)=0;
virtual bool moveToHead(const char *wuid)=0;
Expand All @@ -130,7 +149,7 @@ interface IJobQueue: extends IJobQueueConst

// conversations:
virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - take ownership of item
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0;
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0)=0;
// does dequeue - returns queue item dequeued
virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress
virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate
Expand Down
2 changes: 0 additions & 2 deletions initfiles/componentfiles/configschema/xsd/thor.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@
hpcc:tooltip="Port increment between slaves on same node"/>
<xs:attribute name="multiThorMemoryThreshold" type="xs:nonNegativeInteger" hpcc:displayName="Multi Thor Memory Threshold(MB)"
hpcc:tooltip="Memory usage (in MB) beneath which multiple Thors will run in parallel. Leave blank if no limit"/>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" hpcc:displayName="Mult Thor Priority Lock" hpcc:presetValue="false"
hpcc:tooltip="If set true, prevents lower priority jobs starting on a multithor"/>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" hpcc:displayName="Multi Thor Exclusion Lock Name"
hpcc:tooltip="Prevents other thors (on any queue) sharing the same multiThorExclusionLockName name from running jobs at the same time"/>
<!--todo seems this needs to be true for multinode, what does that mean? (see old xsd and search for autogendefaultformultinode) -->
Expand Down
7 changes: 0 additions & 7 deletions initfiles/componentfiles/configxml/thor.xsd.in
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,6 @@
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:appinfo>
<tooltip>If set true, prevents lower priority jobs starting on a multithor</tooltip>
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" use="optional">
<xs:annotation>
<xs:appinfo>
Expand Down
70 changes: 3 additions & 67 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,47 +475,6 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded
void stop() { sem.signal(); }
};

static int getRunningMaxPriority(const char *qname)
{
int maxpriority = 0; // ignore neg
try {
Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
if (conn.get())
{
Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server"));
ForEach(*it) {
StringBuffer instance;
if(it->query().hasProp("@queue"))
{
const char* queue=it->query().queryProp("@queue");
if(queue&&(strcmp(queue,qname)==0)) {
Owned<IPropertyTreeIterator> wuids = it->query().getElements("WorkUnit");
ForEach(*wuids) {
IPropertyTree &wu = wuids->query();
const char* wuid=wu.queryProp(NULL);
if (wuid&&*wuid) {
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
if (workunit) {
int priority = workunit->getPriorityValue();
if (priority>maxpriority)
maxpriority = priority;
}
}
}
}
}
}
}
}
catch (IException *e)
{
IERRLOG(e,"getRunningMaxPriority");
e->Release();
}
return maxpriority;
}

bool CJobManager::fireException(IException *e)
{
IArrayOf<CJobMaster> jobList;
Expand Down Expand Up @@ -596,23 +555,6 @@ void CJobManager::run()
#endif

jobq.setown(createJobQueue(queueName.get()));
struct cdynprio: public IDynamicPriority
{
const char *qn;
int get()
{
int p = getRunningMaxPriority(qn);
if (p)
PROGLOG("Dynamic Min priority = %d",p);
return p;
}
} *dp = NULL;

if (globals->getPropBool("@multiThorPriorityLock")) {
PROGLOG("multiThorPriorityLock enabled");
dp = new cdynprio;
dp->qn = queueName.get();
}

PROGLOG("verifying mp connection to all slaves");
Owned<IMPServer> mpServer = getMPServer();
Expand Down Expand Up @@ -666,13 +608,8 @@ void CJobManager::run()
{
if (exclusiveLockName.length())
{
if (globals->getPropBool("@multiThorPriorityLock"))
FLLOG(MCoperatorWarning, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName");
else
{
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
}
bool jobQConnected = false;
Expand Down Expand Up @@ -792,7 +729,7 @@ void CJobManager::run()
jobQConnected = true;
}
IJobQueueItem *_item;
conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay
conversation.setown(jobq->acceptConversation(_item,30*1000)); // 30s priority transition delay
item.setown(_item);
}
}
Expand Down Expand Up @@ -875,7 +812,6 @@ void CJobManager::run()
// reset for next job
setProcessAborted(false);
}
delete dp;
jobq.clear();
}

Expand Down

0 comments on commit 02f9a06

Please sign in to comment.