diff --git a/thorlcr/activities/diskread/thdiskreadslave.cpp b/thorlcr/activities/diskread/thdiskreadslave.cpp index 80fa876bcd9..1268ccfb0bd 100644 --- a/thorlcr/activities/diskread/thdiskreadslave.cpp +++ b/thorlcr/activities/diskread/thdiskreadslave.cpp @@ -1080,7 +1080,14 @@ class CDiskGroupAggregateSlave merging = false; appendOutputLinked(this); } - +// CSlaveActivity overloaded methods + virtual unsigned __int64 queryLookAheadCycles() const override + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/fetch/thfetchslave.cpp b/thorlcr/activities/fetch/thfetchslave.cpp index 96a6d75bfb6..487ed89619e 100644 --- a/thorlcr/activities/fetch/thfetchslave.cpp +++ b/thorlcr/activities/fetch/thfetchslave.cpp @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch if (distributor) distributor->abort(); } - + virtual unsigned __int64 queryLookAheadCycles() const override + { + if (distributor) + return distributor->queryLookAheadCycles(); + return 0; + } // IStopInput virtual void stopInput() { @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler { } + virtual unsigned __int64 queryLookAheadCycles() const override + { + CriticalBlock b(fetchStreamCS); + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (fetchStream) + lookAheadCycles += fetchStream->queryLookAheadCycles(); + return lookAheadCycles; + } + // IThorDataLink impl. virtual void start() override { @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler OwnedRoxieString fileName = fetchBaseHelper->getFileName(); { CriticalBlock b(fetchStreamCS); + if (fetchStream) + slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles(); fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp)); } fetchStreamOut = fetchStream->queryOutput(); diff --git a/thorlcr/activities/fetch/thfetchslave.ipp b/thorlcr/activities/fetch/thfetchslave.ipp index dd391e6f49b..0cc3a17e9ed 100644 --- a/thorlcr/activities/fetch/thfetchslave.ipp +++ b/thorlcr/activities/fetch/thfetchslave.ipp @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface virtual void abort() = 0; virtual void getStats(CRuntimeStatisticCollection & stats) const = 0; virtual void getFileStats(std::vector> & fileStats, unsigned fileTableStart) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; }; IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b2e5f034417..e35982a4188 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl RelaxedAtomic numLocalRows {0}; RelaxedAtomic numRemoteRows {0}; RelaxedAtomic sizeRemoteWrite {0}; + RelaxedAtomic lookAheadCycles {0}; void init() { @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl } if (aborted) break; - const void *row = input->ungroupedNextRow(); + const void *row; + if (owner.activity->queryTimeActivities()) + { + CCycleTimer rowTimer; + row = input->ungroupedNextRow(); + lookAheadCycles.fastAdd(rowTimer.elapsedCycles()); + } + else + { + row = input->ungroupedNextRow(); + } if (!row) break; - CTarget *target = nullptr; if (owner.isAll) target = targets.item(0); @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl stats.setStatistic(StNumRemoteRows, numRemoteRows.load()); stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load()); } + virtual unsigned __int64 queryLookAheadCycles() const + { + return lookAheadCycles.load(); + } // IThreadFactory impl. virtual IPooledThread *createNew() { @@ -1167,6 +1181,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl return ssz.size(); } + // IHashDistributor virtual void setBufferSizes(unsigned _inputBufferSize, unsigned _bucketSendSize, unsigned _pullBufferSize) { if (_inputBufferSize) inputBufferSize = _inputBufferSize; @@ -1257,6 +1272,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ihash = NULL; iCompare = NULL; } + virtual void mergeStats(CRuntimeStatisticCollection &stats) const + { + sender.mergeStats(stats); + CriticalBlock block(critPiperd); + if (piperd) + mergeRemappedStats(stats, piperd, diskToTempStatsMap); + } + virtual unsigned __int64 queryLookAheadCycles() const + { + return sender.queryLookAheadCycles(); + } virtual void abort() { if (!aborted) @@ -1451,13 +1477,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl virtual void stopRecv() = 0; virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0; - virtual void mergeStats(CRuntimeStatisticCollection &stats) const - { - sender.mergeStats(stats); - CriticalBlock block(critPiperd); - if (piperd) - mergeRemappedStats(stats, piperd, diskToTempStatsMap); - } // IExceptionHandler impl. virtual bool fireException(IException *e) { @@ -4103,6 +4122,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress()); } } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } }; #ifdef _MSC_VER #pragma warning(pop) @@ -4584,6 +4612,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato info.canStall = true; // maybe more? } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorRowAggregator impl virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); } virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp index 862ec32bdd9..fed9d564f5b 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface virtual void join()=0; virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0; virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; virtual void abort()=0; }; diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index c73d933337f..e24cb47d302 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements merging = false; appendOutputLinked(this); } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 2e9f426a97a..febde309d58 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBaseisGrouped() == grouped)); // std. lookup join expects these to match } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } virtual void reset() override { PARENT::reset();