diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 6ac786c5608..47a35e8f1dd 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -179,18 +179,6 @@ class CJobQueueItem: implements IJobQueueItem, public CInterface enqueuedt.setString(dts.str()); } - - - IJobQueueItem* clone() - { - IJobQueueItem* ret = new CJobQueueItem(wu); - ret->setPriority(priority); - ret->setPriority(port); - ret->setEndpoint(ep); - ret->setSessionId(sessid); - return ret; - } - void setPriority(int _priority) { priority = _priority; @@ -1276,13 +1264,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } - IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE) // minprio == MAX_INT - used cache priority - { - return dodequeue(minprio,timeout); - } - - - void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem { Owned qi = qitem; @@ -1445,24 +1426,6 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return dotake(qd,wuid,false); } - unsigned takeItems(sQueueData &qd,CJobQueueContents &dest) - { - Cconnlockblock block(this,true); - unsigned ret = copyItemsImpl(qd,dest); - clear(qd); - return ret; - } - - void enqueueItems(sQueueData &qd,CJobQueueContents &items) - { - unsigned n=items.ordinality(); - if (n) { - Cconnlockblock block(this,true); - for (unsigned i=0;inext) - return takeItems(*qdata,dest); - Cconnlockblock block(this,true); - unsigned ret = 0; - ForEachQueue(qd) { - ret += copyItemsImpl(*qd,dest); - clear(*qd); - } - return ret; - } - void enqueueItems(CJobQueueContents &items) - { // enqueues to firs sub-queue (not sure that useful) - assertex(qdata); - return enqueueItems(*qdata,items); - } void clear() { @@ -1798,27 +1742,23 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return initiateconv.getClear(); } - IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay,IDynamicPriority *maxp) + IConversation *acceptConversation(IJobQueueItem *&retitem, unsigned prioritytransitiondelay) { CriticalBlock block(crit); retitem = NULL; assertex(connected); // must be connected - int curmp = maxp?maxp->get():0; - int nextmp = curmp; for (;;) { bool timedout = false; Owned item; { CriticalUnblock unblock(crit); // this is a bit complicated with multi-thor - if (prioritytransitiondelay||maxp) { - item.setown(dodequeue((std::max(curmp,nextmp)/10)*10, // round down to multiple of 10 - prioritytransitiondelay?prioritytransitiondelay:60000,prioritytransitiondelay>0,&timedout)); - // if dynamic priority check every minute - if (!prioritytransitiondelay) { - curmp = nextmp; // using max above is a bit devious to allow transition - nextmp = maxp->get(); - } + if (prioritytransitiondelay) + { + int minprio = 0; + unsigned timeout = prioritytransitiondelay; + bool usePrevPrio = true; + item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); } else item.setown(dequeue(INFINITE)); diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 9435f380c89..c3209904a50 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -22,6 +22,34 @@ #include "jsocket.hpp" #include "dasess.hpp" +/* + * The job queues have the following semantics. + * + * Items are queued with a given priority, at a given offset + * If no position is given, then the insertion position in the queue is determined by finding the first item with a lower priority + * If a position is given, then the priority may be adjusted to ensure it is consistent with the items before it and after it in the queue + * + * When an item is dequeued, the head of the queue is removed. + * + * There is an option in acceptConversation(), currently used in thor, to wait for up to 30 seconds if the priority at the header of the queue is lower + * than the last item dequeued from that queue. I think this is to ensure that high priority workunits get precedence even if there + * are short pauses between graphs. However, this will prevent all thor instances for the same queue from dequeuing for that period. + * + * NOTE: If this logic is included I think this should really be the priority of the last item THIS thor dequeued. + * + * + * We want to add the following semantics: + * + * When a server requests to dequeue an item it can pass a worker priority. + * - If there is an item on the queue, then dequeue it + * - Otherwise record the worker priority in the Client information. (If there are multiple threads do they have to have the same priority??) + * - When an item is received, the WAITING worker with the highest priority gets to process it. + * + * Problems: + * Multiple threads from a single client + * Ensuring there are no race conditions + * + */ interface IJobQueueItem: extends serializable { virtual const char *queryWUID()=0; @@ -32,8 +60,7 @@ interface IJobQueueItem: extends serializable virtual unsigned getPort()=0; // conversation port (not used for DFU server) virtual bool equals(IJobQueueItem *other)=0; - virtual IJobQueueItem* clone()=0; - + virtual void setPriority(int priority)=0; virtual void setOwner(const char *owner)=0; virtual void setEndpoint(const SocketEndpoint &ep)=0; @@ -59,11 +86,6 @@ class WORKUNIT_API CJobQueueContents: public IArrayOf IJobQueueIterator *getIterator(); // only valid during lifetime of CJobQueueContents }; -interface IDynamicPriority -{ - virtual int get()=0; -}; - interface IJobQueueConst: extends IInterface { virtual unsigned ordinality()=0; // number of items on queue @@ -98,7 +120,6 @@ interface IJobQueue: extends IJobQueueConst virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; - virtual IJobQueueItem *prioDequeue(int minprio,unsigned timeout=INFINITE)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; @@ -106,8 +127,6 @@ interface IJobQueue: extends IJobQueueConst //manipulation virtual IJobQueueItem *take(const char *wuid)=0; // finds and removes - virtual unsigned takeItems(CJobQueueContents &dest)=0; // takes items and clears queue - virtual void enqueueItems(CJobQueueContents &items)=0; // enqueues to first sub-queue virtual bool moveBefore(const char *wuid,const char *nextwuid)=0; virtual bool moveAfter(const char *wuid,const char *prevwuid)=0; virtual bool moveToHead(const char *wuid)=0; @@ -130,7 +149,7 @@ interface IJobQueue: extends IJobQueueConst // conversations: virtual IConversation *initiateConversation(IJobQueueItem *item)=0; // does enqueue - take ownership of item - virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0,IDynamicPriority *maxp=NULL)=0; + virtual IConversation *acceptConversation(IJobQueueItem *&item,unsigned prioritytransitiondelay=0)=0; // does dequeue - returns queue item dequeued virtual void cancelInitiateConversation()=0; // cancels initiateConversation in progress virtual bool cancelInitiateConversation(const char *wuid)=0; // cancels remote initiate diff --git a/initfiles/componentfiles/configschema/xsd/thor.xsd b/initfiles/componentfiles/configschema/xsd/thor.xsd index c9183448b86..8316e0e91d7 100644 --- a/initfiles/componentfiles/configschema/xsd/thor.xsd +++ b/initfiles/componentfiles/configschema/xsd/thor.xsd @@ -175,8 +175,6 @@ hpcc:tooltip="Port increment between slaves on same node"/> - diff --git a/initfiles/componentfiles/configxml/thor.xsd.in b/initfiles/componentfiles/configxml/thor.xsd.in index 696e0985b5f..4e882a3e097 100644 --- a/initfiles/componentfiles/configxml/thor.xsd.in +++ b/initfiles/componentfiles/configxml/thor.xsd.in @@ -491,13 +491,6 @@ - - - - If set true, prevents lower priority jobs starting on a multithor - - - diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 475432c45dd..ed2bd7d7f1c 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -475,47 +475,6 @@ class CIdleShutdown : public CSimpleInterface, implements IThreaded void stop() { sem.signal(); } }; -static int getRunningMaxPriority(const char *qname) -{ - int maxpriority = 0; // ignore neg - try { - Owned conn = querySDS().connect("/Status/Servers",myProcessSession(),RTM_LOCK_READ,30000); - if (conn.get()) - { - Owned it(conn->queryRoot()->getElements("Server")); - ForEach(*it) { - StringBuffer instance; - if(it->query().hasProp("@queue")) - { - const char* queue=it->query().queryProp("@queue"); - if(queue&&(strcmp(queue,qname)==0)) { - Owned wuids = it->query().getElements("WorkUnit"); - ForEach(*wuids) { - IPropertyTree &wu = wuids->query(); - const char* wuid=wu.queryProp(NULL); - if (wuid&&*wuid) { - Owned factory = getWorkUnitFactory(); - Owned workunit = factory->openWorkUnit(wuid); - if (workunit) { - int priority = workunit->getPriorityValue(); - if (priority>maxpriority) - maxpriority = priority; - } - } - } - } - } - } - } - } - catch (IException *e) - { - IERRLOG(e,"getRunningMaxPriority"); - e->Release(); - } - return maxpriority; -} - bool CJobManager::fireException(IException *e) { IArrayOf jobList; @@ -596,23 +555,6 @@ void CJobManager::run() #endif jobq.setown(createJobQueue(queueName.get())); - struct cdynprio: public IDynamicPriority - { - const char *qn; - int get() - { - int p = getRunningMaxPriority(qn); - if (p) - PROGLOG("Dynamic Min priority = %d",p); - return p; - } - } *dp = NULL; - - if (globals->getPropBool("@multiThorPriorityLock")) { - PROGLOG("multiThorPriorityLock enabled"); - dp = new cdynprio; - dp->qn = queueName.get(); - } PROGLOG("verifying mp connection to all slaves"); Owned mpServer = getMPServer(); @@ -666,13 +608,8 @@ void CJobManager::run() { if (exclusiveLockName.length()) { - if (globals->getPropBool("@multiThorPriorityLock")) - FLLOG(MCoperatorWarning, "multiThorPriorityLock cannot be used in conjunction with multiThorExclusionLockName"); - else - { - PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str()); - exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str())); - } + PROGLOG("Multi-Thor exclusive lock defined: %s", exclusiveLockName.str()); + exclLockDaliMutex.setown(createDaliMutex(exclusiveLockName.str())); } } bool jobQConnected = false; @@ -792,7 +729,7 @@ void CJobManager::run() jobQConnected = true; } IJobQueueItem *_item; - conversation.setown(jobq->acceptConversation(_item,30*1000,dp)); // 30s priority transition delay + conversation.setown(jobq->acceptConversation(_item,30*1000)); // 30s priority transition delay item.setown(_item); } } @@ -875,7 +812,6 @@ void CJobManager::run() // reset for next job setProcessAborted(false); } - delete dp; jobq.clear(); }