Skip to content

Commit

Permalink
HPCC-31649 New StSizePeakEphemeralDisk and StSizePeakTempDisk for loo…
Browse files Browse the repository at this point in the history
…k ahead and hash distribute spilling

Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jun 14, 2024
1 parent 65e845e commit 4d40d35
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions thorlcr/thorutil/thbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
ThorRowQueue *in;
size32_t insz;
ThorRowQueue *out;
Linked<IFile> file;
Owned<IFileIO> fileio;
CFileOwner tmpFileOwner;
Owned<IFileIO> tempFileIO;
SpinLock lock;
bool waiting;
Semaphore waitsem;
Expand Down Expand Up @@ -139,12 +139,12 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
insz = 0;
return;
}
if (!fileio) {
if (!tempFileIO) {
SpinUnblock unblock(lock);
fileio.setown(file->open(IFOcreaterw));
if (!fileio)
tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw));
if (!tempFileIO)
{
throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename());
throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",tmpFileOwner.queryIFile().queryFilename());
}
}
MemoryBuffer mb;
Expand Down Expand Up @@ -182,7 +182,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
size32_t left = nb*blocksize-mb.length();
memset(mb.reserve(left),0,left);
}
fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
tempFileIO->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase());
tmpFileOwner.noteSize(numblocks*blocksize);
mb.clear();
}
if (waiting) {
Expand Down Expand Up @@ -220,8 +221,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
size32_t readBlockSize = nb*blocksize;
byte *buf = (byte *)ma.allocate(readBlockSize);
CThorStreamDeserializerSource ds(readBlockSize,buf);
assertex(fileio.get());
size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf);
assertex(tempFileIO.get());
size32_t rd = tempFileIO->read(blk*(offset_t)blocksize,readBlockSize,buf);
assertex(rd==readBlockSize);
for (;;) {
byte b;
Expand All @@ -246,8 +247,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif)
: activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
CSmartRowBuffer(CActivityBase *_activity,IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif)
: activity(_activity), tmpFileOwner(_file, _activity->queryTempFileSizeTracker()), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer())
{
#ifdef _DEBUG
putrecheck = false;
Expand Down Expand Up @@ -282,11 +283,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl
while (out->ordinality())
ReleaseThorRow(out->dequeue());
delete out;
if (fileio)
{
fileio.clear();
file->remove();
}
if (tempFileIO)
tempFileIO.clear();
}

void putRow(const void *row)
Expand Down Expand Up @@ -1197,8 +1195,8 @@ bool CRowSet::Release() const

class CSharedWriteAheadDisk : public CSharedWriteAheadBase
{
Owned<IFile> spillFile;
Owned<IFileIO> spillFileIO;
Owned<CFileOwner> tempFileOwner;
Owned<IFileIO> tempFileIO;
CIArrayOf<Chunk> freeChunks;
PointerArrayOf<Chunk> freeChunksSized;
QueueOf<Chunk, false> savedChunks;
Expand Down Expand Up @@ -1420,7 +1418,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
}
else
{
Owned<ISerialStream> stream = createFileSerialStream(spillFileIO, chunk.offset);
Owned<ISerialStream> stream = createFileSerialStream(tempFileIO, chunk.offset);
#ifdef TRACE_WRITEAHEAD
unsigned diskChunkNum;
stream->get(sizeof(diskChunkNum), &diskChunkNum);
Expand Down Expand Up @@ -1484,7 +1482,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
mb.append((byte)0);
size32_t len = mb.length();
chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list
spillFileIO->write(chunk->offset, len, mb.toByteArray());
tempFileIO->write(chunk->offset, len, mb.toByteArray());
tempFileOwner->noteSize(highOffset);
#ifdef TRACE_WRITEAHEAD
ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len);
#endif
Expand All @@ -1507,16 +1506,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta())
{
assertex(spillName);
spillFile.setown(createIFile(spillName));
spillFile->setShareMode(IFSHnone);
spillFileIO.setown(spillFile->open(IFOcreaterw));
Owned<IFile> iFile = createIFile(spillName);
iFile->setShareMode(IFSHnone);
tempFileIO.setown(iFile->open(IFOcreaterw));
tempFileOwner.setown(new CFileOwner(iFile.getClear(), activity->queryTempFileSizeTracker()));
highOffset = 0;
}
~CSharedWriteAheadDisk()
{
spillFileIO.clear();
if (spillFile)
spillFile->remove();
tempFileIO.clear();

for (;;)
{
Expand All @@ -1536,7 +1534,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase
freeChunks.kill();
freeChunksSized.kill();
highOffset = 0;
spillFileIO->setSize(0);
tempFileIO->setSize(0);
tempFileOwner->noteSize(0);
}
};

Expand Down

0 comments on commit 4d40d35

Please sign in to comment.