Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32946 Capture and report lookahead timings for hash distributor #19272

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,14 @@ class CDiskGroupAggregateSlave
merging = false;
appendOutputLinked(this);
}

// CSlaveActivity overloaded methods
virtual unsigned __int64 queryLookAheadCycles() const override
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
18 changes: 17 additions & 1 deletion thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
if (distributor)
distributor->abort();
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
if (distributor)
return distributor->queryLookAheadCycles();
return 0;
}
// IStopInput
virtual void stopInput()
{
Expand Down Expand Up @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
{
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
CriticalBlock b(fetchStreamCS);
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (fetchStream)
lookAheadCycles += fetchStream->queryLookAheadCycles();
return lookAheadCycles;
}

// IThorDataLink impl.
virtual void start() override
{
Expand Down Expand Up @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
OwnedRoxieString fileName = fetchBaseHelper->getFileName();
{
CriticalBlock b(fetchStreamCS);
if (fetchStream)
slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles();
fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp));
}
fetchStreamOut = fetchStream->queryOutput();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/fetch/thfetchslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface
virtual void abort() = 0;
virtual void getStats(CRuntimeStatisticCollection & stats) const = 0;
virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
};

IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
Expand Down
52 changes: 43 additions & 9 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
RelaxedAtomic<stat_type> numLocalRows {0};
RelaxedAtomic<stat_type> numRemoteRows {0};
RelaxedAtomic<size_t> sizeRemoteWrite {0};
RelaxedAtomic<cycle_t> lookAheadCycles {0};

void init()
{
Expand Down Expand Up @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
}
if (aborted)
break;
const void *row = input->ungroupedNextRow();
const void *row;
if (owner.activity->queryTimeActivities())
{
CCycleTimer rowTimer;
row = input->ungroupedNextRow();
lookAheadCycles.fastAdd(rowTimer.elapsedCycles());
}
else
{
row = input->ungroupedNextRow();
}
if (!row)
break;

CTarget *target = nullptr;
if (owner.isAll)
target = targets.item(0);
Expand Down Expand Up @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
stats.setStatistic(StNumRemoteRows, numRemoteRows.load());
stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load());
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return lookAheadCycles.load();
}
// IThreadFactory impl.
virtual IPooledThread *createNew()
{
Expand Down Expand Up @@ -1257,6 +1271,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = NULL;
iCompare = NULL;
}
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return sender.queryLookAheadCycles();
}
virtual void abort()
{
if (!aborted)
Expand Down Expand Up @@ -1451,13 +1476,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void stopRecv() = 0;
virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;

virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
{
Expand Down Expand Up @@ -4103,6 +4121,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
};
#ifdef _MSC_VER
#pragma warning(pop)
Expand Down Expand Up @@ -4584,6 +4611,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
info.canStall = true;
// maybe more?
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorRowAggregator impl
virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/hashdistrib/thhashdistribslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface
virtual void join()=0;
virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
virtual void abort()=0;
};

Expand Down
7 changes: 7 additions & 0 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
merging = false;
appendOutputLinked(this);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
9 changes: 9 additions & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
PARENT::start();
dbgassertex(isSmart() || (leftITDL->isGrouped() == grouped)); // std. lookup join expects these to match
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = PARENT::queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
virtual void reset() override
{
PARENT::reset();
Expand Down
Loading