Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32950 Remove multiThorPriorityLock semantics from queueing #19276

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 7 additions & 67 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,6 @@ class CJobQueueItem: implements IJobQueueItem, public CInterface
enqueuedt.setString(dts.str());
}



IJobQueueItem* clone()
{
IJobQueueItem* ret = new CJobQueueItem(wu);
ret->setPriority(priority);
ret->setPriority(port);
ret->setEndpoint(ep);
ret->setSessionId(sessid);
return ret;
}

void setPriority(int _priority)
{
priority = _priority;
Expand Down Expand Up @@ -1276,13 +1264,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}


IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority
{
return dodequeue(minprio,timeout);
}



void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1445,32 +1426,13 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return dotake(qd,wuid,false);
}

unsigned takeItems(sQueueData &qd,CJobQueueContents &dest)
{
Cconnlockblock block(this,true);
unsigned ret = copyItemsImpl(qd,dest);
clear(qd);
return ret;
}

void enqueueItems(sQueueData &qd,CJobQueueContents &items)
{
unsigned n=items.ordinality();
if (n) {
Cconnlockblock block(this,true);
for (unsigned i=0;i<n;i++)
enqueue(qd,items.item(i).clone());
}
}

void enqueueBefore(IJobQueueItem *qitem,const char *wuid)
{
Cconnlockblock block(this,true);
sQueueData *qd = qdata->next?findQD(wuid):qdata;
enqueueBefore(*qd,qitem,wuid);
}


void enqueueAfter(IJobQueueItem *qitem,const char *wuid)
{
Cconnlockblock block(this,true);
Expand Down Expand Up @@ -1739,24 +1701,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
return NULL;
}
unsigned takeItems(CJobQueueContents &dest)
{
assertex(qdata);
if (!qdata->next)
return takeItems(*qdata,dest);
Cconnlockblock block(this,true);
unsigned ret = 0;
ForEachQueue(qd) {
ret += copyItemsImpl(*qd,dest);
clear(*qd);
}
return ret;
}
void enqueueItems(CJobQueueContents &items)
{ // enqueues to firs sub-queue (not sure that useful)
assertex(qdata);
return enqueueItems(*qdata,items);
}

void clear()
{
Expand Down Expand Up @@ -1798,27 +1742,23 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return initiateconv.getClear();
}

IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp)
IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay)
{
CriticalBlock block(crit);
retitem = NULL;
assertex(connected); // must be connected
int curmp = maxp?maxp->get():0;
int nextmp = curmp;
for (;;) {
bool timedout = false;
Owned<IJobQueueItem> item;
{
CriticalUnblock unblock(crit);
// this is a bit complicated with multi-thor
if (prioritytransitiondelay||maxp) {
item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10
prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout));
// if dynamic priority check every minute
if (!prioritytransitiondelay) {
curmp = nextmp; // using max above is a bit devious to allow transition
nextmp = maxp->get();
}
if (prioritytransitiondelay)
{
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down
41 changes: 30 additions & 11 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,34 @@
#include "jsocket.hpp"
#include "dasess.hpp"

/*
* The job queues have the following semantics.
*
* Items are queued with a given priority, at a given offset
* If no position is given, then the insertion position in the queue is determined by finding the first item with a lower priority
* If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue
*
* When an item is dequeued, the head of the queue is removed.
*
* There is an option in acceptConversation(), currently used in thor, to wait for up to 30 seconds if the priority at the header of the queue is lower
* than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there
* are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust comment, since now removed..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That option is still currently there...

*
* NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued.
*
*
* We want to add the following semantics:
*
* When a server requests to dequeue an item it can pass a worker priority.
* - If there is an item on the queue, then dequeue it
* - Otherwise record the worker priority in the Client information. (If there are multiple threads do they have to have the same priority??)
* - When an item is received, the WAITING worker with the highest priority gets to process it.
*
* Problems:
* Multiple threads from a single client
* Ensuring there are no race conditions
*
*/
interface IJobQueueItem: extends serializable
{
virtual const char *queryWUID()=0;
Expand All @@ -32,8 +60,7 @@ interface IJobQueueItem: extends serializable

virtual unsigned getPort()=0; // conversation port (not used for DFU server)
virtual bool equals(IJobQueueItem *other)=0;
virtual IJobQueueItem* clone()=0;


virtual void setPriority(int priority)=0;
virtual void setOwner(const char *owner)=0;
virtual void setEndpoint(const SocketEndpoint &ep)=0;
Expand All @@ -59,11 +86,6 @@ class WORKUNIT_API CJobQueueContents: public IArrayOf<IJobQueueItem>
IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents
};

interface IDynamicPriority
{
virtual int get()=0;
};

interface IJobQueueConst: extends IInterface
{
virtual unsigned ordinality()=0; // number of items on queue
Expand Down Expand Up @@ -98,16 +120,13 @@ interface IJobQueue: extends IJobQueueConst
virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
virtual void cancelWaitStatsChange()=0;

//manipulation
virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes
virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue
virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue
virtual bool moveBefore(const char *wuid,const char *nextwuid)=0;
virtual bool moveAfter(const char *wuid,const char *prevwuid)=0;
virtual bool moveToHead(const char *wuid)=0;
Expand All @@ -130,7 +149,7 @@ interface IJobQueue: extends IJobQueueConst

// conversations:
virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - take ownership of item
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0;
virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0)=0;
// does dequeue - returns queue item dequeued
virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress
virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate
Expand Down
2 changes: 0 additions & 2 deletions initfiles/componentfiles/configschema/xsd/thor.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@
hpcc:tooltip="Port increment between slaves on same node"/>
<xs:attribute name="multiThorMemoryThreshold" type="xs:nonNegativeInteger" hpcc:displayName="Multi Thor Memory Threshold(MB)"
hpcc:tooltip="Memory usage (in MB) beneath which multiple Thors will run in parallel. Leave blank if no limit"/>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" hpcc:displayName="Mult Thor Priority Lock" hpcc:presetValue="false"
hpcc:tooltip="If set true, prevents lower priority jobs starting on a multithor"/>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" hpcc:displayName="Multi Thor Exclusion Lock Name"
hpcc:tooltip="Prevents other thors (on any queue) sharing the same multiThorExclusionLockName name from running jobs at the same time"/>
<!--todo seems this needs to be true for multinode, what does that mean? (see old xsd and search for autogendefaultformultinode) -->
Expand Down
7 changes: 0 additions & 7 deletions initfiles/componentfiles/configxml/thor.xsd.in
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,6 @@
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorPriorityLock" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:appinfo>
<tooltip>If set true, prevents lower priority jobs starting on a multithor</tooltip>
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="multiThorExclusionLockName" type="xs:string" use="optional">
<xs:annotation>
<xs:appinfo>
Expand Down
70 changes: 3 additions & 67 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,47 +475,6 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded
void stop() { sem.signal(); }
};

static int getRunningMaxPriority(const char *qname)
{
int maxpriority = 0; // ignore neg
try {
Owned<IRemoteConnection> conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000);
if (conn.get())
{
Owned<IPropertyTreeIterator> it(conn->queryRoot()->getElements("Server"));
ForEach(*it) {
StringBuffer instance;
if(it->query().hasProp("@queue"))
{
const char* queue=it->query().queryProp("@queue");
if(queue&&(strcmp(queue,qname)==0)) {
Owned<IPropertyTreeIterator> wuids = it->query().getElements("WorkUnit");
ForEach(*wuids) {
IPropertyTree &wu = wuids->query();
const char* wuid=wu.queryProp(NULL);
if (wuid&&*wuid) {
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
if (workunit) {
int priority = workunit->getPriorityValue();
if (priority>maxpriority)
maxpriority = priority;
}
}
}
}
}
}
}
}
catch (IException *e)
{
IERRLOG(e,"getRunningMaxPriority");
e->Release();
}
return maxpriority;
}

bool CJobManager::fireException(IException *e)
{
IArrayOf<CJobMaster> jobList;
Expand Down Expand Up @@ -596,23 +555,6 @@ void CJobManager::run()
#endif

jobq.setown(createJobQueue(queueName.get()));
struct cdynprio: public IDynamicPriority
{
const char *qn;
int get()
{
int p = getRunningMaxPriority(qn);
if (p)
PROGLOG("Dynamic Min priority = %d",p);
return p;
}
} *dp = NULL;

if (globals->getPropBool("@multiThorPriorityLock")) {
PROGLOG("multiThorPriorityLock enabled");
dp = new cdynprio;
dp->qn = queueName.get();
}

PROGLOG("verifying mp connection to all slaves");
Owned<IMPServer> mpServer = getMPServer();
Expand Down Expand Up @@ -666,13 +608,8 @@ void CJobManager::run()
{
if (exclusiveLockName.length())
{
if (globals->getPropBool("@multiThorPriorityLock"))
FLLOG(MCoperatorWarning, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName");
else
{
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str());
exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str()));
}
}
bool jobQConnected = false;
Expand Down Expand Up @@ -792,7 +729,7 @@ void CJobManager::run()
jobQConnected = true;
}
IJobQueueItem *_item;
conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay
conversation.setown(jobq->acceptConversation(_item,30*1000)); // 30s priority transition delay
item.setown(_item);
}
}
Expand Down Expand Up @@ -875,7 +812,6 @@ void CJobManager::run()
// reset for next job
setProcessAborted(false);
}
delete dp;
jobq.clear();
}

Expand Down
Loading