From 447c9f86fdd9ad1560db9679df0ab7d98c8964b6 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Wed, 2 Oct 2024 17:09:55 +0100 Subject: [PATCH] HPCC-32760 Provide periodic details of the performance of RoxieSocketQueueManager::thread Signed-off-by: Gavin Halliday --- roxie/ccd/ccdqueue.cpp | 61 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5e41da7fbbf..4b799fd7bf3 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -2422,6 +2422,31 @@ class RoxieSocketQueueManager : public RoxieReceiverBase DelayedPacketQueueManager delayed; #endif + class WorkerUdpTracker : public TimeDivisionTracker<6, false> + { + public: + enum + { + other, + waiting, + allocating, + processing, + pushing, + checkingRunning + }; + + WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds) + { + stateNames[other] = "other"; + stateNames[waiting] = "waiting"; + stateNames[allocating] = "allocating"; + stateNames[processing] = "processing"; + stateNames[pushing] = "pushing"; + stateNames[checkingRunning] = "checking running"; + } + + } timeTracker; + class ReceiverThread : public Thread { RoxieSocketQueueManager &parent; @@ -2441,7 +2466,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } readThread; public: - RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), readThread(*this) + RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this) { maxPacketSize = multicastSocket->get_max_send_size(); if ((maxPacketSize==0)||(maxPacketSize>65535)) @@ -2754,21 +2779,26 @@ class RoxieSocketQueueManager : public RoxieReceiverBase // if found, send an IBYTI and discard retry request bool alreadyRunning = false; - Owned wi = queue.running(); - ForEach(*wi) { - CRoxieWorker &w = (CRoxieWorker &) wi->query(); - if (w.match(header)) + WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::checkingRunning); + + Owned wi = queue.running(); + ForEach(*wi) { - alreadyRunning = true; - ROQ->sendIbyti(header, logctx, mySubchannel); - if (doTrace(traceRoxiePackets, TraceFlags::Max)) + CRoxieWorker &w = (CRoxieWorker &) wi->query(); + if (w.match(header)) { - StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str()); + alreadyRunning = true; + ROQ->sendIbyti(header, logctx, mySubchannel); + if (doTrace(traceRoxiePackets, TraceFlags::Max)) + { + StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str()); + } + break; } - break; } } + if (!alreadyRunning && checkCompleted && ROQ->replyPending(header)) { alreadyRunning = true; @@ -2784,6 +2814,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str()); } + WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing); #ifdef NEW_IBYTI // It's debatable whether we should delay for the primary here - they had one chance already... // But then again, so did we, assuming the timeout is longer than the IBYTIdelay @@ -2802,6 +2833,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } else // first time (not a retry). { + WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing); #ifdef NEW_IBYTI unsigned delay = 0; if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special @@ -2826,6 +2858,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase doIbytiDelay?"YES":"NO", minIbytiDelay, initIbytiDelay); MemoryBuffer mb; + WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other); for (;;) { mb.clear(); @@ -2840,8 +2873,14 @@ class RoxieSocketQueueManager : public RoxieReceiverBase #else unsigned timeout = 5000; #endif + division.switchState(WorkerUdpTracker::allocating); + void * buffer = mb.reserve(maxPacketSize); + + division.switchState(WorkerUdpTracker::waiting); unsigned l; - multicastSocket->readtms(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, timeout); + multicastSocket->readtms(buffer, sizeof(RoxiePacketHeader), maxPacketSize, l, timeout); + division.switchState(WorkerUdpTracker::processing); + mb.setLength(l); RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray(); if (l != header.packetlength)