Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32241 Temp size for sort to use disk size and track actual graph temp disk usage #18883

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions thorlcr/graph/thgraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ class graph_decl CGraphBase : public CGraphStub, implements IEclGraphResults
CChildGraphTable childGraphsTable;
CGraphStubArrayCopy orderedChildGraphs;
Owned<IGraphTempHandler> tmpHandler;

Owned<CFileSizeTracker> tempFileSizeTracker;
void clean();

protected:
Expand Down Expand Up @@ -805,7 +805,24 @@ class graph_decl CGraphBase : public CGraphStub, implements IEclGraphResults
virtual void end();
virtual void abort(IException *e) override;
virtual IThorGraphResults *createThorGraphResults(unsigned num);

CFileSizeTracker * queryTempFileSizeTracker()
{
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
return tempFileSizeTracker;
}
offset_t queryPeakTempSize()
{
if (tempFileSizeTracker)
return tempFileSizeTracker->queryPeakSize();
return 0;
}
offset_t queryActiveTempSize()
{
if (tempFileSizeTracker)
return tempFileSizeTracker->queryActiveSize();
return 0;
}
// IExceptionHandler
virtual bool fireException(IException *e);

Expand Down Expand Up @@ -1175,7 +1192,7 @@ class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, im
CFileSizeTracker * queryTempFileSizeTracker()
{
if (!tempFileSizeTracker)
tempFileSizeTracker.setown(new CFileSizeTracker);
tempFileSizeTracker.setown(new CFileSizeTracker(queryGraph().queryParent()->queryTempFileSizeTracker()));
return tempFileSizeTracker;
}
offset_t queryActiveTempSize() const
Expand Down
15 changes: 1 addition & 14 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1323,20 +1323,7 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
if (tempHandler)
stats.mergeStatistic(StSizeGraphSpill, sizeGraphSpill);

// calculate peak spill size
if (started&&initialized)
{
offset_t activeTempSize = 0;
Owned<IThorActivityIterator> iter = getConnectedIterator();
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
CSlaveActivity &activity = (CSlaveActivity &)*element.queryActivity();
activeTempSize += activity.queryActiveTempSize();
}
if (activeTempSize > peakTempSize)
peakTempSize = activeTempSize;
}
offset_t peakTempSize = queryPeakTempSize();
if (peakTempSize)
stats.mergeStatistic(StSizePeakTempDisk, peakTempSize);
if (peakTempSize + sizeGraphSpill)
Expand Down
1 change: 0 additions & 1 deletion thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@ class graphslave_decl CSlaveGraph : public CGraphBase
bool doneInit = false;
std::atomic_bool progressActive;
ProcessInfo processStartInfo;
offset_t peakTempSize = 0;

public:

Expand Down
2 changes: 1 addition & 1 deletion thorlcr/msort/tsorts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ class CWriteIntercept : public CSimpleInterface
}
output->flush();
offset_t end = output->getPosition();
dataFile->noteSize(output->getStatistic(StSizeDiskWrite));
output.clear();
dataFile->noteSize(end);
writeidxofs(end);
if (idxFileIO)
{
Expand Down
10 changes: 10 additions & 0 deletions thorlcr/thorutil/thormisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,19 +326,29 @@ class CFileSizeTracker: public CInterface
{
RelaxedAtomic<offset_t> activeSize{0};
RelaxedAtomic<offset_t> peakSize{0};
CFileSizeTracker * parentFileSizeTracker;
public:
CFileSizeTracker(CFileSizeTracker *parent=nullptr): parentFileSizeTracker(parent)
{
}
void growSize(offset_t size)
{
if (size)
{
offset_t newActiveSize = activeSize.add_fetch(size);
peakSize.store_max(newActiveSize);
if (parentFileSizeTracker)
parentFileSizeTracker->growSize(size);
}
}
void shrinkSize(offset_t size)
{
if (size)
{
activeSize.fetch_sub(size);
if (parentFileSizeTracker)
parentFileSizeTracker->shrinkSize(size);
}
}
offset_t queryActiveSize() const
{
Expand Down
Loading