-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-32945 Add support for queue clients with priorities #19306
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
#include "platform.h" | ||
#include <algorithm> | ||
#include <cstdlib> | ||
#include "limits.h" | ||
#include "jlib.hpp" | ||
#include "jbuff.hpp" | ||
|
@@ -51,13 +52,12 @@ | |
JobQueues | ||
JobQueue @name= @count= @state=active|paused|stopped | ||
Edition <num> | ||
Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads) | ||
Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1 | ||
Item* @wuid @owner @node @port @priority @session | ||
|
||
#endif | ||
|
||
|
||
|
||
class CJobQueueItem: implements IJobQueueItem, public CInterface | ||
{ | ||
int priority; | ||
|
@@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface | |
} | ||
public: | ||
sQueueData *qdata; | ||
Semaphore notifysem; | ||
CriticalSection crit; | ||
|
||
IMPLEMENT_IINTERFACE; | ||
|
@@ -789,37 +788,43 @@ class CJobQueueConst: public CJobQueueBase | |
class CJobQueue: public CJobQueueBase, implements IJobQueue | ||
{ | ||
public: | ||
sQueueData *activeq; | ||
sQueueData *activeq = nullptr; | ||
SessionId sessionid; | ||
unsigned locknest; | ||
bool writemode; | ||
bool connected; | ||
unsigned locknest = 0; | ||
bool writemode = false; | ||
bool connected = false; | ||
Owned<IConversation> initiateconv; | ||
StringAttr initiatewu; | ||
bool dequeuestop; | ||
bool cancelwaiting; | ||
bool validateitemsessions; | ||
std::atomic<bool> isProcessingDequeue = 0; | ||
bool dequeuestop = false; | ||
bool cancelwaiting = false; | ||
bool validateitemsessions = false; | ||
|
||
class csubs: implements ISDSSubscription, public CInterface | ||
class QueueChangeSubscription : implements ISDSSubscription, public CInterface | ||
{ | ||
CJobQueue *parent; | ||
public: | ||
//If this semaphone is in the CJobQueue class then there is a race condition | ||
//A callback may be at this point while the CJobQueue is deleted - causing it to signal | ||
//a deleted semaphore | ||
Semaphore notifysem; | ||
public: | ||
IMPLEMENT_IINTERFACE; | ||
csubs(CJobQueue *_parent) | ||
{ | ||
parent = _parent; | ||
} | ||
|
||
void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) | ||
{ | ||
CriticalBlock block(parent->crit); | ||
parent->notifysem.signal(); | ||
notifysem.signal(); | ||
} | ||
} subs; | ||
}; | ||
|
||
IMPLEMENT_IINTERFACE; | ||
//This must be an owned pointer, rather than a member, to avoid it being deleted while the notify() | ||
//callback is being called. | ||
Owned<QueueChangeSubscription> notifySubscription; | ||
|
||
CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this) | ||
IMPLEMENT_IINTERFACE_USING(CJobQueueBase); | ||
|
||
CJobQueue(const char *_qname) : CJobQueueBase(_qname) | ||
{ | ||
notifySubscription.setown(new QueueChangeSubscription); | ||
activeq = qdata; | ||
sessionid = myProcessSession(); | ||
validateitemsessions = false; | ||
|
@@ -1037,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
} | ||
StringBuffer path; | ||
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); | ||
qd->subscriberid = querySDS().subscribe(path.str(), subs, false); | ||
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); | ||
} | ||
} | ||
|
||
|
@@ -1048,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
if (!qd->subscriberid) { | ||
StringBuffer path; | ||
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); | ||
qd->subscriberid = querySDS().subscribe(path.str(), subs, false); | ||
qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); | ||
} | ||
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1); | ||
if (e!=qd->lastWaitEdition) { | ||
|
@@ -1128,7 +1133,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
} | ||
} | ||
|
||
sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues) | ||
bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold) | ||
{ | ||
unsigned higher = 0; | ||
Owned<IPropertyTreeIterator> iter = queueTree->getElements("Client"); | ||
ForEach(*iter) | ||
{ | ||
unsigned __int64 priority = iter->query().getPropInt64("@priority", 0); | ||
if (priority > clientPrio) | ||
{ | ||
higher++; | ||
if (higher >= threshold) | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues) | ||
{ | ||
if (numqueues==0) | ||
return NULL; | ||
|
@@ -1139,7 +1161,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
for (unsigned i=0;i<numqueues;i++) { | ||
sQueueData *qd = queues[i]; | ||
unsigned count = qd->root->getPropInt("@count"); | ||
if (count) { | ||
if (count) | ||
{ | ||
if (hasHigherPriorityClients(qd->root, clientPrio, count)) | ||
continue; | ||
|
||
int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio; | ||
if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) { | ||
StringBuffer path; | ||
|
@@ -1160,17 +1186,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
return best; | ||
} | ||
|
||
void setWaiting(unsigned numqueues,sQueueData **queues, bool set) | ||
void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set) | ||
{ | ||
for (unsigned i=0; i<numqueues; i++) { | ||
IPropertyTree *croot = queryClientRootSession(*queues[i]); | ||
croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1)); | ||
//If a non-zero client priority has been specified, add (or remove) it from the list of priorities | ||
if (clientPrio) | ||
{ | ||
if (set) | ||
croot->setPropInt64("@priority", clientPrio); | ||
else | ||
croot->removeProp("@priority"); | ||
} | ||
} | ||
} | ||
|
||
// 'simple' queuing | ||
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL) | ||
IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout) | ||
{ | ||
//If more than one thread is waiting on the queue, then the queue code does not work correctly | ||
//It is undefined which thread the semaphore signal will wake up. | ||
//E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of | ||
//priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued. | ||
//Similar problems occur when the clientPriority is mixed. | ||
if (isProcessingDequeue.exchange(true)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should you sleep a tiny amount and try again instead of throwing ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, I see comment - |
||
throw MakeStringException(0, "Multiple concurrent dequeue not supported"); | ||
|
||
bool hasminprio=(minprio!=INT_MIN); | ||
if (timedout) | ||
*timedout = false; | ||
|
@@ -1200,23 +1242,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
active.append(qd); | ||
} | ||
if (stopped==total) | ||
{ | ||
isProcessingDequeue.store(false); | ||
return NULL; // all stopped | ||
} | ||
sQueueData **activeqds = (sQueueData **)active.getArray(); | ||
unsigned activenum = active.ordinality(); | ||
if (activenum) { | ||
sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds); | ||
sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds); | ||
unsigned count = bestqd?bestqd->root->getPropInt("@count"):0; | ||
// load minp from cache | ||
if (count) { | ||
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; | ||
if (!hasminprio||checkprio(*bestqd,mpr)) { | ||
block.setRollback(false); | ||
ret = dotake(*bestqd,NULL,true,hasminprio,mpr); | ||
if (ret) // think it must be! | ||
timeout = 0; // so mark that done | ||
else if (!hasminprio) { | ||
WARNLOG("Resetting queue %s",bestqd->qname.get()); | ||
clear(*bestqd); // reset queue as seems to have become out of sync | ||
if (count) | ||
{ | ||
if (!hasHigherPriorityClients(bestqd->root, clientPrio, count)) | ||
{ | ||
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; | ||
if (!hasminprio||checkprio(*bestqd,mpr)) { | ||
block.setRollback(false); | ||
ret = dotake(*bestqd,NULL,true,hasminprio,mpr); | ||
if (ret) // think it must be! | ||
timeout = 0; // so mark that done | ||
else if (!hasminprio) { | ||
WARNLOG("Resetting queue %s",bestqd->qname.get()); | ||
clear(*bestqd); // reset queue as seems to have become out of sync | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -1226,15 +1275,15 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
block.setRollback(false); | ||
} | ||
if (!waitingset) { | ||
setWaiting(activenum,activeqds,true); | ||
setWaiting(activenum, activeqds, clientPrio, true); | ||
block.commit(); | ||
waitingset = true; | ||
} | ||
} | ||
} | ||
if (timeout==0) { | ||
if (waitingset) { | ||
setWaiting(activenum,activeqds,false); | ||
setWaiting(activenum, activeqds, clientPrio, false); | ||
block.commit(); | ||
} | ||
if (timedout) | ||
|
@@ -1246,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
// check every 5 mins independant of notify (in case subscription lost for some reason) | ||
if (to>timeout) | ||
to = timeout; | ||
notifysem.wait(to); | ||
notifySubscription->notifysem.wait(to); | ||
if (timeout!=(unsigned)INFINITE) { | ||
t = msTick()-t; | ||
if (t<timeout) | ||
|
@@ -1255,15 +1304,36 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
timeout = 0; | ||
} | ||
} | ||
|
||
isProcessingDequeue.store(false); | ||
return ret; | ||
} | ||
|
||
IJobQueueItem *dequeue(unsigned timeout=INFINITE) | ||
{ | ||
return dodequeue(INT_MIN,timeout); | ||
return dodequeue(INT_MIN, 0, timeout, false, nullptr); | ||
} | ||
|
||
|
||
IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay) override | ||
{ | ||
Owned<IJobQueueItem> item; | ||
if (prioritytransitiondelay) | ||
{ | ||
unsigned timeout = prioritytransitiondelay; | ||
bool usePrevPrio = true; | ||
item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr)); | ||
} | ||
if (!item) | ||
item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr)); | ||
return item.getClear(); | ||
} | ||
|
||
IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE) | ||
{ | ||
return dodequeue(INT_MIN, priority, timeout, false, nullptr); | ||
} | ||
|
||
void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem | ||
{ | ||
Owned<IJobQueueItem> qi = qitem; | ||
|
@@ -1614,6 +1684,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
return (state&&(strcmp(state,"stopped")==0)); | ||
} | ||
|
||
void removeClient(sQueueData & qd, IPropertyTree * croot) | ||
{ | ||
qd.root->removeTree(croot); | ||
} | ||
|
||
void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued) | ||
{ | ||
Cconnlockblock block(this,false); | ||
|
@@ -1626,7 +1701,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
break; | ||
if (validateitemsessions && !validSession(croot)) { | ||
Cconnlockblock block(this,true); | ||
qd.root->removeTree(croot); | ||
removeClient(qd, croot); | ||
} | ||
else { | ||
waiting += croot->getPropInt("@waiting"); | ||
|
@@ -1758,7 +1833,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
int minprio = 0; | ||
unsigned timeout = prioritytransitiondelay; | ||
bool usePrevPrio = true; | ||
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); | ||
item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout)); | ||
} | ||
else | ||
item.setown(dequeue(INFINITE)); | ||
|
@@ -1801,7 +1876,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
{ | ||
CriticalBlock block(crit); | ||
dequeuestop = true; | ||
notifysem.signal(); | ||
notifySubscription->notifysem.signal(); | ||
} | ||
|
||
bool cancelInitiateConversation(sQueueData &qd,const char *wuid) | ||
|
@@ -1838,7 +1913,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
if (haschanged()) | ||
return true; | ||
} | ||
if (!notifysem.wait(timeout)) | ||
if (!notifySubscription->notifysem.wait(timeout)) | ||
break; | ||
} | ||
return false; | ||
|
@@ -1847,7 +1922,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue | |
{ | ||
CriticalBlock block(crit); | ||
cancelwaiting = true; | ||
notifysem.signal(); | ||
notifySubscription->notifysem.signal(); | ||
} | ||
|
||
virtual void enqueue(IJobQueueItem *qitem) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: = false