Skip to content

Commit

Permalink
HPCC-32480 Capture "look ahead" timings for unordered concat (paralle…
Browse files Browse the repository at this point in the history
…l funnel)

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Sep 27, 2024
1 parent 1f17a0f commit feed3d7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
7 changes: 6 additions & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class SimpleActivityTimer
cycle_t startCycles;
cycle_t &accumulator;
protected:
const bool enabled;
bool enabled;
public:
inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled)
: accumulator(_accumulator), enabled(_enabled)
Expand All @@ -328,12 +328,17 @@ class SimpleActivityTimer
}

inline ~SimpleActivityTimer()
{
leave();
}
inline void leave()
{
if (likely(enabled))
{
cycle_t nowCycles = get_cycles_now();
cycle_t elapsedCycles = nowCycles - startCycles;
accumulator += elapsedCycles;
enabled = false;
}
}
};
Expand Down
12 changes: 10 additions & 2 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities());
numRows = 0;
for (;numRows < chunkSize; numRows++)
{
Expand Down Expand Up @@ -141,14 +142,15 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
Linked<IOutputRowSerializer> serializer;

void push(const void *row)
{
{
size32_t rowSize = thorRowMemoryFootprint(serializer, row);

bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{

BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
timer.leave();
if (stopped)
{
ReleaseThorRow(row);
Expand Down Expand Up @@ -179,7 +181,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
timer.leave();
if (stopped)
{
for (unsigned i=0; i < numRows; i++)
Expand Down Expand Up @@ -266,7 +270,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}

{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit);
timer.leave();
stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again.
if (waiting)
{
Expand Down Expand Up @@ -301,7 +307,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
size32_t sz = thorRowMemoryFootprint(serializer, row.get());
unsigned numToSignal = 0;
{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit);
timer.leave();
assertex(totSize>=sz);
totSize -= sz;
if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE))
Expand Down

0 comments on commit feed3d7

Please sign in to comment.