diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 532f351ca2e..c9f8bfa4e54 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -295,7 +295,6 @@ extern unsigned callbackTimeout; extern unsigned lowTimeout; extern unsigned highTimeout; extern unsigned slaTimeout; -extern unsigned headRegionSize; extern unsigned ccdMulticastPort; extern IPropertyTree *topology; extern MapStringTo *preferredClusters; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 3eca077f7d9..d0af342b917 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -75,7 +75,6 @@ unsigned numRequestArrayThreads = 5; bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; -unsigned headRegionSize; unsigned ccdMulticastPort; bool enableHeartBeat = true; unsigned parallelLoopFlowLimit = 100; @@ -1000,7 +999,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize); blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent); acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); - headRegionSize = topology->getPropInt("@headRegionSize", 0); packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); @@ -1450,7 +1448,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs()); ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit); - ROQ->setHeadRegionSize(headRegionSize); ROQ->start(); Owned packetDiscarder = createPacketDiscarder(); #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL) diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index faef7a055db..b1036df9cd3 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1151,7 +1151,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory Semaphore available; CriticalSection availCrit; // Semaphore post may be slow with a lot of waiters - this crit may be used to limit to a single waiter CriticalSection qcrit; - unsigned headRegionSize; unsigned numWorkers; RelaxedAtomic started; std::atomic idle; @@ -1173,9 +1172,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory public: IMPLEMENT_IINTERFACE; - RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers) + RoxieQueue(unsigned _numWorkers) { - headRegionSize = _headRegionSize; numWorkers = _numWorkers; workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers)); started = 0; @@ -1330,31 +1328,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory ISerializedRoxieQueryPacket *dequeue() { CriticalBlock qc(qcrit); - unsigned lim = waiting.ordinality(); - if (lim) - { - if (headRegionSize) - { - if (lim > headRegionSize) - lim = headRegionSize; - return waiting.dequeue(fastRand() % lim); - } - return waiting.dequeue(); - } - else - return NULL; - } - - unsigned getHeadRegionSize() const - { - return headRegionSize; - } - - unsigned setHeadRegionSize(unsigned newsize) - { - unsigned ret = headRegionSize; - headRegionSize = newsize; - return ret; + return waiting.dequeue(); } void noteOrphanIBYTI(const RoxiePacketHeader &hdr) @@ -1901,20 +1875,8 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers) - { - } - - virtual unsigned getHeadRegionSize() const - { - return loQueue.getHeadRegionSize(); - } - - virtual void setHeadRegionSize(unsigned newSize) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers), hiQueue(_numWorkers), loQueue(_numWorkers), numWorkers(_numWorkers) { - slaQueue.setHeadRegionSize(newSize); - hiQueue.setHeadRegionSize(newSize); - loQueue.setHeadRegionSize(newSize); } virtual void start() diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index ebd674376df..56d75c36d35 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -177,9 +177,6 @@ interface IRoxieOutputQueueManager : public IInterface virtual bool replyPending(RoxiePacketHeader &x) = 0; virtual bool abortCompleted(RoxiePacketHeader &x) = 0; - virtual unsigned getHeadRegionSize() const = 0; - virtual void setHeadRegionSize(unsigned newsize) = 0; - virtual void start() = 0; virtual void stop() = 0; virtual void join() = 0;