Skip to content

Commit

Permalink
HPCC-28757 New StSizePeakEphemeralStorage and StSizePeakSubgraphTemp …
Browse files Browse the repository at this point in the history
…for graphs

StSizePeakSubgraphTemp tracks the high water mark for intra-graph temp
files.  StSizePeakEphemeralStorage tracks the overall high water mark
for temp files (i.e. both intra-graph and inter-graph temp files)

Note at present, only hash dedup produces the information necessary for
generating these 2 stats and so the stats will not be accurate until
further work is completed.  Other activities that make use of temp
files(such as join and sort) will need to be amended to the info
necessary to produce these stats.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Apr 23, 2024
1 parent cbb5711 commit 1a12426
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 10 deletions.
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ enum StatisticKind
StSizeContinuationData,
StNumContinuationRequests,
StNumFailures,
StSizePeakSubgraphTemp,
StSizePeakEphemeralStorage,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
4 changes: 3 additions & 1 deletion system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(LeafFetch) },
{ TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" },
{ CYCLESTAT(BlobFetch) },
{ PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" },
{ PEAKSIZESTAT(GraphSpill), "Peak size of graph spills" },
{ TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" },
{ CYCLESTAT(AgentQueue) },
{ TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" },
Expand Down Expand Up @@ -974,6 +974,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" },
{ NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" },
{ NUMSTAT(Failures), "The number of times a query has failed" },
{ PEAKSIZESTAT(PeakSubgraphTemp), "The high water mark for intra-graph temp files"},
{ PEAKSIZESTAT(PeakEphemeralStorage), "The high water mark for emphemeral storage use"},
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
10 changes: 6 additions & 4 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2653,12 +2653,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Linked<CFileSizeTracker> tempFileSizeTracker;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker)
{
count = 0;
writer = NULL;
Expand All @@ -2676,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
prefix.append(bucketN).append('_').append(desc);
GetTempFilePath(tempname, prefix.str());
OwnedIFile iFile = createIFile(tempname.str());
spillFile.setown(new CFileOwner(iFile.getLink()));
spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker));
if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
rwFlags |= rw_compress;
Expand Down Expand Up @@ -2712,6 +2713,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSpill(getStatistic(StSizeSpillFile));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
Expand Down Expand Up @@ -3373,7 +3375,7 @@ void CHashTableRowTable::rehash(const void **newRows)

CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
: owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker())

{
spilt = false;
Expand Down
12 changes: 12 additions & 0 deletions thorlcr/graph/thgraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
CSingletonLock CABserializerlock;
CSingletonLock CABdeserializerlock;
roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone;
Owned<CFileSizeTracker> tempFileSizeTracker;

protected:
CGraphElementBase &container;
Expand Down Expand Up @@ -1171,6 +1172,17 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0);
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0);

CFileSizeTracker * queryTempFileSizeTracker()
{
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
return tempFileSizeTracker;
}
unsigned __int64 queryActiveTempSize() const
{
return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0;
}

// IExceptionHandler
bool fireException(IException *e);
__declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn));
Expand Down
22 changes: 21 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats);

IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler();
offset_t sizeGraphSpill = tempHandler ? tempHandler->getActiveUsageSize() : 0;
if (tempHandler)
stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize());
stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);

// calculate peak spill size
if (started&&initialized)
{
unsigned __int64 activeTempSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeTempSize += activity.queryActiveTempSize();
}
if (activeTempSize > peakTempSize)
peakTempSize = activeTempSize;
}
if (peakTempSize)
stats.mergeStatistic(StSizePeakSubgraphTemp, peakTempSize);
if (peakTempSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakEphemeralStorage, peakTempSize + sizeGraphSpill);
stats.serialize(mb);

unsigned cPos = mb.length();
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;


// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
Expand Down Expand Up @@ -516,6 +515,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
unsigned __int64 peakTempSize = 0;

public:

Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSubgraphTemp, StSizePeakEphemeralStorage, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
Expand Down
43 changes: 41 additions & 2 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,62 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
virtual bool action() = 0;
};

// Tracks the current and peak storage used for some files
class CFileSizeTracker: public CInterface
{
RelaxedAtomic<unsigned __int64> activeSize{0};
RelaxedAtomic<unsigned __int64> peakSize{0};
public:
void growSize(unsigned __int64 size)
{
if (size)
{
unsigned __int64 newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(unsigned __int64 size)
{
if (size)
activeSize.fetch_sub(size);
}
unsigned __int64 queryActiveSize() const
{
return activeSize.load();
}
unsigned __int64 queryPeakSize() const
{
return peakSize.load();
}
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
{
OwnedIFile iFile;
Linked<CFileSizeTracker> fileSizeTracker;
offset_t fileSize = 0;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile) : iFile(_iFile)
CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
~CFileOwner()
{
if (fileSizeTracker)
fileSizeTracker->shrinkSize(fileSize);
iFile->remove();
}
void noteSpill(offset_t size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
}
IFile &queryIFile() const { return *iFile; }
};


// stream wrapper, that takes ownership of a CFileOwner
class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
{
Expand Down Expand Up @@ -359,7 +399,6 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
}
};


#define DEFAULT_THORMASTERPORT 20000
#define DEFAULT_THORSLAVEPORT 20100
#define DEFAULT_SLAVEPORTINC 20
Expand Down

0 comments on commit 1a12426

Please sign in to comment.