Skip to content

Commit

Permalink
HPCC-32931 Capture and report execute timings for splitters
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Nov 4, 2024
1 parent 4c31ec4 commit f6730cb
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
bool inputConnected = false;
unsigned numOutputs = 0;
ThorDataLinkMetaInfo cachedMetaInfo;
std::unique_ptr<BlockedActivityTimer> blockedActivityTimer;

// NB: CWriter only used by 'balanced' splitter, which blocks write when too far ahead
class CWriter : public CSimpleInterface, IThreaded
Expand Down Expand Up @@ -217,7 +218,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
assertex(((unsigned)-1) != connectedOutputCount);
activeOutputCount = connectedOutputCount;

PARENT::start();
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
PARENT::start();
}
initMetaInfo(cachedMetaInfo);
cachedMetaInfo.suppressLookAhead = spill; // only suppress downstream lookaheads if this is a spilling splitter

Expand Down Expand Up @@ -299,6 +303,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
inline const void *nextRow(unsigned outIdx, rowcount_t current)
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading
return inputStream->nextRow();
if (recsReady == current && writeAheadException.get())
Expand Down Expand Up @@ -412,11 +417,13 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
virtual void paged() { pagedOut = true; }
virtual void blocked()
{
blockedActivityTimer.reset(new BlockedActivityTimer(slaveTimerStats, queryTimeActivities()));
writeBlocked = true; // Prevent other users getting beyond checking recsReady in writeahead()
writeAheadCrit.leave();
}
virtual void unblocked()
{
blockedActivityTimer.reset(nullptr);
writeAheadCrit.enter();
writeBlocked = false;
if (stalledWriters.ordinality())
Expand Down

0 comments on commit f6730cb

Please sign in to comment.