Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32945 Add support for queue clients with priorities #19306

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 100 additions & 37 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "platform.h"
#include <algorithm>
#include <cstdlib>
#include "limits.h"
#include "jlib.hpp"
#include "jbuff.hpp"
Expand Down Expand Up @@ -51,13 +52,12 @@
JobQueues
JobQueue @name= @count= @state=active|paused|stopped
Edition <num>
Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1
Item* @wuid @owner @node @port @priority @session

#endif



class CJobQueueItem: implements IJobQueueItem, public CInterface
{
int priority;
Expand Down Expand Up @@ -789,16 +789,17 @@ class CJobQueueConst: public CJobQueueBase
class CJobQueue: public CJobQueueBase, implements IJobQueue
{
public:
sQueueData *activeq;
sQueueData *activeq = nullptr;
SessionId sessionid;
unsigned locknest;
bool writemode;
bool connected;
unsigned locknest = 0;
bool writemode = false;
bool connected = false;
Owned<IConversation> initiateconv;
StringAttr initiatewu;
bool dequeuestop;
bool cancelwaiting;
bool validateitemsessions;
std::atomic<bool> isProcessingDequeue = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: = false

bool dequeuestop = false;
bool cancelwaiting = false;
bool validateitemsessions = false;

class csubs: implements ISDSSubscription, public CInterface
{
Expand All @@ -811,15 +812,21 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
CriticalBlock block(parent->crit);
//There is a race condition - a callback may be at this point while the CJobQueue is deleted.
//Adding a critical section in parent makes it much more likely to be hit.
//Ultimately the semaphore should be moved to this class instead
//CriticalBlock block(parent->crit);
parent->notifysem.signal();
}
} subs;
};

IMPLEMENT_IINTERFACE;
Owned<csubs> subs;

IMPLEMENT_IINTERFACE_USING(CJobQueueBase);

CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
CJobQueue(const char *_qname) : CJobQueueBase(_qname)
{
subs.setown(new csubs(this));
activeq = qdata;
sessionid = myProcessSession();
validateitemsessions = false;
Expand Down Expand Up @@ -1037,7 +1044,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
}
}

Expand All @@ -1048,7 +1055,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (!qd->subscriberid) {
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
}
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
if (e!=qd->lastWaitEdition) {
Expand Down Expand Up @@ -1128,7 +1135,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
}

sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold)
{
unsigned higher = 0;
Owned<IPropertyTreeIterator> iter = queueTree->getElements("Client");
ForEach(*iter)
{
unsigned __int64 priority = iter->query().getPropInt64("@priority", 0);
if (priority > clientPrio)
{
higher++;
if (higher >= threshold)
return true;
}
}
return false;
}

sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues)
{
if (numqueues==0)
return NULL;
Expand All @@ -1139,7 +1163,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
for (unsigned i=0;i<numqueues;i++) {
sQueueData *qd = queues[i];
unsigned count = qd->root->getPropInt("@count");
if (count) {
if (count)
{
if (hasHigherPriorityClients(qd->root, clientPrio, count))
continue;

int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
StringBuffer path;
Expand All @@ -1160,17 +1188,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return best;
}

void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set)
{
for (unsigned i=0; i<numqueues; i++) {
IPropertyTree *croot = queryClientRootSession(*queues[i]);
croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
//If a non-zero client priority has been specified, add (or remove) it from the list of priorities
if (clientPrio)
{
if (set)
croot->setPropInt64("@priority", clientPrio);
else
croot->removeProp("@priority");
}
}
}

// 'simple' queuing
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout)
{
//If more than one thread is waiting on the queue, then the queue code does not work correctly
//It is undefined which thread the semaphore signal will wake up.
//E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of
//priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued.
//Similar problems occur when the clientPriority is mixed.
if (isProcessingDequeue.exchange(true))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you sleep a tiny amount and try again instead of throwing ?
What is the implication if we throw ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see comment -
no component has more than one thread listening to a single job queue object.

throw MakeStringException(0, "Multiple concurrent dequeue not supported");

bool hasminprio=(minprio!=INT_MIN);
if (timedout)
*timedout = false;
Expand Down Expand Up @@ -1200,23 +1244,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
active.append(qd);
}
if (stopped==total)
{
isProcessingDequeue.store(false);
return NULL; // all stopped
}
sQueueData **activeqds = (sQueueData **)active.getArray();
unsigned activenum = active.ordinality();
if (activenum) {
sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds);
unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
// load minp from cache
if (count) {
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
if (count)
{
if (!hasHigherPriorityClients(bestqd->root, clientPrio, count))
{
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
}
}
}
}
Expand All @@ -1226,15 +1277,15 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
block.setRollback(false);
}
if (!waitingset) {
setWaiting(activenum,activeqds,true);
setWaiting(activenum, activeqds, clientPrio, true);
block.commit();
waitingset = true;
}
}
}
if (timeout==0) {
if (waitingset) {
setWaiting(activenum,activeqds,false);
setWaiting(activenum, activeqds, clientPrio, false);
block.commit();
}
if (timedout)
Expand All @@ -1255,12 +1306,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
timeout = 0;
}
}

isProcessingDequeue.store(false);
return ret;
}

IJobQueueItem *dequeue(unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN,timeout);
return dodequeue(INT_MIN, 0, timeout, false, nullptr);
}


Expand All @@ -1271,13 +1324,18 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr));
item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr));
}
if (!item)
item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr));
item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr));
return item.getClear();
}

IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN, priority, timeout, false, nullptr);
}

void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1628,6 +1686,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return (state&&(strcmp(state,"stopped")==0));
}

void removeClient(sQueueData & qd, IPropertyTree * croot)
{
qd.root->removeTree(croot);
}

void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
Cconnlockblock block(this,false);
Expand All @@ -1640,7 +1703,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
break;
if (validateitemsessions && !validSession(croot)) {
Cconnlockblock block(this,true);
qd.root->removeTree(croot);
removeClient(qd, croot);
}
else {
waiting += croot->getPropInt("@waiting");
Expand Down Expand Up @@ -1772,7 +1835,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down
1 change: 1 addition & 0 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0;
virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
Expand Down
Loading