diff --git a/common/thorhelper/thorcommon.cpp b/common/thorhelper/thorcommon.cpp index 4be88493223..86f91ce0d43 100644 --- a/common/thorhelper/thorcommon.cpp +++ b/common/thorhelper/thorcommon.cpp @@ -1801,8 +1801,6 @@ void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const if (blockedCycles) builder.addStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles)); } - if (lookAheadCycles) - builder.addStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); } void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged) const @@ -1816,8 +1814,6 @@ void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged if (blockedCycles) merged.mergeStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles)); } - if (lookAheadCycles) - merged.mergeStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); } void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other) diff --git a/thorlcr/activities/diskread/thdiskreadslave.cpp b/thorlcr/activities/diskread/thdiskreadslave.cpp index 6c28ad1db05..abbabc586aa 100644 --- a/thorlcr/activities/diskread/thdiskreadslave.cpp +++ b/thorlcr/activities/diskread/thdiskreadslave.cpp @@ -1084,7 +1084,14 @@ class CDiskGroupAggregateSlave merging = false; appendOutputLinked(this); } - +// CSlaveActivity overloaded methods + virtual unsigned __int64 queryLookAheadCycles() const override + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/fetch/thfetchslave.cpp b/thorlcr/activities/fetch/thfetchslave.cpp index 96a6d75bfb6..487ed89619e 100644 --- a/thorlcr/activities/fetch/thfetchslave.cpp +++ b/thorlcr/activities/fetch/thfetchslave.cpp @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch if (distributor) distributor->abort(); } - + virtual unsigned __int64 queryLookAheadCycles() const override + { + if (distributor) + return distributor->queryLookAheadCycles(); + return 0; + } // IStopInput virtual void stopInput() { @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler { } + virtual unsigned __int64 queryLookAheadCycles() const override + { + CriticalBlock b(fetchStreamCS); + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (fetchStream) + lookAheadCycles += fetchStream->queryLookAheadCycles(); + return lookAheadCycles; + } + // IThorDataLink impl. virtual void start() override { @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler OwnedRoxieString fileName = fetchBaseHelper->getFileName(); { CriticalBlock b(fetchStreamCS); + if (fetchStream) + slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles(); fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp)); } fetchStreamOut = fetchStream->queryOutput(); diff --git a/thorlcr/activities/fetch/thfetchslave.ipp b/thorlcr/activities/fetch/thfetchslave.ipp index dd391e6f49b..0cc3a17e9ed 100644 --- a/thorlcr/activities/fetch/thfetchslave.ipp +++ b/thorlcr/activities/fetch/thfetchslave.ipp @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface virtual void abort() = 0; virtual void getStats(CRuntimeStatisticCollection & stats) const = 0; virtual void getFileStats(std::vector> & fileStats, unsigned fileTableStart) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; }; IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b2e5f034417..4fce9175a60 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl RelaxedAtomic numLocalRows {0}; RelaxedAtomic numRemoteRows {0}; RelaxedAtomic sizeRemoteWrite {0}; + RelaxedAtomic lookAheadCycles {0}; void init() { @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl } if (aborted) break; - const void *row = input->ungroupedNextRow(); + const void *row; + if (owner.activity->queryTimeActivities()) + { + CCycleTimer rowTimer; + row = input->ungroupedNextRow(); + lookAheadCycles.fastAdd(rowTimer.elapsedCycles()); + } + else + { + row = input->ungroupedNextRow(); + } if (!row) break; - CTarget *target = nullptr; if (owner.isAll) target = targets.item(0); @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl stats.setStatistic(StNumRemoteRows, numRemoteRows.load()); stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load()); } + virtual unsigned __int64 queryLookAheadCycles() const + { + return lookAheadCycles.load(); + } // IThreadFactory impl. virtual IPooledThread *createNew() { @@ -1257,6 +1271,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ihash = NULL; iCompare = NULL; } + virtual void mergeStats(CRuntimeStatisticCollection &stats) const + { + sender.mergeStats(stats); + CriticalBlock block(critPiperd); + if (piperd) + mergeRemappedStats(stats, piperd, diskToTempStatsMap); + } + virtual unsigned __int64 queryLookAheadCycles() const + { + return sender.queryLookAheadCycles(); + } virtual void abort() { if (!aborted) @@ -1451,13 +1476,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl virtual void stopRecv() = 0; virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0; - virtual void mergeStats(CRuntimeStatisticCollection &stats) const - { - sender.mergeStats(stats); - CriticalBlock block(critPiperd); - if (piperd) - mergeRemappedStats(stats, piperd, diskToTempStatsMap); - } // IExceptionHandler impl. virtual bool fireException(IException *e) { @@ -4103,6 +4121,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress()); } } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } }; #ifdef _MSC_VER #pragma warning(pop) @@ -4584,6 +4611,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato info.canStall = true; // maybe more? } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorRowAggregator impl virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); } virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp index 862ec32bdd9..fed9d564f5b 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface virtual void join()=0; virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0; virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; virtual void abort()=0; }; diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index c73d933337f..e24cb47d302 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements merging = false; appendOutputLinked(this); } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 2e9f426a97a..febde309d58 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBaseisGrouped() == grouped)); // std. lookup join expects these to match } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } virtual void reset() override { PARENT::reset(); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index ff11c691ffc..eb2b065fb05 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -554,7 +554,7 @@ MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer return mb.append(queryInitializationData(slave)); } -unsigned __int64 CSlaveActivity::queryLocalCycles() const +unsigned __int64 CSlaveActivity::queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const { unsigned __int64 inputCycles = 0; if (1 == inputs.ordinality()) @@ -587,11 +587,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const break; } } - unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); + unsigned __int64 processCycles = totalCycles + lookAheadCycles; if (processCycles < inputCycles) // not sure how/if possible, but guard against return 0; processCycles -= inputCycles; - const unsigned __int64 blockedCycles = queryBlockedCycles(); if (processCycles < blockedCycles) { ActPrintLog("CSlaveActivity::queryLocalCycles - process %" I64F "uns < blocked %" I64F "uns", cycle_to_nanosec(processCycles), cycle_to_nanosec(blockedCycles)); @@ -600,6 +599,11 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const return processCycles-blockedCycles; } +unsigned __int64 CSlaveActivity::queryLocalCycles() const +{ + return queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), queryLookAheadCycles()); +} + void CSlaveActivity::serializeStats(MemoryBuffer &mb) { CriticalBlock b(crit); // JCSMORE not sure what this is protecting.. @@ -619,7 +623,15 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb) queryCodeContext()->gatherStats(serializedStats); // JCS->GH - should these be serialized as cycles, and a different mapping used on master? - serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles())); + // + // Note: Look ahead cycles are not being kept up to date in slaverStats as multiple objects and threads are updating + // look ahead cycles. At the moment, each thread and objects that generate look ahead cycles, track its own look ahead + // cycles and the up to date lookahead cycles is only available with a call to queryLookAheadCycles(). The code would + // need to be refactored to change this behaviour. + unsigned __int64 lookAheadCycles = queryLookAheadCycles(); + unsigned __int64 localCycles = queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), lookAheadCycles); + serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); + serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(localCycles)); slaveTimerStats.addStatistics(serializedStats); serializedStats.serialize(mb); ForEachItemIn(i, outputs) diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index aca3bf3a385..5476e5f999a 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -269,6 +269,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; bool suppressLookAhead() const; + unsigned __int64 queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const; // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; }