Skip to content

Commit

Permalink
HPCC-32945 Add support for queue clients with priorities
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jan 10, 2025
1 parent b061d66 commit 8bc2465
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 153 deletions.
159 changes: 110 additions & 49 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "platform.h"
#include <algorithm>
#include <cstdlib>
#include "limits.h"
#include "jlib.hpp"
#include "jbuff.hpp"
Expand Down Expand Up @@ -51,13 +52,12 @@
JobQueues
JobQueue @name= @count= @state=active|paused|stopped
Edition <num>
Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1
Item* @wuid @owner @node @port @priority @session

#endif



class CJobQueueItem: implements IJobQueueItem, public CInterface
{
int priority;
Expand Down Expand Up @@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface
}
public:
sQueueData *qdata;
Semaphore notifysem;
CriticalSection crit;

IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -789,37 +788,43 @@ class CJobQueueConst: public CJobQueueBase
class CJobQueue: public CJobQueueBase, implements IJobQueue
{
public:
sQueueData *activeq;
sQueueData *activeq = nullptr;
SessionId sessionid;
unsigned locknest;
bool writemode;
bool connected;
unsigned locknest = 0;
bool writemode = false;
bool connected = false;
Owned<IConversation> initiateconv;
StringAttr initiatewu;
bool dequeuestop;
bool cancelwaiting;
bool validateitemsessions;
std::atomic<bool> isProcessingDequeue{false}; // Used to detect more than one thread waiting on the same queue
bool dequeuestop = false;
bool cancelwaiting = false;
bool validateitemsessions = false;

class csubs: implements ISDSSubscription, public CInterface
class QueueChangeSubscription : implements ISDSSubscription, public CInterface
{
CJobQueue *parent;
public:
//If this semaphone is in the CJobQueue class then there is a race condition
//A callback may be at this point while the CJobQueue is deleted - causing it to signal
//a deleted semaphore
Semaphore notifysem;
public:
IMPLEMENT_IINTERFACE;
csubs(CJobQueue *_parent)
{
parent = _parent;
}

void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
CriticalBlock block(parent->crit);
parent->notifysem.signal();
notifysem.signal();
}
} subs;
};

IMPLEMENT_IINTERFACE;
//This must be an owned pointer, rather than a member, to avoid it being deleted while the notify()
//callback is being called.
Owned<QueueChangeSubscription> notifySubscription;

CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
IMPLEMENT_IINTERFACE_USING(CJobQueueBase);

CJobQueue(const char *_qname) : CJobQueueBase(_qname)
{
notifySubscription.setown(new QueueChangeSubscription);
activeq = qdata;
sessionid = myProcessSession();
validateitemsessions = false;
Expand Down Expand Up @@ -1037,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
}

Expand All @@ -1048,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (!qd->subscriberid) {
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
if (e!=qd->lastWaitEdition) {
Expand Down Expand Up @@ -1128,7 +1133,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
}

sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold)
{
unsigned higher = 0;
Owned<IPropertyTreeIterator> iter = queueTree->getElements("Client");
ForEach(*iter)
{
unsigned __int64 priority = iter->query().getPropInt64("@priority", 0);
if (priority > clientPrio)
{
higher++;
if (higher >= threshold)
return true;
}
}
return false;
}

sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues)
{
if (numqueues==0)
return NULL;
Expand All @@ -1139,7 +1161,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
for (unsigned i=0;i<numqueues;i++) {
sQueueData *qd = queues[i];
unsigned count = qd->root->getPropInt("@count");
if (count) {
if (count)
{
if (hasHigherPriorityClients(qd->root, clientPrio, count))
continue;

int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
StringBuffer path;
Expand All @@ -1160,17 +1186,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return best;
}

void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set)
{
for (unsigned i=0; i<numqueues; i++) {
IPropertyTree *croot = queryClientRootSession(*queues[i]);
croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
//If a non-zero client priority has been specified, add (or remove) it from the list of priorities
if (clientPrio)
{
if (set)
croot->setPropInt64("@priority", clientPrio);
else
croot->removeProp("@priority");
}
}
}

// 'simple' queuing
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout)
{
//If more than one thread is waiting on the queue, then the queue code does not work correctly
//It is undefined which thread the semaphore signal will wake up.
//E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of
//priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued.
//Similar problems occur when the clientPriority is mixed.
if (isProcessingDequeue.exchange(true))
throw MakeStringException(0, "Multiple concurrent dequeue not supported");

bool hasminprio=(minprio!=INT_MIN);
if (timedout)
*timedout = false;
Expand Down Expand Up @@ -1200,23 +1242,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
active.append(qd);
}
if (stopped==total)
{
isProcessingDequeue.store(false);
return NULL; // all stopped
}
sQueueData **activeqds = (sQueueData **)active.getArray();
unsigned activenum = active.ordinality();
if (activenum) {
sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds);
unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
// load minp from cache
if (count) {
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
if (count)
{
if (!hasHigherPriorityClients(bestqd->root, clientPrio, count))
{
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
}
}
}
}
Expand All @@ -1226,15 +1275,15 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
block.setRollback(false);
}
if (!waitingset) {
setWaiting(activenum,activeqds,true);
setWaiting(activenum, activeqds, clientPrio, true);
block.commit();
waitingset = true;
}
}
}
if (timeout==0) {
if (waitingset) {
setWaiting(activenum,activeqds,false);
setWaiting(activenum, activeqds, clientPrio, false);
block.commit();
}
if (timedout)
Expand All @@ -1246,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
// check every 5 mins independant of notify (in case subscription lost for some reason)
if (to>timeout)
to = timeout;
notifysem.wait(to);
notifySubscription->notifysem.wait(to);
if (timeout!=(unsigned)INFINITE) {
t = msTick()-t;
if (t<timeout)
Expand All @@ -1255,12 +1304,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
timeout = 0;
}
}

isProcessingDequeue.store(false);
return ret;
}

IJobQueueItem *dequeue(unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN,timeout);
return dodequeue(INT_MIN, 0, timeout, false, nullptr);
}


Expand All @@ -1270,13 +1321,18 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (prioritytransitiondelay)
{
bool usePrevPrio = true;
item.setown(dodequeue(minPrio, prioritytransitiondelay, usePrevPrio, nullptr));
item.setown(dodequeue(minPrio, 0, prioritytransitiondelay, usePrevPrio, nullptr));
}
if (!item)
item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr));
item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr));
return item.getClear();
}

IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN, priority, timeout, false, nullptr);
}

void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1627,6 +1683,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return (state&&(strcmp(state,"stopped")==0));
}

void removeClient(sQueueData & qd, IPropertyTree * croot)
{
qd.root->removeTree(croot);
}

void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
Cconnlockblock block(this,false);
Expand All @@ -1639,7 +1700,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
break;
if (validateitemsessions && !validSession(croot)) {
Cconnlockblock block(this,true);
qd.root->removeTree(croot);
removeClient(qd, croot);
}
else {
waiting += croot->getPropInt("@waiting");
Expand Down Expand Up @@ -1771,7 +1832,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down Expand Up @@ -1814,7 +1875,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
dequeuestop = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
Expand Down Expand Up @@ -1851,7 +1912,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (haschanged())
return true;
}
if (!notifysem.wait(timeout))
if (!notifySubscription->notifysem.wait(timeout))
break;
}
return false;
Expand All @@ -1860,7 +1921,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
cancelwaiting = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

virtual void enqueue(IJobQueueItem *qitem)
Expand Down
1 change: 1 addition & 0 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0;
virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
Expand Down
Loading

0 comments on commit 8bc2465

Please sign in to comment.