Skip to content

Commit

Permalink
HPCC-29817 Changes following review
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Sep 21, 2023
1 parent 7549254 commit a3d679b
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 121 deletions.
24 changes: 6 additions & 18 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,26 +673,17 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext
protected:
ICodeContext * ctx;
};
class CThorBaseContextLogger : public CSimpleInterfaceOf<IContextLogger>
class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
protected:
const LogMsgJobInfo & job;
unsigned traceLevel = 1;
LogTrace logTrace;
mutable CRuntimeStatisticCollection stats;

public:
CThorBaseContextLogger() : stats(jhtreeCacheStatistics)
{
}
virtual void CTXLOG(const char *format, ...) const override __attribute__((format(printf,2,3)))
{
va_list args;
va_start(args, format);
CTXLOGva(MCdebugProgress, unknownJob, NoLogMsgCode, format, args);
va_end(args);
}
CStatsContextLogger(const CRuntimeStatisticCollection &_mapping, const LogMsgJobInfo _job=unknownJob) : job(_job), stats(_mapping) {}

virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
{
VALOG(cat, job, code, format, args);
}
Expand All @@ -708,7 +699,7 @@ class CThorBaseContextLogger : public CSimpleInterfaceOf<IContextLogger>
E->errorMessage(ss.append(": "));
if (format)
ss.append(": ").valist_appendf(format, args);
LOG(MCoperatorProgress, unknownJob, "%s", ss.str());
LOG(MCoperatorProgress, queryJob(), "%s", ss.str());
}
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override
{
Expand Down Expand Up @@ -770,10 +761,7 @@ class CThorBaseContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
previous.updateDelta(to, stats);
}
void reset()
{
stats.reset();
}
virtual const LogMsgJobInfo & queryJob() const override { return job; }
};

extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
Expand Down
4 changes: 0 additions & 4 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -3239,10 +3239,6 @@ protected:
void onLimitExceeded();
};

// improvement: override constructor to store setHttpIdHeaderNames, override CTXLOG & logOperatorExceptionVA to log LogMsgJobInfo
class CHThorContextLogger : public CThorBaseContextLogger
{
};

#define MAKEFACTORY(NAME) \
extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind, EclGraph & _graph) \
Expand Down
24 changes: 12 additions & 12 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thorcommon.hpp"
#include "rtldynfield.hpp"
#include "thorfile.hpp"
#include "jstats.h"

#define MAX_FETCH_LOOKAHEAD 1000

Expand Down Expand Up @@ -268,7 +269,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
unsigned keyIndexCacheIdx = 0;

unsigned postFiltered;
CHThorContextLogger contextLogger;
CStatsContextLogger contextLogger;
bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
bool localSortKey = false;
bool initializedFileInfo = false;
Expand All @@ -290,7 +291,7 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
};

CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
nextPartNumber = 0;

Expand Down Expand Up @@ -3020,11 +3021,11 @@ class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements
Owned<IKeyManager> manager;
IAgentContext &agent;
DistributedKeyLookupHandler * tlk;
CHThorContextLogger &contextLogger;
IContextLogger &contextLogger;
public:
IMPLEMENT_IINTERFACE;

KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger);
KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger);

~KeyedLookupPartHandler()
{
Expand Down Expand Up @@ -3074,7 +3075,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
Owned<IThreadPool> threadPool;
IntArray subSizes;
IAgentContext &agent;
CHThorContextLogger &contextLogger;
IContextLogger &contextLogger;

void addFile(IDistributedFile &f)
{
Expand All @@ -3095,7 +3096,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
public:
IMPLEMENT_IINTERFACE;

DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger)
{
threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
Expand Down Expand Up @@ -3190,7 +3191,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; }
};

KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk), contextLogger(_contextLogger)
{
}
Expand All @@ -3215,13 +3216,13 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
IJoinProcessor &owner;
IAgentContext &agent;
bool opened;
CHThorContextLogger &contextLogger;
IContextLogger &contextLogger;

public:
IMPLEMENT_IINTERFACE;


MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, CHThorContextLogger & _contextLogger)
MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: file(f), owner(_owner), agent(_agent), opened(false), contextLogger(_contextLogger)
{
super = f->querySuperFile();
Expand Down Expand Up @@ -3401,11 +3402,11 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
Owned<const IDynamicTransform> translator;
RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
bool isCodeSigned = false;
CHThorContextLogger contextLogger;
CStatsContextLogger contextLogger;

public:
CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
prefiltered = 0;
postfiltered = 0;
Expand Down Expand Up @@ -3985,7 +3986,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
manager->finishSegmentMonitors();
manager->reset();
manager->resetCounts();
contextLogger.reset();
}

virtual void doneManager(IKeyManager * manager)
Expand Down
8 changes: 6 additions & 2 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ class InlineXmlDataReader : public WorkUnitRowReaderBase

//---------------------------------------------------------------------------------------

static const StatisticsMapping roxieGraphStatistics({});
static const StatisticsMapping graphStatistics({});
class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext, implements roxiemem::ITimeLimiter, implements IRowAllocatorMetaActIdCacheCallback, public CInterface
{
protected:
Expand Down Expand Up @@ -1265,7 +1265,7 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
public:
IMPLEMENT_IINTERFACE;
CRoxieContextBase(const IQueryFactory *_factory, const IRoxieContextLogger &_logctx)
: factory(_factory), options(factory->queryOptions()), logctx(_logctx), globalStats(roxieGraphStatistics)
: factory(_factory), options(factory->queryOptions()), logctx(_logctx), globalStats(graphStatistics)
{
startTime = lastWuAbortCheck = msTick();
persists = NULL;
Expand Down Expand Up @@ -1331,6 +1331,10 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
{
globalStats.recordStatistics(progress, false);
}
virtual void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous)
{
globalStats.updateDelta(to, globalStats);
}
virtual void CTXLOGa(TracingCategory category, const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *prefix, const char *text) const override
{
logctx.CTXLOGa(category, cat, job, code, prefix, text);
Expand Down
1 change: 0 additions & 1 deletion system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,6 @@ bool CKeyCursor::lookupSkip(const void *seek, size32_t seekOffset, size32_t seek
else
stats.noteSkips(0, 1);
bool ret = lookup(true, stats);

#ifdef _DEBUG
if (doTrace(traceSmartStepping, TraceFlags::Max))
{
Expand Down
4 changes: 2 additions & 2 deletions system/jlib/jlog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2776,7 +2776,7 @@ void IContextLogger::CTXLOG(const char *format, ...) const
{
va_list args;
va_start(args, format);
CTXLOGva(MCdebugInfo, unknownJob, NoLogMsgCode, format, args);
CTXLOGva(MCdebugInfo, queryJob(), NoLogMsgCode, format, args);
va_end(args);
}

Expand Down Expand Up @@ -2848,7 +2848,7 @@ class DummyLogCtx : implements IContextLogger
E->errorMessage(ss.append(": "));
if (format)
ss.append(": ").valist_appendf(format, args);
LOG(MCoperatorProgress, unknownJob, "%s", ss.str());
LOG(MCoperatorProgress, queryJob(), "%s", ss.str());
}
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const
{
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jlog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,7 @@ interface jlib_decl IContextLogger : extends IInterface
virtual const char *queryCallerId() const = 0;
virtual const CRuntimeStatisticCollection & queryStats() const = 0;
virtual void recordStatistics(IStatisticGatherer &progress) const = 0;
virtual const LogMsgJobInfo & queryJob() const { return unknownJob; }
};

extern jlib_decl StringBuffer &appendGloballyUniqueId(StringBuffer &s);
Expand Down
16 changes: 0 additions & 16 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1319,26 +1319,10 @@ bool StatisticsMapping::equals(const StatisticsMapping & other)
}

// stat. mappings shared between master and slave activities
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile});
const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles,
StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles});
const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, diskReadRemoteStatistics, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexWriteActivityStatistics({StPerReplicated, StNumLeafCacheAdds, StNumNodeCacheAdds, StNumBlobCacheAdds }, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping keyedJoinActivityStatistics({ StNumIndexAccepted, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping loopActivityStatistics({StNumIterations}, basicActivityStatistics);
const StatisticsMapping lookupJoinActivityStatistics({StNumSmartJoinSlavesDegradedToStd, StNumSmartJoinDegradedToLocal}, basicActivityStatistics);
const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics, spillStatistics);
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);

const StatisticsMapping allStatistics(StKindAll);
const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
Expand Down
19 changes: 1 addition & 18 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,23 +942,6 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget
extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value);
extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection);

//statistics gathered by the different activities
extern jlib_decl const StatisticsMapping spillStatistics;
extern jlib_decl const StatisticsMapping jhtreeCacheStatistics;
extern jlib_decl const StatisticsMapping basicActivityStatistics;
extern jlib_decl const StatisticsMapping groupActivityStatistics;
extern jlib_decl const StatisticsMapping hashJoinActivityStatistics;
extern jlib_decl const StatisticsMapping indexReadActivityStatistics;
extern jlib_decl const StatisticsMapping indexWriteActivityStatistics;
extern jlib_decl const StatisticsMapping joinActivityStatistics;
extern jlib_decl const StatisticsMapping keyedJoinActivityStatistics;
extern jlib_decl const StatisticsMapping lookupJoinActivityStatistics;
extern jlib_decl const StatisticsMapping loopActivityStatistics;
extern jlib_decl const StatisticsMapping diskReadActivityStatistics;
extern jlib_decl const StatisticsMapping diskReadPartStatistics;
extern jlib_decl const StatisticsMapping diskWriteActivityStatistics;
extern jlib_decl const StatisticsMapping sortActivityStatistics;

extern jlib_decl const StatisticsMapping graphStatistics;
extern jlib_decl const StatisticsMapping indexDistribActivityStatistics;

#endif
4 changes: 2 additions & 2 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2454,7 +2454,7 @@ class ReDistributeSlaveActivity : public HashDistributeSlaveActivity
class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
{
typedef HashDistributeSlaveBase PARENT;
CThorContextLogger contextLogger;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;

class CKeyLookup : implements IHash
Expand Down Expand Up @@ -2487,7 +2487,7 @@ class IndexDistributeSlaveActivity : public HashDistributeSlaveBase
} *lookup;

public:
IndexDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container, indexDistribActivityStatistics), lookup(NULL), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
IndexDistributeSlaveActivity(CGraphElementBase *container) : PARENT(container, indexDistribActivityStatistics), lookup(NULL), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
{
}
~IndexDistributeSlaveActivity()
Expand Down
5 changes: 2 additions & 3 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
Owned<IFileIO> lazyIFileIO;
mutable CriticalSection ioStatsCS;
unsigned fileTableStart = NotFound;
CThorContextLogger contextLogger;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;

class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
Expand Down Expand Up @@ -528,8 +528,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
public:
CIndexReadSlaveBase(CGraphElementBase *container)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this),
statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
{
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
limitTransformExtra = nullptr;
Expand Down
6 changes: 3 additions & 3 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
unsigned candidateCount;
__int64 lastSeeks, lastScans;
IConstPointerArrayOf<ITranslator> translators;
CThorContextLogger contextLogger;
CStatsContextLogger contextLogger;

inline void updateJhTreeStats()
{
Expand Down Expand Up @@ -1253,7 +1253,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CKeyLocalLookup(CKeyedJoinSlave &_owner, const RtlRecord &_keyRecInfo) : owner(_owner), keyRecInfo(_keyRecInfo), indexReadFieldsRow(_owner.indexInputAllocator)
CKeyLocalLookup(CKeyedJoinSlave &_owner, const RtlRecord &_keyRecInfo) : owner(_owner), keyRecInfo(_keyRecInfo), indexReadFieldsRow(_owner.indexInputAllocator), contextLogger(jhtreeCacheStatistics, thorJob)
{
tlkManager.setown(owner.keyHasTlk ? createLocalKeyManager(keyRecInfo, nullptr, &contextLogger, owner.helper->hasNewSegmentMonitors(), false) : nullptr);
reset();
Expand Down Expand Up @@ -1363,7 +1363,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
++candidateCount;
if (candidateCount > owner.atMost)
break;
KLBlobProviderAdapter adapter(partManager, & contextLogger);
KLBlobProviderAdapter adapter(partManager, &contextLogger);
byte const * keyRow = partManager->queryKeyBuffer();
size_t fposOffset = partManager->queryRowSize() - sizeof(offset_t);
offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset);
Expand Down
Loading

0 comments on commit a3d679b

Please sign in to comment.