Skip to content

Commit

Permalink
HPCC-28757 New StSizePeakEphemeralStorage and StSizePeakSubgraphTemp …
Browse files Browse the repository at this point in the history
…for graphs

StSizePeakSubgraphTemp tracks the high water mark for intra-graph temp
files.  StSizePeakEphemeralStorage tracks the overall high water mark
for temp files (i.e. both intra-graph and inter-graph temp files)

Note at present, only hash dedup produces the information necessary for
generating these 2 stats and so the stats will not be accurate until
further work is completed.  Other activities that make use of temp
files(such as join and sort) will need to be amended to the info
necessary to produce these stats.

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Apr 18, 2024
1 parent d5f12b6 commit 6c64228
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 87 deletions.
3 changes: 2 additions & 1 deletion system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ enum StatisticKind
StSizeContinuationData,
StNumContinuationRequests,
StNumFailures,
StSizePeakSpill,
StSizePeakSubgraphTemp,
StSizePeakEphemeralStorage,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
5 changes: 3 additions & 2 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(LeafFetch) },
{ TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" },
{ CYCLESTAT(BlobFetch) },
{ PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" },
{ PEAKSIZESTAT(GraphSpill), "Peak size of graph spills" },
{ TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" },
{ CYCLESTAT(AgentQueue) },
{ TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" },
Expand Down Expand Up @@ -974,7 +974,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ SIZESTAT(ContinuationData), "The total size of continuation data sent from agent to the server\nA large number may indicate a poor filter, or merging from many different index locations" },
{ NUMSTAT(ContinuationRequests), "The number of times the agent indicated there was more data to be returned" },
{ NUMSTAT(Failures), "The number of times a query has failed" },
{ PEAKSIZESTAT(PeakSpill), "The high water mark for spill files"},
{ PEAKSIZESTAT(PeakSubgraphTemp), "The high water mark for intra-graph temp files"},
{ PEAKSIZESTAT(PeakEphemeralStorage), "The high water mark for emphemeral storage use"},
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
19 changes: 5 additions & 14 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2648,18 +2648,18 @@ class CSpill : implements IRowWriter, public CSimpleInterface
CActivityBase &owner;
IThorRowInterfaces *rowIf;
rowcount_t count;
Owned<CFileOwnerSizeUpdater> spillFile;
Owned<CFileOwner> spillFile;
Owned<IFileIO> spillFileIO;
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Linked<FilesSizeTracker> spillsSizeTracker;
Linked<CFileSizeTracker> tempFileSizeTracker;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, FilesSizeTracker * _spillsSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), spillsSizeTracker(_spillsSizeTracker)
CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker)
{
count = 0;
writer = NULL;
Expand All @@ -2677,7 +2677,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
prefix.append(bucketN).append('_').append(desc);
GetTempFilePath(tempname, prefix.str());
OwnedIFile iFile = createIFile(tempname.str());
spillFile.setown(new CFileOwnerSizeUpdater(iFile.getLink(), spillsSizeTracker));
spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker));
if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
rwFlags |= rw_compress;
Expand Down Expand Up @@ -3176,7 +3176,6 @@ class HashDedupSlaveActivityBase : public CSlaveActivity
bucketHandlerStack.kill();
CSlaveActivity::kill();
}

CATCH_NEXTROW()
{
ActivityTimer t(slaveTimerStats, timeActivities);
Expand Down Expand Up @@ -3302,14 +3301,6 @@ class HashDedupSlaveActivityBase : public CSlaveActivity

virtual bool isGrouped() const override { return grouped; }
virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const override = 0;

virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
unsigned __int64 peakSpill = queryPeakSpillSize();
activeStats.setStatistic(StSizePeakSpill, peakSpill);
}

friend class CBucketHandler;
friend class CHashTableRowTable;
friend class CBucket;
Expand Down
14 changes: 8 additions & 6 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1287,19 +1287,21 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
// calculate peak spill size
if (started&&initialized)
{
unsigned __int64 activeSpillSize = 0;
unsigned __int64 activeTempSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeSpillSize += activity.queryActiveSpillSize();
activeTempSize += activity.queryActiveTempSize();
}
if (activeSpillSize > peakSpillSize)
peakSpillSize = activeSpillSize;
if (activeTempSize > peakTempSize)
peakTempSize = activeTempSize;
}
if (peakSpillSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakSpill, peakSpillSize + sizeGraphSpill);
if (peakTempSize)
stats.mergeStatistic(StSizePeakSubgraphTemp, peakTempSize);
if (peakTempSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakEphemeralStorage, peakTempSize + sizeGraphSpill);
stats.serialize(mb);

unsigned cPos = mb.length();
Expand Down
20 changes: 8 additions & 12 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
// fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all.
// (Having it in the base class aids setup and resizing.)
mutable std::vector<OwnedPtr<CRuntimeStatisticCollection>> fileStats;
Owned<FilesSizeTracker> spillsSizeTracker;
Owned<CFileSizeTracker> tempFileSizeTracker;

protected:
unsigned __int64 queryLocalCycles() const;
Expand Down Expand Up @@ -263,19 +263,15 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;

FilesSizeTracker * queryFileSizeTracker()
CFileSizeTracker * queryFileSizeTracker()
{
if (!spillsSizeTracker)
spillsSizeTracker.setown(new FilesSizeTracker);
return spillsSizeTracker;
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
return tempFileSizeTracker;
}
unsigned __int64 queryActiveSpillSize() const
unsigned __int64 queryActiveTempSize() const
{
return spillsSizeTracker ? spillsSizeTracker->queryActiveSize() : 0;
}
unsigned __int64 queryPeakSpillSize() const
{
return spillsSizeTracker ? spillsSizeTracker->queryPeakSize() : 0;
return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0;
}

// IThorDataLink
Expand Down Expand Up @@ -531,7 +527,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
unsigned __int64 peakSpillSize = 0;
unsigned __int64 peakTempSize = 0;

public:

Expand Down
4 changes: 2 additions & 2 deletions thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakSubgraphTemp, StSizePeakEphemeralStorage, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakSpill}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics);

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
91 changes: 41 additions & 50 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,62 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
virtual bool action() = 0;
};

// Tracks the current and peak storage used for some files
class CFileSizeTracker: public CInterface
{
RelaxedAtomic<unsigned __int64> activeSize{0};
RelaxedAtomic<unsigned __int64> peakSize{0};
public:
void growSize(unsigned __int64 size)
{
if (size)
{
unsigned __int64 newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(unsigned __int64 size)
{
if (size)
activeSize.fetch_sub(size);
}
unsigned __int64 queryActiveSize() const
{
return activeSize.load();
}
unsigned __int64 queryPeakSize() const
{
return peakSize.load();
}
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
{
OwnedIFile iFile;
Linked<CFileSizeTracker> fileSizeTracker;
__int64 fileSize = 0;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile) : iFile(_iFile)
CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
~CFileOwner()
{
if (fileSizeTracker)
fileSizeTracker->shrinkSize(fileSize);
iFile->remove();
}
void noteSpill(__int64 size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
}
IFile &queryIFile() const { return *iFile; }
};


// stream wrapper, that takes ownership of a CFileOwner
class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
{
Expand Down Expand Up @@ -359,55 +399,6 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
}
};

// Tracks the current and peak storage used for some files
class FilesSizeTracker: public CInterface
{
RelaxedAtomic<unsigned __int64> activeSize{0};
RelaxedAtomic<unsigned __int64> peakSize{0};
public:
void growSize(unsigned __int64 size)
{
if (size)
{
unsigned __int64 newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(unsigned __int64 size)
{
if (size)
activeSize.fetch_sub(size);
}
unsigned __int64 queryActiveSize() const
{
return activeSize.load();
}
unsigned __int64 queryPeakSize() const
{
return peakSize.load();
}
};

class CFileOwnerSizeUpdater : public CFileOwner
{
Linked<FilesSizeTracker> filesSizeTracker;
__int64 fileSize = 0;
public:
CFileOwnerSizeUpdater(IFile *_iFile, FilesSizeTracker * _filesSizeTracker): CFileOwner(_iFile), filesSizeTracker(_filesSizeTracker)
{}

~CFileOwnerSizeUpdater()
{
filesSizeTracker->shrinkSize(fileSize);
}

void noteSpill(__int64 size)
{
fileSize = size;
filesSizeTracker->growSize(fileSize);
}
};

#define DEFAULT_THORMASTERPORT 20000
#define DEFAULT_THORSLAVEPORT 20100
#define DEFAULT_SLAVEPORTINC 20
Expand Down

0 comments on commit 6c64228

Please sign in to comment.