From feed3d78dffb87b522c9b3f809212e072546802e Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 27 Sep 2024 16:33:31 +0100 Subject: [PATCH] HPCC-32480 Capture "look ahead" timings for unordered concat (parallel funnel) Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 7 ++++++- thorlcr/activities/funnel/thfunnelslave.cpp | 12 ++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index c61ed99f218..d5245aaaa9a 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -316,7 +316,7 @@ class SimpleActivityTimer cycle_t startCycles; cycle_t &accumulator; protected: - const bool enabled; + bool enabled; public: inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled) : accumulator(_accumulator), enabled(_enabled) @@ -328,12 +328,17 @@ class SimpleActivityTimer } inline ~SimpleActivityTimer() + { + leave(); + } + inline void leave() { if (likely(enabled)) { cycle_t nowCycles = get_cycles_now(); cycle_t elapsedCycles = nowCycles - startCycles; accumulator += elapsedCycles; + enabled = false; } } }; diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 32962951c52..e2773c3c28d 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -85,6 +85,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) { + LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities()); numRows = 0; for (;numRows < chunkSize; numRows++) { @@ -141,14 +142,15 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface Linked serializer; void push(const void *row) - { + { size32_t rowSize = thorRowMemoryFootprint(serializer, row); bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { - + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { ReleaseThorRow(row); @@ -179,7 +181,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { for (unsigned i=0; i < numRows; i++) @@ -266,7 +270,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface } { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again. if (waiting) { @@ -301,7 +307,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface size32_t sz = thorRowMemoryFootprint(serializer, row.get()); unsigned numToSignal = 0; { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); assertex(totSize>=sz); totSize -= sz; if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE))