Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32945 Add support for queue clients with priorities #19306

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 122 additions & 47 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "platform.h"
#include <algorithm>
#include <cstdlib>
#include "limits.h"
#include "jlib.hpp"
#include "jbuff.hpp"
Expand Down Expand Up @@ -51,13 +52,12 @@
JobQueues
JobQueue @name= @count= @state=active|paused|stopped
Edition <num>
Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads)
Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1
Item* @wuid @owner @node @port @priority @session

#endif



class CJobQueueItem: implements IJobQueueItem, public CInterface
{
int priority;
Expand Down Expand Up @@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface
}
public:
sQueueData *qdata;
Semaphore notifysem;
CriticalSection crit;

IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -789,37 +788,43 @@ class CJobQueueConst: public CJobQueueBase
class CJobQueue: public CJobQueueBase, implements IJobQueue
{
public:
sQueueData *activeq;
sQueueData *activeq = nullptr;
SessionId sessionid;
unsigned locknest;
bool writemode;
bool connected;
unsigned locknest = 0;
bool writemode = false;
bool connected = false;
Owned<IConversation> initiateconv;
StringAttr initiatewu;
bool dequeuestop;
bool cancelwaiting;
bool validateitemsessions;
std::atomic<bool> isProcessingDequeue = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: = false

bool dequeuestop = false;
bool cancelwaiting = false;
bool validateitemsessions = false;

class csubs: implements ISDSSubscription, public CInterface
class QueueChangeSubscription : implements ISDSSubscription, public CInterface
{
CJobQueue *parent;
public:
//If this semaphone is in the CJobQueue class then there is a race condition
//A callback may be at this point while the CJobQueue is deleted - causing it to signal
//a deleted semaphore
Semaphore notifysem;
public:
IMPLEMENT_IINTERFACE;
csubs(CJobQueue *_parent)
{
parent = _parent;
}

void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
CriticalBlock block(parent->crit);
parent->notifysem.signal();
notifysem.signal();
}
} subs;
};

IMPLEMENT_IINTERFACE;
//This must be an owned pointer, rather than a member, to avoid it being deleted while the notify()
//callback is being called.
Owned<QueueChangeSubscription> notifySubscription;

CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
IMPLEMENT_IINTERFACE_USING(CJobQueueBase);

CJobQueue(const char *_qname) : CJobQueueBase(_qname)
{
notifySubscription.setown(new QueueChangeSubscription);
activeq = qdata;
sessionid = myProcessSession();
validateitemsessions = false;
Expand Down Expand Up @@ -1037,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
}

Expand All @@ -1048,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (!qd->subscriberid) {
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false);
}
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
if (e!=qd->lastWaitEdition) {
Expand Down Expand Up @@ -1128,7 +1133,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
}

sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold)
{
unsigned higher = 0;
Owned<IPropertyTreeIterator> iter = queueTree->getElements("Client");
ForEach(*iter)
{
unsigned __int64 priority = iter->query().getPropInt64("@priority", 0);
if (priority > clientPrio)
{
higher++;
if (higher >= threshold)
return true;
}
}
return false;
}

sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues)
{
if (numqueues==0)
return NULL;
Expand All @@ -1139,7 +1161,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
for (unsigned i=0;i<numqueues;i++) {
sQueueData *qd = queues[i];
unsigned count = qd->root->getPropInt("@count");
if (count) {
if (count)
{
if (hasHigherPriorityClients(qd->root, clientPrio, count))
continue;

int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
StringBuffer path;
Expand All @@ -1160,17 +1186,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return best;
}

void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set)
{
for (unsigned i=0; i<numqueues; i++) {
IPropertyTree *croot = queryClientRootSession(*queues[i]);
croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
//If a non-zero client priority has been specified, add (or remove) it from the list of priorities
if (clientPrio)
{
if (set)
croot->setPropInt64("@priority", clientPrio);
else
croot->removeProp("@priority");
}
}
}

// 'simple' queuing
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout)
{
//If more than one thread is waiting on the queue, then the queue code does not work correctly
//It is undefined which thread the semaphore signal will wake up.
//E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of
//priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued.
//Similar problems occur when the clientPriority is mixed.
if (isProcessingDequeue.exchange(true))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you sleep a tiny amount and try again instead of throwing ?
What is the implication if we throw ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see comment -
no component has more than one thread listening to a single job queue object.

throw MakeStringException(0, "Multiple concurrent dequeue not supported");

bool hasminprio=(minprio!=INT_MIN);
if (timedout)
*timedout = false;
Expand Down Expand Up @@ -1200,23 +1242,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
active.append(qd);
}
if (stopped==total)
{
isProcessingDequeue.store(false);
return NULL; // all stopped
}
sQueueData **activeqds = (sQueueData **)active.getArray();
unsigned activenum = active.ordinality();
if (activenum) {
sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds);
unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
// load minp from cache
if (count) {
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
if (count)
{
if (!hasHigherPriorityClients(bestqd->root, clientPrio, count))
{
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
}
}
}
}
Expand All @@ -1226,15 +1275,15 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
block.setRollback(false);
}
if (!waitingset) {
setWaiting(activenum,activeqds,true);
setWaiting(activenum, activeqds, clientPrio, true);
block.commit();
waitingset = true;
}
}
}
if (timeout==0) {
if (waitingset) {
setWaiting(activenum,activeqds,false);
setWaiting(activenum, activeqds, clientPrio, false);
block.commit();
}
if (timedout)
Expand All @@ -1246,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
// check every 5 mins independant of notify (in case subscription lost for some reason)
if (to>timeout)
to = timeout;
notifysem.wait(to);
notifySubscription->notifysem.wait(to);
if (timeout!=(unsigned)INFINITE) {
t = msTick()-t;
if (t<timeout)
Expand All @@ -1255,15 +1304,36 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
timeout = 0;
}
}

isProcessingDequeue.store(false);
return ret;
}

IJobQueueItem *dequeue(unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN,timeout);
return dodequeue(INT_MIN, 0, timeout, false, nullptr);
}


IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay) override
{
Owned<IJobQueueItem> item;
if (prioritytransitiondelay)
{
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr));
}
if (!item)
item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr));
return item.getClear();
}

IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN, priority, timeout, false, nullptr);
}

void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1614,6 +1684,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return (state&&(strcmp(state,"stopped")==0));
}

void removeClient(sQueueData & qd, IPropertyTree * croot)
{
qd.root->removeTree(croot);
}

void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
Cconnlockblock block(this,false);
Expand All @@ -1626,7 +1701,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
break;
if (validateitemsessions && !validSession(croot)) {
Cconnlockblock block(this,true);
qd.root->removeTree(croot);
removeClient(qd, croot);
}
else {
waiting += croot->getPropInt("@waiting");
Expand Down Expand Up @@ -1758,7 +1833,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down Expand Up @@ -1801,7 +1876,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
dequeuestop = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

bool cancelInitiateConversation(sQueueData &qd,const char *wuid)
Expand Down Expand Up @@ -1838,7 +1913,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (haschanged())
return true;
}
if (!notifysem.wait(timeout))
if (!notifySubscription->notifysem.wait(timeout))
break;
}
return false;
Expand All @@ -1847,7 +1922,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
CriticalBlock block(crit);
cancelwaiting = true;
notifysem.signal();
notifySubscription->notifysem.signal();
}

virtual void enqueue(IJobQueueItem *qitem)
Expand Down
2 changes: 2 additions & 0 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ interface IJobQueue: extends IJobQueueConst
virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0;
virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
Expand Down
2 changes: 2 additions & 0 deletions testing/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ include_directories (
./../../dali/base
./../../system/security/shared
./../../common/deftype
./../../common/workunit
./../../system/security/cryptohelper
./../../configuration/configmgr/configmgrlib
${HPCC_SOURCE_DIR}/system/masking/include
Expand Down Expand Up @@ -118,6 +119,7 @@ target_link_libraries ( unittests
esphttp
esdllib
logginglib
workunit
${CppUnit_LIBRARIES}
)

Expand Down
Loading
Loading