Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-29880 Serialize index write's jhtree and disk io stats regularly #19385

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions system/jhtree/keybuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>

private:
unsigned __int64 duplicateCount;
unsigned __int64 numLeaves = 0;
unsigned __int64 numBranches = 0;
unsigned __int64 numBlobs = 0;
RelaxedAtomic<__uint64> numLeaves{0};
RelaxedAtomic<__uint64> numBranches{0};
RelaxedAtomic<__uint64> numBlobs{0};
__uint64 partitionFieldMask = 0;
CWriteNode *activeNode = nullptr;
CBlobWriteNode *activeBlobNode = nullptr;
Expand Down Expand Up @@ -485,7 +485,7 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
keyHdr->getHdrStruct()->partitionFieldMask = partitionFieldMask;
CRC32 headerCrc;
writeFileHeader(false, &headerCrc);

out->flush();
if (fileCrc)
{
if (doCrc)
Expand Down Expand Up @@ -605,7 +605,20 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
virtual unsigned __int64 getOffsetBranches() const override { return offsetBranches; }
virtual unsigned __int64 getBranchMemorySize() const override { return indexCompressor->queryBranchMemorySize(); }
virtual unsigned __int64 getLeafMemorySize() const override { return indexCompressor->queryLeafMemorySize(); }

virtual unsigned __int64 getStatistic(StatisticKind kind) const override
{
switch (kind)
{
case StNumLeafCacheAdds:
return numLeaves;
case StNumNodeCacheAdds:
return numBranches;
case StNumBlobCacheAdds:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future: add other stats (offsetBranches, branchMemorySize, leadMemorySize) to stats mapping and collect these via gather stats too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return numBlobs;
default:
return out->getStatistic(kind);
}
}
protected:
void writeMetadata(char const * data, size32_t size)
{
Expand Down
1 change: 1 addition & 0 deletions system/jhtree/keybuild.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ interface IKeyBuilder : public IInterface
virtual unsigned __int64 getOffsetBranches() const = 0;
virtual unsigned __int64 getBranchMemorySize() const = 0;
virtual unsigned __int64 getLeafMemorySize() const = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) const = 0;
};

extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, IHThorIndexWriteArg *helper, const char * defaultCompression, bool enforceOrder, bool isTLK);
Expand Down
77 changes: 37 additions & 40 deletions thorlcr/activities/indexwrite/thindexwriteslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
StringAttr logicalFilename;
Owned<IPartDescriptor> partDesc, tlkDesc;
IHThorIndexWriteArg *helper;
Owned <IKeyBuilder> builder;
OwnedIFileIO builderIFileIO;
Owned<IKeyBuilder> builder;
Owned<IRowStream> myInputStream;
Owned<IPropertyTree> metadata;
Linked<IEngineRowAllocator> outRowAllocator;
mutable CriticalSection builderCS;

bool buildTlk, active;
bool sizeSignalled;
Expand All @@ -54,9 +54,6 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt

size32_t lastRowSize, firstRowSize, maxRecordSizeSeen, keyedSize;
unsigned __int64 duplicateKeyCount;
unsigned __int64 numLeafNodes = 0;
unsigned __int64 numBranchNodes = 0;
unsigned __int64 numBlobNodes = 0;
offset_t offsetBranches = 0;
offset_t uncompressedSize = 0;
offset_t originalBlobSize = 0;
Expand Down Expand Up @@ -212,12 +209,15 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
if (metadata->getPropBool("_useTrailingHeader", true))
flags |= USE_TRAILING_HEADER;
unsigned twFlags = isUrl(partFname) ? TW_Direct : TW_RenameToPrimary;
builderIFileIO.setown(createMultipleWrite(this, partDesc, 0, twFlags, compress, NULL, this, &abortSoon));
OwnedIFileIO builderIFileIO = createMultipleWrite(this, partDesc, 0, twFlags, compress, NULL, this, &abortSoon);
Owned<IFileIOStream> out = createBufferedIOStream(builderIFileIO, 0x100000);
if (!needsSeek)
out.setown(createNoSeekIOStream(out));
maxRecordSizeSeen = 0;
builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTlk ? 0 : totalCount, helper, defaultIndexCompression, !isTlk, isTlk));
{
CriticalBlock b(builderCS);
builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTlk ? 0 : totalCount, helper, defaultIndexCompression, !isTlk, isTlk));
}
}
void buildLayoutMetadata(Owned<IPropertyTree> & metadata)
{
Expand All @@ -235,16 +235,24 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
{
if (builder)
{
builder->finish(metadata, &crc, maxRecordSizeSeen);
if (!isTLK)
// Clear out builder before merging builder stats into inactive stats
// so that gatherActiveStatistics doesn't also merge builder stats.
Owned<IKeyBuilder> tmpBuilder;
{
CriticalBlock b(builderCS);
tmpBuilder.setown(builder.getClear());
}
if (tmpBuilder)
{
duplicateKeyCount = builder->getDuplicateCount();
numLeafNodes = builder->getNumLeafNodes();
numBranchNodes = builder->getNumBranchNodes();
numBlobNodes = builder->getNumBlobNodes();
offsetBranches = builder->getOffsetBranches();
branchMemorySize = builder->getBranchMemorySize();
leafMemorySize = builder->getLeafMemorySize();
tmpBuilder->finish(metadata, &crc, maxRecordSizeSeen);
if (!isTLK)
{
duplicateKeyCount = tmpBuilder->getDuplicateCount();
offsetBranches = tmpBuilder->getOffsetBranches();
branchMemorySize = tmpBuilder->getBranchMemorySize();
leafMemorySize = tmpBuilder->getLeafMemorySize();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future: for next JIRA, but these should be added as stats to indexWriteActivityStatistics, and collected via builder in a common way, and manual serialization/deserialization here and in master should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
mergeStats(inactiveStats, tmpBuilder, indexWriteActivityStatistics);
}
}
}
Expand All @@ -259,22 +267,7 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
abortSoon = true;
e.setown(MakeActivityException(this, 0, "INDEXWRITE: Error closing file: %s - unknown exception", partFname.str()));
}
try
{
metadata.clear();
builder.clear();
if (builderIFileIO)
{
mergeStats(inactiveStats, builderIFileIO, diskWriteRemoteStatistics);
builderIFileIO->close();
builderIFileIO.clear();
}
}
catch (IException *_e)
{
ActPrintLog(_e, "Error closing file: %s", partFname.str());
e.setown(_e);
}
metadata.clear();
if (abortSoon)
removeFiles(partDesc);
if (e)
Expand Down Expand Up @@ -613,7 +606,10 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
}
virtual void processDone(MemoryBuffer &mb) override
{
builder.clear();
{
CriticalBlock b(builderCS);
builder.clear();
}
if (refactor && !active)
return;
rowcount_t _processed = processed & THORDATALINK_COUNT_MASK;
Expand All @@ -630,10 +626,9 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
ifile->getTime(&createTime, &modifiedTime, &accessedTime);
modifiedTime.serialize(mb);
mb.append(partCrc);

mb.append(numLeafNodes);
mb.append(numBlobNodes);
mb.append(numBranchNodes);
mb.append(inactiveStats.getStatisticValue(StNumLeafCacheAdds));
mb.append(inactiveStats.getStatisticValue(StNumBlobCacheAdds));
mb.append(inactiveStats.getStatisticValue(StNumNodeCacheAdds));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future: Is there a separate JIRA to remove these manual serializations? ..since they will now be serialized as part of the activity stats. and the deserializing and manual setting of @numLeafNodes, @numBranchNodes, @numBlobNodes is superfluous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to change it in this PR as they are used in few other places. Yes, I can remove these in a future jira.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mb.append(offsetBranches);
mb.append(uncompressedSize);
mb.append(originalBlobSize);
Expand Down Expand Up @@ -699,10 +694,12 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
{
CriticalBlock b(builderCS);
if (builder)
mergeStats(activeStats, builder, indexWriteActivityStatistics);
}
activeStats.setStatistic(StPerReplicated, replicateDone);
activeStats.setStatistic(StNumLeafCacheAdds, numLeafNodes);
activeStats.setStatistic(StNumNodeCacheAdds, numBranchNodes);
activeStats.setStatistic(StNumBlobCacheAdds, numBlobNodes);
}

// ICopyFileProgress
Expand Down
Loading