Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>

Track start time for project(strands)

Signed-off-by: Shamser Ahmed <[email protected]>

HPCC-32792 Save debug

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Oct 16, 2024
1 parent 09d82ed commit 00a4613
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 14 deletions.
4 changes: 2 additions & 2 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ class ActivityTimer
else
startCycles = 0;
}

~ActivityTimer()
{
if (likely(enabled))
Expand All @@ -313,6 +312,8 @@ class SimpleActivityTimer
{
cycle_t startCycles;
cycle_t &accumulator;
StringBuffer name;
unsigned activityId=0;
protected:
const bool enabled;
public:
Expand All @@ -324,7 +325,6 @@ class SimpleActivityTimer
else
startCycles = 0;
}

inline ~SimpleActivityTimer()
{
if (likely(enabled))
Expand Down
13 changes: 10 additions & 3 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,18 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
unsigned numRows = 0;
try
{
funnel.activity.startInput(inputIndex);
{
LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities());
funnel.activity.startInput(inputIndex);
}
started = true;
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
numRows = 0;
for (;numRows < chunkSize; numRows++)
{
LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities());
const void * row = inputStream->ungroupedNextRow();
if (!row)
break;
Expand Down Expand Up @@ -354,7 +358,6 @@ class FunnelSlaveActivity : public CSlaveActivity
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
if (!grouped && parallel)
{
//NB starts inputs on each thread
Expand All @@ -376,7 +379,11 @@ class FunnelSlaveActivity : public CSlaveActivity

auto startInputNFunc = [&](unsigned i)
{
try { startInput(i); }
try
{
LookAheadTimer s(slaveTimerStats, timeActivities);
startInput(i);
}
catch (CATCHALL)
{
ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i);
Expand Down
3 changes: 3 additions & 0 deletions thorlcr/activities/join/thjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
CThorNarySlaveActivity::start();

ForEachItemIn(i1, expandedInputs)
Expand All @@ -682,6 +683,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
CATCH_NEXTROW()
{
ActivityTimer s(slaveTimerStats, timeActivities);
OwnedConstThorRow ret = processor.nextRow();
if (ret)
{
Expand All @@ -692,6 +694,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
virtual const void *nextRowGE(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
{
ActivityTimer s(slaveTimerStats, timeActivities);
try { return nextRowGENoCatch(seek, numFields, wasCompleteMatch, stepExtra); }
CATCH_NEXTROWX_CATCH;
}
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
virtual void start() override
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
joined = 0;
joinCounter = 0;
candidateCounter = 0;
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/loop/thloopslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,6 @@ class CConditionalActivity : public CSlaveActivity
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
stopUnselectedInputs();
if (queryInput(branch))
{
Expand Down
4 changes: 4 additions & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
void prepareInput()
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
// NB: called by 1st output to start()
CriticalBlock block(prepareInputLock);
if (!inputPrepared)
Expand Down Expand Up @@ -299,7 +300,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
inline const void *nextRow(unsigned outIdx, rowcount_t current)
{
if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
return inputStream->nextRow();
}
if (recsReady == current && writeAheadException.get())
throw LINK(writeAheadException);
return sharedRowStream->queryOutput(outIdx)->nextRow(); // will block until available
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/project/thprojectslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity
~CPrefetcher() { stop(); }
PrefetchInfo *pullRecord()
{
LookAheadTimer t(parent.slaveTimerStats, parent.timeActivities);
OwnedConstThorRow row = parent.inputStream->nextRow();
if (row)
{
Expand Down
6 changes: 6 additions & 0 deletions thorlcr/activities/when/thwhenslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class CWhenSlaveActivity : public CDependencyExecutorSlaveActivity
info.fastThrough = false;
calcMetaInfoSize(info, queryInput(0));
}
// IThorDataLink
virtual void start() override
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
PARENT::start();
}
};

////////////////////
Expand Down
22 changes: 15 additions & 7 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,21 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 localCycles = queryTotalCycles();
if (localCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
localCycles -= inputCycles;
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
if (processCycles < inputCycles) // not sure how/if possible, but guard against
{
throw makeStringExceptionV(-1, "CSlaveActivity::queryLocalCycles a%u - processCycles %" I64F "u < inputCycles %" I64F "u", queryActivityId(), processCycles, inputCycles);
// IWARNLOG("CSlaveActivity::queryLocalCycles a%u - processCycles %" I64F "u < inputCycles %" I64F "u", queryActivityId(), processCycles, inputCycles);
// return 0;
}
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (localCycles < blockedCycles)
if (processCycles < blockedCycles)
{
IWARNLOG("CSlaveActivity::queryLocalCycles a%u - processCycles %" I64F "u < blockedCycles %" I64F "u",queryActivityId(), processCycles, blockedCycles);
return 0;
return localCycles-blockedCycles;
}
return processCycles-blockedCycles;
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
Expand Down Expand Up @@ -726,6 +733,7 @@ void CThorStrandedActivity::strandedStop()
//For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time.
void CThorStrandedActivity::start()
{
SimpleActivityTimer s(startTime, timeActivities);
CSlaveActivity::start();
startJunction(splitter);
onStartStrands();
Expand Down Expand Up @@ -849,7 +857,7 @@ IStrandJunction *CThorStrandedActivity::getOutputStreams(CActivityBase &ctx, uns

unsigned __int64 CThorStrandedActivity::queryTotalCycles() const
{
unsigned __int64 total = 0;;
cycle_t total = startTime;
ForEachItemIn(i, strands)
{
CThorStrandProcessor &strand = strands.item(i);
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
{
mutable MemoryBuffer *data;
mutable CriticalSection crit;

protected:
CThorInputArray inputs;
IPointerArrayOf<IThorDataLink> outputs;
Expand Down Expand Up @@ -473,6 +472,7 @@ class graphslave_decl CThorStrandedActivity : public CSlaveActivity
{
typedef CSlaveActivity PARENT;
protected:
cycle_t startTime = 0;
CThorStrandOptions strandOptions;
IArrayOf<CThorStrandProcessor> strands;
Owned<IStrandBranch> branch;
Expand Down

0 comments on commit 00a4613

Please sign in to comment.