Skip to content

Commit

Permalink
HPCC-32958 Roxie dynamic priority 4
Browse files Browse the repository at this point in the history
Signed-off-by: M Kelly <[email protected]>
  • Loading branch information
mckellyln committed Jan 3, 2025
1 parent cf88c05 commit abef643
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 43 deletions.
9 changes: 8 additions & 1 deletion roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ void setMulticastEndpoints(unsigned numChannels);
#define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue
#define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY)

#define QUERY_BG_PRIORITY_VALUE -1
#define QUERY_LOW_PRIORITY_VALUE 0
#define QUERY_HIGH_PRIORITY_VALUE 1
#define QUERY_SLA_PRIORITY_VALUE 2
const static int queryMinPriorityValue = QUERY_BG_PRIORITY_VALUE;
const static int queryMaxPriorityValue = QUERY_SLA_PRIORITY_VALUE;

#define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities

// Status information returned in the activityId field of the header:
Expand Down Expand Up @@ -485,7 +492,7 @@ inline unsigned getBondedChannel(unsigned partNo)
return ((partNo - 1) % numChannels) + 1;
}

extern unsigned priorityMask(int priority);
extern unsigned getPriorityMask(int priority);

extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
extern unsigned getNextInstanceId();
Expand Down
60 changes: 28 additions & 32 deletions roxie/ccd/ccdquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ class CSharedOnceContext : public CInterfaceOf<ISharedOnceContext>

QueryOptions::QueryOptions()
{
priority = 0;
dynPriority = 0;
priority = QUERY_LOW_PRIORITY_VALUE;
dynPriority = QUERY_LOW_PRIORITY_VALUE;
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];

Expand Down Expand Up @@ -359,7 +359,7 @@ QueryOptions::QueryOptions()
QueryOptions::QueryOptions(const QueryOptions &other)
{
priority = other.priority;
dynPriority = other.dynPriority;
dynPriority = other.dynPriority.load();
timeLimit = other.timeLimit;
warnTimeLimit = other.warnTimeLimit;

Expand Down Expand Up @@ -396,24 +396,31 @@ QueryOptions::QueryOptions(const QueryOptions &other)
numWorkflowThreads = other.numWorkflowThreads;
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
void QueryOptions::updateDynPriority(int _priority)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");
dynPriority = priority;
if ((int)priority < 0)
dynPriority = _priority;
if (dynPriority < QUERY_LOW_PRIORITY_VALUE)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[priority];
warnTimeLimit = defaultWarnTimeLimit[priority];
timeLimit = defaultTimeLimit[_priority];
warnTimeLimit = defaultWarnTimeLimit[_priority];
}
}

void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo)
{
// calculate priority before others since it affects the defaults of others
updateFromWorkUnit(priority, wu, "priority");
if (stateInfo)
updateFromContext(priority, stateInfo, "@priority");

updateDynPriority((int)priority);

updateFromWorkUnit(timeLimit, wu, "timeLimit");
updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit");
updateFromWorkUnitM(memoryLimit, wu, "memoryLimit");
Expand Down Expand Up @@ -500,28 +507,17 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx)
// Note: priority cannot be set at context level
// b/c this is after activities have been created, but we could
// dynamically adj priority in the header activityId before sending
int tmpPriority;
int tmpPriority = (int)priority;
updateFromContext(tmpPriority, ctx, "@priority", "_Priority");
if (tmpPriority > 1)
tmpPriority = 1;
if (tmpPriority < -1)
tmpPriority = -1;

if (tmpPriority > queryMaxPriorityValue)
tmpPriority = queryMaxPriorityValue;
if (tmpPriority < queryMinPriorityValue)
tmpPriority = queryMinPriorityValue;

// only adjust lower ...
if (tmpPriority < (int)priority)
{
dynPriority = tmpPriority;
if (dynPriority < 0)
{
// use LOW queue time limits ...
timeLimit = defaultTimeLimit[0];
warnTimeLimit = defaultWarnTimeLimit[0];
}
else
{
timeLimit = defaultTimeLimit[dynPriority];
warnTimeLimit = defaultWarnTimeLimit[dynPriority];
}
}
updateDynPriority(tmpPriority);

updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit");
updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit");
Expand Down Expand Up @@ -653,7 +649,7 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub
if (isSuspended)
return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point?

rid |= priorityMask(options.priority);
rid |= getPriorityMask(options.priority);

StringBuffer helperName;
helperName.append("fAc").append(id);
Expand Down
3 changes: 2 additions & 1 deletion roxie/ccd/ccdquery.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ class QueryOptions
void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo);
void setFromContext(const IPropertyTree *ctx);
void setFromAgentLoggingFlags(unsigned loggingFlags);
void updateDynPriority(int _priority);

unsigned priority;
mutable int dynPriority;
mutable std::atomic<int> dynPriority;
unsigned timeLimit;
unsigned warnTimeLimit;
unsigned traceLimit;
Expand Down
18 changes: 9 additions & 9 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3629,18 +3629,18 @@ void throwRemoteException(IMessageUnpackCursor *extra)
throwUnexpected();
}

unsigned priorityMask(int priority)
unsigned getPriorityMask(int priority)
{
unsigned newPri = ROXIE_BG_PRIORITY;
switch (priority)
{
case 2:
case QUERY_SLA_PRIORITY_VALUE:
newPri = ROXIE_SLA_PRIORITY;
break;
case 1:
case QUERY_HIGH_PRIORITY_VALUE:
newPri = ROXIE_HIGH_PRIORITY;
break;
case 0:
case QUERY_LOW_PRIORITY_VALUE:
newPri = ROXIE_LOW_PRIORITY;
break;
}
Expand Down Expand Up @@ -4584,16 +4584,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
int dynPriority = ctx->queryOptions().dynPriority;
if (dynPriority < origPriority)
{
unsigned newPri = priorityMask(dynPriority);
unsigned newPri = getPriorityMask(dynPriority);
p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK;
p->queryHeader().activityId |= newPri;
}

// TODO: perhaps check elapsed every Nth msg ?
if ( (dynPriorityAdjustCycles > 0) && (origPriority == 0) && (dynPriority == 0) &&
if ( (dynPriorityAdjustCycles > 0) && (origPriority == QUERY_LOW_PRIORITY_VALUE) && (dynPriority == QUERY_LOW_PRIORITY_VALUE) &&
(ctx->queryElapsedCycles() > dynPriorityAdjustCycles) )
{
ctx->queryOptions().dynPriority = -1;
ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE;
unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles();
UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec);
p->queryHeader().activityId |= ROXIE_BG_PRIORITY;
Expand Down Expand Up @@ -5061,10 +5061,10 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
unsigned timeout = lowTimeout;
switch (activity.queryContext()->queryOptions().dynPriority)
{
case 2:
case QUERY_SLA_PRIORITY_VALUE:
timeout = slaTimeout;
break;
case 1:
case QUERY_HIGH_PRIORITY_VALUE:
timeout = highTimeout;
break;
}
Expand Down

0 comments on commit abef643

Please sign in to comment.