HPCC-28757 New StSizePeakEphemeralDisk and StSizePeakTempDisk for graphs #18482

Merged: 1 commit, May 23, 2024
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
@@ -305,6 +305,8 @@ enum StatisticKind
StNumLocalRows,
StNumRemoteRows,
StSizeRemoteWrite,
StSizePeakTempDisk,
StSizePeakEphemeralDisk,
StMax,

//For any quantity there is potentially the following variants.
4 changes: 3 additions & 1 deletion system/jlib/jstats.cpp
@@ -944,7 +944,7 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(LeafFetch) },
{ TIMESTAT(BlobFetch), "Time spent reading blobs from disk (EXCLUDING the linux page cache)" },
{ CYCLESTAT(BlobFetch) },
{ PEAKSIZESTAT(GraphSpill), "Peak size of spill memory usage" },
{ PEAKSIZESTAT(GraphSpill), "High water mark for inter-subgraph spill size" },
{ TIMESTAT(AgentQueue), "Time worker items were received and queued before being processed\nThis may indicate that the primary node on a channel was down, or that the workers are overloaded with requests" },
{ CYCLESTAT(AgentQueue) },
{ TIMESTAT(IBYTIDelay), "Time spent waiting for another worker to start processing a request\nA non-zero value indicates that the primary node on a channel was down or very busy" },
@@ -977,6 +977,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ NUMSTAT(LocalRows), "Number of rows handled locally"},
{ NUMSTAT(RemoteRows), "Number of rows sent to remote workers"},
{ SIZESTAT(RemoteWrite), "Size of data sent to remote workers"},
{ PEAKSIZESTAT(PeakTempDisk), "High water mark for temporary files"},
{ PEAKSIZESTAT(PeakEphemeralDisk), "High water mark for ephemeral storage use"},
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
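
(For orientation - inferred from the CSlaveGraph::serializeStats change in thorlcr/graph/thgraphslave.cpp below - the two new statistics relate roughly as:

    StSizePeakTempDisk      = peak, over sampling points, of the summed per-activity temp file sizes
    StSizePeakEphemeralDisk = StSizePeakTempDisk + StSizeGraphSpill    // temp files plus inter-subgraph spill

where the sampling happens each time the slave graph statistics are serialized.)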
10 changes: 6 additions & 4 deletions thorlcr/activities/hashdistrib/thhashdistribslave.cpp
@@ -2703,12 +2703,13 @@ class CSpill : implements IRowWriter, public CSimpleInterface
IRowWriter *writer;
StringAttr desc;
unsigned bucketN, rwFlags;
Linked<CFileSizeTracker> tempFileSizeTracker;

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN)
CSpill(CActivityBase &_owner, IThorRowInterfaces *_rowIf, const char *_desc, unsigned _bucketN, CFileSizeTracker * _tempFileSizeTracker)
: owner(_owner), rowIf(_rowIf), desc(_desc), bucketN(_bucketN), tempFileSizeTracker(_tempFileSizeTracker)
{
count = 0;
writer = NULL;
@@ -2726,7 +2727,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
prefix.append(bucketN).append('_').append(desc);
GetTempFilePath(tempname, prefix.str());
OwnedIFile iFile = createIFile(tempname.str());
spillFile.setown(new CFileOwner(iFile.getLink()));
spillFile.setown(new CFileOwner(iFile.getLink(), tempFileSizeTracker));
if (owner.getOptBool(THOROPT_COMPRESS_SPILLS, true))
{
rwFlags |= rw_compress;
@@ -2762,6 +2763,7 @@ class CSpill : implements IRowWriter, public CSimpleInterface
writer = NULL;
spillFileIO->flush();
mergeStats(stats, this);
spillFile->noteSize(getStatistic(StSizeSpillFile));
spillFileIO.clear();
}
inline __int64 getStatistic(StatisticKind kind) const
@@ -3423,7 +3425,7 @@ void CHashTableRowTable::rehash(const void **newRows)

CBucket::CBucket(HashDedupSlaveActivityBase &_owner, IThorRowInterfaces *_rowIf, IThorRowInterfaces *_keyIf, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows)
: owner(_owner), keyIf(_keyIf), extractKey(_extractKey), bucketN(_bucketN), htRows(_htRows),
rowSpill(owner, _rowIf, "rows", _bucketN), keySpill(owner, _keyIf, "keys", _bucketN)
rowSpill(owner, _rowIf, "rows", _bucketN, _owner.queryTempFileSizeTracker()), keySpill(owner, _keyIf, "keys", _bucketN, _owner.queryTempFileSizeTracker())

{
spilt = false;
16 changes: 16 additions & 0 deletions thorlcr/graph/thgraph.hpp
@@ -1104,6 +1104,7 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
CSingletonLock CABserializerlock;
CSingletonLock CABdeserializerlock;
roxiemem::RoxieHeapFlags defaultRoxieMemHeapFlags = roxiemem::RHFnone;
Owned<CFileSizeTracker> tempFileSizeTracker;

protected:
CGraphElementBase &container;
@@ -1171,6 +1172,21 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0);
IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0);

CFileSizeTracker * queryTempFileSizeTracker()
{
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
return tempFileSizeTracker;
}
offset_t queryActiveTempSize() const
{
return tempFileSizeTracker ? tempFileSizeTracker->queryActiveSize() : 0;
Member
I think this is only updated when the file is closed. @jakesmith I assume that is going to work for hash dedup, but will it scale to other situations - e.g. lookahead - where the file is not closed until the end.

Also are you planning to publish the active and peak spill file size for each activity? What should we call them rather than spill files? Do we need to rename the existing internal spills to match this convention.

Member

> What should we call them rather than spill files?

We are restricting 'spill' to mean inter-subgraph spills, and 'temp' to mean any internal activity temp file usage (i.e. including lookahead etc.). NB: we already distinguish them that way via the "spill" and "temp" plane categories.

jakesmith (Member), Apr 26, 2024

> I think this is only updated when the file is closed.

The tracker could be updated at any time, but yes, in hashdedup it only updates at the end, when a bucket spills. That seemed reasonable since the write is expected to be relatively quick.

For lookahead, we should update periodically, not just at the end.

The actual collection is periodic (in this PR) and will collect whatever updates there are.

Contributor Author

Lookahead will be done differently, so that the size is updated intermittently.

I'm not attempting to record the active temp file size, as it always ends up 0: the temp files are closed at the end of each activity.

Member

Active could be useful, e.g. a long-running subgraph that hit its peak early on, but is now slow because the active size is growing slowly.
We should also capture the total temp written too, e.g. the peak might be a few GB, but it may have written close to that hundreds of times.
Can revisit in another PR though.

}
offset_t queryPeakTempSize() const
{
return tempFileSizeTracker ? tempFileSizeTracker->queryPeakSize() : 0;
}

// IExceptionHandler
bool fireException(IException *e);
__declspec(noreturn) void processAndThrowOwnedException(IException * e) __attribute__((noreturn));
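
As an aside on the lookahead point discussed in the thread above: in this PR a temp file's size is reported to the tracker once, when it is closed (CSpill::close calls noteSize). A possible shape for the intermittent updates mentioned for lookahead is sketched below purely as an illustration; the class and its names are hypothetical and not part of this PR. It reports growth as deltas, so CFileSizeTracker's active size stays accurate between reports:

// Hypothetical sketch only: incrementally report a temp file's growth to a
// CFileSizeTracker, rather than reporting the full size once at close.
class CIncrementalTempSizeReporter
{
    Linked<CFileSizeTracker> tracker;   // activity-level tracker (may be shared)
    offset_t reportedSize = 0;          // size reported so far for this file
public:
    CIncrementalTempSizeReporter(CFileSizeTracker *_tracker) : tracker(_tracker) {}
    void update(offset_t currentSize)   // call periodically while the file is being written
    {
        if (tracker && currentSize > reportedSize)
        {
            tracker->growSize(currentSize - reportedSize); // add only the delta
            reportedSize = currentSize;
        }
    }
    ~CIncrementalTempSizeReporter()
    {
        if (tracker)
            tracker->shrinkSize(reportedSize); // file going away: release what was reported
    }
};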
22 changes: 21 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
@@ -1280,8 +1280,28 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
jobS->querySharedAllocator()->queryRowManager()->reportSummaryStatistics(stats);

IGraphTempHandler *tempHandler = owner ? queryTempHandler(false) : queryJob().queryTempHandler();
offset_t sizeGraphSpill = tempHandler ? tempHandler->getActiveUsageSize() : 0;
if (tempHandler)
stats.mergeStatistic(StSizeGraphSpill, tempHandler->getActiveUsageSize());
stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);

// calculate peak temp size
if (started&&initialized)
{
offset_t activeTempSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeTempSize += activity.queryActiveTempSize();
Member

Should this be calling activity.queryPeakTempSize()? I can see problems either way.

  • If calling active size, then you will only record the peak when the stats were gathered. So if a temp file was created and then deleted it would not show up.
  • If calling peak size, then the number may end up too high, e.g. if you have sequential operations where one temp file is always deleted before another is created. I suspect that is fairly unusual, and we are probably more interested in the worst case rather than the general case.

Member

Not sure.
Summing peaks (if >1 activity) will, I think, usually result in a peak that is significantly higher than the true peak, because it is normal for downstream activities to consume some of the spill files. I think this over-estimate will be problematic.

IMO it is better as it is, sampling the active sum to see if it exceeds the current peak.
My only concern is that the sample period is too long.

An alternative perhaps (still sampling) is to recalculate more frequently, at the time CFileSizeTracker::growSize is called, e.g. call back into the subgraph and have it collate the peak (if it hasn't in the last e.g. 1s).

Member

I think the question is why we are gathering this information. I think it is so that we know an upper bound on the amount of temp space that a workunit will use - so we can correctly size a disk.
If that is the case it is better to over-estimate than under-estimate - which is why I would go for peaksize.
Should we call to discuss?

Member

@shamser - I discussed this with Gavin (I meant to update this PR before) and the conclusion was that tracking the active peak is the right approach, but calculating that peak at long intervals is insufficient, i.e. calculating the subgraph peak every ~15 seconds when the progress is collected is too infrequent and will often lead to inaccurate (low) estimates.

As I began to outline in my comment above, we think a good approach is for all the trackers to call back into the subgraph each time they grow in size, and for the subgraph to recalculate the peak. Those calls will be frequent, but it (the subgraph impl.) can decide when to recalculate, e.g. if it has recalculated <1s ago, do nothing.
Then that more frequently calculated peak can be serialized back with the graph progress when needed.

@ghalliday - have I forgotten anything? And I vote for this not to hold up this PR being merged. A new PR to implement this strategy can be opened. I'll re-approve this PR.

}
if (activeTempSize > peakTempSize)
peakTempSize = activeTempSize;
}
if (peakTempSize)
stats.mergeStatistic(StSizePeakTempDisk, peakTempSize);
if (peakTempSize + sizeGraphSpill)
stats.mergeStatistic(StSizePeakEphemeralDisk, peakTempSize + sizeGraphSpill);
stats.serialize(mb);

unsigned cPos = mb.length();
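
The follow-up strategy agreed in the thread above (trackers calling back into the subgraph when they grow, with the subgraph throttling how often it recalculates the peak) might look roughly like the sketch below. This is illustrative only and not part of this PR; notePossiblePeak and lastPeakCalcMs are hypothetical names:

// Hypothetical sketch: invoked whenever tracked temp usage grows (e.g. from a
// tracker's growSize path); throttles the peak recalculation to about once per second.
void CSlaveGraph::notePossiblePeak()
{
    unsigned now = msTick();
    if (now - lastPeakCalcMs < 1000)    // recalculated <1s ago: do nothing
        return;
    lastPeakCalcMs = now;
    offset_t activeTempSize = 0;
    Owned<IThorActivityIterator> iter = getConnectedIterator();
    ForEach (*iter)
    {
        CSlaveActivity &activity = (CSlaveActivity &)*iter->query().queryActivity();
        activeTempSize += activity.queryActiveTempSize();
    }
    if (activeTempSize > peakTempSize)  // peakTempSize is the member added to CSlaveGraph in this PR
        peakTempSize = activeTempSize;
}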
9 changes: 7 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
@@ -224,7 +224,12 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
void startLookAhead(unsigned index);
bool isLookAheadActive(unsigned index) const;
void setupSpace4FileStats(unsigned where, bool statsForMultipleFiles, bool isSuper, unsigned numSubs, const StatisticsMapping & statsMapping);
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const { }
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
offset_t peakTempSize = queryPeakTempSize();
if (peakTempSize)
activeStats.mergeStatistic(StSizePeakTempDisk, peakTempSize);
}
public:
IMPLEMENT_IINTERFACE_USING(CActivityBase)

@@ -262,7 +267,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
bool canStall() const;
bool isFastThrough() const;


// IThorDataLink
virtual CSlaveActivity *queryFromActivity() override { return this; }
virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
@@ -516,6 +520,7 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
offset_t peakTempSize = 0;

public:

5 changes: 2 additions & 3 deletions thorlcr/thorutil/thormisc.cpp
@@ -88,14 +88,13 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead, }, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizeGraphSpill, StSizePeakTempDisk, StSizePeakEphemeralDisk, StTimeUser, StTimeSystem, StNumContextSwitches, StSizeMemory, StSizePeakMemory, StSizeRowMemory, StSizePeakRowMemory}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
const StatisticsMapping indexDistribActivityStatistics({}, basicActivityStatistics, jhtreeCacheStatistics);
const StatisticsMapping soapcallActivityStatistics({}, basicActivityStatistics, soapcallStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDedupActivityStatistics({StNumSpills, StSizeSpillFile, StTimeSortElapsed, StSizePeakTempDisk}, diskWriteRemoteStatistics, basicActivityStatistics);
const StatisticsMapping hashDistribActivityStatistics({StNumLocalRows, StNumRemoteRows, StSizeRemoteWrite}, basicActivityStatistics);


MODULE_INIT(INIT_PRIORITY_STANDARD)
{
ClusterMPAllocator.setown(createMPtagRangeAllocator(MPTAG_THORGLOBAL_BASE,MPTAG_THORGLOBAL_COUNT));
41 changes: 40 additions & 1 deletion thorlcr/thorutil/thormisc.hpp
@@ -308,19 +308,58 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
virtual bool action() = 0;
};

// Tracks the current and peak storage used for some files
class CFileSizeTracker: public CInterface
{
RelaxedAtomic<offset_t> activeSize{0};
RelaxedAtomic<offset_t> peakSize{0};
public:
void growSize(offset_t size)
{
if (size)
{
offset_t newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
}
}
void shrinkSize(offset_t size)
{
if (size)
activeSize.fetch_sub(size);
}
offset_t queryActiveSize() const
{
return activeSize.load();
}
offset_t queryPeakSize() const
{
return peakSize.load();
}
};

// simple class which takes ownership of the underlying file and deletes it on destruction
class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
{
OwnedIFile iFile;
Linked<CFileSizeTracker> fileSizeTracker;
offset_t fileSize = 0;
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
CFileOwner(IFile *_iFile) : iFile(_iFile)
CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
{
}
~CFileOwner()
{
if (fileSizeTracker)
fileSizeTracker->shrinkSize(fileSize);
iFile->remove();
}
void noteSize(offset_t size)
{
fileSize = size;
if (fileSizeTracker)
fileSizeTracker->growSize(fileSize);
}
IFile &queryIFile() const { return *iFile; }
};
