Skip to content

Commit

Permalink
HPCC-29817 Eliminate querySeeks, queryScan & queryWildSeeks for hThor…
Browse files Browse the repository at this point in the history
… and Roxie

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Sep 22, 2023
1 parent 2078c1b commit fc1c60a
Show file tree
Hide file tree
Showing 24 changed files with 213 additions and 258 deletions.
91 changes: 91 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef THORCOMMON_HPP
#define THORCOMMON_HPP

#include "jlog.hpp"
#include "jiface.hpp"
#include "jcrc.hpp"
#include "jlzw.hpp"
Expand Down Expand Up @@ -672,6 +673,96 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext
protected:
ICodeContext * ctx;
};
class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
protected:
const LogMsgJobInfo & job;
unsigned traceLevel = 1;
LogTrace logTrace;
mutable CRuntimeStatisticCollection stats;
public:
CStatsContextLogger(const CRuntimeStatisticCollection &_mapping, const LogMsgJobInfo & _job=unknownJob) : job(_job), stats(_mapping) {}

virtual void CTXLOGva(const LogMsgCategory & cat, const LogMsgJobInfo & job, LogMsgCode code, const char *format, va_list args) const override __attribute__((format(printf,5,0)))
{
VALOG(cat, job, code, format, args);
}
virtual void logOperatorExceptionVA(IException *E, const char *file, unsigned line, const char *format, va_list args) const __attribute__((format(printf,5,0)))
{
StringBuffer ss;
ss.append("ERROR");
if (E)
ss.append(": ").append(E->errorCode());
if (file)
ss.appendf(": %s(%d) ", file, line);
if (E)
E->errorMessage(ss.append(": "));
if (format)
ss.append(": ").valist_appendf(format, args);
LOG(MCoperatorProgress, queryJob(), "%s", ss.str());
}
virtual void noteStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.addStatisticAtomic(kind, value);
}
virtual void setStatistic(StatisticKind kind, unsigned __int64 value) const override
{
stats.setStatistic(kind, value);
}
virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
{
stats.merge(from);
}
virtual unsigned queryTraceLevel() const override
{
return traceLevel;
}
virtual void setGlobalId(const char *id, SocketEndpoint &ep, unsigned pid) override
{
logTrace.setGlobalId(id);
}
virtual void setCallerId(const char *id) override
{
logTrace.setCallerId(id);
}
virtual const char *queryGlobalId() const override
{
return logTrace.queryGlobalId();
}
virtual const char *queryLocalId() const override
{
return logTrace.queryLocalId();
}
virtual const char *queryCallerId() const override
{
return logTrace.queryCallerId();
}
virtual void setHttpIdHeaderNames(const char *global, const char *caller) override
{
logTrace.setHttpIdHeaderNames(global, caller);
}
virtual const char *queryGlobalIdHttpHeaderName() const override
{
return logTrace.queryGlobalIdHTTPHeaderName();
}
virtual const char *queryCallerIdHttpHeaderName() const override
{
return logTrace.queryCallerIdHTTPHeaderName();
}
virtual const CRuntimeStatisticCollection &queryStats() const override
{
return stats;
}
virtual void recordStatistics(IStatisticGatherer &progress) const override
{
stats.recordStatistics(progress, false);
}
void updateStatsDeltaTo(CRuntimeStatisticCollection &to, CRuntimeStatisticCollection &previous)
{
previous.updateDelta(to, stats);
}
virtual const LogMsgJobInfo & queryJob() const override { return job; }
};

extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
Expand Down
92 changes: 31 additions & 61 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "thorcommon.hpp"
#include "rtldynfield.hpp"
#include "thorfile.hpp"
#include "jstats.h"

#define MAX_FETCH_LOOKAHEAD 1000

Expand Down Expand Up @@ -200,24 +201,10 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
{
CHThorActivityBase::updateProgress(progress);
StatsActivityScope scope(progress, activityId);
contextLogger.recordStatistics(progress);
progress.addStatistic(StNumPostFiltered, queryPostFiltered());
progress.addStatistic(StNumIndexSeeks, querySeeks());
progress.addStatistic(StNumIndexScans, queryScans());
progress.addStatistic(StNumIndexWildSeeks, queryWildSeeks());
}

virtual unsigned querySeeks() const
{
return seeks + (klManager ? klManager->querySeeks() : 0);
}
virtual unsigned queryScans() const
{
return scans + (klManager ? klManager->queryScans() : 0);
}
virtual unsigned queryWildSeeks() const
{
return wildseeks + (klManager ? klManager->queryWildSeeks() : 0);
}
virtual unsigned queryPostFiltered() const
{
return postFiltered;
Expand Down Expand Up @@ -281,10 +268,8 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
UnsignedArray superIndexCache;
unsigned keyIndexCacheIdx = 0;

unsigned seeks;
unsigned scans;
unsigned postFiltered;
unsigned wildseeks;
CStatsContextLogger contextLogger;
bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
bool localSortKey = false;
bool initializedFileInfo = false;
Expand All @@ -306,16 +291,13 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
};

CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
nextPartNumber = 0;

eclKeySize.set(helper.queryDiskRecordSize());

postFiltered = 0;
seeks = 0;
scans = 0;
wildseeks = 0;
helper.setCallback(&callback);
limitTransformExtra = nullptr;
if (_node)
Expand Down Expand Up @@ -437,7 +419,7 @@ bool CHThorIndexReadActivityBase::doPreopenLimitFile(unsigned __int64 & count, u
{
Owned<IKeyIndex> tlk = openKeyFile(df->queryPart(num));
verifyIndex(tlk);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> tlman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false);
initManager(tlman, true);
while(tlman->lookup(false) && (count<=limit))
{
Expand Down Expand Up @@ -473,7 +455,7 @@ IKeyIndex * CHThorIndexReadActivityBase::doPreopenLimitPart(unsigned __int64 & r
verifyIndex(kidx);
if (limit != (unsigned) -1)
{
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, NULL, helper.hasNewSegmentMonitors(), false);
Owned<IKeyManager> kman = createLocalKeyManager(eclKeySize.queryRecordAccessor(true), kidx, &contextLogger, helper.hasNewSegmentMonitors(), false);
initManager(kman, false);
result += kman->checkCount(limit-result);
}
Expand Down Expand Up @@ -572,10 +554,10 @@ void CHThorIndexReadActivityBase::initManager(IKeyManager *manager, bool isTlk)
manager->reset();
}

void CHThorIndexReadActivityBase::initPart()
{
void CHThorIndexReadActivityBase::initPart()
{
assertex(!keyIndex->isTopLevelKey());
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, NULL, helper.hasNewSegmentMonitors(), false));
klManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), keyIndex, &contextLogger, helper.hasNewSegmentMonitors(), false));
initManager(klManager, false);
callback.setManager(klManager, nullptr);
}
Expand All @@ -584,12 +566,7 @@ void CHThorIndexReadActivityBase::killPart()
{
callback.setManager(nullptr, nullptr);
if (klManager)
{
seeks += klManager->querySeeks();
scans += klManager->queryScans();
wildseeks += klManager->queryWildSeeks();
klManager.clear();
}
}

bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
Expand All @@ -614,7 +591,7 @@ bool CHThorIndexReadActivityBase::firstMultiPart()
if(!tlk)
openTlk();
verifyIndex(tlk);
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, NULL, helper.hasNewSegmentMonitors(), false));
tlManager.setown(createLocalKeyManager(eclKeySize.queryRecordAccessor(true), tlk, &contextLogger, helper.hasNewSegmentMonitors(), false));
initManager(tlManager, true);
nextPartNumber = 0;
return nextMultiPart();
Expand Down Expand Up @@ -3044,11 +3021,11 @@ class KeyedLookupPartHandler : extends ThreadedPartHandler<MatchSet>, implements
Owned<IKeyManager> manager;
IAgentContext &agent;
DistributedKeyLookupHandler * tlk;

IContextLogger &contextLogger;
public:
IMPLEMENT_IINTERFACE;

KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent);
KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger);

~KeyedLookupPartHandler()
{
Expand Down Expand Up @@ -3098,6 +3075,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
Owned<IThreadPool> threadPool;
IntArray subSizes;
IAgentContext &agent;
IContextLogger &contextLogger;

void addFile(IDistributedFile &f)
{
Expand All @@ -3108,7 +3086,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
for (unsigned idx = 0; idx < numParts; idx++)
{
IDistributedFilePart *part = f.getPart(idx);
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent));
parts.append(*new KeyedLookupPartHandler(owner, part, this, tlks.ordinality(), threadPool, agent, contextLogger));
}
keyFiles.append(OLINK(f));
tlks.append(*f.getPart(numParts));
Expand All @@ -3118,8 +3096,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
public:
IMPLEMENT_IINTERFACE;

DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: owner(_owner), file(f), agent(_agent)
DistributedKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: owner(_owner), file(f), agent(_agent), contextLogger(_contextLogger)
{
threadPool.setown(createThreadPool("hthor keyed join lookup thread pool", &threadFactory));
IDistributedSuperFile *super = f->querySuperFile();
Expand Down Expand Up @@ -3184,7 +3162,7 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
//Owned<IRecordLayoutTranslator>
trans.setown(owner.getLayoutTranslator(&f));
owner.verifyIndex(&f, index, trans);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false);
Owned<IKeyManager> manager = createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false);
managers.append(*manager.getLink());
}
opened = true;
Expand Down Expand Up @@ -3213,8 +3191,8 @@ class DistributedKeyLookupHandler : public CInterface, implements IThreadedExcep
const IDynamicTransform * queryRecordLayoutTranslator() const { return trans; }
};

KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk)
KeyedLookupPartHandler::KeyedLookupPartHandler(IJoinProcessor &_owner, IDistributedFilePart *_part, DistributedKeyLookupHandler * _tlk, unsigned _subno, IThreadPool * _threadPool, IAgentContext &_agent, IContextLogger & _contextLogger)
: ThreadedPartHandler<MatchSet>(_part, _tlk, _threadPool), owner(_owner), agent(_agent), tlk(_tlk), contextLogger(_contextLogger)
{
}

Expand All @@ -3223,7 +3201,7 @@ void KeyedLookupPartHandler::openPart()
if(manager)
return;
Owned<IKeyIndex> index = openKeyFile(*part);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
const IDynamicTransform * trans = tlk->queryRecordLayoutTranslator();
if(trans && !index->isTopLevelKey())
manager->setLayoutTranslator(trans);
Expand All @@ -3238,13 +3216,14 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
IJoinProcessor &owner;
IAgentContext &agent;
bool opened;
IContextLogger &contextLogger;

public:
IMPLEMENT_IINTERFACE;


MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent)
: file(f), owner(_owner), agent(_agent), opened(false)
MonolithicKeyLookupHandler(IDistributedFile *f, IJoinProcessor &_owner, IAgentContext &_agent, IContextLogger & _contextLogger)
: file(f), owner(_owner), agent(_agent), opened(false), contextLogger(_contextLogger)
{
super = f->querySuperFile();
if (super)
Expand Down Expand Up @@ -3303,7 +3282,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
{
Owned<IKeyIndex> index = openKeyFile(f.queryPart(0));
owner.verifyIndex(&f, index, trans);
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, NULL, owner.hasNewSegmentMonitors(), false));
manager.setown(createLocalKeyManager(owner.queryIndexRecord(), index, &contextLogger, owner.hasNewSegmentMonitors(), false));
}
else
{
Expand All @@ -3316,7 +3295,7 @@ class MonolithicKeyLookupHandler : public CInterface, implements IKeyLookupHandl
parts->addIndex(index.getLink());
}
owner.verifyIndex(&f, index, trans);
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, nullptr, owner.hasNewSegmentMonitors(), false));
manager.setown(createKeyMerger(owner.queryIndexRecord(), parts, 0, &contextLogger, owner.hasNewSegmentMonitors(), false));
}
if(trans)
manager->setLayoutTranslator(trans);
Expand Down Expand Up @@ -3412,9 +3391,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
RelaxedAtomic<unsigned> prefiltered;
RelaxedAtomic<unsigned> postfiltered;
RelaxedAtomic<unsigned> skips;
unsigned seeks;
unsigned scans;
unsigned wildseeks;
OwnedRowArray extractedRows;
Owned <ILocalOrDistributedFile> ldFile;
IDistributedFile * dFile;
Expand All @@ -3426,15 +3402,15 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
Owned<const IDynamicTransform> translator;
RecordTranslationMode recordTranslationModeHint = RecordTranslationMode::Unspecified;
bool isCodeSigned = false;
CStatsContextLogger contextLogger;

public:
CHThorKeyedJoinActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorKeyedJoinArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg)
: CHThorThreadedActivityBase(_agent, _activityId, _subgraphId, _arg, _arg, _kind, _graph, _arg.queryDiskRecordSize(), _node), helper(_arg), contextLogger(jhtreeCacheStatistics)
{
prefiltered = 0;
postfiltered = 0;
skips = 0;
seeks = 0;
scans = 0;
eclKeySize.set(helper.queryIndexRecordSize());
if (_node)
{
Expand Down Expand Up @@ -3990,9 +3966,9 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
mono = useMonolithic(*dFile);
}
if (mono)
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent));
lookup.setown(new MonolithicKeyLookupHandler(dFile, *this, agent, contextLogger));
else
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent));
lookup.setown(new DistributedKeyLookupHandler(dFile, *this, agent, contextLogger));
agent.logFileAccess(dFile, "HThor", "READ", graph);
}
else
Expand All @@ -4015,10 +3991,6 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
virtual void doneManager(IKeyManager * manager)
{
manager->releaseSegmentMonitors();
CriticalBlock b(statsCrit);
seeks += manager->querySeeks();
scans += manager->queryScans();
wildseeks += manager->queryWildSeeks();
}

virtual bool addMatch(MatchSet * ms, IKeyManager * manager)
Expand Down Expand Up @@ -4089,9 +4061,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
progress.addStatistic(StNumPreFiltered, prefiltered);
progress.addStatistic(StNumPostFiltered, postfiltered);
progress.addStatistic(StNumIndexSkips, skips);
progress.addStatistic(StNumIndexSeeks, seeks);
progress.addStatistic(StNumIndexScans, scans);
progress.addStatistic(StNumIndexWildSeeks, wildseeks);
contextLogger.recordStatistics(progress);
}

protected:
Expand Down
Loading

0 comments on commit fc1c60a

Please sign in to comment.