Skip to content

Commit

Permalink
Merge pull request #19174 from ghalliday/issue32760
Browse files Browse the repository at this point in the history
HPCC-32760 Provide periodic details of the performance of RoxieSocketQueueManager::thread

Reviewed-By: Richard Chapman <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 9, 2024
2 parents d9787ff + 447c9f8 commit 6b285d9
Showing 1 changed file with 50 additions and 11 deletions.
61 changes: 50 additions & 11 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2426,6 +2426,31 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
DelayedPacketQueueManager delayed;
#endif

class WorkerUdpTracker : public TimeDivisionTracker<6, false>
{
public:
enum
{
other,
waiting,
allocating,
processing,
pushing,
checkingRunning
};

WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
stateNames[processing] = "processing";
stateNames[pushing] = "pushing";
stateNames[checkingRunning] = "checking running";
}

} timeTracker;

class ReceiverThread : public Thread
{
RoxieSocketQueueManager &parent;
Expand All @@ -2445,7 +2470,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
} readThread;

public:
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), readThread(*this)
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this)
{
maxPacketSize = multicastSocket->get_max_send_size();
if ((maxPacketSize==0)||(maxPacketSize>65535))
Expand Down Expand Up @@ -2758,21 +2783,26 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
// if found, send an IBYTI and discard retry request

bool alreadyRunning = false;
Owned<IPooledThreadIterator> wi = queue.running();
ForEach(*wi)
{
CRoxieWorker &w = (CRoxieWorker &) wi->query();
if (w.match(header))
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::checkingRunning);

Owned<IPooledThreadIterator> wi = queue.running();
ForEach(*wi)
{
alreadyRunning = true;
ROQ->sendIbyti(header, logctx, mySubchannel);
if (doTrace(traceRoxiePackets, TraceFlags::Max))
CRoxieWorker &w = (CRoxieWorker &) wi->query();
if (w.match(header))
{
StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
alreadyRunning = true;
ROQ->sendIbyti(header, logctx, mySubchannel);
if (doTrace(traceRoxiePackets, TraceFlags::Max))
{
StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
}
break;
}
break;
}
}

if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
{
alreadyRunning = true;
Expand All @@ -2788,6 +2818,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
{
StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
}
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing);
#ifdef NEW_IBYTI
// It's debatable whether we should delay for the primary here - they had one chance already...
// But then again, so did we, assuming the timeout is longer than the IBYTIdelay
Expand All @@ -2806,6 +2837,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
else // first time (not a retry).
{
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing);
#ifdef NEW_IBYTI
unsigned delay = 0;
if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special
Expand All @@ -2830,6 +2862,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay);

MemoryBuffer mb;
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other);
for (;;)
{
mb.clear();
Expand All @@ -2844,8 +2877,14 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
#else
unsigned timeout = 5000;
#endif
division.switchState(WorkerUdpTracker::allocating);
void * buffer = mb.reserve(maxPacketSize);

division.switchState(WorkerUdpTracker::waiting);
unsigned l;
multicastSocket->readtms(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, timeout);
multicastSocket->readtms(buffer, sizeof(RoxiePacketHeader), maxPacketSize, l, timeout);
division.switchState(WorkerUdpTracker::processing);

mb.setLength(l);
RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
if (l != header.packetlength)
Expand Down

0 comments on commit 6b285d9

Please sign in to comment.