diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index f84e493a2c0..03268d603c0 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -82,6 +82,13 @@ void setMulticastEndpoints(unsigned numChannels); #define ROXIE_BG_PRIORITY 0xc0000000 // mask in activityId indicating it goes on the bg queue #define ROXIE_PRIORITY_MASK (ROXIE_SLA_PRIORITY | ROXIE_HIGH_PRIORITY | ROXIE_LOW_PRIORITY) +#define QUERY_BG_PRIORITY_VALUE -1 +#define QUERY_LOW_PRIORITY_VALUE 0 +#define QUERY_HIGH_PRIORITY_VALUE 1 +#define QUERY_SLA_PRIORITY_VALUE 2 +static constexpr int queryMinPriorityValue = QUERY_BG_PRIORITY_VALUE; +static constexpr int queryMaxPriorityValue = QUERY_SLA_PRIORITY_VALUE; + #define ROXIE_ACTIVITY_FETCH 0x20000000 // or'ed into activityId for fetch part of full keyed join activities // Status information returned in the activityId field of the header: @@ -305,6 +312,7 @@ extern StringArray allQuerySetNames; extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern unsigned packetAcknowledgeTimeout; +extern cycle_t dynPriorityAdjustCycles; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; extern bool lockSuperFiles; @@ -484,6 +492,8 @@ inline unsigned getBondedChannel(unsigned partNo) return ((partNo - 1) % numChannels) + 1; } +extern unsigned getPriorityMask(int priority); + extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2))); extern unsigned getNextInstanceId(); extern void closedown(); diff --git a/roxie/ccd/ccdcontext.cpp b/roxie/ccd/ccdcontext.cpp index 2d84c5b7d6b..9a5b98853a4 100644 --- a/roxie/ccd/ccdcontext.cpp +++ b/roxie/ccd/ccdcontext.cpp @@ -1530,6 +1530,11 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext return options; } + virtual cycle_t queryElapsedCycles() const + { + return elapsedTimer.elapsedCycles(); + } + const char *queryAuthToken() { return authToken.str(); diff --git a/roxie/ccd/ccdcontext.hpp b/roxie/ccd/ccdcontext.hpp index a9968029a03..e88c0414cf3 100644 --- a/roxie/ccd/ccdcontext.hpp +++ b/roxie/ccd/ccdcontext.hpp @@ -54,6 +54,7 @@ interface IRoxieAgentContext : extends IRoxieContextLogger virtual void noteChildGraph(unsigned id, IActivityGraph *childGraph) = 0; virtual roxiemem::IRowManager &queryRowManager() = 0; virtual const QueryOptions &queryOptions() const = 0; + virtual cycle_t queryElapsedCycles() const = 0; virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) = 0; virtual const char *queryAuthToken() = 0; virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt, bool isPrivilegedUser) = 0; diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 57e5c1f2d0a..a0c014358b6 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1241,10 +1241,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker { switch((int)priority) { - case 0: loQueryStats.noteQuery(failed, elapsedTime); break; - case 1: hiQueryStats.noteQuery(failed, elapsedTime); break; - case 2: slaQueryStats.noteQuery(failed, elapsedTime); break; - case -1: bgQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteQuery(failed, elapsedTime); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteQuery(failed, elapsedTime); break; } combinedQueryStats.noteQuery(failed, elapsedTime); } @@ -1334,7 +1334,7 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker unsigned agentsReplyLen = 0; unsigned agentsDuplicates = 0; unsigned agentsResends = 0; - unsigned priority = (unsigned) -2; + unsigned priority = (unsigned) -2; // NB -2 is outside of priority range try { bool isBlind = wu->getDebugValueBool("blindLogging", false); @@ -1358,10 +1358,10 @@ class RoxieWorkUnitWorker : public RoxieQueryWorker priority = queryFactory->queryOptions().priority; switch ((int)priority) { - case 0: loQueryStats.noteActive(); break; - case 1: hiQueryStats.noteActive(); break; - case 2: slaQueryStats.noteActive(); break; - case -1: bgQueryStats.noteActive(); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break; } combinedQueryStats.noteActive(); Owned ctx = queryFactory->createContext(wu, logctx); @@ -1528,10 +1528,10 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte unsigned priority = getQueryPriority(); switch ((int)priority) { - case 0: loQueryStats.noteActive(); break; - case 1: hiQueryStats.noteActive(); break; - case 2: slaQueryStats.noteActive(); break; - case -1: bgQueryStats.noteActive(); break; + case QUERY_LOW_PRIORITY_VALUE: loQueryStats.noteActive(); break; + case QUERY_HIGH_PRIORITY_VALUE: hiQueryStats.noteActive(); break; + case QUERY_SLA_PRIORITY_VALUE: slaQueryStats.noteActive(); break; + case QUERY_BG_PRIORITY_VALUE: bgQueryStats.noteActive(); break; } unknownQueryStats.noteComplete(); combinedQueryStats.noteActive(); diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 192e58a5670..1f8476cec72 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -75,6 +75,7 @@ unsigned numRequestArrayThreads = 5; bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; +cycle_t dynPriorityAdjustCycles = 0; // default off (0) unsigned headRegionSize; unsigned ccdMulticastPort; bool enableHeartBeat = true; @@ -1007,6 +1008,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); headRegionSize = topology->getPropInt("@headRegionSize", 0); packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout); + unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0); + if (dynAdjustMsec) + dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0)); diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index 67631c107fa..05287d8f803 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -318,7 +318,8 @@ class CSharedOnceContext : public CInterfaceOf QueryOptions::QueryOptions() { - priority = 0; + priority = QUERY_LOW_PRIORITY_VALUE; + dynPriority = QUERY_LOW_PRIORITY_VALUE; timeLimit = defaultTimeLimit[0]; warnTimeLimit = defaultWarnTimeLimit[0]; @@ -358,6 +359,7 @@ QueryOptions::QueryOptions() QueryOptions::QueryOptions(const QueryOptions &other) { priority = other.priority; + dynPriority = other.dynPriority.load(); timeLimit = other.timeLimit; warnTimeLimit = other.warnTimeLimit; @@ -394,13 +396,10 @@ QueryOptions::QueryOptions(const QueryOptions &other) numWorkflowThreads = other.numWorkflowThreads; } -void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo) +void QueryOptions::updateDynPriority(int _priority) { - // calculate priority before others since it affects the defaults of others - updateFromWorkUnit(priority, wu, "priority"); - if (stateInfo) - updateFromContext(priority, stateInfo, "@priority"); - if ((int)priority < 0) + dynPriority = _priority; + if (dynPriority < QUERY_LOW_PRIORITY_VALUE) { // use LOW queue time limits ... timeLimit = defaultTimeLimit[0]; @@ -408,9 +407,20 @@ void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stat } else { - timeLimit = defaultTimeLimit[priority]; - warnTimeLimit = defaultWarnTimeLimit[priority]; + timeLimit = defaultTimeLimit[_priority]; + warnTimeLimit = defaultWarnTimeLimit[_priority]; } +} + +void QueryOptions::setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo) +{ + // calculate priority before others since it affects the defaults of others + updateFromWorkUnit(priority, wu, "priority"); + if (stateInfo) + updateFromContext(priority, stateInfo, "@priority"); + + updateDynPriority((int)priority); + updateFromWorkUnit(timeLimit, wu, "timeLimit"); updateFromWorkUnit(warnTimeLimit, wu, "warnTimeLimit"); updateFromWorkUnitM(memoryLimit, wu, "memoryLimit"); @@ -495,6 +505,20 @@ void QueryOptions::setFromContext(const IPropertyTree *ctx) if (ctx) { // Note: priority cannot be set at context level + // b/c this is after activities have been created, but we could + // dynamically adj priority in the header activityId before sending + int tmpPriority = (int)priority; + updateFromContext(tmpPriority, ctx, "@priority", "_Priority"); + + if (tmpPriority > queryMaxPriorityValue) + tmpPriority = queryMaxPriorityValue; + if (tmpPriority < queryMinPriorityValue) + tmpPriority = queryMinPriorityValue; + + // only adjust lower ... + if (tmpPriority < (int)priority) + updateDynPriority(tmpPriority); + updateFromContext(timeLimit, ctx, "@timeLimit", "_TimeLimit"); updateFromContext(warnTimeLimit, ctx, "@warnTimeLimit", "_WarnTimeLimit"); updateFromContextM(memoryLimit, ctx, "@memoryLimit", "_MemoryLimit"); @@ -624,15 +648,9 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub if (isSuspended) return createRoxieServerDummyActivityFactory(id, subgraphId, *this, NULL, TAKnone, node, false); // Is there actually any point? - switch (options.priority) - { - case 1: - rid |= ROXIE_HIGH_PRIORITY; - break; - case 2: - rid |= ROXIE_SLA_PRIORITY; - break; - } + + rid |= getPriorityMask(options.priority); + StringBuffer helperName; helperName.append("fAc").append(id); HelperFactory *helperFactory = dll->getFactory(helperName); diff --git a/roxie/ccd/ccdquery.hpp b/roxie/ccd/ccdquery.hpp index f4f45d12fd4..0c2f5d90968 100644 --- a/roxie/ccd/ccdquery.hpp +++ b/roxie/ccd/ccdquery.hpp @@ -79,9 +79,10 @@ class QueryOptions void setFromWorkUnit(IConstWorkUnit &wu, const IPropertyTree *stateInfo); void setFromContext(const IPropertyTree *ctx); void setFromAgentLoggingFlags(unsigned loggingFlags); - + void updateDynPriority(int _priority); unsigned priority; + mutable std::atomic dynPriority; unsigned timeLimit; unsigned warnTimeLimit; unsigned traceLimit; diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index e99399e6f8d..503e57c2d99 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -316,6 +316,10 @@ class IndirectAgentContext : implements IRoxieAgentContext, public CInterface { return ctx->queryOptions(); } + virtual cycle_t queryElapsedCycles() const override + { + return ctx->queryElapsedCycles(); + } virtual void addAgentsReplyLen(unsigned len, unsigned duplicates, unsigned resends) override { ctx->addAgentsReplyLen(len, duplicates, resends); @@ -3625,6 +3629,24 @@ void throwRemoteException(IMessageUnpackCursor *extra) throwUnexpected(); } +unsigned getPriorityMask(int priority) +{ + unsigned newPri = ROXIE_BG_PRIORITY; + switch (priority) + { + case QUERY_SLA_PRIORITY_VALUE: + newPri = ROXIE_SLA_PRIORITY; + break; + case QUERY_HIGH_PRIORITY_VALUE: + newPri = ROXIE_HIGH_PRIORITY; + break; + case QUERY_LOW_PRIORITY_VALUE: + newPri = ROXIE_LOW_PRIORITY; + break; + } + return newPri; +} + class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxieInput, implements IExceptionHandler, public CInterface { friend class CRemoteResultMerger; @@ -4557,6 +4579,27 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie // But this could still cause too many reply packets on the fastlane // (higher priority output Q), which may cause the activities on the // low priority output Q to not get service on time. + + int origPriority = (int)ctx->queryOptions().priority; + int dynPriority = ctx->queryOptions().dynPriority; + if (dynPriority < origPriority) + { + unsigned newPri = getPriorityMask(dynPriority); + p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; + p->queryHeader().activityId |= newPri; + } + + // TODO: perhaps check elapsed every Nth msg ? + if ( (dynPriorityAdjustCycles > 0) && (origPriority == QUERY_LOW_PRIORITY_VALUE) && (dynPriority == QUERY_LOW_PRIORITY_VALUE) && + (ctx->queryElapsedCycles() > dynPriorityAdjustCycles) ) + { + ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE; + unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles(); + UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec); + p->queryHeader().activityId |= ROXIE_BG_PRIORITY; + // TODO: what to do about still running activities' continuation/ack priorities ? + } + unsigned pmask = p->queryHeader().activityId & ROXIE_PRIORITY_MASK; if ((colocalArg == 0) && // not a child query activity?? ( (pmask == ROXIE_SLA_PRIORITY) || (pmask == ROXIE_HIGH_PRIORITY) ) && @@ -5014,7 +5057,18 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie mu.clear(); SimpleActivityTimer t(unpackerWaitCycles, timeActivities); unsigned ctxTraceLevel = activity.queryLogCtx().queryTraceLevel(); - unsigned timeout = remoteId.isSLAPriority() ? slaTimeout : (remoteId.isHighPriority() ? highTimeout : lowTimeout); + + unsigned timeout = lowTimeout; + switch (activity.queryContext()->queryOptions().dynPriority) + { + case QUERY_SLA_PRIORITY_VALUE: + timeout = slaTimeout; + break; + case QUERY_HIGH_PRIORITY_VALUE: + timeout = highTimeout; + break; + } + unsigned checkInterval = activity.queryContext()->checkInterval(); if (checkInterval > timeout) checkInterval = timeout;