From 33b6a9541ae05d38d3ad955974c421edfc411ab8 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Thu, 25 Apr 2024 16:24:50 +0100 Subject: [PATCH] HPCC-31649 New StSizePeakEphemeralDisk and StSizePeakTempDisk for look ahead and hash distribute spilling Signed-off-by: Shamser Ahmed --- thorlcr/thorutil/thbuf.cpp | 56 +++++++++++++++++------------------ thorlcr/thorutil/thormisc.hpp | 11 +++++-- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index d64d06da1a2..d47124262d6 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -69,8 +69,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl ThorRowQueue *in; size32_t insz; ThorRowQueue *out; - Linked file; - Owned fileio; + CFileOwner tmpFileOwner; + Owned tempFileIO; SpinLock lock; bool waiting; Semaphore waitsem; @@ -139,12 +139,12 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl insz = 0; return; } - if (!fileio) { + if (!tempFileIO) { SpinUnblock unblock(lock); - fileio.setown(file->open(IFOcreaterw)); - if (!fileio) + tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw)); + if (!tempFileIO) { - throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename()); + throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",tmpFileOwner.queryIFile().queryFilename()); } } MemoryBuffer mb; @@ -182,7 +182,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t left = nb*blocksize-mb.length(); memset(mb.reserve(left),0,left); } - fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tempFileIO->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tmpFileOwner.noteSize(numblocks*blocksize); mb.clear(); } if (waiting) { @@ -220,8 +221,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t readBlockSize = nb*blocksize; byte *buf = (byte *)ma.allocate(readBlockSize); CThorStreamDeserializerSource ds(readBlockSize,buf); - assertex(fileio.get()); - size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf); + assertex(tempFileIO.get()); + size32_t rd = tempFileIO->read(blk*(offset_t)blocksize,readBlockSize,buf); assertex(rd==readBlockSize); for (;;) { byte b; @@ -246,8 +247,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) - : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) + CSmartRowBuffer(CActivityBase *_activity,IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) + : activity(_activity), tmpFileOwner(_file, _activity->queryTempFileSizeTracker()), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) { #ifdef _DEBUG putrecheck = false; @@ -282,11 +283,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl while (out->ordinality()) ReleaseThorRow(out->dequeue()); delete out; - if (fileio) - { - fileio.clear(); - file->remove(); - } + if (tempFileIO) + tempFileIO.clear(); } void putRow(const void *row) @@ -609,8 +607,7 @@ class CSmartRowInMemoryBuffer: public CSimpleInterface, implements ISmartRowBuff ISmartRowBuffer * createSmartBuffer(CActivityBase *activity, const char * tempname, size32_t buffsize, IThorRowInterfaces *rowif) { - Owned file = createIFile(tempname); - return new CSmartRowBuffer(activity,file,buffsize,rowif); + return new CSmartRowBuffer(activity,createIFile(tempname),buffsize,rowif); } ISmartRowBuffer * createSmartInMemoryBuffer(CActivityBase *activity, IThorRowInterfaces *rowIf, size32_t buffsize) @@ -1197,8 +1194,8 @@ bool CRowSet::Release() const class CSharedWriteAheadDisk : public CSharedWriteAheadBase { - Owned spillFile; - Owned spillFileIO; + Owned tempFileOwner; + Owned tempFileIO; CIArrayOf freeChunks; PointerArrayOf freeChunksSized; QueueOf savedChunks; @@ -1420,7 +1417,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } else { - Owned stream = createFileSerialStream(spillFileIO, chunk.offset); + Owned stream = createFileSerialStream(tempFileIO, chunk.offset); #ifdef TRACE_WRITEAHEAD unsigned diskChunkNum; stream->get(sizeof(diskChunkNum), &diskChunkNum); @@ -1484,7 +1481,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase mb.append((byte)0); size32_t len = mb.length(); chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list - spillFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileOwner->noteSize(highOffset); #ifdef TRACE_WRITEAHEAD ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len); #endif @@ -1507,16 +1505,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()) { assertex(spillName); - spillFile.setown(createIFile(spillName)); - spillFile->setShareMode(IFSHnone); - spillFileIO.setown(spillFile->open(IFOcreaterw)); + Owned iFile = createIFile(spillName); + iFile->setShareMode(IFSHnone); + tempFileIO.setown(iFile->open(IFOcreaterw)); + tempFileOwner.setown(new CFileOwner(iFile.getClear(), activity->queryTempFileSizeTracker())); highOffset = 0; } ~CSharedWriteAheadDisk() { - spillFileIO.clear(); - if (spillFile) - spillFile->remove(); + tempFileIO.clear(); for (;;) { @@ -1536,7 +1533,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase freeChunks.kill(); freeChunksSized.kill(); highOffset = 0; - spillFileIO->setSize(0); + tempFileIO->setSize(0); + tempFileOwner->noteSize(0); } }; diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 2bc8f0c776d..9faad9556ae 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -356,9 +356,14 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface } void noteSize(offset_t size) { - fileSize = size; - if (fileSizeTracker) - fileSizeTracker->growSize(fileSize); + if (fileSizeTracker && fileSize!=size) + { + if (size > fileSize) + fileSizeTracker->growSize(size-fileSize); + else + fileSizeTracker->shrinkSize(fileSize-size); + fileSize = size; + } } IFile &queryIFile() const { return *iFile; } };