Skip to content

Commit

Permalink
HPCC-XXXXX Remove headRegionSize option and associated code
Browse files Browse the repository at this point in the history
This was not used (I hope!) and was desiged to solve an issue that was better
managed by the IBYTI delay logic.

Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Oct 2, 2024
1 parent 1d20f33 commit 716badc
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 48 deletions.
1 change: 0 additions & 1 deletion roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ extern unsigned callbackTimeout;
extern unsigned lowTimeout;
extern unsigned highTimeout;
extern unsigned slaTimeout;
extern unsigned headRegionSize;
extern unsigned ccdMulticastPort;
extern IPropertyTree *topology;
extern MapStringTo<int> *preferredClusters;
Expand Down
3 changes: 0 additions & 3 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -1000,7 +999,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize);
blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
Expand Down Expand Up @@ -1450,7 +1448,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs());

ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit);
ROQ->setHeadRegionSize(headRegionSize);
ROQ->start();
Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
#if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
Expand Down
44 changes: 3 additions & 41 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory
Semaphore available;
CriticalSection availCrit; // Semaphore post may be slow with a lot of waiters - this crit may be used to limit to a single waiter
CriticalSection qcrit;
unsigned headRegionSize;
unsigned numWorkers;
RelaxedAtomic<unsigned> started;
std::atomic<unsigned> idle;
Expand All @@ -1173,9 +1172,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _numWorkers)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
started = 0;
Expand Down Expand Up @@ -1330,31 +1328,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory
ISerializedRoxieQueryPacket *dequeue()
{
CriticalBlock qc(qcrit);
unsigned lim = waiting.ordinality();
if (lim)
{
if (headRegionSize)
{
if (lim > headRegionSize)
lim = headRegionSize;
return waiting.dequeue(fastRand() % lim);
}
return waiting.dequeue();
}
else
return NULL;
}

unsigned getHeadRegionSize() const
{
return headRegionSize;
}

unsigned setHeadRegionSize(unsigned newsize)
{
unsigned ret = headRegionSize;
headRegionSize = newsize;
return ret;
return waiting.dequeue();
}

void noteOrphanIBYTI(const RoxiePacketHeader &hdr)
Expand Down Expand Up @@ -1901,20 +1875,8 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
{
}

virtual unsigned getHeadRegionSize() const
{
return loQueue.getHeadRegionSize();
}

virtual void setHeadRegionSize(unsigned newSize)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers), hiQueue(_numWorkers), loQueue(_numWorkers), numWorkers(_numWorkers)
{
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
}

virtual void start()
Expand Down
3 changes: 0 additions & 3 deletions roxie/udplib/udplib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ interface IRoxieOutputQueueManager : public IInterface
virtual bool replyPending(RoxiePacketHeader &x) = 0;
virtual bool abortCompleted(RoxiePacketHeader &x) = 0;

virtual unsigned getHeadRegionSize() const = 0;
virtual void setHeadRegionSize(unsigned newsize) = 0;

virtual void start() = 0;
virtual void stop() = 0;
virtual void join() = 0;
Expand Down

0 comments on commit 716badc

Please sign in to comment.