Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Nov 13, 2024
1 parent 06a9ffe commit fae3645
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 23 deletions.
12 changes: 12 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ class LookAheadTimer : public SimpleActivityTimer
: SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { }
};

#define MEASURE_CYCLES_ATOMIC(atomicvar, enabled, code_block) \
{ \
auto start = (enabled) ? get_cycles_now() : 0; \
code_block; \
if (enabled) \
atomicvar.add(get_cycles_now()-start); \
}

#else
class ActivityTimer
{
Expand All @@ -371,6 +379,10 @@ struct LookAheadTimer
inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ }
};

#define MEASURE_CYCLES_ATOMIC(atomicvar, enabled, code_block) \
{\
code_block; \
}
#endif

class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/diskread/thdiskreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,14 @@ class CDiskGroupAggregateSlave
merging = false;
appendOutputLinked(this);
}

// CSlaveActivity overloaded methods
virtual unsigned __int64 queryLookAheadCycles() const override
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
22 changes: 19 additions & 3 deletions thorlcr/activities/fetch/thfetchslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
bool abortSoon;
mptag_t tag;
Owned<IRowStream> keyOutStream;
CSlaveActivity &owner;
CActivityBase &owner;
Linked<IThorRowInterfaces> keyRowIf, fetchRowIf;
StringAttr logicalFilename;

Expand Down Expand Up @@ -124,7 +124,7 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CFetchStream(CSlaveActivity &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp)
CFetchStream(CActivityBase &_owner, IThorRowInterfaces *_keyRowIf, IThorRowInterfaces *_fetchRowIf, bool &_abortSoon, const char *_logicalFilename, CPartDescriptorArray &_parts, unsigned _offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *_iFetchHandler, mptag_t _tag, IExpander *_eexp)
: owner(_owner), keyRowIf(_keyRowIf), fetchRowIf(_fetchRowIf), abortSoon(_abortSoon), logicalFilename(_logicalFilename),
iFetchHandler(_iFetchHandler), offsetCount(_offsetCount), tag(_tag), eexp(_eexp)
{
Expand Down Expand Up @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch
if (distributor)
distributor->abort();
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
if (distributor)
return distributor->queryLookAheadCycles();
return 0;
}
// IStopInput
virtual void stopInput()
{
Expand Down Expand Up @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
{
}

virtual unsigned __int64 queryLookAheadCycles() const override
{
CriticalBlock b(fetchStreamCS);
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (fetchStream)
lookAheadCycles += fetchStream->queryLookAheadCycles();
return lookAheadCycles;
}

// IThorDataLink impl.
virtual void start() override
{
Expand Down Expand Up @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler
OwnedRoxieString fileName = fetchBaseHelper->getFileName();
{
CriticalBlock b(fetchStreamCS);
if (fetchStream)
slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles();
fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp));
}
fetchStreamOut = fetchStream->queryOutput();
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/fetch/thfetchslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface
virtual void abort() = 0;
virtual void getStats(CRuntimeStatisticCollection & stats) const = 0;
virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
};

IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL);
Expand Down
62 changes: 44 additions & 18 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
RelaxedAtomic<stat_type> numLocalRows {0};
RelaxedAtomic<stat_type> numRemoteRows {0};
RelaxedAtomic<size_t> sizeRemoteWrite {0};
RelaxedAtomic<cycle_t> lookAheadCycles {0};

void init()
{
Expand Down Expand Up @@ -860,13 +861,13 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
if (aborted)
break;
const void *row;
{
LookAheadTimer t(owner.activity->getActivityTimerAccumulator(), owner.activity->queryTimeActivities());
row = input->ungroupedNextRow();
}
MEASURE_CYCLES_ATOMIC(lookAheadCycles, owner.activity->queryTimeActivities(),
{
row = input->ungroupedNextRow();
}
);
if (!row)
break;

CTarget *target = nullptr;
if (owner.isAll)
target = targets.item(0);
Expand Down Expand Up @@ -951,6 +952,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
stats.setStatistic(StNumRemoteRows, numRemoteRows.load());
stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load());
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return lookAheadCycles.load();
}
// IThreadFactory impl.
virtual IPooledThread *createNew()
{
Expand Down Expand Up @@ -1064,7 +1069,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
::ActPrintLogEx(&activity->queryContainer(), e, thorlog_all, MCexception(e), "%s", msg.str());
}
protected:
CSlaveActivity *activity;
CActivityBase *activity;
size32_t inputBufferSize, pullBufferSize;
unsigned writerPoolSize;
unsigned self;
Expand All @@ -1082,7 +1087,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
public:
IMPLEMENT_IINTERFACE_USING(CInterface);

CDistributorBase(CSlaveActivity *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id)
CDistributorBase(CActivityBase *_activity, bool _doDedup, bool _isall, IStopInput *_istop, const char *_id)
: activity(_activity), recvthread(this), sendthread(this), sender(*this), id(_id)
{
aborted = connected = false;
Expand Down Expand Up @@ -1171,6 +1176,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
return ssz.size();
}

// IHashDistributor
virtual void setBufferSizes(unsigned _inputBufferSize, unsigned _bucketSendSize, unsigned _pullBufferSize)
{
if (_inputBufferSize) inputBufferSize = _inputBufferSize;
Expand Down Expand Up @@ -1261,6 +1267,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
ihash = NULL;
iCompare = NULL;
}
virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
return sender.queryLookAheadCycles();
}
virtual void abort()
{
if (!aborted)
Expand Down Expand Up @@ -1455,13 +1472,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl
virtual void stopRecv() = 0;
virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0;

virtual void mergeStats(CRuntimeStatisticCollection &stats) const
{
sender.mergeStats(stats);
CriticalBlock block(critPiperd);
if (piperd)
mergeRemappedStats(stats, piperd, diskToTempStatsMap);
}
// IExceptionHandler impl.
virtual bool fireException(IException *e)
{
Expand Down Expand Up @@ -1534,7 +1544,7 @@ class CRowDistributor: public CDistributorBase
ICommunicator &comm;
bool stopping;
public:
CRowDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
CRowDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
: CDistributorBase(activity, doDedup, isAll, istop, id), comm(_comm), tag(_tag)
{
stopping = false;
Expand Down Expand Up @@ -1795,7 +1805,7 @@ class CRowPullDistributor: public CDistributorBase
selfdone.reinit();
}
public:
CRowPullDistributor(CSlaveActivity *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
CRowPullDistributor(CActivityBase *activity, ICommunicator &_comm, mptag_t _tag, bool doDedup, IStopInput *istop, const char *id)
: CDistributorBase(activity, doDedup, false, istop, id), comm(_comm), tag(_tag)
{
pull = true;
Expand Down Expand Up @@ -2090,12 +2100,12 @@ class CRowPullDistributor: public CDistributorBase
//==================================================================================================


IHashDistributor *createHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
IHashDistributor *createHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, bool isAll, IStopInput *istop, const char *id)
{
return new CRowDistributor(activity, comm, tag, doDedup, isAll, istop, id);
}

IHashDistributor *createPullHashDistributor(CSlaveActivity *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL)
IHashDistributor *createPullHashDistributor(CActivityBase *activity, ICommunicator &comm, mptag_t tag, bool doDedup, IStopInput *istop, const char *id=NULL)
{
return new CRowPullDistributor(activity, comm, tag, doDedup, istop, id);
}
Expand Down Expand Up @@ -4107,6 +4117,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput
activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress());
}
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
};
#ifdef _MSC_VER
#pragma warning(pop)
Expand Down Expand Up @@ -4588,6 +4607,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato
info.canStall = true;
// maybe more?
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorRowAggregator impl
virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); }
virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); }
Expand Down
3 changes: 2 additions & 1 deletion thorlcr/activities/hashdistrib/thhashdistribslave.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ interface IHashDistributor : extends IInterface
virtual void join()=0;
virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0;
virtual unsigned __int64 queryLookAheadCycles() const = 0;
virtual void abort()=0;
};

interface IStopInput;
IHashDistributor *createHashDistributor(
CSlaveActivity *activity,
CActivityBase *activity,
ICommunicator &comm,
mptag_t tag,
bool dedup,
Expand Down
7 changes: 7 additions & 0 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
merging = false;
appendOutputLinked(this);
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (distributor)
lookAheadCycles += distributor->queryLookAheadCycles();
return lookAheadCycles;
}
// IHThorGroupAggregateCallback
virtual void processRow(const void *next)
{
Expand Down
9 changes: 9 additions & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, IHThorHashJoinAr
PARENT::start();
dbgassertex(isSmart() || (leftITDL->isGrouped() == grouped)); // std. lookup join expects these to match
}
virtual unsigned __int64 queryLookAheadCycles() const
{
cycle_t lookAheadCycles = slaveTimerStats.lookAheadCycles;
if (rhsDistributor)
lookAheadCycles += rhsDistributor->queryLookAheadCycles();
if (lhsDistributor)
lookAheadCycles += lhsDistributor->queryLookAheadCycles();
return lookAheadCycles;
}
virtual void reset() override
{
PARENT::reset();
Expand Down

0 comments on commit fae3645

Please sign in to comment.